diff --git a/adk/ADK.py b/adk/ADK.py index 10c0222..718ef98 100644 --- a/adk/ADK.py +++ b/adk/ADK.py @@ -3,8 +3,12 @@ import os import sys import Algorithmia +import os +import subprocess + from adk.io import create_exception, format_data, format_response from adk.modeldata import ModelData +from adk.mlops import MLOps class ADK(object): @@ -17,6 +21,7 @@ def __init__(self, apply_func, load_func=None, client=None): :param client: A Algorithmia Client instance that might be user defined, and is used for interacting with a model manifest file; if defined. """ + self.mlops = None self.FIFO_PATH = "/tmp/algoout" if client: @@ -39,10 +44,8 @@ def __init__(self, apply_func, load_func=None, client=None): self.load_result = None self.loading_exception = None self.manifest_path = "model_manifest.json" - self.model_data = self.init_manifest(self.manifest_path) - - def init_manifest(self, path): - return ModelData(self.client, path) + self.mlops_path = "mlops.json" + self.model_data = ModelData(self.client, self.manifest_path) def load(self): try: @@ -91,8 +94,18 @@ def write_to_pipe(self, payload, pprint=print): def process_local(self, local_payload, pprint): result = self.apply(local_payload) self.write_to_pipe(result, pprint=pprint) + + def mlops_init(self): + mlops_token = os.environ.get("DATAROBOT_MLOPS_API_TOKEN", None) + if mlops_token: + self.mlops = MLOps(mlops_token, self.mlops_path) + self.mlops.init() + else: + raise Exception("'DATAROBOT_MLOPS_API_TOKEN' was not found, please set to use mlops.") - def init(self, local_payload=None, pprint=print): + def init(self, local_payload=None, pprint=print, mlops=False): + if mlops and not self.is_local: + self.mlops_init() self.load() if self.is_local and local_payload is not None: if self.loading_exception: diff --git a/adk/mlops.py b/adk/mlops.py new file mode 100644 index 0000000..a64efbd --- /dev/null +++ b/adk/mlops.py @@ -0,0 +1,46 @@ +import yaml +import json +import os +import subprocess + + +class MLOps(object): + spool_dir = "/tmp/ta" + agent_dir = "/opt/mlops-agent" + mlops_dir_name = "datarobot_mlops_package-8.1.2" + + def __init__(self, api_token, path): + self.token = api_token + if os.path.exists(path): + with open(path) as f: + mlops_config = json.load(f) + else: + raise Exception("'mlops.json' file does not exist, but mlops was requested.") + if not os.path.exists(self.agent_dir): + raise Exception("environment is not configured for mlops.\nPlease select a valid mlops enabled environment.") + self.endpoint = mlops_config['datarobot_mlops_service_url'] + self.model_id = mlops_config['model_id'] + self.deployment_id = mlops_config['deployment_id'] + self.mlops_name = mlops_config.get('mlops_dir_name', 'datarobot_mlops_package-8.1.2') + + def init(self): + os.environ['MLOPS_DEPLOYMENT_ID'] = self.deployment_id + os.environ['MLOPS_MODEL_ID'] = self.model_id + os.environ['MLOPS_SPOOLER_TYPE'] = "FILESYSTEM" + os.environ['MLOPS_FILESYSTEM_DIRECTORY'] = self.spool_dir + + with open(f'{self.agent_dir}/{self.mlops_dir_name}/conf/mlops.agent.conf.yaml') as f: + documents = yaml.load(f, Loader=yaml.FullLoader) + documents['mlopsUrl'] = self.endpoint + documents['apiToken'] = self.token + with open(f'{self.agent_dir}/{self.mlops_dir_name}/conf/mlops.agent.conf.yaml', 'w') as f: + yaml.dump(documents, f) + + subprocess.call(f'{self.agent_dir}/{self.mlops_dir_name}/bin/start-agent.sh') + check = subprocess.Popen([f'{self.agent_dir}/{self.mlops_dir_name}/bin/status-agent.sh'], stdout=subprocess.PIPE) + output = check.stdout.readlines()[0] + check.terminate() + if b"DataRobot MLOps-Agent is running as a service." in output: + return True + else: + raise Exception(output) \ No newline at end of file diff --git a/adk/modeldata.py b/adk/modeldata.py index 0b6acab..e2b49cd 100644 --- a/adk/modeldata.py +++ b/adk/modeldata.py @@ -88,7 +88,6 @@ def find_optional_model(self, file_name): else: self.models[file_name] = FileData(real_hash, local_data_path) - def get_manifest(self): if os.path.exists(self.manifest_frozen_path): with open(self.manifest_frozen_path) as f: @@ -96,8 +95,9 @@ def get_manifest(self): if check_lock(manifest_data): return manifest_data else: - raise Exception("Manifest FreezeFile Tamper Detected; please use the CLI and 'algo freeze' to rebuild your " - "algorithm's freeze file.") + raise Exception( + "Manifest FreezeFile Tamper Detected; please use the CLI and 'algo freeze' to rebuild your " + "algorithm's freeze file.") elif os.path.exists(self.manifest_reg_path): with open(self.manifest_reg_path) as f: manifest_data = json.load(f) diff --git a/requirements.txt b/requirements.txt index ccb528b..eda150c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ algorithmia>=1.7,<2 -six \ No newline at end of file +six +pyaml>=21.10,<21.11 \ No newline at end of file diff --git a/setup.py b/setup.py index f470b4a..8d34878 100644 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ author_email='support@algorithmia.com', packages=['adk'], install_requires=[ + 'pyaml>=21.10,<21.11', 'six', ], include_package_data=True, diff --git a/tests/AdkTest.py b/tests/AdkTest.py index e6b4672..941d848 100644 --- a/tests/AdkTest.py +++ b/tests/AdkTest.py @@ -1,7 +1,7 @@ from adk import ADK - +from adk.modeldata import ModelData class ADKTest(ADK): def __init__(self, apply_func, load_func=None, client=None, manifest_path="model_manifest.json.freeze"): super(ADKTest, self).__init__(apply_func, load_func, client) - self.model_data = self.init_manifest(manifest_path) + self.model_data = ModelData(self.client, manifest_path)