-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: Add DQM Logging on GRPC Server with FileLogStorage for Testing #2403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
65880a9
b32b157
1996fd9
c6656d4
17c8168
a4efbce
79d8ba7
cd652bc
4a13e8e
a3f8384
706ee80
1831003
9e669c3
b972634
42bdc8b
ab8d1e0
f6c2c87
b6b412d
cb1da99
155fc77
86f2208
746dae9
514c2f6
161d81d
beec6b2
2a1d2ec
9446e05
31e0de8
137d700
ee847cd
ce12eee
79c700c
6cbf2c2
aefda92
29f72b0
c4dbb84
5d7978f
686c583
884b014
c02cf83
e89abc1
559bb83
0f2aca9
c631ef8
b0a2166
9ed79e8
e472104
629be18
8a789fb
6357efd
3168473
00228f7
b692e81
49cd91a
4890116
71e1e56
050db82
dd235ca
7c92d93
4265efd
79cbe42
21e99bd
1b173da
b5484c3
5458034
17b2bf4
0a1802d
18c7d60
a39c9b5
a4587c3
83c5f89
86e605d
fa14dae
9f81d04
e87fe22
bdf0c2b
f59f98d
d185ccd
3747a5d
e9bd35b
89974b3
d51544b
e0a4ec6
5f9a50e
1a5d98e
b4d94dc
8d9d0f9
4e65987
9d4effa
e604750
49fb8bc
72b1700
b4f7f41
f19d19d
0145e3d
cad1156
89960b2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,9 @@ import ( | |
| "google.golang.org/protobuf/types/known/timestamppb" | ||
| ) | ||
|
|
||
| const DEFAULT_LOG_FLUSH_INTERVAL = 100 * time.Millisecond | ||
| const DEFAULT_LOG_INSERT_TIMEOUT = 20 * time.Millisecond | ||
|
|
||
| type Log struct { | ||
| // Example: val{int64_val: 5017}, val{int64_val: 1003} | ||
| EntityValue []*types.Value | ||
|
|
@@ -37,16 +40,19 @@ type LoggingService struct { | |
| logChannel chan *Log | ||
| fs *feast.FeatureStore | ||
| offlineLogStorage OfflineLogStorage | ||
| logInsertTTl time.Duration | ||
| logFlushInterval time.Duration | ||
| } | ||
|
|
||
| func NewLoggingService(fs *feast.FeatureStore, logChannelCapacity int, featureServiceName string, enableLogging bool) (*LoggingService, error) { | ||
| func NewLoggingService(fs *feast.FeatureStore, logChannelCapacity int, featureServiceName string, enableLogProcessing bool) (*LoggingService, error) { | ||
| var featureService *model.FeatureService = nil | ||
| var err error | ||
| if enableLogging { | ||
| if fs != nil { | ||
| featureService, err = fs.GetFeatureService(featureServiceName) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| } | ||
|
|
||
| loggingService := &LoggingService{ | ||
|
|
@@ -55,20 +61,23 @@ func NewLoggingService(fs *feast.FeatureStore, logChannelCapacity int, featureSe | |
| logs: make([]*Log, 0), | ||
| featureService: featureService, | ||
| }, | ||
| fs: fs, | ||
| fs: fs, | ||
| logInsertTTl: DEFAULT_LOG_INSERT_TIMEOUT, | ||
| logFlushInterval: DEFAULT_LOG_FLUSH_INTERVAL, | ||
| } | ||
|
|
||
| if !enableLogging || fs == nil { | ||
| loggingService.offlineLogStorage = nil | ||
| } else { | ||
| if fs != nil { | ||
| offlineLogStorage, err := NewOfflineStore(fs.GetRepoConfig()) | ||
| loggingService.offlineLogStorage = offlineLogStorage | ||
|
|
||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| // Start goroutine to process logs | ||
| } | ||
|
|
||
| // Start goroutine to process logs | ||
| if enableLogProcessing { | ||
| go loggingService.processLogs() | ||
|
|
||
| } | ||
| return loggingService, nil | ||
| } | ||
|
|
@@ -77,15 +86,15 @@ func (s *LoggingService) EmitLog(l *Log) error { | |
| select { | ||
| case s.logChannel <- l: | ||
| return nil | ||
| case <-time.After(20 * time.Millisecond): | ||
| case <-time.After(s.logInsertTTl): | ||
| return fmt.Errorf("could not add to log channel with capacity %d. Operation timed out. Current log channel length is %d", cap(s.logChannel), len(s.logChannel)) | ||
| } | ||
| } | ||
|
|
||
| func (s *LoggingService) processLogs() { | ||
| // start a periodic flush | ||
| // TODO(kevjumba): set param so users can configure flushing duration | ||
| ticker := time.NewTicker(100 * time.Millisecond) | ||
| ticker := time.NewTicker(s.logFlushInterval) | ||
| defer ticker.Stop() | ||
|
|
||
| for { | ||
|
|
@@ -263,9 +272,8 @@ func GetSchemaFromFeatureService(featureService *model.FeatureService, entities | |
| // Create copies of FeatureView that may contains the same *FeatureView but | ||
| // each differentiated by a *FeatureViewProjection | ||
| featureViewName := featureProjection.Name | ||
| if fv, ok := fvs[featureViewName]; ok { | ||
| for _, f := range fv.Base.Projection.Features { | ||
| // add feature to map | ||
| if _, ok := fvs[featureViewName]; ok { | ||
| for _, f := range featureProjection.Features { | ||
| features = append(features, f.Name) | ||
| allFeatureTypes[f.Name] = f.Dtype | ||
| } | ||
|
|
@@ -278,6 +286,7 @@ func GetSchemaFromFeatureService(featureService *model.FeatureService, entities | |
| return nil, fmt.Errorf("no such feature view found in feature service %s", featureViewName) | ||
| } | ||
| } | ||
|
|
||
| schema := &Schema{ | ||
| Entities: joinKeys, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess here we need only joinKeys that required by current feature service
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this, there no way of actually loading entities based on feature views or feature service. Should I add this functionality? I will have to update either featureservice to contain references to each entitiy or the entitys that reference the featureviews.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| Features: features, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't
fsalways be not nil?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In some tests it is nil because for example for logger tests, there isn't a need to instantiate a full feature service if I'm only testing individual components.