@@ -764,13 +764,13 @@ class PushSource(DataSource):
764764
765765 # TODO(adchia): consider adding schema here in case where Feast manages pushing events to the offline store
766766 # TODO(adchia): consider a "mode" to support pushing raw vs transformed events
767- batch_source : DataSource
767+ batch_source : Optional [ DataSource ] = None
768768
769769 def __init__ (
770770 self ,
771771 * ,
772772 name : str ,
773- batch_source : DataSource ,
773+ batch_source : Optional [ DataSource ] = None ,
774774 description : Optional [str ] = "" ,
775775 tags : Optional [Dict [str , str ]] = None ,
776776 owner : Optional [str ] = "" ,
@@ -815,8 +815,12 @@ def get_table_column_names_and_types(
815815
816816 @staticmethod
817817 def from_proto (data_source : DataSourceProto ):
818- assert data_source .HasField ("batch_source" )
819- batch_source = DataSource .from_proto (data_source .batch_source )
818+ # assert data_source.HasField("batch_source")
819+ batch_source = (
820+ DataSource .from_proto (data_source .batch_source )
821+ if data_source .HasField ("batch_source" )
822+ else None
823+ )
820824
821825 return PushSource (
822826 name = data_source .name ,
@@ -827,19 +831,19 @@ def from_proto(data_source: DataSourceProto):
827831 )
828832
829833 def to_proto (self ) -> DataSourceProto :
830- batch_source_proto = None
831- if self .batch_source :
832- batch_source_proto = self .batch_source .to_proto ()
834+ # batch_source_proto = None
833835
834836 data_source_proto = DataSourceProto (
835837 name = self .name ,
836838 type = DataSourceProto .PUSH_SOURCE ,
837839 description = self .description ,
838840 tags = self .tags ,
839841 owner = self .owner ,
840- batch_source = batch_source_proto ,
841842 )
842843
844+ if self .batch_source :
845+ data_source_proto .batch_source .MergeFrom (self .batch_source .to_proto ())
846+
843847 return data_source_proto
844848
845849 def get_table_query_string (self ) -> str :
0 commit comments