1515import enum
1616import warnings
1717from abc import ABC , abstractmethod
18- from typing import Any , Callable , Dict , Iterable , Optional , Tuple
18+ from typing import Any , Callable , Dict , Iterable , List , Optional , Tuple , Union
1919
2020from google .protobuf .json_format import MessageToJson
2121
2222from feast import type_map
2323from feast .data_format import StreamFormat
24+ from feast .field import Field
2425from feast .protos .feast .core .DataSource_pb2 import DataSource as DataSourceProto
2526from feast .repo_config import RepoConfig , get_data_source_class_from_type
27+ from feast .types import VALUE_TYPES_TO_FEAST_TYPES
2628from feast .value_type import ValueType
2729
2830
@@ -449,27 +451,45 @@ class RequestSource(DataSource):
449451
450452 Args:
451453 name: Name of the request data source
452- schema: Schema mapping from the input feature name to a ValueType
454+ schema Union[Dict[str, ValueType], List[Field]] : Schema mapping from the input feature name to a ValueType
453455 description (optional): A human-readable description.
454456 tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
455457 owner (optional): The owner of the request data source, typically the email of the primary
456458 maintainer.
457459 """
458460
459461 name : str
460- schema : Dict [ str , ValueType ]
462+ schema : List [ Field ]
461463
462464 def __init__ (
463465 self ,
464466 name : str ,
465- schema : Dict [str , ValueType ],
467+ schema : Union [ Dict [str , ValueType ], List [ Field ] ],
466468 description : Optional [str ] = "" ,
467469 tags : Optional [Dict [str , str ]] = None ,
468470 owner : Optional [str ] = "" ,
469471 ):
470472 """Creates a RequestSource object."""
471473 super ().__init__ (name = name , description = description , tags = tags , owner = owner )
472- self .schema = schema
474+ if isinstance (schema , Dict ):
475+ warnings .warn (
476+ "Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. "
477+ "Please use List[Field] instead for the schema" ,
478+ DeprecationWarning ,
479+ )
480+ schemaList = []
481+ for key , valueType in schema .items ():
482+ schemaList .append (
483+ Field (name = key , dtype = VALUE_TYPES_TO_FEAST_TYPES [valueType ])
484+ )
485+ self .schema = schemaList
486+ elif isinstance (schema , List ):
487+ self .schema = schema
488+ else :
489+ raise Exception (
490+ "Schema type must be either dictionary or list, not "
491+ + str (type (schema ))
492+ )
473493
474494 def validate (self , config : RepoConfig ):
475495 pass
@@ -479,33 +499,86 @@ def get_table_column_names_and_types(
479499 ) -> Iterable [Tuple [str , str ]]:
480500 pass
481501
502+ def __eq__ (self , other ):
503+ if not isinstance (other , RequestSource ):
504+ raise TypeError (
505+ "Comparisons should only involve RequestSource class objects."
506+ )
507+ if (
508+ self .name != other .name
509+ or self .description != other .description
510+ or self .owner != other .owner
511+ or self .tags != other .tags
512+ ):
513+ return False
514+ if isinstance (self .schema , List ) and isinstance (other .schema , List ):
515+ for field1 , field2 in zip (self .schema , other .schema ):
516+ if field1 != field2 :
517+ return False
518+ return True
519+ else :
520+ return False
521+
522+ def __hash__ (self ):
523+ return super ().__hash__ ()
524+
482525 @staticmethod
483526 def from_proto (data_source : DataSourceProto ):
527+
528+ deprecated_schema = data_source .request_data_options .deprecated_schema
484529 schema_pb = data_source .request_data_options .schema
485- schema = {}
486- for key , val in schema_pb .items ():
487- schema [key ] = ValueType (val )
488- return RequestSource (
489- name = data_source .name ,
490- schema = schema ,
491- description = data_source .description ,
492- tags = dict (data_source .tags ),
493- owner = data_source .owner ,
494- )
530+
531+ if deprecated_schema and not schema_pb :
532+ warnings .warn (
533+ "Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. "
534+ "Please use List[Field] instead for the schema" ,
535+ DeprecationWarning ,
536+ )
537+ dict_schema = {}
538+ for key , val in deprecated_schema .items ():
539+ dict_schema [key ] = ValueType (val )
540+ return RequestSource (
541+ name = data_source .name ,
542+ schema = dict_schema ,
543+ description = data_source .description ,
544+ tags = dict (data_source .tags ),
545+ owner = data_source .owner ,
546+ )
547+ else :
548+ list_schema = []
549+ for field_proto in schema_pb :
550+ list_schema .append (Field .from_proto (field_proto ))
551+
552+ return RequestSource (
553+ name = data_source .name ,
554+ schema = list_schema ,
555+ description = data_source .description ,
556+ tags = dict (data_source .tags ),
557+ owner = data_source .owner ,
558+ )
495559
496560 def to_proto (self ) -> DataSourceProto :
497- schema_pb = {}
498- for key , value in self .schema .items ():
499- schema_pb [key ] = value .value
500- options = DataSourceProto .RequestDataOptions (schema = schema_pb )
561+
562+ schema_pb = []
563+
564+ if isinstance (self .schema , Dict ):
565+ for key , value in self .schema .items ():
566+ schema_pb .append (
567+ Field (
568+ name = key , dtype = VALUE_TYPES_TO_FEAST_TYPES [value .value ]
569+ ).to_proto ()
570+ )
571+ else :
572+ for field in self .schema :
573+ schema_pb .append (field .to_proto ())
501574 data_source_proto = DataSourceProto (
502575 name = self .name ,
503576 type = DataSourceProto .REQUEST_SOURCE ,
504- request_data_options = options ,
505577 description = self .description ,
506578 tags = self .tags ,
507579 owner = self .owner ,
508580 )
581+ data_source_proto .request_data_options .schema .extend (schema_pb )
509582
510583 return data_source_proto
511584
0 commit comments