# Source: AddFaceTo3588/gallery_builder/detector.py
# Snapshot metadata: 2026-01-08 13:46:50 +08:00 — 515 lines, 20 KiB, Python.
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Tuple
import numpy as np
from .types import Detection
# Canonical 5-point landmark order used throughout this module; decoder
# outputs declaring a different order are permuted to match (see
# OnnxFaceDetector._reorder_landmarks).
_CANONICAL_LMK_ORDER = ["left_eye", "right_eye", "nose", "left_mouth", "right_mouth"]
@dataclass(frozen=True)
class _ResizeMeta:
    """Record of how _preprocess resized an image.

    Carries everything needed to map detector-input coordinates back to
    the original image (see _map_to_original).
    """

    orig_w: int  # original image width in pixels
    orig_h: int  # original image height in pixels
    in_w: int  # detector input width after resize
    in_h: int  # detector input height after resize
    mode: str  # none|stretch|keep_ratio
    scale_x: float  # x scale original -> input (equals scale_y in keep_ratio)
    scale_y: float  # y scale original -> input
    pad_x: float  # left padding added in keep_ratio mode; 0.0 otherwise
    pad_y: float  # top padding added in keep_ratio mode; 0.0 otherwise
def load_det_outputs_config(s: str) -> Dict[str, Any]:
    """Load the detector output-mapping config (Option B).

    Accepts either a JSON object string or a filesystem path to a JSON file.

    Args:
        s: JSON text, or a path to a file containing JSON.

    Returns:
        The parsed configuration dictionary.

    Raises:
        ValueError: if ``s`` is None, the JSON is invalid, or the parsed
            document is not a JSON object (dict).
    """
    if s is None:
        raise ValueError("det_outputs_config is required (Option B)")
    p = os.path.abspath(s)
    if os.path.isfile(p):
        with open(p, "r", encoding="utf-8") as f:
            cfg = json.load(f)
    else:
        cfg = json.loads(s)
    # The rest of the module indexes into the config with dict .get(); a JSON
    # array/scalar would otherwise fail much later with a confusing
    # AttributeError, so reject it up front.
    if not isinstance(cfg, dict):
        raise ValueError("det_outputs_config must be a JSON object")
    return cfg
class OnnxFaceDetector:
def __init__(
self,
model_path: str,
det_outputs_config: Dict[str, Any],
score_thresh: float = 0.0,
pick_face: str = "largest",
) -> None:
self.model_path = model_path
self.cfg = det_outputs_config
self.score_thresh = float(score_thresh)
self.pick_face = pick_face
self._sess = None
self._input_name: Optional[str] = None
self._output_names: Optional[List[str]] = None
if pick_face not in ("largest", "first", "highest_score"):
raise ValueError("pick_face must be one of: largest|first|highest_score")
def _ensure_session(self) -> None:
    """Create the onnxruntime CPU session and cache I/O names (idempotent)."""
    if self._sess is not None:
        return
    try:
        import onnxruntime as ort
    except Exception as e:  # pragma: no cover
        raise RuntimeError("onnxruntime is required for detection") from e
    sess = ort.InferenceSession(self.model_path, providers=["CPUExecutionProvider"])
    self._sess = sess
    # Single-input model assumed: bind the first input's name once.
    self._input_name = sess.get_inputs()[0].name
    self._output_names = [out.name for out in sess.get_outputs()]
def detect_one(self, img_bgr: np.ndarray) -> Optional[Detection]:
dets = self.detect_all(img_bgr)
if not dets:
return None
if self.pick_face == "first":
return dets[0]
if self.pick_face == "highest_score":
return max(dets, key=lambda d: float(d.score))
return max(dets, key=lambda d: float((d.bbox_xyxy[2] - d.bbox_xyxy[0]) * (d.bbox_xyxy[3] - d.bbox_xyxy[1])))
def detect_all(self, img_bgr: np.ndarray) -> List[Detection]:
    """Run the detector and return all detections in original-image coordinates.

    Raises:
        ValueError: if the input is not HxWx3 or input.color is unsupported.
    """
    import cv2

    self._ensure_session()
    if img_bgr is None or img_bgr.ndim != 3 or img_bgr.shape[2] != 3:
        raise ValueError("img_bgr must be HxWx3")
    inp_cfg = self.cfg.get("input", {})
    color = str(inp_cfg.get("color", "BGR")).upper()
    if color == "RGB":
        img = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
    elif color == "BGR":
        img = img_bgr
    else:
        raise ValueError(f"unsupported input.color: {color}")
    tensor, meta = self._preprocess(img, inp_cfg)
    raw = self._sess.run(None, {self._input_name: tensor})
    named = dict(zip(self._output_names, raw))
    dets = self._parse_outputs(named, meta)
    # Post-filter by score (retinaface path also filters internally; this
    # covers the generic path and is a no-op duplicate otherwise).
    if self.score_thresh > 0:
        dets = [d for d in dets if float(d.score) >= self.score_thresh]
    return dets
def _preprocess(self, img_hwc: np.ndarray, inp_cfg: Dict[str, Any]) -> Tuple[np.ndarray, _ResizeMeta]:
    """Resize/normalize an HxWx3 image into the model's input tensor.

    Steps, in order: optional resize (stretch or letterbox "keep_ratio"),
    dtype cast, optional scale/mean/std normalization (float32 only), and
    layout conversion to NCHW or NHWC with a batch dim of 1.

    Returns:
        (input tensor, _ResizeMeta describing the resize for later un-mapping).

    Raises:
        ValueError: on malformed resize size, or unsupported mode/dtype/layout.
    """
    import cv2
    h, w = int(img_hwc.shape[0]), int(img_hwc.shape[1])
    resize_cfg = inp_cfg.get("resize", None)
    if not resize_cfg:
        # No resize configured: feed the image at its native size.
        in_w, in_h = w, h
        meta = _ResizeMeta(orig_w=w, orig_h=h, in_w=w, in_h=h, mode="none", scale_x=1.0, scale_y=1.0, pad_x=0.0, pad_y=0.0)
        resized = img_hwc
    else:
        size = resize_cfg.get("size")
        if not (isinstance(size, (list, tuple)) and len(size) == 2):
            raise ValueError("input.resize.size must be [w,h]")
        in_w, in_h = int(size[0]), int(size[1])
        mode = str(resize_cfg.get("mode", "stretch")).lower()
        if mode == "stretch":
            # Independent x/y scaling straight to the target size.
            resized = cv2.resize(img_hwc, (in_w, in_h), interpolation=cv2.INTER_LINEAR)
            meta = _ResizeMeta(
                orig_w=w,
                orig_h=h,
                in_w=in_w,
                in_h=in_h,
                mode="stretch",
                scale_x=in_w / float(w),
                scale_y=in_h / float(h),
                pad_x=0.0,
                pad_y=0.0,
            )
        elif mode == "keep_ratio":
            # Letterbox: uniform scale to fit, then center on a zero canvas.
            scale = min(in_w / float(w), in_h / float(h))
            new_w = int(round(w * scale))
            new_h = int(round(h * scale))
            resized_small = cv2.resize(img_hwc, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
            canvas = np.zeros((in_h, in_w, 3), dtype=resized_small.dtype)
            pad_x = (in_w - new_w) // 2
            pad_y = (in_h - new_h) // 2
            canvas[pad_y : pad_y + new_h, pad_x : pad_x + new_w] = resized_small
            resized = canvas
            meta = _ResizeMeta(
                orig_w=w,
                orig_h=h,
                in_w=in_w,
                in_h=in_h,
                mode="keep_ratio",
                scale_x=scale,
                scale_y=scale,
                pad_x=float(pad_x),
                pad_y=float(pad_y),
            )
        else:
            raise ValueError("input.resize.mode must be stretch|keep_ratio")
    dtype = str(inp_cfg.get("dtype", "float32")).lower()
    x = resized
    if dtype in ("float32", "fp32"):
        x = x.astype(np.float32)
    elif dtype in ("uint8",):
        x = x.astype(np.uint8)
    else:
        raise ValueError(f"unsupported input.dtype: {dtype}")
    norm = inp_cfg.get("normalize", None)
    # Normalization only applies to float input; a normalize block combined
    # with uint8 dtype is silently ignored here.
    if norm and dtype in ("float32", "fp32"):
        scale = float(norm.get("scale", 1.0))
        mean = norm.get("mean", [0.0, 0.0, 0.0])
        std = norm.get("std", [1.0, 1.0, 1.0])
        mean = np.asarray(mean, dtype=np.float32).reshape(1, 1, 3)
        std = np.asarray(std, dtype=np.float32).reshape(1, 1, 3)
        # x' = (x * scale - mean) / std, broadcast per channel.
        x = x * scale
        x = (x - mean) / std
    layout = str(inp_cfg.get("layout", "NCHW")).upper()
    if layout == "NHWC":
        x = np.expand_dims(x, axis=0)
    elif layout == "NCHW":
        # HWC -> CHW, then add the batch dimension.
        x = np.transpose(x, (2, 0, 1))
        x = np.expand_dims(x, axis=0)
    else:
        raise ValueError("input.layout must be NCHW|NHWC")
    return x, meta
def _parse_outputs(self, out_by_name: Dict[str, Any], meta: _ResizeMeta) -> List[Detection]:
    """Decode raw model outputs into Detections in original-image coordinates.

    Two paths: the RetinaFace prior-box decoder when configured
    (decoder.type == "retinaface"), otherwise a generic path that reads
    already-decoded bbox/landmark/score tensors per outputs.* specs.

    Raises:
        ValueError: on missing/invalid output specs or count mismatches
            between bbox, landmark and score rows.
    """
    decoder_cfg = self.cfg.get("decoder")
    if decoder_cfg and str(decoder_cfg.get("type", "")).lower() == "retinaface":
        return self._parse_outputs_retinaface(out_by_name, meta, decoder_cfg)
    out_cfg = self.cfg.get("outputs", {})
    bbox_cfg = out_cfg.get("bbox")
    lmk_cfg = out_cfg.get("landmarks")
    score_cfg = out_cfg.get("score")
    if not bbox_cfg or not lmk_cfg:
        raise ValueError("det_outputs_config must include either decoder.type=retinaface OR outputs.bbox+outputs.landmarks")
    bbox = self._to_Nx4(self._select_output(out_by_name, bbox_cfg))
    lmks = self._to_landmarks(self._select_output(out_by_name, lmk_cfg), lmk_cfg)
    n = bbox.shape[0]
    # Fix: validate landmark/bbox row counts up front; a mismatch previously
    # surfaced later as an opaque IndexError when zipping rows together.
    if lmks.shape[0] != n:
        raise ValueError(f"landmark count mismatch: landmarks={lmks.shape[0]}, bbox={n}")
    score_arr = self._select_output(out_by_name, score_cfg) if score_cfg else None
    if score_arr is None:
        # No score output configured: treat every detection as score 1.0.
        scores = np.ones((n,), dtype=np.float32)
    else:
        scores = np.asarray(score_arr, dtype=np.float32).reshape(-1)
        if scores.size != n:
            raise ValueError(f"score count mismatch: scores={scores.size}, bbox={n}")
    bbox_format = str(bbox_cfg.get("format", "xyxy")).lower()
    if bbox_format not in ("xyxy", "xywh"):
        raise ValueError("outputs.bbox.format must be xyxy|xywh")
    if bool(bbox_cfg.get("normalized", False)):
        # Normalized coordinates are relative to the detector input size.
        bbox = bbox.copy()
        bbox[:, [0, 2]] *= float(meta.in_w)
        bbox[:, [1, 3]] *= float(meta.in_h)
    if bbox_format == "xywh":
        # Convert [x, y, w, h] -> [x1, y1, x2, y2].
        bbox = bbox.copy()
        bbox[:, 2] = bbox[:, 0] + bbox[:, 2]
        bbox[:, 3] = bbox[:, 1] + bbox[:, 3]
    if bool(lmk_cfg.get("normalized", False)):
        lmks = lmks.copy()
        lmks[:, :, 0] *= float(meta.in_w)
        lmks[:, :, 1] *= float(meta.in_h)
    # Undo preprocessing resize, then clamp boxes to the original image.
    bbox, lmks = self._map_to_original(bbox, lmks, meta)
    bbox = self._clip_bbox(bbox, meta.orig_w, meta.orig_h)
    return [
        Detection(
            bbox_xyxy=bbox[i].astype(np.float32),
            landmarks5=lmks[i].astype(np.float32),
            score=float(scores[i]),
        )
        for i in range(n)
    ]
def _parse_outputs_retinaface(self, out_by_name: Dict[str, Any], meta: _ResizeMeta, decoder_cfg: Dict[str, Any]) -> List[Detection]:
    """Decode raw RetinaFace head outputs (loc/conf/landms) into Detections.

    Pipeline: build prior boxes for the input size, turn conf into
    probabilities, score-filter, top-k, variance-decode boxes/landmarks,
    greedy NMS, keep_top_k, then map back to original-image coordinates.

    Raises:
        ValueError: on missing output specs, unexpected tensor shapes, or a
            prior/loc count mismatch (usually a wrong steps/min_sizes config).
    """
    out_cfg = self.cfg.get("outputs", {})
    # Accept either retinaface-style names (loc/conf) or the generic ones.
    loc_spec = out_cfg.get("loc") or out_cfg.get("bbox")
    conf_spec = out_cfg.get("conf") or out_cfg.get("score")
    lmk_spec = out_cfg.get("landmarks")
    if not loc_spec or not conf_spec or not lmk_spec:
        raise ValueError("retinaface decoder requires outputs.loc, outputs.conf, outputs.landmarks")
    loc = np.asarray(self._select_output(out_by_name, loc_spec), dtype=np.float32)
    conf = np.asarray(self._select_output(out_by_name, conf_spec), dtype=np.float32)
    landms = np.asarray(self._select_output(out_by_name, lmk_spec), dtype=np.float32)
    # Squeeze an optional leading batch dim of 1.
    if loc.ndim == 3 and loc.shape[0] == 1:
        loc = loc[0]
    if conf.ndim == 3 and conf.shape[0] == 1:
        conf = conf[0]
    if landms.ndim == 3 and landms.shape[0] == 1:
        landms = landms[0]
    if loc.ndim != 2 or loc.shape[1] != 4:
        raise ValueError(f"retinaface loc must be [N,4] (or [1,N,4]); got {loc.shape}")
    if conf.ndim != 2 or conf.shape[1] != 2:
        raise ValueError(f"retinaface conf must be [N,2] (or [1,N,2]); got {conf.shape}")
    if landms.ndim != 2 or landms.shape[1] != 10:
        raise ValueError(f"retinaface landmarks must be [N,10] (or [1,N,10]); got {landms.shape}")
    # Decoder hyper-parameters, with the common RetinaFace defaults.
    steps = decoder_cfg.get("steps", [8, 16, 32])
    min_sizes = decoder_cfg.get("min_sizes", [[16, 32], [64, 128], [256, 512]])
    variances = decoder_cfg.get("variances", [0.1, 0.2])
    score_index = int(decoder_cfg.get("score_index", 1))
    nms_iou = float(decoder_cfg.get("nms_iou_thresh", 0.4))
    top_k = int(decoder_cfg.get("top_k", 5000))
    keep_top_k = int(decoder_cfg.get("keep_top_k", 750))
    prob_mode = str(decoder_cfg.get("conf_mode", "auto")).lower()  # auto|prob|logits
    priors = self._retinaface_priors(meta.in_w, meta.in_h, steps=steps, min_sizes=min_sizes)
    if priors.shape[0] != loc.shape[0]:
        raise ValueError(f"prior count mismatch: priors={priors.shape[0]} loc={loc.shape[0]}")
    scores = self._retinaface_scores(conf, score_index=score_index, mode=prob_mode)
    # Score filter (skipped entirely when score_thresh <= 0).
    keep = np.where(scores >= float(self.score_thresh))[0] if self.score_thresh > 0 else np.arange(scores.size)
    if keep.size == 0:
        return []
    # Keep only the top_k highest-scoring candidates before decoding.
    if top_k > 0 and keep.size > top_k:
        idx = np.argsort(scores[keep])[::-1][:top_k]
        keep = keep[idx]
    pri = priors[keep]
    loc_k = loc[keep]
    lmk_k = landms[keep].reshape(-1, 5, 2)
    sc_k = scores[keep]
    bbox_in, lmks_in = self._retinaface_decode(pri, loc_k, lmk_k, meta.in_w, meta.in_h, variances=variances)
    # Sort score-descending so NMS keep-indices are in score order.
    order = np.argsort(sc_k)[::-1]
    bbox_in = bbox_in[order]
    lmks_in = lmks_in[order]
    sc_k = sc_k[order]
    keep_nms = self._nms_xyxy(bbox_in, sc_k, iou_thresh=nms_iou)
    if keep_top_k > 0:
        keep_nms = keep_nms[:keep_top_k]
    bbox_in = bbox_in[keep_nms]
    lmks_in = lmks_in[keep_nms]
    sc_k = sc_k[keep_nms]
    # Map from detector-input coords back to the original image and clamp.
    bbox, lmks = self._map_to_original(bbox_in, lmks_in, meta)
    bbox = self._clip_bbox(bbox, meta.orig_w, meta.orig_h)
    dets: List[Detection] = []
    for i in range(bbox.shape[0]):
        dets.append(Detection(bbox_xyxy=bbox[i].astype(np.float32), landmarks5=lmks[i].astype(np.float32), score=float(sc_k[i])))
    return dets
def _retinaface_priors(self, in_w: int, in_h: int, steps: Sequence[int], min_sizes: Sequence[Sequence[int]]) -> np.ndarray:
from itertools import product
priors: List[List[float]] = []
for k, step in enumerate(steps):
fm_h = int(np.ceil(in_h / float(step)))
fm_w = int(np.ceil(in_w / float(step)))
for i, j in product(range(fm_h), range(fm_w)):
for ms in min_sizes[k]:
s_kx = ms / float(in_w)
s_ky = ms / float(in_h)
cx = (j + 0.5) * step / float(in_w)
cy = (i + 0.5) * step / float(in_h)
priors.append([cx, cy, s_kx, s_ky])
return np.asarray(priors, dtype=np.float32)
def _retinaface_scores(self, conf: np.ndarray, score_index: int, mode: str) -> np.ndarray:
x = conf.astype(np.float32)
if mode == "prob":
prob = x
elif mode == "logits":
prob = self._softmax(x, axis=1)
else: # auto
row_sum = x.sum(axis=1)
looks_prob = (x.min() >= 0.0) and (x.max() <= 1.0) and (np.mean(np.abs(row_sum - 1.0)) < 1e-2)
prob = x if looks_prob else self._softmax(x, axis=1)
if score_index < 0 or score_index >= prob.shape[1]:
raise ValueError(f"score_index out of range: {score_index}")
return prob[:, score_index]
def _retinaface_decode(
self,
priors: np.ndarray,
loc: np.ndarray,
landms: np.ndarray,
in_w: int,
in_h: int,
variances: Sequence[float],
) -> Tuple[np.ndarray, np.ndarray]:
v0 = float(variances[0])
v1 = float(variances[1])
pri_c = priors[:, 0:2]
pri_s = priors[:, 2:4]
boxes_c = pri_c + loc[:, 0:2] * v0 * pri_s
boxes_s = pri_s * np.exp(loc[:, 2:4] * v1)
boxes = np.concatenate([boxes_c - boxes_s / 2.0, boxes_c + boxes_s / 2.0], axis=1)
boxes[:, [0, 2]] *= float(in_w)
boxes[:, [1, 3]] *= float(in_h)
lm = pri_c[:, None, :] + landms * v0 * pri_s[:, None, :]
lm[:, :, 0] *= float(in_w)
lm[:, :, 1] *= float(in_h)
return boxes.astype(np.float32), lm.astype(np.float32)
def _softmax(self, x: np.ndarray, axis: int = -1) -> np.ndarray:
x = x.astype(np.float32)
m = np.max(x, axis=axis, keepdims=True)
e = np.exp(x - m)
s = np.sum(e, axis=axis, keepdims=True)
return e / s
def _nms_xyxy(self, boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> List[int]:
b = boxes.astype(np.float32)
s = scores.astype(np.float32)
x1 = b[:, 0]
y1 = b[:, 1]
x2 = b[:, 2]
y2 = b[:, 3]
areas = np.maximum(0.0, x2 - x1) * np.maximum(0.0, y2 - y1)
order = np.argsort(s)[::-1]
keep: List[int] = []
while order.size > 0:
i = int(order[0])
keep.append(i)
if order.size == 1:
break
rest = order[1:]
xx1 = np.maximum(x1[i], x1[rest])
yy1 = np.maximum(y1[i], y1[rest])
xx2 = np.minimum(x2[i], x2[rest])
yy2 = np.minimum(y2[i], y2[rest])
w = np.maximum(0.0, xx2 - xx1)
h = np.maximum(0.0, yy2 - yy1)
inter = w * h
union = areas[i] + areas[rest] - inter
iou = np.where(union > 0, inter / union, 0.0)
inds = np.where(iou <= float(iou_thresh))[0]
order = rest[inds]
return keep
def _select_output(self, out_by_name: Dict[str, Any], spec: Optional[Dict[str, Any]]) -> Any:
if spec is None:
return None
if "name" in spec:
name = spec["name"]
if name not in out_by_name:
raise KeyError(f"output not found: {name}")
return out_by_name[name]
if "index" in spec:
idx = int(spec["index"])
keys = list(out_by_name.keys())
if idx < 0 or idx >= len(keys):
raise IndexError(f"output index out of range: {idx}")
return out_by_name[keys[idx]]
raise ValueError("output spec must include name or index")
def _to_Nx4(self, arr: Any) -> np.ndarray:
x = np.asarray(arr)
if x.ndim == 3 and x.shape[0] == 1:
x = x[0]
if x.ndim != 2 or x.shape[1] != 4:
raise ValueError(f"bbox output must be [N,4] (or [1,N,4]); got {x.shape}")
return x.astype(np.float32)
def _to_landmarks(self, arr: Any, lmk_cfg: Dict[str, Any]) -> np.ndarray:
x = np.asarray(arr)
if x.ndim == 4 and x.shape[0] == 1:
x = x[0]
if x.ndim == 3 and x.shape[0] == 1:
x = x[0]
layout = str(lmk_cfg.get("layout", "flat10")).lower()
if layout == "flat10":
if x.ndim != 2 or x.shape[1] != 10:
raise ValueError(f"landmarks flat10 must be [N,10]; got {x.shape}")
x = x.reshape(-1, 5, 2)
elif layout in ("5x2", "five_two"):
if x.ndim != 3 or x.shape[1:] != (5, 2):
raise ValueError(f"landmarks 5x2 must be [N,5,2]; got {x.shape}")
else:
raise ValueError("outputs.landmarks.layout must be flat10|5x2")
order = lmk_cfg.get("order")
if order:
x = self._reorder_landmarks(x, order)
return x.astype(np.float32)
def _reorder_landmarks(self, lmks: np.ndarray, order: Sequence[str]) -> np.ndarray:
    """Permute [N,5,2] landmarks from ``order`` into _CANONICAL_LMK_ORDER.

    Raises:
        ValueError: if ``order`` is not a permutation of the canonical names.
    """
    names = [str(item) for item in order]
    if sorted(names) != sorted(_CANONICAL_LMK_ORDER):
        raise ValueError(f"outputs.landmarks.order must be a permutation of {_CANONICAL_LMK_ORDER}")
    position = {name: i for i, name in enumerate(names)}
    gather = [position[name] for name in _CANONICAL_LMK_ORDER]
    return lmks[:, gather, :]
def _map_to_original(self, bbox_xyxy_in: np.ndarray, lmks_in: np.ndarray, meta: _ResizeMeta) -> Tuple[np.ndarray, np.ndarray]:
if meta.mode == "none":
return bbox_xyxy_in, lmks_in
if meta.mode == "stretch":
sx = meta.scale_x
sy = meta.scale_y
bbox = bbox_xyxy_in.copy()
bbox[:, [0, 2]] /= sx
bbox[:, [1, 3]] /= sy
lmks = lmks_in.copy()
lmks[:, :, 0] /= sx
lmks[:, :, 1] /= sy
return bbox, lmks
if meta.mode == "keep_ratio":
s = meta.scale_x
px = meta.pad_x
py = meta.pad_y
bbox = bbox_xyxy_in.copy()
bbox[:, [0, 2]] = (bbox[:, [0, 2]] - px) / s
bbox[:, [1, 3]] = (bbox[:, [1, 3]] - py) / s
lmks = lmks_in.copy()
lmks[:, :, 0] = (lmks[:, :, 0] - px) / s
lmks[:, :, 1] = (lmks[:, :, 1] - py) / s
return bbox, lmks
raise ValueError(f"unknown resize mode: {meta.mode}")
def _clip_bbox(self, bbox: np.ndarray, w: int, h: int) -> np.ndarray:
b = bbox.copy()
b[:, 0] = np.clip(b[:, 0], 0, w - 1)
b[:, 1] = np.clip(b[:, 1], 0, h - 1)
b[:, 2] = np.clip(b[:, 2], 0, w - 1)
b[:, 3] = np.clip(b[:, 3], 0, h - 1)
return b