Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add tests
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 committed Aug 12, 2022
commit e733c89eda7ac70504cf6de1382d3908074445a1
20 changes: 19 additions & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@
from feast import feature_server, flags_helper, ui_server, utils
from feast.base_feature_view import BaseFeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import DataSource, PushMode
from feast.data_source import (
DataSource,
KafkaSource,
KinesisSource,
PushMode,
PushSource,
)
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between
from feast.dqm.errors import ValidationFailed
Expand Down Expand Up @@ -827,6 +833,18 @@ def apply(
ob for ob in objects if isinstance(ob, ValidationReference)
]

batch_sources_to_add: List[DataSource] = []
for data_source in data_sources_set_to_update:
if (
isinstance(data_source, PushSource)
or isinstance(data_source, KafkaSource)
or isinstance(data_source, KinesisSource)
):
assert data_source.batch_source
batch_sources_to_add.append(data_source.batch_source)
for batch_source in batch_sources_to_add:
data_sources_set_to_update.add(batch_source)

for fv in itertools.chain(views_to_update, sfvs_to_update):
data_sources_set_to_update.add(fv.batch_source)
if fv.stream_source:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int32, Int64

driver = Entity(
name="driver_id",
description="driver id",
)

driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
Field(name="driver_id", dtype=Int32),
],
online=True,
source=FileSource(
path="data/driver_stats.parquet", # Fake path
timestamp_field="event_timestamp",
created_timestamp_column="created",
),
tags={},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource, KafkaSource
from feast.data_format import AvroFormat
from feast.types import Float32, Int32, Int64

driver = Entity(
name="driver_id",
description="driver id",
)

driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
Field(name="driver_id", dtype=Int32),
],
online=True,
source=KafkaSource(
name="kafka",
timestamp_field="event_timestamp",
kafka_bootstrap_servers="",
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(
path="data/driver_stats.parquet", # Fake path
timestamp_field="event_timestamp",
created_timestamp_column="created",
),
watermark_delay_threshold=timedelta(days=1),
),
tags={},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from datetime import timedelta

from feast import FileSource, KafkaSource
from feast.data_format import AvroFormat

stream_source = KafkaSource(
name="kafka",
timestamp_field="event_timestamp",
kafka_bootstrap_servers="",
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(
path="data/driver_stats.parquet", # Fake path
timestamp_field="event_timestamp",
created_timestamp_column="created",
),
watermark_delay_threshold=timedelta(days=1),
)
Loading