@@ -64,3 +64,135 @@ to call :meth:`.SQLContext.listen` before you call
6464
6565 # see the result
6666 print (df)
67+
68+
69+ Kafka
70+ ******
71+
72+ To set up Kafka as the source, use :meth:`.SQLContext.connect_source_kafka`, and as the sink, use
73+ :meth:`.SQLContext.connect_sink_kafka`.
74+
75+ Both of these methods require a ``config``, which is a dictionary, and a ``fmt``, which is a
76+ `data format configuration <https://www.feldera.com/docs/api/json>`_ that is either a
77+ :class:`.JSONFormat` or a :class:`.CSVFormat`.
78+
79+ The input config looks like the following:
80+
81+ .. highlight:: python
82+ .. code-block:: python
83+
84+ source_config = {
85+ " topics" : [INPUT_TOPIC ],
86+ " bootstrap.servers" : KAFKA_SERVER_URL ,
87+ " auto.offset.reset" : " earliest" ,
88+ }
89+
90+ Here,
91+
92+ - ``topics`` is a list of Kafka topics to subscribe to for input data.
93+ - ``bootstrap.servers`` is the ``host:port`` of the Kafka server.
94+ - Similarly, other
95+ `relevant options supported by librdkafka <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_
96+ can also be set here, such as ``auto.offset.reset``.
97+
98+ More on Kafka as an input connector at: https://www.feldera.com/docs/connectors/sources/kafka
99+
100+ Similarly, the output config looks like the following:
101+
102+ .. highlight:: python
103+ .. code-block:: python
104+
105+ sink_config = {
106+ " topic" : OUTPUT_TOPIC ,
107+ " bootstrap.servers" : PIPELINE_TO_KAFKA_SERVER ,
108+ " auto.offset.reset" : " earliest" ,
109+ }
110+
111+ Here the only notable difference is:
112+
113+ - ``topic`` is the name of the topic to write the output data to.
114+
115+ More on Kafka as the output connector at: https://www.feldera.com/docs/connectors/sinks/kafka
116+
117+ .. warning::
118+ Kafka is a streaming data source; therefore, calling :meth:`.SQLContext.run_to_completion` will run forever.
119+
120+ .. highlight:: python
121+ .. code-block:: python
122+
123+ from feldera import SQLContext, SQLSchema
124+ from feldera.formats import JSONFormat, JSONUpdateFormat
125+
126+ TABLE_NAME = " example"
127+ VIEW_NAME = " example_count"
128+ KAFKA_SERVER = " localhost:9092"
129+
130+ sql = SQLContext(' kafka' , ' http://localhost:8080' ).get_or_create()
131+ sql.register_table(TABLE_NAME , SQLSchema({" id" : " INT NOT NULL PRIMARY KEY" }))
132+ sql.register_view(VIEW_NAME , f " SELECT COUNT(*) as num_rows FROM { TABLE_NAME } " )
133+
134+ source_config = {
135+ " topics" : [" example_topic" ],
136+ " bootstrap.servers" : KAFKA_SERVER ,
137+ " auto.offset.reset" : " earliest" ,
138+ }
139+
140+ sink_config = {
141+ " topic" : " example_topic_out" ,
142+ " bootstrap.servers" : KAFKA_SERVER ,
143+ " auto.offset.reset" : " earliest" ,
144+ }
145+
146+ # Data format configuration
147+ format = JSONFormat().with_update_format(JSONUpdateFormat.InsertDelete).with_array(False )
148+
149+ sql.connect_source_kafka(TABLE_NAME , " kafka_conn_in" , source_config, format )
150+ sql.connect_sink_kafka(VIEW_NAME , " kafka_conn_out" , sink_config, format )
151+
152+ out = sql.listen(VIEW_NAME )
153+ sql.start()
154+ time.sleep(10 )
155+ sql.shutdown()
156+ df = out.to_pandas()
157+
158+
159+ HTTP GET
160+ *********
161+
162+
163+ Feldera can ingest data from a user-provided URL into a SQL table.
164+ The file is fetched using HTTP with the GET method.
165+
166+ More on the HTTP GET connector at: https://www.feldera.com/docs/connectors/sources/http-get
167+
168+ .. note::
169+ The JSON used as input for Feldera should be in
170+ `newline-delimited JSON (NDJSON) format <https://www.feldera.com/docs/api/json/#encoding-multiple-changes>`_.
171+
172+
173+ .. highlight :: python
174+ .. code-block :: python
175+
176+ from feldera import SQLContext, SQLSchema
177+ from feldera.formats import JSONFormat, JSONUpdateFormat
178+
179+ sql = SQLContext(" test_http_get" , TEST_CLIENT ).get_or_create()
180+
181+ TBL_NAME = " items"
182+ VIEW_NAME = " s"
183+
184+ sql.register_table(TBL_NAME , SQLSchema({" id" : " INT" , " name" : " STRING" }))
185+
186+ sql.register_view(VIEW_NAME , f " SELECT * FROM { TBL_NAME } " )
187+
188+ path = " https://feldera-basics-tutorial.s3.amazonaws.com/part.json"
189+
190+ fmt = JSONFormat().with_update_format(JSONUpdateFormat.InsertDelete).with_array(False )
191+ sql.connect_source_url(TBL_NAME , " part" , path, fmt)
192+
193+ out = sql.listen(VIEW_NAME )
194+
195+ sql.run_to_completion()
196+
197+ df = out.to_pandas()
198+
0 commit comments