[Spring cloud 一步步实现广告系统] 16. 增量投送到kafka
实现增量数据索引
上一节中,我们为实现增量索引的加载做了充足的准备,使用到mysql-binlog-connector-java 开源组件来实现MySQL 的binlog监听,关于binlog的相关知识,大家可以自行网络查阅。或者可以mailto:magicianisaac@gmail.com
本节我们将根据binlog 的数据对象,来实现增量数据的处理,我们构建广告的增量数据,其实说白了就是为了在后期能把广告投放到索引服务,实现增量数据到增量索引的生成。Let's code.
- 定义一个投递增量数据的接口(接收参数为我们上一节定义的binlog日志的转换对象)
/**
* ISender for 投递增量数据 方法定义接口
*
* @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
*/
public interface ISender {
void sender(MysqlRowData rowData);
}
- 创建增量索引监听器
/**
* IncrementListener for 增量数据实现监听
*
* @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
* @since 2019/6/27
*/
@Slf4j
@Component
public class IncrementListener implements Ilistener {
private final AggregationListener aggregationListener;
@Autowired
public IncrementListener(AggregationListener aggregationListener) {
this.aggregationListener = aggregationListener;
}
//根据名称选择要注入的投递方式
@Resource(name = "indexSender")
private ISender sender;
/**
* 标注为 {@link PostConstruct},
* 即表示在服务启动,Bean完成初始化之后,立刻初始化
*/
@Override
@PostConstruct
public void register() {
log.info("IncrementListener register db and table info.");
Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));
}
@Override
public void onEvent(BinlogRowData eventData) {
TableTemplate table = eventData.getTableTemplate();
EventType eventType = eventData.getEventType();
//包装成最后需要投递的数据
MysqlRowData rowData = new MysqlRowData();
rowData.setTableName(table.getTableName());
rowData.setLevel(eventData.getTableTemplate().getLevel());
//将EventType转为OperationTypeEnum
OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
rowData.setOperationTypeEnum(operationType);
//获取模版中该操作对应的字段列表
List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);
if (null == fieldList) {
log.warn("{} not support for {}.", operationType, table.getTableName());
return;
}
for (Map<String, String> afterMap : eventData.getAfter()) {
Map<String, String> _afterMap = new HashMap<>();
for (Map.Entry<String, String> entry : afterMap.entrySet()) {
String colName = entry.getKey();
String colValue = entry.getValue();
_afterMap.put(colName, colValue);
}
rowData.getFieldValueMap().add(_afterMap);
}
sender.sender(rowData);
}
}
开启binlog监听
- 首先来配置监听binlog的数据库连接信息
adconf:
mysql:
host: 127.0.0.1
port: 3306
username: root
password: 12345678
binlogName: ""
position: -1 # 从当前位置开始监听
编写配置类:
/**
* BinlogConfig for 定义监听Binlog的配置信息
*
* @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
*/
@Component
@ConfigurationProperties(prefix = "adconf.mysql")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogConfig {
private String host;
private Integer port;
private String username;
private String password;
private String binlogName;
private Long position;
}
在我们实现 监听binlog那节,我们实现了一个自定义client CustomBinlogClient,需要实现binlog的监听,这个监听的客户端就必须是一个独立运行的线程,并且要在程序启动的时候进行监听,我们来实现运行当前client的方式,这里我们会使用到一个新的Runnerorg.springframework.boot.CommandLineRunner,let's code.
@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {
@Autowired
private CustomBinlogClient binlogClient;
@Override
public void run(String... args) throws Exception {
log.info("BinlogRunner is running...");
binlogClient.connect();
}
}
增量数据投递
在binlog监听的过程中,我们看到针对于int, String 这类数据字段,mysql的记录是没有问题的,但是针对于时间类型,它被格式化成了字符串类型:Fri Jun 21 15:07:53 CST 2019。
--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
[10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
{before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}
对于这个时间格式,我们需要关注2点信息:
- CST,这个时间格式会比我们的时间+ 8h(中国标准时间 China Standard Time UT+8:00)
- 需要对这个日期进行解释处理
当然,我们也可以通过设置mysql的日期格式来改变该行为,在此,我们通过编码来解析该时间格式:
/**
* Thu Jun 27 08:00:00 CST 2019
*/
public static Date parseBinlogString2Date(String dateString) {
try {
DateFormat dateFormat = new SimpleDateFormat(
"EEE MMM dd HH:mm:ss zzz yyyy",
Locale.US
);
return DateUtils.addHours(dateFormat.parse(dateString), -8);
} catch (ParseException ex) {
log.error("parseString2Date error:{}", dateString);
return null;
}
}
因为我们在定义索引的时候,是根据表之间的层级关系(Level)来设定的,根据代码规范,不允许出现Magic Number, 因此我们定义一个数据层级枚举,来表达数据层级。
/**
* AdDataLevel for 广告数据层级
*
* @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
*/
@Getter
public enum AdDataLevel {
LEVEL2("2", "level 2"),
LEVEL3("3", "level 3"),
LEVEL4("4", "level 4");
private String level;
private String desc;
AdDataLevel(String level, String desc) {
this.level = level;
this.desc = desc;
}
}
实现数据投递
因为增量数据可以投递到不同的位置以及用途,我们之前实现了一个投递接口com.sxzhongf.ad.sender.ISender,接下来我们实现一个投递类:
@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {
/**
* 根据广告级别,投递Binlog数据
*/
@Override
public void sender(MysqlRowData rowData) {
if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
Level2RowData(rowData);
} else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
Level3RowData(rowData);
} else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
Level4RowData(rowData);
} else {
log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
}
}
private void Level2RowData(MysqlRowData rowData) {
if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
List<AdPlanTable> planTables = new ArrayList<>();
for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {
AdPlanTable planTable = new AdPlanTable();
//Map的第二种循环方式
fieldValueMap.forEach((k, v) -> {
switch (k) {
case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
planTable.setPlanId(Long.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
planTable.setUserId(Long.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
planTable.setPlanStatus(Integer.valueOf(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
break;
case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
break;
}
});
planTables.add(planTable);
}
//投递推广计划
planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
} else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
List<AdCreativeTable> creativeTables = new LinkedList<>();
rowData.getFieldValueMap().forEach(afterMap -> {
AdCreativeTable creativeTable = new AdCreativeTable();
afterMap.forEach((k, v) -> {
switch (k) {
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
creativeTable.setAdId(Long.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
creativeTable.setType(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
creativeTable.setMaterialType(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
creativeTable.setHeight(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
creativeTable.setWidth(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
creativeTable.setAuditStatus(Integer.valueOf(v));
break;
case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
creativeTable.setAdUrl(v);
break;
}
});
creativeTables.add(creativeTable);
});
//投递广告创意
creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
}
}
private void Level3RowData(MysqlRowData rowData) {
...
}
/**
* 处理4级广告
*/
private void Level4RowData(MysqlRowData rowData) {
...
}
}
投放增量数据到MQ(kafka)
为了我们的数据投放更加灵活,方便数据统计,分析等系统的需求,我们来实现一个投放到消息中的接口,其他服务可以订阅当前MQ 的TOPIC来实现数据订阅。
配置文件中配置TOPIC
adconf:
kafka:
topic: ad-search-mysql-data
--------------------------------------
/**
* KafkaSender for 投递Binlog增量数据到kafka消息队列
*
* @author <a href="mailto:magicianisaac@gmail.com">Isaac.Zhang | 若初</a>
* @since 2019/7/1
*/
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {
@Value("${adconf.kafka.topic}")
private String topic;
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 发送数据到kafka队列
*/
@Override
public void sender(MysqlRowData rowData) {
kafkaTemplate.send(
topic, JSON.toJSONString(rowData)
);
}
/**
* 测试消费kafka消息
*/
@KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
public void processMysqlRowData(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMsg = Optional.ofNullable(record.value());
if (kafkaMsg.isPresent()) {
Object message = kafkaMsg.get();
MysqlRowData rowData = JSON.parseObject(
message.toString(),
MysqlRowData.class
);
System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
//sender.sender();
}
}
}
关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
[Spring cloud 一步步实现广告系统] 15. Binlog 增量准备
MySQL Binlog简介 什么是binlog? 一个二进制日志,用来记录对数据发生或潜在发生更改的SQL语句,并以而进行的形式保存在磁盘中。 binlog 的作用? 最主要有3个用途: 数据复制(主从同步) Mysql 的Master-Slave协议,让Slave可以通过监听binlog实现数据复制,达到数据一致性目的 数据恢复 通过mysqlbinlog工具恢复数据 增量备份 Binlog 变量 log_bin (Binlog 开关,使用show variables like 'log_bin';查看) binlog_format (Binlog 日志格式,使用show variables like 'binlog_format';查看) 日志格式总共有三种: ROW, 仅保存记录被修改的细节,不记录SQL语句上下文相关信息。(能清晰的记录下每行数据的修改细节,不需要记录上下文相关信息,因此不会发生某些特定情况下的procedure、function以及trigger 的调用无法被准确复制的问题,任何情况下都可以被复制,且能加快从库重放日志的效率,保证从库数据的一致性) STAT...
-
下一篇
黄小斜:我的成长史,关于考研,求职与写作的那些事
阅读本文大概需要 9.96 分钟。 今天和大家分享一下我的故事,相信老读者都比较了解,恰逢今天是公众号首次发文一周年的纪念日,借此机会,重新向大家介绍一下我的经历。 1.从小到大,我的成绩都只能算中等。参加高考的时候,正常发挥,和估的分就差了不到5分,分数不算高,勉强够着了一所省内211的最低门槛。当时的分数不支持我选择自己比较感兴趣的计算机专业,所以只好在学校和专业之间做一个权衡。 在保证能上这所学校的前提下,选了一个相对比较折中的专业。没想到的是,在大学的四年时间里,基本上这个专业学的东西和计算机没有太大的关系,也就几个学了几门像c语言、Java这类非常基础的课程,其他基础的计算机课程,比如操作系统、计算机网络,一个都没有学过。 由于接触的不多,那时候对于计算机行业,我只是充满着好奇,甚至觉得学了Android开发的基础就可以去面试安卓工程师了,没有太多的思考和了解。 2.我的大学,可以说是浑浑噩噩地玩了三年,对当时的网络游戏和手机游戏如数家珍,每天基本上都是这样一个节奏,上课,打DOTA,睡觉。上课,打英雄联盟,睡觉。上课,打魔兽世界,睡觉。 当时间来到大三的那个暑假时,我面临了...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS关闭SELinux安全模块
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- MySQL数据库在高并发下的优化方案

微信收款码
支付宝收款码