Commit 922dcd3

achals authored and felixwang9817 committed
chore: Fixes and Readme for python<>go interface (#2936)
* chore: Fixes and Readme for python<>go interface
  Signed-off-by: Achal Shah <achals@gmail.com>
* cr
  Signed-off-by: Achal Shah <achals@gmail.com>
* Switch to cgo allocator
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Fix memory leak
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Fix another memory leak
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Do not use cgo allocator for memory buffers
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Switch to cgo allocator for memory buffers
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Switch test to pass
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Use more idiomatic way to test truth
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Update docs
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>

Co-authored-by: Felix Wang <wangfelix98@gmail.com>
1 parent a71b9d0 commit 922dcd3

7 files changed, with 158 additions and 7 deletions

go/README.md

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
This directory contains the Go logic that the `EmbeddedOnlineFeatureServer` executes from Python.

## Building and Linking
[gopy](https://github.com/go-python/gopy) generates (and compiles) a CPython extension module from a Go package. That's what we're using here, as visible in [setup.py](../setup.py).

Under the hood, gopy invokes `go build` and then templates `cgo` stubs that expose the Go module's public functions as C functions.
For our project, the generated files can be found at `sdk/python/feast/embedded_go/lib/embedded.go` and `sdk/python/feast/embedded_go/lib/embedded_go.h` after running `make compile-go-lib`.

## Arrow memory management
Understanding this is the trickiest part of this integration.

At a high level, when using the Python<>Go integration, the Python layer exports request data into an [Arrow Record Batch](https://arrow.apache.org/docs/python/data.html), which is transferred to Go using Arrow's zero-copy mechanism.
Similarly, the Go layer converts feature values read from the online store into a Record Batch that's exported to Python using the same mechanics.

The first thing to note is that, from the Python perspective, all the export logic assumes that we're exporting to and importing from C, not Go. This is because pyarrow only interops with C, and the fact that we're using Go is an implementation detail not relevant to the Python layer.

### Export Entities & Request data from Python to Go
The code that exports to C lives in [online_features_service.py](../sdk/python/feast/embedded_go/online_features_service.py):
```
(
    entities_c_schema,
    entities_ptr_schema,
    entities_c_array,
    entities_ptr_array,
) = allocate_schema_and_array()
(
    req_data_c_schema,
    req_data_ptr_schema,
    req_data_c_array,
    req_data_ptr_array,
) = allocate_schema_and_array()

batch, schema = map_to_record_batch(entities, join_keys_types)
schema._export_to_c(entities_ptr_schema)
batch._export_to_c(entities_ptr_array)

batch, schema = map_to_record_batch(request_data)
schema._export_to_c(req_data_ptr_schema)
batch._export_to_c(req_data_ptr_array)
```

Under the hood, `allocate_schema_and_array` allocates two pointers (a `struct ArrowSchema*` and a `struct ArrowArray*`) in native memory (i.e. the C layer) using `cffi`.
Next, the RecordBatch exports to these pointers using [`_export_to_c`](https://github.com/apache/arrow/blob/master/python/pyarrow/table.pxi#L2509), which uses [`ExportRecordBatch`](https://arrow.apache.org/docs/cpp/api/c_abi.html#_CPPv417ExportRecordBatchRK11RecordBatchP10ArrowArrayP11ArrowSchema) under the hood.

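To make the "pointer in native memory" idea concrete, here is a minimal sketch that uses the standard-library `ctypes` module in place of `cffi`. The field layout follows the `struct ArrowSchema` definition in the Arrow C data interface. The helper `allocate_schema_sketch` is hypothetical and is not the project's `allocate_schema_and_array`; it only illustrates why both an owner object and an integer address are returned.

```python
import ctypes


class ArrowSchema(ctypes.Structure):
    """Layout of `struct ArrowSchema` from the Arrow C data interface."""


# Fields are assigned after the class exists so it can refer to itself.
ArrowSchema._fields_ = [
    ("format", ctypes.c_char_p),
    ("name", ctypes.c_char_p),
    ("metadata", ctypes.c_char_p),
    ("flags", ctypes.c_int64),
    ("n_children", ctypes.c_int64),
    ("children", ctypes.POINTER(ctypes.POINTER(ArrowSchema))),
    ("dictionary", ctypes.POINTER(ArrowSchema)),
    ("release", ctypes.c_void_p),  # really: void (*)(struct ArrowSchema*)
    ("private_data", ctypes.c_void_p),
]


def allocate_schema_sketch():
    """Hypothetical stand-in: returns (owner object, integer address)."""
    c_schema = ArrowSchema()  # zero-initialised native memory
    ptr_schema = ctypes.addressof(c_schema)  # plain int, easy to pass over FFI
    return c_schema, ptr_schema


c_schema, ptr_schema = allocate_schema_sketch()
# A fresh struct has a NULL release callback: nothing has been exported yet.
print(hex(ptr_schema), c_schema.release)
```

The owner object has to stay referenced for as long as the integer address is in use, which is presumably why the snippet above keeps both the `*_c_schema` and `*_ptr_schema` values.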
As per the documentation for `ExportRecordBatch`:
> `Status ExportRecordBatch(const RecordBatch &batch, struct ArrowArray *out, struct ArrowSchema *out_schema = NULLPTR)`
> Export C++ RecordBatch using the C data interface format.
>
> The record batch is exported as if it were a struct array. The resulting ArrowArray struct keeps the record batch data and buffers alive until its release callback is called by the consumer.

Because the consumer is responsible for triggering that release callback, `GetOnlineFeatures()` in `online_features.go` calls `Release()` on each imported record, as below:
```
entitiesRecord, err := readArrowRecord(entities)
if err != nil {
	return err
}
defer entitiesRecord.Release()
...
requestDataRecords, err := readArrowRecord(requestData)
if err != nil {
	return err
}
defer requestDataRecords.Release()
```

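The consumer-side release contract can be modelled without Arrow at all. The following is a toy sketch, not Arrow's implementation: `ExportedArray`, `Producer`, and `consume` are hypothetical names. It shows a producer that keeps data alive until the consumer invokes the release callback, and a consumer that always releases, much like `defer record.Release()`.

```python
class ExportedArray:
    """Toy stand-in for a C `ArrowArray`: data plus a release callback."""

    def __init__(self, buffers, on_release):
        self.buffers = buffers
        self.release = on_release  # reset to None once released

    def call_release(self):
        # Releasing twice is a no-op, mirroring a NULL release pointer.
        if self.release is not None:
            callback, self.release = self.release, None
            self.buffers = None
            callback()


class Producer:
    """Counts exports that the consumer has not released yet."""

    def __init__(self):
        self.live_exports = 0

    def export(self, values):
        self.live_exports += 1
        return ExportedArray(list(values), self._on_release)

    def _on_release(self):
        self.live_exports -= 1


def consume(exported):
    """Release the export even if processing raises."""
    try:
        return sum(exported.buffers)
    finally:
        exported.call_release()


producer = Producer()
total = consume(producer.export([1, 2, 3]))
print(total, producer.live_exports)  # prints "6 0"
```

If `consume` skipped the `finally` block, `live_exports` would stay at 1, which is the toy equivalent of the memory leak this commit fixes.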
Additionally, we pass a pair of pointers to `GetOnlineFeatures()` for the Go layer to populate, so that the resulting feature values can be passed back to Python (via the C layer) with zero-copy semantics.
That happens as follows:
```
(
    features_c_schema,
    features_ptr_schema,
    features_c_array,
    features_ptr_array,
) = allocate_schema_and_array()

...

record_batch = pa.RecordBatch._import_from_c(
    features_ptr_array, features_ptr_schema
)
```

The corresponding Go code that exports this data is:
```
result := array.NewRecord(arrow.NewSchema(outputFields, nil), outputColumns, int64(numRows))

cdata.ExportArrowRecordBatch(result,
	cdata.ArrayFromPtr(output.DataPtr),
	cdata.SchemaFromPtr(output.SchemaPtr))
```

The documentation for `ExportArrowRecordBatch` includes this useful caveat:

> // The release function on the populated CArrowArray will properly decrease the reference counts,
> // and release the memory if the record has already been released. But since this must be explicitly
> // done, make sure it is released so that you do not create a memory leak.

This implies that the receiver is on the hook for explicitly releasing this memory.

However, we're using `_import_from_c`, which uses [`ImportRecordBatch`](https://arrow.apache.org/docs/cpp/api/c_abi.html#_CPPv417ImportRecordBatchP10ArrowArrayP11ArrowSchema), which implies that the receiver of the RecordBatch is the new owner of the data.
pyarrow wraps this, so when the corresponding Python object goes out of scope, it should clean up the underlying record batch.

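As a rough illustration of "cleanup when the Python object goes out of scope", the sketch below attaches a finalizer to a wrapper object and then drops the last reference explicitly, as the `del record_batch` line added in this commit does. `ImportedBatch` and `get_online_features_sketch` are hypothetical names, and the immediate cleanup relies on CPython's reference counting. How pyarrow actually triggers the release is not shown here.

```python
import gc
import weakref


class ImportedBatch:
    """Toy stand-in for a batch that took ownership of exported data."""

    def __init__(self, values, release_callback):
        self.values = values
        # Run the producer's release callback once this wrapper is collected.
        self._finalizer = weakref.finalize(self, release_callback)


released = []


def get_online_features_sketch():
    batch = ImportedBatch([1.0, 2.0], lambda: released.append("released"))
    response = list(batch.values)  # copy out what the caller needs
    del batch  # drop the last reference so the finalizer can run
    gc.collect()  # extra safety for interpreters without refcounting
    return response


print(get_online_features_sketch(), released)
```

The explicit `del` does not change ownership. It only makes the point at which the wrapper becomes unreachable visible in the code.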
Another thing to note (which may or may not be a source of issues) is that Arrow has the concept of [Memory Pools](https://arrow.apache.org/docs/python/api/memory.html#memory-pools).
Memory pools can be set in Python as well as in Go. I *believe* that if we use the `CgoArrowAllocator`, it uses whatever pool C++ uses, which should be the same as the one used by PyArrow. But this should be vetted.


### References
- https://arrow.apache.org/docs/format/CDataInterface.html#memory-management
- https://arrow.apache.org/docs/python/memory.html

go/embedded/online_features.go

Lines changed: 34 additions & 3 deletions
@@ -33,6 +33,11 @@ type OnlineFeatureService struct {
 	grpcStopCh chan os.Signal
 	httpStopCh chan os.Signal

+	statusColumnBuildersToRelease []*array.Int32Builder
+	tsColumnBuildersToRelease []*array.Int64Builder
+	arraysToRelease []arrow.Array
+	resultsToRelease []arrow.Record
+
 	err error
 }

@@ -143,6 +148,7 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
 	if err != nil {
 		return err
 	}
+	defer entitiesRecord.Release()

 	numRows := entitiesRecord.Column(0).Len()

@@ -155,6 +161,7 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
 	if err != nil {
 		return err
 	}
+	defer requestDataRecords.Release()

 	requestDataProto, err := recordToProto(requestDataRecords)
 	if err != nil {
@@ -178,6 +185,24 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
 		return err
 	}

+	// Release all objects that are no longer required.
+	for _, statusColumnBuilderToRelease := range s.statusColumnBuildersToRelease {
+		statusColumnBuilderToRelease.Release()
+	}
+	for _, tsColumnBuilderToRelease := range s.tsColumnBuildersToRelease {
+		tsColumnBuilderToRelease.Release()
+	}
+	for _, arrayToRelease := range s.arraysToRelease {
+		arrayToRelease.Release()
+	}
+	for _, resultsToRelease := range s.resultsToRelease {
+		resultsToRelease.Release()
+	}
+	s.statusColumnBuildersToRelease = nil
+	s.tsColumnBuildersToRelease = nil
+	s.arraysToRelease = nil
+	s.resultsToRelease = nil
+
 	outputFields := make([]arrow.Field, 0)
 	outputColumns := make([]arrow.Array, 0)
 	pool := memory.NewCgoArrowAllocator()
@@ -210,13 +235,19 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
 		}
 		tsColumn := tsColumnBuilder.NewArray()
 		outputColumns = append(outputColumns, tsColumn)
+
+		// Mark builders and arrays for release.
+		s.statusColumnBuildersToRelease = append(s.statusColumnBuildersToRelease, statusColumnBuilder)
+		s.tsColumnBuildersToRelease = append(s.tsColumnBuildersToRelease, tsColumnBuilder)
+		s.arraysToRelease = append(s.arraysToRelease, statusColumn)
+		s.arraysToRelease = append(s.arraysToRelease, tsColumn)
+		s.arraysToRelease = append(s.arraysToRelease, featureVector.Values)
 	}

 	result := array.NewRecord(arrow.NewSchema(outputFields, nil), outputColumns, int64(numRows))
+	s.resultsToRelease = append(s.resultsToRelease, result)

-	cdata.ExportArrowRecordBatch(result,
-		cdata.ArrayFromPtr(output.DataPtr),
-		cdata.SchemaFromPtr(output.SchemaPtr))
+	cdata.ExportArrowRecordBatch(result, cdata.ArrayFromPtr(output.DataPtr), cdata.SchemaFromPtr(output.SchemaPtr))

 	return nil
 }
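
The hunks above follow a "mark now, release on the next call" pattern: each call to `GetOnlineFeatures` first releases whatever the previous call marked, then marks its own builders, arrays, and result. The sketch below models only that control flow in plain Python. `Releasable` and `FeatureServiceSketch` are hypothetical names, and the commit does not state why release is deferred rather than done immediately.

```python
class Releasable:
    """Toy resource that records whether it has been released."""

    def __init__(self, name):
        self.name = name
        self.released = False

    def release(self):
        self.released = True


class FeatureServiceSketch:
    """Releases on call N+1 whatever call N handed to the caller."""

    def __init__(self):
        self._to_release = []

    def get_online_features(self, name):
        # Release everything marked by the previous call, then reset the list.
        for resource in self._to_release:
            resource.release()
        self._to_release = []

        result = Releasable(name)
        # Mark the new result; it stays valid until the next call.
        self._to_release.append(result)
        return result


service = FeatureServiceSketch()
first = service.get_online_features("first")
print(first.released)  # prints "False"
second = service.get_online_features("second")
print(first.released, second.released)  # prints "True False"
```

One consequence visible in the sketch is that the objects from the final call are held until another call arrives (or the service itself is discarded).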

go/internal/feast/featurestore.go

Lines changed: 1 addition & 1 deletion
@@ -113,7 +113,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
 	}

 	result := make([]*onlineserving.FeatureVector, 0)
-	arrowMemory := memory.NewGoAllocator()
+	arrowMemory := memory.NewCgoArrowAllocator()
 	featureViews := make([]*model.FeatureView, len(requestedFeatureViews))
 	index := 0
 	for _, featuresAndView := range requestedFeatureViews {

go/internal/feast/onlineserving/serving.go

Lines changed: 10 additions & 0 deletions
@@ -415,6 +415,8 @@ func KeepOnlyRequestedFeatures(
 	vectorsByName := make(map[string]*FeatureVector)
 	expectedVectors := make([]*FeatureVector, 0)

+	usedVectors := make(map[string]bool)
+
 	for _, vector := range vectors {
 		vectorsByName[vector.Name] = vector
 	}
@@ -438,6 +440,14 @@ func KeepOnlyRequestedFeatures(
 			return nil, fmt.Errorf("requested feature %s can't be retrieved", featureRef)
 		}
 		expectedVectors = append(expectedVectors, vectorsByName[qualifiedName])
+		usedVectors[qualifiedName] = true
+	}
+
+	// Free arrow arrays for vectors that were not used.
+	for _, vector := range vectors {
+		if _, ok := usedVectors[vector.Name]; !ok {
+			vector.Values.Release()
+		}
 	}

 	return expectedVectors, nil
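
The change to `KeepOnlyRequestedFeatures` tracks which vectors end up in the result and releases the rest. Here is a small Python model of that logic. `Vector` and `keep_only_requested` are hypothetical stand-ins, and the real function resolves feature references to qualified names, which this sketch skips.

```python
class Vector:
    """Toy feature vector whose `release` stands in for `Values.Release()`."""

    def __init__(self, name):
        self.name = name
        self.released = False

    def release(self):
        self.released = True


def keep_only_requested(vectors, requested_names):
    by_name = {vector.name: vector for vector in vectors}
    used = set()
    expected = []
    for name in requested_names:
        if name not in by_name:
            raise KeyError(f"requested feature {name} can't be retrieved")
        expected.append(by_name[name])
        used.add(name)

    # Free the vectors that were fetched but not requested.
    for vector in vectors:
        if vector.name not in used:
            vector.release()
    return expected


vectors = [Vector("a"), Vector("b"), Vector("c")]
kept = keep_only_requested(vectors, ["c", "a"])
print([v.name for v in kept], [v.released for v in vectors])
# prints "['c', 'a'] [False, True, False]"
```

As in the Go hunk, the early error return happens before the release loop, so nothing is released on that path in this sketch either.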

go/internal/feast/server/logging/memorybuffer.go

Lines changed: 1 addition & 1 deletion
@@ -128,7 +128,7 @@ func getArrowSchema(schema *FeatureServiceSchema) (*arrow.Schema, error) {
 // and writes them to arrow table.
 // Returns arrow table that contains all of the logs in columnar format.
 func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) {
-	arrowMemory := memory.NewGoAllocator()
+	arrowMemory := memory.NewCgoArrowAllocator()
 	numRows := len(b.logs)

 	columns := make(map[string][]*types.Value)

go/internal/feast/server/logging/memorybuffer_test.go

Lines changed: 2 additions & 2 deletions
@@ -118,7 +118,7 @@ func TestSerializeToArrowTable(t *testing.T) {
 		LogTimestamp: time.Now(),
 	})

-	pool := memory.NewGoAllocator()
+	pool := memory.NewCgoArrowAllocator()
 	builder := array.NewRecordBuilder(pool, b.arrowSchema)
 	defer builder.Release()

@@ -159,7 +159,7 @@ func TestSerializeToArrowTable(t *testing.T) {
 	expectedRecord := builder.NewRecord()
 	assert.Nil(t, err)
 	for colIdx := 0; colIdx < int(record.NumCols()); colIdx++ {
-		assert.Equal(t, expectedRecord.Column(colIdx), record.Column(colIdx), "Columns with idx %d are not equal", colIdx)
+		assert.True(t, array.Equal(expectedRecord.Column(colIdx), record.Column(colIdx)), "Columns with idx %d are not equal", colIdx)
 	}

 }

sdk/python/feast/embedded_go/online_features_service.py

Lines changed: 1 addition & 0 deletions
@@ -147,6 +147,7 @@ def get_online_features(
             features_ptr_array, features_ptr_schema
         )
         resp = record_batch_to_online_response(record_batch)
+        del record_batch
         return OnlineResponse(resp)

     def start_grpc_server(
