Skip to content

Commit

Permalink
TiFlash supports stale read (#6459)
Browse files Browse the repository at this point in the history
close #4483
  • Loading branch information
hehechen authored Dec 16, 2022
1 parent 38e70c3 commit d174e12
Show file tree
Hide file tree
Showing 37 changed files with 520 additions and 225 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ namespace DB
M(tiflash_schema_apply_duration_seconds, "Bucketed histogram of ddl apply duration", Histogram, \
F(type_ddl_apply_duration, {{"req", "ddl_apply_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \
M(tiflash_stale_read_count, "Total number of stale read", Counter) \
M(tiflash_raft_read_index_duration_seconds, "Bucketed histogram of raft read index duration", Histogram, \
F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_wait_index_duration_seconds, "Bucketed histogram of raft wait index duration", Histogram, \
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Debug/DAGProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ struct DAGProperties
bool use_broadcast_join = false;
Int32 mpp_partition_num = 1;
Timestamp start_ts = DEFAULT_MAX_READ_TSO;
UInt64 query_ts = 0;
UInt64 server_id = 1;
UInt64 local_query_id = 1;
Int64 task_id = 1;

Int32 mpp_timeout = 10;
};
} // namespace DB
7 changes: 5 additions & 2 deletions dbms/src/Debug/MockComputeServerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,14 @@ void MockComputeServerManager::addServer(size_t partition_id, std::unique_ptr<Fl
server_map[partition_id] = std::move(server);
}

void MockComputeServerManager::cancelQuery(size_t start_ts)
void MockComputeServerManager::cancelQuery(const MPPQueryId & query_id)
{
mpp::CancelTaskRequest req;
auto * meta = req.mutable_meta();
meta->set_start_ts(start_ts);
meta->set_query_ts(query_id.query_ts);
meta->set_local_query_id(query_id.local_query_id);
meta->set_server_id(query_id.server_id);
meta->set_start_ts(query_id.start_ts);
mpp::CancelTaskResponse response;
for (const auto & server : server_map)
server.second->flashService()->cancelMPPTaskForTest(&req, &response);
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Debug/MockComputeServerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

namespace DB::tests
{

/** Hold Mock Compute Server to manage the lifetime of them.
* Maintains Mock Compute Server info.
*/
Expand All @@ -49,7 +48,7 @@ class MockComputeServerManager : public ext::Singleton<MockComputeServerManager>

void resetMockMPPServerInfo(size_t partition_num);

void cancelQuery(size_t start_ts);
void cancelQuery(const MPPQueryId & query_id);

static String queryInfo();

Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Debug/MockExecutor/AstToPB.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,27 @@ using MPPCtxPtr = std::shared_ptr<MPPCtx>;
struct MPPInfo
{
Timestamp start_ts;
UInt64 query_ts;
UInt64 server_id;
UInt64 local_query_id;
Int64 partition_id;
Int64 task_id;
const std::vector<Int64> sender_target_task_ids;
const std::unordered_map<String, std::vector<Int64>> receiver_source_task_ids_map;

MPPInfo(
Timestamp start_ts_,
UInt64 query_ts_,
UInt64 server_id_,
UInt64 local_query_id_,
Int64 partition_id_,
Int64 task_id_,
const std::vector<Int64> & sender_target_task_ids_,
const std::unordered_map<String, std::vector<Int64>> & receiver_source_task_ids_map_)
: start_ts(start_ts_)
, query_ts(query_ts_)
, server_id(server_id_)
, local_query_id(local_query_id_)
, partition_id(partition_id_)
, task_id(task_id_)
, sender_target_task_ids(sender_target_task_ids_)
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ bool ExchangeReceiverBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int3
{
mpp::TaskMeta meta;
meta.set_start_ts(mpp_info.start_ts);
meta.set_query_ts(mpp_info.query_ts);
meta.set_server_id(mpp_info.server_id);
meta.set_local_query_id(mpp_info.local_query_id);
meta.set_task_id(it->second[i]);
meta.set_partition_id(i);
auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i].addr : Debug::LOCAL_HOST;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ bool ExchangeSenderBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_
{
mpp::TaskMeta meta;
meta.set_start_ts(mpp_info.start_ts);
meta.set_query_ts(mpp_info.query_ts);
meta.set_server_id(mpp_info.server_id);
meta.set_local_query_id(mpp_info.local_query_id);
meta.set_task_id(task_id);
meta.set_partition_id(i);
auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i++].addr : Debug::LOCAL_HOST;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncMisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ inline size_t getReadTSOForLog(const String & line)
{
std::regex rx(R"((0|[1-9][0-9]*))");
std::smatch m;
// Rely on that MPP task prefix "MPP<query:435802637197639681,task:1>"
auto pos = line.find("query:");
// Rely on that MPP task prefix "MPP<query:<query_ts:1671124209981679458, local_query_id:42578432, server_id:3340035, start_ts:438075169172357120>,task_id:42578433>"
auto pos = line.find(", start_ts:");
if (pos != std::string::npos && regex_search(line.cbegin() + pos, line.cend(), m, rx))
{
return std::stoul(m[1]);
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Debug/dbgQueryCompiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ struct QueryFragment
{
MPPInfo mpp_info(
properties.start_ts,
properties.query_ts,
properties.server_id,
properties.local_query_id,
partition_id,
task_ids[partition_id],
sender_target_task_ids,
Expand All @@ -141,7 +144,7 @@ struct QueryFragment
}
else
{
MPPInfo mpp_info(properties.start_ts, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {});
MPPInfo mpp_info(properties.start_ts, properties.query_ts, properties.server_id, properties.local_query_id, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {});
ret.push_back(toQueryTask(properties, mpp_info, context));
}
return ret;
Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::Exc

mpp::TaskMeta root_tm;
root_tm.set_start_ts(properties.start_ts);
root_tm.set_query_ts(properties.query_ts);
root_tm.set_local_query_id(properties.local_query_id);
root_tm.set_server_id(properties.server_id);
root_tm.set_address(root_addr);
root_tm.set_task_id(-1);
root_tm.set_partition_id(-1);
Expand Down Expand Up @@ -71,6 +74,9 @@ BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProp
{
mpp::TaskMeta tm;
tm.set_start_ts(properties.start_ts);
tm.set_query_ts(properties.query_ts);
tm.set_local_query_id(properties.local_query_id);
tm.set_server_id(properties.server_id);
tm.set_address(Debug::LOCAL_HOST);
tm.set_task_id(root_task_id);
tm.set_partition_id(-1);
Expand All @@ -84,6 +90,9 @@ void prepareExchangeReceiverMetaWithMultipleContext(tipb::ExchangeReceiver & tip
{
mpp::TaskMeta tm;
tm.set_start_ts(properties.start_ts);
tm.set_query_ts(properties.query_ts);
tm.set_local_query_id(properties.local_query_id);
tm.set_server_id(properties.server_id);
tm.set_address(addr);
tm.set_task_id(task_id);
tm.set_partition_id(-1);
Expand All @@ -109,6 +118,9 @@ void prepareDispatchTaskRequest(QueryTask & task, std::shared_ptr<mpp::DispatchT
}
auto * tm = req->mutable_meta();
tm->set_start_ts(properties.start_ts);
tm->set_query_ts(properties.query_ts);
tm->set_local_query_id(properties.local_query_id);
tm->set_server_id(properties.server_id);
tm->set_partition_id(task.partition_id);
tm->set_address(addr);
tm->set_task_id(task.task_id);
Expand All @@ -128,6 +140,9 @@ void prepareDispatchTaskRequestWithMultipleContext(QueryTask & task, std::shared
}
auto * tm = req->mutable_meta();
tm->set_start_ts(properties.start_ts);
tm->set_query_ts(properties.query_ts);
tm->set_local_query_id(properties.local_query_id);
tm->set_server_id(properties.server_id);
tm->set_partition_id(task.partition_id);
tm->set_address(addr);
tm->set_task_id(task.task_id);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class DAGContext
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, mpp_task_meta(meta_)
, mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id())
, mpp_task_id(mpp_task_meta)
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ grpc::Status FlashService::CancelMPPTask(

auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPQueryId(request->meta()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down Expand Up @@ -407,7 +407,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest
}
auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPQueryId(request->meta()), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ extern const char force_no_local_region_for_mpp_task[];

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
: meta(meta_)
, id(meta.start_ts(), meta.task_id())
, id(meta)
, context(context_)
, manager(context_->getTMTContext().getMPPTaskManager().get())
, schedule_entry(manager, id)
Expand Down Expand Up @@ -137,7 +137,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id()));
tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
tunnel_set_local->registerTunnel(MPPTaskId(task_meta), tunnel);
if (!dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task);
Expand Down Expand Up @@ -202,7 +202,7 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
return {nullptr, err_msg};
}

MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id()};
MPPTaskId receiver_id(request->receiver_meta());
RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
auto tunnel_ptr = tunnel_set->getTunnelByReceiverTaskId(receiver_id);
if (tunnel_ptr == nullptr)
Expand Down Expand Up @@ -450,7 +450,7 @@ void MPPTask::runImpl()
void MPPTask::handleError(const String & error_msg)
{
auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg);
manager->abortMPPQuery(id.start_ts, updated_msg, AbortType::ONERROR);
manager->abortMPPQuery(id.query_id, updated_msg, AbortType::ONERROR);
if (!registered)
// if the task is not registered, need to cancel it explicitly
abort(error_msg, AbortType::ONERROR);
Expand Down
76 changes: 74 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,91 @@
// limitations under the License.

#include <Flash/Mpp/MPPTaskId.h>
#include <common/likely.h>
#include <fmt/core.h>

namespace DB
{
/// Returns true when `mpp_query_id` carries none of the extended identification
/// fields, i.e. query_ts, local_query_id and server_id are all zero.
/// operator< and MPPQueryIdHash treat such ids as coming from a TiDB older than v6.6
/// and fall back to start_ts for them.
bool isOldVersion(const MPPQueryId & mpp_query_id)
{
    // Any non-zero extended field means the id is not an old-version one.
    const bool has_extended_field = mpp_query_id.query_ts != 0
        || mpp_query_id.local_query_id != 0
        || mpp_query_id.server_id != 0;
    return !has_extended_field;
}


/// Strict weak ordering over MPP query ids, usable as the comparator of ordered
/// containers and sorting algorithms.
///
/// Ordering rules:
///   - Two old-version ids (see isOldVersion: query_ts, local_query_id and server_id
///     are all zero, as generated by TiDB older than v6.6) are ordered by start_ts.
///   - An old-version id sorts before every new-version id.
///   - Two new-version ids are ordered lexicographically by
///     (query_ts, server_id, local_query_id). start_ts is not consulted for them.
///
/// The previous implementation ordered ids of the same server by local_query_id but
/// ordered ids of different servers by MPPQueryIdHash (an XOR of all three fields).
/// Mixing those two keys is not transitive: when std::hash<UInt64> is the identity and
/// query_ts is equal, {server 4, local 1} < {server 4, local 4} < {server 2, local 1}
/// < {server 4, local 1} forms a cycle. A comparator with cycles gives undefined
/// behaviour in ordered containers and sorts, so the tie between different servers is
/// now broken deterministically by server_id instead.
///
/// @param mpp_query_id the right-hand side of the comparison.
/// @return true if *this is ordered strictly before `mpp_query_id`.
bool MPPQueryId::operator<(const MPPQueryId & mpp_query_id) const
{
    // Compare with MPP queries generated by TiDB whose version is older than v6.6.
    const bool left_old_version = isOldVersion(*this);
    const bool right_old_version = isOldVersion(mpp_query_id);
    if (unlikely(left_old_version || right_old_version))
    {
        if (left_old_version && right_old_version)
            return start_ts < mpp_query_id.start_ts;
        // Exactly one side is old-version: the old-version id always sorts first.
        return left_old_version;
    }

    // Both ids are generated by TiDB v6.6 or later: plain lexicographic comparison,
    // which is transitive by construction.
    if (query_ts != mpp_query_id.query_ts)
        return query_ts < mpp_query_id.query_ts;
    if (server_id != mpp_query_id.server_id)
        return server_id < mpp_query_id.server_id;
    return local_query_id < mpp_query_id.local_query_id;
}
/// Two query ids are equal only when all four fields match:
/// query_ts, local_query_id, server_id and start_ts.
bool MPPQueryId::operator==(const MPPQueryId & rid) const
{
    if (query_ts != rid.query_ts)
        return false;
    if (local_query_id != rid.local_query_id)
        return false;
    if (server_id != rid.server_id)
        return false;
    return start_ts == rid.start_ts;
}
/// Negation of operator==: true when at least one of the four fields differs.
bool MPPQueryId::operator!=(const MPPQueryId & rid) const
{
    const bool is_same = (*this == rid);
    return !is_same;
}
/// True when *this is ordered before `rid` by operator<, or equals it by operator==.
/// Note this is the disjunction of those two operators, not `!(rid < *this)`.
bool MPPQueryId::operator<=(const MPPQueryId & rid) const
{
    if (*this < rid)
        return true;
    return *this == rid;
}

/// Hash functor for MPPQueryId.
/// Old-version ids (see isOldVersion) are hashed by start_ts alone; all other ids are
/// hashed by XOR-combining the hashes of query_ts, local_query_id and server_id.
/// start_ts does not contribute to the hash of a non-old-version id.
size_t MPPQueryIdHash::operator()(MPPQueryId const & mpp_query_id) const noexcept
{
    const std::hash<UInt64> hasher{};
    if (unlikely(isOldVersion(mpp_query_id)))
        return hasher(mpp_query_id.start_ts);

    const size_t query_ts_part = hasher(mpp_query_id.query_ts);
    const size_t local_query_id_part = hasher(mpp_query_id.local_query_id);
    const size_t server_id_part = hasher(mpp_query_id.server_id);
    return query_ts_part ^ local_query_id_part ^ server_id_part;
}

String MPPTaskId::toString() const
{
return isUnknown() ? "MPP<query:N/A,task:N/A>" : fmt::format("MPP<query:{},task:{}>", start_ts, task_id);
return isUnknown() ? "MPP<query_id:N/A,task_id:N/A>" : fmt::format("MPP<query:{},task_id:{}>", query_id.toString(), task_id);
}

// Definition of the static member unknown_mpp_task_id: a default-constructed MPPTaskId.
// NOTE(review): presumably the value isUnknown() tests against — confirm in MPPTaskId.h.
const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};

// Largest value representable by UInt64; used only to build Max_Query_Id below.
constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
// Definition of the static member Max_Query_Id: an MPPQueryId constructed with all
// four arguments set to MAX_UINT64.
const MPPQueryId MPPTaskId::Max_Query_Id = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64);

bool operator==(const MPPTaskId & lid, const MPPTaskId & rid)
{
return lid.start_ts == rid.start_ts && lid.task_id == rid.task_id;
return lid.query_id == rid.query_id && lid.task_id == rid.task_id;
}
} // namespace DB
Loading

0 comments on commit d174e12

Please sign in to comment.