您现在的位置是:首页 > 文章详情

Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core

日期:2021-05-10点击:368

前置条件:
《Dapr运用》
《Dapr 运用之 Java gRPC 调用篇》
《Dapr 运用之集成 Asp.Net Core Grpc 调用篇》


  1. 搭建 RabbitMQ

    • Docker 搭建 RabbitMQ 服务

      docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management 
    • 创建 rabbiqmq.yaml

      apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: messagebus spec: type: pubsub.rabbitmq metadata: - name: host value: "amqp://localhost:5672" # Required. Example: "rabbitmq.default.svc.cluster.local:5672" - name: consumerID value: "61415901178272324029" # Required. Any unique ID. Example: "myConsumerID" - name: durable value: "true" # Optional. Default: "false" - name: deletedWhenUnused value: "false" # Optional. Default: "false" - name: autoAck value: "false" # Optional. Default: "false" - name: deliveryMode value: "2" # Optional. Default: "0". Values between 0 - 2. - name: requeueInFailure value: "true" # Optional. Default: "false". 
  2. 改造 StorageService.Api

    目的:把 StorageService 从 Grpc 客户端改造为 Grpc 服务端,并 Sub Storage.Reduce 主题,完成减库存操作。

    • 删除 Storage 中无用的代码 StorageController.cs

    • 修改 Program.cs 中的 CreateHostBuilder 代码为

      public static IHostBuilder CreateHostBuilder(string[] args) { return Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder.ConfigureKestrel(options => { options.Listen(IPAddress.Loopback, 5003, listenOptions => { listenOptions.Protocols = HttpProtocols.Http2; }); }); webBuilder.UseStartup<Startup>(); }); } 
    • 添加 DaprClientService

      public sealed class DaprClientService : DaprClient.DaprClientBase { public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context) { var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope(); topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce"); return Task.FromResult(topicSubscriptionsEnvelope); } } 

      Dapr 运行时将调用此方法获取 StorageServcie 关注的主题列表

    • 修改 Startup.cs

       /// <summary> /// This method gets called by the runtime. Use this method to add services to the container. /// </summary> /// <param name="services">Services.</param> public void ConfigureServices(IServiceCollection services) { services.AddGrpc(); services.AddDbContextPool<StorageContext>(options => { options.UseMySql(Configuration.GetConnectionString("MysqlConnection")); }); } 
      /// <summary> /// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. /// </summary> /// <param name="app">app.</param> /// <param name="env">env.</param> public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapSubscribeHandler(); endpoints.MapGrpcService<DaprClientService>(); }); } 
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除 redis_messagebus.yaml 文件

    • 启动 StorageService 服务

      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run 
  3. 使用 Java 开发一个 Order 服务端,Order 服务提供的功能为

    • 下单
    • 查看订单详情
    • 获取订单列表

    在当前上下文中着重处理的是下单功能,以及下单成功后 Java 服务端将发布一个事件到 Storage.Reduce 主题,即减少库存。

    • 创建 CreateOrder.proto 文件

      syntax = "proto3"; package daprexamples; option java_outer_classname = "CreateOrderProtos"; option java_package = "generate.protos"; service OrderService { rpc CreateOrder (CreateOrderRequest) returns (CreateOrderResponse); rpc RetrieveOrder(RetrieveOrderRequest) returns(RetrieveOrderResponse); rpc GetOrderList(GetOrderListRequest) returns(GetOrderListResponse); } message CreateOrderRequest { string ProductID = 1; //Product ID int32 Amount=2; //Product Amount string CustomerID=3; //Customer ID } message CreateOrderResponse { bool Succeed = 1; //Create Order Result,true:success,false:fail } message RetrieveOrderRequest{ string OrderID=1; } message RetrieveOrderResponse{ Order Order=1; } message GetOrderListRequest{ string CustomerID=1; } message GetOrderListResponse{ repeated Order Orders=1; } message Order{ string ID=1; string ProductID=2; int32 Amount=3; string CustomerID=4; } 
    • 使用 protoc 生成 Java 代码

      protoc -I=C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples --java_out=C:\Users\JR\DaprDemos\java\examples\src\main\java C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples\CreateOrder.proto 
    • 引用 MyBatis 做为 Mapper 工具

    • 修改 HelloWorldService.java 文件,提取 GrpcHelloWorldDaprService.java 到单独的包中,在此文件中添加 createOrder()getOrderList()retrieveOrder() 三个函数的实现

    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除原有 redis_messagebus.yaml 文件

    • 启动 OrderService 服务

      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000" 
  4. 创建 Golang Grpc 客户端,该客户端需要完成创建订单 Grpc 调用,订单创建成功发布扣除库存事件

    • 引用 CreateOrder.proto 文件,并生成 CreateOrder.pb.go 文件

      如未安装 protoc-gen-gogo ,通过一下命令获取并安装

      go get github.com/gogo/protobuf/gogoproto 

      安装 protoc-gen-gogo

      go install github.com/gogo/protobuf/gogoproto 

      根据 proto 文件生成代码

      protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\CreateOrder.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\ 
    • 客户端代码,创建订单

      ... response, err := client.InvokeService(context.Background(), &pb.InvokeServiceEnvelope{ Id: "OrderService", Data: createOrderRequestData, Method: "createOrder", }) if err != nil { fmt.Println(err) return } ... 
    • 添加 DataToPublish.proto 文件,此文件作为事件发布数据结构

      syntax = "proto3"; package daprexamples; option java_outer_classname = "DataToPublishProtos"; option java_package = "generate.protos"; message StorageReduceData { string ProductID = 1; int32 Amount=2; } 
    • 生成 DataToPublish 代码

       protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\DataToPublish.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\ 
    • 修改 main.go 代码,根据 createOrder 结果判断是否要发布信息到消息队列

      ... createOrderResponse := &daprexamples.CreateOrderResponse{} if err := proto.Unmarshal(response.Data.Value, createOrderResponse); err != nil { fmt.Println(err) return } fmt.Println(createOrderResponse.Succeed) if !createOrderResponse.Succeed { //下单失败 return } storageReduceData := &daprexamples.StorageReduceData{ ProductID: createOrderRequest.ProductID, Amount: createOrderRequest.Amount, } storageReduceDataData, err := jsoniter.ConfigFastest.Marshal(storageReduceData) //ptypes.MarshalAny(storageReduceData) if err != nil { fmt.Println(err) return } _, err = client.PublishEvent(context.Background(), &pb.PublishEventEnvelope{ Topic: "Storage.Reduce", Data: &any.Any{Value: storageReduceDataData}, }) fmt.Println(storageReduceDataData) if err != nil { fmt.Println(err) } else { fmt.Println("Published message!") } ... 

      注意: 发送数据前,使用 jsoniter 转换数据为 json 字符串,原因是如果直接传输 Grpc 流,当前版本(0.3.x) Dapr runtime 打包数据时使用 Json 打包,解包使用 String ,导致数据不一致。

    • 复制 rabbimq.yaml 文件到 components 文件夹,删除原有 redis_messagebus.yaml 文件

    • 启动 golang Grpc 客户端

       dapr run --app-id client go run main.go 

      输出

      == APP == true == APP == Published message! 
  5. RabbitMQ

    • 在浏览器中输入 http://localhost:15672/ ,账号和密码均为 guest

    • 查看 Connections ,有3个连接

      • 这个3个连接来自配置了 messagebus.yaml 组件的三个服务
    • 查看 Exchanges

      Name Type Features Message rate in Message rate out (AMQP default) direct D Storage.Reduce fanout D amq.direct direct D amq.fanout fanout D ... 

      着重看 Storage.Reduce ,可以看出 Dapr 运行时创建了一个 fanout 类型的 Exchange ,这表明该 Exhange 中的数据是广播的。

    • 查看 Queues

      Dapr 运行时创建了 storageService-Storage.Reduce ,该 Queue 绑定了 Storage.Reduce Exchange ,所以可以收到 Storage.Reduce 的广播数据。

  6. DotNet Core StorageService.Api 改造以完成 Sub 事件

    • 打开 DaprClientService.cs 文件,更改内容为

      public sealed class DaprClientService : DaprClient.DaprClientBase { private readonly StorageContext _storageContext; public DaprClientService(StorageContext storageContext) { _storageContext = storageContext; } public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context) { var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope(); topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce"); return Task.FromResult(topicSubscriptionsEnvelope); } public override async Task<Empty> OnTopicEvent(CloudEventEnvelope request, ServerCallContext context) { if (request.Topic.Equals("Storage.Reduce")) { StorageReduceData storageReduceData = StorageReduceData.Parser.ParseJson(request.Data.Value.ToStringUtf8()); Console.WriteLine("ProductID:" + storageReduceData.ProductID); Console.WriteLine("Amount:" + storageReduceData.Amount); await HandlerStorageReduce(storageReduceData); } return new Empty(); } private async Task HandlerStorageReduce(StorageReduceData storageReduceData) { Guid productID = Guid.Parse(storageReduceData.ProductID); Storage storageFromDb = await _storageContext.Storage.FirstOrDefaultAsync(q => q.ProductID.Equals(productID)); if (storageFromDb == null) { return; } if (storageFromDb.Amount < storageReduceData.Amount) { return; } storageFromDb.Amount -= storageReduceData.Amount; Console.WriteLine(storageFromDb.Amount); await _storageContext.SaveChangesAsync(); } 
    • 说明

      • 添加 GetTopicSubscriptions() 将完成对主题的关注
        • 当应用停止时,RabbitMQ 中的 Queue 自动删除
        • 添加 OnTopicEvent() 重写,此方法将完成对 Sub 主题的事件处理
      • HandlerStorageReduce 用于减少库存
  7. 启动 DotNet Core StorageService.Api Grpc 服务,启动 Java OrderService Grpc 服务,启动 Go Grpc 客户端

    • DotNet Core

      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run 
    • Java

      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000" 
    • go

      dapr run --app-id client go run main.go 

      go grpc 输出为

      == APP == true == APP == Published message! 

    查看 MySql Storage 数据库,对应产品库存减少 20

至此,通过 Dapr runtime 完成了 Go 和 Java 之间的 Grpc 调用,并通过 RabbitMQ 组件完成了 Pub/Sub

源码地址

原文链接:https://blog.51cto.com/u_12857552/2764973
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章