Skip to content

Commit

Permalink
Merge branch 'master' into func-unhex
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh authored Dec 29, 2022
2 parents 496855d + 4c8877e commit 887b308
Show file tree
Hide file tree
Showing 19 changed files with 919 additions and 515 deletions.
21 changes: 16 additions & 5 deletions dbms/src/Common/HashTable/StringHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ class StringHashTable : private boost::noncopyable
// 1. Always memcpy 8 times bytes
// 2. Use switch case extension to generate fast dispatching table
// 3. Funcs are named callables that can be force_inlined
// NOTE: It relies on Little Endianness
template <typename Self, typename KeyHolder, typename Func>
static auto
#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
Expand Down Expand Up @@ -296,13 +295,19 @@ class StringHashTable : private boost::noncopyable
if ((reinterpret_cast<uintptr_t>(p) & 2048) == 0)
{
memcpy(&n[0], p, 8);
n[0] &= -1ul >> s;
if constexpr (DB::isLittleEndian())
n[0] &= (-1ULL >> s);
else
n[0] &= (-1ULL << s);
}
else
{
const char * lp = x.data + x.size - 8;
memcpy(&n[0], lp, 8);
n[0] >>= s;
if constexpr (DB::isLittleEndian())
n[0] >>= s;
else
n[0] <<= s;
}
keyHolderDiscardKey(key_holder);
return func(self.m1, k8, hash(k8));
Expand All @@ -312,7 +317,10 @@ class StringHashTable : private boost::noncopyable
memcpy(&n[0], p, 8);
const char * lp = x.data + x.size - 8;
memcpy(&n[1], lp, 8);
n[1] >>= s;
if constexpr (DB::isLittleEndian())
n[1] >>= s;
else
n[1] <<= s;
keyHolderDiscardKey(key_holder);
return func(self.m2, k16, hash(k16));
}
Expand All @@ -321,7 +329,10 @@ class StringHashTable : private boost::noncopyable
memcpy(&n[0], p, 16);
const char * lp = x.data + x.size - 8;
memcpy(&n[2], lp, 8);
n[2] >>= s;
if constexpr (DB::isLittleEndian())
n[2] >>= s;
else
n[2] <<= s;
keyHolderDiscardKey(key_holder);
return func(self.m3, k24, hash(k24));
}
Expand Down
20 changes: 16 additions & 4 deletions dbms/src/Common/HashTable/TwoLevelStringHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,19 @@ class TwoLevelStringHashTable : private boost::noncopyable
if ((reinterpret_cast<uintptr_t>(p) & 2048) == 0)
{
memcpy(&n[0], p, 8);
n[0] &= -1ul >> s;
if constexpr (DB::isLittleEndian())
n[0] &= (-1ULL >> s);
else
n[0] &= (-1ULL << s);
}
else
{
const char * lp = x.data + x.size - 8;
memcpy(&n[0], lp, 8);
n[0] >>= s;
if constexpr (DB::isLittleEndian())
n[0] >>= s;
else
n[0] <<= s;
}
auto res = hash(k8);
auto buck = getBucketFromHash(res);
Expand All @@ -150,7 +156,10 @@ class TwoLevelStringHashTable : private boost::noncopyable
memcpy(&n[0], p, 8);
const char * lp = x.data + x.size - 8;
memcpy(&n[1], lp, 8);
n[1] >>= s;
if constexpr (DB::isLittleEndian())
n[1] >>= s;
else
n[1] <<= s;
auto res = hash(k16);
auto buck = getBucketFromHash(res);
keyHolderDiscardKey(key_holder);
Expand All @@ -161,7 +170,10 @@ class TwoLevelStringHashTable : private boost::noncopyable
memcpy(&n[0], p, 16);
const char * lp = x.data + x.size - 8;
memcpy(&n[2], lp, 8);
n[2] >>= s;
if constexpr (DB::isLittleEndian())
n[2] >>= s;
else
n[2] <<= s;
auto res = hash(k24);
auto buck = getBucketFromHash(res);
keyHolderDiscardKey(key_holder);
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Common/OptimizedRegularExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Columns/ColumnString.h>
#include <Common/config.h>
#include <common/StringRef.h>
#include <common/types.h>
Expand Down Expand Up @@ -117,6 +118,7 @@ class OptimizedRegularExpressionImpl

Int64 instr(const char * subject, size_t subject_size, Int64 pos, Int64 occur, Int64 ret_op);
std::optional<StringRef> substr(const char * subject, size_t subject_size, Int64 pos, Int64 occur);
void replace(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 pos, Int64 occur);

private:
Int64 processInstrEmptyStringExpr(const char * expr, size_t expr_size, size_t byte_pos, Int64 occur);
Expand All @@ -125,6 +127,11 @@ class OptimizedRegularExpressionImpl
std::optional<StringRef> processSubstrEmptyStringExpr(const char * expr, size_t expr_size, size_t byte_pos, Int64 occur);
std::optional<StringRef> substrImpl(const char * subject, size_t subject_size, Int64 byte_pos, Int64 occur);

void processReplaceEmptyStringExpr(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur);
void replaceImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur);
void replaceOneImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur);
void replaceAllImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos);

bool is_trivial;
bool required_substring_is_prefix;
bool is_case_insensitive;
Expand Down
171 changes: 162 additions & 9 deletions dbms/src/Common/OptimizedRegularExpression.inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <common/defines.h>
#include <common/types.h>

#include <cstring>
#include <iostream>
#include <optional>

Expand Down Expand Up @@ -499,22 +500,67 @@ std::optional<StringRef> OptimizedRegularExpressionImpl<thread_safe>::processSub
return std::optional<StringRef>(StringRef(matched_str.data(), matched_str.size()));
}

static inline void checkInstrArgs(Int64 utf8_total_len, size_t subject_size, Int64 pos, Int64 ret_op)
template <bool thread_safe>
void OptimizedRegularExpressionImpl<thread_safe>::processReplaceEmptyStringExpr(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur)
{
RUNTIME_CHECK_MSG(!(ret_op != 0 && ret_op != 1), "Incorrect argument to regexp function: return_option must be 1 or 0");
RUNTIME_CHECK_MSG(!(pos <= 0 || (pos > utf8_total_len && subject_size != 0)), "Index out of bounds in regular function.");
if (occur > 1 || byte_pos != 1)
{
res_data.resize(res_data.size() + 1);
res_data[res_offset++] = '\0';
return;
}

StringPieceType expr_sp(subject, subject_size);
StringPieceType matched_str;
bool success = RegexType::FindAndConsume(&expr_sp, *re2, &matched_str);
if (!success)
{
res_data.resize(res_data.size() + 1);
}
else
{
res_data.resize(res_data.size() + repl.size + 1);
memcpy(&res_data[res_offset], repl.data, repl.size);
res_offset += repl.size;
}

res_data[res_offset++] = '\0';
}

static inline void checkSubstrArgs(Int64 utf8_total_len, size_t subject_size, Int64 pos)
namespace FunctionsRegexp
{
inline void checkArgPos(Int64 utf8_total_len, size_t subject_size, Int64 pos)
{
RUNTIME_CHECK_MSG(!(pos <= 0 || (pos > utf8_total_len && subject_size != 0)), "Index out of bounds in regular function.");
}

static inline void makeOccurValid(Int64 & occur)
inline void checkArgsInstr(Int64 utf8_total_len, size_t subject_size, Int64 pos, Int64 ret_op)
{
RUNTIME_CHECK_MSG(!(ret_op != 0 && ret_op != 1), "Incorrect argument to regexp function: return_option must be 1 or 0");
checkArgPos(utf8_total_len, subject_size, pos);
}

inline void checkArgsSubstr(Int64 utf8_total_len, size_t subject_size, Int64 pos)
{
checkArgPos(utf8_total_len, subject_size, pos);
}

inline void checkArgsReplace(Int64 utf8_total_len, size_t subject_size, Int64 pos)
{
checkArgPos(utf8_total_len, subject_size, pos);
}

inline void makeOccurValid(Int64 & occur)
{
occur = occur < 1 ? 1 : occur;
}

inline void makeReplaceOccurValid(Int64 & occur)
{
occur = occur < 0 ? 1 : occur;
}
} // namespace FunctionsRegexp

template <bool thread_safe>
Int64 OptimizedRegularExpressionImpl<thread_safe>::instrImpl(const char * subject, size_t subject_size, Int64 byte_pos, Int64 occur, Int64 ret_op)
{
Expand Down Expand Up @@ -557,13 +603,95 @@ std::optional<StringRef> OptimizedRegularExpressionImpl<thread_safe>::substrImpl
return std::optional<StringRef>(StringRef(matched_str.data(), matched_str.size()));
}

template <bool thread_safe>
void OptimizedRegularExpressionImpl<thread_safe>::replaceAllImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos)
{
size_t byte_offset = byte_pos - 1; // This is a offset for bytes, not utf8
StringPieceType expr_sp(subject + byte_offset, subject_size - byte_offset);
StringPieceType matched_str;
size_t prior_offset = 0;

while (true)
{
bool success = RegexType::FindAndConsume(&expr_sp, *re2, &matched_str);
if (!success)
break;

auto skipped_byte_size = static_cast<Int64>(matched_str.data() - (subject + prior_offset));
res_data.resize(res_data.size() + skipped_byte_size);
memcpy(&res_data[res_offset], subject + prior_offset, skipped_byte_size); // copy the skipped bytes
res_offset += skipped_byte_size;

res_data.resize(res_data.size() + repl.size);
memcpy(&res_data[res_offset], repl.data, repl.size); // replace the matched string
res_offset += repl.size;

prior_offset = expr_sp.data() - subject;
}

size_t suffix_byte_size = subject_size - prior_offset;
res_data.resize(res_data.size() + suffix_byte_size + 1);
memcpy(&res_data[res_offset], subject + prior_offset, suffix_byte_size); // Copy suffix string
res_offset += suffix_byte_size;
res_data[res_offset++] = 0;
}

template <bool thread_safe>
void OptimizedRegularExpressionImpl<thread_safe>::replaceOneImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur)
{
size_t byte_offset = byte_pos - 1; // This is a offset for bytes, not utf8
StringPieceType expr_sp(subject + byte_offset, subject_size - byte_offset);
StringPieceType matched_str;

while (occur > 0)
{
bool success = RegexType::FindAndConsume(&expr_sp, *re2, &matched_str);
if (!success)
{
res_data.resize(res_data.size() + subject_size + 1);
memcpy(&res_data[res_offset], subject, subject_size);
res_offset += subject_size;
res_data[res_offset++] = 0;
return;
}

--occur;
}

auto prefix_byte_size = static_cast<Int64>(matched_str.data() - subject);
res_data.resize(res_data.size() + prefix_byte_size);
memcpy(&res_data[res_offset], subject, prefix_byte_size); // Copy prefix string
res_offset += prefix_byte_size;

res_data.resize(res_data.size() + repl.size);
memcpy(&res_data[res_offset], repl.data, repl.size); // Replace the matched string
res_offset += repl.size;

const char * suffix_str = subject + prefix_byte_size + matched_str.size();
size_t suffix_byte_size = subject_size - prefix_byte_size - matched_str.size();
res_data.resize(res_data.size() + suffix_byte_size + 1);
memcpy(&res_data[res_offset], suffix_str, suffix_byte_size); // Copy suffix string
res_offset += suffix_byte_size;

res_data[res_offset++] = 0;
}

template <bool thread_safe>
void OptimizedRegularExpressionImpl<thread_safe>::replaceImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur)
{
if (occur == 0)
return replaceAllImpl(subject, subject_size, res_data, res_offset, repl, byte_pos);
else
return replaceOneImpl(subject, subject_size, res_data, res_offset, repl, byte_pos, occur);
}

template <bool thread_safe>
Int64 OptimizedRegularExpressionImpl<thread_safe>::instr(const char * subject, size_t subject_size, Int64 pos, Int64 occur, Int64 ret_op)
{
Int64 utf8_total_len = DB::UTF8::countCodePoints(reinterpret_cast<const UInt8 *>(subject), subject_size);
;
checkInstrArgs(utf8_total_len, subject_size, pos, ret_op);
makeOccurValid(occur);
FunctionsRegexp::checkArgsInstr(utf8_total_len, subject_size, pos, ret_op);
FunctionsRegexp::makeOccurValid(occur);

if (unlikely(subject_size == 0))
return processInstrEmptyStringExpr(subject, subject_size, pos, occur);
Expand All @@ -576,8 +704,8 @@ template <bool thread_safe>
std::optional<StringRef> OptimizedRegularExpressionImpl<thread_safe>::substr(const char * subject, size_t subject_size, Int64 pos, Int64 occur)
{
Int64 utf8_total_len = DB::UTF8::countCodePoints(reinterpret_cast<const UInt8 *>(subject), subject_size);
checkSubstrArgs(utf8_total_len, subject_size, pos);
makeOccurValid(occur);
FunctionsRegexp::checkArgsSubstr(utf8_total_len, subject_size, pos);
FunctionsRegexp::makeOccurValid(occur);

if (unlikely(subject_size == 0))
return processSubstrEmptyStringExpr(subject, subject_size, pos, occur);
Expand All @@ -586,5 +714,30 @@ std::optional<StringRef> OptimizedRegularExpressionImpl<thread_safe>::substr(con
return substrImpl(subject, subject_size, byte_pos, occur);
}

template <bool thread_safe>
void OptimizedRegularExpressionImpl<thread_safe>::replace(
const char * subject,
size_t subject_size,
DB::ColumnString::Chars_t & res_data,
DB::ColumnString::Offset & res_offset,
const StringRef & repl,
Int64 pos,
Int64 occur)
{
Int64 utf8_total_len = DB::UTF8::countCodePoints(reinterpret_cast<const UInt8 *>(subject), subject_size);
;
FunctionsRegexp::checkArgsReplace(utf8_total_len, subject_size, pos);
FunctionsRegexp::makeReplaceOccurValid(occur);

if (unlikely(subject_size == 0))
{
processReplaceEmptyStringExpr(subject, subject_size, res_data, res_offset, repl, pos, occur);
return;
}

size_t byte_pos = DB::UTF8::utf8Pos2bytePos(reinterpret_cast<const UInt8 *>(subject), pos);
replaceImpl(subject, subject_size, res_data, res_offset, repl, byte_pos, occur);
}

#undef MIN_LENGTH_FOR_STRSTR
#undef MAX_SUBPATTERNS
4 changes: 2 additions & 2 deletions dbms/src/Debug/MockExecutor/AggregationBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ void AggregationBinder::toMPPSubPlan(size_t & executor_index, const DAGPropertie
}

std::shared_ptr<ExchangeSenderBinder> exchange_sender
= std::make_shared<ExchangeSenderBinder>(executor_index, output_schema_for_partial_agg, partition_keys.empty() ? tipb::PassThrough : tipb::Hash, partition_keys);
= std::make_shared<ExchangeSenderBinder>(executor_index, output_schema_for_partial_agg, partition_keys.empty() ? tipb::PassThrough : tipb::Hash, partition_keys, fine_grained_shuffle_stream_count);
exchange_sender->children.push_back(partial_agg);

std::shared_ptr<ExchangeReceiverBinder> exchange_receiver
= std::make_shared<ExchangeReceiverBinder>(executor_index, output_schema_for_partial_agg);
= std::make_shared<ExchangeReceiverBinder>(executor_index, output_schema_for_partial_agg, fine_grained_shuffle_stream_count);
exchange_map[exchange_receiver->name] = std::make_pair(exchange_receiver, exchange_sender);

/// re-construct agg_exprs and gby_exprs in final_agg
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ bool ExchangeSenderBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_
tipb_executor->set_executor_id(name);
tipb::ExchangeSender * exchange_sender = tipb_executor->mutable_exchange_sender();
exchange_sender->set_tp(type);
if (tipb_executor->exchange_sender().tp() == tipb::Hash)
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
for (auto i : partition_keys)
{
auto * expr = exchange_sender->add_partition_keys();
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ namespace DB::mock
class ExchangeSenderBinder : public ExecutorBinder
{
public:
ExchangeSenderBinder(size_t & index, const DAGSchema & output, tipb::ExchangeType type_, const std::vector<size_t> & partition_keys_ = {})
ExchangeSenderBinder(size_t & index, const DAGSchema & output, tipb::ExchangeType type_, const std::vector<size_t> & partition_keys_ = {}, uint64_t fine_grained_shuffle_stream_count_ = 0)
: ExecutorBinder(index, "exchange_sender_" + std::to_string(index), output)
, type(type_)
, partition_keys(partition_keys_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{}

bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
Expand All @@ -38,6 +39,7 @@ class ExchangeSenderBinder : public ExecutorBinder
tipb::ExchangeType type;
TaskMetas task_metas;
std::vector<size_t> partition_keys;
uint64_t fine_grained_shuffle_stream_count;
};

ExecutorBinderPtr compileExchangeSender(ExecutorBinderPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);
Expand Down
Loading

0 comments on commit 887b308

Please sign in to comment.