首页 文章 精选 留言 我的

精选列表

搜索[优化],共10000篇文章
优秀的个人博客,低调大师

这样优化终于通了!

遇到的问题 我们项目使用seatunnel从业务库抽数到数仓(StarRocks),已经成功使用Mysql-CDC做了大量的实时同步。但最近在抽一个MySQL表的时候遇到了异常情况,作业启动之后,日志显示读写数量一直为0,且长时间不停止,运行6小时之后以checkpoint timeout异常停止。 作业模型如下(已擦除涉密信息): 运行关键日志: 问题背景 场景:使用mysql-cdc进行数据实时抽取到StarRocks seatunnel版本:2.3.9 Mysql版本:8.x starrocks版本:3.2 源表数据量:6000W-7000W 提出疑问 为什么读写数量一直为0? 为什么运行这么长时间才报超时? 分析过程 由于之前已大量使用mysql-cdc进行抽数,模型配置基本一致,没有出现过这种问题,大概率不是seatunnel的问题。 对比之前的表,看源表和之前正常接入的表是否有什么不一样。 对比之下果然发现猫腻: 之前的表基本都是有自增主键的;本次同步的表没有自增主键,仅设有多个唯一索引 疑问就来了:SeaTunnel到底是怎样同步数据的? 根据已有的认知,在同步cdc数据时,SeaTunnel会分为两个步骤:先快照同步,再解析binlog进行增量同步。 作业启动读取数量一直为0,应该是在快照同步阶段就卡住了。那么,快照同步的过程是啥呢? 先查看官方文档MySQL CDC | Apache SeaTunnel:https://seatunnel.apache.org/zh-CN/docs/2.3.9/connector-v2/source/MySQL-CDC 其实是没有关于原理方面的介绍的,但是找到了一些可配置参数: 参数解析 chunk-key.even-distribution.factor.upper-bound 默认值:100 英文描述: The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 100.0. 中文描述: 用于分块的key的分布因子的上边界值。这个因子用于决定这个表数据是否是均匀分布的。如果这个分布因子<=上边界值(举例:计算公式(MAX(id) - MIN(id) + 1) / 总数据量),这个表分块的时候会按照均匀分布的策略进行,否则,如果这个分布因子超过了上边界值,这个表被认为是不均匀分布,并且如果预估分片数量超过了sample-sharding.threshold参数值(默认100),会使用基于采样的分片策略。 chunk-key.even-distribution.factor.upper-bound 默认值:0.5 英文描述: The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 0.05. 中文描述:分布因子的下边界值 sample-sharding.threshold 默认值:1000 英文描述: This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. 中文描述: 这个配置项是预估分片数的阈值,用于触发采样分片策略。当分布因子在上下边界之外,并且预估分片数(预估总数据量/分块大小)超过这个阈值,将会使用采样分片策略。这个参数能够帮助更高效的处理非常大的数据量。默认值1000分片。 inverse-sampling.rate 默认值:1000 英文描述: The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. 中文描述: 参数值的倒数会用于采样分片策略。举例,如果值是1000,按照1/1000去采样。这个参数给控制采样的粒度提供了灵活性,直接影响最终的分片数。在处理非常大的数据量是,配置一个较低的采样率会特别有效。默认值1000。 snapshot.split.size 默认值:8096 英文描述: The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot of table. 中文描述: 快照同步阶段的分片大小(行数),被接入的表会被分割为多个分片 snapshot.fetch.size 默认值:1024 英文描述: The maximum fetch size for per poll when read table snapshot. 中文描述: 每次读取表快照时最大的拉取大小。应该也是指行数。 从这几个配置中能获取到的信息: 在快照同步阶段会将数据划分为多个分片,根据数据是否均匀分布会有不同的分片策略。 我们这边表数据量大概是6000W,count不出来,业务系统人员反馈的 由于这个表没有主键,也不知道SeaTunnel内部是拿哪个字段作为分块键的? 暂且拿ID做分块键,这个字段正好有唯一索引,也合理 select max(ID),min(ID) from 表 得到最大key值:804306477418 最小key值:607312608210 那么数据分布因子计算为:(804306477418-607312608210+1)/60000000 = 3283.2312 明显不在均匀分布因子0.5~100范围内,那就判定为不均匀分布 按照默认分块大小8096来计算分块数,60000000/8096 = 7411 明显大于sample-sharding.threshold默认值1000,所以猜测走了采样分片策略 默认采样率为1000,按照6000W的数据量来计算,需要采样60000条 此时此刻,我坚定的认为内部一直在采样,并且非常感兴趣SeaTunnel内部是如何采样的,为什么能运行长达6个小时? 简单一想:即使表是6000W数据量,查6W条数据好像也不应该那么慢吧,肯定是按照ID查,且仅查ID,ID上有唯一索引 最终决定拉下源码,一探究竟 地址:https://github.com/apache/seatunnel/ SeaTunnel架构还是挺复杂的,整环境主要是拉依赖花了大概一天时间,过程不做赘述。 具体怎么找到关键代码的说实话我忘了,也是花了大概一天时间,好像是通过日志中的关键字找的,不了解架构情况下其实挺难找到对应代码。 部分源码解读 private List<ChunkRange> splitTableIntoChunks( JdbcConnection jdbc, TableId tableId, Column splitColumn) throws Exception { final String splitColumnName = splitColumn.name(); //获取最大最小值 final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); final Object min = minMax[0]; final Object max = minMax[1]; if (min == null || max == null || min.equals(max)) { // empty table, or only one row, return full table scan as a chunk return Collections.singletonList(ChunkRange.all()); } //获取配置中的分块大小,分布因子边界值,采样分片阈值 final int chunkSize = sourceConfig.getSplitSize(); final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold(); log.info( "Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, " + "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}", tableId, splitColumnName, min, max, chunkSize, distributionFactorUpper, distributionFactorLower, sampleShardingThreshold); if (isEvenlySplitColumn(splitColumn)) { //获取预估总数,用的是show table status long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); //计算分布因子 double distributionFactor = calculateDistributionFactor(tableId, min, max, approximateRowCnt); //判断是否均匀分布 boolean dataIsEvenlyDistributed = doubleCompare(distributionFactor, distributionFactorLower) >= 0 && doubleCompare(distributionFactor, distributionFactorUpper) <= 0; //如果均匀分布,按照走均匀分布分片策略 if (dataIsEvenlyDistributed) { // the minimum dynamic chunk size is at least 1 final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); return splitEvenlySizedChunks( tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); } else { //走采样分片策略前要判断是否超过采样分片数阈值 int shardCount = (int) (approximateRowCnt / chunkSize); int inverseSamplingRate = sourceConfig.getInverseSamplingRate(); if (sampleShardingThreshold < shardCount) { // It is necessary to ensure that the number of data rows sampled by the // sampling rate is greater than the number of shards. // Otherwise, if the sampling rate is too low, it may result in an insufficient // number of data rows for the shards, leading to an inadequate number of // shards. // Therefore, inverseSamplingRate should be less than chunkSize if (inverseSamplingRate > chunkSize) { log.warn( "The inverseSamplingRate is {}, which is greater than chunkSize {}, so we set inverseSamplingRate to chunkSize", inverseSamplingRate, chunkSize); inverseSamplingRate = chunkSize; } log.info( "Use sampling sharding for table {}, the sampling rate is {}", tableId, inverseSamplingRate); //采样 Object[] sample = sampleDataFromColumn(jdbc, tableId, splitColumn, inverseSamplingRate); log.info( "Sample data from table {} end, the sample size is {}", tableId, sample.length); //走采样分片策略 return efficientShardingThroughSampling( tableId, sample, approximateRowCnt, shardCount); } //走不均匀分布分片策略 return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } else { return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } 这里关注采样逻辑: public static Object[] skipReadAndSortSampleData( JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) throws Exception { //即select ID from 源表 final String sampleQuery = String.format("SELECT %s FROM %s", quote(columnName), quote(tableId)); Statement stmt = null; ResultSet rs = null; List<Object> results = new ArrayList<>(); try { //直接把6000W个ID一次性查出来效率很低,会有内存溢出风险,这里使用到了游标查询 stmt = jdbc.connection() .createStatement( ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); stmt.setFetchSize(Integer.MIN_VALUE); rs = stmt.executeQuery(sampleQuery); int count = 0; while (rs.next()) { //遍历所有数据,每10W条打印一条日志记录进度 count++; if (count % 100000 == 0) { log.info("Processing row index: {}", count); } //对采样率1000进行取模,即ID是1000的整数倍的作为样本 if (count % inverseSamplingRate == 0) { results.add(rs.getObject(1)); } if (Thread.currentThread().isInterrupted()) { throw new InterruptedException("Thread interrupted"); } } } finally { if (rs != null) { try { rs.close(); } catch (SQLException e) { log.error("Failed to close ResultSet", e); } } if (stmt != null) { try { stmt.close(); } catch (SQLException e) { log.error("Failed to close Statement", e); } } } Object[] resultsArray = results.toArray(); Arrays.sort(resultsArray); return resultsArray; } 这就是核心采样逻辑了,原来要遍历所有的数据啊,难怪那么慢,我看服务端日志确实有Processing row index,之前我还纳闷这是在干啥,一直打印不完。 这里大概采60000个ID。 继续看采样分片逻辑: protected List<ChunkRange> efficientShardingThroughSampling( TableId tableId, Object[] sampleData, long approximateRowCnt, int shardCount) { log.info( "Use efficient sharding through sampling optimization for table {}, the approximate row count is {}, the shardCount is {}", tableId, approximateRowCnt, shardCount); final List<ChunkRange> splits = new ArrayList<>(); if (shardCount == 0) { splits.add(ChunkRange.of(null, null)); return splits; } //样本数除以分片数,这里主要用于判断样本数是否大于分片数,保证样本在分片上的覆盖率 double approxSamplePerShard = (double) sampleData.length / shardCount; Object lastEnd = null; if (approxSamplePerShard <= 1) { splits.add(ChunkRange.of(null, sampleData[0])); lastEnd = sampleData[0]; //样本数小于等于分片数的时候,有多少样本分多少片, for (int i = 1; i < sampleData.length; i++) { // avoid split duplicate data if (!sampleData[i].equals(lastEnd)) { splits.add(ChunkRange.of(lastEnd, sampleData[i])); lastEnd = sampleData[i]; } } splits.add(ChunkRange.of(lastEnd, null)); } else { //样本数大于分片数时,要让每个分片的ID范围保持一致 for (int i = 0; i < shardCount; i++) { Object chunkStart = lastEnd; Object chunkEnd = (i < shardCount - 1) ? sampleData[(int) ((i + 1) * approxSamplePerShard)] : null; // avoid split duplicate data if (i == 0 || i == shardCount - 1 || !Objects.equals(chunkEnd, chunkStart)) { splits.add(ChunkRange.of(chunkStart, chunkEnd)); lastEnd = chunkEnd; } } } return splits; } 最终得到一个分片的集合,每个分片有自己的起始位置,且不会有交集,再来看看分片的类定义吧: 快照分片是为了实现并行读取,这样可以更快的拉取历史数据。 解决方式 通过上述分析,我们大致知道了快照读阶段是怎么个过程,也验证到之前作业一直读不到数时因为在做采样,需要遍历所有数据。之所以会做采样,是因为SeaTunnel判定源表的数据为不均匀分布了。 我们的需求已经卡了好几天了,想要让SeaTunnel尽快地开始同步数据,我们想到一个最简单的方式:调整参数让SeaTunnel判定为是均匀分布的,这样就不会走采样了,不走采样的话分片很快就结束了。 默认均匀分布因子范围是0.5~100,而我们的数据算出来分布因子大概是3000多,果断把分布因子调为4000,最终配置如下: snapshot.split.size,是因为我们的数据很不均匀,间隔率大概3000,所以在默认值的基础上加了1000倍,(啊我承认瞎加的,不知道有用没)。 table-names-config:手动给表指定主键和快照分片键,因为我的表没有主键,暂时还不知道SeaTunnel是如何选择分块键的,既然可以指定就试试唠(也是我瞎加的)。 最终结果 那当然是开始同步了呀! 原文链接:https://blog.csdn.net/qq_36265343/article/details/148918844 本文由 白鲸开源科技 提供发布支持!

优秀的个人博客,低调大师

Anolis OS 8.10 发布:软硬协同优化,满足多行业实际应用需求

引言 龙蜥操作系统 Anolis OS 8 是 OpenAnolis 龙蜥社区发行的开源 Linux 发行版,支持多计算架构,提供稳定、高性能、安全、可靠的操作系统支持。Anolis OS 8.10 是 Anolis OS 8 发布的第六个小版本,通过软硬协同,不断完善生态,以满足各个行业领域的实际应用需求。 发布内容 龙蜥操作系统 Anolis OS 8.10 发布内容包括ISO、虚拟机镜像、容器镜像和 repo 源。 1、ISO 列表 名称 描述 AnolisOS-8.10-x86_64-dvd.iso x86_64 架构的基础安装 ISO,约 17.2 GB AnolisOS-8.10-x86_64-minimal.iso x86_64 架构的精简安装 ISO,约 2.9 GB AnolisOS-8.10-x86_64-boot.iso x86_64 架构的网络安装 ISO,约 1.2 GB AnolisOS-8.10-aarch64-dvd.iso aarch64 架构的基础安装 ISO,约 14.7 GB AnolisOS-8.10-aarch64-minimal.iso aarch64 架构的精简安装 ISO, 约 2.7 GB AnolisOS-8.10-aarch64-boot.iso aarch64 架构的网络安装 ISO, 约 1.1 GB AnolisOS-8.10-loongarch64-dvd.iso loongarch64 架构的基础安装 ISO, 约 9.2 GB AnolisOS-8.10-loongarch64-minimal.iso loongarch64 架构的精简安装 ISO, 约 2.1 GB AnolisOS-8.10-loongarch64-boot.iso loongarch64 架构的网络安装 ISO, 约 1.1 GB AnolisOS-8.10-src-dvd.iso 源码包 ISO,约 26.8 GB 2、虚拟机镜像列表 名称 描述 AnolisOS-8.10-x86_64-ANCK.qcow2 x86_64 架构 QEMU 虚拟机镜像 AnolisOS-8.10-x86_64-RHCK.qcow2 x86_64 架构 QEMU 虚拟机镜像 AnolisOS-8.10-aarch64-ANCK.qcow2 aarch64 架构 QEMU 虚拟机镜像 AnolisOS-8.10-aarch64-RHCK.qcow2 aarch64 架构 QEMU 虚拟机镜像 AnolisOS-8.10-loongarch64.qcow2 loongarch64 架构 QEMU 虚拟机镜像 3、容器镜像 名称 描述 AnolisOS-8.10-x86_64-docker.tar docker pull registry.openanolis.cn/openanolis/anolisos:8.10 x86_64 架构容器镜像 AnolisOS-8.10-aarch64-docker.tar docker pull registry.openanolis.cn/openanolis/anolisos:8.10 aarch64 架构容器镜像 AnolisOS-8.10-loongarch64-docker.tar docker pull cr.loongnix.cn/openanolis/anolisos:8.10 loongarch64 架构容器镜像 4、下载列表 社区网站(复制链接至服务器打开): https://mirrors.openanolis.cn/anolis/8/ 阿里云镜像站(复制链接至服务器打开): https://mirrors.aliyun.com/anolis/8/ 5、REPO 源列表 名称 描述 仓库状态 BaseOS BaseOS 软件包源,该源目的是提供安装基础的所有核心包。 默认开启 AppStream AppStream 软件包源,该源提供额外的多场景,多用途的用户态程序,数据库等。 默认开启 PowerTools PowerTools 软件包源,该源提供开发者需要的额外包。 默认开启 Plus Plus 软件包源,提供社区滚动内核以及相应的组件。 默认不启用 DDE DDE 软件包源,提供 DDE 桌面环境以及相应的组件。 默认不启用 NDE NDE 软件包源,提供 NDE 桌面环境以及相应的组件。 默认不启用 Extras repo 仓库包源,提供了各类衍生仓库的仓库包。 默认开启 kernel-5.10 5.10 内核软件包源,提供 5.10 内核包以及相应的组件。 默认在 5.10 内核环境中开启,非 5.10 内核环境中需要使用 enablerepo=kernel-5.10 开启 EPAO EPAO 软件包源,提供社区孵化类软件。 默认不启用,需要安装 anolis-epao-release 后使用 亮点一览 Anolis OS 8.10ANCK 镜像默认内核变更为kernel-5.10.134-18.an8版本。 Anolis OS 8.10 龙芯镜像默认内核变更为kernel-4.19.190-7.12.an8版本。 正式支持 Intel GNR 平台,AMD Turin 平台,龙芯 3C6000、3D6000、3E6000 平台。 提供Dragonwell8/11/17/21 四个版本的 java 组件,可根据实际需要替代开源OpenJDK系列。 集成开源分布式数据库社区版 oceanbase-ce 最新 4.3.5.1 版本,新增了对嵌套物化视图的支持,并完善了全文索引和向量索引功能。 新引入基于 QT 开发的超融合轻量级桌面环境--新支点桌面环境 (NewStart Desktop Environment)。 新引入 python3.11 的兼容包,使用户可以在主版本是 3.6 的情况下使用 python3.11 而不破坏系统的兼容性和稳定性,实现 python 生态兼容扩展。 推动软件包生态建设,新引入大量生态包,增强了对大数据和人工智能等应用场景的支持。 为用户提供了可靠的安全更新和缺陷修复,进一步提升了系统的安全性和稳定性。 详细发行声明,可参阅 Anolis OS 8.10 GA 发行声明(可复制到浏览器打开): https://docs.openanolis.cn/document/detail/1338493464723914757?docsNo=1132372012566249498&documentNo=1338493494704799792 致谢 衷心感谢龙芯中科、海光信息、统信软件、浪潮信息、Intel、AMD、飞腾、中兴通讯、中兴新支点、中科方德、中科曙光、红旗软件、阿里云等(排名不分先后)各理事单位对 Anolis OS 8.10 版本研发过程的大力支持。 —— 完 —— 本文分享自微信公众号 - OpenAnolis龙蜥(OpenAnolis)。 如有侵权,请联系 support@oschina.cn 删除。 本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册