diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java
new file mode 100644
index 0000000000000..693fb575fdc10
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * Util class to assist with fetching average record size.
+ */
+public class AverageRecordSizeUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(AverageRecordSizeUtils.class);
+
+  /**
+   * Obtains the average record size based on records written during previous commits. Used for estimating how many
+   * records pack into one file.
+   */
+  static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
+    long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
+    long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
+    try {
+      if (!commitTimeline.empty()) {
+        // Go over the reverse ordered commits to get a more recent estimate of average record size.
+        Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
+        while (instants.hasNext()) {
+          HoodieInstant instant = instants.next();
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+              .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+          if (instant.getAction().equals(COMMIT_ACTION) || instant.getAction().equals(REPLACE_COMMIT_ACTION)) {
+            long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+            long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+            if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
+              avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
+              break;
+            }
+          } else if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
+            // let's consider only base files in case of delta commits
+            AtomicLong totalBytesWritten = new AtomicLong(0L);
+            AtomicLong totalRecordsWritten = new AtomicLong(0L);
+            commitMetadata.getWriteStats().stream()
+                .filter(hoodieWriteStat -> FSUtils.isBaseFile(new Path(hoodieWriteStat.getPath())))
+                .forEach(hoodieWriteStat -> {
+                  totalBytesWritten.addAndGet(hoodieWriteStat.getTotalWriteBytes());
+                  totalRecordsWritten.addAndGet(hoodieWriteStat.getNumWrites());
+                });
+            if (totalBytesWritten.get() > fileSizeThreshold && totalRecordsWritten.get() > 0) {
+              avgSize = (long) Math.ceil((1.0 * totalBytesWritten.get()) / totalRecordsWritten.get());
+              break;
+            }
+          }
+        }
+      }
+    } catch (Throwable t) {
+      // make this fail safe.
+      LOG.error("Error trying to compute average bytes/record ", t);
+    }
+    return avgSize;
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index edd6d981d1850..ba7550af982d0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -22,7 +22,6 @@
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -46,7 +45,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -56,6 +54,8 @@
 import scala.Tuple2;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 
 /**
  * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).
@@ -169,8 +169,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)
      * created by clustering, which has smaller average record size, which affects assigning inserts and
      * may result in OOM by making spark underestimate the actual input record sizes.
      */
-    long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
-        .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).filterCompletedInstants(), config);
+    long averageRecordSize = AverageRecordSizeUtils.averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION))
+        .filterCompletedInstants(), config);
     LOG.info("AvgRecordSize => " + averageRecordSize);
 
     Map<String, List<SmallFile>> partitionSmallFilesMap =
@@ -364,34 +365,4 @@ public int getPartition(Object key) {
       return targetBuckets.get(0).getKey().bucketNumber;
     }
   }
-
-  /**
-   * Obtains the average record size based on records written during previous commits. Used for estimating how many
-   * records pack into one file.
-   */
-  protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
-    long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
-    long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
-    try {
-      if (!commitTimeline.empty()) {
-        // Go over the reverse ordered commits to get a more recent estimate of average record size.
-        Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
-        while (instants.hasNext()) {
-          HoodieInstant instant = instants.next();
-          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-              .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
-          long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
-          long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
-          if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
-            avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
-            break;
-          }
-        }
-      }
-    } catch (Throwable t) {
-      // make this fail safe.
-      LOG.error("Error trying to compute average bytes/record ", t);
-    }
-    return avgSize;
-  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java
new file mode 100644
index 0000000000000..bb0835987b0af
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test average record size estimation.
+ */
+public class TestAverageRecordSizeUtils {
+
+  private final HoodieTimeline mockTimeline = mock(HoodieTimeline.class);
+  private static final String PARTITION1 = "partition1";
+  private static final String TEST_WRITE_TOKEN = "1-0-1";
+  private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
+
+  @ParameterizedTest
+  @MethodSource("testCasesCOW")
+  public void testAverageRecordSizeForCOW(List<Pair<HoodieInstant, Pair<Long, Long>>> instantSizePairs, long expectedSize) {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .build();
+    HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline();
+    List<HoodieInstant> instants = new ArrayList<>();
+    instantSizePairs.forEach(entry -> {
+      HoodieInstant hoodieInstant = entry.getKey();
+      Pair<Long, Long> recordCountSizePair = entry.getValue();
+      HoodieWriteStat writeStat = new HoodieWriteStat();
+      writeStat.setNumWrites(recordCountSizePair.getKey());
+      writeStat.setTotalWriteBytes(recordCountSizePair.getValue() * recordCountSizePair.getKey());
+      HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+      commitMetadata.addWriteStat(PARTITION1, writeStat);
+      instants.add(hoodieInstant);
+      try {
+        when(mockTimeline.getInstantDetails(hoodieInstant)).thenReturn(TimelineMetadataUtils.serializeCommitMetadata(commitMetadata));
+      } catch (IOException e) {
+        throw new RuntimeException("Should not have failed", e);
+      }
+    });
+
+    List<HoodieInstant> reverseOrderInstants = new ArrayList<>(instants);
+    Collections.reverse(reverseOrderInstants);
+    when(mockTimeline.getInstants()).thenReturn(instants);
+    when(mockTimeline.getReverseOrderedInstants()).then(i -> reverseOrderInstants.stream());
+    commitsTimeline.setInstants(instants);
+
+    assertEquals(expectedSize, AverageRecordSizeUtils.averageBytesPerRecord(mockTimeline, writeConfig));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testCasesMOR")
+  public void testAverageRecordSizeForMOR(List<Pair<HoodieInstant, List<HWriteStat>>> instantSizePairs, long expectedSize) {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .build();
+    HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline();
+    List<HoodieInstant> instants = new ArrayList<>();
+    instantSizePairs.forEach(entry -> {
+      HoodieInstant hoodieInstant = entry.getKey();
+      HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+      entry.getValue().forEach(hWriteStat -> {
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setNumWrites(hWriteStat.totalRecordsWritten);
+        writeStat.setTotalWriteBytes(hWriteStat.getPerRecordSize() * hWriteStat.totalRecordsWritten);
+        writeStat.setPath(hWriteStat.getPath());
+        commitMetadata.addWriteStat(PARTITION1, writeStat);
+      });
+      instants.add(hoodieInstant);
+      try {
+        when(mockTimeline.getInstantDetails(hoodieInstant)).thenReturn(TimelineMetadataUtils.serializeCommitMetadata(commitMetadata));
+      } catch (IOException e) {
+        throw new RuntimeException("Should not have failed", e);
+      }
+    });
+
+    List<HoodieInstant> reverseOrderInstants = new ArrayList<>(instants);
+    Collections.reverse(reverseOrderInstants);
+    when(mockTimeline.getInstants()).thenReturn(instants);
+    when(mockTimeline.getReverseOrderedInstants()).then(i -> reverseOrderInstants.stream());
+    commitsTimeline.setInstants(instants);
+
+    assertEquals(expectedSize, AverageRecordSizeUtils.averageBytesPerRecord(mockTimeline, writeConfig));
+  }
+
+  private static String getBaseFileName(String instantTime) {
+    String fileName = UUID.randomUUID().toString();
+    return FSUtils.makeBaseFileName(instantTime, TEST_WRITE_TOKEN, fileName);
+  }
+
+  private static String getLogFileName(String instantTime) {
+    String fileName = UUID.randomUUID().toString();
+    String fullFileName = FSUtils.makeBaseFileName(instantTime, TEST_WRITE_TOKEN, fileName);
+    assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
+    return FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(), instantTime, 1, TEST_WRITE_TOKEN);
+  }
+
+  static Stream<Arguments> testCasesCOW() {
+    Long baseInstant = 20231204194919610L;
+    List<Arguments> arguments = new ArrayList<>();
+    // straightforward. just 1 instant.
+    arguments.add(Arguments.of(Collections.singletonList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
+        Long.toString(baseInstant)), Pair.of(10000000L, 100L))), 100L));
+
+    // two instants. latest instant should be honored
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)), Pair.of(10000000L, 100L)),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)), Pair.of(10000000L, 200L))), 200L));
+
+    // two instants, while the 2nd one is too small to meet the size threshold. So, the 1st one should be honored
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)), Pair.of(10000000L, 100L)),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)), Pair.of(100000L, 200L))), 100L));
+
+    // 2nd instant is a replace commit and should be honored.
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)), Pair.of(10000000L, 100L)),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant + 100)), Pair.of(10000000L, 300L))), 300L));
+    return arguments.stream();
+  }
+
+  static Stream<Arguments> testCasesMOR() {
+    Long baseInstant = 20231204194919610L;
+    List<Arguments> arguments = new ArrayList<>();
+    // straightforward. just 1 compaction instant.
+    arguments.add(Arguments.of(Collections.singletonList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
+        Long.toString(baseInstant)), Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L)))), 100L));
+
+    // for delta commits, only parquet files should be accounted for.
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
+                Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 200L)))), 200L));
+
+    // delta commit has a mix of parquet and log files. only parquet files should be accounted for.
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 1000000L, 100L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
+                Arrays.asList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 200L),
+                    new HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 10000000L, 300L)))), 200L));
+
+    // 2nd delta commit only has log files, so the 1st delta commit's size should be honored.
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
+                Arrays.asList(new HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 200L),
+                    new HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 10000000L, 300L)))), 100L));
+
+    // replace commit should be honored.
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 1000000L, 100L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
+                Arrays.asList(new HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 200L),
+                    new HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 300L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 200)), 1000000L, 400L)))), 400L));
+    return arguments.stream();
+  }
+
+  static class HWriteStat {
+    private final String path;
+    private final Long totalRecordsWritten;
+    private final Long perRecordSize;
+
+    public HWriteStat(String path, Long totalRecordsWritten, Long perRecordSize) {
+      this.path = path;
+      this.totalRecordsWritten = totalRecordsWritten;
+      this.perRecordSize = perRecordSize;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public Long getTotalRecordsWritten() {
+      return totalRecordsWritten;
+    }
+
+    public Long getPerRecordSize() {
+      return perRecordSize;
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index e320521924842..6e38dce72bdea 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -67,7 +67,6 @@
 import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
-import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -175,7 +174,7 @@ public void testAverageBytesPerRecordForNonEmptyCommitTimeLine() throws Exceptio
     LinkedList<Option<byte[]>> commits = generateCommitMetadataList();
     when(commitTimeLine.getInstantDetails(any(HoodieInstant.class))).thenAnswer(invocationOnMock -> commits.pop());
     long expectAvgSize = (long) Math.ceil((1.0 * 7500) / 1500);
-    long actualAvgSize = averageBytesPerRecord(commitTimeLine, config);
+    long actualAvgSize = AverageRecordSizeUtils.averageBytesPerRecord(commitTimeLine, config);
     assertEquals(expectAvgSize, actualAvgSize);
   }
 
@@ -185,7 +184,7 @@ public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder().build();
     when(commitTimeLine.empty()).thenReturn(true);
     long expectAvgSize = config.getCopyOnWriteRecordSizeEstimate();
-    long actualAvgSize = averageBytesPerRecord(commitTimeLine, config);
+    long actualAvgSize = AverageRecordSizeUtils.averageBytesPerRecord(commitTimeLine, config);
     assertEquals(expectAvgSize, actualAvgSize);
   }