diff --git a/go/internal/feast/onlinestore/dynamodbonlinestore.go b/go/internal/feast/onlinestore/dynamodbonlinestore.go index 36aed52cddf..6cea79b5067 100644 --- a/go/internal/feast/onlinestore/dynamodbonlinestore.go +++ b/go/internal/feast/onlinestore/dynamodbonlinestore.go @@ -11,20 +11,22 @@ import ( "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" "github.com/roberson-io/mmh3" + "github.com/rs/zerolog/log" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" + "math/rand" "runtime" "sync" "time" ) -type batchResult struct { - index int - response *dynamodb.BatchGetItemOutput - err error -} +const ( + maxRetriesDefault = 5 + initialBackoff = 50 * time.Millisecond + maxBackoff = 1 * time.Second +) type DynamodbOnlineStore struct { // Feast project name @@ -38,6 +40,7 @@ type DynamodbOnlineStore struct { // dynamodb configuration consistentRead *bool batchSize *int + maxRetries *int } func NewDynamodbOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*DynamodbOnlineStore, error) { @@ -70,6 +73,14 @@ func NewDynamodbOnlineStore(project string, config *registry.RepoConfig, onlineS } store.batchSize = &batchSize + var maxRetries int + if maxRetriesFloat, ok := onlineStoreConfig["max_retries"].(float64); ok { + maxRetries = int(maxRetriesFloat) + } else { + maxRetries = maxRetriesDefault + } + store.maxRetries = &maxRetries + return &store, nil } @@ -79,12 +90,12 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type return nil, ctx.Err() } + maxRetries := *d.maxRetries results := make([][]FeatureData, len(entityKeys)) // serialize entity key into entity hash id entityIndexMap := make(map[string]int) entityIds := make([]string, 0, len(entityKeys)) - unprocessedEntityIds := make(map[string]bool) for i, entityKey := range entityKeys { serKey, err := serializeEntityKey(entityKey, d.config.EntityKeySerializationVersion) if err != nil { @@ -93,7 +104,6 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type entityId := hex.EncodeToString(mmh3.Hashx64_128(*serKey, 0)) entityIds = append(entityIds, entityId) entityIndexMap[entityId] = i - unprocessedEntityIds[entityId] = false } // metadata from feature views, feature names @@ -116,6 +126,11 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type for featureViewName, featureNames := range featureMap { tableName := fmt.Sprintf("%s.%s", d.project, featureViewName) + unprocessedEntityIdsFeatureView := make(map[string]bool) + for _, entityId := range entityIds { + unprocessedEntityIdsFeatureView[entityId] = true + } + var batchGetItemInputs []*dynamodb.BatchGetItemInput batchSize := *d.batchSize for i := 0; i < len(entityIds); i += batchSize { @@ -151,28 +166,87 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type } defer sem.Release(1) + var Responses []map[string]dtypes.AttributeValue + var unprocessedKeys dtypes.KeysAndAttributes + + // response from initial request to dynamodb resp, err := d.client.BatchGetItem(ctx, batchGetItemInput) if err != nil { return err } + if len(resp.Responses[tableName]) > 0 { + Responses = append(Responses, resp.Responses[tableName]...) + } + if len(resp.UnprocessedKeys[tableName].Keys) > 0 { + unprocessedKeys = resp.UnprocessedKeys[tableName] + } + // retry about unprocessed key from initial request to dynamodb + retries := 0 + backoff := initialBackoff + jitterRand := rand.New(rand.NewSource(time.Now().UnixNano())) + for len(unprocessedKeys.Keys) > 0 && retries < maxRetries { + log.Info().Msgf("%d retry using exponential backoff to dynamodb", retries+1) + if err := ctx.Err(); err != nil { + return err + } + // jitter before retrying + jitter := time.Duration(jitterRand.Intn(100)) * time.Millisecond + waitDuration := backoff + jitter + timer := time.NewTimer(waitDuration) + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + } + + retries++ + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + retryBatchGetItemInput := &dynamodb.BatchGetItemInput{ + RequestItems: map[string]dtypes.KeysAndAttributes{ + tableName: unprocessedKeys, + }, + } + retryResp, err := d.client.BatchGetItem(ctx, retryBatchGetItemInput) + if err != nil { + log.Info().Msgf("BatchGetItem retry attempt(%d) failed for table %s. err: %v\n", retries, tableName, err) + continue + } + if len(retryResp.Responses[tableName]) > 0 { + Responses = append(Responses, retryResp.Responses[tableName]...) + } + // check unprocessed key in retried response again + if len(retryResp.UnprocessedKeys[tableName].Keys) > 0 { + unprocessedKeys = retryResp.UnprocessedKeys[tableName] + } else { + unprocessedKeys = dtypes.KeysAndAttributes{} + } + } + + if len(unprocessedKeys.Keys) > 0 { + return fmt.Errorf("failed to process %d keys from table %s after %d retries. keys=%+v\n", len(unprocessedKeys.Keys), tableName, maxRetries, unprocessedKeys.Keys) + } // in case there is no entity id of a feature view in dynamodb - batchSize := len(resp.Responses[tableName]) + batchSize := len(Responses) if batchSize == 0 { return nil } // process response from dynamodb for j := 0; j < batchSize; j++ { - entityId := resp.Responses[tableName][j]["entity_id"].(*dtypes.AttributeValueMemberS).Value - timestampString := resp.Responses[tableName][j]["event_ts"].(*dtypes.AttributeValueMemberS).Value + entityId := Responses[j]["entity_id"].(*dtypes.AttributeValueMemberS).Value + timestampString := Responses[j]["event_ts"].(*dtypes.AttributeValueMemberS).Value t, err := time.Parse("2006-01-02 15:04:05-07:00", timestampString) if err != nil { return err } timeStamp := timestamppb.New(t) - featureValues := resp.Responses[tableName][j]["values"].(*dtypes.AttributeValueMemberM).Value + featureValues := Responses[j]["values"].(*dtypes.AttributeValueMemberM).Value entityIndex := entityIndexMap[entityId] for _, featureName := range featureNames { @@ -192,7 +266,7 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type } mu.Lock() - delete(unprocessedEntityIds, entityId) + delete(unprocessedEntityIdsFeatureView, entityId) mu.Unlock() } return nil @@ -204,7 +278,7 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type // process null imputation for entity ids that don't exist in dynamodb currentTime := timestamppb.Now() // TODO: should use a different timestamp? - for entityId, _ := range unprocessedEntityIds { + for entityId, _ := range unprocessedEntityIdsFeatureView { entityIndex := entityIndexMap[entityId] for _, featureName := range featureNames { featureIndex := featureNamesIndex[featureName]