Skip to content

Commit eaf354c

Browse files
feat: Cassandra online store, concurrency in bulk write operations (feast-dev#3367)
Concurrency in materialize and any bulk write operation write_concurrency parameter in configuration and bootstrap guided procedure Signed-off-by: Stefano Lottini <stefano.lottini@datastax.com> Signed-off-by: Stefano Lottini <stefano.lottini@datastax.com> Co-authored-by: Stefano Lottini <stefano.lottini@datastax.com>
1 parent 0c20a4e commit eaf354c

File tree

5 files changed

+84
-56
lines changed

5 files changed

+84
-56
lines changed

docs/reference/online-stores/cassandra.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ online_store:
3333
local_dc: 'datacenter1' # optional
3434
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
3535
read_concurrency: 100 # optional
36+
write_concurrency: 100 # optional
3637
```
3738
{% endcode %}
3839
@@ -54,6 +55,7 @@ online_store:
5455
local_dc: 'eu-central-1' # optional
5556
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
5657
read_concurrency: 100 # optional
58+
write_concurrency: 100 # optional
5759
```
5860
{% endcode %}
5961

sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ online_store:
5959
local_dc: 'datacenter1' # optional
6060
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
6161
read_concurrency: 100 # optional
62+
write_concurrency: 100 # optional
6263
```
6364
6465
#### Astra DB setup:
@@ -86,6 +87,7 @@ online_store:
8687
local_dc: 'eu-central-1' # optional
8788
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
8889
read_concurrency: 100 # optional
90+
write_concurrency: 100 # optional
8991
```
9092
9193
#### Protocol version and load-balancing settings
@@ -113,13 +115,13 @@ The former parameter is a region name for Astra DB instances (as can be verified
113115
See the source code of the online store integration for the allowed values of
114116
the latter parameter.
115117

116-
#### Read concurrency value
118+
#### Read/write concurrency value
117119

118-
You can optionally specify the value of `read_concurrency`, which will be
119-
passed to the Cassandra driver function handling
120-
[concurrent reading of multiple entities](https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent).
121-
Consult the reference for guidance on this parameter (which in most cases can be left to its default value of 100).
122-
This is relevant only for retrieval of several entities at once.
120+
You can optionally specify the value of `read_concurrency` and `write_concurrency`,
121+
which will be passed to the Cassandra driver function handling
122+
[concurrent reading/writing of multiple entities](https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent).
123+
Consult the reference for guidance on this parameter (which in most cases can be left to its default value of).
124+
This is relevant only for retrieval of several entities at once and during bulk writes, such as in the materialization step.
123125

124126
### More info
125127

sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,15 @@ class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
170170
read_concurrency: Optional[StrictInt] = 100
171171
"""
172172
Value of the `concurrency` parameter internally passed to Cassandra driver's
173-
`execute_concurrent_with_args ` call.
173+
`execute_concurrent_with_args` call when reading rows from tables.
174+
See https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent .
175+
Default: 100.
176+
"""
177+
178+
write_concurrency: Optional[StrictInt] = 100
179+
"""
180+
Value of the `concurrency` parameter internally passed to Cassandra driver's
181+
`execute_concurrent_with_args` call when writing rows to tables.
174182
See https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent .
175183
Default: 100.
176184
"""
@@ -327,21 +335,37 @@ def online_write_batch(
327335
display progress.
328336
"""
329337
project = config.project
330-
for entity_key, values, timestamp, created_ts in data:
331-
entity_key_bin = serialize_entity_key(
332-
entity_key,
333-
entity_key_serialization_version=config.entity_key_serialization_version,
334-
).hex()
335-
with tracing_span(name="remote_call"):
336-
self._write_rows(
337-
config,
338-
project,
339-
table,
340-
entity_key_bin,
341-
values.items(),
342-
timestamp,
343-
created_ts,
344-
)
338+
339+
def unroll_insertion_tuples() -> Iterable[Tuple[str, bytes, str, datetime]]:
340+
"""
341+
We craft an iterable over all rows to be inserted (entities->features),
342+
but this way we can call `progress` after each entity is done.
343+
"""
344+
for entity_key, values, timestamp, created_ts in data:
345+
entity_key_bin = serialize_entity_key(
346+
entity_key,
347+
entity_key_serialization_version=config.entity_key_serialization_version,
348+
).hex()
349+
for feature_name, val in values.items():
350+
params: Tuple[str, bytes, str, datetime] = (
351+
feature_name,
352+
val.SerializeToString(),
353+
entity_key_bin,
354+
timestamp,
355+
)
356+
yield params
357+
# this happens N-1 times, will be corrected outside:
358+
if progress:
359+
progress(1)
360+
361+
with tracing_span(name="remote_call"):
362+
self._write_rows_concurrently(
363+
config,
364+
project,
365+
table,
366+
unroll_insertion_tuples(),
367+
)
368+
# correction for the last missing call to `progress`:
345369
if progress:
346370
progress(1)
347371

@@ -458,39 +482,24 @@ def _fq_table_name(keyspace: str, project: str, table: FeatureView) -> str:
458482
"""
459483
return f'"{keyspace}"."{project}_{table.name}"'
460484

461-
def _write_rows(
485+
def _write_rows_concurrently(
462486
self,
463487
config: RepoConfig,
464488
project: str,
465489
table: FeatureView,
466-
entity_key_bin: str,
467-
features_vals: Iterable[Tuple[str, ValueProto]],
468-
timestamp: datetime,
469-
created_ts: Optional[datetime],
490+
rows: Iterable[Tuple[str, bytes, str, datetime]],
470491
):
471-
"""
472-
Handle the CQL (low-level) insertion of feature values to a table.
473-
474-
Note: `created_ts` can be None: in that case we avoid explicitly
475-
inserting it to prevent unnecessary tombstone creation on Cassandra.
476-
Note: `created_ts` is being deprecated (July 2022) and the following
477-
reflects this fact.
478-
"""
479492
session: Session = self._get_session(config)
480493
keyspace: str = self._keyspace
481494
fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table)
482495
insert_cql = self._get_cql_statement(config, "insert4", fqtable=fqtable)
483-
for feature_name, val in features_vals:
484-
params: Sequence[object] = (
485-
feature_name,
486-
val.SerializeToString(),
487-
entity_key_bin,
488-
timestamp,
489-
)
490-
session.execute(
491-
insert_cql,
492-
params,
493-
)
496+
#
497+
execute_concurrent_with_args(
498+
session,
499+
insert_cql,
500+
rows,
501+
concurrency=config.online_store.write_concurrency,
502+
)
494503

495504
def _read_rows_by_entity_keys(
496505
self,

sdk/python/feast/templates/cassandra/bootstrap.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,17 @@ def collect_cassandra_store_settings():
122122
c_local_dc = None
123123
c_load_balancing_policy = None
124124

125-
needs_concurrency = click.confirm("Specify read concurrency level?", default=False)
126-
if needs_concurrency:
127-
c_concurrency = click.prompt(" Concurrency level?", default=100, type=int)
125+
specify_concurrency = click.confirm("Specify concurrency levels?", default=False)
126+
if specify_concurrency:
127+
c_r_concurrency = click.prompt(
128+
" Read-concurrency level?", default=100, type=int
129+
)
130+
c_w_concurrency = click.prompt(
131+
" Write-concurrency level?", default=100, type=int
132+
)
128133
else:
129-
c_concurrency = None
134+
c_r_concurrency = None
135+
c_w_concurrency = None
130136

131137
return {
132138
"c_secure_bundle_path": c_secure_bundle_path,
@@ -138,7 +144,8 @@ def collect_cassandra_store_settings():
138144
"c_protocol_version": c_protocol_version,
139145
"c_local_dc": c_local_dc,
140146
"c_load_balancing_policy": c_load_balancing_policy,
141-
"c_concurrency": c_concurrency,
147+
"c_r_concurrency": c_r_concurrency,
148+
"c_w_concurrency": c_w_concurrency,
142149
}
143150

144151

@@ -156,7 +163,8 @@ def apply_cassandra_store_settings(config_file, settings):
156163
'c_protocol_version'
157164
'c_local_dc'
158165
'c_load_balancing_policy'
159-
'c_concurrency'
166+
'c_r_concurrency'
167+
'c_w_concurrency'
160168
"""
161169
write_setting_or_remove(
162170
config_file,
@@ -224,12 +232,18 @@ def apply_cassandra_store_settings(config_file, settings):
224232
remove_lines_from_file(config_file, "load_balancing:")
225233
remove_lines_from_file(config_file, "local_dc:")
226234
remove_lines_from_file(config_file, "load_balancing_policy:")
227-
#
235+
228236
write_setting_or_remove(
229237
config_file,
230-
settings["c_concurrency"],
238+
settings["c_r_concurrency"],
231239
"read_concurrency",
232-
"100",
240+
"c_r_concurrency",
241+
)
242+
write_setting_or_remove(
243+
config_file,
244+
settings["c_w_concurrency"],
245+
"write_concurrency",
246+
"c_w_concurrency",
233247
)
234248

235249

sdk/python/feast/templates/cassandra/feature_repo/feature_store.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,6 @@ online_store:
1616
load_balancing:
1717
local_dc: c_local_dc
1818
load_balancing_policy: c_load_balancing_policy
19-
read_concurrency: 100
19+
read_concurrency: c_r_concurrency
20+
write_concurrency: c_w_concurrency
2021
entity_key_serialization_version: 2

0 commit comments

Comments
 (0)