[HUDI-8394] Restrict same partition multiple bulk inserts in append mode into COW with bucket index
geserdugarov committed Nov 15, 2024
1 parent 41816e3 commit df5dd11
Showing 3 changed files with 104 additions and 47 deletions.
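
For orientation, here is a minimal sketch of the user-facing behavior this commit enforces. It mirrors the new Spark SQL test added below; the table name and location are hypothetical, and a SparkSession `spark` with the Hudi Spark bundle on the classpath is assumed.

```scala
// Sketch only: hypothetical table name/location, mirroring the SQL test added in this commit.
// Route SQL inserts through the bulk_insert operation (same session conf the test uses).
spark.sql("set hoodie.datasource.write.operation = bulk_insert")

spark.sql(
  """create table cow_bucket_tbl (id int, name string, price double, ts long, dt string)
    | using hudi
    | tblproperties (
    |   primaryKey = 'id,name',
    |   type = 'cow',
    |   preCombineField = 'ts',
    |   hoodie.index.type = 'BUCKET',
    |   hoodie.index.bucket.engine = 'SIMPLE',
    |   hoodie.bucket.index.num.buckets = '2',
    |   hoodie.bucket.index.hash.field = 'id,name',
    |   hoodie.datasource.write.row.writer.enable = 'false')
    | partitioned by (dt)
    | location '/tmp/cow_bucket_tbl'
    |""".stripMargin)

// First bulk insert into dt='2021-01-05' creates the bucket base files for that partition.
spark.sql("insert into cow_bucket_tbl values (1, 'a1', 10, 1000, '2021-01-05')")
// A bulk insert into a partition that has not been written yet is still allowed.
spark.sql("insert into cow_bucket_tbl values (2, 'a2', 20, 2000, '2024-02-03')")

// A second bulk insert into an already written partition would have to append to an
// existing bucket; after this commit it fails with HoodieNotSupportedException:
//   spark.sql("insert into cow_bucket_tbl values (3, 'a3', 30, 3000, '2021-01-05')")

// Supported alternatives: insert overwrite (shown here) or switching the operation to upsert.
spark.sql("insert overwrite cow_bucket_tbl values (3, 'a3', 30, 3000, '2021-01-05')")
```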
BucketIndexBulkInsertPartitioner.java
@@ -19,7 +19,9 @@
package org.apache.hudi.table;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.io.AppendHandleFactory;
import org.apache.hudi.io.SingleFileHandleCreateFactory;
@@ -47,6 +49,7 @@ public abstract class BucketIndexBulkInsertPartitioner<T> implements BulkInsertP
protected final List<String> indexKeyFields;
protected final List<Boolean> doAppend = new ArrayList<>();
protected final List<String> fileIdPfxList = new ArrayList<>();
protected boolean isAppendAllowed;

public BucketIndexBulkInsertPartitioner(HoodieTable table, String sortString, boolean preserveHoodieMetadata) {

@@ -59,12 +62,19 @@ public BucketIndexBulkInsertPartitioner(HoodieTable table, String sortString, bo
this.sortColumnNames = null;
}
this.preserveHoodieMetadata = preserveHoodieMetadata;
// Bulk insert into COW table with bucket index is allowed only once, otherwise AppendHandleFactory will produce MOR log files
this.isAppendAllowed = !table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE);
}

@Override
public Option<WriteHandleFactory> getWriteHandleFactory(int idx) {
return doAppend.get(idx) ? Option.of(new AppendHandleFactory()) :
Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
if (!doAppend.get(idx)) {
return Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
} else if (isAppendAllowed) {
return Option.of(new AppendHandleFactory());
} else {
throw new HoodieNotSupportedException("Bulk insert into COW table with bucket index is allowed only once; please use the upsert operation instead");
}
}

@Override
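
As a reading aid for the branch added above, a simplified sketch (Scala, not the Hudi API) of the per-bucket decision: `bucketFileExists` stands for `doAppend.get(idx)` and `isMorTable` for the new `isAppendAllowed` flag. Appending is only safe for MOR, because the append path writes log files that a COW reader never merges.

```scala
object HandleChoiceSketch {
  sealed trait Handle
  case object NewBaseFile extends Handle   // SingleFileHandleCreateFactory: fresh base file
  case object AppendLogFile extends Handle // AppendHandleFactory: log files, usable only by MOR

  def chooseHandle(bucketFileExists: Boolean, isMorTable: Boolean): Handle =
    if (!bucketFileExists) NewBaseFile             // first write into this bucket
    else if (isMorTable) AppendLogFile             // MOR can absorb appended log files
    else throw new UnsupportedOperationException(  // COW cannot: reject the second bulk insert
      "Bulk insert into COW table with bucket index is allowed only once")
}
```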
TestRDDSimpleBucketBulkInsertPartitioner.java
@@ -26,6 +26,7 @@
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable;
@@ -47,6 +48,7 @@

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

public class TestRDDSimpleBucketBulkInsertPartitioner extends HoodieSparkClientTestHarness {

@@ -104,27 +106,31 @@ public void testSimpleBucketPartitioner(String tableType, boolean partitionSort)
}, false).collect();
}

// first writes, it will create new bucket files based on the records
// 1st write, will create new bucket files based on the records
getHoodieWriteClient(config).startCommitWithTime("0");
List<WriteStatus> writeStatues = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "0").collect();
Map<String, WriteStatus> writeStatuesMap = new HashMap<>();
writeStatues.forEach(ws -> writeStatuesMap.put(ws.getFileId(), ws));

// second writes the same records, all records should be mapped to the same bucket files
getHoodieWriteClient(config).startCommitWithTime("1");
List<WriteStatus> writeStatues2 = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "1").collect();
writeStatues2.forEach(ws -> assertEquals(ws.getTotalRecords(), writeStatuesMap.get(ws.getFileId()).getTotalRecords()));
List<WriteStatus> writeStatuses = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "0").collect();
Map<String, WriteStatus> writeStatusesMap = new HashMap<>();
writeStatuses.forEach(ws -> writeStatusesMap.put(ws.getFileId(), ws));

getHoodieWriteClient(config).startCommitWithTime("2");
try {
// MOR table, 2nd write of the same records, all records should be mapped to the same bucket files
List<WriteStatus> writeStatuses2 = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "2").collect();
writeStatuses2.forEach(ws -> assertEquals(ws.getTotalRecords(), writeStatusesMap.get(ws.getFileId()).getTotalRecords()));
} catch (Exception ex) {
// COW table, 2nd bulk insert is restricted if it's not overwrite
assertEquals("COPY_ON_WRITE", tableType);
Throwable exceptionWithRealReason = ex.getCause().getCause();
assertInstanceOf(HoodieNotSupportedException.class, exceptionWithRealReason);
assertEquals("Bulk insert into COW table with bucket index is allowed only once, please, use upsert operation instead", exceptionWithRealReason.getMessage());
}
}

private static final List<Object> TABLE_TYPES = Arrays.asList(
"COPY_ON_WRITE",
"MERGE_ON_READ"
);

private static Iterable<Object> tableTypes() {
return TABLE_TYPES;
}

// table type, partitionSort
private static Iterable<Object[]> configParams() {
List<Object[]> opts = new ArrayList<>();
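
The updated test reaches the real failure via `ex.getCause().getCause()`, because Spark wraps the exception thrown by the partitioner before it reaches the driver. A small hedged alternative (not part of the commit) is to walk the cause chain instead of hard-coding the nesting depth:

```scala
import org.apache.hudi.exception.HoodieNotSupportedException

object CauseChainSketch {
  // Returns the first HoodieNotSupportedException found anywhere in the cause chain,
  // so the assertion keeps working even if Spark changes how deeply it wraps the error.
  def findNotSupported(t: Throwable): Option[HoodieNotSupportedException] =
    Iterator.iterate[Throwable](t)(_.getCause)
      .takeWhile(_ != null)
      .collectFirst { case e: HoodieNotSupportedException => e }
}
```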
TestInsertTable.scala
@@ -1668,47 +1668,88 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
)

spark.sql(
s"""
| insert into $tableName values
| (1, 'a1', 10, 1000, "2021-01-05"),
| (3, "a3", 30, 3000, "2021-01-07")
""".stripMargin)
// for COW table, multiple append bulk inserts are restricted
if (tableType != "cow") {
spark.sql(
s"""
| insert into $tableName values
| (1, 'a1', 10, 1000, "2021-01-05"),
| (3, "a3", 30, 3000, "2021-01-07")
""".stripMargin)

checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
Seq(1, "a1", 10.0, 1000, "2021-01-05"),
Seq(2, "a2", 20.0, 2000, "2021-01-06"),
Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
Seq(3, "a3", 30.0, 3000, "2021-01-07")
)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
Seq(1, "a1", 10.0, 1000, "2021-01-05"),
Seq(2, "a2", 20.0, 2000, "2021-01-06"),
Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
Seq(3, "a3", 30.0, 3000, "2021-01-07")
)

// there are two files in partition(dt = '2021-01-05')
checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
Seq(2)
)
// there are two files in partition(dt = '2021-01-05')
checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
Seq(2)
)

// would generate 6 other files in partition(dt = '2021-01-05')
spark.sql(
s"""
| insert into $tableName values
| (4, 'a1,1', 10, 1000, "2021-01-05"),
| (5, 'a1,1', 10, 1000, "2021-01-05"),
| (6, 'a1,1', 10, 1000, "2021-01-05"),
| (7, 'a1,1', 10, 1000, "2021-01-05"),
| (8, 'a1,1', 10, 1000, "2021-01-05"),
| (10, 'a3,3', 30, 3000, "2021-01-05")
""".stripMargin)

checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
Seq(8)
)
// would generate 6 other files in partition(dt = '2021-01-05')
spark.sql(
s"""
| insert into $tableName values
| (4, 'a1,1', 10, 1000, "2021-01-05"),
| (5, 'a1,1', 10, 1000, "2021-01-05"),
| (6, 'a1,1', 10, 1000, "2021-01-05"),
| (7, 'a1,1', 10, 1000, "2021-01-05"),
| (8, 'a1,1', 10, 1000, "2021-01-05"),
| (10, 'a3,3', 30, 3000, "2021-01-05")
""".stripMargin)

checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
Seq(8)
)
}
}
}
}
}
}

test("Test not supported double Bulk Insert Into Bucket Index COW Table") {
withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert", "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
withTempDir { tmp =>
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| dt string,
| name string,
| price double,
| ts long
|) using hudi
| tblproperties (
| primaryKey = 'id,name',
| type = 'cow',
| preCombineField = 'ts',
| hoodie.index.type = 'BUCKET',
| hoodie.index.bucket.engine = 'SIMPLE',
| hoodie.bucket.index.num.buckets = '2',
| hoodie.bucket.index.hash.field = 'id,name',
| hoodie.datasource.write.row.writer.enable = 'false')
| partitioned by (dt)
| location '${tmp.getCanonicalPath}'
""".stripMargin)
spark.sql(s"insert into $tableName values (5, 'a1,1', 10, 1000, '2021-01-05')")
spark.sql(s"insert into $tableName values (6, 'a6,6', 20, 2000, '2024-02-03')")
// main check of this test case
checkExceptionContain(s"insert into $tableName values (9, 'a3,3', 30, 3000, '2021-01-05')")("Bulk insert into COW table with bucket index is allowed only once")
// for insert overwrite, multiple bulk inserts are allowed
spark.sql(s"insert overwrite $tableName values (9, 'a3,3', 30, 3000, '2021-01-05')")
spark.sql(s"insert overwrite $tableName values (13, 'a13,13', 40, 4000, '2024-02-03')")
// only the last data is available
checkAnswer (s"select count(distinct _hoodie_file_name) from $tableName")(Seq(2))
}
}
}
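
The error message points users to upsert for repeated writes into the same partition. A short sketch (hypothetical table name, reusing a session configured like the test above) of that switch:

```scala
// After the initial bulk insert, route later writes through upsert instead of bulk_insert.
spark.sql("set hoodie.datasource.write.operation = upsert")
spark.sql("insert into cow_bucket_tbl values (9, 'a3,3', 30, 3000, '2021-01-05')")
// Upsert writes a new version of the matching bucket's base file,
// so no MOR-style log files appear under the COW table.
```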

/**
* This test is to make sure that bulk insert doesn't create a bunch of tiny files if
* hoodie.bulkinsert.user.defined.partitioner.sort.columns doesn't start with the partition columns
