简单的Java实现Netty进行通信
使用Java搭建一个简单的Netty通信例子
看过dubbo源码的同学应该都清楚,使用dubbo协议的底层通信是使用的netty进行交互,而最近看了dubbo的Netty部分后,自己写了个简单的Netty通信例子。
本文源地址:实现Netty进行通信
准备
工程截图
模块详解
rpc-common
rpc-common作为各个模块都需使用的模块,工程中出现的是一些通信时请求的参数以及返回的参数,还有一些序列化的工具。
rpc-client
rpc-client中目前只是单单的一个NettyClient启动类。
rpc-server
rpc-client中目前也只是单单的一个NettyServer服务启动类。
需要的依赖
目前所有的依赖项都出现在 rpc-common 下的 pom.xml中。
<!-- Netty --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.10.Final</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> <!-- Protostuff --> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.0.9</version> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>1.0.9</version> </dependency> <!-- Objenesis --> <dependency> <groupId>org.objenesis</groupId> <artifactId>objenesis</artifactId> <version>2.1</version> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.38</version> </dependency>
实现
首先我们在common中先定义本次的Request和Response的基类对象。
public class Request {
private String requestId; private Object parameter; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public Object getParameter() { return parameter; } public void setParameter(Object parameter) { this.parameter = parameter; }
}
public class Response {
private String requestId; private Object result; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; }
}
使用fastJson进行本次序列化
Netty对象的序列化转换很好懂, ByteToMessageDecoder 和 MessageToByteEncoder 分别只要继承它们,重写方法后,获取到Object和Byte,各自转换就OK。
不过如果是有要用到生产上的同学,建议不要使用 fastJson,因为它的漏洞补丁真的是太多了,可以使用google的 protostuff。
public class RpcDecoder extends ByteToMessageDecoder {
// 目标对象类型进行解码 private Class<?> target; public RpcDecoder(Class target) { this.target = target; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) { // 不够长度丢弃 return; } in.markReaderIndex(); // 标记一下当前的readIndex的位置 int dataLength = in.readInt(); // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4 if (in.readableBytes() < dataLength) { // 读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方 in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = JSON.parseObject(data, target); // 将byte数据转化为我们需要的对象 out.add(obj); }
}
public class RpcEncoder extends MessageToByteEncoder {
//目标对象类型进行编码 private Class<?> target; public RpcEncoder(Class target) { this.target = target; } @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if (target.isInstance(msg)) { byte[] data = JSON.toJSONBytes(msg); // 使用fastJson将对象转换为byte out.writeInt(data.length); // 先将消息长度写入,也就是消息头 out.writeBytes(data); // 消息体中包含我们要发送的数据 } }
}
NetyServer
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request) msg; System.out.println("Client Data:" + JSON.toJSONString(request)); Response response = new Response(); response.setRequestId(request.getRequestId()); response.setResult("Hello Client !"); // client接收到信息后主动关闭掉连接 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }
}
public class NettyServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); private String ip; private int port; public NettyServer(String ip, int port) { this.ip = ip; this.port = port; } public void server() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { final ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32 * 1024) .option(ChannelOption.SO_RCVBUF, 32 * 1024) .option(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new RpcDecoder(Request.class)) .addLast(new RpcEncoder(Response.class)) .addLast(new NettyServerHandler()); } }); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 开启长连接 ChannelFuture future = serverBootstrap.bind(ip, port).sync();
// if (future.isSuccess()) {
//
// new Register().register("/yanzhenyidai/com.yanzhenyidai.server", ip + ":" + port);
// }
future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new NettyServer("127.0.0.1", 20000).server(); }
}
关键名词:
EventLoopGroup
workerGroup
bossGroup
Server端的EventLoopGroup分为两个,一般workerGroup作为处理请求,bossGroup作为接收请求。
ChannelOption
SO_BACKLOG
SO_SNDBUF
SO_RCVBUF
SO_KEEPALIVE
以上四个常量作为TCP连接中的属性。
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
NettyServerHandler中出现的 ChannelFutureListener.CLOSE ,作为Server端主动关闭与Client端的通信,如果没有主动Close,那么NettyClient将会一直处于阻塞状态,得不到NettyServer的返回信息。
NettyClient
public class NettyClient extends SimpleChannelInboundHandler {
private final String ip; private final int port; private Response response; public NettyClient(String ip, int port) { this.ip = ip; this.port = port; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception { this.response = response; } public Response client(Request request) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { // 创建并初始化 Netty 客户端 Bootstrap 对象 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new RpcDecoder(Response.class)); pipeline.addLast(new RpcEncoder(Request.class)); pipeline.addLast(NettyClient.this); } }); bootstrap.option(ChannelOption.TCP_NODELAY, true);
// String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":");
// 连接 RPC 服务器 ChannelFuture future = bootstrap.connect(ip, port).sync(); // 写入 RPC 请求数据并关闭连接 Channel channel = future.channel(); channel.writeAndFlush(request).sync(); channel.closeFuture().sync(); return response; } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { Request request = new Request(); request.setRequestId(UUID.randomUUID().toString()); request.setParameter("Hello Server !"); System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 30000).client(request))); }
}
测试
如果以上所有内容都准备就绪,那么就可以进行调试了。
启动顺序,先启动NettyServer,再启动NettyClient。
总结
记得刚出来工作时,有工作很多年的同事问我了不了解Netty,当时工作太短,直说听过Putty,现在回想起来真的挺丢人的,哈哈。😋
Netty作为通信框架,如果你了解TCP,而且项目中有类似传输信息的需求,又不想集成HTTP或者Socket,那么Netty真的挺实用的。
参考资料:
Dubbo-Netty
Netty.io
本项目Github地址:Netty-RPC
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Java 8 Stream API学习总结
Java 8 API添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据。Stream API可以极大提高Java程序员的生产力,让程序员写出高效率、干净、简洁的代码。这种风格将要处理的元素集合看作一种流, 流在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选, 排序,聚合等。元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。 这一次为什么要系统性的总结一下 Java 8 Stream API 呢?说得简单点,我们先不论性能,我们就是为了 装x ,而且要让这个 x 装得再优秀一些,仅此而已! Stream基础知识 流程 创建流 → 流的中间操作 → 流的最终操作 创建流 我们需要把哪些元素放入流中,常见的api有: // 使用List创建流 list.stream() // 使用一个或多个元素创建流 Stream.of(T value) Stream.of(T... values) // 使用数组创建流 Arrays.stream(T[] array) /...
- 下一篇
Spring Boot 2.3.0 发布
Spring Boot 2.3.0 已经发布,此版本主要更新内容包括: 依赖升级 新版本核心组件依赖升级 Spring Data Neumann Spring HATEOAS 1.1 Spring Integration 5.3 Spring Kafka 2.5 Spring Security 5.3 Spring Session Dragonfruit 三方组件依赖 Cassandra Driver 4.6 Couchbase Client 3.0 Elasticsearch 7.6 Kafka 2.5 Micrometer 1.5 MongoDB 4.0 支持 Java 14 spring boot 2.3.0 支持 Java 14 ,同样对 Java 11 、Java 8 提供兼容 Docker 容器支持 spring boot 2.3.0 添加了部分功能用来帮助将Spring Boot 应用直接打包到 Docker 镜像。 支持 Cloud Native Buildpacks 构建镜像 maven 插件 增加 spring-boot:build-image 、gradle增加b...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8编译安装MySQL8.0.19
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Hadoop3单机部署,实现最简伪集群
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果