Improve compression support: #3419

Closed (4 commits)
4 changes: 3 additions & 1 deletion src/ripple/overlay/Compression.h
@@ -31,7 +31,9 @@ namespace compression {
std::size_t constexpr headerBytes = 6;
std::size_t constexpr headerBytesCompressed = 10;

enum class Algorithm : std::uint8_t { None = 0x00, LZ4 = 0x01 };
// All values other than 'none' must have the high bit. The low order four bits
// must be 0.
enum class Algorithm : std::uint8_t { None = 0x00, LZ4 = 0x90 };
Collaborator

Why the change from 0x01 to 0x90?

Contributor Author

The new framing for compressed messages alters the first byte of the header, and specifically the first four bits: If the high bit is set, it means the message is compressed. The remaining three bits specify the compression algorithm used.

0x90 is 10010000 in binary: high bit set (compression enabled), low bit of the high nibble set (LZ4).

While reading your comment, it occurred to me to check when we set this value, and I noticed that the code there is wrong. It does:

*h |= 0x80 | (static_cast<uint8_t>(comprAlgorithm) << 4);

Obviously that's not the right thing; this should now be: *h |= static_cast<std::uint8_t>(comprAlgorithm); instead. For the record, this was a bug that I introduced into this PR, not @gregtatcam.

I'll push a [FOLD] commit to address this.

Thanks @MarkusTeufelberger! Sharp eyes!
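
Editor's sketch of the bit layout discussed in this thread, assuming only the enumerator values shown in the diff. The names `lz4`, `oldStyle` and `newStyle` are hypothetical and exist only for illustration. The compile-time checks show why the shift in the old expression no longer works once `LZ4` is `0x90`:

```cpp
#include <cstdint>

// Enumerator values as proposed in this PR.
enum class Algorithm : std::uint8_t { None = 0x00, LZ4 = 0x90 };

constexpr std::uint8_t lz4 = static_cast<std::uint8_t>(Algorithm::LZ4);

static_assert(lz4 == 0b1001'0000, "0x90 is 1001 0000 in binary");
static_assert((lz4 & 0x80) != 0, "high bit set: message is compressed");
static_assert(((lz4 & 0x70) >> 4) == 1, "next three bits hold 1 (LZ4)");
static_assert((lz4 & 0x0F) == 0, "low four bits stay clear");

// Old expression applied to the new value: the shift pushes the algorithm
// bits out of the byte, so only the compressed flag survives.
constexpr std::uint8_t oldStyle =
    static_cast<std::uint8_t>(0x80 | (lz4 << 4));
static_assert(oldStyle == 0x80, "algorithm bits were lost");

// Corrected expression: the enumerator is OR-ed in unchanged.
constexpr std::uint8_t newStyle = static_cast<std::uint8_t>(0x00 | lz4);
static_assert(newStyle == 0x90, "flag and algorithm bits both present");
```

With the old expression a receiver would see the compressed flag but algorithm 0, which matches no known algorithm.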


enum class Compressed : std::uint8_t { On, Off };

4 changes: 2 additions & 2 deletions src/ripple/overlay/Message.h
@@ -84,7 +84,7 @@ class Message : public std::enable_shared_from_this<Message>
* @param in Pointer to the payload
* @param payloadBytes Size of the payload excluding the header size
* @param type Protocol message type
* @param comprAlgorithm Compression algorithm used in compression,
* @param compression Compression algorithm used in compression,
* currently LZ4 only. If None then the message is uncompressed.
* @param uncompressedBytes Size of the uncompressed message
*/
@@ -93,7 +93,7 @@ class Message : public std::enable_shared_from_this<Message>
std::uint8_t* in,
std::uint32_t payloadBytes,
int type,
Algorithm comprAlgorithm,
Algorithm compression,
std::uint32_t uncompressedBytes);

/** Try to compress the payload.
3 changes: 3 additions & 0 deletions src/ripple/overlay/Peer.h
@@ -118,6 +118,9 @@ class Peer
cycleStatus() = 0;
virtual bool
hasRange(std::uint32_t uMin, std::uint32_t uMax) = 0;

virtual bool
compressionEnabled() const = 0;
};

} // namespace ripple
50 changes: 37 additions & 13 deletions src/ripple/overlay/impl/Message.cpp
@@ -109,22 +109,46 @@ Message::compress()
}

/** Set payload header
* Uncompressed message header
* 47-42 Set to 0
* 41-16 Payload size
* 15-0 Message Type
* Compressed message header
* 79 Set to 0, indicates the message is compressed
* 78-76 Compression algorithm, value 1-7. Set to 1 to indicate LZ4
* compression
* 75-74 Set to 0
* 73-48 Payload size
* 47-32 Message Type
* 31-0 Uncompressed message size
*/

The header is a variable-sized structure that contains information about
the type of the message and the length and encoding of the payload.

The first bit determines whether a message is compressed or uncompressed;
for compressed messages, the next three bits identify the compression
algorithm.

All multi-byte values are represented in big endian.

For uncompressed messages (6 bytes), numbering bits from left to right:

- The first 6 bits are set to 0.
- The next 26 bits represent the payload size.
- The remaining 16 bits represent the message type.

For compressed messages (10 bytes), numbering bits from left to right:

- The first 32 bits, together, represent the compression algorithm
and payload size:
  - The first bit is set to 1 to indicate the message is compressed.
  - The next 3 bits indicate the compression algorithm.
  - The next 2 bits are reserved at this time and set to 0.
  - The remaining 26 bits represent the payload size.
- The next 16 bits represent the message type.
- The remaining 32 bits are the uncompressed message size.

The maximum size of a message at this time is 64 MB. Messages larger than
this will be dropped and the recipient may, at its option, sever the link.

@note While nominally a part of the wire protocol, the framing is subject
to change; future versions of the code may negotiate the use of
substantially different framing.
*/
void
Message::setHeader(
std::uint8_t* in,
std::uint32_t payloadBytes,
int type,
Algorithm comprAlgorithm,
Algorithm compression,
std::uint32_t uncompressedBytes)
{
auto h = in;
@@ -142,10 +166,10 @@ Message::setHeader(
*in++ = static_cast<std::uint8_t>((type >> 8) & 0xFF);
*in++ = static_cast<std::uint8_t>(type & 0xFF);

if (comprAlgorithm != Algorithm::None)
if (compression != Algorithm::None)
{
pack(in, uncompressedBytes);
*h |= 0x80 | (static_cast<uint8_t>(comprAlgorithm) << 4);
*h |= static_cast<std::uint8_t>(compression);
}
}
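
Editor's sketch of the framing described in the new doc comment, written as a standalone function rather than the PR's `Message::setHeader`. The names `packBE32` and `writeHeader` are hypothetical, and the 26-bit mask on the payload size is an assumption made so the sketch never writes into the flag bits:

```cpp
#include <cstddef>
#include <cstdint>

enum class Algorithm : std::uint8_t { None = 0x00, LZ4 = 0x90 };

constexpr std::size_t headerBytes = 6;
constexpr std::size_t headerBytesCompressed = 10;

// Write `value` as four big-endian bytes and return the next write position.
inline std::uint8_t*
packBE32(std::uint8_t* out, std::uint32_t value)
{
    *out++ = static_cast<std::uint8_t>((value >> 24) & 0xFF);
    *out++ = static_cast<std::uint8_t>((value >> 16) & 0xFF);
    *out++ = static_cast<std::uint8_t>((value >> 8) & 0xFF);
    *out++ = static_cast<std::uint8_t>(value & 0xFF);
    return out;
}

// Fill `out` (at least 10 bytes) and return the number of header bytes used.
// Only the low 26 bits of payloadBytes are kept, so the top six bits of the
// first byte start out as zero.
inline std::size_t
writeHeader(
    std::uint8_t* out,
    std::uint32_t payloadBytes,
    std::uint16_t type,
    Algorithm compression,
    std::uint32_t uncompressedBytes)
{
    std::uint8_t* const first = out;
    out = packBE32(out, payloadBytes & 0x03FFFFFF);
    *out++ = static_cast<std::uint8_t>((type >> 8) & 0xFF);
    *out++ = static_cast<std::uint8_t>(type & 0xFF);

    if (compression == Algorithm::None)
        return headerBytes;

    packBE32(out, uncompressedBytes);
    // The enumerator already carries the compressed flag and algorithm bits.
    *first |= static_cast<std::uint8_t>(compression);
    return headerBytesCompressed;
}
```

For example, a compressed payload of 0x012345 bytes with message type 2 would start with the bytes 90 01 23 45 00 02, followed by the four-byte uncompressed size.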

6 changes: 6 additions & 0 deletions src/ripple/overlay/impl/PeerImp.h
@@ -428,6 +428,12 @@ class PeerImp : public Peer,
boost::optional<hash_map<PublicKey, ShardInfo>>
getPeerShardInfo() const;

bool
compressionEnabled() const override
{
return compressionEnabled_ == Compressed::On;
}

private:
void
close();
99 changes: 77 additions & 22 deletions src/ripple/overlay/impl/ProtocolMessage.h
@@ -120,51 +120,94 @@ buffersBegin(BufferSequence const& bufs)
bufs);
}

/** Parse a message header
* @return a seated optional if the message header was successfully
* parsed. An unseated optional otherwise, in which case
* @param ec contains more information:
* - set to `errc::success` if not enough bytes were present
* - set to `errc::no_message` if a valid header was not present
*/
template <class BufferSequence>
boost::optional<MessageHeader>
Contributor

Can we change this to std::optional? I think we are trying to migrate away from boost::optional

Contributor Author

We can. Shouldn't be a problem.

parseMessageHeader(BufferSequence const& bufs, std::size_t size)
parseMessageHeader(
Contributor

What do you think of putting the bitwise comparison inside of a named lambda, where the name describes what is being checked? I don't really like bitwise operations in a conditional directly, because it doesn't read that easily. It always takes me a second to figure out what's going on, and I think sometimes subtle bugs can occur. Remember this: #3417? At the least, I think we should add a comment saying exactly what is being checked.

Contributor

Actually, after reading through the whole function, I think we should abstract away these bitwise operations to a class, or a bunch of helper functions. Bitwise operations are pretty low level, and I think it's hard to read the code and easy to mess up. We could compromise on more judicious use of comments (I would put a clear comment before every bitwise operation), but I'd really prefer named functions; it's pretty easy for a developer to change the code and forget to update the comment, but it's harder to change the function implementation and forget to update the function name.

Contributor Author

I hear what you're saying, but I think that adding additional lambdas would only add confusion. Better comments would probably help; I'll try to follow up.
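
Editor's sketch of what the named-helper approach suggested above might look like. This is not code from the PR: the `sketch` namespace and every function name in it are hypothetical, and the masks are simply the ones that appear in the diff.

```cpp
#include <cstdint>

namespace sketch {

// Bit 7 of the first header byte: set when the message is compressed.
constexpr bool
isCompressed(std::uint8_t first)
{
    return (first & 0x80) != 0;
}

// Bits 3-2 are reserved in a compressed header and must be zero.
constexpr bool
reservedBitsClear(std::uint8_t first)
{
    return (first & 0x0C) == 0;
}

// The top six bits of an uncompressed header must all be zero.
constexpr bool
isUncompressedMarker(std::uint8_t first)
{
    return (first & 0xFC) == 0;
}

// Flag plus algorithm bits (the high nibble), comparable to Algorithm values.
constexpr std::uint8_t
compressionNibble(std::uint8_t first)
{
    return static_cast<std::uint8_t>(first & 0xF0);
}

}  // namespace sketch
```

A condition such as `if (*iter & 0x0C)` would then read `if (!sketch::reservedBitsClear(*iter))`. Whether that reads better than a commented mask is the judgment call debated in this thread.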

boost::system::error_code& ec,
BufferSequence const& bufs,
std::size_t size)
{
using namespace ripple::compression;
auto iter = buffersBegin(bufs);

MessageHeader hdr;
auto const compressed = (*iter & 0x80) == 0x80;
auto iter = buffersBegin(bufs);

// Check valid header
if ((*iter & 0xFC) == 0 || compressed)
if (*iter & 0x80)
{
hdr.header_size = compressed ? headerBytesCompressed : headerBytes;
hdr.header_size = headerBytesCompressed;

// not enough bytes to parse the header
if (size < hdr.header_size)
return {};
{
Collaborator

I prefer a style that checks for errors and returns early, as opposed to checking for success. That style tends to reduce nesting and indentation levels. Most of the code in this function is written this way, but this block isn't. It's a minor nit, so no change is needed, but since I don't have many comments on this PR I thought I'd point it out.
I.e.:

    if ((*iter & 0xFC) != 0)
    {
        ec = make_error_code(boost::system::errc::no_message);
        return boost::none;
    }
    hdr.header_size = headerBytes;
    // etc.

ec = make_error_code(boost::system::errc::success);
return boost::none;
}

if (*iter & 0x0C)
{
ec = make_error_code(boost::system::errc::protocol_error);
return boost::none;
}

// Keep only the high nibble; the low two bits belong to the payload size.
hdr.algorithm = static_cast<compression::Algorithm>(*iter & 0xF0);

if (compressed)
if (hdr.algorithm != compression::Algorithm::LZ4)
{
uint8_t algorithm = (*iter & 0x70) >> 4;
if (algorithm !=
static_cast<std::uint8_t>(compression::Algorithm::LZ4))
return {};
hdr.algorithm = compression::Algorithm::LZ4;
ec = make_error_code(boost::system::errc::protocol_error);
return boost::none;
}

for (int i = 0; i != 4; ++i)
hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
// clear the compression bits
hdr.payload_wire_size &= 0x03FFFFFF;

// clear the top four bits (the compression bits).
hdr.payload_wire_size &= 0x0FFFFFFF;

hdr.total_wire_size = hdr.header_size + hdr.payload_wire_size;

for (int i = 0; i != 2; ++i)
hdr.message_type = (hdr.message_type << 8) + *iter++;

if (compressed)
for (int i = 0; i != 4; ++i)
hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++;
for (int i = 0; i != 4; ++i)
hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++;

return hdr;
}

if ((*iter & 0xFC) == 0)
{
hdr.header_size = headerBytes;

if (size < hdr.header_size)
{
ec = make_error_code(boost::system::errc::success);
return boost::none;
}

hdr.algorithm = Algorithm::None;

for (int i = 0; i != 4; ++i)
hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;

hdr.uncompressed_size = hdr.payload_wire_size;
hdr.total_wire_size = hdr.header_size + hdr.payload_wire_size;

for (int i = 0; i != 2; ++i)
hdr.message_type = (hdr.message_type << 8) + *iter++;

return hdr;
}

return {};
ec = make_error_code(boost::system::errc::no_message);
return boost::none;
}
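
Editor's sketch of the three failure outcomes documented for `parseMessageHeader`, written against a plain byte vector and using `std::optional` and `std::error_code` (C++17) instead of the Boost types in the diff. `Header`, `readBE` and `parseHeader` are hypothetical names. The masking of the algorithm bits with `0xF0` is an assumption of this sketch, made so that the payload-size bits in the first byte do not affect the comparison.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <system_error>
#include <vector>

enum class Algorithm : std::uint8_t { None = 0x00, LZ4 = 0x90 };

struct Header
{
    std::size_t header_size = 0;
    std::uint32_t payload_wire_size = 0;
    std::uint32_t uncompressed_size = 0;
    std::uint16_t message_type = 0;
    Algorithm algorithm = Algorithm::None;
};

// Read `n` big-endian bytes starting at `p`.
inline std::uint32_t
readBE(std::uint8_t const* p, std::size_t n)
{
    std::uint32_t v = 0;
    for (std::size_t i = 0; i != n; ++i)
        v = (v << 8) | p[i];
    return v;
}

// Returns a header on success. On failure returns std::nullopt and sets `ec`:
// left clear when more bytes are needed, no_message for a bad marker,
// protocol_error for set reserved bits or an unknown algorithm.
inline std::optional<Header>
parseHeader(std::error_code& ec, std::vector<std::uint8_t> const& buf)
{
    ec.clear();
    if (buf.empty())
        return std::nullopt;

    std::uint8_t const first = buf[0];
    bool const compressed = (first & 0x80) != 0;

    if (!compressed && (first & 0xFC) != 0)
    {
        ec = std::make_error_code(std::errc::no_message);
        return std::nullopt;
    }

    Header hdr;
    hdr.header_size = compressed ? 10 : 6;
    if (buf.size() < hdr.header_size)
        return std::nullopt;  // not an error: wait for more data

    if (compressed)
    {
        // Reserved bits must be clear and LZ4 is the only known algorithm.
        if ((first & 0x0C) != 0 ||
            (first & 0xF0) != static_cast<std::uint8_t>(Algorithm::LZ4))
        {
            ec = std::make_error_code(std::errc::protocol_error);
            return std::nullopt;
        }
        hdr.algorithm = Algorithm::LZ4;
    }

    hdr.payload_wire_size = readBE(buf.data(), 4) & 0x03FFFFFF;
    hdr.message_type = static_cast<std::uint16_t>(readBE(buf.data() + 4, 2));
    hdr.uncompressed_size =
        compressed ? readBE(buf.data() + 6, 4) : hdr.payload_wire_size;
    return hdr;
}
```

A caller can tell "need more data" from a hard failure by checking `ec` whenever the returned optional is empty.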

template <
@@ -186,7 +229,7 @@ invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
std::vector<std::uint8_t> payload;
payload.resize(header.uncompressed_size);

auto payloadSize = ripple::compression::decompress(
auto const payloadSize = ripple::compression::decompress(
stream,
header.payload_wire_size,
payload.data(),
@@ -226,23 +269,35 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler)
if (size == 0)
return result;

auto header = detail::parseMessageHeader(buffers, size);
auto header = detail::parseMessageHeader(result.second, buffers, size);

// If we can't parse the header then it may be that we don't have enough
// bytes yet, or because the message was cut off.
// bytes yet, or because the message was cut off (if error_code is success).
// Otherwise we failed to match the header's marker (error_code is set to
// no_message) or the compression algorithm is invalid (error_code is
// protocol_error) and signal an error.
if (!header)
return result;

// We implement a maximum size for protocol messages. Sending a message
// whose size exceeds this may result in the connection being dropped. A
// larger message size may be supported in the future or negotiated as
// part of a protocol upgrade.
if (header->payload_wire_size > megabytes(64))
if (header->payload_wire_size > megabytes(64) ||
header->uncompressed_size > megabytes(64))
{
result.second = make_error_code(boost::system::errc::message_size);
return result;
}

// We requested uncompressed messages from the peer but received compressed.
if (!handler.compressionEnabled() &&
header->algorithm != compression::Algorithm::None)
{
result.second = make_error_code(boost::system::errc::protocol_error);
return result;
}

// We don't have the whole message yet. This isn't an error but we have
// nothing to do.
if (header->total_wire_size > size)
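
Editor's sketch of the order of checks applied after a header has been parsed, as they appear in the hunk above: size limits first, then the negotiated-compression check, then the "whole message not yet received" case. `ParsedHeader`, `Next`, `checkHeader` and `maxMessageBytes` are hypothetical names, and `megabytes(64)` is assumed to mean 64 * 1024 * 1024 bytes.

```cpp
#include <cstddef>
#include <cstdint>
#include <system_error>

enum class Algorithm : std::uint8_t { None = 0x00, LZ4 = 0x90 };

struct ParsedHeader
{
    std::size_t total_wire_size;
    std::uint32_t payload_wire_size;
    std::uint32_t uncompressed_size;
    Algorithm algorithm;
};

// Assumed equivalent of megabytes(64).
constexpr std::uint32_t maxMessageBytes = 64u * 1024u * 1024u;

enum class Next { Reject, Wait, Process };

inline Next
checkHeader(
    ParsedHeader const& hdr,
    std::size_t bytesAvailable,
    bool compressionNegotiated,
    std::error_code& ec)
{
    ec.clear();

    // Both the wire size and the declared uncompressed size are bounded.
    if (hdr.payload_wire_size > maxMessageBytes ||
        hdr.uncompressed_size > maxMessageBytes)
    {
        ec = std::make_error_code(std::errc::message_size);
        return Next::Reject;
    }

    // A compressed message is only acceptable if compression was negotiated.
    if (!compressionNegotiated && hdr.algorithm != Algorithm::None)
    {
        ec = std::make_error_code(std::errc::protocol_error);
        return Next::Reject;
    }

    // Not an error: the rest of the message has not arrived yet.
    if (hdr.total_wire_size > bytesAvailable)
        return Next::Wait;

    return Next::Process;
}
```

Checking `uncompressed_size` before decompressing matters because the receiver sizes its output buffer from that field.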
40 changes: 10 additions & 30 deletions src/test/overlay/compression_test.cpp
@@ -84,21 +84,14 @@ class compression_test : public beast::unit_test::suite
std::shared_ptr<T> proto,
protocol::MessageType mt,
uint16_t nbuffers,
const char* msg,
bool log = false)
std::string msg)
Collaborator

const&

{
if (log)
printf("=== compress/decompress %s ===\n", msg);
testcase("Compress/Decompress: " + msg);

Message m(*proto, mt);

auto& buffer = m.getBuffer(Compressed::On);

if (log)
printf(
"==> compressed, original %d bytes, compressed %d bytes\n",
(int)m.getBuffer(Compressed::Off).size(),
(int)m.getBuffer(Compressed::On).size());

boost::beast::multi_buffer buffers;

// simulate multi-buffer
@@ -112,26 +105,15 @@ class compression_test : public beast::unit_test::suite
buffers.commit(boost::asio::buffer_copy(
buffers.prepare(slice.size()), boost::asio::buffer(slice)));
}
auto header =
ripple::detail::parseMessageHeader(buffers.data(), buffer.size());

if (log)
printf(
"==> parsed header: buffers size %d, compressed %d, algorithm "
"%d, header size %d, payload size %d, buffer size %d\n",
(int)buffers.size(),
header->algorithm != Algorithm::None,
(int)header->algorithm,
(int)header->header_size,
(int)header->payload_wire_size,
(int)buffer.size());

boost::system::error_code ec;
auto header = ripple::detail::parseMessageHeader(
ec, buffers.data(), buffer.size());

BEAST_EXPECT(header);

if (header->algorithm == Algorithm::None)
{
if (log)
printf("==> NOT COMPRESSED\n");
return;
}

std::vector<std::uint8_t> decompressed;
decompressed.resize(header->uncompressed_size);
@@ -157,8 +139,6 @@ class compression_test : public beast::unit_test::suite
uncompressed.begin() + ripple::compression::headerBytes,
uncompressed.end(),
decompressed.begin()));
if (log)
printf("\n");
}

std::shared_ptr<protocol::TMManifests>
@@ -460,4 +440,4 @@ class compression_test : public beast::unit_test::suite
BEAST_DEFINE_TESTSUITE_MANUAL_PRIO(compression, ripple_data, ripple, 20);

} // namespace test
} // namespace ripple
} // namespace ripple