diff --git a/conf/smart-default.xml b/conf/smart-default.xml
index 6a87ebbb393..bc46dff20a9 100644
--- a/conf/smart-default.xml
+++ b/conf/smart-default.xml
@@ -133,6 +133,12 @@
The throughput limit (MB) for SSM copy overall
+
+ smart.action.ec.throttle.mb
+ 0
+ The throughput limit (MB) for SSM EC overall
+
+
smart.action.local.execution.disabled
false
diff --git a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java
index 95c3209a52a..45b0eb48288 100644
--- a/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java
+++ b/smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java
@@ -138,6 +138,8 @@ public class SmartConfKeys {
public static final long SMART_ACTION_MOVE_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
public static final String SMART_ACTION_COPY_THROTTLE_MB_KEY = "smart.action.copy.throttle.mb";
public static final long SMART_ACTION_COPY_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
+ public static final String SMART_ACTION_EC_THROTTLE_MB_KEY = "smart.action.ec.throttle.mb";
+ public static final long SMART_ACTION_EC_THROTTLE_MB_DEFAULT = 0L;
public static final String SMART_ACTION_LOCAL_EXECUTION_DISABLED_KEY =
"smart.action.local.execution.disabled";
public static final boolean SMART_ACTION_LOCAL_EXECUTION_DISABLED_DEFAULT = false;
diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java
index a290bc2eaff..58df3050ac4 100644
--- a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java
+++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java
@@ -84,7 +84,7 @@ protected void execute() throws Exception {
final String DIR_RESULT =
"The EC policy is set successfully for the given directory.";
final String CONVERT_RESULT =
- "The file is converted successfully with the given or default ec policy.";
+ "The file is converted successfully with the given or default EC policy.";
this.setDfsClient(HadoopUtil.getDFSClient(
HadoopUtil.getNameNodeUri(conf), conf));
diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java
index 06d3b673471..775935c84f1 100644
--- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java
+++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java
@@ -17,11 +17,13 @@
*/
package org.smartdata.hdfs.scheduler;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.SmartContext;
import org.smartdata.conf.SmartConf;
+import org.smartdata.conf.SmartConfKeys;
import org.smartdata.hdfs.action.*;
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.MetaStoreException;
@@ -48,11 +50,18 @@ public class ErasureCodingScheduler extends ActionSchedulerService {
private Set fileLock;
private SmartConf conf;
private MetaStore metaStore;
+ private long throttleInMb;
+ private RateLimiter rateLimiter;
public ErasureCodingScheduler(SmartContext context, MetaStore metaStore) {
super(context, metaStore);
this.conf = context.getConf();
this.metaStore = metaStore;
+ this.throttleInMb = conf.getLong(
+ SmartConfKeys.SMART_ACTION_EC_THROTTLE_MB_KEY, SmartConfKeys.SMART_ACTION_EC_THROTTLE_MB_DEFAULT);
+ if (this.throttleInMb > 0) {
+ this.rateLimiter = RateLimiter.create(throttleInMb);
+ }
}
public List getSupportedActions() {
@@ -112,17 +121,25 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
return ScheduleResult.SUCCESS;
}
- // check file lock merely for ec & unec action
- if (fileLock.contains(srcPath)) {
- return ScheduleResult.FAIL;
- }
try {
- if (!metaStore.getFile(srcPath).isdir()) {
- // For ec or unec, add ecTmp argument
- String tmpName = createTmpName(action);
- action.getArgs().put(EC_TMP, EC_DIR + tmpName);
- actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName);
+ if (metaStore.getFile(srcPath).isdir()) {
+ return ScheduleResult.SUCCESS;
+ }
+ // The below code is just for ec or unec action with file as argument, not directory
+ // check file lock merely for ec & unec action
+ if (fileLock.contains(srcPath)) {
+ return ScheduleResult.FAIL;
+ }
+ if (isLimitedByThrottle(srcPath)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to schedule {} due to limitation of throttle!", actionInfo);
+ }
+ return ScheduleResult.RETRY;
}
+ // For ec or unec, add ecTmp argument
+ String tmpName = createTmpName(action);
+ action.getArgs().put(EC_TMP, EC_DIR + tmpName);
+ actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName);
} catch (MetaStoreException ex) {
LOG.error("Error occurred for getting file info", ex);
actionInfo.appendLog(ex.getMessage());
@@ -159,4 +176,15 @@ public void onActionFinished(ActionInfo actionInfo) {
fileLock.remove(actionInfo.getArgs().get(HdfsAction.FILE_PATH));
}
}
+
+ public boolean isLimitedByThrottle(String srcPath) throws MetaStoreException {
+ if (this.rateLimiter == null) {
+ return false;
+ }
+ int fileLengthInMb = (int) metaStore.getFile(srcPath).getLength() >> 20;
+ if (fileLengthInMb > 0) {
+ return !rateLimiter.tryAcquire(fileLengthInMb);
+ }
+ return false;
+ }
}