Skip to content

Commit 5fa2b9c

Browse files
committed
[example] optimize third flow
1 parent 2ecc6c9 commit 5fa2b9c

File tree

1 file changed

+23
-17
lines changed

1 file changed

+23
-17
lines changed

example/E03-ThirdFlow.cpp

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,20 @@ struct ResultMParam : public GMessageParam {
3333
std::string eng_info_;
3434
};
3535

36+
class InputGNode : public GNode {
37+
public:
38+
CStatus run() override {
39+
for (int i = 0; i < 30; i++) {
40+
std::unique_ptr<InputMParam> input(new InputMParam());
41+
randomSleep(1, 5); // 间隔1~6ms,发送一次
42+
input->num_ = std::abs((int)std::random_device{}()) % 5 + 1;
43+
CGRAPH_SEND_MPARAM(InputMParam, INPUT_TOPIC_NAME, input, GMessagePushStrategy::WAIT);
44+
}
45+
46+
CGRAPH_ECHO("InputGNode run finished");
47+
return CStatus();
48+
}
49+
};
3650

3751
class ProcessGNode : public GNode {
3852
public:
@@ -44,7 +58,7 @@ class ProcessGNode : public GNode {
4458
break; // 一阵子收不到消息了,就自动停止好了
4559
}
4660

47-
int ms = randomSleep(1, 200); // 模拟处理流程,随机休息不超过 200ms
61+
int ms = randomSleep(1, 100); // 模拟处理流程,随机休息不超过 100ms
4862
std::unique_ptr<ResultMParam> result(new ResultMParam);
4963
switch (input->num_) {
5064
case 1: result->eng_info_ = "one"; break;
@@ -89,25 +103,17 @@ void example_third_flow() {
89103
CGRAPH_CREATE_MESSAGE_TOPIC(ResultMParam, RESULT_TOPIC_NAME, 16);
90104

91105
auto pipeline = GPipelineFactory::create();
92-
GElementPtr a, b = nullptr;
93-
pipeline->registerGElement<ProcessGNode>(&a);
94-
pipeline->registerGElement<ResultGNode>(&b);
106+
GElementPtr input, process, result = nullptr;
107+
pipeline->registerGElement<InputGNode>(&input);
108+
pipeline->registerGElement<ProcessGNode>(&process);
109+
pipeline->registerGElement<ResultGNode>(&result);
95110

96111
UThreadPoolConfig config;
97-
config.secondary_thread_size_ = 2;
98-
pipeline->setUniqueThreadPoolConfig(config);
99-
100-
auto fut = pipeline->asyncProcess();
101-
102-
// 在这里,一直制造input信息,然后放到后面的两个流中去处理
103-
for (int i = 0; i < 30; i++) {
104-
std::unique_ptr<InputMParam> input(new InputMParam());
105-
randomSleep(1, 5); // 间隔1~6ms,发送一次
106-
input->num_ = std::abs((int)std::random_device{}()) % 5 + 1;
107-
CGRAPH_SEND_MPARAM(InputMParam, INPUT_TOPIC_NAME, input, GMessagePushStrategy::WAIT);
108-
}
112+
config.default_thread_size_ = 3;
113+
config.secondary_thread_size_ = 0;
114+
pipeline->setUniqueThreadPoolConfig(config); // 设置3个线程执行
109115

110-
fut.wait(); // 等待pipeline执行结束
116+
pipeline->process();
111117
CGRAPH_CLEAR_MESSAGES()
112118
GPipelineFactory::clear();
113119
}

0 commit comments

Comments
 (0)