Elastic Job 从单点到高可用、同城主备、同城双活
- 前言 -
- 单点部署到高可用 -

作业分片一致性,保证同一分片在分布式环境中仅一个执行实例


- 双机房高可用 -

- 优先级调度 -

保证两个机房都随时可用,也就是一个机房的服务如果全部不可用了,另外一个机房能提供对等的服务; 但一个任务可以优先指定A机房执行。
- Elastic Job 分片策略 -
public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount)
public abstract class JobShardingStrategyActiveStandbyDecorator implements JobShardingStrategy {
//内置的分配策略采用原来的默认策略:平均
private JobShardingStrategy inner = new AverageAllocationJobShardingStrategy();
/**
* 判断一个实例是否是备用的实例,在每次触发sharding方法之前会遍历所有实例调用此方法。
* 如果主备实例同时存在于列表中,那么备实例将会被剔除后才进行sharding
* @param jobInstance
* @return
*/
protected abstract boolean isStandby(JobInstance jobInstance, String jobName);
public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount) {
List<JobInstance> jobInstancesCandidates = new ArrayList<>(jobInstances);
List<JobInstance> removeInstance = new ArrayList<>();
boolean removeSelf = false;
for (JobInstance jobInstance : jobInstances) {
boolean isStandbyInstance = false;
try {
isStandbyInstance = isStandby(jobInstance, jobName);
} catch (Exception e) {
log.warn("isStandBy throws error, consider as not standby",e);
}
if (isStandbyInstance) {
if (IpUtils.getIp().equals(jobInstance.getIp())) {
removeSelf = true;
}
jobInstancesCandidates.remove(jobInstance);
removeInstance.add(jobInstance);
}
}
if (jobInstancesCandidates.isEmpty()) {//移除后发现没有实例了,就不移除了,用原来的列表(后备)的顶上
jobInstancesCandidates = jobInstances;
log.info("[{}] ATTENTION!! Only backup job instances exist, but do sharding with them anyway {}", jobName, JSON.toJSONString(jobInstancesCandidates));
}
if (!jobInstancesCandidates.equals(jobInstances)) {
log.info("[{}] remove backup before really do sharding, removeSelf :{} , remove instances: {}", jobName, removeSelf, JSON.toJSONString(removeInstance));
log.info("[{}] after remove backups :{}", jobName, JSON.toJSONString(jobInstancesCandidates));
} else {//全部都是master或者全部都是slave
log.info("[{}] job instances just remain the same {}", jobName, JSON.toJSONString(jobInstancesCandidates));
}
//保险一点,排序一下,保证每个实例拿到的列表肯定是一样的
jobInstancesCandidates.sort((o1, o2) -> o1.getJobInstanceId().compareTo(o2.getJobInstanceId()));
return inner.sharding(jobInstancesCandidates, jobName, shardingTotalCount);
}
- 调优 -
一、继承此装饰器策略,指定哪些实例是standby实例
public class ActiveStandbyESJobStrategy extends JobShardingStrategyActiveStandbyDecorator{
protected boolean isStandby(JobInstance jobInstance, String jobName) {
String activeIps = "10.10.10.1,10.10.10.2";//只有这两个ip的实例才是优先执行的,其他都是备用的
String ss[] = activeIps.split(",");
return !Arrays.asList(ss).contains(jobInstance.getIp());//不在active名单的就是后备
}
}
很简单吧!这样实现之后,就能达到以下类似的效果。

二、 在任务启动前,指定使用这个策略
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
return LiteJobConfiguration.newBuilder(simpleJobConfiguration)
.jobShardingStrategyClass("com.xxx.yyy.job.ActiveStandbyESJobStrategy")//使用主备的分配策略,分主备实例(输入你的实现类类名)
.build();
- 同城双活模式 -
public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount)
指定任务的主机房让其是B机房优先调度(例如挑选部分只读任务,占10%的任务数); 对于分片的分配,把末尾(如1/10)的分片优先分配给B机房。
public class ActiveStandbyESJobStrategy extends JobShardingStrategyActiveStandbyDecorator{
protected boolean isStandby(JobInstance jobInstance, String jobName) {
String activeIps = "10.10.10.1,10.10.10.2";//默认只有这两个ip的实例才是优先执行的,其他都是备用的
if ("TASK_B_FIRST".equals(jobName)){//选择这个任务优先调度到B机房
activeIps = "10.11.10.1,10.11.10.2";
}
String ss[] = activeIps.split(",");
return !Arrays.asList(ss).contains(jobInstance.getIp());//不在active名单的就是后备
}
}

作者:Jaskey Lam
来源:
https://jaskey.github.io/blog/2020/05/25/elastic-job-timmer-active-standby/

评论