#!/usr/bin/env python3 """ 红绿灯设备模拟器 此脚本模拟真实的红绿灯硬件设备,通过TCP连接发送纯JSON格式的DI信号数据。 服务器会从TCP连接中获取设备的IP地址和端口信息。 使用方法: python test_traffic_light.py --host localhost --port 8082 --interval 1 """ import socket import time import json import argparse from datetime import datetime import signal from typing import TypedDict # 导入统一的日志配置 import sys import os sys.path.append(os.path.dirname(os.path.abspath(__file__))) from logging_config import setup_logger # 设置当前模块的日志记录器 logger = setup_logger( name='mock_traffic_light', log_file='../logs/mock_traffic_light.log', max_bytes=10*1024*1024, # 10MB backup_count=5 ) class TLState(TypedDict): name: str duration: int ns: tuple[int, int, int] ew: tuple[int, int, int] class TrafficLightDevice: """红绿灯设备模拟器""" def __init__(self, host='localhost', port=8082, interval=1): self.host = host self.port = port self.interval = interval # 发送间隔(秒) self.running = False self.socket = None def create_di_signal(self, ns_red: int = 0, ns_yellow: int = 0, ns_green: int = 0, ew_red: int = 0, ew_yellow: int = 0, ew_green: int = 0): """ 创建红绿灯DI信号数据(纯JSON格式) Args: ns_red: 南北红灯状态 (0或1) ns_yellow: 南北黄灯状态 (0或1) ns_green: 南北绿灯状态 (0或1) ew_red: 东西红灯状态 (0或1) ew_yellow: 东西黄灯状态 (0或1) ew_green: 东西绿灯状态 (0或1) Returns: JSON格式的DI信号字符串 """ # DI信号数据(真实红绿灯设备发送的格式) di_data = { "DI-01": 0, "DI-02": 0, "DI-11": ns_red, # 北红 "DI-12": ns_yellow, # 北黄 "DI-13": ns_green, # 北绿 "DI-14": ew_red, # 东红 "DI-15": ew_yellow, # 东黄 "DI-16": ew_green, # 东绿 "DI-17": 0, "DI-18": 0 } return json.dumps(di_data, separators=(',', ':')) def connect(self): """连接到服务器""" try: # 如果已有连接,先关闭 if self.socket: try: self.socket.close() except: pass self.socket = None self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 设置连接超时 self.socket.settimeout(10) self.socket.connect((self.host, self.port)) # 连接成功后取消超时设置 self.socket.settimeout(None) logger.info(f"[{datetime.now().strftime('%H:%M:%S')}] ✅ 已连接到服务器 {self.host}:{self.port}") return True except Exception as e: logger.error(f"[{datetime.now().strftime('%H:%M:%S')}] ❌ 连接失败: {e}") if self.socket: try: self.socket.close() except: pass self.socket = None return False def disconnect(self): """断开连接""" if self.socket: try: self.socket.close() logger.info(f"[{datetime.now().strftime('%H:%M:%S')}] 已断开连接") except: pass self.socket = None def send_signal(self, di_signal): """发送DI信号到服务器""" if not self.socket: logger.warning("未连接到服务器") return False try: self.socket.sendall(di_signal.encode('utf-8')) logger.info(f"[{datetime.now().strftime('%H:%M:%S')}] 📡 发送信号: {di_signal}") return True except Exception as e: logger.error(f"[{datetime.now().strftime('%H:%M:%S')}] ❌ 发送失败: {e}") # 发送失败时关闭socket,触发重连 try: self.socket.close() except: pass self.socket = None return False def get_current_state(self, cycle_time): """ 根据时间计算当前红绿灯状态 Args: cycle_time: 当前周期时间(秒) Returns: tuple: (状态名称, (ns_red, ns_yellow, ns_green), (ew_red, ew_yellow, ew_green)) """ # 红绿灯状态序列,每个状态持续时间(秒) states: list[TLState] = [ {"name": "南北红+东西绿", "duration": 30, "ns": (1,0,0), "ew": (0,0,1)}, {"name": "南北红+东西黄", "duration": 3, "ns": (1,0,0), "ew": (0,1,0)}, {"name": "南北绿+东西红", "duration": 25, "ns": (0,0,1), "ew": (1,0,0)}, {"name": "南北黄+东西红", "duration": 3, "ns": (0,1,0), "ew": (1,0,0)}, ] # 计算总周期时间 total_cycle_time = sum(state["duration"] for state in states) # 计算当前在周期中的位置 current_time = cycle_time % total_cycle_time # 找到当前状态 elapsed = 0 for state in states: if current_time < elapsed + state["duration"]: return state["name"], state["ns"], state["ew"] elapsed += state["duration"] # 默认返回第一个状态 return states[0]["name"], states[0]["ns"], states[0]["ew"] def run_continuous_simulation(self): """运行连续的红绿灯信号模拟""" logger.info(f"开始连续发送红绿灯信号,间隔: {self.interval}秒") logger.info("按 Ctrl+C 停止模拟") logger.info("服务端断开时会自动重连...") self.running = True start_time = time.time() connected = False try: while self.running: # 如果未连接,尝试连接 if not connected: connected = self.connect() if not connected: logger.warning("连接失败,5秒后重试...") time.sleep(5) continue else: # 重新连接成功,重置开始时间以保持状态同步 start_time = time.time() # 计算当前周期时间 current_time = time.time() - start_time # 获取当前状态 state_name, ns_state, ew_state = self.get_current_state(current_time) # 创建DI信号 di_signal = self.create_di_signal( ns_state[0], ns_state[1], ns_state[2], # 南北红黄绿 ew_state[0], ew_state[1], ew_state[2] # 东西红黄绿 ) # 发送信号 success = self.send_signal(di_signal) if not success: logger.warning("发送失败,连接已断开") self.disconnect() connected = False continue logger.info(f"当前状态: {state_name}") # 等待下一次发送 time.sleep(self.interval) except KeyboardInterrupt: logger.info("\n收到停止信号...") finally: self.running = False self.disconnect() logger.info("红绿灯模拟已停止") def test_signal_states(self): """测试各种信号状态""" if not self.connect(): return print("开始测试各种红绿灯信号状态...") test_cases: list[TLState] = [ {"name": "南北红+东西绿", "duration": 2, "ns": (1,0,0), "ew": (0,0,1)}, {"name": "南北红+东西黄", "duration": 2, "ns": (1,0,0), "ew": (0,1,0)}, {"name": "南北绿+东西红", "duration": 2, "ns": (0,0,1), "ew": (1,0,0)}, {"name": "南北黄+东西红", "duration": 2, "ns": (0,1,0), "ew": (1,0,0)}, {"name": "全红状态", "duration": 2, "ns": (1,0,0), "ew": (1,0,0)}, {"name": "无信号状态", "duration": 2, "ns": (0,0,0), "ew": (0,0,0)}, {"name": "冲突状态(两绿)", "duration": 2, "ns": (0,0,1), "ew": (0,0,1)}, ] try: for i, case in enumerate(test_cases, 1): print(f"\n测试 {i}/{len(test_cases)}: {case['name']}") di_signal = self.create_di_signal( case["ns"][0], case["ns"][1], case["ns"][2], case["ew"][0], case["ew"][1], case["ew"][2] ) success = self.send_signal(di_signal) if not success: print("发送失败") break time.sleep(2) # 每个状态持续2秒 except KeyboardInterrupt: print("\n测试被中断") finally: self.disconnect() def test_invalid_json(self): """测试无效JSON格式""" if not self.connect(): return print("开始测试无效JSON格式...") invalid_signals = [ "invalid json", "{\"DI-11\":\"invalid_value\"}", # 无效值类型 "{\"invalid_field\":1}", # 无效字段 "not json at all", "{\"DI-11\":1,}", # 语法错误 "", # 空字符串 ] try: for i, signal in enumerate(invalid_signals, 1): print(f"\n测试无效JSON {i}/{len(invalid_signals)}: {signal[:30]}...") success = self.send_signal(signal) if not success: print("发送失败") time.sleep(1) except KeyboardInterrupt: print("\n测试被中断") finally: self.disconnect() def setup_signal_handler(self): """设置信号处理器""" def signal_handler(signum, frame): print(f"\n收到信号 {signum},正在停止...") self.running = False signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) def main(): parser = argparse.ArgumentParser(description='红绿灯设备模拟器') parser.add_argument('--host', default='localhost', help='服务器主机地址 (默认: localhost)') parser.add_argument('--port', type=int, default=8082, help='服务器端口 (默认: 8082)') parser.add_argument('--interval', type=float, default=1.0, help='信号发送间隔秒数 (默认: 1.0)') parser.add_argument('--mode', choices=['continuous', 'test-states', 'test-invalid'], default='continuous', help='运行模式') args = parser.parse_args() device = TrafficLightDevice(args.host, args.port, args.interval) device.setup_signal_handler() print("=" * 60) print("红绿灯设备模拟器") print("=" * 60) print(f"服务器地址: {args.host}:{args.port}") print(f"发送间隔: {args.interval}秒") print(f"运行模式: {args.mode}") print("=" * 60) if args.mode == 'continuous': print("\n连续模拟模式 - 模拟真实红绿灯设备持续发送信号") print("设备会通过TCP连接发送纯JSON格式的DI信号") print("服务器将从TCP连接中获取设备的IP地址和端口") device.run_continuous_simulation() elif args.mode == 'test-states': print("\n状态测试模式 - 测试各种红绿灯状态") device.test_signal_states() elif args.mode == 'test-invalid': print("\n无效数据测试模式 - 测试无效JSON格式") device.test_invalid_json() if __name__ == '__main__': main()