perf: Parallelize read calls by table and batch #4619
Changes from 1 commit
Signed-off-by: Rob Howley <howley.robert@gmail.com>
```diff
@@ -302,6 +302,14 @@ async def online_read_async(
+        deserialize = TypeDeserializer().deserialize
+
+        def to_tbl_resp(raw_client_response):
+            return {
+                "entity_id": deserialize(raw_client_response["entity_id"]),
+                "event_ts": deserialize(raw_client_response["event_ts"]),
+                "values": deserialize(raw_client_response["values"]),
+            }
+
         batches = []
         entity_id_batches = []
         while True:
             batch = list(itertools.islice(entity_ids_iter, batch_size))
@@ -310,36 +318,30 @@ async def online_read_async(
             entity_id_batch = self._to_client_batch_get_payload(
                 online_config, table_name, batch
             )
             batches.append(batch)
             entity_id_batches.append(entity_id_batch)

         async with self._get_aiodynamodb_client(online_config.region) as client:
-            async def get_and_format(entity_id_batch):
-                def to_tbl_resp(raw_client_response):
-                    return {
-                        "entity_id": deserialize(raw_client_response["entity_id"]),
-                        "event_ts": deserialize(raw_client_response["event_ts"]),
-                        "values": deserialize(raw_client_response["values"]),
-                    }
-
-                response = await client.batch_get_item(
-                    RequestItems=entity_id_batch,
-                )
-                return self._process_batch_get_response(
-                    table_name,
-                    response,
-                    entity_ids,
-                    batch,
-                    to_tbl_response=to_tbl_resp,
-                )
-
-            result_batches = await asyncio.gather(
+            response_batches = await asyncio.gather(
                 *[
-                    get_and_format(entity_id_batch)
+                    client.batch_get_item(
+                        RequestItems=entity_id_batch,
+                    )
                     for entity_id_batch in entity_id_batches
                 ]
             )
```
**Comment on lines +325 to +332** (Contributor, Author):

> make those batch requests in parallel. note:
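The pattern the comment describes, issuing every batch request at once and awaiting them together, can be sketched with plain `asyncio`. Here `fetch` is a hypothetical stand-in for an async I/O call like `client.batch_get_item`:

```python
import asyncio


async def fetch(batch):
    # stand-in for an async network call (e.g. a DynamoDB batch_get_item);
    # the sleep simulates I/O latency
    await asyncio.sleep(0.01)
    return [x * 2 for x in batch]


async def main():
    batches = [[1, 2], [3, 4], [5, 6]]
    # fire all requests concurrently; gather returns results
    # in the same order as the input awaitables
    return await asyncio.gather(*[fetch(b) for b in batches])


responses = asyncio.run(main())
print(responses)  # [[2, 4], [6, 8], [10, 12]]
```

Because `asyncio.gather` preserves input order, the responses can later be paired back up with the batches that produced them.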
```diff
+
+            result_batches = []
+            for batch, response in zip(batches, response_batches):
+                result_batch = self._process_batch_get_response(
+                    table_name,
+                    response,
+                    entity_ids,
+                    batch,
+                    to_tbl_response=to_tbl_resp,
+                )
+                result_batches.append(result_batch)
```
**Comment on lines +335 to +343** (Contributor, Author):

> format the responses into the final shape. we iterate through the list three times instead of once, but make up for it by running the batch requests concurrently
```diff

         return list(itertools.chain(*result_batches))

     def _get_aioboto_session(self):
```
**Comment** (Contributor, Author):

> construct the batches of ids/entity_ids that we'll be looking up
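The batching loop in the diff (`itertools.islice` in a `while True` until the iterator is exhausted) can be sketched as a small standalone helper, here named `chunk` for illustration:

```python
import itertools


def chunk(iterable, batch_size):
    # mirrors the loop in the diff: pull batch_size items at a time
    # from a single iterator until it is exhausted
    it = iter(iterable)
    while True:
        batch = list(itertools.islice(it, batch_size))
        if not batch:
            break
        yield batch


chunks = list(chunk(range(7), 3))
print(chunks)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Note that `islice` consumes from the shared iterator, so successive calls naturally pick up where the previous batch left off, and the final batch may be shorter than `batch_size`.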