Skip to content

Commit

Permalink
some fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Sep 29, 2024
1 parent eceb30b commit 58694e1
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 67 deletions.
1 change: 1 addition & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
Status CloudMetaMgr::get_tmp_rowset(TabletSchemaSPtr tablet_schema, int64_t index_id,
int64_t tablet_id, const std::vector<int64_t>& txn_ids,
std::vector<std::shared_ptr<Rowset>>& rowsets) {
// see CloudMetaMgr::sync_tablet_rowsets, GetRowsetRequest
VLOG_DEBUG << "get tmp rowset, tablet_id: " << tablet_id
<< ", txn_ids size: " << txn_ids.size();
GetTmpRowsetRequest req;
Expand Down
40 changes: 4 additions & 36 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,42 +108,10 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
return capture_rs_readers_unlocked(version_path, rs_splits);
}

Status CloudTablet::capture_sub_txn_rs_readers(int64_t version,
const std::vector<int64_t>& sub_txn_ids,
std::vector<RowSetSplits>* rs_splits) {
LOG(INFO) << "capture_sub_txn_rs_readers for partition=" << partition_id()
<< ", tablet_id=" << tablet_id() << ", version=" << version
<< ", sub_txn_ids.size=" << sub_txn_ids.size();
std::vector<std::shared_ptr<Rowset>> rowsets;
RETURN_IF_ERROR(_engine.meta_mgr().get_tmp_rowset(tablet_schema(), index_id(), tablet_id(),
sub_txn_ids, rowsets));
DCHECK(rowsets.size() == sub_txn_ids.size())
<< " sub_txn_id size=" << sub_txn_ids.size() << ", rowset size=" << rowsets.size()
<< ", partition_id=" << partition_id() << ", tablet=" << tablet_id();
for (int i = 0; i < rowsets.size(); ++i) {
auto& rowset = rowsets[i];
DCHECK(rowset != nullptr) << " rowset is nullptr for sub_txn_id=" << sub_txn_ids[i]
<< ", partition_id=" << partition_id()
<< ", tablet=" << tablet_id();
// see CloudMetaMgr::sync_tablet_rowsets, GetRowsetRequest
int64_t tmp_version = version + i + 1;
LOG(INFO) << "sub_txn_id=" << sub_txn_ids[i] << ", partition_id=" << partition_id()
<< ", tablet=" << tablet_id() << ", set tmp version=" << tmp_version;
rowset->set_version(Version(tmp_version, tmp_version));
if (rowset->rowset_meta()->has_delete_predicate()) {
rowset->rowset_meta()->mutable_delete_predicate()->set_version(tmp_version);
}
if (rowset != nullptr) {
RowsetReaderSharedPtr rs_reader;
auto res = rowset->create_reader(&rs_reader);
if (!res.ok()) {
return Status::Error<ErrorCode::CAPTURE_ROWSET_READER_ERROR>(
"failed to create reader for rowset:{}", rowset->rowset_id().to_string());
}
rs_splits->emplace_back(std::move(rs_reader));
}
}
return Status::OK();
Status CloudTablet::capture_sub_txn_rowsets(int64_t version, const std::vector<int64_t>& sub_txn_ids,
std::vector<RowsetSharedPtr>* rowsets) {
return _engine.meta_mgr().get_tmp_rowset(tablet_schema(), index_id(), tablet_id(), sub_txn_ids,
*rowsets);
}

// There are only two tablet_states RUNNING and NOT_READY in cloud mode
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class CloudTablet final : public BaseTablet {

Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) override;
Status capture_sub_txn_rs_readers(int64_t version, const std::vector<int64_t>& sub_txn_ids,
std::vector<RowSetSplits>* rs_splits) override;
Status capture_sub_txn_rowsets(int64_t version, const std::vector<int64_t>& sub_txn_ids,
std::vector<RowsetSharedPtr>* rowsets) override;

Status capture_consistent_rowsets_unlocked(
const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const override;
Expand Down
36 changes: 36 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,42 @@ BaseTablet::~BaseTablet() {
g_total_tablet_num << -1;
}

Status BaseTablet::capture_sub_txn_rs_readers(int64_t version,
const std::vector<int64_t>& sub_txn_ids,
std::vector<RowSetSplits>* rs_splits) {
LOG(INFO) << "capture_sub_txn_rs_readers for partition=" << partition_id()
<< ", tablet_id=" << tablet_id() << ", version=" << version
<< ", sub_txn_ids.size=" << sub_txn_ids.size();
std::vector<RowsetSharedPtr> rowsets;
RETURN_IF_ERROR(capture_sub_txn_rowsets(version, sub_txn_ids, &rowsets));
DCHECK(rowsets.size() == sub_txn_ids.size())
<< " sub_txn_id size=" << sub_txn_ids.size() << ", rowset size=" << rowsets.size()
<< ", partition_id=" << partition_id() << ", tablet=" << tablet_id();
for (int i = 0; i < rowsets.size(); ++i) {
auto& rowset = rowsets[i];
DCHECK(rowset != nullptr) << " rowset is nullptr for sub_txn_id=" << sub_txn_ids[i]
<< ", partition_id=" << partition_id()
<< ", tablet=" << tablet_id();
int64_t tmp_version = version + i + 1;
LOG(INFO) << "sub_txn_id=" << sub_txn_ids[i] << ", partition_id=" << partition_id()
<< ", tablet=" << tablet_id() << ", set tmp version=" << tmp_version;
rowset->set_version(Version(tmp_version, tmp_version));
if (rowset->rowset_meta()->has_delete_predicate()) {
rowset->rowset_meta()->mutable_delete_predicate()->set_version(tmp_version);
}
if (rowset != nullptr) {
RowsetReaderSharedPtr rs_reader;
auto res = rowset->create_reader(&rs_reader);
if (!res.ok()) {
return Status::Error<ErrorCode::CAPTURE_ROWSET_READER_ERROR>(
"failed to create reader for rowset:{}", rowset->rowset_id().to_string());
}
rs_splits->emplace_back(std::move(rs_reader));
}
}
return Status::OK();
}

TabletSchemaSPtr BaseTablet::tablet_schema_with_merged_max_schema_version(
const std::vector<RowsetMetaSharedPtr>& rowset_metas) {
RowsetMetaSharedPtr max_schema_version_rs = *std::max_element(
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,11 @@ class BaseTablet {
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) = 0;

virtual Status capture_sub_txn_rs_readers(int64_t version,
const std::vector<int64_t>& sub_txn_ids,
std::vector<RowSetSplits>* rs_splits) = 0;
Status capture_sub_txn_rs_readers(int64_t version, const std::vector<int64_t>& sub_txn_ids,
std::vector<RowSetSplits>* rs_splits);

virtual Status capture_sub_txn_rowsets(int64_t version, const std::vector<int64_t>& sub_txn_ids,
std::vector<RowsetSharedPtr>* rowsets) = 0;

virtual size_t tablet_footprint() = 0;

Expand Down
24 changes: 3 additions & 21 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -938,34 +938,16 @@ Status Tablet::capture_rs_readers(const Version& spec_version, std::vector<RowSe
return Status::OK();
}

Status Tablet::capture_sub_txn_rs_readers(int64_t version, const std::vector<int64_t>& sub_txn_ids,
std::vector<RowSetSplits>* rs_splits) {
LOG(INFO) << "capture_sub_txn_rs_readers for partition=" << partition_id()
<< ", tablet_id=" << tablet_id() << ", version=" << version
<< ", sub_txn_ids.size=" << sub_txn_ids.size();
Status Tablet::capture_sub_txn_rowsets(int64_t version, const std::vector<int64_t>& sub_txn_ids,
std::vector<RowsetSharedPtr>* rowsets) {
for (int i = 0; i < sub_txn_ids.size(); ++i) {
auto sub_txn_id = sub_txn_ids[i];
auto rowset = _engine.txn_manager()->get_tablet_rowset(tablet_id(), tablet_uid(),
partition_id(), sub_txn_id);
DCHECK(rowset != nullptr) << " rowset is nullptr for sub_txn_id=" << sub_txn_ids[i]
<< ", partition_id=" << partition_id()
<< ", tablet=" << tablet_id();
int64_t tmp_version = version + i + 1;
LOG(INFO) << "sub_txn_id=" << sub_txn_ids[i] << ", partition_id=" << partition_id()
<< ", tablet=" << tablet_id() << ", set tmp version=" << tmp_version;
rowset->set_version(Version(tmp_version, tmp_version));
if (rowset->rowset_meta()->has_delete_predicate()) {
rowset->rowset_meta()->mutable_delete_predicate()->set_version(tmp_version);
}
if (rowset != nullptr) {
RowsetReaderSharedPtr rs_reader;
auto res = rowset->create_reader(&rs_reader);
if (!res.ok()) {
return Status::Error<ErrorCode::CAPTURE_ROWSET_READER_ERROR>(
"failed to create reader for rowset:{}", rowset->rowset_id().to_string());
}
rs_splits->emplace_back(std::move(rs_reader));
}
rowsets->push_back(rowset);
}
return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ class Tablet final : public BaseTablet {
// If skip_missing_version is true, skip versions if they are missing.
Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) override;
Status capture_sub_txn_rs_readers(int64_t version, const std::vector<int64_t>& sub_txn_ids,
std::vector<RowSetSplits>* rs_splits) override;
Status capture_sub_txn_rowsets(int64_t version, const std::vector<int64_t>& sub_txn_ids,
std::vector<RowsetSharedPtr>* rowsets) override;

// Find the missed versions until the spec_version.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public TxnDeleteJob(long id, long transactionId, String label, Map<Long, Short>
@Override
public long beginTxn() throws Exception {
TransactionEntry txnEntry = ConnectContext.get().getTxnEntry();
this.transactionId = txnEntry.beginTransaction(targetTbl, SubTransactionType.DELETE);
this.transactionId = txnEntry.beginTransaction(targetTbl);
this.label = txnEntry.getLabel();
return this.transactionId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void beginTransaction() {
throw new AnalysisException("Transaction insert expect label " + txnEntry.getLabel()
+ ", but got " + this.labelName);
}
this.txnId = txnEntry.beginTransaction(table, SubTransactionType.INSERT);
this.txnId = txnEntry.beginTransaction(table);
this.labelName = txnEntry.getLabel();
} catch (Exception e) {
throw new AnalysisException("begin transaction failed. " + e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void setTxnSchemaVersion(int txnSchemaVersion) {
}

// Used for insert into select, return the sub_txn_id for this insert
public long beginTransaction(TableIf table, SubTransactionType subTransactionType) throws Exception {
public long beginTransaction(TableIf table) throws Exception {
if (isInsertValuesTxnBegan()) {
// FIXME: support mix usage of `insert into values` and `insert into select`
throw new AnalysisException(
Expand Down

0 comments on commit 58694e1

Please sign in to comment.