Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Solve #1633, Implement concurrent cmdlet dispatch (#1645)
Browse files Browse the repository at this point in the history
  • Loading branch information
littlezhou authored Mar 20, 2018
1 parent 35fd9dd commit 925436c
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 92 deletions.
6 changes: 6 additions & 0 deletions conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@
<description>Max number of cmdlets that can be executed in parallel</description>
</property>

<property>
<name>smart.cmdlet.dispatchers</name>
<value>3</value>
<description>Max number of cmdlet dispatchers that work in parallel</description>
</property>

<property>
<name>smart.cmdlet.mover.max.concurrent.blocks.per.srv.inst</name>
<value>0</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public class SmartConfKeys {
public static final String SMART_CMDLET_DISPATCHER_LOG_DISP_RESULT_KEY =
"smart.cmdlet.dispatcher.log.disp.result";
public static final boolean SMART_CMDLET_DISPATCHER_LOG_DISP_RESULT_DEFAULT = true;
// Number of DispatchTask instances that CmdletDispatcher creates and schedules
// to dispatch cmdlets in parallel.
public static final String SMART_CMDLET_DISPATCHERS_KEY = "smart.cmdlet.dispatchers";
public static final int SMART_CMDLET_DISPATCHERS_DEFAULT = 3;

// Action
public static final String SMART_ACTION_MOVE_THROTTLE_MB_KEY = "smart.action.move.throttle.mb";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;
import org.smartdata.SmartContext;
import org.smartdata.action.ActionException;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.model.CmdletState;
import org.smartdata.model.ExecutorType;
Expand All @@ -41,6 +42,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CmdletDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(CmdletDispatcher.class);
Expand All @@ -55,14 +57,17 @@ public class CmdletDispatcher {
private CmdletExecutorService[] cmdExecServices;
private int[] cmdExecSrvInsts;
private int cmdExecSrvTotalInsts;
private int[] cmdExecSrvInstsSlotsLeft;
private int[] execSrvSlotsLeft;
private AtomicInteger totalSlotsLeft = new AtomicInteger();

private Map<Long, ExecutorType> dispatchedToSrvs;
private boolean disableLocalExec;
private boolean logDispResult;
private DispatchTask[] dispatchTasks;

// TODO: to be refined
private final int defaultSlots;
private int index;
private AtomicInteger index = new AtomicInteger(0);

private final ExecutorType[] preferLocalTryList = new ExecutorType[]
{ExecutorType.LOCAL, ExecutorType.REMOTE_SSM, ExecutorType.AGENT};
Expand All @@ -89,8 +94,8 @@ public CmdletDispatcher(SmartContext smartContext, CmdletManager cmdletManager,

this.cmdExecServices = new CmdletExecutorService[ExecutorType.values().length];
cmdExecSrvInsts = new int[ExecutorType.values().length];
execSrvSlotsLeft = new int[ExecutorType.values().length];
cmdExecSrvTotalInsts = 0;
cmdExecSrvInstsSlotsLeft = new int[ExecutorType.values().length];
dispatchedToSrvs = new ConcurrentHashMap<>();

disableLocalExec = smartContext.getConf().getBoolean(
Expand All @@ -101,12 +106,18 @@ public CmdletDispatcher(SmartContext smartContext, CmdletManager cmdletManager,
if (!disableLocalExec) {
registerExecutorService(exe);
}
this.index = 0;

schExecService = Executors.newScheduledThreadPool(1);
logDispResult = smartContext.getConf().getBoolean(
SmartConf conf = smartContext.getConf();
logDispResult = conf.getBoolean(
SmartConfKeys.SMART_CMDLET_DISPATCHER_LOG_DISP_RESULT_KEY,
SmartConfKeys.SMART_CMDLET_DISPATCHER_LOG_DISP_RESULT_DEFAULT);
int numDisp = conf.getInt(SmartConfKeys.SMART_CMDLET_DISPATCHERS_KEY,
SmartConfKeys.SMART_CMDLET_DISPATCHERS_DEFAULT);
dispatchTasks = new DispatchTask[numDisp];
for (int i = 0; i < numDisp; i++) {
dispatchTasks[i] = new DispatchTask(this);
}
schExecService = Executors.newScheduledThreadPool(numDisp + 1);
}

public void registerExecutorService(CmdletExecutorService executorService) {
Expand All @@ -118,34 +129,32 @@ public boolean canDispatchMore() {
}

public boolean dispatch(LaunchCmdlet cmdlet) {
CmdletDispatchPolicy policy = cmdlet.getDispPolicy();
if (policy == CmdletDispatchPolicy.ANY) {
policy = getRoundrobinDispatchPolicy();
int mod = index.incrementAndGet() % cmdExecSrvTotalInsts;
int idx = 0;

for (int nround = 0; nround < 2 && mod >= 0; nround++) {
for (idx = 0; idx < cmdExecSrvInsts.length; idx++) {
mod -= cmdExecSrvInsts[idx];
if (mod < 0) {
break;
}
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// ignore
}
}
index++;
ExecutorType[] tryOrder;
switch (policy) {
case PREFER_LOCAL:
tryOrder = preferLocalTryList;
break;

case PREFER_REMOTE_SSM:
tryOrder = preferRemoteSsmTryList;
break;

case PREFER_AGENT:
tryOrder = preferAgentTryList;
break;

default:
LOG.error("Unknown cmdlet dispatch policy. " + cmdlet);
return false;
if (mod >= 0) {
return false;
}

CmdletExecutorService selected = null;
for (ExecutorType etTry : tryOrder) {
if (cmdExecServices[etTry.ordinal()] != null && executorSlotAvaliable(etTry)) {
selected = cmdExecServices[etTry.ordinal()];
for (int i = 0; i < ExecutorType.values().length; i++) {
idx = idx % ExecutorType.values().length;
if (execSrvSlotsLeft[idx] > 0) {
selected = cmdExecServices[idx];
break;
}
}
Expand All @@ -159,7 +168,7 @@ public boolean dispatch(LaunchCmdlet cmdlet) {

String id = selected.execute(cmdlet);

updateSlotsLeft(selected.getExecutorType().ordinal(), -1);
execSrvSlotsLeft[selected.getExecutorType().ordinal()] -= 1;
dispatchedToSrvs.put(cmdlet.getCmdletId(), selected.getExecutorType());

if (logDispResult) {
Expand All @@ -171,12 +180,8 @@ public boolean dispatch(LaunchCmdlet cmdlet) {
return true;
}

private boolean executorSlotAvaliable(ExecutorType executorType) {
return cmdExecSrvInstsSlotsLeft[executorType.ordinal()] > 0;
}

private CmdletDispatchPolicy getRoundrobinDispatchPolicy() {
int rev = index % cmdExecSrvTotalInsts;
int rev = index.get() % cmdExecSrvTotalInsts;
for (int i = 0; i < cmdExecSrvInsts.length; i++) {
if (cmdExecSrvInsts[i] > 0 && rev < cmdExecSrvInsts[i]) {
return roundRobinPolicies[i];
Expand Down Expand Up @@ -234,42 +239,29 @@ private void updateCmdActionStatus(LaunchCmdlet cmdlet) {

private class DispatchTask implements Runnable {
private final CmdletDispatcher dispatcher;
private long lastInfo = System.currentTimeMillis();
private int statRound = 0;
private int statFail = 0;
private int statDispatched = 0;
private int statNoMoreCmdlet = 0;
private int statFull = 0;
private long lastReportNoExecutor = 0;

/**
 * Creates a dispatch task bound to the given dispatcher.
 *
 * @param dispatcher the dispatcher that run() hands each cmdlet to via dispatch()
 */
public DispatchTask(CmdletDispatcher dispatcher) {
this.dispatcher = dispatcher;
}

/**
 * Takes a snapshot of this task's counters and starts a new counting window.
 *
 * <p>The returned object carries the round, fail, dispatched, no-more-cmdlet and
 * full counters as they were at the time of the call; all five counters are then
 * reset to zero.
 *
 * <p>NOTE(review): the counters are plain ints that run() increments, while this
 * method is invoked from LogStatTask; increments that land between the snapshot
 * and the reset may be lost -- confirm that approximate statistics are acceptable.
 *
 * @return the statistics accumulated since the previous call
 */
public CmdletDispatcherStat getStat() {
  final CmdletDispatcherStat snapshot = new CmdletDispatcherStat(
      statRound, statFail, statDispatched, statNoMoreCmdlet, statFull);
  // Reset every counter so the next snapshot only covers the following interval.
  statRound = statFail = statDispatched = statNoMoreCmdlet = statFull = 0;
  return snapshot;
}

@Override
public void run() {
long curr = System.currentTimeMillis();
if (curr - lastInfo >= 5000) {
if (!(statDispatched == 0 && statRound == statNoMoreCmdlet)) {
if (cmdExecSrvTotalInsts != 0 || statFull != 0) {
LOG.info("timeInterval={} statRound={} statFail={} statDispatched={} "
+ "statNoMoreCmdlet={} statFull={} pendingCmdlets={} numExecutor={}",
curr - lastInfo, statRound, statFail, statDispatched, statNoMoreCmdlet,
statFull, pendingCmdlets.size(), cmdExecSrvTotalInsts);
} else {
if (curr - lastReportNoExecutor >= 600 * 1000L) {
LOG.info("No cmdlet executor. pendingCmdlets={}", pendingCmdlets.size());
lastReportNoExecutor = curr;
}
}
}
statRound = 0;
statFail = 0;
statDispatched = 0;
statFull = 0;
statNoMoreCmdlet = 0;
lastInfo = curr;
}
statRound++;

if (cmdExecSrvTotalInsts == 0) {
Expand All @@ -282,31 +274,69 @@ public void run() {
}

LaunchCmdlet launchCmdlet = null;
try {
while (dispatcher.canDispatchMore()) {
try {
launchCmdlet = getNextCmdletToRun();
if (launchCmdlet == null) {
statNoMoreCmdlet++;
break;
} else {
cmdletPreExecutionProcess(launchCmdlet);
if (!dispatcher.dispatch(launchCmdlet)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stop this round dispatch due : " + launchCmdlet);
}
statFail++;
break;
boolean disped;
while (resvExecSlot()) {
disped = false;
try {
launchCmdlet = getNextCmdletToRun();
if (launchCmdlet == null) {
statNoMoreCmdlet++;
break;
} else {
cmdletPreExecutionProcess(launchCmdlet);
if (!dispatcher.dispatch(launchCmdlet)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stop this round dispatch due : " + launchCmdlet);
}
statDispatched++;
statFail++;
break;
}
} catch (IOException e) {
LOG.error("Cmdlet dispatcher error", e);
disped = true;
statDispatched++;
}
} catch (Throwable t) {
LOG.error("Cmdlet dispatcher error", t);
} finally {
if (!disped) {
freeExecSlot();
}
}
}
}
}

private class LogStatTask implements Runnable {
public DispatchTask[] tasks;
private long lastReportNoExecutor = 0;
private long lastInfo = System.currentTimeMillis();

/**
 * Creates a statistics logger over the given dispatch tasks.
 *
 * @param tasks the dispatch tasks whose getStat() results run() adds together
 */
public LogStatTask(DispatchTask[] tasks) {
this.tasks = tasks;
}

@Override
public void run() {
long curr = System.currentTimeMillis();
CmdletDispatcherStat stat = new CmdletDispatcherStat();
for (DispatchTask task : tasks) {
stat.add(task.getStat());
}

if (!(stat.getStatDispatched() == 0 && stat.getStatRound() == stat.getStatNoMoreCmdlet())) {
if (cmdExecSrvTotalInsts != 0 || stat.getStatFull() != 0) {
LOG.info("timeInterval={} statRound={} statFail={} statDispatched={} "
+ "statNoMoreCmdlet={} statFull={} pendingCmdlets={} numExecutor={}",
curr - lastInfo, stat.getStatRound(), stat.getStatFail(), stat.getStatDispatched(),
stat.getStatNoMoreCmdlet(), stat.getStatFull(), pendingCmdlets.size(),
cmdExecSrvTotalInsts);
} else {
if (curr - lastReportNoExecutor >= 600 * 1000L) {
LOG.info("No cmdlet executor. pendingCmdlets={}", pendingCmdlets.size());
lastReportNoExecutor = curr;
}
}
} catch (Throwable t) {
LOG.error("Dispatch {} error", launchCmdlet, t);
}
lastInfo = System.currentTimeMillis();
}
}

Expand Down Expand Up @@ -341,21 +371,25 @@ public void onNodeMessage(NodeMessage msg, boolean isAdd) {
LOG.info(String.format("Node " + msg.getNodeInfo() + (isAdd ? " added." : " removed.")));
}

private int updateSlotsLeft(int index, int delta) {
synchronized (cmdExecSrvInstsSlotsLeft) {
cmdExecSrvInstsSlotsLeft[index] += delta;
return cmdExecSrvInstsSlotsLeft[index];
}
private void updateSlotsLeft(int idx, int delta) {
execSrvSlotsLeft[idx] += delta;
totalSlotsLeft.addAndGet(delta);
}

public int getTotalSlotsLeft() {
synchronized (cmdExecSrvInstsSlotsLeft) {
int total = 0;
for (int i : cmdExecSrvInstsSlotsLeft) {
total += i;
}
return total;
return totalSlotsLeft.get();
}

/**
 * Tries to reserve one execution slot from the shared slot counter.
 *
 * <p>The counter is only decremented when a slot is actually available, using a
 * compare-and-set loop. Unlike a blind decrement followed by a compensating
 * increment, this never drives the counter transiently below zero, so concurrent
 * dispatch tasks and readers of getTotalSlotsLeft() never observe a negative
 * value, and a reservation cannot fail merely because another thread's failed
 * attempt has not yet been rolled back.
 *
 * <p>A successful reservation is expected to be followed by a dispatch;
 * DispatchTask.run() gives the slot back through freeExecSlot() when nothing was
 * dispatched.
 *
 * @return true if a slot was reserved, false if no slot is left
 */
public boolean resvExecSlot() {
  while (true) {
    int left = totalSlotsLeft.get();
    if (left <= 0) {
      return false;
    }
    if (totalSlotsLeft.compareAndSet(left, left - 1)) {
      return true;
    }
    // Another thread changed the counter in between; re-read and try again.
  }
}

/**
 * Returns one previously reserved execution slot to the shared slot counter.
 */
public void freeExecSlot() {
  totalSlotsLeft.addAndGet(1);
}

public int getTotalSlots() {
Expand All @@ -364,8 +398,14 @@ public int getTotalSlots() {

public void start() {
CmdletDispatcherHelper.getInst().register(this);
schExecService.scheduleAtFixedRate(
new DispatchTask(this), 200, 100, TimeUnit.MILLISECONDS);
int idx = 0;
for (DispatchTask task : dispatchTasks) {
schExecService.scheduleAtFixedRate(task, idx * 200 / dispatchTasks.length,
100, TimeUnit.MILLISECONDS);
idx++;
}
schExecService.scheduleAtFixedRate(new LogStatTask(dispatchTasks),
5000, 5000, TimeUnit.MILLISECONDS);
}

public void stop() {
Expand Down
Loading

0 comments on commit 925436c

Please sign in to comment.