亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

elasticsearch源碼分析index?action實現(xiàn)方式

 更新時間:2022年04月21日 17:23:15   作者:zziawan  
這篇文章主要為大家介紹了elasticsearch源碼分析index?action實現(xiàn)方式,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

action的作用

上一篇從結(jié)構(gòu)上分析了action的,本篇將以index action為例仔分析一下action的實現(xiàn)方式。

再概括一下action的作用:對于每種功能(如index)action都會包括兩個基本的類*action(IndexAction)和Transport*action(TransportIndexAction),前者類中會有一個實例(IndexAction INSTANCE = new IndexAction())這個實例用于client綁定對應(yīng)的TransportAction(registerAction(IndexAction.INSTANCE, TransportIndexAction.class)),綁定過程發(fā)送在ActionModuel中。

另外在Action類中還會定義一個action的名字(String NAME = "indices:data/write/index")這個名字用于TransportService綁定對于的handle,用于處理NettyTransport接收到的信息。TransportAction的是最終的邏輯處理者,當接收到請求時,會首先判斷本節(jié)點能否處理,如果能夠處理則調(diào)用相關(guān)的方法處理得到結(jié)果返回,否則將通過NettyTransport轉(zhuǎn)發(fā)該請求到對應(yīng)的node進行處理。所有的Transport的結(jié)構(gòu)都是這種類型。

TransportAction的類圖

首先看一下TransportAction的類圖,所的Transport*action都繼承自于它。

它主要由兩個方法execute和doExecute,execute方法有兩種實現(xiàn),第一種實現(xiàn)需要自行添加actionListener。最終的邏輯都在doExecute方法中,這個方法在各個功能模塊中實現(xiàn)。以下是TransportIndexAction的繼承關(guān)系:

實現(xiàn)上由于功能劃分的原因,TransportIndexAction直接繼承自TranspShardReplicationOperationAction,這個抽象類中的方法是所有需要操作shard副本的功能action的父,因此它的實現(xiàn)還包括delete,bulk等功能action。它實現(xiàn)了多個內(nèi)部類,這些內(nèi)部類用來輔助完成相關(guān)的功能。這里主要說一下OperationTransportHandler,ReplicaOperationTransportHandler及AsyncShardOperationAction三個子類。

OperationTransportHandler的代碼

如下所示:

class OperationTransportHandler extends BaseTransportRequestHandler<Request> {
//繼承自BaseTransportRequestHanlder
………………
        @Override
        public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
            // no need to have a threaded listener since we just send back a response
            request.listenerThreaded(false);
            // if we have a local operation, execute it on a thread since we don't spawn
            request.operationThreaded(true);
      //調(diào)用Transport的execute方法,通過channel返回結(jié)果
            execute(request, new ActionListener<Response>() {
                @Override
                public void onResponse(Response result) {
                    try {
                        channel.sendResponse(result);
                    } catch (Throwable e) {
                        onFailure(e);
                    }
                }
                @Override
                public void onFailure(Throwable e) {
                    try {
                        channel.sendResponse(e);
                    } catch (Throwable e1) {
                        logger.warn("Failed to send response for " + actionName, e1);
                    }
                }
            });
        }

看過NettyTransport請求發(fā)送和處理的同學一定對這個代碼不陌生,這就是elasticsearch節(jié)點間處理信息的典型模式。當請求通過NettyTransport發(fā)送到本節(jié)點時會根據(jù)請求的action名稱找到對應(yīng)的handler,使用對應(yīng)的handler來處理該請求。這個handler就對應(yīng)著“indices:data/write/index”,可以看到它調(diào)用execute方法來處理。它的注冊時在TransportShardReplicationOperationAction構(gòu)造函數(shù)中完成的。

知道了OperationTransportHandler,ReplicaOperationTransportHandler就好理解了它的實現(xiàn)方式跟前者完全一樣,對應(yīng)的action名稱加了一個“[r]”,它的作用是處理需要在副本上進行的操作,代碼如下所示:

class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
……………………
        @Override
        public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
            try {
                shardOperationOnReplica(request);
            } catch (Throwable t) {
                failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);
                throw t;
            }
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }

可以看到代碼結(jié)構(gòu)非常像,只是調(diào)用了副本操作的方法shardOperationOnReplica,這個方法在這TransportShardReplicationOperationAction中是抽象的,它的實現(xiàn)在各個子類中,例如deleteaction中實現(xiàn)了對于delete請求如何在副本上處理。

分析完這兩個handle是不是對于action的處理過程有了一定的眉目了呢?但是這才是冰山一角,這兩個Handler是用來接收來自其它節(jié)點的請求,如果請求的正好是本節(jié)點該如何處理呢?這些邏輯都在AsyncShardOperationAction類中。首先看一下它的內(nèi)部結(jié)構(gòu):

因為TransportShardReplicationOperationAction的所有子類都是對索引的修改,會引起數(shù)據(jù)不一致,因此它的操作流程都是現(xiàn)在primaryShard上操作然后是Replicashard上操作。代碼如下所示:

protected void doStart() throws ElasticsearchException {
            try {
          //檢查是否有阻塞
                ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
                if (blockException != null) {
                    if (blockException.retryable()) {
                        logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
                        retry(blockException);
                        return;
                    } else {
                        throw blockException;
                    }
                }
          //檢測是否是創(chuàng)建索引
                if (resolveIndex()) {
                    internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions()));
                } else {
                    internalRequest.concreteIndex(internalRequest.request().index());
                }
                // check if we need to execute, and if not, return
                if (!resolveRequest(observer.observedState(), internalRequest, listener)) {
                    return;
                }
          //再次檢測是否有阻塞
                blockException = checkRequestBlock(observer.observedState(), internalRequest);
                if (blockException != null) {
                    if (blockException.retryable()) {
                        logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
                        retry(blockException);
                        return;
                    } else {
                        throw blockException;
                    }
                }
                shardIt = shards(observer.observedState(), internalRequest);
            } catch (Throwable e) {
                listener.onFailure(e);
                return;
            }
        //查找primaryShard
            boolean foundPrimary = false;
            ShardRouting shardX;
            while ((shardX = shardIt.nextOrNull()) != null) {
                final ShardRouting shard = shardX;
                // we only deal with primary shardIt here...
                if (!shard.primary()) {
                    continue;
                }
                if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {
                    logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());
                    retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
                    return;
                }
                if (!primaryOperationStarted.compareAndSet(false, true)) {
                    return;
                }
                foundPrimary = true;
          //primaryShard就在本地,直接進行相關(guān)操作
                if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
                    try {
                        if (internalRequest.request().operationThreaded()) {
                            internalRequest.request().beforeLocalFork();
                            threadPool.executor(executor).execute(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        performOnPrimary(shard.id(), shard);
                                    } catch (Throwable t) {
                                        listener.onFailure(t);
                                    }
                                }
                            });
                        } else {
                            performOnPrimary(shard.id(), shard);
                        }
                    } catch (Throwable t) {
                        listener.onFailure(t);
                    }
                } else {//primaryShard在其它節(jié)點上,將請求通過truansport發(fā)送到對應(yīng)的節(jié)點。
                    DiscoveryNode node = observer.observedState().nodes().get(shard.currentNodeId());
                    transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
                        @Override
                        public Response newInstance() {
                            return newResponseInstance();
                        }
                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                        @Override
                        public void handleResponse(Response response) {
                            listener.onResponse(response);
                        }
                        @Override
                        public void handleException(TransportException exp) {
                            // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
                            if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
                                    retryPrimaryException(exp)) {
                                primaryOperationStarted.set(false);
                                internalRequest.request().setCanHaveDuplicates();
                                // we already marked it as started when we executed it (removed the listener) so pass false
                                // to re-add to the cluster listener
                                logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
                                retry(exp);
                            } else {
                                listener.onFailure(exp);
                            }
                        }
                    });
                }
                break;
            }
            ………………
        }

這就是對應(yīng)請求的處理過程。

primary操作的方法

void performOnPrimary(int primaryShardId, final ShardRouting shard) {
           ……
                PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));
                performReplicas(response);
            …………
        }

以上就是performOnPrimary方法的部分代碼,首先調(diào)用外部類的shardOperationOnPrimary方法,該方法實現(xiàn)在各個子類中,在TransportIndexAction中的實現(xiàn)如下所示:

@Override
    protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
        final IndexRequest request = shardRequest.request;
        // 查看是否需要routing
        IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex());
        MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
        if (mappingMd != null && mappingMd.routing().required()) {
            if (request.routing() == null) {
                throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id());
            }
        }
      //調(diào)用indexserice執(zhí)行對應(yīng)的index操作
        IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
        IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
        SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
                .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
        long version;
        boolean created;
        try {
            Engine.IndexingOperation op;
            if (request.opType() == IndexRequest.OpType.INDEX) {
                Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
                if (index.parsedDoc().mappingsModified()) {
                    mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
                }
                indexShard.index(index);
                version = index.version();
                op = index;
                created = index.created();
            } else {
                Engine.Create create = indexShard.prepareCreate(sourceToParse,
                        request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
                if (create.parsedDoc().mappingsModified()) {
                    mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
                }
                indexShard.create(create);
                version = create.version();
                op = create;
                created = true;
            }
            if (request.refresh()) {
                try {
                    indexShard.refresh("refresh_flag_index");
                } catch (Throwable e) {
                    // ignore
                }
            }
            // update the version on the request, so it will be used for the replicas
            request.version(version);
            request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
            assert request.versionType().validateVersionForWrites(request.version());
            IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
            return new PrimaryResponse<>(shardRequest.request, response, op);
        } catch (WriteFailureException e) {
            if (e.getMappingTypeToUpdate() != null) {
                DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
                if (docMapper != null) {
                    mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
                }
            }
            throw e.getCause();
        }
    }

上面的代碼就是index的執(zhí)行過程,這一過程涉及到index的底層操作,這里就不展開,只是說明它在action中是如何實現(xiàn)的,后面會有詳細說明。接下來看在副本上的操作。副本可能有多個,因此首先調(diào)用了performReplicas方法,在這個方法中首先開始監(jiān)聽集群的狀態(tài),然后便利所有的副本進行處理,如果是異步則加入一個listener,否則同步執(zhí)行返回結(jié)果。最后調(diào)用performReplica,在該方法中調(diào)用外部類的抽象方法shardOperationOnReplica。 這一過程比較簡單,這里就不再貼代碼,有興趣可以參考相關(guān)源碼。

總結(jié)

這里以TransportIndexAction為例分析了tansportaction的結(jié)構(gòu)層次。它在TransportAction直接還有一層那就是TransportShardReplicationOperationAction,這個類是actionsupport包中的一個,這個包把所有的子操作方法做了進一步的抽象,抽象出幾個大類放到了這里,所有其它子功能很多都繼承自這。這個包會在后面有詳細分析。 

以上就是elasticsearch源碼分析index action實現(xiàn)方式的詳細內(nèi)容,更多關(guān)于elasticsearch源碼分析index action的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java獲得當前時間前指定幾個小時具體時間的方法示例

    Java獲得當前時間前指定幾個小時具體時間的方法示例

    這篇文章主要介紹了Java獲得當前時間前指定幾個小時具體時間的方法,涉及java使用Calendar針對日期時間的相關(guān)運算與轉(zhuǎn)換操作技巧,需要的朋友可以參考下
    2017-08-08
  • ElasticSearch創(chuàng)建后索引修改數(shù)據(jù)類型方法步驟

    ElasticSearch創(chuàng)建后索引修改數(shù)據(jù)類型方法步驟

    Elasticsearch存儲數(shù)據(jù)之前需要先創(chuàng)建索引,類似于結(jié)構(gòu)型數(shù)據(jù)庫建庫建表,創(chuàng)建索引時定義了每個字段的索引方式和數(shù)據(jù)類型,這篇文章主要給大家介紹了關(guān)于ElasticSearch創(chuàng)建后索引修改數(shù)據(jù)類型的方法步驟,需要的朋友可以參考下
    2023-09-09
  • 不到十行實現(xiàn)javaCV圖片OCR文字識別

    不到十行實現(xiàn)javaCV圖片OCR文字識別

    識別圖片中的文字,會省很多時間,本文介紹了javaCV圖片OCR文字識別,需要的朋友們下面隨著小編來一起學習學習吧
    2021-05-05
  • Java堆內(nèi)存又溢出了!教你一招必殺技(推薦)

    Java堆內(nèi)存又溢出了!教你一招必殺技(推薦)

    這篇文章主要介紹了Java內(nèi)存溢出問題,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-04-04
  • 一篇文章帶你入門Java數(shù)據(jù)類型

    一篇文章帶你入門Java數(shù)據(jù)類型

    下面小編就為大家?guī)硪黄狫ava的基本數(shù)據(jù)類型)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2021-08-08
  • springboot項目中application.properties無法變成小樹葉問題解決方案

    springboot項目中application.properties無法變成小樹葉問題解決方案

    這篇文章主要介紹了springboot項目中application.properties無法變成小樹葉問題解決,本文通過圖文實例代碼相結(jié)合給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-09-09
  • 詳解JDK 5 Annotation 注解之@Target的用法介紹

    詳解JDK 5 Annotation 注解之@Target的用法介紹

    這篇文章主要介紹了詳解JDK 5 Annotation 注解之@Target的用法介紹,需要的朋友可以參考下
    2016-02-02
  • Flink開發(fā)IDEA環(huán)境搭建與測試的方法

    Flink開發(fā)IDEA環(huán)境搭建與測試的方法

    這篇文章主要介紹了Flink開發(fā)IDEA環(huán)境搭建與測試的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-03-03
  • Java設(shè)計模式之責任鏈模式

    Java設(shè)計模式之責任鏈模式

    今天小編就為大家分享一篇關(guān)于Java設(shè)計模式之責任鏈模式,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-01-01
  • SpringMVC+MyBatis聲明式事務(wù)管理

    SpringMVC+MyBatis聲明式事務(wù)管理

    在最近的一個項目中,采用springMVC、mybatis,MySQL、tomcat,事務(wù)管理對于企業(yè)應(yīng)用來說是至關(guān)重要的,即使出現(xiàn)異常情況,它也可以保證數(shù)據(jù)的一致性。Spring Framework對事務(wù)管理提供了一致的抽象,
    2015-08-08

最新評論