Skip to content

Commit 0e24687

Browse files
abhizer and ryzhyk authored
python sdk for feldera (#1745)
* py: python wrapper around the rest api client Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * test: get_pipeline_stats and get_pipeline_config Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * py: higher level abstraction for the REST client Intends to make creating a program, pipeline and overall experience of interacting with feldera via python easier. Has integrations with the pandas DataFrame to make them easy to use with Feldera. This is the first version with support for notebook only use cases. Uses HTTP to send / receive data from feldera. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * docs: remove python sdk's auto generated docs Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * py: use pipeline_complete flag in run_to_completion Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * py: add pyproject.toml Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * py: remove built-in dependencies from pyproject Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * py: support for delta table connectors * SQLContext now has `from_delta_table`, `to_delta_table` methods to connect to delta table * also buffer chunks while listening to http egress connector until we have valid json * OutputHandler.to_pandas() now also creates a column `insert_delete` which preserves that information Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * py: default max buffer size for delta table output connector Co-authored-by: Leonid Ryzhyk <ryzhyk@gmail.com> Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * py: fixes based on suggestions * run_to_completion now starts the pipeline in paused state, and resumes it when output handlers have been created, once it has ran to completion, the pipeline gets shutdown Signed-off-by: Abhinav Gyawali 
<22275402+abhizer@users.noreply.github.com> * py: rename methods from_source* to connect_sink* * input_from_pandas -> connect_source_pandas * from_delta_table -> connect_source_delta_table * to_delta_table -> connect_sink_delta_table Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> --------- Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> Co-authored-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 2e702ce commit 0e24687

26 files changed

+1919
-0
lines changed

python/.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.coverage
2+
__pycache__
3+
test.log
4+
.idea

python/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Feldera Python SDK
2+
3+
Feldera Python is the Feldera SDK for Python developers.

python/feldera/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
__pycache__

python/feldera/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from feldera.rest.client import Client as FelderaClient
2+
from feldera.sql_context import SQLContext
3+
from feldera.sql_schema import SQLSchema

python/feldera/_sql_table.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from typing import Optional
2+
from feldera.sql_schema import SQLSchema
3+
4+
5+
class SQLTable:
    """
    Describes a SQL table by name, defined either through a raw DDL string
    or through a `SQLSchema` from which the DDL is generated.
    """

    name: str
    # NOTE: may be None when the table is described by `schema` instead.
    ddl: str
    schema: Optional[SQLSchema]

    def __init__(self, name: str, ddl: Optional[str] = None, schema: Optional[SQLSchema] = None):
        """
        :param name: The name of the table.
        :param ddl: The DDL statement for the table, if it is given verbatim.
        :param schema: The schema to build the DDL from, if no DDL is given.
        :raises ValueError: If neither `ddl` nor `schema` is provided.
        """
        if schema is None and ddl is None:
            raise ValueError("Either ddl or schema must be provided")

        self.name = name
        self.ddl = ddl
        self.schema = schema

    def build_ddl(self):
        """
        Return the DDL for this table.

        Without a schema the stored `ddl` is returned unchanged; with a schema
        the result of `schema.build_ddl(name)` is returned.

        :raises ValueError: If both `ddl` and `schema` are set, since it is
            then ambiguous which one defines the table.
        """
        if self.schema is not None:
            if self.ddl is not None:
                raise ValueError("Both ddl and schema are provided")
            return self.schema.build_ddl(self.name)

        return self.ddl

python/feldera/output_handler.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import pandas as pd
2+
3+
4+
from threading import Thread
5+
from queue import Queue, Empty
6+
from feldera import FelderaClient
7+
from enum import Enum
8+
9+
10+
class _OutputHandlerInstruction(Enum):
11+
PipelineStarted = 1
12+
RanToCompletion = 2
13+
14+
15+
class OutputHandler(Thread):
16+
client: FelderaClient
17+
pipeline_name: str
18+
view_name: str
19+
queue: Queue
20+
buffer: list[list[dict]] = []
21+
22+
def __init__(self, client: FelderaClient, pipeline_name: str, view_name: str, queue: Queue):
23+
super().__init__()
24+
self.client = client
25+
self.pipeline_name = pipeline_name
26+
self.view_name = view_name
27+
self.queue = queue
28+
29+
def run(self):
30+
"""
31+
The main loop of the thread. It listens to the pipeline and appends the data to the buffer.
32+
Doesn't do integration, just takes the data and ignores if they are `insert`s or `delete`s.
33+
"""
34+
35+
ack: _OutputHandlerInstruction = self.queue.get()
36+
37+
match ack:
38+
case _OutputHandlerInstruction.PipelineStarted:
39+
gen_obj = self.client.listen_to_pipeline(self.pipeline_name, self.view_name, format="json")
40+
self.queue.task_done()
41+
42+
for chunk in gen_obj:
43+
chunk: dict = chunk
44+
data: list[dict] = chunk.get("json_data")
45+
46+
if data:
47+
self.buffer.append(data)
48+
49+
try:
50+
again_ack: _OutputHandlerInstruction = self.queue.get(block=False)
51+
if again_ack:
52+
match again_ack:
53+
case _OutputHandlerInstruction.RanToCompletion:
54+
self.queue.task_done()
55+
return
56+
case _OutputHandlerInstruction.PipelineStarted:
57+
self.queue.task_done()
58+
continue
59+
60+
except Empty:
61+
continue
62+
63+
case _OutputHandlerInstruction.RanToCompletion:
64+
self.queue.task_done()
65+
return
66+
67+
def to_pandas(self):
68+
"""
69+
Converts the output of the pipeline to a pandas DataFrame
70+
"""
71+
self.join()
72+
return pd.DataFrame([
73+
{**item['insert'], 'insert_delete': 1} if 'insert' in item else {**item['delete'], 'insert_delete': -1}
74+
for sublist in self.buffer for item in sublist
75+
])

python/feldera/rest/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from feldera.rest.client import Client
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import logging
2+
3+
from feldera.rest.config import Config
4+
5+
from feldera.rest.errors import FelderaAPIError, FelderaTimeoutError, FelderaCommunicationError
6+
7+
import json
8+
import requests
9+
from typing import Callable, Optional, Any, Union, Mapping, Sequence, List
10+
11+
12+
class HttpRequests:
    """
    Small helper around the `requests` module that sends HTTP requests to the
    configured API endpoint and converts transport and HTTP failures into the
    SDK's own error types.
    """

    def __init__(self, config: Config) -> None:
        """
        :param config: Provides `url`, `version`, `timeout` and the optional `api_key`.
        """
        self.config = config
        self.headers = {
            "User-Agent": "feldera-python-sdk/v1"
        }
        if self.config.api_key:
            self.headers["Authorization"] = f"Bearer {self.config.api_key}"

    @staticmethod
    def _encode_body(body: Any) -> Any:
        """
        Encode a request body for the `data=` argument of `requests`.

        `bytes` are sent as-is and any truthy body is JSON-encoded. An empty
        string is sent as an empty body; every other falsy value (`None`,
        `{}`, `[]`, ...) is sent as the JSON literal `null`.
        """
        if isinstance(body, bytes):
            return body
        if body:
            return json.dumps(body)
        return "" if body == "" else "null"

    def send_request(
            self,
            http_method: Callable,
            path: str,
            body: Optional[
                Union[Mapping[str, Any], Sequence[Mapping[str, Any]], List[str], str, bytes]
            ] = None,
            content_type: str = "application/json",
            params: Optional[Mapping[str, Any]] = None,
            stream: bool = False,
    ) -> Any:
        """
        :param http_method: The HTTP method to use. Takes the equivalent `requests.*` module. (Example: `requests.get`)
        :param path: The path to send the request to.
        :param body: The HTTP request body. Ignored for GET requests.
        :param content_type: The value for `Content-Type` HTTP header. "application/json" by default.
        :param params: The query parameters part of this request.
        :param stream: True if the response is expected to be a HTTP stream.
        :return: The raw `requests.Response` if `stream` is True (it is NOT checked for
            HTTP errors); otherwise the decoded JSON body, or the response object itself
            when the body is empty.
        :raises FelderaTimeoutError: If the request times out.
        :raises FelderaCommunicationError: If the connection fails.
        :raises FelderaAPIError: If a non-streaming response has an HTTP error status.
        """
        self.headers["Content-Type"] = content_type

        timeout = self.config.timeout
        headers = self.headers
        request_path = self.config.url + "/" + self.config.version + path

        # Never write the bearer token to the log.
        loggable_headers = {
            key: "<redacted>" if key == "Authorization" else value
            for key, value in headers.items()
        }
        logging.debug(
            "sending %s request to: %s with headers: %s, and params: %s",
            http_method.__name__, request_path, str(loggable_headers), str(params)
        )

        request_kwargs = {
            "timeout": timeout,
            "headers": headers,
            "params": params,
            "stream": stream,
        }
        # GET requests carry no body; every other method sends the encoded body.
        if http_method.__name__ != "get":
            request_kwargs["data"] = self._encode_body(body)

        try:
            request = http_method(request_path, **request_kwargs)

            if stream:
                # The caller consumes the stream; the status is not validated here.
                return request

            resp = self.__validate(request)
            logging.debug("got response: %s", str(resp))
            return resp

        except requests.exceptions.Timeout as err:
            raise FelderaTimeoutError(str(err)) from err
        except requests.exceptions.ConnectionError as err:
            raise FelderaCommunicationError(str(err)) from err

    def get(
            self,
            path: str,
            params: Optional[Mapping[str, Any]] = None
    ) -> Any:
        """
        Send a GET request to `path` with the given query parameters.
        """
        # `params` must be passed by keyword: the third positional parameter of
        # `send_request` is `body`, so a positional call would drop the query.
        return self.send_request(requests.get, path, params=params)

    def post(
            self,
            path: str,
            body: Optional[
                Union[Mapping[str, Any], Sequence[Mapping[str, Any]], List[str], str]
            ] = None,
            content_type: Optional[str] = "application/json",
            params: Optional[Mapping[str, Any]] = None,
            stream: bool = False,
    ) -> Any:
        """
        Send a POST request to `path`; see `send_request` for the parameters.
        """
        return self.send_request(
            requests.post, path, body=body, content_type=content_type, params=params, stream=stream
        )

    def patch(
            self,
            path: str,
            body: Optional[
                Union[Mapping[str, Any], Sequence[Mapping[str, Any]], List[str], str]
            ] = None,
            content_type: Optional[str] = "application/json",
            params: Optional[Mapping[str, Any]] = None
    ) -> Any:
        """
        Send a PATCH request to `path`; see `send_request` for the parameters.
        """
        return self.send_request(
            requests.patch, path, body=body, content_type=content_type, params=params
        )

    def put(
            self,
            path: str,
            body: Optional[
                Union[Mapping[str, Any], Sequence[Mapping[str, Any]], List[str], str]
            ] = None,
            content_type: Optional[str] = "application/json",
            params: Optional[Mapping[str, Any]] = None
    ) -> Any:
        """
        Send a PUT request to `path`; see `send_request` for the parameters.
        """
        return self.send_request(
            requests.put, path, body=body, content_type=content_type, params=params
        )

    def delete(
            self,
            path: str,
            body: Optional[Union[Mapping[str, Any], Sequence[Mapping[str, Any]], List[str]]] = None,
            params: Optional[Mapping[str, Any]] = None
    ) -> Any:
        """
        Send a DELETE request to `path`; see `send_request` for the parameters.
        """
        return self.send_request(requests.delete, path, body=body, params=params)

    @staticmethod
    def __to_json(request: requests.Response) -> Any:
        """
        Decode the response body as JSON, or return the response object
        itself when the body is empty.
        """
        if request.content == b"":
            return request
        return request.json()

    @staticmethod
    def __validate(request: requests.Response) -> Any:
        """
        Raise `FelderaAPIError` for an HTTP error status, otherwise return the
        decoded response (see `__to_json`).
        """
        try:
            request.raise_for_status()
            return HttpRequests.__to_json(request)
        except requests.exceptions.HTTPError as err:
            raise FelderaAPIError(str(err), request) from err
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import uuid
2+
from typing import Optional
3+
4+
5+
class AttachedConnector:
    """
    A connector that is attached to a pipeline.
    """

    name: str
    is_input: bool
    connector_name: str
    relation_name: str

    def __init__(self, connector_name: str, relation_name: str, is_input: bool, name: Optional[str] = None):
        """
        :param connector_name: The name of the connector being attached.
        :param relation_name: The name of the relation the connector is attached to.
        :param is_input: Whether this is an input connector.
        :param name: The name of this attachment. A random UUID string is used
            when it is omitted or empty.
        """
        if not name:
            name = str(uuid.uuid4())

        self.connector_name = connector_name
        self.relation_name = relation_name
        self.is_input = is_input
        self.name = name

    def to_json(self):
        """
        Return the attached connector as a plain dict with the keys `name`,
        `is_input`, `connector_name` and `relation_name`.
        """
        fields = ("name", "is_input", "connector_name", "relation_name")
        return {field: getattr(self, field) for field in fields}

0 commit comments

Comments
 (0)