@@ -50,15 +50,15 @@ class KafkaOptions:
5050
5151 def __init__ (
5252 self ,
53- bootstrap_servers : str ,
53+ kafka_bootstrap_servers : str ,
5454 message_format : StreamFormat ,
5555 topic : str ,
56- watermark : Optional [timedelta ] = None ,
56+ watermark_delay_threshold : Optional [timedelta ] = None ,
5757 ):
58- self .bootstrap_servers = bootstrap_servers
58+ self .kafka_bootstrap_servers = kafka_bootstrap_servers
5959 self .message_format = message_format
6060 self .topic = topic
61- self .watermark = watermark or None
61+ self .watermark_delay_threshold = watermark_delay_threshold or None
6262
6363 @classmethod
6464 def from_proto (cls , kafka_options_proto : DataSourceProto .KafkaOptions ):
@@ -71,18 +71,18 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
7171 Returns:
7272 Returns a BigQueryOptions object based on the kafka_options protobuf
7373 """
74- watermark = None
75- if kafka_options_proto .HasField ("watermark" ):
76- watermark = (
74+ watermark_delay_threshold = None
75+ if kafka_options_proto .HasField ("watermark_delay_threshold" ):
76+ watermark_delay_threshold = (
7777 timedelta (days = 0 )
78- if kafka_options_proto .watermark .ToNanoseconds () == 0
79- else kafka_options_proto .watermark .ToTimedelta ()
78+ if kafka_options_proto .watermark_delay_threshold .ToNanoseconds () == 0
79+ else kafka_options_proto .watermark_delay_threshold .ToTimedelta ()
8080 )
8181 kafka_options = cls (
82- bootstrap_servers = kafka_options_proto .bootstrap_servers ,
82+ kafka_bootstrap_servers = kafka_options_proto .kafka_bootstrap_servers ,
8383 message_format = StreamFormat .from_proto (kafka_options_proto .message_format ),
8484 topic = kafka_options_proto .topic ,
85- watermark = watermark ,
85+ watermark_delay_threshold = watermark_delay_threshold ,
8686 )
8787
8888 return kafka_options
@@ -94,16 +94,16 @@ def to_proto(self) -> DataSourceProto.KafkaOptions:
9494 Returns:
9595 KafkaOptionsProto protobuf
9696 """
97- watermark_duration = None
98- if self .watermark is not None :
99- watermark_duration = Duration ()
100- watermark_duration .FromTimedelta (self .watermark )
97+ watermark_delay_threshold = None
98+ if self .watermark_delay_threshold is not None :
99+ watermark_delay_threshold = Duration ()
100+ watermark_delay_threshold .FromTimedelta (self .watermark_delay_threshold )
101101
102102 kafka_options_proto = DataSourceProto .KafkaOptions (
103- bootstrap_servers = self .bootstrap_servers ,
103+ kafka_bootstrap_servers = self .kafka_bootstrap_servers ,
104104 message_format = self .message_format .to_proto (),
105105 topic = self .topic ,
106- watermark = watermark_duration ,
106+ watermark_delay_threshold = watermark_delay_threshold ,
107107 )
108108
109109 return kafka_options_proto
@@ -178,8 +178,8 @@ class DataSource(ABC):
178178
179179 Args:
180180 name: Name of data source, which should be unique within a project
181- event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in time
182- joins of feature values.
181+ event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
182+ timestamp column used for point in time joins of feature values.
183183 created_timestamp_column (optional): Timestamp column indicating when the row
184184 was created, used for deduplicating rows.
185185 field_mapping (optional): A dictionary mapping of column names in this data
@@ -220,8 +220,8 @@ def __init__(
220220 Creates a DataSource object.
221221 Args:
222222 name: Name of data source, which should be unique within a project
223- event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in time
224- joins of feature values.
223+ event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
224+ timestamp column used for point in time joins of feature values.
225225 created_timestamp_column (optional): Timestamp column indicating when the row
226226 was created, used for deduplicating rows.
227227 field_mapping (optional): A dictionary mapping of column names in this data
@@ -260,6 +260,14 @@ def __init__(
260260 self .date_partition_column = (
261261 date_partition_column if date_partition_column else ""
262262 )
263+ if date_partition_column :
264+ warnings .warn (
265+ (
266+ "The argument 'date_partition_column' is being deprecated. "
267+ "Feast 0.25 and onwards will not support 'date_partition_column' for data sources."
268+ ),
269+ DeprecationWarning ,
270+ )
263271 self .description = description or ""
264272 self .tags = tags or {}
265273 self .owner = owner or ""
@@ -364,20 +372,13 @@ def get_table_query_string(self) -> str:
364372
365373
366374class KafkaSource (DataSource ):
367- def validate (self , config : RepoConfig ):
368- pass
369-
370- def get_table_column_names_and_types (
371- self , config : RepoConfig
372- ) -> Iterable [Tuple [str , str ]]:
373- pass
374-
375375 def __init__ (
376376 self ,
377377 * args ,
378378 name : Optional [str ] = None ,
379379 event_timestamp_column : Optional [str ] = "" ,
380380 bootstrap_servers : Optional [str ] = None ,
381+ kafka_bootstrap_servers : Optional [str ] = None ,
381382 message_format : Optional [StreamFormat ] = None ,
382383 topic : Optional [str ] = None ,
383384 created_timestamp_column : Optional [str ] = "" ,
@@ -388,31 +389,34 @@ def __init__(
388389 owner : Optional [str ] = "" ,
389390 timestamp_field : Optional [str ] = "" ,
390391 batch_source : Optional [DataSource ] = None ,
391- watermark : Optional [timedelta ] = None ,
392+ watermark_delay_threshold : Optional [timedelta ] = None ,
392393 ):
393394 """
394- Creates a KafkaSource stream source object.
395+ Creates a KafkaSource object.
396+
395397 Args:
396- name: str. Name of data source, which should be unique within a project
397- event_timestamp_column (optional): str. (Deprecated) Event timestamp column used for point in time
398- joins of feature values.
399- bootstrap_servers: str. The servers of the kafka broker in the form "localhost:9092".
400- message_format: StreamFormat. StreamFormat of serialized messages.
401- topic: str. The name of the topic to read from in the kafka source.
402- created_timestamp_column (optional): str. Timestamp column indicating when the row
398+ name: Name of data source, which should be unique within a project
399+ event_timestamp_column (optional): (Deprecated in favor of timestamp_field) Event
400+ timestamp column used for point in time joins of feature values.
401+ bootstrap_servers: (Deprecated) The servers of the kafka broker in the form "localhost:9092".
402+ kafka_bootstrap_servers: The servers of the kafka broker in the form "localhost:9092".
403+ message_format: StreamFormat of serialized messages.
404+ topic: The name of the topic to read from in the kafka source.
405+ created_timestamp_column (optional): Timestamp column indicating when the row
403406 was created, used for deduplicating rows.
404- field_mapping (optional): dict(str, str). A dictionary mapping of column names in this data
407+ field_mapping (optional): A dictionary mapping of column names in this data
405408 source to feature names in a feature table or view. Only used for feature
406409 columns, not entity or timestamp columns.
407- date_partition_column (optional): str. Timestamp column used for partitioning.
408- description (optional): str. A human-readable description.
409- tags (optional): dict(str, str). A dictionary of key-value pairs to store arbitrary metadata.
410- owner (optional): str. The owner of the data source, typically the email of the primary
410+ date_partition_column (optional): Timestamp column used for partitioning.
411+ description (optional): A human-readable description.
412+ tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
413+ owner (optional): The owner of the data source, typically the email of the primary
411414 maintainer.
412- timestamp_field (optional): str. Event timestamp field used for point
415+ timestamp_field (optional): Event timestamp field used for point
413416 in time joins of feature values.
414- batch_source: DataSource. The datasource that acts as a batch source.
415- watermark: timedelta. The watermark for stream data. Specifically how late stream data can arrive without being discarded.
417+ batch_source: The datasource that acts as a batch source.
418+ watermark_delay_threshold: The watermark delay threshold for stream data. Specifically how
419+ late stream data can arrive without being discarded.
416420 """
417421 positional_attributes = [
418422 "name" ,
@@ -423,10 +427,19 @@ def __init__(
423427 ]
424428 _name = name
425429 _event_timestamp_column = event_timestamp_column
426- _bootstrap_servers = bootstrap_servers or ""
430+ _kafka_bootstrap_servers = kafka_bootstrap_servers or bootstrap_servers or ""
427431 _message_format = message_format
428432 _topic = topic or ""
429433
434+ if bootstrap_servers :
435+ warnings .warn (
436+ (
437+ "The 'bootstrap_servers' parameter has been deprecated in favor of 'kafka_bootstrap_servers'. "
438+ "Feast 0.25 and onwards will not support the 'bootstrap_servers' parameter."
439+ ),
440+ DeprecationWarning ,
441+ )
442+
430443 if args :
431444 warnings .warn (
432445 (
@@ -445,7 +458,7 @@ def __init__(
445458 if len (args ) >= 2 :
446459 _event_timestamp_column = args [1 ]
447460 if len (args ) >= 3 :
448- _bootstrap_servers = args [2 ]
461+ _kafka_bootstrap_servers = args [2 ]
449462 if len (args ) >= 4 :
450463 _message_format = args [3 ]
451464 if len (args ) >= 5 :
@@ -471,10 +484,10 @@ def __init__(
471484 self .batch_source = batch_source
472485
473486 self .kafka_options = KafkaOptions (
474- bootstrap_servers = _bootstrap_servers ,
487+ kafka_bootstrap_servers = _kafka_bootstrap_servers ,
475488 message_format = _message_format ,
476489 topic = _topic ,
477- watermark = watermark ,
490+ watermark_delay_threshold = watermark_delay_threshold ,
478491 )
479492
480493 def __eq__ (self , other ):
@@ -487,11 +500,12 @@ def __eq__(self, other):
487500 return False
488501
489502 if (
490- self .kafka_options .bootstrap_servers
491- != other .kafka_options .bootstrap_servers
503+ self .kafka_options .kafka_bootstrap_servers
504+ != other .kafka_options .kafka_bootstrap_servers
492505 or self .kafka_options .message_format != other .kafka_options .message_format
493506 or self .kafka_options .topic != other .kafka_options .topic
494- or self .kafka_options .watermark != other .kafka_options .watermark
507+ or self .kafka_options .watermark_delay_threshold
508+ != other .kafka_options .watermark_delay_threshold
495509 ):
496510 return False
497511
@@ -502,22 +516,23 @@ def __hash__(self):
502516
503517 @staticmethod
504518 def from_proto (data_source : DataSourceProto ):
505- watermark = None
506- if data_source .kafka_options .watermark :
507- watermark = (
519+ watermark_delay_threshold = None
520+ if data_source .kafka_options .watermark_delay_threshold :
521+ watermark_delay_threshold = (
508522 timedelta (days = 0 )
509- if data_source .kafka_options .watermark .ToNanoseconds () == 0
510- else data_source .kafka_options .watermark .ToTimedelta ()
523+ if data_source .kafka_options .watermark_delay_threshold .ToNanoseconds ()
524+ == 0
525+ else data_source .kafka_options .watermark_delay_threshold .ToTimedelta ()
511526 )
512527 return KafkaSource (
513528 name = data_source .name ,
514529 event_timestamp_column = data_source .timestamp_field ,
515530 field_mapping = dict (data_source .field_mapping ),
516- bootstrap_servers = data_source .kafka_options .bootstrap_servers ,
531+ kafka_bootstrap_servers = data_source .kafka_options .kafka_bootstrap_servers ,
517532 message_format = StreamFormat .from_proto (
518533 data_source .kafka_options .message_format
519534 ),
520- watermark = watermark ,
535+ watermark_delay_threshold = watermark_delay_threshold ,
521536 topic = data_source .kafka_options .topic ,
522537 created_timestamp_column = data_source .created_timestamp_column ,
523538 timestamp_field = data_source .timestamp_field ,
@@ -548,6 +563,14 @@ def to_proto(self) -> DataSourceProto:
548563 data_source_proto .batch_source .MergeFrom (self .batch_source .to_proto ())
549564 return data_source_proto
550565
566+ def validate (self , config : RepoConfig ):
567+ pass
568+
569+ def get_table_column_names_and_types (
570+ self , config : RepoConfig
571+ ) -> Iterable [Tuple [str , str ]]:
572+ pass
573+
551574 @staticmethod
552575 def source_datatype_to_feast_value_type () -> Callable [[str ], ValueType ]:
553576 return type_map .redshift_to_feast_value_type
0 commit comments