diff --git a/include/ignition/transport/Helpers.hh b/include/ignition/transport/Helpers.hh index f7e222158..338f33b9b 100644 --- a/include/ignition/transport/Helpers.hh +++ b/include/ignition/transport/Helpers.hh @@ -34,8 +34,8 @@ #define STR(x) STR_HELPER(x) // Avoid using deprecated message send/receive function when possible. -#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0) - #define IGN_ZMQ_POST_4_4_0 +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1) + #define IGN_ZMQ_POST_4_3_1 #endif namespace ignition diff --git a/src/Node.cc b/src/Node.cc index de930e549..551717833 100644 --- a/src/Node.cc +++ b/src/Node.cc @@ -619,8 +619,13 @@ bool Node::Unsubscribe(const std::string &_topic) if (!this->dataPtr->shared->localSubscribers .HasSubscriber(fullyQualifiedTopic)) { +#if (CPPZMQ_VERSION >= 40700) + this->dataPtr->shared->dataPtr->subscriber->set( + zmq::sockopt::unsubscribe, fullyQualifiedTopic); +#else this->dataPtr->shared->dataPtr->subscriber->setsockopt( ZMQ_UNSUBSCRIBE, fullyQualifiedTopic.data(), fullyQualifiedTopic.size()); +#endif } // Notify to the publishers that I am no longer interested in the topic. diff --git a/src/NodeShared.cc b/src/NodeShared.cc index 2ecfde07d..4c1c89a77 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -91,7 +91,7 @@ bool userPass(std::string &_user, std::string &_pass) ////////////////////////////////////////////////// // Helper to send messages -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 int sendHelper(zmq::socket_t &_pub, const std::string &_data, const zmq::send_flags &_type) { @@ -109,7 +109,7 @@ int sendHelper(zmq::socket_t &_pub, const std::string &_data, int _type) { zmq::message_t msg(_data.data(), _data.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 zmq::send_flags flags = zmq::send_flags::none; switch (_type) { @@ -136,7 +136,7 @@ std::string receiveHelper(zmq::socket_t &_socket) { zmq::message_t msg(0); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!_socket.recv(msg)) #else if (!_socket.recv(&msg, 0)) @@ -152,7 +152,7 @@ std::string receiveHelper(zmq::socket_t &_socket) void sendAuthErrorHelper(zmq::socket_t &_socket, const std::string &_err) { std::cerr << _err << std::endl; -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 sendHelper(_socket, "400", zmq::send_flags::sndmore); sendHelper(_socket, _err, zmq::send_flags::sndmore); sendHelper(_socket, "", zmq::send_flags::sndmore); @@ -331,7 +331,7 @@ bool NodeShared::Publish( // Send the messages std::lock_guard lock(this->mutex); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->publisher->send(msg0, zmq::send_flags::sndmore); this->dataPtr->publisher->send(msg1, zmq::send_flags::sndmore); this->dataPtr->publisher->send(msg2, zmq::send_flags::sndmore); @@ -367,7 +367,7 @@ void NodeShared::RecvMsgUpdate() try { -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->subscriber->recv(msg)) #else if (!this->dataPtr->subscriber->recv(&msg, 0)) @@ -376,7 +376,7 @@ void NodeShared::RecvMsgUpdate() topic = std::string(reinterpret_cast(msg.data()), msg.size()); // TODO(caguero): Use this as extra metadata for the subscriber. -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->subscriber->recv(msg)) #else if (!this->dataPtr->subscriber->recv(&msg, 0)) @@ -384,7 +384,7 @@ void NodeShared::RecvMsgUpdate() return; // sender = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->subscriber->recv(msg)) #else if (!this->dataPtr->subscriber->recv(&msg, 0)) @@ -392,7 +392,7 @@ void NodeShared::RecvMsgUpdate() return; data = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->subscriber->recv(msg)) #else if (!this->dataPtr->subscriber->recv(&msg, 0)) @@ -574,14 +574,14 @@ void NodeShared::RecvSrvRequest() try { -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) #endif return; -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -589,7 +589,7 @@ void NodeShared::RecvSrvRequest() return; topic = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -597,7 +597,7 @@ void NodeShared::RecvSrvRequest() return; sender = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -605,7 +605,7 @@ void NodeShared::RecvSrvRequest() return; dstId = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -613,7 +613,7 @@ void NodeShared::RecvSrvRequest() return; nodeUuid = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -621,7 +621,7 @@ void NodeShared::RecvSrvRequest() return; reqUuid = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -629,7 +629,7 @@ void NodeShared::RecvSrvRequest() return; req = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -637,7 +637,7 @@ void NodeShared::RecvSrvRequest() return; reqType = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->replier->recv(msg)) #else if (!this->dataPtr->replier->recv(&msg, 0)) @@ -700,7 +700,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(dstId.size()); memcpy(response.data(), dstId.data(), dstId.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::sndmore); #else this->dataPtr->replier->send(response, ZMQ_SNDMORE); @@ -708,7 +708,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(topic.size()); memcpy(response.data(), topic.data(), topic.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::sndmore); #else this->dataPtr->replier->send(response, ZMQ_SNDMORE); @@ -716,7 +716,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(nodeUuid.size()); memcpy(response.data(), nodeUuid.data(), nodeUuid.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::sndmore); #else this->dataPtr->replier->send(response, ZMQ_SNDMORE); @@ -724,7 +724,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(reqUuid.size()); memcpy(response.data(), reqUuid.data(), reqUuid.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::sndmore); #else this->dataPtr->replier->send(response, ZMQ_SNDMORE); @@ -732,7 +732,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(rep.size()); memcpy(response.data(), rep.data(), rep.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::sndmore); #else this->dataPtr->replier->send(response, ZMQ_SNDMORE); @@ -740,7 +740,7 @@ void NodeShared::RecvSrvRequest() response.rebuild(resultStr.size()); memcpy(response.data(), resultStr.data(), resultStr.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->replier->send(response, zmq::send_flags::none); #else this->dataPtr->replier->send(response, 0); @@ -780,14 +780,14 @@ void NodeShared::RecvSrvResponse() try { -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) #endif return; -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) @@ -795,7 +795,7 @@ void NodeShared::RecvSrvResponse() return; topic = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) @@ -803,7 +803,7 @@ void NodeShared::RecvSrvResponse() return; nodeUuid = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) @@ -811,7 +811,7 @@ void NodeShared::RecvSrvResponse() return; reqUuid = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) @@ -819,7 +819,7 @@ void NodeShared::RecvSrvResponse() return; rep = std::string(reinterpret_cast(msg.data()), msg.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 if (!this->dataPtr->responseReceiver->recv(msg)) #else if (!this->dataPtr->responseReceiver->recv(&msg, 0)) @@ -953,7 +953,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(responserId.size()); memcpy(msg.data(), responserId.data(), responserId.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -961,7 +961,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(_topic.size()); memcpy(msg.data(), _topic.data(), _topic.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -970,7 +970,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(this->myRequesterAddress.size()); memcpy(msg.data(), this->myRequesterAddress.data(), this->myRequesterAddress.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -979,7 +979,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, std::string myId = this->responseReceiverId.ToString(); msg.rebuild(myId.size()); memcpy(msg.data(), myId.data(), myId.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -987,7 +987,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(nodeUuid.size()); memcpy(msg.data(), nodeUuid.data(), nodeUuid.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -995,7 +995,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(reqUuid.size()); memcpy(msg.data(), reqUuid.data(), reqUuid.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -1003,7 +1003,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(data.size()); memcpy(msg.data(), data.data(), data.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -1011,7 +1011,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(_reqType.size()); memcpy(msg.data(), _reqType.data(), _reqType.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::sndmore); #else this->dataPtr->requester->send(msg, ZMQ_SNDMORE); @@ -1019,7 +1019,7 @@ void NodeShared::SendPendingRemoteReqs(const std::string &_topic, msg.rebuild(_repType.size()); memcpy(msg.data(), _repType.data(), _repType.size()); -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 this->dataPtr->requester->send(msg, zmq::send_flags::none); #else this->dataPtr->requester->send(msg, 0); @@ -1068,8 +1068,12 @@ void NodeShared::OnNewConnection(const MessagePublisher &_pub) this->dataPtr->subscriber->connect(addr.c_str()); // Add a new filter for the topic. +#if (CPPZMQ_VERSION >= 40700) + this->dataPtr->subscriber->set(zmq::sockopt::subscribe, topic); +#else this->dataPtr->subscriber->setsockopt(ZMQ_SUBSCRIBE, topic.data(), topic.size()); +#endif // Register the new connection with the publisher. this->connections.AddPublisher(_pub); @@ -1257,10 +1261,14 @@ bool NodeShared::InitializeSockets() // Initialize security this->dataPtr->SecurityInit(); - char bindEndPoint[1024]; + int lingerVal = 0; +#if (CPPZMQ_VERSION >= 40700) + this->dataPtr->publisher->set(zmq::sockopt::linger, lingerVal); +#else this->dataPtr->publisher->setsockopt(ZMQ_LINGER, &lingerVal, sizeof(lingerVal)); +#endif // Set the capacity of the buffer for receiving messages. std::string ignRcvHwm; @@ -1293,8 +1301,12 @@ bool NodeShared::InitializeSockets() << std::endl; } } +#if (CPPZMQ_VERSION >= 40700) + this->dataPtr->subscriber->set(zmq::sockopt::rcvhwm, rcvQueueVal); +#else this->dataPtr->subscriber->setsockopt(ZMQ_RCVHWM, &rcvQueueVal, sizeof(rcvQueueVal)); +#endif // Set the capacity of the buffer for sending messages. std::string ignSndHwm; @@ -1327,6 +1339,34 @@ bool NodeShared::InitializeSockets() << std::endl; } } +#if (CPPZMQ_VERSION >= 40700) + this->dataPtr->publisher->set(zmq::sockopt::sndhwm, sndQueueVal); + + this->dataPtr->publisher->bind(anyTcpEp.c_str()); + this->myAddress = + this->dataPtr->publisher->get(zmq::sockopt::last_endpoint); + + // ResponseReceiver socket listening in a random port. + std::string id = this->responseReceiverId.ToString(); + this->dataPtr->responseReceiver->set(zmq::sockopt::routing_id, id); + this->dataPtr->responseReceiver->bind(anyTcpEp.c_str()); + this->myRequesterAddress = this->dataPtr->responseReceiver->get( + zmq::sockopt::last_endpoint); + + // Replier socket listening in a random port. + id = this->replierId.ToString(); + this->dataPtr->replier->set(zmq::sockopt::routing_id, id); + int routeOn = 1; + this->dataPtr->replier->set(zmq::sockopt::linger, lingerVal); + this->dataPtr->replier->set(zmq::sockopt::router_mandatory, routeOn); + this->dataPtr->replier->bind(anyTcpEp.c_str()); + this->myReplierAddress = + this->dataPtr->replier->get(zmq::sockopt::last_endpoint); + + this->dataPtr->requester->set(zmq::sockopt::linger, lingerVal); + this->dataPtr->requester->set(zmq::sockopt::router_mandatory, routeOn); +#else + char bindEndPoint[1024]; this->dataPtr->publisher->setsockopt(ZMQ_SNDHWM, &sndQueueVal, sizeof(sndQueueVal)); @@ -1361,6 +1401,7 @@ bool NodeShared::InitializeSockets() &lingerVal, sizeof(lingerVal)); this->dataPtr->requester->setsockopt(ZMQ_ROUTER_MANDATORY, &RouteOn, sizeof(RouteOn)); +#endif } catch(const zmq::error_t& ze) { @@ -1396,10 +1437,14 @@ bool NodeShared::AdvertisePublisher(const ServicePublisher &_publisher) int NodeShared::RcvHwm() { int rcvHwm; - size_t rcvHwmSize = sizeof(rcvHwm); try { +#if (CPPZMQ_VERSION >= 40700) + rcvHwm = this->dataPtr->subscriber->get(zmq::sockopt::rcvhwm); +#else + size_t rcvHwmSize = sizeof(rcvHwm); this->dataPtr->subscriber->getsockopt(ZMQ_RCVHWM, &rcvHwm, &rcvHwmSize); +#endif } catch (zmq::error_t &_e) { @@ -1413,10 +1458,14 @@ int NodeShared::RcvHwm() int NodeShared::SndHwm() { int sndHwm; - size_t sndHwmSize = sizeof(sndHwm); try { +#if (CPPZMQ_VERSION >= 40700) + sndHwm = this->dataPtr->publisher->get(zmq::sockopt::sndhwm); +#else + size_t sndHwmSize = sizeof(sndHwm); this->dataPtr->publisher->getsockopt(ZMQ_SNDHWM, &sndHwm, &sndHwmSize); +#endif } catch (zmq::error_t &_e) { @@ -1512,8 +1561,13 @@ void NodeSharedPrivate::SecurityOnNewConnection() // See issue #74 if (userPass(user, pass)) { +#if (CPPZMQ_VERSION >= 40700) + this->subscriber->set(zmq::sockopt::plain_username, user); + this->subscriber->set(zmq::sockopt::plain_password, pass); +#else this->subscriber->setsockopt(ZMQ_PLAIN_USERNAME, user.c_str(), user.size()); this->subscriber->setsockopt(ZMQ_PLAIN_PASSWORD, pass.c_str(), pass.size()); +#endif } } @@ -1531,11 +1585,16 @@ void NodeSharedPrivate::SecurityInit() int asPlainSecurityServer = static_cast( ZmqPlainSecurityServerOptions::ZMQ_PLAIN_SECURITY_SERVER_ENABLED); + +#if (CPPZMQ_VERSION >= 40700) + this->publisher->set(zmq::sockopt::plain_server, asPlainSecurityServer); + this->publisher->set(zmq::sockopt::zap_domain, kIgnAuthDomain); +#else this->publisher->setsockopt(ZMQ_PLAIN_SERVER, &asPlainSecurityServer, sizeof(asPlainSecurityServer)); - this->publisher->setsockopt(ZMQ_ZAP_DOMAIN, kIgnAuthDomain, std::strlen(kIgnAuthDomain)); +#endif } } @@ -1645,7 +1704,7 @@ void NodeSharedPrivate::AccessControlHandler() continue; } -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 sendHelper(*sock, version, zmq::send_flags::sndmore); sendHelper(*sock, sequence, zmq::send_flags::sndmore); #else @@ -1656,7 +1715,7 @@ void NodeSharedPrivate::AccessControlHandler() // Check the username and password if (givenUsername == user && givenPassword == pass) { -#ifdef IGN_ZMQ_POST_4_4_0 +#ifdef IGN_ZMQ_POST_4_3_1 sendHelper(*sock, "200", zmq::send_flags::sndmore); sendHelper(*sock, "OK", zmq::send_flags::sndmore); sendHelper(*sock, "anonymous", zmq::send_flags::sndmore);