1- from threading import Thread
2- from typing import Callable , Optional
1+ from threading import Thread , Event
2+ from typing import Callable , List , Optional , Mapping , Any
33
44import pandas as pd
55from feldera import FelderaClient
66from feldera ._helpers import dataframe_from_response
77from feldera .enums import PipelineFieldSelector
8+ from feldera .rest .sql_table import SQLTable
9+ from feldera .rest .sql_view import SQLView
10+ from feldera .rest .pipeline import Pipeline
811
912
1013class CallbackRunner (Thread ):
@@ -15,6 +18,7 @@ def __init__(
1518 view_name : str ,
1619 callback : Callable [[pd .DataFrame , int ], None ],
1720 exception_callback : Callable [[BaseException ], None ],
21+ event : Event ,
1822 ):
1923 super ().__init__ ()
2024 self .daemon = True
@@ -23,7 +27,32 @@ def __init__(
2327 self .view_name : str = view_name
2428 self .callback : Callable [[pd .DataFrame , int ], None ] = callback
2529 self .exception_callback : Callable [[BaseException ], None ] = exception_callback
26- self .schema : Optional [dict ] = None
30+ self .event : Event = event
31+
32+ self .pipeline : Pipeline = self .client .get_pipeline (
33+ self .pipeline_name , PipelineFieldSelector .ALL
34+ )
35+
36+ view_schema = None
37+
38+ schemas : List [SQLTable | SQLView ] = self .pipeline .tables + self .pipeline .views
39+ for schema in schemas :
40+ if schema .name == self .view_name :
41+ view_schema = schema
42+ break
43+
44+ if view_schema is None :
45+ raise ValueError (
46+ f"Table or View { self .view_name } not found in the pipeline schema."
47+ )
48+
49+ self .schema : SQLTable | SQLView = view_schema
50+
51+ def process_chunk (self , chunk : Mapping [str , Any ]):
52+ data : Optional [list [Mapping [str , Any ]]] = chunk .get ("json_data" )
53+ seq_no : Optional [int ] = chunk .get ("sequence_number" )
54+ if data is not None and seq_no is not None :
55+ self .callback (dataframe_from_response ([data ], self .schema .fields ), seq_no )
2756
2857 def run (self ):
2958 """
@@ -33,21 +62,6 @@ def run(self):
3362 """
3463
3564 try :
36- pipeline = self .client .get_pipeline (
37- self .pipeline_name , PipelineFieldSelector .ALL
38- )
39-
40- schemas = pipeline .tables + pipeline .views
41- for schema in schemas :
42- if schema .name == self .view_name :
43- self .schema = schema
44- break
45-
46- if self .schema is None :
47- raise ValueError (
48- f"Table or View { self .view_name } not found in the pipeline schema."
49- )
50-
5165 gen_obj = self .client .listen_to_pipeline (
5266 self .pipeline_name ,
5367 self .view_name ,
@@ -57,13 +71,16 @@ def run(self):
5771
5872 iterator = gen_obj ()
5973
74+ # Trigger the HTTP call
75+ chunk = next (iterator )
76+
77+ # Unblock the main thread
78+ self .event .set ()
79+
80+ self .process_chunk (chunk )
81+
6082 for chunk in iterator :
61- chunk : dict = chunk
62- data : Optional [list [dict ]] = chunk .get ("json_data" )
63- seq_no : Optional [int ] = chunk .get ("sequence_number" )
64- if data is not None and seq_no is not None :
65- self .callback (
66- dataframe_from_response ([data ], self .schema .fields ), seq_no
67- )
83+ self .process_chunk (chunk )
84+
6885 except BaseException as e :
6986 self .exception_callback (e )
0 commit comments