@@ -139,6 +139,7 @@ class DataSource(ABC):
139139 DataSource that can be used to source features.
140140
141141 Args:
142+ name: Name of data source, which should be unique within a project
142143 event_timestamp_column (optional): Event timestamp column used for point in time
143144 joins of feature values.
144145 created_timestamp_column (optional): Timestamp column indicating when the row
@@ -149,19 +150,22 @@ class DataSource(ABC):
149150 date_partition_column (optional): Timestamp column used for partitioning.
150151 """
151152
153+ name : str
152154 event_timestamp_column : str
153155 created_timestamp_column : str
154156 field_mapping : Dict [str , str ]
155157 date_partition_column : str
156158
157159 def __init__ (
158160 self ,
161+ name : str ,
159162 event_timestamp_column : Optional [str ] = None ,
160163 created_timestamp_column : Optional [str ] = None ,
161164 field_mapping : Optional [Dict [str , str ]] = None ,
162165 date_partition_column : Optional [str ] = None ,
163166 ):
164167 """Creates a DataSource object."""
168+ self .name = name
165169 self .event_timestamp_column = (
166170 event_timestamp_column if event_timestamp_column else ""
167171 )
@@ -173,12 +177,16 @@ def __init__(
173177 date_partition_column if date_partition_column else ""
174178 )
175179
180+ def __hash__ (self ):
181+ return hash ((id (self ), self .name ))
182+
176183 def __eq__ (self , other ):
177184 if not isinstance (other , DataSource ):
178185 raise TypeError ("Comparisons should only involve DataSource class objects." )
179186
180187 if (
181- self .event_timestamp_column != other .event_timestamp_column
188+ self .name != other .name
189+ or self .event_timestamp_column != other .event_timestamp_column
182190 or self .created_timestamp_column != other .created_timestamp_column
183191 or self .field_mapping != other .field_mapping
184192 or self .date_partition_column != other .date_partition_column
@@ -206,7 +214,9 @@ def from_proto(data_source: DataSourceProto) -> Any:
206214 cls = get_data_source_class_from_type (data_source .data_source_class_type )
207215 return cls .from_proto (data_source )
208216
209- if data_source .file_options .file_format and data_source .file_options .file_url :
217+ if data_source .request_data_options and data_source .request_data_options .schema :
218+ data_source_obj = RequestDataSource .from_proto (data_source )
219+ elif data_source .file_options .file_format and data_source .file_options .file_url :
210220 from feast .infra .offline_stores .file_source import FileSource
211221
212222 data_source_obj = FileSource .from_proto (data_source )
@@ -246,7 +256,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
246256 @abstractmethod
247257 def to_proto (self ) -> DataSourceProto :
248258 """
249- Converts an DataSourceProto object to its protobuf representation.
259+ Converts a DataSourceProto object to its protobuf representation.
250260 """
251261 raise NotImplementedError
252262
@@ -296,6 +306,7 @@ def get_table_column_names_and_types(
296306
297307 def __init__ (
298308 self ,
309+ name : str ,
299310 event_timestamp_column : str ,
300311 bootstrap_servers : str ,
301312 message_format : StreamFormat ,
@@ -305,6 +316,7 @@ def __init__(
305316 date_partition_column : Optional [str ] = "" ,
306317 ):
307318 super ().__init__ (
319+ name ,
308320 event_timestamp_column ,
309321 created_timestamp_column ,
310322 field_mapping ,
@@ -335,6 +347,7 @@ def __eq__(self, other):
335347 @staticmethod
336348 def from_proto (data_source : DataSourceProto ):
337349 return KafkaSource (
350+ name = data_source .name ,
338351 field_mapping = dict (data_source .field_mapping ),
339352 bootstrap_servers = data_source .kafka_options .bootstrap_servers ,
340353 message_format = StreamFormat .from_proto (
@@ -348,6 +361,7 @@ def from_proto(data_source: DataSourceProto):
348361
349362 def to_proto (self ) -> DataSourceProto :
350363 data_source_proto = DataSourceProto (
364+ name = self .name ,
351365 type = DataSourceProto .STREAM_KAFKA ,
352366 field_mapping = self .field_mapping ,
353367 kafka_options = self .kafka_options .to_proto (),
@@ -363,6 +377,9 @@ def to_proto(self) -> DataSourceProto:
363377 def source_datatype_to_feast_value_type () -> Callable [[str ], ValueType ]:
364378 return type_map .redshift_to_feast_value_type
365379
380+ def get_table_query_string (self ) -> str :
381+ raise NotImplementedError
382+
366383
367384class RequestDataSource (DataSource ):
368385 """
@@ -373,19 +390,14 @@ class RequestDataSource(DataSource):
373390 schema: Schema mapping from the input feature name to a ValueType
374391 """
375392
376- @staticmethod
377- def source_datatype_to_feast_value_type () -> Callable [[str ], ValueType ]:
378- raise NotImplementedError
379-
380393 name : str
381394 schema : Dict [str , ValueType ]
382395
383396 def __init__ (
384397 self , name : str , schema : Dict [str , ValueType ],
385398 ):
386399 """Creates a RequestDataSource object."""
387- super ().__init__ ()
388- self .name = name
400+ super ().__init__ (name )
389401 self .schema = schema
390402
391403 def validate (self , config : RepoConfig ):
@@ -402,21 +414,28 @@ def from_proto(data_source: DataSourceProto):
402414 schema = {}
403415 for key in schema_pb .keys ():
404416 schema [key ] = ValueType (schema_pb .get (key ))
405- return RequestDataSource (
406- name = data_source .request_data_options .name , schema = schema
407- )
417+ return RequestDataSource (name = data_source .name , schema = schema )
408418
409419 def to_proto (self ) -> DataSourceProto :
410420 schema_pb = {}
411421 for key , value in self .schema .items ():
412422 schema_pb [key ] = value .value
413- options = DataSourceProto .RequestDataOptions (name = self . name , schema = schema_pb )
423+ options = DataSourceProto .RequestDataOptions (schema = schema_pb )
414424 data_source_proto = DataSourceProto (
415- type = DataSourceProto .REQUEST_SOURCE , request_data_options = options
425+ name = self .name ,
426+ type = DataSourceProto .REQUEST_SOURCE ,
427+ request_data_options = options ,
416428 )
417429
418430 return data_source_proto
419431
432+ def get_table_query_string (self ) -> str :
433+ raise NotImplementedError
434+
435+ @staticmethod
436+ def source_datatype_to_feast_value_type () -> Callable [[str ], ValueType ]:
437+ raise NotImplementedError
438+
420439
421440class KinesisSource (DataSource ):
422441 def validate (self , config : RepoConfig ):
@@ -430,6 +449,7 @@ def get_table_column_names_and_types(
430449 @staticmethod
431450 def from_proto (data_source : DataSourceProto ):
432451 return KinesisSource (
452+ name = data_source .name ,
433453 field_mapping = dict (data_source .field_mapping ),
434454 record_format = StreamFormat .from_proto (
435455 data_source .kinesis_options .record_format
@@ -445,8 +465,12 @@ def from_proto(data_source: DataSourceProto):
445465 def source_datatype_to_feast_value_type () -> Callable [[str ], ValueType ]:
446466 pass
447467
468+ def get_table_query_string (self ) -> str :
469+ raise NotImplementedError
470+
448471 def __init__ (
449472 self ,
473+ name : str ,
450474 event_timestamp_column : str ,
451475 created_timestamp_column : str ,
452476 record_format : StreamFormat ,
@@ -456,6 +480,7 @@ def __init__(
456480 date_partition_column : Optional [str ] = "" ,
457481 ):
458482 super ().__init__ (
483+ name ,
459484 event_timestamp_column ,
460485 created_timestamp_column ,
461486 field_mapping ,
@@ -475,7 +500,8 @@ def __eq__(self, other):
475500 )
476501
477502 if (
478- self .kinesis_options .record_format != other .kinesis_options .record_format
503+ self .name != other .name
504+ or self .kinesis_options .record_format != other .kinesis_options .record_format
479505 or self .kinesis_options .region != other .kinesis_options .region
480506 or self .kinesis_options .stream_name != other .kinesis_options .stream_name
481507 ):
@@ -485,6 +511,7 @@ def __eq__(self, other):
485511
486512 def to_proto (self ) -> DataSourceProto :
487513 data_source_proto = DataSourceProto (
514+ name = self .name ,
488515 type = DataSourceProto .STREAM_KINESIS ,
489516 field_mapping = self .field_mapping ,
490517 kinesis_options = self .kinesis_options .to_proto (),
0 commit comments