1- #include < utility>
2-
3- #include < utility>
4-
51// The MIT License (MIT)
62//
73// Copyright (c) 11/27/18 nick. <nbatkins@gmail.com>
2420// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2521// SOFTWARE.#include "consumer.hpp"
2622
23+ #include < cpp_redis/core/consumer.hpp>
2724
25+ #include < functional>
2826
29- #include < cpp_redis/core/consumer.hpp>
27+ using std::bind;
28+ using namespace std ::placeholders;
3029
3130namespace cpp_redis {
3231
3332 consumer::consumer (std::string stream, std::string consumer, size_t max_concurrency)
3433 : m_stream(std::move(stream)),
3534 m_name (std::move(consumer)),
3635 m_max_concurrency(max_concurrency),
37- m_task_queue(),
38- m_client(new client()),
39- m_sub_client(new client()),
40- m_proc_queue(new dispatch_queue(stream, max_concurrency)) {
36+ m_callbacks(),
37+ is_new(true ) {
38+ // auto fn = bind(&consumer::dispatch_changed_handler, this, 1, std::placeholders::_1);
39+ m_dispatch_queue = std::unique_ptr<dispatch_queue_t >(new dispatch_queue (stream, [&](size_t size){
40+ dispatch_changed_handler (size);
41+ }, max_concurrency));
42+ m_client = std::unique_ptr<consumer_client_container_t >(new consumer_client_container ());
4143 }
4244
4345 consumer &cpp_redis::consumer::subscribe (const std::string &group,
4446 const consumer_callback_t &consumer_callback,
4547 const acknowledgement_callback_t &acknowledgement_callback) {
46- std::unique_lock<std::mutex> task_queue_lock (m_task_queue_mutex);
47- m_task_queue[group] = {consumer_callback, acknowledgement_callback};
48- task_queue_lock.unlock ();
48+ // std::lock_guard<std::mutex> task_queue_lock(m_callbacks_mutex);
49+ m_callbacks.insert ({group, {consumer_callback, acknowledgement_callback}});
4950 return *this ;
5051 }
5152
53+ void consumer::dispatch_changed_handler (size_t size) {
54+ if (size >= m_max_concurrency) {
55+ dispatcher_full.store (true );
56+ dispatch_changed.notify_all ();
57+
58+ std::cout << " Notified" <<
59+ std::endl;
60+ }
61+ }
62+
5263 void consumer::connect (const std::string &host, size_t port, const connect_callback_t &connect_callback,
5364 uint32_t timeout_ms, int32_t max_reconnects, uint32_t reconnect_interval_ms) {
54- m_client->connect (host, port, connect_callback, timeout_ms, max_reconnects, reconnect_interval_ms);
55- m_sub_client-> connect (host, port, connect_callback, timeout_ms, max_reconnects, reconnect_interval_ms);
65+ m_client->ack_client . connect (host, port, connect_callback, timeout_ms, max_reconnects, reconnect_interval_ms);
66+ m_client-> poll_client . connect (host, port, connect_callback, timeout_ms, max_reconnects, reconnect_interval_ms);
5667 }
5768
5869 consumer &consumer::commit () {
59- // std::thread p([&]() {
60- // Set the consumer id to 0 so that we start with failed messages
61- std::string consumer_name = " 0" ;
62-
63- std::unique_lock<std::mutex> cv_mutex_lock (m_cv_mutex);
6470 while (!is_ready) {
65- if (!is_ready)
66- if (m_max_concurrency <= m_proc_queue->size ())
67- m_cv.wait (cv_mutex_lock);
68-
69- std::lock_guard<std::mutex> task_queue_lock (m_task_queue_mutex);
70- for (auto &q : m_task_queue) {
71- // task_queue_lock.lock();
72- auto group = q.first ;
73- auto cb_container = q.second ;
74- // task_queue_lock.unlock();
75- m_sub_client->xreadgroup ({group, consumer_name, {{m_stream}, {" >" }}, 1 , -1 , false } // count, block, no_ack
76- , [&](cpp_redis::reply &reply) {
77- cpp_redis::xstream_reply xs (reply);
78- if (xs.empty ()) {
79- if (consumer_name == " 0" ) {
80- consumer_name = m_name;
81- }
82- } else {
83- m_reply_queue.push (reply);
84- m_q_status.notify_one ();
85- // m_proc_queue->dispatch(fp_)
86- // process();
87- }
88- });
89- m_sub_client->sync_commit ();
71+ if (!is_ready) {
72+ std::unique_lock<std::mutex> dispatch_lock (dispatch_changed_mutex);
73+ dispatch_changed.wait (dispatch_lock, [&]() { return !dispatcher_full.load (); });
74+ poll ();
9075 }
9176 }
9277 // });
9378 return *this ;
9479 }
9580
81+ void consumer::poll () {
82+ message_type m;
83+ m_dispatch_queue->dispatch (m, [](const message_type &message) {
84+ std::cout << " Something" << std::endl;
85+ return message;
86+ });
87+ // std::lock_guard<std::mutex> task_queue_lock(m_callbacks_mutex);
88+ // std::string consumer_name = m_name;
89+ // std::string consumer_name = (is_new ? "0" : m_name);
90+ std::string group = " groupone" ;
91+ // auto q = m_.find("groupone");
92+ // task_queue_lock.lock();
93+ // auto group = q->first;
94+ // auto cb_container = q->second;
95+ // task_queue_lock.unlock();
96+ read_group_handler ({group, m_name, {{m_stream}, {" >" }}, 1 , -1 , false }); // count, block, no_ack
97+ }
98+
99+ void consumer::read_group_handler (const xreadgroup_options_t &a) {
100+ m_client->poll_client .xreadgroup (a, [&](cpp_redis::reply &reply) {
101+ cpp_redis::xstream_reply xs (reply);
102+ if (xs.empty ()) {
103+ if (is_new)
104+ is_new = false ;
105+ } else {
106+ m_reply_queue.push (reply);
107+ m_dispatch_status.notify_one ();
108+ }
109+ }).sync_commit ();
110+ }
111+
112+ bool consumer::queue_is_full () {
113+ std::lock_guard<std::mutex> cv_mutex_lock (m_cv_mutex);
114+ size_t m_proc_size = m_dispatch_queue->size ();
115+ return m_max_concurrency <= m_proc_size;
116+ }
117+
96118 void consumer::process () {
97- std::unique_lock<std::mutex> m_q_status_lock (m_q_status_mutex );
98- m_q_status .wait (m_q_status_lock , [this ]() { return !m_reply_queue. empty (); });
119+ std::unique_lock<std::mutex> replies_lock (replies_changed_mutex );
120+ replies_changed .wait (replies_lock , [& ]() { return !replies_empty. load (); });
99121
100122 auto r = m_reply_queue.back ();
101123 m_reply_queue.pop ();
@@ -105,19 +127,19 @@ namespace cpp_redis {
105127 for (auto &m : r.Messages ) {
106128 try {
107129 std::string group_id = m.get_id ();
108- auto task = m_task_queue .find (group_id);
130+ auto task = m_callbacks .find (group_id);
109131 auto callback_container = task->second ;
110132
111133 auto callback = [&](const message_type &message) {
112134 auto response = callback_container.consumer_callback (message);
113- m_client->xack (m_stream, group_id, {m.get_id ()}, [&](const reply &r) {
135+ m_client->ack_client . xack (m_stream, group_id, {m.get_id ()}, [&](const reply &r) {
114136 if (r.is_integer ())
115137 callback_container.acknowledgement_callback (r.as_integer ());
116138 });
117- m_client->sync_commit ();
139+ m_client->ack_client . sync_commit ();
118140 return response;
119141 };
120- m_proc_queue ->dispatch (m, callback);
142+ m_dispatch_queue ->dispatch (m, callback);
121143 } catch (std::exception &exc) {
122144 __CPP_REDIS_LOG (1 , " Processing failed for message id: " + m.get_id () + " \n Details: " + exc.what ());
123145 throw exc;
@@ -126,4 +148,7 @@ namespace cpp_redis {
126148 }
127149 }
128150
151+ consumer_client_container::consumer_client_container () : ack_client(), poll_client() {
152+ }
153+
129154} // namespace cpp_redis
0 commit comments