@@ -946,15 +946,15 @@ def test_dynamodb_online_store_online_read_many_batches(dynamodb_online_store):
946946
947947
948948@mock_dynamodb
949- def test_dynamodb_online_store_max_workers_capped_at_10 (dynamodb_online_store ):
950- """Verify ThreadPoolExecutor max_workers is capped at 10, not batch_size .
949+ def test_dynamodb_online_store_max_workers_capped_at_config (dynamodb_online_store ):
950+ """Verify ThreadPoolExecutor max_workers uses max_read_workers config .
951951
952952 Bug: Old code used min(len(batches), batch_size) which fails with small batch_size.
953- Fix: New code uses min(len(batches), 10) to ensure proper parallelization.
953+ Fix: New code uses min(len(batches), max_read_workers) for proper parallelization.
954954
955955 This test uses batch_size=5 with 15 batches to expose the bug:
956956 - OLD (buggy): max_workers = min(15, 5) = 5 (insufficient parallelism)
957- - NEW (fixed): max_workers = min(15, 10) = 10 (correct cap )
957+ - NEW (fixed): max_workers = min(15, 10) = 10 (uses max_read_workers default )
958958 """
959959 # Use small batch_size to expose the bug
960960 small_batch_config = RepoConfig (
@@ -998,13 +998,14 @@ def test_dynamodb_online_store_max_workers_capped_at_10(dynamodb_online_store):
998998
999999
10001000@mock_dynamodb
1001- def test_dynamodb_online_store_thread_safety_new_resource_per_thread (
1001+ def test_dynamodb_online_store_thread_safety_uses_shared_client (
10021002 dynamodb_online_store ,
10031003):
1004- """Verify each thread creates its own boto3 resource for thread-safety .
1004+ """Verify multi-batch reads use a shared thread-safe boto3 client .
10051005
1006- boto3 resources are NOT thread-safe, so we must create a new resource
1007- per thread when using ThreadPoolExecutor.
1006+ boto3 clients ARE thread-safe, so we share a single client across threads
1007+ for better performance (avoids creating new sessions per thread).
1008+ https://docs.aws.amazon.com/boto3/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients
10081009 """
10091010 config = RepoConfig (
10101011 registry = REGISTRY ,
@@ -1023,16 +1024,16 @@ def test_dynamodb_online_store_thread_safety_new_resource_per_thread(
10231024
10241025 entity_keys , features , * rest = zip (* data )
10251026
1026- # Track resources created to verify thread-safety
1027- resources_created = []
1028- original_init = boto3 .resource
1027+ # Track clients created to verify thread-safety via shared client
1028+ clients_created = []
1029+ original_client = boto3 .client
10291030
1030- def tracking_resource (* args , ** kwargs ):
1031- resource = original_init (* args , ** kwargs )
1032- resources_created .append (id (resource ))
1033- return resource
1031+ def tracking_client (* args , ** kwargs ):
1032+ client = original_client (* args , ** kwargs )
1033+ clients_created .append (id (client ))
1034+ return client
10341035
1035- with patch .object (boto3 , "resource " , side_effect = tracking_resource ):
1036+ with patch .object (boto3 , "client " , side_effect = tracking_client ):
10361037 returned_items = dynamodb_online_store .online_read (
10371038 config = config ,
10381039 table = MockFeatureView (name = db_table_name ),
@@ -1042,18 +1043,10 @@ def tracking_resource(*args, **kwargs):
10421043 # Verify results are correct (functional correctness)
10431044 assert len (returned_items ) == n_samples
10441045
1045- # Verify multiple resources were created (thread-safety)
1046- # Each of the 3 batches should create its own resource
1047- # (plus potentially 1 for _get_dynamodb_resource cache initialization)
1048- assert len (resources_created ) >= 3 , (
1049- f"Expected at least 3 unique resources for 3 batches, "
1050- f"got { len (resources_created )} "
1051- )
1052-
1053- # Verify the resources are actually different objects (not reused)
1054- # At least the batch resources should be unique
1055- unique_resources = set (resources_created )
1056- assert len (unique_resources ) >= 3 , (
1057- f"Expected at least 3 unique resource IDs, "
1058- f"got { len (unique_resources )} unique out of { len (resources_created )} "
1046+ # Verify exactly one distinct client object was created (shared across threads)
1047+ # The client is cached and reused for all batch requests (compared by id(); NOTE(review): ids can repeat once an object is freed)
1048+ dynamodb_clients = [c for c in clients_created ]
1049+ assert len (set (dynamodb_clients )) == 1 , (
1050+ f"Expected 1 shared client for thread-safety, "
1051+ f"got { len (set (dynamodb_clients ))} unique clients"
10591052 )
0 commit comments