Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
973 changes: 973 additions & 0 deletions crates/pipeline-manager/demos/sql/01-medallion-architecture.sql

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions docs.feldera.com/docs/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ const guides = {
'use_cases/batch/part4'
]
},
{
type: 'category',
label: 'Real-Time Medallion Architecture',
items: [
'use_cases/medallion_architecture/part1',
'use_cases/medallion_architecture/part2',
'use_cases/medallion_architecture/part3'
]
},
{
type: 'doc',
id: 'use_cases/fraud_detection/fraud_detection',
Expand Down
105 changes: 105 additions & 0 deletions docs.feldera.com/docs/use_cases/medallion_architecture/part1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Part 1: Create the Feldera Pipeline

## Overview

This demo shows how Feldera maintains a complete **medallion architecture (Bronze → Silver → Gold)** as a single, always-on **incremental** pipeline. Instead of running batch jobs on a schedule to rebuild each layer, Feldera computes every layer continuously: as new data arrives, only the affected results are recomputed, and downstream Silver and Gold views update within milliseconds.

The scenario is a retail e-commerce business. Raw operational data — orders, order line items, the product catalog, customers, suppliers, inventory movements, and clickstream events — lands in **Bronze** tables. Feldera cleans, validates, and joins it into **Silver** views, then rolls it up into **Gold** business metrics like supplier performance, inventory risk, weekly revenue trends, and cancellation impact.

The focus of the demo is *incremental view maintenance over a changing dataset*. Feldera supports a wide variety of [connectors](/connectors). In this demo, the pipeline first backfills a historical snapshot from delta tables on S3, then consumes a stream of **CDC (change data capture)** events — including updates and deletes (e.g. an order transitioning to `cancelled` or `returned`). Because Gold views are maintained incrementally, a single order status change moves correctly through window functions, moving averages, and cumulative totals — historical weekly revenue decreases, cancellation rates re-compute, and inventory alerts appear or clear — all without recomputing anything from scratch.

A Spark notebook is provided so you can compare a traditional batch refresh job against Feldera's incremental view maintenance (see [Part 3](./part3.md)).

### How data flows

1. **Snapshot backfill** — On start, each Bronze table reads a Delta table snapshot from S3 (`s3://feldera-demos/ecommerce-cdc-0-01/snapshot/...`) via Feldera's [Delta input connector](/connectors/sources/delta). The pipeline loads historical state from the delta tables on S3.
2. **CDC stream** — `push_changes.py` replays hourly Debezium NDJSON files into Feldera's HTTP ingress (`?update_format=debezium&json_flavor=debezium_mysql`), applying inserts, updates, and deletes against the Bronze tables.
3. **Continuous computation** — Every change propagates incrementally through the Silver and Gold layers. Ad-hoc queries (and a Grafana dashboard, if connected) reflect the new state immediately. Gold views are updated within milliseconds of Bronze tables receiving new input.

## Demo files

All of the demo's code is in the Feldera repository. You can download it directly:

| File | Purpose |
|------|---------|
| [`01-medallion-architecture.sql`](https://github.com/feldera/feldera/blob/main/crates/pipeline-manager/demos/sql/01-medallion-architecture.sql) | The full pipeline: Bronze tables, Silver views, Gold materialized views. |
| [`push_changes.py`](https://github.com/feldera/feldera/blob/main/docs.feldera.com/docs/use_cases/medallion_architecture/push_changes.py) | Replays CDC events from S3 into the running pipeline ([Part 2](./part2.md)). |
| [`spark_notebook.py`](https://github.com/feldera/feldera/blob/main/docs.feldera.com/docs/use_cases/medallion_architecture/spark_notebook.py) | The equivalent batch job in Spark, for comparison ([Part 3](./part3.md)). |

:::tip
Each GitHub page above has a **Download raw file** button. To fetch a file from a
terminal, use its raw URL, e.g.:

```bash
curl -O https://raw.githubusercontent.com/feldera/feldera/main/docs.feldera.com/docs/use_cases/medallion_architecture/push_changes.py
```
:::

## The medallion layers

The SQL pipeline is organized into three medallion layers. Bronze is the only layer that touches the raw source; Silver and Gold are pure transformations expressed as Feldera views, so they are maintained incrementally.

### Bronze — raw ingestion (`CREATE TABLE`)

Append-and-update tables that mirror the source systems with no transformation. Each is backfilled from a Delta snapshot and then kept current by the CDC stream.

| Table | Description |
|-------|-------------|
| `bronze_clickstream_events` | Page views, sessions, and product interactions |
| `bronze_orders` | Order headers with status, totals, payment, and coupons |
| `bronze_order_items` | Order line items (product, quantity, unit price, discount) |
| `bronze_products` | Product catalog with cost and list pricing |
| `bronze_inventory_events` | Inventory movements (restock, sale_reserve, returns) |
| `bronze_customers` | Customer dimension with tier and geography |
| `bronze_suppliers` | Supplier dimension with lead times |

### Silver — cleaned, validated, enriched (`LOCAL VIEW`)

All cleaning, validation, deduplication, and joins happen here, so that Gold never references Bronze directly.

- `silver_customers` — validated customer dimension (known tiers only)
- `silver_enriched_clickstream` — clickstream events joined to customer and product context
- `silver_orders_enriched` — orders joined to customers and aggregated line-item metrics
- `silver_order_items_enriched` — line items enriched with product, supplier, and customer context; computes line-level revenue and margin
- `silver_confirmed_order_items` — line items excluding cancelled/returned orders
- `silver_inventory_current` — running per-product/per-warehouse stock from cumulative inventory events
- `silver_inventory_by_supplier` — supplier-level inventory rollup

### Gold — business metrics and analytics (`MATERIALIZED VIEW`)

Pure aggregation over Silver — no filtering, no Bronze references. These are the views a BI tool or dashboard consumes.

- `gold_supplier_performance` — revenue, margin, reliability, and inventory by supplier
- `gold_inventory_risk` — days-of-stock vs. lead time, scored CRITICAL / WARNING / OK
- `gold_order_status_summary` — order count and revenue by status (changes visibly with every CDC commit)
- `gold_weekly_revenue_trend` — weekly revenue with WoW change, 4-week moving average, and cumulative YTD
- `gold_cancellation_impact` — cancellation rates with cumulative and 4-week moving windows
- `gold_realtime_inventory_alerts` — filtered stream of CRITICAL inventory items
- `gold_product_demand_surge` - Recent (24h-window from latest event) cart velocity vs stock & lead time

## Create the pipeline

You have two options.

### Option A — from the Web Console (recommended)

This pipeline ships as a packaged demo, so you don't have to copy any SQL:

1. Open the Feldera Web Console.
2. On the home page, find **Maintaining a Medallion Architecture in Real Time** in the **Explore use cases and tutorials** section and click it.
3. The Web Console creates a pipeline named **`ecommerce-medallion-architecture`** with the full SQL pre-loaded.
4. Click **▶ Start** to compile and run it.

### Option B — paste the SQL

1. Create a new pipeline in the Web Console.
2. Paste the contents of [`01-medallion-architecture.sql`](https://github.com/feldera/feldera/blob/main/crates/pipeline-manager/demos/sql/01-medallion-architecture.sql).
3. Click **▶ Start**.

When the pipeline starts, the `delta_table_input` connectors backfill the historical snapshot from S3. Once ingestion settles, query a Gold view from the **Ad-hoc query** tab to confirm the backfill succeeded:

```sql
SELECT * FROM gold_supplier_performance ORDER BY total_net_revenue DESC LIMIT 10;
```

With the pipeline running on the historical snapshot, continue to [Part 2](./part2.md) to push CDC changes and watch the Gold views update in real time.
112 changes: 112 additions & 0 deletions docs.feldera.com/docs/use_cases/medallion_architecture/part2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Part 2: Push CDC events to the Feldera Pipeline

With the pipeline from [Part 1](./part1.md) running on the historical snapshot, this section streams **change data capture (CDC)** events into it and shows the Gold layer updating incrementally.

The CDC events live in the same public S3 bucket as the snapshot, partitioned by table and hour. In practice, many Feldera users with data in Delta format use our [snapshot_and_follow](/connectors/sources/delta) connector configuration, which follows the Delta log. Feldera processes the changes as soon as they are written to Delta. To make the demo more interactive, changes are pushed directly to the pipeline.

```
s3://feldera-demos/ecommerce-cdc-0-01/cdc/<table>/<YYYY-MM-DDTHH>.json
```

Each file is newline-delimited JSON (NDJSON) in **Debezium** format, so every record carries a `before`/`after` image and an operation (`c`reate, `u`pdate, `d`elete). Updates and deletes are what make the demo interesting: an order moving to `cancelled` is an *update* that must ripple back through historical weekly revenue, cancellation rates, and inventory alerts.

CDC is replayed by [`push_changes.py`](https://github.com/feldera/feldera/blob/main/docs.feldera.com/docs/use_cases/medallion_architecture/push_changes.py), which reads the hourly files from S3 and pushes them to Feldera's HTTP ingress with `update_format=debezium` and `json_flavor=debezium_mysql`. It pushes the four tables that actually change — `bronze_orders`, `bronze_order_items`, `bronze_clickstream_events`, and `bronze_inventory_events` — and times how long each push takes.

## Prerequisites

```bash
pip install feldera boto3 python-dotenv
```

Point the script at your pipeline. It reads connection settings from the environment (a `.env` file works, via `python-dotenv`):

```bash
export FELDERA_URL=http://localhost:8080 # default if unset
export FELDERA_API_KEY=<your-api-key> # only if your instance requires auth
```

No AWS credentials are needed — the demo bucket is read with anonymous access in `us-west-1`.

:::note Pipeline name
`push_changes.py` defaults to a pipeline named `ecommerce-demo`. If you created the
pipeline from the Web Console use-case tile, it is named
`ecommerce-medallion-architecture`, so pass `--pipeline ecommerce-medallion-architecture`
(as shown below) or rename your pipeline to match.
:::

## Usage

The script takes exactly one of `--hour`, `--date`, or `--all`:

```bash
# Push a single hour of CDC across all four tables
python push_changes.py --pipeline ecommerce-medallion-architecture --hour 2025-11-30T00

# Push all 24 hours for one day (each hour is timed individually)
python push_changes.py --pipeline ecommerce-medallion-architecture --date 2025-11-30

# Push every available hour, across all days
python push_changes.py --pipeline ecommerce-medallion-architecture --all
```

| Option | Description |
|--------|-------------|
| `--hour <YYYY-MM-DDTHH>` | Push one hour of CDC for every table. |
| `--date <YYYY-MM-DD>` | Push all 24 hours of a day, printing per-hour and total timing. |
| `--all` | Push all available CDC data, day by day. |
| `--pipeline <name>` | Target pipeline (default: `ecommerce-demo`). |
| `--feldera <url>` | Feldera URL (default: `$FELDERA_URL`, else `http://localhost:8080`). |
| `--chunk-input` | Split each table's payload into sub-1 MB chunks before pushing. Use this if a proxy in front of Feldera (e.g. nginx, whose default body limit is 1 MB) rejects large request bodies. |

A single-hour push prints something like:

```
Pushing CDC hour: 2025-11-30T00
------------------------------------------------------------
bronze_orders 412 rows → 0.061s
bronze_order_items 938 rows → 0.072s
bronze_clickstream_events 3,517 rows → 0.118s
bronze_inventory_events 684 rows → 0.054s
```

Each call uses `wait=True`, so the timing reflects Feldera fully ingesting *and* incrementally recomputing every affected Silver and Gold view — not just accepting the bytes. This is a good representation of how Feldera works in the real world, where the pipeline is always on and ready to process changes directly from the source data, be that Delta Lake, Kafka, Postgres, or any other connector.

## Watch a Gold view update in real time

The clearest way to see incremental maintenance is to watch alerts appear the instant their inputs change. Run this in the **Ad-hoc query** tab:

```sql
SELECT COUNT(*) FROM gold_realtime_inventory_alerts;
```

Now make a change that should generate alerts. Raising a supplier's lead time pushes every product it supplies that has recorded sales toward the CRITICAL threshold (`days_of_stock_remaining < lead_time_days * 1.5`):

```sql
INSERT INTO bronze_suppliers VALUES (7, 'Blake and Sons', 'DE', 10000, now());
```

Re-run the count immediately — it jumps within milliseconds, without a batch job or recomputation.

```sql
SELECT COUNT(*) FROM gold_realtime_inventory_alerts;
```

Or, push a real slice of history and watch the views change. With the ecommerce-medallion-architecture pipeline open, navigate to the change stream tab and select a view. `gold_realtime_inventory_alerts`, `gold_product_demand_surge`, and `gold_weekly_revenue_trend` are all great candidates.

Look at the Runtime tab to see Feldera's memory usage. The memory footprint is low and will stay that way even as data grows. The size of the change is the main factor in Feldera's memory usage.

```bash
python push_changes.py --pipeline ecommerce-medallion-architecture --date 2025-11-30
```

```sql
-- Cancellations updated incrementally as orders flip to 'cancelled'
SELECT category, week_start, cumulative_cancellation_rate
FROM gold_cancellation_impact
ORDER BY week_start DESC, category
LIMIT 20;
```

Because the orders that get cancelled in the CDC stream belong to *past* weeks, you'll see historical `weekly_net_revenue` in `gold_weekly_revenue_trend` change and the dependent window calculations (WoW change, 4-week moving average, cumulative YTD) adjust — all without rebuilding anything.

Next, [Part 3](./part3.md) runs the same logic as a Spark batch job for comparison.
53 changes: 53 additions & 0 deletions docs.feldera.com/docs/use_cases/medallion_architecture/part3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Part 3: Compare Feldera and Spark

Parts [1](./part1.md) and [2](./part2.md) built the medallion as an always-on Feldera pipeline that updates Gold within milliseconds of each change. This section runs the **same business logic as a batch job in Spark** so you can compare the two cost models side by side.

The batch job is provided as a Databricks notebook:

- [`spark_notebook.py`](https://github.com/feldera/feldera/blob/main/docs.feldera.com/docs/use_cases/medallion_architecture/spark_notebook.py) — *Compute Silver & Gold Delta Tables — Spark Batch*

```bash
curl -O https://raw.githubusercontent.com/feldera/feldera/main/docs.feldera.com/docs/use_cases/medallion_architecture/spark_notebook.py
```

Import it into Databricks (or any Spark 3.x environment with Delta Lake), or run it as a plain PySpark script — outside Databricks you'll need the `delta-spark` and `hadoop-aws` packages configured, and the `# COMMAND` / `# MAGIC` lines are just comments. It reads the same Bronze snapshot from `s3://feldera-demos/ecommerce-cdc-0-01/snapshot` and produces the **same 7 Silver and 7 Gold tables** as the SQL pipeline.

## What the notebook does

The notebook mirrors the demo SQL one-to-one, organized into the sections you'll see in the file:

- **Configuration** — sets the scale factor and the Bronze/Silver/Gold S3 paths. By default `WRITE_OUTPUT = False`, so Gold is computed but written to a `noop` sink; this isolates the *compute* cost from S3 write latency. Flip it to `True` to persist the Silver and Gold layers as Delta tables.
- **Silver & Gold pipeline** — `run_pipeline()` rebuilds all 14 tables with full-refresh (`overwrite`) semantics, the way a scheduled batch job would.
- **Run** — executes one full refresh and prints the per-step and total wall-clock time.
- **CDC incremental comparison** — `run_pipeline_with_cdc(up_to_hour)` reprocesses the snapshot *plus* a growing prefix of the CDC stream at a series of `CDC_CHECKPOINTS`. The incremental comparison shows **batch cost scaling**.

## Key Differences

A batch engine has no memory of its previous run. To reflect a single new CDC event, it must re-read the entire dataset and recompute every layer from scratch. As the dataset grows, each refresh gets slower — even when only a handful of rows changed.

| | Spark (batch) | Feldera (incremental) |
|---|---|---|
| Cost of a refresh | Proportional to **total** dataset size | Proportional to the **size of the change** |
| Reflecting one CDC event | Full re-scan + full recompute | Update only the affected Gold rows |
| Freshness | Bounded by the batch schedule | Milliseconds |
| Orchestration | Scheduler, dependency DAG, full-vs-incremental logic | None — views are always current |

At every `CDC_CHECKPOINTS` step, Spark re-scans the *entire* dataset and rebuilds every layer from scratch. At the demo's scale factor the dataset is small, so per-checkpoint times stay roughly constant — but each run still recomputes everything, and that cost grows directly with total data size as history accumulates. Feldera absorbed those same changes in milliseconds because its cost is tied to *what changed*, not *how much data exists* — so its latency and compute cost stay flat as the dataset grows, while the batch cost climbs with it.

## Running the comparison

1. Keep the Feldera pipeline from Parts 1–2 running and note how long each `push_changes.py` push took (printed per table, per hour).
2. Run `spark_notebook.py` end to end. Note the **total refresh time** from the **Run** section.
3. Work through the **CDC incremental comparison** section. Each checkpoint re-scans the whole dataset and recomputes every layer; at this scale factor the times stay roughly constant, but each is a full from-scratch refresh whose cost scales with total data size.
4. Compare: the Spark numbers are the cost of *one* full refresh of the whole dataset — paid again on every run and growing with the data; the Feldera numbers in Part 2 are the cost of *applying the changes* and stay flat as history grows.

Some of the silver views could be made incremental by hand by a data engineer — for example:

- `silver_customers`
- `silver_confirmed_order_items`
- `silver_inventory_current`
- `silver_inventory_by_supplier`

More complex views, such as `gold_weekly_revenue_trend`, would require merge logic that is difficult to validate and entails significant engineering effort.

With Feldera, you use the same SQL your batch engine runs now, get the same results, and pay a cost that tracks change size instead of data size. In addition, you get millisecond update latency across your full medallion architecture.
Loading
Loading