7070E_CASSANDRA_UNKNOWN_LB_POLICY = (
7171 "Unknown/unsupported Load Balancing Policy name in Cassandra configuration"
7272)
73+ E_CASSANDRA_DC_DEFAULT_NOT_FOUND = (
74+ "Cassandra multi-DC: 'load_balancing.local_dc' does not match any entry "
75+ "in the 'datacenters' list"
76+ )
7377
7478# CQL command templates (that is, before replacing schema names)
7579INSERT_CQL_4_TEMPLATE = (
@@ -153,6 +157,61 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel):
153157 request_timeout : Optional [StrictFloat ] = None
154158 """Request timeout in seconds."""
155159
160+ class CassandraDatacenterConfig (FeastConfigBaseModel ):
161+ """
162+ Per-datacenter configuration for multi-DC Cassandra deployments.
163+
164+ When the top-level ``datacenters`` list is used, each entry defines
165+ the contact points and replication settings for one datacenter.
166+ A named Cassandra execution profile (keyed by ``local_dc``) is created
167+ for every entry, enabling targeted per-DC routing via the driver's
168+ execution-profile mechanism.
169+ """
170+
171+ local_dc : StrictStr
172+ """
173+ Datacenter name as reported by the cluster (e.g. ``datacenter1``).
174+ Also used as the named execution-profile key.
175+ """
176+
177+ hosts : List [StrictStr ]
178+ """Contact-point host addresses belonging to this datacenter."""
179+
180+ replication_factor : Optional [StrictInt ] = None
181+ """Replication factor for this datacenter (informational; keyspace must be pre-created)."""
182+
183+ replication_strategy : Optional [StrictStr ] = None
184+ """
185+ Replication strategy class (e.g. ``'SimpleStrategy'``,
186+ ``'NetworkTopologyStrategy'``). Informational; keyspace must be pre-created.
187+ """
188+
189+ datacenters : Optional [List [CassandraDatacenterConfig ]] = None
190+ """
191+ Per-datacenter configuration enabling multi-DC Cassandra support.
192+
193+ When set, a named Cassandra execution profile is registered for each
194+ datacenter (profile name = ``local_dc``). The default profile is
195+ determined by ``load_balancing.local_dc``; if ``load_balancing`` is
196+ absent the first datacenter in the list is used as default.
197+
198+ Mutually exclusive with ``hosts`` and ``secure_bundle_path``.
199+ The keyspace must already exist; Feast does not create it automatically.
200+
201+ Example::
202+
203+ datacenters:
204+ - local_dc: dc1
205+ hosts: [192.168.1.1, 192.168.1.2]
206+ replication_factor: 3
207+ - local_dc: dc2
208+ hosts: [10.0.0.1]
209+ replication_factor: 2
210+ load_balancing:
211+ local_dc: dc1
212+ load_balancing_policy: TokenAwarePolicy(DCAwareRoundRobinPolicy)
213+ """
214+
156215 class CassandraLoadBalancingPolicy (FeastConfigBaseModel ):
157216 """
158217 Configuration block related to the Cluster's load-balancing policy.
@@ -173,6 +232,9 @@ class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
173232 """
174233 Details on the load-balancing policy: it will be
175234 wrapped into an execution profile if present.
235+
236+ In multi-DC mode (``datacenters`` list), ``local_dc`` also selects
237+ which datacenter's profile becomes the default execution profile.
176238 """
177239
178240 read_concurrency : Optional [StrictInt ] = 100
@@ -208,6 +270,14 @@ class CassandraOnlineStore(OnlineStore):
208270 _session : Session = None
209271 _keyspace : str = "feast_keyspace"
210272 _prepared_statements : Dict [str , PreparedStatement ] = {}
273+ _dc_execution_profiles : List [str ] = []
274+ """
275+ Named execution-profile keys registered in multi-DC mode, one per
276+ datacenter (e.g. ``["dc1", "dc2"]``). Empty in single-DC / Astra mode.
277+ Use these names to route driver-level queries to a specific datacenter::
278+
279+ session.execute(stmt, params, execution_profile="dc2")
280+ """
211281
212282 def _get_session (self , config : RepoConfig ):
213283 """
@@ -223,6 +293,78 @@ def _get_session(self, config: RepoConfig):
223293
224294 if self ._session :
225295 return self ._session
296+
297+ # ------------------------------------------------------------------
298+ # Branch B: multi-DC mode — one named execution profile per datacenter
299+ # ------------------------------------------------------------------
300+ if online_store_config .datacenters :
301+ port = online_store_config .port or 9042
302+ keyspace = online_store_config .keyspace
303+ username = online_store_config .username
304+ password = online_store_config .password
305+ protocol_version = online_store_config .protocol_version
306+
307+ if (username is None ) ^ (password is None ):
308+ raise CassandraInvalidConfig (E_CASSANDRA_INCONSISTENT_AUTH )
309+
310+ auth_provider = (
311+ PlainTextAuthProvider (username = username , password = password )
312+ if username is not None
313+ else None
314+ )
315+
316+ _lbp_name = (
317+ online_store_config .load_balancing .load_balancing_policy
318+ if online_store_config .load_balancing
319+ else "DCAwareRoundRobinPolicy"
320+ )
321+
322+ execution_profiles : Dict [Any , ExecutionProfile ] = {}
323+ all_hosts : List [str ] = []
324+ for dc in online_store_config .datacenters :
325+ all_hosts .extend (dc .hosts )
326+ if _lbp_name == "DCAwareRoundRobinPolicy" :
327+ lb_policy = DCAwareRoundRobinPolicy (local_dc = dc .local_dc )
328+ elif _lbp_name == "TokenAwarePolicy(DCAwareRoundRobinPolicy)" :
329+ lb_policy = TokenAwarePolicy (
330+ DCAwareRoundRobinPolicy (local_dc = dc .local_dc )
331+ )
332+ else :
333+ raise CassandraInvalidConfig (E_CASSANDRA_UNKNOWN_LB_POLICY )
334+ execution_profiles [dc .local_dc ] = ExecutionProfile (
335+ request_timeout = online_store_config .request_timeout ,
336+ load_balancing_policy = lb_policy ,
337+ )
338+ self ._dc_execution_profiles .append (dc .local_dc )
339+
340+ default_dc = (
341+ online_store_config .load_balancing .local_dc
342+ if online_store_config .load_balancing
343+ else online_store_config .datacenters [0 ].local_dc
344+ )
345+ if default_dc not in execution_profiles :
346+ raise CassandraInvalidConfig (E_CASSANDRA_DC_DEFAULT_NOT_FOUND )
347+ execution_profiles [EXEC_PROFILE_DEFAULT ] = execution_profiles [default_dc ]
348+
349+ cluster_kwargs = {
350+ k : v
351+ for k , v in {"protocol_version" : protocol_version }.items ()
352+ if v is not None
353+ }
354+ self ._cluster = Cluster (
355+ all_hosts ,
356+ port = port ,
357+ auth_provider = auth_provider ,
358+ execution_profiles = execution_profiles ,
359+ ** cluster_kwargs ,
360+ )
361+ self ._keyspace = keyspace
362+ self ._session = self ._cluster .connect (self ._keyspace )
363+ return self ._session
364+
365+ # ------------------------------------------------------------------
366+ # Branch A: original single-DC / Astra DB (unchanged)
367+ # ------------------------------------------------------------------
226368 if not self ._session :
227369 # configuration consistency checks
228370 hosts = online_store_config .hosts
0 commit comments