Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions Framework/Core/src/DPLWebSocket.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ struct WriteRequestContext {
struct BulkWriteRequestContext {
std::vector<uv_buf_t> buffers;
ServiceRegistryRef ref;
std::vector<char*>* freeList = nullptr; // if non-null, return chunks here instead of freeing
};

void ws_client_write_callback(uv_write_t* h, int status)
Expand Down Expand Up @@ -543,11 +544,14 @@ void ws_client_bulk_write_callback(uv_write_t* h, int status)
state.loopReason |= (DeviceState::WS_COMMUNICATION | DeviceState::WS_WRITING);
if (status < 0) {
LOG(error) << "uv_write error: " << uv_err_name(status);
free(h);
return;
}
if (context->buffers.size()) {
for (auto& b : context->buffers) {
// Return chunks to the free list (capped) so flushPending can pre-seed
// the backlog for the next cycle without malloc-ing.
constexpr size_t kMaxFreeChunks = 4;
for (auto& b : context->buffers) {
if (context->freeList && b.base && context->freeList->size() < kMaxFreeChunks) {
context->freeList->push_back(b.base);
} else {
free(b.base);
}
}
Expand Down Expand Up @@ -584,4 +588,18 @@ void WSDPLClient::write(std::vector<uv_buf_t>& outputs)
context->buffers.size(), ws_client_bulk_write_callback);
}

void WSDPLClient::write(std::vector<uv_buf_t>& outputs, std::vector<char*>& freeList)
{
if (outputs.empty()) {
return;
}
auto* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
auto* context = new BulkWriteRequestContext{.ref = mContext->ref};
context->buffers.swap(outputs);
context->freeList = &freeList;
write_req->data = context;
uv_write(write_req, (uv_stream_t*)mStream, &context->buffers.at(0),
context->buffers.size(), ws_client_bulk_write_callback);
}

} // namespace o2::framework
4 changes: 4 additions & 0 deletions Framework/Core/src/DPLWebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ struct WSDPLClient : public HTTPParser {
/// Helper to write n buffers containing websockets frames to a server
void write(std::vector<uv_buf_t>& outputs);

/// Like write() above but recycles the chunk memory into freeList once the
/// kernel has consumed the buffers, instead of freeing it.
void write(std::vector<uv_buf_t>& outputs, std::vector<char*>& freeList);

/// Dump headers
void dumpHeaders();
void sendHandshake();
Expand Down
32 changes: 28 additions & 4 deletions Framework/Core/src/WSDriverClient.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ void on_connect(uv_connect_t* connection, int status)
return false;
}
return true;
}, &name);
},
&name);
});

client->observe("/signpost:disable", [](std::string_view cmd) {
Expand All @@ -221,7 +222,8 @@ void on_connect(uv_connect_t* connection, int status)
return false;
}
return true;
}, &name);
},
&name);
});

// Client will be filled in the line after. I can probably have a single
Expand Down Expand Up @@ -263,6 +265,12 @@ WSDriverClient::WSDriverClient(ServiceRegistryRef registry, char const* ip, unsi

WSDriverClient::~WSDriverClient()
{
for (auto& buf : mBacklog) {
free(buf.base);
}
for (auto* chunk : mFreeChunks) {
free(chunk);
}
free(this->mAwakeMainThread);
}

Expand Down Expand Up @@ -329,8 +337,24 @@ void WSDriverClient::flushPending(ServiceRegistryRef mainThreadRef)
printed1 = false;
printed2 = false;
}
mClient->write(mBacklog);
mBacklog.resize(0);
// Return any pre-seeded but unused (zero-length) buffers to the free list
// so we don't send pointless zero-byte writes to the kernel.
while (!mBacklog.empty() && mBacklog.back().len == 0) {
mFreeChunks.push_back(mBacklog.back().base);
mBacklog.pop_back();
}
mClient->write(mBacklog, mFreeChunks);
// Pre-seed mBacklog with one recycled chunk from the previous write's callback
// so that the next encode_websocket_frames reuses memory instead of malloc-ing.
// Only one chunk (64 KB) since encode_websocket_frames appends to outputs.back().
if (!mFreeChunks.empty()) {
mBacklog.push_back(uv_buf_init(mFreeChunks.back(), 0));
mFreeChunks.pop_back();
}
for (auto* chunk : mFreeChunks) {
free(chunk);
}
mFreeChunks.clear();
}

} // namespace o2::framework
1 change: 1 addition & 0 deletions Framework/Core/src/WSDriverClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class WSDriverClient : public DriverClient
std::mutex mClientMutex;
ServiceRegistryRef mRegistry;
std::vector<uv_buf_t> mBacklog;
std::vector<char*> mFreeChunks; ///< recycled WebSocket buffer chunks (main-thread only)
uv_async_t* mAwakeMainThread = nullptr;
uv_connect_t* mConnection = nullptr;
std::unique_ptr<WSDPLClient> mClient;
Expand Down