diff --git a/docs/reference/data-sources/push.md b/docs/reference/data-sources/push.md index 7a7ef96c7ca..ddb531dcca2 100644 --- a/docs/reference/data-sources/push.md +++ b/docs/reference/data-sources/push.md @@ -6,9 +6,12 @@ Push sources allow feature values to be pushed to the online store and offline s Push sources can be used by multiple feature views. When data is pushed to a push source, Feast propagates the feature values to all the consuming feature views. -Push sources must have a batch source specified. The batch source will be used for retrieving historical features. Thus users are also responsible for pushing data to a batch data source such as a data warehouse table. When using a push source as a stream source in the definition of a feature view, a batch source doesn't need to be specified in the feature view definition explicitly. +Push sources can optionally have a batch_source specified. If provided, the batch source enables retrieval of historical features and supports materialization from the offline store to the online store. However, if your features are generated post-training or are only needed online (e.g., embeddings), you can omit the batch_source. + +When a batch_source is used, users are responsible for ensuring that data is also pushed to a batch data source, such as a data warehouse. Note that when a push source is used as a stream source in a feature view definition, a batch_source does not need to be explicitly specified in the feature view itself. ## Stream sources + Streaming data sources are important sources of feature values. A typical setup with streaming data looks like: 1. Raw events come in (stream 1) @@ -20,7 +23,9 @@ Streaming data sources are important sources of feature values. A typical setup Feast allows users to push features previously registered in a feature view to the online store for fresher features. 
It also allows users to push batches of stream data to the offline store by specifying that the push be directed to the offline store. This will push the data to the offline store declared in the repository configuration used to initialize the feature store. ## Example (basic) + ### Defining a push source + Note that the push schema needs to also include the entity. ```python @@ -43,7 +48,9 @@ fv = FeatureView( ``` ### Pushing data + Note that the `to` parameter is optional and defaults to online but we can specify these options: `PushMode.ONLINE`, `PushMode.OFFLINE`, or `PushMode.ONLINE_AND_OFFLINE`. + ```python from feast import FeatureStore import pandas as pd diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 25475fcb4c3..fea3034dd0d 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -764,13 +764,13 @@ class PushSource(DataSource): # TODO(adchia): consider adding schema here in case where Feast manages pushing events to the offline store # TODO(adchia): consider a "mode" to support pushing raw vs transformed events - batch_source: DataSource + batch_source: Optional[DataSource] = None def __init__( self, *, name: str, - batch_source: DataSource, + batch_source: Optional[DataSource] = None, description: Optional[str] = "", tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", @@ -815,8 +815,11 @@ def get_table_column_names_and_types( @staticmethod def from_proto(data_source: DataSourceProto): - assert data_source.HasField("batch_source") - batch_source = DataSource.from_proto(data_source.batch_source) + batch_source = ( + DataSource.from_proto(data_source.batch_source) + if data_source.HasField("batch_source") + else None + ) return PushSource( name=data_source.name, @@ -827,19 +830,17 @@ def from_proto(data_source: DataSourceProto): ) def to_proto(self) -> DataSourceProto: - batch_source_proto = None - if self.batch_source: - batch_source_proto = self.batch_source.to_proto() - 
data_source_proto = DataSourceProto( name=self.name, type=DataSourceProto.PUSH_SOURCE, description=self.description, tags=self.tags, owner=self.owner, - batch_source=batch_source_proto, ) + if self.batch_source: + data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto()) + return data_source_proto def get_table_query_string(self) -> str: diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index f2a2ee637fd..dd43d1f5bdb 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -29,7 +29,10 @@ def update_data_sources_with_inferred_event_timestamp_col( if isinstance(data_source, RequestSource): continue if isinstance(data_source, PushSource): - data_source = data_source.batch_source + # Push-only sources have no batch source to infer a timestamp column from + if not isinstance(data_source.batch_source, DataSource): + continue + data_source = data_source.batch_source if data_source.timestamp_field is None or data_source.timestamp_field == "": # prepare right match pattern for data source ts_column_type_regex_pattern: str diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 990c5d3b698..8a2d0f44001 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -30,6 +30,22 @@ def test_push_with_batch(): assert push_source.batch_source.name == push_source_unproto.batch_source.name +def test_push_source_without_batch_source(): + # Create PushSource with no batch_source + push_source = PushSource(name="test_push_source") + + # Convert to proto + push_source_proto = push_source.to_proto() + + # Assert batch_source is not present in proto + assert not push_source_proto.HasField("batch_source") + + # Deserialize and check again + push_source_unproto = PushSource.from_proto(push_source_proto) + assert push_source_unproto.batch_source is None + assert push_source_unproto.name == "test_push_source" + + def test_request_source_primitive_type_to_proto(): schema = [ Field(name="f1", 
dtype=Float32),