-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: Add Oracle DB as Offline store in python sdk & operator #6017
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
cff3a63
60f8d92
9f00168
07e6969
1fb6720
3abef8c
b991288
af7e7ee
41e0901
d4387a8
b0e96d0
07c5e9a
be03d88
c36299f
7e648c0
7573690
1a78577
8fc6190
1f63beb
33c0b38
04adc97
e91fa8d
caed7c8
de9759e
e40a355
c71cdfd
9a20c65
9880a56
5335b0d
c706b0b
414c0e0
5b84e73
92f78ca
c79ba55
95f7f55
1181be1
b54c2a2
fe797ed
466e2d2
7670a90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
…-operator Signed-off-by: Aniket Paluskar <apaluska@redhat.com>
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| from feast.infra.offline_stores.contrib.oracle_offline_store.oracle import ( | ||
| OracleOfflineStore, | ||
| OracleOfflineStoreConfig, | ||
| ) | ||
| from feast.infra.offline_stores.contrib.oracle_offline_store.oracle_source import ( | ||
| OracleSource, | ||
| ) | ||
|
|
||
# Public re-exports of the Oracle contrib offline store package.
__all__ = [
    "OracleSource",
    "OracleOfflineStore",
    "OracleOfflineStoreConfig",
]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,299 @@ | ||
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable, List, Literal, Optional, Union

import ibis
import pandas as pd
import pyarrow
from ibis.expr.types import Table
from pydantic import StrictInt, StrictStr, model_validator

from feast.data_source import DataSource
from feast.feature_logging import LoggingConfig, LoggingSource
from feast.feature_view import FeatureView
from feast.infra.offline_stores.contrib.oracle_offline_store.oracle_source import (
    OracleSource,
)
from feast.infra.offline_stores.ibis import (
    get_historical_features_ibis,
    offline_write_batch_ibis,
    pull_all_from_table_or_query_ibis,
    pull_latest_from_table_or_query_ibis,
    write_logged_features_ibis,
)
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
|
|
||
|
|
||
def get_ibis_connection(config: RepoConfig):
    """Open an ibis connection to Oracle from the offline store config.

    Optional connect identifiers (``service_name``, ``sid``, ``database``,
    ``dsn``) are forwarded to ibis only when set, so unset ones fall back
    to the backend's own defaults.
    """
    offline_config = config.offline_store
    assert isinstance(offline_config, OracleOfflineStoreConfig)

    # Collect only the truthy optional identifiers, in a stable order.
    optional_kwargs = {
        name: value
        for name in ("service_name", "sid", "database", "dsn")
        if (value := getattr(offline_config, name))
    }

    return ibis.oracle.connect(
        user=offline_config.user,
        password=offline_config.password,
        host=offline_config.host,
        port=offline_config.port,
        **optional_kwargs,
    )
|
|
||
|
|
||
def _read_oracle_table(con, data_source: DataSource) -> Table:
    """Look up the Oracle table referenced by *data_source* on *con*.

    Column names come back exactly as Oracle stores them, so callers must
    use the same casing Oracle reports (``USER_ID`` for unquoted
    identifiers, ``CamelCase`` for quoted ones).
    """
    assert isinstance(data_source, OracleSource)
    return con.table(data_source.table_ref)
|
|
||
|
|
||
def _build_data_source_reader(config: RepoConfig):
    """Return a reader closure that yields Oracle-backend ibis tables.

    Suitable for ``pull_latest`` and ``pull_all``, where every operation
    executes on the single Oracle backend and no cross-backend join is
    involved.
    """
    con = get_ibis_connection(config)

    def _reader(data_source: DataSource, repo_path: str = "") -> Table:
        return _read_oracle_table(con, data_source)

    return _reader
|
|
||
|
|
||
def _build_data_source_reader_for_retrieval(config: RepoConfig):
    """Return a reader that copies Oracle data into an in-memory ibis table.

    ``get_historical_features`` joins feature tables against an in-memory
    entity table (``ibis.memtable``). Both join sides must live on the same
    backend for computed columns such as ``entity_row_id`` to survive the
    join, so the Oracle table is materialized into a memtable first.

    NOTE(review): ``table.execute()`` pulls the entire source table into
    memory; a timestamp pre-filter pushed down to Oracle before
    materializing would bound this — TODO confirm against the ibis
    retrieval helpers before changing.
    """
    con = get_ibis_connection(config)

    def _reader(data_source: DataSource, repo_path: str = "") -> Table:
        oracle_table = _read_oracle_table(con, data_source)
        return ibis.memtable(oracle_table.execute())

    return _reader
|
|
||
|
|
||
def _build_data_source_writer(config: RepoConfig):
    """Return a writer closure that persists an ibis table to Oracle.

    The returned callable inserts *table* into the Oracle table named by
    ``data_source.table_ref``. ``mode="overwrite"`` replaces existing rows;
    any other mode appends. ``repo_path`` and ``allow_overwrite`` are
    accepted for interface compatibility with the ibis offline-store
    helpers.
    """
    con = get_ibis_connection(config)

    def _write_data_source(
        table: Table,
        data_source: DataSource,
        repo_path: str = "",
        mode: str = "append",
        allow_overwrite: bool = False,
    ):
        assert isinstance(data_source, OracleSource)
        # Honor the requested write mode instead of always appending:
        # ibis' backend-level insert truncates first when overwrite=True.
        con.insert(
            table_name=data_source.table_ref,
            obj=table.to_pandas(),
            overwrite=(mode == "overwrite"),
        )

    return _write_data_source
|
|
||
|
|
||
class OracleOfflineStoreConfig(FeastConfigBaseModel):
    """Offline store config for Oracle Database.

    At most one of ``service_name``, ``sid`` or ``dsn`` may be provided to
    identify the target database; supplying more than one raises a
    validation error.
    """

    type: Literal["oracle"] = "oracle"
    """Offline store type selector"""

    # NOTE(review): the default credentials below suit local development
    # only — production deployments must override them.
    user: StrictStr = "system"
    """Oracle database user"""

    password: StrictStr = "oracle123"
    """Oracle database password"""

    host: StrictStr = "localhost"
    """Oracle database host"""

    port: StrictInt = 1521
    """Oracle database port"""

    service_name: Optional[StrictStr] = None
    """Oracle service name (mutually exclusive with sid and dsn)"""

    sid: Optional[StrictStr] = None
    """Oracle SID (mutually exclusive with service_name and dsn)"""

    database: Optional[StrictStr] = None
    """Oracle database name"""

    dsn: Optional[StrictStr] = None
    """Oracle DSN string (mutually exclusive with service_name and sid)"""

    @model_validator(mode="after")
    def _validate_exclusive_connect_identifiers(self):
        # Enforce the mutual exclusivity documented on the fields above,
        # which was previously stated but never checked.
        provided = [
            name for name in ("service_name", "sid", "dsn") if getattr(self, name)
        ]
        if len(provided) > 1:
            raise ValueError(
                "Only one of service_name, sid or dsn may be set; got: "
                + ", ".join(provided)
            )
        return self
|
|
||
|
|
||
class OracleOfflineStore(OfflineStore):
    """Offline store implementation backed by Oracle Database via ibis."""

    @staticmethod
    def pull_latest_from_table_or_query(
        config: RepoConfig,
        data_source: DataSource,
        join_key_columns: List[str],
        feature_name_columns: List[str],
        timestamp_field: str,
        created_timestamp_column: Optional[str],
        start_date: datetime,
        end_date: datetime,
    ) -> RetrievalJob:
        """Pull the latest row per entity within ``[start_date, end_date]``,
        delegating to the shared ibis implementation."""
        return pull_latest_from_table_or_query_ibis(
            config=config,
            data_source=data_source,
            join_key_columns=join_key_columns,
            feature_name_columns=feature_name_columns,
            timestamp_field=timestamp_field,
            created_timestamp_column=created_timestamp_column,
            start_date=start_date,
            end_date=end_date,
            data_source_reader=_build_data_source_reader(config),
            data_source_writer=_build_data_source_writer(config),
        )

    @staticmethod
    def _build_entity_df_from_sources(
        config: RepoConfig,
        feature_views: List[FeatureView],
        start_date: Optional[datetime],
        end_date: Optional[datetime],
    ) -> pd.DataFrame:
        """Build a synthetic entity dataframe from the feature sources.

        Used for non-entity retrieval mode (``entity_df is None``): each
        feature view's batch source is scanned for its join keys and event
        timestamps within ``[start_date, end_date]``.

        Raises:
            ValueError: if no feature views are provided (nothing to scan).
        """
        if end_date is None:
            end_date = datetime.now(tz=timezone.utc)
        elif end_date.tzinfo is None:
            # Treat naive datetimes as UTC rather than mixing naive/aware.
            end_date = end_date.replace(tzinfo=timezone.utc)

        if start_date is None:
            max_ttl = max(
                (
                    int(fv.ttl.total_seconds())
                    for fv in feature_views
                    if fv.ttl and isinstance(fv.ttl, timedelta)
                ),
                default=0,
            )
            # Fall back to a 30-day window when no feature view declares a TTL.
            start_date = end_date - timedelta(
                seconds=max_ttl if max_ttl > 0 else 30 * 86400
            )
        elif start_date.tzinfo is None:
            start_date = start_date.replace(tzinfo=timezone.utc)

        con = get_ibis_connection(config)
        entity_dfs = []
        for fv in feature_views:
            source = fv.batch_source
            table = _read_oracle_table(con, source)
            ts_col = source.timestamp_field
            join_keys = [e.name for e in fv.entity_columns]
            # Filter in Oracle so only rows in the window are transferred.
            sub = table.filter(
                (table[ts_col] >= ibis.literal(start_date))
                & (table[ts_col] <= ibis.literal(end_date))
            ).select(join_keys + [ts_col])
            # ibis rename maps new_name -> old_name.
            sub = sub.rename({"event_timestamp": ts_col})
            entity_dfs.append(sub.execute())

        if not entity_dfs:
            # pd.concat([]) would raise an opaque ValueError; fail clearly.
            raise ValueError(
                "Cannot build an entity dataframe: no feature views were provided"
            )
        return pd.concat(entity_dfs, ignore_index=True).drop_duplicates()

    @staticmethod
    def get_historical_features(
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Optional[Union[pd.DataFrame, str]],
        registry: BaseRegistry,
        project: str,
        full_feature_names: bool = False,
        **kwargs,
    ) -> RetrievalJob:
        """Point-in-time join of the requested features onto *entity_df*.

        When *entity_df* is ``None`` a synthetic entity dataframe is built
        from the feature sources themselves, bounded by the optional
        ``start_date``/``end_date`` kwargs. When *entity_df* is a SQL
        string, it is executed against Oracle first.
        """
        # Handle non-entity retrieval mode (start_date/end_date only).
        if entity_df is None:
            entity_df = OracleOfflineStore._build_entity_df_from_sources(
                config,
                feature_views,
                kwargs.get("start_date"),
                kwargs.get("end_date"),
            )

        # If entity_df is a SQL string, execute it to get a DataFrame.
        if isinstance(entity_df, str):
            con = get_ibis_connection(config)
            entity_df = con.sql(entity_df).execute()

        # Use the retrieval reader which materializes Oracle data into
        # in-memory tables so the point-in-time join with the entity
        # memtable happens on the same backend.
        return get_historical_features_ibis(
            config=config,
            feature_views=feature_views,
            feature_refs=feature_refs,
            entity_df=entity_df,
            registry=registry,
            project=project,
            full_feature_names=full_feature_names,
            data_source_reader=_build_data_source_reader_for_retrieval(config),
            data_source_writer=_build_data_source_writer(config),
        )

    @staticmethod
    def pull_all_from_table_or_query(
        config: RepoConfig,
        data_source: DataSource,
        join_key_columns: List[str],
        feature_name_columns: List[str],
        timestamp_field: str,
        created_timestamp_column: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> RetrievalJob:
        """Pull all rows (optionally time-bounded) from the data source,
        delegating to the shared ibis implementation."""
        return pull_all_from_table_or_query_ibis(
            config=config,
            data_source=data_source,
            join_key_columns=join_key_columns,
            feature_name_columns=feature_name_columns,
            timestamp_field=timestamp_field,
            created_timestamp_column=created_timestamp_column,
            start_date=start_date,
            end_date=end_date,
            data_source_reader=_build_data_source_reader(config),
            data_source_writer=_build_data_source_writer(config),
        )

    @staticmethod
    def offline_write_batch(
        config: RepoConfig,
        feature_view: FeatureView,
        table: pyarrow.Table,
        progress: Optional[Callable[[int], Any]],
    ):
        """Write a batch of feature rows to the view's Oracle batch source."""
        offline_write_batch_ibis(
            config=config,
            feature_view=feature_view,
            table=table,
            progress=progress,
            data_source_writer=_build_data_source_writer(config),
        )

    @staticmethod
    def write_logged_features(
        config: RepoConfig,
        data: Union[pyarrow.Table, Path],
        source: LoggingSource,
        logging_config: LoggingConfig,
        registry: BaseRegistry,
    ):
        """Persist logged feature-service data via the shared ibis helper."""
        write_logged_features_ibis(
            config=config,
            data=data,
            source=source,
            logging_config=logging_config,
            registry=registry,
        )
Uh oh!
There was an error while loading. Please reload this page.