45 commits
4abfcaa  Add native Iceberg storage support using PyIceberg and DuckDB (tommy-ca, Jan 13, 2026)
0093113  feat(offline-store): Complete Iceberg offline store Phase 2 implement… (tommy-ca, Jan 14, 2026)
b9659ad  feat(online-store): Complete Iceberg online store Phase 3 implementation (tommy-ca, Jan 14, 2026)
7042b0d  docs: Complete Iceberg documentation Phase 4 (tommy-ca, Jan 14, 2026)
8ce4bd8  fix: Phase 5.1 - Fix offline/online store bugs from code audit (tommy-ca, Jan 14, 2026)
d54624a  feat: Phase 5.2-5.4 - Complete Iceberg integration tests, examples, a… (tommy-ca, Jan 14, 2026)
2c35063  docs: Update plan.md with Phase 5 completion and Phase 6 roadmap (tommy-ca, Jan 14, 2026)
d804d79  docs: Update design specs with final statistics and create implementa… (tommy-ca, Jan 14, 2026)
80b6ab3  docs: Complete Phase 6 - Final review and production readiness (tommy-ca, Jan 14, 2026)
eca8bc6  docs: Add comprehensive project completion summary (tommy-ca, Jan 14, 2026)
ed29614  docs: Add comprehensive lessons learned and project closure (tommy-ca, Jan 14, 2026)
6d440e9  docs: Add comprehensive documentation index and navigation guide (tommy-ca, Jan 14, 2026)
da09162  fix: Final robust fixes for Iceberg storage integration (tommy-ca, Jan 15, 2026)
69f0750  docs(specs): streamline Iceberg plan Phase 6 summary (tommy-ca, Jan 15, 2026)
3b8f2e2  docs(specs): update Iceberg offline store final details (tommy-ca, Jan 15, 2026)
850a89d  docs(specs): update Iceberg online store final details (tommy-ca, Jan 15, 2026)
f877d15  docs(specs): fix Iceberg quickstart config examples (tommy-ca, Jan 15, 2026)
a171cb9  docs(specs): remove stale Iceberg online store status section (tommy-ca, Jan 15, 2026)
56e51ee  docs(specs): add Iceberg production readiness hardening backlog (tommy-ca, Jan 15, 2026)
a1dce29  docs(reference): align Iceberg offline store examples with config (tommy-ca, Jan 15, 2026)
c0c5627  fix(online-store): project columns and align entity_hash partitions (tommy-ca, Jan 15, 2026)
363e26d  feat(offline-store): validate IcebergSource configuration (tommy-ca, Jan 15, 2026)
02ba04d  docs: mark Iceberg stores beta and define certified matrix (tommy-ca, Jan 15, 2026)
637224d  docs(specs): align Iceberg spec dependencies with implementation (tommy-ca, Jan 15, 2026)
0df1cb2  fix(offline-store): configure DuckDB for S3 endpoints (tommy-ca, Jan 15, 2026)
87f306c  examples: add Iceberg REST+MinIO certification smoke test (tommy-ca, Jan 15, 2026)
5496feb  docs: add Iceberg certification checklist and Make targets (tommy-ca, Jan 15, 2026)
0dda4fa  chore: make Iceberg smoke targets uv-native (tommy-ca, Jan 15, 2026)
f4ce843  docs(examples): switch Iceberg workflow to uv run (tommy-ca, Jan 15, 2026)
0bba23e  fix(examples): create iceberg-local data directories (tommy-ca, Jan 15, 2026)
3282530  chore(make): add Iceberg certification target (tommy-ca, Jan 15, 2026)
7a955e2  chore(examples): ignore iceberg-local output data (tommy-ca, Jan 15, 2026)
30e2a2b  docs(specs): update Iceberg hardening schedule (tommy-ca, Jan 15, 2026)
d36083a  fix(iceberg): critical security and correctness fixes for Iceberg stores (tommy-ca, Jan 16, 2026)
18f4539  test(iceberg): add comprehensive tests for critical bug fixes (tommy-ca, Jan 16, 2026)
82baff6  fix(iceberg): resolve P0 critical security issues and additional impr… (tommy-ca, Jan 16, 2026)
4b638b7  docs(solutions): add security solution for SQL injection and credenti… (tommy-ca, Jan 16, 2026)
4cc3a88  docs(planning): add rescheduled work plan for remaining P1/P2 issues (tommy-ca, Jan 16, 2026)
92941a0  docs(summary): add comprehensive session summary (tommy-ca, Jan 16, 2026)
e1ed1fa  fix(iceberg): resolve Session 1 P1 issues and add TTL validation (tommy-ca, Jan 16, 2026)
29f1522  docs(todos): verify and close Session 2 issues (tommy-ca, Jan 17, 2026)
c49ae25  docs(session): update summary with Sessions 1-2 completion (tommy-ca, Jan 17, 2026)
b1c148d  docs(completion): add comprehensive Sessions 1-2 completion summary (tommy-ca, Jan 17, 2026)
d7b1634  perf(iceberg): add catalog connection caching to online store (tommy-ca, Jan 17, 2026)
13e92fc  docs(session): add Session 3 completion summary (tommy-ca, Jan 17, 2026)
fix: Final robust fixes for Iceberg storage integration
tommy-ca committed Jan 15, 2026
commit da09162f5b43b10830b36893aca9e393dd2c4342
10 changes: 6 additions & 4 deletions examples/iceberg-local/run_example.py
@@ -116,9 +116,9 @@ def setup_iceberg_table(df: pd.DataFrame):

     # Define Iceberg schema
     iceberg_schema = Schema(
-        NestedField(1, "driver_id", LongType(), required=True),
-        NestedField(2, "event_timestamp", TimestampType(), required=True),
-        NestedField(3, "created", TimestampType(), required=True),
+        NestedField(1, "driver_id", LongType(), required=False),
+        NestedField(2, "event_timestamp", TimestampType(), required=False),
+        NestedField(3, "created", TimestampType(), required=False),
         NestedField(4, "conv_rate", FloatType(), required=False),
         NestedField(5, "acc_rate", FloatType(), required=False),
         NestedField(6, "avg_daily_trips", LongType(), required=False),
@@ -168,7 +168,9 @@ def run_feast_workflow():

     # Apply features from features.py
     print("\nApplying feature definitions...")
-    fs.apply(["features.py"])
+    from features import driver, driver_stats_fv, driver_activity_v1, driver_activity_v2
+
+    fs.apply([driver, driver_stats_fv, driver_activity_v1, driver_activity_v2])
     print("Applied entities, feature views, and feature services")
 
     # Materialize features to online store
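Why required=False: pandas DataFrames arrive in Arrow as nullable columns, and PyIceberg rejects appending nullable Arrow fields into required Iceberg fields. A minimal sketch of the pattern, assuming a local SQLite catalog (catalog URI, warehouse path, and table name are illustrative, not taken from the example):

```python
# Minimal sketch: appending pandas data to an Iceberg table works when the
# fields are optional, because pandas-derived Arrow columns are nullable by
# default. Catalog URI, warehouse path, and table name are assumptions.
import pandas as pd
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import FloatType, LongType, NestedField, TimestampType

catalog = load_catalog(
    "local",
    type="sql",
    uri="sqlite:///catalog.db",      # assumed path
    warehouse="file://./warehouse",  # assumed path
)
catalog.create_namespace("feast")
schema = Schema(
    NestedField(1, "driver_id", LongType(), required=False),
    NestedField(2, "event_timestamp", TimestampType(), required=False),
    NestedField(3, "conv_rate", FloatType(), required=False),
)
table = catalog.create_table("feast.driver_stats_demo", schema=schema)

df = pd.DataFrame(
    {
        "driver_id": [1001],
        # Iceberg timestamps are microsecond precision, not pandas' default ns
        "event_timestamp": pd.to_datetime(["2026-01-15 12:00:00"]).astype("datetime64[us]"),
        "conv_rate": pd.array([0.75], dtype="float32"),
    }
)
# With required=True fields this append would raise a schema compatibility
# error; with optional fields it succeeds.
table.append(pa.Table.from_pandas(df))
```

The fs.apply change is the same kind of API alignment: FeatureStore.apply takes Feast objects (entities, feature views, feature services) rather than file paths, so the example now imports the definitions from features.py and passes them in directly.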
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -89,7 +89,7 @@ ibis = [
"poetry-dynamic-versioning",
]
iceberg = [
"pyiceberg[sql,duckdb]>=0.8.0",
"pyiceberg[sql,duckdb,pyiceberg-core]>=0.8.0",
"duckdb>=1.0.0",
]
ikv = [
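A throwaway sanity check that the extras resolve (nothing here is from the repo; `feast[iceberg]` is the extra defined above):

```python
# Run after installing the extra, e.g.: pip install "feast[iceberg]"
# pyiceberg[sql] supplies the SQL catalog backend; duckdb is the local
# query engine the offline store uses.
import duckdb
from pyiceberg.catalog.sql import SqlCatalog  # fails if the [sql] extra is missing

print("duckdb", duckdb.__version__)
```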
feast/infra/offline_stores/contrib/iceberg_offline_store/iceberg.py
@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import Any, Dict, List, Literal, Optional, Union
+from typing import Any, Dict, List, Literal, Optional, Tuple, Union
 
 import duckdb
 import pandas as pd
@@ -15,6 +15,7 @@
 from feast.infra.registry.base_registry import BaseRegistry
 from feast.on_demand_feature_view import OnDemandFeatureView
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
+from feast.utils import to_naive_utc
 
 
 class IcebergOfflineStoreConfig(FeastConfigBaseModel):
@@ -33,6 +34,9 @@ class IcebergOfflineStoreConfig(FeastConfigBaseModel):
warehouse: str = "warehouse"
""" Warehouse path """

namespace: str = "feast"
""" Iceberg namespace """

storage_options: Dict[str, str] = Field(default_factory=dict)
""" Additional storage options (e.g., s3 credentials) """

@@ -127,12 +131,41 @@ def get_historical_features(
             # 3. Picks the latest feature record for each entity record.
             query += f" ASOF LEFT JOIN {fv.name} ON "
             # Use 'entity_df.event_timestamp' which is standard in Feast universal tests
-            join_conds = [f"entity_df.{k} = {fv.name}.{k}" for k in fv.entities]
+            join_conds = [f"entity_df.{k} = {fv.name}.{k}" for k in fv.join_keys]
             query += " AND ".join(join_conds)
             query += f" AND entity_df.event_timestamp >= {fv.name}.{fv.batch_source.timestamp_field}"
 
         return IcebergRetrievalJob(con, query)
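The switch to fv.join_keys matters because an entity's join key can differ from the entity's name; joining on fv.entities produces column names that may not exist in either table. A standalone illustration of the clause this loop now emits (feature-view and column names are made up):

```python
# Rebuilds the ASOF join clause from above with illustrative names.
fv_name = "driver_stats_fv"          # stands in for fv.name
join_keys = ["driver_id"]            # stands in for fv.join_keys
timestamp_field = "event_timestamp"  # stands in for fv.batch_source.timestamp_field

query = f" ASOF LEFT JOIN {fv_name} ON "
query += " AND ".join(f"entity_df.{k} = {fv_name}.{k}" for k in join_keys)
query += f" AND entity_df.event_timestamp >= {fv_name}.{timestamp_field}"
print(query)
# ASOF LEFT JOIN driver_stats_fv ON entity_df.driver_id = driver_stats_fv.driver_id
#  AND entity_df.event_timestamp >= driver_stats_fv.event_timestamp
```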

+    @staticmethod
+    def pull_all_from_table_or_query(
+        config: RepoConfig,
+        data_source: Any,
+        join_key_columns: List[str],
+        feature_name_columns: List[str],
+        timestamp_field: str,
+        created_timestamp_column: Optional[str] = None,
+        start_date: Optional[datetime] = None,
+        end_date: Optional[datetime] = None,
+    ) -> RetrievalJob:
+        from feast.infra.offline_stores.contrib.iceberg_offline_store.iceberg import (
+            IcebergOfflineStore,
+        )
+
+        # Reuse common setup logic
+        con, source_table = IcebergOfflineStore._setup_duckdb_source(
+            config, data_source, timestamp_field, start_date, end_date
+        )
+
+        columns = join_key_columns + feature_name_columns + [timestamp_field]
+        if created_timestamp_column:
+            columns.append(created_timestamp_column)
+
+        columns_str = ", ".join(columns)
+        query = f"SELECT {columns_str} FROM {source_table}"
+
+        return IcebergRetrievalJob(con, query)
+
     @staticmethod
     def pull_latest_from_table_or_query(
         config: RepoConfig,
@@ -141,9 +174,45 @@ def pull_latest_from_table_or_query(
         feature_name_columns: List[str],
         timestamp_field: str,
         created_timestamp_column: Optional[str],
-        start_date: datetime,
-        end_date: datetime,
+        start_date: Optional[datetime],
+        end_date: Optional[datetime],
     ) -> RetrievalJob:
+        from feast.infra.offline_stores.contrib.iceberg_offline_store.iceberg import (
+            IcebergOfflineStore,
+        )
+
+        # Reuse common setup logic
+        con, source_table = IcebergOfflineStore._setup_duckdb_source(
+            config, data_source, timestamp_field, start_date, end_date
+        )
+
+        # 3. Construct "Latest" Query
+        # Group by join keys and select the record with the maximum timestamp
+        join_keys_str = ", ".join(join_key_columns)
+        columns = join_key_columns + feature_name_columns + [timestamp_field]
+        if created_timestamp_column:
+            columns.append(created_timestamp_column)
+
+        columns_str = ", ".join(columns)
+
+        # Rank records by timestamp descending and pick rank 1
+        query = f"""
+            SELECT {columns_str} FROM (
+                SELECT *, row_number() OVER (PARTITION BY {join_keys_str} ORDER BY {timestamp_field} DESC) as rn
+                FROM {source_table}
+            ) WHERE rn = 1
+        """
+
+        return IcebergRetrievalJob(con, query)
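The row_number dedup that pull_latest_from_table_or_query builds is easy to verify in isolation. A self-contained DuckDB sketch with made-up rows (the table name matches the view the helper registers; the data is illustrative):

```python
import duckdb

con = duckdb.connect(":memory:")
con.execute("""
    CREATE TABLE source_table AS
    SELECT * FROM (VALUES
        (1001, TIMESTAMP '2026-01-14 10:00:00', 0.50),
        (1001, TIMESTAMP '2026-01-15 10:00:00', 0.75),
        (1002, TIMESTAMP '2026-01-15 09:00:00', 0.40)
    ) AS t(driver_id, event_timestamp, conv_rate)
""")
rows = con.execute("""
    SELECT driver_id, conv_rate, event_timestamp FROM (
        SELECT *, row_number() OVER (
            PARTITION BY driver_id ORDER BY event_timestamp DESC
        ) AS rn
        FROM source_table
    ) WHERE rn = 1
    ORDER BY driver_id
""").fetchall()
print(rows)  # one row per driver_id, keeping each driver's newest record
```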

+    @staticmethod
+    def _setup_duckdb_source(
+        config: RepoConfig,
+        data_source: Any,
+        timestamp_field: str,
+        start_date: Optional[datetime],
+        end_date: Optional[datetime],
+    ) -> Tuple[duckdb.DuckDBPyConnection, str]:
         from feast.infra.offline_stores.contrib.iceberg_offline_store.iceberg_source import (
             IcebergSource,
         )
@@ -163,42 +232,40 @@ def pull_latest_from_table_or_query(

         # 2. Setup DuckDB and Load Table
         con = duckdb.connect(database=":memory:")
-        table = catalog.load_table(data_source.table_identifier)
+        table_id = data_source.table_identifier
+        if not table_id:
+            raise ValueError(f"Table identifier missing for source {data_source.name}")
+        table = catalog.load_table(table_id)
 
+        # Build row filter
+        row_filters = []
+        if start_date:
+            start_date_naive = to_naive_utc(start_date)
+            row_filters.append(f"{timestamp_field} >= '{start_date_naive.isoformat()}'")
+        if end_date:
+            end_date_naive = to_naive_utc(end_date)
+            row_filters.append(f"{timestamp_field} <= '{end_date_naive.isoformat()}'")
+
+        row_filter = " AND ".join(row_filters) if row_filters else None
+
         # Load filtered scan
-        scan = table.scan(
-            row_filter=f"{timestamp_field} >= '{start_date.isoformat()}' AND {timestamp_field} <= '{end_date.isoformat()}'"
-        )
+        scan = table.scan(row_filter=row_filter) if row_filter else table.scan()
         tasks = list(scan.plan_files())
         has_deletes = any(task.delete_files for task in tasks)
 
+        source_table = "source_table"
         if not has_deletes:
             file_paths = [task.file.file_path for task in tasks]
             if file_paths:
                 con.execute(
-                    f"CREATE VIEW source_table AS SELECT * FROM read_parquet({file_paths})"
+                    f"CREATE VIEW {source_table} AS SELECT * FROM read_parquet({file_paths})"
                 )
             else:
-                con.register("source_table", scan.to_arrow())
+                con.register(source_table, scan.to_arrow())
         else:
-            con.register("source_table", scan.to_arrow())
+            con.register(source_table, scan.to_arrow())
 
-        # 3. Construct "Latest" Query
-        # Group by join keys and select the record with the maximum timestamp
-        join_keys_str = ", ".join(join_key_columns)
-        columns_str = ", ".join(
-            join_key_columns + feature_name_columns + [timestamp_field]
-        )
-
-        # Rank records by timestamp descending and pick rank 1
-        query = f"""
-            SELECT {columns_str} FROM (
-                SELECT *, row_number() OVER (PARTITION BY {join_keys_str} ORDER BY {timestamp_field} DESC) as rn
-                FROM source_table
-            ) WHERE rn = 1
-        """
-
-        return IcebergRetrievalJob(con, query)
+        return con, source_table


class IcebergRetrievalJob(RetrievalJob):
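Two details carry the row-filter logic in _setup_duckdb_source: Feast's to_naive_utc converts tz-aware datetimes to naive UTC (matching Iceberg timestamp-without-zone columns), and PyIceberg's Table.scan accepts string predicates like the ones built here. A small sketch of the strings the helper produces (the dates are made up):

```python
# Recreates the row-filter strings built in _setup_duckdb_source.
# to_naive_utc is Feast's own utility; the dates are illustrative.
from datetime import datetime, timezone

from feast.utils import to_naive_utc

timestamp_field = "event_timestamp"
start_date = datetime(2026, 1, 14, tzinfo=timezone.utc)
end_date = None  # filters are now optional, unlike the old hard-coded scan

row_filters = []
if start_date:
    row_filters.append(f"{timestamp_field} >= '{to_naive_utc(start_date).isoformat()}'")
if end_date:
    row_filters.append(f"{timestamp_field} <= '{to_naive_utc(end_date).isoformat()}'")

row_filter = " AND ".join(row_filters) if row_filters else None
print(row_filter)  # event_timestamp >= '2026-01-14T00:00:00'
```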