# SpringBatch_learn
**Repository Path**: MyCodeStudio/spring-batch_learn
## Basic Information
- **Project Name**: SpringBatch_learn
- **Description**: 是一个大数据批量处理的轻量级框架
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 1
- **Created**: 2022-05-17
- **Last Updated**: 2022-05-24
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 第一章 SpringBatch入门
## 第一节 SpringBatch概述
Spring Batch是个轻量级的、 完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使得已经使用Spring框架的开发者或者企业更容易访问和利用企业服务。
Spring Batch提供了大量可重用的组件,包括了日志追踪、事务、任务作业统计、任务重启、跳过、重复资源管理。对于大数据量和高性能的批处理任
务,Spring Batch同样提供了高级功能和特性来支持比如分区功能、远程功能。总之,通过Spring Batch能够支持简单的、复杂的和大数据量的批处理作业。
Spring Batch是一个批处理应用框架,不是调度框架,但需要和调度框架合作来构建完成的批处理任务。它只关注批处理任务相关的问题,如事务、并发、监控、执行等,并不提供相应的调度功能。如果需要使用调用框架,在商业软件和开源软件中已经有很多优秀的企业级调度框架(如Quartz. Tivoli、 Control-M、 Cron等)可以使用。
**框架主要有以下功能:**
Transaction management (事务管理)
Chunk based processing (基于块的处理)
Declarative 1/0 (声明式的输入输出)
Start/Stop/Restart (启动/停止/再启动)
Retry/Skip (重试/跳过)

框架一共有**4个主要角色**:
**JobLauncher**是任务启动器,通过它来启动任务,可以看做是程序的入口。
**Job**代表着一个具体的任务。
**Step**代表着一个具体的步骤,一个Job可以包含多个Step (想象把大象放进冰箱这个任务需要多少个步骤你就明白了) .
**JobRepository**是存储数据的地方,**可以看做是一个数据库的接口**,在任务执行的时候需要通过它来记录任务状态等等信息。
## 第二节 搭建SpringBatch项目
网站,idea。。
## 第三节 SpringBatch入门程序
```java
@Configuration
@EnableBatchProcessing
public class JobConfiguration {
/**
* @Description: 注入创建任务对象的对象
* @Param:
* @Return:
*/
@Autowired
private JobBuilderFactory jobBuilderFactory;
/**
* @Description: //任务的执行由Step决定,注入创建Step对象的对象
* @Param:
* @Return:
*/
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
/**
* @Description: 创建任务对象
* @Param: []
* @Return: org.springframework.batch.core.Job
*/
public Job helloWorldJob() {
//job名字//开始step
return jobBuilderFactory.get("helloWorldJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
/**
* @Description: step1 step的名字,tasklet执行任务 可以用chunk
* @Param: []
* @Return: org.springframework.batch.core.Step
*/
return stepBuilderFactory.get("step1")
.tasklet(new Tasklet() {
@Override
/**
* @Description: RepeatStatus 状态值,一步步来,一个step结束开始下一个
* @Param: [stepContribution, chunkContext]
* @Return: org.springframework.batch.repeat.RepeatStatus
*/
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("He11o World!");
return RepeatStatus.FINISHED;
}
}).build();
}
}
```
## 第四节 替换为MySQL数据库
```java
mysq1
mysql-connector-java
ore.springframework.boot
spring-boot-starter-jdbc
```
**application.properties:**
```java
spring.datasource.url=jdbc:mysql://localhost:3306/batch
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
spring.batch.initialize-schema=always
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
```
## 第五节 核心API
, sheode tate = 207085
JooPrsates
JolEauutin
**JobInstance:**该领域概念和ob的关系'jJava中实何和类的关系样,Job定义了个工.作流程。JobInstance就是该 工作流程的一个具体实例。**一个Job可以有多 个JobInstance**
**JobParameters:**是组可以贯穿整Job的运行时**配置参数**。不同的配置将产生不同的JobInstance. 如果你是使用**JobParameters**运行间Job. 那么这次运行会重用 上
**JobExecution:** 该领域概念 表示**JobInstance**的运行。 JobInstance运行时可能会成功或者失败。每次jobInstance的运行都会产生 1个JobExecution. 运行时间,开始结束,状态,成功与否。。
**StepExecution:**类似于 JobExecution.该领域对象表示Step的运行。Step足Job的部分。 因此个StepExecution公关联到 个Jobexecution. 另外。浅对象还会存储很
**ExecutionContext:**上下文,从前面的JobExecution. StepExecution的属性介绍中已经提到I该领域概念。说穿了。该领域概念就是个容器。该容器Batch框架控制。框架对该容器持久化。
# 第二章 作业流
## 第一节 Job的创建和使用
**Job**:作业。批处理中的核心概念,是Batch操作的基础单元。
每个作业Job有1个或者多个作业步Step:
```java
@Configuration
@EnableBatchProcessing
public class JobDemo {
/**
* @Description: 注入创建任务对象的对象
* @Param:
* @Return:
*/
@Autowired
private JobBuilderFactory jobBuilderFactory;
/**
* @Description: //任务的执行由Step决定,注入创建Step对象的对象
* @Param:
* @Return:
*/
@Autowired
private StepBuilderFactory stepBuilderFactory;
/**
* @Description: 让step1,step2,step3依次执行
* @Param: []
* @Return: org.springframework.batch.core.Job
*/
@Bean
public Job jobDemoJob(){
return jobBuilderFactory.get("jobDemoJob")
.start(step1())
.on("COMPLETED").to(step1())
.from(step2()).on("COMPLETED").to(step3())
.from(step3()).end()
.build();
// .from(step2()).on("COMPLETED").fail()
// .from(step2()).on("COMPLETED").stopAndRe
// .start(step1())
// .next(step2())
// .next(step3())
// .build();
}
public Step step3() {
return stepBuilderFactory.get("step3")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step3");
return RepeatStatus.FINISHED;
}
}).build();
}
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2");
return RepeatStatus.FINISHED;
}
}).build();
}
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step1");
return RepeatStatus.FINISHED;
}
}).build();
}
}
```
## 第二节 Flow的创建和使用
**1.Flow是多个Step的集合**
**2.可以被多个Job复用**
**3.使用FlowBuilder来创建**
```java
@Configuration
@EnableBatchProcessing
public class FlowDemo {
/**
* @Description: 注入创建任务对象的对象
* @Param:
* @Return:
*/
@Autowired
private JobBuilderFactory jobBuilderFactory;
/**
* @Description: //任务的执行由Step决定,注入创建Step对象的对象
* @Param:
* @Return:
*/
@Autowired
private StepBuilderFactory stepBuilderFactory;
/**
* @Description: 创建Flow对象,指明F1ow对象包含哪些step
* @Param: []
* @Return: org.springframework.batch.core.job.flow.Flow
*/
@Bean
public Flow flowDemoFlow(){
return new FlowBuilder("flowDemoFlow")
.start(flowDemoStep1())
.next(flowDemoStep2())
.build();
}
/**
* @Description: fowDemoJob
* @Param: []
* @Return: org.springframework.batch.core.Job
*/
@Bean
public Job fowDemoJob(){
return jobBuilderFactory.get("fowDemoJob0")
.start(flowDemoFlow())
.next(flowDemoStep3())
.end()
.build();
}
... ...
```
## 第三节 split实现并发执行
**实现任务中的多个step或多个flow并发执行**
1:创建若干个step
2:创建两个flow
3:创建-个任务包含以上两个flow,井让这两个flow并发执行
```java
@Configuration
@EnableBatchProcessing
public class SplitDemo {
/**
* @Description: 注入创建任务对象的对象
* @Param:
* @Return:
*/
@Autowired
private JobBuilderFactory jobBuilderFactory;
/**
* @Description: //任务的执行由Step决定,注入创建Step对象的对象
* @Param:
* @Return:
*/
@Autowired
private StepBuilderFactory stepBuilderFactory;
/**
* @Description: 创建Flow对象,指明F1ow对象包含哪些step
* @Param: []
* @Return: org.springframework.batch.core.job.flow.Flow
*/
@Bean
public Flow splitDemoFlow1(){
return new FlowBuilder("splitDemoFlow1")
.start(splitDemoStep1())
.build();
}
@Bean
public Flow splitDemoFlow2(){
return new FlowBuilder("splitDemoFlow2")
.start(splitDemoStep2())
.next(splitDemoStep3())
.build();
}
/**
* @Description: fowDemoJob
* @Param: []
* @Return: org.springframework.batch.core.Job
*/
@Bean
public Job splitDemoJob(){
return jobBuilderFactory.get("splitDemoJob")
.start(splitDemoFlow1())
.split(new SimpleAsyncTaskExecutor())
.add(splitDemoFlow2())
.end()
.build();
}
```
## 第四节 决策器的使用
接口: JobExecutionDecider
```java
public class MyDecider implements JobExecutionDecider {
private int count;
/**
* @Description: decide 决策器,先执行odd奇数
* @Param: [jobExecution, stepExecution]
* @Return: org.springframework.batch.core.job.flow.FlowExecutionStatus
*/
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
count++;
if(count%2==0){
return new FlowExecutionStatus("even偶数");
}else {
return new FlowExecutionStatus("odd奇数");
}
}
}
```
```java
@Configuration
@EnableBatchProcessing
public class DeciderDemo {
/**
* @Description: 注入创建任务对象的对象
* @Param:
* @Return:
*/
@Autowired
private JobBuilderFactory jobBuilderFactory;
/**
* @Description: //任务的执行由Step决定,注入创建Step对象的对象
* @Param:
* @Return:
*/
@Autowired
private StepBuilderFactory stepBuilderFactory;
/**
* @Description: myDecider 决策器
* @Param: []
* @Return: org.springframework.batch.core.job.flow.JobExecutionDecider
*/
@Bean
public JobExecutionDecider myDecider(){
return new MyDecider();
}
@Bean
public Job deciderDemoJob(){
return jobBuilderFactory.get("deciderDemoJob")
.start(deciderDemoStep1())
.next(myDecider())
.from(myDecider()).on("even偶数").to(deciderDemoStep2())
.from(myDecider())
.on("odd奇数").to(deciderDemoStep3())
.from(deciderDemoStep3())
.on("*").to(myDecider())//无论返回什么,回到决策器 ↑next(myDecider())
.end()
.build();
}
@Bean
public Step deciderDemoStep1(){
return stepBuilderFactory.get("deciderDemoStep1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("deciderDemoStep1!");
return RepeatStatus.FINISHED;
}
}).build();
}
```
## 第五节 Job的嵌套
一个Job可以嵌套在另一个Job中,被嵌赛的Job称为子Job,外部Job称为父Jb。子job不能单独执行, 賽要由父Job来启动
案例:创建两个Job,作为子Job,再创建一个Job作为父Job
```java
@Configuration
public class NestedDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobLauncher launcher;
@Autowired
private Job childJobOne;
@Autowired
private Job childJobTwo;
@Bean
public Job parentJob(JobRepository jobRepository, PlatformTransactionManager transactionManager){
return jobBuilderFactory.get("parentJobs")
.start(childJob1(jobRepository,transactionManager))
.next(childJob2(jobRepository,transactionManager))
.build();
}
/**
* @Description: childJob2返回Job类型的Step,特殊Step
* @Param: []
* @Return: org.springframework.batch.core.Step
*/
private Step childJob2(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobStepBuilder(new StepBuilder("childJob2"))
.job(childJobTwo)
.launcher(launcher)
.repository(jobRepository)
.transactionManager(transactionManager)
.build();
}
private Step childJob1(JobRepository jobRepository,PlatformTransactionManager transactionManager) {
return new JobStepBuilder(new StepBuilder("childJob1"))
.job(childJobOne)
.launcher(launcher)
.repository(jobRepository)
.transactionManager(transactionManager)
.build();
}
}
```
```java
@Configuration
@EnableBatchProcessing
public class ChildJob2 {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step childJob2Step1(){
return stepBuilderFactory.get("childJob2Step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("childJob2Step1!");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step childJob2Step2(){
return stepBuilderFactory.get("childJob2Step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("childJob2Step2!");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Job childJobTwo(){
return jobBuilderFactory.get("childJobTwo!!")
.start(childJob2Step1())
.next(childJob2Step2())
.build();
}
}
```
```java
spring.batch.job.names=parentJobs //选择执行的Job,名字是自己设置的Job名称
```
## 第六节 监听器的使用
用来监听批处理作业的执行情况
创建监听可以通过实现接口或使用注解
```java
//不同的监听,以及触发时机
JobExecutionListener(before,after)
StepExecutionListener(before,after)
ChunkListener(before,after,error)
ItemReadListener,ItemProcessListener,ItemWriteListener(before,after,error)
public class MyJobListener implements JobExecutionListener{
```
**示例代码:**
```java
public class MyChunkListener {
@BeforeChunk
public void beforeChunk(ChunkContext chunkContext){
System.out.println(chunkContext.getStepContext().getStepName()+"before...");
}
@AfterChunk
public void afterChunk(ChunkContext chunkContext){
System.out.println(chunkContext.getStepContext().getStepName()+"after...");
}
}
```
```java
public class MyJobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
System.out.println(jobExecution.getJobInstance().getJobName()+"before...");
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println(jobExecution.getJobInstance().getJobName()+"after...");
}
}
```
```java
@Configuration
@EnableBatchProcessing
public class ListenerDemo {
/**
* @Description: 注入创建任务对象的对象
* @Param:
* @Return:
*/
@Autowired
private JobBuilderFactory jobBuilderFactory;
/**
* @Description: //任务的执行由Step决定,注入创建Step对象的对象
* @Param:
* @Return:
*/
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job listenerJob() {
return jobBuilderFactory.get("listenerJob")
.start(step1())
.listener(new MyJobListener())
.build();
}
/**
* @Description: step1 Chunk使用方式
* @Param: []
* @Return: org.springframework.batch.core.Step
*/
@Bean
public Step step1() {
return stepBuilderFactory.get("listenerStep1")
//数据的读取,规定读取和输出的数据类型每读完2个数据进行一次输出处理
.chunk(2)
//容错
.faultTolerant()
.listener(new MyChunkListener())
//数据的读取
.reader(read())
//数据的写入/输出
.writer(write())
.build();
}
/**
* @Description: write
* @Param: []
* @Return: org.springframework.batch.item.ItemWriter
*/
@Bean
public ItemWriter write() {
return new ItemWriter() {
@Override
public void write(List extends String> list) throws Exception {
for(String item:list){
System.out.println(item);
}
}
};
}
/**
* @Description: read
* @Param: []
* @Return: org.springframework.batch.item.ItemReader
*/
@Bean
public ItemReader read() {
return new ListItemReader<>(Arrays.asList("java","spring","mybatis"));
}
}
```
## 第七节 Job参数
在Job运行时可以以key=value形式传递参数
```java
/**
* @ClassName ParametersDemo 实现Step监听器
* @Description ParametersDemo
* @Author ZX
* @Date 2020/5/30
*/
@Configuration
@EnableBatchProcessing
public class ParametersDemo implements StepExecutionListener {
/**
* @Description: 注入创建任务对象的对象
* @Param:
* @Return:
*/
@Autowired
private JobBuilderFactory jobBuilderFactory;
/**
* @Description: //任务的执行由Step决定,注入创建Step对象的对象
* @Param:
* @Return:
*/
@Autowired
private StepBuilderFactory stepBuilderFactory;
private Map parameters;
@Bean
public Job parameterJob() {
return jobBuilderFactory.get("parameterJob")
.start(parameterStep())
.listener(new MyJobListener())
.build();
}
/**
* @Description: parameterStep
* Job执行的是Step,Job使用的数据肯定是在step中使用,只需给Step传递数据。
* 使用监听,使用Step级别的监听来传递数据
* @Param: []
* @Return: org.springframework.batch.core.Step
*/
@Bean
public Step parameterStep() {
return stepBuilderFactory.get("parameterStep")
.listener(this)
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
//输出接收到的参数的值
System.out.println(parameters.get("info"));
return RepeatStatus.FINISHED;
}
}).build();
}
@Override
public void beforeStep(StepExecution stepExecution) {
parameters = stepExecution.getJobParameters().getParameters();
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
}
```
**输入参数:**

# 第三章 数据输入
## 第一节 ItemReader概述
```java
@Configuration
@EnableBatchProcessing
public class ItemReaderDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job itemReaderDemoJob() {
return jobBuilderFactory.get("itemReaderDemoJob")
.start(itemReaderDemoStep())
.listener(new MyJobListener())
.build();
}
@Bean
public Step itemReaderDemoStep() {
return stepBuilderFactory.get("itemReaderDemoStep")
.chunk(2)
.reader(itemReaderDemoRead())
.writer(list->{
for (Object item:list){
System.out.println(item+"...");
}
}).build();
}
/**
* @Description: itemReaderDemoRead 自定义itemReader
* @Param: []
* @Return: MyReader
*/
@Bean
public MyReader itemReaderDemoRead() {
List data = Arrays.asList("鼠","牛","虎","兔");
return new MyReader(data);
}
}
```
```java
/**
* @ClassName MyReader
* 自定义ItemReader
* @Description MyReader
* @Author ZX
* @Date 2020/5/30
*/
public class MyReader implements ItemReader {
private Iterator iterator;
//构造函数
public MyReader(List list){
this.iterator=list.iterator();
}
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
//默认一个一个读数据
if(iterator.hasNext()){
return this.iterator.next();
}else {
return null;
}
}
}
```
## 第二节 从数据库中读取数据
用**JdbcPagingltemReader**类实现
```java
@Configuration
@EnableBatchProcessing
public class ItemReaderDbDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Autowired
@Qualifier("dbJdbcWriter")
private ItemWriter super User> dbJdbcWriter;
@Bean
public Job ItemReaderDbDemoJob(){
return jobBuilderFactory.get("itemReaderDbDemoJob")
.start(itemReaderDbStep())
.listener(new MyJobListener())
.build();
}
@Bean
public Step itemReaderDbStep() {
return stepBuilderFactory.get("itemReaderDemoStep")
.chunk(2)
.reader(dbJdbcReader())
.writer(dbJdbcWriter)
.build();
}
@Bean
@StepScope
public JdbcPagingItemReader dbJdbcReader() {
JdbcPagingItemReader reader = new JdbcPagingItemReader();
reader.setDataSource(dataSource);
reader.setFetchSize(2);
//把读取到的记录转换成User对象
reader.setRowMapper(new RowMapper() {
/**
* @Description: 结果集的映射
* @Param: [resultSet, i]
* @Return: com.example.springbootjdbc.pojo.Users
*/
@Override
public User mapRow(ResultSet resultSet, int rowNum) throws SQLException {
User users = new User();
users.setId(resultSet.getInt("id"));
users.setAge(resultSet.getInt("age"));
users.setPhone(resultSet.getInt("phone"));
users.setUsername(resultSet.getString("username"));
users.setEmail(resultSet.getString("email"));
return users;
}
});
//指定sq1语句
MySqlPagingQueryProvider provider =new MySqlPagingQueryProvider () ;
provider.setSelectClause ("id,username,age,phone,email") ;
provider.setFromClause ("from users") ;
//指定根据哪个字段进行排序
Map sort = new HashMap<>(1) ;
sort.put("id", Order.ASCENDING);
provider.setSortKeys(sort);
reader.setQueryProvider(provider);
return reader ;
}
}
```
```java
/**
* @ClassName DbJdbcWriter 自建ItemWriter
* @Description DbJdbcWriter
* @Author ZX
* @Date 2020/5/30
*/
@Component("dbJdbcWriter")
public class DbJdbcWriter implements ItemWriter{
@Override
public void write(List extends User> list) throws Exception {
for (User user:list){
System.out.println(user);
}
}
}
```
## 第三节 从普通文件中读取数据
**用FlatFileItemReader类**
customer.txt内容如下:
```java
id,firstName,lastName,birthday
1,Stone,Barrett,1964-10-19 14:11:03
2,Raymond,Pace,1977-12-11 21 :44:30
3,Armando,Logan,1986-12-25 11:54:28
4,Latifah,Barnett,1959-07-24 06:00:16
5,Cassandra,Moses,1956-09-14 06:49:28
6,Audra,Hopkins,1984-08-30 04:18:10
7,Upton,Morrow,1973-82-04 05:26:05
8,Melodie,Velasquez,1953-04-26 11:16:26
9,Sybill,Nolan,1951-06-24 14:56:51
10,Glenna,Little,1953-08-27 13:15:08
11,Ingrid,Jackson,1957-09-05 21:36:47
12,Duncan,Castaneda,1979-01 21 18:31:27
13,Xaviera,Gillespie,1965-07-18 15:05:22
14,Rhoda,Lancaster,1990-09-11 15:52:54
15,Fatima,Combs,1979-06-01 06: 58: 54
```
示例代码
```java
@Configuration
@EnableBatchProcessing
public class FileItemReaderDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("fileFileItemWriter")
private ItemWriter super Customer> fileFileItemWriter;
@Bean
public Job FileItemReaderDemo() {
return jobBuilderFactory.get("FileItemReaderDemo")
.start(FileItemReaderDemoStep())
.listener(new MyJobListener())
.build();
}
@Bean
public Step FileItemReaderDemoStep() {
return stepBuilderFactory.get("FileItemReaderDemoStep")
.chunk(5)
.reader(fileItemReaderDemoReader())
.writer(fileFileItemWriter)
.build();
}
/**
* @Description: fileItemReaderDemoReader 文件读取
* @Param: []
* @Return: org.springframework.batch.item.file.FlatFileItemReader
*/
@Bean
public FlatFileItemReader fileItemReaderDemoReader() {
FlatFileItemReader reader = new FlatFileItemReader();
reader.setResource(new ClassPathResource("customer.txt"));
//跳过第一行
reader.setLinesToSkip(1);
//数据解析
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[]{"id","fistName","lastName","birthday"});
//把解析出的一个数据映射为Customer对象
DefaultLineMapper mapper = new DefaultLineMapper<>();
mapper.setLineTokenizer(tokenizer);
mapper.setFieldSetMapper(new FieldSetMapper() {
/**
* @Description: mapFieldSet 映射
* @Param: [fieldSet]
* @Return: com.example.springbatchdemo.pojo.Customer
*/
@Override
public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
Customer customer = new Customer();
customer.setId(fieldSet.readLong("id"));
customer.setFirstName(fieldSet.readString("fistName"));
customer.setLastName(fieldSet.readString("lastName"));
customer.setBirthday(fieldSet.readString("birthday"));
return customer;
}
});
mapper.afterPropertiesSet();
reader.setLineMapper(mapper);
return reader;
}
}
```
自建ItemWriter:
```java
@Component("fileFileItemWriter")
public class FileFileItemWriter implements ItemWriter {
@Override
public void write(List extends Customer> list) throws Exception {
for (Customer customer:list){
System.out.println(customer);
}
}
}
```
## 第四节 从XML文件中读取数据
**使用StaxEventtemReader类**
xml文件:
```xml
1
Mufutau
Maddox
2017-06-05 19:43:51PM
2
Brenden
Cobb
2017-01-06 13:18:17PM
3
Kerry
Joseph
2016-09-15 18:32:33PM
4
asdasd
Joseph
2016-09-15 18:32:33PM
5
JOJO5
Jobana
2016-09-15 18:32:33PM
6
XuLun
JoTaiLang
2046-09-15 18:32:33PM
7
JaiLuo
JieBeiLing
2077-09-15 18:32:33PM
```
配置pom文件
```xml
org.springframework
spring-oxm
com.thoughtworks.xstream
xstream
1.4.7
```
示例代码:
```java
@Configuration
@EnableBatchProcessing
public class XmlItemReaderDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("xmlFileWriter")
private ItemWriter super Customer> xmlFileWriter;
@Bean
public Job xmlItemReaderDemoJob() {
return jobBuilderFactory.get("xmlItemReaderDemoJob")
.start(xmlItemReaderDemoStep())
.listener(new MyJobListener())
.build();
}
@Bean
public Step xmlItemReaderDemoStep() {
return stepBuilderFactory.get("xmlItemReaderDemoStep")
.chunk(5)
.reader(xmlFileReader())
.writer(xmlFileWriter)
.build();
}
/**
* @Description: xmlFileReader 文件读取
* @Param: []
* @Return: org.springframework.batch.item.file.FlatFileItemReader
*/
@Bean
@StepScope
public StaxEventItemReader xmlFileReader() {
StaxEventItemReader reader = new StaxEventItemReader();
//指定文件位置
reader.setResource(new ClassPathResource("customer.xml"));
//跳过第一行
reader.setFragmentRootElementName("customer");
//把xml转成对象
XStreamMarshaller unmarshaller = new XStreamMarshaller();
//告诉unmarshaller把xml转成什么类型
Map map = new HashMap<>();
map.put("customer",Customer.class);
unmarshaller.setAliases(map);
reader.setUnmarshaller(unmarshaller);
return reader;
}
}
```
自建Itemwriter
```java
@Component("xmlFileWriter")
public class XmlItemWriter implements ItemWriter {
@Override
public void write(List extends Customer> list) throws Exception {
for (Customer customer:list){
System.out.println(customer);
}
}
}
```
## 第五节 从多个文件中读取数据
**使用MultiResourceItemReader类**
file1文件如下,file2,file3省略
```txt
1,Stone, Barrett, 1964-10-19 14:11:03
2,Raymond, Pace,1977-12-11 21:44:30
3,Armando, Logan,1986-12-25 11:54:28
4,Latifah, Barnett,1959-07-24 06:00:16
5,Cassandra, Moses,1956-09-14 06:49:28
6,Audra, Hopkins,1984-08-30 04:18:10
7,Upton, Morrow,1973-02-04 05:26:05
8,Melodie, Velasquez,1953-04-26 11:16:26
9,sybill, Nolan,1951-06-24 14:56:51
10,Glenna, Little, 1953-08-27 13:15:08
11,Ingrid, Jackson,1957-09-05 21:36:47.
12,Duncan, Castaneda,1979-01-21 18:31:27
13,Xaviera, Gillespie,1965-07-18 15:05:22
14,Rhoda, Lancaster,1990-09-11 15:52:54
15,Fatima, Combs,1979-06-01 06:58:54
16,Merri1l, Hopkins ,1990-07-02 17:36:35
17,Felicia, Vinson,1959-12-19 20:23:12
18,Hanae , Harvey, 1984-12-27 10:36:49
19,Ramona, Acosta,1962-06-23 20:03:40
20,Katelyn, Hammond ,1988-11-12 19:05:13
```
示例代码:
```java
@Configuration
@EnableBatchProcessing
public class MultiFileItemReaderDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("multiFileWriter")
private ItemWriter super Customer> multiFileWriter;
@Value("classpath:/file*.txt")
private Resource[] fileResources;
@Bean
public Job multiFileItemReaderDemoJob() {
return jobBuilderFactory.get("multiFileItemReaderDemoJob")
.start(multiFileItemReaderDemoStep())
.listener(new MyJobListener())
.build();
}
@Bean
public Step multiFileItemReaderDemoStep() {
return stepBuilderFactory.get("multiFileItemReaderDemoStep")
.chunk(5)
.reader(multiFileReader())
.writer(multiFileWriter)
.build();
}
/**
* @Description: multiFileReader 虽说是多文件读取,但其实是逐个读取单个文件
* @Param: []
* @Return: org.springframework.batch.item.file.MultiResourceItemReader
*/
@Bean
@StepScope
public MultiResourceItemReader multiFileReader() {
MultiResourceItemReader reader = new MultiResourceItemReader<>();
reader.setDelegate(fileItemReaderDemoReader());
reader.setResources(fileResources);
return reader;
}
/**
* @Description: fileItemReaderDemoReader 单个文件读取
* @Param: []
* @Return: org.springframework.batch.item.file.FlatFileItemReader
*/
@Bean
public FlatFileItemReader fileItemReaderDemoReader() {
FlatFileItemReader reader = new FlatFileItemReader();
reader.setResource(new ClassPathResource("customer.txt"));
//跳过第一行
//reader.setLinesToSkip(1);
//数据解析
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[]{"id","fistName","lastName","birthday"});
//把解析出的一个数据映射为Customer对象
DefaultLineMapper mapper = new DefaultLineMapper<>();
mapper.setLineTokenizer(tokenizer);
mapper.setFieldSetMapper(new FieldSetMapper() {
/**
* @Description: mapFieldSet 映射
* @Param: [fieldSet]
* @Return: com.example.springbatchdemo.pojo.Customer
*/
@Override
public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
Customer customer = new Customer();
customer.setId(fieldSet.readLong("id"));
customer.setFirstName(fieldSet.readString("fistName"));
customer.setLastName(fieldSet.readString("lastName"));
customer.setBirthday(fieldSet.readString("birthday"));
return customer;
}
});
mapper.afterPropertiesSet();
reader.setLineMapper(mapper);
return reader;
}
}
```
自定义ItemWriter
```java
@Component("multiFileWriter")
public class MultiFileWriter implements ItemWriter {
@Override
public void write(List extends Customer> list) throws Exception {
for (Customer customer:list){
System.out.println(customer);
}
}
}
```
## 第六节 ItemReader异常处理及重启
# 第四章 数据输出
## 第一节 ItemWriter概述
ItemReader是一个数据一个数据的读, 而ItemWriter是一批一批的输出
```java
@Configuration
@EnableBatchProcessing
public class ItemWriterDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("myWriter")
private ItemWriter super String> myWriter;
@Bean
public Job ItemWriterDemoJob() {
return jobBuilderFactory.get("ItemWriterDemoJob")
.start(ItemWriterDemoStep())
.listener(new MyJobListener())
.build();
}
@Bean
public Step ItemWriterDemoStep() {
return stepBuilderFactory.get("ItemWriterDemoStep")
.chunk(2)
.reader(myReader())
.writer(myWriter).build();
}
@Bean
public ItemReader myReader() {
List items = new ArrayList<>();
for (int i = 1; i <= 50; i++) {
items.add("java" + i);
}
return new ListItemReader(items);
}
}
```
```java
/**
* @ClassName MyWriter
* @Description MyWriter
* @Author ZX
* @Date 2020/6/1
*/
@Component("myWriter")
public class MyWriter implements ItemWriter {
@Override
public void write(List extends String> list) throws Exception {
//输出一批的数量,chunk的值
System.out.println(list.size());
for (String str:list){
System.out.println(str);
}
}
}
```
## 第二节 数据输出到数据库
实现的各种类:
**Neo4jltemWriter**
**MongoltemWriter**
**RepositoryltemWriter**
**HibernateltemWriter**
**JdbcBatchltemWriter**
**JpaltemWriter**
**GemfireltemWriter**
Mysql这里使用**JdbcBatchltemWriter**
示例代码
```java
@Configuration
@EnableBatchProcessing
public class ItemWriterDbDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("itemWriterDb")
private ItemWriter super Customer> itemWriterDb;
@Autowired
@Qualifier("fileItemReaderDemoReader")
private ItemReader extends Customer> flatFileReader;
@Bean
public Job ItemWriterDbDemoJob() {
return jobBuilderFactory.get("ItemWriterDbDemoJob")
.start(ItemWriterDbDemoStep())
.listener(new MyJobListener())
.build();
}
@Bean
public Step ItemWriterDbDemoStep() {
return stepBuilderFactory.get("ItemWriterDbDemoStep")
.chunk(5)
.reader(flatFileReader)
.writer(itemWriterDb)
.build();
}
}
```
从文件中读取:
```java
@Configuration
public class FlatFileReaderConfig {
/**
* @Description: FlatItemReaderDemoReader 文件读取
* @Param: []
* @Return: org.springframework.batch.item.file.FlatFileItemReader
*/
@Bean
public FlatFileItemReader fileItemReaderDemoReader() {
FlatFileItemReader reader = new FlatFileItemReader();
reader.setResource(new ClassPathResource("customer.txt"));
//跳过第一行
reader.setLinesToSkip(1);
//数据解析
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[]{"id","fistName","lastName","birthday"});
//把解析出的一个数据映射为Customer对象
DefaultLineMapper mapper = new DefaultLineMapper<>();
mapper.setLineTokenizer(tokenizer);
mapper.setFieldSetMapper(new FieldSetMapper() {
/**
* @Description: mapFieldSet 映射
* @Param: [fieldSet]
* @Return: com.example.springbatchdemo.pojo.Customer
*/
@Override
public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
Customer customer = new Customer();
customer.setId(fieldSet.readLong("id"));
customer.setFirstName(fieldSet.readString("fistName"));
customer.setLastName(fieldSet.readString("lastName"));
customer.setBirthday(fieldSet.readString("birthday"));
return customer;
}
});
mapper.afterPropertiesSet();
reader.setLineMapper(mapper);
return reader;
}
}
```
写入数据库:
```java
@Configuration
public class ItemWriterDbConfig {
@Autowired
private DataSource dataSource;
@Bean
public JdbcBatchItemWriter itemWriterDb(){
JdbcBatchItemWriter writer = new JdbcBatchItemWriter();
writer.setDataSource(dataSource);
writer.setSql("insert into customer(id,firstName,lastName,birthday) values" +
"(:id,:firstName,:lastName,:birthday)");
//将Customer中对应属性的值与Sql语句中的四个值进行替换
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
return writer;
}
}
```
## 第三节 数据输出到普通文件
**FlatFileltemWriter**类
示例代码:
```java
@Configuration
@EnableBatchProcessing
public class FileItemWriterDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("dbJdbcReader")
private ItemReader extends Customer> dbJdbcReader;
@Autowired
@Qualifier("fileItemWriter")
private ItemWriter super Customer> fileItemWriter;
/**
* @Description: fileItemWriterDemoJob
* @Param: []
* @Return: org.springframework.batch.core.Job
*/
@Bean
public Job fileItemWriterDemoJob() {
return jobBuilderFactory.get("fileItemWriterDemoJob")
.start(fileItemWriterDemoStep())
.listener(new MyJobListener())
.build();
}
/**
* @Description: fileItemWriterDemoStep
* @Param: []
* @Return: org.springframework.batch.core.Step
*/
@Bean
public Step fileItemWriterDemoStep() {
return stepBuilderFactory.get("fileItemWriterDemoStep")
.chunk(5)
.reader(dbJdbcReader)
.writer(fileItemWriter)
.build();
}
}
```
写入文件:
```java
@Configuration
public class FIleItemWriter {
@Bean
/**
* @Description: fileItemWriter 向文件输出数据,覆盖原先数据
* @Param: []
* @Return: org.springframework.batch.item.file.FlatFileItemWriter
*/
public FlatFileItemWriter fileItemWriter(){
//把Customer对象转成字符串输出到文件
FlatFileItemWriter writer = new FlatFileItemWriter();
String path="F:\\test/customer.txt";
writer.setResource(new FileSystemResource(path));
//把Customer对象转成字符串
writer.setLineAggregator(new LineAggregator() {
ObjectMapper mapper = new ObjectMapper();
@Override
public String aggregate(Customer customer) {
String str = null;
try {
str=mapper.writeValueAsString(customer);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return str;
}
});
try {
writer.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return writer;
}
}
```
从数据库中读取:
```java
@Configuration
public class DbJdbcReaderConfig {
@Autowired
private DataSource dataSource;
/**
* @Description: dbJdbcReader 从数据库中读取数据
* @Param: []
* @Return: org.springframework.batch.item.database.JdbcPagingItemReader
*/
@Bean
public JdbcPagingItemReader dbJdbcReader() {
JdbcPagingItemReader reader = new JdbcPagingItemReader();
reader.setDataSource(dataSource);
//设置读取缓存,每次取5个
reader.setFetchSize(5);
//把读取到的记录转换成User对象
reader.setRowMapper(new RowMapper() {
/**
* @Description: 结果集的映射
* @Param: [resultSet, rowNum]
* @Return: com.example.springbootjdbc.pojo.Customer
*/
@Override
public Customer mapRow(ResultSet resultSet, int rowNum) throws SQLException {
Customer customer = new Customer();
customer.setId(resultSet.getLong(1));
customer.setFirstName(resultSet.getString(2));
customer.setLastName(resultSet.getString(3));
customer.setBirthday(resultSet.getString(4));
return customer;
}
});
//指定sq1语句
MySqlPagingQueryProvider provider =new MySqlPagingQueryProvider () ;
provider.setSelectClause ("id,firstName,lastName,birthday") ;
provider.setFromClause ("from customer") ;
//指定根据哪个字段进行排序
Map sort = new HashMap<>(1) ;
sort.put("id", Order.ASCENDING);
provider.setSortKeys(sort);
reader.setQueryProvider(provider);
return reader ;
}
}
```
## 第四节 数据输出到xml文件
**StaxEvenltemWriter**
代码:
```java
@Configuration
@EnableBatchProcessing
public class XmlItemWriterDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("dbJdbcReader")
private ItemReader extends Customer> dbJdbcReader;
@Autowired
@Qualifier("xmlItemWriter")
private ItemWriter super Customer> xmlItemWriter;
/**
* @Description: xmlItemWriterDemoJob
* @Param: []
* @Return: org.springframework.batch.core.Job
*/
@Bean
public Job xmlItemWriterDemoJob() {
return jobBuilderFactory.get("xmlItemWriterDemoJob")
.start(xmlItemWriterDemoStep())
.listener(new MyJobListener())
.build();
}
/**
* @Description: xmlItemWriterDemoStep
* @Param: []
* @Return: org.springframework.batch.core.Step
*/
@Bean
public Step xmlItemWriterDemoStep() {
return stepBuilderFactory.get("xmlItemWriterDemoStep")
.chunk(5)
.reader(dbJdbcReader)
.writer(xmlItemWriter)
.build();
}
}
```
写入xml文件:
```java
@Configuration
public class XmlItemWriterConfig {
@Bean
public StaxEventItemWriter xmlItemWriter(){
StaxEventItemWriter writer = new StaxEventItemWriter();
XStreamMarshaller marshaller = new XStreamMarshaller();
//告诉marshaller把数据转成什么类型
Map map = new HashMap<>();
map.put("customer",Customer.class);
marshaller.setAliases(map);
writer.setRootTagName("customers");
writer.setMarshaller(marshaller);
String path = "F:\\test/cus.xml";
writer.setResource(new FileSystemResource(path));
try {
writer.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return writer;
}
}
```
## 第五节 数据输出到多个文件
**CompositeltemWriter**
**CassifireCompositeltemWriter** 根据分类写入不同文件
代码:
```java
@Configuration
@EnableBatchProcessing
public class MultiFileItemWriterDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("dbJdbcReader")
private ItemReader extends Customer> dbJdbcReader;
@Autowired
@Qualifier("multiFileItemWriter")
private ItemWriter super Customer> multiFileItemWriter;
/**
* @Description: multiFileItemWriterDemoJob
* @Param: []
* @Return: org.springframework.batch.core.Job
*/
@Bean
public Job multiFileItemWriterDemoJob() {
return jobBuilderFactory.get("multiFileItemWriterDemoJob")
.start(multiFileItemWriterDemoStep())
.listener(new MyJobListener())
.build();
}
/**
* @Description: multiFileItemWriterDemoStep
* @Param: []
* @Return: org.springframework.batch.core.Step
*/
@Bean
public Step multiFileItemWriterDemoStep() {
return stepBuilderFactory.get("multiFileItemWriterDemoStep")
.chunk(5)
.reader(dbJdbcReader)
.writer(multiFileItemWriter)
.build();
}
}
```
只是给多个文件传递数据,不分类:
```java
@Configuration
public class MultiFIleWriterConfig {
/**
* @Description: fileWriter 输出数据到txt文件
* @Param: []
* @Return: org.springframework.batch.item.file.FlatFileItemWriter
*/
@Bean
public FlatFileItemWriter fileWriter(){
//把Customer对象转成字符串输出到文件
FlatFileItemWriter writer = new FlatFileItemWriter();
String path="F:\\test/customer.txt";
writer.setResource(new FileSystemResource(path));
//把Customer对象转成字符串
writer.setLineAggregator(new LineAggregator() {
ObjectMapper mapper = new ObjectMapper();
@Override
public String aggregate(Customer customer) {
String str = null;
try {
str=mapper.writeValueAsString(customer);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return str;
}
});
try {
writer.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return writer;
}
/**
* @Description: xmlWriter 输出数据到xml文件
* @Param: []
* @Return: org.springframework.batch.item.xml.StaxEventItemWriter
*/
@Bean
public StaxEventItemWriter xmlWriter(){
StaxEventItemWriter writer = new StaxEventItemWriter();
XStreamMarshaller marshaller = new XStreamMarshaller();
//告诉marshaller把数据转成什么类型
Map map = new HashMap<>();
map.put("customer",Customer.class);
marshaller.setAliases(map);
writer.setRootTagName("customers");
writer.setMarshaller(marshaller);
String path = "F:\\test/cus.xml";
writer.setResource(new FileSystemResource(path));
try {
writer.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return writer;
}
/**
* @Description: multiFileItemWriter 调用输出到单个文件操作来实现输出数据到多个文件
* @Param: []
* @Return: org.springframework.batch.item.support.CompositeItemWriter
*/
@Bean
public CompositeItemWriter multiFileItemWriter(){
CompositeItemWriter writer = new CompositeItemWriter();
//输出到两个不同的文件中
writer.setDelegates(Arrays.asList(fileWriter(),xmlWriter()));
try {
writer.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return writer;
}
}
```
示例代码:**CassifireCompositeltemWriter** 根据分类写入不同文件
```java
@Configuration
@EnableBatchProcessing
public class MultiFileItemWriterDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("dbJdbcReader")
private ItemReader extends Customer> dbJdbcReader;
@Autowired
@Qualifier("multiFileItemWriter")
private ItemWriter super Customer> multiFileItemWriter;
//分类传数据给文件的ClassifierCompositeItemWriter没有实现ItemStream
@Autowired
@Qualifier("jsonFileWriter")
private ItemStreamWriter extends Customer> jsonFileWriter;
@Autowired
@Qualifier("xmlWriter")
private ItemStreamWriter extends Customer> xmlWriter;
/**
* @Description: multiFileItemWriterDemoJob
* @Param: []
* @Return: org.springframework.batch.core.Job
*/
@Bean
public Job multiFileItemWriterDemoJob2() {
return jobBuilderFactory.get("multiFileItemWriterDemoJob2")
.start(multiFileItemWriterDemoStep())
.listener(new MyJobListener())
.build();
}
/**
* @Description: multiFileItemWriterDemoStep
* @Param: []
* @Return: org.springframework.batch.core.Step
*/
@Bean
public Step multiFileItemWriterDemoStep() {
return stepBuilderFactory.get("multiFileItemWriterDemoStep")
.chunk(5)
.reader(dbJdbcReader)
.writer(multiFileItemWriter)
//ClassifierCompositeItemWriter没有实现ItemStream
.stream(jsonFileWriter)
.stream(xmlWriter)
.build();
}
}
```
分类输出:
```java
@Configuration
public class MultiFIleWriterConfig {
/**
* @Description: fileWriter 输出数据到txt文件
* @Param: []
* @Return: org.springframework.batch.item.file.FlatFileItemWriter
*/
@Bean
public FlatFileItemWriter jsonFileWriter(){
//把Customer对象转成字符串输出到文件
FlatFileItemWriter writer = new FlatFileItemWriter();
String path="F:\\test/customer.txt";
writer.setResource(new FileSystemResource(path));
//把Customer对象转成字符串
writer.setLineAggregator(new LineAggregator() {
ObjectMapper mapper = new ObjectMapper();
@Override
public String aggregate(Customer customer) {
String str = null;
try {
str=mapper.writeValueAsString(customer);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return str;
}
});
try {
writer.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return writer;
}
/**
* @Description: xmlWriter 输出数据到xml文件
* @Param: []
* @Return: org.springframework.batch.item.xml.StaxEventItemWriter
*/
@Bean
public StaxEventItemWriter xmlWriter(){
StaxEventItemWriter writer = new StaxEventItemWriter();
XStreamMarshaller marshaller = new XStreamMarshaller();
//告诉marshaller把数据转成什么类型
Map map = new HashMap<>();
map.put("customer",Customer.class);
marshaller.setAliases(map);
writer.setRootTagName("customers");
writer.setMarshaller(marshaller);
String path = "F:\\test/cus.xml";
writer.setResource(new FileSystemResource(path));
try {
writer.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return writer;
}
/**
* @Description: multiFileItemWriter 调用输出到单个文件操作来实现输出数据到多个文件
* @Param: []
* @Return: org.springframework.batch.item.support.CompositeItemWriter
*/
// @Bean
// public CompositeItemWriter multiFileItemWriter(){
// CompositeItemWriter writer = new CompositeItemWriter();
// //输出到两个不同的文件中
// writer.setDelegates(Arrays.asList(jsonFileWriter(),xmlWriter()));
// try {
// writer.afterPropertiesSet();
// } catch (Exception e) {
// e.printStackTrace();
// }
// return writer;
// }
/**
* @Description: multiFileItemWriter 按照某种条件对数据进行分类存储不同文件
* 注意:ClassifierCompositeItemWriter没有实现ItemStream
* @Param: []
* @Return: org.springframework.batch.item.support.ClassifierCompositeItemWriter
*/
@Bean
public ClassifierCompositeItemWriter multiFileItemWriter(){
ClassifierCompositeItemWriter writer = new ClassifierCompositeItemWriter();
writer.setClassifier(new Classifier>() {
@Override
/**
* @Description: classify 分类,比如按照年龄分成两个文件
* @Param: [customer]
* @Return: org.springframework.batch.item.ItemWriter super com.example.springbatchdemo.pojo.Customer>
*/
public ItemWriter super Customer> classify(Customer customer) {
//按照Customer的id进行分类
ItemWriter writer1 = customer.getId()%2==0?jsonFileWriter():xmlWriter();
return writer1;
}
});
return writer;
}
}
```
## 第六节 ItemProcessor的使用
**ItemProcessor**用于处理业务逻辑,验证,过滤等功能
**CompositeltemProcessor**类,处理多种**ItemProcessor**处理方式
案例:从数据库中读取数据,然对数据进行处理,最后输出到普通文件
代码:
```java
@Configuration
@EnableBatchProcessing
public class ItemProcessorDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("dbJdbcReader")
private ItemReader extends Customer> dbJdbcReader;
@Autowired
@Qualifier("fileItemWriter")
private ItemWriter super Customer> fileItemWriter;
@Autowired
private ItemProcessor firstNameUpperProcessor;
@Autowired
private ItemProcessor idFilterProcessor;
/**
* @Description: itemProcessorDemoJob
* @Param: []
* @Return: org.springframework.batch.core.Job
*/
@Bean
public Job itemProcessorDemoJob() {
return jobBuilderFactory.get("itemProcessorDemoJob")
.start(itemProcessorDemoStep())
.listener(new MyJobListener())
.build();
}
/**
* @Description: itemProcessorDemoStep
* @Param: []
* @Return: org.springframework.batch.core.Step
*/
@Bean
public Step itemProcessorDemoStep() {
return stepBuilderFactory.get("itemProcessorDemoStep")
.chunk(5)
.reader(dbJdbcReader)
//.processor(firstNameUpperProcessor) 只一种处理方式
.processor(process())
.writer(fileItemWriter)
.build();
}
/**
* @Description: process 同时用多种方式处理方式
* @Param: []
* @Return: org.springframework.batch.item.support.CompositeItemProcessor
*/
@Bean
public CompositeItemProcessor process(){
CompositeItemProcessor processor = new CompositeItemProcessor();
List> delegates = new ArrayList<>();
delegates.add(firstNameUpperProcessor);
delegates.add(idFilterProcessor);
processor.setDelegates(delegates);
return processor;
}
}
```
ItemProcessor使用
```java
@Component
public class FirstNameUpperProcessor implements ItemProcessor {
@Override
public Customer process(Customer customer) throws Exception {
Customer customer1 = new Customer();
customer1.setId(customer.getId());
customer1.setLastName(customer.getLastName());
customer1.setFirstName(customer.getFirstName().toUpperCase());
customer1.setBirthday(customer.getBirthday());
return customer1;
}
}
```
```java
@Component
public class IdFilterProcessor implements ItemProcessor {
@Override
public Customer process(Customer customer) throws Exception {
if(customer.getId()%2==0)
return customer;
else
return null;
}
}
```
# 第五章 错误处理
## 第一节 错误处理概述
默认情况下当任务出现异常时,SpringBatch会结束任务,当使相同的参数重启任务时,SpringBatch会去执行未执行的剩余任务。