diff --git a/src/collector/DataCollector.cpp b/src/collector/DataCollector.cpp index c666500..409a156 100644 --- a/src/collector/DataCollector.cpp +++ b/src/collector/DataCollector.cpp @@ -23,10 +23,13 @@ bool DataCollector::initialize(const DataSourceConfig& dataSourceConfig, dataSourceConfig_ = dataSourceConfig; dataSource_ = std::make_shared(dataSourceConfig_); warnConfig_ = warnConfig; - last_position_fetch_ = std::chrono::steady_clock::now(); - last_unmanned_fetch_ = std::chrono::steady_clock::now(); - last_traffic_light_fetch_ = std::chrono::steady_clock::now(); - last_warning_time_ = std::chrono::steady_clock::now(); + { + std::lock_guard lk(timeoutMutex_); + last_position_fetch_ = std::chrono::steady_clock::now(); + last_unmanned_fetch_ = std::chrono::steady_clock::now(); + last_traffic_light_fetch_ = std::chrono::steady_clock::now(); + last_warning_time_ = std::chrono::steady_clock::now(); + } } return true; } @@ -42,9 +45,12 @@ void DataCollector::start() { } running_ = true; - last_position_fetch_ = std::chrono::steady_clock::now(); - last_unmanned_fetch_ = std::chrono::steady_clock::now(); - last_traffic_light_fetch_ = std::chrono::steady_clock::now(); + { + std::lock_guard lk(timeoutMutex_); + last_position_fetch_ = std::chrono::steady_clock::now(); + last_unmanned_fetch_ = std::chrono::steady_clock::now(); + last_traffic_light_fetch_ = std::chrono::steady_clock::now(); + } // 尝试连接,但即使失败也继续运行 if (!dataSource_->connect()) { @@ -184,10 +190,20 @@ void DataCollector::trafficLightLoop() { void DataCollector::checkTimeout() { auto now = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point last_pos; + std::chrono::steady_clock::time_point last_unmanned; + std::chrono::steady_clock::time_point last_tl; + { + std::lock_guard lk(timeoutMutex_); + last_pos = last_position_fetch_; + last_unmanned = last_unmanned_fetch_; + last_tl = last_traffic_light_fetch_; + } + // 检查位置数据超时 auto position_elapsed = std::chrono::duration_cast( - now - last_position_fetch_) + now - last_pos) .count(); if (position_elapsed > dataSourceConfig_.position.timeout_ms) { sendTimeoutWarning("position", position_elapsed); @@ -196,7 +212,7 @@ void DataCollector::checkTimeout() { // 检查无人车数据超时 auto unmanned_elapsed = std::chrono::duration_cast( - now - last_unmanned_fetch_) + now - last_unmanned) .count(); if (unmanned_elapsed > dataSourceConfig_.vehicle.timeout_ms) { sendTimeoutWarning("unmanned", unmanned_elapsed); @@ -205,7 +221,7 @@ void DataCollector::checkTimeout() { // 检查红绿灯数据超时 auto traffic_light_elapsed = std::chrono::duration_cast( - now - last_traffic_light_fetch_) + now - last_tl) .count(); if (traffic_light_elapsed > dataSourceConfig_.traffic_light.timeout_ms) { sendTimeoutWarning("traffic_light", traffic_light_elapsed); @@ -215,6 +231,8 @@ void DataCollector::checkTimeout() { void DataCollector::resetTimeout(const std::string& source) { auto now = std::chrono::steady_clock::now(); + std::lock_guard lk(timeoutMutex_); + if (source == "position") { auto elapsed = std::chrono::duration_cast( now - last_position_fetch_) @@ -249,20 +267,29 @@ void DataCollector::resetTimeout(const std::string& source) { void DataCollector::sendTimeoutWarning(const std::string& source, int64_t elapsed_ms) { auto now = std::chrono::steady_clock::now(); - auto warning_elapsed = - std::chrono::duration_cast( - now - last_warning_time_) - .count(); + { + std::lock_guard lk(timeoutMutex_); + auto warning_elapsed = + std::chrono::duration_cast( + now - last_warning_time_) + .count(); - if (warning_elapsed >= warnConfig_.warning_interval_ms) { - if (!system_) { - Logger::debug("System not set, skipping timeout warning"); + if (warning_elapsed < warnConfig_.warning_interval_ms) { return; } - network::TimeoutWarningMessage msg; - msg.source = source; - msg.elapsed_ms = elapsed_ms; + // 预占位更新时间,避免并发线程重复刷屏 + last_warning_time_ = now; + } + + if (!system_) { + Logger::debug("System not set, skipping timeout warning"); + return; + } + + network::TimeoutWarningMessage msg; + msg.source = source; + msg.elapsed_ms = elapsed_ms; // 根据数据源类型设置超时阈值 int64_t timeout_ms = 0; @@ -274,11 +301,9 @@ void DataCollector::sendTimeoutWarning(const std::string& source, timeout_ms = dataSourceConfig_.traffic_light.timeout_ms; } - msg.is_read_timeout = (elapsed_ms > timeout_ms); + msg.is_read_timeout = (elapsed_ms > timeout_ms); - system_->broadcastTimeoutWarning(msg); - last_warning_time_ = now; - } + system_->broadcastTimeoutWarning(msg); } void DataCollector::filterPositionData(std::vector& aircraft, @@ -458,7 +483,10 @@ void DataCollector::injectTrafficLightSignals(std::vector si } // 用于超时检测:收到推送即认为链路正常 - last_traffic_light_fetch_ = std::chrono::steady_clock::now(); + { + std::lock_guard lk(timeoutMutex_); + last_traffic_light_fetch_ = std::chrono::steady_clock::now(); + } } bool DataCollector::fetchTrafficLightSignals( diff --git a/src/collector/DataCollector.h b/src/collector/DataCollector.h index d2d3815..0fbf896 100644 --- a/src/collector/DataCollector.h +++ b/src/collector/DataCollector.h @@ -52,17 +52,6 @@ public: void setSystem(std::shared_ptr system) { system_ = system; } void setDataSource(std::shared_ptr source) { dataSource_ = source; } - - // 获取数据的引用访问接口 - const std::vector& getAircraftRef() const { - std::lock_guard lock(cacheMutex_); - return aircraftCache_; - } - - const std::vector& getVehicleRef() const { - std::lock_guard lock(cacheMutex_); - return vehicleCache_; - } private: std::shared_ptr dataSource_; @@ -99,6 +88,9 @@ private: std::chrono::steady_clock::time_point last_traffic_light_fetch_; // 红绿灯数据最后获取时间 std::chrono::steady_clock::time_point last_warning_time_; std::chrono::steady_clock::time_point last_log_time_; + + // 保护超时/告警相关 time_point 并发访问(position/unmanned/TCP 注入线程可能同时读写) + mutable std::mutex timeoutMutex_; void checkTimeout(); void resetTimeout(const std::string& source); diff --git a/src/core/System.cpp b/src/core/System.cpp index 9d4879d..49b9129 100644 --- a/src/core/System.cpp +++ b/src/core/System.cpp @@ -686,23 +686,14 @@ void System::broadcastVehicleCommand(const VehicleCommand& cmd) { } const MovingObject* System::findVehicle(const std::string& vehicleId) const { - if (!dataCollector_) { - return nullptr; - } - - // 获取数据引用(注意:DataCollector 内部会加锁) - const auto& aircrafts = dataCollector_->getAircraftRef(); - const auto& vehicles = dataCollector_->getVehicleRef(); - - // 在航空器中查找 - for (const auto& aircraft : aircrafts) { + // 仅使用本轮 processLoop 已获取的快照(latest_*),避免跨线程访问 DataCollector 内部缓存导致 data race / 悬空引用。 + for (const auto& aircraft : latest_aircraft_) { if (aircraft.flightNo == vehicleId) { return &aircraft; } } - // 在车辆中查找 - for (const auto& vehicle : vehicles) { + for (const auto& vehicle : latest_vehicles_) { if (vehicle.vehicleNo == vehicleId) { return &vehicle; } diff --git a/src/network/ConfigHttpServer.cpp b/src/network/ConfigHttpServer.cpp index 8e82051..1da36ae 100644 --- a/src/network/ConfigHttpServer.cpp +++ b/src/network/ConfigHttpServer.cpp @@ -287,6 +287,9 @@ private: if (ec != net::error::operation_aborted) { Logger::error("ConfigListener accept error: ", ec.message()); } + if (acceptor_.is_open() && ec != net::error::operation_aborted) { + do_accept(); + } return; } diff --git a/src/network/HTTPClient.cpp b/src/network/HTTPClient.cpp index af54a52..2db3273 100644 --- a/src/network/HTTPClient.cpp +++ b/src/network/HTTPClient.cpp @@ -49,6 +49,7 @@ size_t HTTPClient::WriteCallback(void* contents, size_t size, size_t nmemb, void } bool HTTPClient::sendCommand(const std::string& host, int port, const std::string& command_path, const VehicleCommand& command) const { + std::lock_guard lk(mutex_); if (!curl_) { Logger::error("CURL not initialized"); return false; diff --git a/src/network/HTTPClient.h b/src/network/HTTPClient.h index 49da7ef..3136ef3 100644 --- a/src/network/HTTPClient.h +++ b/src/network/HTTPClient.h @@ -2,6 +2,7 @@ #include #include +#include #include "types/VehicleCommand.h" #include "nlohmann/json.hpp" @@ -16,5 +17,6 @@ public: private: static size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp); CURL* curl_; + mutable std::mutex mutex_; mutable std::string response_buffer_; }; \ No newline at end of file diff --git a/src/network/HTTPDataSource.cpp b/src/network/HTTPDataSource.cpp index fb325b0..d6ea9f8 100644 --- a/src/network/HTTPDataSource.cpp +++ b/src/network/HTTPDataSource.cpp @@ -2,6 +2,7 @@ #include "HTTPClient.h" #include "utils/Logger.h" #include +#include #include using json = nlohmann::json; @@ -117,18 +118,28 @@ bool HTTPDataSource::ensureConnected() { } bool HTTPDataSource::sendRequest(const std::string& url, const AuthState* auth_state, - std::string& response, HttpMethod method, const std::string& body) { + std::string& response, HttpMethod method, const std::string& body, + int connect_timeout_ms, int read_timeout_ms) { if (!curl_) { Logger::error("CURL not initialized"); return false; } + // 防止传入小于 1s 时被 /1000 截断为 0 导致“无限等待” + const int effective_connect_ms = (connect_timeout_ms > 0) ? connect_timeout_ms : config_.position.timeout_ms; + int effective_read_ms = (read_timeout_ms > 0) ? read_timeout_ms : config_.position.read_timeout_ms; + if (effective_read_ms <= 0) { + effective_read_ms = config_.position.timeout_ms; + } + const long connect_ms = static_cast(std::max(1, effective_connect_ms)); + const long read_ms = static_cast(std::max(1, effective_read_ms)); + curl_easy_reset(curl_); curl_easy_setopt(curl_, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, WriteCallback); curl_easy_setopt(curl_, CURLOPT_WRITEDATA, &response); - curl_easy_setopt(curl_, CURLOPT_TIMEOUT, config_.position.timeout_ms / 1000); - curl_easy_setopt(curl_, CURLOPT_CONNECTTIMEOUT, config_.position.timeout_ms / 1000); + curl_easy_setopt(curl_, CURLOPT_TIMEOUT_MS, read_ms); + curl_easy_setopt(curl_, CURLOPT_CONNECTTIMEOUT_MS, connect_ms); curl_easy_setopt(curl_, CURLOPT_NOSIGNAL, 1L); if (method == HttpMethod::GET) { @@ -202,7 +213,8 @@ bool HTTPDataSource::fetchPositionAircraftData(std::vector& aircraft) config_.position.aircraft_path; std::string response; - if (!sendRequest(url, &position_auth_, response, HttpMethod::GET)) { + if (!sendRequest(url, &position_auth_, response, HttpMethod::GET, "", + config_.position.timeout_ms, config_.position.read_timeout_ms)) { Logger::error("请求失败"); return false; } @@ -230,7 +242,8 @@ bool HTTPDataSource::fetchPositionVehicleData(std::vector& vehicles) { config_.position.vehicle_path; std::string response; - if (!sendRequest(url, &position_auth_, response, HttpMethod::GET)) { + if (!sendRequest(url, &position_auth_, response, HttpMethod::GET, "", + config_.position.timeout_ms, config_.position.read_timeout_ms)) { return false; } @@ -257,7 +270,8 @@ bool HTTPDataSource::fetchTrafficLightSignals(std::vector& s config_.traffic_light.signal_path; std::string response; - if (!sendRequest(url, &traffic_light_auth_, response, HttpMethod::GET)) { + if (!sendRequest(url, &traffic_light_auth_, response, HttpMethod::GET, "", + config_.traffic_light.timeout_ms, config_.traffic_light.read_timeout_ms)) { return false; } @@ -335,7 +349,8 @@ bool HTTPDataSource::sendUnmannedVehicleCommand(const std::string& vehicle_id, c request["minDistance"] = command.minDistance; std::string response; - if (!sendRequest(url, &unmanned_vehicle_auth_, response, HttpMethod::POST, request.dump())) { + if (!sendRequest(url, &unmanned_vehicle_auth_, response, HttpMethod::POST, request.dump(), + config_.vehicle.timeout_ms, config_.vehicle.read_timeout_ms)) { return false; } @@ -378,7 +393,8 @@ bool HTTPDataSource::fetchUnmannedVehicleStatus(const std::string& vehicle_id, s config_.vehicle.status_path + "?vehicleId=" + vehicle_id; std::string response; - if (!sendRequest(url, &unmanned_vehicle_auth_, response, HttpMethod::GET)) { + if (!sendRequest(url, &unmanned_vehicle_auth_, response, HttpMethod::GET, "", + config_.vehicle.timeout_ms, config_.vehicle.read_timeout_ms)) { return false; } @@ -421,7 +437,8 @@ bool HTTPDataSource::authenticatePosition(const AuthConfig& auth_config, "&password=" + auth_config.password; std::string response; - if (!sendRequest(url, nullptr, response, HttpMethod::POST, "")) { + if (!sendRequest(url, nullptr, response, HttpMethod::POST, "", + config_.position.timeout_ms, config_.position.read_timeout_ms)) { Logger::error("认证请求失败"); return false; } diff --git a/src/network/HTTPDataSource.h b/src/network/HTTPDataSource.h index 25b42d3..3770723 100644 --- a/src/network/HTTPDataSource.h +++ b/src/network/HTTPDataSource.h @@ -72,7 +72,9 @@ private: bool sendRequest(const std::string& url, const AuthState* auth_state, std::string& response, HttpMethod method = HttpMethod::GET, - const std::string& body = ""); + const std::string& body = "", + int connect_timeout_ms = -1, + int read_timeout_ms = -1); // 位置数据响应解析 bool parsePositionAircraftResponse(const std::string& response, std::vector& aircraft);