# spring-scheduling-cluster **Repository Path**: javacodekit2010/spring-scheduling-cluster ## Basic Information - **Project Name**: spring-scheduling-cluster - **Description**: Spring 定时器Scheduled分布式/集群环境下任务调度控制插件 - **Primary Language**: Java - **License**: MPL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 9 - **Created**: 2018-05-11 - **Last Updated**: 2022-05-24 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # spring-scheduling-cluster Spring Boot 自带定时器Scheduled增加分布式/集群环境下任务调度控制插件,其原理是对任务加锁实现控制,支持能实现分布锁的中间件。 下面提供了redis缓存和mysql数据库的实现,如果使用到其它中间件暂时麻烦请自行实现,下面会讲如果实现的。 ## 环境要求和关键技术 - [JDK1.7](http://www.oracle.com/technetwork/java/javase/downloads/index.html) - [Spring Boot 1.5.7.RELEASE](https://docs.spring.io/spring-boot/docs/1.5.7.RELEASE/reference/html/) ## 添加插件到项目中并启用 ### 添加插件 1. 方式一:下载本项目源码然后加添加到项目中。 2. 方式二:下载本项目的[jar包](https://gitee.com/lnkToKing/spring-scheduling-cluster/attach_files),添加到项目的libs包库中。 3. 方式三:下载本项目的[jar包](https://gitee.com/lnkToKing/spring-scheduling-cluster/attach_files),添加到本地maven库中,然后在pom.xml文件添加引用 ``` xml pres.lnk.springframework spring-scheduling-cluster 1.0-BATE ``` ### 启用插件 在 Spring 配置中添加以下代码将插件注册成Bean(注意,原来用来启动定时器的注解@EnableScheduling还是要添加的) ``` java @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() { return new ScheduledClusterAnnotationBeanPostProcessor(); } ``` ## 实现调度器中间件对定时任务进行锁操作 ### 使用 redis 缓存做中间件 将下面类的代码添加到项目中,并注册成SpringBean限可 ``` java import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.concurrent.TimeUnit; /** * redis调度器中间件 * * @Author lnk * @Date 2018/2/28 */ @Component public class RedisSchedulerImpl extends AbstractScheduler { private static final String CACHE_PREFIX = "scheduler_"; private static final String MAX_LEVEL = "maxLevel"; @Autowired private RedisTemplate redisTemplate; @Override public boolean check(String id) { Long time = getCache(id); return time == null || currentTimeMillis() > time; } @Override public boolean lock(String id, long timeoutMillis) { String key = prefixKey(id); long nextTimeMillis = currentTimeMillis() + timeoutMillis; boolean flag = redisTemplate.opsForValue().setIfAbsent(key, nextTimeMillis); if(flag){ redisTemplate.expire(key, timeoutMillis < 0 ? 1 : timeoutMillis, TimeUnit.MILLISECONDS); } return flag; } @Override public void relock(String id, long timeoutMillis) { String key = prefixKey(id); redisTemplate.expire(key, timeoutMillis < 0 ? 1 : timeoutMillis, TimeUnit.MILLISECONDS); } @Override public long currentTimeMillis() { RedisConnection connection = redisTemplate.getConnectionFactory().getConnection(); long time = connection.time(); connection.close(); return time; } /* * ------------- 下面3个重写方法可选 ------------- * 如果启用了主从服务器则 keepAlive() 和 getMaxAliveLevel() 必须重写 */ @Override public void keepAlive() { int level = getLevel(); String key = prefixKey(MAX_LEVEL); Integer maxLevel = getCache(MAX_LEVEL); //如果maxLevel为null,就将当前level写进去 if (maxLevel == null) { //控制只能写一个,防止并发低级别把高级别覆盖了 boolean result = redisTemplate.opsForValue().setIfAbsent(key, level); if (result) { //写入成功,添加过期时间,过期时间追加5秒,避免出现误差,在下次刷新时间前就失效了,被低级别的服务器抢先执行任务 redisTemplate.expire(key, getHeartTime() + 5, TimeUnit.SECONDS); return; } else { //写入失败,则获取最高级别跟当前级别做比较,避免低级别抢先写入成功 maxLevel = getCache(key); } } //如果当前级别比缓存里级别还要高,则覆盖它 if (maxLevel > level) { redisTemplate.delete(key); keepAlive(); return; } //如果当前级别跟最高级别同级,则刷新过期时间 if (maxLevel == level) { Long expire = redisTemplate.getExpire(prefixKey(MAX_LEVEL)); //只有过期时间有还有10秒的时候才刷新时间,避免多个服务器同时刷新 if (expire < 10) { //加5秒理由同上 redisTemplate.expire(key, getHeartTime() + 5, TimeUnit.SECONDS); } } } @Override public int getMaxAliveLevel() { Integer level = getCache(MAX_LEVEL); if (level != null) { return level; } //如果没有最高级别则返回当前服务器级别 return getLevel(); } /** * 任务执行结束后调用的方法,可以写日志,不需要做后续处理可不重写 */ @Override public void executed(Method method, Object targer, long startTimeMillis, long endTimeMillis, String description) throws Exception { //可写任务执行日志 switch (getStatus()){ case AbstractScheduler.SUCCESS : // 执行成功 case AbstractScheduler.FAIL_CHECK : // 未执行任务,已有服务器执行过 case AbstractScheduler.FAIL_LOCK : // 未执行任务,获取锁失败 case AbstractScheduler.FAIL_LEVEL : // 未执行任务,级别低 case AbstractScheduler.ERROR : // 执行任务出现异常,如果不想处理可以抛回由Spring处理 throw getException(); } } public T getCache(String key) { key = prefixKey(key); return (T)redisTemplate.opsForValue().get(key); } private static String prefixKey(String key) { return CACHE_PREFIX.concat(key.replaceAll("\\W+", "_")); } } ``` ### 使用 mysql数据库 做中间件 由于每个项目使用的持久化框架不一样,下面只提供了Mybatis的实现做参考,请根据自己项目框架做改动 ``` java import com.gzkit.crm.mcs.mapper.TimedTaskMapper; import com.gzkit.crm.mcs.utils.CacheUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.springframework.beans.factory.annotation.Autowired; import pres.lnk.springframework.AbstractScheduler; import java.lang.reflect.Method; import java.util.concurrent.TimeUnit; /** * mysql调度器中间件 * 由于mysql 5.6 版本之后才支持查询毫秒,为了兼容低版本mysql,所以将毫秒转成秒再处理业务 * * @Author lnk * @Date 2018/2/28 */ @Component public class MysqlSchedulerImpl extends AbstractScheduler { private static final String MAX_LEVEL = "maxLevel"; @Autowired private TimedTaskMapper timedTaskMapper; @Override public boolean check(String id) { int count = timedTaskMapper.check(id); return count == 0; } @Override public boolean lock(String id, long timeoutMillis) { long timeout = timeoutMillis / 1000; int result = timedTaskMapper.hasId(id); if(result > 0){ //id已有锁,则尝试重新获取锁 try { result = timedTaskMapper.lock(id, timeout); } catch (Exception e){ } }else{ //id还没锁,就直接插入锁 try { result = timedTaskMapper.insert(id, timeout, null); } catch (Exception e){ } } return result == 1; } @Override public void relock(String id, long timeoutMillis) { long timeout = timeoutMillis / 1000; timedTaskMapper.update(id, timeout, null); } @Override public long currentTimeMillis() { return timedTaskMapper.time() * 1000; } /* * ------------- 下面3个重写方法可选 ------------- * 如果启用了主从服务器则 keepAlive() 和 getMaxAliveLevel() 必须重写 */ @Override public void keepAlive() { int timeout = getHeartTime() + 5; //获取当前最高level String value = timedTaskMapper.getMaxLevel(MAX_LEVEL); Integer result = null; if(NumberUtils.isDigits(value)){ result = Integer.parseInt(value); } if(result == null){ //如果数据库没有保存level,则保存当前服务器的级别到数据库 try { timedTaskMapper.insert(MAX_LEVEL, timeout, getLevel() + ""); } catch (Exception e){ timedTaskMapper.updateLevel(MAX_LEVEL, getLevel() + "", timeout); } }else{ //数据库已有level if(result == getLevel()){ //如果数据库level与当前服务器相同,则尝试刷新当前级别服务器的时间 try { timedTaskMapper.updateLevelTime(MAX_LEVEL, getLevel() + "", timeout); } catch (Exception e){ } }else if(result > getLevel()){ //如果当前服务器级别比数据库的高,则刷新数据库级别 try { timedTaskMapper.updateLevel(MAX_LEVEL, getLevel() + "", timeout); } catch (Exception e){ } } } } @Override public int getMaxAliveLevel() { String value = timedTaskMapper.getMaxLevel(MAX_LEVEL); if(NumberUtils.isDigits(value)){ return Integer.parseInt(value); } return getLevel(); } /** * 任务执行结束后调用的方法,可以写日志,不需要做后续处理可不重写 */ @Override public void executed(Method method, Object targer, long startTimeMillis, long endTimeMillis, String description) throws Exception { //可写任务执行日志 switch (getStatus()){ case AbstractScheduler.SUCCESS : // 执行成功 case AbstractScheduler.FAIL_CHECK : // 未执行任务,已有服务器执行过 case AbstractScheduler.FAIL_LOCK : // 未执行任务,获取锁失败 case AbstractScheduler.FAIL_LEVEL : // 未执行任务,级别低 case AbstractScheduler.ERROR : // 执行任务出现异常,如果不想处理可以抛回由Spring处理 throw getException(); } } } ``` ``` java import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; /** * @Author lnk * @Date 2018/3/6 */ @Mapper public interface TimedTaskMapper { /** * select count(*) from t_timed_task * where id = #{id, jdbcType=VARCHAR} and timeout >= UNIX_TIMESTAMP(now()) * * @param id * @return */ int check(String id); /** * select count(*) from t_timed_task where id = #{id, jdbcType=VARCHAR} * * @param id * @return */ int hasId(String id); /** * update t_timed_task set timeout = UNIX_TIMESTAMP(now()) + #{timeout, jdbcType=BIGINT} * where id = #{id, jdbcType=VARCHAR} and timeout < UNIX_TIMESTAMP(now()) * * @param id * @param timeout * @return */ int lock(@Param("id") String id, @Param("timeout") long timeout); /** * insert into t_timed_task values ( * #{id, jdbcType=VARCHAR}, * UNIX_TIMESTAMP(now()) + #{timeout, jdbcType=BIGINT}, * #{value, jdbcType=VARCHAR} * ) * * @param id * @param timeout * @param value * @return */ int insert(@Param("id") String id, @Param("timeout") long timeout, @Param("value") String value); /** * update t_timed_task set * timeout = UNIX_TIMESTAMP(now()) + #{timeout, jdbcType=BIGINT}, * value = #{value, jdbcType=VARCHAR} * where id = #{id, jdbcType=VARCHAR} * * @param id * @param timeout * @param value * @return */ int update(@Param("id") String id, @Param("timeout") long timeout, @Param("value") String value); /** * select UNIX_TIMESTAMP(now()) * * @return */ long time(); /** * update t_timed_task set value = #{value, jdbcType=VARCHAR}, timeout = UNIX_TIMESTAMP(now()) + #{timeout, jdbcType=BIGINT} * where id = #{id, jdbcType=VARCHAR} and (value > #{value, jdbcType=VARCHAR} or timeout < UNIX_TIMESTAMP(now())) * * @param id * @return */ int updateLevel(@Param("id") String id, @Param("value") String value, @Param("timeout") long timeout); /** * update t_timed_task set timeout = UNIX_TIMESTAMP(now()) + #{timeout, jdbcType=BIGINT} * where id = #{id, jdbcType=VARCHAR} and value = #{value, jdbcType=VARCHAR} * * @param id * @param value * @param timeout * @return */ int updateLevelTime(@Param("id") String id, @Param("value") String value, @Param("timeout") long timeout); /** * select value from t_timed_task where id = #{id, jdbcType=VARCHAR} and timeout > UNIX_TIMESTAMP(now()) * * @param id * @return */ String getMaxLevel(String id); } ``` 数据库表结构 t_timed_task 字段 | 类型 | 说明 --- | --- | --- id | varchar(255) PRIMARY NOT NULL | 任务id timeout | bigint NOT NULL | 锁的失效时间 value | varchar(255) DEFAULT NULL | 锁对应的值 ``` sql DROP TABLE IF EXISTS `t_timed_task`; CREATE TABLE `t_timed_task` ( `id` varchar(255) NOT NULL, `timeout` bigint(20) NOT NULL, `value` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; ``` ### 自定义中间件 自定义类继承抽象类`AbstractScheduler`,实现父类的方法并注册成Spring Bean。 `AbstractScheduler` 方法说明 方法 | 必须 | 说明 --- | --- | --- boolean check(String id) | 是 | 检查任务id是否没被锁,如果没被锁则表示可以执行任务,下一步就获取锁 boolean lock(String id, long timeoutMillis) | 是 | 对任务id加锁,并在下次执行任务前释放锁,返回加锁是否成功 void relock(String id, long timeoutMillis) | 是 | 修改任务id锁的释放时间 long currentTimeMillis() | 是 | 获取中间件的服务器时间,做为锁的参考时间。集群所有服务器最好做时间同步,避免cron任务出现误差 void keepAlive() | 如果启用优先级功能则必须重写 | 将服务器最高优先级别保存到中间件 int getMaxAliveLevel() | 如果启用优先级功能则必须重写 | 获取中间件保存的最高级别 void executed(Method method, Object targer, long startTimeMillis, long endTimeMillis, String description) | 否 | 定时任务执行结束的后续处理 ## 设置主从服务器 主从服务器是通过设置服务器优先级实现,实现原理是优先级高的服务器定时(心跳时间)告诉中间件我还活着(运行中), 然后优先级低的服务器则不执行任务。如果优先级高级服务器挂了,中间件不再接收到信息了,优先级低的服务器就会接替继续工作。 #### 设置方式 在spring的yml配置文件中添加下面配置 ``` yaml spring: scheduling: cluster: level: 1 #优先级别 heartTime: 60 #心跳时间(秒) ``` 配置说明: level : 优先级别,1 等级最高,数字越大等级越低。其中 0 是该服务器不执行定时任务。 -1 是不参与集群服务调度,不受优先级影响,任务每次都会执行。 heartTime : 心跳时间,服务器会以这个时间频率告诉中间件我还活着 重写中间件的getLevel()方法,可自定义level规则,例如根据ip判断level,这样不用每个服务器单独改配置文件 ## 注解@ScheduledCluster 该注解用在`@Scheduled`的方法上,有以下属性 属性 | 必填 | 说明 --- | --- | --- id | 否 | 自定义任务id description | 否 | 任务描述 ignore | 否 | 是否忽略集群控制,作用跟level=-1一样,但只针对该任务 ``` java @ScheduledCluster(id="updateData", description = "每小时更新一次数据") @Scheduled(cron = "0 0 0/1 * * ?") public void update(){ // 更新数据 } ```