From 5652a7377f6eafd2324e7bdd6909b4c7f26f66b8 Mon Sep 17 00:00:00 2001 From: Elliott Sales de Andrade Date: Tue, 22 Jan 2019 00:28:36 -0500 Subject: [PATCH] Update to latest version of libzmq and cppzmq. --- .gitignore | 1 + 1260.patch | 25 - 1574.patch | 27 - sources | 2 +- zeromq.spec | 39 +- zmq.hpp | 1543 +++++++++++++++++++++++++++++++-------------------- 6 files changed, 959 insertions(+), 678 deletions(-) delete mode 100644 1260.patch delete mode 100644 1574.patch diff --git a/.gitignore b/.gitignore index 2ba455d..5f933d7 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ /x86_64/ /i?86/ /tarballs/ +/libzmq-4.3.1.tar.gz diff --git a/1260.patch b/1260.patch deleted file mode 100644 index bc70ec5..0000000 --- a/1260.patch +++ /dev/null @@ -1,25 +0,0 @@ -From 32b2d3034b04a54118bc95c3f83ea5af78f9de41 Mon Sep 17 00:00:00 2001 -From: Nikolay Amiantov -Date: Thu, 20 Nov 2014 05:08:45 +0300 -Subject: [PATCH] Fix test_filter_ipc for cleared supplementary groups - -This should fix part of [https://github.com/zeromq/libzmq/issues/1129]. ---- - tests/test_filter_ipc.cpp | 4 ++-- - 1 file changed, 2 insertions(+), 2 deletions(-) - -diff --git a/tests/test_filter_ipc.cpp b/tests/test_filter_ipc.cpp -index 00518f710..83035949b 100644 ---- a/tests/test_filter_ipc.cpp -+++ b/tests/test_filter_ipc.cpp -@@ -122,8 +122,8 @@ int main (void) - // Get the group and supplimental groups of the process owner - gid_t groups[100]; - int ngroups = getgroups(100, groups); -- assert (ngroups != -1 && ngroups != 0); -- gid_t group = getgid(), supgroup = groups[0], notgroup = groups[ngroups - 1] + 1; -+ assert (ngroups != -1); -+ gid_t group = getgid(), supgroup = group, notgroup = group + 1; - for (int i = 0; i < ngroups; i++) { - if (supgroup == group && group != groups[i]) - supgroup = groups[i]; diff --git a/1574.patch b/1574.patch deleted file mode 100644 index cbce755..0000000 --- a/1574.patch +++ /dev/null @@ -1,27 +0,0 @@ -From 25a50ca0d5d3ea076c63c23692e4cb9868bfb55d Mon Sep 17 00:00:00 2001 -From: KIU Shueng Chuan -Date: Tue, 8 Sep 2015 17:26:31 +0800 -Subject: [PATCH] avoid dereferencing uint32_t on unaligned address - ---- - src/socket_base.cpp | 21 +++++---------------- - 1 file changed, 5 insertions(+), 16 deletions(-) - -diff --git a/src/socket_base.cpp b/src/socket_base.cpp -index a980015fc..ea178a868 100644 ---- a/src/socket_base.cpp -+++ b/src/socket_base.cpp -@@ -1361,8 +1361,11 @@ void zmq::socket_base_t::monitor_event (int event_, int value_, const std::strin - zmq_msg_t msg; - zmq_msg_init_size (&msg, 6); - uint8_t *data = (uint8_t *) zmq_msg_data (&msg); -- *(uint16_t *) (data + 0) = (uint16_t) event_; -- *(uint32_t *) (data + 2) = (uint32_t) value_; -+ // Avoid dereferencing uint32_t on unaligned address -+ uint16_t event = (uint16_t) event_; -+ uint32_t value = (uint32_t) value_; -+ memcpy (data + 0, &event, sizeof(event)); -+ memcpy (data + 2, &value, sizeof(value)); - zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE); - - // Send address in second frame diff --git a/sources b/sources index c7d0911..02cfb10 100644 --- a/sources +++ b/sources @@ -1 +1 @@ -c89db4dbc0b90c34c9f4983cbff6d321 zeromq-4.1.6.tar.gz +SHA512 (libzmq-4.3.1.tar.gz) = 64855a73331a194c43b01aa86a985a149eba4ed32b9f6483d2a7415cfd8bba557aab5b7b33d160cd177141de02360b73c20e4696a19c2cd798eb5f82eeb72840 diff --git a/zeromq.spec b/zeromq.spec index a2ff707..c181069 100644 --- a/zeromq.spec +++ b/zeromq.spec @@ -1,26 +1,26 @@ %bcond_without pgm +# TODO: Split into a separate package? +%global cppzmq_version 4.3.0 + Name: zeromq -Version: 4.1.6 -Release: 11%{?dist} +Version: 4.3.1 +Release: 1%{?dist} Summary: Software library for fast, message-based applications License: LGPLv3+ URL: http://www.zeromq.org -Source0: https://github.com/zeromq/zeromq4-1/releases/download/v%{version}/zeromq-%{version}.tar.gz -Source1: https://raw.githubusercontent.com/zeromq/cppzmq/master/zmq.hpp -Source2: https://raw.githubusercontent.com/zeromq/cppzmq/master/LICENSE -Patch0001: https://github.com/zeromq/libzmq/pull/1260.patch -Patch0002: https://github.com/zeromq/libzmq/pull/1574.patch +Source0: https://github.com/zeromq/libzmq/archive/v%{version}/libzmq-%{version}.tar.gz +Source1: https://github.com/zeromq/cppzmq/raw/v%{cppzmq_version}/zmq.hpp +Source2: https://github.com/zeromq/cppzmq/raw/v%{cppzmq_version}/LICENSE BuildRequires: autoconf BuildRequires: automake +BuildRequires: gcc-c++ BuildRequires: libtool BuildRequires: libsodium-devel -BuildRequires: gcc-c++ +BuildRequires: libunwind-devel -BuildRequires: glib2-devel -BuildRequires: libuuid-devel %if %{with pgm} BuildRequires: openpgm-devel BuildRequires: krb5-devel @@ -49,6 +49,7 @@ developing applications that use %{name}. %package -n cppzmq-devel Summary: Development files for cppzmq +Version: %{cppzmq_version} License: MIT Requires: %{name}-devel%{?_isa} = %{version}-%{release} @@ -59,12 +60,14 @@ developing applications that use the C++ header files of %{name}. %prep -%autosetup -p1 +%autosetup -p1 -n libzmq-%{version} cp -a %{SOURCE2} . -# Don't turn warnings into errors -sed -i "s/libzmq_werror=\"yes\"/libzmq_werror=\"no\"/g" \ - configure.ac +# Remove bundled code. +rm -rf external/wepoll + +# Fix permissions. +chmod -x src/xsub.hpp %build @@ -74,6 +77,9 @@ autoreconf -fi --with-pgm \ --with-libgssapi_krb5 \ %endif + --with-libsodium \ + --enable-libunwind \ + --disable-Werror \ --disable-static %make_build @@ -95,7 +101,7 @@ make check V=1 || ( cat test-suite.log && exit 1 ) %files -%doc AUTHORS ChangeLog MAINTAINERS NEWS +%doc README.md AUTHORS NEWS %license COPYING COPYING.LESSER %{_bindir}/curve_keygen %{_libdir}/libzmq.so.5* @@ -111,6 +117,9 @@ make check V=1 || ( cat test-suite.log && exit 1 ) %changelog +* Tue Jan 22 2019 Elliott Sales de Andrade - 4.3.1-1 +- Update to latest version of libzmq and cppzmq + * Mon Jan 21 2019 Elliott Sales de Andrade - 4.1.6-11 - Backport patches to fix test failures in build - Cleanup spec a little diff --git a/zmq.hpp b/zmq.hpp index 28dc67b..97bca4e 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -1,4 +1,5 @@ /* + Copyright (c) 2016-2017 ZeroMQ community Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2011 Botond Ballo Copyright (c) 2007-2009 iMatix Corporation @@ -25,59 +26,81 @@ #ifndef __ZMQ_HPP_INCLUDED__ #define __ZMQ_HPP_INCLUDED__ +#if (__cplusplus >= 201402L) +#define ZMQ_DEPRECATED(msg) [[deprecated(msg)]] +#elif defined(_MSC_VER) +#define ZMQ_DEPRECATED(msg) __declspec(deprecated(msg)) +#elif defined(__GNUC__) +#define ZMQ_DEPRECATED(msg) __attribute__((deprecated(msg))) +#endif + #if (__cplusplus >= 201103L) - #define ZMQ_CPP11 - #define ZMQ_NOTHROW noexcept - #define ZMQ_EXPLICIT explicit -#elif (defined(_MSC_VER) && (_MSC_VER >= 1900)) - #define ZMQ_CPP11 - #define ZMQ_NOTHROW noexcept - #define ZMQ_EXPLICIT explicit +#define ZMQ_CPP11 +#define ZMQ_NOTHROW noexcept +#define ZMQ_EXPLICIT explicit +#elif (defined(_MSC_VER) && (_MSC_VER >= 1900)) +#define ZMQ_CPP11 +#define ZMQ_NOTHROW noexcept +#define ZMQ_EXPLICIT explicit #else - #define ZMQ_CPP03 - #define ZMQ_NOTHROW - #define ZMQ_EXPLICIT +#define ZMQ_CPP03 +#define ZMQ_NOTHROW +#define ZMQ_EXPLICIT #endif #include -#include #include #include -#include + +#include #include -#include +#include #include +#include +#include +#include + +/* Version macros for compile-time API version detection */ +#define CPPZMQ_VERSION_MAJOR 4 +#define CPPZMQ_VERSION_MINOR 3 +#define CPPZMQ_VERSION_PATCH 0 + +#define CPPZMQ_VERSION \ + ZMQ_MAKE_VERSION(CPPZMQ_VERSION_MAJOR, CPPZMQ_VERSION_MINOR, \ + CPPZMQ_VERSION_PATCH) #ifdef ZMQ_CPP11 #include #include +#include +#include +#include #endif // Detect whether the compiler supports C++11 rvalue references. -#if (defined(__GNUC__) && (__GNUC__ > 4 || \ - (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) && \ - defined(__GXX_EXPERIMENTAL_CXX0X__)) - #define ZMQ_HAS_RVALUE_REFS - #define ZMQ_DELETED_FUNCTION = delete +#if (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) \ + && defined(__GXX_EXPERIMENTAL_CXX0X__)) +#define ZMQ_HAS_RVALUE_REFS +#define ZMQ_DELETED_FUNCTION = delete #elif defined(__clang__) - #if __has_feature(cxx_rvalue_references) - #define ZMQ_HAS_RVALUE_REFS - #endif - - #if __has_feature(cxx_deleted_functions) - #define ZMQ_DELETED_FUNCTION = delete - #else - #define ZMQ_DELETED_FUNCTION - #endif +#if __has_feature(cxx_rvalue_references) +#define ZMQ_HAS_RVALUE_REFS +#endif + +#if __has_feature(cxx_deleted_functions) +#define ZMQ_DELETED_FUNCTION = delete +#else +#define ZMQ_DELETED_FUNCTION +#endif #elif defined(_MSC_VER) && (_MSC_VER >= 1900) - #define ZMQ_HAS_RVALUE_REFS - #define ZMQ_DELETED_FUNCTION = delete +#define ZMQ_HAS_RVALUE_REFS +#define ZMQ_DELETED_FUNCTION = delete #elif defined(_MSC_VER) && (_MSC_VER >= 1600) - #define ZMQ_HAS_RVALUE_REFS - #define ZMQ_DELETED_FUNCTION +#define ZMQ_HAS_RVALUE_REFS +#define ZMQ_DELETED_FUNCTION #else - #define ZMQ_DELETED_FUNCTION +#define ZMQ_DELETED_FUNCTION #endif #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 3, 0) @@ -87,726 +110,1026 @@ #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) #define ZMQ_HAS_PROXY_STEERABLE /* Socket event data */ -typedef struct { - uint16_t event; // id of the event as bitfield - int32_t value ; // value is either error code, fd or reconnect interval +typedef struct +{ + uint16_t event; // id of the event as bitfield + int32_t value; // value is either error code, fd or reconnect interval } zmq_event_t; #endif // Avoid using deprecated message receive function when possible #if ZMQ_VERSION < ZMQ_MAKE_VERSION(3, 2, 0) -# define zmq_msg_recv(msg, socket, flags) zmq_recvmsg(socket, msg, flags) +#define zmq_msg_recv(msg, socket, flags) zmq_recvmsg(socket, msg, flags) #endif // In order to prevent unused variable warnings when building in non-debug // mode use this macro to make assertions. #ifndef NDEBUG -# define ZMQ_ASSERT(expression) assert(expression) +#define ZMQ_ASSERT(expression) assert(expression) #else -# define ZMQ_ASSERT(expression) (void)(expression) +#define ZMQ_ASSERT(expression) (void) (expression) #endif namespace zmq { +typedef zmq_free_fn free_fn; +typedef zmq_pollitem_t pollitem_t; + +class error_t : public std::exception +{ + public: + error_t() : errnum(zmq_errno()) {} +#ifdef ZMQ_CPP11 + virtual const char *what() const noexcept { return zmq_strerror(errnum); } +#else + virtual const char *what() const throw() { return zmq_strerror(errnum); } +#endif + int num() const { return errnum; } - typedef zmq_free_fn free_fn; - typedef zmq_pollitem_t pollitem_t; + private: + int errnum; +}; - class error_t : public std::exception - { - public: +inline int poll(zmq_pollitem_t const *items_, size_t nitems_, long timeout_ = -1) +{ + int rc = zmq_poll(const_cast(items_), + static_cast(nitems_), timeout_); + if (rc < 0) + throw error_t(); + return rc; +} - error_t () : errnum (zmq_errno ()) {} #ifdef ZMQ_CPP11 - virtual const char *what () const noexcept - { - return zmq_strerror (errnum); - } -#else - virtual const char *what() const throw () - { - return zmq_strerror(errnum); - } -#endif - int num () const - { - return errnum; - } +inline int +poll(zmq_pollitem_t const *items, size_t nitems, std::chrono::milliseconds timeout) +{ + return poll(items, nitems, static_cast(timeout.count())); +} + +inline int poll(std::vector const &items, + std::chrono::milliseconds timeout) +{ + return poll(items.data(), items.size(), static_cast(timeout.count())); +} - private: +inline int poll(std::vector const &items, long timeout_ = -1) +{ + return poll(items.data(), items.size(), timeout_); +} +#endif - int errnum; - }; - inline int poll (zmq_pollitem_t const* items_, size_t nitems_, long timeout_ = -1) +inline void proxy(void *frontend, void *backend, void *capture) +{ + int rc = zmq_proxy(frontend, backend, capture); + if (rc != 0) + throw error_t(); +} + +#ifdef ZMQ_HAS_PROXY_STEERABLE +inline void +proxy_steerable(void *frontend, void *backend, void *capture, void *control) +{ + int rc = zmq_proxy_steerable(frontend, backend, capture, control); + if (rc != 0) + throw error_t(); +} +#endif + +inline void version(int *major_, int *minor_, int *patch_) +{ + zmq_version(major_, minor_, patch_); +} + +#ifdef ZMQ_CPP11 +inline std::tuple version() +{ + std::tuple v; + zmq_version(&std::get<0>(v), &std::get<1>(v), &std::get<2>(v)); + return v; +} +#endif + +class message_t +{ + friend class socket_t; + + public: + inline message_t() { - int rc = zmq_poll (const_cast(items_), static_cast(nitems_), timeout_); - if (rc < 0) - throw error_t (); - return rc; + int rc = zmq_msg_init(&msg); + if (rc != 0) + throw error_t(); } - inline int poll(zmq_pollitem_t const* items, size_t nitems) + inline explicit message_t(size_t size_) { - return poll(items, nitems, -1); + int rc = zmq_msg_init_size(&msg, size_); + if (rc != 0) + throw error_t(); } - #ifdef ZMQ_CPP11 - inline int poll(zmq_pollitem_t const* items, size_t nitems, std::chrono::milliseconds timeout) + template message_t(T first, T last) : msg() { - return poll(items, nitems, timeout.count() ); + typedef typename std::iterator_traits::difference_type size_type; + typedef typename std::iterator_traits::value_type value_t; + + size_type const size_ = std::distance(first, last) * sizeof(value_t); + int const rc = zmq_msg_init_size(&msg, size_); + if (rc != 0) + throw error_t(); + value_t *dest = data(); + while (first != last) { + *dest = *first; + ++dest; + ++first; + } } - inline int poll(std::vector const& items, std::chrono::milliseconds timeout) + inline message_t(const void *data_, size_t size_) { - return poll(items.data(), items.size(), timeout.count() ); + int rc = zmq_msg_init_size(&msg, size_); + if (rc != 0) + throw error_t(); + memcpy(data(), data_, size_); + } + + inline message_t(void *data_, size_t size_, free_fn *ffn_, void *hint_ = NULL) + { + int rc = zmq_msg_init_data(&msg, data_, size_, ffn_, hint_); + if (rc != 0) + throw error_t(); } - inline int poll(std::vector const& items, long timeout_ = -1) +#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) + template + message_t(const T &msg_) : message_t(std::begin(msg_), std::end(msg_)) { - return poll(items.data(), items.size(), timeout_); } - #endif +#endif + +#ifdef ZMQ_HAS_RVALUE_REFS + inline message_t(message_t &&rhs) : msg(rhs.msg) + { + int rc = zmq_msg_init(&rhs.msg); + if (rc != 0) + throw error_t(); + } + inline message_t &operator=(message_t &&rhs) ZMQ_NOTHROW + { + std::swap(msg, rhs.msg); + return *this; + } +#endif + inline ~message_t() ZMQ_NOTHROW + { + int rc = zmq_msg_close(&msg); + ZMQ_ASSERT(rc == 0); + } - inline void proxy (void *frontend, void *backend, void *capture) + inline void rebuild() { - int rc = zmq_proxy (frontend, backend, capture); + int rc = zmq_msg_close(&msg); + if (rc != 0) + throw error_t(); + rc = zmq_msg_init(&msg); if (rc != 0) - throw error_t (); + throw error_t(); } - -#ifdef ZMQ_HAS_PROXY_STEERABLE - inline void proxy_steerable (void *frontend, void *backend, void *capture, void *control) + + inline void rebuild(size_t size_) { - int rc = zmq_proxy_steerable (frontend, backend, capture, control); + int rc = zmq_msg_close(&msg); + if (rc != 0) + throw error_t(); + rc = zmq_msg_init_size(&msg, size_); if (rc != 0) - throw error_t (); + throw error_t(); } -#endif - - inline void version (int *major_, int *minor_, int *patch_) + + inline void rebuild(const void *data_, size_t size_) { - zmq_version (major_, minor_, patch_); + int rc = zmq_msg_close(&msg); + if (rc != 0) + throw error_t(); + rc = zmq_msg_init_size(&msg, size_); + if (rc != 0) + throw error_t(); + memcpy(data(), data_, size_); } - #ifdef ZMQ_CPP11 - inline std::tuple version() + inline void rebuild(void *data_, size_t size_, free_fn *ffn_, void *hint_ = NULL) { - std::tuple v; - zmq_version(&std::get<0>(v), &std::get<1>(v), &std::get<2>(v) ); - return v; + int rc = zmq_msg_close(&msg); + if (rc != 0) + throw error_t(); + rc = zmq_msg_init_data(&msg, data_, size_, ffn_, hint_); + if (rc != 0) + throw error_t(); } - #endif - class message_t + inline void move(message_t const *msg_) { - friend class socket_t; + int rc = zmq_msg_move(&msg, const_cast(&(msg_->msg))); + if (rc != 0) + throw error_t(); + } - public: + inline void copy(message_t const *msg_) + { + int rc = zmq_msg_copy(&msg, const_cast(&(msg_->msg))); + if (rc != 0) + throw error_t(); + } - inline message_t () - { - int rc = zmq_msg_init (&msg); - if (rc != 0) - throw error_t (); - } + inline bool more() const ZMQ_NOTHROW + { + int rc = zmq_msg_more(const_cast(&msg)); + return rc != 0; + } - inline explicit message_t (size_t size_) - { - int rc = zmq_msg_init_size (&msg, size_); - if (rc != 0) - throw error_t (); - } + inline void *data() ZMQ_NOTHROW { return zmq_msg_data(&msg); } - template message_t(I first, I last): - msg() - { - typedef typename std::iterator_traits::difference_type size_type; - typedef typename std::iterator_traits::value_type value_t; - - size_type const size_ = std::distance(first, last)*sizeof(value_t); - int const rc = zmq_msg_init_size (&msg, size_); - if (rc != 0) - throw error_t (); - value_t* dest = data(); - while (first != last) - { - *dest = *first; - ++dest; ++first; - } - } + inline const void *data() const ZMQ_NOTHROW + { + return zmq_msg_data(const_cast(&msg)); + } - inline message_t (const void *data_, size_t size_) - { - int rc = zmq_msg_init_size (&msg, size_); - if (rc != 0) - throw error_t (); - memcpy(data(), data_, size_); - } + inline size_t size() const ZMQ_NOTHROW + { + return zmq_msg_size(const_cast(&msg)); + } - inline message_t (void *data_, size_t size_, free_fn *ffn_, - void *hint_ = NULL) - { - int rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_); - if (rc != 0) - throw error_t (); - } + template T *data() ZMQ_NOTHROW { return static_cast(data()); } -#ifdef ZMQ_HAS_RVALUE_REFS - inline message_t (message_t &&rhs): msg (rhs.msg) - { - int rc = zmq_msg_init (&rhs.msg); - if (rc != 0) - throw error_t (); - } + template T const *data() const ZMQ_NOTHROW + { + return static_cast(data()); + } - inline message_t &operator = (message_t &&rhs) ZMQ_NOTHROW - { - std::swap (msg, rhs.msg); - return *this; - } + ZMQ_DEPRECATED("from 4.3.0, use operator== instead") + inline bool equal(const message_t *other) const ZMQ_NOTHROW + { + return *this == *other; + } + + inline bool operator==(const message_t &other) const ZMQ_NOTHROW + { + const size_t my_size = size(); + return my_size == other.size() && 0 == memcmp(data(), other.data(), my_size); + } + + inline bool operator!=(const message_t &other) const ZMQ_NOTHROW + { + return !(*this == other); + } + +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) + inline const char *gets(const char *property_) + { + const char *value = zmq_msg_gets(&msg, property_); + if (value == NULL) + throw error_t(); + return value; + } #endif - inline ~message_t () ZMQ_NOTHROW - { - int rc = zmq_msg_close (&msg); - ZMQ_ASSERT (rc == 0); - } +#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0) + inline uint32_t routing_id() const + { + return zmq_msg_routing_id(const_cast(&msg)); + } - inline void rebuild () - { - int rc = zmq_msg_close (&msg); - if (rc != 0) - throw error_t (); - rc = zmq_msg_init (&msg); - if (rc != 0) - throw error_t (); - } + inline void set_routing_id(uint32_t routing_id) + { + int rc = zmq_msg_set_routing_id(&msg, routing_id); + if (rc != 0) + throw error_t(); + } - inline void rebuild (size_t size_) - { - int rc = zmq_msg_close (&msg); - if (rc != 0) - throw error_t (); - rc = zmq_msg_init_size (&msg, size_); - if (rc != 0) - throw error_t (); - } + inline const char* group() const + { + return zmq_msg_group(const_cast(&msg)); + } - inline void rebuild (const void *data_, size_t size_) - { - int rc = zmq_msg_close (&msg); - if (rc != 0) - throw error_t (); - rc = zmq_msg_init_size (&msg, size_); - if (rc != 0) - throw error_t (); - memcpy(data(), data_, size_); - } + inline void set_group(const char* group) + { + int rc = zmq_msg_set_group(&msg, group); + if (rc != 0) + throw error_t(); + } +#endif - inline void rebuild (void *data_, size_t size_, free_fn *ffn_, - void *hint_ = NULL) - { - int rc = zmq_msg_close (&msg); - if (rc != 0) - throw error_t (); - rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_); - if (rc != 0) - throw error_t (); + /** Dump content to string. Ascii chars are readable, the rest is printed as hex. + * Probably ridiculously slow. + */ + inline std::string str() const + { + // Partly mutuated from the same method in zmq::multipart_t + std::stringstream os; + + const unsigned char *msg_data = this->data(); + unsigned char byte; + size_t size = this->size(); + int is_ascii[2] = {0, 0}; + + os << "zmq::message_t [size " << std::dec << std::setw(3) + << std::setfill('0') << size << "] ("; + // Totally arbitrary + if (size >= 1000) { + os << "... too big to print)"; + } else { + while (size--) { + byte = *msg_data++; + + is_ascii[1] = (byte >= 33 && byte < 127); + if (is_ascii[1] != is_ascii[0]) + os << " "; // Separate text/non text + + if (is_ascii[1]) { + os << byte; + } else { + os << std::hex << std::uppercase << std::setw(2) + << std::setfill('0') << static_cast(byte); + } + is_ascii[0] = is_ascii[1]; + } + os << ")"; } + return os.str(); + } - inline void move (message_t const *msg_) - { - int rc = zmq_msg_move (&msg, const_cast(&(msg_->msg))); - if (rc != 0) - throw error_t (); - } + private: + // The underlying message + zmq_msg_t msg; - inline void copy (message_t const *msg_) - { - int rc = zmq_msg_copy (&msg, const_cast(&(msg_->msg))); - if (rc != 0) - throw error_t (); - } + // Disable implicit message copying, so that users won't use shared + // messages (less efficient) without being aware of the fact. + message_t(const message_t &) ZMQ_DELETED_FUNCTION; + void operator=(const message_t &) ZMQ_DELETED_FUNCTION; +}; - inline bool more () const ZMQ_NOTHROW - { - int rc = zmq_msg_more (const_cast(&msg) ); - return rc != 0; - } +class context_t +{ + friend class socket_t; - inline void *data () ZMQ_NOTHROW - { - return zmq_msg_data (&msg); - } + public: + inline context_t() + { + ptr = zmq_ctx_new(); + if (ptr == NULL) + throw error_t(); + } - inline const void* data () const ZMQ_NOTHROW - { - return zmq_msg_data (const_cast(&msg)); - } - inline size_t size () const ZMQ_NOTHROW - { - return zmq_msg_size (const_cast(&msg)); - } + inline explicit context_t(int io_threads_, + int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT) + { + ptr = zmq_ctx_new(); + if (ptr == NULL) + throw error_t(); - template T* data() ZMQ_NOTHROW - { - return static_cast( data() ); - } + int rc = zmq_ctx_set(ptr, ZMQ_IO_THREADS, io_threads_); + ZMQ_ASSERT(rc == 0); - template T const* data() const ZMQ_NOTHROW - { - return static_cast( data() ); - } + rc = zmq_ctx_set(ptr, ZMQ_MAX_SOCKETS, max_sockets_); + ZMQ_ASSERT(rc == 0); + } - inline bool equal(const message_t* other) const ZMQ_NOTHROW - { - if (size() != other->size()) - return false; - std::string a(data(), size()); - std::string b(other->data(), other->size()); - return a == b; - } +#ifdef ZMQ_HAS_RVALUE_REFS + inline context_t(context_t &&rhs) ZMQ_NOTHROW : ptr(rhs.ptr) { rhs.ptr = NULL; } + inline context_t &operator=(context_t &&rhs) ZMQ_NOTHROW + { + std::swap(ptr, rhs.ptr); + return *this; + } +#endif - private: - // The underlying message - zmq_msg_t msg; + inline int setctxopt(int option_, int optval_) + { + int rc = zmq_ctx_set(ptr, option_, optval_); + ZMQ_ASSERT(rc == 0); + return rc; + } + + inline int getctxopt(int option_) { return zmq_ctx_get(ptr, option_); } - // Disable implicit message copying, so that users won't use shared - // messages (less efficient) without being aware of the fact. - message_t (const message_t&) ZMQ_DELETED_FUNCTION; - void operator = (const message_t&) ZMQ_DELETED_FUNCTION; - }; + inline ~context_t() ZMQ_NOTHROW { close(); } - class context_t + inline void close() ZMQ_NOTHROW { - friend class socket_t; + if (ptr == NULL) + return; - public: - inline context_t () - { - ptr = zmq_ctx_new (); - if (ptr == NULL) - throw error_t (); - } + int rc = zmq_ctx_destroy(ptr); + ZMQ_ASSERT(rc == 0); + ptr = NULL; + } + // Be careful with this, it's probably only useful for + // using the C api together with an existing C++ api. + // Normally you should never need to use this. + inline ZMQ_EXPLICIT operator void *() ZMQ_NOTHROW { return ptr; } - inline explicit context_t (int io_threads_, int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT) - { - ptr = zmq_ctx_new (); - if (ptr == NULL) - throw error_t (); + inline ZMQ_EXPLICIT operator void const *() const ZMQ_NOTHROW { return ptr; } - int rc = zmq_ctx_set (ptr, ZMQ_IO_THREADS, io_threads_); - ZMQ_ASSERT (rc == 0); + inline operator bool() const ZMQ_NOTHROW { return ptr != NULL; } - rc = zmq_ctx_set (ptr, ZMQ_MAX_SOCKETS, max_sockets_); - ZMQ_ASSERT (rc == 0); - } + private: + void *ptr; + + context_t(const context_t &) ZMQ_DELETED_FUNCTION; + void operator=(const context_t &) ZMQ_DELETED_FUNCTION; +}; + +#ifdef ZMQ_CPP11 +enum class socket_type : int +{ + req = ZMQ_REQ, + rep = ZMQ_REP, + dealer = ZMQ_DEALER, + router = ZMQ_ROUTER, + pub = ZMQ_PUB, + sub = ZMQ_SUB, + xpub = ZMQ_XPUB, + xsub = ZMQ_XSUB, + push = ZMQ_PUSH, + pull = ZMQ_PULL, +#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0) + server = ZMQ_SERVER, + client = ZMQ_CLIENT, + radio = ZMQ_RADIO, + dish = ZMQ_DISH, +#endif +#if ZMQ_VERSION_MAJOR >= 4 + stream = ZMQ_STREAM, +#endif + pair = ZMQ_PAIR +}; +#endif + +class socket_t +{ + friend class monitor_t; + + public: + inline socket_t(context_t &context_, int type_) { init(context_, type_); } + +#ifdef ZMQ_CPP11 + inline socket_t(context_t &context_, socket_type type_) + { + init(context_, static_cast(type_)); + } +#endif #ifdef ZMQ_HAS_RVALUE_REFS - inline context_t (context_t &&rhs) ZMQ_NOTHROW : ptr (rhs.ptr) - { - rhs.ptr = NULL; - } - inline context_t &operator = (context_t &&rhs) ZMQ_NOTHROW - { - std::swap (ptr, rhs.ptr); - return *this; - } + inline socket_t(socket_t &&rhs) ZMQ_NOTHROW : ptr(rhs.ptr), ctxptr(rhs.ctxptr) + { + rhs.ptr = NULL; + rhs.ctxptr = NULL; + } + inline socket_t &operator=(socket_t &&rhs) ZMQ_NOTHROW + { + std::swap(ptr, rhs.ptr); + return *this; + } #endif - inline ~context_t () ZMQ_NOTHROW - { - close(); - } + inline ~socket_t() ZMQ_NOTHROW { close(); } - inline void close() ZMQ_NOTHROW - { - if (ptr == NULL) - return; + inline operator void *() ZMQ_NOTHROW { return ptr; } - int rc = zmq_ctx_destroy (ptr); - ZMQ_ASSERT (rc == 0); - ptr = NULL; - } + inline operator void const *() const ZMQ_NOTHROW { return ptr; } - // Be careful with this, it's probably only useful for - // using the C api together with an existing C++ api. - // Normally you should never need to use this. - inline ZMQ_EXPLICIT operator void* () ZMQ_NOTHROW - { - return ptr; - } + inline void close() ZMQ_NOTHROW + { + if (ptr == NULL) + // already closed + return; + int rc = zmq_close(ptr); + ZMQ_ASSERT(rc == 0); + ptr = 0; + } - inline ZMQ_EXPLICIT operator void const* () const ZMQ_NOTHROW - { - return ptr; - } - private: - - void *ptr; - - context_t (const context_t&) ZMQ_DELETED_FUNCTION; - void operator = (const context_t&) ZMQ_DELETED_FUNCTION; - }; - - #ifdef ZMQ_CPP11 - enum class socket_type: int - { - req = ZMQ_REQ, - rep = ZMQ_REP, - dealer = ZMQ_DEALER, - router = ZMQ_ROUTER, - pub = ZMQ_PUB, - sub = ZMQ_SUB, - xpub = ZMQ_XPUB, - xsub = ZMQ_XSUB, - push = ZMQ_PUSH, - pull = ZMQ_PULL, -#if ZMQ_VERSION_MAJOR < 4 - pair = ZMQ_PAIR -#else - pair = ZMQ_PAIR, - stream = ZMQ_STREAM -#endif - }; - #endif + template void setsockopt(int option_, T const &optval) + { + setsockopt(option_, &optval, sizeof(T)); + } - class socket_t + inline void setsockopt(int option_, const void *optval_, size_t optvallen_) { - friend class monitor_t; - public: - inline socket_t(context_t& context_, int type_) - { - init(context_, type_); - } + int rc = zmq_setsockopt(ptr, option_, optval_, optvallen_); + if (rc != 0) + throw error_t(); + } - #ifdef ZMQ_CPP11 - inline socket_t(context_t& context_, socket_type type_) - { - init(context_, static_cast(type_)); - } - #endif + inline void getsockopt(int option_, void *optval_, size_t *optvallen_) const + { + int rc = zmq_getsockopt(ptr, option_, optval_, optvallen_); + if (rc != 0) + throw error_t(); + } -#ifdef ZMQ_HAS_RVALUE_REFS - inline socket_t(socket_t&& rhs) ZMQ_NOTHROW : ptr(rhs.ptr) - { - rhs.ptr = NULL; - } - inline socket_t& operator=(socket_t&& rhs) ZMQ_NOTHROW - { - std::swap(ptr, rhs.ptr); - return *this; - } -#endif + template T getsockopt(int option_) const + { + T optval; + size_t optlen = sizeof(T); + getsockopt(option_, &optval, &optlen); + return optval; + } - inline ~socket_t () ZMQ_NOTHROW - { - close(); - } + inline void bind(std::string const &addr) { bind(addr.c_str()); } - inline operator void* () ZMQ_NOTHROW - { - return ptr; - } + inline void bind(const char *addr_) + { + int rc = zmq_bind(ptr, addr_); + if (rc != 0) + throw error_t(); + } - inline operator void const* () const ZMQ_NOTHROW - { - return ptr; - } + inline void unbind(std::string const &addr) { unbind(addr.c_str()); } - inline void close() ZMQ_NOTHROW - { - if(ptr == NULL) - // already closed - return ; - int rc = zmq_close (ptr); - ZMQ_ASSERT (rc == 0); - ptr = 0 ; - } + inline void unbind(const char *addr_) + { + int rc = zmq_unbind(ptr, addr_); + if (rc != 0) + throw error_t(); + } - template void setsockopt(int option_, T const& optval) - { - setsockopt(option_, &optval, sizeof(T) ); - } + inline void connect(std::string const &addr) { connect(addr.c_str()); } - inline void setsockopt (int option_, const void *optval_, - size_t optvallen_) - { - int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_); - if (rc != 0) - throw error_t (); - } + inline void connect(const char *addr_) + { + int rc = zmq_connect(ptr, addr_); + if (rc != 0) + throw error_t(); + } - inline void getsockopt (int option_, void *optval_, - size_t *optvallen_) const - { - int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_); - if (rc != 0) - throw error_t (); - } + inline void disconnect(std::string const &addr) { disconnect(addr.c_str()); } - template T getsockopt(int option_) const - { - T optval; - size_t optlen = sizeof(T); - getsockopt(option_, &optval, &optlen ); - return optval; - } + inline void disconnect(const char *addr_) + { + int rc = zmq_disconnect(ptr, addr_); + if (rc != 0) + throw error_t(); + } - inline void bind(std::string const& addr) - { - bind(addr.c_str()); - } + inline bool connected() const ZMQ_NOTHROW { return (ptr != NULL); } - inline void bind (const char *addr_) - { - int rc = zmq_bind (ptr, addr_); - if (rc != 0) - throw error_t (); - } + inline size_t send(const void *buf_, size_t len_, int flags_ = 0) + { + int nbytes = zmq_send(ptr, buf_, len_, flags_); + if (nbytes >= 0) + return (size_t) nbytes; + if (zmq_errno() == EAGAIN) + return 0; + throw error_t(); + } - inline void unbind(std::string const& addr) - { - unbind(addr.c_str()); - } + inline bool send(message_t &msg_, int flags_ = 0) + { + int nbytes = zmq_msg_send(&(msg_.msg), ptr, flags_); + if (nbytes >= 0) + return true; + if (zmq_errno() == EAGAIN) + return false; + throw error_t(); + } - inline void unbind (const char *addr_) - { - int rc = zmq_unbind (ptr, addr_); - if (rc != 0) - throw error_t (); - } + template bool send(T first, T last, int flags_ = 0) + { + zmq::message_t msg(first, last); + return send(msg, flags_); + } - inline void connect(std::string const& addr) - { - connect(addr.c_str()); - } +#ifdef ZMQ_HAS_RVALUE_REFS + inline bool send(message_t &&msg_, int flags_ = 0) { return send(msg_, flags_); } +#endif - inline void connect (const char *addr_) - { - int rc = zmq_connect (ptr, addr_); - if (rc != 0) - throw error_t (); - } + inline size_t recv(void *buf_, size_t len_, int flags_ = 0) + { + int nbytes = zmq_recv(ptr, buf_, len_, flags_); + if (nbytes >= 0) + return (size_t) nbytes; + if (zmq_errno() == EAGAIN) + return 0; + throw error_t(); + } - inline void disconnect(std::string const& addr) - { - disconnect(addr.c_str()); - } + inline bool recv(message_t *msg_, int flags_ = 0) + { + int nbytes = zmq_msg_recv(&(msg_->msg), ptr, flags_); + if (nbytes >= 0) + return true; + if (zmq_errno() == EAGAIN) + return false; + throw error_t(); + } - inline void disconnect (const char *addr_) - { - int rc = zmq_disconnect (ptr, addr_); - if (rc != 0) - throw error_t (); - } +#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0) + inline void join(const char* group) + { + int rc = zmq_join(ptr, group); + if (rc != 0) + throw error_t(); + } - inline bool connected() const ZMQ_NOTHROW - { - return(ptr != NULL); - } - - inline size_t send (const void *buf_, size_t len_, int flags_ = 0) - { - int nbytes = zmq_send (ptr, buf_, len_, flags_); - if (nbytes >= 0) - return (size_t) nbytes; - if (zmq_errno () == EAGAIN) - return 0; - throw error_t (); - } + inline void leave(const char* group) + { + int rc = zmq_leave(ptr, group); + if (rc != 0) + throw error_t(); + } +#endif - inline bool send (message_t &msg_, int flags_ = 0) - { - int nbytes = zmq_msg_send (&(msg_.msg), ptr, flags_); - if (nbytes >= 0) - return true; - if (zmq_errno () == EAGAIN) - return false; - throw error_t (); - } + private: + inline void init(context_t &context_, int type_) + { + ctxptr = context_.ptr; + ptr = zmq_socket(context_.ptr, type_); + if (ptr == NULL) + throw error_t(); + } + + void *ptr; + void *ctxptr; + + socket_t(const socket_t &) ZMQ_DELETED_FUNCTION; + void operator=(const socket_t &) ZMQ_DELETED_FUNCTION; +}; + +class monitor_t +{ + public: + monitor_t() : socketPtr(NULL), monitor_socket(NULL) {} + + virtual ~monitor_t() + { + if (socketPtr) + zmq_socket_monitor(socketPtr, NULL, 0); + + if (monitor_socket) + zmq_close(monitor_socket); + } - template bool send(I first, I last, int flags_=0) - { - zmq::message_t msg(first, last); - return send(msg, flags_); - } #ifdef ZMQ_HAS_RVALUE_REFS - inline bool send (message_t &&msg_, int flags_ = 0) - { - return send(msg_, flags_); - } + monitor_t(monitor_t &&rhs) ZMQ_NOTHROW : socketPtr(rhs.socketPtr), + monitor_socket(rhs.monitor_socket) + { + rhs.socketPtr = NULL; + rhs.monitor_socket = NULL; + } + + socket_t &operator=(socket_t &&rhs) ZMQ_DELETED_FUNCTION; #endif - inline size_t recv (void *buf_, size_t len_, int flags_ = 0) - { - int nbytes = zmq_recv (ptr, buf_, len_, flags_); - if (nbytes >= 0) - return (size_t) nbytes; - if (zmq_errno () == EAGAIN) - return 0; - throw error_t (); - } - inline bool recv (message_t *msg_, int flags_ = 0) - { - int nbytes = zmq_msg_recv (&(msg_->msg), ptr, flags_); - if (nbytes >= 0) - return true; - if (zmq_errno () == EAGAIN) - return false; - throw error_t (); - } - - private: - inline void init(context_t& context_, int type_) - { - ctxptr = context_.ptr; - ptr = zmq_socket (context_.ptr, type_ ); - if (ptr == NULL) - throw error_t (); + void + monitor(socket_t &socket, std::string const &addr, int events = ZMQ_EVENT_ALL) + { + monitor(socket, addr.c_str(), events); + } + + void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) + { + init(socket, addr_, events); + while (true) { + check_event(-1); } + } + + void init(socket_t &socket, std::string const &addr, int events = ZMQ_EVENT_ALL) + { + init(socket, addr.c_str(), events); + } + + void init(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) + { + int rc = zmq_socket_monitor(socket.ptr, addr_, events); + if (rc != 0) + throw error_t(); + + socketPtr = socket.ptr; + monitor_socket = zmq_socket(socket.ctxptr, ZMQ_PAIR); + assert(monitor_socket); - void *ptr; - void *ctxptr; + rc = zmq_connect(monitor_socket, addr_); + assert(rc == 0); - socket_t (const socket_t&) ZMQ_DELETED_FUNCTION; - void operator = (const socket_t&) ZMQ_DELETED_FUNCTION; - }; + on_monitor_started(); + } - class monitor_t + bool check_event(int timeout = 0) { - public: - monitor_t() : socketPtr(NULL) {} - virtual ~monitor_t() {} + assert(monitor_socket); - void monitor(socket_t &socket, std::string const& addr, int events = ZMQ_EVENT_ALL) - { - monitor(socket, addr.c_str(), events); + zmq_msg_t eventMsg; + zmq_msg_init(&eventMsg); + + zmq::pollitem_t items[] = { + {monitor_socket, 0, ZMQ_POLLIN, 0}, + }; + + zmq::poll(&items[0], 1, timeout); + + if (items[0].revents & ZMQ_POLLIN) { + int rc = zmq_msg_recv(&eventMsg, monitor_socket, 0); + if (rc == -1 && zmq_errno() == ETERM) + return false; + assert(rc != -1); + + } else { + zmq_msg_close(&eventMsg); + return false; } - void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) - { - int rc = zmq_socket_monitor(socket.ptr, addr_, events); - if (rc != 0) - throw error_t (); - - socketPtr = socket.ptr; - void *s = zmq_socket (socket.ctxptr, ZMQ_PAIR); - assert (s); - - rc = zmq_connect (s, addr_); - assert (rc == 0); - - on_monitor_started(); - - while (true) { - zmq_msg_t eventMsg; - zmq_msg_init (&eventMsg); - rc = zmq_msg_recv (&eventMsg, s, 0); - if (rc == -1 && zmq_errno() == ETERM) - break; - assert (rc != -1); #if ZMQ_VERSION_MAJOR >= 4 - const char* data = static_cast(zmq_msg_data(&eventMsg)); - zmq_event_t msgEvent; - memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t); - memcpy(&msgEvent.value, data, sizeof(int32_t)); - zmq_event_t* event = &msgEvent; + const char *data = static_cast(zmq_msg_data(&eventMsg)); + zmq_event_t msgEvent; + memcpy(&msgEvent.event, data, sizeof(uint16_t)); + data += sizeof(uint16_t); + memcpy(&msgEvent.value, data, sizeof(int32_t)); + zmq_event_t *event = &msgEvent; #else - zmq_event_t* event = static_cast(zmq_msg_data(&eventMsg)); + zmq_event_t *event = static_cast(zmq_msg_data(&eventMsg)); #endif - + #ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT - zmq_msg_t addrMsg; - zmq_msg_init (&addrMsg); - rc = zmq_msg_recv (&addrMsg, s, 0); - if (rc == -1 && zmq_errno() == ETERM) - break; - assert (rc != -1); - const char* str = static_cast(zmq_msg_data (&addrMsg)); - std::string address(str, str + zmq_msg_size(&addrMsg)); - zmq_msg_close (&addrMsg); + zmq_msg_t addrMsg; + zmq_msg_init(&addrMsg); + int rc = zmq_msg_recv(&addrMsg, monitor_socket, 0); + if (rc == -1 && zmq_errno() == ETERM) { + zmq_msg_close(&eventMsg); + return false; + } + + assert(rc != -1); + const char *str = static_cast(zmq_msg_data(&addrMsg)); + std::string address(str, str + zmq_msg_size(&addrMsg)); + zmq_msg_close(&addrMsg); #else - // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. - std::string address = event->data.connected.addr; + // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. + std::string address = event->data.connected.addr; #endif #ifdef ZMQ_EVENT_MONITOR_STOPPED - if (event->event == ZMQ_EVENT_MONITOR_STOPPED) - break; -#endif - - switch (event->event) { - case ZMQ_EVENT_CONNECTED: - on_event_connected(*event, address.c_str()); - break; - case ZMQ_EVENT_CONNECT_DELAYED: - on_event_connect_delayed(*event, address.c_str()); - break; - case ZMQ_EVENT_CONNECT_RETRIED: - on_event_connect_retried(*event, address.c_str()); - break; - case ZMQ_EVENT_LISTENING: - on_event_listening(*event, address.c_str()); - break; - case ZMQ_EVENT_BIND_FAILED: - on_event_bind_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_ACCEPTED: - on_event_accepted(*event, address.c_str()); - break; - case ZMQ_EVENT_ACCEPT_FAILED: - on_event_accept_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_CLOSED: - on_event_closed(*event, address.c_str()); - break; - case ZMQ_EVENT_CLOSE_FAILED: - on_event_close_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_DISCONNECTED: - on_event_disconnected(*event, address.c_str()); - break; - default: - on_event_unknown(*event, address.c_str()); - break; - } - zmq_msg_close (&eventMsg); - } - zmq_close (s); - socketPtr = NULL; + if (event->event == ZMQ_EVENT_MONITOR_STOPPED) { + zmq_msg_close(&eventMsg); + return false; + } + +#endif + + switch (event->event) { + case ZMQ_EVENT_CONNECTED: + on_event_connected(*event, address.c_str()); + break; + case ZMQ_EVENT_CONNECT_DELAYED: + on_event_connect_delayed(*event, address.c_str()); + break; + case ZMQ_EVENT_CONNECT_RETRIED: + on_event_connect_retried(*event, address.c_str()); + break; + case ZMQ_EVENT_LISTENING: + on_event_listening(*event, address.c_str()); + break; + case ZMQ_EVENT_BIND_FAILED: + on_event_bind_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_ACCEPTED: + on_event_accepted(*event, address.c_str()); + break; + case ZMQ_EVENT_ACCEPT_FAILED: + on_event_accept_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_CLOSED: + on_event_closed(*event, address.c_str()); + break; + case ZMQ_EVENT_CLOSE_FAILED: + on_event_close_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_DISCONNECTED: + on_event_disconnected(*event, address.c_str()); + break; +#ifdef ZMQ_BUILD_DRAFT_API +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3) + case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL: + on_event_handshake_failed_no_detail(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL: + on_event_handshake_failed_protocol(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH: + on_event_handshake_failed_auth(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_SUCCEEDED: + on_event_handshake_succeeded(*event, address.c_str()); + break; +#elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1) + case ZMQ_EVENT_HANDSHAKE_FAILED: + on_event_handshake_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_SUCCEED: + on_event_handshake_succeed(*event, address.c_str()); + break; +#endif +#endif + default: + on_event_unknown(*event, address.c_str()); + break; } + zmq_msg_close(&eventMsg); + + return true; + } #ifdef ZMQ_EVENT_MONITOR_STOPPED - void abort() - { - if (socketPtr) - zmq_socket_monitor(socketPtr, NULL, 0); + void abort() + { + if (socketPtr) + zmq_socket_monitor(socketPtr, NULL, 0); + + socketPtr = NULL; + } +#endif + virtual void on_monitor_started() {} + virtual void on_event_connected(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_connect_delayed(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_connect_retried(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_listening(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_bind_failed(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_accepted(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_accept_failed(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_closed(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_close_failed(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_disconnected(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3) + virtual void on_event_handshake_failed_no_detail(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_handshake_failed_protocol(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_handshake_failed_auth(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_handshake_succeeded(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } +#elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1) + virtual void on_event_handshake_failed(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_handshake_succeed(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } +#endif + virtual void on_event_unknown(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + + private: + monitor_t(const monitor_t &) ZMQ_DELETED_FUNCTION; + void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION; + + void *socketPtr; + void *monitor_socket; +}; + +#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) +template class poller_t +{ + public: + void add(zmq::socket_t &socket, short events, T *user_data) + { + if (0 + != zmq_poller_add(poller_ptr.get(), static_cast(socket), + user_data, events)) { + throw error_t(); + } + } + + void remove(zmq::socket_t &socket) + { + if (0 != zmq_poller_remove(poller_ptr.get(), static_cast(socket))) { + throw error_t(); } + } + + void modify(zmq::socket_t &socket, short events) + { + if (0 + != zmq_poller_modify(poller_ptr.get(), static_cast(socket), + events)) { + throw error_t(); + } + } + + size_t wait_all(std::vector &poller_events, + const std::chrono::microseconds timeout) + { + int rc = zmq_poller_wait_all(poller_ptr.get(), poller_events.data(), + static_cast(poller_events.size()), + static_cast(timeout.count())); + if (rc > 0) + return static_cast(rc); + +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3) + if (zmq_errno() == EAGAIN) +#else + if (zmq_errno() == ETIMEDOUT) #endif - virtual void on_monitor_started() {} - virtual void on_event_connected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_connect_delayed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_connect_retried(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_listening(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_bind_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_accepted(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_accept_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_closed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_close_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_disconnected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - private: - void* socketPtr; - }; + return 0; + + throw error_t(); + } + + private: + std::unique_ptr> poller_ptr{ + []() { + auto poller_new = zmq_poller_new(); + if (poller_new) + return poller_new; + throw error_t(); + }(), + [](void *ptr) { + int rc = zmq_poller_destroy(&ptr); + ZMQ_ASSERT(rc == 0); + }}; +}; +#endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) + +inline std::ostream &operator<<(std::ostream &os, const message_t &msg) +{ + return os << msg.str(); } -#endif +} // namespace zmq + +#endif // __ZMQ_HPP_INCLUDED__