11#! /usr/bin/python3
22
33import csv
4+ import json
45import os
56import sys
67import time
@@ -19,7 +20,7 @@ def load_queries(folder):
1920 queries [f .split ('.' )[0 ]] = file .read ()
2021 return queries
2122
22- def load_table (folder , with_lateness ):
23+ def load_table (folder , with_lateness , suffix ):
2324 p = os .path .join (FILE_DIR , folder + '/table.sql' )
2425 file = open (p , 'r' )
2526 text = file .read ()
@@ -29,11 +30,9 @@ def load_table(folder, with_lateness):
2930 i = line .find (table_start_string )
3031 if i >= 0 :
3132 inputs += [line [i + len (table_start_string ):].split (' ' )[0 ]]
32- lateness = ''
33- if with_lateness :
34- lateness = "LATENESS INTERVAL 4 SECONDS"
35- text = text .replace ('{lateness}' , lateness )
36- return [text , inputs ]
33+ subst = {input : make_connector (input , suffix ) for input in inputs }
34+ subst ["lateness" ] = "LATENESS INTERVAL 4 SECONDS"
35+ return text .format (** subst )
3736
3837def sort_queries (queries ):
3938 return sorted (queries , key = lambda q : int (q [1 :]))
@@ -60,46 +59,28 @@ def parse_queries(all_queries, arg):
6059
6160 return queries
6261
63- def add_connector (connector_name , relation_name , is_input ):
64- transport_type = "kafka_" + ("input" if is_input else "output" )
65- json = {
66- "description" : "" ,
67- "config" : {
68- "transport" : {
69- "name" : transport_type ,
70- "config" : {
71- "auto.offset.reset" : "earliest"
72- } | kafka_options
73- },
74- "format" : {
75- "name" : "csv" ,
76- "config" : {}
77- }
78- }
79- }
80- config = json ["config" ]["transport" ]["config" ]
81- if is_input :
82- config ["enable.partition.eof" ] = "true"
83- config ["topics" ] = [connector_name ]
84- else :
85- config ["topic" ] = connector_name
86- requests .put (f"{ api_url } /v0/connectors/{ connector_name } " , headers = headers , json = json ).raise_for_status ()
87- return {
88- "connector_name" : connector_name ,
89- "is_input" : is_input ,
90- "name" : connector_name ,
91- "relation_name" : relation_name ,
62+ def make_connector (topic , suffix ):
63+ name = "kafka_input"
64+ config = {
65+ "topics" : [topic + suffix ],
66+ "enable.partition.eof" : "true" ,
67+ "auto.offset.reset" : "earliest"
9268 }
9369
70+ return json .dumps ([{
71+ "format" : {
72+ "name" : "csv" ,
73+ "config" : {},
74+ },
75+ "transport" : {
76+ "name" : name ,
77+ "config" : config | kafka_options
78+ }
79+ }], indent = 4 )
80+
9481def get_full_name (folder , name ):
9582 return folder .split ('/' )[- 1 ] + '-' + name
9683
97- def add_input_connector (connector_name , relation_name ):
98- return add_connector (connector_name , relation_name , True )
99-
100- def add_output_connector (connector_name , relation_name ):
101- return add_connector (connector_name , relation_name , False )
102-
10384def stop_pipeline (pipeline_name , wait ):
10485 requests .post (f"{ api_url } /v0/pipelines/{ pipeline_name } /shutdown" , headers = headers ).raise_for_status ()
10586 if wait :
@@ -112,7 +93,7 @@ def start_pipeline(pipeline_name, wait):
11293
11394def wait_for_status (pipeline_name , status ):
11495 start = time .time ()
115- while requests .get (f"{ api_url } /v0/pipelines/{ pipeline_name } " , headers = headers ).json ()["state" ][ "current_status " ] != status :
96+ while requests .get (f"{ api_url } /v0/pipelines/{ pipeline_name } " , headers = headers ).json ()["deployment_status " ] != status :
11697 time .sleep (.1 )
11798 return time .time () - start
11899
@@ -145,8 +126,6 @@ def main():
145126 help = "Kafka options passed as -O option=value, e.g., -O bootstrap.servers=localhost:9092" )
146127 parser .add_argument ("--cores" , type = int , help = "Number of cores to use for workers (default: 16)" )
147128 parser .add_argument ('--lateness' , action = argparse .BooleanOptionalAction , help = 'whether to use lateness for GC to save memory (default: --lateness)' )
148- parser .add_argument ('--output' , action = argparse .BooleanOptionalAction , help = 'whether to write query output back to Kafka (default: --no-output)' )
149- parser .add_argument ('--merge' , action = argparse .BooleanOptionalAction , help = 'whether to merge all the queries into one program (default: --no-merge)' )
150129 parser .add_argument ('--storage' , action = argparse .BooleanOptionalAction , help = 'whether to enable storage (default: --no-storage)' )
151130 parser .add_argument ("--poller-threads" , required = False , type = int , help = "Override number of poller threads to use" )
152131 parser .add_argument ('--min-storage-bytes' , type = int , help = 'If storage is enabled, the minimum number of bytes to write a batch to storage.' )
@@ -156,7 +135,7 @@ def main():
156135 parser .add_argument ('--csv' , help = 'File to write results in .csv format' )
157136 parser .add_argument ('--csv-metrics' , help = 'File to write pipeline metrics (memory, disk) in .csv format' )
158137 parser .add_argument ('--metrics-interval' , help = 'How often metrics should be sampled, in seconds (default: 1)' )
159- parser .set_defaults (lateness = True , output = False , merge = False , storage = False , cores = 16 , metrics_interval = 1 , folder = 'benchmarks/nexmark' )
138+ parser .set_defaults (lateness = True , storage = False , cores = 16 , metrics_interval = 1 , folder = 'benchmarks/nexmark' )
160139
161140 global api_url , kafka_options , headers
162141 api_url = parser .parse_args ().api_url
@@ -166,12 +145,12 @@ def main():
166145 for option_value in parser .parse_args ().option :
167146 option , value = option_value .split ("=" )
168147 kafka_options [option ] = value
169- with_lateness = parser .parse_args ().lateness
170- save_output = parser .parse_args ().output
171- merge = parser .parse_args ().merge
148+ suffix = parser .parse_args ().input_topic_suffix or ''
149+
172150 folder = parser .parse_args ().folder
151+ table = load_table (folder , parser .parse_args ().lateness , suffix )
173152 all_queries = load_queries (folder )
174- table , inputs = load_table ( folder , with_lateness )
153+
175154 queries = sort_queries (parse_queries (all_queries , parser .parse_args ().query ))
176155 cores = int (parser .parse_args ().cores )
177156 storage = parser .parse_args ().storage
@@ -181,65 +160,21 @@ def main():
181160 min_storage_bytes = parser .parse_args ().min_storage_bytes
182161 if min_storage_bytes is not None :
183162 min_storage_bytes = int (min_storage_bytes )
184- suffix = parser .parse_args ().input_topic_suffix or ''
185163 csvfile = parser .parse_args ().csv
186164 csvmetricsfile = parser .parse_args ().csv_metrics
187165 metricsinterval = float (parser .parse_args ().metrics_interval )
188166
189- output_connector_names = queries
190- if merge and len (queries ) > 1 :
191- merged_name = ',' .join (queries )
192- QUERY_SQL [merged_name ] = '\n ' .join ([QUERY_SQL [q ] for q in queries ])
193- queries = [merged_name ]
194-
195167 when = time .strftime ('%Y-%m-%d %H:%M:%S' , time .gmtime (time .time ()))
196168
169+ print ("Creating programs..." )
197170 for program_name in queries :
198171 # Create program
199172 full_name = get_full_name (folder , program_name )
200173 program_sql = table + all_queries [program_name ]
201- response = requests .put (f"{ api_url } /v0/programs/{ full_name } " , headers = headers , json = {
174+ requests .put (f"{ api_url } /v0/pipelines/{ full_name } " , headers = headers , json = {
175+ "name" : full_name ,
202176 "description" : f"Benchmark: { full_name } " ,
203- "code" : program_sql ,
204- "config" : {
205- "profile" : "optimized"
206- }
207- })
208- response .raise_for_status ()
209- program_version = response .json ()["version" ]
210-
211- # Compile program
212-
213- requests .post (f"{ api_url } /v0/programs/{ full_name } /compile" , headers = headers , json = {"version" : program_version }).raise_for_status ()
214- print (f"Compiling program(s)..." )
215- for program_name in queries :
216- full_name = get_full_name (folder , program_name )
217- while True :
218- status = requests .get (f"{ api_url } /v0/programs/{ full_name } " , headers = headers ).json ()["status" ]
219- print (f"Program { full_name } status: { status } " )
220- if status == "Success" :
221- break
222- elif status != "Pending" and status != "CompilingRust" and status != "CompilingSql" :
223- raise RuntimeError (f"Failed program compilation with status { status } " )
224- time .sleep (5 )
225-
226- input_connectors = [add_input_connector (s + suffix , s ) for s in inputs ]
227- if save_output :
228- output_connectors = {}
229- for name in output_connector_names :
230- output_connectors [name ] = add_output_connector (name , name )
231-
232- # Create pipelines
233- print ("Creating pipeline(s)..." )
234- for program_name in queries :
235- pipeline_name = get_full_name (folder , program_name )
236- if save_output :
237- connectors = input_connectors + [output_connectors [s ] for s in program_name .split (',' )]
238- else :
239- connectors = input_connectors
240- requests .put (f"{ api_url } /v0/pipelines/{ pipeline_name } " , headers = headers , json = {
241- "description" : "" ,
242- "config" : {
177+ "runtime_config" : {
243178 "workers" : cores ,
244179 "storage" : storage ,
245180 "min_storage_bytes" : min_storage_bytes ,
@@ -253,10 +188,22 @@ def main():
253188 # "storage_class": "..."
254189 }
255190 },
256- "program_name " : pipeline_name ,
257- "connectors " : connectors
191+ "program_config " : {} ,
192+ "program_code " : program_sql ,
258193 }).raise_for_status ()
259194
195+ print ("Compiling program(s)..." )
196+ for program_name in queries :
197+ full_name = get_full_name (folder , program_name )
198+ while True :
199+ status = requests .get (f"{ api_url } /v0/pipelines/{ full_name } " , headers = headers ).json ()["program_status" ]
200+ print (f"Program { full_name } status: { status } " )
201+ if status == "Success" :
202+ break
203+ elif status != "Pending" and status != "CompilingRust" and status != "CompilingSql" :
204+ raise RuntimeError (f"Failed program compilation with status { status } " )
205+ time .sleep (5 )
206+
260207 # Stop pipelines
261208 print ("Stopping pipeline(s)..." )
262209 for pipeline_name in queries :
0 commit comments