OrangePi3588Media/tools/render_config.py

345 lines
13 KiB
Python

#!/usr/bin/env python3
"""Render media-server template/profile/overlay config into one root config."""
from __future__ import annotations
import argparse
import copy
import json
import re
from pathlib import Path
from typing import Any
# Alias used throughout this module for a decoded JSON object.
JsonObject = dict[str, Any]
# Matches a string that consists of exactly one slot token of the form
# ``${slot:<slot_name>.<field_name>}``. Both names may contain letters,
# digits, "_", "." and "-"; group 1 is the slot name, group 2 the field name.
SLOT_TOKEN_RE = re.compile(r"^\$\{slot:([A-Za-z0-9_.-]+)\.([A-Za-z0-9_.-]+)\}$")
def deep_merge(base: Any, override: Any) -> Any:
    """Return a new value with *override* layered on top of *base*.

    When both arguments are dicts they are merged key by key: keys present on
    both sides are merged recursively, keys only in *override* are added. In
    every other case (lists, scalars, mismatched types) *override* replaces
    *base* wholesale. Neither input is mutated and the result shares no
    mutable state with them.
    """
    if not (isinstance(base, dict) and isinstance(override, dict)):
        return copy.deepcopy(override)

    result = copy.deepcopy(base)
    for name, incoming in override.items():
        result[name] = (
            deep_merge(result[name], incoming)
            if name in result
            else copy.deepcopy(incoming)
        )
    return result
def load_json(path: Path) -> JsonObject:
    """Read *path* as JSON and return its root object.

    The file is decoded with ``utf-8-sig`` so a leading byte-order mark is
    tolerated.

    Raises:
        json.JSONDecodeError: the file is not valid JSON. The message is
            prefixed with *path* so the offending file can be identified when
            a template, a profile and several overlays are loaded in one run.
        ValueError: the document parsed but its root is not a JSON object.
        OSError: the file could not be opened or read.
    """
    with path.open("r", encoding="utf-8-sig") as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError as err:
            # Re-raise the same exception type (still a ValueError subclass)
            # with the path added; the stock message only has line/column.
            raise json.JSONDecodeError(f"{path}: {err.msg}", err.doc, err.pos) from err
    if not isinstance(data, dict):
        raise ValueError(f"{path}: root must be a JSON object")
    return data
def template_name(template_doc: JsonObject, template_path: Path) -> str:
    """Return the template's name, falling back to the file stem.

    The document's ``name`` field is used when it is truthy; otherwise the
    stem of *template_path* is used. The chosen value is converted to ``str``
    and stripped of surrounding whitespace.

    Raises:
        ValueError: the resulting name is empty.
    """
    declared = template_doc.get("name")
    candidate = declared if declared else template_path.stem
    cleaned = str(candidate).strip()
    if cleaned:
        return cleaned
    raise ValueError(f"{template_path}: template name is empty")
def template_body(template_doc: JsonObject) -> JsonObject:
    """Extract the graph definition from a template document.

    The body is the value of the ``template`` key when present, otherwise the
    document itself. Only the ``executor``, ``nodes`` and ``edges`` keys are
    kept, in the order they appear in the body; their values are deep-copied.

    Raises:
        ValueError: the body is not an object, or ``nodes``/``edges`` is not
            a list.
    """
    graph = template_doc.get("template", template_doc)
    if not isinstance(graph, dict):
        raise ValueError("template body must be a JSON object")

    has_nodes = isinstance(graph.get("nodes"), list)
    has_edges = isinstance(graph.get("edges"), list)
    if not (has_nodes and has_edges):
        raise ValueError("template body must contain nodes[] and edges[]")

    wanted = {"executor", "nodes", "edges"}
    kept: JsonObject = {}
    for field, payload in graph.items():
        if field in wanted:
            kept[field] = copy.deepcopy(payload)
    return kept
def template_params(template_doc: JsonObject) -> JsonObject:
    """Return a deep copy of the template's ``params`` object.

    A missing or ``null`` ``params`` yields an empty dict.

    Raises:
        ValueError: ``params`` is present but is not an object.
    """
    raw = template_doc.get("params")
    if raw is None:
        return {}
    if isinstance(raw, dict):
        return copy.deepcopy(raw)
    raise ValueError("template params must be a JSON object")
def instance_scene_meta(instance: JsonObject) -> JsonObject:
    """Return a deep copy of the instance's ``scene_meta`` object.

    A missing or ``null`` ``scene_meta`` yields an empty dict.

    Raises:
        ValueError: ``scene_meta`` is present but is not an object.
    """
    meta = instance.get("scene_meta")
    if meta is None:
        return {}
    if isinstance(meta, dict):
        return copy.deepcopy(meta)
    raise ValueError("instance.scene_meta must be an object")
def instance_extra_params(instance: JsonObject) -> JsonObject:
    """Return a deep copy of the instance's ``params`` object.

    A missing or ``null`` ``params`` yields an empty dict.

    Raises:
        ValueError: ``params`` is present but is not an object.
    """
    extra = instance.get("params")
    if extra is None:
        return {}
    if isinstance(extra, dict):
        return copy.deepcopy(extra)
    raise ValueError("instance.params must be a JSON object")
def binding_group(instance: JsonObject, key: str) -> JsonObject:
    """Return a deep copy of the binding group stored under *key*.

    A missing or ``null`` group yields an empty dict.

    Raises:
        ValueError: the group is present but is not an object.
    """
    group = instance.get(key)
    if group is None:
        return {}
    if isinstance(group, dict):
        return copy.deepcopy(group)
    raise ValueError(f"instance.{key} must be an object")
def resolved_binding_value(entry: JsonObject) -> JsonObject:
    """Return the field map a binding entry contributes to slot expansion.

    If the entry has a non-null ``resolved`` member, a deep copy of that
    member is returned; otherwise a deep copy of the whole entry is returned.

    Raises:
        ValueError: *entry* is not an object, or ``resolved`` is present,
            non-null and not an object.
    """
    if not isinstance(entry, dict):
        raise ValueError("binding entry must be an object")

    payload = entry.get("resolved")
    if payload is None:
        # No resolved payload: the entry itself is the field map.
        payload = entry
    elif not isinstance(payload, dict):
        raise ValueError("binding resolved payload must be an object")
    return copy.deepcopy(payload)
def build_binding_context(instance: JsonObject) -> JsonObject:
    """Build the slot-name -> field-map lookup used for token expansion.

    The context always has a ``scene`` entry holding the instance's scene
    metadata. Every slot found in ``input_bindings``, ``service_bindings`` and
    ``output_bindings`` (processed in that order) is then added under its own
    name; a later slot with the same name replaces an earlier one, including
    ``scene``.

    Raises:
        ValueError: propagated from the helpers when scene metadata, a
            binding group or a binding entry has the wrong shape.
    """
    lookup: JsonObject = {"scene": instance_scene_meta(instance)}
    for group_key in ("input_bindings", "service_bindings", "output_bindings"):
        lookup.update(
            {
                slot: resolved_binding_value(entry)
                for slot, entry in binding_group(instance, group_key).items()
            }
        )
    return lookup
def expand_slot_tokens(value: Any, binding_context: JsonObject) -> Any:
    """Recursively replace ``${slot:<slot>.<field>}`` strings with bound values.

    Dicts and lists are rebuilt with every member expanded. A string is
    replaced only when, after stripping surrounding whitespace, it consists of
    exactly one slot token; the replacement is a deep copy of the bound field,
    so it may be of any JSON type. Strings that are not a token are returned
    unchanged, and all other values are deep-copied.

    Raises:
        ValueError: a token names a slot that is not an object in
            *binding_context*, or a field that slot does not contain.
    """
    if isinstance(value, str):
        token = SLOT_TOKEN_RE.match(value.strip())
        if token is None:
            return value
        slot_name, field_name = token.group(1), token.group(2)
        fields = binding_context.get(slot_name)
        if not isinstance(fields, dict):
            raise ValueError(f"required slot '{slot_name}' is not bound")
        if field_name not in fields:
            raise ValueError(f"required slot field '{slot_name}.{field_name}' is not bound")
        return copy.deepcopy(fields[field_name])

    if isinstance(value, dict):
        expanded: JsonObject = {}
        for key, member in value.items():
            expanded[key] = expand_slot_tokens(member, binding_context)
        return expanded

    if isinstance(value, list):
        return [expand_slot_tokens(member, binding_context) for member in value]

    return copy.deepcopy(value)
def render_scene_instance(template_doc: JsonObject, scene_instance: JsonObject) -> tuple[str, JsonObject, JsonObject]:
    """Bind one scene instance to the template.

    Returns a tuple ``(bound_template_name, rendered_template,
    rendered_instance)``:

    * ``bound_template_name`` is ``"<template name>__<instance name>"``. When
      the template document has no usable ``name``, the stem of the instance
      name (treated as a path) is used for the first part.
    * ``rendered_template`` is the template body with slot tokens expanded
      from the instance's bindings.
    * ``rendered_instance`` carries ``name`` and ``template`` plus
      ``scene_meta`` and ``params`` when they are non-empty; no other
      instance keys are copied.

    Raises:
        ValueError: the instance has no name, or any helper rejects the
            template or the instance's bindings.
    """
    instance_name = str(scene_instance.get("name") or "").strip()
    if not instance_name:
        raise ValueError("scene instance name is required")

    base_name = template_name(template_doc, Path(instance_name))
    bound_template_name = f"{base_name}__{instance_name}"

    slots = build_binding_context(scene_instance)
    rendered_template = expand_slot_tokens(template_body(template_doc), slots)

    rendered_instance: JsonObject = {"name": instance_name, "template": bound_template_name}
    # Optional sections are emitted only when they have content.
    optional_sections = (
        ("scene_meta", instance_scene_meta),
        ("params", instance_extra_params),
    )
    for section, extract in optional_sections:
        content = extract(scene_instance)
        if content:
            rendered_instance[section] = content
    return bound_template_name, rendered_template, rendered_instance
def profile_instances(profile: JsonObject, tpl_name: str) -> list[JsonObject]:
    """Return the list of instance definitions described by *profile*.

    Two profile shapes are accepted:

    * Multi-instance: ``profile["instances"]`` is a list of objects. Each is
      deep-copied and given ``template = tpl_name`` unless it already has a
      ``template`` key.
    * Single-instance: no ``instances`` key; the profile's own ``name``,
      ``params`` (default ``{}``) and, if present, ``override`` form one
      instance bound to *tpl_name*.

    Raises:
        ValueError: ``instances`` is not a list, one of its entries is not an
            object, or a single-instance profile has no usable name.
    """
    if "instances" in profile:
        entries = profile["instances"]
        if not isinstance(entries, list):
            raise ValueError("profile.instances must be an array")
        expanded: list[JsonObject] = []
        for entry in entries:
            if not isinstance(entry, dict):
                raise ValueError("profile.instances entries must be objects")
            inst = copy.deepcopy(entry)
            inst.setdefault("template", tpl_name)
            expanded.append(inst)
        return expanded

    # `or ""` treats an explicit JSON null as missing; a plain str() would
    # turn it into an instance literally named "None".
    name = str(profile.get("name") or "").strip()
    if not name:
        raise ValueError("profile must contain name or instances[]")

    single: JsonObject = {
        "name": name,
        "template": tpl_name,
        "params": copy.deepcopy(profile.get("params", {})),
    }
    if "override" in profile:
        single["override"] = copy.deepcopy(profile["override"])
    return [single]
def merge_template_params(instances: list[JsonObject], shared_params: JsonObject) -> list[JsonObject]:
    """Apply template-level default params underneath each instance's params.

    When *shared_params* is empty the input list is returned as-is (same
    object). Otherwise each instance is deep-copied and its ``params`` becomes
    *shared_params* deep-merged with the instance's own params, the
    instance's values taking precedence.

    Raises:
        ValueError: an instance's ``params`` is neither ``null`` nor an
            object.
    """
    if not shared_params:
        return instances

    merged_instances: list[JsonObject] = []
    for source in instances:
        clone = copy.deepcopy(source)
        own = clone.get("params")
        if own is None:
            own = {}
        elif not isinstance(own, dict):
            raise ValueError("instance params must be a JSON object")
        clone["params"] = deep_merge(shared_params, own)
        merged_instances.append(clone)
    return merged_instances
def merge_instance_patch(instance: JsonObject, patch: JsonObject) -> JsonObject:
    """Return a copy of *instance* with *patch* deep-merged into it.

    ``params`` and ``override`` are merged first (a missing side is treated
    as ``{}``), then every other patch key in patch order. The ``name`` and
    ``template`` keys of *patch* are never applied, so a patch cannot rename
    or rebind an instance. *instance* is not mutated.

    Raises:
        ValueError: *patch* is not an object. Without this check a malformed
            overlay entry would fail with an ``AttributeError`` or
            ``TypeError`` instead of the ``ValueError`` used for other schema
            problems in this module.
    """
    if not isinstance(patch, dict):
        raise ValueError("instance patch must be an object")

    merged = copy.deepcopy(instance)
    if "params" in patch:
        merged["params"] = deep_merge(merged.get("params", {}), patch["params"])
    if "override" in patch:
        merged["override"] = deep_merge(merged.get("override", {}), patch["override"])
    for key, value in patch.items():
        if key not in {"name", "template", "params", "override"}:
            merged[key] = deep_merge(merged.get(key), value)
    return merged
def apply_overlay(root: JsonObject, overlay: JsonObject) -> JsonObject:
    """Return a copy of *root* with one overlay document applied.

    The overlay is applied in three steps:

    1. ``global``, ``queue`` and ``templates`` are deep-merged into the
       matching root sections.
    2. ``instance_overrides`` maps instance names to patches. The ``"*"``
       patch, if any, is applied to every instance first, then the patch
       keyed by the instance's own name.
    3. ``instances`` is a list of patches, each carrying the ``name`` of the
       existing instance it targets.

    *root* is not mutated.

    Raises:
        ValueError: ``instance_overrides`` is non-empty but not an object,
            ``instances`` is not a list, an ``instances`` entry is not an
            object with a name, or it names an instance absent from *root*.
    """
    result = copy.deepcopy(root)

    for section in ("global", "queue", "templates"):
        if section in overlay:
            result[section] = deep_merge(result.get(section, {}), overlay[section])

    overrides = overlay.get("instance_overrides", {})
    if overrides:
        if not isinstance(overrides, dict):
            raise ValueError("overlay.instance_overrides must be an object")
        has_wildcard = "*" in overrides
        patched = []
        for original in result.get("instances", []):
            current = copy.deepcopy(original)
            if has_wildcard:
                current = merge_instance_patch(current, overrides["*"])
            own_name = current.get("name")
            if own_name in overrides:
                current = merge_instance_patch(current, overrides[own_name])
            patched.append(current)
        result["instances"] = patched

    if "instances" not in overlay:
        return result

    targeted = overlay["instances"]
    if not isinstance(targeted, list):
        raise ValueError("overlay.instances must be an array")

    # Name -> index lookup; with duplicate names the last occurrence wins.
    positions = {}
    for index, existing in enumerate(result.get("instances", [])):
        positions[existing.get("name")] = index

    for patch in targeted:
        if not isinstance(patch, dict) or not patch.get("name"):
            raise ValueError("overlay.instances entries must be objects with name")
        target = patch["name"]
        if target not in positions:
            raise ValueError(f"overlay instance not found in profile: {target}")
        slot = positions[target]
        result["instances"][slot] = merge_instance_patch(result["instances"][slot], patch)
    return result
def render(
    template_path: Path,
    profile_path: Path,
    overlay_paths: list[Path],
    metadata: JsonObject | None = None,
) -> JsonObject:
    """Render a template, a profile and optional overlays into one root config.

    The template and profile are loaded from disk, every profile instance is
    bound to the template, ``global``/``queue`` are copied from the profile,
    and the overlays are applied in the order given.

    When *metadata* is not ``None`` a ``metadata`` section is added. It holds
    the template/profile names and paths, the names and display names of the
    final instances, and the overlay stems and paths; keys supplied in
    *metadata* are written last and so override the computed ones.

    Raises:
        ValueError: propagated from the loading, binding and overlay helpers.
        OSError: a file could not be read.
    """
    template_doc = load_json(template_path)
    profile = load_json(profile_path)
    tpl_name = template_name(template_doc, template_path)

    scene_instances = merge_template_params(
        profile_instances(profile, tpl_name),
        template_params(template_doc),
    )

    templates_out: JsonObject = {}
    instances_out: list[JsonObject] = []
    for scene in scene_instances:
        bound_name, bound_template, rendered = render_scene_instance(template_doc, scene)
        templates_out[bound_name] = bound_template
        instances_out.append(rendered)

    root: JsonObject = {"templates": templates_out, "instances": instances_out}
    for section in ("global", "queue"):
        if section in profile:
            root[section] = copy.deepcopy(profile[section])

    for overlay_path in overlay_paths:
        root = apply_overlay(root, load_json(overlay_path))

    if metadata is None:
        return root

    # Names are collected after the overlays so they reflect the final state.
    final_instances = [entry for entry in root.get("instances", []) if isinstance(entry, dict)]
    instance_names = [
        label
        for label in (str(entry.get("name") or "").strip() for entry in final_instances)
        if label
    ]
    instance_display_names = [
        label
        for label in (
            str(as_map(entry.get("scene_meta")).get("display_name") or "").strip()
            for entry in final_instances
        )
        if label
    ]

    summary: JsonObject = {
        "template": tpl_name,
        "template_path": template_path.as_posix(),
        "profile": str(profile.get("name") or profile_path.stem).strip(),
        "business_name": str(profile.get("business_name") or "").strip(),
        "profile_path": profile_path.as_posix(),
        "instance_names": instance_names,
        "instance_display_names": instance_display_names,
        "overlays": [p.stem for p in overlay_paths],
        "overlay_paths": [p.as_posix() for p in overlay_paths],
    }
    # Caller-supplied metadata wins over the computed fields.
    summary.update(copy.deepcopy(metadata))
    root["metadata"] = summary
    return root
def as_map(value: Any) -> JsonObject:
    """Return *value* itself when it is a dict, otherwise a new empty dict."""
    return value if isinstance(value, dict) else {}
def main() -> int:
    """Parse the command line, render the config and write it to ``--out``.

    Metadata is attached to the output only when ``--metadata-json`` or one of
    ``--config-id`` / ``--config-version`` / ``--rendered-at`` is given. The
    individual flags override same-named keys from ``--metadata-json``, and
    ``rendered_by`` is filled in unless already present.

    Returns:
        0 on success.

    Raises:
        ValueError: ``--metadata-json`` is not a JSON object, or rendering
            rejects one of the input documents.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--template", required=True, type=Path)
    parser.add_argument("--profile", required=True, type=Path)
    parser.add_argument("--overlay", action="append", default=[], type=Path)
    parser.add_argument("--out", required=True, type=Path)
    for flag in ("--config-id", "--config-version", "--rendered-at", "--metadata-json"):
        parser.add_argument(flag, default="")
    args = parser.parse_args()

    metadata: JsonObject | None = None
    if args.metadata_json:
        parsed = json.loads(args.metadata_json)
        if not isinstance(parsed, dict):
            raise ValueError("--metadata-json must be a JSON object")
        metadata = parsed

    cli_fields = {
        "config_id": args.config_id,
        "config_version": args.config_version,
        "rendered_at": args.rendered_at,
    }
    explicit = {field: text for field, text in cli_fields.items() if text}
    if explicit:
        if metadata is None:
            metadata = {}
        metadata.update(explicit)

    if metadata is not None:
        metadata.setdefault("rendered_by", "tools/render_config.py")

    rendered = render(args.template, args.profile, args.overlay, metadata=metadata)

    serialized = json.dumps(rendered, ensure_ascii=False, indent=2) + "\n"
    args.out.parent.mkdir(parents=True, exist_ok=True)
    args.out.write_text(serialized, encoding="utf-8")
    return 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())