您现在的位置是:首页 > 文章详情

webflux提供响应式API,玩出不一样的花样

日期:2019-06-06点击:289

先说说什么是响应式

        响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的编程范式,直白的说就是:将变化的值通过数据流进行传播。

WebFlux又是什么呢

        WebFlux 模块的名称是 spring-webflux,名称中的 Flux 来源于 Reactor 中的类 Flux。Spring webflux 有一个全新的非堵塞的函数式 Reactive Web 框架,可以用来构建异步的、非堵塞的、事件驱动的服务,在伸缩性方面表现非常好。

        spring-webflux 模块。该模块包含对响应式 HTTP 和 WebSocket 客户端的支持,以及对 REST,HTML 和 WebSocket 交互等程序的支持。一般来说,Spring MVC 用于同步处理,Spring Webflux 用于异步处理。

        Spring Boot Webflux 有两种编程模型实现,一种类似 Spring MVC 注解方式,另一种是基于 Reactor 的响应式方式。

实践走起

    我在网找了下发现现在支持的DAL包有:
    spring-boot-starter-data-redis-reactive、spring-boot-starter-data-mongodb-reactive
    也许还有别的,我本意是想要spring-boot-starter-data-mysql-reactive,然而并木有。那就说下上面2个包的实践把。

spring-boot-starter-data-redis-reactive

用到的包

 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>com.github.flying-cattle</groupId> <artifactId>mybatis-dsc-generator</artifactId> <version>${mybatis-dsc-generator.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency>

YMl配置

server: port: 8080 spring: application: name: webFlux-test redis: host: 127.0.0.1 port: 6379 password: pwd2020 timeout: 5000 lettuce: pool: max-active: 200 max-idle: 20 min-idle: 5 max-wait: 1000 

整合redis-reactive

        虽然包是starter,但是还是要有自己的配置才能用不然报错如下:

Description: Field redisTemplate in com.flying.cattle.wf.service.impl.RedisServiceImpl required a bean of type 'org.springframework.data.redis.core.ReactiveRedisTemplate' that could not be found. The injection point has the following annotations: - @org.springframework.beans.factory.annotation.Autowired(required=true) Action: Consider defining a bean of type 'org.springframework.data.redis.core.ReactiveRedisTemplate' in your configuration.

看了下官方文档需要加上如下:

@Bean public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) { ReactiveRedisTemplate<String, String> reactiveRedisTemplate = new ReactiveRedisTemplate<>(factory,RedisSerializationContext.string()); return reactiveRedisTemplate; }

发现了么是ReactiveRedisTemplate<String, String> 感觉就不很友好了,本来我是想声明成ReactiveRedisTemplate<String, Serializable>,搞古了一会儿木有搞定。有那个大佬有好的方案,望指点哈

Service代码:

@Service public class RedisServiceImpl implements RedisService { @Autowired private ReactiveRedisTemplate<String, String> redisTemplate; @Override public Mono<String> getById(String key) { // TODO Auto-generated method stub ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue(); return operations.get(key); } @Override public Mono<String> addUser(String key,User user) { // TODO Auto-generated method stub ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue(); return operations.getAndSet(key, JSON.toJSONString(user)); } @Override public Mono<Boolean> deleteById(String key) { // TODO Auto-generated method stub ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue(); return operations.delete(key); } @Override public Mono<String> updateById(String key,User user) { // TODO Auto-generated method stub ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue(); return operations.getAndSet(key, JSON.toJSONString(user)); } @Override public Flux<String> findAll(String key) { // TODO Auto-generated method stub ReactiveListOperations<String, String> operations = redisTemplate.opsForList(); return operations.range(key, 0, -1); } @Override public Mono<Long> addlist(String key,List<String> list) { // TODO Auto-generated method stub ReactiveListOperations<String, String> operations = redisTemplate.opsForList(); return operations.leftPushAll(key, list); } @Override public Flux<String> findUsers(String key) { ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue(); return redisTemplate.keys(key).flatMap(keyId ->operations.get(keyId)); } }

Controller代码

@RestController @RequestMapping("/user") public class UserController { public final static String USER_KEY="user"; @Autowired private RedisService redisService; @Autowired private RedisGenerateId redisGenerateId; @GetMapping("/getId") public Long getUserId(){ return redisGenerateId.generate(USER_KEY); } public String getKey(Long id) { return USER_KEY+"_"+id; } @GetMapping("/getById/{id}") public Mono<String> getUserById(@PathVariable("id")Long id){ return redisService.getById(getKey(id)); } @GetMapping("/add") public Mono<String> add(User user){ user = new User(); user.setAccount("admin1"); user.setPassword("123123"); user.setNickname("admin"); user.setEmail("505237@qq.com"); user.setPhone("13666275002"); user.setSex(true); String bd="1990-01-01"; DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd"); try { user.setBirthday(fmt.parse(bd)); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } user.setProvince("四川省"); user.setCity("成都市"); user.setCounty("高新区"); user.setAddress("天 府大道XXd段XX号"); user.setState("1"); // 以上是模拟数据 ValidationResult vr=ValidationUtils.validateEntity(user); if (!vr.isHasErrors()) { user.setId(getUserId()); System.out.println(JSON.toJSONString(user)); return redisService.addUser(getKey(user.getId()),user); }else { return Mono.just(vr.getFirstErrors()); } } @GetMapping("/addlist") public Mono<Long> addlist(){ List<String> list=new ArrayList<String>(); User user = new User(); user.setAccount("admin1"); user.setPassword("123123"); user.setNickname("admin"); user.setEmail("505237@qq.com"); user.setPhone("13666275002"); user.setSex(true); user.setBirthday(new Date()); user.setProvince("四川省"); user.setCity("成都市"); user.setCounty("高新区"); user.setAddress("天 府大道XXd段XX号"); user.setState("1"); //添加第一条数据 Long id=redisGenerateId.generate("user"); user.setId(id); list.add(JSON.toJSONString(user)); //添加第二条数据 id=redisGenerateId.generate("user"); user.setId(id); list.add(JSON.toJSONString(user)); //添加第三条数据 id=redisGenerateId.generate("user"); user.setId(id); list.add(JSON.toJSONString(user)); return redisService.addlist("list", list); } /** * 这个就是流响应式的接口了,是一个一个的返回数据的,异步返回 * delayElements(Duration.ofSeconds(2))这个是不要的,只是方便看效果 * redis 直接就是一个一个返回,不需要produces,不知道为什么...还木有深究。 */ @GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<String> findAll(){ return redisService.findAll("list").delayElements(Duration.ofSeconds(2)); } @GetMapping("/getUsers") public Flux<String> findUsers() { // TODO Auto-generated method stub return redisService.findUsers(USER_KEY+"_"+"*").delayElements(Duration.ofSeconds(2)); } }

一个是差list数据类型,一个是匹配key查询的,都是一个一个返回的,实际开发中去掉.delayElements(Duration.ofSeconds(2))就好

整合mongodb-reactive

需要的包,只需要在redis的基础上下面的jar

 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency> 

MongoDB就很人性化了,感觉就很友好。而且是真的starter包,配置好数据库连接,就不需要其他配置了,直接可用

DAO

import org.springframework.data.mongodb.repository.ReactiveMongoRepository; import com.flying.cattle.wf.entity.User; public interface UserRepository extends ReactiveMongoRepository<User, Long>{ }

SERVICE(接口层我就不贴代码了)

@Service public class MongoServiceImpl implements MongoService { @Autowired private UserRepository userRepository; @Override public Mono<User> getById(Long id) { // TODO Auto-generated method stub return userRepository.findById(id); } @Override public Mono<User> addUser(User user) { // TODO Auto-generated method stub return userRepository.save(user); } @Override public Mono<Boolean> deleteById(Long id) { // TODO Auto-generated method stub userRepository.deleteById(id); return Mono.create(userMonoSink -> userMonoSink.success()); } @Override public Mono<User> updateById(User user) { // TODO Auto-generated method stub return userRepository.save(user); } @Override public Flux<User> findAllUser() { // TODO Auto-generated method stub return userRepository.findAll(); } }

CONTROLLER

@RestController @RequestMapping("/usermg") public class UserMongoController { public final static String USER_KEY="user"; @Autowired private RedisGenerateId redisGenerateId; @Autowired private MongoService mongoService; @GetMapping("/getId") public Long getUserId(){ return redisGenerateId.generate(USER_KEY); } @GetMapping("/add") public Mono<User> add(User user) { user = new User(); user.setAccount("admin1"); user.setPassword("123123"); user.setNickname("admin"); user.setEmail("505237@qq.com"); user.setPhone("13666275002"); user.setSex(true); String bd = "1990-01-01"; DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd"); try { user.setBirthday(fmt.parse(bd)); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } user.setProvince("四川省"); user.setCity("成都市"); user.setCounty("高新区"); user.setAddress("天 府大道XXd段XX号"); user.setState("1"); // 以上是模拟数据 ValidationResult vr = ValidationUtils.validateEntity(user); if (!vr.isHasErrors()) { user.setId(getUserId()); System.out.println(JSON.toJSONString(user)); return mongoService.addUser(user); } else { System.err.println(vr.getFirstErrors()); } return null; } /** * 注意这里produces = MediaType.APPLICATION_STREAM_JSON_VALUE * 如果不是application/stream+json则调用端无法滚动得到结果,将一直阻塞等待数据流结束或超时。 */ @GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<User> findAll(){ return mongoService.findAllUser().delayElements(Duration.ofSeconds(1)); } }

代码就这些,大家要体验这个框架,建议还是用MongoDB把,毕竟redis主要是做缓存的。

给大家看下数据结构图

 

 

源码地址:https://gitee.com/flying-cattle/infrastructure/tree/master/webFluxTest

原文链接:https://my.oschina.net/bianxin/blog/3059122
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章