diff --git a/.secrets.baseline b/.secrets.baseline index e7126304069..d2796f34daf 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -957,7 +957,7 @@ "filename": "infra/feast-operator/api/v1/featurestore_types.go", "hashed_secret": "44e17306b837162269a410204daaa5ecee4ec22c", "is_verified": false, - "line_number": 906 + "line_number": 907 } ], "infra/feast-operator/api/v1/zz_generated.deepcopy.go": [ @@ -989,7 +989,7 @@ "filename": "infra/feast-operator/api/v1alpha1/featurestore_types.go", "hashed_secret": "44e17306b837162269a410204daaa5ecee4ec22c", "is_verified": false, - "line_number": 649 + "line_number": 650 } ], "infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go": [ @@ -1555,5 +1555,5 @@ } ] }, - "generated_at": "2026-06-11T15:45:28Z" + "generated_at": "2026-06-16T21:53:17Z" } diff --git a/README.md b/README.md index 115bd37903f..c8d5835c725 100644 --- a/README.md +++ b/README.md @@ -227,6 +227,7 @@ The list below contains the functionality that contributors are planning to deve * [x] [SingleStore](https://docs.feast.dev/reference/online-stores/singlestore) * [x] [Couchbase](https://docs.feast.dev/reference/online-stores/couchbase) * [x] [MongoDB](https://docs.feast.dev/reference/online-stores/mongodb) + * [x] [Aerospike](https://docs.feast.dev/reference/online-stores/aerospike) * [x] [Qdrant (vector store)](https://docs.feast.dev/reference/online-stores/qdrant) * [x] [Milvus (vector store)](https://docs.feast.dev/reference/online-stores/milvus) * [x] [Faiss (vector store)](https://docs.feast.dev/reference/online-stores/faiss) diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index c94dd88708d..44971fedda1 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -164,6 +164,7 @@ * [SingleStore](reference/online-stores/singlestore.md) * [Milvus](reference/online-stores/milvus.md) * [MongoDB](reference/online-stores/mongodb.md) + * [Aerospike](reference/online-stores/aerospike.md) * [Elasticsearch](reference/online-stores/elasticsearch.md) * 
[Qdrant](reference/online-stores/qdrant.md) * [Faiss](reference/online-stores/faiss.md) diff --git a/docs/how-to-guides/online-server-performance-tuning.md b/docs/how-to-guides/online-server-performance-tuning.md index b280fe53cff..f10d7bff8c8 100644 --- a/docs/how-to-guides/online-server-performance-tuning.md +++ b/docs/how-to-guides/online-server-performance-tuning.md @@ -278,6 +278,7 @@ The online store is the single largest factor in `get_online_features()` latency | **DynamoDB** | 2–5 ms | Yes | Serverless, auto-scaling on AWS | Pay-per-request cost; batch API limits (100 items) | | **PostgreSQL** | 3–10 ms | No (threadpool) | Teams with existing Postgres infra | Connection pooling needed at scale | | **MongoDB** | 2–5 ms | Yes | Flexible schema, async-native | Requires index tuning for large datasets | +| **Aerospike** | < 1 ms | No (threadpool) | Ultra-low latency, hybrid memory (RAM + SSD), large datasets | Namespace must be pre-configured on the cluster | | **Bigtable** | 3–8 ms | No (threadpool) | Large-scale GCP workloads | Row-key design affects read performance | | **Cassandra / ScyllaDB** | 2–5 ms | No (threadpool) | Multi-region, write-heavy | Tunable consistency; requires DC-aware routing | | **Remote** | Varies | No (threadpool) | Centralized feature server architecture | Adds an HTTP hop; tune connection pool | @@ -305,6 +306,7 @@ The feature server can read from the online store using either an **async** or * | **MongoDB** | Yes | Yes | Uses `motor` (async MongoDB driver) | | **PostgreSQL** | Implemented | No | Has `online_read_async` but does not yet advertise via `async_supported`; uses sync/threadpool path | | **Redis** | Implemented | **Yes** | `online_read_async` and `online_write_batch_async` both implemented; uses sync/threadpool path for `get_online_features` (overridden with batched single pipeline) | +| **Aerospike** | Implemented | No | Async methods wrap the blocking C client via `run_in_executor`; does not yet advertise via 
`async_supported`, so the server still uses the threadpool path | | All others | No | No | Fall back to sync with `run_in_threadpool()` | **When async matters most:** @@ -467,6 +469,34 @@ online_store: - **`connectTimeoutMS` / `socketTimeoutMS`**: Tighter timeouts improve p99 by failing fast on slow connections. - MongoDB is one of the stores with **full async support** (read and write), so it benefits from concurrent feature view reads via `asyncio.gather()`. +### Aerospike tuning + +Aerospike offers sub-millisecond reads thanks to its hybrid-memory architecture (primary index in RAM, data on SSD or RAM). Tune the per-call policies in the Feast config and rely on the Aerospike cluster's own tuning for everything else: + +```yaml +online_store: + type: aerospike + hosts: + - ["aerospike-1.internal", 3000] + - ["aerospike-2.internal", 3000] + namespace: feast + read_timeout_ms: 150 # hard deadline for a single-record get + write_timeout_ms: 300 # hard deadline for a single-record put/operate + batch_total_timeout_ms: 500 # hard deadline for online_read / online_write_batch + socket_timeout_ms: 50 # per-attempt deadline so max_retries can actually fire + max_retries: 2 + ttl_seconds: 86400 # record-level TTL; omit to use the namespace default + client_kwargs: # escape hatch for any client-config field not surfaced above + policies: + batch: + concurrent_nodes: 0 # 0 = parallel to every node (lowest latency on multi-node clusters) +``` + +- **`*_timeout_ms` (total)** vs **`socket_timeout_ms` (per-attempt)**: `*_timeout_ms` is the hard deadline for a whole call *including* retries; `socket_timeout_ms` is the per-attempt deadline that allows `max_retries` to actually fire within that budget. Without `socket_timeout_ms`, a single slow attempt can consume the entire total deadline and retries never run. +- **`hosts`**: List every seed node. The Aerospike client discovers the rest of the cluster automatically and opens one connection pool per node. 
+- **`ttl_seconds: 0`** means "never expire"; omit the key to inherit the namespace's `default-ttl`. Expiry is enforced by the server's `nsup` thread — nothing to delete on the client side. +- Co-locate the feature server in the **same availability zone / rack** as the Aerospike cluster; sub-millisecond reads are bandwidth- and RTT-sensitive. + ### Remote online store tuning The Remote online store connects to a Feast feature server over HTTP. Connection pooling is critical: @@ -700,6 +730,7 @@ This applies to every connection-oriented online store: | **DynamoDB** | `max_pool_connections` (HTTP pool) | 10 | No hard limit, but AWS SDK has per-process pool caps; monitor throttling | | **Redis** | Connection per worker | 1 | `maxclients` on the Redis server (default: 10,000) | | **MongoDB** | `maxPoolSize` (in `client_kwargs`) | 100 | Server's `net.maxIncomingConnections` | +| **Aerospike** | Driver manages pool per seed node | Auto | `proto-fd-max` (default 15000) on each Aerospike node | | **Cassandra** | Driver manages pool per node | Auto | `native_transport_max_threads` on each Cassandra node | | **Remote** | `connection_pool_size` (HTTP pool) | 50 | The target feature server's worker capacity | diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md index 6f31993f896..39294966170 100644 --- a/docs/reference/online-stores/README.md +++ b/docs/reference/online-stores/README.md @@ -58,6 +58,10 @@ Please see [Online Store](../../getting-started/components/online-store.md) for [mongodb.md](mongodb.md) {% endcontent-ref %} +{% content-ref url="aerospike.md" %} +[aerospike.md](aerospike.md) +{% endcontent-ref %} + {% content-ref url="hazelcast.md" %} [hazelcast.md](hazelcast.md) {% endcontent-ref %} diff --git a/docs/reference/online-stores/aerospike.md b/docs/reference/online-stores/aerospike.md new file mode 100644 index 00000000000..8102cec3883 --- /dev/null +++ b/docs/reference/online-stores/aerospike.md @@ -0,0 +1,382 @@ +# 
Aerospike online store (Preview) + +## Description + +The [Aerospike](https://aerospike.com/) online store provides support for materializing feature values into an Aerospike cluster for serving online features. + +{% hint style="warning" %} +The Aerospike online store is currently in **preview**. Some functionality may be unstable, and breaking changes may occur in future releases. +{% endhint %} + +## Features + +* Supports both synchronous and asynchronous read/write paths (`online_read` / `online_read_async`, `online_write_batch` / `online_write_batch_async`). Async methods wrap the blocking client in `run_in_executor`, keeping the event loop responsive in feature-server workloads. +* Partial, server-side upserts via Aerospike Map CDT operations — writing one feature view never clobbers another feature view stored on the same entity. +* Record-level TTL controlled by a single `ttl_seconds` config option (honours the namespace default, a "never expire" sentinel, or an explicit number of seconds). +* Per-feature-view **namespace overrides** and **set overrides** — pin individual feature views to RAM-only or SSD-backed namespaces, or isolate one view in its own set, without splitting projects. +* **Prewriting hook** — a configurable, import-string-resolved callable applied to every write batch for cross-cutting concerns like PII masking, application-side encryption, or value coercion. +* Authentication and TLS options for Aerospike Enterprise Edition passed straight through to the Aerospike Python client. +* `client_kwargs` escape hatch for any advanced client-config field not surfaced on `AerospikeOnlineStoreConfig`. +* Baseline: Aerospike Server **≥ 6.0** (uses batch-write / batch-operate APIs). The store has been developed against CE 8.x. + +## Getting started + +Install the Aerospike extra (alongside the dependency for the offline store of choice): + +```bash +pip install 'feast[aerospike]' +``` + +You can start from any of the standard templates (e.g. 
`feast init -t local` or `feast init -t aws`) and then swap in Aerospike as the online store as shown below. + +## Examples + +### Basic configuration — local Aerospike CE + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: aerospike + hosts: + - ["127.0.0.1", 3000] + namespace: feast +``` +{% endcode %} + +### Multi-node cluster + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: aerospike + hosts: + - ["aerospike-1.internal", 3000] + - ["aerospike-2.internal", 3000] + - ["aerospike-3.internal", 3000] + namespace: feast + ttl_seconds: 86400 # 24h record-level TTL + read_timeout_ms: 150 # hard deadline for a single-record get + write_timeout_ms: 300 # hard deadline for a single-record put/operate + batch_total_timeout_ms: 500 # hard deadline for online_read / online_write_batch + socket_timeout_ms: 50 # per-attempt deadline so max_retries can fire + max_retries: 2 +``` +{% endcode %} + +> **Timeout semantics.** The Aerospike client distinguishes per-attempt +> (`socket_timeout`) from total (`total_timeout`) deadlines. `*_timeout_ms` map +> to `total_timeout` — the overall budget for a call including retries. Set +> `socket_timeout_ms` as well so each individual attempt has its own (shorter) +> deadline; without it, `max_retries` effectively never fires because the +> first attempt is allowed to consume the entire total deadline. + +### Aerospike Enterprise with authentication + +> Requires Aerospike Enterprise Edition. The Community Edition server has no built-in user/security model and will reject these config keys. 
+ +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: aerospike + hosts: + - ["aerospike.internal", 3000] + namespace: feast + user: feast_user + password: ${AEROSPIKE_PASSWORD} # pragma: allowlist secret + auth_mode: internal # internal | external | pki +``` +{% endcode %} + +### Aerospike Enterprise with TLS + +> Requires Aerospike Enterprise Edition. The Community Edition server does not implement TLS, so `tls` config is effective only against EE clusters. + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: aerospike + hosts: + - ["aerospike-1.internal", 4333, "aerospike-tls"] + namespace: feast + tls: + enable: true + cafile: /etc/aerospike/certs/ca.pem + certfile: /etc/aerospike/certs/client.pem + keyfile: /etc/aerospike/certs/client.key +``` +{% endcode %} + +### Per-feature-view namespace and set overrides + +Two `Dict[str, str]` config fields — `namespace_overrides` and `set_overrides` — let you place individual feature views on a different Aerospike namespace or set without splitting your project across stores. Anything not listed in either map falls back to the store-level default (`namespace` / `set_name_template`). + +Common reasons to reach for these: + +* A **hot, latency-sensitive view** belongs on a RAM-only namespace; a **wide, cold view** belongs on an SSD-backed namespace. Same project, different storage tiers. +* You want `feast apply` deletions or `truncate` on one feature view to be O(1) without scanning records of the others — give that view its own set. 
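The fallback rule described above (a view-specific override wins, anything not listed uses the store-level default) can be sketched in a few lines of Python. This is an illustrative sketch under stated assumptions, not the store's implementation: `resolve_location` is a hypothetical helper, and the `collection_suffix` default of `"latest"` is an assumption made for the example.

```python
from typing import Dict, Optional, Tuple


def resolve_location(
    feature_view: str,
    project: str,
    namespace: str,
    set_name_template: str = "{project}_{collection_suffix}",
    namespace_overrides: Optional[Dict[str, str]] = None,
    set_overrides: Optional[Dict[str, str]] = None,
    collection_suffix: str = "latest",  # assumed value, for illustration only
) -> Tuple[str, str]:
    """Return the (namespace, set) a feature view resolves to (hypothetical helper)."""
    # A per-view namespace override wins; otherwise use the store-level namespace.
    resolved_namespace = (namespace_overrides or {}).get(feature_view, namespace)
    # The default set name comes from the store-level template.
    default_set = set_name_template.format(
        project=project, collection_suffix=collection_suffix
    )
    # A per-view set override wins; otherwise use the templated default.
    resolved_set = (set_overrides or {}).get(feature_view, default_set)
    return resolved_namespace, resolved_set
```

Two views that resolve to the same `(namespace, set)` pair share records per entity; a view with its own set is read in a separate round trip, which is the tradeoff this section describes.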
+ +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: aerospike + hosts: + - ["aerospike.internal", 3000] + namespace: feast # default namespace + set_name_template: "{project}_{collection_suffix}" + namespace_overrides: + driver_realtime_stats: feast_ram # in-memory namespace + driver_history_lookup: feast_ssd # device-backed namespace + set_overrides: + isolated_view: my_feature_repo_isolated +``` +{% endcode %} + +> **Tradeoffs.** +> +> * Every namespace listed in `namespace_overrides` MUST already exist on the cluster — Aerospike cannot create namespaces at runtime, and a missing namespace surfaces as an opaque `AEROSPIKE_ERR_PARAM` on the first read or write. +> * Putting feature views on different sets means a multi-feature-view read for the same entity becomes one Aerospike round trip per set, not one round trip total. Only opt in when the operational isolation is worth that cost. Reads that touch a single feature view are unaffected. +> * Admin operations honour the overrides automatically: `update()` (called by `feast apply`) groups dropped feature views by their resolved `(namespace, set)` and issues one background scan per group; `teardown()` truncates every unique `(namespace, set)` pair the project may have written to (including the store-level default). + +### Prewriting hooks + +`prewriting_hook` is the import path of a callable that is invoked once per `online_write_batch` call, receives the rows about to be written, and returns the rows that actually go on the wire. Use it for cross-cutting write-side concerns that you don't want sprinkled through every materialization job — PII masking, application-side encryption, dual-write fan-out, value coercion, etc. + +Hooks are referenced by import string (rather than as a Python `Callable` value) so the config survives YAML/JSON serialisation and remote-feature-server transport. 
The resolved callable is cached on the store instance, so import cost is paid once per store lifetime. + +**Hook signature:** + +```python +def hook( + config: RepoConfig, + table: FeatureView, + data: list[ + tuple[ + EntityKeyProto, + dict[str, ValueProto], + datetime, + datetime | None, + ] + ], +) -> list[ + tuple[ + EntityKeyProto, + dict[str, ValueProto], + datetime, + datetime | None, + ] +]: + ... +``` + +The hook MUST return a row list with the same schema as its input. Returning `[]` short-circuits the write — same path as an empty input, no wire call is issued. Hooks that raise will fail the whole batch; there is no per-row fallback. + +**1. Drop a hook function in your project.** Any module on the `PYTHONPATH` of every process that writes through Feast will do (the materialization workers, the registry CLI host, and the feature server, if you run one). + +{% code title="my_feature_repo/hooks.py" %} +```python +"""Prewriting hooks for the Aerospike online store.""" +from __future__ import annotations + +import hashlib +import os +from datetime import datetime +from typing import Optional + +from feast import FeatureView +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import RepoConfig + +# Names of features that must never reach the online store as plaintext. +# Matched by exact feature name; tweak to your project's conventions. +_SENSITIVE_FEATURES = {"email", "phone_number", "ssn"} + + +def hash_pii_string_features( + config: RepoConfig, + table: FeatureView, + data: list[ + tuple[ + EntityKeyProto, + dict[str, ValueProto], + datetime, + Optional[datetime], + ] + ], +) -> list[ + tuple[ + EntityKeyProto, + dict[str, ValueProto], + datetime, + Optional[datetime], + ] +]: + """Replace any sensitive string feature with a salted SHA-256 hex digest. 
+ + The hash is deterministic (same input → same digest) so downstream lookups + that hash the candidate value the same way still hit. ``FEAST_PII_SALT`` + must be set on every process that materialises features; an unset salt + raises rather than silently falling back to plaintext. + """ + salt = os.environ.get("FEAST_PII_SALT") + if salt is None: + raise RuntimeError( + "FEAST_PII_SALT is not set; refusing to write feature batches " + "without a configured PII salt." + ) + salt_bytes = salt.encode("utf-8") + + def _digest(plaintext: str) -> str: + h = hashlib.sha256() + h.update(salt_bytes) + h.update(plaintext.encode("utf-8")) + return h.hexdigest() + + transformed: list[ + tuple[ + EntityKeyProto, + dict[str, ValueProto], + datetime, + Optional[datetime], + ] + ] = [] + for entity_key, values, event_ts, created_ts in data: + new_values = dict(values) + for feature_name in _SENSITIVE_FEATURES.intersection(new_values): + v = new_values[feature_name] + if v.HasField("string_val") and v.string_val: + new_values[feature_name] = ValueProto(string_val=_digest(v.string_val)) + transformed.append((entity_key, new_values, event_ts, created_ts)) + return transformed +``` +{% endcode %} + +**2. Reference the hook from `feature_store.yaml`:** + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: aerospike + hosts: + - ["aerospike.internal", 3000] + namespace: feast + prewriting_hook: my_feature_repo.hooks.hash_pii_string_features +``` +{% endcode %} + +> **Operational notes.** +> +> * The hook is **only invoked on the write path**; reads pass through the store untouched. If your hook is one-way (e.g. hashing) you have to apply the same transformation to the candidate value at read time yourself. +> * Hooks run inside the same process as the writer — they're not RPCs and not sandboxed. They can read environment variables, open files, call out to KMS, etc. 
Treat them as part of your trusted code base.
+> * A misconfigured `prewriting_hook` (bad import path, missing function, non-callable target) raises `ValueError` / `TypeError` on the *first* `online_write_batch` call, not on store construction. Add a smoke test that writes one row at deploy time so misconfigurations surface before a real batch.
+
+The full set of configuration options is available in [`AerospikeOnlineStoreConfig`](https://rtd.feast.dev/en/latest/#feast.infra.online_stores.aerospike_online_store.aerospike.AerospikeOnlineStoreConfig).
+
+## Data Model
+
+By default, the Aerospike online store uses a **single set per project** with entity-key collocation. Features from multiple feature views for the same entity are stored together on a single Aerospike record, analogous to the MongoDB online store's "one document per entity" layout.
+
+| Aerospike concept | Feast mapping |
+| :---------------- | :------------ |
+| Namespace | `online_store.namespace` (must be pre-configured on the cluster); per-feature-view override via `online_store.namespace_overrides` |
+| Set | `online_store.set_name_template` → `"{project}_{collection_suffix}"` by default; per-feature-view override via `online_store.set_overrides` |
+| Key | `serialize_entity_key(entity_key)` as `bytearray` user key |
+| Bin `features` | Map CDT keyed by feature-view name, each value a map of `feature → native` |
+| Bin `event_ts` | Map CDT keyed by feature-view name, each value an int64 epoch-ms timestamp |
+| Bin `created_ts` | Top-level int64 epoch-ms timestamp (last `feast materialize`) |
+
+### Example record
+
+For a single entity carrying features from two feature views (`driver_stats` and `pricing`):
+
+```text
+key: (ns="feast", set="my_feature_repo_latest", user_key=<serialized entity key bytes>)
+bins:
+  features:
+    driver_stats:
+      rating: 4.91
+      trips_last_7d: 132
+    pricing:
+      surge_multiplier: 1.2
+  event_ts:
+    driver_stats: 1737374400000  # 2025-01-20T12:00:00Z
+    pricing: 1737447000000  # 2025-01-21T08:10:00Z
+  created_ts: 1737460805000  # 2025-01-21T12:00:05Z
+```
+
+### Key design decisions
+
+* **Record per entity, bin per concept.** `features` and `event_ts` are Aerospike Map CDT bins rather than dynamically named per-view bins, which keeps the store within the 15-byte Aerospike bin-name limit regardless of how many feature views a project has.
+* **Partial upserts via Map CDT ops.** Writes use `batch_write` with `map_put_items("features", {<feature_view>: {...}})` and `map_put("event_ts", <feature_view>, <epoch_ms>)`. Concurrent writes to different feature views on the same entity never clobber each other: each write mutates only its own map keys.
+* **Entity-key bytes as the Aerospike user key.** Feast's `serialize_entity_key` output is passed as a `bytearray` user key (not `bytes`; the Python client hashes only the first byte of `bytes` keys, which would collapse distinct entities).
+* **Timestamps as int64 epoch milliseconds.** Aerospike has no native datetime type; tz-naive timestamps are treated as UTC per the `OnlineStore` contract.
+
+### TTL and expiry
+
+`ttl_seconds` is written as record-level metadata on every `online_write_batch` call:
+
+| `ttl_seconds` | Aerospike TTL | Effect |
+| :------------ | :------------ | :----- |
+| not set / `null` | `TTL_NAMESPACE_DEFAULT` | Record inherits the namespace's configured `default-ttl`. |
+| `0` | `TTL_NEVER_EXPIRE` | Record is kept until explicitly deleted. |
+| `>0` | that many seconds | Record expires after that interval and is removed by the server's `nsup` thread. |
+
+There is no per-feature-view TTL override in this version; the setting is applied uniformly to every write made by the online store.
+
+### Indexes
+
+No secondary indexes are created. All access goes through the primary key, which is the serialized entity key.
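The timestamp and TTL conventions described above can be sketched in plain Python. This is a minimal illustration, not the store's actual code: `to_epoch_ms` and `resolve_ttl` are hypothetical helper names, the TTL sentinels are returned as labels rather than the client's numeric constants, and rejecting negative values is an assumption of this sketch.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Optional, Union

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(ts: datetime) -> int:
    """Convert a datetime to int epoch milliseconds, reading tz-naive values as UTC."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids float rounding on large timestamps.
    return (ts - _EPOCH) // timedelta(milliseconds=1)


def resolve_ttl(ttl_seconds: Optional[int]) -> Union[str, int]:
    """Map the config value to the TTL label or second count from the table above."""
    if ttl_seconds is None:
        return "TTL_NAMESPACE_DEFAULT"  # inherit the namespace's default-ttl
    if ttl_seconds == 0:
        return "TTL_NEVER_EXPIRE"  # keep until explicitly deleted
    if ttl_seconds < 0:
        # Assumption of this sketch: negative values are treated as invalid.
        raise ValueError("ttl_seconds must be None, 0, or a positive integer")
    return ttl_seconds
```

For instance, the tz-naive value `datetime(2025, 1, 20, 12, 0, 0)` is read as UTC and maps to the `driver_stats` timestamp shown in the example record.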
+ +## Async support + +Async read/write are provided by running the Aerospike Python client's blocking calls on the default thread-pool executor (`loop.run_in_executor`). The underlying C client releases the GIL during network I/O, so `await store.online_read_async(...)` keeps the event loop responsive. A native asyncio Aerospike client is not currently used. + +Both sync and async methods are fully supported: + +* `online_read` / `online_read_async` +* `online_write_batch` / `online_write_batch_async` +* `initialize` / `close` — `initialize(config)` eagerly opens the connection so feature servers pay the TCP/handshake cost at startup; `close()` releases the cached client. + +## Functionality Matrix + +The set of functionality supported by online stores is described in detail [here](overview.md#functionality). +Below is a matrix indicating which functionality is supported by the Aerospike online store. + +| | Aerospike | +| :-------------------------------------------------------- | :-------- | +| write feature values to the online store | yes | +| read feature values from the online store | yes | +| update infrastructure (e.g. tables) in the online store | yes | +| teardown infrastructure (e.g. tables) in the online store | yes | +| generate a plan of infrastructure changes | no | +| support for on-demand transforms | yes | +| readable by Python SDK | yes | +| readable by Java | no | +| readable by Go | no | +| support for entityless feature views | yes | +| support for concurrent writing to the same key | yes | +| support for ttl (time to live) at retrieval | yes | +| support for deleting expired data | yes | +| collocated by feature view | no | +| collocated by feature service | no | +| collocated by entity key | yes | + +To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix). 
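The executor-based pattern described in the Async support section can be sketched with the standard library alone. This is an illustrative sketch, not the store's code: `blocking_read` is a hypothetical stand-in for a blocking client call, and `read_async` is a hypothetical wrapper name.

```python
import asyncio
import time
from typing import List


def blocking_read(keys: List[str]) -> List[str]:
    """Hypothetical stand-in for a blocking client call (sleeps to mimic network I/O)."""
    time.sleep(0.01)
    return [f"value-for-{key}" for key in keys]


async def read_async(keys: List[str]) -> List[str]:
    """Run the blocking call on the event loop's default thread-pool executor."""
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor.
    return await loop.run_in_executor(None, blocking_read, keys)


async def main() -> List[List[str]]:
    # The two reads overlap because each blocking call runs on a worker thread.
    return await asyncio.gather(read_async(["a"]), read_async(["b", "c"]))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The event loop stays free to schedule other coroutines while the worker threads wait, which is the property the section relies on; throughput is still bounded by the executor's thread count.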
diff --git a/docs/roadmap.md b/docs/roadmap.md index e47aa79b573..ca0fab8e729 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -62,6 +62,7 @@ The list below contains the functionality that contributors are planning to deve * [x] [SingleStore](https://docs.feast.dev/reference/online-stores/singlestore) * [x] [Couchbase](https://docs.feast.dev/reference/online-stores/couchbase) * [x] [MongoDB](https://docs.feast.dev/reference/online-stores/mongodb) + * [x] [Aerospike](https://docs.feast.dev/reference/online-stores/aerospike) * [x] [Qdrant (vector store)](https://docs.feast.dev/reference/online-stores/qdrant) * [x] [Milvus (vector store)](https://docs.feast.dev/reference/online-stores/milvus) * [x] [Faiss (vector store)](https://docs.feast.dev/reference/online-stores/faiss) diff --git a/examples/online_store/aerospike_overrides_and_hooks/README.md b/examples/online_store/aerospike_overrides_and_hooks/README.md new file mode 100644 index 00000000000..7143b6a3dff --- /dev/null +++ b/examples/online_store/aerospike_overrides_and_hooks/README.md @@ -0,0 +1,65 @@ +# Aerospike: per-feature-view overrides + prewriting hooks + +A short companion to [`docs/reference/online-stores/aerospike.md`](../../../docs/reference/online-stores/aerospike.md) +demonstrating three deployment patterns the Aerospike online store supports +without needing any Feast extension code: + +1. **Per-feature-view namespace overrides** — pin one view to a RAM-only + namespace and another to an SSD-backed one without splitting the project. +2. **Per-feature-view set overrides** — isolate one view in its own set so + `feast apply` deletions or admin truncates only touch that view. +3. **Prewriting hooks** — apply a project-wide write-side transformation + (PII masking in this example) without sprinkling it through every + materialization job. + +Nothing here is Aerospike-specific *infrastructure* — it's all configured +in `feature_store.yaml`. 
This directory only adds the hook-target Python +module the YAML references. + +## Files + +| file | purpose | +|---|---| +| [`hooks.py`](hooks.py) | A pure-Python prewriting-hook module containing `hash_pii_string_features`, the same example used in the docs. Drop into any module on the writer's `PYTHONPATH`. | +| [`feature_store.yaml`](feature_store.yaml) | Reference `online_store` block showing all three features wired together. Copy the `online_store` section into your own `feature_store.yaml` — the rest is project-specific scaffolding. | + +## Prerequisites + +* Feast installed with the Aerospike extra (`pip install 'feast[aerospike]'`). +* An Aerospike cluster reachable from your writer process. The + [Aerospike online-store reference](../../../docs/reference/online-stores/aerospike.md) + shows a minimal local CE config (`127.0.0.1:3000`); run Aerospike however + you normally would (Docker, Kubernetes, bare metal). +* On every process that calls `online_write_batch` through this store + (materialization workers, the registry CLI host, the feature server if + you run one), the `FEAST_PII_SALT` environment variable must be set + before the first write — `hash_pii_string_features` raises rather than + silently writing plaintext if the salt isn't configured. +* The two namespaces referenced by `namespace_overrides` (`feast_ram` and + `feast_ssd` in the sample YAML) must already exist on the Aerospike + cluster — Aerospike cannot create namespaces at runtime. + +## Trying it out + +1. Drop `hooks.py` into a module on your `PYTHONPATH` that the writer + process can import (e.g. inside your existing feature-repo package). + The example uses the qualified path + `examples.online_store.aerospike_overrides_and_hooks.hooks.hash_pii_string_features`. +2. Copy the `online_store:` block from `feature_store.yaml` into your + own feature repo, adjusting hosts / namespaces / the hook import path + for your project. +3. 
`export FEAST_PII_SALT=...` (anything random and stable across + processes — rotate by re-running materialization with a new salt). +4. `feast apply` — the new config is registered. +5. Materialize as usual — the hook runs once per `online_write_batch`, + and any feature named `email`, `phone_number` or `ssn` lands in + Aerospike as a salted SHA-256 hex digest instead of plaintext. + +## Read-side note + +Prewriting hooks are **only** invoked on the write path. If your hook is +a one-way transform (hashing, encryption-without-decryption-key) you +have to apply the same transform to the candidate value at read time +yourself. Two-way transforms (deterministic encryption, Base64) need a +matching post-read step in your serving code; the Aerospike store does +not currently expose a symmetric "postreading hook". diff --git a/examples/online_store/aerospike_overrides_and_hooks/feature_store.yaml b/examples/online_store/aerospike_overrides_and_hooks/feature_store.yaml new file mode 100644 index 00000000000..bd1061f30aa --- /dev/null +++ b/examples/online_store/aerospike_overrides_and_hooks/feature_store.yaml @@ -0,0 +1,59 @@ +# Reference feature_store.yaml demonstrating all three Aerospike +# extension points wired together. Copy the `online_store:` block into +# your own feature repo and adjust hosts / namespaces / hook import +# path for your project. +# +# Prerequisites: +# - The `feast_ram` and `feast_ssd` namespaces must already exist on +# the Aerospike cluster. Aerospike cannot create namespaces at +# runtime; a missing namespace surfaces as AEROSPIKE_ERR_PARAM on +# the first read or write touching that view. +# - `FEAST_PII_SALT` must be set in every process that calls +# online_write_batch through this store. + +project: my_feature_repo +registry: data/registry.db +provider: local + +online_store: + type: aerospike + + hosts: + - ["aerospike.internal", 3000] + + # Store-level defaults. Anything not listed in *_overrides below + # falls back to these. 
+ namespace: feast + set_name_template: "{project}_{collection_suffix}" + + # Pin individual feature views to different namespaces -- typically + # one in-memory namespace for hot, latency-sensitive views and one + # device-backed namespace for cold, wide views. + namespace_overrides: + driver_realtime_stats: feast_ram + driver_history_lookup: feast_ssd + + # Isolate one feature view in its own set so that admin operations on + # it (truncate, scan-based deletion via `feast apply`) do not touch + # the records of other views. + set_overrides: + isolated_view: my_feature_repo_isolated + + # Project-wide write-side hook. The store dynamically imports the + # callable on first use and caches it. Adjust the import path to + # whatever module is on your writers' PYTHONPATH; the value below + # assumes you have the example folder on PYTHONPATH from the + # repository root. + prewriting_hook: examples.online_store.aerospike_overrides_and_hooks.hooks.hash_pii_string_features + + # Standard timing knobs (optional -- shown for completeness). + ttl_seconds: 86400 + read_timeout_ms: 150 + write_timeout_ms: 300 + batch_total_timeout_ms: 500 + socket_timeout_ms: 50 + max_retries: 2 + +# Offline store / entity_key_serialization_version / etc. are +# project-specific and intentionally omitted; this file is a snippet, +# not a runnable repo. diff --git a/examples/online_store/aerospike_overrides_and_hooks/hooks.py b/examples/online_store/aerospike_overrides_and_hooks/hooks.py new file mode 100644 index 00000000000..15e6f8a10c7 --- /dev/null +++ b/examples/online_store/aerospike_overrides_and_hooks/hooks.py @@ -0,0 +1,109 @@ +"""Sample prewriting hooks for the Feast Aerospike online store. + +Reference the callable from ``feature_store.yaml`` via its import string, +e.g.:: + + online_store: + type: aerospike + ... 
+ prewriting_hook: examples.online_store.aerospike_overrides_and_hooks.hooks.hash_pii_string_features + +The Aerospike online store invokes the configured callable once per +``online_write_batch`` call, passing the rows about to be written. The +callable must return a row list with the same schema. Returning ``[]`` +short-circuits the write — same path as an empty input, no wire call is +issued. +""" + +from __future__ import annotations + +import hashlib +import os +from datetime import datetime +from typing import List, Optional, Tuple + +from feast import FeatureView +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import RepoConfig + +# Names of features that must never reach the online store as plaintext. +# Match is by exact feature name; tweak to your project's conventions +# (regex, suffix-based, FV-tag-driven, etc.). +_SENSITIVE_FEATURES = frozenset({"email", "phone_number", "ssn"}) + +# Type alias for the per-row payload Feast hands to ``online_write_batch``. +WriteRow = Tuple[ + EntityKeyProto, + dict, + datetime, + Optional[datetime], +] + + +def hash_pii_string_features( + config: RepoConfig, + table: FeatureView, + data: List[WriteRow], +) -> List[WriteRow]: + """Replace any sensitive string feature with a salted SHA-256 hex digest. + + Determinism: same plaintext + same ``FEAST_PII_SALT`` → same digest. + Downstream lookups that hash the candidate value the same way still + hit; lookups against the raw plaintext silently miss. + + Safety: an unset salt raises rather than falling back to plaintext. + Set ``FEAST_PII_SALT`` on every process that materialises features + (workers, registry CLI host, feature server). + """ + salt = os.environ.get("FEAST_PII_SALT") + if salt is None: + raise RuntimeError( + "FEAST_PII_SALT is not set; refusing to write feature batches " + "without a configured PII salt." 
+ ) + salt_bytes = salt.encode("utf-8") + + def _digest(plaintext: str) -> str: + h = hashlib.sha256() + h.update(salt_bytes) + h.update(plaintext.encode("utf-8")) + return h.hexdigest() + + transformed: List[WriteRow] = [] + for entity_key, values, event_ts, created_ts in data: + new_values = dict(values) + for feature_name in _SENSITIVE_FEATURES.intersection(new_values): + v: ValueProto = new_values[feature_name] + if v.HasField("string_val") and v.string_val: + new_values[feature_name] = ValueProto(string_val=_digest(v.string_val)) + transformed.append((entity_key, new_values, event_ts, created_ts)) + return transformed + + +def drop_rows_with_negative_amounts( + config: RepoConfig, + table: FeatureView, + data: List[WriteRow], +) -> List[WriteRow]: + """Defensive sample hook: filter rows whose ``amount`` feature is < 0. + + Demonstrates that hooks can also *remove* rows. Returning an empty + list short-circuits the wire call entirely — useful for emergency + feature-write quarantines without a code deploy. 
+ """ + keep: List[WriteRow] = [] + for entity_key, values, event_ts, created_ts in data: + amount: Optional[ValueProto] = values.get("amount") + if ( + amount is not None + and amount.HasField("double_val") + and amount.double_val < 0 + ): + continue + if amount is not None and amount.HasField("float_val") and amount.float_val < 0: + continue + if amount is not None and amount.HasField("int64_val") and amount.int64_val < 0: + continue + keep.append((entity_key, values, event_ts, created_ts)) + return keep diff --git a/infra/feast-operator/api/v1/featurestore_types.go b/infra/feast-operator/api/v1/featurestore_types.go index 3dd68311245..24eeef778f9 100644 --- a/infra/feast-operator/api/v1/featurestore_types.go +++ b/infra/feast-operator/api/v1/featurestore_types.go @@ -597,7 +597,7 @@ type OnlineStoreFilePersistence struct { // OnlineStoreDBStorePersistence configures the DB store persistence for the online store service type OnlineStoreDBStorePersistence struct { // Type of the persistence type you want to use. - // +kubebuilder:validation:Enum=snowflake.online;redis;datastore;dynamodb;bigtable;postgres;cassandra;mysql;hazelcast;singlestore;hbase;elasticsearch;qdrant;couchbase.online;milvus;hybrid;mongodb + // +kubebuilder:validation:Enum=snowflake.online;redis;datastore;dynamodb;bigtable;postgres;cassandra;mysql;hazelcast;singlestore;hbase;elasticsearch;qdrant;couchbase.online;milvus;hybrid;mongodb;aerospike Type string `json:"type"` // Data store parameters should be placed as-is from the "feature_store.yaml" under the secret key. "registry_type" & "type" fields should be removed. 
SecretRef corev1.LocalObjectReference `json:"secretRef"` @@ -623,6 +623,7 @@ var ValidOnlineStoreDBStorePersistenceTypes = []string{ "milvus", "hybrid", "mongodb", + "aerospike", } // LocalRegistryConfig configures the registry service diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index 87d13003805..c165801eda7 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -373,7 +373,7 @@ type OnlineStoreFilePersistence struct { // OnlineStoreDBStorePersistence configures the DB store persistence for the online store service type OnlineStoreDBStorePersistence struct { // Type of the persistence type you want to use. - // +kubebuilder:validation:Enum=snowflake.online;redis;datastore;dynamodb;bigtable;postgres;cassandra;mysql;hazelcast;singlestore;hbase;elasticsearch;qdrant;couchbase.online;milvus;hybrid;mongodb + // +kubebuilder:validation:Enum=snowflake.online;redis;datastore;dynamodb;bigtable;postgres;cassandra;mysql;hazelcast;singlestore;hbase;elasticsearch;qdrant;couchbase.online;milvus;hybrid;mongodb;aerospike Type string `json:"type"` // Data store parameters should be placed as-is from the "feature_store.yaml" under the secret key. "registry_type" & "type" fields should be removed. 
SecretRef corev1.LocalObjectReference `json:"secretRef"` @@ -399,6 +399,7 @@ var ValidOnlineStoreDBStorePersistenceTypes = []string{ "milvus", "hybrid", "mongodb", + "aerospike", } // LocalRegistryConfig configures the registry service diff --git a/infra/feast-operator/bundle/manifests/feast.dev_featurestores.yaml b/infra/feast-operator/bundle/manifests/feast.dev_featurestores.yaml index d8791ae8a26..1355d65d993 100644 --- a/infra/feast-operator/bundle/manifests/feast.dev_featurestores.yaml +++ b/infra/feast-operator/bundle/manifests/feast.dev_featurestores.yaml @@ -2256,6 +2256,7 @@ spec: - milvus - hybrid - mongodb + - aerospike type: string required: - secretRef @@ -8281,6 +8282,7 @@ spec: - milvus - hybrid - mongodb + - aerospike type: string required: - secretRef @@ -13578,6 +13580,7 @@ spec: - milvus - hybrid - mongodb + - aerospike type: string required: - secretRef @@ -17856,6 +17859,7 @@ spec: - milvus - hybrid - mongodb + - aerospike type: string required: - secretRef diff --git a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml index 0851b9abf96..c842e7d7364 100644 --- a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml +++ b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml @@ -2256,6 +2256,7 @@ spec: - milvus - hybrid - mongodb + - aerospike type: string required: - secretRef @@ -8281,6 +8282,7 @@ spec: - milvus - hybrid - mongodb + - aerospike type: string required: - secretRef @@ -13578,6 +13580,7 @@ spec: - milvus - hybrid - mongodb + - aerospike type: string required: - secretRef @@ -17856,6 +17859,7 @@ spec: - milvus - hybrid - mongodb + - aerospike type: string required: - secretRef diff --git a/infra/feast-operator/dist/install.yaml b/infra/feast-operator/dist/install.yaml index a6453c3e6a3..f53903cbf7f 100644 --- a/infra/feast-operator/dist/install.yaml +++ b/infra/feast-operator/dist/install.yaml @@ -2264,6 +2264,7 @@ spec: 
- milvus - hybrid - mongodb + - aerospike type: string required: - secretRef @@ -8289,6 +8290,7 @@ spec: - milvus - hybrid - mongodb + - aerospike type: string required: - secretRef @@ -13586,6 +13588,7 @@ spec: - milvus - hybrid - mongodb + - aerospike type: string required: - secretRef @@ -17864,6 +17867,7 @@ spec: - milvus - hybrid - mongodb + - aerospike type: string required: - secretRef diff --git a/pyproject.toml b/pyproject.toml index 25bef64ccfa..b628f5485f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,9 @@ dependencies = [ ] [project.optional-dependencies] +aerospike = [ + "aerospike>=19.0.0,<20.0.0", +] aws = ["boto3>=1.38.27", "fsspec>=2024.1.0", "aiobotocore>=2"] azure = [ "azure-storage-blob>=0.37.0", diff --git a/sdk/python/feast/infra/online_stores/aerospike_online_store/__init__.py b/sdk/python/feast/infra/online_stores/aerospike_online_store/__init__.py new file mode 100644 index 00000000000..099892626dd --- /dev/null +++ b/sdk/python/feast/infra/online_stores/aerospike_online_store/__init__.py @@ -0,0 +1,3 @@ +from .aerospike import AerospikeOnlineStore, AerospikeOnlineStoreConfig + +__all__ = ["AerospikeOnlineStore", "AerospikeOnlineStoreConfig"] diff --git a/sdk/python/feast/infra/online_stores/aerospike_online_store/aerospike.py b/sdk/python/feast/infra/online_stores/aerospike_online_store/aerospike.py new file mode 100644 index 00000000000..32d1f1cadd7 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/aerospike_online_store/aerospike.py @@ -0,0 +1,990 @@ +from __future__ import annotations + +import asyncio +import functools +import importlib +from datetime import datetime, timezone +from logging import getLogger +from typing import ( + Any, + Callable, + Dict, + List, + Literal, + Optional, + Sequence, + Set, + Tuple, + Union, +) + +from pydantic import SecretStr + +try: + import aerospike + from aerospike_helpers import cdt_ctx + from aerospike_helpers.batch.records import BatchRecords + from 
aerospike_helpers.batch.records import Write as BatchWrite + from aerospike_helpers.operations import map_operations as map_ops + from aerospike_helpers.operations import operations as ops +except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError + + raise FeastExtrasDependencyImportError("aerospike", str(e)) + +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.type_map import ( + feast_value_type_to_python_type, + python_values_to_proto_values, +) + +logger = getLogger(__name__) + + +# Aerospike per-record batch result codes we treat specially. Anything not +# listed here is surfaced as an exception so a transient server error (e.g. +# timeout, device overload) is never silently misreported as a missing feature. +# See https://aerospike.com/docs/server/reference/errors. +_AS_OK: int = 0 +_AS_ERR_RECORD_NOT_FOUND: int = 2 # genuine "entity has never been written" +_AS_ERR_OP_NOT_APPLICABLE: int = 26 # map/list op targeted a missing CDT path + + +_AUTH_MODE_TO_CONSTANT: Dict[str, int] = { + "internal": aerospike.AUTH_INTERNAL, + "external": aerospike.AUTH_EXTERNAL, + "pki": aerospike.AUTH_PKI, +} + + +# Type alias for the prewriting-hook signature. Hooks receive the rows about +# to be written and return a (possibly transformed) row list with the same +# schema. Defined as ``Any`` in the value position to keep the public type +# hint readable; the actual contract is documented on +# ``AerospikeOnlineStoreConfig.prewriting_hook``. 
+PrewritingHook = Callable[ + [ + "RepoConfig", + "FeatureView", + List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + ], + List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]], +] + + +# Create every Map CDT bin with an ordered map. map_get_by_key / +# map_remove_by_key on an ordered map are O(log N) in the map size instead of +# O(N), which matters on the update() background scan (which walks every +# record in the project's set) and on reads of wide feature views. The policy +# is applied on each put so map-creation on the first write picks up the +# ordering; subsequent puts keep it. +_ORDERED_MAP_POLICY: Dict[str, Any] = {"map_order": aerospike.MAP_KEY_ORDERED} + + +def _datetime_to_epoch_ms(dt: datetime) -> int: + """Convert a datetime to int64 epoch milliseconds. + + Aerospike has no native datetime type, so timestamps are stored as int + bins. Per the OnlineStore contract, a tz-naive timestamp is treated as UTC. + """ + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return int(dt.timestamp() * 1000) + + +def _epoch_ms_to_datetime(value: Optional[int]) -> Optional[datetime]: + """Inverse of :func:`_datetime_to_epoch_ms`. Returns a tz-aware UTC datetime.""" + if value is None: + return None + return datetime.fromtimestamp(value / 1000.0, tz=timezone.utc) + + +def _resolve_ttl(ttl_seconds: Optional[int]) -> int: + """Map the config's ``ttl_seconds`` to an Aerospike record-metadata TTL. + + * ``None`` -> use the namespace default (``TTL_NAMESPACE_DEFAULT``). + * ``0`` -> never expire (``TTL_NEVER_EXPIRE``). + * ``> 0`` -> that many seconds until expiry. + """ + if ttl_seconds is None: + return aerospike.TTL_NAMESPACE_DEFAULT + if ttl_seconds == 0: + return aerospike.TTL_NEVER_EXPIRE + return int(ttl_seconds) + + +class AerospikeOnlineStoreConfig(FeastConfigBaseModel): + """Aerospike configuration. 
+ + Aerospike does not have a URI analogue; connections are established via a + seed list of ``(host, port)`` or ``(host, port, tls_name)`` tuples. See the + Aerospike Python client reference for the meaning of additional policies and + TLS options surfaced below, and use ``client_kwargs`` for anything not + explicitly modelled here. + """ + + type: Literal["aerospike"] = "aerospike" + """Online store type selector""" + + hosts: List[Union[Tuple[str, int], Tuple[str, int, str]]] = [("localhost", 3000)] + """Aerospike seed nodes. + + Each entry is either ``(host, port)`` or ``(host, port, tls_name)`` when TLS + is enabled. At least one seed node is required. + """ + + namespace: str = "feast" + """Default Aerospike namespace. Must be pre-configured on the cluster — + namespaces cannot be created at runtime. Used for any feature view not + listed in :attr:`namespace_overrides`.""" + + set_name_template: str = "{project}_{collection_suffix}" + """Template for the per-project Aerospike set name. Available substitutions: + ``{project}`` and ``{collection_suffix}``. Used for any feature view not + listed in :attr:`set_overrides`.""" + + collection_suffix: str = "latest" + """Suffix used by ``set_name_template`` to distinguish sets belonging to the + same project (e.g. a future multi-version layout).""" + + namespace_overrides: Dict[str, str] = {} + """Per-feature-view namespace overrides. Maps a feature view name to the + Aerospike namespace that view should be stored in. Falls back to + :attr:`namespace` for any feature view not listed. 
+ + Lets a deployment pin hot small views to a RAM-only namespace and wider + archival views to an SSD-backed one without having to split projects:: + + namespace_overrides = { + "driver_realtime_stats": "feast_ram", + "driver_history_lookup": "feast_ssd", + } + + Every namespace listed here MUST already exist on the cluster — Aerospike + cannot create namespaces at runtime, and a missing namespace surfaces as + an opaque ``AEROSPIKE_ERR_PARAM`` on the first read or write. + """ + + set_overrides: Dict[str, str] = {} + """Per-feature-view set name overrides. Maps a feature view name to a + fully-qualified Aerospike set name. Falls back to the rendering of + :attr:`set_name_template` for any feature view not listed. + + Useful when a deployment wants each feature view in its own set so admin + operations like ``truncate`` or the ``feast apply`` deletion path can + target one view without scanning the records of the others. Tradeoff: + multi-feature-view reads on the same entity become multiple round trips + instead of one — only set this when the operational isolation is worth + that cost. + """ + + prewriting_hook: Optional[str] = None + """Optional import path of a callable applied to every batch of rows + before they are written. Used for cross-cutting concerns like PII + masking, encryption-at-rest, value coercion or dual-write fan-out. + + Format: ``"package.module.function_name"``. Resolved by import string + (rather than a Python ``Callable``) so the config survives YAML / JSON + serialisation and remote-feature-server transport. + + The hook signature is :data:`PrewritingHook`:: + + def hook( + config: RepoConfig, + table: FeatureView, + data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]], + ) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: + ... + + The hook MUST return a row list with the same shape as its input. 
+ Hooks that raise will fail the whole batch — there is no per-row + fallback. The resolved callable is cached on the store instance, so + swapping the config to a different hook also requires a new + ``AerospikeOnlineStore`` instance (which Feast already does on + ``RepoConfig`` change). + """ + + user: Optional[str] = None + """Optional username for Aerospike Enterprise authentication.""" + + password: Optional[SecretStr] = None + """Optional password for Aerospike Enterprise authentication.""" + + auth_mode: Literal["internal", "external", "pki"] = "internal" + """Authentication mode. ``internal`` for CE/EE user/password, ``external`` + for LDAP/Kerberos, ``pki`` for certificate-based auth.""" + + tls: Optional[Dict[str, Any]] = None + """TLS configuration, passed through verbatim to the Aerospike client. + See the Aerospike Python client ``tls`` policy options.""" + + ttl_seconds: Optional[int] = None + """Record-level TTL, applied to every write. ``None`` uses the namespace + default, ``0`` means never expire (mapped to the client's ``-1`` sentinel). + No per-feature-view override in v1.""" + + write_timeout_ms: int = 1_000 + """Per-call write total timeout in milliseconds. This is the hard deadline + the client gives a single ``put`` / ``operate`` — including any retries — + to return a response, after which the call fails.""" + + read_timeout_ms: int = 250 + """Per-call read total timeout in milliseconds. Hard deadline for a + single-record ``get`` — including any retries — after which the call + fails.""" + + batch_total_timeout_ms: int = 2_000 + """Total timeout in milliseconds for a whole ``batch_write`` / + ``batch_operate`` call, including retries. Applies to every batch + operation ``online_read`` and ``online_write_batch`` issue.""" + + socket_timeout_ms: Optional[int] = None + """Per-attempt socket timeout in milliseconds. 
This is the per-retry + trigger that lets ``max_retries`` actually fire within the caller's + overall ``*_timeout_ms`` budget — without it, a single attempt can + consume the whole deadline and retries never run. Applied uniformly + to ``read``, ``write`` and ``batch`` policies. ``None`` leaves the + client default in place.""" + + max_retries: int = 2 + """Maximum number of automatic retries on transient errors.""" + + client_kwargs: Dict[str, Any] = {} + """Escape hatch for any Aerospike client configuration not surfaced above. + Merged into the client config passed to ``aerospike.client()``.""" + + +class AerospikeOnlineStore(OnlineStore): + """Aerospike implementation of the Feast :class:`OnlineStore`. + + Storage layout (one set per project): + + * Namespace: ``config.online_store.namespace`` (server-configured) + * Set: ``{project}_{collection_suffix}`` + * Key: ``serialize_entity_key(entity_key)`` (bytes) + * Bins: + + * ``features`` — Map CDT ``{"<fv_name>": {"<feature_name>": <value>}}`` + * ``event_ts`` — Map CDT ``{"<fv_name>": <epoch_ms>}`` + * ``created_ts`` — top-level ``<epoch_ms>`` + + Timestamps are stored as int64 epoch milliseconds because Aerospike has no + native datetime type. Tz-naive timestamps are treated as UTC per the + :class:`OnlineStore` contract. + """ + + def __init__(self) -> None: + # Kept on the instance rather than the class so two ``AerospikeOnlineStore`` + # instances can't accidentally share a cached client through class state. + self._client: Optional[aerospike.Client] = None + # Resolved prewriting hook, cached on the instance so the import + # cost is paid once. ``_prewriting_hook_spec`` records the import + # string the cache was built from so a config swap (re-binding the + # store to a different hook) re-resolves on next call.
+ self._prewriting_hook: Optional[PrewritingHook] = None + self._prewriting_hook_spec: Optional[str] = None + + # ------------------------------------------------------------------ + # Lifecycle / connection management + # ------------------------------------------------------------------ + def _get_client(self, config: RepoConfig) -> aerospike.Client: + """Lazily create and cache an Aerospike client on first use. + + The underlying C client maintains its own connection pool, so a single + cached instance is safe to share across calls on this store. + """ + if self._client is not None: + return self._client + + if not isinstance(config.online_store, AerospikeOnlineStoreConfig): + raise RuntimeError(f"{config.online_store.type = }. It must be aerospike.") + store_cfg = config.online_store + + read_policy: Dict[str, Any] = { + "total_timeout": store_cfg.read_timeout_ms, + "max_retries": store_cfg.max_retries, + } + write_policy: Dict[str, Any] = { + "total_timeout": store_cfg.write_timeout_ms, + "max_retries": store_cfg.max_retries, + } + batch_policy: Dict[str, Any] = { + "total_timeout": store_cfg.batch_total_timeout_ms, + } + if store_cfg.socket_timeout_ms is not None: + # socket_timeout is the per-attempt deadline; without it, + # total_timeout is the whole budget and retries never fire. + read_policy["socket_timeout"] = store_cfg.socket_timeout_ms + write_policy["socket_timeout"] = store_cfg.socket_timeout_ms + batch_policy["socket_timeout"] = store_cfg.socket_timeout_ms + + client_config: Dict[str, Any] = { + "hosts": [tuple(h) for h in store_cfg.hosts], + "policies": { + "read": read_policy, + "write": write_policy, + "batch": batch_policy, + }, + **store_cfg.client_kwargs, + } + if store_cfg.user: + if store_cfg.password is None: + raise ValueError( + "AerospikeOnlineStoreConfig.user is set but password is not." 
+ ) + client_config["user"] = store_cfg.user + client_config["password"] = store_cfg.password.get_secret_value() + client_config["auth_mode"] = _AUTH_MODE_TO_CONSTANT[store_cfg.auth_mode] + if store_cfg.tls: + client_config["tls"] = store_cfg.tls + + self._client = aerospike.client(client_config).connect() + return self._client + + def _set_name(self, config: RepoConfig, fv_name: Optional[str] = None) -> str: + """Resolve the Aerospike set name for a feature view. + + Without ``fv_name`` returns the project-level default rendered from + ``set_name_template``. With an ``fv_name`` returns the override from + :attr:`AerospikeOnlineStoreConfig.set_overrides` if present, falling + back to the project default. Calling without ``fv_name`` is preserved + for callers (and tests) that want the project's default set without + knowing which feature views might override it. + """ + store_cfg = config.online_store + overrides = getattr(store_cfg, "set_overrides", None) or {} + if fv_name is not None and fv_name in overrides: + return overrides[fv_name] + return store_cfg.set_name_template.format( + project=config.project, + collection_suffix=store_cfg.collection_suffix, + ) + + def _namespace_for_fv( + self, config: RepoConfig, fv_name: Optional[str] = None + ) -> str: + """Resolve the Aerospike namespace for a feature view. + + Mirrors :meth:`_set_name` semantics: without ``fv_name`` returns the + store-default namespace; with one, returns the override from + :attr:`AerospikeOnlineStoreConfig.namespace_overrides` if present. + """ + store_cfg = config.online_store + overrides = getattr(store_cfg, "namespace_overrides", None) or {} + if fv_name is not None and fv_name in overrides: + return overrides[fv_name] + return store_cfg.namespace + + def _aerospike_key( + self, + config: RepoConfig, + entity_key: EntityKeyProto, + fv_name: Optional[str] = None, + ) -> Tuple[str, str, bytearray]: + """Build a ``(namespace, set, user_key)`` tuple for an entity. 
+ + When ``fv_name`` is provided, the namespace and set name honour the + per-feature-view overrides from + :class:`AerospikeOnlineStoreConfig`. Without ``fv_name`` the + store-level defaults are used. + + The user key is returned as a ``bytearray`` rather than ``bytes``: + the Aerospike Python C client rejects ``bytes`` user keys + (``calc_digest`` raises ``"Key is invalid"``), and ``batch_read`` / + ``batch_operate`` silently hash only the first byte of a ``bytes`` + key. ``bytearray`` is the supported binary-key type. + """ + user_key = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + return ( + self._namespace_for_fv(config, fv_name), + self._set_name(config, fv_name), + bytearray(user_key), + ) + + def _resolve_prewriting_hook(self, config: RepoConfig) -> Optional[PrewritingHook]: + """Resolve and cache the configured prewriting hook, if any. + + The hook is referenced by import string in the config (so it + survives YAML/JSON round-trips and remote-feature-server transport), + and resolved here on first use. The resolved callable is cached on + the store instance — and re-resolved automatically if a subsequent + config swap rebinds the store to a different hook. + """ + spec = getattr(config.online_store, "prewriting_hook", None) + if not spec: + # A previously-set hook on this instance must be cleared if the + # caller has since unset it on the config; otherwise we'd keep + # applying a hook the user explicitly turned off. 
+ self._prewriting_hook = None + self._prewriting_hook_spec = None + return None + if spec == self._prewriting_hook_spec and self._prewriting_hook is not None: + return self._prewriting_hook + + module_path, _, fn_name = spec.rpartition(".") + if not module_path or not fn_name: + raise ValueError( + "AerospikeOnlineStoreConfig.prewriting_hook must be a fully " + f"qualified import path 'package.module.function', got: {spec!r}" + ) + try: + module = importlib.import_module(module_path) + except ImportError as e: + raise ValueError( + f"prewriting_hook {spec!r}: could not import module " + f"{module_path!r}: {e}" + ) from e + try: + hook = getattr(module, fn_name) + except AttributeError: + raise ValueError( + f"prewriting_hook {spec!r}: module {module_path!r} has no " + f"attribute {fn_name!r}" + ) from None + if not callable(hook): + raise TypeError( + f"prewriting_hook {spec!r} resolved to a non-callable " + f"{type(hook).__name__}" + ) + self._prewriting_hook = hook + self._prewriting_hook_spec = spec + return hook + + # ------------------------------------------------------------------ + # Write path + # ------------------------------------------------------------------ + @staticmethod + def _build_batch_writes( + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + namespace: str, + set_name: str, + ) -> BatchRecords: + """Build a :class:`BatchRecords` with one :class:`BatchWrite` per row. + + Each row becomes an atomic, server-side Map-put op list: + + * ``features[<fv_name>][<feature_name>] = <value>`` for every requested feature + (single ``map_put_items`` op keyed by feature-view name). + * ``event_ts[<fv_name>] = <epoch_ms>``. + * ``created_ts = <epoch_ms>`` when provided. + + Using Map CDT ops rather than a full-record ``put`` means two writers + touching different feature views on the same entity will not clobber + each other — each write only mutates its own slot in the outer map.
+ + The Map CDTs are created with ``MAP_KEY_ORDERED`` so key lookups on + reads and the ``update()`` background scan stay O(log N) in the map + size. Writes use the default ``POLICY_KEY_DIGEST`` — the serialized + entity key itself is not stored on the server, saving per-record + storage that the read path never consumes (result order is preserved + by ``batch_operate`` and paired back via ``zip`` in ``online_read``). + """ + ttl_meta = {"ttl": _resolve_ttl(config.online_store.ttl_seconds)} + + batch = BatchRecords() + for entity_key, proto_values, event_timestamp, created_timestamp in data: + user_key = bytearray( + serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + ) + feature_map = { + field: feast_value_type_to_python_type(val) + for field, val in proto_values.items() + } + operations: List[Dict[str, Any]] = [ + map_ops.map_put_items( + "features", + {table.name: feature_map}, + map_policy=_ORDERED_MAP_POLICY, + ), + map_ops.map_put( + "event_ts", + table.name, + _datetime_to_epoch_ms(event_timestamp), + map_policy=_ORDERED_MAP_POLICY, + ), + ] + if created_timestamp is not None: + operations.append( + ops.write("created_ts", _datetime_to_epoch_ms(created_timestamp)) + ) + batch.batch_records.append( + BatchWrite( + key=(namespace, set_name, user_key), + ops=operations, + meta=ttl_meta, + ) + ) + return batch + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """Write a batch of feature rows using Aerospike's native batch-write API. + + Each row is upserted as a set of Map CDT operations on a single record, + preserving data for other feature views that share the same entity key. 
+ """ + if not data: + if progress: + progress(0) + return + + # Hook resolution runs after the empty-batch check, so a "no rows" call + # skips the import cost, and before the client is acquired, so a hook that + # drops every row skips connection setup. Hooks may mutate ``data`` in + # place or return a new list; we always rebind to whatever they return. + hook = self._resolve_prewriting_hook(config) + if hook is not None: + data = hook(config, table, data) + if not data: + if progress: + progress(0) + return + + client = self._get_client(config) + namespace = self._namespace_for_fv(config, table.name) + set_name = self._set_name(config, table.name) + batch = self._build_batch_writes(config, table, data, namespace, set_name) + if batch.batch_records: + client.batch_write(batch) + # Per-record result codes must be inspected: client.batch_write + # only raises if the whole request was rejected. A partial failure + # (e.g. a single-partition timeout) is otherwise silent, which in + # an online-serving path presents downstream as "model saw stale + # features" weeks after the fact. + self._raise_on_batch_errors(batch.batch_records, set_name, op="write") + if progress: + progress(len(data)) + + # ------------------------------------------------------------------ + # Read path + # ------------------------------------------------------------------ + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """Read feature values for a batch of entities in a single round trip. + + Uses Aerospike's ``batch_operate`` with two server-side Map-get ops per + record. When ``requested_features`` is provided, only those feature + columns are shipped over the wire using ``map_get_by_key_list`` nested + into the feature-view's Map CDT via ``cdt_ctx_map_key``. Otherwise the + whole feature-view slot is returned.
+ + * features op (projected): + ``map_get_by_key_list("features", requested_features, + MAP_RETURN_KEY_VALUE, ctx=[cdt_ctx_map_key(<fv_name>)])`` + * features op (full): + ``map_get_by_key("features", <fv_name>, MAP_RETURN_VALUE)`` + * event_ts op (always): + ``map_get_by_key("event_ts", <fv_name>, MAP_RETURN_VALUE)`` + + Per-record status codes are inspected so we can tell a genuine miss + (``RECORD_NOT_FOUND``, or a nested ``OP_NOT_APPLICABLE`` when the + feature-view slot is absent) apart from a transient server error, + which is raised rather than silently returned as a null row. Output + order matches ``entity_keys``. + """ + if not entity_keys: + return [] + + client = self._get_client(config) + ns = self._namespace_for_fv(config, table.name) + set_name = self._set_name(config, table.name) + + keys = [ + ( + ns, + set_name, + bytearray( + serialize_entity_key( + k, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + ), + ) + for k in entity_keys + ] + read_ops = self._build_read_ops(table.name, requested_features) + + batch = client.batch_operate(keys, read_ops) + + # ``ids`` and ``docs`` use immutable ``bytes`` because ``bytearray`` is + # unhashable and can't key a dict. Keys on the wire must stay + # ``bytearray`` (see ``_aerospike_key``) — we only convert here for + # lookup. + ids = [bytes(user_key) for _, _, user_key in keys] + docs: Dict[bytes, Dict[str, Any]] = {} + # batch_operate preserves request order. We pair each response with + # the original user-key rather than ``br.key[2]``: the Aerospike + # client may return the key in a different representation (e.g. only + # the first byte as a str when the write didn't use POLICY_KEY_SEND + # for reads). + for user_key, br in zip(ids, batch.batch_records): + if br.result == _AS_ERR_RECORD_NOT_FOUND: + continue + if br.result == _AS_ERR_OP_NOT_APPLICABLE: + # The record exists but the nested feature-view slot doesn't; + # treat as a miss to match the OnlineStore contract.
+ continue + if br.result != _AS_OK: + raise RuntimeError( + f"Aerospike batch_operate returned a non-OK status for " + f"entity (ns={ns}, set={set_name}): result={br.result}" + ) + if br.record is None: + continue + _, _, bins = br.record + raw_features = bins.get("features") if bins else None + fv_event_ts_ms = bins.get("event_ts") if bins else None + fv_features = self._normalize_projected_features(raw_features) + docs[user_key] = { + "features": {table.name: fv_features} if fv_features else {}, + "event_timestamps": {table.name: _epoch_ms_to_datetime(fv_event_ts_ms)}, + } + + return self._convert_raw_docs_to_proto(ids, docs, table) + + @staticmethod + def _build_read_ops( + fv_name: str, requested_features: Optional[List[str]] + ) -> List[Dict[str, Any]]: + """Build the per-record op list for an ``online_read`` call. + + Projects ``requested_features`` server-side via + ``map_get_by_key_list`` + ``cdt_ctx_map_key`` when a projection list + is provided. Without a projection, returns the whole feature-view + submap. + """ + if requested_features: + features_op = map_ops.map_get_by_key_list( + "features", + list(requested_features), + aerospike.MAP_RETURN_KEY_VALUE, + ctx=[cdt_ctx.cdt_ctx_map_key(fv_name)], + ) + else: + features_op = map_ops.map_get_by_key( + "features", fv_name, aerospike.MAP_RETURN_VALUE + ) + return [ + features_op, + map_ops.map_get_by_key("event_ts", fv_name, aerospike.MAP_RETURN_VALUE), + ] + + @staticmethod + def _normalize_projected_features( + raw: Optional[Union[Dict[str, Any], List[Any]]], + ) -> Optional[Dict[str, Any]]: + """Convert an Aerospike features payload into a uniform ``{name: val}`` dict. + + The shape depends on which op produced the payload: + + * ``map_get_by_key("features", , MAP_RETURN_VALUE)`` returns a + ``dict`` (the inner feature-view submap). 
+ * ``map_get_by_key_list("features", [...], MAP_RETURN_KEY_VALUE, + ctx=...)`` returns a flat ``list`` of ``[k1, v1, k2, v2, ...]`` + containing only the requested keys that exist. + """ + if raw is None: + return None + if isinstance(raw, dict): + return raw + if isinstance(raw, list): + if not raw: + return None + return dict(zip(raw[0::2], raw[1::2])) + return None + + @staticmethod + def _raise_on_batch_errors( + batch_records: Sequence[Any], set_name: str, op: str + ) -> None: + """Raise if any per-record result code signals a failed batch write/op. + + ``client.batch_write`` and ``client.batch_operate`` only raise when the + overall request was rejected; partial failures (a single-partition + timeout, a replica quorum miss, etc.) are surfaced per record via + ``br.result`` and are otherwise silent. In an online-serving path + those silent failures later present as missing features, so we fail + loud here instead. + """ + errors = [br.result for br in batch_records if br.result != _AS_OK] + if errors: + raise RuntimeError( + f"Aerospike batch_{op} returned non-OK status codes for " + f"{len(errors)} of {len(batch_records)} records " + f"(set={set_name}): codes={errors[:10]}" + ) + + @staticmethod + def _convert_raw_docs_to_proto( + ids: List[bytes], + docs: Dict[bytes, Dict[str, Any]], + table: FeatureView, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """Convert raw feature maps into ordered proto rows. + + The heavy lifting is done by + :func:`feast.type_map.python_values_to_proto_values`, which is + column-oriented and expects a list of values of a single type. This + helper transforms the row-oriented Aerospike lookup result into + columns, converts each column once, then reassembles rows — mirroring + the MongoDB online store's reshape so we amortize the python→proto + cost across the whole batch. + + Args: + ids: serialized entity-key bytes, in the order requested. 
+ docs: ``{entity_id_bytes: {"features": {: {...}}, + "event_timestamps": {: datetime}}}``. Missing keys denote + "record not found". + table: FeatureView being read; provides feature name → type. + + Returns: + A list of ``(event_timestamp, feature_dict)`` the same length as + ``ids`` (``(None, None)`` for entities that had no data for this + feature view). + """ + feature_type_map = { + feature.name: feature.dtype.to_value_type() for feature in table.features + } + + raw_feature_columns: Dict[str, List[Any]] = { + feature_name: [] for feature_name in feature_type_map + } + for entity_id in ids: + doc = docs.get(entity_id) + feature_dict = doc.get("features", {}).get(table.name, {}) if doc else {} + for feature_name in feature_type_map: + raw_feature_columns[feature_name].append( + feature_dict.get(feature_name, None) + ) + + proto_feature_columns = { + feature_name: python_values_to_proto_values( + raw_values, feature_type=feature_type_map[feature_name] + ) + for feature_name, raw_values in raw_feature_columns.items() + } + + results: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + for i, entity_id in enumerate(ids): + doc = docs.get(entity_id) + if doc is None: + results.append((None, None)) + continue + + fv_features = doc.get("features", {}).get(table.name) + if fv_features is None: + results.append((None, None)) + continue + + ts = doc.get("event_timestamps", {}).get(table.name) + row_features = { + feature_name: proto_feature_columns[feature_name][i] + for feature_name in proto_feature_columns + } + results.append((ts, row_features)) + return results + + # ------------------------------------------------------------------ + # Async wrappers + # ------------------------------------------------------------------ + # The Aerospike Python client is a synchronous C extension; there is no + # native asyncio interface. 
Network calls do release the GIL, so we expose + # a correct ``async`` surface by offloading each blocking call to the + # default thread-pool executor. Callers that ``await`` these methods keep + # the event loop responsive while the client talks to the cluster. + + async def online_write_batch_async( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + loop = asyncio.get_running_loop() + await loop.run_in_executor( + None, + functools.partial(self.online_write_batch, config, table, data, progress), + ) + + async def online_read_async( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + None, + functools.partial( + self.online_read, config, table, entity_keys, requested_features + ), + ) + + async def initialize(self, config: RepoConfig) -> None: + """Pre-warm the Aerospike client so the first request is hot. + + Feature servers typically call :meth:`initialize` during startup so the + TCP + handshake latency is paid upfront rather than on the first + ``online_read``. 
+ """ + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, functools.partial(self._get_client, config)) + + async def close(self) -> None: + """Release the cached Aerospike client, if any.""" + if self._client is None: + return + client = self._client + self._client = None + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, client.close) + + # ------------------------------------------------------------------ + # Admin paths (update / teardown) + # ------------------------------------------------------------------ + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ) -> None: + """Reconcile per-feature-view data when a schema change is applied. + + Aerospike has no explicit schema, and records/sets are created lazily + on first write, so there is nothing to do for ``tables_to_keep`` or + either of the entity lists. For ``tables_to_delete`` we strip each + feature-view's slot out of the ``features`` and ``event_ts`` Map + CDTs on every record that contains it. + + Dropped feature views are grouped by their resolved + ``(namespace, set)`` (per :attr:`namespace_overrides` / + :attr:`set_overrides`), and each group is issued as one + **background scan** with a combined op list — a single server-side + pass per (ns, set), regardless of how many feature views in that + group are being dropped. Without this grouping, a deployment that + spreads feature views across multiple namespaces or sets would + either issue blind cross-namespace scans (impossible) or scan one + per dropped FV (wasteful). The scans run asynchronously server-side + and return immediately; this matches the intent of ``feast apply``, + after which the caller stops reading the dropped feature views + anyway. 
+ """ + if not isinstance(config.online_store, AerospikeOnlineStoreConfig): + raise RuntimeError(f"{config.online_store.type = }. It must be aerospike.") + if not tables_to_delete: + return + + client = self._get_client(config) + + # Group dropped feature views by (namespace, set) so each scan only + # touches the records that could plausibly contain the dropped slot. + groups: Dict[Tuple[str, str], List[FeatureView]] = {} + for fv in tables_to_delete: + key = ( + self._namespace_for_fv(config, fv.name), + self._set_name(config, fv.name), + ) + groups.setdefault(key, []).append(fv) + + for (ns, set_name), fvs in groups.items(): + remove_ops: List[Dict[str, Any]] = [] + for fv in fvs: + remove_ops.append( + map_ops.map_remove_by_key( + "features", fv.name, aerospike.MAP_RETURN_NONE + ) + ) + remove_ops.append( + map_ops.map_remove_by_key( + "event_ts", fv.name, aerospike.MAP_RETURN_NONE + ) + ) + + scan = client.scan(ns, set_name) + scan.add_ops(remove_ops) + scan.execute_background() + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ) -> None: + """Truncate every (namespace, set) the project may have written to + and close the cached client. + + Uses Aerospike's ``truncate(namespace, set, 0)`` — a set-scoped + metadata operation that clears every record in O(1) client time, + cheaper than Mongo's ``collection.drop()``. Passing ``0`` as the + cutoff means "drop everything regardless of last-update time". + + Collects the unique ``(namespace, set)`` pairs from the project's + store-level default plus every feature view in ``tables`` (each + resolved through :attr:`namespace_overrides` / + :attr:`set_overrides`). The default is always included so a + teardown invoked with an empty ``tables`` list still clears the + store-default location. + + Truncate on a non-existent set is a no-op, so calling ``teardown`` + on a project that never wrote data — or on a feature view that + was never written to — is safe. 
+ """ + if not isinstance(config.online_store, AerospikeOnlineStoreConfig): + raise RuntimeError(f"{config.online_store.type = }. It must be aerospike.") + + client = self._get_client(config) + + pairs: Set[Tuple[str, str]] = { + (config.online_store.namespace, self._set_name(config)) + } + for fv in tables: + pairs.add( + ( + self._namespace_for_fv(config, fv.name), + self._set_name(config, fv.name), + ) + ) + # sort to keep the truncation order deterministic for tests; the + # operation is independent per (ns, set) so order has no semantic + # meaning. + for ns, set_name in sorted(pairs): + client.truncate(ns, set_name, 0) + if self._client is not None: + self._client.close() + self._client = None diff --git a/sdk/python/feast/infra/online_stores/aerospike_online_store/aerospike_repo_configuration.py b/sdk/python/feast/infra/online_stores/aerospike_online_store/aerospike_repo_configuration.py new file mode 100644 index 00000000000..e704aca780d --- /dev/null +++ b/sdk/python/feast/infra/online_stores/aerospike_online_store/aerospike_repo_configuration.py @@ -0,0 +1,13 @@ +from tests.universal.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.universal.feature_repos.universal.online_store.aerospike import ( + AerospikeOnlineStoreCreator, +) + +FULL_REPO_CONFIGS = [ + IntegrationTestRepoConfig( + online_store="aerospike", + online_store_creator=AerospikeOnlineStoreCreator, + ), +] diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 06529cea0f2..d6669392696 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -66,6 +66,7 @@ } ONLINE_STORE_CLASS_FOR_TYPE = { + "aerospike": "feast.infra.online_stores.aerospike_online_store.AerospikeOnlineStore", "sqlite": "feast.infra.online_stores.sqlite.SqliteOnlineStore", "datastore": "feast.infra.online_stores.datastore.DatastoreOnlineStore", "redis": "feast.infra.online_stores.redis.RedisOnlineStore", diff --git 
a/sdk/python/tests/unit/online_store/test_aerospike_online_retrieval.py b/sdk/python/tests/unit/online_store/test_aerospike_online_retrieval.py new file mode 100644 index 00000000000..9ff56e5a67b --- /dev/null +++ b/sdk/python/tests/unit/online_store/test_aerospike_online_retrieval.py @@ -0,0 +1,1580 @@ +""" +Unit tests for the Aerospike online store. + +Most of the tests here are pure Python and run in any environment (they cover +the timestamp/TTL helpers, the column-oriented proto reshape, and the +write/read/admin dispatch with a mocked Aerospike client). One end-to-end test +is marked with ``@_requires_docker`` and is skipped when Docker is unavailable. +""" + +import time +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import pytest + +pytest.importorskip("aerospike") + +import aerospike # noqa: E402 + +from feast import FeatureView, Field, FileSource # noqa: E402 +from feast.infra.online_stores.aerospike_online_store.aerospike import ( # noqa: E402 + AerospikeOnlineStore, + _datetime_to_epoch_ms, + _epoch_ms_to_datetime, + _resolve_ttl, +) +from feast.protos.feast.types.EntityKey_pb2 import ( + EntityKey as EntityKeyProto, # noqa: E402 +) +from feast.protos.feast.types.Value_pb2 import Value as ValueProto # noqa: E402 +from feast.repo_config import RepoConfig # noqa: E402 +from feast.types import Float64, Int64 # noqa: E402 +from feast.utils import _utc_now # noqa: E402 +from tests.universal.feature_repos.universal.online_store.aerospike import ( # noqa: E402 + AEROSPIKE_CE_IMAGE, +) +from tests.utils.cli_repo_creator import CliRunner, get_example_repo # noqa: E402 + +docker_available = False +try: + import docker + from testcontainers.core.container import DockerContainer + from testcontainers.core.waiting_utils import wait_for_logs + + try: + _docker = docker.from_env() + _docker.ping() + docker_available = True + except Exception: + pass +except ImportError: + pass + 
+_requires_docker = pytest.mark.skipif( + not docker_available, + reason="Docker is not available or not running. Start Docker daemon to run these tests.", +) + + +# --------------------------------------------------------------------------- +# Shared fixtures / helpers +# --------------------------------------------------------------------------- + + +def _make_fv(*field_names: str, dtype=Int64) -> FeatureView: + """Build a minimal FeatureView for conversion tests.""" + return FeatureView( + name="test_fv", + entities=[], + schema=[Field(name=n, dtype=dtype) for n in field_names], + source=FileSource(path="fake.parquet", timestamp_field="event_timestamp"), + ttl=timedelta(days=1), + ) + + +def _aerospike_repo_config(**online_store_overrides) -> RepoConfig: + base = {"type": "aerospike", "namespace": "feast"} + base.update(online_store_overrides) + return RepoConfig( + project="demo", + provider="local", + registry="/tmp/reg.db", + online_store=base, + entity_key_serialization_version=3, + ) + + +def _fake_batch_record(key: tuple, bins): + """Mimic aerospike_helpers.batch.records.BatchRecord for a successful read.""" + return SimpleNamespace( + key=key, + result=0, + record=(key, {"ttl": 0, "gen": 1}, bins) if bins is not None else None, + in_doubt=False, + ) + + +# --------------------------------------------------------------------------- +# Helpers: timestamp and TTL conversions +# --------------------------------------------------------------------------- + + +def test_datetime_helpers_round_trip_utc(): + dt = datetime(2026, 4, 20, 12, 30, 45, 123000, tzinfo=timezone.utc) + ms = _datetime_to_epoch_ms(dt) + assert _epoch_ms_to_datetime(ms) == dt + + +def test_datetime_helpers_treat_naive_as_utc(): + dt_naive = datetime(2026, 4, 20, 12, 30, 45, 123000) + dt_utc = dt_naive.replace(tzinfo=timezone.utc) + assert _datetime_to_epoch_ms(dt_naive) == _datetime_to_epoch_ms(dt_utc) + + +def test_epoch_ms_to_datetime_none_passthrough(): + assert _epoch_ms_to_datetime(None) 
is None + + +def test_resolve_ttl_sentinels(): + assert _resolve_ttl(None) == aerospike.TTL_NAMESPACE_DEFAULT + assert _resolve_ttl(0) == aerospike.TTL_NEVER_EXPIRE + assert _resolve_ttl(3600) == 3600 + + +def test_socket_timeout_ms_propagates_to_read_write_and_batch_policies(monkeypatch): + """``socket_timeout_ms`` must apply uniformly to read, write and batch + policies — otherwise ``max_retries`` can't fire within the total budget + (the first attempt alone consumes the whole deadline).""" + captured: dict = {} + + def fake_client(cfg): + captured["cfg"] = cfg + fake = MagicMock() + fake.connect.return_value = fake + return fake + + monkeypatch.setattr(aerospike, "client", fake_client) + + config = _aerospike_repo_config( + batch_total_timeout_ms=1500, + socket_timeout_ms=40, + read_timeout_ms=100, + write_timeout_ms=200, + ) + store = AerospikeOnlineStore() + store._get_client(config) + + policies = captured["cfg"]["policies"] + assert policies["read"]["total_timeout"] == 100 + assert policies["read"]["socket_timeout"] == 40 + assert policies["write"]["total_timeout"] == 200 + assert policies["write"]["socket_timeout"] == 40 + assert policies["batch"]["total_timeout"] == 1500 + assert policies["batch"]["socket_timeout"] == 40 + + +def test_socket_timeout_ms_omitted_when_not_set(monkeypatch): + """Unset ``socket_timeout_ms`` must leave the client's default in place, + not inject ``None`` into the policy dicts.""" + captured: dict = {} + + def fake_client(cfg): + captured["cfg"] = cfg + fake = MagicMock() + fake.connect.return_value = fake + return fake + + monkeypatch.setattr(aerospike, "client", fake_client) + + store = AerospikeOnlineStore() + store._get_client(_aerospike_repo_config()) + + policies = captured["cfg"]["policies"] + for scope in ("read", "write", "batch"): + assert "socket_timeout" not in policies[scope], ( + f"socket_timeout leaked into {scope} policy with no config override" + ) + + +# 
--------------------------------------------------------------------------- +# _convert_raw_docs_to_proto — same contract as MongoDB's helper +# --------------------------------------------------------------------------- + + +def test_convert_raw_docs_missing_entity(): + """Entity key absent from docs -> (None, None).""" + fv = _make_fv("score") + ts = datetime(2024, 1, 1, tzinfo=timezone.utc) + ids = [b"present", b"missing"] + docs = { + b"present": { + "features": {"test_fv": {"score": 42}}, + "event_timestamps": {"test_fv": ts}, + } + } + + results = AerospikeOnlineStore._convert_raw_docs_to_proto(ids, docs, fv) + + assert len(results) == 2 + ts_out, feats_out = results[0] + assert ts_out == ts + assert feats_out["score"].int64_val == 42 + assert results[1] == (None, None) + + +def test_convert_raw_docs_partial_doc(): + """Entity exists but one feature key is absent -> empty ValueProto for that feature.""" + fv = _make_fv("present_feat", "missing_feat") + ts = datetime(2024, 1, 1, tzinfo=timezone.utc) + ids = [b"entity1"] + docs = { + b"entity1": { + "features": {"test_fv": {"present_feat": 99}}, + "event_timestamps": {"test_fv": ts}, + } + } + + results = AerospikeOnlineStore._convert_raw_docs_to_proto(ids, docs, fv) + + assert len(results) == 1 + ts_out, feats_out = results[0] + assert ts_out == ts + assert feats_out["present_feat"].int64_val == 99 + assert feats_out["missing_feat"] == ValueProto() + + +def test_convert_raw_docs_entity_exists_but_fv_not_written(): + """Entity doc exists (written by another FV) but this FV was never written -> (None, None).""" + pricing_fv = _make_fv("price") + ts = datetime(2024, 1, 1, tzinfo=timezone.utc) + ids = [b"driver_1"] + docs = { + b"driver_1": { + "features": {"driver_stats": {"acc_rate": 0.9}}, + "event_timestamps": {"driver_stats": ts}, + } + } + + results = AerospikeOnlineStore._convert_raw_docs_to_proto(ids, docs, pricing_fv) + + assert len(results) == 1 + assert results[0] == (None, None) + + +def 
test_convert_raw_docs_ordering(): + """Result order matches the ids list regardless of dict insertion order in docs.""" + fv = _make_fv("score") + ts = datetime(2024, 1, 1, tzinfo=timezone.utc) + + ids = [b"entity_z", b"entity_a", b"entity_m"] + docs = { + b"entity_a": { + "features": {"test_fv": {"score": 2}}, + "event_timestamps": {"test_fv": ts}, + }, + b"entity_m": { + "features": {"test_fv": {"score": 3}}, + "event_timestamps": {"test_fv": ts}, + }, + b"entity_z": { + "features": {"test_fv": {"score": 1}}, + "event_timestamps": {"test_fv": ts}, + }, + } + + results = AerospikeOnlineStore._convert_raw_docs_to_proto(ids, docs, fv) + + assert [row[1]["score"].int64_val for row in results] == [1, 2, 3] + + +# --------------------------------------------------------------------------- +# Write path: _build_batch_writes + online_write_batch dispatch +# --------------------------------------------------------------------------- + + +def _entity_key(join_key: str, value: int) -> EntityKeyProto: + return EntityKeyProto( + join_keys=[join_key], entity_values=[ValueProto(int64_val=value)] + ) + + +def test_build_batch_writes_produces_three_ops_with_created_ts(): + config = _aerospike_repo_config(ttl_seconds=3600) + fv = SimpleNamespace(name="driver_stats") + ts = datetime(2026, 4, 20, 12, 30, 45, tzinfo=timezone.utc) + row = ( + _entity_key("driver_id", 1), + { + "rating": ValueProto(double_val=4.91), + "trips_last_7d": ValueProto(int64_val=132), + }, + ts, + ts, + ) + + batch = AerospikeOnlineStore._build_batch_writes( + config, fv, [row], namespace="feast", set_name="demo_latest" + ) + + assert len(batch.batch_records) == 1 + bw = batch.batch_records[0] + assert bw.key[:2] == ("feast", "demo_latest") + # Must be bytearray, not bytes: the Aerospike Python C client rejects + # bytes user keys ("Key is invalid") and silently hashes only the first + # byte inside batch_operate/batch_read — causing digest collisions. 
+ assert isinstance(bw.key[2], bytearray) + assert bw.meta == {"ttl": 3600} + # No explicit per-record policy: writes rely on the client-level default + # (POLICY_KEY_DIGEST), which doesn't persist the serialized entity key + # server-side — batch_operate preserves request order so the stored key + # has no functional use on the read path. + assert bw.policy is None + assert len(bw.ops) == 3 + bin_names = [op["bin"] for op in bw.ops] + assert bin_names == ["features", "event_ts", "created_ts"] + # Map CDTs must be created with MAP_KEY_ORDERED so key lookups on reads + # and the update() background scan stay O(log N). + for op in bw.ops: + if op["bin"] in ("features", "event_ts"): + assert op["map_policy"] == {"map_order": aerospike.MAP_KEY_ORDERED} + + +def test_build_batch_writes_omits_created_ts_when_none(): + config = _aerospike_repo_config() + fv = SimpleNamespace(name="driver_stats") + row = ( + _entity_key("driver_id", 1), + {"rating": ValueProto(double_val=4.91)}, + datetime(2026, 4, 20, tzinfo=timezone.utc), + None, + ) + + batch = AerospikeOnlineStore._build_batch_writes( + config, fv, [row], namespace="feast", set_name="demo_latest" + ) + + ops = batch.batch_records[0].ops + assert len(ops) == 2 + assert {op["bin"] for op in ops} == {"features", "event_ts"} + + +def test_build_batch_writes_ttl_sentinels(): + config = _aerospike_repo_config(ttl_seconds=0) + fv = SimpleNamespace(name="fv") + row = ( + _entity_key("id", 1), + {"x": ValueProto(int64_val=1)}, + datetime(2026, 1, 1, tzinfo=timezone.utc), + None, + ) + + batch = AerospikeOnlineStore._build_batch_writes( + config, fv, [row], namespace="feast", set_name="set" + ) + assert batch.batch_records[0].meta == {"ttl": aerospike.TTL_NEVER_EXPIRE} + + +def test_online_write_batch_dispatches_to_client(): + config = _aerospike_repo_config() + fv = SimpleNamespace(name="fv") + store = AerospikeOnlineStore() + fake_client = MagicMock() + store._client = fake_client + + row = ( + _entity_key("id", 1), + {"x": 
ValueProto(int64_val=1)}, + datetime(2026, 1, 1, tzinfo=timezone.utc), + None, + ) + progress_calls: list[int] = [] + store.online_write_batch(config, fv, [row], progress=progress_calls.append) + + assert fake_client.batch_write.called + assert progress_calls == [1] + + +def test_online_write_batch_empty_short_circuits(): + config = _aerospike_repo_config() + fv = SimpleNamespace(name="fv") + store = AerospikeOnlineStore() + fake_client = MagicMock() + store._client = fake_client + + progress_calls: list[int] = [] + store.online_write_batch(config, fv, [], progress=progress_calls.append) + + assert not fake_client.batch_write.called + assert progress_calls == [0] + + +# --------------------------------------------------------------------------- +# Read path: online_read dispatches and converts via batch_operate +# --------------------------------------------------------------------------- + + +def _read_feature_view() -> SimpleNamespace: + """Minimal FV object exposing .name and .features with dtype mappings.""" + return SimpleNamespace( + name="driver_stats", + features=[ + SimpleNamespace(name="rating", dtype=Float64), + SimpleNamespace(name="trips_last_7d", dtype=Int64), + ], + ) + + +def test_online_read_happy_path_with_projection_and_ordering(): + config = _aerospike_repo_config() + fv = _read_feature_view() + store = AerospikeOnlineStore() + ts = datetime(2026, 4, 20, 12, 30, 45, 123000, tzinfo=timezone.utc) + + ek1 = _entity_key("driver_id", 1) + ek2 = _entity_key("driver_id", 2) + ek3 = _entity_key("driver_id", 3) + key1 = store._aerospike_key(config, ek1) + key2 = store._aerospike_key(config, ek2) + key3 = store._aerospike_key(config, ek3) + + def fake_batch_operate(keys, ops): + assert keys == [key1, key2, key3] + assert len(ops) == 2 + assert ops[0]["bin"] == "features" + assert ops[1]["bin"] == "event_ts" + # Aerospike's batch_operate preserves input order; the middle record + # is simulated as missing to verify we still emit (None, None) in + # the 
correct slot. + br1 = _fake_batch_record( + key1, + { + "features": {"rating": 4.91, "trips_last_7d": 132}, + "event_ts": _datetime_to_epoch_ms(ts), + }, + ) + br2 = _fake_batch_record(key2, None) # missing record + br3 = _fake_batch_record( + key3, + { + "features": {"rating": 3.75, "trips_last_7d": 42}, + "event_ts": _datetime_to_epoch_ms(ts), + }, + ) + return SimpleNamespace(batch_records=[br1, br2, br3]) + + fake_client = MagicMock() + fake_client.batch_operate.side_effect = fake_batch_operate + store._client = fake_client + + results = store.online_read(config, fv, [ek1, ek2, ek3]) + + assert len(results) == 3 + ts0, feats0 = results[0] + assert ts0 == ts + assert abs(feats0["rating"].double_val - 4.91) < 1e-9 + assert feats0["trips_last_7d"].int64_val == 132 + assert results[1] == (None, None) + ts2, feats2 = results[2] + assert ts2 == ts + assert abs(feats2["rating"].double_val - 3.75) < 1e-9 + + +def test_online_read_empty_keys_returns_empty(): + store = AerospikeOnlineStore() + store._client = MagicMock() + fv = _read_feature_view() + assert store.online_read(_aerospike_repo_config(), fv, []) == [] + assert not store._client.batch_operate.called + + +def test_online_read_record_exists_but_fv_not_present_returns_none(): + config = _aerospike_repo_config() + fv = _read_feature_view() + store = AerospikeOnlineStore() + ek = _entity_key("driver_id", 1) + key = store._aerospike_key(config, ek) + + def fake_batch_operate(keys, ops): + return SimpleNamespace( + batch_records=[ + _fake_batch_record(key, {"features": None, "event_ts": None}) + ] + ) + + fake_client = MagicMock() + fake_client.batch_operate.side_effect = fake_batch_operate + store._client = fake_client + + results = store.online_read(config, fv, [ek]) + assert results == [(None, None)] + + +def _fake_error_record(key: tuple, result_code: int): + """Batch response for an error case — no ``record`` but a non-OK result.""" + return SimpleNamespace(key=key, result=result_code, record=None, 
in_doubt=False) + + +def test_online_read_record_not_found_returns_none(): + """``br.result == 2`` (RECORD_NOT_FOUND) is a genuine miss, not an error.""" + config = _aerospike_repo_config() + fv = _read_feature_view() + store = AerospikeOnlineStore() + ek = _entity_key("driver_id", 1) + key = store._aerospike_key(config, ek) + + fake_client = MagicMock() + fake_client.batch_operate.return_value = SimpleNamespace( + batch_records=[_fake_error_record(key, result_code=2)] + ) + store._client = fake_client + + assert store.online_read(config, fv, [ek]) == [(None, None)] + + +def test_online_read_op_not_applicable_returns_none(): + """``br.result == 26`` (OP_NOT_APPLICABLE) happens when the nested FV + submap is absent — also a miss under the OnlineStore contract, not an + error.""" + config = _aerospike_repo_config() + fv = _read_feature_view() + store = AerospikeOnlineStore() + ek = _entity_key("driver_id", 1) + key = store._aerospike_key(config, ek) + + fake_client = MagicMock() + fake_client.batch_operate.return_value = SimpleNamespace( + batch_records=[_fake_error_record(key, result_code=26)] + ) + store._client = fake_client + + assert store.online_read(config, fv, [ek]) == [(None, None)] + + +def test_online_read_raises_on_transient_error(): + """Any other non-OK result (e.g. 9 = TIMEOUT) must be surfaced as an error. + + A silent null-return on a transient timeout is the root-cause class of + bug that shows up weeks later as a model-quality regression; pinning + this with a test keeps the read path loud on failure. 
+ """ + config = _aerospike_repo_config() + fv = _read_feature_view() + store = AerospikeOnlineStore() + ek = _entity_key("driver_id", 1) + key = store._aerospike_key(config, ek) + + fake_client = MagicMock() + fake_client.batch_operate.return_value = SimpleNamespace( + batch_records=[_fake_error_record(key, result_code=9)] # TIMEOUT + ) + store._client = fake_client + + with pytest.raises(RuntimeError, match="non-OK status"): + store.online_read(config, fv, [ek]) + + +def test_online_read_projects_requested_features_server_side(): + """``requested_features`` must drive a ``map_get_by_key_list`` projection + nested into the feature-view submap via ``cdt_ctx``; the unordered + ``MAP_RETURN_KEY_VALUE`` response is a flat ``[k1,v1,k2,v2]`` list.""" + config = _aerospike_repo_config() + fv = _read_feature_view() + store = AerospikeOnlineStore() + ek = _entity_key("driver_id", 1) + key = store._aerospike_key(config, ek) + ts = datetime(2026, 4, 20, 12, 30, 45, tzinfo=timezone.utc) + + captured_ops: list = [] + + def fake_batch_operate(keys, ops): + captured_ops.extend(ops) + # Projected features come back as a flat [k,v,k,v] list rather than + # a dict — the store must normalize this before feeding the row + # reshape helper. 
+ return SimpleNamespace( + batch_records=[ + _fake_batch_record( + key, + { + "features": ["rating", 4.91, "trips_last_7d", 132], + "event_ts": _datetime_to_epoch_ms(ts), + }, + ) + ] + ) + + fake_client = MagicMock() + fake_client.batch_operate.side_effect = fake_batch_operate + store._client = fake_client + + results = store.online_read(config, fv, [ek], requested_features=["rating"]) + + assert len(results) == 1 + ts_out, feats = results[0] + assert ts_out == ts + assert abs(feats["rating"].double_val - 4.91) < 1e-9 + assert feats["trips_last_7d"].int64_val == 132 + + features_op = captured_ops[0] + assert features_op["op"] == aerospike.OP_MAP_GET_BY_KEY_LIST + assert features_op["bin"] == "features" + assert features_op["val"] == ["rating"] + assert features_op["return_type"] == aerospike.MAP_RETURN_KEY_VALUE + # The ctx must nest the projection inside the FV's submap so the server + # only ships the requested columns over the wire. + assert features_op.get("ctx"), "projection op is missing ctx" + + +def test_normalize_projected_features_handles_all_payload_shapes(): + """The shape of the ``features`` payload depends on which op produced it; + the helper must accept all of them.""" + assert AerospikeOnlineStore._normalize_projected_features(None) is None + assert AerospikeOnlineStore._normalize_projected_features([]) is None + assert AerospikeOnlineStore._normalize_projected_features(["a", 1, "b", 2]) == { + "a": 1, + "b": 2, + } + assert AerospikeOnlineStore._normalize_projected_features({"a": 1, "b": 2}) == { + "a": 1, + "b": 2, + } + + +def test_online_write_batch_raises_on_per_record_error(): + """A partial batch failure (one record's result != 0) must raise rather + than silently succeed — otherwise downstream sees "model saw stale + features" weeks after the fact.""" + config = _aerospike_repo_config() + fv = SimpleNamespace(name="fv") + store = AerospikeOnlineStore() + fake_client = MagicMock() + + def fake_batch_write(batch): + # Simulate a 
one-partition timeout on the single record. + batch.batch_records[0].result = 9 # TIMEOUT + + fake_client.batch_write.side_effect = fake_batch_write + store._client = fake_client + + row = ( + _entity_key("id", 1), + {"x": ValueProto(int64_val=1)}, + datetime(2026, 1, 1, tzinfo=timezone.utc), + None, + ) + with pytest.raises(RuntimeError, match="non-OK"): + store.online_write_batch(config, fv, [row], progress=None) + + +# --------------------------------------------------------------------------- +# Admin paths: update / teardown +# --------------------------------------------------------------------------- + + +def test_update_no_op_when_nothing_to_delete(): + store = AerospikeOnlineStore() + fake_client = MagicMock() + store._client = fake_client + store.update(_aerospike_repo_config(), [], [], [], [], partial=False) + assert not fake_client.scan.called + + +def test_update_single_fv_issues_single_background_scan(): + store = AerospikeOnlineStore() + fake_client = MagicMock() + fake_scan = MagicMock() + fake_client.scan.return_value = fake_scan + store._client = fake_client + + store.update( + _aerospike_repo_config(), + [SimpleNamespace(name="old_fv")], + [], + [], + [], + partial=False, + ) + + assert fake_client.scan.call_args[0] == ("feast", "demo_latest") + ops = fake_scan.add_ops.call_args[0][0] + assert len(ops) == 2 + assert {op["bin"] for op in ops} == {"features", "event_ts"} + for op in ops: + assert op["key"] == "old_fv" + assert op["return_type"] == aerospike.MAP_RETURN_NONE + fake_scan.execute_background.assert_called_once() + + +def test_update_multi_fv_coalesces_into_one_scan(): + store = AerospikeOnlineStore() + fake_client = MagicMock() + fake_scan = MagicMock() + fake_client.scan.return_value = fake_scan + store._client = fake_client + + tables = [SimpleNamespace(name=n) for n in ("a", "b", "c")] + store.update(_aerospike_repo_config(), tables, [], [], [], partial=False) + + fake_client.scan.assert_called_once() + ops = 
fake_scan.add_ops.call_args[0][0] + assert len(ops) == 6 + assert {op["bin"] for op in ops} == {"features", "event_ts"} + assert {op["key"] for op in ops} == {"a", "b", "c"} + fake_scan.execute_background.assert_called_once() + + +def test_update_rejects_non_aerospike_config(): + wrong_config = RepoConfig( + project="demo", + provider="local", + registry="/tmp/reg.db", + online_store={"type": "sqlite", "path": "/tmp/online.db"}, + entity_key_serialization_version=3, + ) + store = AerospikeOnlineStore() + with pytest.raises(RuntimeError): + store.update( + wrong_config, + [SimpleNamespace(name="x")], + [], + [], + [], + partial=False, + ) + + +def test_teardown_truncates_and_closes_client(): + store = AerospikeOnlineStore() + fake_client = MagicMock() + store._client = fake_client + store.teardown(_aerospike_repo_config(), [], []) + assert fake_client.truncate.call_args[0] == ("feast", "demo_latest", 0) + fake_client.close.assert_called_once() + assert store._client is None + + +def test_teardown_rejects_non_aerospike_config(): + wrong_config = RepoConfig( + project="demo", + provider="local", + registry="/tmp/reg.db", + online_store={"type": "sqlite", "path": "/tmp/online.db"}, + entity_key_serialization_version=3, + ) + store = AerospikeOnlineStore() + with pytest.raises(RuntimeError): + store.teardown(wrong_config, [], []) + + +# --------------------------------------------------------------------------- +# Per-feature-view namespace + set overrides +# --------------------------------------------------------------------------- +# These let a deployment pin individual feature views to RAM-only / SSD-backed +# namespaces or isolate one view in its own set without splitting projects. 
+# Coverage: +# * the ``_namespace_for_fv`` / ``_set_name`` resolvers +# * read + write paths actually use the resolved ns/set on the wire +# * ``update()`` issues one scan per (ns, set) group +# * ``teardown()`` truncates every unique (ns, set) pair + + +def test_set_name_default_when_no_overrides(): + config = _aerospike_repo_config() + store = AerospikeOnlineStore() + assert store._set_name(config) == "demo_latest" + assert store._set_name(config, "any_fv") == "demo_latest" + + +def test_set_name_resolves_per_fv_override(): + config = _aerospike_repo_config( + set_overrides={"hot_fv": "demo_hot", "cold_fv": "demo_cold"} + ) + store = AerospikeOnlineStore() + assert store._set_name(config, "hot_fv") == "demo_hot" + assert store._set_name(config, "cold_fv") == "demo_cold" + # Unmapped FV still falls back to the project-level default. + assert store._set_name(config, "other_fv") == "demo_latest" + # Calling without an FV name explicitly asks for the project default. + assert store._set_name(config) == "demo_latest" + + +def test_namespace_for_fv_default_when_no_overrides(): + config = _aerospike_repo_config() + store = AerospikeOnlineStore() + assert store._namespace_for_fv(config) == "feast" + assert store._namespace_for_fv(config, "any_fv") == "feast" + + +def test_namespace_for_fv_resolves_per_fv_override(): + config = _aerospike_repo_config( + namespace_overrides={"hot_fv": "feast_ram", "cold_fv": "feast_ssd"} + ) + store = AerospikeOnlineStore() + assert store._namespace_for_fv(config, "hot_fv") == "feast_ram" + assert store._namespace_for_fv(config, "cold_fv") == "feast_ssd" + assert store._namespace_for_fv(config, "other_fv") == "feast" + + +def test_aerospike_key_honours_overrides_when_fv_passed(): + """``_aerospike_key`` is the choke point for everything that builds an + Aerospike (ns, set, user_key) tuple — it must respect the overrides + when an FV name is provided, and stay backwards-compatible without.""" + config = _aerospike_repo_config( + 
namespace_overrides={"hot_fv": "feast_ram"}, + set_overrides={"hot_fv": "demo_hot"}, + ) + store = AerospikeOnlineStore() + ek = _entity_key("driver_id", 1) + + default_ns, default_set, _ = store._aerospike_key(config, ek) + assert (default_ns, default_set) == ("feast", "demo_latest") + + hot_ns, hot_set, _ = store._aerospike_key(config, ek, fv_name="hot_fv") + assert (hot_ns, hot_set) == ("feast_ram", "demo_hot") + + +def test_online_write_batch_writes_to_per_fv_ns_and_set(): + """The wire keys produced by online_write_batch must use the FV's + resolved ns + set, not the store-level defaults.""" + config = _aerospike_repo_config( + namespace_overrides={"driver_stats": "feast_ram"}, + set_overrides={"driver_stats": "demo_hot"}, + ) + fv = SimpleNamespace(name="driver_stats") + store = AerospikeOnlineStore() + fake_client = MagicMock() + captured: dict = {} + + def fake_batch_write(batch): + captured["batch"] = batch + + fake_client.batch_write.side_effect = fake_batch_write + store._client = fake_client + + row = ( + _entity_key("id", 1), + {"x": ValueProto(int64_val=1)}, + datetime(2026, 1, 1, tzinfo=timezone.utc), + None, + ) + store.online_write_batch(config, fv, [row], progress=None) + + bw = captured["batch"].batch_records[0] + assert bw.key[:2] == ("feast_ram", "demo_hot") + + +def test_online_read_uses_per_fv_ns_and_set(): + """The ``batch_operate`` keys must use the FV's resolved ns + set.""" + config = _aerospike_repo_config( + namespace_overrides={"driver_stats": "feast_ram"}, + set_overrides={"driver_stats": "demo_hot"}, + ) + fv = _read_feature_view() # name="driver_stats" + store = AerospikeOnlineStore() + captured: dict = {} + + def fake_batch_operate(keys, ops): + captured["keys"] = keys + return SimpleNamespace(batch_records=[_fake_batch_record(keys[0], None)]) + + fake_client = MagicMock() + fake_client.batch_operate.side_effect = fake_batch_operate + store._client = fake_client + + store.online_read(config, fv, [_entity_key("driver_id", 1)]) + 
assert len(captured["keys"]) == 1 + assert captured["keys"][0][:2] == ("feast_ram", "demo_hot") + + +def test_update_groups_dropped_fvs_by_resolved_ns_and_set(): + """Dropped feature views in different (ns, set) buckets must produce + one background scan per bucket — a single combined scan would either + miss records or pointlessly scan unrelated namespaces.""" + config = _aerospike_repo_config( + namespace_overrides={"a": "ns_x"}, # b, c land on default ns + set_overrides={"b": "set_y"}, # a, c land on default set + ) + store = AerospikeOnlineStore() + fake_client = MagicMock() + + fake_scans: list = [] + + def make_scan(ns, set_name): + s = MagicMock(name=f"scan_{ns}_{set_name}") + s._ns = ns + s._set = set_name + fake_scans.append(s) + return s + + fake_client.scan.side_effect = make_scan + store._client = fake_client + + tables = [ + SimpleNamespace(name="a"), # (ns_x, demo_latest) + SimpleNamespace(name="b"), # (feast, set_y) + SimpleNamespace(name="c"), # (feast, demo_latest) + ] + store.update(config, tables, [], [], [], partial=False) + + targets = {(s._ns, s._set) for s in fake_scans} + assert targets == { + ("ns_x", "demo_latest"), + ("feast", "set_y"), + ("feast", "demo_latest"), + } + # Every group's scan must execute and ship the matching FVs' remove ops. 
+ by_target = {(s._ns, s._set): s for s in fake_scans} + for tgt, fv_name in [ + (("ns_x", "demo_latest"), "a"), + (("feast", "set_y"), "b"), + (("feast", "demo_latest"), "c"), + ]: + scan = by_target[tgt] + ops = scan.add_ops.call_args[0][0] + assert {op["bin"] for op in ops} == {"features", "event_ts"} + assert all(op["key"] == fv_name for op in ops) + scan.execute_background.assert_called_once() + + +def test_update_coalesces_fvs_sharing_a_resolved_target(): + """Two dropped FVs that resolve to the same (ns, set) must share one + scan — the per-FV grouping is "by target", not "by FV name".""" + config = _aerospike_repo_config( + namespace_overrides={"a": "ns_x", "b": "ns_x"}, + set_overrides={"a": "set_y", "b": "set_y"}, + ) + store = AerospikeOnlineStore() + fake_client = MagicMock() + fake_scan = MagicMock() + fake_client.scan.return_value = fake_scan + store._client = fake_client + + store.update( + config, + [SimpleNamespace(name="a"), SimpleNamespace(name="b")], + [], + [], + [], + partial=False, + ) + + fake_client.scan.assert_called_once_with("ns_x", "set_y") + ops = fake_scan.add_ops.call_args[0][0] + # 4 ops total: features+event_ts for each of the 2 FVs. 
+ assert len(ops) == 4 + assert {op["key"] for op in ops} == {"a", "b"} + + +def test_teardown_truncates_default_plus_every_overridden_pair(): + config = _aerospike_repo_config( + namespace_overrides={"hot": "feast_ram"}, + set_overrides={"isolated": "demo_iso"}, + ) + store = AerospikeOnlineStore() + fake_client = MagicMock() + store._client = fake_client + + tables = [ + SimpleNamespace(name="hot"), # (feast_ram, demo_latest) + SimpleNamespace(name="isolated"), # (feast, demo_iso) + SimpleNamespace(name="default"), # (feast, demo_latest) + ] + store.teardown(config, tables, []) + + truncated = {tuple(call.args[:2]) for call in fake_client.truncate.call_args_list} + assert truncated == { + ("feast", "demo_latest"), + ("feast_ram", "demo_latest"), + ("feast", "demo_iso"), + } + # cutoff is always 0 ("drop everything regardless of last-update time") + for call in fake_client.truncate.call_args_list: + assert call.args[2] == 0 + fake_client.close.assert_called_once() + + +def test_teardown_with_empty_tables_still_clears_default_pair(): + """A teardown call with an empty tables list (e.g. a bare ``feast + teardown`` on a repo whose registry is empty) must still clear the + store-default location, otherwise stale data lingers.""" + config = _aerospike_repo_config( + namespace_overrides={"unused": "feast_ram"}, + ) + store = AerospikeOnlineStore() + fake_client = MagicMock() + store._client = fake_client + + store.teardown(config, [], []) + + truncated = {tuple(call.args[:2]) for call in fake_client.truncate.call_args_list} + assert truncated == {("feast", "demo_latest")} + + +# --------------------------------------------------------------------------- +# Prewriting hook +# --------------------------------------------------------------------------- +# Hook-target functions must be module-level so the import-string resolver +# can find them via importlib + getattr. They're referenced by f"{__name__}.X". 
+ + +def _hook_uppercase_string_features(config, table, data): # noqa: ARG001 + """Sample hook: uppercase every string ValueProto.string_val on every row.""" + new_data = [] + for entity_key, values, ts, created in data: + new_values = {} + for k, v in values.items(): + if v.HasField("string_val"): + new_values[k] = ValueProto(string_val=v.string_val.upper()) + else: + new_values[k] = v + new_data.append((entity_key, new_values, ts, created)) + return new_data + + +def _hook_drop_all_rows(config, table, data): # noqa: ARG001 + """Sample hook that filters every row — exercises the post-hook empty-batch path.""" + return [] + + +def _hook_raises(config, table, data): # noqa: ARG001 + raise ValueError("hook intentionally failed") + + +_NOT_A_FUNCTION = 42 + + +def test_prewriting_hook_unset_returns_none(): + config = _aerospike_repo_config() # no prewriting_hook + store = AerospikeOnlineStore() + assert store._resolve_prewriting_hook(config) is None + # Must clear any previously-cached value so a config swap takes effect. + store._prewriting_hook = lambda *a, **kw: None # type: ignore[assignment] + store._prewriting_hook_spec = "stale.spec" + store._resolve_prewriting_hook(config) + assert store._prewriting_hook is None + assert store._prewriting_hook_spec is None + + +def test_prewriting_hook_resolves_and_caches(): + spec = f"{__name__}._hook_uppercase_string_features" + config = _aerospike_repo_config(prewriting_hook=spec) + store = AerospikeOnlineStore() + + hook = store._resolve_prewriting_hook(config) + assert hook is _hook_uppercase_string_features + + # Second call must hit the cache (no re-import). 
+ with patch( + "feast.infra.online_stores.aerospike_online_store.aerospike.importlib" + ) as m: + again = store._resolve_prewriting_hook(config) + assert again is hook + assert not m.import_module.called + + +def test_prewriting_hook_recompiles_after_spec_change(): + """If the config is rebound to a new hook string, the resolver must + re-resolve instead of returning the cached old callable.""" + store = AerospikeOnlineStore() + + cfg1 = _aerospike_repo_config( + prewriting_hook=f"{__name__}._hook_uppercase_string_features" + ) + assert store._resolve_prewriting_hook(cfg1) is _hook_uppercase_string_features + + cfg2 = _aerospike_repo_config(prewriting_hook=f"{__name__}._hook_drop_all_rows") + assert store._resolve_prewriting_hook(cfg2) is _hook_drop_all_rows + + +def test_prewriting_hook_bad_format_raises(): + config = _aerospike_repo_config(prewriting_hook="no_dot_here") + store = AerospikeOnlineStore() + with pytest.raises(ValueError, match="fully qualified import path"): + store._resolve_prewriting_hook(config) + + +def test_prewriting_hook_missing_module_raises(): + config = _aerospike_repo_config(prewriting_hook="definitely_not_a_real.module.fn") + store = AerospikeOnlineStore() + with pytest.raises(ValueError, match="could not import module"): + store._resolve_prewriting_hook(config) + + +def test_prewriting_hook_missing_attribute_raises(): + config = _aerospike_repo_config(prewriting_hook=f"{__name__}._does_not_exist") + store = AerospikeOnlineStore() + with pytest.raises(ValueError, match="has no attribute"): + store._resolve_prewriting_hook(config) + + +def test_prewriting_hook_non_callable_raises(): + config = _aerospike_repo_config(prewriting_hook=f"{__name__}._NOT_A_FUNCTION") + store = AerospikeOnlineStore() + with pytest.raises(TypeError, match="non-callable"): + store._resolve_prewriting_hook(config) + + +def test_prewriting_hook_transforms_data_before_write(): + """End-to-end: the hook's output is what lands on the wire, not the + caller-supplied 
data.""" + config = _aerospike_repo_config( + prewriting_hook=f"{__name__}._hook_uppercase_string_features" + ) + fv = SimpleNamespace(name="fv") + store = AerospikeOnlineStore() + fake_client = MagicMock() + captured: dict = {} + + def fake_batch_write(batch): + captured["batch"] = batch + + fake_client.batch_write.side_effect = fake_batch_write + store._client = fake_client + + row = ( + _entity_key("id", 1), + {"name": ValueProto(string_val="alice")}, + datetime(2026, 1, 1, tzinfo=timezone.utc), + None, + ) + store.online_write_batch(config, fv, [row], progress=None) + + bw = captured["batch"].batch_records[0] + map_put_op = next(op for op in bw.ops if op["bin"] == "features") + # The hook upper-cases string features. ``map_put_items`` stores its + # payload under the ``val`` key (mapping FV name -> feature submap). + fv_map = map_put_op["val"]["fv"] + assert fv_map == {"name": "ALICE"} + + +def test_prewriting_hook_returning_empty_short_circuits(): + """A hook that filters every row must skip the wire call and report + progress=0 — same shape as the empty-input fast path.""" + config = _aerospike_repo_config(prewriting_hook=f"{__name__}._hook_drop_all_rows") + fv = SimpleNamespace(name="fv") + store = AerospikeOnlineStore() + fake_client = MagicMock() + store._client = fake_client + + progress_calls: list[int] = [] + row = ( + _entity_key("id", 1), + {"x": ValueProto(int64_val=1)}, + datetime(2026, 1, 1, tzinfo=timezone.utc), + None, + ) + store.online_write_batch(config, fv, [row], progress=progress_calls.append) + + assert not fake_client.batch_write.called + assert progress_calls == [0] + + +def test_prewriting_hook_propagates_exceptions(): + """A raising hook must fail the whole batch — there's no per-row + fallback. 
``batch_write`` must never be called.""" + config = _aerospike_repo_config(prewriting_hook=f"{__name__}._hook_raises") + fv = SimpleNamespace(name="fv") + store = AerospikeOnlineStore() + fake_client = MagicMock() + store._client = fake_client + + row = ( + _entity_key("id", 1), + {"x": ValueProto(int64_val=1)}, + datetime(2026, 1, 1, tzinfo=timezone.utc), + None, + ) + with pytest.raises(ValueError, match="hook intentionally failed"): + store.online_write_batch(config, fv, [row], progress=None) + assert not fake_client.batch_write.called + + +def test_prewriting_hook_skipped_for_empty_input(): + """The hook resolver must not even be invoked when ``data`` is empty — + avoids paying the import cost for no-op writes.""" + config = _aerospike_repo_config( + prewriting_hook="should.never.resolve" # would raise if resolved + ) + fv = SimpleNamespace(name="fv") + store = AerospikeOnlineStore() + fake_client = MagicMock() + store._client = fake_client + + progress_calls: list[int] = [] + store.online_write_batch(config, fv, [], progress=progress_calls.append) + assert not fake_client.batch_write.called + assert progress_calls == [0] + + +# --------------------------------------------------------------------------- +# Async wrappers (run_in_executor) +# --------------------------------------------------------------------------- + + +async def test_online_write_batch_async_delegates_to_sync(): + """The async write path must produce identical side-effects to the sync one.""" + config = _aerospike_repo_config() + fv = SimpleNamespace(name="fv") + store = AerospikeOnlineStore() + fake_client = MagicMock() + store._client = fake_client + + row = ( + _entity_key("id", 1), + {"x": ValueProto(int64_val=1)}, + datetime(2026, 1, 1, tzinfo=timezone.utc), + None, + ) + progress_calls: list[int] = [] + await store.online_write_batch_async( + config, fv, [row], progress=progress_calls.append + ) + + assert fake_client.batch_write.called + assert progress_calls == [1] + + +async def 
test_online_read_async_returns_same_shape_as_sync(): + config = _aerospike_repo_config() + fv = _read_feature_view() + store = AerospikeOnlineStore() + ts = datetime(2026, 4, 20, tzinfo=timezone.utc) + ek = _entity_key("driver_id", 1) + key = store._aerospike_key(config, ek) + + def fake_batch_operate(keys, ops): + return SimpleNamespace( + batch_records=[ + _fake_batch_record( + key, + { + "features": {"rating": 4.91, "trips_last_7d": 132}, + "event_ts": _datetime_to_epoch_ms(ts), + }, + ) + ] + ) + + fake_client = MagicMock() + fake_client.batch_operate.side_effect = fake_batch_operate + store._client = fake_client + + results = await store.online_read_async(config, fv, [ek]) + assert len(results) == 1 + ts_out, feats = results[0] + assert ts_out == ts + assert feats["trips_last_7d"].int64_val == 132 + + +async def test_initialize_pre_warms_client(): + """initialize() must cause the client to connect without needing a read/write.""" + config = _aerospike_repo_config() + store = AerospikeOnlineStore() + assert store._client is None + + sentinel = MagicMock(name="warm_client") + store._get_client = MagicMock(return_value=sentinel) # type: ignore[assignment] + + await store.initialize(config) + store._get_client.assert_called_once_with(config) + + +async def test_close_is_noop_without_client(): + store = AerospikeOnlineStore() + assert store._client is None + await store.close() # must not raise + + +async def test_close_releases_client(): + store = AerospikeOnlineStore() + fake_client = MagicMock() + store._client = fake_client + + await store.close() + + fake_client.close.assert_called_once() + assert store._client is None + + +# --------------------------------------------------------------------------- +# End-to-end integration test — requires Docker +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def aerospike_container(): + """Start a real Aerospike CE container for end-to-end testing.""" + 
container = DockerContainer(AEROSPIKE_CE_IMAGE).with_exposed_ports("3000") + container.start() + wait_for_logs(container=container, predicate="migrations: complete", timeout=60) + yield container + container.stop() + + +@pytest.fixture +def aerospike_online_store_config(aerospike_container): + port = int(aerospike_container.get_exposed_port("3000")) + return { + "type": "aerospike", + "hosts": [("127.0.0.1", port)], + "namespace": "test", # default namespace shipped in the CE image + } + + +@_requires_docker +def test_aerospike_online_features(aerospike_online_store_config): + """Full round-trip: write via the provider, read via the feature store API.""" + runner = CliRunner() + with runner.local_repo( + get_example_repo("example_feature_repo_1.py"), + offline_store="file", + online_store="aerospike", + teardown=False, # container torn down by fixture + ) as store: + # Patch in the live container's port. + store.config.online_store.hosts = aerospike_online_store_config["hosts"] + store.config.online_store.namespace = aerospike_online_store_config["namespace"] + + driver_locations_fv = store.get_feature_view(name="driver_locations") + customer_profile_fv = store.get_feature_view(name="customer_profile") + customer_driver_combined_fv = store.get_feature_view( + name="customer_driver_combined" + ) + + provider = store._get_provider() + + driver_key = EntityKeyProto( + join_keys=["driver_id"], entity_values=[ValueProto(int64_val=1)] + ) + provider.online_write_batch( + config=store.config, + table=driver_locations_fv, + data=[ + ( + driver_key, + { + "lat": ValueProto(double_val=0.1), + "lon": ValueProto(string_val="1.0"), + }, + _utc_now(), + _utc_now(), + ) + ], + progress=None, + ) + + customer_key = EntityKeyProto( + join_keys=["customer_id"], entity_values=[ValueProto(string_val="5")] + ) + provider.online_write_batch( + config=store.config, + table=customer_profile_fv, + data=[ + ( + customer_key, + { + "avg_orders_day": ValueProto(float_val=1.0), + "name": 
ValueProto(string_val="John"), + "age": ValueProto(int64_val=3), + }, + _utc_now(), + _utc_now(), + ) + ], + progress=None, + ) + + combined_key = EntityKeyProto( + join_keys=["customer_id", "driver_id"], + entity_values=[ValueProto(string_val="5"), ValueProto(int64_val=1)], + ) + provider.online_write_batch( + config=store.config, + table=customer_driver_combined_fv, + data=[ + ( + combined_key, + {"trips": ValueProto(int64_val=7)}, + _utc_now(), + _utc_now(), + ) + ], + progress=None, + ) + + result = store.get_online_features( + features=[ + "driver_locations:lon", + "customer_profile:avg_orders_day", + "customer_profile:name", + "customer_driver_combined:trips", + ], + entity_rows=[ + {"driver_id": 1, "customer_id": "5"}, + {"driver_id": 1, "customer_id": 5}, + ], + full_feature_names=False, + ).to_dict() + + assert result["driver_id"] == [1, 1] + assert result["customer_id"] == ["5", "5"] + assert result["lon"] == ["1.0", "1.0"] + assert result["avg_orders_day"] == [1.0, 1.0] + assert result["name"] == ["John", "John"] + assert result["trips"] == [7, 7] + + missing = store.get_online_features( + features=["customer_driver_combined:trips"], + entity_rows=[{"driver_id": 0, "customer_id": 0}], + full_feature_names=False, + ).to_dict() + assert missing["trips"] == [None] + + +# --------------------------------------------------------------------------- +# Integration tests that exercise the store directly (no CliRunner/apply). +# --------------------------------------------------------------------------- + + +def _integration_repo_config( + aerospike_online_store_config: dict, collection_suffix: str +) -> RepoConfig: + """Build a RepoConfig targeting the live container with an isolated set name. + + Each test passes a unique ``collection_suffix`` so the module-scoped + Aerospike container can host multiple tests without cross-contamination. 
+ """ + return RepoConfig( + project="itest", + provider="local", + registry="/tmp/reg.db", + online_store={ + **aerospike_online_store_config, + "collection_suffix": collection_suffix, + }, + entity_key_serialization_version=3, + ) + + +def _multi_fv_feature_view(name: str) -> SimpleNamespace: + """Minimal FV duck-typed for the store: exposes .name and .features.""" + return SimpleNamespace( + name=name, + features=[ + SimpleNamespace(name="value", dtype=Int64), + ], + ) + + +@_requires_docker +def test_aerospike_cross_fv_map_cdt_upsert(aerospike_online_store_config): + """Writing two feature views for the same entity must not clobber each other. + + The store uses Aerospike Map CDT ops (``map_put_items``) rather than a + full-record ``put``, so two feature views sharing an entity key live in + separate slots of the ``features`` / ``event_ts`` maps. This test + exercises that guarantee end to end against a real server. + """ + config = _integration_repo_config(aerospike_online_store_config, "cross_fv") + store = AerospikeOnlineStore() + + fv_a = _multi_fv_feature_view("driver_stats") + fv_b = _multi_fv_feature_view("driver_geo") + ek = _entity_key("driver_id", 101) + ts = _utc_now() + + store.online_write_batch( + config, + fv_a, + [(ek, {"value": ValueProto(int64_val=42)}, ts, ts)], + progress=None, + ) + # Second write targets the SAME entity key under a different feature + # view — it must add a new map entry rather than overwrite fv_a. 
+ store.online_write_batch( + config, + fv_b, + [(ek, {"value": ValueProto(int64_val=7)}, ts, ts)], + progress=None, + ) + + results_a = store.online_read(config, fv_a, [ek]) + results_b = store.online_read(config, fv_b, [ek]) + + assert len(results_a) == 1 and len(results_b) == 1 + _, feats_a = results_a[0] + _, feats_b = results_b[0] + assert feats_a is not None, "driver_stats was clobbered by the driver_geo write" + assert feats_b is not None, "driver_geo write did not produce a readable slot" + assert feats_a["value"].int64_val == 42 + assert feats_b["value"].int64_val == 7 + + # Sanity-check the raw record too: both feature views should coexist in + # the ``features`` Map CDT under their own names. + client = store._get_client(config) + _, _, bins = client.get(store._aerospike_key(config, ek)) + assert set(bins["features"].keys()) == {"driver_stats", "driver_geo"} + assert set(bins["event_ts"].keys()) == {"driver_stats", "driver_geo"} + + store.teardown(config, [], []) + + +@_requires_docker +def test_aerospike_update_strips_dropped_feature_view(aerospike_online_store_config): + """``update(tables_to_delete=[...])`` removes an FV's slot from every record. + + The store issues a single background scan that applies + ``map_remove_by_key`` to the ``features`` and ``event_ts`` bins for each + dropped feature view. We verify: + + * The dropped FV's slot disappears from the record (read returns + ``(None, None)``). + * The kept FV is untouched (still readable with its original values). + * The underlying record itself still exists (background scan strips map + entries, not whole records).
+ """ + config = _integration_repo_config(aerospike_online_store_config, "update") + store = AerospikeOnlineStore() + + fv_keep = _multi_fv_feature_view("keep_fv") + fv_drop = _multi_fv_feature_view("drop_fv") + ek = _entity_key("driver_id", 202) + ts = _utc_now() + + store.online_write_batch( + config, + fv_keep, + [(ek, {"value": ValueProto(int64_val=1)}, ts, ts)], + progress=None, + ) + store.online_write_batch( + config, + fv_drop, + [(ek, {"value": ValueProto(int64_val=999)}, ts, ts)], + progress=None, + ) + + # Pre-condition: both feature views readable. + assert store.online_read(config, fv_keep, [ek])[0][1] is not None + assert store.online_read(config, fv_drop, [ek])[0][1] is not None + + store.update( + config=config, + tables_to_delete=[fv_drop], + tables_to_keep=[fv_keep], + entities_to_delete=[], + entities_to_keep=[], + partial=False, + ) + + # The background scan is asynchronous server-side. Poll up to ~10s for + # the drop_fv slot to disappear before failing — this matches how a real + # Feast caller would experience the post-apply propagation delay. + deadline = time.monotonic() + 10.0 + drop_result = store.online_read(config, fv_drop, [ek])[0] + while drop_result != (None, None) and time.monotonic() < deadline: + time.sleep(0.1) + drop_result = store.online_read(config, fv_drop, [ek])[0] + assert drop_result == (None, None), ( + f"drop_fv slot was not cleared by background scan within 10s; " + f"last result: {drop_result!r}" + ) + + # keep_fv must still be present with its original value. + keep_ts, keep_feats = store.online_read(config, fv_keep, [ek])[0] + assert keep_feats is not None, "keep_fv was removed by the scan" + assert keep_feats["value"].int64_val == 1 + assert keep_ts is not None + + # The record itself still exists — only the dropped FV's map entries + # were removed. Verify directly via a raw get so we notice if a future + # change accidentally escalates to a full-record delete. 
+ client = store._get_client(config) + _, _, bins = client.get(store._aerospike_key(config, ek)) + assert "drop_fv" not in bins["features"] + assert "drop_fv" not in bins["event_ts"] + assert "keep_fv" in bins["features"] + + store.teardown(config, [], []) diff --git a/sdk/python/tests/universal/feature_repos/universal/online_store/aerospike.py b/sdk/python/tests/universal/feature_repos/universal/online_store/aerospike.py new file mode 100644 index 00000000000..de28ad15163 --- /dev/null +++ b/sdk/python/tests/universal/feature_repos/universal/online_store/aerospike.py @@ -0,0 +1,58 @@ +# +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Any, Dict + +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs + +from tests.universal.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + +# Aerospike Community Edition — pin a known-good server image so CI stays +# deterministic; bump when Feast's Aerospike server baseline moves. +AEROSPIKE_CE_IMAGE = "aerospike/aerospike-server:8.0.0.10_1" + +# The Aerospike server prints this line once the default "test" namespace has +# finished starting up and the service is accepting client connections. +_READY_LOG_MARKER = "migrations: complete" + + +class AerospikeOnlineStoreCreator(OnlineStoreCreator): + """Spin up a single-node Aerospike CE cluster for universal online-store tests. 
+ + The CE image ships a pre-configured namespace called ``test``, which we + reuse as the Feast namespace (avoiding the need to mount a custom + ``aerospike.conf``). + """ + + def __init__(self, project_name: str, **kwargs): + super().__init__(project_name) + self.container = DockerContainer(AEROSPIKE_CE_IMAGE).with_exposed_ports("3000") + + def create_online_store(self) -> Dict[str, Any]: + self.container.start() + wait_for_logs(container=self.container, predicate=_READY_LOG_MARKER, timeout=60) + exposed_port = int(self.container.get_exposed_port("3000")) + return { + "type": "aerospike", + "hosts": [("127.0.0.1", exposed_port)], + "namespace": "test", + } + + def teardown(self): + self.container.stop()