forked from Zipstack/unstract
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplugin_manager.py
More file actions
274 lines (221 loc) · 9.37 KB
/
plugin_manager.py
File metadata and controls
274 lines (221 loc) · 9.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
"""Plugin Manager for Workers
This module provides programmatic tools for managing worker plugins.
CLI functionality has been removed as plugins are managed programmatically.
"""
import json
import logging
import sys
from pathlib import Path
# Add the workers directory (the parent of this file's directory) to the
# front of the import path so the ``plugins`` package below resolves
# regardless of the current working directory.
sys.path.insert(0, str(Path(__file__).parent.parent))

# Setup logger
logger = logging.getLogger(__name__)

try:
    from plugins import (
        get_plugin,
        get_plugin_requirements,
        list_available_plugins,
        validate_plugin_structure,
    )
except ImportError as e:
    # Nothing in this module works without the plugin system, so report the
    # failure and terminate the process instead of continuing half-loaded.
    logger.error(f"Error importing plugin system: {e}")
    logger.error("Make sure you're running this from the workers directory")
    sys.exit(1)
class PluginManager:
    """Plugin management operations.

    The class holds no state; every method reports its findings through the
    module-level ``logger`` and relies on the helpers imported from the
    ``plugins`` package (``list_available_plugins``, ``get_plugin``,
    ``get_plugin_requirements``, ``validate_plugin_structure``).
    """

    # Structure checks whose failure makes validation fail. Any other failed
    # check is still logged but does not affect the result.
    _CRITICAL_CHECKS = ("exists", "has_init")

    # (flag key in a plugin listing entry, label shown in the listing).
    _COMPONENT_FLAGS = (
        ("has_client", "client"),
        ("has_tasks", "tasks"),
        ("has_dto", "dto"),
        ("has_backend_integration", "backend-integration"),
        ("has_readme", "readme"),
    )

    def list_plugins(self) -> None:
        """List all available plugins.

        Logs one entry per item returned by ``list_available_plugins()``.
        Each item is read as a mapping with ``name`` and ``path`` keys plus
        optional boolean ``has_*`` flags describing which components exist.
        """
        plugins = list_available_plugins()
        if not plugins:
            logger.info("No plugins found in the plugins directory.")
            return

        logger.info(f"Found {len(plugins)} plugin(s):\n")
        for plugin in plugins:
            logger.info(f"📦 {plugin['name']}")
            logger.info(f" Path: {plugin['path']}")

            # Show available components
            components = [
                label
                for flag, label in self._COMPONENT_FLAGS
                if plugin.get(flag, False)
            ]
            if components:
                logger.info(f" Components: {', '.join(components)}")
            else:
                logger.info(" Components: none")
            logger.info("")

    def show_plugin_info(self, plugin_name: str) -> None:
        """Show detailed information about a plugin.

        Logs, in order: the plugin's metadata, the structure validation
        results, and the outcome of a load attempt.

        Args:
            plugin_name: Name passed through to the ``plugins`` helpers.
        """
        logger.info(f"Plugin Information: {plugin_name}")
        logger.info("=" * 50)
        self._show_plugin_metadata(plugin_name)
        self._show_plugin_structure_validation(plugin_name)
        self._show_plugin_loading_test(plugin_name)

    def _show_plugin_metadata(self, plugin_name: str) -> None:
        """Display plugin metadata information.

        Logs nothing when ``get_plugin_requirements`` returns a falsy value.
        List and dict values are pretty-printed as JSON.
        """
        requirements = get_plugin_requirements(plugin_name)
        if not requirements:
            return

        logger.info("Metadata:")
        for key, value in requirements.items():
            if isinstance(value, (list, dict)):
                # default=str: this output is display-only, so a value that
                # JSON cannot encode is shown via str() instead of raising
                # TypeError and aborting the whole report.
                rendered = json.dumps(value, indent=4, default=str)
                logger.info(f" {key}: {rendered}")
            else:
                logger.info(f" {key}: {value}")
        logger.info("")

    def _log_structure_checks(self, plugin_name: str) -> list:
        """Log every structure check and return the names of failed checks.

        ``validate_plugin_structure`` is read as a mapping of check name to a
        truthy (passed) or falsy (failed) result.
        """
        validation = validate_plugin_structure(plugin_name)
        failed = []
        logger.info("Structure Validation:")
        for check, result in validation.items():
            status = "✅" if result else "❌"
            logger.info(f" {status} {check}")
            if not result:
                failed.append(check)
        return failed

    def _show_plugin_structure_validation(self, plugin_name: str) -> None:
        """Display plugin structure validation results."""
        self._log_structure_checks(plugin_name)
        logger.info("")

    def _show_plugin_loading_test(self, plugin_name: str) -> None:
        """Test and display plugin loading results.

        Any exception raised by ``get_plugin`` is logged, not propagated.
        """
        logger.info("Plugin Loading Test:")
        try:
            plugin = get_plugin(plugin_name)
            if plugin:
                logger.info(" ✅ Plugin loaded successfully")
                self._show_plugin_type_info(plugin)
            else:
                logger.error(" ❌ Plugin failed to load")
        except Exception as e:
            logger.error(f" ❌ Plugin loading error: {e}")
        logger.info("")

    def _show_plugin_type_info(self, plugin: object) -> None:
        """Display plugin type information.

        Uses the object's own ``__name__`` when it has one (as classes,
        functions and modules do); otherwise falls back to the name of its
        class.
        """
        if hasattr(plugin, "__name__"):
            type_name = plugin.__name__
        else:
            type_name = plugin.__class__.__name__
        logger.info(f" 📋 Plugin type: {type_name}")

    def validate_plugin(self, plugin_name: str) -> bool:
        """Validate a plugin thoroughly.

        Runs the structure validation first and only attempts the import
        test when no critical structure check failed.

        Args:
            plugin_name: Name passed through to the ``plugins`` helpers.

        Returns:
            True when the structure has no critical failure and the plugin
            imports; False otherwise.
        """
        logger.info(f"Validating Plugin: {plugin_name}")
        logger.info("=" * 50)

        # Structure validation
        if not self._validate_plugin_structure(plugin_name):
            return False

        # Import test
        return self._validate_plugin_import(plugin_name)

    def _validate_plugin_structure(self, plugin_name: str) -> bool:
        """Validate plugin structure and return True if valid.

        Every check is logged; only a failed check listed in
        ``_CRITICAL_CHECKS`` makes the result False.
        """
        failed = self._log_structure_checks(plugin_name)
        if any(check in self._CRITICAL_CHECKS for check in failed):
            logger.error("\n❌ Plugin has critical structure issues")
            return False
        logger.info("")
        return True

    def _validate_plugin_import(self, plugin_name: str) -> bool:
        """Validate plugin import and methods.

        Returns:
            True when ``get_plugin`` returns a truthy object; False when it
            returns a falsy value or anything in the check raises. Errors
            from the optional plugin methods are logged as warnings by
            ``_test_plugin_methods`` and do not change the result.
        """
        logger.info("Import Test:")
        try:
            plugin = get_plugin(plugin_name)
            if not plugin:
                logger.error(" ❌ Plugin failed to import")
                return False

            logger.info(" ✅ Plugin imports successfully")
            self._test_plugin_methods(plugin)
            logger.info("\n✅ Plugin validation completed successfully")
            return True
        except Exception as e:
            logger.error(f" ❌ Import error: {e}")
            return False

    def _test_plugin_methods(self, plugin: object) -> None:
        """Test plugin methods if available.

        Calls ``get_metadata()`` and ``validate_requirements()`` when the
        plugin defines them. A raised exception is logged as a warning and
        never propagated.
        """
        probes = (
            (
                "get_metadata",
                " ✅ Plugin metadata accessible",
                " ⚠️ Plugin metadata error",
            ),
            (
                "validate_requirements",
                " ✅ Plugin requirements validation available",
                " ⚠️ Plugin requirements validation error",
            ),
        )
        for method_name, success_message, error_prefix in probes:
            if not hasattr(plugin, method_name):
                continue
            try:
                getattr(plugin, method_name)()
                logger.info(success_message)
            except Exception as e:
                logger.warning(f"{error_prefix}: {e}")

    def test_plugin(self, plugin_name: str, timeout: float = 60) -> None:
        """Run tests for a plugin.

        Executes ``<this file's directory>/<plugin_name>/test_plugin.py`` in
        a subprocess using the current interpreter, with the parent of this
        file's directory as both working directory and first ``PYTHONPATH``
        entry. The outcome and captured output are logged.

        Args:
            plugin_name: Directory name of the plugin, relative to the
                directory containing this file.
            timeout: Seconds to wait for the test process before giving up.
        """
        logger.info(f"Testing Plugin: {plugin_name}")
        logger.info("=" * 50)

        module_dir = Path(__file__).parent
        test_file = module_dir / plugin_name / "test_plugin.py"
        if not test_file.exists():
            logger.error("❌ No test file found (test_plugin.py)")
            return

        logger.info("🧪 Running plugin tests...")

        # Try to run the test file
        import os
        import subprocess

        workers_dir = str(module_dir.parent)
        try:
            # Set up environment. Prepend rather than replace PYTHONPATH so
            # the child keeps any import paths the parent process relies on.
            env = os.environ.copy()
            inherited_path = env.get("PYTHONPATH")
            if inherited_path:
                env["PYTHONPATH"] = os.pathsep.join([workers_dir, inherited_path])
            else:
                env["PYTHONPATH"] = workers_dir

            # Run the test
            result = subprocess.run(
                [sys.executable, str(test_file)],
                cwd=workers_dir,
                env=env,
                capture_output=True,
                text=True,
                timeout=timeout,
            )

            if result.returncode == 0:
                logger.info("✅ Tests passed!")
            else:
                logger.error("❌ Tests failed!")
            logger.info("\nTest Output:")
            logger.info(result.stdout)
            if result.stderr:
                logger.error("\nError Output:")
                logger.error(result.stderr)
        except subprocess.TimeoutExpired:
            logger.warning(f"⏰ Tests timed out after {timeout} seconds")
        except Exception as e:
            logger.error(f"❌ Error running tests: {e}")

    def install_plugin_deps(self, plugin_name: str) -> None:
        """Install dependencies for a plugin.

        Reads the ``dependencies`` entry of ``get_plugin_requirements`` and
        installs it with ``pip`` run under the current interpreter. Success
        or failure is logged; nothing is returned.

        Args:
            plugin_name: Name passed through to ``get_plugin_requirements``.
        """
        logger.info(f"Installing Dependencies for Plugin: {plugin_name}")
        logger.info("=" * 50)

        requirements = get_plugin_requirements(plugin_name)
        if not requirements:
            logger.error("❌ No plugin requirements found")
            return

        dependencies = requirements.get("dependencies", [])
        if not dependencies:
            logger.info("ℹ️ No dependencies specified for this plugin")
            return

        # Normalise to a list: a bare string would otherwise be iterated
        # character by character, and a tuple cannot be concatenated to the
        # command list.
        if isinstance(dependencies, str):
            dependencies = [dependencies]
        else:
            dependencies = list(dependencies)

        logger.info("Dependencies to install:")
        for dep in dependencies:
            logger.info(f" - {dep}")

        logger.info("\n🚀 Installing dependencies...")
        import subprocess

        try:
            # Install using pip
            cmd = [sys.executable, "-m", "pip", "install", *dependencies]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                logger.info("✅ Dependencies installed successfully!")
            else:
                logger.error("❌ Failed to install dependencies")
                logger.error("Error output:")
                logger.error(result.stderr)
        except Exception as e:
            logger.error(f"❌ Error installing dependencies: {e}")