网站首页 > 教程文章 正文
在物联网(IoT)和智能设备横行的今天,你有没有遇到这样的问题:
- 服务端需要实时把报警、状态更新、控制指令推送给客户端;
- 安卓 App、嵌入式设备、网页等终端,需要轻量且稳定的连接方式;
- HTTP 太“重”、WebSocket 配置又麻烦?
这时,轻量级消息传输协议 MQTT(Message Queuing Telemetry Transport)登场!
一句话理解 MQTT:专为低带宽、高并发、实时通信设计的发布-订阅协议。
那么问题来了 —— 在 Spring Boot 项目中,如何快速、优雅、高可控地落地 MQTT?
-01-
MQTT 接入方案选择
MQTT 本身只是一种通信协议,并不指定你用哪个消息中间件。而目前支持 MQTT 的主流 Broker 包括:
Broker | 特点简述 |
Mosquitto | 轻量级,C语言实现,非常稳定 |
RabbitMQ | 插件支持 MQTT,易与现有系统整合 |
EMQX | 高性能 MQTT Broker,专为 IoT 优化 |
HiveMQ | 商用支持强,价格偏贵 |
本次我们采用的是:RabbitMQ + MQTT 插件,实现服务端到安卓客户端的推送通知,配合 Spring Boot 框架,集成简便,生产可用!
-02-
MQTT 三大角色
MQTT,就像微信一样:
- Publisher(发布者):你发朋友圈
- Broker(中间人):微信服务器
- Subscriber(订阅者):看到你朋友圈的朋友
也就是说,消息不是点对点的,而是“你说一句,谁订阅了就能听到”。
-03-
实战解析
Spring Boot + RabbitMQ MQTT 实现推送系统
整体架构:
[Spring Boot服务] --发布消息--> [RabbitMQ MQTT插件] --> [MQTT客户端订阅接收消息]
RabbitMQ 开启 MQTT 插件
rabbitmq-plugins enable rabbitmq_mqtt # 服务端 MQTT 协议,端口1883
rabbitmq-plugins enable rabbitmq_web_mqtt # Web前端用 MQTT 协议,端口15675
引入依赖
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置 application.yml
mqtt-push:
clientId: mqtt_client_
serverClientId: mqtt_server_
servers: tcp://127.0.0.1:1883
username: guest
password: guest
defaultTopic: sensor/+/temperature
配置连接工厂
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(servers.split(","));
options.setCleanSession(false);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setKeepAliveInterval(20);
factory.setConnectionOptions(options);
return factory;
}
服务端推送消息
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(serverClientId + "producer_" + RandomUtil.getRandomStr(), mqttClientFactory());
handler.setAsync(true);
handler.setDefaultQos(1);
handler.setDefaultTopic(defaultTopic);
return handler;
}
使用接口发送消息:
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendMessage2Mqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
服务端监听客户端消息
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId + "consumer_" + RandomUtil.getRandomStr(), mqttClientFactory(), defaultTopic);
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
处理消息回调:
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler mqttInMessageHandler() {
return message -> {
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
String payload = message.getPayload().toString();
log.info("收到消息:主题 [{}] 内容 [{}]", topic, payload);
};
}
MQTTBox 测试
MQTTBox 是一款强大的 MQTT 测试工具,可以模拟发送消息,也能订阅查看接收到的消息:
1. 发布测试
使用 MQTTBox 向 sensor/s123/temperature 发布消息
服务端通过通配符 sensor/+/temperature 成功收到消息!
2. 控制器测试
@PostMapping("/sendMessage")
public String sendMqtt(@RequestBody ReqSendMsgDTO dto) {
mqttGateway.sendMessage2Mqtt(dto.getTopic(), dto.getPayload());
return "SUCCESS";
}
-04-
总结
实践建议
- clientId 必须唯一,推荐使用 UUID 或服务实例标识;
- QoS 建议使用 1(至少一次),避免消息丢失;
- 若用 RabbitMQ,也可以使用 Exchange + Topic Binding 方式做高级路由;
- 对于高并发或长连接推送,推荐结合 Netty 或 Gateway 层限流处理。
技术方案
能力点 | 技术实现 |
协议支持 | MQTT(通过 rabbitmq_mqtt 插件) |
服务端推送 | Spring Integration + MqttGateway |
客户端订阅 | MqttPahoMessageDrivenChannelAdapter |
工具联调 | MQTTBox / Postman / 模拟器 |
安全与稳定性 | 唯一 clientId、QoS 保证、自动重连 |
猜你喜欢
- 2025-08-03 Chrome插件Talend API Tester核心竞争力与功能深度解析
- 2025-08-03 十分钟带你了解阿里、美团、滴滴、头条等互联网头部大厂面经
- 2025-08-03 什么是RPC?什么是Restful?它们有什么区别?
- 2025-08-03 基于Java实现,支持在线发布API接口读取数据库,有哪些工具?
- 2025-08-03 最近做了一个搜索接口的优化,反复压测了四次,终于达到要求了
- 2025-08-03 HTTP链接保活,3个层面的保活机制,让你的认知入木三分
- 2025-08-03 每天一个 Python 库:httpx异步请求,让接口测试飞起来
- 2025-08-03 20. 综合项目
- 2025-08-03 工厂模式+策略模式消除 if else 实战
- 2025-08-03 架构篇-一分钟掌握性能优化小技巧
- 最近发表
- 标签列表
-
- location.href (44)
- document.ready (36)
- git checkout -b (34)
- 跃点数 (35)
- 阿里云镜像地址 (33)
- qt qmessagebox (36)
- mybatis plus page (35)
- vue @scroll (38)
- 堆栈区别 (33)
- 什么是容器 (33)
- sha1 md5 (33)
- navicat导出数据 (34)
- 阿里云acp考试 (33)
- 阿里云 nacos (34)
- redhat官网下载镜像 (36)
- srs服务器 (33)
- pico开发者 (33)
- https的端口号 (34)
- vscode更改主题 (35)
- 阿里云资源池 (34)
- os.path.join (33)
- redis aof rdb 区别 (33)
- 302跳转 (33)
- http method (35)
- js array splice (33)