forked from microsoft/Multiverso
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.cpp
More file actions
90 lines (78 loc) · 2.79 KB
/
worker.cpp
File metadata and controls
90 lines (78 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#include "multiverso/worker.h"
#include <vector>
#include "multiverso/dashboard.h"
#include "multiverso/util/mt_queue.h"
#include "multiverso/util/log.h"
#include "multiverso/zoo.h"
namespace multiverso {
// Constructs the worker actor and registers one handler per message type it
// responds to: Request_Get, Request_Add, Reply_Get and Reply_Add. Each handler
// is a member function bound to this instance and receives the message as its
// only argument.
Worker::Worker() : Actor(actor::kWorker) {
  using std::placeholders::_1;

  // Requests handled by this actor.
  RegisterHandler(MsgType::Request_Get,
                  std::bind(&Worker::ProcessGet, this, _1));
  RegisterHandler(MsgType::Request_Add,
                  std::bind(&Worker::ProcessAdd, this, _1));

  // Replies handled by this actor.
  RegisterHandler(MsgType::Reply_Get,
                  std::bind(&Worker::ProcessReplyGet, this, _1));
  RegisterHandler(MsgType::Reply_Add,
                  std::bind(&Worker::ProcessReplyAdd, this, _1));
}
// Appends worker_table to cache_ and returns the index at which it was stored.
// The message handlers in this file use a message's table_id as an index into
// cache_.
//
// worker_table: table to register; checked with CHECK_NOTNULL.
// Returns the size of cache_ before the insertion, i.e. the new table's index.
int Worker::RegisterTable(WorkerTable* worker_table) {
  CHECK_NOTNULL(worker_table);
  const auto next_slot = cache_.size();
  cache_.push_back(worker_table);
  return static_cast<int>(next_slot);
}
// Handles a Request_Get message: asks the target table to partition the
// request's data, calls Reset on the table with the message id and the value
// Partition returned, then sends one Request_Get message per partition entry
// to the communicator actor.
//
// msg: the incoming request; its table_id selects the entry of cache_ to use.
void Worker::ProcessGet(MessagePtr& msg) {
  MONITOR_BEGIN(WORKER_PROCESS_GET)
  const int table_id = msg->table_id();
  const int msg_id = msg->msg_id();
  WorkerTable* const table = cache_[table_id];

  // Partition fills the map; each key is later used as a message destination.
  std::unordered_map<int, std::vector<Blob>> partitioned_key;
  const int num = table->Partition(msg->data(), MsgType::Request_Get,
                                   &partitioned_key);
  // NOTE(review): num is presumably the number of replies to wait for --
  // confirm against WorkerTable::Reset.
  table->Reset(msg_id, num);

  for (auto& entry : partitioned_key) {
    // Named differently from the parameter so the incoming msg is not shadowed.
    MessagePtr request(new Message());
    request->set_src(Zoo::Get()->rank());
    request->set_dst(entry.first);
    request->set_type(MsgType::Request_Get);
    request->set_msg_id(msg_id);
    request->set_table_id(table_id);
    request->set_data(entry.second);
    SendTo(actor::kCommunicator, request);
  }
  MONITOR_END(WORKER_PROCESS_GET)
}
// Handles a Request_Add message: validates it, asks the target table to
// partition the request's data, calls Reset on the table with the message id
// and the value Partition returned, then sends one Request_Add message per
// partition entry to the communicator actor.
//
// msg: the incoming request; checked to be non-null and to carry non-empty
//      data. Its table_id selects the entry of cache_ to use.
void Worker::ProcessAdd(MessagePtr& msg) {
  MONITOR_BEGIN(WORKER_PROCESS_ADD)
  // Validate before the first dereference so that a null msg is reported by
  // the check instead of being dereferenced by the accessors below.
  CHECK_NOTNULL(msg.get());
  CHECK(!msg->data().empty());

  int table_id = msg->table_id();
  int msg_id = msg->msg_id();

  // Partition fills the map; each key is later used as a message destination.
  std::unordered_map<int, std::vector<Blob>> partitioned_kv;
  int num = cache_[table_id]->Partition(msg->data(),
                                        MsgType::Request_Add,
                                        &partitioned_kv);
  // NOTE(review): num is presumably the number of replies to wait for --
  // confirm against WorkerTable::Reset.
  cache_[table_id]->Reset(msg_id, num);

  for (auto& it : partitioned_kv) {
    // Named differently from the parameter so the incoming msg is not shadowed.
    MessagePtr request(new Message());
    request->set_src(Zoo::Get()->rank());
    request->set_dst(it.first);
    request->set_type(MsgType::Request_Add);
    request->set_msg_id(msg_id);
    request->set_table_id(table_id);
    request->set_data(it.second);
    SendTo(actor::kCommunicator, request);
  }
  MONITOR_END(WORKER_PROCESS_ADD)
}
// Handles a Reply_Get message: passes the reply's data to the target table's
// ProcessReplyGet, then calls Notify on that table with the reply's message id.
//
// msg: the incoming reply; its table_id selects the entry of cache_ to use.
void Worker::ProcessReplyGet(MessagePtr& msg) {
  MONITOR_BEGIN(WORKER_PROCESS_REPLY_GET)
  WorkerTable* const table = cache_[msg->table_id()];
  table->ProcessReplyGet(msg->data());
  // Notify only after the reply's data has been handed to the table.
  table->Notify(msg->msg_id());
  MONITOR_END(WORKER_PROCESS_REPLY_GET)
}
// Handles a Reply_Add message by calling Notify, with the reply's message id,
// on the table selected by the reply's table id.
//
// msg: the incoming reply; its table_id selects the entry of cache_ to use.
void Worker::ProcessReplyAdd(MessagePtr& msg) {
  const int table_id = msg->table_id();
  const int msg_id = msg->msg_id();
  cache_[table_id]->Notify(msg_id);
}
} // namespace multiverso