diff --git a/smart-common/src/main/java/org/smartdata/model/CmdletState.java b/smart-common/src/main/java/org/smartdata/model/CmdletState.java index c7a65dfb31b..5b35eef1b6f 100644 --- a/smart-common/src/main/java/org/smartdata/model/CmdletState.java +++ b/smart-common/src/main/java/org/smartdata/model/CmdletState.java @@ -22,14 +22,15 @@ */ public enum CmdletState { NOTINITED(0), - PENDING(1), // Ready for execution + PENDING(1), // Ready for schedule EXECUTING(2), // Still running PAUSED(3), DONE(4), // Execution successful CANCELLED(5), DISABLED(6), // Disable this Cmdlet, kill all executing actions - DRYRUN(7), // TODO Don't Run, but keep status - FAILED(8); // Running cmdlet failed + DRYRUN(7), // TODO Don't Run, but keep status + FAILED(8), // Running cmdlet failed + SCHEDULED(9); private int value; diff --git a/smart-common/src/main/java/org/smartdata/model/action/ActionScheduler.java b/smart-common/src/main/java/org/smartdata/model/action/ActionScheduler.java new file mode 100644 index 00000000000..31df8e0f757 --- /dev/null +++ b/smart-common/src/main/java/org/smartdata/model/action/ActionScheduler.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.model.action; + +import org.smartdata.model.ActionInfo; +import org.smartdata.model.LaunchAction; + +import java.util.List; + +public interface ActionScheduler { + + List getSupportedActions(); + + /** + * Called when new action submitted to CmdletManager. + * + * @param actionInfo + * @return acceptable if true, or discard + */ + boolean onSubmit(ActionInfo actionInfo); + + /** + * Trying to schedule an action for Dispatch. + * @param actionInfo + * @param action + * @return + */ + ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action); + + /** + * Called after and an Cmdlet get scheduled. + * + * @param actionInfo + * @param result + */ + void postSchedule(ActionInfo actionInfo, ScheduleResult result); + + /** + * Called just before dispatch for execution. + * + * @param action + */ + void onPreDispatch(LaunchAction action); + + /** + * Called when action finished execution. + * + * @param actionInfo + */ + void onActionFinished(ActionInfo actionInfo); +} diff --git a/smart-common/src/main/java/org/smartdata/model/action/CmdletScheduler.java b/smart-common/src/main/java/org/smartdata/model/action/CmdletScheduler.java new file mode 100644 index 00000000000..9763eec688e --- /dev/null +++ b/smart-common/src/main/java/org/smartdata/model/action/CmdletScheduler.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.model.action; + +// TODO: to be implemented +public interface CmdletScheduler { + + boolean onSubmitCmdlet(); + boolean postSubmitCmdlet(); + + ScheduleResult onSchedule(); + void postSchedule(); + + void onDispatch(); + + void onActionMessage(); + void onCmdletFinished(); +} diff --git a/smart-common/src/main/java/org/smartdata/model/action/ActionPreProcessor.java b/smart-common/src/main/java/org/smartdata/model/action/ScheduleResult.java similarity index 78% rename from smart-common/src/main/java/org/smartdata/model/action/ActionPreProcessor.java rename to smart-common/src/main/java/org/smartdata/model/action/ScheduleResult.java index 9c6bc79019c..a4ec0253a27 100644 --- a/smart-common/src/main/java/org/smartdata/model/action/ActionPreProcessor.java +++ b/smart-common/src/main/java/org/smartdata/model/action/ScheduleResult.java @@ -17,15 +17,8 @@ */ package org.smartdata.model.action; -import org.smartdata.model.LaunchAction; - -import java.util.List; - -public interface ActionPreProcessor { - - List getSupportedActions(); - - void beforeExecution(LaunchAction action); - - void afterExecution(LaunchAction action); +public enum ScheduleResult { + SUCCESS, // OK for dispatch + RETRY, // Need re-schedule later + FAIL } diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java b/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java index b15b6726f40..2b60173f0b2 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java @@ -34,6 +34,7 @@ import org.smartdata.model.CmdletDescriptor; import org.smartdata.model.CmdletInfo; import org.smartdata.model.CmdletState; +import org.smartdata.model.action.ScheduleResult; import org.smartdata.protocol.message.ActionFinished; import org.smartdata.protocol.message.ActionStarted; import org.smartdata.protocol.message.ActionStatus; @@ -43,7 +44,7 @@ import org.smartdata.server.engine.cmdlet.CmdletDispatcher; import org.smartdata.server.engine.cmdlet.CmdletExecutorService; import org.smartdata.metastore.ActionSchedulerService; -import org.smartdata.model.action.ActionPreProcessor; +import org.smartdata.model.action.ActionScheduler; import org.smartdata.model.LaunchAction; import org.smartdata.server.engine.cmdlet.message.LaunchCmdlet; @@ -52,6 +53,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; @@ -79,13 +81,16 @@ public class CmdletManager extends AbstractService { private AtomicLong maxActionId; private AtomicLong maxCmdletId; - private Queue pendingCmdlet; + private List pendingCmdlet; + private List schedulingCmdlet; + private Queue scheduledCmdlet; + private Map idToLaunchCmdlet; private List runningCmdlets; private Map idToCmdlets; private Map idToActions; private Map fileLocks; - private ListMultimap preExecuteProcessor = ArrayListMultimap.create(); - private List preProcessServices = new ArrayList<>(); + private ListMultimap schedulers = ArrayListMultimap.create(); + private List schedulerServices = new ArrayList<>(); public CmdletManager(ServerContext context) { super(context); @@ -94,7 +99,10 @@ public CmdletManager(ServerContext context) { this.executorService = Executors.newSingleThreadScheduledExecutor(); this.dispatcher = new CmdletDispatcher(context, this); this.runningCmdlets = new ArrayList<>(); - this.pendingCmdlet = new LinkedBlockingQueue<>(); + this.pendingCmdlet = new LinkedList<>(); + this.schedulingCmdlet = new LinkedList<>(); + this.scheduledCmdlet = new LinkedBlockingQueue<>(); + this.idToLaunchCmdlet = new HashMap<>(); this.idToCmdlets = new ConcurrentHashMap<>(); this.idToActions = new ConcurrentHashMap<>(); this.fileLocks = new ConcurrentHashMap<>(); @@ -111,14 +119,14 @@ public void init() throws IOException { maxActionId = new AtomicLong(metaStore.getMaxActionId()); maxCmdletId = new AtomicLong(metaStore.getMaxCmdletId()); - preProcessServices = AbstractServiceFactory.createActionSchedulerServices( + schedulerServices = AbstractServiceFactory.createActionSchedulerServices( getContext().getConf(), getContext(), metaStore, false); - for (ActionSchedulerService s : preProcessServices) { + for (ActionSchedulerService s : schedulerServices) { s.init(); List actions = s.getSupportedActions(); for (String a : actions) { - preExecuteProcessor.put(a, s); + schedulers.put(a, s); } } } catch (Exception e) { @@ -131,15 +139,15 @@ public void init() throws IOException { public void start() throws IOException { executorService.scheduleAtFixedRate( new ScheduleTask(this.dispatcher), 1000, 1000, TimeUnit.MILLISECONDS); - for (ActionSchedulerService s : preProcessServices) { + for (ActionSchedulerService s : schedulerServices) { s.start(); } } @Override public void stop() throws IOException { - for (int i = preProcessServices.size() - 1; i >=0 ; i--) { - preProcessServices.get(i).stop(); + for (int i = schedulerServices.size() - 1; i >=0 ; i--) { + schedulerServices.get(i).stop(); } executorService.shutdown(); dispatcher.shutDownExcutorServices(); @@ -199,11 +207,14 @@ public long submitCmdlet(CmdletDescriptor cmdletDescriptor) throws IOException { } throw new IOException(e); } - pendingCmdlet.add(cmdletInfo); - idToCmdlets.put(cmdletInfo.getCid(), cmdletInfo); + for (ActionInfo actionInfo : actionInfos) { idToActions.put(actionInfo.getActionId(), actionInfo); } + idToCmdlets.put(cmdletInfo.getCid(), cmdletInfo); + synchronized (pendingCmdlet) { + pendingCmdlet.add(cmdletInfo.getCid()); + } return cmdletInfo.getCid(); } @@ -238,8 +249,103 @@ private synchronized Set lockMovefileActionFiles(List action return filesToLock.keySet(); } - public LaunchCmdlet getNextCmdletToRun() throws IOException { - CmdletInfo cmdletInfo = pendingCmdlet.poll(); + public void scheduleCmdlet() throws IOException { + int maxScheduled = 10; + + synchronized (pendingCmdlet) { + if (pendingCmdlet.size() > 0) { + schedulingCmdlet.addAll(pendingCmdlet); + pendingCmdlet.clear(); + } + } + + Iterator it = schedulingCmdlet.iterator(); + while (maxScheduled > 0 && it.hasNext()) { + long id = it.next(); + CmdletInfo cmdlet = idToCmdlets.get(id); + synchronized (cmdlet) { + switch (cmdlet.getState()) { + case DISABLED: + it.remove(); + break; + + case PENDING: + LaunchCmdlet launchCmdlet = createLaunchCmdlet(cmdlet); + ScheduleResult result = scheduleCmdletActions(cmdlet, launchCmdlet); + if (result != ScheduleResult.RETRY) { + it.remove(); + } + if (result == ScheduleResult.SUCCESS) { + idToLaunchCmdlet.put(cmdlet.getCid(), launchCmdlet); + cmdlet.setState(CmdletState.SCHEDULED); + scheduledCmdlet.add(id); + } + maxScheduled--; + break; + } + } + } + } + + private ScheduleResult scheduleCmdletActions(CmdletInfo info, LaunchCmdlet launchCmdlet) { + List actIds = info.getAids(); + int idx = 0; + int schIdx = 0; + ActionInfo actionInfo; + LaunchAction launchAction; + List actSchedulers; + ScheduleResult scheduleResult = ScheduleResult.SUCCESS; + for (idx = 0; idx < actIds.size(); idx++) { + actionInfo = idToActions.get(actIds.get(idx)); + launchAction = launchCmdlet.getLaunchActions().get(idx); + actSchedulers = schedulers.get(actionInfo.getActionName()); + if (actSchedulers == null || actSchedulers.size() == 0) { + continue; + } + + for (schIdx = 0; schIdx < actSchedulers.size(); schIdx++) { + ActionScheduler s = actSchedulers.get(schIdx); + scheduleResult = s.onSchedule(actionInfo, launchAction); + if (scheduleResult != ScheduleResult.SUCCESS) { + break; + } + } + + if (scheduleResult != ScheduleResult.SUCCESS) { + break; + } + } + + if (scheduleResult == ScheduleResult.SUCCESS) { + idx--; + schIdx--; + } + postscheduleCmdletActions(actIds, scheduleResult, idx, schIdx); + return scheduleResult; + } + + private void postscheduleCmdletActions(List actions, ScheduleResult result, + int lastAction, int lastScheduler) { + List actSchedulers; + for (int aidx = lastAction; aidx >= 0 ; aidx--) { + ActionInfo info = idToActions.get(actions.get(aidx)); + actSchedulers = schedulers.get(info.getActionName()); + if (actSchedulers == null || actSchedulers.size() == 0) { + continue; + } + if (lastScheduler < 0) { + lastScheduler = actSchedulers.size() - 1; + } + + for (int sidx = lastScheduler; sidx >= 0; sidx--) { + actSchedulers.get(sidx).postSchedule(info, result); + } + + lastScheduler = -1; + } + } + + private LaunchCmdlet createLaunchCmdlet(CmdletInfo cmdletInfo) { if (cmdletInfo == null) { return null; } @@ -251,10 +357,26 @@ public LaunchCmdlet getNextCmdletToRun() throws IOException { new LaunchAction(toLaunch.getActionId(), toLaunch.getActionName(), toLaunch.getArgs())); } } - runningCmdlets.add(cmdletInfo.getCid()); return new LaunchCmdlet(cmdletInfo.getCid(), launchActions); } + public LaunchCmdlet getNextCmdletToRun() throws IOException { + if (scheduledCmdlet.size() == 0) { + scheduleCmdlet(); + } + Long cmdletId = scheduledCmdlet.poll(); + if (cmdletId == null) { + return null; + } + CmdletInfo cmdletInfo = idToCmdlets.get(cmdletId); + if (cmdletInfo == null) { + return null; + } + LaunchCmdlet launchCmdlet = idToLaunchCmdlet.get(cmdletId); + runningCmdlets.add(cmdletInfo.getCid()); + return launchCmdlet; + } + public CmdletInfo getCmdletInfo(long cid) throws IOException { if (idToCmdlets.containsKey(cid)) { return idToCmdlets.get(cid); @@ -313,11 +435,20 @@ public void activateCmdlet(long cid) throws IOException { public void disableCmdlet(long cid) throws IOException { if (idToCmdlets.containsKey(cid)) { CmdletInfo info = idToCmdlets.get(cid); - if (pendingCmdlet.contains(info)) { - pendingCmdlet.remove(info); + synchronized (info) { info.setState(CmdletState.DISABLED); - this.cmdletFinished(cid); } + synchronized (pendingCmdlet) { + if (pendingCmdlet.contains(cid)) { + pendingCmdlet.remove(cid); + this.cmdletFinished(cid); + } + } + + if (scheduledCmdlet.contains(cid)) { + scheduledCmdlet.remove(cid); + } + // Wait status update from status reporter, so need to update to MetaStore if (runningCmdlets.contains(cid)) { dispatcher.stop(cid); @@ -345,6 +476,7 @@ private void cmdletFinished(long cmdletId) throws IOException { unLockFileIfNeeded(actionInfo); } flushActionInfos(removed); + idToLaunchCmdlet.remove(cmdletId); } private void unLockFileIfNeeded(ActionInfo actionInfo) { @@ -485,11 +617,11 @@ private void onActionStarted(ActionStarted started) { private void onActionFinished(ActionFinished finished) throws IOException, ActionException { if (idToActions.containsKey(finished.getActionId())) { ActionInfo actionInfo = idToActions.get(finished.getActionId()); + actionInfo.setProgress(1.0F); actionInfo.setFinished(true); actionInfo.setFinishTime(finished.getTimestamp()); actionInfo.setResult(finished.getResult()); actionInfo.setLog(finished.getLog()); - actionInfo.setProgress(1.0F); unLockFileIfNeeded(actionInfo); if (finished.getThrowable() != null) { actionInfo.setSuccessful(false); @@ -587,8 +719,8 @@ public void run() { public void cmdletPreExecutionProcess(LaunchCmdlet cmdlet) { for (LaunchAction action : cmdlet.getLaunchActions()) { - for (ActionPreProcessor p : preExecuteProcessor.get(action.getActionType())) { - p.beforeExecution(action); + for (ActionScheduler p : schedulers.get(action.getActionType())) { + p.onPreDispatch(action); } } } diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/CmdletDispatcher.java b/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/CmdletDispatcher.java index 1be979195ba..d24f5a421ee 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/CmdletDispatcher.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/cmdlet/CmdletDispatcher.java @@ -26,7 +26,7 @@ import java.util.ArrayList; import java.util.List; -//Todo: extract the schedule implementation +//Todo: extract the onSchedule implementation public class CmdletDispatcher { private Logger LOG = LoggerFactory.getLogger(CmdletDispatcher.class); private List executorServices; diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/MoverPreProcessService.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/MoverPreProcessService.java index 07538eccc21..bc7b64fcf7f 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/MoverPreProcessService.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/MoverPreProcessService.java @@ -30,8 +30,10 @@ import org.smartdata.hdfs.metric.fetcher.MoverProcessor; import org.smartdata.metastore.ActionSchedulerService; import org.smartdata.metastore.MetaStore; +import org.smartdata.model.ActionInfo; import org.smartdata.model.LaunchAction; import org.smartdata.model.action.FileMovePlan; +import org.smartdata.model.action.ScheduleResult; import java.io.IOException; import java.net.URI; @@ -98,14 +100,14 @@ public List getSupportedActions() { return actions; } - public void beforeExecution(LaunchAction action) { + public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) { if (!actions.contains(action.getActionType())) { - return; + return ScheduleResult.SUCCESS; } String file = action.getArgs().get(HdfsAction.FILE_PATH); if (file == null) { - return; + return ScheduleResult.FAIL; } String policy = null; @@ -126,12 +128,24 @@ public void beforeExecution(LaunchAction action) { FileMovePlan plan = processor.processNamespace(new Path(file)); plan.setNamenode(nnUri); action.getArgs().put(MoveFileAction.MOVE_PLAN, plan.toString()); + return ScheduleResult.SUCCESS; } catch (IOException e) { LOG.error("Exception while processing " + action, e); + return ScheduleResult.FAIL; } } - public void afterExecution(LaunchAction action) { + public void postSchedule(ActionInfo actionInfo, ScheduleResult result) { + } + + public void onPreDispatch(LaunchAction action) { + } + + public boolean onSubmit(ActionInfo actionInfo) { + return true; + } + + public void onActionFinished(ActionInfo actionInfo) { } diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/ActionSchedulerService.java b/smart-metastore/src/main/java/org/smartdata/metastore/ActionSchedulerService.java index 68bed5755c6..83f70f04ab6 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/ActionSchedulerService.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/ActionSchedulerService.java @@ -19,9 +19,9 @@ import org.smartdata.AbstractService; import org.smartdata.SmartContext; -import org.smartdata.model.action.ActionPreProcessor; +import org.smartdata.model.action.ActionScheduler; -public abstract class ActionSchedulerService extends AbstractService implements ActionPreProcessor { +public abstract class ActionSchedulerService extends AbstractService implements ActionScheduler { private MetaStore metaStore; public ActionSchedulerService(SmartContext context, MetaStore metaStore) {