Skip to content

Commit

Permalink
some fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Sep 29, 2024
1 parent 3b07d43 commit 78739c1
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 7 deletions.
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
return capture_rs_readers_unlocked(version_path, rs_splits);
}

Status CloudTablet::capture_sub_txn_rowsets(int64_t version,
const std::vector<int64_t>& sub_txn_ids,
Status CloudTablet::capture_sub_txn_rowsets(const std::vector<int64_t>& sub_txn_ids,
std::vector<RowsetSharedPtr>* rowsets) {
return _engine.meta_mgr().get_tmp_rowset(tablet_schema(), index_id(), tablet_id(), sub_txn_ids,
*rowsets);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class CloudTablet final : public BaseTablet {

Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) override;
Status capture_sub_txn_rowsets(int64_t version, const std::vector<int64_t>& sub_txn_ids,
Status capture_sub_txn_rowsets(const std::vector<int64_t>& sub_txn_ids,
std::vector<RowsetSharedPtr>* rowsets) override;

Status capture_consistent_rowsets_unlocked(
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ Status BaseTablet::capture_sub_txn_rs_readers(int64_t version,
<< ", tablet_id=" << tablet_id() << ", version=" << version
<< ", sub_txn_ids.size=" << sub_txn_ids.size();
std::vector<RowsetSharedPtr> rowsets;
RETURN_IF_ERROR(capture_sub_txn_rowsets(version, sub_txn_ids, &rowsets));
RETURN_IF_ERROR(capture_sub_txn_rowsets(sub_txn_ids, &rowsets));
DCHECK(rowsets.size() == sub_txn_ids.size())
<< " sub_txn_id size=" << sub_txn_ids.size() << ", rowset size=" << rowsets.size()
<< ", partition_id=" << partition_id() << ", tablet=" << tablet_id();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class BaseTablet {
Status capture_sub_txn_rs_readers(int64_t version, const std::vector<int64_t>& sub_txn_ids,
std::vector<RowSetSplits>* rs_splits);

virtual Status capture_sub_txn_rowsets(int64_t version, const std::vector<int64_t>& sub_txn_ids,
virtual Status capture_sub_txn_rowsets(const std::vector<int64_t>& sub_txn_ids,
std::vector<RowsetSharedPtr>* rowsets) = 0;

virtual size_t tablet_footprint() = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ Status Tablet::capture_rs_readers(const Version& spec_version, std::vector<RowSe
return Status::OK();
}

Status Tablet::capture_sub_txn_rowsets(int64_t version, const std::vector<int64_t>& sub_txn_ids,
Status Tablet::capture_sub_txn_rowsets(const std::vector<int64_t>& sub_txn_ids,
std::vector<RowsetSharedPtr>* rowsets) {
for (int i = 0; i < sub_txn_ids.size(); ++i) {
auto sub_txn_id = sub_txn_ids[i];
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class Tablet final : public BaseTablet {
// If skip_missing_version is true, skip versions if they are missing.
Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) override;
Status capture_sub_txn_rowsets(int64_t version, const std::vector<int64_t>& sub_txn_ids,
Status capture_sub_txn_rowsets(const std::vector<int64_t>& sub_txn_ids,
std::vector<RowsetSharedPtr>* rowsets) override;

// Find the missed versions until the spec_version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static boolean disable_load_job = false;

@ConfField(mutable = true, masterOnly = true)
public static int max_sub_transactions_in_transaction_load = 50;

/*
* One master daemon thread will update database used data quota for db txn manager
* every db_used_data_quota_update_interval_secs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ public long beginTransaction(TableIf table) throws Exception {
throw new AnalysisException(
"Transaction insert must be in the same database, expect db_id=" + this.database.getId());
}
if (subTransactionStates.size() >= Config.max_sub_transactions_in_transaction_load) {
throw new UserException("Transaction load can not have more than "
+ Config.max_sub_transactions_in_transaction_load + " sub transactions");
}
long subTxnId;
if (Config.isCloudMode()) {
TUniqueId queryId = ConnectContext.get().queryId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,22 @@ suite("txn_insert_inject_case", "nonConcurrent") {
}
assertEquals(7, rowCount)
}

// check the limit of sub transaction num
try {
sql """ admin set frontend config('max_sub_transactions_in_transaction_load' = '4') """
sql """begin"""
sql """insert into ${table}_0 select * from ${table}_1;"""
sql """insert into ${table}_0 select * from ${table}_1;"""
sql """insert into ${table}_0 select * from ${table}_1;"""
sql """insert into ${table}_0 select * from ${table}_1;"""
sql """insert into ${table}_0 select * from ${table}_1;"""
assertTrue(false, "should not reach here")
} catch (Exception e) {
logger.info("failed", e)
sql """rollback"""
assertTrue(e.getMessage().contains("Transaction load can not have more than"))
} finally {
sql """ admin set frontend config('max_sub_transactions_in_transaction_load' = '50') """
}
}

0 comments on commit 78739c1

Please sign in to comment.