QDAirPortBackend0122/mqtt项目核心代码总结.md
shan c66b290677 重构红绿灯数据采集方式从TCP改为MQTT并优化WebSocket车辆控制指令
- 禁用TCP红绿灯服务器,改用MQTT订阅获取信号状态
- 新增MqttConfig配置类和TrafficLightMqttSubscriber订阅器
- 重构VehicleCommandInfoWebSocketHandler,根据红绿灯信号实时发送车辆控制指令
- 添加Eclipse Paho MQTT客户端依赖
- 新增接口文档和MQTT核心代码总结文档
- 添加WebSocket测试页面
2026-01-27 17:01:22 +08:00

3.2 KiB
Raw Blame History

MQTT 订阅者项目 - 核心代码总结

项目概述

Spring Boot MQTT 订阅者最小可行性验证项目,用于连接 MQTT Broker 并订阅指定 Topic 接收消息数据。

技术栈

  • Eclipse Paho MQTT 1.2.5
  • dotenv-java 3.2.0

2. application.yaml

mqtt:
  broker: ${MQTT_BROKER:tcp://10.0.0.202:8082}
  client-id: ${MQTT_CLIENT_ID:springboot-mqtt-client}
  username: ${MQTT_USERNAME:mqtt}
  password: ${MQTT_PASSWORD:XAsOsLxaPs1}
  topic: ${MQTT_TOPIC:test/topic}
  timeout: ${MQTT_TIMEOUT:30}
  keepalive: ${MQTT_KEEPALIVE:60}

4. MqttConfig.java

package com.example.demo.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConfig {

    @Value("${mqtt.username:}")
    private String username;

    @Value("${mqtt.password:}")
    private String password;

    @Value("${mqtt.timeout:30}")
    private int timeout;

    @Value("${mqtt.keepalive:60}")
    private int keepalive;

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);

        if (username != null && !username.isEmpty()) {
            options.setUserName(username);
            options.setPassword(password.toCharArray());
        }

        return options;
    }
}

5. MqttSubscriber.java

package com.example.demo.subscriber;

import jakarta.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component
public class MqttSubscriber {

    @Value("${mqtt.broker}")
    private String broker;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.topic}")
    private String topic;

    private final MqttConnectOptions options;

    public MqttSubscriber(MqttConnectOptions options) {
        this.options = options;
    }

    @PostConstruct
    public void init() throws MqttException {
        MqttClient client = new MqttClient(broker, clientId);

        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("❌ MQTT 连接断开:" + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
                System.out.println("📥 收到消息");
                System.out.println("Topic: " + topic);
                System.out.println("Payload: " + payload);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
            }
        });

        client.connect(options);
        client.subscribe(topic, 0);

        System.out.println("✅ MQTT 已连接,订阅 Topic" + topic);
    }
}