Skip to content

Commit

Permalink
Merge branch 'master' into refine_interpreter_join
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise authored May 24, 2022
2 parents 2a40af5 + 1e64c5d commit d7852c7
Show file tree
Hide file tree
Showing 31 changed files with 409 additions and 579 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Common/TiFlashBuildInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ std::string getEnabledFeatures()
#else
"unwind",
#endif
#endif

// THINLTO
#if ENABLE_THINLTO
"thinlto",
#endif
};
return fmt::format("{}", fmt::join(features.begin(), features.end(), " "));
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/Join.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/Transaction/TiDB.h>


namespace DB
{
Expand Down Expand Up @@ -88,7 +86,7 @@ struct AnalysisResult
Names aggregation_keys;
TiDB::TiDBCollators aggregation_collators;
AggregateDescriptions aggregate_descriptions;
bool is_final_agg;
bool is_final_agg = false;
};

AnalysisResult analyzeExpressions(
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct MPPQueryTaskSet
bool to_be_cancelled = false;
MPPTaskMap task_map;
/// only used in scheduler
std::queue<MPPTaskPtr> waiting_tasks;
std::queue<MPPTaskId> waiting_tasks;
};

using MPPQueryTaskSetPtr = std::shared_ptr<MPPQueryTaskSet>;
Expand Down
22 changes: 16 additions & 6 deletions dbms/src/Flash/Mpp/MinTSOScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ bool MinTSOScheduler::tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & ta
LOG_FMT_WARNING(log, "{} is scheduled with miss or cancellation.", id.toString());
return true;
}
return scheduleImp(id.start_ts, query_task_set, task, false);
bool has_error = false;
return scheduleImp(id.start_ts, query_task_set, task, false, has_error);
}

/// after finishing the query, there would be no threads released soon, so the updated min-tso query with waiting tasks should be scheduled.
Expand All @@ -97,7 +98,9 @@ void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manage
{
while (!query_task_set->waiting_tasks.empty())
{
query_task_set->waiting_tasks.front()->scheduleThisTask(MPPTask::ScheduleState::FAILED);
auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front());
if (task_it != query_task_set->task_map.end() && task_it->second != nullptr)
task_it->second->scheduleThisTask(MPPTask::ScheduleState::FAILED);
query_task_set->waiting_tasks.pop();
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
}
Expand Down Expand Up @@ -153,9 +156,14 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager)
/// schedule tasks one by one
while (!query_task_set->waiting_tasks.empty())
{
auto task = query_task_set->waiting_tasks.front();
if (!scheduleImp(current_query_id, query_task_set, task, true))
auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front());
bool has_error = false;
if (task_it != query_task_set->task_map.end() && task_it->second != nullptr && !scheduleImp(current_query_id, query_task_set, task_it->second, true, has_error))
{
if (has_error)
query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, because the task is scheduled with errors.
return;
}
query_task_set->waiting_tasks.pop();
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
}
Expand All @@ -166,7 +174,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager)
}

/// [scheduled directly, scheduled from the waiting set] * [is the min_tso query, is not] * [can be scheduled, cannot be scheduled]: 8 cases in total.
bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting)
bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting, bool & has_error)
{
auto needed_threads = task->getNeededThreads();
auto check_for_new_min_tso = tso <= min_tso && estimated_thread_usage + needed_threads <= thread_hard_limit;
Expand All @@ -187,6 +195,7 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q
{
if (tso <= min_tso) /// the min_tso query should fully run, otherwise throw errors here.
{
has_error = true;
auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size());
LOG_FMT_ERROR(log, "{}", msg);
GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment();
Expand All @@ -200,11 +209,12 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q
{
throw Exception(msg);
}
return false;
}
if (!isWaiting)
{
waiting_set.insert(tso);
query_task_set->waiting_tasks.push(task);
query_task_set->waiting_tasks.push(task->getId());
GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size());
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Increment();
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MinTSOScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class MinTSOScheduler : private boost::noncopyable
void releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager);

private:
bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting);
bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting, bool & has_error);
bool updateMinTSO(const UInt64 tso, const bool retired, const String msg);
void scheduleWaitingQueries(MPPTaskManager & task_manager);
bool isDisabled()
Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Functions/FunctionsTiDBConversion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,19 @@ void registerFunctionsTiDBConversion(FunctionFactory & factory)
factory.registerFunction<FunctionBuilderTiDBCast>();
}

/// Builds the concrete tidb_cast function object for the given argument columns.
///
/// Collects the type of every argument, obtains the monotonicity callback from
/// getMonotonicityInformation() for the (first argument type, return type) pair, and
/// passes both to a FunctionTiDBCast<> together with `context`, `name`, `in_union`
/// and `tidb_tp`. The collator parameter is unnamed and not used.
///
/// NOTE(review): `arguments.front()` requires a non-empty argument list; presumably the
/// caller has already validated the argument count — confirm.
FunctionBasePtr FunctionBuilderTiDBCast::buildImpl(
    const ColumnsWithTypeAndName & arguments,
    const DataTypePtr & return_type,
    const TiDB::TiDBCollatorPtr &) const
{
    const size_t argument_count = arguments.size();

    // One slot per argument, filled with that argument's data type.
    DataTypes argument_types(argument_count);
    for (size_t pos = 0; pos != argument_count; ++pos)
        argument_types[pos] = arguments[pos].type;

    auto monotonicity_for_range = getMonotonicityInformation(arguments.front().type, return_type.get());

    return std::make_shared<FunctionTiDBCast<>>(
        context,
        name,
        std::move(monotonicity_for_range),
        argument_types,
        return_type,
        in_union,
        tidb_tp);
}


} // namespace DB
20 changes: 6 additions & 14 deletions dbms/src/Functions/FunctionsTiDBConversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -1743,6 +1743,7 @@ inline bool numberToDateTime(Int64 number, MyDateTime & result, DAGContext * ctx
return getDatetime(number, result, ctx);
}

template <typename...>
class ExecutableFunctionTiDBCast : public IExecutableFunction
{
public:
Expand Down Expand Up @@ -1782,13 +1783,15 @@ class ExecutableFunctionTiDBCast : public IExecutableFunction
const Context & context;
};

/// Callable type used for monotonicity queries: it is invoked with an IDataType and two
/// Field values and returns an IFunctionBase::Monotonicity.
/// NOTE(review): the two Field arguments are presumably the bounds of the range being
/// examined — confirm against the callers.
using MonotonicityForRange = std::function<IFunctionBase::Monotonicity(const IDataType &, const Field &, const Field &)>;

/// FunctionTiDBCast implements SQL cast function in TiDB
/// The basic idea is to dispatch according to combinations of <From, To> parameter types
template <typename...>
class FunctionTiDBCast final : public IFunctionBase
{
public:
using WrapperType = std::function<void(Block &, const ColumnNumbers &, size_t, bool, const tipb::FieldType &, const Context &)>;
using MonotonicityForRange = std::function<Monotonicity(const IDataType &, const Field &, const Field &)>;

FunctionTiDBCast(const Context & context, const char * name, MonotonicityForRange && monotonicity_for_range, const DataTypes & argument_types, const DataTypePtr & return_type, bool in_union_, const tipb::FieldType & tidb_tp_)
: context(context)
Expand All @@ -1805,7 +1808,7 @@ class FunctionTiDBCast final : public IFunctionBase

ExecutableFunctionPtr prepare(const Block & /*sample_block*/) const override
{
return std::make_shared<ExecutableFunctionTiDBCast>(
return std::make_shared<ExecutableFunctionTiDBCast<>>(
prepare(getArgumentTypes()[0], getReturnType()),
name,
in_union,
Expand Down Expand Up @@ -2341,8 +2344,6 @@ class FunctionTiDBCast final : public IFunctionBase
class FunctionBuilderTiDBCast : public IFunctionBuilder
{
public:
using MonotonicityForRange = FunctionTiDBCast::MonotonicityForRange;

static constexpr auto name = "tidb_cast";
static FunctionBuilderPtr create(const Context & context)
{
Expand All @@ -2369,16 +2370,7 @@ class FunctionBuilderTiDBCast : public IFunctionBuilder
FunctionBasePtr buildImpl(
const ColumnsWithTypeAndName & arguments,
const DataTypePtr & return_type,
const TiDB::TiDBCollatorPtr &) const override
{
DataTypes data_types(arguments.size());

for (size_t i = 0; i < arguments.size(); ++i)
data_types[i] = arguments[i].type;

auto monotonicity = getMonotonicityInformation(arguments.front().type, return_type.get());
return std::make_shared<FunctionTiDBCast>(context, name, std::move(monotonicity), data_types, return_type, in_union, tidb_tp);
}
const TiDB::TiDBCollatorPtr &) const override;

// use the last const string column's value as the return type name, in string representation like "Float64"
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
Expand Down
Loading

0 comments on commit d7852c7

Please sign in to comment.