@@ -214,12 +214,8 @@ def add_connector(connector_name, relation_name, is_input):
214214 "transport" : {
215215 "name" : transport_type ,
216216 "config" : {
217- "auto.offset.reset" : "earliest" ,
218- "bootstrap.servers" : kafka_broker ,
219- "enable.ssl.certificate.verification" : "true" ,
220- "sasl.mechanism" : "PLAIN" ,
221- "security.protocol" : "PLAINTEXT" ,
222- }
217+ "auto.offset.reset" : "earliest"
218+ } | kafka_options
223219 },
224220 "format" : {
225221 "name" : "csv" ,
@@ -233,7 +229,7 @@ def add_connector(connector_name, relation_name, is_input):
233229 config ["topics" ] = [connector_name ]
234230 else :
235231 config ["topic" ] = connector_name
236- requests .put (f"{ api_url } /v0/connectors/{ connector_name } " , json = json ).raise_for_status ()
232+ requests .put (f"{ api_url } /v0/connectors/{ connector_name } " , headers = headers , json = json ).raise_for_status ()
237233 return {
238234 "connector_name" : connector_name ,
239235 "is_input" : is_input ,
@@ -248,18 +244,18 @@ def add_output_connector(connector_name, relation_name):
248244 return add_connector (connector_name , relation_name , False )
249245
250246def stop_pipeline (pipeline_name , wait ):
251- requests .post (f"{ api_url } /v0/pipelines/{ pipeline_name } /shutdown" ).raise_for_status ()
247+ requests .post (f"{ api_url } /v0/pipelines/{ pipeline_name } /shutdown" , headers = headers ).raise_for_status ()
252248 if wait :
253249 return wait_for_status (pipeline_name , "Shutdown" )
254250
255251def start_pipeline (pipeline_name , wait ):
256- requests .post (f"{ api_url } /v0/pipelines/{ pipeline_name } /start" ).raise_for_status ()
252+ requests .post (f"{ api_url } /v0/pipelines/{ pipeline_name } /start" , headers = headers ).raise_for_status ()
257253 if wait :
258254 return wait_for_status (pipeline_name , "Running" )
259255
260256def wait_for_status (pipeline_name , status ):
261257 start = time .time ()
262- while requests .get (f"{ api_url } /v0/pipelines/{ pipeline_name } " ).json ()["state" ]["current_status" ] != status :
258+ while requests .get (f"{ api_url } /v0/pipelines/{ pipeline_name } " , headers = headers ).json ()["state" ]["current_status" ] != status :
263259 time .sleep (.1 )
264260 return time .time () - start
265261
@@ -287,7 +283,9 @@ def main():
287283 description = 'Nexmark benchmark demo'
288284 )
289285 parser .add_argument ("--api-url" , required = True , help = "Feldera API URL (e.g., http://localhost:8080 )" )
290- parser .add_argument ("--kafka-broker" , required = True , help = "Kafka broker (e.g., localhost:9092 )" )
286+ parser .add_argument ("--api-key" , required = False , help = "Feldera API key (e.g., \" apikey:0123456789ABCDEF\" )" )
287+ parser .add_argument ("-O" , "--option" , action = 'append' , required = True ,
288+ help = "Kafka options passed as -O option=value, e.g., -O bootstrap.servers=localhost:9092" )
291289 parser .add_argument ("--cores" , type = int , help = "Number of cores to use for workers (default: 16)" )
292290 parser .add_argument ('--lateness' , action = argparse .BooleanOptionalAction , help = 'whether to use lateness for GC to save memory (default: --lateness)' )
293291 parser .add_argument ('--merge' , action = argparse .BooleanOptionalAction , help = 'whether to merge all the queries into one program (default: --no-merge)' )
@@ -300,9 +298,14 @@ def main():
300298 parser .add_argument ('--metrics-interval' , help = 'How often metrics should be sampled, in seconds (default: 1)' )
301299 parser .set_defaults (lateness = True , merge = False , storage = False , cores = 16 , metrics_interval = 1 )
302300
303- global api_url , kafka_broker
301+ global api_url , kafka_options , headers
304302 api_url = parser .parse_args ().api_url
305- kafka_broker = parser .parse_args ().kafka_broker
303+ api_key = parser .parse_args ().api_key
304+ headers = {} if api_key is None else {"authorization" : f"Bearer { api_key } " }
305+ kafka_options = {}
306+ for option_value in parser .parse_args ().option :
307+ option , value = option_value .split ("=" )
308+ kafka_options [option ] = value
306309 with_lateness = parser .parse_args ().lateness
307310 merge = parser .parse_args ().merge
308311 queries = sort_queries (parse_queries (parser .parse_args ().query ))
@@ -327,20 +330,23 @@ def main():
327330 for program_name in queries :
328331 # Create program
329332 program_sql = table_sql (with_lateness ) + QUERY_SQL [program_name ]
330- response = requests .put (f"{ api_url } /v0/programs/{ program_name } " , json = {
333+ response = requests .put (f"{ api_url } /v0/programs/{ program_name } " , headers = headers , json = {
331334 "description" : f"Nexmark benchmark: { program_name } " ,
332- "code" : program_sql
335+ "code" : program_sql ,
336+ "config" : {
337+ "profile" : "optimized"
338+ }
333339 })
334340 response .raise_for_status ()
335341 program_version = response .json ()["version" ]
336342
337343 # Compile program
338- requests .post (f"{ api_url } /v0/programs/{ program_name } /compile" , json = {"version" : program_version }).raise_for_status ()
339344
345+ requests .post (f"{ api_url } /v0/programs/{ program_name } /compile" , headers = headers , json = {"version" : program_version }).raise_for_status ()
340346 print (f"Compiling program(s)..." )
341347 for program_name in queries :
342348 while True :
343- status = requests .get (f"{ api_url } /v0/programs/{ program_name } " ).json ()["status" ]
349+ status = requests .get (f"{ api_url } /v0/programs/{ program_name } " , headers = headers ).json ()["status" ]
344350 print (f"Program { program_name } status: { status } " )
345351 if status == "Success" :
346352 break
@@ -357,9 +363,21 @@ def main():
357363 print ("Creating pipeline(s)..." )
358364 for program_name in queries :
359365 pipeline_name = program_name
360- requests .put (f"{ api_url } /v0/pipelines/{ pipeline_name } " , json = {
366+ requests .put (f"{ api_url } /v0/pipelines/{ pipeline_name } " , headers = headers , json = {
361367 "description" : "" ,
362- "config" : {"workers" : cores , "storage" : storage , "min_storage_rows" : min_storage_rows },
368+ "config" : {
369+ "workers" : cores ,
370+ "storage" : storage ,
371+ "min_storage_rows" : min_storage_rows ,
372+ "resources" : {
373+ # "cpu_cores_min": 0,
374+ # "cpu_cores_max": 16,
375+ # "memory_mb_min": 100,
376+ # "memory_mb_max": 32000,
377+ # "storage_mb_max": 128000,
378+ # "storage_class": "..."
379+ }
380+ },
363381 "program_name" : program_name ,
364382 "connectors" : input_connectors + [output_connectors [s ] for s in program_name .split (',' )],
365383 }).raise_for_status ()
@@ -389,7 +407,7 @@ def main():
389407 last_metrics = 0
390408 peak_memory = 0
391409 while True :
392- stats = requests .get (f"{ api_url } /v0/pipelines/{ pipeline_name } /stats" ).json ()
410+ stats = requests .get (f"{ api_url } /v0/pipelines/{ pipeline_name } /stats" , headers = headers ).json ()
393411 elapsed = time .time () - start
394412 if "global_metrics" in stats :
395413 global_metrics = stats ["global_metrics" ]
0 commit comments