亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot基于Disruptor實現(xiàn)高效的消息隊列?

 更新時間:2024年02月22日 09:04:48   作者:wx59bcc77095d22  
Disruptor是一個開源的Java框架,它被設計用于在生產者-消費者問題上獲得盡量高的吞吐量和盡量低的延遲,本文主要介紹了SpringBoot基于Disruptor實現(xiàn)高效的消息隊列?,具有一定的參考價值,感興趣的可以了解一下

一、前言

Disruptor是一個開源的Java框架,它被設計用于在生產者-消費者問題上獲得盡量高的吞吐量和盡量低的延遲,從功能上來看Disruptor是實現(xiàn)了隊列的功能,而且是一個有界隊列。那么它的應用場景自然就是“生產者-消費者”模型的應用場合了。Disruptor 是在內存中以隊列的方式去實現(xiàn)的,而且是無鎖的。這也是 Disruptor 為什么高效的原因。

二、SpringBoot整合Disruptor

1.添加依賴

<!--Disruptor-->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

2.創(chuàng)建消息體實體

package com.example.aopdemo.disruptor;

import lombok.Data;

/**
 * @author qx
 * @date 2024/2/21
 * @des 消息體
 */
@Data
public class MessageModel {

    private String message;

}

3.創(chuàng)建事件工廠類

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.EventFactory;

/**
 * @author qx
 * @date 2024/2/21
 * @des 事件工廠類
 */
public class MessageEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

4.創(chuàng)建消費者

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * @author qx
 * @date 2024/2/21
 * @des 消息消費者
 */
@Slf4j
public class MessageEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel messageModel, long sequence, boolean endOfBatch) {
        log.info("消費者獲取消息:{}", messageModel);
    }
}

5.構造BeanManager

package com.example.aopdemo.disruptor;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2024/2/21
 * @des
 */
@Component
public class BeanManager implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        BeanManager.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }
}

6.創(chuàng)建消息管理器

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author qx
 * @date 2024/2/21
 * @des 事件管理器
 */
@Configuration
public class MessageManager {

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        // 定義線程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 指定事件工廠
        MessageEventFactory factory = new MessageEventFactory();

        // 指定ringbuffer字節(jié)大小,必須為2的N次方(能將求模運算轉為位運算提高效率),否則將影響效率
        int bufferSize = 1024 * 256;

        //單線程模式,獲取額外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executorService, ProducerType.SINGLE, new BlockingWaitStrategy());

        //設置事件業(yè)務處理器---消費者
        disruptor.handleEventsWith(new MessageEventHandler());

        //啟動disruptor線程
        disruptor.start();

        //獲取ringbuffer環(huán),用于接取生產者生產的事件
        return disruptor.getRingBuffer();
    }

}

7.創(chuàng)建生產者

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.RingBuffer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author qx
 * @date 2024/2/21
 * @des 生產者
 */
@Service
@Slf4j
public class DisruptorService {

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;

    public void sayMessage(String message) {
        // 獲取下一個Event槽的下標
        long sequence = messageModelRingBuffer.next();
        try {
            // 填充數(shù)據(jù)
            MessageModel messageModel = messageModelRingBuffer.get(sequence);
            messageModel.setMessage(message);
            log.info("往消息隊列中添加消息:{}", messageModel);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}", e, e.getMessage());
        } finally {
            //發(fā)布Event,激活觀察者去消費,將sequence傳遞給改消費者
            //注意最后的publish方法必須放在finally中以確保必須得到調用;如果某個請求的sequence未被提交將會堵塞后續(xù)的發(fā)布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }

    }

}

8.創(chuàng)建測試類

package com.example.aopdemo.controller;

import com.example.aopdemo.disruptor.DisruptorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author qx
 * @date 2024/2/21
 * @des Disruptor測試
 */
@RestController
public class DisruptorController {

    @Autowired
    private DisruptorService disruptorService;

    @GetMapping("/disruptor")
    public String disruptorTest(String message) {
        disruptorService.sayMessage(message);
        return "發(fā)送消息成功";
    }
}

9.測試

啟動程序,在瀏覽器訪問請求連接進行測試。

我們在控制臺上可以獲取到消息的發(fā)送和接收信息。

2024-02-21 15:22:16.059  INFO 6788 --- [nio-8080-exec-1] c.e.aopdemo.disruptor.DisruptorService   : 往消息隊列中添加消息:MessageModel(message=hello)
2024-02-21 15:22:16.060  INFO 6788 --- [pool-1-thread-1] c.e.a.disruptor.MessageEventHandler      : 消費者獲取消息:MessageModel(message=hello)

到此這篇關于SpringBoot基于Disruptor實現(xiàn)高效的消息隊列 的文章就介紹到這了,更多相關SpringBoot Disruptor消息隊列內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java實現(xiàn)數(shù)組去除重復數(shù)據(jù)的方法詳解

    Java實現(xiàn)數(shù)組去除重復數(shù)據(jù)的方法詳解

    這篇文章主要介紹了Java實現(xiàn)數(shù)組去除重復數(shù)據(jù)的方法,結合實例形式詳細分析了java數(shù)組去除重復的幾種常用方法、實現(xiàn)原理與相關注意事項,需要的朋友可以參考下
    2017-09-09
  • SpringBoot+layui實現(xiàn)文件上傳功能

    SpringBoot+layui實現(xiàn)文件上傳功能

    Spring Boot是由Pivotal團隊提供的全新框架,其設計目的是用來簡化新Spring應用的初始搭建以及開發(fā)過程。這篇文章主要介紹了SpringBoot+layui實現(xiàn)文件上傳,需要的朋友可以參考下
    2018-09-09
  • Java基礎之自定義類加載器

    Java基礎之自定義類加載器

    應該有很多小伙伴還不了解Java自定義類加載器吧,下文中有對Java自定義類加載器非常詳細的描述,還有小伙伴們最喜歡的代碼環(huán)節(jié),需要的朋友可以參考下
    2021-05-05
  • 查看Java所支持的語言及相應的版本信息

    查看Java所支持的語言及相應的版本信息

    Java語言作為第一種支持國際化的語言,在Internet從一開始就具有其他語言無與倫比的國際化的本質特性,查看Java所支持的語言及相應的版本信息可以采用以下代碼進行查詢
    2014-01-01
  • 一文詳解如何使用Java來發(fā)送qq郵箱郵件

    一文詳解如何使用Java來發(fā)送qq郵箱郵件

    這篇文章主要給大家介紹了關于如何使用Java來發(fā)送qq郵箱郵件的相關資料,文中降了準備工作(開啟服務并生成授權碼)、接口調用(引入依賴和編寫接口代碼)、發(fā)送HTML格式郵件等內容,需要的朋友可以參考下
    2024-12-12
  • Java用遞歸方法解決漢諾塔問題詳解

    Java用遞歸方法解決漢諾塔問題詳解

    漢諾塔問題是一個經典的問題。漢諾塔(Hanoi?Tower),又稱河內塔,源于印度一個古老傳說。本文將用Java遞歸方法求解這一問題,感興趣的可以學習一下
    2022-04-04
  • Java的中l(wèi)ombok下的@Builder注解用法詳解

    Java的中l(wèi)ombok下的@Builder注解用法詳解

    這篇文章主要介紹了Java的中l(wèi)ombok下的@Builder注解用法詳解,lombok注解在java進行編譯時進行代碼的構建,對于java對象的創(chuàng)建工作它可以更優(yōu)雅,不需要寫多余的重復的代碼,在出現(xiàn)lombok之后,對象的創(chuàng)建工作更提供Builder方法,需要的朋友可以參考下
    2023-11-11
  • 如何開發(fā)基于Netty的HTTP/HTTPS應用程序

    如何開發(fā)基于Netty的HTTP/HTTPS應用程序

    HTTP/HTTPS是最常見的協(xié)議套件之一,并且隨著智能手機的成功,它的應用也日益廣泛,因為對于任何公司來說,擁有一個可以被移動設備訪問的網站幾乎是必須的。下面就來看看如何開發(fā)基于Netty的HTTP/HTTPS應用程序
    2021-06-06
  • Java線程池由淺入深掌握到精通

    Java線程池由淺入深掌握到精通

    什么是線程池?很簡單,簡單看名字就知道是裝有線程的池子,我們可以把要執(zhí)行的多線程交給線程池來處理,和連接池的概念一樣,通過維護一定數(shù)量的線程池來達到多個線程的復用
    2021-09-09
  • Java 輸入流中的read(byte[] b)方法詳解

    Java 輸入流中的read(byte[] b)方法詳解

    這篇文章主要介紹了Java 輸入流中的read(byte[] b)方法詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01

最新評論