flink内部计算指标的95线-99线等的实现
15年在某电商从0设计了一个通用的API监控系统,当时只是计算了成功率+平均耗时,没有算75,90,95,99,999,9999线,这次单位需要,所以促使我去思考这个问题,问了单位CAT维护人员,大致了解了计算方式,跟我在18年7月份在单位内网BBS发表的文章思路是一致的,所以就直接写了下面的代码
PercentageCalculation.java
package com.ymm.computation.udf.define;
import org.apache.flink.table.functions.AggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//批量计算95line类似的数据
public class PercentageCalculation extends AggregateFunction<Object, PercentageAccumulator> {
/** */
private static final long serialVersionUID = 4009559061130131166L;
private static final Logger LOG = LoggerFactory
.getLogger(PercentageCalculation.class);
//private static BlockingQueue<PercentageAggregatorContainer> GLOBAL_QUEUE = new LinkedBlockingQueue<PercentageAggregatorContainer>();
@Override
public PercentageAccumulator createAccumulator() {
return new PercentageAccumulator();
}
public void accumulate(PercentageAccumulator accumulator, Object value) {
accumulator.accumulate(value);
}
@Override
public Object getValue(PercentageAccumulator accumulator) {
return accumulator.getValue();
}
public void resetAccumulator(PercentageAccumulator acc) {
acc = null;//help GC
}
}
PercentageAccumulator.java
package com.ymm.computation.udf.define;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//!只针对时间来计算95线等,其它参数不要使用本类
public class PercentageAccumulator {
private static final Logger LOG = LoggerFactory
.getLogger(PercentageAccumulator.class);
public final static double PERCENT_50 = 0.5;
public final static double PERCENT_75 = 0.25;
public final static double PERCENT_90 = 0.1;
public final static double PERCENT_95 = 0.05;
public final static double PERCENT_99 = 0.01;
public final static double PERCENT_999 = 0.001;
public final static double PERCENT_9999 = 0.0001;
public final static int PERCENT_COUNT = 7;
private final static int[] SCALE = { //
1, //
2, //
4, //
8, //
16, //
32, //
64, //
128, //
256, //
512, //
1024, //
2048, //
4096, //
8192, //
16384, //
32768, //
65536 //
};
private int[] countContainer = { //
0, //<=1
0, //<=2
0, //<=4
0, //<=8
0, //<=16
0, //<=32
0, //<=64
0, //<=128
0, //<=256
0, //<=512
0, //<=1024
0, //<=2048
0, //<=4096
0, //<=8192
0, //<=16384
0, //<=32768
0 //<=65536
};
private int positionByTwoDivision(int[] array, int begin, int end, int value) {
int mid = (begin + end) >> 1;
int midValue = array[mid];
int halfMidValue = midValue >> 1;
//判断是否可以命中mid
if (value > halfMidValue && value <= midValue) {
return mid;
}
//没法命中,则根据大小来定
if (value <= halfMidValue) {
if (mid - 1 < 0) {//没路可走的边界条件
return 0;
}
return positionByTwoDivision(array, begin, mid - 1, value);
} else {
return positionByTwoDivision(array, mid + 1, end, value);
}
}
public int positionInValueArray(int val) {
int length = SCALE.length;
//如果大于最大值|小于等于最小值
if (val >= SCALE[length - 1]) {
return length - 1;
} else if (val <= SCALE[0]) {
return 0;
}
//采用2分法来计算
return positionByTwoDivision(SCALE, 0, length - 1, val);
}
public void accumulate(Object value) {
//转换为long值,int值够用了
Long longValue = (Long) value;
int intValue = longValue.intValue();
//找到下标
int index = positionInValueArray(intValue);
countContainer[index]++;
}
//确保在[1,MAX]范围内,
//自然顺序
private int adjust(int input, int max) {
if (input <= 1) {
return 1;
} else if (input >= max) {
return max;
} else {
return input;
}
}
private static final ThreadLocal<StringBuilder> STR_BUILDER_ThreadLocal = new ThreadLocal<StringBuilder>() {
public StringBuilder initialValue() {
return new StringBuilder();
}
};
private static final String SEPARATOR = ":";
public String getValue() {
//total
int total = 0;
int length = countContainer.length;
for (int index = 0; index < length; index++) {
total += countContainer[index];
}
//如果total为0的异常情况
//注意是自然序---[1,total]
int percent_9999_pos = adjust((int) (total * PERCENT_9999), total);
boolean found_9999 = false;
int percent_9999_value = Integer.MAX_VALUE;
//999
int percent_999_pos = adjust((int) (total * PERCENT_999), total);
boolean found_999 = false;
int percent_999_value = Integer.MAX_VALUE;
//99
int percent_99_pos = adjust((int) (total * PERCENT_99), total);
boolean found_99 = false;
int percent_99_value = Integer.MAX_VALUE;
//95
int percent_95_pos = adjust((int) (total * PERCENT_95), total);
boolean found_95 = false;
int percent_95_value = Integer.MAX_VALUE;
//90
int percent_90_pos = adjust((int) (total * PERCENT_90), total);
boolean found_90 = false;
int percent_90_value = Integer.MAX_VALUE;
//75
int percent_75_pos = adjust((int) (total * PERCENT_75), total);
boolean found_75 = false;
int percent_75_value = Integer.MAX_VALUE;
//50
int percent_50_pos = adjust((int) (total * PERCENT_50), total);
boolean found_50 = false;
int percent_50_value = Integer.MAX_VALUE;
//开始遍历每一个元素,从后往前算
int scanned = 0;
int left = PERCENT_COUNT;
for (int index = length - 1; index >= 0; index--) {
//当前没有值,无论如何也不会成为备选
if (0 == countContainer[index]) {
continue;
}
//当前有值
scanned += countContainer[index];
//逐个判断
//9999线
if (false == found_9999 && scanned >= percent_9999_pos) {
percent_9999_value = SCALE[index];
found_9999 = true;
left--;
}
//999线
if (false == found_999 && scanned >= percent_999_pos) {
percent_999_value = SCALE[index];
found_999 = true;
left--;
}
//99线
if (false == found_99 && scanned >= percent_99_pos) {
percent_99_value = SCALE[index];
found_99 = true;
left--;
}
//95线
if (false == found_95 && scanned >= percent_95_pos) {
percent_95_value = SCALE[index];
found_95 = true;
left--;
}
//90线
if (false == found_90 && scanned >= percent_90_pos) {
percent_90_value = SCALE[index];
found_90 = true;
left--;
}
//75线
if (false == found_75 && scanned >= percent_75_pos) {
percent_75_value = SCALE[index];
found_75 = true;
left--;
}
//50线
if (false == found_50 && scanned >= percent_50_pos) {
percent_50_value = SCALE[index];
found_50 = true;
left--;
}
//全部都找到了就break
if (0 == left) {
break;
}
}
//所有的值都算好了
//拿出来时先reset一下
StringBuilder stringBuilder = STR_BUILDER_ThreadLocal.get();
stringBuilder.delete(0, stringBuilder.length());
//开始挂各种数据,测试表明每秒几百万次执行
stringBuilder.append(percent_50_value);
stringBuilder.append(SEPARATOR);
stringBuilder.append(percent_75_value);
stringBuilder.append(SEPARATOR);
stringBuilder.append(percent_90_value);
stringBuilder.append(SEPARATOR);
stringBuilder.append(percent_95_value);
stringBuilder.append(SEPARATOR);
stringBuilder.append(percent_99_value);
stringBuilder.append(SEPARATOR);
stringBuilder.append(percent_999_value);
stringBuilder.append(SEPARATOR);
stringBuilder.append(percent_9999_value);
//return
return stringBuilder.toString();
}
public void print() {
for (int index = 0; index < this.countContainer.length; index++) {
System.out.println(index + "->" + this.countContainer[index]);
}
}
}
欢迎提出 优化建议,比如对GC更友好的优化建议!
这个函数的瓶颈在于stringbuilder.

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
使用redis来调用iptables,封禁恶意IP
话不多说,通常大多数站点都会有被薅羊毛的情况,防护无非也就是业务层做处理,短时内不再响应恶意请求啦.虽然不响应了,可还是会消耗资源的,比如我要从数据库(当然也可能是内存数据库)去查询下,你是不是恶意的IP. 那么能否网络层或应用层去处理呢?在前几篇文章有写过应用层方案,今天就写下网络层方法. 说起iptables 除非是专业人员,像普通开发者是不会使用的,一堆表一堆链的一看就头疼.所以**RedisPushIptables**就应时而生,开发者不须为iptables复杂语法头疼,只需要像使用redis那样简单,就可使用iptables来阻挡恶意IP地址. RedisPushIptables是一个redis模块,用于更新防火墙规则,以在指定的时间内拒绝IP地址或永久拒绝。比fail2ban更好用点,不到400行代码实现.适用任意业务,API调用,不需要分析应用日志,业务主动调用,缺点是要手动编码.不适用普通使用者. 该模块可以通过 redis 来操作 iptables 的 filter表INPUT链规则的增加和删除,可以用来动态调用防火墙。比如用来防御攻击。 但是前提要以 root 来运...
-
下一篇
大前端时代即将来临,后端该何去何从?
缘起 “天下大事,合久必分,分久必合” 一直以来,前端都是“切图师”,仅有“特效师”一脉堪称翘楚。 大家就这样安安分分过了几年,前端一直是js的天地,直到08年的一天,node.js 横空出世,开始不安分起来。 一时间,暗流涌动,后端也开始发力。 与此同时,也有 coffeescript 等相关的项目崛起,用后端语言写前端。 甚至 谷歌也有 GWT 这样的以 java 写前端的方式。 反击 但紧接着,grunt gulp webpack ... 以及三大框架 层出不穷。 express 、 koa 再到最近的nest.js 也让人难以招架。 node.js 在后端虽然没掀起大浪,可也总算有了一席之地,前端复杂的工具链及框架,coffeescript 败下阵来,GWT 后来演变为 Dart。 毫无疑问,这场战争的胜利属于 JavaScript/Typescript。 webassembly 出世 JavaScript 看来没办法绕过去了,Rust kotlin go 一直想通过 webassembly 弯道超车,有了一些前端框架(编译成 wasm), 可由于生态原因,兼容问题,以及各种难...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7,8上快速安装Gitea,搭建Git服务器