时序数据库Influx-IOx源码学习六-1(数据写入之分区)
欢迎关注公众号: 上一章说到如何创建一个数据库,并且数据库的描述信息是如何保存的。详情见:https://my.oschina.net/u/3374539/blog/5025128 这一章记录一下,数据是如何写入并保存的,具体会分为两篇来写: 一篇介绍分区是如何完成的 一篇介绍具体的写入 说到数据写入,必然是需要能够连接到服务器。IOx项目为提供了多种方式可以于服务器进行交互,分别是Grpc和Http基于这两种通信方式,又扩展支持了influxdb2_client以及influxdb_iox_client。 基于influxdb_iox_client我写了一个数据写入及查询的示例来观测接口是如何组织的,代码如下: #[tokio::main] async fn main() { { let connection = Builder::default() .build("http://127.0.0.1:8081") .await .unwrap(); write::Client::new(connection) .write("a", r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#) .await .expect("failed to write data"); } let connection = Builder::default() .build("http://127.0.0.1:8081") .await .unwrap(); let mut query = flight::Client::new(connection) .perform_query("a", "select * from myMeasurement") .await .expect("query request should work"); let mut batches = vec![]; while let Some(data) = query.next().await.expect("valid batches") { batches.push(data); } let format1 = format::QueryOutputFormat::Pretty; println!("{}", format1.format(&batches).unwrap()); } +------------+--------+--------+-------------------------+ | fieldKey | tag1 | tag2 | time | +------------+--------+--------+-------------------------+ | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | | fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 | | fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 | | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | +------------+--------+--------+-------------------------+ 因为我多运行了几次,所以能看到数据被重复插入了。 这里还需要说一下的是写入的语句格式可以参见: [LineProtocol] https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format write::Client中的write方法生成了一个WriteRequest结构,并使用RPC调用远程的write方法。打开src/influxdb_ioxd/rpc/write.rs : 22行可以看到方法的具体实现。 async fn write( &self, request: tonic::Request<WriteRequest>, ) -> Result<tonic::Response<WriteResponse>, tonic::Status> { let request = request.into_inner(); //得到上面在客户端中写入的数据库名字,在上面的例子中传入的"a" let db_name = request.db_name; //这里得到了写入的LineProtocol let lp_data = request.lp_data; let lp_chars = lp_data.len(); //解析LineProtocol的内容 //示例中的lp会被解析为: //measurement: "myMeasurement" //tag_set: [("tag1", "value1"), ("tag2", "value2")] //field_set: [("fieldKey", "123")] //timestamp: 1556813561098000000 let lines = parse_lines(&lp_data) .collect::<Result<Vec<_>, influxdb_line_protocol::Error>>() .map_err(|e| FieldViolation { field: "lp_data".into(), description: format!("Invalid Line Protocol: {}", e), })?; let lp_line_count = lines.len(); debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database"); //对数据进行保存 self.server .write_lines(&db_name, &lines) .await .map_err(default_server_error_handler)?; //返回成功 let lines_written = lp_line_count as u64; Ok(Response::new(WriteResponse { lines_written })) } 继续看self.server.write_lines的执行: pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> { self.require_id()?; //验证一下名字,然后拿到之前创建数据库时候在内存中存储的相关信息 let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?; let db = self .config .db(&db_name) .context(DatabaseNotFound { db_name: &*db_name })?; //这里就开始执行分片相关的策略 let (sharded_entries, shards) = { //读取创建数据库时候配置的分片策略 let rules = db.rules.read(); let shard_config = &rules.shard_config; //根据数据和shard策略,把逐个数据对应的分区找到 //写入到一个List<分区标识,List<数据>>这样的结构中 //具体的结构信息后面看 let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules) .context(LineConversion)?; //再把所有分区的配置返回给调用者 let shards = shard_config .as_ref() .map(|cfg| Arc::clone(&cfg.shards)) .unwrap_or_default(); (sharded_entries, shards) }; //根据上面返回的集合进行map方法遍历,写到每个分区中 futures_util::future::try_join_all( sharded_entries .into_iter() .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)), ) .await?; Ok(()) } 这里描述了写入一条数据的主逻辑:数据写入的时候,先把数据划分到具体的分区里(使用List结构存储下所有的分区对应的数据),然后并行的进行数据写入 接下来看,数据是如何进行分区的: pub fn lines_to_sharded_entries( lines: &[ParsedLine<'_>], sharder: Option<&impl Sharder>, partitioner: &impl Partitioner, ) -> Result<Vec<ShardedEntry>> { let default_time = Utc::now(); let mut sharded_lines = BTreeMap::new(); //对所有要插入的数据进行遍历 for line in lines { //先找到符合哪个shard let shard_id = match &sharder { Some(s) => Some(s.shard(line).context(GeneratingShardId)?), None => None, }; //再判断属于哪个分区 let partition_key = partitioner .partition_key(line, &default_time) .context(GeneratingPartitionKey)?; let table = line.series.measurement.as_str(); //最后存储到一个map中 //shard-> partition -> table -> List<data> 的映射关系 sharded_lines .entry(shard_id) .or_insert_with(BTreeMap::new) .entry(partition_key) .or_insert_with(BTreeMap::new) .entry(table) .or_insert_with(Vec::new) .push(line); } let default_time = Utc::now(); //最后遍历这个map 转换到之前提到的List结构中 let sharded_entries = sharded_lines .into_iter() .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time)) .collect::<Result<Vec<_>>>()?; Ok(sharded_entries) } 这里理解shard的概念就是一个或者一组机器,称为一个shard,他们负责真正的存储数据。 partition理解为一个个文件夹,在shard上具体的存储路径。 这里看一下是怎样完成shard的划分的: impl Sharder for ShardConfig { fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> { if let Some(specific_targets) = &self.specific_targets { //如果对数据进行匹配,如果符合规则就返回,可以采用当前的shard //官方的代码中只实现了根据表名进行shard的策略 //这个配置似乎只能通过grpc来进行设置,这样好处可能是将来有个什么管理界面能动态修改 if specific_targets.matcher.match_line(line) { return Ok(specific_targets.shard); } } //如果没有配置就使用hash的方式 //对整条数据进行hash,然后比较机器的hash,找到合适的节点 //如果没找到,就放在hashring的第一个节点 //hash算法见后面 if let Some(hash_ring) = &self.hash_ring { return hash_ring .shards .find(LineHasher { line, hash_ring }) .context(NoShardsDefined); } NoShardingRuleMatches { line: line.to_string(), } .fail() } } //具体的Hash算法,如果全配置的话分的就会特别散,几乎不同测点都放到了不同的地方 impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> { fn hash<H: Hasher>(&self, state: &mut H) { //如果配置了使用table名字就在hash中加入tablename if self.hash_ring.table_name { self.line.series.measurement.hash(state); } //然后按照配置的列的值进行hash for column in &self.hash_ring.columns { if let Some(tag_value) = self.line.tag_value(column) { tag_value.hash(state); } else if let Some(field_value) = self.line.field_value(column) { field_value.to_string().hash(state);t } state.write_u8(0); // column separator } } } 接下来看默认的partition分区方式: impl Partitioner for PartitionTemplate { fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> { let parts: Vec<_> = self .parts .iter() //匹配分区策略,或者是单一的,或者是复合的 //目前支持基于表、值、时间 //其余还会支持正则表达式和strftime模式 .map(|p| match p { TemplatePart::Table => line.series.measurement.to_string(), TemplatePart::Column(column) => match line.tag_value(&column) { Some(v) => format!("{}_{}", column, v), None => match line.field_value(&column) { Some(v) => format!("{}_{}", column, v), None => "".to_string(), }, }, TemplatePart::TimeFormat(format) => match line.timestamp { Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(), None => default_time.format(&format).to_string(), }, _ => unimplemented!(), }) .collect(); //最后返回一个组合文件名,或者是 a-b-c 或者是一个单一的值 Ok(parts.join("-")) } } 到这里分区的工作就完成了,下一篇继续分析是怎样写入的。 祝玩儿的开心