Fixing average record size estimation for delta commits
nsivabalan committed Feb 27, 2024
1 parent 9da1f2b commit d3a7378
Showing 4 changed files with 324 additions and 37 deletions.
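This commit moves the averageBytesPerRecord helper out of the Spark upsert partitioner and into a new AverageRecordSizeUtils class (first diff below), and lets it use delta commits and replace commits in addition to regular commits when estimating average record size. Previously the partitioner built the estimate from completed COMMIT_ACTION instants only, so a table whose recent history consists of delta commits fell back to the configured copy-on-write record size estimate. For delta commits, only base-file write stats are aggregated. As an illustration with hypothetical numbers: a delta commit that wrote one 100 MB base file holding 1,000,000 records and one 20 MB log file yields ceil(104857600 / 1000000) = 105 bytes per record; the log file's stats are excluded. A small standalone sketch of this aggregation follows the new class below.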
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;

/**
* Utility class to assist with fetching the average record size.
*/
public class AverageRecordSizeUtils {
private static final Logger LOG = LoggerFactory.getLogger(AverageRecordSizeUtils.class);

/**
* Obtains the average record size based on records written during previous commits. Used for estimating how many
* records pack into one file.
*/
static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
try {
if (!commitTimeline.empty()) {
// Go over the reverse ordered commits to get a more recent estimate of average record size.
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
while (instants.hasNext()) {
HoodieInstant instant = instants.next();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
if (instant.getAction().equals(COMMIT_ACTION) || instant.getAction().equals(REPLACE_COMMIT_ACTION)) {
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
break;
}
} else if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
// For delta commits, consider only base files; log file write stats are excluded from the estimate.
AtomicLong totalBytesWritten = new AtomicLong(0L);
AtomicLong totalRecordsWritten = new AtomicLong(0L);
commitMetadata.getWriteStats().stream()
.filter(hoodieWriteStat -> FSUtils.isBaseFile(new Path(hoodieWriteStat.getPath())))
.forEach(hoodieWriteStat -> {
totalBytesWritten.addAndGet(hoodieWriteStat.getTotalWriteBytes());
totalRecordsWritten.addAndGet(hoodieWriteStat.getNumWrites());
});
if (totalBytesWritten.get() > fileSizeThreshold && totalRecordsWritten.get() > 0) {
avgSize = (long) Math.ceil((1.0 * totalBytesWritten.get()) / totalRecordsWritten.get());
break;
}
}
}
}
} catch (Throwable t) {
// Make this fail-safe: on any error, fall back to the configured record size estimate.
LOG.error("Error trying to compute average bytes/record", t);
}
return avgSize;
}
}
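
For context, here is a minimal standalone sketch (not part of this commit) of the same base-file-only aggregation performed by the delta-commit branch above. The file names and sizes are hypothetical, and it assumes HoodieWriteStat's bean-style setters (setPath, setTotalWriteBytes, setNumWrites):

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieWriteStat;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class AverageRecordSizeSketch {
  public static void main(String[] args) {
    // Hypothetical base file: 100 MB holding 1,000,000 records.
    HoodieWriteStat baseFileStat = new HoodieWriteStat();
    baseFileStat.setPath("2024/02/27/base-file_1-0-1_20240227.parquet");
    baseFileStat.setTotalWriteBytes(100L * 1024 * 1024);
    baseFileStat.setNumWrites(1_000_000L);

    // Hypothetical log file: its stats should not influence the estimate.
    HoodieWriteStat logFileStat = new HoodieWriteStat();
    logFileStat.setPath("2024/02/27/.base-file_20240227.log.1_1-0-1");
    logFileStat.setTotalWriteBytes(20L * 1024 * 1024);
    logFileStat.setNumWrites(50_000L);

    List<HoodieWriteStat> writeStats = Arrays.asList(baseFileStat, logFileStat);

    // Same aggregation as the delta-commit branch: count base files only.
    AtomicLong totalBytesWritten = new AtomicLong(0L);
    AtomicLong totalRecordsWritten = new AtomicLong(0L);
    writeStats.stream()
        .filter(stat -> FSUtils.isBaseFile(new Path(stat.getPath())))
        .forEach(stat -> {
          totalBytesWritten.addAndGet(stat.getTotalWriteBytes());
          totalRecordsWritten.addAndGet(stat.getNumWrites());
        });

    long avgSize = (long) Math.ceil((1.0 * totalBytesWritten.get()) / totalRecordsWritten.get());
    System.out.println("Estimated bytes per record: " + avgSize); // prints 105 with these numbers
  }
}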
@@ -22,7 +22,6 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -46,7 +45,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -56,6 +54,8 @@
import scala.Tuple2;

import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;

/**
* Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).
@@ -169,8 +169,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)
* created by clustering, which has smaller average record size, which affects assigning inserts and
* may result in OOM by making spark underestimate the actual input record sizes.
*/
long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
.getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).filterCompletedInstants(), config);
long averageRecordSize = AverageRecordSizeUtils.averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
.getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION))
.filterCompletedInstants(), config);
LOG.info("AvgRecordSize => " + averageRecordSize);

Map<String, List<SmallFile>> partitionSmallFilesMap =
@@ -364,34 +365,4 @@ public int getPartition(Object key) {
return targetBuckets.get(0).getKey().bucketNumber;
}
}

/**
* Obtains the average record size based on records written during previous commits. Used for estimating how many
* records pack into one file.
*/
protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
try {
if (!commitTimeline.empty()) {
// Go over the reverse ordered commits to get a more recent estimate of average record size.
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
while (instants.hasNext()) {
HoodieInstant instant = instants.next();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
break;
}
}
}
} catch (Throwable t) {
// make this fail safe.
LOG.error("Error trying to compute average bytes/record ", t);
}
return avgSize;
}
}
(The remaining two changed files in this commit are not shown here.)
