1. 面向chunk处理
一个简单的Step配置如下:
<job id="sampleJob" job-repository="jobRepository">
<step id="step1">
<tasklet transaction-manager="transactionManager">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
</job>
1.1 抽象Step与继承
可以在抽象Step中封装通用逻辑,然后在具体的Step中实现个性化的逻辑:
<step id="parentStep">
<tasklet allow-start-if-complete="true">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
<step id="concreteStep1" parent="parentStep">
<tasklet start-limit="5">
<chunk processor="itemProcessor" commit-interval="5"/>
</tasklet>
</step>
1.2 commit interval
Step在开始处理的时候启动一个事务,通过指定的Spring PlatformTransactionManager周期性地提交Item的写操作,通过interval来指定批次大小:
<job id="sampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
</job>
1.3 重启
启动次数限制
可以通过step的start-limit属性来设置Step可以执行的次数:
<step id="step1">
<tasklet start-limit="1">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
这个Step只能被执行一次,多次执行将抛出异常。默认情况下,step的start-limit为无穷大,即可以被无限次执行。
重启已经完成的Step
默认情况下,如果一个Step的状态为COMPLETED,那么重新执行该Step时,将被跳过,可以通过配置allow-start-if-complete属性改变这一点,使得已经完成的Step可以重新被执行:
<step id="step1">
<tasklet allow-start-if-complete="true">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
1.4 跳过
对于一些异常情况,我们可能不希望它导致整个任务结束,相反我们希望跳过这些异常的Item,后期再通过日志(通过SkipListener来记录)来特殊处理,这时候可以通过Skip逻辑来实现:
<step id="step1">
<tasklet>
<chunk reader="flatFileItemReader" writer="itemWriter"
commit-interval="10" skip-limit="10">
<skippable-exception-classes>
<include class="org.springframework.batch.item.file.FlatFileParseException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
上述配置中,如果出现的异常在skippable异常列表中,则将被跳过,被跳过的Item数量上限为10,超出限制将抛出异常(导致Step失败)。如果skippable列表很长或者难以配置,可以通过include和exclude来配置:
<step id="step1">
<tasklet>
<chunk reader="flatFileItemReader" writer="itemWriter"
commit-interval="10" skip-limit="10">
<skippable-exception-classes>
<include class="java.lang.Exception"/>
<exclude class="java.io.FileNotFoundException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
在决定是否跳过时,使用抛出的异常的最近的超类(nearest superclass)来决定。
include和exclude的顺序无关紧要。
1.5 重试
对于数据库用户名或者密码错误这一类异常,再多的重试连接都没有用。但是对于偶然的数据库死锁或者通信连接异常,通过重试很有可能解决问题。Spring Batch对Step提供了重试支持:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter"
commit-interval="2" retry-limit="3">
<retryable-exception-classes>
<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
</retryable-exception-classes>
</chunk>
</tasklet>
</step>
各项配置与Skip类似。
1.6 事务属性与回滚
可以配置一些特定的异常,使得这些异常抛出的时候不回滚事务:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
<no-rollback-exception-classes>
<include class="org.springframework.batch.item.validator.ValidationException"/>
</no-rollback-exception-classes>
</tasklet>
</step>
事务隔离级别,传播特定等属性也可以配置:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
<transaction-attributes isolation="DEFAULT"
propagation="REQUIRED"
timeout="30"/>
</tasklet>
</step>
事务属性配置同常规的Spring事务。
1.7 在Step中注册ItemStream流
Step在执行过程中,会在适当的时机调用ItemStream的回调函数,并且从ItemStream中获取状态等信息,持久化到Repository。因此这些ItemStream必须注册到Step中,默认情况下,如果ItemReader、Processor、Writer实现了ItemStream,会被自动注册。否则,需要手动注册,这通常实在非直接依赖中使用了ItemStream,例如在Delegate中使用了ItemStream。组合模式下的组件是常见的例子:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="compositeWriter" commit-interval="2">
<streams>
<stream ref="fileItemWriter1"/>
<stream ref="fileItemWriter2"/>
</streams>
</chunk>
</tasklet>
</step>
<beans:bean id="compositeWriter"
class="org.springframework.batch.item.support.CompositeItemWriter">
<beans:property name="delegates">
<beans:list>
<beans:ref bean="fileItemWriter1" />
<beans:ref bean="fileItemWriter2" />
</beans:list>
</beans:property>
</beans:bean>
1.8 拦截Step执行
Step的执行过程中,框架提供了许多钩子用于自定义,主要是以listener形式提供。包括
- StepExecutionListener
- ChunkListener
- ItemReaderListener/ProcessorListener/WriterListener
- SkipListener
- RetryListener
这些监听器提供的方法主要时间点是:执行之前、执行之后、异常发生时
<step id="step1">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"/>
<listeners>
<listener ref="chunkListener"/>
</listeners>
</tasklet>
</step>
2. TaskletStep
批处理(chunk)是一种常见的形式,但是许多场合下不一定能满足要求。此时Tasklet可以派上用场。
Tasklet接口只定义了一个方法execute。TaskletStep将循环调用该方法,直到方法返回RepeatStatus.FINISH或者抛出异常。对Tasklet的调用被封装在一个事务中。
使用如下方法定义一个TaskletStep:
<step id="step1">
<tasklet ref="myTasklet"/>
</step>
这是,不能再使用chunk子元素。
下面是一个Tasklet实现的例子,删除某个目录下的文件:
public class FileDeletingTasklet implements Tasklet, InitializingBean {
private Resource directory;
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
File dir = directory.getFile();
Assert.state(dir.isDirectory());
File[] files = dir.listFiles();
for (int i = 0; i < files.length; i++) {
boolean deleted = files[i].delete();
if (!deleted) {
throw new UnexpectedJobExecutionException("Could not delete file " +
files[i].getPath());
}
}
return RepeatStatus.FINISHED;
}
public void setDirectoryResource(Resource directory) {
this.directory = directory;
}
public void afterPropertiesSet() throws Exception {
Assert.notNull(directory, "directory must be set");
}
}
如下使用该Tasklet:
<job id="taskletJob">
<step id="deleteFilesInDir">
<tasklet ref="fileDeletingTasklet"/>
</step>
</job>
<beans:bean id="fileDeletingTasklet"
class="org.springframework.batch.sample.tasklet.FileDeletingTasklet">
<beans:property name="directoryResource">
<beans:bean id="directory"
class="org.springframework.core.io.FileSystemResource">
<beans:constructor-arg value="target/test-outputs/test-dir" />
</beans:bean>
</beans:property>
</beans:bean>
TaskletAdapter提供了复用现有服务的能力:
<bean id="myTasklet" class="o.s.b.core.step.tasklet.MethodInvokingTaskletAdapter">
<property name="targetObject">
<bean class="org.mycompany.FooDao"/>
</property>
<property name="targetMethod" value="updateFoo" />
</bean>
3. 控制Step执行流程
Step条件用于决定该Step完成之后,下一步如何执行。条件分值是根据Step执行结束之后返回的ExistCode来决定的。
3.1 next
<job id="job">
<step id="stepA" parent="s1">
<next on="*" to="stepB" />
<next on="FAILED" to="stepC" />
</step>
<step id="stepB" parent="s2" next="stepC" />
<step id="stepC" parent="s3" />
</job>
其中next的on属性支持通配符
- *: 匹配0或多个字符
- ?: 匹配精确一个字符
这里的on是指step的ExitStatus。区别于JobExecution和StepExecution的属性BatchStatus,ExitStatus用于表示一个Step执行完成之后处于什么样的状态,而BatchStatus表示Job或Step的执行状态。ExitStatus默认情况下等于Step的BatchStatus.但是可以自定义:
<step id="step1" parent="s1">
<end on="FAILED" />
<next on="COMPLETED WITH SKIPS" to="errorPrint1" />
<next on="*" to="step2" />
</step>
其中ExitStatus “COMPLETED WITH SKIPS”来自以下Listener:
public class SkipCheckingListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
String exitCode = stepExecution.getExitStatus().getExitCode();
if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
stepExecution.getSkipCount() > 0) {
return new ExitStatus("COMPLETED WITH SKIPS");
}
else {
return null;
}
}
}
注意next元素和next属性不能同时使用。
next属性除了指向另外一个step外,还可以指向一个Decider
<job id="job">
<step id="step1" parent="s1" next="decision" />
<decision id="decision" decider="decider">
<next on="FAILED" to="step2" />
<next on="COMPLETED" to="step3" />
</decision>
<step id="step2" parent="s2" next="step3"/>
<step id="step3" parent="s3" />
</job>
<beans:bean id="decider" class="com.MyDecider"/>
MyDecider定义如下:
public class MyDecider implements JobExecutionDecider {
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
if (someCondition) {
return "FAILED";
}
else {
return "COMPLETED";
}
}
}
Decider用于对自定义条件分支。通常情况下,next根据step的返回结果决定下一步如何执行。但是如果使用Decider,那么step执行完成之后,将通过当前的执行上下文,根据Decider返回的状态,决定下一步如何执行。也就是说,默认情况下有一个默认的Decider,这个Decider简单地根据Step返回结果决定下一步,伪代码如下:
public FlowExecutionStatus decide(... , ...) {
return stepExecution.getExitStatus();
}
3.2 flow
flow用于包装一组step,可以被重用。例如:
<job id="job">
<flow id="job1.flow1" parent="flow1" next="step3"/>
<step id="step3" parent="s3"/>
</job>
<flow id="flow1">
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
上述flow定义组合了2个step,可以被多个job共用。
另外,可以通过split将多个flow并行执行:
<split id="split1" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
上述两个flow可以并发执行,当两个都完成,也就是split执行完成之后,按照顺序执行下一个step step4.
3.3 JobStep
JobSte将其他Job作为一个Step来执行:
<job id="jobStepJob" restartable="true">
<step id="jobStepJob.step1" next="step2">
<job ref="job" job-launcher="jobLauncher"
job-parameters-extractor="jobParametersExtractor"/>
</step>
<step id=step2> ... <step>
</job>
<job id="job" restartable="true">...</job>
<bean id="jobParametersExtractor" class="org.spr...DefaultJobParametersExtractor">
<property name="keys" value="input.file"/>
</bean>
JobStep会创建一个新的Job Execution。 job-parameters-extractor用于将当前的JobParameter转化为新启Job的JobParameter。
区别于flow,JobStep作为一个独立的step(step内启动新的job execution),而flow只是简单地执行flow中定义的step,整个flow不会作为一个单独的step运行。
4. 在Step中结束Job
在step的执行过程中,可以根据执行结果中断整个job,Spring Batch提供了fail、end、stop三种元素用于在step中中断job。
通常情况下,job的step配置中都有一个没有下一步的step:
have had at least one final Step with no transitions
该step决定Job的最终状态:
- 如果step的返回状态为FAILED,则BatchStatus和ExitStatus都为FAILED
- 否则,两者的状态均为COMPLETED
4.1 end
end元素将使得Jb最终BatchStatus为COMPLETED(不能被重启),如果job通过end元素结束,则ExitStatus默认为COMPLETED,当然可以自定义ExitStatus。
<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<end on="FAILED"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
4.2 fail
fail元素引导Job以FAILED状态结束,另外可以自定义ExitStatus。以该方式结束的Job可以被重启。
<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<fail on="FAILED" exit-code="EARLY TERMINATION"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
4.3 stop
stop元素使得Job以STOPPED状态结束,该状态的Job可以被重启(针对STOPPED状态采取一些动作之后),但是必须指定下一次重启的step:
<step id="step1" parent="s1">
<stop on="COMPLETED" restart="step2"/>
</step>
<step id="step2" parent="s2"/>
5. Late Binding与Step、Job scope
对于Job和Step的属性配置,可以使用Spring常规的占位符,将属性配置独立到properties文件中,或者在运行时通过-D参数传入。例如:
<bean id="flatFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="${input.file.name}" />
</bean>
然后在运行时通过-D传入参数:
-Dinput.file.name="file://file.txt"
或者通过properties文件:
input.file.name=file://file.txt
但是在Spring Batch中,更方便的方式是通过JobParameter传入。为了实现该功能,Spring Batch通过自定义scope来达到Job和Step属性的后期绑定(Late Binding)。
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobParameters['input.file.name']}" />
</bean>
另外,ExecutionContext中的属性也都可以用于组件的属性配置。
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobExecutionContext['input.file.name']}" />
</bean>
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{stepExecutionContext['input.file.name']}" />
</bean>
5.1 Step Scope
注意到上述的配置中,bean都设置了scope属性,这是必须的。为了在step运行时才绑定参数,这些bean只有在Step启动时才会被实例化。由于step或者job scope不是spring默认的scope,因此需要显示配置,有2中方式可以达到该目的:
1. 使用batch命名空间
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="...">
<batch:job .../>
...
</beans>
2. 显式配置scope
<bean class="org.springframework.batch.core.scope.StepScope" />
两者只能选其一。
5.2 Job Scope
类似于step scope,job scope确保一个执行中的job只有一个bean实例。同时也使得可以从JobParameter或者jobExecutionContext中后期绑定参数。
<bean id="..." class="..." scope="job">
<property name="name" value="#{jobParameters[input]}" />
</bean>
<bean id="..." class="..." scope="job">
// spring EL
<property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>
对于上述bean,如果在多个job配置中引用该bean,那么job scope确保对于这些Job的任何JobInstance,都只有一个唯一的bean。
job scope一样可以通过使用batch命名空间或者显式声明:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="...">
<batch:job .../>
...
</beans>
<bean class="org.springframework.batch.core.scope.JobScope" />
转载自原文链接, 如需删除请联系管理员。
原文链接:Spring Batch(5): Step详解,转载请注明来源!