请先关注 [低调大师] 公众号 优秀的自媒体个人博客,低调大师,许军

低调大师

您现在的位置是:首页>文章详情

文章详情

基于TCP的远程服务调用

2018-09-21 60热度

前言

    上篇,分析了基于HTTP方式的RPC调用。本篇将在上篇的基础上,分析基于TCP方式的RPC调用。代码的整体思路是一致的,可以看作是在上篇功能上的扩展——即通信的方式。

    代码:https://gitee.com/marvelcode/marvelcode-rpc.git

 

源码

    上篇基于HTTP的远程服务调用,在服务消费方,提到过一个 ReferenceAgent 的抽象,可看作是远程服务的代理对象。即通过 JDK 动态代理所有接口方法,进而在 invoke 中使用 RestTemplate 发起HTTP请求并获取响应,接口最重要的方法定义如下图:

package com.menghao.rpc.consumer.handle; import com.menghao.rpc.consumer.model.RpcRequest; import com.menghao.rpc.util.RequestIdUtils; import java.lang.reflect.Method; import java.util.List; /** * <p>Rpc框架消费方代理.<br> * <p>执行具体调用,子类实现Http、Tcp方式的Rpc请求<p/> * <p>需要保证线程安全(会被并发调用)</p> * * @author MarvelCode. */ public interface ReferenceAgent { /** * 通过rpc方式调用处理 * * @param method 调用方法 * @param args 调用参数 * @return Object 调用结果 */ Object invoke(Method method, Object[] args); }

    同样的,扩展 TCP 方式调用就需要从此处着手,来扩展新的实现,以发送TCP包给服务提供方。其余的,像对象代理赋值,以及ZooKeeper节点的监听,都可以复用。由此也可以看出,面向接口编程的优势:屏蔽实现,以最小化的代价切换、扩展方案。

    那么就来看下子类 TcpReferenceAgent 的实现:

package com.menghao.rpc.consumer.handle.tcp; import com.menghao.rpc.consumer.balance.LoadBalancer; import com.menghao.rpc.consumer.balance.RandomLoadBalancer; import com.menghao.rpc.consumer.handle.ReferenceAgent; import com.menghao.rpc.consumer.model.ReferenceKey; import com.menghao.rpc.consumer.model.RpcRequest; import com.menghao.rpc.exception.InvokeException; import com.menghao.rpc.netty.TcpConnectionContainer; import com.menghao.rpc.netty.model.TcpConnection; import com.menghao.rpc.spring.BeansManager; import lombok.Getter; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * <p>ReferenceAgent T方式实现.</br> * <p>调用原始接口的任意方法会被该类的invoke方法代理:使用Netty发送请求</p> * <p>sourceInterface/implCode:唯一标识一个服务</p> * * @author MarvelCode */ public class TcpReferenceAgent implements ReferenceAgent { @Getter private Class sourceInterface; @Getter private String implCode; private List<String> providerHosts; private TcpConnectionContainer tcpConnectionContainer; private LoadBalancer defaultBalancer = new RandomLoadBalancer(); private ReadWriteLock lock = new ReentrantReadWriteLock(); public TcpReferenceAgent(ReferenceKey referenceKey) { this.sourceInterface = referenceKey.getSourceInterface(); this.implCode = referenceKey.getName(); this.tcpConnectionContainer = BeansManager.getInstance().getBeanByType(TcpConnectionContainer.class); this.providerHosts = new ArrayList<>(); } @Override public Object invoke(Method method, Object[] args) { // 构造请求参数 RpcRequest rpcRequest = makeParam(method, args); // 负载均衡选取Tcp连接 TcpConnection tcpConnection = select(); // 构建调用上下文 InvocationContext invocationContext = new InvocationContext(tcpConnection, rpcRequest); // 执行调用 invocationContext.execute(); // 阻塞获取结果 return invocationContext.get(); } @Override public void setProviderHosts(List<String> hosts) { lock.writeLock().lock(); try { // 刷新Tcp连接 refreshConnection(providerHosts, hosts); this.providerHosts = hosts; } finally { lock.writeLock().unlock(); } } private TcpConnection select() { lock.readLock().lock(); try { if (providerHosts == null || providerHosts.size() == 0) { throw new InvokeException("There are currently no service providers available"); } // 负载均衡 String host = defaultBalancer.select(providerHosts); String[] info = host.split(":"); return tcpConnectionContainer.get(info[0], Integer.valueOf(info[1])); } finally { lock.readLock().unlock(); } } private void refreshConnection(List<String> lastHost, List<String> nowHost) { Set<String> commonHost = new HashSet<>(lastHost); // 当前存活机器与上次存活机器交集 commonHost.retainAll(nowHost); Set<String> lostHost = new HashSet<>(lastHost); Set<String> addHost = new HashSet<>(nowHost); // 当前存活机器与交集的差集,得出新增的机器 addHost.removeAll(commonHost); // 上次存活机器与交集的差集,得出下线的机器 lostHost.removeAll(commonHost); // 下线的机器,将关闭并移除Tcp连接 for (String host : lostHost) { String[] info = host.split(":"); tcpConnectionContainer.remove(info[0], Integer.valueOf(info[1])); } // 新增的机器,将新建Tcp连接 for (String host : addHost) { String[] info = host.split(":"); tcpConnectionContainer.register(info[0], Integer.valueOf(info[1])); } } } 

    从逻辑上看,分为构造参数、挑选请求机器信息、构造调用上下文、执行,返回结果五步。其中构造参数借助了 JDK8 特性,将 RpcRequest请求实体的构造,放在了接口层实现。其余的步骤引入了几个新的模型对象:

  • TcpConnection:通过 Netty 实现的,服务消费方和服务提供方的长连接。针对一台远程服务器,该连接全局唯一。
    • 举个例子,服务A和服务B部署在相同的两台机器上,那么消费方将持有两条Tcp长连接(针对服务部署的机器,而非服务),调用服务A和B将复用这两条连接。
  • TcpConnectionContainer:上述连接的容器,管理着连接的生命周期。服务的启停会触发连接的打开、关闭。
  • InvocationContext:一次调用的上下文环境(伴随调用的生命周期)。存储了请求数据及响应结果,重要的职责在于回写结果。

    介绍完 TcpReferenceAgent ,我就以连接创建——发起请求——请求处理——响应处理这四步来分析如何实现。

 

连接创建

    首先,来看看连接的创建过程吧。我的实现采用了 Netty 作为客户端与服务端通信的基础。没Netty基础的小伙伴需要自行补下课了。

 public TcpConnection(String ip, int port) throws InterruptedException { this.close = new AtomicBoolean(true); TcpClient tcpClient = BeansManager.getInstance().getBeanByType(TcpClient.class); ChannelFuture future = tcpClient.connect(ip, port); this.channel = future.channel(); this.close.set(false); }
package com.menghao.rpc.netty; import com.menghao.rpc.NamedThreadFactory; import com.menghao.rpc.netty.in.TcpInboundHandler; import com.menghao.rpc.netty.in.TcpMessageDecoder; import com.menghao.rpc.netty.out.TcpMessageEncoder; import com.menghao.rpc.provider.model.RpcResponse; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import java.util.List; /** * <p>借助Netty实现TCP客户端.<br> * * @author MarvelCode. */ public class TcpClient { private static EventLoopGroup workerGroup; private Bootstrap bootstrap; private int connectTimeout; private int maxFrameLength; private int readIdle; private int writIdle; private List<TcpMessageHandler> messageHandlers; public TcpClient(int connectTimeout, int maxFrameLength, int readIdle, int writIdle, List<TcpMessageHandler> messageHandlers) { this.connectTimeout = connectTimeout; this.maxFrameLength = maxFrameLength; this.readIdle = readIdle; this.writIdle = writIdle; this.messageHandlers = messageHandlers; } static { // 当jvm关闭的时候执行addShutdownHook添加的钩子方法 Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { workerGroup.shutdownGracefully(); } }); } public void initBootstrap() { bootstrap = new Bootstrap(); workerGroup = new NioEventLoopGroup(0, new NamedThreadFactory("netty-server-io", true)); int ct = connectTimeout > 0 ? this.connectTimeout : 5000; bootstrap.group(workerGroup) // 连接超时时间 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ct) // 是否使用Nagle的算法以尽可能发送大块数据 .option(ChannelOption.TCP_NODELAY, true) // 是否启动心跳保活机制(长连接) .option(ChannelOption.SO_KEEPALIVE, true) // 是否允许一个地址重复绑定 .option(ChannelOption.SO_REUSEADDR, true) // 基于内存池的缓冲区重用机制 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { // 客户端需要序列化 rpcRequest、反序列化 rpcResponse ch.pipeline().addLast(new TcpMessageEncoder()) .addLast(new TcpMessageDecoder(maxFrameLength, RpcResponse.class)) .addLast(new IdleStateHandler(readIdle, writIdle, 0)) .addLast(new TcpInboundHandler(messageHandlers)); } }); } public ChannelFuture connect(String ip, int port) throws InterruptedException { return bootstrap.connect(ip, port).sync(); } } 

    其中 TcpClient 的初始化时机,借助了 Spring Bean生命周期的 init-method

 @Bean(initMethod = "initBootstrap") public TcpClient tcpClient() { List<TcpMessageHandler> messageHandlers = new ArrayList<>(2); messageHandlers.add(new RpcRequestMsgHandler()); messageHandlers.add(new RpcResponseMsgHandler()); return new TcpClient(tcpProperties.getConnectTimeout(), tcpProperties.getMaxFrameLength(), tcpProperties.getReadIdle(), tcpProperties.getWritIdle(), messageHandlers); }

    在进行了一些参数的设置之后,就可以调用 connect 方法来创建一条指定ip、端口的连接。最终会使用 io.netty.channel.Channel 对象来进行数据的传输通信。

    其中 ChannelOption.SO_KEEPALIVE 参数设置为 true,来保持长连接。之后会往 ChannelPipeline 中添加 ChannelHandler 处理类。这里包含了信息的编解码Handler、心跳检测Handler,TcpInboundHandler 是对解码后的 RpcRequest/RpcResponse 数据进行处理。

    请求创建好之后,接下来就该发起请求了。    

 

发起请求

    请求的主要工作集中在了 InvocationContext 这个类中,同时这个类还包含了响应数据的回写。利用锁机制,通过响应 result 是否为空来判断一次调用是否已完成,在调用 get 方法时,调用未完成前会一直阻塞,直到 result 结果被回写。

    如果响应的 RpcResponse 是正常响应的结果,就返回;如果是异常,则抛出 :

package com.menghao.rpc.consumer.handle.tcp; import com.menghao.rpc.consumer.model.Future; import com.menghao.rpc.consumer.model.RpcRequest; import com.menghao.rpc.exception.InvokeException; import com.menghao.rpc.netty.model.TcpConnection; import com.menghao.rpc.provider.model.RpcResponse; import com.menghao.rpc.spring.BeansManager; import lombok.Getter; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * <p>一次调用请求上下文.<br> * <p>存放请求及结果,伴随一次调用的生命周期</p> * * @author MarvelCode. */ public class InvocationContext implements Future { private InvocationContextContainer invocationContextContainer; @Getter private RpcRequest rpcRequest; private TcpConnection tcpConnection; /** 异步调用锁,在结果返回时解锁 */ private Lock lock = new ReentrantLock(); private Condition doneCondition = lock.newCondition(); /** 预防调用无返回值的判断 */ private static final Object NULL_OBJECT = new Object(); private Object result; InvocationContext(TcpConnection tcpConnection, RpcRequest rpcRequest) { this.tcpConnection = tcpConnection; this.rpcRequest = rpcRequest; invocationContextContainer = BeansManager.getInstance().getBeanByType(InvocationContextContainer.class); } public void execute() { invocationContextContainer.add(this); tcpConnection.write(rpcRequest); } @Override public Object get() { if (!isDone()) { try { lock.lock(); if (!isDone()) { doneCondition.await(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } } if (result instanceof Throwable) { throw new InvokeException((Throwable) result); } return result == NULL_OBJECT ? null : result; } @Override public boolean isDone() { return result != null; } public void notifyCompleted(RpcResponse rpcResponse) { lock.lock(); try { if (rpcResponse.getThrowable() != null) { setResult(rpcResponse.getThrowable()); } setResult(rpcResponse.getResult()); doneCondition.signalAll(); } finally { lock.unlock(); } } private void setResult(Object value) { lock.lock(); try { result = (value == null) ? NULL_OBJECT : value; doneCondition.signalAll(); } finally { lock.unlock(); } invocationContextContainer.remove(rpcRequest.getId()); } } 

    这里的 execute 方法,首先将自身(InvocationContext放置到容器中,因为通过管道写完数据流后,不会立马读取到响应流,所以需要有一个唯一标识(标识请求-响应的对应关系),这样在反序列化得到响应数据时,就可以找到对应请求的 InvocationContext 来回写数据了。

    这里生成唯一标识的规则可自定义,即保证一段时间内不会出现重复的即可(并发量越大,越需要考量唯一标识生成的唯一性)。

    当执行了 tcpConnection.write(rpcRequest) 这行代码时,会进而调用 channel.writeAndFlush(rpcRequest),将数据写入管道:

package com.menghao.rpc.netty.out; import com.menghao.rpc.serialize.ObjectOutput; import com.menghao.rpc.serialize.hessian.HessianObjectOutput; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.io.OutputStream; /** * <p>Tcp信息编码类.</br> * * @author MarvelCode */ public class TcpMessageEncoder extends MessageToByteEncoder<Object> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { OutputStream outputStream = new ByteBufOutputStream(byteBuf); // 预占,回写包长度 byteBuf.writeInt(0); ObjectOutput objectOutput = new HessianObjectOutput(outputStream); objectOutput.writeObject(o); objectOutput.flush(); // 写包长度 byteBuf.setInt(0, byteBuf.writerIndex() - 4); } } 

    可以看到,将对象序列化写入缓冲区前,会先预写一个整形(4字节)的“0”进去,在写入数据流之后,再使用“写指针索引”减去整形的4字节,就得到了数据包的长度。

    这样做的目的,就是利用“消息分割”的方式,解决了Tcp半包、粘包的问题。类似 Http协议的 content-length。

 

请求处理

    既然有序列化,自然有反序列化的处理:

package com.menghao.rpc.netty.in; import com.menghao.rpc.serialize.ObjectInput; import com.menghao.rpc.serialize.hessian.HessianObjectInput; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import java.io.InputStream; /** * <p>Tcp信息解码类.</br> * * @author MarvelCode */ public class TcpMessageDecoder extends LengthFieldBasedFrameDecoder { private Class<?> covertClass; /** * @param maxFrameLength 解码时,处理每个帧数据的最大长度 * 0 该帧数据中,存放该帧数据的长度的数据的起始位置 * 4 记录该帧数据长度的字段本身的长度 * 0 修改帧数据长度字段中定义的值,可以为负数 * 4 解析的时候需要跳过的字节数 */ public TcpMessageDecoder(int maxFrameLength, Class<?> covertClass) { super(maxFrameLength, 0, 4, 0, 4); this.covertClass = covertClass; } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf byteBuf = (ByteBuf) super.decode(ctx, in); if (byteBuf == null) { return null; } try { InputStream inputStream = new ByteBufInputStream(byteBuf); ObjectInput objectInput = new HessianObjectInput(inputStream); return objectInput.readObject(covertClass); } finally { byteBuf.release(); } } } 

    这里通过集成 Netty 提供的 LengthFieldBasedFrameDecoder ,来实现对Tcp包数据部分的截取工作。

    由于这里实现的Tcp协议是我自定义的,所以我很清楚开头使用了几字节来记录数据长度,应该越过几字节才是真正数据的部分。因此配置好参数(即构造器指定的0、4、0、4),进而调用 super.decode(ctx, in),就可以获取到数据流了,然后使用 Hessian 反序列化就获取到了 RpcRequest 对象。

    由于 Netty 的处理链使用的是“责任链”设计模式,上述解码类获取到的 RpcRequest 对象会作为下个处理器的入参:

package com.menghao.rpc.netty.in; import com.menghao.rpc.netty.TcpMessageHandler; import com.menghao.rpc.netty.model.TcpConnection; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.util.List; /** * <p>Netty上行处理Handler.</br> * <p>该层已经可以获取到反序列化后的对象</p> * <p>根据对象为 RpcRequest、RpcResponse,执行不同的处理逻辑</p> * * @author MarvelCode */ public class TcpInboundHandler extends SimpleChannelInboundHandler<Object> { private List<TcpMessageHandler> messageHandlers; public TcpInboundHandler(List<TcpMessageHandler> messageHandlers) { this.messageHandlers = messageHandlers; } @SuppressWarnings("unchecked") @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { if (o == null) { return; } for (TcpMessageHandler tcpMessageHandler : messageHandlers) { Channel channel = channelHandlerContext.channel(); if (tcpMessageHandler.support(o.getClass())) { tcpMessageHandler.handler(new TcpConnection(channel), o); } } } } 

    由于服务提供方、服务消费方都会共用 TcpInboundHandler ,但提供方只会处理 RpcRequest,消费方只会处理 RpcResponse,所以就定义了 TcpMessageHandler 来定制化处理。是不是能看到 SpringMVC-HttpMessageConverter<T> 的影子?

package com.menghao.rpc.netty; import com.menghao.rpc.netty.model.TcpConnection; import java.io.Serializable; /** * <p>Tcp消息处理Handler.</br> * * @author MarvelCode */ public interface TcpMessageHandler<T> { /** * 判断该Handler是否支持指定类型的处理 * * @param supportClass 支持处理的类型 * @return 是否支持该处理 */ boolean support(Class<?> supportClass); /** * 处理读取到的数据 * * @param connection 使用Netty Channel封装的连接 * @param data 将要处理的数据 */ void handler(TcpConnection connection, T data); } 

    该接口的两种实现,分别对应 RpcRequest 和 RpcResponse 类型数据的处理。这里先来看下 RpcRequestMsgHandler

package com.menghao.rpc.provider.handle.tcp; import com.menghao.rpc.consumer.model.RpcRequest; import com.menghao.rpc.netty.TcpMessageHandler; import com.menghao.rpc.netty.model.TcpConnection; import com.menghao.rpc.spring.BeansManager; /** * <p>RpcRequest处理类.</br> * <p>根据请求信息,定位服务,在 ExecutionExecutor中反射调用</p> * * @author MarvelCode */ public class RpcRequestMsgHandler implements TcpMessageHandler<RpcRequest> { @Override public boolean support(Class<?> supportClass) { return RpcRequest.class.isAssignableFrom(supportClass); } @Override public void handler(TcpConnection connection, RpcRequest data) { ExecutionExecutor executor = BeansManager.getInstance().getBeanByType(ExecutionExecutor.class); executor.execute(new ExecutionExecutor.ExecutionTask( new ExecutionContext(data, connection) )); } } 
package com.menghao.rpc.provider.handle.tcp; import com.menghao.rpc.consumer.model.RpcRequest; import com.menghao.rpc.NamedThreadFactory; import com.menghao.rpc.exception.InvokeException; import com.menghao.rpc.provider.model.ProviderKey; import com.menghao.rpc.provider.regisiter.ProviderRepository; import com.menghao.rpc.spring.BeansManager; import org.springframework.util.ReflectionUtils; import java.lang.reflect.Method; import java.text.MessageFormat; import java.util.concurrent.*; /** * <p>Rpc调用的执行器.</br> * <p>线程池执行</p> * * @author MarvelCode */ public class ExecutionExecutor { private ThreadPoolExecutor threadPoolExecutor; private int queueLimit; private static final String THREAD_PREFIX = "Executor-Execution-Task"; public ExecutionExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, int queueLimit) { this.queueLimit = queueLimit; threadPoolExecutor = new ExecutionTaskThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new ExecutionTaskBlockQueue(), new NamedThreadFactory(THREAD_PREFIX, true)); } public void execute(ExecutionTask task) { int queueSize = threadPoolExecutor.getQueue().size(); // TODO 队列溢出处理 if (queueSize > queueLimit) { } threadPoolExecutor.execute(task); } public static class ExecutionTask implements Runnable { private ExecutionContext executionContext; private ProviderRepository providerRepository; ExecutionTask(ExecutionContext executionContext) { this.executionContext = executionContext; this.providerRepository = BeansManager.getInstance().getBeanByType(ProviderRepository.class); } @Override public void run() { RpcRequest rpcRequest = executionContext.getRequest(); ProviderKey providerKey = new ProviderKey(rpcRequest.getContract(), rpcRequest.getImplCode()); Object instance = providerRepository.getProvider(providerKey); if (instance == null) { // 无对应的服务单例,抛异常 executionContext.writeException(new InvokeException( MessageFormat.format("service {0} not found", providerKey))); return; } try { Object result = invoke(instance, rpcRequest); executionContext.writeResult(result); } catch (Exception e) { executionContext.writeException(new InvokeException(e)); } } private Object invoke(Object instance, RpcRequest rpcRequest) { Method method = ReflectionUtils.findMethod(instance.getClass(), rpcRequest.getMethod(), rpcRequest.getArgsType()); return ReflectionUtils.invokeMethod(method, instance, rpcRequest.getArgs()); } } private class ExecutionTaskThreadPoolExecutor extends ThreadPoolExecutor { ExecutionTaskThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } /** * TODO 执行前置操作(可扩展,计数同一时刻某方法/某服务并发量) */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); } /** * TODO 执行后置操作(可扩展,计数同一时刻某方法/某服务并发量) */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); } } /** * TODO 自定义任务队列 */ private class ExecutionTaskBlockQueue extends LinkedBlockingQueue<Runnable> { } } 

    服务的调用逻辑,同上篇Http方式一样,都是使用“反射调用”的方式。不同点在于,采用了线程池执行,可以应付更高的并发请求。

    这里的 ExecutionContext 是服务提供方的执行上下文,不同于服务消费方的 InvocationContext,别搞混了~

    其中 executionContext.writeResult(result),同样使用了 TcpMessageEncoder 进行序列化。

 

响应处理

    服务提供方将响应数据通过管道传输给消费方,服务消费方再借助 TcpMessageDecoder 对数据流反序列化获取到 RpcResponse,同样进入 TcpInboundHandler 处理,这些代码之前都分析过了,直接来看 RpcResponse 对应的处理类即可:

package com.menghao.rpc.consumer.handle.tcp; import com.menghao.rpc.netty.TcpMessageHandler; import com.menghao.rpc.netty.model.TcpConnection; import com.menghao.rpc.provider.model.RpcResponse; import com.menghao.rpc.spring.BeansManager; /** * <p>RpcResponse处理类.</br> * <p>分析相应结果,调用 InvocationContext回写结果</p> * * @author MarvelCode */ public class RpcResponseMsgHandler implements TcpMessageHandler<RpcResponse> { @Override public boolean support(Class<?> supportClass) { return RpcResponse.class.isAssignableFrom(supportClass); } @Override public void handler(TcpConnection connection, RpcResponse data) { InvocationContextContainer contextContainer = BeansManager.getInstance().getBeanByType(InvocationContextContainer.class); InvocationContext invocationContext = contextContainer.get(data.getId()); if (invocationContext != null) { invocationContext.notifyCompleted(data); contextContainer.remove(data.getId()); } } } 

    嗯,没错。逻辑很简单,通过“唯一标识”找到对应的 InvocationContext,然后调用 invocationContext.notifyCompleted(data) 以通知调用完成,最后从 InvocationContextContatiner 中移除已经完成的调用上下文。

    到此整个基于 Tcp 方式的远程服务调用就分析结束了。

 

配置

    在上篇的基础上,其余配置不变,仅将 marvel.rpc.type 由 http 改为 tcp 即可。

marvel.rpc.type=tcp

 

总结

    到此,我用两篇博文,分别对基于HTTP、TCP两种方式的远程服务调用进行了简要分析。后期会逐渐对代码进行完善,各位如果发现代码有任何的缺陷,或对代码思路有更好的建议,都可以跟我讨论。

收藏 (0)

相关文章

    文章评论

    共有0条评论来说两句吧...