一种轻量分表方案-MyBatis 拦截器分表实践
背景
部门内有一些亿级别核心业务表增速非常快,增量日均100W,但线上业务只依赖近一周的数据。随着数据量的迅速增长,慢SQL频发,数据库性能下降,系统稳定性受到严重影响。本篇文章,将分享如何使用MyBatis拦截器低成本的提升数据库稳定性。
业界常见方案
针对冷数据多的大表,常用的策略有以2种:
1. 删除/归档旧数据。
2. 分表。
归档/删除旧数据
定期将冷数据移动到归档表或者冷存储中,或定期对表进行删除,以减少表的大小。此策略逻辑简单,只需要编写一个JOB定期执行SQL删除数据。我们开始也是用这种方案,但此方案也有一些副作用:
综上,此方案有一定风险,为了规避这种风险,我们决定采用另一种方案:分表。
分表
我们决定按日期对表进行横向拆分,实现让系统每周生成一张周期表,表内只存近一周的数据,规避单表过大带来的风险。
分表方案选型
经调研,考虑2种分表方案:Sharding-JDBC、利用Mybatis自带的拦截器特性。
经过对比后,决定采用Mybatis拦截器来实现分表,原因如下:
分表具体实现代码
分表配置对象
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.util.Date; @Data @AllArgsConstructor @NoArgsConstructor public class ShardingProperty { // 分表周期天数,配置7,就是一周一分 private Integer days; // 分表开始日期,需要用这个日期计算周期表名 private Date beginDate; // 需要分表的表名 private String tableName; }
分表配置类
import java.util.concurrent.ConcurrentHashMap; public class ShardingPropertyConfig { public static final ConcurrentHashMap<String, ShardingProperty> SHARDING_TABLE = new ConcurrentHashMap<>(); static { ShardingProperty orderInfoShardingConfig = new ShardingProperty(15, DateUtils.string2Date("20231117"), "order_info"); ShardingProperty userInfoShardingConfig = new ShardingProperty(7, DateUtils.string2Date("20231117"), "user_info"); SHARDING_TABLE.put(orderInfoShardingConfig.getTableName(), orderInfoShardingConfig); SHARDING_TABLE.put(userInfoShardingConfig.getTableName(), userInfoShardingConfig); } }
拦截器
import lombok.extern.slf4j.Slf4j; import o2o.aspect.platform.function.template.service.TemplateMatchService; import org.apache.commons.lang3.StringUtils; import org.apache.ibatis.executor.statement.StatementHandler; import org.apache.ibatis.mapping.BoundSql; import org.apache.ibatis.mapping.MappedStatement; import org.apache.ibatis.plugin.*; import org.apache.ibatis.reflection.DefaultReflectorFactory; import org.apache.ibatis.reflection.MetaObject; import org.apache.ibatis.reflection.ReflectorFactory; import org.apache.ibatis.reflection.factory.DefaultObjectFactory; import org.apache.ibatis.reflection.factory.ObjectFactory; import org.apache.ibatis.reflection.wrapper.DefaultObjectWrapperFactory; import org.apache.ibatis.reflection.wrapper.ObjectWrapperFactory; import org.springframework.stereotype.Component; import java.sql.Connection; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.Properties; @Slf4j @Component @Intercepts({@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})}) public class ShardingTableInterceptor implements Interceptor { private static final ObjectFactory DEFAULT_OBJECT_FACTORY = new DefaultObjectFactory(); private static final ObjectWrapperFactory DEFAULT_OBJECT_WRAPPER_FACTORY = new DefaultObjectWrapperFactory(); private static final ReflectorFactory DEFAULT_REFLECTOR_FACTORY = new DefaultReflectorFactory(); private static final String MAPPED_STATEMENT = "delegate.mappedStatement"; private static final String BOUND_SQL = "delegate.boundSql"; private static final String ORIGIN_BOUND_SQL = "delegate.boundSql.sql"; private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); private static final String SHARDING_MAPPER = "com.jd.o2o.inviter.promote.mapper.ShardingMapper"; private ConfigUtils configUtils = SpringContextHolder.getBean(ConfigUtils.class); @Override public Object intercept(Invocation invocation) throws Throwable { boolean shardingSwitch = configUtils.getBool("sharding_switch", false); // 没开启分表 直接返回老数据 if (!shardingSwitch) { return invocation.proceed(); } StatementHandler statementHandler = (StatementHandler) invocation.getTarget(); MetaObject metaStatementHandler = MetaObject.forObject(statementHandler, DEFAULT_OBJECT_FACTORY, DEFAULT_OBJECT_WRAPPER_FACTORY, DEFAULT_REFLECTOR_FACTORY); MappedStatement mappedStatement = (MappedStatement) metaStatementHandler.getValue(MAPPED_STATEMENT); BoundSql boundSql = (BoundSql) metaStatementHandler.getValue(BOUND_SQL); String originSql = (String) metaStatementHandler.getValue(ORIGIN_BOUND_SQL); if (StringUtils.isBlank(originSql)) { return invocation.proceed(); } // 获取表名 String tableName = TemplateMatchService.matchTableName(boundSql.getSql().trim()); ShardingProperty shardingProperty = ShardingPropertyConfig.SHARDING_TABLE.get(tableName); if (shardingProperty == null) { return invocation.proceed(); } // 新表 String shardingTable = getCurrentShardingTable(shardingProperty, new Date()); String rebuildSql = boundSql.getSql().replace(shardingProperty.getTableName(), shardingTable); metaStatementHandler.setValue(ORIGIN_BOUND_SQL, rebuildSql); if (log.isDebugEnabled()) { log.info("rebuildSQL -> {}", rebuildSql); } return invocation.proceed(); } @Override public Object plugin(Object target) { if (target instanceof StatementHandler) { return Plugin.wrap(target, this); } return target; } @Override public void setProperties(Properties properties) {} public static String getCurrentShardingTable(ShardingProperty shardingProperty, Date createTime) { String tableName = shardingProperty.getTableName(); Integer days = shardingProperty.getDays(); Date beginDate = shardingProperty.getBeginDate(); Date date; if (createTime == null) { date = new Date(); } else { date = createTime; } if (date.before(beginDate)) { return null; } LocalDateTime targetDate = SimpleDateFormatUtils.convertDateToLocalDateTime(date); LocalDateTime startDate = SimpleDateFormatUtils.convertDateToLocalDateTime(beginDate); LocalDateTime intervalStartDate = DateIntervalChecker.getIntervalStartDate(targetDate, startDate, days); LocalDateTime intervalEndDate = intervalStartDate.plusDays(days - 1); return tableName + "_" + intervalStartDate.format(FORMATTER) + "_" + intervalEndDate.format(FORMATTER); } }
临界点数据不连续问题
分表方案有1个难点需要解决:周期临界点数据不连续。举例:假设要对operate_log(操作日志表)大表进行横向分表,每周一张表,分表明细可看下面表格。
第一周(operate_log_20240107_20240108) | 第二周(operate_log_20240108_20240114) | 第三周(operate_log_20240115_20240121) |
1月1号 ~ 1月7号的数据 | 1月8号 ~ 1月14号的数据 | 1月15号 ~ 1月21号的数据 |
1月8号就是分表临界点,8号需要切换到第二周的表,但8号0点刚切换的时候,表内没有任何数据,这时如果业务需要查近一周的操作日志是查不到的,这样就会引发线上问题。
我决定采用数据冗余的方式来解决这个痛点。每个周期表都冗余一份上个周期的数据,用双倍数据量实现数据滑动的效果,效果见下面表格。
第一周(operate_log_20240107_20240108) | 第二周(operate_log_20240108_20240114) | 第三周(operate_log_20240115_20240121) |
12月25号 ~ 12月31号的数据 | 1月1号 ~ 1月7号的数据 | 1月8号 ~ 1月14号的数据 |
1月1号 ~ 1月7号的数据 | 1月8号 ~ 1月14号的数据 | 1月15号 ~ 1月21号的数据 |
注:表格内第一行数据就是冗余的上个周期表的数据。
思路有了,接下来就要考虑怎么实现双写(数据冗余到下个周期表),有2种方案:
方案对比后,选择了对业务性能损耗小的方案二。
监听binlog并双写流程图
监听binlog数据双写注意点
监听binlog数据双写代码
注:下面代码不能直接用,只提供基本思路
/** * 监听binlog ,分表双写,解决数据临界问题 */ @Slf4j @Component public class BinLogConsumer implements MessageListener { private MessageDeserialize deserialize = new JMQMessageDeserialize(); private static final String TABLE_PLACEHOLDER = "%TABLE%"; @Value("${mq.doubleWriteTopic.topic}") private String doubleWriteTopic; @Autowired private JmqProducerService jmqProducerService; @Override public void onMessage(List<Message> messages) throws Exception { if (messages == null || messages.isEmpty()) { return; } List<EntryMessage> entryMessages = deserialize.deserialize(messages); for (EntryMessage entryMessage : entryMessages) { try { syncData(entryMessage); } catch (Exception e) { log.error("sharding sync data error", e); throw e; } } } private void syncData(EntryMessage entryMessage) throws JMQException { // 根据binlog内的表名,获取需要同步的表 // 3种情况: // 1、老表:需要同步当前周期表,和下个周期表。 // 2、当前周期表:需要同步下个周期表,和老表。 // 3、下个周期表:不需要同步。 List<String> syncTables = getSyncTables(entryMessage.tableName, entryMessage.createTime); if (CollectionUtils.isEmpty(syncTables)) { log.info("table {} is not need sync", tableName); return; } if (entryMessage.getHeader().getEventType() == WaveEntry.EventType.INSERT) { String insertTableSqlTemplate = parseSqlForInsert(rowData); for (String syncTable : syncTables) { String insertSql = insertTableSqlTemplate.replaceAll(TABLE_PLACEHOLDER, syncTable); // 双写老表发Q,为了避免出现同步死循环问题 if (ShardingPropertyConfig.SHARDING_TABLE.containsKey(syncTable)) { Long primaryKey = getPrimaryKey(rowData.getAfterColumnsList()); sendDoubleWriteMsg(insertSql, primaryKey); continue; } mysqlConnection.executeSql(insertSql); } continue; } }
数据对比
为了保证新表和老表数据一致,需要编写对比程序,在上线前进行数据对比,保证binlog同步无问题。
具体实现代码不做展示,思路:新表查询一定量级数据,老表查询相同量级数据,都转换成JSON,equals对比。
作者:京东零售 张均杰
来源:京东云开发者社区 转载请注明来源
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
记一次 golang 的 zstd 压缩、解压缩 50%性能优化
问题背景 1、开发反馈 trs 的 stg 环境开启 zstd 解压缩后,内存有明显持续上涨趋势,最终导致 OOM 如图,内存频繁申请释放,当时分析导致 OOM 的原因是因为 stg 的 CPU 不够,导致 GC 不及时,调整 CPU 资源后确实 OOM 没有了。并未怀疑程序本身的性能问题 2、infra 同学发现 adx 的服务存在 zstd 压缩导致 CPU 资源消耗异常的问题,发现是压缩对象的 init 操作非常重导致。 问题分析 结合上面两次问题,想到 Redis 压缩降本时提交的 go 的 zstd 代码有很大优化空间的。可将 zstd.NewWriter 、zstd.NewReader 等重对象使用 sync.Pool 缓存起来,每次使用时从池中取,用完在放回去,避免频繁 New 对象造成内存申请多从而造成 GC 压力大,CPU 资源消耗高的问题。 预期关键结果(收益) 开压缩相关的接口 RT 明显降低,压缩&解压缩申请的内存变少 CPU 资源显著降低,部分实例可减少申请 CPU 的 request 和 limit (减少实例数) 总的来说应该可以提高性能,降低资源...
- 下一篇
让游戏云原生化别再「左右为难」
作者:云原生游戏社区 当下,游戏行业正在经历云原生架构转型期,不少游戏厂商纷纷投入游戏服容器化改造。在此现象的背后,是云原生技术带来的先进生产力推动着行业向前发展:容器化提升了游戏交付的效率;声明一致性带来游戏开服效率、更新效率、以及可用性的提升;弹性伸缩使得资源可自动化地应对游戏高峰期与波谷期,在保证游戏服务质量的同时提高资源利用率。 2024 年 1 月 18 日,OpenKruiseGame (OKG) 社区与 KubeSphere 联合举办了议题为「用 OKG Dashboard 解锁云原生游戏运维之道」的技术直播。本文将与大家一起回顾分享内容。 OpenKruiseGame:游戏云原生化的理想路径 尽管云原生带来了众多优势,但作为容器编排管理事实标准的 Kubernetes,其原生工作负载并不能很好地支持游戏场景,因此 OpenKruiseGame 在此背景下诞生。 1)OpenKruiseGame 是 CNCF 顶级开源云原生负载 OpenKruise 在游戏领域下的最佳实践抽象,项目由多家一线游戏公司共同贡献维护; 2)内置多云/混合云场景的适配,推出了 Cloud Pr...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Linux系统CentOS6、CentOS7手动修改IP地址
- Hadoop3单机部署,实现最简伪集群
- Docker安装Oracle12C,快速搭建Oracle学习环境