Skip to content

Commit 4fee55f

Browse files
authored
py: kafka connector (#1807)
* py: kafka connector * also rename `Client` to `FelderaClient` Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> py: update the kafka test Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> py: update documentation * include an example for Kafka connector in the python docs * refactor `Client` to `FelderaClient` * add an entry in the changelog Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> py: update documentation Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * py: HTTP GET input connector (#1816) * py: HTTP GET input connector Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * py: refactor: common function to check format to create connector Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> * py: update docs, rename `UpdateFormat` to `JSONUpdateFormat` Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> --------- Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> --------- Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent a3bc757 commit 4fee55f

File tree

14 files changed

+507
-10
lines changed

14 files changed

+507
-10
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1919

2020
## [0.17.0] - 2024-05-28
2121

22+
- [Python] Added support for Kafka connector via Python SDK
23+
([#1807](https://github.com/feldera/feldera/pull/1807))
24+
- [Python] Added support for HTTP GET connector via Python SDK
25+
2226
### Added
2327

2428
- Added backpressure mode to the `/egress` endpoint, which applies

python/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ docs/feldera.rst
1010
docs/feldera.rest.rst
1111
build
1212
feldera.egg-info
13+
UNKNOWN.egg-info

python/docs/examples.rst

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,135 @@ to to call :meth:`.SQLContext.listen` before you call
6464
6565
# see the result
6666
print(df)
67+
68+
69+
Kafka
70+
******
71+
72+
To setup Kafka as the source use :meth:`.SQLContext.connect_source_kafka` and as the sink use
73+
:meth:`.SQLContext.connect_sink_kafka`.
74+
75+
Both of these methods require a ``config`` which is a dictionary, and ``fmt`` which is a
76+
`data format configuration <https://www.feldera.com/docs/api/json>`_ that is either a
77+
:class:`.JSONFormat` or :class:`.CSVFormat`.
78+
79+
The input config looks like the following:
80+
81+
.. highlight:: python
82+
.. code-block:: python
83+
84+
source_config = {
85+
"topics": [INPUT_TOPIC],
86+
"bootstrap.servers": KAFKA_SERVER_URL,
87+
"auto.offset.reset": "earliest",
88+
}
89+
90+
Here,
91+
92+
- ``topics`` is a list of Kafka topics to subscribe to for input data.
93+
- ``bootstrap.servers`` is the ``host:port`` of the Kafka server.
94+
- Similarly, other
95+
`relevant options supported by librdkafka <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_
96+
can also be set here, like: ``auto.offset.reset``
97+
98+
More on Kafka as an input connector at: https://www.feldera.com/docs/connectors/sources/kafka
99+
100+
Similarly, the output config looks like the following:
101+
102+
.. highlight:: python
103+
.. code-block:: python
104+
105+
sink_config = {
106+
"topic": OUTPUT_TOPIC,
107+
"bootstrap.servers": PIPELINE_TO_KAFKA_SERVER,
108+
"auto.offset.reset": "earliest",
109+
}
110+
111+
Here the only notable difference is:
112+
113+
- ``topic`` is the name of the topic to write the output data to.
114+
115+
More on Kafka as the output connector at: https://www.feldera.com/docs/connectors/sinks/kafka
116+
117+
.. warning::
118+
Kafka is a streaming data source, therefore running: :meth:`.SQLContext.run_to_completion` will run forever.
119+
120+
.. highlight:: python
121+
.. code-block:: python
122+
123+
from feldera import SQLContext, SQLSchema
124+
from feldera.formats import JSONFormat, JSONUpdateFormat
125+
126+
TABLE_NAME = "example"
127+
VIEW_NAME = "example_count"
128+
KAFKA_SERVER = "localhost:9092"
129+
130+
sql = SQLContext('kafka', 'http://localhost:8080').get_or_create()
131+
sql.register_table(TABLE_NAME, SQLSchema({"id": "INT NOT NULL PRIMARY KEY"}))
132+
sql.register_view(VIEW_NAME, f"SELECT COUNT(*) as num_rows FROM {TABLE_NAME}")
133+
134+
source_config = {
135+
"topics": ["example_topic"],
136+
"bootstrap.servers": KAFKA_SERVER,
137+
"auto.offset.reset": "earliest",
138+
}
139+
140+
sink_config = {
141+
"topic": "example_topic_out",
142+
"bootstrap.servers": KAFKA_SERVER,
143+
"auto.offset.reset": "earliest",
144+
}
145+
146+
# Data format configuration
147+
format = JSONFormat().with_update_format(JSONUpdateFormat.InsertDelete).with_array(False)
148+
149+
sql.connect_source_kafka(TABLE_NAME, "kafka_conn_in", source_config, format)
150+
sql.connect_sink_kafka(VIEW_NAME, "kafka_conn_out", sink_config, format)
151+
152+
out = sql.listen(VIEW_NAME)
153+
sql.start()
154+
time.sleep(10)
155+
sql.shutdown()
156+
df = out.to_pandas()
157+
158+
159+
HTTP GET
160+
*********
161+
162+
163+
Feldera can ingest data from a user-provided URL into a SQL table.
164+
The file is fetched using HTTP with the GET method.
165+
166+
More on the HTTP GET connector at: https://www.feldera.com/docs/connectors/sources/http-get
167+
168+
.. note::
169+
The JSON used as input for Feldera should be in
170+
`newline-delimited JSON (NDJSON) format <https://www.feldera.com/docs/api/json/#encoding-multiple-changes>`_.
171+
172+
173+
.. highlight:: python
174+
.. code-block:: python
175+
176+
from feldera import SQLContext, SQLSchema
177+
from feldera.formats import JSONFormat, JSONUpdateFormat
178+
179+
sql = SQLContext("test_http_get", TEST_CLIENT).get_or_create()
180+
181+
TBL_NAME = "items"
182+
VIEW_NAME = "s"
183+
184+
sql.register_table(TBL_NAME, SQLSchema({"id": "INT", "name": "STRING"}))
185+
186+
sql.register_view(VIEW_NAME, f"SELECT * FROM {TBL_NAME}")
187+
188+
path = "https://feldera-basics-tutorial.s3.amazonaws.com/part.json"
189+
190+
fmt = JSONFormat().with_update_format(JSONUpdateFormat.InsertDelete).with_array(False)
191+
sql.connect_source_url(TBL_NAME, "part", path, fmt)
192+
193+
out = sql.listen(VIEW_NAME)
194+
195+
sql.run_to_completion()
196+
197+
df = out.to_pandas()
198+

python/docs/introduction.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ Replace ``{BRANCH_NAME}`` with the name of the branch you want to install from.
2929
Key Concepts
3030
************
3131

32-
* :class:`feldera.FelderaClient` or :class:`.Client`
32+
* :class:`.FelderaClient`
3333
- This is the actual HTTP client used to make requests to your Feldera
3434
instance.
35-
- creating an instance of :class:`.Client` is usually the first thing you
35+
- creating an instance of :class:`.FelderaClient` is usually the first thing you
3636
will do while working with Feldera.
3737

3838
- Example:
@@ -62,7 +62,7 @@ Key Concepts
6262
6363
- The first parameter is the name of this SQL context. By default, this is
6464
the name used in both Feldera Program and Pipeline.
65-
- The second parameter here is :class:`.Client` that we created above.
65+
- The second parameter here is :class:`.FelderaClient` that we created above.
6666

6767
* :meth:`.SQLContext.run_to_completion`
6868
- Runs this Feldera pipeline to completion. Normally this means until the EoF

python/feldera/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
from feldera.rest.client import Client as FelderaClient
1+
from feldera.rest.feldera_client import FelderaClient
22
from feldera.sql_context import SQLContext
33
from feldera.sql_schema import SQLSchema

python/feldera/_helpers.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import pandas as pd
2+
from feldera.formats import JSONFormat, CSVFormat
23

34

45
def ensure_dataframe_has_columns(df: pd.DataFrame):
@@ -23,3 +24,11 @@ def dataframe_from_response(buffer: list[list[dict]]):
2324
{**item['insert'], 'insert_delete': 1} if 'insert' in item else {**item['delete'], 'insert_delete': -1}
2425
for sublist in buffer for item in sublist
2526
])
27+
28+
29+
def validate_connector_input_format(fmt: JSONFormat | CSVFormat):
30+
if not isinstance(fmt, JSONFormat) and not isinstance(fmt, CSVFormat):
31+
raise ValueError("format must be JSONFormat or CSVFormat")
32+
33+
if isinstance(fmt, JSONFormat) and fmt.config.get("update_format") is None:
34+
raise ValueError("update_format not set in the format config; consider using: .with_update_format()")

python/feldera/formats.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
from typing import Optional
2+
from typing_extensions import Self
3+
from enum import Enum
4+
5+
6+
class JSONUpdateFormat(Enum):
7+
"""
8+
Supported JSON data change event formats.
9+
10+
Each element in a JSON-formatted input stream specifies
11+
an update to one or more records in an input table. We support
12+
several different ways to represent such updates.
13+
14+
https://www.feldera.com/docs/api/json/#the-insertdelete-format
15+
"""
16+
17+
InsertDelete = 1
18+
"""
19+
Insert/delete format.
20+
21+
Each element in the input stream consists of an "insert" or "delete"
22+
command and a record to be inserted to or deleted from the input table.
23+
24+
Example: `{"insert": {"id": 1, "name": "Alice"}, "delete": {"id": 2, "name": "Bob"}}`
25+
Here, `id` and `name` are the columns in the table.
26+
"""
27+
28+
Raw = 2
29+
"""
30+
Raw input format.
31+
32+
This format is suitable for insert-only streams (no deletions).
33+
Each element in the input stream contains a record without any
34+
additional envelope that gets inserted in the input table.
35+
36+
Example: `{"id": 1, "name": "Alice"}`
37+
Here, `id` and `name` are the columns in the table.
38+
"""
39+
40+
def __str__(self):
41+
match self:
42+
case JSONUpdateFormat.InsertDelete:
43+
return "insert_delete"
44+
case JSONUpdateFormat.Raw:
45+
return "raw"
46+
47+
48+
class JSONFormat:
49+
"""
50+
Used to represent data ingested and output from Feldera in the JSON format.
51+
"""
52+
53+
def __init__(self, config: Optional[dict] = None):
54+
"""
55+
Creates a new JSONFormat instance.
56+
57+
:param config: Optional. Configuration for the JSON format.
58+
"""
59+
60+
self.config: dict = config or {
61+
"array": False,
62+
}
63+
64+
def with_update_format(self, update_format: JSONUpdateFormat) -> Self:
65+
"""
66+
Specifies the format of the data change events in the JSON data stream.
67+
"""
68+
69+
self.config["update_format"] = update_format.__str__()
70+
return self
71+
72+
def with_array(self, array: bool) -> Self:
73+
"""
74+
Set to `True` if updates in this stream are packaged into JSON arrays.
75+
76+
Example: `[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]`
77+
"""
78+
79+
self.config["array"] = array
80+
return self
81+
82+
def to_dict(self):
83+
"""
84+
Serialize to a dict to be used in the API request.
85+
86+
:meta private:
87+
"""
88+
return {
89+
"name": "json",
90+
"config": self.config
91+
}
92+
93+
94+
class CSVFormat:
95+
"""
96+
Used to represent data ingested and output from Feldera in the CSV format.
97+
98+
https://www.feldera.com/docs/api/csv
99+
"""
100+
101+
def to_dict(self) -> dict:
102+
"""
103+
Serialize to a dict to be used in the API request.
104+
105+
:meta private:
106+
"""
107+
return {
108+
"name": "csv"
109+
}

python/feldera/rest/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@
88
99
"""
1010

11-
from feldera.rest.client import Client
11+
from feldera.rest.feldera_client import FelderaClient

python/feldera/rest/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
class Config:
55
"""
6-
Client's credentials and configuration parameters
6+
:class:`.FelderaClient`'s credentials and configuration parameters
77
"""
88

99
def __init__(
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def _prepare_boolean_input(value: bool) -> str:
1515
return "true" if value else "false"
1616

1717

18-
class Client:
18+
class FelderaClient:
1919
"""
2020
A client for the Feldera HTTP API
2121

0 commit comments

Comments
 (0)