Spring MQTT

Spring MQTT

简介

Spring 提供了 Message Queueing Telemetry Transport(MQTT) 协议的插件。

搭建环境

编写配置文件 config/mosquitto.conf

1
2
listener 1883
allow_anonymous true

编写 docker-compose.yaml

1
2
3
4
5
6
7
8
9
services:
mqtt-broker:
image: eclipse-mosquitto:latest
container_name: mqtt-broker
ports:
- "1883:1883"
- "9001:9001"
volumes:
- ./config:/mosquitto/config

使用

安装依赖(maven):

1
2
3
4
5
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.3.1</version>
</dependency>

安装依赖(gradle):

1
compile "org.springframework.integration:spring-integration-mqtt:6.3.1"

编写配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.integration.annotation.MessagingGateway;

@Configuration
public class MqttConfig {

@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://192.168.2.235:1883"});
// options.setUserName("username");
// options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("writeClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("topic1");
return messageHandler;
}

@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}

@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}


@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("readClient", mqttClientFactory(),
"topic1");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}

@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload() + " message received");
}
}

编写测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {

@Resource
private MqttConfig.MyGateway myGateway;

@GetMapping
public String test() {
myGateway.sendToMqtt("wqnice");
return "ok";
}
}

参考资料

官方文档


Spring MQTT
https://wangqian0306.github.io/2024/spring-mqtt/
作者
WangQian
发布于
2024年6月19日
许可协议