自己实现一个RPC框架
RPC框架称为远程调用框架,其实现的核心原理就是消费者端使用动态代理来代理一个接口的方法(基于JDK的动态代理,当然如果使用CGLib可以直接使用无接口类的方法),通过加入网络传输编程,传输调用接口方法名称,方法参数来给提供者获取,再通过反射,来执行该接口的方法,再将反射执行的结果通过网络编程传回消费者端。
现在我们来依次实现这些概念。这里我们做最简单的实现,网络编程使用的是BIO,大家可以使用Reactor模式的Netty来改写性能更好的方式。而网络传输中使用的序列化和反序列化也是Java自带的,当然这样的传输字节比较大,可以使用google的protoBuffer或者kryo来处理。这里只为了方便说明原理。
pom
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.guanjian</groupId> <artifactId>rpc-framework</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
首先当然是我们要进行远程调用的接口以及接口的方法。
public interface HelloService { String sayHello(String content); }
接口实现类
public class HelloServiceImpl implements HelloService { public String sayHello(String content) { return "hello," + content; } }
消费者端的动态代理,如果你是把提供者和消费者写在两个工程中,则提供者端需要上面的接口和实现类,而消费者端只需要上面的接口。
public class ConsumerProxy { /** * 消费者端的动态代理 * @param interfaceClass 代理的接口类 * @param host 远程主机IP * @param port 远程主机端口 * @param <T> * @return */ @SuppressWarnings("unchecked") public static <T> T consume(final Class<T> interfaceClass,final String host,final int port) { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, (proxy,method,args) -> { //创建一个客户端套接字 Socket socket = new Socket(host, port); try { //创建一个对外传输的对象流,绑定套接字 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { //将动态代理的方法名写入对外传输的对象流中 output.writeUTF(method.getName()); //将动态代理的方法的参数写入对外传输的对象流中 output.writeObject(args); //创建一个对内传输的对象流,绑定套接字 //这里是为了获取提供者端传回的结果 ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { //从对内传输的对象流中获取结果 Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } ); } }
有关JDK动态代理的内容可以参考AOP原理与自实现 ,BIO的部分可以参考传统IO与NIO比较
提供者端的网络传输和远程方式调用服务
public class ProviderReflect { private static final ExecutorService executorService = Executors.newCachedThreadPool(); /** * RPC监听和远程方法调用 * @param service RPC远程方法调用的接口实例 * @param port 监听的端口 * @throws Exception */ public static void provider(final Object service,int port) throws Exception { //创建服务端的套接字,绑定端口port ServerSocket serverSocket = new ServerSocket(port); while (true) { //开始接收客户端的消息,并以此创建套接字 final Socket socket = serverSocket.accept(); //多线程执行,这里的问题是连接数过大,线程池的线程数会耗尽 executorService.execute(() -> { try { //创建呢一个对内传输的对象流,并绑定套接字 ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { try { //从对象流中读取接口方法的方法名 String methodName = input.readUTF(); //从对象流中读取接口方法的所有参数 Object[] args = (Object[]) input.readObject(); Class[] argsTypes = new Class[args.length]; for (int i = 0;i < args.length;i++) { argsTypes[i] = args[i].getClass(); } //创建一个对外传输的对象流,并绑定套接字 //这里是为了将反射执行结果传递回消费者端 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { Class<?>[] interfaces = service.getClass().getInterfaces(); Method method = null; for (int i = 0;i < interfaces.length;i++) { method = interfaces[i].getDeclaredMethod(methodName,argsTypes); if (method != null) { break; } } Object result = method.invoke(service, args); //将反射执行结果写入对外传输的对象流中 output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } finally { output.close(); } } catch (Exception e) { e.printStackTrace(); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { e.printStackTrace(); } }); } } }
启动提供者端的网络侦听和远程调用
public class RPCProviderMain { public static void main(String[] args) throws Exception { HelloService service = new HelloServiceImpl(); ProviderReflect.provider(service,8083); } }
启动消费者的动态代理调用
public class RPCConsumerMain { public static void main(String[] args) throws InterruptedException { HelloService service = ConsumerProxy.consume(HelloService.class,"127.0.0.1",8083); for (int i = 0;i < 1000;i++) { String hello = service.sayHello("你好_" + i); System.out.println(hello); Thread.sleep(1000); } } }
运行结果
hello,你好_0
hello,你好_1
hello,你好_2
hello,你好_3
hello,你好_4
hello,你好_5
.....
如果你要扩展成一个Netty+ProtoBuffer的高性能RPC框架可以参考Netty整合Protobuffer 的相关写法。有关Netty的相关内容可以参考Netty整理 、Netty整理(二) 、Netty整理(三)。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
数据爆发式增长下,CIO不可不知的“数据经济学”
导读:6月9日,全速重构•2020阿里云线上峰会如期举行。阿里巴巴研究员、阿里云智能存储资深产品总监Alex Chen做了主题为《面向未来,企业CIO该懂的数据经济学》的分享。在分享中,他畅谈了数据增长所带来的全新挑战,同时从“数据经济学”的角度,阐述了阿里云存储的应对之策。本文系根据此次分享整理而成。 一、数据增长带来的挑战 大部分企业/组织管理者都需要面对数据的快速增长,这些数据可能是结构化的,也有可能是非结构化的;可能是从传统业务而来,也有可能是从物联网 (IoT) 、人工智能(AI)等新的业务而来;可能是人产生的数据,也可能是机器产生的数据;可能是企业组织自身产生的数据,也有可能是外部合作产生的数据……数据的这种爆发式增长是难以控制的指数级增长。 除此之外,数据的价值呈现两极化的分布。初期,数据产生实时的洞察,比如当消费者进入到购物网站之后所进行的一些及时的推荐,这类推荐更多的是实时的且具有实效性。同时,随着数据量的积累,数据价值则是对长期的、宏观的探索和归纳的洞察。因此,数据价值的曲线就呈现出两极化的态势,即所谓的“微笑曲线”。 基于这样的“微笑曲线”,在数字化的变革中就必须...
- 下一篇
Dactor 1.1.2 版本发布
DActor框架基于协程思想设计,可同时支持同步和异步代码,简化在线异步代码的开发,用同步代码的思维来开发异步代码,兼顾异步代码的高并发、无阻塞和同步代码的易读性,可维护性。 最大程度的降低阻塞,提高单个线程的处理能力,并可有效的降低线程数。 更新功能: 增加雪花算法IdWorker 支持SpringBoot 使用 AsyncServlet标记为废弃,使用AsyncServletFilter过滤器替代 增加 DyanmicUrlPattern 接口,可动态进行路径匹配 队列满时抛出连接数过多异常 增加队列监控日志 最低支持JDK8,框架使用泛型进行完善 相关依赖升级到最新版本 相关链接 详细更新日志与下载地址: 点击查看
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2全家桶,快速入门学习开发网站教程