Java Study Notes: Protocol, the Underlying Communication Protocol of Dubbo Services
Let's start by finding the entry point of the communication protocol. Searching for callers of the Protocol interface's export method leads to the doExportUrlsFor1Protocol method of ServiceConfig, as shown in the figure below:

Stepping into protocol.export(invoker), we find that it has many implementation classes. Following SPI (if this is unfamiliar, see the earlier post on containers), the configuration file lists the following:

    registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
    dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol  // this is the default; see the SPI annotation on the Protocol interface
    filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
    listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
    mock=com.alibaba.dubbo.rpc.support.MockProtocol
    injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
    rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
    hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
    com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
    com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
    thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
    memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
    redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
    rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol

Inside DubboProtocol.export(Invoker<T> invoker) there is a call to openServer(url). The code:

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        // a client can also export a service that only the server can invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url)); // createServer creates the server
            } else {
                // the server supports reset, used together with the override feature
                server.reset(url);
            }
        }
    }

Next, step into createServer. Here is the source:

    private ExchangeServer createServer(URL url) {
        // by default, send a readonly event when the server shuts down
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // heartbeat is enabled by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

Dubbo reads the relevant configuration (host, port, and so on) from the URL of the service being exported and uses it to create the server. The call server = Exchangers.bind(url, requestHandler) shown above is what actually creates it.

So the basic creation steps are:

    export() --> openServer() --> createServer() --> server = Exchangers.bind(url, requestHandler);

Let's continue with the source of Exchangers.bind(url, requestHandler):

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

The bind call in getExchanger(url).bind(url, handler) then takes us into the HeaderExchanger class:

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

From there we step into the bind method of the Transporters class:

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

Following this bind call, we can see that it will dispatch to one of GrizzlyTransporter, MinaTransporter, or NettyTransporter. Through SPI, the default is NettyTransporter.

At this point we basically understand that Dubbo hands its communication over to Netty by default. Now let's look at the doOpen method (in NettyServer, as the NettyServer.this reference in the code shows):

    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder()); // decode
                pipeline.addLast("encoder", adapter.getEncoder()); // encode
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

Readers familiar with Netty will recognize this pattern right away: it simply creates a Netty server. At this point the Dubbo server has been created, and the console prints:

    [DUBBO] Start NettyServer bind /0.0.0.0:20880, export /192.168.4.241:20880, dubbo version: 2.8.4, current host: 127.0.0.1
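
Looking back at openServer(), the idea is roughly "one server per address: create it on the first export, reset it on later ones". The following is only a minimal stand-alone sketch of that idea, not Dubbo's code. ServerCacheSketch, FakeServer, serverCount and the plain string config are hypothetical stand-ins for DubboProtocol, ExchangeServer and URL, and putIfAbsent is swapped in for the plain get/put pair in the quoted source so that two concurrent exports of the same address cannot both create a server.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServerCacheSketch {

    /** Hypothetical stand-in for ExchangeServer: keeps its config and counts resets. */
    static final class FakeServer {
        final String address;
        String config;
        int resetCount = 0;

        FakeServer(String address, String config) {
            this.address = address;
            this.config = config;
        }

        // Plays the role of server.reset(url): apply the new configuration in place.
        void reset(String newConfig) {
            this.config = newConfig;
            this.resetCount++;
        }
    }

    // One server per "host:port" key, playing the role of serverMap in the quoted source.
    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();

    /** Creates a server for the address on first use, otherwise resets the existing one. */
    FakeServer openServer(String address, String config) {
        FakeServer existing = serverMap.get(address);
        if (existing == null) {
            FakeServer created = new FakeServer(address, config);
            // putIfAbsent returns null when our server won the race and was stored.
            existing = serverMap.putIfAbsent(address, created);
            if (existing == null) {
                return created;
            }
        }
        existing.reset(config);
        return existing;
    }

    int serverCount() {
        return serverMap.size();
    }

    public static void main(String[] args) {
        ServerCacheSketch protocol = new ServerCacheSketch();
        // The address is taken from the log line above; the config strings are made up.
        FakeServer first = protocol.openServer("192.168.4.241:20880", "configA");
        FakeServer second = protocol.openServer("192.168.4.241:20880", "configB");
        System.out.println("same instance: " + (first == second));
        System.out.println("resets: " + second.resetCount);
        System.out.println("servers: " + protocol.serverCount());
    }
}
```

Exporting a second service on the same host and port therefore reuses the already bound server instead of binding the port again, which matches the else branch of openServer() calling server.reset(url). FakeServer itself is not thread-safe; the sketch only guards the map.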