首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习八(Chunk持久化)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。 上一章介绍了Chunk是怎样被管理的,以及各个阶段的操作。详情见: https://my.oschina.net/u/3374539/blog/5029926 这一章记录一下Chunk是怎样持久化的。 ChunkState::Moved(_) if would_write => { let partition_key = chunk_guard.key().to_string(); let table_name = chunk_guard.table_name().to_string(); let chunk_id = chunk_guard.id(); std::mem::drop(chunk_guard); write_active = true; //处于Moved状态下的Chunk会调用write_to_object_store方法进行持久化 self.write_to_object_store(partition_key, table_name, chunk_id); } //write_to_object_store实际调用到write_chunk_to_object_store_in_background方法来进行持久化 pub fn write_chunk_to_object_store_in_background( self: &Arc<Self>, partition_key: String, table_name: String, chunk_id: u32, ) -> TaskTracker<Job> { //获取数据库名称 let name = self.rules.read().name.clone(); //新建一个后台任务的管理器,用来记录db中都在执行哪些任务及状态, let (tracker, registration) = self.jobs.register(Job::WriteChunk { db_name: name.to_string(), partition_key: partition_key.clone(), table_name: table_name.clone(), chunk_id, }); let captured = Arc::clone(&self); //异步写入 let task = async move { let result = captured //真正的写入方法 .write_chunk_to_object_store(&partition_key, &table_name, chunk_id) .await; if let Err(e) = result { info!(?e, %name, %partition_key, %chunk_id, "background task error loading object store chunk"); return Err(e); } Ok(()) }; tokio::spawn(task.track(registration)); tracker } 后面的方法有点儿长,希望能够耐心观看。。 pub async fn write_chunk_to_object_store( &self, partition_key: &str, table_name: &str, chunk_id: u32, ) -> Result<Arc<DbChunk>> { //从catalog中取回chunk let chunk = { //先找partition let partition = self.catalog .valid_partition(partition_key) .context(LoadingChunkToParquet { partition_key, table_name, chunk_id, })?; let partition = partition.read(); //从partition里根据表名和chunk_id拿到chunk partition .chunk(table_name, chunk_id) .context(LoadingChunkToParquet { partition_key, table_name, chunk_id, })? }; let rb_chunk = { //先加写锁 let mut chunk = chunk.write(); //修改Chunk的状态为WritingToObjectStore chunk .set_writing_to_object_store() .context(LoadingChunkToParquet { partition_key, table_name, chunk_id, })? }; //获取所有Chunk下所有表的Statistics信息 let table_stats = rb_chunk.table_summaries(); //创建一个parquet Chunk,这个在上一章里有提到各种Chunk类型 let mut parquet_chunk = Chunk::new( partition_key.to_string(), chunk_id, //用来统计parquet占用的内存 self.memory_registries.parquet.as_ref(), ); //创建一个Storage结构,使用的是启动数据库时候指定的存储类型,这个在第3章里有提到 let storage = Storage::new( Arc::clone(&self.store), self.server_id, self.rules.read().name.to_string(), ); //遍历所有表的统计数据 for stats in table_stats { //构建一个空的查询,也就是 select * from table,不加where let predicate = read_buffer::Predicate::default(); //从rb_chunk筛选数据, Selection::All代表所有列,predicate代表没有where条件 //意思就是 `stats` 指向的单个表内的所有数据 let read_results = rb_chunk .read_filter(stats.name.as_str(), predicate, Selection::All) .context(ReadBufferChunkError { table_name, chunk_id, })?; //再拿出来schema信息,因为arrow是分开存的,所以需要拿两次 let arrow_schema: ArrowSchemaRef = rb_chunk .read_filter_table_schema(stats.name.as_str(), Selection::All) .context(ReadBufferChunkSchemaError { table_name, chunk_id, })? .into(); //再拿出来这个表里的最大最小的时间 //这个是从readBuffer::Column::from里完成的最大最小时间统计 //也就是当从mutbuffer转移到readbuffer的时候 let time_range = rb_chunk.table_time_range(stats.name.as_str()).context( ReadBufferChunkTimestampError { table_name, chunk_id, }, )?; //创建一个ReadFilterResultsStream //官方文档里面说的是这是一个转变ReadFilterResults为异步流的适配器 let stream: SendableRecordBatchStream = Box::pin( streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)), ); // 写到持久化存储当中 let path = storage .write_to_object_store( partition_key.to_string(), chunk_id, stats.name.to_string(), stream, ) .await .context(WritingToObjectStore)?; // 这里就是把写入parquet的摘要信息存储在内存中 let schema = Arc::clone(&arrow_schema) .try_into() .context(SchemaConversion)?; let table_time_range = time_range.map(|(start, end)| TimestampRange::new(start, end)); parquet_chunk.add_table(stats, path, schema, table_time_range); } //对`catlog::chunk`加写锁,然后更新这个chunk的状态为WrittenToObjectStore let mut chunk = chunk.write(); let parquet_chunk = Arc::clone(&Arc::new(parquet_chunk)); chunk .set_written_to_object_store(parquet_chunk) .context(LoadingChunkToParquet { partition_key, table_name, chunk_id, })?; //包装`catlog::chunk`为`ParquetChunk` Ok(DbChunk::snapshot(&chunk)) } 这里面看起来有点儿绕,不容易理解的就是chunk.set_written_to_object_store这种方法。 因为Rust中enum是存在变种的,所以基于这种特性,虽然都是Chunk,但是存储的内容变化了。 pub enum ChunkState { ....省略 //这里就是mutbuffer里的chunk Moving(Arc<MBChunk>), //这里就变成存储的readbuffer的chunk结构 Moved(Arc<ReadBufferChunk>), //这里又开始存储ParquetChunk结构 WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>), } 还需要继续查看storage.write_to_object_store这个逻辑,这里涉及到了从mem的arrow结构转为Parquet结构,就不在文章中展示了,使用的是arrow的ArrowWriter直接转换的。 //这里直接跳跃到ObjectStore的put方法里,来看怎么组织的写入 async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()> where S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static, { use ObjectStoreIntegration::*; //匹配启动时候配置的存储方式,转到真正的实现去,这里只看文件的 match (&self.0, location) { ...省略 //文件存储 (File(file), path::Path::File(location)) => file .put(location, bytes, length) .await .context(FileObjectStoreError)?, _ => unreachable!(), } Ok(()) } //为File实现了ObjectStoreApi trait,相当于文件存储时候的实际实现 async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()> where S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static, { //读取之前ReadFilterResultsStream里的所有数据到content里 let content = bytes .map_ok(|b| bytes::BytesMut::from(&b[..])) .try_concat() .await .context(UnableToStreamDataIntoMemory)?; //这里就是一个验证长度否则报错DataDoesNotMatchLength。宏编程,不用关注 if let Some(length) = length { ensure!( content.len() == length, DataDoesNotMatchLength { actual: content.len(), expected: length, } ); } //获取文件路径,就是启动时候配置的根路径加上数据路径 let path = self.path(location); //创建这个文件出来 let mut file = match fs::File::create(&path).await { Ok(f) => f, //如果是没有找到父路径,那就从新创建一次 Err(err) if err.kind() == std::io::ErrorKind::NotFound => { let parent = path .parent() .context(UnableToCreateFile { path: &path, err })?; fs::create_dir_all(&parent) .await .context(UnableToCreateDir { path: parent })?; match fs::File::create(&path).await { Ok(f) => f, Err(err) => return UnableToCreateFile { path, err }.fail(), } } //否则就失败了 Err(err) => return UnableToCreateFile { path, err }.fail(), }; //这里就是拷贝所有数据到这个文件中去 tokio::io::copy(&mut &content[..], &mut file) .await .context(UnableToCopyDataToFile)?; //大功告成 Ok(()) } 这个写入的逻辑比较庞大了,但是基本也能捋清楚。 先写入mutBuffer,写到一定大小会关闭 异步线程来监控是不是该关掉mutBuffer 生命周期的转换,然后开始写入readBuffer 之后开始异步的写入持久化存储 检查内存是不是需要清理readbuffer 大概就这些。源代码中还有很多逻辑没有完成,比如WAL。先整体看完流程再回来看遗漏的,留给Influx写更多完整逻辑的时间。 祝玩儿的开心。 欢迎关注微信公众号: 或添加微信好友: liutaohua001

优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习六-2(数据写入)

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。 InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。 接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。 上一章说到数据写入时的分区机制及分区现有的功能。详情见: https://my.oschina.net/u/3374539/blog/5026139 这一章记录一下数据是怎样进行存储的。 上一章没有细节的介绍数据从Line protocol被解析成了什么样子,在开篇先介绍一下数据被封装后的展示。 转换过程的代码可以参见internal_types/src/entry.rs : 157行中的build_table_write_batch方法; 内部数据结构可以查看: generated_types/protos/influxdata/write/v1/entry.fbs。 数据是被层层加码组装出来的: LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry sharded_entries:[{ shard_id: None, entry: { fb: { operation_type: write, operation: { partition_writes:[{ key:"2019-05-02 16:00:00", table_batches:[ { name:"myMeasurement", columns:[ { name:"fieldKey", logical_column_type: Field, values_type: StringValues, values: { values:["123"] }, null_mask: None }, { name:"tag1", logical_column_type: Tag, values_type: StringValues, values: { values:["value1"]) }, null_mask: None }, { name:"tag2", logical_column_type: Tag, values_type: StringValues, values: { values:["value2"]) }, null_mask: None }, { name:"time", logical_column_type: Time, values_type: I64Values, values: { values:[1556813561098000000]) }, null_mask: None }] }] }] } } } }] 数据在内存中就会形成如上格式保存,但要注意,内存中使用的 flatbuffer 格式保存,上面只是为了展示内容。 继续上节里的内容,结构被拼凑完成之后,就会调用write_sharded_entry方法去进行实际写入工作: futures_util::future::try_join_all( sharded_entries .into_iter() //对每个数据进行写入到shard .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)), ) .await?; 然后看是怎样写入到shard的,因为shard的写入还没有完成,所以只能关注单机的写入了。具体看代码: async fn write_sharded_entry( &self, db_name: &str, db: &Db, shards: Arc<HashMap<u32, NodeGroup>>, sharded_entry: ShardedEntry, ) -> Result<()> { //判断shard的id是否为null,如果是null就写入本地 //否则就写入到具体的shard去 match sharded_entry.shard_id { Some(shard_id) => { let node_group = shards.get(&shard_id).context(ShardNotFound { shard_id })?; //还没有真正的实现,可以看下面的方法 self.write_entry_downstream(db_name, node_group, &sharded_entry.entry) .await? } None => self.write_entry_local(&db, sharded_entry.entry).await?, } Ok(()) } //可以看到还没有实现远程的写入 async fn write_entry_downstream( &self, db_name: &str, node_group: &[WriterId], _entry: &Entry, ) -> Result<()> { todo!( "perform API call of sharded entry {} to one of the nodes {:?}", db_name, node_group ) } //数据对本地写入 pub async fn write_entry_local(&self, db: &Db, entry: Entry) -> Result<()> { //继续往下跟踪 db.store_entry(entry).map_err(|e| match e { db::Error::HardLimitReached {} => Error::HardLimitReached {}, _ => Error::UnknownDatabaseError { source: Box::new(e), }, })?; Ok(()) } //方法似乎什么都没做,只是增补了clock_value和write_id //注释上解释到logical clock是一个用来在数据库内部把entry变为有序的字段 pub fn store_entry(&self, entry: Entry) -> Result<()> { //生成一个新的结构SequencedEntry并增补字段 let sequenced_entry = SequencedEntry::new_from_entry_bytes( ClockValue::new(self.next_sequence()), self.server_id.get(), entry.data(), ).context(SequencedEntryError)?; //关于读缓存相关的配置和实现,先不用管 if self.rules.read().wal_buffer_config.is_some() { todo!("route to the Write Buffer. TODO: carols10cents #1157") } //继续调用其他方法 self.store_sequenced_entry(sequenced_entry) } 上面的所有方法完成之后,基本的插入数据格式就准备完成了,接下来就是写入内存存储: pub fn store_sequenced_entry(&self, sequenced_entry: SequencedEntry) -> Result<()> { //读取出数据库对于写入相关的配置信息 //包括是否可写、是否超过内存限制等等验证 let rules = self.rules.read(); let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold; if rules.lifecycle_rules.immutable { return DatabaseNotWriteable {}.fail(); } if let Some(hard_limit) = rules.lifecycle_rules.buffer_size_hard { if self.memory_registries.bytes() > hard_limit.get() { return HardLimitReached {}.fail(); } } //rust语言中的释放变量 std::mem::drop(rules); //因为是批量写入,所以需要循环 //partition_writes的数据格式可以参见上面的json数据 if let Some(partitioned_writes) = sequenced_entry.partition_writes() { for write in partitioned_writes { let partition_key = write.key(); //根据之前生成的partition_key来得到或者创建一个partition描述 let partition = self.catalog.get_or_create_partition(partition_key); //这里是拿到一个写锁 let mut partition = partition.write(); //更新这个partition最后的插入时间 //记录这个的目的,代码上并没写明白是做什么用的 partition.update_last_write_at(); //找到一个打开的chunk //不知道为什么每次都要在所有chunk里搜索一次 //难道是同时可能有很多个chunk都可以写入? let chunk = partition.open_chunk().unwrap_or_else(|| { //否则就创建一个新的chunk出来 partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref()) }); //获取一个写锁 let mut chunk = chunk.write(); //更新当前chunk的第一条、最后一条写入记录 chunk.record_write(); //得到chunk的内存区域,称为mutable_buffer let chunk_id = chunk.id(); let mb_chunk = chunk.mutable_buffer().expect("cannot mutate open chunk"); //真正的写入到内存中 mb_chunk .write_table_batches( sequenced_entry.clock_value(), sequenced_entry.writer_id(), &write.table_batches(), ) .context(WriteEntry { partition_key, chunk_id, })?; //如果当前chunk写入数据的大小超过了设置的限制,就关闭 //关闭的意思就是把状态制为Closing,并更新关闭时间 let size = mb_chunk.size(); if let Some(threshold) = mutable_size_threshold { if size > threshold.get() { chunk.set_closing().expect("cannot close open chunk") } } } } Ok(()) } 再深入的就不继续跟踪了,但是思路还是比较清晰了。 1.分区相关 client --> grpc --> 进行分区shard --> 分区partition 2.写入相关 结构封装 LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry 内存写入空间 catalog -> partition -> table -> column 达到指定大小后标记为关闭 异步 - 后台线程进行内存整理 到这里基本就完成了所有的写入,并返回给客户端成功。 关于后台线程的内存整理再下一篇中继续介绍。 祝玩儿的开心 欢迎关注微信公众号: 或添加微信好友: liutaohua001

优秀的个人博客,低调大师

(二十七) 跟我学习SpringCloud-使用Hystrix实现容错处理

创建一个新的Maven项目 hystrix-feign-demo,增加 Hystrix 的依赖,代码如下所示。 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency> 在启动类上添加 @EnableHystrix 或者 @EnableCircuitBreaker。注意,@EnableHystrix 中包含了 @EnableCircuitBreaker。 然后编写一个调用接口的方法,在上面增加一个 @HystrixCommand 注解,用于指定依赖服务调用延迟或失败时调用的方法,代码如下所示。 @GetMapping("/callHello") @HystrixCommand(fallbackMethod = "defaultCallHello") public String callHello() { String result = restTemplate.getForObject("http://localhost:8088/house/hello", String.class); return result; } 当调用失败触发熔断时会用 defaultCallHello 方法来回退具体的内容,定义 default-CallHello 方法的代码如下所示。 public String defaultCallHello() { return "fail"; } 只要不启动 8088 端口所在的服务,调用 /callHello 接口,就可以看到返回的内容是“fail”,如图 1 所示。 将启动类上的 @EnableHystrix 去掉,重启服务,再次调用 /callHello 接口可以看到返回的是 500 错误信息,这个时候就没有用到回退功能了。 { code: 500, message: "I/O error on GET request for "http://localhost:8088/house/hello": Connection refused; nested exception is java.net.ConnectException: Connection refused ", data: null } 配置详解 HystrixCommand 中除了 fallbackMethod 还有很多的配置,下面我们来看看这些配置,如下表所示: HystrixCommand 配置详解 名称 说明 hystrix.command.default.execution.isolation .strategy 该配置用来指定隔离策略,具体策略有下面 2 种。 THREAD:线程隔离,在单独的线程上执行,并发请求受线程池大小的控制。 SEMAPHORE:信号量隔离,在调用线程上执行,并发请求受信号量计数器的限制。 hystrix.command.default.execution.isolation .thread.timeoutInMilliseconds 该配置用于 HystrixCommand 执行的超时时间设置,当 HystrixCommand 执行的时间超过了该配置所设置的数值后就会进入服务降级处理,单位是毫秒,默认值为 1000。 hystrix.command.default.execution .timeout.enabled 该配置用于确定是否启用 execution.isolation.thread.timeoutInMilliseconds 设置的超时时间,默认值为 true。设置为 false 后 execution.isolation.thread.timeoutInMilliseconds 配置也将失效。 hystrix.command.default.execution.isolation .thread.interruptOnTimeout 该配置用于确定 HystrixCommand 执行超时后是否需要中断它,默认值为 true。 hystrix.command.default.execution.isolation .thread.interruptOnCancel 该配置用于确定 HystrixCommand 执行被取消时是否需要中断它,默认值为 false。 hystrix.command.default.execution.isolation .semaphore.maxConcurrentRequests 该配置用于确定 Hystrix 使用信号量策略时最大的并发请求数。 hystrix.command.default.fallback.isolation .semaphore.maxConcurrentRequests 该配置用于如果并发数达到该设置值,请求会被拒绝和抛出异常并且 fallback 不会被调用,默认值为 10。 hystrix.command.default.fallback.enabled 该配置用于确定当执行失败或者请求被拒绝时,是否会尝试调用 hystrixCommand.getFallback(),默认值为 true。 hystrix.command.default.circuitBreaker.enabled 该配置用来跟踪 circuit 的健康性,如果未达标则让 request 短路,默认值为 true。 hystrix.command.default.circuitBreaker .requestVolumeThreshold 该配置用于设置一个 rolling window 内最小的请求数。如果设为 20,那么当一个 rolling window 的时间内(比如说 1 个 rolling window 是 10 秒)收到 19 个请求,即使 19 个请求都失败,也不会触发 circuit break,默认值为 20。 hystrix.command.default.circuitBreaker .sleepWindowInMilliseconds 该配置用于设置一个触发短路的时间值,当该值设为 5000 时,则当触发 circuit break 后的 5000 毫秒内都会拒绝 request,也就是 5000 毫秒后才会关闭 circuit。默认值为 5000。 hystrix.command.default.circuitBreaker .errorThresholdPercentage 该配置用于设置错误率阈值,当错误率超过此值时,所有请求都会触发 fallback,默认值为 50。 hystrix.command.default.circuitBreaker.forceOpen 如果配置为 true,将强制打开熔断器,在这个状态下将拒绝所有请求,默认值为 false。 hystrix.command.default.circuitBreaker.forceClosed 如果配置为 true,则将强制关闭熔断器,在这个状态下,不管错误率有多高,都允许请求,默认值为 false。 hystrix.command.default.metrics .rollingStats.timeInMilliseconds 设置统计的时间窗口值,单位为毫秒。circuit break 的打开会根据 1 个 rolling window 的统计来计算。若 rolling window 被设为 10 000 毫秒,则 rolling window 会被分成多个 buckets,每个 bucket 包含 success、failure、timeout、rejection 的次数的统计信息。默认值为 10 000 毫秒。 hystrix.command.default.metrics .rollingStats.numBuckets 设置一个 rolling window 被划分的数量,若 numBuckets=10、rolling window=10 000,那么一个 bucket 的时间即 1 秒。必须符合 rolling window%numberBuckets==0。默认值为 10。 hystrix.command.default.metrics .rollingPercentile.enabled 是否开启指标的计算和跟踪,默认值为 true。 hystrix.command.default.metrics .rollingPercentile.timeInMilliseconds 设置 rolling percentile window 的时间,默认值为 60 000 毫秒 hystrix.command.default.metrics .rollingPercentile.numBuckets 设置 rolling percentile window 的 numberBuckets,默认值为 6。 hystrix.command.default.metrics .rollingPercentile.bucketSize 如果 bucket size=100、window=10 秒,若这 10 秒里有 500 次执行,只有最后 100 次执行会被统计到 bucket 里去。增加该值会增加内存开销及排序的开销。默认值为 100。 hystrix.command.default.metrics .healthSnapshot.intervalInMilliseconds 用来计算影响断路器状态的健康快照的间隔等待时间,默认值为 500 毫秒。 hystrix.command.default.requestCache.enabled 是否开启请求缓存功能,默认值为 true。 hystrix.command.default.requestLog.enabled 记录日志到 HystrixRequestLog,默认值为 true。 hystrix.collapser.default.maxRequestsInBatch 单次批处理的最大请求数,达到该数量触发批处理,默认为 Integer.MAX_VALUE。 hystrix.collapser.default.timerDelayInMilliseconds 触发批处理的延迟,延迟也可以为创建批处理的时间与该值的和,默认值为 10 毫秒。 hystrix.collapser.default.requestCache.enabled 是否启用对 HystrixCollapser.execute() 和 HystrixCollapser.queue() 的请求缓存,默认值为 true。 hystrix.threadpool.default.coreSize 并发执行的最大线程数,默认值为 10。 hystrix.threadpool.default.maxQueueSize BlockingQueue 的最大队列数。当设为 -1 时,会使用 SynchronousQueue;值为正数时,会使用 LinkedBlcokingQueue。该设置只会在初始化时有效,之后不能修改 threadpool 的 queue size。默认值为 -1。 hystrix.threadpool.default.queueSizeRejectionThreshold 即使没有达到 maxQueueSize,但若达到 queueSizeRejectionThreshold 该值后,请求也会被拒绝。因为 maxQueueSize 不能被动态修改,而 queueSizeRejectionThreshold 参数将允许我们动态设置该值。if maxQueueSize==-1,该字段将不起作用。 hystrix.threadpool.default.keepAliveTimeMinutes 设置存活时间,单位为分钟。如果 coreSize 小于 maximumSize,那么该属性控制一个线程从实用完成到被释放的时间。默认值为 1 分钟。 hystrix.threadpool.default .allowMaximumSizeToDivergeFromCoreSize 该属性允许 maximumSize 的配置生效。那么该值可以等于或高于 coreSize。设置 coreSize 小于 maximumSize 会创建一个线程池,该线程池可以支持 maximumSize 并发,但在相对不活动期间将向系统返回线程。默认值为 false。 hystrix.threadpool.default.metrics .rollingStats.timeInMilliseconds 设置滚动时间窗的时间,单位为毫秒,默认值是 10 000。 hystrix.threadpool.default.metrics .rollingStats.numBuckets 设置滚动时间窗划分桶的数量,默认值为 10。 官方的配置信息文档请参考:https://github.com/Netflix/Hystrix/wiki/Configuration。 上面列出来的都是 Hystrix 的配置信息,那么在Spring Cloud中该如何使用呢?只需要在接口的方法上面使用 HystrixCommand 注解(如下代码所示),指定对应的属性即可。 @HystrixCommand(fallbackMethod = "defaultCallHello",commandProperties = { @HystrixProperty(name="execution.isolation.strategy", value = "THREAD") } ) @GetMapping("/callHello") public String callHello() { String result = restTemplate.getForObject("http://localhost:8088/house/hello", String.class); return result; } 给大家推荐分布式架构源码

优秀的个人博客,低调大师

从头开始学习->java数据结构(一):物理上的存储结构

前言 我们都知道,所谓的数据结构,都是我们在为了更好的对数据的增删改查而创造出来的对数据的结构设计,但是我们要知道的是,这些数据结构都是抽象的逻辑结构,并不是真实的物理上的存储结构,大部分时候,我们对数据结构的讨论,也都是讨论的是逻辑上的数据结构,并不是对真实的存储在硬盘中的数据的存储结构的讨论。 真实的物理上的数据,存储到我们的硬盘上的或者是在内存上的时候,都是有着确切的存储结构的,而不是我们想象中的类似b树,二叉树这种抽象的逻辑结构。 那么,在我们的硬盘上,数据是如何存储的呢?这个问题,是我们去研究逻辑上的数据结构之前,需要搞清楚的问题,也是我们本篇文章的主题。 正文 物理存储结构,是数据的逻辑结构在计算机中的存储形式。 但是在理解物理上的数据存储结构之前,我们要先对硬盘有一个足够的了解。 硬盘是一个实际存在的物品,那么也就是说,这个硬盘不可能如我们想象出的逻辑设计那样,去存储我们的数据。想象一下,如果存一百万条数据,我们在硬盘中随意存储,不按照客观的物理规则来存储,那么我们会发现,硬盘的空间会被很大的浪费掉。 也因此,在硬盘中存储数据的时候,我们必须按照一定的客观规律来。而这种客观规律,在计算机发展的过程中,逐渐形成了现在比较常见的两种规律。 1.顺序存储结构 顺序存储结构,是将我们逻辑上的数据结构中相邻的数据,在物理位置上,也存储在相邻的位置。数据结构中的节点关系由存储单元的邻接关系来体现。 也就是说,数据间的逻辑关系和物理关系是一致的。 一般来说,在我们java语言中,描述这种物理存储结构的话,都是借助我们的 数组 来阐述的。如图所示: 这种存储结构,还是很好理解的,说白了,就是排队而已,每一条数据都是占一小段空间,按照顺序站好,占据着连续的存储空间。 1.1 优点 顺序存储结构的优点还是比较明显的,由于是按照物理上的存储空间的顺序存储数据,那么对存储空间的利用率就会非常高,存储密度比较大。 而且因为是顺序存储数据,那么也就是说,我们的数据,存储在这里的时候,天然在硬盘中的平均寻道时间中,就会快很多。 那么,什么是寻道时间呢? 先看一张关于磁盘的图片,如下: 我们发现,这种传统的硬盘,数据是存储到磁盘中的磁道(磁盘盘片)中,通过磁头(读写磁头)来查找数据。而平均寻道时间就是指磁头移动到数据所在的磁道所使用的时间的平均值。 如果硬盘的寻道速度变快,那么就意味着我们的查找数据能力也在变快。 而在我们的顺序存储结构中,我们的数据是按照磁道的轮转顺序存储到磁道上的,也就是说,当我们进行数据查找的时候,会更快的定位到实际的数据所在,这也就是意味着我们的数据查找速度变快。 <table><tr><td> 小知识; 在现代的计算机进化过程中,数据的存储有了很大的变化,比如说我们现在比较常见的固态硬盘,由于SSD固态硬盘没有磁头,所以几乎不存在寻道时间这一概念,当系统发出指令时,不需要磁头和盘片,而是直接从Flash颗粒上读取,相对传统的机械硬盘的寻道时间来说要快多了。 </td></tr></table> 顺序存储结构的优点总结如下: 数据存储密度大,存储空间的利用率要大。 查找速度比较快。 1.2 缺点 首先我们知道的是,顺序存储结构,存储的数据,在存储空间中,是连续的,如下图所示; 那么当我们要插入一条数据的时候,会怎么样呢?如图所示: 从图所示的插入过程中,我们发现,自插入位置起,后面的所有数据元素都往后面移动了一位,想一想,如果数据量上去了,那么等于大量的数据都要移动,而且实际的情况,远远比这个更糟糕,所以这样的插入效率,可想而知。 而删除的过程,虽然和插入恰恰相反,但是造成的后果,都是一样的,都是会导致大量的数据移动位置。 也因此,我们知道的是,顺序存储结构的缺点,就是在插入或者是删除的时候,效率会比较低。 2. 链式存储结构 在实际的数据操作过程中,有的数据结构需要的是查找的时候速度快一点,那么我们的顺序存储结构是符合这种需求的,但是也有很多时候,我们的数据会被多次删除也会被多次的插入,这就引发了一个问题,顺序存储结构不能满足这种需求场景,于是就出现了链式存储结构。 链式存储结构是将数据元素存放在任意的存储单元里,这组存储单元可以是连续的,也可以是不连续的。 也就是说,在链式存储结构中,数据是可以放到这个存储空间的任意位置,也因此,数据与数据之间物理位置的关系,不能表达出数据之间的逻辑关系。 那么,要如何解决这种逻辑关系呢? 于是在链式存储结构中,会将一个数据单元,分为两个区域,一个数据域,一个指针域。 数据域存储的就是我们实际的数据,而指针域则存储的是关联的其他数据的位置的指针,如果要通过链式存储结构存储数据 [A , B , C , D , E] ,存储的大概模型如图所示: 在这张图的基础上,经过转变后,将图变为如下的样子: 这样可以看的出来,指针域指向了下一个数据节点,也就是说,在连式存储结构中,数据之间的关系,是通过指针域的指向关系来表现的,和数据实际存储的位置没有关系。 2.1 优点 链式存储结构的优点是很好理解的,先看链式存储结构的插入图: 由于链式存储结构中,数据的关系不是由数据的实际存储位置表示的,而是由各个数据节点的指针域来表示的,那么也就是说,在数据插入或者删除的时候,不需要将插入数据后面的数据移动,只需要修改一下指针域的指向性就可以了。 也就是说,链式存储结构的插入或者删除元素很方便,比较灵活。而且我们看到的是,链式存储结构不需要连续的内存来存储,也就是说,对碎片型的内存的利用效率还是相对比较高的。 2.2 缺点 在顺序存储结构中,数据的存储就只需要存储这个数据就行。但是在链式存储结构中,数据的存储,除了存储的是这个数据外,还需要存储和这个数据相关的指针域。 这样,就相比于顺序存储结构外,链式存储结构多存储了一部分其他的数据,所以对存储空间的利用率就会降低。 而且要注意的是,链式存储结构,在物理逻辑上讲,因为实际存储的位置不是顺序的,那么读取的时候,效率会比较低。 总结 3.1 顺序存储结构 优点: 数据存储密度大,存储空间的利用率要大。 查找速度比较快。 缺点: 插入或者是删除的时候,效率会比较低 3.2 链式存储结构 优点: 插入或者删除的时候,效率比较高 对碎片型的内存利用率高。 缺点: 额外新增了指针域的数据,对内存的消耗大 在读取的时候效率比较低。

优秀的个人博客,低调大师

PaddleHub v2.0.0-beta1 已经发布,深度学习模型开发工具

PaddleHub v2.0.0-beta1 已经发布,此版本更新内容包括: BERT、ERNIE、RoBERTa等Transformer类模型升级至动态图,增加文本分类的Fine-Tune能力 新增自动数据增强能力Auto Augment,能高效地搜索适合数据集的数据增强策略组合。 支持搜索算法: PBA 支持任务: 图像分类、物体检测(待开放) 分布式自动数据增强搜索服务可以试用BML全功能AI开发平台 修复部分已知问题 详情查看:https://gitee.com/paddlepaddle/PaddleHub/releases/v2.0.0-beta1

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

WebStorm

WebStorm

WebStorm 是jetbrains公司旗下一款JavaScript 开发工具。目前已经被广大中国JS开发者誉为“Web前端开发神器”、“最强大的HTML5编辑器”、“最智能的JavaScript IDE”等。与IntelliJ IDEA同源,继承了IntelliJ IDEA强大的JS部分的功能。

用户登录
用户注册