Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use CRC64 as checksum algo for LogFile #4261

Merged
merged 10 commits into from
Mar 15, 2022
6 changes: 3 additions & 3 deletions dbms/src/Storages/Page/V3/LogFile/LogFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ static constexpr UInt8 MaxRecordType = RecyclableLastType;
static constexpr UInt32 BLOCK_SIZE = 32 * 1024;
static_assert(BLOCK_SIZE < std::numeric_limits<UInt16>::max());

using ChecksumClass = Digest::CRC32; // TODO: CRC64
using ChecksumClass = Digest::CRC64;

using ChecksumType = ChecksumClass::HashType;

static constexpr UInt32 CHECKSUM_FIELD_SIZE = sizeof(ChecksumType);
static constexpr UInt32 CHECKSUM_FIELD_SIZE = ChecksumClass::hash_size;

// If the size of payload is larger than `BLOCK_SIZE`, it will be splited into
// If the size of payload is larger than `BLOCK_SIZE`, it will be split into
// fragments. So `PAYLOAD_FIELD_SIZE` must fit in UInt16.
static constexpr UInt32 PAYLOAD_FIELD_SIZE = sizeof(UInt16);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/LogFile/LogReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ UInt8 LogReader::readPhysicalRecord(std::string_view * result, size_t * drop_siz
}
else if (err != 0)
return err;
// else parse header successe.
// else parse header success.

if (verify_checksum)
{
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,31 @@ namespace PS::V3
* +-----+-------------+--+----+----------+------+-- ... ----+
* File | r0 | r1 |P | r2 | r3 | r4 | |
* +-----+-------------+--+----+----------+------+-- ... ----+
* <--- kBlockSize ------>|<-- kBlockSize ------>|
* <---- BlockSize ------>|<--- BlockSize ------>|
* rn = variable size records
* P = Padding
*
* Data is written out in kBlockSize chunks. If next record does not fit
* Data is written out in BlockSize chunks. If next record does not fit
* into the space left, the leftover space will be padded with \0.
*
* Legacy record format:
*
* +--------------+-----------+-----------+--- ... ---+
* |CheckSum (4B) | Size (2B) | Type (1B) | Payload |
* |CheckSum (8B) | Size (2B) | Type (1B) | Payload |
* +--------------+-----------+-----------+--- ... ---+
*
* CheckSum = 32bit hash computed over the record type and payload using checksum algo
* CheckSum = 64bit hash computed over the record type and payload using checksum algo (CRC64)
* Size = Length of the payload data
* Type = Type of record
* (kZeroType, kFullType, kFirstType, kLastType, kMiddleType )
* (ZeroType, FullType, FirstType, LastType, MiddleType)
* The type is used to group a bunch of records together to represent
* blocks that are larger than BlockSize
* Payload = Byte stream as long as specified by the payload size
*
* Recyclable record format:
*
* +--------------+-----------+-----------+----------------+--- ... ---+
* |CheckSum (4B) | Size (2B) | Type (1B) | Log number (4B)| Payload |
* |CheckSum (8B) | Size (2B) | Type (1B) | Log number (4B)| Payload |
* +--------------+-----------+-----------+----------------+--- ... ---+
*
* Same as above, with the addition of
Expand Down
26 changes: 12 additions & 14 deletions dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,19 +216,17 @@ class LogFileRWTest : public ::testing::TestWithParam<std::tuple<bool, bool>>
PageUtil::ftruncateFile(wr_file, writtenBytes() - bytes);
}

void fixChecksum(int header_offset, int len, bool recyclable)
void fixChecksum(int header_offset, int payload_len, bool recyclable)
{
// Compute crc of type/len/data
int header_size = recyclable ? Format::RECYCLABLE_HEADER_SIZE : Format::HEADER_SIZE;
Digest::CRC32 digest;
Format::ChecksumClass digest;

size_t crc_buff_size = header_size - Format::CHECKSUM_START_OFFSET + len;
size_t crc_buff_size = header_size - Format::CHECKSUM_START_OFFSET + payload_len;
char crc_buff[crc_buff_size];
PageUtil::readFile(wr_file, header_offset + Format::CHECKSUM_START_OFFSET, crc_buff, crc_buff_size, nullptr);

digest.update(
crc_buff,
crc_buff_size);
digest.update(crc_buff, crc_buff_size);

auto checksum = digest.checksum();
PageUtil::writeFile(wr_file, header_offset, reinterpret_cast<char *>(&checksum), sizeof(checksum), nullptr);
Expand Down Expand Up @@ -421,7 +419,7 @@ TEST_P(LogFileRWTest, BadRecordType)
TEST_P(LogFileRWTest, TruncatedTrailingRecordIsIgnored)
{
write("foo");
shrinkSize(4); // Drop all payload as well as a header byte
shrinkSize(3 + sizeof(Format::MaxRecordType)); // Drop all payload as well as a header byte
resetReader();
ASSERT_EQ("EOF", read());
// Truncated last record is ignored, not treated as an error
Expand All @@ -439,7 +437,7 @@ TEST_P(LogFileRWTest, TruncatedTrailingRecordIsNotIgnored)
}

write("foo");
shrinkSize(4); // Drop all payload as well as a header byte
shrinkSize(3 + sizeof(Format::MaxRecordType)); // Drop all payload as well as a header byte
resetReader(WALRecoveryMode::AbsoluteConsistency);
ASSERT_EQ("EOF", read());
// Truncated last record is ignored, not treated as an error
Expand All @@ -461,8 +459,8 @@ TEST_P(LogFileRWTest, BadLength)
write(repeatedString("bar", payload_size));
write("foo");
resetReader();
// Least significant size byte is stored in header[4].
incrementByte(4, 1);
// The size field follows the checksum field, so its least significant byte
// is stored in header[Format::CHECKSUM_FIELD_SIZE].
incrementByte(Format::CHECKSUM_FIELD_SIZE, 1);
if (!recyclable_log)
{
ASSERT_EQ("foo", read());
Expand Down Expand Up @@ -512,11 +510,11 @@ TEST_P(LogFileRWTest, BadLengthAtEndIsNotIgnored)
TEST_P(LogFileRWTest, ChecksumMismatch)
{
write("foooooo");
incrementByte(0, 14);
incrementByte(0, Format::HEADER_SIZE + 7);
ASSERT_EQ("EOF", read());
if (!recyclable_log)
{
ASSERT_EQ(14, droppedBytes());
ASSERT_EQ(Format::HEADER_SIZE + 7, droppedBytes());
ASSERT_EQ("OK", matchError("checksum mismatch"));
}
else
Expand Down Expand Up @@ -574,7 +572,7 @@ TEST_P(LogFileRWTest, MissingLastIsIgnored)
{
write(repeatedString("bar", PS::V3::Format::BLOCK_SIZE));
// Remove the LAST block, including header.
shrinkSize(14);
shrinkSize(2 * (recyclable_log ? Format::RECYCLABLE_HEADER_SIZE : Format::HEADER_SIZE));
ASSERT_EQ("EOF", read());
ASSERT_EQ("", reportMessage());
ASSERT_EQ(0, droppedBytes());
Expand All @@ -591,7 +589,7 @@ TEST_P(LogFileRWTest, MissingLastIsNotIgnored)
resetReader(WALRecoveryMode::AbsoluteConsistency);
write(repeatedString("bar", PS::V3::Format::BLOCK_SIZE));
// Remove the LAST block, including header.
shrinkSize(14);
shrinkSize(2 * (recyclable_log ? Format::RECYCLABLE_HEADER_SIZE : Format::HEADER_SIZE));
ASSERT_EQ("EOF", read());
ASSERT_GT(droppedBytes(), 0);
ASSERT_EQ("OK", matchError("Corruption: error reading trailing data"));
Expand Down