
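Elasticsearch 6.1.0 Startup Process

Date: 2018-12-04

Introduction

This article walks through the core steps Elasticsearch performs during startup, to give a high-level picture of what happens while the node comes up.

I originally planned to draw a flowchart, but others have analyzed the whole process with XMind and explained it very clearly that way, so I also use XMind here and walk through the process from top to bottom.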


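Startup process diagram

Notes:

  • 1. The whole ES startup flow is recorded in XMind.
  • 2. Read from top to bottom; the parts marked with a red flag are the core flow.
  • 3. I summarize the core flow as: configuration loading; Bootstrap initialization; Bootstrap setup; Bootstrap start.
  • 4. Each step breaks down into a lot more logic; only the logic that ties the whole process together is covered here.

Elasticsearch startup process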


Configuration loading


Bootstrap initialization

  • One important job of the Elasticsearch class is parsing command-line arguments.
  • Running the Elasticsearch start command with the -h option prints the help text.
  • The Elasticsearch constructor is shown below; the options it registers match the help text.

    // Options registered by the elasticsearch start command
    Elasticsearch() {
        super("starts elasticsearch", () -> {});
        versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
            "Prints elasticsearch version information and exits");
        daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
            "Starts Elasticsearch in the background")
            .availableUnless(versionOption);
        pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
            "Creates a pid file in the specified path on start")
            .availableUnless(versionOption)
            .withRequiredArg()
            .withValuesConvertedBy(new PathConverter());
        quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
            "Turns off standard output/error streams logging in console")
            .availableUnless(versionOption)
            .availableUnless(daemonizeOption);
    }



Elasticsearch.main process

    // Entry point of the elasticsearch process
    public static void main(final String[] args) throws Exception {
        System.setSecurityManager(new SecurityManager() {
            @Override
            public void checkPermission(Permission perm) {
            }
        });
        LogConfigurator.registerErrorListener();
        final Elasticsearch elasticsearch = new Elasticsearch();
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        if (status != ExitCodes.OK) {
            exit(status);
        }
    }

    // Delegates to the main method of the Elasticsearch object
    static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
        return elasticsearch.main(args, terminal);
    }

  • 1. Install a SecurityManager whose checkPermission() allows everything.
  • 2. Register the logging error listener, LogConfigurator.registerErrorListener().
  • 3. Create the Elasticsearch object.
  • 4. Enter elasticsearch.main().



elasticsearch.main process

Notes:

  • The inheritance hierarchy of Elasticsearch is shown in the figure above.
  • elasticsearch.main() goes through the EnvironmentAwareCommand and Command classes.

    class Elasticsearch extends EnvironmentAwareCommand {

        static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
            return elasticsearch.main(args, terminal);
        }
    }

    public abstract class Command implements Closeable {

        public final int main(String[] args, Terminal terminal) throws Exception {
            if (addShutdownHook()) {
                shutdownHookThread = new Thread(() -> {
                    try {
                        this.close();
                    } catch (final IOException e) {
                        try (
                            StringWriter sw = new StringWriter();
                            PrintWriter pw = new PrintWriter(sw)) {
                            e.printStackTrace(pw);
                            terminal.println(sw.toString());
                        } catch (final IOException impossible) {
                        }
                    }
                });
                Runtime.getRuntime().addShutdownHook(shutdownHookThread);
            }

            beforeMain.run();

            try {
                mainWithoutErrorHandling(args, terminal);
            } catch (OptionException e) {
                return ExitCodes.USAGE;
            } catch (UserException e) {
                return e.exitCode;
            }
            return ExitCodes.OK;
        }

        void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
            final OptionSet options = parser.parse(args);

            if (options.has(helpOption)) {
                printHelp(terminal);
                return;
            }

            if (options.has(silentOption)) {
                terminal.setVerbosity(Terminal.Verbosity.SILENT);
            } else if (options.has(verboseOption)) {
                terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
            } else {
                terminal.setVerbosity(Terminal.Verbosity.NORMAL);
            }

            execute(terminal, options);
        }
    }

  • 5. elasticsearch.main() goes straight into Command.main().
  • 6. Command.main() registers a shutdown hook thread with Runtime. When the JVM shuts down, the hook calls close() on the command and prints the stack trace if close() throws an IOException.
  • 7. Command.mainWithoutErrorHandling() parses the command line, prints the help text or sets the terminal verbosity accordingly, then executes the command.
  • 8. Enter EnvironmentAwareCommand.execute().
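To make steps 6 and 7 more concrete, here is a minimal sketch of the same pattern in plain Java: a command registers a shutdown hook that closes it, and user errors are turned into an exit code instead of escaping as exceptions. All class names (SketchCommand, SketchUserException, EchoCommand) and the exit code values are my own assumptions for illustration; this is not the Elasticsearch code.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-in for a "user error" exception that carries an exit code.
class SketchUserException extends Exception {
    final int exitCode;

    SketchUserException(int exitCode, String message) {
        super(message);
        this.exitCode = exitCode;
    }
}

// Hypothetical, simplified command: run() reports user errors as exit codes.
abstract class SketchCommand implements Closeable {
    static final int OK = 0;     // assumed value, for illustration only
    static final int USAGE = 64; // assumed value, for illustration only

    final int run(String[] args) {
        // Close this command when the JVM shuts down and report close() failures.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }));
        try {
            execute(args);
        } catch (SketchUserException e) {
            System.err.println("ERROR: " + e.getMessage());
            return e.exitCode;
        }
        return OK;
    }

    protected abstract void execute(String[] args) throws SketchUserException;

    @Override
    public void close() throws IOException {
        // nothing to release in this sketch
    }
}

// Example command: prints its arguments, or fails with USAGE when there are none.
class EchoCommand extends SketchCommand {
    @Override
    protected void execute(String[] args) throws SketchUserException {
        if (args.length == 0) {
            throw new SketchUserException(USAGE, "at least one argument is required");
        }
        System.out.println(String.join(" ", args));
    }
}
```

The caller only ever looks at the returned status, which is what lets the static main() shown earlier decide whether to call exit(status).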

EnvironmentAwareCommand.execute process

    public abstract class EnvironmentAwareCommand extends Command {

        protected void execute(Terminal terminal, OptionSet options) throws Exception {
            final Map<String, String> settings = new HashMap<>();
            for (final KeyValuePair kvp : settingOption.values(options)) {
                if (kvp.value.isEmpty()) {
                    throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
                }
                if (settings.containsKey(kvp.key)) {
                    final String message = String.format(
                        Locale.ROOT,
                        "setting [%s] already set, saw [%s] and [%s]",
                        kvp.key,
                        settings.get(kvp.key),
                        kvp.value);
                    throw new UserException(ExitCodes.USAGE, message);
                }
                settings.put(kvp.key, kvp.value);
            }

            putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
            putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
            putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");

            execute(terminal, options, createEnv(terminal, settings));
        }

        protected Environment createEnv(final Terminal terminal, final Map<String, String> settings) throws UserException {
            final String esPathConf = System.getProperty("es.path.conf");
            if (esPathConf == null) {
                throw new UserException(ExitCodes.CONFIG, "the system property [es.path.conf] must be set");
            }
            return InternalSettingsPreparer.prepareEnvironment(Settings.EMPTY, terminal, settings, getConfigPath(esPathConf));
        }

        protected abstract void execute(Terminal terminal, OptionSet options, Environment env) throws Exception;
    }

  • 9. EnvironmentAwareCommand.execute() collects the settings given on the command line, rejecting empty values and duplicate keys. If path.data, path.home or path.logs is still missing, it is filled in from the system properties es.path.data, es.path.home and es.path.logs.
  • 10. createEnv() requires the system property es.path.conf and finally returns an Environment object, which holds the paths of directories such as plugins, bin, lib and modules.
  • 11. Enter Elasticsearch.execute().
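The merging in step 9 can be sketched in plain Java as follows. This is a simplified illustration, not the Elasticsearch implementation: the class SettingsSketch and its methods are hypothetical, the input is assumed to be "key=value" strings, and putIfMissing simply skips a property when the setting already exists, which is my simplification.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper illustrating how command-line settings and system
// properties can be merged into one map.
class SettingsSketch {

    // Collects "key=value" strings, rejecting empty values and duplicate keys.
    static Map<String, String> collect(String... keyValues) {
        final Map<String, String> settings = new HashMap<>();
        for (String kv : keyValues) {
            final int eq = kv.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("expected key=value but got [" + kv + "]");
            }
            final String key = kv.substring(0, eq);
            final String value = kv.substring(eq + 1);
            if (value.isEmpty()) {
                throw new IllegalArgumentException("setting [" + key + "] must not be empty");
            }
            if (settings.containsKey(key)) {
                throw new IllegalArgumentException(String.format(Locale.ROOT,
                    "setting [%s] already set, saw [%s] and [%s]", key, settings.get(key), value));
            }
            settings.put(key, value);
        }
        return settings;
    }

    // Copies a property into the settings only when the setting is absent.
    static void putIfMissing(Map<String, String> settings, String setting, String property, Properties props) {
        final String value = props.getProperty(property);
        if (value != null && !settings.containsKey(setting)) {
            settings.put(setting, value);
        }
    }
}
```

Passing the Properties object in explicitly, instead of reading System.getProperty directly, is only there to keep the sketch easy to try out.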



prepareEnvironment process

public class InternalSettingsPreparer { public static Environment prepareEnvironment(Settings input, Terminal terminal, Map<String, String> properties, Path configPath) { // just create enough settings to build the environment, to get the config dir Settings.Builder output = Settings.builder(); initializeSettings(output, input, properties); Environment environment = new Environment(output.build(), configPath); if (Files.exists(environment.configFile().resolve("elasticsearch.yaml"))) { throw new SettingsException("elasticsearch.yaml was deprecated in 5.5.0 and must be renamed to elasticsearch.yml"); } if (Files.exists(environment.configFile().resolve("elasticsearch.json"))) { throw new SettingsException("elasticsearch.json was deprecated in 5.5.0 and must be converted to elasticsearch.yml"); } output = Settings.builder(); // start with a fresh output Path path = environment.configFile().resolve("elasticsearch.yml"); if (Files.exists(path)) { try { output.loadFromPath(path); } catch (IOException e) { throw new SettingsException("Failed to load settings from " + path.toString(), e); } } // re-initialize settings now that the config file has been loaded initializeSettings(output, input, properties); finalizeSettings(output, terminal); environment = new Environment(output.build(), configPath); // we put back the path.logs so we can use it in the logging configuration file output.put(Environment.PATH_LOGS_SETTING.getKey(), environment.logsFile().toAbsolutePath().normalize().toString()); return new Environment(output.build(), configPath); } } public class Environment { public Environment(final Settings settings, final Path configPath) { final Path homeFile; if (PATH_HOME_SETTING.exists(settings)) { homeFile = PathUtils.get(PATH_HOME_SETTING.get(settings)).normalize(); } else { throw new IllegalStateException(PATH_HOME_SETTING.getKey() + " is not configured"); } if (configPath != null) { configFile = configPath.normalize(); } else { configFile = homeFile.resolve("config"); 
} pluginsFile = homeFile.resolve("plugins"); List<String> dataPaths = PATH_DATA_SETTING.get(settings); final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); if (DiscoveryNode.nodeRequiresLocalStorage(settings)) { if (dataPaths.isEmpty() == false) { dataFiles = new Path[dataPaths.size()]; dataWithClusterFiles = new Path[dataPaths.size()]; for (int i = 0; i < dataPaths.size(); i++) { dataFiles[i] = PathUtils.get(dataPaths.get(i)); dataWithClusterFiles[i] = dataFiles[i].resolve(clusterName.value()); } } else { dataFiles = new Path[]{homeFile.resolve("data")}; dataWithClusterFiles = new Path[]{homeFile.resolve("data").resolve(clusterName.value())}; } } else { if (dataPaths.isEmpty()) { dataFiles = dataWithClusterFiles = EMPTY_PATH_ARRAY; } else { final String paths = String.join(",", dataPaths); throw new IllegalStateException("node does not require local storage yet path.data is set to [" + paths + "]"); } } if (PATH_SHARED_DATA_SETTING.exists(settings)) { sharedDataFile = PathUtils.get(PATH_SHARED_DATA_SETTING.get(settings)).normalize(); } else { sharedDataFile = null; } List<String> repoPaths = PATH_REPO_SETTING.get(settings); if (repoPaths.isEmpty()) { repoFiles = EMPTY_PATH_ARRAY; } else { repoFiles = new Path[repoPaths.size()]; for (int i = 0; i < repoPaths.size(); i++) { repoFiles[i] = PathUtils.get(repoPaths.get(i)); } } // this is trappy, Setting#get(Settings) will get a fallback setting yet return false for Settings#exists(Settings) if (PATH_LOGS_SETTING.exists(settings)) { logsFile = PathUtils.get(PATH_LOGS_SETTING.get(settings)).normalize(); } else { logsFile = homeFile.resolve("logs"); } if (PIDFILE_SETTING.exists(settings)) { pidFile = PathUtils.get(PIDFILE_SETTING.get(settings)).normalize(); } else { pidFile = null; } binFile = homeFile.resolve("bin"); libFile = homeFile.resolve("lib"); modulesFile = homeFile.resolve("modules"); Settings.Builder finalSettings = Settings.builder().put(settings); 
finalSettings.put(PATH_HOME_SETTING.getKey(), homeFile); if (PATH_DATA_SETTING.exists(settings)) { finalSettings.putList(PATH_DATA_SETTING.getKey(), dataPaths); } finalSettings.put(PATH_LOGS_SETTING.getKey(), logsFile.toString()); this.settings = finalSettings.build(); } }
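  • 12. prepareEnvironment() loads elasticsearch.yml from the config directory (the deprecated elasticsearch.yaml and elasticsearch.json are rejected), and createEnv() finally returns an Environment object.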
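The directory resolution done by the Environment constructor follows one rule: use the configured path if there is one, otherwise fall back to a directory under the home path. A minimal sketch of that rule, assuming a hypothetical EnvSketch class that takes plain strings instead of a Settings object:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical, reduced version of the path resolution: only home, config,
// plugins and logs are handled here.
class EnvSketch {
    final Path homeFile;
    final Path configFile;
    final Path pluginsFile;
    final Path logsFile;

    // configPath and logsPath may be null; defaults under home are used then.
    EnvSketch(String home, String configPath, String logsPath) {
        if (home == null) {
            throw new IllegalStateException("path.home is not configured");
        }
        homeFile = Paths.get(home).normalize();
        configFile = configPath != null ? Paths.get(configPath).normalize() : homeFile.resolve("config");
        pluginsFile = homeFile.resolve("plugins");
        logsFile = logsPath != null ? Paths.get(logsPath).normalize() : homeFile.resolve("logs");
    }
}
```

For example, new EnvSketch("/opt/es", null, "/var/log/es") resolves the config directory under the home path while the logs directory comes from the explicit argument.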



elasticsearch.execute process

    class Elasticsearch extends EnvironmentAwareCommand {

        protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
            if (options.nonOptionArguments().isEmpty() == false) {
            }
            if (options.has(versionOption)) {
                terminal.println("Version: " + Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot())
                    + ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date()
                    + ", JVM: " + JvmInfo.jvmInfo().version());
                return;
            }

            final boolean daemonize = options.has(daemonizeOption);
            final Path pidFile = pidfileOption.value(options);
            final boolean quiet = options.has(quietOption);

            try {
                init(daemonize, pidFile, quiet, env);
            } catch (NodeValidationException e) {
            }
        }

        void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
            throws NodeValidationException, UserException {
            try {
                Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
            } catch (BootstrapException | RuntimeException e) {
            }
        }
    }

  • 13. Elasticsearch.execute() reads the values of daemonize, pidFile and quiet, and makes sure the configured temporary directory (temp) is a valid directory.
  • 14. Enter the Bootstrap initialization phase.


Bootstrap init process

    static void init(
            final boolean foreground,
            final Path pidFile,
            final boolean quiet,
            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
        BootstrapInfo.init();

        INSTANCE = new Bootstrap();

        final SecureSettings keystore = loadSecureSettings(initialEnv);
        final Environment environment = createEnvironment(foreground, pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
        try {
            LogConfigurator.configure(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        if (environment.pidFile() != null) {
            try {
                PidFile.create(environment.pidFile(), true);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }
        }

        final boolean closeStandardStreams = (foreground == false) || quiet;
        try {
            if (closeStandardStreams) {
                final Logger rootLogger = ESLoggerFactory.getRootLogger();
                final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                if (maybeConsoleAppender != null) {
                    Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                }
                closeSystOut();
            }

            checkLucene();

            Thread.setDefaultUncaughtExceptionHandler(
                new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));

            INSTANCE.setup(true, environment);

            try {
                IOUtils.close(keystore);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }

            INSTANCE.start();

            if (closeStandardStreams) {
                closeSysError();
            }
        } catch (NodeValidationException | RuntimeException e) {
            throw e;
        }
    }

  • 15. Create the Bootstrap object, INSTANCE = new Bootstrap().
  • 16. Set up the Bootstrap object, INSTANCE.setup(true, environment).
  • 17. Start the Bootstrap object, INSTANCE.start().


Bootstrap constructor

    Bootstrap() {
        keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                    // bail out
                }
            }
        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
        keepAliveThread.setDaemon(false);
        // keep this thread alive (non daemon thread) until we shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                keepAliveLatch.countDown();
            }
        });
    }

  • 18. The Bootstrap constructor creates a non-daemon (user) thread, keepAliveThread, that blocks on keepAliveLatch, and registers a Runtime shutdown hook that calls countDown() on the latch so the thread can exit at shutdown.
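The keep-alive mechanism in step 18 can be tried out in isolation. The sketch below reproduces it with only java.util.concurrent; the class KeepAliveSketch and its release() and awaitExit() methods are hypothetical names of mine, and release() is called directly here where Bootstrap would trigger it from a shutdown hook.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical reduction of the keep-alive pattern: a non-daemon thread
// parks on a latch and exits once the latch is counted down.
class KeepAliveSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    final Thread keepAliveThread;

    KeepAliveSketch() {
        keepAliveThread = new Thread(() -> {
            try {
                latch.await(); // block until shutdown is requested
            } catch (InterruptedException e) {
                // bail out
            }
        }, "sketch[keepAlive]");
        keepAliveThread.setDaemon(false); // a non-daemon thread keeps the JVM alive
    }

    void start() {
        keepAliveThread.start();
    }

    // Lets the keep-alive thread finish.
    void release() {
        latch.countDown();
    }

    // Waits up to the given time and reports whether the thread has exited.
    boolean awaitExit(long millis) {
        try {
            keepAliveThread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !keepAliveThread.isAlive();
    }
}
```

Without such a thread, a JVM whose remaining threads are all daemon threads would exit as soon as main() returns.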


Bootstrap setup process

    private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
        Settings settings = environment.settings();

        try {
            spawner.spawnNativePluginControllers(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }

        initializeNatives(
            environment.tmpFile(),
            BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
            BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
            BootstrapSettings.CTRLHANDLER_SETTING.get(settings));

        // initialize probes before the security manager is installed
        initializeProbes();

        if (addShutdownHook) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        IOUtils.close(node, spawner);
                        LoggerContext context = (LoggerContext) LogManager.getContext(false);
                        Configurator.shutdown(context);
                    } catch (IOException ex) {
                        throw new ElasticsearchException("failed to stop node", ex);
                    }
                }
            });
        }

        try {
            JarHell.checkJarHell();
        } catch (IOException | URISyntaxException e) {
        }

        IfConfig.logIfNecessary();

        try {
            Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
        } catch (IOException | NoSuchAlgorithmException e) {
        }

        node = new Node(environment) {
            @Override
            protected void validateNodeBeforeAcceptingRequests(
                final BootstrapContext context,
                final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
                BootstrapChecks.check(context, boundTransportAddress, checks);
            }
        };
    }

  • 19. Spawn the native controllers for plugins, spawner.spawnNativePluginControllers(environment). This tries to spawn a controller (native controller) daemon for a given module. The spawned process stays connected to this JVM through its stdin, stdout and stderr streams, but references to those streams are not available to code outside this package.
  • 20. Initialize native resources, initializeNatives().
  • 21. Use JarHell to check for duplicate jar files, JarHell.checkJarHell().
  • 22. Create the node, new Node(environment). This is the core step!


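Bootstrap start process

    private void start() throws NodeValidationException {
        node.start();
        keepAliveThread.start();
    }

  • 23. Start the node object, node.start().
  • 24. Start the keep-alive thread, keepAliveThread.start(). It is a non-daemon thread (setDaemon(false)), so it keeps the JVM alive until shutdown.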


Node creation process

 protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) { try { Settings tmpSettings = Settings.builder().put(environment.settings()) .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build(); try { nodeEnvironment = new NodeEnvironment(tmpSettings, environment); resourcesToClose.add(nodeEnvironment); } catch (IOException ex) { } final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings); Logger logger = Loggers.getLogger(Node.class, tmpSettings); final String nodeId = nodeEnvironment.nodeId(); tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId); final JvmInfo jvmInfo = JvmInfo.jvmInfo(); this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins); this.settings = pluginsService.updatedSettings(); localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId()); this.environment = new Environment(this.settings, environment.configFile()); final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings); final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0])); resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); DeprecationLogger.setThreadContext(threadPool.getThreadContext()); resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext())); final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings()); final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter()); for (final ExecutorBuilder<?> builder : threadPool.builders()) { additionalSettings.addAll(builder.getRegisteredSettings()); } client = new NodeClient(settings, threadPool); .................. }
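  • 25. Create a NodeEnvironment object that holds the node's environment information, such as the paths of the various data files.
  • 26. Read the JVM information.
  • 27. Create the PluginsService object; while it is being created, all modules and plugins are read and loaded.
  • 28. Create the final Environment object.
  • 29. Create the ThreadPool. Most of the objects created later provide their services through threads, and this thread pool manages those threads.
  • 30. Create the node client, NodeClient.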
 protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) { .................. final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool); final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class)); AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class)); final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter); scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings()); resourcesToClose.add(resourceWatcherService); final NetworkService networkService = new NetworkService( getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class))); List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool, ClusterModule.getClusterStateCustomSuppliers(clusterPlugins)); clusterService.addListener(scriptModule.getScriptService()); resourcesToClose.add(clusterService); final IngestService ingestService = new IngestService(settings, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state, clusterService.getClusterSettings(), client); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client, listener::onNewInfo); final UsageService usageService = new UsageService(settings); }
  • 31. Create the various service objects: ResourceWatcherService, NetworkService, ClusterService, IngestService, ClusterInfoService, UsageService, MonitorService, CircuitBreakerService, MetaStateService, IndicesService, MetaDataIndexUpgradeService, TemplateUpgradeService, TransportService, ResponseCollectorService, SearchTransportService, NodeService, SearchService, PersistentTasksClusterService. The purpose of each service can be roughly guessed from its name.
 protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) { .................. ModulesBuilder modules = new ModulesBuilder(); // plugin modules must be added here, before others or we can get crazy injection errors... for (Module pluginModule : pluginsService.createGuiceModules()) { modules.add(pluginModule); } final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService); ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService); modules.add(clusterModule); IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); modules.add(indicesModule); SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class)); CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(), settingsModule.getClusterSettings()); resourcesToClose.add(circuitBreakerService); modules.add(new GatewayModule()); ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(), settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService); modules.add(actionModule); final RestController restController = actionModule.getRestController(); final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class), threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, restController); Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class) .stream().map(Plugin::getCustomMetaDataUpgrader) .collect(Collectors.toList()); Collection<UnaryOperator<Map<String, 
IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream() .map(Plugin::getIndexTemplateMetaDataUpgrader) .collect(Collectors.toList()); Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class) .stream().map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList()); final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders); final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry,indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders); final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService, metaDataIndexUpgradeService, metaDataUpgrader); new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService); final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService,SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final Consumer<Binder> httpBind; final HttpServerTransport httpServerTransport; if (networkModule.isHttpEnabled()) { httpServerTransport = networkModule.getHttpServerTransportSupplier().get(); httpBind = b -> { b.bind(HttpServerTransport.class).toInstance(httpServerTransport); }; } else { httpBind = b -> { b.bind(HttpServerTransport.class).toProvider(Providers.of(null)); }; httpServerTransport = null; } final DiscoveryModule discoveryModule = new 
DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry, networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(), clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class), clusterModule.getAllocationService()); this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(), httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, searchTransportService);
  • 32. The ModulesBuilder collects the various modules: ScriptModule, AnalysisModule, SettingsModule, pluginModule, ClusterModule, IndicesModule, SearchModule, GatewayModule, RepositoriesModule, ActionModule, NetworkModule, DiscoveryModule.
 protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) { .................. modules.add(b -> { b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry); b.bind(PluginsService.class).toInstance(pluginsService); b.bind(Client.class).toInstance(client); b.bind(NodeClient.class).toInstance(client); b.bind(Environment.class).toInstance(this.environment); b.bind(ThreadPool.class).toInstance(threadPool); b.bind(NodeEnvironment.class).toInstance(nodeEnvironment); b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService); b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService); b.bind(BigArrays.class).toInstance(bigArrays); b.bind(ScriptService.class).toInstance(scriptModule.getScriptService()); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); b.bind(UsageService.class).toInstance(usageService); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader); b.bind(MetaStateService.class).toInstance(metaStateService); b.bind(IndicesService.class).toInstance(indicesService); b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), responseCollectorService)); b.bind(SearchTransportService.class).toInstance(searchTransportService); b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays, scriptModule.getScriptService())); b.bind(Transport.class).toInstance(transport); b.bind(TransportService.class).toInstance(transportService); b.bind(NetworkService.class).toInstance(networkService); b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService())); 
b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService); b.bind(ClusterInfoService.class).toInstance(clusterInfoService); b.bind(GatewayMetaState.class).toInstance(gatewayMetaState); b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery()); { RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings); b.bind(PeerRecoverySourceService.class).toInstance( new PeerRecoverySourceService(settings, transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class).toInstance( new PeerRecoveryTargetService(settings, threadPool, transportService, recoverySettings, clusterService)); } httpBind.accept(b); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); } ); injector = modules.createInjector(); clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class)); List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream() .filter(p -> p instanceof LifecycleComponent) .map(p -> (LifecycleComponent) p).collect(Collectors.toList()); pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream() .map(injector::getInstance).collect(Collectors.toList())); resourcesToClose.addAll(pluginLifecycleComponents); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}), () -> clusterService.localNode().getId()); if (NetworkModule.HTTP_ENABLED.get(settings)) { logger.debug("initializing HTTP handlers ..."); actionModule.initRestHandlers(() -> clusterService.state().nodes()); } success = true; } catch (IOException ex) { } finally { } }
  • 33. Almost all components in Elasticsearch are managed as modules. Elasticsearch wraps Guice and builds its modules through the ModulesBuilder class.


Node start process

 public Node start() throws NodeValidationException { if (!lifecycle.moveToStarted()) { return this; } Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings)); logger.info("starting ..."); pluginLifecycleComponents.forEach(LifecycleComponent::start); injector.getInstance(MappingUpdatedAction.class).setClient(client); injector.getInstance(IndicesService.class).start(); injector.getInstance(IndicesClusterStateService.class).start(); injector.getInstance(SnapshotsService.class).start(); injector.getInstance(SnapshotShardsService.class).start(); injector.getInstance(RoutingService.class).start(); injector.getInstance(SearchService.class).start(); nodeService.getMonitorService().start(); final ClusterService clusterService = injector.getInstance(ClusterService.class); final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class); nodeConnectionsService.start(); clusterService.setNodeConnectionsService(nodeConnectionsService); injector.getInstance(ResourceWatcherService.class).start(); injector.getInstance(GatewayService.class).start(); Discovery discovery = injector.getInstance(Discovery.class); clusterService.getMasterService().setClusterStatePublisher(discovery::publish); // Start the transport service now so the publish address will be added to the local disco node in ClusterService TransportService transportService = injector.getInstance(TransportService.class); transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class)); transportService.start(); assert localNodeFactory.getNode() != null; assert transportService.getLocalNode().equals(localNodeFactory.getNode()) : "transportService has a different local node than the factory provided"; final MetaData onDiskMetadata; try { // we load the global state here (the persistent part of the cluster state stored on disk) to // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on 
the recovered state. if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) { onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState(); } else { onDiskMetadata = MetaData.EMPTY_META_DATA; } assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null } catch (IOException e) { throw new UncheckedIOException(e); } validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(), pluginsService .filterPlugins(Plugin .class) .stream() .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList())); clusterService.addStateApplier(transportService.getTaskManager()); // start after transport service so the local disco is known discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService clusterService.start(); assert clusterService.localNode().equals(localNodeFactory.getNode()) : "clusterService has a different local node than the factory provided"; transportService.acceptIncomingRequests(); discovery.startInitialJoin(); // tribe nodes don't have a master so we shouldn't register an observer s final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings); if (initialStateTimeout.millis() > 0) { final ThreadPool thread = injector.getInstance(ThreadPool.class); ClusterState clusterState = clusterService.state(); ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext()); if (clusterState.nodes().getMasterNodeId() == null) { logger.debug("waiting to join the cluster. 
timeout [{}]", initialStateTimeout); final CountDownLatch latch = new CountDownLatch(1); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { latch.countDown(); } @Override public void onClusterServiceClose() { latch.countDown(); } @Override public void onTimeout(TimeValue timeout) { logger.warn("timed out while waiting for initial discovery state - timeout: {}", initialStateTimeout); latch.countDown(); } }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout); try { latch.await(); } catch (InterruptedException e) { throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state"); } } } if (NetworkModule.HTTP_ENABLED.get(settings)) { injector.getInstance(HttpServerTransport.class).start(); } if (WRITE_PORTS_FILE_SETTING.get(settings)) { if (NetworkModule.HTTP_ENABLED.get(settings)) { HttpServerTransport http = injector.getInstance(HttpServerTransport.class); writePortsFile("http", http.boundAddress()); } TransportService transport = injector.getInstance(TransportService.class); writePortsFile("transport", transport.boundAddress()); } pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted); return this; }
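  • 34. Obtain an instance of each class through the injector and call its start() method (which in practice enters each class's doStart() method): LifecycleComponent, IndicesService, IndicesClusterStateService, SnapshotsService, SnapshotShardsService, RoutingService, SearchService, MonitorService, NodeConnectionsService, ResourceWatcherService, GatewayService, Discovery, TransportService.
  • 35. What each service does:
    IndicesService: index management.
    IndicesClusterStateService: applies cluster state changes to the indices and shards held on the local node.
    SnapshotsService: responsible for creating snapshots.
    SnapshotShardsService: runs on data and master nodes and controls the shards of the snapshots currently running on those nodes.
    RoutingService: listens to the cluster state; when it receives a cluster-changed event it validates the cluster state, and the routing table may be updated.
    SearchService: the search service.
    MonitorService: monitoring.
    NodeConnectionsService: connects to nodes once they are added to the cluster state and disconnects when they are removed. It also periodically checks that all connections are still open and restores them if needed. Note that this component is not responsible for removing nodes from the cluster when they disconnect or stop responding to pings; that is done by NodesFaultDetection. Master fault detection is done by MasterFaultDetection.
    ResourceWatcherService: a generic resource watcher service.
    GatewayService: the gateway.
  • 36. Cluster discovery, monitoring and so on; then start HttpServerTransport and bind the service port.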
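
To make step 34 concrete, here is a minimal sketch of the start() / doStart() template pattern. The class names (LifecycleSketch, Component, NamedService) are hypothetical and heavily simplified; this is not the Elasticsearch lifecycle implementation, only an illustration of why calling start() ends up in each service's doStart().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the start()/doStart() template
// pattern; it is NOT the Elasticsearch lifecycle component class.
public class LifecycleSketch {

    enum State { INITIALIZED, STARTED }

    abstract static class Component {
        private State state = State.INITIALIZED;

        // Public entry point: guards the state transition, then delegates.
        public final synchronized void start() {
            if (state == State.STARTED) {
                return; // starting twice is a no-op in this sketch
            }
            doStart();
            state = State.STARTED;
        }

        public synchronized State state() {
            return state;
        }

        // Each service puts its real start-up work here.
        protected abstract void doStart();
    }

    // A service that only records that its doStart() ran.
    static class NamedService extends Component {
        private final String name;
        private final List<String> log;

        NamedService(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        protected void doStart() {
            log.add(name);
        }
    }

    // Starts services in the given order; returns the order in which doStart() ran.
    static List<String> startAll(String... names) {
        List<String> log = new ArrayList<>();
        for (String name : names) {
            Component component = new NamedService(name, log);
            component.start();
            component.start(); // second call must not run doStart() again
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(startAll("IndicesService", "SearchService", "GatewayService"));
    }
}
```

The point of the pattern is that the ordering and the state check live in one place (start()), while each service only supplies its own doStart() body.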


Thread pool

    public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
        super(settings);

        assert Node.NODE_NAME_SETTING.exists(settings);

        final Map<String, ExecutorBuilder> builders = new HashMap<>();
        final int availableProcessors = EsExecutors.numberOfProcessors(settings);
        final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
        final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
        final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
        builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
        builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
        builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200));
        builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
        builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
            Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
        builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
        builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));
        builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
        builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
        builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
        builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
        builders.put(Names.FETCH_SHARD_STARTED,
            new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
        builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
        builders.put(Names.FETCH_SHARD_STORE,
            new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
        for (final ExecutorBuilder<?> builder : customBuilders) {
            if (builders.containsKey(builder.name())) {
                throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
            }
            builders.put(builder.name(), builder);
        }
        this.builders = Collections.unmodifiableMap(builders);

        threadContext = new ThreadContext(settings);

        final Map<String, ExecutorHolder> executors = new HashMap<>();
        for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
            final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
            final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
            if (executors.containsKey(executorHolder.info.getName())) {
            }
            logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
            executors.put(entry.getKey(), executorHolder);
        }

        executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
        this.executors = unmodifiableMap(executors);

        this.scheduler = Scheduler.initScheduler(settings);
        TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
        this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
        this.cachedTimeThread.start();
    }

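Thread pool types (ThreadPoolType)

fixed: a fixed thread pool has a fixed number of threads to handle requests; when no thread is idle, requests wait in a queue. The queue_size parameter controls how many requests may be queued while no thread is idle.
fixed_auto_queue_size: this type is experimental and may be changed or removed, so it is not covered here.
scaling: a scaling thread pool has a dynamic number of threads that varies between the core and max parameters. The keep_alive parameter controls how long an idle thread may stay in the pool.
direct: this executor does not support shutdown, so once it is in use it stays alive. It runs each task on the calling thread; the constructor above registers it under Names.SAME.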
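
The three main types can be approximated with plain java.util.concurrent classes. The sketch below is only an analogy under simplifying assumptions: Elasticsearch uses its own executor and queue implementations, and the class and method names here (PoolTypesSketch, fixed, scaling, and so on) are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// JDK-only analogy for the fixed / scaling / direct pool types.
public class PoolTypesSketch {

    // fixed: core == max threads and a bounded queue; overflow is rejected.
    static ThreadPoolExecutor fixed(int size, int queueSize) {
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueSize));
    }

    // scaling: grows from core to max; idle extra threads die after keepAlive.
    static ThreadPoolExecutor scaling(int core, int max, long keepAliveSeconds) {
        return new ThreadPoolExecutor(core, max, keepAliveSeconds, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }

    // A task that parks its thread until the latch is released.
    private static Runnable blockOn(CountDownLatch latch) {
        return () -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    private static void shutdownQuietly(ThreadPoolExecutor pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // One thread plus one queue slot: the third task has nowhere to go.
    static boolean fixedRejectsWhenFull() {
        ThreadPoolExecutor pool = fixed(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(blockOn(release)); // occupies the single thread
            pool.execute(blockOn(release)); // waits in the queue
            pool.execute(blockOn(release)); // no free thread, no queue slot
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            shutdownQuietly(pool);
        }
    }

    // Submits blocking tasks and reports how many threads the pool created.
    static int scalingPoolSizeUnderLoad(int core, int max, int tasks) {
        ThreadPoolExecutor pool = scaling(core, max, 30);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(blockOn(release));
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            shutdownQuietly(pool);
        }
    }

    // direct: no pool at all, the task runs on the submitting thread.
    static boolean directRunsOnCaller() {
        Executor direct = Runnable::run;
        Thread[] ranOn = new Thread[1];
        direct.execute(() -> ranOn[0] = Thread.currentThread());
        return ranOn[0] == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("fixed rejects when full: " + fixedRejectsWhenFull());
        System.out.println("scaling pool size under load: " + scalingPoolSizeUnderLoad(1, 3, 3));
        System.out.println("direct runs on caller: " + directRunsOnCaller());
    }
}
```

The direct case also shows why such an executor has nothing to shut down: it owns no threads.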

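Some important thread pools

generic: for generic requests (for example, background node discovery). Thread pool type is scaling.
index: for index/delete requests. Thread pool type is fixed, size equals the number of processors, queue size is 200, and the maximum size is 1 + number of processors.
search: for count/search/suggest requests. Size is int((number of processors * 3) / 2) + 1, with an initial queue size of 1000. The constructor above builds it with AutoQueueAdjustingExecutorBuilder, so its type is fixed_auto_queue_size rather than plain fixed.
get: for get requests. Thread pool type is fixed, size equals the number of processors, queue size is 1000.
analyze: for analyze requests. Thread pool type is fixed, size is 1, queue size is 16. This pool is not registered in the 6.1.0 constructor above.
write: for single-document index/delete/update requests and bulk requests. Thread pool type is fixed, size equals the number of processors, queue size is 200, and the maximum size is 1 + number of processors. This pool is not registered in the 6.1.0 constructor above, which registers bulk with the same size and queue size instead.
snapshot: for snapshot/restore requests. Thread pool type is scaling, keep-alive is 5 minutes, and the maximum number of threads is min(5, (number of processors)/2).
warmer: for segment warm-up requests. Thread pool type is scaling, keep-alive is 5 minutes, and the maximum number of threads is min(5, (number of processors)/2).
refresh: for refresh requests. Thread pool type is scaling, idle keep-alive is 5 minutes, and the maximum number of threads is min(10, (number of processors)/2).
listener: mainly used by the Java client to execute actions when listener threaded is set to true. The maximum number of threads is min(10, (number of processors)/2). It is usually described as a scaling pool, but the constructor above registers it with a FixedExecutorBuilder (size halfProcMaxAt10, queue size -1).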
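
The sizing rules above are simple integer arithmetic, sketched below. The formulas follow the text and the constructor (4 * processors bounded by 128 and 512, int((processors * 3) / 2) + 1, min(cap, processors / 2)). Two points are assumptions of this sketch: that boundedBy clamps a value into [min, max], and that the halved values have a lower bound of 1; the real helper methods may round differently.

```java
// Sketch of the pool sizing arithmetic; names are illustrative only.
public class PoolSizingSketch {

    // Assumed semantics: clamp value into the range [min, max].
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    // generic pool maximum: 4 * processors, kept between 128 and 512.
    static int genericMax(int processors) {
        return boundedBy(4 * processors, 128, 512);
    }

    // search pool size: int((processors * 3) / 2) + 1.
    static int searchSize(int processors) {
        return (processors * 3) / 2 + 1;
    }

    // min(cap, processors / 2), with an assumed floor of one thread.
    static int halfProcessorsCapped(int processors, int cap) {
        return boundedBy(processors / 2, 1, cap);
    }

    public static void main(String[] args) {
        for (int processors : new int[] {1, 8, 32, 200}) {
            System.out.printf("processors=%d generic.max=%d search=%d half(max5)=%d half(max10)=%d%n",
                    processors,
                    genericMax(processors),
                    searchSize(processors),
                    halfProcessorsCapped(processors, 5),
                    halfProcessorsCapped(processors, 10));
        }
    }
}
```

With 8 processors, for example, this gives a generic maximum of 128, a search size of 13, and 4 threads for both the capped-at-5 and capped-at-10 pools.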


PluginsService (plugin service)

    public static final String ES_PLUGIN_PROPERTIES = "plugin-descriptor.properties";
    public static final String ES_PLUGIN_POLICY = "plugin-security.policy";

    public static PluginInfo readFromProperties(final Path path) throws IOException {
        final Path descriptor = path.resolve(ES_PLUGIN_PROPERTIES);
        final Properties props = new Properties();
        try (InputStream stream = Files.newInputStream(descriptor)) {
            props.load(stream);
        }
        final String name = props.getProperty("name");
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException(
                "property [name] is missing in [" + descriptor + "]");
        }
        final String description = props.getProperty("description");
        if (description == null) {
            throw new IllegalArgumentException(
                "property [description] is missing for plugin [" + name + "]");
        }
        final String version = props.getProperty("version");
        if (version == null) {
            throw new IllegalArgumentException(
                "property [version] is missing for plugin [" + name + "]");
        }

        final String esVersionString = props.getProperty("elasticsearch.version");
        if (esVersionString == null) {
            throw new IllegalArgumentException(
                "property [elasticsearch.version] is missing for plugin [" + name + "]");
        }
        final Version esVersion = Version.fromString(esVersionString);
        if (esVersion.equals(Version.CURRENT) == false) {
            final String message = String.format(
                Locale.ROOT,
                "plugin [%s] is incompatible with version [%s]; was designed for version [%s]",
                name,
                Version.CURRENT.toString(),
                esVersionString);
            throw new IllegalArgumentException(message);
        }
        final String javaVersionString = props.getProperty("java.version");
        if (javaVersionString == null) {
            throw new IllegalArgumentException(
                "property [java.version] is missing for plugin [" + name + "]");
        }
        JarHell.checkVersionFormat(javaVersionString);
        JarHell.checkJavaVersion(name, javaVersionString);
        final String classname = props.getProperty("classname");
        if (classname == null) {
            throw new IllegalArgumentException(
                "property [classname] is missing for plugin [" + name + "]");
        }

        final String hasNativeControllerValue = props.getProperty("has.native.controller");
        final boolean hasNativeController;
        if (hasNativeControllerValue == null) {
            hasNativeController = false;
        } else {
            switch (hasNativeControllerValue) {
                case "true":
                    hasNativeController = true;
                    break;
                case "false":
                    hasNativeController = false;
                    break;
                default:
                    final String message = String.format(
                        Locale.ROOT,
                        "property [%s] must be [%s], [%s], or unspecified but was [%s]",
                        "has_native_controller",
                        "true",
                        "false",
                        hasNativeControllerValue);
                    throw new IllegalArgumentException(message);
            }
        }

        final String requiresKeystoreValue = props.getProperty("requires.keystore", "false");
        final boolean requiresKeystore;
        try {
            requiresKeystore = Booleans.parseBoolean(requiresKeystoreValue);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("property [requires.keystore] must be [true] or [false]," +
                " but was [" + requiresKeystoreValue + "]", e);
        }

        return new PluginInfo(name, description, version, classname, hasNativeController, requiresKeystore);
    }
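  • Read the plugin's descriptor file plugin-descriptor.properties and load its contents into a Properties object.
  • Validate the name, description, version, elasticsearch.version, java.version, classname, has.native.controller and requires.keystore entries in turn; a missing or malformed value throws an IllegalArgumentException. (extended.plugins is not read by the 6.1.0 code above.)
  • Build a PluginInfo object from the entries and return it.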
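
The read-then-validate flow in these steps can be tried out with the JDK alone. The following is a simplified sketch: DescriptorSketch and its methods are hypothetical names, the descriptor is parsed from a string instead of a file, and the version compatibility and JarHell checks from the real method are left out.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Simplified sketch of reading and validating a plugin descriptor.
public class DescriptorSketch {

    // Keys that the listing above treats as mandatory.
    static final String[] REQUIRED = {
        "name", "description", "version", "elasticsearch.version", "java.version", "classname"
    };

    // Parses descriptor text in java.util.Properties format.
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // Returns the first required key that is absent, or null if all are present.
    static String firstMissing(Properties props) {
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                return key;
            }
        }
        return null;
    }

    // Accepts only "true" or "false"; an absent value falls back to the default.
    static boolean strictBoolean(String key, String value, boolean defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        switch (value) {
            case "true":
                return true;
            case "false":
                return false;
            default:
                throw new IllegalArgumentException(
                    "property [" + key + "] must be [true] or [false] but was [" + value + "]");
        }
    }

    public static void main(String[] args) throws IOException {
        Properties props = parse("name=demo\nversion=1.0\nhas.native.controller=false\n");
        System.out.println("first missing key: " + firstMissing(props));
        System.out.println("has.native.controller: "
            + strictBoolean("has.native.controller", props.getProperty("has.native.controller"), false));
    }
}
```

Checking the keys one by one, as the original method does, lets the error message name the exact property that is missing.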


References

Elasticsearch 6.3.2 启动过程 (Elasticsearch 6.3.2 startup process)

Original link: https://yq.aliyun.com/articles/675247