NET Core微服务之路:让我们对上一个Demo通讯进行修改,完成RPC通讯
- 当服务端启动后,绑定并监听(READ)设定的端口,比如1889。
- 当客户端启动后,绑定指定端口,等待用户输入。
- 当用户输入任意字符串数据后,客户端将这组数据进行转码为byte格式进行传输到服务端。
- 当服务端收到客户端传来的数据,进行转码后输出控制台,并将这组数据再次回传到客户端。
- 客户端收到数据,也打印出来。
服务端
增加两个静态方法SayHello和SayByebye,用于提供远程调用,超级简单,不解释。
public static class Say { public static string SayHello(string content) { return $"hello {content}"; } public static string SayByebye(string content) { return $"byebye {content}"; } }
在我们原来的ChannelRead函数中,将原有的Echo回路传输,直接替换成如下内容。
1 public override void ChannelRead(IChannelHandlerContext context, object message) 2 { 3 if (message is IByteBuffer buffer) 4 { 5 Console.WriteLine($"message length is {buffer.Capacity}"); 6 var obj = JsonConvert.DeserializeObject<Dictionary<string, string>>(buffer.ToString(Encoding.UTF8).Replace(")", "")); // (1) 7 8 byte[] msg = null; 9 if (obj["func"].Contains("sayHello")) // (2) 10 { 11 msg = Encoding.UTF8.GetBytes(Say.SayHello(json["username"])); 12 } 13 14 if (obj["func"].Contains("sayByebye")) // (2) 15 { 16 msg = Encoding.UTF8.GetBytes(Say.SayByebye(json["username"])); 17 } 18 19 if (msg == null) return; 20 // 设置Buffer大小 21 var b = Unpooled.Buffer(msg.Length, msg.Length); // (3) 22 IByteBuffer byteBuffer = b.WriteBytes(msg); // (4) 23 context.WriteAsync(byteBuffer); // (5) 24 } 25 }
客户端
我们将上一个demo中的EchoClientHandler做如下修改,以完成一个简单的请求
1 public EchoClientHandler() 2 { 3 var hello = new Dictionary<string, string> // (1) 4 { 5 {"func", "sayHello"}, 6 {"username", "stevelee"} 7 }; 8 SendMessage(ToStream(JsonConvert.SerializeObject(hello))); 9 } 10 11 private byte[] ToStream(string msg) 12 { 13 Console.WriteLine($"string length is {msg.Length}"); 14 using (var stream = new MemoryStream()) // (2) 15 { 16 Serializer.Serialize(stream, msg); 17 return stream.ToArray(); 18 } 19 } 20 21 private void SendMessage(byte[] msg) 22 { 23 Console.WriteLine($"byte length is {msg.Length}"); 24 _initialMessage = Unpooled.Buffer(msg.Length, msg.Length); 25 _initialMessage.WriteBytes(msg); // (3) 26 }
启动一下
由于在客户端明文标注了使用sayHello这个方法,客户端会收到服务端返回的"hello stevelee"。
这样一个最简单的RPC远程调用就完成了(其实上一篇就也属于RPC,只是这里用方法和过滤来指定调用)。
问题
- 服务端不可能都通过这样笨拙的过滤方式来调用方法吧?是的,这只是DEMO,为了演示和理解基础概念而已,而是要动过动态代理来实现方法Invoke。
- 这个DEMO只是一个点对点的远程调用,不会涉及到任何服务路由和转发等高级特性。
- 有新的接口的时候时候,需要重新编译和暴露,如果有上万个新的接口,这样的重复工作岂不是疯了。
- ...etc
Esay.Rpc
先看看接口定义了些什么:
1 /// <summary> 2 /// 接口UserService的定义 3 /// </summary> 4 [RpcTagBundle] 5 public interface IUserService 6 { 7 Task<string> GetUserName(int id); 8 9 Task<int> GetUserId(string userName); 10 11 Task<DateTime> GetUserLastSignInTime(int id); 12 13 Task<UserModel> GetUser(int id); 14 15 Task<bool> Update(int id, UserModel model); 16 17 Task<IDictionary<string, string>> GetDictionary(); 18 19 Task Try(); 20 21 Task TryThrowException(); 22 }
1 [ProtoContract] 2 public class UserModel 3 { 4 [ProtoMember(1)] public string Name { get; set; } 5 6 [ProtoMember(2)] public int Age { get; set; } 7 }
上面有两个不一样的标记,也是protobuf-net中独有的特性。
protobuf-net的坑
- 默认例子中该类没有任何继承,因此不会存在一个妖孽问题,但如果UserModel是一个子类,他继承于一个父类,而这个父类也同样拥有多个子类,直接ProtoContract参与序列化将会报错,需要在特性上增加DataMemberOffset = x,此处的x不是字母,而是这个子类的一个序列化顺序。比如有3个子类继承同一个父类,前面两个子类的偏移量分别是1和2,那么这个类的偏移量将设置为3,以此类推。
- 默认的数据类型中,系统定义的标准类型没问题,但有个妖孽的int[]这样的数组类型,那也将是个噩梦,官网团队没有解释为何不支持数组的序列化,我猜测估计是因为数组的不规则性(比如多维数组、甚至不规则的多维数组)而放弃了这个类型的序列化,毕竟序列化是不能影响性能的。
接下来继续服务端的代码
1 static void Main() 2 { 3 var bTime = DateTime.Now; 4 5 // 实现自动装配 6 var serviceCollection = new ServiceCollection(); 7 { 8 serviceCollection 9 .AddLogging() 10 .AddRpcCore() 11 .AddService() 12 .UseSharedFileRouteManager("d:\\routes.txt") 13 .UseDotNettyTransport(); 14 15 // ** 注入本地测试类 16 serviceCollection.AddSingleton<IUserService, UserServiceImpl>(); 17 } 18 19 // 构建当前容器 20 var buildServiceProvider = serviceCollection.BuildServiceProvider(); 21 22 // 获取服务管理实体类 23 var serviceEntryManager = buildServiceProvider.GetRequiredService<IServiceEntryManager>(); 24 var addressDescriptors = serviceEntryManager.GetEntries().Select(i => new ServiceRoute 25 { 26 Address = new[] 27 { 28 new IpAddressModel {Ip = "127.0.0.1", Port = 9881} 29 }, 30 ServiceDescriptor = i.Descriptor 31 }); 32 var serviceRouteManager = buildServiceProvider.GetRequiredService<IServiceRouteManager>(); 33 serviceRouteManager.SetRoutesAsync(addressDescriptors).Wait(); 34 35 // 构建内部日志处理 36 buildServiceProvider.GetRequiredService<ILoggerFactory>().AddConsole((console, logLevel) => (int) logLevel >= 0); 37 38 // 获取服务宿主 39 var serviceHost = buildServiceProvider.GetRequiredService<IServiceHost>(); 40 41 Task.Factory.StartNew(async () => 42 { 43 //启动主机 44 await serviceHost.StartAsync(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9881)); 45 }); 46 47 Console.ReadLine(); 48 }
1 static void Main() 2 { 3 var serviceCollection = new ServiceCollection(); 4 { 5 serviceCollection 6 .AddLogging() // 添加日志 7 .AddClient() // 添加客户端 8 .UseSharedFileRouteManager(@"d:\routes.txt") // 添加共享路由 9 .UseDotNettyTransport(); // 添加DotNetty通信传输 10 } 11 12 var serviceProvider = serviceCollection.BuildServiceProvider(); 13 14 serviceProvider.GetRequiredService<ILoggerFactory>().AddConsole((console, logLevel) => (int) logLevel >= 0); 15 16 var services = serviceProvider.GetRequiredService<IServiceProxyGenerater>() 17 .GenerateProxys(new[] {typeof(IUserService)}).ToArray(); 18 19 var userService = serviceProvider.GetRequiredService<IServiceProxyFactory>().CreateProxy<IUserService>( 20 services.Single(typeof(IUserService).GetTypeInfo().IsAssignableFrom) 21 ); 22 23 while (true) 24 { 25 Task.Run(async () => 26 { 27 Console.WriteLine($"userService.GetUserName:{await userService.GetUserName(1)}"); 28 Console.WriteLine($"userService.GetUserId:{await userService.GetUserId("rabbit")}"); 29 Console.WriteLine($"userService.GetUserLastSignInTime:{await userService.GetUserLastSignInTime(1)}"); 30 var user = await userService.GetUser(1); 31 Console.WriteLine($"userService.GetUser:name={user.Name},age={user.Age}"); 32 Console.WriteLine($"userService.Update:{await userService.Update(1, user)}"); 33 Console.WriteLine($"userService.GetDictionary:{(await userService.GetDictionary())["key"]}"); 34 await userService.Try(); 35 Console.WriteLine("client function completed!"); 36 }).Wait(); 37 Console.ReadKey(); 38 } 39 }

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
持续投入开源社区建设 | 阿里巴巴又一开源项目被列入 CNCF 云原生全景图
近日,阿里巴巴服务发现和配置管理领域开源项目Nacos被列入云原生全景图谱配置管理和服务发现象限,这是继Dragonfly、Dubbo、RocketMQ、OpenMessaging、 PouchContainer和Sentinel后,阿里巴巴又一开源项目被列入该图谱。借助Nacos,用户在云原生时代构建微服务架构时,可极大的降低生产上的落地难度和实施风险。 CNCF(Cloud Native Computing Foundation)于 2015 年 7 月成立,隶属于 Linux 基金会,初衷围绕"云原生"服务云计算,致力于维护和集成开源技术,支持编排容器化微服务架构应用。2016 年 11 月,CNCF 开始维护 Cloud Native Landscape,汇总流行热门的云原生技术与工具,并加以分类,为企业构建云原生体系提供参
- 下一篇
从“挖光缆”到“剪网线”|蚂蚁金服异地多活单元化架构下的微服务体系
“异地多活”是互联网系统的一种高可用部署架构,而“单元化”正是实现异地多活的一个解题思路。说起这个话题,不得不提两个事件:一件是三年多前的往事,另一件就发生今年的杭州云栖大会上。 从“挖光缆”到“剪网线” 2015年5月27日,因市政施工,支付宝杭州某数据中心的光缆被挖断,造成对部分用户服务不可用,时间长达数小时。其实支付宝的单元化架构容灾很早就开始启动了,2015年也基本上成型了。当时由于事发突然,还是碰到很多实际问题,花费了数小时的时间,才在确保用户数据完全正确的前提下,完成切换、恢复服务。虽然数据没有出错,但对于这样体量的公司来说,服务不可用的社会舆论影响也是非常大的。 527这个数字,成为蚂蚁金服全体技术人心中悬着那颗苦胆。我们甚至把技术部门所在办公楼的一个会议室命名为527,把每年的5月27日定为技术日,来时刻警醒自己敬畏技术
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS8编译安装MySQL8.0.19
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Hadoop3单机部署,实现最简伪集群