Skip to content

Commit 1d225ac

Browse files
abhizer and Leonid Ryzhyk authored
py: properly serialize DataFrames with Timestamp columns (#1846)
* py: properly serialize DataFrames with Timestamp columns Fixes: #1840 Also does the following things: * chunk dataframes into smaller groups of 1000 rows per request while ingesting data * avoids adding empty dataframes to output buffer * ignores the index while concatenating output dataframes Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * py: Testing instructions. Signed-off-by: Leonid Ryzhyk <leonid@feldera.com> * py: Encode Pandas timestamps as epoch. Introduces a new JSON dialect that matches how Pandas encodes timestamp types as millis since epoch. Signed-off-by: Leonid Ryzhyk <leonid@feldera.com> * py: rename dont_serialize to serialize in push_to_pipeline Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> --------- Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> Signed-off-by: Leonid Ryzhyk <leonid@feldera.com> Co-authored-by: Leonid Ryzhyk <leonid@feldera.com>
1 parent 768e47b commit 1d225ac

File tree

11 files changed

+112
-16
lines changed

11 files changed

+112
-16
lines changed

crates/pipeline-types/src/format/json.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ pub enum JsonFlavor {
140140
/// JSON format accepted by the Kafka Connect `JsonConverter` class.
141141
#[serde(rename = "kafka_connect_json_converter")]
142142
KafkaConnectJsonConverter,
143+
#[serde(rename = "pandas")]
144+
Pandas,
143145
/// Parquet to-json format.
144146
/// (For internal use only)
145147
#[serde(skip)]

crates/pipeline-types/src/serde_with_context/serde_config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,12 @@ impl From<JsonFlavor> for SqlSerdeConfig {
140140
timestamp_format: TimestampFormat::String("%Y-%m-%dT%H:%M:%S%.f%:z"),
141141
decimal_format: DecimalFormat::String,
142142
},
143+
JsonFlavor::Pandas => Self {
144+
time_format: TimeFormat::String("%H:%M:%S%.f"),
145+
date_format: DateFormat::String("%Y-%m-%d"),
146+
timestamp_format: TimestampFormat::MillisSinceEpoch,
147+
decimal_format: DecimalFormat::String,
148+
},
143149
JsonFlavor::ParquetConverter => Self {
144150
time_format: TimeFormat::Nanos,
145151
date_format: DateFormat::String("%Y-%m-%d"),

openapi.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3790,7 +3790,8 @@
37903790
"default",
37913791
"debezium_mysql",
37923792
"snowflake",
3793-
"kafka_connect_json_converter"
3793+
"kafka_connect_json_converter",
3794+
"pandas"
37943795
]
37953796
},
37963797
"JsonParserConfig": {

python/README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,12 @@ sphinx-apidoc -o . ../feldera
3232
make html
3333
```
3434

35-
To clean the build, run `make clean`.
35+
To clean the build, run `make clean`.
36+
37+
## Testing
38+
39+
40+
```bash
41+
cd python
42+
python3 -m unittest
43+
```

python/feldera/_helpers.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,12 @@ def validate_connector_input_format(fmt: Format):
3232

3333
if isinstance(fmt, JSONFormat) and fmt.config.get("update_format") is None:
3434
raise ValueError("update_format not set in the format config; consider using: .with_update_format()")
35+
36+
37+
def chunk_dataframe(df, chunk_size=1000):
38+
"""
39+
Yield successive n-sized chunks from the given dataframe.
40+
"""
41+
42+
for i in range(0, len(df), chunk_size):
43+
yield df.iloc[i:i + chunk_size]

python/feldera/output_handler.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ def __init__(self, client: FelderaClient, pipeline_name: str, view_name: str, qu
2020

2121
# the callback that is passed to the `CallbackRunner`
2222
def callback(df: pd.DataFrame, _: int):
23-
self.buffer.append(df)
23+
if not df.empty:
24+
self.buffer.append(df)
2425

2526
# sets up the callback runner
2627
self.handler = CallbackRunner(self.client, self.pipeline_name, self.view_name, callback, queue)
@@ -38,4 +39,7 @@ def to_pandas(self):
3839
"""
3940

4041
self.handler.join()
41-
return pd.concat(self.buffer)
42+
43+
if len(self.buffer) == 0:
44+
return pd.DataFrame()
45+
return pd.concat(self.buffer, ignore_index=True)

python/feldera/rest/_httprequests.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99
from typing import Callable, Optional, Any, Union, Mapping, Sequence, List
1010

1111

12+
def json_serialize(body: Any) -> str:
13+
return json.dumps(body) if body else "" if body == "" else "null"
14+
15+
1216
class HttpRequests:
1317
def __init__(self, config: Config) -> None:
1418
self.config = config
@@ -28,6 +32,7 @@ def send_request(
2832
content_type: str = "application/json",
2933
params: Optional[Mapping[str, Any]] = None,
3034
stream: bool = False,
35+
serialize: bool = True,
3136
) -> Any:
3237
"""
3338
:param http_method: The HTTP method to use. Takes the equivalent `requests.*` module. (Example: `requests.get`)
@@ -36,6 +41,7 @@ def send_request(
3641
:param content_type: The value for `Content-Type` HTTP header. "application/json" by default.
3742
:param params: The query parameters part of this request.
3843
:param stream: True if the response is expected to be a HTTP stream.
44+
:param serialize: True if the body needs to be serialized to JSON.
3945
"""
4046
self.headers["Content-Type"] = content_type
4147

@@ -71,7 +77,7 @@ def send_request(
7177
request_path,
7278
timeout=timeout,
7379
headers=headers,
74-
data=json.dumps(body) if body else "" if body == "" else "null",
80+
data=json_serialize(body) if serialize else body,
7581
params=params,
7682
stream=stream,
7783
)
@@ -102,8 +108,16 @@ def post(
102108
content_type: Optional[str] = "application/json",
103109
params: Optional[Mapping[str, Any]] = None,
104110
stream: bool = False,
111+
serialize: bool = True,
105112
) -> Any:
106-
return self.send_request(requests.post, path, body, content_type, params, stream=stream)
113+
return self.send_request(
114+
requests.post,
115+
path,
116+
body,
117+
content_type,
118+
params, stream=stream,
119+
serialize=serialize
120+
)
107121

108122
def patch(
109123
self,

python/feldera/rest/feldera_client.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,19 @@ def __init__(
3232
"""
3333
:param url: The url to Feldera API (ex: https://try.feldera.com)
3434
:param api_key: The optional API key for Feldera
35-
:param timeout: (optional) The amount of time in seconds that the cient will wait for a response beforing timing
35+
:param timeout: (optional) The amount of time in seconds that the client will wait for a response before timing
3636
out.
3737
"""
3838

3939
self.config = Config(url, api_key, timeout)
4040
self.http = HttpRequests(self.config)
4141

42+
try:
43+
self.programs()
44+
except Exception as e:
45+
logging.error(f"Failed to connect to Feldera API: {e}")
46+
raise e
47+
4248
def programs(self) -> list[Program]:
4349
"""
4450
Get all programs
@@ -381,6 +387,8 @@ def push_to_pipeline(
381387
array: bool = False,
382388
force: bool = False,
383389
update_format: str = "raw",
390+
json_flavor: str = None,
391+
serialize: bool = True,
384392
):
385393
"""
386394
Insert data into a pipeline
@@ -394,8 +402,10 @@ def push_to_pipeline(
394402
:param force: If True, the data will be inserted even if the pipeline is paused
395403
:param update_format: JSON data change event format, used in conjunction with the "json" format,
396404
the default value is "insert_delete", other supported formats: "weighted", "debezium", "snowflake", "raw"
397-
405+
:param json_flavor: JSON encoding used for individual table records, the default value is "default", other supported encodings:
406+
"debezium_mysql", "snowflake", "kafka_connect_json_converter", "pandas"
398407
:param data: The data to insert
408+
:param serialize: If True, the data will be serialized to JSON. True by default
399409
"""
400410

401411
if format not in ["json", "csv"]:
@@ -404,6 +414,9 @@ def push_to_pipeline(
404414
if update_format not in ["insert_delete", "weighted", "debezium", "snowflake", "raw"]:
405415
raise ValueError("update_format must be one of 'insert_delete', 'weighted', 'debezium', 'snowflake', 'raw'")
406416

417+
if json_flavor is not None and json_flavor not in ["default", "debezium_mysql", "snowflake", "kafka_connect_json_converter", "pandas"]:
418+
raise ValueError("json_flavor must be one of 'default', 'debezium_mysql', 'snowflake', 'kafka_connect_json_converter', 'pandas'")
419+
407420
# python sends `True` which isn't accepted by the backend
408421
array = _prepare_boolean_input(array)
409422
force = _prepare_boolean_input(force)
@@ -417,6 +430,9 @@ def push_to_pipeline(
417430
params["array"] = array
418431
params["update_format"] = update_format
419432

433+
if json_flavor is not None:
434+
params["json_flavor"] = json_flavor
435+
420436
content_type = "application/json"
421437

422438
if format == "csv":
@@ -428,6 +444,7 @@ def push_to_pipeline(
428444
params=params,
429445
content_type=content_type,
430446
body=data,
447+
serialize=serialize,
431448
)
432449

433450
def listen_to_pipeline(
@@ -493,4 +510,4 @@ def listen_to_pipeline(
493510
if end and time.time() > end:
494511
break
495512
if chunk:
496-
yield json.loads(chunk)
513+
yield json.loads(chunk)

python/feldera/sql_context.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from typing import Optional, Dict, Callable
66

7+
import pandas as pd
78
from typing_extensions import Self
89
from queue import Queue
910

@@ -18,9 +19,9 @@
1819
from feldera._callback_runner import CallbackRunner, _CallbackRunnerInstruction
1920
from feldera._helpers import ensure_dataframe_has_columns
2021
from feldera.formats import JSONFormat, CSVFormat, AvroFormat
21-
from feldera._helpers import validate_connector_input_format
2222
from feldera.resources import Resources
2323
from feldera.enums import BuildMode, CompilationProfile
24+
from feldera._helpers import validate_connector_input_format, chunk_dataframe
2425

2526

2627
def _table_name_from_sql(ddl: str) -> str:
@@ -72,7 +73,7 @@ def __init__(
7273
# TODO: to be used for schema inference
7374
self.todo_tables: Dict[str, Optional[SQLTable]] = {}
7475

75-
self.http_input_buffer: list[Dict[str, dict | list[dict] | str]] = []
76+
self.http_input_buffer: list[Dict[str, pd.DataFrame]] = []
7677

7778
# buffer that stores all input connectors to be created
7879
# this is a Mapping[table_name -> list[Connector]]
@@ -173,7 +174,16 @@ def __push_http_inputs(self):
173174

174175
for input_buffer in self.http_input_buffer:
175176
for tbl_name, data in input_buffer.items():
176-
self.client.push_to_pipeline(self.pipeline_name, tbl_name, "json", data, array=True)
177+
for datum in chunk_dataframe(data):
178+
self.client.push_to_pipeline(
179+
self.pipeline_name,
180+
tbl_name,
181+
"json",
182+
datum.to_json(orient='records', date_format='epoch'),
183+
json_flavor='pandas',
184+
array=True,
185+
serialize=False
186+
)
177187

178188
self.http_input_buffer.clear()
179189

@@ -273,7 +283,7 @@ def connect_source_pandas(self, table_name: str, df: pandas.DataFrame):
273283

274284
if tbl:
275285
# tbl.validate_schema(df) TODO: something like this would be nice
276-
self.http_input_buffer.append({tbl.name: df.to_dict('records')})
286+
self.http_input_buffer.append({tbl.name: df})
277287
return
278288

279289
tbl = self.todo_tables.get(table_name)

python/tests/test_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,18 +135,18 @@ def test_listen_to_pipeline(self):
135135
name = str(uuid.uuid4())
136136
self.test_create_pipeline(name, False)
137137

138-
TEST_CLIENT.start_pipeline(name)
138+
TEST_CLIENT.pause_pipeline(name)
139139

140140
t1 = threading.Thread(target=self.__listener, args=(name,))
141141
t1.start()
142142

143+
TEST_CLIENT.start_pipeline(name)
143144
TEST_CLIENT.push_to_pipeline(name, "tbl", "csv", data)
144145

145146
t1.join()
146147

147148
assert self.result
148149

149-
TEST_CLIENT.pause_pipeline(name)
150150
TEST_CLIENT.shutdown_pipeline(name)
151151
TEST_CLIENT.delete_pipeline(name)
152152

0 commit comments

Comments
 (0)