Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add crc32c-checksum verification on message header-payload #43

Merged
merged 1 commit into from
Oct 11, 2016

Conversation

rdhabalia
Copy link
Contributor

Motivation

  • Right now, Client and Broker computes checksum on entire message rather on payload-data only.
  • Also Pulsar uses XXHashChecksum algorithm to compute checksum and SSE4.2CRC32C checksum uses machine-instruction which is faster than XXHashChecksum.
  • If client receives checksum error from broker then it keep retries with the same message again rather recomputing checksum and failing message if message is already corrupted.

Modifications

  • Replace XXHashChecksum with SSE4.2CRC32c checksum.
  • Compute checksum on payload-data only. So, added checksum: magicByte + checksum-value fields with in message-command.
  • Client try to recover if it receives checksum error from server and fails message if message is already corrupted.
  • Right now, default: checksum verification is disabled at client-producer side.

Result

Client and Broker can do SSE4.2CRC32c checksum to identify message corruption.

*/
protected boolean updateChecksumIfRequire(OpSendMsg op) {

if (op.cmd instanceof DoubleByteBuf) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, to avoid creation of new ByteBuf we modify same DoubleByteBuf of the message with newly computed checksum.
However, while message creation if we see memory-leak then we create SimpleLeakAwareByteBuf or AdvancedLeakAwareByteBuf (based on ResourceLeak Level) instead DoubleByteBuf. So, should we keep this check.

@yahoocla
Copy link

CLA is valid!

@@ -51,7 +51,7 @@ message MessageMetadata {
optional CompressionType compression = 8 [default = NONE];
optional uint32 uncompressed_size = 9 [default = 0];
// XXHash64 checksum of the original message payload
optional sfixed64 checksum = 10;
//optional sfixed64 checksum = 10;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment to mention this field was removed in favor of header+payload checksum

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.. mentioned in comment.

@@ -406,18 +428,23 @@ private static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) {
return buf;
}

private static ByteBuf serializeCommandSendWithSize(BaseCommand.Builder cmdBuilder, MessageMetadata msgMetadata,
private static ByteBuf serializeCommandSendWithSize(BaseCommand.Builder cmdBuilder, boolean includeChecksum, MessageMetadata msgMetadata,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer an enum value instead of boolean to make it easier to read where the function is called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. added enum ChecksumType instead boolean

if (includeChecksum) {
headers.writeShort(magicCrc32c);
checksumReaderIndex = headers.writerIndex();
headers.writeZero(4); // write dummy checksum int to skip 4 bytes in write index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can just move the writerIndex 4 bytes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.. made change to skip 4 bytes in writerIndex

CommandSendError sendError = sendErrorBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.SEND_ERROR).setSendError(sendError));
sendErrorBuilder.recycle();
sendError.recycle();
return res;
}


public static Long readChecksum(ByteBuf buffer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to have 2 methods :

boolean hasChecksum(ByteBuf headersAndPayload);
int readChecksum(ByteBuf headersAndPayload);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

introduced hasChecksum() again.

headers.retain();
payload.retain();
headers.readerIndex(metadataReaderIndex);
ByteBuf msgMetadataBuf = DoubleByteBuf.get(headers, payload);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a DoubleByteBuf, wouldn't it be easier to compute the checksum incrementally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually there are two things:

  • Circe java implementation doesn't have incremental checksum support.
 public int resume(int current, long address, long length) {
        throw new UnsupportedOperationException();
  }
  • and client side: payload is type of UnpooledHeapByteBuf. So, we can't use memoryAddress so, java implementation also doesn't have incremental checksum using array.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After #44 gets merged I guess we should make it work with incremental checksum for all kinds of buffers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored Code which avoids creation of new ByteBuf and computes checksum on same DoubleByteBuf which doesn't need incremental checksum as well.

* @param op
* @return isUpdated: returns true only if checksum is updated in {@link OpSendMsg}
*/
protected boolean updateChecksumIfRequire(OpSendMsg op) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method name is a bit awkward. What about verifyLocalBufferIsNotCorrupted() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, this method not only verifies checksum but also update checksum if it doesn't match with newly computed checksum. Test-case
Caller

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhm, in any case it should not "update" the checksum.. If the checksum doesn't match we just need to give error to the application

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed method name as verifyLocalBufferIsNotCorrupted and it just verifies checksum, if it is different then fails the callback else we will retry send-message again.

msg.resetReaderIndex();
}
} else {
log.warn("[{}] [{}] Memory leak detected while creating message with id {}", topic, producerName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When can this happen? And why would that be a memory leak?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have seen it before in past. I will try to see if I can reproduce.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the very first object of DoubleByteBuf is created as SimpleLeakAwareByteBuf or AdvancedLeakAwareByteBuf based on ResourceLeak Level. And it can be reproduced by starting any test-case and put the break-point at Commands where we create DoubleByteBuf. and at very first time it will create LeakAwareByteBuf.

It is because of following code which netty has:

         if ((leakCheckCnt ++ & mask) == 0) {
                reportLeak(level);
                return new DefaultResourceLeak(obj);
            } else {
                return null;
            }

At very first time leakCheckCnt will be 0 and it reports leak and it cause to create LeakAwareByteBuf

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merged fixed at netty

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the leak is a false positive. The commit in netty doesn't change the substance of the leak detector. By default, when leak detection level is simple, it will pick 1 out 100 allocated buffers and instrument it for leak detection.

Whether you start picking the 1st buffer or the 101st, only changes when you're going to detect the leak.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that you've fixed the incremental crc part, we should get rid of the DoubleByteBuf here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.. with netty leak-detection my only concern was that there could be a possibility where we create LeakAwareByteBuf and that can fail DoubleByteBuf casting.

  • even with use of incremental-checksum, we need two buffers (b1, b2) that present into created DoubleByteBuf and to retrieve those buffers, we might have to cast into DoubleByteBuf. so, we may not be able to get rid of DoubleByteBuf right.?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, yes.. we can avoid DoubleByteBuf

*
* @param op
*/
private static void removeChecksum(OpSendMsg op) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stripChecksum() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. Renamed method name as stripChecksum()

temp.skipBytes(cmdSize);
boolean hasChecksum = Commands.readChecksum(temp) != null;

if (hasChecksum) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (!hasChecksum) {
   return;
}

// strip the checksum
....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. addressed this.

int msgBufSize = op.cmd.readableBytes();
// ByteBuf can't use readBytes() to same buffer as it always requires readerIndex < writerIndex while writing
// into buffer. So, creating new temp ByteBuf to copy data without checksum.
ByteBuf temp = op.cmd.alloc().buffer(msgBufSize, msgBufSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to avoid the copy

Copy link
Contributor Author

@rdhabalia rdhabalia Sep 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modifying same buf to strip checksum without creating new buf.

@rdhabalia rdhabalia force-pushed the checksum branch 3 times, most recently from fe5bdb2 to dfe114e Compare September 28, 2016 01:46
@rdhabalia rdhabalia added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Sep 28, 2016
@rdhabalia rdhabalia added this to the 1.15 milestone Sep 28, 2016
@rdhabalia rdhabalia self-assigned this Sep 28, 2016
@rdhabalia rdhabalia force-pushed the checksum branch 7 times, most recently from ffdec7c to 65388ee Compare September 29, 2016 18:17
@rdhabalia rdhabalia force-pushed the checksum branch 8 times, most recently from f858b05 to 3b64d60 Compare October 6, 2016 00:24
int checksum = readChecksum(headerFrame).intValue();
// msg.readerIndex is already at header-payload index, Recompute checksum for headers-payload
int metadataChecksum = computeChecksum(headerFrame);
long computedChecksum = resumeChecksum(metadataChecksum, msg.getSecond());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merged change with incremental-checksum computation.

if (sequenceId == expectedSequenceId) {
boolean corrupted = !verifyLocalBufferIsNotCorrupted(op);
if (corrupted) {
op.callback.sendComplete(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the op was just peeked from the queue but not actually removed here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.. removing and cleaning op from queue after failing callback.

@rdhabalia rdhabalia force-pushed the checksum branch 2 times, most recently from 0dc4f28 to 02c6693 Compare October 6, 2016 19:10
@@ -81,6 +88,8 @@

private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
.newUpdater(ProducerImpl.class, "msgIdGenerator");
// it prevents client to compute checksum and adding into payload
private static boolean checksumEnabled = false;
Copy link
Contributor

@merlimat merlimat Oct 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this flag? Shouldn't we enabled the checksum always?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was part of out rollout plan.. we wanted to enable this feature in two phases: rollout broker first and later on enable at client-side.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Client will already check for the v6 protocol version, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed it..

if (checksum == computedChecksum) {
return true;
} else {
log.error("[{}] [{}] Failed to verify checksum", topic, producerName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to include message id as well at this point

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, at this point we haven't persisted the message so, we don't have message-id so, we are not logging message-id.

private final static IncrementalIntHash CRC32C_HASH;

static {
if (Sse42Crc32C.isSupported()) {
CRC32C_HASH = new Crc32cSse42Provider().getIncrementalInt(CRC32C);
log.info("SSE4.2 CRC32C provider initialized");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to debug level

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, as it logs only once when broker starts. So, can we keep it "INFO" initially to get confirmation about broker loaded library successfully and it is not computing checksum using slower-software-algo .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though, this will all print in client lib logs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. changed log-level to debug.
However, client-lib should also print it only once, and I think, it would be great if user has transparency to know which version (hw/sw) of checksum is being used by app.

}
}
// close connection and let producer resend pending-messages
cnx.ctx().close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we resend messages without closing the connection?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.. actually we can do resendMessages(cnx); to resend without closing/disturbing connection.

@rdhabalia rdhabalia force-pushed the checksum branch 2 times, most recently from cfd3480 to e78c8b9 Compare October 9, 2016 21:00
Copy link
Contributor

@sboobna sboobna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@merlimat
Copy link
Contributor

@rdhabalia This looks good to go. Can you rebase to resolve the conflict in SimpleProducerConsumerTest.java ?

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@merlimat merlimat merged commit 309d753 into apache:master Oct 11, 2016
@rdhabalia rdhabalia deleted the checksum branch January 23, 2017 22:10
sijie pushed a commit to sijie/pulsar that referenced this pull request Mar 4, 2018
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Adhere to rest semantics

* Complete the list of functions supported by cli
massakam pushed a commit to massakam/pulsar that referenced this pull request Aug 6, 2020
statsIntervalInSeconds can be 0 to disable log
hrsakai pushed a commit to hrsakai/pulsar that referenced this pull request Dec 10, 2020
Signed-off-by: xiaolong.ran ranxiaolong716@gmail.com

* Support batch logic for project

* add unit test case of event time

* add some unit tests case for producer

* fix error result type

* add unit test case of producer flush

* add receiver queue size test logic

* support partition consumer receive async

* add unit test case of ack timeout

* Fix consumer receiving message out of order
tisonkun added a commit to tisonkun/pulsar that referenced this pull request Jul 12, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants