QDAirPortBackend0122/mqtt项目核心代码总结.md
shan c66b290677 重构红绿灯数据采集方式从TCP改为MQTT并优化WebSocket车辆控制指令
- 禁用TCP红绿灯服务器,改用MQTT订阅获取信号状态
- 新增MqttConfig配置类和TrafficLightMqttSubscriber订阅器
- 重构VehicleCommandInfoWebSocketHandler,根据红绿灯信号实时发送车辆控制指令
- 添加Eclipse Paho MQTT客户端依赖
- 新增接口文档和MQTT核心代码总结文档
- 添加WebSocket测试页面
2026-01-27 17:01:22 +08:00

129 lines
3.2 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# MQTT 订阅者项目 - 核心代码总结
## 项目概述
Spring Boot MQTT 订阅者最小可行性验证项目,用于连接 MQTT Broker 并订阅指定 Topic 接收消息数据。
## 技术栈
- Eclipse Paho MQTT 1.2.5
- dotenv-java 3.2.0
## 2. application.yaml
```yaml
mqtt:
broker: ${MQTT_BROKER:tcp://10.0.0.202:8082}
client-id: ${MQTT_CLIENT_ID:springboot-mqtt-client}
username: ${MQTT_USERNAME:mqtt}
password: ${MQTT_PASSWORD:XAsOsLxaPs1}
topic: ${MQTT_TOPIC:test/topic}
timeout: ${MQTT_TIMEOUT:30}
keepalive: ${MQTT_KEEPALIVE:60}
```
## 4. MqttConfig.java
```java
package com.example.demo.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttConfig {
@Value("${mqtt.username:}")
private String username;
@Value("${mqtt.password:}")
private String password;
@Value("${mqtt.timeout:30}")
private int timeout;
@Value("${mqtt.keepalive:60}")
private int keepalive;
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
if (username != null && !username.isEmpty()) {
options.setUserName(username);
options.setPassword(password.toCharArray());
}
return options;
}
}
```
---
## 5. MqttSubscriber.java
```java
package com.example.demo.subscriber;
import jakarta.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class MqttSubscriber {
@Value("${mqtt.broker}")
private String broker;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.topic}")
private String topic;
private final MqttConnectOptions options;
public MqttSubscriber(MqttConnectOptions options) {
this.options = options;
}
@PostConstruct
public void init() throws MqttException {
MqttClient client = new MqttClient(broker, clientId);
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("❌ MQTT 连接断开:" + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.println("📥 收到消息");
System.out.println("Topic: " + topic);
System.out.println("Payload: " + payload);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
client.connect(options);
client.subscribe(topic, 0);
System.out.println("✅ MQTT 已连接,订阅 Topic" + topic);
}
}
```