forked from CodeGraphContext/CodeGraphContext
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli_helpers.py
More file actions
201 lines (159 loc) · 6.92 KB
/
Copy pathcli_helpers.py
File metadata and controls
201 lines (159 loc) · 6.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# src/codegraphcontext/cli/cli_helpers.py
import asyncio
import json
import urllib.parse
from pathlib import Path
import time
from rich.console import Console
from rich.table import Table
from ..core.database import DatabaseManager
from ..core.jobs import JobManager
from ..tools.code_finder import CodeFinder
from ..tools.graph_builder import GraphBuilder
from ..tools.package_resolver import get_local_package_path
# Shared Rich console that every helper in this module prints through.
console = Console()
def _initialize_services():
    """Build the service objects that the CLI helper commands work with.

    Returns:
        A ``(db_manager, graph_builder, code_finder)`` tuple. When obtaining
        the database driver raises ``ValueError`` the error is printed and
        ``(None, None, None)`` is returned instead.
    """
    console.print("[dim]Initializing services and database connection...[/dim]")
    manager = DatabaseManager()
    try:
        manager.get_driver()
    except ValueError as err:
        console.print(f"[bold red]Database Connection Error:[/bold red] {err}")
        console.print("Please ensure your Neo4j credentials are correct and the database is running.")
        return (None, None, None)

    # GraphBuilder is handed an event loop: reuse the running one when there
    # is one, otherwise create a fresh loop and install it as the current loop.
    try:
        event_loop = asyncio.get_running_loop()
    except RuntimeError:
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)

    builder = GraphBuilder(manager, JobManager(), event_loop)
    finder = CodeFinder(manager)
    console.print("[dim]Services initialized.[/dim]")
    return (manager, builder, finder)
def index_helper(path: str):
    """Synchronously index the repository at *path*.

    Prints progress and results to the console and returns ``None``. Nothing
    is indexed when service initialization fails, when the resolved path does
    not exist, or when a repository with the same resolved path is already
    listed as indexed.

    Args:
        path: Filesystem path of the repository; it is resolved to an
            absolute path before use.
    """
    # perf_counter is monotonic, so the reported duration cannot be skewed
    # by wall-clock adjustments while indexing runs.
    time_start = time.perf_counter()
    services = _initialize_services()
    if not all(services):
        return
    db_manager, graph_builder, code_finder = services

    # Everything after initialization sits under one try/finally so the
    # driver is closed on every exit path, including an exception raised
    # while resolving the path or listing the indexed repositories.
    try:
        path_obj = Path(path).resolve()
        if not path_obj.exists():
            console.print(f"[red]Error: Path does not exist: {path_obj}[/red]")
            return

        indexed_repos = code_finder.list_indexed_repositories()
        if any(Path(repo["path"]).resolve() == path_obj for repo in indexed_repos):
            console.print(f"[yellow]Repository '{path}' is already indexed. Skipping.[/yellow]")
            return

        console.print(f"Starting indexing for: {path_obj}")
        console.print("[yellow]This may take a few minutes for large repositories...[/yellow]")

        async def do_index():
            await graph_builder.build_graph_from_path_async(path_obj, is_dependency=False)

        try:
            asyncio.run(do_index())
            elapsed = time.perf_counter() - time_start
            console.print(f"[green]Successfully finished indexing: {path} in {elapsed:.2f} seconds[/green]")
        except Exception as e:
            # Top-level CLI boundary: report the failure instead of crashing.
            console.print(f"[bold red]An error occurred during indexing:[/bold red] {e}")
    finally:
        db_manager.close_driver()
def add_package_helper(package_name: str, language: str):
    """Synchronously index an installed package as a dependency.

    Prints progress and results to the console and returns ``None``. Nothing
    is indexed when service initialization fails, when no local path is found
    for the package, or when an indexed dependency with the same name is
    already listed.

    Args:
        package_name: Name of the package to locate and index.
        language: Language passed to ``get_local_package_path`` to locate
            the package.
    """
    services = _initialize_services()
    if not all(services):
        return
    db_manager, graph_builder, code_finder = services

    # A single try/finally guarantees the driver is closed on every exit
    # path, including an exception raised while locating the package or
    # listing the indexed repositories.
    try:
        package_path_str = get_local_package_path(package_name, language)
        if not package_path_str:
            console.print(f"[red]Error: Could not find package '{package_name}' for language '{language}'.[/red]")
            return
        package_path = Path(package_path_str)

        indexed_repos = code_finder.list_indexed_repositories()
        if any(repo.get("name") == package_name for repo in indexed_repos if repo.get("is_dependency")):
            console.print(f"[yellow]Package '{package_name}' is already indexed. Skipping.[/yellow]")
            return

        console.print(f"Starting indexing for package '{package_name}' at: {package_path}")
        console.print("[yellow]This may take a few minutes...[/yellow]")

        async def do_index():
            await graph_builder.build_graph_from_path_async(package_path, is_dependency=True)

        try:
            asyncio.run(do_index())
            console.print(f"[green]Successfully finished indexing package: {package_name}[/green]")
        except Exception as e:
            # Top-level CLI boundary: report the failure instead of crashing.
            console.print(f"[bold red]An error occurred during package indexing:[/bold red] {e}")
    finally:
        db_manager.close_driver()
def list_repos_helper():
    """Print a table of all indexed repositories (name, path, type).

    Prints a notice instead when nothing is indexed. Any exception is
    reported on the console, and the driver is closed in all cases.
    """
    services = _initialize_services()
    if not all(services):
        return
    db_manager, _, code_finder = services
    try:
        entries = code_finder.list_indexed_repositories()
        if entries:
            listing = Table(show_header=True, header_style="bold magenta")
            listing.add_column("Name", style="dim")
            for heading in ("Path", "Type"):
                listing.add_column(heading)
            for entry in entries:
                kind = "Project"
                if entry.get("is_dependency"):
                    kind = "Dependency"
                listing.add_row(entry["name"], entry["path"], kind)
            console.print(listing)
        else:
            console.print("[yellow]No repositories indexed yet.[/yellow]")
    except Exception as err:
        console.print(f"[bold red]An error occurred:[/bold red] {err}")
    finally:
        db_manager.close_driver()
def delete_helper(repo_path: str):
    """Remove the repository identified by *repo_path* from the graph.

    The outcome (success or error) is printed to the console; the driver is
    closed whether or not the deletion succeeds.
    """
    services = _initialize_services()
    if not all(services):
        return
    manager, builder = services[0], services[1]
    try:
        builder.delete_repository_from_graph(repo_path)
        console.print(f"[green]Successfully deleted repository: {repo_path}[/green]")
    except Exception as err:
        console.print(f"[bold red]An error occurred:[/bold red] {err}")
    finally:
        manager.close_driver()
def cypher_helper(query: str):
    """Run a Cypher query that passes a read-only keyword filter and print the rows as JSON.

    The filter is a best-effort, case-insensitive substring check: a query is
    rejected when it contains any of the forbidden keywords anywhere, so a
    read query whose identifiers happen to contain one (for example a
    property named ``offset``, which contains ``SET``) is rejected too.

    Args:
        query: The Cypher query text to execute.
    """
    services = _initialize_services()
    if not all(services):
        return
    db_manager, _, _ = services

    # Replicating safety checks from MCPServer.
    # The keywords are compared against an upper-cased query, so they must be
    # upper-case themselves: the previous 'CALL apoc' entry could never match.
    forbidden_keywords = ('CREATE', 'MERGE', 'DELETE', 'SET', 'REMOVE', 'DROP', 'CALL APOC')

    # One try/finally closes the driver on every exit path.
    try:
        # Collapse whitespace runs so "CALL\n   apoc" is caught as well.
        normalized_query = " ".join(query.upper().split())
        if any(keyword in normalized_query for keyword in forbidden_keywords):
            console.print("[bold red]Error: This command only supports read-only queries.[/bold red]")
            return

        try:
            with db_manager.get_driver().session() as session:
                result = session.run(query)
                records = [record.data() for record in result]
                console.print(json.dumps(records, indent=2))
        except Exception as e:
            # Top-level CLI boundary: report the failure instead of crashing.
            console.print(f"[bold red]An error occurred while executing query:[/bold red] {e}")
    finally:
        db_manager.close_driver()
def visualize_helper(query: str):
    """Print a ``http://localhost:7474/browser/`` URL carrying *query* as its edit argument.

    The query is percent-encoded before being appended. Any exception raised
    while building or printing the URL is reported on the console.
    """
    try:
        url = "http://localhost:7474/browser/?cmd=edit&arg=" + urllib.parse.quote(query)
        console.print("[green]Graph visualization URL:[/green]")
        console.print(url)
        console.print("Open the URL in your browser to see the graph.")
    except Exception as err:
        console.print(f"[bold red]An error occurred while generating URL:[/bold red] {err}")