diff --git a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java index 805e7fbf576..34361a7314c 100644 --- a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java +++ b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java @@ -121,6 +121,8 @@ public class SmartConfKeys { // Action public static final String SMART_ACTION_MOVE_THROTTLE_MB_KEY = "smart.action.move.throttle.mb"; public static final long SMART_ACTION_MOVE_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited + public static final String SMART_ACTION_COPY_THROTTLE_MB_KEY = "smart.action.copy.throttle.mb"; + public static final long SMART_ACTION_COPY_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited public static final String SMART_ACTION_LOCAL_EXECUTION_DISABLED_KEY = "smart.action.local.execution.disabled"; public static final boolean SMART_ACTION_LOCAL_EXECUTION_DISABLED_DEFAULT = false; diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java index 130e3a0b3b2..9b5c4a2ff20 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java @@ -18,6 +18,7 @@ package org.smartdata.hdfs.scheduler; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.RateLimiter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -90,6 +91,9 @@ public class CopyScheduler extends ActionSchedulerService { private int cacheSyncTh = 100; // record the file_diff whether being changed private Map fileDiffCacheChanged; + // throttle for copy action + private long throttleInMb; + private RateLimiter rateLimiter = null; public CopyScheduler(SmartContext context, MetaStore metaStore) { @@ -113,6 +117,12 @@ public CopyScheduler(SmartContext context, MetaStore metaStore) { } this.fileDiffCache = new ConcurrentHashMap<>(); this.fileDiffCacheChanged = new ConcurrentHashMap<>(); + + throttleInMb = conf.getLong(SmartConfKeys.SMART_ACTION_COPY_THROTTLE_MB_KEY, + SmartConfKeys.SMART_ACTION_COPY_THROTTLE_MB_DEFAULT); + if (throttleInMb > 0) { + rateLimiter = RateLimiter.create(throttleInMb); + } } @Override @@ -143,6 +153,20 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) { case APPEND: action.setActionType("copy"); action.getArgs().put("-dest", path.replace(srcDir, destDir)); + if (rateLimiter != null) { + String strLen = fileDiff.getParameters().get("-length"); + if (strLen != null) { + int appendLen = (int)(Long.valueOf(strLen) >> 20); + if (appendLen > 0) { + if (!rateLimiter.tryAcquire(appendLen)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cancel Scheduling COPY action {} due to throttling.", actionInfo); + } + return ScheduleResult.RETRY; + } + } + } + } break; case DELETE: action.setActionType("delete");