基于Flink和Drools的实时日志处理
云栖号资讯:【点击查看更多行业资讯】
在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来!
背景
日志系统接入的日志种类多、格式复杂多样,主流的有以下几种日志:
- Filebeat采集到的文本日志,格式多样
- Winbeat采集到的操作系统日志
- 设备上报到Logstash的syslog日志
- 接入到Kafka的业务日志
以上通过各种渠道接入的日志,存在2个主要的问题:
- 格式不统一、不规范、标准化不够
- 如何从各类日志中提取出用户关心的指标,挖掘更多的业务价值
为了解决上面2个问题,我们基于Flink和Drools规则引擎做了实时的日志处理服务。
系统架构
架构比较简单,架构图如下:
各类日志都是通过Kafka汇总,做日志中转。
Flink消费Kafka的数据,同时通过API调用拉取Drools规则引擎,对日志做解析处理后,将解析后的数据存储到Elasticsearch中,用于日志的搜索和分析等业务。
为了监控日志解析的实时状态,Flink会将日志处理的统计数据,如每分钟处理的日志量,每种日志从各个机器IP来的日志量写到Redis中,用于监控统计。
模块介绍
系统项目命名为Eagle。
eagle-api:基于Spring Boot,作为Drools规则引擎的写入和读取API服务。
eagle-common:通用类模块。
eagle-log:基于Flink的日志处理服务。
重点讲一下eagle-log:
对接kafka、ES和Redis
对接Kafka和ES都比较简单,用的官方的connector(flink-connector-kafka-0.10和flink-connector-elasticsearch6),详见代码。
对接Redis,最开始用的是org.apache.bahir提供的redis connector,后来发现灵活度不够,就使用了Jedis。
在将统计数据写入redis的时候,最开始用的keyby分组后缓存了分组数据,在sink中做统计处理后写入,参考代码如下:
String name = "redis-agg-log"; DataStream<Tuple2<String, List<LogEntry>>> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex()) .timeWindow(Time.seconds(windowTime)).trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime)) .process(new ProcessWindowFunction<LogEntry, Tuple2<String, List<LogEntry>>, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<LogEntry> iterable, Collector<Tuple2<String, List<LogEntry>>> collector) { ArrayList<LogEntry> logs = Lists.newArrayList(iterable); if (logs.size() > 0) { collector.collect(new Tuple2(s, logs)); } } }).setParallelism(redisSinkParallelism).name(name).uid(name);
后来发现这样做对内存消耗比较大,其实不需要缓存整个分组的原始数据,只需要一个统计数据就OK了,优化后:
String name = "redis-agg-log"; DataStream<LogStatWindowResult> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex()) .timeWindow(Time.seconds(windowTime)) .trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime)) .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction()) .setParallelism(redisSinkParallelism).name(name).uid(name);
这里使用了Flink的聚合函数和Accumulator,通过Flink的agg操作做统计,减轻了内存消耗的压力。
使用Broadcast广播Drools规则引擎
1、Drools规则流通过broadcast map state广播出去。
2、Kafka的数据流connect规则流处理日志。
//广播规则流 env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1) .broadcast(ruleStateDescriptor); //Kafka数据流 FlinkKafkaConsumer010<LogEntry> source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties); env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism); //数据流connect规则流处理日志 BroadcastConnectedStream<LogEntry, RuleBase> connectedStreams = dataSource.connect(ruleSource); connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).uid(name);
具体细节参考开源代码。
小结
本系统提供了一个基于Flink的实时数据处理参考,对接了Kafka、Redis和Elasticsearch,通过可配置的Drools规则引擎,将数据处理逻辑配置化和动态化。
对于处理后的数据,也可以对接到其他sink,为其他各类业务平台提供数据的解析、清洗和标准化服务。
【云栖号在线课堂】每天都有产品技术专家分享!
课程地址:https://yqh.aliyun.com/live立即加入社群,与专家面对面,及时了解课程最新动态!
【云栖号在线课堂 社群】https://c.tb.cn/F3.Z8gvnK
原文发布时间:2020-07-09
本文作者: aoxiang
本文来自:“dockone”,了解相关信息可以关注“dockone”
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
IoTSharp 0.2 发布,已成功对接 STM32、树莓派等多种设备
IoTSharp 是一个基于.Net Core 3.1 的跨平台物联网平台, 目前最新版本实现如下功能! 服务端支持方面: 一、支持HTTP的遥测数据、属性数据上传和获取。 二、支持完整的HTTP Api进行设备以及多租户设备管理 三、支持MQTT协议遥测数据的上传和订阅 四、RPC支持mqtt和http, coap晚些时候支持。 五、支持coap的属性上传和遥测上传 六、通过kimbus 内置ModBus 服务端。 七、通过MQTTnet 内置MQTTBroker 并能通过IoTSharp.X509Extensions进行自签名证书以实现通过证书限定身份和加密通讯 八、 为了给IoTSharp的研发过程提供更好的软件生态, 我们独立开源了Silkier、Silkier.EFCore、SSilkier.AspNetCore 便于让IoTSharp代码更加整洁。通过整合和优化调整了其他开源产品开源了 SilkierQuartz 进行任务管理。 设备测对接方面: 一、实现了通过C语言进行连接平台的演示 https://github.com/IoTSharp/IoTSharp.Edge.pa...
- 下一篇
后来者腾讯云的焦炙
一直不被外界所知的腾讯云收入数据,在腾讯前不久的财报中,披露出来了。 在腾讯2019年年度财报中,腾讯云在2019年全年实现营收突破170亿元,增速达到87%,在金融科技及企业服务业务总收入里占比约为17%。 不过在腾讯最新披露的2020年第一季度财报中,占其总收入25%的金融科技及企业服务同比增速放缓,仅有22%,在2019年同期增速为44%。同时腾讯并没有将腾讯云的收入单独披露出来。 如今已经成为中国第二大、亚太地区第四的云服务商腾讯云,在云计算风谲云诡的战场里,还要走很长时间的一段路。 腾讯云的追赶 自从腾讯“930变革”之后,腾讯云的发展就备受外界关注。 一直以来To C端风生水起的腾讯,在2018年9月宣布组织架构的调整,将原先的七大事业群(BG)重组整合,同时成立了新的云与智慧产业事业群(CSIG)和平台与内容事业群(PCG),开启了拥抱产业互联网的战略。 腾讯云作为CSIG中的重头戏,其表现如何关系着腾讯To B的进程。经过腾讯在腾讯云方面不断投入,腾讯云的表现并不差。 根据腾讯官方披露的数据,腾讯云在2019年年度实现营收为170亿元,增速达到87%。同期的阿里云增速为...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Linux系统CentOS6、CentOS7手动修改IP地址
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Red5直播服务器,属于Java语言的直播服务器
- CentOS6,CentOS7官方镜像安装Oracle11G
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池