java開發(fā)分布式服務(wù)框架Dubbo調(diào)用過程
大致流程
客戶端根據(jù)遠程服務(wù)的地址,客戶端發(fā)送請求至服務(wù)端,服務(wù)端解析信息并找到對應(yīng)的實現(xiàn)類,進行方法調(diào)用,之后將調(diào)用結(jié)果原路返回,客戶端解析響應(yīng)之后再返回。
調(diào)用請求的具體信息
客戶端發(fā)送給服務(wù)端的請求中應(yīng)該包含哪些具體信息呢?
首先肯定要說明調(diào)用的是服務(wù)端的哪個接口、方法名、方法參數(shù)類型、以及版本號等,將上述信息封裝進請求,服務(wù)端就可以根據(jù)請求進行方法調(diào)用,之后再組裝響應(yīng)返回即可。
以上就是一個實際調(diào)用請求所包含的信息。
協(xié)議
遠程調(diào)用必不可少協(xié)議的約定,否則客戶端與服務(wù)端無法解析彼此傳來的信息,因此需要提前約定好協(xié)議,方便遠程調(diào)用的信息解析。
Dubbo使用的協(xié)議屬于Header+Body
,協(xié)議頭固定長度,并且頭部中會填寫B(tài)ody的長度,因此Body是不固定長度的,方便拓展,伸縮性較好。
Dubbo協(xié)議
協(xié)議分為協(xié)議頭和協(xié)議體,16字節(jié)的協(xié)議頭主要攜帶了魔法數(shù)、一些請求的設(shè)置,消息體數(shù)據(jù)長度。
16字節(jié)之后包含的就是協(xié)議體,包含版本信息,接口名稱,接口版本,以及方法名參數(shù)類型等。
序列化器
網(wǎng)絡(luò)是以字節(jié)流傳輸?shù)?,傳輸之前,我們需要將?shù)據(jù)序列化為字節(jié)流然后再傳輸至服務(wù)端,服務(wù)端再反序列化這些字節(jié)流得到原來的數(shù)據(jù)。
從上圖中可得知,Dubbo支持多種序列化,大致分為兩種,一種是字符型,一種是二進制流。
字符型的代表就是JSON,優(yōu)點是易懂,方便調(diào)試,缺點也很明顯,傳輸效率低,對于計算機來說有很多冗余的東西,例如JSON中的括號等等都會使得網(wǎng)絡(luò)傳輸時長邊長,占用帶寬變大。
二進制流類型的數(shù)據(jù)緊湊,占用字節(jié)數(shù)小,傳輸更快,但是調(diào)試困難。
Dubbo默認使用的是Hessian2Serialization
,即Hessian2
序列化協(xié)議。
調(diào)用流程圖
這個流程圖比較簡略,大致就是客戶端發(fā)起調(diào)用,實際調(diào)用的是代理類,代理類調(diào)用Client
(默認使用NettyClient),之后構(gòu)造好協(xié)議頭以及將Java對象序列化生成協(xié)議體,之后進行網(wǎng)絡(luò)傳輸。
服務(wù)端的NettyServer
接收到請求之后,會分發(fā)給業(yè)務(wù)線程池,由線程池來調(diào)用具體的方法。
但這遠遠不夠,實際場景比這復雜得多,并且Dubbo是生產(chǎn)級別的,通常會比上述流程更加安全穩(wěn)定。
在實際生產(chǎn)環(huán)境中,服務(wù)端往往會集群分布,多個服務(wù)端的服務(wù)會有多個Invoker,最終需要通過路由Router
過濾,以及負載均衡LoadBalance
選出一個Invoker進行調(diào)用。
請求會到達Netty的IO線程池進行序列化,再將請求發(fā)送給服務(wù)端,反序列化后丟入線程池處理,找到對應(yīng)的Invoker進行調(diào)用。
調(diào)用流程源碼分析——客戶端
客戶端調(diào)用方法并發(fā)送請求。
首先會調(diào)用生成的代理類,而代理類會生成一個RpcInvocation
對象調(diào)用MockClusterInvoker.invoke()
。
生成的RpcInvocation如下:
進入MockClusterInvoker.invoke()
看看:
public Result invoke(Invocation invocation) throws RpcException { Result result = null; //獲取mock參數(shù)配置 String value = this.directory.getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim(); //如果配置了并且配置值為true if (value.length() != 0 && !value.equalsIgnoreCase("false")) { //強制走mock流程 if (value.startsWith("force")) { result = this.doMockInvoke(invocation, (RpcException)null); } else { //不走mock流程 try { result = this.invoker.invoke(invocation); } catch (RpcException var5) { .... } .... result = this.doMockInvoke(invocation, var5); } } } else { result = this.invoker.invoke(invocation); } return result; }
總的來說就是檢查配置是否配置了mock
,如果沒有就直接進入this.invoker.invoke(invocation)
,實際上會調(diào)用到AbstractClusterInvoker.invoke()
:
public Result invoke(Invocation invocation) throws RpcException { //檢查是否被銷毀 this.checkWhetherDestroyed(); LoadBalance loadbalance = null; //從上下文中獲取attachments,如果獲取得到的話綁定到invocation中 Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation)invocation).addAttachments(contextAttachments); } //調(diào)用的是directory.list,其中會做路由過濾 List<Invoker<T>> invokers = this.list(invocation); //如果過濾完之后還有Invoker,就通過SPI獲取對應(yīng)的LoadBalance實現(xiàn)類 if (invokers != null && !invokers.isEmpty()) { loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random")); } RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation); return this.doInvoke(invocation, invokers, loadbalance); //調(diào)用子類方法 } protected List<Invoker<T>> list(Invocation invocation) throws RpcException { //獲取invokers目錄,實際調(diào)用的是AbstractDirectory.list() List<Invoker<T>> invokers = this.directory.list(invocation); return invokers; }
模板方法模式
這是很常見的設(shè)計模式之一,就是再抽象類中定好代碼的整體架構(gòu),然后將具體的實現(xiàn)留到子類中,由子類自定義實現(xiàn),由此可以再不改變整體執(zhí)行步驟的情況下,實現(xiàn)多樣化的實現(xiàn),減少代碼重復,利于擴展,符合開閉原則。
在上述代碼中this.doInvoke()
是抽象方法,具體實現(xiàn)在FailoverClusterInvoker.doInvoke()
中,上述所有步驟是每個子類都需要執(zhí)行的,所以抽取出來放在抽象類中。
路由和負載均衡
上述this.directory.list(invocation)
,其實就是通過方法名找到對應(yīng)的Invoker,然后由路由進行過濾。
public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (this.destroyed) { throw new RpcException("Directory already destroyed .url: " + this.getUrl()); } else { //抽象方法doList,同樣由子類實現(xiàn) List<Invoker<T>> invokers = this.doList(invocation); List<Router> localRouters = this.routers; if (localRouters != null && !localRouters.isEmpty()) { Iterator i$ = localRouters.iterator(); while(i$.hasNext()) { Router router = (Router)i$.next(); try { //遍歷router,并判斷是否進行路由過濾 if (router.getUrl() == null || router.getUrl().getParameter("runtime", false)) { invokers = router.route(invokers, this.getConsumerUrl(), invocation); } } catch (Throwable var7) { logger.error("Failed to execute router: " + this.getUrl() + ", cause: " + var7.getMessage(), var7); } } } return invokers; } }
返回Invokers之后,還會在進行負載均衡的篩選,得到最終調(diào)用的Invoke,Dubbo默認使用的是FailoverClusterInvoker
,即失敗調(diào)用后自動切換的容錯方式。
進入FailoverClusterInvoker.doInvoke()
:
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { //重試次數(shù) int len = this.getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1; if (len <= 0) { len = 1; } .... //重試 for(int i = 0; i < len; ++i) { //負載均衡篩選出一個Invoker Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); //在上下文中保存調(diào)用過的invoker RpcContext.getContext().setInvokers(invoked); try { Result result = invoker.invoke(invocation); .... return result; } catch (RpcException e) { .... } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(); }
發(fā)起這次調(diào)用的invoker.invoke
又是調(diào)用抽象類的中的invoke,然后再調(diào)用子類的doInvoke,我們直接進入子類DubboInvoker.doInvoke
看看:
protected Result doInvoke(Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation)invocation; String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment("path", this.getUrl().getPath()); //設(shè)置path到attachment中 inv.setAttachment("version", this.version); //設(shè)置版本號 ExchangeClient currentClient; if (this.clients.length == 1) { //選擇client currentClient = this.clients[0]; } else { currentClient = this.clients[this.index.getAndIncrement() % this.clients.length]; } try { boolean isAsync = RpcUtils.isAsync(this.getUrl(), invocation); //是否異步調(diào)用 boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation); //是否oneway調(diào)用 int timeout = this.getUrl().getMethodParameter(methodName, "timeout", 1000); //獲取超時限制 if (isOneway) { //oneway boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false); currentClient.send(inv, isSent); //發(fā)送 RpcContext.getContext().setFuture((Future)null); //返回空的future return new RpcResult(); //返回空結(jié)果 } else if (isAsync) { //異步調(diào)用 ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter(future)); //上下文中設(shè)置future return new RpcResult(); //返回空結(jié)果 } else { //同步調(diào)用 RpcContext.getContext().setFuture((Future)null); return (Result)currentClient.request(inv, timeout).get(); //直接調(diào)用future.get() 進行等待,完成get操作之后再返回結(jié)果 } } catch (TimeoutException var9) { throw new RpcException(); } }
調(diào)用的三種方式
從上述代碼中,可以看到調(diào)用一共分為三種,分別是oneway
,異步
,同步
。
- oneway:不需要關(guān)心請求是否發(fā)送成功的情況下,直接使用oneway,無需關(guān)心是否能完成發(fā)送并返回結(jié)果。
- 異步調(diào)用:client發(fā)送請求之后會得到一個ResponseFuture,然后將這個future塞入上下文中,讓用戶從上下文拿到這個future,用戶可以繼續(xù)執(zhí)行操作在調(diào)用future.get()返回結(jié)果。
- 同步調(diào)用:從Dubbo源碼中,我們可以看到,先使用了future.get(),讓用戶進行等待之后,再用client發(fā)送請求,給用戶的感覺就是調(diào)用接口后要進行等待才能返回結(jié)果,這個過程是阻塞的。
currentClient.request()
就是由如下所示,組裝request,然后構(gòu)造一個future調(diào)用NettyClient發(fā)送請求。
public ResponseFuture request(Object request, int timeout) throws RemotingException { if (this.closed) { throw new RemotingException(this.getLocalAddress(), (InetSocketAddress)null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } else { Request req = new Request(); //構(gòu)建request req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(this.channel, req, timeout); try { this.channel.send(req); //調(diào)用NettyServer.sent()進行發(fā)送請求 return future; } catch (RemotingException var6) { future.cancel(); throw var6; } } }
Dubbo默認的調(diào)用方式是異步調(diào)用,那么這個future保存至上下文之后,等響應(yīng)回來之后怎么找到對應(yīng)的future呢?
進入DefaultFuture
看看:
public class Request { private final long mId; public Request() { this.mId = newId(); } //靜態(tài)變量遞增,依次構(gòu)造唯一ID private static long newId() { return INVOKE_ID.getAndIncrement(); } } public class DefaultFuture implements ResponseFuture { private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap(); public DefaultFuture(Channel channel, Request request, int timeout) { this.done = this.lock.newCondition(); this.start = System.currentTimeMillis(); this.channel = channel; this.request = request; this.id = request.getId(); //唯一ID this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter("timeout", 1000); FUTURES.put(this.id, this); //將唯一ID和future的關(guān)系保存到這個ConcurrentHashMap中 CHANNELS.put(this.id, channel); } }
Request構(gòu)造對象的時候會生成一個唯一ID,future內(nèi)部也會將自己與請求ID存儲到一個ConcurrentHashMap中,這個ID發(fā)送至服務(wù)端之后,服務(wù)端也會把這個ID返回,通過ID再去ConcurrentHashMap中找到對應(yīng)的future,由此完成一次完整的調(diào)用。
最終相應(yīng)返回之后會調(diào)用DefaultFuture.received()
:
public static void received(Channel channel, Response response) { try { //獲取響應(yīng)的ID去FUTURES中獲取對應(yīng)的future,獲取之后將future移除 DefaultFuture future = (DefaultFuture)FUTURES.remove(response.getId()); if (future != null) { //確認接收響應(yīng) future.doReceived(response); } else { logger.warn("...."); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res) { this.lock.lock(); try { this.response = res; //響應(yīng)賦值 if (this.done != null) { this.done.signal(); //通知響應(yīng)返回 } } finally { this.lock.unlock(); } if (this.callback != null) { this.invokeCallback(this.callback); } }
調(diào)用流程源碼分析——服務(wù)端
服務(wù)端接受請求之后會解析請求得到消息,消息總共有五種派發(fā)策略:
Dubbo默認使用的是all
,所有消息都派發(fā)到業(yè)務(wù)線程池中,在AllChannelHandler
中實現(xiàn):
public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = this.getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, this.handler, ChannelState.RECEIVED, message)); } catch (Throwable var8) { if (message instanceof Request && var8 instanceof RejectedExecutionException) { Request request = (Request)message; if (request.isTwoWay()) { //如果需要返回響應(yīng),將錯誤封裝起來之后返回 String msg = "Server side(" + this.url.getIp() + "," + this.url.getPort() + ") threadpool is exhausted ,detail msg:" + var8.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus((byte)100); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, this.getClass() + " error when process received event .", var8); } }
上述代碼就是將消息封裝成一個ChannelEventRunnable
然后放入業(yè)務(wù)線程池中執(zhí)行,ChannelEventRunnable
會根據(jù)ChannelState
參數(shù)調(diào)用對應(yīng)的處理方法,此處是ChannelState.RECEIVED
,因此調(diào)用的是handler.received
,最終調(diào)用的是HeaderExchangeHandler.handleRequest()
方法:
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); //通過requestId構(gòu)造響應(yīng) Object data; if (req.isBroken()) { data = req.getData(); String msg; if (data == null) { msg = null; } else if (data instanceof Throwable) { msg = StringUtils.toString((Throwable)data); } else { msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus((byte)40); return res; } else { data = req.getData(); try { Object result = this.handler.reply(channel, data); //最終調(diào)用DubboProtocol.reply() res.setStatus((byte)20); res.setResult(result); } catch (Throwable var6) { res.setStatus((byte)70); res.setErrorMessage(StringUtils.toString(var6)); } return res; } }
進入DubboProtocol.reply()
看看:
public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (!(message instanceof Invocation)) { throw new RemotingException(); } else { Invocation inv = (Invocation)message; Invoker<?> invoker = DubboProtocol.this.getInvoker(channel, inv); //根據(jù)inv得到對應(yīng)的Invoker if (Boolean.TRUE.toString().equals(inv.getAttachments().get("_isCallBackServiceInvoke"))) { //一些回調(diào)邏輯 } else { hasMethod = inv.getMethodName().equals(methodsStr); } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); //調(diào)用選擇的Invoker.invoke() } }
最后的調(diào)用我們已經(jīng)了解過,就是調(diào)用一個Javassist
生成的代理類,其中包含了真正的實現(xiàn)類;再進入this.getInvoker()
看看是怎么根據(jù)請求信息獲取到Invoker的:
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { //.... int port = channel.getLocalAddress().getPort(); String path = (String)inv.getAttachments().get("path"); //根據(jù)port、path以及其他信息獲取serviceKey String serviceKey = serviceKey(port, path, (String)inv.getAttachments().get("version"), (String)inv.getAttachments().get("group")); //根據(jù)serviceKey在之前提到的exportMap中獲取exporter DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey); if (exporter == null) { throw new RemotingException(....); } else { return exporter.getInvoker(); //返回Invoker } }
關(guān)鍵點在于serviceKey
,在之前服務(wù)暴露提到的將Invoker封裝成exporter之后再構(gòu)建一個exporterMap
,將serviceKey
和對應(yīng)的exporter
存入,在服務(wù)調(diào)用時這個map就起到作用了。
找到所需要的Invoker最終調(diào)用實現(xiàn)類具體方法再返回響應(yīng)整個服務(wù)調(diào)用流程就結(jié)束了,再對上述的流程圖進行一下補充:
總結(jié)
首先客戶端調(diào)用接口中的某個方法,但實際調(diào)用的是代理類,代理類通過Cluster從獲取Invokers,之后通過Router進行路由過濾,再通過所配置的負載均衡機制進行篩選得到本次遠程調(diào)用所需要的Invoker,此時根據(jù)具體的協(xié)議構(gòu)造請求頭,再將參數(shù)根據(jù)具體的序列化協(xié)議進行序列化之后構(gòu)造好塞入?yún)f(xié)議體,最后通過NettyClient發(fā)起遠程調(diào)用。
服務(wù)端NettyServer收到請求后,根據(jù)協(xié)議將得到的信息進行反序列化得到對象,根據(jù)消息派發(fā)策略(默認是All)將消息丟入線程池。
業(yè)務(wù)現(xiàn)場會根據(jù)消息類型得到serviceKey,用這個key從之前服務(wù)暴露生成的exportMap中得到對應(yīng)的Invoker,然后調(diào)用真正的實現(xiàn)類中的具體方法。
最終將結(jié)果返回,因為請求和響應(yīng)的都有一個對應(yīng)且唯一的ID,客戶端會根據(jù)響應(yīng)的ID找到存儲起來的Future,塞入響應(yīng)中等待喚醒Future的線程,這就完成了一次完整的調(diào)用過程。
如有錯誤或不足歡迎評論指正。
以上就是java開發(fā)分布式服務(wù)框架Dubbo調(diào)用過程的詳細內(nèi)容,更多關(guān)于Dubbo服務(wù)調(diào)用過程的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
淺談Java內(nèi)部類——靜態(tài)內(nèi)部類
這篇文章主要介紹了Java靜態(tài)內(nèi)部類的相關(guān)資料,幫助大家更好的理解和學習Java內(nèi)部類的相關(guān)知識,感興趣的朋友可以了解下2020-08-08詳解SpringBoot中的統(tǒng)一功能處理的實現(xiàn)
這篇文章主要為大家詳細介紹了SpringBoot如何實現(xiàn)統(tǒng)一功能處理,文中的示例代碼講解詳細,對我們學習或工作有一定借鑒價值,需要的可以參考一下2023-01-01Java中java.sql.SQLException異常的正確解決方法(親測有效!)
SQLException是在Java中處理數(shù)據(jù)庫操作過程中可能發(fā)生的異常,通常是由于底層數(shù)據(jù)庫操作錯誤或違反了數(shù)據(jù)庫規(guī)則而引起的,下面這篇文章主要給大家介紹了關(guān)于Java中java.sql.SQLException異常的正確解決方法,需要的朋友可以參考下2024-01-01SpringBoot JWT接口驗證實現(xiàn)流程詳細介紹
這篇文章主要介紹了SpringBoot+JWT實現(xiàn)接口驗證,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧2022-09-09