Skip to content

Commit c19c0ca

Browse files
committed
feat: Add verbose mode with worker lifecycle monitoring and improved error handling
Adds --verbose flag to init and index commands that shows timestamped progress output instead of animated progress bars. Implements worker timeout protection (10s per file) and periodic worker recycling (every 500 files) to prevent WASM memory crashes from hanging the entire indexing process. Includes detailed logging of worker lifecycle events and memory usage for debugging large repository indexing issues.
1 parent 64d844c commit c19c0ca

3 files changed

Lines changed: 221 additions & 44 deletions

File tree

src/bin/codegraph.ts

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,40 @@ function formatDuration(ms: number): string {
176176
// Shimmer progress renderer (runs in a worker thread for smooth animation)
177177
// Imported at top of file from '../ui/shimmer-progress'
178178
179+
/**
180+
* Create a plain-text progress callback for --verbose mode.
181+
* No animations, no ANSI tricks — just timestamped lines to stdout.
182+
*/
183+
function createVerboseProgress(): (progress: { phase: string; current: number; total: number; currentFile?: string }) => void {
184+
let lastPhase = '';
185+
let lastPct = -1;
186+
const startTime = Date.now();
187+
188+
return (progress) => {
189+
const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
190+
191+
if (progress.phase !== lastPhase) {
192+
lastPhase = progress.phase;
193+
lastPct = -1;
194+
console.log(`[${elapsed}s] Phase: ${progress.phase}`);
195+
}
196+
197+
if (progress.total > 0) {
198+
const pct = Math.floor((progress.current / progress.total) * 100);
199+
// Log every 5% to keep output manageable
200+
if (pct >= lastPct + 5 || progress.current === progress.total) {
201+
lastPct = pct;
202+
console.log(`[${elapsed}s] ${progress.current}/${progress.total} (${pct}%)${progress.currentFile ? ` ${progress.currentFile}` : ''}`);
203+
}
204+
} else if (progress.current > 0) {
205+
// Scanning phase (no total yet) — log periodically
206+
if (progress.current % 1000 === 0 || progress.current === 1) {
207+
console.log(`[${elapsed}s] ${formatNumber(progress.current)} files found`);
208+
}
209+
}
210+
};
211+
}
212+
179213
/**
180214
* Print success message
181215
*/
@@ -330,7 +364,8 @@ program
330364
.command('init [path]')
331365
.description('Initialize CodeGraph in a project directory')
332366
.option('-i, --index', 'Run initial indexing after initialization')
333-
.action(async (pathArg: string | undefined, options: { index?: boolean }) => {
367+
.option('-v, --verbose', 'Show detailed worker lifecycle and memory info')
368+
.action(async (pathArg: string | undefined, options: { index?: boolean; verbose?: boolean }) => {
334369
const projectPath = path.resolve(pathArg || process.cwd());
335370
const clack = await importESM('@clack/prompts');
336371

@@ -349,14 +384,22 @@ program
349384
clack.log.success(`Initialized in ${projectPath}`);
350385

351386
if (options.index) {
352-
process.stdout.write(`${colors.dim}${colors.reset}\n`);
353-
const progress = createShimmerProgress();
387+
let result: IndexResult;
354388

355-
const result = await cg.indexAll({
356-
onProgress: progress.onProgress,
357-
});
389+
if (options.verbose) {
390+
result = await cg.indexAll({
391+
onProgress: createVerboseProgress(),
392+
verbose: true,
393+
});
394+
} else {
395+
process.stdout.write(`${colors.dim}${colors.reset}\n`);
396+
const progress = createShimmerProgress();
397+
result = await cg.indexAll({
398+
onProgress: progress.onProgress,
399+
});
400+
await progress.stop();
401+
}
358402

359-
await progress.stop();
360403
printIndexResult(clack, result, projectPath);
361404
} else {
362405
clack.log.info('Run "codegraph index" to index the project');
@@ -423,7 +466,8 @@ program
423466
.description('Index all files in the project')
424467
.option('-f, --force', 'Force full re-index even if already indexed')
425468
.option('-q, --quiet', 'Suppress progress output')
426-
.action(async (pathArg: string | undefined, options: { force?: boolean; quiet?: boolean }) => {
469+
.option('-v, --verbose', 'Show detailed worker lifecycle and memory info')
470+
.action(async (pathArg: string | undefined, options: { force?: boolean; quiet?: boolean; verbose?: boolean }) => {
427471
const projectPath = resolveProjectPath(pathArg);
428472

429473
try {
@@ -453,14 +497,22 @@ program
453497
clack.log.info('Cleared existing index');
454498
}
455499

456-
process.stdout.write(`${colors.dim}${colors.reset}\n`);
457-
const progress = createShimmerProgress();
500+
let result: IndexResult;
458501

459-
const result = await cg.indexAll({
460-
onProgress: progress.onProgress,
461-
});
502+
if (options.verbose) {
503+
result = await cg.indexAll({
504+
onProgress: createVerboseProgress(),
505+
verbose: true,
506+
});
507+
} else {
508+
process.stdout.write(`${colors.dim}${colors.reset}\n`);
509+
const progress = createShimmerProgress();
510+
result = await cg.indexAll({
511+
onProgress: progress.onProgress,
512+
});
513+
await progress.stop();
514+
}
462515

463-
await progress.stop();
464516
printIndexResult(clack, result, projectPath);
465517

466518
if (!result.success) {

src/extraction/index.ts

Lines changed: 151 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,22 @@ const FILE_IO_BATCH_SIZE = 10;
3131

3232
// PARSER_RESET_INTERVAL moved to parse-worker.ts (runs in worker thread)
3333

34+
/**
35+
* Maximum time (ms) to wait for a single file to parse in the worker thread.
36+
* If tree-sitter hangs or WASM runs out of memory, this prevents the entire
37+
* indexing run from freezing. The worker is restarted after a timeout.
38+
*/
39+
const PARSE_TIMEOUT_MS = 10_000;
40+
41+
/**
42+
* Number of files to parse before recycling the worker thread.
43+
* WASM linear memory can grow but NEVER shrink (WebAssembly spec limitation).
44+
* The only way to reclaim tree-sitter's WASM heap is to destroy the entire
45+
* V8 isolate by terminating the worker thread and spawning a fresh one.
46+
* This interval balances memory usage against the cost of reloading grammars.
47+
*/
48+
const WORKER_RECYCLE_INTERVAL = 500;
49+
3450
/**
3551
* Progress callback for indexing operations
3652
*/
@@ -395,7 +411,8 @@ export class ExtractionOrchestrator {
395411
*/
396412
async indexAll(
397413
onProgress?: (progress: IndexProgress) => void,
398-
signal?: AbortSignal
414+
signal?: AbortSignal,
415+
verbose?: boolean
399416
): Promise<IndexResult> {
400417
await initGrammars();
401418
const startTime = Date.now();
@@ -406,6 +423,10 @@ export class ExtractionOrchestrator {
406423
let totalNodes = 0;
407424
let totalEdges = 0;
408425

426+
const log = verbose
427+
? (msg: string) => { console.log(`[worker] ${msg}`); }
428+
: (_msg: string) => {};
429+
409430
// Phase 1: Scan for files
410431
onProgress?.({
411432
phase: 'scanning',
@@ -446,58 +467,139 @@ export class ExtractionOrchestrator {
446467
// Falls back to in-process parsing if the compiled worker is unavailable (e.g. tests).
447468
const parseWorkerPath = path.join(__dirname, 'parse-worker.js');
448469
const useWorker = fs.existsSync(parseWorkerPath);
449-
let parseWorker: import('worker_threads').Worker | null = null;
470+
let WorkerClass: typeof import('worker_threads').Worker | null = null;
450471

451472
if (useWorker) {
452473
const { Worker } = await import('worker_threads');
453-
parseWorker = new Worker(parseWorkerPath);
474+
WorkerClass = Worker;
454475
} else {
455476
// In-process fallback: load grammars locally
456477
await loadGrammarsForLanguages(neededLanguages);
457478
}
458479

459-
// Set up worker-based or in-process parsing
480+
// --- Worker lifecycle management ---
481+
// The worker can crash (OOM in WASM) or hang on pathological files.
482+
// We track pending parse promises and handle both cases:
483+
// - Timeout: terminate + restart the worker, reject the timed-out request
484+
// - Crash: reject all pending promises, restart for remaining files
485+
let parseWorker: import('worker_threads').Worker | null = null;
460486
let nextId = 0;
487+
let workerParseCount = 0;
461488
const pendingParses = new Map<number, {
462489
resolve: (result: ExtractionResult) => void;
490+
reject: (err: Error) => void;
491+
timer: ReturnType<typeof setTimeout>;
463492
}>();
464493

465-
if (parseWorker) {
466-
// Wait for grammars to load in the worker
467-
await new Promise<void>((resolve, reject) => {
468-
parseWorker!.once('message', (msg: { type: string }) => {
469-
if (msg.type === 'grammars-loaded') resolve();
470-
else reject(new Error(`Unexpected message: ${msg.type}`));
471-
});
472-
parseWorker!.postMessage({ type: 'load-grammars', languages: neededLanguages });
473-
});
494+
function rejectAllPending(reason: string): void {
495+
for (const [id, pending] of pendingParses) {
496+
clearTimeout(pending.timer);
497+
pendingParses.delete(id);
498+
pending.reject(new Error(reason));
499+
}
500+
}
474501

475-
parseWorker.on('message', (msg: { type: string; id?: number; result?: ExtractionResult }) => {
502+
function attachWorkerHandlers(w: import('worker_threads').Worker): void {
503+
w.on('message', (msg: { type: string; id?: number; result?: ExtractionResult }) => {
476504
if (msg.type === 'parse-result' && msg.id !== undefined) {
477505
const pending = pendingParses.get(msg.id);
478506
if (pending) {
507+
clearTimeout(pending.timer);
479508
pendingParses.delete(msg.id);
480509
pending.resolve(msg.result!);
481510
}
482511
}
483512
});
513+
514+
w.on('error', (err) => {
515+
logWarn('Parse worker error', { error: err.message });
516+
rejectAllPending(`Worker error: ${err.message}`);
517+
});
518+
519+
w.on('exit', (code) => {
520+
if (code !== 0 && pendingParses.size > 0) {
521+
logWarn('Parse worker exited unexpectedly', { code });
522+
rejectAllPending(`Worker exited with code ${code}`);
523+
}
524+
// Clear reference so we know to respawn
525+
if (parseWorker === w) parseWorker = null;
526+
});
484527
}
485528

486-
function requestParse(filePath: string, content: string): Promise<ExtractionResult> {
487-
if (parseWorker) {
488-
return new Promise<ExtractionResult>((resolve) => {
489-
const id = nextId++;
490-
pendingParses.set(id, { resolve });
491-
parseWorker!.postMessage({ type: 'parse', id, filePath, content });
529+
async function ensureWorker(): Promise<import('worker_threads').Worker> {
530+
if (parseWorker) return parseWorker;
531+
log('Spawning new parse worker...');
532+
parseWorker = new WorkerClass!(parseWorkerPath);
533+
attachWorkerHandlers(parseWorker);
534+
535+
// Load grammars in the new worker
536+
await new Promise<void>((resolve, reject) => {
537+
parseWorker!.once('message', (msg: { type: string }) => {
538+
if (msg.type === 'grammars-loaded') resolve();
539+
else reject(new Error(`Unexpected message: ${msg.type}`));
492540
});
541+
parseWorker!.postMessage({ type: 'load-grammars', languages: neededLanguages });
542+
});
543+
544+
return parseWorker;
545+
}
546+
547+
if (WorkerClass) {
548+
await ensureWorker();
549+
}
550+
551+
/**
552+
* Recycle the worker thread to reclaim WASM memory.
553+
* Terminates the current worker and clears the reference so
554+
* ensureWorker() will spawn a fresh one on the next call.
555+
*/
556+
function recycleWorker(): void {
557+
if (!parseWorker) return;
558+
log(`Recycling worker after ${workerParseCount} parses (heap: ${Math.round(process.memoryUsage().rss / 1024 / 1024)}MB RSS)`);
559+
const w = parseWorker;
560+
parseWorker = null;
561+
workerParseCount = 0;
562+
// Fire-and-forget: worker.terminate() can hang if WASM is stuck
563+
w.terminate().catch(() => {});
564+
}
565+
566+
async function requestParse(filePath: string, content: string): Promise<ExtractionResult> {
567+
if (!WorkerClass) {
568+
// In-process fallback
569+
return extractFromSource(filePath, content, detectLanguage(filePath));
493570
}
494-
// In-process fallback
495-
return Promise.resolve(extractFromSource(filePath, content, detectLanguage(filePath)));
571+
572+
// Recycle the worker before the next parse if we've hit the threshold.
573+
// This destroys the WASM linear memory (which can grow but never shrink)
574+
// and starts a fresh worker with a clean heap.
575+
if (workerParseCount >= WORKER_RECYCLE_INTERVAL) {
576+
await recycleWorker();
577+
}
578+
579+
const worker = await ensureWorker();
580+
const id = nextId++;
581+
workerParseCount++;
582+
583+
return new Promise<ExtractionResult>((resolve, reject) => {
584+
const timer = setTimeout(() => {
585+
pendingParses.delete(id);
586+
log(`TIMEOUT: ${filePath} exceeded ${PARSE_TIMEOUT_MS}ms — killing worker`);
587+
// Reject FIRST — worker.terminate() can hang if WASM is stuck
588+
parseWorker = null;
589+
workerParseCount = 0;
590+
reject(new Error(`Parse timed out after ${PARSE_TIMEOUT_MS}ms`));
591+
// Fire-and-forget: kill the stuck worker in the background
592+
worker.terminate().catch(() => {});
593+
}, PARSE_TIMEOUT_MS);
594+
595+
pendingParses.set(id, { resolve, reject, timer });
596+
worker.postMessage({ type: 'parse', id, filePath, content });
597+
});
496598
}
497599

498600
for (let i = 0; i < files.length; i += FILE_IO_BATCH_SIZE) {
499601
if (signal?.aborted) {
500-
if (parseWorker) await parseWorker.terminate();
602+
if (parseWorker) (parseWorker as import('worker_threads').Worker).terminate().catch(() => {});
501603
return {
502604
success: false,
503605
filesIndexed,
@@ -533,7 +635,7 @@ export class ExtractionOrchestrator {
533635
// Send to worker for parsing, store results on main thread
534636
for (const { filePath, content, stats, error } of fileContents) {
535637
if (signal?.aborted) {
536-
if (parseWorker) await parseWorker.terminate();
638+
if (parseWorker) (parseWorker as import('worker_threads').Worker).terminate().catch(() => {});
537639
return {
538640
success: false,
539641
filesIndexed,
@@ -546,7 +648,7 @@ export class ExtractionOrchestrator {
546648
};
547649
}
548650

549-
processed++;
651+
// Report progress before parsing (show current file being worked on)
550652
onProgress?.({
551653
phase: 'parsing',
552654
current: processed,
@@ -555,6 +657,7 @@ export class ExtractionOrchestrator {
555657
});
556658

557659
if (error || content === null || stats === null) {
660+
processed++;
558661
filesErrored++;
559662
errors.push({
560663
message: `Failed to read file: ${error instanceof Error ? error.message : String(error)}`,
@@ -565,8 +668,24 @@ export class ExtractionOrchestrator {
565668
continue;
566669
}
567670

568-
// Parse in worker thread (main thread stays unblocked)
569-
const result = await requestParse(filePath, content);
671+
// Parse in worker thread (main thread stays unblocked).
672+
// Wrapped in try/catch to handle worker timeouts and crashes gracefully.
673+
let result: ExtractionResult;
674+
try {
675+
result = await requestParse(filePath, content);
676+
} catch (parseErr) {
677+
processed++;
678+
filesErrored++;
679+
errors.push({
680+
message: parseErr instanceof Error ? parseErr.message : String(parseErr),
681+
filePath,
682+
severity: 'error',
683+
code: 'parse_error',
684+
});
685+
continue;
686+
}
687+
688+
processed++;
570689

571690
// Store in database on main thread (SQLite is not thread-safe)
572691
if (result.nodes.length > 0 || result.errors.length === 0) {
@@ -593,8 +712,11 @@ export class ExtractionOrchestrator {
593712
}
594713
}
595714

596-
// Shut down parse worker
597-
if (parseWorker) await parseWorker.terminate();
715+
// Shut down parse worker and clear any pending timers
716+
rejectAllPending('Indexing complete');
717+
if (parseWorker) {
718+
(parseWorker as import('worker_threads').Worker).terminate().catch(() => {});
719+
}
598720

599721
// Phase 3: Resolve references
600722
onProgress?.({

0 commit comments

Comments
 (0)