时序数据库Influx-IOx源码学习六-2(数据写入)
InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。
InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。
接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。
上一章说到数据写入时的分区机制及分区现有的功能。详情见: https://my.oschina.net/u/3374539/blog/5026139
这一章记录一下数据是怎样进行存储的。
上一章没有细节的介绍数据从Line protocol
被解析成了什么样子,在开篇先介绍一下数据被封装后的展示。
转换过程的代码可以参见
internal_types/src/entry.rs : 157行
中的build_table_write_batch
方法;内部数据结构可以查看:
generated_types/protos/influxdata/write/v1/entry.fbs
。
数据是被层层加码组装出来的:
LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry
sharded_entries:[{ shard_id: None, entry: { fb: { operation_type: write, operation: { partition_writes:[{ key:"2019-05-02 16:00:00", table_batches:[ { name:"myMeasurement", columns:[ { name:"fieldKey", logical_column_type: Field, values_type: StringValues, values: { values:["123"] }, null_mask: None }, { name:"tag1", logical_column_type: Tag, values_type: StringValues, values: { values:["value1"]) }, null_mask: None }, { name:"tag2", logical_column_type: Tag, values_type: StringValues, values: { values:["value2"]) }, null_mask: None }, { name:"time", logical_column_type: Time, values_type: I64Values, values: { values:[1556813561098000000]) }, null_mask: None }] }] }] } } } }]
数据在内存中就会形成如上格式保存,但要注意,内存中使用的 flatbuffer 格式保存,上面只是为了展示内容。
继续上节里的内容,结构被拼凑完成之后,就会调用write_sharded_entry
方法去进行实际写入工作:
futures_util::future::try_join_all( sharded_entries .into_iter() //对每个数据进行写入到shard .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)), ) .await?;
然后看是怎样写入到shard的,因为shard的写入还没有完成,所以只能关注单机的写入了。具体看代码:
async fn write_sharded_entry( &self, db_name: &str, db: &Db, shards: Arc<HashMap<u32, NodeGroup>>, sharded_entry: ShardedEntry, ) -> Result<()> { //判断shard的id是否为null,如果是null就写入本地 //否则就写入到具体的shard去 match sharded_entry.shard_id { Some(shard_id) => { let node_group = shards.get(&shard_id).context(ShardNotFound { shard_id })?; //还没有真正的实现,可以看下面的方法 self.write_entry_downstream(db_name, node_group, &sharded_entry.entry) .await? } None => self.write_entry_local(&db, sharded_entry.entry).await?, } Ok(()) } //可以看到还没有实现远程的写入 async fn write_entry_downstream( &self, db_name: &str, node_group: &[WriterId], _entry: &Entry, ) -> Result<()> { todo!( "perform API call of sharded entry {} to one of the nodes {:?}", db_name, node_group ) } //数据对本地写入 pub async fn write_entry_local(&self, db: &Db, entry: Entry) -> Result<()> { //继续往下跟踪 db.store_entry(entry).map_err(|e| match e { db::Error::HardLimitReached {} => Error::HardLimitReached {}, _ => Error::UnknownDatabaseError { source: Box::new(e), }, })?; Ok(()) } //方法似乎什么都没做,只是增补了clock_value和write_id //注释上解释到logical clock是一个用来在数据库内部把entry变为有序的字段 pub fn store_entry(&self, entry: Entry) -> Result<()> { //生成一个新的结构SequencedEntry并增补字段 let sequenced_entry = SequencedEntry::new_from_entry_bytes( ClockValue::new(self.next_sequence()), self.server_id.get(), entry.data(), ).context(SequencedEntryError)?; //关于读缓存相关的配置和实现,先不用管 if self.rules.read().wal_buffer_config.is_some() { todo!("route to the Write Buffer. TODO: carols10cents #1157") } //继续调用其他方法 self.store_sequenced_entry(sequenced_entry) }
上面的所有方法完成之后,基本的插入数据格式就准备完成了,接下来就是写入内存存储:
pub fn store_sequenced_entry(&self, sequenced_entry: SequencedEntry) -> Result<()> { //读取出数据库对于写入相关的配置信息 //包括是否可写、是否超过内存限制等等验证 let rules = self.rules.read(); let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold; if rules.lifecycle_rules.immutable { return DatabaseNotWriteable {}.fail(); } if let Some(hard_limit) = rules.lifecycle_rules.buffer_size_hard { if self.memory_registries.bytes() > hard_limit.get() { return HardLimitReached {}.fail(); } } //rust语言中的释放变量 std::mem::drop(rules); //因为是批量写入,所以需要循环 //partition_writes的数据格式可以参见上面的json数据 if let Some(partitioned_writes) = sequenced_entry.partition_writes() { for write in partitioned_writes { let partition_key = write.key(); //根据之前生成的partition_key来得到或者创建一个partition描述 let partition = self.catalog.get_or_create_partition(partition_key); //这里是拿到一个写锁 let mut partition = partition.write(); //更新这个partition最后的插入时间 //记录这个的目的,代码上并没写明白是做什么用的 partition.update_last_write_at(); //找到一个打开的chunk //不知道为什么每次都要在所有chunk里搜索一次 //难道是同时可能有很多个chunk都可以写入? let chunk = partition.open_chunk().unwrap_or_else(|| { //否则就创建一个新的chunk出来 partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref()) }); //获取一个写锁 let mut chunk = chunk.write(); //更新当前chunk的第一条、最后一条写入记录 chunk.record_write(); //得到chunk的内存区域,称为mutable_buffer let chunk_id = chunk.id(); let mb_chunk = chunk.mutable_buffer().expect("cannot mutate open chunk"); //真正的写入到内存中 mb_chunk .write_table_batches( sequenced_entry.clock_value(), sequenced_entry.writer_id(), &write.table_batches(), ) .context(WriteEntry { partition_key, chunk_id, })?; //如果当前chunk写入数据的大小超过了设置的限制,就关闭 //关闭的意思就是把状态制为Closing,并更新关闭时间 let size = mb_chunk.size(); if let Some(threshold) = mutable_size_threshold { if size > threshold.get() { chunk.set_closing().expect("cannot close open chunk") } } } } Ok(()) }
再深入的就不继续跟踪了,但是思路还是比较清晰了。
1.分区相关
client --> grpc --> 进行分区shard --> 分区partition
2.写入相关
- 结构封装
LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry
- 内存写入空间
catalog -> partition -> table -> column
-
达到指定大小后标记为关闭
-
异步 - 后台线程进行内存整理
到这里基本就完成了所有的写入,并返回给客户端成功。
关于后台线程的内存整理再下一篇中继续介绍。
祝玩儿的开心
欢迎关注微信公众号:
或添加微信好友: liutaohua001

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
基于 Flutter 的 Web 渲染引擎「北海」正式开源
基于 Flutter 的 Web 渲染引擎「北海」正式开源 阿里巴巴历时 3 年自研开发的 Web 渲染引擎 北海(英文名:Kraken)正式开源,致力打造易扩展,跨平台,高性能的渲染引擎,并已在优酷、大麦、天猫等业务场景中使用。 官网:https://openkraken.com Github:https://github.com/openkraken/kraken 背景 互联网业务如火如荼地发展离不开跨平台技术,而最成熟的跨平台技术就是大家熟悉的浏览器了,它与生俱来的跨平台能力、开放的标准以及强大的生态使它成为炙手可热的容器之一。而由于其本身不是为了性能而设计的,并且历史包袱重、兼容性、厂商更新慢等问题,浏览器在移动端的表现并不突出。尽管网络以及硬件的发展带来了足够多的性能红利,但是日益复杂的业务总能把已有的性能吃透。 过去也有很多对跨平台方案的探索与实践,新的技术方案也随着历史的浪潮不断地发展。从最早的 H5 方案到 Hybrid 方案,以及后来的 Weex/React Native 方案,到现在如火如荼的 Flutter。 Flutter 由于其精简的渲染管线,高效的布局渲染能...
- 下一篇
酷瓜云课堂 v1.3.2 发布,开源网课系统,可免费商用
更新内容 前台增文章和章节评论功能 后台增加评论相关管理功能 优化课程,章节,文章等前台界面 优化分享链接的生成和跳转方式 优化课程,章节,文章相关js 优化后台数据展示 修正后台分类二级分类错位问题 修正文章命名空间问题 修正后台轮播没有保存问题 项目介绍 酷瓜云课堂,依托腾讯云基础服务架构,采用C扩展框架Phalcon开发,GPL-2.0开源协议,致力开源网课系统,开源网校系统,开源在线教育系统。 系统功能 实现了点播、直播、专栏、面授、会员、群组、积分、秒杀等,100%真开源在线教育解决方案,可以免费商用。 托管仓库 gitee仓库 github仓库 意见反馈 在线反馈(推荐) 官方论坛(推荐) 开源助力 毫无保留的真开源不容易,如果对你有帮助,请给我们 STAR !!!
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS关闭SELinux安全模块
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS7设置SWAP分区,小内存服务器的救世主