Skip to content

Commit fb1e5c9

Browse files
committed
Create continuous_deploy.py
1 parent 8d5e23b commit fb1e5c9

File tree

1 file changed

+350
-0
lines changed

1 file changed

+350
-0
lines changed
Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
"""
2+
Continuous deployment based on metrics for ModelSync
3+
"""
4+
5+
import json
6+
import subprocess
7+
import requests
8+
from pathlib import Path
9+
from typing import Dict, List, Optional, Any, Callable
10+
from datetime import datetime
11+
from modelsync.utils.helpers import ensure_directory, write_json_file, read_json_file
12+
13+
class DeploymentRule:
14+
"""Represents a deployment rule based on metrics"""
15+
16+
def __init__(
17+
self,
18+
name: str,
19+
branch: str,
20+
metric_name: str,
21+
threshold: float,
22+
operator: str, # "greater_than", "less_than", "equals"
23+
deployment_target: str,
24+
deployment_config: Dict[str, Any]
25+
):
26+
self.name = name
27+
self.branch = branch
28+
self.metric_name = metric_name
29+
self.threshold = threshold
30+
self.operator = operator
31+
self.deployment_target = deployment_target
32+
self.deployment_config = deployment_config
33+
self.created_at = datetime.now().isoformat()
34+
self.last_checked = None
35+
self.triggered_count = 0
36+
37+
def check_condition(self, metrics: Dict[str, float]) -> bool:
38+
"""Check if deployment condition is met"""
39+
if self.metric_name not in metrics:
40+
return False
41+
42+
value = metrics[self.metric_name]
43+
44+
if self.operator == "greater_than":
45+
return value > self.threshold
46+
elif self.operator == "less_than":
47+
return value < self.threshold
48+
elif self.operator == "equals":
49+
return abs(value - self.threshold) < 0.001
50+
elif self.operator == "greater_equal":
51+
return value >= self.threshold
52+
elif self.operator == "less_equal":
53+
return value <= self.threshold
54+
55+
return False
56+
57+
def to_dict(self) -> Dict[str, Any]:
58+
"""Convert rule to dictionary"""
59+
return {
60+
"name": self.name,
61+
"branch": self.branch,
62+
"metric_name": self.metric_name,
63+
"threshold": self.threshold,
64+
"operator": self.operator,
65+
"deployment_target": self.deployment_target,
66+
"deployment_config": self.deployment_config,
67+
"created_at": self.created_at,
68+
"last_checked": self.last_checked,
69+
"triggered_count": self.triggered_count
70+
}
71+
72+
class DeploymentManager:
73+
"""Manages continuous deployment based on metrics"""
74+
75+
def __init__(self, repo_path: str = "."):
76+
self.repo_path = Path(repo_path)
77+
self.deploy_dir = self.repo_path / ".modelsync" / "deployment"
78+
self.rules_file = self.deploy_dir / "rules.json"
79+
self.deployments_file = self.deploy_dir / "deployments.json"
80+
self.rules: List[DeploymentRule] = []
81+
self.deployments: List[Dict[str, Any]] = []
82+
ensure_directory(str(self.deploy_dir))
83+
self._load_data()
84+
85+
def _load_data(self):
86+
"""Load deployment data"""
87+
rules_data = read_json_file(str(self.rules_file)) or []
88+
self.rules = [self._rule_from_dict(rule) for rule in rules_data]
89+
90+
self.deployments = read_json_file(str(self.deployments_file)) or []
91+
92+
def _save_data(self):
93+
"""Save deployment data"""
94+
rules_data = [rule.to_dict() for rule in self.rules]
95+
write_json_file(str(self.rules_file), rules_data)
96+
write_json_file(str(self.deployments_file), self.deployments)
97+
98+
def _rule_from_dict(self, rule_data: Dict[str, Any]) -> DeploymentRule:
99+
"""Create rule from dictionary"""
100+
rule = DeploymentRule(
101+
name=rule_data["name"],
102+
branch=rule_data["branch"],
103+
metric_name=rule_data["metric_name"],
104+
threshold=rule_data["threshold"],
105+
operator=rule_data["operator"],
106+
deployment_target=rule_data["deployment_target"],
107+
deployment_config=rule_data["deployment_config"]
108+
)
109+
rule.created_at = rule_data.get("created_at", rule.created_at)
110+
rule.last_checked = rule_data.get("last_checked")
111+
rule.triggered_count = rule_data.get("triggered_count", 0)
112+
return rule
113+
114+
def add_deployment_rule(
115+
self,
116+
name: str,
117+
branch: str,
118+
metric_name: str,
119+
threshold: float,
120+
operator: str,
121+
deployment_target: str,
122+
deployment_config: Dict[str, Any]
123+
) -> bool:
124+
"""Add a new deployment rule"""
125+
126+
# Check if rule name already exists
127+
if any(rule.name == name for rule in self.rules):
128+
print(f"❌ Rule '{name}' already exists")
129+
return False
130+
131+
rule = DeploymentRule(
132+
name=name,
133+
branch=branch,
134+
metric_name=metric_name,
135+
threshold=threshold,
136+
operator=operator,
137+
deployment_target=deployment_target,
138+
deployment_config=deployment_config
139+
)
140+
141+
self.rules.append(rule)
142+
self._save_data()
143+
144+
print(f"✅ Added deployment rule: {name}")
145+
return True
146+
147+
def check_deployment_rules(
148+
self,
149+
branch: str,
150+
metrics: Dict[str, float],
151+
model_id: str = None
152+
) -> List[Dict[str, Any]]:
153+
"""Check all deployment rules for a branch"""
154+
155+
triggered_rules = []
156+
157+
for rule in self.rules:
158+
if rule.branch != branch:
159+
continue
160+
161+
if rule.check_condition(metrics):
162+
print(f"🚀 Deployment rule triggered: {rule.name}")
163+
164+
# Update rule stats
165+
rule.last_checked = datetime.now().isoformat()
166+
rule.triggered_count += 1
167+
168+
# Execute deployment
169+
deployment_result = self._execute_deployment(rule, metrics, model_id)
170+
171+
triggered_rules.append({
172+
"rule_name": rule.name,
173+
"branch": branch,
174+
"metrics": metrics,
175+
"model_id": model_id,
176+
"deployment_result": deployment_result,
177+
"triggered_at": datetime.now().isoformat()
178+
})
179+
180+
if triggered_rules:
181+
self._save_data()
182+
183+
return triggered_rules
184+
185+
def _execute_deployment(
186+
self,
187+
rule: DeploymentRule,
188+
metrics: Dict[str, float],
189+
model_id: str = None
190+
) -> Dict[str, Any]:
191+
"""Execute deployment based on rule"""
192+
193+
deployment_id = f"{rule.name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
194+
195+
deployment_data = {
196+
"id": deployment_id,
197+
"rule_name": rule.name,
198+
"branch": rule.branch,
199+
"model_id": model_id,
200+
"metrics": metrics,
201+
"deployment_target": rule.deployment_target,
202+
"config": rule.deployment_config,
203+
"started_at": datetime.now().isoformat(),
204+
"status": "running"
205+
}
206+
207+
try:
208+
if rule.deployment_target == "docker":
209+
result = self._deploy_docker(deployment_data)
210+
elif rule.deployment_target == "kubernetes":
211+
result = self._deploy_kubernetes(deployment_data)
212+
elif rule.deployment_target == "api_endpoint":
213+
result = self._deploy_api_endpoint(deployment_data)
214+
elif rule.deployment_target == "mlflow":
215+
result = self._deploy_mlflow(deployment_data)
216+
else:
217+
result = {"status": "error", "message": f"Unknown deployment target: {rule.deployment_target}"}
218+
219+
deployment_data.update(result)
220+
deployment_data["completed_at"] = datetime.now().isoformat()
221+
222+
except Exception as e:
223+
deployment_data["status"] = "failed"
224+
deployment_data["error"] = str(e)
225+
deployment_data["failed_at"] = datetime.now().isoformat()
226+
result = {"status": "failed", "error": str(e)}
227+
228+
# Save deployment record
229+
self.deployments.append(deployment_data)
230+
self._save_data()
231+
232+
return result
233+
234+
def _deploy_docker(self, deployment_data: Dict[str, Any]) -> Dict[str, Any]:
235+
"""Deploy using Docker"""
236+
config = deployment_data["config"]
237+
238+
# Build Docker image
239+
build_cmd = config.get("build_command", "docker build -t {image_name} .")
240+
image_name = config.get("image_name", "modelsync-model")
241+
242+
build_cmd = build_cmd.format(image_name=image_name)
243+
244+
try:
245+
result = subprocess.run(build_cmd, shell=True, capture_output=True, text=True)
246+
if result.returncode != 0:
247+
return {"status": "failed", "error": result.stderr}
248+
except Exception as e:
249+
return {"status": "failed", "error": str(e)}
250+
251+
# Run Docker container
252+
run_cmd = config.get("run_command", "docker run -d -p {port}:8000 {image_name}")
253+
port = config.get("port", "8000")
254+
255+
run_cmd = run_cmd.format(image_name=image_name, port=port)
256+
257+
try:
258+
result = subprocess.run(run_cmd, shell=True, capture_output=True, text=True)
259+
if result.returncode != 0:
260+
return {"status": "failed", "error": result.stderr}
261+
262+
container_id = result.stdout.strip()
263+
return {"status": "success", "container_id": container_id, "port": port}
264+
except Exception as e:
265+
return {"status": "failed", "error": str(e)}
266+
267+
def _deploy_kubernetes(self, deployment_data: Dict[str, Any]) -> Dict[str, Any]:
268+
"""Deploy using Kubernetes"""
269+
config = deployment_data["config"]
270+
271+
# Apply Kubernetes manifests
272+
manifest_path = config.get("manifest_path")
273+
if not manifest_path:
274+
return {"status": "error", "message": "No manifest path specified"}
275+
276+
try:
277+
result = subprocess.run(f"kubectl apply -f {manifest_path}", shell=True, capture_output=True, text=True)
278+
if result.returncode != 0:
279+
return {"status": "failed", "error": result.stderr}
280+
281+
return {"status": "success", "output": result.stdout}
282+
except Exception as e:
283+
return {"status": "failed", "error": str(e)}
284+
285+
def _deploy_api_endpoint(self, deployment_data: Dict[str, Any]) -> Dict[str, Any]:
286+
"""Deploy to API endpoint"""
287+
config = deployment_data["config"]
288+
endpoint = config.get("endpoint")
289+
290+
if not endpoint:
291+
return {"status": "error", "message": "No endpoint specified"}
292+
293+
# Prepare deployment payload
294+
payload = {
295+
"model_id": deployment_data["model_id"],
296+
"metrics": deployment_data["metrics"],
297+
"config": config
298+
}
299+
300+
try:
301+
response = requests.post(endpoint, json=payload, timeout=30)
302+
if response.status_code == 200:
303+
return {"status": "success", "response": response.json()}
304+
else:
305+
return {"status": "failed", "error": f"HTTP {response.status_code}: {response.text}"}
306+
except Exception as e:
307+
return {"status": "failed", "error": str(e)}
308+
309+
def _deploy_mlflow(self, deployment_data: Dict[str, Any]) -> Dict[str, Any]:
310+
"""Deploy using MLflow"""
311+
config = deployment_data["config"]
312+
313+
# MLflow deployment commands
314+
model_uri = config.get("model_uri")
315+
if not model_uri:
316+
return {"status": "error", "message": "No model URI specified"}
317+
318+
try:
319+
# Register model in MLflow
320+
register_cmd = f"mlflow models serve -m {model_uri} -p {config.get('port', '5000')}"
321+
result = subprocess.run(register_cmd, shell=True, capture_output=True, text=True)
322+
323+
if result.returncode != 0:
324+
return {"status": "failed", "error": result.stderr}
325+
326+
return {"status": "success", "output": result.stdout}
327+
except Exception as e:
328+
return {"status": "failed", "error": str(e)}
329+
330+
def list_deployment_rules(self) -> List[Dict[str, Any]]:
331+
"""List all deployment rules"""
332+
return [rule.to_dict() for rule in self.rules]
333+
334+
def get_deployments(self, branch: str = None) -> List[Dict[str, Any]]:
335+
"""Get deployment history"""
336+
if branch:
337+
return [d for d in self.deployments if d.get("branch") == branch]
338+
return self.deployments
339+
340+
def remove_deployment_rule(self, name: str) -> bool:
341+
"""Remove a deployment rule"""
342+
for i, rule in enumerate(self.rules):
343+
if rule.name == name:
344+
del self.rules[i]
345+
self._save_data()
346+
print(f"✅ Removed deployment rule: {name}")
347+
return True
348+
349+
print(f"❌ Rule '{name}' not found")
350+
return False

0 commit comments

Comments
 (0)