性能工具之Jmeter压测Thrift RPC服务
2 --> 概述 Thrift是一个可互操作和可伸缩服务的框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 等等编程语言间无缝结合的、高效的服务。 Thrift最初由facebook开发,07年四月开放源码,08年5月进入apache孵化器。thrift允许你定义一个简单的定义文件中的数据类型和服务接口(IDL)。以作为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通信的无缝跨编程语言。 其传输数据采用二进制格式,相对于XML和JSON等序列化方式体积更小,对于高并发、大数据量和多语言的环境更有优势。 Thrift它含有三个主要的组件:protocol,transport和server,其中,protocol定义了消息是怎样序列化的,transport定义了消息是怎样在客户端和服务器端之间通信的,server用于从transport接收序列化的消息,根据protocol反序列化之,调用用户定义的消息处理器,并序列化消息处理器的响应,然后再将它们写回transport。 官网地址:thrift.apache.org 基本概念 架构图 堆栈的顶部是从Thrift定义文件生成的代码。Thrift 服务生成的客户端和处理器代码。这些由图中的棕色框表示。红色框为发送的数据结构(内置类型除外)也会生成代码。协议和传输是Thrift运行时库的一部分。因此使用Thrift可以定义服务,并且可以自由更改协议和传输,而无需重新生成代码。 Thrift还包括一个服务器基础结构,用于将协议和传输绑定在一起。有可用的阻塞,非阻塞,单线程和多线程服务器。 堆栈的“底层I / O”部分根据所开发语言而有所不同。对于Java和Python网络I / O,Thrift库利用内置库,而C ++实现使用自己的自定义实现。 数据类型: 基本类型: bool:布尔值,true 或 false,对应 Java 的 boolean byte:8 位有符号整数,对应 Java 的 byte i16:16 位有符号整数,对应 Java 的 short i32:32 位有符号整数,对应 Java 的 int i64:64 位有符号整数,对应 Java 的 long double:64 位浮点数,对应 Java 的 double string:未知编码文本或二进制字符串,对应 Java 的 String 结构体类型: struct:定义公共的对象,类似于 C 语言中的结构体定义,在 Java 中是一个 JavaBean 集合类型: list:对应 Java 的 ArrayList set:对应 Java 的 HashSet map:对应 Java 的 HashMap 异常类型: exception:对应 Java 的 Exception 服务类型: service:对应服务的类 数据传输层Transport TSocket —— 使用阻塞式 I/O 进行传输,是最常见的模式 TFramedTransport —— 使用非阻塞方式,按块的大小进行传输,类似于 Java 中的 NIO,若使用 TFramedTransport 传输层,其服务器必须修改为非阻塞的服务类型 TNonblockingTransport —— 使用非阻塞方式,用于构建异步客户端 数据传输协议Protocol Thrift 可以让用户选择客户端与服务端之间传输通信协议的类别,在传输协议上总体划分为文本 (text) 和二进制 (binary) 传输协议,为节约带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数,有时还会使用基于文本类型的协议,这需要根据项目 / 产品中的实际需求。 常用协议有以下几种: TBinaryProtocol : 二进制格式. TCompactProtocol : 高效率的、密集的二进制压缩格式 TJSONProtocol : JSON格式 TSimpleJSONProtocol : 提供JSON只写协议, 生成的文件很容易通过脚本语言解析 注意:客户端和服务端的协议要一致。 服务器类型Server TSimpleServer ——单线程服务器端使用标准的阻塞式 I/O,一般用于测试。 TThreadPoolServer —— 多线程服务器端使用标准的阻塞式 I/O,预先创建一组线程处理请求。 TNonblockingServer —— 多线程服务器端使用非阻塞式 I/O,服务端和客户端需要指定 TFramedTransport 数据传输的方式。 THsHaServer —— 半同步半异步的服务端模型,需要指定为: TFramedTransport 数据传输的方式。它使用一个单独的线程来处理网络I/O,一个独立的worker线程池来处理消息。这样,只要有空闲的worker线程,消息就会被立即处理,因此多条消息能被并行处理。 TThreadedSelectorServer —— TThreadedSelectorServer允许你用多个线程来处理网络I/O。它维护了两个线程池,一个用来处理网络I/O,另一个用来进行请求的处理。当网络I/O是瓶颈的时候,TThreadedSelectorServer比THsHaServer的表现要好。 实现逻辑 服务端 实现服务处理接口impl 创建TProcessor 创建TServerTransport 创建TProtocol 创建TServer 启动Server 客户端 创建Transport 创建TProtocol 基于TTransport和TProtocol创建 Client 调用Client的相应方法 ThriftServerDemo实例 新建Maven项目,并且添加thrift依赖包 <dependencies> <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.9.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.12</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.7</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> </plugins> </build> 编写IDL接口并生成接口文件 namespace java thrift.service // 计算类型 - 仅限整数四则运算 enum ComputeType { ADD = 0; SUB = 1; MUL = 2; DIV = 3; } // 服务请求 struct ComputeRequest { 1:required i64 x; 2:required i64 y; 3:required ComputeType computeType; } // 服务响应 struct ComputeResponse { 1:required i32 errorNo; 2:optional string errorMsg; 3:required i64 computeRet; } service ComputeServer { ComputeResponse getComputeResult(1:ComputeRequest request); } 执行编译命令: thrift-0.11.0.exe -r -gen java computeServer.thrift 拷贝生成的Service类文件到IDEA 服务端接口实现 public class ThriftTestImpl implements ComputeServer.Iface { private static final Logger logger = LogManager.getLogger(ThriftTestImpl.class); public ComputeResponse getComputeResult(ComputeRequest request) { ComputeType computeType = request.getComputeType(); long x = request.getX(); long y = request.getY(); logger.info("get compute result begin. [x:{}] [y:{}] [type:{}]", x, y, computeType.toString()); long begin = System.currentTimeMillis(); ComputeResponse response = new ComputeResponse(); response.setErrorNo(0); try { long ret; if (computeType == ComputeType.ADD) { ret = add(x, y); response.setComputeRet(ret); } else if (computeType == ComputeType.SUB) { ret = sub(x, y); response.setComputeRet(ret); } else if (computeType == ComputeType.MUL) { ret = mul(x, y); response.setComputeRet(ret); } else { ret = div(x, y); response.setComputeRet(ret); } } catch (Exception e) { response.setErrorNo(1001); response.setErrorMsg(e.getMessage()); logger.error("exception:", e); } long end = System.currentTimeMillis(); logger.info("get compute result end. [errno:{}] cost:[{}ms]", response.getErrorNo(), (end - begin)); return response; } private long add(long x, long y) { return x + y; } private long sub(long x, long y) { return x - y; } private long mul(long x, long y) { return x * y; } private long div(long x, long y) { return x / y; } } 服务端实现 public class ServerMain { private static final Logger logger = LogManager.getLogger(ServerMain.class); public static void main(String[] args) { try { //实现服务处理接口impl ThriftTestImpl workImpl = new ThriftTestImpl(); //创建TProcessor TProcessor tProcessor = new ComputeServer.Processor<ComputeServer.Iface>(workImpl); //创建TServerTransport,非阻塞式 I/O,服务端和客户端需要指定 TFramedTransport 数据传输的方式 final TNonblockingServerTransport transport = new TNonblockingServerSocket(9999); //创建TProtocol TThreadedSelectorServer.Args ttpsArgs = new TThreadedSelectorServer.Args(transport); ttpsArgs.transportFactory(new TFramedTransport.Factory()); //二进制格式反序列化 ttpsArgs.protocolFactory(new TBinaryProtocol.Factory()); ttpsArgs.processor(tProcessor); ttpsArgs.selectorThreads(16); ttpsArgs.workerThreads(32); logger.info("compute service server on port :" + 9999); //创建TServer TServer server = new TThreadedSelectorServer(ttpsArgs); //启动Server server.serve(); } catch (Exception e) { logger.error(e); } } } 服务端整体代码结构 log4j2.xml配置文件 <?xml version="1.0" encoding="UTF-8"?> <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL --> <!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出--> <!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数--> <configuration status="INFO" monitorInterval="30"> <!--先定义所有的appender--> <appenders> <!--这个输出控制台的配置--> <console name="Console" target="SYSTEM_OUT"> <!--输出日志的格式--> <PatternLayout pattern="%highlight{[ %p ] [%-d{yyyy-MM-dd HH:mm:ss}] [%l] %m%n}"/> </console> <RollingFile name="RollingFileInfo" fileName="log/log.log" filePattern="log/log.log.%d{yyyy-MM-dd}"> <!-- 只接受level=INFO以上的日志 --> <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="[ %p ] [%-d{yyyy-MM-dd HH:mm:ss}] [ LOGID:%X{logid} ] [%l] %m%n"/> <Policies> <TimeBasedTriggeringPolicy modulate="true" interval="1"/> <SizeBasedTriggeringPolicy/> </Policies> </RollingFile> <RollingFile name="RollingFileError" fileName="log/error.log" filePattern="log/error.log.%d{yyyy-MM-dd}"> <!-- 只接受level=WARN以上的日志 --> <Filters> <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY" /> </Filters> <PatternLayout pattern="[ %p ] %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] [%l] %m%n"/> <Policies> <TimeBasedTriggeringPolicy modulate="true" interval="1"/> <SizeBasedTriggeringPolicy/> </Policies> </RollingFile> </appenders> <!--然后定义logger,只有定义了logger并引入的appender,appender才会生效--> <loggers> <!--过滤掉spring和mybatis的一些无用的DEBUG信息--> <logger name="org.springframework" level="INFO"></logger> <logger name="org.mybatis" level="INFO"></logger> <root level="all"> <appender-ref ref="Console"/> <appender-ref ref="RollingFileInfo"/> <appender-ref ref="RollingFileError"/> </root> </loggers> </configuration> Jmeter测试类编写 利用JMeter调用Java测试类去调用对应的后台服务,并记住每次调用并获取反馈值的RT,ERR%,只需要按照单线程的方式去实现测试业务,也无需添加各种埋点收集数据 新建一个JavaMaven工程,添加JMeter及thrift依赖包 <dependencies> <dependency> <groupId>org.apache.jmeter</groupId> <artifactId>ApacheJMeter_core</artifactId> <version>4.0</version> </dependency> <dependency> <groupId>org.apache.jmeter</groupId> <artifactId>ApacheJMeter_java</artifactId> <version>4.0</version> </dependency> <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.11.0</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.11.1</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.11.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> </plugins> </build> ThriftClient测试类编写 public class ThriftClient { private ComputeServer.Client client = null; private TTransport tTransport = null; public ThriftClient(String ip,int port){ try { TTransport tTransport = new TFramedTransport(new TSocket(ip,port)); tTransport.open(); TProtocol tProtocol = new TBinaryProtocol(tTransport); client = new ComputeServer.Client(tProtocol); } catch (TTransportException e) { e.printStackTrace(); } } public ComputeResponse getResponse(ComputeRequest request){ try { ComputeResponse response = client.getComputeResult(request); return response; } catch (TException e) { e.printStackTrace(); return null; } } public void close(){ if (tTransport != null && tTransport.isOpen()){ tTransport.close(); } } } 注意:需要把编写IDL接口文件拷贝到工程里 新建一个JavaClass,如下例中的TestThriftByJmeter,并继承AbstractJavaSamplerClient。AbstractJavaSamplerClient中默认实现了四个可以覆盖的方法,分别是getDefaultParameters(),setupTest(),runTest()和teardownTest()方法。 getDefaultParameters方法主要用于设置传入界面的参数; setupTest方法为初始化方法,用于初始化性能测试时的每个线程; runTest方法为性能测试时的线程运行体; teardownTest方法为测试结束方法,用于结束性能测试中的每个线程。 编写TestThriftByJmeter测试类 public class TestThriftByJmeter extends AbstractJavaSamplerClient { private ThriftClient client; private ComputeRequest request; private ComputeResponse response; //设置传入界面的参数 @Override public Arguments getDefaultParameters(){ Arguments arguments = new Arguments(); arguments.addArgument("ip","172.16.14.251"); arguments.addArgument("port","9999"); arguments.addArgument("X","0"); arguments.addArgument("Y","0"); arguments.addArgument("type","0"); return arguments; } //初始化方法 @Override public void setupTest(JavaSamplerContext context){ //获取Jmeter中设置的参数 String ip = context.getParameter("ip"); int port = context.getIntParameter("port"); int x = context.getIntParameter("X"); int y = context.getIntParameter("Y"); ComputeType type = ComputeType.findByValue(context.getIntParameter("type")); //创建客户端 client = new ThriftClient(ip,port); //设置request请求 request = new ComputeRequest(x,y,type); super.setupTest(context); } //性能测试线程运行体 @Override public SampleResult runTest(JavaSamplerContext context) { SampleResult result = new SampleResult(); //开始统计响应时间标记 result.sampleStart(); try { long begin = System.currentTimeMillis(); response = client.getResponse(request); long cost = (System.currentTimeMillis() - begin); //打印时间戳差值。Java请求响应时间 System.out.println(response.toString()+" 总计花费:["+cost+"ms]"); if (response == null){ //设置测试结果为fasle result.setSuccessful(false); return result; } if (response.getErrorNo() == 0){ //设置测试结果为true result.setSuccessful(true); }else{ result.setSuccessful(false); result.setResponseMessage("ERROR"); } }catch (Exception e){ result.setSuccessful(false); result.setResponseMessage("ERROR"); e.printStackTrace(); }finally { //结束统计响应时间标记 result.sampleEnd(); } return result; } //测试结束方法 public void tearDownTest(JavaSamplerContext context) { if (client != null) { client.close(); } super.teardownTest(context); } } 特别说明: result.setSamplerLabel("7D"); //设置java Sampler的标题 result.setResponseOK(); //设置响应成功 result.setResponseData(); //设置响应内容 编写测试Run Main方法 public class RunMain { public static void main(String[] args) { Arguments arguments = new Arguments(); arguments.addArgument("ip","172.16.14.251"); arguments.addArgument("port","9999"); arguments.addArgument("X","1"); arguments.addArgument("Y","3"); arguments.addArgument("type","0"); JavaSamplerContext context = new JavaSamplerContext(arguments); TestThriftByJmeter jmeter = new TestThriftByJmeter(); jmeter.setupTest(context); jmeter.runTest(context); jmeter.tearDownTest(context); } } 测试结果通过 使用mvn cleanpackage打包测试代码 使用mvn dependency:copy-dependencies-DoutputDirectory=lib复制所依赖的jar包都会到项目下的lib目录下 复制测试代码jar包到jmeter\lib\ext目录下,复制依赖包到jmeter\lib目录下 这里有两点需要注意: 如果你的jar依赖了其他第三方jar,需要将其一起放到lib/ext下,否则会出现ClassNotFound错误 如果在将jar放入lib/ext后,你还是无法找到你编写的类,且此时你是开着JMeter的,则需要重启一下JMeter 打开Jmeter,在添加Java请求时,注意要选择Jmeter测试类,下面的列表中可以看到参数和默认值。 下面我们将进行性能压测,设置线程组,设置10个并发线程。 服务端日志: