首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习七(Chunk的生命周期)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。 上一章介绍了数据从客户端写入到服务器端的内存中的整个过程。详情见: https://my.oschina.net/u/3374539/blog/5027429 这一章记录一下数据库中数据管理单元Chunk的生命周期。 在开篇,先介绍一下一个Chunk拥有的生命周期: //这里需要注意,这些变体里的Chunk结构都是不相同的 //也就是有内存数据拷贝的工作 pub enum ChunkState { //内部移动数据时候用的 Invalid, //可以写入 Open(MBChunk), //还能继续写入,但很快会被关闭 Closing(MBChunk), //已经不能写入了,准备移动到readbuffer Moving(Arc<MBChunk>), //已经被移动到了read buffer Moved(Arc<ReadBufferChunk>), //准备写入持久化存储 WritingToObjectStore(Arc<ReadBufferChunk>), //写入持久化存储完成 WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>), } 在第五章中有提到,在Create Database之后,会启动一个后台线程。 该后台线程完成了部分对Chunk的管理功能,通过理解这个后台线程,能够基本理解Chunk的所有生命周期。 //后台线程的方法入口,在创建完成数据库后,就会调用到这个方法 pub async fn background_worker( self: &Arc<Self>, shutdown: tokio_util::sync::CancellationToken, ) { //创建一个定时器,周期性的执行 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self)); //没有收到停止服务器时候的信号就一直执行,1秒一次 while !shutdown.is_cancelled() { //记录执行的次数,每次加1,Ordering::Relaxed代表的单线程里的原子操作 self.worker_iterations.fetch_add(1, Ordering::Relaxed); //进入生命周期的管理 lifecycle_manager.check_for_work(); //收到不同信号之后的处理方法 tokio::select! { _ = interval.tick() => {}, _ = shutdown.cancelled() => break } } info!("finished background worker"); } 前方高能,请注意: fn check_for_work(&mut self, now: DateTime<Utc>) { //获取创建数据库的时候,对于Chunk的相关配置 let rules = self.rules(); //根据配置的排序规则,获取出内存里所有的chunk let chunks = self.chunks(&rules.sort_order); let mut buffer_size = 0; //判断是不是有其他的任务正在执行,move我理解针对于read buffer,write对于持久化 let mut move_active = self.is_move_active(); let mut write_active = self.is_write_active(); //遍历所有块,检查哪些块可以被持久化 for chunk in &chunks { //获取当前chunk的锁 let chunk_guard = chunk.upgradable_read(); //获取chunk占用的内存大小 buffer_size += Self::chunk_size(&*chunk_guard); //没有移动任务并且Chunk里最后的写入时间比较老 let would_move = !move_active && can_move(&rules, &*chunk_guard, now); //没有写出任务,并且开启了持久化 let would_write = !write_active && rules.persist; //判断chunk的生命周期 match chunk_guard.state() { //属于open状态,并且是需要移动的(上面的逻辑里有展示什么是需要移动的) //这里我理解就是相当于实时写入时候的一个补充方案 //试想,如果一个chunk一直不写入数据,可能有一年了,查询都不再用这些数据了,内存却被一直占用 ChunkState::Open(_) if would_move => { let mut chunk_guard = RwLockUpgradableReadGuard::upgrade(chunk_guard); //切换状态到closing chunk_guard.set_closing().expect("cannot close open chunk"); let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); std::mem::drop(chunk_guard); move_active = true; //移动到read_buffer,变为不可写入状态(启动了一个异步的线程,后面看) self.move_to_read_buffer(partition_key, chunk_id); } //这里有几种情况,同样会在别处触发为closing //例如:chunk大小超过了设置的可变内存大小的时候 ChunkState::Closing(_) if would_move => { let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); std::mem::drop(chunk_guard); move_active = true; //移动到read_buffer self.move_to_read_buffer(partition_key, chunk_id); } //已经被挪动到readbuffer中的 ChunkState::Moved(_) if would_write => { let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); std::mem::drop(chunk_guard); write_active = true; //写入到对象存储 self.write_to_object_store(partition_key, chunk_id); } _ => {} } } //这里是主要检查内存限制的逻辑,当所有chunk的大小超过限制的时候就要清理Chunk if let Some(soft_limit) = rules.buffer_size_soft { let mut chunks = chunks.iter(); while buffer_size > soft_limit.get() { match chunks.next() { Some(chunk) => { //获取读锁 let chunk_guard = chunk.read(); //如果配置了可以清理未持久化数据,那么处在read_buffer里的数据也会被清理 //一定会清理已经被持久化到对象存储上的数据 if (rules.drop_non_persisted && matches!(chunk_guard.state(), ChunkState::Moved(_))) || matches!(chunk_guard.state(), ChunkState::WrittenToObjectStore(_, _)) { let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); buffer_size = buffer_size.saturating_sub(Self::chunk_size(&*chunk_guard)); std::mem::drop(chunk_guard); //真真正正的删除逻辑后面看 self.drop_chunk(partition_key, chunk_id) } } //没有什么可以释放的了 None => { warn!(db_name=self.db_name(), soft_limit, buffer_size, "soft limited exceeded, but no chunks found that can be evicted. Check lifecycle rules"); break; } } } } } 这里基本看清楚了Chunk的周期: 在写入时候,如果没有Chunk就会open一个,并处在open状态。 如果写入超过了一些限制,就会被标记为closing;如果数据时间超过了配置的时间,也会被标记为closing。标记为closing的会添加一个后台进程,准备将Chunk移动到read_buffer中。 后台任务启动后,会标记为moving状态,此时禁止Chunk再写入任何数据。 一旦移动完成,会被标记为moved。 程序会对moved状态下的Chunk开始进行持久化。 扫描任务会不断判断内存使用是否超过了限制,如果超过限制,会清理已经持久化的Chunk。如果配置了drop_non_persisted,会把read_buffer中未持久化的也删除掉。 然后继续看程序是怎样将一个chunk移动到read_buffer的,因为篇幅的影响,将会在下一篇介绍数据是怎样真正写入到持久化存储当中的。 pub async fn load_chunk_to_read_buffer( &self, partition_key: &str, chunk_id: u32, ) -> Result<Arc<DbChunk>> { //根据partition_key及chunk_id获取内存中存储的Chunk let chunk = { let partition = self .catalog .valid_partition(partition_key) .context(LoadingChunk { partition_key, chunk_id, })?; let partition = partition.read(); partition.chunk(chunk_id).context(LoadingChunk { partition_key, chunk_id, })? }; //设置当前的Chunk为Moving状态 let mb_chunk = { let mut chunk = chunk.write(); chunk.set_moving().context(LoadingChunk { partition_key, chunk_id, })? }; info!(%partition_key, %chunk_id, "chunk marked MOVING, loading tables into read buffer"); let mut batches = Vec::new(); //这里是拿到Chunk中每个Cloumn的统计信息,分别是min,max,count let table_stats = mb_chunk.table_summaries(); //从新创建一个ReadBufferChunk,后面准备把所有数据都拷贝到这里 //还需要告诉内存管理这里新申请了多少空间 let rb_chunk = ReadBufferChunk::new_with_memory_tracker(chunk_id, &self.memory_registries.read_buffer); for stats in table_stats { //把内存中的数据,全部重新拷贝一次,转换为arrow格式 mb_chunk .table_to_arrow(&mut batches, &stats.name, Selection::All) //这里应该是还没有写完,如果出现错误,这个Chunk该怎么处理? .expect("Loading chunk to mutable buffer"); //循环拷贝 for batch in batches.drain(..) { rb_chunk.upsert_table(&stats.name, batch) } } let mut chunk = chunk.write(); //更新写入缓存里的Chunk为Moved状态,同时Chunk内容修改为了ReadBuffer的Chunk //对于Chunk的结构后面看 chunk.set_moved(Arc::new(rb_chunk)).context(LoadingChunk { partition_key, chunk_id, })?; //工作全部都完成了,调用做快照的方法,方法里什么都没做,返回新Chunk的一个Arc指针 Ok(DbChunk::snapshot(&chunk)) } 到这里基本清楚了整个Chunk的工作方式,因为Chunk这个名字被代码中重复使用到了,所以特意在文章末尾说一下都有什么Chunk。 //主要是存储一个数据块的描述信息,名字、最后写入时间等 Server::db::catalog::chunk //数据从客户端直接写入的内存块 mutable_buffer::chunk //在moving时候拷贝的新数据块,arrow结构 read_buffer::chunk //parquet对应的chunk parquet_file::chunk //query模块下对PartitionChunk重新命名了一下 //对于相同的partition key的数据抽象的行为 query -> type Chunk: PartitionChunk; //实现PartitionChunk定义的方法,对不同位置下的chunk的操作 //如ParquetFile、MutableBuffer等 server::db::chunk 好了就到这里,希望你也学到了很多 祝玩儿的开心 欢迎关注微信公众号: 或添加微信好友: liutaohua001

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习六-1(数据写入之分区)

欢迎关注公众号: 上一章说到如何创建一个数据库,并且数据库的描述信息是如何保存的。详情见:https://my.oschina.net/u/3374539/blog/5025128 这一章记录一下,数据是如何写入并保存的,具体会分为两篇来写: 一篇介绍分区是如何完成的 一篇介绍具体的写入 说到数据写入,必然是需要能够连接到服务器。IOx项目为提供了多种方式可以于服务器进行交互,分别是Grpc和Http基于这两种通信方式,又扩展支持了influxdb2_client以及influxdb_iox_client。 基于influxdb_iox_client我写了一个数据写入及查询的示例来观测接口是如何组织的,代码如下: #[tokio::main] async fn main() { { let connection = Builder::default() .build("http://127.0.0.1:8081") .await .unwrap(); write::Client::new(connection) .write("a", r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#) .await .expect("failed to write data"); } let connection = Builder::default() .build("http://127.0.0.1:8081") .await .unwrap(); let mut query = flight::Client::new(connection) .perform_query("a", "select * from myMeasurement") .await .expect("query request should work"); let mut batches = vec![]; while let Some(data) = query.next().await.expect("valid batches") { batches.push(data); } let format1 = format::QueryOutputFormat::Pretty; println!("{}", format1.format(&batches).unwrap()); } +------------+--------+--------+-------------------------+ | fieldKey | tag1 | tag2 | time | +------------+--------+--------+-------------------------+ | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | | fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 | | fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 | | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | +------------+--------+--------+-------------------------+ 因为我多运行了几次,所以能看到数据被重复插入了。 这里还需要说一下的是写入的语句格式可以参见: [LineProtocol] https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format write::Client中的write方法生成了一个WriteRequest结构,并使用RPC调用远程的write方法。打开src/influxdb_ioxd/rpc/write.rs : 22行可以看到方法的具体实现。 async fn write( &self, request: tonic::Request<WriteRequest>, ) -> Result<tonic::Response<WriteResponse>, tonic::Status> { let request = request.into_inner(); //得到上面在客户端中写入的数据库名字,在上面的例子中传入的"a" let db_name = request.db_name; //这里得到了写入的LineProtocol let lp_data = request.lp_data; let lp_chars = lp_data.len(); //解析LineProtocol的内容 //示例中的lp会被解析为: //measurement: "myMeasurement" //tag_set: [("tag1", "value1"), ("tag2", "value2")] //field_set: [("fieldKey", "123")] //timestamp: 1556813561098000000 let lines = parse_lines(&lp_data) .collect::<Result<Vec<_>, influxdb_line_protocol::Error>>() .map_err(|e| FieldViolation { field: "lp_data".into(), description: format!("Invalid Line Protocol: {}", e), })?; let lp_line_count = lines.len(); debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database"); //对数据进行保存 self.server .write_lines(&db_name, &lines) .await .map_err(default_server_error_handler)?; //返回成功 let lines_written = lp_line_count as u64; Ok(Response::new(WriteResponse { lines_written })) } 继续看self.server.write_lines的执行: pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> { self.require_id()?; //验证一下名字,然后拿到之前创建数据库时候在内存中存储的相关信息 let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?; let db = self .config .db(&db_name) .context(DatabaseNotFound { db_name: &*db_name })?; //这里就开始执行分片相关的策略 let (sharded_entries, shards) = { //读取创建数据库时候配置的分片策略 let rules = db.rules.read(); let shard_config = &rules.shard_config; //根据数据和shard策略,把逐个数据对应的分区找到 //写入到一个List<分区标识,List<数据>>这样的结构中 //具体的结构信息后面看 let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules) .context(LineConversion)?; //再把所有分区的配置返回给调用者 let shards = shard_config .as_ref() .map(|cfg| Arc::clone(&cfg.shards)) .unwrap_or_default(); (sharded_entries, shards) }; //根据上面返回的集合进行map方法遍历,写到每个分区中 futures_util::future::try_join_all( sharded_entries .into_iter() .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)), ) .await?; Ok(()) } 这里描述了写入一条数据的主逻辑:数据写入的时候,先把数据划分到具体的分区里(使用List结构存储下所有的分区对应的数据),然后并行的进行数据写入 接下来看,数据是如何进行分区的: pub fn lines_to_sharded_entries( lines: &[ParsedLine<'_>], sharder: Option<&impl Sharder>, partitioner: &impl Partitioner, ) -> Result<Vec<ShardedEntry>> { let default_time = Utc::now(); let mut sharded_lines = BTreeMap::new(); //对所有要插入的数据进行遍历 for line in lines { //先找到符合哪个shard let shard_id = match &sharder { Some(s) => Some(s.shard(line).context(GeneratingShardId)?), None => None, }; //再判断属于哪个分区 let partition_key = partitioner .partition_key(line, &default_time) .context(GeneratingPartitionKey)?; let table = line.series.measurement.as_str(); //最后存储到一个map中 //shard-> partition -> table -> List<data> 的映射关系 sharded_lines .entry(shard_id) .or_insert_with(BTreeMap::new) .entry(partition_key) .or_insert_with(BTreeMap::new) .entry(table) .or_insert_with(Vec::new) .push(line); } let default_time = Utc::now(); //最后遍历这个map 转换到之前提到的List结构中 let sharded_entries = sharded_lines .into_iter() .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time)) .collect::<Result<Vec<_>>>()?; Ok(sharded_entries) } 这里理解shard的概念就是一个或者一组机器,称为一个shard,他们负责真正的存储数据。 partition理解为一个个文件夹,在shard上具体的存储路径。 这里看一下是怎样完成shard的划分的: impl Sharder for ShardConfig { fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> { if let Some(specific_targets) = &self.specific_targets { //如果对数据进行匹配,如果符合规则就返回,可以采用当前的shard //官方的代码中只实现了根据表名进行shard的策略 //这个配置似乎只能通过grpc来进行设置,这样好处可能是将来有个什么管理界面能动态修改 if specific_targets.matcher.match_line(line) { return Ok(specific_targets.shard); } } //如果没有配置就使用hash的方式 //对整条数据进行hash,然后比较机器的hash,找到合适的节点 //如果没找到,就放在hashring的第一个节点 //hash算法见后面 if let Some(hash_ring) = &self.hash_ring { return hash_ring .shards .find(LineHasher { line, hash_ring }) .context(NoShardsDefined); } NoShardingRuleMatches { line: line.to_string(), } .fail() } } //具体的Hash算法,如果全配置的话分的就会特别散,几乎不同测点都放到了不同的地方 impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> { fn hash<H: Hasher>(&self, state: &mut H) { //如果配置了使用table名字就在hash中加入tablename if self.hash_ring.table_name { self.line.series.measurement.hash(state); } //然后按照配置的列的值进行hash for column in &self.hash_ring.columns { if let Some(tag_value) = self.line.tag_value(column) { tag_value.hash(state); } else if let Some(field_value) = self.line.field_value(column) { field_value.to_string().hash(state);t } state.write_u8(0); // column separator } } } 接下来看默认的partition分区方式: impl Partitioner for PartitionTemplate { fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> { let parts: Vec<_> = self .parts .iter() //匹配分区策略,或者是单一的,或者是复合的 //目前支持基于表、值、时间 //其余还会支持正则表达式和strftime模式 .map(|p| match p { TemplatePart::Table => line.series.measurement.to_string(), TemplatePart::Column(column) => match line.tag_value(&column) { Some(v) => format!("{}_{}", column, v), None => match line.field_value(&column) { Some(v) => format!("{}_{}", column, v), None => "".to_string(), }, }, TemplatePart::TimeFormat(format) => match line.timestamp { Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(), None => default_time.format(&format).to_string(), }, _ => unimplemented!(), }) .collect(); //最后返回一个组合文件名,或者是 a-b-c 或者是一个单一的值 Ok(parts.join("-")) } } 到这里分区的工作就完成了,下一篇继续分析是怎样写入的。 祝玩儿的开心

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册