Merge pull request #120 from learningOrchestra/feature-data-scientist-pipeline

Feature data scientist pipeline
riibeirogabriel authored Feb 18, 2021
2 parents ce38d66 + 6edcead commit a4a3be6
Showing 50 changed files with 3,771 additions and 986 deletions.
124 changes: 87 additions & 37 deletions docker-compose.yml
@@ -99,13 +99,35 @@ services:
    volumes:
      - "images:/var/lib/registry"

  visualizer:
    image: dockersamples/visualizer:latest
  agent:
    image: portainer/agent:linux-amd64-2.1.0-alpine
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
      - /var/run/docker.sock:/var/run/docker.sock
      - /var/lib/docker/volumes:/var/lib/docker/volumes
    networks:
      - portainer
    deploy:
      mode: global
      placement:
        constraints:
          - "node.platform.os==linux"

  portainer:
    image: portainer/portainer-ce:2.1.1-alpine
    command: -H tcp://tasks.agent:9001 --tlsskipverify
    ports:
      - "8000:8080"
    deploy: *default-deploy-manager
      - "9000:9000"
      - "8000:8000"
    volumes:
      - portainer:/data
    networks:
      - portainer
    deploy:
      mode: replicated
      replicas: 1
      placement:
        constraints:
          - "node.role==manager"

  sparkmaster:
    build: microservices/spark_image
@@ -201,68 +223,96 @@ services:
      - database
    environment: *default-service-database-env

  tsne:
    build: microservices/tsne_image
    image: 127.0.0.1:5050/tsne
  databasexecutor:
    build: microservices/database_executor_image
    image: 127.0.0.1:5050/database_executor
    ports:
      - "5005:5005"
      - "41200:41200"
    extra_hosts:
      - "tsne:0.0.0.0"
      - "5006:5006"
    depends_on:
      - databaseprimary
      - images
      - sparkmaster
      - sparkworker
    deploy: *default-deploy-manager
    volumes:
      - "tsne:/images"
      - "database_executor:/explore"
      - "database_executor:/transform"
      - "default_model:/models"
      - "binary_executor:/binaries"
    networks:
      - database
      - spark
    environment: *default-service-database-env

  pca:
    build: microservices/pca_image
    image: 127.0.0.1:5050/pca
  gatewayapi:
    image: devopsfaith/krakend:1.2.0
    volumes:
      - "./microservices/krakend:/etc/krakend"
    ports:
      - "5006:5006"
      - "41300:41300"
    extra_hosts:
      - "pca:0.0.0.0"
      - "80:8080"
      - "8090:8090"
    deploy: *default-deploy-manager
    networks:
      - database
      - spark

  defaultmodel:
    build: microservices/default_model_image
    image: 127.0.0.1:5050/default_model
    ports:
      - "5007:5007"
    depends_on:
      - databaseprimary
      - images
      - sparkmaster
      - sparkworker
    deploy: *default-deploy-manager
    volumes:
      - "pca:/images"
    networks:
      - database
      - spark
    environment: *default-service-database-env

  gatewayapi:
    image: devopsfaith/krakend:1.2.0
    volumes:
      - "./microservices/krakend:/etc/krakend"
      - "default_model:/models"

  binaryexecutor:
    build: microservices/binary_executor_image
    image: 127.0.0.1:5050/binary_executor
    ports:
      - "80:8080"
      - "8090:8090"
      - "5008:5008"
    depends_on:
      - databaseprimary
      - images
    deploy: *default-deploy-manager
    networks:
      - database
      - spark
    environment: *default-service-database-env
    volumes:
      - "default_model:/models"
      - "binary_executor:/binaries"
      - "database_executor:/transform"

  codexecutor:
    build: microservices/code_executor_image
    image: 127.0.0.1:5050/code_executor
    ports:
      - "5009:5009"
    depends_on:
      - databaseprimary
      - images
    deploy: *default-deploy-manager
    volumes:
      - "database_executor:/explore"
      - "database_executor:/transform"
      - "default_model:/models"
      - "binary_executor:/binaries"
    networks:
      - database
    environment: *default-service-database-env

networks:
  database:
  spark:
  portainer:

volumes:
  images:
  database:
  database_api:
  tsne:
  pca:
  database_executor:
  default_model:
  binary_executor:
  portainer:
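
For orientation, these compose changes publish each new microservice on its own port and add Portainer (agent plus server) for cluster monitoring. A quick smoke test of a deployed stack might look like the sketch below; it only assumes the published ports from the compose file above, with the Swarm manager reachable on localhost (a hypothetical host, not something this commit specifies):

import requests

# Published ports taken from docker-compose.yml; the host is an assumption.
services = {
    "portainer": "http://localhost:9000",
    "gatewayapi": "http://localhost:80",
    "databasexecutor": "http://localhost:5006",
    "defaultmodel": "http://localhost:5007",
    "binaryexecutor": "http://localhost:5008",
    "codexecutor": "http://localhost:5009",
}

for name, url in services.items():
    try:
        status_code = requests.get(url, timeout=5).status_code
        print(f"{name}: HTTP {status_code}")
    except requests.exceptions.RequestException as error:
        print(f"{name}: unreachable ({error})")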
13 changes: 13 additions & 0 deletions microservices/binary_executor_image/Dockerfile
@@ -0,0 +1,13 @@
FROM python:3.7-slim

WORKDIR /usr/src/binary_executor
COPY . /usr/src/binary_executor
RUN pip install -r requirements.txt

ENV MICROSERVICE_IP "0.0.0.0"
ENV MICROSERVICE_PORT 5008
ENV BINARY_VOLUME_PATH "/binaries"
ENV MODELS_VOLUME_PATH "/models"
ENV TRANSFORM_VOLUME_PATH "/transform"

CMD ["python", "server.py"]
141 changes: 141 additions & 0 deletions microservices/binary_executor_image/binary_execution.py
@@ -0,0 +1,141 @@
import importlib
from concurrent.futures import ThreadPoolExecutor
from utils import Database, Data, Metadata, ObjectStorage
from constants import Constants


class Parameters:
    __DATASET_KEY_CHARACTER = "$"
    __DATASET_WITH_OBJECT_KEY_CHARACTER = "."
    __REMOVE_KEY_CHARACTER = ""

    def __init__(self, database: Database, data: Data):
        self.__database_connector = database
        self.__data = data

    def treat(self, method_parameters: dict) -> dict:
        # Replace "$dataset" and "$dataset.object" references with the
        # stored content they point to; every other value passes through.
        parameters = method_parameters.copy()

        for name, value in parameters.items():
            if self.__is_dataset(value):
                dataset_name = self.__get_dataset_name_from_value(value)

                if self.__has_dot_in_dataset_name(value):
                    object_name = self.__get_name_after_dot_from_value(value)
                    parameters[name] = self.__data.get_object_from_dataset(
                        dataset_name, object_name)
                else:
                    parameters[name] = self.__data.get_dataset_content(
                        dataset_name)

        return parameters

    def __is_dataset(self, value: str) -> bool:
        # Non-string parameters (numbers, lists, ...) can never be dataset
        # references; checking for that first avoids a TypeError on the
        # substring test.
        return isinstance(value, str) and \
            self.__DATASET_KEY_CHARACTER in value

    def __get_dataset_name_from_value(self, value: str) -> str:
        dataset_name = value.replace(self.__DATASET_KEY_CHARACTER,
                                     self.__REMOVE_KEY_CHARACTER)
        return dataset_name.split(self.__DATASET_WITH_OBJECT_KEY_CHARACTER)[
            Constants.FIRST_ARGUMENT]

    def __has_dot_in_dataset_name(self, dataset_name: str) -> bool:
        return self.__DATASET_WITH_OBJECT_KEY_CHARACTER in dataset_name

    def __get_name_after_dot_from_value(self, value: str) -> str:
        return value.split(
            self.__DATASET_WITH_OBJECT_KEY_CHARACTER)[
            Constants.SECOND_ARGUMENT]


class Execution:
    __DATASET_KEY_CHARACTER = "$"
    __REMOVE_KEY_CHARACTER = ""

    def __init__(self,
                 database_connector: Database,
                 executor_name: str,
                 executor_service_type: str,
                 parent_name: str,
                 parent_name_service_type: str,
                 metadata_creator: Metadata,
                 class_method: str,
                 parameters_handler: Parameters,
                 storage: ObjectStorage,
                 ):
        self.__metadata_creator = metadata_creator
        self.__thread_pool = ThreadPoolExecutor()
        self.__database_connector = database_connector
        self.__storage = storage
        self.__parameters_handler = parameters_handler
        self.executor_name = executor_name
        self.parent_name = parent_name
        self.class_method = class_method
        self.executor_service_type = executor_service_type
        self.parent_name_service_type = parent_name_service_type

    def create(self,
               module_path: str,
               class_name: str,
               method_parameters: dict,
               description: str) -> None:
        self.__metadata_creator.create_file(self.parent_name,
                                            self.executor_name,
                                            module_path,
                                            class_name,
                                            self.class_method,
                                            self.executor_service_type)

        # The heavy work runs in a thread pool so the HTTP request that
        # triggered it can return immediately.
        self.__thread_pool.submit(self.__pipeline,
                                  module_path,
                                  method_parameters,
                                  description)

    def update(self,
               module_path: str,
               method_parameters: dict,
               description: str) -> None:
        self.__metadata_creator.update_finished_flag(self.executor_name, False)

        self.__thread_pool.submit(self.__pipeline,
                                  module_path,
                                  method_parameters,
                                  description)

    def __pipeline(self,
                   module_path: str,
                   method_parameters: dict,
                   description: str) -> None:
        try:
            # Import the module so the parent binary's class can be resolved
            # when it is read back from storage.
            importlib.import_module(module_path)
            model_instance = self.__storage.read(self.parent_name,
                                                 self.parent_name_service_type)
            method_result = self.__execute_a_object_method(model_instance,
                                                           self.class_method,
                                                           method_parameters)
            self.__storage.save(method_result, self.executor_name,
                                self.executor_service_type)
            self.__metadata_creator.update_finished_flag(self.executor_name,
                                                         flag=True)
        except Exception as exception:
            # Record the failure in the execution document and stop.
            self.__metadata_creator.create_execution_document(
                self.executor_name,
                description,
                method_parameters,
                repr(exception))
            return None

        self.__metadata_creator.create_execution_document(self.executor_name,
                                                          description,
                                                          method_parameters)

    def __execute_a_object_method(self, class_instance: object, method: str,
                                  parameters: dict) -> object:
        model_method = getattr(class_instance, method)
        treated_parameters = self.__parameters_handler.treat(parameters)
        return model_method(**treated_parameters)
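
Parameters.treat implements the commit's parameter-passing convention: a value of "$name" is replaced by the stored dataset's full content, "$name.object" fetches a single object from that dataset, and anything else reaches the method untouched. A hedged illustration with stand-in dependencies (the real Database and Data classes live in utils and talk to MongoDB; the dataset and field names here are hypothetical):

from binary_execution import Parameters

class StubDatabase:  # stand-in for utils.Database
    pass

class StubData:  # stand-in for utils.Data
    def get_dataset_content(self, dataset_name):
        return f"<whole dataset {dataset_name}>"

    def get_object_from_dataset(self, dataset_name, object_name):
        return f"<object {object_name} of dataset {dataset_name}>"

handler = Parameters(StubDatabase(), StubData())
print(handler.treat({
    "X": "$titanic.features",  # "$name.object" -> one object from a dataset
    "y": "$titanic.labels",
    "n_estimators": 100,       # plain value, passed through unchanged
}))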
55 changes: 55 additions & 0 deletions microservices/binary_executor_image/constants.py
@@ -0,0 +1,55 @@
class Constants:
    MODULE_PATH_FIELD_NAME = "modulePath"
    CLASS_FIELD_NAME = "class"
    PARENT_NAME_FIELD_NAME = "parentName"
    NAME_FIELD_NAME = "name"
    FINISHED_FIELD_NAME = "finished"
    DESCRIPTION_FIELD_NAME = "description"
    METHOD_FIELD_NAME = "method"
    METHOD_PARAMETERS_FIELD_NAME = "methodParameters"
    TYPE_FIELD_NAME = "type"
    EXCEPTION_FIELD_NAME = "exception"

    MODELS_VOLUME_PATH = "MODELS_VOLUME_PATH"
    BINARY_VOLUME_PATH = "BINARY_VOLUME_PATH"
    TRANSFORM_VOLUME_PATH = "TRANSFORM_VOLUME_PATH"

    DELETED_MESSAGE = "deleted file"

    HTTP_STATUS_CODE_SUCCESS = 200
    HTTP_STATUS_CODE_SUCCESS_CREATED = 201
    HTTP_STATUS_CODE_CONFLICT = 409
    HTTP_STATUS_CODE_NOT_ACCEPTABLE = 406
    HTTP_STATUS_CODE_NOT_FOUND = 404
    GET_METHOD_NAME = "GET"

    DATABASE_URL = "DATABASE_URL"
    DATABASE_PORT = "DATABASE_PORT"
    DATABASE_NAME = "DATABASE_NAME"
    DATABASE_REPLICA_SET = "DATABASE_REPLICA_SET"

    ID_FIELD_NAME = "_id"
    METADATA_DOCUMENT_ID = 0

    MESSAGE_RESULT = "result"

    MICROSERVICE_URI_SWITCHER = {
        "tune": "/api/learningOrchestra/v1/tune/",
        "train": "/api/learningOrchestra/v1/train/",
        "evaluate": "/api/learningOrchestra/v1/evaluate/",
        "predict": "/api/learningOrchestra/v1/predict/"
    }

    DEFAULT_MODEL_TYPE = "defaultModel"
    TUNE_TYPE = "tune"
    TRAIN_TYPE = "train"
    EVALUATE_TYPE = "evaluate"
    PREDICT_TYPE = "predict"
    TRANSFORM_TYPE = "transform"
    PYTHON_TRANSFORM_TYPE = "pythonTransform"

    MICROSERVICE_URI_PATH = "/binaryExecutor"
    MICROSERVICE_URI_GET_PARAMS = "?query={}&limit=20&skip=0"

    FIRST_ARGUMENT = 0
    SECOND_ARGUMENT = 1
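
The field names and MICROSERVICE_URI_SWITCHER routes above imply the shape of a request to this service. Below is a sketch of a training call through the gateway; the host, dataset, and executor names are hypothetical, and the exact contract is defined by server.py, which is outside this excerpt:

import requests

# Field names mirror the constants above (name, parentName, modulePath, ...).
payload = {
    "name": "titanic_rf_trained",   # hypothetical executor name
    "parentName": "titanic_rf",     # hypothetical stored parent binary
    "modulePath": "sklearn.ensemble",
    "class": "RandomForestClassifier",
    "method": "fit",
    "methodParameters": {
        # "$dataset" references are resolved server-side by Parameters.treat
        "X": "$titanic_training.features",
        "y": "$titanic_training.labels",
    },
    "description": "train a random forest on the titanic dataset",
}

# "train" route from Constants.MICROSERVICE_URI_SWITCHER; host is assumed.
response = requests.post(
    "http://localhost/api/learningOrchestra/v1/train/", json=payload)
print(response.status_code, response.json())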
6 changes: 6 additions & 0 deletions microservices/binary_executor_image/requirements.txt
@@ -0,0 +1,6 @@
pymongo==3.10.1
flask==1.1.2
datetime==4.3
pytz==2020.1
scikit-learn
pandas==1.2.0