capturedEvents = eventCaptor.getAllValues();
- assertFalse(capturedEvents.isEmpty(), "如果API返回数据,必须发布事件");
-
- System.out.println("✓ 检测到事件发布,共发布了 " + capturedEvents.size() + " 个事件");
-
- System.out.println("3. 验证事件内容...");
-
- // 验证每个事件的完整性
- for (int i = 0; i < capturedEvents.size(); i++) {
- FlightNotificationEvent event = capturedEvents.get(i);
-
- System.out.println(String.format(" 事件[%d]: 航班=%s, 类型=%s, 跑道=%s, 机位=%s",
- i+1, event.getFlightNo(), event.getEventType(), event.getRunway(), event.getSeat()));
-
- // 严格验证必要字段
- assertNotNull(event.getFlightNo(), "航班号不能为空");
- assertFalse(event.getFlightNo().trim().isEmpty(), "航班号不能为空字符串");
- assertNotNull(event.getEventType(), "事件类型不能为空");
- assertNotNull(event.getNotificationLevel(), "通知级别不能为空");
- assertNotNull(event.getTimestamp(), "时间戳不能为空");
-
- // 验证业务逻辑
- assertTrue(event.getEventType().equals("LANDING") || event.getEventType().equals("TAKEOFF"),
- "事件类型必须是LANDING或TAKEOFF,实际: " + event.getEventType());
- assertEquals("IMPORTANT", event.getNotificationLevel(), "通知级别必须是IMPORTANT");
-
- // 验证时间戳合理性
- long now = System.currentTimeMillis();
- long eventTime = event.getTimestamp();
- assertTrue(Math.abs(now - eventTime) < 300000,
- "事件时间戳应该在5分钟内,实际差值: " + Math.abs(now - eventTime) + "ms");
- }
-
- System.out.println("✓ 事件内容验证通过");
- System.out.println("4. 验证数据处理逻辑...");
-
- // 验证事件类型与航班类型的对应关系
- for (FlightNotificationEvent event : capturedEvents) {
- if ("IN".equals(event.getFlightType())) {
- assertEquals("LANDING", event.getEventType(),
- "进港航班(IN)必须产生LANDING事件,实际: " + event.getEventType());
- assertNotNull(event.getEventDescription(), "事件描述不能为空");
- } else if ("OUT".equals(event.getFlightType())) {
- assertEquals("TAKEOFF", event.getEventType(),
- "出港航班(OUT)必须产生TAKEOFF事件,实际: " + event.getEventType());
- assertNotNull(event.getEventDescription(), "事件描述不能为空");
- } else {
- fail("未知的航班类型: " + event.getFlightType());
- }
- }
-
- System.out.println("✓ 数据处理逻辑验证通过");
- System.out.println("🎉 航班进出港通知完整流程集成测试成功!");
-
- } catch (Exception e) {
- System.err.println("❌ 检测到问题:");
-
- // 检查是否没有事件发布
- try {
- verify(eventPublisher, never()).publishEvent(any());
- System.err.println(" 没有事件被发布");
- System.err.println(" 可能原因:");
- System.err.println(" 1. API返回空数据(这是正常的)");
- System.err.println(" 2. 认证失败");
- System.err.println(" 3. API连接问题");
- System.err.println(" 4. 数据转换失败");
-
- // 如果确实没有数据,这不算测试失败
- System.out.println("⚠ 当前API无数据,但系统处理正常");
-
- } catch (Exception verifyException) {
- System.err.println(" 事件发布验证失败: " + e.getMessage());
- fail("航班进出港通知处理流程失败: " + e.getMessage());
- }
- }
- }
-
- /**
- * 测试无数据情况的处理
- * 如果API返回空数据,应该正常处理而不是报错
- */
- @Test
- void testEmptyDataHandling() {
- System.out.println("=== 测试空数据处理 ===");
-
- // 执行数据采集(可能返回空数据)
- dataCollectorService.collectFlightNotificationData();
-
- // 如果没有数据,不应该发布新事件,但也不应该抛异常
- // 这个测试确保空数据情况被正确处理
- System.out.println("✓ 空数据处理测试完成(无异常抛出)");
- }
-
- /**
- * 测试配置和依赖注入
- * 确保所有必要的组件都正确注入
- */
- @Test
- void testDependencyInjection() {
- System.out.println("=== 测试依赖注入 ===");
-
- assertNotNull(dataCollectorService, "DataCollectorService应该被正确注入");
- assertNotNull(eventPublisher, "ApplicationEventPublisher应该被正确注入");
-
- // 验证定时任务配置
- // 注意:这里只是验证Bean存在,实际的定时任务调度由Spring管理
- System.out.println("✓ 依赖注入验证通过");
- System.out.println("✓ DataCollectorService实例: " + dataCollectorService.getClass().getSimpleName());
- System.out.println("✓ EventPublisher实例: " + eventPublisher.getClass().getSimpleName());
- }
-
- /**
- * 测试异常处理
- * 确保在异常情况下不会导致整个应用崩溃
- */
- @Test
- void testExceptionHandling() {
- System.out.println("=== 测试异常处理 ===");
-
- try {
- // 执行数据采集,即使有异常也应该被妥善处理
- dataCollectorService.collectFlightNotificationData();
- System.out.println("✓ 数据采集方法执行完成,未抛出未捕获异常");
- } catch (Exception e) {
- fail("数据采集方法不应该抛出未捕获的异常: " + e.getMessage());
- }
- }
-}
\ No newline at end of file
diff --git a/qaup-collision/src/test/java/com/qaup/collision/test/ConfigurationBindingTest.java b/qaup-collision/src/test/java/com/qaup/collision/test/ConfigurationBindingTest.java
index 85b3aa48..c5ce2d44 100644
--- a/qaup-collision/src/test/java/com/qaup/collision/test/ConfigurationBindingTest.java
+++ b/qaup-collision/src/test/java/com/qaup/collision/test/ConfigurationBindingTest.java
@@ -1,15 +1,16 @@
-package com.qaup.collision.test;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.TestPropertySource;
+package com.qaup.collision.test;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.TestPropertySource;
/**
* 测试Spring Boot配置绑定规则
* 验证@Value和@ConfigurationProperties对连字符的不同处理
*/
-@SpringBootTest
+@SpringBootTest(classes = ConfigurationBindingTest.TestApplication.class)
@TestPropertySource(properties = {
// 测试嵌套连字符配置
"test.simple-key=works", // 简单连字符
@@ -19,6 +20,10 @@ import org.springframework.test.context.TestPropertySource;
"test.flightnotification.interval=2000" // 修复后的配置
})
class ConfigurationBindingTest {
+
+ @SpringBootApplication
+ static class TestApplication {
+ }
// 简单连字符 - 应该工作
@Value("${test.simple-key:default}")
diff --git a/tools/README_SOAP.md b/tools/README_SOAP.md
new file mode 100644
index 00000000..28cf6cf5
--- /dev/null
+++ b/tools/README_SOAP.md
@@ -0,0 +1,150 @@
+# ADXP SOAP Mock Server 使用说明
+
+## 概述
+
+`mock_adxp_soap.py` 是数据中台的 SOAP WebService 模拟服务器,用于本地开发测试 SDK 集成。
+
+## 安装依赖
+
+```bash
+cd tools
+pip3 install -r requirements-soap.txt
+```
+
+或直接安装:
+```bash
+pip3 install spyne lxml
+```
+
+## 启动服务器
+
+### 基本启动
+```bash
+python3 mock_adxp_soap.py
+```
+
+### 启用自动推送(每10秒生成一批样例消息)
+```bash
+python3 mock_adxp_soap.py --auto --interval 10
+```
+
+### 自定义地址和端口
+```bash
+python3 mock_adxp_soap.py --host 0.0.0.0 --port 8086 --auto
+```
+
+## 接口说明
+
+### WSDL 地址
+```
+http://localhost:8086/adxp?wsdl
+```
+
+### SOAP 操作
+
+#### 1. login(username, password)
+用户登录
+
+**测试账号:**
+- 用户名: `dianxin`
+- 密码: `dianxin@123`
+
+**返回:**
+```xml
+
+ true
+ 200
+ 登录成功
+
+```
+
+#### 2. receiveMessage()
+接收消息队列中的所有消息(接收后清空)
+
+**返回:**
+```xml
+
+
+
+ ADXP_NAOMS_O_DYN_ARR
+ UPDATE
+
+
+
+ - ...
+
+
+```
+
+## 自动推送消息
+
+启用 `--auto` 后,服务器每隔指定秒数自动生成以下样例消息:
+
+1. **MU5123 到达** (ARR + RUNWAY + CRAFTSEAT)
+ - 降落时间: 实时
+ - 跑道: 35L
+ - 机位: 138
+
+2. **CA1234 离港** (AXOT + RUNWAY + CRAFTSEAT)
+ - 撤轮挡时间: 实时
+ - 跑道: 17
+ - 机位: 201
+
+## 配置后端应用
+
+确保 `application-dev.yml` 中配置正确:
+
+```yaml
+data:
+ collector:
+ adxp-adapter:
+ host: localhost
+ port: 8086
+ username: dianxin
+ password: dianxin@123
+ reconnect-delay-millis: 3000
+```
+
+或通过环境变量:
+```bash
+export ADXP_SDK_ENABLED=true
+export ADXP_SDK_HOST=localhost
+export ADXP_SDK_PORT=8086
+```
+
+## 测试流程
+
+1. **启动 SOAP mock 服务器:**
+ ```bash
+ cd tools
+ python3 mock_adxp_soap.py --auto --interval 10
+ ```
+
+2. **启动后端应用:**
+ ```bash
+ cd qaup-admin
+ mvn spring-boot:run -Dspring-boot.run.profiles=dev
+ ```
+
+3. **验证数据:**
+ - 查看后端日志,确认 SDK 登录成功
+ - 打开前端页面,查看航班通知实时更新
+ - 每10秒应该收到 MU5123 和 CA1234 的更新
+
+## 故障排查
+
+### 登录失败 803
+- 检查 mock 服务器是否启动
+- 检查用户名密码是否为 `dianxin/dianxin@123`
+- 查看 mock 服务器日志
+
+### 接收不到消息
+- 确认启用了 `--auto` 自动推送
+- 检查后端 `adxp-adapter` 配置是否正确
+- 查看 DataCollectorService 日志
+
+### 依赖安装失败
+```bash
+# 使用国内镜像
+pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple spyne lxml
+```
diff --git a/tools/mock_adxp.py b/tools/mock_adxp.py
new file mode 100644
index 00000000..484b3f64
--- /dev/null
+++ b/tools/mock_adxp.py
@@ -0,0 +1,668 @@
+#!/usr/bin/env python3
+"""ADXP 数据中台 SOAP WebService 模拟服务(纯标准库实现)
+
+提供 SOAP WebService 接口,模拟真实数据中台,支持 SDK 客户端连接。
+
+启动示例:
+ python3 mock_adxp.py
+ python3 mock_adxp.py --auto --interval 10
+
+SOAP 接口:
+ POST /adxp SOAP 服务端点
+ GET /adxp?wsdl 获取 WSDL 描述
+
+支持操作:
+ - login(username, password) 用户登录
+ - receiveMessage() 接收消息
+
+运行该脚本无需额外第三方依赖。
+"""
+
+import argparse
+import logging
+import socketserver
+import threading
+import time
+import xml.etree.ElementTree as ET
+from datetime import datetime
+from http.server import BaseHTTPRequestHandler
+from typing import List, Tuple, Optional
+
+# 日志配置 - 只显示关键信息
+logging.basicConfig(
+ level=logging.WARNING, # 只显示 WARNING 和 ERROR,减少海量日志
+ format="%(asctime)s [%(levelname)s] %(message)s"
+)
+logger = logging.getLogger("mock-adxp")
+logger.setLevel(logging.INFO) # 自己的日志保持 INFO
+
+# 服务代码
+SERVICE_CODES = {
+ "ARR": "ADXP_NAOMS_O_DYN_ARR",
+ "AXOT": "ADXP_NAOMS_O_CDM_AXOT",
+ "RUNWAY": "ADXP_NAOMS_O_CDM_RUNWAY",
+ "CRAFTSEAT": "ADXP_NAOMS_O_DYN_CRAFTSEAT",
+}
+
+# SOAP 命名空间
+NS_SOAP = "http://schemas.xmlsoap.org/soap/envelope/"
+NS_LOGIN = "http://LoginService"
+NS_MESSAGE = "http://MessageService"
+
+
+def now_timestamp(fmt="%Y%m%d%H%M%S"):
+ """获取当前时间戳"""
+ return datetime.now().strftime(fmt)
+
+
+def build_biz_key(flight_no, movement, event_time):
+ """构建 BizKey"""
+ marker = "A" if movement == "ARR" else "D"
+ return f"{flight_no}-{marker}-{event_time}"
+
+
+def wrap_message(service_code, body):
+ """包装消息(匹配真实数据中台格式)"""
+ session_id = now_timestamp("%Y%m%d%H%M%S%f")[:-3] # 精确到毫秒
+ return (
+ ''
+ ''
+ ''
+ f'{service_code}'
+ '1.0'
+ 'ADXP'
+ 'NAOMS'
+ ''
+ ''
+ ''
+ f'{session_id}'
+ f'{session_id}'
+ ''
+ ''
+ f'{body}'
+ ''
+ ''
+ )
+
+
+def build_arrival_message(flight_no, landing_time=None):
+ """构建到达消息(匹配真实格式)"""
+ event_time = landing_time or now_timestamp()
+ biz_key = build_biz_key(flight_no, "ARR", event_time)
+ flight_id = abs(hash(biz_key)) % 10000000 # 生成模拟 FlightId
+ body = (
+ ''
+ f'{flight_id}'
+ f'{biz_key}'
+ 'TAO'
+ 'ARR'
+ f'{event_time}'
+ ''
+ )
+ xml = wrap_message(SERVICE_CODES["ARR"], body)
+ return SERVICE_CODES["ARR"], "UPDATE", xml
+
+
+def build_departure_message(flight_no, offblock_time=None):
+ """构建离港AXOT消息(匹配真实CDM格式)"""
+ event_time_min = (offblock_time or now_timestamp())[:12] # AXOT精确到分钟 yyyyMMddHHmm
+ event_time = offblock_time or now_timestamp()
+ biz_key = build_biz_key(flight_no, "DEP", event_time)
+ body = (
+ ''
+ 'HCDM'
+ f' {biz_key}' # 注意BizKey有空格前缀
+ f'{event_time_min}'
+ ''
+ )
+ xml = wrap_message(SERVICE_CODES["AXOT"], body)
+ return SERVICE_CODES["AXOT"], "UPDATE", xml
+
+
+def build_runway_message(flight_no, movement, runway, event_time=None):
+ """构建跑道分配消息(匹配真实CDM格式)"""
+ evt_time = event_time or now_timestamp()
+ biz_key = build_biz_key(flight_no, movement, evt_time)
+ body = (
+ ''
+ f'{biz_key}'
+ f'{movement}'
+ f'{runway}'
+ ''
+ )
+ xml = wrap_message(SERVICE_CODES["RUNWAY"], body)
+ return SERVICE_CODES["RUNWAY"], "UPDATE", xml
+
+
+def build_craftseat_message(flight_no, movement, seat, event_time=None):
+ """构建机位分配消息(匹配真实DYN格式)"""
+ evt_time = event_time or now_timestamp()
+ biz_key = build_biz_key(flight_no, movement, evt_time)
+ flight_id = abs(hash(biz_key)) % 10000000
+ body = (
+ ''
+ f'{flight_id}'
+ f'{biz_key}'
+ ''
+ ''
+ f'{seat}'
+ f'{evt_time}'
+ f'{evt_time}'
+ f'{evt_time}'
+ f'{evt_time}'
+ ''
+ ''
+ ''
+ )
+ xml = wrap_message(SERVICE_CODES["CRAFTSEAT"], body)
+ return SERVICE_CODES["CRAFTSEAT"], "UPDATE", xml
+
+
+# ==================== 消息队列 ====================
+
+class MessageQueue:
+ """线程安全的消息队列"""
+
+ def __init__(self):
+ self.messages = [] # List of (service_code, action_code, xml)
+ self.lock = threading.Lock()
+ self.auto_push_enabled = False
+ self.auto_push_thread = None
+ self.auto_push_interval = 10
+
+ def add_message(self, service_code, action_code, xml):
+ with self.lock:
+ self.messages.append((service_code, action_code, xml))
+ logger.info(f"Added message: {service_code}")
+
+ def get_messages(self):
+ """获取并清空所有消息"""
+ with self.lock:
+ msgs = self.messages.copy()
+ self.messages.clear()
+ return msgs
+
+ def start_auto_push(self, interval=10):
+ if self.auto_push_enabled:
+ return
+ self.auto_push_enabled = True
+ self.auto_push_interval = interval
+ self.auto_push_thread = threading.Thread(target=self._auto_push_loop, daemon=True)
+ self.auto_push_thread.start()
+ logger.info(f"Auto-push started with interval {interval}s")
+
+ def stop_auto_push(self):
+ self.auto_push_enabled = False
+ if self.auto_push_thread:
+ self.auto_push_thread.join(timeout=2)
+ logger.info("Auto-push stopped")
+
+ def _auto_push_loop(self):
+ while self.auto_push_enabled:
+ try:
+ self._generate_sample_messages()
+ time.sleep(self.auto_push_interval)
+ except Exception as e:
+ logger.error(f"Auto-push error: {e}")
+
+ def _generate_sample_messages(self):
+ event_time = now_timestamp()
+
+ # MU5123 到达
+ self.add_message(*build_arrival_message("MU5123", event_time))
+ self.add_message(*build_runway_message("MU5123", "ARR", "35L", event_time))
+ self.add_message(*build_craftseat_message("MU5123", "ARR", "138", event_time))
+
+ # CA1234 离港
+ self.add_message(*build_departure_message("CA1234", event_time))
+ self.add_message(*build_runway_message("CA1234", "DEP", "17", event_time))
+ self.add_message(*build_craftseat_message("CA1234", "DEP", "201", event_time))
+
+ logger.info(f"Generated 6 sample messages at {event_time}")
+
+
+# 全局消息队列
+message_queue = MessageQueue()
+
+# 全局 token 存储(用户名 -> token)
+active_tokens = {}
+
+
+# ==================== SOAP 处理 ====================
+
+def create_soap_response(body_content):
+ """创建 SOAP 响应"""
+ # 使用字符串拼接方式创建 SOAP 响应,避免命名空间问题
+ body_xml = ET.tostring(body_content, encoding='utf-8').decode('utf-8')
+
+ soap_response = f'''
+
+
+ {body_xml}
+
+'''
+
+ return soap_response.encode('utf-8')
+
+
+def create_login_response(success, code, message):
+ """创建登录响应(完全匹配真实数据中台格式 - 直接返回 LoginResult)"""
+ # 直接创建 LoginResult 元素,不需要 loginResponse 包装
+ result = ET.Element('LoginResult')
+
+ success_elem = ET.SubElement(result, 'success')
+ success_elem.text = 'TRUE' if success else 'FALSE' # 全大写
+
+ code_elem = ET.SubElement(result, 'code')
+ code_elem.text = str(code)
+
+ # message 字段返回 token(成功时)或错误信息(失败时)
+ message_elem = ET.SubElement(result, 'message')
+ message_elem.text = message if message else ''
+
+ return create_soap_response(result)
+
+
+def create_receive_message_response(messages):
+ """创建接收消息响应(匹配 SDK 期望的 receiveMessageResponse)"""
+ import uuid
+
+ # 创建 SOAP 操作响应元素(SDK 期望带命名空间)
+ response = ET.Element('{' + NS_MESSAGE + '}receiveMessageResponse')
+ result = ET.SubElement(response, 'MessageResult')
+
+ success_elem = ET.SubElement(result, 'success')
+ success_elem.text = 'TRUE'
+
+ code_elem = ET.SubElement(result, 'code')
+ code_elem.text = '0'
+
+ guid_elem = ET.SubElement(result, 'guid')
+ guid_elem.text = str(uuid.uuid4()).replace('-', '')
+
+ # MessageList - 大写 M!SDK JAXB 要求,每个 Msg 包含 Head 和 Body
+ msg_list = ET.SubElement(result, 'MessageList')
+ for service_code, action_code, xml_content in messages:
+ msg_elem = ET.SubElement(msg_list, 'Msg') # 注意:Msg 首字母大写
+
+ # 解析 xml_content,提取 Head 和 Body 元素
+ try:
+ import xml.etree.ElementTree as ET_parse
+ msg_root = ET_parse.fromstring(xml_content)
+ # 找到 Head 和 Body 元素并添加到 Msg 中
+ for child in msg_root:
+ if child.tag in ('Head', 'Body'):
+ msg_elem.append(child)
+ except Exception as e:
+ logger.error(f"解析消息 XML 失败: {e}")
+
+ return create_soap_response(response)
+
+
+def parse_soap_request(xml_data):
+ """解析 SOAP 请求(支持 Header 中的 token 验证)"""
+ try:
+ root = ET.fromstring(xml_data)
+
+ # 查找 Header(可能包含 username 和 token)
+ header = root.find('.//{' + NS_SOAP + '}Header')
+ auth_info = {}
+ if header is not None:
+ # 尝试提取 username 和 token(可能在不同命名空间)
+ for elem in header.iter():
+ if elem.tag.endswith('username') and elem.text:
+ auth_info['username'] = elem.text
+ elif elem.tag.endswith('token') and elem.text:
+ auth_info['token'] = elem.text
+
+ # 查找 Body
+ body = root.find('.//{' + NS_SOAP + '}Body')
+ if body is None:
+ return None, None
+
+ # 查找 login 操作
+ login_elem = body.find('.//{' + NS_LOGIN + '}login')
+ if login_elem is not None:
+ username_elem = login_elem.find('.//{' + NS_LOGIN + '}username')
+ password_elem = login_elem.find('.//{' + NS_LOGIN + '}password')
+ # 也尝试无命名空间的元素
+ if username_elem is None:
+ username_elem = login_elem.find('.//username')
+ if password_elem is None:
+ password_elem = login_elem.find('.//password')
+
+ params = {
+ 'username': username_elem.text if username_elem is not None else None,
+ 'password': password_elem.text if password_elem is not None else None
+ }
+ params.update(auth_info) # 添加 Header 中的认证信息
+ return 'login', params
+
+ # 查找 receiveMessage 操作(可能在 LoginService 或 MessageService 命名空间)
+ receive_elem = body.find('.//{' + NS_LOGIN + '}receiveMessage')
+ if receive_elem is None:
+ receive_elem = body.find('.//{' + NS_MESSAGE + '}receiveMessage')
+ if receive_elem is not None:
+ return 'receiveMessage', auth_info # 返回 Header 中的认证信息
+
+ # 查找 getInterval 操作(HeartbeatService)
+ # 尝试查找任何命名空间的 getInterval
+ for elem in body.iter():
+ if elem.tag.endswith('getInterval'):
+ return 'getInterval', auth_info
+
+ return None, None
+ except Exception as e:
+ logger.error(f"Parse SOAP request error: {e}", exc_info=True)
+ return None, None
+
+
+def handle_login(username, password):
+ """处理登录,生成并返回 token"""
+ logger.info(f"Login request: username={username}")
+
+ # 检查用户名和密码是否为None
+ if username is None or password is None:
+ logger.warning("❌ Login failed: username or password is None")
+ return create_login_response(False, 803, "用户名或密码不能为空")
+
+ if username == "dianxin" and password == "dianxin@123":
+ # 生成 token(使用 UUID,32位十六进制)
+ import uuid
+ token = str(uuid.uuid4()).replace('-', '')
+ active_tokens[username] = token
+ logger.info(f"✅ Login successful: username={username}, token={token[:16] if token else ''}...")
+ return create_login_response(True, 0, token) # message 字段返回 token
+ else:
+ logger.warning(f"❌ Login failed: username={username}")
+ return create_login_response(False, 803, "用户名或密码错误")
+
+
+def handle_receive_message(auth_info=None):
+ """处理接收消息(无验证,直接返回)"""
+ # 完全跳过 token 验证,直接返回消息
+ messages = message_queue.get_messages()
+ # 只在有消息时记录日志,减少噪音
+ if len(messages) > 0:
+ logger.info(f"✅ receiveMessage returned {len(messages)} messages")
+ return create_receive_message_response(messages)
+
+
+def handle_get_interval():
+ """处理 getInterval 请求(HeartbeatService)"""
+ # 返回心跳间隔(毫秒),SDK 用这个来检查连接是否正常
+ response = ET.Element('getIntervalResponse')
+ interval = ET.SubElement(response, 'return')
+ interval.text = '60000' # 60 秒心跳间隔
+ return create_soap_response(response)
+
+
+# ==================== WSDL 定义 ====================
+
+WSDL_TEMPLATE = '''
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+'''
+
+
+# ==================== HTTP 处理器 ====================
+
+class SOAPRequestHandler(BaseHTTPRequestHandler):
+ """SOAP 请求处理器"""
+
+ def log_message(self, format, *args):
+ """自定义日志 - 禁用 HTTP 请求日志减少噪音"""
+ pass # 不记录每个 HTTP 请求
+
+ def do_GET(self):
+ """处理 GET 请求(WSDL)"""
+ # 支持多个服务路径
+ if any(self.path.startswith(p) for p in ['/LoginService', '/MessageService', '/HeartbeatService', '/adxp']):
+ if '?wsdl' in self.path.lower():
+ # 动态生成 WSDL,替换端点 URL
+ host = self.headers.get('Host', 'localhost:8086')
+
+ # 根据路径确定服务名
+ if '/MessageService' in self.path:
+ service_name = 'MessageService'
+ elif '/HeartbeatService' in self.path:
+ service_name = 'HeartbeatService'
+ else:
+ service_name = 'LoginService'
+
+ endpoint_url = f"http://{host}/{service_name}"
+ wsdl_content = WSDL_TEMPLATE.replace('{{ENDPOINT_URL}}', endpoint_url)
+ wsdl_content = wsdl_content.replace('LoginService', service_name)
+
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/xml; charset=utf-8')
+ self.end_headers()
+ self.wfile.write(wsdl_content.encode('utf-8'))
+ else:
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
+ self.end_headers()
+ html = '''
+ADXP Mock Service
+
+ADXP SOAP WebService Mock Server
+WSDL: /LoginService?wsdl
+Endpoint: POST /LoginService
+'''
+ self.wfile.write(html.encode('utf-8'))
+ else:
+ self.send_error(404)
+
+ def do_POST(self):
+ """处理 POST 请求(SOAP)"""
+ # 支持多个服务路径
+ valid_paths = ['/LoginService', '/MessageService', '/HeartbeatService', '/adxp']
+ if not any(self.path.startswith(p) for p in valid_paths):
+ self.send_error(404)
+ return
+
+ # 读取请求体
+ content_length = int(self.headers.get('Content-Length', 0))
+ body = self.rfile.read(content_length)
+
+ # 打印原始请求(调试用)
+ logger.debug("=" * 60)
+ logger.debug("Received SOAP Request:")
+ logger.debug(body.decode('utf-8', errors='ignore'))
+ logger.debug("=" * 60)
+
+ # 解析 SOAP 请求
+ operation, params = parse_soap_request(body)
+
+ if operation == 'login':
+ # 确保 params 不为 None 且包含必要的字段
+ if params is None:
+ logger.error("Login request has no parameters")
+ response = create_login_response(False, 801, "登录参数错误")
+ else:
+ # 确保username和password不为None
+ username = params.get('username') if params.get('username') is not None else ""
+ password = params.get('password') if params.get('password') is not None else ""
+ response = handle_login(username, password)
+ elif operation == 'receiveMessage':
+ # 确保 params 不为 None
+ response = handle_receive_message(params if params is not None else {}) # 传入 auth_info
+ elif operation == 'getInterval':
+ response = handle_get_interval()
+ else:
+ logger.error(f"Unknown operation: {operation}")
+ logger.error(f"Request body: {body.decode('utf-8', errors='ignore')}")
+ self.send_error(400, "Unknown operation")
+ return
+
+ # 打印响应(调试用)
+ logger.debug("Sending SOAP Response:")
+ logger.debug(response.decode('utf-8', errors='ignore'))
+ logger.debug("=" * 60)
+
+ # 发送响应
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/xml; charset=utf-8')
+ self.send_header('Content-Length', str(len(response)))
+ self.end_headers()
+ self.wfile.write(response)
+
+
+class ThreadedHTTPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
+ """支持多线程的 HTTP 服务器"""
+ allow_reuse_address = True
+ daemon_threads = True
+
+
+# ==================== 主程序 ====================
+
+def main():
+ parser = argparse.ArgumentParser(description="ADXP SOAP WebService Mock Server")
+ parser.add_argument("--host", default="0.0.0.0", help="Server host (default: 0.0.0.0)")
+ parser.add_argument("--port", type=int, default=8086, help="Server port (default: 8086)")
+ parser.add_argument("--auto", action="store_true", help="Enable auto-push on startup")
+ parser.add_argument("--interval", type=int, default=10, help="Auto-push interval in seconds (default: 10)")
+ args = parser.parse_args()
+
+ # 启动自动推送
+ if args.auto:
+ message_queue.start_auto_push(args.interval)
+
+ # 启动服务器
+ server = ThreadedHTTPServer((args.host, args.port), SOAPRequestHandler)
+
+ logger.info("=" * 70)
+ logger.info("ADXP SOAP WebService Mock Server")
+ logger.info("=" * 70)
+ logger.info(f"WSDL: http://{args.host}:{args.port}/LoginService?wsdl")
+ logger.info(f"Endpoint: http://{args.host}:{args.port}/LoginService")
+ logger.info(f"Auto-push: {'ENABLED' if args.auto else 'DISABLED'}")
+ if args.auto:
+ logger.info(f"Auto-push interval: {args.interval}s")
+ logger.info(f"Credentials: dianxin / dianxin@123")
+ logger.info("=" * 70)
+
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ logger.info("Shutting down...")
+ message_queue.stop_auto_push()
+ server.shutdown()
+
+
+if __name__ == "__main__":
+ main()