Skip to content
Merged
Prev Previous commit
Next Next commit
Fix memory leak
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 committed Jul 13, 2022
commit ae71efa50aedb4e5b6b8513e9cd705fe27430afc
30 changes: 30 additions & 0 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ type OnlineFeatureService struct {
grpcStopCh chan os.Signal
httpStopCh chan os.Signal

statusColumnBuildersToRelease []*array.Int32Builder
tsColumnBuildersToRelease []*array.Int64Builder
arraysToRelease []arrow.Array
resultsToRelease []arrow.Record

err error
}

Expand Down Expand Up @@ -180,6 +185,24 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
return err
}

// Release all objects that are no longer required.
for _, statusColumnBuilderToRelease := range s.statusColumnBuildersToRelease {
statusColumnBuilderToRelease.Release()
}
for _, tsColumnBuilderToRelease := range s.tsColumnBuildersToRelease {
tsColumnBuilderToRelease.Release()
}
for _, arrayToRelease := range s.arraysToRelease {
arrayToRelease.Release()
}
for _, resultsToRelease := range s.resultsToRelease {
resultsToRelease.Release()
}
s.statusColumnBuildersToRelease = nil
s.tsColumnBuildersToRelease = nil
s.arraysToRelease = nil
s.resultsToRelease = nil

outputFields := make([]arrow.Field, 0)
outputColumns := make([]arrow.Array, 0)
pool := memory.NewCgoArrowAllocator()
Expand Down Expand Up @@ -212,9 +235,16 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
}
tsColumn := tsColumnBuilder.NewArray()
outputColumns = append(outputColumns, tsColumn)

// Mark builders and arrays for release.
s.statusColumnBuildersToRelease = append(s.statusColumnBuildersToRelease, statusColumnBuilder)
s.tsColumnBuildersToRelease = append(s.tsColumnBuildersToRelease, tsColumnBuilder)
s.arraysToRelease = append(s.arraysToRelease, statusColumn)
s.arraysToRelease = append(s.arraysToRelease, tsColumn)
}

result := array.NewRecord(arrow.NewSchema(outputFields, nil), outputColumns, int64(numRows))
s.resultsToRelease = append(s.resultsToRelease, result)

cdata.ExportArrowRecordBatch(result,
cdata.ArrayFromPtr(output.DataPtr),
Expand Down