This repository was archived by the owner on Oct 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmanager.cpp
More file actions
56 lines (48 loc) · 1.47 KB
/
Copy pathmanager.cpp
File metadata and controls
56 lines (48 loc) · 1.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include "worker/manager.hpp"
#include "util/flags.hpp"
#include "worker/executor.hpp"
#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>
#include <csignal>
#include <thread>
#include <unordered_set>
#include <capnp/ez-rpc.h>
#include "capnp/server.capnp.h"
#include "worker/executor.hpp"
// Evaluates `op` exactly once. If it yields -1, throws std::runtime_error
// whose message is the stringified expression, a ": " separator, and the
// strerror() text for the errno value observed right after the call.
//
// The body is wrapped in do { ... } while (0) so the macro expands to a
// single statement: it requires a trailing semicolon and is safe to use as
// the unbraced body of an if/else.
#define check_error(op)                                          \
  do {                                                           \
    if ((op) == -1) {                                            \
      /* Snapshot errno before any allocation can disturb it. */ \
      const int check_error_errno_ = errno;                      \
      throw std::runtime_error(std::string(#op) + ": " +         \
                               strerror(check_error_errno_));    \
    }                                                            \
  } while (0)
namespace worker {
// Starts the manager: calls OnDone() once, then waits on the on_error_
// promise using the client's wait scope.
// NOTE(review): presumably this wait only finishes when a registration
// request fails and OnDone()'s error handler rejects on_error_ — confirm
// against the KJ promise semantics.
void Manager::Run() {
  OnDone();
  auto&& wait_scope = client_.getWaitScope();
  on_error_.promise.wait(wait_scope);
}
void Manager::OnDone() {
if (running_cores_ + reserved_cores_ < num_cores_) {
while (pending_requests_ < max_pending_requests_) {
pending_requests_++;
auto server = client_.getMain<capnproto::MainServer>();
auto req = server.registerEvaluatorRequest();
req.setName(name_ + " " + std::to_string(last_worker_id_++));
req.setEvaluator(kj::heap<Executor>(server, this, &cache_));
req.send().detach([this](kj::Exception exc) {
on_error_.fulfiller->reject(std::move(exc));
});
}
}
while (!waiting_tasks_.empty()) {
int sz = waiting_tasks_.front().first;
if (running_cores_ + sz > num_cores_) break;
reserved_cores_ -= sz;
running_cores_ += sz;
waiting_tasks_.front().second->fulfill();
waiting_tasks_.pop();
}
}
// Releases one pending-request slot and re-runs OnDone(), which may send a
// replacement registration and/or start waiting tasks.
void Manager::CancelPending() {
  --pending_requests_;
  OnDone();
}
} // namespace worker