forked from wttw/zeromqt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ZmqSocket.cc
137 lines (117 loc) · 2.67 KB
/
ZmqSocket.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <zmq.h>
#include <errno.h>
#include <stdint.h>
#include "ZmqSocket.h"
#include "ZmqContext.h"
#include <QtDebug>
#if 1
#define ZMQERRNO errno
#else
#define ZMQERRNO zmq_errno()
#endif
#define USE_EXCEPTIONS 1
#ifdef USE_EXCEPTIONS
#include "ZmqException.h"
#define THROW() throw ZmqException()
#else
#define THROW() do { error_ = ZMQERRNO; \
errorString_ = QString::fromAscii(zmq_strerror(error_)); }
#endif
ZmqSocket::ZmqSocket(int type, QObject *parent) :
QObject(parent)
{
ZmqContext *ctx = ZmqContext::instance();
socket_ = zmq_socket(ctx->context_, type);
setLinger(ctx->linger());
// FIXME - on Windows getsockopt returns a SOCKET opaque handle
int fd;
size_t size=sizeof(fd);
getOpt(ZMQ_FD, &fd, &size);
notifier_ = new QSocketNotifier(fd, QSocketNotifier::Read, this);
connect(notifier_, SIGNAL(activated(int)), this, SLOT(activity()));
}
ZmqSocket::~ZmqSocket()
{
zmq_close(socket_);
}
bool ZmqSocket::getOpt(int opt_name, void *opt_value, size_t *opt_length) {
if(zmq_getsockopt(socket_, opt_name, opt_value, opt_length)) {
THROW();
return false;
}
return true;
}
bool ZmqSocket::setOpt(int opt_name, const void *opt_value, size_t opt_length) {
if(zmq_setsockopt(socket_, opt_name, opt_value, opt_length)) {
THROW();
return false;
}
return true;
}
void ZmqSocket::setIdentity(const QByteArray &name)
{
setOpt(ZMQ_IDENTITY, const_cast<char*>(name.constData()), name.size());
}
QByteArray ZmqSocket::identity()
{
char buff[256];
size_t size = sizeof(buff);
if(getOpt(ZMQ_IDENTITY, buff, &size)) {
return QByteArray(buff, size);
}
return QByteArray();
}
void ZmqSocket::setLinger(int msec)
{
setOpt(ZMQ_LINGER, msec);
}
int ZmqSocket::linger()
{
int msec=-1;
size_t size = sizeof(msec);
getOpt(ZMQ_LINGER, &msec, &size);
return msec;
}
void ZmqSocket::subscribe(const QByteArray &filter)
{
setOpt(ZMQ_SUBSCRIBE, filter);
}
void ZmqSocket::unsubscribe(const QByteArray &filter)
{
setOpt(ZMQ_UNSUBSCRIBE, filter);
}
void ZmqSocket::activity()
{
uint32_t flags;
size_t size = sizeof(flags);
if(!getOpt(ZMQ_EVENTS, &flags, &size)) {
qWarning("Error reading ZMQ_EVENTS in ZMQSocket::activity");
return;
}
if(flags & ZMQ_POLLIN) {
emit readyRead();
}
if(flags & ZMQ_POLLOUT) {
emit readyWrite();
}
if(flags & ZMQ_POLLERR) {
// ?
}
}
void ZmqSocket::bind(const char *addr_)
{
if(0 != zmq_bind(socket_, addr_)) THROW();
}
void ZmqSocket::connectTo(const char *addr_)
{
if(0 != zmq_connect(socket_, addr_)) THROW();
}
QList<QByteArray> ZmqSocket::recv() {
QList<QByteArray> ret;
ZmqMessage m;
while(recv(&m)) {
ret.append(m.toByteArray());
m.clear();
}
return ret;
}