Spring RSocket

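Introduction

RSocket is an application protocol that provides Reactive Streams semantics over an asynchronous binary stream, so application code does not need to care about the underlying transport.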
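To make "Reactive Streams semantics" more concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces, not RSocket itself. The class names BackpressureDemo and OneAtATime are made up for this illustration. The subscriber requests one item at a time, which is the demand-driven flow control that RSocket carries across the network.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only illustrates Reactive Streams demand signalling.
final class BackpressureDemo {

    /** Subscriber that never has more than one outstanding request. */
    static final class OneAtATime implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item only
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item once this one is handled
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the items and returns what the subscriber received, in order. */
    static List<String> run(List<String> items) {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

Reactor's Mono and Flux, used in the rest of this post, implement the same contract, so the subscriber decides how fast data flows.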

Usage

Add the following dependencies:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-rsocket'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
}

Create a test record:

public record Message(String name, String content) {
}

Create a test controller:

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;

import java.time.Instant;

@Controller
public class TestController {

    // Request-response: echoes the name and puts the current time in the content
    @MessageMapping("getByName")
    Mono<Message> getByName(String name) {
        return Mono.just(new Message(name, Instant.now().toString()));
    }

    // Request-response: returns the received message unchanged
    @MessageMapping("create")
    Mono<Message> create(Message message) {
        return Mono.just(message);
    }

}
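The strings passed to @MessageMapping are routes, and a client sends the route in the request metadata. As a rough illustration of what that looks like at the byte level, the sketch below encodes routes the way the RSocket routing metadata extension (message/x.rsocket.routing.v0) describes: each route is a UTF-8 tag prefixed by a one-byte length. The class RoutingMetadata is hypothetical and is not part of Spring or rsocket-java, which do this encoding for you.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that mimics the layout of RSocket routing metadata.
final class RoutingMetadata {

    /** Encodes each route as [1-byte length][UTF-8 bytes]. */
    static byte[] encode(String... routes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String route : routes) {
            byte[] utf8 = route.getBytes(StandardCharsets.UTF_8);
            if (utf8.length == 0 || utf8.length > 255) {
                throw new IllegalArgumentException("route must be 1..255 bytes: " + route);
            }
            out.write(utf8.length);          // length prefix fits in one unsigned byte
            out.write(utf8, 0, utf8.length); // the route itself
        }
        return out.toByteArray();
    }

    /** Reads the first route back out of encoded metadata. */
    static String firstRoute(byte[] metadata) {
        int length = metadata[0] & 0xFF;
        return new String(metadata, 1, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] metadata = encode("getByName");
        System.out.println(metadata.length + " bytes, route = " + firstRoute(metadata));
    }
}
```

On the server side, Spring decodes this metadata and dispatches the request to the handler whose @MessageMapping value matches the route.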

Create the configuration file application.yaml:

server:
  port: 8080
spring:
  rsocket:
    server:
      port: 7000
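With this configuration the application listens on two ports: 8080 for HTTP and 7000 for RSocket. Before reaching for a client, it can help to confirm that the RSocket port is accepting TCP connections at all. The following is a small sketch using only the JDK; the class PortCheck and its timeout value are assumptions made for illustration, not something the original setup includes.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical utility: checks whether something accepts TCP connections on host:port.
final class PortCheck {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;  // the TCP handshake succeeded
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Ports taken from the application.yaml above
        System.out.println("HTTP 8080 listening: " + isListening("localhost", 8080, 1000));
        System.out.println("RSocket 7000 listening: " + isListening("localhost", 7000, 1000));
    }
}
```

A successful connection only shows that the port is open. It does not prove that the listener speaks RSocket.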

Write a unit test:

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
public class TestControllerTest {

    private RSocketRequester requester;

    @Autowired
    private RSocketStrategies rSocketStrategies;

    @BeforeEach
    public void setup() {
        // Connect over TCP to the port set in application.yaml
        requester = RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
                .tcp("localhost", 7000);
    }

    @Test
    public void testGetByName() {
        Mono<Message> result = requester
                .route("getByName")
                .data("demo")
                .retrieveMono(Message.class);

        // Verify that the response message contains the expected data
        StepVerifier
                .create(result)
                .consumeNextWith(message -> {
                    assertThat(message.name()).isEqualTo("demo");
                })
                .verifyComplete();
    }

    @Test
    public void testCreate() {
        Mono<Message> result = requester
                .route("create")
                .data(new Message("TEST", "Request"))
                .retrieveMono(Message.class);

        // Verify that the response message contains the expected data
        StepVerifier
                .create(result)
                .consumeNextWith(message -> {
                    assertThat(message.name()).isEqualTo("TEST");
                    assertThat(message.content()).isEqualTo("Request");
                })
                .verifyComplete();
    }

}

WebSocket

Besides a TCP connection, RSocket can also run over the WebSocket protocol. Only the following configuration is needed:

server:
  port: 8080
spring:
  rsocket:
    server:
      port: 7000
      mapping-path: /rsocket
      transport: websocket

In the test class, change how the connection is established:

// Requires: import java.net.URI;
@BeforeEach
public void setup() {
    requester = RSocketRequester.builder()
            .rsocketStrategies(rSocketStrategies)
            .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
            .websocket(URI.create("ws://localhost:7000/rsocket"));
}
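The WebSocket URI is assembled from the configuration values: the port comes from spring.rsocket.server.port and the path from spring.rsocket.server.mapping-path. A small JDK-only sketch of that mapping follows; the class WebSocketEndpoint is a hypothetical helper, not a Spring API.

```java
import java.net.URI;

// Hypothetical helper that builds the client URI from the server settings.
final class WebSocketEndpoint {

    /** mappingPath is expected to start with '/', as in the YAML above. */
    static URI of(String host, int port, String mappingPath) {
        return URI.create("ws://" + host + ":" + port + mappingPath);
    }

    public static void main(String[] args) {
        URI uri = of("localhost", 7000, "/rsocket");
        System.out.println("scheme = " + uri.getScheme());
        System.out.println("port   = " + uri.getPort());
        System.out.println("path   = " + uri.getPath());
    }
}
```

If the client path does not match mapping-path, the WebSocket handshake is expected to fail, so a mismatch here is worth checking first when the connection cannot be established.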

Debugging tools

RSocket Requests In HTTP Client

Find and install the RSocket Requests In HTTP Client plugin from the JetBrains Marketplace. You can then test with a test.http file such as the following:

### rsocket getByName
RSOCKET getByName
Host: localhost:7000
Content-Type: application/json

1

### rsocket create
RSOCKET create
Host: localhost:7000
Content-Type: application/json

{
"name": "wq",
"content": "wqnice"
}

### rsocket getByName websocket
RSOCKET getByName
Host: ws://localhost:7000/rsocket
Content-Type: application/json

1

### rsocket stream
STREAM st
Host: ws://localhost:7000/rsocket
Content-Type: application/json

RSocket Client CLI (RSC)

The latest release of the command-line tool is available at https://github.com/making/rsc/releases.

The following command sends a test request:

java -jar rsc.jar --debug --request --data "wq" --route getByName tcp://localhost:7000

Or:

java -jar rsc.jar --debug --request --data '{"name":"wq","content":"nice"}' --route create tcp://localhost:7000

Note: run these commands in Bash rather than CMD or PowerShell; otherwise they fail with an error.

When the server uses the WebSocket transport, use the following command instead:

java -jar rsc.jar --debug --request --data '1' --route getByName ws://localhost:7000/rsocket

Common issues

Chat room demo

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

import java.util.concurrent.ConcurrentHashMap;

@Controller
public class RsocketController {

    // One sink per connected client, keyed by client id
    private final ConcurrentHashMap<String, FluxSink<Message>> clientMap = new ConcurrentHashMap<>();

    // Request-stream: a client subscribes here to receive chat messages
    @MessageMapping("data")
    Flux<Message> data(@Payload String id) {
        return Flux.create(sink -> {
            clientMap.put(id, sink);
            // Drop the sink when the client cancels or disconnects so the map does not grow forever
            sink.onDispose(() -> clientMap.remove(id, sink));
        });
    }

    // Pushes the incoming message to every subscribed client
    @MessageMapping("chat")
    Mono<Void> chat(@Payload Message message) {
        clientMap.forEach((userId, sink) -> sink.next(message));
        return Mono.empty();
    }

}
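The controller keeps one sink per client id and pushes every chat message to all of them. The same fan-out pattern, stripped of RSocket and Reactor so it runs on the JDK alone, can be sketched as follows. ChatRoom is a hypothetical class, a plain Consumer<String> stands in for FluxSink<Message>, and the returned Runnable plays the role of the cleanup that runs when a client disconnects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for the controller's client registry.
final class ChatRoom {

    private final Map<String, Consumer<String>> clients = new ConcurrentHashMap<>();

    /** Registers a client and returns a handle that unregisters it again. */
    Runnable join(String id, Consumer<String> listener) {
        clients.put(id, listener);
        // Remove only if this listener is still the one registered under the id
        return () -> clients.remove(id, listener);
    }

    /** Delivers the message to every registered client and returns the recipient count. */
    int broadcast(String message) {
        int delivered = 0;
        for (Consumer<String> listener : clients.values()) {
            listener.accept(message);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        ChatRoom room = new ChatRoom();
        List<String> alice = new ArrayList<>();
        List<String> bob = new ArrayList<>();
        Runnable aliceLeaves = room.join("alice", alice::add);
        room.join("bob", bob::add);

        room.broadcast("hello");
        aliceLeaves.run();
        room.broadcast("bye");

        System.out.println("alice saw " + alice);
        System.out.println("bob saw " + bob);
    }
}
```

Without the unregister step, entries for departed clients would stay in the map and every broadcast would keep writing to them.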

Integrating with Spring Security and JWT

The dependencies need to be changed:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-rsocket'
    implementation 'org.springframework.boot:spring-boot-starter-security'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-oauth2-resource-server'
    implementation 'org.springframework.security:spring-security-messaging'
    implementation 'org.springframework.security:spring-security-rsocket'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
    testImplementation 'org.springframework.security:spring-security-test'
}

Integrating with Spring Security

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.security.config.Customizer;
import org.springframework.security.config.annotation.rsocket.RSocketSecurity;
import org.springframework.security.oauth2.jwt.NimbusReactiveJwtDecoder;
import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder;
import org.springframework.security.rsocket.core.PayloadSocketAcceptorInterceptor;
import org.springframework.stereotype.Component;

import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;

@Component
public class RSocketSecurityConfig {

    @Value("${jwt.public.key}")
    RSAPublicKey publicKey;

    @Value("${jwt.private.key}")
    RSAPrivateKey privateKey;

    // Requests must be authenticated with a JWT; all other exchanges are permitted
    @Bean
    PayloadSocketAcceptorInterceptor rsocketInterceptor(RSocketSecurity rsocket) {
        rsocket
                .authorizePayload(authorize ->
                        authorize
                                .anyRequest().authenticated()
                                .anyExchange().permitAll()
                )
                .jwt(Customizer.withDefaults());
        return rsocket.build();
    }

    // Verifies token signatures with the configured RSA public key
    @Bean
    ReactiveJwtDecoder jwtDecoder() {
        return NimbusReactiveJwtDecoder.withPublicKey(publicKey).build();
    }

}
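The decoder above is built from an RSA public key, which means a token is accepted only if its signature was produced with the matching private key. To show the idea without any Spring or Nimbus classes, here is a JDK-only sketch of RS256 signing and verification. Rs256Sketch and its methods are hypothetical names, the key pair is generated on the fly instead of being read from jwt.public.key and jwt.private.key, and only the signature is checked. A real decoder also validates claims such as the expiry time.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Base64;

// Hypothetical sketch of the RS256 check behind a public-key JWT decoder.
final class Rs256Sketch {

    static KeyPair newKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Base64url without padding, as used for each JWT segment. */
    static String b64(String text) {
        return Base64.getUrlEncoder().withoutPadding()
                .encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Builds header.payload.signature using SHA-256 with RSA (RS256). */
    static String sign(String payloadJson, PrivateKey privateKey) {
        String signingInput = b64("{\"alg\":\"RS256\",\"typ\":\"JWT\"}") + "." + b64(payloadJson);
        try {
            Signature signer = Signature.getInstance("SHA256withRSA");
            signer.initSign(privateKey);
            signer.update(signingInput.getBytes(StandardCharsets.US_ASCII));
            String signature = Base64.getUrlEncoder().withoutPadding().encodeToString(signer.sign());
            return signingInput + "." + signature;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true only if the signature matches header.payload under the public key. */
    static boolean verify(String token, PublicKey publicKey) {
        int lastDot = token.lastIndexOf('.');
        if (lastDot < 0) {
            return false;
        }
        try {
            Signature verifier = Signature.getInstance("SHA256withRSA");
            verifier.initVerify(publicKey);
            verifier.update(token.substring(0, lastDot).getBytes(StandardCharsets.US_ASCII));
            return verifier.verify(Base64.getUrlDecoder().decode(token.substring(lastDot + 1)));
        } catch (GeneralSecurityException | IllegalArgumentException e) {
            return false; // malformed signature or invalid Base64
        }
    }

    public static void main(String[] args) {
        KeyPair keys = newKeyPair();
        String token = sign("{\"sub\":\"wq\"}", keys.getPrivate());
        System.out.println("valid with matching key: " + verify(token, keys.getPublic()));
        System.out.println("valid with other key: " + verify(token, newKeyPair().getPublic()));
    }
}
```

This is also why the server only needs the public key to authenticate requests: the private key is needed only by whatever issues the tokens.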

References

Spring Framework official documentation

Spring Boot official documentation

Getting Started With RSocket On Spring Boot

RSocket Client CLI (RSC)


Spring RSocket
https://wangqian0306.github.io/2023/rsocket/
Author: WangQian
Published: August 29, 2023