Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
501f59d
Initial structure for go sqlite online store
kevjumba Mar 23, 2022
0fbd8dc
Somewhat intermediate state
kevjumba Mar 23, 2022
21c8900
Add sqlite online store for go for testing
kevjumba Mar 24, 2022
01d9a1d
Revert
kevjumba Mar 24, 2022
2c4fccd
Revert
kevjumba Mar 24, 2022
34dcd15
Clean up
kevjumba Mar 24, 2022
80f4579
Address review issues
kevjumba Mar 24, 2022
17acc6a
Fix/address issues
kevjumba Mar 25, 2022
64fa88b
lint
kevjumba Mar 25, 2022
0601f0a
Fix
kevjumba Mar 25, 2022
2233ff8
Fix
kevjumba Mar 25, 2022
4de021b
Make integration test work
kevjumba Mar 25, 2022
e3f9970
Fix tests
kevjumba Mar 25, 2022
d9f5333
Fix tests
kevjumba Mar 25, 2022
a631c52
debugging
kevjumba Mar 25, 2022
993ead4
Debug
kevjumba Mar 25, 2022
63fbee6
Debug
kevjumba Mar 25, 2022
c325834
Debug
kevjumba Mar 25, 2022
f2202b8
Debug
kevjumba Mar 25, 2022
c242587
Debug
kevjumba Mar 25, 2022
08e51fb
Debug
kevjumba Mar 25, 2022
b551639
Remove feature_repo files
kevjumba Mar 28, 2022
0755e05
update gitignore
kevjumba Mar 28, 2022
eb8a320
Clean up code
kevjumba Mar 28, 2022
c3472bd
Update go mod
kevjumba Mar 28, 2022
a5b6607
Update makefile
kevjumba Mar 28, 2022
5f68689
Fix gitignore issue
kevjumba Mar 28, 2022
3896f99
Update makefile
kevjumba Mar 28, 2022
111c2d0
Update makefile
kevjumba Mar 28, 2022
c6197ce
Update makefile
kevjumba Mar 28, 2022
a5d4d28
Update makefile
kevjumba Mar 28, 2022
14d5602
Update makefile
kevjumba Mar 28, 2022
02eda7a
Revert worfklow
kevjumba Mar 28, 2022
14876a2
Update build path
kevjumba Mar 29, 2022
3bfcfad
remove
kevjumba Mar 29, 2022
8739e68
rename
kevjumba Mar 29, 2022
bd052a3
Address review
kevjumba Mar 29, 2022
189d3d2
fix tests
kevjumba Mar 29, 2022
acc86bb
Fix
kevjumba Mar 29, 2022
34ab63d
see if this fixes test
kevjumba Mar 29, 2022
fa0abf3
Fix
kevjumba Mar 29, 2022
941aea8
revert
kevjumba Mar 29, 2022
025a7e4
Will add in separate pr to update cryptography
kevjumba Mar 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add sqlite online store for go for testing
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba committed Mar 29, 2022
commit 21c8900a4d62733c5c25eef1bfb0972e0c16cd4c
3 changes: 2 additions & 1 deletion go/internal/feast/onlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func NewOnlineStore(config *RepoConfig) (OnlineStore, error) {
return onlineStore, err
}
if onlineStoreType == "sqlite" {
onlineStore, err := NewSqliteOnlineStore(config.Project, config.OnlineStore)
print("ASFDSASF!!")
onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore)
return onlineStore, err
} else {
return nil, fmt.Errorf("%s online store type is currently not supported; only Redis is supported", onlineStoreType)
Expand Down
96 changes: 78 additions & 18 deletions go/internal/feast/sqliteonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,37 @@ package feast
import (
"database/sql"
"errors"
"log"
"strings"
"time"

"context"
"fmt"

"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
_ "github.com/mattn/go-sqlite3"
"google.golang.org/protobuf/proto"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

type SqliteOnlineStore struct {
// Feast project name
// TODO (woop): Should we remove project as state that is tracked at the store level?
project string

path string
db *sql.DB
path string
db *sql.DB
}

func NewSqliteOnlineStore(project string, onlineStoreConfig map[string]interface{}) (*SqliteOnlineStore, error) {
// Creates a new sqlite online store object. onlineStoreConfig should have relative path of database file with respect to repoConfig.repoPath.
func NewSqliteOnlineStore(project string, repoConfig *RepoConfig, onlineStoreConfig map[string]interface{}) (*SqliteOnlineStore, error) {
store := SqliteOnlineStore{project: project}
if db_path, ok := onlineStoreConfig["path"]; !ok {
return nil, fmt.Errorf("cannot find sqlite path %s", db_path)
} else if dbPathStr, ok := db_path.(string); !ok {
return nil, fmt.Errorf("cannot find convert sqlite path to string %s", db_path)
} else {
store.path = dbPathStr
db, err := initializeConnection(dbPathStr)
store.path = fmt.Sprintf("%s/%s", repoConfig.RepoPath, dbPathStr)
db, err := initializeConnection(store.path)
if err != nil {
return nil, err
}
Expand All @@ -41,17 +46,69 @@ func (s *SqliteOnlineStore) Destruct() {
s.db.Close()
}

// Returns FeatureData 2D array. Each row corresponds to one entity value and each column corresponds to a single feature where the number of columns should be
// same length as the length of featureNames. Reads from every table in featureViewNames with the entity keys described.
func (s *SqliteOnlineStore) OnlineRead(ctx context.Context, entityKeys []types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
featureCount := len(featureNames)
_, err := s.getConnection()
if err != nil {
return nil, err
}
return nil, nil
}

// feature views, entities, dataframe?
func (s *SqliteOnlineStore) WriteToOnlineStore(ctx context.Context, featureViewName string, data [][]FeatureData) error {
return nil
project := s.project
results := make([][]FeatureData, len(entityKeys))
entityNameToEntityIndex := make(map[string]int)
in_query := make([]string, len(entityKeys))
serialized_entities := make([]interface{}, len(entityKeys))
for i := 0; i < len(entityKeys); i++ {
serKey, err := serializeEntityKey(&entityKeys[i])
if err != nil {
return nil, err
}
// TODO: fix this, string conversion is not safe
entityNameToEntityIndex[string(*serKey)] = i
Comment thread
pyalex marked this conversation as resolved.
Outdated
// for IN clause in read query
in_query[i] = "?"
serialized_entities[i] = *serKey
}
featureNamesToIdx := make(map[string]int)
for idx, name := range featureNames {
featureNamesToIdx[name] = idx
}

for idx := 0; idx < len(entityKeys); idx++ {
results[idx] = make([]FeatureData, featureCount)
}
for _, featureViewName := range featureViewNames {
Comment thread
kevjumba marked this conversation as resolved.
Outdated
query_string := fmt.Sprintf(`SELECT entity_key, feature_name, value, event_ts
FROM %s
WHERE entity_key IN (%s)
ORDER BY entity_key`, tableId(project, featureViewName), strings.Join(in_query, ","))
rows, err := s.db.Query(query_string, serialized_entities...)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var entity_key string
Comment thread
kevjumba marked this conversation as resolved.
Outdated
var feature_name string
var valueString string
Comment thread
pyalex marked this conversation as resolved.
Outdated
var event_ts time.Time
var value types.Value
err = rows.Scan(&entity_key, &feature_name, &valueString, &event_ts)
if err != nil {
log.Fatal(err)
Comment thread
kevjumba marked this conversation as resolved.
Outdated
}
if err := proto.Unmarshal([]byte(valueString), &value); err != nil {
return nil, errors.New("error converting parsed value to types.Value")
}
results[entityNameToEntityIndex[entity_key]][featureNamesToIdx[feature_name]] = FeatureData{reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: feature_name},
timestamp: *timestamppb.New(event_ts),
value: types.Value{Val: value.Val},
}
}
}
return results, nil
}

func (s *SqliteOnlineStore) Update(ctx context.Context, config *RepoConfig, tables_to_delete []*FeatureView, tables_to_keep []*FeatureView) error {
Expand All @@ -62,16 +119,17 @@ func (s *SqliteOnlineStore) Update(ctx context.Context, config *RepoConfig, tabl
project := config.Project
for _, table := range tables_to_keep {
s.db.Exec(
fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))", tableId(project, table)))
fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))", tableId(project, table.base.name)))
s.db.Exec(
fmt.Sprintf("CREATE INDEX IF NOT EXISTS %s_ek ON %s (entity_key);", tableId(project, table), tableId(project, table)))
fmt.Sprintf("CREATE INDEX IF NOT EXISTS %s_ek ON %s (entity_key);", tableId(project, table.base.name), tableId(project, table.base.name)))
}
for _, table := range tables_to_delete {
s.db.Exec("DROP TABLE IF EXISTS %s", tableId(project, table))
s.db.Exec("DROP TABLE IF EXISTS %s", tableId(project, table.base.name))
}
return nil
}

// Gets a sqlite connection and sets it to the online store and also returns a pointer to the connection.
func (s *SqliteOnlineStore) getConnection() (*sql.DB, error) {
if s.db == nil {
if s.path == "" {
Expand All @@ -86,12 +144,14 @@ func (s *SqliteOnlineStore) getConnection() (*sql.DB, error) {
return s.db, nil
}

func tableId(project string, table *FeatureView) string {
return fmt.Sprintf("%s_%s", project, table.base.name)
// Constructs the table id from the project and table(featureViewName) string.
func tableId(project string, featureViewName string) string {
return fmt.Sprintf("%s_%s", project, featureViewName)
}

// Creates a connection to the sqlite database and returns the connection.
func initializeConnection(db_path string) (*sql.DB, error) {
db, err := sql.Open("sqlite3", "./foo.db")
db, err := sql.Open("sqlite3", db_path)
if err != nil {
return nil, err
}
Expand Down
69 changes: 44 additions & 25 deletions go/internal/feast/sqliteonlinestore_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package feast

import (
"fmt"
"log"
"context"
"reflect"
"testing"

"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/stretchr/testify/assert"
)

func TestSqliteSetup(t *testing.T) {
dir := "../test/selected_tetra"
dir := "../test/feature_repo"
config, err := NewRepoConfigFromFile(dir)
assert.Nil(t, err)
assert.Equal(t, "selected_tetra", config.Project)
assert.Equal(t, "feature_repo", config.Project)
assert.Equal(t, "data/registry.db", config.GetRegistryConfig().Path)
assert.Equal(t, "local", config.Provider)
assert.Equal(t, map[string]interface{}{
Expand All @@ -23,32 +24,50 @@ func TestSqliteSetup(t *testing.T) {
assert.Empty(t, config.Flags)
}

func TestSqliteUpdate(t *testing.T) {
dir := "../test/selected_tetra"
func TestSqliteOnlineRead(t *testing.T) {
dir := "../test/feature_repo"
config, err := NewRepoConfigFromFile(dir)
assert.Nil(t, err)
store, err := NewSqliteOnlineStore("test", config.OnlineStore)
store, err := NewSqliteOnlineStore("feature_repo", config, config.OnlineStore)
defer store.Destruct()
assert.Nil(t, err)
show_tables := `SELECT
*
FROM
selected_tetra_driver_hourly_stats;`
rows, err := store.db.Query(show_tables)
if err != nil {
log.Fatal(err)
entity_key1 := types.EntityKey{
JoinKeys: []string{"driver_id"},
EntityValues: []*types.Value{{Val: &types.Value_Int64Val{1005}}},
}
defer rows.Close()
for rows.Next() {
fmt.Println("in")
var id int
var name string
err = rows.Scan(&id, &name)
if err != nil {
log.Fatal(err)
entity_key2 := types.EntityKey{
JoinKeys: []string{"driver_id"},
EntityValues: []*types.Value{{Val: &types.Value_Int64Val{1001}}},
}
entity_key3 := types.EntityKey{
JoinKeys: []string{"driver_id"},
EntityValues: []*types.Value{{Val: &types.Value_Int64Val{1003}}},
}
entityKeys := []types.EntityKey{entity_key1, entity_key2, entity_key3}
tableNames := []string{"driver_hourly_stats"}
featureNames := []string{"conv_rate", "acc_rate", "avg_daily_trips"}
featureData, err := store.OnlineRead(context.Background(), entityKeys, tableNames, featureNames)
assert.Nil(t, err)
returnedFeatureValues := make([]types.Value, 0)
returnedFeatureNames := make([]string, 0)
for _, featureVector := range featureData {
for _, feature := range featureVector {
returnedFeatureValues = append(returnedFeatureValues, feature.value)
returnedFeatureNames = append(returnedFeatureNames, feature.reference.FeatureName)
}
fmt.Println(id, name)
}
assert.False(t, true)

expectedFeatureValues := []types.Value{
{Val: &types.Value_FloatVal{0.78135854}},
{Val: &types.Value_FloatVal{0.38527268}},
{Val: &types.Value_Int64Val{755}},
{Val: &types.Value_FloatVal{0.49661186}},
{Val: &types.Value_FloatVal{0.9440974}},
{Val: &types.Value_Int64Val{169}},
{Val: &types.Value_FloatVal{0.80762655}},
{Val: &types.Value_FloatVal{0.71510273}},
{Val: &types.Value_Int64Val{545}},
}
expectedFeatureNames := []string{"conv_rate", "acc_rate", "avg_daily_trips", "conv_rate", "acc_rate", "avg_daily_trips", "conv_rate", "acc_rate", "avg_daily_trips"}
assert.True(t, reflect.DeepEqual(expectedFeatureValues, returnedFeatureValues))
assert.True(t, reflect.DeepEqual(expectedFeatureNames, returnedFeatureNames))
}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.
driver_hourly_stats = FileSource(
path="/Users/kevinzhang/tecton-ai/offline_store/feast/go/internal/test/selected_tetra/data/driver_stats.parquet",
path="/Users/kevinzhang/tecton-ai/offline_store/feast/go/internal/test/feature_repo/data/driver_stats.parquet",
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
project: selected_tetra
project: feature_repo
registry: data/registry.db
provider: local
online_store:
Expand Down
Binary file not shown.