精选列表

搜索[初体验],共233篇文章
优秀的个人博客,低调大师

文盘Rust -- tonic-Rust grpc初体验 | 京东云技术团队

gRPC 是开发中常用的开源高性能远程过程调用(RPC)框架,tonic 是基于 HTTP/2 的 gRPC 实现,专注于高性能、互操作性和灵活性。该库的创建是为了对 async/await 提供一流的支持,并充当用 Rust 编写的生产系统的核心构建块。今天我们聊聊通过使用tonic 调用grpc的的具体过程。 工程规划 rpc程序一般包含server端和client端,为了方便我们把两个程序打包到一个工程里面 新建tonic_sample工程 cargo new tonic_sample Cargo.toml 如下 [package] name = "tonic_sample" version = "0.1.0" edition = "2021" [[bin]] # Bin to run the gRPC server name = "stream-server" path = "src/stream_server.rs" [[bin]] # Bin to run the gRPC client name = "stream-client" path = "src/stream_client.rs" [dependencies] tokio.workspace = true tonic = "0.9" tonic-reflection = "0.9.2" prost = "0.11" tokio-stream = "0.1" async-stream = "0.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" rand = "0.7" h2 = { version = "0.3" } anyhow = "1.0.75" futures-util = "0.3.28" [build-dependencies] tonic-build = "0.9" tonic 的示例代码还是比较齐全的,本次我们参考 tonic 的 streaming example。 首先编写 proto 文件,用来描述报文。 proto/echo.proto syntax = "proto3"; package stream; // EchoRequest is the request for echo. message EchoRequest { string message = 1; } // EchoResponse is the response for echo. message EchoResponse { string message = 1; } // Echo is the echo service. service Echo { // UnaryEcho is unary echo. rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} // ServerStreamingEcho is server side streaming. rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} // ClientStreamingEcho is client side streaming. rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} // BidirectionalStreamingEcho is bidi streaming. rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} } 文件并不复杂,只有两个 message 一个请求一个返回,之所以选择这个示例是因为该示例包含了rpc中的流式处理,包扩了server 流、client 流以及双向流的操作。 编辑build.rs 文件 use std::{env, path::PathBuf}; fn main() -> Result<(), Box<dyn std::error::Error>> { tonic_build::compile_protos("proto/echo.proto")?; Ok(()) } 该文件用来通过 tonic-build 生成 grpc 的 rust 基础代码 完成上述工作后就可以构建 server 和 client 代码了 stream_server.rs pub mod pb { tonic::include_proto!("stream"); } use anyhow::Result; use futures_util::FutureExt; use pb::{EchoRequest, EchoResponse}; use std::{ error::Error, io::ErrorKind, net::{SocketAddr, ToSocketAddrs}, pin::Pin, thread, time::Duration, }; use tokio::{ net::TcpListener, sync::{ mpsc, oneshot::{self, Receiver, Sender}, Mutex, }, task::{self, JoinHandle}, }; use tokio_stream::{ wrappers::{ReceiverStream, TcpListenerStream}, Stream, StreamExt, }; use tonic::{transport::Server, Request, Response, Status, Streaming}; type EchoResult<T> = Result<Response<T>, Status>; type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>; fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> { let mut err: &(dyn Error + 'static) = err_status; loop { if let Some(io_err) = err.downcast_ref::<std::io::Error>() { return Some(io_err); } // h2::Error do not expose std::io::Error with `source()` // https://github.com/hyperium/h2/pull/462 if let Some(h2_err) = err.downcast_ref::<h2::Error>() { if let Some(io_err) = h2_err.get_io() { return Some(io_err); } } err = match err.source() { Some(err) => err, None => return None, }; } } #[derive(Debug)] pub struct EchoServer {} #[tonic::async_trait] impl pb::echo_server::Echo for EchoServer { async fn unary_echo(&self, req: Request<EchoRequest>) -> EchoResult<EchoResponse> { let req_str = req.into_inner().message; let response = EchoResponse { message: req_str }; Ok(Response::new(response)) } type ServerStreamingEchoStream = ResponseStream; async fn server_streaming_echo( &self, req: Request<EchoRequest>, ) -> EchoResult<Self::ServerStreamingEchoStream> { println!("EchoServer::server_streaming_echo"); println!("\tclient connected from: {:?}", req.remote_addr()); // creating infinite stream with requested message let repeat = std::iter::repeat(EchoResponse { message: req.into_inner().message, }); let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200))); let (tx, rx) = mpsc::channel(128); tokio::spawn(async move { while let Some(item) = stream.next().await { match tx.send(Result::<_, Status>::Ok(item)).await { Ok(_) => { // item (server response) was queued to be send to client } Err(_item) => { // output_stream was build from rx and both are dropped break; } } } println!("\tclient disconnected"); }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::ServerStreamingEchoStream )) } async fn client_streaming_echo( &self, _: Request<Streaming<EchoRequest>>, ) -> EchoResult<EchoResponse> { Err(Status::unimplemented("not implemented")) } type BidirectionalStreamingEchoStream = ResponseStream; async fn bidirectional_streaming_echo( &self, req: Request<Streaming<EchoRequest>>, ) -> EchoResult<Self::BidirectionalStreamingEchoStream> { println!("EchoServer::bidirectional_streaming_echo"); let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); tokio::spawn(async move { while let Some(result) = in_stream.next().await { match result { Ok(v) => tx .send(Ok(EchoResponse { message: v.message })) .await .expect("working rx"), Err(err) => { if let Some(io_err) = match_for_io_error(&err) { if io_err.kind() == ErrorKind::BrokenPipe { eprintln!("\tclient disconnected: broken pipe"); break; } } match tx.send(Err(err)).await { Ok(_) => (), Err(_err) => break, // response was droped } } } } println!("\tstream ended"); }); // echo just write the same data that was received let out_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream )) } } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // 基础server let server = EchoServer {}; Server::builder() .add_service(pb::echo_server::EchoServer::new(server)) .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap()) .await .unwrap(); Ok(()) } server 端的代码还是比较清晰的,首先通过 tonic::include_proto! 宏引入grpc定义,参数是 proto 文件中定义的 package 。我们重点说说 server_streaming_echo function 。这个function 的处理流程明白了,其他的流式处理大同小异。首先 通过std::iter::repeat function 定义一个迭代器;然后构建 tokio_stream 在本示例中 每 200毫秒产生一个 repeat;最后构建一个 channel ,tx 用来发送从stream中获取的内容太,rx 封装到response 中返回。 最后 main 函数 拉起服务。 client 代码如下 pub mod pb { tonic::include_proto!("stream"); } use std::time::Duration; use tokio_stream::{Stream, StreamExt}; use tonic::transport::Channel; use pb::{echo_client::EchoClient, EchoRequest}; fn echo_requests_iter() -> impl Stream<Item = EchoRequest> { tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest { message: format!("msg {:02}", i), }) } async fn unary_echo(client: &mut EchoClient<Channel>, num: usize) { for i in 0..num { let req = tonic::Request::new(EchoRequest { message: "msg".to_string() + &i.to_string(), }); let resp = client.unary_echo(req).await.unwrap(); println!("resp:{}", resp.into_inner().message); } } async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) { let stream = client .server_streaming_echo(EchoRequest { message: "foo".into(), }) .await .unwrap() .into_inner(); // stream is infinite - take just 5 elements and then disconnect let mut stream = stream.take(num); while let Some(item) = stream.next().await { println!("\treceived: {}", item.unwrap().message); } // stream is droped here and the disconnect info is send to server } async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) { let in_stream = echo_requests_iter().take(num); let response = client .bidirectional_streaming_echo(in_stream) .await .unwrap(); let mut resp_stream = response.into_inner(); while let Some(received) = resp_stream.next().await { let received = received.unwrap(); println!("\treceived message: `{}`", received.message); } } async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) { let in_stream = echo_requests_iter().throttle(dur); let response = client .bidirectional_streaming_echo(in_stream) .await .unwrap(); let mut resp_stream = response.into_inner(); while let Some(received) = resp_stream.next().await { let received = received.unwrap(); println!("\treceived message: `{}`", received.message); } } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut client = EchoClient::connect("http://127.0.0.1:50051").await.unwrap(); println!("Unary echo:"); unary_echo(&mut client, 10).await; tokio::time::sleep(Duration::from_secs(1)).await; println!("Streaming echo:"); streaming_echo(&mut client, 5).await; tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions // Echo stream that sends 17 requests then graceful end that connection println!("\r\nBidirectional stream echo:"); bidirectional_streaming_echo(&mut client, 17).await; // Echo stream that sends up to `usize::MAX` requests. One request each 2s. // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from // graceful client disconnection (above example) on the server side. println!("\r\nBidirectional stream echo (kill client with CTLR+C):"); bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await; Ok(()) } 测试一下,分别运行 server 和 client cargo run --bin stream-server cargo run --bin stream-client 在开发中,我们通常不会再 client 和 server都开发好的情况下才开始测试。通常在开发server 端的时候采用 grpcurl 工具进行测试工作 grpcurl -import-path ./proto -proto echo.proto list grpcurl -import-path ./proto -proto echo.proto describe stream.Echo grpcurl -plaintext -import-path ./proto -proto echo.proto -d '{"message":"1234"}' 127.0.0.1:50051 stream.Echo/UnaryEcho 此时,如果我们不指定 -import-path 参数,执行如下命令 grpcurl -plaintext 127.0.0.1:50051 list 会出现如下报错信息 Failed to list services: server does not support the reflection API 让服务端程序支持 reflection API 首先改造build.rs use std::{env, path::PathBuf}; fn main() -> Result<(), Box<dyn std::error::Error>> { let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); tonic_build::configure() .file_descriptor_set_path(out_dir.join("stream_descriptor.bin")) .compile(&["proto/echo.proto"], &["proto"]) .unwrap(); Ok(()) } file_descriptor_set_path 生成一个文件,其中包含为协议缓冲模块编码的 prost_types::FileDescriptorSet 文件。这是实现 gRPC 服务器反射所必需的。 接下来改造一下 stream-server.rs,涉及两处更改。 新增 STREAM_DESCRIPTOR_SET 常量 pub mod pb { tonic::include_proto!("stream"); pub const STREAM_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("stream_descriptor"); } 修改main函数 #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // 基础server // let server = EchoServer {}; // Server::builder() // .add_service(pb::echo_server::EchoServer::new(server)) // .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap()) // .await // .unwrap(); // tonic_reflection let service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(pb::STREAM_DESCRIPTOR_SET) .with_service_name("stream.Echo") .build() .unwrap(); let addr = "0.0.0.0:50051".parse().unwrap(); let server = EchoServer {}; Server::builder() .add_service(service) .add_service(pb::echo_server::EchoServer::new(server)) .serve(addr) .await?; Ok(()) } register_encoded_file_descriptor_set 将包含编码的 prost_types::FileDescriptorSet 的 byte slice 注册到 gRPC Reflection 服务生成器注册。 再次测试 grpcurl -plaintext 127.0.0.1:50051 list grpcurl -plaintext 127.0.0.1:50051 describe stream.Echo 返回正确结果。 以上完整代码地址 作者:京东科技 贾世闻 来源:京东云开发者社区 转载请注明来源

优秀的个人博客,低调大师

JDK 17 营销初体验 —— 亚毫秒停顿 ZGC 落地实践 | 京东云技术团队

前言 自 2014 年发布以来, JDK 8 一直都是相当热门的 JDK 版本。其原因就是对底层数据结构、JVM 性能以及开发体验做了重大升级,得到了开发人员的认可。但距离 JDK 8 发布已经过去了 9 年,那么这 9 年的时间,JDK 做了哪些升级?是否有新的重大特性值得我们尝试?能否解决一些我们现在苦恼的问题?带着这份疑问,我们进行了 JDK 版本的调研与尝试。 新特性一览 现如今的 JDK 发布节奏变快,每次新出一个版本,我们就会感叹一下:我还在用 JDK 8,现在都 JDK 9、10、11 …… 21 了?然后就会瞅瞅又多了哪些新特性。有一些新特性很香,但考虑一番还是决定放弃升级。主要原因除了新增特性对我们来说改变不大以外,最重要的就是 JDK 9 带来的模块化(JEP 200),导致我们升级十分困难。 模块化的本意是将 JDK 划分为一组模块,这些模块可以在编译时、构建时和运行时组合成各种配置,主要目标是使实现更容易扩展到小型设备,提高安全性和可维护性,并提高应用程序性能。但付出的代价非常大,最直观的影响就是,一些 JDK 内部类不能访问了。 但是除此之外,并没有太多阻塞升级的问题,后续版本都是一些很香的特性: G1 (JEP 248、JEP 307、JEP 344、JEP 345、JEP 346),提供一个支持指定暂停时间、NUMA 感知内存分配的高性能垃圾回收器 ZGC (JEP 333、JEP 376、JEP 377),一个支持 NUMA,暂停时间不应超过 1ms 的垃圾回收器 并发 API 更新(JEP 266),提供 publish-subscribe 框架,支持响应式流发布 - 订阅框架的接口,以及 CompletableFuture 的进一步完善 集合工厂方法(JEP 269),类似 Guava,支持快速创建有初始元素的集合 新版 HTTP 客户端(JEP 321),一个现代化、支持异步、WebSocket、响应式流的 JDK 内置 API 空指针 NPE 直接给出异常方法位置(JEP 358),以前只给代码行数,不告诉哪个方法,一行多个方法的写法一但出现空指针,全靠程序员上下文分析推理 instanceof 的模式匹配(JEP 394),判断类型后再也不用强转了 数据记录类(JEP 395),一个标准的值聚合类,帮助程序员专注于对不可变数据进行建模,实现数据驱动 Switch 表达式语法改进(JEP 361),改变 Switch 又臭又长,易于出错的现状 文本块(JEP 378),支持二维文本块,而不是像现在一样通过 + 号自行拼接 密封类(JEP 409),提供一种限制进行扩展的语法,超类应该可以被广泛访问(因为它代表了用户的重要抽象),但不能广泛扩展(因为它的子类应该仅限于作者已知的子类) 以及一些未提到的底层数据结构优化,JVM 性能提升…… 这么多的优点,恰好能解决我们当前遇到的一些问题,因此我们决定进行 JDK 升级。 升级 升级应用评估 首先自然是要考虑要将哪些应用进行升级。我们根据以下条件进行应用筛选: 第一,也是最重要的一点,此系统可以通过升级,解决现有问题与瓶颈 第二,有完备的机制能够进行快速回归与验证,如完备的单元测试,自动化测试覆盖能力,便捷的生产压测能力等,底层的升级一定要做好完备的验证 第三,技术债务一定要少,不至于在升级过程中遇到一些必须解决的技术债,给升级增加难度 第四,负责升级的人对这个系统都很了解,除核心业务逻辑外,还能够了解引入了哪些中间件与依赖,使用了中间件的哪些功能,中间件升级后,大量不兼容的改动是否对现有系统造成影响 最终我们选取了一个结算页、收银台展示无券支付营销的应用进行升级。此应用特点如下: 作为核心链路的应用之一,接口响应时间要求很高,GC 是其耗时抖动的瓶颈之一 业务正在进行快速迭代发展,随着降本增效策略的落地,营销策略进一步精细化,营销种类、数量、范围进一步增加,给系统性能带来更大的挑战 日常流量不低,整点存在突发流量,并且需要承接大促流量 核心链路覆盖了单元测试,测试环境具备自动化回归能力,预发、生产支持常态化压测与生产流量回放 非 Web 应用,仅使用各个中间件的基础功能,升级出现不兼容的问题小 维护了 3 年,经历过多次重构,历史问题较少,几乎没有技术债务 针对以上特点,此应用很适合进行 JDK 17 升级。此应用基于 JDK 8,SpringBoot 2.0.8,除常见外部基础组件外,还使用以下公司内部中间件:UMP、SGM、DUCC、CDS、JMQ、JSF、R2M。 升级效果 可以先看下我们升级后压测的效果: 纯计算代码不再受 GC 影响 升级前 升级后 版本 吞吐量 平均耗时 最大耗时 JDK 8 G1 99.966% 35.7ms 120ms JDK 17 ZGC 99.999% 0.0254ms 0.106ms 升级后吞吐量几乎不受影响(甚至提升0.01%),GC 平均耗时下降1405 倍,GC 最大耗时下降1132 倍 升级步骤 升级 JDK 编译版本 首先自然是修改 maven 中指定的 JDK 版本,可以先升级到 JDK 11,同时修改 maven 编译插件 <java.version>11</java.version> <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version> <maven-source-plugin.version>3.2.1</maven-source-plugin.version> <maven-javadoc-plugin.version>3.3.2</maven-javadoc-plugin.version> <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>${maven-compiler-plugin.version}</version> <configuration> <release>${java.version}</release> <encoding>${project.build.sourceEncoding}</encoding> </configuration> </plugin> 引入缺少的依赖 然后就可以进行本地编译了,此时会暴露一些很简单的问题,比如找不到包、类等等。原因就是 JDK 11 移除了 Java EE and CORBA 的模块,需要手动引入。 <!-- JAVAX --> <dependency> <groupId>javax.annotation</groupId> <artifactId>javax.annotation-api</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-impl</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-core</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>javax.activation</groupId> <artifactId>activation</artifactId> <version>1.0.2</version> </dependency> 升级外部中间件 解决了编译找不到类的问题,接下来就该升级依赖的外部中间件了。对于我们的应用来说,也就是升级 SpringBoot 的版本。支持 JDK 17 的版本是 Spring 5.3,对应 SpringBoot 2.5。 在这里我建议升级至 SpringBoot 2.7,从 2.5 升级至 2.7 几乎没有需要改动的地方,同时高版本的 SprngBoot 所约定的依赖,对 JDK 17 的支持也更好。 建议进行大版本逐个升级,比如我们从 2.0 升级至 2.1。每升一个版本,就要仔细观察依赖版本的变化,掌握每个依赖升级的情况。SpringBoot 的升级其实意味着把所有开源组件约定版本进行大版本升级,接口弃用,破坏性兼容更新较多,需要一一鉴别。 下面以升级 Spring Boot 2.1 为例,说明我们升级的步骤: 首先阅读 Spring Boot 2.1 做了哪些和我们有关的配置改动 禁用了同 Bean 覆盖,开启需要指定spring.main.allow-bean-definition-overriding为true 然后阅读 Spring Boot 2.1 升级了哪些我们用到的依赖 Spring 升级至 5.1 首先阅读 Spring 5.1 做了哪些和我们有关的配置改动 无影响 然后阅读 Spring 5.1 升级了哪些我们用到的依赖 ASM 7.0 同理,阅读升级影响(这种底层依赖的底层依赖,如果仅 ASM 在使用,则无需关心) CGLIB 3.2 同理,阅读升级影响(这种底层依赖的底层依赖,如果仅 ASM 在使用,则无需关心) 最后阅读 Spring 5.1 弃用了哪些和我们有关的配置与依赖 无影响 Lombok 升级至 1.18 阅读改动影响,1.18 Lombok 默认情况下将不再生成私有无参构造函数。可以通过在lombok.config配置文件中设置lombok.noArgsConstructor.extraPrivate=true来启用它 Hibernate 升级至 5.3 阅读改动影响,对我们项目无影响 JUnit 升级至 5.2 阅读改动影响,需要 Surefire 插件升级至2.21.0及以上 最后阅读 Spring Boot 2.1 弃用了哪些和我们有关的配置与依赖 至此,Spring Boot 2.1 升级完毕。接下来分析一次依赖树变化,和升级前的依赖树进行比较,查看依赖变化范围是否全部已知可控。完成后进行 Spring Boot 2.2 的升级。 以下为我们需要注意的升级事项,仅供参考: 可以先升级到 JDK 11,一边启动一边验证。但不要在 JDK 11 使用 ZGC,ZGC 的堆预留与可用堆的比例太大,有时会导致OOM 代码中存在同 Bean,启动时 Springboot 2.0 会自动进行覆盖,高版本开启覆盖,需要指定spring.main.allow-bean-definition-overriding为true Spring Boot 2.2 默认的单元测试 Junit 升级至 5,Junit 4 的单元测试建议进行升级,改动不大 Spring Boot 2.4 不再支持 Junit 4 的单元测试,如果需要可以手动引入 Vintage 引擎 Spring Boot 2.4 配置文件处理逻辑变更,注意阅读更新日志 Spring Boot 2.6 默认禁用 Bean 循环依赖,可以通过将spring.main.allow-circular-references设置为true开启 Spring Boot 2.7 自动配置注册文件变更,spring.factories中的内容需要移动至META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports文件下 spring-boot-properties-migrator可以识别弃用的属性,可以考虑使用 Spring Framework 5.2 需要 Jackson 2.9.7+,注意阅读更新日志 Spring Framework 5.2 注解检索算法重构,所有自定义注释都必须使用@Retention(RetentionPolicy.RUNTIME)进行注释,以便 Spring 能够找到它们 Spring Framework 5.3 修改了很多东西,但都与我们的应用无关,请关注更新日志 ASM 仅单元测试 Mock 在使用,无需特殊关注,做好 JUnit 升级兼容即可 CGLIB 大版本升级以兼容字节码版本为主,关注好变更日志即可 Lombok 即使是小版本升级,也会有破坏性更新,需要仔细阅读每个版本的更新日志,建议少用 Lombok Hibernate 没有太大的破坏性更新,关注好变更日志即可 JUnit 升级主要关注大版本变更,如 4 升 5,小版本没有特别大的破坏性更新,并且是单元测试使用的依赖,可以放心升级或者不升级 Jackson 2.11,对java.util.Date和java.util.Calendar默认格式进行了更改,注意查看更新日志进行兼容 注意字节码增强相关依赖的升级 注意本地缓存升级 注意 Netty 升级,关注更新日志 升级内部中间件 内部中间件升级较为简单,主要是关注 JMQ、JSF 版本。其中 JSF 依赖的 Netty 和 Javassist 等都需要升级,Netty 版本较低会有内存泄漏问题。 我们使用的依赖版本 给大家参考下我们升级后的依赖版本 <properties> <!-- 基础组件版本 Start --> <java.version>17</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven-compiler-plugin.version>3.11.0</maven-compiler-plugin.version> <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version> <jacoco-maven-plugin-version>0.8.10</jacoco-maven-plugin-version> <maven-assembly-plugin-version>2.4.1</maven-assembly-plugin-version> <maven-dependency-plugin-version>3.1.0</maven-dependency-plugin-version> <profiles.dir>src/main/profiles</profiles.dir> <springboot-version>2.7.13</springboot-version> <log4j2.version>2.18.0-jdsec.rc2</log4j2.version> <hibernate-validator.version>5.2.4.Final</hibernate-validator.version> <collections-version>3.2.2</collections-version> <collections4.version>4.4</collections4.version> <netty.old.version>3.9.0.Final</netty.old.version> <netty.version>4.1.36.Final</netty.version> <javassist-version>3.29.2-GA</javassist-version> <guava.version>23.0</guava.version> <mysql-connector-java.version>5.1.29</mysql-connector-java.version> <jmh-version>1.36</jmh-version> <caffeine-version>3.1.6</caffeine-version> <fastjson-version>1.2.83-jdsec.rc1</fastjson-version> <fastjson2-version>2.0.35</fastjson2-version> <roaringBitmap.version>0.9.44</roaringBitmap.version> <disruptor.version>3.4.4</disruptor.version> <jaxb-impl.version>2.3.8</jaxb-impl.version> <jaxb-core.version>2.3.0.1</jaxb-core.version> <activation.version>1.1.1</activation.version> <!-- 基础组件版本 End --> <!-- 京东中间件版本 Start --> <ump-version>20221231.1</ump-version> <ducc.version>1.0.20</ducc.version> <jdcds-driver-alg-version>2.21.1</jdcds-driver-alg-version> <jdcds-driver-version>3.8.3</jdcds-driver-version> <jmq.version>2.3.3-RC2</jmq.version> <jsf.version>1.7.6-HOTFIX-T2</jsf.version> <r2m.version>3.3.4</r2m.version> <!-- 京东中间件版本 End --> </properties> JVM 启动参数升级 远程 DEBUG 参数有所变化: JAVA_DEBUG_OPTS=" -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:8000 " 打印 GC 日志参数的变化,我们在预发环境开启了日志进行观察: JAVA_GC_LOG_OPTS=" -Xlog:gc*:file=/export/logs/gc.log:time,tid,tags:filecount=10:filesize=10m " 使用了 ZGC 的部分 JVM 参数: JAVA_MEM_OPTS=" -server -Xmx12g -Xms12g -XX:MaxMetaspaceSize=256m -XX:MetaspaceSize=256m -XX:MaxDirectMemorySize=2048m -XX:+UseZGC -XX:ZAllocationSpikeTolerance=3 -XX:ParallelGCThreads=8 -XX:CICompilerCount=3 -XX:-RestrictContended -XX:+AlwaysPreTouch -XX:+ExplicitGCInvokesConcurrent -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/export/logs " 内部依赖需要访问 JDK 模块,如 UMP、JSF、虫洞、MyBatis、DUCC、R2M、SGM: if [[ "$JAVA_VERSION" -ge 11 ]]; then SGM_OPTS="${SGM_OPTS} --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens java.management/java.lang.management=ALL-UNNAMED " UMP_OPT=" --add-opens java.base/sun.net.util=ALL-UNNAMED " JSF_OPTS=" --add-opens java.base/sun.util.calendar=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.math=ALL-UNNAMED" WORMHOLE_OPT=" --add-opens java.base/sun.security.action=ALL-UNNAMED " MB_OPTS=" --add-opens java.base/java.lang=ALL-UNNAMED " DUC_OPT=" --add-opens java.base/java.net=ALL-UNNAMED " R2M_OPT=" --add-opens java.base/java.time=ALL-UNNAMED " fi 启动后完整的启动参数如下: -javaagent:/export/package/sgm-probe-java/sgm-probe-5.9.5-product/sgm-agent-5.9.5.jar -Dsgm.server.address=http://sgm.jdfin.local -Dsgm.app.name=market-reduction-center -Dsgm.agent.sink.http.connection.requestTimeout=2000 -Dsgm.agent.sink.http.connection.connectTimeout=2000 -Dsgm.agent.sink.http.minAlive=1 -Dsgm.agent.virgo.address=10.24.216.198:8999,10.223.182.52:8999,10.25.217.95:8999 -Dsgm.agent.zone=m6 -Dsgm.agent.group=m6-discount -Dsgm.agent.tenant=jdjr -Dsgm.deployment.platform=jdt-jdos --add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED --add-opens=java.management/java.lang.management=ALL-UNNAMED -DJDOS_DATACENTER=JXQ -Ddeploy.app.name=jdos_kj_market-reduction-center -Ddeploy.app.id=30005051 -Ddeploy.instance.id=0 -Ddeploy.instance.name=server -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Djava.util.Arrays.useLegacyMergeSort=true -Dog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dlog4j2.AsyncQueueFullPolicy=Discard -Xmx12g -Xms12g -XX:MaxMetaspaceSize=256m -XX:MetaspaceSize=256m -XX:MaxDirectMemorySize=2048m -XX:+UseZGC -XX:ZAllocationSpikeTolerance=3 -XX:ParallelGCThreads=8 -XX:CICompilerCount=3 -XX:-RestrictContended -XX:+AlwaysPreTouch -XX:+ExplicitGCInvokesConcurrent -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/export/logs --add-opens=java.base/sun.net.util=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.math=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED -Dloader.path=/export/package/jdos_kj_market-reduction-center/conf 系统验证 系统可以成功启动后,就可以进行功能验证。有几个验证重点与方法: 首先可以通过单元测试快速进行系统全面回归,避免出现 JDK API、中间件 API 变更导致的业务异常 部署到测试环境,验证各个中间件是否正常,如 DUCC 开关下发,MQ 收发,JSF 接口调用等等,系统中所有用到的中间件都需要一一验证 然后可以开始进行核心业务的验证,这时候可以利用测试同学的测试自动化能力加人工补充场景,快速进行核心业务回归。其中研发需要观察系统被调用时的所有异常日志,包括警告,明确每条日志产生的原因 验证完成后,可以部署到联调环境,利用外部同事联调时的请求进一步进行验证 充分在测试环境观察后,部署至预发环境,利用外部同事联调时的请求进一步进行验证,并进行常态化压测,验证优化效果与瓶颈 经过预发长时间验证,没有问题后,部署一台生产,通过回放生产流量进一步进行验证 回放流量无异常后,开始承接生产流量,按接口开量,进行若干周的观察 逐步切量,直到全量上线 GC 调优 ZGC 介绍 如图所示,ZGC 的定位是一个最大暂停时间小于 1ms,且能够处理大小从 8MB 到 16TB 的堆,并且易于调优的垃圾回收器。ZGC 只有三个 STW 阶段,具体流程网上有大量类似文章,这里不做详细介绍。 优化方向 目前我们的应用日常使用 G1 约 30ms 的 GC 停顿时间,不到 1 分钟就会触发一次,大促时频率更高,暂停时间更长,导致接口性能波动较大。随着业务发展,为了优化系统我们大量应用了本地缓存,导致存活对象较多。ZGC 暂停时间不随堆、活动集或根集大小而增加,且极低的 GC 时间正是我们需要的特性,因此决定使用 ZGC。 ZGC 作为一个现代化 GC,没有必要做过多的优化,默认配置已经可以解决 99.9% 的场景。但是我们的应用会承接大促流量,根据观察,瞬时流量激增时 GC 时机较晚,因此应对突发流量是我们 ZGC 调优的一个目标,其他属性不做任何调整。 优化措施 ZGC 的一个优化措施就是足够大的堆,一般来说,给 ZGC 的内存越多越好,但我们也没必要浪费,通过压测观察 GC 日志,取得一个合适的值即可。我们只要保证: 堆可以容纳应用程序产生的实时垃圾 堆中有足够的空间,以便在 GC 运行时,为新的垃圾分配提供空间 因此,我们将机器升级成 8C 16G 配置,观察 GC 日志根据应用情况调整内存占用配置,最终设定为-Xmx12g -Xms12g -XX:MaxMetaspaceSize=256m -XX:MetaspaceSize=256m -XX:MaxDirectMemorySize=2048m,提升 ZGC 的效果。 剩下的其他优化措施则视情况而定,可以调整触发 GC 的时机,也可以改为基于固定时间间隔触发 GC。 我们略微提升了触发时机,-XX:ZAllocationSpikeTolerance=3(默认为 2)应对突发流量。 CICompilerCount ParallelGCThreads一个是提升 JIT 编译速度,一个是垃圾收集器并行阶段使用的线程数,根据实际情况略微增加,牺牲一点点 CPU 使用率,提升下效率。 另外还可以开启Large Pages进一步提升性能。这一步我们没有做,因为现在部署方式为一台物理机 Docker 混部署。开启需要修改内核,影响宿主机的其他镜像。 总结 至此,调优完成,目前我们已在线上跑了一个多月,每周都有三次常态化压测,一切正常。 以上升级心得分享给大家,希望对各位有所帮助。 作者:京东科技 张天赐 来源:京东云开发者社区

优秀的个人博客,低调大师

每日一博 | RocketMQ 事务消息初体验

事务消息是 RocketMQ 的高级特性之一 。这篇文章,笔者会从应用场景、功能原理、实战例子三个模块慢慢为你揭开事务消息的神秘面纱。 1 应用场景 举一个电商场景的例子:用户购物车结算时,系统会创建支付订单。 用户支付成功后支付订单的状态会由未支付修改为支付成功,然后系统给用户增加积分。 通常我们会使用普通消费方案,该方案能够发挥 MQ 的优势:异步和解耦 , 同时架构设计非常简单。 用户购物车结算时,系统创建支付订单; 支付成功后,更新订单的状态从未支付修改为支付成功; 发送一条普通消息到消息队列服务端; 积分服务消费消息,添加积分记录。 但该方案有个非常直观的缺点:容易出现不一致的现象。 假如先发送消息,后修改订单状态,消息发送成功,订单没有执行成功,需要回滚整个事务(订单数据事务回滚,积分服务消费时,需要先反查事务状态,若事务提交,才插入积分记录)。 假如先修改订单状态,后发送消息,订单状态修改成功,但消息发送失败,需要补偿操作才能保持最终一致。 假如先修改订单,后发送消息,订单状态修改成功,但消息发送超时,此时无法判断需要回滚订单还是提交订单变更。 我们看到,为了完善普通消费方案,业务层还需要做到两点:补偿机制和提供事务状态查询接口。 要做到这两点,难不难呢? 不难,但是业务层代码会比较混乱,更优的方案还是得从中间件层面解决。 2 功能原理 RocketMQ 事务消息是支持在分布式场景下保障消息生产和本地事务的最终一致性。交互流程如下图所示: 1、生产者将消息发送至 Broker 。 2、Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。 3、生产者开始执行本地事务逻辑。 4、生产者根据本地事务执行结果向服务端提交二次确认结果( Commit 或是 Rollback ),Broker 收到确认结果后处理逻辑如下: 二次确认结果为 Commit :Broker 将半事务消息标记为可投递,并投递给消费者。 二次确认结果为 Rollback :Broker 将回滚事务,不会将半事务消息投递给消费者。 5、在断网或者是生产者应用重启的特殊情况下,若 Broker 未收到发送者提交的二次确认结果,或 Broker 收到的二次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。 笔者认为事务消息的精髓在于: 本地事务执行成功,消费者才能消费事务消息; 消息回查本身就是补偿机制的实现,事务生产者需提供了事务状态查询接口。 3 实战例子 为了便于大家理解事务消息 ,笔者新建一个工程用于模拟支付订单创建、支付成功、赠送积分的流程。 首先,我们创建一个真实的订单主题:order-topic 。 然后在数据库中创建三张表 订单表、事务日志表、积分表。 最后我们创建一个 Demo 工程,生产者模块用于创建支付订单、修改支付订单成功,消费者模块用于新增积分记录。 接下来,我们展示事务消息的实现流程。 <strong style="font-size: 15px;line-height: inherit;color: black;">1、创建支付订单</strong> 调用订单生产者服务创建订单接口 ,在 t_order 表中插入一条支付订单记录。 <strong style="font-size: 15px;line-height: inherit;color: black;">2、调用生产者服务修改订单状态接口</strong> 接口的逻辑就是执行事务生产者的 sendMessageInTransaction 方法。 生产者端需要配置事务生产者和事务监听器。 发送事务消息的方法内部包含三个步骤 : 事务生产者首先发送半事务消息,发送成功后,生产者才开始执行本地事务逻辑。 事务监听器实现了两个功能:执行本地事务和供 Broker 回查事务状态 。 执行本地事务的逻辑内部就是执行 orderService.updateOrder 方法。 方法执行成功则返回 LocalTransactionState.COMMIT_MESSAGE , 若执行失败则返回 LocalTransactionState.ROLLBACK_MESSAGE 。 需要注意的是: orderService.updateOrder 方法添加了事务注解,并将修改订单状态和插入事务日志表放进一个事务内,避免订单状态和事务日志表的数据不一致。 最后,生产者根据本地事务执行结果向 Broker 提交二次确认结果。 Broker 收到生产者确认结果后处理逻辑如下: 二次确认结果为 Commit :Broker 将半事务消息标记为可投递,并投递给消费者。 二次确认结果为 Rollback :Broker 将回滚事务,不会将半事务消息投递给消费者。 <strong style="font-size: 15px;line-height: inherit;color: black;">3、积分消费者消费消息,添加积分记录</strong > 当 Broker 将半事务消息标记为可投递时,积分消费者就可以开始消费主题 order-topic 的消息了。 积分消费者服务,我们定义了消费者组名,以及订阅主题和消费监听器。 在消费监听器逻辑里,幂等非常重要 。当收到订单信息后,首先判断该订单是否有积分记录,若没有记录,才插入积分记录。 而且我们在创建积分表时,订单编号也是唯一键,数据库中也必然不会存在相同订单的多条积分记录。 4 总结 RocketMQ 事务消息是支持在分布式场景下保障消息生产和本地事务的最终一致性。 编写一个实战例子并不复杂,但使用事务消息时需要注意如下三点: 1、事务生产者和消费者共同协作才能保证业务数据的最终一致性; 2、事务生产者需要实现事务监听器,并且保存事务的执行结果(比如事务日志表) ; 3、消费者要保证幂等。消费失败时,通过重试、告警+人工介入等手段保证消费结果正确。 本文涉及到的工程源码,笔者已上传到 Github ,感兴趣的同学可以了解一下,若有疑问直接加笔者好友,一起交流技术,一起成长。 笔者会在后续的文章里,详细解析事务消息的实现原理,敬请期待。 实战代码地址: https://github.com/makemyownlife/rocketmq4-learning 如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

优秀的个人博客,低调大师

自动化接口回归测试神器 AREX 使用初体验

AREX 是一款开源的基于流量录制回放技术的自动化回归测试平台,目前官方文档仅介绍了如何在本地及私有云部署,本篇文章分享如何 AWS 环境下快速搭建 AREX 服务,并使用 AWS 的 DocumentDB 作为数据库替换官方默认的 MongoDB,使用 ElastiCache 替换默认的 Redis。 安装 AREX 使用前需要注册 AWS 账号并对相应概念有一定了解,详细可参考 AWS 官方文档。 步骤一:准备一台 EC2 用于部署 AREX 操作系统选择 Amazon Linux 2 AMI。(Amazon Linux 2023 AMI 有部分应用安装不上,所以这里选择使用稳定的低版本操作系统) 如果只是进行试用,建议最小配置选择 t3.large(2C8G)的机器。 密钥对名称可以按照自己的需求创建并使用(方便快速链接到自己的 EC2)。后续创建 DocumentDB 的时候也可以共用该秘钥。 存储配置默认 8G 即可。 步骤二:创建 Amazon DocumentDB 集群 注意:在使用 DocumentDB 时,需要将其部署在与之关联的 EC2 实例所在的同一个虚拟私有云(VPC)中,保证它们之间能够进行通信。 创建 2 个 db.t3.medium 类型的实例,1 主 1 从两台机器,引擎版本选择 5.0.0。 安装完成后,配置对应的入站规则以允许外部网络通过连接串进行访问。使用 mongo shell 验证连接串是否可用。 执行 show dbs 命令,出现上图中红框部分则表示 DocumentDB 创建成功。 步骤四:准备一台 ElastiCache 并创建 Redis 集群 在 AWS 控制台中搜索 ElastiCache 并创建 Redis 集群。注意:ElastiCache 必须与上述创建的 EC2 在同一个 VPC 中。 选择配置并创建新集群,试用阶段可以先禁用集群模式,正式使用时按需修改配置。 引擎版本选择 6.2,节点类型选择 cache.t3.micro,副本数量设置为 0。 选择创建新的子网组,选择和上述 EC2 同一个 VPC ID。 在 EC2 上通过 redis-cli 连接到 ElastiCache 检查是否连通。如果连接不上可以看下安全组对应的入站规则,根据自己网络情况进行配置即可。 步骤五:通过 docker-compose 安装 AREX AREX 的安装非常简单,使用 Docker-Compose 命令,即可一键安装 AREX 所有基础服务组件。 这里简单介绍一下 AREX 的工作原理及各个服务组件。 AREX 回归测试的工作原理是利用 AREX Java Agent 将生产环境中 Java 应用的数据流量和请求信息进行采样录制,并将这些信息发送给 AREX 数据存取服务(Storage Service),由数据存取服务导入数据库(mongoDB)中进行存储。当需要进行回放时,AREX 调度服务(Schedule Service)将会通过 Storage Service 从数据库中提取被测应用的录制数据,然后向目标验证服务发送接口请求。同时,Java Agent 会将录制的外部依赖的响应进行 Mock,代替真正的数据访问,传达给被测应用,目标服务处理完成请求逻辑后返回响应报文。随后调度服务会将录制的响应报文与回放的响应报文进行比对,验证系统逻辑正确性,并将比对结果推送给分析服务(Report Service),由其生成完整的回放测试报告,供测试人员分析录制回放差异。 首先,通过 git 命令克隆 AREX 仓库。 git clone --depth 1 https://github.com/arextest/deployments.git cd deployments 配置 DocumentDB、ElastiCache 如要使用 AWS 的 DocumentDB 作为数据库替换官方默认的 MongoDB,并使用 ElastiCache 替换默认的 Redis,只需修改配置文件 docker-compose.yml 中的连接串,把文件中所有 MongoDB 的连接串都替换成 DocumentDB 的连接串,所有 Redis 的连接串都替换成 ElastiCache 的连接串即可。 步骤六:启动 AREX 配置完成后,执行 docker-compose 一键启动 AREX 服务。 docker-compose up -d 服务启动后,在没有修改端口配置的情况下,直接访问 8088 端口进入 AREX 前端页面。 差异分析 在实际使用过程中,对于一个复杂的线上应用,业务场景复杂,录制及回放的用例数量巨大,如何分析差异点及排查问题成为难点。 为了减轻使用者分析差异时的工作量,AREX 对可能存在的大量差异点,使用聚合的方法进行了大幅度的简化。 差异场景聚合分析 首先是差异场景聚合,AREX 对差异场景相同的多个用例进行了聚合展示。在介绍差异场景聚合逻辑之前,先大概了解一下 AREX 中用例(Case)的基本概念。 AREX 用例概念 在 AREX 中,一个用例通常由多个步骤组成,每个步骤包含了一个请求和一个响应。请求可以是主入口,也可以是外部调用(包括 DB、Redis 等)。在每个步骤中,都会记录请求的参数和响应结果等信息,用于后续的对比。如果录制与回放时的主入口响应,以及外部依赖的请求均无差异,则视为该用例回放通过。 这里的主入口和外部调用我们称之为 Mock 的类型。 Mock 的差异类型 每个 Mock 类型的对比差异类型会被分为三种情况: new call:这种差异类型表示该主入口或外部调用的 Mock 在录制时不存在,但在回放时存在,即新增了调用,通常是因为有新功能的迭代。 call missing:表示在录制时存在,但在回放时缺失了调用,通常是因为项目进行了优化,移除了某些不必要的调用关系。 value diff:表示在录制和回放时都存在,但在对比过程中某些节点有差异。后续章节中会具体介绍如何分析这些差异。 差异场景聚合 在进行流量回放测试时,针对可能出现的用例数量较大的情况,AREX 会通过一些聚合的操作,将相似的用例进行合并,以减少差异点的数量,便于用户对数据进行分析。如下图所示,聚合相似差异场景后,每个差异场景下仅选取一条用例作为展示。 场景聚合逻辑 有了上述 Mock 差异类型的概念,接下来介绍下差异用例场景聚合的逻辑。差异用例场景聚合是为了将具有相同 Mock 类型和差异类型的用例聚合在一起形成一个场景,从而帮助用户更快速地了解整个场景中的用例情况,减少用户需要分析的用例数量,提高分析用例的效率。 首先,根据 Mock 的 type 类型和差异类型的组合,AREX 会生成一个唯一的键,将所有用例分类聚合到这些键中,形成一个大分类。如上图中标注的 ①大分类。 其次,每个大分类中的用例都会再根据具体的 Mock 和差异类型的排列生成一个子唯一键,进一步对用例进行分类。这样做的目的是为了更加细致地分类,以便更快速地分析差异用例,具体可见上图中 ②小分类 的示例。每个小分类中有多少个用例数量会标记在该分类的最前面。 差异点分析 其次在每个差异场景中,AREX 对相似的差异节点也进行了聚合展示。 差异点聚合逻辑 在某些大报文的场景下,有些大数组中的差异点会非常多,一方面不利于前端展示,另一方面增加了使用者分析差异点的复杂度。 为了解决这个问题,AREX 将差异点按照模糊路径进行聚合。这里的模糊路径指的是不带数组下标的 JSON 节点路径。例如,一个 JSON 对象中有一个名为 “items” 的数组,数组中有多个元素 “items[0]”、“items[1]”、“items[2]” 等。在模糊路径中,这些路径会被合并为 “items”,从而实现聚合。 新增节点 在比较两个 JSON 对象时,如果发现某一边 JSON 对象中存在一个节点,但是另一边的对象中不存在该节点,那么我们就认为该节点是“新增节点”,如下图中,仅出现在右侧对象中的 cardpayfee 节点。同时为了方便用户查看,AREX 会将新增节点的所有祖先节点都标注为褐色,以突出显示该节点的位置。这样,用户可以清晰地看到该节点在另一侧对象中的位置,从而更好地分析问题。 差异节点 如果左右两边对象都存在某节点,但节点的值不一致,就可以认为该节点是“差异节点”,AREX 会使用蓝色背景高亮显示这些节点。不同于新增节点,这种类型的差异点只会出现在叶子节点上,因为只有叶子节点才有具体的值可以进行比较。 AREX 文档:http://arextest.com/zh-Hans/docs/intro/ AREX 官网:http://arextest.com/ AREX GitHub:https://github.com/arextest AREX 官方 QQ 交流群:656108079

优秀的个人博客,低调大师

RocketMQ 事务消息初体验

事务消息是 RocketMQ 的高级特性之一 。这篇文章,笔者会从应用场景、功能原理、实战例子三个模块慢慢为你揭开事务消息的神秘面纱。 1 应用场景 举一个电商场景的例子:用户购物车结算时,系统会创建支付订单。 用户支付成功后支付订单的状态会由未支付修改为支付成功,然后系统给用户增加积分。 通常我们会使用普通消费方案,该方案能够发挥 MQ 的优势:异步和解耦 , 同时架构设计非常简单。 用户购物车结算时,系统创建支付订单; 支付成功后,更新订单的状态从未支付修改为支付成功; 发送一条普通消息到消息队列服务端; 积分服务消费消息,添加积分记录。 但该方案有个非常直观的缺点:容易出现不一致的现象。 假如先发送消息,后修改订单状态,消息发送成功,订单没有执行成功,需要回滚整个事务(订单数据事务回滚,积分服务消费时,需要先反查事务状态,若事务提交,才插入积分记录)。 假如先修改订单状态,后发送消息,订单状态修改成功,但消息发送失败,需要补偿操作才能保持最终一致。 假如先修改订单,后发送消息,订单状态修改成功,但消息发送超时,此时无法判断需要回滚订单还是提交订单变更。 我们看到,为了完善普通消费方案,业务层还需要做到两点:补偿机制和提供事务状态查询接口。 要做到这两点,难不难呢? 不难,但是业务层代码会比较混乱,更优的方案还是得从中间件层面解决。 2 功能原理 RocketMQ 事务消息是支持在分布式场景下保障消息生产和本地事务的最终一致性。交互流程如下图所示: 1、生产者将消息发送至 Broker 。 2、Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。 3、生产者开始执行本地事务逻辑。 4、生产者根据本地事务执行结果向服务端提交二次确认结果( Commit 或是 Rollback ),Broker 收到确认结果后处理逻辑如下: 二次确认结果为 Commit :Broker 将半事务消息标记为可投递,并投递给消费者。 二次确认结果为 Rollback :Broker 将回滚事务,不会将半事务消息投递给消费者。 5、在断网或者是生产者应用重启的特殊情况下,若 Broker 未收到发送者提交的二次确认结果,或 Broker 收到的二次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。 笔者认为事务消息的精髓在于: 本地事务执行成功,消费者才能消费事务消息; 消息回查本身就是补偿机制的实现,事务生产者需提供了事务状态查询接口。 3 实战例子 为了便于大家理解事务消息 ,笔者新建一个工程用于模拟支付订单创建、支付成功、赠送积分的流程。 首先,我们创建一个真实的订单主题:order-topic 。 然后在数据库中创建三张表 订单表、事务日志表、积分表。 最后我们创建一个 Demo 工程,生产者模块用于创建支付订单、修改支付订单成功,消费者模块用于新增积分记录。 接下来,我们展示事务消息的实现流程。 <strong style="font-size: 15px;line-height: inherit;color: black;">1、创建支付订单</strong> 调用订单生产者服务创建订单接口 ,在 t_order 表中插入一条支付订单记录。 <strong style="font-size: 15px;line-height: inherit;color: black;">2、调用生产者服务修改订单状态接口</strong> 接口的逻辑就是执行事务生产者的 sendMessageInTransaction 方法。 生产者端需要配置事务生产者和事务监听器。 发送事务消息的方法内部包含三个步骤 : 事务生产者首先发送半事务消息,发送成功后,生产者才开始执行本地事务逻辑。 事务监听器实现了两个功能:执行本地事务和供 Broker 回查事务状态 。 执行本地事务的逻辑内部就是执行 orderService.updateOrder 方法。 方法执行成功则返回 LocalTransactionState.COMMIT_MESSAGE , 若执行失败则返回 LocalTransactionState.ROLLBACK_MESSAGE 。 需要注意的是: orderService.updateOrder 方法添加了事务注解,并将修改订单状态和插入事务日志表放进一个事务内,避免订单状态和事务日志表的数据不一致。 最后,生产者根据本地事务执行结果向 Broker 提交二次确认结果。 Broker 收到生产者确认结果后处理逻辑如下: 二次确认结果为 Commit :Broker 将半事务消息标记为可投递,并投递给消费者。 二次确认结果为 Rollback :Broker 将回滚事务,不会将半事务消息投递给消费者。 <strong style="font-size: 15px;line-height: inherit;color: black;">3、积分消费者消费消息,添加积分记录</strong > 当 Broker 将半事务消息标记为可投递时,积分消费者就可以开始消费主题 order-topic 的消息了。 积分消费者服务,我们定义了消费者组名,以及订阅主题和消费监听器。 在消费监听器逻辑里,幂等非常重要 。当收到订单信息后,首先判断该订单是否有积分记录,若没有记录,才插入积分记录。 而且我们在创建积分表时,订单编号也是唯一键,数据库中也必然不会存在相同订单的多条积分记录。 4 总结 RocketMQ 事务消息是支持在分布式场景下保障消息生产和本地事务的最终一致性。 编写一个实战例子并不复杂,但使用事务消息时需要注意如下三点: 1、事务生产者和消费者共同协作才能保证业务数据的最终一致性; 2、事务生产者需要实现事务监听器,并且保存事务的执行结果(比如事务日志表) ; 3、消费者要保证幂等。消费失败时,通过重试、告警+人工介入等手段保证消费结果正确。 本文涉及到的工程源码,笔者已上传到 Github ,感兴趣的同学可以了解一下,若有疑问直接加笔者好友,一起交流技术,一起成长。 笔者会在后续的文章里,详细解析事务消息的实现原理,敬请期待。 实战代码地址: https://github.com/makemyownlife/rocketmq4-learning 如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

优秀的个人博客,低调大师

JDK 17 + ZGC 初体验

1 前言 垃圾回收器的暂停问题一直是Java工程师关注的重点,特别是对实时响应要求较高的服务来说,CMS和G1等主流垃圾回收器的数十毫秒乃至上百毫秒的暂停时间相当致命。此外,调优门槛也相对较高,需要对垃圾回收器的内部机制有一定的了解,才能够进行有效的调优。 为了解决此类问题,JDK 11开始推出了一种低延迟垃圾回收器ZGC。ZGC使用了一些新技术和优化算法,可以将GC暂停时间控制在10毫秒以内,而在JDK 17的加持下,ZGC的暂停时间甚至可以控制在亚毫秒级别! 2 ZGC ZGC相关介绍、原理,网上已经有很多类似文章,这里只做简单介绍。 2.1 设计目标 ZGC 最初在 JDK 11 中作为实验性功能引入,并在 JDK 15 中宣布为生产就绪。作为一款低延迟垃圾收集器,旨在满足以下目标: 8MB到16TB的堆大小支持 10ms最大GC暂时 最糟糕的情况下吞吐量会降低15%(低延时换吞吐量很值,吞吐量扩容即可解决) 2.2 ZGC 内存分布 ZGC与传统的CMS、G1不同、它没有分代的概念,只有类似G1的Region概率,ZGC 的 Region可以具有如下图所示的大中下三类容量: 小型 Region(Small Region):容量固定为2MB,用于放置小于 256KB的小对象。 中型 Region(Medium Region):容量固定为 32MB,用于放置大于 256KB但是小于 4MB的对象。 大型 Region(Large Region):容量不固定,可以动态变化,但必须为 2MB的整数倍,用于放置 4MB或以上的大对象。每个大型 Region中会存放一个大对象,这也预示着虽然名字叫“大型 Region”,但它的实际容量完全有可能小于中型Region,最小容量可低至4MB。大型 Region在ZGC的实现中是不会被重分配的(重分配是ZGC的一种处理动作,用于复制对象的收集器阶段)因为复制大对象的代价非常高。 2.3 GC工作过程 与CMS中的ParNew和G1类似,ZGC也采用标记-复制算法,不过ZGC通过着色指针和读屏障技术,解决了转移过程中准确访问对象的问题,在标记、转移和重定位阶段几乎都是并发执行的,这是ZGC实现停顿时间小于10ms目标的最关键原因。 从上图中可以看出,ZGC只有三个STW阶段:初始标记,再标记,初始转移。 具体转移过程,网上有大量类似文章,这里不做详细介绍,大家有兴趣可以参考以下文章: 新一代垃圾回收器ZGC的探索与实践 ZGC 最新一代垃圾回收器 | 程序员进阶 3 为什么选择JDK17呢? JDK 17于9月14日发布,是一个长期支持(LTS)版本,这意味着它将在很多年内得到支持和更新。这也是第一个LTS版本,其中包含了一个可用于生产环境的ZGC版本。回顾一下,ZGC的实验版本已经包含在JDK 11(之前的LTS版本)中,而第一个可用于生产环境的ZGC版本出现在JDK 15(一个非LTS版本)中。 4 升级过程 从JDK8+G1升级到JDK17+ZGC,主要是在代码层面和JVM启动参数层面的做适配。 4.1 JDK下载 首先jdk17选择的是openjdk,下载地址:https://jdk.java.net/archive/,选择版本17 GA 4.2 代码适配 JDK11移除了 Java EE and CORBA 的模块 项目中如果用到javax.annotation.*、javax.xml.*等等开头的包,需要手动引入对应依赖 <dependency> <groupId>javax.annotation</groupId> <artifactId>javax.annotation-api</artifactId> </dependency> <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> </dependency> <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-core</artifactId> </dependency> <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-impl</artifactId> </dependency> maven相关依赖版本升级 <!-- 仅供参考 --> <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version> <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version> <maven-resources-plugin.version>3.2.0</maven-resources-plugin.version> <maven-jar-plugin.version>3.2.0</maven-jar-plugin.version> <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version> <maven-deploy-plugin.version>3.0.0-M1</maven-deploy-plugin.version> <maven-release-plugin.version>3.0.0-M1</maven-release-plugin.version> <maven-site-plugin.version>3.9.1</maven-site-plugin.version> <maven-enforcer-plugin.version>3.0.0-M2</maven-enforcer-plugin.version> <maven-project-info-reports-plugin.version>3.1.0</maven-project-info-reports-plugin.version> <maven-plugin-plugin.version>3.6.1</maven-plugin-plugin.version> <maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version> <maven-source-plugin.version>3.2.1</maven-source-plugin.version> <maven-jxr-plugin.version>3.0.0</maven-jxr-plugin.version> Lombok版本升级https://projectlombok.org/changelog <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <!-- <version>1.16.20</version>--> <version>1.18.22</version> </dependency> Java9 模块化后,不允许应用程序查看来自JDK的所有类,会影响部分反射的运行,需要通过以下命令解决 --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED 本地使用了transmittable-thread-local-2.14.2.jar后启动报错 在agent后面加上日志输出即可解决,至于原因,猜测是跟类加载顺序有关系 -javaagent:/Users/admin/Documents/transmittable-thread-local-2.14.2.jar =ttl.agent.logger:STDOUT 以上内容仅针对彩虹桥项目升级遇到的问题,不同的业务代码适配的情况可能不一样,需要根据实际情况寻找解决方案。 4.3 JVM参数替换 下面是一些通用GC参数和ZGC特有参数以及ZGC的一些诊断选型,来自官网:Main - Main - OpenJDK Wiki 具体每个参数的含义,这里不做介绍,可参考官网文档The java Command,里面有详细说明。 JKD8+G1的启动参数: -server -Xms36600m -Xmx36600m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintReferenceGC -XX:+ParallelRefProcEnabled -XX:G1HeapRegionSize=16m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/apps/errorDump.hprof -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -verbose:gc -Xloggc:/opt/apps/logs/${app_name}-gc.log JDK17+ZGC的启动参数如下: -server -Xms36600m -Xmx36600m #开启ZGC -XX:+UseZGC #GC周期之间的最大间隔(单位秒) -XX:ZCollectionInterval=120 #官方的解释是 ZGC 的分配尖峰容忍度,数值越大越早触发GC -XX:ZAllocationSpikeTolerance=4 #关闭主动GC周期,在主动回收模式下,ZGC 会在系统空闲时自动执行垃圾回收,以减少垃圾回收在应用程序忙碌时所造成的影响。如果未指定此参数(默认情况),ZGC 会在需要时(即堆内存不足以满足分配请求时)执行垃圾回收。 -XX:-ZProactive #GC日志 -Xlog:safepoint=trace,classhisto*=trace,age*=info,gc*=info:file=/opt/logs/gc-%t.log:time,level,tid,tags:filesize=50M #发生OOM时dump内存日志 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/apps/errorDump.hprof 5 压测结果 直接上图 正如 ZGC 设计目标所描述,它将 GC 暂停时间从过去的几十毫秒降低到了令人惊叹的亚毫秒级别。然而,这种超低延迟表现也需要一定的代价,因为在实现低延迟的同时,ZGC 会占用一定的 CPU 资源。通常情况下,ZGC 占用的 CPU 比例不会超过 15%。在彩虹桥项目中,使用以上推荐的 JVM 参数后,ZGC 占用的 CPU 资源为 6% 左右。 6 ZGC日志 6.1 输出ZGC日志 GC日志中包含有关 GC 操作的详细信息,可以帮我们分析当前GC存在的问题。先来看一下上面JVM参数中关于GC日志的参数 -Xlog:safepoint=trace,classhisto*=trace,age*=info,gc*=info:file=/opt/logs/gc-%t.log:time,level,tid,tags:filesize=50M safepoint=trace:记录关于 safepoint 的 trace 级别日志。 Safepoint 是 JVM 中一个特殊的状态,它用于确保所有线程在特定操作(如垃圾回收、代码优化等)之前进入安全状态。 classhisto*=trace:记录与类的历史相关的 trace 级别日志。 age*=info:记录与对象年龄(在新生代中存在的时间)相关的 info 级别日志。 gc*=info:记录与垃圾回收相关的 info 级别日志。 file=/opt/logs/gc-%t.log:将日志写入到 /opt/logs/ 目录下的文件中,文件名为 gc-%t.log,其中 %t 是一个占位符,表示当前时间戳。 time,level,tid,tags:在每个日志记录中包含时间戳、日志级别、线程 ID 和标签。 filesize=50M:设置日志文件的大小限制为 50MB。当日志文件大小达到此限制时,JVM 将创建一个新的日志文件并继续记录。 更详细的gc日志配置可以参考:https://docs.oracle.com/en/java/javase/17/docs/specs/man/java.html#enable-logging-with-the-jvm-unified-logging-framework 6.2 STW关键日志 其中我们重点关注的就是GC的STW情况,以下是一些关键字代表GC STW阶段 最基本的STW三阶段,初始标记:日志中Pause Mark Start,再标记:日志中Pause Mark End,初始转移:日志中Pause Relocate Start。 内存分配阻塞:这一般是因为垃圾生产速度大于回收速度,垃圾来不及回收,垃圾将堆占满时,线程会阻塞等待GC完成,关键字是Allocation Stall(被阻塞的线程名称) 如果出现此类日志,可以尝试如下方法解决: -XX:ZCollectionInterval 该配置含义:两个 GC 周期之间的最大间隔(单位秒)。默认情况下,此选项设置为 0(禁用),可以适当调小该配置,让GC周期缩短、提升垃圾回收速度,但这会提升应用CPU占用。 -XX:ZAllocationSpikeTolerance官方的解释是 ZGC 的分配尖峰容忍度。其实就是数值越大,越早触发回收。可以适当调大该配置,更早触发回收,提升垃圾回收速度,但这会提升应用CPU占用。 安全点:所有线程进入到安全点后才能进行GC,ZGC定期进入安全点判断是否需要GC。先进入安全点的线程需要等待后进入安全点的线程直到所有线程挂起。日志关键字safepoint ... stopped dump线程、内存:比如jstack、jmap命令,一般是手动dump导致,日志关键字HeapDumper 7 Linux大页内存 在openjdk的官网上也能看到,开启Linux大页内存后会提升应用的性能。 开启方式见官网文档https://wiki.openjdk.org/display/zgc/Main#Main-EnablingLargePagesOnLinux,注意除了修改系统配置外,还需要在进程JVM启动参数中新增-XX:+UseLargePages配置 经过几轮压测实际测试下来,发现在开启Linux大页后,CPU有8%左右的下降,但是由于大页面会提前预留指定大小的内存,会导致机器的内存使用率较高。而且目前生产环境没有其他应用开启此配置,稳定性有待考究,生产环境自行评估是否开启。 8 总结 在本篇文章中,我们探讨了如何升级到JDK 17,并使用最新一代垃圾回收器ZGC。经过实践和测试,我们发现升级后的系统在垃圾回收方面表现出色,暂停时间被有效控制在1毫秒内。尽管这一优化过程可能会消耗额外的CPU资源,但所获得的超低GC暂停时间显然是非常值得的。总之,相比其他垃圾回收器,ZGC 的性能和稳定性已经非常优秀,而且不需要太多的调优。在大多数情况下,使用 ZGC官方推荐的默认设置即可获得优秀的性能表现。对于那些RT敏感型应用,升级到JDK 17并采用ZGC是一个明智的选择。 文: 新一 本文属得物技术原创,来源于:得物技术官网 未经得物技术许可严禁转载,否则依法追究法律责任!

优秀的个人博客,低调大师

JDK17+ZGC初体验|得物技术

1 前言 垃圾回收器的暂停问题一直是Java工程师关注的重点,特别是对实时响应要求较高的服务来说,CMS和G1等主流垃圾回收器的数十毫秒乃至上百毫秒的暂停时间相当致命。此外,调优门槛也相对较高,需要对垃圾回收器的内部机制有一定的了解,才能够进行有效的调优。 为了解决此类问题,JDK 11开始推出了一种低延迟垃圾回收器ZGC。ZGC使用了一些新技术和优化算法,可以将GC暂停时间控制在10毫秒以内,而在JDK 17的加持下,ZGC的暂停时间甚至可以控制在亚毫秒级别! 2 ZGC ZGC相关介绍、原理,网上已经有很多类似文章,这里只做简单介绍。 2.1 设计目标 ZGC 最初在 JDK 11 中作为实验性功能引入,并在 JDK 15 中宣布为生产就绪。作为一款低延迟垃圾收集器,旨在满足以下目标: 8MB到16TB的堆大小支持 10ms最大GC暂时 最糟糕的情况下吞吐量会降低15%(低延时换吞吐量很值,吞吐量扩容即可解决) 2.2 ZGC 内存分布 ZGC与传统的CMS、G1不同、它没有分代的概念,只有类似G1的Region概率,ZGC 的 Region可以具有如下图所示的大中下三类容量: 小型 Region(Small Region):容量固定为2MB,用于放置小于 256KB的小对象。 中型 Region(Medium Region):容量固定为 32MB,用于放置大于 256KB但是小于 4MB的对象。 大型 Region(Large Region):容量不固定,可以动态变化,但必须为 2MB的整数倍,用于放置 4MB或以上的大对象。每个大型 Region中会存放一个大对象,这也预示着虽然名字叫“大型 Region”,但它的实际容量完全有可能小于中型Region,最小容量可低至4MB。大型 Region在ZGC的实现中是不会被重分配的(重分配是ZGC的一种处理动作,用于复制对象的收集器阶段)因为复制大对象的代价非常高。 2.3 GC工作过程 与CMS中的ParNew和G1类似,ZGC也采用标记-复制算法,不过ZGC通过着色指针和读屏障技术,解决了转移过程中准确访问对象的问题,在标记、转移和重定位阶段几乎都是并发执行的,这是ZGC实现停顿时间小于10ms目标的最关键原因。 从上图中可以看出,ZGC只有三个STW阶段:初始标记,再标记,初始转移。 具体转移过程,网上有大量类似文章,这里不做详细介绍,大家有兴趣可以参考以下文章: 新一代垃圾回收器ZGC的探索与实践 ZGC 最新一代垃圾回收器 | 程序员进阶 3 为什么选择JDK17呢? JDK 17于9月14日发布,是一个长期支持(LTS)版本,这意味着它将在很多年内得到支持和更新。这也是第一个LTS版本,其中包含了一个可用于生产环境的ZGC版本。回顾一下,ZGC的实验版本已经包含在JDK 11(之前的LTS版本)中,而第一个可用于生产环境的ZGC版本出现在JDK 15(一个非LTS版本)中。 4 升级过程 从JDK8+G1升级到JDK17+ZGC,主要是在代码层面和JVM启动参数层面的做适配。 4.1 JDK下载 首先jdk17选择的是openjdk,下载地址:https://jdk.java.net/archive/,选择版本17 GA 4.2 代码适配 JDK11移除了 Java EE and CORBA 的模块 项目中如果用到javax.annotation.*、javax.xml.*等等开头的包,需要手动引入对应依赖 <dependency> <groupId>javax.annotation</groupId> <artifactId>javax.annotation-api</artifactId> </dependency> <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> </dependency> <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-core</artifactId> </dependency> <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-impl</artifactId> </dependency> maven相关依赖版本升级 <!-- 仅供参考 --> <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version> <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version> <maven-resources-plugin.version>3.2.0</maven-resources-plugin.version> <maven-jar-plugin.version>3.2.0</maven-jar-plugin.version> <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version> <maven-deploy-plugin.version>3.0.0-M1</maven-deploy-plugin.version> <maven-release-plugin.version>3.0.0-M1</maven-release-plugin.version> <maven-site-plugin.version>3.9.1</maven-site-plugin.version> <maven-enforcer-plugin.version>3.0.0-M2</maven-enforcer-plugin.version> <maven-project-info-reports-plugin.version>3.1.0</maven-project-info-reports-plugin.version> <maven-plugin-plugin.version>3.6.1</maven-plugin-plugin.version> <maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version> <maven-source-plugin.version>3.2.1</maven-source-plugin.version> <maven-jxr-plugin.version>3.0.0</maven-jxr-plugin.version> Lombok版本升级https://projectlombok.org/changelog <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <!-- <version>1.16.20</version>--> <version>1.18.22</version> </dependency> Java9 模块化后,不允许应用程序查看来自JDK的所有类,会影响部分反射的运行,需要通过以下命令解决 --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED 本地使用了transmittable-thread-local-2.14.2.jar后启动报错 在agent后面加上日志输出即可解决,至于原因,猜测是跟类加载顺序有关系 -javaagent:/Users/admin/Documents/transmittable-thread-local-2.14.2.jar =ttl.agent.logger:STDOUT 以上内容仅针对彩虹桥项目升级遇到的问题,不同的业务代码适配的情况可能不一样,需要根据实际情况寻找解决方案。 4.3 JVM参数替换 下面是一些通用GC参数和ZGC特有参数以及ZGC的一些诊断选型,来自官网:Main - Main - OpenJDK Wiki 具体每个参数的含义,这里不做介绍,可参考官网文档The java Command,里面有详细说明。 JKD8+G1的启动参数: -server -Xms36600m -Xmx36600m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintReferenceGC -XX:+ParallelRefProcEnabled -XX:G1HeapRegionSize=16m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/apps/errorDump.hprof -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -verbose:gc -Xloggc:/opt/apps/logs/${app_name}-gc.log JDK17+ZGC的启动参数如下: -server -Xms36600m -Xmx36600m #开启ZGC -XX:+UseZGC #GC周期之间的最大间隔(单位秒) -XX:ZCollectionInterval=120 #官方的解释是 ZGC 的分配尖峰容忍度,数值越大越早触发GC -XX:ZAllocationSpikeTolerance=4 #关闭主动GC周期,在主动回收模式下,ZGC 会在系统空闲时自动执行垃圾回收,以减少垃圾回收在应用程序忙碌时所造成的影响。如果未指定此参数(默认情况),ZGC 会在需要时(即堆内存不足以满足分配请求时)执行垃圾回收。 -XX:-ZProactive #GC日志 -Xlog:safepoint=trace,classhisto*=trace,age*=info,gc*=info:file=/opt/logs/gc-%t.log:time,level,tid,tags:filesize=50M #发生OOM时dump内存日志 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/apps/errorDump.hprof 5 压测结果 直接上图 正如 ZGC 设计目标所描述,它将 GC 暂停时间从过去的几十毫秒降低到了令人惊叹的亚毫秒级别。然而,这种超低延迟表现也需要一定的代价,因为在实现低延迟的同时,ZGC 会占用一定的 CPU 资源。通常情况下,ZGC 占用的 CPU 比例不会超过 15%。在彩虹桥项目中,使用以上推荐的 JVM 参数后,ZGC 占用的 CPU 资源为 6% 左右。 6 ZGC日志 6.1 输出ZGC日志 GC日志中包含有关 GC 操作的详细信息,可以帮我们分析当前GC存在的问题。先来看一下上面JVM参数中关于GC日志的参数 -Xlog:safepoint=trace,classhisto*=trace,age*=info,gc*=info:file=/opt/logs/gc-%t.log:time,level,tid,tags:filesize=50M safepoint=trace:记录关于 safepoint 的 trace 级别日志。 Safepoint 是 JVM 中一个特殊的状态,它用于确保所有线程在特定操作(如垃圾回收、代码优化等)之前进入安全状态。 classhisto*=trace:记录与类的历史相关的 trace 级别日志。 age*=info:记录与对象年龄(在新生代中存在的时间)相关的 info 级别日志。 gc*=info:记录与垃圾回收相关的 info 级别日志。 file=/opt/logs/gc-%t.log:将日志写入到 /opt/logs/ 目录下的文件中,文件名为 gc-%t.log,其中 %t 是一个占位符,表示当前时间戳。 time,level,tid,tags:在每个日志记录中包含时间戳、日志级别、线程 ID 和标签。 filesize=50M:设置日志文件的大小限制为 50MB。当日志文件大小达到此限制时,JVM 将创建一个新的日志文件并继续记录。 更详细的gc日志配置可以参考:https://docs.oracle.com/en/java/javase/17/docs/specs/man/java.html#enable-logging-with-the-jvm-unified-logging-framework 6.2 STW关键日志 其中我们重点关注的就是GC的STW情况,以下是一些关键字代表GC STW阶段 最基本的STW三阶段,初始标记:日志中Pause Mark Start,再标记:日志中Pause Mark End,初始转移:日志中Pause Relocate Start。 内存分配阻塞:这一般是因为垃圾生产速度大于回收速度,垃圾来不及回收,垃圾将堆占满时,线程会阻塞等待GC完成,关键字是Allocation Stall(被阻塞的线程名称) 如果出现此类日志,可以尝试如下方法解决: -XX:ZCollectionInterval 该配置含义:两个 GC 周期之间的最大间隔(单位秒)。默认情况下,此选项设置为 0(禁用),可以适当调小该配置,让GC周期缩短、提升垃圾回收速度,但这会提升应用CPU占用。 -XX:ZAllocationSpikeTolerance官方的解释是 ZGC 的分配尖峰容忍度。其实就是数值越大,越早触发回收。可以适当调大该配置,更早触发回收,提升垃圾回收速度,但这会提升应用CPU占用。 安全点:所有线程进入到安全点后才能进行GC,ZGC定期进入安全点判断是否需要GC。先进入安全点的线程需要等待后进入安全点的线程直到所有线程挂起。日志关键字safepoint ... stopped dump线程、内存:比如jstack、jmap命令,一般是手动dump导致,日志关键字HeapDumper 7 Linux大页内存 在openjdk的官网上也能看到,开启Linux大页内存后会提升应用的性能。 开启方式见官网文档https://wiki.openjdk.org/display/zgc/Main#Main-EnablingLargePagesOnLinux,注意除了修改系统配置外,还需要在进程JVM启动参数中新增-XX:+UseLargePages配置 经过几轮压测实际测试下来,发现在开启Linux大页后,CPU有8%左右的下降,但是由于大页面会提前预留指定大小的内存,会导致机器的内存使用率较高。而且目前生产环境没有其他应用开启此配置,稳定性有待考究,生产环境自行评估是否开启。 8 总结 在本篇文章中,我们探讨了如何升级到JDK 17,并使用最新一代垃圾回收器ZGC。经过实践和测试,我们发现升级后的系统在垃圾回收方面表现出色,暂停时间被有效控制在1毫秒内。尽管这一优化过程可能会消耗额外的CPU资源,但所获得的超低GC暂停时间显然是非常值得的。总之,相比其他垃圾回收器,ZGC 的性能和稳定性已经非常优秀,而且不需要太多的调优。在大多数情况下,使用 ZGC官方推荐的默认设置即可获得优秀的性能表现。对于那些RT敏感型应用,升级到JDK 17并采用ZGC是一个明智的选择。 文: 新一 本文属得物技术原创,来源于:得物技术官网 未经得物技术许可严禁转载,否则依法追究法律责任!

优秀的个人博客,低调大师

【ChatGPT应用篇】助力Beauty代码的初体验 | 京东云技术团队

思考过程: 案例1:项目里面有Excel文件的解析场景,试着与ChatGPT进行了交互,现将问题整理如下: 1.给我写一个Java版本的excel导入解析代码 (毫不客气的分配任务) 2.需要支持100000000数据量 (业务需求变更) 3.优化代码需要支持10000000数据量 (降低数量级,减轻难度) 4.请采用面向对象的思想给做一下封装 (初级工程师 -> 中级工程师) 5.进一步面向接口编程,抽离业务 (中级晋升应该加一点泛型诉求,代码更Beauty) 6.替换掉 poi 采用EasyExcel (替换原始的默认技术选型,替换三方包) 7.进一步优化,能支持EasyExcel、POI自由切换 (问题没问好!本意是想让他进一步抽象,能基于SPI支持扩展点) 8.采用控制反转方式优化 (与问题5有重复) 9.提升解析性能,采用多线程并行解析优化 (中级工程->高级工程师) 10.提升程序效率,把解析性能在提升10倍 (架构师的成本意识,这个问题的回答有点意思) 11.采用Spring IOC 控制反转方式进一步封装 (与问题8有重复) 12.添加异常 logger 打印 (优化细节) 13.添加更多的中文代码注释 (增强易读性,优化细节) 14.将中文注释替换成英文注释 (^_^ 开始走向国际化) 15.日志错误信息支持 国际化 资源文件 (国际化的开始……) 16.把上面Excel解析的代码在给来个单元测试吧 17.… 案例2:项目里面一个ES深度分页优化的场景,问题整理如下: 1.Elasticsearch 深度分页问题 2.Elasticsearch 深度分页Java代码 3.以上代码采用面向IOC优化 4.进一步面向对象优化 5.基于泛型进行抽象 6.将 scroll API 替换为 searchafter 7.使用 spring data elasticsearch 优化并实现 search After 8.以上代码请采用jmeter 生成一个压测脚本 9.上面Spring Data Elasticsearch 实现 search after 的示例代码请采用 groovy 语言生成测试用例 案例2的代码就不贴出来了,大家感兴趣可以自行输入以上问题进行体验! (个人体会:程序领域问问题要采用专业术语,专业术语越强越能精准给予反馈!) 说明: 对于ChatGPT输出的代码没有做仔细的Review及调试,但是看代码风格基本上是改改就能用于生产的,对于代码规范、命名还是很认可的! 案例过程: Happy Coding!Beauty Code! 以下是一个与ChatGPT 结对编程的一个Excel数据解析案例,大家可以根据我的思考过程进行体验,以下放了一些截图,ChatGPT的反应速度惊人,代码版本一步步进行迭代优化,感触较深的是真正的高质量低代码平台要应运而生了! import com.alibaba.excel.EasyExcel; import com.alibaba.excel.annotation.ExcelProperty; import com.alibaba.excel.metadata.Sheet; import com.alibaba.excel.read.listener.ReadListener; import org.apache.poi.hssf.usermodel.HSSFWorkbook; import org.apache.poi.ss.usermodel.Cell; import org.apache.poi.ss.usermodel.Row; import org.apache.poi.ss.usermodel.Sheet as PoiSheet; import org.apache.poi.xssf.usermodel.XSSFWorkbook; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class ExcelParser { private ParserType parserType; public ExcelParser(ParserType parserType) { this.parserType = parserType; } public List<Map<String, String>> parse(File file, int sheetIndex) throws IOException { if (parserType == ParserType.EASY_EXCEL) { return parseWithEasyExcel(file, sheetIndex); } else if (parserType == ParserType.POI) { return parseWithPoi(file, sheetIndex); } else { throw new IllegalArgumentException("Invalid parser type"); } } private List<Map<String, String>> parseWithEasyExcel(File file, int sheetIndex) { List<Map<String, String>> data = new ArrayList<>(); DataListener listener = new DataListener(data); ExcelReaderBuilder readerBuilder = EasyExcel.read(file, listener); Sheet sheet = new Sheet(sheetIndex + 1, 0); readerBuilder.sheet(sheet).doRead(); return data; } private List<Map<String, String>> parseWithPoi(File file, int sheetIndex) throws IOException { List<Map<String, String>> data = new ArrayList<>(); FileInputStream inputStream = new FileInputStream(file); HSSFWorkbook workbook = file.getName().endsWith(".xls") ? new HSSFWorkbook(inputStream) : new XSSFWorkbook(inputStream); PoiSheet sheet = workbook.getSheetAt(sheetIndex); List<String> headers = new ArrayList<>(); for (Cell cell : sheet.getRow(0)) { headers.add(cell.getStringCellValue()); } for (int i = 1; i <= sheet.getLastRowNum(); i++) { Row row = sheet.getRow(i); Map<String, String> record = new HashMap<>(); for (int j = 0; j < headers.size(); j++) { Cell cell = row.getCell(j, Row.MissingCellPolicy.CREATE_NULL_AS_BLANK); String value = ""; switch (cell.getCellType()) { case STRING: value = cell.getStringCellValue(); break; case NUMERIC: value = Double.toString(cell.getNumericCellValue()); break; default: value = cell.getStringCellValue(); break; } record.put(headers.get(j), value); } data.add(record); } workbook.close(); inputStream.close(); return data; } public static void main(String[] args) throws IOException { File file = new File("excel_file.xlsx"); ExcelParser parser = new ExcelParser(ParserType.EASY_EXCEL); List<Map<String, String>> data = parser.parse(file, 0); System.out.println(data); } } enum ParserType { EASY_EXCEL, POI } class DataListener extends ReadListener<Data> { private List<Map<String, String>> data; private List<String> headers; public DataListener(List<Map<String, String>> data) { this.data = data; this.headers = new ArrayList<>(); } @Override public void onException(Exception exception) { // 处理异常 } @Override public void onReadError(Exception exception) { // 处理读取错误 } @Override public void onSheet(int sheetIndex, Sheet sheet) { // 处理工作表信息 } @Override public void onReadEnd() { // 读取结束 } @Override public void onHeadRow(Row row) { for (Cell cell : row) { headers.add(cell.getStringCellValue()); } } @Override public void onRecord(Data data) { Map<String, String> record = new HashMap<>(); for (int i = 0; i < headers.size(); i++) { record.put(headers.get(i), data.getField(i)); } this.data.add(record); } } class Data { @ExcelProperty(index = 0) private String name; @ExcelProperty(index = 1) private int age; import com.alibaba.excel.annotation.ExcelProperty; import java.util.List; import java.util.Map; public class ExcelParser { private SheetParser sheetParser; public ExcelParser(SheetParser sheetParser) { this.sheetParser = sheetParser; } public List<Map<String, String>> parse(String fileName, int sheetIndex) throws Exception { return sheetParser.parse(fileName, sheetIndex); } public static void main(String[] args) throws Exception { String fileName = "excel_file.xlsx"; SheetParser sheetParser = new EasyExcelSheetParser(); ExcelParser parser = new ExcelParser(sheetParser); List<Map<String, String>> data = parser.parse(fileName, 0); System.out.println(data); } } interface SheetParser { List<Map<String, String>> parse(String fileName, int sheetIndex) throws Exception; } class EasyExcelSheetParser implements SheetParser { @Override public List<Map<String, String>> parse(String fileName, int sheetIndex) throws Exception { List<Map<String, String>> data = new ArrayList<>(); DataListener listener = new DataListener(data); ExcelReaderBuilder readerBuilder = EasyExcel.read(fileName, listener); Sheet sheet = new Sheet(sheetIndex + 1, 0); readerBuilder.sheet(sheet).doRead(); return data; } } class PoiSheetParser implements SheetParser { @Override public List<Map<String, String>> parse(String fileName, int sheetIndex) throws Exception { List<Map<String, String>> data = new ArrayList<>(); FileInputStream inputStream = new FileInputStream(fileName); HSSFWorkbook workbook = fileName.endsWith(".xls") ? new HSSFWorkbook(inputStream) : new XSSFWorkbook(inputStream); PoiSheet sheet = workbook.getSheetAt(sheetIndex); List<String> headers = new ArrayList<>(); for (Cell cell : sheet.getRow(0)) { headers.add(cell.getStringCellValue()); } for (int i = 1; i <= sheet.getLastRowNum(); i++) { Row row = sheet.getRow(i); Map<String, String> record = new HashMap<>(); for (int j = 0; j < headers.size(); j++) { Cell cell = row.getCell(j, Row.MissingCellPolicy.CREATE_NULL_AS_BLANK); String value = ""; switch (cell.getCellType()) { case STRING: value = cell.getStringCellValue(); break; case NUMERIC: value = Double.toString(cell.getNumericCellValue()); break; default: value = cell.getStringCellValue(); break; } record.put(headers.get(j), value); } data.add(record); } workbook.close(); inputStream.close(); return data; } } class DataListener extends ReadListener<Data> { private List<Map<String, String>> data; private List<String> headers; public DataListener(List<Map<String, String>> data) { this.data = data; this.headers = new ArrayList<>(); } @Override public void onException(Exception exception) { // 处理异常 } @Override public void onReadError(Exception exception) { // 处理读取错误 } @Override public void onSheet(int sheetIndex, Sheet sheet) { // 处理工作表信息 } @Override public void onReadEnd() { // 读取结束 } @Override public void onHeadRow(Row row) { for (Cell cell : row) { headers.add(cell.getStringCellValue()); } } @Override public void onRecord(Data data) { Map<String, String> record = new HashMap<>(); for (int i = 0; i < headers.size(); i++) { record.put(headers.get(i), data.getField(i)); } this.data.add(record); } } class Data { @ExcelProperty(index = 0) private String name; @ExcelProperty(index = 1) private int age; // Getters and setters } import com.alibaba.excel.EasyExcel; import com.alibaba.excel.annotation.ExcelProperty; import com.alibaba.excel.metadata.Sheet; import com.alibaba.excel.read.listener.ReadListener; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.*; public class ExcelParser { private SheetParser sheetParser; public ExcelParser(SheetParser sheetParser) { this.sheetParser = sheetParser; } public List<Map<String, String>> parse(String fileName, int sheetIndex) throws ExecutionException, InterruptedException { return sheetParser.parse(fileName, sheetIndex); } public static void main(String[] args) throws ExecutionException, InterruptedException { String fileName = "excel_file.xlsx"; ExecutorService executor = Executors.newFixedThreadPool(2); List<Future<List<Map<String, String>>>> futures = new ArrayList<>(); for (int i = 0; i < 2; i++) { SheetParser sheetParser = new EasyExcelSheetParser(); Future<List<Map<String, String>>> future = executor.submit(() -> sheetParser.parse(fileName, i)); futures.add(future); } List<Map<String, String>> data = new ArrayList<>(); for (Future<List<Map<String, String>>> future : futures) { data.addAll(future.get()); } System.out.println(data); executor.shutdown(); } } interface SheetParser { List<Map<String, String>> parse(String fileName, int sheetIndex); } class EasyExcelSheetParser implements SheetParser { @Override public List<Map<String, String>> parse(String fileName, int sheetIndex) { List<Map<String, String>> data = new ArrayList<>(); DataListener listener = new DataListener(data); ExcelReaderBuilder readerBuilder = EasyExcel.read(fileName, listener); Sheet sheet = new Sheet(sheetIndex + 1, 0); readerBuilder.sheet(sheet).doRead(); return data; } } class PoiSheetParser implements SheetParser { @Override public List<Map<String, String>> parse(String fileName, int sheetIndex) throws Exception { // 实现 POI 的解析,类似于前面的示例 } } class DataListener extends ReadListener<Data> { private List<Map<String, String>> data; private List<String> headers; public DataListener(List<Map<String, String>> data) { this.data = data; this.headers = new ArrayList<>(); } @Override public void onHeadRow(Row row) { for (Cell cell : row) { headers.add(cell.getStringCellValue()); } } @Override public void onRecord(Data data) { Map<String, String> record = new HashMap<>(); for (int i = 0; i < headers.size(); i++) { record.put(headers.get(i), data.getField(i)); } this.data.add(record); } } class Data { @ExcelProperty(index = 0) private String name; @ExcelProperty(index = 1) private int age; // Getters and setters } import java.util.List; import java.util.Map; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class ExcelParser { private SheetParser sheetParser; @Autowired public ExcelParser(SheetParser sheetParser) { this.sheetParser = sheetParser; } public List<Map<String, String>> parse(String fileName, int sheetIndex) throws Exception { return sheetParser.parse(fileName, sheetIndex); } } interface SheetParser { List<Map<String, String>> parse(String fileName, int sheetIndex) throws Exception; } @Component class EasyExcelSheetParser implements SheetParser { @Override public List<Map<String, String>> parse(String fileName, int sheetIndex) throws Exception { List<Map<String, String>> data = new ArrayList<>(); DataListener listener = new DataListener(data); ExcelReaderBuilder readerBuilder = EasyExcel.read(fileName, listener); Sheet sheet = new Sheet(sheetIndex + 1, 0); readerBuilder.sheet(sheet).doRead(); return data; } } @Component class PoiSheetParser implements SheetParser { @Override public List<Map<String, String>> parse(String fileName, int sheetIndex) throws Exception { // 实现 POI 的解析,类似于前面的示例 } } class DataListener extends ReadListener<Data> { private List<Map<String, String>> data; private List<String> headers; public DataListener(List<Map<String, String>> data) { this.data = data; this.headers = new ArrayList<>(); } @Override public void onHeadRow(Row row) { for (Cell cell : row) { headers.add(cell.getStringCellValue()); } } @Override public void onRecord(Data data) { Map<String, String> record = new HashMap<>(); for (int i = 0; i < headers.size(); i++) { record.put(headers.get(i), data.getField(i)); } this.data.add(record); } } class Data { @ExcelProperty(index = 0) private String name; @ExcelProperty(index = 1) private int age; // Getters and setters } 原理学习: 知其然,并未知其所以然!直到看到下面的解惑视频…… 视频号搜索: 于建国博士讲解ChatGPT的工作原理及颠覆性影响,通俗易懂 (愿学有所得) 职责有边界、思考无边界、担当无边界!!! 利用好ChatGPT一切皆有可能,用好新时代的生产力工具希望ChatGPT能助力大家更上一层楼!

资源下载

更多资源
优质分享Android(本站安卓app)

优质分享Android(本站安卓app)

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario,低调大师唯一一个Java游戏作品

Mario,低调大师唯一一个Java游戏作品

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。

Eclipse(集成开发环境)

Eclipse(集成开发环境)

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。