Skip to content

Commit

Permalink
fix #36
Browse files Browse the repository at this point in the history
  • Loading branch information
haocao committed Jan 29, 2016
1 parent 7c4555a commit 07e8021
Show file tree
Hide file tree
Showing 17 changed files with 129 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@
*
* @author zhangliang
*/
public interface ElasticJob extends Job { }
public interface ElasticJob extends Job, Stopable {
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
Expand All @@ -39,7 +38,6 @@
import com.dangdang.ddframe.job.internal.execution.ExecutionContextService;
import com.dangdang.ddframe.job.internal.execution.ExecutionService;
import com.dangdang.ddframe.job.internal.failover.FailoverService;
import com.dangdang.ddframe.job.internal.job.AbstractElasticJob;
import com.dangdang.ddframe.job.internal.listener.ListenerManager;
import com.dangdang.ddframe.job.internal.monitor.MonitorService;
import com.dangdang.ddframe.job.internal.offset.OffsetService;
Expand All @@ -56,7 +54,8 @@
/**
* 作业调度器.
*
* @author zhangliang, caohao
* @author zhangliang
* @author caohao
*/
@Slf4j
public class JobScheduler {
Expand Down Expand Up @@ -125,7 +124,7 @@ public void init() {
} catch (final SchedulerException ex) {
throw new JobException(ex);
}
JobRegistry.getInstance().addJob(jobConfiguration.getJobName(), this);
JobRegistry.getInstance().addJobScheduler(jobConfiguration.getJobName(), this);
}

private void registerElasticEnv() {
Expand Down Expand Up @@ -224,11 +223,7 @@ public Date getNextFireTime() {
*/
public void stopJob() {
try {
for (JobExecutionContext each : scheduler.getCurrentlyExecutingJobs()) {
if (each.getJobInstance() instanceof AbstractElasticJob) {
((AbstractElasticJob) each.getJobInstance()).stop();
}
}
JobRegistry.getInstance().getJobInstance(jobConfiguration.getJobName()).stop();
scheduler.pauseAll();
} catch (final SchedulerException ex) {
throw new JobException(ex);
Expand All @@ -243,8 +238,8 @@ public void resumeManualStopedJob() {
if (scheduler.isShutdown()) {
return;
}
JobRegistry.getInstance().getJobInstance(jobConfiguration.getJobName()).resume();
scheduler.resumeAll();
// TODO 恢复stoped=fasle状态
} catch (final SchedulerException ex) {
throw new JobException(ex);
}
Expand All @@ -264,6 +259,7 @@ public void resumeCrashedJob() {
if (serverService.isJobStopedManually()) {
return;
}
JobRegistry.getInstance().getJobInstance(jobConfiguration.getJobName()).resume();
try {
scheduler.resumeAll();
} catch (final SchedulerException ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.api;

/**
* 可停止的作业或目标.
*
* @author caohao
*/
public interface Stopable {

/**
* 停止运行中的作业或目标.
*/
void stop();

/**
* 恢复运行作业或目标.
*/
void resume();
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class CronSettingChangedJobListener extends AbstractJobListener {
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
if (configNode.isCronPath(path) && Type.NODE_UPDATED == event.getType()) {
String cronExpression = new String(event.getData().getData());
JobScheduler jobScheduler = JobRegistry.getInstance().getJob(jobName);
JobScheduler jobScheduler = JobRegistry.getInstance().getJobScheduler(jobName);
if (null != jobScheduler) {
jobScheduler.rescheduleJob(cronExpression);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,14 @@ public String getJobShardingStrategyClass() {
public int getMonitorPort() {
return Integer.valueOf(jobNodeStorage.getJobNodeData(ConfigurationNode.MONITOR_PORT));
}


/**
* 获取作业名称.
*
* @return 作业名称
*/
public String getJobName() {
return jobNodeStorage.getJobConfiguration().getJobName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
/**
* 执行作业的服务.
*
* @author zhangliang, caohao
* @author zhangliang
* @author caohao
*/
public class ExecutionService {

Expand Down Expand Up @@ -71,7 +72,7 @@ public void registerJobBegin(final JobExecutionMultipleShardingContext jobExecut
for (int each : jobExecutionShardingContext.getShardingItems()) {
jobNodeStorage.fillEphemeralJobNode(ExecutionNode.getRunningNode(each), "");
jobNodeStorage.replaceJobNode(ExecutionNode.getLastBeginTimeNode(each), System.currentTimeMillis());
JobScheduler jobScheduler = JobRegistry.getInstance().getJob(jobConfiguration.getJobName());
JobScheduler jobScheduler = JobRegistry.getInstance().getJobScheduler(jobConfiguration.getJobName());
if (null == jobScheduler) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void execute() {
log.debug("Elastic job: failover job begin, crashed item:{}.", crashedItem);
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), localHostService.getIp());
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
JobRegistry.getInstance().getJob(jobConfiguration.getJobName()).triggerJob();
JobRegistry.getInstance().getJobScheduler(jobConfiguration.getJobName()).triggerJob();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

package com.dangdang.ddframe.job.internal.job;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

Expand All @@ -32,20 +27,26 @@
import com.dangdang.ddframe.job.internal.execution.ExecutionService;
import com.dangdang.ddframe.job.internal.failover.FailoverService;
import com.dangdang.ddframe.job.internal.offset.OffsetService;
import com.dangdang.ddframe.job.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.internal.sharding.ShardingService;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
* 弹性化分布式作业的基类.
*
* @author zhangliang, caohao
* @author zhangliang
* @author caohao
*/
@Slf4j
public abstract class AbstractElasticJob implements ElasticJob {

@Getter(AccessLevel.PROTECTED)
private volatile boolean stoped;

@Setter
@Getter(AccessLevel.PROTECTED)
private ConfigurationService configService;

Expand Down Expand Up @@ -106,10 +107,18 @@ private void executeJobInternal(final JobExecutionMultipleShardingContext shardi

protected abstract void executeJob(final JobExecutionMultipleShardingContext shardingContext);

/**
* 停止运行中的作业.
*/
public void stop() {
@Override
public final void stop() {
stoped = true;
}

@Override
public final void resume() {
stoped = false;
}

public final void setConfigService(final ConfigurationService configService) {
this.configService = configService;
JobRegistry.getInstance().addJobInstance(configService.getJobName(), this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,25 @@

package com.dangdang.ddframe.job.internal.schedule;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.JobScheduler;

/**
* 作业注册表.
*
* @author zhangliang
* @author caohao
*/
public final class JobRegistry {

private static volatile JobRegistry instance;

private ConcurrentMap<String, JobScheduler> map = new ConcurrentHashMap<>();
private Map<String, JobScheduler> schedulerMap = new ConcurrentHashMap<>();

private ConcurrentHashMap<String, ElasticJob> instanceMap = new ConcurrentHashMap<>();

private JobRegistry() {
}
Expand All @@ -48,21 +52,42 @@ public static JobRegistry getInstance() {
}

/**
* 添加作业.
* 添加作业控制器.
*
* @param jobName 作业名称
* @param jobScheduler 作业控制器
*/
public void addJob(final String jobName, final JobScheduler jobScheduler) {
map.put(jobName, jobScheduler);
public void addJobScheduler(final String jobName, final JobScheduler jobScheduler) {
schedulerMap.put(jobName, jobScheduler);
}

/**
* 获取作业控制器.
*
* @param jobName 作业名称
* @return 作业控制器
*/
public JobScheduler getJobScheduler(final String jobName) {
return schedulerMap.get(jobName);
}

/**
* 添加作业实例.
*
* @param jobName 作业名称
* @param job 作业实例
*/
public void addJobInstance(final String jobName, final ElasticJob job) {
instanceMap.putIfAbsent(jobName, job);
}

/**
* 获取作业.
* 获取作业实例.
*
* @param jobName 作业名称
* @return 作业实例
*/
public JobScheduler getJob(final String jobName) {
return map.get(jobName);
public ElasticJob getJobInstance(final String jobName) {
return instanceMap.get(jobName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ class ConnectionLostListener implements ConnectionStateListener {
@Override
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
if (ConnectionState.LOST == newState) {
JobRegistry.getInstance().getJob(jobName).stopJob();
JobRegistry.getInstance().getJobScheduler(jobName).stopJob();
} else if (ConnectionState.RECONNECTED == newState) {
JobRegistry.getInstance().getJob(jobName).resumeCrashedJob();
JobRegistry.getInstance().getJobScheduler(jobName).resumeCrashedJob();
}
}
}
Expand All @@ -72,7 +72,7 @@ protected void dataChanged(final CuratorFramework client, final TreeCacheEvent e
if (!serverNode.isJobStopedPath(path)) {
return;
}
JobScheduler jobScheduler = JobRegistry.getInstance().getJob(jobName);
JobScheduler jobScheduler = JobRegistry.getInstance().getJobScheduler(jobName);
if (null == jobScheduler) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public interface CoordinatorRegistryCenter extends RegistryCenter {
/**
* 添加本地缓存.
*
* @param watcherPath 需加入缓存的路径
* @param cachePath 需加入缓存的路径
*/
void addCacheData(String cachePath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ public void setUp() {
@After
public void tearDown() throws SchedulerException, NoSuchFieldException {
ProcessCountStatistics.reset(jobName);
JobScheduler jobScheduler = JobRegistry.getInstance().getJob(jobName);
JobScheduler jobScheduler = JobRegistry.getInstance().getJobScheduler(jobName);
if (null != jobScheduler) {
JobRegistry.getInstance().getJob(jobName).shutdown();
JobRegistry.getInstance().getJobScheduler(jobName).shutdown();
}
ReflectionUtils.setFieldValue(JobRegistry.getInstance(), "instance", null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ configurationListenerManager.new CronSettingChangedJobListener().dataChanged(nul

@Test
public void assertCronSettingChangedJobListenerWhenIsCronPathAndUpdateAndFindJob() {
JobRegistry.getInstance().addJob("testJob", jobScheduler);
JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler);
configurationListenerManager.new CronSettingChangedJobListener().dataChanged(null, new TreeCacheEvent(
TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/testJob/config/cron", null, "*/10 * * * * *".getBytes())), "/testJob/config/cron");
verify(jobScheduler).rescheduleJob("*/10 * * * * *");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void assertRegisterJobBeginWhenNotMonitorExecution() {
public void assertRegisterJobBeginWithoutNextFireTime() {
when(configService.isMonitorExecution()).thenReturn(true);
when(jobScheduler.getNextFireTime()).thenReturn(null);
JobRegistry.getInstance().addJob("testJob", jobScheduler);
JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler);
JobExecutionMultipleShardingContext jobExecutionShardingContext = new JobExecutionMultipleShardingContext();
jobExecutionShardingContext.setShardingItems(Arrays.asList(0, 1, 2));
executionService.registerJobBegin(jobExecutionShardingContext);
Expand All @@ -122,7 +122,7 @@ public void assertRegisterJobBeginWithoutNextFireTime() {
public void assertRegisterJobBeginWithNextFireTime() {
when(configService.isMonitorExecution()).thenReturn(true);
when(jobScheduler.getNextFireTime()).thenReturn(new Date(0L));
JobRegistry.getInstance().addJob("testJob", jobScheduler);
JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler);
JobExecutionMultipleShardingContext jobExecutionShardingContext = new JobExecutionMultipleShardingContext();
jobExecutionShardingContext.setShardingItems(Arrays.asList(0, 1, 2));
executionService.registerJobBegin(jobExecutionShardingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void assertFailoverLeaderExecutionCallbackIfNecessary() {
when(jobNodeStorage.isJobNodeExisted("leader/failover/items")).thenReturn(true);
when(jobNodeStorage.getJobNodeChildrenKeys("leader/failover/items")).thenReturn(Arrays.asList("0", "1", "2"));
when(serverService.isServerReady()).thenReturn(true);
JobRegistry.getInstance().addJob("testJob", jobScheduler);
JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler);
failoverService.new FailoverLeaderExecutionCallback().execute();
verify(jobNodeStorage).isJobNodeExisted("leader/failover/items");
verify(jobNodeStorage, times(2)).getJobNodeChildrenKeys("leader/failover/items");
Expand Down
Loading

0 comments on commit 07e8021

Please sign in to comment.