From 53ab86bbcb49e39aa54f12fc59e8ab405902935e Mon Sep 17 00:00:00 2001
From: PHILO-HE
Date: Wed, 26 Sep 2018 14:41:14 +0800
Subject: [PATCH] Add a throttle on EC to avoid IO overload (#1947)

---
 conf/smart-default.xml                        |  6 +++
 .../org/smartdata/conf/SmartConfKeys.java     |  2 +
 .../hdfs/action/ErasureCodingAction.java      |  2 +-
 .../scheduler/ErasureCodingScheduler.java     | 46 +++++++++++++++----
 4 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/conf/smart-default.xml b/conf/smart-default.xml
index 6a87ebbb393..bc46dff20a9 100644
--- a/conf/smart-default.xml
+++ b/conf/smart-default.xml
@@ -133,6 +133,12 @@
     <description>The throughput limit (MB) for SSM copy overall</description>
   </property>

+  <property>
+    <name>smart.action.ec.throttle.mb</name>
+    <value>0</value>
+    <description>The throughput limit (MB) for SSM EC overall</description>
+  </property>
+
   <property>
     <name>smart.action.local.execution.disabled</name>
     <value>false</value>
diff --git a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java
index 95c3209a52a..45b0eb48288 100644
--- a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java
+++ b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java
@@ -138,6 +138,8 @@ public class SmartConfKeys {
   public static final long SMART_ACTION_MOVE_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
   public static final String SMART_ACTION_COPY_THROTTLE_MB_KEY = "smart.action.copy.throttle.mb";
   public static final long SMART_ACTION_COPY_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
+  public static final String SMART_ACTION_EC_THROTTLE_MB_KEY = "smart.action.ec.throttle.mb";
+  public static final long SMART_ACTION_EC_THROTTLE_MB_DEFAULT = 0L;
   public static final String SMART_ACTION_LOCAL_EXECUTION_DISABLED_KEY =
       "smart.action.local.execution.disabled";
   public static final boolean SMART_ACTION_LOCAL_EXECUTION_DISABLED_DEFAULT = false;
diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java
index a290bc2eaff..58df3050ac4 100644
--- a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java
+++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java
@@ -84,7 +84,7 @@ protected void execute() throws Exception {
     final String DIR_RESULT =
         "The EC policy is set successfully for the given directory.";
     final String CONVERT_RESULT =
-        "The file is converted successfully with the given or default ec policy.";
+        "The file is converted successfully with the given or default EC policy.";
     this.setDfsClient(HadoopUtil.getDFSClient(
         HadoopUtil.getNameNodeUri(conf), conf));

diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java
index 06d3b673471..775935c84f1 100644
--- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java
+++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java
@@ -17,11 +17,13 @@
  */
 package org.smartdata.hdfs.scheduler;

+import com.google.common.util.concurrent.RateLimiter;
 import org.apache.hadoop.util.VersionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.smartdata.SmartContext;
 import org.smartdata.conf.SmartConf;
+import org.smartdata.conf.SmartConfKeys;
 import org.smartdata.hdfs.action.*;
 import org.smartdata.metastore.MetaStore;
 import org.smartdata.metastore.MetaStoreException;
@@ -48,11 +50,18 @@ public class ErasureCodingScheduler extends ActionSchedulerService {
   private Set<String> fileLock;
   private SmartConf conf;
   private MetaStore metaStore;
+  private long throttleInMb;
+  private RateLimiter rateLimiter;

   public ErasureCodingScheduler(SmartContext context, MetaStore metaStore) {
     super(context, metaStore);
     this.conf = context.getConf();
     this.metaStore = metaStore;
+    this.throttleInMb = conf.getLong(
+        SmartConfKeys.SMART_ACTION_EC_THROTTLE_MB_KEY, SmartConfKeys.SMART_ACTION_EC_THROTTLE_MB_DEFAULT);
+    if (this.throttleInMb > 0) {
+      this.rateLimiter = RateLimiter.create(throttleInMb);
+    }
   }

   public List<String> getSupportedActions() {
@@ -112,17 +121,25 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
       return ScheduleResult.SUCCESS;
     }

-    // check file lock merely for ec & unec action
-    if (fileLock.contains(srcPath)) {
-      return ScheduleResult.FAIL;
-    }
     try {
-      if (!metaStore.getFile(srcPath).isdir()) {
-        // For ec or unec, add ecTmp argument
-        String tmpName = createTmpName(action);
-        action.getArgs().put(EC_TMP, EC_DIR + tmpName);
-        actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName);
+      if (metaStore.getFile(srcPath).isdir()) {
+        return ScheduleResult.SUCCESS;
+      }
+      // The below code is just for ec or unec action with file as argument, not directory
+      // check file lock merely for ec & unec action
+      if (fileLock.contains(srcPath)) {
+        return ScheduleResult.FAIL;
+      }
+      if (isLimitedByThrottle(srcPath)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Failed to schedule {} due to limitation of throttle!", actionInfo);
+        }
+        return ScheduleResult.RETRY;
       }
+      // For ec or unec, add ecTmp argument
+      String tmpName = createTmpName(action);
+      action.getArgs().put(EC_TMP, EC_DIR + tmpName);
+      actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName);
     } catch (MetaStoreException ex) {
       LOG.error("Error occurred for getting file info", ex);
       actionInfo.appendLog(ex.getMessage());
@@ -159,4 +176,15 @@ public void onActionFinished(ActionInfo actionInfo) {
       fileLock.remove(actionInfo.getArgs().get(HdfsAction.FILE_PATH));
     }
   }
+
+  public boolean isLimitedByThrottle(String srcPath) throws MetaStoreException {
+    if (this.rateLimiter == null) {
+      return false;
+    }
+    int fileLengthInMb = (int) metaStore.getFile(srcPath).getLength() >> 20;
+    if (fileLengthInMb > 0) {
+      return !rateLimiter.tryAcquire(fileLengthInMb);
+    }
+    return false;
+  }
 }
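
Note on the throttling pattern used above (a minimal standalone sketch, not SSM code; the class and method names below are illustrative only): the configured limit becomes a Guava RateLimiter whose permits are megabytes per second, each file to be converted consumes permits equal to its size in MB via tryAcquire, and a file whose permits cannot be acquired immediately is retried later, mirroring ScheduleResult.RETRY in the scheduler.

// EcThrottleSketch.java - illustrative sketch of the RateLimiter-based throttle.
import com.google.common.util.concurrent.RateLimiter;

public class EcThrottleSketch {
  private final RateLimiter rateLimiter;

  public EcThrottleSketch(long throttleInMb) {
    // 0 (the default) means unlimited, so no limiter is created in that case.
    this.rateLimiter = throttleInMb > 0 ? RateLimiter.create(throttleInMb) : null;
  }

  /** Returns true if scheduling a file of this size now would exceed the throttle. */
  public boolean isLimitedByThrottle(long fileLengthInBytes) {
    if (rateLimiter == null) {
      return false;
    }
    int fileLengthInMb = (int) (fileLengthInBytes >> 20);
    // tryAcquire returns false while the limiter is still paying off earlier
    // acquisitions; that is the signal to retry the action later.
    return fileLengthInMb > 0 && !rateLimiter.tryAcquire(fileLengthInMb);
  }

  public static void main(String[] args) {
    EcThrottleSketch throttle = new EcThrottleSketch(100); // 100 MB/s budget
    long oneGb = 1L << 30;
    System.out.println(throttle.isLimitedByThrottle(oneGb)); // false: first file is admitted
    System.out.println(throttle.isLimitedByThrottle(oneGb)); // true: budget borrowed, retry later
  }
}

In deployment the throttle is off by default (0 means unlimited); enabling it means setting smart.action.ec.throttle.mb to a positive MB-per-second value, typically via the site configuration file that overrides conf/smart-default.xml.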