diff --git a/section6/CMakeLists.txt b/section6/CMakeLists.txt new file mode 100644 index 0000000..7254f25 --- /dev/null +++ b/section6/CMakeLists.txt @@ -0,0 +1,8 @@ +add_executable(client client.cpp) + +add_executable(srv srv.cpp) + +add_library(section5 client.cpp conf.lua Config.hpp cpplang.hpp hello.hpp SalesData.hpp SpinLock.hpp srv.cpp Summary.hpp Zmq.hpp) +install (TARGETS section5 DESTINATION bin) + +install (FILES client.cpp conf.lua Config.hpp cpplang.hpp hello.hpp SalesData.hpp SpinLock.hpp srv.cpp Summary.hpp Zmq.hpp DESTINATION include) diff --git a/section6/Config.hpp b/section6/Config.hpp new file mode 100644 index 0000000..4513c75 --- /dev/null +++ b/section6/Config.hpp @@ -0,0 +1,74 @@ +// Copyright (c) 2020 by Chrono + +#ifndef _CONFIG_HPP +#define _CONFIG_HPP + +#include "cpplang.hpp" + +extern "C" { +#include +#include +#include +} + +#include + +BEGIN_NAMESPACE(cpp_study) + +class Config final +{ +public: + using vm_type = std::shared_ptr; + using value_type = luabridge::LuaRef; + + using string_type = std::string; + using string_view_type = const std::string&; + using regex_type = std::regex; + using match_type = std::smatch; +public: + Config() noexcept + { + assert(m_vm); + + luaL_openlibs(m_vm.get()); + } + + ~Config() = default; +public: + void load(string_view_type filename) const + { + auto status = luaL_dofile(m_vm.get(), filename.c_str()); + + if (status != 0) { + throw std::runtime_error("failed to parse config"); + } + } + + template + T get(string_view_type key) const + { + if (!std::regex_match(key, m_what, m_reg)) { + throw std::runtime_error("config key error"); + } + + auto w1 = m_what[1].str(); + auto w2 = m_what[2].str(); + + using namespace luabridge; + + auto v = getGlobal( + m_vm.get(), w1.c_str()); + + return LuaRef_cast(v[w2]); + } +private: + vm_type m_vm {luaL_newstate(), lua_close}; + + const regex_type m_reg {R"(^(\w+)\.(\w+)$)"}; + mutable match_type m_what; +}; + +END_NAMESPACE(cpp_study) + +#endif //_CONFIG_HPP + diff --git a/section6/SalesData.hpp b/section6/SalesData.hpp new file mode 100644 index 0000000..c0aae91 --- /dev/null +++ b/section6/SalesData.hpp @@ -0,0 +1,125 @@ +// Copyright (c) 2020 by Chrono + +#ifndef _SALES_DATA_HPP +#define _SALES_DATA_HPP + +#include "cpplang.hpp" + +#include + +#if MSGPACK_VERSION_MAJOR < 2 +# error "msgpack is too old" +#endif + +BEGIN_NAMESPACE(cpp_study) + +// demo oop in C++ +class SalesData final +{ +public: + using this_type = SalesData; + +public: + using string_type = std::string; + using string_view_type = const std::string&; + using uint_type = unsigned int; + using currency_type = double; + + STATIC_ASSERT(sizeof(uint_type) >= 4); + STATIC_ASSERT(sizeof(currency_type) >= 4); +public: + SalesData(string_view_type id, uint_type s, currency_type r) noexcept + : m_id(id), m_sold(s), m_revenue(r) + {} + + SalesData(string_view_type id) noexcept + : SalesData(id, 0, 0) + {} + +public: +#if 0 + SalesData(SalesData&& s) noexcept + : m_id(std::move(s.m_id)), + m_sold(std::move(s.m_sold)), + m_revenue(std::move(s.m_revenue)) + {} + + SalesData& operator=(SalesData&& s) noexcept + { + m_id = std::move(s.m_id); + m_sold = std::move(s.m_sold); + m_revenue = std::move(s.m_revenue); + + return *this; + } +#endif + + SalesData() = default; + ~SalesData() = default; + + SalesData(const this_type&) = default; + SalesData& operator=(const this_type&) = default; + + SalesData(this_type&& s) = default; + SalesData& operator=(this_type&& s) = default; + +private: + string_type m_id = ""; + uint_type m_sold = 0; + uint_type m_revenue = 0; + +public: + MSGPACK_DEFINE(m_id, m_sold, m_revenue); + + msgpack::sbuffer pack() const + { + msgpack::sbuffer sbuf; + msgpack::pack(sbuf, *this); + + return sbuf; + } + + SalesData(const msgpack::sbuffer& sbuf) + { + auto obj = msgpack::unpack( + sbuf.data(), sbuf.size()).get(); + obj.convert(*this); + } + +public: + void inc_sold(uint_type s) noexcept + { + m_sold += s; + } + + void inc_revenue(currency_type r) noexcept + { + m_revenue += r; + } +public: + string_view_type id() const noexcept + { + return m_id; + } + + uint_type sold() const noexcept + { + return m_sold; + } + + currency_type revenue() const noexcept + { + return m_revenue; + } + + CPP_DEPRECATED + currency_type average() const + { + return m_revenue / m_sold; + } +}; + +END_NAMESPACE(cpp_study) + +#endif //_SALES_DATA_HPP + diff --git a/section6/SpinLock.hpp b/section6/SpinLock.hpp new file mode 100644 index 0000000..5a2aa50 --- /dev/null +++ b/section6/SpinLock.hpp @@ -0,0 +1,78 @@ +// Copyright (c) 2020 by Chrono + +#ifndef _SPIN_LOCK_HPP +#define _SPIN_LOCK_HPP + +#include "cpplang.hpp" + +BEGIN_NAMESPACE(cpp_study) + +// atomic spinlock with TAS +class SpinLock final +{ +public: + using this_type = SpinLock; + using atomic_type = std::atomic_flag; + +public: + SpinLock() = default; + ~SpinLock() = default; + + SpinLock(const this_type&) = delete; + SpinLock& operator=(const this_type&) = delete; +public: + void lock() noexcept + { + for(;;) { + if (!m_lock.test_and_set()) { + return; + } + + std::this_thread::yield(); + } + } + + bool try_lock() noexcept + { + return !m_lock.test_and_set(); + } + + void unlock() noexcept + { + m_lock.clear(); + } +private: + atomic_type m_lock {false}; +}; + +// RAII for lock +// you can change it to a template class +class SpinLockGuard final +{ +public: + using this_type = SpinLockGuard; + using spin_lock_type = SpinLock; +public: + SpinLockGuard(spin_lock_type& lock) noexcept + : m_lock(lock) + { + m_lock.lock(); + } + + ~SpinLockGuard() noexcept + { + m_lock.unlock(); + } + +public: + SpinLockGuard(const this_type&) = delete; + SpinLockGuard& operator=(const this_type&) = delete; +private: + spin_lock_type& m_lock; +}; + + +END_NAMESPACE(cpp_study) + +#endif //_SPIN_LOCK_HPP + diff --git a/section6/Summary.hpp b/section6/Summary.hpp new file mode 100644 index 0000000..2f13221 --- /dev/null +++ b/section6/Summary.hpp @@ -0,0 +1,82 @@ +// Copyright (c) 2020 by Chrono + +#ifndef _SUMMARY_HPP +#define _SUMMARY_HPP + +#include "cpplang.hpp" +#include "SalesData.hpp" +#include "SpinLock.hpp" + +BEGIN_NAMESPACE(cpp_study) + +class Summary final +{ +public: + using this_type = Summary; +public: + using sales_type = SalesData; + using lock_type = SpinLock; + using lock_guard_type = SpinLockGuard; + + using string_type = std::string; + using map_type = + std::map; + //std::unordered_map; + using minmax_sales_type = + std::pair; +public: + Summary() = default; + ~Summary() = default; + + Summary(const this_type&) = delete; + Summary& operator=(const this_type&) = delete; +private: + mutable lock_type m_lock; + map_type m_sales; +public: + void add_sales(const sales_type& s) + { + lock_guard_type guard(m_lock); + + const auto& id = s.id(); + + // not found + if (m_sales.find(id) == m_sales.end()) { + m_sales[id] = s; + return; + } + + // found + // you could use iter to optimize it + m_sales[id].inc_sold(s.sold()); + m_sales[id].inc_revenue(s.revenue()); + } + + minmax_sales_type minmax_sales() const + { + lock_guard_type guard(m_lock); + + if (m_sales.empty()) { + return minmax_sales_type(); + } + + // algorithm + auto ret = std::minmax_element( + std::begin(m_sales), std::end(m_sales), + [](const auto& a, const auto& b) + { + return a.second.sold() < b.second.sold(); + }); + + // min max + auto min_pos = ret.first; + auto max_pos = ret.second; + + return {min_pos->second.id(), max_pos->second.id()}; + } +}; + +END_NAMESPACE(cpp_study) + +#endif //_SUMMARY_HPP + diff --git a/section6/TcpConnect.hpp b/section6/TcpConnect.hpp new file mode 100644 index 0000000..771e5f8 --- /dev/null +++ b/section6/TcpConnect.hpp @@ -0,0 +1,7 @@ + +class TcpConnection : + public boost::enable_shared_from_this{ + + + +}; diff --git a/section6/TcpConnect.hpp~ b/section6/TcpConnect.hpp~ new file mode 100644 index 0000000..d9cffdc --- /dev/null +++ b/section6/TcpConnect.hpp~ @@ -0,0 +1,5 @@ + +class TcpConnection : + public boost::enable_shared_from_this{ + +}; diff --git a/section6/Zmq.hpp b/section6/Zmq.hpp new file mode 100644 index 0000000..c6f7e0c --- /dev/null +++ b/section6/Zmq.hpp @@ -0,0 +1,63 @@ +// Copyright (c) 2020 by Chrono + +#ifndef _ZMQ_HPP +#define _ZMQ_HPP + +#include "cpplang.hpp" + +// /usr/include/zmq.hpp +#include + +BEGIN_NAMESPACE(cpp_study) + +using zmq_context_type = zmq::context_t; +using zmq_socket_type = zmq::socket_t; +using zmq_message_type = zmq::message_t; + +template +class ZmqContext final +{ +#if 0 +public: + using zmq_context_type = zmq::context_t; + using zmq_socket_type = zmq::socket_t; + using zmq_message_type = zmq::message_t; +#endif +public: + ZmqContext() = default; + ~ZmqContext() = default; +public: + static + zmq_context_type& context() + { + static zmq_context_type ctx(thread_num); + return ctx; + } +public: + static + zmq_socket_type recv_sock(int hwm = 1000, int linger = 10) + { + zmq_socket_type sock(context(), ZMQ_PULL); + + sock.setsockopt(ZMQ_RCVHWM, hwm); + sock.setsockopt(ZMQ_LINGER, linger); // wait for 10ms + + return sock; + } + + static + zmq_socket_type send_sock(int hwm = 1000, int linger = 10) + { + zmq_socket_type sock(context(), ZMQ_PUSH); + + sock.setsockopt(ZMQ_SNDHWM, hwm); + sock.setsockopt(ZMQ_LINGER, linger); // wait for 10ms + + return sock; + } +}; + +END_NAMESPACE(cpp_study) + +#endif //_ZMQ_HPP + diff --git a/section6/client.cpp b/section6/client.cpp new file mode 100644 index 0000000..f6e1bb8 --- /dev/null +++ b/section6/client.cpp @@ -0,0 +1,97 @@ +// Copyright (c) 2020 by Chrono +// +// g++ client.cpp -std=c++14 -I../common -I../common/include -lzmq -lpthread -o c.out;./c.out +// g++ client.cpp -std=c++14 -I../common -I../common/include -lzmq -lpthread -g -O0 -o c.out +// g++ client.cpp -std=c++14 -I../common -I../common/include -lzmq -lpthread -g -O0 -o c.out;./c.out + +//#include + +#include "cpplang.hpp" +#include "SalesData.hpp" +#include "Zmq.hpp" + +// you should put json.hpp in ../common +#include "json.hpp" + +USING_NAMESPACE(std); +USING_NAMESPACE(cpp_study); + +static +auto debug_print = [](auto& b) +{ + using json_t = nlohmann::json; + + json_t j; + + j["id"] = b.id(); + j["sold"] = b.sold(); + j["revenue"] = b.revenue(); + //j["average"] = b.average(); + + std::cout << j.dump(2) << std::endl; +}; + +// sales data +static +auto make_sales = [=](const auto& id, auto s, auto r) +//-> msgpack::sbuffer +{ + return SalesData(id, s, r).pack(); + +#if 0 + SalesData book(id); + + book.inc_sold(s); + book.inc_revenue(r); + + debug_print(book); + + auto buf = book.pack(); + cout << buf.size() << endl; + + //SalesData book2 {buf}; + //assert(book.id() == book2.id()); + //debug_print(book2); + + return buf; +#endif +}; + +// zmq send +static +auto send_sales = [](const auto& addr, const auto& buf) +{ + using zmq_ctx = ZmqContext<1>; + + auto sock = zmq_ctx::send_sock(); + + sock.connect(addr); + assert(sock.connected()); + + auto len = sock.send(buf.data(), buf.size()); + assert(len == buf.size()); + + cout << "send len = " << len << endl; +}; + +int main() +try +{ + cout << "hello cpp_study client" << endl; + + //auto buf = make_sales("001", 10, 100); + //send_sales("tcp://127.0.0.1:5555", buf); + + send_sales("tcp://127.0.0.1:5555", + make_sales("001", 10, 100)); + + std::this_thread::sleep_for(100ms); + + send_sales("tcp://127.0.0.1:5555", + make_sales("002", 20, 200)); + +} +catch(std::exception& e) +{ + std::cerr << e.what() << std::endl; +} diff --git a/section6/conf.lua b/section6/conf.lua new file mode 100644 index 0000000..8668ee9 --- /dev/null +++ b/section6/conf.lua @@ -0,0 +1,37 @@ +-- copyright (c) 2020 chrono + +-- lua is more flexible than json +-- you can use comment/expression +-- and more lua pragram + +config = { + + -- should be same as client + -- you could change it to ipc + zmq_ipc_addr = "tcp://127.0.0.1:5555", + + -- see http_study's lua code + http_addr = "http://localhost/cpp_study?token=cpp@2020", + + time_interval = 5, -- seconds + + keyword = "super", + + sold_criteria = 100, + + revenue_criteria = 1000, + + best_n = 3, + + max_buf_size = 4 * 1024, + +} + +-- more config +others = { + -- add more +} + +-- debug: luajit conf.lua + +--print(config.http_addr) diff --git a/section6/cpplang.hpp b/section6/cpplang.hpp new file mode 100644 index 0000000..33f8b4f --- /dev/null +++ b/section6/cpplang.hpp @@ -0,0 +1,53 @@ +// Copyright (c) 2020 by Chrono + +#ifndef _CPP_LANG_HPP +#define _CPP_LANG_HPP + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +//never 'using namespace std;' in c++ header + +// must be C++11 or later +#if __cplusplus < 201103 +# error "C++ is too old" +#endif // __cplusplus < 201103 + +// [[deprecated]] +#if __cplusplus >= 201402 +# define CPP_DEPRECATED [[deprecated]] +#else +# define CPP_DEPRECATED [[gnu::deprecated]] +#endif // __cplusplus >= 201402 + +// static_assert +#if __cpp_static_assert >= 201411 +# define STATIC_ASSERT(x) static_assert(x) +#else +# define STATIC_ASSERT(x) static_assert(x, #x) +#endif + +// macro for convienient namespace +#define BEGIN_NAMESPACE(x) namespace x { +#define END_NAMESPACE(x) } +#define USING_NAMESPACE(x) using namespace x + +//static_assert( +// __GNUC__ >= 4, "GCC is too old"); + +#endif //_CPP_LANG_HPP + diff --git a/section6/hello.hpp b/section6/hello.hpp new file mode 100644 index 0000000..f04d735 --- /dev/null +++ b/section6/hello.hpp @@ -0,0 +1,17 @@ +// Copyright (c) 2020 by Chrono + +// this is a template source for convient +// change 'HELLO' to your own macro identifier +#ifndef _HELLO_HPP +#define _HELLO_HPP + +#include "cpplang.hpp" + +BEGIN_NAMESPACE(cpp_study) + +// todo: your code are here + +END_NAMESPACE(cpp_study) + +#endif //_HELLO_HPP + diff --git a/section6/hwClient.cpp b/section6/hwClient.cpp new file mode 100644 index 0000000..5d2e068 --- /dev/null +++ b/section6/hwClient.cpp @@ -0,0 +1,57 @@ +#include +#include +#include + +class PeerNode{ + zmq::context_t context (1); + zmq::socket_t socket (context, ZMQ_REQ); + PeerNode(int i){ + int port = 5555+id; + socket.connect ("tcp://localhost:"+string(port)); + } + void send_request(zmq::message_t & request ){ + socket.send (request); + zmq::message_t reply; + socket.recv (&reply); + std::cout << "Received World " << handle_reply(reply) << std::endl; + } + +}; + + +class Node{ + map peer; + zmq::context_t context (1); + zmq::socket_t socket (context, ZMQ_REP); + + Node(int id){ + int port = 5555+id; + socket.bind ("tcp://*:"+string(port)); + for(int i=0; i< size; i++){ + if (i== id){ + continue; + } + peers[i] = PeerNode(i); + } + } + + void recv_request(zmq::message_t & request){ + socket.recv (&request); + auto reply = make_response(request); + socket.send (reply); + } + void start(){ + while(true){ + zmq::message_t request; + recv_request(request); + } + } + +}; + +int main (){ + // Prepare our context and socket + Node node(1)); + node.start(); + return 0; +} diff --git a/section6/hwServer.cpp b/section6/hwServer.cpp new file mode 100644 index 0000000..b759ead --- /dev/null +++ b/section6/hwServer.cpp @@ -0,0 +1,34 @@ +#include +#include +#include +#ifndef _WIN32 +#include +#else +#include + +#define sleep(n) Sleep(n) +#endif + +int main () { + // Prepare our context and socket + zmq::context_t context (1); + zmq::socket_t socket (context, ZMQ_REP); + socket.bind ("tcp://*:5555"); + + while (true) { + zmq::message_t request; + + // Wait for next request from client + socket.recv (&request); + std::cout << "Received Hello" << std::endl; + + // Do some 'work' + sleep(1); + + // Send reply back to client + zmq::message_t reply (5); + memcpy (reply.data (), "World", 5); + + } + return 0; +} diff --git a/section6/main.cpp b/section6/main.cpp new file mode 100644 index 0000000..c935d3f --- /dev/null +++ b/section6/main.cpp @@ -0,0 +1,22 @@ +#include +#include +#include + +namespace asio = boost::asio; + +int main(int argc, char** argv) { + asio::io_service ios; + azmq::sub_socket subscriber(ios); + subscriber.connect("tcp://192.168.55.112:5556"); + subscriber.connect("tcp://192.168.55.201:7721"); + subscriber.set_option(azmq::socket::subscribe("NASDAQ")); + + azmq::pub_socket publisher(ios); + publisher.bind("ipc://nasdaq-feed"); + + std::array buf; + for (;;) { + auto size = subscriber.receive(asio::buffer(buf)); + publisher.send(asio::buffer(buf)); + } + return 0; diff --git a/section6/node.cpp b/section6/node.cpp new file mode 100644 index 0000000..58f6294 --- /dev/null +++ b/section6/node.cpp @@ -0,0 +1,170 @@ +// Asynchronous client-to-server (DEALER to ROUTER) +// +// While this example runs in a single process, that is to make +// it easier to start and stop the example. Each task has its own +// context and conceptually acts as a separate process. + +#include +#include +#include +#include + +#include +#include "zhelpers.hpp" + +// This is our client task class. +// It connects to the server, and then sends a request once per second +// It collects responses as they arrive, and it prints them out. We will +// run several client tasks in parallel, each with a different random ID. +// Attention! -- this random work well only on linux. + +class client_task { +public: + client_task() + : ctx_(1), + client_socket_(ctx_, ZMQ_DEALER) + {} + + void start() { + // generate random identity + char identity[10] = {}; + sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000)); + printf("%s\n", identity); + client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity)); + client_socket_.connect("tcp://localhost:5570"); + + zmq::pollitem_t items[] = { + { static_cast(client_socket_), 0, ZMQ_POLLIN, 0 } }; + int request_nbr = 0; + try { + while (true) { + for (int i = 0; i < 100; ++i) { + // 10 milliseconds + zmq::poll(items, 1, 10); + if (items[0].revents & ZMQ_POLLIN) { + printf("\n%s ", identity); + s_dump(client_socket_); + } + } + char request_string[16] = {}; + sprintf(request_string, "request #%d", ++request_nbr); + client_socket_.send(request_string, strlen(request_string)); + } + } + catch (std::exception &e) {} + } + +private: + zmq::context_t ctx_; + zmq::socket_t client_socket_; +}; + +// Each worker task works on one request at a time and sends a random number +// of replies back, with random delays between replies: + +class server_worker { +public: + server_worker(zmq::context_t &ctx, int sock_type, int id) + : ctx_(ctx), + worker_(ctx_, sock_type) + {} + + void work() { + worker_.connect("inproc://backend"+str(id))); + + try { + while (true) { + zmq::message_t identity; + zmq::message_t msg; + + zmq::message_t copied_id; + zmq::message_t copied_msg; + worker_.recv(&identity); + worker_.recv(&msg); + + int replies = within(5); + for (int reply = 0; reply < replies; ++reply) { + s_sleep(within(1000) + 1); + copied_id.copy(&identity); + copied_msg.copy(&msg); + worker_.send(copied_id, ZMQ_SNDMORE); + worker_.send(copied_msg); + } + } + } + catch (std::exception &e) {} + } + +private: + zmq::context_t &ctx_; + zmq::socket_t worker_; +}; + +// This is our server task. +// It uses the multithreaded server model to deal requests out to a pool +// of workers and route replies back to clients. One worker can handle +// one request at a time but one client can talk to multiple workers at +// once. + +class server_task { +public: + server_task() + : ctx_(1), + frontend_(ctx_, ZMQ_ROUTER), + backend_(ctx_, ZMQ_DEALER) + {} + + enum { kMaxThread = 5 }; + + void run() { + int port = 5570+m_id; + frontend_.bind("tcp://*:"+str(port); + backend_.bind("inproc://backend"+str(id))); + + std::vector worker; + std::vector worker_thread; + for (int i = 0; i < kMaxThread; ++i) { + worker.push_back(new server_worker(ctx_, ZMQ_DEALER)); + + worker_thread.push_back(new std::thread(std::bind(&server_worker::work, worker))); + worker_thread->detach(); + } + std::vector clients; + std::vector client_thread; + + client_task ct1; + client_task ct2; + client_task ct3; + std::thread t1(std::bind(&client_task::start, &ct1)); + std::thread t2(std::bind(&client_task::start, &ct2)); + std::thread t3(std::bind(&client_task::start, &ct3)); + t1.detach(); + t2.detach(); + t3.detach(); + + try { + zmq::proxy(static_cast(frontend_), + static_cast(backend_), + nullptr); + } + catch (std::exception &e) {} + + for (int i = 0; i < kMaxThread; ++i) { + delete worker; + delete worker_thread; + } + } + +private: + zmq::context_t ctx_; + zmq::socket_t frontend_; + zmq::socket_t backend_; +}; + +// The main thread simply starts several clients and a server, and then +// waits for the server to finish. + +int main (void){ + server_task st(2, 5); + st.run(); +} diff --git a/section6/srv.cpp b/section6/srv.cpp new file mode 100644 index 0000000..11db6b8 --- /dev/null +++ b/section6/srv.cpp @@ -0,0 +1,139 @@ +// Copyright (c) 2020 by Chrono +// +// g++ srv.cpp -std=c++14 -I../common -I../common/include -I/usr/local/include/luajit-2.1 -lluajit-5.1 -ldl -lzmq -lpthread -lcpr -lcurl -o a.out;./a.out +// g++ srv.cpp -std=c++14 -I../common -I../common/include -I/usr/local/include/luajit-2.1 -lluajit-5.1 -ldl -lzmq -lpthread -lcpr -lcurl -g -O0 -o a.out +// g++ srv.cpp -std=c++14 -I../common -I../common/include -I/usr/local/include/luajit-2.1 -lluajit-5.1 -ldl -lzmq -lpthread -lcpr -lcurl -g -O0 -o a.out;./a.out + +//#include + +#include "cpplang.hpp" +#include "Summary.hpp" +#include "Zmq.hpp" +#include "Config.hpp" + +// you should put json.hpp in ../common +#include "json.hpp" + +#include +#include + +USING_NAMESPACE(std); +USING_NAMESPACE(cpp_study); + +static +auto debug_print = [](auto& b) +{ + using json_t = nlohmann::json; + + json_t j; + + j["id"] = b.id(); + j["sold"] = b.sold(); + j["revenue"] = b.revenue(); + //j["average"] = b.average(); + + std::cout << j.dump(2) << std::endl; +}; + +int main() +try{ + cout << "hello cpp_study server" << endl; + + Config conf; + conf.load("./conf.lua"); + + Summary sum; + std::atomic_int count {0}; + + // todo: try-catch + auto recv_cycle = [&]() + { + using zmq_ctx = ZmqContext<1>; + + // zmq recv + + auto sock = zmq_ctx::recv_sock(); + + sock.bind(conf.get("config.zmq_ipc_addr")); + assert(sock.connected()); + + for(;;) { + + // xxx : shared_ptr/unique_ptr + auto msg_ptr = std::make_shared(); + + sock.recv(msg_ptr.get()); + + ++count; + cout << count << endl; + //printf("count = %d\n", static_cast(count)); + + // async process msg + + // todo: try-catch + //auto f = std::async(std::launch::async, + std::thread( + [&sum, msg_ptr]() + //[&sum, &count](decltype(msg_ptr) ptr) + { + //cout << ptr.unique() << endl; + + SalesData book; + + // xxx: json/msgpack/protobuf + auto obj = msgpack::unpack( + msg_ptr->data(), msg_ptr->size()).get(); + obj.convert(book); + + //cout << book.id() << endl; + //debug_print(book); + + sum.add_sales(book); + }).detach(); // async + } // for(;;) + }; // recv_cycle lambda + + auto log_cycle = [&]() + { + auto http_addr = conf.get("config.http_addr"); + auto time_interval = conf.get("config.time_interval"); + + for(;;) { + std::this_thread::sleep_for(time_interval * 1s); + //cout << "log_cycle" << endl; + + //auto info = sum.minmax_sales(); + //cout << "log_cycle get info" << endl; + + using json_t = nlohmann::json; + + json_t j; + + j["count"] = static_cast(count); + j["minmax"] = sum.minmax_sales();//{info.first, info.second}; + + auto res = cpr::Post( + cpr::Url{http_addr}, + cpr::Body{j.dump()}, + cpr::Timeout{200ms} + ); + + if (res.status_code != 200) { + cerr << "http post failed" << endl; + //printf("http post failed\n"); + } + } // for(;;) + }; // log_cycle lambda + + // launch log_cycle + auto fu1 = std::async(std::launch::async, log_cycle); + + // launch recv_cycle then wait + auto fu2 = std::async(std::launch::async, recv_cycle); + + fu2.wait(); +} +catch(std::exception& e) +{ + std::cerr << e.what() << std::endl; +} diff --git a/section6/srv.cpp~ b/section6/srv.cpp~ new file mode 100644 index 0000000..d33bee8 --- /dev/null +++ b/section6/srv.cpp~ @@ -0,0 +1,141 @@ +// Copyright (c) 2020 by Chrono +// +// g++ srv.cpp -std=c++14 -I../common -I../common/include -I/usr/local/include/luajit-2.1 -lluajit-5.1 -ldl -lzmq -lpthread -lcpr -lcurl -o a.out;./a.out +// g++ srv.cpp -std=c++14 -I../common -I../common/include -I/usr/local/include/luajit-2.1 -lluajit-5.1 -ldl -lzmq -lpthread -lcpr -lcurl -g -O0 -o a.out +// g++ srv.cpp -std=c++14 -I../common -I../common/include -I/usr/local/include/luajit-2.1 -lluajit-5.1 -ldl -lzmq -lpthread -lcpr -lcurl -g -O0 -o a.out;./a.out + +//#include + +#include "cpplang.hpp" +#include "Summary.hpp" +#include "Zmq.hpp" +#include "Config.hpp" + +// you should put json.hpp in ../common +#include "json.hpp" + +#include +#include + +USING_NAMESPACE(std); +USING_NAMESPACE(cpp_study); + +static +auto debug_print = [](auto& b) +{ + using json_t = nlohmann::json; + + json_t j; + + j["id"] = b.id(); + j["sold"] = b.sold(); + j["revenue"] = b.revenue(); + //j["average"] = b.average(); + + std::cout << j.dump(2) << std::endl; +}; + +int main() +try +{ + cout << "hello cpp_study server" << endl; + + Config conf; + conf.load("./conf.lua"); + + Summary sum; + std::atomic_int count {0}; + + // todo: try-catch + auto recv_cycle = [&]() + { + using zmq_ctx = ZmqContext<1>; + + // zmq recv + + auto sock = zmq_ctx::recv_sock(); + + sock.bind(conf.get("config.zmq_ipc_addr")); + assert(sock.connected()); + + for(;;) { + + // xxx : shared_ptr/unique_ptr + auto msg_ptr = std::make_shared(); + + sock.recv(msg_ptr.get()); + //cout << msg_ptr->size() << endl; + + ++count; + cout << count << endl; + //printf("count = %d\n", static_cast(count)); + + // async process msg + + // todo: try-catch + //auto f = std::async(std::launch::async, + std::thread( + [&sum, msg_ptr]() + //[&sum, &count](decltype(msg_ptr) ptr) + { + //cout << ptr.unique() << endl; + + SalesData book; + + // xxx: json/msgpack/protobuf + auto obj = msgpack::unpack( + msg_ptr->data(), msg_ptr->size()).get(); + obj.convert(book); + + //cout << book.id() << endl; + //debug_print(book); + + sum.add_sales(book); + }).detach(); // async + } // for(;;) + }; // recv_cycle lambda + + auto log_cycle = [&]() + { + auto http_addr = conf.get("config.http_addr"); + auto time_interval = conf.get("config.time_interval"); + + for(;;) { + std::this_thread::sleep_for(time_interval * 1s); + //cout << "log_cycle" << endl; + + //auto info = sum.minmax_sales(); + //cout << "log_cycle get info" << endl; + + using json_t = nlohmann::json; + + json_t j; + + j["count"] = static_cast(count); + j["minmax"] = sum.minmax_sales();//{info.first, info.second}; + + auto res = cpr::Post( + cpr::Url{http_addr}, + cpr::Body{j.dump()}, + cpr::Timeout{200ms} + ); + + if (res.status_code != 200) { + cerr << "http post failed" << endl; + //printf("http post failed\n"); + } + } // for(;;) + }; // log_cycle lambda + + // launch log_cycle + auto fu1 = std::async(std::launch::async, log_cycle); + + // launch recv_cycle then wait + auto fu2 = std::async(std::launch::async, recv_cycle); + + fu2.wait(); +} +catch(std::exception& e) +{ + std::cerr << e.what() << std::endl; +}