基于flink sql构建报警系统的若干技术点
1)选择滑动窗口
滑动窗口会导致一个时间点的数据会分布到多个window里来,其它跟滚动窗口没区别
2)滑动数据重新汇聚
计算源是单个时间窗口内预汇聚的数据,不适用于滑动窗口的计算值,比如前面单个窗口计算出来p90,那么滑动窗口的p90怎么算?
需要把前面的时间窗口的现场保留下来传到当前滑动窗口内才可以继续计算,具体代码如下:
//拿到原始数据后,重新计算 @SuppressWarnings("unchecked") public void accumulate(Object value) { if (null == value) { return; } HashMap<String, Object> mapVal = (HashMap<String, Object>) value; //1)在历史值和当前值中设置新的max值 Object maxObj = mapVal.get("max"); Object minObj = mapVal.get("min"); Object countObj = mapVal.get("count"); Object sumObj = mapVal.get("sum"); Object afObj = mapVal.get("af"); Object ccObj = mapVal.get("cc"); if (null == maxObj || null == minObj || null == countObj || null == sumObj || null == afObj || null == ccObj) { return; } if (false == (ccObj instanceof JSONArray)) { return; } JSONArray jsonArray = (JSONArray) ccObj; int jsonSize = jsonArray.size(); if (0 == jsonSize) { return; } this.max = Math.max(this.max, ((Number) maxObj).doubleValue()); this.min = Math.min(this.min, ((Number) minObj).doubleValue()); this.count += ((Number) countObj).intValue(); this.sum += ((Number) sumObj).doubleValue(); //每次都是直接替换 this.augmentFactor = ((Number) afObj).intValue(); //5)开始汇总以便后面计算各种95线之类的值 Integer[] intArray = new Integer[jsonSize]; intArray = jsonArray.toArray(intArray); for (int index = 0; index < jsonSize; index++) { countContainer[index] += intArray[index]; } //6)over }
3)海量tag的发现
如果上传的metric tag数据非常多,怎么去重是个问题,我采取的方案是
3.1)使用采样率
@Override public void filter(String metric, TreeMap<String, String> tagValues, boolean tagValuesEmpty) { Random randomGenerator = RANDOM_THREAD_LOCAL.get(); if (0 == randomGenerator.nextInt(20)) { //取5%的采样率 ReportQueue.put(ReportQueue.METRIC_TAG, new MetricAndTags(metric, tagValues, tagValuesEmpty)); //结束 } else { } }
3.2)布隆过滤器判重
//普通数据-bloom filter private static Integer SIZE = 100 * 1000 * 1000; private static Integer BITS = 20; private static Integer HASH_FUNCTION = 1; private static BloomFilter DATA_BLOOM_FILTER = new BloomFilter(SIZE, BITS, HASH_FUNCTION); //哈希code-bloom filter private static BloomFilter HASHCODE_BLOOM_FILTER = new BloomFilter(SIZE, BITS, HASH_FUNCTION); public static synchronized boolean isNewKey(String data) { Key dataKey = new Key(data.getBytes()); if (false == DATA_BLOOM_FILTER.membershipTest(dataKey)) { //不存在就是真的不存在 return true; } //再做hashcode的2次判断 if (false == HASHCODE_BLOOM_FILTER.membershipTest(hashCodeKey(data))) { //不存在就是真的不存在 return true; } //(如果2次都说存在,也没办法了,这条数据丢弃) //返回false表示不是new key return false; }
3.3)元数据幂等性保存到es
注意幂等性,之前存在的数据会被更新,而不是新增一条数据,因为我们是保存元数据
具体就是设置请求体里的upsert为true
@Data public class ExecutionMetricTagValue { private Boolean doc_as_upsert=true; private EsMetricTagValue doc; }
3.4)限流防对远程ES的流量冲击
这个是构建一个 Guava对象
private static final RateLimiter RATE_LIMITER = RateLimiter.create(500); //在JVM级别限流,防止对ES产生冲击 RATE_LIMITER.acquire(1);
4)用户配置数据拉取
用户配置的一些规则,通过另外一个JVM级别的线程拉取到本地内存,这样就可以不影响flink的计算速度
5)报警屏蔽周期
这是为了防止报警洪灾,实现思路
String res; try { SetParams setParams = new SetParams(); setParams.nx(); setParams.ex(alarmInterval); //仅仅是当前timeSpan内有效,10s不影响30s 1m这种 res = JEDIS_CLUSTER.set(timeSpan + "_hubble_alarm_" + fullKey, "1", setParams); } catch (Exception e) { LOG.error(e.toString()); return; } if (null != res) { //LOG.info("初次插入,可以报警");
主要就是这些,很难的点没有,就是要注意各个细节
----------------------------------------------------------------------------------------------------------
其实我觉得报警系统的精髓在于阈值的设置上,傻乎乎的设置静态值是没有技术含量的,整个报警系统的精髓就在于自动设置报警阈值
所以接下来我会去研究这方面的技术,如果研究出来了我会发文章出来!
----------------------------------------------------------------------------------------------------------
下面放界面图

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
谈谈Spring Boot 数据源加载及其多数据源简单实现
业务需求 提供所有微服务数据源的图形化维护功能 代码生成可以根据选择的数据源加载表等源信息 数据源管理要支持动态配置,实时生效 附录效果图 实现思路 本文提供方法仅供类似简单业务场景,在生产环境和复杂的业务场景 请使用分库分表的中间件(例如mycat)或者框架 sharding-sphere (一直在用)等 先来看Spring 默认的数据源注入策略,如下代码默认的事务管理器在初始化时回去加载数据源实现。这里就是我们动态数据源的入口 // 默认的事务管理器 ppublic class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { // 启动时候注入一个数据源 public void setDataSource(@Nullable DataSource dataSource) { if (dataSource instanceof TransactionAwareDataSour...
- 下一篇
基于MaxCompute的数仓数据质量管理
声明 本文中介绍的非功能性规范均为建议性规范,产品功能无强制,仅供指导。 参考文献 《大数据之路——阿里巴巴大数据实践》——阿里巴巴数据技术及产品部 著。 背景及目的 数据对一个企业来说已经是一项重要的资产,既然是资产,肯定需要管理。随着业务的增加,数据的应用越来越多,企业在创建的数仓过程中对数据的管理也提出了更高的要求,而数据质量也是数仓建设过程不容忽视的环节。本文针对MaxCompute数仓建设过程中如何做数据质量给出规范建议,为实际数据治理提供依据及指导。 数据质量保障原则 评估数据质量的好坏不同行业甚至不同企业有不同标准,在此我们主要从四个方面进行评估,即完整性、准确性、一致性和及时性。 完整性。 完整性是指数据的记录和信息是否完整,是否存在缺失情况。数据缺失主要包括记录的缺失和记录中某个字段信息的缺失,两者都会造成统计结果不准确,可以说,完整性是数据质量最基础的保障。如某个相对稳定的业务数据量每天的都有100万条记录,某天突然下降1万条,那么可能就是记录缺失。而对于记录中某个字段信息缺失,如某科高考成绩表中一个考卷分数要对应一个准考证号,这个字段的空值数就该为0,一旦大于0,...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- Mario游戏-低调大师作品
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题