1515import enum
1616import warnings
1717from abc import ABC , abstractmethod
18+ from datetime import timedelta
1819from typing import Any , Callable , Dict , Iterable , List , Optional , Tuple , Union
1920
21+ from google .protobuf .duration_pb2 import Duration
2022from google .protobuf .json_format import MessageToJson
2123
2224from feast import type_map
@@ -47,11 +49,16 @@ class KafkaOptions:
4749 """
4850
4951 def __init__ (
50- self , bootstrap_servers : str , message_format : StreamFormat , topic : str ,
52+ self ,
53+ bootstrap_servers : str ,
54+ message_format : StreamFormat ,
55+ topic : str ,
56+ watermark : Optional [timedelta ] = None ,
5157 ):
5258 self .bootstrap_servers = bootstrap_servers
5359 self .message_format = message_format
5460 self .topic = topic
61+ self .watermark = watermark or None
5562
5663 @classmethod
5764 def from_proto (cls , kafka_options_proto : DataSourceProto .KafkaOptions ):
@@ -64,11 +71,18 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
6471 Returns:
6572 Returns a BigQueryOptions object based on the kafka_options protobuf
6673 """
67-
74+ watermark = None
75+ if kafka_options_proto .HasField ("watermark" ):
76+ watermark = (
77+ timedelta (days = 0 )
78+ if kafka_options_proto .watermark .ToNanoseconds () == 0
79+ else kafka_options_proto .watermark .ToTimedelta ()
80+ )
6881 kafka_options = cls (
6982 bootstrap_servers = kafka_options_proto .bootstrap_servers ,
7083 message_format = StreamFormat .from_proto (kafka_options_proto .message_format ),
7184 topic = kafka_options_proto .topic ,
85+ watermark = watermark ,
7286 )
7387
7488 return kafka_options
@@ -80,11 +94,16 @@ def to_proto(self) -> DataSourceProto.KafkaOptions:
8094 Returns:
8195 KafkaOptionsProto protobuf
8296 """
97+ watermark_duration = None
98+ if self .watermark is not None :
99+ watermark_duration = Duration ()
100+ watermark_duration .FromTimedelta (self .watermark )
83101
84102 kafka_options_proto = DataSourceProto .KafkaOptions (
85103 bootstrap_servers = self .bootstrap_servers ,
86104 message_format = self .message_format .to_proto (),
87105 topic = self .topic ,
106+ watermark = watermark_duration ,
88107 )
89108
90109 return kafka_options_proto
@@ -369,7 +388,32 @@ def __init__(
369388 owner : Optional [str ] = "" ,
370389 timestamp_field : Optional [str ] = "" ,
371390 batch_source : Optional [DataSource ] = None ,
391+ watermark : Optional [timedelta ] = None ,
372392 ):
393+ """
394+ Creates a KafkaSource stream source object.
395+ Args:
396+ name: str. Name of data source, which should be unique within a project
397+ event_timestamp_column (optional): str. (Deprecated) Event timestamp column used for point in time
398+ joins of feature values.
399+ bootstrap_servers: str. The servers of the kafka broker in the form "localhost:9092".
400+ message_format: StreamFormat. StreamFormat of serialized messages.
401+ topic: str. The name of the topic to read from in the kafka source.
402+ created_timestamp_column (optional): str. Timestamp column indicating when the row
403+ was created, used for deduplicating rows.
404+ field_mapping (optional): dict(str, str). A dictionary mapping of column names in this data
405+ source to feature names in a feature table or view. Only used for feature
406+ columns, not entity or timestamp columns.
407+ date_partition_column (optional): str. Timestamp column used for partitioning.
408+ description (optional): str. A human-readable description.
409+ tags (optional): dict(str, str). A dictionary of key-value pairs to store arbitrary metadata.
410+ owner (optional): str. The owner of the data source, typically the email of the primary
411+ maintainer.
412+ timestamp_field (optional): str. Event timestamp field used for point
413+ in time joins of feature values.
414+ batch_source: DataSource. The datasource that acts as a batch source.
415+ watermark: timedelta. The watermark for stream data. Specifically how late stream data can arrive without being discarded.
416+ """
373417 positional_attributes = [
374418 "name" ,
375419 "event_timestamp_column" ,
@@ -425,10 +469,12 @@ def __init__(
425469 timestamp_field = timestamp_field ,
426470 )
427471 self .batch_source = batch_source
472+
428473 self .kafka_options = KafkaOptions (
429474 bootstrap_servers = _bootstrap_servers ,
430475 message_format = _message_format ,
431476 topic = _topic ,
477+ watermark = watermark ,
432478 )
433479
434480 def __eq__ (self , other ):
@@ -445,6 +491,7 @@ def __eq__(self, other):
445491 != other .kafka_options .bootstrap_servers
446492 or self .kafka_options .message_format != other .kafka_options .message_format
447493 or self .kafka_options .topic != other .kafka_options .topic
494+ or self .kafka_options .watermark != other .kafka_options .watermark
448495 ):
449496 return False
450497
@@ -455,6 +502,13 @@ def __hash__(self):
455502
456503 @staticmethod
457504 def from_proto (data_source : DataSourceProto ):
505+ watermark = None
506+ if data_source .kafka_options .HasField ("watermark" ):
507+ watermark = (
508+ timedelta (days = 0 )
509+ if data_source .kafka_options .watermark .ToNanoseconds () == 0
510+ else data_source .kafka_options .watermark .ToTimedelta ()
511+ )
458512 return KafkaSource (
459513 name = data_source .name ,
460514 event_timestamp_column = data_source .timestamp_field ,
@@ -463,6 +517,7 @@ def from_proto(data_source: DataSourceProto):
463517 message_format = StreamFormat .from_proto (
464518 data_source .kafka_options .message_format
465519 ),
520+ watermark = watermark ,
466521 topic = data_source .kafka_options .topic ,
467522 created_timestamp_column = data_source .created_timestamp_column ,
468523 timestamp_field = data_source .timestamp_field ,
0 commit comments