Skip to content

Commit c9b6a1c

Browse files
committed
Add support for resumable uploads
- Add LoadConfiguration class for data needed by load jobs and resumable uploads - Update LoadJobInfo and related classes to use LoadConfiguration instead - Add insertAll(LoadConfiguration, byte[]) method - Add insertAll(LoadConfiguration, SeekableByteChannel) method - Add Table.insert methods that use resumable upload - Add unit and integration tests
1 parent 824c46c commit c9b6a1c

14 files changed

Lines changed: 1071 additions & 395 deletions

File tree

gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/BigQuery.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.gcloud.Service;
2828
import com.google.gcloud.spi.BigQueryRpc;
2929

30+
import java.nio.channels.SeekableByteChannel;
3031
import java.util.List;
3132
import java.util.Set;
3233

@@ -590,6 +591,33 @@ Page<BaseTableInfo> listTables(DatasetId datasetId, TableListOption... options)
590591
*/
591592
InsertAllResponse insertAll(InsertAllRequest request) throws BigQueryException;
592593

594+
/**
595+
* Sends a resumable insert request given a seekable channel containing the rows to be inserted.
596+
* This method does not close the channel; the caller is responsible for closing it.
597+
*
598+
* <p>Example usage of inserting data from a local file:
599+
* <pre> {@code
600+
* LoadConfiguration config = LoadConfiguration.of(TableId.of("my_dataset_id", "my_table_id"));
601+
* try (FileChannel channel = FileChannel.open(Paths.get("/path/to/file"))) {
602+
* bigquery.insertAll(config, channel);
603+
* }}</pre>
604+
*
605+
* @throws BigQueryException upon failure
606+
* @see <a href="https://cloud.google.com/bigquery/loading-data-post-request#resumable">Resumable
607+
* Upload</a>
608+
*/
609+
void insertAll(LoadConfiguration configuration, SeekableByteChannel channel)
610+
throws BigQueryException;
611+
612+
/**
613+
* Sends a resumable insert request given a byte array containing the rows to be inserted.
614+
*
615+
* @throws BigQueryException upon failure
616+
* @see <a href="https://cloud.google.com/bigquery/loading-data-post-request#resumable">Resumable
617+
* Upload</a>
618+
*/
619+
void insertAll(LoadConfiguration configuration, byte[] content) throws BigQueryException;
620+
593621
/**
594622
* Lists the table's rows.
595623
*

gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/BigQueryImpl.java

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.common.base.Preconditions.checkArgument;
2020
import static com.google.gcloud.RetryHelper.runWithRetries;
21+
import static java.util.concurrent.Executors.callable;
2122

2223
import com.google.api.services.bigquery.model.Dataset;
2324
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
@@ -43,9 +44,18 @@
4344
import com.google.gcloud.bigquery.InsertAllRequest.RowToInsert;
4445
import com.google.gcloud.spi.BigQueryRpc;
4546

47+
import java.io.ByteArrayInputStream;
48+
import java.io.FileInputStream;
49+
import java.io.IOException;
50+
import java.io.InputStream;
51+
import java.nio.channels.Channels;
52+
import java.nio.channels.FileChannel;
53+
import java.nio.channels.SeekableByteChannel;
54+
import java.nio.file.StandardOpenOption;
4655
import java.util.List;
4756
import java.util.Map;
4857
import java.util.concurrent.Callable;
58+
import java.util.concurrent.atomic.AtomicLong;
4959

5060
final class BigQueryImpl extends BaseService<BigQueryOptions> implements BigQuery {
5161

@@ -422,6 +432,74 @@ public Rows apply(RowToInsert rowToInsert) {
422432
bigQueryRpc.insertAll(tableId.dataset(), tableId.table(), requestPb));
423433
}
424434

435+
@Override
436+
public void insertAll(LoadConfiguration configuration, final byte[] content) {
437+
Function<Long, InputStream> nextStream = new Function<Long, InputStream>() {
438+
@Override
439+
public InputStream apply(Long startPos) {
440+
return new ByteArrayInputStream(content, startPos.intValue(), content.length);
441+
}
442+
};
443+
insertAll(configuration, nextStream);
444+
}
445+
446+
@Override
447+
public void insertAll(LoadConfiguration configuration, final SeekableByteChannel channel) {
448+
Function<Long, InputStream> nextStream = new Function<Long, InputStream>() {
449+
@Override
450+
public InputStream apply(Long startPos) {
451+
try {
452+
channel.position(startPos);
453+
return Channels.newInputStream(channel);
454+
} catch (IOException e) {
455+
BigQueryException exception = new BigQueryException(0, e.getMessage(), false);
456+
exception.initCause(e);
457+
throw exception;
458+
}
459+
}
460+
};
461+
insertAll(configuration, nextStream);
462+
}
463+
464+
private void insertAll(LoadConfiguration configuration,
465+
final Function<Long, InputStream> nextStream) throws BigQueryException {
466+
try {
467+
final String uploadId = open(configuration);
468+
final AtomicLong startPos = new AtomicLong(0L);
469+
runWithRetries(callable(new Runnable() {
470+
@Override
471+
public void run() {
472+
try {
473+
bigQueryRpc.write(uploadId, nextStream.apply(startPos.get()), startPos.get());
474+
} catch (BigQueryException ex) {
475+
BigQueryRpc.Tuple<Boolean, Long> uploadStatus = runWithRetries(
476+
new Callable<BigQueryRpc.Tuple<Boolean, Long>>() {
477+
@Override
478+
public BigQueryRpc.Tuple<Boolean, Long> call() {
479+
return bigQueryRpc.status(uploadId);
480+
}
481+
}, options().retryParams(), EXCEPTION_HANDLER);
482+
if (!uploadStatus.x()) {
483+
startPos.set(uploadStatus.y() != null ? uploadStatus.y() + 1 : 0);
484+
throw new BigQueryException(0, "Resume Incomplete", true);
485+
}
486+
}
487+
}
488+
}), options().retryParams(), EXCEPTION_HANDLER);
489+
} catch (RetryHelper.RetryHelperException e) {
490+
throw BigQueryException.translateAndThrow(e);
491+
}
492+
}
493+
494+
private String open(final LoadConfiguration configuration) {
495+
return runWithRetries(new Callable<String>() {
496+
@Override
497+
public String call() {
498+
return bigQueryRpc.open(setProjectId(configuration).toPb());
499+
}
500+
}, options().retryParams(), EXCEPTION_HANDLER);
501+
}
502+
425503
@Override
426504
public Page<List<FieldValue>> listTableData(String datasetId, String tableId,
427505
TableDataListOption... options) throws BigQueryException {
@@ -698,8 +776,7 @@ public TableId apply(TableId tableId) {
698776
if (job instanceof LoadJobInfo) {
699777
LoadJobInfo loadJob = (LoadJobInfo) job;
700778
LoadJobInfo.Builder loadBuilder = loadJob.toBuilder();
701-
loadBuilder.destinationTable(setProjectId(loadJob.destinationTable()));
702-
return loadBuilder.build();
779+
return loadBuilder.configuration(setProjectId(loadJob.configuration())).build();
703780
}
704781
return job;
705782
}
@@ -711,4 +788,10 @@ private QueryRequest setProjectId(QueryRequest request) {
711788
}
712789
return builder.build();
713790
}
791+
792+
private LoadConfiguration setProjectId(LoadConfiguration configuration) {
793+
LoadConfiguration.Builder builder = configuration.toBuilder();
794+
builder.destinationTable(setProjectId(configuration.destinationTable()));
795+
return builder.build();
796+
}
714797
}

0 commit comments

Comments
 (0)