WebFlux定点推送、全推送灵活websocket运用
前言
WebFlux 本身提供了对 WebSocket 协议的支持,处理 WebSocket 请求需要对应的 handler 实现 WebSocketHandler 接口,每一个 WebSocket 都有一个关联的 WebSocketSession,包含了建立请求时的握手信息 HandshakeInfo
,以及其它相关的信息。可以通过 session 的 receive()
方法来接收客户端的数据,通过 session 的 send()
方法向客户端发送数据。
示例
下面是一个简单的 WebSocketHandler 示例:
@Component public class EchoHandler implements WebSocketHandler { public Mono<Void> handle(WebSocketSession session) { return session.send( session.receive().map( msg -> session.textMessage("ECHO -> " + msg.getPayloadAsText()))); } }
有了 handler 之后,还需要让 WebFlux 知道哪些请求需要交给这个 handler 进行处理,因此要创建相应的 HandlerMapping。
在处理 HTTP 请求时,我们经常使用 WebFlux 中最简单的 handler 定义方式,即通过注解 @RequestMapping
将某个方法定义为处理特定路径请求的 handler。 但是这个注解是用于处理 HTTP 请求的,对于 WebSocket 请求而言,收到请求后还需要协议升级的过程,之后才是 handler 的执行,所以我们不能直接通过该注解定义请求映射,不过可以使用 SimpleUrlHandlerMapping 来添加映射。
@Configuration public class WebSocketConfiguration { @Bean public HandlerMapping webSocketMapping(EchoHandler echoHandler) { final Map<String, WebSocketHandler> map = new HashMap<>(1); map.put("/echo", echoHandler); final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); mapping.setOrder(Ordered.HIGHEST_PRECEDENCE); mapping.setUrlMap(map); return mapping; } @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }
这样就能够将发往 /echo
的 WebSocket 请求交给 EchoHandler 处理。
我们还要为 WebSocket 类型的 handler 创建对应的 WebSocketHandlerAdapter,以便让 DispatcherHandler 能够调用我们的 WebSocketHandler。
完成这三个步骤后,当一个 WebSocket 请求到达 WebFlux 时,首先由 DispatcherHandler 进行处理,它会根据已有的 HandlerMapping 找到这个 WebSocket 请求对应的 handler,接着发现该 handler 实现了 WebSocketHandler 接口,于是会通过 WebSocketHandlerAdapter 来完成该 handler 的调用。
疑惑
从上面的例子不难看出,没接收一个请求后,就得在里面里面返回消息,后面就不能再给他发消息了。其次是我每次新添加或者删除一个消息的处理类Handler,就得每次去修改配置文件中的SimpleUrlHandlerMapping的UrlMap的内容,感觉不是很友好。于是针对这2点进行修改和调整如下:
1. 用自定义注解注册 Handler
我们能否像注册 HTTP 请求的 Handler 那样,也通过类似 RequestMapping 的注解来注册 Handler 呢?
虽然官方没有相关实现,但我们可以自己实现一个类似的注解,不妨叫作 WebSocketMapping
:
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface WebSocketMapping { String value() default ""; }
@Retention(RetentionPolicy.RUNTIME)
表明该注解工作在运行期间,@Target(ElementType.TYPE)
表明该注解作用在类上。
我们先看下该注解最终的使用方式。下面是一个 TimeHandler 的示例,它会每秒钟会向客户端发送一次时间。我们通过注解 @WebSocketMapping("/time")
完成了 TimeHandler 的注册,告诉 WebFlux 当有 WebSocket 请求发往 /echo
路径时,就交给 EchoHandler 处理:
@Component @WebSocketMapping("/echo") public class EchoHandler implements WebSocketHandler { @Override public Mono<Void> handle(final WebSocketSession session) { return session.send( session.receive() .map(msg -> session.textMessage( "服务端返回:小明, -> " + msg.getPayloadAsText()))); } }
是不是和 RequestMapping 一样方便?
到目前为止,这个注解还没有实际的功能,还不能自动注册 handler。回顾我们上面注册路由的方式,我们创建了一个 SimpleUrlHandlerMapping,并手动添加了 EchoHandler 的映射规则,然后将其作为 HandlerMapping 的 Bean 返回。
现在我们要创建一个专门的 HandlerMapping 类来处理 WebSocketMapping 注解,自动完成 handler 的注册:
public class WebSocketMappingHandlerMapping extends SimpleUrlHandlerMapping{ private Map<String, WebSocketHandler> handlerMap = new LinkedHashMap<>(); /** * Register WebSocket handlers annotated by @WebSocketMapping * @throws BeansException */ @Override public void initApplicationContext() throws BeansException { Map<String, Object> beanMap = obtainApplicationContext() .getBeansWithAnnotation(WebSocketMapping.class); beanMap.values().forEach(bean -> { if (!(bean instanceof WebSocketHandler)) { throw new RuntimeException( String.format("Controller [%s] doesn't implement WebSocketHandler interface.", bean.getClass().getName())); } WebSocketMapping annotation = AnnotationUtils.getAnnotation( bean.getClass(), WebSocketMapping.class); //webSocketMapping 映射到管理中 handlerMap.put(Objects.requireNonNull(annotation).value(),(WebSocketHandler) bean); }); super.setOrder(Ordered.HIGHEST_PRECEDENCE); super.setUrlMap(handlerMap); super.initApplicationContext(); } }
我们的 WebSocketMappingHandlerMapping 类,实际上就是 SimpleUrlHandlerMapping,只不过增加了一些初始化的操作。
initApplicationContext()
方法是 Spring 中 ApplicationObjectSupport 类的方法,用于自定义类的初始化行为,在我们的 WebSocketMappingHandlerMapping 中,初始化工作主要是收集使用了 @WebSocketMapping
注解并且实现来 WebSocketHandler
接口的 Component,然后将它们注册到内部的 SimpleUrlHandlerMapping 中。之后的路由工作都是由父类 SimpleUrlHandlerMapping 已实现的功能来完成。
现在,我们只需要返回 WebSocketMappingHandlerMapping 的 Bean,就能自动处理 @WebSocketMapping
注解了:
@Configuration public class WebSocketConfiguration { @Bean public HandlerMapping webSocketMapping() { return new WebSocketMappingHandlerMapping(); } @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }
2. WebSocket 请求处理过程剖析
我们来看下基于 Reactor Netty 的 WebFlux 具体是如何处理 WebSocket 请求的。
前面说过,WebSocket 请求进入 WebFlux 后,首先会从 HandlerMapping 中找到对应的 WebSocketHandler,再由 WebSocketHandlerAdapter 进行实际的调用。这就不再多做阐述,有兴趣的朋友可以去看看WebSocketHandler,WebSocketHandlerAdapter。
3. 分离数据的接收与发送操作
我们知道 HTTP 协议是半双工通信,虽然客户端和服务器都能给对方发数据,但是同一时间内只会由一方向另一方发送数据,并且在顺序上是客户端先发送请求,然后才由服务器返回响应数据。所以服务器处理 HTTP 的逻辑很简单,就是每接收到一个客户端请求,就返回一个响应。
而 WebSocket 是全双工通信,客户端和服务器可以随时向另一方发送数据,所以不再是"发送请求、返回响应"的通信方式了。我们上面的 EchoHandler 示例用的仍旧是这一方式,即收到数据后再针对性地返回一条数据,我们下面就来看看如何充分利用 WebSocket 的双向通信。
WebSocket 的处理,主要是通过 session 完成对两个数据流的操作,一个是客户端发给服务器的数据流,一个是服务器发给客户端的数据流:
WebSocketSession 方法 | 描述 |
---|---|
Flux<WebSocketMessage> receive() | 接收来自客户端的数据流,当连接关闭时数据流结束。 |
Mono<Void> send(Publisher<WebSocketMessage>) | 向客户端发送数据流,当数据流结束时,往客户端的写操作也会随之结束,此时返回的 Mono<Void> 会发出一个完成信号。 |
在 WebSocketHandler 中,最后应该将两个数据流的处理结果整合成一个信号流,并返回一个 Mono<Void>
用于表明处理是否结束。
我们分别为两个流定义处理的逻辑:
- 对于输出流:服务器每秒向客户端发送一个数字;
- 对于输入流:每当收到客户端消息时,就打印到标准输出
Mono<Void> input = session.receive() .map(WebSocketMessage::getPayloadAsText) .map(msg -> id + ": " + msg) .doOnNext(System.out::println).then(); Mono<Void> output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink))));
这两个处理逻辑互相独立,它们之间没有先后关系,操作执行完之后都是返回一个 Mono<Void>
,但是如何将这两个操作的结果整合成一个信号流返回给 WebFlux 呢?我们可以使用 WebFlux 中的 Mono.zip()
方法:
@Component @WebSocketMapping("/echo") public class EchoHandler implements WebSocketHandler { @Autowired private ConcurrentHashMap<String, WebSocketSender> senderMap; @Override public Mono<Void> handle(WebSocketSession session) { Mono<Void> input = session.receive() .map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg) .doOnNext(System.out::println).then(); Mono<Void> output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink)))); /** * Mono.zip() 会将多个 Mono 合并为一个新的 Mono, * 任何一个 Mono 产生 error 或 complete 都会导致合并后的 Mono * 也随之产生 error 或 complete,此时其它的 Mono 则会被执行取消操作。 */ return Mono.zip(input, output).then(); } }
4. 从 Handler 外部发送数据
这里所说的从外部发送数据,指的是需要在 WebSocketHandler 的代码范围之外,在其它地方通过代码调用的方式向 WebSocket 连接发送数据。
思路:在定义 session 的 send()
操作时,通过编程的方式创建 Flux,即使用 Flux.create()
方法创建,将发布 Flux 数据的 FluxSink
暴露出来,并进行保存,然后在需要发送数据的地方,调用 FluxSink<T>
的 next(T data)
方法,向 Flux 的订阅者发布数据。
create 方法是以编程方式创建 Flux 的高级形式,它允许每次产生多个数据,并且可以由多个线程产生。
create 方法将内部的 FluxSink 暴露出来,FluxSink 提供了 next、error、complete 方法。通过 create 方法,可以将响应式堆栈中的 API 与其它 API 进行连接。
考虑这么一个场景:服务器与客户端 A 建立 WebSocket 连接后,允许客户端 B 通过 HTTP 向客户端 A 发送数据。
不考虑安全性、鲁棒性等问题,我们给出一个简单的示例。
首先是 WebSocketHandler 的实现,客户端发送 WebSocket 建立请求时,需要在 query 参数中为当前连接指定一个 id,服务器会以该 id 为键,以对应的 WebSocketSender 为值存放到 senderMap 中:
@Component @WebSocketMapping("/echo") public class EchoHandler implements WebSocketHandler { @Autowired private ConcurrentHashMap<String, WebSocketSender> senderMap; @Override public Mono<Void> handle(WebSocketSession session) { // TODO Auto-generated method stub HandshakeInfo handshakeInfo = session.getHandshakeInfo(); Map<String, String> queryMap = getQueryMap(handshakeInfo.getUri().getQuery()); String id = queryMap.getOrDefault("id", "defaultId"); Mono<Void> input = session.receive().map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg) .doOnNext(System.out::println).then(); Mono<Void> output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink)))); /** * Mono.zip() 会将多个 Mono 合并为一个新的 Mono,任何一个 Mono 产生 error 或 complete 都会导致合并后的 Mono * 也随之产生 error 或 complete,此时其它的 Mono 则会被执行取消操作。 */ return Mono.zip(input, output).then(); } //用于获取url参数 private Map<String, String> getQueryMap(String queryStr) { Map<String, String> queryMap = new HashMap<>(); if (!StringUtils.isEmpty(queryStr)) { String[] queryParam = queryStr.split("&"); Arrays.stream(queryParam).forEach(s -> { String[] kv = s.split("=", 2); String value = kv.length == 2 ? kv[1] : ""; queryMap.put(kv[0], value); }); } return queryMap; } }
其中,senderMap
是我们自己定义的 Bean,在配置文件中定义:
@Configuration public class WebSocketConfiguration { @Bean public HandlerMapping webSocketMapping() { return new WebSocketMappingHandlerMapping(); } @Bean public ConcurrentHashMap<String, WebSocketSender> senderMap() { return new ConcurrentHashMap<String, WebSocketSender>(); } @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }
WebSocketSender
是我们自己创建的类,目的是保存 WebSocket 连接的 session 以及对应的 FluxSink,以便在 WebSocketHandler 代码范围外发送数据:
public class WebSocketSender { private WebSocketSession session; private FluxSink<WebSocketMessage> sink; public WebSocketSender(WebSocketSession session, FluxSink<WebSocketMessage> sink) { this.session = session; this.sink = sink; } public void sendData(String data) { sink.next(session.textMessage(data)); } }
接着我们来实现 HTTP Controller,用户在发起 HTTP 请求时,通过 query 参数指定要通信的 WebSocket 连接 id,以及要发送的数据,然后从 senderMap 中取出对应的 WebSocketSender,调用其 send()
方法向客户端发送数据:
@RestController @RequestMapping("/msg") public class MsgController { @Autowired private ConcurrentHashMap<String, WebSocketSender> senderMap; @RequestMapping("/send") public String sendMessage(@RequestParam String id, @RequestParam String data) { WebSocketSender sender = senderMap.get(id); if (sender != null) { sender.sendData(data); return String.format("Message '%s' sent to connection: %s.", data, id); } else { return String.format("Connection of id '%s' doesn't exist", id); } } }
5. 测试
我这就不再写页面了,直接就用https://www.websocket.org/echo.html进行测试了,结果如下:
这样就算完成了定点推送了,全推送,和部分推送就不再写了,只要从ConcurrentHashMap中取出来去发送就是了。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
如何低成本实现Flutter富文本,看这一篇就够了!
作者:闲鱼技术-玄川 背景 闲鱼是国内最早使用Flutter 的团队,作为一个电商App商品详情页是非常重要场景,其中最主要的技术能力是文字混排。 我们面对文本类的需求是复杂而且多变,然而Flutter历史的几个版本,Text只能显示简单样式文本,它只有包含一些控制文本样式显示的属性,而通过TextSpan连接实现的RichText也只能显示多种文本样式(例如:一个基础文本片段和一个链接片段),这些远远达不到设计需要的能力。被产品和设计怂为啥别人别的平台能做,Flutter为何做不了,不管,必须支持。 因此,需要开发一个能力更强的文字混排组件就变得迫在眉睫。 富文本的原理 再讲文字混批组件设计实现前,先来讲讲系统RichText的富文本的原理。 创建过程 创建RichText节点的时候其实会创建以下几个对象: 先创建LeafRenderObjectElement实例。 ComponentElement方法当中会调用RichText实例的CreateRenderObject方法,生成RenderParagraph 实例。 RenderParagraph 会创建TextPainter 负责...
- 下一篇
Java8让Excel的读写变得更加简单高效
快速使用 GitHUb地址【https://github.com/liuhuagui/gridexcel】 Apache POI 在业务开发中我们经常会遇到Excel的导入导出,而 Apache POI 是Java开发者常用的API。 【https://poi.apache.org/components/spreadsheet/index.html】 GridExcel Universal solution for reading and writing simply Excel based on functional programming and POI EventModel GridExcel是基于Java8函数式编程和POI EventModel实现的用于Excel简单读写的通用解决方案。 基于POI EventModel,在读写数据量非常大的Excel时,降低内存占用避免OOM与频繁FullGC 基于函数编程,支持关联对象等多种复杂情况的处理,学习成本低 支持流式API,使代码编写和理解更简单,更直观 EventModel 什么是EventModel?在POI FAQ(常见问...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Hadoop3单机部署,实现最简伪集群
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果