Skip to content

Commit f4f33ff

Browse files
abhizer authored and ryzhyk committed
[py] support for UUID type
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent 6ca9834 commit f4f33ff

File tree

6 files changed

+51
-8
lines changed

6 files changed

+51
-8
lines changed

demo/project_demo08-DebeziumJDBC/run.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -334,7 +334,7 @@ def generate_inputs(pipeline: Pipeline):
334334
"f8": date_time.strftime("%F"),
335335
# "f9": list("bar".encode('utf-8')),
336336
"f10": {"id": i + batch, "f1": True, "f2": "foo", "f4": 10.5},
337-
"f11": str(uuid.uuid4())
337+
"f11": str(uuid.uuid4()),
338338
}
339339
)
340340
inserts = [{"insert": element} for element in data]

python/feldera/_helpers.py

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,5 @@
1+
import uuid
2+
13
import pandas as pd
24
from decimal import Decimal
35

@@ -8,6 +10,8 @@ def sql_type_to_pandas_type(sql_type: str):
810
"""
911

1012
match sql_type.upper():
13+
case "UUID":
14+
return None
1115
case "BOOLEAN":
1216
return "boolean"
1317
case "TINYINT":
@@ -64,6 +68,7 @@ def dataframe_from_response(buffer: list[list[dict]], schema: dict):
6468
pd_schema = {}
6569

6670
decimal_col = []
71+
uuid_col = []
6772

6873
for column in schema["fields"]:
6974
column_name = column["name"]
@@ -72,6 +77,8 @@ def dataframe_from_response(buffer: list[list[dict]], schema: dict):
7277
column_type = column["columntype"]["type"]
7378
if column_type == "DECIMAL":
7479
decimal_col.append(column_name)
80+
elif column_type == "UUID":
81+
uuid_col.append(column_name)
7582

7683
pd_schema[column_name] = sql_type_to_pandas_type(column_type)
7784

@@ -89,6 +96,12 @@ def dataframe_from_response(buffer: list[list[dict]], schema: dict):
8996
if datum[col] is not None:
9097
datum[col] = Decimal(datum[col])
9198

99+
if len(uuid_col) != 0:
100+
for datum in data:
101+
for col in uuid_col:
102+
if datum[col] is not None:
103+
datum[col] = uuid.UUID(datum[col])
104+
92105
df = pd.DataFrame(data)
93106
df = df.astype(pd_schema)
94107

python/feldera/rest/_httprequests.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,8 @@
1414

1515

1616
def json_serialize(body: Any) -> str:
17-
return json.dumps(body) if body else "" if body == "" else "null"
17+
# serialize as string if this object cannot be serialized (e.g. UUID)
18+
return json.dumps(body, default=str) if body else "" if body == "" else "null"
1819

1920

2021
class HttpRequests:

python/tests/test_pipeline.py

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -17,12 +17,15 @@ def test_delete_all_pipelines(self):
1717
for pipeline in pipelines:
1818
TEST_CLIENT.delete_pipeline(pipeline.name)
1919

20-
def test_create_pipeline(self, name: str = "blah", delete=False):
20+
def test_create_pipeline(
21+
self, name: str = "blah", delete=False, runtime_config: dict = {}
22+
):
23+
self.test_delete_all_pipelines()
2124
sql = """
2225
CREATE TABLE tbl(id INT) WITH ('append_only' = 'true');
2326
CREATE VIEW V AS SELECT * FROM tbl;
2427
"""
25-
pipeline = Pipeline(name, sql, "", "", {}, {})
28+
pipeline = Pipeline(name, sql, "", "", {}, runtime_config)
2629
pipeline = TEST_CLIENT.create_pipeline(pipeline)
2730

2831
if delete:
@@ -84,12 +87,12 @@ def test_get_pipeline(self):
8487

8588
def test_get_pipeline_config(self):
8689
name = str(uuid.uuid4())
87-
self.test_create_pipeline(name, False)
90+
self.test_create_pipeline(name, False, {"workers": 2, "storage": False})
8891
config = TEST_CLIENT.get_runtime_config(name)
8992

9093
assert config is not None
91-
assert config.get("workers") is not None
92-
assert config.get("storage") is not None
94+
assert config.get("workers") == 2
95+
assert not config.get("storage")
9396

9497
TEST_CLIENT.delete_pipeline(name)
9598

python/tests/test_pipeline_builder.py

Lines changed: 26 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,8 @@
11
import os
22
import time
33
import unittest
4+
import uuid
5+
46
import pandas as pd
57
from kafka import KafkaProducer, KafkaConsumer
68
from kafka.admin import KafkaAdminClient, NewTopic
@@ -1057,6 +1059,30 @@ def test_connector_orchestration(self):
10571059
pipeline.shutdown()
10581060
pipeline.delete()
10591061

1062+
def test_uuid(self):
1063+
name = "test_uuid"
1064+
sql = """
1065+
CREATE TABLE t0(c0 UUID);
1066+
CREATE MATERIALIZED VIEW v0 AS SELECT c0 FROM t0;
1067+
"""
1068+
1069+
pipeline = PipelineBuilder(TEST_CLIENT, name, sql=sql).create_or_replace()
1070+
out = pipeline.listen("v0")
1071+
pipeline.start()
1072+
1073+
data = [{"c0": uuid.uuid4()}]
1074+
pipeline.input_json("t0", data)
1075+
1076+
for datum in data:
1077+
datum.update({"insert_delete": 1})
1078+
1079+
pipeline.wait_for_completion(True)
1080+
1081+
got = out.to_dict()
1082+
pipeline.shutdown()
1083+
1084+
assert got == data
1085+
10601086

10611087
if __name__ == "__main__":
10621088
unittest.main()

python/uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)