Skip to content

Commit 620d47d

Browse files
committed
Create ml_pipeline.py
1 parent aca408c commit 620d47d

File tree

1 file changed

+338
-0
lines changed

1 file changed

+338
-0
lines changed

modelsync/pipelines/ml_pipeline.py

Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
"""
2+
ML Pipeline integration for ModelSync
3+
"""
4+
5+
import json
6+
import inspect
7+
from pathlib import Path
8+
from typing import Dict, List, Optional, Any, Callable, Union
9+
from datetime import datetime
10+
import pandas as pd
11+
import numpy as np
12+
from modelsync.utils.helpers import ensure_directory, write_json_file, read_json_file
13+
14+
# ML Framework imports
15+
try:
16+
import sklearn
17+
from sklearn.pipeline import Pipeline as SklearnPipeline
18+
from sklearn.base import BaseEstimator, TransformerMixin
19+
SKLEARN_AVAILABLE = True
20+
except ImportError:
21+
SKLEARN_AVAILABLE = False
22+
23+
try:
24+
import tensorflow as tf
25+
TENSORFLOW_AVAILABLE = True
26+
except ImportError:
27+
TENSORFLOW_AVAILABLE = False
28+
29+
try:
30+
import torch
31+
import torch.nn as nn
32+
PYTORCH_AVAILABLE = True
33+
except ImportError:
34+
PYTORCH_AVAILABLE = False
35+
36+
class PipelineStep:
    """Represents a single step in an ML pipeline.

    A step wraps a callable (or an sklearn transformer/estimator) together
    with its stored parameters and the framework it belongs to, and knows
    how to execute itself against incoming data.
    """

    def __init__(
        self,
        name: str,
        step_type: str,
        function: Callable,
        parameters: Dict[str, Any],
        framework: str = "custom"
    ):
        """
        Args:
            name: Human-readable step identifier (unique within a pipeline).
            step_type: Category label, e.g. data_preprocessing,
                feature_engineering, model_training, evaluation.
            function: Callable (or sklearn transformer/estimator) executed by
                this step. May be None for steps reloaded from metadata.
            parameters: Keyword arguments passed to the function on execute.
            framework: One of "sklearn", "tensorflow", "pytorch", "custom".
        """
        self.name = name
        self.step_type = step_type  # data_preprocessing, feature_engineering, model_training, evaluation
        self.function = function
        self.parameters = parameters
        self.framework = framework
        self.created_at = datetime.now().isoformat()
        # Whether an sklearn transformer has already been fitted by this step,
        # so repeated executions transform without refitting (see
        # _execute_sklearn_step).
        self._fitted = False

    def execute(self, data: Any, context: Dict[str, Any] = None) -> Any:
        """Execute this pipeline step.

        Args:
            data: Input passed to the step's function.
            context: Optional extra keyword arguments for this call only;
                entries override the step's stored parameters.

        Returns:
            Whatever the underlying function returns (framework-dependent).

        Raises:
            Exception: Re-raises whatever the underlying function raises,
                after printing a diagnostic message.
        """
        try:
            # Merge per-call context over a copy so the stored parameters
            # are never mutated.
            params = self.parameters.copy()
            if context:
                params.update(context)

            # Dispatch on framework; fall back to the custom path when the
            # framework's library is not installed.
            if self.framework == "sklearn" and SKLEARN_AVAILABLE:
                return self._execute_sklearn_step(data, params)
            elif self.framework == "tensorflow" and TENSORFLOW_AVAILABLE:
                return self._execute_tensorflow_step(data, params)
            elif self.framework == "pytorch" and PYTORCH_AVAILABLE:
                return self._execute_pytorch_step(data, params)
            else:
                return self._execute_custom_step(data, params)

        except Exception as e:
            print(f"❌ Error executing step '{self.name}': {e}")
            raise

    def _execute_sklearn_step(self, data: Any, params: Dict[str, Any]) -> Any:
        """Execute an sklearn step (transformer, estimator, or plain function)."""
        if hasattr(self.function, 'fit'):
            # It's a transformer/estimator
            if hasattr(self.function, 'transform'):
                # Transformer: fit once on first execution, then only
                # transform. (The previous check looked for a literal
                # 'fitted_' attribute that nothing ever set, so the
                # transformer was refit on every call.)
                if not self._fitted:
                    self.function.fit(data)
                    self._fitted = True
                return self.function.transform(data)
            else:
                # Estimator: fit returns the fitted estimator itself.
                return self.function.fit(data, **params)
        else:
            # Regular function
            return self.function(data, **params)

    def _execute_tensorflow_step(self, data: Any, params: Dict[str, Any]) -> Any:
        """Execute TensorFlow pipeline step"""
        return self.function(data, **params)

    def _execute_pytorch_step(self, data: Any, params: Dict[str, Any]) -> Any:
        """Execute PyTorch pipeline step"""
        return self.function(data, **params)

    def _execute_custom_step(self, data: Any, params: Dict[str, Any]) -> Any:
        """Execute custom pipeline step"""
        return self.function(data, **params)

    def to_dict(self) -> Dict[str, Any]:
        """Convert step to dictionary for serialization.

        Note: only the function's *name* is serialized — the callable itself
        cannot be reconstructed from this dict (it is str(None) for steps
        reloaded from metadata).
        """
        return {
            "name": self.name,
            "step_type": self.step_type,
            "framework": self.framework,
            "parameters": self.parameters,
            "created_at": self.created_at,
            "function_name": self.function.__name__ if hasattr(self.function, '__name__') else str(self.function)
        }
114+
115+
class MLPipeline:
    """ML Pipeline with versioning support.

    Steps are held in memory and mirrored into a JSON metadata file under
    <repo>/.modelsync/pipelines/<name>/, so a pipeline definition survives
    across sessions (the callables themselves are not serialized — see
    _load_steps). Every execution is persisted as its own JSON record under
    the pipeline's executions/ directory.
    """

    def __init__(self, name: str, repo_path: str = "."):
        """
        Args:
            name: Pipeline name; doubles as its storage directory name.
            repo_path: Repository root containing the .modelsync tree.
        """
        self.name = name
        self.repo_path = Path(repo_path)
        self.pipeline_dir = self.repo_path / ".modelsync" / "pipelines" / name
        self.steps: List[PipelineStep] = []
        self.metadata = {
            "name": name,
            "created_at": datetime.now().isoformat(),
            "steps": [],
            "executions": [],
            "version": "1.0.0"
        }
        self._setup_pipeline()

    def _setup_pipeline(self):
        """Create the pipeline directory and pull in any persisted state."""
        ensure_directory(str(self.pipeline_dir))
        self._load_metadata()

    def _load_metadata(self):
        """Load pipeline metadata from disk if present.

        The loaded document is merged over the in-memory defaults rather
        than replacing them, so a metadata file that is corrupt (reads back
        falsy) or missing keys can never leave required keys like "steps"
        or "executions" undefined.
        """
        metadata_file = self.pipeline_dir / "metadata.json"
        if metadata_file.exists():
            loaded = read_json_file(str(metadata_file))
            if loaded:
                self.metadata.update(loaded)
            self._load_steps()

    def _load_steps(self):
        """Load pipeline steps from metadata.

        Note: the step's callable is not serialized, so reloaded steps carry
        function=None and cannot be executed until a function is reattached.
        """
        self.steps = []
        for step_data in self.metadata.get("steps", []):
            step = PipelineStep(
                name=step_data["name"],
                step_type=step_data["step_type"],
                function=None,  # Would need to be reconstructed
                parameters=step_data["parameters"],
                framework=step_data.get("framework", "custom")
            )
            # Preserve the original creation timestamp when available.
            step.created_at = step_data.get("created_at", step.created_at)
            self.steps.append(step)

    def add_step(
        self,
        name: str,
        step_type: str,
        function: Callable,
        parameters: Dict[str, Any] = None,
        framework: str = "custom"
    ) -> 'MLPipeline':
        """Add a step to the pipeline and persist the updated metadata.

        Returns:
            self, so calls can be chained fluently.
        """

        step = PipelineStep(
            name=name,
            step_type=step_type,
            function=function,
            parameters=parameters or {},
            framework=framework
        )

        self.steps.append(step)
        # setdefault guards metadata documents written without a "steps" key.
        self.metadata.setdefault("steps", []).append(step.to_dict())
        self._save_metadata()

        print(f"✅ Added step '{name}' to pipeline '{self.name}'")
        return self

    def execute(
        self,
        data: Any,
        context: Dict[str, Any] = None,
        save_results: bool = True
    ) -> Any:
        """Execute the entire pipeline.

        Args:
            data: Input fed to the first step; each step's output feeds the
                next step.
            context: Optional keyword overrides passed to every step.
            save_results: When True, an execution record (success or failure)
                is written under the pipeline's executions/ directory.

        Returns:
            The final step's output.

        Raises:
            Exception: Re-raises the first step failure after recording it.
        """

        execution_id = f"{self.name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        execution_data = {
            "id": execution_id,
            "pipeline_name": self.name,
            "started_at": datetime.now().isoformat(),
            "steps_executed": [],
            "results": {},
            "status": "running"
        }

        active_step = None  # name of the step currently running, for failure reports
        try:
            current_data = data
            context = context or {}

            for i, step in enumerate(self.steps):
                active_step = step.name
                print(f"🔄 Executing step {i+1}/{len(self.steps)}: {step.name}")

                step_start = datetime.now()
                current_data = step.execute(current_data, context)
                step_end = datetime.now()

                execution_data["steps_executed"].append({
                    "step_name": step.name,
                    "step_type": step.step_type,
                    "started_at": step_start.isoformat(),
                    "completed_at": step_end.isoformat(),
                    "duration_seconds": (step_end - step_start).total_seconds()
                })

            execution_data["status"] = "completed"
            execution_data["completed_at"] = datetime.now().isoformat()
            execution_data["results"]["final_output"] = str(type(current_data))

            if save_results:
                self._save_execution(execution_data)

            print(f"✅ Pipeline '{self.name}' executed successfully")
            return current_data

        except Exception as e:
            execution_data["status"] = "failed"
            execution_data["error"] = str(e)
            # Record which step blew up so the saved record is actionable.
            execution_data["failed_step"] = active_step
            execution_data["failed_at"] = datetime.now().isoformat()

            if save_results:
                self._save_execution(execution_data)

            print(f"❌ Pipeline '{self.name}' failed: {e}")
            raise

    def get_step(self, name: str) -> Optional[PipelineStep]:
        """Get a specific step by name, or None when absent."""
        for step in self.steps:
            if step.name == name:
                return step
        return None

    def remove_step(self, name: str) -> bool:
        """Remove a step from the pipeline.

        Returns:
            True when the step was found and removed, False otherwise.
        """
        for i, step in enumerate(self.steps):
            if step.name == name:
                del self.steps[i]
                self.metadata["steps"] = [s for s in self.metadata.get("steps", []) if s["name"] != name]
                self._save_metadata()
                print(f"✅ Removed step '{name}' from pipeline '{self.name}'")
                return True

        print(f"❌ Step '{name}' not found in pipeline '{self.name}'")
        return False

    def get_executions(self) -> List[Dict[str, Any]]:
        """Get all pipeline executions, newest first."""
        executions_dir = self.pipeline_dir / "executions"
        if not executions_dir.exists():
            return []

        executions = []
        for execution_file in executions_dir.glob("*.json"):
            execution = read_json_file(str(execution_file))
            if execution:
                executions.append(execution)

        # Records missing "started_at" sort to the end instead of crashing.
        return sorted(executions, key=lambda x: x.get("started_at", ""), reverse=True)

    def _save_metadata(self):
        """Persist the pipeline metadata document."""
        metadata_file = self.pipeline_dir / "metadata.json"
        write_json_file(str(metadata_file), self.metadata)

    def _save_execution(self, execution_data: Dict[str, Any]):
        """Persist one execution record and index its id in the metadata."""
        executions_dir = self.pipeline_dir / "executions"
        ensure_directory(str(executions_dir))

        execution_file = executions_dir / f"{execution_data['id']}.json"
        write_json_file(str(execution_file), execution_data)

        # Update pipeline metadata (setdefault guards older metadata files
        # written without an "executions" list).
        self.metadata.setdefault("executions", []).append(execution_data["id"])
        self._save_metadata()
293+
294+
class PipelineManager:
    """Registry for the ML pipelines stored under .modelsync/pipelines.

    Each pipeline occupies its own sub-directory; this class is only
    concerned with creating, locating, listing, and deleting those
    directories.
    """

    def __init__(self, repo_path: str = "."):
        """Bind to a repository root and make sure the pipelines dir exists."""
        self.repo_path = Path(repo_path)
        self.pipelines_dir = self.repo_path / ".modelsync" / "pipelines"
        ensure_directory(str(self.pipelines_dir))

    def create_pipeline(self, name: str) -> MLPipeline:
        """Create a new ML pipeline; raises ValueError on a name collision."""
        if self.pipeline_exists(name):
            raise ValueError(f"Pipeline '{name}' already exists")

        new_pipeline = MLPipeline(name, str(self.repo_path))
        print(f"✅ Created pipeline: {name}")
        return new_pipeline

    def get_pipeline(self, name: str) -> Optional[MLPipeline]:
        """Return the named pipeline, or None when it does not exist."""
        if not self.pipeline_exists(name):
            return None
        return MLPipeline(name, str(self.repo_path))

    def pipeline_exists(self, name: str) -> bool:
        """True when a directory for this pipeline name is present."""
        return (self.pipelines_dir / name).exists()

    def list_pipelines(self) -> List[str]:
        """Return the names of all pipelines (sub-directories) found."""
        if not self.pipelines_dir.exists():
            return []

        return [entry.name for entry in self.pipelines_dir.iterdir() if entry.is_dir()]

    def delete_pipeline(self, name: str) -> bool:
        """Delete a pipeline's directory tree; returns True on success."""
        if not self.pipeline_exists(name):
            print(f"❌ Pipeline '{name}' not found")
            return False

        import shutil
        shutil.rmtree(self.pipelines_dir / name)
        print(f"✅ Deleted pipeline: {name}")
        return True

0 commit comments

Comments
 (0)