任务调度平台在服务器集群上的分布式搭建笔记

之前只在单点服务器上部署过任务调度平台,最近因为项目结题时计算速率的指标要求过高,计划采用多台服务器并发调度+各服务器上多线程并发相结合的形式提升效率,于是进行了任务调度平台的集群部署,最后计算效率提升了100倍。这里把搭建过程和相关的源码学习过程记录下来,便于其他同类需求项目的复现。

我这里的服务器集群为10台机架式服务器,有1个主控节点,9个计算节点,10台服务器的ip配置分别为192.168.1.101-192.168.1.110,任务调度平台基础框架沿袭之前项目使用的XXL-JOB,相关学习笔记可以参考作者之前的博文:
分布式任务调度平台XXL-JOB的Oracle版本搭建与学习笔记:
https://blog.csdn.net/nannan7777/article/details/107337464?spm=1001.2014.3001.5501
分布式任务调度平台XXL-JOB源码解析笔记
https://blog.csdn.net/nannan7777/article/details/117706213?spm=1001.2014.3001.5501

1 任务调度平台服务器集群搭建过程

首先XXL-JOB官网上给出的集群部署策略有两种,调度中心集群部署,以及执行器集群部署,具体要求如下:

(1) 调度中心集群部署

调度中心支持集群部署,提升调度系统容灾和可用性。
调度中心集群部署时,几点要求和建议:

1)DB配置保持一致;
2)集群机器时钟保持一致(单机集群忽视);
3)推荐通过Nginx反向代理为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。

(2) 执行器集群部署

执行器支持集群部署,提升调度系统可用性,同时提升任务处理能力。
执行器集群部署时,几点要求和建议:

1)执行器回调地址(xxl.job.admin.addresses)需要保持一致:执行器根据该配置进行执行器自动注册等操作;
2)同一个执行器集群内AppNamexxl.job.executor.appname)需要保持一致:调度中心根据该配置动态发现不同集群的在线执行器列表。

结合我自己工程项目的需求,这里采用执行器集群部署策略,将任务调度中心(xxl-job-admin)部署在主控节点的服务器上,并在全部服务器上部署执行器(xxl-job-executor),分别配置执行器地址为10台服务器的ip,同时这10个执行器的AppName保持一致,过程如下所示:

1)创建一个执行器集群:

(如果有多种类型的计算需求,可以创建多个执行器组,将计算任务按照类别分开,我这里仅有一类计算任务,因此只创建了一个执行器集群)

在这里插入图片描述


执行器集群的属性说明如下:
AppName:是每个执行器集群的唯一标示,执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器,供任务调度时使用。
名称执行器的名称,因为AppName限制字母数字等组成,可读性不强,名称为了提高执行器的可读性。
排序:执行器的排序,系统中需要执行器的地方,如任务新增,将会按照该排序读取可用的执行器列表。
注册方式:调度中心获取执行器地址的方式:
自动注册执行器自动进行执行器注册,调度中心通过底层注册表可以动态发现执行器机器地址。
手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度中心使用。
机器地址:"注册方式"为"手动录入"时有效,支持人工维护执行器的地址信息。

在这里插入图片描述


2)为各个执行器创建对应的服务器计算节点,并在各台服务器上部署执行器jar包,将全部执行器运行起来后计算节点状态会变为在线:

在这里插入图片描述


在这里插入图片描述


3)之后我修改了执行器集群的路由策略,代码认选择的是 RANDOM 随机策略,这里为均衡利用各个服务器的资源,我选择了ROUND 轮询策略:

  • 路由策略:
    FirsT(第一个):固定选择第一个机器;
    LAST(最后一个):固定选择最后一个机器;
    ROUND(轮询):顺序选择每一个机器;
    RANDOM随机):随机选择在线的机器;
    CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
    LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
    LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
    FAIlovER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
    BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
    SHARDING_broADCAST (分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
    • 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度。
    • 调度过期策略:
      • 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
      • 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
    • 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
      单机串行(认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
      丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
      覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
    • 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
    • 失败重试次数支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试。

代码修改的部分如下所示:

private boolean saveJob(DagNode node, TaskDeFinition deFinition) {
    Job jobInfo = new Job();
    jobInfo.setId(node.getId());
    jobInfo.setJobCron("* * * * * ?");
    jobInfo.setExecutorId(deFinition.getExecutor());
    jobInfo.setExecutorHandler(deFinition.getJobHandler());
    jobInfo.setglueType(glueTypeEnum.BEAN.name());
    jobInfo.setglueRemark("NONE");
    jobInfo.setglueUpdatetime(new Date());
    jobInfo.setExecutorFailRetryCount(ConstantUtils.EXECUTOR_FAIL_RETRY_COUNT);
    jobInfo.setExecutorTimeout(ConstantUtils.EXECUTOR_TIMEOUT);
    jobInfo.setAuthor("System Scheduler");
    jobInfo.setExecutorParam(node.getCommand());
    jobInfo.setJobDesc(node.getId());
    jobInfo.setAddTime(new Date());
    jobInfo.setUpdateTime(new Date());
    jobInfo.setExecutorRouteStrategy(ExecutorRouteStrategyEnum.ROUND.name());
    jobInfo.setExecutorBlockStrategy(ExecutorBlockStrategyEnum.SERIAL_EXECUTION.name());
    int res = ServerConfig.getInstance().getJobDao().save(jobInfo);
    if (res > 0) {
        return true;
    } else {
        logger.error("Save Job:" + jobInfo.getId() + "Failed");
        return false;
    }
}

在保存计算任务时认选择ExecutorRouteStrategyEnum.ROUND.name()轮询标识的路由策略:

jobInfo.setExecutorRouteStrategy(ExecutorRouteStrategyEnum.ROUND.name());

各个路由策略的标识在枚举类型ExecutorRouteStrategyEnum类中定义:

FirsT("first"),
LAST("last"),
ROUND("round"),
RANDOM("random"),
CONSISTENT_HASH("consistenthash"),
LEAST_FREQUENTLY_USED("lfu"),
LEAST_RECENTLY_USED("lru"),
FAIlovER("failover"),
BUSYOVER("busyover"),
SHARDING_broADCAST("shard");

最后,基础环境就保障好了,接下来就可以分布式高并发的运行自己的计算模型。

例如我这里单台服务器单线程运行一个计算模型耗时10分钟,通过分布式调度部署后,实现了10台服务器并发调度,同时每台服务器的内存资源允许10个线程并发,速度因此提升了100倍

2 分布式集群调度策略源码分析

使用过程中其实没有遇到太多的问题,但仅仅会用不是我们的目的,既要知其然,也要知其所以然。于是接下来我对执行器集群并发调度的源码进行了深入学习,不禁再次感慨一句XXL-JOB的源码写的是真规范,编码过程也有很多巧思,有蛮大的工程项目借鉴意义。

(1) 远程触发执行器过程分析

首先,任务调度中心分配的计算任务会通过JobTriggerPoolHelper.trigger()函数进行触发:

private long trigger(DagNode node) {
    return JobTriggerPoolHelper.trigger(zProcessInstance.getXH(), node.getId(),
            TriggerTypeEnum.API,
            -1, null,
            node.getCommand());
}

JobTriggerPoolHelper.trigger()函数如下:

在这里插入图片描述

JobTriggerPoolHelper.trigger所需形参包括
(任务Id ,任务触发类型,失败重试次数,执行分片参数,执行参数,执行器地址)

这里的任务触发类型由TriggerTypeEnum类进行枚举:

MANUAL(I18nUtil.getString("jobconf_trigger_type_manual")),        //调度平台手动触发
CRON(I18nUtil.getString("jobconf_trigger_type_cron")),            //定时器触发
RETRY(I18nUtil.getString("jobconf_trigger_type_retry")),          //失败重试触发
PARENT(I18nUtil.getString("jobconf_trigger_type_parent")),        //作为子任务触发
API(I18nUtil.getString("jobconf_trigger_type_api"));              //API触发

添加触发器helper.addTrigger()之后,进行触发:

在这里插入图片描述


JobTrigger.trigger()执行触发的具体过程如下:

在这里插入图片描述

首先,通过ServerConfig.getInstance().getJobDao().loadById(jobId)获取任务的详细信息,之后处理分片参数与匹配执行器路由策略,按照选择的策略模式进行执行器的选择,processtrigger()的处理逻辑如下:

在这里插入图片描述


在这里插入图片描述


代码中可以看出processtrigger()的处理过程分为以下四个步骤:

  1. 保存任务执行日志jobLog
  2. 初始化触发器参数trigger-param,取出执行器任务handler名称
  3. 初始化执行器地址信息address,根据不同的执行器策略使用不同的实现类,从而选举出本次使用执行器集群地址列表中不同的地址信息;
  4. 触发远程执行器执行任务runExecutor(),向执行器发送指令。

接下来具体看一下runExecutor()中实际任务执行的处理逻辑:

在这里插入图片描述


首先获取ExecutorBiz代理对象,之后调用executorBiz.run(),但这个run方法不会最终执行,仅仅只是为了触发代理对象的invoke方法,同时将目标的类型传送给服务端,因为在代理对象的invoke的方法里面没有执行目标对象的方法

实际的执行过程在ExecutorBizImpl.run()代码如下所示:

在这里插入图片描述

在这里插入图片描述


ExecutorBizImpl.run()方法中首先获取了任务的执行线程,加载jobHandler,最后进行执行器注册,并将执行线程jobThread放入回调队列TriggerQueueregistJobThread()方法中的处理如下,启动了JobThread.run()方法

在这里插入图片描述


最后在JobThread.run()方法调用handler.execute()进行任务的远程执行:

在这里插入图片描述


在执行器(xxl-job-executor)工程中添加@JobHandler注解进行监听,从而进行执行器响应后的逻辑运算,比如我的计算模型调用就放在execute()方法中。

在这里插入图片描述


到这里,远程触发执行器的完整流程就结束了。

(2) 执行器集群的路由策略

这里把执行器的路由策略单独拎出来整理一下:

在进行执行器地址选择时,根据不同的执行器策略使用不同的实现类,从而选举出本次使用执行器集群地址列表中不同的地址信息,各个实现类如下所示:

ExecutorRouteBusyover:忙碌转移路由,从执行器地址列表查找心跳正常的执行器地址;
ExecutorRouteFailover:故障转移路由,查找心跳正常的执行器地址;
ExecutorRouteLast:执行器地址列表的最后一个地址;
ExecutorRouteFirst:执行器地址列表的第一个地址;
ExecutorRouteConsistentHash:哈希一致性路由,通过哈希一致性算法选择执行器地址;
ExecutorRouteLFU:最不经常使用路由,使用频率最低的执行器地址;
ExecutorRouteLRU:最近最久未使用路由,选择最近最久未被使用的执行器地址;
ExecutorRouterandom随机路由,随机选择一个执行器地址;
ExecutorRouteRound:轮询路由,轮询选择一个执行器地址。

1) ExecutorRouteBusyover是忙碌转移路由器,route()方法首先遍历执行器地址列表,然后对执行器地址进行空闲检测,当任务线程没有在执行定时任务时,将返回空闲检测成功,将该执行器地址返回,具体代码如下:

//代码位置:com.xxl.job.admin.core.route.strategy.ExecutorRouteBusyover#route
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        StringBuffer idleBeatResultSB = new StringBuffer();
        //遍历执行器地址
        for (String address : addressList) {
            // beat
            ReturnT<String> idleBeatResult = null;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                //空闲检测,当任务线程没有在执行,返回true
                idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
            }
            idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
                    .append(I18nUtil.getString("jobconf_idleBeat") + ":")
                    .append("<br>address:").append(address)
                    .append("<br>code:").append(idleBeatResult.getCode())
                    .append("<br>msg:").append(idleBeatResult.getMsg());

            // beat success
            //如果空闲检测成功,则返回执行器地址
            if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
                idleBeatResult.setMsg(idleBeatResultSB.toString());
                idleBeatResult.setContent(address);
                return idleBeatResult;
            }
        }

        return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
}

2) ExecutorRouteFailover是失败转移路由,route()方法首先遍历执行器地址,然后发送心跳给执行器服务,如果心跳正常,则成功返回该执行器地址,否则返回失败码。

//代码位置:com.xxl.job.admin.core.route.strategy.ExecutorRouteFailover#route
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {

        StringBuffer beatResultSB = new StringBuffer();
        for (String address : addressList) {
            // beat
            ReturnT<String> beatResult = null;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                //心跳检测
                beatResult = executorBiz.beat();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
            }
            beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"")
                    .append(I18nUtil.getString("jobconf_beat") + ":")
                    .append("<br>address:").append(address)
                    .append("<br>code:").append(beatResult.getCode())
                    .append("<br>msg:").append(beatResult.getMsg());

            // beat success
            //心跳正常,返回执行器地址
            if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {

                beatResult.setMsg(beatResultSB.toString());
                beatResult.setContent(address);
                return beatResult;
            }
        }
        return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());
}

3)ExecutorRouteConsistentHash是哈希一致性路由方法,首先遍历执行器地址列表,对每一个执行器地址生成100个虚拟节点与执行器地址对应。然后对任务id进行md5 hash,根据hash值从虚拟节点与执行器地址对应关系获取对应的执行器地址返回。

public String hashJob(int jobId, List<String> addressList) {

        // ------A1------A2-------A3------
        // -----------J1------------------
        TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
        for (String address: addressList) {
            //将虚拟节点与执行器地址关联起来
            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                long addressHash = hash("SHARD-" + address + "-NODE-" + i);
                addressRing.put(addressHash, address);
            }
        }

        //将jobid进行md5 hash
        long jobHash = hash(String.valueOf(jobId));
        //获取大于等于 jobHash的虚拟节点与执行器地址对应关系
        SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
        //如果不为空,返回第一个key对应的执行器地址
        if (!lastRing.isEmpty()) {
            return lastRing.get(lastRing.firstKey());
        }
        //第一个执行器地址
        return addressRing.firstEntry().getValue();
}

    @Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = hashJob(triggerParam.getJobId(), addressList);
        return new ReturnT<String>(address);
}

4) ExecutorRouterandom随机路由,从执行器地址列表随机返回一个执行器地址,具体代码如下:

//代码位置:com.xxl.job.admin.core.route.strategy.ExecutorRouterandom#route
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = addressList.get(localRandom.nextInt(addressList.size()));
        return new ReturnT<String>(address);
 }

5) ExecutorRouteRound是轮询路由,从执行器地址列表中轮询获取一个执行器地址返回。count()方法就是轮询的次数,将轮询的次数对执行器地址列表取余得到执行器地址在执行器地址列表中索引下标。

//代码位置:com.xxl.job.admin.core.route.strategy.ExecutorRouteRound#route
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = addressList.get(count(triggerParam.getJobId())%addressList.size());
        return new ReturnT<String>(address);
}

private static int count(int jobId) {
        // cache clear
        //每一天都清理一次缓存
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            routeCountEachJob.clear();
            //当前时间加上一天时间(1000*60*60*24)
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
        }

        //从缓存中获取
        AtomicInteger count = routeCountEachJob.get(jobId);
        //不存在或者大于100百万次数
        if (count == null || count.get() > 1000000) {
            // 初始化时主动Random一次,缓解首次压力
            count = new AtomicInteger(new Random().nextInt(100));
        } else {
            // count++
            count.addAndGet(1);
        }
        routeCountEachJob.put(jobId, count);
        return count.get();
}

注:在我实际测试的时候发现轮询策略并没有按顺序进行各个执行器的选择,分析发现triggerParam.getJobId()是任务的主键id,并非执行器集群中各个执行器的序号主键,应该在初始化触发器参数时获取执行器主键Job.executorId修改代码后轮询策略可以正常运行,修改如下:

// 2、init trigger-param
        TriggerParam triggerParam = new TriggerParam();
        triggerParam.setGroupId(groupId);
        triggerParam.setJobId(jobInfo.getExecutorId());

月底终于要验收了江湖再见吧但莫名还有点成就感的坚强打工人乔木小姐
2022.08.22

相关文章

显卡天梯图2024最新版,显卡是电脑进行图形处理的重要设备,...
初始化电脑时出现问题怎么办,可以使用win系统的安装介质,连...
todesk远程开机怎么设置,两台电脑要在同一局域网内,然后需...
油猴谷歌插件怎么安装,可以通过谷歌应用商店进行安装,需要...
虚拟内存这个名词想必很多人都听说过,我们在使用电脑的时候...