CollisionAvoidance/tools/mock_traffic_light_client.py

109 lines
4.2 KiB
Python

import time
import json
import urllib.request
import urllib.error
import logging
import os
import sys
# --- 配置 ---
CPP_SERVER_URL = "http://localhost:8082/trafficlight" # C++ TrafficLightHttpServer 的地址和端口
SEND_INTERVAL = 1.0 # 发送间隔(秒)
# 红绿灯循环周期 (North/South State, East/West State, Duration in seconds)
TRAFFIC_LIGHT_CYCLE = [
("GREEN", "RED", 10),
("YELLOW", "RED", 2),
("RED", "GREEN", 10),
("RED", "YELLOW", 2),
]
# --- 日志设置 ---
LOG_DIR = 'logs'
if not os.path.exists(LOG_DIR):
try:
os.makedirs(LOG_DIR)
except OSError as e:
print(f"Error creating log directory {LOG_DIR}: {e}", file=sys.stderr)
LOG_DIR = '.' # Fallback to current directory
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(LOG_DIR, 'mock_traffic_light_client.log')),
logging.StreamHandler() # 同时输出到控制台
]
)
# --- 全局状态 ---
current_cycle_index = 0
last_cycle_switch_time = time.time()
# --- 辅助函数 ---
def generate_di_payload(ns_state, ew_state):
"""根据南北和东西状态生成 DI 信号 Payload。"""
# 初始化所有 DI 信号为 0
payload = {f"DI-{i:02d}": 0 for i in range(1, 19)}
# 设置南北向状态 (DI-11: 北红, DI-12: 北黄, DI-13: 北绿)
if ns_state == "RED": payload["DI-11"] = 1
elif ns_state == "YELLOW": payload["DI-12"] = 1
elif ns_state == "GREEN": payload["DI-13"] = 1
# 设置东西向状态 (DI-14: 东红, DI-15: 东黄, DI-16: 东绿)
if ew_state == "RED": payload["DI-14"] = 1
elif ew_state == "YELLOW": payload["DI-15"] = 1
elif ew_state == "GREEN": payload["DI-16"] = 1
return payload
def send_traffic_light_update(url, payload):
"""使用 urllib 发送 POST 请求。"""
try:
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'}, method='POST')
# 设置一个较短的超时时间,避免长时间阻塞
with urllib.request.urlopen(req, timeout=0.8) as response:
logging.debug(f"Sent: {payload}, Status: {response.status}")
# 可以选择性地读取响应内容
# response.read()
except (urllib.error.URLError, TimeoutError, ConnectionRefusedError) as e:
logging.error(f"Failed to send to {url}: {e}")
except Exception as e:
logging.error(f"Unexpected error during send: {e}")
# --- 主循环 ---
def traffic_light_sender_loop():
"""主循环,管理状态切换并发送数据。"""
global current_cycle_index, last_cycle_switch_time
logging.info(f"Starting traffic light client, sending to {CPP_SERVER_URL}")
while True:
try:
now = time.time()
# 检查是否需要切换循环阶段
current_ns_state, current_ew_state, duration = TRAFFIC_LIGHT_CYCLE[current_cycle_index]
if now - last_cycle_switch_time >= duration:
current_cycle_index = (current_cycle_index + 1) % len(TRAFFIC_LIGHT_CYCLE)
last_cycle_switch_time = now
next_ns_state, next_ew_state, _ = TRAFFIC_LIGHT_CYCLE[current_cycle_index]
logging.info(f"Switching phase: {current_ns_state}/{current_ew_state} -> {next_ns_state}/{next_ew_state}")
# 更新当前状态以供本次发送
current_ns_state, current_ew_state, _ = TRAFFIC_LIGHT_CYCLE[current_cycle_index]
# 生成 Payload 并发送
payload = generate_di_payload(current_ns_state, current_ew_state)
send_traffic_light_update(CPP_SERVER_URL, payload)
# 等待指定间隔
time.sleep(SEND_INTERVAL)
except KeyboardInterrupt:
logging.info("Traffic light client stopped by user.")
break
except Exception as e:
logging.error(f"Error in main loop: {e}", exc_info=True)
# 等待一段时间再重试,避免快速失败循环
time.sleep(5)
# --- 启动入口 ---
if __name__ == '__main__':
traffic_light_sender_loop()