在早前跟测试同行在QQ群聊天的时候,聊过一个固定QPS压测 的问题,最近突然有需求,想实现一下,丰富一下自己的性能测试框架 ,最新的代码请移步我的GitHub,地址:https://github.com/JunManYuanLong/FunTester ,gitee地址:https://gitee.com/fanapi/tester 。
思路
并发执行类由 线程池 、 任务发生器 和 补偿器 组成。
单线程执行 任务发生器 将生成的任务对象丢到线程池里面执行。
另起 补偿器 线程完成缺失的补偿。(由于多种原因,真实发生量小于设定值)
总体的思路与如何mock固定QPS的接口 、moco固定QPS接口升级补偿机制 这两票文章一致,但是没有采取Semaphore的模式,原因是moco是多线程对单线程,压测是单线程对多线程。
基类
写得有点仓促,还未进行大量实践,所以注释少一些。这里依然设计两种子模式:定量压测 和定时压测 ,这里由于两种压测模式,通过一个属性isTimesMode记录,在执行类FixedQpsConcurrent中用到,单次压测任务对象统一isTimesMode和limit两个属性。
package com.fun.base.constaint;import com.fun.base.interfaces.MarkThread;import com.fun.config.HttpClientConstant;import com.fun.frame.execute.FixedQpsConcurrent;import com.fun.frame.httpclient.GCThread;import com.fun.utils.Time;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public abstract class FixedQpsThread <T > extends ThreadBase { private static Logger logger = LoggerFactory.getLogger(FixedQpsThread.class); public int qps; public int limit; public boolean isTimesMode; public FixedQpsThread (T t, int limit, int qps, MarkThread markThread) { this .limit = limit; this .qps = qps; this .mark = markThread; this .t = t; isTimesMode = limit > 1000 ? true : false ; } protected FixedQpsThread () { super (); } @Override public void run () { try { before(); threadmark = mark == null ? EMPTY : this .mark.mark(this ); long s = Time.getTimeStamp(); doing(); long e = Time.getTimeStamp(); long diff = e - s; FixedQpsConcurrent.allTimes.add(diff); FixedQpsConcurrent.executeTimes.getAndIncrement(); if (diff > HttpClientConstant.MAX_ACCEPT_TIME) FixedQpsConcurrent.marks.add(diff + CONNECTOR + threadmark); } catch (Exception e) { logger.warn("执行任务失败!" , e); logger.warn("执行失败对象的标记:{}" , threadmark); FixedQpsConcurrent.errorTimes.getAndIncrement(); } finally { after(); } } @Override public void before () { GCThread.starts(); } /** * 子类必需实现改方法,不然调用deepclone方法会报错 * * @return */ public abstract FixedQpsThread clone () ; }
执行类
此处补偿线程设计还待优化,中间有两处休眠:一处是循环检测是否需要补偿,一处是单词补偿间隔。尚未提取配置变量,有待后面实践之后进行优化调整。测试结果对象依然采用了原来的,数值和计算方式保持一致,后期也会根据实践结果进行调整,可以关注我的GitHub及时获取更新。
package com.fun.frame.execute;import com.fun.base.bean.PerformanceResultBean;import com.fun.base.constaint.FixedQpsThread;import com.fun.config.Constant;import com.fun.frame.Save;import com.fun.frame.SourceCode;import com.fun.frame.httpclient.GCThread;import com.fun.utils.Time;import com.fun.utils.WriteRead;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.Vector;import java.util.concurrent.ExecutorService;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import static java.util.stream.Collectors.toList;/** * 并发类,用于启动压力脚本 */ public class FixedQpsConcurrent extends SourceCode { private static Logger logger = LoggerFactory.getLogger(FixedQpsConcurrent.class); public static boolean key = false ; public static AtomicInteger executeTimes = new AtomicInteger(); public static AtomicInteger errorTimes = new AtomicInteger(); public static Vector<String> marks = new Vector<>(); /** * 用于记录所有请求时间 */ public static Vector<Long> allTimes = new Vector<>(); /** * 开始时间 */ public long startTime; /** * 结束时间 */ public long endTime; public int queueLength; /** * 任务描述 */ public String desc = DEFAULT_STRING; /** * 任务集 */ public List<FixedQpsThread> threads = new ArrayList<>(); /** * 线程池 */ ExecutorService executorService; /** * @param thread 线程任务 */ public FixedQpsConcurrent (FixedQpsThread thread) { this (thread, DEFAULT_STRING); } /** * @param threads 线程组 */ public FixedQpsConcurrent (List<FixedQpsThread> threads) { this (threads, DEFAULT_STRING); } /** * @param thread 线程任务 * @param desc 任务描述 */ public FixedQpsConcurrent (FixedQpsThread thread, String desc) { this (); this .queueLength = 1 ; threads.add(thread); this .desc = desc + Time.getNow(); } /** * @param threads 线程组 * @param desc 任务描述 */ public FixedQpsConcurrent (List<FixedQpsThread> threads, String desc) { this (); this .threads = threads; this .queueLength = threads.size(); this .desc = desc + Time.getNow(); } private FixedQpsConcurrent () { executorService = ThreadPoolUtil.createPool(20 , 200 , 3 ); } /** * 执行多线程任务 * 默认取list中thread对象,丢入线程池,完成多线程执行,如果没有threadname,name默认采用desc+线程数作为threadname,去除末尾的日期 */ public PerformanceResultBean start () { key = false ; FixedQpsThread fixedQpsThread = threads.get(0 ); boolean isTimesMode = fixedQpsThread.isTimesMode; int limit = fixedQpsThread.limit; int qps = fixedQpsThread.qps; long interval = 1_000_000_000 / qps; AidThread aidThread = new AidThread(); new Thread(aidThread).start(); startTime = Time.getTimeStamp(); while (true ) { executorService.execute(threads.get(limit-- % queueLength).clone()); if (key ? true : isTimesMode ? limit < 1 : Time.getTimeStamp() - startTime > fixedQpsThread.limit) break ; sleep(interval); } endTime = Time.getTimeStamp(); aidThread.stop(); GCThread.stop(); try { executorService.shutdown(); executorService.awaitTermination(10 , TimeUnit.SECONDS); } catch (InterruptedException e) { logger.error("线程池等待任务结束失败!" , e); } logger.info("总计执行 {} ,共用时:{} s,执行总数:{},错误数:{}!" , fixedQpsThread.isTimesMode ? fixedQpsThread.limit + "次任务" : "秒" , Time.getTimeDiffer(startTime, endTime), executeTimes, errorTimes); return over(); } private PerformanceResultBean over () { key = true ; Save.saveLongList(allTimes, "data/" + queueLength + desc); Save.saveStringListSync(marks, MARK_Path.replace(LONG_Path, EMPTY) + desc); allTimes = new Vector<>(); marks = new Vector<>(); executeTimes.set(0 ); errorTimes.set(0 ); return countQPS(queueLength, desc, Time.getTimeByTimestamp(startTime), Time.getTimeByTimestamp(endTime)); } /** * 计算结果 * <p>此结果仅供参考</p> * * @param name 线程数 */ public PerformanceResultBean countQPS (int name, String desc, String start, String end) { List<String> strings = WriteRead.readTxtFileByLine(Constant.DATA_Path + name + desc); int size = strings.size(); List<Integer> data = strings.stream().map(x -> changeStringToInt(x)).collect(toList()); int sum = data.stream().mapToInt(x -> x).sum(); Collections.sort(data); String statistics = StatisticsUtil.statistics(data, desc, this .queueLength); double qps = 1000.0 * size * name / sum; return new PerformanceResultBean(desc, start, end, name, size, sum / size, qps, getPercent(executeTimes.get(), errorTimes.get()), 0 , executeTimes.get(), statistics); } /** * 用于做后期的计算 * * @param name * @param desc * @return */ public PerformanceResultBean countQPS (int name, String desc) { return countQPS(name, desc, Time.getDate(), Time.getDate()); } /** * 后期计算用 * * @param name * @return */ public PerformanceResultBean countQPS (int name) { return countQPS(name, EMPTY, Time.getDate(), Time.getDate()); } /** * 补偿线程 */ class AidThread implements Runnable { private boolean key = true ; int i; public AidThread () { } @Override public void run () { logger.info("补偿线程开始!" ); while (key) { long expect = (Time.getTimeStamp() - startTime) / 1000 * threads.get(0 ).qps; if (expect > executeTimes.get() + 10 ) { range((int ) expect - executeTimes.get()).forEach(x -> { sleep(100 ); executorService.execute(threads.get(i++ % queueLength).clone()); }); } sleep(3 ); } logger.info("补偿线程结束!" ); } public void stop () { key = false ; } } }
其他配套的标记类、统计类还等待修改,比较简单,这里不放代码了。
公众号FunTester 首发,原创分享爱好者,腾讯云、开源中国和掘金社区首页推荐,知乎准八级原创作者,欢迎关注、交流,禁止第三方擅自转载。
FunTester热文精选