forked from aldebaran/libqi-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpyfuture.cpp
More file actions
296 lines (250 loc) · 10.2 KB
/
pyfuture.cpp
File metadata and controls
296 lines (250 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
/*
** Copyright (C) 2020 SoftBank Robotics Europe
** See COPYING for the license
*/
#include <qipython/pyfuture.hpp>
#include <qipython/common.hpp>
#include <qipython/pyguard.hpp>
#include <qipython/pystrand.hpp>
#include <qi/future.hpp>
#include <qi/anyobject.hpp>
#include <pybind11/pybind11.h>
static constexpr const auto logCategory = "qi.python.future";
qiLogCategory(logCategory);
namespace py = pybind11;
namespace qi
{
namespace py
{
namespace
{
Future futureBarrier(std::vector<Future> futs)
{
auto waitFut = waitForAll(futs).async();
Promise prom(boost::bind(waitFut.makeCanceler()));
waitFut.andThen([=](const std::vector<Future>& futs) mutable {
prom.setValue(AnyValue::from(futs));
});
return prom.future();
}
// A function to cast a Python object into a C++ object, unless R is void, in
// which case does nothing.
//
// pybind11 will fail to cast an object to void if the object is not `None` or
// a capsule, but all we want is to ignore the result if the return value is
// expected to be void.
template<typename R>
R castIfNotVoid(const ::py::object& obj)
{
return ::py::cast<R>(obj);
}
template<>
void castIfNotVoid<void>(const ::py::object&) {}
template<typename R, typename... Args>
std::function<qi::Future<R>(Args...)> toContinuation(const ::py::function& cb)
{
GILAcquire lock;
SharedObject sharedCb(cb);
auto callSharedCb = [=](Args... args) mutable {
GILAcquire lock;
const auto handleExcept = ka::handle_exception_rethrow(
exceptionLogVerbose(
logCategory,
"An exception occurred while executing a future continuation"),
ka::type_t<::py::object>{});
const auto pyRes =
ka::invoke_catch(handleExcept, [&] {
return invokeCatchPythonError(
sharedCb.takeInner(),
std::forward<Args>(args)...);
});
return castIfNotVoid<R>(pyRes);
};
auto strand = strandOfFunction(cb);
if (strand)
return strand->schedulerFor(std::move(callSharedCb));
return futurizeOutput(std::move(callSharedCb));
}
void addCallback(Future fut, const ::py::function& cb)
{
auto cont = toContinuation<void, Future>(cb);
GILRelease _unlock;
fut.connect(std::move(cont));
}
Future then(Future fut, const ::py::function& cb)
{
auto cont = toContinuation<AnyValue, Future>(cb);
GILRelease _unlock;
return fut.then(std::move(cont)).unwrap();
}
Future andThen(Future fut, const ::py::function& cb)
{
auto cont = toContinuation<AnyValue, AnyValue>(cb);
GILRelease _unlock;
return fut.andThen(std::move(cont)).unwrap();
}
Future unwrap(Future fut)
{
Promise prom(boost::bind(fut.makeCanceler()));
fut.connect([=](qi::Future<AnyValue> fut) mutable {
if (!fut.hasValue())
{
adaptFuture(fut, prom);
return;
}
AnyReference ref = fut.value().asReference();
while (ref.kind() == TypeKind_Dynamic)
ref = ref.content();
AnyValue val(ref);
if (!qi::detail::handleFuture(val.asReference(), prom))
{
std::ostringstream ss;
ss << "Unwrapping something that is not a nested future: "
<< ref.type()->infoString();
qiLogWarning() << ss.str();
prom.setError(ss.str());
return;
}
// `handleFuture` takes ownership of the value on success, we can release it.
val.release();
});
return prom.future();
}
} // namespace
void exportFuture(::py::module& m)
{
using namespace ::py;
using namespace ::py::literals;
GILAcquire lock;
enum_<FutureState>(m, "FutureState")
.value("None", FutureState_None)
.value("Running", FutureState_Running)
.value("Canceled", FutureState_Canceled)
.value("FinishedWithError", FutureState_FinishedWithError)
.value("FinishedWithValue", FutureState_FinishedWithValue);
enum_<FutureTimeout>(m, "FutureTimeout")
.value("None", FutureTimeout_None)
.value("Infinite", FutureTimeout_Infinite);
class_<Promise>(m, "Promise")
.def(
init([](std::function<void(Promise&)> onCancel) {
Promise prom;
if (onCancel)
{
prom.setOnCancel([=](Promise& prom) {
ka::invoke_catch(
exceptionLogWarning("qipy.future",
"Promise `onCancel` callback threw an exception"),
[&] { onCancel(prom); });
});
}
return prom;
}),
call_guard<GILRelease>(),
"on_cancel"_a = none(),
doc(":param on_cancel: a function that will be called when a cancel is requested on the future.\n"))
.def("setCanceled", &Promise::setCanceled,
call_guard<GILRelease>(),
doc("Set the state of the promise to Canceled."))
.def("setError", &Promise::setError,
call_guard<GILRelease>(),
"error"_a,
doc("Set the error of the promise."))
.def("setValue", &Promise::setValue,
call_guard<GILRelease>(),
"value"_a,
doc("Set the value of the promise."))
.def("future", &Promise::future,
call_guard<GILRelease>(),
doc("Get a future tied to the promise. You can get multiple futures from the same promise."))
.def("isCancelRequested", &Promise::isCancelRequested,
call_guard<GILRelease>(),
doc(":returns: True if the future associated with the promise asked for cancellation."));
class_<Future>(m, "Future")
.def(init<AnyValue>(),
doc("Create a future with a value."))
.def("value",
static_cast<const AnyValue&(Future::*)(int) const>(&Future::value),
call_guard<GILRelease>(),
"timeout"_a = FutureTimeout_Infinite,
doc("Block until the future is ready.\n\n"
":param timeout: a time in milliseconds. Optional.\n"
":returns: the value of the future.\n"
":raises: a RuntimeError if the timeout is reached or the future has error."))
.def("error", &Future::error,
call_guard<GILRelease>(),
"timeout"_a = FutureTimeout_Infinite,
doc("Block until the future is ready.\n\n"
":param timeout: a time in milliseconds. Optional.\n"
":returns: the error of the future.\n"
":raises: a RuntimeError if the timeout is reached or the future has no error."))
.def("wait",
static_cast<FutureState(Future::*)(int) const>(&Future::wait),
call_guard<GILRelease>(),
"timeout"_a = FutureTimeout_Infinite,
doc("Wait for the future to be ready.\n\n"
":param timeout: a time in milliseconds. Optional.\n"
":returns: a :data:`qi.FutureState`."))
.def("hasError", &Future::hasError,
call_guard<GILRelease>(),
"timeout"_a = FutureTimeout_Infinite,
doc(":param timeout: a time in milliseconds. Optional.\n"
":returns: true iff the future has an error.\n"
":raise: a RuntimeError if the timeout is reached."))
.def("hasValue", &Future::hasValue,
call_guard<GILRelease>(),
"timeout"_a = FutureTimeout_Infinite,
doc(":param timeout: a time in milliseconds. Optional.\n"
":returns: true iff the future has a value.\n"
":raise: a RuntimeError if the timeout is reached."))
.def("cancel", &Future::cancel,
call_guard<GILRelease>(),
doc("Ask for cancellation."))
.def("isFinished", &Future::isFinished,
call_guard<GILRelease>(),
doc("Return true if the future is not running anymore (i.e. if hasError or hasValue or isCanceled)."))
.def("isRunning", &Future::isRunning,
call_guard<GILRelease>(),
doc("Return true if the future is still running."))
.def("isCanceled", &Future::isCanceled,
call_guard<GILRelease>(),
doc("Return true if the future is canceled."))
.def("isCancelable", [](const Future&) { return true; },
call_guard<GILRelease>(),
doc(":returns: always true, all future are cancellable now\n"
".. deprecated:: 1.5.0\n"))
.def("addCallback", &qi::py::addCallback,
"callback"_a,
doc("Add a callback that will be called when the future becomes ready.\n\n"
"The callback will be called even if the future is already ready.\n"
"The first argument of the callback is the future itself.\n\n"
":param callback: a python callable, could be a method or a function."))
.def("then", &qi::py::then,
"callback"_a,
doc("Add a callback that will be called when the future becomes ready.\n\n"
"The callback will be called even if the future is already ready.\n"
"The first argument of the callback is the future itself.\n\n"
":param callback: a python callable, could be a method or a function.\n"
":returns: a future that will contain the return value of the callback."))
.def("andThen", &qi::py::andThen,
"callback"_a,
doc("Add a callback that will be called when the future becomes ready if it has a value.\n\n"
"If the future finishes with an error, the callback is not called and the future returned by "
"andThen is set to that error.\n"
"The callback will be called even if the future is already ready.\n"
"The first argument of the callback is the value of the future itself.\n\n"
":param callback: a python callable, could be a method or a function.\n"
":returns: a future that will contain the return value of the callback."))
.def("unwrap", &qi::py::unwrap,
call_guard<GILRelease>(),
doc("If this is a Future of a Future of X, return a Future of X.\n\n"
"The state of both futures is forwarded and cancel requests are forwarded to the appropriate future."));
m.def("futureBarrier", &qi::py::futureBarrier,
call_guard<GILRelease>(),
doc("Return a future that will be set with all the futures given as argument when they are\n"
" all finished. This is useful to wait for a bunch of Futures at once.\n\n"
":param futureList: A list of Futures to wait for\n"
":returns: A Future of list of futureList."));
}
} // namespace py
} // namespace qi