1818
1919import static com .google .common .base .Preconditions .checkArgument ;
2020import static com .google .gcloud .RetryHelper .runWithRetries ;
21+ import static java .util .concurrent .Executors .callable ;
2122
2223import com .google .api .services .bigquery .model .Dataset ;
2324import com .google .api .services .bigquery .model .GetQueryResultsResponse ;
4344import com .google .gcloud .bigquery .InsertAllRequest .RowToInsert ;
4445import com .google .gcloud .spi .BigQueryRpc ;
4546
47+ import java .io .ByteArrayInputStream ;
48+ import java .io .FileInputStream ;
49+ import java .io .IOException ;
50+ import java .io .InputStream ;
51+ import java .nio .channels .Channels ;
52+ import java .nio .channels .FileChannel ;
53+ import java .nio .channels .SeekableByteChannel ;
54+ import java .nio .file .StandardOpenOption ;
4655import java .util .List ;
4756import java .util .Map ;
4857import java .util .concurrent .Callable ;
58+ import java .util .concurrent .atomic .AtomicLong ;
4959
5060final class BigQueryImpl extends BaseService <BigQueryOptions > implements BigQuery {
5161
@@ -422,6 +432,74 @@ public Rows apply(RowToInsert rowToInsert) {
422432 bigQueryRpc .insertAll (tableId .dataset (), tableId .table (), requestPb ));
423433 }
424434
435+ @ Override
436+ public void insertAll (LoadConfiguration configuration , final byte [] content ) {
437+ Function <Long , InputStream > nextStream = new Function <Long , InputStream >() {
438+ @ Override
439+ public InputStream apply (Long startPos ) {
440+ return new ByteArrayInputStream (content , startPos .intValue (), content .length );
441+ }
442+ };
443+ insertAll (configuration , nextStream );
444+ }
445+
446+ @ Override
447+ public void insertAll (LoadConfiguration configuration , final SeekableByteChannel channel ) {
448+ Function <Long , InputStream > nextStream = new Function <Long , InputStream >() {
449+ @ Override
450+ public InputStream apply (Long startPos ) {
451+ try {
452+ channel .position (startPos );
453+ return Channels .newInputStream (channel );
454+ } catch (IOException e ) {
455+ BigQueryException exception = new BigQueryException (0 , e .getMessage (), false );
456+ exception .initCause (e );
457+ throw exception ;
458+ }
459+ }
460+ };
461+ insertAll (configuration , nextStream );
462+ }
463+
464+ private void insertAll (LoadConfiguration configuration ,
465+ final Function <Long , InputStream > nextStream ) throws BigQueryException {
466+ try {
467+ final String uploadId = open (configuration );
468+ final AtomicLong startPos = new AtomicLong (0L );
469+ runWithRetries (callable (new Runnable () {
470+ @ Override
471+ public void run () {
472+ try {
473+ bigQueryRpc .write (uploadId , nextStream .apply (startPos .get ()), startPos .get ());
474+ } catch (BigQueryException ex ) {
475+ BigQueryRpc .Tuple <Boolean , Long > uploadStatus = runWithRetries (
476+ new Callable <BigQueryRpc .Tuple <Boolean , Long >>() {
477+ @ Override
478+ public BigQueryRpc .Tuple <Boolean , Long > call () {
479+ return bigQueryRpc .status (uploadId );
480+ }
481+ }, options ().retryParams (), EXCEPTION_HANDLER );
482+ if (!uploadStatus .x ()) {
483+ startPos .set (uploadStatus .y () != null ? uploadStatus .y () + 1 : 0 );
484+ throw new BigQueryException (0 , "Resume Incomplete" , true );
485+ }
486+ }
487+ }
488+ }), options ().retryParams (), EXCEPTION_HANDLER );
489+ } catch (RetryHelper .RetryHelperException e ) {
490+ throw BigQueryException .translateAndThrow (e );
491+ }
492+ }
493+
494+ private String open (final LoadConfiguration configuration ) {
495+ return runWithRetries (new Callable <String >() {
496+ @ Override
497+ public String call () {
498+ return bigQueryRpc .open (setProjectId (configuration ).toPb ());
499+ }
500+ }, options ().retryParams (), EXCEPTION_HANDLER );
501+ }
502+
425503 @ Override
426504 public Page <List <FieldValue >> listTableData (String datasetId , String tableId ,
427505 TableDataListOption ... options ) throws BigQueryException {
@@ -698,8 +776,7 @@ public TableId apply(TableId tableId) {
698776 if (job instanceof LoadJobInfo ) {
699777 LoadJobInfo loadJob = (LoadJobInfo ) job ;
700778 LoadJobInfo .Builder loadBuilder = loadJob .toBuilder ();
701- loadBuilder .destinationTable (setProjectId (loadJob .destinationTable ()));
702- return loadBuilder .build ();
779+ return loadBuilder .configuration (setProjectId (loadJob .configuration ())).build ();
703780 }
704781 return job ;
705782 }
@@ -711,4 +788,10 @@ private QueryRequest setProjectId(QueryRequest request) {
711788 }
712789 return builder .build ();
713790 }
791+
792+ private LoadConfiguration setProjectId (LoadConfiguration configuration ) {
793+ LoadConfiguration .Builder builder = configuration .toBuilder ();
794+ builder .destinationTable (setProjectId (configuration .destinationTable ()));
795+ return builder .build ();
796+ }
714797}
0 commit comments