This repository contains examples for streaming data from Kafka to BigQuery using both custom Java pipelines and Google's pre-built Dataflow templates.
```
.
├── pom.xml # Maven project configuration
├── run_local.sh # Simple local pipeline (Branch 1 only)
├── run_dataflow.sh # Simple Dataflow pipeline (Branch 1 only)
├── run_branched_pipeline.sh # Advanced pipeline with all three branches
├── run_multi_pipeline.sh # Multi-stream join pipeline (NEW)
├── run_dataflow_template.sh # Script to deploy using Dataflow Flex Template
├── MULTI_PIPELINE_README.md # Documentation for multi-stream joins
├── udf/
│ ├── process.js # JavaScript UDF for template transformation
│ └── user_event_aggregations.sql # SQL query for Beam SQL aggregations
├── src/main/java/com/johanesalxd/ # Java source code
│ ├── KafkaToBigQuery.java # Main pipeline logic with three branches
│ ├── MultiPipelineExample.java # Multi-stream join pipeline (NEW)
│ ├── KafkaPipelineOptions.java # Pipeline options for single-topic
│ ├── MultiPipelineOptions.java # Pipeline options for multi-stream (NEW)
│ ├── schemas/ # BigQuery schema definitions
│ │ ├── UserEventBigQuerySchema.java
│ │ ├── GenericBigQuerySchema.java
│ │ ├── UserEventAggregationSchema.java
│ │ ├── UserProfileBigQuerySchema.java # NEW
│ │ ├── ProductUpdateBigQuerySchema.java # NEW
│ │ └── EnrichedEventBigQuerySchema.java # NEW
│ └── transforms/ # Data transformation classes
│ ├── JsonToTableRow.java
│ ├── JsonToGenericTableRow.java
│ ├── JsonToRow.java
│ ├── UserProfileToTableRow.java # NEW
│ ├── UserProfileToRow.java # NEW
│ ├── ProductUpdateToRow.java # NEW
│ ├── EnrichedEventToTableRow.java # NEW
│ ├── TableRowToUserProfileRow.java # NEW
│ ├── EnrichWithUserProfileDoFn.java # NEW
│ └── SqlQueryReader.java
├── src/main/resources/udf/ # SQL queries for Beam SQL
│ ├── enriched_events_join.sql # Multi-stream join query (NEW)
│ └── user_event_aggregations.sql # Event aggregation query
├── kafka-tools/ # Kafka producer/consumer tools
│ ├── data_generator.py
│ ├── data_consumer.py
│ ├── detailed_consumer_groups.py
│ ├── docker-compose.yml
│ ├── example_usage.md
│ ├── requirements.txt
│ └── vm-deployment/ # VM deployment scripts
│ ├── 1-provision-kafka-vm.sh
│ ├── 2-setup-kafka-vm.sh
│ ├── docker-compose-vm.yml
│ └── example_usage.md
└── schemas/ # JSON schemas for BigQuery
├── user_events.json
├── user_profiles.json # NEW
├── product_updates.json # NEW
├── enriched_events.json # NEW
├── generic_table.json
    └── user_event_aggregations.json
```
This repository provides multiple deployment scripts for different use cases:
- `run_local.sh` - Local development with DirectRunner (Branch 1 only)
- `run_dataflow.sh` - Simple Dataflow deployment (Branch 1 only)
- `run_branched_pipeline.sh` - Single topic with 3 parallel processing branches
- `run_multi_pipeline.sh` - Multi-stream joins with Beam SQL (NEW)
- `run_dataflow_template.sh` - Google's managed Flex Template
The `KafkaToBigQuery.java` pipeline has three conditional branches:

| Branch | Trigger | Purpose | Output |
|---|---|---|---|
| Branch 1 | Always runs (requires `outputTable`) | Flattened structured data | `raw_user_events` |
| Branch 2 | Only if `genericOutputTable` provided | Raw JSON with timestamps | `raw_user_events_flex` |
| Branch 3 | Only if `sqlAggregationTable` provided | Real-time SQL aggregations | `user_event_aggregations` |
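These triggers correspond to pipeline options passed in by the deployment scripts. As a rough illustration (the exact getter names in `KafkaPipelineOptions.java` are assumptions, not the repository's verified signatures), the options interface might look like this:

```java
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

// Hypothetical sketch of KafkaPipelineOptions; names and annotations are assumptions.
public interface KafkaPipelineOptionsSketch extends PipelineOptions {
  @Description("Kafka bootstrap servers, e.g. localhost:9092")
  @Validation.Required
  String getBootstrapServers();
  void setBootstrapServers(String value);

  @Description("BigQuery table for Branch 1 (required)")
  @Validation.Required
  String getOutputTable();
  void setOutputTable(String value);

  @Description("Optional BigQuery table for Branch 2 (raw JSON with timestamps)")
  String getGenericOutputTable();
  void setGenericOutputTable(String value);

  @Description("Optional BigQuery table for Branch 3 (Beam SQL aggregations)")
  String getSqlAggregationTable();
  void setSqlAggregationTable(String value);
}
```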
- Simple scripts (`run_local.sh`, `run_dataflow.sh`) only provide `outputTable` → Branch 1 only
- Branched script (`run_branched_pipeline.sh`) provides all three tables → all branches run
- Multi-pipeline script (`run_multi_pipeline.sh`) runs a completely different pipeline (`MultiPipelineExample.java`)
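To make the branch gating concrete, here is a minimal sketch (not the repository's exact code) of how the conditional wiring could look, using the hypothetical options interface above. The inline DoFns are placeholders for the repository's `JsonToTableRow` and `JsonToGenericTableRow` transforms:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BranchingSketch {
  public static void main(String[] args) {
    // Parse the hypothetical options interface sketched above.
    KafkaPipelineOptionsSketch options = PipelineOptionsFactory
        .fromArgs(args).withValidation().as(KafkaPipelineOptionsSketch.class);
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> json = pipeline
        .apply("ReadKafka", KafkaIO.<String, String>read()
            .withBootstrapServers(options.getBootstrapServers())
            .withTopic("user-events")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply("Values", Values.create());

    // Branch 1: always wired, since outputTable is required.
    json.apply("Flatten", ParDo.of(new DoFn<String, TableRow>() {
          @ProcessElement
          public void process(@Element String msg, OutputReceiver<TableRow> out) {
            // Placeholder for the repo's JsonToTableRow flattening logic.
            out.output(new TableRow().set("payload", msg));
          }
        }))
        .apply("WriteFlattened", BigQueryIO.writeTableRows()
            .to(options.getOutputTable())
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    // Branch 2: wired only when --genericOutputTable is passed;
    // Branch 3 (--sqlAggregationTable) follows the same pattern.
    if (options.getGenericOutputTable() != null) {
      json.apply("ToGenericRow", ParDo.of(new DoFn<String, TableRow>() {
            @ProcessElement
            public void process(@Element String msg, OutputReceiver<TableRow> out) {
              out.output(new TableRow()
                  .set("raw_json", msg)
                  .set("ingest_time", java.time.Instant.now().toString()));
            }
          }))
          .apply("WriteGeneric", BigQueryIO.writeTableRows()
              .to(options.getGenericOutputTable())
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
    }

    pipeline.run();
  }
}
```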
**IMPORTANT**: All scripts specify the main class explicitly, since no main class is defined in the JAR manifest:
```bash
# ✅ CORRECT - Only method that works
java -cp ${JAR_FILE} com.johanesalxd.KafkaToBigQuery

# ❌ FAILS - No main class in JAR manifest
java -jar ${JAR_FILE}
java -jar ${JAR_FILE} com.johanesalxd.KafkaToBigQuery  # Still fails!
```

**Why this approach:**
- The JAR manifest has no main class defined (removed from `pom.xml` for safety)
- Using `java -jar` will always fail with a "no main manifest attribute" error
- Only `java -cp` with an explicit main class works
- This prevents accidentally running the wrong pipeline
**Benefits:**
- **Explicit Control**: Must specify which main class to run
- **No Ambiguity**: Cannot accidentally run the wrong pipeline
- **Consistency**: All scripts use the same execution method (`java -cp`)
- **Safety**: Impossible to run without explicitly choosing the pipeline
This repository provides multiple approaches for streaming Kafka data to BigQuery:
1. **Start Kafka:**
   ```bash
   docker-compose up -d
   ```
2. **Install Java and Maven:** This project requires Java 11 and Maven to be installed on your system.
3. **Generate Sample Data:** In a separate terminal, run the data generator to start producing messages to the `user-events` topic:
   ```bash
   python3 kafka-tools/data_generator.py --topics user-events
   ```
4. **Run the Pipeline Locally:** In another terminal, run the following command to start the Beam pipeline locally:
   ```bash
   ./run_local.sh
   ```
The pipeline will start consuming messages from the `user-events` topic and writing them to the `raw_user_events` table in the `dataflow_demo_local` dataset in BigQuery.
For a simpler deployment without Java/Maven requirements:
1. **Start Kafka:** (same as above)
2. **Generate Sample Data:** (same as above)
3. **Configure and Deploy:**
   ```bash
   # Edit run_dataflow_template.sh with your GCP settings
   ./run_dataflow_template.sh
   ```
This approach uses Google's managed template and writes to the `raw_user_events_flex` table.
For demonstrating multiple processing approaches including real-time SQL aggregations:
1. **Start Kafka:** (same as above)
2. **Generate Sample Data:** (same as above)
3. **Deploy Multi-Branch Pipeline:**
   ```bash
   # Edit run_branched_pipeline.sh with your GCP settings
   ./run_branched_pipeline.sh
   ```
This pipeline processes the same Kafka stream into three parallel branches:
- Flattened events → `raw_user_events` (structured schema)
- Generic events → `raw_user_events_flex` (raw JSON with timestamps)
- SQL aggregations → `user_event_aggregations` (real-time event counts by type)
The SQL aggregation branch demonstrates Beam SQL capabilities with 1-minute tumbling windows, counting events by type using the query in `udf/user_event_aggregations.sql`.
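As an illustration, a 1-minute tumbling-window count by event type can be expressed with `SqlTransform` roughly as follows. The field names (`event_type`, `event_timestamp`) are assumptions; the repository's actual query text lives in the external SQL file and is loaded via `SqlQueryReader`:

```java
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class SqlAggregationSketch {
  // Counts events per type in 1-minute tumbling windows using Beam SQL.
  // Assumes the input Rows carry "event_type" (VARCHAR) and
  // "event_timestamp" (TIMESTAMP) fields.
  public static PCollection<Row> aggregate(PCollection<Row> events) {
    return events.apply("AggregateEvents", SqlTransform.query(
        "SELECT event_type, COUNT(*) AS event_count, "
      + "TUMBLE_START(event_timestamp, INTERVAL '1' MINUTE) AS window_start "
      + "FROM PCOLLECTION "
      + "GROUP BY event_type, TUMBLE(event_timestamp, INTERVAL '1' MINUTE)"));
  }
}
```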
For demonstrating advanced stream processing with multiple Kafka topics and Beam SQL joins:
```bash
# Edit run_multi_pipeline.sh with your GCP settings
./run_multi_pipeline.sh
```

This pipeline demonstrates advanced multi-stream joins with BigQuery side inputs and Beam SQL.
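The core side-input pattern, sketched below with assumed field names (`user_id`, `name`), reads the profile table from BigQuery into a map-shaped side input and enriches each event inside a DoFn, similar in spirit to the repository's `EnrichWithUserProfileDoFn` (this is not the repository's exact code):

```java
import java.util.Map;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

public class SideInputJoinSketch {
  public static PCollection<TableRow> enrich(
      Pipeline pipeline, PCollection<TableRow> events, String profileTable) {
    // Bounded read of the profile table, keyed by user_id, as a map side input.
    PCollectionView<Map<String, TableRow>> profiles = pipeline
        .apply("ReadProfiles", BigQueryIO.readTableRows().from(profileTable))
        .apply("KeyByUserId", MapElements
            .into(TypeDescriptors.kvs(
                TypeDescriptors.strings(), TypeDescriptor.of(TableRow.class)))
            .via((TableRow row) -> KV.of((String) row.get("user_id"), row)))
        .apply("AsMap", View.asMap());

    // Enrich each streaming event with its matching profile, if any.
    return events.apply("EnrichWithProfile", ParDo.of(new DoFn<TableRow, TableRow>() {
          @ProcessElement
          public void process(ProcessContext c) {
            TableRow event = c.element().clone();
            TableRow profile = c.sideInput(profiles).get(event.get("user_id"));
            if (profile != null) {
              event.set("user_name", profile.get("name")); // hypothetical field
            }
            c.output(event);
          }
        }).withSideInputs(profiles));
  }
}
```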
📖 For detailed documentation, see [MULTI_PIPELINE_README.md](MULTI_PIPELINE_README.md).
For production deployments, you can run the same pipeline on Google Cloud Dataflow:
To deploy the custom Java pipeline (requires Java 11 + Maven to build):

1. **Start Kafka:** (same as above)
2. **Generate Sample Data:** (same as above)
3. **Configure and Deploy:**
   ```bash
   # Edit run_dataflow.sh with your GCP settings
   ./run_dataflow.sh
   ```

This approach runs the custom pipeline (Branch 1 only) on Dataflow and writes to the `raw_user_events` table.
- View jobs: Dataflow Console
- Monitor data: BigQuery Console
For users who prefer to use Google's pre-built templates instead of custom Java code, this repository also includes a template-based approach using the `Kafka_to_BigQuery_Flex` template.
| Feature | Custom Java Pipeline | Dataflow Flex Template |
|---|---|---|
| Setup | Requires Java 11 + Maven | Only requires gcloud CLI |
| Deployment | Compile + Deploy | Direct deployment |
| Customization | Full Java flexibility | JavaScript UDF only |
| Maintenance | User maintains code | Google maintains template |
| Table Schema | Custom schema | Generic schema with payload |
| Output Table | `raw_user_events` | `raw_user_events_flex` |
- Java-based: A reliable and robust implementation using the Java SDK.
- Multi-Branch Processing: Single pipeline with three parallel processing branches for different use cases.
- Beam SQL Integration: Real-time SQL aggregations with external query files for easy maintenance.
- Simplified Configuration: All configuration is handled in the deployment scripts, making it easy to get started.
- Partitioned BigQuery Tables: All pipelines write to time-partitioned BigQuery tables, a best practice for managing large datasets (see the sketch after this list).
- Clean and Simple Code: The code is well-structured and easy to understand, making it a great starting point for more complex pipelines.
- Flexible Schema Management: Support for both structured and generic schemas with automatic table creation.
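For the partitioned-table point above, a day-partitioned `BigQueryIO` write might be configured roughly like this; the table name, partition field, and schema are illustrative assumptions, not the repository's exact definitions:

```java
import java.util.Arrays;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

public class PartitionedWriteSketch {
  // Builds a write that creates a day-partitioned table if it does not exist.
  public static BigQueryIO.Write<TableRow> partitionedWrite(String table) {
    return BigQueryIO.writeTableRows()
        .to(table) // e.g. "my-project:dataflow_demo.raw_user_events"
        .withSchema(new TableSchema().setFields(Arrays.asList(
            new TableFieldSchema().setName("event_timestamp").setType("TIMESTAMP"),
            new TableFieldSchema().setName("event_type").setType("STRING"))))
        .withTimePartitioning(
            new TimePartitioning().setType("DAY").setField("event_timestamp"))
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
  }
}
```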