192 lines
6.1 KiB
Python
192 lines
6.1 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
QAUP 打包验证脚本(Python)
|
||
|
||
目的:
|
||
验证 qaup-admin.jar(Spring Boot fat jar)中是否包含“最新改动”的关键特征字符串。
|
||
通过读取 qaup-admin.jar -> BOOT-INF/lib/qaup-collision*.jar & qaup-framework*.jar
|
||
-> 读取指定 .class 条目 -> 在 class 常量池字节中搜索关键字符串(UTF-8)。
|
||
|
||
用法(在项目根目录执行):
|
||
python verify-jar.py
|
||
python verify-jar.py --jar qaup-admin/target/qaup-admin.jar
|
||
|
||
退出码:
|
||
0 = PASS(命中至少一个关键特征)
|
||
2 = FAIL(未命中关键特征,通常表示不是新包)
|
||
3 = FAIL(缺 jar 或缺 class,或不是 Spring Boot fat jar)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import io
|
||
import os
|
||
import sys
|
||
import zipfile
|
||
from dataclasses import dataclass
|
||
from typing import Iterable, List, Optional, Tuple
|
||
|
||
|
||
DEFAULT_ADMIN_JAR = os.path.join("qaup-admin", "target", "qaup-admin.jar")
|
||
|
||
|
||
NEEDLES: List[str] = [
|
||
# DataCollectorDao: 新增的失败参数打印
|
||
"bodySummary=",
|
||
"进港路由请求URL",
|
||
"出港路由请求URL",
|
||
# 补齐标记字段
|
||
"inRunwayPatched",
|
||
"outRunwayPatched",
|
||
# DataProcessingService: 新增的失败时参数打印包含 patched=
|
||
"patched=",
|
||
# SecurityConfig: debug 放行开关
|
||
"qaup.debug.runway-path-planning.enabled",
|
||
]
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class ClassTarget:
|
||
name: str
|
||
jar_kind: str # "collision" | "framework"
|
||
class_entry: str
|
||
|
||
|
||
CLASS_TARGETS: List[ClassTarget] = [
|
||
ClassTarget(
|
||
name="DataCollectorDao",
|
||
jar_kind="collision",
|
||
class_entry="com/qaup/collision/datacollector/dao/DataCollectorDao.class",
|
||
),
|
||
ClassTarget(
|
||
name="DataProcessingService",
|
||
jar_kind="collision",
|
||
class_entry="com/qaup/collision/dataprocessing/service/DataProcessingService.class",
|
||
),
|
||
ClassTarget(
|
||
name="AircraftRouteUpdateEvent",
|
||
jar_kind="collision",
|
||
class_entry="com/qaup/collision/websocket/event/AircraftRouteUpdateEvent.class",
|
||
),
|
||
ClassTarget(
|
||
name="SecurityConfig",
|
||
jar_kind="framework",
|
||
class_entry="com/qaup/framework/config/SecurityConfig.class",
|
||
),
|
||
]
|
||
|
||
|
||
def eprint(*args: object) -> None:
|
||
print(*args, file=sys.stderr)
|
||
|
||
|
||
def read_zip_entry(zf: zipfile.ZipFile, entry_name: str) -> bytes:
|
||
with zf.open(entry_name, "r") as f:
|
||
return f.read()
|
||
|
||
|
||
def find_first_entry_by_prefix(zf: zipfile.ZipFile, prefix: str) -> Optional[str]:
|
||
for n in zf.namelist():
|
||
if n.startswith(prefix):
|
||
return n
|
||
return None
|
||
|
||
|
||
def find_lib_jar(zf_admin: zipfile.ZipFile, lib_prefix: str) -> Optional[str]:
|
||
# BOOT-INF/lib/qaup-collision*.jar
|
||
candidates = [n for n in zf_admin.namelist() if n.startswith("BOOT-INF/lib/") and lib_prefix in os.path.basename(n)]
|
||
candidates.sort()
|
||
return candidates[0] if candidates else None
|
||
|
||
|
||
def scan_bytes_for_needles(blob: bytes, needles: Iterable[str]) -> List[str]:
|
||
hits: List[str] = []
|
||
for s in needles:
|
||
b = s.encode("utf-8", errors="strict")
|
||
if b in blob:
|
||
hits.append(s)
|
||
return hits
|
||
|
||
|
||
def main(argv: Optional[List[str]] = None) -> int:
|
||
parser = argparse.ArgumentParser(description="Verify qaup-admin.jar contains recent route changes.")
|
||
parser.add_argument("--jar", dest="jar_path", default=DEFAULT_ADMIN_JAR, help="Path to qaup-admin.jar")
|
||
args = parser.parse_args(argv)
|
||
|
||
jar_path = os.path.abspath(args.jar_path)
|
||
print(f"JarPath: {jar_path}")
|
||
if not os.path.exists(jar_path):
|
||
eprint(f"ERROR: jar 不存在: {jar_path}")
|
||
return 3
|
||
|
||
try:
|
||
with zipfile.ZipFile(jar_path, "r") as zf_admin:
|
||
lib_dir = find_first_entry_by_prefix(zf_admin, "BOOT-INF/lib/")
|
||
if lib_dir is None:
|
||
eprint("ERROR: 未发现 BOOT-INF/lib/,可能不是 Spring Boot fat jar。")
|
||
return 3
|
||
|
||
collision_jar_entry = find_lib_jar(zf_admin, "qaup-collision")
|
||
framework_jar_entry = find_lib_jar(zf_admin, "qaup-framework")
|
||
if collision_jar_entry is None:
|
||
eprint("ERROR: 未找到 BOOT-INF/lib/qaup-collision*.jar")
|
||
return 3
|
||
if framework_jar_entry is None:
|
||
eprint("ERROR: 未找到 BOOT-INF/lib/qaup-framework*.jar")
|
||
return 3
|
||
|
||
print(f"collisionJarEntry: {collision_jar_entry}")
|
||
print(f"frameworkJarEntry: {framework_jar_entry}")
|
||
|
||
collision_bytes = read_zip_entry(zf_admin, collision_jar_entry)
|
||
framework_bytes = read_zip_entry(zf_admin, framework_jar_entry)
|
||
|
||
jars = {
|
||
"collision": zipfile.ZipFile(io.BytesIO(collision_bytes), "r"),
|
||
"framework": zipfile.ZipFile(io.BytesIO(framework_bytes), "r"),
|
||
}
|
||
|
||
any_hit = False
|
||
missing = False
|
||
|
||
print("\n=== Scan class signatures ===")
|
||
for t in CLASS_TARGETS:
|
||
zf = jars[t.jar_kind]
|
||
if t.class_entry not in zf.namelist():
|
||
print(f"MISSING class: {t.name} -> {t.jar_kind}:{t.class_entry}")
|
||
missing = True
|
||
continue
|
||
blob = read_zip_entry(zf, t.class_entry)
|
||
hits = scan_bytes_for_needles(blob, NEEDLES)
|
||
any_hit = any_hit or bool(hits)
|
||
print(f"OK class: {t.name}")
|
||
print(" hits: " + (", ".join(hits) if hits else "<none>"))
|
||
|
||
for zf in jars.values():
|
||
zf.close()
|
||
|
||
print("\n=== Result ===")
|
||
if missing:
|
||
print("FAIL: 依赖 jar 或 class 缺失,无法验证(可能打包不完整)")
|
||
return 3
|
||
if any_hit:
|
||
print("PASS: 已命中关键特征字符串(大概率是最新改动包)")
|
||
return 0
|
||
|
||
print("FAIL: 未命中任何关键特征字符串(大概率不是最新改动包)")
|
||
return 2
|
||
|
||
except zipfile.BadZipFile:
|
||
eprint("ERROR: jar 不是有效的 zip/jar 文件")
|
||
return 3
|
||
except Exception as ex:
|
||
eprint(f"ERROR: 执行失败: {ex}")
|
||
return 3
|
||
|
||
|
||
if __name__ == "__main__":
|
||
raise SystemExit(main())
|
||
|