如何优雅统计订单收益(一)
引言
统计订单收益是做电商类型的APP老生常谈的问题.常规需求大致有用户收益日报/月报/年报
.这些报表型的数据对表设计和程序设计有着不小的挑战.常规的聚合查询
语句的查询时间会随着收益表数据日渐庞大而逐渐变长.这时候就需要思考如何设计收益表可以更高效的查询?怎样的设计才可以让统计收益变得简单?
需求
效果图
具体需求
1.收益类型分为:自购订单收益,分享订单收益,分销收益,活动收益 2.统计当日收益,当月收益 3.根据筛选的时间统计出时间段的收益.
思考
设计思路
订单表是肯定需要的.在写入或者修改订单表的时候同步写入修改收益表.只有自购和分享订单会记录到订单表中,分销以及活动赠送收益只在特殊业务中写入收益表.再以日为维度,创建一张用户收益日报表
.单行记录写入用户当天收益情况.降低
查询用户日/月/年收益统计时的数据量.以单用户为例,通过拆分用户一个月只会产生最多31
条数据.属于可控增长速度.如果沿用收益表,因为收益表的数据量跟用户下单的数量一一对应,如果用户下单量多那么表会非常庞大.在前期用户量初见增长时,可用此方法规避大的数据量统计,后期如果用户量增大导致日报表数据变多可以再考虑分表.
可见问题
1.同步收益日报表的时机问题,因为原本订单的操作就很复杂需要同步写入收益和计算写入收益日报数据,代码耦合度太高.有没有什么方法通过收益表异构出收益日报表呢? 2.虽然收益被写入到了日报表中,但是要满足效果图要求的效果,可能需要多次查询SQL语句,有没有办法在不影响程序效率的情况下尽量少些一些聚合SQL呢?
实现
总结出上面这些问题.我开始了资料收集.最终采用canal
+RocketMQ
做为异构方案.
技术栈
简单介绍下这两款技术框架:
注:我用的aliyun的全家桶,MQ和mysql都是阿里云的,如果是自建服务器的可能有区别,我在后面尽量标出
方案流程
- 在写入或修改收益表的同时通过canal监控mysql收益表的binlog日志.
- canal检测到变更,组装变更的JSON报文,发送RocketMQ中事先定义好的TOPIC.
- 程序消费该TOPIC,异构收益日报表.
canal配置部分
canal的安装请参考官方文档 解压后可得到一个canal文件夹,包含三个目录
- bin:存放启动重启脚本
conf:存放核心配置文件
- lib:存放核心jar包
我们需要重点关注conf文件夹里的conf/canal.properties核心配置文件以及conf/example/instance.properties单个监控节点配置文件
conf/canal.properties
# tcp, kafka, RocketMQ,这里默认是tcp读取模式,采用RocketMQ需要将其改变为RocketMQ模式 canal.serverMode = RocketMQ # 如果是aliyun的RocketMQ需要配置以下两个KEY,ak/sk canal.aliyun.accessKey =xxxxxxx canal.aliyun.secretKey =xxxxxxx # 监控的节点名称.这个默认就是example如果有多节点可以逗号隔开,如下方的例子 canal.destinations = example,sign # 如果是aliyun的RocketMQ需要修改canal.mq.accessChannel为cloud默认为local canal.mq.accessChannel = cloud #MQ的地址,需要注意这里是不带http://,但是需要带端口号 canal.mq.servers = #rocketmq实例id canal.mq.namespace =
conf/example/instance.properties
#mysql地址 canal.instance.master.address= #以下两个参数需要在开启数据库binlog日志后得到,在数据库查询界面输入查询语句`show master status`,canal.instance.master.journal.name对应File参数,canal.instance.master.position对应Position参数 canal.instance.master.journal.name= canal.instance.master.position= #数据库的账号密码 canal.instance.dbUsername= canal.instance.dbPassword= #需要监控变动的表 canal.instance.filter.regex=xxx.t_user_income_detail,xxx.t_user_cash_out #定义发送的mq生产组 canal.mq.producerGroup = #定义发送到mq的指定主题 canal.mq.topic=
注:监控表的书写规则格式参照监控表书写规则
启动
cd /canal/bin ./start.sh
这时候会发现canal目录中多了一个log文件,进入可以看到canal主日志文件和example节点启动日志.
canal日志中出现 the canal server is running now ...... example日志中出现 init table filter : ^tablename xxxxxxxxx , the next step is binlog dump
表示你已经成功了一大步,canal监控已正常运行.
RocketMQ部分
如果用的aliyun的RocketMQ,配置代码部分直接可参考文档 自建的RocketMQ也可参照简单的消费例子监控对应的TOPIC即可 消费Canal发来的数据,格式如下:
{ "data":[ { //单个修改后表数据,如果同一时间有多个表变动会有多个该JSON对象 } ], "database":"监控的表所在数据库", "es":表变动时间, "id":canal生成的id, "isDdl":Boolean类型,表示是否DDL语句, "mysqlType":{ 表结构 }, "old":如果是修改类型会填充修改前的值, "pkNames":[ 该表的主键,如"id" ], "sql":"执行的SQL", "sqlType":{ 字段对应的sqlType,一般使用mysqlType即可 }, "table":"监控的表名", "ts":canal记录发送时间, "type":"表的修改类型,入INSERT,UPDATE,DELETE" }
MQ消费代码主要用了反射,映射到对应的表
//这里的body就是Canal发来的数据 public Action process(String body) { boolean result = Boolean.FALSE; JSONObject data = JSONObject.parseObject(body); log.info("数据库操作日志记录:data:{}",data.toString()); Class c = null; try { //这里监控了订单和收益表分别做订单统计和收益日报统计 c = Class.forName(getClassName(data.getString("table"))); } catch (ClassNotFoundException e) { log.error("error {}",e); } if (null != c) { JSONArray dataArray = data.getJSONArray("data"); if (dataArray != null) { //把获取到的data部分转换为反射后的实体集合 List list = dataArray.toJavaList(c); if (CollUtil.isNotEmpty(list)) { //对修改和写入操作分别进行逻辑操作 String type = data.getString("type"); if ("UPDATE".equals(type)) { result = uppHistory(list); } else if ("INSERT".equals(type)) { result = saveHistory(list); } } } } return result ? Action.CommitMessage : Action.ReconsumeLater; } /** * @description: 获取反射ClassName * @author: chenyunxuan */ private String getClassName(String tableName) { StringBuilder sb = new StringBuilder(); //判断是哪张表的数据 if (tableName.equals("t_user_income_detail")) { sb.append("cn.mc.core.model.order"); } else if (tableName.equals("t_user_cash_out")) { sb.append("cn.mc.sync.model"); } String className = StrUtil.toCamelCase(tableName).substring(1); return sb.append(".").append(className).toString(); } /** * @description: 写入对应类型的统计表 * @author: chenyunxuan */ private <T> Boolean saveHistory(List<T> orderList) { boolean result = Boolean.FALSE; Object dataType = orderList.get(0); //用instanceof判断类型进入不同的逻辑处理代码 if (dataType instanceof TUserIncomeDetail) { result = userOrderHistoryService.saveIncomeDaily(orderList); } else if (dataType instanceof UserCashOut) { result = userCashOutHistoryService.delSaveHistoryList(orderList); } return result; }
saveIncomeDaily伪代码
public synchronized Boolean saveIncomeDaily(List orderList) { //循环收益明细记录 ....... //通过创建时间和用户id查询收益日报表中是否有当日数据 if(不存在当日数据){ //创建当日的收益日报表记录 ..... } //因为不存在当日记录也会立即写入当日的空数据,所以下面的流程都是走更新流程 //更新当日数据 ....... return Boolean.TRUE; }
注:代码中应该多打一些日志,方便产生异常收益数据后的校对
后记
至此一个基于canal
+RocketMQ
的收益日报统计异构方案就完成了,下一篇会围绕本文提到的第二个问题减少聚合SQL的产生展开.敬请关注.

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
分享套接字数据包序列化与反序列化方法
分享套接字数据包序列化与反序列化方法 简单说一下,本文不涉及Socket的连接、数据接收,只是对数据包(byte[])的序列化和反序列化方法的封装介绍。 本文目录 本文背景 一般操作 本文操作 总结 1.本文背景 经常做C/S,客户端与服务端通信基本是TCP/UDP通信,套接字用得飞起。 比如我们有一个系统,这个系统又分几个系统子模块进程: C++服务端 Android 客户端 iOS 客户端 WPF桌面管理端 ...... 几个模块之间通过TCP或者UDP通信,数据包解析与组装是常规操作,我们定义数据包格式如下: 一个数据包包含包头和包体,定义如下: 包头 序号 字段名 数据类型 备注 1 消息标识 int 用于标识数据包是否合法 2 名称 string 当前消息名称,用于标识数据包类型 3 版本号 int 当前消息版本号,允许程序中消息存在多个版本,用于版本迭代 包含这三个字段:消息标识、名称、版本号,唯一确定消息对象。 包体 序号 字段名 数据类型 备注 1 字段1 数据类型 字段1 2 字段2 数据类型 字段2 包体直接定义字段信息,就像定义类属性一样。 另包头与包体中数据类型...
- 下一篇
企业级RPC框架zRPC
近期比较火的开源项目go-zero是一个集成了各种工程实践的包含了Web和RPC协议的功能完善的微服务框架,今天我们就一起来分析一下其中的RPC部分zRPC。 zRPC底层依赖gRPC,内置了服务注册、负载均衡、拦截器等模块,其中还包括自适应降载,自适应熔断,限流等微服务治理方案,是一个简单易用的可直接用于生产的企业级RPC框架。 zRPC初探 zRPC支持直连和基于etcd服务发现两种方式,我们以基于etcd做服务发现为例演示zRPC的基本使用: 配置 创建hello.yaml配置文件,配置如下: Name: hello.rpc // 服务名 ListenOn: 127.0.0.1:9090 // 服务监听地址 Etcd: Hosts: - 127.0.0.1:2379 // etcd服务地址 Key: hello.rpc // 服务注册key 创建proto文件 创建hello.proto文件,并生成对应的go代码 syntax = "proto3"; package pb; service Greeter { rpc SayHello (HelloRequest) returns...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8编译安装MySQL8.0.19
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Mario游戏-低调大师作品
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS6,7,8上安装Nginx,支持https2.0的开启