A data-stream processing framework written in Rust that enables complex data transformations through composable processors connected via a bash-like graph-based workflow language. This work includes options to leverage Amazon AWS services and can serve as a reference implementation of Rust integration.
Event Winnower is a flexible data processing graph engine that allows you to build complex workflows by connecting processors in directed acyclic workflow graphs (DAGs). With its familiar bash-like pipe syntax (|) and variable system ($var), Event Winnower makes it easy to create complex data flow patterns including branching, merging, and stream processing. It supports multiple input sources (JSON, CSV, S3, SQS, Kinesis) and output destinations (S3, SQS, Kinesis, Firehose, SNS) with a rich set of transformation and data enrichment processors. It is intended for rapid, iterative prototyping on the desktop as well as scalable deployments within cloud environments. It is especially suited for JSON data streams and most processing elements support selecting subsets of data from events using JMESpath syntax.
- take a realtime stream of data from S3 Notifications from an SNS+SQS queue, process all the events to look for novel items and write those novel items to S3 using firehose
- query athena periodically, transform the items, selectively query bedrock, and send results to Slack from a hook using SNS.
- process lots of files from an S3 prefix, filter them using matching and stateful filters, write results to kinesis
- correlate events that co-occur in time that share a common key-space or set of fields
- take a kinesis stream, lookup ips in maxmind, filter ones that are not external, send data to a new kinesis stream
- prototype a processing idea using test files then rapidly converting it to handle a realtime stream of data to send output to other systems
- Processing Graph Architecture: Build complex DAGs using variables for branching and merging of data streams
- Stream Processing: Low latency, event-at-a-time processing as well as batch-events at a time processing
- Multiple Input Sources: JSON files, CSV, S3 objects, SQS queues, Kinesis streams, Athena queries
- Rich Processing Library: 50+ built-in processors for filtering, transformation, aggregation, and analysis
- AWS Integration: Native support for S3, SQS, Kinesis, Firehose, SNS, Bedrock, DynamoDB, and Athena
- Concurrent Processing: Multi-threaded execution with configurable thread pools
- Stateful Analytics: Store intermediate results and create complex data flow patterns
- IP Enrichment: MaxMind GeoIP integration for location and ISP data
- Text Analysis: N-gram generation, tokenization, fuzzy clustering, and deduplication
- Cloudwatch aggregate metrics logging
Build from source using Cargo:
cargo build --releaseThe binary will be available at target/release/eventwinnower.
# Basic pipeline from command line
eventwinnower 'json | match "price > `20`" | print'
# Load workflow from file
eventwinnower -f workflow.txt
# List available processors
eventwinnower -p
# Get detailed help for all processors
eventwinnower -P
# Get help for specific processor
eventwinnower -H jsonWhile simple linear pipelines use pipe-separated syntax, Event Winnower supports full processing graphs through variables:
source | processor1 args | processor2 args | output
# Multiple sources, branching, merging, multiple outputs
source1 | processor1 | $intermediate
source2 | processor2 | $intermediate
$intermediate | filter1 | $branch1
$intermediate | filter2 | $branch2
$branch1 | output1
$branch2 | output2
where $intermediate, $branch1 and $branch2 are variables that simply connect the output of processors to the input of the next downstream processor, allowing for branching and joining of data streams.
The workflow system supports variables for both branching and joining data streams:
# Store results in variables
source | processor1 | $variable_name
# Use variables in multiple pipelines
$variable_name | processor2 | output1
$variable_name | processor3 | output2
# Multiple pipelines feeding the same output variable
$input | select foo | $stream
$stream | contains bar -m this | $output
$stream | contains bar -m other | $output
# Combined output from multiple sources
$output | print
# Multiple inputs converging and diverging
source1 | processor1 | $merged
source2 | processor2 | $merged
$merged | filter1 | $branch1
$merged | filter2 | $branch2
$branch1 | output1
$branch2 | output2
Variables start with $ and can be used to:
- Connect processing graph edges - they define data flow connections
- Create branching workflows where one input feeds multiple processing paths
- Join multiple streams - multiple pipelines can send data to the same variable
- Build complex DAG (Directed Acyclic Graph) workflows
- Avoid recomputing expensive operations by reusing graph connections
Event Winnower supports environment variable substitution in workflow scripts using the ${VARIABLE_NAME} syntax. This allows you to parameterize your workflows and reference system environment variables at runtime.
${ENVIRONMENT_VARIABLE}
Environment variables are detected and replaced with their values during workflow preprocessing. If an environment variable is not found, it will be ignored and left as-is in the output.
Create a workflow file process.txt:
# Read from S3 bucket specified by environment variable
s3 ${BUCKET_NAME}/logs/ | $logs
# Filter by status from environment variable
$logs | match "level == \"${LOG_LEVEL}\"" | $filtered
# Output to parameterized S3 path
$filtered | s3buffer ${OUTPUT_BUCKET}/processed/
Run with environment variables:
export BUCKET_NAME="my-data-bucket"
export LOG_LEVEL="ERROR"
export OUTPUT_BUCKET="results-bucket"
eventwinnower -f process.txtAnother example with SQS and dynamic queue names:
# workflow.txt
sqs ${INPUT_QUEUE} | $messages
$messages | match "priority == \"high\"" | sqs ${PRIORITY_QUEUE}
$messages | match "priority == \"low\"" | sqs ${STANDARD_QUEUE}
export INPUT_QUEUE="incoming-messages"
export PRIORITY_QUEUE="high-priority-queue"
export STANDARD_QUEUE="standard-queue"
eventwinnower -f workflow.txt- Dynamic S3 paths: Reference bucket names, prefixes, or dates from environment variables
- Queue/Stream names: Parameterize SQS queue names, Kinesis streams, or SNS topics
- Configuration values: Pass thresholds, filters, or other parameters without modifying workflow files
- Multi-environment workflows: Use the same workflow file across dev, staging, and production by setting different environment variables
# Read JSON file and filter by price
cat test.json | eventwinnower 'json | match "price > `30`" | print'
# Count unique values
cat test.json | eventwinnower 'json | select name | uniq name | count'# Branch data to multiple outputs
cat test.json | eventwinnower '
json | $data
$data | match "price > `50`" | print
$data | s3buffer processed/
'
eventwinnower '
s3 bucket1/data.json.gz bucket2/data.json.zst | select "{id:id, val:value}" | $out
$out | uniq id | print
'json- Read JSON lines from stdincsv- Read CSV file from stdins3 <bucket/key>- Read from S3 objects3list <bucket/prefix>- List S3 objectssqss3 <queue>- Read from SQS queue containing S3+SNS file notificationskinesis_in <stream>- Read from Kinesis streamathena <query>- Execute Athena query
match <jmespath>- Filter using JMESPath expressionsselect <jmespath>- Select specific fieldscontains <jmespath> -m <match>- Filter records containing textstarts_with <jmespath> -m <match>- Filter by prefixends_with <jmespath> -m <match>- Filter by suffixuniq <key:jmespath>- select only uniq events based on specified key
transform <string:jmespath> [-ul]- Transform field values (upper, lower, etc.)decode <string:jmespath> -m <method>- Decode strings (base64, hex, url)split <string:jmespath> -d <delimiter>- Split strings into arraysarrayjoiner <array:jmespath> -s <separator>- Join arrays into stringsannotate <annotation> -l <label>- Add fields to recordsremove <field>- Remove fields
count- Count recordssum <value:jmespath> -k <key:jmespath>- Sum numeric valuessort <key:jmespath>- Sort by keytop <key:jmespath>- Get top N by value
kinesis_out <stream>- Send to Kinesis streamfirehose <stream>- Send to Kinesis Firehosesns <topic>- Publish to SNS topics3buffer <bucket_name>- Buffer and write to S3
# Process thousands of S3 objects in parallel with thread pool
export EVENTWINNOWER_THREADS=20
export EVENTWINNOWER_THREADPOOL=1
eventwinnower "s3list 'my-bucket/logs/' -p '2024/' |
head -n 1000 |
contains ERROR -m 1 |
s3buffer processed-errors/"# spin up duplicate threads - each thread runs independently
# listen on an SQS queue, process s3 files, select events, write to firehose
EVENTWINNOWER_THREADS=8 eventwinnower 'sqs QUEUE_NAME | s3event | contains my.haystack -m needle | transform --uppercase name | firehose STREAM_NAME'# Process data once, output to multiple destinations
cat data.json | eventwinnower '
json | select "id, name, status, timestamp" | $clean
$clean | match "status == \"error\"" | sns error-alerts
$clean | match "status == \"success\"" | s3buffer success-logs/
$clean | keycount status | print
'# Join multiple data sources using different source processors
eventwinnower '
# Read from different sources and merge streams
s3 bucket/web-logs.json | select "{timestamp:timestamp, message:message, level:level}" | $combined
sqs api-log-queue | select "{timestamp:timestamp, message:message, level:level}" | $combined
# Filter combined stream into different outputs
$combined | match "level == \"ERROR\"" | $errors
$combined | match "level == \"WARN\"" | $errors
$combined | match "level == \"INFO\"" | $info
# Multiple outputs from joined streams
$errors | sns critical-alerts
$errors | s3buffer error-logs/
$info | s3buffer info-logs/
'# Read from S3, process, and output to multiple destinations
eventwinnower 's3 my-bucket/logs/app.log | match "level == \"ERROR\"" | sns error-topic | s3buffer processed-errors/'# Apply processor to nested data
cat data.json | eventwinnower 'json | apply tags upper name | print'Create a file workflow.txt:
# Read and preprocess data
json | $input
# Filter and prepare data
$input | match "price > `0`" | select "{id:id, name:name, price:price, tags:tags}" | $filtered
# Process tags
$filtered | split "," tags | arrayjoiner "|" tags | $processed
# Multiple outputs from same data
$processed | annotate processed_at "`date`" | s3buffer output-bucket/processed/
$processed | select "{name:name, price:price}" | csv_out summary.csv
$filtered | keycount tags | print
Run with: cat input.json | eventwinnower -f workflow.txt
Event Winnower supports two threading models:
# Use 20 threads with thread pool - ideal for S3 processing
export EVENTWINNOWER_THREADS=20
export EVENTWINNOWER_THREADPOOL=1
eventwinnower "s3list 'bucket/prefix' | head -n 1024 | contains foo -m bar"In thread pool mode:
- Source processors (like
s3list) run in a single thread - Downstream processing is load-balanced across the thread pool
- Excellent for I/O-bound operations like S3 reads, API calls, network operations
- Provides better resource utilization and throughput
# Use 8 threads with threaded execution
EVENTWINNOWER_THREADS=8 eventwinnower 'workflow'In threaded mode:
- Each thread processes the entire workflow independently
- Better for CPU-bound operations like text processing, calculations
- Simpler execution model
# Set trigger timer for batch processing (in seconds)
eventwinnower -T 30 'sqs my-queue | batch_process | output'EVENTWINNOWER_WORKFLOW_FILE: Path to workflow fileEVENTWINNOWER_THREADS: Number of threads for parallel processingEVENTWINNOWER_THREADPOOL: Enable thread pool mode (recommended for I/O-heavy workloads like S3 processing)EVENTWINNOWER_VERBOSE: Enable verbose debug logging to stderr (e.g.,EVENTWINNOWER_VERBOSE=1)
- Processors validate input and provide descriptive error messages
- Use
-hflag with any processor to see usage information - Check AWS credentials and permissions for AWS-related processors
- Ensure required files (MaxMind databases) are present for IP enrichment
Every new processor is likely a remix of some existing processor. We suggest finding a processor code base to use as a template - and copy that code to a new rust file under /src/processors.
- A Structure that derives SerialProcessorInit, SerialSourceProcessorInit, AsyncProcessorInit or AsyncSourceProcessorInit
- A command line parser (most use clapp formatted arguments)
- An implementation of get_simple_description() - for single line help
- An implementations of new() to initialize the process instance based on command line argument
- An implementation of process() or process_batch() - not both
- which takes in event(s) and outputs events
- An optional implementation of stats() which is called on exit to report processing stats
- An optional implementation of flush() which is called on exit to flush out any buffered data and generate output events
- An optional implementation of trigger() which can be called on time or event triggers to produce event output
- if your processor is a source processor, you need to implement a generate() function rather than a process() or process_batch() function
When your processor code is ready to integrate, you will need to modify /src/processors/mod.rs to include your rust module, a use crate statement, and insert your module alias into the PROCESSORS hashmap. Once that is included your module should now be accessible to be called from the command line.
- Follow Rust coding standards and run
cargo fmtandcargo clippy - Add tests for new processors, make sure tests compile without errors or warnings
- Ensure all builds pass:
cargo build --release