Skip to content

Commit

Permalink
[fix](mtmv) Fix written by mv successfully but not get mv lock. If us…
Browse files Browse the repository at this point in the history
…e mv, data maybe wrong (apache#40173)

When refresh partition mv, meanwhile, the query that can be written
successfully by the same mv, if we run the query, the result maybe
wrong.
this pr fix this.
  • Loading branch information
seawinde committed Oct 23, 2024
1 parent 65803b4 commit f2bc4a3
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 3 deletions.
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public MTMV() {
mvRwLock = new ReentrantReadWriteLock(true);
}

@Override
public boolean needReadLockWhenPlan() {
return true;
}

public MTMVRefreshInfo getRefreshInfo() {
readMvLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,8 @@ public synchronized void addTableReadLock(TableIf tableIf) {
String fullTableName = tableIf.getNameWithFullQualifiers();
String resourceName = "tableReadLock(" + fullTableName + ")";
plannerResources.push(new CloseableResource(
resourceName, Thread.currentThread().getName(), originStatement.originStmt, tableIf::readUnlock));
resourceName, Thread.currentThread().getName(),
originStatement == null ? null : originStatement.originStmt, tableIf::readUnlock));
}

/** releasePlannerResources */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
// Rewrite query by view
rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping,
rewrittenPlan, materializationContext, cascadesContext);
// If rewrite successfully, try to get mv read lock to avoid data inconsistent,
// try to get lock which should added before RBO
if (materializationContext instanceof AsyncMaterializationContext && !materializationContext.isSuccess()) {
cascadesContext.getStatementContext()
.addTableReadLock(((AsyncMaterializationContext) materializationContext).getMtmv());
}
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getWholeTreeRewriter(childContext).execute();
Expand Down Expand Up @@ -371,11 +377,11 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
logicalProperties, queryPlan.getLogicalProperties()));
continue;
}
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
trySetStatistics(materializationContext, cascadesContext);
rewriteResults.add(rewrittenPlan);
// if rewrite successfully, try to regenerate mv scan because it maybe used again
materializationContext.tryReGenerateScanPlan(cascadesContext);
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
}
return rewriteResults;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,12 @@ public PlanChecker rewrite() {
public PlanChecker optimize() {
cascadesContext.setJobContext(PhysicalProperties.GATHER);
double now = System.currentTimeMillis();
new Optimizer(cascadesContext).execute();
try {
new Optimizer(cascadesContext).execute();
} finally {
// Mv rewrite add lock manually, so need release manually
cascadesContext.getStatementContext().releasePlannerResources();
}
System.out.println("cascades:" + (System.currentTimeMillis() - now));
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("dml_rewrite_with_lock", "zfr_mtmv_test") {

String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_materialized_view_rewrite=true"
sql "SET enable_materialized_view_nest_rewrite=true"
sql "SET enable_materialized_view_union_rewrite=true"

sql """
drop table if exists lineitem_range_date_union
"""

sql """CREATE TABLE `lineitem_range_date_union` (
`l_orderkey` BIGINT NULL,
`l_linenumber` INT NULL,
`l_partkey` INT NULL,
`l_suppkey` INT NULL,
`l_quantity` DECIMAL(15, 2) NULL,
`l_extendedprice` DECIMAL(15, 2) NULL,
`l_discount` DECIMAL(15, 2) NULL,
`l_tax` DECIMAL(15, 2) NULL,
`l_returnflag` VARCHAR(1) NULL,
`l_linestatus` VARCHAR(1) NULL,
`l_commitdate` DATE NULL,
`l_receiptdate` DATE NULL,
`l_shipinstruct` VARCHAR(25) NULL,
`l_shipmode` VARCHAR(10) NULL,
`l_comment` VARCHAR(44) NULL,
`l_shipdate` DATE not NULL
) ENGINE=OLAP
DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey )
COMMENT 'OLAP'
partition by range (`l_shipdate`) (
partition p1 values [("2023-10-29"), ("2023-10-30")),
partition p2 values [("2023-10-30"), ("2023-10-31")),
partition p3 values [("2023-10-31"), ("2023-11-01")))
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);"""

sql """
drop table if exists orders_range_date_union
"""

sql """CREATE TABLE `orders_range_date_union` (
`o_orderkey` BIGINT NULL,
`o_custkey` INT NULL,
`o_orderstatus` VARCHAR(1) NULL,
`o_totalprice` DECIMAL(15, 2) NULL,
`o_orderpriority` VARCHAR(15) NULL,
`o_clerk` VARCHAR(15) NULL,
`o_shippriority` INT NULL,
`o_comment` VARCHAR(79) NULL,
`o_orderdate` DATE not NULL
) ENGINE=OLAP
DUPLICATE KEY(`o_orderkey`, `o_custkey`)
COMMENT 'OLAP'
partition by range (`o_orderdate`) (
partition p1 values [("2023-10-29"), ("2023-10-30")),
partition p2 values [("2023-10-30"), ("2023-10-31")),
partition p3 values [("2023-10-31"), ("2023-11-01")),
partition p4 values [("2023-11-01"), ("2023-11-02")),
partition p5 values [("2023-11-02"), ("2023-11-03")))
DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);"""

sql """
insert into lineitem_range_date_union values
(null, 1, 2, 3, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-29'),
(1, null, 3, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-29'),
(3, 3, null, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx', '2023-10-31'),
(1, 2, 3, null, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-29'),
(2, 3, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-30'),
(3, 1, 1, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-31'),
(1, 3, 2, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-29');
"""

sql """
insert into orders_range_date_union values
(null, 1, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-29'),
(1, null, 'o', 109.2, 'c','d',2, 'mm', '2023-10-29'),
(3, 3, null, 99.5, 'a', 'b', 1, 'yy', '2023-10-30'),
(1, 2, 'o', null, 'a', 'b', 1, 'yy', '2023-11-01'),
(2, 3, 'k', 109.2, null,'d',2, 'mm', '2023-11-02'),
(3, 1, 'k', 99.5, 'a', null, 1, 'yy', '2023-11-02'),
(1, 3, 'o', 99.5, 'a', 'b', null, 'yy', '2023-10-31'),
(2, 1, 'o', 109.2, 'c','d',2, null, '2023-10-30'),
(3, 2, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-29'),
(4, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-31');
"""

sql """DROP MATERIALIZED VIEW if exists day_mv;"""
create_async_mv(db, "day_mv",
"""select date_trunc(`l_shipdate`, 'day') as col1, l_shipdate, l_orderkey
from lineitem_range_date_union as t1 left join orders_range_date_union as t2
on t1.l_orderkey = t2.o_orderkey group by col1, l_shipdate, l_orderkey;
"""
)

def query1 = """
select date_trunc(`l_shipdate`, 'day') as col1, l_shipdate, l_orderkey
from lineitem_range_date_union as t1 left join orders_range_date_union as t2
on t1.l_orderkey = t2.o_orderkey
group by col1, l_shipdate, l_orderkey
"""

mv_rewrite_success(query1, "day_mv")

def query2 = """
select date_trunc(`l_shipdate`, 'hour') as col1, l_shipdate, l_orderkey from
lineitem_range_date_union as t1 left join orders_range_date_union as t2
on t1.l_orderkey = t2.o_orderkey
group by col1, l_shipdate, l_orderkey
"""

sql """DROP MATERIALIZED VIEW if exists hour_mv;"""
create_async_mv(db, "hour_mv",
"""
select date_trunc(`l_shipdate`, 'hour') as col1, l_shipdate, l_orderkey from
lineitem_range_date_union as t1 left join orders_range_date_union as t2
on t1.l_orderkey = t2.o_orderkey group by col1, l_shipdate, l_orderkey;
""")
mv_rewrite_success(query2, "hour_mv")


sql """alter table lineitem_range_date_union add partition p4 values [("2023-11-01"), ("2023-11-02"));"""
sql """insert into lineitem_range_date_union values
(1, null, 3, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-11-01')"""

sql """refresh MATERIALIZED VIEW hour_mv auto;"""
waitingMTMVTaskFinishedByMvName("hour_mv")

sql """refresh MATERIALIZED VIEW day_mv auto;"""
waitingMTMVTaskFinishedByMvName("day_mv")

mv_rewrite_success(query1, "day_mv")
mv_rewrite_success(query2, "hour_mv")
}



0 comments on commit f2bc4a3

Please sign in to comment.