@@ -52,17 +52,11 @@ def get_schema(self, registry: "Registry") -> pa.Schema:
5252 fields : Dict [str , pa .DataType ] = {}
5353
5454 for projection in self ._feature_service .feature_view_projections :
55- for feature in projection .features :
56- fields [
57- f"{ projection .name_to_use ()} __{ feature .name } "
58- ] = FEAST_TYPE_TO_ARROW_TYPE [feature .dtype ]
59- fields [
60- f"{ projection .name_to_use ()} __{ feature .name } __timestamp"
61- ] = PA_TIMESTAMP_TYPE
62- fields [
63- f"{ projection .name_to_use ()} __{ feature .name } __status"
64- ] = pa .int32 ()
65-
55+ # The order of fields in the generated schema should match
56+ # the order created on the other side (inside Go logger).
57+ # Otherwise, some offline stores might not accept parquet files (produced by Go).
58+ # Go code can be found here:
59+ # https://github.com/feast-dev/feast/blob/master/go/internal/feast/server/logging/memorybuffer.go#L51
6660 try :
6761 feature_view = registry .get_feature_view (projection .name , self ._project )
6862 except FeatureViewNotFoundException :
@@ -91,9 +85,21 @@ def get_schema(self, registry: "Registry") -> pa.Schema:
9185 from_value_type (entity .value_type )
9286 ]
9387
88+ for feature in projection .features :
89+ fields [
90+ f"{ projection .name_to_use ()} __{ feature .name } "
91+ ] = FEAST_TYPE_TO_ARROW_TYPE [feature .dtype ]
92+ fields [
93+ f"{ projection .name_to_use ()} __{ feature .name } __timestamp"
94+ ] = PA_TIMESTAMP_TYPE
95+ fields [
96+ f"{ projection .name_to_use ()} __{ feature .name } __status"
97+ ] = pa .int32 ()
98+
9499 # system columns
95- fields [REQUEST_ID_FIELD ] = pa .string ()
96100 fields [LOG_TIMESTAMP_FIELD ] = pa .timestamp ("us" , tz = UTC )
101+ fields [LOG_DATE_FIELD ] = pa .date32 ()
102+ fields [REQUEST_ID_FIELD ] = pa .string ()
97103
98104 return pa .schema (
99105 [pa .field (name , data_type ) for name , data_type in fields .items ()]
0 commit comments