
Commit 334c2ad

abhizer and ryzhyk authored
py: convert SQL types to proper pandas type (#2305)
py: convert SQL types to proper pandas type (#2305)

* py: convert SQL types to proper pandas type

  When receiving data from Feldera and creating a Pandas DataFrame, this
  commit considers the SQL schema of the data, and then uses the
  appropriate pandas types for the data columns.

  Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

* docs: document pandas types compatibility

  Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

* Update python/feldera/_callback_runner.py

  Co-authored-by: Leonid Ryzhyk <ryzhyk@gmail.com>
  Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

---------

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
Co-authored-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 46f7f83 commit 334c2ad

File tree

6 files changed: +359 insertions, −66 deletions


python/docs/examples.rst

Lines changed: 3 additions & 61 deletions
@@ -1,66 +1,8 @@
 Examples
 ========
 
-Using Pandas DataFrames as Input / Output
-*******************************************
+Specifying Data Sources / Sinks
+*******************************
 
-
-You can use :meth:`.Pipeline.input_pandas` to insert records from a
-DataFrame to a Feldera table.
-
-Use :meth:`.Pipeline.listen` to subscribe to updates to a view in the form of a stream of DataFrames.
-To ensure all data is received start listening before calling
-:meth:`.Pipeline.start`.
-
-.. highlight:: python
-.. code-block:: python
-
-    from feldera import FelderaClient, PipelineBuilder
-    import pandas as pd
-
-    sql = f"""
-    CREATE TABLE students (
-        name STRING,
-        id INT
-    );
-
-    CREATE TABLE grades (
-        student_id INT,
-        science INT,
-        maths INT,
-        art INT
-    );
-
-    CREATE VIEW average_scores AS SELECT name, ((science + maths + art) / 3) as average FROM {TBL_NAMES[0]} JOIN {TBL_NAMES[1]} on id = student_id ORDER BY average DESC;
-    """
-
-    # Create a client
-    client = FelderaClient("https://try.feldera.com", api_key="YOUR_API_KEY")
-    pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()
-
-    df_students = pd.read_csv('students.csv')
-    df_grades = pd.read_csv('grades.csv')
-
-    # listen for the output of the view here in the notebook
-    # you do not need to call this if you are forwarding the data to a sink
-    out = pipeline.listen("average_scores")
-
-    pipeline.start()
-    pipeline.input_pandas("students", df_students)
-    pipeline.input_pandas("grades", df_grades)
-
-    # wait for the pipeline to complete
-    # note that if the source is a stream, this will run indefinitely
-    pipeline.wait_for_completion(True)
-    df = out.to_pandas()
-
-    # see the result
-    print(df)
-
-    pipeline.delete()
-
-Using Other Data Sources / Sinks
-**********************************
-
-To connect Feldera to other data sources or sinks, you can specify them in the SQL code.
+To connect Feldera to data sources or sinks, you can specify them in the SQL code.
 Refer to the connector documentation at: https://github.com/feldera/feldera/tree/main/docs/connectors

python/docs/index.rst

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ Welcome to feldera's documentation!
    :caption: Contents:
 
    introduction
+   pandas
    examples
 
 .. toctree::

python/docs/pandas.rst

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
+Pandas Compatibility
+====================
+
+Feldera tries to be as compatible with Pandas as possible.
+However, some SQL types have limited support in Pandas.
+
+Columns with the following SQL types will be converted to the corresponding Pandas types:
+
+.. csv-table::
+   :header: "SQL Type", "Pandas Type"
+
+   "BOOLEAN", "boolean"
+   "TINYINT", "Int8"
+   "SMALLINT", "Int16"
+   "INTEGER", "Int32"
+   "BIGINT", "Int64"
+   "REAL", "Float32"
+   "DOUBLE", "Float64"
+   "DECIMAL", "decimal.Decimal"
+   "CHAR", "str"
+   "VARCHAR", "str"
+   "DATE", "datetime64[ns]"
+   "TIMESTAMP", "datetime64[ns]"
+   "TIME", "timedelta64[ns]"
+   "INTERVAL", "timedelta64[ns]"
+   "ARRAY", "object"
+   "BINARY", "object"
+   "VARBINARY", "object"
+   "STRUCT", "object"
+   "MAP", "object"
+
+.. note::
+   The "object" type in Pandas is dynamic and can hold any type of data.
+   Therefore, the representation of primitive types inside array, binary, struct,
+   and map columns may differ from their representation as standalone columns.
+
+Using Pandas DataFrames as Input / Output
+*****************************************
+
+You can use :meth:`.Pipeline.input_pandas` to insert records from a
+DataFrame into a Feldera table.
+
+Use :meth:`.Pipeline.listen` to subscribe to updates to a view in the form of a stream of DataFrames.
+To ensure all data is received, start listening before calling
+:meth:`.Pipeline.start`.
+
+.. highlight:: python
+.. code-block:: python
+
+    from feldera import FelderaClient, PipelineBuilder
+    import pandas as pd
+
+    sql = """
+    CREATE TABLE students (
+        name STRING,
+        id INT
+    );
+
+    CREATE TABLE grades (
+        student_id INT,
+        science INT,
+        maths INT,
+        art INT
+    );
+
+    CREATE VIEW average_scores AS
+        SELECT name, ((science + maths + art) / 3) AS average
+        FROM students JOIN grades ON id = student_id
+        ORDER BY average DESC;
+    """
+
+    # Create a client
+    client = FelderaClient("https://try.feldera.com", api_key="YOUR_API_KEY")
+    pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()
+
+    df_students = pd.read_csv('students.csv')
+    df_grades = pd.read_csv('grades.csv')
+
+    # listen for the output of the view here in the notebook;
+    # you do not need to call this if you are forwarding the data to a sink
+    out = pipeline.listen("average_scores")
+
+    pipeline.start()
+    pipeline.input_pandas("students", df_students)
+    pipeline.input_pandas("grades", df_grades)
+
+    # wait for the pipeline to complete;
+    # note that if the source is a stream, this will run indefinitely
+    pipeline.wait_for_completion(True)
+    df = out.to_pandas()
+
+    # see the result
+    print(df)
+
+    pipeline.delete()
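A quick illustration of the type table above (a minimal sketch, independent of Feldera itself): the capitalized pandas dtypes such as ``Int32`` are nullable extension dtypes, so a SQL ``NULL`` survives as ``pd.NA`` instead of silently promoting the column to ``float64`` as the plain numpy ``int32`` would.

```python
import pandas as pd

# A SQL INTEGER column containing a NULL, typed the way the table above
# prescribes. "Int32" is pandas' nullable extension dtype, so the missing
# value is kept as pd.NA rather than coercing the column to float64.
ids = pd.array([1, 2, None], dtype="Int32")
df = pd.DataFrame({"id": ids})

print(df["id"].dtype)            # Int32
print(df["id"].isna().tolist())  # [False, False, True]
```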

python/feldera/_callback_runner.py

Lines changed: 15 additions & 1 deletion
@@ -29,6 +29,7 @@ def __init__(
         self.view_name: str = view_name
         self.callback: Callable[[pd.DataFrame, int], None] = callback
         self.queue: Optional[Queue] = queue
+        self.schema: Optional[dict] = None
 
     def run(self):
         """
@@ -37,6 +38,19 @@ def run(self):
         :meta private:
         """
 
+        pipeline = self.client.get_pipeline(self.pipeline_name)
+        schema = pipeline.program_info["schema"]
+
+        if schema:
+            schemas = [relation for relation in schema["inputs"] + schema["outputs"]]
+            for schema in schemas:
+                if schema["name"] == self.view_name:
+                    self.schema = schema
+                    break
+
+        if self.schema is None:
+            raise ValueError(f"Table or View {self.view_name} not found in the pipeline schema.")
+
         # by default, we assume that the pipeline has been started
         ack: _CallbackRunnerInstruction = _CallbackRunnerInstruction.PipelineStarted
 
@@ -65,7 +79,7 @@ def run(self):
             seq_no: int = chunk.get("sequence_number")
 
             if data is not None:
-                self.callback(dataframe_from_response([data]), seq_no)
+                self.callback(dataframe_from_response([data], schema), seq_no)
 
             if self.queue:
                 try:
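The schema lookup added to `run()` can be sketched in isolation as follows. The `inputs`/`outputs` shape and the sample relation names are assumptions modeled on the diff, not a verified Feldera payload.

```python
def find_relation(program_schema: dict, name: str) -> dict:
    # Scan input tables and output views for the relation the listener targets,
    # mirroring the loop added to CallbackRunner.run() above.
    for relation in program_schema["inputs"] + program_schema["outputs"]:
        if relation["name"] == name:
            return relation
    raise ValueError(f"Table or View {name} not found in the pipeline schema.")

# Hypothetical program schema for the students/grades example.
schema = {
    "inputs": [{"name": "students"}, {"name": "grades"}],
    "outputs": [{"name": "average_scores"}],
}
print(find_relation(schema, "average_scores")["name"])  # average_scores
```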

python/feldera/_helpers.py

Lines changed: 69 additions & 3 deletions
@@ -1,4 +1,45 @@
 import pandas as pd
+from decimal import Decimal
+
+
+def sql_type_to_pandas_type(sql_type: str):
+    """
+    Converts a SQL type to a pandas type.
+    """
+
+    match sql_type.upper():
+        case 'BOOLEAN':
+            return 'boolean'
+        case 'TINYINT':
+            return 'Int8'
+        case 'SMALLINT':
+            return 'Int16'
+        case 'INTEGER':
+            return 'Int32'
+        case 'BIGINT':
+            return 'Int64'
+        case 'REAL':
+            return 'Float32'
+        case 'DOUBLE':
+            return 'Float64'
+        case 'DECIMAL':
+            return None
+        case 'CHAR':
+            return 'str'
+        case 'VARCHAR':
+            return 'str'
+        case 'DATE' | 'TIMESTAMP':
+            return 'datetime64[ns]'
+        case 'TIME' | 'INTERVAL':
+            return 'timedelta64[ns]'
+        case 'ARRAY':
+            return None
+        case 'NULL':
+            return None
+        case 'BINARY' | 'VARBINARY':
+            return None
+        case 'STRUCT' | 'MAP':
+            return None
 
 
 def ensure_dataframe_has_columns(df: pd.DataFrame):
@@ -15,14 +56,39 @@ def ensure_dataframe_has_columns(df: pd.DataFrame):
         )
 
 
-def dataframe_from_response(buffer: list[list[dict]]):
+def dataframe_from_response(buffer: list[list[dict]], schema: dict):
     """
     Converts the response from Feldera to a pandas DataFrame.
     """
-    return pd.DataFrame([
+
+    pd_schema = {}
+
+    decimal_col = []
+
+    for column in schema['fields']:
+        column_name = column['name']
+        column_type = column['columntype']['type']
+        if column_type == 'DECIMAL':
+            decimal_col.append(column_name)
+
+        pd_schema[column_name] = sql_type_to_pandas_type(column_type)
+
+    data = [
         {**item['insert'], 'insert_delete': 1} if 'insert' in item else {**item['delete'], 'insert_delete': -1}
         for sublist in buffer for item in sublist
-    ])
+    ]
+
+    if len(decimal_col) != 0:
+        for datum in data:
+            for col in decimal_col:
+                if datum[col] is not None:
+                    datum[col] = Decimal(datum[col])
+
+    df = pd.DataFrame(data)
+    df = df.astype(pd_schema)
+
+    return df
 
 
 def chunk_dataframe(df, chunk_size=1000):
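To see what the reworked `dataframe_from_response` does end to end, here is a self-contained sketch of its two new steps: flattening change records into rows carrying an `insert_delete` weight, then converting DECIMAL strings to `decimal.Decimal` before typing the frame. The sample buffer and column names are invented for illustration.

```python
import pandas as pd
from decimal import Decimal

# Invented sample: one chunk with an insert and a delete, in the shape the
# diff above consumes ({"insert": row} / {"delete": row} records).
buffer = [[
    {"insert": {"id": 1, "price": "9.99"}},
    {"delete": {"id": 2, "price": "1.50"}},
]]

# Flatten change records into rows, tagging each with an insert_delete weight.
data = [
    {**item["insert"], "insert_delete": 1} if "insert" in item
    else {**item["delete"], "insert_delete": -1}
    for sublist in buffer for item in sublist
]

# DECIMAL values arrive as strings; convert them so arithmetic stays exact.
for datum in data:
    if datum["price"] is not None:
        datum["price"] = Decimal(datum["price"])

df = pd.DataFrame(data).astype({"id": "Int32"})  # SQL INTEGER -> nullable Int32
print(df["insert_delete"].tolist())  # [1, -1]
```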
