首页 » 技术分享 » Spring Batch(5): Step详解

Spring Batch(5): Step详解

 

1. 面向chunk处理

一个简单的Step配置如下:

<job id="sampleJob" job-repository="jobRepository">
    <step id="step1">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
        </tasklet>
    </step>
</job>

1.1 抽象Step与继承

可以在抽象Step中封装通用逻辑,然后在具体的Step中实现个性化的逻辑:

<step id="parentStep">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

<step id="concreteStep1" parent="parentStep">
    <tasklet start-limit="5">
        <chunk processor="itemProcessor" commit-interval="5"/>
    </tasklet>
</step>

1.2 commit interval

Step在开始处理的时候启动一个事务,通过指定的Spring PlatformTransactionManager周期性地提交Item的写操作,通过interval来指定批次大小:

<job id="sampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
        </tasklet>
    </step>
</job>

1.3 重启

启动次数限制

可以通过step的start-limit属性来设置Step可以执行的次数:

<step id="step1">
    <tasklet start-limit="1">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

这个Step只能被执行一次,多次执行将抛出异常。默认情况下,step的start-limit为无穷大,即可以被无限次执行。

重启已经完成的Step

默认情况下,如果一个Step的状态为COMPLETED,那么重新执行该Step时,将被跳过,可以通过配置allow-start-if-complete属性改变这一点,使得已经完成的Step可以重新被执行:

<step id="step1">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

1.4 跳过

对于一些异常情况,我们可能不希望它导致整个任务结束,相反我们希望跳过这些异常的Item,后期再通过日志(通过SkipListener来记录)来特殊处理,这时候可以通过Skip逻辑来实现:

<step id="step1">
   <tasklet>
      <chunk reader="flatFileItemReader" writer="itemWriter"
             commit-interval="10" skip-limit="10">
         <skippable-exception-classes>
            <include class="org.springframework.batch.item.file.FlatFileParseException"/>
         </skippable-exception-classes>
      </chunk>
   </tasklet>
</step>

上述配置中,如果出现的异常在skippable异常列表中,则将被跳过,被跳过的Item数量上限为10,超出限制将抛出异常(导致Step失败)。如果skippable列表很长或者难以配置,可以通过include和exclude来配置:

<step id="step1">
    <tasklet>
        <chunk reader="flatFileItemReader" writer="itemWriter"
               commit-interval="10" skip-limit="10">
            <skippable-exception-classes>
                <include class="java.lang.Exception"/>
                <exclude class="java.io.FileNotFoundException"/>
            </skippable-exception-classes>
        </chunk>
    </tasklet>
</step>

在决定是否跳过时,使用抛出的异常的最近的超类(nearest superclass)来决定。
include和exclude的顺序无关紧要。

1.5 重试

对于数据库用户名或者密码错误这一类异常,再多的重试连接都没有用。但是对于偶然的数据库死锁或者通信连接异常,通过重试很有可能解决问题。Spring Batch对Step提供了重试支持:

<step id="step1">
   <tasklet>
      <chunk reader="itemReader" writer="itemWriter"
             commit-interval="2" retry-limit="3">
         <retryable-exception-classes>
            <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
         </retryable-exception-classes>
      </chunk>
   </tasklet>
</step>

各项配置与Skip类似。

1.6 事务属性与回滚

可以配置一些特定的异常,使得这些异常抛出的时候不回滚事务:

<step id="step1">
   <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
      <no-rollback-exception-classes>
         <include class="org.springframework.batch.item.validator.ValidationException"/>
      </no-rollback-exception-classes>
   </tasklet>
</step>

事务隔离级别,传播特定等属性也可以配置:

<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        <transaction-attributes isolation="DEFAULT"
                                propagation="REQUIRED"
                                timeout="30"/>
    </tasklet>
</step>

事务属性配置同常规的Spring事务。

1.7 在Step中注册ItemStream流

Step在执行过程中,会在适当的时机调用ItemStream的回调函数,并且从ItemStream中获取状态等信息,持久化到Repository。因此这些ItemStream必须注册到Step中,默认情况下,如果ItemReader、Processor、Writer实现了ItemStream,会被自动注册。否则,需要手动注册,这通常实在非直接依赖中使用了ItemStream,例如在Delegate中使用了ItemStream。组合模式下的组件是常见的例子:

<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="compositeWriter" commit-interval="2">
            <streams>
                <stream ref="fileItemWriter1"/>
                <stream ref="fileItemWriter2"/>
            </streams>
        </chunk>
    </tasklet>
</step>

<beans:bean id="compositeWriter"
            class="org.springframework.batch.item.support.CompositeItemWriter">
    <beans:property name="delegates">
        <beans:list>
            <beans:ref bean="fileItemWriter1" />
            <beans:ref bean="fileItemWriter2" />
        </beans:list>
    </beans:property>
</beans:bean>

1.8 拦截Step执行

Step的执行过程中,框架提供了许多钩子用于自定义,主要是以listener形式提供。包括
- StepExecutionListener
- ChunkListener
- ItemReaderListener/ProcessorListener/WriterListener
- SkipListener
- RetryListener

这些监听器提供的方法主要时间点是:执行之前、执行之后、异常发生时

<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

2. TaskletStep

批处理(chunk)是一种常见的形式,但是许多场合下不一定能满足要求。此时Tasklet可以派上用场。
Tasklet接口只定义了一个方法execute。TaskletStep将循环调用该方法,直到方法返回RepeatStatus.FINISH或者抛出异常。对Tasklet的调用被封装在一个事务中。
使用如下方法定义一个TaskletStep:

<step id="step1">
    <tasklet ref="myTasklet"/>
</step>

这是,不能再使用chunk子元素。

下面是一个Tasklet实现的例子,删除某个目录下的文件:

public class FileDeletingTasklet implements Tasklet, InitializingBean {

    private Resource directory;

    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        Assert.state(dir.isDirectory());

        File[] files = dir.listFiles();
        for (int i = 0; i < files.length; i++) {
            boolean deleted = files[i].delete();
            if (!deleted) {
                throw new UnexpectedJobExecutionException("Could not delete file " +
                                                          files[i].getPath());
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setDirectoryResource(Resource directory) {
        this.directory = directory;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(directory, "directory must be set");
    }
}

如下使用该Tasklet:

<job id="taskletJob">
    <step id="deleteFilesInDir">
       <tasklet ref="fileDeletingTasklet"/>
    </step>
</job>

<beans:bean id="fileDeletingTasklet"
            class="org.springframework.batch.sample.tasklet.FileDeletingTasklet">
    <beans:property name="directoryResource">
        <beans:bean id="directory"
                    class="org.springframework.core.io.FileSystemResource">
            <beans:constructor-arg value="target/test-outputs/test-dir" />
        </beans:bean>
    </beans:property>
</beans:bean>

TaskletAdapter提供了复用现有服务的能力:

<bean id="myTasklet" class="o.s.b.core.step.tasklet.MethodInvokingTaskletAdapter">
    <property name="targetObject">
        <bean class="org.mycompany.FooDao"/>
    </property>
    <property name="targetMethod" value="updateFoo" />
</bean>

3. 控制Step执行流程

Step条件用于决定该Step完成之后,下一步如何执行。条件分值是根据Step执行结束之后返回的ExistCode来决定的。

3.1 next

<job id="job">
    <step id="stepA" parent="s1">
        <next on="*" to="stepB" />
        <next on="FAILED" to="stepC" />
    </step>
    <step id="stepB" parent="s2" next="stepC" />
    <step id="stepC" parent="s3" />
</job>

其中next的on属性支持通配符
- *: 匹配0或多个字符
- ?: 匹配精确一个字符

这里的on是指step的ExitStatus。区别于JobExecution和StepExecution的属性BatchStatus,ExitStatus用于表示一个Step执行完成之后处于什么样的状态,而BatchStatus表示Job或Step的执行状态。ExitStatus默认情况下等于Step的BatchStatus.但是可以自定义:

<step id="step1" parent="s1">
    <end on="FAILED" />
    <next on="COMPLETED WITH SKIPS" to="errorPrint1" />
    <next on="*" to="step2" />
</step>

其中ExitStatus “COMPLETED WITH SKIPS”来自以下Listener:

public class SkipCheckingListener extends StepExecutionListenerSupport {
    public ExitStatus afterStep(StepExecution stepExecution) {
        String exitCode = stepExecution.getExitStatus().getExitCode();
        if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
              stepExecution.getSkipCount() > 0) {
            return new ExitStatus("COMPLETED WITH SKIPS");
        }
        else {
            return null;
        }
    }
}

注意next元素和next属性不能同时使用。

next属性除了指向另外一个step外,还可以指向一个Decider

<job id="job">
    <step id="step1" parent="s1" next="decision" />

    <decision id="decision" decider="decider">
        <next on="FAILED" to="step2" />
        <next on="COMPLETED" to="step3" />
    </decision>

    <step id="step2" parent="s2" next="step3"/>
    <step id="step3" parent="s3" />
</job>

<beans:bean id="decider" class="com.MyDecider"/>

MyDecider定义如下:

public class MyDecider implements JobExecutionDecider {
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        if (someCondition) {
            return "FAILED";
        }
        else {
            return "COMPLETED";
        }
    }
}

Decider用于对自定义条件分支。通常情况下,next根据step的返回结果决定下一步如何执行。但是如果使用Decider,那么step执行完成之后,将通过当前的执行上下文,根据Decider返回的状态,决定下一步如何执行。也就是说,默认情况下有一个默认的Decider,这个Decider简单地根据Step返回结果决定下一步,伪代码如下:

public FlowExecutionStatus decide(... , ...) {
    return stepExecution.getExitStatus();
}

3.2 flow

flow用于包装一组step,可以被重用。例如:

<job id="job">
    <flow id="job1.flow1" parent="flow1" next="step3"/>
    <step id="step3" parent="s3"/>
</job>

<flow id="flow1">
    <step id="step1" parent="s1" next="step2"/>
    <step id="step2" parent="s2"/>
</flow>

上述flow定义组合了2个step,可以被多个job共用。

另外,可以通过split将多个flow并行执行:

<split id="split1" next="step4">
    <flow>
        <step id="step1" parent="s1" next="step2"/>
        <step id="step2" parent="s2"/>
    </flow>
    <flow>
        <step id="step3" parent="s3"/>
    </flow>
</split>
<step id="step4" parent="s4"/>

上述两个flow可以并发执行,当两个都完成,也就是split执行完成之后,按照顺序执行下一个step step4.

3.3 JobStep

JobSte将其他Job作为一个Step来执行:


<job id="jobStepJob" restartable="true">
   <step id="jobStepJob.step1" next="step2">
      <job ref="job" job-launcher="jobLauncher"
          job-parameters-extractor="jobParametersExtractor"/>
   </step>
   <step id=step2> ... <step>
</job>

<job id="job" restartable="true">...</job>

<bean id="jobParametersExtractor" class="org.spr...DefaultJobParametersExtractor">
   <property name="keys" value="input.file"/>
</bean>

JobStep会创建一个新的Job Execution。 job-parameters-extractor用于将当前的JobParameter转化为新启Job的JobParameter。
区别于flow,JobStep作为一个独立的step(step内启动新的job execution),而flow只是简单地执行flow中定义的step,整个flow不会作为一个单独的step运行。

4. 在Step中结束Job

在step的执行过程中,可以根据执行结果中断整个job,Spring Batch提供了fail、end、stop三种元素用于在step中中断job。
通常情况下,job的step配置中都有一个没有下一步的step:

have had at least one final Step with no transitions

该step决定Job的最终状态:

  • 如果step的返回状态为FAILED,则BatchStatus和ExitStatus都为FAILED
  • 否则,两者的状态均为COMPLETED

4.1 end

end元素将使得Jb最终BatchStatus为COMPLETED(不能被重启),如果job通过end元素结束,则ExitStatus默认为COMPLETED,当然可以自定义ExitStatus。

<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <end on="FAILED"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

4.2 fail

fail元素引导Job以FAILED状态结束,另外可以自定义ExitStatus。以该方式结束的Job可以被重启。

<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <fail on="FAILED" exit-code="EARLY TERMINATION"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

4.3 stop

stop元素使得Job以STOPPED状态结束,该状态的Job可以被重启(针对STOPPED状态采取一些动作之后),但是必须指定下一次重启的step:

<step id="step1" parent="s1">
    <stop on="COMPLETED" restart="step2"/>
</step>

<step id="step2" parent="s2"/>

5. Late Binding与Step、Job scope

对于Job和Step的属性配置,可以使用Spring常规的占位符,将属性配置独立到properties文件中,或者在运行时通过-D参数传入。例如:

<bean id="flatFileItemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="${input.file.name}" />
</bean>

然后在运行时通过-D传入参数:

-Dinput.file.name="file://file.txt"

或者通过properties文件:

input.file.name=file://file.txt

但是在Spring Batch中,更方便的方式是通过JobParameter传入。为了实现该功能,Spring Batch通过自定义scope来达到Job和Step属性的后期绑定(Late Binding)。

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobParameters['input.file.name']}" />
</bean>

另外,ExecutionContext中的属性也都可以用于组件的属性配置。

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobExecutionContext['input.file.name']}" />
</bean>
<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{stepExecutionContext['input.file.name']}" />
</bean>

5.1 Step Scope

注意到上述的配置中,bean都设置了scope属性,这是必须的。为了在step运行时才绑定参数,这些bean只有在Step启动时才会被实例化。由于step或者job scope不是spring默认的scope,因此需要显示配置,有2中方式可以达到该目的:
1. 使用batch命名空间

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="...">
<batch:job .../>
...
</beans>

2. 显式配置scope

<bean class="org.springframework.batch.core.scope.StepScope" />

两者只能选其一。

5.2 Job Scope

类似于step scope,job scope确保一个执行中的job只有一个bean实例。同时也使得可以从JobParameter或者jobExecutionContext中后期绑定参数。

<bean id="..." class="..." scope="job">
    <property name="name" value="#{jobParameters[input]}" />
</bean>
<bean id="..." class="..." scope="job">
    // spring EL
    <property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>

对于上述bean,如果在多个job配置中引用该bean,那么job scope确保对于这些Job的任何JobInstance,都只有一个唯一的bean。
这里写图片描述

job scope一样可以通过使用batch命名空间或者显式声明:

<beans xmlns="http://www.springframework.org/schema/beans"
          xmlns:batch="http://www.springframework.org/schema/batch"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="...">

          <batch:job .../>
          ...
</beans>
<bean class="org.springframework.batch.core.scope.JobScope" />

转载自原文链接, 如需删除请联系管理员。

原文链接:Spring Batch(5): Step详解,转载请注明来源!

0