Skip to content

Commit

Permalink
Merge branch 'master' into support_temp_table
Browse files Browse the repository at this point in the history
  • Loading branch information
Yulei-Yang committed Sep 30, 2024
2 parents 0bb8654 + ce982a9 commit f580e2a
Show file tree
Hide file tree
Showing 1,183 changed files with 106,588 additions and 4,654 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ thirdparty/doris-thirdparty*.tar.xz
docker/thirdparties/docker-compose/mysql/data
docker/thirdparties/docker-compose/hive/scripts/tpch1.db/
docker/thirdparties/docker-compose/hive/scripts/paimon1
docker/thirdparties/docker-compose/hive/scripts/tvf_data

fe_plugins/output
fe_plugins/**/.factorypath
Expand Down
1 change: 1 addition & 0 deletions be/src/agent/be_exec_version_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ void BeExecVersionManager::check_function_compatibility(int current_be_exec_vers
* c. change FunctionIsIPAddressInRange from AlwaysNotNullable to DependOnArguments
* d. change some agg function nullable property: PR #37215
* e. change variant serde to fix PR #38413
* f. support const column in serialize/deserialize function: PR #41175
*/
const int BeExecVersionManager::max_be_exec_version = 7;
const int BeExecVersionManager::min_be_exec_version = 0;
Expand Down
2 changes: 2 additions & 0 deletions be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ constexpr inline int AGG_FUNCTION_NULLABLE = 5; // change some agg nullable prop
constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix PR #38413
constexpr inline int AGGREGATION_2_1_VERSION =
6; // some aggregation changed the data format after this version
constexpr inline int USE_CONST_SERDE =
7; // support const column in serialize/deserialize function: PR #41175

class BeExecVersionManager {
public:
Expand Down
17 changes: 7 additions & 10 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,20 +240,12 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
master_info.network_address.hostname, master_info.network_address.port);
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_engine.notify_listeners();
}

if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) {
LOG(WARNING) << "Detected mismatch in cloud mode configuration between FE and BE. "
<< "FE cloud mode: "
<< (master_info.__isset.meta_service_endpoint ? "true" : "false")
<< ", BE cloud mode: " << (config::is_cloud_mode() ? "true" : "false");
return Status::InvalidArgument<false>(
"fe and be do not work in same mode, fe cloud mode: {},"
" be cloud mode: {}",
master_info.__isset.meta_service_endpoint, config::is_cloud_mode());
<< ", BE cloud mode: " << (config::is_cloud_mode() ? "true" : "false")
<< ". If fe is earlier than version 3.0.2, the message can be ignored.";
}

if (master_info.__isset.meta_service_endpoint) {
Expand Down Expand Up @@ -283,6 +275,11 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st;
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_engine.notify_listeners();
}

return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ Status CloudBaseCompaction::modify_rowsets() {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudBaseCompaction, tablet_id={}, range=[{}-{}]",
Expand Down
11 changes: 10 additions & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,22 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());

DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.enable_spin_wait", {
LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, start";
while (DebugPoints::instance()->is_enable(
"CloudCumulativeCompaction::modify_rowsets.block")) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, exit";
});

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
int64_t initiator =
HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits<int64_t>::max();
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]",
Expand Down
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_delete_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
LOG_WARNING("tablet exceeds max version num limit")
.tag("limit", config::max_tablet_version_num)
.tag("tablet_id", tablet->tablet_id());
return Status::Error<TOO_MANY_VERSION>("too many versions, versions={} tablet={}",
config::max_tablet_version_num, tablet->tablet_id());
return Status::Error<TOO_MANY_VERSION>(
"too many versions, versions={} tablet={}. Please reduce the frequency of loading "
"data or adjust the max_tablet_version_num in be.conf to a larger value.",
config::max_tablet_version_num, tablet->tablet_id());
}

// check delete condition if push for delete
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ Status CloudRowsetBuilder::check_tablet_version_count() {
if (version_count > config::max_tablet_version_num) {
return Status::Error<TOO_MANY_VERSION>(
"failed to init rowset builder. version count: {}, exceed limit: {}, "
"tablet: {}",
"tablet: {}. Please reduce the frequency of loading data or adjust the "
"max_tablet_version_num in be.conf to a larger value.",
version_count, config::max_tablet_version_num, _tablet->tablet_id());
}
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "cloud/cloud_rowset_writer.h"

#include "common/status.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/fs/file_system.h"
#include "olap/rowset/rowset_factory.h"
Expand All @@ -34,7 +35,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
if (_context.is_local_rowset()) {
// In cloud mode, this branch implies it is an intermediate rowset for external merge sort,
// we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`).
_context.tablet_path = io::FileCacheFactory::instance()->get_cache_path();
_context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path();
} else {
_rowset_meta->set_remote_storage_resource(*_context.storage_resource);
}
Expand Down
28 changes: 22 additions & 6 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,18 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
*publish_status = iter->second.publish_status;
*previous_publish_info = iter->second.publish_info;
}
RETURN_IF_ERROR(
get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr));
return Status::OK();

auto st = get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr);

if (st.is<ErrorCode::NOT_FOUND>()) {
// Because of the rowset_ids become empty, all delete bitmap
// will be recalculate in CalcDeleteBitmapTask
if (delete_bitmap != nullptr) {
*delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
}
return Status::OK();
}
return st;
}

Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
Expand All @@ -95,6 +104,13 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
CacheKey key(key_str);
Cache::Handle* handle = lookup(key);

DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", {
handle = nullptr;
LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, make cache missed "
"when get delete bitmap, txn_id:"
<< transaction_id << ", tablet_id: " << tablet_id;
});

DeleteBitmapCacheValue* val =
handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
if (val) {
Expand All @@ -109,9 +125,9 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
LOG_INFO("cache missed when get delete bitmap")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id);
// Because of the rowset_ids become empty, all delete bitmap
// will be recalculate in CalcDeleteBitmapTask
*delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
return Status::Error<ErrorCode::NOT_FOUND, false>(
"cache missed when get delete bitmap, tablet_id={}, transaction_id={}", tablet_id,
transaction_id);
}
return Status::OK();
}
Expand Down
11 changes: 11 additions & 0 deletions be/src/cloud/injection_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ void register_suites() {
arg0->second = true;
});
});
// curl be_ip:http_port/api/injection_point/apply_suite?name=Segment::parse_footer:magic_number_corruption'
suite_map.emplace("Segment::parse_footer:magic_number_corruption", [] {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("Segment::parse_footer:magic_number_corruption_inj", [](auto&& args) {
if (auto p = std::any_cast<uint8_t*>(args[0])) {
memset(p, 0, 12);
} else {
std::cerr << "Failed to cast std::any to uint8_t*" << std::endl;
}
});
});
}

void set_sleep(const std::string& point, HttpRequest* req) {
Expand Down
73 changes: 73 additions & 0 deletions be/src/common/cast_set.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <limits>
#include <type_traits>

#include "common/exception.h"
#include "common/status.h"

namespace doris {

template <typename T, typename U>
void check_cast_value(U b) {
if constexpr (std::is_unsigned_v<U>) {
if (b > std::numeric_limits<T>::max()) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
"value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(),
std::numeric_limits<T>::max());
}
} else if constexpr (std::is_unsigned_v<T>) {
if (b < 0 || b > std::numeric_limits<T>::max()) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
"value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(),
std::numeric_limits<T>::max());
}
} else {
if (b < std::numeric_limits<T>::min() || b > std::numeric_limits<T>::max()) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
"value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(),
std::numeric_limits<T>::max());
}
}
}

template <typename T, typename U, bool need_check_value = true>
requires std::is_integral_v<T> && std::is_integral_v<U>
void cast_set(T& a, U b) {
if constexpr (need_check_value) {
check_cast_value<T>(b);
}
a = static_cast<T>(b);
}

template <typename T, typename U, bool need_check_value = true>
requires std::is_integral_v<T> && std::is_integral_v<U>
T cast_set(U b) {
if constexpr (need_check_value) {
check_cast_value<T>(b);
}
return static_cast<T>(b);
}

} // namespace doris
24 changes: 24 additions & 0 deletions be/src/common/compile_check_begin.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic error "-Wshorten-64-to-32"
#endif
//#include "common/compile_check_begin.h"
23 changes: 23 additions & 0 deletions be/src/common/compile_check_end.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#ifdef __clang__
#pragma clang diagnostic pop
#endif
// #include "common/compile_check_end.h"
5 changes: 5 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ DEFINE_mInt32(doris_scan_range_max_mb, "1024");
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// single read execute fragment max run time millseconds
DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
Expand Down Expand Up @@ -1008,6 +1010,9 @@ DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not
DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");
// rename ttl filename to new format during read, with some performance cost
DEFINE_mBool(translate_to_new_ttl_format_during_read, "false");
DEFINE_mBool(enbale_dump_error_file, "true");
// limit the max size of error log on disk
DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ DECLARE_mInt32(doris_scan_range_max_mb);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
// single read execute fragment max run time millseconds
DECLARE_mInt32(doris_scanner_max_run_time_ms);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
Expand Down Expand Up @@ -1056,6 +1058,9 @@ DECLARE_mInt64(file_cache_ttl_valid_check_interval_second);
DECLARE_Bool(enable_ttl_cache_evict_using_lru);
// rename ttl filename to new format during read, with some performance cost
DECLARE_Bool(translate_to_new_ttl_format_during_read);
DECLARE_mBool(enbale_dump_error_file);
// limit the max size of error log on disk
DECLARE_mInt64(file_cache_error_log_limit_bytes);

// inverted index searcher cache
// cache entry stay time after lookup
Expand Down
1 change: 1 addition & 0 deletions be/src/common/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const std::string BLOCK_TEMP_COLUMN_SCANNER_FILTERED = "__TEMP__scanner_filtered
const std::string ROWID_COL = "__DORIS_ROWID_COL__";
const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
const std::string DYNAMIC_COLUMN_NAME = "__DORIS_DYNAMIC_COL__";
const std::string PARTIAL_UPDATE_AUTO_INC_COL = "__PARTIAL_UPDATE_AUTO_INC_COLUMN__";

/// The maximum precision representable by a 4-byte decimal (Decimal4Value)
constexpr int MAX_DECIMAL32_PRECISION = 9;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,7 @@ void IRuntimeFilter::update_state() {
auto execution_timeout = _state->execution_timeout * 1000;
auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
// bitmap filter is precise filter and only filter once, so it must be applied.
int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
int64_t wait_times_ms = _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER
? execution_timeout
: runtime_filter_wait_time_ms;
auto expected = _rf_state_atomic.load(std::memory_order_acquire);
Expand Down
13 changes: 13 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,19 @@ void StreamLoadAction::handle(HttpRequest* req) {
_save_stream_load_record(ctx, str);
}
#endif

LOG(INFO) << "finished to execute stream load. label=" << ctx->label
<< ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id
<< ", load_cost_ms=" << ctx->load_cost_millis << ", receive_data_cost_ms="
<< (ctx->receive_and_read_data_cost_nanos - ctx->read_data_cost_nanos) / 1000000
<< ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 1000000
<< ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000
<< ", commit_and_publish_txn_cost_ms="
<< ctx->commit_and_publish_txn_cost_nanos / 1000000
<< ", number_total_rows=" << ctx->number_total_rows
<< ", number_loaded_rows=" << ctx->number_loaded_rows
<< ", receive_bytes=" << ctx->receive_bytes << ", loaded_bytes=" << ctx->loaded_bytes;

// update statistics
streaming_load_requests_total->increment(1);
streaming_load_duration_ms->increment(ctx->load_cost_millis);
Expand Down
Loading

0 comments on commit f580e2a

Please sign in to comment.