Replace quota-based throttling with blocking tx
If the socket send buffer is full, block until it has space. This allows
us to constrain the throughput externally, e.g., using Linux traffic
shaping tools. The biggest advantage of external throttling is that we
can also rate-limit the other (non-bitcoin) sources of traffic that are
sent over the satellite link and compete for its capacity.
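To illustrate the mechanism this relies on (a rough sketch with assumed
names and an arbitrary buffer size, not code from this patch): with a
blocking UDP socket, sendto() stalls once the kernel send buffer fills,
so an external shaper that drains the link slowly throttles the sender
automatically.

    #include <netinet/in.h>
    #include <sys/socket.h>

    // Sketch: the socket send buffer becomes the backpressure point.
    int make_tx_socket() {
        int fd = socket(AF_INET, SOCK_DGRAM, 0);
        int sndbuf = 64 * 1024; // example size; smaller => quicker backpressure
        setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf));
        // Left in blocking mode: sendto() now blocks while an external
        // traffic shaper drains the queue, capping our transmit rate.
        return fd;
    }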

With blocking transmissions, the quota-based throttling approach becomes
unnecessary. Removing it also avoids potential issues with inaccurate
sleeping and quota capping. In the new implementation, we poll the Tx
socket for space and switch queues when the socket would block (i.e.,
when the poll times out). This way, we can keep going on a faster socket
while the slower socket waits.
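A condensed sketch of that polling logic (hypothetical helper name and
simplified error handling; the actual change is in do_send_messages()
below):

    #include <poll.h>
    #include <cerrno>
    #include <cstdio>
    #include <cstring>

    // Returns false if no buffer space opens up within timeout_ms,
    // telling the caller to rotate to the next Tx queue.
    static bool wait_writable(int fd, int timeout_ms) {
        struct pollfd pfd = {};
        pfd.fd = fd;
        pfd.events = POLLOUT;
        const int ret = poll(&pfd, 1, timeout_ms);
        if (ret < 0)
            fprintf(stderr, "poll failed: %s\n", strerror(errno));
        return ret > 0; // 0 means timeout: the socket buffer is still full
    }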
blockstreamsatellite committed Jun 12, 2020
1 parent 5eb91ab commit 847ac2c
Showing 1 changed file with 20 additions and 35 deletions.
55 changes: 20 additions & 35 deletions src/udpnet.cpp
@@ -31,6 +31,7 @@
 #include <event2/event.h>

 #include <boost/thread.hpp>
+#include <poll.h>

 #include <atomic>
 #include <chrono>
@@ -176,11 +177,8 @@ struct PerGroupMessageQueue {

     uint64_t bw;
     bool multicast;
-    double byte_quota;
-    double target_bytes_per_sec;
-    double max_byte_quota_per_turn;
     struct PerQueueSendState state;
-    PerGroupMessageQueue() : bw(0), multicast(false), byte_quota(0), target_bytes_per_sec(0), max_byte_quota_per_turn(0) {}
+    PerGroupMessageQueue() : bw(0), multicast(false) {}
     PerGroupMessageQueue(PerGroupMessageQueue&& q) =delete;
 };
 static std::map<size_t, PerGroupMessageQueue> mapTxQueues;
@@ -1055,19 +1053,28 @@ static void do_send_messages() {

         PendingMessagesBuff* buff = &queue.buffs[send_state.buff_state.buff_id];
         std::tuple<CService, UDPMessage, unsigned int, uint64_t>& msg = buff->messagesPendingRingBuff[send_state.buff_state.nextPendingMessage];
-        queue.byte_quota += queue.target_bytes_per_sec * to_seconds(t_now - send_state.last_send);

-        /* Don't let this Tx group hold the link for too long */
-        if (queue.byte_quota > queue.max_byte_quota_per_turn)
-            queue.byte_quota = queue.max_byte_quota_per_turn;
-
-        while (queue.byte_quota > (std::get<2>(msg) + ip_udp_header_size) && send_state.buff_state.buff_id != -1) {
+        while (send_state.buff_state.buff_id != -1) {
             if (queue.multicast) {
                 assert((std::get<1>(msg).header.msg_type & UDP_MSG_TYPE_TYPE_MASK) == MSG_TYPE_BLOCK_HEADER ||
                        (std::get<1>(msg).header.msg_type & UDP_MSG_TYPE_TYPE_MASK) == MSG_TYPE_BLOCK_CONTENTS ||
                        (std::get<1>(msg).header.msg_type & UDP_MSG_TYPE_TYPE_MASK) == MSG_TYPE_TX_CONTENTS);
             }

+            // Check if we can write into the socket
+            struct pollfd pollfd = {};
+            pollfd.fd = udp_socks[group];
+            pollfd.events = POLLOUT;
+            const int timeout_ms = 1;
+            int nRet = poll(&pollfd, 1, timeout_ms);
+            if (nRet == 0) {
+                break; // Socket buffer is full. Move to the next Tx queue.
+            } else if (nRet < 0) {
+                LogPrintf("UDP: socket polling of group %d failed: %s\n",
+                          group, strerror(errno));
+                // TODO handle
+            }
+
             FillChecksum(std::get<3>(msg), std::get<1>(msg), std::get<2>(msg));

             sockaddr_storage ss = {};
@@ -1087,14 +1094,13 @@
                 addrlen = sizeof(sockaddr_in);
             }

-            if (sendto(udp_socks[group], &std::get<1>(msg), std::get<2>(msg), 0, (sockaddr *) &ss, addrlen) != std::get<2>(msg)) {
-                //TODO: Handle?
+            ssize_t res = sendto(udp_socks[group], &std::get<1>(msg), std::get<2>(msg), 0, (sockaddr *) &ss, addrlen);
+            if (res != std::get<2>(msg)) {
                 LogPrintf("UDP: sendto to group %d failed: %s\n",
                           group, strerror(errno));
+                continue; /* Try sending the message again */
             }

-            queue.byte_quota -= std::get<2>(msg) + ip_udp_header_size;
-
             send_state.buff_state.nextPendingMessage = (send_state.buff_state.nextPendingMessage + 1) % PENDING_MESSAGES_BUFF_SIZE;
             if (send_state.buff_state.nextPendingMessage == send_state.buff_state.nextUndefinedMessage) {
                 buff->nextPendingMessage = send_state.buff_state.nextPendingMessage;
@@ -1480,27 +1486,6 @@ static std::map<size_t, PerGroupMessageQueue> init_tx_queues(const std::vector<s
         }
     }

-    /* Throughput settings of each Tx group
-     *
-     * - On average, the transmission loop will try to respect
-     *   `target_bytes_per_sec` bytes/sec for each Tx group. This will be the
-     *   sum rate of all 3 message queues of the group.
-     *
-     * - Only one Tx group can be transmitting at a time and, hence, Tx groups
-     *   take turns. On each turn, there is a limit for the number of bytes the
-     *   Tx group can transmit. This is just so that other queues don't wait too
-     *   long for their turn. The limit is proportional to the group's
-     *   throughput and there is some margin such that the residual quota from
-     *   the previous turn is not lost due to capping.
-     *
-     * NOTE: -udpport sets bw in Mbps, while -udpmulticasttx sets in bps. */
-    for (auto it = mapQueues.begin(); it != mapQueues.end(); it++) {
-        double& target_bytes_per_sec = it->second.target_bytes_per_sec;
-        target_bytes_per_sec = it->second.bw * (it->second.multicast ? 1 : 1024 * 1024) / 8;
-        const double cap_margin = sizeof(UDPMessage) + ip_udp_header_size - 1;
-        it->second.max_byte_quota_per_turn = (target_bytes_per_sec * max_tx_group_turn_duration) + cap_margin;
-    }
-
     return mapQueues;
 }

