109 lines
4.2 KiB
Python
109 lines
4.2 KiB
Python
import time
|
|
import json
|
|
import urllib.request
|
|
import urllib.error
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
# --- 配置 ---
|
|
CPP_SERVER_URL = "http://localhost:8082/trafficlight" # C++ TrafficLightHttpServer 的地址和端口
|
|
SEND_INTERVAL = 1.0 # 发送间隔(秒)
|
|
|
|
# 红绿灯循环周期 (North/South State, East/West State, Duration in seconds)
|
|
TRAFFIC_LIGHT_CYCLE = [
|
|
("GREEN", "RED", 10),
|
|
("YELLOW", "RED", 2),
|
|
("RED", "GREEN", 10),
|
|
("RED", "YELLOW", 2),
|
|
]
|
|
|
|
# --- 日志设置 ---
|
|
LOG_DIR = 'logs'
|
|
if not os.path.exists(LOG_DIR):
|
|
try:
|
|
os.makedirs(LOG_DIR)
|
|
except OSError as e:
|
|
print(f"Error creating log directory {LOG_DIR}: {e}", file=sys.stderr)
|
|
LOG_DIR = '.' # Fallback to current directory
|
|
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(os.path.join(LOG_DIR, 'mock_traffic_light_client.log')),
|
|
logging.StreamHandler() # 同时输出到控制台
|
|
]
|
|
)
|
|
|
|
# --- 全局状态 ---
|
|
current_cycle_index = 0
|
|
last_cycle_switch_time = time.time()
|
|
|
|
# --- 辅助函数 ---
|
|
def generate_di_payload(ns_state, ew_state):
|
|
"""根据南北和东西状态生成 DI 信号 Payload。"""
|
|
# 初始化所有 DI 信号为 0
|
|
payload = {f"DI-{i:02d}": 0 for i in range(1, 19)}
|
|
# 设置南北向状态 (DI-11: 北红, DI-12: 北黄, DI-13: 北绿)
|
|
if ns_state == "RED": payload["DI-11"] = 1
|
|
elif ns_state == "YELLOW": payload["DI-12"] = 1
|
|
elif ns_state == "GREEN": payload["DI-13"] = 1
|
|
# 设置东西向状态 (DI-14: 东红, DI-15: 东黄, DI-16: 东绿)
|
|
if ew_state == "RED": payload["DI-14"] = 1
|
|
elif ew_state == "YELLOW": payload["DI-15"] = 1
|
|
elif ew_state == "GREEN": payload["DI-16"] = 1
|
|
return payload
|
|
|
|
def send_traffic_light_update(url, payload):
|
|
"""使用 urllib 发送 POST 请求。"""
|
|
try:
|
|
data = json.dumps(payload).encode('utf-8')
|
|
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'}, method='POST')
|
|
# 设置一个较短的超时时间,避免长时间阻塞
|
|
with urllib.request.urlopen(req, timeout=0.8) as response:
|
|
logging.debug(f"Sent: {payload}, Status: {response.status}")
|
|
# 可以选择性地读取响应内容
|
|
# response.read()
|
|
except (urllib.error.URLError, TimeoutError, ConnectionRefusedError) as e:
|
|
logging.error(f"Failed to send to {url}: {e}")
|
|
except Exception as e:
|
|
logging.error(f"Unexpected error during send: {e}")
|
|
|
|
# --- 主循环 ---
|
|
def traffic_light_sender_loop():
|
|
"""主循环,管理状态切换并发送数据。"""
|
|
global current_cycle_index, last_cycle_switch_time
|
|
logging.info(f"Starting traffic light client, sending to {CPP_SERVER_URL}")
|
|
while True:
|
|
try:
|
|
now = time.time()
|
|
|
|
# 检查是否需要切换循环阶段
|
|
current_ns_state, current_ew_state, duration = TRAFFIC_LIGHT_CYCLE[current_cycle_index]
|
|
if now - last_cycle_switch_time >= duration:
|
|
current_cycle_index = (current_cycle_index + 1) % len(TRAFFIC_LIGHT_CYCLE)
|
|
last_cycle_switch_time = now
|
|
next_ns_state, next_ew_state, _ = TRAFFIC_LIGHT_CYCLE[current_cycle_index]
|
|
logging.info(f"Switching phase: {current_ns_state}/{current_ew_state} -> {next_ns_state}/{next_ew_state}")
|
|
# 更新当前状态以供本次发送
|
|
current_ns_state, current_ew_state, _ = TRAFFIC_LIGHT_CYCLE[current_cycle_index]
|
|
|
|
# 生成 Payload 并发送
|
|
payload = generate_di_payload(current_ns_state, current_ew_state)
|
|
send_traffic_light_update(CPP_SERVER_URL, payload)
|
|
|
|
# 等待指定间隔
|
|
time.sleep(SEND_INTERVAL)
|
|
|
|
except KeyboardInterrupt:
|
|
logging.info("Traffic light client stopped by user.")
|
|
break
|
|
except Exception as e:
|
|
logging.error(f"Error in main loop: {e}", exc_info=True)
|
|
# 等待一段时间再重试,避免快速失败循环
|
|
time.sleep(5)
|
|
|
|
# --- 启动入口 ---
|
|
if __name__ == '__main__':
|
|
traffic_light_sender_loop() |