Skip to content

Commit

Permalink
[Flink] Set default max input split size to -1 (disabled)
Browse files Browse the repository at this point in the history
  • Loading branch information
jto committed Aug 28, 2023
1 parent 40d2093 commit 9579fe0
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,11 @@ public interface FlinkPipelineOptions

void setFlinkConfDir(String confDir);

@Description("Set the maximum size of input split when data is read from a filesystem.")
@Default.Long(128 * 1024 * 1024)
Long getFileInputSplitMaxSizeBytes();
@Description("Set the maximum size of input split when data is read from a filesystem. -1 implies no max size.")
@Default.Long(-1)
Long getFileInputSplitMaxSizeMB();

void setFileInputSplitMaxSizeBytes(Long inputSplitMaxSizeBytes);
void setFileInputSplitMaxSizeMB(Long inputSplitMaxSizeMB);

static FlinkPipelineOptions defaults() {
return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ public float getAverageRecordWidth() {
private long getDesiredSizeBytes(int numSplits) throws Exception {
long totalSize = initialSource.getEstimatedSizeBytes(options);
long defaultSplitSize = totalSize / numSplits;
if (initialSource instanceof FileBasedSource) {
long maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeBytes();
long maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB();
if (initialSource instanceof FileBasedSource && maxSplitSize > 0) {
// Most of the time parallelism is < number of files in source.
// Each file becomes a unique split, which commonly creates skew.
// This limits the size of splits to 128Mb to reduce skew.
return Math.min(defaultSplitSize, maxSplitSize);
// This limits the size of splits to reduce skew.
return Math.min(defaultSplitSize, maxSplitSize * 1024 * 1024);
} else {
return defaultSplitSize;
}
Expand Down

0 comments on commit 9579fe0

Please sign in to comment.