Skip to content

Commit 1ad4040

Browse files
author
Himanshu Singh
committed
feat(cassandra): add multi-DC support via per-datacenter execution profiles
Introduce a `datacenters` config list as an alternative to the flat `hosts` field. Each entry specifies the contact points, local_dc, and optional replication metadata for one datacenter. On connect, a named Cassandra execution profile is registered per DC (profile key = local_dc), enabling callers to route driver-level queries to a specific datacenter. The default profile is pinned to `load_balancing.local_dc`, or the first DC when load_balancing is absent. All existing `hosts` / `secure_bundle_path` configs are fully backward compatible — the original code path is untouched. Also updates docs/reference/online-stores/cassandra.md with a multi-DC feature_store.yaml example. Signed-off-by: singhhimanshu0811 email : itssinghhimanshu@gmail.com
1 parent 5f1fa0d commit 1ad4040

2 files changed

Lines changed: 180 additions & 0 deletions

File tree

docs/reference/online-stores/cassandra.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,44 @@ online_store:
3737
```
3838
{% endcode %}
3939
40+
### Example (Cassandra — multi-DC)
41+
42+
Use `datacenters` instead of `hosts` when your cluster spans multiple datacenters.
43+
Each entry gets a named Cassandra **execution profile** (keyed by `local_dc`), enabling
44+
per-DC routing. The default profile is set by `load_balancing.local_dc` (or the first
45+
datacenter if `load_balancing` is absent). The keyspace must already exist.
46+
47+
{% code title="feature_store.yaml" %}
48+
```yaml
49+
project: my_feature_repo
50+
registry: data/registry.db
51+
provider: local
52+
online_store:
53+
type: cassandra
54+
keyspace: KeyspaceName
55+
datacenters:
56+
- local_dc: dc1
57+
hosts:
58+
- 192.168.1.1
59+
- 192.168.1.2
60+
replication_factor: 3 # optional, informational
61+
replication_strategy: NetworkTopologyStrategy # optional, informational
62+
- local_dc: dc2
63+
hosts:
64+
- 10.0.0.1
65+
replication_factor: 2 # optional, informational
66+
port: 9042 # optional
67+
username: user # optional
68+
password: secret # optional
69+
protocol_version: 5 # optional
70+
load_balancing: # optional
71+
local_dc: 'dc1' # selects the default DC profile
72+
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
73+
read_concurrency: 100 # optional
74+
write_concurrency: 100 # optional
75+
```
76+
{% endcode %}
77+
4078
### Example (Astra DB)
4179

4280
{% code title="feature_store.yaml" %}

sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@
7070
E_CASSANDRA_UNKNOWN_LB_POLICY = (
7171
"Unknown/unsupported Load Balancing Policy name in Cassandra configuration"
7272
)
73+
E_CASSANDRA_DC_DEFAULT_NOT_FOUND = (
74+
"Cassandra multi-DC: 'load_balancing.local_dc' does not match any entry "
75+
"in the 'datacenters' list"
76+
)
7377

7478
# CQL command templates (that is, before replacing schema names)
7579
INSERT_CQL_4_TEMPLATE = (
@@ -153,6 +157,61 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel):
153157
request_timeout: Optional[StrictFloat] = None
154158
"""Request timeout in seconds."""
155159

160+
class CassandraDatacenterConfig(FeastConfigBaseModel):
161+
"""
162+
Per-datacenter configuration for multi-DC Cassandra deployments.
163+
164+
When the top-level ``datacenters`` list is used, each entry defines
165+
the contact points and replication settings for one datacenter.
166+
A named Cassandra execution profile (keyed by ``local_dc``) is created
167+
for every entry, enabling targeted per-DC routing via the driver's
168+
execution-profile mechanism.
169+
"""
170+
171+
local_dc: StrictStr
172+
"""
173+
Datacenter name as reported by the cluster (e.g. ``datacenter1``).
174+
Also used as the named execution-profile key.
175+
"""
176+
177+
hosts: List[StrictStr]
178+
"""Contact-point host addresses belonging to this datacenter."""
179+
180+
replication_factor: Optional[StrictInt] = None
181+
"""Replication factor for this datacenter (informational; keyspace must be pre-created)."""
182+
183+
replication_strategy: Optional[StrictStr] = None
184+
"""
185+
Replication strategy class (e.g. ``'SimpleStrategy'``,
186+
``'NetworkTopologyStrategy'``). Informational; keyspace must be pre-created.
187+
"""
188+
189+
datacenters: Optional[List[CassandraDatacenterConfig]] = None
190+
"""
191+
Per-datacenter configuration enabling multi-DC Cassandra support.
192+
193+
When set, a named Cassandra execution profile is registered for each
194+
datacenter (profile name = ``local_dc``). The default profile is
195+
determined by ``load_balancing.local_dc``; if ``load_balancing`` is
196+
absent the first datacenter in the list is used as default.
197+
198+
Mutually exclusive with ``hosts`` and ``secure_bundle_path``.
199+
The keyspace must already exist; Feast does not create it automatically.
200+
201+
Example::
202+
203+
datacenters:
204+
- local_dc: dc1
205+
hosts: [192.168.1.1, 192.168.1.2]
206+
replication_factor: 3
207+
- local_dc: dc2
208+
hosts: [10.0.0.1]
209+
replication_factor: 2
210+
load_balancing:
211+
local_dc: dc1
212+
load_balancing_policy: TokenAwarePolicy(DCAwareRoundRobinPolicy)
213+
"""
214+
156215
class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
157216
"""
158217
Configuration block related to the Cluster's load-balancing policy.
@@ -173,6 +232,9 @@ class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
173232
"""
174233
Details on the load-balancing policy: it will be
175234
wrapped into an execution profile if present.
235+
236+
In multi-DC mode (``datacenters`` list), ``local_dc`` also selects
237+
which datacenter's profile becomes the default execution profile.
176238
"""
177239

178240
read_concurrency: Optional[StrictInt] = 100
@@ -208,6 +270,14 @@ class CassandraOnlineStore(OnlineStore):
208270
_session: Session = None
209271
_keyspace: str = "feast_keyspace"
210272
_prepared_statements: Dict[str, PreparedStatement] = {}
273+
_dc_execution_profiles: List[str] = []
274+
"""
275+
Named execution-profile keys registered in multi-DC mode, one per
276+
datacenter (e.g. ``["dc1", "dc2"]``). Empty in single-DC / Astra mode.
277+
Use these names to route driver-level queries to a specific datacenter::
278+
279+
session.execute(stmt, params, execution_profile="dc2")
280+
"""
211281

212282
def _get_session(self, config: RepoConfig):
213283
"""
@@ -223,6 +293,78 @@ def _get_session(self, config: RepoConfig):
223293

224294
if self._session:
225295
return self._session
296+
297+
# ------------------------------------------------------------------
298+
# Branch B: multi-DC mode — one named execution profile per datacenter
299+
# ------------------------------------------------------------------
300+
if online_store_config.datacenters:
301+
port = online_store_config.port or 9042
302+
keyspace = online_store_config.keyspace
303+
username = online_store_config.username
304+
password = online_store_config.password
305+
protocol_version = online_store_config.protocol_version
306+
307+
if (username is None) ^ (password is None):
308+
raise CassandraInvalidConfig(E_CASSANDRA_INCONSISTENT_AUTH)
309+
310+
auth_provider = (
311+
PlainTextAuthProvider(username=username, password=password)
312+
if username is not None
313+
else None
314+
)
315+
316+
_lbp_name = (
317+
online_store_config.load_balancing.load_balancing_policy
318+
if online_store_config.load_balancing
319+
else "DCAwareRoundRobinPolicy"
320+
)
321+
322+
execution_profiles: Dict[Any, ExecutionProfile] = {}
323+
all_hosts: List[str] = []
324+
for dc in online_store_config.datacenters:
325+
all_hosts.extend(dc.hosts)
326+
if _lbp_name == "DCAwareRoundRobinPolicy":
327+
lb_policy = DCAwareRoundRobinPolicy(local_dc=dc.local_dc)
328+
elif _lbp_name == "TokenAwarePolicy(DCAwareRoundRobinPolicy)":
329+
lb_policy = TokenAwarePolicy(
330+
DCAwareRoundRobinPolicy(local_dc=dc.local_dc)
331+
)
332+
else:
333+
raise CassandraInvalidConfig(E_CASSANDRA_UNKNOWN_LB_POLICY)
334+
execution_profiles[dc.local_dc] = ExecutionProfile(
335+
request_timeout=online_store_config.request_timeout,
336+
load_balancing_policy=lb_policy,
337+
)
338+
self._dc_execution_profiles.append(dc.local_dc)
339+
340+
default_dc = (
341+
online_store_config.load_balancing.local_dc
342+
if online_store_config.load_balancing
343+
else online_store_config.datacenters[0].local_dc
344+
)
345+
if default_dc not in execution_profiles:
346+
raise CassandraInvalidConfig(E_CASSANDRA_DC_DEFAULT_NOT_FOUND)
347+
execution_profiles[EXEC_PROFILE_DEFAULT] = execution_profiles[default_dc]
348+
349+
cluster_kwargs = {
350+
k: v
351+
for k, v in {"protocol_version": protocol_version}.items()
352+
if v is not None
353+
}
354+
self._cluster = Cluster(
355+
all_hosts,
356+
port=port,
357+
auth_provider=auth_provider,
358+
execution_profiles=execution_profiles,
359+
**cluster_kwargs,
360+
)
361+
self._keyspace = keyspace
362+
self._session = self._cluster.connect(self._keyspace)
363+
return self._session
364+
365+
# ------------------------------------------------------------------
366+
# Branch A: original single-DC / Astra DB (unchanged)
367+
# ------------------------------------------------------------------
226368
if not self._session:
227369
# configuration consistency checks
228370
hosts = online_store_config.hosts

0 commit comments

Comments
 (0)