Skip to content

Commit b35e1e8

Browse files
refactor: Go server cleanup (feast-dev#2397)
* Rename goserver folder to server Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Various comments and other small cleanup Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Format errors to be lower case and not end with punctuation Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Format and other small cleanup Signed-off-by: Felix Wang <wangfelix98@gmail.com> * More cleanup Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Fix constant Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Switch to gRPC errors Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Rename ParsedKind struct to Features Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Cleanup Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Rename goserver binary to server Signed-off-by: Felix Wang <wangfelix98@gmail.com>
1 parent c2389da commit b35e1e8

14 files changed

Lines changed: 87 additions & 110 deletions

File tree

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,11 @@ test-python-universal:
7777
FEAST_USAGE=False IS_TEST=True python -m pytest -n 8 --integration --universal sdk/python/tests
7878

7979
test-python-go-server:
80-
go build -o ${ROOT_DIR}/sdk/python/feast/binaries/goserver github.com/feast-dev/feast/go/cmd/goserver
80+
go build -o ${ROOT_DIR}/sdk/python/feast/binaries/server github.com/feast-dev/feast/go/cmd/server
8181
FEAST_USAGE=False IS_TEST=True python -m pytest -n 8 --integration --goserver sdk/python/tests
8282

8383
test-python-go-server-lifecycle:
84-
go build -o ${ROOT_DIR}/sdk/python/feast/binaries/goserver github.com/feast-dev/feast/go/cmd/goserver
84+
go build -o ${ROOT_DIR}/sdk/python/feast/binaries/server github.com/feast-dev/feast/go/cmd/server
8585
FEAST_USAGE=False IS_TEST=True python -m pytest -n 8 --integration --goserverlifecycle sdk/python/tests
8686

8787
format-python:
@@ -137,7 +137,7 @@ compile-protos-go: install-go-proto-dependencies install-protoc-dependencies
137137

138138
compile-go-feature-server: compile-protos-go
139139
go mod tidy
140-
go build -o ${ROOT_DIR}/sdk/python/feast/binaries/goserver github.com/feast-dev/feast/go/cmd/goserver
140+
go build -o ${ROOT_DIR}/sdk/python/feast/binaries/server github.com/feast-dev/feast/go/cmd/server
141141

142142
test-go: compile-protos-go
143143
go test ./...
@@ -146,7 +146,7 @@ format-go:
146146
gofmt -s -w go/
147147

148148
lint-go: compile-protos-go
149-
go vet ./go/internal/feast ./go/cmd/goserver
149+
go vet ./go/internal/feast ./go/cmd/server
150150

151151
# Docker
152152

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ require (
77
github.com/go-redis/redis/v8 v8.11.4
88
github.com/golang/protobuf v1.5.2
99
github.com/google/uuid v1.1.2
10-
github.com/kelseyhightower/envconfig v1.4.0
1110
github.com/spaolacci/murmur3 v1.1.0
1211
github.com/stretchr/testify v1.7.0
1312
google.golang.org/grpc v1.44.0

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
6666
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
6767
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
6868
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
69-
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
70-
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
7169
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
7270
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
7371
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,47 +4,37 @@ import (
44
"fmt"
55
"github.com/feast-dev/feast/go/internal/feast"
66
"github.com/feast-dev/feast/go/protos/feast/serving"
7-
"github.com/kelseyhightower/envconfig"
87
"google.golang.org/grpc"
98
"log"
109
"net"
10+
"os"
1111
)
1212

1313
const (
1414
flagFeastRepoPath = "FEAST_REPO_PATH"
1515
flagFeastRepoConfig = "FEAST_REPO_CONFIG"
16+
flagFeastSockFile = "FEAST_GRPC_SOCK_FILE"
1617
feastServerVersion = "0.18.0"
1718
)
1819

19-
type FeastEnvConfig struct {
20-
RepoPath string `envconfig:"FEAST_REPO_PATH"`
21-
RepoConfig string `envconfig:"FEAST_REPO_CONFIG"`
22-
SockFile string `envconfig:"FEAST_GRPC_SOCK_FILE"`
23-
}
24-
2520
// TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus
2621
func main() {
27-
28-
var feastEnvConfig FeastEnvConfig
29-
var err error
30-
err = envconfig.Process("feast", &feastEnvConfig)
31-
if err != nil {
32-
log.Fatal(err)
33-
}
34-
if feastEnvConfig.RepoPath == "" && feastEnvConfig.RepoConfig == "" {
22+
repoPath := os.Getenv(flagFeastRepoPath)
23+
repoConfigJSON := os.Getenv(flagFeastRepoConfig)
24+
sockFile := os.Getenv(flagFeastSockFile)
25+
if repoPath == "" && repoConfigJSON == "" {
3526
log.Fatalln(fmt.Sprintf("One of %s of %s environment variables must be set", flagFeastRepoPath, flagFeastRepoConfig))
3627
}
37-
// TODO(Ly): Review: Should we return and error here if both repoPath and repoConfigJson are set and use the cwd for NewRepoConfigFromJson?
3828

3929
var repoConfig *feast.RepoConfig
40-
41-
if len(feastEnvConfig.RepoConfig) > 0 {
42-
repoConfig, err = feast.NewRepoConfigFromJson(feastEnvConfig.RepoPath, feastEnvConfig.RepoConfig)
30+
var err error
31+
if repoConfigJSON != "" {
32+
repoConfig, err = feast.NewRepoConfigFromJSON(repoPath, repoConfigJSON)
4333
if err != nil {
4434
log.Fatalln(err)
4535
}
4636
} else {
47-
repoConfig, err = feast.NewRepoConfigFromFile(feastEnvConfig.RepoPath)
37+
repoConfig, err = feast.NewRepoConfigFromFile(repoPath)
4838
if err != nil {
4939
log.Fatalln(err)
5040
}
@@ -56,7 +46,7 @@ func main() {
5646
log.Fatalln(err)
5747
}
5848
defer fs.DestructOnlineStore()
59-
startGrpcServer(fs, feastEnvConfig.SockFile)
49+
startGrpcServer(fs, sockFile)
6050
}
6151

6252
func startGrpcServer(fs *feast.FeatureStore, sockFile string) {

go/internal/feast/basefeatureview.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package feast
22

33
import (
4-
"errors"
54
"fmt"
65
"github.com/feast-dev/feast/go/protos/feast/core"
76
)
@@ -25,18 +24,18 @@ func NewBaseFeatureView(name string, featureProtos []*core.FeatureSpecV2) *BaseF
2524

2625
func (fv *BaseFeatureView) withProjection(projection *FeatureViewProjection) (*BaseFeatureView, error) {
2726
if projection.name != fv.name {
28-
return nil, errors.New(fmt.Sprintf("The projection for the %s FeatureView cannot be applied because it differs in name. "+
29-
"The projection is named %s and the name indicates which "+
30-
"FeatureView the projection is for.", fv.name, projection.name))
27+
return nil, fmt.Errorf("the projection for the %s FeatureView cannot be applied because it differs "+
28+
"in name; the projection is named %s and the name indicates which "+
29+
"FeatureView the projection is for", fv.name, projection.name)
3130
}
3231
features := make(map[string]bool)
3332
for _, feature := range fv.features {
3433
features[feature.name] = true
3534
}
3635
for _, feature := range projection.features {
3736
if _, ok := features[feature.name]; !ok {
38-
return nil, errors.New(fmt.Sprintf("The projection for %s cannot be applied because it contains %s which the "+
39-
"FeatureView doesn't have.", projection.name, feature.name))
37+
return nil, fmt.Errorf("the projection for %s cannot be applied because it contains %s which the "+
38+
"FeatureView doesn't have", projection.name, feature.name)
4039
}
4140
}
4241
return &BaseFeatureView{name: fv.name, features: fv.features, projection: projection}, nil

go/internal/feast/featurestore.go

Lines changed: 38 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"fmt"
77
"github.com/feast-dev/feast/go/protos/feast/serving"
88
"github.com/feast-dev/feast/go/protos/feast/types"
9-
durationpb "google.golang.org/protobuf/types/known/durationpb"
10-
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
9+
"google.golang.org/grpc/codes"
10+
"google.golang.org/grpc/status"
11+
"google.golang.org/protobuf/types/known/durationpb"
12+
"google.golang.org/protobuf/types/known/timestamppb"
1113
"sort"
1214
"strings"
1315
)
@@ -23,7 +25,10 @@ type entityKeyRow struct {
2325
rowIndices []int
2426
}
2527

26-
type ParsedKind struct {
28+
// A Features struct specifies a list of features to be retrieved from the online store. These features
29+
// can be specified either as a list of string feature references or as a feature service. String
30+
// feature references must have format "feature_view:feature", e.g. "customer_fv:daily_transactions".
31+
type Features struct {
2732
features []string
2833
featureService *FeatureService
2934
}
@@ -47,7 +52,7 @@ type GroupedFeaturesPerEntitySet struct {
4752
// NewFeatureStore constructs a feature store fat client using the
4853
// repo config (contents of feature_store.yaml converted to JSON map).
4954
func NewFeatureStore(config *RepoConfig) (*FeatureStore, error) {
50-
onlineStore, err := getOnlineStore(config)
55+
onlineStore, err := NewOnlineStore(config)
5156
if err != nil {
5257
return nil, err
5358
}
@@ -67,14 +72,13 @@ func NewFeatureStore(config *RepoConfig) (*FeatureStore, error) {
6772

6873
// TODO: Review all functions that use ODFV and Request FV since these have not been tested
6974
func (fs *FeatureStore) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) {
70-
kind := request.GetKind()
7175
fullFeatureNames := request.GetFullFeatureNames()
72-
parsedKind, err := fs.parseKind(kind)
76+
features, err := fs.parseFeatures(request.GetKind())
7377
if err != nil {
7478
return nil, err
7579
}
7680

77-
featureRefs, err := fs.getFeatures(parsedKind, true)
81+
featureRefs, err := fs.getFeatureRefs(features)
7882
if err != nil {
7983
return nil, err
8084
}
@@ -88,13 +92,11 @@ func (fs *FeatureStore) GetOnlineFeatures(ctx context.Context, request *serving.
8892
return nil, err
8993
}
9094

91-
fvs, requestedFeatureViews, requestedRequestFeatureViews, requestedOnDemandFeatureViews, err := fs.getFeatureViewsToUse(parsedKind, true, false)
95+
fvs, requestedFeatureViews, requestedRequestFeatureViews, requestedOnDemandFeatureViews, err := fs.getFeatureViewsToUse(features, true, false)
9296

93-
// TODO (Ly): Remove this BLOCK once odfv is supported
9497
if len(requestedRequestFeatureViews)+len(requestedOnDemandFeatureViews) > 0 {
95-
return nil, errors.New("ODFV is not supported in this iteration, please wait!")
98+
return nil, status.Errorf(codes.InvalidArgument, "on demand feature views are currently not supported")
9699
}
97-
// END BLOCK
98100

99101
if err != nil {
100102
return nil, err
@@ -124,7 +126,7 @@ func (fs *FeatureStore) GetOnlineFeatures(ctx context.Context, request *serving.
124126
// requestDataFeatures[entityName] = vals
125127
} else {
126128
if joinKey, ok := entityNameToJoinKeyMap[entityName]; !ok {
127-
return nil, errors.New(fmt.Sprintf("entityNotFoundException: %s\n%v", entityName, entityNameToJoinKeyMap))
129+
return nil, fmt.Errorf("entityNotFoundException: %s\n%v", entityName, entityNameToJoinKeyMap)
128130
} else {
129131
responseEntities[joinKey] = vals
130132
}
@@ -207,42 +209,36 @@ func (fs *FeatureStore) DestructOnlineStore() {
207209
fs.onlineStore.Destruct()
208210
}
209211

210-
func (fs *FeatureStore) parseKind(kind interface{}) (*ParsedKind, error) {
212+
// parseFeatures parses the kind field of a GetOnlineFeaturesRequest protobuf message
213+
// and populates a Features struct with the result.
214+
func (fs *FeatureStore) parseFeatures(kind interface{}) (*Features, error) {
211215
if featureList, ok := kind.(*serving.GetOnlineFeaturesRequest_Features); ok {
212-
return &ParsedKind{features: featureList.Features.GetVal(), featureService: nil}, nil
216+
return &Features{features: featureList.Features.GetVal(), featureService: nil}, nil
213217
}
214218
if featureServiceRequest, ok := kind.(*serving.GetOnlineFeaturesRequest_FeatureService); ok {
215219
featureService, err := fs.registry.getFeatureService(fs.config.Project, featureServiceRequest.FeatureService)
216220
if err != nil {
217221
return nil, err
218222
}
219-
return &ParsedKind{features: nil, featureService: featureService}, nil
223+
return &Features{features: nil, featureService: featureService}, nil
220224
}
221225
return nil, errors.New("cannot parse kind from GetOnlineFeaturesRequest")
222226
}
223227

224-
/*
225-
This function returns all feature references from GetOnlineFeaturesRequest.
226-
If a list of feature references is passed, return it.
227-
Otherwise, FeatureService was passed, parse this feature service to get a list of FeatureViewProjection and return feature
228-
references from this list
229-
*/
230-
231-
func (fs *FeatureStore) getFeatures(parsedKind *ParsedKind, allowCache bool) ([]string, error) {
232-
233-
if parsedKind.featureService != nil {
234-
228+
// getFeatureRefs extracts a list of feature references from a Features struct.
229+
func (fs *FeatureStore) getFeatureRefs(features *Features) ([]string, error) {
230+
if features.featureService != nil {
235231
var featureViewName string
236-
features := make([]string, 0)
237-
for _, featureProjection := range parsedKind.featureService.projections {
232+
featureRefs := make([]string, 0)
233+
for _, featureProjection := range features.featureService.projections {
238234
featureViewName = featureProjection.nameToUse()
239235
for _, feature := range featureProjection.features {
240-
features = append(features, fmt.Sprintf("%s:%s", featureViewName, feature.name))
236+
featureRefs = append(featureRefs, fmt.Sprintf("%s:%s", featureViewName, feature.name))
241237
}
242238
}
243-
return features, nil
239+
return featureRefs, nil
244240
} else {
245-
return parsedKind.features, nil
241+
return features.features, nil
246242
}
247243
}
248244

@@ -257,8 +253,7 @@ func (fs *FeatureStore) getFeatures(parsedKind *ParsedKind, allowCache bool) ([]
257253
retrieving all feature views. Similar argument to FeatureService applies.
258254
259255
*/
260-
func (fs *FeatureStore) getFeatureViewsToUse(parsedKind *ParsedKind, allowCache, hideDummyEntity bool) (map[string]*FeatureView, map[*FeatureView][]string, []*RequestFeatureView, []*OnDemandFeatureView, error) {
261-
256+
func (fs *FeatureStore) getFeatureViewsToUse(features *Features, allowCache, hideDummyEntity bool) (map[string]*FeatureView, map[*FeatureView][]string, []*RequestFeatureView, []*OnDemandFeatureView, error) {
262257
fvs := make(map[string]*FeatureView)
263258
requestFvs := make(map[string]*RequestFeatureView)
264259
odFvs := make(map[string]*OnDemandFeatureView)
@@ -278,8 +273,8 @@ func (fs *FeatureStore) getFeatureViewsToUse(parsedKind *ParsedKind, allowCache,
278273
odFvs[onDemandFeatureView.base.name] = onDemandFeatureView
279274
}
280275

281-
if parsedKind.featureService != nil {
282-
featureService := parsedKind.featureService
276+
if features.featureService != nil {
277+
featureService := features.featureService
283278

284279
fvsToUse := make(map[*FeatureView][]string)
285280
requestFvsToUse := make([]*RequestFeatureView, 0)
@@ -312,9 +307,9 @@ func (fs *FeatureStore) getFeatureViewsToUse(parsedKind *ParsedKind, allowCache,
312307
}
313308
odFvsToUse = append(odFvsToUse, odFv.NewOnDemandFeatureViewFromBase(base))
314309
} else {
315-
return nil, nil, nil, nil, errors.New(fmt.Sprintf("the provided feature service %s contains a reference to a feature view"+
316-
"%s which doesn't exist. Please make sure that you have created the feature view"+
317-
"%s and that you have registered it by running \"apply\".", featureService.name, featureViewName, featureViewName))
310+
return nil, nil, nil, nil, fmt.Errorf("the provided feature service %s contains a reference to a feature view"+
311+
"%s which doesn't exist, please make sure that you have created the feature view"+
312+
"%s and that you have registered it by running \"apply\"", featureService.name, featureViewName, featureViewName)
318313
}
319314
}
320315
return fvs, fvsToUse, requestFvsToUse, odFvsToUse, nil
@@ -324,7 +319,7 @@ func (fs *FeatureStore) getFeatureViewsToUse(parsedKind *ParsedKind, allowCache,
324319
requestFvsToUse := make([]*RequestFeatureView, 0)
325320
odFvsToUse := make([]*OnDemandFeatureView, 0)
326321

327-
for _, featureRef := range parsedKind.features {
322+
for _, featureRef := range features.features {
328323
featureViewName, featureName, err := parseFeatureReference(featureRef)
329324
if err != nil {
330325
return nil, nil, nil, nil, err
@@ -336,8 +331,8 @@ func (fs *FeatureStore) getFeatureViewsToUse(parsedKind *ParsedKind, allowCache,
336331
} else if odFv, ok := odFvs[featureViewName]; ok {
337332
odFvsToUse = append(odFvsToUse, odFv)
338333
} else {
339-
return nil, nil, nil, nil, errors.New(fmt.Sprintf("feature view %s doesn't exist. Please make sure that you have created the"+
340-
" feature view %s and that you have registered it by running \"apply\".", featureViewName, featureViewName))
334+
return nil, nil, nil, nil, fmt.Errorf("feature view %s doesn't exist, please make sure that you have created the"+
335+
" feature view %s and that you have registered it by running \"apply\"", featureViewName, featureViewName)
341336
}
342337
}
343338

@@ -395,7 +390,7 @@ func (fs *FeatureStore) validateEntityValues(joinKeyValues map[string]*types.Rep
395390
numRows = len(col.Val)
396391
}
397392
if len(setOfRowLengths) > 1 {
398-
return 0, errors.New("valueError: All entity rows must have the same columns.")
393+
return 0, errors.New("valueError: All entity rows must have the same columns")
399394
}
400395
return numRows, nil
401396
}
@@ -490,7 +485,7 @@ func (fs *FeatureStore) ensureRequestedDataExist(neededRequestData map[string]st
490485
missingFeatures = append(missingFeatures, feature)
491486
}
492487
}
493-
return errors.New(fmt.Sprintf("requestDataNotFoundInEntityRowsException: %s", strings.Join(missingFeatures, ", ")))
488+
return fmt.Errorf("requestDataNotFoundInEntityRowsException: %s", strings.Join(missingFeatures, ", "))
494489
}
495490
return nil
496491
}

go/internal/feast/local.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,19 @@ import (
44
"github.com/feast-dev/feast/go/protos/feast/core"
55
"github.com/golang/protobuf/proto"
66
"github.com/google/uuid"
7-
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
7+
"google.golang.org/protobuf/types/known/timestamppb"
88
"io/ioutil"
99
"os"
1010
"path/filepath"
1111
)
1212

13+
// A LocalRegistryStore is a file-based implementation of the RegistryStore interface.
1314
type LocalRegistryStore struct {
1415
filePath string
1516
}
1617

18+
// NewLocalRegistryStore creates a LocalRegistryStore with the given configuration and infers
19+
// the file path from the repo path and registry path.
1720
func NewLocalRegistryStore(config *RegistryConfig, repoPath string) *LocalRegistryStore {
1821
lr := LocalRegistryStore{}
1922
registryPath := config.Path
@@ -25,11 +28,10 @@ func NewLocalRegistryStore(config *RegistryConfig, repoPath string) *LocalRegist
2528
return &lr
2629
}
2730

31+
// GetRegistryProto reads and parses the registry proto from the file path.
2832
func (r *LocalRegistryStore) GetRegistryProto() (*core.Registry, error) {
2933
registry := &core.Registry{}
30-
// Read the local registry
3134
in, err := ioutil.ReadFile(r.filePath)
32-
// Use an empty Registry Proto if file not exists or parse Registry Proto content from file
3335
if err != nil {
3436
return nil, err
3537
}
@@ -44,11 +46,7 @@ func (r *LocalRegistryStore) UpdateRegistryProto(rp *core.Registry) error {
4446
}
4547

4648
func (r *LocalRegistryStore) Teardown() error {
47-
err := os.Remove(r.filePath)
48-
if err != nil {
49-
return err
50-
}
51-
return nil
49+
return os.Remove(r.filePath)
5250
}
5351

5452
func (r *LocalRegistryStore) writeRegistry(rp *core.Registry) error {

0 commit comments

Comments
 (0)