Skip to content

Commit d3805c6

Browse files
committed
SquashedCommit
1 parent 599215a commit d3805c6

6 files changed

Lines changed: 1182 additions & 1 deletion

File tree

CCDB/CMakeLists.txt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
o2_add_library(CCDB
1313
SOURCES src/CcdbApi.cxx
14+
src/CCDBDownloader.cxx
1415
src/BasicCCDBManager.cxx
1516
src/CCDBTimeStampUtils.cxx
1617
src/IdPath.cxx src/CCDBQuery.cxx
@@ -19,6 +20,7 @@ o2_add_library(CCDB
1920
O2::CommonUtils
2021
FairMQ::FairMQ
2122
libjalien::libjalienO2
23+
LibUV::LibUV
2224
TARGETVARNAME targetName)
2325

2426
o2_target_root_dictionary(CCDB
@@ -29,7 +31,8 @@ o2_target_root_dictionary(CCDB
2931
include/CCDB/IdPath.h
3032
include/CCDB/BasicCCDBManager.h
3133
include/CCDB/CCDBTimeStampUtils.h
32-
include/CCDB/CCDBQuery.h)
34+
include/CCDB/CCDBQuery.h
35+
include/CCDB/CCDBDownloader.h)
3336

3437
o2_add_executable(inspectccdbfile
3538
COMPONENT_NAME ccdb
@@ -76,3 +79,9 @@ o2_add_test(CcdbApiMultipleUrls
7679
COMPONENT_NAME ccdb
7780
PUBLIC_LINK_LIBRARIES O2::CCDB
7881
LABELS ccdb)
82+
83+
o2_add_test(CcdbDownloader
84+
SOURCES test/testCcdbApiDownloader.cxx
85+
COMPONENT_NAME ccdb
86+
PUBLIC_LINK_LIBRARIES O2::CCDB
87+
LABELS ccdb)

CCDB/include/CCDB/CCDBDownloader.h

Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include <stdio.h>
13+
#include <stdlib.h>
14+
#include <uv.h>
15+
#include <curl/curl.h>
16+
#include <string>
17+
#include <vector>
18+
#include <iostream>
19+
#include <thread>
20+
#include <mutex>
21+
#include <condition_variable>
22+
#include <unordered_map>
23+
24+
#ifndef ALICEO2_CCDBDOWNLOADER_H
25+
#define ALICEO2_CCDBDOWNLOADER_H
26+
27+
using namespace std;
28+
29+
namespace o2
30+
{
31+
namespace ccdb
32+
{
33+
34+
/*
35+
Some functions below aren't member functions of CCDBDownloader because both curl and libuv require callback functions which have to be either static or non-member.
36+
Because non-static functions are used in the functions below, they must be non-member.
37+
*/
38+
39+
/**
40+
* uv_walk callback which is used to close passed handle.
41+
*
42+
* @param handle Handle to be closed.
43+
* @param arg Argument required by callback template. Is not used in this implementation.
44+
*/
45+
void closeHandles(uv_handle_t* handle, void* arg);
46+
47+
/**
48+
* Called by CURL in order to open a new socket. Newly opened sockets are assigned a timeout timer and added to socketTimerMap.
49+
*
50+
* @param clientp Pointer to the CCDBDownloader instance which controls the socket.
51+
* @param purpose Purpose of opened socket. This parameter is unused but required by the callback template.
52+
* @param address Structure containing information about family, type and protocol for the socket.
53+
*/
54+
curl_socket_t opensocketCallback(void* clientp, curlsocktype purpose, struct curl_sockaddr* address);
55+
56+
/**
57+
* Delete the handle.
58+
*
59+
* @param handle Handle assigned to this callback.
60+
*/
61+
void onUVClose(uv_handle_t* handle);
62+
63+
class CCDBDownloader
64+
{
65+
public:
66+
67+
68+
/**
69+
* Timer starts for each socket when its respective transfer finishes, and is stopped when another transfer starts for that handle.
70+
* When the timer runs out it closes the socket. The period for which socket stays open is defined by socketTimoutMS.
71+
*/
72+
std::unordered_map<curl_socket_t, uv_timer_t*> socketTimerMap;
73+
74+
/**
75+
* The UV loop which handles transfers.
76+
*/
77+
uv_loop_t* loop;
78+
79+
std::unordered_map<uv_handle_t*, bool> handleMap;
80+
// ADD COMMENT
81+
82+
/**
83+
* Time for which sockets will stay open after last download finishes
84+
*/
85+
int socketTimoutMS = 4000;
86+
87+
/**
88+
* Max number of handles that can be used at the same time
89+
*/
90+
int maxHandlesInUse = 3;
91+
92+
// CCDBDownloader(uv_loop_t uv_loop);
93+
CCDBDownloader(uv_loop_t* uv_loop = nullptr);
94+
~CCDBDownloader();
95+
96+
/**
97+
* Perform on a single handle in a blocking manner. Has the same effect as curl_easy_perform().
98+
*
99+
* @param handle Handle to be performed on. It can be reused or cleaned after perform finishes.
100+
*/
101+
CURLcode perform(CURL* handle);
102+
103+
/**
104+
* Perform on a batch of handles. Callback will be exectuted in it's own thread after all handles finish their transfers.
105+
*
106+
* @param handles Handles to be performed on.
107+
*/
108+
std::vector<CURLcode>* asynchBatchPerformWithCallback(std::vector<CURL*> handles, bool* completionFlag, void (*cbFun)(void*), void* cbData);
109+
110+
/**
111+
* Perform on a batch of handles in a blocking manner. Has the same effect as calling curl_easy_perform() on all handles in the vector.
112+
* @param handleVector Handles to be performed on.
113+
*/
114+
std::vector<CURLcode> batchBlockingPerform(std::vector<CURL*> handleVector);
115+
116+
/**
117+
* Perform on a batch of handles. Completion flag will be set to true when all handles finish their transfers.
118+
* @param handleVector Handles to be performed on.
119+
* @param completionFlag Should be set to false before passing it to this function. Will be set to true after all transfers finish.
120+
*/
121+
std::vector<CURLcode>* batchAsynchPerform(std::vector<CURL*> handleVector, bool* completionFlag);
122+
123+
/**
124+
* Limits the number of parallel connections. Should be used only if no transfers are happening.
125+
*/
126+
void setMaxParallelConnections(int limit);
127+
128+
/**
129+
* Limits the time a socket and its connection will be opened after transfer finishes.
130+
*/
131+
void setSocketTimoutTime(int timoutMS);
132+
133+
private:
134+
135+
/**
136+
* Indicates whether the loop that the downloader is running on has been created by it or provided externally.
137+
* In case of external loop, the loop will not be closed after downloader is deleted.
138+
*/
139+
bool externalLoop;
140+
141+
/**
142+
* Current amount of handles which are performed on.
143+
*/
144+
int handlesInUse = 0;
145+
146+
/**
147+
* Multi handle which controlls all network flow.
148+
*/
149+
CURLM* curlMultiHandle = nullptr;
150+
151+
/**
152+
* The timeout clock that is be used by CURL.
153+
*/
154+
uv_timer_t* timeout;
155+
156+
/**
157+
* Queue of handles awaiting their transfers to start.
158+
*/
159+
std::vector<CURL*> handlesToBeAdded;
160+
161+
/**
162+
* Lock protecting the handleToBeAdded queue.
163+
*/
164+
std::mutex handlesQueueLock;
165+
166+
/**
167+
* Thread on which the thread with uv_loop runs.
168+
*/
169+
std::thread* loopThread;
170+
171+
/**
172+
* Vector with reference to callback threads with a flag marking whether they finished running.
173+
*/
174+
std::vector<std::pair<std::thread*, bool*>> threadFlagPairVector;
175+
176+
/**
177+
* Flag used to signall the loop to close.
178+
*/
179+
bool closeLoop = false;
180+
181+
/**
182+
* Types of requests.
183+
*/
184+
enum RequestType {
185+
BLOCKING,
186+
ASYNCHRONOUS,
187+
ASYNCHRONOUS_WITH_CALLBACK
188+
};
189+
190+
/**
191+
* Information about a socket.
192+
*/
193+
typedef struct curl_context_s {
194+
uv_poll_t poll_handle;
195+
curl_socket_t sockfd = -1;
196+
CCDBDownloader* CD = nullptr;
197+
} curl_context_t;
198+
199+
/**
200+
* Structure used for CURLMOPT_SOCKETDATA, which gives context for handleSocket
201+
*/
202+
typedef struct DataForSocket {
203+
CCDBDownloader* CD;
204+
CURLM* curlm;
205+
} DataForSocket;
206+
207+
/**
208+
* Structure which is stored in a easy_handle. It carries information about the request which the easy_handle is part of.
209+
* All easy handles coming from one request have an identical PerformData structure.
210+
*/
211+
typedef struct PerformData {
212+
std::condition_variable* cv;
213+
bool* completionFlag;
214+
CURLcode* codeDestination;
215+
void (*cbFun)(void*);
216+
std::thread* cbThread;
217+
void* cbData;
218+
size_t* requestsLeft;
219+
RequestType type;
220+
} PerformData;
221+
222+
/**
223+
* Called by CURL in order to close a socket. It will be called by CURL even if a timout timer closed the socket beforehand.
224+
*
225+
* @param clientp Pointer to the CCDBDownloader instance which controls the socket.
226+
* @param item File descriptor of the socket.
227+
*/
228+
static void closesocketCallback(void* clientp, curl_socket_t item);
229+
230+
/**
231+
* Is used to react to polling file descriptors in poll_handle.
232+
*
233+
* @param handle Handle assigned to this callback.
234+
* @param status Used to signal errors.
235+
* @param events Bitmask used to describe events on the socket.
236+
*/
237+
static void curlPerform(uv_poll_t* handle, int status, int events);
238+
239+
/**
240+
* Check if loop was signalled to close. The handle connected with this callbacks is always active as to prevent the uv_loop from stopping.
241+
*
242+
* @param handle uv_handle to which this callbacks is assigned
243+
*/
244+
static void checkStopSignal(uv_timer_t* handle);
245+
246+
/**
247+
* Used by CURL to react to action happening on a socket.
248+
*/
249+
static int handleSocket(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp);
250+
251+
/**
252+
* Asynchronously notify the loop to check its CURL handle queue.
253+
*
254+
* @param handle Handle which is assigned to this callback.
255+
*/
256+
static void asyncUVHandleCheckQueue(uv_async_t* handle);
257+
258+
/**
259+
* Close socket assigned to the timer handle.
260+
*
261+
* @param handle Handle which is assigned to this callback.
262+
*/
263+
static void closeSocketByTimer(uv_timer_t* handle);
264+
265+
/**
266+
* Start new transfers, terminate expired transfers.
267+
*
268+
* @param req Handle which is assigned to this callback.
269+
*/
270+
static void curlTimeout(uv_timer_t* req);
271+
272+
/**
273+
* Free curl context assigned to the handle.
274+
*
275+
* @param handle Handle assigned to this callback.
276+
*/
277+
static void curlCloseCB(uv_handle_t* handle);
278+
279+
/**
280+
* Close poll handle assigned to the socket contained in the context and free data within the handle.
281+
*
282+
* @param context Structure containing information about socket and handle to be closed.
283+
*/
284+
static void destroyCurlContext(curl_context_t* context);
285+
286+
/**
287+
* Connect curl timer with uv timer.
288+
*
289+
* @param multi Multi handle for which the timout will be set
290+
* @param timeout_ms Time until timeout
291+
* @param userp Pointer to the uv_timer_t handle that is used for timeout.
292+
*/
293+
static int startTimeout(CURLM* multi, long timeout_ms, void* userp);
294+
295+
/**
296+
* Check if any of the callback threads have finished running and approprietly join them.
297+
*/
298+
void checkForThreadsToJoin();
299+
300+
/**
301+
* Create a new multi_handle for the downloader
302+
*/
303+
void initializeMultiHandle();
304+
305+
/**
306+
* Release resources reserver for the transfer, mark transfer as complete, passe the CURLcode to the destination and launche callbacks if it is specified in PerformData.
307+
*
308+
* @param handle The easy_handle for which the transfer completed
309+
* @param curlCode The code produced for the handle by the transfer
310+
*/
311+
void transferFinished(CURL* handle, CURLcode curlCode);
312+
313+
/**
314+
* Check message queue inside curl multi handle.
315+
*/
316+
void checkMultiInfo();
317+
318+
/**
319+
* Set openSocketCallback and closeSocketCallback with appropriate arguments. Stores data inside the CURL handle.
320+
*/
321+
void setHandleOptions(CURL* handle, PerformData* data);
322+
323+
/**
324+
* Create structure holding information about a socket including a poll handle assigned to it
325+
*
326+
* @param socketfd File descriptor of socket for which the structure will be created
327+
*/
328+
curl_context_t* createCurlContext(curl_socket_t sockfd);
329+
330+
/**
331+
* Asynchroniously signal the event loop to check for new easy_handles to add to multi handle.
332+
*/
333+
void makeLoopCheckQueueAsync();
334+
335+
/**
336+
* If multi_handles uses less then maximum number of handles then add handles from the queue.
337+
*/
338+
void checkHandleQueue();
339+
340+
/**
341+
* Start the event loop. This function should be ran in the `loopThread`.
342+
*/
343+
void runLoop();
344+
345+
};
346+
347+
/**
348+
* Structure assigned to a uv_timer_t before adding it to socketTimerMap. It stores the information about the socket connected to the timer.
349+
*/
350+
typedef struct DataForClosingSocket {
351+
CCDBDownloader* CD;
352+
curl_socket_t socket;
353+
} DataForClosingSocket;
354+
355+
} // namespace ccdb
356+
} // namespace o2
357+
358+
#endif

0 commit comments

Comments
 (0)