微服务MySQL分库分表数据到MongoDB同步方案[转]
需求背景
近年来,微服务概念持续火热,网络上针对微服务和单体架构的讨论也是越来越多,面对日益增长的业务需求是,很多公司做技术架构升级时优先选用微服务方式。我所在公司也是选的这个方向来升级技术架构,以支撑更大访问量和更方便的业务扩展。
发现问题
微服务拆分主要分两种方式:拆分业务系统不拆分数据库,拆分业务系统拆分库。如果数据规模小的话大可不必拆分数据库,因为拆分数据看必将面对多维度数据查询,跨进程之间的事务等问题。而我所在公司随着业务发展单数据库实例已经不能满足业务需要,所以选择了拆分业务系统同时拆分数据库的模式,所以也面临着以上的问题。本文主要介绍多维度数据实时查询解决方案。当前系统架构和存储结构如下:
解决思路
要对多数据库数据进行查询,首先就需要将数据库同步到一起以方便查询
为了满足大数据量数据需求,所以优先选择NOSQL数据库做同步库
NOSQL数据库基本无法进行关联查询,所以需要将关系数据进行拼接操作,转换成非关系型数据
业务多维度查询需要实时性,所以需要选择NOSQL中实时性相对比较好的数据库:MongoDB
根据以上思路,总结数据整合架构如下图所示:
解决方案
目前网上一些数据同步案例分两种:MQ消息同步和binlog数据读取同步
先说MQ消息同步,该同步方式我所在公司试用过一段时间,发现以下问题:
数据围绕业务进行,对业务关键性数据操作发送MQ消息,对业务系统依赖性比较高
对于数据库中存量数据需要单独处理
对于工具表还需要单独维护同步
每次新增数据表都需要重新添加MQ逻辑
考虑到以上问题,用MQ方式同步数据并不是最优解决办法
使用binlog 数据读取方式目前有一些成熟方案,比如tungsten replicator,但这些同步工具只能实现数据1:1复制,数据复制过程自定义逻辑添加比较麻烦,不支持分库分表数据归集操作。综上所述,最优方案应该是读取后binlog后自行处理后续数据逻辑。目前binlog读取binlog工具中最成熟的方案应该就是alibaba开源的canal了。
canal
canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 。阿里云DRDS、阿里巴巴TDDL 二级索引、小表复制. 都是基于canal做的,应用广泛。
canal原理相对比较简单:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
canal介绍: https://github.com/alibaba/canal/wiki
我使用的是canal的HA模式,由zookeeper选举可用实例,每个数据库一个instance,服务端配置如下:
目录:
conf database1 -instance.properties database2 -instance.properties canal.properties
instance.properties
canal.instance.mysql.slaveId = 1001 canal.instance.master.address = X.X.X.X:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 canal.instance.filter.regex = .*\\..* canal.instance.filter.black.regex =
canal.properties
canal.id= 1 canal.ip=X.X.X.X canal.port= 11111 canal.zkServers=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181 canal.zookeeper.flush.period = 1000 canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 canal.instance.memory.buffer.size = 16384 canal.instance.memory.buffer.memunit = 1024 canal.instance.memory.batch.mode = MEMSIZE canal.instance.detecting.enable = true canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false canal.instance.transaction.size = 1024 canal.instance.fallbackIntervalInSeconds = 60 canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 canal.instance.filter.query.dcl = true canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB canal.instance.get.ddl.isolation = false canal.destinations= example,p4-test canal.conf.dir = ../conf canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.spring.xml = classpath:spring/default-instance.xml
部署数据流如下:
tip:
虽然canal同时支持mixed和row类型的binlog日志,但是获取行数据时如果是mixed类型的日志则获取不到表名,所以本方案暂只支持row格式的binlog
数据同步
创建canal client应用订阅canal读取的binlog数据
1.开启多instance 订阅,订阅多个instance
public void initCanalStart() { List destinations = canalProperties.getDestination(); final List canalClientList = new ArrayList<>(); if (destinations != null && destinations.size() > 0) { for (String destination : destinations) { // 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", ""); CanalClient client = new CanalClient(destination, connector); canalClientList.add(client); client.start(); } } Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { logger.info("## stop the canal client"); for (CanalClient canalClient : canalClientList) { canalClient.stop(); } } catch (Throwable e) { logger.warn("##something goes wrong when stopping canal:", e); } finally { logger.info("## canal client is down."); } } }); }
订阅消息处理
private void process() { int batchSize = 5 * 1024; while (running) { try { MDC.put("destination", destination); connector.connect(); connector.subscribe(); while (running) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId != -1 && size > 0) { saveEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } } catch (Exception e) { logger.error("process error!", e); } finally { connector.disconnect(); MDC.remove("destination"); } } }
根据数据库事件处理消息,过滤消息列表,对数据变动进行处理,用到信息为:
insert :schemaName,tableName,beforeColumnsList
update :schemaName,tableName,afterColumnsList
delete :schemaName,tableName,afterColumnsList
RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("parse event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); logger.info(row_format, entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime)); if (eventType == EventType.QUERY || rowChage.getIsDdl()) { logger.info(" sql ----> " + rowChage.getSql()); continue; } DataService dataService = SpringUtil.getBean(DataService.class); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { dataService.delete(rowData.getBeforeColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName()); } else if (eventType == EventType.INSERT) { dataService.insert(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName()); } else if (eventType == EventType.UPDATE) { dataService.update(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName()); } else { logger.info("未知数据变动类型:{}", eventType); } } }
ColumnsList转换成MongoTemplate 可用的数据类:DBObject,顺便做下数据类型转换
public static DBObject columnToJson(List columns) { DBObject obj = new BasicDBObject(); try { for (CanalEntry.Column column : columns) { String mysqlType = column.getMysqlType(); //int类型,长度11以下为Integer,以上为long if (mysqlType.startsWith("int")) { int lenBegin = mysqlType.indexOf('('); int lenEnd = mysqlType.indexOf(')'); if (lenBegin > 0 && lenEnd > 0) { int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd)); if (length > 10) { obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue())); continue; } } obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue())); } else if (mysqlType.startsWith("bigint")) { obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue())); } else if (mysqlType.startsWith("decimal")) { int lenBegin = mysqlType.indexOf('('); int lenCenter = mysqlType.indexOf(','); int lenEnd = mysqlType.indexOf(')'); if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) { int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd)); if (length == 0) { obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue())); continue; } } obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue())); } else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) { obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue())); } else if (mysqlType.equals("date")) { obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue())); } else if (mysqlType.equals("time")) { obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue())); } else { obj.put(column.getName(), column.getValue()); } } } catch (ParseException e) { e.printStackTrace(); } return obj; }
tip:
DBObject对象如果同时用于保存原始数据和组合数据或其他数据,使用时应该深度拷贝对象生成副本,然后使用副本
数据拼接
我们获取了数据库数据后做拼接操作,比如两张用户表:
user_info:{id,user_no,user_name,user_password} user_other_info:{id,user_no,idcard,realname}
拼接后mongo数据为:
user:{_id,user_no,userInfo:{id,user_no,user_name,user_password},userOtherInfo:{id,user_no,idcard,realname})
接收到的数据信息很多,如何才能简单的触发数据拼接操作呢?
先看我们能获取的信息:schemaName,tableName,DBObject,Event(insert,update,delete)
将这些信息标识拼接起来看看:/schemaName/tableName/Event(DBObject),没错,就是一个标准的restful链接。只要我们实现一个简单的springMVC 就能自动获取需要的数据信息进行拼接操作。
先实现@Controller,定义名称为Schema,value对应schemaName
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public @interface Schema { String value() default ""; }
然后实现@RequestMapping,定义名称为Table,直接使用Canal中的EventType 对应RequestMethod
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Table { String value() default ""; CanalEntry.EventType[] event() default {}; }
然后创建springUtil,实现接口ApplicationContextAware,应用启动 加载的时候初始化两个Map:intanceMap,handlerMap
private static ApplicationContext applicationContext = null; //库名和数据处理Bean映射Map private static Map instanceMap = new HashMap(); //路劲和数据处理Method映射Map private static Map handlerMap = new HashMap(); @Override public void setApplicationContext(ApplicationContext applicationContext) { if (SpringUtil.applicationContext == null) { SpringUtil.applicationContext = applicationContext; //初始化instanceMap数据 instanceMap(); //初始化handlerMap数据 handlerMap(); } } private void instanceMap() { Map beans = applicationContext.getBeansWithAnnotation(Schema.class); for (Object bean : beans.values()) { Class clazz = bean.getClass(); Object instance = applicationContext.getBean(clazz); Schema schema = clazz.getAnnotation(Schema.class); String key = schema.value(); instanceMap.put(key, instance); logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName()); } } private void handlerMap() { if (instanceMap.size() <= 0) return; for (Map.Entry entry : instanceMap.entrySet()) { if (entry.getValue().getClass().isAnnotationPresent(Schema.class)) { Schema schema = entry.getValue().getClass().getAnnotation(Schema.class); String schemeName = schema.value(); Method[] methods = entry.getValue().getClass().getMethods(); for (Method method : methods) { if (method.isAnnotationPresent(Table.class)) { Table table = method.getAnnotation(Table.class); String tName = table.value(); CanalEntry.EventType[] events = table.event(); //未标明数据事件类型的方法不做映射 if (events.length < 1) { continue; } //同一个方法可以映射多张表 for (int i = 0; i < events.length; i++) { String path = "/" + schemeName + "/" + tName + "/" + events[i].getNumber(); handlerMap.put(path, method); logger.info("handlerMap [{}:{}]", path, method.getName()); } } else { continue; } } } else { continue; } } }
调用方法:
public static void doEvent(String path, DBObject obj) throws Exception { String[] pathArray = path.split("/"); if (pathArray.length != 4) { logger.info("path 格式不正确:{}", path); return; } Method method = handlerMap.get(path); Object schema = instanceMap.get(pathArray[1]); //查找不到映射Bean和Method不做处理 if (method == null || schema == null) { return; } try { long begin = System.currentTimeMillis(); logger.info("integrate data:{},{}", path, obj); method.invoke(schema, new Object[]{obj}); logger.info("integrate data consume: {}ms:", System.currentTimeMillis() - begin); } catch (Exception e) { logger.error("调用组合逻辑异常", e); throw new Exception(e.getCause()); } }
数据拼接消息处理:
@Schema("demo_user") public class UserService { @Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE}) public void saveUser_UserInfo(DBObject userInfo) { String userNo = userInfo.get("user_no") == null ? null : userInfo.get("user_no").toString(); DBCollection collection = completeMongoTemplate.getCollection("user"); DBObject queryObject = new BasicDBObject("user_no", userNo); DBObject user = collection.findOne(queryObject); if (user == null) { user = new BasicDBObject(); user.put("user_no", userNo); user.put("userInfo", userInfo); collection.insert(user); } else { DBObject updateObj = new BasicDBObject("userInfo", userInfo); DBObject update = new BasicDBObject("$set", updateObj); collection.update(queryObject, update); } } }
示例源码
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
架构师之路-创业互联网公司如何搭建自己的技术架构
适用范围 本文主要针对中小型互联网公司,特别适用于手机APP或者pc的后台架构,基本可以支撑5万日活本文会对可能用到的相关技术进行技术选型的说明,以及技术的架构介绍,技术架构的介绍课程后面有地址,可以点进去查看。 技术指标 说一下一些技术指标的计算过程可以作为其他同学的参考 QPS, 如果是5万日活,使用集中在每天的4小时,每个用户大概产生100的请求,那么平均下来,我们系统大概应该支撑的请求为:50000 * 100 / (4 * 60 * 60) = 350 qps/s 业务数据 业务量,我们自己是新闻业务,可能会有其他的业务,比如游戏,商城等等,基本每天新增的业务数据都会在同一个量级, 每日10000, 另外跟用户相关的信息也是比较大的一块,比如用户的订阅等行为,一共5万的用户,保存相关信息可能大概需要100条的数据。 缓存大小 主要业务数据和用户相关的热点数据限时保存在缓存中, 大概需要5个G左右。 日志大小 用户日志和请求日志。 大概每天3个G左右 技术架构 整体架构因为是小公司,我们基于阿里云来搭建,对图中的内容和技术选型进行一下说明: 负载均衡 可选方案: SLB, Ng...
- 下一篇
SpringBoot整合RabbitMQ之典型应用场景实战一
实战前言RabbitMQ 作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务、数据延迟处理等。 RabbitMQ 官网拜读首先,让我们先拜读 RabbitMQ 官网的技术开发手册以及相关的 Features,感兴趣的朋友可以耐心的阅读其中的相关介绍,相信会有一定的收获,地址可见:www.rabbitmq.com/getstarted.… 在阅读该手册过程中,我们可以得知 RabbitMQ 其实核心就是围绕 “消息模型” 来展开的,其中就包括了组成消息模型的相关组件:生产者,消费者,队列,交换机,路由,消息等!而我们在实战应用中,实际上也是紧紧围绕着 “消息模型” 来展开撸码的! 下面,我就介绍一下这一消息模型的演变历程,当然,这一历程在 RabbitMQ 官网也是可以窥览得到的! 上面几个图就已经概述了几个要点,而且,这几个要点的含义可以说是字如其名! 生产者:发送消息的程序 消费者:监听接收消费消息的程序 消息:一串二进制数据流 队列:消息的暂存区/...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7设置SWAP分区,小内存服务器的救世主
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- MySQL8.0.19开启GTID主从同步CentOS8
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Hadoop3单机部署,实现最简伪集群