import time import json import urllib.request import urllib.error import logging import os import sys # --- 配置 --- CPP_SERVER_URL = "http://localhost:8082/trafficlight" # C++ TrafficLightHttpServer 的地址和端口 SEND_INTERVAL = 1.0 # 发送间隔(秒) # 红绿灯循环周期 (North/South State, East/West State, Duration in seconds) TRAFFIC_LIGHT_CYCLE = [ ("GREEN", "RED", 10), ("YELLOW", "RED", 2), ("RED", "GREEN", 10), ("RED", "YELLOW", 2), ] # --- 日志设置 --- LOG_DIR = 'logs' if not os.path.exists(LOG_DIR): try: os.makedirs(LOG_DIR) except OSError as e: print(f"Error creating log directory {LOG_DIR}: {e}", file=sys.stderr) LOG_DIR = '.' # Fallback to current directory logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(os.path.join(LOG_DIR, 'mock_traffic_light_client.log')), logging.StreamHandler() # 同时输出到控制台 ] ) # --- 全局状态 --- current_cycle_index = 0 last_cycle_switch_time = time.time() # --- 辅助函数 --- def generate_di_payload(ns_state, ew_state): """根据南北和东西状态生成 DI 信号 Payload。""" # 初始化所有 DI 信号为 0 payload = {f"DI-{i:02d}": 0 for i in range(1, 19)} # 设置南北向状态 (DI-11: 北红, DI-12: 北黄, DI-13: 北绿) if ns_state == "RED": payload["DI-11"] = 1 elif ns_state == "YELLOW": payload["DI-12"] = 1 elif ns_state == "GREEN": payload["DI-13"] = 1 # 设置东西向状态 (DI-14: 东红, DI-15: 东黄, DI-16: 东绿) if ew_state == "RED": payload["DI-14"] = 1 elif ew_state == "YELLOW": payload["DI-15"] = 1 elif ew_state == "GREEN": payload["DI-16"] = 1 return payload def send_traffic_light_update(url, payload): """使用 urllib 发送 POST 请求。""" try: data = json.dumps(payload).encode('utf-8') req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'}, method='POST') # 设置一个较短的超时时间,避免长时间阻塞 with urllib.request.urlopen(req, timeout=0.8) as response: logging.debug(f"Sent: {payload}, Status: {response.status}") # 可以选择性地读取响应内容 # response.read() except (urllib.error.URLError, TimeoutError, ConnectionRefusedError) as e: logging.error(f"Failed to send to {url}: {e}") except Exception as e: logging.error(f"Unexpected error during send: {e}") # --- 主循环 --- def traffic_light_sender_loop(): """主循环,管理状态切换并发送数据。""" global current_cycle_index, last_cycle_switch_time logging.info(f"Starting traffic light client, sending to {CPP_SERVER_URL}") while True: try: now = time.time() # 检查是否需要切换循环阶段 current_ns_state, current_ew_state, duration = TRAFFIC_LIGHT_CYCLE[current_cycle_index] if now - last_cycle_switch_time >= duration: current_cycle_index = (current_cycle_index + 1) % len(TRAFFIC_LIGHT_CYCLE) last_cycle_switch_time = now next_ns_state, next_ew_state, _ = TRAFFIC_LIGHT_CYCLE[current_cycle_index] logging.info(f"Switching phase: {current_ns_state}/{current_ew_state} -> {next_ns_state}/{next_ew_state}") # 更新当前状态以供本次发送 current_ns_state, current_ew_state, _ = TRAFFIC_LIGHT_CYCLE[current_cycle_index] # 生成 Payload 并发送 payload = generate_di_payload(current_ns_state, current_ew_state) send_traffic_light_update(CPP_SERVER_URL, payload) # 等待指定间隔 time.sleep(SEND_INTERVAL) except KeyboardInterrupt: logging.info("Traffic light client stopped by user.") break except Exception as e: logging.error(f"Error in main loop: {e}", exc_info=True) # 等待一段时间再重试,避免快速失败循环 time.sleep(5) # --- 启动入口 --- if __name__ == '__main__': traffic_light_sender_loop()