Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/reference/online-stores/cassandra.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,44 @@ online_store:
```
{% endcode %}

### Example (Cassandra — multi-DC)

Use `datacenters` instead of `hosts` when your cluster spans multiple datacenters.
Each entry gets a named Cassandra **execution profile** (keyed by `local_dc`), enabling
per-DC routing. The default profile is set by `load_balancing.local_dc` (or the first
datacenter if `load_balancing` is absent). The keyspace must already exist.

{% code title="feature_store.yaml" %}
```yaml
project: my_feature_repo
registry: data/registry.db
provider: local
online_store:
type: cassandra
keyspace: KeyspaceName
datacenters:
- local_dc: dc1
hosts:
- 192.168.1.1
- 192.168.1.2
replication_factor: 3 # optional, informational
replication_strategy: NetworkTopologyStrategy # optional, informational
- local_dc: dc2
hosts:
- 10.0.0.1
replication_factor: 2 # optional, informational
port: 9042 # optional
username: user # optional
password: secret # optional
protocol_version: 5 # optional
load_balancing: # optional
local_dc: 'dc1' # selects the default DC profile
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
read_concurrency: 100 # optional
write_concurrency: 100 # optional
```
{% endcode %}

### Example (Astra DB)

{% code title="feature_store.yaml" %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@
E_CASSANDRA_UNKNOWN_LB_POLICY = (
"Unknown/unsupported Load Balancing Policy name in Cassandra configuration"
)
E_CASSANDRA_DC_DEFAULT_NOT_FOUND = (
"Cassandra multi-DC: 'load_balancing.local_dc' does not match any entry "
"in the 'datacenters' list"
)

# CQL command templates (that is, before replacing schema names)
INSERT_CQL_4_TEMPLATE = (
Expand Down Expand Up @@ -153,6 +157,61 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel):
request_timeout: Optional[StrictFloat] = None
"""Request timeout in seconds."""

class CassandraDatacenterConfig(FeastConfigBaseModel):
"""
Per-datacenter configuration for multi-DC Cassandra deployments.

When the top-level ``datacenters`` list is used, each entry defines
the contact points and replication settings for one datacenter.
A named Cassandra execution profile (keyed by ``local_dc``) is created
for every entry, enabling targeted per-DC routing via the driver's
execution-profile mechanism.
"""

local_dc: StrictStr
"""
Datacenter name as reported by the cluster (e.g. ``datacenter1``).
Also used as the named execution-profile key.
"""

hosts: List[StrictStr]
"""Contact-point host addresses belonging to this datacenter."""

replication_factor: Optional[StrictInt] = None
"""Replication factor for this datacenter (informational; keyspace must be pre-created)."""

replication_strategy: Optional[StrictStr] = None
"""
Replication strategy class (e.g. ``'SimpleStrategy'``,
``'NetworkTopologyStrategy'``). Informational; keyspace must be pre-created.
"""

datacenters: Optional[List[CassandraDatacenterConfig]] = None
"""
Per-datacenter configuration enabling multi-DC Cassandra support.

When set, a named Cassandra execution profile is registered for each
datacenter (profile name = ``local_dc``). The default profile is
determined by ``load_balancing.local_dc``; if ``load_balancing`` is
absent the first datacenter in the list is used as default.

Mutually exclusive with ``hosts`` and ``secure_bundle_path``.
The keyspace must already exist; Feast does not create it automatically.

Example::

datacenters:
- local_dc: dc1
hosts: [192.168.1.1, 192.168.1.2]
replication_factor: 3
- local_dc: dc2
hosts: [10.0.0.1]
replication_factor: 2
load_balancing:
local_dc: dc1
load_balancing_policy: TokenAwarePolicy(DCAwareRoundRobinPolicy)
"""

class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
"""
Configuration block related to the Cluster's load-balancing policy.
Expand All @@ -173,6 +232,9 @@ class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
"""
Details on the load-balancing policy: it will be
wrapped into an execution profile if present.

In multi-DC mode (``datacenters`` list), ``local_dc`` also selects
which datacenter's profile becomes the default execution profile.
"""

read_concurrency: Optional[StrictInt] = 100
Expand Down Expand Up @@ -208,6 +270,14 @@ class CassandraOnlineStore(OnlineStore):
_session: Session = None
_keyspace: str = "feast_keyspace"
_prepared_statements: Dict[str, PreparedStatement] = {}
_dc_execution_profiles: List[str] = []
"""
Named execution-profile keys registered in multi-DC mode, one per
datacenter (e.g. ``["dc1", "dc2"]``). Empty in single-DC / Astra mode.
Use these names to route driver-level queries to a specific datacenter::

session.execute(stmt, params, execution_profile="dc2")
"""

def _get_session(self, config: RepoConfig):
"""
Expand All @@ -223,6 +293,78 @@ def _get_session(self, config: RepoConfig):

if self._session:
return self._session

# ------------------------------------------------------------------
# Branch B: multi-DC mode — one named execution profile per datacenter
# ------------------------------------------------------------------
if online_store_config.datacenters:
port = online_store_config.port or 9042
keyspace = online_store_config.keyspace
username = online_store_config.username
password = online_store_config.password
protocol_version = online_store_config.protocol_version

if (username is None) ^ (password is None):
raise CassandraInvalidConfig(E_CASSANDRA_INCONSISTENT_AUTH)

auth_provider = (
PlainTextAuthProvider(username=username, password=password)
if username is not None
else None
)

_lbp_name = (
online_store_config.load_balancing.load_balancing_policy
if online_store_config.load_balancing
else "DCAwareRoundRobinPolicy"
)

execution_profiles: Dict[Any, ExecutionProfile] = {}
all_hosts: List[str] = []
for dc in online_store_config.datacenters:
all_hosts.extend(dc.hosts)
if _lbp_name == "DCAwareRoundRobinPolicy":
lb_policy = DCAwareRoundRobinPolicy(local_dc=dc.local_dc)
elif _lbp_name == "TokenAwarePolicy(DCAwareRoundRobinPolicy)":
lb_policy = TokenAwarePolicy(
DCAwareRoundRobinPolicy(local_dc=dc.local_dc)
)
else:
raise CassandraInvalidConfig(E_CASSANDRA_UNKNOWN_LB_POLICY)
execution_profiles[dc.local_dc] = ExecutionProfile(
request_timeout=online_store_config.request_timeout,
load_balancing_policy=lb_policy,
)
self._dc_execution_profiles.append(dc.local_dc)

default_dc = (
online_store_config.load_balancing.local_dc
if online_store_config.load_balancing
else online_store_config.datacenters[0].local_dc
)
if default_dc not in execution_profiles:
raise CassandraInvalidConfig(E_CASSANDRA_DC_DEFAULT_NOT_FOUND)
execution_profiles[EXEC_PROFILE_DEFAULT] = execution_profiles[default_dc]

cluster_kwargs = {
k: v
for k, v in {"protocol_version": protocol_version}.items()
if v is not None
}
self._cluster = Cluster(
all_hosts,
port=port,
auth_provider=auth_provider,
execution_profiles=execution_profiles,
**cluster_kwargs,
)
self._keyspace = keyspace
self._session = self._cluster.connect(self._keyspace)
return self._session

# ------------------------------------------------------------------
# Branch A: original single-DC / Astra DB (unchanged)
# ------------------------------------------------------------------
if not self._session:
# configuration consistency checks
hosts = online_store_config.hosts
Expand Down
Loading