Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rename package utils -> types
Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Mar 24, 2022
commit 167c07ae18ed567ee68f1468652ac1b8b8e51404
4 changes: 2 additions & 2 deletions go/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/utils"
"github.com/feast-dev/feast/go/types"
"github.com/golang/protobuf/ptypes/timestamp"
)

Expand Down Expand Up @@ -63,7 +63,7 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s
for _, vector := range featureVectors {
resp.Metadata.FeatureNames.Val = append(resp.Metadata.FeatureNames.Val, vector.Name)

values, err := utils.ArrowValuesToProtoValues(vector.Values)
values, err := types.ArrowValuesToProtoValues(vector.Values)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/cdata"
"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/utils"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
"log"
)

Expand Down Expand Up @@ -106,16 +106,16 @@ func readArrowRecord(data DataTable) (array.Record, error) {
cdata.SchemaFromPtr(data.SchemaPtr))
}

func recordToProto(rec array.Record) (map[string]*types.RepeatedValue, error) {
r := make(map[string]*types.RepeatedValue)
func recordToProto(rec array.Record) (map[string]*prototypes.RepeatedValue, error) {
r := make(map[string]*prototypes.RepeatedValue)
schema := rec.Schema()
for idx, column := range rec.Columns() {
field := schema.Field(idx)
values, err := utils.ArrowValuesToProtoValues(column)
values, err := types.ArrowValuesToProtoValues(column)
if err != nil {
return nil, err
}
r[field.Name] = &types.RepeatedValue{Val: values}
r[field.Name] = &prototypes.RepeatedValue{Val: values}
}
return r, nil
}
56 changes: 28 additions & 28 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/utils"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -67,7 +67,7 @@ type GroupedFeaturesPerEntitySet struct {
// full feature references as they supposed to appear in response
aliasedFeatureNames []string
Comment thread
felixwang9817 marked this conversation as resolved.
Outdated
// Entity set as a list of EntityKeys to pass to OnlineRead
entityKeys []*types.EntityKey
entityKeys []*prototypes.EntityKey
// Reversed mapping to project result of retrieval from storage to response
indices [][]int
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
ctx context.Context,
featureRefs []string,
featureService *FeatureService,
entityProtos map[string]*types.RepeatedValue,
entityProtos map[string]*prototypes.RepeatedValue,
fullFeatureNames bool) ([]*FeatureVector, error) {

numRows, err := fs.validateEntityValues(entityProtos)
Expand Down Expand Up @@ -142,8 +142,8 @@ func (fs *FeatureStore) GetOnlineFeatures(
// TODO: Add a map that contains provided entities + ODFV schema entities + request schema
// to use for ODFV
// Remove comments for requestDataFeatures when ODFV is supported
// requestDataFeatures := make(map[string]*types.RepeatedValue) // TODO (Ly): Should be empty now until ODFV and Request FV are supported
mappedEntityProtos := make(map[string]*types.RepeatedValue)
// requestDataFeatures := make(map[string]*prototypes.RepeatedValue) // TODO (Ly): Should be empty now until ODFV and Request FV are supported
mappedEntityProtos := make(map[string]*prototypes.RepeatedValue)
for joinKeyOrFeature, vals := range entityProtos {
if _, ok := neededRequestODFVFeatures[joinKeyOrFeature]; ok {
mappedEntityProtos[joinKeyOrFeature] = vals
Expand Down Expand Up @@ -185,7 +185,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
}

if entitylessCase {
dummyEntityColumn := &types.RepeatedValue{Val: make([]*types.Value, numRows)}
dummyEntityColumn := &prototypes.RepeatedValue{Val: make([]*prototypes.Value, numRows)}
for index := 0; index < numRows; index++ {
dummyEntityColumn.Val[index] = &DUMMY_ENTITY
}
Expand All @@ -204,7 +204,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
return nil, err
}

vectors, err := fs.transposeResponseIntoColumns(featureData,
vectors, err := fs.transposeFeatureRowsIntoColumns(featureData,
groupRef,
fvs,
arrowMemory,
Expand Down Expand Up @@ -446,7 +446,7 @@ func (fs *FeatureStore) getEntityMaps(requestedFeatureViews []*featureViewAndRef
return entityNameToJoinKeyMap, expectedJoinKeysSet, nil
}

func (fs *FeatureStore) validateEntityValues(joinKeyValues map[string]*types.RepeatedValue) (int, error) {
func (fs *FeatureStore) validateEntityValues(joinKeyValues map[string]*prototypes.RepeatedValue) (int, error) {
setOfRowLengths := make(map[int]bool)
var numRows int
for _, col := range joinKeyValues {
Expand Down Expand Up @@ -531,7 +531,7 @@ func (fs *FeatureStore) getNeededRequestData(requestedRequestFeatureViews []*Req

func (fs *FeatureStore) ensureRequestedDataExist(neededRequestData map[string]struct{},
neededRequestFvFeatures map[string]struct{},
requestDataFeatures map[string]*types.RepeatedValue) error {
requestDataFeatures map[string]*prototypes.RepeatedValue) error {
// TODO (Ly): Review: Skip checking even if composite set of
// neededRequestData neededRequestFvFeatures is different from
// request_data_features but same length?
Expand All @@ -556,27 +556,27 @@ func (fs *FeatureStore) checkOutsideTtl(featureTimestamp *timestamppb.Timestamp,
return currentTimestamp.GetSeconds()-featureTimestamp.GetSeconds() > ttl.Seconds
}

func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*types.EntityKey,
func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*prototypes.EntityKey,
requestedFeatureViewNames []string,
requestedFeatureNames []string,
) ([][]FeatureData, error) {
numRows := len(entityRows)
entityRowsValue := make([]types.EntityKey, numRows)
entityRowsValue := make([]prototypes.EntityKey, numRows)
for index, entityKey := range entityRows {
entityRowsValue[index] = types.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues}
entityRowsValue[index] = prototypes.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues}
}
return fs.onlineStore.OnlineRead(ctx, entityRowsValue, requestedFeatureViewNames, requestedFeatureNames)
}

func (fs *FeatureStore) transposeResponseIntoColumns(featureData2D [][]FeatureData,
func (fs *FeatureStore) transposeFeatureRowsIntoColumns(featureData2D [][]FeatureData,
groupRef *GroupedFeaturesPerEntitySet,
fvs map[string]*FeatureView,
arrowAllocator memory.Allocator,
numRows int) ([]*FeatureVector, error) {

numFeatures := len(groupRef.aliasedFeatureNames)

var value *types.Value
var value *prototypes.Value
var status serving.FieldStatus
var eventTimeStamp *timestamppb.Timestamp
var featureData *FeatureData
Expand All @@ -592,7 +592,7 @@ func (fs *FeatureStore) transposeResponseIntoColumns(featureData2D [][]FeatureDa
Timestamps: make([]*timestamppb.Timestamp, numRows),
}
vectors = append(vectors, currentVector)
protoValues := make([]*types.Value, numRows)
protoValues := make([]*prototypes.Value, numRows)

for rowEntityIndex, outputIndexes := range groupRef.indices {
if featureData2D[rowEntityIndex] == nil {
Expand All @@ -604,14 +604,14 @@ func (fs *FeatureStore) transposeResponseIntoColumns(featureData2D [][]FeatureDa
eventTimeStamp = &timestamppb.Timestamp{Seconds: featureData.timestamp.Seconds, Nanos: featureData.timestamp.Nanos}
featureViewName = featureData.reference.FeatureViewName
fv = fvs[featureViewName]
if _, ok := featureData.value.Val.(*types.Value_NullVal); ok {
if _, ok := featureData.value.Val.(*prototypes.Value_NullVal); ok {
value = nil
status = serving.FieldStatus_NOT_FOUND
} else if fs.checkOutsideTtl(eventTimeStamp, timestamppb.Now(), fv.ttl) {
value = &types.Value{Val: featureData.value.Val}
value = &prototypes.Value{Val: featureData.value.Val}
status = serving.FieldStatus_OUTSIDE_MAX_AGE
} else {
value = &types.Value{Val: featureData.value.Val}
value = &prototypes.Value{Val: featureData.value.Val}
status = serving.FieldStatus_PRESENT
}
}
Expand All @@ -621,7 +621,7 @@ func (fs *FeatureStore) transposeResponseIntoColumns(featureData2D [][]FeatureDa
currentVector.Timestamps[rowIndex] = eventTimeStamp
}
}
arrowValues, err := utils.ProtoValuesToArrowArray(protoValues, arrowAllocator, numRows)
arrowValues, err := types.ProtoValuesToArrowArray(protoValues, arrowAllocator, numRows)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -690,7 +690,7 @@ func (fs *FeatureStore) listEntities(hideDummyEntity bool) ([]*Entity, error) {
return entities, nil
}

func entityKeysToProtos(joinKeyValues map[string]*types.RepeatedValue) []*types.EntityKey {
func entityKeysToProtos(joinKeyValues map[string]*prototypes.RepeatedValue) []*prototypes.EntityKey {
keys := make([]string, len(joinKeyValues))
index := 0
var numRows int
Expand All @@ -700,11 +700,11 @@ func entityKeysToProtos(joinKeyValues map[string]*types.RepeatedValue) []*types.
numRows = len(v.Val)
}
sort.Strings(keys)
entityKeys := make([]*types.EntityKey, numRows)
entityKeys := make([]*prototypes.EntityKey, numRows)
numJoinKeys := len(keys)
// Construct each EntityKey object
for index = 0; index < numRows; index++ {
entityKeys[index] = &types.EntityKey{JoinKeys: keys, EntityValues: make([]*types.Value, numJoinKeys)}
entityKeys[index] = &prototypes.EntityKey{JoinKeys: keys, EntityValues: make([]*prototypes.Value, numJoinKeys)}
}

for colIndex, key := range keys {
Expand All @@ -721,7 +721,7 @@ rows for each requested feature
*/

func groupFeatureRefs(requestedFeatureViews []*featureViewAndRefs,
joinKeyValues map[string]*types.RepeatedValue,
joinKeyValues map[string]*prototypes.RepeatedValue,
entityNameToJoinKeyMap map[string]string,
fullFeatureNames bool,
) (map[string]*GroupedFeaturesPerEntitySet,
Expand All @@ -738,7 +738,7 @@ func groupFeatureRefs(requestedFeatureViews []*featureViewAndRefs,
}

groupKeyBuilder := make([]string, 0)
joinKeysValuesProjection := make(map[string]*types.RepeatedValue)
joinKeysValuesProjection := make(map[string]*prototypes.RepeatedValue)

joinKeyToAliasMap := make(map[string]string)
if fv.base.projection != nil && fv.base.projection.joinKeyMap != nil {
Expand Down Expand Up @@ -804,8 +804,8 @@ func groupFeatureRefs(requestedFeatureViews []*featureViewAndRefs,
return groups, nil
}

func getUniqueEntityRows(joinKeysProto []*types.EntityKey) ([]*types.EntityKey, [][]int, error) {
uniqueValues := make(map[[sha256.Size]byte]*types.EntityKey, 0)
func getUniqueEntityRows(joinKeysProto []*prototypes.EntityKey) ([]*prototypes.EntityKey, [][]int, error) {
uniqueValues := make(map[[sha256.Size]byte]*prototypes.EntityKey, 0)
positions := make(map[[sha256.Size]byte][]int, 0)

for index, entityKey := range joinKeysProto {
Expand All @@ -824,7 +824,7 @@ func getUniqueEntityRows(joinKeysProto []*types.EntityKey) ([]*types.EntityKey,
}

mappingIndices := make([][]int, len(uniqueValues))
uniqueEntityRows := make([]*types.EntityKey, 0)
uniqueEntityRows := make([]*prototypes.EntityKey, 0)
for rowHash, row := range uniqueValues {
nextIdx := len(uniqueEntityRows)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package types

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package types

import (
"github.com/apache/arrow/go/arrow/memory"
Expand Down