亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java中RabbitMQ隊列實現(xiàn)RPC詳解

 更新時間:2023年08月29日 08:29:27   作者:輕塵×  
這篇文章主要介紹了Java中RabbitMQ隊列實現(xiàn)RPC詳解,在本教程中,我們將使用RabbitMQ構(gòu)建一個RPC系統(tǒng):一個客戶端和一個RPC服務(wù)器,我們將創(chuàng)建一個返回斐波那契數(shù)字的模擬RPC服務(wù),,需要的朋友可以參考下

RabbitMQ實現(xiàn)RPC

如果我們需要在遠(yuǎn)程計算機(jī)上運行一個函數(shù)并等待結(jié)果,這種模式通常被稱為遠(yuǎn)程過程調(diào)用或RPC。

在本教程中,我們將使用RabbitMQ構(gòu)建一個RPC系統(tǒng):

  1. 一個客戶端和一個RPC服務(wù)器。
  2. 我們將創(chuàng)建一個返回斐波那契數(shù)字的模擬RPC服務(wù)。

整個過程示意圖如下:

這里寫圖片描述

客戶端將請求發(fā)送至rpc_queue(我們定義的消息隊列),然后等待響應(yīng);服務(wù)端獲取請求,并處理請求,然后將請求結(jié)果返回給隊列,客戶端得知請求被響應(yīng)后獲取結(jié)果。

在結(jié)果被響應(yīng)之前,客戶端是被阻塞的,主線程會等待RPC響應(yīng)

如果每個RPC請求都創(chuàng)建一個回調(diào)隊列。這是非常低效,我們創(chuàng)建一個單一的客戶端回調(diào)隊列。

這引發(fā)了一個新的問題,在該隊列中收到回復(fù)時,不清楚回復(fù)屬于哪個請求。這就需要用到 correlationId屬性。

我們?yōu)闆]有請求設(shè)置唯一的correlationId值。

然后,當(dāng)我們在回調(diào)隊列中收到一條消息時,我們將獲取這個值,將響應(yīng)與請求的進(jìn)行correlationId匹配。

如果我們一致就是我們需要的結(jié)果,否則就不是。

客戶端代RPCClient

代碼如下:

package com.adtec.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    public RPCClient() throws IOException, TimeoutException {
        //建立一個連接和一個通道,并為回調(diào)聲明一個唯一的'回調(diào)'隊列
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();
        //定義一個臨時變量的接受隊列名    
        replyQueueName = channel.queueDeclare().getQueue();
    }
    //發(fā)送RPC請求  
    public String call(String message) throws IOException, InterruptedException {
         //生成一個唯一的字符串作為回調(diào)隊列的編號
        String corrId = UUID.randomUUID().toString();
        //發(fā)送請求消息,消息使用了兩個屬性:replyto和correlationId
        //服務(wù)端根據(jù)replyto返回結(jié)果,客戶端根據(jù)correlationId判斷響應(yīng)是不是給自己的
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
                .build();
        //發(fā)布一個消息,requestQueueName路由規(guī)則
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        //由于我們的消費者交易處理是在單獨的線程中進(jìn)行的,因此我們需要在響應(yīng)到達(dá)之前暫停主線程。
        //這里我們創(chuàng)建的 容量為1的阻塞隊列ArrayBlockingQueue,因為我們只需要等待一個響應(yīng)。
        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
        // String basicConsume(String queue, boolean autoAck, Consumer callback)
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                //檢查它的correlationId是否是我們所要找的那個
                if (properties.getCorrelationId().equals(corrId)) {
                    //如果是,則響應(yīng)BlockingQueue
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });
        return response.take();
    }
    public void close() throws IOException {
        connection.close();
    }
    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();
            System.out.println(" [x] Requesting fib(30)");
            response = fibonacciRpc.call("30");
            System.out.println(" [.] Got '" + response + "'");
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    fibonacciRpc.close();
                } catch (IOException _ignore) {
                }
            }
        }
    }
}

上面的代碼中用到了阻塞隊列ArrayBlockingQueue

服務(wù)端代RPCServer

代碼如下:

package rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";
    //具體處理方法
    private static int fib(int n) {
        if (n == 0)
            return 0;
        if (n == 1)
            return 1;
        return fib(n - 1) + fib(n - 2);
    }
    public static void main(String[] argv) {
         //建立連接、通道,并聲明隊列 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = null;
        try {
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
            System.out.println(" [x] Awaiting RPC requests");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId()).build();
                    String response = "";
                    try {
                        String message = new String(body, "UTF-8");
                        int n = Integer.parseInt(message);
                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    } catch (RuntimeException e) {
                        System.out.println(" [.] " + e.toString());
                    } finally {
                        // 返回處理結(jié)果隊列
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        //  確認(rèn)消息,已經(jīng)收到后面參數(shù) multiple:是否批量.true:將一次性確認(rèn)所有小于envelope.getDeliveryTag()的消息。
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        // RabbitMq consumer worker thread notifies the RPC
                        // server owner thread
                        synchronized (this) {
                            this.notify();
                        }
                    }
                }
            };
            //取消自動確認(rèn)
            boolean autoAck = false ;
            channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (consumer) {
                    try {
                        consumer.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null)
                try {
                    connection.close();
                } catch (IOException _ignore) {
                }
        }
    }
}

測試時先運行服務(wù)端,再運行客戶端 為了方便觀察結(jié)果,最好將客戶端和服務(wù)端在不同workspace實現(xiàn)

客戶端結(jié)果

這里寫圖片描述

服務(wù)端結(jié)果

這里寫圖片描述

到此這篇關(guān)于Java中RabbitMQ隊列實現(xiàn)RPC詳解的文章就介紹到這了,更多相關(guān)RabbitMQ實現(xiàn)RPC內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • mybatis注解與xml常用語句匯總

    mybatis注解與xml常用語句匯總

    最近一直在用mybatis,由于需要使用到了動態(tài)sql,遇到了一些問題,現(xiàn)在來總結(jié)一下,經(jīng)驗教訓(xùn)。下面這篇文章主要給大家總結(jié)介紹了mybatis注解與xml常用語句的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2018-09-09
  • 使用CXF和Jersey框架來進(jìn)行Java的WebService編程

    使用CXF和Jersey框架來進(jìn)行Java的WebService編程

    這篇文章主要介紹了使用CXF和Jersey框架來進(jìn)行Java的WebService編程,Web service是一個平臺獨立的低耦合的自包含的基于可編程的web的應(yīng)用程序,需要的朋友可以參考下
    2015-12-12
  • JAVA基礎(chǔ)之注解與反射的使用方法和場景

    JAVA基礎(chǔ)之注解與反射的使用方法和場景

    這篇文章主要給大家介紹了關(guān)于JAVA基礎(chǔ)之注解與反射的使用方法和場景的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • java多線程累加計數(shù)的實現(xiàn)方法

    java多線程累加計數(shù)的實現(xiàn)方法

    在多線程協(xié)作任務(wù)中,如何計算也是很重的,這篇文章主要介紹了java多線程累加計數(shù)的實現(xiàn)方法,感興趣的朋友可以了解一下
    2021-05-05
  • Java三種移位運算符原理解析

    Java三種移位運算符原理解析

    這篇文章主要介紹了Java三種移位運算符原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-02-02
  • Java中的static--靜態(tài)變量你了解嗎

    Java中的static--靜態(tài)變量你了解嗎

    Java 中被 static 修飾的成員稱為靜態(tài)成員或類成員。它屬于整個類所有,而不是某個對象所有,即被類的所有對象所共享。靜態(tài)成員可以使用類名直接訪問,也可以使用對象名進(jìn)行訪問,.下面我們來詳細(xì)了解一下吧
    2021-09-09
  • 使用@CachePut?更新數(shù)據(jù)庫和更新緩存

    使用@CachePut?更新數(shù)據(jù)庫和更新緩存

    這篇文章主要介紹了使用@CachePut?更新數(shù)據(jù)庫和更新緩存方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • spring,mybatis事務(wù)管理配置與@Transactional注解使用詳解

    spring,mybatis事務(wù)管理配置與@Transactional注解使用詳解

    這篇文章主要介紹了spring,mybatis事務(wù)管理配置與@Transactional注解使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • 如何用120行Java代碼寫一個自己的區(qū)塊鏈

    如何用120行Java代碼寫一個自己的區(qū)塊鏈

    這篇文章就是幫助你使用 Java 語言來實現(xiàn)一個簡單的區(qū)塊鏈,用不到 120 行代碼來揭示區(qū)塊鏈的原理,感興趣的就一起來了解一下
    2019-06-06
  • Spring七大事務(wù)傳遞機(jī)制深入分析實現(xiàn)原理

    Spring七大事務(wù)傳遞機(jī)制深入分析實現(xiàn)原理

    實際項目開發(fā)中,如果涉及到多張表操作時,為了保證業(yè)務(wù)數(shù)據(jù)的一致性,大家一般都會采用事務(wù)機(jī)制,好多小伙伴可能只是簡單了解一下,遇到事務(wù)失效的情況,便會無從下手,下面這篇文章主要給大家介紹了關(guān)于Spring事務(wù)傳遞機(jī)制的相關(guān)資料,需要的朋友可以參考下
    2023-03-03

最新評論