Skip to content

Commit a62a90d

Browse files
chore: Rename stream data source parameters (feast-dev#2804)
* Deprecate `bootstrap_servers` parameter in KafkaSource
* Rename `watermark` to `watermark_delay_threshold` for KafkaSource
* Deprecate `date_partition_column` for all data sources
* Fix Java
* Fix SparkKafkaProcessor
* Clarify comment
* More clarifications

Signed-off-by: Felix Wang <wangfelix98@gmail.com>
1 parent 19aedc2 commit a62a90d

File tree

12 files changed

+111
-85
lines changed

12 files changed

+111
-85
lines changed

java/serving/src/test/java/feast/serving/util/DataGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public static DataSource createKafkaDataSourceSpec(
226226
.setKafkaOptions(
227227
KafkaOptions.newBuilder()
228228
.setTopic(topic)
229-
.setBootstrapServers(servers)
229+
.setKafkaBootstrapServers(servers)
230230
.setMessageFormat(createProtoFormat("class.path"))
231231
.build())
232232
.setTimestampField(timestampColumn)

protos/feast/core/DataSource.proto

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,16 @@ message DataSource {
128128
// Java Protobuf class at the given class path
129129
message KafkaOptions {
130130
// Comma separated list of Kafka bootstrap servers. Used for feature tables without a defined source host[:port]]
131-
string bootstrap_servers = 1;
131+
string kafka_bootstrap_servers = 1;
132132

133133
// Kafka topic to collect feature data from.
134134
string topic = 2;
135135

136136
// Defines the stream data format encoding feature/entity data in Kafka messages.
137137
StreamFormat message_format = 3;
138138

139-
google.protobuf.Duration watermark = 4;
139+
// Watermark delay threshold for stream data
140+
google.protobuf.Duration watermark_delay_threshold = 4;
140141
}
141142

142143
// Defines options for DataSource that sources features from Kinesis records.

sdk/python/feast/data_source.py

Lines changed: 83 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,15 @@ class KafkaOptions:
5050

5151
def __init__(
5252
self,
53-
bootstrap_servers: str,
53+
kafka_bootstrap_servers: str,
5454
message_format: StreamFormat,
5555
topic: str,
56-
watermark: Optional[timedelta] = None,
56+
watermark_delay_threshold: Optional[timedelta] = None,
5757
):
58-
self.bootstrap_servers = bootstrap_servers
58+
self.kafka_bootstrap_servers = kafka_bootstrap_servers
5959
self.message_format = message_format
6060
self.topic = topic
61-
self.watermark = watermark or None
61+
self.watermark_delay_threshold = watermark_delay_threshold or None
6262

6363
@classmethod
6464
def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
@@ -71,18 +71,18 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
7171
Returns:
7272
Returns a BigQueryOptions object based on the kafka_options protobuf
7373
"""
74-
watermark = None
75-
if kafka_options_proto.HasField("watermark"):
76-
watermark = (
74+
watermark_delay_threshold = None
75+
if kafka_options_proto.HasField("watermark_delay_threshold"):
76+
watermark_delay_threshold = (
7777
timedelta(days=0)
78-
if kafka_options_proto.watermark.ToNanoseconds() == 0
79-
else kafka_options_proto.watermark.ToTimedelta()
78+
if kafka_options_proto.watermark_delay_threshold.ToNanoseconds() == 0
79+
else kafka_options_proto.watermark_delay_threshold.ToTimedelta()
8080
)
8181
kafka_options = cls(
82-
bootstrap_servers=kafka_options_proto.bootstrap_servers,
82+
kafka_bootstrap_servers=kafka_options_proto.kafka_bootstrap_servers,
8383
message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
8484
topic=kafka_options_proto.topic,
85-
watermark=watermark,
85+
watermark_delay_threshold=watermark_delay_threshold,
8686
)
8787

8888
return kafka_options
@@ -94,16 +94,16 @@ def to_proto(self) -> DataSourceProto.KafkaOptions:
9494
Returns:
9595
KafkaOptionsProto protobuf
9696
"""
97-
watermark_duration = None
98-
if self.watermark is not None:
99-
watermark_duration = Duration()
100-
watermark_duration.FromTimedelta(self.watermark)
97+
watermark_delay_threshold = None
98+
if self.watermark_delay_threshold is not None:
99+
watermark_delay_threshold = Duration()
100+
watermark_delay_threshold.FromTimedelta(self.watermark_delay_threshold)
101101

102102
kafka_options_proto = DataSourceProto.KafkaOptions(
103-
bootstrap_servers=self.bootstrap_servers,
103+
kafka_bootstrap_servers=self.kafka_bootstrap_servers,
104104
message_format=self.message_format.to_proto(),
105105
topic=self.topic,
106-
watermark=watermark_duration,
106+
watermark_delay_threshold=watermark_delay_threshold,
107107
)
108108

109109
return kafka_options_proto
@@ -178,8 +178,8 @@ class DataSource(ABC):
178178
179179
Args:
180180
name: Name of data source, which should be unique within a project
181-
event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in time
182-
joins of feature values.
181+
event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
182+
timestamp column used for point in time joins of feature values.
183183
created_timestamp_column (optional): Timestamp column indicating when the row
184184
was created, used for deduplicating rows.
185185
field_mapping (optional): A dictionary mapping of column names in this data
@@ -220,8 +220,8 @@ def __init__(
220220
Creates a DataSource object.
221221
Args:
222222
name: Name of data source, which should be unique within a project
223-
event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in time
224-
joins of feature values.
223+
event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
224+
timestamp column used for point in time joins of feature values.
225225
created_timestamp_column (optional): Timestamp column indicating when the row
226226
was created, used for deduplicating rows.
227227
field_mapping (optional): A dictionary mapping of column names in this data
@@ -260,6 +260,14 @@ def __init__(
260260
self.date_partition_column = (
261261
date_partition_column if date_partition_column else ""
262262
)
263+
if date_partition_column:
264+
warnings.warn(
265+
(
266+
"The argument 'date_partition_column' is being deprecated. "
267+
"Feast 0.25 and onwards will not support 'date_timestamp_column' for data sources."
268+
),
269+
DeprecationWarning,
270+
)
263271
self.description = description or ""
264272
self.tags = tags or {}
265273
self.owner = owner or ""
@@ -364,20 +372,13 @@ def get_table_query_string(self) -> str:
364372

365373

366374
class KafkaSource(DataSource):
367-
def validate(self, config: RepoConfig):
368-
pass
369-
370-
def get_table_column_names_and_types(
371-
self, config: RepoConfig
372-
) -> Iterable[Tuple[str, str]]:
373-
pass
374-
375375
def __init__(
376376
self,
377377
*args,
378378
name: Optional[str] = None,
379379
event_timestamp_column: Optional[str] = "",
380380
bootstrap_servers: Optional[str] = None,
381+
kafka_bootstrap_servers: Optional[str] = None,
381382
message_format: Optional[StreamFormat] = None,
382383
topic: Optional[str] = None,
383384
created_timestamp_column: Optional[str] = "",
@@ -388,31 +389,34 @@ def __init__(
388389
owner: Optional[str] = "",
389390
timestamp_field: Optional[str] = "",
390391
batch_source: Optional[DataSource] = None,
391-
watermark: Optional[timedelta] = None,
392+
watermark_delay_threshold: Optional[timedelta] = None,
392393
):
393394
"""
394-
Creates a KafkaSource stream source object.
395+
Creates a KafkaSource object.
396+
395397
Args:
396-
name: str. Name of data source, which should be unique within a project
397-
event_timestamp_column (optional): str. (Deprecated) Event timestamp column used for point in time
398-
joins of feature values.
399-
bootstrap_servers: str. The servers of the kafka broker in the form "localhost:9092".
400-
message_format: StreamFormat. StreamFormat of serialized messages.
401-
topic: str. The name of the topic to read from in the kafka source.
402-
created_timestamp_column (optional): str. Timestamp column indicating when the row
398+
name: Name of data source, which should be unique within a project
399+
event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
400+
timestamp column used for point in time joins of feature values.
401+
bootstrap_servers: (Deprecated) The servers of the kafka broker in the form "localhost:9092".
402+
kafka_bootstrap_servers: The servers of the kafka broker in the form "localhost:9092".
403+
message_format: StreamFormat of serialized messages.
404+
topic: The name of the topic to read from in the kafka source.
405+
created_timestamp_column (optional): Timestamp column indicating when the row
403406
was created, used for deduplicating rows.
404-
field_mapping (optional): dict(str, str). A dictionary mapping of column names in this data
407+
field_mapping (optional): A dictionary mapping of column names in this data
405408
source to feature names in a feature table or view. Only used for feature
406409
columns, not entity or timestamp columns.
407-
date_partition_column (optional): str. Timestamp column used for partitioning.
408-
description (optional): str. A human-readable description.
409-
tags (optional): dict(str, str). A dictionary of key-value pairs to store arbitrary metadata.
410-
owner (optional): str. The owner of the data source, typically the email of the primary
410+
date_partition_column (optional): Timestamp column used for partitioning.
411+
description (optional): A human-readable description.
412+
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
413+
owner (optional): The owner of the data source, typically the email of the primary
411414
maintainer.
412-
timestamp_field (optional): str. Event timestamp field used for point
415+
timestamp_field (optional): Event timestamp field used for point
413416
in time joins of feature values.
414-
batch_source: DataSource. The datasource that acts as a batch source.
415-
watermark: timedelta. The watermark for stream data. Specifically how late stream data can arrive without being discarded.
417+
batch_source: The datasource that acts as a batch source.
418+
watermark_delay_threshold: The watermark delay threshold for stream data. Specifically how
419+
late stream data can arrive without being discarded.
416420
"""
417421
positional_attributes = [
418422
"name",
@@ -423,10 +427,19 @@ def __init__(
423427
]
424428
_name = name
425429
_event_timestamp_column = event_timestamp_column
426-
_bootstrap_servers = bootstrap_servers or ""
430+
_kafka_bootstrap_servers = kafka_bootstrap_servers or bootstrap_servers or ""
427431
_message_format = message_format
428432
_topic = topic or ""
429433

434+
if bootstrap_servers:
435+
warnings.warn(
436+
(
437+
"The 'bootstrap_servers' parameter has been deprecated in favor of 'kafka_bootstrap_servers'. "
438+
"Feast 0.25 and onwards will not support the 'bootstrap_servers' parameter."
439+
),
440+
DeprecationWarning,
441+
)
442+
430443
if args:
431444
warnings.warn(
432445
(
@@ -445,7 +458,7 @@ def __init__(
445458
if len(args) >= 2:
446459
_event_timestamp_column = args[1]
447460
if len(args) >= 3:
448-
_bootstrap_servers = args[2]
461+
_kafka_bootstrap_servers = args[2]
449462
if len(args) >= 4:
450463
_message_format = args[3]
451464
if len(args) >= 5:
@@ -471,10 +484,10 @@ def __init__(
471484
self.batch_source = batch_source
472485

473486
self.kafka_options = KafkaOptions(
474-
bootstrap_servers=_bootstrap_servers,
487+
kafka_bootstrap_servers=_kafka_bootstrap_servers,
475488
message_format=_message_format,
476489
topic=_topic,
477-
watermark=watermark,
490+
watermark_delay_threshold=watermark_delay_threshold,
478491
)
479492

480493
def __eq__(self, other):
@@ -487,11 +500,12 @@ def __eq__(self, other):
487500
return False
488501

489502
if (
490-
self.kafka_options.bootstrap_servers
491-
!= other.kafka_options.bootstrap_servers
503+
self.kafka_options.kafka_bootstrap_servers
504+
!= other.kafka_options.kafka_bootstrap_servers
492505
or self.kafka_options.message_format != other.kafka_options.message_format
493506
or self.kafka_options.topic != other.kafka_options.topic
494-
or self.kafka_options.watermark != other.kafka_options.watermark
507+
or self.kafka_options.watermark_delay_threshold
508+
!= other.kafka_options.watermark_delay_threshold
495509
):
496510
return False
497511

@@ -502,22 +516,23 @@ def __hash__(self):
502516

503517
@staticmethod
504518
def from_proto(data_source: DataSourceProto):
505-
watermark = None
506-
if data_source.kafka_options.watermark:
507-
watermark = (
519+
watermark_delay_threshold = None
520+
if data_source.kafka_options.watermark_delay_threshold:
521+
watermark_delay_threshold = (
508522
timedelta(days=0)
509-
if data_source.kafka_options.watermark.ToNanoseconds() == 0
510-
else data_source.kafka_options.watermark.ToTimedelta()
523+
if data_source.kafka_options.watermark_delay_threshold.ToNanoseconds()
524+
== 0
525+
else data_source.kafka_options.watermark_delay_threshold.ToTimedelta()
511526
)
512527
return KafkaSource(
513528
name=data_source.name,
514529
event_timestamp_column=data_source.timestamp_field,
515530
field_mapping=dict(data_source.field_mapping),
516-
bootstrap_servers=data_source.kafka_options.bootstrap_servers,
531+
kafka_bootstrap_servers=data_source.kafka_options.kafka_bootstrap_servers,
517532
message_format=StreamFormat.from_proto(
518533
data_source.kafka_options.message_format
519534
),
520-
watermark=watermark,
535+
watermark_delay_threshold=watermark_delay_threshold,
521536
topic=data_source.kafka_options.topic,
522537
created_timestamp_column=data_source.created_timestamp_column,
523538
timestamp_field=data_source.timestamp_field,
@@ -548,6 +563,14 @@ def to_proto(self) -> DataSourceProto:
548563
data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto())
549564
return data_source_proto
550565

566+
def validate(self, config: RepoConfig):
567+
pass
568+
569+
def get_table_column_names_and_types(
570+
self, config: RepoConfig
571+
) -> Iterable[Tuple[str, str]]:
572+
pass
573+
551574
@staticmethod
552575
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
553576
return type_map.redshift_to_feast_value_type

sdk/python/feast/infra/contrib/spark_kafka_processor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def _ingest_stream_data(self) -> StreamTable:
7777
self.spark.readStream.format("kafka")
7878
.option(
7979
"kafka.bootstrap.servers",
80-
self.data_source.kafka_options.bootstrap_servers,
80+
self.data_source.kafka_options.kafka_bootstrap_servers,
8181
)
8282
.option("subscribe", self.data_source.kafka_options.topic)
8383
.option("startingOffsets", "latest") # Query start
@@ -100,7 +100,7 @@ def _ingest_stream_data(self) -> StreamTable:
100100
self.spark.readStream.format("kafka")
101101
.option(
102102
"kafka.bootstrap.servers",
103-
self.data_source.kafka_options.bootstrap_servers,
103+
self.data_source.kafka_options.kafka_bootstrap_servers,
104104
)
105105
.option("subscribe", self.data_source.kafka_options.topic)
106106
.option("startingOffsets", "latest") # Query start

sdk/python/feast/infra/offline_stores/bigquery_source.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ def __init__(
3737
3838
Args:
3939
table (optional): The BigQuery table where features can be found.
40-
event_timestamp_column: (Deprecated) Event timestamp column used for point in time joins of feature values.
40+
event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
41+
timestamp column used for point in time joins of feature values.
4142
created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
4243
field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table
4344
or view. Only used for feature columns, not entities or timestamp columns.

sdk/python/feast/infra/offline_stores/file_source.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ def __init__(
4444
4545
path: File path to file containing feature data. Must contain an event_timestamp column, entity columns and
4646
feature columns.
47-
event_timestamp_column(optional): (Deprecated) Event timestamp column used for point in time joins of feature values.
47+
event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
48+
timestamp column used for point in time joins of feature values.
4849
created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
4950
file_format (optional): Explicitly set the file format. Allows Feast to bypass inferring the file format.
5051
field_mapping: A dictionary mapping of column names in this data source to feature names in a feature table

sdk/python/feast/infra/offline_stores/redshift_source.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ def __init__(
3939
Creates a RedshiftSource object.
4040
4141
Args:
42-
event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in
43-
time joins of feature values.
42+
event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
43+
timestamp column used for point in time joins of feature values.
4444
table (optional): Redshift table where the features are stored.
4545
schema (optional): Redshift schema in which the table is located.
4646
created_timestamp_column (optional): Timestamp column indicating when the

sdk/python/feast/infra/offline_stores/snowflake_source.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ def __init__(
4343
warehouse (optional): Snowflake warehouse where the database is stored.
4444
schema (optional): Snowflake schema in which the table is located.
4545
table (optional): Snowflake table where the features are stored.
46-
event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in
47-
time joins of feature values.
46+
event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
47+
timestamp column used for point in time joins of feature values.
4848
query (optional): The query to be executed to obtain the features.
4949
created_timestamp_column (optional): Timestamp column indicating when the
5050
row was created, used for deduplicating rows.

sdk/python/tests/integration/registration/test_registry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,11 +315,11 @@ def simple_udf(x: int):
315315
stream_source = KafkaSource(
316316
name="kafka",
317317
timestamp_field="event_timestamp",
318-
bootstrap_servers="",
318+
kafka_bootstrap_servers="",
319319
message_format=AvroFormat(""),
320320
topic="topic",
321321
batch_source=FileSource(path="some path"),
322-
watermark=timedelta(days=1),
322+
watermark_delay_threshold=timedelta(days=1),
323323
)
324324

325325
sfv = StreamFeatureView(

0 commit comments

Comments (0)