From 43ece3c66f9778d28e598571a1e2accd08c72d15 Mon Sep 17 00:00:00 2001 From: MichalT Date: Wed, 14 Dec 2022 14:51:12 +0100 Subject: [PATCH 1/3] SquashedCommit --- CCDB/CMakeLists.txt | 11 +- CCDB/include/CCDB/CCDBDownloader.h | 354 ++++++++++++++++++++ CCDB/src/CCDBDownloader.cxx | 495 ++++++++++++++++++++++++++++ CCDB/src/CcdbApi.cxx | 1 + CCDB/test/testCcdbApiDownloader.cxx | 296 +++++++++++++++++ doc/data/2021-11-o2_prs.json | 5 + 6 files changed, 1161 insertions(+), 1 deletion(-) create mode 100644 CCDB/include/CCDB/CCDBDownloader.h create mode 100644 CCDB/src/CCDBDownloader.cxx create mode 100644 CCDB/test/testCcdbApiDownloader.cxx diff --git a/CCDB/CMakeLists.txt b/CCDB/CMakeLists.txt index 9b91126bcb56f..a00b37b59e417 100644 --- a/CCDB/CMakeLists.txt +++ b/CCDB/CMakeLists.txt @@ -11,6 +11,7 @@ o2_add_library(CCDB SOURCES src/CcdbApi.cxx + src/CCDBDownloader.cxx src/BasicCCDBManager.cxx src/CCDBTimeStampUtils.cxx src/IdPath.cxx src/CCDBQuery.cxx @@ -19,6 +20,7 @@ o2_add_library(CCDB O2::CommonUtils FairMQ::FairMQ libjalien::libjalienO2 + LibUV::LibUV TARGETVARNAME targetName) o2_target_root_dictionary(CCDB @@ -29,7 +31,8 @@ o2_target_root_dictionary(CCDB include/CCDB/IdPath.h include/CCDB/BasicCCDBManager.h include/CCDB/CCDBTimeStampUtils.h - include/CCDB/CCDBQuery.h) + include/CCDB/CCDBQuery.h + include/CCDB/CCDBDownloader.h) o2_add_executable(inspectccdbfile COMPONENT_NAME ccdb @@ -76,3 +79,9 @@ o2_add_test(CcdbApiMultipleUrls COMPONENT_NAME ccdb PUBLIC_LINK_LIBRARIES O2::CCDB LABELS ccdb) + +o2_add_test(CcdbDownloader + SOURCES test/testCcdbApiDownloader.cxx + COMPONENT_NAME ccdb + PUBLIC_LINK_LIBRARIES O2::CCDB + LABELS ccdb) \ No newline at end of file diff --git a/CCDB/include/CCDB/CCDBDownloader.h b/CCDB/include/CCDB/CCDBDownloader.h new file mode 100644 index 0000000000000..1772d5f49d8c0 --- /dev/null +++ b/CCDB/include/CCDB/CCDBDownloader.h @@ -0,0 +1,354 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef ALICEO2_CCDBDOWNLOADER_H +#define ALICEO2_CCDBDOWNLOADER_H + +using namespace std; + +namespace o2 +{ +namespace ccdb +{ + +/* + Some functions below aren't member functions of CCDBDownloader because both curl and libuv require callback functions which have to be either static or non-member. + Because non-static functions are used in the functions below, they must be non-member. +*/ + +/** + * uv_walk callback which is used to close passed handle. + * + * @param handle Handle to be closed. + * @param arg Argument required by callback template. Is not used in this implementation. + */ +void closeHandles(uv_handle_t* handle, void* arg); + +/** + * Called by CURL in order to open a new socket. Newly opened sockets are assigned a timeout timer and added to socketTimerMap. + * + * @param clientp Pointer to the CCDBDownloader instance which controls the socket. + * @param purpose Purpose of opened socket. This parameter is unused but required by the callback template. + * @param address Structure containing information about family, type and protocol for the socket. + */ +curl_socket_t opensocketCallback(void* clientp, curlsocktype purpose, struct curl_sockaddr* address); + +/** + * Delete the handle. + * + * @param handle Handle assigned to this callback. + */ +void onUVClose(uv_handle_t* handle); + +class CCDBDownloader +{ + public: + /** + * Timer starts for each socket when its respective transfer finishes, and is stopped when another transfer starts for that handle. + * When the timer runs out it closes the socket. The period for which socket stays open is defined by socketTimoutMS. + */ + std::unordered_map socketTimerMap; + + /** + * The UV loop which handles transfers. + */ + uv_loop_t* loop; + + std::unordered_map handleMap; + // ADD COMMENT + + /** + * Time for which sockets will stay open after last download finishes + */ + int socketTimoutMS = 4000; + + /** + * Max number of handles that can be used at the same time + */ + int maxHandlesInUse = 3; + + // CCDBDownloader(uv_loop_t uv_loop); + CCDBDownloader(uv_loop_t* uv_loop = nullptr); + ~CCDBDownloader(); + + /** + * Perform on a single handle in a blocking manner. Has the same effect as curl_easy_perform(). + * + * @param handle Handle to be performed on. It can be reused or cleaned after perform finishes. + */ + CURLcode perform(CURL* handle); + + /** + * Perform on a batch of handles. Callback will be exectuted in it's own thread after all handles finish their transfers. + * + * @param handles Handles to be performed on. + */ + std::vector* asynchBatchPerformWithCallback(std::vector handles, bool* completionFlag, void (*cbFun)(void*), void* cbData); + + /** + * Perform on a batch of handles in a blocking manner. Has the same effect as calling curl_easy_perform() on all handles in the vector. + * @param handleVector Handles to be performed on. + */ + std::vector batchBlockingPerform(std::vector handleVector); + + /** + * Perform on a batch of handles. Completion flag will be set to true when all handles finish their transfers. + * @param handleVector Handles to be performed on. + * @param completionFlag Should be set to false before passing it to this function. Will be set to true after all transfers finish. + */ + std::vector* batchAsynchPerform(std::vector handleVector, bool* completionFlag); + + /** + * Limits the number of parallel connections. Should be used only if no transfers are happening. + */ + void setMaxParallelConnections(int limit); + + /** + * Limits the time a socket and its connection will be opened after transfer finishes. + */ + void setSocketTimoutTime(int timoutMS); + + private: + /** + * Indicates whether the loop that the downloader is running on has been created by it or provided externally. + * In case of external loop, the loop will not be closed after downloader is deleted. + */ + bool externalLoop; + + /** + * Current amount of handles which are performed on. + */ + int handlesInUse = 0; + + /** + * Multi handle which controlls all network flow. + */ + CURLM* curlMultiHandle = nullptr; + + /** + * The timeout clock that is be used by CURL. + */ + uv_timer_t* timeout; + + /** + * Queue of handles awaiting their transfers to start. + */ + std::vector handlesToBeAdded; + + /** + * Lock protecting the handleToBeAdded queue. + */ + std::mutex handlesQueueLock; + + /** + * Thread on which the thread with uv_loop runs. + */ + std::thread* loopThread; + + /** + * Vector with reference to callback threads with a flag marking whether they finished running. + */ + std::vector> threadFlagPairVector; + + /** + * Flag used to signall the loop to close. + */ + bool closeLoop = false; + + /** + * Types of requests. + */ + enum RequestType { + BLOCKING, + ASYNCHRONOUS, + ASYNCHRONOUS_WITH_CALLBACK + }; + + /** + * Information about a socket. + */ + typedef struct curl_context_s { + uv_poll_t poll_handle; + curl_socket_t sockfd = -1; + CCDBDownloader* CD = nullptr; + } curl_context_t; + + /** + * Structure used for CURLMOPT_SOCKETDATA, which gives context for handleSocket + */ + typedef struct DataForSocket { + CCDBDownloader* CD; + CURLM* curlm; + } DataForSocket; + + /** + * Structure which is stored in a easy_handle. It carries information about the request which the easy_handle is part of. + * All easy handles coming from one request have an identical PerformData structure. + */ + typedef struct PerformData { + std::condition_variable* cv; + bool* completionFlag; + CURLcode* codeDestination; + void (*cbFun)(void*); + std::thread* cbThread; + void* cbData; + size_t* requestsLeft; + RequestType type; + } PerformData; + + /** + * Called by CURL in order to close a socket. It will be called by CURL even if a timout timer closed the socket beforehand. + * + * @param clientp Pointer to the CCDBDownloader instance which controls the socket. + * @param item File descriptor of the socket. + */ + static void closesocketCallback(void* clientp, curl_socket_t item); + + /** + * Is used to react to polling file descriptors in poll_handle. + * + * @param handle Handle assigned to this callback. + * @param status Used to signal errors. + * @param events Bitmask used to describe events on the socket. + */ + static void curlPerform(uv_poll_t* handle, int status, int events); + + /** + * Check if loop was signalled to close. The handle connected with this callbacks is always active as to prevent the uv_loop from stopping. + * + * @param handle uv_handle to which this callbacks is assigned + */ + static void checkStopSignal(uv_timer_t* handle); + + /** + * Used by CURL to react to action happening on a socket. + */ + static int handleSocket(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp); + + /** + * Asynchronously notify the loop to check its CURL handle queue. + * + * @param handle Handle which is assigned to this callback. + */ + static void asyncUVHandleCheckQueue(uv_async_t* handle); + + /** + * Close socket assigned to the timer handle. + * + * @param handle Handle which is assigned to this callback. + */ + static void closeSocketByTimer(uv_timer_t* handle); + + /** + * Start new transfers, terminate expired transfers. + * + * @param req Handle which is assigned to this callback. + */ + static void curlTimeout(uv_timer_t* req); + + /** + * Free curl context assigned to the handle. + * + * @param handle Handle assigned to this callback. + */ + static void curlCloseCB(uv_handle_t* handle); + + /** + * Close poll handle assigned to the socket contained in the context and free data within the handle. + * + * @param context Structure containing information about socket and handle to be closed. + */ + static void destroyCurlContext(curl_context_t* context); + + /** + * Connect curl timer with uv timer. + * + * @param multi Multi handle for which the timout will be set + * @param timeout_ms Time until timeout + * @param userp Pointer to the uv_timer_t handle that is used for timeout. + */ + static int startTimeout(CURLM* multi, long timeout_ms, void* userp); + + /** + * Check if any of the callback threads have finished running and approprietly join them. + */ + void checkForThreadsToJoin(); + + /** + * Create a new multi_handle for the downloader + */ + void initializeMultiHandle(); + + /** + * Release resources reserver for the transfer, mark transfer as complete, passe the CURLcode to the destination and launche callbacks if it is specified in PerformData. + * + * @param handle The easy_handle for which the transfer completed + * @param curlCode The code produced for the handle by the transfer + */ + void transferFinished(CURL* handle, CURLcode curlCode); + + /** + * Check message queue inside curl multi handle. + */ + void checkMultiInfo(); + + /** + * Set openSocketCallback and closeSocketCallback with appropriate arguments. Stores data inside the CURL handle. + */ + void setHandleOptions(CURL* handle, PerformData* data); + + /** + * Create structure holding information about a socket including a poll handle assigned to it + * + * @param socketfd File descriptor of socket for which the structure will be created + */ + curl_context_t* createCurlContext(curl_socket_t sockfd); + + /** + * Asynchroniously signal the event loop to check for new easy_handles to add to multi handle. + */ + void makeLoopCheckQueueAsync(); + + /** + * If multi_handles uses less then maximum number of handles then add handles from the queue. + */ + void checkHandleQueue(); + + /** + * Start the event loop. This function should be ran in the `loopThread`. + */ + void runLoop(); +}; + +/** + * Structure assigned to a uv_timer_t before adding it to socketTimerMap. It stores the information about the socket connected to the timer. + */ +typedef struct DataForClosingSocket { + CCDBDownloader* CD; + curl_socket_t socket; +} DataForClosingSocket; + +} // namespace ccdb +} // namespace o2 + +#endif diff --git a/CCDB/src/CCDBDownloader.cxx b/CCDB/src/CCDBDownloader.cxx new file mode 100644 index 0000000000000..d87bac333072e --- /dev/null +++ b/CCDB/src/CCDBDownloader.cxx @@ -0,0 +1,495 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace o2 +{ +namespace ccdb +{ + +CCDBDownloader::CCDBDownloader(uv_loop_t* uv_loop) +{ + if (uv_loop) { + loop = uv_loop; + externalLoop = true; + } else { + loop = new uv_loop_t(); + externalLoop = false; + } + + // Preparing timer to be used by curl + timeout = new uv_timer_t(); + timeout->data = this; + uv_loop_init(loop); + uv_timer_init(loop, timeout); + handleMap[(uv_handle_t*)timeout] = true; + + // Preparing curl handle + initializeMultiHandle(); + + // Global timer + // uv_loop runs only when there are active handles, this handle guarantees the loop won't close immedietly after starting + auto timerCheckQueueHandle = new uv_timer_t(); + timerCheckQueueHandle->data = this; + uv_timer_init(loop, timerCheckQueueHandle); + handleMap[(uv_handle_t*)timerCheckQueueHandle] = true; + uv_timer_start(timerCheckQueueHandle, checkStopSignal, 100, 100); + + loopThread = new std::thread(&CCDBDownloader::runLoop, this); +} + +void CCDBDownloader::initializeMultiHandle() +{ + curlMultiHandle = curl_multi_init(); + curl_multi_setopt(curlMultiHandle, CURLMOPT_SOCKETFUNCTION, handleSocket); + auto socketData = new DataForSocket(); + socketData->curlm = curlMultiHandle; + socketData->CD = this; + curl_multi_setopt(curlMultiHandle, CURLMOPT_SOCKETDATA, socketData); + curl_multi_setopt(curlMultiHandle, CURLMOPT_TIMERFUNCTION, startTimeout); + curl_multi_setopt(curlMultiHandle, CURLMOPT_TIMERDATA, timeout); + curl_multi_setopt(curlMultiHandle, CURLMOPT_MAX_TOTAL_CONNECTIONS, maxHandlesInUse); +} + +CCDBDownloader::~CCDBDownloader() +{ + // Close all socket timers (curl_multi_cleanup will take care of the sockets) + for (auto socketTimerPair : socketTimerMap) { + uv_timer_stop(socketTimerPair.second); + uv_close((uv_handle_t*)socketTimerPair.second, onUVClose); + } + + // Close loop thread + closeLoop = true; + loopThread->join(); + delete loopThread; + + // Close the loop and if any handles are running then signal to close, and run loop once to close them + // This may take more then one iteration of loop - hence the "while" + if (externalLoop) { + uv_walk(loop, closeHandles, this); + } else { + while (UV_EBUSY == uv_loop_close(loop)) { + closeLoop = false; + uv_walk(loop, closeHandles, this); + uv_run(loop, UV_RUN_ONCE); + } + } + curl_multi_cleanup(curlMultiHandle); +} + +void closeHandles(uv_handle_t* handle, void* arg) +{ + auto CD = (CCDBDownloader*)arg; + if (!uv_is_closing(handle) && CD->handleMap.find(handle) != CD->handleMap.end()) { + CD->handleMap.erase(handle); + uv_close(handle, onUVClose); + } +} + +void onUVClose(uv_handle_t* handle) +{ + if (handle != NULL) { + delete handle; + } +} + +void CCDBDownloader::checkStopSignal(uv_timer_t* handle) +{ + // Check for closing signal + auto CD = (CCDBDownloader*)handle->data; + if (CD->closeLoop) { + uv_timer_stop(handle); + uv_stop(CD->loop); + } + CD->checkForThreadsToJoin(); +} + +void CCDBDownloader::closesocketCallback(void* clientp, curl_socket_t item) +{ + auto CD = (CCDBDownloader*)clientp; + if (CD->socketTimerMap.find(item) != CD->socketTimerMap.end()) { + uv_timer_stop(CD->socketTimerMap[item]); + CD->socketTimerMap.erase(item); + close(item); + } +} + +curl_socket_t opensocketCallback(void* clientp, curlsocktype purpose, struct curl_sockaddr* address) +{ + auto CD = (CCDBDownloader*)clientp; + auto sock = socket(address->family, address->socktype, address->protocol); + + CD->socketTimerMap[sock] = new uv_timer_t(); + uv_timer_init(CD->loop, CD->socketTimerMap[sock]); + CD->handleMap[(uv_handle_t*)CD->socketTimerMap[sock]] = true; + + auto data = new DataForClosingSocket(); + data->CD = CD; + data->socket = sock; + CD->socketTimerMap[sock]->data = data; + + return sock; +} + +void CCDBDownloader::asyncUVHandleCheckQueue(uv_async_t* handle) +{ + auto CD = (CCDBDownloader*)handle->data; + uv_close((uv_handle_t*)handle, onUVClose); + CD->checkHandleQueue(); +} + +void CCDBDownloader::closeSocketByTimer(uv_timer_t* handle) +{ + auto data = (DataForClosingSocket*)handle->data; + auto CD = data->CD; + auto sock = data->socket; + + if (CD->socketTimerMap.find(sock) != CD->socketTimerMap.end()) { + uv_timer_stop(CD->socketTimerMap[sock]); + CD->socketTimerMap.erase(sock); + close(sock); + return; + } +} + +void CCDBDownloader::curlTimeout(uv_timer_t* handle) +{ + auto CD = (CCDBDownloader*)handle->data; + int running_handles; + curl_multi_socket_action(CD->curlMultiHandle, CURL_SOCKET_TIMEOUT, 0, &running_handles); + CD->checkMultiInfo(); +} + +void CCDBDownloader::curlPerform(uv_poll_t* handle, int status, int events) +{ + int running_handles; + int flags = 0; + if (events & UV_READABLE) + flags |= CURL_CSELECT_IN; + if (events & UV_WRITABLE) + flags |= CURL_CSELECT_OUT; + + auto context = (CCDBDownloader::curl_context_t*)handle->data; + + curl_multi_socket_action(context->CD->curlMultiHandle, context->sockfd, flags, &running_handles); + context->CD->checkMultiInfo(); +} + +int CCDBDownloader::handleSocket(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp) +{ + auto socketData = (CCDBDownloader::DataForSocket*)userp; + auto CD = (CCDBDownloader*)socketData->CD; + CCDBDownloader::curl_context_t* curl_context; + int events = 0; + + switch (action) { + case CURL_POLL_IN: + case CURL_POLL_OUT: + case CURL_POLL_INOUT: + + curl_context = socketp ? (CCDBDownloader::curl_context_t*)socketp : CD->createCurlContext(s); + curl_multi_assign(socketData->curlm, s, (void*)curl_context); + + if (action != CURL_POLL_IN) + events |= UV_WRITABLE; + if (action != CURL_POLL_OUT) + events |= UV_READABLE; + + if (CD->socketTimerMap.find(s) != CD->socketTimerMap.end()) { + uv_timer_stop(CD->socketTimerMap[s]); + } + + uv_poll_start(&curl_context->poll_handle, events, curlPerform); + break; + case CURL_POLL_REMOVE: + if (socketp) { + if (CD->socketTimerMap.find(s) != CD->socketTimerMap.end()) { + uv_timer_start(CD->socketTimerMap[s], closeSocketByTimer, CD->socketTimoutMS, 0); + } + uv_poll_stop(&((CCDBDownloader::curl_context_t*)socketp)->poll_handle); + CD->destroyCurlContext((CCDBDownloader::curl_context_t*)socketp); + curl_multi_assign(socketData->curlm, s, NULL); + } + break; + default: + abort(); + } + + return 0; +} + +void CCDBDownloader::setMaxParallelConnections(int limit) +{ + maxHandlesInUse = limit; +} + +void CCDBDownloader::setSocketTimoutTime(int timoutMS) +{ + socketTimoutMS = timoutMS; +} + +void CCDBDownloader::checkForThreadsToJoin() +{ + for (int i = 0; i < threadFlagPairVector.size(); i++) { + if (*(threadFlagPairVector[i].second)) { + threadFlagPairVector[i].first->join(); + delete (threadFlagPairVector[i].first); + delete (threadFlagPairVector[i].second); + threadFlagPairVector.erase(threadFlagPairVector.begin() + i); + } + } +} + +CCDBDownloader::curl_context_t* CCDBDownloader::createCurlContext(curl_socket_t sockfd) +{ + curl_context_t* context; + + context = (curl_context_t*)malloc(sizeof(*context)); + context->CD = this; + context->sockfd = sockfd; + + uv_poll_init_socket(loop, &context->poll_handle, sockfd); + handleMap[(uv_handle_t*)(&context->poll_handle)] = true; + context->poll_handle.data = context; + + return context; +} + +void CCDBDownloader::curlCloseCB(uv_handle_t* handle) +{ + curl_context_t* context = (curl_context_t*)handle->data; + free(context); +} + +void CCDBDownloader::destroyCurlContext(curl_context_t* context) +{ + uv_close((uv_handle_t*)&context->poll_handle, curlCloseCB); +} + +void callbackWrappingFunction(void (*cbFun)(void*), void* data, bool* completionFlag) +{ + cbFun(data); + *completionFlag = true; +} + +void CCDBDownloader::transferFinished(CURL* easy_handle, CURLcode curlCode) +{ + handlesInUse--; + PerformData* data; + curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &data); + + curl_multi_remove_handle(curlMultiHandle, easy_handle); + *data->codeDestination = curlCode; + + // If no requests left then signal finished based on type of operation + if (--(*data->requestsLeft) == 0) { + switch (data->type) { + case BLOCKING: + data->cv->notify_all(); + break; + case ASYNCHRONOUS: + *data->completionFlag = true; + break; + case ASYNCHRONOUS_WITH_CALLBACK: + *data->completionFlag = true; + bool* cbFlag = (bool*)malloc(sizeof(bool)); + *cbFlag = false; + auto cbThread = new std::thread(&callbackWrappingFunction, data->cbFun, data->cbData, cbFlag); + threadFlagPairVector.emplace_back(cbThread, cbFlag); + break; + } + } + delete data; + + checkHandleQueue(); + + // Calling timout starts a new download if a new easy_handle was added. + int running_handles; + curl_multi_socket_action(curlMultiHandle, CURL_SOCKET_TIMEOUT, 0, &running_handles); + checkMultiInfo(); +} + +void CCDBDownloader::checkMultiInfo() +{ + CURLMsg* message; + int pending; + + while ((message = curl_multi_info_read(curlMultiHandle, &pending))) { + switch (message->msg) { + case CURLMSG_DONE: { + CURLcode code = message->data.result; + transferFinished(message->easy_handle, code); + } break; + + default: + fprintf(stderr, "CURLMSG default\n"); + break; + } + } +} + +int CCDBDownloader::startTimeout(CURLM* multi, long timeout_ms, void* userp) +{ + auto timeout = (uv_timer_t*)userp; + + if (timeout_ms < 0) { + uv_timer_stop(timeout); + } else { + if (timeout_ms == 0) + timeout_ms = 1; // Calling curlTimeout when timeout = 0 could create an infinite loop + uv_timer_start(timeout, curlTimeout, timeout_ms, 0); + } + return 0; +} + +void CCDBDownloader::setHandleOptions(CURL* handle, PerformData* data) +{ + curl_easy_setopt(handle, CURLOPT_PRIVATE, data); + + curl_easy_setopt(handle, CURLOPT_CLOSESOCKETFUNCTION, closesocketCallback); + curl_easy_setopt(handle, CURLOPT_CLOSESOCKETDATA, this); + curl_easy_setopt(handle, CURLOPT_OPENSOCKETFUNCTION, opensocketCallback); + curl_easy_setopt(handle, CURLOPT_OPENSOCKETDATA, this); +} + +void CCDBDownloader::checkHandleQueue() +{ + // Lock access to handle queue + handlesQueueLock.lock(); + if (handlesToBeAdded.size() > 0) { + // Add handles without going over the limit + while (handlesToBeAdded.size() > 0 && handlesInUse < maxHandlesInUse) { + curl_multi_add_handle(curlMultiHandle, handlesToBeAdded.front()); + handlesInUse++; + handlesToBeAdded.erase(handlesToBeAdded.begin()); + } + } + handlesQueueLock.unlock(); +} + +void CCDBDownloader::runLoop() +{ + uv_run(loop, UV_RUN_DEFAULT); +} + +CURLcode CCDBDownloader::perform(CURL* handle) +{ + std::vector handleVector; + handleVector.push_back(handle); + return batchBlockingPerform(handleVector).back(); +} + +std::vector* CCDBDownloader::batchAsynchPerform(std::vector handleVector, bool* completionFlag) +{ + auto codeVector = new std::vector(handleVector.size()); + size_t* requestsLeft = new size_t(); + *requestsLeft = handleVector.size(); + + handlesQueueLock.lock(); + for (int i = 0; i < handleVector.size(); i++) { + auto* data = new CCDBDownloader::PerformData(); + + data->codeDestination = &(*codeVector)[i]; + (*codeVector)[i] = CURLE_FAILED_INIT; + + data->requestsLeft = requestsLeft; + data->completionFlag = completionFlag; + data->type = ASYNCHRONOUS; + + setHandleOptions(handleVector[i], data); + handlesToBeAdded.push_back(handleVector[i]); + } + handlesQueueLock.unlock(); + makeLoopCheckQueueAsync(); + return codeVector; +} + +std::vector CCDBDownloader::batchBlockingPerform(std::vector handleVector) +{ + std::condition_variable cv; + std::mutex cv_m; + std::unique_lock lk(cv_m); + + std::vector codeVector(handleVector.size()); + size_t requestsLeft = handleVector.size(); + + handlesQueueLock.lock(); + for (int i = 0; i < handleVector.size(); i++) { + auto* data = new CCDBDownloader::PerformData(); + data->codeDestination = &codeVector[i]; + codeVector[i] = CURLE_FAILED_INIT; + + data->cv = &cv; + data->type = BLOCKING; + data->requestsLeft = &requestsLeft; + + setHandleOptions(handleVector[i], data); + handlesToBeAdded.push_back(handleVector[i]); + } + handlesQueueLock.unlock(); + makeLoopCheckQueueAsync(); + cv.wait(lk); + return codeVector; +} + +std::vector* CCDBDownloader::asynchBatchPerformWithCallback(std::vector handleVector, bool* completionFlag, void (*cbFun)(void*), void* cbData) +{ + auto codeVector = new std::vector(handleVector.size()); + size_t* requestsLeft = new size_t(); + *requestsLeft = handleVector.size(); + + handlesQueueLock.lock(); + for (int i = 0; i < handleVector.size(); i++) { + auto* data = new CCDBDownloader::PerformData(); + + data->codeDestination = &(*codeVector)[i]; + (*codeVector)[i] = CURLE_FAILED_INIT; + + data->requestsLeft = requestsLeft; + data->completionFlag = completionFlag; + data->type = ASYNCHRONOUS_WITH_CALLBACK; + data->cbFun = cbFun; + data->cbData = cbData; + + setHandleOptions(handleVector[i], data); + handlesToBeAdded.push_back(handleVector[i]); + } + handlesQueueLock.unlock(); + makeLoopCheckQueueAsync(); + return codeVector; +} + +void CCDBDownloader::makeLoopCheckQueueAsync() +{ + auto asyncHandle = new uv_async_t(); + asyncHandle->data = this; + uv_async_init(loop, asyncHandle, asyncUVHandleCheckQueue); + uv_async_send(asyncHandle); +} + +} // namespace ccdb +} // namespace o2 \ No newline at end of file diff --git a/CCDB/src/CcdbApi.cxx b/CCDB/src/CcdbApi.cxx index 287913deef398..893bc2e532d6c 100644 --- a/CCDB/src/CcdbApi.cxx +++ b/CCDB/src/CcdbApi.cxx @@ -16,6 +16,7 @@ #include "CCDB/CcdbApi.h" #include "CCDB/CCDBQuery.h" + #include "CommonUtils/StringUtils.h" #include "CommonUtils/FileSystemUtils.h" #include "CommonUtils/MemFileHelper.h" diff --git a/CCDB/test/testCcdbApiDownloader.cxx b/CCDB/test/testCcdbApiDownloader.cxx new file mode 100644 index 0000000000000..ef3cb978703da --- /dev/null +++ b/CCDB/test/testCcdbApiDownloader.cxx @@ -0,0 +1,296 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#define BOOST_TEST_MODULE CCDB +#define BOOST_TEST_MAIN +#define BOOST_TEST_DYN_LINK + +#include +#include +#include +#include +#include +#include // Sleep function to wait for asynch results + +#include +#include + +using namespace std; + +namespace o2 +{ +namespace ccdb +{ + +size_t CurlWrite_CallbackFunc_StdString2(void* contents, size_t size, size_t nmemb, std::string* s) +{ + size_t newLength = size * nmemb; + size_t oldLength = s->size(); + try { + s->resize(oldLength + newLength); + } catch (std::bad_alloc& e) { + LOG(error) << "memory error when getting data from CCDB"; + return 0; + } + + std::copy((char*)contents, (char*)contents + newLength, s->begin() + oldLength); + return size * nmemb; +} + +CURL* testHandle(std::string* dst) +{ + CURL* handle = curl_easy_init(); + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CurlWrite_CallbackFunc_StdString2); + curl_easy_setopt(handle, CURLOPT_WRITEDATA, dst); + curl_easy_setopt(handle, CURLOPT_URL, "http://ccdb-test.cern.ch:8080/latest/"); + return handle; +} + +BOOST_AUTO_TEST_CASE(perform_test) +{ + if (curl_global_init(CURL_GLOBAL_ALL)) { + fprintf(stderr, "Could not init curl\n"); + return; + } + + CCDBDownloader downloader; + std::string dst = ""; + CURL* handle = testHandle(&dst); + + CURLcode curlCode = downloader.perform(handle); + + BOOST_CHECK(curlCode == CURLE_OK); + std::cout << "CURL code: " << curlCode << "\n"; + + long httpCode; + curl_easy_getinfo(handle, CURLINFO_HTTP_CODE, &httpCode); + BOOST_CHECK(httpCode == 200); + std::cout << "HTTP code: " << httpCode << "\n"; + + curl_easy_cleanup(handle); + + curl_global_cleanup(); +} + +BOOST_AUTO_TEST_CASE(blocking_batch_test) +{ + if (curl_global_init(CURL_GLOBAL_ALL)) { + fprintf(stderr, "Could not init curl\n"); + return; + } + + CCDBDownloader downloader; + std::vector handleVector; + std::vector destinations; + for (int i = 0; i < 100; i++) { + destinations.push_back(new std::string()); + handleVector.push_back(testHandle(destinations.back())); + } + + auto curlCodes = downloader.batchBlockingPerform(handleVector); + for (CURLcode code : curlCodes) { + BOOST_CHECK(code == CURLE_OK); + if (code != CURLE_OK) + std::cout << "CURL Code: " << code << "\n"; + } + + for (CURL* handle : handleVector) { + long httpCode; + curl_easy_getinfo(handle, CURLINFO_HTTP_CODE, &httpCode); + BOOST_CHECK(httpCode == 200); + if (httpCode != 200) + std::cout << "HTTP Code: " << httpCode << "\n"; + curl_easy_cleanup(handle); + } + + for (std::string* dst : destinations) { + delete dst; + } + + curl_global_cleanup(); +} + +BOOST_AUTO_TEST_CASE(asynch_batch_test) +{ + if (curl_global_init(CURL_GLOBAL_ALL)) { + fprintf(stderr, "Could not init curl\n"); + return; + } + + CCDBDownloader downloader; + std::vector handleVector; + std::vector destinations; + for (int i = 0; i < 10; i++) { + destinations.push_back(new std::string()); + handleVector.push_back(testHandle(destinations.back())); + } + + bool flag = false; + auto curlCodes = downloader.batchAsynchPerform(handleVector, &flag); + while (!flag) + sleep(1); + + for (CURLcode code : (*curlCodes)) { + BOOST_CHECK(code == CURLE_OK); + if (code != CURLE_OK) + std::cout << "CURL Code: " << code << "\n"; + } + + for (CURL* handle : handleVector) { + long httpCode; + curl_easy_getinfo(handle, CURLINFO_HTTP_CODE, &httpCode); + BOOST_CHECK(httpCode == 200); + if (httpCode != 200) + std::cout << "HTTP Code: " << httpCode << "\n"; + curl_easy_cleanup(handle); + } + + for (std::string* dst : destinations) { + delete dst; + } + + curl_global_cleanup(); +} + +BOOST_AUTO_TEST_CASE(test_with_break) +{ + if (curl_global_init(CURL_GLOBAL_ALL)) { + fprintf(stderr, "Could not init curl\n"); + return; + } + + CCDBDownloader downloader; + std::vector handleVector; + std::vector destinations; + for (int i = 0; i < 100; i++) { + destinations.push_back(new std::string()); + handleVector.push_back(testHandle(destinations.back())); + } + + auto curlCodes = downloader.batchBlockingPerform(handleVector); + for (std::string* dst : destinations) { + delete dst; + } + + sleep(10); + + std::vector handleVector2; + std::vector destinations2; + for (int i = 0; i < 100; i++) { + destinations2.push_back(new std::string()); + handleVector2.push_back(testHandle(destinations2.back())); + } + + auto curlCodes2 = downloader.batchBlockingPerform(handleVector2); + for (CURLcode code : curlCodes2) { + BOOST_CHECK(code == CURLE_OK); + if (code != CURLE_OK) + std::cout << "CURL Code: " << code << "\n"; + } + + for (CURL* handle : handleVector2) { + long httpCode; + curl_easy_getinfo(handle, CURLINFO_HTTP_CODE, &httpCode); + BOOST_CHECK(httpCode == 200); + if (httpCode != 200) + std::cout << "HTTP Code: " << httpCode << "\n"; + curl_easy_cleanup(handle); + } + + for (std::string* dst : destinations2) { + delete dst; + } + + curl_global_cleanup(); +} + +void testCallback(void* ptr) +{ + int* intPtr = (int*)ptr; + *intPtr = 46; +} + +BOOST_AUTO_TEST_CASE(asynch_batch_callback) +{ + if (curl_global_init(CURL_GLOBAL_ALL)) { + fprintf(stderr, "Could not init curl\n"); + return; + } + + CCDBDownloader downloader; + std::vector handleVector; + std::vector destinations; + for (int i = 0; i < 10; i++) { + destinations.push_back(new std::string()); + handleVector.push_back(testHandle(destinations.back())); + } + + int testValue = 0; + + bool flag = false; + auto curlCodes = downloader.asynchBatchPerformWithCallback(handleVector, &flag, testCallback, &testValue); + while (!flag) + sleep(1); + + BOOST_CHECK(testValue == 46); + + for (CURLcode code : (*curlCodes)) { + BOOST_CHECK(code == CURLE_OK); + if (code != CURLE_OK) + std::cout << "CURL Code: " << code << "\n"; + } + + for (CURL* handle : handleVector) { + long httpCode; + curl_easy_getinfo(handle, CURLINFO_HTTP_CODE, &httpCode); + BOOST_CHECK(httpCode == 200); + if (httpCode != 200) + std::cout << "HTTP Code: " << httpCode << "\n"; + curl_easy_cleanup(handle); + } + + for (std::string* dst : destinations) { + delete dst; + } + + curl_global_cleanup(); +} + +BOOST_AUTO_TEST_CASE(external_loop_test) +{ + if (curl_global_init(CURL_GLOBAL_ALL)) { + fprintf(stderr, "Could not init curl\n"); + return; + } + + uv_loop_t loop; + + CCDBDownloader downloader(&loop); + std::string dst = ""; + CURL* handle = testHandle(&dst); + + CURLcode curlCode = downloader.perform(handle); + + BOOST_CHECK(curlCode == CURLE_OK); + std::cout << "CURL code: " << curlCode << "\n"; + + long httpCode; + curl_easy_getinfo(handle, CURLINFO_HTTP_CODE, &httpCode); + BOOST_CHECK(httpCode == 200); + std::cout << "HTTP code: " << httpCode << "\n"; + + curl_easy_cleanup(handle); + + curl_global_cleanup(); +} + +} // namespace ccdb +} // namespace o2 \ No newline at end of file diff --git a/doc/data/2021-11-o2_prs.json b/doc/data/2021-11-o2_prs.json index 1a5ec1b016c57..ad6de92d28957 100644 --- a/doc/data/2021-11-o2_prs.json +++ b/doc/data/2021-11-o2_prs.json @@ -2407,6 +2407,11 @@ "node": { "path": "CCDB/test/testCcdbApiMultipleUrls.cxx" } + }, + { + "node": { + "path": "CCDB/test/testCcdbApiDownloader.cxx" + } } ] } From b81cb3553947899a009a5f065021a74bfb5fe918 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Wed, 14 Dec 2022 15:04:47 +0000 Subject: [PATCH 2/3] Please consider the following formatting changes --- CCDB/include/CCDB/CCDBDownloader.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CCDB/include/CCDB/CCDBDownloader.h b/CCDB/include/CCDB/CCDBDownloader.h index 1772d5f49d8c0..659e8579ff658 100644 --- a/CCDB/include/CCDB/CCDBDownloader.h +++ b/CCDB/include/CCDB/CCDBDownloader.h @@ -78,8 +78,8 @@ class CCDBDownloader // ADD COMMENT /** - * Time for which sockets will stay open after last download finishes - */ + * Time for which sockets will stay open after last download finishes + */ int socketTimoutMS = 4000; /** From ca84c9cea3d2be851911a599e7837713ca720333 Mon Sep 17 00:00:00 2001 From: MichalT Date: Sun, 12 Feb 2023 12:26:11 +0100 Subject: [PATCH 3/3] Formatting fix for fullCL check --- CCDB/include/CCDB/CCDBDownloader.h | 4 ++-- CCDB/src/CCDBDownloader.cxx | 23 +++++++++++++--------- CCDB/test/testCcdbApiDownloader.cxx | 30 +++++++++++++++++++---------- 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/CCDB/include/CCDB/CCDBDownloader.h b/CCDB/include/CCDB/CCDBDownloader.h index 659e8579ff658..188c046fbac70 100644 --- a/CCDB/include/CCDB/CCDBDownloader.h +++ b/CCDB/include/CCDB/CCDBDownloader.h @@ -9,8 +9,8 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#include -#include +#include +#include #include #include #include diff --git a/CCDB/src/CCDBDownloader.cxx b/CCDB/src/CCDBDownloader.cxx index d87bac333072e..9fe3ebeac4dd1 100644 --- a/CCDB/src/CCDBDownloader.cxx +++ b/CCDB/src/CCDBDownloader.cxx @@ -13,8 +13,8 @@ #include #include -#include -#include +#include +#include #include #include #include @@ -112,7 +112,7 @@ void closeHandles(uv_handle_t* handle, void* arg) void onUVClose(uv_handle_t* handle) { - if (handle != NULL) { + if (handle != nullptr) { delete handle; } } @@ -188,10 +188,12 @@ void CCDBDownloader::curlPerform(uv_poll_t* handle, int status, int events) { int running_handles; int flags = 0; - if (events & UV_READABLE) + if (events & UV_READABLE) { flags |= CURL_CSELECT_IN; - if (events & UV_WRITABLE) + } + if (events & UV_WRITABLE) { flags |= CURL_CSELECT_OUT; + } auto context = (CCDBDownloader::curl_context_t*)handle->data; @@ -214,10 +216,12 @@ int CCDBDownloader::handleSocket(CURL* easy, curl_socket_t s, int action, void* curl_context = socketp ? (CCDBDownloader::curl_context_t*)socketp : CD->createCurlContext(s); curl_multi_assign(socketData->curlm, s, (void*)curl_context); - if (action != CURL_POLL_IN) + if (action != CURL_POLL_IN) { events |= UV_WRITABLE; - if (action != CURL_POLL_OUT) + } + if (action != CURL_POLL_OUT) { events |= UV_READABLE; + } if (CD->socketTimerMap.find(s) != CD->socketTimerMap.end()) { uv_timer_stop(CD->socketTimerMap[s]); @@ -232,7 +236,7 @@ int CCDBDownloader::handleSocket(CURL* easy, curl_socket_t s, int action, void* } uv_poll_stop(&((CCDBDownloader::curl_context_t*)socketp)->poll_handle); CD->destroyCurlContext((CCDBDownloader::curl_context_t*)socketp); - curl_multi_assign(socketData->curlm, s, NULL); + curl_multi_assign(socketData->curlm, s, nullptr); } break; default: @@ -359,8 +363,9 @@ int CCDBDownloader::startTimeout(CURLM* multi, long timeout_ms, void* userp) if (timeout_ms < 0) { uv_timer_stop(timeout); } else { - if (timeout_ms == 0) + if (timeout_ms == 0) { timeout_ms = 1; // Calling curlTimeout when timeout = 0 could create an infinite loop + } uv_timer_start(timeout, curlTimeout, timeout_ms, 0); } return 0; diff --git a/CCDB/test/testCcdbApiDownloader.cxx b/CCDB/test/testCcdbApiDownloader.cxx index f5f658e562925..0932751ef3144 100644 --- a/CCDB/test/testCcdbApiDownloader.cxx +++ b/CCDB/test/testCcdbApiDownloader.cxx @@ -97,16 +97,18 @@ BOOST_AUTO_TEST_CASE(blocking_batch_test) auto curlCodes = downloader.batchBlockingPerform(handleVector); for (CURLcode code : curlCodes) { BOOST_CHECK(code == CURLE_OK); - if (code != CURLE_OK) + if (code != CURLE_OK) { std::cout << "CURL Code: " << code << "\n"; + } } for (CURL* handle : handleVector) { long httpCode; curl_easy_getinfo(handle, CURLINFO_HTTP_CODE, &httpCode); BOOST_CHECK(httpCode == 200); - if (httpCode != 200) + if (httpCode != 200) { std::cout << "HTTP Code: " << httpCode << "\n"; + } curl_easy_cleanup(handle); } @@ -134,21 +136,24 @@ BOOST_AUTO_TEST_CASE(asynch_batch_test) bool flag = false; auto curlCodes = downloader.batchAsynchPerform(handleVector, &flag); - while (!flag) + while (!flag) { sleep(1); + } for (CURLcode code : (*curlCodes)) { BOOST_CHECK(code == CURLE_OK); - if (code != CURLE_OK) + if (code != CURLE_OK) { std::cout << "CURL Code: " << code << "\n"; + } } for (CURL* handle : handleVector) { long httpCode; curl_easy_getinfo(handle, CURLINFO_HTTP_CODE, &httpCode); BOOST_CHECK(httpCode == 200); - if (httpCode != 200) + if (httpCode != 200) { std::cout << "HTTP Code: " << httpCode << "\n"; + } curl_easy_cleanup(handle); } @@ -191,16 +196,18 @@ BOOST_AUTO_TEST_CASE(test_with_break) auto curlCodes2 = downloader.batchBlockingPerform(handleVector2); for (CURLcode code : curlCodes2) { BOOST_CHECK(code == CURLE_OK); - if (code != CURLE_OK) + if (code != CURLE_OK) { std::cout << "CURL Code: " << code << "\n"; + } } for (CURL* handle : handleVector2) { long httpCode; curl_easy_getinfo(handle, CURLINFO_HTTP_CODE, &httpCode); BOOST_CHECK(httpCode == 200); - if (httpCode != 200) + if (httpCode != 200) { std::cout << "HTTP Code: " << httpCode << "\n"; + } curl_easy_cleanup(handle); } @@ -236,23 +243,26 @@ BOOST_AUTO_TEST_CASE(asynch_batch_callback) bool flag = false; auto curlCodes = downloader.asynchBatchPerformWithCallback(handleVector, &flag, testCallback, &testValue); - while (!flag) + while (!flag) { sleep(1); + } BOOST_CHECK(testValue == 46); for (CURLcode code : (*curlCodes)) { BOOST_CHECK(code == CURLE_OK); - if (code != CURLE_OK) + if (code != CURLE_OK) { std::cout << "CURL Code: " << code << "\n"; + } } for (CURL* handle : handleVector) { long httpCode; curl_easy_getinfo(handle, CURLINFO_HTTP_CODE, &httpCode); BOOST_CHECK(httpCode == 200); - if (httpCode != 200) + if (httpCode != 200) { std::cout << "HTTP Code: " << httpCode << "\n"; + } curl_easy_cleanup(handle); }