from __future__ import annotations import json import os from dataclasses import dataclass from typing import Any, Dict, List, Optional, Sequence, Tuple import numpy as np from .types import Detection _CANONICAL_LMK_ORDER = ["left_eye", "right_eye", "nose", "left_mouth", "right_mouth"] @dataclass(frozen=True) class _ResizeMeta: orig_w: int orig_h: int in_w: int in_h: int mode: str # none|stretch|keep_ratio scale_x: float scale_y: float pad_x: float pad_y: float def load_det_outputs_config(s: str) -> Dict[str, Any]: """Accept JSON string or a JSON file path.""" if s is None: raise ValueError("det_outputs_config is required (Option B)") p = os.path.abspath(s) if os.path.isfile(p): with open(p, "r", encoding="utf-8") as f: return json.load(f) return json.loads(s) class OnnxFaceDetector: def __init__( self, model_path: str, det_outputs_config: Dict[str, Any], score_thresh: float = 0.0, pick_face: str = "largest", ) -> None: self.model_path = model_path self.cfg = det_outputs_config self.score_thresh = float(score_thresh) self.pick_face = pick_face self._sess = None self._input_name: Optional[str] = None self._output_names: Optional[List[str]] = None if pick_face not in ("largest", "first", "highest_score"): raise ValueError("pick_face must be one of: largest|first|highest_score") def _ensure_session(self) -> None: if self._sess is not None: return try: import onnxruntime as ort except Exception as e: # pragma: no cover raise RuntimeError("onnxruntime is required for detection") from e self._sess = ort.InferenceSession(self.model_path, providers=["CPUExecutionProvider"]) self._input_name = self._sess.get_inputs()[0].name self._output_names = [o.name for o in self._sess.get_outputs()] def detect_one(self, img_bgr: np.ndarray) -> Optional[Detection]: dets = self.detect_all(img_bgr) if not dets: return None if self.pick_face == "first": return dets[0] if self.pick_face == "highest_score": return max(dets, key=lambda d: float(d.score)) return max(dets, key=lambda d: float((d.bbox_xyxy[2] - d.bbox_xyxy[0]) * (d.bbox_xyxy[3] - d.bbox_xyxy[1]))) def detect_all(self, img_bgr: np.ndarray) -> List[Detection]: """Return detections in original image coords.""" import cv2 self._ensure_session() if img_bgr is None or img_bgr.ndim != 3 or img_bgr.shape[2] != 3: raise ValueError("img_bgr must be HxWx3") inp_cfg = self.cfg.get("input", {}) color = str(inp_cfg.get("color", "BGR")).upper() img = img_bgr if color == "RGB": img = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) elif color != "BGR": raise ValueError(f"unsupported input.color: {color}") inp, meta = self._preprocess(img, inp_cfg) outputs = self._sess.run(None, {self._input_name: inp}) out_by_name = {name: val for name, val in zip(self._output_names, outputs)} dets = self._parse_outputs(out_by_name, meta) if self.score_thresh > 0: dets = [d for d in dets if float(d.score) >= self.score_thresh] return dets def _preprocess(self, img_hwc: np.ndarray, inp_cfg: Dict[str, Any]) -> Tuple[np.ndarray, _ResizeMeta]: import cv2 h, w = int(img_hwc.shape[0]), int(img_hwc.shape[1]) resize_cfg = inp_cfg.get("resize", None) if not resize_cfg: in_w, in_h = w, h meta = _ResizeMeta(orig_w=w, orig_h=h, in_w=w, in_h=h, mode="none", scale_x=1.0, scale_y=1.0, pad_x=0.0, pad_y=0.0) resized = img_hwc else: size = resize_cfg.get("size") if not (isinstance(size, (list, tuple)) and len(size) == 2): raise ValueError("input.resize.size must be [w,h]") in_w, in_h = int(size[0]), int(size[1]) mode = str(resize_cfg.get("mode", "stretch")).lower() if mode == "stretch": resized = cv2.resize(img_hwc, (in_w, in_h), interpolation=cv2.INTER_LINEAR) meta = _ResizeMeta( orig_w=w, orig_h=h, in_w=in_w, in_h=in_h, mode="stretch", scale_x=in_w / float(w), scale_y=in_h / float(h), pad_x=0.0, pad_y=0.0, ) elif mode == "keep_ratio": scale = min(in_w / float(w), in_h / float(h)) new_w = int(round(w * scale)) new_h = int(round(h * scale)) resized_small = cv2.resize(img_hwc, (new_w, new_h), interpolation=cv2.INTER_LINEAR) canvas = np.zeros((in_h, in_w, 3), dtype=resized_small.dtype) pad_x = (in_w - new_w) // 2 pad_y = (in_h - new_h) // 2 canvas[pad_y : pad_y + new_h, pad_x : pad_x + new_w] = resized_small resized = canvas meta = _ResizeMeta( orig_w=w, orig_h=h, in_w=in_w, in_h=in_h, mode="keep_ratio", scale_x=scale, scale_y=scale, pad_x=float(pad_x), pad_y=float(pad_y), ) else: raise ValueError("input.resize.mode must be stretch|keep_ratio") dtype = str(inp_cfg.get("dtype", "float32")).lower() x = resized if dtype in ("float32", "fp32"): x = x.astype(np.float32) elif dtype in ("uint8",): x = x.astype(np.uint8) else: raise ValueError(f"unsupported input.dtype: {dtype}") norm = inp_cfg.get("normalize", None) if norm and dtype in ("float32", "fp32"): scale = float(norm.get("scale", 1.0)) mean = norm.get("mean", [0.0, 0.0, 0.0]) std = norm.get("std", [1.0, 1.0, 1.0]) mean = np.asarray(mean, dtype=np.float32).reshape(1, 1, 3) std = np.asarray(std, dtype=np.float32).reshape(1, 1, 3) x = x * scale x = (x - mean) / std layout = str(inp_cfg.get("layout", "NCHW")).upper() if layout == "NHWC": x = np.expand_dims(x, axis=0) elif layout == "NCHW": x = np.transpose(x, (2, 0, 1)) x = np.expand_dims(x, axis=0) else: raise ValueError("input.layout must be NCHW|NHWC") return x, meta def _parse_outputs(self, out_by_name: Dict[str, Any], meta: _ResizeMeta) -> List[Detection]: decoder_cfg = self.cfg.get("decoder") if decoder_cfg and str(decoder_cfg.get("type", "")).lower() == "retinaface": return self._parse_outputs_retinaface(out_by_name, meta, decoder_cfg) out_cfg = self.cfg.get("outputs", {}) bbox_cfg = out_cfg.get("bbox") lmk_cfg = out_cfg.get("landmarks") score_cfg = out_cfg.get("score") if not bbox_cfg or not lmk_cfg: raise ValueError("det_outputs_config must include either decoder.type=retinaface OR outputs.bbox+outputs.landmarks") bbox_arr = self._select_output(out_by_name, bbox_cfg) lmk_arr = self._select_output(out_by_name, lmk_cfg) score_arr = self._select_output(out_by_name, score_cfg) if score_cfg else None bbox = self._to_Nx4(bbox_arr) lmks = self._to_landmarks(lmk_arr, lmk_cfg) if score_arr is None: scores = np.ones((bbox.shape[0],), dtype=np.float32) else: scores = np.asarray(score_arr, dtype=np.float32) scores = scores.reshape(-1) if scores.size == bbox.shape[0] * 1: scores = scores[: bbox.shape[0]] elif scores.size != bbox.shape[0]: raise ValueError(f"score count mismatch: scores={scores.size}, bbox={bbox.shape[0]}") bbox_format = str(bbox_cfg.get("format", "xyxy")).lower() bbox_norm = bool(bbox_cfg.get("normalized", False)) lmk_norm = bool(lmk_cfg.get("normalized", False)) if bbox_norm: bbox = bbox.copy() bbox[:, [0, 2]] *= float(meta.in_w) bbox[:, [1, 3]] *= float(meta.in_h) if bbox_format == "xywh": bbox = bbox.copy() bbox[:, 2] = bbox[:, 0] + bbox[:, 2] bbox[:, 3] = bbox[:, 1] + bbox[:, 3] elif bbox_format != "xyxy": raise ValueError("outputs.bbox.format must be xyxy|xywh") if lmk_norm: lmks = lmks.copy() lmks[:, :, 0] *= float(meta.in_w) lmks[:, :, 1] *= float(meta.in_h) bbox, lmks = self._map_to_original(bbox, lmks, meta) bbox = self._clip_bbox(bbox, meta.orig_w, meta.orig_h) dets: List[Detection] = [] for i in range(bbox.shape[0]): dets.append( Detection( bbox_xyxy=bbox[i].astype(np.float32), landmarks5=lmks[i].astype(np.float32), score=float(scores[i]), ) ) return dets def _parse_outputs_retinaface(self, out_by_name: Dict[str, Any], meta: _ResizeMeta, decoder_cfg: Dict[str, Any]) -> List[Detection]: out_cfg = self.cfg.get("outputs", {}) loc_spec = out_cfg.get("loc") or out_cfg.get("bbox") conf_spec = out_cfg.get("conf") or out_cfg.get("score") lmk_spec = out_cfg.get("landmarks") if not loc_spec or not conf_spec or not lmk_spec: raise ValueError("retinaface decoder requires outputs.loc, outputs.conf, outputs.landmarks") loc = np.asarray(self._select_output(out_by_name, loc_spec), dtype=np.float32) conf = np.asarray(self._select_output(out_by_name, conf_spec), dtype=np.float32) landms = np.asarray(self._select_output(out_by_name, lmk_spec), dtype=np.float32) if loc.ndim == 3 and loc.shape[0] == 1: loc = loc[0] if conf.ndim == 3 and conf.shape[0] == 1: conf = conf[0] if landms.ndim == 3 and landms.shape[0] == 1: landms = landms[0] if loc.ndim != 2 or loc.shape[1] != 4: raise ValueError(f"retinaface loc must be [N,4] (or [1,N,4]); got {loc.shape}") if conf.ndim != 2 or conf.shape[1] != 2: raise ValueError(f"retinaface conf must be [N,2] (or [1,N,2]); got {conf.shape}") if landms.ndim != 2 or landms.shape[1] != 10: raise ValueError(f"retinaface landmarks must be [N,10] (or [1,N,10]); got {landms.shape}") steps = decoder_cfg.get("steps", [8, 16, 32]) min_sizes = decoder_cfg.get("min_sizes", [[16, 32], [64, 128], [256, 512]]) variances = decoder_cfg.get("variances", [0.1, 0.2]) score_index = int(decoder_cfg.get("score_index", 1)) nms_iou = float(decoder_cfg.get("nms_iou_thresh", 0.4)) top_k = int(decoder_cfg.get("top_k", 5000)) keep_top_k = int(decoder_cfg.get("keep_top_k", 750)) prob_mode = str(decoder_cfg.get("conf_mode", "auto")).lower() # auto|prob|logits priors = self._retinaface_priors(meta.in_w, meta.in_h, steps=steps, min_sizes=min_sizes) if priors.shape[0] != loc.shape[0]: raise ValueError(f"prior count mismatch: priors={priors.shape[0]} loc={loc.shape[0]}") scores = self._retinaface_scores(conf, score_index=score_index, mode=prob_mode) # filter keep = np.where(scores >= float(self.score_thresh))[0] if self.score_thresh > 0 else np.arange(scores.size) if keep.size == 0: return [] if top_k > 0 and keep.size > top_k: idx = np.argsort(scores[keep])[::-1][:top_k] keep = keep[idx] pri = priors[keep] loc_k = loc[keep] lmk_k = landms[keep].reshape(-1, 5, 2) sc_k = scores[keep] bbox_in, lmks_in = self._retinaface_decode(pri, loc_k, lmk_k, meta.in_w, meta.in_h, variances=variances) order = np.argsort(sc_k)[::-1] bbox_in = bbox_in[order] lmks_in = lmks_in[order] sc_k = sc_k[order] keep_nms = self._nms_xyxy(bbox_in, sc_k, iou_thresh=nms_iou) if keep_top_k > 0: keep_nms = keep_nms[:keep_top_k] bbox_in = bbox_in[keep_nms] lmks_in = lmks_in[keep_nms] sc_k = sc_k[keep_nms] bbox, lmks = self._map_to_original(bbox_in, lmks_in, meta) bbox = self._clip_bbox(bbox, meta.orig_w, meta.orig_h) dets: List[Detection] = [] for i in range(bbox.shape[0]): dets.append(Detection(bbox_xyxy=bbox[i].astype(np.float32), landmarks5=lmks[i].astype(np.float32), score=float(sc_k[i]))) return dets def _retinaface_priors(self, in_w: int, in_h: int, steps: Sequence[int], min_sizes: Sequence[Sequence[int]]) -> np.ndarray: from itertools import product priors: List[List[float]] = [] for k, step in enumerate(steps): fm_h = int(np.ceil(in_h / float(step))) fm_w = int(np.ceil(in_w / float(step))) for i, j in product(range(fm_h), range(fm_w)): for ms in min_sizes[k]: s_kx = ms / float(in_w) s_ky = ms / float(in_h) cx = (j + 0.5) * step / float(in_w) cy = (i + 0.5) * step / float(in_h) priors.append([cx, cy, s_kx, s_ky]) return np.asarray(priors, dtype=np.float32) def _retinaface_scores(self, conf: np.ndarray, score_index: int, mode: str) -> np.ndarray: x = conf.astype(np.float32) if mode == "prob": prob = x elif mode == "logits": prob = self._softmax(x, axis=1) else: # auto row_sum = x.sum(axis=1) looks_prob = (x.min() >= 0.0) and (x.max() <= 1.0) and (np.mean(np.abs(row_sum - 1.0)) < 1e-2) prob = x if looks_prob else self._softmax(x, axis=1) if score_index < 0 or score_index >= prob.shape[1]: raise ValueError(f"score_index out of range: {score_index}") return prob[:, score_index] def _retinaface_decode( self, priors: np.ndarray, loc: np.ndarray, landms: np.ndarray, in_w: int, in_h: int, variances: Sequence[float], ) -> Tuple[np.ndarray, np.ndarray]: v0 = float(variances[0]) v1 = float(variances[1]) pri_c = priors[:, 0:2] pri_s = priors[:, 2:4] boxes_c = pri_c + loc[:, 0:2] * v0 * pri_s boxes_s = pri_s * np.exp(loc[:, 2:4] * v1) boxes = np.concatenate([boxes_c - boxes_s / 2.0, boxes_c + boxes_s / 2.0], axis=1) boxes[:, [0, 2]] *= float(in_w) boxes[:, [1, 3]] *= float(in_h) lm = pri_c[:, None, :] + landms * v0 * pri_s[:, None, :] lm[:, :, 0] *= float(in_w) lm[:, :, 1] *= float(in_h) return boxes.astype(np.float32), lm.astype(np.float32) def _softmax(self, x: np.ndarray, axis: int = -1) -> np.ndarray: x = x.astype(np.float32) m = np.max(x, axis=axis, keepdims=True) e = np.exp(x - m) s = np.sum(e, axis=axis, keepdims=True) return e / s def _nms_xyxy(self, boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> List[int]: b = boxes.astype(np.float32) s = scores.astype(np.float32) x1 = b[:, 0] y1 = b[:, 1] x2 = b[:, 2] y2 = b[:, 3] areas = np.maximum(0.0, x2 - x1) * np.maximum(0.0, y2 - y1) order = np.argsort(s)[::-1] keep: List[int] = [] while order.size > 0: i = int(order[0]) keep.append(i) if order.size == 1: break rest = order[1:] xx1 = np.maximum(x1[i], x1[rest]) yy1 = np.maximum(y1[i], y1[rest]) xx2 = np.minimum(x2[i], x2[rest]) yy2 = np.minimum(y2[i], y2[rest]) w = np.maximum(0.0, xx2 - xx1) h = np.maximum(0.0, yy2 - yy1) inter = w * h union = areas[i] + areas[rest] - inter iou = np.where(union > 0, inter / union, 0.0) inds = np.where(iou <= float(iou_thresh))[0] order = rest[inds] return keep def _select_output(self, out_by_name: Dict[str, Any], spec: Optional[Dict[str, Any]]) -> Any: if spec is None: return None if "name" in spec: name = spec["name"] if name not in out_by_name: raise KeyError(f"output not found: {name}") return out_by_name[name] if "index" in spec: idx = int(spec["index"]) keys = list(out_by_name.keys()) if idx < 0 or idx >= len(keys): raise IndexError(f"output index out of range: {idx}") return out_by_name[keys[idx]] raise ValueError("output spec must include name or index") def _to_Nx4(self, arr: Any) -> np.ndarray: x = np.asarray(arr) if x.ndim == 3 and x.shape[0] == 1: x = x[0] if x.ndim != 2 or x.shape[1] != 4: raise ValueError(f"bbox output must be [N,4] (or [1,N,4]); got {x.shape}") return x.astype(np.float32) def _to_landmarks(self, arr: Any, lmk_cfg: Dict[str, Any]) -> np.ndarray: x = np.asarray(arr) if x.ndim == 4 and x.shape[0] == 1: x = x[0] if x.ndim == 3 and x.shape[0] == 1: x = x[0] layout = str(lmk_cfg.get("layout", "flat10")).lower() if layout == "flat10": if x.ndim != 2 or x.shape[1] != 10: raise ValueError(f"landmarks flat10 must be [N,10]; got {x.shape}") x = x.reshape(-1, 5, 2) elif layout in ("5x2", "five_two"): if x.ndim != 3 or x.shape[1:] != (5, 2): raise ValueError(f"landmarks 5x2 must be [N,5,2]; got {x.shape}") else: raise ValueError("outputs.landmarks.layout must be flat10|5x2") order = lmk_cfg.get("order") if order: x = self._reorder_landmarks(x, order) return x.astype(np.float32) def _reorder_landmarks(self, lmks: np.ndarray, order: Sequence[str]) -> np.ndarray: order = [str(o) for o in order] if sorted(order) != sorted(_CANONICAL_LMK_ORDER): raise ValueError(f"outputs.landmarks.order must be a permutation of {_CANONICAL_LMK_ORDER}") idx = {name: i for i, name in enumerate(order)} take = [idx[name] for name in _CANONICAL_LMK_ORDER] return lmks[:, take, :] def _map_to_original(self, bbox_xyxy_in: np.ndarray, lmks_in: np.ndarray, meta: _ResizeMeta) -> Tuple[np.ndarray, np.ndarray]: if meta.mode == "none": return bbox_xyxy_in, lmks_in if meta.mode == "stretch": sx = meta.scale_x sy = meta.scale_y bbox = bbox_xyxy_in.copy() bbox[:, [0, 2]] /= sx bbox[:, [1, 3]] /= sy lmks = lmks_in.copy() lmks[:, :, 0] /= sx lmks[:, :, 1] /= sy return bbox, lmks if meta.mode == "keep_ratio": s = meta.scale_x px = meta.pad_x py = meta.pad_y bbox = bbox_xyxy_in.copy() bbox[:, [0, 2]] = (bbox[:, [0, 2]] - px) / s bbox[:, [1, 3]] = (bbox[:, [1, 3]] - py) / s lmks = lmks_in.copy() lmks[:, :, 0] = (lmks[:, :, 0] - px) / s lmks[:, :, 1] = (lmks[:, :, 1] - py) / s return bbox, lmks raise ValueError(f"unknown resize mode: {meta.mode}") def _clip_bbox(self, bbox: np.ndarray, w: int, h: int) -> np.ndarray: b = bbox.copy() b[:, 0] = np.clip(b[:, 0], 0, w - 1) b[:, 1] = np.clip(b[:, 1], 0, h - 1) b[:, 2] = np.clip(b[:, 2], 0, w - 1) b[:, 3] = np.clip(b[:, 3], 0, h - 1) return b