詳解批處理框架之Spring Batch
一、Spring Batch的概念知識(shí)
1.1、分層架構(gòu)
Spring Batch
的分層架構(gòu)圖如下:
可以看到它分為三層,分別是:
Application
應(yīng)用層:包含了所有任務(wù)batch jobs
和開(kāi)發(fā)人員自定義的代碼,主要是根據(jù)項(xiàng)目需要開(kāi)發(fā)的業(yè)務(wù)流程等。Batch Core
核心層:包含啟動(dòng)和管理任務(wù)的運(yùn)行環(huán)境類(lèi),如JobLauncher
等。Batch Infrastructure
基礎(chǔ)層:上面兩層是建立在基礎(chǔ)層之上的,包含基礎(chǔ)的讀入reader
和寫(xiě)出writer
、重試框架等。
1.2、關(guān)鍵概念
理解下圖所涉及的概念至關(guān)重要,不然很難進(jìn)行后續(xù)開(kāi)發(fā)和問(wèn)題分析。
1.2.1、JobRepository
專(zhuān)門(mén)負(fù)責(zé)與數(shù)據(jù)庫(kù)打交道,對(duì)整個(gè)批處理的新增、更新、執(zhí)行進(jìn)行記錄。所以Spring Batch
是需要依賴(lài)數(shù)據(jù)庫(kù)來(lái)管理的。
1.2.2、任務(wù)啟動(dòng)器JobLauncher
負(fù)責(zé)啟動(dòng)任務(wù)Job
。
1.2.3、任務(wù)Job
Job
是封裝整個(gè)批處理過(guò)程的單位,跑一個(gè)批處理任務(wù),就是跑一個(gè)Job
所定義的內(nèi)容。
上圖介紹了Job
的一些相關(guān)概念:
Job
:封裝處理實(shí)體,定義過(guò)程邏輯。JobInstance
:Job
的運(yùn)行實(shí)例,不同的實(shí)例,參數(shù)不同,所以定義好一個(gè)Job
后可以通過(guò)不同參數(shù)運(yùn)行多次。JobParameters
:與JobInstance
相關(guān)聯(lián)的參數(shù)。JobExecution
:代表Job
的一次實(shí)際執(zhí)行,可能成功、可能失敗。
所以,開(kāi)發(fā)人員要做的事情,就是定義Job
。
1.2.4、步驟Step
Step
是對(duì)Job
某個(gè)過(guò)程的封裝,一個(gè)Job
可以包含一個(gè)或多個(gè)Step
,一步步的Step
按特定邏輯執(zhí)行,才代表Job
執(zhí)行完成。
通過(guò)定義Step
來(lái)組裝Job
可以更靈活地實(shí)現(xiàn)復(fù)雜的業(yè)務(wù)邏輯。
1.2.5、輸入——處理——輸出
所以,定義一個(gè)Job
關(guān)鍵是定義好一個(gè)或多個(gè)Step
,然后把它們組裝好即可。而定義Step
有多種方法,但有一種常用的模型就是輸入——處理——輸出
,即Item Reader
、Item Processor
和Item Writer
。比如通過(guò)Item Reader
從文件輸入數(shù)據(jù),然后通過(guò)Item Processor
進(jìn)行業(yè)務(wù)處理和數(shù)據(jù)轉(zhuǎn)換,最后通過(guò)Item Writer
寫(xiě)到數(shù)據(jù)庫(kù)中去。
Spring Batch
為我們提供了許多開(kāi)箱即用的Reader
和Writer
,非常方便。
二、代碼實(shí)例
理解了基本概念后,就直接通過(guò)代碼來(lái)感受一下吧。整個(gè)項(xiàng)目的功能是從多個(gè)csv
文件中讀數(shù)據(jù),處理后輸出到一個(gè)csv
文件。
2.1、基本框架
添加依賴(lài):
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency>
需要添加Spring Batch
的依賴(lài),同時(shí)使用H2
作為內(nèi)存數(shù)據(jù)庫(kù)比較方便,實(shí)際生產(chǎn)肯定是要使用外部的數(shù)據(jù)庫(kù),如Oracle
、PostgreSQL
。
入口主類(lèi):
@SpringBootApplication @EnableBatchProcessing public class PkslowBatchJobMain { public static void main(String[] args) { SpringApplication.run(PkslowBatchJobMain.class, args); } }
也很簡(jiǎn)單,只是在Springboot
的基礎(chǔ)上添加注解@EnableBatchProcessing
。
領(lǐng)域?qū)嶓w類(lèi)Employee
:
package com.pkslow.batch.entity; public class Employee { String id; String firstName; String lastName; }
對(duì)應(yīng)的csv
文件內(nèi)容如下:
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
2.2、輸入——處理——輸出
2.2.1、讀取ItemReader
因?yàn)橛卸鄠€(gè)輸入文件,所以定義如下:
@Value("input/inputData*.csv") private Resource[] inputResources; @Bean public MultiResourceItemReader<Employee> multiResourceItemReader() { MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>(); resourceItemReader.setResources(inputResources); resourceItemReader.setDelegate(reader()); return resourceItemReader; } @Bean public FlatFileItemReader<Employee> reader() { FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>(); //跳過(guò)csv文件第一行,為表頭 reader.setLinesToSkip(1); reader.setLineMapper(new DefaultLineMapper() { { setLineTokenizer(new DelimitedLineTokenizer() { { //字段名 setNames(new String[] { "id", "firstName", "lastName" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() { { //轉(zhuǎn)換化后的目標(biāo)類(lèi) setTargetType(Employee.class); } }); } }); return reader; }
這里使用了FlatFileItemReader
,方便我們從文件讀取數(shù)據(jù)。
2.2.2、處理ItemProcessor
為了簡(jiǎn)單演示,處理很簡(jiǎn)單,就是把最后一列轉(zhuǎn)為大寫(xiě):
public ItemProcessor<Employee, Employee> itemProcessor() { return employee -> { employee.setLastName(employee.getLastName().toUpperCase()); return employee; }; }
2.2.3、輸出ItremWriter
比較簡(jiǎn)單,代碼及注釋如下:
private Resource outputResource = new FileSystemResource("output/outputData.csv"); @Bean public FlatFileItemWriter<Employee> writer() { FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>(); writer.setResource(outputResource); //是否為追加模式 writer.setAppendAllowed(true); writer.setLineAggregator(new DelimitedLineAggregator<Employee>() { { //設(shè)置分割符 setDelimiter(","); setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() { { //設(shè)置字段 setNames(new String[] { "id", "firstName", "lastName" }); } }); } }); return writer; }
2.3、Step
有了Reader-Processor-Writer
后,就可以定義Step
了:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .processor(itemProcessor()) .writer(writer()) .build(); }
這里有一個(gè)chunk
的設(shè)置,值為5
,意思是5條記錄后再提交輸出,可以根據(jù)自己需求定義。
2.4、Job
完成了Step
的編碼,定義Job
就容易了:
@Bean public Job pkslowCsvJob() { return jobBuilderFactory .get("pkslowCsvJob") .incrementer(new RunIdIncrementer()) .start(csvStep()) .build(); }
2.5、運(yùn)行
完成以上編碼后,執(zhí)行程序,結(jié)果如下:
成功讀取數(shù)據(jù),并將最后字段轉(zhuǎn)為大寫(xiě),并輸出到outputData.csv
文件。
三、監(jiān)聽(tīng)Listener
可以通過(guò)Listener
接口對(duì)特定事件進(jìn)行監(jiān)聽(tīng),以實(shí)現(xiàn)更多業(yè)務(wù)功能。比如如果處理失敗,就記錄一條失敗日志;處理完成,就通知下游拿數(shù)據(jù)等。
我們分別對(duì)Read
、Process
和Write
事件進(jìn)行監(jiān)聽(tīng),對(duì)應(yīng)分別要實(shí)現(xiàn)ItemReadListener
接口、ItemProcessListener
接口和ItemWriteListener
接口。因?yàn)榇a比較簡(jiǎn)單,就是打印一下日志,這里只貼出ItemWriteListener
的實(shí)現(xiàn)代碼:
public class PkslowWriteListener implements ItemWriteListener<Employee> { private static final Log logger = LogFactory.getLog(PkslowWriteListener.class); @Override public void beforeWrite(List<? extends Employee> list) { logger.info("beforeWrite: " + list); } @Override public void afterWrite(List<? extends Employee> list) { logger.info("afterWrite: " + list); } @Override public void onWriteError(Exception e, List<? extends Employee> list) { logger.info("onWriteError: " + list); } }
把實(shí)現(xiàn)的監(jiān)聽(tīng)器listener
整合到Step
中去:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .listener(new PkslowReadListener()) .processor(itemProcessor()) .listener(new PkslowProcessListener()) .writer(writer()) .listener(new PkslowWriteListener()) .build(); }
執(zhí)行后看一下日志:
這里就能明顯看到之前設(shè)置的chunk
的作用了。Writer
每次是處理5條記錄,如果一條輸出一次,會(huì)對(duì)IO
造成壓力。
以上就是詳解Spring Batch入門(mén)之優(yōu)秀的批處理框架的詳細(xì)內(nèi)容,更多關(guān)于Spring Batch 批處理框架的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- 源碼解析springbatch的job運(yùn)行機(jī)制
- spring中jdbcTemplate.batchUpdate的幾種使用情況
- spring?batch線(xiàn)上異常定位記錄
- Spring Batch 如何自定義ItemReader
- 手把手教你搭建第一個(gè)Spring Batch項(xiàng)目的步驟
- 基于Spring Batch向Elasticsearch批量導(dǎo)入數(shù)據(jù)示例
- Spring Batch入門(mén)教程篇
- Spring Batch讀取txt文件并寫(xiě)入數(shù)據(jù)庫(kù)的方法教程
- 使用Spring?Batch實(shí)現(xiàn)大數(shù)據(jù)處理的操作方法
相關(guān)文章
MyBatis中一對(duì)多的xml配置方式(嵌套查詢(xún)/嵌套結(jié)果)
這篇文章主要介紹了MyBatis中一對(duì)多的xml配置方式(嵌套查詢(xún)/嵌套結(jié)果),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03Maven將Jar包打入本地倉(cāng)庫(kù)的實(shí)現(xiàn)
項(xiàng)目需要用到一個(gè)Jar包,不能從遠(yuǎn)程倉(cāng)庫(kù)拉取,只有一個(gè)Jar包,所以需要將Jar包打入到本地倉(cāng)庫(kù)才能引入項(xiàng)目,本文主要介紹了Maven將Jar包打入本地倉(cāng)庫(kù)的實(shí)現(xiàn),感興趣的可以了解一下2023-12-12SpringBoot全局異常處理之多個(gè)處理器匹配順序(最新推薦)
這篇文章主要介紹了SpringBoot全局異常處理之多個(gè)處理器匹配順序(最新推薦),調(diào)試源碼可見(jiàn)匹配順序?yàn)椋寒惓蛹?jí)高者優(yōu)先,再清楚點(diǎn),子類(lèi)異常處理器優(yōu)先,本文給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧2024-03-03Java集合的組內(nèi)平均值的計(jì)算方法總結(jié)
在Java中,經(jīng)常需要對(duì)集合進(jìn)行各種操作,其中之一就是計(jì)算集合的組內(nèi)平均值,本文將介紹如何使用Java集合來(lái)計(jì)算組內(nèi)平均值,并提供一些示例代碼和實(shí)用技巧2024-08-08解決IntelliJ IDEA創(chuàng)建spring boot無(wú)法連接http://start.spring.io/問(wèn)題
這篇文章主要介紹了解決IntelliJ IDEA創(chuàng)建spring boot無(wú)法連接http://start.spring.io/問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08全面解釋java中StringBuilder、StringBuffer、String類(lèi)之間的關(guān)系
String的值是不可變的,這就導(dǎo)致每次對(duì)String的操作都會(huì)生成新的String對(duì)象,不僅效率低下,而且大量浪費(fèi)有限的內(nèi)存空間,StringBuffer是可變類(lèi),和線(xiàn)程安全的字符串操作類(lèi),任何對(duì)它指向的字符串的操作都不會(huì)產(chǎn)生新的對(duì)象,StringBuffer和StringBuilder類(lèi)功能基本相似2013-01-01Win10系統(tǒng)下配置Java環(huán)境變量
今天給大家?guī)?lái)的是關(guān)于Java的相關(guān)知識(shí),文章圍繞著Win10系統(tǒng)下配置Java環(huán)境變量展開(kāi),文中有非常詳細(xì)的介紹及圖文示例,需要的朋友可以參考下2021-06-06