首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

【Java技术探索】深入学习JIT编译器实现机制(原理篇)

前提概要 解释器 Java程序最初是通过解释器(Interpreter)进行解释执行的,当虚拟机发现某个方法或代码块的运行特别频繁的时候,就会把这些代码认定为“热点代码”(hotspot code)。正因为如此,我们的hotspot的虚拟机就是因此而得名。 解释器优点 (占用空间较少)解释执行占用更小的内存空间。 (启动和首次执行速度较快)当程序需要迅速启动的时候,解释器可以首先发挥作用,省去了编译的时间,立即执行。 (提高动态性和移植性)当处于程序的动态效果下,如果预先编译好所有相关的静态本地代码后,就无法实现动态化扩展,以及提高移植到其他计算机平台架构下的能力 编译器 为了提高热点代码的执行效率,在运行时,即时编译器(Just In Time Compiler,下文称 JIT编译器 )会把这些代码编译成与本地平台相关的机器码,并进行各种层次的优化。 编译器优点 (提高运行速度)在程序运行时,随着时间的推移,编译器逐渐发挥作用,把越来越多的代码编译成本地代码之后,可以获得更高的执行效率。 (逆转优化)同时,当编译器进行的激进优化失败的时候,还可以进行逆优化来恢复到解释执行的状态。 因此,整个虚拟机执行架构中,解释器与编译器经常配合工作,如下图所示。 解释器与编译器并存的架构(流程) 如果Java程序需要迅速启动和执行时,或者只是执行一次,解释器可首先发挥作用,省去编译时间,立即执行程序运行后,随着时间推移,JIT编译器逐渐发挥作用,把越来越多的代码编译成本地代码后,可获取更高执行效率。 程序运行环境中内存资源限制较大(如部分嵌入式系统中),可使用解释执行节约内存,反之可使用JIT编译执行提升效率 解释器还可作为JIT编译器激进优化时的一个“逃生门”,让编译器根据概率选择一些大多数时候都能提升运行速度的优化手段,当激进优化的假设不成立时可通过逆优化(Deoptimization)退回到解释状态继续执行 故,在整个虚拟机执行架构中解释器与编译器经常配合工作 Xint设置:用户可以使用参数 -Xint 强制虚拟机运行于 “解释模式”(Interpreted Mode),这时候编译器完全不介入工作。 -Xcomp设置:强制虚拟机运行于 “编译模式”(Compiled Mode),这时候将优先采用编译方式执行,但是解释器仍然要在编译无法进行的情况下接入执行过程。 -Xmixed设置:这种配合使用的方式称为“混合模式”(Mixed Mode), 通过虚拟机 -version 命令可以查看当前默认的运行模式。 即时编译器(JIT编译器) JIT编译器不是虚拟机的必需部分,但JIT编译器编译性能的好坏、代码优化程度的高低是衡量一款商用虚拟机优秀与否的最关键的指标之一,也是虚拟机中最核心且最能体现虚拟机技术水平的部分。 被编译对象和触发条件 在运行过程中会被即时编译的“热点代码”有两类,即: 编译的目标对象 被多次调用的方法 编译器会将整个方法作为编译对象,这也是标准的JIT 编译方式 被多次执行的循环体 由循环体出发的,但是编译器依然会以整个方法作为编译对象,因为发生在方法执行过程中,称为栈上替换。 判断热点代码 「判断一段代码是否是热点代码,是不是需要出发即时编译」,这样的行为称为热点探测(Hot Spot Detection),探测算法有两种,分别为。 基于采样的热点探测(Sample Based Hot Spot Detection) 虚拟机会周期的对各个线程栈顶进行检查,如果某些方法经常出现在栈顶,这个方法就是“热点方法”。 优点:实现简单、高效,很容易获取方法调用关系。 缺点:很难确认方法的reduce(衰减),容易受到线程阻塞或其他外因扰乱。 基于计数器的热点探测(Counter Based Hot Spot Detection) 为每个方法(甚至是代码块)建立计数器,执行次数超过阈值就认为是“热点方法”。 优点:统计结果精确严谨。 缺点:实现麻烦,不能直接获取方法的调用关系。 HotSpot使用的是第二种-基于技术其的热点探测,并且有两类计数器: 方法调用计数器(Invocation Counter ) 回边计数器(Back Edge Counter ) 两个即时编译器 从上面的解释器和编译器的协同合作架构图中,应该可以了解到,JVM虚拟机实现了两个不同的JIT编译器,分别称为 Client Compiler和 Server Compiler ,或者简称为 C1 编译器和 C2 编译器。 热点触发的阈值 这两个计数器都有一个确定的阈值,超过后便会触发JIT编译,具体细节和内容下面会详细讲述。 上面提到了一下两种热点探测的计数器: 方法调用计数器(Invocation Counter ) 首先是方法调用计数器: Client模式下默认阈值是1500 次。 Server 模式下是 10000次。 这个阈值可以通过 -XX:CompileThreshold 来人为设定。 如果不做任何设置,方法调用计数器统计的并不是方法被调用的绝对次数,而是一个相对的执行频率,即一段时间之内的方法被调用的次数。(可以理解为滑动窗口)。 当超过一定的时间限度,如果方法的调用次数仍然不足以让它提交给即时编译器编译,那么这个方法的调用计数器就会被减少一半,这个过程称为方法调用计数器热度的衰减(Counter Decay),而这段时间就成为此方法的统计的半衰期( Counter Half Life Time)。 进行热度衰减的动作是在虚拟机进行垃圾收集时顺便进行的,可以使用虚拟机参数 -XX:CounterHalfLifeTime 参数设置半衰周期的时间 (时间窗口秒),单位是秒。整个 JIT 编译的交互过程如下图。 回边计数器(Back Edge Counter ) 作用是统计一个方法中循环体代码执行的次数,在字节码中遇到控制流向后跳转的指令称为“回边”( Back Edge )。 显然,建立回边计数器统计的目的就是为了触发 OSR 编译。关于这个计数器的阈值, HotSpot 提供了 -XX:BackEdgeThreshold 供用户设置。 但是当前的虚拟机实际上使用了 -XX:OnStackReplacePercentage 来简介调整阈值,计算公式如下: Client模式, 公式为方法调用计数器阈值(CompileThreshold)X OSR 比率(OnStackReplacePercentage)/100 。其中OSR比率默认为933,那么,回边计数器的阈值为13995。 Server模式,公式为方法调用计数器阈值(Compile Threashold)X (OSR (OnStackReplacePercentage)- 解释器监控比率 (InterpreterProfilePercent))/100 其中onStackReplacePercentage 默认值为 140,InterpreterProfilePercentage 默认值为 33,如果都取默认值,那么 Server 模式虚拟机回边计数器阈值为 10700 。 编译过程 默认情况下,无论是方法调用产生的即时编译请求,还是OSR请求,虚拟机在代码编译器还未完成之前,都仍然将按照解释方式继续执行,而编译动作则在后台的编译线程中进行。 用户可以通过参数 -XX:-BackgroundCompilation来禁止后台编译,这样,一旦达到 JIT 的编译条件,执行线程向虚拟机提交便已请求之后便会一直等待,直到编译过程完成后再开始执行编译器输出的本地代码。 虚拟机运行模式 目前的HotSpot编译器默认的是解释器和其中一个即时编译器配合的方式工作,具体是哪一个编译器,取决于虚拟机运行的模式,HotSpot虚拟机会根据自身版本与计算机的硬件性能自动选择运行模式,用户也可以使用 -client 和 -server 参数强制指定虚拟机运行在 Client 模式或者 Server 模式。 Client Compiler(了解即可) : 它是一个简单快速的三段式编译器,主要关注点在于局部的优化,放弃了许多耗时较长的全局优化手段。 第一阶段,一个平台独立的前端将字节码构造成一种高级中间代码表示(High-Level Intermediate Representaion , HIR)。在此之前,编译器会在字节码上完成一部分基础优化,如 方法内联,常量传播等优化。 第二阶段,一个平台相关的后端从 HIR 中产生低级中间代码表示(Low-Level Intermediate Representation ,LIR),而在此之前会在 HIR 上完成另外一些优化,如空值检查消除,范围检查消除等,让HIR 更为高效。 第三阶段,在平台相关的后端使用线性扫描算法(Linear Scan Register Allocation)在 LIR 上分配寄存器,做窥孔(Peephole)优化,然后产生机器码。 Server Compiler(了解即可): 专门面向服务端典型应用并为服务端性能配置特别调整过的编译器也是一个充分优化过的高级编译器,几乎能达到GNU C++编译器使用-02参数时的优化强度会执行所有经典的优化动作。 无用代码消除(Dead Code Elimination)、 循环展开(LoopcUnrolling)、 循环表达式外提(Loop Expression Hoisting)、 消除公共子表达式(Common Subexpression Elimination)、 常量传播(Constant Propagation)、 基本块重排序(Basic Block Reordering)等 还会实施一些与Java语言特性密切相关的优化技术,如 范围检查消除(Range Check Elimination)、 空值检查消除(Null Check Elimination)等 还可能根据解释器或Client Compiler提供的性能监控信息,进行一些不稳定的激进优化,如 守护内联(Guarded Inlining)、 分支频率预测(Branch Frequency Prediction)等 Server Compiler的寄存器分配器是一个全局图着色分配器,它可充分利用某些处理器架构(如RISC)上的大寄存器集合 编译速度远超传统静态优化编译器,相对Client Compiler代码质量有所提高,可减少本地代码执行时间,从而抵消额外的编译时间开销 如何从外部观察即时编译器的编译过程和编译结果? -XX:+PrintCompilation 在即时编译时,打印被编译成本地代码的方法名称 -XX:+PrintInlining 在即时编译时,输出方法内联信息 -XX:+PrintAssembly 在即时编译时,打印被编译方法的汇编代码,虚拟机需安装反汇编适配器HSDIS插件,Product版虚拟机需加入参数-XX:+UnlockDiagnosticVMOptions打开虚拟机诊断模式 -XX:+PrintOptoAssembly 用于Server VM,输出比较接近最终结果的中间代码表示,不需HSDIS插件支持 -XX:+PrintLIR 用于Client VM,输出比较接近最终结果的中间代码表示,不需HSDIS插件支持 -XX:+PrintCFGToFile 用于Client Compiler,将编译过程中各阶段数据(如,字节码、HIR生成、LIR生成、寄存器分配过程、本地代码生成等)输出到文件中 -XX:PrintIdealGraphFile 用于Server Compiler,将编译过程中各阶段数据(如,字节码、HIR生成、LIR生成、寄存器分配过程、本地代码生成等)输出到文件中 注,要输出CFG或IdealGraph文件,需Debug或FastDebug版虚拟机支持,Product版的虚拟机无法输出这些文件

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习十二(物理计划的执行)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。 上一章介绍了一个SQL是怎样从字符串转换到物理执行计划的,详情见: https://my.oschina.net/u/3374539/blog/5035628 这一章主要记录一下物理计划是怎样执行的。 在上一篇文章的末尾,我们展示了物理计划之中存储的数据,这些数据代表了当前整个数据库中,能够与用户输入的查询表相关联的所有数据。 对于一般数据库来讲,在物理计划中更应该是指向索引相关的信息,举例来说:select * from table1 ,在物理计划里,应该是要拿到table1的表描述、存储数据的文件路径、文件大小、等等,而不是拿到真实数据。在文章最末尾中,有一段省略的数据,为什么会出现数据呢?其实这是数据库设计的缓存,缓存的数据本来就没有落到磁盘上,所以直接在物理计划中也会持有RBChunk和MBChunk的数据引用。 对于一个过滤而言,会在物理计划中产生对应的信息,展示如下: select * from myMeasurement where fieldKey like 'value1'; input: FilterExec { predicate: BinaryExpr { left: Column { name: "fieldKey" }, op: Like, right: Literal { value: Utf8("value1") } } 接下来看物理计划的执行代码: pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> { match plan.output_partitioning().partition_count() { 0 => Ok(vec![]), //单一块的时候直接取出数据 1 => { let it = plan.execute(0).await?; common::collect(it).await } //多个数据块的时候就需要进行合并数据 _ => { let plan = MergeExec::new(plan.clone()); assert_eq!(1, plan.output_partitioning().partition_count()); //这里分为了两步execute 和 collect common::collect(plan.execute(0).await?).await } } } 接下来看plan.execute方法: async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { 。。。省略 tokio::spawn(async move { //这里的input就代表了上面展示的filter的input或者是数据的input let mut stream = match input.execute(part_i).await { Err(e) => { let arrow_error = ArrowError::ExternalError(Box::new(e)); sender.send(Err(arrow_error)).await.ok(); return; } Ok(stream) => stream, }; //计划执行完成之后返回一个stream,这里就是一直next获取完 while let Some(item) = stream.next().await { sender.send(item).await.ok(); } }); 。。。省略 } 上面的input代表了以下这么多东西: 上面展示的为datafusion框架里的Plan,也就是通用sql都需要实现的功能,下面是iox项目中实现的Plan是完成数据获取的。 Plan之间的关系是嵌套的,想象一下上一章的大图,比如coalesceBatchesExec里可能还会包含filter,主要就是描述整个sql语句中都出现了什么。所有出现的plan就会对数据进行一次全面的过滤。 姑且不看过滤的细节,只看获取数据的部分(ExecutionPlan for IOxReadFilterNode)。 async fn execute( &self, partition: usize, ) -> datafusion::error::Result<SendableRecordBatchStream> { //因为在前面物理计划中得到了所有列,这里拿出列的名字 let fields = self.schema.fields(); let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>(); //多个分区的时候可以根据分区号拿出chunk信息 let ChunkInfo { chunk, chunk_table_schema, } = &self.chunk_and_infos[partition]; //过滤出来列名字对应的arrow的filed,这里就存在不对应的问题,假如用户输入了ABC,但是chunk_table_schema中并不存在,这里就会是一个空 let selection_cols = restrict_selection(selection_cols, &chunk_table_schema); let selection = Selection::Some(&selection_cols); //使用predicate过滤一次,但是我调试的时候一直是空的,也就是查询出所有数据。 let stream = chunk .read_filter(&self.table_name, &self.predicate, selection) .map_err(|e| { DataFusionError::Execution(format!( "Error creating scan for table {} chunk {}: {}", self.table_name, chunk.id(), e )) })?; //这里使用SchemaAdapterStream的结构来填充空值列 let adapter = SchemaAdapterStream::try_new(stream, Arc::clone(&self.schema)) .map_err(|e| DataFusionError::Internal(e.to_string()))?; Ok(Box::pin(adapter)) } 这个SchemaAdapterStream在代码中给了一个特别形象的描述: /// /// ┌────────────────┐ ┌─────────────────────────┐ /// │ ┌─────┐┌─────┐ │ │ ┌─────┐┌──────┐┌─────┐ │ /// │ │ A ││ C │ │ │ │ A ││ B ││ C │ │ /// │ │ - ││ - │ │ │ │ - ││ - ││ - │ │ /// ┌──────────────┐ │ │ 1 ││ 10 │ │ ┌──────────────┐ │ │ 1 ││ NULL ││ 10 │ │ /// │ Input │ │ │ 2 ││ 20 │ │ │ Adapter │ │ │ 2 ││ NULL ││ 20 │ │ /// │ Stream ├────▶ │ │ 3 ││ 30 │ │────▶│ Stream ├───▶│ │ 3 ││ NULL ││ 30 │ │ /// └──────────────┘ │ │ 4 ││ 40 │ │ └──────────────┘ │ │ 4 ││ NULL ││ 40 │ │ /// │ └─────┘└─────┘ │ │ └─────┘└──────┘└─────┘ │ /// │ │ │ │ /// │ Record Batch │ │ Record Batch │ /// └────────────────┘ └─────────────────────────┘ /// 接下来看如何实现数据查找的: fn read_filter( &self, table_name: &str, predicate: &Predicate, selection: Selection<'_>, ) -> Result<SendableRecordBatchStream, Self::Error> { //chunk存在变体,这里就是先判断是什么chunk,有三种MB,RB,ParquetFile match self { //还是在写入阶段的buffer,暂时不支持查询条件 Self::MutableBuffer { chunk, .. } => { if !predicate.is_empty() { return InternalPredicateNotSupported { predicate: predicate.clone(), } .fail(); } let batch = chunk .read_filter(table_name, selection) .context(MutableBufferChunk)?; Ok(Box::pin(MemoryStream::new(vec![batch]))) } //不可写阶段的buffer,对数据进行过滤 Self::ReadBuffer { chunk, .. } => { let rb_predicate = to_read_buffer_predicate(&predicate).context(PredicateConversion)?; //读取数据并过滤 let read_results = chunk .read_filter(table_name, rb_predicate, selection) .context(ReadBufferChunkError { chunk_id: chunk.id(), })?; //读取schema信息并过滤 let schema = chunk .read_filter_table_schema(table_name, selection) .context(ReadBufferChunkError { chunk_id: chunk.id(), })?; //ReadFilterResultsStream是对不同的chunk类型实现的读取接口 Ok(Box::pin(ReadFilterResultsStream::new( read_results, schema.into(), ))) } //Parquet同理 Self::ParquetFile { chunk, .. } => chunk .read_filter(table_name, predicate, selection) .context(ParquetFileChunkError { chunk_id: chunk.id(), }), } } 数据到了这里就会按照你选择的表名、列名,将数据全部查询出来了。在代码中的predicate,一直是空的,暂时不确定是如何填充的,后面再看。 数据从这里全部查询出来之后,会返回给datafusion框架,继续按照开头写到的过滤器进行过滤,就是遍历一遍数据判断大于、小于或者like等等。 好了查询就先写到这里。 祝玩儿的开心!! 欢迎关注微信公众号: 或添加微信好友: liutaohua001

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习十(查询主流程)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。 上一篇粗略的总结了写入的基本流程,详情见: https://my.oschina.net/u/3374539/blog/5033469 这一篇记录一下查询的主要流程。 在第六章中,写了一个查询示例,如下: let mut query = flight::Client::new(connection) .perform_query("databaseName", "select * from myMeasurement") .await .expect("query request should work"); 其中connection,代表的建立了一个Grpc的连接。perform_query代表执行查询,其中第一个参数是数据库名字,第二个参数是要执行查询的sql语句。这个perform_query是封装了一下调用协议,然后调用了服务器端的do_get方法,do_get方法在服务器的src/influxdb_ioxd/rpc/flight.rs:139行可以找到,如下: async fn do_get( &self, //这个Ticket里就是保存的perform_query方法中封装的json数据 request: Request<Ticket>, ) -> Result<Response<Self::DoGetStream>, tonic::Status> { //这里就是把json还原回来 let ticket = request.into_inner(); let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket { ticket: ticket.ticket, })?; //反序列化成了ReadInfo结构 let read_info: ReadInfo = serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?; //拿到客户端设置的数据库名字 let database = DatabaseName::new(&read_info.database_name).context(InvalidDatabaseName)?; //从内存中查找是否存在这个database名字,如果不存在就会报DatabaseNotFound错误回去 //这里就是创建数据库的时候写入到内存里的 //同时还应该记得iox的数据库必须一个节点创建一次。。hhhhha let db = self.server.db(&database).context(DatabaseNotFound { database_name: &read_info.database_name, })?; //这个是拿到之前创建数据库时候设置的线程池,可以回去参考第五章 let executor = db.executor(); //这里是创建出sql语句对应的physical_plan,后面再看 let physical_plan = Planner::new(Arc::clone(&executor)) .sql(db, &read_info.sql_query) .await .context(Planning)?; //使用线程异步的执行查询 let results = executor //复制一下执行时候需要用到的信息 .new_context() //真正的去执行 .collect(Arc::clone(&physical_plan)) .await .map_err(|e| Box::new(e) as _) .context(Query { database_name: &read_info.database_name, })?; //在写入的章节里应该知道了在RBChunk里面存储的是Arrow格式的。 //在这个方法中就是调用arrow_flight工具包的方法,先把schema序列化到flight_buffer中 let options = arrow::ipc::writer::IpcWriteOptions::default(); let schema = physical_plan.schema(); let schema_flight_data = arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options); let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)]; //上面得到的结果集,这里进行遍历,封装为要返回的数据结构 let mut batches: Vec<Result<FlightData, tonic::Status>> = results .iter() //这个是为了给下面flight_data_from_arrow_batch这个方法打补丁用的 //因为这个方法即便对于切片类型的batch也是盲目的序列化所有数据 .map(optimize_record_batch) .collect::<Result<Vec<_>, Error>>()? .iter() //这里就是一条一条的把数据序列化到缓冲区里 .flat_map(|batch| { let (flight_dictionaries, flight_batch) = arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options); //把数据包装在Result中 flight_dictionaries .into_iter() .chain(std::iter::once(flight_batch)) .map(Ok) }) .collect(); //前面是schema,后面是数据 flights.append(&mut batches); //返回一个数据的异步stream,有可能调用一次next就会释放一次cpu? let output = futures::stream::iter(flights); //数据以flight形式发送到了客户端,客户端先读取schema再读取数据。 Ok(Response::new(Box::pin(output) as Self::DoGetStream)) } 这里基本上是整个查询的主逻辑: 异步的将sql转换为plan。 异步的去执行plan并返回结果和结果所对应的schema信息。 将返回的arrow数据封装到flights格式中。 通过Grpc返回 这一篇就到这里吧,下几章准备记录一下: sql是怎么被执行的 查询中都经历了什么 等等。。。 祝玩儿的开心。 欢迎关注微信公众号: 或添加微信好友: liutaohua001

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习五(创建数据库)

欢迎关注公众号: 上篇介绍到:InfluxDB-IOx的Run命令启动过程,详情见:https://my.oschina.net/u/3374539/blog/5021654 这章记录一下Database create命令的执行过程。 在第三章命令行中介绍了,所有的子命令都有一个独立的参数或配置称为subcommand。 enum Command { Convert { // 省略 ...}, Meta {// 省略 ...}, Database(commands::database::Config), Run(Box<commands::run::Config>), Stats(commands::stats::Config), Server(commands::server::Config), Writer(commands::writer::Config), Operation(commands::operations::Config), } 这章我们打开看一眼commands::database下的config包含了什么。 pub struct Config { #[structopt(subcommand)] command: Command, } //见名知意,基本猜测一下就行了,慢慢使用到再回来看 enum Command { Create(Create), List(List), Get(Get), Write(Write), Query(Query), Chunk(chunk::Config), Partition(partition::Config), } 先来看一下create命令的执行。 Command::Create(command) => { //创建一个grpc的client let mut client = management::Client::new(connection); //设置基本的配置项 let rules = DatabaseRules { //数据库名字 name: command.name, //内存的各种配置,包含缓存大小,时间等等 lifecycle_rules: Some(LifecycleRules { //省略。。 }), //设置分区的策略 partition_template: Some(PartitionTemplate { //省略。。 }), //其它都填充default ..Default::default() }; //使用配置信息创建数据库,这里是生成了一个CreateDatabaseRequest去调用了远程服务器的方法 client.create_database(rules).await?; println!("Ok"); } 在上一章中提到了grpc的启动,这里就涉及到了之前提到的grpc的框架tonic,在tonic中使用#[tonic::async_trait]了标记一个服务器端的实现开始。我在ide中搜索,可以在src/influxdb_ioxd/rpc/management.rs:50行中找到ManagementService相关的实现。 有关tonic更多的资料请阅读:https://github.com/hyperium/tonic #[tonic::async_trait] impl<M> management_service_server::ManagementService for ManagementService<M> where M: ConnectionManager + Send + Sync + Debug + 'static, { //省略其它方法。。。 async fn create_database( &self, //这里就是接收CreateDatabaseRequest的请求 request: Request<CreateDatabaseRequest>, ) -> Result<Response<CreateDatabaseResponse>, Status> { //对数据进行一下校验,然后获得在上面配置的rules规则 let rules: DatabaseRules = request .into_inner() .rules .ok_or_else(|| FieldViolation::required("")) .and_then(TryInto::try_into) .map_err(|e| e.scope("rules"))?; //这里就是在第三章中提到的server_id,如果没配置就会报错了 let server_id = match self.server.require_id().ok() { Some(id) => id, None => return Err(NotFound::default().into()), }; //这里就是真正的去创建,在下面继续跟踪 match self.server.create_database(rules, server_id).await { Ok(_) => Ok(Response::new(CreateDatabaseResponse {})), Err(Error::DatabaseAlreadyExists { db_name }) => { return Err(AlreadyExists { resource_type: "database".to_string(), resource_name: db_name, ..Default::default() } .into()) } Err(e) => Err(default_server_error_handler(e)), } } } 接下来要继续查看数据库真正的被创建出来,我读到这里存在一个问题,文件格式是什么样子的? pub async fn create_database(&self, rules: DatabaseRules, server_id: NonZeroU32) -> Result<()> { //检查server_id self.require_id()?; //把数据库名字存储到内存中,最终保存到一个btreemap中 let db_reservation = self.config.create_db(rules)?; //对数据进行持久化保存 self.persist_database_rules(db_reservation.rules().clone()) .await?; //启动数据库后台线程,在内存中写入数据库状态 db_reservation.commit(server_id, Arc::clone(&self.store), Arc::clone(&self.exec)); Ok(()) } 来解答上面的疑问,文件是怎样持久化、格式是什么样子的。 pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> { //生成一个新的数据库路径 let location = object_store_path_for_database_config(&self.root_path()?, &rules.name); //序列化DatabaseRules这个pb到byte流 let mut data = BytesMut::new(); rules.encode(&mut data).context(ErrorSerializing)?; let len = data.len(); let stream_data = std::io::Result::Ok(data.freeze()); //将pb的内容进行存储 self.store .put( &location, futures::stream::once(async move { stream_data }), Some(len), ) .await .context(StoreError)?; Ok(()) } 这里调用了rules.encode()转换到pb的格式,这里是rust语言的一个方法,实现了From特性的,就得到了一个into的方法,如:impl From<DatabaseRules> for management::DatabaseRules. 到这里数据库的一个描述文件rules.pb就被写入到磁盘中了,路径是启动命令中指定的--data-dir参数路径 + --writer-id + 数据库名字。 例如,我的启动和创建命令为: ./influxdb_iox run --writer-id 1 --object-store file --data-dir ~/influxtest/ ./influxdb_iox database create test 那么得到的路径就为:~/influxtest/1/test/rules.pb. 之后可以运行一个pb的脚本来反查rules.pb中的数据内容,如下: $ ./scripts/prototxt decode influxdata.iox.management.v1.DatabaseRules \ < ~/influxtest/1/test/rules.pb influxdata/iox/management/v1/service.proto:6:1: warning: Import google/protobuf/field_mask.proto is unused. name: "test" partition_template { parts { time: "%Y-%m-%d %H:00:00" } } lifecycle_rules { mutable_linger_seconds: 300 mutable_size_threshold: 10485760 buffer_size_soft: 52428800 buffer_size_hard: 104857600 sort_order { order: ORDER_ASC created_at_time { } } } 看到这里已经知道整个生成过程及文件内容。 祝玩儿的开心。

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习四(Run命令的执行)

欢迎关注公众号: 上篇介绍到:InfluxDB-IOx的命令行及配置,详情见:https://my.oschina.net/u/3374539/blog/5017858 这章记录一下Run命令的执行过程。 //根据用户在命令行配置的num_threads参数 //来选择创建一个多线程的模型,还是current_thread的模型 //后面有时间深入研究tokio的时候再来分析有什么异同 let tokio_runtime = get_runtime(config.num_threads)?; //block_on会让线程一直等待方法里的future执行完成 //这是让闭包中的方法占有了io driver 和 timer context tokio_runtime.block_on(async move { let host = config.host; match config.command { // 省略其它command ... Command::Run(config) => { //具体去子类型里执行,然后await一个结果 if let Err(e) = commands::run::command(logging_level, *config).await { eprintln!("Server command failed: {}", e); std::process::exit(ReturnCode::Failure as _) } } } }); 在influxdb_ioxd::main方法中,忽略一些不太需要重点关注的,分别是初始化log的管理、PanicsTracing、CancellationToken等。 //初始化对象存储 let object_store = ObjectStore::try_from(&config)?; //可以看到,目前已经支持了 //1.内存(在container环境运行时候使用) //2.Google //3.S3 //4.Azure //5.File 本地文件,方便开发者调试运行在云上时候的文件变化 fn try_from(config: &Config) -> Result<Self, Self::Error> { match config.object_store { Some(ObjStoreOpt::Memory) | None => { //创建一个btreemap用来缓存或者搜索 Ok(Self::new_in_memory(object_store::memory::InMemory::new())) } Some(ObjStoreOpt::Google) => { // 省略 } Some(ObjStoreOpt::S3) => { // 省略 } Some(ObjStoreOpt::Azure) => { // 省略 } Some(ObjStoreOpt::File) => match config.database_directory.as_ref() { Some(db_dir) => { //去递归创建这个配置路径中的文件夹 //context也是使用的snafu来处理错误的 fs::create_dir_all(db_dir) .context(CreatingDatabaseDirectory { path: db_dir })?; //都创建完成,并且没出错误,把路径保存起来 Ok(Self::new_file(object_store::disk::File::new(&db_dir))) } // 如果database_directory这个参数没有配置的时候 //使用snafu这个crate来返回一个错误 None => MissingObjectStoreConfig { object_store: ObjStoreOpt::File, missing: "data-dir", } .fail(), }, } } 关于错误处理的代码: #[snafu(display("Unable to create database directory {:?}: {}", path, source))] CreatingDatabaseDirectory { path: PathBuf, source: std::io::Error, }, #[snafu(display( "Specified {} for the object store, required configuration missing for {}", object_store, missing ))] MissingObjectStoreConfig { object_store: ObjStoreOpt, missing: String, }, 我们来测试一下错误的场景,来看看是否符合代码的预期。 // 不传入路径 cargo run run --object-store file Finished dev [unoptimized + debuginfo] target(s) in 0.42s Running `./influxdb_iox run --object-store file` Apr 15 13:38:34.352 INFO influxdb_iox::influxdb_ioxd: Using File for object storage Server command failed: Run: Specified File for the object store, required configuration missing for data-dir //传入一个创建不了的路径 cargo run run --object-store file --data-dir /root/1/1 Finished dev [unoptimized + debuginfo] target(s) in 0.47s Running `./influxdb_iox run --object-store file --data-dir /root/1/1` Apr 15 13:45:26.664 INFO influxdb_iox::influxdb_ioxd: Using File for object storage Server command failed: Run: Unable to create database directory "/root/1/1": Read-only file system (os error 30) 可以看到是符合预期的,bingo //创建一个空的结构体 let connection_manager = ConnectionManager {}; //创建AppServer结构体用来保存基本的信息 //server_config里就是保存的对象存储的信息及线程配置 //如果num_worker_threads没有填写,默认就使用cpu数量 let app_server = Arc::new(AppServer::new(connection_manager, server_config)); //不设置这个writer_id能启动,但是不能做任何操作 if let Some(id) = config.writer_id { //compare and set 一个非0的数值,错误就打印一个指定的panic app_server.set_id(id).expect("writer id already set"); //校验所有的配置 if let Err(e) = app_server.load_database_configs().await { error!( "unable to load database configurations from object storage: {}", e ) } } else { warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data."); } 接下来进入load_database_configs方法看看, let list_result = self .store //把write_id和配置的文件路径组合一下,作为一个目录 //遍历文件夹中的所有东西,用一个BTreeSet存所有子文件夹 //用Vec存下所有的文件信息,包括路径、修改时间、大小等 .list_with_delimiter(&self.root_path()?) .await .context(StoreError)?; //拿到配置的server的write_id let server_id = self.require_id()?; let handles: Vec<_> = list_result //配置的文件夹下的所有文件夹 .common_prefixes .into_iter() //全部进行map转换 .map(|mut path| { let store = Arc::clone(&self.store); let config = Arc::clone(&self.config); let exec = Arc::clone(&self.exec); //先找database的相关信息文件,名字叫rules.pb path.set_file_name(DB_RULES_FILE_NAME); //感觉是需要io来读取文件内容,所以开一个异步 tokio::task::spawn(async move { let mut res = get_store_bytes(&path, &store).await; //省略错误处理。。 let res = res.unwrap().freeze(); //解析文件内容,根据文件名可以看出是个pb文件。 match DatabaseRules::decode(res) { Err(e) => { //省略错误。。 } //根据解析出来的文件内容,在内存中恢复回来db的相关信息 Ok(rules) => match config.create_db(rules) { Err(e) => error!("error adding database to config: {}", e), //提交一个后台任务,用来不断的检测chunks的状态 //比如达到了某个大小,然后写入到存储等 Ok(handle) => handle.commit(server_id, store, exec), }, } }) }) .collect(); //等待所有任务完成 futures::future::join_all(handles).await; 这里就启动完成了一个基本的服务,创建了存储路径、初始化数据库的基本配置、启动了一个用来刷盘、整理chunk的后台任务。 接下来就是启动连接相关的了。 //从启动命令行中读取grpc的地址 let grpc_bind_addr = config.grpc_bind_address; //绑定这个地址 let socket = tokio::net::TcpListener::bind(grpc_bind_addr) .await .context(StartListeningGrpc { grpc_bind_addr })?; //真正的协议启动 let grpc_server = rpc::serve(socket, Arc::clone(&app_server), frontend_shutdown.clone()).fuse(); //同样的启动http相关的服务,使用的hyper库 let bind_addr = config.http_bind_address; let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?; let http_server = http::serve(addr, Arc::clone(&app_server), frontend_shutdown.clone()).fuse(); //省略后面的停止流程。。。 然后看grpc的启动的服务 //启动起来健康检查的服务 let stream = TcpListenerStream::new(socket); let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); //标识相对应的服务已经是可以提供服务的状态了 let services = [ generated_types::STORAGE_SERVICE, generated_types::IOX_TESTING_SERVICE, generated_types::ARROW_SERVICE, ]; for service in &services { health_reporter .set_service_status(service, tonic_health::ServingStatus::Serving) .await; } //增加一堆使用grpc的服务,并启动起来 tonic::transport::Server::builder() .add_service(health_service) .add_service(testing::make_server()) .add_service(storage::make_server(Arc::clone(&server))) .add_service(flight::make_server(Arc::clone(&server))) .add_service(write::make_server(Arc::clone(&server))) .add_service(management::make_server(Arc::clone(&server))) .add_service(operations::make_server(server)) .serve_with_incoming_shutdown(stream, shutdown.cancelled()) .await 然后是http相关的启动 pub async fn serve<M>( addr: AddrIncoming, server: Arc<AppServer<M>>, shutdown: CancellationToken, ) -> Result<(), hyper::Error> where M: ConnectionManager + Send + Sync + Debug + 'static, { //初始化路由相关的信息 let router = router(server); let service = RouterService::new(router).unwrap(); //启动服务 hyper::Server::builder(addr) .serve(service) .with_graceful_shutdown(shutdown.cancelled()) .await } 顺便看一下都提供了哪些地址可以被访问的: Router::builder() .data(server) //写了一个拦截,打印请求参数和返回结果 .middleware(Middleware::pre(|req| async move { debug!(request = ?req, "Processing request"); Ok(req) })) .middleware(Middleware::post(|res| async move { debug!(response = ?res, "Successfully processed request"); Ok(res) })) // this endpoint is for API backward compatibility with InfluxDB 2.x .post("/api/v2/write", write::<M>) .get("/health", health) .get("/metrics", handle_metrics) .get("/iox/api/v1/databases/:name/query", query::<M>) .get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>) .get("/api/v1/partitions", list_partitions::<M>) .post("/api/v1/snapshot", snapshot_partition::<M>) //错误的时候调用的处理拦截 .err_handler_with_info(error_handler) .build() .unwrap() 做一个/health的测试: curl localhost:8080/health OK% 可以看到成功返回了值。 到这里基本启动就完成了,后面再用到的时候会继续对启动里的细节做研究,比如Panics,Log等等吧,欢迎持续关注。 祝玩儿的开心

资源下载

更多资源
Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册