您现在的位置是:首页 > 文章详情

时序数据库Influx-IOx源码学习六-2(数据写入)

日期:2021-04-21点击:442

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。

InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。

接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一章说到数据写入时的分区机制及分区现有的功能。详情见: https://my.oschina.net/u/3374539/blog/5026139

这一章记录一下数据是怎样进行存储的。


上一章没有细节的介绍数据从Line protocol被解析成了什么样子,在开篇先介绍一下数据被封装后的展示。

转换过程的代码可以参见internal_types/src/entry.rs : 157行中的build_table_write_batch方法;

内部数据结构可以查看: generated_types/protos/influxdata/write/v1/entry.fbs

数据是被层层加码组装出来的:

LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry

sharded_entries:[{ shard_id: None, entry: { fb: { operation_type: write, operation: { partition_writes:[{ key:"2019-05-02 16:00:00", table_batches:[ { name:"myMeasurement", columns:[ { name:"fieldKey", logical_column_type: Field, values_type: StringValues, values: { values:["123"] }, null_mask: None }, { name:"tag1", logical_column_type: Tag, values_type: StringValues, values: { values:["value1"]) }, null_mask: None }, { name:"tag2", logical_column_type: Tag, values_type: StringValues, values: { values:["value2"]) }, null_mask: None }, { name:"time", logical_column_type: Time, values_type: I64Values, values: { values:[1556813561098000000]) }, null_mask: None }] }] }] } } } }] 

数据在内存中就会形成如上格式保存,但要注意,内存中使用的 flatbuffer 格式保存,上面只是为了展示内容

继续上节里的内容,结构被拼凑完成之后,就会调用write_sharded_entry方法去进行实际写入工作:

futures_util::future::try_join_all( sharded_entries .into_iter() //对每个数据进行写入到shard .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)), ) .await?; 

然后看是怎样写入到shard的,因为shard的写入还没有完成,所以只能关注单机的写入了。具体看代码:

 async fn write_sharded_entry( &self, db_name: &str, db: &Db, shards: Arc<HashMap<u32, NodeGroup>>, sharded_entry: ShardedEntry, ) -> Result<()> { //判断shard的id是否为null,如果是null就写入本地 //否则就写入到具体的shard去 match sharded_entry.shard_id { Some(shard_id) => { let node_group = shards.get(&shard_id).context(ShardNotFound { shard_id })?; //还没有真正的实现,可以看下面的方法 self.write_entry_downstream(db_name, node_group, &sharded_entry.entry) .await? } None => self.write_entry_local(&db, sharded_entry.entry).await?, } Ok(()) } //可以看到还没有实现远程的写入 async fn write_entry_downstream( &self, db_name: &str, node_group: &[WriterId], _entry: &Entry, ) -> Result<()> { todo!( "perform API call of sharded entry {} to one of the nodes {:?}", db_name, node_group ) } //数据对本地写入 pub async fn write_entry_local(&self, db: &Db, entry: Entry) -> Result<()> { //继续往下跟踪 db.store_entry(entry).map_err(|e| match e { db::Error::HardLimitReached {} => Error::HardLimitReached {}, _ => Error::UnknownDatabaseError { source: Box::new(e), }, })?; Ok(()) } //方法似乎什么都没做,只是增补了clock_value和write_id //注释上解释到logical clock是一个用来在数据库内部把entry变为有序的字段 pub fn store_entry(&self, entry: Entry) -> Result<()> { //生成一个新的结构SequencedEntry并增补字段 let sequenced_entry = SequencedEntry::new_from_entry_bytes( ClockValue::new(self.next_sequence()), self.server_id.get(), entry.data(), ).context(SequencedEntryError)?; //关于读缓存相关的配置和实现,先不用管 if self.rules.read().wal_buffer_config.is_some() { todo!("route to the Write Buffer. TODO: carols10cents #1157") } //继续调用其他方法 self.store_sequenced_entry(sequenced_entry) } 

上面的所有方法完成之后,基本的插入数据格式就准备完成了,接下来就是写入内存存储:

pub fn store_sequenced_entry(&self, sequenced_entry: SequencedEntry) -> Result<()> { //读取出数据库对于写入相关的配置信息 //包括是否可写、是否超过内存限制等等验证 let rules = self.rules.read(); let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold; if rules.lifecycle_rules.immutable { return DatabaseNotWriteable {}.fail(); } if let Some(hard_limit) = rules.lifecycle_rules.buffer_size_hard { if self.memory_registries.bytes() > hard_limit.get() { return HardLimitReached {}.fail(); } } //rust语言中的释放变量 std::mem::drop(rules); //因为是批量写入,所以需要循环 //partition_writes的数据格式可以参见上面的json数据 if let Some(partitioned_writes) = sequenced_entry.partition_writes() { for write in partitioned_writes { let partition_key = write.key(); //根据之前生成的partition_key来得到或者创建一个partition描述 let partition = self.catalog.get_or_create_partition(partition_key); //这里是拿到一个写锁 let mut partition = partition.write(); //更新这个partition最后的插入时间 //记录这个的目的,代码上并没写明白是做什么用的 partition.update_last_write_at(); //找到一个打开的chunk //不知道为什么每次都要在所有chunk里搜索一次 //难道是同时可能有很多个chunk都可以写入? let chunk = partition.open_chunk().unwrap_or_else(|| { //否则就创建一个新的chunk出来 partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref()) }); //获取一个写锁 let mut chunk = chunk.write(); //更新当前chunk的第一条、最后一条写入记录 chunk.record_write(); //得到chunk的内存区域,称为mutable_buffer let chunk_id = chunk.id(); let mb_chunk = chunk.mutable_buffer().expect("cannot mutate open chunk"); //真正的写入到内存中 mb_chunk .write_table_batches( sequenced_entry.clock_value(), sequenced_entry.writer_id(), &write.table_batches(), ) .context(WriteEntry { partition_key, chunk_id, })?; //如果当前chunk写入数据的大小超过了设置的限制,就关闭 //关闭的意思就是把状态制为Closing,并更新关闭时间 let size = mb_chunk.size(); if let Some(threshold) = mutable_size_threshold { if size > threshold.get() { chunk.set_closing().expect("cannot close open chunk") } } } } Ok(()) } 

再深入的就不继续跟踪了,但是思路还是比较清晰了。

1.分区相关

client --> grpc --> 进行分区shard --> 分区partition

2.写入相关

  • 结构封装

LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry

  • 内存写入空间

catalog -> partition -> table -> column

  • 达到指定大小后标记为关闭

  • 异步 - 后台线程进行内存整理


到这里基本就完成了所有的写入,并返回给客户端成功。

关于后台线程的内存整理再下一篇中继续介绍。

祝玩儿的开心


欢迎关注微信公众号:

或添加微信好友: liutaohua001

原文链接:https://my.oschina.net/u/3374539/blog/5027429
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章