@@ -71,11 +71,6 @@ def __init__(
7171 self .tables : Dict [str , SQLTable ] = {}
7272 self .types : Dict [str , str ] = {}
7373
74- # TODO: to be used for schema inference
75- self .todo_tables : Dict [str , Optional [SQLTable ]] = {}
76-
77- self .http_input_buffer : list [Dict [str , pd .DataFrame ]] = []
78-
7974 # buffer that stores all input connectors to be created
8075 # this is a Mapping[table_name -> list[Connector]]
8176 self .input_connectors_buffer : Dict [str , list [Connector ]] = {}
@@ -167,28 +162,6 @@ def __setup_output_listeners(self):
167162 # block until the callback runner is ready
168163 queue .join ()
169164
170- def __push_http_inputs (self ):
171- """
172- Internal function used to push the input data to the pipeline.
173-
174- :meta private:
175- """
176-
177- for input_buffer in self .http_input_buffer :
178- for tbl_name , data in input_buffer .items ():
179- for datum in chunk_dataframe (data ):
180- self .client .push_to_pipeline (
181- self .pipeline_name ,
182- tbl_name ,
183- "json" ,
184- datum .to_json (orient = 'records' , date_format = 'epoch' ),
185- json_flavor = 'pandas' ,
186- array = True ,
187- serialize = False
188- )
189-
190- self .http_input_buffer .clear ()
191-
192165 def create (self ) -> Self :
193166 """
194167 Sets the build mode to CREATE, meaning that the pipeline will be created from scratch.
@@ -253,8 +226,6 @@ def register_table(self, table_name: str, schema: Optional[SQLSchema] = None, dd
253226
254227 if schema :
255228 self .tables [table_name ] = SQLTable (table_name , schema = schema )
256- else :
257- self .todo_tables [table_name ] = None
258229
259230 def register_table_from_sql (self , ddl : str ):
260231 """
@@ -272,34 +243,47 @@ def register_table_from_sql(self, ddl: str):
272243
273244 self .tables [name ] = SQLTable (name , ddl )
274245
def input_pandas(self, table_name: str, df: pandas.DataFrame, force: bool = False):
    """
    Pushes a pandas DataFrame to the given table of the pipeline.

    The DataFrame is split into chunks and each chunk is sent to the
    pipeline's HTTP ingress endpoint as a JSON array of records.

    :param table_name: The name of the (already registered) table to push to.
    :param df: The pandas DataFrame to be pushed to the pipeline.
    :param force: `True` to push data even if the pipeline is paused.
        `False` by default.

    :raises RuntimeError: If the pipeline is neither running nor paused,
        or if it is paused and `force` is False.
    :raises ValueError: If `table_name` has not been registered yet.
    """

    status = self.pipeline_status()
    if status not in (PipelineStatus.RUNNING, PipelineStatus.PAUSED):
        raise RuntimeError("Pipeline must be running or paused to push data")

    if not force and status == PipelineStatus.PAUSED:
        raise RuntimeError("Pipeline is paused, set force=True to push data")

    ensure_dataframe_has_columns(df)

    # Guard clause: pushing to an unregistered table is an error.
    if self.tables.get(table_name) is None:
        raise ValueError(f"Cannot push to table '{table_name}' as it is not registered yet")

    # self.tables[table_name].validate_schema(df) TODO: something like this would be nice

    # Send chunk-wise: date_format='epoch' keeps timestamps numeric and
    # json_flavor='pandas' tells the server to expect pandas' JSON encoding.
    for datum in chunk_dataframe(df):
        self.client.push_to_pipeline(
            self.pipeline_name,
            table_name,
            "json",
            datum.to_json(orient='records', date_format='epoch'),
            json_flavor='pandas',
            array=True,
            serialize=False,
            force=force,
        )
303287 def register_local_view (self , name : str , query : str ):
304288 """
305289 Registers a local view with the SQLContext.
@@ -639,8 +623,6 @@ def start(self):
639623
640624 self .resume ()
641625
642- self .__push_http_inputs ()
643-
644626 def wait_for_idle (
645627 self ,
646628 idle_interval_s : float = 5.0 ,
@@ -740,7 +722,12 @@ def delete(self, delete_program: bool = True, delete_connectors: bool = False):
740722 :param delete_connectors: If True, also deletes the connectors associated with the pipeline. False by default.
741723 """
742724
743- if self .pipeline_status () != PipelineStatus .SHUTDOWN :
725+ current_status = self .pipeline_status ()
726+
727+ if current_status == PipelineStatus .NOT_FOUND :
728+ raise RuntimeError ("Attempting to delete a pipeline that hasn't been created yet" )
729+
730+ if current_status not in [PipelineStatus .SHUTDOWN , PipelineStatus .FAILED ]:
744731 raise RuntimeError ("Pipeline must be shutdown before deletion" )
745732
746733 self .client .delete_pipeline (self .pipeline_name )
@@ -754,4 +741,4 @@ def delete(self, delete_program: bool = True, delete_connectors: bool = False):
754741 self .client .delete_connector (conn .name )
755742 for connector in self .output_connectors_buffer .values ():
756743 for conn in connector :
757- self .client .delete_connector (conn .name )
744+ self .client .delete_connector (conn .name )
0 commit comments