[Elasticsearch 5.6.12 Source Code] [3] Startup Process Analysis (Part 2)

Copyright notice: this is an original article by the author; please credit the source when reposting.

Introduction

This article answers the following question:

1. Which services does the Node object initialize during ES startup?

Construction flow

Step 1. Create a List that holds the resources to release if initialization fails, and use a temporary Logger to log that initialization has started.

The constructor first creates a List<Closeable> and then logs "initializing ...". The code is straightforward:

    final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
    boolean success = false;
    {
        // use temp logger just to say we are starting. we can't use it later on because the node name might not be set
        Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
        logger.info("initializing ...");
    }

Step 2. Force client.type in the settings to node, set node.name, and check the index data directory settings.

This part first sets client.type to node, then calls TribeService.processSettings to handle the tribe configuration. It then creates the NodeEnvironment, checks and sets the node.name property, and finally, where required, checks the path configuration for index data and prints some JVM information (the JVM logging is not part of the excerpt below). The code:

    Settings tmpSettings = Settings.builder().put(environment.settings())
        .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

    tmpSettings = TribeService.processSettings(tmpSettings);

    // create the node environment as soon as possible, to recover the node id and enable logging
    try {
        nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
        resourcesToClose.add(nodeEnvironment);
    } catch (IOException ex) {
        throw new IllegalStateException("Failed to create node environment", ex);
    }
    final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
    Logger logger = Loggers.getLogger(Node.class, tmpSettings);
    final String nodeId = nodeEnvironment.nodeId();
    tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
    if (DiscoveryNode.nodeRequiresLocalStorage(tmpSettings)) {
        checkForIndexDataInDefaultPathData(tmpSettings, nodeEnvironment, logger);
    }
    // this must be captured after the node name is possibly added to the settings
    final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
    if (hadPredefinedNodeName == false) {
        logger.info("node name [{}] derived from node ID [{}]; set [{}] to override", nodeName, nodeId, NODE_NAME_SETTING.getKey());
    } else {
        logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
    }

Step 3. Create the PluginsService and Environment instances.

The PluginsService constructor loads the jars under the plugins and modules directories and creates the corresponding plugin and module instances. Once it is created, the Node constructor calls pluginsService.updatedSettings() to pick up the settings defined by the plugins and modules. Node then uses the new settings and the nodeId to create a LocalNodeFactory, and recreates the Environment object from the final settings. The code:

    this.pluginsService = new PluginsService(tmpSettings, environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
    this.settings = pluginsService.updatedSettings();
    localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

    // create the environment based on the finalized (processed) view of the settings
    // this is just to makes sure that people get the same settings, no matter where they ask them from
    this.environment = new Environment(this.settings);
    Environment.assertEquivalent(environment, this.environment);

Step 4. Create the ThreadPool and ThreadContext instances.

First, the list of ExecutorBuilder objects provided by the plugins and modules is obtained through pluginsService. The ThreadPool, which owns the ThreadContext, is then created from the settings and that list. The code:

    // executorBuilders is the list obtained from pluginsService, as described above
    final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
    resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
    // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
    DeprecationLogger.setThreadContext(threadPool.getThreadContext());
    resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));

Step 5. Create, in order, the main components: NodeClient, ResourceWatcherService, ScriptModule, AnalysisModule, SettingsModule, NetworkService, ClusterService, IngestService, and ClusterInfoService.

ScriptModule holds the ScriptService, which gives access to the script engines configured in ES. AnalysisModule holds the AnalysisRegistry, which gives access to the analyzers configured in ES. SettingsModule stores, by type, the settings that ES is able to parse. NetworkService is mainly used to resolve network addresses, and ClusterService maintains the cluster information. The code:

    final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
    final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
    for (final ExecutorBuilder<?> builder : threadPool.builders()) {
        additionalSettings.addAll(builder.getRegisteredSettings());
    }
    client = new NodeClient(settings, threadPool);
    final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
    final ScriptModule scriptModule = ScriptModule.create(settings, this.environment, resourceWatcherService,
        pluginsService.filterPlugins(ScriptPlugin.class));
    AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
    additionalSettings.addAll(scriptModule.getSettings());
    // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
    // so we might be late here already
    final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
    scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
    resourcesToClose.add(resourceWatcherService);
    final NetworkService networkService = new NetworkService(settings,
        getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
    final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
        localNodeFactory::getNode);
    clusterService.addStateApplier(scriptModule.getScriptService());
    resourcesToClose.add(clusterService);
    final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment,
        scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
    final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);

Step 6. Create a ModulesBuilder and add the various modules.
ES uses Guice, Google's open-source dependency injection framework, to manage the dependencies in the program. The modules added to the ModulesBuilder are: the modules supplied by plugins, obtained through PluginsService; NodeModule, which holds the MonitorService; ClusterModule, which holds the ClusterService and the related ClusterPlugin instances; IndicesModule, which holds the MapperPlugin instances; SearchModule, which holds the related SearchPlugin instances; ActionModule, which holds the ThreadPool, the ActionPlugin instances, the NodeClient, and the CircuitBreakerService; GatewayModule; RepositoriesModule, which holds the RepositoryPlugin instances; and SettingsModule, which holds the settings objects available in ES. Finally, the createInjector method of modules is called to create the application's dependency injector.

Step 7. Collect the LifecycleComponent objects of each plugin and initialize the NodeClient.

The code:

    List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
        .filter(p -> p instanceof LifecycleComponent)
        .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
    pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
        .map(injector::getInstance).collect(Collectors.toList()));
    resourcesToClose.addAll(pluginLifecycleComponents);
    this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
    client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
        () -> clusterService.localNode().getId());

    if (NetworkModule.HTTP_ENABLED.get(settings)) {
        logger.debug("initializing HTTP handlers ...");
        actionModule.initRestHandlers(() -> clusterService.state().nodes());
    }
    logger.info("initialized");

Step 8. Call the start method of Node, which calls the start method of each important module in turn.

The key services are started one after another. The code:

    // hack around dependency injection problem (for now...)
    injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
    pluginLifecycleComponents.forEach(LifecycleComponent::start);

    injector.getInstance(MappingUpdatedAction.class).setClient(client);
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(IndicesTTLService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SnapshotShardsService.class).start();
    injector.getInstance(RoutingService.class).start();
    injector.getInstance(SearchService.class).start();
    injector.getInstance(MonitorService.class).start();

    final ClusterService clusterService = injector.getInstance(ClusterService.class);

    final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
    nodeConnectionsService.start();
    clusterService.setNodeConnectionsService(nodeConnectionsService);

    // TODO hack around circular dependencies problems
    injector.getInstance(GatewayAllocator.class).setReallocation(clusterService, injector.getInstance(RoutingService.class));

    injector.getInstance(ResourceWatcherService.class).start();
    injector.getInstance(GatewayService.class).start();
    Discovery discovery = injector.getInstance(Discovery.class);
    clusterService.setDiscoverySettings(discovery.getDiscoverySettings());
    clusterService.addInitialStateBlock(discovery.getDiscoverySettings().getNoMasterBlock());
    clusterService.setClusterStatePublisher(discovery::publish);

    // start before the cluster service since it adds/removes initial Cluster state blocks
    final TribeService tribeService = injector.getInstance(TribeService.class);
    tribeService.start();

    // Start the transport service now so the publish address will be added to the local disco node in ClusterService
    TransportService transportService = injector.getInstance(TransportService.class);
    transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
    transportService.start();
    validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress(), pluginsService.filterPlugins(Plugin.class).stream()
        .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));

    clusterService.addStateApplier(transportService.getTaskManager());
    clusterService.start();
    assert localNodeFactory.getNode() != null;
    assert transportService.getLocalNode().equals(localNodeFactory.getNode())
        : "transportService has a different local node than the factory provided";
    assert clusterService.localNode().equals(localNodeFactory.getNode())
        : "clusterService has a different local node than the factory provided";
    // start after cluster service so the local disco is known
    discovery.start();
    transportService.acceptIncomingRequests();
    discovery.startInitialJoin();
    // tribe nodes don't have a master so we shouldn't register an observer
    final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
    if (initialStateTimeout.millis() > 0) {
        final ThreadPool thread = injector.getInstance(ThreadPool.class);
        ClusterState clusterState = clusterService.state();
        ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
        if (clusterState.nodes().getMasterNodeId() == null) {
            logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
            final CountDownLatch latch = new CountDownLatch(1);
            observer.waitForNextChange(new ClusterStateObserver.Listener() {
                @Override
                public void onNewClusterState(ClusterState state) { latch.countDown(); }

                @Override
                public void onClusterServiceClose() {
                    latch.countDown();
                }

                @Override
                public void onTimeout(TimeValue timeout) {
                    logger.warn("timed out while waiting for initial discovery state - timeout: {}",
                        initialStateTimeout);
                    latch.countDown();
                }
            }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);

            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
            }
        }
    }

    if (NetworkModule.HTTP_ENABLED.get(settings)) {
        injector.getInstance(HttpServerTransport.class).start();
    }

    if (WRITE_PORTS_FILE_SETTING.get(settings)) {
        if (NetworkModule.HTTP_ENABLED.get(settings)) {
            HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
            writePortsFile("http", http.boundAddress());
        }
        TransportService transport = injector.getInstance(TransportService.class);
        writePortsFile("transport", transport.boundAddress());
    }

    // start nodes now, after the http server, because it may take some time
    tribeService.startNodes();

    logger.info("started");
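
The wait for the initial cluster state in Step 8 comes down to a CountDownLatch that is released by whichever listener callback fires first. The following is a minimal, self-contained sketch of that pattern, not the Elasticsearch implementation: Observer, Listener, publishMaster, and awaitMaster are hypothetical stand-ins invented for illustration, the cluster state is reduced to a master node id string, and, unlike the code above, an interrupt returns false instead of throwing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class InitialStateWaitSketch {

    /** Hypothetical stand-in for ClusterStateObserver.Listener. */
    interface Listener {
        void onNewState(String masterNodeId);
        void onTimeout(long timeoutMillis);
    }

    /** Hypothetical observer: invokes exactly one listener callback, whichever happens first. */
    static final class Observer {
        // Daemon thread so a pending timeout task never keeps the JVM alive.
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "observer-timeout");
                t.setDaemon(true);
                return t;
            });
        private Listener listener;   // guarded by "this"
        private String masterNodeId; // guarded by "this"

        synchronized void waitForNextChange(Listener l, long timeoutMillis) {
            if (masterNodeId != null) { // a master is already known, notify immediately
                l.onNewState(masterNodeId);
                return;
            }
            listener = l;
            scheduler.schedule(() -> fireTimeout(timeoutMillis), timeoutMillis, TimeUnit.MILLISECONDS);
        }

        synchronized void publishMaster(String id) {
            masterNodeId = id;
            Listener l = listener;
            listener = null; // make sure the timeout cannot notify a second time
            if (l != null) {
                l.onNewState(id);
            }
        }

        private synchronized void fireTimeout(long timeoutMillis) {
            Listener l = listener;
            listener = null;
            if (l != null) {
                l.onTimeout(timeoutMillis);
            }
        }
    }

    /** Blocks until a master is known or the timeout fires; returns true only in the first case. */
    static boolean awaitMaster(Observer observer, long timeoutMillis) {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean joined = new AtomicBoolean(false);
        observer.waitForNextChange(new Listener() {
            @Override
            public void onNewState(String masterNodeId) {
                joined.set(true);
                latch.countDown();
            }

            @Override
            public void onTimeout(long millis) {
                latch.countDown(); // give up waiting, but let startup continue
            }
        }, timeoutMillis);
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // simplification: the code above throws instead
            return false;
        }
        return joined.get();
    }

    public static void main(String[] args) {
        Observer observer = new Observer();
        // Simulate discovery electing a master shortly after startup.
        new Thread(() -> observer.publishMaster("node-1")).start();
        System.out.println("joined: " + awaitMaster(observer, 1_000));
    }
}
```

As in the Node code, a timeout only releases the latch: startup proceeds either way, and the timeout merely bounds how long the node waits for a master before continuing.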
