Use Apache Beam 2.34.0 or later for Pub/Sub Lite IO.
- Add the following to your pom.xml file to download the Pub/Sub Lite I/O included with Beam.

      <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
        <version>2.34.0</version>
      </dependency>

- Create a topic using gcloud pubsub lite-topics create.
- Write some messages using:

      import com.google.cloud.pubsublite.proto.PubSubMessage;
      import com.google.cloud.pubsublite.CloudZone;
      import com.google.cloud.pubsublite.ProjectIdOrNumber;
      import com.google.cloud.pubsublite.TopicName;
      import com.google.cloud.pubsublite.TopicPath;
      import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
      import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;

      ...

      // Zone that hosts the Pub/Sub Lite topic, and the owning project number.
      private final static String LOCATION = "us-central1-b";
      private final static Long PROJECT_NUM = 123L;

      ...

      PCollection<PubSubMessage> messages = ...;
      // Publish every element of the collection to the topic "my-topic".
      messages.apply("Write messages", PubsubLiteIO.write(
          PublisherOptions.newBuilder()
              .setTopicPath(TopicPath.newBuilder()
                  .setLocation(CloudZone.parse(LOCATION))
                  .setProject(ProjectIdOrNumber.of(PROJECT_NUM))
                  .setName(TopicName.of("my-topic"))
                  .build())
              .build()));

- Create a subscription using gcloud pubsub lite-subscriptions create.
- Read some messages using:

      import com.google.cloud.pubsublite.proto.SequencedMessage;
      import com.google.cloud.pubsublite.CloudZone;
      import com.google.cloud.pubsublite.ProjectIdOrNumber;
      import com.google.cloud.pubsublite.SubscriptionName;
      import com.google.cloud.pubsublite.SubscriptionPath;
      import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
      import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;

      ...

      // Zone that hosts the Pub/Sub Lite subscription, and the owning project number.
      private final static String LOCATION = "us-central1-b";
      private final static Long PROJECT_NUM = 123L;

      ...

      Pipeline pipeline = ...;
      // Read messages delivered to the subscription "my-sub".
      PCollection<SequencedMessage> messages =
          pipeline.apply("Read messages", PubsubLiteIO.read(
              SubscriberOptions.newBuilder()
                  .setSubscriptionPath(SubscriptionPath.newBuilder()
                      .setLocation(CloudZone.parse(LOCATION))
                      .setProject(ProjectIdOrNumber.of(PROJECT_NUM))
                      .setName(SubscriptionName.of("my-sub"))
                      .build())
                  .build()));

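
The path builders in the write and read steps identify a resource by project, location, and name. As a rough illustration of how those three parts are understood to combine into a Pub/Sub Lite resource name (projects/.../locations/.../topics/... or .../subscriptions/...), here is a minimal sketch in plain Java. The LitePaths class and its methods are hypothetical helpers written for this example; they are not part of the Pub/Sub Lite client library or of Beam.

```java
// Hypothetical helper for illustration only; not part of any Google or Beam library.
final class LitePaths {
  private LitePaths() {}

  /** Returns projects/<project>/locations/<location>/topics/<name>. */
  static String topicPath(long projectNumber, String location, String name) {
    return resourcePath(projectNumber, location, "topics", name);
  }

  /** Returns projects/<project>/locations/<location>/subscriptions/<name>. */
  static String subscriptionPath(long projectNumber, String location, String name) {
    return resourcePath(projectNumber, location, "subscriptions", name);
  }

  // Joins the parts with "/" after rejecting empty components.
  private static String resourcePath(
      long projectNumber, String location, String collection, String name) {
    if (location.isEmpty() || name.isEmpty()) {
      throw new IllegalArgumentException("location and name must be non-empty");
    }
    return "projects/" + projectNumber
        + "/locations/" + location
        + "/" + collection
        + "/" + name;
  }
}
```

With the values used in the snippets above, LitePaths.topicPath(123L, "us-central1-b", "my-topic") gives projects/123/locations/us-central1-b/topics/my-topic. This form is handy when comparing pipeline configuration against the names shown by gcloud.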
- Drain does not work on the default Dataflow runner. You must set the option --experiments=use_runner_v2 for draining to function correctly.
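
One way to avoid forgetting the flag is to add it to the command-line arguments before they are parsed into pipeline options. The sketch below is a plain-Java illustration, not an official Beam utility. RunnerV2Args and withRunnerV2 are hypothetical names, and the code assumes that several experiments can be given as a comma-separated list in a single --experiments flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Beam or Dataflow.
final class RunnerV2Args {
  static final String FLAG_PREFIX = "--experiments=";
  static final String RUNNER_V2 = "use_runner_v2";

  private RunnerV2Args() {}

  /** Returns a copy of args in which the use_runner_v2 experiment is requested. */
  static String[] withRunnerV2(String[] args) {
    List<String> out = new ArrayList<>(Arrays.asList(args));
    int firstFlag = -1;
    for (int i = 0; i < out.size(); i++) {
      String arg = out.get(i);
      if (!arg.startsWith(FLAG_PREFIX)) {
        continue;
      }
      if (firstFlag < 0) {
        firstFlag = i;
      }
      String value = arg.substring(FLAG_PREFIX.length());
      if (Arrays.asList(value.split(",")).contains(RUNNER_V2)) {
        return out.toArray(new String[0]); // already requested, nothing to add
      }
    }
    if (firstFlag < 0) {
      // No experiments flag at all: add a new one.
      out.add(FLAG_PREFIX + RUNNER_V2);
    } else {
      // Assumption: experiments may be listed comma-separated in one flag.
      String existing = out.get(firstFlag);
      String separator = existing.equals(FLAG_PREFIX) ? "" : ",";
      out.set(firstFlag, existing + separator + RUNNER_V2);
    }
    return out.toArray(new String[0]);
  }
}
```

The returned array would then be passed to whatever parses the pipeline options, for example PipelineOptionsFactory.fromArgs(...) in a typical Beam main method.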