Skip to content

Commit

Permalink
[ZMQ] append a message sequence number to every ZMQ notification (#1082)
Browse files Browse the repository at this point in the history
* Zmq sequence (#1)

* Fixes ZMQ startup with bad arguments.

pr 7621

* [ZMQ] append a message sequence number to every ZMQ notification

- pr 7762
- contrib/zmq/zmq_sub.py to python 3 compatible

* typo in MSG_RAWTXLOCK

MMSG_RAWTXLOCK to MSG_RAWTXLOCK

* s/Bitcoind/dashd/
  • Loading branch information
chaeplin authored and UdjinM6 committed Oct 17, 2016
1 parent 7a45c93 commit 9db6b97
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 33 deletions.
46 changes: 26 additions & 20 deletions contrib/zmq/zmq_sub.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,51 @@
#!/usr/bin/env python2
#!/usr/bin/env python

import array
import binascii
import zmq
import struct

port = 28332

zmqContext = zmq.Context()
zmqSubSocket = zmqContext.socket(zmq.SUB)
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtxlock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtxlock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtx")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtxlock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawtx")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawtxlock")
zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)

try:
while True:
msg = zmqSubSocket.recv_multipart()
topic = str(msg[0])
topic = str(msg[0].decode("utf-8"))
body = msg[1]
sequence = "Unknown";

if len(msg[-1]) == 4:
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)

if topic == "hashblock":
print "- HASH BLOCK -"
print binascii.hexlify(body)
print('- HASH BLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == "hashtx":
print '- HASH TX -'
print binascii.hexlify(body)
print ('- HASH TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == "hashtxlock":
print '- HASH TX LOCK -'
print binascii.hexlify(body)
print('- HASH TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == "rawblock":
print "- RAW BLOCK HEADER -"
print binascii.hexlify(body[:80])
print('- RAW BLOCK HEADER ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == "rawtx":
print '- RAW TX -'
print binascii.hexlify(body)
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == "rawtxlock":
print '- RAW TX LOCK -'
print binascii.hexlify(body)
print('- RAW TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))

except KeyboardInterrupt:
zmqContext.destroy()
9 changes: 9 additions & 0 deletions doc/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ The following outputs are affected by this change:
- REST `/rest/block/` (JSON format when including extended tx details)
- `bitcoin-tx -json`

### ZMQ

Each ZMQ notification now contains an up-counting sequence number that allows
listeners to detect lost notifications.
The sequence number is always the last element in a multi-part ZMQ notification and
therefore backward compatible.
Each message type has its own counter.
(https://github.com/bitcoin/bitcoin/pull/7762)

### Configuration and command-line options

### Block and transaction handling
Expand Down
5 changes: 5 additions & 0 deletions doc/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,8 @@ using other means such as firewalling.
Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip.

There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type your are
using. Dashd appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications.
1 change: 0 additions & 1 deletion src/zmq/zmqnotificationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ bool CZMQNotificationInterface::Initialize()

if (i!=notifiers.end())
{
Shutdown();
return false;
}

Expand Down
43 changes: 31 additions & 12 deletions src/zmq/zmqpublishnotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@

static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;

static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_HASHTXLOCK = "hashtxlock";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_RAWTXLOCK = "rawtxlock";

// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
{
Expand Down Expand Up @@ -69,6 +76,7 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
if (rc!=0)
{
zmqError("Failed to bind address");
zmq_close(psocket);
return false;
}

Expand Down Expand Up @@ -117,15 +125,31 @@ void CZMQAbstractPublishNotifier::Shutdown()
psocket = 0;
}

bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
{
assert(psocket);

/* send three parts, command & data & a LE 4byte sequence number */
unsigned char msgseq[sizeof(uint32_t)];
WriteLE32(&msgseq[0], nSequence);
int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
if (rc == -1)
return false;

/* increment memory only sequence number after sending */
nSequence++;

return true;
}

bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
return rc == 0;
return SendMessage(MSG_HASHBLOCK, data, 32);
}

bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
Expand All @@ -135,8 +159,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
return rc == 0;
return SendMessage(MSG_HASHTX, data, 32);
}

bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransaction &transaction)
Expand All @@ -146,8 +169,7 @@ bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransa
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashtxlock", 10, data, 32, 0);
return rc == 0;
return SendMessage(MSG_HASHTXLOCK, data, 32);
}

bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
Expand All @@ -168,8 +190,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
ss << block;
}

int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
return rc == 0;
return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}

bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
Expand All @@ -178,8 +199,7 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction;
int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
return rc == 0;
return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}

bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransaction &transaction)
Expand All @@ -188,6 +208,5 @@ bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransac
LogPrint("zmq", "zmq: Publish rawtxlock %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction;
int rc = zmq_send_multipart(psocket, "rawtxlock", 9, &(*ss.begin()), ss.size(), 0);
return rc == 0;
return SendMessage(MSG_RAWTXLOCK, &(*ss.begin()), ss.size());
}
12 changes: 12 additions & 0 deletions src/zmq/zmqpublishnotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,19 @@ class CBlockIndex;

class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{
private:
uint32_t nSequence; // upcounting per message sequence number

public:

/* send zmq multipart message
parts:
* command
* data
* message sequence number
*/
bool SendMessage(const char *command, const void* data, size_t size);

bool Initialize(void *pcontext);
void Shutdown();
};
Expand Down

0 comments on commit 9db6b97

Please sign in to comment.