Add face recognition log analyzer

This commit is contained in:
tian 2026-04-17 10:17:44 +08:00
parent 5644a5bf0b
commit 02ca4fed52
3 changed files with 354 additions and 0 deletions

View File

@ -302,6 +302,19 @@ pre_rgb -> face_det -> person_det -> person_trk -> face_recog -> ...
- `uncertain`:像某个已知人,但证据不足;不会直接当作陌生人。
- `unknown`:保留给明确陌生人语义。当前人脸识别节点主要输出 `known` / `uncertain`,陌生人告警由 alarm 结合 track 聚合、质量门槛和“没有已知人证据”来判断。
日志分析:
```bash
python3 tools/analyze_face_recog_log.py /tmp/media-server.log
```
本地从 RK3588 拉回日志后也可以直接分析:
```powershell
scp orangepi@10.0.0.81:/tmp/media-server.log .\logs\media-server_latest.log
python tools/analyze_face_recog_log.py .\logs\media-server_latest.log
```
人脸库说明:
- SQLite 人脸库支持同一个人多条 embedding。

View File

@ -0,0 +1,54 @@
import importlib.util
import pathlib
import sys
import textwrap
import unittest
# Repository root, resolved relative to this test file (one directory up).
REPO_ROOT = pathlib.Path(__file__).resolve().parents[1]
# Analyzer script under test; imported dynamically by load_module() below.
SCRIPT_PATH = REPO_ROOT / "tools" / "analyze_face_recog_log.py"
def load_module():
    """Import the analyzer script from its file path and return the module.

    The module is also registered in ``sys.modules`` under its spec name so
    that dataclasses/pickling inside it can resolve the module by name.
    """
    module_spec = importlib.util.spec_from_file_location("analyze_face_recog_log", SCRIPT_PATH)
    loaded = importlib.util.module_from_spec(module_spec)
    sys.modules[module_spec.name] = loaded
    module_spec.loader.exec_module(loaded)
    return loaded
class FaceRecogLogAnalysisTest(unittest.TestCase):
    """End-to-end check of analyze_lines() against a synthetic log excerpt."""

    def test_summarizes_face_matches_tracks_and_uploads(self):
        """Feed one representative log snippet and verify each summary facet."""
        module = load_module()
        # Synthetic log covering: startup banner, gallery load, node start,
        # track association, per-frame stats, one known + one uncertain match,
        # an alarm line, and an external upload (token + send ok).
        sample = textwrap.dedent(
            """
            RK3588 Media Server v0.1.0 (git abc1234)
            [1000][I] [ai_face_recog] gallery loaded: n=15 dim=512
            [1001][I] [ai_face_recog] start id=face_recog align=true thr_accept=0.450000 thr_margin=0.050000 infer_interval_ms=500 shared_target_key=graph:cam1:tracked_targets debug=true debug_log_matches=true debug_min_log_interval_ms=0
            [1002][I] [ai_face_recog] track_assoc id=face_recog frame=10 source=shared_state person_class_id=0 face_bbox=(10,20,30,40) dets=1 person_dets=1 tracked_person_dets=1 containing_tracked_person_dets=1 best_track_id=7 best_overlap=900.0
            [1003][I] [ai_face_recog] frame id=face_recog frame=10 faces_in=2 recog_items=2
            [1004][I] [ai_face_recog] match id=face_recog frame=10 status=known person_track_id=7 candidate=reg_001 candidate_id=1 best_sim=0.58 second_sim=0.44 sim_margin=0.14 bbox=(10,20,30,40)
            [1005][I] [ai_face_recog] match id=face_recog frame=10 status=uncertain person_track_id=-1 candidate=reg_002 candidate_id=2 best_sim=0.42 second_sim=0.39 sim_margin=0.03 bbox=(50,60,12,18)
            [1006][I] [ALARM][info] 2026-04-17 10:00:00 node=alarm rule=known_person:reg_001 frame=10 detections=[]
            [1007][I] [ExternalApiAction] token fetched successfully
            [1008][I] [ExternalApiAction] send ok http=200 alarm_content=known_person:reg_001 pic_url=a video_url=b
            """
        ).strip().splitlines()
        summary = module.analyze_lines(sample)
        # Startup / configuration fields.
        self.assertEqual(summary.version_git, "abc1234")
        self.assertEqual(summary.gallery_count, 15)
        self.assertEqual(summary.gallery_dim, 512)
        self.assertEqual(summary.shared_target_key, "graph:cam1:tracked_targets")
        # Match totals and per-status breakdown.
        self.assertEqual(summary.match_total, 2)
        self.assertEqual(summary.status_counts["known"], 1)
        self.assertEqual(summary.status_counts["uncertain"], 1)
        # The uncertain match has person_track_id=-1 in the sample.
        self.assertEqual(summary.track_id_missing, 1)
        # Alarm and external-upload accounting.
        self.assertEqual(summary.alarm_counts["known_person:reg_001"], 1)
        self.assertEqual(summary.external_send_counts["ok"], 1)
        # Quantile table: the single known match pins the max best_sim.
        self.assertEqual(summary.quantiles["known"]["best_sim"]["max"], 0.58)
if __name__ == "__main__":
    # Allow running this test module directly (outside a test runner).
    unittest.main()

View File

@ -0,0 +1,287 @@
#!/usr/bin/env python3
"""Analyze RK3588 media-server face-recognition logs."""
from __future__ import annotations
import argparse
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from pathlib import Path
import re
from typing import Iterable
# --- Log-line patterns -------------------------------------------------------
# Startup banner, e.g. "RK3588 Media Server v0.1.0 (git abc1234)".
VERSION_RE = re.compile(r"RK3588 Media Server .* \(git ([^)]+)\)")
# Face gallery load: embedding count and dimension.
GALLERY_RE = re.compile(r"\[ai_face_recog\] gallery loaded: n=(\d+) dim=(\d+)")
# Node start line: accept/margin thresholds, inference interval, shared key.
START_RE = re.compile(
    r"\[ai_face_recog\] start .*?thr_accept=([0-9.\-]+) "
    r"thr_margin=([0-9.\-]+).*?infer_interval_ms=(\d+) "
    r"shared_target_key=([^ ]+)"
)
# Per-frame stats: [ts] ... frame=N faces_in=N recog_items=N.
FRAME_RE = re.compile(
    r"\[(\d+)\].*\[ai_face_recog\] frame .*frame=(\d+) "
    r"faces_in=(\d+) recog_items=(\d+)"
)
# One recognition result per face, including similarity scores and bbox.
MATCH_RE = re.compile(
    r"\[(\d+)\].*\[ai_face_recog\] match .*frame=(\d+) "
    r"status=(\w+) person_track_id=(-?\d+) candidate=([^\s]+) "
    r"candidate_id=(-?\d+) best_sim=([0-9.\-]+) second_sim=([0-9.\-]+) "
    r"sim_margin=([0-9.\-]+) bbox=\((\d+),(\d+),(\d+),(\d+)\)"
)
# Face-to-person-track association attempt; best_track_id=-1 means no match.
ASSOC_RE = re.compile(
    r"\[(\d+)\].*\[ai_face_recog\] track_assoc .*frame=(\d+) "
    r"source=(\w+) .*best_track_id=(-?\d+)"
)
# Alarm emission with rule name and frame number.
ALARM_RE = re.compile(r"\[(\d+)\].*\[ALARM\].* rule=([^\s]+) frame=(\d+)")
# External API token acquisition (success only).
TOKEN_RE = re.compile(r"\[ExternalApiAction\] token fetched successfully")
# External upload result. NOTE: the lazy gap must live INSIDE the optional
# group — a trailing ".*?(?:alarm_content=...)?" always satisfies itself with
# the empty alternative, so alarm_content was never captured (group(3) was
# always None and the recorded content always "").
SEND_RE = re.compile(
    r"\[(\d+)\].*\[ExternalApiAction\] send (ok|failed)"
    r"(?:.*?alarm_content=(\S+))?"
)
@dataclass
class MatchItem:
    """One parsed `[ai_face_recog] match` log line."""

    ts_ms: int
    frame: int
    status: str
    track_id: int
    candidate: str
    candidate_id: int
    best_sim: float
    second_sim: float
    sim_margin: float
    bbox: tuple[int, int, int, int]

    @property
    def area(self) -> int:
        """Pixel area of the face bounding box (width * height)."""
        _left, _top, width, height = self.bbox
        return width * height
@dataclass
class LogSummary:
    """Aggregated view of one media-server log, produced by analyze_lines()."""

    # Startup / configuration (None when the corresponding line was absent).
    version_git: str | None = None
    gallery_count: int | None = None
    gallery_dim: int | None = None
    threshold_accept: float | None = None
    threshold_margin: float | None = None
    infer_interval_ms: int | None = None
    shared_target_key: str | None = None
    # Per-frame recognition volume.
    frame_count: int = 0
    faces_in_sum: int = 0
    recog_items_sum: int = 0
    # Match lines: totals and per-status / per-candidate breakdowns.
    match_total: int = 0
    status_counts: Counter[str] = field(default_factory=Counter)
    candidate_counts: Counter[str] = field(default_factory=Counter)
    known_candidate_counts: Counter[str] = field(default_factory=Counter)
    unknown_candidate_counts: Counter[str] = field(default_factory=Counter)
    # Matches that carried person_track_id=-1 (no associated person track).
    track_id_missing: int = 0
    known_track_id_missing: int = 0
    # track_assoc lines.
    assoc_total: int = 0
    assoc_source_counts: Counter[str] = field(default_factory=Counter)
    assoc_track_id_missing: int = 0
    # Alarm emissions: per-rule counts plus the raw (rule, frame) sequence.
    alarm_counts: Counter[str] = field(default_factory=Counter)
    alarm_frames: list[tuple[str, int]] = field(default_factory=list)
    # External upload pipeline: token fetches and (status, alarm_content) sends.
    external_token_success: int = 0
    external_send_counts: Counter[str] = field(default_factory=Counter)
    external_sends: list[tuple[str, str]] = field(default_factory=list)
    # Per-status quantile tables: status -> metric -> {min,p50,p90,max}.
    quantiles: dict[str, dict[str, dict[str, float | int]]] = field(default_factory=dict)
    # Per-track rollups (top rows only, see analyze_lines).
    tracks_total: int = 0
    track_summaries: list[dict[str, object]] = field(default_factory=list)
    # Time/frame span of the observed match lines.
    first_match_ts_ms: int | None = None
    last_match_ts_ms: int | None = None
    first_frame: int | None = None
    last_frame: int | None = None
def _quantiles(values: list[float | int]) -> dict[str, float | int]:
if not values:
return {}
values = sorted(values)
def q(pos: float) -> float | int:
return values[min(len(values) - 1, int(round((len(values) - 1) * pos)))]
return {"min": values[0], "p50": q(0.5), "p90": q(0.9), "max": values[-1]}
def _pct(count: int, total: int) -> str:
return "n/a" if total <= 0 else f"{100.0 * count / total:.1f}%"
def analyze_lines(lines: Iterable[str]) -> LogSummary:
    """Parse raw log lines into a LogSummary.

    Each line is tested against every pattern independently (a line could in
    principle feed several counters). Startup fields keep the value from the
    last matching line, except version_git which keeps the first.
    """
    summary = LogSummary()
    matches: list[MatchItem] = []
    # Matches grouped by person track id; only ids >= 0 are collected here.
    by_track: dict[int, list[MatchItem]] = defaultdict(list)
    for line in lines:
        # Version banner: first occurrence wins.
        if summary.version_git is None and (m := VERSION_RE.search(line)):
            summary.version_git = m.group(1)
        if m := GALLERY_RE.search(line):
            summary.gallery_count = int(m.group(1))
            summary.gallery_dim = int(m.group(2))
        if m := START_RE.search(line):
            summary.threshold_accept = float(m.group(1))
            summary.threshold_margin = float(m.group(2))
            summary.infer_interval_ms = int(m.group(3))
            summary.shared_target_key = m.group(4)
        if m := FRAME_RE.search(line):
            summary.frame_count += 1
            summary.faces_in_sum += int(m.group(3))
            summary.recog_items_sum += int(m.group(4))
        if m := ASSOC_RE.search(line):
            summary.assoc_total += 1
            summary.assoc_source_counts[m.group(3)] += 1
            # best_track_id=-1 means no person track was associated.
            if int(m.group(4)) < 0:
                summary.assoc_track_id_missing += 1
        if m := ALARM_RE.search(line):
            rule = m.group(2)
            frame = int(m.group(3))
            summary.alarm_counts[rule] += 1
            summary.alarm_frames.append((rule, frame))
        if TOKEN_RE.search(line):
            summary.external_token_success += 1
        if m := SEND_RE.search(line):
            status = m.group(2)
            # alarm_content is an optional capture; fall back to "".
            content = m.group(3) or ""
            summary.external_send_counts[status] += 1
            summary.external_sends.append((status, content))
        if m := MATCH_RE.search(line):
            item = MatchItem(
                ts_ms=int(m.group(1)),
                frame=int(m.group(2)),
                status=m.group(3),
                track_id=int(m.group(4)),
                candidate=m.group(5),
                candidate_id=int(m.group(6)),
                best_sim=float(m.group(7)),
                second_sim=float(m.group(8)),
                sim_margin=float(m.group(9)),
                bbox=(int(m.group(10)), int(m.group(11)), int(m.group(12)), int(m.group(13))),
            )
            matches.append(item)
            summary.status_counts[item.status] += 1
            summary.candidate_counts[item.candidate] += 1
            if item.status == "known":
                summary.known_candidate_counts[item.candidate] += 1
            if item.status == "unknown":
                summary.unknown_candidate_counts[item.candidate] += 1
            if item.track_id < 0:
                # Match without an associated person track; "known" without a
                # track is tracked separately since it blocks track-level rollups.
                summary.track_id_missing += 1
                if item.status == "known":
                    summary.known_track_id_missing += 1
            else:
                by_track[item.track_id].append(item)
    summary.match_total = len(matches)
    if matches:
        # Spans assume the log is in chronological order (first/last entry).
        summary.first_match_ts_ms = matches[0].ts_ms
        summary.last_match_ts_ms = matches[-1].ts_ms
        summary.first_frame = matches[0].frame
        summary.last_frame = matches[-1].frame
    # Quantile tables are always populated for all three statuses (possibly {}).
    for status in ("known", "uncertain", "unknown"):
        status_items = [item for item in matches if item.status == status]
        summary.quantiles[status] = {
            "best_sim": _quantiles([item.best_sim for item in status_items]),
            "sim_margin": _quantiles([item.sim_margin for item in status_items]),
            "bbox_area": _quantiles([item.area for item in status_items]),
        }
    summary.tracks_total = len(by_track)
    track_rows = []
    for track_id, items in by_track.items():
        status_counts = Counter(item.status for item in items)
        cand_counts = Counter(item.candidate for item in items)
        track_rows.append(
            {
                "track_id": track_id,
                "count": len(items),
                "status_counts": dict(status_counts),
                "top_candidates": cand_counts.most_common(3),
                "best_max": max(item.best_sim for item in items),
                "first_frame": items[0].frame,
                "last_frame": items[-1].frame,
            }
        )
    # Keep the 12 most interesting tracks: most "known" hits first, then most
    # matches overall, then the lowest track id as a stable tie-breaker.
    summary.track_summaries = sorted(
        track_rows,
        key=lambda row: (-row["status_counts"].get("known", 0), -row["count"], row["track_id"]),
    )[:12]
    return summary
def format_summary(summary: LogSummary) -> str:
    """Render *summary* as a multi-section, human-readable text report."""
    report: list[str] = []
    put = report.append

    put("Face Recognition Log Summary")
    put("")

    # Startup / configuration extracted from the banner and start lines.
    put("Startup:")
    put(f"- git: {summary.version_git or 'unknown'}")
    put(f"- gallery: n={summary.gallery_count if summary.gallery_count is not None else 'unknown'} dim={summary.gallery_dim if summary.gallery_dim is not None else 'unknown'}")
    put(f"- threshold: accept={summary.threshold_accept if summary.threshold_accept is not None else 'unknown'} margin={summary.threshold_margin if summary.threshold_margin is not None else 'unknown'}")
    put(f"- infer_interval_ms: {summary.infer_interval_ms if summary.infer_interval_ms is not None else 'unknown'}")
    put(f"- shared_target_key: {summary.shared_target_key or 'unknown'}")
    put("")

    # Recognition volume and per-status breakdown.
    put("Recognition:")
    if summary.first_match_ts_ms is not None and summary.last_match_ts_ms is not None:
        put(f"- match_time_span_ms: {summary.last_match_ts_ms - summary.first_match_ts_ms}")
    if summary.first_frame is not None and summary.last_frame is not None:
        put(f"- frame_span: {summary.first_frame} -> {summary.last_frame}")
    put(f"- frames_with_face_recog: {summary.frame_count}")
    put(f"- faces_in_sum: {summary.faces_in_sum}")
    put(f"- recog_items_sum: {summary.recog_items_sum}")
    put(f"- match_total: {summary.match_total}")
    for status in ("known", "uncertain", "unknown"):
        count = summary.status_counts.get(status, 0)
        put(f"- {status}: {count} ({_pct(count, summary.match_total)})")
    put(f"- person_track_id=-1: {summary.track_id_missing} ({_pct(summary.track_id_missing, summary.match_total)})")
    put(f"- known person_track_id=-1: {summary.known_track_id_missing}/{summary.status_counts.get('known', 0)}")
    put("")

    # Candidate frequency tables.
    put("Candidates:")
    put(f"- top candidates: {summary.candidate_counts.most_common(10)}")
    put(f"- known candidates: {summary.known_candidate_counts.most_common()}")
    put(f"- unknown candidates: {summary.unknown_candidate_counts.most_common()}")
    put("")

    # Similarity / bbox quantiles per status.
    put("Quality Quantiles:")
    for status in ("known", "uncertain", "unknown"):
        put(f"- {status} best_sim: {summary.quantiles.get(status, {}).get('best_sim', {})}")
        put(f"- {status} sim_margin: {summary.quantiles.get(status, {}).get('sim_margin', {})}")
        put(f"- {status} bbox_area: {summary.quantiles.get(status, {}).get('bbox_area', {})}")
    put("")

    # Track association stats plus the top per-track rollup rows.
    put("Track Association:")
    put(f"- assoc_total: {summary.assoc_total}")
    put(f"- assoc_source_counts: {summary.assoc_source_counts.most_common()}")
    put(f"- assoc_track_id=-1: {summary.assoc_track_id_missing}")
    put(f"- tracks_total: {summary.tracks_total}")
    for row in summary.track_summaries:
        put(
            "- track {track_id}: n={count} status={status_counts} candidates={top_candidates} "
            "best_max={best_max:.2f} frames={first_frame}->{last_frame}".format(**row)
        )
    put("")

    # Alarm emissions and external upload results.
    put("Alarms And Uploads:")
    put(f"- alarms: {sum(summary.alarm_counts.values())} {summary.alarm_counts.most_common()}")
    put(f"- alarm_frames: {summary.alarm_frames}")
    put(f"- external_token_success: {summary.external_token_success}")
    put(f"- external_sends: {sum(summary.external_send_counts.values())} {summary.external_send_counts.most_common()}")
    return "\n".join(report)
def main() -> int:
    """CLI entry point: parse one log-file path, print the report, return 0."""
    arg_parser = argparse.ArgumentParser(description=__doc__)
    arg_parser.add_argument("log_file", type=Path, help="Path to media-server log file")
    options = arg_parser.parse_args()
    # Undecodable bytes are replaced rather than raising, so partial/binary
    # garbage in a log file never aborts the analysis.
    text = options.log_file.read_text(encoding="utf-8", errors="replace")
    print(format_summary(analyze_lines(text.splitlines())))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())