forked from chronolaw/cpp_study
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnode.cpp
More file actions
170 lines (144 loc) · 4.94 KB
/
node.cpp
File metadata and controls
170 lines (144 loc) · 4.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Asynchronous client-to-server (DEALER to ROUTER)
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task has its own
// context and conceptually acts as a separate process.
#include <vector>
#include <thread>
#include <memory>
#include <functional>
#include <zmq.hpp>
#include "zhelpers.hpp"
// This is our client task class.
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
// Attention! -- this random work well only on linux.
class client_task {
public:
client_task()
: ctx_(1),
client_socket_(ctx_, ZMQ_DEALER)
{}
void start() {
// generate random identity
char identity[10] = {};
sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000));
printf("%s\n", identity);
client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
client_socket_.connect("tcp://localhost:5570");
zmq::pollitem_t items[] = {
{ static_cast<void*>(client_socket_), 0, ZMQ_POLLIN, 0 } };
int request_nbr = 0;
try {
while (true) {
for (int i = 0; i < 100; ++i) {
// 10 milliseconds
zmq::poll(items, 1, 10);
if (items[0].revents & ZMQ_POLLIN) {
printf("\n%s ", identity);
s_dump(client_socket_);
}
}
char request_string[16] = {};
sprintf(request_string, "request #%d", ++request_nbr);
client_socket_.send(request_string, strlen(request_string));
}
}
catch (std::exception &e) {}
}
private:
zmq::context_t ctx_;
zmq::socket_t client_socket_;
};
// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
class server_worker {
public:
server_worker(zmq::context_t &ctx, int sock_type, int id)
: ctx_(ctx),
worker_(ctx_, sock_type)
{}
void work() {
worker_.connect("inproc://backend"+str(id)));
try {
while (true) {
zmq::message_t identity;
zmq::message_t msg;
zmq::message_t copied_id;
zmq::message_t copied_msg;
worker_.recv(&identity);
worker_.recv(&msg);
int replies = within(5);
for (int reply = 0; reply < replies; ++reply) {
s_sleep(within(1000) + 1);
copied_id.copy(&identity);
copied_msg.copy(&msg);
worker_.send(copied_id, ZMQ_SNDMORE);
worker_.send(copied_msg);
}
}
}
catch (std::exception &e) {}
}
private:
zmq::context_t &ctx_;
zmq::socket_t worker_;
};
// This is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.
class server_task {
public:
server_task()
: ctx_(1),
frontend_(ctx_, ZMQ_ROUTER),
backend_(ctx_, ZMQ_DEALER)
{}
enum { kMaxThread = 5 };
void run() {
int port = 5570+m_id;
frontend_.bind("tcp://*:"+str(port);
backend_.bind("inproc://backend"+str(id)));
std::vector<server_worker *> worker;
std::vector<std::thread *> worker_thread;
for (int i = 0; i < kMaxThread; ++i) {
worker.push_back(new server_worker(ctx_, ZMQ_DEALER));
worker_thread.push_back(new std::thread(std::bind(&server_worker::work, worker)));
worker_thread->detach();
}
std::vector<client_task *> clients;
std::vector<std::thread *> client_thread;
client_task ct1;
client_task ct2;
client_task ct3;
std::thread t1(std::bind(&client_task::start, &ct1));
std::thread t2(std::bind(&client_task::start, &ct2));
std::thread t3(std::bind(&client_task::start, &ct3));
t1.detach();
t2.detach();
t3.detach();
try {
zmq::proxy(static_cast<void*>(frontend_),
static_cast<void*>(backend_),
nullptr);
}
catch (std::exception &e) {}
for (int i = 0; i < kMaxThread; ++i) {
delete worker;
delete worker_thread;
}
}
private:
zmq::context_t ctx_;
zmq::socket_t frontend_;
zmq::socket_t backend_;
};
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int main (void){
server_task st(2, 5);
st.run();
}