固定QPS压测模式探索
在早前跟测试同行在QQ群聊天的时候,聊过一个固定QPS压测的问题,最近突然有需求,想实现一下,丰富一下自己的性能测试框架,最新的代码请移步我的GitHub
,地址:https://github.com/JunManYuanLong/FunTester,gitee
地址:https://gitee.com/fanapi/tester。
思路
-
有一个多线程的基类,其他压测任务类继承于基类。 -
并发执行类由 线程池、 任务发生器和 补偿器组成。 -
单线程执行 任务发生器将生成的任务对象丢到线程池里面执行。 -
另起 补偿器线程完成缺失的补偿。(由于多种原因,真实发生量小于设定值)
总体的思路与如何mock固定QPS的接口、moco固定QPS接口升级补偿机制这两票文章一致,但是没有采取Semaphore
的模式,原因是moco
是多线程对单线程,压测
是单线程对多线程。
-
语言继续采用 Java
语言。
基类
写得有点仓促,还未进行大量实践,所以注释少一些。这里依然设计两种子模式:定量压测和定时压测,这里由于两种压测模式,通过一个属性isTimesMode
记录,在执行类FixedQpsConcurrent
中用到,单次压测任务对象统一isTimesMode
和limit
两个属性。
package com.fun.base.constaint;
import com.fun.base.interfaces.MarkThread;
import com.fun.config.HttpClientConstant;
import com.fun.frame.execute.FixedQpsConcurrent;
import com.fun.frame.httpclient.GCThread;
import com.fun.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class FixedQpsThread<T> extends ThreadBase {
private static Logger logger = LoggerFactory.getLogger(FixedQpsThread.class);
public int qps;
public int limit;
public boolean isTimesMode;
public FixedQpsThread(T t, int limit, int qps, MarkThread markThread) {
this.limit = limit;
this.qps = qps;
this.mark = markThread;
this.t = t;
isTimesMode = limit > 1000 ? true : false;
}
protected FixedQpsThread() {
super();
}
@Override
public void run() {
try {
before();
threadmark = mark == null ? EMPTY : this.mark.mark(this);
long s = Time.getTimeStamp();
doing();
long e = Time.getTimeStamp();
long diff = e - s;
FixedQpsConcurrent.allTimes.add(diff);
FixedQpsConcurrent.executeTimes.getAndIncrement();
if (diff > HttpClientConstant.MAX_ACCEPT_TIME)
FixedQpsConcurrent.marks.add(diff + CONNECTOR + threadmark);
} catch (Exception e) {
logger.warn("执行任务失败!", e);
logger.warn("执行失败对象的标记:{}", threadmark);
FixedQpsConcurrent.errorTimes.getAndIncrement();
} finally {
after();
}
}
@Override
public void before() {
GCThread.starts();
}
/**
* 子类必需实现改方法,不然调用deepclone方法会报错
*
* @return
*/
public abstract FixedQpsThread clone();
}
执行类
此处补偿线程设计还待优化,中间有两处休眠:一处是循环检测是否需要补偿,一处是单词补偿间隔。尚未提取配置变量,有待后面实践之后进行优化调整。测试结果对象依然采用了原来的,数值和计算方式保持一致,后期也会根据实践结果进行调整,可以关注我的GitHub
及时获取更新。
package com.fun.frame.execute;
import com.fun.base.bean.PerformanceResultBean;
import com.fun.base.constaint.FixedQpsThread;
import com.fun.config.Constant;
import com.fun.frame.Save;
import com.fun.frame.SourceCode;
import com.fun.frame.httpclient.GCThread;
import com.fun.utils.Time;
import com.fun.utils.WriteRead;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.stream.Collectors.toList;
/**
* 并发类,用于启动压力脚本
*/
public class FixedQpsConcurrent extends SourceCode {
private static Logger logger = LoggerFactory.getLogger(FixedQpsConcurrent.class);
public static boolean key = false;
public static AtomicInteger executeTimes = new AtomicInteger();
public static AtomicInteger errorTimes = new AtomicInteger();
public static Vector<String> marks = new Vector<>();
/**
* 用于记录所有请求时间
*/
public static Vector<Long> allTimes = new Vector<>();
/**
* 开始时间
*/
public long startTime;
/**
* 结束时间
*/
public long endTime;
public int queueLength;
/**
* 任务描述
*/
public String desc = DEFAULT_STRING;
/**
* 任务集
*/
public List<FixedQpsThread> threads = new ArrayList<>();
/**
* 线程池
*/
ExecutorService executorService;
/**
* @param thread 线程任务
*/
public FixedQpsConcurrent(FixedQpsThread thread) {
this(thread, DEFAULT_STRING);
}
/**
* @param threads 线程组
*/
public FixedQpsConcurrent(List<FixedQpsThread> threads) {
this(threads, DEFAULT_STRING);
}
/**
* @param thread 线程任务
* @param desc 任务描述
*/
public FixedQpsConcurrent(FixedQpsThread thread, String desc) {
this();
this.queueLength = 1;
threads.add(thread);
this.desc = desc + Time.getNow();
}
/**
* @param threads 线程组
* @param desc 任务描述
*/
public FixedQpsConcurrent(List<FixedQpsThread> threads, String desc) {
this();
this.threads = threads;
this.queueLength = threads.size();
this.desc = desc + Time.getNow();
}
private FixedQpsConcurrent() {
executorService = ThreadPoolUtil.createPool(20, 200, 3);
}
/**
* 执行多线程任务
* 默认取list中thread对象,丢入线程池,完成多线程执行,如果没有threadname,name默认采用desc+线程数作为threadname,去除末尾的日期
*/
public PerformanceResultBean start() {
key = false;
FixedQpsThread fixedQpsThread = threads.get(0);
boolean isTimesMode = fixedQpsThread.isTimesMode;
int limit = fixedQpsThread.limit;
int qps = fixedQpsThread.qps;
long interval = 1_000_000_000 / qps;
AidThread aidThread = new AidThread();
new Thread(aidThread).start();
startTime = Time.getTimeStamp();
while (true) {
executorService.execute(threads.get(limit-- % queueLength).clone());
if (key ? true : isTimesMode ? limit < 1 : Time.getTimeStamp() - startTime > fixedQpsThread.limit) break;
sleep(interval);
}
endTime = Time.getTimeStamp();
aidThread.stop();
GCThread.stop();
try {
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("线程池等待任务结束失败!", e);
}
logger.info("总计执行 {} ,共用时:{} s,执行总数:{},错误数:{}!", fixedQpsThread.isTimesMode ? fixedQpsThread.limit + "次任务" : "秒", Time.getTimeDiffer(startTime, endTime), executeTimes, errorTimes);
return over();
}
private PerformanceResultBean over() {
key = true;
Save.saveLongList(allTimes, "data/" + queueLength + desc);
Save.saveStringListSync(marks, MARK_Path.replace(LONG_Path, EMPTY) + desc);
allTimes = new Vector<>();
marks = new Vector<>();
executeTimes.set(0);
errorTimes.set(0);
return countQPS(queueLength, desc, Time.getTimeByTimestamp(startTime), Time.getTimeByTimestamp(endTime));
}
/**
* 计算结果
* <p>此结果仅供参考</p>
*
* @param name 线程数
*/
public PerformanceResultBean countQPS(int name, String desc, String start, String end) {
List<String> strings = WriteRead.readTxtFileByLine(Constant.DATA_Path + name + desc);
int size = strings.size();
List<Integer> data = strings.stream().map(x -> changeStringToInt(x)).collect(toList());
int sum = data.stream().mapToInt(x -> x).sum();
Collections.sort(data);
String statistics = StatisticsUtil.statistics(data, desc, this.queueLength);
double qps = 1000.0 * size * name / sum;
return new PerformanceResultBean(desc, start, end, name, size, sum / size, qps, getPercent(executeTimes.get(), errorTimes.get()), 0, executeTimes.get(), statistics);
}
/**
* 用于做后期的计算
*
* @param name
* @param desc
* @return
*/
public PerformanceResultBean countQPS(int name, String desc) {
return countQPS(name, desc, Time.getDate(), Time.getDate());
}
/**
* 后期计算用
*
* @param name
* @return
*/
public PerformanceResultBean countQPS(int name) {
return countQPS(name, EMPTY, Time.getDate(), Time.getDate());
}
/**
* 补偿线程
*/
class AidThread implements Runnable {
private boolean key = true;
int i;
public AidThread() {
}
@Override
public void run() {
logger.info("补偿线程开始!");
while (key) {
long expect = (Time.getTimeStamp() - startTime) / 1000 * threads.get(0).qps;
if (expect > executeTimes.get() + 10) {
range((int) expect - executeTimes.get()).forEach(x -> {
sleep(100);
executorService.execute(threads.get(i++ % queueLength).clone());
});
}
sleep(3);
}
logger.info("补偿线程结束!");
}
public void stop() {
key = false;
}
}
}
其他配套的标记类
、统计类
还等待修改,比较简单,这里不放代码了。
公众号FunTester首发,原创分享爱好者,腾讯云、开源中国和掘金社区首页推荐,知乎准八级原创作者,欢迎关注、交流,禁止第三方擅自转载。
FunTester热文精选
-
写给所有人的编程思维 -
2020年Tester自我提升 -
未来的神器fiddler Everywhere -
测试开发工程师工作技巧 -
Selenium4 IDE,它终于来了 -
自动化测试灵魂三问:是什么、为什么和做什么 -
为什么测试覆盖率如此重要 -
吐个槽,非测勿入 -
自动化测试框架 -
敏捷中的端到端测试
本文分享自微信公众号 - FunTester(NuclearTester)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
【高并发】高并发秒杀系统架构解密,不是所有的秒杀都是秒杀!
点击上方蓝色“冰河技术”,关注并选择“设为星标” 持之以恒,贵在坚持,每天进步一点点! 作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址: https://github.com/sunshinelyz/mykit-delay PS: 欢迎各位Star源码,也可以pr你牛逼哄哄的代码。 写在前面 很多小伙伴反馈说,高并发专题学了那么久,但是,在真正做项目时,仍然不知道如何下手处理高并发业务场景!甚至很多小伙伴仍然停留在只是简单的提供接口(CRUD)阶段,不知道学习的并发知识如何运用到实际项目中,就更别提如何构建高并发系统了! 究竟什么样的系统算是高并发系统?今天,我们就一起解密高并发业务场景下典型的秒杀系统的架构,结合高并发专题下的其他文章,学以致用。 电商系统架构 在电商领域,存在着典型的秒杀业务场景,那何谓秒杀场景呢。简单的来说就是一件商品的购买人数远远大于这件商品的库存,而且这件商品在很短的...
- 下一篇
深入 unserialize() 函数之RCE漏洞身份验证绕过及注入
文章源自-投稿 作者-挽梦雪舞 扫描下方二维码进入社区: 继续我们上次的讨论,我们聊到了PHP的反序列化如何导致漏洞,以及攻击者如何利用POP链来实现RCE攻击。 今天我们接着讨论攻击者利用unserialize() 漏洞的其他骚操作。那么即使无法进行RCE攻击,攻击者仍可以使用该漏洞来实现身份验证绕过和SQL注入。 一、先来谈谈身份验证绕过: 除了RCE攻击,unserialize() 通常还被用于绕过应用程序的身份验证控制。目前我了解到的 有两种方法可以做到这一点: 1.通过控制用作访问控制的对象属性。 2.利用类型混淆问题来欺骗应用程序。 但是要注意这两种方法都取决于最终用户可以控制传递给unserialize()的对象的情况。 二、通过控制对象属性来绕过身份验证 其中,攻击者利用反序列化漏洞中最简单最常见的方法之一是控制对象属性来绕过身份验证,示例代码如下: 我们假设应用程序在注册过程中调用了一个名为User的类来传递用户数据,用户来填写一个表格,数据将通过序列化后的User对象传递给后端。 最终用户控制User对象,然后可以像下面这样简单地操作对象,并注册为admin...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8编译安装MySQL8.0.19
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Mario游戏-低调大师作品
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS6,7,8上安装Nginx,支持https2.0的开启