webflux提供响应式API,玩出不一样的花样
先说说什么是响应式
响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的编程范式,直白的说就是:将变化的值通过数据流进行传播。
WebFlux又是什么呢
WebFlux 模块的名称是 spring-webflux,名称中的 Flux 来源于 Reactor 中的类 Flux。Spring webflux 有一个全新的非堵塞的函数式 Reactive Web 框架,可以用来构建异步的、非堵塞的、事件驱动的服务,在伸缩性方面表现非常好。
spring-webflux 模块。该模块包含对响应式 HTTP 和 WebSocket 客户端的支持,以及对 REST,HTML 和 WebSocket 交互等程序的支持。一般来说,Spring MVC 用于同步处理,Spring Webflux 用于异步处理。
Spring Boot Webflux 有两种编程模型实现,一种类似 Spring MVC 注解方式,另一种是基于 Reactor 的响应式方式。
实践走起
我在网找了下发现现在支持的DAL包有:
spring-boot-starter-data-redis-reactive、spring-boot-starter-data-mongodb-reactive
也许还有别的,我本意是想要spring-boot-starter-data-mysql-reactive,然而并木有。那就说下上面2个包的实践把。
spring-boot-starter-data-redis-reactive
用到的包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.flying-cattle</groupId>
<artifactId>mybatis-dsc-generator</artifactId>
<version>${mybatis-dsc-generator.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
YMl配置
server:
port: 8080
spring:
application:
name: webFlux-test
redis:
host: 127.0.0.1
port: 6379
password: pwd2020
timeout: 5000
lettuce:
pool:
max-active: 200
max-idle: 20
min-idle: 5
max-wait: 1000
整合redis-reactive
虽然包是starter,但是还是要有自己的配置才能用不然报错如下:
Description:
Field redisTemplate in com.flying.cattle.wf.service.impl.RedisServiceImpl required a bean of type 'org.springframework.data.redis.core.ReactiveRedisTemplate' that could not be found.
The injection point has the following annotations:
- @org.springframework.beans.factory.annotation.Autowired(required=true)
Action:
Consider defining a bean of type 'org.springframework.data.redis.core.ReactiveRedisTemplate' in your configuration.
看了下官方文档需要加上如下:
@Bean
public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
ReactiveRedisTemplate<String, String> reactiveRedisTemplate = new ReactiveRedisTemplate<>(factory,RedisSerializationContext.string());
return reactiveRedisTemplate;
}
发现了么是ReactiveRedisTemplate<String, String> 感觉就不很友好了,本来我是想声明成ReactiveRedisTemplate<String, Serializable>,搞古了一会儿木有搞定。有那个大佬有好的方案,望指点哈
Service代码:
@Service
public class RedisServiceImpl implements RedisService {
@Autowired
private ReactiveRedisTemplate<String, String> redisTemplate;
@Override
public Mono<String> getById(String key) {
// TODO Auto-generated method stub
ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
return operations.get(key);
}
@Override
public Mono<String> addUser(String key,User user) {
// TODO Auto-generated method stub
ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
return operations.getAndSet(key, JSON.toJSONString(user));
}
@Override
public Mono<Boolean> deleteById(String key) {
// TODO Auto-generated method stub
ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
return operations.delete(key);
}
@Override
public Mono<String> updateById(String key,User user) {
// TODO Auto-generated method stub
ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
return operations.getAndSet(key, JSON.toJSONString(user));
}
@Override
public Flux<String> findAll(String key) {
// TODO Auto-generated method stub
ReactiveListOperations<String, String> operations = redisTemplate.opsForList();
return operations.range(key, 0, -1);
}
@Override
public Mono<Long> addlist(String key,List<String> list) {
// TODO Auto-generated method stub
ReactiveListOperations<String, String> operations = redisTemplate.opsForList();
return operations.leftPushAll(key, list);
}
@Override
public Flux<String> findUsers(String key) {
ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
return redisTemplate.keys(key).flatMap(keyId ->operations.get(keyId));
}
}
Controller代码
@RestController
@RequestMapping("/user")
public class UserController {
public final static String USER_KEY="user";
@Autowired
private RedisService redisService;
@Autowired
private RedisGenerateId redisGenerateId;
@GetMapping("/getId")
public Long getUserId(){
return redisGenerateId.generate(USER_KEY);
}
public String getKey(Long id) {
return USER_KEY+"_"+id;
}
@GetMapping("/getById/{id}")
public Mono<String> getUserById(@PathVariable("id")Long id){
return redisService.getById(getKey(id));
}
@GetMapping("/add")
public Mono<String> add(User user){
user = new User();
user.setAccount("admin1");
user.setPassword("123123");
user.setNickname("admin");
user.setEmail("505237@qq.com");
user.setPhone("13666275002");
user.setSex(true);
String bd="1990-01-01";
DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
try {
user.setBirthday(fmt.parse(bd));
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
user.setProvince("四川省");
user.setCity("成都市");
user.setCounty("高新区");
user.setAddress("天 府大道XXd段XX号");
user.setState("1");
// 以上是模拟数据
ValidationResult vr=ValidationUtils.validateEntity(user);
if (!vr.isHasErrors()) {
user.setId(getUserId());
System.out.println(JSON.toJSONString(user));
return redisService.addUser(getKey(user.getId()),user);
}else {
return Mono.just(vr.getFirstErrors());
}
}
@GetMapping("/addlist")
public Mono<Long> addlist(){
List<String> list=new ArrayList<String>();
User user = new User();
user.setAccount("admin1");
user.setPassword("123123");
user.setNickname("admin");
user.setEmail("505237@qq.com");
user.setPhone("13666275002");
user.setSex(true);
user.setBirthday(new Date());
user.setProvince("四川省");
user.setCity("成都市");
user.setCounty("高新区");
user.setAddress("天 府大道XXd段XX号");
user.setState("1");
//添加第一条数据
Long id=redisGenerateId.generate("user");
user.setId(id);
list.add(JSON.toJSONString(user));
//添加第二条数据
id=redisGenerateId.generate("user");
user.setId(id);
list.add(JSON.toJSONString(user));
//添加第三条数据
id=redisGenerateId.generate("user");
user.setId(id);
list.add(JSON.toJSONString(user));
return redisService.addlist("list", list);
}
/**
* 这个就是流响应式的接口了,是一个一个的返回数据的,异步返回
* delayElements(Duration.ofSeconds(2))这个是不要的,只是方便看效果
* redis 直接就是一个一个返回,不需要produces,不知道为什么...还木有深究。
*/
@GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> findAll(){
return redisService.findAll("list").delayElements(Duration.ofSeconds(2));
}
@GetMapping("/getUsers")
public Flux<String> findUsers() {
// TODO Auto-generated method stub
return redisService.findUsers(USER_KEY+"_"+"*").delayElements(Duration.ofSeconds(2));
}
}
一个是差list数据类型,一个是匹配key查询的,都是一个一个返回的,实际开发中去掉.delayElements(Duration.ofSeconds(2))就好
整合mongodb-reactive
需要的包,只需要在redis的基础上下面的jar
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
MongoDB就很人性化了,感觉就很友好。而且是真的starter包,配置好数据库连接,就不需要其他配置了,直接可用
DAO
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import com.flying.cattle.wf.entity.User;
public interface UserRepository extends ReactiveMongoRepository<User, Long>{
}
SERVICE(接口层我就不贴代码了)
@Service
public class MongoServiceImpl implements MongoService {
@Autowired
private UserRepository userRepository;
@Override
public Mono<User> getById(Long id) {
// TODO Auto-generated method stub
return userRepository.findById(id);
}
@Override
public Mono<User> addUser(User user) {
// TODO Auto-generated method stub
return userRepository.save(user);
}
@Override
public Mono<Boolean> deleteById(Long id) {
// TODO Auto-generated method stub
userRepository.deleteById(id);
return Mono.create(userMonoSink -> userMonoSink.success());
}
@Override
public Mono<User> updateById(User user) {
// TODO Auto-generated method stub
return userRepository.save(user);
}
@Override
public Flux<User> findAllUser() {
// TODO Auto-generated method stub
return userRepository.findAll();
}
}
CONTROLLER
@RestController
@RequestMapping("/usermg")
public class UserMongoController {
public final static String USER_KEY="user";
@Autowired
private RedisGenerateId redisGenerateId;
@Autowired
private MongoService mongoService;
@GetMapping("/getId")
public Long getUserId(){
return redisGenerateId.generate(USER_KEY);
}
@GetMapping("/add")
public Mono<User> add(User user) {
user = new User();
user.setAccount("admin1");
user.setPassword("123123");
user.setNickname("admin");
user.setEmail("505237@qq.com");
user.setPhone("13666275002");
user.setSex(true);
String bd = "1990-01-01";
DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
try {
user.setBirthday(fmt.parse(bd));
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
user.setProvince("四川省");
user.setCity("成都市");
user.setCounty("高新区");
user.setAddress("天 府大道XXd段XX号");
user.setState("1");
// 以上是模拟数据
ValidationResult vr = ValidationUtils.validateEntity(user);
if (!vr.isHasErrors()) {
user.setId(getUserId());
System.out.println(JSON.toJSONString(user));
return mongoService.addUser(user);
} else {
System.err.println(vr.getFirstErrors());
}
return null;
}
/**
* 注意这里produces = MediaType.APPLICATION_STREAM_JSON_VALUE
* 如果不是application/stream+json则调用端无法滚动得到结果,将一直阻塞等待数据流结束或超时。
*/
@GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<User> findAll(){
return mongoService.findAllUser().delayElements(Duration.ofSeconds(1));
}
}
代码就这些,大家要体验这个框架,建议还是用MongoDB把,毕竟redis主要是做缓存的。
给大家看下数据结构图
源码地址:https://gitee.com/flying-cattle/infrastructure/tree/master/webFluxTest

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
线程池没你想的那么简单(续)
前言 前段时间写过一篇《线程池没你想的那么简单》,和大家一起撸了一个基本的线程池,具备: 线程池基本调度功能。 线程池自动扩容缩容。 队列缓存线程。 关闭线程池。 这些功能,最后也留下了三个待实现的 features 。 执行带有返回值的线程。 异常处理怎么办? 所有任务执行完怎么通知我? 这次就实现这三个特性来看看 j.u.c 中的线程池是如何实现这些需求的。 再看本文之前,强烈建议先查看上文《线程池没你想的那么简单》 任务完成后的通知 大家在用线程池的时候或多或少都会有这样的需求: 线程池中的任务执行完毕后再通知主线程做其他事情,比如一批任务都执行完毕后再执行下一波任务等等。 以我们之前的代码为例: 总共往线程池中提交了 13 个任务,直到他们都执行完毕后再打印 “任务执行完毕” 这个日志。 执行结果如下: 为了简单的达到这个效果,我们可以在初始化线程池的时候传入一个接口的实现,这个接口就是用于任务完成之后的回调。 public interface Notify { /** * 回调 */ void notifyListen() ; } 以上就是线程池的构造函数以及接口的定义。 所...
-
下一篇
RPC的负载均衡策略
抽空自己写了个简易版的rpc框架,想了下怎么搞负载均衡, 最简单的方式就是搞个配置文件放置服务地址,直接读配置文件,转而想到配置文件可以放zk,相当于用zk来做配置中心或者服务发现。 优秀的dubbo项目就可以这么做,马上参考了下谷歌的grpc,发现了一篇谷歌很棒的文章,拜读了下(也借用了谷歌这篇文章的图片),很不错,想写一些我自己的见解。 传送门: https://grpc.io/blog/loadbalancing/ rpc通信本身并不复杂,只要定好协议怎么处理问题不大,但是负载均衡的策略是值得推敲的。 一般情况下,负载均衡的策略有以下两种 1. 代理服务 客户端并不知道服务端的存在,它所有的请求都打到代理服务,由代理服务去分发到服务端,并且实现公平的负载算法。 客户机可能不可信,这种情况通过用户面向用户的服务,类似于我们的nginx将请求分发到后端机器。 缺点: 客户端不知道后端的存在,且客户端不可信,延迟会更高且代理服务会影响服务本身的吞吐量 优点: 在中间层做监控等拦截操作特别棒。 如图: 2. 客户端负载均衡 客户端知道有多个后端服务,由客户端去选择服务端,并且客户端可以从...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Hadoop3单机部署,实现最简伪集群
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS8编译安装MySQL8.0.19
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS7设置SWAP分区,小内存服务器的救世主