# OrangePi3588Media/tools/analyze_face_recog_log.py
# (415 lines, 18 KiB, Python)
#!/usr/bin/env python3
"""Analyze RK3588 media-server face-recognition logs."""
from __future__ import annotations
import argparse
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from pathlib import Path
import re
from typing import Iterable
# --- Log-line patterns -------------------------------------------------------
# Startup banner: captures the git revision string.
VERSION_RE = re.compile(r"RK3588 Media Server .* \(git ([^)]+)\)")
# Gallery load line: number of enrolled identities and embedding dimension.
GALLERY_RE = re.compile(r"\[ai_face_recog\] gallery loaded: n=(\d+) dim=(\d+)")
# Recognizer start line: accept/margin thresholds and scheduling config.
START_RE = re.compile(
    r"\[ai_face_recog\] start .*?thr_accept=([0-9.\-]+) "
    r"thr_margin=([0-9.\-]+).*?infer_interval_ms=(\d+) "
    r"shared_target_key=([^ ]+)"
)
# Per-frame stats: [ts_ms] ... frame=<n> faces_in=<n> recog_items=<n>.
FRAME_RE = re.compile(
    r"\[(\d+)\].*\[ai_face_recog\] frame .*frame=(\d+) "
    r"faces_in=(\d+) recog_items=(\d+)"
)
# Per-face match result: verdict, track association, similarity scores, bbox.
MATCH_RE = re.compile(
    r"\[(\d+)\].*\[ai_face_recog\] match .*frame=(\d+) "
    r"status=(\w+) person_track_id=(-?\d+) candidate=([^\s]+) "
    r"candidate_id=(-?\d+) best_sim=([0-9.\-]+) second_sim=([0-9.\-]+) "
    r"sim_margin=([0-9.\-]+) bbox=\((\d+),(\d+),(\d+),(\d+)\)"
)
# Face-to-person-track association events.
ASSOC_RE = re.compile(
    r"\[(\d+)\].*\[ai_face_recog\] track_assoc .*frame=(\d+) "
    r"source=(\w+) .*best_track_id=(-?\d+)"
)
ALARM_RE = re.compile(r"\[(\d+)\].*\[ALARM\].* rule=([^\s]+) frame=(\d+)")
TOKEN_RE = re.compile(r"\[ExternalApiAction\] token fetched successfully")
# External upload result.
# BUGFIX: the original pattern was
#   r"... send (ok|failed).*?(?:alarm_content=([^\s]+))?"
# A lazy `.*?` followed by an optional group lets the engine finish the match
# with zero extra characters, so the alarm_content group effectively never
# captured (group 3 was always None whenever any text sat between
# "send ok/failed" and "alarm_content="). Putting the lazy gap *inside* the
# optional group forces the engine to scan forward for alarm_content when it
# is present on the line, and still match cleanly when it is absent.
SEND_RE = re.compile(
    r"\[(\d+)\].*\[ExternalApiAction\] send (ok|failed)"
    r"(?:.*?alarm_content=([^\s]+))?"
)
UNKNOWN_CANDIDATE_RE = re.compile(r"\[(\d+)\].*\[alarm\] unknown_candidate (.*)")
# Generic key=value tokenizer for unknown_candidate payloads.
KEY_VALUE_RE = re.compile(r"(\w+)=([^\s]+)")
@dataclass
class MatchItem:
    """One parsed '[ai_face_recog] match ...' log entry."""

    ts_ms: int  # leading [..] timestamp of the log line, in milliseconds
    frame: int  # frame counter from the log line
    status: str  # match verdict string, e.g. "known" or "uncertain"
    track_id: int  # person_track_id; -1 means no person track associated
    candidate: str  # best-matching gallery identity name
    candidate_id: int  # numeric id of the candidate (-1 possible per regex)
    best_sim: float  # top-1 similarity score, as logged
    second_sim: float  # runner-up similarity score, as logged
    sim_margin: float  # logged margin between best and second similarity
    bbox: tuple[int, int, int, int]  # face box; indices 2/3 are used as w/h below

    @property
    def area(self) -> int:
        """Face bounding-box area (bbox[2] * bbox[3], i.e. width * height)."""
        return self.bbox[2] * self.bbox[3]
@dataclass
class UnknownCandidateItem:
    """One parsed '[alarm] unknown_candidate key=value ...' diagnostic entry."""

    ts_ms: int  # leading [..] timestamp of the log line, in milliseconds
    frame: int  # frame counter (key "frame", default 0)
    status: str  # status field from the payload
    track_id: int  # -1 when no track is associated (key default)
    candidate: str  # candidate identity name from the payload
    best_sim: float  # best similarity from the payload
    area_ratio: float  # assumes face-area-to-frame ratio — confirm in producer
    aspect: float  # assumes bbox aspect ratio — confirm in producer
    rule_matched: bool  # True when payload has rule_matched=true
    reject_reason: str  # reject_reason field ("" when absent)
    gate: str  # gate field ("" when absent)
    track_age_ms: int  # track_age_ms field (default 0)
    quality_hits: int  # numerator of the logged "hits/required" pair
    quality_hits_required: int  # denominator of the logged "hits/required" pair
@dataclass
class LogSummary:
    """Aggregated statistics extracted from one media-server log."""

    # Startup / configuration (None until the matching banner line is seen).
    version_git: str | None = None
    gallery_count: int | None = None
    gallery_dim: int | None = None
    threshold_accept: float | None = None
    threshold_margin: float | None = None
    infer_interval_ms: int | None = None
    shared_target_key: str | None = None
    # Per-frame recognition throughput.
    frame_count: int = 0
    faces_in_sum: int = 0
    recog_items_sum: int = 0
    # Match-line accumulators.
    match_total: int = 0
    status_counts: Counter[str] = field(default_factory=Counter)
    candidate_counts: Counter[str] = field(default_factory=Counter)
    known_candidate_counts: Counter[str] = field(default_factory=Counter)
    track_id_missing: int = 0  # matches with person_track_id == -1
    known_track_id_missing: int = 0  # "known" matches lacking a track id
    # Track-association events.
    assoc_total: int = 0
    assoc_source_counts: Counter[str] = field(default_factory=Counter)
    assoc_track_id_missing: int = 0
    # Alarms and external upload activity.
    alarm_counts: Counter[str] = field(default_factory=Counter)
    alarm_frames: list[tuple[str, int]] = field(default_factory=list)
    external_token_success: int = 0
    external_send_counts: Counter[str] = field(default_factory=Counter)
    external_sends: list[tuple[str, str]] = field(default_factory=list)
    # Quantiles of similarity/size per match status ("known"/"uncertain").
    quantiles: dict[str, dict[str, dict[str, float | int]]] = field(default_factory=dict)
    tracks_total: int = 0
    track_summaries: list[dict[str, object]] = field(default_factory=list)
    # Span markers from the first/last match line seen.
    first_match_ts_ms: int | None = None
    last_match_ts_ms: int | None = None
    first_frame: int | None = None
    last_frame: int | None = None
    # unknown_candidate diagnostics.
    unknown_candidate_total: int = 0
    unknown_candidate_rule_matched: int = 0
    unknown_candidate_status_counts: Counter[str] = field(default_factory=Counter)
    unknown_candidate_reject_counts: Counter[str] = field(default_factory=Counter)
    unknown_candidate_gate_counts: Counter[str] = field(default_factory=Counter)
    unknown_candidate_candidate_counts: Counter[str] = field(default_factory=Counter)
    unknown_candidate_track_id_missing: int = 0
    unknown_candidate_quantiles: dict[str, dict[str, float | int]] = field(default_factory=dict)
    unknown_candidate_track_summaries: list[dict[str, object]] = field(default_factory=list)
def _quantiles(values: list[float | int]) -> dict[str, float | int]:
if not values:
return {}
values = sorted(values)
def q(pos: float) -> float | int:
return values[min(len(values) - 1, int(round((len(values) - 1) * pos)))]
return {"min": values[0], "p50": q(0.5), "p90": q(0.9), "max": values[-1]}
def _pct(count: int, total: int) -> str:
return "n/a" if total <= 0 else f"{100.0 * count / total:.1f}%"
def _parse_unknown_candidate(line: str) -> UnknownCandidateItem | None:
    """Parse one '[alarm] unknown_candidate key=value ...' log line.

    Returns None when the line does not match the pattern, carries no
    key=value pairs, or any numeric field fails to parse.
    """
    match = UNKNOWN_CANDIDATE_RE.search(line)
    if match is None:
        return None
    fields = dict(KEY_VALUE_RE.findall(match.group(2)))
    if not fields:
        return None
    # quality_hits is logged as "<hits>/<required>"; with no slash the
    # required part defaults to 0.
    hits_raw, slash, required_raw = fields.get("quality_hits", "0/0").partition("/")
    try:
        return UnknownCandidateItem(
            ts_ms=int(match.group(1)),
            frame=int(fields.get("frame", "0")),
            status=fields.get("status", ""),
            track_id=int(fields.get("track_id", "-1")),
            candidate=fields.get("candidate", ""),
            best_sim=float(fields.get("best_sim", "0")),
            area_ratio=float(fields.get("area_ratio", "0")),
            aspect=float(fields.get("aspect", "0")),
            rule_matched=fields.get("rule_matched", "false") == "true",
            reject_reason=fields.get("reject_reason", ""),
            gate=fields.get("gate", ""),
            track_age_ms=int(fields.get("track_age_ms", "0")),
            quality_hits=int(hits_raw),
            quality_hits_required=int(required_raw) if slash else 0,
        )
    except ValueError:
        return None
def analyze_lines(lines: Iterable[str]) -> LogSummary:
    """Scan log *lines* once and aggregate a LogSummary.

    Each line is probed against every pattern (a single line may feed
    several accumulators). Matches are bucketed per person track, then
    quantiles and per-track summaries are computed after the scan.
    """
    summary = LogSummary()
    matches: list[MatchItem] = []
    by_track: dict[int, list[MatchItem]] = defaultdict(list)
    unknown_candidates: list[UnknownCandidateItem] = []
    unknown_candidates_by_track: dict[int, list[UnknownCandidateItem]] = defaultdict(list)
    for line in lines:
        # Only the first version banner is kept.
        if summary.version_git is None and (m := VERSION_RE.search(line)):
            summary.version_git = m.group(1)
        # Later gallery/start lines overwrite earlier ones (last wins).
        if m := GALLERY_RE.search(line):
            summary.gallery_count = int(m.group(1))
            summary.gallery_dim = int(m.group(2))
        if m := START_RE.search(line):
            summary.threshold_accept = float(m.group(1))
            summary.threshold_margin = float(m.group(2))
            summary.infer_interval_ms = int(m.group(3))
            summary.shared_target_key = m.group(4)
        if m := FRAME_RE.search(line):
            summary.frame_count += 1
            summary.faces_in_sum += int(m.group(3))
            summary.recog_items_sum += int(m.group(4))
        if m := ASSOC_RE.search(line):
            summary.assoc_total += 1
            summary.assoc_source_counts[m.group(3)] += 1
            # best_track_id < 0 means the association produced no track.
            if int(m.group(4)) < 0:
                summary.assoc_track_id_missing += 1
        if m := ALARM_RE.search(line):
            rule = m.group(2)
            frame = int(m.group(3))
            summary.alarm_counts[rule] += 1
            summary.alarm_frames.append((rule, frame))
        if TOKEN_RE.search(line):
            summary.external_token_success += 1
        if m := SEND_RE.search(line):
            status = m.group(2)
            content = m.group(3) or ""  # alarm_content is optional in the pattern
            summary.external_send_counts[status] += 1
            summary.external_sends.append((status, content))
        if item := _parse_unknown_candidate(line):
            unknown_candidates.append(item)
            summary.unknown_candidate_status_counts[item.status] += 1
            summary.unknown_candidate_reject_counts[item.reject_reason] += 1
            summary.unknown_candidate_gate_counts[item.gate] += 1
            summary.unknown_candidate_candidate_counts[item.candidate] += 1
            if item.rule_matched:
                summary.unknown_candidate_rule_matched += 1
            # Track-less items are counted; the rest are bucketed per track.
            if item.track_id < 0:
                summary.unknown_candidate_track_id_missing += 1
            else:
                unknown_candidates_by_track[item.track_id].append(item)
        if m := MATCH_RE.search(line):
            item = MatchItem(
                ts_ms=int(m.group(1)),
                frame=int(m.group(2)),
                status=m.group(3),
                track_id=int(m.group(4)),
                candidate=m.group(5),
                candidate_id=int(m.group(6)),
                best_sim=float(m.group(7)),
                second_sim=float(m.group(8)),
                sim_margin=float(m.group(9)),
                bbox=(int(m.group(10)), int(m.group(11)), int(m.group(12)), int(m.group(13))),
            )
            matches.append(item)
            summary.status_counts[item.status] += 1
            summary.candidate_counts[item.candidate] += 1
            if item.status == "known":
                summary.known_candidate_counts[item.candidate] += 1
            if item.track_id < 0:
                summary.track_id_missing += 1
                if item.status == "known":
                    summary.known_track_id_missing += 1
            else:
                by_track[item.track_id].append(item)
    summary.match_total = len(matches)
    summary.unknown_candidate_total = len(unknown_candidates)
    # Span markers come from the first/last match lines in log order.
    if matches:
        summary.first_match_ts_ms = matches[0].ts_ms
        summary.last_match_ts_ms = matches[-1].ts_ms
        summary.first_frame = matches[0].frame
        summary.last_frame = matches[-1].frame
    # Similarity/size quantiles per verdict; empty statuses produce {}.
    for status in ("known", "uncertain"):
        status_items = [item for item in matches if item.status == status]
        summary.quantiles[status] = {
            "best_sim": _quantiles([item.best_sim for item in status_items]),
            "sim_margin": _quantiles([item.sim_margin for item in status_items]),
            "bbox_area": _quantiles([item.area for item in status_items]),
        }
    summary.tracks_total = len(by_track)
    track_rows = []
    for track_id, items in by_track.items():
        status_counts = Counter(item.status for item in items)
        cand_counts = Counter(item.candidate for item in items)
        track_rows.append(
            {
                "track_id": track_id,
                "count": len(items),
                "status_counts": dict(status_counts),
                "top_candidates": cand_counts.most_common(3),
                "best_max": max(item.best_sim for item in items),
                "first_frame": items[0].frame,
                "last_frame": items[-1].frame,
            }
        )
    # Keep the 12 most "known"-heavy, then busiest, tracks (id breaks ties).
    summary.track_summaries = sorted(
        track_rows,
        key=lambda row: (-row["status_counts"].get("known", 0), -row["count"], row["track_id"]),
    )[:12]
    matched_unknown_candidates = [item for item in unknown_candidates if item.rule_matched]
    summary.unknown_candidate_quantiles = {
        "all_best_sim": _quantiles([item.best_sim for item in unknown_candidates]),
        "all_area_ratio": _quantiles([item.area_ratio for item in unknown_candidates]),
        "all_aspect": _quantiles([item.aspect for item in unknown_candidates]),
        "matched_best_sim": _quantiles([item.best_sim for item in matched_unknown_candidates]),
        "matched_area_ratio": _quantiles([item.area_ratio for item in matched_unknown_candidates]),
        "matched_track_age_ms": _quantiles([item.track_age_ms for item in matched_unknown_candidates]),
        "matched_quality_hits": _quantiles([item.quality_hits for item in matched_unknown_candidates]),
    }
    unknown_track_rows = []
    for track_id, items in unknown_candidates_by_track.items():
        # Only tracks with at least one rule-matched item are summarized.
        matched_items = [item for item in items if item.rule_matched]
        if not matched_items:
            continue
        unknown_track_rows.append(
            {
                "track_id": track_id,
                "matched_count": len(matched_items),
                "max_quality_hits": max(item.quality_hits for item in matched_items),
                "max_track_age_ms": max(item.track_age_ms for item in matched_items),
                "best_sim_max": max(item.best_sim for item in matched_items),
                "gates": Counter(item.gate for item in matched_items).most_common(3),
                "candidates": Counter(item.candidate for item in matched_items).most_common(3),
            }
        )
    summary.unknown_candidate_track_summaries = sorted(
        unknown_track_rows,
        key=lambda row: (-row["max_quality_hits"], -row["matched_count"], -row["max_track_age_ms"], row["track_id"]),
    )[:12]
    return summary
def format_summary(summary: LogSummary) -> str:
    """Render *summary* as a human-readable multi-section report string."""
    lines: list[str] = []
    lines.append("Face Recognition Log Summary")
    lines.append("")
    # Startup / configuration section.
    lines.append("Startup:")
    lines.append(f"- git: {summary.version_git or 'unknown'}")
    lines.append(f"- gallery: n={summary.gallery_count if summary.gallery_count is not None else 'unknown'} dim={summary.gallery_dim if summary.gallery_dim is not None else 'unknown'}")
    lines.append(f"- threshold: accept={summary.threshold_accept if summary.threshold_accept is not None else 'unknown'} margin={summary.threshold_margin if summary.threshold_margin is not None else 'unknown'}")
    lines.append(f"- infer_interval_ms: {summary.infer_interval_ms if summary.infer_interval_ms is not None else 'unknown'}")
    lines.append(f"- shared_target_key: {summary.shared_target_key or 'unknown'}")
    lines.append("")
    # Recognition throughput and verdict breakdown.
    lines.append("Recognition:")
    if summary.first_match_ts_ms is not None and summary.last_match_ts_ms is not None:
        lines.append(f"- match_time_span_ms: {summary.last_match_ts_ms - summary.first_match_ts_ms}")
    if summary.first_frame is not None and summary.last_frame is not None:
        lines.append(f"- frame_span: {summary.first_frame} -> {summary.last_frame}")
    lines.append(f"- frames_with_face_recog: {summary.frame_count}")
    lines.append(f"- faces_in_sum: {summary.faces_in_sum}")
    lines.append(f"- recog_items_sum: {summary.recog_items_sum}")
    lines.append(f"- match_total: {summary.match_total}")
    for status in ("known", "uncertain"):
        count = summary.status_counts.get(status, 0)
        lines.append(f"- {status}: {count} ({_pct(count, summary.match_total)})")
    lines.append(f"- person_track_id=-1: {summary.track_id_missing} ({_pct(summary.track_id_missing, summary.match_total)})")
    lines.append(f"- known person_track_id=-1: {summary.known_track_id_missing}/{summary.status_counts.get('known', 0)}")
    lines.append("")
    lines.append("Candidates:")
    lines.append(f"- top candidates: {summary.candidate_counts.most_common(10)}")
    lines.append(f"- known candidates: {summary.known_candidate_counts.most_common()}")
    lines.append("")
    # Quantiles may be missing a status key; .get(..., {}) renders "{}".
    lines.append("Quality Quantiles:")
    for status in ("known", "uncertain"):
        lines.append(f"- {status} best_sim: {summary.quantiles.get(status, {}).get('best_sim', {})}")
        lines.append(f"- {status} sim_margin: {summary.quantiles.get(status, {}).get('sim_margin', {})}")
        lines.append(f"- {status} bbox_area: {summary.quantiles.get(status, {}).get('bbox_area', {})}")
    lines.append("")
    lines.append("Track Association:")
    lines.append(f"- assoc_total: {summary.assoc_total}")
    lines.append(f"- assoc_source_counts: {summary.assoc_source_counts.most_common()}")
    lines.append(f"- assoc_track_id=-1: {summary.assoc_track_id_missing}")
    lines.append(f"- tracks_total: {summary.tracks_total}")
    for row in summary.track_summaries:
        lines.append(
            "- track {track_id}: n={count} status={status_counts} candidates={top_candidates} "
            "best_max={best_max:.2f} frames={first_frame}->{last_frame}".format(**row)
        )
    lines.append("")
    lines.append("Unknown Face Candidate Diagnostics:")
    lines.append(f"- unknown_candidate_total: {summary.unknown_candidate_total}")
    lines.append(
        f"- rule_matched: {summary.unknown_candidate_rule_matched} "
        f"({_pct(summary.unknown_candidate_rule_matched, summary.unknown_candidate_total)})"
    )
    lines.append(
        f"- track_id=-1: {summary.unknown_candidate_track_id_missing} "
        f"({_pct(summary.unknown_candidate_track_id_missing, summary.unknown_candidate_total)})"
    )
    lines.append(f"- status_counts: {summary.unknown_candidate_status_counts.most_common()}")
    lines.append(f"- reject_reason_counts: {summary.unknown_candidate_reject_counts.most_common()}")
    lines.append(f"- gate_counts: {summary.unknown_candidate_gate_counts.most_common()}")
    lines.append(f"- candidate_counts: {summary.unknown_candidate_candidate_counts.most_common(10)}")
    for name, values in summary.unknown_candidate_quantiles.items():
        lines.append(f"- {name}: {values}")
    for row in summary.unknown_candidate_track_summaries:
        lines.append(
            "- unknown track {track_id}: matched={matched_count} max_quality_hits={max_quality_hits} "
            "max_age_ms={max_track_age_ms} best_sim_max={best_sim_max:.3f} gates={gates} candidates={candidates}".format(**row)
        )
    lines.append("")
    lines.append("Alarms And Uploads:")
    lines.append(f"- alarms: {sum(summary.alarm_counts.values())} {summary.alarm_counts.most_common()}")
    lines.append(f"- alarm_frames: {summary.alarm_frames}")
    lines.append(f"- external_token_success: {summary.external_token_success}")
    lines.append(f"- external_sends: {sum(summary.external_send_counts.values())} {summary.external_send_counts.most_common()}")
    return "\n".join(lines)
def main() -> int:
    """CLI entry point: summarize one media-server log file.

    Returns 0 on success; argparse exits with status 2 on bad usage.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("log_file", type=Path, help="Path to media-server log file")
    args = parser.parse_args()
    # Stream the log instead of materializing the whole text in memory
    # (read_text().splitlines() held the entire file twice); analyze_lines()
    # only needs one line at a time and logs can be large.
    with args.log_file.open(encoding="utf-8", errors="replace") as handle:
        summary = analyze_lines(line.rstrip("\r\n") for line in handle)
    print(format_summary(summary))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())