亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

解析Tars-Java客戶端源碼

 更新時(shí)間:2021年06月17日 14:55:46   作者:vivo互聯(lián)網(wǎng)技術(shù)  
Tars是基于名字服務(wù)使用Tars協(xié)議的高性能RPC開(kāi)發(fā)框架,同時(shí)配套一體化的服務(wù)治理平臺(tái),幫助個(gè)人或者企業(yè)快速的以微服務(wù)的方式構(gòu)建自己穩(wěn)定可靠的分布式應(yīng)用

一、基本RPC框架簡(jiǎn)介

在分布式計(jì)算中,遠(yuǎn)程過(guò)程調(diào)用(Remote Procedure Call,縮寫 RPC)允許運(yùn)行于一臺(tái)計(jì)算機(jī)的程序調(diào)用另一個(gè)地址空間計(jì)算機(jī)的程序,就像調(diào)用本地程序一樣,無(wú)需額外地為這個(gè)交互作用涉及到的代理對(duì)象構(gòu)建、網(wǎng)絡(luò)協(xié)議等進(jìn)行編程。

一般RPC架構(gòu),有至少三種結(jié)構(gòu),分別為注冊(cè)中心,服務(wù)提供者和服務(wù)消費(fèi)者。如圖1.1所示,注冊(cè)中心提供注冊(cè)服務(wù)和注冊(cè)信息變更的通知服務(wù),服務(wù)提供者運(yùn)行在服務(wù)器來(lái)提供服務(wù),服務(wù)消費(fèi)者使用服務(wù)提供者的服務(wù)。

服務(wù)提供者(RPC Server),運(yùn)行在服務(wù)端,提供服務(wù)接口定義與服務(wù)實(shí)現(xiàn)類,并對(duì)外暴露服務(wù)接口。注冊(cè)中心(Registry),運(yùn)行在服務(wù)端,負(fù)責(zé)記錄服務(wù)提供者的服務(wù)對(duì)象,并提供遠(yuǎn)程服務(wù)信息的查詢服務(wù)和變更通知服務(wù)。服務(wù)消費(fèi)者(RPC Client),運(yùn)行在客戶端,通過(guò)遠(yuǎn)程代理對(duì)象調(diào)用遠(yuǎn)程服務(wù)。

1.1、RPC調(diào)用流程

如下圖所示,描述了RPC的調(diào)用流程,其中IDL(Interface Description Language)為接口描述語(yǔ)言,使得在不同平臺(tái)上運(yùn)行的程序和用不同語(yǔ)言編寫的程序可以相互通信交流。

1)客戶端調(diào)用客戶端樁模塊。該調(diào)用是本地過(guò)程調(diào)用,其中參數(shù)以正常方式推入堆棧。

2)客戶端樁模塊將參數(shù)打包到消息中,并進(jìn)行系統(tǒng)調(diào)用以發(fā)送消息。打包參數(shù)稱為編組。

3)客戶端的本地操作系統(tǒng)將消息從客戶端計(jì)算機(jī)發(fā)送到服務(wù)器計(jì)算機(jī)。

4)服務(wù)器計(jì)算機(jī)上的本地操作系統(tǒng)將傳入的數(shù)據(jù)包傳遞到服務(wù)器樁模塊。

5)服務(wù)器樁模塊從消息中解包出參數(shù)。解包參數(shù)稱為解組。

6)最后,服務(wù)器樁模塊執(zhí)行服務(wù)器程序流程?;貜?fù)是沿相反的方向執(zhí)行相同的步驟。

二、Tars Java客戶端設(shè)計(jì)介紹

Tars Java客戶端整體設(shè)計(jì)與主流的RPC框架基本一致。我們先介紹Tars Java客戶端初始化過(guò)程。

2.1、Tars Java客戶端初始化過(guò)程

如圖2.1所示,描述了Tars Java的初始化過(guò)程。

1)先出創(chuàng)建一個(gè)CommunicatorConfig配置項(xiàng),命名為communicatorConfig,其中按需設(shè)置locator, moduleName, connections等參數(shù)。

2)通過(guò)上述的CommunicatorConfig配置項(xiàng),命名為config,那么調(diào)用CommunicatorFactory.getInstance().getCommunicator(config),創(chuàng)建一個(gè)Communicator對(duì)象,命名為communicator。

3)假設(shè)objectName="MESSAGE.ControlCenter.Dispatcher",需要生成的代理接口為Dispatcher.class,調(diào)用communicator.stringToProxy(objectName, Dispatcher.class)方法來(lái)生成代理對(duì)象的實(shí)現(xiàn)類。

4)在stringToProxy()方法里,首先通過(guò)初始化QueryHelper代理對(duì)象,調(diào)用getServerNodes()方法獲取遠(yuǎn)程服務(wù)對(duì)象列表,并設(shè)置該返回值到communicatorConfig的objectName字段里。具體的代理對(duì)象的代碼分析,見(jiàn)下文中的“2.3 代理生成”章節(jié)。

5)判斷在之前調(diào)用stringToProxy是否有設(shè)置LoadBalance參數(shù),如果沒(méi)有的話,就生成默認(rèn)的采用RR輪訓(xùn)算法的DefaultLoadBalance對(duì)象。

6)創(chuàng)建TarsProtocolInvoker協(xié)議調(diào)用對(duì)象,其中過(guò)程有通過(guò)解析communicatorConfig中的objectName和simpleObjectName來(lái)獲取URL列表,其中一個(gè)URL對(duì)應(yīng)一個(gè)遠(yuǎn)程服務(wù)對(duì)象,TarsProtocolInvoker初始化各個(gè)URL對(duì)應(yīng)的ServantClient對(duì)象,其中一個(gè)URL根據(jù)communicatorConfig的connections配置項(xiàng)確認(rèn)生成多少個(gè)ServantClient對(duì)象。然后使用ServantClients等參數(shù)初始化TarsInvoker對(duì)象,并將這些TarsInvoker對(duì)象集合設(shè)置到TarsProtocolInvoker的allInvokers成員變量中,其中每個(gè)URL對(duì)應(yīng)一個(gè)TarsInvoker對(duì)象。上述分析表明,一個(gè)遠(yuǎn)程服務(wù)節(jié)點(diǎn)對(duì)應(yīng)一個(gè)TarsInvoker對(duì)象,一個(gè)TarsInvoker對(duì)象包含connections個(gè)ServantClient對(duì)象,對(duì)于TCP協(xié)議,那么就是一個(gè)ServantClient對(duì)象對(duì)應(yīng)一個(gè)TCP連接。

7)使用api, objName, servantProxyConfig,loadBalance,protocolInvoker, this.communicator參數(shù)生成一個(gè)實(shí)現(xiàn)JDK代理接口InvocationHandler的ObjectProxy對(duì)象。

8)生成ObjectProxy對(duì)象的同時(shí)進(jìn)行初始化操作,首先會(huì)執(zhí)行l(wèi)oadBalancer.refresh()方法刷新遠(yuǎn)程服務(wù)節(jié)點(diǎn)到負(fù)載均衡器中便于后續(xù)tars遠(yuǎn)程調(diào)用進(jìn)行路由。

9)然后注冊(cè)統(tǒng)計(jì)信息上報(bào)器,其中是上報(bào)方法采用JDK的ScheduledThreadPoolExecutor進(jìn)行定時(shí)輪訓(xùn)上報(bào)。

10)注冊(cè)服務(wù)列表刷新器,采用的技術(shù)方法和上述統(tǒng)計(jì)信息上報(bào)器基本一致。

2.2、使用范例

以下代碼為最簡(jiǎn)化示例,其中CommunicatorConfig里的配置采用默認(rèn)值,communicator通過(guò)CommunicatorConfig配置生成后,直接指定遠(yuǎn)程服務(wù)對(duì)象的具體服務(wù)對(duì)象名、IP和端口生成一個(gè)遠(yuǎn)程服務(wù)代理對(duì)象。

Tars Java代碼使用范例// 先初始化基本Tars配置CommunicatorConfig cfg = new CommunicatorConfig();// 通過(guò)上述的CommunicatorConfig配置生成一個(gè)Communicator對(duì)象。Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);// 指定Tars遠(yuǎn)程服務(wù)的服務(wù)對(duì)象名、IP和端口生成一個(gè)遠(yuǎn)程服務(wù)代理對(duì)象。

// 先初始化基本Tars配置
CommunicatorConfig cfg = new CommunicatorConfig();
// 通過(guò)上述的CommunicatorConfig配置生成一個(gè)Communicator對(duì)象。
Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);
// 指定Tars遠(yuǎn)程服務(wù)的服務(wù)對(duì)象名、IP和端口生成一個(gè)遠(yuǎn)程服務(wù)代理對(duì)象。
HelloPrx proxy = communicator.stringToProxy(HelloPrx.class, "TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 18601 -t 60000");
//同步調(diào)用,阻塞直到遠(yuǎn)程服務(wù)對(duì)象的方法返回結(jié)果
String ret = proxy.hello(3000, "Hello World");
System.out.println(ret);
//異步調(diào)用,不關(guān)注異步調(diào)用最終的情況
proxy.async_hello(null, 3000, "Hello World");
    //異步調(diào)用,注冊(cè)一個(gè)實(shí)現(xiàn)TarsAbstractCallback接口的回執(zhí)處理對(duì)象,該實(shí)現(xiàn)類分別處理調(diào)用成功,調(diào)用超時(shí)和調(diào)用異常的情況。
proxy.async_hello(new HelloPrxCallback() {
    @Override
    public void callback_expired() { //超時(shí)事件處理
    }
    @Override
    public void callback_exception(Throwable ex) { //異常事件處理
    }
    @Override
    public void callback_hello(String ret) { //調(diào)用成功事件處理
        Main.logger.info("invoke async method successfully {}", ret);
    }
}, 1000, "Hello World");

在上述例子中,演示了常見(jiàn)的兩種調(diào)用方式,分別為同步調(diào)用和異步調(diào)用。其中異步調(diào)用,如果調(diào)用方想捕捉異步調(diào)用的最終結(jié)果,可以注冊(cè)一個(gè)實(shí)現(xiàn)TarsAbstractCallback接口的實(shí)現(xiàn)類,對(duì)tars調(diào)用的異常,超時(shí)和成功事件進(jìn)行處理。

2.3、代理生成

Tars Java的客戶端樁模塊的遠(yuǎn)程代理對(duì)象是采用JDK原生Proxy方法。如下文的源碼所示,ObjectProxy實(shí)現(xiàn)了java.lang.reflect.InvocationHandler的接口方法,該接口是JDK自帶的代理接口。

代理實(shí)現(xiàn)

public final class ObjectProxy<T> implements ServantProxy, InvocationHandler {
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        InvokeContext context = this.protocolInvoker.createContext(proxy, method, args);
        try {
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return this.toString();
            } else if
                //***** 省略代碼 *****
            } else {
                // 在負(fù)載均衡器選取一個(gè)遠(yuǎn)程調(diào)用類,進(jìn)行應(yīng)用層協(xié)議的封裝,最后調(diào)用TCP傳輸層進(jìn)行發(fā)送。
                Invoker invoker = this.loadBalancer.select(context);
                return invoker.invoke(context);
            }
        } catch (Throwable var8) {
            // ***** 省略代碼 *****
        }
    }
}

當(dāng)然生成上述遠(yuǎn)程服務(wù)代理類,涉及到輔助類,Tars Java采用ServantProxyFactory來(lái)生成上述的ObjectProxy,并存儲(chǔ)ObjectProxy對(duì)象到Map結(jié)構(gòu),便于調(diào)用方二次使用時(shí)直接復(fù)用已存在的遠(yuǎn)程服務(wù)代理對(duì)象。

具體相關(guān)邏輯如源碼所示,ObjectProxyFactory是生成ObjectProxy的輔助工廠類,和ServantProxyFactory不同,其本身不緩存生成的代理對(duì)象。

class ServantProxyFactory {
    private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap();
    // ***** 省略代碼 *****
    public <T> Object getServantProxy(Class<T> clazz, String objName, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) {
        Object proxy = this.cache.get(objName);
        if (proxy == null) {
            this.lock.lock(); // 加鎖,保證只生成一個(gè)遠(yuǎn)程服務(wù)代理對(duì)象。
            try {
                proxy = this.cache.get(objName);
                if (proxy == null) {
                    // 創(chuàng)建實(shí)現(xiàn)JDK的java.lang.reflect.InvocationHandler接口的對(duì)象
                    ObjectProxy<T> objectProxy = this.communicator.getObjectProxyFactory().getObjectProxy(clazz, objName, servantProxyConfig, loadBalance, protocolInvoker);
                    // 使用JDK的java.lang.reflect.Proxy來(lái)生成實(shí)際的代理對(duì)象
                    this.cache.putIfAbsent(objName, this.createProxy(clazz, objectProxy));
                    proxy = this.cache.get(objName);
                }
            } finally {
                this.lock.unlock();
            }
        }
        return proxy;
    }
    /** 使用JDK自帶的Proxy.newProxyInstance生成代理對(duì)象 */
    private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clazz, ServantProxy.class}, objectProxy);
    }
    // ***** 省略代碼 *****
}

從以上的源碼中,可以看到createProxy使用了JDK的Proxy.newProxyInstance方法來(lái)生成遠(yuǎn)程服務(wù)代理對(duì)象。

2.4、遠(yuǎn)程服務(wù)尋址方法

作為一個(gè)RPC遠(yuǎn)程框架,在分布式系統(tǒng)中,調(diào)用遠(yuǎn)程服務(wù),涉及到如何路由的問(wèn)題,也就是如何從多個(gè)遠(yuǎn)程服務(wù)節(jié)點(diǎn)中選擇一個(gè)服務(wù)節(jié)點(diǎn)進(jìn)行調(diào)用,當(dāng)然Tars Java支持直連特定節(jié)點(diǎn)的方式調(diào)用遠(yuǎn)程服務(wù),如上文的2.2 使用范例所介紹。

如圖下圖所示,ClientA某個(gè)時(shí)刻的一次調(diào)用使用了Service3節(jié)點(diǎn)進(jìn)行遠(yuǎn)程服務(wù)調(diào)用,而ClientB某個(gè)時(shí)刻的一次調(diào)用采用Service2節(jié)點(diǎn)。Tars Java提供多種負(fù)載均衡算法實(shí)現(xiàn)類,其中有采用RR輪訓(xùn)算法的RoundRobinLoadBalance,一致性哈希算法的ConsistentHashLoadBalance和普通哈希算法的HashLoadBalance。

(客戶端按特定路由規(guī)則調(diào)用遠(yuǎn)程服務(wù))

如下述源碼所示,如果要自定義負(fù)載均衡器來(lái)定義遠(yuǎn)程調(diào)用的路由規(guī)則,那么需要實(shí)現(xiàn)com.qq.tars.rpc.common.LoadBalance接口,其中LoadBalance.select()方法負(fù)責(zé)按照路由規(guī)則,選取對(duì)應(yīng)的Invoker對(duì)象,然后進(jìn)行遠(yuǎn)程調(diào)用,具體邏輯見(jiàn)源碼代理實(shí)現(xiàn)。由于遠(yuǎn)程服務(wù)節(jié)點(diǎn)可能發(fā)生變更,比如上下線遠(yuǎn)程服務(wù)節(jié)點(diǎn),需要刷新本地負(fù)載均衡器的路由信息,那么此信息更新的邏輯在LoadBalance.refresh()方法里實(shí)現(xiàn)。

負(fù)載均衡接口

public interface LoadBalance<T> {
    /** 根據(jù)負(fù)載均衡策略,挑選invoker */
    Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException;
    /** 通知invoker列表的更新 */
    void refresh(Collection<Invoker<T>> invokers);
}

2.5、網(wǎng)絡(luò)模型

Tars Java的IO模式采用的JDK的NIO的Selector模式。這里以TCP協(xié)議來(lái)描述網(wǎng)絡(luò)處理,如下述源碼所示,Reactor是一個(gè)線程,其中的run()方法中,調(diào)用了selector.select()方法,意思是如果除非此時(shí)網(wǎng)絡(luò)產(chǎn)生一個(gè)事件,否則將一直線程阻塞下去。

假如此時(shí)出現(xiàn)一個(gè)網(wǎng)絡(luò)事件,那么此時(shí)線程將會(huì)被喚醒,執(zhí)行后續(xù)代碼,其中一個(gè)代碼是dispatcheEvent(key),也就是將進(jìn)行事件的分發(fā)。

其中將根據(jù)對(duì)應(yīng)條件,調(diào)用acceptor.handleConnectEvent(key)方法來(lái)處理客戶端連接成功事件,或acceptor.handleAcceptEvent(key)方法來(lái)處理服務(wù)器接受連接成功事件,或調(diào)用acceptor.handleReadEvent(key)方法從Socket里讀取數(shù)據(jù),或acceptor.handleWriteEvent(key)方法來(lái)寫數(shù)據(jù)到Socket 。

Reactor事件處理

public final class Reactor extends Thread {
    protected volatile Selector selector = null;
    private Acceptor acceptor = null;
    //***** 省略代碼 *****
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // 阻塞直到有網(wǎng)絡(luò)事件發(fā)生。
                selector.select();
                //***** 省略代碼 *****
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (!key.isValid()) continue;
                    try {
                        //***** 省略代碼 *****
                        // 分發(fā)傳輸層協(xié)議TCP或UDP網(wǎng)絡(luò)事件
                        dispatchEvent(key);
                //***** 省略代碼 *****
            }
        }
        //***** 省略代碼 *****
    }
        //***** 省略代碼 *****
    private void dispatchEvent(final SelectionKey key) throws IOException {
        if (key.isConnectable()) {
            acceptor.handleConnectEvent(key);
        } else if (key.isAcceptable()) {
            acceptor.handleAcceptEvent(key);
        } else if (key.isReadable()) {
            acceptor.handleReadEvent(key);
        } else if (key.isValid() && key.isWritable()) {
            acceptor.handleWriteEvent(key);
        }
    }
}

網(wǎng)絡(luò)處理采用Reactor事件驅(qū)動(dòng)模式,Tars定義一個(gè)Reactor對(duì)象對(duì)應(yīng)一個(gè)Selector對(duì)象,針對(duì)每個(gè)遠(yuǎn)程服務(wù)(整體服務(wù)集群,非單個(gè)節(jié)點(diǎn)程序)默認(rèn)創(chuàng)建2個(gè)Reactor對(duì)象進(jìn)行處理。

上圖中的處理讀IO事件(Read Event)實(shí)現(xiàn)和寫IO事件(Write Event)的線程池是在Communicator初始化的時(shí)候配置的。具體邏輯如源碼所示,其中線程池參數(shù)配置由CommunicatorConfig的corePoolSize, maxPoolSize, keepAliveTime等參數(shù)決定。

讀寫事件線程池初始化

private void initCommunicator(CommunicatorConfig config) throws CommunicatorConfigException {
    //***** 省略代碼 *****
    this.threadPoolExecutor = ClientPoolManager.getClientThreadPoolExecutor(config);
    //***** 省略代碼 *****
}
​
public class ClientPoolManager {
    public static ThreadPoolExecutor getClientThreadPoolExecutor(CommunicatorConfig communicatorConfig) {
        //***** 省略代碼 *****
        clientThreadPoolMap.put(communicatorConfig, createThreadPool(communicatorConfig));
        //***** 省略代碼 *****
        return clientPoolExecutor;
    }    
     
    private static ThreadPoolExecutor createThreadPool(CommunicatorConfig communicatorConfig) {
        int corePoolSize = communicatorConfig.getCorePoolSize();
        int maxPoolSize = communicatorConfig.getMaxPoolSize();
        int keepAliveTime = communicatorConfig.getKeepAliveTime();
        int queueSize = communicatorConfig.getQueueSize();
        TaskQueue taskqueue = new TaskQueue(queueSize);
​
        String namePrefix = "tars-client-executor-";
        TaskThreadPoolExecutor executor = new TaskThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, taskqueue, new TaskThreadFactory(namePrefix));
        taskqueue.setParent(executor);
        return executor;
    }
}

2.6、遠(yuǎn)程調(diào)用交互模型

調(diào)用代理類的方法,那么會(huì)進(jìn)入實(shí)現(xiàn)InvocationHandler接口的ObjectProxy中的invoke方法。

下圖描述了遠(yuǎn)程服務(wù)調(diào)用的流程情況。這里著重講幾個(gè)點(diǎn),一個(gè)是如何寫數(shù)據(jù)到網(wǎng)絡(luò)IO。第二個(gè)是Tars Java通過(guò)什么方式進(jìn)行同步或者異步調(diào)用,底層采用了什么技術(shù)。

2.6.1、寫 IO 流程

如圖(底層代碼寫IO過(guò)程)所示,ServantClient將調(diào)用底層網(wǎng)絡(luò)寫操作,在invokeWithSync方法中,取得ServantClient自身成員變量TCPSession,調(diào)用TCPSession.write()方法,如圖(底層代碼寫IO過(guò)程)和以下源碼( 讀寫事件線程池初始化)所示,先獲取Encode進(jìn)行請(qǐng)求內(nèi)容編碼成IoBuffer對(duì)象,最后將IoBuffer的java.nio.ByteBuffer內(nèi)容放入TCPSession的queue成員變量中,然后調(diào)用key.selector().wakeup(),喚醒Reactor中run()方法中的Selector.select(),執(zhí)行后續(xù)的寫操作。

具體Reactor邏輯見(jiàn)上文2.5 網(wǎng)絡(luò)模型內(nèi)容,如果Reactor檢查條件發(fā)現(xiàn)可以寫IO的話也就是key.isWritable()為true,那么最終會(huì)循環(huán)從TCPSession.queue中取出ByteBuffer對(duì)象,調(diào)用SocketChannel.write(byteBuffer)執(zhí)行實(shí)際的寫網(wǎng)絡(luò)Socket操作,代碼邏輯見(jiàn)源碼中的doWrite()方法。

讀寫事件線程池初始化

public class TCPSession extends Session {
    public void write(Request request) throws IOException {
        try {
            IoBuffer buffer = selectorManager.getProtocolFactory().getEncoder().encodeRequest(request, this);
            write(buffer);
        //***** 省略代碼 *****
    }
    protected void write(IoBuffer buffer) throws IOException {
        //***** 省略代碼 *****
        if (!this.queue.offer(buffer.buf())) {
            throw new IOException("The session queue is full. [ queue size:" + queue.size() + " ]");
        }
        if (key != null) {
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            key.selector().wakeup();
        }
    }
    protected synchronized int doWrite() throws IOException {
        int writeBytes = 0;
        while (true) {
            ByteBuffer wBuf = queue.peek();
            //***** 省略代碼 *****
            int bytesWritten = ((SocketChannel) channel).write(wBuf);
            //***** 省略代碼 *****
        return writeBytes;
    }
}

2.6.2、同步和異步調(diào)用的底層技術(shù)實(shí)現(xiàn)

對(duì)于同步方法調(diào)用,如圖(遠(yuǎn)程調(diào)用流程)和源碼(ServantClient的同步調(diào)用)所示,ServantClient調(diào)用底層網(wǎng)絡(luò)寫操作,在invokeWithSync方法中創(chuàng)建一個(gè)Ticket對(duì)象,Ticket顧名思義就是票的意思,這張票唯一標(biāo)識(shí)本次網(wǎng)絡(luò)調(diào)用情況。

ServantClient的同步調(diào)用

public class ServantClient {
    public <T extends ServantResponse> T invokeWithSync(ServantRequest request) throws IOException {
            //***** 省略代碼 *****
            ticket = TicketManager.createTicket(request, session, this.syncTimeout);
            Session current = session;
            current.write(request);
            if (!ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)) {
            //***** 省略代碼 *****
            response = ticket.response();
            //***** 省略代碼 *****
            return response;
            //***** 省略代碼 *****
        return response;
    }
}

如代碼所示,在執(zhí)行完session.write()操作后,緊接著執(zhí)行ticket.await()方法,該方法線程等待直到遠(yuǎn)程服務(wù)回復(fù)返回結(jié)果到客戶端,ticket.await()被喚醒后,將執(zhí)行后續(xù)操作,最終invokeWithSync方法返回response對(duì)象。其中Ticket的等待喚醒功能內(nèi)部采用java.util.concurrent.CountDownLatch來(lái)實(shí)現(xiàn)。

對(duì)于異步方法調(diào)用,將會(huì)執(zhí)行ServantClient.invokeWithAsync方法,也會(huì)創(chuàng)建一個(gè)Ticket,并且執(zhí)行Session.write()操作,雖然不會(huì)調(diào)用ticket.await(),但是在Reactor接收到遠(yuǎn)程回復(fù)時(shí),首先會(huì)先解析Tars協(xié)議頭得到Response對(duì)象,然后將Response對(duì)象放入如圖(Tars-Java的網(wǎng)絡(luò)事件處理模型)所示的IO讀寫線程池中進(jìn)行進(jìn)一步處理,如下述源碼(異步回調(diào)事件處理)所示,最終會(huì)調(diào)用WorkThread.run()方法,在run()方法里執(zhí)行ticket.notifyResponse(resp),該方法里面會(huì)執(zhí)行類似上述代碼2.1中的實(shí)現(xiàn)TarsAbstractCallback接口的調(diào)用成功回調(diào)的方法。

異步回調(diào)事件處理

public final class WorkThread implements Runnable {
    public void run() {
        try {
            //***** 省略代碼 *****
                Ticket<Response> ticket = TicketManager.getTicket(resp.getTicketNumber());
            //***** 省略代碼 *****
                ticket.notifyResponse(resp);
                ticket.countDown();
                TicketManager.removeTicket(ticket.getTicketNumber());
            }
            //***** 省略代碼 *****
    }
}

如下述源碼所示,TicketManager會(huì)有一個(gè)定時(shí)任務(wù)輪訓(xùn)檢查所有的調(diào)用是否超時(shí),如果(currentTime - t.startTime) > t.timeout條件成立,那么會(huì)調(diào)用t.expired()告知回調(diào)對(duì)象,本次調(diào)用超時(shí)。

調(diào)用超時(shí)事件處理

public class TicketManager {
            //***** 省略代碼 *****
    static {
        executor.scheduleAtFixedRate(new Runnable() {
            long currentTime = -1;
            public void run() {
                Collection<Ticket<?>> values = tickets.values();
                currentTime = System.currentTimeMillis();
                for (Ticket<?> t : values) {
                    if ((currentTime - t.startTime) > t.timeout) {
                        removeTicket(t.getTicketNumber());
                        t.expired();
                    }
                }
            }
        }, 500, 500, TimeUnit.MILLISECONDS);
    }
}

三、總結(jié)

代碼的調(diào)用一般都是層層遞歸調(diào)用,代碼的調(diào)用深度和廣度都很大,通過(guò)調(diào)試代碼的方式一步步學(xué)習(xí)源碼的方式,更加容易理解源碼的含義和設(shè)計(jì)理念。

Tars與其他RPC框架,并沒(méi)有什么本質(zhì)區(qū)別,通過(guò)類比其他框架的設(shè)計(jì)理念,可以更加深入理解Tars Java設(shè)計(jì)理念。

以上就是解析Tars-Java客戶端源碼的詳細(xì)內(nèi)容,更多關(guān)于Tars-Java 客戶端源碼的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Eclipse中配置Maven的圖文教程

    Eclipse中配置Maven的圖文教程

    這篇文章主要介紹了Eclipse中配置Maven的圖文教程,需要的朋友可以參考下
    2020-12-12
  • 快速校驗(yàn)實(shí)體類時(shí),@Valid,@Validated,@NotNull注解無(wú)效的解決

    快速校驗(yàn)實(shí)體類時(shí),@Valid,@Validated,@NotNull注解無(wú)效的解決

    這篇文章主要介紹了快速校驗(yàn)實(shí)體類時(shí),@Valid,@Validated,@NotNull注解無(wú)效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • Hibernate核心類和接口的詳細(xì)介紹

    Hibernate核心類和接口的詳細(xì)介紹

    今天小編就為大家分享一篇關(guān)于Hibernate核心類和接口的詳細(xì)介紹,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧
    2019-03-03
  • Java中的CopyOnWriteArrayList深入解讀

    Java中的CopyOnWriteArrayList深入解讀

    這篇文章主要介紹了Java中的CopyOnWriteArrayList深入解讀,在 ArrayList 的類注釋上,JDK 就提醒了我們,如果要把 ArrayList 作為共享變量的話,是線程不安全的,需要的朋友可以參考下
    2023-12-12
  • Java中Arrays.asList()方法詳解及實(shí)例

    Java中Arrays.asList()方法詳解及實(shí)例

    這篇文章主要介紹了Java中Arrays.asList()方法將數(shù)組作為列表時(shí)的一些差異的相關(guān)資料,需要的朋友可以參考下
    2017-06-06
  • Java中如何使用?byte?數(shù)組作為?Map?的?key

    Java中如何使用?byte?數(shù)組作為?Map?的?key

    本文將討論在使用HashMap時(shí),當(dāng)byte數(shù)組作為key時(shí)所遇到的問(wèn)題及其解決方案,介紹使用String和List這兩種數(shù)據(jù)結(jié)構(gòu)作為臨時(shí)解決方案的方法,感興趣的朋友跟隨小編一起看看吧
    2023-06-06
  • java 多態(tài)性詳解及簡(jiǎn)單實(shí)例

    java 多態(tài)性詳解及簡(jiǎn)單實(shí)例

    這篇文章主要介紹了java 多態(tài)性詳解及簡(jiǎn)單實(shí)例的相關(guān)資料,需要的朋友可以參考下
    2017-02-02
  • 詳解jdbc實(shí)現(xiàn)對(duì)CLOB和BLOB數(shù)據(jù)類型的操作

    詳解jdbc實(shí)現(xiàn)對(duì)CLOB和BLOB數(shù)據(jù)類型的操作

    這篇文章主要介紹了詳解jdbc實(shí)現(xiàn)對(duì)CLOB和BLOB數(shù)據(jù)類型的操作的相關(guān)資料,這里實(shí)現(xiàn)寫入操作與讀寫操作,需要的朋友可以參考下
    2017-08-08
  • Java中的TreeSet源碼解讀

    Java中的TreeSet源碼解讀

    這篇文章主要介紹了Java中的TreeSet源碼解讀,TreeSet 是一個(gè) 有序集合,它擴(kuò)展了 AbstractSet 類并實(shí)現(xiàn)了 NavigableSet 接口,對(duì)象根據(jù)其自然順序以升序排序和存儲(chǔ),該 TreeSet 中使用 平衡樹(shù),更具體的一個(gè) 紅黑樹(shù),需要的朋友可以參考下
    2023-09-09
  • Java模仿微信實(shí)現(xiàn)零錢通簡(jiǎn)易功能(兩種版本)

    Java模仿微信實(shí)現(xiàn)零錢通簡(jiǎn)易功能(兩種版本)

    本文主要介紹了使用Java開(kāi)發(fā)零錢通項(xiàng)目, 模仿微信實(shí)現(xiàn)簡(jiǎn)易功能,可以完成收益入賬,消費(fèi),查看明細(xì),退出系統(tǒng)等功能。文中一共介紹了兩種實(shí)現(xiàn)方法,快來(lái)學(xué)習(xí)吧
    2021-12-12

最新評(píng)論