Skip to content

Commit

Permalink
support mow
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Sep 27, 2024
1 parent 740af58 commit 7646c20
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3716,6 +3716,10 @@ public TQueryOptions toThrift() {
tResult.setSkipDeletePredicate(skipDeletePredicate);

tResult.setSkipDeleteBitmap(skipDeleteBitmap);
if (ConnectContext.get().isTxnModel()) {
// TODO set to true only if the sub txn ids are not empty
tResult.setSkipDeleteBitmap(true);
}

tResult.setPartitionedHashJoinRowsThreshold(partitionedHashJoinRowsThreshold);
tResult.setPartitionedHashAggRowsThreshold(partitionedHashAggRowsThreshold);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,6 @@ public long beginTransaction(TableIf table, SubTransactionType subTransactionTyp
throw new AnalysisException(
"Transaction insert must be in the same database, expect db_id=" + this.database.getId());
}
// TODO for delete type, make sure there is no insert for the same table for mow
if (subTransactionType == SubTransactionType.DELETE && subTransactionStates.stream()
.anyMatch(s -> ((OlapTable) s.getTable()).getEnableUniqueKeyMergeOnWrite()
&& s.getTable().getId() == table.getId()
&& s.getSubTransactionType() == SubTransactionType.INSERT)) {
throw new AnalysisException("Can not delete because there is a insert operation for the same table");
}
long subTxnId;
if (Config.isCloudMode()) {
TUniqueId queryId = ConnectContext.get().queryId();
Expand Down Expand Up @@ -551,10 +544,6 @@ public List<Long> getPartitionSubTxnIds(long tableId, Partition partition, long
if (subTransactionState.getTable().getId() != tableId) {
continue;
}
// TODO mow will be supported
if (((OlapTable) subTransactionState.getTable()).getEnableUniqueKeyMergeOnWrite()) {
continue;
}
for (TTabletCommitInfo tabletCommitInfo : subTransactionState.getTabletCommitInfos()) {
if (index.getTablet(tabletCommitInfo.getTabletId()) != null) {
subTxnIds.add(subTransactionState.getSubTransactionId());
Expand All @@ -574,10 +563,6 @@ public List<Long> getTabletSubTxnIds(long tableId, Tablet tablet) {
if (subTransactionState.getTable().getId() != tableId) {
continue;
}
// TODO mow will be supported
if (((OlapTable) subTransactionState.getTable()).getEnableUniqueKeyMergeOnWrite()) {
continue;
}
for (TTabletCommitInfo tabletCommitInfo : subTransactionState.getTabletCommitInfos()) {
if (tablet.getId() == tabletCommitInfo.getTabletId()) {
subTxnIds.add(subTransactionState.getSubTransactionId());
Expand All @@ -591,7 +576,8 @@ public List<Long> getTabletSubTxnIds(long tableId, Tablet tablet) {
return subTxnIds;
}

public List<Replica> getQueryableReplicas(long tabletId, List<Replica> replicas, List<Long> subTxnIds) {
public List<Replica> getQueryableReplicas(long tabletId, List<Replica> replicas, List<Long> subTxnIds)
throws UserException {
List<Replica> queryableReplicas = new ArrayList<>(replicas);
for (Long subTxnId : subTxnIds) {
if (queryableReplicas.isEmpty()) {
Expand All @@ -602,7 +588,8 @@ public List<Replica> getQueryableReplicas(long tabletId, List<Replica> replicas,
return queryableReplicas;
}

private List<Replica> getQueryableReplicas(long tabletId, List<Replica> replicas, long subTxnId) {
private List<Replica> getQueryableReplicas(long tabletId, List<Replica> replicas, long subTxnId)
throws UserException {
List<Replica> queryableReplicas = new ArrayList<>();
for (SubTransactionState subTransactionState : subTransactionStates) {
if (subTxnId != subTransactionState.getSubTransactionId()) {
Expand Down
83 changes: 83 additions & 0 deletions regression-test/data/insert_p0/transaction/sub_txn_mow.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_61 --
1 a 1
2 b 2

-- !select_62 --
1 a 1
2 b 2

-- !select_63 --
1 a 1
2 b 2

-- !select_61 --
1 a 1
2 b 2

-- !select_62 --
1 a 1
2 b 2

-- !select_63 --
1 a 1
2 b 2

-- !par_71 --
1 a 1
10 a 10
2 b 2
20 b 2

-- !par_72 --
1 a 1
10 a 10
2 b 2
20 b 2

-- !par_73 --
1 a 1
10 a 10
2 b 2
20 b 2

-- !par_71 --
1 a 1
10 a 10
2 b 2
20 b 2

-- !par_72 --
1 a 1
10 a 10
2 b 2
20 b 2

-- !par_73 --
1 a 1
10 a 10
2 b 2
20 b 2

-- !del_71 --
1 a 1
2 b 2

-- !del_72 --
1 a 1
2 b 2

-- !del_73 --
2 b 2

-- !del_71 --
1 a 1
2 b 2

-- !del_72 --
1 a 1
2 b 2

-- !del_73 --
2 b 2

Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@
// specific language governing permissions and limitations
// under the License.

// The cases is copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

import com.mysql.cj.jdbc.StatementImpl
import java.sql.Connection
import java.sql.DriverManager
import java.sql.Statement
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

suite("sub_txn_aggregate") {
sql """ set enable_query_in_transaction_load = true """
// case 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@
// specific language governing permissions and limitations
// under the License.

// The cases is copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

import com.mysql.cj.jdbc.StatementImpl
import java.sql.Connection
import java.sql.DriverManager
import java.sql.Statement
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

suite("sub_txn_duplicate") {
sql """ set enable_query_in_transaction_load = true """
// case 1
Expand Down
11 changes: 0 additions & 11 deletions regression-test/suites/insert_p0/transaction/sub_txn_mor.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@
// specific language governing permissions and limitations
// under the License.

// The cases is copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

import com.mysql.cj.jdbc.StatementImpl
import java.sql.Connection
import java.sql.DriverManager
import java.sql.Statement
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

suite("sub_txn_mor") {
sql """ set enable_query_in_transaction_load = true """
// case 1
Expand Down
170 changes: 170 additions & 0 deletions regression-test/suites/insert_p0/transaction/sub_txn_mow.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("sub_txn_mow") {
if (isCloudMode()) {
logger.info("txn load for mow tables is not supported in cloud mode")
return
}
sql """ set enable_query_in_transaction_load = true """
// case 1
def table_txn = "sub_txn_mow"
def table_normal = "sub_txn_mow_n"
for (def i in 1..3) {
for (def prefix: [table_normal, table_txn]) {
sql """ drop table if exists ${prefix}_${i} """
sql """
CREATE TABLE ${prefix}_${i} (
`id` int(11) NOT NULL,
`name` varchar(50) NULL,
`score` int(11) NULL default "-1"
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"enable_mow_light_delete" = "true"
);
"""
if (i == 1) {
sql """ insert into ${prefix}_${i} values(1, "a", 1), (2, "b", 2); """
}
}
}
for (def prefix: [table_normal, table_txn]) {
if (prefix == table_txn) {
sql """ begin; """
}

sql """ insert into ${prefix}_3 select * from ${prefix}_2; """
// order_qt_select_1 """ select * from ${prefix}_3; """

sql """ insert into ${prefix}_3 select * from ${prefix}_1; """
// order_qt_select_2 """ select * from ${prefix}_3; """

sql """ insert into ${prefix}_2 select * from ${prefix}_3; """
// order_qt_select_3 """ select * from ${prefix}_2; """

sql """ insert into ${prefix}_1 select * from ${prefix}_2; """
// order_qt_select_4 """ select * from ${prefix}_1; """

sql """ insert into ${prefix}_2 select * from ${prefix}_1; """
// order_qt_select_5 """ select * from ${prefix}_2; """

if (prefix == table_txn) {
sql """ commit; """
}
order_qt_select_61 """ select * from ${prefix}_1; """
order_qt_select_62 """ select * from ${prefix}_2; """
order_qt_select_63 """ select * from ${prefix}_3; """
}

// case 2: insert with partition
table_txn = "sub_txn_mow_p"
table_normal = "sub_txn_mow_pn"
for (def i in 1..3) {
for (def prefix: [table_normal, table_txn]) {
sql """ drop table if exists ${prefix}_${i} """
sql """
CREATE TABLE ${prefix}_${i} (
`id` int(11) NOT NULL,
`name` varchar(50) NULL,
`score` int(11) NULL default "-1"
) ENGINE=OLAP
UNIQUE KEY(`id`)
PARTITION BY RANGE(id)
(
FROM (1) TO (30) INTERVAL 10
)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
"enable_unique_key_merge_on_write" = "false",
"replication_num" = "1"
);
"""
if (i == 1) {
sql """ insert into ${prefix}_${i} values(1, "a", 1), (2, "b", 2), (10, "a", 10), (20, "b", 2); """
}
}
}
sql """ set enable_insert_strict = false """
for (def prefix: [table_normal, table_txn]) {
if (prefix == table_txn) {
sql """ begin; """
}

sql """ insert into ${prefix}_3 PARTITION(p_1_11) select * from ${prefix}_2; """
// order_qt_par_1 """ select * from ${prefix}_3; """

sql """ insert into ${prefix}_2 PARTITION(p_1_11) select * from ${prefix}_3; """
// order_qt_par_2 """ select * from ${prefix}_2; """

sql """ insert into ${prefix}_3 select * from ${prefix}_1; """
// order_qt_par_3 """ select * from ${prefix}_3; """

sql """ insert into ${prefix}_2 select * from ${prefix}_3; """
// order_qt_par_4 """ select * from ${prefix}_2; """

sql """ insert into ${prefix}_3 PARTITION(p_1_11) select * from ${prefix}_2; """
// order_qt_par_5 """ select * from ${prefix}_3; """

sql """ insert into ${prefix}_2 PARTITION(p_11_21) select * from ${prefix}_3; """
// order_qt_par_6 """ select * from ${prefix}_2; """

if (prefix == table_txn) {
sql """ commit; """
}
order_qt_par_71 """ select * from ${prefix}_1; """
order_qt_par_72 """ select * from ${prefix}_2; """
order_qt_par_73 """ select * from ${prefix}_3; """
}
sql """ set enable_insert_strict = true """

// case 3: delete command
table_txn = "sub_txn_mow"
table_normal = "sub_txn_mow_n"
for (def prefix: [table_normal, table_txn]) {
if (prefix == table_txn) {
sql """ begin; """
}

sql """ delete from ${prefix}_3 where id > 1; """
// order_qt_del_1 """ select * from ${prefix}_3; """

sql """ insert into ${prefix}_2 select * from ${prefix}_3; """
// order_qt_del_2 """ select * from ${prefix}_2; """

sql """ insert into ${prefix}_3 select * from ${prefix}_2; """
// order_qt_del_3 """ select * from ${prefix}_3; """

sql """ update ${prefix}_3 set score = score + 100 where id = 1; """
// order_qt_del_4 """ select * from ${prefix}_3; """

sql """ delete from ${prefix}_3 where id < 2; """
// order_qt_del_5 """ select * from ${prefix}_3; """

sql """ insert into ${prefix}_2 select * from ${prefix}_3; """
// order_qt_del_6 """ select * from ${prefix}_2; """

if (prefix == table_txn) {
sql """ commit; """
}
order_qt_del_71 """ select * from ${prefix}_1; """
order_qt_del_72 """ select * from ${prefix}_2; """
order_qt_del_73 """ select * from ${prefix}_3; """
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
// specific language governing permissions and limitations
// under the License.

// The cases is copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

import org.apache.doris.regression.suite.ClusterOptions
import org.apache.doris.regression.util.NodeType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
// specific language governing permissions and limitations
// under the License.

// The cases is copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

import org.apache.doris.regression.suite.ClusterOptions
import org.apache.doris.regression.util.NodeType

Expand Down

0 comments on commit 7646c20

Please sign in to comment.