96SEO 2026-02-20 09:22 0
andler2、往数据节点发送查询请求的action(TransportSearchAction)3、通过transportService把查询请求发送到指定的数据节点

二、数据节点收到请求的处理逻辑1、尝试从缓存中加载查询结果2、不通过缓存查询直接执行查询(1)executeQuery和executeRank两种查询方式(2)、根据搜索上下文在查询之前添加各种查询搜集器(3)
这里只是HTTP发送查询请求到主节点主节点再转发到数据节点数据节点再到调用lucene.search实际查询数据之前如何处理的源码逻辑
RestSearchAction(restController.getSearchUsageHolder()));actions.register(SearchAction.INSTANCE,
下面这个类RestSearchAction有长该省略的方法我都已经省略了首先通过routes请求到这个方法的prepareRequest(主要是组装searchRequest)这个方法内部会调用parseSearchSource(主要是组装searchSourceBuilder)
/{index}/_search),Route.builder(GET,
/{index}/{type}/_search).deprecated(TYPES_DEPRECATION_MESSAGE,
RestApiVersion.V_7).build(),Route.builder(POST,
/{index}/{type}/_search).deprecated(TYPES_DEPRECATION_MESSAGE,
RestApiVersion.V_7).build());}//入口
(request.hasParam(min_compatible_shard_node))
SearchRequest(Version.fromString(request.param(min_compatible_shard_node)));}
null。
*///组装SearchRequestIntConsumer
searchRequest.source().size(size);request.withContentOrSourceParamParserOrNull(parser
parseSearchRequest(searchRequest,
client.getNamedWriteableRegistry(),
searchUsageHolder));//请求发送return
RestCancellableNodeClient(client,
request.getHttpChannel());cancelClient.execute(SearchAction.INSTANCE,
RestChunkedToXContentListener(channel));};}//组装searchRequestpublic
parseSearchRequest(SearchRequest
requestContentParser,NamedWriteableRegistry
namedWriteableRegistry,IntConsumer
{request.param(type);deprecationLogger.compatibleCritical(search_with_types,
TYPES_DEPRECATION_MESSAGE);}//如果搜索请求的源为空创建一个新的
SearchSourceBuilder());}//将请求中的索引参数解析为一个索引数组并设置到搜索请求中。
searchRequest.indices(Strings.splitStringByCommaToArray(request.param(index)));//如果提供了
{searchRequest.source().parseXContent(requestContentParser,
{searchRequest.source().parseXContent(requestContentParser,
searchUsageHolder);}}//设置批量减少大小参数。
final
request.paramAsInt(batched_reduce_size,
searchRequest.getBatchedReduceSize());searchRequest.setBatchedReduceSize(batchedReduceSize);//如果请求中包含了
(request.hasParam(pre_filter_shard_size))
{searchRequest.setPreFilterShardSize(request.paramAsInt(pre_filter_shard_size,
SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE));}//如果请求中包含了
(request.hasParam(enable_fields_emulation))
errorsrequest.paramAsBoolean(enable_fields_emulation,
max_concurrent_shard_requests(最大并发分片请求数)
(request.hasParam(max_concurrent_shard_requests))
request.paramAsInt(max_concurrent_shard_requests,searchRequest.getMaxConcurrentShardRequests());searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);}//如果请求中包含了
allow_partial_search_results(允许部分搜索结果)
(request.hasParam(allow_partial_search_results))
{//仅当我们传递了参数以覆盖集群级默认值时才设置searchRequest.allowPartialSearchResults(request.paramAsBoolean(allow_partial_search_results,
null));}//设置搜索类型参数。
searchRequest.searchType(request.param(search_type));//调用
方法解析搜索源。
parseSearchSource(searchRequest.source(),
setSize);//设置请求缓存参数searchRequest.requestCache(request.paramAsBoolean(request_cache,
searchRequest.requestCache()));//解析并设置滚动参数。
String
scroll)));}//设置路由参数。
searchRequest.routing(request.param(routing));//设置首选项参数。
searchRequest.preference(request.param(preference));//设置索引选项。
searchRequest.indicesOptions(IndicesOptions.fromRequest(request,
searchRequest.indicesOptions()));//验证搜索请求。
validateSearchRequest(request,
(searchRequest.pointInTimeBuilder()
{preparePointInTime(searchRequest,
参数searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean(ccs_minimize_roundtrips,
searchRequest.isCcsMinimizeRoundtrips()));}//如果请求中包含了
(request.paramAsBoolean(force_synthetic_source,
{searchRequest.setForceSyntheticSource(true);}}//组装searchSourceBuilderprivate
{//RestRequest对象的URL参数转换为QueryBuilder对象QueryBuilder
RestActions.urlParamsToQueryBuilder(request);//并将其设置为SearchSourceBuilder对象的查询条件if
{searchSourceBuilder.query(queryBuilder);}//如果RestRequest对象包含from参数则将其转换为整数并设置为SearchSourceBuilder对象的from属性if
{searchSourceBuilder.from(request.paramAsInt(from,
treatmentdeprecationLogger.compatibleCritical(search-api-size-1,Using
{searchSourceBuilder.explain(request.paramAsBoolean(explain,
{searchSourceBuilder.version(request.paramAsBoolean(version,
(request.hasParam(seq_no_primary_term))
{searchSourceBuilder.seqNoAndPrimaryTerm(request.paramAsBoolean(seq_no_primary_term,
{searchSourceBuilder.timeout(request.paramAsTime(timeout,
(request.hasParam(terminate_after))
request.paramAsInt(terminate_after,
SearchContext.DEFAULT_TERMINATE_AFTER);searchSourceBuilder.terminateAfter(terminateAfter);}StoredFieldsContext
StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(),request);if
{searchSourceBuilder.storedFields(storedFieldsContext);}String
request.param(docvalue_fields);if
(Strings.hasText(sDocValueFields))
Strings.splitStringByCommaToArray(sDocValueFields);for
{searchSourceBuilder.docValueField(field,
FetchSourceContext.parseFromRestRequest(request);if
{searchSourceBuilder.fetchSource(fetchSourceContext);}if
(request.hasParam(track_scores))
{searchSourceBuilder.trackScores(request.paramAsBoolean(track_scores,
(request.hasParam(track_total_hits))
(Booleans.isBoolean(request.param(track_total_hits)))
{searchSourceBuilder.trackTotalHits(request.paramAsBoolean(track_total_hits,
{searchSourceBuilder.trackTotalHitsUpTo(request.paramAsInt(track_total_hits,
SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO));}}String
Strings.splitStringByCommaToArray(sSorts);for
{searchSourceBuilder.sort(sortField,
{searchSourceBuilder.sort(sortField,
{searchSourceBuilder.sort(sort);}}}String
{searchSourceBuilder.stats(Arrays.asList(Strings.splitStringByCommaToArray(sStats)));}SuggestBuilder
parseSuggestUrlParameters(request);if
{searchSourceBuilder.suggest(suggestBuilder);}}
2、往数据节点发送查询请求的action(TransportSearchAction)
下面这个TransportSearchAction也有点长主要流程是doExecute-executeLocalSearch-executeSearch-接口SearchPhaseProvider的实现类AsyncSearchActionProvider
HandledTransportActionSearchRequest,
AsyncSearchActionProvider::new);}//主要功能是执行搜索请求并根据不同的情况选择执行本地搜索或远程搜索void
original,ActionListenerSearchResponse
listener,FunctionActionListenerSearchResponse,
SearchTimeProvider(original.getOrCreateAbsoluteStartMillis(),relativeStartNanos,System::nanoTime);//使用重写监听器对搜索请求进行重写并根据重写后的请求获取搜索上下文和远程集群索引。
ActionListenerSearchRequest
{checkCCSVersionCompatibility(rewritten);}if
(rewritten.pointInTimeBuilder()
rewritten.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);remoteClusterIndices
getIndicesFromSearchContexts(searchContext,
remoteClusterService.groupIndices(rewritten.indicesOptions(),
remoteClusterService.groupIndices(rewritten.indicesOptions(),
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY
remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);//获取当前集群状态
clusterService.state();//如果远程集群索引为空则执行本地搜索if
(remoteClusterIndices.isEmpty())
{executeLocalSearch(task,timeProvider,rewritten,localIndices,clusterState,SearchResponse.Clusters.EMPTY,searchContext,searchPhaseProvider.apply(listener));}
{//如果远程集群索引不为空则根据是否需要最小化往返次数选择执行远程搜索或本地搜索。
//省略目前不涉及到远程集群}}},
listener::onFailure);Rewriteable.rewriteAndFetch(original,
searchService.getRewriteContext(timeProvider::absoluteStartMillis),
clusterState,SearchResponse.Clusters
searchContext,SearchPhaseProvider
task,timeProvider,searchRequest,localIndices,Collections.emptyList(),(clusterName,
null,clusterState,Collections.emptyMap(),clusterInfo,searchContext,searchPhaseProvider);}private
localIndices,ListSearchShardIterator
remoteShardIterators,BiFunctionString,
remoteAliasMap,SearchResponse.Clusters
searchContext,SearchPhaseProvider
{//检查全局集群阻塞状态是否允许读取操作clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);//检查搜索请求中是否定义了allowPartialSearchResults(允许部分搜索结果)参数如果没有则应用集群服务的默认设置。
if
(searchRequest.allowPartialSearchResults()
{//默认允许部分搜索结果searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());}final
concreteLocalIndices;//根据搜索上下文的存在与否确定本地和远程的索引和别名过滤器。
if
searchRequest.pointInTimeBuilder()
searchContext.aliasFilter();concreteLocalIndices
localIndices.indices();localShardIterators
getLocalLocalShardsIteratorFromPointInTime(clusterState,localIndices,searchRequest.getLocalClusterAlias(),searchContext,searchRequest.pointInTimeBuilder().getKeepAlive(),searchRequest.allowPartialSearchResults());}
{//解析本地索引获取Index对象数组indices。
final
resolveLocalIndices(localIndices,
timeProvider);//将indices数组中的每个Index对象的名称提取出来并存储在concreteLocalIndices数组中。
concreteLocalIndices
Arrays.stream(indices).map(Index::getName).toArray(String[]::new);//解析索引名称表达式获取与搜索请求中的索引相关的索引和别名的集合indicesAndAliases。
final
indexNameExpressionResolver.resolveExpressions(clusterState,
searchRequest.indices());//构建索引别名过滤器aliasFilter
buildIndexAliasFilters(clusterState,
indices);//将remoteAliasMap中的所有映射添加到aliasFilter中aliasFilter.putAll(remoteAliasMap);//取本地分片迭代器localShardIteratorslocalShardIterators
getLocalShardsIterator(clusterState,searchRequest,searchRequest.getLocalClusterAlias(),indicesAndAliases,concreteLocalIndices);}//合并创建一个GroupShardsIteratorSearchShardIterator对象并赋值给shardIterators变量。
final
GroupShardsIteratorSearchShardIterator
mergeShardsIterators(localShardIterators,
remoteShardIterators);//检查shardIterators的大小是否超过了集群设定的分片数量限制如果超过则抛出异常。
failIfOverShardCountLimit(clusterService,
shardIterators.size());//WaitForCheckpointsbuwei1if
(searchRequest.getWaitForCheckpoints().isEmpty()
(remoteShardIterators.isEmpty()
IllegalArgumentException(Cannot
{validateAndResolveWaitForCheckpoint(clusterState,
concreteLocalIndices);}}MapString,
resolveIndexBoosts(searchRequest,
clusterState);//shardIterators的大小调整搜索类型。
adjustSearchType(searchRequest,
clusterState.nodes();//构建一个连接查询函数connectionLookup用于根据索引和节点名称获取连接对象。
BiFunctionString,
buildConnectionLookup(searchRequest.getLocalClusterAlias(),nodes::get,remoteConnections,searchTransportService::getConnection);//创建一个异步搜索执行器asyncSearchExecutor用于执行异步搜索。
final
asyncSearchExecutor(concreteLocalIndices);//根据条件判断是否需要预过滤搜索分片。
final
shouldPreFilterSearchShards(clusterState,searchRequest,concreteLocalIndices,localShardIterators.size()
remoteShardIterators.size(),defaultPreFilterShardSize);//调用searchPhaseProvider的newSearchPhase方法开始执行搜索阶段//searchPhaseProvider的实现用的是AsyncSearchActionProvidersearchPhaseProvider.newSearchPhase(task,searchRequest,asyncSearchExecutor,shardIterators,timeProvider,connectionLookup,clusterState,Collections.unmodifiableMap(aliasFilter),concreteIndexBoosts,preFilterSearchShards,threadPool,clusters).start();}//一个接口interface
executor,GroupShardsIteratorSearchShardIterator
shardIterators,SearchTimeProvider
threadPool,SearchResponse.Clusters
clusters);}//接口SearchPhaseProvider的一个实现类private
listener;AsyncSearchActionProvider(ActionListenerSearchResponse
executor,GroupShardsIteratorSearchShardIterator
shardIterators,SearchTimeProvider
threadPool,SearchResponse.Clusters
searchPhaseController.newSearchPhaseResults(executor,circuitBreaker,task::isCancelled,task.getProgressListener(),searchRequest,shardIterators.size(),exc
searchTransportService.cancelSearchTask(task,
searchRequest.searchType();return
SearchQueryThenFetchAsyncAction(logger,searchTransportService,connectionLookup,aliasFilter,concreteIndexBoosts,executor,queryResultConsumer,searchRequest,listener,shardIterators,timeProvider,clusterState,task,clusters);}}}}
相同但初始散射阶段除外该阶段用于计算分布项频率以实现更准确的评分。
*/DFS_QUERY_THEN_FETCH((byte)
对所有分片执行查询但仅返回足够的信息而不是文档内容。
然后对结果进行排序和排名并基于此*
仅要求相关分片提供实际文档内容。
返回的命中数与大小中指定的命中数完全相同因为它们是唯一被提取的命中数。
当索引有很多分片不是副本、分片
组时这非常方便。
*/QUERY_THEN_FETCH((byte)
}SearchQueryThenFetchAsyncAction的实现如下
SearchQueryThenFetchAsyncAction
AbstractSearchAsyncActionSearchPhaseResult
bottomSortCollector;SearchQueryThenFetchAsyncAction(final
GroupShardsIteratorSearchShardIterator
TransportSearchAction.SearchTimeProvider
{super(query,logger,searchTransportService,nodeIdToConnection,aliasFilter,concreteIndexBoosts,executor,request,listener,shardsIts,timeProvider,clusterState,task,resultConsumer,request.getMaxConcurrentShardRequests(),clusters);//省略代码}//父类的performPhaseOnShard方法会调用这个方法protected
SearchActionListenerSearchPhaseResult
rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt,
listener.requestIndex));getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(),
}其中上面的executeSearch方法中searchPhaseProvider.newSearchPhase().start()实际执行的是SearchQueryThenFetchAsyncAction的父类AbstractSearchAsyncAction中的start方法
AbstractSearchAsyncActionResult
这是搜索的主要入口点。
此方法启动初始阶段的搜索执行。
*/public
{//省略return;}executePhase(this);}private
{//toSkipShardsIts中的每个SearchShardIterator对象调用skip()方法并断言其返回值为true然后调用skipShard()方法for
iterator.skip();skipShard(iterator);}//如果shardsIts的大小大于0if
(request.allowPartialSearchResults()
{//省略代码}//对于shardsIts中的每个索引获取对应的SearchShardIterator对象shardRoutings然后执行performPhaseOnShard()方法。
//这里会遍历每一个分片for
shardIndexMap.containsKey(shardRoutings);int
shardIndexMap.get(shardRoutings);performPhaseOnShard(shardIndex,
shardRoutings.nextOrNull());}}}protected
{//该分片未分配给任何节点会触发onShardFailure方法处理该情况assert
assertExecuteOnStartThread();SearchShardTarget
shardIt.getClusterAlias());onShardFailure(shardIndex,
NoShardAvailableActionException(shardIt.shardId()));}
{//创建一个Runnable对象并执行executePhaseOnShard方法来在分片上执行搜索操作。
final
pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(),
PendingExecutions(maxConcurrentRequestsPerNode)):
SearchActionListenerResult(shard,
{//定义了一个SearchActionListener的匿名子类用于处理搜索操作的响应。
Overridepublic
{在响应成功时会调用onShardResult方法处理搜索结果onShardResult(result,
{//在响应失败时会调用onShardFailure方法处理错误情况onShardFailure(shardIndex,
{//无论成功还是失败最后都会调用executeNext方法执行下一个操作。
executeNext(pendingExecutions,
{executeNext(pendingExecutions,
{executeNext(pendingExecutions,
thread);}}};//如果throttleConcurrentRequests为true则会使用pendingExecutions对象来限制并发请求的数量。
否则直接执行r.run()方法。
if
这个抽象方法由子类SearchQueryThenFetchAsyncAction实现*
executePhaseOnShard(SearchShardIterator
shard,SearchActionListenerResult
}3、通过transportService把查询请求发送到指定的数据节点
registerRequestHandler(TransportService
{transportService.registerRequestHandler(QUERY_ACTION_NAME,ThreadPool.Names.SAME,ShardSearchRequest::new,(request,
searchService.executeQueryPhase(request,(SearchShardTask)
ChannelActionListener(channel)));
sendExecuteQuery(Transport.Connection
anymore.//我们对此进行了优化如果我们在搜索请求中只有一个分片则期望
QueryFetchSearchResult这曾经是不再存在的QUERY_AND_FETCH。
final
null);Writeable.ReaderSearchPhaseResult
responseWrapper.apply(connection,
listener);//上面根据QUERY_ACTION_NAME注册的实际调用的是
searchService.executeQueryPhasetransportService.sendChildRequest(connection,QUERY_ACTION_NAME,request,task,new
ConnectionCountingHandler(handler,
connection.getNode().getId()));}二、数据节点收到请求的处理逻辑
executeQueryPhase(ShardSearchRequest
ActionListenerSearchPhaseResult
request.canReturnNullResponseIfMatchNoDocs()
shard;//根据request对象获取一个IndexShard对象final
getShard(request);//调用rewriteAndFetchShardRequest方法对shard和request进行重写和获取请求。
rewriteAndFetchShardRequest(shard,
(orig.canReturnNullResponseIfMatchNoDocs())
{//创建一个ShardSearchRequest对象的副本clone并调用canMatch方法进行匹配检查。
ShardSearchRequest
ShardSearchRequest(orig);canMatchResp
{l.onResponse(QuerySearchResult.nullInstance());return;}}//其中会执行executeQueryPhase方法的递归调用。
ensureAfterSeqNoRefreshed(shard,
的引用计数将通过此方法递增。
调用方有责任确保在不再需要对象时正确递减引用计数。
*/private
executeQueryPhase(ShardSearchRequest
createOrGetReaderContext(request);try
(//创建SearchContext对象并设置相关参数。
Releasable
tracer.withScope(task);Releasable
readerContext.markAsUsed(getKeepAlive(request));SearchContext
{//开始跟踪执行查询阶段。
tracer.startTrace(executeQueryPhase,
afterQueryTime;//使用SearchOperationListenerExecutor执行加载或执行查询阶段的操作。
try
(SearchOperationListenerExecutor
SearchOperationListenerExecutor(context))
{loadOrExecuteQueryPhase(request,
context);//检查查询结果是否具有搜索上下文并根据需要释放ReaderContext对象。
if
(context.queryResult().hasSearchContext()
{freeReaderContext(readerContext.id());}afterQueryTime
{//停止跟踪执行查询阶段。
tracer.stopTrace();}//根据条件判断是否需要执行提取阶段if
fetchcontext.addFetchResult();//如果需要执行提取阶段则将提取结果添加到SearchContext对象并调用executeFetchPhase方法执行提取阶段。
return
executeFetchPhase(readerContext,
{//将RescoreDocIds对象传递给queryResult并返回context.queryResult()。
//将
queryResult以将它们发送到协调节点并在提取阶段接收它们。
我们还将
LegacyReaderContext以防搜索状态需要保留在数据节点中final
context.rescoreDocIds();context.queryResult().setRescoreDocIds(rescoreDocIds);readerContext.setRescoreDocIds(rescoreDocIds);context.queryResult().incRef();return
ElasticsearchException(e.getCause());}logger.trace(Query
e);processFailure(readerContext,
QuerySearchResultnullInstance}*
false因为协调器节点至少需要一个分片响应来构建全局响应。
*/public
canReturnNullResponseIfMatchNoDocs()
canReturnNullResponseIfMatchNoDocs;}1、尝试从缓存中加载查询结果
如果无法使用缓存请尝试从缓存加载查询结果或直接执行查询阶段。
*/private
IndicesService.canCache(request,
context);context.getSearchExecutionContext().freezeContext();if
{indicesService.loadIntoContext(request,
{QueryPhase.execute(context);}}/***
加载缓存结果根据需要通过执行查询阶段进行计算否则将缓存值反序列化为
的组合允许进行单个加载操作这将导致具有相同密钥的其他请求等待直到其加载并重用相同的缓存。
*/public
loadIntoContext(ShardSearchRequest
的搜索器searcher的阅读器对象BytesReference
//cacheKey缓存键CheckedConsumerStreamOutput,
//一个带有StreamOutput参数的回调函数用于加载数据)
{//创建一个IndexShardCacheEntity对象用于表示索引分片的缓存实体IndexShardCacheEntity
IndexShardCacheEntity(shard);//创建一个CheckedSupplier对象用于生成缓存数据。
CheckedSupplierBytesReference,
{//这个对象内部使用BytesStreamOutput它允许指定预期的字节大小//
但默认使用16k作为页面大小。
为了避免对小查询结果浪费太多内存将预期大小设置为512字节。
final
512;//在BytesStreamOutput中执行loader回调函数将数据写入输出流中try
BytesStreamOutput(expectedSizeInBytes))
{loader.accept(out);//将输出流的字节表示返回作为缓存数据。
return
out.bytes();}};//通过调用indicesRequestCache.getOrCompute方法使用缓存实体、缓存数据生成器、映射缓存键、目录阅读器和缓存键作为参数获取或计算缓存数据return
indicesRequestCache.getOrCompute(cacheEntity,
cacheKey);}这里要知道supplier内部会执行loader.accept(out);而传过来的loader是如下
{QueryPhase.execute(context);context.queryResult().writeToNoId(out);loadedFromCache[0]
false;}其实意味着如果执行了loader说明缓存中没有而是直接查询的继续往下
cacheEntity,CheckedSupplierBytesReference,
mappingCacheKey,DirectoryReader
ElasticsearchDirectoryReader.getESReaderCacheHelper(reader);assert
{key.entity.onMiss();//看看这是否是我们第一次看到这个读取器并确保注册一个清理密钥CleanupKey
(registeredClosedListeners.containsKey(cleanupKey)
registeredClosedListeners.putIfAbsent(cleanupKey,
{cacheHelper.addClosedListener(cleanupKey);}}}
null则尝试使用给定的映射函数计算其值并将其输入到此映射中除非为
而第二个加载器函数将返回第一个加载器函数提供的结果包括执行第一个加载器函数期间引发的任何异常*/public
fail//尝试从缓存中获取与给定键关联的值如果值已过期则会在获取前将其删除。
V
时按住段锁可能会导致由于依赖键加载而对另一个线程进行死锁;//
中//首先获取与给定键关联的缓存段CacheSegment。
CacheSegment
getCacheSegment(key);CompletableFutureEntryK,
future;//创建一个CompletableFuture对象用于在加载完成后获取值。
CompletableFutureEntryK,
CompletableFuture();//使用段锁将键和CompletableFuture对象放入段的映射Map中。
try
completableFuture);}BiFunction?
(segment.writeLock.acquire())并使用try-with-resources语句来确保锁被释放。
try
{//检查segment.map是否为空如果不为空则尝试从segment.map中获取与key对应的CompletableFutureEntryK,
sanity.isCompletedExceptionally())
{//如果sanity不为空且已经完成异常则从segment.map中移除key。
segment.map.remove(key);if
{//如果segment.map为空则将其赋值为null。
segment.map
completableValue;//如果该键之前不存在映射则说明当前线程赢得了竞争需要执行加载操作。
if
completableFuture;completableValue
loaded;//调用加载器的load方法加载值并将其放入CompletableFuture对象中。
try
{future.completeExceptionally(e);throw
value);future.completeExceptionally(npe);throw
{//将加载的值包装成一个Entry对象并完成CompletableFuture对象。
future.complete(new
{//说明该键存在映射直接调用completableValue
future.handle(handler);}//通过completableValue.get()获取加载完成的值try
(future.isCompletedExceptionally())
IllegalStateException(e);}}return
value;}这里面在从缓存中没有得到指定的CacheSegment
里面实际调用的是QueryPhase.execute(context);
这里就看一下QueryPhase.execute(context);的实现源码
//搜索请求的查询阶段用于运行查询并从每个分片中获取有关匹配文档的信息*/
(searchContext.rankShardContext()
{executeRank(searchContext);}}(1)executeQuery和executeRank两种查询方式
{//获取排名的上下文信息和查询结果信息RankShardContext
searchContext.rankShardContext();QuerySearchResult
searchContext.queryResult();//然后根据条件判断是否需要执行组合布尔查询以获取总命中数或聚合结果,if
(searchContext.trackTotalHitsUpTo()
{//需要的话则size0再执行executeQuery,来获取总命中数和聚合结果searchContext.size(0);QueryPhase.executeQuery(searchContext);}
{//将查询结果的topDocs设置为空即命中文档为空。
searchContext.queryResult().topDocs(new
DocValueFormat[0]);}ListTopDocs
querySearchResult.searchTimedOut();long
querySearchResult.serviceTimeEWMA();int
querySearchResult.nodeQueueSize();//迭代rankShardContext.queries()中的每个排名查询来执行排名操作for
{break;}//对于每个排名查询创建一个RankSearchContext对象RankSearchContext
RankSearchContext(searchContext,
rankShardContext.windowSize());//并添加收集器和搜索操作QueryPhase.addCollectorsAndSearch(rankSearchContext);//然后将查询结果添加到rrfRankResults列表中并更新服务时间、节点队列大小和搜索超时的状态。
QuerySearchResult
rankSearchContext.queryResult();rrfRankResults.add(rrfQuerySearchResult.topDocs().topDocs);serviceTimeEWMA
rrfQuerySearchResult.serviceTimeEWMA();nodeQueueSize
rrfQuerySearchResult.nodeQueueSize());searchTimedOut
rrfQuerySearchResult.searchTimedOut();}//将排名结果通过rankShardContext.combine方法进行合并并将相关的值记录到querySearchResult中querySearchResult.setRankShardResult(rankShardContext.combine(rrfRankResults));//包括搜索超时状态、服务时间和节点队列大小。
//
queriesquerySearchResult.searchTimedOut(searchTimedOut);querySearchResult.serviceTimeEWMA(serviceTimeEWMA);querySearchResult.nodeQueueSize(nodeQueueSize);}static
{//检查searchContext是否只有建议suggest操作如果是就执行建议阶段的操作并返回一个空的查询结果if
(searchContext.hasOnlySuggest())
{SuggestPhase.execute(searchContext);searchContext.queryResult().topDocs(new
SearchContextSourcePrinter(searchContext));}//
phase//聚合aggregation进行预处理操作AggregationPhase.preProcess(searchContext);//添加收集器collectors并执行搜索操作addCollectorsAndSearch(searchContext);//执行重新评分rescore阶段的操作RescorePhase.execute(searchContext);//再次执行建议阶段的操作。
SuggestPhase.execute(searchContext);//执行聚合阶段的操作。
AggregationPhase.execute(searchContext);//如果searchContext中包含性能分析器profiler则对查询阶段的性能结果进行分析。
if
{searchContext.queryResult().profileResults(searchContext.getProfilers().buildQueryPhaseResults());}}这两种最后还是要调用
QueryPhase.addCollectorsAndSearch进行查询只是executeRank
会多一层判断执行两遍addCollectorsAndSearch
addCollectorsAndSearch(SearchContext
searcher.getIndexReader();QuerySearchResult
searchContext.queryResult();//设置查询结果的超时状态queryResult.searchTimedOut(false);try
{//起始位置和大小。
queryResult.from(searchContext.from());queryResult.size(searchContext.size());//重写查询并通过断言确认查询已经重写。
Query
searchContext.rewrittenQuery();assert
searchContext.scrollContext();if
hits//我们在这里可以优化的不多因为我们想收集所有文档以获得总点击数}
{//如果不是第一轮滚动查询根据排序条件判断是否可以提前终止查询并构建新的查询对象。
final
scrollContext.lastEmittedDoc;if
doc//由于搜索排序是索引排序的前缀我们可以直接跳到所需的文档if
BooleanQuery.Builder().add(query,
BooleanClause.Occur.MUST).add(new
SearchAfterSortedDocQuery(searchContext.sort().sort,
BooleanClause.Occur.FILTER).build();}}}}//创建顶部文档收集器。
//
createTopDocsCollectorFactory(searchContext,searchContext.parsedPostFilter()
null);CollectorManagerCollector,
wrapWithProfilerCollectorManagerIfNeeded(searchContext.getProfilers(),topDocsFactory.collectorManager(),topDocsFactory.profilerName);//根据条件添加收集器//如果设置了terminate_after参数添加一个用于终止查询的收集器。
if
(searchContext.terminateAfter()
SearchContext.DEFAULT_TERMINATE_AFTER)
collectorsTerminateAfterCollector
TerminateAfterCollector(searchContext.terminateAfter());final
collectorManager.newCollector();collectorManager
wrapWithProfilerCollectorManagerIfNeeded(searchContext.getProfilers(),new
SingleThreadCollectorManager(MultiCollector.wrap(terminateAfterCollector,
collector)),REASON_SEARCH_TERMINATE_AFTER_COUNT,collector);}//如果存在后置过滤器添加一个用于过滤结果的收集器。
if
(searchContext.parsedPostFilter()
searcher.createWeight(searcher.rewrite(searchContext.parsedPostFilter().query()),ScoreMode.COMPLETE_NO_SCORES,1f);final
collectorManager.newCollector();collectorManager
wrapWithProfilerCollectorManagerIfNeeded(searchContext.getProfilers(),new
SingleThreadCollectorManager(new
filterWeight)),REASON_SEARCH_POST_FILTER,collector);}//如果存在聚合操作添加一个用于聚合的收集器。
if
collectorManager.newCollector();final
searchContext.aggregations().getAggsCollectorManager().newCollector();collectorManager
wrapWithProfilerCollectorManagerIfNeeded(searchContext.getProfilers(),new
SingleThreadCollectorManager(MultiCollector.wrap(collector,
aggsCollector)),REASON_SEARCH_MULTI,collector,aggsCollector);}//如果设置了最小分数添加一个用于过滤低分结果的收集器。
if
collectorManager.newCollector();//
wrapWithProfilerCollectorManagerIfNeeded(searchContext.getProfilers(),new
SingleThreadCollectorManager(new
MinimumScoreCollector(collector,
searchContext.minimumScore())),REASON_SEARCH_MIN_SCORE,collector);}//根据超时设置添加查询超时检查的任务。
final
getTimeoutCheck(searchContext);if
{searcher.addQueryCancellation(timeoutRunnable);}try
{//使用收集器管理器执行查询并更新查询结果。
searchWithCollectorManager(searchContext,
null);queryResult.topDocs(topDocsFactory.topDocsAndMaxScore(),
topDocsFactory.sortValueFormats);//获取线程池执行器对象并根据类型更新查询结果的节点队列大小和服务时间指数加权移动平均值。
ExecutorService
searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);assert
TaskExecutionTimeTrackingEsThreadPoolExecutor||
TaskExecutionTimeTrackingEsThreadPoolExecutor
{queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());queryResult.serviceTimeEWMA((long)
rExecutor.getTaskExecutionEWMA());}}
{searcher.removeQueryCancellation(timeoutRunnable);}}}
QueryPhaseExecutionException(searchContext.shardTarget(),
e);}}会通过searchWithCollectorManager
searchWithCollectorManager(SearchContext
searchContext,ContextIndexSearcher
query,CollectorManagerCollector,
{//如果profilers不为null则获取当前查询的分析器并将collectorManager设置为InternalProfileCollectorManager的getCollectorTree方法。
if
{searchContext.getProfilers().getCurrentQueryProfiler().setCollectorManager(((InternalProfileCollectorManager)
collectorManager)::getCollectorTree);}//获取searchContext的查询结果对象queryResult。
QuerySearchResult
searchContext.queryResult();try
{//使用searcher和collectorManager执行查询操作。
searcher.search(query,
(TerminateAfterCollector.EarlyTerminationException
{//如果查询被TerminateAfterCollector.EarlyTerminationException异常提前终止则将queryResult的terminatedEarly属性设置为true。
queryResult.terminatedEarly(true);}
set;//如果查询超时且timeoutSet为true则检查searchContext的request是否允许部分搜索结果。
if
(searchContext.request().allowPartialSearchResults()
{//如果不允许部分搜索结果则抛出QueryPhaseExecutionException异常指示查询超时。
//
QueryPhaseExecutionException(searchContext.shardTarget(),
exceeded);}//如果允许部分搜索结果则将queryResult的searchTimedOut属性设置为true。
queryResult.searchTimedOut(true);}//如果searchContext的terminateAfter属性不等于SearchContext.DEFAULT_TERMINATE_AFTER且queryResult的terminatedEarly属性为null则将queryResult的terminatedEarly属性设置为false。
if
(searchContext.terminateAfter()
SearchContext.DEFAULT_TERMINATE_AFTER
{queryResult.terminatedEarly(false);}}(3)
{//通过collectorManager创建一个收集器(Collector)的实例firstCollector。
final
collectorManager.newCollector();//
needed.//根据firstCollector的评分模式(scoreMode)判断是否需要评分,如果需要评分则使用rewrite方法对查询进行重写如果不需要评分则使用ConstantScoreQuery对查询进行重写。
query
firstCollector.scoreMode().needsScores()
ConstantScoreQuery(query));//根据重写后的查询(query)、评分模式(scoreMode)和权重(Weight)创建一个权重对象(weight)。
final
1);//调用search方法传入权重对象、收集器管理器和第一个收集器执行搜索操作并返回结果。
return
firstCollector);}/**每个元素表示一个分片的信息*LeafSlice的数量就代表了索引的分片数量。
每个LeafSlice对象代表了一个分片的信息和上下文。
*
如果一个索引在这个数据节点有5个分片则这个的长度为5*/private
{//如果queueSizeBasedExecutor为空或者leafSlices的长度小于等于1//LeafSlice关键字解释是IndexSearcher用来管理和表示索引搜索分片的类,如果小于等于1则此数据节点只有一个分片if
{//那么直接在leafContexts上执行搜索操作search(leafContexts,
firstCollector);//并通过collectorManager.reduce方法将结果收集起来返回。
return
collectorManager.reduce(Collections.singletonList(firstCollector));}
ArrayList(leafSlices.length);collectors.add(firstCollector);final
firstCollector.scoreMode();//并使用collectorManager创建新的收集器。
for
collectorManager.newCollector();collectors.add(collector);if
IllegalStateException(CollectorManager
mode);}}//创建一个FutureTask列表listTasks用于异步执行搜索操作final
ArrayList();//遍历leafSlices对每个leafSlices创建一个FutureTask并将其添加到listTasks中。
for
collector;});listTasks.add(task);}//使用queueSizeBasedExecutor的invokeAll方法执行所有的listTasks等待它们的执行完成。
queueSizeBasedExecutor.invokeAll(listTasks);RuntimeException
ArrayList();//遍历listTasks获取每个任务的结果并将其添加到collectedCollectors列表中。
for
{collectedCollectors.add(future.get());//
{//省略代码}}//通过collectorManager.reduce方法将所有收集器的结果进行组合并返回最终结果。
return
collectorManager.reduce(collectedCollectors);}}需要注意leafSlices
{collector.setWeight(weight);for
collector);}}1、主节点将查询请求路由(根据分片找到数据节点)到对应的数据节点执行查询请求因为数据节点无法知道查询请求是否仅针对某一个具体的分片。
数据节点会在所有分片上执行查询操作并将结果进行合并、去重和处理以产生最终的结果。
2、因此即使主节点发送的查询请求只涉及一个分片但在实际查询过程中数据节点会遍历该数据节点上所有与该索引对应的分片以保证查询结果的完整性。
{//检查是否取消了搜索操作。
cancellable.checkCancelled();//获取当前叶子节点的LeafCollector。
用于收集匹配的文档。
final
{//获取当前叶子节点的存活文档集合。
leafCollector
collector.getLeafCollector(ctx);}
{return;}//获取当前叶子节点的存活文档集合位集合表示哪些文档未被删除Bits
ctx.reader().getLiveDocs();//将存活文档集合转换为稀疏位集合BitSet
getSparseBitSetOrNull(liveDocs);//如果存活文档集合不是稀疏位集合那么使用BulkScorer进行评分操作。
if
CancellableBulkScorer(bulkScorer,
cancellable::checkCancelled);}try
{//使用BulkScorer对匹配的文档进行评分操作。
bulkScorer.score(leafCollector,
{//如果存活文档集合为稀疏位集合//获取权重对象的Scorer用于计算每个候选文档的得分Scorer
使用Scorer和稀疏位集合liveDocsBitSet对匹配的文档进行交集计算并进行评分操作。
intersectScorerAndBitSet(scorer,liveDocsBitSet,leafCollector,this.cancellable.isEnabled()
的作用是在搜索阶段过滤掉已被删除的文档并仅处理存活文档。
通过与其他集合的结合使用可以在执行搜索和评分操作时仅处理存活的有效文档可以大大减少内存占用。
从而提升搜索性能和准确性。
其他集合例如Bits、BulkScorer和Scorer可能用于执行搜索和评分操作但往往与存活文档集合无直接关联
//在给定的Scorer、BitSet、LeafCollector和checkCancelled函数的基础上计算它们的交集并将结果收集到collector中。
static
intersectScorerAndBitSet(Scorer
{//将scorer设置为collector的scorer。
collector.setScorer(scorer);//
first://创建一个迭代器iterator通过将acceptDocs和scorer的迭代器传递给ConjunctionUtils.intersectIterators()方法来计算它们的交集。
DocIdSetIterator
ConjunctionUtils.intersectIterators(Arrays.asList(new
acceptDocs.approximateCardinality()),
0;checkCancelled.run();//通过迭代器遍历交集中的文档对于每个文档如果满足一定条件则将其收集到collector中。
for
{//在每次遍历一定数量的文档后调用checkCancelled函数检查是否取消操作。
if
CHECK_CANCELLED_SCORER_INTERVAL
{checkCancelled.run();}collector.collect(docId);}//最后再次调用checkCancelled函数。
checkCancelled.run();}
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback