diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index 70fab930bb..e298298836 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -52,17 +52,11 @@ def get_schema(self, registry: "Registry") -> pa.Schema: fields: Dict[str, pa.DataType] = {} for projection in self._feature_service.feature_view_projections: - for feature in projection.features: - fields[ - f"{projection.name_to_use()}__{feature.name}" - ] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype] - fields[ - f"{projection.name_to_use()}__{feature.name}__timestamp" - ] = PA_TIMESTAMP_TYPE - fields[ - f"{projection.name_to_use()}__{feature.name}__status" - ] = pa.int32() - + # The order of fields in the generated schema should match + # the order created on the other side (inside Go logger). + # Otherwise, some offline stores might not accept parquet files (produced by Go). + # Go code can be found here: + # https://github.com/feast-dev/feast/blob/master/go/internal/feast/server/logging/memorybuffer.go#L51 try: feature_view = registry.get_feature_view(projection.name, self._project) except FeatureViewNotFoundException: @@ -91,9 +85,21 @@ def get_schema(self, registry: "Registry") -> pa.Schema: from_value_type(entity.value_type) ] + for feature in projection.features: + fields[ + f"{projection.name_to_use()}__{feature.name}" + ] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype] + fields[ + f"{projection.name_to_use()}__{feature.name}__timestamp" + ] = PA_TIMESTAMP_TYPE + fields[ + f"{projection.name_to_use()}__{feature.name}__status" + ] = pa.int32() + # system columns - fields[REQUEST_ID_FIELD] = pa.string() fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC) + fields[LOG_DATE_FIELD] = pa.date32() + fields[REQUEST_ID_FIELD] = pa.string() return pa.schema( [pa.field(name, data_type) for name, data_type in fields.items()]