forked from BoostGSoC21/python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patheventloop.hpp
More file actions
179 lines (141 loc) · 4.96 KB
/
eventloop.hpp
File metadata and controls
179 lines (141 loc) · 4.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// Copyright Pan Yue 2021.
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// TODO:
// 1. posix::stream_descriptor need windows version
// 2. call_* need return async.Handle
# ifndef EVENT_LOOP_PY2021_HPP
# define EVENT_LOOP_PY2021_HPP
#include <unordered_map>
#include <boost/asio.hpp>
#include <boost/python.hpp>
#include <boost/mpl/vector.hpp>
namespace boost { namespace python { namespace asio {
class event_loop
{
public:
event_loop(const boost::asio::io_context::strand& strand): _strand{strand}
{
try
{
_pymod_ssl = import("ssl");
}
catch (const error_already_set& e)
{
if (PyErr_ExceptionMatches(PyExc_ImportError))
{
PyErr_Clear();
}
}
}
// TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback.
inline void call_soon(object f)
{
_strand.post([f]{
PyEval_AcquireLock();
f();
PyEval_ReleaseLock();
});
}
// TODO: implement this
inline void call_soon_thread_safe(object f) {};
// Schedule callback to be called after the given delay number of seconds
// TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback.
void call_later(double delay, object f);
void call_at(double when, object f);
inline double time()
{
return std::chrono::steady_clock::now().time_since_epoch().count();
}
inline void add_reader(int fd, object f)
{
_async_wait_fd(fd, f, _read_key(fd));
}
inline void remove_reader(int fd)
{
_descriptor_map.erase(_read_key(fd));
}
inline void add_writer(int fd, object f)
{
_async_wait_fd(fd, f, _write_key(fd));
}
inline void remove_writer(int fd)
{
_descriptor_map.erase(_write_key(fd));
}
object sock_recv(object sock, size_t nbytes);
object sock_recv_into(object sock, object buffer);
object sock_sendall(object sock, object data);
object sock_connect(object sock, object address);
object sock_accept(object sock);
object sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true);
object start_tls(object transport, object protocol, object sslcontext,
bool server_side = false,
object server_hostname = object(),
object ssl_handshake_timeout = object());
object getaddrinfo(object host, int port, int family = 0, int type = 0, int proto = 0, int flags = 0);
object getnameinfo(object sockaddr, int flags = 0);
void set_exception_handler(object handler)
{
if (handler != object() && !PyObject_HasAttrString(handler.ptr(), "__call__")) {
PyErr_SetString(PyExc_TypeError, "A callable object or None is expected");
throw_error_already_set();
}
_exception_handler = handler;
}
object get_exception_handler()
{
return _exception_handler;
}
void default_exception_handler(object context);
void call_exception_handler(object context);
object create_future();
// TODO
inline bool get_debug()
{
return false;
}
private:
object _pymod_ssl = object();
object _pymod_socket = import("socket");
object _pymod_traceback = import("traceback");
object _pymod_asyncio_futures = import("asyncio").attr("futures");
object _py_logger = import("asyncio.log").attr("logger");
object _pymod_concurrent_futures = import("concurrent").attr("futures");
object _exception_handler = object();
boost::asio::io_context::strand _strand;
// read: key = fd * 2 + 0, write: key = fd * 2 + 1
std::unordered_map<int, std::unique_ptr<boost::asio::posix::stream_descriptor>> _descriptor_map;
inline int _read_key(int fd)
{
return fd * 2;
}
inline int _write_key(int fd)
{
return fd * 2 + 1;
}
template<typename F>
void _async_wait_fd(int fd, F f, int key)
{
// add descriptor
if (_descriptor_map.find(key) == _descriptor_map.end())
{
_descriptor_map.emplace(key,
std::move(std::make_unique<boost::asio::posix::stream_descriptor>(_strand.context(), fd))
);
}
_descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read,
boost::asio::bind_executor(_strand, [this, key, f] (const boost::system::error_code& ec)
{
_descriptor_map.erase(key);
f();
}));
return;
}
static void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr);
static void _sock_accept(event_loop& loop, object fut, object sock);
};
void set_default_event_loop(const boost::asio::io_context::strand& strand);
}}} // namespace boost::python::asio
# endif