Skip to content

Commit 40d25c6

Browse files
authored
feat: Support DynamoDB as online store in Go feature server (#5464)
* feat: integrate dynamodb as onlinestore Signed-off-by: iamcodingcat <joyh951021@gmail.com> * refact: move serializeEntityKey method to common Signed-off-by: iamcodingcat <joyh951021@gmail.com> * chore: update mod, sum files Signed-off-by: iamcodingcat <joyh951021@gmail.com> * test: dummy commit for passing PR integration test Signed-off-by: iamcodingcat <joyh951021@gmail.com> --------- Signed-off-by: iamcodingcat <joyh951021@gmail.com>
1 parent 90e627f commit 40d25c6

File tree

6 files changed

+351
-82
lines changed

6 files changed

+351
-82
lines changed

go.mod

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,21 @@ toolchain go1.22.5
66

77
require (
88
github.com/apache/arrow/go/v17 v17.0.0
9+
github.com/aws/aws-sdk-go-v2 v1.36.4
10+
github.com/aws/aws-sdk-go-v2/config v1.29.14
11+
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.3
12+
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3
913
github.com/ghodss/yaml v1.0.0
1014
github.com/golang/protobuf v1.5.4
1115
github.com/google/uuid v1.6.0
1216
github.com/mattn/go-sqlite3 v1.14.23
1317
github.com/pkg/errors v0.9.1
1418
github.com/redis/go-redis/v9 v9.6.1
19+
github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225
1520
github.com/rs/zerolog v1.33.0
1621
github.com/spaolacci/murmur3 v1.1.0
1722
github.com/stretchr/testify v1.9.0
23+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1
1824
google.golang.org/grpc v1.67.0
1925
google.golang.org/protobuf v1.34.2
2026
)
@@ -23,20 +29,18 @@ require (
2329
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
2430
github.com/andybalholm/brotli v1.1.0 // indirect
2531
github.com/apache/thrift v0.21.0 // indirect
26-
github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect
2732
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
28-
github.com/aws/aws-sdk-go-v2/config v1.29.14 // indirect
2933
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect
3034
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
31-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect
32-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect
35+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35 // indirect
36+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35 // indirect
3337
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
3438
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 // indirect
3539
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect
3640
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 // indirect
41+
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.16 // indirect
3742
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
3843
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect
39-
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 // indirect
4044
github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect
4145
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect
4246
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect
@@ -66,7 +70,6 @@ require (
6670
golang.org/x/text v0.18.0 // indirect
6771
golang.org/x/tools v0.25.0 // indirect
6872
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
69-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
7073
gopkg.in/yaml.v2 v2.4.0 // indirect
7174
gopkg.in/yaml.v3 v3.0.1 // indirect
7275
)

go.sum

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
88
github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
99
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
1010
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
11+
github.com/aws/aws-sdk-go-v2 v1.36.4 h1:GySzjhVvx0ERP6eyfAbAuAXLtAda5TEy19E5q5W8I9E=
12+
github.com/aws/aws-sdk-go-v2 v1.36.4/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
1113
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
1214
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14=
1315
github.com/aws/aws-sdk-go-v2/config v1.29.14 h1:f+eEi/2cKCg9pqKBoAIwRGzVb70MRKqWX4dg1BDcSJM=
@@ -18,16 +20,24 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mln
1820
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M=
1921
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q=
2022
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY=
23+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35 h1:o1v1VFfPcDVlK3ll1L5xHsaQAFdNtZ5GXnNR7SwueC4=
24+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35/go.mod h1:rZUQNYMNG+8uZxz9FOerQJ+FceCiodXvixpeRtdESrU=
2125
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0=
2226
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q=
27+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35 h1:R5b82ubO2NntENm3SAm0ADME+H630HomNJdgv+yZ3xw=
28+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35/go.mod h1:FuA+nmgMRfkzVKYDNEqQadvEMxtxl9+RLT9ribCwEMs=
2329
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo=
2430
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo=
2531
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 h1:ZNTqv4nIdE/DiBfUUfXcLZ/Spcuz+RjeziUtNJackkM=
2632
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34/go.mod h1:zf7Vcd1ViW7cPqYWEHLHJkS50X0JS2IKz9Cgaj6ugrs=
33+
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.3 h1:2FCJAT5wyPs5JjAFoLgaEB0MIiWvXiJ0T6PZiKDkJoo=
34+
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.3/go.mod h1:rUOhTo9+gtTYTMnGD+xiiks/2Z8vssPP+uSMNhJBbmI=
2735
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE=
2836
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA=
2937
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 h1:4nm2G6A4pV9rdlWzGMPv4BNtQp22v1hg3yrtkYpeLl8=
3038
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1/go.mod h1:iu6FSzgt+M2/x3Dk8zhycdIcHjEFb36IS8HVUVFoMg0=
39+
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.16 h1:TLsOzHW9zlJoMgjcKQI/7bolyv/DL0796y4NigWgaw8=
40+
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.16/go.mod h1:mNoiR5qsO9TxXZ6psjjQ3M+Zz7hURFTumXHF+UKjyAU=
3141
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM=
3242
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY=
3343
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 h1:moLQUoVq91LiqT1nbvzDukyqAlCv89ZmwaHw/ZFlFZg=
@@ -94,6 +104,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
94104
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
95105
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
96106
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
107+
github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225 h1:ZMsPCp7oYgjoIFt1c+sM2qojxZXotSYcMF8Ur9/LJlM=
108+
github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225/go.mod h1:XEESr+X1SY8ZSuc3jqsTlb3clCkqQJ4DcF3Qxv1N3PM=
97109
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
98110
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
99111
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
package onlinestore
2+
3+
import (
4+
"context"
5+
"encoding/hex"
6+
"fmt"
7+
awsConfig "github.com/aws/aws-sdk-go-v2/config"
8+
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
9+
dtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
10+
"github.com/feast-dev/feast/go/internal/feast/registry"
11+
"github.com/feast-dev/feast/go/protos/feast/serving"
12+
"github.com/feast-dev/feast/go/protos/feast/types"
13+
"github.com/roberson-io/mmh3"
14+
"golang.org/x/sync/errgroup"
15+
"golang.org/x/sync/semaphore"
16+
"google.golang.org/protobuf/proto"
17+
"google.golang.org/protobuf/types/known/timestamppb"
18+
"runtime"
19+
"sync"
20+
"time"
21+
)
22+
23+
type batchResult struct {
24+
index int
25+
response *dynamodb.BatchGetItemOutput
26+
err error
27+
}
28+
29+
type DynamodbOnlineStore struct {
30+
// Feast project name
31+
// TODO: Should we remove project as state that is tracked at the store level?
32+
project string
33+
34+
client *dynamodb.Client
35+
36+
config *registry.RepoConfig
37+
38+
// dynamodb configuration
39+
consistentRead *bool
40+
batchSize *int
41+
}
42+
43+
func NewDynamodbOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*DynamodbOnlineStore, error) {
44+
store := DynamodbOnlineStore{
45+
project: project,
46+
config: config,
47+
}
48+
49+
// aws configuration
50+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
51+
defer cancel()
52+
cfg, err := awsConfig.LoadDefaultConfig(ctx)
53+
if err != nil {
54+
panic(err)
55+
}
56+
store.client = dynamodb.NewFromConfig(cfg)
57+
58+
// dynamodb configuration
59+
consistentRead, ok := onlineStoreConfig["consistent_reads"].(bool)
60+
if !ok {
61+
consistentRead = false
62+
}
63+
store.consistentRead = &consistentRead
64+
65+
var batchSize int
66+
if batchSizeFloat, ok := onlineStoreConfig["batch_size"].(float64); ok {
67+
batchSize = int(batchSizeFloat)
68+
} else {
69+
batchSize = 40
70+
}
71+
store.batchSize = &batchSize
72+
73+
return &store, nil
74+
}
75+
76+
func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
77+
// prevent resource waste in case context is canceled earlier
78+
if ctx.Err() != nil {
79+
return nil, ctx.Err()
80+
}
81+
82+
results := make([][]FeatureData, len(entityKeys))
83+
84+
// serialize entity key into entity hash id
85+
entityIndexMap := make(map[string]int)
86+
entityIds := make([]string, 0, len(entityKeys))
87+
unprocessedEntityIds := make(map[string]bool)
88+
for i, entityKey := range entityKeys {
89+
serKey, err := serializeEntityKey(entityKey, d.config.EntityKeySerializationVersion)
90+
if err != nil {
91+
return nil, err
92+
}
93+
entityId := hex.EncodeToString(mmh3.Hashx64_128(*serKey, 0))
94+
entityIds = append(entityIds, entityId)
95+
entityIndexMap[entityId] = i
96+
unprocessedEntityIds[entityId] = false
97+
}
98+
99+
// metadata from feature views, feature names
100+
featureMap, featureNamesIndex, err := makeFeatureMeta(featureViewNames, featureNames)
101+
if err != nil {
102+
return nil, err
103+
}
104+
105+
// initialize `FeatureData` slice
106+
featureCount := len(featureNamesIndex)
107+
for i := 0; i < len(results); i++ {
108+
results[i] = make([]FeatureData, featureCount)
109+
}
110+
111+
// controls the maximum number of concurrent goroutines sending requests to DynamoDB using a semaphore
112+
cpuCount := runtime.NumCPU()
113+
sem := semaphore.NewWeighted(int64(cpuCount * 2))
114+
115+
var mu sync.Mutex
116+
for featureViewName, featureNames := range featureMap {
117+
tableName := fmt.Sprintf("%s.%s", d.project, featureViewName)
118+
119+
var batchGetItemInputs []*dynamodb.BatchGetItemInput
120+
batchSize := *d.batchSize
121+
for i := 0; i < len(entityIds); i += batchSize {
122+
end := i + batchSize
123+
if end > len(entityIds) {
124+
end = len(entityIds)
125+
}
126+
batchEntityIds := entityIds[i:end]
127+
entityIdBatch := make([]map[string]dtypes.AttributeValue, len(batchEntityIds))
128+
for i, entityId := range batchEntityIds {
129+
entityIdBatch[i] = map[string]dtypes.AttributeValue{
130+
"entity_id": &dtypes.AttributeValueMemberS{Value: entityId},
131+
}
132+
}
133+
batchGetItemInput := &dynamodb.BatchGetItemInput{
134+
RequestItems: map[string]dtypes.KeysAndAttributes{
135+
tableName: {
136+
Keys: entityIdBatch,
137+
ConsistentRead: d.consistentRead,
138+
},
139+
},
140+
}
141+
batchGetItemInputs = append(batchGetItemInputs, batchGetItemInput)
142+
}
143+
144+
// goroutines sending requests to DynamoDB
145+
errGroup, ctx := errgroup.WithContext(ctx)
146+
for i, batchGetItemInput := range batchGetItemInputs {
147+
_, batchGetItemInput := i, batchGetItemInput
148+
errGroup.Go(func() error {
149+
if err := sem.Acquire(ctx, 1); err != nil {
150+
return err
151+
}
152+
defer sem.Release(1)
153+
154+
resp, err := d.client.BatchGetItem(ctx, batchGetItemInput)
155+
if err != nil {
156+
return err
157+
}
158+
159+
// in case there is no entity id of a feature view in dynamodb
160+
batchSize := len(resp.Responses[tableName])
161+
if batchSize == 0 {
162+
return nil
163+
}
164+
165+
// process response from dynamodb
166+
for j := 0; j < batchSize; j++ {
167+
entityId := resp.Responses[tableName][j]["entity_id"].(*dtypes.AttributeValueMemberS).Value
168+
timestampString := resp.Responses[tableName][j]["event_ts"].(*dtypes.AttributeValueMemberS).Value
169+
t, err := time.Parse("2006-01-02 15:04:05-07:00", timestampString)
170+
if err != nil {
171+
return err
172+
}
173+
timeStamp := timestamppb.New(t)
174+
175+
featureValues := resp.Responses[tableName][j]["values"].(*dtypes.AttributeValueMemberM).Value
176+
entityIndex := entityIndexMap[entityId]
177+
178+
for _, featureName := range featureNames {
179+
featureValue := featureValues[featureName].(*dtypes.AttributeValueMemberB).Value
180+
var value types.Value
181+
if err := proto.Unmarshal(featureValue, &value); err != nil {
182+
return err
183+
}
184+
featureIndex := featureNamesIndex[featureName]
185+
186+
mu.Lock()
187+
results[entityIndex][featureIndex] = FeatureData{Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: featureName},
188+
Timestamp: timestamppb.Timestamp{Seconds: timeStamp.Seconds, Nanos: timeStamp.Nanos},
189+
Value: types.Value{Val: value.Val},
190+
}
191+
mu.Unlock()
192+
}
193+
194+
mu.Lock()
195+
delete(unprocessedEntityIds, entityId)
196+
mu.Unlock()
197+
}
198+
return nil
199+
})
200+
}
201+
if err := errGroup.Wait(); err != nil {
202+
return nil, err
203+
}
204+
205+
// process null imputation for entity ids that don't exist in dynamodb
206+
currentTime := timestamppb.Now() // TODO: should use a different timestamp?
207+
for entityId, _ := range unprocessedEntityIds {
208+
entityIndex := entityIndexMap[entityId]
209+
for _, featureName := range featureNames {
210+
featureIndex := featureNamesIndex[featureName]
211+
results[entityIndex][featureIndex] = FeatureData{Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: featureName},
212+
Timestamp: timestamppb.Timestamp{Seconds: currentTime.Seconds, Nanos: currentTime.Nanos},
213+
Value: types.Value{Val: &types.Value_NullVal{NullVal: types.Null_NULL}},
214+
}
215+
}
216+
}
217+
}
218+
219+
return results, nil
220+
}
221+
222+
func (d *DynamodbOnlineStore) Destruct() {
223+
224+
}
225+
226+
// makeFeatureMeta pairs featureViewNames[i] with featureNames[i] and returns
// two lookup tables built from those pairs:
//
//   - feature view name -> its feature names, in request order
//   - feature name -> position of that name in featureNames
//
// The second table is keyed by feature name alone, so when the same name
// occurs more than once only the last position is kept.
//
// An error (and nil maps) is returned when the two slices differ in length.
func makeFeatureMeta(featureViewNames []string, featureNames []string) (map[string][]string, map[string]int, error) {
	viewCount, nameCount := len(featureViewNames), len(featureNames)
	if viewCount != nameCount {
		return nil, nil, fmt.Errorf("the lengths of featureViewNames and featureNames must be the same. got=%d, %d", viewCount, nameCount)
	}

	namesByView := make(map[string][]string)
	positionByName := make(map[string]int)
	for pos, view := range featureViewNames {
		name := featureNames[pos]
		namesByView[view] = append(namesByView[view], name)
		positionByName[name] = pos
	}
	return namesByView, positionByName, nil
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package onlinestore
2+
3+
import (
4+
"testing"
5+
6+
"github.com/feast-dev/feast/go/internal/feast/registry"
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestNewDynamodbOnlineStore(t *testing.T) {
11+
var config = map[string]interface{}{
12+
"batch_size": 40,
13+
"region": "us-east-1",
14+
"max_pool_connections": 4,
15+
"consistent_reads": "true",
16+
}
17+
rc := &registry.RepoConfig{
18+
OnlineStore: config,
19+
EntityKeySerializationVersion: 2,
20+
}
21+
_, err := NewDynamodbOnlineStore("test", rc, config)
22+
assert.Nil(t, err)
23+
}

0 commit comments

Comments
 (0)