Dataflow Flex templates - Kafka to BigQuery

Samples showing how to create and run an Apache Beam template with a custom Docker image on Google Cloud Dataflow.

Before you begin

If you are not familiar with Dataflow Flex templates, please see the Streaming Beam SQL sample first.

Follow the Getting started with Google Cloud Dataflow page, and make sure you have a Google Cloud project with billing enabled and a service account JSON key set up in your GOOGLE_APPLICATION_CREDENTIALS environment variable. Additionally, for this sample you need the following:

  1. Enable the APIs: App Engine, Cloud Scheduler, Cloud Build.
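
    These can be enabled from the console, or with a single `gcloud` command (the service IDs below are assumed to match the current services catalog):

    ```shell
    # Enable the required APIs for this sample.
    gcloud services enable \
      appengine.googleapis.com \
      cloudscheduler.googleapis.com \
      cloudbuild.googleapis.com
    ```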

  2. Create a Cloud Storage bucket.

    export BUCKET="your-gcs-bucket"
    gsutil mb gs://$BUCKET
  3. Create a BigQuery dataset.

    export PROJECT="$(gcloud config get-value project)"
    export DATASET="beam_samples"
    export TABLE="kafka_to_bigquery"
    
    bq mk --dataset "$PROJECT:$DATASET"
  4. Select the compute region and zone to use.

    # Use your default compute region, or fall back to "us-central1".
    export REGION=$(gcloud config get-value compute/region)
    export REGION=${REGION:-"us-central1"}

    # Use your default compute zone, or fall back to "$REGION-a".
    # Note that the zone *must* be in $REGION.
    export ZONE=$(gcloud config get-value compute/zone)
    export ZONE=${ZONE:-"$REGION-a"}
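
    The `${VAR:-default}` shell expansion used in these commands falls back to a default when the variable is empty or unset. A standalone illustration:

    ```shell
    # ${VAR:-default} expands to $VAR when it is set and non-empty,
    # otherwise to the given default.
    REGION=""
    REGION=${REGION:-"us-central1"}
    echo "$REGION"   # us-central1

    ZONE=""
    ZONE=${ZONE:-"$REGION-a"}
    echo "$ZONE"     # us-central1-a
    ```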
  5. Clone the java-docs-samples repository.

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
  6. Navigate to the sample code directory.

    cd java-docs-samples/dataflow/flex-templates/kafka_to_bigquery
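
Before continuing, it can help to verify that every variable from the steps above is set. A small sketch (the `check_vars` helper and the placeholder values are illustrative, not part of the sample):

```shell
# Sketch: report any of the variables exported above that is missing.
check_vars() {
  for var in "$@"; do
    eval "val=\${$var:-}"
    if [ -z "$val" ]; then
      echo "Missing required variable: $var"
      return 1
    fi
  done
  echo "All required variables are set."
}

# Example with placeholder values; in a real session these come from
# the export commands in the previous steps.
PROJECT="my-project"; BUCKET="my-bucket"; DATASET="beam_samples"
TABLE="kafka_to_bigquery"; REGION="us-central1"; ZONE="us-central1-a"
check_vars PROJECT BUCKET DATASET TABLE REGION ZONE
```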

Kafka to BigQuery sample

This sample shows how to deploy an Apache Beam streaming pipeline that reads JSON encoded messages from Apache Kafka, decodes them, and writes them into a BigQuery table.

For this, we need two parts running:

  1. A Kafka server container accessible through an external IP address. This service publishes messages to a topic.

  2. An Apache Beam streaming pipeline running in Dataflow Flex Templates. This subscribes to a Kafka topic, consumes the messages that are published to that topic, processes them, and writes them into a BigQuery table.
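
As a concrete illustration, a message on the topic might look like the following (the field names here are hypothetical; the actual schema is defined by the pipeline code):

```shell
# A hypothetical JSON-encoded Kafka message payload.
MESSAGE='{"url": "https://beam.apache.org/", "review": "positive"}'
echo "$MESSAGE"
```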

Starting the Kafka server

(Optional) Run the Kafka server locally for development. (Click to expand)

Note that you must have Docker installed on your machine to run the container locally. You do not need Docker to build and deploy on Google Cloud; skip this section if you want to go straight to building and deploying in the cloud.

# Create a network where containers can communicate.
docker network create kafka-net

# Build the image.
docker image build -t kafka kafka/

# Run a detached container (in the background) using the network we created.
docker run -d --rm \
  --name "kafka" \
  --net "kafka-net" \
  -p 2181:2181 -p 9092:9092 \
  kafka

Once you are done, you can stop and delete the resources.

# Stop the container.
docker kill kafka

# Delete the Docker network.
docker network rm kafka-net

For more information about creating a Docker application, see Containerizing an application.

The Kafka server must be accessible to external applications, so we need to reserve a static IP address for it.

ℹ️ If you already have a Kafka server running you can skip this section. Just make sure to store its IP address into an environment variable.

export KAFKA_ADDRESS="123.45.67.89"
# Create a new static IP address for the Kafka server to use.
gcloud compute addresses create --region "$REGION" kafka-address

# Get the static address into a variable.
export KAFKA_ADDRESS=$(gcloud compute addresses describe --region="$REGION" --format='value(address)' kafka-address)

ℹ️ Do not use --global to create the static IP address since the Kafka server must reside in a specific region.

We also need to create a firewall rule to allow incoming messages to the server.

Kafka uses port 9092 and Zookeeper uses port 2181 by default, unless configured differently.

# Create a firewall rule to open the port used by Zookeeper and Kafka.
# Allow connections to ports 2181, 9092 in VMs with the "kafka-server" tag.
gcloud compute firewall-rules create allow-kafka \
  --target-tags "kafka-server" \
  --allow tcp:2181,tcp:9092

Now we can start a new Compute Engine VM (Virtual Machine) instance for the Kafka server using the Docker image we created in Container Registry.

For this sample, we don't need a high performance VM, so we are using an e2-small machine with shared CPU cores for a more cost-effective option.

To learn more about pricing, see the VM instances pricing page.

export KAFKA_IMAGE="gcr.io/$PROJECT/samples/dataflow/kafka:latest"

# Build the Kafka server image into Container Registry.
gcloud builds submit --tag $KAFKA_IMAGE kafka/

# Create and start a new instance.
# The --address flag binds the VM's address to the static address we created.
# The --container-env KAFKA_ADDRESS is an environment variable passed to the
# container to configure Kafka to use the static address of the VM.
# The --tags "kafka-server" flag is used by the firewall rule.
gcloud compute instances create-with-container kafka-vm \
  --zone "$ZONE" \
  --machine-type "e2-small" \
  --address "$KAFKA_ADDRESS" \
  --container-image "$KAFKA_IMAGE" \
  --container-env "KAFKA_ADDRESS=$KAFKA_ADDRESS" \
  --tags "kafka-server"

Creating and running a Flex Template

(Optional) Run the Apache Beam pipeline locally for development. (Click to expand)
# If you are running the Kafka server locally, you can omit the
# --bootstrapServer argument; the pipeline then connects to localhost.
mvn compile exec:java \
  -Dexec.mainClass=org.apache.beam.samples.KafkaToBigQuery \
  -Dexec.args="\
    --project=$PROJECT \
    --outputTable=$PROJECT:$DATASET.$TABLE \
    --bootstrapServer=$KAFKA_ADDRESS:9092"

First, let's build the container image.

export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/kafka-to-bigquery:latest"

# Build and package the application as an uber-jar file.
mvn clean package

# Build the Dataflow Flex template image into Container Registry.
gcloud builds submit --tag "$TEMPLATE_IMAGE" .

Now we can create the template file.

export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/kafka-to-bigquery.json"

# Build the Flex Template.
gcloud beta dataflow flex-template build $TEMPLATE_PATH \
  --image "$TEMPLATE_IMAGE" \
  --sdk-language "JAVA" \
  --metadata-file "metadata.json"

Finally, run a Dataflow job using the template.

# Run the Flex Template.
gcloud beta dataflow flex-template run "kafka-to-bigquery-$(date +%Y%m%d-%H%M%S)" \
  --template-file-gcs-location "$TEMPLATE_PATH" \
  --parameters "inputTopic=messages,outputTable=$PROJECT:$DATASET.$TABLE,bootstrapServer=$KAFKA_ADDRESS:9092"
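
Note that Dataflow job names must be unique among currently running jobs, which is why the command above appends a `date` suffix. The naming pattern in isolation:

```shell
# Build a unique, timestamped job name, e.g. kafka-to-bigquery-20240101-120000.
JOB_NAME="kafka-to-bigquery-$(date +%Y%m%d-%H%M%S)"
echo "$JOB_NAME"
```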

Run the following query to check the results in BigQuery.

bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'

Cleaning up

After you've finished this tutorial, you can clean up the resources you created on Google Cloud so you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Clean up the Flex template resources

  1. Stop the Dataflow pipeline.

    gcloud dataflow jobs list \
        --filter 'NAME:kafka-to-bigquery AND STATE=Running' \
        --format 'value(JOB_ID)' \
      | xargs gcloud dataflow jobs cancel
  2. Delete the template spec file from Cloud Storage.

    gsutil rm $TEMPLATE_PATH
  3. Delete the Flex Template container image from Container Registry.

    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags

Clean up the Kafka server

  1. Delete the Kafka server VM instance.

    gcloud compute instances delete kafka-vm
  2. Delete the firewall rule; this does not incur any charges.

    gcloud compute firewall-rules delete allow-kafka
  3. Delete the static address.

    gcloud compute addresses delete --region "$REGION" kafka-address
  4. Delete the Kafka container image from Container Registry.

    gcloud container images delete $KAFKA_IMAGE --force-delete-tags

Clean up Google Cloud project resources

  1. Delete the BigQuery table.

    bq rm -f -t $PROJECT:$DATASET.$TABLE
  2. Delete the BigQuery dataset; this alone does not incur any charges.

    ⚠️ The following command also deletes all tables in the dataset. The tables and data cannot be recovered.

    bq rm -r -f -d $PROJECT:$DATASET
  3. Delete the Cloud Storage bucket; this alone does not incur any charges.

    ⚠️ The following command also deletes all objects in the bucket. These objects cannot be recovered.

    gsutil rm -r gs://$BUCKET