QAUP_Management/tools/mock_adxp.py
2025-10-16 18:33:44 +08:00

675 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""ADXP 数据中台 SOAP WebService 模拟服务(纯标准库实现)
提供 SOAP WebService 接口,模拟真实数据中台,支持 SDK 客户端连接。
启动示例:
python3 mock_adxp.py
python3 mock_adxp.py --auto --interval 10
SOAP 接口:
POST /adxp SOAP 服务端点
GET /adxp?wsdl 获取 WSDL 描述
支持操作:
- login(username, password) 用户登录
- receiveMessage() 接收消息
运行该脚本无需额外第三方依赖。
"""
import argparse
import logging
import socketserver
import threading
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from http.server import BaseHTTPRequestHandler
from typing import List, Tuple, Optional
# 导入统一的日志配置
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from logging_config import setup_logger
# 设置当前模块的日志记录器
logger = setup_logger(
name='mock_adxp',
log_file='logs/mock_adxp.log',
max_bytes=10*1024*1024, # 10MB
backup_count=5
)
# 服务代码
SERVICE_CODES = {
"ARR": "ADXP_NAOMS_O_DYN_ARR",
"AXOT": "ADXP_NAOMS_O_CDM_AXOT",
"RUNWAY": "ADXP_NAOMS_O_CDM_RUNWAY",
"CRAFTSEAT": "ADXP_NAOMS_O_DYN_CRAFTSEAT",
}
# SOAP 命名空间
NS_SOAP = "http://schemas.xmlsoap.org/soap/envelope/"
NS_LOGIN = "http://LoginService"
NS_MESSAGE = "http://MessageService"
def now_timestamp(fmt="%Y%m%d%H%M%S"):
"""获取当前时间戳"""
return datetime.now().strftime(fmt)
def build_biz_key(flight_no, movement, event_time):
"""构建 BizKey"""
marker = "A" if movement == "ARR" else "D"
return f"{flight_no}-{marker}-{event_time}"
def wrap_message(service_code, body):
"""包装消息(匹配真实数据中台格式)"""
session_id = now_timestamp("%Y%m%d%H%M%S%f")[:-3] # 精确到毫秒
return (
'<?xml version="1.0" encoding="UTF-8"?>'
'<Msg>'
'<Head>'
f'<Svc_ServiceCode>{service_code}</Svc_ServiceCode>'
'<Svc_Version>1.0</Svc_Version>'
'<Svc_Sender_Org>ADXP</Svc_Sender_Org>'
'<Svc_Sender>NAOMS</Svc_Sender>'
'<Svc_Receiver_Org></Svc_Receiver_Org>'
'<Svc_Receiver></Svc_Receiver>'
'<Svc_SerialNumber></Svc_SerialNumber>'
f'<Svc_SessionId>{session_id}</Svc_SessionId>'
f'<Svc_SendTimeStamp>{session_id}</Svc_SendTimeStamp>'
'</Head>'
'<Body>'
f'{body}'
'</Body>'
'</Msg>'
)
def build_arrival_message(flight_no, landing_time=None):
"""构建到达消息(匹配真实格式)"""
event_time = landing_time or now_timestamp()
biz_key = build_biz_key(flight_no, "ARR", event_time)
flight_id = abs(hash(biz_key)) % 10000000 # 生成模拟 FlightId
body = (
'<DynFlight>'
f'<FlightId>{flight_id}</FlightId>'
f'<BizKey>{biz_key}</BizKey>'
'<AirportIATA>TAO</AirportIATA>'
'<FLIGHTSTATUS>ARR</FLIGHTSTATUS>'
f'<RealLanding>{event_time}</RealLanding>'
'</DynFlight>'
)
xml = wrap_message(SERVICE_CODES["ARR"], body)
return SERVICE_CODES["ARR"], "UPDATE", xml
def build_departure_message(flight_no, offblock_time=None):
"""构建离港AXOT消息匹配真实CDM格式"""
event_time_min = (offblock_time or now_timestamp())[:12] # AXOT精确到分钟 yyyyMMddHHmm
event_time = offblock_time or now_timestamp()
biz_key = build_biz_key(flight_no, "DEP", event_time)
body = (
'<Flight>'
'<SourceKey>HCDM</SourceKey>'
f'<BizKey> {biz_key}</BizKey>' # 注意BizKey有空格前缀
f'<AXOT>{event_time_min}</AXOT>'
'</Flight>'
)
xml = wrap_message(SERVICE_CODES["AXOT"], body)
return SERVICE_CODES["AXOT"], "UPDATE", xml
def build_runway_message(flight_no, movement, runway, event_time=None):
"""构建跑道分配消息匹配真实CDM格式"""
evt_time = event_time or now_timestamp()
biz_key = build_biz_key(flight_no, movement, evt_time)
body = (
'<CdmRunway>'
f'<BizKey>{biz_key}</BizKey>'
f'<Movement>{movement}</Movement>'
f'<Runway>{runway}</Runway>'
'</CdmRunway>'
)
xml = wrap_message(SERVICE_CODES["RUNWAY"], body)
return SERVICE_CODES["RUNWAY"], "UPDATE", xml
def build_craftseat_message(flight_no, movement, seat, event_time=None):
"""构建机位分配消息匹配真实DYN格式"""
evt_time = event_time or now_timestamp()
biz_key = build_biz_key(flight_no, movement, evt_time)
flight_id = abs(hash(biz_key)) % 10000000
body = (
'<DynFlight>'
f'<FlightId>{flight_id}</FlightId>'
f'<BizKey>{biz_key}</BizKey>'
'<CraftseatList>'
'<Craftseat>'
f'<Code>{seat}</Code>'
f'<PlanStart>{evt_time}</PlanStart>'
f'<PlanEnd>{evt_time}</PlanEnd>'
f'<RealStart>{evt_time}</RealStart>'
f'<RealEnd>{evt_time}</RealEnd>'
'</Craftseat>'
'</CraftseatList>'
'</DynFlight>'
)
xml = wrap_message(SERVICE_CODES["CRAFTSEAT"], body)
return SERVICE_CODES["CRAFTSEAT"], "UPDATE", xml
# ==================== 消息队列 ====================
class MessageQueue:
"""线程安全的消息队列"""
def __init__(self):
self.messages = [] # List of (service_code, action_code, xml)
self.lock = threading.Lock()
self.auto_push_enabled = False
self.auto_push_thread = None
self.auto_push_interval = 10
def add_message(self, service_code, action_code, xml):
with self.lock:
self.messages.append((service_code, action_code, xml))
logger.info(f"Added message: {service_code}")
def get_messages(self):
"""获取并清空所有消息"""
with self.lock:
msgs = self.messages.copy()
self.messages.clear()
return msgs
def start_auto_push(self, interval=10):
if self.auto_push_enabled:
return
self.auto_push_enabled = True
self.auto_push_interval = interval
self.auto_push_thread = threading.Thread(target=self._auto_push_loop, daemon=True)
self.auto_push_thread.start()
logger.info(f"Auto-push started with interval {interval}s")
def stop_auto_push(self):
self.auto_push_enabled = False
if self.auto_push_thread:
self.auto_push_thread.join(timeout=2)
logger.info("Auto-push stopped")
def _auto_push_loop(self):
while self.auto_push_enabled:
try:
self._generate_sample_messages()
time.sleep(self.auto_push_interval)
except Exception as e:
logger.error(f"Auto-push error: {e}")
def _generate_sample_messages(self):
event_time = now_timestamp()
# MU5123 到达
self.add_message(*build_arrival_message("MU5123", event_time))
self.add_message(*build_runway_message("MU5123", "ARR", "35L", event_time))
self.add_message(*build_craftseat_message("MU5123", "ARR", "138", event_time))
# CA1234 离港
self.add_message(*build_departure_message("CA1234", event_time))
self.add_message(*build_runway_message("CA1234", "DEP", "17", event_time))
self.add_message(*build_craftseat_message("CA1234", "DEP", "201", event_time))
logger.info(f"Generated 6 sample messages at {event_time}")
# 全局消息队列
message_queue = MessageQueue()
# 全局 token 存储(用户名 -> token
active_tokens = {}
# ==================== SOAP 处理 ====================
def create_soap_response(body_content):
"""创建 SOAP 响应"""
# 使用字符串拼接方式创建 SOAP 响应,避免命名空间问题
body_xml = ET.tostring(body_content, encoding='utf-8').decode('utf-8')
soap_response = f'''<?xml version="1.0" encoding="utf-8"?>
<soapenv:Envelope xmlns:soapenv="{NS_SOAP}">
<soapenv:Body>
{body_xml}
</soapenv:Body>
</soapenv:Envelope>'''
return soap_response.encode('utf-8')
def create_login_response(success, code, message):
"""创建登录响应(完全匹配真实数据中台格式 - 直接返回 LoginResult"""
# 直接创建 LoginResult 元素,不需要 loginResponse 包装
result = ET.Element('LoginResult')
success_elem = ET.SubElement(result, 'success')
success_elem.text = 'TRUE' if success else 'FALSE' # 全大写
code_elem = ET.SubElement(result, 'code')
code_elem.text = str(code)
# message 字段返回 token成功时或错误信息失败时
message_elem = ET.SubElement(result, 'message')
message_elem.text = message if message else ''
return create_soap_response(result)
def create_receive_message_response(messages):
"""创建接收消息响应(匹配 SDK 期望的 receiveMessageResponse"""
import uuid
# 创建 SOAP 操作响应元素SDK 期望带命名空间)
response = ET.Element('{' + NS_MESSAGE + '}receiveMessageResponse')
result = ET.SubElement(response, 'MessageResult')
success_elem = ET.SubElement(result, 'success')
success_elem.text = 'TRUE'
code_elem = ET.SubElement(result, 'code')
code_elem.text = '0'
guid_elem = ET.SubElement(result, 'guid')
guid_elem.text = str(uuid.uuid4()).replace('-', '')
# MessageList - 大写 MSDK JAXB 要求,每个 Msg 包含 Head 和 Body
msg_list = ET.SubElement(result, 'MessageList')
for service_code, action_code, xml_content in messages:
msg_elem = ET.SubElement(msg_list, 'Msg') # 注意Msg 首字母大写
# 解析 xml_content提取 Head 和 Body 元素
try:
import xml.etree.ElementTree as ET_parse
msg_root = ET_parse.fromstring(xml_content)
# 找到 Head 和 Body 元素并添加到 Msg 中
for child in msg_root:
if child.tag in ('Head', 'Body'):
msg_elem.append(child)
except Exception as e:
logger.error(f"解析消息 XML 失败: {e}")
return create_soap_response(response)
def parse_soap_request(xml_data):
"""解析 SOAP 请求(支持 Header 中的 token 验证)"""
try:
root = ET.fromstring(xml_data)
# 查找 Header可能包含 username 和 token
header = root.find('.//{' + NS_SOAP + '}Header')
auth_info = {}
if header is not None:
# 尝试提取 username 和 token可能在不同命名空间
for elem in header.iter():
if elem.tag.endswith('username') and elem.text:
auth_info['username'] = elem.text
elif elem.tag.endswith('token') and elem.text:
auth_info['token'] = elem.text
# 查找 Body
body = root.find('.//{' + NS_SOAP + '}Body')
if body is None:
return None, None
# 查找 login 操作
login_elem = body.find('.//{' + NS_LOGIN + '}login')
if login_elem is not None:
username_elem = login_elem.find('.//{' + NS_LOGIN + '}username')
password_elem = login_elem.find('.//{' + NS_LOGIN + '}password')
# 也尝试无命名空间的元素
if username_elem is None:
username_elem = login_elem.find('.//username')
if password_elem is None:
password_elem = login_elem.find('.//password')
params = {
'username': username_elem.text if username_elem is not None else None,
'password': password_elem.text if password_elem is not None else None
}
params.update(auth_info) # 添加 Header 中的认证信息
return 'login', params
# 查找 receiveMessage 操作(可能在 LoginService 或 MessageService 命名空间)
receive_elem = body.find('.//{' + NS_LOGIN + '}receiveMessage')
if receive_elem is None:
receive_elem = body.find('.//{' + NS_MESSAGE + '}receiveMessage')
if receive_elem is not None:
return 'receiveMessage', auth_info # 返回 Header 中的认证信息
# 查找 getInterval 操作HeartbeatService
# 尝试查找任何命名空间的 getInterval
for elem in body.iter():
if elem.tag.endswith('getInterval'):
return 'getInterval', auth_info
return None, None
except Exception as e:
logger.error(f"Parse SOAP request error: {e}", exc_info=True)
return None, None
def handle_login(username, password):
"""处理登录,生成并返回 token"""
logger.info(f"Login request: username={username}")
# 检查用户名和密码是否为None
if username is None or password is None:
logger.warning("❌ Login failed: username or password is None")
return create_login_response(False, 803, "用户名或密码不能为空")
if username == "dianxin" and password == "dianxin@123":
# 生成 token使用 UUID32位十六进制
import uuid
token = str(uuid.uuid4()).replace('-', '')
active_tokens[username] = token
logger.info(f"✅ Login successful: username={username}, token={token[:16] if token else ''}...")
return create_login_response(True, 0, token) # message 字段返回 token
else:
logger.warning(f"❌ Login failed: username={username}")
return create_login_response(False, 803, "用户名或密码错误")
def handle_receive_message(auth_info=None):
"""处理接收消息(无验证,直接返回)"""
# 完全跳过 token 验证,直接返回消息
messages = message_queue.get_messages()
# 只在有消息时记录日志,减少噪音
if len(messages) > 0:
logger.info(f"✅ receiveMessage returned {len(messages)} messages")
return create_receive_message_response(messages)
def handle_get_interval():
"""处理 getInterval 请求HeartbeatService"""
# 返回心跳间隔毫秒SDK 用这个来检查连接是否正常
response = ET.Element('getIntervalResponse')
interval = ET.SubElement(response, 'return')
interval.text = '60000' # 60 秒心跳间隔
return create_soap_response(response)
# ==================== WSDL 定义 ====================
WSDL_TEMPLATE = '''<?xml version="1.0" encoding="UTF-8"?>
<wsdl:definitions xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
xmlns:tns="http://LoginService"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
targetNamespace="http://LoginService"
name="LoginService">
<wsdl:types>
<xsd:schema targetNamespace="http://LoginService" elementFormDefault="qualified">
<xsd:element name="login">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="username" type="xsd:string"/>
<xsd:element name="password" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
<xsd:element name="loginResponse">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="return" type="tns:loginResult"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
<xsd:complexType name="loginResult">
<xsd:sequence>
<xsd:element name="success" type="xsd:boolean"/>
<xsd:element name="code" type="xsd:string"/>
<xsd:element name="message" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
<xsd:element name="receiveMessage">
<xsd:complexType>
<xsd:sequence/>
</xsd:complexType>
</xsd:element>
<xsd:element name="receiveMessageResponse">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="return" type="tns:messageResult"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
<xsd:complexType name="messageResult">
<xsd:sequence>
<xsd:element name="messageList" type="tns:messageList"/>
<xsd:element name="messageStringList" type="xsd:string" maxOccurs="unbounded" minOccurs="0"/>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="messageList">
<xsd:sequence>
<xsd:element name="msg" type="tns:msgType" maxOccurs="unbounded" minOccurs="0"/>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="msgType">
<xsd:sequence>
<xsd:element name="serviceCode" type="xsd:string"/>
<xsd:element name="actionCode" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
</xsd:schema>
</wsdl:types>
<wsdl:message name="login">
<wsdl:part name="parameters" element="tns:login"/>
</wsdl:message>
<wsdl:message name="loginResponse">
<wsdl:part name="parameters" element="tns:loginResponse"/>
</wsdl:message>
<wsdl:message name="receiveMessage">
<wsdl:part name="parameters" element="tns:receiveMessage"/>
</wsdl:message>
<wsdl:message name="receiveMessageResponse">
<wsdl:part name="parameters" element="tns:receiveMessageResponse"/>
</wsdl:message>
<wsdl:portType name="LoginService">
<wsdl:operation name="login">
<wsdl:input message="tns:login"/>
<wsdl:output message="tns:loginResponse"/>
</wsdl:operation>
<wsdl:operation name="receiveMessage">
<wsdl:input message="tns:receiveMessage"/>
<wsdl:output message="tns:receiveMessageResponse"/>
</wsdl:operation>
</wsdl:portType>
<wsdl:binding name="LoginServiceSoapBinding" type="tns:LoginService">
<soap:binding style="document" transport="http://schemas.xmlsoap.org/soap/http"/>
<wsdl:operation name="login">
<soap:operation soapAction=""/>
<wsdl:input><soap:body use="literal"/></wsdl:input>
<wsdl:output><soap:body use="literal"/></wsdl:output>
</wsdl:operation>
<wsdl:operation name="receiveMessage">
<soap:operation soapAction=""/>
<wsdl:input><soap:body use="literal"/></wsdl:input>
<wsdl:output><soap:body use="literal"/></wsdl:output>
</wsdl:operation>
</wsdl:binding>
<wsdl:service name="LoginServiceHttpService">
<wsdl:port name="LoginServiceHttpPort" binding="tns:LoginServiceSoapBinding">
<soap:address location="{{ENDPOINT_URL}}"/>
</wsdl:port>
</wsdl:service>
</wsdl:definitions>'''
# ==================== HTTP 处理器 ====================
class SOAPRequestHandler(BaseHTTPRequestHandler):
"""SOAP 请求处理器"""
def log_message(self, format, *args):
"""自定义日志 - 禁用 HTTP 请求日志减少噪音"""
pass # 不记录每个 HTTP 请求
def do_GET(self):
"""处理 GET 请求WSDL"""
# 支持多个服务路径
if any(self.path.startswith(p) for p in ['/LoginService', '/MessageService', '/HeartbeatService', '/adxp']):
if '?wsdl' in self.path.lower():
# 动态生成 WSDL替换端点 URL
host = self.headers.get('Host', 'localhost:8086')
# 根据路径确定服务名
if '/MessageService' in self.path:
service_name = 'MessageService'
elif '/HeartbeatService' in self.path:
service_name = 'HeartbeatService'
else:
service_name = 'LoginService'
endpoint_url = f"http://{host}/{service_name}"
wsdl_content = WSDL_TEMPLATE.replace('{{ENDPOINT_URL}}', endpoint_url)
wsdl_content = wsdl_content.replace('LoginService', service_name)
self.send_response(200)
self.send_header('Content-Type', 'text/xml; charset=utf-8')
self.end_headers()
self.wfile.write(wsdl_content.encode('utf-8'))
else:
self.send_response(200)
self.send_header('Content-Type', 'text/html; charset=utf-8')
self.end_headers()
html = '''<!DOCTYPE html>
<html><head><title>ADXP Mock Service</title></head>
<body>
<h1>ADXP SOAP WebService Mock Server</h1>
<p>WSDL: <a href="/LoginService?wsdl">/LoginService?wsdl</a></p>
<p>Endpoint: POST /LoginService</p>
</body></html>'''
self.wfile.write(html.encode('utf-8'))
else:
self.send_error(404)
def do_POST(self):
"""处理 POST 请求SOAP"""
# 支持多个服务路径
valid_paths = ['/LoginService', '/MessageService', '/HeartbeatService', '/adxp']
if not any(self.path.startswith(p) for p in valid_paths):
self.send_error(404)
return
# 读取请求体
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length)
# 打印原始请求(调试用)
logger.debug("=" * 60)
logger.debug("Received SOAP Request:")
logger.debug(body.decode('utf-8', errors='ignore'))
logger.debug("=" * 60)
# 解析 SOAP 请求
operation, params = parse_soap_request(body)
if operation == 'login':
# 确保 params 不为 None 且包含必要的字段
if params is None:
logger.error("Login request has no parameters")
response = create_login_response(False, 801, "登录参数错误")
else:
# 确保username和password不为None
username = params.get('username') if params.get('username') is not None else ""
password = params.get('password') if params.get('password') is not None else ""
response = handle_login(username, password)
elif operation == 'receiveMessage':
# 确保 params 不为 None
response = handle_receive_message(params if params is not None else {}) # 传入 auth_info
elif operation == 'getInterval':
response = handle_get_interval()
else:
logger.error(f"Unknown operation: {operation}")
logger.error(f"Request body: {body.decode('utf-8', errors='ignore')}")
self.send_error(400, "Unknown operation")
return
# 打印响应(调试用)
logger.debug("Sending SOAP Response:")
logger.debug(response.decode('utf-8', errors='ignore'))
logger.debug("=" * 60)
# 发送响应
self.send_response(200)
self.send_header('Content-Type', 'text/xml; charset=utf-8')
self.send_header('Content-Length', str(len(response)))
self.end_headers()
self.wfile.write(response)
class ThreadedHTTPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
"""支持多线程的 HTTP 服务器"""
allow_reuse_address = True
daemon_threads = True
# ==================== 主程序 ====================
def main():
parser = argparse.ArgumentParser(description="ADXP SOAP WebService Mock Server")
parser.add_argument("--host", default="0.0.0.0", help="Server host (default: 0.0.0.0)")
parser.add_argument("--port", type=int, default=8086, help="Server port (default: 8086)")
parser.add_argument("--auto", action="store_true", help="Enable auto-push on startup")
parser.add_argument("--interval", type=int, default=10, help="Auto-push interval in seconds (default: 10)")
args = parser.parse_args()
# 启动自动推送
if args.auto:
message_queue.start_auto_push(args.interval)
# 启动服务器
server = ThreadedHTTPServer((args.host, args.port), SOAPRequestHandler)
logger.info("=" * 70)
logger.info("ADXP SOAP WebService Mock Server")
logger.info("=" * 70)
logger.info(f"WSDL: http://{args.host}:{args.port}/LoginService?wsdl")
logger.info(f"Endpoint: http://{args.host}:{args.port}/LoginService")
logger.info(f"Auto-push: {'ENABLED' if args.auto else 'DISABLED'}")
if args.auto:
logger.info(f"Auto-push interval: {args.interval}s")
logger.info(f"Credentials: dianxin / dianxin@123")
logger.info("=" * 70)
try:
server.serve_forever()
except KeyboardInterrupt:
logger.info("Shutting down...")
message_queue.stop_auto_push()
server.shutdown()
if __name__ == "__main__":
main()