Skip to content

Commit f5b5d3d

Browse files
authored
Add support for Google Cloud Pub/Sub (#1085)
2 parents bc016f7 + aefce68 commit f5b5d3d

File tree

82 files changed

+19426
-190
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+19426
-190
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ before_install:
99
- cp target/travis/settings.xml ~/.m2/settings.xml
1010
install: mvn install -DskipTests=true -Dgpg.skip=true
1111
script:
12-
- travis_wait 30 utilities/verify.sh
12+
- travis_wait 60 utilities/verify.sh
1313
after_success:
1414
- utilities/after_success.sh
1515
env:

README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ This client supports the following Google Cloud Platform services:
1818
- [Google Cloud Compute] (#google-cloud-compute-alpha) (Alpha)
1919
- [Google Cloud Datastore] (#google-cloud-datastore)
2020
- [Google Cloud DNS] (#google-cloud-dns-alpha) (Alpha)
21+
- [Google Cloud Pub/Sub] (#google-cloud-pubsub-alpha) (Alpha - Not working on App Engine Standard)
2122
- [Google Cloud Resource Manager] (#google-cloud-resource-manager-alpha) (Alpha)
2223
- [Google Cloud Storage] (#google-cloud-storage)
2324

@@ -62,6 +63,8 @@ Example Applications
6263
- [`Flexible Environment/Datastore example`](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/managed_vms/datastore) - A simple app that uses Cloud Datastore to list the last 10 IP addresses that visited your site.
6364
- Read about how to run the application [here](https://github.com/GoogleCloudPlatform/java-docs-samples/blob/master/managed_vms/README.md).
6465
- [`Flexible Environment/Storage example`](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/managed_vms/cloudstorage) - An app that uploads files to a public Cloud Storage bucket on the App Engine Flexible Environment runtime.
66+
- [`PubSubExample`](./gcloud-java-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java) - A simple command line interface providing some of Cloud Pub/Sub's functionality
67+
- Read more about using this application on the [`PubSubExample` docs page](http://googlecloudplatform.github.io/gcloud-java/apidocs/?com/google/cloud/examples/pubsub/PubSubExample.html).
6568
- [`ResourceManagerExample`](./gcloud-java-examples/src/main/java/com/google/cloud/examples/resourcemanager/ResourceManagerExample.java) - A simple command line interface providing some of Cloud Resource Manager's functionality
6669
- Read more about using this application on the [`ResourceManagerExample` docs page](http://googlecloudplatform.github.io/gcloud-java/apidocs/?com/google/cloud/examples/resourcemanager/ResourceManagerExample.html).
6770
- [`SparkDemo`](https://github.com/GoogleCloudPlatform/java-docs-samples/blob/master/managed_vms/sparkjava) - An example of using `gcloud-java-datastore` from within the SparkJava and App Engine Flexible Environment frameworks.
@@ -368,6 +371,44 @@ ChangeRequestInfo changeRequest = changeBuilder.build();
368371
zone.applyChangeRequest(changeRequest);
369372
```
370373
374+
Google Cloud Pub/Sub (Alpha)
375+
----------------------
376+
377+
- [API Documentation][pubsub-api]
378+
- [Official Documentation][cloud-pubsub-docs]
379+
380+
#### Preview
381+
382+
Here is a code snippet showing a simple usage example from within Compute Engine/App Engine
383+
Flexible. Note that you must [supply credentials](#authentication) and a project ID if running this
384+
snippet elsewhere. Complete source code can be found at
385+
[CreateSubscriptionAndPullMessages.java](./gcloud-java-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java).
386+
387+
```java
388+
import com.google.cloud.pubsub.Message;
389+
import com.google.cloud.pubsub.PubSub;
390+
import com.google.cloud.pubsub.PubSub.MessageConsumer;
391+
import com.google.cloud.pubsub.PubSub.MessageProcessor;
392+
import com.google.cloud.pubsub.PubSubOptions;
393+
import com.google.cloud.pubsub.Subscription;
394+
import com.google.cloud.pubsub.SubscriptionInfo;
395+
396+
try (PubSub pubsub = PubSubOptions.defaultInstance().service()) {
397+
Subscription subscription =
398+
pubsub.create(SubscriptionInfo.of("test-topic", "test-subscription"));
399+
MessageProcessor callback = new MessageProcessor() {
400+
@Override
401+
public void process(Message message) throws Exception {
402+
System.out.printf("Received message \"%s\"%n", message.payloadAsString());
403+
}
404+
};
405+
// Create a message consumer and pull messages (for 60 seconds)
406+
try (MessageConsumer consumer = subscription.pullAsync(callback)) {
407+
Thread.sleep(60_000);
408+
}
409+
}
410+
```
411+
371412
Google Cloud Resource Manager (Alpha)
372413
----------------------
373414
@@ -513,6 +554,7 @@ Apache 2.0 - See [LICENSE] for more information.
513554
[cloud-dns-docs]: https://cloud.google.com/dns/docs
514555
[cloud-dns-activation]: https://console.cloud.google.com/start/api?id=dns
515556
557+
[pubsub-api]: http://googlecloudplatform.github.io/gcloud-java/apidocs/index.html?com/google/cloud/pubsub/package-summary.html
516558
[cloud-pubsub]: https://cloud.google.com/pubsub/
517559
[cloud-pubsub-docs]: https://cloud.google.com/pubsub/docs
518560

TESTING.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,5 +174,46 @@ Here is an example that clears the dataset created in Step 3.
174174
RemoteBigQueryHelper.forceDelete(bigquery, dataset);
175175
```
176176

177+
### Testing code that uses Pub/Sub
178+
179+
#### On your machine
180+
181+
You can test against a temporary local Pub/Sub by following these steps:
182+
183+
1. Start the local Pub/Sub emulator before running your tests using `LocalPubSubHelper`'s `create`
184+
and `start` methods. This will bind a port for communication with the local Pub/Sub emulator.
185+
```java
186+
LocalPubSubHelper helper = LocalPubSubHelper.create();
187+
188+
helper.start(); // Starts the local Pub/Sub emulator in a separate process
189+
```
190+
191+
2. Create and use a `PubSub` object with the options given by the `LocalPubSubHelper` instance. For
192+
example:
193+
```java
194+
PubSub localPubsub = helper.options().service();
195+
```
196+
197+
3. Run your tests.
198+
199+
4. Stop the local Pub/Sub emulator by calling the `stop()` method, like so:
200+
```java
201+
helper.stop();
202+
```
203+
204+
#### On a remote machine
205+
206+
You can test against a remote Pub/Sub emulator as well. To do this, set the `PubSubOptions` project
207+
endpoint to the hostname of the remote machine, like the example below.
208+
209+
```java
210+
PubSubOptions options = PubSubOptions.builder()
211+
.projectId("my-project-id") // must match project ID specified on remote machine
212+
.host("<hostname of machine>:<port>")
213+
.authCredentials(AuthCredentials.noAuth())
214+
.build();
215+
PubSub localPubsub= options.service();
216+
```
217+
177218
[cloud-platform-storage-authentication]:https://cloud.google.com/storage/docs/authentication?hl=en#service_accounts
178219
[create-service-account]:https://developers.google.com/identity/protocols/OAuth2ServiceAccount#creatinganaccount

gcloud-java-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryOptions.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,19 @@
1616

1717
package com.google.cloud.bigquery;
1818

19-
import com.google.cloud.ServiceOptions;
19+
import com.google.cloud.HttpServiceOptions;
2020
import com.google.cloud.bigquery.spi.BigQueryRpc;
2121
import com.google.cloud.bigquery.spi.BigQueryRpcFactory;
2222
import com.google.cloud.bigquery.spi.DefaultBigQueryRpc;
2323
import com.google.common.collect.ImmutableSet;
2424

2525
import java.util.Set;
2626

27-
public class BigQueryOptions extends ServiceOptions<BigQuery, BigQueryRpc, BigQueryOptions> {
27+
public class BigQueryOptions extends HttpServiceOptions<BigQuery, BigQueryRpc, BigQueryOptions> {
2828

2929
private static final String BIGQUERY_SCOPE = "https://www.googleapis.com/auth/bigquery";
3030
private static final Set<String> SCOPES = ImmutableSet.of(BIGQUERY_SCOPE);
31-
private static final long serialVersionUID = -215981591481708043L;
31+
private static final long serialVersionUID = -8592198255032667206L;
3232

3333
public static class DefaultBigqueryFactory implements BigQueryFactory {
3434

@@ -51,7 +51,7 @@ public BigQueryRpc create(BigQueryOptions options) {
5151
}
5252

5353
public static class Builder extends
54-
ServiceOptions.Builder<BigQuery, BigQueryRpc, BigQueryOptions, Builder> {
54+
HttpServiceOptions.Builder<BigQuery, BigQueryRpc, BigQueryOptions, Builder> {
5555

5656
private Builder() {
5757
}

gcloud-java-compute/src/main/java/com/google/cloud/compute/ComputeOptions.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,19 @@
1616

1717
package com.google.cloud.compute;
1818

19-
import com.google.cloud.ServiceOptions;
19+
import com.google.cloud.HttpServiceOptions;
2020
import com.google.cloud.compute.spi.ComputeRpc;
2121
import com.google.cloud.compute.spi.ComputeRpcFactory;
2222
import com.google.cloud.compute.spi.DefaultComputeRpc;
2323
import com.google.common.collect.ImmutableSet;
2424

2525
import java.util.Set;
2626

27-
public class ComputeOptions extends ServiceOptions<Compute, ComputeRpc, ComputeOptions> {
27+
public class ComputeOptions extends HttpServiceOptions<Compute, ComputeRpc, ComputeOptions> {
2828

2929
private static final String COMPUTE_SCOPE = "https://www.googleapis.com/auth/compute";
3030
private static final Set<String> SCOPES = ImmutableSet.of(COMPUTE_SCOPE);
31-
private static final long serialVersionUID = 6509557711917342058L;
31+
private static final long serialVersionUID = 5074781985597996770L;
3232

3333
public static class DefaultComputeFactory implements ComputeFactory {
3434

@@ -51,7 +51,7 @@ public ComputeRpc create(ComputeOptions options) {
5151
}
5252

5353
public static class Builder extends
54-
ServiceOptions.Builder<Compute, ComputeRpc, ComputeOptions, Builder> {
54+
HttpServiceOptions.Builder<Compute, ComputeRpc, ComputeOptions, Builder> {
5555

5656
private Builder() {
5757
}

gcloud-java-core/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,15 @@
9898
<version>3.4</version>
9999
<scope>test</scope>
100100
</dependency>
101+
<dependency>
102+
<groupId>com.google.protobuf</groupId>
103+
<artifactId>protobuf-java</artifactId>
104+
<version>3.0.0-beta-3</version>
105+
</dependency>
106+
<dependency>
107+
<groupId>com.google.api</groupId>
108+
<artifactId>gax</artifactId>
109+
<version>0.0.13</version>
110+
</dependency>
101111
</dependencies>
102112
</project>
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud;
18+
19+
import java.util.concurrent.Future;
20+
21+
/**
22+
* Interface for asynchronously consuming Google Cloud paginated results.
23+
*
24+
* <p>Use {@code AsyncPage} to iterate through all values (also in next pages):
25+
* <pre> {@code
26+
* AsyncPage<T> page = ...; // get an AsyncPage<T> instance
27+
* Iterator<T> iterator = page.iterateAll();
28+
* while (iterator.hasNext()) {
29+
* T value = iterator.next();
30+
* // do something with value
31+
* }}</pre>
32+
*
33+
* <p>Or handle pagination explicitly:
34+
* <pre> {@code
35+
* AsyncPage<T> page = ...; // get a AsyncPage<T> instance
36+
* while (page != null) {
37+
* for (T value : page.values()) {
38+
* // do something with value
39+
* }
40+
* page = page.nextPageAsync().get();
41+
* }}</pre>
42+
*
43+
* @param <T> the value type that the page holds
44+
*/
45+
public interface AsyncPage<T> extends Page<T> {
46+
47+
/**
48+
* Returns a {@link Future} object for the next page. {@link Future#get()} returns {@code null} if
49+
* the last page has been reached.
50+
*/
51+
Future<AsyncPage<T>> nextPageAsync();
52+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud;
18+
19+
import com.google.common.base.Throwables;
20+
import com.google.common.util.concurrent.Futures;
21+
import com.google.common.util.concurrent.Uninterruptibles;
22+
23+
import java.io.Serializable;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.Future;
26+
27+
/**
28+
* Base implementation for asynchronously consuming Google Cloud paginated results.
29+
*
30+
* @param <T> the value type that the page holds
31+
*/
32+
public class AsyncPageImpl<T> extends PageImpl<T> implements AsyncPage<T> {
33+
34+
private static final long serialVersionUID = -6009473188630364906L;
35+
36+
private final NextPageFetcher<T> asyncPageFetcher;
37+
38+
/**
39+
* Interface for asynchronously fetching the next page of results from the service.
40+
*
41+
* @param <T> the value type that the page holds
42+
*/
43+
public interface NextPageFetcher<T> extends Serializable {
44+
Future<AsyncPage<T>> nextPage();
45+
}
46+
47+
private static class SyncNextPageFetcher<T> implements PageImpl.NextPageFetcher<T> {
48+
49+
private static final long serialVersionUID = -4124568632363525351L;
50+
51+
private final NextPageFetcher<T> asyncPageFetcher;
52+
53+
private SyncNextPageFetcher(NextPageFetcher<T> asyncPageFetcher) {
54+
this.asyncPageFetcher = asyncPageFetcher;
55+
}
56+
57+
@Override
58+
public Page<T> nextPage() {
59+
try {
60+
return asyncPageFetcher != null
61+
? Uninterruptibles.getUninterruptibly(asyncPageFetcher.nextPage()) : null;
62+
} catch (ExecutionException ex) {
63+
throw Throwables.propagate(ex.getCause());
64+
}
65+
}
66+
}
67+
68+
/**
69+
* Creates an {@code AsyncPageImpl} object.
70+
*/
71+
public AsyncPageImpl(NextPageFetcher<T> asyncPageFetcher, String cursor, Iterable<T> results) {
72+
super(new SyncNextPageFetcher<T>(asyncPageFetcher), cursor, results);
73+
this.asyncPageFetcher = asyncPageFetcher;
74+
}
75+
76+
@Override
77+
public Future<AsyncPage<T>> nextPageAsync() {
78+
if (nextPageCursor() == null || asyncPageFetcher == null) {
79+
return Futures.immediateCheckedFuture(null);
80+
}
81+
return asyncPageFetcher.nextPage();
82+
}
83+
}

gcloud-java-core/src/main/java/com/google/cloud/BaseServiceException.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.api.client.googleapis.json.GoogleJsonError;
2020
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
21+
import com.google.api.gax.grpc.ApiException;
2122
import com.google.common.base.MoreObjects;
2223

2324
import java.io.IOException;
@@ -167,6 +168,16 @@ public BaseServiceException(int code, String message, String reason, boolean ide
167168
this.debugInfo = null;
168169
}
169170

171+
public BaseServiceException(ApiException apiException, boolean idempotent) {
172+
super(apiException.getMessage(), apiException);
173+
this.code = apiException.getStatusCode().value();
174+
this.reason = apiException.getStatusCode().name();
175+
this.idempotent = idempotent;
176+
this.retryable = apiException.isRetryable();
177+
this.location = null;
178+
this.debugInfo = null;
179+
}
180+
170181
protected Set<Error> retryableErrors() {
171182
return Collections.emptySet();
172183
}

0 commit comments

Comments
 (0)