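Spring Batch: How to Write a Custom ItemReader
Custom ItemReader in Spring Batch
Spring Batch supports a wide range of input sources, such as files and databases. Occasionally, though, you need to read from a source that is not supported out of the box, and then you have to implement your own: a custom ItemReader. This article walks through an example of how to write one.
Creating a custom ItemReader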
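Creating a custom ItemReader takes two steps:
- Create a class that implements the ItemReader interface, supplying the type of the returned object, T, as the type parameter.
- Implement the interface's T read() method according to the rule below.
The read() method returns the next object if there is one; otherwise it returns null.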
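The contract above can be sketched without the framework. This is only an illustration: SketchReader is a hypothetical stand-in for Spring Batch's ItemReader, declared locally so the example compiles on its own, and drain() mimics how a caller keeps calling read() until it sees null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for ItemReader<T>; not the Spring Batch interface itself.
interface SketchReader<T> {
    T read(); // next item, or null when the input is exhausted
}

// Reader over an in-memory iterator: hands out one item per call, then null.
class IteratorSketchReader<T> implements SketchReader<T> {
    private final Iterator<T> source;

    IteratorSketchReader(Iterator<T> source) {
        this.source = source;
    }

    @Override
    public T read() {
        return source.hasNext() ? source.next() : null;
    }
}

class ReadContractDemo {
    // Calls read() until it returns null and collects everything it got.
    static <T> List<T> drain(SketchReader<T> reader) {
        List<T> items = new ArrayList<>();
        T item;
        while ((item = reader.read()) != null) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) {
        SketchReader<String> reader =
                new IteratorSketchReader<>(Arrays.asList("tony", "nick", "ian").iterator());
        System.out.println(drain(reader)); // [tony, nick, ian]
        System.out.println(reader.read()); // null: the reader stays exhausted
    }
}
```

Returning null is the only end-of-input signal, which is why a reader must never return null for a legitimate item.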
Below we write a custom ItemReader that returns student information for an online testing course as StuDto objects. To keep things simple, the data is held in memory. StuDto is a simple data transfer object:
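import lombok.Data;

// Simple DTO; Lombok's @Data generates the getters and setters used by the reader.
@Data
public class StuDto {
    private String emailAddress;
    private String name;
    private String purchasedPackage;
}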
Follow these steps to create the ItemReader:
- Create the InMemoryStudentReader class.
- Implement the ItemReader interface with StuDto as the returned object type.
- Add a List<StuDto> studentData field holding the students enrolled in the course.
- Add a nextStudentIndex field holding the index of the next StuDto object.
- Add a private initialize() method that builds the student data and sets the index to 0.
- Add a constructor that calls initialize().
- Implement read() with this rule: if there is a next student, return that StuDto and increment the index; otherwise return null.
The InMemoryStudentReader code:
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.springframework.batch.item.ItemReader;

public class InMemoryStudentReader implements ItemReader<StuDto> {

    private int nextStudentIndex;
    private List<StuDto> studentData;

    InMemoryStudentReader() {
        initialize();
    }

    // Builds the in-memory student list and resets the read position.
    private void initialize() {
        StuDto tony = new StuDto();
        tony.setEmailAddress("tony.tester@gmail.com");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");

        StuDto nick = new StuDto();
        nick.setEmailAddress("nick.newbie@gmail.com");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");

        StuDto ian = new StuDto();
        ian.setEmailAddress("ian.intermediate@gmail.com");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");

        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }

    // Returns the next student, or null once the list is exhausted.
    @Override
    public StuDto read() throws Exception {
        StuDto nextStudent = null;
        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        }
        return nextStudent;
    }
}
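Once the custom ItemReader is written, it has to be configured as a bean so that a Spring Batch job can use it. The configuration follows.
Configuring the ItemReader bean
The configuration class looks like this: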
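import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InMemoryStudentJobConfig {

    // Exposes the custom reader as a bean so a job's step can use it.
    @Bean
    ItemReader<StuDto> inMemoryStudentReader() {
        return new InMemoryStudentReader();
    }
}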
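The class is annotated with @Configuration to mark it as a configuration class. It declares a method annotated with @Bean whose return type is ItemReader<StuDto> and whose body returns a new InMemoryStudentReader.
Summary
This article showed by example how to write a custom ItemReader. The three main points are:
- A custom ItemReader must implement the ItemReader interface.
- When implementing the interface, the returned type is supplied as the type parameter (T).
- The read method returns the next object if there is one, and null otherwise.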
ItemReader in Spring Batch
This part focuses on ItemReader: how to read data from different sources, plus exception handling and the restart mechanism.
JdbcPagingItemReader
Reading data from a database
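import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DBJdbcDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("dbJdbcDemoWriter")
    private ItemWriter<? super Customer> dbJdbcDemoWriter;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Job DBJdbcDemoJob() {
        return jobBuilderFactory.get("DBJdbcDemoJob")
                .start(dbJdbcDemoStep())
                .build();
    }

    @Bean
    public Step dbJdbcDemoStep() {
        return stepBuilderFactory.get("dbJdbcDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .writer(dbJdbcDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100); // JDBC fetch size: rows are fetched from the driver in batches

        // Maps each result-set row to a Customer.
        reader.setRowMapper((rs, rowNum) -> Customer.builder()
                .id(rs.getLong("id"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .birthdate(rs.getString("birthdate"))
                .build());

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from Customer");

        // Paging needs a sort key so that each page has a well-defined order.
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);
        return reader;
    }
}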
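The Job and the ItemWriter are not the focus of this article; they are shown here only to make the example complete, and the same applies to the examples below.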
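import java.util.List;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

@Component("dbJdbcDemoWriter")
public class DbJdbcDemoWriter implements ItemWriter<Customer> {

    // Prints every item of the chunk; a real writer would persist them.
    @Override
    public void write(List<? extends Customer> items) throws Exception {
        for (Customer customer : items) {
            System.out.println(customer);
        }
    }
}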
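The sort key set on the query provider in the reader configuration is what lets the reader fetch one page at a time: each page asks for rows whose key is greater than the last key already seen. A rough sketch of that idea, assuming a single unique ascending key; the class and method names are hypothetical, and the SQL text is illustrative rather than the exact statement MySqlPagingQueryProvider produces.

```java
// Illustrative only: builds keyset-paging queries for a single ascending sort key.
class KeysetPagingSketch {

    // First page: no lower bound yet, just order by the key and cap the row count.
    static String firstPageQuery(String columns, String table, String sortKey, int pageSize) {
        return "SELECT " + columns + " FROM " + table
                + " ORDER BY " + sortKey + " ASC LIMIT " + pageSize;
    }

    // Later pages: continue after the last key value of the previous page (bound to "?").
    static String nextPageQuery(String columns, String table, String sortKey, int pageSize) {
        return "SELECT " + columns + " FROM " + table
                + " WHERE " + sortKey + " > ?"
                + " ORDER BY " + sortKey + " ASC LIMIT " + pageSize;
    }

    public static void main(String[] args) {
        String columns = "id, firstName, lastName, birthdate";
        System.out.println(firstPageQuery(columns, "Customer", "id", 100));
        System.out.println(nextPageQuery(columns, "Customer", "id", 100));
    }
}
```

Because every page is bounded by the key rather than by an offset, the cost of a page does not grow with how far into the table the reader already is.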
FlatFileItemReader
Reading data from a CSV file
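import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class FlatFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Bean
    public Job flatFileDemoJob() {
        return jobBuilderFactory.get("flatFileDemoJob")
                .start(flatFileDemoStep())
                .build();
    }

    @Bean
    public Step flatFileDemoStep() {
        return stepBuilderFactory.get("flatFileDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(flatFileDemoReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> flatFileDemoReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        reader.setLinesToSkip(1); // skip the header row

        // Splits each line on the delimiter and names the resulting fields.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        // Turns the named fields of one line into a Customer.
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);
        return reader;
    }
}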
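What the tokenizer and field-set mapper do together can be pictured with plain Java. The sketch below is an assumption-laden simplification: DelimitedLineSketch is a hypothetical helper, it splits on a bare comma, and it ignores the quoting rules that the real DelimitedLineTokenizer handles.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for "tokenize a line, then look fields up by name".
class DelimitedLineSketch {

    // Splits one CSV line and pairs each value with the column name at the same position.
    static Map<String, String> tokenize(String line, String[] names) {
        String[] values = line.split(",", -1); // -1 keeps trailing empty fields
        if (values.length != names.length) {
            throw new IllegalArgumentException(
                    "expected " + names.length + " fields but found " + values.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], values[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] names = {"id", "firstName", "lastName", "birthdate"};
        Map<String, String> fields = tokenize("1,Tony,Tester,1990-01-01", names);
        long id = Long.parseLong(fields.get("id")); // typed access, like readLong("id")
        System.out.println(id + " " + fields.get("firstName")); // 1 Tony
    }
}
```

Looking fields up by name rather than by position is what keeps the mapping code readable when columns are added or reordered.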
StaxEventItemReader
Reading data from an XML file
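import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class XmlFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("xmlFileDemoWriter")
    private ItemWriter<? super Customer> xmlFileDemoWriter;

    @Bean
    public Job xmlFileDemoJob() {
        return jobBuilderFactory.get("xmlFileDemoJob")
                .start(xmlFileDemoStep())
                .build();
    }

    @Bean
    public Step xmlFileDemoStep() {
        return stepBuilderFactory.get("xmlFileDemoStep")
                .<Customer, Customer>chunk(10)
                .reader(xmlFileDemoReader())
                .writer(xmlFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlFileDemoReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();
        reader.setResource(new ClassPathResource("customer.xml"));
        // Each <customer> element is treated as one item.
        reader.setFragmentRootElementName("customer");

        // Maps the "customer" element name to the Customer class.
        XStreamMarshaller unMarshaller = new XStreamMarshaller();
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("customer", Customer.class);
        unMarshaller.setAliases(aliases);

        reader.setUnmarshaller(unMarshaller);
        return reader;
    }
}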
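The idea of a fragment root element can be shown with the JDK's own StAX API: the parser streams through the document and treats each customer element as one unit. This is only a sketch under assumptions: the XML layout (a customers root with id and firstName children) is invented for illustration because the article does not show customer.xml, and StaxFragmentSketch is a hypothetical helper, not part of Spring Batch.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

class StaxFragmentSketch {

    // Streams through the XML and collects the <firstName> text of every <customer> fragment.
    static List<String> firstNames(String xml) {
        List<String> names = new ArrayList<>();
        try {
            XMLStreamReader in =
                    XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
            boolean inCustomer = false;
            while (in.hasNext()) {
                int event = in.next();
                if (event == XMLStreamConstants.START_ELEMENT) {
                    if ("customer".equals(in.getLocalName())) {
                        inCustomer = true; // a new fragment starts here
                    } else if (inCustomer && "firstName".equals(in.getLocalName())) {
                        names.add(in.getElementText());
                    }
                } else if (event == XMLStreamConstants.END_ELEMENT
                        && "customer".equals(in.getLocalName())) {
                    inCustomer = false; // fragment complete
                }
            }
            in.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException("could not parse XML", e);
        }
        return names;
    }

    public static void main(String[] args) {
        String xml = "<customers>"
                + "<customer><id>1</id><firstName>Tony</firstName></customer>"
                + "<customer><id>2</id><firstName>Nick</firstName></customer>"
                + "</customers>";
        System.out.println(firstNames(xml)); // [Tony, Nick]
    }
}
```

Streaming fragment by fragment means the whole file never has to be held in memory, which is the reason to prefer StAX over a DOM parser for large inputs.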
MultiResourceItemReader
Reading data from multiple files
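import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

@Configuration
public class MultipleFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    // Every classpath file matching file*.csv becomes one input resource.
    @Value("classpath*:/file*.csv")
    private Resource[] inputFiles;

    @Bean
    public Job multipleFileDemoJob() {
        return jobBuilderFactory.get("multipleFileDemoJob")
                .start(multipleFileDemoStep())
                .build();
    }

    @Bean
    public Step multipleFileDemoStep() {
        return stepBuilderFactory.get("multipleFileDemoStep")
                .<Customer, Customer>chunk(50)
                .reader(multipleResourceItemReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    // Reads the input files one after another, delegating each file to flatFileReader().
    private MultiResourceItemReader<Customer> multipleResourceItemReader() {
        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
        reader.setDelegate(flatFileReader());
        reader.setResources(inputFiles);
        return reader;
    }

    @Bean
    public FlatFileItemReader<Customer> flatFileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        // Placeholder: MultiResourceItemReader sets each input file on the delegate in turn.
        reader.setResource(new ClassPathResource("customer.csv"));
        // reader.setLinesToSkip(1);

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);
        return reader;
    }
}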
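The delegation pattern behind reading several files can be sketched in a few lines: the outer reader hands out items from the current source and moves on to the next source when the current one is exhausted. MultiSourceSketchReader is a hypothetical class in which in-memory lists stand in for files; it is not how MultiResourceItemReader is implemented internally.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Reads several sources back to back, as if they were one continuous input.
class MultiSourceSketchReader {
    private final Iterator<List<String>> sources;
    private Iterator<String> current;

    MultiSourceSketchReader(List<List<String>> sources) {
        this.sources = sources.iterator();
    }

    // Returns the next item across all sources, or null when every source is exhausted.
    String read() {
        while (current == null || !current.hasNext()) {
            if (!sources.hasNext()) {
                return null;
            }
            current = sources.next().iterator(); // switch to the next source
        }
        return current.next();
    }

    public static void main(String[] args) {
        MultiSourceSketchReader reader = new MultiSourceSketchReader(Arrays.asList(
                Arrays.asList("file1-line1", "file1-line2"),
                Collections.<String>emptyList(), // an empty file is simply skipped
                Arrays.asList("file2-line1")));
        String line;
        while ((line = reader.read()) != null) {
            System.out.println(line);
        }
    }
}
```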
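Exception handling and the restart mechanism
For chunk-oriented steps, Spring Batch provides tooling for managing state. State within a step is managed through the ItemStream interface, which gives developers access to the component that holds the state. That component is the ExecutionContext, which is essentially a map of key-value pairs storing the state of a particular step. The ExecutionContext is what makes restarting a step possible, because its state is persisted in the JobRepository.
When an error occurs during execution, the last committed state has already been saved to the JobRepository. The next time the job runs, that state is used to populate the ExecutionContext, and the step can continue from where it left off.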
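The ItemStream interface has three methods:
- open() is called at the start of the step and receives the ExecutionContext, populated with any values previously stored in the database;
- update() is called at the end of each chunk (that is, each transaction) to update the ExecutionContext;
- close() is called once all chunks have been processed.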
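The open/update cycle can be simulated with a plain map playing the role of the ExecutionContext. This is a sketch under stated assumptions, not framework code: CheckpointSketch and its run() method are hypothetical, the map stands in for the persisted context, and a "bad" item stands in for a record that makes the step fail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CheckpointSketch {

    // Processes items chunk by chunk and checkpoints the position after each chunk.
    static List<String> run(List<String> items, int chunkSize, Map<String, Integer> context) {
        // "open": restore the position from the context if a previous run left one.
        int position = context.getOrDefault("curLine", 0);
        List<String> written = new ArrayList<>();
        while (position < items.size()) {
            int end = Math.min(position + chunkSize, items.size());
            List<String> chunk = new ArrayList<>();
            for (int i = position; i < end; i++) {
                if ("bad".equals(items.get(i))) {
                    // The failing chunk is abandoned; the context keeps the last checkpoint.
                    throw new IllegalStateException("bad item at index " + i);
                }
                chunk.add(items.get(i));
            }
            written.addAll(chunk);
            position = end;
            context.put("curLine", position); // "update": checkpoint after the chunk succeeds
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> items = new ArrayList<>(Arrays.asList("a", "b", "bad", "d"));
        Map<String, Integer> context = new HashMap<>();
        try {
            run(items, 2, context);
        } catch (IllegalStateException e) {
            System.out.println("failed, checkpoint = " + context.get("curLine")); // 2
        }
        items.set(2, "c"); // fix the bad record, then restart with the same context
        System.out.println(run(items, 2, context)); // [c, d]
    }
}
```

The second run never touches the first two items again, because the checkpoint stored after the first chunk tells it where to resume.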
Let's build an example.
Prepare a CSV file in which record 33 carries an invalid first name; when the reader reaches that record, it throws an exception and the job terminates.
ItemReader test code
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {

    private static final String CUR_LINE_KEY = "curLine";

    // Number of records read so far; persisted so a restart knows where to resume.
    private long curLine = 0L;
    private final FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();

    public RestartDemoReader() {
        reader.setResource(new ClassPathResource("restartDemo.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
    }

    @Override
    public Customer read() throws Exception {
        Customer customer = reader.read();
        if (customer == null) {
            return null; // end of file
        }
        curLine++;
        // Throw explicitly when wrongName is found, terminating the job.
        if ("wrongName".equals(customer.getFirstName())) {
            throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
        }
        return customer;
    }

    /**
     * Detects a restart: if the context already holds curLine, resume after that many records.
     * The delegate is opened once here rather than on every read() call.
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CUR_LINE_KEY)) {
            this.curLine = executionContext.getLong(CUR_LINE_KEY);
            System.out.println("Start reading from line: " + (this.curLine + 1));
        } else {
            this.curLine = 0L;
            executionContext.putLong(CUR_LINE_KEY, this.curLine);
        }
        reader.setLinesToSkip((int) this.curLine);
        // The delegate keeps no state of its own; curLine is the only restart marker.
        reader.open(new ExecutionContext());
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("update curLine: " + this.curLine);
        executionContext.putLong(CUR_LINE_KEY, this.curLine);
    }

    @Override
    public void close() throws ItemStreamException {
        reader.close();
    }
}
Job configuration
Records are read in chunks of 10.
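import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RestartDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;

    @Bean
    public Job restartDemoJob() {
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build();
    }

    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer, Customer>chunk(10) // commit and checkpoint every 10 records
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}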
On the first run the program throws an exception at record 33, and the persisted curLine value is 30.
Querying the BATCH_STEP_EXECUTION_CONTEXT table at this point shows that curLine has been persisted as a key-value pair. (Records are processed in chunks of 10, so when record 33 fails, the last committed value of curLine is 30.)
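Next, correct the wrongName record and run the program again.
The program calls the open method, which checks whether the step's persisted map contains curLine. If it does, this is a restart: the reader loads curLine and continues from the chunk that failed.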
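Why the persisted value is 30 rather than 32 or 33 comes down to integer arithmetic on the chunk size. A small worked example, with ChunkCheckpointMath as a hypothetical helper and records numbered from 1:

```java
class ChunkCheckpointMath {

    // Number of records already committed when the given record (1-based) fails.
    static int lastCommitted(int failingRecord, int chunkSize) {
        // Only whole chunks before the failing record were committed.
        return ((failingRecord - 1) / chunkSize) * chunkSize;
    }

    public static void main(String[] args) {
        System.out.println(lastCommitted(33, 10)); // 30: chunks 1-10, 11-20 and 21-30 committed
        System.out.println(lastCommitted(30, 10)); // 20: record 30 belongs to the chunk 21-30
    }
}
```

Records 31 and 32 were read before the failure but belong to the rolled-back chunk, so the restart reads them again.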