Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove the allowCache flag
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba committed Mar 22, 2022
commit c9ad601d6a61f03dc7b1ae8da2472ca5ba21de4e
28 changes: 14 additions & 14 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (fs *FeatureStore) GetOnlineFeatures(ctx context.Context, request *serving.
return nil, err
}

fvs, requestedFeatureViews, requestedRequestFeatureViews, requestedOnDemandFeatureViews, err := fs.getFeatureViewsToUse(features, true, false)
fvs, requestedFeatureViews, requestedRequestFeatureViews, requestedOnDemandFeatureViews, err := fs.getFeatureViewsToUse(features, false)

if len(requestedRequestFeatureViews)+len(requestedOnDemandFeatureViews) > 0 {
return nil, status.Errorf(codes.InvalidArgument, "on demand feature views are currently not supported")
Expand Down Expand Up @@ -217,7 +217,7 @@ func (fs *FeatureStore) parseFeatures(kind interface{}) (*Features, error) {
return &Features{features: featureList.Features.GetVal(), featureService: nil}, nil
}
if featureServiceRequest, ok := kind.(*serving.GetOnlineFeaturesRequest_FeatureService); ok {
featureService, err := fs.registry.getFeatureService(fs.config.Project, featureServiceRequest.FeatureService, true)
featureService, err := fs.registry.getFeatureService(fs.config.Project, featureServiceRequest.FeatureService)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -254,28 +254,28 @@ func (fs *FeatureStore) getFeatureRefs(features *Features) ([]string, error) {
retrieving all feature views. Similar argument to FeatureService applies.

*/
func (fs *FeatureStore) getFeatureViewsToUse(features *Features, allowCache, hideDummyEntity bool) (map[string]*FeatureView, map[*FeatureView][]string, []*RequestFeatureView, []*OnDemandFeatureView, error) {
func (fs *FeatureStore) getFeatureViewsToUse(features *Features, hideDummyEntity bool) (map[string]*FeatureView, map[*FeatureView][]string, []*RequestFeatureView, []*OnDemandFeatureView, error) {
fvs := make(map[string]*FeatureView)
requestFvs := make(map[string]*RequestFeatureView)
odFvs := make(map[string]*OnDemandFeatureView)

featureViews, err := fs.listFeatureViews(allowCache, hideDummyEntity)
featureViews, err := fs.listFeatureViews(hideDummyEntity)
if err != nil {
return nil, nil, nil, nil, err
}
for _, featureView := range featureViews {
fvs[featureView.base.name] = featureView
}

requestFeatureViews, err := fs.registry.listRequestFeatureViews(fs.config.Project, allowCache)
requestFeatureViews, err := fs.registry.listRequestFeatureViews(fs.config.Project)
if err != nil {
return nil, nil, nil, nil, err
}
for _, requestFeatureView := range requestFeatureViews {
requestFvs[requestFeatureView.base.name] = requestFeatureView
}

onDemandFeatureViews, err := fs.registry.listOnDemandFeatureViews(fs.config.Project, allowCache)
onDemandFeatureViews, err := fs.registry.listOnDemandFeatureViews(fs.config.Project)
if err != nil {
return nil, nil, nil, nil, err
}
Expand Down Expand Up @@ -357,7 +357,7 @@ func (fs *FeatureStore) getEntityMaps(requestedFeatureViews map[*FeatureView][]s
var joinKeyMap map[string]string
var featureView *FeatureView

entities, err := fs.listEntities(true, false)
entities, err := fs.listEntities(false)
if err != nil {
return nil, err
}
Expand All @@ -372,7 +372,7 @@ func (fs *FeatureStore) getEntityMaps(requestedFeatureViews map[*FeatureView][]s
joinKeyMap = featureView.base.projection.joinKeyMap
for entityName = range entityNames {

entity, err := fs.registry.getEntity(fs.config.Project, entityName, true)
entity, err := fs.registry.getEntity(fs.config.Project, entityName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -649,8 +649,8 @@ func (fs *FeatureStore) dropUnneededColumns(onlineFeaturesResponse *serving.GetO
}
}

func (fs *FeatureStore) listFeatureViews(allowCache, hideDummyEntity bool) ([]*FeatureView, error) {
featureViews, err := fs.registry.listFeatureViews(fs.config.Project, allowCache)
func (fs *FeatureStore) listFeatureViews(hideDummyEntity bool) ([]*FeatureView, error) {
Comment thread
kevjumba marked this conversation as resolved.
featureViews, err := fs.registry.listFeatureViews(fs.config.Project)
if err != nil {
return featureViews, err
}
Expand All @@ -662,9 +662,9 @@ func (fs *FeatureStore) listFeatureViews(allowCache, hideDummyEntity bool) ([]*F
return featureViews, nil
}
Comment thread
kevjumba marked this conversation as resolved.

func (fs *FeatureStore) listEntities(allowCache, hideDummyEntity bool) ([]*Entity, error) {
func (fs *FeatureStore) listEntities(hideDummyEntity bool) ([]*Entity, error) {

allEntities, err := fs.registry.listEntities(fs.config.Project, allowCache)
allEntities, err := fs.registry.listEntities(fs.config.Project)
if err != nil {
return allEntities, err
}
Expand Down Expand Up @@ -839,8 +839,8 @@ func (fs *FeatureStore) groupFeatureRefs(requestedFeatureViews map[*FeatureView]
return fvFeatures, nil
}

func (fs *FeatureStore) getFeatureView(project, featureViewName string, allowCache, hideDummyEntity bool) (*FeatureView, error) {
fv, err := fs.registry.getFeatureView(fs.config.Project, featureViewName, allowCache)
func (fs *FeatureStore) getFeatureView(project, featureViewName string, hideDummyEntity bool) (*FeatureView, error) {
fv, err := fs.registry.getFeatureView(fs.config.Project, featureViewName)
if err != nil {
return nil, err
}
Expand Down
86 changes: 30 additions & 56 deletions go/internal/feast/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"net/url"
"sync"
"time"

"github.com/feast-dev/feast/go/protos/feast/core"
Expand Down Expand Up @@ -32,6 +33,7 @@ type Registry struct {
cachedRegistry *core.Registry
Comment thread
kevjumba marked this conversation as resolved.
Outdated
cachedRegistryProtoCreated time.Time
Comment thread
kevjumba marked this conversation as resolved.
Outdated
cachedRegistryProtoTtl time.Duration
mu sync.Mutex
}

func NewRegistry(registryConfig *RegistryConfig, repoPath string) (*Registry, error) {
Expand Down Expand Up @@ -59,35 +61,46 @@ func NewRegistry(registryConfig *RegistryConfig, repoPath string) (*Registry, er
}

func (r *Registry) initializeRegistry() {
_, err := r.getRegistryProto(false)
_, err := r.getRegistryProto()
if err != nil {
registryProto := &core.Registry{RegistrySchemaVersion: REGISTRY_SCHEMA_VERSION}
r.registryStore.UpdateRegistryProto(registryProto)
r.load(registryProto)
go r.refreshRegistryOnInterval()
}
}

func (r *Registry) refreshRegistryOnInterval() {
ticker := time.NewTicker(r.cachedRegistryProtoTtl)
for ; true; <-ticker.C {
err := r.refresh()
if err != nil {
return
}
}
}

// TODO: Add a goroutine and automatically refresh every cachedRegistryProtoTtl
func (r *Registry) refresh() error {
_, err := r.getRegistryProto(false)
_, err := r.getRegistryProto()
return err
}

func (r *Registry) getRegistryProto(allowCache bool) (*core.Registry, error) {
func (r *Registry) getRegistryProto() (*core.Registry, error) {
expired := r.cachedRegistry == nil || (r.cachedRegistryProtoTtl > 0 && time.Now().After(r.cachedRegistryProtoCreated.Add(r.cachedRegistryProtoTtl)))
if allowCache && !expired {
if !expired {
return r.cachedRegistry, nil
}
registryProto, err := r.registryStore.GetRegistryProto()
if err != nil {
return registryProto, err
}
r.load(registryProto)
r.cachedRegistryProtoCreated = time.Now()
return registryProto, nil
}

func (r *Registry) load(registry *core.Registry) {
r.mu.Lock()
defer r.mu.Unlock()
r.cachedRegistry = registry
r.cachedFeatureServices = make(map[string]map[string]*core.FeatureService)
r.cachedEntities = make(map[string]map[string]*core.Entity)
Expand All @@ -99,6 +112,7 @@ func (r *Registry) load(registry *core.Registry) {
r.loadFeatureViews(registry)
r.loadOnDemandFeatureViews(registry)
r.loadRequestFeatureViews(registry)
r.cachedRegistryProtoCreated = time.Now()
}

func (r *Registry) loadEntities(registry *core.Registry) {
Expand Down Expand Up @@ -156,11 +170,7 @@ func (r *Registry) loadRequestFeatureViews(registry *core.Registry) {
Returns empty list if project not found
*/

func (r *Registry) listEntities(project string, allowCache bool) ([]*Entity, error) {
_, err := r.getRegistryProto(allowCache)
if err != nil {
return nil, err
}
func (r *Registry) listEntities(project string) ([]*Entity, error) {
if cachedEntities, ok := r.cachedEntities[project]; !ok {
return []*Entity{}, nil
} else {
Expand All @@ -179,11 +189,7 @@ func (r *Registry) listEntities(project string, allowCache bool) ([]*Entity, err
Returns empty list if project not found
*/

func (r *Registry) listFeatureViews(project string, allowCache bool) ([]*FeatureView, error) {
_, err := r.getRegistryProto(allowCache)
if err != nil {
return nil, err
}
func (r *Registry) listFeatureViews(project string) ([]*FeatureView, error) {
if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok {
Comment thread
kevjumba marked this conversation as resolved.
return []*FeatureView{}, nil
} else {
Expand All @@ -202,11 +208,7 @@ func (r *Registry) listFeatureViews(project string, allowCache bool) ([]*Feature
Returns empty list if project not found
*/

func (r *Registry) listFeatureServices(project string, allowCache bool) ([]*FeatureService, error) {
_, err := r.getRegistryProto(allowCache)
if err != nil {
return nil, err
}
func (r *Registry) listFeatureServices(project string) ([]*FeatureService, error) {
if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok {
return []*FeatureService{}, nil
} else {
Expand All @@ -225,11 +227,7 @@ func (r *Registry) listFeatureServices(project string, allowCache bool) ([]*Feat
Returns empty list if project not found
*/

func (r *Registry) listOnDemandFeatureViews(project string, allowCache bool) ([]*OnDemandFeatureView, error) {
_, err := r.getRegistryProto(allowCache)
if err != nil {
return nil, err
}
func (r *Registry) listOnDemandFeatureViews(project string) ([]*OnDemandFeatureView, error) {
if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok {
return []*OnDemandFeatureView{}, nil
} else {
Expand All @@ -248,11 +246,7 @@ func (r *Registry) listOnDemandFeatureViews(project string, allowCache bool) ([]
Returns empty list if project not found
*/

func (r *Registry) listRequestFeatureViews(project string, allowCache bool) ([]*RequestFeatureView, error) {
_, err := r.getRegistryProto(allowCache)
if err != nil {
return nil, err
}
func (r *Registry) listRequestFeatureViews(project string) ([]*RequestFeatureView, error) {
if cachedRequestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok {
return []*RequestFeatureView{}, nil
} else {
Expand All @@ -266,11 +260,7 @@ func (r *Registry) listRequestFeatureViews(project string, allowCache bool) ([]*
}
}

func (r *Registry) getEntity(project, entityName string, allowCache bool) (*Entity, error) {
_, err := r.getRegistryProto(allowCache)
if err != nil {
return nil, err
}
func (r *Registry) getEntity(project, entityName string) (*Entity, error) {
if cachedEntities, ok := r.cachedEntities[project]; !ok {
return nil, fmt.Errorf("no cached entities found for project %s", project)
} else {
Expand All @@ -282,11 +272,7 @@ func (r *Registry) getEntity(project, entityName string, allowCache bool) (*Enti
}
}

func (r *Registry) getFeatureView(project, featureViewName string, allowCache bool) (*FeatureView, error) {
_, err := r.getRegistryProto(allowCache)
if err != nil {
return nil, err
}
func (r *Registry) getFeatureView(project, featureViewName string) (*FeatureView, error) {
if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok {
return nil, fmt.Errorf("no cached feature views found for project %s", project)
} else {
Expand All @@ -298,11 +284,7 @@ func (r *Registry) getFeatureView(project, featureViewName string, allowCache bo
}
}

func (r *Registry) getFeatureService(project, featureServiceName string, allowCache bool) (*FeatureService, error) {
_, err := r.getRegistryProto(allowCache)
if err != nil {
return nil, err
}
func (r *Registry) getFeatureService(project, featureServiceName string) (*FeatureService, error) {
if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok {
return nil, fmt.Errorf("no cached feature services found for project %s", project)
} else {
Expand All @@ -314,11 +296,7 @@ func (r *Registry) getFeatureService(project, featureServiceName string, allowCa
}
}

func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName string, allowCache bool) (*OnDemandFeatureView, error) {
_, err := r.getRegistryProto(allowCache)
if err != nil {
return nil, err
}
func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName string) (*OnDemandFeatureView, error) {
if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok {
return nil, fmt.Errorf("no cached on demand feature views found for project %s", project)
} else {
Expand All @@ -330,11 +308,7 @@ func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName strin
}
}

func (r *Registry) getRequestFeatureView(project, requestFeatureViewName string, allowCache bool) (*RequestFeatureView, error) {
_, err := r.getRegistryProto(allowCache)
if err != nil {
return nil, err
}
func (r *Registry) getRequestFeatureView(project, requestFeatureViewName string) (*RequestFeatureView, error) {
if cachedRequestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok {
return nil, fmt.Errorf("no cached on request feature views found for project %s", project)
} else {
Expand Down