准备测试

This commit is contained in:
sladro 2026-01-28 17:36:32 +08:00
parent 4bd984a540
commit aade3009e1
20 changed files with 690 additions and 157 deletions

View File

@ -109,6 +109,7 @@ set(LIB_SOURCES
src/network/HTTPClient.cpp
src/network/TrafficLightHttpServer.cpp
src/network/TrafficLightTcpServer.cpp
src/network/ConfigHttpServer.cpp
src/spatial/CoordinateConverter.cpp
src/types/BasicTypes.cpp
src/types/VehicleData.cpp

View File

@ -92,8 +92,8 @@
"read_timeout_ms": 2000
},
"unmanned_vehicle": {
"host": "localhost",
"port": 8081,
"host": "10.232.18.23",
"port": 8020,
"location_path": "/api/VehicleLocationInfo",
"status_path": "/api/VehicleStateInfo",
"command_path": "/api/VehicleCommandInfo",

View File

@ -389,19 +389,15 @@ bool DataCollector::fetchUnmannedVehicleData() {
bool any_success = false;
bool any_failure = false;
auto& vehicles = ControllableVehicles::getInstance().getVehicles();
auto controllables = ControllableVehicles::getInstance().getControllableVehicleIdsSnapshot();
for (const auto& vehicle : vehicles) {
if (vehicle.type == "UNMANNED") {
std::string status;
if (dataSource_->fetchUnmannedVehicleStatus(vehicle.vehicleNo,
status)) {
any_success = true;
} else {
any_failure = true;
Logger::error("获取无人车状态失败, vehicleNo: " +
vehicle.vehicleNo);
}
for (const auto& vehicle_id : controllables) {
std::string status;
if (dataSource_->fetchUnmannedVehicleStatus(vehicle_id, status)) {
any_success = true;
} else {
any_failure = true;
Logger::error("获取无人车状态失败, vehicleNo: " + vehicle_id);
}
}

View File

@ -152,7 +152,8 @@ AreaType AirportBounds::getAreaType(const Vector2D& position) const {
return AreaType::TEST_ZONE;
}
const AreaConfig& AirportBounds::getAreaConfig(AreaType type) const {
AreaConfig AirportBounds::getAreaConfig(AreaType type) const {
std::shared_lock<std::shared_mutex> lock(config_mutex_);
auto it = areaConfigs_.find(type);
if (it == areaConfigs_.end()) {
throw std::runtime_error("Invalid area type");
@ -160,6 +161,19 @@ const AreaConfig& AirportBounds::getAreaConfig(AreaType type) const {
return it->second;
}
bool AirportBounds::setWarningZoneAircraftRadius(AreaType type, double radius) {
if (!(radius > 0.0) || !std::isfinite(radius)) {
return false;
}
std::unique_lock<std::shared_mutex> lock(config_mutex_);
auto it = areaConfigs_.find(type);
if (it == areaConfigs_.end()) {
return false;
}
it->second.warning_zone_radius.aircraft = radius;
return true;
}
bool AirportBounds::isPointInBounds(const Vector2D& position) const {
// 在机场坐标系中判断是否在边界内
bool result = airportBounds_.contains(position);

View File

@ -7,6 +7,8 @@
#include "spatial/QuadTree.h"
#include "AreaConfig.h"
#include <unordered_map>
#include <shared_mutex>
#include <mutex>
// 机场区域定义
class AirportBounds {
@ -18,7 +20,11 @@ public:
virtual AreaType getAreaType(const Vector2D& position) const;
// 获取区域配置
virtual const AreaConfig& getAreaConfig(AreaType type) const;
virtual AreaConfig getAreaConfig(AreaType type) const;
// 动态更新区域配置(运行中生效)
// 例如:修改 warning_zone_radius.aircraft你提到的 200
virtual bool setWarningZoneAircraftRadius(AreaType type, double radius);
// 获取整个机场边界
virtual const Bounds& getAirportBounds() const { return airportBounds_; }
@ -47,6 +53,9 @@ protected:
Bounds airportBounds_; // 整个机场边界
std::unordered_map<AreaType, Bounds> areaBounds_; // 各区域边界
std::unordered_map<AreaType, AreaConfig> areaConfigs_; // 各区域配置
// 保护 areaConfigs_ 的并发读写HTTP 动态修改 vs 处理线程读取)
mutable std::shared_mutex config_mutex_;
// 从配置文件加载数据
virtual void loadConfig(const std::string& configFile);

View File

@ -14,14 +14,15 @@
System* System::instance_ = nullptr;
namespace {
volatile sig_atomic_t g_system_signal_requested = 0;
}
System::System()
: controllableVehicles_(ControllableVehicles::getInstance()) {
instance_ = this;
std::signal(SIGINT, signalHandler);
std::signal(SIGTERM, signalHandler);
// 使用单例模式获取配置实例
auto& config = SystemConfig::instance();
// 信号处理由 main 负责注册(避免覆盖 main 的 handler且 signal handler 内不可做非异步信号安全操作)
(void)SystemConfig::instance();
}
System::~System() {
@ -30,11 +31,8 @@ System::~System() {
}
void System::signalHandler(int signal) {
Logger::info("Received signal: ", signal);
if (instance_) {
instance_->stop();
}
std::exit(0);
// async-signal-safe: 仅记录请求,不做锁/线程/join/IO/exit 等操作
g_system_signal_requested = signal;
}
bool System::initialize() {
@ -56,6 +54,14 @@ bool System::initialize() {
ws_thread_ = std::thread([this]() { ws_server_->start(); });
Logger::info("WebSocket server initialized on port ", system_config.websocket.port);
// 前端动态配置接口HTTP
// POST http://<host>:8090/config/runway/warning_zone_radius/aircraft {"value": 300}
config_http_server_ = std::make_unique<network::ConfigHttpServer>(
static_cast<uint16_t>(8090),
50,
*this);
config_http_server_->start(1);
// 红绿灯改为 TCP 被动接收(对齐 tcp_server.pyaccept -> recv -> 响应 -> close
// 这里先写死监听端口 80820.0.0.0),用于你先在测试平台验证链路
traffic_light_tcp_server_ = std::make_unique<network::TrafficLightTcpServer>(
@ -167,6 +173,11 @@ void System::stop() {
traffic_light_tcp_server_->stop();
}
// Stop Config HTTP server
if (config_http_server_) {
config_http_server_->stop();
}
// 保留旧 HTTP server 成员(当前不启用)
if (traffic_light_http_server_) {
traffic_light_http_server_->stop();
@ -183,6 +194,21 @@ void System::stop() {
Logger::info("System fully stopped.");
}
bool System::setRunwayWarningZoneAircraftRadius(double radius, double* oldValue) {
if (!airportBounds_) {
return false;
}
try {
auto cfg = airportBounds_->getAreaConfig(AreaType::RUNWAY);
if (oldValue) {
*oldValue = cfg.warning_zone_radius.aircraft;
}
} catch (...) {
return false;
}
return airportBounds_->setWarningZoneAircraftRadius(AreaType::RUNWAY, radius);
}
void System::processLoop() {
while (running_) {
try {
@ -220,11 +246,9 @@ void System::processLoop() {
objects.push_back(&ac);
}
for (auto& veh : latest_vehicles_) {
const auto* config = controllableVehicles_.findVehicle(veh.vehicleNo);
if (config) {
veh.type = config->type == "UNMANNED" ? MovingObjectType::UNMANNED : MovingObjectType::SPECIAL;
veh.isControllable = config->type == "UNMANNED";
}
bool controllable = controllableVehicles_.isControllable(veh.vehicleNo);
veh.type = controllable ? MovingObjectType::UNMANNED : MovingObjectType::SPECIAL;
veh.isControllable = controllable;
objects.push_back(&veh);
}
@ -299,8 +323,11 @@ void System::checkUnmannedVehicleSafetyZones(
std::unordered_map<std::string, RiskLevel>& vehicleMaxRiskLevels,
std::vector<CollisionRisk>& detectedRisks) {
// 遍历所有无人车
// 遍历所有无人车(可控车辆)
for (const auto& vehicle : vehicles) {
if (!controllableVehicles_.isControllable(vehicle.id)) {
continue;
}
// 遍历所有路口安全区
for (const auto& [intersectionId, zone] : safetyZones_) {
if (zone->getState() == SafetyZoneState::INACTIVE) {

View File

@ -13,6 +13,7 @@
#include "config/AirportBounds.h"
#include "vehicle/ControllableVehicles.h"
#include "network/WebSocketServer.h"
#include "network/ConfigHttpServer.h"
#include "network/MessageTypes.h"
#include "config/IntersectionConfig.h"
#include "network/TrafficLightHttpServer.h"
@ -41,6 +42,9 @@ public:
void broadcastCollisionWarning(const CollisionRisk& risk);
void broadcastVehicleCommand(const VehicleCommand& cmd);
void broadcastTrafficLightStatus(const TrafficLightSignal& signal);
// 运行中动态修改配置:对应 airport_bounds.json 里 runway.warning_zone_radius.aircraft默认 200
bool setRunwayWarningZoneAircraftRadius(double radius, double* oldValue = nullptr);
const SystemConfig& getSystemConfig() const { return SystemConfig::instance(); }
const IntersectionConfig& getIntersectionConfig() const { return intersection_config_; }
@ -95,6 +99,9 @@ private:
// WebSocket 服务器
std::unique_ptr<network::WebSocketServer> ws_server_;
std::thread ws_thread_;
// 前端动态配置 HTTP Server
std::unique_ptr<network::ConfigHttpServer> config_http_server_;
// 新增: 红绿灯 HTTP 服务器
std::unique_ptr<network::TrafficLightHttpServer> traffic_light_http_server_;

View File

@ -33,22 +33,11 @@ void CollisionDetector::updateTraffic(const std::vector<Aircraft>& aircraft,
continue; // 跳过边界外的车辆
}
// 根据配置设置车辆类型
// 根据运行时注册表设置车辆类型
Vehicle updatedVehicle = vehicle;
const auto* config =
controllableVehicles_->findVehicle(vehicle.vehicleNo);
if (config) {
if (config->type == "UNMANNED") {
updatedVehicle.type = MovingObjectType::UNMANNED;
updatedVehicle.isControllable = true;
} else if (config->type == "SPECIAL") {
updatedVehicle.type = MovingObjectType::SPECIAL;
updatedVehicle.isControllable = false;
}
} else {
updatedVehicle.type = MovingObjectType::SPECIAL;
updatedVehicle.isControllable = false;
}
bool controllable = controllableVehicles_ && controllableVehicles_->isControllable(vehicle.vehicleNo);
updatedVehicle.type = controllable ? MovingObjectType::UNMANNED : MovingObjectType::SPECIAL;
updatedVehicle.isControllable = controllable;
// 插入四叉树
try {

View File

@ -74,7 +74,6 @@ std::vector<Vehicle> MockDataService::generateVehicleData() {
}
Vector2D MockDataService::generatePosition(AreaType areaType) {
const auto& config = bounds_.getAreaConfig(areaType);
const auto& areaBounds = bounds_.getAreaBounds(areaType);
std::uniform_real_distribution<double> x_dist(areaBounds.x,

View File

@ -0,0 +1,361 @@
#include "ConfigHttpServer.h"
#include "core/System.h"
#include "utils/Logger.h"
#include "vehicle/ControllableVehicles.h"
#include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp>
#include <nlohmann/json.hpp>
#include <chrono>
#include <unordered_map>
namespace network {
namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;
using json = nlohmann::json;
namespace {
http::response<http::string_body> json_response(const http::request<http::string_body>& req,
http::status status,
const json& body) {
http::response<http::string_body> res{status, req.version()};
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, "application/json");
res.keep_alive(req.keep_alive());
res.body() = body.dump();
res.prepare_payload();
return res;
}
http::response<http::string_body> method_not_allowed(const http::request<http::string_body>& req,
const char* allow) {
http::response<http::string_body> res{http::status::method_not_allowed, req.version()};
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, "application/json");
res.set(http::field::allow, allow);
res.keep_alive(req.keep_alive());
res.body() = json{{"status", "error"}, {"message", "Method Not Allowed"}}.dump();
res.prepare_payload();
return res;
}
} // namespace
class ConfigSession : public std::enable_shared_from_this<ConfigSession> {
tcp::socket socket_;
beast::flat_buffer buffer_;
http::request<http::string_body> req_;
net::strand<net::io_context::executor_type> strand_;
System& system_;
public:
explicit ConfigSession(tcp::socket&& socket, net::io_context& ioc, System& system_ref)
: socket_(std::move(socket)), strand_(ioc.get_executor()), system_(system_ref) {}
void run() {
net::dispatch(strand_, [self = shared_from_this()]() { self->do_read(); });
}
private:
void do_read() {
req_ = {};
http::async_read(socket_, buffer_, req_,
net::bind_executor(strand_,
[self = shared_from_this()](beast::error_code ec, std::size_t bytes) {
boost::ignore_unused(bytes);
self->on_read(ec);
}));
}
void on_read(beast::error_code ec) {
if (ec == http::error::end_of_stream) {
return do_close();
}
if (ec) {
Logger::error("ConfigSession read error: ", ec.message());
return;
}
handle_request();
}
void handle_request() {
if (req_.method() != http::verb::post) {
return send_response(method_not_allowed(req_, "POST"));
}
// Endpoint A: 前端车辆注册表(增量更新)
// POST /api/VehicleRegistry
// Body: [{"vehicleID":"A001","vehicleType":"WUREN"}, ...]
if (req_.target() == "/api/VehicleRegistry") {
try {
json body = json::parse(req_.body());
if (!body.is_array()) {
return send_response(json_response(req_, http::status::bad_request,
json{{"status", "error"}, {"message", "Body must be an array"}}));
}
std::vector<VehicleRegistryEntry> entries;
entries.reserve(body.size());
std::unordered_map<std::string, int> type_counts;
std::vector<std::string> errors;
auto is_allowed_type = [](const std::string& t) {
return t == "WUREN" || t == "TEQIN" || t == "HANGKONG" || t == "PUTONG" || t == "JIUYUAN";
};
for (size_t i = 0; i < body.size(); ++i) {
const auto& item = body.at(i);
if (!item.is_object()) {
errors.push_back("index " + std::to_string(i) + ": item must be an object");
continue;
}
if (!item.contains("vehicleID") || !item.contains("vehicleType")) {
errors.push_back("index " + std::to_string(i) + ": missing vehicleID/vehicleType");
continue;
}
if (!item.at("vehicleID").is_string() || !item.at("vehicleType").is_string()) {
errors.push_back("index " + std::to_string(i) + ": vehicleID/vehicleType must be string");
continue;
}
std::string vehicleID = item.at("vehicleID").get<std::string>();
std::string vehicleType = item.at("vehicleType").get<std::string>();
if (vehicleID.empty()) {
errors.push_back("index " + std::to_string(i) + ": vehicleID must be non-empty");
continue;
}
if (!is_allowed_type(vehicleType)) {
errors.push_back("index " + std::to_string(i) + ": invalid vehicleType=" + vehicleType);
continue;
}
entries.push_back(VehicleRegistryEntry{vehicleID, vehicleType});
type_counts[vehicleType] += 1;
}
if (!errors.empty()) {
return send_response(json_response(req_, http::status::bad_request,
json{{"status", "error"}, {"message", "Invalid request"}, {"errors", errors}}));
}
auto& registry = ControllableVehicles::getInstance();
registry.updateRegistry(entries);
auto controllables = registry.getControllableVehicleIdsSnapshot();
std::vector<std::string> controllable_ids;
controllable_ids.reserve(controllables.size());
for (const auto& id : controllables) {
controllable_ids.push_back(id);
}
auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
return send_response(json_response(req_, http::status::ok,
json{{"status", "success"},
{"updatedAt", now_ms},
{"updated", static_cast<int>(entries.size())},
{"controllableCount", static_cast<int>(controllable_ids.size())},
{"typesCount", type_counts},
{"controllableVehicleIDs", controllable_ids}}));
} catch (const json::parse_error& e) {
return send_response(json_response(req_, http::status::bad_request,
json{{"status", "error"}, {"message", "Invalid JSON"}, {"detail", e.what()}}));
} catch (const std::exception& e) {
return send_response(json_response(req_, http::status::internal_server_error,
json{{"status", "error"}, {"message", "Internal error"}, {"detail", e.what()}}));
}
}
// Endpoint B: 运行中动态修改配置
// POST /config/runway/warning_zone_radius/aircraft
// Body: {"value": 300}
if (req_.target() != "/config/runway/warning_zone_radius/aircraft") {
return send_response(json_response(req_, http::status::not_found,
json{{"status", "error"}, {"message", "Not Found"}}));
}
try {
json body = json::parse(req_.body());
if (!body.contains("value")) {
return send_response(json_response(req_, http::status::bad_request,
json{{"status", "error"}, {"message", "Missing field: value"}}));
}
double value = body.at("value").get<double>();
double oldValue = 0.0;
if (!system_.setRunwayWarningZoneAircraftRadius(value, &oldValue)) {
return send_response(json_response(req_, http::status::internal_server_error,
json{{"status", "error"}, {"message", "Failed to update config"}}));
}
Logger::info("Updated runway warning_zone_radius.aircraft: ", oldValue, " -> ", value);
return send_response(json_response(req_, http::status::ok,
json{{"status", "success"},
{"area", "runway"},
{"field", "warning_zone_radius.aircraft"},
{"old", oldValue},
{"new", value}}));
} catch (const json::parse_error& e) {
return send_response(json_response(req_, http::status::bad_request,
json{{"status", "error"}, {"message", "Invalid JSON"}, {"detail", e.what()}}));
} catch (const std::exception& e) {
return send_response(json_response(req_, http::status::internal_server_error,
json{{"status", "error"}, {"message", "Internal error"}, {"detail", e.what()}}));
}
}
void send_response(http::response<http::string_body>&& res) {
auto sp = std::make_shared<http::response<http::string_body>>(std::move(res));
http::async_write(socket_, *sp,
net::bind_executor(strand_,
[self = shared_from_this(), sp](beast::error_code ec, std::size_t bytes) {
boost::ignore_unused(bytes);
self->on_write(sp->need_eof(), ec);
}));
}
void on_write(bool close, beast::error_code ec) {
if (ec) {
Logger::error("ConfigSession write error: ", ec.message());
return;
}
if (close) {
return do_close();
}
do_read();
}
void do_close() {
beast::error_code ec;
socket_.shutdown(tcp::socket::shutdown_send, ec);
}
};
class ConfigListener : public std::enable_shared_from_this<ConfigListener> {
net::io_context& ioc_;
tcp::acceptor acceptor_;
net::strand<net::io_context::executor_type> strand_;
int max_connections_;
System& system_;
public:
ConfigListener(net::io_context& ioc, tcp::endpoint endpoint, int max_connections, System& system_ref)
: ioc_(ioc)
, acceptor_(ioc)
, strand_(ioc.get_executor())
, max_connections_(max_connections)
, system_(system_ref) {
beast::error_code ec;
acceptor_.open(endpoint.protocol(), ec);
if (ec) throw beast::system_error{ec};
acceptor_.set_option(net::socket_base::reuse_address(true), ec);
if (ec) throw beast::system_error{ec};
acceptor_.bind(endpoint, ec);
if (ec) throw beast::system_error{ec};
acceptor_.listen(net::socket_base::max_listen_connections, ec);
if (ec) throw beast::system_error{ec};
}
void run() { do_accept(); }
void stop() {
beast::error_code ec;
acceptor_.close(ec);
}
private:
void do_accept() {
acceptor_.async_accept(
net::bind_executor(strand_,
[self = shared_from_this()](beast::error_code ec, tcp::socket socket) {
self->on_accept(ec, std::move(socket));
}));
}
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
if (ec != net::error::operation_aborted) {
Logger::error("ConfigListener accept error: ", ec.message());
}
return;
}
std::make_shared<ConfigSession>(std::move(socket), ioc_, system_)->run();
do_accept();
}
};
ConfigHttpServer::ConfigHttpServer(uint16_t port, int max_connections, System& system_ref)
: port_(port), max_connections_(max_connections), ioc_(1), system_(system_ref) {}
ConfigHttpServer::~ConfigHttpServer() {
if (running_.load()) {
stop();
}
}
void ConfigHttpServer::start(int num_threads) {
if (running_.exchange(true)) {
return;
}
if (num_threads < 1) num_threads = 1;
auto const address = net::ip::make_address("0.0.0.0");
try {
listener_ = std::make_shared<ConfigListener>(
ioc_, tcp::endpoint{address, port_}, max_connections_, system_);
listener_->run();
Logger::info("ConfigHttpServer listening on ", address.to_string(), ":", port_);
threads_.reserve(num_threads);
for (int i = 0; i < num_threads; ++i) {
threads_.emplace_back([this] { run_ioc(); });
}
} catch (const std::exception& e) {
running_ = false;
Logger::error("Failed to start ConfigHttpServer: ", e.what());
throw;
}
}
void ConfigHttpServer::stop() {
if (!running_.exchange(false)) {
return;
}
Logger::info("Stopping ConfigHttpServer...");
net::post(ioc_, [this]() {
if (listener_) {
listener_->stop();
listener_.reset();
}
});
ioc_.stop();
for (auto& t : threads_) {
if (t.joinable()) t.join();
}
threads_.clear();
Logger::info("ConfigHttpServer stopped.");
}
void ConfigHttpServer::run_ioc() {
try {
ioc_.run();
} catch (const std::exception& e) {
Logger::error("Exception in ConfigHttpServer I/O thread: ", e.what());
}
}
} // namespace network

View File

@ -0,0 +1,50 @@
#pragma once
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/io_context.hpp>
#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <thread>
#include <vector>
class System;
namespace network {
namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;
class ConfigListener;
// 提供一个极简配置更新 HTTP 接口(给前端调用)
class ConfigHttpServer {
uint16_t port_;
int max_connections_;
net::io_context ioc_;
std::shared_ptr<ConfigListener> listener_;
std::vector<std::thread> threads_;
std::atomic<bool> running_{false};
System& system_;
public:
ConfigHttpServer(uint16_t port, int max_connections, System& system_ref);
~ConfigHttpServer();
ConfigHttpServer(const ConfigHttpServer&) = delete;
ConfigHttpServer& operator=(const ConfigHttpServer&) = delete;
void start(int num_threads = 1);
void stop();
private:
void run_ioc();
};
} // namespace network

View File

@ -39,7 +39,7 @@ size_t HTTPClient::WriteCallback(void* contents, size_t size, size_t nmemb, void
return total_size;
}
bool HTTPClient::sendCommand(const std::string& ip, int port, const VehicleCommand& command) const {
bool HTTPClient::sendCommand(const std::string& host, int port, const std::string& command_path, const VehicleCommand& command) const {
if (!curl_) {
Logger::error("CURL not initialized");
return false;
@ -47,7 +47,11 @@ bool HTTPClient::sendCommand(const std::string& ip, int port, const VehicleComma
// 构造请求URL
std::stringstream url;
url << "http://" << ip << ":" << port << "/api/VehicleCommandInfo";
url << "http://" << host << ":" << port;
if (!command_path.empty() && command_path.front() != '/') {
url << '/';
}
url << command_path;
// 生成消息唯一 id
std::stringstream transId;
@ -80,15 +84,21 @@ bool HTTPClient::sendCommand(const std::string& ip, int port, const VehicleComma
}
}()},
{"latitude", command.latitude},
{"longitude", command.longitude},
{"signalState", getSignalStateString(command.signalState)},
{"intersectionId", command.intersectionId},
{"relativeSpeed", command.relativeSpeed},
{"relativeMotionX", command.relativeMotionX},
{"relativeMotionY", command.relativeMotionY},
{"minDistance", command.minDistance}
{"longitude", command.longitude}
};
if (command.type == CommandType::SIGNAL) {
request["signalState"] = getSignalStateString(command.signalState);
request["intersectionId"] = command.intersectionId;
}
if (command.type == CommandType::ALERT || command.type == CommandType::WARNING) {
request["relativeSpeed"] = command.relativeSpeed;
request["relativeMotionX"] = command.relativeMotionX;
request["relativeMotionY"] = command.relativeMotionY;
request["minDistance"] = command.minDistance;
}
std::string request_body = request.dump();
response_buffer_.clear();

View File

@ -11,7 +11,7 @@ public:
~HTTPClient();
// 发送控制指令
bool sendCommand(const std::string& ip, int port, const VehicleCommand& command) const;
bool sendCommand(const std::string& host, int port, const std::string& command_path, const VehicleCommand& command) const;
private:
static size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp);

View File

@ -1,6 +1,10 @@
#include <boost/beast/core.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/dispatch.hpp>
#include <algorithm>
#include <future>
#include "network/WebSocketServer.h"
#include "utils/Logger.h"
#include <nlohmann/json.hpp>
@ -15,7 +19,9 @@ namespace network {
std::lock_guard<std::mutex> lock(sessions_mutex_);
for (auto& session : sessions_) {
try {
session.lock()->close(boost::beast::websocket::close_code::normal);
if (auto ws = session.lock()) {
ws->close(boost::beast::websocket::close_code::normal);
}
} catch (...) {
// 忽略关闭时的错误
}
@ -27,35 +33,55 @@ namespace network {
}
void WebSocketServer::start() {
running_.store(true, std::memory_order_relaxed);
Logger::info("WebSocket 服务器启动,监听端口: ", acceptor_.local_endpoint().port());
handleAccept();
ioc_.run();
}
void WebSocketServer::broadcast(const std::string& message) {
std::lock_guard<std::mutex> lock(sessions_mutex_);
auto it = sessions_.begin();
int successCount = 0;
int failCount = 0;
while (it != sessions_.end()) {
if (auto session = it->lock()) { // 获取 shared_ptr
try {
session->write(boost::asio::buffer(message));
++successCount;
++it;
} catch (...) {
Logger::warning("广播消息到客户端失败");
++failCount;
it = sessions_.erase(it); // 移除失败的会话
}
} else {
Logger::debug("移除失效的会话");
it = sessions_.erase(it); // 移除已失效的会话
}
if (!running_.load(std::memory_order_relaxed)) {
return;
}
Logger::debug("广播消息完成: ", successCount, " 个成功, ", failCount, " 个失败, 当前共 ", sessions_.size(), " 个连接");
auto msg = std::make_shared<std::string>(message);
auto do_broadcast = [this, msg]() {
std::lock_guard<std::mutex> lock(sessions_mutex_);
auto it = sessions_.begin();
int successCount = 0;
int failCount = 0;
while (it != sessions_.end()) {
if (auto session = it->lock()) {
try {
session->write(boost::asio::buffer(*msg));
++successCount;
++it;
} catch (...) {
Logger::warning("广播消息到客户端失败");
++failCount;
it = sessions_.erase(it);
}
} else {
Logger::debug("移除失效的会话");
it = sessions_.erase(it);
}
}
Logger::debug("广播消息完成: ", successCount, " 个成功, ", failCount, " 个失败, 当前共 ", sessions_.size(), " 个连接");
};
// 保持原语义broadcast 返回前完成实际发送,但确保所有 websocket 操作在 io_context 线程执行
auto done = std::make_shared<std::promise<void>>();
auto fut = done->get_future();
// dispatch: 若已在 strand 线程内会直接执行,避免等待导致自锁;否则投递到 io_context 线程执行
boost::asio::dispatch(strand_, [do_broadcast, done]() mutable {
do_broadcast();
done->set_value();
});
fut.wait();
}
void WebSocketServer::handleAccept() {
@ -118,7 +144,7 @@ namespace network {
}
void WebSocketServer::stop() {
running_ = false;
running_.store(false, std::memory_order_relaxed);
Logger::info("正在停止 WebSocket 服务器...");
ioc_.stop(); // 停止 io_context

View File

@ -23,10 +23,11 @@ private:
void doRead(std::shared_ptr<boost::beast::websocket::stream<boost::asio::ip::tcp::socket>> ws);
boost::asio::io_context ioc_;
boost::asio::io_context::strand strand_{ioc_};
boost::asio::ip::tcp::acceptor acceptor_;
std::vector<std::weak_ptr<boost::beast::websocket::stream<boost::asio::ip::tcp::socket>>> sessions_;
std::mutex sessions_mutex_;
std::atomic<bool> running_{true};
std::atomic<bool> running_{false};
};
} // namespace network

View File

@ -7,6 +7,8 @@
#include <iomanip>
#include <sstream>
#include <mutex>
#include <atomic>
#include <ctime>
enum class LogLevel {
DEBUG,
@ -19,46 +21,45 @@ class Logger {
public:
static void initialize(const std::string& filename, LogLevel level = LogLevel::INFO) {
std::lock_guard<std::mutex> lock(getMutex());
currentLevel() = level;
currentLevel().store(level, std::memory_order_relaxed);
logFile().open(filename, std::ios::app);
}
static void setLogLevel(LogLevel level) {
std::lock_guard<std::mutex> lock(getMutex());
currentLevel() = level;
currentLevel().store(level, std::memory_order_relaxed);
}
template<typename... Args>
static void debug(Args... args) {
if (currentLevel() <= LogLevel::DEBUG) {
if (currentLevel().load(std::memory_order_relaxed) <= LogLevel::DEBUG) {
log("DEBUG", args...);
}
}
template<typename... Args>
static void info(Args... args) {
if (currentLevel() <= LogLevel::INFO) {
if (currentLevel().load(std::memory_order_relaxed) <= LogLevel::INFO) {
log("INFO", args...);
}
}
template<typename... Args>
static void warning(Args... args) {
if (currentLevel() <= LogLevel::WARNING) {
if (currentLevel().load(std::memory_order_relaxed) <= LogLevel::WARNING) {
log("WARNING", args...);
}
}
template<typename... Args>
static void error(Args... args) {
if (currentLevel() <= LogLevel::ERROR) {
if (currentLevel().load(std::memory_order_relaxed) <= LogLevel::ERROR) {
log("ERROR", args...);
}
}
private:
static LogLevel& currentLevel() {
static LogLevel level = LogLevel::INFO;
static std::atomic<LogLevel>& currentLevel() {
static std::atomic<LogLevel> level{ LogLevel::INFO };
return level;
}
@ -71,6 +72,16 @@ private:
static std::mutex mutex;
return mutex;
}
static std::tm safeLocalTime(std::time_t t) {
std::tm tm{};
#if defined(_WIN32)
localtime_s(&tm, &t);
#else
localtime_r(&t, &tm);
#endif
return tm;
}
template<typename... Args>
static void log(const char* level, Args... args) {
@ -78,9 +89,11 @@ private:
auto now_c = std::chrono::system_clock::to_time_t(now);
auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch()) % 1000;
const std::tm tm = safeLocalTime(now_c);
std::stringstream ss;
ss << std::put_time(std::localtime(&now_c), "%Y-%m-%d %H:%M:%S")
ss << std::put_time(&tm, "%Y-%m-%d %H:%M:%S")
<< '.' << std::setfill('0') << std::setw(3) << now_ms.count()
<< " [" << level << "] ";

View File

@ -1,23 +1,21 @@
#include "ControllableVehicles.h"
#include "utils/Logger.h"
#include <fstream>
#include <stdexcept>
#include <mutex>
#include "config/SystemConfig.h"
// 定义静态成员变量
ControllableVehicles* ControllableVehicles::instance_ = nullptr;
ControllableVehicles& ControllableVehicles::getInstance() {
if (instance_ == nullptr) {
instance_ = new ControllableVehicles("config/unmanned_vehicles.json");
instance_ = new ControllableVehicles();
}
return *instance_;
}
ControllableVehicles::ControllableVehicles(const std::string& configFile)
ControllableVehicles::ControllableVehicles()
: http_client_(std::make_unique<HTTPClient>()) {
if (!configFile.empty()) {
loadConfig(configFile);
}
}
ControllableVehicles::~ControllableVehicles() {
@ -27,67 +25,62 @@ ControllableVehicles::~ControllableVehicles() {
}
}
const std::vector<ControllableVehicleConfig>& ControllableVehicles::getVehicles() const {
return vehicles_;
void ControllableVehicles::updateRegistry(const std::vector<VehicleRegistryEntry>& entries) {
if (entries.empty()) {
return;
}
int applied = 0;
{
std::unique_lock lock(mutex_);
for (const auto& e : entries) {
if (e.vehicleID.empty()) {
continue;
}
vehicle_type_by_id_[e.vehicleID] = e.vehicleType;
if (e.vehicleType == "WUREN") {
controllable_vehicle_ids_.insert(e.vehicleID);
} else {
controllable_vehicle_ids_.erase(e.vehicleID);
}
applied++;
}
}
Logger::info("Vehicle registry updated. applied=", applied,
" total=", vehicle_type_by_id_.size(),
" controllable(WUREN)=", controllable_vehicle_ids_.size());
}
const ControllableVehicleConfig* ControllableVehicles::findVehicle(const std::string& vehicleNo) const {
auto iter = std::find_if(vehicles_.begin(), vehicles_.end(),
[&](const ControllableVehicleConfig& config) {
return config.vehicleNo == vehicleNo;
});
std::optional<std::string> ControllableVehicles::getVehicleType(const std::string& vehicleID) const {
std::shared_lock lock(mutex_);
auto it = vehicle_type_by_id_.find(vehicleID);
if (it == vehicle_type_by_id_.end()) {
return std::nullopt;
}
return it->second;
}
return iter != vehicles_.end() ? &(*iter) : nullptr;
std::unordered_set<std::string> ControllableVehicles::getControllableVehicleIdsSnapshot() const {
std::shared_lock lock(mutex_);
return controllable_vehicle_ids_;
}
bool ControllableVehicles::isControllable(const std::string& vehicleNo) const {
// 查找车辆配置
auto* config = findVehicle(vehicleNo);
if (!config) {
return false;
}
// 只有无人车类型是可控的
return config->type == "UNMANNED";
}
bool ControllableVehicles::loadConfig(const std::string& configFile) {
try {
std::ifstream file(configFile);
if (!file.is_open()) {
Logger::error("Failed to open controllable vehicles config file: ", configFile);
return false;
}
nlohmann::json jsonConfig;
file >> jsonConfig;
for (const auto& item : jsonConfig["vehicles"]) {
ControllableVehicleConfig config;
config.vehicleNo = item["vehicleNo"].get<std::string>();
config.type = item["type"].get<std::string>();
config.ip = item["ip"].get<std::string>();
config.port = item["port"].get<int>();
vehicles_.push_back(config);
Logger::info("Added vehicle: ", config.vehicleNo, ", type: ", config.type);
}
Logger::info("Loaded ", vehicles_.size(), " controllable vehicles");
return true;
} catch (const std::exception& e) {
Logger::error("Failed to parse controllable vehicles config: ", e.what());
return false;
}
std::shared_lock lock(mutex_);
return controllable_vehicle_ids_.find(vehicleNo) != controllable_vehicle_ids_.end();
}
bool ControllableVehicles::sendCommand(const std::string& vehicleNo, const VehicleCommand& command) {
auto vehicle = findVehicle(vehicleNo);
if (!vehicle) {
Logger::error("Vehicle ", vehicleNo, " not found in controllable vehicles");
if (!isControllable(vehicleNo)) {
Logger::debug("Skip sendCommand: vehicle not controllable: ", vehicleNo);
return false;
}
try {
bool success = http_client_->sendCommand(vehicle->ip, vehicle->port, command);
const auto& cfg = SystemConfig::instance().data_source.vehicle;
bool success = http_client_->sendCommand(cfg.host, cfg.port, cfg.command_path, command);
if (success) {
Logger::info("Successfully sent command to vehicle ", vehicleNo, ": ",
command.type == CommandType::SIGNAL ? "SIGNAL" :

View File

@ -1,34 +1,44 @@
#pragma once
#include <string>
#include <vector>
#include <memory>
#include <optional>
#include <shared_mutex>
#include <unordered_map>
#include <unordered_set>
#include "types/VehicleCommand.h"
#include "network/HTTPClient.h"
struct ControllableVehicleConfig {
std::string vehicleNo; // 车牌号
std::string type; // 车辆类型UNMANNED 或 SPECIAL
std::string ip; // IP地址
int port; // 端口号
struct VehicleRegistryEntry {
std::string vehicleID;
std::string vehicleType; // WUREN/TEQIN/HANGKONG/PUTONG/JIUYUAN
};
class ControllableVehicles {
private:
static ControllableVehicles* instance_;
ControllableVehicles(const std::string& configFile);
ControllableVehicles();
ControllableVehicles(const ControllableVehicles&) = delete;
ControllableVehicles& operator=(const ControllableVehicles&) = delete;
~ControllableVehicles();
std::vector<ControllableVehicleConfig> vehicles_;
// 运行时车辆类型注册表(由前端接口增量更新)
mutable std::shared_mutex mutex_;
std::unordered_map<std::string, std::string> vehicle_type_by_id_;
std::unordered_set<std::string> controllable_vehicle_ids_; // vehicleType == WUREN
std::unique_ptr<HTTPClient> http_client_;
bool loadConfig(const std::string& configFile);
public:
static ControllableVehicles& getInstance();
const std::vector<ControllableVehicleConfig>& getVehicles() const;
const ControllableVehicleConfig* findVehicle(const std::string& vehicleNo) const;
// 增量更新车辆注册表(仅更新 entries 中出现的 vehicleID并同步可控车辆集合WUREN
void updateRegistry(const std::vector<VehicleRegistryEntry>& entries);
// 查询
std::optional<std::string> getVehicleType(const std::string& vehicleID) const;
std::unordered_set<std::string> getControllableVehicleIdsSnapshot() const;
bool isControllable(const std::string& vehicleNo) const;
bool sendCommand(const std::string& vehicleNo, const VehicleCommand& command);
};

27
命令.md Normal file
View File

@ -0,0 +1,27 @@
rm -rf build
export PATH=/opt/rh/devtoolset-9/root/usr/bin:$PATH
export LD_LIBRARY_PATH=/opt/rh/devtoolset-9/root/usr/lib64:$LD_LIBRARY_PATH
/opt/cmake/bin/cmake -S . -B build -DCMAKE_BUILD_TYPE=Release \
-DLINK_STATIC_DEPS=ON \
-DFULL_STATIC=OFF \
-DBoost_NO_SYSTEM_PATHS=ON \
-DBOOST_ROOT=/opt/boost-1.69 \
-DBOOST_LIBRARYDIR=/opt/boost-1.69/lib
/opt/cmake/bin/cmake --build build -j
ldd /opt/collision_avoidance/bin/collision_avoidancenew | grep "not found" || true
docker run --rm -it --network host -v D:/App/C++/CollisionAvoidance:/src -w /src centos:7 bash
docker commit --pause=false 5f4a906adb1b qdairporttestbackend:20260128

BIN
账号信息_20251107.docx Normal file

Binary file not shown.