Skip to content

Commit b7bd1db

Browse files
authored
py: add SQLContext.input_json() to send json data (#2088)
* py: add SQLContext.input_json() to send json data Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * Update CHANGELOG Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> --------- Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent b38acc8 commit b7bd1db

File tree

4 files changed

+73
-0
lines changed

4 files changed

+73
-0
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
- [Python] Add `SQLContext.input_json()` to send json
11+
data to the pipeline
12+
([#2088](https://github.com/feldera/feldera/pull/2088))
13+
1014
## [0.21.0] - 2024-07-22
1115

1216
- [SQL] Preliminary implementation of the `NOW()` function

python/feldera/output_handler.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,13 @@ def to_pandas(self, clear_buffer: bool = True):
4848
self.buffer.clear()
4949

5050
return res
51+
52+
def to_dict(self, clear_buffer: bool = True):
53+
"""
54+
Returns the output of the pipeline as a list of python dictionaries
55+
56+
:param clear_buffer: Whether to clear the buffer after getting the output.
57+
"""
58+
59+
return self.to_pandas(clear_buffer).to_dict(orient='records')
60+

python/feldera/sql_context.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,19 @@ def input_pandas(self, table_name: str, df: pandas.DataFrame, force: bool = Fals
283283
)
284284
return
285285

286+
def input_json(self, table_name: str, data: Dict | list, force: bool = False):
287+
"""
288+
Push this JSON data to the specified table of the pipeline.
289+
290+
:param table_name: The name of the table to push data into.
291+
:param data: The JSON encoded data to be pushed to the pipeline. The data should be in the form:
292+
`{'col1': 'val1', 'col2': 'val2'}` or `[{'col1': 'val1', 'col2': 'val2'}, {'col1': 'val1', 'col2': 'val2'}]`
293+
:param force: `True` to push data even if the pipeline is paused. `False` by default.
294+
"""
295+
296+
array = True if isinstance(data, list) else False
297+
self.client.push_to_pipeline(self.pipeline_name, table_name, "json", data, array=array, force=force)
298+
286299
def register_local_view(self, name: str, query: str):
287300
"""
288301
Register a local view with the SQLContext.

python/tests/test_wireframes.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,52 @@ def test_timestamp_pandas(self):
421421

422422
sql.delete()
423423

424+
def test_input_json0(self):
425+
sql = SQLContext("test_input_json", TEST_CLIENT).get_or_create()
426+
427+
TBL_NAME = "items"
428+
VIEW_NAME = "s"
429+
430+
sql.register_table(TBL_NAME, SQLSchema({"id": "INT", "name": "STRING"}))
431+
sql.register_materialized_view(VIEW_NAME, f"SELECT * FROM {TBL_NAME}")
432+
433+
data = {'id': 1, 'name': 'a'}
434+
435+
out = sql.listen(VIEW_NAME)
436+
437+
sql.start()
438+
sql.input_json(TBL_NAME, data)
439+
sql.wait_for_completion(True)
440+
441+
out_data = out.to_dict()
442+
443+
data.update({"insert_delete": 1})
444+
assert out_data == [data]
445+
446+
def test_input_json1(self):
447+
sql = SQLContext("test_input_json", TEST_CLIENT).get_or_create()
448+
449+
TBL_NAME = "items"
450+
VIEW_NAME = "s"
451+
452+
sql.register_table(TBL_NAME, SQLSchema({"id": "INT", "name": "STRING"}))
453+
sql.register_materialized_view(VIEW_NAME, f"SELECT * FROM {TBL_NAME}")
454+
455+
data = [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
456+
457+
out = sql.listen(VIEW_NAME)
458+
459+
sql.start()
460+
sql.input_json(TBL_NAME, data)
461+
sql.wait_for_completion(True)
462+
463+
out_data = out.to_dict()
464+
465+
for datum in data:
466+
datum.update({"insert_delete": 1})
467+
468+
assert out_data == data
469+
424470

425471
if __name__ == '__main__':
426472
unittest.main()

0 commit comments

Comments
 (0)