首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

酷瓜云课堂(内网版)v1.0.0 发布,内网学习不打样

更新内容 完成后台功能 完成点播功能 完成直播功能 完成专栏功能 完成问答功能 完成群组功能 完成微聊功能 完成积分功能 完成用户功能 系统介绍 酷瓜云课堂内网版,采用C扩展框架Phalcon开发,使用本地基础服务,无营销相关功能,主要适用于公司,学校等内部网络环境使用。 系统功能 实现了点播、直播、专栏、问答、群组、微聊、积分等。 友情提示: 演示系统配置低,带宽有限,切莫压测 课程数据来源于网络(无实质内容) 管理后台已禁止数据提交,私密配置已过滤 系统演示: 前台演示 后台演示 演示账号:100015@163.com / 123456 (前后台通用) 项目组件 后台框架:phalcon 3.4.5 前端框架:layui 2.6.8, layim 3.9.8(已授权) 全文检索:xunsearch 1.4.9 即时通讯:workerman 3.5.22 基础依赖:php7.3, mysql5.7, redis5.0 项目文档 运行环境搭建 系统服务配置 客户终端配置

优秀的个人博客,低调大师

每日学习总结:局部变量、成员变量、嵌套for循环、方法(Day04)

2 --> 1 变量 1.1 概念 一般通过“变量类型 变量名 = 变量值 ;”这三部分来描述一个变量。如:int a = 3 ;变量的使用原则:就近原则,即尽量控制变量的使用范围到最小。 1.2 局部变量 位置: 定义在方法里或者局部代码块中 注意: 必须手动初始化/赋值来分配内存.如:int i = 5;或者int i; i = 5; 作用域: 也就是方法里或者局部代码块中,方法运行完内存就释放了 注意事项:变量的使用原则,成员变量与局部变量同名时,使用近的局部变量 1.2 成员变量 位置: 定义在类里方法外 注意: 不用初始化,也会自动被初始化成默认值 作用域: 整个类中,类消失了,变量才会释放 public class TestVariable2 { /** 本类测试成员变量与局部变量 */ /** 创建成员变量 * 1.位置:类里方法外 * 2.注意:无需手动初始化,会自动赋予对应类型的默认值 * 3.作用域:在整类里都生效,类消失,变量才消失 * */ int count; static int sum; public static void main(String[] args) { /**创建局部变量, * 1.位置:方法里/局部代码块里 * 2.注意:使用时必须手动初始化 * 3.作用域:当方法里/局部代码块执行完毕,变量就释放 * */ int sum = 100; System.out.println(sum); TestVariable2 test2 = new TestVariable2(); System.out.println(test2.count); /**变量:变量的使用原则,就近原则,当成员变量与局部变量同名时,使用局部变量*/ System.out.println(sum); } } 默认值: 基本类型 默认值 boolean false char ‘\u0000’ (null) byte (byte)0 short (short)0 int 0 long 0L float 0.0f double 0.0d public class TestVariable1 { /** 本类用于测试各种类型变量的默认值 */ //定义各种类型的成员变量,测试变量的默认值 byte b; short s; int i; long l; float f; char a; double d; boolean t; String x; public static void main(String[] args) { TestVariable1 test = new TestVariable1(); System.out.println(test.b); System.out.println(test.s); System.out.println(test.i); System.out.println(test.l); System.out.println(test.f); System.out.println(test.d); System.out.println(test.a); System.out.println(test.t); System.out.println(test.x); } } 2 嵌套for循环 2.1 概述 存在至少2层for循环,根据外层的条件,判断里层能否执行如果能执行,就把里层代码都循环完毕后,再继续判断是否执行外层循环的下一次循环 2.2 嵌套for循环的形式 2.2.1 练习:嵌套for循环,入门案例 总结1:外循环执行1次,内循环执行多次总结2:外循环控制行,内循环控制列 public class TestForDemo { /** 本类用于测试嵌套for循环的入门案例 */ public static void main(String[] args) { /** 外层执行一次,内层执行多次 * 外层控制的是轮数,内存控制的是每轮执行的次数 * */ for(int i = 1; i<=3; i++){ //内存循环 System.out.println("外层循环第:"+i+"轮"); for (int j = 1; j<=5 ;j++){ //外层循环 System.out.println("内层循环第第:"+j+"次"); } } System.out.println("···································"); /** 打印一个由 * 组成的矩形 */ for(int i = 1; i<=3; i++){ //内存循环 for (int j = 1; j<=5 ;j++){ //外层循环 System.out.print("*"); } System.out.println(); //空白行用来换行 } } } 2.2.2 练习2:打印直角三角形 public class TestForTriangles { /** 利用for循环打印一个左直角三角形 * * * * ** * *** * **** * ***** * ****** * */ public static void main(String[] args) { for (int i = 1; i<=6;i++){ /** 此处列数要随着行数而变化,列数最大值就是行数 */ for(int j = 1 ; j<=i;j++){ System.out.print("*");//在同一轮/同一行打印不换行 } System.out.println();//空白行用来换行 } } } 2.2.3 练习:打印99乘法表 public class TestFor99Excel { /** 打印99乘法表 */ public static void main(String[] args) { for (int i = 1;i<=9;i++){ //控制行数,打印9行 for (int j= 1; j<=i;j++){ //控制列数,i行打印i列 //拼接打印算式,后面拼接一个"\t"表示这是表格格式,\t也被称作制表符 System.out.print(j+"*"+i+"="+(j*i)+"\t"); } System.out.println( ); } } } 3 break与continue 3.1 概念 break: 直接结束当前循环,跳出循环体,简单粗暴 break以后的循环体中的语句不会继续执行,循环体外的会执行注意如果是嵌套for循环,在内层循环遇到了break,只会跳出当前这一层内循环哦 continue: 跳出本轮循环,继续下一轮循环 continue后本轮循环体中的语句不会继续执行,但是会继续执行下轮循环,循环体外的也会执行 3.2练习:测试Break与Continue import java.util.Scanner; /**需求:找数字88 * 提示并接受用户输入100次数字,如果不是88,则继续输入,找到88就结束*/ public class TestBreakAndContinue { public static void main(String[] args) { //循环体可以帮助我们执行重复的事情,控制for循环执行100次 for(int i = 1;i <= 100; i++) { //在每一次循环中都要提示并接收用户输入的数字 System.out.println("请输入数字:"); int input = new Scanner(System.in).nextInt(); if(input != 88) {//用户输入的不是88 continue;//直接继续输入 /**注意,不管是不是加continue,都可以在猜不对的情况下继续输入 * 只不过加了continue后效率更高,只要数据不等于88,就无需执行后面的代码 * 直接进行下一轮的猜数字即可* */ /**break或者continue之后都不允许写代码,都是不可到达的代码*/ //System.out.println(0);//Unreachable code } System.out.println("我是用来测试continue有没有跳过循环后半部分代码的哦"); if(input == 88) {//找到88了 System.out.println("恭喜您,猜对了!"); break;//结束程序 //System.out.println(0);//Unreachable code } } } } 4 循环结构2 : while 4.1 形式(先判断,再执行) 4 循环结构3 : do-while 4.2形式(先执行,再判断,循环体代码保证最少执行一次) 4.3 练习:猜数字之while与do-while练习 import java.util.Random; import java.util.Scanner; /** * 本类用于练习while循环 * 需求: 产生一个随机数,和用户一直在输入的数字做比较,直到用户猜对 * */ public class TestWhile { public static void main(String[] args) { int r = createNum();//调用可以生成随机数的方法,并且接收生成的随机数 System.out.println("打个小抄:"+r); //调用猜数字方法1--while //guessNum1(r); //调用猜数字方法2--do-while guessNum2(r); } public static void guessNum2(int r) { do {//先执行一次 //2.接收用户输入的值 System.out.println("猜猜看~"); int input = new Scanner(System.in).nextInt(); //3.判断是否猜对(拿用户猜的数字与生成的随机数做比较) if(input > r) { System.out.println("猜大了,继续猜猜看"); }else if(input < r) { System.out.println("猜小了,继续努力"); }else if(input == r) { System.out.println("猜对了!"); //一定注意:要设置程序出口!!! break; } }while(true); } //创建猜数字的方法 public static void guessNum1(int r) { //1.写一个死循环 while(true) {//死循环--需要设置程序的出口 //2.接收用户输入的值 System.out.println("猜猜看~"); int input = new Scanner(System.in).nextInt(); //3.判断是否猜对(拿用户猜的数字与生成的随机数做比较) if(input > r) { System.out.println("猜大了,继续猜猜看"); }else if(input < r) { System.out.println("猜小了,继续努力"); }else if(input == r) { System.out.println("猜对了!"); //一定注意:要设置程序出口!!! break; } } } //创建一个用来生成随机数的方法 public static int createNum() { //让程序产生一个随机数 //java.util.Random,注意导包,快捷键:Ctrl+Shift+O int random = new Random().nextInt(100);//参数100是自定义的,此时生成的随机数范围是[0,100)的整数 return random; } } 4.4 拓展:三种循环的区别 for:知道循环次数 while/do while:当循环次数不确定时 while:先判断,不符合规则,不执行代码 do while:代码最少被执行一次,再去判断,符合规则,再次执行代码 循环之间都可以互相替代,但是一般最好选择合适的循环结构来完成代码哦~ 5 方法 5.1 概述 被命名的代码块,方法可以含参数可以不含参数,可以提高代码的复用性。 5.2 方法定义的格式 5.3 方法调用顺序图 顺序执行代码,调用指定方法,执行完毕,返回调用位置 5.4 练习:测试方法的调用顺序/参数/返回值 public class TestMethod { //1.创建程序的入口函数main() public static void main(String[] args) { System.out.println(1); /**2.我们通过方法名+参数列表的方式来调用方法的功能*/ method1();//调用method1() System.out.println(2); method2(3);//调用method2() int result = method3(1,2);//调用method3() System.out.println(result); } /**3.如果方法想要返回值,必须修改返回值类型 * 并且return对应类型的结果 * 如果方法的返回值类型是void,不允许有返回值 * */ /*本方法用来测试方法的返回值类型*/ public static int method3(int i, int j) { /**4.通过return关键字将方法结果返回到调用位置*/ return i+j; } /**1.方法的修饰符 方法的返回值类型 方法名(方法参数){方法体}*/ /*method1()想测试方法的调用顺序*/ public static void method1() { System.out.println(5); System.out.println(6); System.out.println(7); } /*本方法用来测试方法的参数,参数的位置在小括号里*/ public static void method2(int a) { System.out.println("海绵宝宝今年:"+ a +"岁啦~"); } }

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习十一(SQL的解析)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。 上一章介绍了查询的主流程,详情见: https://my.oschina.net/u/3374539/blog/5034513 这章记录一下SQL的解析过程。 Influx Iox 使用了 Fusion 作为sql的查询引擎(Funsion目前是apache arrow的一个子项目)。整体查询架构如图所示: 我在网上找了找Fusion相关的文档,没有找到比较详细一些的说明,所以只能自己总结了。 通常来讲,sql的语句解析分为两个大的步骤,分别是: 逻辑执行计划(LogicPlan) 物理执行计划(PhysicalPlan) 逻辑执行计划(LogicPlan) LogicPlan主要是用来描述用户输入的SQL都包含了一些什么内容,例如: selet * from table where a = 1 ,要转换到类能解释的模型上就会成为: class SelectClass{ path : "*", from: "table", where: eq(a , 1) } 这是非常简单的情况,sql中还会掺杂大量的关键字,比如SUM、JOIN、ORDER BY等等,如果需要把所有东西都记录下来,可能类图看起来就像是这样: 在Fusion中,有一个名为parser.rs的解析器他的主要工作就是将纯SQL解析为一个程序基本可以理解的结构。主要过程有: 定义所有的关键词,能够识别出来在sql语句中的含义。比如 SELECT、INSERT 等等 遍历sql语句每个空格或者遇到表达式切分一次,然后在定义的关键词里查找是否为关键字 使用一个名叫TOKEN的枚举来表示每个节点不同的含义,比如EQ,NEQ,COMMA等等 最后存储到一个数组当中,数据结构大致如下: Ok([Word(Word { value: "select", quote_style: None, keyword: SELECT }), Whitespace(Space), Mult, Whitespace(Space), Word(Word { value: "from", quote_style: None, keyword: FROM }), Whitespace(Space), Word(Word { value: "table1", quote_style: None, keyword: NoKeyword }), Whitespace(Space), Word(Word { value: "where", quote_style: None, keyword: WHERE }), Whitespace(Space), Word(Word { value: "a", quote_style: None, keyword: NoKeyword }), Whitespace(Space), Eq, Whitespace(Space), Number("1", false)]) 按照不同的开头关键字去执行不同的分支。比如CREATE 和 SELECT 肯定后面的解析方式不一样。 封装成不同的LogicPlan子类。 pub enum LogicalPlan { //基本就是纯select Projection { ... 省略 }, //带有filter的 Filter { ... 省略 }, //是聚合的 Aggregate { ... 省略 }, //带排序的 Sort { ... 省略 }, ... 省略 } 逻辑执行计划的优化 在用户输入一段sql之后,往往他并不会意识到自己是否真的输入了非常有意义的东西,并且他也不会清楚程序到底用什么样的组织方式会让程序执行的更快,所以一般来讲,用户输入的sql是最不可信的,还需要再次执行优化。 举个简单的例子,假如用户输入select * from table where is_valid = true and is_valid !=false ,很明显可以在执行前优化成 select * from table where is_valid = true,从而减少在实际执行时,对数据库的操作。 从图中可以看到Fusion提供了5种优化,有兴趣的的可以自己了解,或者以后再做分析。 物理执行计划 物理执行计划,我理解就是将用户输入的文字性描述的信息专为真正的资源来存储,比如用户写入的是from t1,那么t1作为一个字符串存在于logic阶段,但是到物理阶段的时候,要从内存或者磁盘上取来真正指向物理资源的一个类型,存储到计划里,以备后用。 例如下面的示例当中,就是一段物理执行计划,他从上面的Projection存储的各种字符串,转换到了存储表对应的schema,以及RBChunk类型。 pub(crate) struct IOxReadFilterNode<C: PartitionChunk + 'static> { table_name: Arc<String>, schema: SchemaRef, chunk_and_infos: Vec<ChunkInfo<C>>, predicate: Predicate, } 物理执行计划的优化 对于物理计划Fasion中提供了3种优化方式,如下图: 分别是批处理、分区合并、并行度优化。主要是为了在实际实行的过程中,减少因为通讯、调用、单机等造成的响应缓慢。 在文章的最后展示一下一个物理执行之计划都包含了哪些信息: ProjectionExec { expr: [ (Column { name: "fieldKey" }, "fieldKey"), (Column { name: "tag1" }, "tag1"), (Column { name: "tag2" }, "tag2"), (Column { name: "time" }, "time")], schema: Schema { fields: [Field { name: "fieldKey", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "tag1", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "tag2", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }], metadata: {} }, input: RepartitionExec { input: IOxReadFilterNode { table_name: "myMeasurement", schema: Schema { fields: [Field { name: "fieldKey", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "tag1", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "tag2", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }], metadata: {} }, chunk_and_infos: [ChunkInfo { chunk_table_schema: Schema { inner: Schema { fields: [Field { name: "fieldKey", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "tag1", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "tag2", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }], metadata: {"tag2": "iox::column_type::tag", "time": "iox::column_type::timestamp", "fieldKey": "iox::column_type::field::string", "tag1": "iox::column_type::tag"} } }, chunk: MutableBuffer { 。。。省略MBChunk数据 } } 就到这里,祝玩儿的开心。 欢迎关注微信公众号: 或添加微信好友: liutaohua001

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习八(Chunk持久化)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。 上一章介绍了Chunk是怎样被管理的,以及各个阶段的操作。详情见: https://my.oschina.net/u/3374539/blog/5029926 这一章记录一下Chunk是怎样持久化的。 ChunkState::Moved(_) if would_write => { let partition_key = chunk_guard.key().to_string(); let table_name = chunk_guard.table_name().to_string(); let chunk_id = chunk_guard.id(); std::mem::drop(chunk_guard); write_active = true; //处于Moved状态下的Chunk会调用write_to_object_store方法进行持久化 self.write_to_object_store(partition_key, table_name, chunk_id); } //write_to_object_store实际调用到write_chunk_to_object_store_in_background方法来进行持久化 pub fn write_chunk_to_object_store_in_background( self: &Arc<Self>, partition_key: String, table_name: String, chunk_id: u32, ) -> TaskTracker<Job> { //获取数据库名称 let name = self.rules.read().name.clone(); //新建一个后台任务的管理器,用来记录db中都在执行哪些任务及状态, let (tracker, registration) = self.jobs.register(Job::WriteChunk { db_name: name.to_string(), partition_key: partition_key.clone(), table_name: table_name.clone(), chunk_id, }); let captured = Arc::clone(&self); //异步写入 let task = async move { let result = captured //真正的写入方法 .write_chunk_to_object_store(&partition_key, &table_name, chunk_id) .await; if let Err(e) = result { info!(?e, %name, %partition_key, %chunk_id, "background task error loading object store chunk"); return Err(e); } Ok(()) }; tokio::spawn(task.track(registration)); tracker } 后面的方法有点儿长,希望能够耐心观看。。 pub async fn write_chunk_to_object_store( &self, partition_key: &str, table_name: &str, chunk_id: u32, ) -> Result<Arc<DbChunk>> { //从catalog中取回chunk let chunk = { //先找partition let partition = self.catalog .valid_partition(partition_key) .context(LoadingChunkToParquet { partition_key, table_name, chunk_id, })?; let partition = partition.read(); //从partition里根据表名和chunk_id拿到chunk partition .chunk(table_name, chunk_id) .context(LoadingChunkToParquet { partition_key, table_name, chunk_id, })? }; let rb_chunk = { //先加写锁 let mut chunk = chunk.write(); //修改Chunk的状态为WritingToObjectStore chunk .set_writing_to_object_store() .context(LoadingChunkToParquet { partition_key, table_name, chunk_id, })? }; //获取所有Chunk下所有表的Statistics信息 let table_stats = rb_chunk.table_summaries(); //创建一个parquet Chunk,这个在上一章里有提到各种Chunk类型 let mut parquet_chunk = Chunk::new( partition_key.to_string(), chunk_id, //用来统计parquet占用的内存 self.memory_registries.parquet.as_ref(), ); //创建一个Storage结构,使用的是启动数据库时候指定的存储类型,这个在第3章里有提到 let storage = Storage::new( Arc::clone(&self.store), self.server_id, self.rules.read().name.to_string(), ); //遍历所有表的统计数据 for stats in table_stats { //构建一个空的查询,也就是 select * from table,不加where let predicate = read_buffer::Predicate::default(); //从rb_chunk筛选数据, Selection::All代表所有列,predicate代表没有where条件 //意思就是 `stats` 指向的单个表内的所有数据 let read_results = rb_chunk .read_filter(stats.name.as_str(), predicate, Selection::All) .context(ReadBufferChunkError { table_name, chunk_id, })?; //再拿出来schema信息,因为arrow是分开存的,所以需要拿两次 let arrow_schema: ArrowSchemaRef = rb_chunk .read_filter_table_schema(stats.name.as_str(), Selection::All) .context(ReadBufferChunkSchemaError { table_name, chunk_id, })? .into(); //再拿出来这个表里的最大最小的时间 //这个是从readBuffer::Column::from里完成的最大最小时间统计 //也就是当从mutbuffer转移到readbuffer的时候 let time_range = rb_chunk.table_time_range(stats.name.as_str()).context( ReadBufferChunkTimestampError { table_name, chunk_id, }, )?; //创建一个ReadFilterResultsStream //官方文档里面说的是这是一个转变ReadFilterResults为异步流的适配器 let stream: SendableRecordBatchStream = Box::pin( streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)), ); // 写到持久化存储当中 let path = storage .write_to_object_store( partition_key.to_string(), chunk_id, stats.name.to_string(), stream, ) .await .context(WritingToObjectStore)?; // 这里就是把写入parquet的摘要信息存储在内存中 let schema = Arc::clone(&arrow_schema) .try_into() .context(SchemaConversion)?; let table_time_range = time_range.map(|(start, end)| TimestampRange::new(start, end)); parquet_chunk.add_table(stats, path, schema, table_time_range); } //对`catlog::chunk`加写锁,然后更新这个chunk的状态为WrittenToObjectStore let mut chunk = chunk.write(); let parquet_chunk = Arc::clone(&Arc::new(parquet_chunk)); chunk .set_written_to_object_store(parquet_chunk) .context(LoadingChunkToParquet { partition_key, table_name, chunk_id, })?; //包装`catlog::chunk`为`ParquetChunk` Ok(DbChunk::snapshot(&chunk)) } 这里面看起来有点儿绕,不容易理解的就是chunk.set_written_to_object_store这种方法。 因为Rust中enum是存在变种的,所以基于这种特性,虽然都是Chunk,但是存储的内容变化了。 pub enum ChunkState { ....省略 //这里就是mutbuffer里的chunk Moving(Arc<MBChunk>), //这里就变成存储的readbuffer的chunk结构 Moved(Arc<ReadBufferChunk>), //这里又开始存储ParquetChunk结构 WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>), } 还需要继续查看storage.write_to_object_store这个逻辑,这里涉及到了从mem的arrow结构转为Parquet结构,就不在文章中展示了,使用的是arrow的ArrowWriter直接转换的。 //这里直接跳跃到ObjectStore的put方法里,来看怎么组织的写入 async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()> where S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static, { use ObjectStoreIntegration::*; //匹配启动时候配置的存储方式,转到真正的实现去,这里只看文件的 match (&self.0, location) { ...省略 //文件存储 (File(file), path::Path::File(location)) => file .put(location, bytes, length) .await .context(FileObjectStoreError)?, _ => unreachable!(), } Ok(()) } //为File实现了ObjectStoreApi trait,相当于文件存储时候的实际实现 async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()> where S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static, { //读取之前ReadFilterResultsStream里的所有数据到content里 let content = bytes .map_ok(|b| bytes::BytesMut::from(&b[..])) .try_concat() .await .context(UnableToStreamDataIntoMemory)?; //这里就是一个验证长度否则报错DataDoesNotMatchLength。宏编程,不用关注 if let Some(length) = length { ensure!( content.len() == length, DataDoesNotMatchLength { actual: content.len(), expected: length, } ); } //获取文件路径,就是启动时候配置的根路径加上数据路径 let path = self.path(location); //创建这个文件出来 let mut file = match fs::File::create(&path).await { Ok(f) => f, //如果是没有找到父路径,那就从新创建一次 Err(err) if err.kind() == std::io::ErrorKind::NotFound => { let parent = path .parent() .context(UnableToCreateFile { path: &path, err })?; fs::create_dir_all(&parent) .await .context(UnableToCreateDir { path: parent })?; match fs::File::create(&path).await { Ok(f) => f, Err(err) => return UnableToCreateFile { path, err }.fail(), } } //否则就失败了 Err(err) => return UnableToCreateFile { path, err }.fail(), }; //这里就是拷贝所有数据到这个文件中去 tokio::io::copy(&mut &content[..], &mut file) .await .context(UnableToCopyDataToFile)?; //大功告成 Ok(()) } 这个写入的逻辑比较庞大了,但是基本也能捋清楚。 先写入mutBuffer,写到一定大小会关闭 异步线程来监控是不是该关掉mutBuffer 生命周期的转换,然后开始写入readBuffer 之后开始异步的写入持久化存储 检查内存是不是需要清理readbuffer 大概就这些。源代码中还有很多逻辑没有完成,比如WAL。先整体看完流程再回来看遗漏的,留给Influx写更多完整逻辑的时间。 祝玩儿的开心。 欢迎关注微信公众号: 或添加微信好友: liutaohua001

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习六-2(数据写入)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。 上一章说到数据写入时的分区机制及分区现有的功能。详情见: https://my.oschina.net/u/3374539/blog/5026139 这一章记录一下数据是怎样进行存储的。 上一章没有细节的介绍数据从Line protocol被解析成了什么样子,在开篇先介绍一下数据被封装后的展示。 转换过程的代码可以参见internal_types/src/entry.rs : 157行中的build_table_write_batch方法; 内部数据结构可以查看: generated_types/protos/influxdata/write/v1/entry.fbs。 数据是被层层加码组装出来的: LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry sharded_entries:[{ shard_id: None, entry: { fb: { operation_type: write, operation: { partition_writes:[{ key:"2019-05-02 16:00:00", table_batches:[ { name:"myMeasurement", columns:[ { name:"fieldKey", logical_column_type: Field, values_type: StringValues, values: { values:["123"] }, null_mask: None }, { name:"tag1", logical_column_type: Tag, values_type: StringValues, values: { values:["value1"]) }, null_mask: None }, { name:"tag2", logical_column_type: Tag, values_type: StringValues, values: { values:["value2"]) }, null_mask: None }, { name:"time", logical_column_type: Time, values_type: I64Values, values: { values:[1556813561098000000]) }, null_mask: None }] }] }] } } } }] 数据在内存中就会形成如上格式保存,但要注意,内存中使用的 flatbuffer 格式保存,上面只是为了展示内容。 继续上节里的内容,结构被拼凑完成之后,就会调用write_sharded_entry方法去进行实际写入工作: futures_util::future::try_join_all( sharded_entries .into_iter() //对每个数据进行写入到shard .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)), ) .await?; 然后看是怎样写入到shard的,因为shard的写入还没有完成,所以只能关注单机的写入了。具体看代码: async fn write_sharded_entry( &self, db_name: &str, db: &Db, shards: Arc<HashMap<u32, NodeGroup>>, sharded_entry: ShardedEntry, ) -> Result<()> { //判断shard的id是否为null,如果是null就写入本地 //否则就写入到具体的shard去 match sharded_entry.shard_id { Some(shard_id) => { let node_group = shards.get(&shard_id).context(ShardNotFound { shard_id })?; //还没有真正的实现,可以看下面的方法 self.write_entry_downstream(db_name, node_group, &sharded_entry.entry) .await? } None => self.write_entry_local(&db, sharded_entry.entry).await?, } Ok(()) } //可以看到还没有实现远程的写入 async fn write_entry_downstream( &self, db_name: &str, node_group: &[WriterId], _entry: &Entry, ) -> Result<()> { todo!( "perform API call of sharded entry {} to one of the nodes {:?}", db_name, node_group ) } //数据对本地写入 pub async fn write_entry_local(&self, db: &Db, entry: Entry) -> Result<()> { //继续往下跟踪 db.store_entry(entry).map_err(|e| match e { db::Error::HardLimitReached {} => Error::HardLimitReached {}, _ => Error::UnknownDatabaseError { source: Box::new(e), }, })?; Ok(()) } //方法似乎什么都没做,只是增补了clock_value和write_id //注释上解释到logical clock是一个用来在数据库内部把entry变为有序的字段 pub fn store_entry(&self, entry: Entry) -> Result<()> { //生成一个新的结构SequencedEntry并增补字段 let sequenced_entry = SequencedEntry::new_from_entry_bytes( ClockValue::new(self.next_sequence()), self.server_id.get(), entry.data(), ).context(SequencedEntryError)?; //关于读缓存相关的配置和实现,先不用管 if self.rules.read().wal_buffer_config.is_some() { todo!("route to the Write Buffer. TODO: carols10cents #1157") } //继续调用其他方法 self.store_sequenced_entry(sequenced_entry) } 上面的所有方法完成之后,基本的插入数据格式就准备完成了,接下来就是写入内存存储: pub fn store_sequenced_entry(&self, sequenced_entry: SequencedEntry) -> Result<()> { //读取出数据库对于写入相关的配置信息 //包括是否可写、是否超过内存限制等等验证 let rules = self.rules.read(); let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold; if rules.lifecycle_rules.immutable { return DatabaseNotWriteable {}.fail(); } if let Some(hard_limit) = rules.lifecycle_rules.buffer_size_hard { if self.memory_registries.bytes() > hard_limit.get() { return HardLimitReached {}.fail(); } } //rust语言中的释放变量 std::mem::drop(rules); //因为是批量写入,所以需要循环 //partition_writes的数据格式可以参见上面的json数据 if let Some(partitioned_writes) = sequenced_entry.partition_writes() { for write in partitioned_writes { let partition_key = write.key(); //根据之前生成的partition_key来得到或者创建一个partition描述 let partition = self.catalog.get_or_create_partition(partition_key); //这里是拿到一个写锁 let mut partition = partition.write(); //更新这个partition最后的插入时间 //记录这个的目的,代码上并没写明白是做什么用的 partition.update_last_write_at(); //找到一个打开的chunk //不知道为什么每次都要在所有chunk里搜索一次 //难道是同时可能有很多个chunk都可以写入? let chunk = partition.open_chunk().unwrap_or_else(|| { //否则就创建一个新的chunk出来 partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref()) }); //获取一个写锁 let mut chunk = chunk.write(); //更新当前chunk的第一条、最后一条写入记录 chunk.record_write(); //得到chunk的内存区域,称为mutable_buffer let chunk_id = chunk.id(); let mb_chunk = chunk.mutable_buffer().expect("cannot mutate open chunk"); //真正的写入到内存中 mb_chunk .write_table_batches( sequenced_entry.clock_value(), sequenced_entry.writer_id(), &write.table_batches(), ) .context(WriteEntry { partition_key, chunk_id, })?; //如果当前chunk写入数据的大小超过了设置的限制,就关闭 //关闭的意思就是把状态制为Closing,并更新关闭时间 let size = mb_chunk.size(); if let Some(threshold) = mutable_size_threshold { if size > threshold.get() { chunk.set_closing().expect("cannot close open chunk") } } } } Ok(()) } 再深入的就不继续跟踪了,但是思路还是比较清晰了。 1.分区相关 client --> grpc --> 进行分区shard --> 分区partition 2.写入相关 结构封装 LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry 内存写入空间 catalog -> partition -> table -> column 达到指定大小后标记为关闭 异步 - 后台线程进行内存整理 到这里基本就完成了所有的写入,并返回给客户端成功。 关于后台线程的内存整理再下一篇中继续介绍。 祝玩儿的开心 欢迎关注微信公众号: 或添加微信好友: liutaohua001

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册