Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
47a9cfb
feat: Added offline store remote deployment functionality using arrow fl…
redhatHameed May 7, 2024
088d739
Initial functional commit for remote get_historical_features
dmartinol May 9, 2024
241fe50
remote offline store example
dmartinol May 9, 2024
e276704
removing unneeded test code and fixing imports
dmartinol May 9, 2024
cdd8a4e
call do_put only once, postpone the invocation of do_put and simplifi…
dmartinol May 10, 2024
54f6061
added primitive parameters to the command descriptor
dmartinol May 10, 2024
be91d23
removed redundant param
dmartinol May 13, 2024
f074044
Initial skeleton of unit test for offline server
dmartinol May 15, 2024
06fe80d
added unit test for offline store remote client
redhatHameed May 14, 2024
49857da
testing all offlinestore APIs
dmartinol May 17, 2024
a30c666
integrated comments
dmartinol May 17, 2024
2c8d677
Updated remote offline server readme with the capability to init with…
tmihalac May 17, 2024
1bca528
added RemoteOfflineStoreDataSourceCreator,
dmartinol May 27, 2024
f035430
added missing CI requirement
dmartinol May 27, 2024
632a4c0
fixed linter
dmartinol May 27, 2024
94f927b
fixed multiprocess CI requirement
dmartinol May 27, 2024
36b3479
feat: Added offline store remote deployment functionality using arrow fl…
redhatHameed May 7, 2024
799ae07
fix test errors
dmartinol May 29, 2024
3029881
managing feature view aliases and restored skipped tests
dmartinol May 29, 2024
f6c481b
fixed linter issue
dmartinol May 29, 2024
871e5d4
fixed broken test
dmartinol May 29, 2024
ddc3600
added supported deployment modes using helm chart for online (defaul…
redhatHameed May 22, 2024
9a34251
updated the document for offline remote server
redhatHameed Jun 4, 2024
efeeeae
added the document for remote offline server
redhatHameed Jun 4, 2024
7077bee
rebase and fix conflicts
redhatHameed Jun 6, 2024
5697056
feat: Added offline store remote deployment functionality using arrow fl…
redhatHameed May 7, 2024
f9ca13b
added unit test for offline store remote client
redhatHameed May 14, 2024
46fa3d5
added RemoteOfflineStoreDataSourceCreator,
dmartinol May 27, 2024
b36105a
feat: Added offline store remote deployment functionality using arrow fl…
redhatHameed May 7, 2024
2f4a5ba
Added missing remote offline store apis implementation
tmihalac Jun 4, 2024
52b0156
Fixed tests
tmihalac Jun 5, 2024
32600fc
Implemented PR change proposal
tmihalac Jun 6, 2024
d2e012c
Implemented PR change proposal
tmihalac Jun 6, 2024
301021f
updated example readme file
redhatHameed Jun 7, 2024
e34b070
Implemented PR change proposal
tmihalac Jun 10, 2024
dec05c9
fixing the integration tests
redhatHameed Jun 11, 2024
143ef18
Fixed OfflineServer teardown
tmihalac Jun 10, 2024
bd9cd79
updated the document for remote offline feature server and client
redhatHameed Jun 13, 2024
bdf0150
Merge pull request #15 from redhatHameed/doc-change
redhatHameed Jun 13, 2024
17c7e72
Implemented PR change proposal
tmihalac Jun 13, 2024
11a3dae
Merge pull request #16 from tmihalac/implement-remaining-offline-methods
tmihalac Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Initial functional commit for remote get_historical_features
Signed-off-by: Abdul Hameed <ahameed@redhat.com>
  • Loading branch information
dmartinol authored and redhatHameed committed Jun 10, 2024
commit 088d7394da079886b2e99455970f129131d10358
182 changes: 98 additions & 84 deletions sdk/python/feast/infra/offline_stores/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pandas as pd
import pyarrow as pa
import pyarrow.parquet
from pydantic import StrictStr
from pydantic import StrictInt, StrictStr

from feast import OnDemandFeatureView
from feast.data_source import DataSource
Expand All @@ -17,61 +17,67 @@
RetrievalJob,
)
from feast.infra.registry.base_registry import BaseRegistry
from feast.infra.registry.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage


class RemoteOfflineStoreConfig(FeastConfigBaseModel):
type: Literal["remote"] = "remote"
host: StrictStr
""" str: remote offline store server host, e.g. the host URL for offline store of arrow flight server. """

offline_type: StrictStr = "remote"
""" str: Provider name or a class name that implements Offline store."""

path: StrictStr = ""
""" str: Path to metadata store.
If offline_type is 'remote', then this is a URL for offline server """

host: StrictStr = ""
""" str: host to offline store.
If offline_type is 'remote', then this is a host URL for offline store of arrow flight server """

port: StrictStr = ""
""" str: host to offline store."""
port: Optional[StrictInt] = None
""" int: remote offline store server port."""


class RemoteRetrievalJob(RetrievalJob):
def __init__(
self,
config: RepoConfig,
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
# TODO add missing parameters from the OfflineStore API
self,
config: RepoConfig,
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
# TODO add missing parameters from the OfflineStore API
):
# Generate unique command identifier
self.command = str(uuid.uuid4())
# Initialize the client connection
self.client = pa.flight.connect(f"grpc://{config.offline_store.host}:{config.offline_store.port}")
self.client = pa.flight.connect(
f"grpc://{config.offline_store.host}:{config.offline_store.port}"
)
# Put API parameters
self._put_parameters(feature_refs, entity_df)

def _put_parameters(self, feature_refs, entity_df):
historical_flight_descriptor = pa.flight.FlightDescriptor.for_command(
self.command
)

entity_df_table = pa.Table.from_pandas(entity_df)
historical_flight_descriptor = pa.flight.FlightDescriptor.for_command(self.command)
writer, _ = self.client.do_put(historical_flight_descriptor,
entity_df_table.schema.with_metadata({
'command': self.command,
'api': 'get_historical_features',
'param': 'entity_df'}))
writer, _ = self.client.do_put(
historical_flight_descriptor,
entity_df_table.schema.with_metadata(
{
"command": self.command,
"api": "get_historical_features",
"param": "entity_df",
}
),
)
writer.write_table(entity_df_table)
writer.close()

features_array = pa.array(feature_refs)
features_batch = pa.RecordBatch.from_arrays([features_array], ['features'])
writer, _ = self.client.do_put(historical_flight_descriptor,
features_batch.schema.with_metadata({
'command': self.command,
'api': 'get_historical_features',
'param': 'features'}))
features_batch = pa.RecordBatch.from_arrays([features_array], ["features"])
writer, _ = self.client.do_put(
historical_flight_descriptor,
features_batch.schema.with_metadata(
{
"command": self.command,
"api": "get_historical_features",
"param": "features",
}
),
)
writer.write_batch(features_batch)
writer.close()

Expand All @@ -96,65 +102,73 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:


class RemoteOfflineStore(OfflineStore):
def __init__(
self,

arrow_host,
arrow_port
):
self.arrow_host = arrow_host
self.arrow_port = arrow_port

@staticmethod
@log_exceptions_and_usage(offline_store="remote")
def get_historical_features(
self,
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry = None,
project: str = '',
full_feature_names: bool = False,
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RemoteRetrievalJob:
offline_store_config = config.offline_store
assert isinstance(config.offline_store_config, RemoteOfflineStoreConfig)
store_type = offline_store_config.type
port = offline_store_config.port
host = offline_store_config.host
print(f"config.offline_store is {type(config.offline_store)}")
assert isinstance(config.offline_store, RemoteOfflineStoreConfig)

return RemoteRetrievalJob(RepoConfig, feature_refs, entity_df)
# TODO: extend RemoteRetrievalJob API with all method parameters
return RemoteRetrievalJob(
config=config, feature_refs=feature_refs, entity_df=entity_df
)

@staticmethod
@log_exceptions_and_usage(offline_store="remote")
def pull_all_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
# TODO Implementation here.
raise NotImplementedError

@staticmethod
@log_exceptions_and_usage(offline_store="remote")
def pull_latest_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
# TODO Implementation here.
raise NotImplementedError

@staticmethod
@log_exceptions_and_usage(offline_store="remote")
def pull_latest_from_table_or_query(self,
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime) -> RetrievalJob:
""" Pulls data from the offline store for use in materialization."""
print("Pulling latest features from my offline store")
# Implementation here.
pass

def write_logged_features(
config: RepoConfig,
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: BaseRegistry,
config: RepoConfig,
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: BaseRegistry,
):
""" Optional method to have Feast support logging your online features."""
# Implementation here.
pass
# TODO Implementation here.
raise NotImplementedError

@staticmethod
@log_exceptions_and_usage(offline_store="remote")
def offline_write_batch(
config: RepoConfig,
feature_view: FeatureView,
table: pyarrow.Table,
progress: Optional[Callable[[int], Any]],
config: RepoConfig,
feature_view: FeatureView,
table: pyarrow.Table,
progress: Optional[Callable[[int], Any]],
):
# Implementation here.
pass
# TODO Implementation here.
raise NotImplementedError
Loading