Skip to content

Commit c7ab086

Browse files
committed
Merge branch 'master' into feat/dynamo_db_online_write_read
2 parents 5bc54d3 + 45db6dc commit c7ab086

File tree

96 files changed

+3129
-2072
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

96 files changed

+3129
-2072
lines changed

.github/workflows/java_master_only.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
- uses: actions/checkout@v2
2121
with:
2222
submodules: 'true'
23-
- uses: google-github-actions/setup-gcloud@master
23+
- uses: google-github-actions/setup-gcloud@v0
2424
with:
2525
version: '290.0.1'
2626
export_default_credentials: true

.github/workflows/master_only.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ jobs:
184184
username: ${{ secrets.DOCKERHUB_USERNAME }}
185185
password: ${{ secrets.DOCKERHUB_TOKEN }}
186186
- name: Set up Cloud SDK
187-
uses: google-github-actions/setup-gcloud@master
187+
uses: google-github-actions/setup-gcloud@v0
188188
with:
189189
project_id: ${{ secrets.GCP_PROJECT_ID }}
190190
service_account_key: ${{ secrets.GCP_SA_KEY }}

.github/workflows/publish.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ jobs:
6464
username: ${{ secrets.DOCKERHUB_USERNAME }}
6565
password: ${{ secrets.DOCKERHUB_TOKEN }}
6666
- name: Set up Cloud SDK
67-
uses: google-github-actions/setup-gcloud@master
67+
uses: google-github-actions/setup-gcloud@v0
6868
with:
6969
project_id: ${{ secrets.GCP_PROJECT_ID }}
7070
service_account_key: ${{ secrets.GCP_SA_KEY }}
@@ -107,7 +107,7 @@ jobs:
107107
VERSION_WITHOUT_PREFIX: ${{ needs.get-version.outputs.version_without_prefix }}
108108
steps:
109109
- uses: actions/checkout@v2
110-
- uses: google-github-actions/setup-gcloud@master
110+
- uses: google-github-actions/setup-gcloud@v0
111111
with:
112112
version: '290.0.1'
113113
export_default_credentials: true

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ package-protos:
4646
cp -r ${ROOT_DIR}/protos ${ROOT_DIR}/sdk/python/feast/protos
4747

4848
compile-protos-python:
49-
python sdk/python/setup.py build_python_protos
49+
cd sdk/python && python setup.py build_python_protos
5050

5151
install-python:
5252
cd sdk/python && python -m piptools sync requirements/py$(PYTHON)-requirements.txt
@@ -92,7 +92,7 @@ format-python:
9292
cd ${ROOT_DIR}/sdk/python; python -m black --target-version py37 feast tests
9393

9494
lint-python:
95-
cd ${ROOT_DIR}/sdk/python; python -m mypy feast/ tests/
95+
cd ${ROOT_DIR}/sdk/python; python -m mypy
9696
cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only
9797
cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/
9898
cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests
@@ -133,7 +133,7 @@ install-protoc-dependencies:
133133
pip install grpcio-tools==1.34.0
134134

135135
compile-protos-go: install-go-proto-dependencies install-protoc-dependencies
136-
python sdk/python/setup.py build_go_protos
136+
cd sdk/python && python setup.py build_go_protos
137137

138138
compile-go-feature-server: compile-protos-go
139139
go mod tidy

docs/how-to-guides/adding-a-new-offline-store.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ The OfflineStore class contains a couple of methods to read features from the of
2828
There are two methods that deal with reading data from the offline stores`get_historical_features`and `pull_latest_from_table_or_query`.
2929

3030
* `pull_latest_from_table_or_query` is invoked when running materialization (using the `feast materialize` or `feast materialize-incremental` commands, or the corresponding `FeatureStore.materialize()` method. This method pull data from the offline store, and the `FeatureStore` class takes care of writing this data into the online store.
31-
* `get_historical_features `is invoked when reading values from the offline store using the `FeatureStore.get_historica_features()` method. Typically, this method is used to retrieve features when training ML models.
31+
* `get_historical_features` is invoked when reading values from the offline store using the `FeatureStore.get_historical_features()` method. Typically, this method is used to retrieve features when training ML models.
32+
* `pull_all_from_table_or_query` is a method that pulls all the data from an offline store from a specified start date to a specified end date.
3233

3334
{% code title="feast_custom_offline_store/file.py" %}
3435
```python
@@ -223,7 +224,7 @@ To use our custom file offline store, we can use the following `feature_store.ya
223224
project: test_custom
224225
registry: data/registry.db
225226
provider: local
226-
offline_store:
227+
offline_store:
227228
type: feast_custom_offline_store.file.CustomFileOfflineStore
228229
```
229230
{% endcode %}

infra/scripts/release/files_to_bump.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ infra/charts/feast/charts/feature-server/values.yaml
99
infra/charts/feast/README.md
1010
infra/charts/feast-python-server/Chart.yaml
1111
infra/charts/feast-python-server/README.md
12-
java/pom.xml
12+
java/pom.xml
13+
ui/package.json

protos/feast/core/DataSource.proto

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ import "feast/core/DataFormat.proto";
2626
import "feast/types/Value.proto";
2727

2828
// Defines a Data Source that can be used source Feature data
29-
// Next available id: 22
29+
// Next available id: 23
3030
message DataSource {
3131
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
3232
// but they are going to be reserved for backwards compatibility.
3333
reserved 6 to 10;
3434

3535
// Type of Data Source.
36-
// Next available id: 9
36+
// Next available id: 10
3737
enum SourceType {
3838
INVALID = 0;
3939
BATCH_FILE = 1;
@@ -44,7 +44,7 @@ message DataSource {
4444
STREAM_KINESIS = 4;
4545
CUSTOM_SOURCE = 6;
4646
REQUEST_SOURCE = 7;
47-
47+
PUSH_SOURCE = 9;
4848
}
4949

5050
// Unique name of data source within the project
@@ -71,6 +71,8 @@ message DataSource {
7171

7272
// This is an internal field that is represents the python class for the data source object a proto object represents.
7373
// This should be set by feast, and not by users.
74+
// The field is used primarily by custom data sources and is mandatory for them to set. Feast may set it for
75+
// first party sources as well.
7476
string data_source_class_type = 17;
7577

7678
// Defines options for DataSource that sources features from a file
@@ -169,6 +171,16 @@ message DataSource {
169171
map<string, feast.types.ValueType.Enum> schema = 2;
170172
}
171173

174+
// Defines options for DataSource that supports pushing data to it. This allows data to be pushed to
175+
// the online store on-demand, such as by stream consumers.
176+
message PushOptions {
177+
// Mapping of feature name to type
178+
map<string, feast.types.ValueType.Enum> schema = 1;
179+
// Optional batch source for the push source for historical features and materialization.
180+
DataSource batch_source = 2;
181+
}
182+
183+
172184
// DataSource options.
173185
oneof options {
174186
FileOptions file_options = 11;
@@ -179,5 +191,6 @@ message DataSource {
179191
RequestDataOptions request_data_options = 18;
180192
CustomSourceOptions custom_options = 16;
181193
SnowflakeOptions snowflake_options = 19;
194+
PushOptions push_options = 22;
182195
}
183196
}

sdk/python/MANIFEST.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
recursive-include feast/protos/ *.py
22
include feast/binaries/*
3+
recursive-include feast py.typed *.pyi

sdk/python/feast/data_source.py

Lines changed: 95 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,18 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
134134
return kinesis_options_proto
135135

136136

137+
_DATA_SOURCE_OPTIONS = {
138+
DataSourceProto.SourceType.BATCH_FILE: "feast.infra.offline_stores.file_source.FileSource",
139+
DataSourceProto.SourceType.BATCH_BIGQUERY: "feast.infra.offline_stores.bigquery_source.BigQuerySource",
140+
DataSourceProto.SourceType.BATCH_REDSHIFT: "feast.infra.offline_stores.redshift_source.RedshiftSource",
141+
DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake_source.SnowflakeSource",
142+
DataSourceProto.SourceType.STREAM_KAFKA: "feast.data_source.KafkaSource",
143+
DataSourceProto.SourceType.STREAM_KINESIS: "feast.data_source.KinesisSource",
144+
DataSourceProto.SourceType.REQUEST_SOURCE: "feast.data_source.RequestDataSource",
145+
DataSourceProto.SourceType.PUSH_SOURCE: "feast.data_source.PushSource",
146+
}
147+
148+
137149
class DataSource(ABC):
138150
"""
139151
DataSource that can be used to source features.
@@ -210,48 +222,20 @@ def from_proto(data_source: DataSourceProto) -> Any:
210222
Raises:
211223
ValueError: The type of DataSource could not be identified.
212224
"""
213-
if data_source.data_source_class_type:
214-
cls = get_data_source_class_from_type(data_source.data_source_class_type)
215-
return cls.from_proto(data_source)
216-
217-
if data_source.request_data_options and data_source.request_data_options.schema:
218-
data_source_obj = RequestDataSource.from_proto(data_source)
219-
elif data_source.file_options.file_format and data_source.file_options.file_url:
220-
from feast.infra.offline_stores.file_source import FileSource
221-
222-
data_source_obj = FileSource.from_proto(data_source)
223-
elif (
224-
data_source.bigquery_options.table_ref or data_source.bigquery_options.query
225+
data_source_type = data_source.type
226+
if not data_source_type or (
227+
data_source_type
228+
not in list(_DATA_SOURCE_OPTIONS.keys())
229+
+ [DataSourceProto.SourceType.CUSTOM_SOURCE]
225230
):
226-
from feast.infra.offline_stores.bigquery_source import BigQuerySource
227-
228-
data_source_obj = BigQuerySource.from_proto(data_source)
229-
elif data_source.redshift_options.table or data_source.redshift_options.query:
230-
from feast.infra.offline_stores.redshift_source import RedshiftSource
231-
232-
data_source_obj = RedshiftSource.from_proto(data_source)
233-
234-
elif data_source.snowflake_options.table or data_source.snowflake_options.query:
235-
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
236-
237-
data_source_obj = SnowflakeSource.from_proto(data_source)
238-
239-
elif (
240-
data_source.kafka_options.bootstrap_servers
241-
and data_source.kafka_options.topic
242-
and data_source.kafka_options.message_format
243-
):
244-
data_source_obj = KafkaSource.from_proto(data_source)
245-
elif (
246-
data_source.kinesis_options.record_format
247-
and data_source.kinesis_options.region
248-
and data_source.kinesis_options.stream_name
249-
):
250-
data_source_obj = KinesisSource.from_proto(data_source)
251-
else:
252231
raise ValueError("Could not identify the source type being added.")
253232

254-
return data_source_obj
233+
if data_source_type == DataSourceProto.SourceType.CUSTOM_SOURCE:
234+
cls = get_data_source_class_from_type(data_source.data_source_class_type)
235+
return cls.from_proto(data_source)
236+
237+
cls = get_data_source_class_from_type(_DATA_SOURCE_OPTIONS[data_source_type])
238+
return cls.from_proto(data_source)
255239

256240
@abstractmethod
257241
def to_proto(self) -> DataSourceProto:
@@ -522,3 +506,75 @@ def to_proto(self) -> DataSourceProto:
522506
data_source_proto.date_partition_column = self.date_partition_column
523507

524508
return data_source_proto
509+
510+
511+
class PushSource(DataSource):
512+
"""
513+
PushSource that can be used to ingest features on request
514+
515+
Args:
516+
name: Name of the push source
517+
schema: Schema mapping from the input feature name to a ValueType
518+
"""
519+
520+
name: str
521+
schema: Dict[str, ValueType]
522+
batch_source: Optional[DataSource]
523+
524+
def __init__(
525+
self,
526+
name: str,
527+
schema: Dict[str, ValueType],
528+
batch_source: Optional[DataSource] = None,
529+
):
530+
"""Creates a PushSource object."""
531+
super().__init__(name)
532+
self.schema = schema
533+
self.batch_source = batch_source
534+
535+
def validate(self, config: RepoConfig):
536+
pass
537+
538+
def get_table_column_names_and_types(
539+
self, config: RepoConfig
540+
) -> Iterable[Tuple[str, str]]:
541+
pass
542+
543+
@staticmethod
544+
def from_proto(data_source: DataSourceProto):
545+
schema_pb = data_source.push_options.schema
546+
schema = {}
547+
for key, value in schema_pb.items():
548+
schema[key] = value
549+
550+
batch_source = None
551+
if data_source.push_options.HasField("batch_source"):
552+
batch_source = DataSource.from_proto(data_source.push_options.batch_source)
553+
554+
return PushSource(
555+
name=data_source.name, schema=schema, batch_source=batch_source
556+
)
557+
558+
def to_proto(self) -> DataSourceProto:
559+
schema_pb = {}
560+
for key, value in self.schema.items():
561+
schema_pb[key] = value
562+
batch_source_proto = None
563+
if self.batch_source:
564+
batch_source_proto = self.batch_source.to_proto()
565+
566+
options = DataSourceProto.PushOptions(
567+
schema=schema_pb, batch_source=batch_source_proto
568+
)
569+
data_source_proto = DataSourceProto(
570+
name=self.name, type=DataSourceProto.PUSH_SOURCE, push_options=options,
571+
)
572+
573+
return data_source_proto
574+
575+
def get_table_query_string(self) -> str:
576+
raise NotImplementedError
577+
578+
@staticmethod
579+
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
580+
raise NotImplementedError

sdk/python/feast/infra/offline_stores/offline_store.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,24 @@ def pull_latest_from_table_or_query(
179179
end_date: datetime,
180180
) -> RetrievalJob:
181181
"""
182+
This method pulls data from the offline store, and the FeatureStore class is used to write
183+
this data into the online store. This method is invoked when running materialization (using
184+
the `feast materialize` or `feast materialize-incremental` commands, or the corresponding
185+
FeatureStore.materialize() method. This method pulls data from the offline store, and the FeatureStore
186+
class is used to write this data into the online store.
187+
182188
Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column
183189
have all already been mapped to column names of the source table and those column names are the values passed
184190
into this function.
191+
192+
Args:
193+
config: Repo configuration object
194+
data_source: Data source to pull all of the columns from
195+
join_key_columns: Columns of the join keys
196+
feature_name_columns: Columns of the feature names needed
197+
event_timestamp_column: Timestamp column
198+
start_date: Starting date of query
199+
end_date: Ending date of query
185200
"""
186201
pass
187202

@@ -210,8 +225,19 @@ def pull_all_from_table_or_query(
210225
end_date: datetime,
211226
) -> RetrievalJob:
212227
"""
228+
Returns a Retrieval Job for all join key columns, feature name columns, and the event timestamp columns that occur between the start_date and end_date.
229+
213230
Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column
214231
have all already been mapped to column names of the source table and those column names are the values passed
215232
into this function.
233+
234+
Args:
235+
config: Repo configuration object
236+
data_source: Data source to pull all of the columns from
237+
join_key_columns: Columns of the join keys
238+
feature_name_columns: Columns of the feature names needed
239+
event_timestamp_column: Timestamp column
240+
start_date: Starting date of query
241+
end_date: Ending date of query
216242
"""
217243
pass

0 commit comments

Comments
 (0)