提升稳定性

This commit is contained in:
sladro 2026-02-04 10:27:59 +08:00
parent 97c0d3196f
commit 98d8da5a17
8 changed files with 94 additions and 58 deletions

View File

@ -23,10 +23,13 @@ bool DataCollector::initialize(const DataSourceConfig& dataSourceConfig,
dataSourceConfig_ = dataSourceConfig;
dataSource_ = std::make_shared<HTTPDataSource>(dataSourceConfig_);
warnConfig_ = warnConfig;
last_position_fetch_ = std::chrono::steady_clock::now();
last_unmanned_fetch_ = std::chrono::steady_clock::now();
last_traffic_light_fetch_ = std::chrono::steady_clock::now();
last_warning_time_ = std::chrono::steady_clock::now();
{
std::lock_guard<std::mutex> lk(timeoutMutex_);
last_position_fetch_ = std::chrono::steady_clock::now();
last_unmanned_fetch_ = std::chrono::steady_clock::now();
last_traffic_light_fetch_ = std::chrono::steady_clock::now();
last_warning_time_ = std::chrono::steady_clock::now();
}
}
return true;
}
@ -42,9 +45,12 @@ void DataCollector::start() {
}
running_ = true;
last_position_fetch_ = std::chrono::steady_clock::now();
last_unmanned_fetch_ = std::chrono::steady_clock::now();
last_traffic_light_fetch_ = std::chrono::steady_clock::now();
{
std::lock_guard<std::mutex> lk(timeoutMutex_);
last_position_fetch_ = std::chrono::steady_clock::now();
last_unmanned_fetch_ = std::chrono::steady_clock::now();
last_traffic_light_fetch_ = std::chrono::steady_clock::now();
}
// 尝试连接,但即使失败也继续运行
if (!dataSource_->connect()) {
@ -184,10 +190,20 @@ void DataCollector::trafficLightLoop() {
void DataCollector::checkTimeout() {
auto now = std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point last_pos;
std::chrono::steady_clock::time_point last_unmanned;
std::chrono::steady_clock::time_point last_tl;
{
std::lock_guard<std::mutex> lk(timeoutMutex_);
last_pos = last_position_fetch_;
last_unmanned = last_unmanned_fetch_;
last_tl = last_traffic_light_fetch_;
}
// 检查位置数据超时
auto position_elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_position_fetch_)
now - last_pos)
.count();
if (position_elapsed > dataSourceConfig_.position.timeout_ms) {
sendTimeoutWarning("position", position_elapsed);
@ -196,7 +212,7 @@ void DataCollector::checkTimeout() {
// 检查无人车数据超时
auto unmanned_elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_unmanned_fetch_)
now - last_unmanned)
.count();
if (unmanned_elapsed > dataSourceConfig_.vehicle.timeout_ms) {
sendTimeoutWarning("unmanned", unmanned_elapsed);
@ -205,7 +221,7 @@ void DataCollector::checkTimeout() {
// 检查红绿灯数据超时
auto traffic_light_elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_traffic_light_fetch_)
now - last_tl)
.count();
if (traffic_light_elapsed > dataSourceConfig_.traffic_light.timeout_ms) {
sendTimeoutWarning("traffic_light", traffic_light_elapsed);
@ -215,6 +231,8 @@ void DataCollector::checkTimeout() {
void DataCollector::resetTimeout(const std::string& source) {
auto now = std::chrono::steady_clock::now();
std::lock_guard<std::mutex> lk(timeoutMutex_);
if (source == "position") {
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_position_fetch_)
@ -249,20 +267,29 @@ void DataCollector::resetTimeout(const std::string& source) {
void DataCollector::sendTimeoutWarning(const std::string& source,
int64_t elapsed_ms) {
auto now = std::chrono::steady_clock::now();
auto warning_elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_warning_time_)
.count();
{
std::lock_guard<std::mutex> lk(timeoutMutex_);
auto warning_elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_warning_time_)
.count();
if (warning_elapsed >= warnConfig_.warning_interval_ms) {
if (!system_) {
Logger::debug("System not set, skipping timeout warning");
if (warning_elapsed < warnConfig_.warning_interval_ms) {
return;
}
network::TimeoutWarningMessage msg;
msg.source = source;
msg.elapsed_ms = elapsed_ms;
// 预占位更新时间,避免并发线程重复刷屏
last_warning_time_ = now;
}
if (!system_) {
Logger::debug("System not set, skipping timeout warning");
return;
}
network::TimeoutWarningMessage msg;
msg.source = source;
msg.elapsed_ms = elapsed_ms;
// 根据数据源类型设置超时阈值
int64_t timeout_ms = 0;
@ -274,11 +301,9 @@ void DataCollector::sendTimeoutWarning(const std::string& source,
timeout_ms = dataSourceConfig_.traffic_light.timeout_ms;
}
msg.is_read_timeout = (elapsed_ms > timeout_ms);
msg.is_read_timeout = (elapsed_ms > timeout_ms);
system_->broadcastTimeoutWarning(msg);
last_warning_time_ = now;
}
system_->broadcastTimeoutWarning(msg);
}
void DataCollector::filterPositionData(std::vector<Aircraft>& aircraft,
@ -458,7 +483,10 @@ void DataCollector::injectTrafficLightSignals(std::vector<TrafficLightSignal> si
}
// 用于超时检测:收到推送即认为链路正常
last_traffic_light_fetch_ = std::chrono::steady_clock::now();
{
std::lock_guard<std::mutex> lk(timeoutMutex_);
last_traffic_light_fetch_ = std::chrono::steady_clock::now();
}
}
bool DataCollector::fetchTrafficLightSignals(

View File

@ -52,17 +52,6 @@ public:
void setSystem(std::shared_ptr<System> system) { system_ = system; }
void setDataSource(std::shared_ptr<DataSource> source) { dataSource_ = source; }
// 获取数据的引用访问接口
const std::vector<Aircraft>& getAircraftRef() const {
std::lock_guard<std::mutex> lock(cacheMutex_);
return aircraftCache_;
}
const std::vector<Vehicle>& getVehicleRef() const {
std::lock_guard<std::mutex> lock(cacheMutex_);
return vehicleCache_;
}
private:
std::shared_ptr<DataSource> dataSource_;
@ -99,6 +88,9 @@ private:
std::chrono::steady_clock::time_point last_traffic_light_fetch_; // 红绿灯数据最后获取时间
std::chrono::steady_clock::time_point last_warning_time_;
std::chrono::steady_clock::time_point last_log_time_;
// 保护超时/告警相关 time_point 并发访问position/unmanned/TCP 注入线程可能同时读写)
mutable std::mutex timeoutMutex_;
void checkTimeout();
void resetTimeout(const std::string& source);

View File

@ -686,23 +686,14 @@ void System::broadcastVehicleCommand(const VehicleCommand& cmd) {
}
const MovingObject* System::findVehicle(const std::string& vehicleId) const {
if (!dataCollector_) {
return nullptr;
}
// 获取数据引用注意DataCollector 内部会加锁)
const auto& aircrafts = dataCollector_->getAircraftRef();
const auto& vehicles = dataCollector_->getVehicleRef();
// 在航空器中查找
for (const auto& aircraft : aircrafts) {
// 仅使用本轮 processLoop 已获取的快照latest_*),避免跨线程访问 DataCollector 内部缓存导致 data race / 悬空引用。
for (const auto& aircraft : latest_aircraft_) {
if (aircraft.flightNo == vehicleId) {
return &aircraft;
}
}
// 在车辆中查找
for (const auto& vehicle : vehicles) {
for (const auto& vehicle : latest_vehicles_) {
if (vehicle.vehicleNo == vehicleId) {
return &vehicle;
}

View File

@ -287,6 +287,9 @@ private:
if (ec != net::error::operation_aborted) {
Logger::error("ConfigListener accept error: ", ec.message());
}
if (acceptor_.is_open() && ec != net::error::operation_aborted) {
do_accept();
}
return;
}

View File

@ -49,6 +49,7 @@ size_t HTTPClient::WriteCallback(void* contents, size_t size, size_t nmemb, void
}
bool HTTPClient::sendCommand(const std::string& host, int port, const std::string& command_path, const VehicleCommand& command) const {
std::lock_guard<std::mutex> lk(mutex_);
if (!curl_) {
Logger::error("CURL not initialized");
return false;

View File

@ -2,6 +2,7 @@
#include <string>
#include <curl/curl.h>
#include <mutex>
#include "types/VehicleCommand.h"
#include "nlohmann/json.hpp"
@ -16,5 +17,6 @@ public:
private:
static size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp);
CURL* curl_;
mutable std::mutex mutex_;
mutable std::string response_buffer_;
};

View File

@ -2,6 +2,7 @@
#include "HTTPClient.h"
#include "utils/Logger.h"
#include <nlohmann/json.hpp>
#include <algorithm>
#include <sstream>
using json = nlohmann::json;
@ -117,18 +118,28 @@ bool HTTPDataSource::ensureConnected() {
}
bool HTTPDataSource::sendRequest(const std::string& url, const AuthState* auth_state,
std::string& response, HttpMethod method, const std::string& body) {
std::string& response, HttpMethod method, const std::string& body,
int connect_timeout_ms, int read_timeout_ms) {
if (!curl_) {
Logger::error("CURL not initialized");
return false;
}
// 防止传入小于 1s 时被 /1000 截断为 0 导致“无限等待”
const int effective_connect_ms = (connect_timeout_ms > 0) ? connect_timeout_ms : config_.position.timeout_ms;
int effective_read_ms = (read_timeout_ms > 0) ? read_timeout_ms : config_.position.read_timeout_ms;
if (effective_read_ms <= 0) {
effective_read_ms = config_.position.timeout_ms;
}
const long connect_ms = static_cast<long>(std::max(1, effective_connect_ms));
const long read_ms = static_cast<long>(std::max(1, effective_read_ms));
curl_easy_reset(curl_);
curl_easy_setopt(curl_, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, WriteCallback);
curl_easy_setopt(curl_, CURLOPT_WRITEDATA, &response);
curl_easy_setopt(curl_, CURLOPT_TIMEOUT, config_.position.timeout_ms / 1000);
curl_easy_setopt(curl_, CURLOPT_CONNECTTIMEOUT, config_.position.timeout_ms / 1000);
curl_easy_setopt(curl_, CURLOPT_TIMEOUT_MS, read_ms);
curl_easy_setopt(curl_, CURLOPT_CONNECTTIMEOUT_MS, connect_ms);
curl_easy_setopt(curl_, CURLOPT_NOSIGNAL, 1L);
if (method == HttpMethod::GET) {
@ -202,7 +213,8 @@ bool HTTPDataSource::fetchPositionAircraftData(std::vector<Aircraft>& aircraft)
config_.position.aircraft_path;
std::string response;
if (!sendRequest(url, &position_auth_, response, HttpMethod::GET)) {
if (!sendRequest(url, &position_auth_, response, HttpMethod::GET, "",
config_.position.timeout_ms, config_.position.read_timeout_ms)) {
Logger::error("请求失败");
return false;
}
@ -230,7 +242,8 @@ bool HTTPDataSource::fetchPositionVehicleData(std::vector<Vehicle>& vehicles) {
config_.position.vehicle_path;
std::string response;
if (!sendRequest(url, &position_auth_, response, HttpMethod::GET)) {
if (!sendRequest(url, &position_auth_, response, HttpMethod::GET, "",
config_.position.timeout_ms, config_.position.read_timeout_ms)) {
return false;
}
@ -257,7 +270,8 @@ bool HTTPDataSource::fetchTrafficLightSignals(std::vector<TrafficLightSignal>& s
config_.traffic_light.signal_path;
std::string response;
if (!sendRequest(url, &traffic_light_auth_, response, HttpMethod::GET)) {
if (!sendRequest(url, &traffic_light_auth_, response, HttpMethod::GET, "",
config_.traffic_light.timeout_ms, config_.traffic_light.read_timeout_ms)) {
return false;
}
@ -335,7 +349,8 @@ bool HTTPDataSource::sendUnmannedVehicleCommand(const std::string& vehicle_id, c
request["minDistance"] = command.minDistance;
std::string response;
if (!sendRequest(url, &unmanned_vehicle_auth_, response, HttpMethod::POST, request.dump())) {
if (!sendRequest(url, &unmanned_vehicle_auth_, response, HttpMethod::POST, request.dump(),
config_.vehicle.timeout_ms, config_.vehicle.read_timeout_ms)) {
return false;
}
@ -378,7 +393,8 @@ bool HTTPDataSource::fetchUnmannedVehicleStatus(const std::string& vehicle_id, s
config_.vehicle.status_path + "?vehicleId=" + vehicle_id;
std::string response;
if (!sendRequest(url, &unmanned_vehicle_auth_, response, HttpMethod::GET)) {
if (!sendRequest(url, &unmanned_vehicle_auth_, response, HttpMethod::GET, "",
config_.vehicle.timeout_ms, config_.vehicle.read_timeout_ms)) {
return false;
}
@ -421,7 +437,8 @@ bool HTTPDataSource::authenticatePosition(const AuthConfig& auth_config,
"&password=" + auth_config.password;
std::string response;
if (!sendRequest(url, nullptr, response, HttpMethod::POST, "")) {
if (!sendRequest(url, nullptr, response, HttpMethod::POST, "",
config_.position.timeout_ms, config_.position.read_timeout_ms)) {
Logger::error("认证请求失败");
return false;
}

View File

@ -72,7 +72,9 @@ private:
bool sendRequest(const std::string& url, const AuthState* auth_state,
std::string& response, HttpMethod method = HttpMethod::GET,
const std::string& body = "");
const std::string& body = "",
int connect_timeout_ms = -1,
int read_timeout_ms = -1);
// 位置数据响应解析
bool parsePositionAircraftResponse(const std::string& response, std::vector<Aircraft>& aircraft);