用JAVA實(shí)現(xiàn)一套背壓機(jī)制
Reactive Streams:一種支持背壓的異步數(shù)據(jù)流處理標(biāo)準(zhǔn),主流實(shí)現(xiàn)有RxJava和Reactor,Spring WebFlux默認(rèn)集成的是Reactor。
Reactive Streams主要解決背壓(back-pressure)問題。當(dāng)傳入的任務(wù)速率大于系統(tǒng)處理能力時(shí),數(shù)據(jù)處理將會(huì)對(duì)未處理數(shù)據(jù)產(chǎn)生一個(gè)緩沖區(qū)。
背壓依我的理解來說,是指訂閱者能和發(fā)布者交互(通過代碼里面的調(diào)用request和cancel方法交互),可以調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,解決把訂閱者壓垮的問題。關(guān)鍵在于上面例子里面的訂閱關(guān)系Subscription這個(gè)接口,他有request和cancel 2個(gè)方法,用于通知發(fā)布者需要數(shù)據(jù)和通知發(fā)布者不再接受數(shù)據(jù)。
我們重點(diǎn)理解背壓在jdk9里面是如何實(shí)現(xiàn)的。關(guān)鍵在于發(fā)布者Publisher的實(shí)現(xiàn)類SubmissionPublisher的submit方法是block方法。訂閱者會(huì)有一個(gè)緩沖池,默認(rèn)為Flow.defaultBufferSize() = 256。當(dāng)訂閱者的緩沖池滿了之后,發(fā)布者調(diào)用submit方法發(fā)布數(shù)據(jù)就會(huì)被阻塞,發(fā)布者就會(huì)停(慢)下來;訂閱者消費(fèi)了數(shù)據(jù)之后(調(diào)用Subscription.request方法),緩沖池有位置了,submit方法就會(huì)繼續(xù)執(zhí)行下去,就是通過這樣的機(jī)制,實(shí)現(xiàn)了調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,消費(fèi)得快,生成就快,消費(fèi)得慢,發(fā)布者就會(huì)被阻塞,當(dāng)然就會(huì)慢下來了。
單線程版本:
一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者
import lombok.SneakyThrows; import java.util.ArrayList; import java.util.List; import java.util.Random; public class BackpressureExample { public static void main(String[] args) throws InterruptedException { BackpressureSubscriber subscriber = new BackpressureSubscriber(); BackpressurePublisher publisher = new BackpressurePublisher(subscriber); publisher.start(); subscriber.start(); // 為了演示效果,這里讓主線程休眠一段時(shí)間 Thread.sleep(50000); publisher.stop(); subscriber.stop(); } @SneakyThrows public static void processDataLogic(List<Integer> batch) { //模擬任務(wù)執(zhí)行 int r = new Random().nextInt(3000); Thread.sleep(r); System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r); } static class BackpressurePublisher { private final BackpressureSubscriber subscriber; private volatile boolean running; public BackpressurePublisher(BackpressureSubscriber subscriber) { this.subscriber = subscriber; this.running = true; } public void start() { Thread thread = new Thread(() -> { int item = 1; while (running) { List<Integer> batch = new ArrayList<>(); for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + "-----produce data = " + item); batch.add(item++); } while (!subscriber.accept(batch)) { if (!running) { break; } } } }); thread.start(); } public void stop() { running = false; } } static class BackpressureSubscriber { private volatile boolean running; public BackpressureSubscriber() { this.running = true; } public boolean accept(List<Integer> batch) { if (running) { processDataLogic(batch); return true; } else { return false; } } public void start() { // Subscriber 在 JDK 8 中沒有異步處理的能力,因此不需要單獨(dú)開啟線程 } public void stop() { running = false; } } }
多線程版本
一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者
import lombok.SneakyThrows; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class BackpressureExample { public static void main(String[] args) throws InterruptedException { BackpressureSubscriber subscriber = new BackpressureSubscriber(); BackpressurePublisher publisher = new BackpressurePublisher(subscriber); publisher.start(); subscriber.start(); // 為了演示效果,這里讓主線程休眠一段時(shí)間 Thread.sleep(50000); publisher.stop(); subscriber.stop(); } @SneakyThrows public static void processDataLogic(List<Integer> batch) { //模擬任務(wù)執(zhí)行 int r = new Random().nextInt(3000); Thread.sleep(r); System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r); } static class BackpressurePublisher { private final BackpressureSubscriber subscriber; private volatile boolean running; public BackpressurePublisher(BackpressureSubscriber subscriber) { this.subscriber = subscriber; this.running = true; } public void start() { Thread thread = new Thread(() -> { int item = 1; while (running) { List<Integer> batch = new ArrayList<>(); for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + "-----produce data = " + item); batch.add(item++); } while (!subscriber.accept(batch)) { if (!running) { break; } } } }); thread.start(); } public void stop() { running = false; } } static class BackpressureSubscriber { private volatile boolean running; private final ExecutorService executor; private final int workerSize = 2; private final List<Future> futures; public BackpressureSubscriber() { this.running = true; this.executor = Executors.newFixedThreadPool(workerSize); futures = new ArrayList<>(workerSize); } public boolean accept(List<Integer> batch) { if (running) { Future f = executor.submit(() -> processDataLogic(batch)); futures.add(f); waitForTaskDone(futures); return true; } else { return false; } } public void waitForTaskDone(List<Future> futures) { while (futures.size() >= workerSize) { for (Future future : futures) { if (future.isDone()) { // 只要有一個(gè)worker是空閑就重新獲取任務(wù) futures.remove(future); return; } } } } public void start() { // Subscriber 在 JDK 8 中沒有異步處理的能力,因此不需要單獨(dú)開啟線程 } public void stop() { running = false; executor.shutdown(); } } }
到此這篇關(guān)于用JAVA自己實(shí)現(xiàn)一套背壓機(jī)制的文章就介紹到這了,更多相關(guān)java背壓機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談JDK8中的Duration Period和ChronoUnit
在JDK8中,引入了三個(gè)非常有用的時(shí)間相關(guān)的API:Duration,Period和ChronoUnit。他們都是用來對(duì)時(shí)間進(jìn)行統(tǒng)計(jì)的,本文將會(huì)詳細(xì)講解一下這三個(gè)API的使用2021-06-06Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解
這篇文章主要為大家介紹了Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08Debian 7 和 Debian 8 用戶安裝 Java 8的方法
Oracle Java 8 穩(wěn)定版本近期已發(fā)布,有很多新的特征變化。其中,有功能的程序支持通過“Lambda項(xiàng)目 ”,收到了一些安全更新和界面改進(jìn)上的bug修復(fù),使得開發(fā)人員的工作更容易。2014-03-03Java的非阻塞隊(duì)列ConcurrentLinkedQueue解讀
這篇文章主要介紹了Java的非阻塞隊(duì)列ConcurrentLinkedQueue解讀,在并發(fā)編程中,有時(shí)候需要使用線程安全的隊(duì)列,如果要實(shí)現(xiàn)一個(gè)線程安全的隊(duì)列有兩種方式:一種是使用阻塞算法,另一種是使用非阻塞算法,需要的朋友可以參考下2023-12-12java 從int數(shù)組中獲取最大數(shù)的方法
這篇文章主要介紹了java 從int數(shù)組中獲取最大數(shù)的方法,需要的朋友可以參考下2017-02-02