Skip to content

Commit 67d2992

Browse files
authored
chore: Minimize impact of enabled feature logging on latency (feast-dev#2729)
* improve latency when feature logging is enabled
  Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
* revert flush interval
  Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
* some clean up
  Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent 44d53fd commit 67d2992

File tree

10 files changed

+149
-106
lines changed

10 files changed

+149
-106
lines changed

go/internal/feast/server/logging/filelogsink.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ func NewFileLogSink(path string) (*FileLogSink, error) {
3434
return &FileLogSink{path: absPath}, nil
3535
}
3636

37-
func (s *FileLogSink) Write(record arrow.Record) error {
37+
func (s *FileLogSink) Write(records []arrow.Record) error {
3838
fileName, _ := uuid.NewUUID()
3939

4040
var writer io.Writer
4141
writer, err := os.Create(filepath.Join(s.path, fmt.Sprintf("%s.parquet", fileName.String())))
4242
if err != nil {
4343
return err
4444
}
45-
table := array.NewTableFromRecords(record.Schema(), []arrow.Record{record})
45+
table := array.NewTableFromRecords(records[0].Schema(), records)
4646

4747
props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false))
4848
arrProps := pqarrow.DefaultWriterProps()

go/internal/feast/server/logging/logger.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type LogSink interface {
3333
// Write is used to unload logs from memory buffer.
3434
// Logs are not guaranteed to be flushed to the sink at this point.
3535
// The data can just be written to local disk (depending on implementation).
36-
Write(data arrow.Record) error
36+
Write(data []arrow.Record) error
3737

3838
// Flush actually sends data to the sink.
3939
// We want to control the amount of interaction with the sink, since it could be a costly operation.
@@ -75,17 +75,18 @@ func NewLoggerConfig(sampleRate float32, opts LoggingOptions) LoggerConfig {
7575
}
7676

7777
func NewLogger(schema *FeatureServiceSchema, featureServiceName string, sink LogSink, config LoggerConfig) (*LoggerImpl, error) {
78+
buffer, err := NewMemoryBuffer(schema)
79+
if err != nil {
80+
return nil, err
81+
}
7882
logger := &LoggerImpl{
7983
featureServiceName: featureServiceName,
8084

8185
logCh: make(chan *Log, config.ChannelCapacity),
8286
signalCh: make(chan interface{}, 2),
8387
sink: sink,
8488

85-
buffer: &MemoryBuffer{
86-
logs: make([]*Log, 0),
87-
schema: schema,
88-
},
89+
buffer: buffer,
8990
schema: schema,
9091
config: config,
9192

go/internal/feast/server/logging/logger_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
type DummySink struct{}
2626

27-
func (s *DummySink) Write(rec arrow.Record) error {
27+
func (s *DummySink) Write(recs []arrow.Record) error {
2828
return nil
2929
}
3030

@@ -42,7 +42,7 @@ func TestLoggingChannelTimeout(t *testing.T) {
4242
WriteInterval: DefaultOptions.WriteInterval,
4343
},
4444
}
45-
logger, err := NewLogger(nil, "testFS", &DummySink{}, config)
45+
logger, err := NewLogger(&FeatureServiceSchema{}, "testFS", &DummySink{}, config)
4646

4747
// stop log processing to check buffering channel
4848
logger.Stop()

go/internal/feast/server/logging/memorybuffer.go

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package logging
22

33
import (
44
"fmt"
5-
65
"github.com/apache/arrow/go/v8/arrow"
76
"github.com/apache/arrow/go/v8/arrow/array"
87
"github.com/apache/arrow/go/v8/arrow/memory"
@@ -14,63 +13,97 @@ import (
1413
type MemoryBuffer struct {
1514
logs []*Log
1615
schema *FeatureServiceSchema
16+
17+
arrowSchema *arrow.Schema
18+
records []arrow.Record
1719
}
1820

1921
const (
2022
LOG_TIMESTAMP_FIELD = "__log_timestamp"
2123
LOG_DATE_FIELD = "__log_date"
2224
LOG_REQUEST_ID_FIELD = "__request_id"
25+
RECORD_SIZE = 1000
2326
)
2427

28+
func NewMemoryBuffer(schema *FeatureServiceSchema) (*MemoryBuffer, error) {
29+
arrowSchema, err := getArrowSchema(schema)
30+
if err != nil {
31+
return nil, err
32+
}
33+
return &MemoryBuffer{
34+
logs: make([]*Log, 0),
35+
records: make([]arrow.Record, 0),
36+
schema: schema,
37+
arrowSchema: arrowSchema,
38+
}, nil
39+
}
40+
2541
// Acquires the logging schema from the feature service, converts the memory buffer array of rows of logs and flushes
2642
// them to the offline storage.
2743
func (b *MemoryBuffer) writeBatch(sink LogSink) error {
28-
if len(b.logs) == 0 {
29-
return nil
44+
if len(b.logs) > 0 {
45+
err := b.Compact()
46+
if err != nil {
47+
return err
48+
}
3049
}
3150

32-
record, err := b.convertToArrowRecord()
33-
34-
if err != nil {
35-
return err
51+
if len(b.records) == 0 {
52+
return nil
3653
}
37-
err = sink.Write(record)
54+
55+
err := sink.Write(b.records)
3856
if err != nil {
3957
return err
4058
}
4159

42-
b.logs = b.logs[:0]
60+
b.records = b.records[:0]
4361
return nil
4462
}
4563

4664
func (b *MemoryBuffer) Append(log *Log) error {
4765
b.logs = append(b.logs, log)
66+
67+
if len(b.logs) == RECORD_SIZE {
68+
return b.Compact()
69+
}
70+
4871
return nil
4972
}
5073

51-
func (b *MemoryBuffer) getArrowSchema() (*arrow.Schema, error) {
74+
func (b *MemoryBuffer) Compact() error {
75+
rec, err := b.convertToArrowRecord()
76+
if err != nil {
77+
return err
78+
}
79+
b.records = append(b.records, rec)
80+
b.logs = b.logs[:0]
81+
return nil
82+
}
83+
84+
func getArrowSchema(schema *FeatureServiceSchema) (*arrow.Schema, error) {
5285
fields := make([]arrow.Field, 0)
5386

54-
for _, joinKey := range b.schema.JoinKeys {
55-
arrowType, err := gotypes.ValueTypeEnumToArrowType(b.schema.JoinKeysTypes[joinKey])
87+
for _, joinKey := range schema.JoinKeys {
88+
arrowType, err := gotypes.ValueTypeEnumToArrowType(schema.JoinKeysTypes[joinKey])
5689
if err != nil {
5790
return nil, err
5891
}
5992

6093
fields = append(fields, arrow.Field{Name: joinKey, Type: arrowType})
6194
}
6295

63-
for _, requestParam := range b.schema.RequestData {
64-
arrowType, err := gotypes.ValueTypeEnumToArrowType(b.schema.RequestDataTypes[requestParam])
96+
for _, requestParam := range schema.RequestData {
97+
arrowType, err := gotypes.ValueTypeEnumToArrowType(schema.RequestDataTypes[requestParam])
6598
if err != nil {
6699
return nil, err
67100
}
68101

69102
fields = append(fields, arrow.Field{Name: requestParam, Type: arrowType})
70103
}
71104

72-
for _, featureName := range b.schema.Features {
73-
arrowType, err := gotypes.ValueTypeEnumToArrowType(b.schema.FeaturesTypes[featureName])
105+
for _, featureName := range schema.Features {
106+
arrowType, err := gotypes.ValueTypeEnumToArrowType(schema.FeaturesTypes[featureName])
74107
if err != nil {
75108
return nil, err
76109
}
@@ -98,18 +131,13 @@ func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) {
98131
arrowMemory := memory.NewGoAllocator()
99132
numRows := len(b.logs)
100133

101-
arrowSchema, err := b.getArrowSchema()
102-
if err != nil {
103-
return nil, err
104-
}
105-
106134
columns := make(map[string][]*types.Value)
107135
fieldNameToIdx := make(map[string]int)
108-
for idx, field := range arrowSchema.Fields() {
136+
for idx, field := range b.arrowSchema.Fields() {
109137
fieldNameToIdx[field.Name] = idx
110138
}
111139

112-
builder := array.NewRecordBuilder(arrowMemory, arrowSchema)
140+
builder := array.NewRecordBuilder(arrowMemory, b.arrowSchema)
113141
defer builder.Release()
114142

115143
builder.Reserve(numRows)

go/internal/feast/server/logging/memorybuffer_test.go

Lines changed: 38 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,6 @@ func TestArrowSchemaGeneration(t *testing.T) {
3737
},
3838
}
3939

40-
b := &MemoryBuffer{
41-
schema: schema,
42-
}
43-
4440
expectedArrowSchema := []arrow.Field{
4541
{Name: "driver_id", Type: arrow.PrimitiveTypes.Int32},
4642
{Name: "featureView1__int64", Type: arrow.PrimitiveTypes.Int64},
@@ -60,7 +56,7 @@ func TestArrowSchemaGeneration(t *testing.T) {
6056
{Name: "__request_id", Type: arrow.BinaryTypes.String},
6157
}
6258

63-
actualSchema, err := b.getArrowSchema()
59+
actualSchema, err := getArrowSchema(schema)
6460
assert.Nil(t, err)
6561
assert.Equal(t, expectedArrowSchema, actualSchema.Fields())
6662
}
@@ -84,51 +80,46 @@ func TestSerializeToArrowTable(t *testing.T) {
8480
}
8581

8682
ts := timestamppb.New(time.Now())
87-
b := &MemoryBuffer{
88-
schema: schema,
89-
logs: []*Log{
90-
{
91-
EntityValue: []*types.Value{
92-
{Val: &types.Value_Int64Val{Int64Val: 1001}},
93-
},
94-
FeatureValues: []*types.Value{
95-
{Val: &types.Value_Int64Val{Int64Val: rand.Int63()}},
96-
{Val: &types.Value_FloatVal{FloatVal: rand.Float32()}},
97-
},
98-
FeatureStatuses: []serving.FieldStatus{
99-
serving.FieldStatus_PRESENT,
100-
serving.FieldStatus_OUTSIDE_MAX_AGE,
101-
},
102-
EventTimestamps: []*timestamppb.Timestamp{
103-
ts, ts,
104-
},
105-
RequestId: "aaa",
106-
LogTimestamp: time.Now(),
107-
},
108-
{
109-
EntityValue: []*types.Value{
110-
{Val: &types.Value_Int64Val{Int64Val: 1003}},
111-
},
112-
FeatureValues: []*types.Value{
113-
{Val: &types.Value_Int64Val{Int64Val: rand.Int63()}},
114-
{Val: &types.Value_FloatVal{FloatVal: rand.Float32()}},
115-
},
116-
FeatureStatuses: []serving.FieldStatus{
117-
serving.FieldStatus_PRESENT,
118-
serving.FieldStatus_PRESENT,
119-
},
120-
EventTimestamps: []*timestamppb.Timestamp{
121-
ts, ts,
122-
},
123-
RequestId: "bbb",
124-
LogTimestamp: time.Now(),
125-
},
83+
b, _ := NewMemoryBuffer(schema)
84+
b.Append(&Log{
85+
EntityValue: []*types.Value{
86+
{Val: &types.Value_Int64Val{Int64Val: 1001}},
12687
},
127-
}
88+
FeatureValues: []*types.Value{
89+
{Val: &types.Value_Int64Val{Int64Val: rand.Int63()}},
90+
{Val: &types.Value_FloatVal{FloatVal: rand.Float32()}},
91+
},
92+
FeatureStatuses: []serving.FieldStatus{
93+
serving.FieldStatus_PRESENT,
94+
serving.FieldStatus_OUTSIDE_MAX_AGE,
95+
},
96+
EventTimestamps: []*timestamppb.Timestamp{
97+
ts, ts,
98+
},
99+
RequestId: "aaa",
100+
LogTimestamp: time.Now(),
101+
})
102+
b.Append(&Log{
103+
EntityValue: []*types.Value{
104+
{Val: &types.Value_Int64Val{Int64Val: 1003}},
105+
},
106+
FeatureValues: []*types.Value{
107+
{Val: &types.Value_Int64Val{Int64Val: rand.Int63()}},
108+
{Val: &types.Value_FloatVal{FloatVal: rand.Float32()}},
109+
},
110+
FeatureStatuses: []serving.FieldStatus{
111+
serving.FieldStatus_PRESENT,
112+
serving.FieldStatus_PRESENT,
113+
},
114+
EventTimestamps: []*timestamppb.Timestamp{
115+
ts, ts,
116+
},
117+
RequestId: "bbb",
118+
LogTimestamp: time.Now(),
119+
})
128120

129121
pool := memory.NewGoAllocator()
130-
arrowSchema, _ := b.getArrowSchema()
131-
builder := array.NewRecordBuilder(pool, arrowSchema)
122+
builder := array.NewRecordBuilder(pool, b.arrowSchema)
132123
defer builder.Release()
133124

134125
// join key: driver_id

go/internal/feast/server/logging/offlinestoresink.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package logging
22

33
import (
4-
"errors"
54
"fmt"
65
"io"
76
"io/ioutil"
7+
"log"
88
"os"
99
"path/filepath"
1010

@@ -41,16 +41,7 @@ func (s *OfflineStoreSink) getOrCreateDatasetDir() (string, error) {
4141
return s.datasetDir, nil
4242
}
4343

44-
func (s *OfflineStoreSink) cleanCurrentDatasetDir() error {
45-
if s.datasetDir == "" {
46-
return nil
47-
}
48-
datasetDir := s.datasetDir
49-
s.datasetDir = ""
50-
return os.RemoveAll(datasetDir)
51-
}
52-
53-
func (s *OfflineStoreSink) Write(record arrow.Record) error {
44+
func (s *OfflineStoreSink) Write(records []arrow.Record) error {
5445
fileName, _ := uuid.NewUUID()
5546
datasetDir, err := s.getOrCreateDatasetDir()
5647
if err != nil {
@@ -62,7 +53,7 @@ func (s *OfflineStoreSink) Write(record arrow.Record) error {
6253
if err != nil {
6354
return err
6455
}
65-
table := array.NewTableFromRecords(record.Schema(), []arrow.Record{record})
56+
table := array.NewTableFromRecords(records[0].Schema(), records)
6657

6758
props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false))
6859
arrProps := pqarrow.DefaultWriterProps()
@@ -74,10 +65,16 @@ func (s *OfflineStoreSink) Flush(featureServiceName string) error {
7465
return nil
7566
}
7667

77-
errMsg := s.writeCallback(featureServiceName, s.datasetDir)
78-
if errMsg != "" {
79-
return errors.New(errMsg)
80-
}
68+
datasetDir := s.datasetDir
69+
s.datasetDir = ""
70+
71+
go func() {
72+
errMsg := s.writeCallback(featureServiceName, datasetDir)
73+
if errMsg != "" {
74+
log.Println(errMsg)
75+
}
76+
os.RemoveAll(datasetDir)
77+
}()
8178

82-
return s.cleanCurrentDatasetDir()
79+
return nil
8380
}

go/internal/feast/server/logging/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type LoggingService struct {
4242

4343
var (
4444
DefaultOptions = LoggingOptions{
45-
ChannelCapacity: 1000,
45+
ChannelCapacity: 100000,
4646
FlushInterval: 10 * time.Minute,
4747
WriteInterval: 10 * time.Second,
4848
EmitTimeout: 10 * time.Millisecond,

0 commit comments

Comments
 (0)