@@ -33,6 +33,20 @@ struct ResultMParam : public GMessageParam {
3333 std::string eng_info_;
3434};
3535
36+ class InputGNode : public GNode {
37+ public:
38+ CStatus run () override {
39+ for (int i = 0 ; i < 30 ; i++) {
40+ std::unique_ptr<InputMParam> input (new InputMParam ());
41+ randomSleep (1 , 5 ); // 间隔1~6ms,发送一次
42+ input->num_ = std::abs ((int )std::random_device{}()) % 5 + 1 ;
43+ CGRAPH_SEND_MPARAM (InputMParam, INPUT_TOPIC_NAME, input, GMessagePushStrategy::WAIT);
44+ }
45+
46+ CGRAPH_ECHO (" InputGNode run finished" );
47+ return CStatus ();
48+ }
49+ };
3650
3751class ProcessGNode : public GNode {
3852public:
@@ -44,7 +58,7 @@ class ProcessGNode : public GNode {
4458 break ; // 一阵子收不到消息了,就自动停止好了
4559 }
4660
47- int ms = randomSleep (1 , 200 ); // 模拟处理流程,随机休息不超过 200ms
61+ int ms = randomSleep (1 , 100 ); // 模拟处理流程,随机休息不超过 100ms
4862 std::unique_ptr<ResultMParam> result (new ResultMParam);
4963 switch (input->num_ ) {
5064 case 1 : result->eng_info_ = " one" ; break ;
@@ -89,25 +103,17 @@ void example_third_flow() {
89103 CGRAPH_CREATE_MESSAGE_TOPIC (ResultMParam, RESULT_TOPIC_NAME, 16 );
90104
91105 auto pipeline = GPipelineFactory::create ();
92- GElementPtr a, b = nullptr ;
93- pipeline->registerGElement <ProcessGNode>(&a);
94- pipeline->registerGElement <ResultGNode>(&b);
106+ GElementPtr input, process, result = nullptr ;
107+ pipeline->registerGElement <InputGNode>(&input);
108+ pipeline->registerGElement <ProcessGNode>(&process);
109+ pipeline->registerGElement <ResultGNode>(&result);
95110
96111 UThreadPoolConfig config;
97- config.secondary_thread_size_ = 2 ;
98- pipeline->setUniqueThreadPoolConfig (config);
99-
100- auto fut = pipeline->asyncProcess ();
101-
102- // 在这里,一直制造input信息,然后放到后面的两个流中去处理
103- for (int i = 0 ; i < 30 ; i++) {
104- std::unique_ptr<InputMParam> input (new InputMParam ());
105- randomSleep (1 , 5 ); // 间隔1~6ms,发送一次
106- input->num_ = std::abs ((int )std::random_device{}()) % 5 + 1 ;
107- CGRAPH_SEND_MPARAM (InputMParam, INPUT_TOPIC_NAME, input, GMessagePushStrategy::WAIT);
108- }
112+ config.default_thread_size_ = 3 ;
113+ config.secondary_thread_size_ = 0 ;
114+ pipeline->setUniqueThreadPoolConfig (config); // 设置3个线程执行
109115
110- fut. wait (); // 等待pipeline执行结束
116+ pipeline-> process ();
111117 CGRAPH_CLEAR_MESSAGES ()
112118 GPipelineFactory::clear ();
113119}
0 commit comments