@@ -2,7 +2,6 @@ package logging
22
33import (
44 "fmt"
5-
65 "github.com/apache/arrow/go/v8/arrow"
76 "github.com/apache/arrow/go/v8/arrow/array"
87 "github.com/apache/arrow/go/v8/arrow/memory"
@@ -14,63 +13,97 @@ import (
1413type MemoryBuffer struct {
1514 logs []* Log
1615 schema * FeatureServiceSchema
16+
17+ arrowSchema * arrow.Schema
18+ records []arrow.Record
1719}
1820
1921const (
2022 LOG_TIMESTAMP_FIELD = "__log_timestamp"
2123 LOG_DATE_FIELD = "__log_date"
2224 LOG_REQUEST_ID_FIELD = "__request_id"
25+ RECORD_SIZE = 1000
2326)
2427
28+ func NewMemoryBuffer (schema * FeatureServiceSchema ) (* MemoryBuffer , error ) {
29+ arrowSchema , err := getArrowSchema (schema )
30+ if err != nil {
31+ return nil , err
32+ }
33+ return & MemoryBuffer {
34+ logs : make ([]* Log , 0 ),
35+ records : make ([]arrow.Record , 0 ),
36+ schema : schema ,
37+ arrowSchema : arrowSchema ,
38+ }, nil
39+ }
40+
2541// Acquires the logging schema from the feature service, converts the memory buffer array of rows of logs and flushes
2642// them to the offline storage.
2743func (b * MemoryBuffer ) writeBatch (sink LogSink ) error {
28- if len (b .logs ) == 0 {
29- return nil
44+ if len (b .logs ) > 0 {
45+ err := b .Compact ()
46+ if err != nil {
47+ return err
48+ }
3049 }
3150
32- record , err := b .convertToArrowRecord ()
33-
34- if err != nil {
35- return err
51+ if len (b .records ) == 0 {
52+ return nil
3653 }
37- err = sink .Write (record )
54+
55+ err := sink .Write (b .records )
3856 if err != nil {
3957 return err
4058 }
4159
42- b .logs = b .logs [:0 ]
60+ b .records = b .records [:0 ]
4361 return nil
4462}
4563
4664func (b * MemoryBuffer ) Append (log * Log ) error {
4765 b .logs = append (b .logs , log )
66+
67+ if len (b .logs ) == RECORD_SIZE {
68+ return b .Compact ()
69+ }
70+
4871 return nil
4972}
5073
51- func (b * MemoryBuffer ) getArrowSchema () (* arrow.Schema , error ) {
74+ func (b * MemoryBuffer ) Compact () error {
75+ rec , err := b .convertToArrowRecord ()
76+ if err != nil {
77+ return err
78+ }
79+ b .records = append (b .records , rec )
80+ b .logs = b .logs [:0 ]
81+ return nil
82+ }
83+
84+ func getArrowSchema (schema * FeatureServiceSchema ) (* arrow.Schema , error ) {
5285 fields := make ([]arrow.Field , 0 )
5386
54- for _ , joinKey := range b . schema .JoinKeys {
55- arrowType , err := gotypes .ValueTypeEnumToArrowType (b . schema .JoinKeysTypes [joinKey ])
87+ for _ , joinKey := range schema .JoinKeys {
88+ arrowType , err := gotypes .ValueTypeEnumToArrowType (schema .JoinKeysTypes [joinKey ])
5689 if err != nil {
5790 return nil , err
5891 }
5992
6093 fields = append (fields , arrow.Field {Name : joinKey , Type : arrowType })
6194 }
6295
63- for _ , requestParam := range b . schema .RequestData {
64- arrowType , err := gotypes .ValueTypeEnumToArrowType (b . schema .RequestDataTypes [requestParam ])
96+ for _ , requestParam := range schema .RequestData {
97+ arrowType , err := gotypes .ValueTypeEnumToArrowType (schema .RequestDataTypes [requestParam ])
6598 if err != nil {
6699 return nil , err
67100 }
68101
69102 fields = append (fields , arrow.Field {Name : requestParam , Type : arrowType })
70103 }
71104
72- for _ , featureName := range b . schema .Features {
73- arrowType , err := gotypes .ValueTypeEnumToArrowType (b . schema .FeaturesTypes [featureName ])
105+ for _ , featureName := range schema .Features {
106+ arrowType , err := gotypes .ValueTypeEnumToArrowType (schema .FeaturesTypes [featureName ])
74107 if err != nil {
75108 return nil , err
76109 }
@@ -98,18 +131,13 @@ func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) {
98131 arrowMemory := memory .NewGoAllocator ()
99132 numRows := len (b .logs )
100133
101- arrowSchema , err := b .getArrowSchema ()
102- if err != nil {
103- return nil , err
104- }
105-
106134 columns := make (map [string ][]* types.Value )
107135 fieldNameToIdx := make (map [string ]int )
108- for idx , field := range arrowSchema .Fields () {
136+ for idx , field := range b . arrowSchema .Fields () {
109137 fieldNameToIdx [field .Name ] = idx
110138 }
111139
112- builder := array .NewRecordBuilder (arrowMemory , arrowSchema )
140+ builder := array .NewRecordBuilder (arrowMemory , b . arrowSchema )
113141 defer builder .Release ()
114142
115143 builder .Reserve (numRows )
0 commit comments