Spring RSocket
简介
RSocket 是一种使用异步二进制流提供 Reactive Streams 语义的应用程序协议,使用它可以不关注底层的实现方式。
使用方式
引入如下依赖:
1 2 3 4 5 6 7 dependencies { implementation 'org.springframework.boot:spring-boot-starter-rsocket' implementation 'org.springframework.boot:spring-boot-starter-webflux' developmentOnly 'org.springframework.boot:spring-boot-devtools' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'io.projectreactor:reactor-test' }
编写测试 Record:
1 2 public record Message (String name, String content) { }
编写测试 Controller:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import org.springframework.messaging.handler.annotation.MessageMapping;import org.springframework.stereotype.Controller;import reactor.core.publisher.Mono;import java.time.Instant;@Controller public class TestController { @MessageMapping("getByName") Mono<Message> getByName (String name) { return Mono.just(new Message (name, Instant.now().toString())); } @MessageMapping("create") Mono<Message> create (Message message) { return Mono.just(message); } }
编写配置文件 application.yaml
:
1 2 3 4 5 6 server: port: 8080 spring: rsocket: server: port: 7000
编写单元测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import org.junit.jupiter.api.BeforeEach;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.messaging.rsocket.RSocketRequester;import org.springframework.messaging.rsocket.RSocketStrategies;import org.springframework.util.MimeTypeUtils;import reactor.core.publisher.Mono;import reactor.test.StepVerifier;import static org.assertj.core.api.Assertions.assertThat;@SpringBootTest public class TestControllerTest { private RSocketRequester requester; @Autowired private RSocketStrategies rSocketStrategies; @BeforeEach public void setup () { requester = RSocketRequester.builder() .rsocketStrategies(rSocketStrategies) .dataMimeType(MimeTypeUtils.APPLICATION_JSON) .tcp("localhost" , 7000 ); } @Test public void testGetByName () { Mono<Message> result = requester .route("getByName" ) .data("demo" ) .retrieveMono(Message.class); StepVerifier .create(result) .consumeNextWith(message -> { assertThat(message.name()).isEqualTo("demo" ); }) .verifyComplete(); } @Test public void testCreate () { Mono<Message> result = requester .route("create" ) .data(new Message ("TEST" , "Request" )) .retrieveMono(Message.class); StepVerifier .create(result) .consumeNextWith(message -> { assertThat(message.name()).isEqualTo("TEST" ); assertThat(message.content()).isEqualTo("Request" ); }) .verifyComplete(); } }
WebSocket
除了使用 tcp 链接之外还可以采用 WebSocket 协议,仅需完成如下配置:
1 2 3 4 5 6 7 8 server: port: 8080 spring: rsocket: server: port: 7000 mapping-path: /rsocket transport: websocket
测试类的部分则需要修改链接的建立方式:
1 2 3 4 5 6 7 @BeforeEach public void setup () { requester = RSocketRequester.builder() .rsocketStrategies(rSocketStrategies) .dataMimeType(MimeTypeUtils.APPLICATION_JSON) .websocket(URI.create("ws://localhost:7000/rsocket" )); }
调试工具
RSocket Requests In HTTP Client
在 JetBrains Marketplace 中寻找 RSocket Requests In HTTP Client
插件并安装即可使用如下的 test.http
文件进行测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 ### rsocket getByName RSOCKET getByName Host: localhost:9090 Content-Type: application/json 1 ### rsocket create RSOCKET create Host: localhost:9090 Content-Type: application/json { "name": "wq", "content": "wqnice" } ### rsocket getByName websocket RSOCKET getByName Host: ws://localhost:7000/rsocket Content-Type: application/json 1 ### rsocket stream STREAM st Host: ws://localhost:7000/rsocket Content-Type: application/json
RSocket Client CLI (RSC)
访问 https://github.com/making/rsc/releases 即可获取到最新的命令行工具。
使用如下命令即可完成测试:
1 java -jar rsc.jar --debug --request --data "wq" --route getByName tcp://localhost:7000
或:
1 java -jar rsc.jar --debug --request --data '{"name":"wq","content":"nice"}' --route create tcp://localhost:7000
注:不要使用 CMD 或 Power Shell 直接用 Bash,否则会报错。
在 WebSocket 协议中应该采用下面的命令:
1 java -jar rsc.jar --debug --request --data '1' --route getByName ws://localhost:7000/rsocket
常见问题
聊天室 Demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import org.springframework.messaging.handler.annotation.MessageMapping;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Controller;import reactor.core.publisher.Flux;import reactor.core.publisher.FluxSink;import reactor.core.publisher.Mono;import java.util.concurrent.ConcurrentHashMap;@Controller public class RsocketController { private final ConcurrentHashMap<String, FluxSink<Message>> clientMap = new ConcurrentHashMap <>(); @MessageMapping("data") Flux<Message> data (@Payload String id) { return Flux.create(sink -> clientMap.put(id, sink)); } @MessageMapping("chat") Mono<Void> chat (@Payload Message message) { clientMap.forEach((userId, sink) -> sink.next(message)); return Mono.empty(); } }
与 SpringSecurity 和 JWT 集成
需要修改引入的类
1 2 3 4 5 6 7 8 9 10 11 12 dependencies { implementation 'org.springframework.boot:spring-boot-starter-rsocket' implementation 'org.springframework.boot:spring-boot-starter-security' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'org.springframework.boot:spring-boot-starter-oauth2-resource-server' implementation 'org.springframework.security:spring-security-messaging' implementation 'org.springframework.security:spring-security-rsocket' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'io.projectreactor:reactor-test' testImplementation 'org.springframework.security:spring-security-test' }
与 Spring Security 集成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.security.config.Customizer;import org.springframework.security.config.annotation.rsocket.RSocketSecurity;import org.springframework.security.oauth2.jwt.NimbusReactiveJwtDecoder;import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder;import org.springframework.security.rsocket.core.PayloadSocketAcceptorInterceptor;import org.springframework.stereotype.Component;import java.security.interfaces.RSAPrivateKey;import java.security.interfaces.RSAPublicKey;@Component public class RSocketSecurityConfig { @Value("${jwt.public.key}") RSAPublicKey publicKey; @Value("${jwt.private.key}") RSAPrivateKey privateKey; @Bean PayloadSocketAcceptorInterceptor rsocketInterceptor (RSocketSecurity rsocket) { rsocket .authorizePayload(authorize -> authorize .anyRequest().authenticated() .anyExchange().permitAll() ) .jwt(Customizer.withDefaults()); return rsocket.build(); } @Bean ReactiveJwtDecoder jwtDecoder () { return NimbusReactiveJwtDecoder.withPublicKey(publicKey).build(); } }
参考资料
Spring Framework 官方文档
Spring Boot 官方文档
Getting Started With RSocket On Spring Boot
RSocket Client CLI (RSC)