Java中RabbitMQ隊列實現(xiàn)RPC詳解
RabbitMQ實現(xiàn)RPC
如果我們需要在遠(yuǎn)程計算機(jī)上運行一個函數(shù)并等待結(jié)果,這種模式通常被稱為遠(yuǎn)程過程調(diào)用或RPC。
在本教程中,我們將使用RabbitMQ構(gòu)建一個RPC系統(tǒng):
- 一個客戶端和一個RPC服務(wù)器。
- 我們將創(chuàng)建一個返回斐波那契數(shù)字的模擬RPC服務(wù)。
整個過程示意圖如下:
客戶端將請求發(fā)送至rpc_queue(我們定義的消息隊列),然后等待響應(yīng);服務(wù)端獲取請求,并處理請求,然后將請求結(jié)果返回給隊列,客戶端得知請求被響應(yīng)后獲取結(jié)果。
在結(jié)果被響應(yīng)之前,客戶端是被阻塞的,主線程會等待RPC響應(yīng)
如果每個RPC請求都創(chuàng)建一個回調(diào)隊列。這是非常低效,我們創(chuàng)建一個單一的客戶端回調(diào)隊列。
這引發(fā)了一個新的問題,在該隊列中收到回復(fù)時,不清楚回復(fù)屬于哪個請求。這就需要用到 correlationId屬性。
我們?yōu)闆]有請求設(shè)置唯一的correlationId值。
然后,當(dāng)我們在回調(diào)隊列中收到一條消息時,我們將獲取這個值,將響應(yīng)與請求的進(jìn)行correlationId匹配。
如果我們一致就是我們需要的結(jié)果,否則就不是。
客戶端代RPCClient
代碼如下:
package com.adtec.rabbitmq; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; public RPCClient() throws IOException, TimeoutException { //建立一個連接和一個通道,并為回調(diào)聲明一個唯一的'回調(diào)'隊列 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); //定義一個臨時變量的接受隊列名 replyQueueName = channel.queueDeclare().getQueue(); } //發(fā)送RPC請求 public String call(String message) throws IOException, InterruptedException { //生成一個唯一的字符串作為回調(diào)隊列的編號 String corrId = UUID.randomUUID().toString(); //發(fā)送請求消息,消息使用了兩個屬性:replyto和correlationId //服務(wù)端根據(jù)replyto返回結(jié)果,客戶端根據(jù)correlationId判斷響應(yīng)是不是給自己的 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName) .build(); //發(fā)布一個消息,requestQueueName路由規(guī)則 channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); //由于我們的消費者交易處理是在單獨的線程中進(jìn)行的,因此我們需要在響應(yīng)到達(dá)之前暫停主線程。 //這里我們創(chuàng)建的 容量為1的阻塞隊列ArrayBlockingQueue,因為我們只需要等待一個響應(yīng)。 final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); // String basicConsume(String queue, boolean autoAck, Consumer callback) channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //檢查它的correlationId是否是我們所要找的那個 if (properties.getCorrelationId().equals(corrId)) { //如果是,則響應(yīng)BlockingQueue response.offer(new String(body, "UTF-8")); } } }); return response.take(); } public void close() throws IOException { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (IOException _ignore) { } } } } }
上面的代碼中用到了阻塞隊列ArrayBlockingQueue
服務(wù)端代RPCServer
代碼如下:
package rabbitmq; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; //具體處理方法 private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) { //建立連接、通道,并聲明隊列 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() .correlationId(properties.getCorrelationId()).build(); String response = ""; try { String message = new String(body, "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { // 返回處理結(jié)果隊列 channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); // 確認(rèn)消息,已經(jīng)收到后面參數(shù) multiple:是否批量.true:將一次性確認(rèn)所有小于envelope.getDeliveryTag()的消息。 channel.basicAck(envelope.getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC // server owner thread synchronized (this) { this.notify(); } } } }; //取消自動確認(rèn) boolean autoAck = false ; channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized (consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) { } } } }
測試時先運行服務(wù)端,再運行客戶端 為了方便觀察結(jié)果,最好將客戶端和服務(wù)端在不同workspace實現(xiàn)
客戶端結(jié)果
服務(wù)端結(jié)果
到此這篇關(guān)于Java中RabbitMQ隊列實現(xiàn)RPC詳解的文章就介紹到這了,更多相關(guān)RabbitMQ實現(xiàn)RPC內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用CXF和Jersey框架來進(jìn)行Java的WebService編程
這篇文章主要介紹了使用CXF和Jersey框架來進(jìn)行Java的WebService編程,Web service是一個平臺獨立的低耦合的自包含的基于可編程的web的應(yīng)用程序,需要的朋友可以參考下2015-12-12使用@CachePut?更新數(shù)據(jù)庫和更新緩存
這篇文章主要介紹了使用@CachePut?更新數(shù)據(jù)庫和更新緩存方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12spring,mybatis事務(wù)管理配置與@Transactional注解使用詳解
這篇文章主要介紹了spring,mybatis事務(wù)管理配置與@Transactional注解使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07Spring七大事務(wù)傳遞機(jī)制深入分析實現(xiàn)原理
實際項目開發(fā)中,如果涉及到多張表操作時,為了保證業(yè)務(wù)數(shù)據(jù)的一致性,大家一般都會采用事務(wù)機(jī)制,好多小伙伴可能只是簡單了解一下,遇到事務(wù)失效的情況,便會無從下手,下面這篇文章主要給大家介紹了關(guān)于Spring事務(wù)傳遞機(jī)制的相關(guān)資料,需要的朋友可以參考下2023-03-03