Spring MQTT
简介
Spring Integration 提供了对 Message Queuing Telemetry Transport（MQTT）协议的支持，包括入站（订阅）和出站（发布）通道适配器。
搭建环境
编写配置文件 config/mosquitto.conf：
```conf
listener 1883
allow_anonymous true
```
编写 docker-compose.yaml
```yaml
services:
  mqtt-broker:
    image: eclipse-mosquitto:latest
    container_name: mqtt-broker
    ports:
      - "1883:1883"
      - "9001:9001"
    volumes:
      - ./config:/mosquitto/config
```
使用
安装依赖(maven):
1 2 3 4 5
| <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>6.3.1</version> </dependency>
|
安装依赖(gradle):
```groovy
implementation "org.springframework.integration:spring-integration-mqtt:6.3.1"
```
编写配置类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.integration.annotation.MessagingGateway;
/**
 * Spring Integration wiring for MQTT messaging.
 *
 * <p>Declares a shared Paho client factory, an outbound handler fed by
 * {@code mqttOutboundChannel}, a gateway interface for sending strings to that
 * channel, and an inbound adapter that forwards messages from the subscribed
 * topic to {@code mqttInputChannel}, where they are printed to standard output.
 */
@Configuration
public class MqttConfig {

    /** Broker address handed to the Paho connection options. */
    private static final String BROKER_URI = "tcp://192.168.2.235:1883";

    /** Topic used both as the outbound default topic and as the inbound subscription. */
    private static final String TOPIC = "topic1";

    /**
     * Builds the client factory shared by the outbound handler and the inbound adapter.
     *
     * @return a factory whose connection options point at {@link #BROKER_URI}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(new String[] {BROKER_URI});

        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setConnectionOptions(connectOptions);
        return clientFactory;
    }

    /**
     * Creates the handler that consumes messages from {@code mqttOutboundChannel}.
     *
     * @return a handler using client id {@code writeClient}, configured as async,
     *         with {@link #TOPIC} as its default topic
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler outboundHandler =
                new MqttPahoMessageHandler("writeClient", mqttClientFactory());
        outboundHandler.setAsync(true);
        outboundHandler.setDefaultTopic(TOPIC);
        return outboundHandler;
    }

    /**
     * Channel that feeds {@link #mqttOutbound()}.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /** Gateway whose requests are sent to {@code mqttOutboundChannel}. */
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        /**
         * Sends the given text to the gateway's default request channel.
         *
         * @param data the text to send
         */
        void sendToMqtt(String data);
    }

    /**
     * Channel that receives messages produced by {@link #inbound()}.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the inbound adapter subscribed to {@link #TOPIC}.
     *
     * @return an adapter using client id {@code readClient}, a completion timeout of
     *         5000, the default Paho message converter, QoS 1, and
     *         {@link #mqttInputChannel()} as its output channel
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter inboundAdapter =
                new MqttPahoMessageDrivenChannelAdapter("readClient", mqttClientFactory(), TOPIC);
        inboundAdapter.setCompletionTimeout(5000);
        inboundAdapter.setConverter(new DefaultPahoMessageConverter());
        inboundAdapter.setQos(1);
        inboundAdapter.setOutputChannel(mqttInputChannel());
        return inboundAdapter;
    }

    /**
     * Creates the handler for messages arriving on {@code mqttInputChannel}.
     *
     * @return a handler that prints each payload followed by {@code " message received"}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            Object payload = message.getPayload();
            System.out.println(payload + " message received");
        };
    }
}
|
编写测试类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import jakarta.annotation.Resource; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller mapped to {@code /test} that sends a fixed message through
 * the MQTT gateway.
 */
@RestController
@RequestMapping("/test")
public class TestController {

    /** Gateway injected by the container and used to send the test message. */
    @Resource
    private MqttConfig.MyGateway myGateway;

    /**
     * Handles {@code GET /test}: sends the fixed payload through the gateway.
     *
     * @return the literal string {@code "ok"} once the send call has returned
     */
    @GetMapping
    public String test() {
        final String payload = "wqnice";
        myGateway.sendToMqtt(payload);
        return "ok";
    }
}
|
参考资料
官方文档