为什么需要这个模块
做后台系统的时候,通知、审批进度、在线状态这类需求很常见。轮询能用,但费连接、也费服务端;WebSocket 能力强,接入成本却不低。SSE(Server-Sent Events)夹在中间——基于普通 HTTP,浏览器原生支持 EventSource,服务端单向推数据正好够用。
1.2.0 之前,Wiki-Framework 在 Socket、WebSocket 上已经有封装,但缺一块「轻量推送」的能力。wiki-sse 就是补这个空位的:基于 Spring MVC 的 SseEmitter,把连接管理、心跳保活、单播/广播/组播、生命周期回调这些脏活累活收进一个 Service,业务侧只管「连上来」和「发消息」。
模块里有什么
整个模块目前三个核心类,职责很清晰:
|
类
|
干什么
|
|
SseEmitterConnectionService
|
连接池、心跳、消息推送,日常开发主要跟它打交道
|
|
SseEmitterCallback
|
连接建立、断开、超时、出错、心跳等事件的回调接口
|
|
EmitterConstant
|
约定好的事件名和标识,比如 heartbeat、ping、success
|
依赖很轻:只拉了 wiki-util、wiki-entity 和 Spring WebMVC,不绑死整套 wiki-all,按需引入就行。
<dependency>
<groupId>com.framewiki</groupId>
<artifactId>wiki-sse</artifactId>
<version>1.2.0</version>
</dependency>
核心设计:连接怎么管
SseEmitterConnectionService 内部用两个 ConcurrentHashMap 分别存连接和心跳任务:
同一个 sessionId 重复连接时,会先关掉旧连接再建新连接,避免一个用户挂着两条长连接占资源。这点在实际项目里挺重要——用户刷新页面、网络闪断重连,都容易触发重复 connect。
连接建立后,服务会自动做三件事:
-
立刻推一条欢迎消息,event.id 为 success,前端可以用来确认链路通了;
-
启动心跳,默认每 25 秒发一次 heartbeat 事件,数据是 ping;
-
挂上完成 / 超时 / 错误回调,统一走 SseEmitterCallback,该清理的清理干净。
心跳间隔特意设在 25 秒,注释里写得很直白:要比网关、负载均衡的空闲超时短一截,不然连接会被中间层悄悄掐掉,你还以为是客户端的问题。
// 连接建立后的核心流程(节选)
public SseEmitter connect(String sessionId) {
long timeout = 60 * 15 * 1000; // 15 分钟
SseEmitter sseEmitter = new SseEmitter(timeout);
String cacheKey = KEY_PREFIX + sessionId;
// 同 sessionId 重连:关掉旧的
SseEmitter oldEmitter = sseEmitterMap.put(cacheKey, sseEmitter);
if (oldEmitter != null) {
oldEmitter.complete();
// ...取消旧心跳
}
startHeartbeat(sessionId, sseEmitter);
sseEmitter.onCompletion(() -> sseEmitterCallback.onCompletion(sessionId));
sseEmitter.onTimeout(() -> {
sseEmitterCallback.onTimeout(sessionId);
disconnect(sessionId);
});
sseEmitter.onError(e -> {
sseEmitterCallback.onError(sessionId, e);
disconnect(sessionId);
});
sseEmitter.send(SseEmitter.event()
.data("SSE连接建立成功", MediaType.TEXT_PLAIN)
.id(EmitterConstant.SUCCESS));
sseEmitterCallback.onConnect(sessionId);
return sseEmitter;
}
服务关闭时还有 @PreDestroy,会把调度器、心跳任务、所有连接一并收掉,不会留僵尸线程。
消息怎么发:三种模式
推送 API 分三档,按场景选就行:
|
方法
|
场景
|
|
sendMessage(sessionId, message)
|
给某个会话单播
|
|
sendMessageAll(message)
|
全体广播
|
|
groupSendMessage(groupId, message)
|
按组播(sessionId 以 groupId 为前缀匹配)
|
单播失败(比如客户端已经断了)会自动 disconnect,不会留着无效连接占 map。每条业务消息会带一个短 UUID 作为 event.id,方便前端做去重或排查。
// 审批通过后,只推给当前用户
sseEmitterConnectionService.sendMessage(userId, "您的请假单已通过");
// 系统维护通知,推给所有在线连接
sseEmitterConnectionService.sendMessageAll("系统将于 22:00 维护,请提前保存");
// 某个项目组的消息,sessionId 约定成 groupId_userId 的形式
sseEmitterConnectionService.groupSendMessage("project-42", "需求 #128 状态已更新");
组播这块有个约定要心里有数:groupSendMessage 是靠 key 前缀匹配的,sessionId 设计时最好带上组信息,比如 project-42_user_10086,不然组播筛不出来。
回调接口:把业务钩子留出来
SseEmitterCallback 定义了六个钩子,覆盖连接的完整生命周期:
public interface SseEmitterCallback {
void onConnect(String sessionId); // 连接成功
void onDisconnect(String sessionId); // 主动断开
void onError(String sessionId, Throwable throwable);
void onTimeout(String sessionId);
void onCompletion(String sessionId); // 连接正常结束
void onHeartbeat(String sessionId); // 心跳任务启动时
}
这个接口需要你自己实现并注册成 Spring Bean,框架不会替你写默认实现——毕竟有人要在 onConnect 里记在线状态,有人要在 onDisconnect 里清缓存,业务差异太大。
一个比较典型的写法:
@Component
public class MySseCallback implements SseEmitterCallback {
@Override
public void onConnect(String sessionId) {
// 记入在线表、打日志、更新 Redis 都行
}
@Override
public void onDisconnect(String sessionId) {
// 清理在线状态
}
@Override
public void onError(String sessionId, Throwable throwable) {
// 告警或记错误日志
}
@Override
public void onTimeout(String sessionId) {
// 超时也当断开处理
}
@Override
public void onCompletion(String sessionId) {
// 客户端正常关闭
}
@Override
public void onHeartbeat(String sessionId) {
// 一般不用干啥,需要的话可以用来续期
}
}
业务接入:从 Controller 到前端
服务端入口就是一个返回 SseEmitter 的接口,把 sessionId 传进去即可。sessionId 通常用当前登录用户 ID,或者用户 ID + 业务场景拼出来。
@RestController
@RequestMapping("/api/sse")
@RequiredArgsConstructor
public class NotifyController {
private final SseEmitterConnectionService sseService;
@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect(@RequestParam String sessionId) {
return sseService.connect(sessionId);
}
@PostMapping("/push")
public void push(@RequestParam String sessionId, @RequestParam String message) {
sseService.sendMessage(sessionId, message);
}
}
前端用原生 EventSource 就能接,不用额外引库:
const sessionId = 'user_10086';
const source = new EventSource(`/api/sse/connect?sessionId=${sessionId}`);
// 连接成功时服务端会推 id=success 的事件
source.addEventListener('message', (e) => {
if (e.lastEventId === 'success') {
console.log('SSE 已连通:', e.data);
return;
}
console.log('收到消息:', e.data);
});
// 心跳事件,保持连接用,一般忽略即可
source.addEventListener('heartbeat', () => {});
source.onerror = () => {
console.warn('连接异常,浏览器会自动重连');
};
如果走 Nginx 反向代理,记得关掉缓冲,不然 SSE 会被攒着不发:
location /api/sse/ {
proxy_pass http://backend;
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 3600s;
}
几个实用 API 顺带提一下
除了连和发,服务还暴露了这几个辅助方法,排查问题时挺好用:
// 这个用户还在线吗?
boolean online = sseEmitterConnectionService.isConnected(sessionId);
// 当前一共多少条连接?
int count = sseEmitterConnectionService.getConnectionCount();
// 踢掉某个会话
sseEmitterConnectionService.disconnect(sessionId);
// 全部关掉(比如发版前清场)
sseEmitterConnectionService.disconnectAll();
和 WebSocket 怎么选
简单说:
-
只要服务端往客户端推,客户端偶尔发请求走普通 HTTP —— 用 SSE,省心;
-
要双向实时通信(聊天、协同编辑、游戏)—— 还是 WebSocket;
-
要兼容老浏览器或极简场景 —— SSE 基于 HTTP,穿透代理通常更顺。
Wiki-Framework 里 wiki-sse 和 wiki-web-socket 是并列模块,不冲突,按场景选一个或混用都行。
小结
wiki-sse 在 1.2.0 里是全新模块,不是什么花哨封装,就是把 SSE 长连接最容易踩坑的几件事处理好了:
-
连接复用与替换
-
25 秒心跳保活
-
单播 / 广播 / 组播
-
生命周期回调
-
服务关停时的资源回收
业务侧接入路径也很直接:引依赖 → 实现 SseEmitterCallback → Controller 返回 connect(sessionId) → 需要推送时调 sendMessage。如果你已经在用 Wiki-Framework 1.2.0,又刚好有「服务端主动通知前端」的需求,这个模块值得试一下。