Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Add a throttle on EC to avoid IO overload (#1947)
Browse files Browse the repository at this point in the history
  • Loading branch information
PHILO-HE authored Sep 26, 2018
1 parent 14232ec commit 53ab86b
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
6 changes: 6 additions & 0 deletions conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@
<description>The throughput limit (MB) for SSM copy overall</description>
</property>

<property>
<name>smart.action.ec.throttle.mb</name>
<value>0</value>
<description>The throughput limit (MB) for SSM EC overall</description>
</property>

<property>
<name>smart.action.local.execution.disabled</name>
<value>false</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public class SmartConfKeys {
public static final long SMART_ACTION_MOVE_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
public static final String SMART_ACTION_COPY_THROTTLE_MB_KEY = "smart.action.copy.throttle.mb";
public static final long SMART_ACTION_COPY_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
public static final String SMART_ACTION_EC_THROTTLE_MB_KEY = "smart.action.ec.throttle.mb";
public static final long SMART_ACTION_EC_THROTTLE_MB_DEFAULT = 0L;
public static final String SMART_ACTION_LOCAL_EXECUTION_DISABLED_KEY =
"smart.action.local.execution.disabled";
public static final boolean SMART_ACTION_LOCAL_EXECUTION_DISABLED_DEFAULT = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected void execute() throws Exception {
final String DIR_RESULT =
"The EC policy is set successfully for the given directory.";
final String CONVERT_RESULT =
"The file is converted successfully with the given or default ec policy.";
"The file is converted successfully with the given or default EC policy.";

this.setDfsClient(HadoopUtil.getDFSClient(
HadoopUtil.getNameNodeUri(conf), conf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
*/
package org.smartdata.hdfs.scheduler;

import com.google.common.util.concurrent.RateLimiter;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.SmartContext;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.hdfs.action.*;
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.MetaStoreException;
Expand All @@ -48,11 +50,18 @@ public class ErasureCodingScheduler extends ActionSchedulerService {
private Set<String> fileLock;
private SmartConf conf;
private MetaStore metaStore;
private long throttleInMb;
private RateLimiter rateLimiter;

public ErasureCodingScheduler(SmartContext context, MetaStore metaStore) {
super(context, metaStore);
this.conf = context.getConf();
this.metaStore = metaStore;
this.throttleInMb = conf.getLong(
SmartConfKeys.SMART_ACTION_EC_THROTTLE_MB_KEY, SmartConfKeys.SMART_ACTION_EC_THROTTLE_MB_DEFAULT);
if (this.throttleInMb > 0) {
this.rateLimiter = RateLimiter.create(throttleInMb);
}
}

public List<String> getSupportedActions() {
Expand Down Expand Up @@ -112,17 +121,25 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
return ScheduleResult.SUCCESS;
}

// check file lock merely for ec & unec action
if (fileLock.contains(srcPath)) {
return ScheduleResult.FAIL;
}
try {
if (!metaStore.getFile(srcPath).isdir()) {
// For ec or unec, add ecTmp argument
String tmpName = createTmpName(action);
action.getArgs().put(EC_TMP, EC_DIR + tmpName);
actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName);
if (metaStore.getFile(srcPath).isdir()) {
return ScheduleResult.SUCCESS;
}
// The below code is just for ec or unec action with file as argument, not directory
// check file lock merely for ec & unec action
if (fileLock.contains(srcPath)) {
return ScheduleResult.FAIL;
}
if (isLimitedByThrottle(srcPath)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to schedule {} due to limitation of throttle!", actionInfo);
}
return ScheduleResult.RETRY;
}
// For ec or unec, add ecTmp argument
String tmpName = createTmpName(action);
action.getArgs().put(EC_TMP, EC_DIR + tmpName);
actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName);
} catch (MetaStoreException ex) {
LOG.error("Error occurred for getting file info", ex);
actionInfo.appendLog(ex.getMessage());
Expand Down Expand Up @@ -159,4 +176,15 @@ public void onActionFinished(ActionInfo actionInfo) {
fileLock.remove(actionInfo.getArgs().get(HdfsAction.FILE_PATH));
}
}

public boolean isLimitedByThrottle(String srcPath) throws MetaStoreException {
if (this.rateLimiter == null) {
return false;
}
int fileLengthInMb = (int) metaStore.getFile(srcPath).getLength() >> 20;
if (fileLengthInMb > 0) {
return !rateLimiter.tryAcquire(fileLengthInMb);
}
return false;
}
}

0 comments on commit 53ab86b

Please sign in to comment.