Skip to content

Commit 32abf7a

Browse files
committed
DPL: add WebSocket based DriverClient
Inactive for now, this is just to minimize changes needed to enable it.
1 parent 90557f5 commit 32abf7a

5 files changed

Lines changed: 552 additions & 0 deletions

File tree

Framework/Core/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,14 @@ o2_add_library(Framework
9595
src/WorkflowHelpers.cxx
9696
src/WorkflowSerializationHelpers.cxx
9797
src/WorkflowSpec.cxx
98+
src/WSDriverClient.cxx
9899
src/runDataProcessing.cxx
99100
src/ExternalFairMQDeviceProxy.cxx
100101
src/HistogramRegistry.cxx
101102
src/StepTHn.cxx
102103
src/SHA1.cxx
103104
src/Base64.cxx
105+
src/DPLWebSocket.cxx
104106
test/TestClasses.cxx
105107
PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src
106108
PUBLIC_LINK_LIBRARIES AliceO2::Common
Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#include "Framework/Logger.h"
11+
#include "DPLWebSocket.h"
12+
#include "Framework/RuntimeError.h"
13+
#include "Framework/DeviceSpec.h"
14+
#include "HTTPParser.h"
15+
#include <algorithm>
16+
#include <uv.h>
17+
#include <sys/types.h>
18+
#include <unistd.h>
19+
20+
namespace o2::framework
21+
{
22+
23+
static void my_alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
24+
{
25+
buf->base = (char*)malloc(suggested_size);
26+
buf->len = suggested_size;
27+
}
28+
29+
/// Free any resource associated with the device - driver channel
30+
void websocket_server_close_callback(uv_handle_t* handle)
31+
{
32+
LOG(DEBUG) << "socket closed";
33+
delete (WSDPLHandler*)handle->data;
34+
free(handle);
35+
}
36+
37+
void ws_error_write_callback(uv_write_t* h, int status)
38+
{
39+
LOG(ERROR) << "Error in write callback: " << uv_strerror(status);
40+
if (h->data) {
41+
free(h->data);
42+
}
43+
uv_close((uv_handle_t*)h->handle, websocket_server_close_callback);
44+
free(h);
45+
}
46+
47+
/// Actually replies to any incoming websocket stuff.
48+
void websocket_server_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
49+
{
50+
WSDPLHandler* server = (WSDPLHandler*)stream->data;
51+
assert(server);
52+
if (nread == 0) {
53+
return;
54+
}
55+
if (nread == UV_EOF) {
56+
LOG(DEBUG) << "websocket_server_callback: communication with driver closed";
57+
uv_close((uv_handle_t*)stream, websocket_server_close_callback);
58+
return;
59+
}
60+
if (nread < 0) {
61+
LOG(ERROR) << "websocket_server_callback: Error while reading from websocket";
62+
uv_close((uv_handle_t*)stream, websocket_server_close_callback);
63+
return;
64+
}
65+
try {
66+
parse_http_request(buf->base, nread, server);
67+
} catch (RuntimeErrorRef& ref) {
68+
auto& err = o2::framework::error_from_ref(ref);
69+
LOG(ERROR) << "Error while parsing request: " << err.what;
70+
}
71+
}
72+
73+
/// Whenever we have handshaken correctly, we can wait for the
74+
/// actual frames until we get an error.
75+
void ws_handshake_done_callback(uv_write_t* h, int status)
76+
{
77+
if (status) {
78+
LOG(ERROR) << "uv_write error: " << uv_err_name(status);
79+
free(h);
80+
return;
81+
}
82+
uv_read_start((uv_stream_t*)h->handle, (uv_alloc_cb)my_alloc_cb, websocket_server_callback);
83+
}
84+
85+
WSDPLHandler::WSDPLHandler(uv_stream_t* s, std::unique_ptr<WebSocketHandler> h)
86+
: mStream{s},
87+
mHandler{std::move(h)}
88+
{
89+
}
90+
91+
void WSDPLHandler::method(std::string_view const& s)
92+
{
93+
if (s != "GET") {
94+
throw WSError{400, "Bad Request"};
95+
}
96+
}
97+
98+
void WSDPLHandler::target(std::string_view const& s)
99+
{
100+
if (s != "/") {
101+
throw WSError{404, "Unknown"};
102+
}
103+
}
104+
105+
void populateHeader(std::map<std::string, std::string>& headers, std::string_view const& k, std::string_view const& v)
106+
{
107+
std::string kk{k};
108+
std::string vv{v};
109+
std::transform(kk.begin(), kk.end(), kk.begin(),
110+
[](unsigned char c) { return std::tolower(c); });
111+
if (kk != "sec-websocket-accept" && kk != "sec-websocket-key") {
112+
std::transform(vv.begin(), vv.end(), vv.begin(),
113+
[](unsigned char c) { return std::tolower(c); });
114+
}
115+
headers.insert(std::make_pair(kk, vv));
116+
}
117+
118+
void WSDPLHandler::header(std::string_view const& k, std::string_view const& v)
119+
{
120+
populateHeader(mHeaders, k, v);
121+
}
122+
123+
void WSDPLHandler::endHeaders()
124+
{
125+
/// Make sure this is a websocket upgrade request.
126+
if (mHeaders["upgrade"] != "websocket") {
127+
throw WSError{400, "Bad Request: not a websocket upgrade"};
128+
}
129+
if (mHeaders["connection"] != "upgrade") {
130+
throw WSError{400, "Bad Request: connection not for upgrade"};
131+
}
132+
if (mHeaders["sec-websocket-protocol"] != "dpl") {
133+
throw WSError{400, "Bad Request: websocket protocol not \"dpl\"."};
134+
}
135+
if (mHeaders.count("sec-websocket-key") == 0) {
136+
throw WSError{400, "Bad Request: sec-websocket-key missing"};
137+
}
138+
if (mHeaders["sec-websocket-version"] != "13") {
139+
throw WSError{400, "Bad Request: wrong protocol version"};
140+
}
141+
mHandler->headers(mHeaders);
142+
/// Create an appropriate reply
143+
LOG(debug) << "Got upgrade request with nonce " << mHeaders["sec-websocket-key"].c_str();
144+
std::string reply = encode_websocket_handshake_reply(mHeaders["sec-websocket-key"].c_str());
145+
mHandshaken = true;
146+
147+
uv_buf_t bfr = uv_buf_init(strdup(reply.data()), reply.size());
148+
uv_write_t* info_req = (uv_write_t*)malloc(sizeof(uv_write_t));
149+
uv_write(info_req, (uv_stream_t*)mStream, &bfr, 1, ws_handshake_done_callback);
150+
}
151+
152+
/// Actual handling of WS frames happens inside here.
153+
void WSDPLHandler::body(char* data, size_t s)
154+
{
155+
decode_websocket(data, s, *mHandler.get());
156+
}
157+
158+
/// Helper to return an error
159+
void WSDPLHandler::error(int code, char const* message)
160+
{
161+
static char const* errorFMT = "HTTP/1.1 {} {}\r\ncontent-type: text/plain\r\n\r\n{}: {}\r\n";
162+
std::string error = fmt::format(errorFMT, code, message, code, message);
163+
char* reply = strdup(error.data());
164+
uv_buf_t bfr = uv_buf_init(reply, error.size());
165+
uv_write_t* error_rep = (uv_write_t*)malloc(sizeof(uv_write_t));
166+
error_rep->data = reply;
167+
uv_write(error_rep, (uv_stream_t*)mStream, &bfr, 1, ws_error_write_callback);
168+
}
169+
170+
void websocket_client_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
171+
{
172+
WSDPLClient* client = (WSDPLClient*)stream->data;
173+
assert(client);
174+
if (nread == 0) {
175+
return;
176+
}
177+
if (nread < 0) {
178+
// FIXME: improve error message
179+
// FIXME: should I close?
180+
LOG(ERROR) << "Error while reading from websocket";
181+
return;
182+
}
183+
try {
184+
LOG(INFO) << "Data received from server";
185+
parse_http_request(buf->base, nread, client);
186+
} catch (RuntimeErrorRef& ref) {
187+
auto& err = o2::framework::error_from_ref(ref);
188+
LOG(ERROR) << "Error while parsing request: " << err.what;
189+
}
190+
}
191+
192+
// FIXME: mNonce should be random
193+
WSDPLClient::WSDPLClient(uv_stream_t* s, DeviceSpec const& spec)
194+
: mStream{s},
195+
mNonce{"dGhlIHNhbXBsZSBub25jZQ=="},
196+
mSpec{spec}
197+
{
198+
s->data = this;
199+
uv_read_start((uv_stream_t*)s, (uv_alloc_cb)my_alloc_cb, websocket_client_callback);
200+
}
201+
202+
void WSDPLClient::sendHandshake()
203+
{
204+
std::vector<std::pair<std::string, std::string>> headers = {
205+
{{"x-dpl-pid"}, std::to_string(getpid())},
206+
{{"x-dpl-id"}, mSpec.id},
207+
{{"x-dpl-name"}, mSpec.name}};
208+
std::string handShakeString = encode_websocket_handshake_request("/", "dpl", 13, mNonce.c_str(), headers);
209+
this->write(handShakeString.c_str(), handShakeString.size());
210+
}
211+
212+
void WSDPLClient::replyVersion(std::string_view const& s)
213+
{
214+
if (s != "HTTP/1.1") {
215+
throw runtime_error("Not an HTTP reply");
216+
}
217+
}
218+
219+
void WSDPLClient::replyCode(std::string_view const& s)
220+
{
221+
if (s != "101") {
222+
throw runtime_error("Upgrade denied");
223+
}
224+
}
225+
226+
void WSDPLClient::header(std::string_view const& k, std::string_view const& v)
227+
{
228+
populateHeader(mHeaders, k, v);
229+
}
230+
231+
void WSDPLClient::dumpHeaders()
232+
{
233+
for (auto [k, v] : mHeaders) {
234+
LOG(INFO) << k << ": " << v;
235+
}
236+
}
237+
238+
void WSDPLClient::endHeaders()
239+
{
240+
/// Make sure this is a websocket upgrade request.
241+
if (mHeaders["upgrade"] != "websocket") {
242+
throw runtime_error_f("No websocket upgrade");
243+
}
244+
if (mHeaders["connection"] != "upgrade") {
245+
throw runtime_error_f("No connection upgrade");
246+
}
247+
if (mHeaders.count("sec-websocket-accept") == 0) {
248+
throw runtime_error("sec-websocket-accept not found");
249+
}
250+
251+
std::string expectedAccept = HTTPParserHelpers::calculateAccept(mNonce.c_str());
252+
if (mHeaders["sec-websocket-accept"] != expectedAccept) {
253+
throw runtime_error_f(R"(Invalid accept received: "%s", expected "%s")", mHeaders["sec-websocket-accept"].c_str(), expectedAccept.c_str());
254+
}
255+
256+
LOG(INFO) << "Correctly handshaken websocket connection.";
257+
/// Create an appropriate reply
258+
mHandshaken = true;
259+
}
260+
261+
void ws_client_write_callback(uv_write_t* h, int status)
262+
{
263+
if (status) {
264+
LOG(ERROR) << "uv_write error: " << uv_err_name(status);
265+
free(h);
266+
return;
267+
}
268+
if (h->data) {
269+
free(h->data);
270+
}
271+
}
272+
273+
/// Helper to return an error
274+
void WSDPLClient::write(char const* message, size_t s)
275+
{
276+
uv_buf_t bfr = uv_buf_init(strdup(message), s);
277+
uv_write_t* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
278+
write_req->data = bfr.base;
279+
uv_write(write_req, (uv_stream_t*)mStream, &bfr, 1, ws_client_write_callback);
280+
}
281+
282+
void WSDPLClient::write(std::vector<uv_buf_t>& outputs)
283+
{
284+
for (auto& msg : outputs) {
285+
uv_write_t* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
286+
write_req->data = msg.base;
287+
uv_write(write_req, (uv_stream_t*)mStream, &msg, 1, ws_client_write_callback);
288+
}
289+
}
290+
291+
} // namespace o2::framework

Framework/Core/src/DPLWebSocket.h

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#ifndef O2_FRAMEWORK_DPLWEBSOCKET_H_
11+
#define O2_FRAMEWORK_DPLWEBSOCKET_H_
12+
13+
#include <uv.h>
14+
#include "HTTPParser.h"
15+
#include <memory>
16+
#include <string>
17+
#include <map>
18+
19+
class uv_stream_s;
20+
21+
namespace o2::framework
22+
{
23+
24+
struct DeviceSpec;
25+
26+
struct WSError {
27+
int code;
28+
std::string message;
29+
};
30+
31+
struct WSDPLHandler : public HTTPParser {
32+
/// A http parser suitable to be used by DPL as a server
33+
/// @a stream is the stream from which the data is read,
34+
/// @a handler is the websocket handler to react on the
35+
/// various frames
36+
WSDPLHandler(uv_stream_t* stream, std::unique_ptr<WebSocketHandler> handler);
37+
void method(std::string_view const& s) override;
38+
void target(std::string_view const& s) override;
39+
void header(std::string_view const& k, std::string_view const& v) override;
40+
void endHeaders() override;
41+
/// Actual handling of WS frames happens inside here.
42+
void body(char* data, size_t s) override;
43+
44+
/// Helper to return an error
45+
void error(int code, char const* message);
46+
47+
std::unique_ptr<WebSocketHandler> mHandler;
48+
bool mHandshaken = false;
49+
uv_stream_t* mStream = nullptr;
50+
std::map<std::string, std::string> mHeaders;
51+
};
52+
53+
struct WSDPLClient : public HTTPParser {
54+
/// @a stream where the communication happens and @a spec of the device connecting
55+
/// to the driver.
56+
WSDPLClient(uv_stream_t* stream, DeviceSpec const& spec);
57+
void replyVersion(std::string_view const& s) override;
58+
void replyCode(std::string_view const& s) override;
59+
void header(std::string_view const& k, std::string_view const& v) override;
60+
void endHeaders() override;
61+
/// Helper to write a message to the server
62+
void write(char const*, size_t);
63+
64+
/// Helper to write n buffers containing websockets frames to a server
65+
void write(std::vector<uv_buf_t>& outputs);
66+
67+
/// Dump headers
68+
void dumpHeaders();
69+
void sendHandshake();
70+
bool isConnected() { return mHandshaken; }
71+
72+
std::string mNonce;
73+
DeviceSpec const& mSpec;
74+
bool mHandshaken = false;
75+
uv_stream_t* mStream = nullptr;
76+
std::map<std::string, std::string> mHeaders;
77+
};
78+
} // namespace o2::framework
79+
80+
#endif // O2_FRAMEWORK_DPL_WEBSOCKET_H_

0 commit comments

Comments
 (0)