Skip to content

Commit 4a103bd

Browse files
committed
py: fix race condition in listen()

Fixes: #4898
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent 4d877a5 commit 4a103bd

File tree

4 files changed

+54
-27
lines changed

4 files changed

+54
-27
lines changed

python/feldera/_callback_runner.py

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
from threading import Thread
2-
from typing import Callable, Optional
1+
from threading import Thread, Event
2+
from typing import Callable, List, Optional, Mapping, Any
33

44
import pandas as pd
55
from feldera import FelderaClient
66
from feldera._helpers import dataframe_from_response
77
from feldera.enums import PipelineFieldSelector
8+
from feldera.rest.sql_table import SQLTable
9+
from feldera.rest.sql_view import SQLView
10+
from feldera.rest.pipeline import Pipeline
811

912

1013
class CallbackRunner(Thread):
@@ -15,6 +18,7 @@ def __init__(
1518
view_name: str,
1619
callback: Callable[[pd.DataFrame, int], None],
1720
exception_callback: Callable[[BaseException], None],
21+
event: Event,
1822
):
1923
super().__init__()
2024
self.daemon = True
@@ -23,7 +27,32 @@ def __init__(
2327
self.view_name: str = view_name
2428
self.callback: Callable[[pd.DataFrame, int], None] = callback
2529
self.exception_callback: Callable[[BaseException], None] = exception_callback
26-
self.schema: Optional[dict] = None
30+
self.event: Event = event
31+
32+
self.pipeline: Pipeline = self.client.get_pipeline(
33+
self.pipeline_name, PipelineFieldSelector.ALL
34+
)
35+
36+
view_schema = None
37+
38+
schemas: List[SQLTable | SQLView] = self.pipeline.tables + self.pipeline.views
39+
for schema in schemas:
40+
if schema.name == self.view_name:
41+
view_schema = schema
42+
break
43+
44+
if view_schema is None:
45+
raise ValueError(
46+
f"Table or View {self.view_name} not found in the pipeline schema."
47+
)
48+
49+
self.schema: SQLTable | SQLView = view_schema
50+
51+
def process_chunk(self, chunk: Mapping[str, Any]):
52+
data: Optional[list[Mapping[str, Any]]] = chunk.get("json_data")
53+
seq_no: Optional[int] = chunk.get("sequence_number")
54+
if data is not None and seq_no is not None:
55+
self.callback(dataframe_from_response([data], self.schema.fields), seq_no)
2756

2857
def run(self):
2958
"""
@@ -33,21 +62,6 @@ def run(self):
3362
"""
3463

3564
try:
36-
pipeline = self.client.get_pipeline(
37-
self.pipeline_name, PipelineFieldSelector.ALL
38-
)
39-
40-
schemas = pipeline.tables + pipeline.views
41-
for schema in schemas:
42-
if schema.name == self.view_name:
43-
self.schema = schema
44-
break
45-
46-
if self.schema is None:
47-
raise ValueError(
48-
f"Table or View {self.view_name} not found in the pipeline schema."
49-
)
50-
5165
gen_obj = self.client.listen_to_pipeline(
5266
self.pipeline_name,
5367
self.view_name,
@@ -57,13 +71,16 @@ def run(self):
5771

5872
iterator = gen_obj()
5973

74+
# Trigger the HTTP call
75+
chunk = next(iterator)
76+
77+
# Unblock the main thread
78+
self.event.set()
79+
80+
self.process_chunk(chunk)
81+
6082
for chunk in iterator:
61-
chunk: dict = chunk
62-
data: Optional[list[dict]] = chunk.get("json_data")
63-
seq_no: Optional[int] = chunk.get("sequence_number")
64-
if data is not None and seq_no is not None:
65-
self.callback(
66-
dataframe_from_response([data], self.schema.fields), seq_no
67-
)
83+
self.process_chunk(chunk)
84+
6885
except BaseException as e:
6986
self.exception_callback(e)

python/feldera/_helpers.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import pandas as pd
44
from decimal import Decimal
5+
from typing import Mapping, Any
56

67

78
def sql_type_to_pandas_type(sql_type: str):
@@ -60,7 +61,9 @@ def ensure_dataframe_has_columns(df: pd.DataFrame):
6061
)
6162

6263

63-
def dataframe_from_response(buffer: list[list[dict]], fields: list[dict]):
64+
def dataframe_from_response(
65+
buffer: list[list[Mapping[str, Any]]], fields: list[Mapping[str, Any]]
66+
):
6467
"""
6568
Converts the response from Feldera to a pandas DataFrame.
6669
"""

python/feldera/output_handler.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pandas as pd
22

33
from typing import Optional
4+
from threading import Event
45

56
from feldera import FelderaClient
67
from feldera._callback_runner import CallbackRunner
@@ -23,6 +24,7 @@ def __init__(
2324
self.view_name: str = view_name
2425
self.buffer: list[pd.DataFrame] = []
2526
self.exception: Optional[BaseException] = None
27+
self.event = Event()
2628

2729
# the callback that is passed to the `CallbackRunner`
2830
def callback(df: pd.DataFrame, _: int):
@@ -39,6 +41,7 @@ def exception_callback(exception: BaseException):
3941
self.view_name,
4042
callback,
4143
exception_callback,
44+
self.event,
4245
)
4346

4447
def start(self):
@@ -47,6 +50,7 @@ def start(self):
4750
"""
4851

4952
self.handler.start()
53+
_ = self.event.wait()
5054

5155
def to_pandas(self, clear_buffer: bool = True):
5256
"""

python/feldera/pipeline.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from uuid import UUID
88

99
from typing import List, Dict, Callable, Optional, Generator, Mapping, Any
10+
from threading import Event
1011
from collections import deque
1112

1213
from feldera.rest.errors import FelderaAPIError
@@ -294,10 +295,12 @@ def foreach_chunk(
294295
if self.status() not in [PipelineStatus.RUNNING, PipelineStatus.PAUSED]:
295296
raise RuntimeError("Pipeline must be running or paused to listen to output")
296297

298+
event = Event()
297299
handler = CallbackRunner(
298-
self.client, self.name, view_name, callback, lambda exception: None
300+
self.client, self.name, view_name, callback, lambda exception: None, event
299301
)
300302
handler.start()
303+
event.wait()
301304

302305
def wait_for_completion(
303306
self, force_stop: bool = False, timeout_s: float | None = None

0 commit comments

Comments
 (0)