340 lines
12 KiB
Python
340 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
红绿灯设备模拟器
|
||
|
||
此脚本模拟真实的红绿灯硬件设备,通过TCP连接发送纯JSON格式的DI信号数据。
|
||
服务器会从TCP连接中获取设备的IP地址和端口信息。
|
||
|
||
使用方法:
|
||
python test_traffic_light.py --host localhost --port 8082 --interval 1
|
||
"""
|
||
|
||
import socket
|
||
import time
|
||
import json
|
||
import argparse
|
||
from datetime import datetime
|
||
import signal
|
||
from typing import TypedDict
|
||
|
||
# 导入统一的日志配置
|
||
import sys
|
||
import os
|
||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||
from logging_config import setup_logger
|
||
|
||
# 设置当前模块的日志记录器
|
||
logger = setup_logger(
|
||
name='mock_traffic_light',
|
||
log_file='../logs/mock_traffic_light.log',
|
||
max_bytes=10*1024*1024, # 10MB
|
||
backup_count=5
|
||
)
|
||
|
||
class TLState(TypedDict):
|
||
name: str
|
||
duration: int
|
||
ns: tuple[int, int, int]
|
||
ew: tuple[int, int, int]
|
||
|
||
class TrafficLightDevice:
|
||
"""红绿灯设备模拟器"""
|
||
|
||
def __init__(self, host='localhost', port=8082, interval=1):
|
||
self.host = host
|
||
self.port = port
|
||
self.interval = interval # 发送间隔(秒)
|
||
self.running = False
|
||
self.socket = None
|
||
|
||
def create_di_signal(self, ns_red: int = 0, ns_yellow: int = 0, ns_green: int = 0,
|
||
ew_red: int = 0, ew_yellow: int = 0, ew_green: int = 0):
|
||
"""
|
||
创建红绿灯DI信号数据(纯JSON格式)
|
||
|
||
Args:
|
||
ns_red: 南北红灯状态 (0或1)
|
||
ns_yellow: 南北黄灯状态 (0或1)
|
||
ns_green: 南北绿灯状态 (0或1)
|
||
ew_red: 东西红灯状态 (0或1)
|
||
ew_yellow: 东西黄灯状态 (0或1)
|
||
ew_green: 东西绿灯状态 (0或1)
|
||
|
||
Returns:
|
||
JSON格式的DI信号字符串
|
||
"""
|
||
# DI信号数据(真实红绿灯设备发送的格式)
|
||
di_data = {
|
||
"DI-01": 0,
|
||
"DI-02": 0,
|
||
"DI-11": ns_red, # 北红
|
||
"DI-12": ns_yellow, # 北黄
|
||
"DI-13": ns_green, # 北绿
|
||
"DI-14": ew_red, # 东红
|
||
"DI-15": ew_yellow, # 东黄
|
||
"DI-16": ew_green, # 东绿
|
||
"DI-17": 0,
|
||
"DI-18": 0
|
||
}
|
||
|
||
return json.dumps(di_data, separators=(',', ':'))
|
||
|
||
def connect(self):
|
||
"""连接到服务器"""
|
||
try:
|
||
# 如果已有连接,先关闭
|
||
if self.socket:
|
||
try:
|
||
self.socket.close()
|
||
except:
|
||
pass
|
||
self.socket = None
|
||
|
||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
# 设置连接超时
|
||
self.socket.settimeout(10)
|
||
self.socket.connect((self.host, self.port))
|
||
# 连接成功后取消超时设置
|
||
self.socket.settimeout(None)
|
||
logger.info(f"[{datetime.now().strftime('%H:%M:%S')}] ✅ 已连接到服务器 {self.host}:{self.port}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"[{datetime.now().strftime('%H:%M:%S')}] ❌ 连接失败: {e}")
|
||
if self.socket:
|
||
try:
|
||
self.socket.close()
|
||
except:
|
||
pass
|
||
self.socket = None
|
||
return False
|
||
|
||
def disconnect(self):
|
||
"""断开连接"""
|
||
if self.socket:
|
||
try:
|
||
self.socket.close()
|
||
logger.info(f"[{datetime.now().strftime('%H:%M:%S')}] 已断开连接")
|
||
except:
|
||
pass
|
||
self.socket = None
|
||
|
||
def send_signal(self, di_signal):
|
||
"""发送DI信号到服务器"""
|
||
if not self.socket:
|
||
logger.warning("未连接到服务器")
|
||
return False
|
||
|
||
try:
|
||
self.socket.sendall(di_signal.encode('utf-8'))
|
||
logger.info(f"[{datetime.now().strftime('%H:%M:%S')}] 📡 发送信号: {di_signal}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"[{datetime.now().strftime('%H:%M:%S')}] ❌ 发送失败: {e}")
|
||
# 发送失败时关闭socket,触发重连
|
||
try:
|
||
self.socket.close()
|
||
except:
|
||
pass
|
||
self.socket = None
|
||
return False
|
||
|
||
def get_current_state(self, cycle_time):
|
||
"""
|
||
根据时间计算当前红绿灯状态
|
||
|
||
Args:
|
||
cycle_time: 当前周期时间(秒)
|
||
|
||
Returns:
|
||
tuple: (状态名称, (ns_red, ns_yellow, ns_green), (ew_red, ew_yellow, ew_green))
|
||
"""
|
||
# 红绿灯状态序列,每个状态持续时间(秒)
|
||
states: list[TLState] = [
|
||
{"name": "南北红+东西绿", "duration": 30, "ns": (1,0,0), "ew": (0,0,1)},
|
||
{"name": "南北红+东西黄", "duration": 3, "ns": (1,0,0), "ew": (0,1,0)},
|
||
{"name": "南北绿+东西红", "duration": 25, "ns": (0,0,1), "ew": (1,0,0)},
|
||
{"name": "南北黄+东西红", "duration": 3, "ns": (0,1,0), "ew": (1,0,0)},
|
||
]
|
||
|
||
# 计算总周期时间
|
||
total_cycle_time = sum(state["duration"] for state in states)
|
||
|
||
# 计算当前在周期中的位置
|
||
current_time = cycle_time % total_cycle_time
|
||
|
||
# 找到当前状态
|
||
elapsed = 0
|
||
for state in states:
|
||
if current_time < elapsed + state["duration"]:
|
||
return state["name"], state["ns"], state["ew"]
|
||
elapsed += state["duration"]
|
||
|
||
# 默认返回第一个状态
|
||
return states[0]["name"], states[0]["ns"], states[0]["ew"]
|
||
|
||
def run_continuous_simulation(self):
|
||
"""运行连续的红绿灯信号模拟"""
|
||
logger.info(f"开始连续发送红绿灯信号,间隔: {self.interval}秒")
|
||
logger.info("按 Ctrl+C 停止模拟")
|
||
logger.info("服务端断开时会自动重连...")
|
||
|
||
self.running = True
|
||
start_time = time.time()
|
||
connected = False
|
||
|
||
try:
|
||
while self.running:
|
||
# 如果未连接,尝试连接
|
||
if not connected:
|
||
connected = self.connect()
|
||
if not connected:
|
||
logger.warning("连接失败,5秒后重试...")
|
||
time.sleep(5)
|
||
continue
|
||
else:
|
||
# 重新连接成功,重置开始时间以保持状态同步
|
||
start_time = time.time()
|
||
|
||
# 计算当前周期时间
|
||
current_time = time.time() - start_time
|
||
|
||
# 获取当前状态
|
||
state_name, ns_state, ew_state = self.get_current_state(current_time)
|
||
|
||
# 创建DI信号
|
||
di_signal = self.create_di_signal(
|
||
ns_state[0], ns_state[1], ns_state[2], # 南北红黄绿
|
||
ew_state[0], ew_state[1], ew_state[2] # 东西红黄绿
|
||
)
|
||
|
||
# 发送信号
|
||
success = self.send_signal(di_signal)
|
||
if not success:
|
||
logger.warning("发送失败,连接已断开")
|
||
self.disconnect()
|
||
connected = False
|
||
continue
|
||
|
||
logger.info(f"当前状态: {state_name}")
|
||
|
||
# 等待下一次发送
|
||
time.sleep(self.interval)
|
||
|
||
except KeyboardInterrupt:
|
||
logger.info("\n收到停止信号...")
|
||
finally:
|
||
self.running = False
|
||
self.disconnect()
|
||
logger.info("红绿灯模拟已停止")
|
||
|
||
def test_signal_states(self):
|
||
"""测试各种信号状态"""
|
||
if not self.connect():
|
||
return
|
||
|
||
print("开始测试各种红绿灯信号状态...")
|
||
|
||
test_cases: list[TLState] = [
|
||
{"name": "南北红+东西绿", "duration": 2, "ns": (1,0,0), "ew": (0,0,1)},
|
||
{"name": "南北红+东西黄", "duration": 2, "ns": (1,0,0), "ew": (0,1,0)},
|
||
{"name": "南北绿+东西红", "duration": 2, "ns": (0,0,1), "ew": (1,0,0)},
|
||
{"name": "南北黄+东西红", "duration": 2, "ns": (0,1,0), "ew": (1,0,0)},
|
||
{"name": "全红状态", "duration": 2, "ns": (1,0,0), "ew": (1,0,0)},
|
||
{"name": "无信号状态", "duration": 2, "ns": (0,0,0), "ew": (0,0,0)},
|
||
{"name": "冲突状态(两绿)", "duration": 2, "ns": (0,0,1), "ew": (0,0,1)},
|
||
]
|
||
|
||
try:
|
||
for i, case in enumerate(test_cases, 1):
|
||
print(f"\n测试 {i}/{len(test_cases)}: {case['name']}")
|
||
|
||
di_signal = self.create_di_signal(
|
||
case["ns"][0], case["ns"][1], case["ns"][2],
|
||
case["ew"][0], case["ew"][1], case["ew"][2]
|
||
)
|
||
|
||
success = self.send_signal(di_signal)
|
||
if not success:
|
||
print("发送失败")
|
||
break
|
||
|
||
time.sleep(2) # 每个状态持续2秒
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n测试被中断")
|
||
finally:
|
||
self.disconnect()
|
||
|
||
def test_invalid_json(self):
|
||
"""测试无效JSON格式"""
|
||
if not self.connect():
|
||
return
|
||
|
||
print("开始测试无效JSON格式...")
|
||
|
||
invalid_signals = [
|
||
"invalid json",
|
||
"{\"DI-11\":\"invalid_value\"}", # 无效值类型
|
||
"{\"invalid_field\":1}", # 无效字段
|
||
"not json at all",
|
||
"{\"DI-11\":1,}", # 语法错误
|
||
"", # 空字符串
|
||
]
|
||
|
||
try:
|
||
for i, signal in enumerate(invalid_signals, 1):
|
||
print(f"\n测试无效JSON {i}/{len(invalid_signals)}: {signal[:30]}...")
|
||
success = self.send_signal(signal)
|
||
if not success:
|
||
print("发送失败")
|
||
time.sleep(1)
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n测试被中断")
|
||
finally:
|
||
self.disconnect()
|
||
|
||
def setup_signal_handler(self):
|
||
"""设置信号处理器"""
|
||
def signal_handler(signum, frame):
|
||
print(f"\n收到信号 {signum},正在停止...")
|
||
self.running = False
|
||
|
||
signal.signal(signal.SIGINT, signal_handler)
|
||
signal.signal(signal.SIGTERM, signal_handler)
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description='红绿灯设备模拟器')
|
||
parser.add_argument('--host', default='localhost', help='服务器主机地址 (默认: localhost)')
|
||
parser.add_argument('--port', type=int, default=8082, help='服务器端口 (默认: 8082)')
|
||
parser.add_argument('--interval', type=float, default=1.0, help='信号发送间隔秒数 (默认: 1.0)')
|
||
parser.add_argument('--mode', choices=['continuous', 'test-states', 'test-invalid'],
|
||
default='continuous', help='运行模式')
|
||
|
||
args = parser.parse_args()
|
||
|
||
device = TrafficLightDevice(args.host, args.port, args.interval)
|
||
device.setup_signal_handler()
|
||
|
||
print("=" * 60)
|
||
print("红绿灯设备模拟器")
|
||
print("=" * 60)
|
||
print(f"服务器地址: {args.host}:{args.port}")
|
||
print(f"发送间隔: {args.interval}秒")
|
||
print(f"运行模式: {args.mode}")
|
||
print("=" * 60)
|
||
|
||
if args.mode == 'continuous':
|
||
print("\n连续模拟模式 - 模拟真实红绿灯设备持续发送信号")
|
||
print("设备会通过TCP连接发送纯JSON格式的DI信号")
|
||
print("服务器将从TCP连接中获取设备的IP地址和端口")
|
||
device.run_continuous_simulation()
|
||
elif args.mode == 'test-states':
|
||
print("\n状态测试模式 - 测试各种红绿灯状态")
|
||
device.test_signal_states()
|
||
elif args.mode == 'test-invalid':
|
||
print("\n无效数据测试模式 - 测试无效JSON格式")
|
||
device.test_invalid_json()
|
||
|
||
if __name__ == '__main__':
|
||
main() |