Skip to content

Commit 82bb19a

Browse files
committed
added result logging for consumer queue
1 parent 7619ecc commit 82bb19a

File tree

9 files changed

+268
-172
lines changed

9 files changed

+268
-172
lines changed

examples/cpp_redis_consumer.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ main() {
5454
//! Enable logging
5555

5656
//const std::string group_name = "groupone";
57-
const std::vector<std::string> group_names = {"groupone", "grouptwo"};
57+
const std::vector<std::string> group_names = {"groupone"}; //, "grouptwo"};
5858
const std::string session_name = "sessone";
5959
const std::string consumer_name = "ABCD";
6060

@@ -69,14 +69,18 @@ main() {
6969
}
7070
});
7171

72+
sub.auth("LGdYFaAQzXG+NCzW3zgQUEbSPIn1M7Y6QbhBApYoZi8=");
73+
7274
for (auto &group : group_names) {
7375

7476
sub.subscribe(group,
7577
[group](const cpp_redis::message_type msg) {
78+
cpp_redis::consumer_response_t res;
7679
// Callback will run for each message obtained from the queue
7780
std::cout << "Group: " << group << std::endl;
7881
std::cout << "Id in the cb: " << msg.get_id() << std::endl;
79-
return msg;
82+
res.insert({"Id", msg.get_id()});
83+
return res;
8084
},
8185
[group](int ack_status) {
8286
// Callback will run upon return of xack

includes/cpp_redis/core/client.hpp

Lines changed: 135 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,19 @@
4242

4343
namespace cpp_redis {
4444

45-
/**
46-
* cpp_redis::client is the class providing communication with a Redis server.
47-
* It is meant to be used for sending commands to the remote server and receiving its replies.
48-
* The client support asynchronous requests, as well as synchronous ones. Moreover, commands pipelining is supported.
49-
*
50-
*/
45+
/**
46+
* cpp_redis::client is the class providing communication with a Redis server.
47+
* It is meant to be used for sending commands to the remote server and receiving its replies.
48+
* The client support asynchronous requests, as well as synchronous ones. Moreover, commands pipelining is supported.
49+
*
50+
*/
5151
class client {
5252
public:
53-
/**
54-
* client type
55-
* used for client kill
56-
*
57-
*/
53+
/**
54+
* client type
55+
* used for client kill
56+
*
57+
*/
5858
enum class client_type {
5959
normal,
6060
master,
@@ -65,53 +65,53 @@ namespace cpp_redis {
6565
public:
6666
#ifndef __CPP_REDIS_USE_CUSTOM_TCP_CLIENT
6767

68-
/**
69-
* ctor
70-
*
71-
*/
68+
/**
69+
* ctor
70+
*
71+
*/
7272
client();
7373

7474
#endif /* __CPP_REDIS_USE_CUSTOM_TCP_CLIENT */
7575

76-
/**
77-
* custom ctor to specify custom tcp_client
78-
*
79-
* @param tcp_client tcp client to be used for network communications
80-
*
81-
*/
76+
/**
77+
* custom ctor to specify custom tcp_client
78+
*
79+
* @param tcp_client tcp client to be used for network communications
80+
*
81+
*/
8282
explicit client(const std::shared_ptr<network::tcp_client_iface> &tcp_client);
8383

84-
/**
85-
* dtor
86-
*
87-
*/
84+
/**
85+
* dtor
86+
*
87+
*/
8888
~client();
8989

90-
/**
91-
* copy ctor
92-
*
93-
*/
90+
/**
91+
* copy ctor
92+
*
93+
*/
9494
client(const client &) = delete;
9595

96-
/**
97-
* assignment operator
98-
*
99-
*/
96+
/**
97+
* assignment operator
98+
*
99+
*/
100100
client &operator=(const client &) = delete;
101101

102102
public:
103103

104-
/**
105-
* Connect to redis server
106-
*
107-
* @param host host to be connected to
108-
* @param port port to be connected to
109-
* @param connect_callback connect handler to be called on connect events (may be null)
110-
* @param timeout_ms maximum time to connect
111-
* @param max_reconnects maximum attempts of reconnection if connection dropped
112-
* @param reconnect_interval_ms time between two attempts of reconnection
113-
*
114-
*/
104+
/**
105+
* Connect to redis server
106+
*
107+
* @param host host to be connected to
108+
* @param port port to be connected to
109+
* @param connect_callback connect handler to be called on connect events (may be null)
110+
* @param timeout_ms maximum time to connect
111+
* @param max_reconnects maximum attempts of reconnection if connection dropped
112+
* @param reconnect_interval_ms time between two attempts of reconnection
113+
*
114+
*/
115115
void connect(
116116
const std::string &host = "127.0.0.1",
117117
std::size_t port = 6379,
@@ -120,114 +120,114 @@ namespace cpp_redis {
120120
std::int32_t max_reconnects = 0,
121121
std::uint32_t reconnect_interval_ms = 0);
122122

123-
/**
124-
* Connect to redis server
125-
*
126-
* @param name sentinel name
127-
* @param connect_callback connect handler to be called on connect events (may be null)
128-
* @param timeout_ms maximum time to connect
129-
* @param max_reconnects maximum attempts of reconnection if connection dropped
130-
* @param reconnect_interval_ms time between two attempts of reconnection
131-
*
132-
*/
123+
/**
124+
* Connect to redis server
125+
*
126+
* @param name sentinel name
127+
* @param connect_callback connect handler to be called on connect events (may be null)
128+
* @param timeout_ms maximum time to connect
129+
* @param max_reconnects maximum attempts of reconnection if connection dropped
130+
* @param reconnect_interval_ms time between two attempts of reconnection
131+
*
132+
*/
133133
void connect(
134134
const std::string &name,
135135
const connect_callback_t &connect_callback = nullptr,
136136
std::uint32_t timeout_ms = 0,
137137
std::int32_t max_reconnects = 0,
138138
std::uint32_t reconnect_interval_ms = 0);
139139

140-
/**
141-
* @return whether we are connected to the redis server
142-
*
143-
*/
140+
/**
141+
* @return whether we are connected to the redis server
142+
*
143+
*/
144144
bool is_connected() const;
145145

146-
/**
147-
* disconnect from redis server
148-
*
149-
* @param wait_for_removal when sets to true, disconnect blocks until the underlying TCP client has been effectively removed from the io_service and that all the underlying callbacks have completed.
150-
*
151-
*/
146+
/**
147+
* disconnect from redis server
148+
*
149+
* @param wait_for_removal when sets to true, disconnect blocks until the underlying TCP client has been effectively removed from the io_service and that all the underlying callbacks have completed.
150+
*
151+
*/
152152
void disconnect(bool wait_for_removal = false);
153153

154-
/**
155-
* @return whether an attempt to reconnect is in progress
156-
*
157-
*/
154+
/**
155+
* @return whether an attempt to reconnect is in progress
156+
*
157+
*/
158158
bool is_reconnecting() const;
159159

160-
/**
161-
* stop any reconnect in progress
162-
*
163-
*/
160+
/**
161+
* stop any reconnect in progress
162+
*
163+
*/
164164
void cancel_reconnect();
165165

166166
public:
167-
/**
168-
* reply callback called whenever a reply is received
169-
* takes as parameter the received reply
170-
*
171-
*/
167+
/**
168+
* reply callback called whenever a reply is received
169+
* takes as parameter the received reply
170+
*
171+
*/
172172
typedef std::function<void(reply &)> reply_callback_t;
173173

174-
/**
175-
* send the given command
176-
* the command is actually pipelined and only buffered, so nothing is sent to the network
177-
* please call commit() / sync_commit() to flush the buffer
178-
*
179-
* @param redis_cmd command to be sent
180-
* @param callback callback to be called on received reply
181-
* @return current instance
182-
*
183-
*/
174+
/**
175+
* send the given command
176+
* the command is actually pipelined and only buffered, so nothing is sent to the network
177+
* please call commit() / sync_commit() to flush the buffer
178+
*
179+
* @param redis_cmd command to be sent
180+
* @param callback callback to be called on received reply
181+
* @return current instance
182+
*
183+
*/
184184
client &send(const std::vector<std::string> &redis_cmd, const reply_callback_t &callback);
185185

186-
/**
187-
* same as the other send method
188-
* but future based: does not take any callback and return an std:;future to handle the reply
189-
*
190-
* @param redis_cmd command to be sent
191-
* @return std::future to handler redis reply
192-
*
193-
*/
186+
/**
187+
* same as the other send method
188+
* but future based: does not take any callback and return an std:;future to handle the reply
189+
*
190+
* @param redis_cmd command to be sent
191+
* @return std::future to handler redis reply
192+
*
193+
*/
194194
std::future<reply> send(const std::vector<std::string> &redis_cmd);
195195

196-
/**
197-
* Sends all the commands that have been stored by calling send() since the last commit() call to the redis server.
198-
* That is, pipelining is supported in a very simple and efficient way: client.send(...).send(...).send(...).commit() will send the 3 commands at once (instead of sending 3 network requests, one for each command, as it would have been done without pipelining).
199-
* Pipelined commands are always removed from the buffer, even in the case of an error (for example, calling commit while the client is not connected, something that throws an exception).
200-
* commit() works asynchronously: it returns immediately after sending the queued requests and replies are processed asynchronously.
201-
*
202-
* Please note that, while commit() can safely be called from inside a reply callback, calling sync_commit() from inside a reply callback is not permitted and will lead to undefined behavior, mostly deadlock.
203-
*
204-
*/
196+
/**
197+
* Sends all the commands that have been stored by calling send() since the last commit() call to the redis server.
198+
* That is, pipelining is supported in a very simple and efficient way: client.send(...).send(...).send(...).commit() will send the 3 commands at once (instead of sending 3 network requests, one for each command, as it would have been done without pipelining).
199+
* Pipelined commands are always removed from the buffer, even in the case of an error (for example, calling commit while the client is not connected, something that throws an exception).
200+
* commit() works asynchronously: it returns immediately after sending the queued requests and replies are processed asynchronously.
201+
*
202+
* Please note that, while commit() can safely be called from inside a reply callback, calling sync_commit() from inside a reply callback is not permitted and will lead to undefined behavior, mostly deadlock.
203+
*
204+
*/
205205
client &commit();
206206

207-
/**
208-
* same as commit(), but synchronous
209-
* will block until all pending commands have been sent and that a reply has been received for each of them and all underlying callbacks completed
210-
*
211-
* @return current instance
212-
*
213-
*/
207+
/**
208+
* same as commit(), but synchronous
209+
* will block until all pending commands have been sent and that a reply has been received for each of them and all underlying callbacks completed
210+
*
211+
* @return current instance
212+
*
213+
*/
214214
client &sync_commit();
215215

216-
/**
217-
* same as sync_commit, but with a timeout
218-
* will simply block until it completes or timeout expires
219-
*
220-
* @return current instance
221-
*
222-
*/
216+
/**
217+
* same as sync_commit, but with a timeout
218+
* will simply block until it completes or timeout expires
219+
*
220+
* @return current instance
221+
*
222+
*/
223223
template<class Rep, class Period>
224224
client &
225225
sync_commit(const std::chrono::duration<Rep, Period> &timeout) {
226-
/**
227-
* no need to call commit in case of reconnection
228-
* the reconnection flow will do it for us
229-
*
230-
*/
226+
/**
227+
* no need to call commit in case of reconnection
228+
* the reconnection flow will do it for us
229+
*
230+
*/
231231
if (!is_reconnecting()) {
232232
try_commit();
233233
}
@@ -245,22 +245,22 @@ namespace cpp_redis {
245245
}
246246

247247
private:
248-
/**
249-
* @return whether a reconnection attempt should be performed
250-
*
251-
*/
248+
/**
249+
* @return whether a reconnection attempt should be performed
250+
*
251+
*/
252252
bool should_reconnect() const;
253253

254-
/**
255-
* resend all pending commands that failed to be sent due to disconnection
256-
*
257-
*/
254+
/**
255+
* resend all pending commands that failed to be sent due to disconnection
256+
*
257+
*/
258258
void resend_failed_commands();
259259

260-
/**
261-
* sleep between two reconnect attempts if necessary
262-
*
263-
*/
260+
/**
261+
* sleep between two reconnect attempts if necessary
262+
*
263+
*/
264264
void sleep_before_next_reconnect_attempt();
265265

266266
/**

0 commit comments

Comments
 (0)