首页 文章 精选 留言 我的

精选列表

搜索[mysql],共10000篇文章
优秀的个人博客,低调大师

Python全栈 MySQL 数据库 (表字段增、删、改、查、函数)

ParisGabriel 每天坚持手写一天一篇 决定坚持几年 为了梦想为了信仰 开局一张图 查询SQL变量show variables 1.表字段的操作 1.语法:alter table表名 执行动作; 2.添加字段(add) alter table表名add字段名 数据类型;(尾插) alter table 表名 add 字段名 数据类型first;(头插) alter table 表名 add 字段名 数据类型after字段名;(指定插入) 3.删除字段(drop) alter table 表名drop字段名; 4.修改数据类型(modify) alter table 表名modify字段名 新数据类型; 5.重命名(rename) alter table 表名rename表名;2.字符类型 1.字符类型宽度与数值类型宽度的区别 1.数值类型宽度为显示宽度,只用于select查询显示 占用储存无关,可用zerofill查看效果 2.枚举类型 1.单选(enum):字段名enum(值1,值2...); 2.多选(set):字段名set(值1,值2...); (多项放在一个字符串内用,号隔开) 3.日期时间类型 1.date:“YYYY-MM-DD” 2.time:“HH:MM:SS” 3.datetime:“YYYY-MM-DD HH:MM:SS” 4.timestamp:“YYYY-MM-DD HH:MM:SS” 5.datetime:不给值默认返回Null 6.timestamp:不给值默认返回系统时间 3. 日期时间函数 1.now()返回服务器当前的时间 2.curdate()返回当前时期 3.curtime()返回当前日期 4.year(date)返回指定时间的年份 5.date(date)返回指定时间的日期 6.time(date)返回指定时间的时间4.日期时间运算 1.语法格式 select * from 表名 where 字段名 运算符(时间-interval时间间隔单位); 时间间隔单位: 1 day | 2hour | 1 minute | 2year | month 5.表记录管理 1.删除表记录 1.deletefrom 表名 where 条件; 注意: 如果不加where条件,所有记录全部清空 2.更改表记录 1.update表名 set 字段1=值1,字段名2=值2,... where 条件 注意: 如果不加where条件,所有记录全部更改 3.运算符操作 1.数值比较/字符比较 1.数值比较: = != > >= < <= 2.字符比较: = != 2.逻辑比较 1.and 2.or 3.范围内比较 1.where 字段名between值1 and 值2 2.where 字段名in(值1,值2,....) 3.where 字段名not in(值1,值2,...) 4.匹配空、非空 1.空:where nameisnull 2.非空:where nameis notnull 3.注意 1.NILL:空值,只能用is或is not取匹配 2.“ ”:空字符串,用=或!=去匹配 4.模糊比较 1.where 字段名like表达式 2.表达式 1._:匹配单个字符 2.% :匹配0到多个字符 NULL不会被统计6.SQL查询: 1语法顺序: 3.select... 聚合函数from表名 1.where 2.group by... 4.having... 5.order by... 6.limit...; 2.order by 1.给出查询结果进行排序 2...order by字段名 升序/降序 升序:ASC(默认排序方式) 降序:DESC 3.limit(永远放在SQL语句的最后) 1.作用:显示查询记录的个数 2.用法 limitn 显示n条记录 limit m,n m表示 从m+1条记录开始显示显示n条记录 limit 2,3 显示第3,4,5条记录 3.分页 每页显示5条记录,显示第4页内容 第1页:limit 0,5 #1,2,3,4,5 第2页:limit 5,5 第3页:limit 10,5 第4页:limit 15,5 每页显示n条记录,显示第m页: limit(m-1)*n,n4.聚合函数 avg(字段名):求该字段的平均值 sum(字段名):求和 max(字段名):最大值 min(字段名):最小值 count(字段名):统计该字段的个数 create database MOSHOU; use MOSHOU; create table hero( id int, name char(15), sex enum("男","女"), country char(10) )default charset=utf8; insert into hero values (1,"曹操","男","魏国"), (2,"小乔","女","吴国"), (3,"诸葛亮","男","蜀国"), (4,"貂蝉","女","东汉"), (5,"赵子龙","男","蜀国"), (6,"魏延","男","蜀国"); use MOSHOU; create table sanguo( id int, name char(20), gongji int, fangyu tinyint unsigned, sex enum("男","女"), country varchar(20) )default charset=utf8; insert into sanguo values (1,'诸葛亮',120,20,'男','蜀国'), (2,'司马懿',119,25,'男','魏国'), (3,'关羽',188,60,'男','蜀国'), (4,'赵云',200,66,'男','魏国'), (5,'孙权',110,20,'男','吴国'), (6,'貂蝉',666,10,'女','魏国'), (7,null,1000,99,'男','蜀国'), (8,'',1005,88,'女','蜀国'); 1、创建库 studb2 2、在库中创建表 t1 ,字段有3个:name、age、phnumber 3、查看表结构 4、在表中第一列添加一个 id 字段 5、把 phnumber 的数据类型改为 bigint 6、在表中最后一列添加一个字段 address 7、删除表中的 age 字段 8、查看表结构 答案: use studb2; create table t1( name char(20), age tinyint unsigned, phnumber char(11) ); desc t1; alter table t1 add id int first; alter table t1 modify phnumber bigint; alter table t1 add address varchar(50); alter table t1 drop age; desc t1; 1、在表中插入3条记录 2、查找2018年7月2日有哪些用户充值了 3、查找2018年7月份充值的信息 4、查找7月30日10:00-12:00充值的信息 答案: insert into t7 values (3,"小昭",19000520,3000,20180630000000), (4,"赵敏",19000521,4000,20180702000000), (5,"周芷若",19010522,3500,20180702100000); select * from t7 where date(shijian)="2018-07-02"; select * from t7 where date(shijian)>="2018-07-01" and date(shijian)<="2018-07-31"; select * from t7 where date(shijian)="2018-07-31" and time(shijian)>="10:00:00" and time(shijian)<="12:00:00"; 1、查询1天以内的记录 2、查询1年以前的记录 3、查询1天以前,3天以内的记录 答案: select * from t7 where shijian > (now()-interval 1 day); select * from t7 where shijian < (now()-interval 1 year); select * from t7 where shijian < (now()-interval 1 day) and shijian > (now()-interval 3 day); 1、查找所有蜀国人的信息 2、查找所有女英雄的姓名、性别和国家 3、把id为2的记录改为典韦,性别男,国家魏国 4、删除所有蜀国英雄 5、把貂蝉的国籍改为魏国 6、删除所有表记录 答案: select * from hero where country="蜀国"; select name,sex,country from hero where sex="女"; update hero set name="典韦",sex="男",country="魏国" where id=2; delete from hero where country="蜀国"; update hero set country="魏国" where name="貂蝉"; delete from hero; 1、找出攻击值高于200的蜀国英雄的名字、攻击力 2、将吴国英雄中攻击值为110的英雄的攻击值改为100,防御力改为60 3、查找蜀国和魏国的英雄信息 答案: select name as n,gongji as g from sanguo where gongji>200 and country="蜀国"; update sanguo set gongji=100,fangyu=60 where country="吴国" and gongji=110; select * from sanguo where country="蜀国" or country="魏国"; 1、查找攻击值100-200的蜀国英雄信息 2、找到蜀国和吴国以外的国家的女英雄信息 3、找到id为1、3或5的蜀国英雄 和 貂蝉的信息 答案: select * from sanguo where gongji between 100 and 200 and country="蜀国"; select * from sanguo where country not in("蜀国","吴国") and sex="女"; select * from sanguo where (id in(1,3,5) and country="蜀国") or name="貂蝉"; 1、在蜀国英雄中,查找防御值倒数第二名至倒数第四名的英雄的记录 2、在蜀国英雄中,查找攻击值前3名且名字不为 NULL 的英雄的姓名、攻击值和国家 答案: select * from sanguo where country="蜀国" order by fangyu asc limit 1,3; select name,gongji,country from sanguo where country="蜀国" and name is not NULL order by gongji DESC limit 3; 1、攻击力最强值是多少 2、统计id 、name 两个字段分别有几条记录 ## 空值 NULL 不会被统计,""会被统计 3、计算蜀国英雄的总攻击力 4、统计蜀国英雄中攻击值大于200的英雄的数量 答案: select max(gongji) from MOSHOU.sanguo; select count(id),count(name) from sanguo; select sum(gongji) from MOSHOU.sanguo where country="蜀国"; select count(*) from MOSHOU.sanguo where gongji>200 and country="蜀国";

优秀的个人博客,低调大师

spring cloud实现 rocketmq可靠一致性的mysql落地实现

1.前言 1.1 目的 为开发测试提供指导性文件 为系统今后的扩展提供参考 解决系统中消息不可达问题 1.2 范围和功能 1.3 适用读者 需要发送MQ分布式系统的开发人员和测试人员 可靠消息服务的开发人员和测试人员 1.4 读者须知 本服务需要提供一个sdk和数据库初始语句创建数据库表,并且对外提供可扫描的domain、mapper、service,使用的技术框架zk + mapper3 + pagehelper + feign(edas) , 使用者(上游系统、下游系统) 只需要在对应的接口上写上响应注解即可实现可靠消息, 如果不熟悉上述框架,可选择对应框架替换,比如redis替换zk,放弃mapper3和pagehelper使用传统的mybatis,使用http接口替换fein(eads)的解决办法,本文不提供替换的解决方案 1.5 参考文档 https://segmentfault.com/a/1190000011479826 2 系统概述 本文为分布式系统解决方案,此方案涉及 3 个模块: 1. 上游应用,执行业务并发送指令给可靠消息服务并保留消息副本。 2. 可靠消息服务和 MQ消息组件,协调上下游消息的传递,并确保上下游数据的一致性。 3. 下游应用,监听 MQ 的消息并执行自身业务并保留消息副本。 2.1业务流程图 2.2数据库表设计 2.2.1 可靠消息表 2.2.2 消费者确认表 2.2.3 消费者表 2.2.4 生产者表 2.2.5 发布关系表 2.2.6 消息重发记录表 暂时未设计 2.2.7 消息订阅关系表 2.2.8 消息订阅TAG关系表 2.2.9 各个子系统消息落地的消息表 3 详细设计 3.1 上游应用执行业务并发送 MQ 消息 上游应用将本地业务执行和消息发送绑定在同一个本地事务中,保证要么本地操作成功并发送 MQ 消息,要么两步操作都失败并回滚。这里可以采用自定义切面完成,后续会有介绍。 上游应用发送待确认消息到可靠消息系统。(本地消息落地) 可靠消息系统保存待确认消息并返回。 上游应用执行本地业务。 上游应用通知可靠消息系统确认业务已执行并发送消息。 可靠消息系统修改消息状态为发送状态并将消息投递到 MQ 中间件。 以上每一步都可能出现失败情况,分析一下这 5 步出现异常后上游业务和消息发送是否一致: 失败步骤 现象 一致性 第1步 上游应用业务未执行,MQ消息未发送 一致 第2步 上游应用业务未执行,MQ消息未发送 一致 第3步 上游应用事物回滚,MQ消息未发送 一致 第4步 上游应用业务执行,MQ消息未发送 不一致 第5步 上游应用业务执行,MQ消息未发送 不一致 上游应用执行完成,下游应用尚未执行或执行失败时,此事务即处于 BASE 理论的 Soft State 状态。 3.2 下游应用监听 MQ 消息并执行业务 下游应用监听 MQ 消息并执行业务,并且将消息的消费结果通知可靠消息服务。(本地消息落地) 可靠消息的状态需要和下游应用的业务执行保持一致,可靠消息状态不是已完成时,确保下游应用未执行,可靠消息状态是已完成时,确保下游应用已执行。 下游应用和可靠消息服务之间的交互图如下: 下游应用监听 MQ 消息组件并获取消息, 并存储本地消息 下游系统通知可靠消息服务已接收到消息 可靠消息把消息更新为已接收状态 下游应用根据 MQ 消息体信息处理本地业务 下游应用向 MQ 组件自动发送 ACK 确认消息被消费 下游应用通知可靠消息系统消息被成功消费,可靠消息将该消息状态更改为以消费,任务表状态修改为已完成。 失败步骤 现象 一致性 第1步 下游应用业务未接收MQ消息,MQ消息为已发送未接收 不一致 第2步 通知可靠消息服务,接收到消息 不一致 第3步 下游应用异步通知 不一致 第4步 下游应用数据回滚,本地消息存储成功,消息状态为已接收未成功消费 一致 第5步 MQ未收到ack确认 一致 第6步 下游应用异步通知 不一致 1. 下游应用监听 MQ 消息组件并获取消息, 并存储本地消息 2. 下游系统通知可靠消息服务已接收到消息 3. 可靠消息把消息更新为已接收状态 4. 下游应用根据 MQ 消息体信息处理本地业务 5. 下游应用向 MQ 组件自动发送 ACK 确认消息被消费 6. 下游应用通知可靠消息系统消息被成功消费,可靠消息将该消息状态更改为已消费,任务表状态修改为已完成 3.3 生产者消息状态确认 可靠消息服务定时监听消息的状态,如果存在状态为待确认并且超时的消息,则表示上游应用和可靠消息交互中的步骤 4 或者 5 出现异常。 可靠消息则携带消息体内的信息向上游应用发起请求查询该业务是否已执行。上游应用提供一个可查询接口供可靠消息追溯业务执行状态,如果业务执行成功则更改消息状态为已发送,否则删除此消息确保数据一致。具体流程如下: 3.4 消费者消息状态确认 下游消费MQ服务异步通知可靠消息的过程中可能出现异常,在此可能导致两个现象一、消息已接到但可靠消息没有确认接到二、消息已成功消费但可靠消息没有确认接到,为此下游系统需要提供消费者消息状态查询接口,从而可靠消息重新确认.在确认过程中如果是可靠消息为已消费而下游消费系统为已接收则不进行更新操作. 具体流程如下: 3.5 消息重投 消息已发送则表示上游应用已经执行,接下来则确保下游应用也能正常执行。 可靠消息服务发现可靠消息服务中存在消息状态为已发送并且超时的消息,则表示可靠消息服务和下游应用中存在异常的步骤,无论哪个步骤出现异常,可靠消息服务都将此消息重新投递到 MQ 组件中供下游应用监听。 下游应用监听到此消息后,在保证幂等性的情况下重新执行业务并通知可靠消息服务此消息已经成功消费,最终确保上游应用、下游应用的数据最终一致性。具体流程如下: 可靠消息服务定时查询状态为已发送并超时的消息 可靠消息将消息重新投递到 MQ 组件中 下游应用监听消息,在满足幂等性的条件下,重新执行业务。 下游应用通知可靠消息服务该消息已经成功消费。 更新consumer消息记录为已消费 3.6 删除上游系统7天前成功发送的消息 在预发送执行MQ消息的时候本地消息如果落库则需要删除消息,否则业务系统需要额外提供查询消息发送状态接口, 这里介绍两种方法 第一种,RPC服务接口来实现, 在生产者和消费者注册到可靠消息的时候把生产者和消费者存储到BeanFactory的Map里在定时清理任务的时候去处理在线的RPC服务 第二种,发可靠消息来实现, 确保100%到达 3.7 删除下游系统7天前成功消费的消息 在消费MQ消息的时候本地消息如果落库则需要删除消息,否则业务系统需要额外提供查询消息发送状态接口,删除实现同3.6 3.8 每天备份可靠消息记录 每天将成功消息删除并备份到对应数据库提供历史消息查询功能,当然如果你选择mongo可以不考虑备份消息 4 核心代码实现 这里做一个说明,因为项目采用的是rocketmq,一个topic对应一个生产者,而可靠消息采用的是中间件负责发送消息,又不能采用中间件的生产者为所有上游系统发送消息,这里引入了zookeeper做注册中心,所以依赖可靠消息的服务,在启动项目的时候会像中间件去注册生产者,而中间件的watch机制会及时的更新生产者和消费者状态,而中间件会为使用中间件的系统提供sdk,使用者无需关注实现,只需要引入中间件的sdk和对应的注解即可完成可靠消息的发送和消费,详见下图: 普通消息发送流程: 可靠消息发送流程: 可靠消息发送和消费流程: 服务注册 public static void startup(PaascloudProperties paascloudProperties, String host, String app) { CoordinatorRegistryCenter coordinatorRegistryCenter = createCoordinatorRegistryCenter(paascloudProperties.getZk()); RegisterDto dto = new RegisterDto(app, host, coordinatorRegistryCenter); Long serviceId = new IncrementIdGenerator(dto).nextId(); IncrementIdGenerator.setServiceId(serviceId); registerMq(paascloudProperties, host, app); } private static void registerMq(PaascloudProperties paascloudProperties, String host, String app) { CoordinatorRegistryCenter coordinatorRegistryCenter = createCoordinatorRegistryCenter(paascloudProperties.getZk()); AliyunProperties.RocketMqProperties rocketMq = paascloudProperties.getAliyun().getRocketMq(); String consumerGroup = rocketMq.isReliableMessageConsumer() ? rocketMq.getConsumerGroup() : null; String namesrvAddr = rocketMq.getNamesrvAddr(); String producerGroup = rocketMq.isReliableMessageProducer() ? rocketMq.getProducerGroup() : null; coordinatorRegistryCenter.registerMq(app, host, producerGroup, consumerGroup, namesrvAddr); } @Override public void registerMq(final String app, final String host, final String producerGroup, final String consumerGroup, String namesrvAddr) { // 注册生产者 final String producerRootPath = GlobalConstant.ZK_REGISTRY_PRODUCER_ROOT_PATH + GlobalConstant.Symbol.SLASH + app; final String consumerRootPath = GlobalConstant.ZK_REGISTRY_CONSUMER_ROOT_PATH + GlobalConstant.Symbol.SLASH + app; ReliableMessageRegisterDto dto; if (StringUtils.isNotEmpty(producerGroup)) { dto = new ReliableMessageRegisterDto().setProducerGroup(producerGroup).setNamesrvAddr(namesrvAddr); String producerJson = JSON.toJSONString(dto); this.persist(producerRootPath, producerJson); this.persistEphemeral(producerRootPath + GlobalConstant.Symbol.SLASH + host, DateUtil.now()); } // 注册消费者 if (StringUtils.isNotEmpty(consumerGroup)) { dto = new ReliableMessageRegisterDto().setConsumerGroup(consumerGroup).setNamesrvAddr(namesrvAddr); String producerJson = JSON.toJSONString(dto); this.persist(consumerRootPath, producerJson); this.persistEphemeral(consumerRootPath + GlobalConstant.Symbol.SLASH + host, DateUtil.now()); } } 消费注解 @MqProducerStore @Around(value = "mqProducerStoreAnnotationPointcut()") public Object processMqProducerStoreJoinPoint(ProceedingJoinPoint joinPoint) throws Throwable { log.info("processMqProducerStoreJoinPoint - 线程id={}", Thread.currentThread().getId()); Object result; Object[] args = joinPoint.getArgs(); MqProducerStore annotation = getAnnotation(joinPoint); MqSendTypeEnum type = annotation.sendType(); int orderType = annotation.orderType().orderType(); DelayLevelEnum delayLevelEnum = annotation.delayLevel(); if (args.length == 0) { throw new TpcBizException(ErrorCodeEnum.TPC10050005); } MqMessageData domain = null; for (Object object : args) { if (object instanceof MqMessageData) { domain = (MqMessageData) object; break; } } if (domain == null) { throw new TpcBizException(ErrorCodeEnum.TPC10050005); } domain.setOrderType(orderType); domain.setProducerGroup(producerGroup); if (type == MqSendTypeEnum.WAIT_CONFIRM) { if (delayLevelEnum != DelayLevelEnum.ZERO) { domain.setDelayLevel(delayLevelEnum.delayLevel()); } mqMessageService.saveWaitConfirmMessage(domain); } result = joinPoint.proceed(); if (type == MqSendTypeEnum.SAVE_AND_SEND) { mqMessageService.saveAndSendMessage(domain); } else if (type == MqSendTypeEnum.DIRECT_SEND) { mqMessageService.directSendMessage(domain); } else { mqMessageService.confirmAndSendMessage(domain.getMessageKey()); } return result; } 生产注解@MqConsumerStore @Around(value = "mqConsumerStoreAnnotationPointcut()") public Object processMqConsumerStoreJoinPoint(ProceedingJoinPoint joinPoint) throws Throwable { log.info("processMqConsumerStoreJoinPoint - 线程id={}", Thread.currentThread().getId()); Object result; long startTime = System.currentTimeMillis(); Object[] args = joinPoint.getArgs(); MqConsumerStore annotation = getAnnotation(joinPoint); boolean isStorePreStatus = annotation.storePreStatus(); List<MessageExt> messageExtList; if (args == null || args.length == 0) { throw new TpcBizException(ErrorCodeEnum.TPC10050005); } if (!(args[0] instanceof List)) { throw new TpcBizException(ErrorCodeEnum.GL99990001); } try { messageExtList = (List<MessageExt>) args[0]; } catch (Exception e) { log.error("processMqConsumerStoreJoinPoint={}", e.getMessage(), e); throw new TpcBizException(ErrorCodeEnum.GL99990001); } MqMessageData dto = this.getTpcMqMessageDto(messageExtList.get(0)); final String messageKey = dto.getMessageKey(); if (isStorePreStatus) { mqMessageService.confirmReceiveMessage(consumerGroup, dto); } String methodName = joinPoint.getSignature().getName(); try { result = joinPoint.proceed(); log.info("result={}", result); if (CONSUME_SUCCESS.equals(result.toString())) { mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey); } } catch (Exception e) { log.error("发送可靠消息, 目标方法[{}], 出现异常={}", methodName, e.getMessage(), e); throw e; } finally { log.info("发送可靠消息 目标方法[{}], 总耗时={}", methodName, System.currentTimeMillis() - startTime); } return result; } 定时清理所有订阅者消费成功的消息数据 @Slf4j @ElasticJobConfig(cron = "0 0 0 1/1 * ?") public class DeleteRpcConsumerMessageJob implements SimpleJob { @Resource private PaascloudProperties paascloudProperties; @Resource private TpcMqMessageService tpcMqMessageService; /** * Execute. * * @param shardingContext the sharding context */ @Override public void execute(final ShardingContext shardingContext) { ShardingContextDto shardingContextDto = new ShardingContextDto(shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()); final TpcMqMessageDto message = new TpcMqMessageDto(); message.setMessageBody(JSON.toJSONString(shardingContextDto)); message.setMessageTag(AliyunMqTopicConstants.MqTagEnum.DELETE_CONSUMER_MESSAGE.getTag()); message.setMessageTopic(AliyunMqTopicConstants.MqTopicEnum.TPC_TOPIC.getTopic()); message.setProducerGroup(paascloudProperties.getAliyun().getRocketMq().getProducerGroup()); String refNo = Long.toString(UniqueIdGenerator.generateId()); message.setRefNo(refNo); message.setMessageKey(refNo); tpcMqMessageService.saveAndSendMessage(message); } } 定时清理所有生产者发送成功的消息数据 @Slf4j @ElasticJobConfig(cron = "0 0 0 1/1 * ?") public class DeleteRpcExpireFileJob implements SimpleJob { @Resource private OpcRpcService opcRpcService; /** * Execute. * * @param shardingContext the sharding context */ @Override public void execute(final ShardingContext shardingContext) { opcRpcService.deleteExpireFile(); } } 定时清理所有生产者发送成功的消息数据 @Slf4j @ElasticJobConfig(cron = "0 0 1 1/1 * ?") public class DeleteRpcProducerMessageJob implements SimpleJob { @Resource private PaascloudProperties paascloudProperties; @Resource private TpcMqMessageService tpcMqMessageService; /** * Execute. * * @param shardingContext the sharding context */ @Override public void execute(final ShardingContext shardingContext) { final TpcMqMessageDto message = new TpcMqMessageDto(); message.setMessageBody(JSON.toJSONString(shardingContext)); message.setMessageTag(AliyunMqTopicConstants.MqTagEnum.DELETE_PRODUCER_MESSAGE.getTag()); message.setMessageTopic(AliyunMqTopicConstants.MqTopicEnum.TPC_TOPIC.getTopic()); message.setProducerGroup(paascloudProperties.getAliyun().getRocketMq().getProducerGroup()); String refNo = Long.toString(UniqueIdGenerator.generateId()); message.setRefNo(refNo); message.setMessageKey(refNo); tpcMqMessageService.saveAndSendMessage(message); } } 处理发送中的消息数据 @Component @Slf4j @ElasticJobConfig(cron = "0/30 * * * * ?", jobParameter = "fetchNum=200") public class HandleSendingMessageJob extends AbstractBaseDataflowJob<TpcMqMessage> { @Resource private TpcMqMessageService tpcMqMessageService; @Value("${paascloud.message.handleTimeout}") private int timeOutMinute; @Value("${paascloud.message.maxSendTimes}") private int messageMaxSendTimes; @Value("${paascloud.message.resendMultiplier}") private int messageResendMultiplier; @Resource private TpcMqConfirmMapper tpcMqConfirmMapper; /** * Fetch job data list. * * @param jobParameter the job parameter * * @return the list */ @Override protected List<TpcMqMessage> fetchJobData(JobParameter jobParameter) { MessageTaskQueryDto query = new MessageTaskQueryDto(); query.setCreateTimeBefore(DateUtil.getBeforeTime(timeOutMinute)); query.setMessageStatus(MqSendStatusEnum.SENDING.sendStatus()); query.setFetchNum(jobParameter.getFetchNum()); query.setShardingItem(jobParameter.getShardingItem()); query.setShardingTotalCount(jobParameter.getShardingTotalCount()); query.setTaskStatus(JobTaskStatusEnum.TASK_CREATE.status()); return tpcMqMessageService.listMessageForWaitingProcess(query); } /** * Process job data. * * @param taskList the task list */ @Override @Transactional(rollbackFor = Exception.class) protected void processJobData(List<TpcMqMessage> taskList) { for (TpcMqMessage message : taskList) { Integer resendTimes = message.getResendTimes(); if (resendTimes >= messageMaxSendTimes) { tpcMqMessageService.setMessageToAlreadyDead(message.getId()); continue; } int times = (resendTimes == 0 ? 1 : resendTimes) * messageResendMultiplier; long currentTimeInMillis = Calendar.getInstance().getTimeInMillis(); long needTime = currentTimeInMillis - times * 60 * 1000; long hasTime = message.getUpdateTime().getTime(); // 判断是否达到了可以再次发送的时间条件 if (hasTime > needTime) { log.debug("currentTime[" + com.xiaoleilu.hutool.date.DateUtil.formatDateTime(new Date()) + "],[SENDING]消息上次发送时间[" + com.xiaoleilu.hutool.date.DateUtil.formatDateTime(message.getUpdateTime()) + "],必须过了[" + times + "]分钟才可以再发送。"); continue; } // 前置状态 List<Integer> preStatusList = Lists.newArrayList(JobTaskStatusEnum.TASK_CREATE.status()); // 设置任务状态为执行中 message.setPreStatusList(preStatusList); message.setTaskStatus(JobTaskStatusEnum.TASK_EXETING.status()); int updateRes = tpcMqMessageService.updateMqMessageTaskStatus(message); if (updateRes > 0) { try { // 查询是否全部订阅者都确认了消息 是 则更新消息状态完成, 否则重发消息 int count = tpcMqConfirmMapper.selectUnConsumedCount(message.getMessageKey()); int status = JobTaskStatusEnum.TASK_CREATE.status(); if (count < 1) { TpcMqMessage update = new TpcMqMessage(); update.setMessageStatus(MqSendStatusEnum.FINISH.sendStatus()); update.setId(message.getId()); tpcMqMessageService.updateMqMessageStatus(update); status = JobTaskStatusEnum.TASK_SUCCESS.status(); } else { tpcMqMessageService.resendMessageByMessageId(message.getId()); } // 前置状态 preStatusList = Lists.newArrayList(JobTaskStatusEnum.TASK_EXETING.status()); // 设置任务状态为执行中 message.setPreStatusList(preStatusList); message.setTaskStatus(status); tpcMqMessageService.updateMqMessageTaskStatus(message); } catch (Exception e) { log.error("重发失败 ex={}", e.getMessage(), e); // 设置任务状态为执行中 preStatusList = Lists.newArrayList(JobTaskStatusEnum.TASK_EXETING.status()); message.setPreStatusList(preStatusList); message.setTaskStatus(JobTaskStatusEnum.TASK_SUCCESS.status()); tpcMqMessageService.updateMqMessageTaskStatus(message); } } } } } 处理待确认的消息数据 @Slf4j @Component @ElasticJobConfig(cron = "0 0/10 * * * ?", jobParameter = "fetchNum=1000") public class HandleWaitingConfirmMessageJob extends AbstractBaseDataflowJob<String> { @Resource private TpcMqMessageService tpcMqMessageService; @Resource private UacRpcService uacRpcService; @Value("${paascloud.message.handleTimeout}") private int timeOutMinute; private static final String PID_UAC = "PID_UAC"; /** * Fetch job data list. * * @param jobParameter the job parameter * * @return the list */ @Override protected List<String> fetchJobData(JobParameter jobParameter) { MessageTaskQueryDto query = new MessageTaskQueryDto(); query.setCreateTimeBefore(DateUtil.getBeforeTime(timeOutMinute)); query.setMessageStatus(MqSendStatusEnum.WAIT_SEND.sendStatus()); query.setFetchNum(jobParameter.getFetchNum()); query.setShardingItem(jobParameter.getShardingItem()); query.setShardingTotalCount(jobParameter.getShardingTotalCount()); query.setTaskStatus(JobTaskStatusEnum.TASK_CREATE.status()); query.setProducerGroup(PID_UAC); return tpcMqMessageService.queryWaitingConfirmMessageKeyList(query); } /** * */ @Override protected void processJobData(List<String> messageKeyList) { if (messageKeyList == null) { return; } List<String> resendMessageList = uacRpcService.queryWaitingConfirmMessageKeyList(messageKeyList); if (resendMessageList == null) { resendMessageList = Lists.newArrayList(); } messageKeyList.removeAll(resendMessageList); tpcMqMessageService.handleWaitingConfirmMessage(messageKeyList, resendMessageList); } } 可靠消息用法 例子 @MqProducerStore public void resetLoginPwd(final MqMessageData mqMessageData, final UacUser update) { log.info("重置密码. mqMessageData={}, user={}", mqMessageData, update); int updateResult = uacUserMapper.updateByPrimaryKeySelective(update); if (updateResult < 1) { log.error("用户【 {} 】重置密码失败", update.getLoginName()); } else { log.info("用户【 {} 】重置密码失败", update.getLoginName()); } } 强制: 需要使用的使用加上述两个注解,方法参数需要加入 MqMessageData 如果对本文感兴趣,或者本文对您有所帮助,可靠参考github代码,本套代码是spring cloud E版本 + vue两套全家桶实现 后端项目:https://github.com/paascloud/paascloud-master https://gitee.com/passcloud/paascloud-master 登录入口:https://github.com/paascloud/paascloud-login-web https://gitee.com/passcloud/paascloud-login-web 后端入口:https://github.com/paascloud/paascloud-admin-web https://gitee.com/passcloud/paascloud-admin-web 前端入口:https://github.com/paascloud/paascloud-mall-web https://gitee.com/passcloud/paascloud-mall-web 如果有时间最好能给点加个星或者follow一下,笔者在这里先谢过了。对不知道怎么加星的朋友,请用电脑登录github或者码云,这里两个截图 写在最后 更多内容请参考paascloud 建站文档 https://document.paascloud.net/

优秀的个人博客,低调大师

SSO场景系列:实现Shibboleth+JAAS+Mysql到阿里云的单点登录

Shibboleth简介 Shibboleth是一个基于标准的,实现组织内部或跨组织的网页单点登录的开源软件包。它允许站点为处于私有保护方式下的受保护的在线资源做出被通知的认证决定。 Shibboleth软件工具广泛使用联合的身份标准,主要是OASIS安全声称标记语言(SAML),来提供一个联合单点登录和属性交换框架。一个用户用他的组织的证书认证,组织(或IdP)传送最少的必要的身份信息给SP实现认证决定。Shibboleth也提供扩展的隐私功能,允许一个用户和他们的主站点来控制释放给每一个应用的属性。 Shibboleth项目作为一个Internet2中间件活动启动于2000年,这年晚些时候该项目和OASIS SAML工作组的工作相联系。Shibboleth1.0 于2003年发布,并快速被全世界的研究和教育机构使用。随着2005年S

资源下载

更多资源
腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

用户登录
用户注册