Skip to content

Commit

Permalink
Merge branch 'master' into tidy_function_string
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored Jul 8, 2022
2 parents e8fd264 + e58a007 commit c0c0799
Show file tree
Hide file tree
Showing 38 changed files with 981 additions and 183 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Columns/ColumnConst.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ class ColumnConst final : public COWPtrHelper<IColumn, ColumnConst>
template <typename T>
T getValue() const
{
return getField().safeGet<typename NearestFieldType<T>::Type>();
auto && tmp = getField();
return std::move(tmp.safeGet<typename NearestFieldType<T>::Type>());
}
};

Expand Down
95 changes: 62 additions & 33 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,56 +74,80 @@ class MPMCQueue
destruct(getObj(read_pos));
}

/// Block util:
/// Block until:
/// 1. Pop succeeds with a valid T: return true.
/// 2. The queue is cancelled or finished: return false.
bool pop(T & obj)
ALWAYS_INLINE bool pop(T & obj)
{
return popObj(obj);
return popObj<true>(obj);
}

/// Besides all conditions mentioned at `pop`, `tryPop` will return false if `timeout` is exceeded.
/// Besides all conditions mentioned at `pop`, `popTimeout` will return false if `timeout` is exceeded.
template <typename Duration>
bool tryPop(T & obj, const Duration & timeout)
ALWAYS_INLINE bool popTimeout(T & obj, const Duration & timeout)
{
/// std::condition_variable::wait_until will always use system_clock.
auto deadline = std::chrono::system_clock::now() + timeout;
return popObj(obj, &deadline);
return popObj<true>(obj, &deadline);
}

/// Block util:
/// Non-blocking function.
/// Return true if pop succeed.
/// else return false.
ALWAYS_INLINE bool tryPop(T & obj)
{
return popObj<false>(obj);
}

/// Block until:
/// 1. Push succeeds and return true.
/// 2. The queue is cancelled and return false.
/// 3. The queue has finished and return false.
template <typename U>
ALWAYS_INLINE bool push(U && u)
{
return pushObj(std::forward<U>(u));
return pushObj<true>(std::forward<U>(u));
}

/// Besides all conditions mentioned at `push`, `tryPush` will return false if `timeout` is exceeded.
/// Besides all conditions mentioned at `push`, `pushTimeout` will return false if `timeout` is exceeded.
template <typename U, typename Duration>
ALWAYS_INLINE bool tryPush(U && u, const Duration & timeout)
ALWAYS_INLINE bool pushTimeout(U && u, const Duration & timeout)
{
/// std::condition_variable::wait_until will always use system_clock.
auto deadline = std::chrono::system_clock::now() + timeout;
return pushObj(std::forward<U>(u), &deadline);
return pushObj<true>(std::forward<U>(u), &deadline);
}

/// Non-blocking function.
/// Return true if push succeed.
/// else return false.
template <typename U>
ALWAYS_INLINE bool tryPush(U && u)
{
return pushObj<false>(std::forward<U>(u));
}

/// The same as `push` except it will construct the object in place.
template <typename... Args>
ALWAYS_INLINE bool emplace(Args &&... args)
{
return emplaceObj(nullptr, std::forward<Args>(args)...);
return emplaceObj<true>(nullptr, std::forward<Args>(args)...);
}

/// The same as `tryPush` except it will construct the object in place.
/// The same as `pushTimeout` except it will construct the object in place.
template <typename... Args, typename Duration>
ALWAYS_INLINE bool tryEmplace(Args &&... args, const Duration & timeout)
ALWAYS_INLINE bool emplaceTimeout(Args &&... args, const Duration & timeout)
{
/// std::condition_variable::wait_until will always use system_clock.
auto deadline = std::chrono::system_clock::now() + timeout;
return emplaceObj(&deadline, std::forward<Args>(args)...);
return emplaceObj<true>(&deadline, std::forward<Args>(args)...);
}

/// The same as `tryPush` except it will construct the object in place.
template <typename... Args>
ALWAYS_INLINE bool tryEmplace(Args &&... args)
{
return emplaceObj<false>(nullptr, std::forward<Args>(args)...);
}

/// Cancel a NORMAL queue will wake up all blocking readers and writers.
Expand Down Expand Up @@ -233,22 +257,25 @@ class MPMCQueue
}
}

bool popObj(T & res, const TimePoint * deadline = nullptr)
template <bool need_wait>
bool popObj(T & res, [[maybe_unused]] const TimePoint * deadline = nullptr)
{
#ifdef __APPLE__
WaitingNode node;
#else
thread_local WaitingNode node;
#endif
{
/// read_pos < write_pos means the queue isn't empty
auto pred = [&] {
return read_pos < write_pos || !isNormal();
};

std::unique_lock lock(mu);

wait(lock, reader_head, node, pred, deadline);
if constexpr (need_wait)
{
/// read_pos < write_pos means the queue isn't empty
auto pred = [&] {
return read_pos < write_pos || !isNormal();
};
wait(lock, reader_head, node, pred, deadline);
}

if (!isCancelled() && read_pos < write_pos)
{
Expand All @@ -272,21 +299,23 @@ class MPMCQueue
return false;
}

template <typename F>
bool assignObj(const TimePoint * deadline, F && assigner)
template <bool need_wait, typename F>
bool assignObj([[maybe_unused]] const TimePoint * deadline, F && assigner)
{
#ifdef __APPLE__
WaitingNode node;
#else
thread_local WaitingNode node;
#endif
auto pred = [&] {
return write_pos - read_pos < capacity || !isNormal();
};

std::unique_lock lock(mu);

wait(lock, writer_head, node, pred, deadline);
if constexpr (need_wait)
{
auto pred = [&] {
return write_pos - read_pos < capacity || !isNormal();
};
wait(lock, writer_head, node, pred, deadline);
}

/// double check status after potential wait
/// check write_pos because timeouted will also reach here.
Expand All @@ -305,16 +334,16 @@ class MPMCQueue
return false;
}

template <typename U>
template <bool need_wait, typename U>
ALWAYS_INLINE bool pushObj(U && u, const TimePoint * deadline = nullptr)
{
return assignObj(deadline, [&](void * addr) { new (addr) T(std::forward<U>(u)); });
return assignObj<need_wait>(deadline, [&](void * addr) { new (addr) T(std::forward<U>(u)); });
}

template <typename... Args>
template <bool need_wait, typename... Args>
ALWAYS_INLINE bool emplaceObj(const TimePoint * deadline, Args &&... args)
{
return assignObj(deadline, [&](void * addr) { new (addr) T(std::forward<Args>(args)...); });
return assignObj<need_wait>(deadline, [&](void * addr) { new (addr) T(std::forward<Args>(args)...); });
}

ALWAYS_INLINE bool isNormal() const
Expand Down
25 changes: 14 additions & 11 deletions dbms/src/Common/tests/gtest_mpmc_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,14 @@ class MPMCQueueTest : public ::testing::Test
void testCannotTryPush(MPMCQueue<T> & queue)
{
auto old_size = queue.size();
auto res = queue.tryPush(ValueHelper<T>::make(-1), std::chrono::microseconds(1));
auto new_size = queue.size();
if (res)
bool ok1 = queue.tryPush(ValueHelper<T>::make(-1));
auto new_size1 = queue.size();
bool ok2 = queue.pushTimeout(ValueHelper<T>::make(-1), std::chrono::microseconds(1));
auto new_size2 = queue.size();
if (ok1 || ok2)
throw TiFlashTestException("Should push fail");
if (old_size != new_size)
throw TiFlashTestException(fmt::format("Size changed from {} to {} without push", old_size, new_size));
if (old_size != new_size1 || old_size != new_size2)
throw TiFlashTestException(fmt::format("Size changed from {} to {} and {} without push", old_size, new_size1, new_size2));
}

template <typename T>
Expand All @@ -124,12 +126,14 @@ class MPMCQueueTest : public ::testing::Test
{
auto old_size = queue.size();
T res;
bool ok = queue.tryPop(res, std::chrono::microseconds(1));
auto new_size = queue.size();
if (ok)
bool ok1 = queue.tryPop(res);
auto new_size1 = queue.size();
bool ok2 = queue.popTimeout(res, std::chrono::microseconds(1));
auto new_size2 = queue.size();
if (ok1 || ok2)
throw TiFlashTestException("Should pop fail");
if (old_size != new_size)
throw TiFlashTestException(fmt::format("Size changed from {} to {} without pop", old_size, new_size));
if (old_size != new_size1 || old_size != new_size2)
throw TiFlashTestException(fmt::format("Size changed from {} to {} and {} without pop", old_size, new_size1, new_size2));
}

template <typename T>
Expand Down Expand Up @@ -474,7 +478,6 @@ class MPMCQueueTest : public ::testing::Test
throwOrMove(std::move(rhs));
}


ThrowInjectable & operator=(ThrowInjectable && rhs)
{
if (this != &rhs)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Common/tests/mpmc_queue_perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct Helper<MPMCQueue<T>>
template <typename U>
static void pushOneTo(MPMCQueue<T> & queue, U && data)
{
queue.tryPush(std::forward<U>(data), std::chrono::milliseconds(1));
queue.pushTimeout(std::forward<U>(data), std::chrono::milliseconds(1));
}
};

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ DBGInvoker::DBGInvoker()
regSchemafulFunc("query_mapped", dbgFuncQueryMapped);
regSchemalessFunc("get_tiflash_replica_count", dbgFuncGetTiflashReplicaCount);
regSchemalessFunc("get_partition_tables_tiflash_replica_count", dbgFuncGetPartitionTablesTiflashReplicaCount);
regSchemalessFunc("get_tiflash_mode", dbgFuncGetTiflashMode);
regSchemalessFunc("get_partition_tables_tiflash_mode", dbgFuncGetPartitionTablesTiflashMode);

regSchemalessFunc("search_log_for_key", dbgFuncSearchLogForKey);
regSchemalessFunc("tidb_dag", dbgFuncTiDBQueryFromNaturalDag);
Expand Down
52 changes: 52 additions & 0 deletions dbms/src/Debug/dbgFuncSchemaName.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,56 @@ void dbgFuncGetPartitionTablesTiflashReplicaCount(Context & context, const ASTs
output(fmt_buf.toString());
}

void dbgFuncGetTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.empty() || args.size() != 2)
throw Exception("Args not matched, should be: database-name[, table-name]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
FmtBuffer fmt_buf;

const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
auto mapped = mappedTable(context, database_name, table_name);
auto storage = context.getTable(mapped->first, mapped->second);
auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
if (!managed_storage)
throw Exception(database_name + "." + table_name + " is not ManageableStorage", ErrorCodes::BAD_ARGUMENTS);

fmt_buf.append((TiFlashModeToString(managed_storage->getTableInfo().tiflash_mode)));

output(fmt_buf.toString());
}

void dbgFuncGetPartitionTablesTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.empty() || args.size() != 2)
throw Exception("Args not matched, should be: database-name[, table-name]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
FmtBuffer fmt_buf;

const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
auto mapped = mappedTable(context, database_name, table_name);
auto storage = context.getTable(mapped->first, mapped->second);
auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
if (!managed_storage)
throw Exception(database_name + "." + table_name + " is not ManageableStorage", ErrorCodes::BAD_ARGUMENTS);

auto table_info = managed_storage->getTableInfo();

if (!table_info.isLogicalPartitionTable())
throw Exception(database_name + "." + table_name + " is not logical partition table", ErrorCodes::BAD_ARGUMENTS);

SchemaNameMapper name_mapper;
for (const auto & part_def : table_info.partition.definitions)
{
auto paritition_table_info = table_info.producePartitionTableInfo(part_def.id, name_mapper);
auto partition_storage = context.getTMTContext().getStorages().get(paritition_table_info->id);
fmt_buf.append((TiFlashModeToString(partition_storage->getTableInfo().tiflash_mode)));
fmt_buf.append("/");
}

output(fmt_buf.toString());
}

} // namespace DB
10 changes: 10 additions & 0 deletions dbms/src/Debug/dbgFuncSchemaName.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,14 @@ void dbgFuncGetTiflashReplicaCount(Context & context, const ASTs & args, DBGInvo
// ./storage-client.sh "DBGInvoke get_partition_tables_tiflash_replica_count(db_name, table_name)"
void dbgFuncGetPartitionTablesTiflashReplicaCount(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Get table's tiflash mode with mapped table name
// Usage:
// ./storage-client.sh "DBGInvoke get_tiflash_mode(db_name, table_name)"
void dbgFuncGetTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Get the logical table's partition tables' tiflash replica counts with mapped table name
// Usage:
// ./storage-client.sh "DBGInvoke get_partition_tables_tiflash_mode(db_name, table_name)"
void dbgFuncGetPartitionTablesTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output);

} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,8 @@ const std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
//{tipb::ScalarFuncSig::Quote, "cast"},
//{tipb::ScalarFuncSig::Repeat, "cast"},
{tipb::ScalarFuncSig::Replace, "replaceAll"},
//{tipb::ScalarFuncSig::ReverseUTF8, "cast"},
//{tipb::ScalarFuncSig::Reverse, "cast"},
{tipb::ScalarFuncSig::ReverseUTF8, "reverseUTF8"},
{tipb::ScalarFuncSig::Reverse, "reverse"},
{tipb::ScalarFuncSig::RightUTF8, "rightUTF8"},
//{tipb::ScalarFuncSig::Right, "cast"},
{tipb::ScalarFuncSig::RpadUTF8, "rpadUTF8"},
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void EstablishCallData::finishTunnelAndResponder()
state = FINISH;
if (mpp_tunnel)
{
mpp_tunnel->consumerFinish("grpc writes failed.", true); //trigger mpp tunnel finish work
mpp_tunnel->consumerFinish(fmt::format("{}: finishTunnelAndResponder called.", mpp_tunnel->id()), true); //trigger mpp tunnel finish work
}
grpc::Status status(static_cast<grpc::StatusCode>(GRPC_STATUS_UNKNOWN), "Consumer exits unexpected, grpc writes failed.");
responder.Finish(status, this);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & asyn
for (Int32 i = 0; i < check_waiting_requests_freq; ++i)
{
AsyncHandler * handler = nullptr;
if (unlikely(!ready_requests.tryPop(handler, timeout)))
if (unlikely(!ready_requests.popTimeout(handler, timeout)))
break;

handler->handle();
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ void MPPTask::runImpl()
}
catch (...)
{
err_msg = getCurrentExceptionMessage(true);
err_msg = getCurrentExceptionMessage(true, true);
}

if (err_msg.empty())
Expand All @@ -405,6 +405,8 @@ void MPPTask::runImpl()
if (status == RUNNING)
{
LOG_FMT_ERROR(log, "task running meets error: {}", err_msg);
/// trim the stack trace to avoid too many useless information in log
trimStackTrace(err_msg);
try
{
handleError(err_msg);
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ void MPPTunnelBase<Writer>::sendJob(bool need_lock)
err_msg = "fatal error in sendJob()";
}
if (!err_msg.empty())
{
/// append tunnel id to error message
err_msg = fmt::format("{} meet error: {}", tunnel_id, err_msg);
LOG_ERROR(log, err_msg);
}
consumerFinish(err_msg, need_lock);
if (is_async)
writer->writeDone(grpc::Status::OK);
Expand Down
Loading

0 comments on commit c0c0799

Please sign in to comment.