SpringBoot2 整合 Redis集群 ,实现消息队列场景
一、Redis集群简介
1、RedisCluster概念
Redis的分布式解决方案,在3.0版本后推出的方案,有效地解决了Redis分布式的需求,当一个服务宕机可以快速的切换到另外一个服务。redis cluster主要是针对海量数据+高并发+高可用的场景。
二、与SpringBoot2.0整合
1、核心依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${redis-client.version}</version>
</dependency>
2、核心配置
spring:
# Redis 集群
redis:
sentinel:
# sentinel 配置
master: mymaster
nodes: 192.168.0.127:26379
maxTotal: 60
minIdle: 10
maxWaitMillis: 10000
testWhileIdle: true
testOnBorrow: true
testOnReturn: false
timeBetweenEvictionRunsMillis: 10000
3、参数渲染类
@ConfigurationProperties(prefix = "spring.redis.sentinel")
public class RedisParam {
private String nodes ;
private String master ;
private Integer maxTotal ;
private Integer minIdle ;
private Integer maxWaitMillis ;
private Integer timeBetweenEvictionRunsMillis ;
private boolean testWhileIdle ;
private boolean testOnBorrow ;
private boolean testOnReturn ;
// 省略GET和SET方法
}
4、集群配置文件
@Configuration
@EnableConfigurationProperties(RedisParam.class)
public class RedisPool {
@Resource
private RedisParam redisParam ;
@Bean("jedisSentinelPool")
public JedisSentinelPool getRedisPool (){
Set<String> sentinels = new HashSet<>();
sentinels.addAll(Arrays.asList(redisParam.getNodes().split(",")));
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(redisParam.getMaxTotal());
poolConfig.setMinIdle(redisParam.getMinIdle());
poolConfig.setMaxWaitMillis(redisParam.getMaxWaitMillis());
poolConfig.setTestWhileIdle(redisParam.isTestWhileIdle());
poolConfig.setTestOnBorrow(redisParam.isTestOnBorrow());
poolConfig.setTestOnReturn(redisParam.isTestOnReturn());
poolConfig.setTimeBetweenEvictionRunsMillis(redisParam.getTimeBetweenEvictionRunsMillis());
JedisSentinelPool redisPool = new JedisSentinelPool(redisParam.getMaster(), sentinels, poolConfig);
return redisPool;
}
@Bean
SpringUtil springUtil() {
return new SpringUtil();
}
@Bean
RedisListener redisListener() {
return new RedisListener();
}
}
5、配置Redis模板类
@Configuration
public class RedisConfig {
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.setConnectionFactory(factory);
return stringRedisTemplate;
}
}
三、模拟队列场景案例
生产者消费者模式:客户端监听消息队列,消息达到,消费者马上消费,如果消息队列里面没有消息,那么消费者就继续监听。基于Redis的LPUSH(BLPUSH)把消息入队,用 RPOP(BRPOP)获取消息的模式。
1、加锁解锁工具
@Component
public class RedisLock {
private static String keyPrefix = "RedisLock:";
@Resource
private JedisSentinelPool jedisSentinelPool;
public boolean addLock(String key, long expire) {
Jedis jedis = null;
try {
jedis = jedisSentinelPool.getResource();
/*
* nxxx的值只能取NX或者XX,如果取NX,则只有当key不存在是才进行set,如果取XX,则只有当key已经存在时才进行set
* expx的值只能取EX或者PX,代表数据过期时间的单位,EX代表秒,PX代表毫秒。
*/
String value = jedis.set(keyPrefix + key, "1", "nx", "ex", expire);
return value != null;
} catch (Exception e){
e.printStackTrace();
}finally {
if (jedis != null) jedis.close();
}
return false;
}
public void removeLock(String key) {
Jedis jedis = null;
try {
jedis = jedisSentinelPool.getResource();
jedis.del(keyPrefix + key);
} finally {
if (jedis != null) jedis.close();
}
}
}
2、消息消费
1)封装接口
public interface RedisHandler {
/**
* 队列名称
*/
String queueName();
/**
* 队列消息内容
*/
String consume (String msgBody);
}
2)接口实现
@Component
public class LogAListen implements RedisHandler {
private static final Logger LOG = LoggerFactory.getLogger(LogAListen.class) ;
@Resource
private RedisLock redisLock;
@Override
public String queueName() {
return "LogA-key";
}
@Override
public String consume(String msgBody) {
// 加锁,防止消息重复投递
String lockKey = "lock-order-uuid-A";
boolean lock = false;
try {
lock = redisLock.addLock(lockKey, 60);
if (!lock) {
return "success";
}
LOG.info("LogA-key == >>" + msgBody);
} catch (Exception e){
e.printStackTrace();
} finally {
if (lock) {
redisLock.removeLock(lockKey);
}
}
return "success";
}
}
3、消息监听器
public class RedisListener implements InitializingBean {
/**
* Redis 集群
*/
@Resource
private JedisSentinelPool jedisSentinelPool;
private List<RedisHandler> handlers = null;
private ExecutorService product = null;
private ExecutorService consumer = null;
/**
* 初始化配置
*/
@Override
public void afterPropertiesSet() {
handlers = SpringUtil.getBeans(RedisHandler.class) ;
product = new ThreadPoolExecutor(10,15,60 * 3,
TimeUnit.SECONDS,new SynchronousQueue<>());
consumer = new ThreadPoolExecutor(10,15,60 * 3,
TimeUnit.SECONDS,new SynchronousQueue<>());
for (RedisHandler redisHandler : handlers){
product.execute(() -> {
redisTask(redisHandler);
});
}
}
/**
* 队列监听
*/
public void redisTask (RedisHandler redisHandler){
Jedis jedis = null ;
while (true){
try {
jedis = jedisSentinelPool.getResource() ;
List<String> msgBodyList = jedis.brpop(0, redisHandler.queueName());
if (msgBodyList != null && msgBodyList.size()>0){
consumer.execute(() -> {
redisHandler.consume(msgBodyList.get(1)) ;
});
}
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) jedis.close();
}
}
}
}
4、消息生产者
@Service
public class RedisServiceImpl implements RedisService {
@Resource
private JedisSentinelPool jedisSentinelPool;
@Override
public void saveQueue(String queueKey, String msgBody) {
Jedis jedis = null;
try {
jedis = jedisSentinelPool.getResource();
jedis.lpush(queueKey,msgBody) ;
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) jedis.close();
}
}
}
5、场景测试接口
@RestController
public class RedisController {
@Resource
private RedisService redisService ;
/**
* 队列推消息
*/
@RequestMapping("/saveQueue")
public String saveQueue (){
MsgBody msgBody = new MsgBody() ;
msgBody.setName("LogAModel");
msgBody.setDesc("描述");
msgBody.setCreateTime(new Date());
redisService.saveQueue("LogA-key", JSONObject.toJSONString(msgBody));
return "success" ;
}
}
四、源代码地址
GitHub地址:知了一笑
https://github.com/cicadasmile/middle-ware-parent
码云地址:知了一笑
https://gitee.com/cicadasmile/middle-ware-parent

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Java描述设计模式(03):工厂方法模式
一、工厂方法模式 1、生活场景 系统常见的数据导出功能:数据导出PDF、WORD等常见格式。 2、工厂方法模式 是类的创建模式,又叫做虚拟构造子(Virtual Constructor)模式或者多态性工厂(Polymorphic Factory)模式。工厂方法模式的用意是定义一个创建产品对象的工厂接口,将实际创建工作推迟到子类中。 3、核心角色 1)、抽象工厂角色 这个角色的是工厂方法模式的核心,任何在模式中创建对象的工厂类必须实现这个接口。在实际的系统中,这个角色也常常使用抽象类实现。 2)、具体工厂角色 担任这个角色的是实现了抽象工厂接口的具体JAVA类。具体工厂角色含有与业务密切相关的逻辑,并且受到使用者的调用以创建导出类。 3)、抽象导出角色 工厂方法模式所创建的对象的超类,也就是所有导出类的共同父类或共同拥有的接口。在实际的系统中,这个角色也常常使用抽象类实现。 4)、具体导出角色 这个角色实现了抽象导出角色所声明的接口,工厂方法模式所创建的每一个对象都是某个具体导出角色的实例。 4、代码UML关系图 5、源代码实现 // 客户端角色 public class C01_Fac...
-
下一篇
ubuntu golang vscode环境搭建
安装golang # 到官网下载二进制包https://golang.org/dl/ 或者 https://studygolang.com/dl # 解压包 tar -xvf go1.12.9.linux-amd64.tar.gz # 移动到正常目录下 sudo mv go /usr/local/ # 添加环境变量, 配置代理, 启用gomod sudo vim /etc/profile.d/golang.sh export PATH=$PATH:/usr/local/go/bin export GO111MODULE=on export GOPROXY=https://goproxy.io # 加载环境变量 source /etc/profile.d/golang.sh # 测试安装 go version 安装vscode 到官网下载vscode https://code.visualstudio.com/下载deb包双击安装或者dpkg -i 安装即可 安装vscode 的go插件 安装其他工具 如果因为墙原因无法安装,请翻墙,或者手动安装https://github.com/Mi...
相关文章
文章评论
共有0条评论来说两句吧...