elasticsearch源碼分析index?action實(shí)現(xiàn)方式
action的作用
上一篇從結(jié)構(gòu)上分析了action的,本篇將以index action為例仔分析一下action的實(shí)現(xiàn)方式。
再概括一下action的作用:對(duì)于每種功能(如index)action都會(huì)包括兩個(gè)基本的類*action(IndexAction)和Transport*action(TransportIndexAction),前者類中會(huì)有一個(gè)實(shí)例(IndexAction INSTANCE = new IndexAction())這個(gè)實(shí)例用于client綁定對(duì)應(yīng)的TransportAction(registerAction(IndexAction.INSTANCE, TransportIndexAction.class)),綁定過程發(fā)送在ActionModuel中。
另外在Action類中還會(huì)定義一個(gè)action的名字(String NAME = "indices:data/write/index")這個(gè)名字用于TransportService綁定對(duì)于的handle,用于處理NettyTransport接收到的信息。TransportAction的是最終的邏輯處理者,當(dāng)接收到請(qǐng)求時(shí),會(huì)首先判斷本節(jié)點(diǎn)能否處理,如果能夠處理則調(diào)用相關(guān)的方法處理得到結(jié)果返回,否則將通過NettyTransport轉(zhuǎn)發(fā)該請(qǐng)求到對(duì)應(yīng)的node進(jìn)行處理。所有的Transport的結(jié)構(gòu)都是這種類型。
TransportAction的類圖
首先看一下TransportAction的類圖,所的Transport*action都繼承自于它。

它主要由兩個(gè)方法execute和doExecute,execute方法有兩種實(shí)現(xiàn),第一種實(shí)現(xiàn)需要自行添加actionListener。最終的邏輯都在doExecute方法中,這個(gè)方法在各個(gè)功能模塊中實(shí)現(xiàn)。以下是TransportIndexAction的繼承關(guān)系:

實(shí)現(xiàn)上由于功能劃分的原因,TransportIndexAction直接繼承自TranspShardReplicationOperationAction,這個(gè)抽象類中的方法是所有需要操作shard副本的功能action的父,因此它的實(shí)現(xiàn)還包括delete,bulk等功能action。它實(shí)現(xiàn)了多個(gè)內(nèi)部類,這些內(nèi)部類用來輔助完成相關(guān)的功能。這里主要說一下OperationTransportHandler,ReplicaOperationTransportHandler及AsyncShardOperationAction三個(gè)子類。
OperationTransportHandler的代碼
如下所示:
class OperationTransportHandler extends BaseTransportRequestHandler<Request> {
//繼承自BaseTransportRequestHanlder
………………
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
// if we have a local operation, execute it on a thread since we don't spawn
request.operationThreaded(true);
//調(diào)用Transport的execute方法,通過channel返回結(jié)果
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("Failed to send response for " + actionName, e1);
}
}
});
}看過NettyTransport請(qǐng)求發(fā)送和處理的同學(xué)一定對(duì)這個(gè)代碼不陌生,這就是elasticsearch節(jié)點(diǎn)間處理信息的典型模式。當(dāng)請(qǐng)求通過NettyTransport發(fā)送到本節(jié)點(diǎn)時(shí)會(huì)根據(jù)請(qǐng)求的action名稱找到對(duì)應(yīng)的handler,使用對(duì)應(yīng)的handler來處理該請(qǐng)求。這個(gè)handler就對(duì)應(yīng)著“indices:data/write/index”,可以看到它調(diào)用execute方法來處理。它的注冊(cè)時(shí)在TransportShardReplicationOperationAction構(gòu)造函數(shù)中完成的。
知道了OperationTransportHandler,ReplicaOperationTransportHandler就好理解了它的實(shí)現(xiàn)方式跟前者完全一樣,對(duì)應(yīng)的action名稱加了一個(gè)“[r]”,它的作用是處理需要在副本上進(jìn)行的操作,代碼如下所示:
class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
……………………
@Override
public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
try {
shardOperationOnReplica(request);
} catch (Throwable t) {
failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);
throw t;
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}可以看到代碼結(jié)構(gòu)非常像,只是調(diào)用了副本操作的方法shardOperationOnReplica,這個(gè)方法在這TransportShardReplicationOperationAction中是抽象的,它的實(shí)現(xiàn)在各個(gè)子類中,例如deleteaction中實(shí)現(xiàn)了對(duì)于delete請(qǐng)求如何在副本上處理。
分析完這兩個(gè)handle是不是對(duì)于action的處理過程有了一定的眉目了呢?但是這才是冰山一角,這兩個(gè)Handler是用來接收來自其它節(jié)點(diǎn)的請(qǐng)求,如果請(qǐng)求的正好是本節(jié)點(diǎn)該如何處理呢?這些邏輯都在AsyncShardOperationAction類中。首先看一下它的內(nèi)部結(jié)構(gòu):

因?yàn)門ransportShardReplicationOperationAction的所有子類都是對(duì)索引的修改,會(huì)引起數(shù)據(jù)不一致,因此它的操作流程都是現(xiàn)在primaryShard上操作然后是Replicashard上操作。代碼如下所示:
protected void doStart() throws ElasticsearchException {
try {
//檢查是否有阻塞
ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
return;
} else {
throw blockException;
}
}
//檢測(cè)是否是創(chuàng)建索引
if (resolveIndex()) {
internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions()));
} else {
internalRequest.concreteIndex(internalRequest.request().index());
}
// check if we need to execute, and if not, return
if (!resolveRequest(observer.observedState(), internalRequest, listener)) {
return;
}
//再次檢測(cè)是否有阻塞
blockException = checkRequestBlock(observer.observedState(), internalRequest);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
return;
} else {
throw blockException;
}
}
shardIt = shards(observer.observedState(), internalRequest);
} catch (Throwable e) {
listener.onFailure(e);
return;
}
//查找primaryShard
boolean foundPrimary = false;
ShardRouting shardX;
while ((shardX = shardIt.nextOrNull()) != null) {
final ShardRouting shard = shardX;
// we only deal with primary shardIt here...
if (!shard.primary()) {
continue;
}
if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {
logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());
retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
return;
}
if (!primaryOperationStarted.compareAndSet(false, true)) {
return;
}
foundPrimary = true;
//primaryShard就在本地,直接進(jìn)行相關(guān)操作
if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
try {
if (internalRequest.request().operationThreaded()) {
internalRequest.request().beforeLocalFork();
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
performOnPrimary(shard.id(), shard);
} catch (Throwable t) {
listener.onFailure(t);
}
}
});
} else {
performOnPrimary(shard.id(), shard);
}
} catch (Throwable t) {
listener.onFailure(t);
}
} else {//primaryShard在其它節(jié)點(diǎn)上,將請(qǐng)求通過truansport發(fā)送到對(duì)應(yīng)的節(jié)點(diǎn)。
DiscoveryNode node = observer.observedState().nodes().get(shard.currentNodeId());
transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponseInstance();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
retryPrimaryException(exp)) {
primaryOperationStarted.set(false);
internalRequest.request().setCanHaveDuplicates();
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
retry(exp);
} else {
listener.onFailure(exp);
}
}
});
}
break;
}
………………
}這就是對(duì)應(yīng)請(qǐng)求的處理過程。
primary操作的方法
void performOnPrimary(int primaryShardId, final ShardRouting shard) {
……
PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));
performReplicas(response);
…………
}以上就是performOnPrimary方法的部分代碼,首先調(diào)用外部類的shardOperationOnPrimary方法,該方法實(shí)現(xiàn)在各個(gè)子類中,在TransportIndexAction中的實(shí)現(xiàn)如下所示:
@Override
protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
final IndexRequest request = shardRequest.request;
// 查看是否需要routing
IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex());
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (request.routing() == null) {
throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id());
}
}
//調(diào)用indexserice執(zhí)行對(duì)應(yīng)的index操作
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version;
boolean created;
try {
Engine.IndexingOperation op;
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
if (index.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID());
}
indexShard.index(index);
version = index.version();
op = index;
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
if (create.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID());
}
indexShard.create(create);
version = create.version();
op = create;
created = true;
}
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_index");
} catch (Throwable e) {
// ignore
}
}
// update the version on the request, so it will be used for the replicas
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created);
return new PrimaryResponse<>(shardRequest.request, response, op);
} catch (WriteFailureException e) {
if (e.getMappingTypeToUpdate() != null) {
DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
if (docMapper != null) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
}
}
throw e.getCause();
}
}上面的代碼就是index的執(zhí)行過程,這一過程涉及到index的底層操作,這里就不展開,只是說明它在action中是如何實(shí)現(xiàn)的,后面會(huì)有詳細(xì)說明。接下來看在副本上的操作。副本可能有多個(gè),因此首先調(diào)用了performReplicas方法,在這個(gè)方法中首先開始監(jiān)聽集群的狀態(tài),然后便利所有的副本進(jìn)行處理,如果是異步則加入一個(gè)listener,否則同步執(zhí)行返回結(jié)果。最后調(diào)用performReplica,在該方法中調(diào)用外部類的抽象方法shardOperationOnReplica。 這一過程比較簡(jiǎn)單,這里就不再貼代碼,有興趣可以參考相關(guān)源碼。
總結(jié)
這里以TransportIndexAction為例分析了tansportaction的結(jié)構(gòu)層次。它在TransportAction直接還有一層那就是TransportShardReplicationOperationAction,這個(gè)類是actionsupport包中的一個(gè),這個(gè)包把所有的子操作方法做了進(jìn)一步的抽象,抽象出幾個(gè)大類放到了這里,所有其它子功能很多都繼承自這。這個(gè)包會(huì)在后面有詳細(xì)分析。
以上就是elasticsearch源碼分析index action實(shí)現(xiàn)方式的詳細(xì)內(nèi)容,更多關(guān)于elasticsearch源碼分析index action的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- elasticsearch索引創(chuàng)建create?index集群matedata更新
- elasticsearch索引的創(chuàng)建過程index?create邏輯分析
- elasticsearch索引index之merge底層機(jī)制的合并講解
- elasticsearch索引index之Mapping實(shí)現(xiàn)關(guān)系結(jié)構(gòu)示例
- elasticsearch索引index之engine讀寫控制結(jié)構(gòu)實(shí)現(xiàn)
- elasticsearch索引index之Translog數(shù)據(jù)功能分析
- elasticsearch索引index數(shù)據(jù)功能源碼示例
- elasticsearch索引index之put?mapping的設(shè)置分析
相關(guān)文章
Java獲得當(dāng)前時(shí)間前指定幾個(gè)小時(shí)具體時(shí)間的方法示例
這篇文章主要介紹了Java獲得當(dāng)前時(shí)間前指定幾個(gè)小時(shí)具體時(shí)間的方法,涉及java使用Calendar針對(duì)日期時(shí)間的相關(guān)運(yùn)算與轉(zhuǎn)換操作技巧,需要的朋友可以參考下2017-08-08
ElasticSearch創(chuàng)建后索引修改數(shù)據(jù)類型方法步驟
Elasticsearch存儲(chǔ)數(shù)據(jù)之前需要先創(chuàng)建索引,類似于結(jié)構(gòu)型數(shù)據(jù)庫建庫建表,創(chuàng)建索引時(shí)定義了每個(gè)字段的索引方式和數(shù)據(jù)類型,這篇文章主要給大家介紹了關(guān)于ElasticSearch創(chuàng)建后索引修改數(shù)據(jù)類型的方法步驟,需要的朋友可以參考下2023-09-09
不到十行實(shí)現(xiàn)javaCV圖片OCR文字識(shí)別
識(shí)別圖片中的文字,會(huì)省很多時(shí)間,本文介紹了javaCV圖片OCR文字識(shí)別,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-05-05
springboot項(xiàng)目中application.properties無法變成小樹葉問題解決方案
這篇文章主要介紹了springboot項(xiàng)目中application.properties無法變成小樹葉問題解決,本文通過圖文實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-09-09
詳解JDK 5 Annotation 注解之@Target的用法介紹
這篇文章主要介紹了詳解JDK 5 Annotation 注解之@Target的用法介紹,需要的朋友可以參考下2016-02-02
Flink開發(fā)IDEA環(huán)境搭建與測(cè)試的方法
這篇文章主要介紹了Flink開發(fā)IDEA環(huán)境搭建與測(cè)試的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-03-03

