
Commit 970650b

feat: Fix Map/Dict support and implement schema validation
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
1 parent 16696b8 commit 970650b

File tree: 19 files changed, +648 −42 lines


docs/getting-started/concepts/feast-types.md

Lines changed: 27 additions & 2 deletions
@@ -5,10 +5,35 @@ To make this possible, Feast itself has a type system for all the types it is ab
Feast's type system is built on top of [protobuf](https://github.com/protocolbuffers/protobuf). The messages that make up the type system can be found [here](https://github.com/feast-dev/feast/blob/master/protos/feast/types/Value.proto), and the corresponding python classes that wrap them can be found [here](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/types.py).

Feast supports the following categories of data types:

- **Primitive types**: numerical values (`Int32`, `Int64`, `Float32`, `Float64`), `String`, `Bytes`, `Bool`, and `UnixTimestamp`.
- **Array types**: ordered lists of any primitive type, e.g. `Array(Int64)`, `Array(String)`.
- **Set types**: unordered collections of unique values of any primitive type, e.g. `Set(String)`, `Set(Int64)`.
- **Map types**: dictionary-like structures with string keys and values of any supported Feast type (including nested maps), e.g. `Map`, `Array(Map)`.
For a complete reference with examples, see [Type System](../../reference/type-system.md).
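As a quick illustration of the categories above, here are the plain Python values each kind of Feast type corresponds to on the SDK side (the Feast type names are those listed above; the Python-side shapes follow the Pandas mapping in the offline store spec — lists for arrays, dicts with string keys for maps):

```python
# Python-side values corresponding to each Feast type category.
primitive = {"Int64": 42, "Float32": 0.5, "String": "driver_1001", "Bool": True}

array_value = [1, 2, 3]                     # Array(Int64) -> list of ints
set_value = {"berlin", "paris"}             # Set(String) -> unique, unordered values
map_value = {"theme": "dark", "limits": {"daily": 100}}  # Map -> dict, may nest
map_list = [{"k": "v"}, {"k2": "v2"}]       # Array(Map) -> list of dicts

# Map keys must be strings:
assert all(isinstance(k, str) for k in map_value)
```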
Each feature or schema field in Feast is associated with a data type, which is stored in Feast's [registry](registry.md). These types are also used to ensure that Feast operates on values correctly (e.g. making sure that timestamp columns used for [point-in-time correct joins](point-in-time-joins.md) actually have the timestamp type).

As a result, each system that Feast interacts with needs a way to translate data types from the native platform into a Feast type. E.g., Snowflake SQL types are converted to Feast types [here](https://rtd.feast.dev/en/master/feast.html#feast.type_map.snowflake_python_type_to_feast_value_type). The onus is therefore on authors of offline or online store connectors to make sure that this type mapping happens correctly.

### Backend Type Mapping for Maps

Map types are supported across all major Feast backends:

| Backend | Native Type | Feast Type |
|---------|-------------|------------|
| PostgreSQL | `jsonb` | `Map` |
| PostgreSQL | `jsonb[]` | `Array(Map)` |
| Snowflake | `VARIANT`, `OBJECT` | `Map` |
| Redshift | `SUPER` | `Map` |
| BigQuery | `JSON`, `STRUCT` | `Map` |
| Spark | `map<string,string>` | `Map` |
| Spark | `array<map<string,string>>` | `Array(Map)` |
| MSSQL | `nvarchar(max)` | `Map` |
| DynamoDB | Proto bytes | `Map` |
| Redis | Proto bytes | `Map` |
| Milvus | `VARCHAR` (base64 proto) | `Map` |
**Note**: Feast currently does *not* support a null type in its type system.
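Conceptually, each connector's translation step is just a lookup from the native type name to a Feast type, as in the table above. A minimal sketch of that idea — `native_to_feast_type` and the lookup dicts are hypothetical helpers for illustration, not part of the Feast SDK (real connectors implement this in their own `type_map` modules):

```python
# Hypothetical lookup tables mirroring a few rows of the mapping table above.
POSTGRES_TO_FEAST = {"jsonb": "Map", "jsonb[]": "Array(Map)"}
SNOWFLAKE_TO_FEAST = {"VARIANT": "Map", "OBJECT": "Map"}

def native_to_feast_type(backend: str, native: str) -> str:
    """Translate a native column type name into a Feast type name."""
    tables = {"postgresql": POSTGRES_TO_FEAST, "snowflake": SNOWFLAKE_TO_FEAST}
    try:
        return tables[backend.lower()][native]
    except KeyError:
        raise ValueError(f"No Feast mapping for {backend} type {native!r}")
```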

docs/getting-started/concepts/feature-view.md

Lines changed: 31 additions & 0 deletions
@@ -24,6 +24,7 @@ Feature views consist of:
* (optional, but recommended) a schema specifying one or more [features](feature-view.md#field) (without this, Feast will infer the schema by reading from the data source)
* (optional, but recommended) metadata (for example, description, or other free-form metadata via `tags`)
* (optional) a TTL, which limits how far back Feast will look when generating historical datasets
* (optional) `enable_validation=True`, which enables schema validation during materialization (see [Schema Validation](#schema-validation) below)

Feature views allow Feast to model your existing feature data in a consistent way in both an offline (training) and online (serving) environment. Feature views generally contain features that are properties of a specific object, in which case that object is defined as an entity and included in the feature view.

@@ -159,6 +160,36 @@ Feature names must be unique within a [feature view](feature-view.md#feature-vie

Each field can have additional metadata associated with it, specified as key-value [tags](https://rtd.feast.dev/en/master/feast.html#feast.field.Field).


## Schema Validation

Feature views support an optional `enable_validation` parameter that enables schema validation during materialization and historical feature retrieval. When enabled, Feast verifies that:

- All declared feature columns are present in the input data.
- Column data types match the expected Feast types (mismatches are logged as warnings).

This is useful for catching data quality issues early in the pipeline. To enable it:

```python
from feast import FeatureView, Field
from feast.types import Int64, Float32, Map

validated_fv = FeatureView(
    name="validated_features",
    entities=[driver],
    schema=[
        Field(name="trips_today", dtype=Int64),
        Field(name="rating", dtype=Float32),
        Field(name="preferences", dtype=Map),
    ],
    source=my_source,
    enable_validation=True,  # enables schema checks
)
```

Validation is supported in all compute engines (Local, Spark, and Ray). When a required column is missing, a `ValueError` is raised. Type mismatches are logged as warnings but do not block execution, allowing for safe gradual adoption.

The `enable_validation` parameter is also available on `BatchFeatureView` and `StreamFeatureView`, as well as their respective decorators (`@batch_feature_view` and `@stream_feature_view`).
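The validation contract — hard failure on missing columns, warning-only on type mismatches — can be sketched independently of Feast. This is a hypothetical `validate` helper operating on plain dicts of column types rather than Arrow tables, purely to illustrate the behavior described above:

```python
import logging

logger = logging.getLogger("validation_sketch")

def validate(expected: dict, actual: dict) -> None:
    """Sketch of the contract: raise on missing columns, warn on type mismatches."""
    missing = set(expected) - set(actual)
    if missing:
        raise ValueError(f"Missing expected columns: {sorted(missing)}")
    for name, expected_type in expected.items():
        if expected_type is not None and actual[name] != expected_type:
            # Mismatch is logged, not raised, so execution continues.
            logger.warning(
                "Column %r type mismatch: expected %s, got %s",
                name, expected_type, actual[name],
            )

# Missing column -> hard failure:
try:
    validate({"trips_today": "int64", "rating": "float32"}, {"trips_today": "int64"})
except ValueError as e:
    print(e)  # -> Missing expected columns: ['rating']

# Type mismatch -> warning only:
validate({"rating": "float32"}, {"rating": "float64"})
```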
## \[Alpha] On demand feature views

On demand feature views allow data scientists to use existing features and request-time data (features only available at request time) to transform and create new features. Users define python transformation logic which is executed in both the historical retrieval and online retrieval paths.

docs/how-to-guides/dbt-integration.md

Lines changed: 4 additions & 0 deletions
@@ -289,6 +289,10 @@ Feast automatically maps dbt/warehouse column types to Feast types:
| `TIMESTAMP`, `DATETIME` | `UnixTimestamp` |
| `BYTES`, `BINARY` | `Bytes` |
| `ARRAY<type>` | `Array(type)` |
| `JSON`, `JSONB` | `Map` |
| `VARIANT`, `OBJECT` | `Map` |
| `SUPER` | `Map` |
| `MAP<string,string>` | `Map` |

Snowflake `NUMBER(precision, scale)` types are handled specially:
- Scale > 0: `Float64`

docs/specs/offline_store_format.md

Lines changed: 22 additions & 0 deletions
@@ -49,6 +49,8 @@ Here's how Feast types map to Pandas types for Feast APIs that take in or return
| DOUBLE\_LIST | `list[float]` |
| FLOAT\_LIST | `list[float]` |
| BOOL\_LIST | `list[bool]` |
| MAP | `dict` (`Dict[str, Any]`) |
| MAP\_LIST | `list[dict]` (`List[Dict[str, Any]]`) |

Note that this mapping is non-injective, that is, more than one Pandas type may correspond to a single Feast type (but not vice versa). In these cases, when converting Feast values to Pandas, the **first** Pandas type in the table above is used.

@@ -78,6 +80,8 @@ Here's how Feast types map to BigQuery types when using BigQuery for offline sto
| DOUBLE\_LIST | `ARRAY<FLOAT64>` |
| FLOAT\_LIST | `ARRAY<FLOAT64>` |
| BOOL\_LIST | `ARRAY<BOOL>` |
| MAP | `JSON` / `STRUCT` |
| MAP\_LIST | `ARRAY<JSON>` / `ARRAY<STRUCT>` |

Values that are not specified by the table above will cause an error on conversion.

@@ -94,3 +98,21 @@ https://docs.snowflake.com/en/user-guide/python-connector-pandas.html#snowflake-
| INT32 | `INT8 / UINT8 / INT16 / UINT16 / INT32 / UINT32` |
| INT64 | `INT64 / UINT64` |
| DOUBLE | `FLOAT64` |
| MAP | `VARIANT` / `OBJECT` |

#### Redshift Types
Here's how Feast types map to Redshift types when using Redshift for offline storage:

| Feast Type | Redshift Type |
|-------------|--|
| Event Timestamp | `TIMESTAMP` / `TIMESTAMPTZ` |
| BYTES | `VARBYTE` |
| STRING | `VARCHAR` |
| INT32 | `INT4` / `SMALLINT` |
| INT64 | `INT8` / `BIGINT` |
| DOUBLE | `FLOAT8` / `DOUBLE PRECISION` |
| FLOAT | `FLOAT4` / `REAL` |
| BOOL | `BOOL` |
| MAP | `SUPER` |

Note: Redshift's `SUPER` type stores semi-structured JSON data. During materialization, Feast automatically handles `SUPER` columns that are exported as JSON strings by parsing them back into Python dictionaries before converting to `MAP` proto values.
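A minimal sketch of that parsing step, using only the standard library — `parse_super_column` is a hypothetical helper for illustration; the actual conversion lives inside Feast's type-mapping internals:

```python
import json

def parse_super_column(values):
    """Parse SUPER values exported as JSON strings back into dicts,
    leaving already-parsed dicts and NULLs untouched."""
    parsed = []
    for v in values:
        if isinstance(v, str):
            parsed.append(json.loads(v))  # exported as a JSON string
        else:
            parsed.append(v)              # already a dict, or None
    return parsed

# A SUPER column may arrive as a mix of JSON strings, dicts, and NULLs:
column = ['{"theme": "dark", "pages": 3}', {"theme": "light"}, None]
```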

sdk/python/feast/batch_feature_view.py

Lines changed: 4 additions & 0 deletions
@@ -97,6 +97,7 @@ def __init__(
    feature_transformation: Optional[Transformation] = None,
    batch_engine: Optional[Dict[str, Any]] = None,
    aggregations: Optional[List[Aggregation]] = None,
    enable_validation: bool = False,
):
    if not flags_helper.is_test():
        warnings.warn(
@@ -136,6 +137,7 @@ def __init__(
        source=source,  # type: ignore[arg-type]
        sink_source=sink_source,
        mode=mode,
        enable_validation=enable_validation,
    )

def get_feature_transformation(self) -> Optional[Transformation]:
@@ -169,6 +171,7 @@ def batch_feature_view(
    description: str = "",
    owner: str = "",
    schema: Optional[List[Field]] = None,
    enable_validation: bool = False,
):
    """
    Creates a BatchFeatureView object with the given user-defined function (UDF) as the transformation.
@@ -199,6 +202,7 @@ def decorator(user_function):
        schema=schema,
        udf=user_function,
        udf_string=udf_string,
        enable_validation=enable_validation,
    )
    functools.update_wrapper(wrapper=batch_feature_view_obj, wrapped=user_function)
    return batch_feature_view_obj

sdk/python/feast/feature_view.py

Lines changed: 17 additions & 1 deletion
@@ -107,6 +107,7 @@ class FeatureView(BaseFeatureView):
    owner: str
    materialization_intervals: List[Tuple[datetime, datetime]]
    mode: Optional[Union["TransformationMode", str]]
    enable_validation: bool

    def __init__(
        self,
@@ -123,6 +124,7 @@ def __init__(
        tags: Optional[Dict[str, str]] = None,
        owner: str = "",
        mode: Optional[Union["TransformationMode", str]] = None,
        enable_validation: bool = False,
    ):
        """
        Creates a FeatureView object.
@@ -148,11 +150,14 @@ def __init__(
                primary maintainer.
            mode (optional): The transformation mode for feature transformations. Only meaningful
                when transformations are applied. Choose from TransformationMode enum values.
            enable_validation (optional): If True, enables schema validation during materialization
                to check that data conforms to the declared feature types. Default is False.

        Raises:
            ValueError: A field mapping conflicts with an Entity or a Feature.
        """
        self.name = name
        self.enable_validation = enable_validation
        self.entities = [e.name for e in entities] if entities else [DUMMY_ENTITY_NAME]
        self.ttl = ttl
        schema = schema or []
@@ -457,13 +462,17 @@ def to_proto_spec(
        else self.mode
    )

    tags = dict(self.tags) if self.tags else {}
    if self.enable_validation:
        tags["feast:enable_validation"] = "true"

    return FeatureViewSpecProto(
        name=self.name,
        entities=self.entities,
        entity_columns=[field.to_proto() for field in self.entity_columns],
        features=[feature.to_proto() for feature in self.features],
        description=self.description,
        tags=tags,
        owner=self.owner,
        ttl=(ttl_duration if ttl_duration is not None else None),
        online=self.online,
@@ -642,6 +651,13 @@ def _from_proto_internal(
            f"Entities: {feature_view.entities} vs Entity Columns: {feature_view.entity_columns}"
        )

    # Restore enable_validation from well-known tag.
    proto_tags = dict(feature_view_proto.spec.tags)
    feature_view.enable_validation = (
        proto_tags.pop("feast:enable_validation", "false").lower() == "true"
    )
    feature_view.tags = proto_tags

    # FeatureViewProjections are not saved in the FeatureView proto.
    # Create the default projection.
    feature_view.projection = FeatureViewProjection.from_feature_view_definition(
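The round-trip implemented by the two hunks above — stamp `enable_validation` into a well-known tag on serialization, then pop it back out on deserialization — can be sketched in isolation. `encode_tags` and `decode_tags` are hypothetical standalone helpers mirroring the committed logic, not functions in the Feast codebase:

```python
def encode_tags(tags, enable_validation):
    """Mirror of to_proto_spec: stamp the well-known tag when validation is on."""
    out = dict(tags) if tags else {}
    if enable_validation:
        out["feast:enable_validation"] = "true"
    return out

def decode_tags(proto_tags):
    """Mirror of _from_proto_internal: pop the tag and recover the flag."""
    tags = dict(proto_tags)
    enable_validation = tags.pop("feast:enable_validation", "false").lower() == "true"
    return tags, enable_validation

# Round-trip: user tags survive, the internal tag never leaks back out.
encoded = encode_tags({"team": "rides"}, enable_validation=True)
tags, flag = decode_tags(encoded)
```

Storing the flag as a tag keeps the proto schema unchanged, at the cost of reserving the `feast:` tag prefix, which is why deserialization must strip it before exposing user tags.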

sdk/python/feast/infra/compute_engines/local/feature_builder.py

Lines changed: 22 additions & 1 deletion
@@ -1,3 +1,4 @@
import logging
from typing import Union

from feast.aggregation import aggregation_specs_to_agg_ops
@@ -16,6 +17,9 @@
    LocalValidationNode,
)
from feast.infra.registry.base_registry import BaseRegistry
from feast.types import from_feast_to_pyarrow_type

logger = logging.getLogger(__name__)


class LocalFeatureBuilder(FeatureBuilder):
@@ -88,7 +92,24 @@ def build_transformation_node(self, view, input_nodes):
        return node

    def build_validation_node(self, view, input_node):
        validation_config = getattr(view, "validation_config", None) or {}

        if not validation_config.get("columns") and hasattr(view, "features"):
            columns = {}
            for feature in view.features:
                try:
                    columns[feature.name] = from_feast_to_pyarrow_type(feature.dtype)
                except (ValueError, KeyError):
                    logger.debug(
                        "Could not resolve PyArrow type for feature '%s' "
                        "(dtype=%s), skipping type check for this column.",
                        feature.name,
                        feature.dtype,
                    )
                    columns[feature.name] = None
            if columns:
                validation_config = {**validation_config, "columns": columns}

        node = LocalValidationNode(
            "validate", validation_config, self.backend, inputs=[input_node]
        )

sdk/python/feast/infra/compute_engines/local/nodes.py

Lines changed: 45 additions & 5 deletions
@@ -1,3 +1,4 @@
import logging
from datetime import datetime, timedelta
from typing import List, Optional, Union

@@ -19,6 +20,8 @@
)
from feast.utils import _convert_arrow_to_proto

logger = logging.getLogger(__name__)

ENTITY_TS_ALIAS = "__entity_event_timestamp"

@@ -236,15 +239,52 @@ def __init__(

    def execute(self, context: ExecutionContext) -> ArrowTableValue:
        input_table = self.get_single_table(context).data

        if self.validation_config:
            self._validate_schema(input_table)

        output = ArrowTableValue(input_table)
        context.node_outputs[self.name] = output
        return output

    def _validate_schema(self, table: pa.Table):
        """Validate that the input table conforms to the expected schema.

        Checks that all expected columns are present and that their types
        are compatible with the declared Feast types. Logs warnings for
        type mismatches but only raises on missing columns.
        """
        expected_columns = self.validation_config.get("columns", {})
        if not expected_columns:
            logger.debug(
                "[Validation: %s] No column schema to validate against.",
                self.name,
            )
            return

        actual_columns = set(table.column_names)
        expected_names = set(expected_columns.keys())

        missing = expected_names - actual_columns
        if missing:
            raise ValueError(
                f"[Validation: {self.name}] Missing expected columns: {missing}. "
                f"Actual columns: {sorted(actual_columns)}"
            )

        for col_name, expected_type in expected_columns.items():
            actual_type = table.schema.field(col_name).type
            if expected_type is not None and actual_type != expected_type:
                logger.warning(
                    "[Validation: %s] Column '%s' type mismatch: expected %s, got %s",
                    self.name,
                    col_name,
                    expected_type,
                    actual_type,
                )

        logger.debug("[Validation: %s] Schema validation passed.", self.name)

class LocalOutputNode(LocalNode):
    def __init__(

sdk/python/feast/infra/compute_engines/ray/feature_builder.py

Lines changed: 24 additions & 4 deletions
@@ -17,8 +17,10 @@
    RayJoinNode,
    RayReadNode,
    RayTransformationNode,
    RayValidationNode,
    RayWriteNode,
)
from feast.types import from_feast_to_pyarrow_type

if TYPE_CHECKING:
    from feast.infra.compute_engines.ray.config import RayComputeEngineConfig
@@ -174,11 +176,29 @@ def build_output_nodes(self, view, final_node):

    def build_validation_node(self, view, input_node):
        """Build the validation node for feature validation."""
        expected_columns = {}
        if hasattr(view, "features"):
            for feature in view.features:
                try:
                    expected_columns[feature.name] = from_feast_to_pyarrow_type(
                        feature.dtype
                    )
                except (ValueError, KeyError):
                    logger.debug(
                        "Could not resolve PyArrow type for feature '%s' "
                        "(dtype=%s), skipping type check for this column.",
                        feature.name,
                        feature.dtype,
                    )
                    expected_columns[feature.name] = None

        node = RayValidationNode(
            f"{view.name}:validate",
            expected_columns=expected_columns,
            inputs=[input_node],
        )
        self.nodes.append(node)
        return node

    def _build(self, view, input_nodes: Optional[List[DAGNode]]) -> DAGNode:
        has_physical_source = (hasattr(view, "batch_source") and view.batch_source) or (
