增加了接收移动式红绿灯状态的http服务接口,修改了mock_server.py,添加了mock_traffic_light_client.py,用于模拟红绿灯状态的推送。

This commit is contained in:
Tian jianyong 2025-05-06 12:10:39 +08:00
parent e0c873ce5b
commit d645dbc486
9 changed files with 795 additions and 50 deletions

View File

@ -101,6 +101,7 @@ set(LIB_SOURCES
src/network/HTTPDataSource.cpp
src/network/WebSocketServer.cpp
src/network/HTTPClient.cpp
src/network/TrafficLightHttpServer.cpp
src/spatial/CoordinateConverter.cpp
src/types/BasicTypes.cpp
src/types/VehicleData.cpp
@ -124,6 +125,7 @@ target_include_directories(${PROJECT_NAME}_lib
target_link_libraries(${PROJECT_NAME}_lib
PUBLIC
Boost::system
Boost::thread
nlohmann_json::nlohmann_json
CURL::libcurl
Threads::Threads

View File

@ -8,19 +8,71 @@
"longitude": 120.08502054
},
"coordinate_points": [
{"point": "T1", "longitude": 120.0868853, "latitude": 36.35496367},
{"point": "T2", "longitude": 120.08502054, "latitude": 36.35448347},
{"point": "T3", "longitude": 120.08341044, "latitude": 36.35406879},
{"point": "T4", "longitude": 120.08558121, "latitude": 36.35305878},
{"point": "T5", "longitude": 120.08400957, "latitude": 36.35265197},
{"point": "T6", "longitude": 120.08649105, "latitude": 36.35074527},
{"point": "T7", "longitude": 120.08562915, "latitude": 36.35052372},
{"point": "T8", "longitude": 120.08676664, "latitude": 36.35004529},
{"point": "T9", "longitude": 120.08520616, "latitude": 36.34964473},
{"point": "T10", "longitude": 120.08710569, "latitude": 36.34917893},
{"point": "T11", "longitude": 120.0873865, "latitude": 36.3509885},
{"point": "T12", "longitude": 120.08603613, "latitude": 36.35190217},
{"point": "T13", "longitude": 120.08509148, "latitude": 36.35041247}
{
"point": "T1",
"longitude": 120.0868853,
"latitude": 36.35496367
},
{
"point": "T2",
"longitude": 120.08502054,
"latitude": 36.35448347
},
{
"point": "T3",
"longitude": 120.08341044,
"latitude": 36.35406879
},
{
"point": "T4",
"longitude": 120.08558121,
"latitude": 36.35305878
},
{
"point": "T5",
"longitude": 120.08400957,
"latitude": 36.35265197
},
{
"point": "T6",
"longitude": 120.08649105,
"latitude": 36.35074527
},
{
"point": "T7",
"longitude": 120.08562915,
"latitude": 36.35052372
},
{
"point": "T8",
"longitude": 120.08676664,
"latitude": 36.35004529
},
{
"point": "T9",
"longitude": 120.08520616,
"latitude": 36.34964473
},
{
"point": "T10",
"longitude": 120.08710569,
"latitude": 36.34917893
},
{
"point": "T11",
"longitude": 120.0873865,
"latitude": 36.3509885
},
{
"point": "T12",
"longitude": 120.08603613,
"latitude": 36.35190217
},
{
"point": "T13",
"longitude": 120.08509148,
"latitude": 36.35041247
}
]
},
"data_source": {
@ -104,5 +156,10 @@
"enable_mock_data": false,
"save_raw_data": false,
"profile_performance": false
}
}
},
"traffic_light_server": {
"port": 8082,
"max_connections": 100
},
"simulated_mobile_light_target_intersection_id": "T2路口"
}

View File

@ -90,16 +90,26 @@ public:
int log_interval_ms;
} warning;
// 新增: 红绿灯 HTTP 服务器配置
struct TrafficLightServerConfig {
uint16_t port = 8082; // Default port if not specified in config
int max_connections = 100;
// Can add other relevant configs like timeout etc.
} traffic_light_server;
// 新增: 模拟的移动红绿灯目标路口 ID
std::string simulated_mobile_light_target_intersection_id = "T2路口"; // Default value
static SystemConfig& instance() {
static SystemConfig instance;
return instance;
}
void load(const std::string& filename);
SystemConfig(const SystemConfig&) = delete;
SystemConfig& operator=(const SystemConfig&) = delete;
friend class System;
private:
@ -239,6 +249,22 @@ inline void from_json(const json& j, SystemConfig::Warning& p) {
j.at("log_interval_ms").get_to(p.log_interval_ms);
}
// 新增: TrafficLightServerConfig 的 JSON 序列化/反序列化函数
inline void to_json(json& j, const SystemConfig::TrafficLightServerConfig& p) {
j = json{
{"port", p.port},
{"max_connections", p.max_connections}
// Serialize other fields if added
};
}
inline void from_json(const json& j, SystemConfig::TrafficLightServerConfig& p) {
// Use .value() to provide default values if keys are missing
p.port = j.value("port", static_cast<uint16_t>(8082));
p.max_connections = j.value("max_connections", 100);
// Deserialize other fields if added
}
inline void to_json(json& j, const SystemConfig& config) {
j = json{
{"airport", config.airport},
@ -247,7 +273,9 @@ inline void to_json(json& j, const SystemConfig& config) {
{"collision_detection", config.collision_detection},
{"logging", config.logging},
{"debug", config.debug},
{"warning", config.warning}
{"warning", config.warning},
{"traffic_light_server", config.traffic_light_server},
{"simulated_mobile_light_target_intersection_id", config.simulated_mobile_light_target_intersection_id}
};
}
@ -259,5 +287,7 @@ inline void from_json(const json& j, SystemConfig& config) {
j.at("logging").get_to(config.logging);
j.at("debug").get_to(config.debug);
j.at("warning").get_to(config.warning);
config.traffic_light_server = j.value("traffic_light_server", SystemConfig::TrafficLightServerConfig{});
config.simulated_mobile_light_target_intersection_id = j.value("simulated_mobile_light_target_intersection_id", std::string("T2路口"));
}

View File

@ -6,6 +6,9 @@
#include "utils/Logger.h"
#include "collector/DataCollector.h"
#include "spatial/CoordinateConverter.h"
#include "network/TrafficLightHttpServer.h"
#include "config/SystemConfig.h"
#include "types/TrafficLightTypes.h"
System* System::instance_ = nullptr;
@ -49,6 +52,30 @@ bool System::initialize() {
// 初始化 WebSocket 服务器
ws_server_ = std::make_unique<network::WebSocketServer>(system_config.websocket.port);
ws_thread_ = std::thread([this]() { ws_server_->start(); });
Logger::info("WebSocket server initialized on port ", system_config.websocket.port);
// 新增: 初始化并启动红绿灯 HTTP 服务器
try {
traffic_light_http_server_ = std::make_unique<network::TrafficLightHttpServer>(
system_config.traffic_light_server.port,
system_config.traffic_light_server.max_connections,
".",
*this
);
// Start server in its own thread (start itself handles creating internal threads for io_context)
traffic_light_http_server_thread_ = std::thread([this]() {
try {
traffic_light_http_server_->start(2); // Use 2 threads for IO context
} catch (const std::exception& e) {
Logger::error("TrafficLightHttpServer thread failed to start: ", e.what());
}
});
Logger::info("Traffic light HTTP server initialized on port ", system_config.traffic_light_server.port);
} catch (const std::exception& e) {
Logger::error("Failed to initialize Traffic Light HTTP Server: ", e.what());
// Decide if this failure is critical
return false; // Example: treat as critical
}
// 加载机场区域配置
airportBounds_ = std::make_unique<AirportBounds>("config/airport_bounds.json");
@ -115,35 +142,40 @@ void System::start() {
running_ = true;
dataCollector_->start();
processThread_ = std::thread(&System::processLoop, this);
Logger::info("System started");
Logger::info("System processing loop started");
}
void System::stop() {
if (!running_) {
return;
}
Logger::info("Stopping system...");
running_ = false;
// 停止 WebSocket 服务器
if (ws_server_) {
ws_server_->stop();
}
// 等待线程结束
if (ws_thread_.joinable()) {
ws_thread_.join();
}
if (processThread_.joinable()) {
processThread_.join();
}
// Stop data collector first
if (dataCollector_) {
dataCollector_->stop();
}
Logger::info("System stopped");
// Stop WebSocket server
if (ws_server_) {
ws_server_->stop();
}
if (ws_thread_.joinable()) {
ws_thread_.join();
}
// 新增: Stop Traffic Light HTTP Server
if (traffic_light_http_server_) {
traffic_light_http_server_->stop();
}
if (traffic_light_http_server_thread_.joinable()) {
traffic_light_http_server_thread_.join();
}
// Stop processing loop
if (processThread_.joinable()) {
processThread_.join();
}
Logger::info("System fully stopped.");
}
void System::processLoop() {
@ -739,4 +771,64 @@ bool System::handleSafetyZoneRisk(const Vehicle& vehicle,
broadcastCollisionWarning(risk);
return true;
}
void System::processPushedTrafficLightData(const nlohmann::json& di_data) {
const auto& config = SystemConfig::instance();
const std::string target_id = config.simulated_mobile_light_target_intersection_id;
if (target_id.empty()) {
Logger::warning("Target intersection ID for pushed traffic light data is not configured.");
return;
}
// 使用配置的 ID 查找路口信息
const Intersection* intersection = intersection_config_.findById(target_id);
if (!intersection) {
Logger::warning("Configured target intersection ID ", target_id, " not found.");
return;
}
// 解析 DI 数据获取当前状态 (简化: 只处理南北向)
SignalStatus current_status = SignalStatus::UNKNOWN;
try {
// DI-13: 北绿 (NS Green)
if (di_data.contains("DI-13") && di_data.at("DI-13").get<int>() == 1) {
current_status = SignalStatus::GREEN;
}
// DI-12: 北黄 (NS Yellow)
else if (di_data.contains("DI-12") && di_data.at("DI-12").get<int>() == 1) {
current_status = SignalStatus::YELLOW;
}
// DI-11: 北红 (NS Red)
else if (di_data.contains("DI-11") && di_data.at("DI-11").get<int>() == 1) {
current_status = SignalStatus::RED;
}
// 可以添加东西向 DI-14 (E/W Red), DI-15 (E/W Yellow), DI-16 (E/W Green) 的解析逻辑
else {
Logger::warning("Received DI data has unknown or invalid N/S state: ", di_data.dump());
// 可以选择不广播或广播 UNKNOWN
// return; // 如果状态无效则不广播
}
} catch (const json::exception& e) {
Logger::error("Error parsing DI data JSON: ", e.what(), ", Data: ", di_data.dump());
return;
}
// 创建 TrafficLightSignal 结构体
TrafficLightSignal signal;
signal.trafficLightId = intersection->trafficLightId; // 使用找到的路口的红绿灯 ID
signal.status = current_status;
signal.timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
signal.intersectionId = intersection->id;
signal.latitude = intersection->position.latitude;
signal.longitude = intersection->position.longitude;
Logger::debug("Processing pushed traffic light data for intersection: ", intersection->id,
", status: ", static_cast<int>(signal.status));
// 广播信号状态
broadcastTrafficLightStatus(signal);
}

View File

@ -15,6 +15,7 @@
#include "network/WebSocketServer.h"
#include "network/MessageTypes.h"
#include "config/IntersectionConfig.h"
#include "network/TrafficLightHttpServer.h"
// 前向声明
class DataCollector;
@ -42,6 +43,9 @@ public:
const SystemConfig& getSystemConfig() const { return SystemConfig::instance(); }
const IntersectionConfig& getIntersectionConfig() const { return intersection_config_; }
// 新增: 处理推送的红绿灯数据
void processPushedTrafficLightData(const nlohmann::json& di_data);
private:
void processLoop();
void processCollisions(const std::vector<CollisionRisk>& detectedRisks,
@ -87,6 +91,10 @@ private:
std::unique_ptr<network::WebSocketServer> ws_server_;
std::thread ws_thread_;
// 新增: 红绿灯 HTTP 服务器
std::unique_ptr<network::TrafficLightHttpServer> traffic_light_http_server_;
std::thread traffic_light_http_server_thread_;
// 路口配置
IntersectionConfig intersection_config_;

View File

@ -0,0 +1,371 @@
#include "TrafficLightHttpServer.h"
#include "utils/Logger.h"
#include "core/System.h"
#include <nlohmann/json.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <vector>
#include <memory>
#include <string>
namespace network {
namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;
using json = nlohmann::json;
// --- Forward Declarations within namespace ---
// 不再需要,因为类定义直接在下面
// class TrafficLightSession;
// class TrafficLightListener;
//---------------- Helper Functions for HTTP Responses ------------------
// Returns a bad request response
http::response<http::string_body> bad_request(const http::request<http::string_body>& req, beast::string_view why) {
http::response<http::string_body> res{ http::status::bad_request, req.version() };
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, "text/html");
res.keep_alive(req.keep_alive());
res.body() = std::string(why);
res.prepare_payload();
return res;
}
// Returns a not found response
http::response<http::string_body> not_found(const http::request<http::string_body>& req) {
http::response<http::string_body> res{ http::status::not_found, req.version() };
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, "text/html");
res.keep_alive(req.keep_alive());
res.body() = "The resource '" + std::string(req.target()) + "' was not found.";
res.prepare_payload();
return res;
}
// Returns a server error response
http::response<http::string_body> server_error(const http::request<http::string_body>& req, beast::string_view what) {
http::response<http::string_body> res{ http::status::internal_server_error, req.version() };
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, "text/html");
res.keep_alive(req.keep_alive());
res.body() = "An error occurred: '" + std::string(what) + "'";
res.prepare_payload();
return res;
}
// Returns a method not allowed response
http::response<http::string_body> method_not_allowed(const http::request<http::string_body>& req) {
http::response<http::string_body> res{ http::status::method_not_allowed, req.version() };
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, "text/html");
res.set(http::field::allow, "POST");
res.keep_alive(req.keep_alive());
res.body() = "Method Not Allowed. Allowed methods: POST";
res.prepare_payload();
return res;
}
// Returns a success response (200 OK)
http::response<http::string_body> ok_response(const http::request<http::string_body>& req, const std::string& message = "OK") {
http::response<http::string_body> res{ http::status::ok, req.version() };
res.set(http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(http::field::content_type, "application/json");
res.keep_alive(req.keep_alive());
res.body() = "{\"status\": \"success\", \"message\": \"" + message + "\"}";
res.prepare_payload();
return res;
}
//---------------- Internal Session Class (Definition & Implementation) ----------------------
// Handles an HTTP server connection. Defined entirely within the .cpp file.
class TrafficLightSession : public std::enable_shared_from_this<TrafficLightSession>
{
tcp::socket socket_;
beast::flat_buffer buffer_;
std::string const& doc_root_; // Keep for potential future use
http::request<http::string_body> req_;
// Strand to ensure sequential execution of handlers for this session.
net::strand<net::io_context::executor_type> strand_;
net::io_context& ioc_; // 新增: 持有 io_context 引用
System& system_; // 新增: System 引用
public:
// Take ownership of the socket
explicit TrafficLightSession(tcp::socket&& socket, std::string const& doc_root, net::io_context& ioc, System& system_ref)
: socket_(std::move(socket)),
doc_root_(doc_root),
strand_(net::make_strand(ioc.get_executor())),
ioc_(ioc),
system_(system_ref) {}
void run() {
net::dispatch(strand_, beast::bind_front_handler(&TrafficLightSession::do_read, shared_from_this()));
}
private:
void do_read() {
req_ = {};
http::async_read(socket_, buffer_, req_,
net::bind_executor(strand_,
beast::bind_front_handler(&TrafficLightSession::on_read, shared_from_this())
));
}
void on_read(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
// Connection closed by peer
if (ec == http::error::end_of_stream) return do_close();
if (ec) return Logger::error("Session Read Error: ", ec.message());
handle_request();
}
void handle_request() {
// Respond to HEAD request
if (req_.method() == http::verb::head) {
auto res = ok_response(req_, "");
res.body() = ""; // No body for HEAD
return send_response(std::move(res));
}
// Only allow POST requests to the specific endpoint
if (req_.method() != http::verb::post) {
return send_response(method_not_allowed(req_));
}
if (req_.target() != "/trafficlight") {
return send_response(not_found(req_));
}
// Attempt to parse the JSON body
try {
json body_json = json::parse(req_.body());
// Log the received data
Logger::info("接收到红绿灯数据:");
std::string log_msg = "红绿灯状态: ";
bool first = true;
for (const auto& key : { "DI-11", "DI-12", "DI-13", "DI-14", "DI-15", "DI-16" }) {
if (body_json.contains(key)) {
if (!first) log_msg += ", ";
log_msg += key;
log_msg += ":";
// Safely convert JSON value to string, handling potential non-string types
try {
log_msg += body_json[key].dump();
} catch (const json::type_error& te) {
log_msg += "[invalid_type]";
Logger::warning("JSON value for key ", key, " is not easily dumpable: ", te.what());
}
first = false;
}
}
if (first) { // If no relevant keys were found
log_msg += body_json.dump(); // Log the full body
}
Logger::info(log_msg);
// TODO: Integrate data processing (e.g., pass to DataCollector)
// For now, just logging and sending OK.
system_.processPushedTrafficLightData(body_json);
return send_response(ok_response(req_, "Data received and processed"));
} catch (const json::parse_error& e) {
Logger::error("JSON Parsing Error: ", e.what(), ", Body: ", req_.body());
return send_response(bad_request(req_, "Invalid JSON format"));
} catch (const std::exception& e) {
Logger::error("Error processing request: ", e.what());
return send_response(server_error(req_, "Internal server error processing request"));
}
}
void send_response(http::response<http::string_body>&& res) {
auto sp = std::make_shared<http::response<http::string_body>>(std::move(res));
http::async_write(socket_, *sp,
net::bind_executor(strand_,
[self = shared_from_this(), sp](beast::error_code ec, std::size_t bytes) {
self->on_write(sp->need_eof(), ec, bytes);
}
));
}
void on_write(bool close, beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) return Logger::error("Session Write Error: ", ec.message());
if (close) return do_close();
do_read();
}
void do_close() {
beast::error_code ec;
socket_.shutdown(tcp::socket::shutdown_send, ec);
}
};
//---------------- Internal Listener Class (Definition & Implementation) ---------------------
// Accepts incoming connections and launches sessions. Defined entirely within the .cpp file.
class TrafficLightListener : public std::enable_shared_from_this<TrafficLightListener>
{
net::io_context& ioc_;
tcp::acceptor acceptor_;
std::string const& doc_root_;
int max_connections_; // 保留这个,用于拒绝连接
System& system_; // 新增: System 引用
public:
TrafficLightListener(
net::io_context& ioc,
tcp::endpoint endpoint,
int max_connections,
std::string const& doc_root,
System& system_ref)
: ioc_(ioc),
acceptor_(ioc),
doc_root_(doc_root),
max_connections_(max_connections),
system_(system_ref) {
beast::error_code ec;
// Open the acceptor
acceptor_.open(endpoint.protocol(), ec);
if (ec) { Logger::error("Listener Open Error: ", ec.message()); throw beast::system_error{ ec }; }
// Allow address reuse
acceptor_.set_option(net::socket_base::reuse_address(true), ec);
if (ec) { Logger::error("Listener Set Option Error: ", ec.message()); throw beast::system_error{ ec }; }
// Bind to the server address
acceptor_.bind(endpoint, ec);
if (ec) { Logger::error("Listener Bind Error: ", ec.message()); throw beast::system_error{ ec }; }
// Start listening for connections
acceptor_.listen(net::socket_base::max_listen_connections, ec);
if (ec) { Logger::error("Listener Listen Error: ", ec.message()); throw beast::system_error{ ec }; }
}
void run() {
do_accept();
}
void stop() {
beast::error_code ec_cancel, ec_close;
// 取消所有挂起的异步接受操作
acceptor_.cancel(ec_cancel);
// 关闭 acceptor
acceptor_.close(ec_close);
if (ec_cancel) Logger::warning("Listener cancel error: ", ec_cancel.message());
if (ec_close) Logger::warning("Listener close error: ", ec_close.message());
}
private:
void do_accept() {
acceptor_.async_accept(
net::bind_executor(acceptor_.get_executor(),
beast::bind_front_handler(
&TrafficLightListener::on_accept,
shared_from_this())));
}
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
if (ec != net::error::operation_aborted)
Logger::error("Listener Accept Error: ", ec.message());
return;
} else {
// 移除连接数检查简化注释
Logger::info("Accepted connection from ", socket.remote_endpoint());
std::make_shared<TrafficLightSession>(
std::move(socket),
doc_root_,
ioc_,
system_
)->run();
}
do_accept();
}
};
//---------------- TrafficLightHttpServer Implementation -------------------
TrafficLightHttpServer::TrafficLightHttpServer(uint16_t port, int max_connections, const std::string& doc_root, System& system_ref)
: port_(port),
doc_root_(doc_root),
max_connections_(max_connections),
ioc_(1),
system_(system_ref) {}
TrafficLightHttpServer::~TrafficLightHttpServer() {
if (running_.load()) {
stop();
}
}
void TrafficLightHttpServer::start(int num_threads) {
if (running_.exchange(true)) {
Logger::warning("TrafficLightHttpServer already started.");
return;
}
// ... (确保 num_threads >= 1)
auto const address = net::ip::make_address("0.0.0.0");
auto const port = port_;
try {
listener_ = std::make_shared<TrafficLightListener>(
ioc_,
tcp::endpoint{ address, port },
max_connections_,
doc_root_,
system_
);
listener_->run();
Logger::info("TrafficLightHttpServer starting on ", address.to_string(), ":", port, " with ", num_threads, " threads.");
threads_.reserve(num_threads);
for (int i = 0; i < num_threads; ++i) {
threads_.emplace_back([this] { run_ioc(); });
}
} catch (const std::exception& e) {
Logger::error("Failed to start TrafficLightHttpServer: ", e.what());
running_ = false;
throw;
}
}
void TrafficLightHttpServer::stop() {
if (!running_.exchange(false)) { return; }
Logger::info("Stopping TrafficLightHttpServer...");
net::post(ioc_, [this]() {
if (listener_) {
listener_->stop();
listener_.reset();
}
});
ioc_.stop();
for (auto& thread : threads_) {
if (thread.joinable()) { thread.join(); }
}
threads_.clear();
Logger::info("TrafficLightHttpServer stopped.");
}
void TrafficLightHttpServer::run_ioc() {
Logger::debug("TrafficLightHttpServer I/O thread started.");
try {
ioc_.run();
} catch (const std::exception& e) {
Logger::error("Exception in TrafficLightHttpServer I/O thread: ", e.what());
} catch (...) {
Logger::error("Unknown exception in TrafficLightHttpServer I/O thread.");
}
Logger::debug("TrafficLightHttpServer I/O thread finished.");
}
} // namespace network

View File

@ -0,0 +1,64 @@
#pragma once
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/config.hpp>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include <atomic>
#include <mutex>
#include <cstdint> // For uint16_t
#include <nlohmann/json.hpp>
// Forward declare Logger if header is heavy, or include if needed often
// #include "utils/Logger.h"
// 前向声明 System 类
class System;
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
using json = nlohmann::json;
namespace network {
// 前向声明内部类 (如果 TrafficLightHttpServer 需要引用它)
class TrafficLightListener;
// class TrafficLightSession; // Session 通常由 Listener 创建和管理Server 可能不需要直接引用
// Main server class (只保留这个类的声明)
class TrafficLightHttpServer
{
uint16_t port_;
std::string doc_root_;
int max_connections_;
net::io_context ioc_;
// 使用指向内部 Listener 实现的指针
std::shared_ptr<TrafficLightListener> listener_;
std::vector<std::thread> threads_;
std::atomic<bool> running_{ false };
System& system_; // 新增: System 引用
public:
TrafficLightHttpServer(uint16_t port, int max_connections, const std::string& doc_root, System& system_ref);
~TrafficLightHttpServer();
TrafficLightHttpServer(const TrafficLightHttpServer&) = delete;
TrafficLightHttpServer& operator=(const TrafficLightHttpServer&) = delete;
void start(int num_threads = 1);
void stop();
private:
void run_ioc();
};
}

View File

@ -886,7 +886,7 @@ def get_vehicle_positions():
try:
# 统一处理红绿灯状态切换
switch_traffic_light_state()
# switch_traffic_light_state() # 采用模拟的红绿灯脚本,注释掉此处的调用
# 只在达到更新间隔时更新位置
if elapsed_time >= UPDATE_INTERVAL:
@ -911,21 +911,33 @@ def get_vehicle_positions():
@app.route('/openApi/getTrafficLightSignals', methods=['GET', 'OPTIONS'])
def get_traffic_light_signals():
"""获取红绿灯信号状态"""
"""获取红绿灯信号状态 (旧的轮询接口)"""
if request.method == 'OPTIONS':
return '', 204
# 更新红绿灯状态
switch_traffic_light_state()
# 更新时间戳
for signal in traffic_light_data:
signal["timestamp"] = int(time.time() * 1000)
# --- 注释掉原有逻辑 (2025-05-06 by AI Assistant) ---
# 原因: 为了测试新的 TrafficLightHttpServer 推送机制,暂时禁用此轮询接口返回数据。
# 模拟和发送红绿灯状态的功能现在由 tools/mock_traffic_light_client.py 负责。
# 如需恢复轮询测试,请取消下面的注释并注释掉新的 return 语句。
# # 更新红绿灯状态
# # switch_traffic_light_state() # 采用模拟的红绿灯脚本,注释掉此处的调用
#
# # 更新时间戳
# for signal in traffic_light_data:
# signal[\"timestamp\"] = int(time.time() * 1000)
#
# return jsonify({
# \"status\": 200,
# \"msg\": \"获取红绿灯信号成功\",
# \"data\": traffic_light_data
# })
# --- 注释结束 ---
# 新增: 返回空数据,以禁用轮询数据源
return jsonify({
"status": 200,
"msg": "获取红绿灯信号成功",
"data": traffic_light_data
"msg": "Traffic light polling endpoint disabled to test push mechanism.",
"data": []
})
@app.after_request

View File

@ -0,0 +1,109 @@
import time
import json
import urllib.request
import urllib.error
import logging
import os
import sys
# --- 配置 ---
CPP_SERVER_URL = "http://localhost:8082/trafficlight" # C++ TrafficLightHttpServer 的地址和端口
SEND_INTERVAL = 1.0 # 发送间隔(秒)
# 红绿灯循环周期 (North/South State, East/West State, Duration in seconds)
TRAFFIC_LIGHT_CYCLE = [
("GREEN", "RED", 10),
("YELLOW", "RED", 2),
("RED", "GREEN", 10),
("RED", "YELLOW", 2),
]
# --- 日志设置 ---
LOG_DIR = 'logs'
if not os.path.exists(LOG_DIR):
try:
os.makedirs(LOG_DIR)
except OSError as e:
print(f"Error creating log directory {LOG_DIR}: {e}", file=sys.stderr)
LOG_DIR = '.' # Fallback to current directory
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(LOG_DIR, 'mock_traffic_light_client.log')),
logging.StreamHandler() # 同时输出到控制台
]
)
# --- 全局状态 ---
current_cycle_index = 0
last_cycle_switch_time = time.time()
# --- 辅助函数 ---
def generate_di_payload(ns_state, ew_state):
"""根据南北和东西状态生成 DI 信号 Payload。"""
# 初始化所有 DI 信号为 0
payload = {f"DI-{i:02d}": 0 for i in range(1, 19)}
# 设置南北向状态 (DI-11: 北红, DI-12: 北黄, DI-13: 北绿)
if ns_state == "RED": payload["DI-11"] = 1
elif ns_state == "YELLOW": payload["DI-12"] = 1
elif ns_state == "GREEN": payload["DI-13"] = 1
# 设置东西向状态 (DI-14: 东红, DI-15: 东黄, DI-16: 东绿)
if ew_state == "RED": payload["DI-14"] = 1
elif ew_state == "YELLOW": payload["DI-15"] = 1
elif ew_state == "GREEN": payload["DI-16"] = 1
return payload
def send_traffic_light_update(url, payload):
"""使用 urllib 发送 POST 请求。"""
try:
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'}, method='POST')
# 设置一个较短的超时时间,避免长时间阻塞
with urllib.request.urlopen(req, timeout=0.8) as response:
logging.debug(f"Sent: {payload}, Status: {response.status}")
# 可以选择性地读取响应内容
# response.read()
except (urllib.error.URLError, TimeoutError, ConnectionRefusedError) as e:
logging.error(f"Failed to send to {url}: {e}")
except Exception as e:
logging.error(f"Unexpected error during send: {e}")
# --- 主循环 ---
def traffic_light_sender_loop():
"""主循环,管理状态切换并发送数据。"""
global current_cycle_index, last_cycle_switch_time
logging.info(f"Starting traffic light client, sending to {CPP_SERVER_URL}")
while True:
try:
now = time.time()
# 检查是否需要切换循环阶段
current_ns_state, current_ew_state, duration = TRAFFIC_LIGHT_CYCLE[current_cycle_index]
if now - last_cycle_switch_time >= duration:
current_cycle_index = (current_cycle_index + 1) % len(TRAFFIC_LIGHT_CYCLE)
last_cycle_switch_time = now
next_ns_state, next_ew_state, _ = TRAFFIC_LIGHT_CYCLE[current_cycle_index]
logging.info(f"Switching phase: {current_ns_state}/{current_ew_state} -> {next_ns_state}/{next_ew_state}")
# 更新当前状态以供本次发送
current_ns_state, current_ew_state, _ = TRAFFIC_LIGHT_CYCLE[current_cycle_index]
# 生成 Payload 并发送
payload = generate_di_payload(current_ns_state, current_ew_state)
send_traffic_light_update(CPP_SERVER_URL, payload)
# 等待指定间隔
time.sleep(SEND_INTERVAL)
except KeyboardInterrupt:
logging.info("Traffic light client stopped by user.")
break
except Exception as e:
logging.error(f"Error in main loop: {e}", exc_info=True)
# 等待一段时间再重试,避免快速失败循环
time.sleep(5)
# --- 启动入口 ---
if __name__ == '__main__':
traffic_light_sender_loop()