Skip to content

Commit ff7c7fa

Browse files
authored
chore: Go Sql Online Store (#2446)
* Initial structure for go sqlite online store Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Somewhat intermediate state Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add sqlite online store for go for testing Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Revert Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Revert Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Clean up Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Address review issues Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix/address issues Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Make integration test work Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * debugging Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Debug Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Debug Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Debug Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Debug Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Debug Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Debug Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Remove feature_repo files Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * update gitignore Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Clean up code Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update go mod Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update makefile Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix gitignore issue Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update makefile Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update makefile Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update makefile Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update makefile Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update makefile Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Revert worfklow Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update build path Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * remove Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * rename Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Address review Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * fix tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * see if this fixes test Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * revert Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Will add in separate pr to update cryptography Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
1 parent b95f441 commit ff7c7fa

File tree

12 files changed

+711
-39
lines changed

12 files changed

+711
-39
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ coverage.xml
105105
.hypothesis/
106106
.pytest_cache/
107107
infra/scripts/*.conf
108+
go/internal/test/feature_repo
108109

109110
# Translations
110111
*.mo
@@ -204,7 +205,7 @@ ui/.pnp
204205
ui/.pnp.js
205206
ui/coverage
206207
ui/build
207-
ui/feature_repo/data/online.db
208+
ui/feature_repo/data/online.db
208209
ui/feature_repo/registry.db
209210
ui/.vercel
210211

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,9 @@ compile-go-lib: install-go-proto-dependencies install-go-ci-dependencies
141141
python -m pip install pybindgen==0.22.0
142142
cd sdk/python && python setup.py build_go_lib
143143

144+
# Needs feast package to setup the feature store
144145
test-go: compile-protos-go
146+
pip install -e "sdk/python[ci]"
145147
go test ./...
146148

147149
format-go:

go.mod

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,25 @@ require (
88
github.com/go-python/gopy v0.4.0
99
github.com/go-redis/redis/v8 v8.11.4
1010
github.com/golang/protobuf v1.5.2
11-
github.com/google/uuid v1.1.2
11+
github.com/google/uuid v1.2.0
12+
github.com/mattn/go-sqlite3 v1.14.12
1213
github.com/spaolacci/murmur3 v1.1.0
1314
github.com/stretchr/testify v1.7.0
15+
github.com/xitongsys/parquet-go v1.6.2
16+
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
1417
google.golang.org/grpc v1.44.0
1518
google.golang.org/protobuf v1.27.1
1619
)
1720

1821
require (
22+
github.com/apache/thrift v0.16.0 // indirect
1923
github.com/cespare/xxhash/v2 v2.1.2 // indirect
2024
github.com/davecgh/go-spew v1.1.1 // indirect
2125
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
26+
github.com/golang/snappy v0.0.4 // indirect
2227
github.com/google/flatbuffers v2.0.0+incompatible // indirect
2328
github.com/google/go-cmp v0.5.7 // indirect
24-
github.com/klauspost/compress v1.13.6 // indirect
25-
github.com/kr/pretty v0.1.0 // indirect
29+
github.com/klauspost/compress v1.15.1 // indirect
2630
github.com/pierrec/lz4/v4 v4.1.9 // indirect
2731
github.com/pmezard/go-difflib v1.0.0 // indirect
2832
golang.org/x/exp v0.0.0-20211028214138-64b4c8e87d1a // indirect
@@ -31,7 +35,6 @@ require (
3135
golang.org/x/text v0.3.7 // indirect
3236
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
3337
google.golang.org/genproto v0.0.0-20220118154757-00ab72f36ad5 // indirect
34-
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
3538
gopkg.in/yaml.v2 v2.4.0 // indirect
3639
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
3740
)

go.sum

Lines changed: 210 additions & 5 deletions
Large diffs are not rendered by default.

go/cmd/server/server.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
56
"github.com/feast-dev/feast/go/internal/feast"
67
"github.com/feast-dev/feast/go/protos/feast/serving"
78
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
@@ -24,6 +25,9 @@ func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request
2425
}, nil
2526
}
2627

28+
// Returns an object containing the response to GetOnlineFeatures.
29+
// Metadata contains featurenames that corresponds to the number of rows in response.Results.
30+
// Results contains values including the value of the feature, the event timestamp, and feature status in a columnar format.
2731
func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) {
2832
featuresOrService, err := s.fs.ParseFeatures(request.GetKind())
2933
if err != nil {
@@ -36,6 +40,9 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s
3640
featuresOrService.FeatureService,
3741
request.GetEntities(),
3842
request.GetFullFeatureNames())
43+
if err != nil {
44+
return nil, err
45+
}
3946

4047
resp := &serving.GetOnlineFeaturesResponse{
4148
Results: make([]*serving.GetOnlineFeaturesResponse_FeatureVector, 0),

go/cmd/server/server_test.go

Lines changed: 78 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,42 @@ package main
22

33
import (
44
"context"
5+
"net"
6+
"path/filepath"
7+
"reflect"
8+
"runtime"
9+
"testing"
10+
511
"github.com/feast-dev/feast/go/internal/feast"
12+
"github.com/feast-dev/feast/go/internal/test"
613
"github.com/feast-dev/feast/go/protos/feast/serving"
14+
"github.com/feast-dev/feast/go/protos/feast/types"
715
"github.com/stretchr/testify/assert"
816
"google.golang.org/grpc"
917
"google.golang.org/grpc/test/bufconn"
10-
"net"
11-
"path/filepath"
12-
"runtime"
13-
"testing"
1418
)
1519

1620
// Return absolute path to the test_repo directory regardless of the working directory
17-
func getRepoPath() string {
21+
func getRepoPath(basePath string) string {
1822
// Get the file path of this source file, regardless of the working directory
19-
_, filename, _, ok := runtime.Caller(0)
20-
if !ok {
21-
panic("couldn't find file path of the test file")
23+
if basePath == "" {
24+
_, filename, _, ok := runtime.Caller(0)
25+
if !ok {
26+
panic("couldn't find file path of the test file")
27+
}
28+
return filepath.Join(filename, "..", "..", "feature_repo")
29+
} else {
30+
return filepath.Join(basePath, "feature_repo")
2231
}
23-
return filepath.Join(filename, "..", "..", "feature_repo")
2432
}
2533

26-
func getClient(ctx context.Context) (serving.ServingServiceClient, func()) {
34+
// Starts a new grpc server, registers the serving service and returns a client.
35+
func getClient(ctx context.Context, basePath string) (serving.ServingServiceClient, func()) {
2736
buffer := 1024 * 1024
2837
listener := bufconn.Listen(buffer)
2938

3039
server := grpc.NewServer()
31-
config, err := feast.NewRepoConfigFromFile(getRepoPath())
40+
config, err := feast.NewRepoConfigFromFile(getRepoPath(basePath))
3241
if err != nil {
3342
panic(err)
3443
}
@@ -58,21 +67,73 @@ func getClient(ctx context.Context) (serving.ServingServiceClient, func()) {
5867
}
5968

6069
func TestGetFeastServingInfo(t *testing.T) {
61-
t.Skip("@todo(achals): feature_repo isn't checked in yet")
6270
ctx := context.Background()
63-
client, closer := getClient(ctx)
71+
// Pregenerated using `feast init`.
72+
dir := "."
73+
err := test.SetupFeatureRepo(dir)
74+
assert.Nil(t, err)
75+
defer test.CleanUpRepo(dir)
76+
client, closer := getClient(ctx, dir)
6477
defer closer()
6578
response, err := client.GetFeastServingInfo(ctx, &serving.GetFeastServingInfoRequest{})
6679
assert.Nil(t, err)
6780
assert.Equal(t, feastServerVersion, response.Version)
6881
}
6982

70-
func TestGetOnlineFeatures(t *testing.T) {
71-
t.Skip("@todo(achals): feature_repo isn't checked in yet")
83+
func TestGetOnlineFeaturesSqlite(t *testing.T) {
7284
ctx := context.Background()
73-
client, closer := getClient(ctx)
85+
// Pregenerated using `feast init`.
86+
dir := "."
87+
err := test.SetupFeatureRepo(dir)
88+
assert.Nil(t, err)
89+
defer test.CleanUpRepo(dir)
90+
client, closer := getClient(ctx, dir)
7491
defer closer()
75-
response, err := client.GetOnlineFeatures(ctx, &serving.GetOnlineFeaturesRequest{})
92+
entities := make(map[string]*types.RepeatedValue)
93+
entities["driver_id"] = &types.RepeatedValue{
94+
Val: []*types.Value{
95+
{Val: &types.Value_Int64Val{Int64Val: 1001}},
96+
{Val: &types.Value_Int64Val{Int64Val: 1003}},
97+
{Val: &types.Value_Int64Val{Int64Val: 1005}},
98+
},
99+
}
100+
request := &serving.GetOnlineFeaturesRequest{
101+
Kind: &serving.GetOnlineFeaturesRequest_Features{
102+
Features: &serving.FeatureList{
103+
Val: []string{"driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate", "driver_hourly_stats:avg_daily_trips"},
104+
},
105+
},
106+
Entities: entities,
107+
}
108+
response, err := client.GetOnlineFeatures(ctx, request)
109+
expectedEntityValuesResp := []*types.Value{
110+
{Val: &types.Value_Int64Val{Int64Val: 1001}},
111+
{Val: &types.Value_Int64Val{Int64Val: 1003}},
112+
{Val: &types.Value_Int64Val{Int64Val: 1005}},
113+
}
114+
expectedFeatureNamesResp := []string{"driver_id", "conv_rate", "acc_rate", "avg_daily_trips"}
76115
assert.Nil(t, err)
77116
assert.NotNil(t, response)
117+
rows, err := test.ReadParquet(filepath.Join(dir, "feature_repo", "data", "driver_stats.parquet"))
118+
assert.Nil(t, err)
119+
entityKeys := map[int64]bool{1001: true, 1003: true, 1005: true}
120+
correctFeatures := test.GetLatestFeatures(rows, entityKeys)
121+
expectedConvRateValues := []*types.Value{}
122+
expectedAccRateValues := []*types.Value{}
123+
expectedAvgDailyTripsValues := []*types.Value{}
124+
125+
for _, key := range []int64{1001, 1003, 1005} {
126+
expectedConvRateValues = append(expectedConvRateValues, &types.Value{Val: &types.Value_FloatVal{FloatVal: correctFeatures[key].Conv_rate}})
127+
expectedAccRateValues = append(expectedAccRateValues, &types.Value{Val: &types.Value_FloatVal{FloatVal: correctFeatures[key].Acc_rate}})
128+
expectedAvgDailyTripsValues = append(expectedAvgDailyTripsValues, &types.Value{Val: &types.Value_Int64Val{Int64Val: int64(correctFeatures[key].Avg_daily_trips)}})
129+
}
130+
// Columnar so get in column format row by row should have column names of all features
131+
assert.Equal(t, len(response.Results), 4)
132+
133+
assert.True(t, reflect.DeepEqual(response.Results[0].Values, expectedEntityValuesResp))
134+
assert.True(t, reflect.DeepEqual(response.Results[1].Values, expectedConvRateValues))
135+
assert.True(t, reflect.DeepEqual(response.Results[2].Values, expectedAccRateValues))
136+
assert.True(t, reflect.DeepEqual(response.Results[3].Values, expectedAvgDailyTripsValues))
137+
138+
assert.True(t, reflect.DeepEqual(response.Metadata.FeatureNames.Val, expectedFeatureNamesResp))
78139
}

go/internal/feast/featurestore.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
198198
}
199199
result := make([]*FeatureVector, 0)
200200
arrowMemory := memory.NewGoAllocator()
201+
201202
for _, groupRef := range groupedRefs {
202203
featureData, err := fs.readFromOnlineStore(ctx, groupRef.entityKeys, groupRef.featureViewNames, groupRef.featureNames)
203204
if err != nil {
@@ -348,7 +349,6 @@ func (fs *FeatureStore) getFeatureViewsToUseByFeatureRefs(features []string, hid
348349
fvs := make(map[string]*FeatureView)
349350
requestFvs := make(map[string]*RequestFeatureView)
350351
odFvs := make(map[string]*OnDemandFeatureView)
351-
352352
featureViews, err := fs.listFeatureViews(hideDummyEntity)
353353
if err != nil {
354354
return nil, nil, nil, nil, err
@@ -405,7 +405,6 @@ func (fs *FeatureStore) getFeatureViewsToUseByFeatureRefs(features []string, hid
405405
" feature view %s and that you have registered it by running \"apply\"", featureViewName, featureViewName)
406406
}
407407
}
408-
409408
return fvs, fvsToUse, requestFvsToUse, odFvsToUse, nil
410409
}
411410

@@ -561,9 +560,9 @@ func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*p
561560
requestedFeatureNames []string,
562561
) ([][]FeatureData, error) {
563562
numRows := len(entityRows)
564-
entityRowsValue := make([]prototypes.EntityKey, numRows)
563+
entityRowsValue := make([]*prototypes.EntityKey, numRows)
565564
for index, entityKey := range entityRows {
566-
entityRowsValue[index] = prototypes.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues}
565+
entityRowsValue[index] = &prototypes.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues}
567566
}
568567
return fs.onlineStore.OnlineRead(ctx, entityRowsValue, requestedFeatureViewNames, requestedFeatureNames)
569568
}

go/internal/feast/onlinestore.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package feast
33
import (
44
"context"
55
"fmt"
6+
67
"github.com/feast-dev/feast/go/protos/feast/serving"
78
"github.com/feast-dev/feast/go/protos/feast/types"
89
"github.com/golang/protobuf/ptypes/timestamp"
@@ -33,7 +34,7 @@ type OnlineStore interface {
3334
// Feature object as pointers in GetOnlineFeaturesResponse)
3435
// => allocate memory for each field once in OnlineRead
3536
// and reuse them in GetOnlineFeaturesResponse?
36-
OnlineRead(ctx context.Context, entityKeys []types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error)
37+
OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error)
3738
// Destruct must be call once user is done using OnlineStore
3839
// This is to comply with the Connector since we have to close the plugin
3940
Destruct()
@@ -51,12 +52,13 @@ func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool)
5152
func NewOnlineStore(config *RepoConfig) (OnlineStore, error) {
5253
onlineStoreType, ok := getOnlineStoreType(config.OnlineStore)
5354
if !ok {
54-
return nil, fmt.Errorf("could not get online store type from online store config: %+v", config.OnlineStore)
55+
onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore)
56+
return onlineStore, err
5557
}
5658
if onlineStoreType == "redis" {
5759
onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore)
5860
return onlineStore, err
5961
} else {
60-
return nil, fmt.Errorf("%s online store type is currently not supported; only Redis is supported", onlineStoreType)
62+
return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType)
6163
}
6264
}

go/internal/feast/redisonlinestore.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@ import (
55
"encoding/binary"
66
"errors"
77
"fmt"
8+
"sort"
9+
"strconv"
10+
"strings"
11+
812
"github.com/feast-dev/feast/go/protos/feast/serving"
913
"github.com/feast-dev/feast/go/protos/feast/types"
1014
"github.com/go-redis/redis/v8"
1115
"github.com/golang/protobuf/proto"
1216
"github.com/spaolacci/murmur3"
1317
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
14-
"sort"
15-
"strconv"
16-
"strings"
1718
)
1819

1920
type redisType int
@@ -117,7 +118,7 @@ func getRedisType(onlineStoreConfig map[string]interface{}) (redisType, error) {
117118
return t, nil
118119
}
119120

120-
func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
121+
func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
121122
featureCount := len(featureNames)
122123
index := featureCount
123124
featureViewIndices := make(map[string]int)
@@ -152,7 +153,7 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []types.En
152153
redisKeyToEntityIndex := make(map[string]int)
153154
for i := 0; i < len(entityKeys); i++ {
154155

155-
var key, err = buildRedisKey(r.project, &entityKeys[i])
156+
var key, err = buildRedisKey(r.project, entityKeys[i])
156157
if err != nil {
157158
return nil, err
158159
}

0 commit comments

Comments
 (0)