@@ -65,6 +65,10 @@ def __init__(self, batch_size: int = PARSE_CHUNK, max_file_bytes: int = 3 * 1024
6565 self ._resolve : BackendResolver | None = None
6666 self ._thread : threading .Thread | None = None
6767 self ._stop = threading .Event ()
68+ # tid -> (rel_path, size, t_start) for each parse worker currently busy.
69+ # Used by stop()'s diagnostic dump and the "slow parse" log.
70+ self ._busy : dict [int , tuple [str , int , float ]] = {}
71+ self ._busy_lock = threading .Lock ()
6872 # Counters
6973 self ._n_enqueued = 0
7074 self ._n_deduped = 0
@@ -91,8 +95,36 @@ def stop(self, timeout: float = 10.0) -> None:
9195 self ._stop .set ()
9296 with self ._cond :
9397 self ._cond .notify_all ()
94- if self ._thread :
95- self ._thread .join (timeout = timeout )
98+ if not self ._thread :
99+ return
100+
101+ # Poll-join with a busy-file dump every second if the worker is still alive.
102+ # Diagnostics for "queue won't drain" — shows which files parse workers
103+ # are stuck on (typically large/minified JS or deeply nested ASTs).
104+ deadline = time .monotonic () + timeout
105+ while self ._thread .is_alive () and time .monotonic () < deadline :
106+ self ._thread .join (timeout = 1.0 )
107+ if not self ._thread .is_alive ():
108+ break
109+ now = time .perf_counter ()
110+ with self ._busy_lock :
111+ snapshot = list (self ._busy .items ())
112+ if snapshot :
113+ files = ", " .join (
114+ f"{ rel } ({ size :,} B, { now - t0 :.1f} s)"
115+ for _tid , (rel , size , t0 ) in snapshot
116+ )
117+ print (
118+ f"[index-queue] worker still alive (items_left={ len (self ._items )} ); "
119+ f"parsing: { files } " ,
120+ flush = True ,
121+ )
122+ else :
123+ print (
124+ f"[index-queue] worker still alive (items_left={ len (self ._items )} ); "
125+ f"no parse workers busy" ,
126+ flush = True ,
127+ )
96128
97129 # ── public interface ──────────────────────────────────────────────────────
98130
@@ -245,16 +277,30 @@ def _parse_one(item):
245277 if action == "delete" :
246278 return ("delete" , collection , _file_id (rel ))
247279 try :
248- if os .path .getsize (full_path ) > max_bytes :
280+ size = os .path .getsize (full_path )
281+ if size > max_bytes :
249282 return None
250283 except OSError :
251284 return None
285+ tid = threading .get_ident ()
286+ t0 = time .perf_counter ()
287+ with self ._busy_lock :
288+ self ._busy [tid ] = (rel , size , t0 )
252289 try :
253290 doc = build_document (full_path , rel )
291+ t = time .perf_counter () - t0
292+ if t > 1.0 :
293+ print (
294+ f"[index-queue] SLOW parse: { rel } took { t :.2f} s ({ size :,} bytes)" ,
295+ flush = True ,
296+ )
254297 if doc :
255298 return ("upsert" , collection , doc )
256299 except OSError :
257300 pass
301+ finally :
302+ with self ._busy_lock :
303+ self ._busy .pop (tid , None )
258304 return None
259305
260306 t_parse_start = time .perf_counter ()
0 commit comments