1111# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212# See the License for the specific language governing permissions and
1313# limitations under the License.
14+ import itertools
1415import logging
1516from datetime import datetime
1617from typing import Any , Callable , Dict , List , Optional , Sequence , Tuple
@@ -50,10 +51,13 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
5051 """Online store type selector"""
5152
5253 region : StrictStr
53- """ AWS Region Name """
54+ """AWS Region Name"""
5455
5556 table_name_template : StrictStr = "{project}.{table_name}"
56- """ DynamoDB table name template """
57+ """DynamoDB table name template"""
58+
59+ sort_response : bool = True
60+ """Wether or not to sort BatchGetItem response."""
5761
5862
5963class DynamoDBOnlineStore (OnlineStore ):
@@ -63,10 +67,12 @@ class DynamoDBOnlineStore(OnlineStore):
6367 Attributes:
6468 _dynamodb_client: Boto3 DynamoDB client.
6569 _dynamodb_resource: Boto3 DynamoDB resource.
70+ _batch_size: Number of items to retrieve in a DynamoDB BatchGetItem call.
6671 """
6772
6873 _dynamodb_client = None
6974 _dynamodb_resource = None
75+ _batch_size = 40
7076
7177 @log_exceptions_and_usage (online_store = "dynamodb" )
7278 def update (
@@ -196,7 +202,6 @@ def online_read(
196202 table : FeatureView ,
197203 entity_keys : List [EntityKeyProto ],
198204 requested_features : Optional [List [str ]] = None ,
199- sort_response : bool = True ,
200205 ) -> List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]]:
201206 """
202207 Retrieve feature values from the online DynamoDB store.
@@ -208,7 +213,6 @@ def online_read(
208213 config: The RepoConfig for the current FeatureStore.
209214 table: Feast FeatureView.
210215 entity_keys: a list of entity keys that should be read from the FeatureStore.
211- sort_response: wether or not to sort DynamoDB responses by the entity_ids order.
212216 """
213217 online_config = config .online_store
214218 assert isinstance (online_config , DynamoDBOnlineStoreConfig )
@@ -219,36 +223,26 @@ def online_read(
219223
220224 result : List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]] = []
221225 entity_ids = [compute_entity_id (entity_key ) for entity_key in entity_keys ]
222-
223- len_entity_ids = len (entity_ids )
224- batch_size = 40
225- # Iterate until the end_index is the value len_entity_ids
226- iters = (
227- len_entity_ids // batch_size + 1
228- if len_entity_ids % batch_size > 0
229- else len_entity_ids // batch_size
230- )
231- for i in range (iters ):
232- start_index = min (i * batch_size , len_entity_ids )
233- end_index = min (i * batch_size + batch_size , len_entity_ids )
234-
226+
227+ batch_size = self ._batch_size
228+ sort_response = online_config .sort_response
229+ entity_ids_iter = iter (entity_ids )
230+ while True :
231+ batch = list (itertools .islice (entity_ids_iter , batch_size ))
232+ # No more items to insert
233+ if len (batch ) == 0 :
234+ break
235235 batch_entity_ids = {
236236 table_instance .name : {
237- "Keys" : [
238- {"entity_id" : entity_id }
239- for entity_id in entity_ids [start_index :end_index ]
240- ]
237+ "Keys" : [{"entity_id" : entity_id } for entity_id in batch ]
241238 }
242239 }
243-
244240 with tracing_span (name = "remote_call" ):
245241 response = dynamodb_resource .batch_get_item (
246242 RequestItems = batch_entity_ids
247243 )
248-
249244 response = response .get ("Responses" )
250245 table_responses = response .get (table_instance .name )
251-
252246 if table_responses :
253247 if sort_response :
254248 table_responses = self ._sort_dynamodb_response (
0 commit comments