XXL-job开源框架相关的源码流程解析。

发布时间 2023-06-21 10:36:01作者: 一蛋双簧(滑稽)

XXL-job框架是一个分布式的定时任务框架。他简单快捷。配置方便。而且用途广泛。所以他的源码非常值得一看。

对于我来说。其中其自写的RPC框架。以及处理发布多个定时任务的高并发处理。是我打开微服务的大门。这是一篇xxl-job源码的解析与流程分析。比较偏口语化。

在这篇随笔中,你能了解到:XXLjob调度中心的初始化过程。调度任务流程。以及触发任务和相关日志统计和失败处理的具体流程。

一.调度中心。
调度中心作为分配任务和校验任务的中心。是通过配置并创建五个线程完成初始化。整个程序逻辑都是建立在这五条线程池或线程上。所以,启动调度中心后第一件事是调用core包内的conf包内的XxlJobAdminConfig类进行初始化。创建XxlJobScheduler对象。对整个流程进行调度配置。(值得注意的是这里的Scheduler维护一个ExecutorBizClient仓库。每个ExecutorBizClient代表一个在线执行器都与调度中心相连接。
1.初始化。
调度者初始化调用了七个方法。创建了分别是1语言切换即国际化方法;2用于向触发器发送任务的线程池;3用于处理因执行器注册而访问数据库的请求;
4监控失败日志的线程;5监控执行器状态的线程;6每天统计日志情况,存放入数据库;7还有最关键的调度算法线程池。于此就完成了调度中心的大部分功能。余下的仅剩通信的原理。
关于通信有两方面来实现功能。一部分是向执行器发送数据的对象,还记得上文提到的ExecutorBizClient仓库吗。ExecutorBizClient就相当于一条链接执行器的单向道。用以专门向自己绑定的执行器发送命令。 而创建ExecutorBizClient一般则是在调用该执行器时。使用单例模式来减少连接数量。当然。在注册取消执行器后仍然存放有ExecutorBizClient。但是在选择任务触发的时候根本就触发不到该对象,因为获取任务时是先读取数据库中的配置,而数据库中在取消执行器后已经被修改过。所以还算比较安全。而ExecutorBizClient这里主要负责向执行器的netty框架发送消息,第二是从执行器中拿取数据的对象。因为调度中心实际上就是一个springboot项目。所以它可以很方便的通过controller来接收传递到的数据和请求。也是很方便的与前端连接起来。这算是初始化的一小部分原理。

1.语言切换不必多说。读取配置带入数据即可。

2.向执行器发送任务,

JobTriggerPoolHelper

,接下来我们叫这个线程为触发线程。触发线程是已经调度完成。决定向执行器集群发送任务的线程。因为存在分片。和其他策略模型。所以不太可能在调度层面就选出自己所要发送的执行器。费时且单个线程的任务太多了。而且调度线程仅负责选取将要运行的任务和传递给其他线程这个任务已经快要运行了的标记。至于其他的一概不管。触发线程维护了两个线程池用以执行任务线程。快线程和慢线程,快慢的区别在于队列长度。快池有一千个,慢池有两千个。快池与慢池的选择策略是维护一个一分钟一更新的以任务id为key的HASH表。每次有个任务执行超过半秒则在表内加一次次数。如果在这一分钟内该任务超时十次。则将其送入慢池。可以理解为这个线程内的代码仅负责选取快池还是慢池。在线程中。任务交由XxlJobTrigger进行触发。在XxlJobTrigger中。其负责所有的任务触发。分片,失效重发等等。其根据传入的任务参数。选择堵塞策略和分配执行器策略。至于轮转也就是分配执行器策略。这里不细说。大致就是困难的基本上是一个任务维护一个链表。根据这个链表选择所需要发送的触发器地址。根据上面所说的维护的ExecutorBizClient仓库选取执行器。而分片仅是将总片数和自己所属的片号发送给执行器。执行器可以通过片号自己区分并处理执行器。发送完成后。获得返回值并不是运行成功的返回值。而是调度触发成功的返回值。从其中获取触发的信息。如触发时间。触发的执行器地址,论转策略,是否超时,重传次数等。作为一次调度方法。作为一行数据将其保存回数据库。这个数据仅存放触发阶段的信息。执行阶段的数据等待后续更新。到此。发送任务的线程算是完成了

3执行器注册而访问数据库的请求

JobRegistryHelper

;虽然已经有Controller来获得注册消息了。但是在XXL中,当前在线执行器和每个执行器集群所拥有的在线执行器不是在同一张表内。这就导致了会有延迟。而这个线程所做的就是根据在线执行器表格维护执行器集群所拥有的在线执行器这一字段。在原代码中。其每隔30秒更新一次。也就导致了如果你在三十秒内多次运行任务,如果你退出一个执行器。那会导致这三十秒内所有的任务发送失败。而且无法及时使用新的执行器来执行任务。最高会有30秒的延迟。解决办法就是在下面源码中作者留了一个freshGroupRegistryInfo方法。将这个线程的所有操作复制给下面那个方法即可。对了。两张表格的好处其实是为了防止中间因为网络波动导致执行器没有接收到信号就删除旧的执行器。这样如果执行器再一次发送心跳信号就导致执行器依然存活但不存在于设备上。虽然有将注册当作心跳消息的解决方法。但是尽可能还是少出BUG。这就是秉承着分布式好死不如赖活着的思想。尽可能默认执行器是活着的。但事实上退出时是确认执行器消失的。所以此时应该对两张表进行更新。这里作者可能是考虑到修改数据库导致的并发问题。如果在意这一点的话可以将freshGroupRegistryInfo加锁。理论上可以减少冲突可能性。

4失败监控者线程

JobFailMonitorHelper

其每隔十毫秒访问一次数据库。查询失败的任务日志。查询到的如果ExecutorFailRetryCount>0那么就可以再触发一次任务。将新的触发日志,和减一的失败触发次数写入jobLog中。以作更新。别的没什么好说的

5监控执行器状态的线程。

JobCompleteHelper

其实就是负责将十分钟仍未完成的且在线执行器表未存在他的执行器的任务标为失败。而且提供了从Controller获得的返回执行结果的处理方法。大致就是将获取这个调度流程的日志对象。将执行结果的部分放入该行数据中。值得注意的是。2.2版本的日志返回值包括了一个returnT对象。任务状态码就放在这个returnT中。而2.3以后的日志取消了这一对象。直接返回一个状态码。如果使用老版本的执行器向新的调度中心发送任务。记得将rollback对象添加一个returnT对象。并将这个对象的状态码赋给原有的新的状态码

6每天统计日志情况

JobLogReportHelper

,存放入数据库。统计线程。这个线程只是单纯的为调度中心的前端服务。其线程仅负责统计前三天的任务成功情况,失败情况。总任务数。且如果存在过期的日志。就将其删掉。默认的删除30天的。

7调度算法线程池。

JobScheduleHelper

这个是整个流程的大头。可以说起始在于它。其构成为两个线程。一个是访问数据库部分。一个是时间轮用以及时触发所需任务。首先是根据数据库中的 下一次触发时间字段选取将来五秒内的触发任务。值得注意的是。如果你是使用cron表达式生成的时间。那么在创建任务时计算时间是从五秒后开始计算。也就是第一次触发任务会在五秒后的第一次符合条件的时间内执行。其目的还不明确,但我猜是为了防止线程预读出现失误导致任务没有被选取。导致任务调度失败。访问完成后获得任务进行判断。如果超时超过五秒。那按照正常超时策略进行选择重新运行还是什么都不做。如果超时未超过五秒。直接执行。如果不曾超时。那作为对象送入时间轮等待时间轮线程触发该任务。时间轮是一个用于定时调度的巧妙算法。类似于操作系统中的时间片。但是不能说是一个任务分段完成。而是说。在某一时间段内确定触发这个任务。由时间轮线程决定任务的触发。而不是由任务本身。这样节约了线程。同时在时间方面也避免了出现误差。