#!/usr/bin/env python3
"""Analyze RK3588 media-server face-recognition logs."""

from __future__ import annotations

import argparse
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from pathlib import Path
import re
from typing import Iterable

# --- Line patterns -----------------------------------------------------------
# Patterns that begin with ``\[(\d+)\]`` capture a bracketed integer as group 1;
# the analyzer stores it in fields named ``ts_ms``.

# Startup banner; group 1 is the text inside "(git ...)".
VERSION_RE = re.compile(r"RK3588 Media Server .* \(git ([^)]+)\)")
# Gallery load line; groups: n, dim.
GALLERY_RE = re.compile(r"\[ai_face_recog\] gallery loaded: n=(\d+) dim=(\d+)")
# Recognizer start line; groups: thr_accept, thr_margin, infer_interval_ms,
# shared_target_key.
START_RE = re.compile(
    r"\[ai_face_recog\] start .*?thr_accept=([0-9.\-]+) "
    r"thr_margin=([0-9.\-]+).*?infer_interval_ms=(\d+) "
    r"shared_target_key=([^ ]+)"
)
# Per-frame line; groups: ts, frame, faces_in, recog_items.
FRAME_RE = re.compile(
    r"\[(\d+)\].*\[ai_face_recog\] frame .*frame=(\d+) "
    r"faces_in=(\d+) recog_items=(\d+)"
)
# Per-face match line; groups: ts, frame, status, person_track_id, candidate,
# candidate_id, best_sim, second_sim, sim_margin, and the four bbox integers.
MATCH_RE = re.compile(
    r"\[(\d+)\].*\[ai_face_recog\] match .*frame=(\d+) "
    r"status=(\w+) person_track_id=(-?\d+) candidate=([^\s]+) "
    r"candidate_id=(-?\d+) best_sim=([0-9.\-]+) second_sim=([0-9.\-]+) "
    r"sim_margin=([0-9.\-]+) bbox=\((\d+),(\d+),(\d+),(\d+)\)"
)
# Track association line; groups: ts, frame, source, best_track_id.
ASSOC_RE = re.compile(
    r"\[(\d+)\].*\[ai_face_recog\] track_assoc .*frame=(\d+) "
    r"source=(\w+) .*best_track_id=(-?\d+)"
)
# Alarm line; groups: ts, rule, frame.
ALARM_RE = re.compile(r"\[(\d+)\].*\[ALARM\].* rule=([^\s]+) frame=(\d+)")
TOKEN_RE = re.compile(r"\[ExternalApiAction\] token fetched successfully")
# External upload result; groups: ts, "ok"/"failed", optional alarm_content.
# The lazy skip lives INSIDE the optional group: written as
# ``.*?(?:alarm_content=...)?`` the lazy part matches nothing and the optional
# group matches empty, so alarm_content would only be captured when it
# directly followed "send ok"/"send failed".
SEND_RE = re.compile(
    r"\[(\d+)\].*\[ExternalApiAction\] send (ok|failed)"
    r"(?:.*?alarm_content=([^\s]+))?"
)
# Unknown-candidate diagnostic line; group 2 is the remaining key=value text.
UNKNOWN_CANDIDATE_RE = re.compile(r"\[(\d+)\].*\[alarm\] unknown_candidate (.*)")
# One whitespace-delimited key=value token.
KEY_VALUE_RE = re.compile(r"(\w+)=([^\s]+)")


@dataclass
class MatchItem:
    """One parsed ``[ai_face_recog] match`` log line."""

    ts_ms: int
    frame: int
    status: str
    track_id: int
    candidate: str
    candidate_id: int
    best_sim: float
    second_sim: float
    sim_margin: float
    # The four integers of ``bbox=(a,b,c,d)`` in log order.
    # presumably (x, y, width, height) -- TODO confirm against the server.
    bbox: tuple[int, int, int, int]

    @property
    def area(self) -> int:
        """Return the product of the third and fourth bbox components."""
        return self.bbox[2] * self.bbox[3]


@dataclass
class UnknownCandidateItem:
    """One parsed ``[alarm] unknown_candidate`` log line."""

    ts_ms: int
    frame: int
    status: str
    track_id: int
    candidate: str
    best_sim: float
    area_ratio: float
    aspect: float
    rule_matched: bool
    reject_reason: str
    gate: str
    track_age_ms: int
    # Parsed from a single "hits/required" token in the log line.
    quality_hits: int
    quality_hits_required: int
@dataclass
class LogSummary:
    """Aggregated statistics for one log, filled in by ``analyze_lines``."""

    # Startup values; each stays None until its log line is seen.
    version_git: str | None = None
    gallery_count: int | None = None
    gallery_dim: int | None = None
    threshold_accept: float | None = None
    threshold_margin: float | None = None
    infer_interval_ms: int | None = None
    shared_target_key: str | None = None
    # Totals over "frame" lines.
    frame_count: int = 0
    faces_in_sum: int = 0
    recog_items_sum: int = 0
    # Totals over "match" lines.
    match_total: int = 0
    status_counts: Counter[str] = field(default_factory=Counter)
    candidate_counts: Counter[str] = field(default_factory=Counter)
    known_candidate_counts: Counter[str] = field(default_factory=Counter)
    # Match lines whose person_track_id is negative (all / status "known").
    track_id_missing: int = 0
    known_track_id_missing: int = 0
    # Totals over "track_assoc" lines.
    assoc_total: int = 0
    assoc_source_counts: Counter[str] = field(default_factory=Counter)
    assoc_track_id_missing: int = 0
    # Alarm lines: count per rule, and (rule, frame) pairs in log order.
    alarm_counts: Counter[str] = field(default_factory=Counter)
    alarm_frames: list[tuple[str, int]] = field(default_factory=list)
    # External API lines: token fetches and (status, alarm_content) sends.
    external_token_success: int = 0
    external_send_counts: Counter[str] = field(default_factory=Counter)
    external_sends: list[tuple[str, str]] = field(default_factory=list)
    # status -> metric name -> {"min", "p50", "p90", "max"}.
    quantiles: dict[str, dict[str, dict[str, float | int]]] = field(default_factory=dict)
    # Distinct non-negative track ids seen on match lines, plus the top rows.
    tracks_total: int = 0
    track_summaries: list[dict[str, object]] = field(default_factory=list)
    # Taken from the first and last match line in input order.
    first_match_ts_ms: int | None = None
    last_match_ts_ms: int | None = None
    first_frame: int | None = None
    last_frame: int | None = None
    # Totals over "unknown_candidate" lines.
    unknown_candidate_total: int = 0
    unknown_candidate_rule_matched: int = 0
    unknown_candidate_status_counts: Counter[str] = field(default_factory=Counter)
    unknown_candidate_reject_counts: Counter[str] = field(default_factory=Counter)
    unknown_candidate_gate_counts: Counter[str] = field(default_factory=Counter)
    unknown_candidate_candidate_counts: Counter[str] = field(default_factory=Counter)
    unknown_candidate_track_id_missing: int = 0
    unknown_candidate_quantiles: dict[str, dict[str, float | int]] = field(default_factory=dict)
    unknown_candidate_track_summaries: list[dict[str, object]] = field(default_factory=list)


# Maximum number of rows kept in each per-track summary list.
_MAX_TRACK_ROWS = 12


def _quantiles(values: list[float | int]) -> dict[str, float | int]:
    """Return min/p50/p90/max of *values*, or ``{}`` when *values* is empty.

    Each percentile is the element at index ``round((n - 1) * pos)`` of the
    sorted values; nothing is interpolated.
    """
    if not values:
        return {}
    values = sorted(values)

    def q(pos: float) -> float | int:
        return values[min(len(values) - 1, int(round((len(values) - 1) * pos)))]

    return {"min": values[0], "p50": q(0.5), "p90": q(0.9), "max": values[-1]}


def _pct(count: int, total: int) -> str:
    """Format ``count / total`` as a one-decimal percentage, or ``"n/a"``."""
    return "n/a" if total <= 0 else f"{100.0 * count / total:.1f}%"


def _to_float(text: str) -> float | None:
    """Return ``float(text)``, or None when *text* is not a valid number."""
    try:
        return float(text)
    except ValueError:
        return None


def _parse_unknown_candidate(line: str) -> UnknownCandidateItem | None:
    """Parse an ``unknown_candidate`` line into an item.

    Returns None when the line does not match, carries no key=value tokens,
    or contains a numeric field that cannot be converted.
    """
    m = UNKNOWN_CANDIDATE_RE.search(line)
    if not m:
        return None
    ts_ms = int(m.group(1))
    fields = dict(KEY_VALUE_RE.findall(m.group(2)))
    if not fields:
        return None
    # quality_hits is logged as a single "hits/required" token.
    quality = fields.get("quality_hits", "0/0").split("/", 1)
    try:
        quality_hits = int(quality[0])
        quality_hits_required = int(quality[1]) if len(quality) > 1 else 0
        return UnknownCandidateItem(
            ts_ms=ts_ms,
            frame=int(fields.get("frame", "0")),
            status=fields.get("status", ""),
            track_id=int(fields.get("track_id", "-1")),
            candidate=fields.get("candidate", ""),
            best_sim=float(fields.get("best_sim", "0")),
            area_ratio=float(fields.get("area_ratio", "0")),
            aspect=float(fields.get("aspect", "0")),
            rule_matched=fields.get("rule_matched", "false") == "true",
            reject_reason=fields.get("reject_reason", ""),
            gate=fields.get("gate", ""),
            track_age_ms=int(fields.get("track_age_ms", "0")),
            quality_hits=quality_hits,
            quality_hits_required=quality_hits_required,
        )
    except ValueError:
        return None


def _parse_match(m: re.Match[str]) -> MatchItem | None:
    """Build a MatchItem from a MATCH_RE match, or None on a malformed number.

    The similarity captures accept any run of digits, dots and minus signs,
    so a corrupted line (e.g. ``best_sim=0.80.91``) can match the pattern yet
    fail conversion; such lines are skipped instead of aborting the analysis.
    """
    try:
        return MatchItem(
            ts_ms=int(m.group(1)),
            frame=int(m.group(2)),
            status=m.group(3),
            track_id=int(m.group(4)),
            candidate=m.group(5),
            candidate_id=int(m.group(6)),
            best_sim=float(m.group(7)),
            second_sim=float(m.group(8)),
            sim_margin=float(m.group(9)),
            bbox=(int(m.group(10)), int(m.group(11)), int(m.group(12)), int(m.group(13))),
        )
    except ValueError:
        return None


def _summarize_tracks(by_track: dict[int, list[MatchItem]]) -> list[dict[str, object]]:
    """Return the top per-track rows for match lines, most "known" hits first."""
    rows = []
    for track_id, items in by_track.items():
        status_counts = Counter(item.status for item in items)
        cand_counts = Counter(item.candidate for item in items)
        rows.append(
            {
                "track_id": track_id,
                "count": len(items),
                "status_counts": dict(status_counts),
                "top_candidates": cand_counts.most_common(3),
                "best_max": max(item.best_sim for item in items),
                "first_frame": items[0].frame,
                "last_frame": items[-1].frame,
            }
        )
    rows.sort(
        key=lambda row: (-row["status_counts"].get("known", 0), -row["count"], row["track_id"]),
    )
    return rows[:_MAX_TRACK_ROWS]


def _summarize_unknown_tracks(
    by_track: dict[int, list[UnknownCandidateItem]],
) -> list[dict[str, object]]:
    """Return the top per-track rows built from rule-matched unknown candidates."""
    rows = []
    for track_id, items in by_track.items():
        matched_items = [item for item in items if item.rule_matched]
        if not matched_items:
            # Tracks with no rule-matched candidate are left out entirely.
            continue
        rows.append(
            {
                "track_id": track_id,
                "matched_count": len(matched_items),
                "max_quality_hits": max(item.quality_hits for item in matched_items),
                "max_track_age_ms": max(item.track_age_ms for item in matched_items),
                "best_sim_max": max(item.best_sim for item in matched_items),
                "gates": Counter(item.gate for item in matched_items).most_common(3),
                "candidates": Counter(item.candidate for item in matched_items).most_common(3),
            }
        )
    rows.sort(
        key=lambda row: (
            -row["max_quality_hits"],
            -row["matched_count"],
            -row["max_track_age_ms"],
            row["track_id"],
        ),
    )
    return rows[:_MAX_TRACK_ROWS]


def analyze_lines(lines: Iterable[str]) -> LogSummary:
    """Scan log *lines* and return the aggregated LogSummary.

    Every line is tested against every pattern. ``start`` and ``match`` lines
    whose numeric fields cannot be converted are skipped, the same way
    ``unknown_candidate`` lines are, so one corrupted line cannot abort the
    whole report.
    """
    summary = LogSummary()
    matches: list[MatchItem] = []
    by_track: dict[int, list[MatchItem]] = defaultdict(list)
    unknown_candidates: list[UnknownCandidateItem] = []
    unknown_candidates_by_track: dict[int, list[UnknownCandidateItem]] = defaultdict(list)

    for line in lines:
        # Only the first version banner is kept.
        if summary.version_git is None and (m := VERSION_RE.search(line)):
            summary.version_git = m.group(1)

        if m := GALLERY_RE.search(line):
            summary.gallery_count = int(m.group(1))
            summary.gallery_dim = int(m.group(2))

        if m := START_RE.search(line):
            accept = _to_float(m.group(1))
            margin = _to_float(m.group(2))
            if accept is not None and margin is not None:
                summary.threshold_accept = accept
                summary.threshold_margin = margin
                summary.infer_interval_ms = int(m.group(3))
                summary.shared_target_key = m.group(4)

        if m := FRAME_RE.search(line):
            summary.frame_count += 1
            summary.faces_in_sum += int(m.group(3))
            summary.recog_items_sum += int(m.group(4))

        if m := ASSOC_RE.search(line):
            summary.assoc_total += 1
            summary.assoc_source_counts[m.group(3)] += 1
            if int(m.group(4)) < 0:
                summary.assoc_track_id_missing += 1

        if m := ALARM_RE.search(line):
            rule = m.group(2)
            summary.alarm_counts[rule] += 1
            summary.alarm_frames.append((rule, int(m.group(3))))

        if TOKEN_RE.search(line):
            summary.external_token_success += 1

        if m := SEND_RE.search(line):
            status = m.group(2)
            summary.external_send_counts[status] += 1
            summary.external_sends.append((status, m.group(3) or ""))

        unknown = _parse_unknown_candidate(line)
        if unknown is not None:
            unknown_candidates.append(unknown)
            summary.unknown_candidate_status_counts[unknown.status] += 1
            summary.unknown_candidate_reject_counts[unknown.reject_reason] += 1
            summary.unknown_candidate_gate_counts[unknown.gate] += 1
            summary.unknown_candidate_candidate_counts[unknown.candidate] += 1
            if unknown.rule_matched:
                summary.unknown_candidate_rule_matched += 1
            if unknown.track_id < 0:
                summary.unknown_candidate_track_id_missing += 1
            else:
                unknown_candidates_by_track[unknown.track_id].append(unknown)

        if m := MATCH_RE.search(line):
            item = _parse_match(m)
            if item is None:
                continue
            matches.append(item)
            summary.status_counts[item.status] += 1
            summary.candidate_counts[item.candidate] += 1
            if item.status == "known":
                summary.known_candidate_counts[item.candidate] += 1
            if item.track_id < 0:
                summary.track_id_missing += 1
                if item.status == "known":
                    summary.known_track_id_missing += 1
            else:
                by_track[item.track_id].append(item)

    summary.match_total = len(matches)
    summary.unknown_candidate_total = len(unknown_candidates)
    if matches:
        summary.first_match_ts_ms = matches[0].ts_ms
        summary.last_match_ts_ms = matches[-1].ts_ms
        summary.first_frame = matches[0].frame
        summary.last_frame = matches[-1].frame

    for status in ("known", "uncertain"):
        status_items = [item for item in matches if item.status == status]
        summary.quantiles[status] = {
            "best_sim": _quantiles([item.best_sim for item in status_items]),
            "sim_margin": _quantiles([item.sim_margin for item in status_items]),
            "bbox_area": _quantiles([item.area for item in status_items]),
        }

    summary.tracks_total = len(by_track)
    summary.track_summaries = _summarize_tracks(by_track)

    matched_unknown_candidates = [item for item in unknown_candidates if item.rule_matched]
    summary.unknown_candidate_quantiles = {
        "all_best_sim": _quantiles([item.best_sim for item in unknown_candidates]),
        "all_area_ratio": _quantiles([item.area_ratio for item in unknown_candidates]),
        "all_aspect": _quantiles([item.aspect for item in unknown_candidates]),
        "matched_best_sim": _quantiles([item.best_sim for item in matched_unknown_candidates]),
        "matched_area_ratio": _quantiles([item.area_ratio for item in matched_unknown_candidates]),
        "matched_track_age_ms": _quantiles([item.track_age_ms for item in matched_unknown_candidates]),
        "matched_quality_hits": _quantiles([item.quality_hits for item in matched_unknown_candidates]),
    }
    summary.unknown_candidate_track_summaries = _summarize_unknown_tracks(
        unknown_candidates_by_track
    )
    return summary


def format_summary(summary: LogSummary) -> str:
    """Render *summary* as a multi-line, human-readable text report."""

    def shown(value: object) -> object:
        # Unlike ``or``, this keeps 0 / 0.0 visible; only None is "unknown".
        return value if value is not None else "unknown"

    lines: list[str] = []
    lines.append("Face Recognition Log Summary")
    lines.append("")

    lines.append("Startup:")
    lines.append(f"- git: {summary.version_git or 'unknown'}")
    lines.append(f"- gallery: n={shown(summary.gallery_count)} dim={shown(summary.gallery_dim)}")
    lines.append(
        f"- threshold: accept={shown(summary.threshold_accept)} "
        f"margin={shown(summary.threshold_margin)}"
    )
    lines.append(f"- infer_interval_ms: {shown(summary.infer_interval_ms)}")
    lines.append(f"- shared_target_key: {summary.shared_target_key or 'unknown'}")
    lines.append("")

    lines.append("Recognition:")
    if summary.first_match_ts_ms is not None and summary.last_match_ts_ms is not None:
        lines.append(f"- match_time_span_ms: {summary.last_match_ts_ms - summary.first_match_ts_ms}")
    if summary.first_frame is not None and summary.last_frame is not None:
        lines.append(f"- frame_span: {summary.first_frame} -> {summary.last_frame}")
    lines.append(f"- frames_with_face_recog: {summary.frame_count}")
    lines.append(f"- faces_in_sum: {summary.faces_in_sum}")
    lines.append(f"- recog_items_sum: {summary.recog_items_sum}")
    lines.append(f"- match_total: {summary.match_total}")
    for status in ("known", "uncertain"):
        count = summary.status_counts.get(status, 0)
        lines.append(f"- {status}: {count} ({_pct(count, summary.match_total)})")
    lines.append(
        f"- person_track_id=-1: {summary.track_id_missing} "
        f"({_pct(summary.track_id_missing, summary.match_total)})"
    )
    lines.append(
        f"- known person_track_id=-1: {summary.known_track_id_missing}"
        f"/{summary.status_counts.get('known', 0)}"
    )
    lines.append("")

    lines.append("Candidates:")
    lines.append(f"- top candidates: {summary.candidate_counts.most_common(10)}")
    lines.append(f"- known candidates: {summary.known_candidate_counts.most_common()}")
    lines.append("")

    lines.append("Quality Quantiles:")
    for status in ("known", "uncertain"):
        per_status = summary.quantiles.get(status, {})
        lines.append(f"- {status} best_sim: {per_status.get('best_sim', {})}")
        lines.append(f"- {status} sim_margin: {per_status.get('sim_margin', {})}")
        lines.append(f"- {status} bbox_area: {per_status.get('bbox_area', {})}")
    lines.append("")

    lines.append("Track Association:")
    lines.append(f"- assoc_total: {summary.assoc_total}")
    lines.append(f"- assoc_source_counts: {summary.assoc_source_counts.most_common()}")
    lines.append(f"- assoc_track_id=-1: {summary.assoc_track_id_missing}")
    lines.append(f"- tracks_total: {summary.tracks_total}")
    for row in summary.track_summaries:
        lines.append(
            "- track {track_id}: n={count} status={status_counts} candidates={top_candidates} "
            "best_max={best_max:.2f} frames={first_frame}->{last_frame}".format(**row)
        )
    lines.append("")

    lines.append("Unknown Face Candidate Diagnostics:")
    lines.append(f"- unknown_candidate_total: {summary.unknown_candidate_total}")
    lines.append(
        f"- rule_matched: {summary.unknown_candidate_rule_matched} "
        f"({_pct(summary.unknown_candidate_rule_matched, summary.unknown_candidate_total)})"
    )
    lines.append(
        f"- track_id=-1: {summary.unknown_candidate_track_id_missing} "
        f"({_pct(summary.unknown_candidate_track_id_missing, summary.unknown_candidate_total)})"
    )
    lines.append(f"- status_counts: {summary.unknown_candidate_status_counts.most_common()}")
    lines.append(f"- reject_reason_counts: {summary.unknown_candidate_reject_counts.most_common()}")
    lines.append(f"- gate_counts: {summary.unknown_candidate_gate_counts.most_common()}")
    lines.append(f"- candidate_counts: {summary.unknown_candidate_candidate_counts.most_common(10)}")
    for name, values in summary.unknown_candidate_quantiles.items():
        lines.append(f"- {name}: {values}")
    for row in summary.unknown_candidate_track_summaries:
        lines.append(
            "- unknown track {track_id}: matched={matched_count} max_quality_hits={max_quality_hits} "
            "max_age_ms={max_track_age_ms} best_sim_max={best_sim_max:.3f} gates={gates} candidates={candidates}".format(**row)
        )
    lines.append("")

    lines.append("Alarms And Uploads:")
    lines.append(f"- alarms: {sum(summary.alarm_counts.values())} {summary.alarm_counts.most_common()}")
    lines.append(f"- alarm_frames: {summary.alarm_frames}")
    lines.append(f"- external_token_success: {summary.external_token_success}")
    lines.append(
        f"- external_sends: {sum(summary.external_send_counts.values())} "
        f"{summary.external_send_counts.most_common()}"
    )
    return "\n".join(lines)


def main() -> int:
    """Parse the command line, analyze the given log file and print the report."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("log_file", type=Path, help="Path to media-server log file")
    args = parser.parse_args()
    # Undecodable bytes are replaced rather than raising.
    lines = args.log_file.read_text(encoding="utf-8", errors="replace").splitlines()
    print(format_summary(analyze_lines(lines)))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())