Springboot集成SpringBatch批處理組件
1.Spring Batch 簡(jiǎn)介
Spring Batch 是 Spring 生態(tài)系統(tǒng)中的??企業(yè)級(jí)批處理框架??,專門設(shè)計(jì)用于處理大規(guī)模數(shù)據(jù)作業(yè)。它提供了批處理應(yīng)用所需的核心功能,解決了傳統(tǒng)批處理應(yīng)用開發(fā)中的重復(fù)性問(wèn)題,使開發(fā)人員能夠?qū)W⒂跇I(yè)務(wù)邏輯而非基礎(chǔ)設(shè)施。
核心價(jià)值與定位??
??問(wèn)題解決??:自動(dòng)化處理??周期性的、數(shù)據(jù)密集型的??任務(wù)(如報(bào)表生成、數(shù)據(jù)遷移、對(duì)賬結(jié)算)
??典型場(chǎng)景??:
每月財(cái)務(wù)報(bào)表生成
銀行日終批量交易處理
電商平臺(tái)每日用戶行為分析
百萬(wàn)級(jí)數(shù)據(jù)遷移(如舊系統(tǒng)到新系統(tǒng))
2.批處理工具架構(gòu)和示例
項(xiàng) | 接口 |
---|---|
讀 | ItemReader |
處理 | ItemProcessor |
寫 | ItemWriter |
項(xiàng)目結(jié)構(gòu)
依賴包
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>SpringBatcher</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>21</maven.compiler.source> <maven.compiler.target>21</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring-boot.version>3.5.3</spring-boot.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.38</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>easyexcel</artifactId> <version>4.0.3</version> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> <!-- 通常只需運(yùn)行時(shí)依賴 --> </dependency> </dependencies> </project>
啟動(dòng)類
package org.example; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @EnableBatchProcessing @SpringBootApplication public class BatchApp { public static void main(String[] args) { SpringApplication.run(BatchApp.class, args); } }
2.1 批處理任務(wù)持久化控制
示例代碼基于 H2 存儲(chǔ)
package org.example.config; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import javax.sql.DataSource; /** * @Author zhx && moon * @Since 21 * @Date 2025-06-20 PM 4:21 */ @Configuration public class BatchJobConfig { @Bean public JobRepository jobRepository(DataSource dataSource) throws Exception { JobRepositoryFactoryBean bean = new JobRepositoryFactoryBean(); bean.setDataSource(dataSource); bean.setDatabaseType("H2"); bean.setTransactionManager(new DataSourceTransactionManager(dataSource)); bean.afterPropertiesSet(); return bean.getObject(); } }
2.2 實(shí)現(xiàn)一個(gè)讀取器
以 Excel 文件讀取為例
package org.example.job.common; import com.alibaba.excel.EasyExcel; import org.springframework.batch.item.ItemReader; import org.springframework.beans.factory.InitializingBean; import java.io.File; import java.util.List; /** * @Author zhx && moon * @Since 21 * @Date 2025-06-24 PM 2:16 */ public class EasyExcelItemReader<T> implements ItemReader<T>, InitializingBean { private final Class<T> clazz; private final String filePath; private List<T> cacheList; private int index = 0; public EasyExcelItemReader(Class<T> clazz, String filePath) { this.clazz = clazz; this.filePath = filePath; } @Override public void afterPropertiesSet() { try { // 一次性讀取Excel所有數(shù)據(jù)(適用于中小文件) cacheList = EasyExcel.read(new File(filePath)) .head(clazz) .sheet() .headRowNumber(1) // 跳過(guò)標(biāo)題行 .doReadSync(); } catch (Exception e) { throw new RuntimeException("read excel failed ", e); } } @Override public T read() { if (index < cacheList.size()) { return cacheList.get(index++); } // 重置讀取的位置 index = 0; return null; } }
2.3 定義批處理JOB
package org.example.job; import org.example.entity.User; import org.example.job.common.EasyExcelItemReader; import org.springframework.batch.core.*; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.springframework.transaction.PlatformTransactionManager; /** * @Author zhx && moon * @Since 21 * @Date 2025-06-24 PM 2:08 */ @Component public class SVCJob { /** * Excel 讀取 * @return */ @Bean("easyExcelItemReader") public EasyExcelItemReader<User> easyExcelItemReader() { return new EasyExcelItemReader<>(User.class, "C:\\Users\\Administrator\\Desktop\\Test.xlsx"); } /** * 數(shù)據(jù)處理器 對(duì)讀取的數(shù)據(jù)進(jìn)行加工 * @return */ @Bean("getNameProcessors") public ItemProcessor<User, String> getNameProcessors() { return item -> { return item.getName(); }; } /** * 配置寫入器(保持不變) * @return */ @Bean("nameWriter") public ItemWriter<String> nameWriter() { return items -> { for (String item : items) { System.out.println("User Name: " + item); } }; } /** * 配置批處理步驟(使用新版API) * @param jobRepository * @param transactionManager * @param reader * @param processor * @param writer * @return */ @Bean("easyExcelStep") public Step easyExcelStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, @Qualifier("easyExcelItemReader") EasyExcelItemReader<User> reader, @Qualifier("getNameProcessors") ItemProcessor<User, String> processor, @Qualifier("nameWriter") ItemWriter<String> writer) { return new StepBuilder("easyExcelStep", jobRepository) .<User, String>chunk(100, transactionManager) .reader(reader) .processor(processor) .writer(writer) .faultTolerant() .skipLimit(1) .skip(IllegalArgumentException.class) .listener(new StepExecutionListener() { @Override public void beforeStep(StepExecution stepExecution) { System.out.println("start to processor data ..."); } }) .build(); } /** * 配置批處理作業(yè) * @param jobRepository * @param importStep * @return */ @Bean("easyExcelImportJobs") public Job customerImportJob(JobRepository jobRepository, @Qualifier("easyExcelStep") Step importStep) { return new JobBuilder("easyExcelImportJobs", jobRepository) .incrementer(new RunIdIncrementer()) .start(importStep) .listener(new JobExecutionListener() { @Override public void afterJob(JobExecution jobExecution) { System.out.println("Job Finished!State: " + jobExecution.getStatus()); } }) .build(); } }
2.4數(shù)據(jù)實(shí)體
package org.example.entity; import com.alibaba.excel.annotation.ExcelProperty; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @Author zhx && moon * @Since 21 * @Date 2025-06-24 PM 2:20 */ @Data @NoArgsConstructor @AllArgsConstructor public class User { @ExcelProperty("姓名") private String name; @ExcelProperty("編號(hào)") private String employeeId; @ExcelProperty("年齡") private Integer age; }
2.5 接口類
package org.example.controller; import jakarta.annotation.Resource; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Author zhx && moon * @Since 21 * @Date 2025-06-23 PM 4:40 */ @RestController @RequestMapping("/job") public class JobManage { @Autowired private JobLauncher jobLauncher; @Resource(name = "easyExcelImportJobs") Job job; @GetMapping("/start") public void start(){ try { JobParameters params = new JobParametersBuilder() .addLong("uniqueId", System.nanoTime()) .toJobParameters(); jobLauncher.run(job, params); } catch (Exception e) { throw new RuntimeException(e); } } }
2.6 H2 配置
spring: datasource: url: jdbc:h2:file:Z:/IdeaProjects/SpringBatcher/SpringBatcher/springbatchdb #jdbc:h2:tcp://localhost/mem:springbatchdb;DB_CLOSE_DELAY=-1 #jdbc:h2:mem:springbatchdb driver-class-name: org.h2.Driver username: sa password: sa h2: console: enabled: true path: /h2/db-console settings: web-allow-others: true batch: jdbc: initialize-schema: always
2.7 H2 數(shù)據(jù)庫(kù)腳本
-- Autogenerated: do not edit this file CREATE TABLE BATCH_JOB_INSTANCE ( JOB_INSTANCE_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY , VERSION BIGINT , JOB_NAME VARCHAR(100) NOT NULL, JOB_KEY VARCHAR(32) NOT NULL, constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY) ) ; CREATE TABLE BATCH_JOB_EXECUTION ( JOB_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY , VERSION BIGINT , JOB_INSTANCE_ID BIGINT NOT NULL, CREATE_TIME TIMESTAMP(9) NOT NULL, START_TIME TIMESTAMP(9) DEFAULT NULL , END_TIME TIMESTAMP(9) DEFAULT NULL , STATUS VARCHAR(10) , EXIT_CODE VARCHAR(2500) , EXIT_MESSAGE VARCHAR(2500) , LAST_UPDATED TIMESTAMP(9), constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID) references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID) ) ; CREATE TABLE BATCH_JOB_EXECUTION_PARAMS ( JOB_EXECUTION_ID BIGINT NOT NULL , PARAMETER_NAME VARCHAR(100) NOT NULL , PARAMETER_TYPE VARCHAR(100) NOT NULL , PARAMETER_VALUE VARCHAR(2500) , IDENTIFYING CHAR(1) NOT NULL , constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ; CREATE TABLE BATCH_STEP_EXECUTION ( STEP_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY , VERSION BIGINT NOT NULL, STEP_NAME VARCHAR(100) NOT NULL, JOB_EXECUTION_ID BIGINT NOT NULL, CREATE_TIME TIMESTAMP(9) NOT NULL, START_TIME TIMESTAMP(9) DEFAULT NULL , END_TIME TIMESTAMP(9) DEFAULT NULL , STATUS VARCHAR(10) , COMMIT_COUNT BIGINT , READ_COUNT BIGINT , FILTER_COUNT BIGINT , WRITE_COUNT BIGINT , READ_SKIP_COUNT BIGINT , WRITE_SKIP_COUNT BIGINT , PROCESS_SKIP_COUNT BIGINT , ROLLBACK_COUNT BIGINT , EXIT_CODE VARCHAR(2500) , EXIT_MESSAGE VARCHAR(2500) , LAST_UPDATED TIMESTAMP(9), constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ; CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT ( STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, SHORT_CONTEXT VARCHAR(2500) NOT NULL, SERIALIZED_CONTEXT LONGVARCHAR , constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID) references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID) ) ; CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT ( JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, SHORT_CONTEXT VARCHAR(2500) NOT NULL, SERIALIZED_CONTEXT LONGVARCHAR , constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ; CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ; CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ; CREATE SEQUENCE BATCH_JOB_SEQ;
3.測(cè)試
啟動(dòng)服務(wù)
測(cè)試 H2 連接
測(cè)試數(shù)據(jù)
觸發(fā) JOB
JOB 執(zhí)行記錄
到此這篇關(guān)于Springboot集成SpringBatch批處理組件的文章就介紹到這了,更多相關(guān)SpringBatch批處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot啟動(dòng)mongoDB報(bào)錯(cuò)之禁用mongoDB自動(dòng)配置問(wèn)題
這篇文章主要介紹了springboot啟動(dòng)mongoDB報(bào)錯(cuò)之禁用mongoDB自動(dòng)配置問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05SpringBoot實(shí)現(xiàn)監(jiān)控Actuator,關(guān)閉redis監(jiān)測(cè)
這篇文章主要介紹了SpringBoot實(shí)現(xiàn)監(jiān)控Actuator,關(guān)閉redis監(jiān)測(cè),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11兼容Spring Boot 1.x和2.x配置類參數(shù)綁定的工具類SpringBootBindUtil
今天小編就為大家分享一篇關(guān)于兼容Spring Boot 1.x和2.x配置類參數(shù)綁定的工具類SpringBootBindUtil,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-12-12SpringMVC的處理器適配器-HandlerAdapter的用法及說(shuō)明
這篇文章主要介紹了SpringMVC的處理器適配器-HandlerAdapter的用法及說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12RocketMQ4.5.X 實(shí)現(xiàn)修改生產(chǎn)者消費(fèi)者日志保存路徑
這篇文章主要介紹了RocketMQ4.5.X 實(shí)現(xiàn)修改生產(chǎn)者消費(fèi)者日志保存路徑方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07java重寫(@Override)介紹以及舉例說(shuō)明
這篇文章主要給大家介紹了關(guān)于java重寫(@Override)介紹以及舉例說(shuō)明的相關(guān)資料,在Java中@Override注解用于表示方法重寫(覆蓋)了父類的方法,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-01-01