QAUP_Management/tools/mock_traffic_light.py

340 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
红绿灯设备模拟器
此脚本模拟真实的红绿灯硬件设备通过TCP连接发送纯JSON格式的DI信号数据。
服务器会从TCP连接中获取设备的IP地址和端口信息。
使用方法:
python test_traffic_light.py --host localhost --port 8082 --interval 1
"""
import socket
import time
import json
import argparse
from datetime import datetime
import signal
from typing import TypedDict
# 导入统一的日志配置
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from logging_config import setup_logger
# 设置当前模块的日志记录器
logger = setup_logger(
name='mock_traffic_light',
log_file='../logs/mock_traffic_light.log',
max_bytes=10*1024*1024, # 10MB
backup_count=5
)
class TLState(TypedDict):
name: str
duration: int
ns: tuple[int, int, int]
ew: tuple[int, int, int]
class TrafficLightDevice:
"""红绿灯设备模拟器"""
def __init__(self, host='localhost', port=8082, interval=1):
self.host = host
self.port = port
self.interval = interval # 发送间隔(秒)
self.running = False
self.socket = None
def create_di_signal(self, ns_red: int = 0, ns_yellow: int = 0, ns_green: int = 0,
ew_red: int = 0, ew_yellow: int = 0, ew_green: int = 0):
"""
创建红绿灯DI信号数据纯JSON格式
Args:
ns_red: 南北红灯状态 (0或1)
ns_yellow: 南北黄灯状态 (0或1)
ns_green: 南北绿灯状态 (0或1)
ew_red: 东西红灯状态 (0或1)
ew_yellow: 东西黄灯状态 (0或1)
ew_green: 东西绿灯状态 (0或1)
Returns:
JSON格式的DI信号字符串
"""
# DI信号数据真实红绿灯设备发送的格式
di_data = {
"DI-01": 0,
"DI-02": 0,
"DI-11": ns_red, # 北红
"DI-12": ns_yellow, # 北黄
"DI-13": ns_green, # 北绿
"DI-14": ew_red, # 东红
"DI-15": ew_yellow, # 东黄
"DI-16": ew_green, # 东绿
"DI-17": 0,
"DI-18": 0
}
return json.dumps(di_data, separators=(',', ':'))
def connect(self):
"""连接到服务器"""
try:
# 如果已有连接,先关闭
if self.socket:
try:
self.socket.close()
except:
pass
self.socket = None
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置连接超时
self.socket.settimeout(10)
self.socket.connect((self.host, self.port))
# 连接成功后取消超时设置
self.socket.settimeout(None)
logger.info(f"[{datetime.now().strftime('%H:%M:%S')}] ✅ 已连接到服务器 {self.host}:{self.port}")
return True
except Exception as e:
logger.error(f"[{datetime.now().strftime('%H:%M:%S')}] ❌ 连接失败: {e}")
if self.socket:
try:
self.socket.close()
except:
pass
self.socket = None
return False
def disconnect(self):
"""断开连接"""
if self.socket:
try:
self.socket.close()
logger.info(f"[{datetime.now().strftime('%H:%M:%S')}] 已断开连接")
except:
pass
self.socket = None
def send_signal(self, di_signal):
"""发送DI信号到服务器"""
if not self.socket:
logger.warning("未连接到服务器")
return False
try:
self.socket.sendall(di_signal.encode('utf-8'))
logger.info(f"[{datetime.now().strftime('%H:%M:%S')}] 📡 发送信号: {di_signal}")
return True
except Exception as e:
logger.error(f"[{datetime.now().strftime('%H:%M:%S')}] ❌ 发送失败: {e}")
# 发送失败时关闭socket触发重连
try:
self.socket.close()
except:
pass
self.socket = None
return False
def get_current_state(self, cycle_time):
"""
根据时间计算当前红绿灯状态
Args:
cycle_time: 当前周期时间(秒)
Returns:
tuple: (状态名称, (ns_red, ns_yellow, ns_green), (ew_red, ew_yellow, ew_green))
"""
# 红绿灯状态序列,每个状态持续时间(秒)
states: list[TLState] = [
{"name": "南北红+东西绿", "duration": 30, "ns": (1,0,0), "ew": (0,0,1)},
{"name": "南北红+东西黄", "duration": 3, "ns": (1,0,0), "ew": (0,1,0)},
{"name": "南北绿+东西红", "duration": 25, "ns": (0,0,1), "ew": (1,0,0)},
{"name": "南北黄+东西红", "duration": 3, "ns": (0,1,0), "ew": (1,0,0)},
]
# 计算总周期时间
total_cycle_time = sum(state["duration"] for state in states)
# 计算当前在周期中的位置
current_time = cycle_time % total_cycle_time
# 找到当前状态
elapsed = 0
for state in states:
if current_time < elapsed + state["duration"]:
return state["name"], state["ns"], state["ew"]
elapsed += state["duration"]
# 默认返回第一个状态
return states[0]["name"], states[0]["ns"], states[0]["ew"]
def run_continuous_simulation(self):
"""运行连续的红绿灯信号模拟"""
logger.info(f"开始连续发送红绿灯信号,间隔: {self.interval}")
logger.info("按 Ctrl+C 停止模拟")
logger.info("服务端断开时会自动重连...")
self.running = True
start_time = time.time()
connected = False
try:
while self.running:
# 如果未连接,尝试连接
if not connected:
connected = self.connect()
if not connected:
logger.warning("连接失败5秒后重试...")
time.sleep(5)
continue
else:
# 重新连接成功,重置开始时间以保持状态同步
start_time = time.time()
# 计算当前周期时间
current_time = time.time() - start_time
# 获取当前状态
state_name, ns_state, ew_state = self.get_current_state(current_time)
# 创建DI信号
di_signal = self.create_di_signal(
ns_state[0], ns_state[1], ns_state[2], # 南北红黄绿
ew_state[0], ew_state[1], ew_state[2] # 东西红黄绿
)
# 发送信号
success = self.send_signal(di_signal)
if not success:
logger.warning("发送失败,连接已断开")
self.disconnect()
connected = False
continue
logger.info(f"当前状态: {state_name}")
# 等待下一次发送
time.sleep(self.interval)
except KeyboardInterrupt:
logger.info("\n收到停止信号...")
finally:
self.running = False
self.disconnect()
logger.info("红绿灯模拟已停止")
def test_signal_states(self):
"""测试各种信号状态"""
if not self.connect():
return
print("开始测试各种红绿灯信号状态...")
test_cases: list[TLState] = [
{"name": "南北红+东西绿", "duration": 2, "ns": (1,0,0), "ew": (0,0,1)},
{"name": "南北红+东西黄", "duration": 2, "ns": (1,0,0), "ew": (0,1,0)},
{"name": "南北绿+东西红", "duration": 2, "ns": (0,0,1), "ew": (1,0,0)},
{"name": "南北黄+东西红", "duration": 2, "ns": (0,1,0), "ew": (1,0,0)},
{"name": "全红状态", "duration": 2, "ns": (1,0,0), "ew": (1,0,0)},
{"name": "无信号状态", "duration": 2, "ns": (0,0,0), "ew": (0,0,0)},
{"name": "冲突状态(两绿)", "duration": 2, "ns": (0,0,1), "ew": (0,0,1)},
]
try:
for i, case in enumerate(test_cases, 1):
print(f"\n测试 {i}/{len(test_cases)}: {case['name']}")
di_signal = self.create_di_signal(
case["ns"][0], case["ns"][1], case["ns"][2],
case["ew"][0], case["ew"][1], case["ew"][2]
)
success = self.send_signal(di_signal)
if not success:
print("发送失败")
break
time.sleep(2) # 每个状态持续2秒
except KeyboardInterrupt:
print("\n测试被中断")
finally:
self.disconnect()
def test_invalid_json(self):
"""测试无效JSON格式"""
if not self.connect():
return
print("开始测试无效JSON格式...")
invalid_signals = [
"invalid json",
"{\"DI-11\":\"invalid_value\"}", # 无效值类型
"{\"invalid_field\":1}", # 无效字段
"not json at all",
"{\"DI-11\":1,}", # 语法错误
"", # 空字符串
]
try:
for i, signal in enumerate(invalid_signals, 1):
print(f"\n测试无效JSON {i}/{len(invalid_signals)}: {signal[:30]}...")
success = self.send_signal(signal)
if not success:
print("发送失败")
time.sleep(1)
except KeyboardInterrupt:
print("\n测试被中断")
finally:
self.disconnect()
def setup_signal_handler(self):
"""设置信号处理器"""
def signal_handler(signum, frame):
print(f"\n收到信号 {signum},正在停止...")
self.running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def main():
parser = argparse.ArgumentParser(description='红绿灯设备模拟器')
parser.add_argument('--host', default='localhost', help='服务器主机地址 (默认: localhost)')
parser.add_argument('--port', type=int, default=8082, help='服务器端口 (默认: 8082)')
parser.add_argument('--interval', type=float, default=1.0, help='信号发送间隔秒数 (默认: 1.0)')
parser.add_argument('--mode', choices=['continuous', 'test-states', 'test-invalid'],
default='continuous', help='运行模式')
args = parser.parse_args()
device = TrafficLightDevice(args.host, args.port, args.interval)
device.setup_signal_handler()
print("=" * 60)
print("红绿灯设备模拟器")
print("=" * 60)
print(f"服务器地址: {args.host}:{args.port}")
print(f"发送间隔: {args.interval}")
print(f"运行模式: {args.mode}")
print("=" * 60)
if args.mode == 'continuous':
print("\n连续模拟模式 - 模拟真实红绿灯设备持续发送信号")
print("设备会通过TCP连接发送纯JSON格式的DI信号")
print("服务器将从TCP连接中获取设备的IP地址和端口")
device.run_continuous_simulation()
elif args.mode == 'test-states':
print("\n状态测试模式 - 测试各种红绿灯状态")
device.test_signal_states()
elif args.mode == 'test-invalid':
print("\n无效数据测试模式 - 测试无效JSON格式")
device.test_invalid_json()
if __name__ == '__main__':
main()