亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java Dubbo協(xié)議下的服務(wù)端線程使用詳解

 更新時間:2023年03月01日 10:47:55   作者:Redick01  
Dubbo是阿里開源項目,國內(nèi)很多互聯(lián)網(wǎng)公司都在用,已經(jīng)經(jīng)過很多線上考驗。Dubbo內(nèi)部使用了Netty、Zookeeper,保證了高性能高可用性,使用Dubbo可以將核心業(yè)務(wù)抽取出來,作為獨立的服務(wù),逐漸形成穩(wěn)定的服務(wù)中心

Provider端線程模型

在了解服務(wù)端線程模型之前,先了解一下Dubbo對Channel上的操作抽象,Dubbo將Channel上的操作成了5中行為,分別是:建立連接、斷開連接、發(fā)送消息、接收消息、異常捕獲,Channel上的操作的接口為org.apache.dubbo.remoting.ChannelHandler,該接口是SPI的,用戶可以自己擴展,接口代碼如下:

該接口抽象的五種Channel上的行為解釋如下:

  • 建立連接:connected,主要是的職責(zé)是在channel記錄read、write的時間,以及處理建立連接后的回調(diào)邏輯,比如dubbo支持在斷開后自定義回調(diào)的hook(onconnect),即在該操作中執(zhí)行。
  • 斷開連接:disconnected,主要是的職責(zé)是在channel移除read、write的時間,以及處理端開連接后的回調(diào)邏輯,比如dubbo支持在斷開后自定義回調(diào)的hook(ondisconnect),即在該操作中執(zhí)行。
  • 發(fā)送消息:sent,包括發(fā)送請求和發(fā)送響應(yīng)。記錄write的時間。
  • 接收消息:received,包括接收請求和接收響應(yīng)。記錄read的時間。
  • 異常捕獲:caught,用于處理在channel上發(fā)生的各類異常。

Dubbo框架的線程模型與以上這五種行為息息相關(guān),Dubbo協(xié)議Provider端線程模型提供了五種實現(xiàn),雖說都是五種但是別把二者混淆,線程模型的頂級接口是org.apache.dubbo.remoting.Dispatcher,該接口也是SPI的,提供的五種實現(xiàn)分別是AllDispatcher、DirectDispatcherMessageOnlyDispatcher、ExecutionDispatcher、ConnectionOrderedDispatcher,默認的使用的是AllDispatcher。

org.apache.dubbo.remoting.ChannelHandler作為Channel上的行為的頂級接口對應(yīng)Dubbo協(xié)議Provider端的5種線程模型同樣也提供了對應(yīng)的5種實現(xiàn),分別是AllChannelHandler、DirectChannelHandlerMessageOnlyChannelHandler、ExecutionChannelHandler、ConnectionOrderedChannelHandler,這里Channel上行為的具體實現(xiàn)不展開討論。

Channel上行為和線程模型之間使用策略可以參考org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers的源代碼,這里不做詳細的介紹,下面的各個章節(jié)只針對5種線程模型做簡單的介紹。

AllDispatcher

IO線程上的操作:

  • 接口響應(yīng)序列化
  • sent操作

Dubbo線程池上的操作:

  • received、connected、disconnected、caught都是在Dubbo線程池上執(zhí)行
  • 服務(wù)端反序列化操作的Dubbo線程池上執(zhí)行

AllDispatcher代碼如下,AllDispatcherdispatch方法實例化了AllChannelHandler,AllChannelHandler實現(xiàn)了received、connected、disconnected、caught操作在dubbo線程池中,代碼如下:

public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}
public class AllChannelHandler extends WrappedChannelHandler {
    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

DirectDispatcher

該線程模型Channel上的所有行為均在IO線程中執(zhí)行,并沒有在Dubbo線程池中執(zhí)行

DirectDispatcherAllDispatcher相似,實例化了DirectChannelHandler,DirectChannelHandler只實現(xiàn)了received行為,但是received中獲取的線程池如果是ThreadlessExecutor才會提交task,否則也是在ChannelHandler中執(zhí)行received行為,ThreadlessExecutor和普通線程池最大的區(qū)別是不會管理任何線程,這里不展開討論。

public class DirectDispatcher implements Dispatcher {
    public static final String NAME = "direct";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new DirectChannelHandler(handler, url);
    }
}
public class DirectChannelHandler extends WrappedChannelHandler {
    public DirectChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        if (executor instanceof ThreadlessExecutor) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        } else {
            handler.received(channel, message);
        }
    }
}

ExecutionDispatcher

在IO線程中執(zhí)行的操作有:

  • sent、connected、disconnected、caught操作在IO線程上執(zhí)行。
  • 序列化響應(yīng)在IO線程上執(zhí)行。

在Dubbo線程中執(zhí)行的操作有:

  • received都是在Dubbo線程上執(zhí)行的。
  • 反序列化請求的行為在Dubbo中做的。

同樣的,我們可以直接看ExecutionChannelHandler源碼,邏輯是當(dāng)message的類型是Request時received行為在Dubbo線程池執(zhí)行。感興趣的可以自己看源碼,這里不做介紹。

MessageOnlyDispatcher

Message Only Dispatcher所有的received行為和反序列化都是在dubbo線程池中執(zhí)行的

public class MessageOnlyChannelHandler extends WrappedChannelHandler {
    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

ConnectionOrderedDispatcher

該線程模型與AllDispatcher類似,sent操作和相應(yīng)的序列化是在IO線程上執(zhí)行;connected、disconnected、received、caught操作在dubbo線程池上執(zhí)行,他們的區(qū)別是在connected、disconnected行為上ConnectionOrderedDispatcher做了線程池隔離,并且在Dubbo connected thread pool中提供了鏈接限制、告警燈能力,我們直接看ConnectionOrderedChannelHandler源碼,代碼如下:

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
    protected final ThreadPoolExecutor connectionExecutor;
    private final int queueWarningLimit;
    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queueWarningLimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }
    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queueWarningLimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queueWarningLimit));
        }
    }
}

到此這篇關(guān)于Java Dubbo協(xié)議下的服務(wù)端線程使用詳解的文章就介紹到這了,更多相關(guān)Java Dubbo服務(wù)端線程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • mybatis如何通過接口查找對應(yīng)的mapper.xml及方法執(zhí)行詳解

    mybatis如何通過接口查找對應(yīng)的mapper.xml及方法執(zhí)行詳解

    這篇文章主要給大家介紹了利用mybatis如何通過接口查找對應(yīng)的mapper.xml及方法執(zhí)行的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家具有一定的參考學(xué)習(xí)價值,需要的朋友們下面跟著小編一起來學(xué)習(xí)學(xué)習(xí)吧。
    2017-06-06
  • Java輸入三個整數(shù)并把他們由小到大輸出(x,y,z)

    Java輸入三個整數(shù)并把他們由小到大輸出(x,y,z)

    這篇文章主要介紹了輸入三個整數(shù)x,y,z,請把這三個數(shù)由小到大輸出,需要的朋友可以參考下
    2017-02-02
  • Java-IO流實驗

    Java-IO流實驗

    流是一種抽象概念,它代表了數(shù)據(jù)的無結(jié)構(gòu)化傳遞。。用來進行輸入輸出操作的流就稱為IO流。換句話說,IO流就是以流的方式進行輸入輸出,希望能給您帶來幫助
    2021-06-06
  • SpringBoot實現(xiàn)啟動類的存放位置

    SpringBoot實現(xiàn)啟動類的存放位置

    這篇文章主要介紹了SpringBoot實現(xiàn)啟動類的存放位置,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • Java IO之包裝流詳解

    Java IO之包裝流詳解

    這篇文章主要為大家介紹了Java IO之包裝流,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-01-01
  • Java中正則表達式的使用和詳解(下)

    Java中正則表達式的使用和詳解(下)

    這篇文章主要介紹了Java正則表達式的使用和詳解(下)的相關(guān)資料,包括常用正則表達式和正則表達式語法,非常不錯,具有參考借鑒價值,需要的的朋友參考下吧
    2017-04-04
  • 簡單了解java集合框架LinkedList使用方法

    簡單了解java集合框架LinkedList使用方法

    這篇文章主要介紹了簡單了解java集合框架LinkedList使用方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-08-08
  • spring-boot中使用spring-boot-devtools的實現(xiàn)代碼

    spring-boot中使用spring-boot-devtools的實現(xiàn)代碼

    這篇文章主要介紹了spring-boot中使用spring-boot-devtools的實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • 基于Spring5實現(xiàn)登錄注冊功能

    基于Spring5實現(xiàn)登錄注冊功能

    這篇文章主要為大家詳細介紹了基于Spring5實現(xiàn)登錄注冊功能,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-09-09
  • java中容器(頂層容器和中間容器)的布局管理器詳解

    java中容器(頂層容器和中間容器)的布局管理器詳解

    這篇文章主要介紹了java中容器(頂層容器和中間容器)的布局管理器詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12

最新評論