File tree Expand file tree Collapse file tree
examples/java/src/main/java/org/apache/beam/examples/complete/game/utils Expand file tree Collapse file tree Original file line number Diff line number Diff line change 2828import org .apache .beam .sdk .io .gcp .bigquery .BigQueryIO ;
2929import org .apache .beam .sdk .io .gcp .bigquery .BigQueryIO .Write .CreateDisposition ;
3030import org .apache .beam .sdk .io .gcp .bigquery .BigQueryIO .Write .WriteDisposition ;
31+ import org .apache .beam .sdk .io .gcp .bigquery .InsertRetryPolicy ;
3132import org .apache .beam .sdk .transforms .DoFn ;
3233import org .apache .beam .sdk .transforms .PTransform ;
3334import org .apache .beam .sdk .transforms .ParDo ;
@@ -129,7 +130,8 @@ public PDone expand(PCollection<InputT> teamAndScore) {
129130 .to (getTable (projectId , datasetId , tableName ))
130131 .withSchema (getSchema ())
131132 .withCreateDisposition (CreateDisposition .CREATE_IF_NEEDED )
132- .withWriteDisposition (WriteDisposition .WRITE_APPEND ));
133+ .withWriteDisposition (WriteDisposition .WRITE_APPEND )
134+ .withFailedInsertRetryPolicy (InsertRetryPolicy .retryTransientErrors ()));
133135 return PDone .in (teamAndScore .getPipeline ());
134136 }
135137
Original file line number Diff line number Diff line change @@ -98,15 +98,20 @@ class LeaderBoardRunner {
9898 def isSuccess = false
9999 String query_result = " "
100100 while ((System . currentTimeMillis() - startTime) / 60000 < mobileGamingCommands. EXECUTION_TIMEOUT_IN_MINUTES ) {
101- tables = t. run " bq query SELECT table_id FROM ${ t.bqDataset()} .__TABLES_SUMMARY__"
102- if (tables. contains(" leaderboard_${ runner} _user" ) && tables. contains(" leaderboard_${ runner} _team" )) {
103- query_result = t. run """ bq query --batch "SELECT user FROM [${ t.gcpProject()} :${
104- t.bqDataset()
105- } .leaderboard_${ runner} _user] LIMIT 10\" """
106- if (t. seeAnyOf(mobileGamingCommands. COLORS , query_result)) {
107- isSuccess = true
108- break
101+ try {
102+ tables = t. run " bq query --use_legacy_sql=false SELECT table_name FROM ${ t.bqDataset()} .INFORMATION_SCHEMA.TABLES"
103+ if (tables. contains(" leaderboard_${ runner} _user" ) && tables. contains(" leaderboard_${ runner} _team" )) {
104+ query_result = t. run """ bq query --batch "SELECT user FROM [${ t.gcpProject()} :${
105+ t.bqDataset()
106+ } .leaderboard_${ runner} _user] LIMIT 10\" """
107+ if (t. seeAnyOf(mobileGamingCommands. COLORS , query_result)) {
108+ isSuccess = true
109+ break
110+ }
109111 }
112+ } catch (Exception e) {
113+ println " Warning: Exception while checking tables: ${ e.message} "
114+ println " Retrying..."
110115 }
111116 println " Waiting for pipeline to produce more results..."
112117 sleep(60000 ) // wait for 1 min
Original file line number Diff line number Diff line change @@ -87,13 +87,18 @@ def startTime = System.currentTimeMillis()
8787def isSuccess = false
8888String query_result = " "
8989while ((System . currentTimeMillis() - startTime)/ 60000 < mobileGamingCommands. EXECUTION_TIMEOUT_IN_MINUTES ) {
90- tables = t. run " bq query SELECT table_id FROM ${ t.bqDataset()} .__TABLES_SUMMARY__"
91- if (tables. contains(" leaderboard_${ runner} _user" ) && tables. contains(" leaderboard_${ runner} _team" )){
92- query_result = t. run """ bq query --batch "SELECT user FROM [${ t.gcpProject()} :${ t.bqDataset()} .leaderboard_${ runner} _user] LIMIT 10\" """
93- if (t. seeAnyOf(mobileGamingCommands. COLORS , query_result)){
94- isSuccess = true
95- break
90+ try {
91+ tables = t. run " bq query --use_legacy_sql=false SELECT table_name FROM ${ t.bqDataset()} .INFORMATION_SCHEMA.TABLES"
92+ if (tables. contains(" leaderboard_${ runner} _user" ) && tables. contains(" leaderboard_${ runner} _team" )) {
93+ query_result = t. run """ bq query --batch "SELECT user FROM [${ t.gcpProject()} .${ t.bqDataset()} .leaderboard_${ runner} _user] LIMIT 10\" """
94+ if (t. seeAnyOf(mobileGamingCommands. COLORS , query_result)){
95+ isSuccess = true
96+ break
97+ }
9698 }
99+ } catch (Exception e) {
100+ println " Warning: Exception while checking tables: ${ e.message} "
101+ println " Retrying..."
97102 }
98103 println " Waiting for pipeline to produce more results..."
99104 sleep(60000 ) // wait for 1 min
You can’t perform that action at this time.
0 commit comments