
Commit 9823f23

abhizer authored and ryzhyk committed
py: refactor(connect_input_pandas) -> input_pandas
* `input_pandas` must now be called after starting a pipeline

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent 3dd6e11 commit 9823f23

9 files changed: 85 additions & 84 deletions
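The behavioral core of the commit is an ordering change in the client API: DataFrames are no longer buffered locally and flushed on `start()`, they are pushed to a live pipeline. A minimal before/after sketch; the table name, DataFrame contents, server URL, and `SQLContext` constructor arguments below are illustrative assumptions, not taken from this commit:

```python
import pandas as pd

from feldera import SQLContext  # import path assumed

sql = SQLContext("demo", "http://localhost:8080")  # constructor args assumed
sql.register_table_from_sql("CREATE TABLE students (id INT, name VARCHAR);")

df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})

# Before this commit: data was buffered, then flushed when the pipeline started.
# sql.connect_source_pandas("students", df)
# sql.start()

# After this commit: the pipeline must already be running before pushing.
sql.start()
sql.input_pandas("students", df)
sql.wait_for_completion(shutdown=True)
```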


demo/project_demo10-FraudDetectionDeltaLake/notebook.ipynb

Lines changed: 1 addition & 0 deletions
```diff
@@ -333,6 +333,7 @@
 "hfeature = sql.listen(\"feature\")\n",
 "\n",
 "# Process full snapshot of the input tables and compute a dataset with feature vectors.\n",
+"sql.start()\n",
 "sql.wait_for_completion(shutdown=True)\n",
 "\n",
 "# Read computed feature vectors into a Pandas dataframe.\n",
```

demo/project_demo10-FraudDetectionDeltaLake/run.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -121,6 +121,7 @@ def main():

     # Process full snapshot of the input tables and compute a dataset
     # with feature vectors for use in model training and testing.
+    sql.start()
     sql.wait_for_completion(shutdown=True)

     features_pd = hfeature.to_pandas()
```

docs/use_cases/fraud_detection/fraud_detection.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -298,6 +298,7 @@ hfeature = sql.listen("feature")

 # Process full snapshot of the input tables and compute a dataset
 # with feature vectors for use in model training and testing.
+sql.start()
 sql.wait_for_completion(shutdown=True)

 features_pd = hfeature.to_pandas()
```

python/docs/examples.rst

Lines changed: 16 additions & 12 deletions
```diff
@@ -1,12 +1,12 @@
 Examples
 ========

-Pandas
-*******
+Using Pandas DataFrames as Input / Output
+*******************************************


 Working wth pandas DataFrames in Feldera is fairly straight forward.
-You can use :meth:`.SQLContext.connect_source_pandas` to connect a
+You can use :meth:`.SQLContext.input_pandas` to connect a
 DataFrame to a feldera table as the data source.

 To listen for response from feldera, in the form of DataFrames
@@ -48,15 +48,18 @@ To ensure all data is received start listening before calling
     query = f"SELECT name, ((science + maths + art) / 3) as average FROM {TBL_NAMES[0]} JOIN {TBL_NAMES[1]} on id = student_id ORDER BY average DESC"
     sql.register_output_view(view_name, query)

-    # connect the source (a pandas Dataframe in this case) to the tables
-    sql.connect_source_pandas(TBL_NAMES[0], df_students)
-    sql.connect_source_pandas(TBL_NAMES[1], df_grades)
-
     # listen for the output of the view here in the notebook
     # you do not need to call this if you are forwarding the data to a sink
     out = sql.listen(view_name)

-    # run this to completion
+    # start the pipeline
+    sql.start()
+
+    # connect the source (a pandas Dataframe in this case) to the tables
+    sql.input_pandas(TBL_NAMES[0], df_students)
+    sql.input_pandas(TBL_NAMES[1], df_grades)
+
+    # wait for the pipeline to complete
     # note that if the source is a stream, this will run indefinitely
     sql.wait_for_completion(shutdown=True)

@@ -67,8 +70,8 @@ To ensure all data is received start listening before calling
    print(df)


-Kafka
-******
+Using Kafka as Data Source / Sink
+***********************************

 To setup Kafka as the source use :meth:`.SQLContext.connect_source_kafka` and as the sink use
 :meth:`.SQLContext.connect_sink_kafka`.
@@ -157,8 +160,8 @@ More on Kafka as the output connector at: https://www.feldera.com/docs/connector
    df = out.to_pandas()


-HTTP GET
-*********
+Ingesting data from a URL
+**************************


 Feldera can ingest data from a user-provided URL into a SQL table.
@@ -193,6 +196,7 @@ More on the HTTP GET connector at: https://www.feldera.com/docs/connectors/sourc

 out = sql.listen(VIEW_NAME)

+sql.start()
 sql.wait_for_completion(shutdown=True)

 df = out.to_pandas()
```
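The ordering in the rewritten example is deliberate: `listen` is attached before `start()` so no output rows are missed, and `input_pandas` follows `start()` because the pipeline must exist before it can accept data. A condensed sketch of that flow with illustrative data; the `SQLContext` constructor arguments are assumptions, and table/view registration is elided as in the hunk above:

```python
import pandas as pd

from feldera import SQLContext  # import path assumed

TBL_NAMES = ["students", "grades"]
view_name = "average_scores"

df_students = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
df_grades = pd.DataFrame(
    {"student_id": [1, 2], "science": [90, 70], "maths": [80, 60], "art": [70, 50]}
)

sql = SQLContext("example", "http://localhost:8080")  # constructor args assumed
# ... register the tables and the output view as shown in the diff above ...

out = sql.listen(view_name)   # subscribe before start so no rows are dropped
sql.start()                   # the pipeline must be up before pushing
sql.input_pandas(TBL_NAMES[0], df_students)
sql.input_pandas(TBL_NAMES[1], df_grades)
sql.wait_for_completion(shutdown=True)  # runs indefinitely for streaming sources
df = out.to_pandas()
print(df)
```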

python/feldera/_helpers.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -40,4 +40,4 @@ def chunk_dataframe(df, chunk_size=1000):
     """

     for i in range(0, len(df), chunk_size):
-        yield df.iloc[i:i + chunk_size]
\ No newline at end of file
+        yield df.iloc[i:i + chunk_size]
```
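For context, `chunk_dataframe` (whose trailing newline is fixed above) is the helper `input_pandas` now calls directly to split a DataFrame into fixed-size batches for the push endpoint. A standalone sketch of the same logic, runnable without Feldera:

```python
import pandas as pd

def chunk_dataframe(df, chunk_size=1000):
    # Mirrors python/feldera/_helpers.py: yield successive row slices.
    for i in range(0, len(df), chunk_size):
        yield df.iloc[i:i + chunk_size]

df = pd.DataFrame({"x": range(2500)})
print([len(chunk) for chunk in chunk_dataframe(df)])  # [1000, 1000, 500]
```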

python/feldera/formats.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -169,7 +169,7 @@ def __init__(
         self,
         config: Optional[dict] = None,
         schema: Optional[str] = None,
-        skip_schema_id: Optional[bool] = None,
+        skip_schema_id: Optional[bool] = False,
         registry_urls: Optional[list[str]] = None,
         registry_headers: Optional[Mapping[str, str]] = None,
         registry_proxy: Optional[str] = None,
```

python/feldera/sql_context.py

Lines changed: 32 additions & 45 deletions
```diff
@@ -71,11 +71,6 @@ def __init__(
         self.tables: Dict[str, SQLTable] = {}
         self.types: Dict[str, str] = {}

-        # TODO: to be used for schema inference
-        self.todo_tables: Dict[str, Optional[SQLTable]] = {}
-
-        self.http_input_buffer: list[Dict[str, pd.DataFrame]] = []
-
         # buffer that stores all input connectors to be created
         # this is a Mapping[table_name -> list[Connector]]
         self.input_connectors_buffer: Dict[str, list[Connector]] = {}
@@ -167,28 +162,6 @@ def __setup_output_listeners(self):
         # block until the callback runner is ready
         queue.join()

-    def __push_http_inputs(self):
-        """
-        Internal function used to push the input data to the pipeline.
-
-        :meta private:
-        """
-
-        for input_buffer in self.http_input_buffer:
-            for tbl_name, data in input_buffer.items():
-                for datum in chunk_dataframe(data):
-                    self.client.push_to_pipeline(
-                        self.pipeline_name,
-                        tbl_name,
-                        "json",
-                        datum.to_json(orient='records', date_format='epoch'),
-                        json_flavor='pandas',
-                        array=True,
-                        serialize=False
-                    )
-
-        self.http_input_buffer.clear()
-
     def create(self) -> Self:
         """
         Sets the build mode to CREATE, meaning that the pipeline will be created from scratch.
@@ -253,8 +226,6 @@ def register_table(self, table_name: str, schema: Optional[SQLSchema] = None, dd

         if schema:
             self.tables[table_name] = SQLTable(table_name, schema=schema)
-        else:
-            self.todo_tables[table_name] = None

     def register_table_from_sql(self, ddl: str):
         """
@@ -272,34 +243,47 @@ def register_table_from_sql(self, ddl: str):

         self.tables[name] = SQLTable(name, ddl)

-    def connect_source_pandas(self, table_name: str, df: pandas.DataFrame, flush: bool = False):
+    def input_pandas(self, table_name: str, df: pandas.DataFrame, force: bool = False):
         """
         Adds a pandas DataFrame to the input buffer of the SQLContext, to be pushed to the pipeline.
         Note that if the pipeline is running, the data will not be pushed if `flush` is False.

         :param table_name: The name of the table.
         :param df: The pandas DataFrame to be pushed to the pipeline.
-        :param flush: If True, the data will be pushed to the pipeline immediately. Defaults to False.
+        :param force: `True` to push data even if the pipeline is paused. `False` by default.
         """

-        if flush and self.pipeline_status() != PipelineStatus.RUNNING:
-            raise RuntimeError("Pipeline must be running to flush the data")
+        status = self.pipeline_status()
+        if status not in [
+            PipelineStatus.RUNNING,
+            PipelineStatus.PAUSED,
+        ]:
+            raise RuntimeError("Pipeline must be running or paused to push data")
+
+        if not force and status == PipelineStatus.PAUSED:
+            raise RuntimeError("Pipeline is paused, set force=True to push data")

         ensure_dataframe_has_columns(df)

         tbl = self.tables.get(table_name)

-        if tbl:
+        if tbl is None:
+            raise ValueError(f"Cannot push to table '{table_name}' as it is not registered yet")
+        else:
             # tbl.validate_schema(df) TODO: something like this would be nice
-            self.http_input_buffer.append({tbl.name: df})
-            if flush:
-                self.__push_http_inputs()
+            for datum in chunk_dataframe(df):
+                self.client.push_to_pipeline(
+                    self.pipeline_name,
+                    table_name,
+                    "json",
+                    datum.to_json(orient='records', date_format='epoch'),
+                    json_flavor='pandas',
+                    array=True,
+                    serialize=False,
+                    force=force,
+                )
             return

-        # TODO: handle schema inference
-        if tbl is None:
-            raise ValueError(f"Cannot push to table {table_name} as it is not registered yet")
-
     def register_local_view(self, name: str, query: str):
         """
         Registers a local view with the SQLContext.
@@ -639,8 +623,6 @@ def start(self):

         self.resume()

-        self.__push_http_inputs()
-
     def wait_for_idle(
         self,
         idle_interval_s: float = 5.0,
@@ -740,7 +722,12 @@ def delete(self, delete_program: bool = True, delete_connectors: bool = False):
         :param delete_connectors: If True, also deletes the connectors associated with the pipeline. False by default.
         """

-        if self.pipeline_status() != PipelineStatus.SHUTDOWN:
+        current_status = self.pipeline_status()
+
+        if current_status == PipelineStatus.NOT_FOUND:
+            raise RuntimeError("Attempting to delete a pipeline that hasn't been created yet")
+
+        if current_status not in [PipelineStatus.SHUTDOWN, PipelineStatus.FAILED]:
             raise RuntimeError("Pipeline must be shutdown before deletion")

         self.client.delete_pipeline(self.pipeline_name)
@@ -754,4 +741,4 @@ def delete(self, delete_program: bool = True, delete_connectors: bool = False):
                 self.client.delete_connector(conn.name)
         for connector in self.output_connectors_buffer.values():
             for conn in connector:
-                self.client.delete_connector(conn.name)
\ No newline at end of file
+                self.client.delete_connector(conn.name)
```
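With the HTTP input buffer gone, the failure modes of `input_pandas` surface at the call site instead of being deferred to `start()`. A hedged sketch of the caller-facing behavior implied by the new checks, continuing with an `sql` SQLContext like the one above; the `pause()` call is an assumption, inferred from the `resume()` used by `start()`:

```python
import pandas as pd

df = pd.DataFrame({"id": [1], "name": ["Alice"]})

# Pipeline never started:
sql.input_pandas("students", df)
# -> RuntimeError: Pipeline must be running or paused to push data

# Pipeline paused (pause() is assumed to exist):
sql.pause()
sql.input_pandas("students", df)
# -> RuntimeError: Pipeline is paused, set force=True to push data

sql.input_pandas("students", df, force=True)  # accepted while paused
```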

python/requirements-dev.txt

Lines changed: 1 addition & 2 deletions
```diff
@@ -1,3 +1,2 @@
 sphinx==7.3.7
-sphinx_rtd_theme==2.0.0
-enum-tools[sphinx]
+sphinx_rtd_theme==2.0.0
```
