Spring Boot SSE

Spring Boot SSE

简介

Server-Sent Events (SSE) 是一种服务器推送技术,使客户端能够通过HTTP连接从服务器接收自动更新。

实现方式

编写 Message 类:

1
2
3
4
5
6
7
8
9
10
import lombok.Data;

@Data
public class Message {

private String userId;

private String message;

}

编写 EventHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
public class EventHandler {

private static final long DEFAULT_TIMEOUT = 60 * 60 * 1000L;
private final ConcurrentHashMap<String, SseEmitter> emitterConcurrentHashMap = new ConcurrentHashMap<>();

public SseEmitter registerClient(String userId) {
if (emitterConcurrentHashMap.containsKey(userId)) {
return emitterConcurrentHashMap.get(userId);
} else {
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
emitter.onCompletion(() -> emitterConcurrentHashMap.remove(userId));
emitter.onError((err) -> removeAndLogError(userId));
emitter.onTimeout(() -> removeAndLogError(userId));
return emitterConcurrentHashMap.put(userId, emitter);
}
}

private void removeAndLogError(String userId) {
log.info("Error during communication. Unregister client {}", userId);
emitterConcurrentHashMap.remove(userId);
}

public void broadcastMessage(Message message) {
emitterConcurrentHashMap.forEach((k, v) -> sendMessage(k, message));
}

private void sendMessage(String userId, Message message) {
var sseEmitter = emitterConcurrentHashMap.get(userId);
try {
log.info("Notify client " + userId + " " + message.toString());
SseEventBuilder eventBuilder = SseEmitter.event().data(message, MediaType.APPLICATION_JSON).name("chat");
sseEmitter.send(eventBuilder);
} catch (IOException e) {
sseEmitter.completeWithError(e);
}
}

}

编写接口 SSEController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class SSEController {

@Resource
private EventHandler eventHandler;

@PostMapping("/message")
public void sendMessage(@RequestBody Message message) {
eventHandler.broadcastMessage(message);
}

@GetMapping("/connect")
public SseEmitter connect(@RequestParam(defaultValue = "admin") String userId) {
return eventHandler.registerClient(userId);
}

}

编写 src/main/resources/static/chat.js 前端业务逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"use strict";

async function postData(url, data) {
const response = await fetch(url, {
method: 'POST',
mode: 'cors',
cache: 'no-cache',
credentials: 'same-origin',
headers: {
'Content-Type': 'application/json'
},
redirect: 'follow',
referrerPolicy: 'no-referrer',
body: JSON.stringify(data)
});
return response;
}

function send() {
const input = document.getElementById('messageInput').value;
console.error(window.assignedName);
postData('/message',{ message: input, userId: window.assignedName});
}

function handleChatEvent(eventData) {
const userNameNode = document.createElement('span');
userNameNode.innerHTML = eventData.userId + ': ';
const li = document.createElement("li");
li.appendChild(userNameNode);
li.appendChild(document.createTextNode(eventData.message));
const ul = document.getElementById("list");
ul.appendChild(li);
}

function registerSSE(url) {
const source = new EventSource(url);
source.addEventListener('chat', event => {
console.error("ininini");
handleChatEvent(JSON.parse(event.data));
})
source.onopen = event => console.log("Connection opened");
source.onerror = event => console.error("Connection error");
source.onmessage = event => console.error("ininini");
return source;
}

function connect() {
let url = "/connect?userId=" + document.getElementById('username').value
document.getElementById('connect').hidden = true;
window.assignedName = document.getElementById('username').value;
window.eventSource = registerSSE(url);
}

编写页面 src/main/resources/static/index.html:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport"
content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Document</title>
</head>
<body>

<div class="center">
<input class="text" id="username" placeholder="">
<button id="connect" type="submit" value="Connect" onclick="connect()">Connect</button>
</div>

<div>
<input class="text" id="messageInput" placeholder="">
<button type="submit" value="Send" onclick="send()">Send Message</button>
<ul id="list"></ul>
</div>

<script src="chat.js"></script>

</body>
</html>

常见问题

链接超时

由于是长连接所以可能出现链接超时的情况,例如在 k8s 上使用 nginx-ingress-controller 就可能遇到,建议参照 ingress 文档进行配置。

参考资料

Server-Sent Events in Spring

demo-chat-app-sse-spring-boot


Spring Boot SSE
https://wangqian0306.github.io/2023/sse/
作者
WangQian
发布于
2023年5月4日
许可协议