亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

用JAVA實(shí)現(xiàn)一套背壓機(jī)制

 更新時(shí)間:2023年06月30日 08:47:27   作者:hwp0710  
背壓依我的理解來說,是指訂閱者能和發(fā)布者交互,可以調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,解決把訂閱者壓垮的問題,這篇文章主要介紹了用JAVA自己實(shí)現(xiàn)一套背壓機(jī)制,需要的朋友可以參考下

Reactive Streams:一種支持背壓的異步數(shù)據(jù)流處理標(biāo)準(zhǔn),主流實(shí)現(xiàn)有RxJava和Reactor,Spring WebFlux默認(rèn)集成的是Reactor。

Reactive Streams主要解決背壓(back-pressure)問題。當(dāng)傳入的任務(wù)速率大于系統(tǒng)處理能力時(shí),數(shù)據(jù)處理將會(huì)對(duì)未處理數(shù)據(jù)產(chǎn)生一個(gè)緩沖區(qū)。

背壓依我的理解來說,是指訂閱者能和發(fā)布者交互(通過代碼里面的調(diào)用request和cancel方法交互),可以調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,解決把訂閱者壓垮的問題。關(guān)鍵在于上面例子里面的訂閱關(guān)系Subscription這個(gè)接口,他有request和cancel 2個(gè)方法,用于通知發(fā)布者需要數(shù)據(jù)和通知發(fā)布者不再接受數(shù)據(jù)。

我們重點(diǎn)理解背壓在jdk9里面是如何實(shí)現(xiàn)的。關(guān)鍵在于發(fā)布者Publisher的實(shí)現(xiàn)類SubmissionPublisher的submit方法是block方法。訂閱者會(huì)有一個(gè)緩沖池,默認(rèn)為Flow.defaultBufferSize() = 256。當(dāng)訂閱者的緩沖池滿了之后,發(fā)布者調(diào)用submit方法發(fā)布數(shù)據(jù)就會(huì)被阻塞,發(fā)布者就會(huì)停(慢)下來;訂閱者消費(fèi)了數(shù)據(jù)之后(調(diào)用Subscription.request方法),緩沖池有位置了,submit方法就會(huì)繼續(xù)執(zhí)行下去,就是通過這樣的機(jī)制,實(shí)現(xiàn)了調(diào)節(jié)發(fā)布者發(fā)布數(shù)據(jù)的速率,消費(fèi)得快,生成就快,消費(fèi)得慢,發(fā)布者就會(huì)被阻塞,當(dāng)然就會(huì)慢下來了。

單線程版本:

一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者

import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class BackpressureExample {
    public static void main(String[] args) throws InterruptedException {
        BackpressureSubscriber subscriber = new BackpressureSubscriber();
        BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
        publisher.start();
        subscriber.start();
        // 為了演示效果,這里讓主線程休眠一段時(shí)間
        Thread.sleep(50000);
        publisher.stop();
        subscriber.stop();
    }
    @SneakyThrows
    public static void processDataLogic(List<Integer> batch) {
        //模擬任務(wù)執(zhí)行
        int r = new Random().nextInt(3000);
        Thread.sleep(r);
        System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
    }
    static class BackpressurePublisher {
        private final BackpressureSubscriber subscriber;
        private volatile boolean running;
        public BackpressurePublisher(BackpressureSubscriber subscriber) {
            this.subscriber = subscriber;
            this.running = true;
        }
        public void start() {
            Thread thread = new Thread(() -> {
                int item = 1;
                while (running) {
                    List<Integer> batch = new ArrayList<>();
                    for (int i = 0; i < 5; i++) {
                        System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
                        batch.add(item++);
                    }
                    while (!subscriber.accept(batch)) {
                        if (!running) {
                            break;
                        }
                    }
                }
            });
            thread.start();
        }
        public void stop() {
            running = false;
        }
    }
    static class BackpressureSubscriber {
        private volatile boolean running;
        public BackpressureSubscriber() {
            this.running = true;
        }
        public boolean accept(List<Integer> batch) {
            if (running) {
                processDataLogic(batch);
                return true;
            } else {
                return false;
            }
        }
        public void start() {
            // Subscriber 在 JDK 8 中沒有異步處理的能力,因此不需要單獨(dú)開啟線程
        }
        public void stop() {
            running = false;
        }
    }
}

多線程版本

一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者

import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class BackpressureExample {
    public static void main(String[] args) throws InterruptedException {
        BackpressureSubscriber subscriber = new BackpressureSubscriber();
        BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
        publisher.start();
        subscriber.start();
        // 為了演示效果,這里讓主線程休眠一段時(shí)間
        Thread.sleep(50000);
        publisher.stop();
        subscriber.stop();
    }
    @SneakyThrows
    public static void processDataLogic(List<Integer> batch) {
        //模擬任務(wù)執(zhí)行
        int r = new Random().nextInt(3000);
        Thread.sleep(r);
        System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
    }
    static class BackpressurePublisher {
        private final BackpressureSubscriber subscriber;
        private volatile boolean running;
        public BackpressurePublisher(BackpressureSubscriber subscriber) {
            this.subscriber = subscriber;
            this.running = true;
        }
        public void start() {
            Thread thread = new Thread(() -> {
                int item = 1;
                while (running) {
                    List<Integer> batch = new ArrayList<>();
                    for (int i = 0; i < 5; i++) {
                        System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
                        batch.add(item++);
                    }
                    while (!subscriber.accept(batch)) {
                        if (!running) {
                            break;
                        }
                    }
                }
            });
            thread.start();
        }
        public void stop() {
            running = false;
        }
    }
    static class BackpressureSubscriber {
        private volatile boolean running;
        private final ExecutorService executor;
        private final int workerSize = 2;
        private final List<Future> futures;
        public BackpressureSubscriber() {
            this.running = true;
            this.executor = Executors.newFixedThreadPool(workerSize);
            futures = new ArrayList<>(workerSize);
        }
        public boolean accept(List<Integer> batch) {
            if (running) {
                Future f = executor.submit(() -> processDataLogic(batch));
                futures.add(f);
                waitForTaskDone(futures);
                return true;
            } else {
                return false;
            }
        }
        public void waitForTaskDone(List<Future> futures) {
            while (futures.size() >= workerSize) {
                for (Future future : futures) {
                    if (future.isDone()) {
                        // 只要有一個(gè)worker是空閑就重新獲取任務(wù)
                        futures.remove(future);
                        return;
                    }
                }
            }
        }
        public void start() {
            // Subscriber 在 JDK 8 中沒有異步處理的能力,因此不需要單獨(dú)開啟線程
        }
        public void stop() {
            running = false;
            executor.shutdown();
        }
    }
}

到此這篇關(guān)于用JAVA自己實(shí)現(xiàn)一套背壓機(jī)制的文章就介紹到這了,更多相關(guān)java背壓機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 淺談JDK8中的Duration Period和ChronoUnit

    淺談JDK8中的Duration Period和ChronoUnit

    在JDK8中,引入了三個(gè)非常有用的時(shí)間相關(guān)的API:Duration,Period和ChronoUnit。他們都是用來對(duì)時(shí)間進(jìn)行統(tǒng)計(jì)的,本文將會(huì)詳細(xì)講解一下這三個(gè)API的使用
    2021-06-06
  • Java中捕獲線程異常的幾種方式總結(jié)

    Java中捕獲線程異常的幾種方式總結(jié)

    這篇文章主要介紹了Java中捕獲線程異常的幾種方式總結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-11-11
  • Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解

    Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解

    這篇文章主要為大家介紹了Servlet的兩種創(chuàng)建方式(xml?注解)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • Java線程生命周期及轉(zhuǎn)換過程

    Java線程生命周期及轉(zhuǎn)換過程

    這篇文章主要介紹了Java線程生命周期及轉(zhuǎn)換過程,線程的生命周期指的是線程從創(chuàng)建到銷毀的整個(gè)過程初始狀態(tài)、可運(yùn)行狀態(tài)、運(yùn)行狀態(tài)、休眠狀態(tài)、終止?fàn)顟B(tài),更多詳細(xì)介紹,需要的小伙伴可以參考下面文章內(nèi)容
    2022-05-05
  • Java HashMap的工作原理

    Java HashMap的工作原理

    這篇文章主要介紹了Java HashMap的工作原理的相關(guān)資料,需要的朋友可以參考下
    2016-03-03
  • MyBatis的動(dòng)態(tài)攔截sql并修改

    MyBatis的動(dòng)態(tài)攔截sql并修改

    因工作需求,需要根據(jù)用戶的數(shù)據(jù)權(quán)限,來查詢并展示相應(yīng)的數(shù)據(jù),那么就需要?jiǎng)討B(tài)攔截sql,本文就來介紹了MyBatis的動(dòng)態(tài)攔截sql并修改,感興趣的可以了解一下
    2023-11-11
  • Java并發(fā)編程信號(hào)量Semapher

    Java并發(fā)編程信號(hào)量Semapher

    這篇文章主要介紹了Java并發(fā)編程信號(hào)量Semapher,Semapher信號(hào)量也是Java中的一個(gè)同步器,下文關(guān)于信號(hào)量Semapher的更多內(nèi)容介紹,需要的小伙伴可以參考下面文章
    2022-04-04
  • Debian 7 和 Debian 8 用戶安裝 Java 8的方法

    Debian 7 和 Debian 8 用戶安裝 Java 8的方法

    Oracle Java 8 穩(wěn)定版本近期已發(fā)布,有很多新的特征變化。其中,有功能的程序支持通過“Lambda項(xiàng)目 ”,收到了一些安全更新和界面改進(jìn)上的bug修復(fù),使得開發(fā)人員的工作更容易。
    2014-03-03
  • Java的非阻塞隊(duì)列ConcurrentLinkedQueue解讀

    Java的非阻塞隊(duì)列ConcurrentLinkedQueue解讀

    這篇文章主要介紹了Java的非阻塞隊(duì)列ConcurrentLinkedQueue解讀,在并發(fā)編程中,有時(shí)候需要使用線程安全的隊(duì)列,如果要實(shí)現(xiàn)一個(gè)線程安全的隊(duì)列有兩種方式:一種是使用阻塞算法,另一種是使用非阻塞算法,需要的朋友可以參考下
    2023-12-12
  • java 從int數(shù)組中獲取最大數(shù)的方法

    java 從int數(shù)組中獲取最大數(shù)的方法

    這篇文章主要介紹了java 從int數(shù)組中獲取最大數(shù)的方法,需要的朋友可以參考下
    2017-02-02

最新評(píng)論