Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Solve #1681, Data throttling for file copy action #1683

Merged
merged 1 commit into from
Apr 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public class SmartConfKeys {
// Action
public static final String SMART_ACTION_MOVE_THROTTLE_MB_KEY = "smart.action.move.throttle.mb";
public static final long SMART_ACTION_MOVE_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
public static final String SMART_ACTION_COPY_THROTTLE_MB_KEY = "smart.action.copy.throttle.mb";
public static final long SMART_ACTION_COPY_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
public static final String SMART_ACTION_LOCAL_EXECUTION_DISABLED_KEY =
"smart.action.local.execution.disabled";
public static final boolean SMART_ACTION_LOCAL_EXECUTION_DISABLED_DEFAULT = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.smartdata.hdfs.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -90,6 +91,9 @@ public class CopyScheduler extends ActionSchedulerService {
private int cacheSyncTh = 100;
// record the file_diff whether being changed
private Map<Long, Boolean> fileDiffCacheChanged;
// throttle for copy action
private long throttleInMb;
private RateLimiter rateLimiter = null;


public CopyScheduler(SmartContext context, MetaStore metaStore) {
Expand All @@ -113,6 +117,12 @@ public CopyScheduler(SmartContext context, MetaStore metaStore) {
}
this.fileDiffCache = new ConcurrentHashMap<>();
this.fileDiffCacheChanged = new ConcurrentHashMap<>();

throttleInMb = conf.getLong(SmartConfKeys.SMART_ACTION_COPY_THROTTLE_MB_KEY,
SmartConfKeys.SMART_ACTION_COPY_THROTTLE_MB_DEFAULT);
if (throttleInMb > 0) {
rateLimiter = RateLimiter.create(throttleInMb);
}
}

@Override
Expand Down Expand Up @@ -143,6 +153,20 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
case APPEND:
action.setActionType("copy");
action.getArgs().put("-dest", path.replace(srcDir, destDir));
if (rateLimiter != null) {
String strLen = fileDiff.getParameters().get("-length");
if (strLen != null) {
int appendLen = (int)(Long.valueOf(strLen) >> 20);
if (appendLen > 0) {
if (!rateLimiter.tryAcquire(appendLen)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cancel Scheduling COPY action {} due to throttling.", actionInfo);
}
return ScheduleResult.RETRY;
}
}
}
}
break;
case DELETE:
action.setActionType("delete");
Expand Down