Skip to content

Commit 6527a27

Browse files
abhizer and ryzhyk authored
py: refactor python api to use PipelineBuilder (#2177)
* py: refactor python api to use PipelineBuilder

* removes the concept of SQLContext
* introduces a PipelineBuilder, used to create a new pipeline or connect to existing ones

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

* py: use newer PipelineBuilder api in demos & docs

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

* py: add `Pipeline.restart()` method

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

* py: remove PipelineBuilder.with_xxx, use constructor instead

* add Pipeline.get(name, client)

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

* py: fix python tests

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

* demo: fix SQL error in debezium postgres demo

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

* demos: remove ipynb checkpoints

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

---------

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
Co-authored-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 6343623 commit 6527a27

File tree

23 files changed

+596
-653
lines changed

23 files changed

+596
-653
lines changed

demo/project_demo08-DebeziumJDBC/run.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import psycopg
1717
import json
1818
import random
19-
from feldera import SQLContext, FelderaClient
19+
from feldera import PipelineBuilder, FelderaClient, Pipeline
2020

2121
# File locations
2222
SCRIPT_DIR = os.path.join(os.path.dirname(__file__))
@@ -204,9 +204,8 @@ def build_sql(pipeline_to_redpanda_server: str) -> str:
204204

205205
def create_feldera_pipeline(api_url: str, pipeline_to_redpanda_server: str, start_pipeline: bool):
206206
client = FelderaClient(api_url)
207-
208-
pipeline = SQLContext(PIPELINE_NAME, client, "Debezium JDBC sink connector demo")
209-
pipeline.sql(build_sql(pipeline_to_redpanda_server))
207+
sql = build_sql(pipeline_to_redpanda_server)
208+
pipeline = PipelineBuilder(client, name="Debezium JDBC sink connector demo", sql=sql).create_or_replace()
210209

211210
if start_pipeline:
212211
print("Starting the pipeline...")
@@ -215,7 +214,7 @@ def create_feldera_pipeline(api_url: str, pipeline_to_redpanda_server: str, star
215214

216215
return pipeline
217216

218-
def generate_inputs(pipeline: SQLContext):
217+
def generate_inputs(pipeline: Pipeline):
219218
print("Generating records...")
220219
date_time = datetime.datetime(2024, 1, 30, 8, 58)
221220

demo/project_demo10-FraudDetectionDeltaLake/notebook.ipynb

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@
249249
"source": [
250250
"import pandas as pd\n",
251251
"import json\n",
252-
"from feldera import SQLContext, FelderaClient\n",
252+
"from feldera import PipelineBuilder, FelderaClient\n",
253253
"\n",
254254
"DATA_URI = \"s3://feldera-fraud-detection-data\"\n",
255255
"\n",
@@ -295,9 +295,7 @@
295295
"client = FelderaClient(\"http://localhost:8080\")\n",
296296
"\n",
297297
"sql = build_program(json.dumps(transactions_connectors), json.dumps(demographics_connectors), '[]')\n",
298-
"pipeline = SQLContext(\"fraud_detection_training\", client)\n",
299-
"pipeline.sql(sql)\n",
300-
"\n",
298+
"pipeline = PipelineBuilder(client, name=\"fraud_detection_training\", sql=sql).create_or_replace()\n",
301299
"hfeature = pipeline.listen(\"feature\")\n",
302300
"\n",
303301
"# Process full snapshot of the input tables and compute a dataset with feature vectors.\n",
@@ -435,8 +433,7 @@
435433
"# }]\n",
436434
"\n",
437435
"sql = build_program(json.dumps(transactions_connectors), json.dumps(demographics_connectors), '[]')\n",
438-
"pipeline = SQLContext(\"fraud_detection_inference\", client)\n",
439-
"pipeline.sql(sql)\n",
436+
"pipeline = PipelineBuilder(client, name=\"fraud_detection_inference\", sql=sql).create_or_replace()\n",
440437
"\n",
441438
"pipeline.foreach_chunk(\"feature\", lambda df, chunk : inference(trained_model, df))\n",
442439
"\n",
@@ -484,9 +481,9 @@
484481
],
485482
"metadata": {
486483
"kernelspec": {
487-
"display_name": "venv",
484+
"display_name": "Python 3 (ipykernel)",
488485
"language": "python",
489-
"name": "venv"
486+
"name": "python3"
490487
},
491488
"language_info": {
492489
"codemirror_mode": {
@@ -498,7 +495,7 @@
498495
"name": "python",
499496
"nbconvert_exporter": "python",
500497
"pygments_lexer": "ipython3",
501-
"version": "3.12.4"
498+
"version": "3.11.9"
502499
}
503500
},
504501
"nbformat": 4,

demo/project_demo10-FraudDetectionDeltaLake/run.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from IPython.display import display
2-
from feldera import SQLContext, FelderaClient
2+
from feldera import PipelineBuilder, FelderaClient
33
import pandas as pd
44
from xgboost import XGBClassifier
55
from sklearn.model_selection import train_test_split
@@ -122,8 +122,7 @@ def main():
122122

123123
sql = build_program(json.dumps(transactions_connectors), json.dumps(demographics_connectors), json.dumps(features_connectors))
124124

125-
pipeline = SQLContext("fraud_detection_training", client)
126-
pipeline.sql(sql)
125+
pipeline = PipelineBuilder(client, name="fraud_detection_training", sql=sql).create_or_replace()
127126

128127
hfeature = pipeline.listen("feature")
129128

@@ -199,8 +198,7 @@ def main():
199198

200199

201200
sql = build_program(json.dumps(transactions_connectors), json.dumps(demographics_connectors), json.dumps(features_connectors))
202-
pipeline = SQLContext("fraud_detection_inference", client)
203-
pipeline.sql(sql)
201+
pipeline = PipelineBuilder(client, name="fraud_detection_inference", sql=sql).create_or_replace()
204202

205203
pipeline.foreach_chunk("feature", lambda df, chunk : inference(trained_model, df))
206204

demo/project_demo11-Hopsworks/1_feature_pipeline.ipynb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
},
2222
"outputs": [],
2323
"source": [
24-
"from feldera import FelderaClient, SQLContext"
24+
"from feldera import FelderaClient, PipelineBuilder"
2525
]
2626
},
2727
{
@@ -62,7 +62,7 @@
6262
"# Use local Feldera instance\n",
6363
"client = FelderaClient(\"http://localhost:8080\")\n",
6464
"\n",
65-
"pipeline = SQLContext(\"hopsworks_kafka\", client)"
65+
"pbuilder = PipelineBuilder(client).with_name(\"hopsworks_kafka\")"
6666
]
6767
},
6868
{
@@ -376,7 +376,7 @@
376376
" }\n",
377377
"})\n",
378378
"\n",
379-
"pipeline.sql(build_sql(transaction_source_config, combined_sink_config, windowed_sink_config))"
379+
"pipeline = pbuilder.with_sql(build_sql(transaction_source_config, combined_sink_config, windowed_sink_config)).create_or_replace()"
380380
]
381381
},
382382
{
@@ -579,9 +579,9 @@
579579
],
580580
"metadata": {
581581
"kernelspec": {
582-
"display_name": "venv",
582+
"display_name": "Python 3 (ipykernel)",
583583
"language": "python",
584-
"name": "venv"
584+
"name": "python3"
585585
},
586586
"language_info": {
587587
"codemirror_mode": {
@@ -593,7 +593,7 @@
593593
"name": "python",
594594
"nbconvert_exporter": "python",
595595
"pygments_lexer": "ipython3",
596-
"version": "3.12.4"
596+
"version": "3.11.9"
597597
}
598598
},
599599
"nbformat": 4,
Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
from feldera import FelderaClient, SQLContext
2-
from feldera.formats import JSONFormat, JSONUpdateFormat
1+
from feldera import FelderaClient, PipelineBuilder
2+
from feldera.runtime_config import RuntimeConfig
33
import config
44
from sql_program import generate_program
55
import time
66

77

88
client = FelderaClient("http://localhost:8080")
9-
sql = SQLContext("mil", client, workers=10, storage=False)
109

1110
code = generate_program(
1211
{
@@ -18,23 +17,30 @@
1817
"poller_threads": 12,
1918
},
2019
},
21-
JSONFormat().with_array(False).with_update_format(JSONUpdateFormat.Raw).to_dict(),
20+
{
21+
"name": "json",
22+
"config": {
23+
"update_format": "raw",
24+
"array": False
25+
}
26+
}
2227
)
2328

24-
sql.sql(code)
29+
runtime_config = RuntimeConfig(storage=False, workers=10)
30+
pipeline = PipelineBuilder(client, name="mil", sql=code, runtime_config=runtime_config).create_or_replace()
2531

2632
print("Starting Feldera Pipeline")
27-
sql.start()
33+
pipeline.start()
2834
print("Pipeline started")
29-
# sql.foreach_chunk("user_agg", lambda df, chunk: print(df))
35+
# pipeline.foreach_chunk("user_agg", lambda df, chunk: print(df))
3036

3137
start_time = time.time()
3238

33-
sql.wait_for_idle(idle_interval_s=1)
39+
pipeline.wait_for_idle(idle_interval_s=1)
3440

3541
end_time = time.time()
3642
elapsed = end_time - start_time
3743

3844
print(f"Pipeline finished in {elapsed}, shutting down...")
3945

40-
sql.shutdown()
46+
pipeline.shutdown()

demo/project_demo12-HopsworksTikTokRecSys/sql_program.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,6 @@ def generate_program(transport_cfg, format_cfg):
6464

6565

6666
if __name__ == "__main__":
67-
from feldera.formats import JSONUpdateFormat, JSONFormat
68-
69-
in_fmt = JSONFormat().with_array(False).with_update_format(JSONUpdateFormat.Raw)
70-
7167
print(
7268
generate_program(
7369
{
@@ -79,6 +75,12 @@ def generate_program(transport_cfg, format_cfg):
7975
"poller_threads": 12,
8076
},
8177
},
82-
in_fmt.to_dict(),
78+
{
79+
"name": "json",
80+
"config": {
81+
"update_format": "raw",
82+
"array": False,
83+
}
84+
}
8385
)
8486
)

demo/project_demo12-HopsworksTikTokRecSys/test.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,34 @@
11
from datetime import datetime
2-
from feldera.formats import JSONFormat, JSONUpdateFormat
32
from sql_program import generate_program
4-
from feldera import SQLContext, FelderaClient
3+
from feldera import PipelineBuilder, Pipeline, FelderaClient
4+
from feldera.runtime_config import RuntimeConfig
55
import pandas as pd
66
from typing import Dict, Any, List
77

88

9-
def process_input(sql: SQLContext, data: List[Dict[str, Any]]):
10-
sql.input_pandas("interactions", pd.DataFrame(data))
11-
sql.wait_for_completion(shutdown=False)
9+
def process_input(p: Pipeline, data: List[Dict[str, Any]]):
10+
p.input_pandas("interactions", pd.DataFrame(data))
11+
p.wait_for_completion(shutdown=False)
1212

1313

14-
client = FelderaClient("http://localhost:8080")
15-
sql = SQLContext("tiktok_test", client, workers=10, storage=False)
1614

1715
code = generate_program(None, None)
18-
sql.sql(code)
1916

20-
# sql.foreach_chunk("user_agg", lambda df, chunk : print(df))
17+
client = FelderaClient("http://localhost:8080")
18+
config = RuntimeConfig(workers=10, storage=False)
19+
pipeline = PipelineBuilder(client, name="tiktok_test", sql=code, runtime_config=config).create_or_replace()
20+
21+
# pipeline.foreach_chunk("user_agg", lambda df, chunk : print(df))
2122

2223
print("Starting Feldera Pipeline")
23-
sql.start()
24+
pipeline.start()
2425
print("Pipeline started")
2526

26-
hvideo_agg = sql.listen("video_agg")
27-
huser_agg = sql.listen("user_agg")
27+
hvideo_agg = pipeline.listen("video_agg")
28+
huser_agg = pipeline.listen("user_agg")
2829

2930
process_input(
30-
sql,
31+
pipeline,
3132
[
3233
{
3334
"interaction_id": 1,
@@ -63,7 +64,7 @@ def process_input(sql: SQLContext, data: List[Dict[str, Any]]):
6364
)
6465

6566
process_input(
66-
sql,
67+
pipeline,
6768
[
6869
{
6970
"interaction_id": 3,
@@ -99,7 +100,7 @@ def process_input(sql: SQLContext, data: List[Dict[str, Any]]):
99100
)
100101

101102
process_input(
102-
sql,
103+
pipeline,
103104
[
104105
{
105106
"interaction_id": 5,
@@ -141,4 +142,4 @@ def process_input(sql: SQLContext, data: List[Dict[str, Any]]):
141142
print(huser_agg.to_pandas())
142143

143144
print("Success")
144-
sql.shutdown()
145+
pipeline.shutdown()

demo/project_demo13-DebeziumPostgres/project.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ CREATE TABLE customers (
66
first_name varchar(255) NOT NULL,
77
last_name varchar(255) NOT NULL,
88
email varchar(255) NOT NULL
9-
) with
9+
) with (
1010
'materialized' = 'true',
1111
'connectors' = '[{
1212
"transport": {
@@ -24,7 +24,7 @@ CREATE TABLE customers (
2424
"json_flavor": "debezium_mysql"
2525
}
2626
}
27-
}]';
27+
}]');
2828

2929
CREATE TABLE orders (
3030
id int NOT NULL,

demo/project_demo13-DebeziumPostgres/run.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import requests
1212
import argparse
1313
import json
14-
from feldera import FelderaClient, SQLContext
14+
from feldera import FelderaClient, PipelineBuilder
1515
from kafka.admin import KafkaAdminClient
1616
from kafka.errors import UnknownTopicOrPartitionError
1717

@@ -114,11 +114,11 @@ def prepare_feldera_pipeline(api_url, kafka_url):
114114

115115
pipeline_name = "demo-debezium-postgres-pipeline"
116116
client = FelderaClient(api_url)
117-
pipeline = SQLContext(pipeline_name, client)
118117
sql = open(PROJECT_SQL).read().replace("[REPLACE-BOOTSTRAP-SERVERS]", kafka_url)
119118

120119
print("Starting pipeline...")
121-
pipeline.sql(sql)
120+
pipeline = PipelineBuilder(client, name=pipeline_name, sql=sql).create_or_replace()
121+
pipeline.start()
122122
print("Pipeline started")
123123

124124
if __name__ == "__main__":

docs/connectors/sinks/delta.md

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,3 @@ WITH (
7777
}
7878
}]'
7979
)
80-
81-
### Python SDK
82-
83-
See [Delta Lake output connector documentation](https://www.feldera.com/python/feldera.html#feldera.sql_context.SQLContext.connect_sink_delta_table)
84-
in the Python SDK.

0 commit comments

Comments (0)