@@ -186,6 +186,7 @@ class DataSource(ABC):
186186
187187 def __init__ (
188188 self ,
189+ * ,
189190 event_timestamp_column : Optional [str ] = None ,
190191 created_timestamp_column : Optional [str ] = None ,
191192 field_mapping : Optional [Dict [str , str ]] = None ,
@@ -354,11 +355,12 @@ def get_table_column_names_and_types(
354355
355356 def __init__ (
356357 self ,
357- name : str ,
358- event_timestamp_column : str ,
359- bootstrap_servers : str ,
360- message_format : StreamFormat ,
361- topic : str ,
358+ * args ,
359+ name : Optional [str ] = None ,
360+ event_timestamp_column : Optional [str ] = "" ,
361+ bootstrap_servers : Optional [str ] = None ,
362+ message_format : Optional [StreamFormat ] = None ,
363+ topic : Optional [str ] = None ,
362364 created_timestamp_column : Optional [str ] = "" ,
363365 field_mapping : Optional [Dict [str , str ]] = None ,
364366 date_partition_column : Optional [str ] = "" ,
@@ -368,22 +370,62 @@ def __init__(
368370 timestamp_field : Optional [str ] = "" ,
369371 batch_source : Optional [DataSource ] = None ,
370372 ):
373+ positional_attributes = [
374+ "name" ,
375+ "event_timestamp_column" ,
376+ "bootstrap_servers" ,
377+ "message_format" ,
378+ "topic" ,
379+ ]
380+ _name = name
381+ _event_timestamp_column = event_timestamp_column
382+ _bootstrap_servers = bootstrap_servers or ""
383+ _message_format = message_format
384+ _topic = topic or ""
385+
386+ if args :
387+ warnings .warn (
388+ (
389+ "Kafka parameters should be specified as a keyword argument instead of a positional arg."
390+ "Feast 0.23+ will not support positional arguments to construct Kafka sources"
391+ ),
392+ DeprecationWarning ,
393+ )
394+ if len (args ) > len (positional_attributes ):
395+ raise ValueError (
396+ f"Only { ', ' .join (positional_attributes )} are allowed as positional args when defining "
397+ f"Kafka sources, for backwards compatibility."
398+ )
399+ if len (args ) >= 1 :
400+ _name = args [0 ]
401+ if len (args ) >= 2 :
402+ _event_timestamp_column = args [1 ]
403+ if len (args ) >= 3 :
404+ _bootstrap_servers = args [2 ]
405+ if len (args ) >= 4 :
406+ _message_format = args [3 ]
407+ if len (args ) >= 5 :
408+ _topic = args [4 ]
409+
410+ if _message_format is None :
411+ raise ValueError ("Message format must be specified for Kafka source" )
412+ print ("Asdfasdf" )
371413 super ().__init__ (
372- event_timestamp_column = event_timestamp_column ,
414+ event_timestamp_column = _event_timestamp_column ,
373415 created_timestamp_column = created_timestamp_column ,
374416 field_mapping = field_mapping ,
375417 date_partition_column = date_partition_column ,
376418 description = description ,
377419 tags = tags ,
378420 owner = owner ,
379- name = name ,
421+ name = _name ,
380422 timestamp_field = timestamp_field ,
381423 )
382424 self .batch_source = batch_source
383425 self .kafka_options = KafkaOptions (
384- bootstrap_servers = bootstrap_servers ,
385- message_format = message_format ,
386- topic = topic ,
426+ bootstrap_servers = _bootstrap_servers ,
427+ message_format = _message_format ,
428+ topic = _topic ,
387429 )
388430
389431 def __eq__ (self , other ):
@@ -472,32 +514,56 @@ class RequestSource(DataSource):
472514
473515 def __init__ (
474516 self ,
475- name : str ,
476- schema : Union [Dict [str , ValueType ], List [Field ]],
517+ * args ,
518+ name : Optional [str ] = None ,
519+ schema : Optional [Union [Dict [str , ValueType ], List [Field ]]] = None ,
477520 description : Optional [str ] = "" ,
478521 tags : Optional [Dict [str , str ]] = None ,
479522 owner : Optional [str ] = "" ,
480523 ):
481524 """Creates a RequestSource object."""
482- super ().__init__ (name = name , description = description , tags = tags , owner = owner )
483- if isinstance (schema , Dict ):
525+ positional_attributes = ["name" , "schema" ]
526+ _name = name
527+ _schema = schema
528+ if args :
529+ warnings .warn (
530+ (
531+ "Request source parameters should be specified as a keyword argument instead of a positional arg."
532+ "Feast 0.23+ will not support positional arguments to construct request sources"
533+ ),
534+ DeprecationWarning ,
535+ )
536+ if len (args ) > len (positional_attributes ):
537+ raise ValueError (
538+ f"Only { ', ' .join (positional_attributes )} are allowed as positional args when defining "
539+ f"feature views, for backwards compatibility."
540+ )
541+ if len (args ) >= 1 :
542+ _name = args [0 ]
543+ if len (args ) >= 2 :
544+ _schema = args [1 ]
545+
546+ super ().__init__ (name = _name , description = description , tags = tags , owner = owner )
547+ if not _schema :
548+ raise ValueError ("Schema needs to be provided for Request Source" )
549+ if isinstance (_schema , Dict ):
484550 warnings .warn (
485551 "Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. "
486552 "Please use List[Field] instead for the schema" ,
487553 DeprecationWarning ,
488554 )
489555 schemaList = []
490- for key , valueType in schema .items ():
556+ for key , valueType in _schema .items ():
491557 schemaList .append (
492558 Field (name = key , dtype = VALUE_TYPES_TO_FEAST_TYPES [valueType ])
493559 )
494560 self .schema = schemaList
495- elif isinstance (schema , List ):
496- self .schema = schema
561+ elif isinstance (_schema , List ):
562+ self .schema = _schema
497563 else :
498564 raise Exception (
499565 "Schema type must be either dictionary or list, not "
500- + str (type (schema ))
566+ + str (type (_schema ))
501567 )
502568
503569 def validate (self , config : RepoConfig ):
@@ -643,12 +709,13 @@ def get_table_query_string(self) -> str:
643709
644710 def __init__ (
645711 self ,
646- name : str ,
647- event_timestamp_column : str ,
648- created_timestamp_column : str ,
649- record_format : StreamFormat ,
650- region : str ,
651- stream_name : str ,
712+ * args ,
713+ name : Optional [str ] = None ,
714+ event_timestamp_column : Optional [str ] = "" ,
715+ created_timestamp_column : Optional [str ] = "" ,
716+ record_format : Optional [StreamFormat ] = None ,
717+ region : Optional [str ] = "" ,
718+ stream_name : Optional [str ] = "" ,
652719 field_mapping : Optional [Dict [str , str ]] = None ,
653720 date_partition_column : Optional [str ] = "" ,
654721 description : Optional [str ] = "" ,
@@ -657,10 +724,53 @@ def __init__(
657724 timestamp_field : Optional [str ] = "" ,
658725 batch_source : Optional [DataSource ] = None ,
659726 ):
727+ positional_attributes = [
728+ "name" ,
729+ "event_timestamp_column" ,
730+ "created_timestamp_column" ,
731+ "record_format" ,
732+ "region" ,
733+ "stream_name" ,
734+ ]
735+ _name = name
736+ _event_timestamp_column = event_timestamp_column
737+ _created_timestamp_column = created_timestamp_column
738+ _record_format = record_format
739+ _region = region or ""
740+ _stream_name = stream_name or ""
741+ if args :
742+ warnings .warn (
743+ (
744+ "Kinesis parameters should be specified as a keyword argument instead of a positional arg."
745+ "Feast 0.23+ will not support positional arguments to construct kinesis sources"
746+ ),
747+ DeprecationWarning ,
748+ )
749+ if len (args ) > len (positional_attributes ):
750+ raise ValueError (
751+ f"Only { ', ' .join (positional_attributes )} are allowed as positional args when defining "
752+ f"kinesis sources, for backwards compatibility."
753+ )
754+ if len (args ) >= 1 :
755+ _name = args [0 ]
756+ if len (args ) >= 2 :
757+ _event_timestamp_column = args [1 ]
758+ if len (args ) >= 3 :
759+ _created_timestamp_column = args [2 ]
760+ if len (args ) >= 4 :
761+ _record_format = args [3 ]
762+ if len (args ) >= 5 :
763+ _region = args [4 ]
764+ if len (args ) >= 6 :
765+ _stream_name = args [5 ]
766+
767+ if _record_format is None :
768+ raise ValueError ("Record format must be specified for kinesis source" )
769+
660770 super ().__init__ (
661- name = name ,
662- event_timestamp_column = event_timestamp_column ,
663- created_timestamp_column = created_timestamp_column ,
771+ name = _name ,
772+ event_timestamp_column = _event_timestamp_column ,
773+ created_timestamp_column = _created_timestamp_column ,
664774 field_mapping = field_mapping ,
665775 date_partition_column = date_partition_column ,
666776 description = description ,
@@ -670,7 +780,7 @@ def __init__(
670780 )
671781 self .batch_source = batch_source
672782 self .kinesis_options = KinesisOptions (
673- record_format = record_format , region = region , stream_name = stream_name
783+ record_format = _record_format , region = _region , stream_name = _stream_name
674784 )
675785
676786 def __eq__ (self , other ):
@@ -725,9 +835,9 @@ class PushSource(DataSource):
725835
726836 def __init__ (
727837 self ,
728- * ,
729- name : str ,
730- batch_source : DataSource ,
838+ * args ,
839+ name : Optional [ str ] = None ,
840+ batch_source : Optional [ DataSource ] = None ,
731841 description : Optional [str ] = "" ,
732842 tags : Optional [Dict [str , str ]] = None ,
733843 owner : Optional [str ] = "" ,
@@ -744,10 +854,33 @@ def __init__(
744854 maintainer.
745855
746856 """
747- super ().__init__ (name = name , description = description , tags = tags , owner = owner )
748- self .batch_source = batch_source
749- if not self .batch_source :
750- raise ValueError (f"batch_source is needed for push source { self .name } " )
857+ positional_attributes = ["name" , "batch_source" ]
858+ _name = name
859+ _batch_source = batch_source
860+ if args :
861+ warnings .warn (
862+ (
863+ "Push source parameters should be specified as a keyword argument instead of a positional arg."
864+ "Feast 0.23+ will not support positional arguments to construct push sources"
865+ ),
866+ DeprecationWarning ,
867+ )
868+ if len (args ) > len (positional_attributes ):
869+ raise ValueError (
870+ f"Only { ', ' .join (positional_attributes )} are allowed as positional args when defining "
871+ f"push sources, for backwards compatibility."
872+ )
873+ if len (args ) >= 1 :
874+ _name = args [0 ]
875+ if len (args ) >= 2 :
876+ _batch_source = args [1 ]
877+
878+ super ().__init__ (name = _name , description = description , tags = tags , owner = owner )
879+ if not _batch_source :
880+ raise ValueError (
881+ f"batch_source parameter is needed for push source { self .name } "
882+ )
883+ self .batch_source = _batch_source
751884
752885 def __eq__ (self , other ):
753886 if not isinstance (other , PushSource ):
0 commit comments