242 lines
8.9 KiB
Python
242 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Render media-server template/profile/overlay config into one root config."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import copy
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Alias used throughout this module for a decoded JSON object:
# a dict with string keys and arbitrary values.
JsonObject = dict[str, Any]
|
|
|
|
|
|
def deep_merge(base: Any, override: Any) -> Any:
    """Return a new value that combines ``base`` with ``override``.

    When both arguments are dicts, the result is a deep copy of ``base`` into
    which every key of ``override`` is merged recursively; keys that exist
    only in ``override`` are deep-copied across. In every other case
    (lists and scalars included) ``override`` replaces ``base`` wholesale and
    a deep copy of it is returned. Neither argument is mutated.
    """
    both_are_dicts = isinstance(base, dict) and isinstance(override, dict)
    if not both_are_dicts:
        return copy.deepcopy(override)

    result = copy.deepcopy(base)
    for name, incoming in override.items():
        if name not in result:
            result[name] = copy.deepcopy(incoming)
            continue
        result[name] = deep_merge(result[name], incoming)
    return result
|
|
|
|
|
|
def load_json(path: Path) -> JsonObject:
    """Read ``path`` as JSON and return its top-level object.

    The file is decoded as ``utf-8-sig``, so a leading byte-order mark is
    accepted and dropped.

    Raises:
        ValueError: if the root of the document is not a JSON object.
    """
    with path.open("r", encoding="utf-8-sig") as handle:
        document = json.load(handle)
    if isinstance(document, dict):
        return document
    raise ValueError(f"{path}: root must be a JSON object")
|
|
|
|
|
|
def template_name(template_doc: JsonObject, template_path: Path) -> str:
    """Resolve the name a template is registered under.

    A truthy ``"name"`` entry in ``template_doc`` wins; otherwise the stem of
    ``template_path`` is used. The chosen value is converted to ``str`` and
    stripped of surrounding whitespace.

    Raises:
        ValueError: if the resolved name is empty after stripping.
    """
    declared = template_doc.get("name")
    candidate = declared if declared else template_path.stem
    resolved = str(candidate).strip()
    if resolved:
        return resolved
    raise ValueError(f"{template_path}: template name is empty")
|
|
|
|
|
|
def template_body(template_doc: JsonObject) -> JsonObject:
    """Extract the graph definition from a template document.

    The body is the value stored under ``"template"`` when that key exists,
    otherwise the document itself. Only the ``executor``, ``nodes`` and
    ``edges`` entries are kept, each as a deep copy.

    Raises:
        ValueError: if the body is not a JSON object, or if ``nodes`` or
            ``edges`` is missing or not a list.
    """
    source = template_doc["template"] if "template" in template_doc else template_doc
    if not isinstance(source, dict):
        raise ValueError("template body must be a JSON object")

    has_graph = isinstance(source.get("nodes"), list) and isinstance(source.get("edges"), list)
    if not has_graph:
        raise ValueError("template body must contain nodes[] and edges[]")

    kept_fields = {"executor", "nodes", "edges"}
    extracted: JsonObject = {}
    for field, content in source.items():
        if field in kept_fields:
            extracted[field] = copy.deepcopy(content)
    return extracted
|
|
|
|
|
|
def template_params(template_doc: JsonObject) -> JsonObject:
    """Return a deep copy of the template's ``"params"`` object.

    A missing or ``None`` ``"params"`` entry yields an empty dict.

    Raises:
        ValueError: if ``"params"`` is present but is neither ``None`` nor a
            JSON object.
    """
    shared = template_doc.get("params", {})
    if shared is None:
        return {}
    if isinstance(shared, dict):
        return copy.deepcopy(shared)
    raise ValueError("template params must be a JSON object")
|
|
|
|
|
|
def profile_instances(profile: JsonObject, tpl_name: str) -> list[JsonObject]:
    """Expand a profile document into a list of instance definitions.

    Two profile shapes are accepted:

    * ``{"instances": [...]}`` - every entry is deep-copied and receives
      ``"template": tpl_name`` unless it already has a ``"template"`` key.
    * a single-instance profile - ``"name"`` is required; ``"params"``
      (default ``{}``) is deep-copied across, and ``"override"`` is copied
      only when the profile contains it.

    Args:
        profile: Decoded profile document.
        tpl_name: Template name assigned to instances that do not name one.

    Returns:
        A new list of instance dicts; ``profile`` is not mutated.

    Raises:
        ValueError: if ``instances`` is not a list, if one of its entries is
            not an object, or if a single-instance profile has no usable
            ``name``.
    """
    if "instances" in profile:
        instances = profile["instances"]
        if not isinstance(instances, list):
            raise ValueError("profile.instances must be an array")
        out: list[JsonObject] = []
        for item in instances:
            if not isinstance(item, dict):
                raise ValueError("profile.instances entries must be objects")
            inst = copy.deepcopy(item)
            inst.setdefault("template", tpl_name)
            out.append(inst)
        return out

    # ``or ""`` (rather than a .get() default) stops a JSON null name from
    # being stringified into the literal instance name "None"; a falsy name
    # is treated as missing, as the other name lookups in this module do.
    name = str(profile.get("name") or "").strip()
    if not name:
        raise ValueError("profile must contain name or instances[]")

    instance: JsonObject = {
        "name": name,
        "template": tpl_name,
        "params": copy.deepcopy(profile.get("params", {})),
    }
    if "override" in profile:
        instance["override"] = copy.deepcopy(profile["override"])
    return [instance]
|
|
|
|
|
|
def merge_template_params(instances: list[JsonObject], shared_params: JsonObject) -> list[JsonObject]:
    """Layer each instance's params on top of the template-level params.

    When ``shared_params`` is empty, the ``instances`` list is returned as-is
    (same object, no copying or validation). Otherwise a new list of deep
    copies is built in which ``"params"`` is ``shared_params`` deep-merged
    with the instance's own params, the instance values taking precedence.
    A missing or ``None`` ``"params"`` counts as empty.

    Raises:
        ValueError: if an instance's ``"params"`` is neither ``None`` nor a
            JSON object (only checked when ``shared_params`` is non-empty).
    """
    if not shared_params:
        return instances

    merged_instances: list[JsonObject] = []
    for original in instances:
        clone = copy.deepcopy(original)
        own = clone.get("params", {})
        if own is None:
            own = {}
        elif not isinstance(own, dict):
            raise ValueError("instance params must be a JSON object")
        clone["params"] = deep_merge(shared_params, own)
        merged_instances.append(clone)
    return merged_instances
|
|
|
|
|
|
def merge_instance_patch(instance: JsonObject, patch: JsonObject) -> JsonObject:
    """Return a deep copy of ``instance`` with ``patch`` merged into it.

    ``"params"`` and ``"override"`` from the patch are deep-merged onto the
    instance's values (an absent value is treated as ``{}``). Every other
    patch key is deep-merged onto the instance's current value for that key,
    except ``"name"`` and ``"template"``, which a patch never changes.
    Neither argument is mutated.
    """
    result = copy.deepcopy(instance)

    # The two dict-valued sections go first, in this order, so that newly
    # created keys land in the same position as before.
    for section in ("params", "override"):
        if section in patch:
            result[section] = deep_merge(result.get(section, {}), patch[section])

    handled_or_protected = {"name", "template", "params", "override"}
    for field, incoming in patch.items():
        if field in handled_or_protected:
            continue
        result[field] = deep_merge(result.get(field), incoming)
    return result
|
|
|
|
|
|
def apply_overlay(root: JsonObject, overlay: JsonObject) -> JsonObject:
    """Return a copy of ``root`` with one overlay document applied.

    The overlay is applied in three steps:

    1. ``global``, ``queue`` and ``templates`` are deep-merged onto the
       matching sections of ``root`` (a missing section starts as ``{}``).
    2. ``instance_overrides`` maps an instance name, or ``"*"`` for every
       instance, to a patch. The ``"*"`` patch is applied first, then the
       patch keyed by the instance's own name. Keys that match no instance
       are ignored.
    3. ``instances`` is a list of patches, each carrying the ``name`` of an
       existing instance that it is merged into.

    Args:
        root: Root config built so far; it is not mutated.
        overlay: Decoded overlay document.

    Returns:
        A new root config.

    Raises:
        ValueError: if ``instance_overrides`` is non-empty but not an object,
            if one of its patches is not an object, if ``instances`` is not a
            list, if one of its entries is not an object with a ``name``, or
            if such an entry names an instance that does not exist.
    """
    out = copy.deepcopy(root)
    for key in ("global", "queue", "templates"):
        if key in overlay:
            out[key] = deep_merge(out.get(key, {}), overlay[key])

    patches = overlay.get("instance_overrides", {})
    if patches:
        if not isinstance(patches, dict):
            raise ValueError("overlay.instance_overrides must be an object")
        # Reject malformed patches here with a clear message, rather than
        # letting them fail with an AttributeError inside the merge helper.
        for target, patch in patches.items():
            if not isinstance(patch, dict):
                raise ValueError(f"overlay.instance_overrides[{target!r}] must be an object")

        instances = []
        for inst in out.get("instances", []):
            # ``out`` is already a private deep copy of ``root``, so an
            # unpatched instance can be carried over without copying again.
            merged = inst
            if "*" in patches:
                merged = merge_instance_patch(merged, patches["*"])
            name = merged.get("name")
            # Override keys are JSON strings, so only a string name can match;
            # the type check also keeps unhashable names out of the lookup.
            if isinstance(name, str) and name in patches:
                merged = merge_instance_patch(merged, patches[name])
            instances.append(merged)
        out["instances"] = instances

    if "instances" in overlay:
        if not isinstance(overlay["instances"], list):
            raise ValueError("overlay.instances must be an array")
        # Instance name -> position; for duplicate names the last one wins.
        by_name = {inst.get("name"): i for i, inst in enumerate(out.get("instances", []))}
        for patch in overlay["instances"]:
            if not isinstance(patch, dict) or not patch.get("name"):
                raise ValueError("overlay.instances entries must be objects with name")
            name = patch["name"]
            if name not in by_name:
                raise ValueError(f"overlay instance not found in profile: {name}")
            index = by_name[name]
            out["instances"][index] = merge_instance_patch(out["instances"][index], patch)

    return out
|
|
|
|
|
|
def render(
    template_path: Path,
    profile_path: Path,
    overlay_paths: list[Path],
    metadata: JsonObject | None = None,
) -> JsonObject:
    """Build the root config from a template, a profile and optional overlays.

    The template body is registered under its resolved name, the profile is
    expanded into instances (with template-level params merged underneath),
    the profile's ``global`` and ``queue`` sections are copied over, and the
    overlays are applied in the order given.

    When ``metadata`` is not ``None``, a ``"metadata"`` section is added that
    records the template and profile names and paths, the instance names and
    display names found after overlays were applied, and the overlay stems
    and paths. Keys supplied in ``metadata`` are laid on top of these computed
    values.

    Returns:
        The rendered root config.
    """
    template_doc = load_json(template_path)
    profile = load_json(profile_path)
    tpl_name = template_name(template_doc, template_path)

    # Kept in this order so validation errors surface in the same sequence:
    # template body, then profile instances, then template params.
    body = template_body(template_doc)
    expanded = profile_instances(profile, tpl_name)
    shared = template_params(template_doc)

    root: JsonObject = {
        "templates": {tpl_name: body},
        "instances": merge_template_params(expanded, shared),
    }
    root.update(
        {section: copy.deepcopy(profile[section]) for section in ("global", "queue") if section in profile}
    )

    for overlay_path in overlay_paths:
        root = apply_overlay(root, load_json(overlay_path))

    if metadata is None:
        return root

    profile_name = str(profile.get("name") or profile_path.stem).strip()
    names: list[str] = []
    display_names: list[str] = []
    for entry in root.get("instances", []):
        if not isinstance(entry, dict):
            continue
        label = str(entry.get("name") or "").strip()
        if label:
            names.append(label)
        entry_params = entry.get("params", {})
        if not isinstance(entry_params, dict):
            continue
        shown = str(entry_params.get("display_name") or "").strip()
        if shown:
            display_names.append(shown)

    summary: JsonObject = {
        "template": tpl_name,
        "template_path": template_path.as_posix(),
        "profile": profile_name,
        "profile_path": profile_path.as_posix(),
        "instance_names": names,
        "instance_display_names": display_names,
        "overlays": [p.stem for p in overlay_paths],
        "overlay_paths": [p.as_posix() for p in overlay_paths],
    }
    # Caller-supplied keys go last so they replace computed ones on a clash.
    summary.update(copy.deepcopy(metadata))
    root["metadata"] = summary
    return root
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: render a config and write it to ``--out``.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, so a plain ``main()`` call behaves as
            before; passing a list lets callers and tests drive the CLI
            directly.

    Returns:
        ``0`` after the rendered config has been written.

    Raises:
        ValueError: if ``--metadata-json`` decodes to something other than a
            JSON object.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--template", required=True, type=Path)
    parser.add_argument("--profile", required=True, type=Path)
    parser.add_argument("--overlay", action="append", default=[], type=Path)
    parser.add_argument("--out", required=True, type=Path)
    parser.add_argument("--config-id", default="")
    parser.add_argument("--config-version", default="")
    parser.add_argument("--rendered-at", default="")
    parser.add_argument("--metadata-json", default="")
    args = parser.parse_args(argv)

    metadata: JsonObject | None = None
    if args.metadata_json:
        parsed = json.loads(args.metadata_json)
        if not isinstance(parsed, dict):
            raise ValueError("--metadata-json must be a JSON object")
        metadata = parsed

    # The dedicated flags take precedence over the same keys in
    # --metadata-json; empty flag values are treated as "not given".
    flag_values = {
        "config_id": args.config_id,
        "config_version": args.config_version,
        "rendered_at": args.rendered_at,
    }
    supplied = {key: value for key, value in flag_values.items() if value}
    if supplied:
        metadata = {} if metadata is None else metadata
        metadata.update(supplied)

    # A metadata section is only emitted when some metadata was requested.
    if metadata is not None:
        metadata.setdefault("rendered_by", "tools/render_config.py")

    rendered = render(args.template, args.profile, args.overlay, metadata=metadata)
    args.out.parent.mkdir(parents=True, exist_ok=True)
    args.out.write_text(
        json.dumps(rendered, ensure_ascii=False, indent=2) + "\n",
        encoding="utf-8",
    )
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|