@@ -66,16 +66,53 @@ class LeaderBoardRunner {
6666 def run (runner , TestScripts t , MobileGamingCommands mobileGamingCommands , boolean useStreamingEngine ) {
6767 t. intent(" Running: LeaderBoard example on DataflowRunner" +
6868 (useStreamingEngine ? " with Streaming Engine" : " " ))
69- t. run(" bq rm -f -t ${ t.bqDataset()} .leaderboard_DataflowRunner_user" )
70- t. run(" bq rm -f -t ${ t.bqDataset()} .leaderboard_DataflowRunner_team" )
69+
70+ def dataset = t. bqDataset()
71+ def userTable = " leaderboard_DataflowRunner_user"
72+ def teamTable = " leaderboard_DataflowRunner_team"
73+ def userSchema = [
74+ " user:STRING" ,
75+ " total_score:INTEGER" ,
76+ " processing_time:STRING"
77+ ]. join(" ," )
78+ def teamSchema = [
79+ " team:STRING" ,
80+ " total_score:INTEGER" ,
81+ " window_start:STRING" ,
82+ " processing_time:STRING" ,
83+ " timing:STRING"
84+ ]. join(" ," )
85+
86+ // Remove existing tables if they exist
87+ String tables = t. run(" bq query --use_legacy_sql=false 'SELECT table_name FROM ${ dataset} .INFORMATION_SCHEMA.TABLES'" )
88+
89+ if (tables. contains(userTable)) {
90+ t. run(" bq rm -f -t ${ dataset} .${ userTable} " )
91+ }
92+ if (tables. contains(teamTable)) {
93+ t. run(" bq rm -f -t ${ dataset} .${ teamTable} " )
94+ }
95+
7196 // It will take couple seconds to clean up tables.
7297 // This loop makes sure tables are completely deleted before running the pipeline
73- String tables = " "
74- while ({
98+ tables = t . run( " bq query --use_legacy_sql=false 'SELECT table_name FROM ${ dataset } .INFORMATION_SCHEMA.TABLES' " )
99+ while (tables . contains(userTable) || tables . contains(teamTable)) {
75100 sleep(3000 )
76- tables = t. run(" bq query SELECT table_id FROM ${ t.bqDataset()} .__TABLES_SUMMARY__" )
77- tables. contains(" leaderboard_${} _user" ) || tables. contains(" leaderboard_${ runner} _team" )
78- }());
101+ tables = t. run(" bq query --use_legacy_sql=false 'SELECT table_name FROM ${ dataset} .INFORMATION_SCHEMA.TABLES'" )
102+ }
103+
104+ t. intent(" Creating table: ${ userTable} " )
105+ t. run(" bq mk --table ${ dataset} .${ userTable} ${ userSchema} " )
106+ t. intent(" Creating table: ${ teamTable} " )
107+ t. run(" bq mk --table ${ dataset} .${ teamTable} ${ teamSchema} " )
108+
109+ // Verify that the tables have been created successfully
110+ tables = t. run(" bq query --use_legacy_sql=false 'SELECT table_name FROM ${ dataset} .INFORMATION_SCHEMA.TABLES'" )
111+ while (! tables. contains(userTable) || ! tables. contains(teamTable)) {
112+ sleep(3000 )
113+ tables = t. run(" bq query --use_legacy_sql=false 'SELECT table_name FROM ${ dataset} .INFORMATION_SCHEMA.TABLES'" )
114+ }
115+ println " Tables ${ userTable} and ${ teamTable} created successfully."
79116
80117 def InjectorThread = Thread . start() {
81118 t. run(mobileGamingCommands. createInjectorCommand())
@@ -99,11 +136,9 @@ class LeaderBoardRunner {
99136 String query_result = " "
100137 while ((System . currentTimeMillis() - startTime) / 60000 < mobileGamingCommands. EXECUTION_TIMEOUT_IN_MINUTES ) {
101138 try {
102- tables = t. run " bq query --use_legacy_sql=false SELECT table_name FROM ${ t.bqDataset()} .INFORMATION_SCHEMA.TABLES"
103- if (tables. contains(" leaderboard_${ runner} _user" ) && tables. contains(" leaderboard_${ runner} _team" )) {
104- query_result = t. run """ bq query --batch "SELECT user FROM [${ t.gcpProject()} :${
105- t.bqDataset()
106- } .leaderboard_${ runner} _user] LIMIT 10\" """
139+ tables = t. run " bq query --use_legacy_sql=false SELECT table_name FROM ${ dataset} .INFORMATION_SCHEMA.TABLES"
140+ if (tables. contains(userTable) && tables. contains(teamTable)) {
141+ query_result = t. run """ bq query --batch "SELECT user FROM [${ dataset} .${ userTable} ] LIMIT 10\" """
107142 if (t. seeAnyOf(mobileGamingCommands. COLORS , query_result)) {
108143 isSuccess = true
109144 break
0 commit comments