py: support arrow_ipc format for adhoc queries #4226
* also support case sensitive view names to listen to pipelines

  Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
```diff
@@ -3,6 +3,8 @@
 from datetime import datetime

 import pandas
+import warnings
+import pyarrow

 from typing import List, Dict, Callable, Optional, Generator, Mapping, Any
 from collections import deque
```
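The `warnings` import added in this hunk backs the deprecation notice that `query()` gains further down in the diff. A minimal, self-contained sketch of that pattern (the function body here is a stand-in, not the SDK's):

```python
import warnings


def query(q: str):
    # Emit a DeprecationWarning attributed to the caller (stacklevel=2),
    # so the warning points at the user's call site rather than this line.
    warnings.warn(
        "function query is deprecated and will be removed in a future version",
        category=DeprecationWarning,
        stacklevel=2,
    )
    return q


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    query("SELECT 1")

print(len(caught))                               # 1
print(caught[0].category is DeprecationWarning)  # True
```

Note that `DeprecationWarning` is hidden by default outside of `__main__` and test runners, which is why the demo records it explicitly.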
```diff
@@ -204,7 +206,7 @@ def resume_connector(self, table_name: str, connector_name: str):
         self.client.resume_connector(self.name, table_name, connector_name)

-    def listen(self, view_name: str) -> OutputHandler:
+    def listen(self, view_name: str, case_sensitive: bool = False) -> OutputHandler:
         """
         Follow the change stream (i.e., the output) of the provided view.
         Returns an output handler to read the changes.
```
```diff
@@ -215,6 +217,7 @@ def listen(self, view_name: str) -> OutputHandler:
         If this method is called once the pipeline has started, you will only get the output from that point onwards.

         :param view_name: The name of the view to listen to.
+        :param case_sensitive: True if the view name is case sensitive.
         """

         queue: Optional[Queue] = None
```
```diff
@@ -223,13 +226,18 @@
             queue = Queue(maxsize=1)
             self.views_tx.append({view_name: queue})

-        handler = OutputHandler(self.client, self.name, view_name, queue)
+        handler = OutputHandler(
+            self.client, self.name, view_name, queue, case_sensitive
+        )
         handler.start()

         return handler

     def foreach_chunk(
-        self, view_name: str, callback: Callable[[pandas.DataFrame, int], None]
+        self,
+        view_name: str,
+        callback: Callable[[pandas.DataFrame, int], None],
+        case_sensitive: bool = False,
     ):
         """
         Run the given callback on each chunk of the output of the specified view.
```
```diff
@@ -244,6 +252,8 @@ def foreach_chunk(
             - **seq_no** -> The sequence number. The sequence number is a monotonically increasing integer that
               starts from 0. Note that the sequence number is unique for each chunk, but not necessarily contiguous.

+        :param case_sensitive: True if the view name is case sensitive.
+
         Please note that the callback is run in a separate thread, so it should be thread-safe.
         Please note that the callback should not block for a long time, as by default, backpressure is enabled and
         will block the pipeline.
```
```diff
@@ -259,7 +269,9 @@ def foreach_chunk(
             queue = Queue(maxsize=1)
             self.views_tx.append({view_name: queue})

-        handler = CallbackRunner(self.client, self.name, view_name, callback, queue)
+        handler = CallbackRunner(
+            self.client, self.name, view_name, callback, queue, case_sensitive
+        )
         handler.start()

     def wait_for_completion(
```
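The SDK only threads `case_sensitive` through to the handlers; the conventional server-side meaning of such a flag is that a case-sensitive name is treated like a quoted SQL identifier (exact case preserved) while an insensitive one is folded by the server. A hypothetical helper illustrating that convention (`format_view_name` is not part of the SDK):

```python
def format_view_name(view_name: str, case_sensitive: bool = False) -> str:
    """Render a view name as a SQL identifier: quoted when case matters,
    bare (and thus case-folded by the server) otherwise."""
    if case_sensitive:
        # Double any embedded quotes, then wrap in double quotes.
        return '"' + view_name.replace('"', '""') + '"'
    return view_name


print(format_view_name("AverageScore"))                       # AverageScore
print(format_view_name("AverageScore", case_sensitive=True))  # "AverageScore"
```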
```diff
@@ -692,9 +704,50 @@ def query(self, query: str) -> Generator[Mapping[str, Any], None, None]:
         :raises FelderaAPIError: If querying a non materialized table or view.
         :raises FelderaAPIError: If the query is invalid.
         """
+        warnings.warn(
+            "function query is deprecated and will be removed in a future version",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+
         return self.client.query_as_json(self.name, query)

+    def query_pyarrow(self, query: str) -> pyarrow.Table:
+        """
+        Executes an ad-hoc SQL query on this pipeline and returns a
+        :class:`pyarrow.Table` containing all the results.
+
+        Note:
+            You can only ``SELECT`` from materialized tables and views.
+
+        :param query: The SQL query to be executed.
+
+        :raises FelderaAPIError: If the pipeline is not in a RUNNING or PAUSED state.
+        :raises FelderaAPIError: If querying a non materialized table or view.
+        :raises FelderaAPIError: If the query is invalid.
+        """
+
+        return self.client.query_as_pyarrow(self.name, query)
+
+    def query_pylist(self, query: str) -> List[Mapping[str, Any]]:
```
Review thread on the `query_pylist` return type:

- **Contributor:** it seems like this return type suffers from the same problem as the JSON one (aliased columns will be removed?)
- **Author:** Yeah, it does. It doesn't suffer from it if we return …
- **Contributor:** Ok, probably not a good return type then, can we remove it? It's stupid, but unless we change our SQL to not accept this nonsense I don't see a good alternative. cc @mihaibudiu
- **Contributor:** maybe instead of `str` as key you can have a Column type that hashes using unique identifiers/position in the select stmt but displays as the column name
- **Contributor:** it should be a list, not a mapping
- **Author:** It is mostly for convenience and works in most cases.
- **Contributor:** no need to remove, but either use a key that has a unique hash (like the position) or a list of lists
```diff
+        """
+        Executes an ad-hoc SQL query on this pipeline and returns a list of
+        python dictionaries containing all the results.
+
+        Note:
+            You can only ``SELECT`` from materialized tables and views.
+
+        :param query: The SQL query to be executed.
+
+        :raises FelderaAPIError: If the pipeline is not in a RUNNING or PAUSED state.
+        :raises FelderaAPIError: If querying a non materialized table or view.
+        :raises FelderaAPIError: If the query is invalid.
+        """
+
+        return self.query_pyarrow(query).to_pylist()
+
```
```diff
     def query_parquet(self, query: str, path: str):
         """
         Executes an ad-hoc SQL query on this pipeline and saves the result to the specified path as a parquet file.
```