Skip to content

[DynamoDB] BatchWriteItem operation: Provided list of item keys contains duplicates #2483

@TremaMiguel

Description

@TremaMiguel

Expected Behavior

Duplicate items should be handled when the same partition key appears more than once in the batch being written to DynamoDB.

Current Behavior

The following exception is raised when running the local test test_online_retrieval[LOCAL:File:dynamodb-True]:

botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the BatchWriteItem operation: Provided list of item keys contains duplicates

Steps to reproduce

This is the output from the pytest log

environment = Environment(name='integration_test_63b98a_1', test_repo_config=LOCAL:File:dynamodb, feature_store=<feast.feature_store...sal.data_sources.file.FileDataSourceCreator object at 0x7fb91d38f730>, python_feature_server=False, worker_id='master')
universal_data_sources = (UniversalEntities(customer_vals=[1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014, ...object at 0x7fb905f6b7c0>, field_mapping=<feast.infra.offline_stores.file_source.FileSource object at 0x7fb905f794f0>))
full_feature_names = True

    @pytest.mark.integration
    @pytest.mark.universal
    @pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
    def test_online_retrieval(environment, universal_data_sources, full_feature_names):
        fs = environment.feature_store
        entities, datasets, data_sources = universal_data_sources
        feature_views = construct_universal_feature_views(data_sources)
    
        feature_service = FeatureService(
            "convrate_plus100",
            features=[feature_views.driver[["conv_rate"]], feature_views.driver_odfv],
        )
        feature_service_entity_mapping = FeatureService(
            name="entity_mapping",
            features=[
                feature_views.location.with_name("origin").with_join_key_map(
                    {"location_id": "origin_id"}
                ),
                feature_views.location.with_name("destination").with_join_key_map(
                    {"location_id": "destination_id"}
                ),
            ],
        )
    
        feast_objects = []
        feast_objects.extend(feature_views.values())
        feast_objects.extend(
            [
                driver(),
                customer(),
                location(),
                feature_service,
                feature_service_entity_mapping,
            ]
        )
        fs.apply(feast_objects)
>       fs.materialize(
            environment.start_date - timedelta(days=1),
            environment.end_date + timedelta(days=1),
        )

sdk/python/tests/integration/online_store/test_universal_online.py:426: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
sdk/python/feast/feature_store.py:1165: in materialize
    provider.materialize_single_feature_view(
sdk/python/feast/infra/passthrough_provider.py:164: in materialize_single_feature_view
    self.online_write_batch(
sdk/python/feast/infra/passthrough_provider.py:86: in online_write_batch
    self.online_store.online_write_batch(config, table, data, progress)
sdk/python/feast/infra/online_stores/dynamodb.py:208: in online_write_batch
    progress(1)
../venv/lib/python3.9/site-packages/boto3/dynamodb/table.py:168: in __exit__
    self._flush()
../venv/lib/python3.9/site-packages/boto3/dynamodb/table.py:144: in _flush
    response = self._client.batch_write_item(
../venv/lib/python3.9/site-packages/botocore/client.py:395: in _api_call
    return self._make_api_call(operation_name, kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <botocore.client.DynamoDB object at 0x7fb9056a0eb0>, operation_name = 'BatchWriteItem'
api_params = {'RequestItems': {'integration_test_63b98a_1.global_stats': [{'PutRequest': {'Item': {'entity_id': '361ad244a817acdb9c...Item': {'entity_id': '361ad244a817acdb9cb041cf7ee8b4b0', 'event_ts': '2022-04-03 16:00:00+00:00', 'values': {...}}}}]}}

    def _make_api_call(self, operation_name, api_params):
        operation_model = self._service_model.operation_model(operation_name)
        service_name = self._service_model.service_name
        history_recorder.record('API_CALL', {
            'service': service_name,
            'operation': operation_name,
            'params': api_params,
        })
        if operation_model.deprecated:
            logger.debug('Warning: %s.%s() is deprecated',
                         service_name, operation_name)
        request_context = {
            'client_region': self.meta.region_name,
            'client_config': self.meta.config,
            'has_streaming_input': operation_model.has_streaming_input,
            'auth_type': operation_model.auth_type,
        }
        request_dict = self._convert_to_request_dict(
            api_params, operation_model, context=request_context)
        resolve_checksum_context(request_dict, operation_model, api_params)
    
        service_id = self._service_model.service_id.hyphenize()
        handler, event_response = self.meta.events.emit_until_response(
            'before-call.{service_id}.{operation_name}'.format(
                service_id=service_id,
                operation_name=operation_name),
            model=operation_model, params=request_dict,
            request_signer=self._request_signer, context=request_context)
    
        if event_response is not None:
            http, parsed_response = event_response
        else:
            apply_request_checksum(request_dict)
            http, parsed_response = self._make_request(
                operation_model, request_dict, request_context)
    
        self.meta.events.emit(
            'after-call.{service_id}.{operation_name}'.format(
                service_id=service_id,
                operation_name=operation_name),
            http_response=http, parsed=parsed_response,
            model=operation_model, context=request_context
        )
    
        if http.status_code >= 300:
            error_code = parsed_response.get("Error", {}).get("Code")
            error_class = self.exceptions.from_code(error_code)
>           raise error_class(parsed_response, operation_name)
E           botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the BatchWriteItem operation: Provided list of item keys contains duplicates

Specifications

  • Version: feast 0.18.1
  • Platform: Windows

Possible Solution

Overwrite by partition key in the DynamoDB.online_write_batch() method:

with table_instance.batch_writer(overwrite_by_pkeys=["entity_id"]) as batch:
    for entity_key, features, timestamp, created_ts in data:
                entity_id = compute_entity_id(entity_key)

This solution comes from StackOverflow

Other Comments

This error came up while developing #2358; I can provide a solution to both in the same PR if possible.

Metadata

Metadata

Assignees

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions