亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Spring Batch遠程分區(qū)的本地Jar包模式的代碼詳解

 更新時間:2020年09月15日 08:41:53   作者:南瓜慢說  
這篇文章主要介紹了Spring Batch遠程分區(qū)的本地Jar包模式,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

1 前言

Spring Batch遠程分區(qū)對于大量數(shù)據(jù)的處理非常擅長,它的實現(xiàn)有多種方式,如本地Jar包模式、MQ模式、Kubernetes模式。這三種模式的如下:

(1)本地Jar包模式:分區(qū)處理的worker為一個Java進程,從jar包啟動,通過jvm參數(shù)和數(shù)據(jù)庫傳遞參數(shù);官方提供示例代碼。

(2)MQ模式worker是一個常駐進程,ManagerWorker通過消息隊列來傳遞參數(shù);網(wǎng)上有不少相關(guān)示例代碼。

(3)Kubernetes模式workerK8s中的Pod,Manager直接啟動Pod來處理;網(wǎng)上并沒有找到任何示例代碼。

本文將通過代碼來講解第一種模式(本地Jar包模式),其它后續(xù)再介紹。

建議先看下面文章了解一下:

Spring Batch入門:Spring Batch入門教程篇

Spring Batch并行處理介紹:詳解SpringBoot和SpringBatch 使用

2 代碼講解

本文代碼中,ManagerWorker是放在一起的,在同一個項目里,也只會打一個jar包而已;我們通過profile來區(qū)別是manager還是worker,也就是通過Spring Profile實現(xiàn)一份代碼,兩份邏輯。實際上也可以拆成兩份代碼,但放一起更方便測試,而且代碼量不大,就沒有必要了。

2.1 項目準備

2.1.1 數(shù)據(jù)庫

首先我們需要準備一個數(shù)據(jù)庫,因為ManagerWorker都需要同步狀態(tài)到DB上,不能直接使用嵌入式的內(nèi)存數(shù)據(jù)庫了,需要一個外部可共同訪問的數(shù)據(jù)庫。這里我使用的是H2 Database,安裝可參考:把H2數(shù)據(jù)庫從jar包部署到Kubernetes,并解決Ingress不支持TCP的問題。

2.1.2 引入依賴

maven引入依賴如下所示:

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-starter-task</artifactId>
</dependency>
<dependency>
 <groupId>com.h2database</groupId>
 <artifactId>h2</artifactId>
 <scope>runtime</scope>
</dependency>

<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-deployer-local</artifactId>
 <version>2.4.1</version>
</dependency>

<dependency>
 <groupId>org.springframework.batch</groupId>
 <artifactId>spring-batch-integration</artifactId>
</dependency>

spring-cloud-deployer-local用于部署和啟動worker,非常關(guān)鍵;其它就是Spring BatchTask相關(guān)的依賴;以及數(shù)據(jù)庫連接。

2.1.3 主類入口

Springboot的主類入口如下:

@EnableTask
@SpringBootApplication
@EnableBatchProcessing
public class PkslowRemotePartitionJar {
 public static void main(String[] args) {
 SpringApplication.run(PkslowRemotePartitionJar.class, args);
 }
}

Springboot的基礎(chǔ)上,添加了Spring BatchSpring Cloud Task的支持。

2.2 關(guān)鍵代碼編寫

前面的數(shù)據(jù)庫搭建和其它代碼沒有太多可講的,接下來就開始關(guān)鍵代碼的編寫。

2.2.1 分區(qū)管理Partitioner

Partitioner是遠程分區(qū)中的核心bean,它定義了分成多少個區(qū)、怎么分區(qū),要把什么變量傳遞給worker。它會返回一組<分區(qū)名,執(zhí)行上下文>的鍵值對,即返回Map<String, ExecutionContext>。把要傳遞給worker的變量放在ExecutionContext中去,支持多種類型的變量,如String、intlong等。實際上,我們不建議通過ExecutionContext來傳遞太多數(shù)據(jù);可以傳遞一些標識或主鍵,然后worker自己去拿數(shù)據(jù)即可。

具體代碼如下:

private static final int GRID_SIZE = 4;
@Bean
public Partitioner partitioner() {
 return new Partitioner() {
 @Override
 public Map<String, ExecutionContext> partition(int gridSize) {

 Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);

 for (int i = 0; i < GRID_SIZE; i++) {
 ExecutionContext executionContext = new ExecutionContext();
 executionContext.put("partitionNumber", i);
 partitions.put("partition" + i, executionContext);
 }

 return partitions;
 }
 };
}

上面分成4個區(qū),程序會啟動4個worker來處理;給worker傳遞的參數(shù)是partitionNumber

2.2.2 分區(qū)處理器PartitionHandler

PartitionHandler也是核心的bean,它決定了怎么去啟動worker,給它們傳遞什么jvm參數(shù)(跟之前的ExecutionContext傳遞不一樣)。

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) throws Exception {

 Resource resource = this.resourceLoader.getResource(workerResource);

 DeployerPartitionHandler partitionHandler =
 new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep", taskRepository);

 List<String> commandLineArgs = new ArrayList<>(3);
 commandLineArgs.add("--spring.profiles.active=worker");
 commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
 commandLineArgs.add("--spring.batch.initializer.enabled=false");

 partitionHandler
 .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
 partitionHandler
 .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
 partitionHandler.setMaxWorkers(2);
 partitionHandler.setApplicationName("PkslowWorkerJob");

 return partitionHandler;
}

上面代碼中:

resourceworkerjar包地址,表示將啟動該程序;

workerStepworker將要執(zhí)行的step;

commandLineArgs定義了啟動workerjvm參數(shù),如--spring.profiles.active=worker;

environmentmanager的系統(tǒng)環(huán)境變量,可以傳遞給worker,當然也可以選擇不傳遞;

MaxWorkers是最多能同時啟動多少個worker,類似于線程池大??;設(shè)置為2,表示最多同時有2個worker來處理4個分區(qū)。

2.2.3 Manager和Worker的Batch定義

完成了分區(qū)相關(guān)的代碼,剩下的就只是如何定義ManagerWorker的業(yè)務(wù)代碼了。

Manager作為管理者,不用太多業(yè)務(wù)邏輯,代碼如下:

@Bean
@Profile("!worker")
public Job partitionedJob(PartitionHandler partitionHandler) throws Exception {
 Random random = new Random();
 return this.jobBuilderFactory.get("partitionedJob" + random.nextInt())
 .start(step1(partitionHandler))
 .build();
}

@Bean
public Step step1(PartitionHandler partitionHandler) throws Exception {
 return this.stepBuilderFactory.get("step1")
 .partitioner(workerStep().getName(), partitioner())
 .step(workerStep())
 .partitionHandler(partitionHandler)
 .build();
}

Worker主要作用是處理數(shù)據(jù),是我們的業(yè)務(wù)代碼,這里就演示一下如何獲取Manager傳遞過來的partitionNumber

@Bean
public Step workerStep() {
 return this.stepBuilderFactory.get("workerStep")
 .tasklet(workerTasklet(null, null))
 .build();
}

@Bean
@StepScope
public Tasklet workerTasklet(final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) {
 return new Tasklet() {
 @Override
 public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
 Thread.sleep(6000); //增加延時,查看效果,通過jps:在jar情況下會新起java進程
 System.out.println("This tasklet ran partition: " + partitionNumber);
 
 return RepeatStatus.FINISHED;
 }
 };
}

通過表達式@Value("#{stepExecutionContext['partitionNumber']}") 獲取Manager傳遞過來的變量;注意要加注解@StepScope

3 程序運行

因為我們分為ManagerWorker,但都是同一份代碼,所以我們先打包一個jar出來,不然manager無法啟動。配置數(shù)據(jù)庫和Workerjar包地址如下:

spring.datasource.url=jdbc:h2:tcp://localhost:9092/test
spring.datasource.username=pkslow
spring.datasource.password=pkslow
spring.datasource.driver-class-name=org.h2.Driver

pkslow.worker.resource=file://pkslow/target/remote-partitioning-jar-1.0-SNAPSHOT.jar

執(zhí)行程序如下:

可以看到啟動了4次Java程序,還給出日志路徑。

通過jps命令查看,能看到一個Manager進程,還有兩個worker進程:

4 復雜變量傳遞

前面講了Manager可以通過ExecutionContext傳遞變量,如簡單的Stringlong等。但其實它也是可以傳遞復雜的Java對象的,但對應的類需要可序列化,如:

import java.io.Serializable;

public class Person implements Serializable {
 private Integer age;
 private String name;
 private String webSite;
 //getter and setter
}

Manager傳遞:

executionContext.put("person", new Person(0, "pkslow", "www.pkslow.com"));

Worker接收:

@Value("#{stepExecutionContext['person']}") Person person

5 總結(jié)

本文介紹了Spring Batch遠程分區(qū)的本地Jar包模式,只能在一臺機器上運行,所以也是無法真正發(fā)揮出遠程分區(qū)的作用。但它對我們后續(xù)理解更復雜的模式是有很大幫助的;同時,我們也可以使用本地模式進行開發(fā)測試,畢竟它只需要一個數(shù)據(jù)庫就行了,依賴很少。

相關(guān)文章

  • java使用rmi傳輸大文件示例分享

    java使用rmi傳輸大文件示例分享

    由于在rmi中無法傳輸文件流,可以先用FileInputStream將文件讀到一個Byte數(shù)組中,然后把這個Byte數(shù)組作為參數(shù)傳進RMI的方法中,然后在服務(wù)器端將Byte數(shù)組還原為outputStream,這樣就能通過RMI 來傳輸文件了,下面我們來看實例
    2014-01-01
  • mybatis如何返回某列的最大值

    mybatis如何返回某列的最大值

    這篇文章主要介紹了mybatis如何返回某列的最大值操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • Sentinel中實現(xiàn)限流的兩種方法

    Sentinel中實現(xiàn)限流的兩種方法

    本文給大家介紹了Sentinel中實現(xiàn)限流的兩種方法,限流是一種通過控制系統(tǒng)對外提供的資源、服務(wù)或接口的訪問數(shù)量或速率,以保護系統(tǒng)免受過載的一種策略,需要的朋友可以參考下
    2024-02-02
  • tk-mybatis整合springBoot使用兩個數(shù)據(jù)源的方法

    tk-mybatis整合springBoot使用兩個數(shù)據(jù)源的方法

    單純的使用mybaits進行多數(shù)據(jù)配置網(wǎng)上資料很多,但是關(guān)于tk-mybaits多數(shù)據(jù)源配置沒有相關(guān)材料,本文就詳細的介紹一下如何使用,感興趣的可以了解一下
    2021-12-12
  • 詳解java Collections.sort的兩種用法

    詳解java Collections.sort的兩種用法

    這篇文章主要介紹了詳解java Collections.sort的兩種用法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-07-07
  • JVM系列之String.intern的性能解析

    JVM系列之String.intern的性能解析

    這篇文章主要介紹了JVM系列之String.intern的性能解析,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-06-06
  • Eureka源碼閱讀解析Server服務(wù)端啟動流程實例

    Eureka源碼閱讀解析Server服務(wù)端啟動流程實例

    這篇文章主要為大家介紹了Eureka源碼閱讀解析Server服務(wù)端啟動流程實例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-10-10
  • BeanDefinition基礎(chǔ)信息講解

    BeanDefinition基礎(chǔ)信息講解

    今天小編就為大家分享一篇關(guān)于BeanDefinition基礎(chǔ)信息講解,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-02-02
  • 深入了解java8的foreach循環(huán)

    深入了解java8的foreach循環(huán)

    雖然java8出來很久了,但是之前用的一直也不多,最近正好學習了java8。下面給大家分享java8中的foreach循環(huán),感興趣的朋友一起看看吧
    2017-05-05
  • 深入了解Java對象的克隆

    深入了解Java對象的克隆

    這篇文章主要介紹了Java對象的克隆的相關(guān)資料,幫助大家更好的理解和學習Java,感興趣的朋友可以了解下
    2020-08-08

最新評論