Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
cff3a63
feat: Add Oracle DB as Offline store in python sdk & support in feas…
aniketpalu Feb 24, 2026
60f8d92
Added oracle db dependency from ibis-framework subgroups
aniketpalu Feb 24, 2026
9f00168
Operator yaml changes
aniketpalu Feb 24, 2026
07e6969
Data source writer ignored parameters, fixed
aniketpalu Feb 24, 2026
1fb6720
Replaced raw sql with dedicated truncate_table() to fix SQL Injection…
aniketpalu Feb 26, 2026
3abef8c
Minor improvements like single db connection, removal of default cred…
aniketpalu Feb 28, 2026
b991288
Fetching pre-filtered table from db
aniketpalu Mar 2, 2026
af7e7ee
Minor formatting changes
aniketpalu Mar 2, 2026
41e0901
Added Oracle DB Offline Store documentation
aniketpalu Mar 2, 2026
d4387a8
Resolved import error by removing OracleSource import from the __init__
aniketpalu Mar 2, 2026
b0e96d0
Fixed lint error by updating secret baseline
aniketpalu Mar 5, 2026
07c5e9a
fix: Exclude qdrant from docstring tests to avoid qdrant-client 1.17.…
aniketpalu Mar 5, 2026
be03d88
Generated secret.baseline to avoid lint error
aniketpalu Mar 5, 2026
c36299f
Fixed lint error
aniketpalu Mar 5, 2026
7e648c0
Updated .secrets.baseline
aniketpalu Mar 5, 2026
7573690
Fixed lint errors
aniketpalu Mar 6, 2026
1a78577
Fixed lint errors
aniketpalu Mar 6, 2026
8fc6190
Update sdk/python/feast/type_map.py
aniketpalu Mar 6, 2026
1f63beb
Updated dependency lock files
aniketpalu Mar 9, 2026
33c0b38
Fixed lint issues in Trino Offline Store
aniketpalu Mar 9, 2026
04adc97
Merge branch 'master' into oracle-db-offline-store
aniketpalu Mar 9, 2026
e91fa8d
Updated requirements
aniketpalu Mar 9, 2026
caed7c8
Updated pixi.lock file
aniketpalu Mar 9, 2026
de9759e
Merge branch 'master' into oracle-db-offline-store
aniketpalu Mar 9, 2026
e40a355
Merge branch 'master' into oracle-db-offline-store
aniketpalu Mar 9, 2026
c71cdfd
Merge branch 'master' into oracle-db-offline-store
aniketpalu Mar 9, 2026
9a20c65
Merge branch 'master' into oracle-db-offline-store
aniketpalu Mar 10, 2026
9880a56
Merge branch 'master' into oracle-db-offline-store
aniketpalu Mar 10, 2026
5335b0d
Restricted non-empty feature_views in get_historical_features() to av…
aniketpalu Mar 10, 2026
c706b0b
Removed _build_data_source_reader_for_retrieval function
aniketpalu Mar 10, 2026
414c0e0
Modified initial query to be _ to avoid empty string casting to Null …
aniketpalu Mar 10, 2026
5b84e73
cast DATE to TIMESTAMP in _read_oracle_table to preserve time lost by…
aniketpalu Mar 10, 2026
92f78ca
Use single database connection for pull_latest_from_table_or_query()
aniketpalu Mar 10, 2026
c79ba55
Improved readability by breaking down the code into functions
aniketpalu Mar 10, 2026
95f7f55
Merge branch 'master' into oracle-db-offline-store
aniketpalu Mar 10, 2026
1181be1
Merge branch 'master' into oracle-db-offline-store
aniketpalu Mar 10, 2026
b54c2a2
Updated .secret.baseline
aniketpalu Mar 10, 2026
fe797ed
Updated .secret.baseline and pixi.lock
aniketpalu Mar 10, 2026
466e2d2
Fixed lint issue
aniketpalu Mar 10, 2026
7670a90
Conflicts resolved
aniketpalu Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Add Oracle DB as Offline store in python sdk & support in feast…
…-operator

Signed-off-by: Aniket Paluskar <apaluska@redhat.com>
  • Loading branch information
aniketpalu committed Mar 9, 2026
commit cff3a6307e94e17c61b306e2551513cd831f1cd1
3 changes: 2 additions & 1 deletion infra/feast-operator/api/v1/featurestore_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ var ValidOfflineStoreFilePersistenceTypes = []string{
// OfflineStoreDBStorePersistence configures the DB store persistence for the offline store service
type OfflineStoreDBStorePersistence struct {
// Type of the persistence type you want to use.
// +kubebuilder:validation:Enum=snowflake.offline;bigquery;redshift;spark;postgres;trino;athena;mssql;couchbase.offline;clickhouse;ray
// +kubebuilder:validation:Enum=snowflake.offline;bigquery;redshift;spark;postgres;trino;athena;mssql;couchbase.offline;clickhouse;ray;oracle
Type string `json:"type"`
// Data store parameters should be placed as-is from the "feature_store.yaml" under the secret key. "registry_type" & "type" fields should be removed.
SecretRef corev1.LocalObjectReference `json:"secretRef"`
Expand All @@ -422,6 +422,7 @@ var ValidOfflineStoreDBStorePersistenceTypes = []string{
"couchbase.offline",
"clickhouse",
"ray",
"oracle",
}

// OnlineStore configures the online store service
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import (
AthenaSource,
)
from feast.infra.offline_stores.contrib.oracle_offline_store.oracle_source import (
OracleSource,
)
Comment thread
aniketpalu marked this conversation as resolved.
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.redshift_source import RedshiftSource
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
Expand Down Expand Up @@ -56,6 +59,7 @@
"PushSource",
"RequestSource",
"AthenaSource",
"OracleSource",
"Project",
"FeastVectorStore",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Public API of the Oracle offline store contrib package.

Re-exports the store implementation, its configuration model, and the
Oracle data source so users can import all three from this package.
"""

from feast.infra.offline_stores.contrib.oracle_offline_store.oracle import (
    OracleOfflineStore,
    OracleOfflineStoreConfig,
)
from feast.infra.offline_stores.contrib.oracle_offline_store.oracle_source import (
    OracleSource,
)

# Explicit public surface of the package.
__all__ = [
    "OracleSource",
    "OracleOfflineStore",
    "OracleOfflineStoreConfig",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable, List, Literal, Optional, Union

import ibis
import pandas as pd
import pyarrow
from ibis.expr.types import Table
from pydantic import StrictInt, StrictStr

from feast.data_source import DataSource
from feast.feature_logging import LoggingConfig, LoggingSource
from feast.feature_view import FeatureView
from feast.infra.offline_stores.contrib.oracle_offline_store.oracle_source import (
OracleSource,
)
from feast.infra.offline_stores.ibis import (
get_historical_features_ibis,
offline_write_batch_ibis,
pull_all_from_table_or_query_ibis,
pull_latest_from_table_or_query_ibis,
write_logged_features_ibis,
)
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig


def get_ibis_connection(config: RepoConfig):
    """Create an ibis Oracle connection from the offline store config.

    Args:
        config: Repo config whose ``offline_store`` must be an
            ``OracleOfflineStoreConfig``.

    Returns:
        An ibis Oracle backend connection.

    Raises:
        ValueError: If more than one of ``service_name``, ``sid`` and
            ``dsn`` is configured — their field docs declare them mutually
            exclusive, and passing several to the driver is ambiguous.
    """
    offline_config = config.offline_store
    assert isinstance(offline_config, OracleOfflineStoreConfig)

    # Enforce the mutual exclusivity promised by the config docstrings
    # instead of silently forwarding an ambiguous combination.
    exclusive_set = [
        name
        for name in ("service_name", "sid", "dsn")
        if getattr(offline_config, name)
    ]
    if len(exclusive_set) > 1:
        raise ValueError(
            f"Oracle offline store options {exclusive_set} are mutually "
            "exclusive; configure only one of service_name, sid or dsn."
        )

    # Only pass optional connection parameters that are actually set.
    kwargs = {}
    if offline_config.service_name:
        kwargs["service_name"] = offline_config.service_name
    if offline_config.sid:
        kwargs["sid"] = offline_config.sid
    if offline_config.database:
        kwargs["database"] = offline_config.database
    if offline_config.dsn:
        kwargs["dsn"] = offline_config.dsn

    return ibis.oracle.connect(
        user=offline_config.user,
        password=offline_config.password,
        host=offline_config.host,
        port=offline_config.port,
        **kwargs,
    )


def _read_oracle_table(con, data_source: DataSource) -> Table:
    """Return the ibis table behind *data_source* on connection *con*.

    Column names come back exactly as Oracle stores them, so callers must
    use Oracle's casing: ``USER_ID`` for unquoted identifiers,
    ``CamelCase`` for quoted ones.
    """
    assert isinstance(data_source, OracleSource)
    table_name = data_source.table_ref
    return con.table(table_name)


def _build_data_source_reader(config: RepoConfig):
    """Return a closure that maps a DataSource to its Oracle ibis table.

    Intended for ``pull_latest`` and ``pull_all``, where everything runs
    on the single Oracle backend and no cross-backend joins are needed.
    """
    # One connection is created up front and shared by every read.
    connection = get_ibis_connection(config)

    def _read_data_source(data_source: DataSource, repo_path: str = "") -> Table:
        return _read_oracle_table(connection, data_source)

    return _read_data_source


def _build_data_source_reader_for_retrieval(config: RepoConfig):
    """Build a reader for ``get_historical_features``.

    An earlier version materialized every Oracle table into an in-memory
    ``ibis.memtable(table.execute())``, pulling entire (or pre-filtered)
    tables over the wire before the point-in-time join. That
    materialization is unnecessary: the lazy Oracle-backed reader works
    for retrieval as well, so this simply delegates to
    ``_build_data_source_reader``.

    Kept as a separate function so existing call sites remain valid.
    NOTE(review): with lazy reads, computed literals such as
    ``ibis.literal("")`` used for ``entity_row_id`` may be cast to NULL by
    Oracle — confirm the shared ibis helpers handle this.
    """
    return _build_data_source_reader(config)


def _build_data_source_writer(config: RepoConfig):
    """Build a function that writes an ibis table to an Oracle table.

    The returned writer honors ``mode`` and ``allow_overwrite``
    (previously both parameters were silently ignored and every write
    appended):

    * ``mode="append"`` — insert rows into the existing table.
    * ``mode="overwrite"`` — requires ``allow_overwrite=True``; clears the
      table via the backend's ``truncate_table`` (no hand-built SQL, so no
      identifier injection) and then inserts.
    """
    con = get_ibis_connection(config)

    def _write_data_source(
        table: Table,
        data_source: DataSource,
        repo_path: str = "",
        mode: str = "append",
        allow_overwrite: bool = False,
    ):
        assert isinstance(data_source, OracleSource)
        if mode not in ("append", "overwrite"):
            raise ValueError(f"Unsupported write mode: {mode}")
        if mode == "overwrite":
            if not allow_overwrite:
                raise ValueError(
                    "mode='overwrite' requires allow_overwrite=True"
                )
            con.truncate_table(data_source.table_ref)
        con.insert(table_name=data_source.table_ref, obj=table.to_pandas())

    return _write_data_source


class OracleOfflineStoreConfig(FeastConfigBaseModel):
    """Offline store config for Oracle Database.

    ``user`` and ``password`` are required — shipping default credentials
    ("system"/"oracle123") in source is a security defect, so they have no
    defaults. Exactly one of ``service_name``, ``sid`` or ``dsn`` should
    identify the target database.
    """

    type: Literal["oracle"] = "oracle"
    """Offline store type selector"""

    user: StrictStr
    """Oracle database user (required; no default credential)"""

    password: StrictStr
    """Oracle database password (required; no default credential)"""

    host: StrictStr = "localhost"
    """Oracle database host"""

    port: StrictInt = 1521
    """Oracle database port"""

    service_name: Optional[StrictStr] = None
    """Oracle service name (mutually exclusive with sid and dsn)"""

    sid: Optional[StrictStr] = None
    """Oracle SID (mutually exclusive with service_name and dsn)"""

    database: Optional[StrictStr] = None
    """Oracle database name"""

    dsn: Optional[StrictStr] = None
    """Oracle DSN string (mutually exclusive with service_name and sid)"""


class OracleOfflineStore(OfflineStore):
    """Feast offline store backed by an Oracle database, built on ibis."""

    @staticmethod
    def pull_latest_from_table_or_query(
        config: RepoConfig,
        data_source: DataSource,
        join_key_columns: List[str],
        feature_name_columns: List[str],
        timestamp_field: str,
        created_timestamp_column: Optional[str],
        start_date: datetime,
        end_date: datetime,
    ) -> RetrievalJob:
        """Pull the latest row per entity within [start_date, end_date]."""
        return pull_latest_from_table_or_query_ibis(
            config=config,
            data_source=data_source,
            join_key_columns=join_key_columns,
            feature_name_columns=feature_name_columns,
            timestamp_field=timestamp_field,
            created_timestamp_column=created_timestamp_column,
            start_date=start_date,
            end_date=end_date,
            data_source_reader=_build_data_source_reader(config),
            data_source_writer=_build_data_source_writer(config),
        )

    @staticmethod
    def _resolve_time_range(
        feature_views: List[FeatureView],
        start_date: Optional[datetime],
        end_date: Optional[datetime],
    ):
        """Normalize (start_date, end_date) to tz-aware UTC datetimes.

        A missing end defaults to "now" (UTC); a missing start defaults to
        end minus the largest feature-view TTL, falling back to 30 days
        when no feature view declares a TTL. Naive datetimes are assumed
        to be UTC.
        """
        if end_date is None:
            end_date = datetime.now(tz=timezone.utc)
        elif end_date.tzinfo is None:
            end_date = end_date.replace(tzinfo=timezone.utc)

        if start_date is None:
            max_ttl = max(
                (
                    int(fv.ttl.total_seconds())
                    for fv in feature_views
                    if fv.ttl and isinstance(fv.ttl, timedelta)
                ),
                default=0,
            )
            lookback_seconds = max_ttl if max_ttl > 0 else 30 * 86400
            start_date = end_date - timedelta(seconds=lookback_seconds)
        elif start_date.tzinfo is None:
            start_date = start_date.replace(tzinfo=timezone.utc)

        return start_date, end_date

    @staticmethod
    def _build_synthetic_entity_df(
        config: RepoConfig,
        feature_views: List[FeatureView],
        start_date: datetime,
        end_date: datetime,
    ) -> pd.DataFrame:
        """Build an entity_df from the feature sources themselves.

        For each feature view, reads its batch source pre-filtered on the
        timestamp column (the filter is pushed to Oracle), keeps the join
        keys plus the event timestamp, and de-duplicates the union of all
        views' rows.
        """
        con = get_ibis_connection(config)
        frames = []
        for fv in feature_views:
            source = fv.batch_source
            table = _read_oracle_table(con, source)
            ts_col = source.timestamp_field
            join_keys = [e.name for e in fv.entity_columns]
            filtered = table.filter(
                (table[ts_col] >= ibis.literal(start_date))
                & (table[ts_col] <= ibis.literal(end_date))
            ).select(join_keys + [ts_col])
            filtered = filtered.rename({"event_timestamp": ts_col})
            frames.append(filtered.execute())
        return pd.concat(frames, ignore_index=True).drop_duplicates()

    @staticmethod
    def get_historical_features(
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Optional[Union[pd.DataFrame, str]],
        registry: BaseRegistry,
        project: str,
        full_feature_names: bool = False,
        **kwargs,
    ) -> RetrievalJob:
        """Point-in-time join of the requested features onto entity_df.

        When ``entity_df`` is None, runs in non-entity mode: a synthetic
        entity dataframe is derived from the feature sources over the
        requested (or defaulted) ``start_date``/``end_date`` kwargs. A
        string ``entity_df`` is executed as SQL against Oracle first.
        """
        # Non-entity retrieval mode (start_date/end_date only).
        if entity_df is None:
            start_date, end_date = OracleOfflineStore._resolve_time_range(
                feature_views, kwargs.get("start_date"), kwargs.get("end_date")
            )
            entity_df = OracleOfflineStore._build_synthetic_entity_df(
                config, feature_views, start_date, end_date
            )

        # isinstance (not `type(...) ==`) is the correct type check and
        # also accepts str subclasses.
        if isinstance(entity_df, str):
            con = get_ibis_connection(config)
            entity_df = con.sql(entity_df).execute()

        return get_historical_features_ibis(
            config=config,
            feature_views=feature_views,
            feature_refs=feature_refs,
            entity_df=entity_df,
            registry=registry,
            project=project,
            full_feature_names=full_feature_names,
            data_source_reader=_build_data_source_reader_for_retrieval(config),
            data_source_writer=_build_data_source_writer(config),
        )

    @staticmethod
    def pull_all_from_table_or_query(
        config: RepoConfig,
        data_source: DataSource,
        join_key_columns: List[str],
        feature_name_columns: List[str],
        timestamp_field: str,
        created_timestamp_column: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> RetrievalJob:
        """Pull all rows (optionally bounded by a time range)."""
        return pull_all_from_table_or_query_ibis(
            config=config,
            data_source=data_source,
            join_key_columns=join_key_columns,
            feature_name_columns=feature_name_columns,
            timestamp_field=timestamp_field,
            created_timestamp_column=created_timestamp_column,
            start_date=start_date,
            end_date=end_date,
            data_source_reader=_build_data_source_reader(config),
            data_source_writer=_build_data_source_writer(config),
        )

    @staticmethod
    def offline_write_batch(
        config: RepoConfig,
        feature_view: FeatureView,
        table: pyarrow.Table,
        progress: Optional[Callable[[int], Any]],
    ):
        """Write a pyarrow batch to the feature view's offline source."""
        offline_write_batch_ibis(
            config=config,
            feature_view=feature_view,
            table=table,
            progress=progress,
            data_source_writer=_build_data_source_writer(config),
        )

    @staticmethod
    def write_logged_features(
        config: RepoConfig,
        data: Union[pyarrow.Table, Path],
        source: LoggingSource,
        logging_config: LoggingConfig,
        registry: BaseRegistry,
    ):
        """Persist logged (served) features via the shared ibis helper."""
        write_logged_features_ibis(
            config=config,
            data=data,
            source=source,
            logging_config=logging_config,
            registry=registry,
        )
Loading