Skip to content
This repository was archived by the owner on Oct 23, 2023. It is now read-only.

Commit 8cda19d

Browse files
committed
Some fixes and extra logging
1 parent ae1fff0 commit 8cda19d

5 files changed

Lines changed: 49 additions & 29 deletions

File tree

cpp/server/dispatcher.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ kj::Promise<capnp::Response<capnproto::Evaluator::EvaluateResults>>
109109
Dispatcher::AddRequest(capnproto::Request::Reader request,
110110
kj::Own<kj::PromiseFulfiller<void>> notify,
111111
const std::shared_ptr<bool>& canceled, size_t retries) {
112+
if (*canceled || canceled_evaluations_.count(request.getEvaluationId())) {
113+
return KJ_EXCEPTION(FAILED, "Enqueueing canceled request");
114+
}
112115
if (evaluators_.empty()) {
113116
auto request_promise = kj::newPromiseAndFulfiller<
114117
capnp::Response<capnproto::Evaluator::EvaluateResults>>();
@@ -137,30 +140,25 @@ Dispatcher::AddRequest(capnproto::Request::Reader request,
137140
-> kj::Promise<
138141
capnp::Response<capnproto::Evaluator::EvaluateResults>> {
139142
KJ_LOG(WARNING, "Worker failed");
143+
fulfiller->reject(KJ_EXCEPTION(FAILED, kj::cp(exc)));
140144
if (retries == 0) {
141-
fulfiller->reject(kj::cp(exc));
142145
KJ_LOG(WARNING, "Retries exhausted");
143146
return exc;
144147
}
145-
if (*canceled) {
146-
fulfiller->reject(kj::cp(exc));
147-
KJ_LOG(INFO, "Request canceled");
148-
return exc;
149-
}
150148
kj::PromiseFulfillerPair<void> dummy_start =
151149
kj::newPromiseAndFulfiller<void>();
152150
dummy_start.promise.then([]() { KJ_LOG(INFO, "Retrying..."); })
153151
.detach([](kj::Exception exc) {
154152
KJ_FAIL_ASSERT("dummy_start rejected", exc.getDescription());
155153
});
156-
fulfiller->reject(KJ_EXCEPTION(FAILED, kj::cp(exc)));
157154
return AddRequest(request, std::move(dummy_start.fulfiller),
158155
canceled, retries - 1);
159156
})
160157
.eagerlyEvaluate(nullptr);
161158
}
162159

163160
kj::Promise<void> Dispatcher::Cancel(uint32_t frontend_id) {
161+
canceled_evaluations_.insert(frontend_id);
164162
util::UnionPromiseBuilder builder;
165163
for (auto& kv : running_) {
166164
auto req = kv.second->cancelRequestRequest();

cpp/server/dispatcher.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <kj/async.h>
55
#include <memory>
66
#include <queue>
7+
#include <set>
78
#include <unordered_map>
89
#include <vector>
910
#include "capnp/evaluation.capnp.h"
@@ -50,6 +51,8 @@ class Dispatcher {
5051
std::unordered_map<size_t, std::unique_ptr<capnproto::Evaluator::Client>>
5152
running_;
5253

54+
std::set<uint32_t> canceled_evaluations_;
55+
5356
// TODO: I could not make a Queue<Evaluator, void> work, for some reason
5457
std::vector<capnproto::Evaluator::Client> evaluators_;
5558
std::vector<kj::Own<kj::PromiseFulfiller<void>>> fulfillers_;

cpp/worker/executor.cpp

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace {
2828
kj::Promise<sandbox::ExecutionInfo> RunSandbox(
2929
const sandbox::ExecutionOptions& exec_options,
3030
kj::LowLevelAsyncIoProvider* async_io_provider, uint32_t frontend_id,
31+
const std::set<uint32_t>& canceled_frontends,
3132
std::unordered_map<uint32_t, std::set<int>>* running) {
3233
KJ_LOG(INFO, "Starting sandbox");
3334
int options_pipe[2];
@@ -71,27 +72,34 @@ kj::Promise<sandbox::ExecutionInfo> RunSandbox(
7172
return out->write(ex_opts.get(), sizeof(exec_options))
7273
.attach(std::move(out), std::move(ex_opts))
7374
.then([fd = options_pipe[1], outcome_pipe, pid, in = std::move(in),
74-
running, frontend_id]() mutable {
75+
running, frontend_id, &canceled_frontends]() mutable {
7576
close(fd);
7677
auto promise = in->readAllBytes();
7778
return promise.attach(std::move(in))
78-
.then([pid, fd = outcome_pipe[0], running,
79+
.then([pid, fd = outcome_pipe[0], running, &canceled_frontends,
7980
frontend_id](const kj::Array<kj::byte>& data) {
8081
close(fd);
82+
if (canceled_frontends.count(frontend_id)) {
83+
sandbox::ExecutionInfo info;
84+
info.killed_external = true;
85+
return info;
86+
}
87+
int ret = 0;
88+
int err = waitpid(pid, &ret, 0);
89+
(*running)[frontend_id].erase(pid);
90+
KJ_ASSERT(err != -1, strerror(errno));
91+
KJ_ASSERT(ret == 0, "Sandbox failed");
92+
KJ_ASSERT(data.size() >= sizeof(size_t));
8193
size_t error_sz =
8294
*reinterpret_cast<const size_t*>(data.begin()); // NOLINT
95+
KJ_ASSERT(data.size() >= sizeof(size_t) + error_sz);
8396
const char* msg =
8497
reinterpret_cast<const char*>(data.begin()) + // NOLINT
8598
sizeof(size_t);
8699
KJ_ASSERT(!error_sz,
87100
std::string(msg, data.size() - sizeof(size_t)));
88101
sandbox::ExecutionInfo outcome;
89102
memcpy(&outcome, msg, sizeof(outcome));
90-
int ret = 0;
91-
int err = waitpid(pid, &ret, 0);
92-
(*running)[frontend_id].erase(pid);
93-
KJ_ASSERT(err != -1, strerror(errno));
94-
KJ_ASSERT(ret == 0, "Sandbox failed");
95103
return outcome;
96104
});
97105
});
@@ -374,7 +382,7 @@ kj::Promise<void> Executor::Execute(capnproto::Request::Reader request_,
374382
info_.add(RunSandbox(
375383
exec_options_v[i],
376384
&manager_->Client().getLowLevelIoProvider(),
377-
frontend_id, &running_));
385+
frontend_id, canceled_evaluations_, &running_));
378386
}
379387
return kj::joinPromises(info_.releaseAsArray());
380388
}))
@@ -456,11 +464,13 @@ kj::Promise<void> Executor::Execute(capnproto::Request::Reader request_,
456464
}
457465
},
458466
[fail](kj::Exception exc) mutable {
467+
KJ_LOG(WARNING, "Execution failed: ", exc.getDescription());
459468
fail(exc.getDescription());
460469
})
461470
.eagerlyEvaluate(nullptr);
462471
},
463472
[this](kj::Exception exc) -> kj::Promise<void> {
473+
KJ_LOG(WARNING, "Execution canceled: ", exc.getDescription());
464474
manager_->CancelPending();
465475
return exc;
466476
});
@@ -470,6 +480,7 @@ kj::Promise<void> Executor::cancelRequest(CancelRequestContext context) {
470480
uint32_t evaluation_id = context.getParams().getEvaluationId();
471481
KJ_LOG(INFO,
472482
"Cancelling evaluations of frontend " + std::to_string(evaluation_id));
483+
canceled_evaluations_.insert(evaluation_id);
473484
for (int pid : running_[evaluation_id]) {
474485
kill(pid, SIGINT);
475486
}

cpp/worker/executor.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class Executor : public capnproto::Evaluator::Server {
4141

4242
std::unordered_map<uint32_t, std::set<int>> running_;
4343

44+
std::set<uint32_t> canceled_evaluations_;
45+
4446
capnproto::FileSender::Client server_;
4547
Manager* manager_;
4648
Cache* cache_;

cpp/worker/manager.hpp

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,26 @@ class Manager {
3535
pending_requests_--;
3636
waiting_tasks_.emplace(size, std::move(pf.fulfiller));
3737
OnDone();
38-
return pf.promise.then([this, f, size]() {
39-
kj::Promise<T> ret = f();
40-
return ret.then(
41-
[size, this](T r) {
42-
running_cores_ -= size;
43-
OnDone();
44-
return r;
45-
},
46-
[size, this](kj::Exception exc) {
47-
running_cores_ -= size;
48-
OnDone();
49-
return exc;
50-
});
51-
});
38+
return pf.promise.then(
39+
[this, f, size]() {
40+
kj::Promise<T> ret = f();
41+
return ret.then(
42+
[size, this](T r) {
43+
running_cores_ -= size;
44+
OnDone();
45+
return r;
46+
},
47+
[size, this](kj::Exception exc) {
48+
KJ_LOG(WARNING, "Task failed: ", exc.getDescription());
49+
running_cores_ -= size;
50+
OnDone();
51+
return exc;
52+
});
53+
},
54+
[](kj::Exception exc) {
55+
KJ_LOG(WARNING, "Task preparation failed: ", exc.getDescription());
56+
return kj::Promise<T>(std::move(exc));
57+
});
5258
}
5359

5460
// Removes a pending request to the server, typically because of a failure.

0 commit comments

Comments
 (0)