亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

docker啟動rabbitmq以及使用方式詳解

 更新時間:2022年08月04日 11:34:00   作者:Maackia  
RabbitMQ是一個由erlang開發(fā)的消息隊列,下面這篇文章主要給大家介紹了關于docker啟動rabbitmq以及使用的相關資料,文中通過圖文介紹的非常詳細,需要的朋友可以參考下

搜索rabbitmq鏡像

docker search rabbitmq:management

下載鏡像

docker pull rabbitmq:management

啟動容器

docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

打印容器

docker logs rabbitmq

訪問RabbitMQ Management

http://localhost:15672

賬戶密碼默認:guest

編寫生產(chǎn)者類

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        /**
         * 生成一個queue隊列
         * 1、隊列名稱 QUEUE_NAME
         * 2、隊列里面的消息是否持久化(默認消息存儲在內(nèi)存中)
         * 3、該隊列是否只供一個Consumer消費 是否共享 設置為true可以多個消費者消費
         * 4、是否自動刪除 最后一個消費者斷開連接后 該隊列是否自動刪除
         * 5、其他參數(shù)
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message = "Hello world!";
        /**
         * 發(fā)送一個消息
         * 1、發(fā)送到哪個exchange交換機
         * 2、路由的key
         * 3、其他的參數(shù)信息
         * 4、消息體
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println(" [x] Sent '"+message+"'");

        channel.close();
        connection.close();
    }
}

運行該方法,可以看到控制臺的打印

name=hello的隊列收到Message

消費者

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setConnectionTimeout(600000);//milliseconds
        factory.setRequestedHeartbeat(60);//seconds
        factory.setHandshakeTimeout(6000);//milliseconds
        factory.setRequestedChannelMax(5);
        factory.setNetworkRecoveryInterval(500);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        System.out.println("Waiting for messages. ");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

工作隊列

RabbitMqUtils工具類

package com.xun.rabbitmqdemo.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
    public static Channel getChannel() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

啟動2個工作線程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費者取消消費接口回調(diào)邏輯");
        };
        System.out.println("C1 消費者啟動等待消費....");
        /**
         * 消費者消費消息
         * 1、消費哪個隊列
         * 2、消費成功后是否自動應答
         * 3、消費的接口回調(diào)
         * 4、消費未成功的接口回調(diào)
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work02 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費者取消消費接口回調(diào)邏輯");
        };
        System.out.println("C2 消費者啟動等待消費....");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

啟動工作線程

啟動發(fā)送線程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        try(Channel channel= RabbitMqUtils.getChannel();){
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //從控制臺接收消息
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("發(fā)送消息完成:"+message);
            }
        }
    }
}

啟動發(fā)送線程,此時發(fā)送線程等待鍵盤輸入

發(fā)送4個消息

可以看到2個工作線程按照順序分別接收message。

消息應答機制

rabbitmq將message發(fā)送給消費者后,就會將該消息標記為刪除。

但消費者在處理message過程中宕機,會導致消息的丟失。

因此需要設置手動應答。

生產(chǎn)者

import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        try(Channel channel = RabbitMqUtils.getChannel()){
            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
            Scanner scanner = new Scanner(System.in);
            System.out.println("請輸入信息");
            while(scanner.hasNext()){
                String message = scanner.nextLine();
                channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
                System.out.println("生產(chǎn)者task02發(fā)出消息"+ message);
            }
        }
    }
}

消費者

package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work03 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work03 等待接收消息處理時間較短");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+message);
            /**
             * 1、消息的標記tag
             * 2、是否批量應答
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費者取消消費接口回調(diào)邏輯");
        };
        //采用手動應答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work04 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work04 等待接收消息處理時間較長");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費者取消消費接口回調(diào)邏輯");
        };
        //采用手動應答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

工具類SleepUtils

package com.xun.rabbitmqdemo.utils;
public class SleepUtils {
    public static void sleep(int second){
        try{
            Thread.sleep(1000*second);
        }catch (InterruptedException _ignored){
            Thread.currentThread().interrupt();
        }
    }
}

模擬

work04等待30s后發(fā)出ack

在work04處理message時手動停止線程,可以看到message:dd被rabbitmq交給了work03

不公平分發(fā)

上面的輪詢分發(fā),生產(chǎn)者依次向消費者按順序發(fā)送消息,但當消費者A處理速度很快,而消費者B處理速度很慢時,這種分發(fā)策略顯然是不合理的。
不公平分發(fā):

int prefetchCount = 1;
channel.basicQos(prefetchCount);

通過此配置,當消費者未處理完當前消息,rabbitmq會優(yōu)先將該message分發(fā)給空閑消費者。

總結(jié) 

到此這篇關于docker啟動rabbitmq以及使用的文章就介紹到這了,更多相關docker啟動rabbitmq及使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • docker-compose啟動docker文件掛載失敗的解決

    docker-compose啟動docker文件掛載失敗的解決

    這篇文章主要介紹了docker-compose啟動docker文件掛載失敗的解決方案。具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • Mac上使用Docker如何快速啟動MySQL測試

    Mac上使用Docker如何快速啟動MySQL測試

    本文主要討論如何使用Docker快速啟動 MySQL 測試,包括Mac環(huán)境。非常不錯,具有參考借鑒價值,感興趣的朋友一起看看吧
    2016-10-10
  • docker容器使用GPU方法實現(xiàn)

    docker容器使用GPU方法實現(xiàn)

    本文主要介紹了docker容器使用GPU方法實現(xiàn),文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-05-05
  • DockerCE之執(zhí)行docker info出現(xiàn)兩條警告信息及解決方案

    DockerCE之執(zhí)行docker info出現(xiàn)兩條警告信息及解決方案

    這篇文章主要介紹了DockerCE之執(zhí)行docker info出現(xiàn)兩條警告信息及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-02-02
  • Docker容器進入的4種方式(小結(jié))

    Docker容器進入的4種方式(小結(jié))

    本文主要介紹了Docker容器進入的4種方式(小結(jié)),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-01-01
  • 解決docker中mysql時間與系統(tǒng)時間不一致問題

    解決docker中mysql時間與系統(tǒng)時間不一致問題

    最近在Docker中裝mysql時,發(fā)現(xiàn)數(shù)據(jù)庫時間與系統(tǒng)時間相差8個小時。查詢資料發(fā)現(xiàn),docker的默認時區(qū)是0區(qū),其實這會對安裝的容器造成不少麻煩,比如執(zhí)行日志的記錄不準確等
    2021-12-12
  • docker網(wǎng)絡,docker-compose?network問題

    docker網(wǎng)絡,docker-compose?network問題

    這篇文章主要介紹了docker網(wǎng)絡,docker-compose?network問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • docker環(huán)境搭建JMeter+Grafana+influxdb可視化性能監(jiān)控平臺的教程

    docker環(huán)境搭建JMeter+Grafana+influxdb可視化性能監(jiān)控平臺的教程

    這篇文章主要介紹了docker下搭建JMeter+Grafana+influxdb可視化性能監(jiān)控平臺,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-07-07
  • docker使用Dockerfile構建鏡像的實現(xiàn)示例

    docker使用Dockerfile構建鏡像的實現(xiàn)示例

    本文主要介紹了docker使用Dockerfile構建鏡像的實現(xiàn)示例,通過編寫 Dockerfile,您可以定義鏡像的基礎環(huán)境、安裝軟件包、復制文件、設置環(huán)境變量等操作,下面就來介紹一下
    2024-01-01
  • Docker網(wǎng)絡配置及部署SpringCloud項目詳解

    Docker網(wǎng)絡配置及部署SpringCloud項目詳解

    bridge模式是Docker默認的網(wǎng)絡設置,此模式會為每一個容器分配Network Namespace、設置IP等,并將一個主機上的Docker容器連接到一個虛擬網(wǎng)橋上,下面這篇文章主要給大家介紹了關于Docker網(wǎng)絡配置及部署SpringCloud項目的相關資料,需要的朋友可以參考下
    2023-01-01

最新評論