-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathonlinestore.go
More file actions
179 lines (154 loc) · 6.42 KB
/
onlinestore.go
File metadata and controls
179 lines (154 loc) · 6.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package onlinestore
import (
"context"
"encoding/binary"
"fmt"
"sort"
"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/golang/protobuf/ptypes/timestamp"
)
// FeatureData is a single feature read back from an online store: the
// reference identifying which feature it is, the event timestamp recorded
// for it, and the value itself. All three fields are stored by value
// (not as pointers), so each FeatureData is self-contained but copies
// its contents on assignment.
type FeatureData struct {
Reference serving.FeatureReferenceV2 // which feature view / feature name this row answers
Timestamp timestamp.Timestamp        // event timestamp associated with the value
Value types.Value                    // the feature value payload
}
// OnlineStore is the interface implemented by every online feature store
// backend supported by this package (sqlite, redis, dynamodb).
type OnlineStore interface {
// OnlineRead reads the requested features (named by featureNames, scoped by
// featureViewNames — presumably parallel slices; confirm against callers)
// for each entity key in entityKeys, and returns a two-dimensional result
// where each FeatureData contains 3 fields:
// 1. feature Reference
// 2. feature event timestamp
// 3. feature value
// The inner slice has one entry per requested feature,
// while the outer slice has the same length as entityKeys.
// TODO: Can we return [][]FeatureData, []timestamps, error
// instead and remove the timestamp from the FeatureData struct to mimic Python's code
// and reduce repeated memory storage for the same timestamp (which is stored as a value and not as a pointer)?
// Should each attribute in FeatureData be stored as a pointer instead, since the current
// design forces values to be copied in OnlineRead + GetOnlineFeatures
// (the array is destructed so we cannot use the same fields in each
// Feature object as pointers in GetOnlineFeaturesResponse)
// => allocate memory for each field once in OnlineRead
// and reuse them in GetOnlineFeaturesResponse?
OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error)
// Destruct must be called once the user is done using the OnlineStore.
// This is to comply with the Connector since we have to close the plugin.
Destruct()
}
// getOnlineStoreType extracts the "type" entry from the online store config
// map. It returns the configured type and true on success; when no type is
// configured it defaults to "sqlite". The boolean is false only when a
// "type" entry exists but is not a string.
func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool) {
	onlineStoreType, ok := onlineStoreConfig["type"]
	if !ok {
		// If online store type isn't specified, default to sqlite
		return "sqlite", true
	}
	// Reject non-string "type" values via the comma-ok assertion.
	result, ok := onlineStoreType.(string)
	return result, ok
}
// NewOnlineStore constructs the OnlineStore implementation selected by the
// repo config's online store "type" entry (defaulting to sqlite when
// unspecified). It returns an error when the type entry is not a string or
// names an unsupported backend.
func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) {
	onlineStoreType, ok := getOnlineStoreType(config.OnlineStore)
	if !ok {
		return nil, fmt.Errorf("could not get online store type from online store config: %+v", config.OnlineStore)
	}
	switch onlineStoreType {
	case "sqlite":
		return NewSqliteOnlineStore(config.Project, config, config.OnlineStore)
	case "redis":
		return NewRedisOnlineStore(config.Project, config, config.OnlineStore)
	case "dynamodb":
		return NewDynamodbOnlineStore(config.Project, config, config.OnlineStore)
	default:
		// BUG FIX: the old message omitted dynamodb even though it is handled above.
		return nil, fmt.Errorf("%s online store type is currently not supported; only sqlite, redis, and dynamodb are supported", onlineStoreType)
	}
}
// serializeEntityKey serializes an entity key into a byte string so it can be
// used as a lookup key in a hash table. The layout is:
//   - (version >= 3 only) a 4-byte little-endian count of join keys,
//   - for each join key, sorted lexicographically: a 4-byte STRING type tag,
//     a 4-byte length, and the key bytes,
//   - for each corresponding value, in the same sorted order: a 4-byte value
//     type tag, a 4-byte length, and the serialized value bytes
//     (see serializeValue).
// It returns an error when the join key and entity value counts differ, or
// when a value has an unsupported type.
func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) {
	// Ensure that we have the right amount of join keys and entity values.
	if len(entityKey.JoinKeys) != len(entityKey.EntityValues) {
		return nil, fmt.Errorf("the amount of join key names and entity values don't match: %s vs %s", entityKey.JoinKeys, entityKey.EntityValues)
	}
	// Map each join key to its value so we can emit values in sorted-key order.
	m := make(map[string]*types.Value, len(entityKey.JoinKeys))
	for i := 0; i < len(entityKey.JoinKeys); i++ {
		m[entityKey.JoinKeys[i]] = entityKey.EntityValues[i]
	}
	// Sort the join keys so that key building is deterministic.
	keys := make([]string, 0, len(entityKey.JoinKeys))
	keys = append(keys, entityKey.JoinKeys...)
	sort.Strings(keys)
	// Exactly 3 buffers per key plus 3 per value, plus one leading count
	// buffer for serialization version >= 3. (The old code over-allocated
	// 7*len(keys) slots and left a nil tail.)
	numBuffers := 6 * len(keys)
	if entityKeySerializationVersion >= 3 {
		numBuffers++
	}
	bufferList := make([][]byte, 0, numBuffers)
	// For entityKeySerializationVersion 3 and above, the first 4 bytes of the
	// serialized key are the number of join keys.
	if entityKeySerializationVersion >= 3 {
		countBuffer := make([]byte, 4)
		binary.LittleEndian.PutUint32(countBuffer, uint32(len(keys)))
		bufferList = append(bufferList, countBuffer)
	}
	// Key section: STRING type tag, key length, key bytes — per key.
	for _, key := range keys {
		typeBuffer := make([]byte, 4)
		binary.LittleEndian.PutUint32(typeBuffer, uint32(types.ValueType_Enum_value["STRING"]))
		lenBuffer := make([]byte, 4)
		binary.LittleEndian.PutUint32(lenBuffer, uint32(len(key)))
		bufferList = append(bufferList, typeBuffer, lenBuffer, []byte(key))
	}
	// Value section: value type tag, value length, value bytes — per key,
	// in the same sorted order as the key section.
	for _, key := range keys {
		valueBytes, valueType, err := serializeValue(m[key].GetVal(), entityKeySerializationVersion)
		if err != nil {
			// BUG FIX: return nil on error instead of the meaningless partial value.
			return nil, err
		}
		typeBuffer := make([]byte, 4)
		binary.LittleEndian.PutUint32(typeBuffer, uint32(valueType))
		lenBuffer := make([]byte, 4)
		binary.LittleEndian.PutUint32(lenBuffer, uint32(len(*valueBytes)))
		bufferList = append(bufferList, typeBuffer, lenBuffer, *valueBytes)
	}
	// Flatten the buffers into a single byte slice. Kept as a nil-start
	// append so an empty result stays a nil slice, as before.
	var entityKeyBuffer []byte
	for _, buffer := range bufferList {
		entityKeyBuffer = append(entityKeyBuffer, buffer...)
	}
	return &entityKeyBuffer, nil
}
// serializeValue encodes a single feature value wrapper into its raw byte
// representation, returning a pointer to the bytes together with the value
// type enum. Strings encode as UTF-8 bytes, bytes pass through unchanged,
// and int32/int64 encode little-endian in 4/8 bytes respectively. Any other
// (or nil) value yields ValueType_INVALID and an error.
// Note: entityKeySerializationVersion is currently unused here — presumably
// reserved for version-dependent encodings; confirm before removing.
// TODO: Implement support for other types (at least the major types like ints, strings, bytes)
func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[]byte, types.ValueType_Enum, error) {
	switch v := value.(type) {
	case *types.Value_StringVal:
		encoded := []byte(v.StringVal)
		return &encoded, types.ValueType_STRING, nil
	case *types.Value_BytesVal:
		// Already raw bytes; hand back a pointer to the wrapper's slice.
		return &v.BytesVal, types.ValueType_BYTES, nil
	case *types.Value_Int32Val:
		encoded := make([]byte, 4)
		binary.LittleEndian.PutUint32(encoded, uint32(v.Int32Val))
		return &encoded, types.ValueType_INT32, nil
	case *types.Value_Int64Val:
		encoded := make([]byte, 8)
		binary.LittleEndian.PutUint64(encoded, uint64(v.Int64Val))
		return &encoded, types.ValueType_INT64, nil
	default:
		// Covers both nil and unsupported wrapper types; %v of a nil
		// interface prints "<nil>", matching the former explicit nil case.
		return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", v)
	}
}