1313 */
1414package io .trino .plugin .bigquery ;
1515
16- import com .google .cloud .BaseServiceException ;
17- import com .google .cloud .bigquery .BigQueryException ;
18- import com .google .cloud .bigquery .Job ;
19- import com .google .cloud .bigquery .JobInfo ;
20- import com .google .cloud .bigquery .QueryJobConfiguration ;
21- import com .google .cloud .bigquery .Table ;
2216import com .google .cloud .bigquery .TableDefinition ;
2317import com .google .cloud .bigquery .TableId ;
2418import com .google .cloud .bigquery .TableInfo ;
3327
3428import java .util .List ;
3529import java .util .Optional ;
36- import java .util .concurrent .Callable ;
3730import java .util .concurrent .ExecutionException ;
3831import java .util .concurrent .TimeUnit ;
3932
4033import static io .trino .plugin .bigquery .BigQueryErrorCode .BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED ;
41- import static io .trino .plugin .bigquery .BigQueryUtil .convertToBigQueryException ;
4234import static io .trino .spi .StandardErrorCode .NOT_SUPPORTED ;
4335import static java .lang .String .format ;
44- import static java .util .Objects .requireNonNull ;
4536import static java .util .stream .Collectors .toList ;
4637
4738// A helper class, also handles view materialization
@@ -130,7 +121,7 @@ private TableInfo getActualTable(
130121 String query = bigQueryClient .selectSql (table .getTableId (), requiredColumns );
131122 log .debug ("query is %s" , query );
132123 try {
133- return destinationTableCache .get (query , new DestinationTableBuilder (bigQueryClient , config , query , table .getTableId ()));
124+ return destinationTableCache .get (query , new BigQueryClient . DestinationTableBuilder (bigQueryClient , config , query , table .getTableId ()));
134125 }
135126 catch (ExecutionException e ) {
136127 throw new TrinoException (BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED , "Error creating destination table" , e );
@@ -142,63 +133,4 @@ private TableInfo getActualTable(
142133 tableType , table .getTableId ().getDataset (), table .getTableId ().getTable ()));
143134 }
144135 }
145-
146- private static class DestinationTableBuilder
147- implements Callable <TableInfo >
148- {
149- private final BigQueryClient bigQueryClient ;
150- private final ReadSessionCreatorConfig config ;
151- private final String query ;
152- private final TableId table ;
153-
154- DestinationTableBuilder (BigQueryClient bigQueryClient , ReadSessionCreatorConfig config , String query , TableId table )
155- {
156- this .bigQueryClient = requireNonNull (bigQueryClient , "bigQueryClient is null" );
157- this .config = requireNonNull (config , "config is null" );
158- this .query = requireNonNull (query , "query is null" );
159- this .table = requireNonNull (table , "table is null" );
160- }
161-
162- @ Override
163- public TableInfo call ()
164- {
165- return createTableFromQuery ();
166- }
167-
168- TableInfo createTableFromQuery ()
169- {
170- TableId destinationTable = bigQueryClient .createDestinationTable (table );
171- log .debug ("destinationTable is %s" , destinationTable );
172- JobInfo jobInfo = JobInfo .of (
173- QueryJobConfiguration
174- .newBuilder (query )
175- .setDestinationTable (destinationTable )
176- .build ());
177- log .debug ("running query %s" , jobInfo );
178- Job job = waitForJob (bigQueryClient .create (jobInfo ));
179- log .debug ("job has finished. %s" , job );
180- if (job .getStatus ().getError () != null ) {
181- throw convertToBigQueryException (job .getStatus ().getError ());
182- }
183- // add expiration time to the table
184- TableInfo createdTable = bigQueryClient .getTable (destinationTable );
185- long expirationTime = createdTable .getCreationTime () +
186- TimeUnit .HOURS .toMillis (config .viewExpirationTimeInHours );
187- Table updatedTable = bigQueryClient .update (createdTable .toBuilder ()
188- .setExpirationTime (expirationTime )
189- .build ());
190- return updatedTable ;
191- }
192-
193- Job waitForJob (Job job )
194- {
195- try {
196- return job .waitFor ();
197- }
198- catch (InterruptedException e ) {
199- Thread .currentThread ().interrupt ();
200- throw new BigQueryException (BaseServiceException .UNKNOWN_CODE , format ("Job %s has been interrupted" , job .getJobId ()), e );
201- }
202- }
203- }
204136}
0 commit comments