网易乐得技术团队

elastic job学习


一、作业系统

作业的必要性

  1. 时间驱动 OR 事件驱动
  2. 批量处理 OR 逐条处理
  3. 非实时性 OR 实时性
  4. 系统内部 OR 系统解耦

常见的作业系统

  1. Quartz
  2. TBSchedule
  3. Crontab

二、elastic-job介绍

elastic-job主要的设计理念是无中心化的分布式定时调度框架,思路来源于Quartz的基于数据库的高可用方案。但数据库没有分布式协调功能,所以在高可用方案的基础上增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。

1

1.主要功能

  • 分布式
    • 重写Quartz基于数据库的分布式功能,改用Zookeeper实现注册中心。
  • 并行调度
    • 采用任务分片方式实现。将一个任务拆分为n个独立的任务项,由分布式的服务器并行执行各自分配到的分片项。
  • 弹性扩容缩容
    • 将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。
  • 集中管理
    • 采用基于Zookeeper的注册中心,集中管理和协调分布式作业的状态,分配和监听。外部系统可直接根据Zookeeper的数据管理和- 监控elastic-job。
  • 定制化流程型任务
    • 作业可分为简单和数据流处理两种模式,数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立线程,用于保证同一分片的顺序性。
  • 失效转移
    • 弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部分性能。
  • 运行时状态收集
    • 监控作业运行时状态,统计最近一段时间处理的数据成功和失败数量,记录作业上次运行开始时间,结束时间和下次运行时间。
  • 作业停止,恢复和禁用
    • 用于操作作业启停,并可以禁止某作业运行(上线时常用)。
  • Spring命名空间支持
    • elastic-job可以不依赖于spring直接运行,但是也提供了自定义的命名空间方便与spring集成。
  • 运维平台
    • 提供web控制台用于管理作业和注册中心。
  • 稳定性
    • 在服务器无波动的情况下,并不会重新分片;即使服务器有波动,下次分片的结果也会根据服务器IP和作业名称哈希值算出稳定的分片顺序,尽量不做大的变动。
  • 高性能
    • 同一服务器的批量数据处理采用自动切割并多线程并行处理。
  • 灵活性
    • 所有在功能和性能之间的权衡,都可通过配置开启/关闭。如:elastic-job会将作业运行状态的必要信息更新到注册中心。如果作业执行频度很高,会造成大量Zookeeper写操作,而分布式Zookeeper同步数据可能引起网络风暴。因此为了考虑性能问题,可以牺牲一些功能,而换取性能的提升。
  • 幂等性
    • elastic-job可牺牲部分性能用以保证同一分片项不会同时在两个服务器上运行。
  • 容错性
    • 作业服务器与Zookeeper服务器通信失败则立即停止作业运行,防止作业注册中心将失效的分片分项配给其他作业服务器,而当前作业服务器仍在执行任务,导致重复执行。

2.elastic-job的具体模块

  • 去中心化
    • 去中心化指elastic-job并无调度中心这一概念,每个运行在集群中的作业服务器都是对等的,节点之间通过注册中心进行分布式协调。但elastic-job有主节点的概念,主节点用于处理一些集中式任务,如分片,清理运行时信息等,并无调度功能,定时调度都是由作业服务器自行触发。
  • 注册中心
    • 注册中心模块目前直接使用zookeeper,用于记录作业的配置,服务器信息以及作业运行状态。Zookeeper虽然很成熟,但原理复杂,使用较难,在海量数据支持的情况下也会有性能和网络问题。目前elastic-job已经抽象出注册中心的接口,下一步将会考虑支持多注册中心,如etcd,或由用户自行实现注册中心。无临时节点和监听机制的注册中心需要自行实现定时心跳监测等功能。
  • 数据分片
    • 数据分片是elastic-job中实现分布式的重要概念,将真实数据和逻辑分片对应,用于解耦作业框架和数据的关系。作业框架只负责将分片合理的分配给相关的作业服务器,而作业服务器需要根据所分配的分片匹配数据进行处理。服务器分片目前都存储在注册中心中,各个服务器根据自己的IP地址拉取分片。
  • 分布式协调
    • 分布式协调模块用于处理作业服务器的动态扩容缩容。一旦集群中有服务器发生变化,分布式协调将自动监测并将变化结果通知仍存活的作业服务器。协调时将会涉及主节点选举,重分片等操作。目前使用的Zookeeper的临时节点和监听器实现主动检查和通知功能。
  • 定时任务处理
    • 定时任务处理根据cron表达式定时触发任务,目前有防止任务同时触发,错过任务重出发等功能。主要还是使用Quartz本身的定时调度功能,为了便于控制,每个任务都使用独立的线程池。
  • 定制化流程型任务
    • 定制化流程型任务将定时任务分为多种流程,有不经任何修饰的简单任务;有用于处理数据的fetchData/processData的数据流任务;以后还将增加消息流任务,文件任务,工作流任务等。用户能以插件的形式扩展并贡献代码。

3.流程图

下面是作业启动的流程图:

2

  • (1)第一台服务器上线触发主服务器选举。主服务器一旦下线,则重新触发选举,选举过程中阻塞,只有主服务器选举完成,才会执行其他任务。
  • (2)某作业服务器上线时会自动将服务器信息注册到注册中心,下线时会自动更新服务器状态。
  • (3)主节点选举,服务器上下线,分片总数变更均更新重新分片标记。
  • (4)定时任务触发时,如需重新分片,则通过主服务器分片,分片过程中阻塞,分片结束后才可执行任务。如分片过程中主服务器下线,则先选举主服务器,再分片。
  • (5)通过(4)可知,为了维持作业运行时的稳定性,运行过程中只会标记分片状态,不会重新分片。分片仅可能发生在下次任务触发前。
  • (6)每次分片都会按服务器IP排序,保证分片结果不会产生较大波动。
  • (7)实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。

下面是作业执行的流程图:

3

三、elastic-job使用

1.目录结构说明

  • elastic-job-core
    • elastic-job核心模块,只通过Quartz和Curator就可执行分布式作业。
  • elastic-job-spring
    • elastic-job对spring支持的模块,包括命名空间,依赖注入,占位符等。
  • elastic-job-console
    • elastic-job web控制台,可将编译之后的war放入tomcat等servlet容器中使用。
  • elastic-job-example
    • 使用例子。
  • elastic-job-test
    • 测试elastic-job使用的公用类,使用方无需关注。

2.quick start

2.1 引入maven依赖

<dependency>
    <artifactId>elastic-job-lite-core</artifactId>
    <groupId>com.dangdang</groupId>
    <version>2.1.2</version>
</dependency>
<dependency>
    <artifactId>elastic-job-common-core</artifactId>
    <groupId>com.dangdang</groupId>
    <version>2.1.2</version>
    <exclusions>
        <exclusion>
            <artifactId>curator-framework</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-recipes</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <artifactId>curator-framework</artifactId>
    <groupId>org.apache.curator</groupId>         
    <version>2.10.0</version>
</dependency>
<dependency>
    <artifactId>curator-recipes</artifactId>
    <groupId>org.apache.curator</groupId>
    <version>2.10.0</version>
    <exclusions>
        <exclusion>
            <artifactId>curator-framework</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
    </exclusions>
</dependency>

注意

  • 引入elastic-job-common-core的时候,必须去掉curator-framework和curator-recipes这两个依赖,然后单独引入2.10.0版本的依赖。因为在elastic-job-common-core中,只有2.10.0版本的curator才会创建zk节点,通过elastic-job-common-core引入的curator是2.11.1版本。

2.2 作业开发

elastic-job提供了三种类型的job:SimpleJob,DataflowJob,ScriptJob

  • SimpleJob类型作业

意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

/**
 * 执行作业.
 *
 * @param shardingContext 分片上下文
 */
void execute(ShardingContext shardingContext);
  • DataflowJob类型作业

Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

fetchData方法的返回值只有为null或长度为空时,作业会停止执行,不会执行processData。

/**
 * 获取待处理数据.
 *
 * @param shardingContext 分片上下文
 * @return 待处理的数据集合
 */
List<T> fetchData(ShardingContext shardingContext);

/**
 * 处理数据.
 *
 * @param shardingContext 分片上下文
 * @param data 待处理数据集合
 */
void processData(ShardingContext shardingContext, List<T> data);

流式处理

可通过DataflowJobConfiguration配置是否流式处理。

流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去;
非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。

如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。
流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。

  • ScriptJob类型作业

Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。

#!/bin/bash
echo sharding execution context is $*

作业运行时输出

sharding execution context is {"jobName":"scriptElasticDemoJob","shardingTotalCount":10,"jobParameter":"","shardingItem":0,"shardingParameter":"A"}

其中,参数shardingContext包含的信息有:

/**
 * 作业名称.
 */
private final String jobName;

/**
 * 作业任务ID.
 */
private final String taskId;

/**
 * 分片总数.
 */
private final int shardingTotalCount;

/**
 * 作业自定义参数.
 * 可以配置多个相同的作业, 但是用不同的参数作为不同的调度实例.
 */
private final String jobParameter;

/**
 * 分配于本作业实例的分片项.
 */
private final int shardingItem;

/**
 * 分配于本作业实例的分片参数.
 */
private final String shardingParameter;

2.3 作业启动配置

作业启动配置有三种方式:

  • java方式配置启动作业
  • 使用Spring但不使用命名空间配置启动
  • 基于Spring命名空间配置启动
2.3.1 java方式配置启动作业

创建作业配置

// 定义作业核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
// 定义simple类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,MyElasticSimpleJob.class.getCanonicalName());
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();

启动作业

 public static void main(String[] args) {
    new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}
//注册中心
private static CoordinatorRegistryCenter createRegistryCenter() {
    CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
    regCenter.init();
    return regCenter;
}

a. ZookeeperConfiguration

private final String serverLists;
private final String namespace;
private int baseSleepTimeMilliseconds = 1000;
private int maxSleepTimeMilliseconds = 3000;
private int maxRetries = 3;
private int sessionTimeoutMilliseconds;
private int connectionTimeoutMilliseconds;
private String digest;

b. 作业配置

作业配置分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration。

LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,层层嵌套。

JobTypeConfiguration根据不同实现类型分为SimpleJobConfiguration,DataflowJobConfiguration和ScriptJobConfiguration。

  • JobCoreConfiguration属性

    private final String jobName;
    private final String cron;
    private final int shardingTotalCount;
    private final String shardingItemParameters;
    private final String jobParameter;
    private final boolean failover;
    private final boolean misfire;
    private final String description;
    /**
    * 配置jobProperties定义的枚举控制Elastic-Job的实现细节
    * JOB_EXCEPTION_HANDLER用于扩展异常处理类
    * EXECUTOR_SERVICE_HANDLER用于扩展作业处理线程池类
    */
    private final JobProperties jobProperties;
    
  • SimpleJobConfiguration属性

    private final JobCoreConfiguration coreConfig;
    private final JobType jobType = JobType.SIMPLE;
    private final String jobClass;
    
  • DataflowJobConfiguration属性

    private final JobCoreConfiguration coreConfig;
    private final JobType jobType = JobType.DATAFLOW;
    private final String jobClass;
    //是否流式处理数据
    private final boolean streamingProcess;
    
  • ScriptJobConfiguration属性

    private final JobCoreConfiguration coreConfig;
    private final JobType jobType = JobType.SCRIPT;
    private final String jobClass = ScriptJob.class.getCanonicalName();
    //脚本型作业执行命令行
    private final String scriptCommandLine;
    
  • LiteJobConfiguration属性

    private final JobTypeConfiguration typeConfig;
    private final boolean monitorExecution;
    private final int maxTimeDiffSeconds;    
    private final int monitorPort;
    private final String jobShardingStrategyClass;
    private final int reconcileIntervalMinutes;
    private final boolean disabled;
    private final boolean overwrite;
    
2.3.2 使用Spring但不使用命名空间配置启动
 <bean id="job1" class="com.dangdang.ddframe.job.lite.api.JobScheduler" init-method="init">
    <constructor-arg name="regCenter" ref="regCenter2"/>
    <constructor-arg name="liteJobConfig" ref="liteJobConfig"/>
    <constructor-arg name="elasticJobListeners">
        <bean class="com.wf.live.elasticJobTest.MyElasticJobListener"/>
    </constructor-arg>
</bean>

<bean id="regCenter2" class="com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter" init-method="init">
    <constructor-arg>
        <bean class="com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration">
            <constructor-arg name="serverLists" value="${elasticJob.serverLists}"/>
            <constructor-arg name="namespace" value="${elasticJob.namespace}"/>
        </bean>
    </constructor-arg>
</bean>

<bean id="liteJobConfig" class="com.dangdang.ddframe.job.lite.config.LiteJobConfiguration">
    <constructor-arg name="typeConfig">
        <bean class="com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration">
            <constructor-arg name="coreConfig">
                <bean class="com.dangdang.ddframe.job.config.JobCoreConfiguration">
                    <constructor-arg name="jobName" value="myElasticSimpleJob5"/>
                    <constructor-arg name="cron" value="0/5 * * * * ?"/>
                    <constructor-arg name="shardingTotalCount" value="3"/>
                    <constructor-arg name="shardingItemParameters" value="0=A,1=B,2=C"/>
                    <constructor-arg name="jobProperties">
                        <bean class="com.dangdang.ddframe.job.executor.handler.JobProperties"/>
                    </constructor-arg>
                    <constructor-arg name="misfire" value="true"/>
                    <constructor-arg name="failover" value="false"/>
                    <constructor-arg name="description" value=""/>
                    <constructor-arg name="jobParameter" value=""/>
                </bean>
            </constructor-arg>
            <constructor-arg name="jobClass" value="com.wf.live.elasticJobTest.MyElasticSimpleJob"/>
        </bean>
    </constructor-arg>
    <constructor-arg name="jobShardingStrategyClass"
                     value="com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy"/>
    <constructor-arg name="disabled" value="false"/>
    <constructor-arg name="maxTimeDiffSeconds" value="-1"/>
    <constructor-arg name="monitorExecution" value="true"/>
    <constructor-arg name="monitorPort" value="-1"/>
    <constructor-arg name="overwrite" value="false"/>
    <constructor-arg name="reconcileIntervalMinutes" value="10"/>
</bean>
2.3.3 基于Spring命名空间配置启动

配置作业

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
   xmlns:job="http://www.dangdang.com/schema/ddframe/job"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans.xsd
                    http://www.dangdang.com/schema/ddframe/reg
                    http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                    http://www.dangdang.com/schema/ddframe/job
                    http://www.dangdang.com/schema/ddframe/job/job.xsd
                    ">
    <!--Zookeeper注册中心 -->
    <reg:zookeeper id="regCenter" server-lists="${elasticJob.serverLists}" namespace="${elasticJob.namespace}"
               base-sleep-time-milliseconds="${elasticJob.baseSleepTimeMilliseconds}"
               max-sleep-time-milliseconds="${elasticJob.maxSleepTimeMilliseconds}"
               max-retries="${elasticJob.maxRetries}"/>

    <!-- 测试配置作业-->
    <job:simple id="myElasticSimpleJob" class="com.wf.live.elasticJobTest.MyElasticSimpleJob"
    registry-center-ref="regCenter" cron="0 0/1 * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C"/>
    <job:dataflow id="myElasticDataFlowJob" class="com.wf.live.elasticJobTest.MyElasticDataFlowJob"
    registry-center-ref="regCenter" cron="0/20 * * * * ?" sharding-total-count="3"/>
</beans>

启动作业

  • 将配置Spring命名空间的xml通过Spring启动,作业将自动加载。

对于这种配置方式,需要加入maven依赖:

<dependency>
    <artifactId>elastic-job-lite-spring</artifactId>
    <groupId>com.dangdang</groupId>
    <version>2.1.2</version>
</dependency>

a. 注册中心配置

  • reg:zookeeper命名空间属性详细说明:
    12

b. 作业配置

  • job:simple命名空间属性详细说明
    13

  • job:dataflow命名空间属性详细说明

job:dataflow命名空间拥有job:simple命名空间的全部属性,以下仅列出特有属性:
14

如果流式处理数据, 则fetchData不返回空结果将持续执行作业

如果非流式处理数据, 则处理数据完成后作业结束

  • job:script命名空间属性详细说明

job:script命名空间拥有job:simple命名空间的全部属性,以下仅列出特有属性

15

  • job:listener命名空间属性详细说明

job:listener必须配置为job:bean的子元素,并且在子元素中只允许出现一次

16

c. 作业监听配置

  • job:distributed-listener命名空间属性详细说明

job:distributed-listener必须配置为job:bean的子元素,并且在子元素中只允许出现一次

17

3.分片策略

3.1 框架提供的分片策略

  • AverageAllocationJobShardingStrategy

基于平均分配算法的分片策略,也是默认的分片策略。

如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器。如:

如果有3台服务器,分成9片,则每台服务器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]

如果有3台服务器,分成8片,则每台服务器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]

如果有3台服务器,分成10片,则每台服务器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]

  • OdevitySortByNameJobShardingStrategy

根据作业名的哈希值奇偶数决定IP升降序算法的分片策略。

作业名的哈希值为奇数则IP升序,作业名的哈希值为偶数则IP降序。

用于不同的作业平均分配负载至不同的服务器。

AverageAllocationJobShardingStrategy的缺点是,一旦分片数小于作业服务器数,作业将永远分配至IP地址靠前的服务器,导致IP地址靠后的服务器空闲。而OdevitySortByNameJobShardingStrategy则可以根据作业名称重新分配服务器负载。如:

如果有3台服务器,分成2片,作业名称的哈希值为奇数,则每台服务器分到的分片是:1=[0], 2=[1], 3=[]

如果有3台服务器,分成2片,作业名称的哈希值为偶数,则每台服务器分到的分片是:3=[0], 2=[1], 1=[]

  • RotateServerByNameJobShardingStrategy

根据作业名的哈希值对服务器列表进行轮转的分片策略。

3.2 自定义分片策略

实现JobShardingStrategy接口并实现sharding方法,接口方法参数为作业服务器IP列表和分片策略选项,分片策略选项包括作业名称,分片总数以及分片序列号和个性化参数对照表,可以根据需求定制化自己的分片策略。

public interface JobShardingStrategy {

    /**
     * 作业分片.
     * 
     * @param jobInstances 所有参与分片的单元列表
     * @param jobName 作业名称
     * @param shardingTotalCount 分片总数
     * @return 分片结果
     */
    Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}

3.3 配置分片策略

与配置通常的作业属性相同,在spring命名空间或者JobConfiguration中配置jobShardingStrategyClass属性,属性值是作业分片策略类的全路径。

4.操作API

4.1 配置类API

  • 作业配置的API:JobSettingsAPI

     /**
     * 获取作业设置.
     *
     * @param jobName 作业名称
     * @return 作业设置对象
     */
    JobSettings getJobSettings(String jobName);
    
    /**
     * 更新作业设置.
     *
     * @param jobSettings 作业设置对象
     */
    void updateJobSettings(JobSettings jobSettings);
    
    /**
     * 删除作业设置.
     *
     * @param jobName 作业名称
     */
    void removeJobSettings(final String jobName);
    

4.2 操作类API

  • 操作作业的API:JobOperateAPI

     /**
     * 作业立刻执行.
     *
     * <p>作业在不与上次运行中作业冲突的情况下才会启动, 并在启动后自动清理此标记.</p>
     *
     * @param jobName 作业名称
     * @param serverIp 作业服务器IP地址
     */
    void trigger(Optional<String> jobName, Optional<String> serverIp);
    
    /**
     * 作业禁用.
     * 
     * <p>会重新分片.</p>
     *
     * @param jobName 作业名称
     * @param serverIp 作业服务器IP地址
     */
    void disable(Optional<String> jobName, Optional<String> serverIp);
    
    /**
     * 作业启用.
     *
     * @param jobName 作业名称
     * @param serverIp 作业服务器IP地址
     */
    void enable(Optional<String> jobName, Optional<String> serverIp);
    
    /**
     * 作业关闭.
     *
     * @param jobName 作业名称
     * @param serverIp 作业服务器IP地址
     */
    void shutdown(Optional<String> jobName, Optional<String> serverIp);
    
    /**
     * 作业删除.
     * 
     * @param jobName 作业名称
     * @param serverIp 作业服务器IP地址
     */
    void remove(Optional<String> jobName, Optional<String> serverIp);
    
  • 操作分片的API:ShardingOperateAPI

     /**
     * 禁用作业分片.
     * 
     * @param jobName 作业名称
     * @param item 分片项
     */
    void disable(String jobName, String item);
    
    /**
     * 启用作业分片.
     *
     * @param jobName 作业名称
     * @param item 分片项
     */
    void enable(String jobName, String item);
    

4.3 统计类API

  • 作业状态展示的API:JobStatisticsAPI

     /**
     * 获取作业总数.
     *
     * @return 作业总数.
     */
    int getJobsTotalCount();
    
    /**
     * 获取所有作业简明信息.
     *
     * @return 作业简明信息集合.
     */
    Collection<JobBriefInfo> getAllJobsBriefInfo();
    
    /**
     * 获取作业简明信息.
     *
     * @param jobName 作业名称
     * @return 作业简明信息.
     */
    JobBriefInfo getJobBriefInfo(String jobName);
    
    /**
     * 获取该IP下所有作业简明信息.
     *
     * @param ip 服务器IP
     * @return 作业简明信息集合.
     */
    Collection<JobBriefInfo> getJobsBriefInfo(String ip);
    
  • 作业服务器状态展示的API:ServerStatisticsAPI

      /**
     * 获取作业服务器总数.
     *
     * @return 作业服务器总数
     */
    int getServersTotalCount();
    
    /**
     * 获取所有作业服务器简明信息.
     *
     * @return 作业服务器简明信息集合
     */
    Collection<ServerBriefInfo> getAllServersBriefInfo();
    
  • 作业分片状态展示的API:ShardingStatisticsAPI

     /**
     * 获取作业分片信息集合.
     *
     * @param jobName 作业名称
     * @return 作业分片信息集合
     */
    Collection<ShardingInfo> getShardingInfo(String jobName);
    

5.作业状态监控

通过监听Elastic-Job-Lite的zookeeper注册中心的几个关键节点即可完成作业运行状态监控功能

5.1 监听作业服务器存活

监听job_name/instances/job_instance_id节点是否存在。该节点为临时节点,如果作业服务器下线,该节点将删除。

如果job_name/instances下没有任何节点,则判断这个job已经下线。

5.2 查看job配置

job_name/config节点存放的是job的配置

9

通过elastic-job运维平台修改job配置之后,会相应的修改zk这个节点的值,后台job会重新按照新配之执行。该节点是持久化的,所以即使任务重启之后,还是会按照修改后的配置执行,不会再去读代码后台的配置。

Elastic-Job-Lite提出了overwrite概念,可通过JobConfiguration或Spring命名空间配置。overwrite=true即允许客户端配置覆盖注册中心,反之则不允许。如果注册中心无相关作业的配置,则无论overwrite是否配置,客户端配置都将写入注册中心。

5.3 查看job分片信息

/job_name/sharding下的子节点就是所有的分片

/job_name/sharding/sharding_num/instance节点错放的是执行该分片的机器

10

6.事件追踪

Elastic-Job提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。Elastic-Job目前提供了基于关系型数据库两种事件订阅方式记录事件。

Elastic-Job-Lite在配置中提供了JobEventConfiguration,目前支持数据库方式配置。

public static void main(String[] args)
{
    new JobScheduler(createRegistryCenter(), createJobConfiguration(), createJobEventConfiguration()).init();
}

private static JobEventConfiguration createJobEventConfiguration()
{
    MysqlDataSource dataSource = new MysqlDataSource();
    dataSource.setURL("jdbc:mysql://localhost:3306/user_info_db?serverTimezone=UTC");
    dataSource.setUser("root");
    dataSource.setPassword("123456");
    // 定义日志数据库事件溯源配置
    JobEventConfiguration jobEventRdbConfig = new JobEventRdbConfiguration(dataSource);
    return jobEventRdbConfig;
}

运行的时候,会往JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表中插入记录,若表不存在则自动创建。

JOB_EXECUTION_LOG字段含义

18

JOB_EXECUTION_LOG记录每次作业的执行历史。分为两个步骤:

1.作业开始执行时向数据库插入数据,除failure_cause和complete_time外的其他字段均不为空。
2.作业完成执行时向数据库更新数据,更新is_success, complete_time和failure_cause(如果作业执行失败)。

JOB_STATUS_TRACE_LOG字段含义

19

JOB_STATUS_TRACE_LOG记录作业状态变更痕迹表。可通过每次作业运行的task_id查询作业状态变化的生命周期和运行轨迹。

7.使用限制

  • 作业一旦启动成功后不能修改作业名称,如果修改名称则视为新的作业。
  • 同一台作业服务器只能运行一个相同的作业实例,因为作业运行时是按照IP注册和管理的。
  • 作业根据/etc/hosts文件获取IP地址,如果获取的IP地址是127.0.0.1而非真实IP地址,应正确配置此文件。
  • 一旦有服务器波动,或者修改分片项,将会触发重新分片;触发重新分片将会导致运行中的Perpetual以及SequencePerpetual作业再执行完本次作业后不再继续执行,等待分片结束后再恢复正常。
  • 开启monitorExecution才能实现分布式作业幂等性(即不会在多个作业服务器运行同一个分片)的功能,但monitorExecution对短时间内执行的作业(如每5秒一触发)性能影响较大,建议关闭并自行实现幂等性。
  • elastic-job没有自动删除作业服务器的功能,因为无法区分是服务器崩溃还是正常下线。所以如果要下线服务器,需要手工删除zookeeper中相关的服务器节点。由于直接删除服务器节点风险较大,暂时不考虑在运维平台增加此功能

四、elastic-job运维平台

  1. 主要功能

    • 登录安全控制
    • 注册中心管理
    • 作业维度状态查看
    • 服务器维度状态查看
    • 快捷修改作业设置
    • 控制作业暂停和恢复运行
  2. 设计理念

运维平台和elastic-job并无直接关系,是通过读取作业注册中心数据展现作业状态,或更新注册中心数据修改全局配置。

控制台只能控制作业本身是否运行,但不能控制作业进程的启停,因为控制台和作业本身服务器是完全分布式的,控制台并不能控制作业服务器。

  1. 不支持项

    • 添加作业。因为作业都是在首次运行时自动添加,使用运维平台添加作业并无必要。
    • 停止作业。即使删除了Zookeeper信息也不能真正停止作业的运行,还会导致运行中的作业出问题。
    • 删除作业服务器。由于直接删除服务器节点风险较大,暂时不考虑在运维平台增加此功能。
  2. 部署

    • a. 先在本地执行mvn clean install
    • b. 步骤a之后在target目录下就会有一个elastic-job-lite-console-2.1.2.tar这个文件
    • c. 将压缩包上传到服务器/home/project目录,解压缩jar
    • d. 执行./bin/start.sh
  3. 界面

打开链接之后,可以看到界面

  • 登陆

elastic-job-lite-console-2.1.2.tar解压缩后有一个conf/auth.properties的文件,可以在这个文件中配置用户的登陆信息。

  • 注册中心配置总览

4

注册中心名称 -> 随便取,只要保持唯一即可
注册中心地址 -> server-lists
命名空间 -> namespace
登陆凭证 -> digest
  • 作业维度

5

在作业维度,查看分片状态

6

修改作业

11

  • 服务器维度

7

在服务器维度,查看分片状态

8

五、不足

目前的elastic-job定位是一个基于java的定时任务调度框架,仍然有很多不足的地方:

  1. 异构语言不支持
    • 目前采用的无中心设计,难于支持多语言,后面需要考虑调度中心的可行性。
  2. 监控体系有待提高,目前只能通过注册中心做简单的存活和数据积压监控,未来需要做的监控部分有:
    • 增加可监控维度,如作业运行时间等。
    • 基于JMX的内部状态监控。
    • 基于历史的全量数据监控,将所有监控数据通过flume等形式发到外部监控中心,提供实时分析功能。
  3. 不能支持多种注册中心。
  4. 需要增加任务工作流,如任务依赖,初始化任务,清理任务等。
  5. 失效转移功能的实时性有待提升。
  6. 缺少更多作业类型支持,如文件,MQ等类型作业的支持。
  7. 缺少更多分片策略支持。

六、对比spring batch

  1. Spring Batch 是一款批处理应用框架,不是调度框架。如果我们希望批处理任务定期执行,可结合 Quartz 等成熟的调度框架实现。elastic job集成了调度框架,不需要额外添加。
  2. Spring batch 提供了丰富的读写组件,适用于复杂的流程化作业。
  3. elastic-job采用分片的方式,是分布式调度解决方案。适用场景是:相对于流程比较简单,但是任务可以拆分到多个线程去执行。