Skip to content

Commit e5af988

Browse files
committed
Add vararg options (WaitForOption) to waitFor method
1 parent 197ddeb commit e5af988

File tree

13 files changed

+535
-93
lines changed

13 files changed

+535
-93
lines changed

gcloud-java-bigquery/src/main/java/com/google/cloud/bigquery/Job.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,21 @@
1616

1717
package com.google.cloud.bigquery;
1818

19+
import static com.google.cloud.WaitForOption.Option.CHECKING_PERIOD;
20+
import static com.google.cloud.WaitForOption.Option.TIMEOUT;
21+
import static com.google.common.base.MoreObjects.firstNonNull;
1922
import static com.google.common.base.Preconditions.checkNotNull;
2023

24+
import com.google.cloud.Clock;
25+
import com.google.cloud.WaitForOption;
26+
import com.google.cloud.WaitForOption.CheckingPeriod;
27+
2128
import java.io.IOException;
2229
import java.io.ObjectInputStream;
30+
import java.util.Map;
2331
import java.util.Objects;
2432
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.TimeoutException;
2534

2635
/**
2736
* A Google BigQuery Job.
@@ -145,9 +154,13 @@ public boolean isDone() {
145154
}
146155

147156
/**
148-
* Blocks until this job completes its execution, either failing or succeeding. The job status is
149-
* checked every 500 milliseconds. This method returns current job's latest information. If the
150-
* job no longer exists, this method returns {@code null}.
157+
* Blocks until this job completes its execution, either failing or succeeding. This method
158+
* returns current job's latest information. If the job no longer exists, this method returns
159+
* {@code null}. By default, the job status is checked every 500 milliseconds, to configure this
160+
* value use {@link WaitForOption#checkEvery(long, TimeUnit)}. Use
161+
* {@link WaitForOption#timeout(long, TimeUnit)} to set the maximum time to wait.
162+
*
163+
* <p>Example usage of {@code waitFor()}:
151164
* <pre> {@code
152165
* Job completedJob = job.waitFor();
153166
* if (completedJob == null) {
@@ -158,21 +171,10 @@ public boolean isDone() {
158171
* // job completed successfully
159172
* }}</pre>
160173
*
161-
* @throws BigQueryException upon failure
162-
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
163-
* to complete
164-
*/
165-
public Job waitFor() throws InterruptedException {
166-
return waitFor(500, TimeUnit.MILLISECONDS);
167-
}
168-
169-
/**
170-
* Blocks until this job completes its execution, either failing or succeeding. The
171-
* {@code checkEvery} and {@code unit} parameters determine how often the job's status is checked.
172-
* This method returns current job's latest information. If the job no longer exists, this method
173-
* returns {@code null}.
174+
* <p>Example usage of {@code waitFor()} with checking period and timeout:
174175
* <pre> {@code
175-
* Job completedJob = job.waitFor(1, TimeUnit.SECONDS);
176+
* Job completedJob = job.waitFor(WaitForOption.checkEvery(1, TimeUnit.SECONDS),
177+
* WaitForOption.timeout(60, TimeUnit.SECONDS));
176178
* if (completedJob == null) {
177179
* // job no longer exists
178180
* } else if (completedJob.status().error() != null) {
@@ -181,13 +183,26 @@ public Job waitFor() throws InterruptedException {
181183
* // job completed successfully
182184
* }}</pre>
183185
*
186+
* @param waitOptions options to configure checking period and timeout
184187
* @throws BigQueryException upon failure
185188
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
186189
* to complete
190+
* @throws TimeoutException if the timeout provided with
191+
* {@link WaitForOption#timeout(long, TimeUnit)} is exceeded. If no such option is provided
192+
* this exception is never thrown.
187193
*/
188-
public Job waitFor(int checkEvery, TimeUnit unit) throws InterruptedException {
194+
public Job waitFor(WaitForOption... waitOptions) throws InterruptedException, TimeoutException {
195+
Map<WaitForOption.Option, ?> optionMap = WaitForOption.asMap(waitOptions);
196+
CheckingPeriod checkingPeriod = firstNonNull(CHECKING_PERIOD.getCheckingPeriod(optionMap),
197+
CheckingPeriod.defaultInstance());
198+
long timeout = firstNonNull(TIMEOUT.getLong(optionMap), -1L);
199+
Clock clock = options.clock();
200+
long startTime = clock.millis();
189201
while (!isDone()) {
190-
unit.sleep(checkEvery);
202+
if (timeout != -1 && (clock.millis() - startTime) >= timeout) {
203+
throw new TimeoutException();
204+
}
205+
checkingPeriod.sleep();
191206
}
192207
return reload();
193208
}

gcloud-java-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,18 @@
2727
import static org.junit.Assert.assertSame;
2828
import static org.junit.Assert.assertTrue;
2929

30+
import com.google.cloud.Clock;
31+
import com.google.cloud.WaitForOption;
3032
import com.google.cloud.bigquery.JobStatistics.CopyStatistics;
3133

3234
import org.easymock.EasyMock;
3335
import org.junit.After;
36+
import org.junit.Rule;
3437
import org.junit.Test;
38+
import org.junit.rules.ExpectedException;
3539

3640
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.TimeoutException;
3742

3843
public class JobTest {
3944

@@ -69,6 +74,9 @@ public class JobTest {
6974
private Job expectedJob;
7075
private Job job;
7176

77+
@Rule
78+
public final ExpectedException thrown = ExpectedException.none();
79+
7280
private void initializeExpectedJob(int optionsCalls) {
7381
expect(serviceMockReturnsOptions.options()).andReturn(mockOptions).times(optionsCalls);
7482
replay(serviceMockReturnsOptions);
@@ -181,35 +189,38 @@ public void testIsDone_NotExists() throws Exception {
181189
}
182190

183191
@Test
184-
public void testWaitFor() throws InterruptedException {
192+
public void testWaitFor() throws InterruptedException, TimeoutException {
185193
initializeExpectedJob(2);
186194
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
187195
JobStatus status = createStrictMock(JobStatus.class);
188196
expect(status.state()).andReturn(JobStatus.State.DONE);
189197
expect(bigquery.options()).andReturn(mockOptions);
198+
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
190199
Job completedJob = expectedJob.toBuilder().status(status).build();
191200
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
192201
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
193-
replay(status, bigquery);
202+
replay(status, bigquery, mockOptions);
194203
initializeJob();
195204
assertSame(completedJob, job.waitFor());
196-
verify(status);
205+
verify(status, mockOptions);
197206
}
198207

199208
@Test
200-
public void testWaitFor_Null() throws InterruptedException {
209+
public void testWaitFor_Null() throws InterruptedException, TimeoutException {
201210
initializeExpectedJob(1);
202211
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
203212
expect(bigquery.options()).andReturn(mockOptions);
213+
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
204214
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
205215
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
206-
replay(bigquery);
216+
replay(bigquery, mockOptions);
207217
initializeJob();
208218
assertNull(job.waitFor());
219+
verify(mockOptions);
209220
}
210221

211222
@Test
212-
public void testWaitForWithTimeUnit() throws InterruptedException {
223+
public void testWaitForWithCheckingPeriod() throws InterruptedException, TimeoutException {
213224
initializeExpectedJob(3);
214225
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
215226
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
@@ -219,33 +230,62 @@ public void testWaitForWithTimeUnit() throws InterruptedException {
219230
expect(status.state()).andReturn(JobStatus.State.RUNNING);
220231
expect(status.state()).andReturn(JobStatus.State.DONE);
221232
expect(bigquery.options()).andReturn(mockOptions);
233+
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
222234
Job runningJob = expectedJob.toBuilder().status(status).build();
223235
Job completedJob = expectedJob.toBuilder().status(status).build();
224236
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
225237
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
226238
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
227-
replay(status, bigquery, timeUnit);
239+
replay(status, bigquery, timeUnit, mockOptions);
228240
initializeJob();
229-
assertSame(completedJob, job.waitFor(42, timeUnit));
230-
verify(status, timeUnit);
241+
assertSame(completedJob, job.waitFor(WaitForOption.checkEvery(42, timeUnit)));
242+
verify(status, timeUnit, mockOptions);
231243
}
232244

233245
@Test
234-
public void testWaitForWithTimeUnit_Null() throws InterruptedException {
246+
public void testWaitForWithCheckingPeriod_Null() throws InterruptedException, TimeoutException {
235247
initializeExpectedJob(2);
236248
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
237249
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
238250
timeUnit.sleep(42);
239251
EasyMock.expectLastCall();
240252
expect(bigquery.options()).andReturn(mockOptions);
253+
expect(mockOptions.clock()).andReturn(Clock.defaultClock());
241254
Job runningJob = expectedJob.toBuilder().status(new JobStatus(JobStatus.State.RUNNING)).build();
242255
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
243256
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
244257
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
245-
replay(bigquery, timeUnit);
258+
replay(bigquery, timeUnit, mockOptions);
259+
initializeJob();
260+
assertNull(job.waitFor(WaitForOption.checkEvery(42, timeUnit)));
261+
verify(bigquery, timeUnit, mockOptions);
262+
}
263+
264+
@Test
265+
public void testWaitForWithTimeout() throws InterruptedException, TimeoutException {
266+
initializeExpectedJob(2);
267+
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
268+
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
269+
timeUnit.sleep(1);
270+
EasyMock.expectLastCall();
271+
Clock clock = createStrictMock(Clock.class);
272+
expect(clock.millis()).andReturn(0L);
273+
expect(clock.millis()).andReturn(1L);
274+
expect(clock.millis()).andReturn(3L);
275+
JobStatus status = createStrictMock(JobStatus.class);
276+
expect(status.state()).andReturn(JobStatus.State.RUNNING);
277+
expect(status.state()).andReturn(JobStatus.State.RUNNING);
278+
expect(bigquery.options()).andReturn(mockOptions);
279+
expect(mockOptions.clock()).andReturn(clock);
280+
Job runningJob = expectedJob.toBuilder().status(status).build();
281+
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
282+
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
283+
replay(status, bigquery, timeUnit, clock, mockOptions);
246284
initializeJob();
247-
assertNull(job.waitFor(42, timeUnit));
248-
verify(bigquery, timeUnit);
285+
thrown.expect(TimeoutException.class);
286+
job.waitFor(WaitForOption.checkEvery(1, timeUnit),
287+
WaitForOption.timeout(3, TimeUnit.MILLISECONDS));
288+
verify(status, timeUnit, clock, mockOptions);
249289
}
250290

251291
@Test

gcloud-java-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import java.util.List;
8383
import java.util.concurrent.ExecutionException;
8484
import java.util.concurrent.TimeUnit;
85+
import java.util.concurrent.TimeoutException;
8586
import java.util.logging.Level;
8687
import java.util.logging.Logger;
8788

@@ -169,7 +170,7 @@ public class ITBigQueryTest {
169170
public Timeout globalTimeout = Timeout.seconds(300);
170171

171172
@BeforeClass
172-
public static void beforeClass() throws InterruptedException {
173+
public static void beforeClass() throws InterruptedException, TimeoutException {
173174
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
174175
RemoteStorageHelper storageHelper = RemoteStorageHelper.create();
175176
bigquery = bigqueryHelper.options().service();
@@ -783,7 +784,7 @@ public void testCreateAndGetJobWithSelectedFields() {
783784
}
784785

785786
@Test
786-
public void testCopyJob() throws InterruptedException {
787+
public void testCopyJob() throws InterruptedException, TimeoutException {
787788
String sourceTableName = "test_copy_job_source_table";
788789
String destinationTableName = "test_copy_job_destination_table";
789790
TableId sourceTable = TableId.of(DATASET, sourceTableName);
@@ -808,7 +809,7 @@ public void testCopyJob() throws InterruptedException {
808809
}
809810

810811
@Test
811-
public void testQueryJob() throws InterruptedException {
812+
public void testQueryJob() throws InterruptedException, TimeoutException {
812813
String tableName = "test_query_job_table";
813814
String query = new StringBuilder()
814815
.append("SELECT TimestampField, StringField, BooleanField FROM ")
@@ -851,7 +852,7 @@ public void testQueryJob() throws InterruptedException {
851852
}
852853

853854
@Test
854-
public void testExtractJob() throws InterruptedException {
855+
public void testExtractJob() throws InterruptedException, TimeoutException {
855856
String tableName = "test_export_job_table";
856857
TableId destinationTable = TableId.of(DATASET, tableName);
857858
LoadJobConfiguration configuration =
@@ -875,7 +876,7 @@ public void testExtractJob() throws InterruptedException {
875876
}
876877

877878
@Test
878-
public void testCancelJob() throws InterruptedException {
879+
public void testCancelJob() throws InterruptedException, TimeoutException {
879880
String destinationTableName = "test_cancel_query_job_table";
880881
String query = "SELECT TimestampField, StringField, BooleanField FROM " + TABLE_ID.table();
881882
TableId destinationTable = TableId.of(DATASET, destinationTableName);

gcloud-java-compute/src/main/java/com/google/cloud/compute/Operation.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,14 @@
1616

1717
package com.google.cloud.compute;
1818

19+
import static com.google.cloud.WaitForOption.Option.CHECKING_PERIOD;
20+
import static com.google.cloud.WaitForOption.Option.TIMEOUT;
21+
import static com.google.common.base.MoreObjects.firstNonNull;
1922
import static com.google.common.base.Preconditions.checkNotNull;
2023

24+
import com.google.cloud.Clock;
25+
import com.google.cloud.WaitForOption;
26+
import com.google.cloud.WaitForOption.CheckingPeriod;
2127
import com.google.cloud.compute.Compute.OperationOption;
2228
import com.google.common.base.Function;
2329
import com.google.common.base.MoreObjects;
@@ -37,6 +43,7 @@
3743
import java.util.Map;
3844
import java.util.Objects;
3945
import java.util.concurrent.TimeUnit;
46+
import java.util.concurrent.TimeoutException;
4047

4148
/**
4249
* Google Compute Engine operations. Operation identity can be obtained via {@link #operationId()}.
@@ -660,9 +667,13 @@ public boolean isDone() {
660667
}
661668

662669
/**
663-
* Blocks until this operation completes its execution, either failing or succeeding. The
664-
* operation status is checked every 500 milliseconds. This method returns current operation's
665-
* latest information. If the operation no longer exists, this method returns {@code null}.
670+
* Blocks until this operation completes its execution, either failing or succeeding. This method
671+
* returns current operation's latest information. If the operation no longer exists, this method
672+
* returns {@code null}. By default, the operation status is checked every 500 milliseconds, to
673+
* configure this value use {@link WaitForOption#checkEvery(long, TimeUnit)}. Use
674+
* {@link WaitForOption#timeout(long, TimeUnit)} to set the maximum time to wait.
675+
*
676+
* <p>Example usage of {@code waitFor()}:
666677
* <pre> {@code
667678
* Operation completedOperation = operation.waitFor();
668679
* if (completedOperation == null) {
@@ -673,21 +684,11 @@ public boolean isDone() {
673684
* // operation completed successfully
674685
* }}</pre>
675686
*
676-
* @throws ComputeException upon failure
677-
* @throws InterruptedException if the current thread gets interrupted while waiting for the
678-
* operation to complete
679-
*/
680-
public Operation waitFor() throws InterruptedException {
681-
return waitFor(500, TimeUnit.MILLISECONDS);
682-
}
683-
684-
/**
685-
* Blocks until this operation completes its execution, either failing or succeeding. The
686-
* {@code checkEvery} and {@code unit} parameters determine how often the operation status is
687-
* checked. This method returns current operation's latest information. If the operation no longer
688-
* exists, this method returns {@code null}.
687+
* <p>Example usage of {@code waitFor()} with checking period and timeout:
689688
* <pre> {@code
690-
* Operation completedOperation = operation.waitFor(1, TimeUnit.SECONDS);
689+
* Operation completedOperation =
690+
* operation.waitFor(WaitForOption.checkEvery(1, TimeUnit.SECONDS),
691+
* WaitForOption.timeout(60, TimeUnit.SECONDS));
691692
* if (completedOperation == null) {
692693
* // operation no longer exists
693694
* } else if (completedOperation.errors() != null) {
@@ -696,13 +697,27 @@ public Operation waitFor() throws InterruptedException {
696697
* // operation completed successfully
697698
* }}</pre>
698699
*
700+
* @param waitOptions options to configure checking period and timeout
699701
* @throws ComputeException upon failure
700702
* @throws InterruptedException if the current thread gets interrupted while waiting for the
701703
* operation to complete
704+
* @throws TimeoutException if the timeout provided with
705+
* {@link WaitForOption#timeout(long, TimeUnit)} is exceeded. If no such option is provided
706+
* this exception is never thrown.
702707
*/
703-
public Operation waitFor(int checkEvery, TimeUnit unit) throws InterruptedException {
708+
public Operation waitFor(WaitForOption... waitOptions)
709+
throws InterruptedException, TimeoutException {
710+
Map<WaitForOption.Option, ?> optionMap = WaitForOption.asMap(waitOptions);
711+
CheckingPeriod checkingPeriod = firstNonNull(CHECKING_PERIOD.getCheckingPeriod(optionMap),
712+
CheckingPeriod.defaultInstance());
713+
long timeout = firstNonNull(TIMEOUT.getLong(optionMap), -1L);
714+
Clock clock = options.clock();
715+
long startTime = clock.millis();
704716
while (!isDone()) {
705-
unit.sleep(checkEvery);
717+
if (timeout != -1 && (clock.millis() - startTime) >= timeout) {
718+
throw new TimeoutException();
719+
}
720+
checkingPeriod.sleep();
706721
}
707722
return reload();
708723
}

0 commit comments

Comments
 (0)