- 禁用TCP红绿灯服务器,改用MQTT订阅获取信号状态
- 新增MqttConfig配置类和TrafficLightMqttSubscriber订阅器
- 重构VehicleCommandInfoWebSocketHandler,根据红绿灯信号实时发送车辆控制指令
- 添加Eclipse Paho MQTT客户端依赖
- 新增接口文档和MQTT核心代码总结文档
- 添加WebSocket测试页面
3.2 KiB
3.2 KiB
MQTT 订阅者项目 - 核心代码总结
项目概述
Spring Boot MQTT 订阅者最小可行性验证项目,用于连接 MQTT Broker 并订阅指定 Topic 接收消息数据。
技术栈
- Eclipse Paho MQTT 1.2.5
- dotenv-java 3.2.0
2. application.yaml
# MQTT client settings. Every value can be overridden through the environment
# variable named inside ${...}; the text after the colon is the fallback.
mqtt:
  broker: ${MQTT_BROKER:tcp://10.0.0.202:8082}
  client-id: ${MQTT_CLIENT_ID:springboot-mqtt-client}
  username: ${MQTT_USERNAME:mqtt}
  # No fallback on purpose: credentials must not be committed to the repository.
  # Provide the password through the MQTT_PASSWORD environment variable.
  # NOTE(review): the previously committed default should be treated as leaked and rotated.
  password: ${MQTT_PASSWORD:}
  topic: ${MQTT_TOPIC:test/topic}
  # Passed to MqttConnectOptions#setConnectionTimeout by MqttConfig.
  timeout: ${MQTT_TIMEOUT:30}
  # Passed to MqttConnectOptions#setKeepAliveInterval by MqttConfig.
  keepalive: ${MQTT_KEEPALIVE:60}
4. MqttConfig.java
package com.example.demo.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that assembles the {@link MqttConnectOptions} bean from the
 * {@code mqtt.*} properties.
 */
@Configuration
public class MqttConfig {

    /** Value of {@code mqtt.username}; empty string when the property is absent. */
    @Value("${mqtt.username:}")
    private String username;

    /** Value of {@code mqtt.password}; empty string when the property is absent. */
    @Value("${mqtt.password:}")
    private String password;

    /** Value of {@code mqtt.timeout}, forwarded to {@code setConnectionTimeout}; defaults to 30. */
    @Value("${mqtt.timeout:30}")
    private int timeout;

    /** Value of {@code mqtt.keepalive}, forwarded to {@code setKeepAliveInterval}; defaults to 60. */
    @Value("${mqtt.keepalive:60}")
    private int keepalive;

    /**
     * Builds the connection options used when connecting to the broker.
     *
     * <p>Clean-session mode is always enabled. User name and password are applied together,
     * and only when a non-empty user name is configured.
     *
     * @return a freshly configured {@link MqttConnectOptions}
     */
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        final MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setKeepAliveInterval(keepalive);
        connectOptions.setConnectionTimeout(timeout);
        connectOptions.setCleanSession(true);

        if (!hasUsername()) {
            // Anonymous connection: leave the credentials untouched.
            return connectOptions;
        }

        connectOptions.setUserName(username);
        connectOptions.setPassword(password.toCharArray());
        return connectOptions;
    }

    /** Returns {@code true} when a non-empty user name has been configured. */
    private boolean hasUsername() {
        return !(username == null || username.isEmpty());
    }
}
5. MqttSubscriber.java
package com.example.demo.subscriber;
import jakarta.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class MqttSubscriber {
@Value("${mqtt.broker}")
private String broker;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.topic}")
private String topic;
private final MqttConnectOptions options;
public MqttSubscriber(MqttConnectOptions options) {
this.options = options;
}
@PostConstruct
public void init() throws MqttException {
MqttClient client = new MqttClient(broker, clientId);
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("❌ MQTT 连接断开:" + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.println("📥 收到消息");
System.out.println("Topic: " + topic);
System.out.println("Payload: " + payload);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
client.connect(options);
client.subscribe(topic, 0);
System.out.println("✅ MQTT 已连接,订阅 Topic:" + topic);
}
}