Skip to content

Commit f47999f

Browse files
CcdbDownloader implementation, which doesn't affect CcdbApi (#10483)
* SquashedCommit * Please consider the following formatting changes * Formatting fix for fullCL check --------- Co-authored-by: ALICE Action Bot <alibuild@cern.ch>
1 parent 5fa626f commit f47999f

6 files changed

Lines changed: 1175 additions & 1 deletion

File tree

CCDB/CMakeLists.txt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
o2_add_library(CCDB
1313
SOURCES src/CcdbApi.cxx
14+
src/CCDBDownloader.cxx
1415
src/BasicCCDBManager.cxx
1516
src/CCDBTimeStampUtils.cxx
1617
src/IdPath.cxx src/CCDBQuery.cxx
@@ -19,6 +20,7 @@ o2_add_library(CCDB
1920
O2::CommonUtils
2021
FairMQ::FairMQ
2122
libjalien::libjalienO2
23+
LibUV::LibUV
2224
TARGETVARNAME targetName)
2325

2426
o2_target_root_dictionary(CCDB
@@ -29,7 +31,8 @@ o2_target_root_dictionary(CCDB
2931
include/CCDB/IdPath.h
3032
include/CCDB/BasicCCDBManager.h
3133
include/CCDB/CCDBTimeStampUtils.h
32-
include/CCDB/CCDBQuery.h)
34+
include/CCDB/CCDBQuery.h
35+
include/CCDB/CCDBDownloader.h)
3336

3437
o2_add_executable(inspectccdbfile
3538
COMPONENT_NAME ccdb
@@ -76,3 +79,9 @@ o2_add_test(CcdbApiMultipleUrls
7679
COMPONENT_NAME ccdb
7780
PUBLIC_LINK_LIBRARIES O2::CCDB
7881
LABELS ccdb)
82+
83+
o2_add_test(CcdbDownloader
84+
SOURCES test/testCcdbApiDownloader.cxx
85+
COMPONENT_NAME ccdb
86+
PUBLIC_LINK_LIBRARIES O2::CCDB
87+
LABELS ccdb)

CCDB/include/CCDB/CCDBDownloader.h

Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include <cstdio>
13+
#include <cstdlib>
14+
#include <uv.h>
15+
#include <curl/curl.h>
16+
#include <string>
17+
#include <vector>
18+
#include <iostream>
19+
#include <thread>
20+
#include <mutex>
21+
#include <condition_variable>
22+
#include <unordered_map>
23+
24+
#ifndef ALICEO2_CCDBDOWNLOADER_H
25+
#define ALICEO2_CCDBDOWNLOADER_H
26+
27+
using namespace std;
28+
29+
namespace o2
30+
{
31+
namespace ccdb
32+
{
33+
34+
/*
35+
Some functions below aren't member functions of CCDBDownloader because both curl and libuv require callback functions which have to be either static or non-member.
36+
Because non-static functions are used in the functions below, they must be non-member.
37+
*/
38+
39+
/**
40+
* uv_walk callback which is used to close passed handle.
41+
*
42+
* @param handle Handle to be closed.
43+
* @param arg Argument required by callback template. Is not used in this implementation.
44+
*/
45+
void closeHandles(uv_handle_t* handle, void* arg);
46+
47+
/**
48+
* Called by CURL in order to open a new socket. Newly opened sockets are assigned a timeout timer and added to socketTimerMap.
49+
*
50+
* @param clientp Pointer to the CCDBDownloader instance which controls the socket.
51+
* @param purpose Purpose of opened socket. This parameter is unused but required by the callback template.
52+
* @param address Structure containing information about family, type and protocol for the socket.
53+
*/
54+
curl_socket_t opensocketCallback(void* clientp, curlsocktype purpose, struct curl_sockaddr* address);
55+
56+
/**
57+
* Delete the handle.
58+
*
59+
* @param handle Handle assigned to this callback.
60+
*/
61+
void onUVClose(uv_handle_t* handle);
62+
63+
class CCDBDownloader
64+
{
65+
public:
66+
/**
67+
* Timer starts for each socket when its respective transfer finishes, and is stopped when another transfer starts for that handle.
68+
* When the timer runs out it closes the socket. The period for which socket stays open is defined by socketTimoutMS.
69+
*/
70+
std::unordered_map<curl_socket_t, uv_timer_t*> socketTimerMap;
71+
72+
/**
73+
* The UV loop which handles transfers.
74+
*/
75+
uv_loop_t* loop;
76+
77+
std::unordered_map<uv_handle_t*, bool> handleMap;
78+
// ADD COMMENT
79+
80+
/**
81+
* Time for which sockets will stay open after last download finishes
82+
*/
83+
int socketTimoutMS = 4000;
84+
85+
/**
86+
* Max number of handles that can be used at the same time
87+
*/
88+
int maxHandlesInUse = 3;
89+
90+
// CCDBDownloader(uv_loop_t uv_loop);
91+
CCDBDownloader(uv_loop_t* uv_loop = nullptr);
92+
~CCDBDownloader();
93+
94+
/**
95+
* Perform on a single handle in a blocking manner. Has the same effect as curl_easy_perform().
96+
*
97+
* @param handle Handle to be performed on. It can be reused or cleaned after perform finishes.
98+
*/
99+
CURLcode perform(CURL* handle);
100+
101+
/**
102+
* Perform on a batch of handles. Callback will be exectuted in it's own thread after all handles finish their transfers.
103+
*
104+
* @param handles Handles to be performed on.
105+
*/
106+
std::vector<CURLcode>* asynchBatchPerformWithCallback(std::vector<CURL*> handles, bool* completionFlag, void (*cbFun)(void*), void* cbData);
107+
108+
/**
109+
* Perform on a batch of handles in a blocking manner. Has the same effect as calling curl_easy_perform() on all handles in the vector.
110+
* @param handleVector Handles to be performed on.
111+
*/
112+
std::vector<CURLcode> batchBlockingPerform(std::vector<CURL*> handleVector);
113+
114+
/**
115+
* Perform on a batch of handles. Completion flag will be set to true when all handles finish their transfers.
116+
* @param handleVector Handles to be performed on.
117+
* @param completionFlag Should be set to false before passing it to this function. Will be set to true after all transfers finish.
118+
*/
119+
std::vector<CURLcode>* batchAsynchPerform(std::vector<CURL*> handleVector, bool* completionFlag);
120+
121+
/**
122+
* Limits the number of parallel connections. Should be used only if no transfers are happening.
123+
*/
124+
void setMaxParallelConnections(int limit);
125+
126+
/**
127+
* Limits the time a socket and its connection will be opened after transfer finishes.
128+
*/
129+
void setSocketTimoutTime(int timoutMS);
130+
131+
private:
132+
/**
133+
* Indicates whether the loop that the downloader is running on has been created by it or provided externally.
134+
* In case of external loop, the loop will not be closed after downloader is deleted.
135+
*/
136+
bool externalLoop;
137+
138+
/**
139+
* Current amount of handles which are performed on.
140+
*/
141+
int handlesInUse = 0;
142+
143+
/**
144+
* Multi handle which controlls all network flow.
145+
*/
146+
CURLM* curlMultiHandle = nullptr;
147+
148+
/**
149+
* The timeout clock that is be used by CURL.
150+
*/
151+
uv_timer_t* timeout;
152+
153+
/**
154+
* Queue of handles awaiting their transfers to start.
155+
*/
156+
std::vector<CURL*> handlesToBeAdded;
157+
158+
/**
159+
* Lock protecting the handleToBeAdded queue.
160+
*/
161+
std::mutex handlesQueueLock;
162+
163+
/**
164+
* Thread on which the thread with uv_loop runs.
165+
*/
166+
std::thread* loopThread;
167+
168+
/**
169+
* Vector with reference to callback threads with a flag marking whether they finished running.
170+
*/
171+
std::vector<std::pair<std::thread*, bool*>> threadFlagPairVector;
172+
173+
/**
174+
* Flag used to signall the loop to close.
175+
*/
176+
bool closeLoop = false;
177+
178+
/**
179+
* Types of requests.
180+
*/
181+
enum RequestType {
182+
BLOCKING,
183+
ASYNCHRONOUS,
184+
ASYNCHRONOUS_WITH_CALLBACK
185+
};
186+
187+
/**
188+
* Information about a socket.
189+
*/
190+
typedef struct curl_context_s {
191+
uv_poll_t poll_handle;
192+
curl_socket_t sockfd = -1;
193+
CCDBDownloader* CD = nullptr;
194+
} curl_context_t;
195+
196+
/**
197+
* Structure used for CURLMOPT_SOCKETDATA, which gives context for handleSocket
198+
*/
199+
typedef struct DataForSocket {
200+
CCDBDownloader* CD;
201+
CURLM* curlm;
202+
} DataForSocket;
203+
204+
/**
205+
* Structure which is stored in a easy_handle. It carries information about the request which the easy_handle is part of.
206+
* All easy handles coming from one request have an identical PerformData structure.
207+
*/
208+
typedef struct PerformData {
209+
std::condition_variable* cv;
210+
bool* completionFlag;
211+
CURLcode* codeDestination;
212+
void (*cbFun)(void*);
213+
std::thread* cbThread;
214+
void* cbData;
215+
size_t* requestsLeft;
216+
RequestType type;
217+
} PerformData;
218+
219+
/**
220+
* Called by CURL in order to close a socket. It will be called by CURL even if a timout timer closed the socket beforehand.
221+
*
222+
* @param clientp Pointer to the CCDBDownloader instance which controls the socket.
223+
* @param item File descriptor of the socket.
224+
*/
225+
static void closesocketCallback(void* clientp, curl_socket_t item);
226+
227+
/**
228+
* Is used to react to polling file descriptors in poll_handle.
229+
*
230+
* @param handle Handle assigned to this callback.
231+
* @param status Used to signal errors.
232+
* @param events Bitmask used to describe events on the socket.
233+
*/
234+
static void curlPerform(uv_poll_t* handle, int status, int events);
235+
236+
/**
237+
* Check if loop was signalled to close. The handle connected with this callbacks is always active as to prevent the uv_loop from stopping.
238+
*
239+
* @param handle uv_handle to which this callbacks is assigned
240+
*/
241+
static void checkStopSignal(uv_timer_t* handle);
242+
243+
/**
244+
* Used by CURL to react to action happening on a socket.
245+
*/
246+
static int handleSocket(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp);
247+
248+
/**
249+
* Asynchronously notify the loop to check its CURL handle queue.
250+
*
251+
* @param handle Handle which is assigned to this callback.
252+
*/
253+
static void asyncUVHandleCheckQueue(uv_async_t* handle);
254+
255+
/**
256+
* Close socket assigned to the timer handle.
257+
*
258+
* @param handle Handle which is assigned to this callback.
259+
*/
260+
static void closeSocketByTimer(uv_timer_t* handle);
261+
262+
/**
263+
* Start new transfers, terminate expired transfers.
264+
*
265+
* @param req Handle which is assigned to this callback.
266+
*/
267+
static void curlTimeout(uv_timer_t* req);
268+
269+
/**
270+
* Free curl context assigned to the handle.
271+
*
272+
* @param handle Handle assigned to this callback.
273+
*/
274+
static void curlCloseCB(uv_handle_t* handle);
275+
276+
/**
277+
* Close poll handle assigned to the socket contained in the context and free data within the handle.
278+
*
279+
* @param context Structure containing information about socket and handle to be closed.
280+
*/
281+
static void destroyCurlContext(curl_context_t* context);
282+
283+
/**
284+
* Connect curl timer with uv timer.
285+
*
286+
* @param multi Multi handle for which the timout will be set
287+
* @param timeout_ms Time until timeout
288+
* @param userp Pointer to the uv_timer_t handle that is used for timeout.
289+
*/
290+
static int startTimeout(CURLM* multi, long timeout_ms, void* userp);
291+
292+
/**
293+
* Check if any of the callback threads have finished running and approprietly join them.
294+
*/
295+
void checkForThreadsToJoin();
296+
297+
/**
298+
* Create a new multi_handle for the downloader
299+
*/
300+
void initializeMultiHandle();
301+
302+
/**
303+
* Release resources reserver for the transfer, mark transfer as complete, passe the CURLcode to the destination and launche callbacks if it is specified in PerformData.
304+
*
305+
* @param handle The easy_handle for which the transfer completed
306+
* @param curlCode The code produced for the handle by the transfer
307+
*/
308+
void transferFinished(CURL* handle, CURLcode curlCode);
309+
310+
/**
311+
* Check message queue inside curl multi handle.
312+
*/
313+
void checkMultiInfo();
314+
315+
/**
316+
* Set openSocketCallback and closeSocketCallback with appropriate arguments. Stores data inside the CURL handle.
317+
*/
318+
void setHandleOptions(CURL* handle, PerformData* data);
319+
320+
/**
321+
* Create structure holding information about a socket including a poll handle assigned to it
322+
*
323+
* @param socketfd File descriptor of socket for which the structure will be created
324+
*/
325+
curl_context_t* createCurlContext(curl_socket_t sockfd);
326+
327+
/**
328+
* Asynchroniously signal the event loop to check for new easy_handles to add to multi handle.
329+
*/
330+
void makeLoopCheckQueueAsync();
331+
332+
/**
333+
* If multi_handles uses less then maximum number of handles then add handles from the queue.
334+
*/
335+
void checkHandleQueue();
336+
337+
/**
338+
* Start the event loop. This function should be ran in the `loopThread`.
339+
*/
340+
void runLoop();
341+
};
342+
343+
/**
344+
* Structure assigned to a uv_timer_t before adding it to socketTimerMap. It stores the information about the socket connected to the timer.
345+
*/
346+
typedef struct DataForClosingSocket {
347+
CCDBDownloader* CD;
348+
curl_socket_t socket;
349+
} DataForClosingSocket;
350+
351+
} // namespace ccdb
352+
} // namespace o2
353+
354+
#endif

0 commit comments

Comments
 (0)