文盘Rust -- tonic-Rust grpc初体验 | 京东云技术团队
gRPC 是开发中常用的开源高性能远程过程调用(RPC)框架,tonic 是基于 HTTP/2 的 gRPC 实现,专注于高性能、互操作性和灵活性。该库的创建是为了对 async/await 提供一流的支持,并充当用 Rust 编写的生产系统的核心构建块。今天我们聊聊通过使用tonic 调用grpc的的具体过程。
工程规划
rpc程序一般包含server端和client端,为了方便我们把两个程序打包到一个工程里面 新建tonic_sample工程
cargo new tonic_sample
Cargo.toml 如下
[package] name = "tonic_sample" version = "0.1.0" edition = "2021" [[bin]] # Bin to run the gRPC server name = "stream-server" path = "src/stream_server.rs" [[bin]] # Bin to run the gRPC client name = "stream-client" path = "src/stream_client.rs" [dependencies] tokio.workspace = true tonic = "0.9" tonic-reflection = "0.9.2" prost = "0.11" tokio-stream = "0.1" async-stream = "0.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" rand = "0.7" h2 = { version = "0.3" } anyhow = "1.0.75" futures-util = "0.3.28" [build-dependencies] tonic-build = "0.9"
tonic 的示例代码还是比较齐全的,本次我们参考 tonic 的 streaming example。
首先编写 proto 文件,用来描述报文。 proto/echo.proto
syntax = "proto3"; package stream; // EchoRequest is the request for echo. message EchoRequest { string message = 1; } // EchoResponse is the response for echo. message EchoResponse { string message = 1; } // Echo is the echo service. service Echo { // UnaryEcho is unary echo. rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} // ServerStreamingEcho is server side streaming. rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} // ClientStreamingEcho is client side streaming. rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} // BidirectionalStreamingEcho is bidi streaming. rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} }
文件并不复杂,只有两个 message 一个请求一个返回,之所以选择这个示例是因为该示例包含了rpc中的流式处理,包扩了server 流、client 流以及双向流的操作。 编辑build.rs 文件
use std::{env, path::PathBuf}; fn main() -> Result<(), Box<dyn std::error::Error>> { tonic_build::compile_protos("proto/echo.proto")?; Ok(()) }
该文件用来通过 tonic-build 生成 grpc 的 rust 基础代码
完成上述工作后就可以构建 server 和 client 代码了
stream_server.rs
pub mod pb { tonic::include_proto!("stream"); } use anyhow::Result; use futures_util::FutureExt; use pb::{EchoRequest, EchoResponse}; use std::{ error::Error, io::ErrorKind, net::{SocketAddr, ToSocketAddrs}, pin::Pin, thread, time::Duration, }; use tokio::{ net::TcpListener, sync::{ mpsc, oneshot::{self, Receiver, Sender}, Mutex, }, task::{self, JoinHandle}, }; use tokio_stream::{ wrappers::{ReceiverStream, TcpListenerStream}, Stream, StreamExt, }; use tonic::{transport::Server, Request, Response, Status, Streaming}; type EchoResult<T> = Result<Response<T>, Status>; type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>; fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> { let mut err: &(dyn Error + 'static) = err_status; loop { if let Some(io_err) = err.downcast_ref::<std::io::Error>() { return Some(io_err); } // h2::Error do not expose std::io::Error with `source()` // https://github.com/hyperium/h2/pull/462 if let Some(h2_err) = err.downcast_ref::<h2::Error>() { if let Some(io_err) = h2_err.get_io() { return Some(io_err); } } err = match err.source() { Some(err) => err, None => return None, }; } } #[derive(Debug)] pub struct EchoServer {} #[tonic::async_trait] impl pb::echo_server::Echo for EchoServer { async fn unary_echo(&self, req: Request<EchoRequest>) -> EchoResult<EchoResponse> { let req_str = req.into_inner().message; let response = EchoResponse { message: req_str }; Ok(Response::new(response)) } type ServerStreamingEchoStream = ResponseStream; async fn server_streaming_echo( &self, req: Request<EchoRequest>, ) -> EchoResult<Self::ServerStreamingEchoStream> { println!("EchoServer::server_streaming_echo"); println!("\tclient connected from: {:?}", req.remote_addr()); // creating infinite stream with requested message let repeat = std::iter::repeat(EchoResponse { message: req.into_inner().message, }); let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200))); let (tx, rx) = mpsc::channel(128); tokio::spawn(async move { while let Some(item) = stream.next().await { match tx.send(Result::<_, Status>::Ok(item)).await { Ok(_) => { // item (server response) was queued to be send to client } Err(_item) => { // output_stream was build from rx and both are dropped break; } } } println!("\tclient disconnected"); }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::ServerStreamingEchoStream )) } async fn client_streaming_echo( &self, _: Request<Streaming<EchoRequest>>, ) -> EchoResult<EchoResponse> { Err(Status::unimplemented("not implemented")) } type BidirectionalStreamingEchoStream = ResponseStream; async fn bidirectional_streaming_echo( &self, req: Request<Streaming<EchoRequest>>, ) -> EchoResult<Self::BidirectionalStreamingEchoStream> { println!("EchoServer::bidirectional_streaming_echo"); let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); tokio::spawn(async move { while let Some(result) = in_stream.next().await { match result { Ok(v) => tx .send(Ok(EchoResponse { message: v.message })) .await .expect("working rx"), Err(err) => { if let Some(io_err) = match_for_io_error(&err) { if io_err.kind() == ErrorKind::BrokenPipe { eprintln!("\tclient disconnected: broken pipe"); break; } } match tx.send(Err(err)).await { Ok(_) => (), Err(_err) => break, // response was droped } } } } println!("\tstream ended"); }); // echo just write the same data that was received let out_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream )) } } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // 基础server let server = EchoServer {}; Server::builder() .add_service(pb::echo_server::EchoServer::new(server)) .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap()) .await .unwrap(); Ok(()) }
server 端的代码还是比较清晰的,首先通过 tonic::include_proto! 宏引入grpc定义,参数是 proto 文件中定义的 package 。我们重点说说 server_streaming_echo function 。这个function 的处理流程明白了,其他的流式处理大同小异。首先 通过std::iter::repeat function 定义一个迭代器;然后构建 tokio_stream 在本示例中 每 200毫秒产生一个 repeat;最后构建一个 channel ,tx 用来发送从stream中获取的内容太,rx 封装到response 中返回。 最后 main 函数 拉起服务。
client 代码如下
pub mod pb { tonic::include_proto!("stream"); } use std::time::Duration; use tokio_stream::{Stream, StreamExt}; use tonic::transport::Channel; use pb::{echo_client::EchoClient, EchoRequest}; fn echo_requests_iter() -> impl Stream<Item = EchoRequest> { tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest { message: format!("msg {:02}", i), }) } async fn unary_echo(client: &mut EchoClient<Channel>, num: usize) { for i in 0..num { let req = tonic::Request::new(EchoRequest { message: "msg".to_string() + &i.to_string(), }); let resp = client.unary_echo(req).await.unwrap(); println!("resp:{}", resp.into_inner().message); } } async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) { let stream = client .server_streaming_echo(EchoRequest { message: "foo".into(), }) .await .unwrap() .into_inner(); // stream is infinite - take just 5 elements and then disconnect let mut stream = stream.take(num); while let Some(item) = stream.next().await { println!("\treceived: {}", item.unwrap().message); } // stream is droped here and the disconnect info is send to server } async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) { let in_stream = echo_requests_iter().take(num); let response = client .bidirectional_streaming_echo(in_stream) .await .unwrap(); let mut resp_stream = response.into_inner(); while let Some(received) = resp_stream.next().await { let received = received.unwrap(); println!("\treceived message: `{}`", received.message); } } async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) { let in_stream = echo_requests_iter().throttle(dur); let response = client .bidirectional_streaming_echo(in_stream) .await .unwrap(); let mut resp_stream = response.into_inner(); while let Some(received) = resp_stream.next().await { let received = received.unwrap(); println!("\treceived message: `{}`", received.message); } } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut client = EchoClient::connect("http://127.0.0.1:50051").await.unwrap(); println!("Unary echo:"); unary_echo(&mut client, 10).await; tokio::time::sleep(Duration::from_secs(1)).await; println!("Streaming echo:"); streaming_echo(&mut client, 5).await; tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions // Echo stream that sends 17 requests then graceful end that connection println!("\r\nBidirectional stream echo:"); bidirectional_streaming_echo(&mut client, 17).await; // Echo stream that sends up to `usize::MAX` requests. One request each 2s. // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from // graceful client disconnection (above example) on the server side. println!("\r\nBidirectional stream echo (kill client with CTLR+C):"); bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await; Ok(()) }
测试一下,分别运行 server 和 client
cargo run --bin stream-server cargo run --bin stream-client
在开发中,我们通常不会再 client 和 server都开发好的情况下才开始测试。通常在开发server 端的时候采用 grpcurl 工具进行测试工作
grpcurl -import-path ./proto -proto echo.proto list grpcurl -import-path ./proto -proto echo.proto describe stream.Echo grpcurl -plaintext -import-path ./proto -proto echo.proto -d '{"message":"1234"}' 127.0.0.1:50051 stream.Echo/UnaryEcho
此时,如果我们不指定 -import-path 参数,执行如下命令
grpcurl -plaintext 127.0.0.1:50051 list
会出现如下报错信息
Failed to list services: server does not support the reflection API
让服务端程序支持 reflection API
use std::{env, path::PathBuf}; fn main() -> Result<(), Box<dyn std::error::Error>> { let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); tonic_build::configure() .file_descriptor_set_path(out_dir.join("stream_descriptor.bin")) .compile(&["proto/echo.proto"], &["proto"]) .unwrap(); Ok(()) }
file_descriptor_set_path 生成一个文件,其中包含为协议缓冲模块编码的 prost_types::FileDescriptorSet
文件。这是实现 gRPC 服务器反射所必需的。
接下来改造一下 stream-server.rs,涉及两处更改。
新增 STREAM_DESCRIPTOR_SET 常量
pub mod pb { tonic::include_proto!("stream"); pub const STREAM_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("stream_descriptor"); }
修改main函数
#[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // 基础server // let server = EchoServer {}; // Server::builder() // .add_service(pb::echo_server::EchoServer::new(server)) // .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap()) // .await // .unwrap(); // tonic_reflection let service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(pb::STREAM_DESCRIPTOR_SET) .with_service_name("stream.Echo") .build() .unwrap(); let addr = "0.0.0.0:50051".parse().unwrap(); let server = EchoServer {}; Server::builder() .add_service(service) .add_service(pb::echo_server::EchoServer::new(server)) .serve(addr) .await?; Ok(()) }
register_encoded_file_descriptor_set 将包含编码的 prost_types::FileDescriptorSet
的 byte slice 注册到 gRPC Reflection 服务生成器注册。
再次测试
grpcurl -plaintext 127.0.0.1:50051 list grpcurl -plaintext 127.0.0.1:50051 describe stream.Echo
返回正确结果。
作者:京东科技 贾世闻
来源:京东云开发者社区 转载请注明来源

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
618京东到家APP-门详页反爬实战 | 京东云技术团队
一、背景与系统安全需求分析 1. 系统的重要性 上图所示是接口所属位置、对电商平台或在线商店而言,分类查商品都是很重要的,通过为用户提供清晰的商品分类,帮助他们快速找到所需产品,节省浏览时间,提升购物效率,是购物结算产生GMV的核心环节。那么电商平台为什么都很看重商品信息的爬取? a. 数据收集和分析:这些数据对于市场研究、竞争分析、价格比较等方面非常有价值。可获得有关产品趋势、消费者偏好、价格波动等信息,有助于企业进行决策和制定营销策略。 b. 价格监控和动态调整:可以实时跟踪和监控竞争对手的价格变化。企业可以根据市场情况及时调整自己的产品定价,保持竞争力,并更好地满足消费者需求等。 2. 风险评估 a. 系统安全、以及触发各种报警 b. 数据安全 c. 带宽和服务器资源消耗 d. 不良竞争等; 3. 618期间的爬虫问题 由于这个接口还比较特殊,我们在3个版本前刚迁移color网关,其他低版本使用的是另一个物理网关我们暂且称: B网关,另外在B网关还由于一些历史原因区分了Get 和 Post 两个接口对客户端提供。所以一共是3个接口。 客户端有多平台:h5, 微信小程序、支付宝小程...
- 下一篇
iOS16新特性:实时活动-在锁屏界面实时更新APP消息 | 京东云技术团队
简介 之前在 《iOS16新特性:灵动岛适配开发与到家业务场景结合的探索实践》 里介绍了iOS16新的特性:实时更新(Live Activity)中灵动岛的适配流程,但其实除了灵动岛的展示样式,Live Activity还有一种非常实用的应用场景,那就是锁屏界面实时状态更新: 上图是部分已经做出适配的APP,锁屏实时活动的展示。可以看到,相比于灵动岛的样式,锁屏更新的展示区域更大,能够显示更多信息,并且是在锁屏界面上进行展示,结合苹果在iPhone14之后推出的“全天候显示”功能,能够让用户在不解锁手机,甚至不拿起手机的情况下就能够获取到APP内最新的消息更新,在某些应用场景下非常实用。 这篇文章主要就介绍Live Activity中锁屏实时活动样式的适配流程,再结合实际开发过程中的遇到的问题进行实际详解: 限制条件 在进行开发之前,需要先了解一下锁屏实时活动的一些限制条件: 1.实时活动显示在通知区域且有更自由的视图定制和刷新方法,但是跟Widget小组件一样,它也限制了视图上的动画开发,所有的动画效果仅能由系统处理。 2.锁屏通知区域内的实时活动在8小时之内可以刷新数据展示,超过8...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- Mario游戏-低调大师作品
- CentOS7安装Docker,走上虚拟化容器引擎之路