45 commits
4abfcaa
Add native Iceberg storage support using PyIceberg and DuckDB
tommy-ca Jan 13, 2026
0093113
feat(offline-store): Complete Iceberg offline store Phase 2 implement…
tommy-ca Jan 14, 2026
b9659ad
feat(online-store): Complete Iceberg online store Phase 3 implementation
tommy-ca Jan 14, 2026
7042b0d
docs: Complete Iceberg documentation Phase 4
tommy-ca Jan 14, 2026
8ce4bd8
fix: Phase 5.1 - Fix offline/online store bugs from code audit
tommy-ca Jan 14, 2026
d54624a
feat: Phase 5.2-5.4 - Complete Iceberg integration tests, examples, a…
tommy-ca Jan 14, 2026
2c35063
docs: Update plan.md with Phase 5 completion and Phase 6 roadmap
tommy-ca Jan 14, 2026
d804d79
docs: Update design specs with final statistics and create implementa…
tommy-ca Jan 14, 2026
80b6ab3
docs: Complete Phase 6 - Final review and production readiness
tommy-ca Jan 14, 2026
eca8bc6
docs: Add comprehensive project completion summary
tommy-ca Jan 14, 2026
ed29614
docs: Add comprehensive lessons learned and project closure
tommy-ca Jan 14, 2026
6d440e9
docs: Add comprehensive documentation index and navigation guide
tommy-ca Jan 14, 2026
da09162
fix: Final robust fixes for Iceberg storage integration
tommy-ca Jan 15, 2026
69f0750
docs(specs): streamline Iceberg plan Phase 6 summary
tommy-ca Jan 15, 2026
3b8f2e2
docs(specs): update Iceberg offline store final details
tommy-ca Jan 15, 2026
850a89d
docs(specs): update Iceberg online store final details
tommy-ca Jan 15, 2026
f877d15
docs(specs): fix Iceberg quickstart config examples
tommy-ca Jan 15, 2026
a171cb9
docs(specs): remove stale Iceberg online store status section
tommy-ca Jan 15, 2026
56e51ee
docs(specs): add Iceberg production readiness hardening backlog
tommy-ca Jan 15, 2026
a1dce29
docs(reference): align Iceberg offline store examples with config
tommy-ca Jan 15, 2026
c0c5627
fix(online-store): project columns and align entity_hash partitions
tommy-ca Jan 15, 2026
363e26d
feat(offline-store): validate IcebergSource configuration
tommy-ca Jan 15, 2026
02ba04d
docs: mark Iceberg stores beta and define certified matrix
tommy-ca Jan 15, 2026
637224d
docs(specs): align Iceberg spec dependencies with implementation
tommy-ca Jan 15, 2026
0df1cb2
fix(offline-store): configure DuckDB for S3 endpoints
tommy-ca Jan 15, 2026
87f306c
examples: add Iceberg REST+MinIO certification smoke test
tommy-ca Jan 15, 2026
5496feb
docs: add Iceberg certification checklist and Make targets
tommy-ca Jan 15, 2026
0dda4fa
chore: make Iceberg smoke targets uv-native
tommy-ca Jan 15, 2026
f4ce843
docs(examples): switch Iceberg workflow to uv run
tommy-ca Jan 15, 2026
0bba23e
fix(examples): create iceberg-local data directories
tommy-ca Jan 15, 2026
3282530
chore(make): add Iceberg certification target
tommy-ca Jan 15, 2026
7a955e2
chore(examples): ignore iceberg-local output data
tommy-ca Jan 15, 2026
30e2a2b
docs(specs): update Iceberg hardening schedule
tommy-ca Jan 15, 2026
d36083a
fix(iceberg): critical security and correctness fixes for Iceberg stores
tommy-ca Jan 16, 2026
18f4539
test(iceberg): add comprehensive tests for critical bug fixes
tommy-ca Jan 16, 2026
82baff6
fix(iceberg): resolve P0 critical security issues and additional impr…
tommy-ca Jan 16, 2026
4b638b7
docs(solutions): add security solution for SQL injection and credenti…
tommy-ca Jan 16, 2026
4cc3a88
docs(planning): add rescheduled work plan for remaining P1/P2 issues
tommy-ca Jan 16, 2026
92941a0
docs(summary): add comprehensive session summary
tommy-ca Jan 16, 2026
e1ed1fa
fix(iceberg): resolve Session 1 P1 issues and add TTL validation
tommy-ca Jan 16, 2026
29f1522
docs(todos): verify and close Session 2 issues
tommy-ca Jan 17, 2026
c49ae25
docs(session): update summary with Sessions 1-2 completion
tommy-ca Jan 17, 2026
b1c148d
docs(completion): add comprehensive Sessions 1-2 completion summary
tommy-ca Jan 17, 2026
d7b1634
perf(iceberg): add catalog connection caching to online store
tommy-ca Jan 17, 2026
13e92fc
docs(session): add Session 3 completion summary
tommy-ca Jan 17, 2026
examples: add Iceberg REST+MinIO certification smoke test
tommy-ca committed Jan 15, 2026
commit 87f306c4c1ee2b7ba8b4728707e2cd700d58bde7
40 changes: 40 additions & 0 deletions examples/iceberg-rest-minio/README.md
@@ -0,0 +1,40 @@
# Iceberg REST catalog + MinIO smoke test

This example is a deterministic smoke test for the **certified** configuration:

- Iceberg **REST catalog**
- S3-compatible warehouse via **MinIO**

It validates that Feast’s Iceberg offline/online store integrations can:

- connect to a REST Iceberg catalog
- create and append to Iceberg tables in S3-compatible storage
- read data back via the Iceberg online store API (write + read)
- read data back via the Iceberg offline store helper paths (schema resolve + DuckDB read)

## Prerequisites

- Docker + docker compose
- Python with `pyiceberg`, `pyarrow`, and `duckdb` available

## Run

From the Feast repo root, run the smoke test against the in-repo sources:

```bash
cd examples/iceberg-rest-minio

docker compose up -d

# Run smoke test against the REST catalog
PYTHONPATH=../../sdk/python python smoke_test.py

docker compose down -v
```

## Notes

- The compose stack exposes:
  - MinIO: `http://localhost:9000` (console: `http://localhost:9001`)
  - Iceberg REST: `http://localhost:8181`
- This is intended as a **smoke test**, not a benchmark.
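Because `docker compose up -d` returns before the services are actually healthy, it can help to poll the two health endpoints listed above before launching `smoke_test.py`. A minimal readiness helper, sketched here as an assumption (this helper is not part of the example, and the URLs are the defaults from `docker-compose.yml`), might look like:

```python
import time
import urllib.error
import urllib.request


def wait_until_ready(url: str, timeout_s: float = 60.0, interval_s: float = 2.0) -> bool:
    """Poll `url` until it returns HTTP 200, or give up after `timeout_s` seconds."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=2) as resp:
                if resp.status == 200:
                    return True
        except (urllib.error.URLError, OSError):
            # Service not up yet (connection refused, timeout, non-2xx); retry.
            pass
        time.sleep(interval_s)
    return False


# Example (assumes the compose stack's default ports):
# wait_until_ready("http://localhost:8181/v1/config")
# wait_until_ready("http://localhost:9000/minio/health/ready")
```

The same URLs are what the compose healthchecks curl, so a `True` here roughly mirrors Docker reporting the containers healthy.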
59 changes: 59 additions & 0 deletions examples/iceberg-rest-minio/docker-compose.yml
@@ -0,0 +1,59 @@
services:
minio:
image: minio/minio@sha256:14cea493d9a34af32f524e538b8346cf79f3321eff8e708c1e2960462bd8936e
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: minio123
ports:
- "9000:9000"
- "9001:9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/ready"]
interval: 2s
timeout: 2s
retries: 30

mc:
image: minio/mc@sha256:a7fe349ef4bd8521fb8497f55c6042871b2ae640607cf99d9bede5e9bdf11727
depends_on:
minio:
condition: service_healthy
entrypoint: [
"/bin/sh",
"-c",
"mc alias set local http://minio:9000 minio minio123 && mc mb -p local/warehouse || true && mc anonymous set download local/warehouse || true"
]

iceberg-rest:
image: tabulario/iceberg-rest@sha256:3b7d31bdfec626b68e97531c9778a1b9119659e456fe28545a49f6aa6a9ce472
depends_on:
minio:
condition: service_healthy
ports:
- "8181:8181"
environment:
# Iceberg REST catalog configuration
CATALOG_WAREHOUSE: s3://warehouse/
CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO
CATALOG_S3_ENDPOINT: http://minio:9000

# Iceberg S3FileIO properties (note: __ maps to '-')
CATALOG_S3_PATH__STYLE__ACCESS: "true"
CATALOG_S3_ACCESS__KEY__ID: minio
CATALOG_S3_SECRET__ACCESS__KEY: minio123

# S3 credentials (MinIO)
AWS_ACCESS_KEY_ID: minio
AWS_SECRET_ACCESS_KEY: minio123
AWS_REGION: us-east-1
AWS_ENDPOINT_URL: http://minio:9000
AWS_ENDPOINT_URL_S3: http://minio:9000

# Many S3-compatible stores require path-style access.
AWS_S3_PATH_STYLE_ACCESS: "true"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8181/v1/config"]
interval: 2s
timeout: 2s
retries: 30
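The `__` in names like `CATALOG_S3_PATH__STYLE__ACCESS` is the REST catalog image's escaping scheme, which the inline comment above hints at: the `CATALOG_` prefix is stripped, `__` becomes `-`, `_` becomes `.`, and the result is lowercased, yielding Iceberg property keys like `s3.path-style-access`. As a sketch of that convention (our reading of the `tabulario/iceberg-rest` image's behavior, not code from this example):

```python
def env_to_catalog_property(name: str, prefix: str = "CATALOG_") -> str:
    """Map a CATALOG_* env var name to an Iceberg catalog property key.

    '__' maps to '-' and '_' maps to '.', so the '__' substitution must run first.
    """
    key = name[len(prefix):]
    return key.replace("__", "-").replace("_", ".").lower()


# e.g. CATALOG_S3_PATH__STYLE__ACCESS -> s3.path-style-access
#      CATALOG_IO__IMPL              -> io-impl
```

Note the ordering: replacing single underscores first would also consume the doubled ones, so the `__` pass has to come before the `_` pass.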
269 changes: 269 additions & 0 deletions examples/iceberg-rest-minio/smoke_test.py
@@ -0,0 +1,269 @@
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Dict, List

import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
DoubleType,
LongType,
NestedField,
StringType,
TimestampType,
)

from feast import Entity, FeatureView, Field, FileSource
from feast.infra.offline_stores.contrib.iceberg_offline_store.iceberg import (
IcebergOfflineStore,
IcebergOfflineStoreConfig,
)
from feast.infra.offline_stores.contrib.iceberg_offline_store.iceberg_source import (
IcebergSource,
)
from feast.infra.online_stores.contrib.iceberg_online_store.iceberg import (
IcebergOnlineStore,
IcebergOnlineStoreConfig,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.value_type import ValueType
from feast.types import Float64


@dataclass(frozen=True)
class RestMinioEnv:
rest_uri: str
warehouse: str
catalog_name: str
namespace_offline: str
namespace_online: str
s3_endpoint: str
s3_access_key: str
s3_secret_key: str
s3_region: str


def _env() -> RestMinioEnv:
# Keep these defaults aligned with docker-compose.yml
return RestMinioEnv(
rest_uri="http://localhost:8181",
warehouse="s3://warehouse/",
catalog_name="feast_catalog",
namespace_offline="feast",
namespace_online="feast_online",
s3_endpoint="http://localhost:9000",
s3_access_key="minio",
s3_secret_key="minio123",
s3_region="us-east-1",
)


def _storage_options(env: RestMinioEnv) -> Dict[str, str]:
# pyiceberg uses dotted keys in its properties mapping.
return {
"s3.endpoint": env.s3_endpoint,
"s3.access-key-id": env.s3_access_key,
"s3.secret-access-key": env.s3_secret_key,
"s3.region": env.s3_region,
# S3-compatible endpoints typically require path-style access.
"s3.path-style-access": "true",
}


def _build_offline_table_schema() -> Schema:
# Minimal offline feature table schema for schema resolution + DuckDB reads.
return Schema(
NestedField(field_id=1, name="driver_id", field_type=LongType(), required=False),
NestedField(
field_id=2, name="event_timestamp", field_type=TimestampType(), required=False
),
NestedField(
field_id=3, name="created_ts", field_type=TimestampType(), required=False
),
NestedField(field_id=4, name="conv_rate", field_type=DoubleType(), required=False),
NestedField(field_id=5, name="acc_rate", field_type=DoubleType(), required=False),
NestedField(
field_id=6, name="avg_daily_trips", field_type=LongType(), required=False
),
)


def _build_offline_arrow_table(now: datetime) -> pa.Table:
# Use microsecond timestamps because Iceberg expects microseconds.
timestamps = [now - timedelta(hours=2), now - timedelta(hours=1), now]
created_ts = [t + timedelta(minutes=1) for t in timestamps]

return pa.Table.from_pydict(
{
"driver_id": [1001, 1001, 1002],
"event_timestamp": pa.array(timestamps, type=pa.timestamp("us")),
"created_ts": pa.array(created_ts, type=pa.timestamp("us")),
"conv_rate": [0.1, 0.2, 0.3],
"acc_rate": [0.9, 0.8, 0.7],
"avg_daily_trips": [10, 11, 12],
}
)


def _ensure_namespace(catalog, namespace: str) -> None:
try:
catalog.create_namespace(namespace)
except Exception:
pass


def _ensure_table(catalog, identifier: str, schema: Schema) -> None:
try:
catalog.load_table(identifier)
return
except Exception:
pass

catalog.create_table(identifier=identifier, schema=schema)


def _append_arrow(catalog, identifier: str, arrow_table: pa.Table) -> None:
table = catalog.load_table(identifier)
table.append(arrow_table)


def _offline_smoke(env: RestMinioEnv) -> None:
catalog = load_catalog(
env.catalog_name,
type="rest",
uri=env.rest_uri,
warehouse=env.warehouse,
**_storage_options(env),
)

_ensure_namespace(catalog, env.namespace_offline)

offline_table_id = f"{env.namespace_offline}.driver_stats"
_ensure_table(catalog, offline_table_id, _build_offline_table_schema())

now = datetime.now(timezone.utc).replace(tzinfo=None)
_append_arrow(catalog, offline_table_id, _build_offline_arrow_table(now))

offline_store_config = IcebergOfflineStoreConfig(
type="iceberg",
catalog_type="rest",
catalog_name=env.catalog_name,
uri=env.rest_uri,
warehouse=env.warehouse,
namespace=env.namespace_offline,
storage_options=_storage_options(env),
)

repo_config = SimpleNamespace(offline_store=offline_store_config)

source = IcebergSource(
name="driver_stats",
table_identifier=offline_table_id,
timestamp_field="event_timestamp",
created_timestamp_column="created_ts",
)

source.validate(repo_config)

job = IcebergOfflineStore.pull_latest_from_table_or_query(
config=repo_config,
data_source=source,
join_key_columns=["driver_id"],
feature_name_columns=["conv_rate"],
timestamp_field="event_timestamp",
created_timestamp_column="created_ts",
start_date=None,
end_date=None,
)

df = job.to_df()
assert len(df) >= 1
assert "driver_id" in df.columns
assert "event_timestamp" in df.columns
assert "created_ts" in df.columns
assert "conv_rate" in df.columns


def _online_smoke(env: RestMinioEnv) -> None:
online_store = IcebergOnlineStore()

online_store_config = IcebergOnlineStoreConfig(
type="iceberg",
catalog_type="rest",
catalog_name=env.catalog_name,
uri=env.rest_uri,
warehouse=env.warehouse,
namespace=env.namespace_online,
partition_strategy="entity_hash",
partition_count=256,
read_timeout_ms=1000,
storage_options=_storage_options(env),
)

repo_config = SimpleNamespace(
online_store=online_store_config,
project="iceberg_smoke",
entity_key_serialization_version=3,
)

driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64)
file_source = FileSource(name="dummy", path="/tmp/unused.parquet")

fv = FeatureView(
name="driver_stats",
entities=[driver],
schema=[
Field(name="conv_rate", dtype=Float64),
Field(name="acc_rate", dtype=Float64),
],
source=file_source,
)

now = datetime.now(timezone.utc).replace(tzinfo=None)

ek = EntityKeyProto(join_keys=["driver_id"], entity_values=[ValueProto(int64_val=1001)])

online_store.online_write_batch(
config=repo_config,
table=fv,
data=[
(
ek,
{
"conv_rate": ValueProto(double_val=0.123),
"acc_rate": ValueProto(double_val=0.456),
},
now,
now,
)
],
progress=None,
)

results = online_store.online_read(
config=repo_config,
table=fv,
entity_keys=[ek],
requested_features=["conv_rate"],
)

assert len(results) == 1
ts, features = results[0]
assert ts is not None
assert features is not None
assert "conv_rate" in features


def main() -> None:
env = _env()
_offline_smoke(env)
_online_smoke(env)
print("βœ… Iceberg REST+MinIO smoke test passed")


if __name__ == "__main__":
main()
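The online store is configured above with `partition_strategy="entity_hash"` and `partition_count=256`, i.e. rows are bucketed by a deterministic hash of the serialized entity key so that point reads can prune to a single partition. The following is a hypothetical illustration of that idea only; the function name and hashing details are invented here and are not Feast's actual implementation:

```python
import hashlib


def entity_bucket(serialized_entity_key: bytes, partition_count: int = 256) -> int:
    """Illustrative only: map a serialized entity key to a stable partition bucket.

    A cryptographic hash gives a uniform, process-independent distribution,
    unlike Python's builtin hash(), which is salted per process.
    """
    digest = hashlib.sha256(serialized_entity_key).digest()
    return int.from_bytes(digest[:8], "big") % partition_count
```

The important property for a store like this is determinism across processes: the writer and every reader must agree on the bucket for a given key, which is why a salted or otherwise process-local hash would not work.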