diff --git a/include/dsn/cpp/serialization_helper/dsn.layer2_types.h b/include/dsn/cpp/serialization_helper/dsn.layer2_types.h index 4b188c9041..c5e594439b 100644 --- a/include/dsn/cpp/serialization_helper/dsn.layer2_types.h +++ b/include/dsn/cpp/serialization_helper/dsn.layer2_types.h @@ -44,6 +44,8 @@ class configuration_query_by_index_response; class app_info; +class thrift_request_meta_v1; + typedef struct _partition_configuration__isset { _partition_configuration__isset() @@ -426,6 +428,101 @@ inline std::ostream &operator<<(std::ostream &out, const app_info &obj) return out; } +typedef struct _thrift_request_meta_v1__isset +{ + _thrift_request_meta_v1__isset() + : app_id(false), + partition_index(false), + client_timeout(false), + client_partition_hash(false), + is_backup_request(false) + { + } + bool app_id : 1; + bool partition_index : 1; + bool client_timeout : 1; + bool client_partition_hash : 1; + bool is_backup_request : 1; +} _thrift_request_meta_v1__isset; + +class thrift_request_meta_v1 +{ +public: + thrift_request_meta_v1(const thrift_request_meta_v1 &); + thrift_request_meta_v1(thrift_request_meta_v1 &&); + thrift_request_meta_v1 &operator=(const thrift_request_meta_v1 &); + thrift_request_meta_v1 &operator=(thrift_request_meta_v1 &&); + thrift_request_meta_v1() + : app_id(0), + partition_index(0), + client_timeout(0), + client_partition_hash(0), + is_backup_request(0) + { + } + + virtual ~thrift_request_meta_v1() throw(); + int32_t app_id; + int32_t partition_index; + int32_t client_timeout; + int64_t client_partition_hash; + bool is_backup_request; + + _thrift_request_meta_v1__isset __isset; + + void __set_app_id(const int32_t val); + + void __set_partition_index(const int32_t val); + + void __set_client_timeout(const int32_t val); + + void __set_client_partition_hash(const int64_t val); + + void __set_is_backup_request(const bool val); + + bool operator==(const thrift_request_meta_v1 &rhs) const + { + if (__isset.app_id != rhs.__isset.app_id) + 
return false; + else if (__isset.app_id && !(app_id == rhs.app_id)) + return false; + if (__isset.partition_index != rhs.__isset.partition_index) + return false; + else if (__isset.partition_index && !(partition_index == rhs.partition_index)) + return false; + if (__isset.client_timeout != rhs.__isset.client_timeout) + return false; + else if (__isset.client_timeout && !(client_timeout == rhs.client_timeout)) + return false; + if (__isset.client_partition_hash != rhs.__isset.client_partition_hash) + return false; + else if (__isset.client_partition_hash && + !(client_partition_hash == rhs.client_partition_hash)) + return false; + if (__isset.is_backup_request != rhs.__isset.is_backup_request) + return false; + else if (__isset.is_backup_request && !(is_backup_request == rhs.is_backup_request)) + return false; + return true; + } + bool operator!=(const thrift_request_meta_v1 &rhs) const { return !(*this == rhs); } + + bool operator<(const thrift_request_meta_v1 &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(thrift_request_meta_v1 &a, thrift_request_meta_v1 &b); + +inline std::ostream &operator<<(std::ostream &out, const thrift_request_meta_v1 &obj) +{ + obj.printTo(out); + return out; +} + } // namespace #endif diff --git a/include/dsn/tool-api/rpc_message.h b/include/dsn/tool-api/rpc_message.h index e23243de9b..d4c83774e3 100644 --- a/include/dsn/tool-api/rpc_message.h +++ b/include/dsn/tool-api/rpc_message.h @@ -70,7 +70,8 @@ typedef union msg_context uint64_t unused : 4; ///< not used yet uint64_t serialize_format : 4; ///< dsn_msg_serialize_format uint64_t is_forward_supported : 1; ///< whether support forwarding a message to real leader - uint64_t reserved : 53; + uint64_t is_backup_request : 1; ///< whether the RPC is a backup request + uint64_t reserved : 52; } u; uint64_t context; ///< 
msg_context is of sizeof(uint64_t) } msg_context_t; @@ -208,6 +209,8 @@ class message_ex : public ref_counter, size_t body_size() { return (size_t)header->body_length; } DSN_API void *rw_ptr(size_t offset_begin); + bool is_backup_request() const { return header->context.u.is_backup_request; } + private: DSN_API message_ex(); DSN_API void prepare_buffer_header(); @@ -228,4 +231,4 @@ class message_ex : public ref_counter, }; typedef dsn::ref_ptr message_ptr; -} // end namespace +} // namespace dsn diff --git a/src/core/core/dsn.layer2_types.cpp b/src/core/core/dsn.layer2_types.cpp index 3ad6595dc7..d01d069936 100644 --- a/src/core/core/dsn.layer2_types.cpp +++ b/src/core/core/dsn.layer2_types.cpp @@ -1070,4 +1070,214 @@ void app_info::printTo(std::ostream &out) const out << ")"; } +thrift_request_meta_v1::~thrift_request_meta_v1() throw() {} + +void thrift_request_meta_v1::__set_app_id(const int32_t val) +{ + this->app_id = val; + __isset.app_id = true; +} + +void thrift_request_meta_v1::__set_partition_index(const int32_t val) +{ + this->partition_index = val; + __isset.partition_index = true; +} + +void thrift_request_meta_v1::__set_client_timeout(const int32_t val) +{ + this->client_timeout = val; + __isset.client_timeout = true; +} + +void thrift_request_meta_v1::__set_client_partition_hash(const int64_t val) +{ + this->client_partition_hash = val; + __isset.client_partition_hash = true; +} + +void thrift_request_meta_v1::__set_is_backup_request(const bool val) +{ + this->is_backup_request = val; + __isset.is_backup_request = true; +} + +uint32_t thrift_request_meta_v1::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if 
(ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->app_id); + this->__isset.app_id = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->partition_index); + this->__isset.partition_index = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->client_timeout); + this->__isset.client_timeout = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->client_partition_hash); + this->__isset.client_partition_hash = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 5: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->is_backup_request); + this->__isset.is_backup_request = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t thrift_request_meta_v1::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("thrift_request_meta_v1"); + + if (this->__isset.app_id) { + xfer += oprot->writeFieldBegin("app_id", ::apache::thrift::protocol::T_I32, 1); + xfer += oprot->writeI32(this->app_id); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.partition_index) { + xfer += oprot->writeFieldBegin("partition_index", ::apache::thrift::protocol::T_I32, 2); + xfer += oprot->writeI32(this->partition_index); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.client_timeout) { + xfer += oprot->writeFieldBegin("client_timeout", 
::apache::thrift::protocol::T_I32, 3); + xfer += oprot->writeI32(this->client_timeout); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.client_partition_hash) { + xfer += + oprot->writeFieldBegin("client_partition_hash", ::apache::thrift::protocol::T_I64, 4); + xfer += oprot->writeI64(this->client_partition_hash); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.is_backup_request) { + xfer += oprot->writeFieldBegin("is_backup_request", ::apache::thrift::protocol::T_BOOL, 5); + xfer += oprot->writeBool(this->is_backup_request); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(thrift_request_meta_v1 &a, thrift_request_meta_v1 &b) +{ + using ::std::swap; + swap(a.app_id, b.app_id); + swap(a.partition_index, b.partition_index); + swap(a.client_timeout, b.client_timeout); + swap(a.client_partition_hash, b.client_partition_hash); + swap(a.is_backup_request, b.is_backup_request); + swap(a.__isset, b.__isset); +} + +thrift_request_meta_v1::thrift_request_meta_v1(const thrift_request_meta_v1 &other49) +{ + app_id = other49.app_id; + partition_index = other49.partition_index; + client_timeout = other49.client_timeout; + client_partition_hash = other49.client_partition_hash; + is_backup_request = other49.is_backup_request; + __isset = other49.__isset; +} +thrift_request_meta_v1::thrift_request_meta_v1(thrift_request_meta_v1 &&other50) +{ + app_id = std::move(other50.app_id); + partition_index = std::move(other50.partition_index); + client_timeout = std::move(other50.client_timeout); + client_partition_hash = std::move(other50.client_partition_hash); + is_backup_request = std::move(other50.is_backup_request); + __isset = std::move(other50.__isset); +} +thrift_request_meta_v1 &thrift_request_meta_v1::operator=(const thrift_request_meta_v1 &other51) +{ + app_id = other51.app_id; + partition_index = other51.partition_index; + client_timeout = other51.client_timeout; + 
client_partition_hash = other51.client_partition_hash; + is_backup_request = other51.is_backup_request; + __isset = other51.__isset; + return *this; +} +thrift_request_meta_v1 &thrift_request_meta_v1::operator=(thrift_request_meta_v1 &&other52) +{ + app_id = std::move(other52.app_id); + partition_index = std::move(other52.partition_index); + client_timeout = std::move(other52.client_timeout); + client_partition_hash = std::move(other52.client_partition_hash); + is_backup_request = std::move(other52.is_backup_request); + __isset = std::move(other52.__isset); + return *this; +} +void thrift_request_meta_v1::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "thrift_request_meta_v1("; + out << "app_id="; + (__isset.app_id ? (out << to_string(app_id)) : (out << "")); + out << ", " + << "partition_index="; + (__isset.partition_index ? (out << to_string(partition_index)) : (out << "")); + out << ", " + << "client_timeout="; + (__isset.client_timeout ? (out << to_string(client_timeout)) : (out << "")); + out << ", " + << "client_partition_hash="; + (__isset.client_partition_hash ? (out << to_string(client_partition_hash)) : (out << "")); + out << ", " + << "is_backup_request="; + (__isset.is_backup_request ? (out << to_string(is_backup_request)) : (out << "")); + out << ")"; +} + } // namespace diff --git a/src/core/tests/thrift_message_parser_test.cpp b/src/core/tests/thrift_message_parser_test.cpp new file mode 100644 index 0000000000..82ea4bacad --- /dev/null +++ b/src/core/tests/thrift_message_parser_test.cpp @@ -0,0 +1,396 @@ +// Copyright (c) 2019, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. 
+ +#include +#include +#include +#include +#include + +#include "core/tools/common/thrift_message_parser.h" + +namespace dsn { + +DEFINE_TASK_CODE_RPC(RPC_TEST_THRIFT_MESSAGE_PARSER, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT) + +class thrift_message_parser_test : public testing::Test +{ +public: + void mock_reader_read_data(message_reader &reader, const std::string &data) + { + char *buf = reader.read_buffer_ptr(data.length()); + memcpy(buf, data.c_str(), data.size()); + reader.mark_read(data.length()); + } + + void test_get_message_on_receive_v0_data(message_reader &reader, + apache::thrift::protocol::TMessageType messageType, + bool is_request) + { + /// write rpc message + size_t body_length = 0; + message_ptr msg = + message_ex::create_request(RPC_TEST_THRIFT_MESSAGE_PARSER, 1000, 64, 5000000000); + rpc_write_stream stream(msg); + binary_writer_transport binary_transport(stream); + boost::shared_ptr trans_ptr(&binary_transport, + [](binary_writer_transport *) {}); + ::apache::thrift::protocol::TBinaryProtocol oprot(trans_ptr); + body_length += oprot.writeMessageBegin("RPC_TEST_THRIFT_MESSAGE_PARSER", messageType, 999); + body_length += oprot.writeMessageEnd(); + stream.commit_buffer(); + + thrift_message_parser parser; + std::string data; + int read_next = 0; + data = std::string("THFT") + std::string(44 + body_length, '\0'); // header+body_length + data_output out(&data[4], 44); + out.write_u32(0); // hdr_version + out.write_u32(48); // hdr_length + out.write_u32(0); // hdr_crc32 + out.write_u32(body_length); // body_length + out.write_u32(0); // body_crc32 + out.write_u32(1); // app_id + out.write_u32(28); // partition_index + out.write_u32(1000); // client_timeout + out.write_u32(64); // client_thread_hash + out.write_u64(5000000000); // client_partition_hash + ASSERT_EQ(stream.get_buffer().size(), body_length); + memcpy(&data[48], stream.get_buffer().data(), stream.get_buffer().size()); + + mock_reader_read_data(reader, data); + msg = 
parser.get_message_on_receive(&reader, read_next); + + if (is_request) { + ASSERT_NE(msg, nullptr); + ASSERT_EQ(msg->hdr_format, NET_HDR_THRIFT); + + ASSERT_EQ(msg->header->body_length, body_length); + ASSERT_EQ(msg->header->gpid, gpid(1, 28)); + ASSERT_EQ(msg->header->hdr_type, THRIFT_HDR_SIG); + ASSERT_EQ(msg->header->hdr_length, sizeof(message_header)); + ASSERT_EQ(msg->header->hdr_crc32, CRC_INVALID); + ASSERT_EQ(msg->header->body_crc32, CRC_INVALID); + ASSERT_EQ(msg->header->id, 999); + + ASSERT_EQ(msg->header->client.timeout_ms, 1000); + ASSERT_EQ(msg->header->client.thread_hash, 64); + ASSERT_EQ(msg->header->client.partition_hash, 5000000000); + + ASSERT_EQ(msg->header->context.u.is_request, true); + ASSERT_EQ(msg->header->context.u.serialize_format, DSF_THRIFT_BINARY); + + // v0 Thrift network format doesn't support message context. + ASSERT_EQ(msg->header->context.u.is_backup_request, false); + ASSERT_EQ(msg->header->context.u.is_forwarded, false); + ASSERT_EQ(msg->header->context.u.is_forward_supported, false); + + ASSERT_EQ(reader.buffer().size(), 0); + + // must be reset + ASSERT_EQ(parser._header_version, -1); + ASSERT_EQ(parser._v1_specific_vars->_meta_parsed, false); + ASSERT_EQ(parser._v1_specific_vars->_meta_length, 0); + ASSERT_EQ(parser._v1_specific_vars->_body_length, 0); + } else { + ASSERT_EQ(msg, nullptr); + ASSERT_EQ(read_next, -1); + } + } + + void test_get_message_on_receive_v1_data(message_reader &reader, + apache::thrift::protocol::TMessageType messageType, + bool is_request, + bool is_backup_request) + { + /// write rpc message + size_t body_length = 0; + message_ptr msg = + message_ex::create_request(RPC_TEST_THRIFT_MESSAGE_PARSER, 1000, 64, 5000000000); + rpc_write_stream body_stream(msg); + { + binary_writer_transport transport(body_stream); + boost::shared_ptr trans_ptr(&transport, + [](binary_writer_transport *) {}); + ::apache::thrift::protocol::TBinaryProtocol oprot(trans_ptr); + body_length += + 
oprot.writeMessageBegin("RPC_TEST_THRIFT_MESSAGE_PARSER", messageType, 999); + body_length += oprot.writeMessageEnd(); + body_stream.commit_buffer(); + ASSERT_EQ(body_stream.get_buffer().size(), body_length); + } + + // write rpc meta + size_t meta_length = 0; + thrift_request_meta_v1 meta; + meta.__set_is_backup_request(is_backup_request); + meta.__set_app_id(1); + meta.__set_partition_index(28); + meta.__set_client_timeout(1000); + meta.__set_client_partition_hash(5000000000); + + binary_writer meta_writer(1024); + ::dsn::binary_writer_transport trans(meta_writer); + boost::shared_ptr<::dsn::binary_writer_transport> transport( + &trans, [](::dsn::binary_writer_transport *) {}); + ::apache::thrift::protocol::TBinaryProtocol proto(transport); + meta.write(&proto); + + meta_length = meta_writer.get_buffer().size(); + + thrift_message_parser parser; + std::string data; + int read_next = 0; + data = std::string("THFT") + std::string(12 + meta_length + body_length, '\0'); + data_output out(&data[4], 12); + out.write_u32(1); + out.write_u32(meta_length); + out.write_u32(body_length); + + memcpy(&data[16], meta_writer.get_buffer().data(), meta_writer.get_buffer().size()); + memcpy(&data[16 + meta_length], + body_stream.get_buffer().data(), + body_stream.get_buffer().size()); + + mock_reader_read_data(reader, data); + ASSERT_EQ(reader.buffer().size(), data.size()); + ASSERT_EQ(reader.buffer().size(), 16 + meta_length + body_length); + + msg = parser.get_message_on_receive(&reader, read_next); + + if (is_request) { + ASSERT_NE(msg, nullptr); + ASSERT_EQ(msg->hdr_format, NET_HDR_THRIFT); + + ASSERT_EQ(msg->header->body_length, body_length); + ASSERT_EQ(msg->header->gpid, gpid(1, 28)); + ASSERT_EQ(msg->header->hdr_type, THRIFT_HDR_SIG); + ASSERT_EQ(msg->header->hdr_length, sizeof(message_header)); + ASSERT_EQ(msg->header->hdr_crc32, CRC_INVALID); + ASSERT_EQ(msg->header->body_crc32, CRC_INVALID); + ASSERT_EQ(msg->header->id, 999); + + 
ASSERT_EQ(msg->header->client.timeout_ms, 1000); + ASSERT_EQ(msg->header->client.thread_hash, 7947); + ASSERT_EQ(msg->header->client.partition_hash, 5000000000); + + ASSERT_EQ(msg->header->context.u.is_request, true); + ASSERT_EQ(msg->header->context.u.serialize_format, DSF_THRIFT_BINARY); + ASSERT_EQ(msg->header->context.u.is_backup_request, is_backup_request); + ASSERT_EQ(msg->header->context.u.is_forwarded, false); + ASSERT_EQ(msg->header->context.u.is_forward_supported, false); + + ASSERT_EQ(reader.buffer().size(), 0); + + // must be reset + ASSERT_EQ(parser._header_version, -1); + ASSERT_EQ(parser._v1_specific_vars->_meta_parsed, false); + } else { + ASSERT_EQ(msg, nullptr); + ASSERT_EQ(read_next, -1); + } + } +}; + +TEST_F(thrift_message_parser_test, get_message_on_receive_incomplete_second_field) +{ + for (int i = 0; i < 4; i++) { + thrift_message_parser parser; + + std::string data; + int read_next = 0; + message_reader reader(64); + data = std::string("THFT") + std::string(i, ' '); + mock_reader_read_data(reader, data); + ASSERT_EQ(reader._buffer_occupied, 4 + i); + ASSERT_EQ(reader.buffer().size(), 4 + i); + + message_ex *msg = parser.get_message_on_receive(&reader, read_next); + ASSERT_EQ(msg, nullptr); + ASSERT_EQ(read_next, 4 - i); + ASSERT_EQ(parser._header_version, -1); + ASSERT_EQ(parser._v1_specific_vars->_meta_parsed, false); + ASSERT_EQ(parser._v1_specific_vars->_meta_length, 0); + + // not consumed + ASSERT_EQ(reader._buffer_occupied, data.length()); + ASSERT_EQ(reader.buffer().size(), data.length()); + } +} + +TEST_F(thrift_message_parser_test, get_message_on_receive_incomplete_v0_hdr_len) +{ + for (int i = 4; i < 44; i++) { + thrift_message_parser parser; + + std::string data; + int read_next = 0; + message_reader reader(64); + data = std::string("THFT") + std::string(i, ' '); + + data_output out(&data[4], 8); + out.write_u32(0); + out.write_u32(48); + + mock_reader_read_data(reader, data); + ASSERT_EQ(reader.buffer().size(), data.length()); + 
+ message_ex *msg = parser.get_message_on_receive(&reader, read_next); + ASSERT_EQ(msg, nullptr); + ASSERT_EQ(read_next, 48 - data.length()); // read remaining fields + ASSERT_EQ(parser._header_version, -1); + } +} + +TEST_F(thrift_message_parser_test, get_message_on_receive_invalid_v0_hdr_length) +{ + for (int i = 0; i < 48; i++) { + thrift_message_parser parser; + + std::string data; + int read_next = 0; + message_reader reader(64); + data = std::string("THFT") + std::string(44, '\0'); // full 48 bytes + + // hdr_version = 0 + data_output out(&data[4], 8); + out.write_u32(0); + // hdr_length = i + out.write_u32(i); + + mock_reader_read_data(reader, data); + message_ex *msg = parser.get_message_on_receive(&reader, read_next); + ASSERT_EQ(msg, nullptr); + ASSERT_EQ(read_next, -1); + ASSERT_EQ(parser._header_version, -1); + } +} + +TEST_F(thrift_message_parser_test, get_message_on_receive_valid_v0_hdr) +{ + thrift_message_parser parser; + std::string data; + int read_next = 0; + message_reader reader(64); + data = std::string("THFT") + std::string(44, '\0'); // full 48 bytes + data_output out(&data[4], 44); + out.write_u32(0); // hdr_version + out.write_u32(48); // hdr_length + out.write_u32(0); // hdr_crc32 + out.write_u32(100); // body_length + out.write_u32(0); // body_crc32 + out.write_u32(1); // app_id + out.write_u32(28); // partition_index + out.write_u32(1000); // client_timeout + out.write_u32(64); // client_thread_hash + out.write_u64(5000000000); // client_partition_hash + + mock_reader_read_data(reader, data); + + message_ex *msg = parser.get_message_on_receive(&reader, read_next); + ASSERT_EQ(msg, nullptr); + ASSERT_EQ(read_next, 100); // required to read more + ASSERT_EQ(parser._header_version, 0); + ASSERT_EQ(reader.buffer().size(), 0); + ASSERT_EQ(parser._meta_v0->hdr_crc32, 0); + ASSERT_EQ(parser._meta_v0->body_length, 100); + ASSERT_EQ(parser._meta_v0->body_crc32, 0); + ASSERT_EQ(parser._meta_v0->app_id, 1); + 
ASSERT_EQ(parser._meta_v0->partition_index, 28); + ASSERT_EQ(parser._meta_v0->client_timeout, 1000); + ASSERT_EQ(parser._meta_v0->client_thread_hash, 64); + ASSERT_EQ(parser._meta_v0->client_partition_hash, 5000000000); +} + +TEST_F(thrift_message_parser_test, get_message_on_receive_valid_v0_data) +{ + message_reader reader(64); + + ASSERT_NO_FATAL_FAILURE( + test_get_message_on_receive_v0_data(reader, apache::thrift::protocol::T_CALL, true)); + ASSERT_NO_FATAL_FAILURE( + test_get_message_on_receive_v0_data(reader, apache::thrift::protocol::T_ONEWAY, true)); +} + +TEST_F(thrift_message_parser_test, get_message_on_receive_v0_not_request) +{ + message_reader reader(64); + + // ensure server won't corrupt when it receives a non-request. + ASSERT_NO_FATAL_FAILURE( + test_get_message_on_receive_v0_data(reader, apache::thrift::protocol::T_REPLY, false)); + reader.truncate_read(); + ASSERT_NO_FATAL_FAILURE(test_get_message_on_receive_v0_data( + reader, apache::thrift::protocol::TMessageType(65), false)); + reader.truncate_read(); +} + +TEST_F(thrift_message_parser_test, get_message_on_receive_incomplete_v1_hdr) +{ + for (int i = 4; i < 12; i++) { + thrift_message_parser parser; + + std::string data; + int read_next = 0; + message_reader reader(64); + data = std::string("THFT") + std::string(i, ' '); + + data_output out(&data[4], 8); + out.write_u32(1); + + mock_reader_read_data(reader, data); + ASSERT_EQ(reader.buffer().size(), data.length()); + + message_ex *msg = parser.get_message_on_receive(&reader, read_next); + ASSERT_EQ(msg, nullptr); + ASSERT_EQ(read_next, 16 - data.length()); // read remaining fields + ASSERT_EQ(parser._header_version, -1); + ASSERT_EQ(parser._v1_specific_vars->_meta_length, 0); + ASSERT_EQ(parser._v1_specific_vars->_body_length, 0); + } +} + +TEST_F(thrift_message_parser_test, get_message_on_receive_valid_v1_hdr) +{ + thrift_message_parser parser; + std::string data; + int read_next = 0; + message_reader reader(64); + data = std::string("THFT") 
+ std::string(12, '\0'); // full 12 bytes + data_output out(&data[4], 12); + out.write_u32(1); // header_version + out.write_u32(100); // meta_length + out.write_u32(200); // body_length + + mock_reader_read_data(reader, data); + ASSERT_EQ(reader.buffer().size(), 16); + + message_ex *msg = parser.get_message_on_receive(&reader, read_next); + ASSERT_EQ(msg, nullptr); + ASSERT_EQ(read_next, 100); // required to read more + ASSERT_EQ(parser._header_version, 1); + ASSERT_EQ(parser._v1_specific_vars->_meta_length, 100); + ASSERT_EQ(parser._v1_specific_vars->_body_length, 200); + ASSERT_EQ(parser._v1_specific_vars->_meta_parsed, false); + ASSERT_EQ(reader.buffer().size(), 0); +} + +TEST_F(thrift_message_parser_test, get_message_on_receive_v1_data) +{ + message_reader reader(64); + ASSERT_NO_FATAL_FAILURE( + test_get_message_on_receive_v1_data(reader, apache::thrift::protocol::T_CALL, true, true)); + ASSERT_NO_FATAL_FAILURE( + test_get_message_on_receive_v1_data(reader, apache::thrift::protocol::T_CALL, true, false)); + ASSERT_NO_FATAL_FAILURE(test_get_message_on_receive_v1_data( + reader, apache::thrift::protocol::T_ONEWAY, true, true)); + ASSERT_NO_FATAL_FAILURE(test_get_message_on_receive_v1_data( + reader, apache::thrift::protocol::T_ONEWAY, true, false)); + + ASSERT_NO_FATAL_FAILURE(test_get_message_on_receive_v1_data( + reader, apache::thrift::protocol::TMessageType(65), false, false)); + reader.truncate_read(); +} + +} // namespace dsn diff --git a/src/core/tools/common/thrift_message_parser.cpp b/src/core/tools/common/thrift_message_parser.cpp index bb12131ade..64ff4fb179 100644 --- a/src/core/tools/common/thrift_message_parser.cpp +++ b/src/core/tools/common/thrift_message_parser.cpp @@ -25,10 +25,15 @@ */ #include "thrift_message_parser.h" + #include #include +#include +#include #include #include +#include +#include namespace dsn { @@ -36,55 +41,52 @@ namespace dsn { // Request Parsing // // // -void thrift_message_parser::read_thrift_header(const char *buffer, 
- /*out*/ thrift_message_header &header) +/// +/// For version 0: +/// |<-- fixed-size request header -->|<--request body-->| +/// |-"THFT"-|- hdr_version + hdr_length -|- request_meta_v0 -|- blob -| +/// |-"THFT"-|- uint32(0) + uint32(48) -|- 36bytes -|- -| +/// |- 12bytes -|- 36bytes -|- -| +/// +/// For version 1: +/// |<-- fixed-size request header -->| <-- request body -->| +/// |-"THFT"-|- hdr_version + meta_length + body_length -|- thrift_request_meta_v1 -|- blob -| +/// |-"THFT"-|- uint32(1) + uint32 + uint32 -|- thrift struct -|- -| +/// |- 16bytes -|- thrift struct -|- -| +/// +/// TODO(wutao1): remove v0 once it has no user + +// "THFT" + uint32(hdr_version) + uint32(meta_length) + uint32(body_length) +static constexpr size_t HEADER_LENGTH_V1 = 16; + +// "THFT" + uint32(hdr_version) +static constexpr size_t THFT_HDR_VERSION_LENGTH = 8; + +// "THFT" + uint32(hdr_version) + uint32(hdr_length) + 36bytes(request_meta_v0) +static constexpr size_t HEADER_LENGTH_V0 = 48; + +static void parse_request_meta_v0(data_input &input, /*out*/ request_meta_v0 &meta) { - header.hdr_type = *(uint32_t *)(buffer); - buffer += sizeof(int32_t); - header.hdr_version = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.hdr_length = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.hdr_crc32 = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.body_length = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.body_crc32 = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.app_id = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.partition_index = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.client_timeout = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.client_thread_hash = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.client_partition_hash = be64toh(*(int64_t *)(buffer)); + meta.hdr_crc32 = 
input.read_u32(); + meta.body_length = input.read_u32(); + meta.body_crc32 = input.read_u32(); + meta.app_id = input.read_u32(); + meta.partition_index = input.read_u32(); + meta.client_timeout = input.read_u32(); + meta.client_thread_hash = input.read_u32(); + meta.client_partition_hash = input.read_u64(); } -bool thrift_message_parser::check_thrift_header(const thrift_message_header &header) +static int32_t gpid_to_thread_hash(gpid id) { - if (header.hdr_type != THRIFT_HDR_SIG) { - derror("hdr_type should be %s, but %s", - message_parser::get_debug_string("THFT").c_str(), - message_parser::get_debug_string((const char *)&header.hdr_type).c_str()); - return false; - } - if (header.hdr_version != 0) { - derror("hdr_version should be 0, but %u", header.hdr_version); - return false; - } - if (header.hdr_length != sizeof(thrift_message_header)) { - derror("hdr_length should be %u, but %u", sizeof(thrift_message_header), header.hdr_length); - return false; - } - return true; + static const int magic_number = 7919; + return id.get_app_id() * magic_number + id.get_partition_index(); } -dsn::message_ex *thrift_message_parser::parse_message(const thrift_message_header &thrift_header, - dsn::blob &message_data) +// Reads the requests's name, seqid, and TMessageType from the binary data, +// and constructs a `message_ex` object. 
+static message_ex *create_message_from_request_blob(const blob &body_data) { - dsn::blob body_data = message_data.range(thrift_header.hdr_length); dsn::message_ex *msg = message_ex::create_receive_message_with_standalone_header(body_data); dsn::message_header *dsn_hdr = msg->header; @@ -98,83 +100,202 @@ dsn::message_ex *thrift_message_parser::parse_message(const thrift_message_heade ::apache::thrift::protocol::TMessageType mtype; int32_t seqid; iprot.readMessageBegin(fname, mtype, seqid); - dinfo("rpc name: %s, type: %d, seqid: %d", fname.c_str(), mtype, seqid); - - dsn_hdr->hdr_type = THRIFT_HDR_SIG; - dsn_hdr->hdr_length = sizeof(message_header); - dsn_hdr->body_length = thrift_header.body_length; - dsn_hdr->hdr_crc32 = dsn_hdr->body_crc32 = CRC_INVALID; dsn_hdr->id = seqid; strncpy(dsn_hdr->rpc_name, fname.c_str(), sizeof(dsn_hdr->rpc_name) - 1); dsn_hdr->rpc_name[sizeof(dsn_hdr->rpc_name) - 1] = '\0'; - dsn_hdr->gpid.set_app_id(thrift_header.app_id); - dsn_hdr->gpid.set_partition_index(thrift_header.partition_index); - dsn_hdr->client.timeout_ms = thrift_header.client_timeout; - dsn_hdr->client.thread_hash = thrift_header.client_thread_hash; - dsn_hdr->client.partition_hash = thrift_header.client_partition_hash; if (mtype == ::apache::thrift::protocol::T_CALL || - mtype == ::apache::thrift::protocol::T_ONEWAY) + mtype == ::apache::thrift::protocol::T_ONEWAY) { dsn_hdr->context.u.is_request = 1; - dassert(dsn_hdr->context.u.is_request == 1, "only support receive request"); + } + if (dsn_hdr->context.u.is_request != 1) { + derror("invalid message type: %d", mtype); + delete msg; + return nullptr; + } dsn_hdr->context.u.serialize_format = DSF_THRIFT_BINARY; // always serialize in thrift binary + // common fields + msg->hdr_format = NET_HDR_THRIFT; + dsn_hdr->hdr_type = THRIFT_HDR_SIG; + dsn_hdr->hdr_length = sizeof(message_header); + dsn_hdr->hdr_crc32 = msg->header->body_crc32 = CRC_INVALID; return msg; } -message_ex 
*thrift_message_parser::get_message_on_receive(message_reader *reader, - /*out*/ int &read_next) +// Parses the request's fixed-size header. +// +// For version 0: +// |-"THFT"-|- hdr_version + hdr_length -|- request_meta_v0 -| +// +// For version 1: +// |-"THFT"-|- hdr_version + meta_length + body_length -| +// +bool thrift_message_parser::parse_request_header(message_reader *reader, int &read_next) { - read_next = 4096; + blob buf = reader->buffer(); + // make sure there is enough space for 'THFT' and header_version + if (buf.size() < THFT_HDR_VERSION_LENGTH) { + read_next = THFT_HDR_VERSION_LENGTH - buf.size(); + return false; + } - dsn::blob &buf = reader->_buffer; - char *buf_ptr = (char *)buf.data(); - unsigned int buf_len = reader->_buffer_occupied; - - if (buf_len >= sizeof(thrift_message_header)) { - if (!_header_parsed) { - read_thrift_header(buf_ptr, _thrift_header); - - if (!check_thrift_header(_thrift_header)) { - derror("header check failed"); - read_next = -1; - return nullptr; - } else { - _header_parsed = true; - } + // The first 4 bytes is "THFT" + data_input input(buf); + if (memcmp(buf.data(), "THFT", 4) != 0) { + derror("hdr_type mismatch %s", message_parser::get_debug_string(buf.data()).c_str()); + read_next = -1; + return false; + } + input.skip(4); + + // deal with different versions + int header_version = input.read_u32(); + if (0 == header_version) { + if (buf.size() < HEADER_LENGTH_V0) { + read_next = HEADER_LENGTH_V0 - buf.size(); + return false; + } + + uint32_t hdr_length = input.read_u32(); + if (hdr_length != HEADER_LENGTH_V0) { + derror("hdr_length should be %zu, but %u", HEADER_LENGTH_V0, hdr_length); + read_next = -1; + return false; } - unsigned int msg_sz = sizeof(thrift_message_header) + _thrift_header.body_length; - - // msg done - if (buf_len >= msg_sz) { - dsn::blob msg_bb = buf.range(0, msg_sz); - message_ex *msg = parse_message(_thrift_header, msg_bb); - - reader->_buffer = buf.range(msg_sz); - reader->_buffer_occupied -= 
msg_sz; - _header_parsed = false; - read_next = (reader->_buffer_occupied >= sizeof(thrift_message_header) - ? 0 - : sizeof(thrift_message_header) - reader->_buffer_occupied); - msg->hdr_format = NET_HDR_THRIFT; - return msg; + parse_request_meta_v0(input, *_meta_v0); + reader->consume_buffer(HEADER_LENGTH_V0); + } else if (1 == header_version) { + if (buf.size() < HEADER_LENGTH_V1) { + read_next = HEADER_LENGTH_V1 - buf.size(); + return false; } - // buf_len < msg_sz - else { - read_next = msg_sz - buf_len; + + _v1_specific_vars->_meta_length = input.read_u32(); + _v1_specific_vars->_body_length = input.read_u32(); + reader->consume_buffer(HEADER_LENGTH_V1); + } else { + derror("invalid hdr_version %d", header_version); + read_next = -1; + return false; + } + _header_version = header_version; + + return true; +} + +message_ex *thrift_message_parser::parse_request_body_v0(message_reader *reader, int &read_next) +{ + blob buf = reader->buffer(); + + // Parses request data + // TODO(wutao1): handle the case where body_length is too short to parse. + if (buf.size() < _meta_v0->body_length) { + read_next = _meta_v0->body_length - buf.size(); + return nullptr; + } + + message_ex *msg = create_message_from_request_blob(buf); + if (msg == nullptr) { + read_next = -1; + reset(); + return nullptr; + } + + reader->consume_buffer(_meta_v0->body_length); + read_next = (reader->_buffer_occupied >= HEADER_LENGTH_V0 + ? 
0 + : HEADER_LENGTH_V0 - reader->_buffer_occupied); + + msg->header->body_length = _meta_v0->body_length; + msg->header->gpid.set_app_id(_meta_v0->app_id); + msg->header->gpid.set_partition_index(_meta_v0->partition_index); + msg->header->client.timeout_ms = _meta_v0->client_timeout; + msg->header->client.thread_hash = _meta_v0->client_thread_hash; + msg->header->client.partition_hash = _meta_v0->client_partition_hash; + reset(); + return msg; +} + +message_ex *thrift_message_parser::parse_request_body_v1(message_reader *reader, int &read_next) +{ + // Parses request meta + blob buf = reader->buffer(); + if (!_v1_specific_vars->_meta_parsed) { + if (buf.size() < _v1_specific_vars->_meta_length) { + read_next = _v1_specific_vars->_meta_length - buf.size(); return nullptr; } + + binary_reader meta_reader(buf); + ::dsn::binary_reader_transport trans(meta_reader); + boost::shared_ptr<::dsn::binary_reader_transport> transport( + &trans, [](::dsn::binary_reader_transport *) {}); + ::apache::thrift::protocol::TBinaryProtocol proto(transport); + _v1_specific_vars->_meta_v1->read(&proto); + _v1_specific_vars->_meta_parsed = true; + } + buf = buf.range(_v1_specific_vars->_meta_length); + + // Parses request body + if (buf.size() < _v1_specific_vars->_body_length) { + read_next = _v1_specific_vars->_body_length - buf.size(); + return nullptr; } - // buf_len < sizeof(thrift_message_header) - else { - read_next = sizeof(thrift_message_header) - buf_len; + message_ex *msg = create_message_from_request_blob(buf); + if (msg == nullptr) { + read_next = -1; + reset(); return nullptr; } + + reader->consume_buffer(_v1_specific_vars->_meta_length + _v1_specific_vars->_body_length); + read_next = (reader->_buffer_occupied >= HEADER_LENGTH_V1 + ? 
0 + : HEADER_LENGTH_V1 - reader->_buffer_occupied); + + msg->header->body_length = _v1_specific_vars->_body_length; + msg->header->gpid.set_app_id(_v1_specific_vars->_meta_v1->app_id); + msg->header->gpid.set_partition_index(_v1_specific_vars->_meta_v1->partition_index); + msg->header->client.timeout_ms = _v1_specific_vars->_meta_v1->client_timeout; + msg->header->client.thread_hash = gpid_to_thread_hash(msg->header->gpid); + msg->header->client.partition_hash = _v1_specific_vars->_meta_v1->client_partition_hash; + msg->header->context.u.is_backup_request = _v1_specific_vars->_meta_v1->is_backup_request; + reset(); + return msg; +} + +message_ex *thrift_message_parser::get_message_on_receive(message_reader *reader, + /*out*/ int &read_next) +{ + read_next = 4096; + // Parses request header, -1 means header has not been parsed + if (-1 == _header_version) { + if (!parse_request_header(reader, read_next)) { + return nullptr; + } + } + + // Parses request body + switch (_header_version) { + case 0: + return parse_request_body_v0(reader, read_next); + case 1: + return parse_request_body_v1(reader, read_next); + default: + assert("invalid header version"); + } + + return nullptr; } -void thrift_message_parser::reset() { _header_parsed = false; } +void thrift_message_parser::reset() +{ + _header_version = -1; + _meta_v0->clear(); + _v1_specific_vars->clear(); +} // // // Response Encoding // @@ -278,4 +399,11 @@ int thrift_message_parser::get_buffers_on_send(message_ex *msg, /*out*/ send_buf return i; } +thrift_message_parser::thrift_message_parser() + : _v1_specific_vars(new v1_specific_vars), _meta_v0(new request_meta_v0) +{ +} + +thrift_message_parser::~thrift_message_parser() = default; + } // namespace dsn diff --git a/src/core/tools/common/thrift_message_parser.h b/src/core/tools/common/thrift_message_parser.h index 18d84ddc44..730b68320f 100644 --- a/src/core/tools/common/thrift_message_parser.h +++ b/src/core/tools/common/thrift_message_parser.h @@ -1,51 +1,80 @@ 
/* -* The MIT License (MIT) -* -* Copyright (c) 2015 Microsoft Corporation -* -* -=- Robust Distributed System Nucleus (rDSN) -=- -* -* Permission is hereby granted, free of charge, to any person obtaining a copy -* of this software and associated documentation files (the "Software"), to deal -* in the Software without restriction, including without limitation the rights -* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -* copies of the Software, and to permit persons to whom the Software is -* furnished to do so, subject to the following conditions: -* -* The above copyright notice and this permission notice shall be included in -* all copies or substantial portions of the Software. -* -* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -* THE SOFTWARE. -*/ + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ #pragma once #include #include #include +#include +#include +#include namespace dsn { -// request header (in big-endian) -struct thrift_message_header + /* Request metadata for header version 0. Every field has an in-class initializer of 0, and clear() assigns 0 to each field again. */ +struct request_meta_v0 { - uint32_t hdr_type; ///< must be "THFT" - uint32_t hdr_version; ///< must be 0 - uint32_t hdr_length; ///< must be sizeof(thrift_message_header) - uint32_t hdr_crc32; - uint32_t body_length; - uint32_t body_crc32; - int32_t app_id; - int32_t partition_index; - int32_t client_timeout; - int32_t client_thread_hash; - uint64_t client_partition_hash; - //------------- sizeof(thrift_message_header) = 48 ----------// + void clear() + { + hdr_crc32 = 0; + body_length = 0; + body_crc32 = 0; + app_id = 0; + partition_index = 0; + client_timeout = 0; + client_thread_hash = 0; + client_partition_hash = 0; + } + + uint32_t hdr_crc32 = 0; + uint32_t body_length = 0; + uint32_t body_crc32 = 0; + int32_t app_id = 0; + int32_t partition_index = 0; + int32_t client_timeout = 0; + int32_t client_thread_hash = 0; + uint64_t client_partition_hash = 0; +}; + /* State bundle holding a meta-parsed flag, the meta and body lengths, and an owned thrift_request_meta_v1. clear() replaces the owned object with a newly allocated one, sets the flag to false and both lengths to 0. */ +struct v1_specific_vars +{ + v1_specific_vars() : _meta_v1(new thrift_request_meta_v1) {} + + void clear() + { + _meta_v1.reset(new thrift_request_meta_v1); + _meta_parsed = false; + _meta_length = 0; + _body_length = 0; + } + + bool _meta_parsed{false}; + uint32_t _meta_length{0}; + uint32_t _body_length{0}; + std::unique_ptr _meta_v1; /* NOTE(review): the unique_ptr template argument is missing in this view; the constructor and clear() allocate thrift_request_meta_v1, so that is presumably the pointee type - confirm against the real header. */ }; #define THRIFT_HDR_SIG (*(uint32_t *)"THFT") @@ -58,32 +87,47 @@ DEFINE_CUSTOMIZED_ID(network_header_format, NET_HDR_THRIFT) 
class thrift_message_parser final : public message_parser { public: - thrift_message_parser() : _header_parsed(false) {} + thrift_message_parser(); - ~thrift_message_parser() {} + ~thrift_message_parser() override; void reset() override; message_ex *get_message_on_receive(message_reader *reader, /*out*/ int &read_next) override; - // response format: + // thrift response format: // // void prepare_on_send(message_ex *msg) override; int get_buffers_on_send(message_ex *msg, /*out*/ send_buf *buffers) override; -public: - static void read_thrift_header(const char *buffer, /*out*/ thrift_message_header &header); - static bool check_thrift_header(const thrift_message_header &header); +private: + message_ex *parse_request_body_v0(message_reader *reader, + /*out*/ int &read_next); + + message_ex *parse_request_body_v1(message_reader *reader, + /*out*/ int &read_next); - static dsn::message_ex *parse_message(const thrift_message_header &thrift_header, - dsn::blob &message_data); + bool parse_request_header(message_reader *reader, int &read_next); private: - thrift_message_header _thrift_header; - bool _header_parsed; + friend class thrift_message_parser_test; + FRIEND_TEST(thrift_message_parser_test, get_message_on_receive_incomplete_second_field); + FRIEND_TEST(thrift_message_parser_test, get_message_on_receive_incomplete_v0_hdr_len); + FRIEND_TEST(thrift_message_parser_test, get_message_on_receive_invalid_v0_hdr_length); + FRIEND_TEST(thrift_message_parser_test, get_message_on_receive_valid_v0_hdr); + FRIEND_TEST(thrift_message_parser_test, get_message_on_receive_incomplete_v1_hdr); + FRIEND_TEST(thrift_message_parser_test, get_message_on_receive_valid_v1_hdr); + + int _header_version{-1}; + + // meta version 1 specific variables + std::unique_ptr _v1_specific_vars; + + // meta version 0 specific variables + std::unique_ptr _meta_v0; }; } // namespace dsn diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index cdb5ee1219..b25bc8fbba 
100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -36,6 +36,7 @@ #include #include #include +#include namespace dsn { namespace replication { @@ -147,15 +148,15 @@ void replica::on_client_read(dsn::message_ex *request) return; } - if (status() != partition_status::PS_PRIMARY || + if (!request->is_backup_request()) { + // only backup request is allowed to read from a stale replica - // a small window where the state is not the latest yet - last_committed_decree() < _primary_states.last_prepare_decree_on_new_primary) { if (status() != partition_status::PS_PRIMARY) { response_client_read(request, ERR_INVALID_STATE); return; } + // a small window where the state is not the latest yet if (last_committed_decree() < _primary_states.last_prepare_decree_on_new_primary) { derror_replica("last_committed_decree(%" PRId64 ") < last_prepare_decree_on_new_primary(%" PRId64 ")", diff --git a/src/dsn.layer2.thrift b/src/dsn.layer2.thrift index 5ea4780cc3..eb4f6ca9cc 100644 --- a/src/dsn.layer2.thrift +++ b/src/dsn.layer2.thrift @@ -72,3 +72,22 @@ struct app_info // We use init_partition_count to handle those child partitions while sync_apps_from_remote_stroage 13:i32 init_partition_count = -1; } + +// Metadata field of the request in rDSN's thrift protocol (version 1). +// TODO(wutao1): add design doc of the thrift protocol. /* Every field below is declared optional; the generated C++ class records which fields are set in its __isset member. */ +struct thrift_request_meta_v1 +{ + // The replica's gpid. + 1:optional i32 app_id; + 2:optional i32 partition_index; + + // The timeout of this request that's set on client side. + 3:optional i32 client_timeout; + + // The hash value calculated from the hash key. + 4:optional i64 client_partition_hash; + + // Whether it is a backup request. If true, this request (only if it's a read) can be handled by + // a secondary replica, which does not guarantee strong consistency. + 5:optional bool is_backup_request; +}