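The Elasticsearch index creation process
Introduction
I wrote this article in a laid-back frame of mind: rushing did not help me understand the code any better, so I told myself to take it easy. The goal is to explain clearly how Elasticsearch creates an index (the index itself, not how documents are added to it). I hope it helps.
What makes up ES meta
Meta is data that describes data. In ES, an index's mapping structure, settings, and persisted state are all meta data, and so is some cluster-level configuration. Put simply, ES meta information consists of ClusterState, MetaData, and IndexMetaData.
ClusterState
Every node in the cluster keeps the current ClusterState in memory; it represents the various states of the cluster. ClusterState contains a MetaData structure. What MetaData stores fits the idea of meta more closely, and everything that needs to be persisted lives in MetaData. The remaining fields can be regarded as transient state that is built dynamically while the cluster runs.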
    public class ClusterState implements ToXContentFragment, Diffable<ClusterState> {
        public static final String UNKNOWN_UUID = "_na_";
        public static final long UNKNOWN_VERSION = -1;

        private final long version;
        private final String stateUUID;
        private final RoutingTable routingTable;
        private final DiscoveryNodes nodes;
        private final MetaData metaData;
        private final ClusterBlocks blocks;
        private final ImmutableOpenMap<String, Custom> customs;
        private final ClusterName clusterName;
        private final boolean wasReadFromDiff;
    }
MetaData
MetaData mainly holds cluster-level settings, the meta of every index in the cluster, the meta of every template, and all custom meta.
    public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, ToXContentFragment {
        private final String clusterUUID;
        private final long version;
        private final Settings transientSettings;
        private final Settings persistentSettings;
        private final Settings settings;
        private final ImmutableOpenMap<String, IndexMetaData> indices;
        private final ImmutableOpenMap<String, IndexTemplateMetaData> templates;
        private final ImmutableOpenMap<String, Custom> customs;
        private final transient int totalNumberOfShards;
        private final int numberOfShards;
        private final String[] allIndices;
        private final String[] allOpenIndices;
        private final String[] allClosedIndices;
        private final SortedMap<String, AliasOrIndex> aliasAndIndexLookup;
    }
IndexMetaData
IndexMetaData is the meta of one specific index, such as its shard count, replica count, and mappings.
    public class IndexMetaData implements Diffable<IndexMetaData>, ToXContentFragment {
        private final int routingNumShards;
        private final int routingFactor;
        private final int routingPartitionSize;
        private final int numberOfShards;
        private final int numberOfReplicas;
        private final Index index;
        private final long version;
        private final long[] primaryTerms;
        private final State state;
        private final ImmutableOpenMap<String, AliasMetaData> aliases;
        private final Settings settings;
        private final ImmutableOpenMap<String, MappingMetaData> mappings;
        private final ImmutableOpenMap<String, Custom> customs;
        private final ImmutableOpenIntMap<Set<String>> inSyncAllocationIds;
        private final transient int totalNumberOfShards;
        private final DiscoveryNodeFilters requireFilters;
        private final DiscoveryNodeFilters includeFilters;
        private final DiscoveryNodeFilters excludeFilters;
        private final DiscoveryNodeFilters initialRecoveryFilters;
        private final Version indexCreatedVersion;
        private final Version indexUpgradedVersion;
        private final ActiveShardCount waitForActiveShards;
    }
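The three structures nest inside one another: ClusterState holds a MetaData, which holds one IndexMetaData per index. The following is a minimal sketch of that containment only. The class names SimpleIndexMeta, SimpleMetaData, and SimpleClusterState are hypothetical stand-ins invented for illustration, not Elasticsearch classes, and they model just a few of the fields listed above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for IndexMetaData, MetaData, and ClusterState.
// Only the containment relationship is modeled; the real classes hold far more.
final class SimpleIndexMeta {
    final String name;
    final int numberOfShards;
    final int numberOfReplicas;

    SimpleIndexMeta(String name, int numberOfShards, int numberOfReplicas) {
        this.name = name;
        this.numberOfShards = numberOfShards;
        this.numberOfReplicas = numberOfReplicas;
    }
}

final class SimpleMetaData {
    // Index name -> per-index meta, mirroring the `indices` field of MetaData.
    final Map<String, SimpleIndexMeta> indices;

    SimpleMetaData(Map<String, SimpleIndexMeta> indices) {
        this.indices = Collections.unmodifiableMap(new HashMap<>(indices));
    }
}

final class SimpleClusterState {
    final long version;
    final SimpleMetaData metaData;

    SimpleClusterState(long version, SimpleMetaData metaData) {
        this.version = version;
        this.metaData = metaData;
    }

    // Walks ClusterState -> MetaData -> IndexMetaData; returns null if the index is unknown.
    SimpleIndexMeta index(String name) {
        return metaData.indices.get(name);
    }

    public static void main(String[] args) {
        Map<String, SimpleIndexMeta> indices = new HashMap<>();
        indices.put("logs", new SimpleIndexMeta("logs", 5, 1));
        SimpleClusterState state = new SimpleClusterState(1L, new SimpleMetaData(indices));
        System.out.println(state.index("logs").numberOfShards);
    }
}
```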
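Index creation process
Phase 1: parameter validation
- Check against the current ClusterState (currentState) that the index does not already exist: neither the routingTable nor the metaData may contain it, and the name must not clash with an existing alias.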
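The check above can be sketched as three lookups against the current state. This is a simplified illustration, not the real validator: IndexNameCheckSketch and its validate method are hypothetical, plain sets stand in for the routing table, metadata, and alias lookup, and IllegalStateException stands in for the exception types Elasticsearch actually throws.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class IndexNameCheckSketch {
    // Throws if the requested name collides with anything already in the cluster state.
    // IllegalStateException is a stand-in for the real exception types.
    static void validate(String index, Set<String> routingTableIndices,
                         Set<String> metaDataIndices, Set<String> aliases) {
        if (routingTableIndices.contains(index)) {
            throw new IllegalStateException("index [" + index + "] already exists in the routing table");
        }
        if (metaDataIndices.contains(index)) {
            throw new IllegalStateException("index [" + index + "] already exists in the metadata");
        }
        if (aliases.contains(index)) {
            throw new IllegalStateException("an alias named [" + index + "] already exists");
        }
    }

    public static void main(String[] args) {
        Set<String> existing = new HashSet<>(Arrays.asList("logs"));
        Set<String> aliases = new HashSet<>(Arrays.asList("current"));
        validate("metrics", existing, existing, aliases); // passes silently
        try {
            validate("current", existing, existing, aliases);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```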
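Phase 2: configuration merging
- Merge the mappings and customs from the matching templates with those passed in the request; where both define the same value, the request takes precedence over the template.
- Merge the settings from the templates and the request; again the request takes precedence over the template.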
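To make "the request takes precedence" concrete for mappings, here is a sketch of a recursive default-filling merge over nested maps. It is written in the spirit of the XContentHelper.mergeDefaults call in the code further down, but it is a simplified assumption of that behavior (it ignores list values, for instance), and MergeDefaultsSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

class MergeDefaultsSketch {
    // Copies entries from `defaults` (the template) into `content` (the request)
    // only where `content` has no value of its own; nested maps are merged recursively.
    @SuppressWarnings("unchecked")
    static void mergeDefaults(Map<String, Object> content, Map<String, Object> defaults) {
        for (Map.Entry<String, Object> entry : defaults.entrySet()) {
            String key = entry.getKey();
            if (!content.containsKey(key)) {
                // The request did not set this key, so the template value fills the gap.
                content.put(key, entry.getValue());
            } else if (content.get(key) instanceof Map && entry.getValue() instanceof Map) {
                mergeDefaults((Map<String, Object>) content.get(key),
                              (Map<String, Object>) entry.getValue());
            }
            // Otherwise both sides set a plain value and the request value is kept.
        }
    }

    public static void main(String[] args) {
        Map<String, Object> request = new HashMap<>();
        request.put("dynamic", "false");
        Map<String, Object> template = new HashMap<>();
        template.put("dynamic", "strict");
        template.put("date_detection", "true");
        mergeDefaults(request, template);
        System.out.println(request.get("dynamic"));
    }
}
```

In this sketch the template can only add keys the request left out; it never overwrites a value the request supplied.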
Phase 3: building the index settings
- Create the Settings.Builder indexSettingsBuilder, merge in the template and request settings, and fill in default values for anything that is still unset.
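The layering order matters here, so a small sketch may help. It assumes, as the comment in the code further down states, that the template list is ordered best match first and is therefore applied in reverse so that better matches overwrite weaker ones. SettingsLayeringSketch and resolve are hypothetical names, and plain string maps stand in for Settings; the defaults of 5 shards and 1 replica are the fallback values that appear in the code below.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SettingsLayeringSketch {
    // Layers settings from lowest to highest priority: templates, request, then defaults for gaps.
    static Map<String, String> resolve(List<Map<String, String>> templates,
                                       Map<String, String> request) {
        Map<String, String> result = new LinkedHashMap<>();
        // Templates are assumed to be ordered best match first, so apply them in reverse:
        // a later putAll overwrites an earlier one.
        for (int i = templates.size() - 1; i >= 0; i--) {
            result.putAll(templates.get(i));
        }
        // Request settings overwrite anything the templates set.
        result.putAll(request);
        // Defaults apply only where nothing has been set so far.
        result.putIfAbsent("index.number_of_shards", "5");
        result.putIfAbsent("index.number_of_replicas", "1");
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> template = new HashMap<>();
        template.put("index.number_of_shards", "3");
        Map<String, String> request = new HashMap<>();
        request.put("index.refresh_interval", "30s");
        System.out.println(resolve(Arrays.asList(template), request));
    }
}
```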
Phase 4: building a temporary IndexMetaData
- Create the IndexMetaData.Builder tmpImdBuilder and attach actualIndexSettings, the settings produced by indexSettingsBuilder.
- Call tmpImdBuilder.build() to produce the IndexMetaData tmpImd.
Phase 5: building the IndexService
- Create a temporary IndexService: IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList()).
Phase 6: obtaining the Index and the MapperService
- Obtain the newly created index and its mapper service: Index createdIndex = indexService.index() and MapperService mapperService = indexService.mapperService().
Phase 7: applying the mappings to the mapperService
- mapperService.merge() applies the mappings that result from merging the request and the templates.
- Build Map<String, MappingMetaData> mappingsMetaData, keyed by the type given for each mapping.
Phase 8: building the final IndexMetaData
- Create the IndexMetaData.Builder indexMetaDataBuilder and attach actualIndexSettings and routingNumShards.
- On indexMetaDataBuilder, primaryTerm sets each shard's primary term, putMapping adds each mappingMd, putAlias adds each aliasMetaData (from both the templates and the request), putCustom adds the custom index metadata, and state sets the index state.
- Create the indexMetaData object with indexMetaData = indexMetaDataBuilder.build().
Phase 9: updating the MetaData with the IndexMetaData
- Put the IndexMetaData into a new MetaData built from the current one and keep the result: MetaData newMetaData = MetaData.builder(currentState.metaData()).put(indexMetaData, false).build().
Phase 10: updating the ClusterState with the MetaData
- Build a new ClusterState that carries newMetaData: ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build().
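Phases 9 and 10 never modify the existing objects; each builder call produces a new object from the old one. The following sketch illustrates that copy-on-write style under simplifying assumptions: CowMetaData and CowClusterState are hypothetical classes, and a map from index name to UUID stands in for the real IndexMetaData entries.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable stand-ins; the map value (a UUID string) replaces IndexMetaData.
final class CowMetaData {
    final Map<String, String> indices;

    CowMetaData(Map<String, String> indices) {
        this.indices = Collections.unmodifiableMap(new HashMap<>(indices));
    }

    // Returns a new metadata object containing the extra index; `this` is left untouched.
    CowMetaData withIndex(String name, String uuid) {
        Map<String, String> copy = new HashMap<>(indices);
        copy.put(name, uuid);
        return new CowMetaData(copy);
    }
}

final class CowClusterState {
    final CowMetaData metaData;

    CowClusterState(CowMetaData metaData) {
        this.metaData = metaData;
    }

    // Returns a new state that carries the given metadata.
    CowClusterState withMetaData(CowMetaData newMetaData) {
        return new CowClusterState(newMetaData);
    }

    public static void main(String[] args) {
        CowClusterState current = new CowClusterState(new CowMetaData(new HashMap<String, String>()));
        CowMetaData newMetaData = current.metaData.withIndex("logs", "uuid-1");
        CowClusterState updated = current.withMetaData(newMetaData);
        System.out.println(current.metaData.indices.size() + " -> " + updated.metaData.indices.size());
    }
}
```

Because the old state is never mutated, code that still holds a reference to it keeps seeing a consistent snapshot.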
    public class MetaDataCreateIndexService extends AbstractComponent {

        static class IndexCreationTask extends AckedClusterStateUpdateTask<ClusterStateUpdateResponse> {

            /**
             * currentState is the current ClusterState of the ES cluster.
             */
            @Override
            public ClusterState execute(ClusterState currentState) throws Exception {
                Index createdIndex = null;
                String removalExtraInfo = null;
                IndexRemovalReason removalReason = IndexRemovalReason.FAILURE;
                try {
                    // The following checks whether the index already exists in the ClusterState:
                    // neither the routingTable nor the metaData may contain it, and it must not clash with an alias.
                    validator.validate(request, currentState);

                    // Validate the aliases carried by the request.
                    for (Alias alias : request.aliases()) {
                        aliasValidator.validateAlias(alias, request.index(), currentState.metaData());
                    }

                    // The following merges the configuration of the request and the templates.
                    // Find the templates in the current ClusterState that match this request.
                    List<IndexTemplateMetaData> templates = findTemplates(request, currentState);

                    // Temporary holders for the data carried by the request.
                    Map<String, Custom> customs = new HashMap<>();
                    Map<String, Map<String, Object>> mappings = new HashMap<>();
                    Map<String, AliasMetaData> templatesAliases = new HashMap<>();
                    List<String> templateNames = new ArrayList<>();

                    // Save the request's mappings into the temporary variable mappings.
                    for (Map.Entry<String, String> entry : request.mappings().entrySet()) {
                        mappings.put(entry.getKey(), MapperService.parseMapping(xContentRegistry, entry.getValue()));
                    }

                    // Save the request's customs into the temporary variable customs.
                    for (Map.Entry<String, Custom> entry : request.customs().entrySet()) {
                        customs.put(entry.getKey(), entry.getValue());
                    }

                    final Index recoverFromIndex = request.recoverFrom();

                    // recoverFrom() is non-null only when this index is created by a resize
                    // (see request.resizeType() below); templates apply only to ordinary creations.
                    if (recoverFromIndex == null) {
                        // Merge the templates' mappings, customs, and aliases into the request's data.
                        for (IndexTemplateMetaData template : templates) {
                            templateNames.add(template.getName());

                            // Merge the mappings of the request and the template.
                            for (ObjectObjectCursor<String, CompressedXContent> cursor : template.mappings()) {
                                String mappingString = cursor.value.string();
                                if (mappings.containsKey(cursor.key)) {
                                    // The request has a mapping with this name: merge, with the request's
                                    // values taking precedence and the template filling the gaps.
                                    XContentHelper.mergeDefaults(mappings.get(cursor.key),
                                        MapperService.parseMapping(xContentRegistry, mappingString));
                                } else {
                                    // The request has no mapping with this name: simply add the template's.
                                    mappings.put(cursor.key,
                                        MapperService.parseMapping(xContentRegistry, mappingString));
                                }
                            }

                            // handle custom
                            for (ObjectObjectCursor<String, Custom> cursor : template.customs()) {
                                String type = cursor.key;
                                IndexMetaData.Custom custom = cursor.value;
                                IndexMetaData.Custom existing = customs.get(type);
                                if (existing == null) {
                                    customs.put(type, custom);
                                } else {
                                    IndexMetaData.Custom merged = existing.mergeWith(custom);
                                    customs.put(type, merged);
                                }
                            }

                            // The request's aliases take precedence; merge in the template's aliases.
                            for (ObjectObjectCursor<String, AliasMetaData> cursor : template.aliases()) {
                                AliasMetaData aliasMetaData = cursor.value;
                                if (request.aliases().contains(new Alias(aliasMetaData.alias()))) {
                                    continue;
                                }
                                if (templatesAliases.containsKey(cursor.key)) {
                                    continue;
                                }
                                if (aliasMetaData.alias().contains("{index}")) {
                                    String templatedAlias = aliasMetaData.alias().replace("{index}", request.index());
                                    aliasMetaData = AliasMetaData.newAliasMetaData(aliasMetaData, templatedAlias);
                                }
                                aliasValidator.validateAliasMetaData(aliasMetaData, request.index(), currentState.metaData());
                                templatesAliases.put(aliasMetaData.alias(), aliasMetaData);
                            }
                        }
                    }

                    // The following builds the settings used when the index is created.
                    // Merge the settings of the templates.
                    Settings.Builder indexSettingsBuilder = Settings.builder();
                    if (recoverFromIndex == null) {
                        // apply templates, here, in reverse order, since first ones are better matching
                        for (int i = templates.size() - 1; i >= 0; i--) {
                            indexSettingsBuilder.put(templates.get(i).settings());
                        }
                    }

                    // Merge the request's settings, overriding those of the templates.
                    indexSettingsBuilder.put(request.settings());
                    if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
                        indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
                    }
                    if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
                        indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
                    }
                    if (settings.get(SETTING_AUTO_EXPAND_REPLICAS) != null
                            && indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS) == null) {
                        indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS));
                    }
                    if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {
                        DiscoveryNodes nodes = currentState.nodes();
                        final Version createdVersion = Version.min(Version.CURRENT, nodes.getSmallestNonClientNodeVersion());
                        indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);
                    }
                    if (indexSettingsBuilder.get(SETTING_CREATION_DATE) == null) {
                        indexSettingsBuilder.put(SETTING_CREATION_DATE, new DateTime(DateTimeZone.UTC).getMillis());
                    }
                    indexSettingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName());
                    indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());

                    // The following creates the builder for the IndexMetaData.
                    final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());
                    final int routingNumShards;

                    // I did not fully follow how routingNumShards is derived. As written, a new index
                    // reads it from its settings, while a resize takes it from the source index.
                    if (recoverFromIndex == null) {
                        Settings idxSettings = indexSettingsBuilder.build();
                        routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
                    } else {
                        assert IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettingsBuilder.build()) == false
                            : "index.number_of_routing_shards should be present on the target index on resize";
                        final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
                        routingNumShards = sourceMetaData.getRoutingNumShards();
                    }

                    // Remove the routing_shards setting.
                    indexSettingsBuilder.remove(IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.getKey());
                    tmpImdBuilder.setRoutingNumShards(routingNumShards);

                    if (recoverFromIndex != null) {
                        assert request.resizeType() != null;
                        prepareResizeIndexSettings(
                            currentState, mappings.keySet(), indexSettingsBuilder, recoverFromIndex,
                            request.index(), request.resizeType());
                    }

                    // The actual index settings.
                    final Settings actualIndexSettings = indexSettingsBuilder.build();
                    // Attach the actual index settings to the IndexMetaData builder.
                    tmpImdBuilder.settings(actualIndexSettings);

                    if (recoverFromIndex != null) {
                        final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
                        final long primaryTerm = IntStream
                            .range(0, sourceMetaData.getNumberOfShards())
                            .mapToLong(sourceMetaData::primaryTerm)
                            .max()
                            .getAsLong();
                        for (int shardId = 0; shardId < tmpImdBuilder.numberOfShards(); shardId++) {
                            tmpImdBuilder.primaryTerm(shardId, primaryTerm);
                        }
                    }

                    // Build the temporary IndexMetaData object.
                    final IndexMetaData tmpImd = tmpImdBuilder.build();
                    ActiveShardCount waitForActiveShards = request.waitForActiveShards();
                    if (waitForActiveShards == ActiveShardCount.DEFAULT) {
                        waitForActiveShards = tmpImd.getWaitForActiveShards();
                    }
                    if (waitForActiveShards.validate(tmpImd.getNumberOfReplicas()) == false) {
                        throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards()
                            + "]: cannot be greater than number of shard copies ["
                            + (tmpImd.getNumberOfReplicas() + 1) + "]");
                    }

                    // Create the IndexService.
                    final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList());
                    createdIndex = indexService.index();

                    // Get the mapperService of the newly created IndexService.
                    MapperService mapperService = indexService.mapperService();
                    try {
                        // Apply the mappings produced by merging the request and the templates.
                        mapperService.merge(mappings, MergeReason.MAPPING_UPDATE, request.updateAllTypes());
                    } catch (Exception e) {
                        removalExtraInfo = "failed on parsing default mapping/mappings on index creation";
                        throw e;
                    }

                    if (request.recoverFrom() == null) {
                        indexService.getIndexSortSupplier().get();
                    }

                    final QueryShardContext queryShardContext = indexService.newQueryShardContext(0, null, () -> 0L, null);
                    for (Alias alias : request.aliases()) {
                        if (Strings.hasLength(alias.filter())) {
                            aliasValidator.validateAliasFilter(alias.name(), alias.filter(), queryShardContext, xContentRegistry);
                        }
                    }
                    for (AliasMetaData aliasMetaData : templatesAliases.values()) {
                        if (aliasMetaData.filter() != null) {
                            aliasValidator.validateAliasFilter(aliasMetaData.alias(), aliasMetaData.filter().uncompressed(),
                                queryShardContext, xContentRegistry);
                        }
                    }

                    // Build the mappingsMetaData map, keyed by mapping type.
                    Map<String, MappingMetaData> mappingsMetaData = new HashMap<>();
                    for (DocumentMapper mapper : mapperService.docMappers(true)) {
                        MappingMetaData mappingMd = new MappingMetaData(mapper);
                        mappingsMetaData.put(mapper.type(), mappingMd);
                    }

                    // The following builds the final IndexMetaData.
                    // Create indexMetaDataBuilder, which performs the real creation.
                    final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index())
                        .settings(actualIndexSettings)
                        .setRoutingNumShards(routingNumShards);

                    for (int shardId = 0; shardId < tmpImd.getNumberOfShards(); shardId++) {
                        indexMetaDataBuilder.primaryTerm(shardId, tmpImd.primaryTerm(shardId));
                    }
                    for (MappingMetaData mappingMd : mappingsMetaData.values()) {
                        indexMetaDataBuilder.putMapping(mappingMd);
                    }
                    for (AliasMetaData aliasMetaData : templatesAliases.values()) {
                        indexMetaDataBuilder.putAlias(aliasMetaData);
                    }
                    for (Alias alias : request.aliases()) {
                        AliasMetaData aliasMetaData = AliasMetaData.builder(alias.name()).filter(alias.filter())
                            .indexRouting(alias.indexRouting()).searchRouting(alias.searchRouting()).build();
                        indexMetaDataBuilder.putAlias(aliasMetaData);
                    }
                    for (Map.Entry<String, Custom> customEntry : customs.entrySet()) {
                        indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue());
                    }
                    indexMetaDataBuilder.state(request.state());

                    // Build the indexMetaData object.
                    final IndexMetaData indexMetaData;
                    try {
                        indexMetaData = indexMetaDataBuilder.build();
                    } catch (Exception e) {
                        removalExtraInfo = "failed to build index metadata";
                        throw e;
                    }

                    indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetaData.getIndex(),
                        indexMetaData.getSettings());

                    // Build the new MetaData.
                    MetaData newMetaData = MetaData.builder(currentState.metaData())
                        .put(indexMetaData, false)
                        .build();

                    ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
                    if (!request.blocks().isEmpty()) {
                        for (ClusterBlock block : request.blocks()) {
                            blocks.addIndexBlock(request.index(), block);
                        }
                    }
                    blocks.updateBlocks(indexMetaData);

                    // Apply the blocks and the new MetaData to a new ClusterState.
                    ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();

                    if (request.state() == State.OPEN) {
                        RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
                            .addAsNew(updatedState.metaData().index(request.index()));
                        updatedState = allocationService.reroute(
                            ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
                            "index [" + request.index() + "] created");
                    }
                    removalExtraInfo = "cleaning up after validating index on master";
                    removalReason = IndexRemovalReason.NO_LONGER_ASSIGNED;
                    return updatedState;
                } finally {
                    if (createdIndex != null) {
                        indicesService.removeIndex(createdIndex, removalReason, removalExtraInfo);
                    }
                }
            }
        }
    }
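One detail of the method above is easy to miss: the finally block removes the IndexService created during execute() on both the failure path and the success path, with only the removal reason differing. The sketch below reproduces just that control flow. TemporaryIndexSketch, liveIndices, and removals are hypothetical names, and a boolean flag stands in for the mapping and alias validation.

```java
import java.util.ArrayList;
import java.util.List;

class TemporaryIndexSketch {
    static final List<String> liveIndices = new ArrayList<>(); // stand-in for the IndicesService registry
    static final List<String> removals = new ArrayList<>();    // records the reason given at each removal

    // Creates a temporary index, validates it, and always removes it again before returning.
    static String createAndValidate(String name, boolean mappingsValid) {
        String created = null;
        String removalInfo = null;
        try {
            liveIndices.add(name);
            created = name;
            if (!mappingsValid) {
                removalInfo = "failed on parsing mappings";
                throw new IllegalArgumentException(removalInfo);
            }
            removalInfo = "cleaning up after validating index on master";
            return "state-with-" + name; // stand-in for the updated ClusterState
        } finally {
            // Runs on success and on failure alike, so nothing temporary is left behind.
            if (created != null) {
                liveIndices.remove(created);
                removals.add(removalInfo);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(createAndValidate("logs", true));
        System.out.println(liveIndices.isEmpty());
    }
}
```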
Creating the IndexService
I have not studied this part in depth. The code is included here only as a pointer for further reading.
    public class IndicesService extends AbstractLifecycleComponent
        implements IndicesClusterStateService.AllocatedIndices<IndexShard, IndexService>, IndexService.ShardStoreDeleter {

        public synchronized IndexService createIndex(
                final IndexMetaData indexMetaData, final List<IndexEventListener> builtInListeners) throws IOException {
            ensureChangesAllowed();
            if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
                throw new IllegalArgumentException("index must have a real UUID found value: ["
                    + indexMetaData.getIndexUUID() + "]");
            }
            final Index index = indexMetaData.getIndex();
            if (hasIndex(index)) {
                throw new ResourceAlreadyExistsException(index);
            }
            List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
            final IndexEventListener onStoreClose = new IndexEventListener() {
                @Override
                public void onStoreClosed(ShardId shardId) {
                    indicesQueryCache.onClose(shardId);
                }
            };
            finalListeners.add(onStoreClose);
            finalListeners.add(oldShardsStats);
            final IndexService indexService = createIndexService(
                "create index",
                indexMetaData,
                indicesQueryCache,
                indicesFieldDataCache,
                finalListeners,
                indexingMemoryController);
            boolean success = false;
            try {
                indexService.getIndexEventListener().afterIndexCreated(indexService);
                indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
                success = true;
                return indexService;
            } finally {
                if (success == false) {
                    indexService.close("plugins_failed", true);
                }
            }
        }

        private synchronized IndexService createIndexService(final String reason,
                                                             IndexMetaData indexMetaData,
                                                             IndicesQueryCache indicesQueryCache,
                                                             IndicesFieldDataCache indicesFieldDataCache,
                                                             List<IndexEventListener> builtInListeners,
                                                             IndexingOperationListener... indexingOperationListeners)
                throws IOException {
            final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopeSetting);
            logger.debug("creating Index [{}], shards [{}]/[{}] - reason [{}]",
                indexMetaData.getIndex(),
                idxSettings.getNumberOfShards(),
                idxSettings.getNumberOfReplicas(),
                reason);
            final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
            for (IndexingOperationListener operationListener : indexingOperationListeners) {
                indexModule.addIndexOperationListener(operationListener);
            }
            pluginsService.onIndexModule(indexModule);
            for (IndexEventListener listener : builtInListeners) {
                indexModule.addIndexEventListener(listener);
            }
            return indexModule.newIndexService(
                nodeEnv,
                xContentRegistry,
                this,
                circuitBreakerService,
                bigArrays,
                threadPool,
                scriptService,
                client,
                indicesQueryCache,
                mapperRegistry,
                indicesFieldDataCache,
                namedWriteableRegistry
            );
        }
    }