Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
501f59d
Initial structure for go sqlite online store
kevjumba Mar 23, 2022
0fbd8dc
Somewhat intermediate state
kevjumba Mar 23, 2022
21c8900
Add sqlite online store for go for testing
kevjumba Mar 24, 2022
01d9a1d
Revert
kevjumba Mar 24, 2022
2c4fccd
Revert
kevjumba Mar 24, 2022
34dcd15
Clean up
kevjumba Mar 24, 2022
80f4579
Address review issues
kevjumba Mar 24, 2022
17acc6a
Fix/address issues
kevjumba Mar 25, 2022
64fa88b
lint
kevjumba Mar 25, 2022
0601f0a
Fix
kevjumba Mar 25, 2022
2233ff8
Fix
kevjumba Mar 25, 2022
4de021b
Make integration test work
kevjumba Mar 25, 2022
e3f9970
Fix tests
kevjumba Mar 25, 2022
d9f5333
Fix tests
kevjumba Mar 25, 2022
a631c52
debugging
kevjumba Mar 25, 2022
993ead4
Debug
kevjumba Mar 25, 2022
63fbee6
Debug
kevjumba Mar 25, 2022
c325834
Debug
kevjumba Mar 25, 2022
f2202b8
Debug
kevjumba Mar 25, 2022
c242587
Debug
kevjumba Mar 25, 2022
08e51fb
Debug
kevjumba Mar 25, 2022
b551639
Remove feature_repo files
kevjumba Mar 28, 2022
0755e05
update gitignore
kevjumba Mar 28, 2022
eb8a320
Clean up code
kevjumba Mar 28, 2022
c3472bd
Update go mod
kevjumba Mar 28, 2022
a5b6607
Update makefile
kevjumba Mar 28, 2022
5f68689
Fix gitignore issue
kevjumba Mar 28, 2022
3896f99
Update makefile
kevjumba Mar 28, 2022
111c2d0
Update makefile
kevjumba Mar 28, 2022
c6197ce
Update makefile
kevjumba Mar 28, 2022
a5d4d28
Update makefile
kevjumba Mar 28, 2022
14d5602
Update makefile
kevjumba Mar 28, 2022
02eda7a
Revert worfklow
kevjumba Mar 28, 2022
14876a2
Update build path
kevjumba Mar 29, 2022
3bfcfad
remove
kevjumba Mar 29, 2022
8739e68
rename
kevjumba Mar 29, 2022
bd052a3
Address review
kevjumba Mar 29, 2022
189d3d2
fix tests
kevjumba Mar 29, 2022
acc86bb
Fix
kevjumba Mar 29, 2022
34ab63d
see if this fixes test
kevjumba Mar 29, 2022
fa0abf3
Fix
kevjumba Mar 29, 2022
941aea8
revert
kevjumba Mar 29, 2022
025a7e4
Will add in separate pr to update cryptography
kevjumba Mar 29, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Somewhat intermediate state
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba committed Mar 29, 2022
commit 0fbd8dc8590b24801ff89a7900904b7d7e1c826c
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ require (
github.com/klauspost/compress v1.13.6 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.9 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
Comment thread
kevjumba marked this conversation as resolved.
Outdated
github.com/google/go-cmp v0.5.7 // indirect
github.com/mattn/go-sqlite3 v1.14.12 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/exp v0.0.0-20211028214138-64b4c8e87d1a // indirect
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0=
github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
Empty file added go/internal/feast/foo.db
Empty file.
68 changes: 60 additions & 8 deletions go/internal/feast/sqliteonlinestore.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package feast

import (
"database/sql"
"errors"

"context"
"fmt"

"github.com/feast-dev/feast/go/protos/feast/types"
_ "github.com/mattn/go-sqlite3"
)

type SqliteOnlineStore struct {
Expand All @@ -13,6 +17,7 @@ type SqliteOnlineStore struct {
project string

path string
db *sql.DB
}

func NewSqliteOnlineStore(project string, onlineStoreConfig map[string]interface{}) (*SqliteOnlineStore, error) {
Expand All @@ -23,25 +28,72 @@ func NewSqliteOnlineStore(project string, onlineStoreConfig map[string]interface
return nil, fmt.Errorf("cannot find convert sqlite path to string %s", db_path)
} else {
store.path = dbPathStr
db, err := initializeConnection(dbPathStr)
if err != nil {
return nil, err
}
store.db = db
}
return &SqliteOnlineStore{
project: project,
}, nil
return &store, nil
}

func (*SqliteOnlineStore) Destruct() {

// Destruct closes the store's database handle.
//
// It is a no-op when the receiver is nil or no handle has been opened, so it
// can be deferred right after construction without risking a nil-pointer
// panic inside (*sql.DB).Close.
func (s *SqliteOnlineStore) Destruct() {
	if s == nil || s.db == nil {
		return
	}
	// Destruct has no error return, so a failure to close is deliberately dropped.
	_ = s.db.Close()
}

func (*SqliteOnlineStore) OnlineRead(ctx context.Context, entityKeys []types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
// OnlineRead is not implemented yet. It only checks that a database
// connection can be obtained, propagating any error from getConnection, and
// otherwise returns a nil result without reading anything.
func (s *SqliteOnlineStore) OnlineRead(ctx context.Context, entityKeys []types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
	if _, connErr := s.getConnection(); connErr != nil {
		return nil, connErr
	}
	return nil, nil
}

// feature views, entities, dataframe?
func (*SqliteOnlineStore) WriteToOnlineStore(ctx context.Context, featureViewName string, data [][]FeatureData) error {
// WriteToOnlineStore is a placeholder: it ignores ctx, featureViewName and
// data, writes nothing, and always returns nil.
func (s *SqliteOnlineStore) WriteToOnlineStore(ctx context.Context, featureViewName string, data [][]FeatureData) error {
return nil
}

// Update reconciles the SQLite schema with the given feature views.
//
// For every view in tables_to_keep it creates the backing table and an index
// on entity_key if they do not exist yet; for every view in tables_to_delete
// it drops the backing table if it exists. Table names are built by tableId
// from config.Project and the view.
//
// The first statement that fails aborts the update; its error is returned
// wrapped with the table it concerned. An error from getConnection is
// returned unchanged.
func (s *SqliteOnlineStore) Update(ctx context.Context, config *RepoConfig, tables_to_delete []*FeatureView, tables_to_keep []*FeatureView) error {
	db, err := s.getConnection()
	if err != nil {
		return err
	}
	project := config.Project

	for _, table := range tables_to_keep {
		name := tableId(project, table)

		createTable := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))", name)
		if _, err := db.ExecContext(ctx, createTable); err != nil {
			return fmt.Errorf("creating table %s: %w", name, err)
		}

		createIndex := fmt.Sprintf("CREATE INDEX IF NOT EXISTS %s_ek ON %s (entity_key);", name, name)
		if _, err := db.ExecContext(ctx, createIndex); err != nil {
			return fmt.Errorf("creating index on table %s: %w", name, err)
		}
	}

	for _, table := range tables_to_delete {
		name := tableId(project, table)
		// Bind parameters cannot stand in for identifiers, so the table name
		// has to be formatted into the statement text rather than passed as
		// an argument to Exec.
		dropTable := fmt.Sprintf("DROP TABLE IF EXISTS %s", name)
		if _, err := db.ExecContext(ctx, dropTable); err != nil {
			return fmt.Errorf("dropping table %s: %w", name, err)
		}
	}
	return nil
}

func (*SqliteOnlineStore) initializeConnection(db_path string) {
func (s *SqliteOnlineStore) getConnection() (*sql.DB, error) {
if s.db == nil {
if s.path == "" {
return nil, errors.New("no database path available")
}
db, err := initializeConnection(s.path)
s.db = db
if err != nil {
return nil, err
}
}
Comment on lines 123 to 132
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be guarded by a lock to prevent concurrent access creating multiple dangling connections

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a mutex.

return s.db, nil
}

// tableId builds the SQLite table name for a feature view by joining the
// project name and the view's base name with an underscore.
func tableId(project string, table *FeatureView) string {
	viewName := table.base.name
	return fmt.Sprintf("%s_%s", project, viewName)
}

// initializeConnection returns a database handle for the SQLite file at
// db_path, opened through the driver registered under the name "sqlite3".
//
// The path is passed to sql.Open unchanged as the data source name. Per the
// database/sql documentation, Open may only validate its arguments without
// establishing a connection, so a bad path can surface later on first use.
// On failure the returned handle is nil and the error is wrapped with the
// offending path.
func initializeConnection(db_path string) (*sql.DB, error) {
	db, err := sql.Open("sqlite3", db_path)
	if err != nil {
		return nil, fmt.Errorf("opening sqlite database %q: %w", db_path, err)
	}
	return db, nil
}
54 changes: 54 additions & 0 deletions go/internal/feast/sqliteonlinestore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package feast

import (
"fmt"
"log"
"testing"

"github.com/stretchr/testify/assert"
)

// TestSqliteSetup loads the selected_tetra test repository configuration and
// checks the parsed project, registry path, provider and online-store
// settings, and that the remaining sections are empty.
func TestSqliteSetup(t *testing.T) {
	const repoDir = "../test/selected_tetra"
	wantOnlineStore := map[string]interface{}{
		"path": "data/online_store.db",
	}

	config, err := NewRepoConfigFromFile(repoDir)
	assert.Nil(t, err)

	// Populated sections.
	assert.Equal(t, "selected_tetra", config.Project)
	assert.Equal(t, "data/registry.db", config.GetRegistryConfig().Path)
	assert.Equal(t, "local", config.Provider)
	assert.Equal(t, wantOnlineStore, config.OnlineStore)

	// Sections the test repository leaves unset.
	assert.Empty(t, config.OfflineStore)
	assert.Empty(t, config.FeatureServer)
	assert.Empty(t, config.Flags)
}

// TestSqliteUpdate opens the SQLite online store configured by the
// selected_tetra test repository and reads back every row of the
// selected_tetra_driver_hourly_stats table. It fails if the configuration
// cannot be loaded, the store cannot be created, the query fails, or a row
// cannot be scanned.
func TestSqliteUpdate(t *testing.T) {
	dir := "../test/selected_tetra"
	config, err := NewRepoConfigFromFile(dir)
	if err != nil {
		t.Fatalf("loading repo config from %s: %v", dir, err)
	}
	store, err := NewSqliteOnlineStore("test", config.OnlineStore)
	if err != nil {
		t.Fatalf("creating sqlite online store: %v", err)
	}
	// Deferred only after the error check: on failure store is nil.
	defer store.Destruct()

	show_tables := `SELECT
*
FROM
selected_tetra_driver_hourly_stats;`
	rows, err := store.db.Query(show_tables)
	if err != nil {
		// t.Fatal instead of log.Fatal so the deferred cleanup still runs.
		t.Fatal(err)
	}
	defer rows.Close()

	// SELECT * returns every column of the table, so size the scan
	// destinations from the result set instead of assuming a column count.
	columns, err := rows.Columns()
	if err != nil {
		t.Fatal(err)
	}
	rowCount := 0
	for rows.Next() {
		values := make([]interface{}, len(columns))
		dests := make([]interface{}, len(columns))
		for i := range values {
			dests[i] = &values[i]
		}
		if err := rows.Scan(dests...); err != nil {
			t.Fatal(err)
		}
		fmt.Println(values...)
		rowCount++
	}
	// Next returning false may mean an iteration error rather than the end
	// of the result set.
	if err := rows.Err(); err != nil {
		t.Fatal(err)
	}
	log.Printf("read %d rows from selected_tetra_driver_hourly_stats", rowCount)
}
Empty file.
Binary file not shown.
Binary file not shown.
35 changes: 35 additions & 0 deletions go/internal/test/selected_tetra/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# This is an example feature definition file

import os
from datetime import timedelta

from feast import Entity, Feature, FeatureView, FileSource, ValueType

# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.
# The parquet file is located relative to this module, so the repository works
# from any checkout location rather than only from one developer's home
# directory.
driver_hourly_stats = FileSource(
    path=os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "data", "driver_stats.parquet"
    ),
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created",
)

# Define an entity for the driver. You can think of entity as a primary key used to
# fetch features.
# The driver entity, declared under the name "driver_id" with an INT64 value type.
driver = Entity(
    name="driver_id",
    value_type=ValueType.INT64,
    description="driver id",
)

# Our parquet files contain sample data that includes a driver_id column, timestamps and
# three feature columns. Here we define a Feature View that will allow us to serve this
# data to our model online.
driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
# Refers by name to the "driver_id" entity declared earlier in this file.
entities=["driver_id"],
# NOTE(review): presumably the maximum age of feature values that may be served — confirm against the Feast docs.
ttl=timedelta(days=1),
# Two FLOAT features and one INT64 feature exposed by this view.
features=[
Feature(name="conv_rate", dtype=ValueType.FLOAT),
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
online=True,
# The parquet-backed FileSource defined earlier in this file.
batch_source=driver_hourly_stats,
tags={},
)
5 changes: 5 additions & 0 deletions go/internal/test/selected_tetra/feature_store.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
project: selected_tetra
registry: data/registry.db
provider: local
online_store:
path: data/online_store.db
42 changes: 21 additions & 21 deletions sdk/python/tests/integration/feature_repos/repo_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,28 @@
[
IntegrationTestRepoConfig(online_store=REDIS_CONFIG),
# GCP configurations
IntegrationTestRepoConfig(
provider="gcp",
offline_store_creator=BigQueryDataSourceCreator,
online_store="datastore",
),
IntegrationTestRepoConfig(
provider="gcp",
offline_store_creator=BigQueryDataSourceCreator,
online_store=REDIS_CONFIG,
),
# IntegrationTestRepoConfig(
# provider="gcp",
# offline_store_creator=BigQueryDataSourceCreator,
# online_store="datastore",
# ),
# IntegrationTestRepoConfig(
# provider="gcp",
# offline_store_creator=BigQueryDataSourceCreator,
# online_store=REDIS_CONFIG,
# ),
# AWS configurations
IntegrationTestRepoConfig(
provider="aws",
offline_store_creator=RedshiftDataSourceCreator,
online_store=DYNAMO_CONFIG,
python_feature_server=True,
),
IntegrationTestRepoConfig(
provider="aws",
offline_store_creator=RedshiftDataSourceCreator,
online_store=REDIS_CONFIG,
),
# IntegrationTestRepoConfig(
# provider="aws",
# offline_store_creator=RedshiftDataSourceCreator,
# online_store=DYNAMO_CONFIG,
# python_feature_server=True,
# ),
# IntegrationTestRepoConfig(
# provider="aws",
# offline_store_creator=RedshiftDataSourceCreator,
# online_store=REDIS_CONFIG,
# ),
# Snowflake configurations
IntegrationTestRepoConfig(
provider="aws", # no list features, no feature server
Expand Down