Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- [Python] Add `SQLContext.input_json()` to send json
data to the pipeline
([#2088](https://github.com/feldera/feldera/pull/2088))

## [0.21.0] - 2024-07-22

- [SQL] Preliminary implementation of the `NOW()` function
Expand Down
10 changes: 10 additions & 0 deletions python/feldera/output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,13 @@ def to_pandas(self, clear_buffer: bool = True):
self.buffer.clear()

return res

def to_dict(self, clear_buffer: bool = True):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to make clear a separate method?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I felt like that introduces more friction than necessary. In most cases, we likely want to clear the buffer anyway.

"""
Returns the output of the pipeline as a list of python dictionaries

:param clear_buffer: Whether to clear the buffer after getting the output.
"""

return self.to_pandas(clear_buffer).to_dict(orient='records')

13 changes: 13 additions & 0 deletions python/feldera/sql_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,19 @@ def input_pandas(self, table_name: str, df: pandas.DataFrame, force: bool = Fals
)
return

def input_json(self, table_name: str, data: Dict | list, force: bool = False):
"""
Push this JSON data to the specified table of the pipeline.

:param table_name: The name of the table to push data into.
:param data: The JSON encoded data to be pushed to the pipeline. The data should be in the form:
`{'col1': 'val1', 'col2': 'val2'}` or `[{'col1': 'val1', 'col2': 'val2'}, {'col1': 'val1', 'col2': 'val2'}]`
:param force: `True` to push data even if the pipeline is paused. `False` by default.
"""

array = True if isinstance(data, list) else False
self.client.push_to_pipeline(self.pipeline_name, table_name, "json", data, array=array, force=force)

def register_local_view(self, name: str, query: str):
"""
Register a local view with the SQLContext.
Expand Down
46 changes: 46 additions & 0 deletions python/tests/test_wireframes.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,52 @@ def test_timestamp_pandas(self):

sql.delete()

def test_input_json0(self):
sql = SQLContext("test_input_json", TEST_CLIENT).get_or_create()

TBL_NAME = "items"
VIEW_NAME = "s"

sql.register_table(TBL_NAME, SQLSchema({"id": "INT", "name": "STRING"}))
sql.register_materialized_view(VIEW_NAME, f"SELECT * FROM {TBL_NAME}")

data = {'id': 1, 'name': 'a'}

out = sql.listen(VIEW_NAME)

sql.start()
sql.input_json(TBL_NAME, data)
sql.wait_for_completion(True)

out_data = out.to_dict()

data.update({"insert_delete": 1})
assert out_data == [data]

def test_input_json1(self):
sql = SQLContext("test_input_json", TEST_CLIENT).get_or_create()

TBL_NAME = "items"
VIEW_NAME = "s"

sql.register_table(TBL_NAME, SQLSchema({"id": "INT", "name": "STRING"}))
sql.register_materialized_view(VIEW_NAME, f"SELECT * FROM {TBL_NAME}")

data = [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]

out = sql.listen(VIEW_NAME)

sql.start()
sql.input_json(TBL_NAME, data)
sql.wait_for_completion(True)

out_data = out.to_dict()

for datum in data:
datum.update({"insert_delete": 1})

assert out_data == data


if __name__ == '__main__':
unittest.main()