From 9409ca9ce1bac985236497e2e0b8651707b32a3e Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 29 Sep 2016 18:42:36 -0700 Subject: [PATCH] Fix incremental checksum computation for crc --- .../src/main/circe/include/int_types.h | 12 +++- .../scurrilous/circe/crc/ReflectedIntCrc.java | 6 +- .../pulsar/checksum/utils/Crc32cChecksum.java | 15 ++++- .../com/scurrilous/circe/crc/CRCTest.java | 19 ++++++- .../compression/Crc32cChecksumTest.java | 56 +++++++++++++++++++ 5 files changed, 102 insertions(+), 6 deletions(-) diff --git a/pulsar-checksum/src/main/circe/include/int_types.h b/pulsar-checksum/src/main/circe/include/int_types.h index ca5d005e65333..16c5c85489cf5 100644 --- a/pulsar-checksum/src/main/circe/include/int_types.h +++ b/pulsar-checksum/src/main/circe/include/int_types.h @@ -15,7 +15,7 @@ ******************************************************************************/ #include <stddef.h> // size_t -#ifdef _MSC_VER +#if defined(_MSC_VER) && _MSC_VER < 1600 // stdint.h added in MSVC 2010 typedef __int8 int8_t; typedef __int16 int16_t; @@ -26,12 +26,18 @@ typedef unsigned __int16 uint16_t; typedef unsigned __int32 uint32_t; typedef unsigned __int64 uint64_t; -# define SIZE_T_FORMAT "%Iu" - #else # include <stdint.h> +#endif + +#if defined(_MSC_VER) && _MSC_VER < 1900 // MSVC 2015 + +# define SIZE_T_FORMAT "%Iu" + +#else + # define SIZE_T_FORMAT "%zu" #endif diff --git a/pulsar-checksum/src/main/java/com/scurrilous/circe/crc/ReflectedIntCrc.java b/pulsar-checksum/src/main/java/com/scurrilous/circe/crc/ReflectedIntCrc.java index aac5adf59d832..0b190fa2b6012 100644 --- a/pulsar-checksum/src/main/java/com/scurrilous/circe/crc/ReflectedIntCrc.java +++ b/pulsar-checksum/src/main/java/com/scurrilous/circe/crc/ReflectedIntCrc.java @@ -35,9 +35,13 @@ final class ReflectedIntCrc extends AbstractIntCrc { } } + @Override + protected int initial() { + return reflect(super.initial()); + } + @Override protected int resumeRaw(int crc, byte[] input, int index, int length) { - crc = reflect(crc); 
for (int i = 0; i < length; ++i) crc = table[(crc ^ input[index + i]) & 0xff] ^ (crc >>> 8); return crc; diff --git a/pulsar-checksum/src/main/java/com/yahoo/pulsar/checksum/utils/Crc32cChecksum.java b/pulsar-checksum/src/main/java/com/yahoo/pulsar/checksum/utils/Crc32cChecksum.java index c4ce5ae3c6bd4..8fe02cada2d42 100644 --- a/pulsar-checksum/src/main/java/com/yahoo/pulsar/checksum/utils/Crc32cChecksum.java +++ b/pulsar-checksum/src/main/java/com/yahoo/pulsar/checksum/utils/Crc32cChecksum.java @@ -46,7 +46,7 @@ public class Crc32cChecksum { */ public static int computeChecksum(ByteBuf payload) { if (payload.hasMemoryAddress() && (CRC32C_HASH instanceof Sse42Crc32C)) { - return CRC32C_HASH.calculate(payload.memoryAddress(), payload.readableBytes()); + return CRC32C_HASH.calculate(payload.memoryAddress() + payload.readerIndex(), payload.readableBytes()); } else if (payload.hasArray()) { return CRC32C_HASH.calculate(payload.array(), payload.arrayOffset() + payload.readerIndex(), payload.readableBytes()); @@ -54,5 +54,18 @@ public static int computeChecksum(ByteBuf payload) { return CRC32C_HASH.calculate(payload.nioBuffer()); } } + + + public static int resumeChecksum(int previousChecksum, ByteBuf payload) { + if (payload.hasMemoryAddress() && (CRC32C_HASH instanceof Sse42Crc32C)) { + return CRC32C_HASH.resume(previousChecksum, payload.memoryAddress() + payload.readerIndex(), + payload.readableBytes()); + } else if (payload.hasArray()) { + return CRC32C_HASH.resume(previousChecksum, payload.array(), payload.arrayOffset() + payload.readerIndex(), + payload.readableBytes()); + } else { + return CRC32C_HASH.resume(previousChecksum, payload.nioBuffer()); + } + } } diff --git a/pulsar-checksum/src/test/java/com/scurrilous/circe/crc/CRCTest.java b/pulsar-checksum/src/test/java/com/scurrilous/circe/crc/CRCTest.java index 65ebe1a5a9419..c2803c995a422 100644 --- a/pulsar-checksum/src/test/java/com/scurrilous/circe/crc/CRCTest.java +++ 
b/pulsar-checksum/src/test/java/com/scurrilous/circe/crc/CRCTest.java @@ -31,6 +31,7 @@ import org.testng.annotations.Test; import com.scurrilous.circe.HashProvider; +import com.scurrilous.circe.IncrementalIntHash; import com.scurrilous.circe.params.CrcParameters; /** @@ -160,4 +161,20 @@ public void testCRC64() { public void testCRC64_XZ() { assertEquals(0x995dc9bbdf1939faL, PROVIDER.getIncrementalLong(CRC64_XZ).calculate(DIGITS)); } -} + + @Test + public void testCRC32CIncremental() { + // reflected + testIncremental(PROVIDER.getIncrementalInt(CRC32C)); + } + + private void testIncremental(IncrementalIntHash hash) { + final String data = "data"; + final String combined = data + data; + + final int dataChecksum = hash.calculate(data.getBytes(ASCII)); + final int combinedChecksum = hash.calculate(combined.getBytes(ASCII)); + final int incrementalChecksum = hash.resume(dataChecksum, data.getBytes(ASCII)); + assertEquals(combinedChecksum, incrementalChecksum); + } +} \ No newline at end of file diff --git a/pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/Crc32cChecksumTest.java b/pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/Crc32cChecksumTest.java index 806c0f7338a9e..dea986f8bd8bd 100644 --- a/pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/Crc32cChecksumTest.java +++ b/pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/Crc32cChecksumTest.java @@ -96,4 +96,60 @@ public void testCrc32cDirectMemoryHardware() { payload.release(); assertEquals(checksum, expectedChecksum); } + + @Test + public void testCrc32cIncremental() { + if (HARDWARE_CRC32C_HASH == null) { + return; + } + + String data = "data-abcd-data-123-$%#"; + + for (int i = 0; i < 20; i++) { + String doubleData = data + data; + + int doubleDataCrcHW = HARDWARE_CRC32C_HASH.calculate(doubleData.getBytes()); + int data1CrcHW = HARDWARE_CRC32C_HASH.calculate(data.getBytes()); + int data2CrcHW = HARDWARE_CRC32C_HASH.resume(data1CrcHW, data.getBytes()); 
+ assertEquals(doubleDataCrcHW, data2CrcHW); + + int doubleDataCrcSW = SOFTWARE_CRC32C_HASH.calculate(doubleData.getBytes()); + int data1CrcSW = SOFTWARE_CRC32C_HASH.calculate(data.getBytes()); + int data2CrcSW = SOFTWARE_CRC32C_HASH.resume(data1CrcSW, data.getBytes()); + assertEquals(doubleDataCrcSW, data2CrcSW); + + assertEquals(doubleDataCrcHW, doubleDataCrcSW); + + data += data; + } + } + + @Test + public void testCrc32cIncrementalUsingProvider() { + + final byte[] data = "data".getBytes(); + final byte[] doubleData = "datadata".getBytes(); + ByteBuf payload = Unpooled.wrappedBuffer(data); + ByteBuf doublePayload = Unpooled.wrappedBuffer(doubleData); + + int expectedChecksum = Crc32cChecksum.computeChecksum(doublePayload); + + // (1) heap-memory + int checksum = Crc32cChecksum.computeChecksum(payload); + int incrementalChecksum = Crc32cChecksum.resumeChecksum(checksum, payload); + assertEquals(expectedChecksum, incrementalChecksum); + payload.release(); + doublePayload.release(); + + // (2) direct-memory + payload = ByteBufAllocator.DEFAULT.directBuffer(data.length); + payload.writeBytes(data); + checksum = Crc32cChecksum.computeChecksum(payload); + incrementalChecksum = Crc32cChecksum.resumeChecksum(checksum, payload); + assertEquals(expectedChecksum, incrementalChecksum); + payload.release(); + + + } + }