Skip to content

Commit 57a97d8

Browse files
kevjumbapyalex
andauthored
feat: Add DQM Logging on GRPC Server with FileLogStorage for Testing (#2403)
* Make a proof of concept Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * revert feature store Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * refactor Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add time Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add time Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * clean up Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add comment Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add pseudocode Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Refactor logging functionality to hide internals Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Refactor Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Revert changes Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add new timeout test Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix python ci for m1 mac Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Working state Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Move offline log store Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * refactor Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update logs Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update log storage Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * WOrking state Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Work Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add tests for filestorage Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix logging Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add more tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Clean up Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update error Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * semi working state Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * b state Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update types to be public Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update structs to make fields public Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * clean up Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix go Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix issues Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Working state Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Clean up code a bit Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fixes Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Clean up Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Update schema functionality Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Remove xitongsys parquet reader Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Clean up Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix go mode Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix tests and errors and everything Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Remove unused code Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Last working commit Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * work Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Address some changes Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * More addresses. Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix more review comments Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Rename Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add request id Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * More fixes Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix odfv Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Address other changes Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Reorder for optimization Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add more shcema tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * refactor to clean Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Add initialized repo for testing Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Remove Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Text Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix? Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * remove entity map Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * remove Cache method from registry Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * clean up pre-initialized repo Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * git ignore full data directory in tests Signed-off-by: pyalex <moskalenko.alexey@gmail.com> Co-authored-by: pyalex <moskalenko.alexey@gmail.com>
1 parent cf7bbc2 commit 57a97d8

28 files changed

+1532
-120
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ coverage.xml
105105
.hypothesis/
106106
.pytest_cache/
107107
infra/scripts/*.conf
108-
go/internal/test/feature_repo
108+
go/cmd/server/logging/feature_repo/data/
109109

110110
# Translations
111111
*.mo

go.mod

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ require (
88
github.com/go-python/gopy v0.4.0
99
github.com/go-redis/redis/v8 v8.11.4
1010
github.com/golang/protobuf v1.5.2
11-
github.com/google/uuid v1.2.0
11+
github.com/google/uuid v1.3.0
1212
github.com/mattn/go-sqlite3 v1.14.12
1313
github.com/spaolacci/murmur3 v1.1.0
1414
github.com/stretchr/testify v1.7.0
15-
google.golang.org/grpc v1.44.0
16-
google.golang.org/protobuf v1.27.1
15+
google.golang.org/grpc v1.45.0
16+
google.golang.org/protobuf v1.28.0
1717
)
1818

1919
require (
@@ -23,28 +23,26 @@ require (
2323
github.com/cespare/xxhash/v2 v2.1.2 // indirect
2424
github.com/davecgh/go-spew v1.1.1 // indirect
2525
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
26-
github.com/goccy/go-json v0.7.10 // indirect
26+
github.com/goccy/go-json v0.9.6 // indirect
2727
github.com/golang/snappy v0.0.4 // indirect
28-
github.com/gonuts/commander v0.1.0 // indirect
29-
github.com/gonuts/flag v0.1.0 // indirect
30-
github.com/google/flatbuffers v2.0.5+incompatible // indirect
31-
github.com/klauspost/asmfmt v1.3.1 // indirect
28+
github.com/google/flatbuffers v2.0.6+incompatible // indirect
29+
github.com/klauspost/asmfmt v1.3.2 // indirect
3230
github.com/klauspost/compress v1.15.1 // indirect
33-
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
31+
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
3432
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
3533
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
36-
github.com/pierrec/lz4/v4 v4.1.12 // indirect
37-
github.com/pkg/errors v0.9.1 // indirect
34+
github.com/pierrec/lz4/v4 v4.1.14 // indirect
3835
github.com/pmezard/go-difflib v1.0.0 // indirect
39-
github.com/zeebo/xxh3 v1.0.1 // indirect
40-
golang.org/x/exp v0.0.0-20211216164055-b2b84827b756 // indirect
36+
github.com/zeebo/xxh3 v1.0.2 // indirect
37+
golang.org/x/exp v0.0.0-20220407100705-7b9b53b0aca4 // indirect
4138
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
42-
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
43-
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect
39+
golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3 // indirect
40+
golang.org/x/sys v0.0.0-20220406163625-3f8b81556e12 // indirect
4441
golang.org/x/text v0.3.7 // indirect
4542
golang.org/x/tools v0.1.10 // indirect
4643
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
47-
google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 // indirect
44+
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
45+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
4846
gopkg.in/yaml.v2 v2.4.0 // indirect
4947
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
5048
)

go.sum

Lines changed: 34 additions & 19 deletions
Large diffs are not rendered by default.

go/cmd/server/logging/feature_repo/__init__.py

Whitespace-only changes.
33.9 KB
Binary file not shown.
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# This is an example feature definition file
2+
3+
from google.protobuf.duration_pb2 import Duration
4+
5+
from feast import Entity, Feature, FeatureView, FileSource, ValueType, FeatureService
6+
7+
# Read data from parquet files. Parquet is convenient for local development mode. For
8+
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
9+
# for more info.
10+
driver_hourly_stats = FileSource(
11+
path="driver_stats.parquet",
12+
event_timestamp_column="event_timestamp",
13+
created_timestamp_column="created",
14+
)
15+
16+
# Define an entity for the driver. You can think of entity as a primary key used to
17+
# fetch features.
18+
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)
19+
20+
# Our parquet files contain sample data that includes a driver_id column, timestamps and
21+
# three feature column. Here we define a Feature View that will allow us to serve this
22+
# data to our model online.
23+
driver_hourly_stats_view = FeatureView(
24+
name="driver_hourly_stats",
25+
entities=["driver_id"],
26+
ttl=Duration(seconds=86400 * 365 * 10),
27+
features=[
28+
Feature(name="conv_rate", dtype=ValueType.FLOAT),
29+
Feature(name="acc_rate", dtype=ValueType.FLOAT),
30+
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
31+
],
32+
online=True,
33+
batch_source=driver_hourly_stats,
34+
tags={},
35+
)
36+
37+
driver_stats_fs = FeatureService(
38+
name="test_service",
39+
features=[driver_hourly_stats_view]
40+
)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
project: feature_repo
2+
registry: data/registry.db
3+
provider: local
4+
online_store:
5+
path: data/online_store.db
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package logging
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io"
7+
"os"
8+
"path/filepath"
9+
10+
"github.com/apache/arrow/go/v8/arrow/array"
11+
"github.com/apache/arrow/go/v8/parquet"
12+
"github.com/apache/arrow/go/v8/parquet/pqarrow"
13+
"github.com/feast-dev/feast/go/internal/feast/registry"
14+
)
15+
16+
type FileLogStorage struct {
17+
// Feast project name
18+
project string
19+
path string
20+
}
21+
22+
func GetFileConfig(config *registry.RepoConfig) (*OfflineLogStoreConfig, error) {
23+
fileConfig := OfflineLogStoreConfig{
24+
storeType: "file",
25+
}
26+
if onlineStorePath, ok := config.OfflineStore["path"]; ok {
27+
path, success := onlineStorePath.(string)
28+
if !success {
29+
return &fileConfig, fmt.Errorf("path, %s, cannot be converted to string", path)
30+
}
31+
fileConfig.path = path
32+
} else {
33+
return nil, errors.New("need path for file log storage")
34+
}
35+
return &fileConfig, nil
36+
}
37+
38+
// This offline store is currently only used for testing. It will be instantiated during go unit tests to log to file
39+
// and the parquet files will be cleaned up after the test is run.
40+
func NewFileOfflineStore(project string, offlineStoreConfig *OfflineLogStoreConfig) (*FileLogStorage, error) {
41+
store := FileLogStorage{project: project}
42+
var absPath string
43+
var err error
44+
// TODO(kevjumba) remove this default catch.
45+
if offlineStoreConfig.path != "" {
46+
absPath, err = filepath.Abs(offlineStoreConfig.path)
47+
} else {
48+
return nil, errors.New("need path for file log storage")
49+
}
50+
if err != nil {
51+
return nil, err
52+
}
53+
store.path = absPath
54+
return &store, nil
55+
}
56+
57+
func openLogFile(absPath string) (*os.File, error) {
58+
var _, err = os.Stat(absPath)
59+
60+
// create file if not exists
61+
if os.IsNotExist(err) {
62+
var file, err = os.Create(absPath)
63+
if err != nil {
64+
return nil, err
65+
}
66+
return file, nil
67+
} else {
68+
return nil, fmt.Errorf("path %s already exists", absPath)
69+
}
70+
}
71+
72+
func (f *FileLogStorage) FlushToStorage(tbl array.Table) error {
73+
w, err := openLogFile(f.path)
74+
var writer io.Writer = w
75+
if err != nil {
76+
return err
77+
}
78+
props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false))
79+
arrProps := pqarrow.DefaultWriterProps()
80+
err = pqarrow.WriteTable(tbl, writer, 100, props, arrProps)
81+
if err != nil {
82+
return err
83+
}
84+
return nil
85+
86+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package logging
2+
3+
import (
4+
"context"
5+
"path/filepath"
6+
7+
"testing"
8+
9+
"github.com/apache/arrow/go/v8/arrow/array"
10+
"github.com/apache/arrow/go/v8/arrow/memory"
11+
"github.com/apache/arrow/go/v8/parquet/file"
12+
"github.com/apache/arrow/go/v8/parquet/pqarrow"
13+
"github.com/feast-dev/feast/go/internal/test"
14+
"github.com/stretchr/testify/assert"
15+
)
16+
17+
func TestFlushToStorage(t *testing.T) {
18+
ctx := context.Background()
19+
table, expectedSchema, expectedColumns, err := GetTestArrowTableAndExpectedResults()
20+
defer table.Release()
21+
assert.Nil(t, err)
22+
offlineStoreConfig := OfflineLogStoreConfig{
23+
storeType: "file",
24+
path: "./log.parquet",
25+
}
26+
fileStore, err := NewFileOfflineStore("test", &offlineStoreConfig)
27+
assert.Nil(t, err)
28+
err = fileStore.FlushToStorage(array.Table(table))
29+
assert.Nil(t, err)
30+
logPath, err := filepath.Abs(offlineStoreConfig.path)
31+
assert.Nil(t, err)
32+
pf, err := file.OpenParquetFile(logPath, false)
33+
assert.Nil(t, err)
34+
35+
reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
36+
assert.Nil(t, err)
37+
38+
tbl, err := reader.ReadTable(ctx)
39+
assert.Nil(t, err)
40+
tr := array.NewTableReader(tbl, -1)
41+
defer tbl.Release()
42+
43+
defer tr.Release()
44+
for tr.Next() {
45+
rec := tr.Record()
46+
assert.NotNil(t, rec)
47+
for _, field := range rec.Schema().Fields() {
48+
assert.Contains(t, expectedSchema, field.Name)
49+
assert.Equal(t, field.Type, expectedSchema[field.Name])
50+
}
51+
values, err := test.GetProtoFromRecord(rec)
52+
53+
assert.Nil(t, err)
54+
for name, val := range values {
55+
if name == "RequestId" {
56+
// Ensure there are request ids in record.
57+
assert.Greater(t, len(val.Val), 0)
58+
} else {
59+
assert.Equal(t, len(val.Val), len(expectedColumns[name].Val))
60+
for idx, featureVal := range val.Val {
61+
assert.Equal(t, featureVal.Val, expectedColumns[name].Val[idx].Val)
62+
}
63+
}
64+
}
65+
}
66+
67+
err = test.CleanUpFile(logPath)
68+
assert.Nil(t, err)
69+
70+
}

0 commit comments

Comments
 (0)