Skip to content

Commit 3b35a9e

Browse files
committed
Add whenDone method and CompletionCallback to Job and Operation (#1016)
1 parent f176d81 commit 3b35a9e

18 files changed

Lines changed: 852 additions & 330 deletions

File tree

README.md

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,7 @@ if (table == null) {
173173
}
174174
System.out.println("Loading data into table " + tableId);
175175
Job loadJob = table.load(FormatOptions.csv(), "gs://bucket/path");
176-
while (!loadJob.isDone()) {
177-
Thread.sleep(1000L);
178-
}
176+
loadJob = loadJob.waitFor();
179177
if (loadJob.status().error() != null) {
180178
System.out.println("Job completed with errors");
181179
} else {
@@ -203,7 +201,6 @@ import com.google.cloud.compute.Compute;
203201
import com.google.cloud.compute.ComputeOptions;
204202
import com.google.cloud.compute.Disk;
205203
import com.google.cloud.compute.DiskId;
206-
import com.google.cloud.compute.Operation;
207204
import com.google.cloud.compute.Snapshot;
208205
209206
Compute compute = ComputeOptions.defaultInstance().service();
@@ -212,12 +209,10 @@ Disk disk = compute.getDisk(diskId, Compute.DiskOption.fields());
212209
if (disk != null) {
213210
String snapshotName = "disk-name-snapshot";
214211
Operation operation = disk.createSnapshot(snapshotName);
215-
while (!operation.isDone()) {
216-
Thread.sleep(1000L);
217-
}
212+
operation = operation.waitFor();
218213
if (operation.errors() == null) {
219214
// use snapshot
220-
Snapshot snapshot = compute.getSnapshot("disk-name-snapshot");
215+
Snapshot snapshot = compute.getSnapshot(snapshotName);
221216
}
222217
}
223218
```
@@ -234,8 +229,6 @@ import com.google.cloud.compute.InstanceId;
234229
import com.google.cloud.compute.InstanceInfo;
235230
import com.google.cloud.compute.MachineTypeId;
236231
import com.google.cloud.compute.NetworkId;
237-
import com.google.cloud.compute.NetworkInterface;
238-
import com.google.cloud.compute.Operation;
239232
240233
Compute compute = ComputeOptions.defaultInstance().service();
241234
ImageId imageId = ImageId.of("debian-cloud", "debian-8-jessie-v20160329");
@@ -246,9 +239,7 @@ InstanceId instanceId = InstanceId.of("us-central1-a", "instance-name");
246239
MachineTypeId machineTypeId = MachineTypeId.of("us-central1-a", "n1-standard-1");
247240
Operation operation =
248241
compute.create(InstanceInfo.of(instanceId, machineTypeId, attachedDisk, networkInterface));
249-
while (!operation.isDone()) {
250-
Thread.sleep(1000L);
251-
}
242+
operation = operation.waitFor();
252243
if (operation.errors() == null) {
253244
// use instance
254245
Instance instance = compute.getInstance(instanceId);

gcloud-java-bigquery/src/main/java/com/google/cloud/bigquery/Job.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,16 @@
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
2020

21+
import com.google.cloud.Clock;
22+
import com.google.cloud.WaitForOption;
23+
import com.google.cloud.WaitForOption.CheckingPeriod;
24+
import com.google.cloud.WaitForOption.Timeout;
25+
2126
import java.io.IOException;
2227
import java.io.ObjectInputStream;
2328
import java.util.Objects;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
2431

2532
/**
2633
* A Google BigQuery Job.
@@ -143,6 +150,59 @@ public boolean isDone() {
143150
return job == null || job.status().state() == JobStatus.State.DONE;
144151
}
145152

153+
/**
154+
* Blocks until this job completes its execution, either failing or succeeding. This method
155+
* returns current job's latest information. If the job no longer exists, this method returns
156+
* {@code null}. By default, the job status is checked every 500 milliseconds, to configure this
157+
* value use {@link WaitForOption#checkEvery(long, TimeUnit)}. Use
158+
* {@link WaitForOption#timeout(long, TimeUnit)} to set the maximum time to wait.
159+
*
160+
* <p>Example usage of {@code waitFor()}:
161+
* <pre> {@code
162+
* Job completedJob = job.waitFor();
163+
* if (completedJob == null) {
164+
* // job no longer exists
165+
* } else if (completedJob.status().error() != null) {
166+
* // job failed, handle error
167+
* } else {
168+
* // job completed successfully
169+
* }}</pre>
170+
*
171+
* <p>Example usage of {@code waitFor()} with checking period and timeout:
172+
* <pre> {@code
173+
* Job completedJob = job.waitFor(WaitForOption.checkEvery(1, TimeUnit.SECONDS),
174+
* WaitForOption.timeout(60, TimeUnit.SECONDS));
175+
* if (completedJob == null) {
176+
* // job no longer exists
177+
* } else if (completedJob.status().error() != null) {
178+
* // job failed, handle error
179+
* } else {
180+
* // job completed successfully
181+
* }}</pre>
182+
*
183+
* @param waitOptions options to configure checking period and timeout
184+
* @throws BigQueryException upon failure
185+
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
186+
* to complete
187+
* @throws TimeoutException if the timeout provided with
188+
* {@link WaitForOption#timeout(long, TimeUnit)} is exceeded. If no such option is provided
189+
* this exception is never thrown.
190+
*/
191+
public Job waitFor(WaitForOption... waitOptions) throws InterruptedException, TimeoutException {
192+
Timeout timeout = Timeout.getOrDefault(waitOptions);
193+
CheckingPeriod checkingPeriod = CheckingPeriod.getOrDefault(waitOptions);
194+
long timeoutMillis = timeout.timeoutMillis();
195+
Clock clock = options.clock();
196+
long startTime = clock.millis();
197+
while (!isDone()) {
198+
if (timeoutMillis != -1 && (clock.millis() - startTime) >= timeoutMillis) {
199+
throw new TimeoutException();
200+
}
201+
checkingPeriod.sleep();
202+
}
203+
return reload();
204+
}
205+
146206
/**
147207
* Fetches current job's latest information. Returns {@code null} if the job does not exist.
148208
*
@@ -151,7 +211,7 @@ public boolean isDone() {
151211
* @throws BigQueryException upon failure
152212
*/
153213
public Job reload(BigQuery.JobOption... options) {
154-
return bigquery.getJob(jobId().job(), options);
214+
return bigquery.getJob(jobId(), options);
155215
}
156216

157217
/**

gcloud-java-bigquery/src/main/java/com/google/cloud/bigquery/package-info.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,7 @@
3333
* }
3434
* System.out.println("Loading data into table " + tableId);
3535
* Job loadJob = table.load(FormatOptions.csv(), "gs://bucket/path");
36-
* while (!loadJob.isDone()) {
37-
* Thread.sleep(1000L);
38-
* }
36+
* loadJob = loadJob.waitFor();
3937
* if (loadJob.status().error() != null) {
4038
* System.out.println("Job completed with errors");
4139
* } else {

gcloud-java-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,18 @@
2727
import static org.junit.Assert.assertSame;
2828
import static org.junit.Assert.assertTrue;
2929

30+
import com.google.cloud.Clock;
31+
import com.google.cloud.WaitForOption;
3032
import com.google.cloud.bigquery.JobStatistics.CopyStatistics;
3133

34+
import org.easymock.EasyMock;
3235
import org.junit.After;
36+
import org.junit.Rule;
3337
import org.junit.Test;
38+
import org.junit.rules.ExpectedException;
39+
40+
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.TimeoutException;
3442

3543
public class JobTest {
3644

@@ -66,6 +74,9 @@ public class JobTest {
6674
private Job expectedJob;
6775
private Job job;
6876

77+
@Rule
78+
public final ExpectedException thrown = ExpectedException.none();
79+
6980
private void initializeExpectedJob(int optionsCalls) {
7081
expect(serviceMockReturnsOptions.options()).andReturn(mockOptions).times(optionsCalls);
7182
replay(serviceMockReturnsOptions);
@@ -177,13 +188,113 @@ public void testIsDone_NotExists() throws Exception {
177188
assertTrue(job.isDone());
178189
}
179190

191+
@Test
192+
public void testWaitFor() throws InterruptedException, TimeoutException {
193+
initializeExpectedJob(2);
194+
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
195+
JobStatus status = createStrictMock(JobStatus.class);
196+
expect(status.state()).andReturn(JobStatus.State.DONE);
197+
expect(bigquery.options()).andReturn(mockOptions);
198+
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
199+
Job completedJob = expectedJob.toBuilder().status(status).build();
200+
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
201+
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
202+
replay(status, bigquery, mockOptions);
203+
initializeJob();
204+
assertSame(completedJob, job.waitFor());
205+
verify(status, mockOptions);
206+
}
207+
208+
@Test
209+
public void testWaitFor_Null() throws InterruptedException, TimeoutException {
210+
initializeExpectedJob(1);
211+
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
212+
expect(bigquery.options()).andReturn(mockOptions);
213+
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
214+
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
215+
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
216+
replay(bigquery, mockOptions);
217+
initializeJob();
218+
assertNull(job.waitFor());
219+
verify(mockOptions);
220+
}
221+
222+
@Test
223+
public void testWaitForWithCheckingPeriod() throws InterruptedException, TimeoutException {
224+
initializeExpectedJob(3);
225+
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
226+
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
227+
timeUnit.sleep(42);
228+
EasyMock.expectLastCall();
229+
JobStatus status = createStrictMock(JobStatus.class);
230+
expect(status.state()).andReturn(JobStatus.State.RUNNING);
231+
expect(status.state()).andReturn(JobStatus.State.DONE);
232+
expect(bigquery.options()).andReturn(mockOptions);
233+
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
234+
Job runningJob = expectedJob.toBuilder().status(status).build();
235+
Job completedJob = expectedJob.toBuilder().status(status).build();
236+
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
237+
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
238+
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
239+
replay(status, bigquery, timeUnit, mockOptions);
240+
initializeJob();
241+
assertSame(completedJob, job.waitFor(WaitForOption.checkEvery(42, timeUnit)));
242+
verify(status, timeUnit, mockOptions);
243+
}
244+
245+
@Test
246+
public void testWaitForWithCheckingPeriod_Null() throws InterruptedException, TimeoutException {
247+
initializeExpectedJob(2);
248+
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
249+
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
250+
timeUnit.sleep(42);
251+
EasyMock.expectLastCall();
252+
expect(bigquery.options()).andReturn(mockOptions);
253+
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
254+
Job runningJob = expectedJob.toBuilder().status(new JobStatus(JobStatus.State.RUNNING)).build();
255+
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
256+
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
257+
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
258+
replay(bigquery, timeUnit, mockOptions);
259+
initializeJob();
260+
assertNull(job.waitFor(WaitForOption.checkEvery(42, timeUnit)));
261+
verify(bigquery, timeUnit, mockOptions);
262+
}
263+
264+
@Test
265+
public void testWaitForWithTimeout() throws InterruptedException, TimeoutException {
266+
initializeExpectedJob(2);
267+
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
268+
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
269+
timeUnit.sleep(1);
270+
EasyMock.expectLastCall();
271+
Clock clock = createStrictMock(Clock.class);
272+
expect(clock.millis()).andReturn(0L);
273+
expect(clock.millis()).andReturn(1L);
274+
expect(clock.millis()).andReturn(3L);
275+
JobStatus status = createStrictMock(JobStatus.class);
276+
expect(status.state()).andReturn(JobStatus.State.RUNNING);
277+
expect(status.state()).andReturn(JobStatus.State.RUNNING);
278+
expect(bigquery.options()).andReturn(mockOptions);
279+
expect(mockOptions.clock()).andReturn(clock);
280+
Job runningJob = expectedJob.toBuilder().status(status).build();
281+
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
282+
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
283+
replay(status, bigquery, timeUnit, clock, mockOptions);
284+
initializeJob();
285+
thrown.expect(TimeoutException.class);
286+
job.waitFor(WaitForOption.checkEvery(1, timeUnit),
287+
WaitForOption.timeout(3, TimeUnit.MILLISECONDS));
288+
verify(status, timeUnit, clock, mockOptions);
289+
}
290+
180291
@Test
181292
public void testReload() throws Exception {
182293
initializeExpectedJob(4);
183294
JobInfo updatedInfo = JOB_INFO.toBuilder().etag("etag").build();
184295
Job expectedJob = new Job(serviceMockReturnsOptions, new JobInfo.BuilderImpl(updatedInfo));
185296
expect(bigquery.options()).andReturn(mockOptions);
186-
expect(bigquery.getJob(JOB_INFO.jobId().job())).andReturn(expectedJob);
297+
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(expectedJob);
187298
replay(bigquery);
188299
initializeJob();
189300
Job updatedJob = job.reload();
@@ -194,7 +305,7 @@ public void testReload() throws Exception {
194305
public void testReloadNull() throws Exception {
195306
initializeExpectedJob(1);
196307
expect(bigquery.options()).andReturn(mockOptions);
197-
expect(bigquery.getJob(JOB_INFO.jobId().job())).andReturn(null);
308+
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
198309
replay(bigquery);
199310
initializeJob();
200311
assertNull(job.reload());
@@ -206,7 +317,7 @@ public void testReloadWithOptions() throws Exception {
206317
JobInfo updatedInfo = JOB_INFO.toBuilder().etag("etag").build();
207318
Job expectedJob = new Job(serviceMockReturnsOptions, new JobInfo.BuilderImpl(updatedInfo));
208319
expect(bigquery.options()).andReturn(mockOptions);
209-
expect(bigquery.getJob(JOB_INFO.jobId().job(), BigQuery.JobOption.fields()))
320+
expect(bigquery.getJob(JOB_INFO.jobId(), BigQuery.JobOption.fields()))
210321
.andReturn(expectedJob);
211322
replay(bigquery);
212323
initializeJob();

0 commit comments

Comments
 (0)