-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathservice.go
More file actions
107 lines (84 loc) · 2.48 KB
/
service.go
File metadata and controls
107 lines (84 loc) · 2.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package logging
import (
"sync"
"time"
"github.com/pkg/errors"
"github.com/feast-dev/feast/go/internal/feast/model"
)
// FeatureStore is the subset of feature-store functionality that the logging
// package depends on.
type FeatureStore interface {
	// GetFcosMap returns three lookup maps (entities, feature views and
	// on-demand feature views) plus an error.
	// NOTE(review): the string keys are presumably object names; confirm
	// against the implementation.
	GetFcosMap() (map[string]*model.Entity, map[string]*model.FeatureView, map[string]*model.OnDemandFeatureView, error)

	// GetFeatureService looks up a feature service by name.
	GetFeatureService(name string) (*model.FeatureService, error)
}
// LoggingOptions bundles the tunable parameters of the logging pipeline.
type LoggingOptions struct {
	// ChannelCapacity is the number of log items that can be buffered in the
	// channel.
	ChannelCapacity int

	// EmitTimeout is the waiting time when inserting a new log into the
	// channel.
	EmitTimeout time.Duration

	// WriteInterval is the interval at which logs buffered in memory are
	// written to the sink.
	WriteInterval time.Duration

	// FlushInterval is the interval at which the sink is flushed (see the
	// LogSink interface for how flushing differs from Write).
	FlushInterval time.Duration
}
// LoggingService owns one logger per feature service and hands them out via
// GetOrCreateLogger.
type LoggingService struct {
	// loggers maps a feature service name to the LoggerImpl created for it.
	loggers map[string]*LoggerImpl

	// fs is the feature store used to generate a logger's schema.
	fs FeatureStore
	// sink is passed to every logger that is created; when it is nil,
	// GetOrCreateLogger returns a DummyLoggerImpl instead.
	sink LogSink
	// opts are the options used to build each logger's configuration.
	opts LoggingOptions

	// creationLock is held while a new logger is created and registered in
	// loggers.
	creationLock *sync.Mutex
}
// DefaultOptions is the configuration NewLoggingService falls back to when the
// caller does not supply any LoggingOptions.
var DefaultOptions = LoggingOptions{
	ChannelCapacity: 100000,
	EmitTimeout:     10 * time.Millisecond,
	WriteInterval:   10 * time.Second,
	FlushInterval:   10 * time.Minute,
}
// NewLoggingService builds a LoggingService backed by the given feature store
// and sink.
//
// opts is optional: when omitted, DefaultOptions is used; when several values
// are passed, only the first one is honoured. The returned error is always nil.
func NewLoggingService(fs FeatureStore, sink LogSink, opts ...LoggingOptions) (*LoggingService, error) {
	effective := DefaultOptions
	if len(opts) > 0 {
		effective = opts[0]
	}

	svc := &LoggingService{
		loggers:      make(map[string]*LoggerImpl),
		fs:           fs,
		sink:         sink,
		opts:         effective,
		creationLock: &sync.Mutex{},
	}
	return svc, nil
}
// GetOrCreateLogger returns the logger registered for featureService, creating
// and caching one on first use.
//
// Behaviour:
//   - a nil featureService yields an error instead of a nil-pointer panic;
//   - if a logger is already cached under featureService.Name, it is returned;
//   - otherwise featureService.LoggingConfig must be non-nil, or an error is
//     returned;
//   - if the service has no sink, a fresh DummyLoggerImpl is returned and
//     nothing is cached;
//   - otherwise a schema is generated, a new logger is built, stored in the
//     cache and returned. Errors from schema generation or logger construction
//     are returned unchanged.
//
// The whole lookup-or-create sequence runs under creationLock. The cache is a
// plain map that is written on the creation path, so reading it without the
// lock would be a data race with a concurrent creation.
func (s *LoggingService) GetOrCreateLogger(featureService *model.FeatureService) (Logger, error) {
	if featureService == nil {
		return nil, errors.New("feature service must not be nil")
	}

	// Every access to s.loggers below (read and write) happens with the lock
	// held; an uncontended mutex is cheap compared to an unsynchronized map
	// access racing with an insert.
	s.creationLock.Lock()
	defer s.creationLock.Unlock()

	if logger, ok := s.loggers[featureService.Name]; ok {
		return logger, nil
	}

	if featureService.LoggingConfig == nil {
		return nil, errors.New("Only feature services with configured logging can be used")
	}

	// Without a sink there is nowhere to write logs; hand out a dummy logger
	// and deliberately do not cache it.
	if s.sink == nil {
		return &DummyLoggerImpl{}, nil
	}

	config := NewLoggerConfig(featureService.LoggingConfig.SampleRate, s.opts)
	schema, err := GenerateSchemaFromFeatureService(s.fs, featureService.Name)
	if err != nil {
		return nil, err
	}

	logger, err := NewLogger(schema, featureService.Name, s.sink, config)
	if err != nil {
		return nil, err
	}
	s.loggers[featureService.Name] = logger

	return logger, nil
}
// Stop calls Stop and then WaitUntilStopped on every logger currently
// registered with the service, one logger at a time.
//
// The set of loggers is copied while holding creationLock so that iterating
// the map cannot race with a logger being registered by GetOrCreateLogger.
// The lock is released before any logger is stopped, so waiting on a logger
// never blocks callers of GetOrCreateLogger.
func (s *LoggingService) Stop() {
	s.creationLock.Lock()
	active := make([]*LoggerImpl, 0, len(s.loggers))
	for _, logger := range s.loggers {
		active = append(active, logger)
	}
	s.creationLock.Unlock()

	for _, logger := range active {
		logger.Stop()
		logger.WaitUntilStopped()
	}
}