diff --git a/Framework/Core/src/DPLWebSocket.cxx b/Framework/Core/src/DPLWebSocket.cxx index 06de46b387c29..92c5b9af54ca6 100644 --- a/Framework/Core/src/DPLWebSocket.cxx +++ b/Framework/Core/src/DPLWebSocket.cxx @@ -516,6 +516,7 @@ struct WriteRequestContext { struct BulkWriteRequestContext { std::vector buffers; ServiceRegistryRef ref; + std::vector* freeList = nullptr; // if non-null, return chunks here instead of freeing }; void ws_client_write_callback(uv_write_t* h, int status) @@ -543,11 +544,14 @@ void ws_client_bulk_write_callback(uv_write_t* h, int status) state.loopReason |= (DeviceState::WS_COMMUNICATION | DeviceState::WS_WRITING); if (status < 0) { LOG(error) << "uv_write error: " << uv_err_name(status); - free(h); - return; } - if (context->buffers.size()) { - for (auto& b : context->buffers) { + // Return chunks to the free list (capped) so flushPending can pre-seed + // the backlog for the next cycle without malloc-ing. + constexpr size_t kMaxFreeChunks = 4; + for (auto& b : context->buffers) { + if (context->freeList && b.base && context->freeList->size() < kMaxFreeChunks) { + context->freeList->push_back(b.base); + } else { free(b.base); } } @@ -584,4 +588,18 @@ void WSDPLClient::write(std::vector& outputs) context->buffers.size(), ws_client_bulk_write_callback); } +void WSDPLClient::write(std::vector& outputs, std::vector& freeList) +{ + if (outputs.empty()) { + return; + } + auto* write_req = (uv_write_t*)malloc(sizeof(uv_write_t)); + auto* context = new BulkWriteRequestContext{.ref = mContext->ref}; + context->buffers.swap(outputs); + context->freeList = &freeList; + write_req->data = context; + uv_write(write_req, (uv_stream_t*)mStream, &context->buffers.at(0), + context->buffers.size(), ws_client_bulk_write_callback); +} + } // namespace o2::framework diff --git a/Framework/Core/src/DPLWebSocket.h b/Framework/Core/src/DPLWebSocket.h index 1985c37157d65..eca1924de9ab2 100644 --- a/Framework/Core/src/DPLWebSocket.h +++ b/Framework/Core/src/DPLWebSocket.h @@ -89,6 +89,10 @@ struct WSDPLClient : public HTTPParser { /// Helper to write n buffers containing websockets frames to a server void write(std::vector& outputs); + /// Like write() above but recycles the chunk memory into freeList once the + /// kernel has consumed the buffers, instead of freeing it. + void write(std::vector& outputs, std::vector& freeList); + /// Dump headers void dumpHeaders(); void sendHandshake(); diff --git a/Framework/Core/src/WSDriverClient.cxx b/Framework/Core/src/WSDriverClient.cxx index 97ea1b3dbf66a..d539b34acb42f 100644 --- a/Framework/Core/src/WSDriverClient.cxx +++ b/Framework/Core/src/WSDriverClient.cxx @@ -203,7 +203,8 @@ void on_connect(uv_connect_t* connection, int status) return false; } return true; - }, &name); + }, + &name); }); client->observe("/signpost:disable", [](std::string_view cmd) { @@ -221,7 +222,8 @@ void on_connect(uv_connect_t* connection, int status) return false; } return true; - }, &name); + }, + &name); }); // Client will be filled in the line after. I can probably have a single @@ -263,6 +265,12 @@ WSDriverClient::WSDriverClient(ServiceRegistryRef registry, char const* ip, unsi WSDriverClient::~WSDriverClient() { + for (auto& buf : mBacklog) { + free(buf.base); + } + for (auto* chunk : mFreeChunks) { + free(chunk); + } free(this->mAwakeMainThread); } @@ -329,8 +337,24 @@ void WSDriverClient::flushPending(ServiceRegistryRef mainThreadRef) printed1 = false; printed2 = false; } - mClient->write(mBacklog); - mBacklog.resize(0); + // Return any pre-seeded but unused (zero-length) buffers to the free list + // so we don't send pointless zero-byte writes to the kernel. + while (!mBacklog.empty() && mBacklog.back().len == 0) { + mFreeChunks.push_back(mBacklog.back().base); + mBacklog.pop_back(); + } + mClient->write(mBacklog, mFreeChunks); + // Pre-seed mBacklog with one recycled chunk from the previous write's callback + // so that the next encode_websocket_frames reuses memory instead of malloc-ing. + // Only one chunk (64 KB) since encode_websocket_frames appends to outputs.back(). + if (!mFreeChunks.empty()) { + mBacklog.push_back(uv_buf_init(mFreeChunks.back(), 0)); + mFreeChunks.pop_back(); + } + for (auto* chunk : mFreeChunks) { + free(chunk); + } + mFreeChunks.clear(); } } // namespace o2::framework diff --git a/Framework/Core/src/WSDriverClient.h b/Framework/Core/src/WSDriverClient.h index fc4f2b89d80bd..28d13d0abdc23 100644 --- a/Framework/Core/src/WSDriverClient.h +++ b/Framework/Core/src/WSDriverClient.h @@ -56,6 +56,7 @@ class WSDriverClient : public DriverClient std::mutex mClientMutex; ServiceRegistryRef mRegistry; std::vector mBacklog; + std::vector mFreeChunks; ///< recycled WebSocket buffer chunks (main-thread only) uv_async_t* mAwakeMainThread = nullptr; uv_connect_t* mConnection = nullptr; std::unique_ptr mClient;