Adopt slot-driven template rendering

This commit is contained in:
tian 2026-05-03 09:50:46 +08:00
parent 00c9268834
commit cf4493507c
9 changed files with 321 additions and 57 deletions

View File

@ -37,8 +37,8 @@ func (s *Server) handleGraphNodeTypes(w http.ResponseWriter, r *http.Request) {
func graphNodeTypesCatalog() []graphNodeTypeInfo {
return []graphNodeTypeInfo{
nodeType("input_rtsp", "RTSP 输入", "输入", "camera", "从网络摄像机或流媒体地址读取视频流。", "source", map[string]any{"url": "${rtsp_url}"}, []graphNodeParam{
textParam("url", "RTSP 地址", "${rtsp_url}"),
nodeType("input_rtsp", "RTSP 输入", "输入", "camera", "从网络摄像机或流媒体地址读取视频流。", "source", map[string]any{"url": "${slot:video_input_main.url}"}, []graphNodeParam{
textParam("url", "RTSP 地址", "${slot:video_input_main.url}"),
numberParam("fps", "输入帧率", "1"),
numberParam("width", "宽度", "1"),
numberParam("height", "高度", "1"),

View File

@ -0,0 +1,21 @@
package httpapi
import "testing"
func TestGraphNodeTypesDescribeSlotDrivenInputDefaults(t *testing.T) {
items := graphNodeTypesCatalog()
for _, item := range items {
if item.Type != "input_rtsp" {
continue
}
if got := item.Defaults["url"]; got != "${slot:video_input_main.url}" {
t.Fatalf("expected slot-driven input default, got %#v", got)
}
if len(item.Params) == 0 || item.Params[0].Placeholder != "${slot:video_input_main.url}" {
t.Fatalf("expected slot-driven placeholder, got %#v", item.Params)
}
return
}
t.Fatal("expected input_rtsp node type")
}

View File

@ -4,17 +4,51 @@
"instances": [
{
"name": "cam1",
"params": {
"channel_no": "cam1",
"device_code": "rk3588-a-001",
"template": "std_workshop_face_recognition_shoe_alarm",
"scene_meta": {
"display_name": "B厂区通道1",
"site_name": "B厂区",
"device_code": "rk3588-a-001"
},
"input_bindings": {
"video_input_main": {
"resolved": {
"url": "rtsp://10.0.0.49:8554/cam"
}
}
},
"service_bindings": {
"object_storage_main": {
"resolved": {
"endpoint": "http://10.0.0.49:9000",
"bucket": "myminio",
"access_key": "admin",
"secret_key": "password"
}
},
"token_service_main": {
"resolved": {
"get_token_url": "http://10.0.0.49:8080/api/getToken",
"tenant_code": "32"
}
},
"alarm_service_main": {
"resolved": {
"put_message_url": "http://10.0.0.49:8080/api/putMessage",
"tenant_code": "32"
}
}
},
"output_bindings": {
"stream_output_main": {
"resolved": {
"publish_hls_path": "./web/hls/cam1/index.m3u8",
"publish_rtsp_path": "/live/cam1",
"publish_rtsp_port": 8555,
"rtsp_url": "rtsp://10.0.0.49:8554/cam",
"site_name": "B厂区"
},
"template": "std_workshop_face_recognition_shoe_alarm"
"channel_no": "cam1"
}
}
}
}
],
"name": "local_3588_test",

View File

@ -2,7 +2,15 @@
"name": "std_face_recognition_stream",
"description": "1080p 人脸识别流程,包含滑窗人脸检测、人脸识别、画面叠加与视频发布。",
"source": "configs/test_scrfd_640_recog.json",
"params": {},
"slots": {
"inputs": [
{"name": "video_input_main", "type": "video_source", "required": true, "description": "主视频输入"}
],
"services": [],
"outputs": [
{"name": "stream_output_main", "type": "stream_publish", "required": true, "description": "主视频输出"}
]
},
"template": {
"executor": {
"batch_size": 2,
@ -14,7 +22,7 @@
"type": "input_rtsp",
"role": "source",
"enable": true,
"url": "${rtsp_url}",
"url": "${slot:video_input_main.url}",
"fps": 30,
"width": 1920,
"height": 1080,
@ -165,13 +173,13 @@
"outputs": [
{
"proto": "hls",
"path": "${publish_hls_path}",
"path": "${slot:stream_output_main.publish_hls_path}",
"segment_sec": 2
},
{
"proto": "rtsp_server",
"port": "${publish_rtsp_port}",
"path": "${publish_rtsp_path}"
"port": "${slot:stream_output_main.publish_rtsp_port}",
"path": "${slot:stream_output_main.publish_rtsp_path}"
}
]
}

View File

@ -2,7 +2,15 @@
"name": "std_service_test_stream",
"description": "最简服务可用性测试流程,用于验证视频解码、预处理、编码与发布链路是否正常。",
"source": "configs/test_face_only.json",
"params": {},
"slots": {
"inputs": [
{"name": "video_input_main", "type": "video_source", "required": true, "description": "主视频输入"}
],
"services": [],
"outputs": [
{"name": "stream_output_main", "type": "stream_publish", "required": true, "description": "主视频输出"}
]
},
"template": {
"executor": {
"batch_size": 1,
@ -14,7 +22,7 @@
"type": "input_rtsp",
"role": "source",
"enable": true,
"url": "${rtsp_url}",
"url": "${slot:video_input_main.url}",
"fps": 25,
"width": 1280,
"height": 720,
@ -50,13 +58,13 @@
"outputs": [
{
"proto": "hls",
"path": "${publish_hls_path}",
"path": "${slot:stream_output_main.publish_hls_path}",
"segment_sec": 2
},
{
"proto": "rtsp_server",
"port": "${publish_rtsp_port}",
"path": "${publish_rtsp_path}"
"port": "${slot:stream_output_main.publish_rtsp_port}",
"path": "${slot:stream_output_main.publish_rtsp_path}"
}
]
}

View File

@ -2,7 +2,15 @@
"name": "std_workshoe_detection_stream",
"description": "1080p 劳保鞋检测流程,包含人员检测、人员跟踪、劳保鞋规则判断、画面叠加与视频发布。",
"source": "configs/full_pipeline_1080p_test_alarm.json",
"params": {},
"slots": {
"inputs": [
{"name": "video_input_main", "type": "video_source", "required": true, "description": "主视频输入"}
],
"services": [],
"outputs": [
{"name": "stream_output_main", "type": "stream_publish", "required": true, "description": "主视频输出"}
]
},
"template": {
"executor": {
"batch_size": 2,
@ -14,7 +22,7 @@
"type": "input_rtsp",
"role": "source",
"enable": true,
"url": "${rtsp_url}",
"url": "${slot:video_input_main.url}",
"fps": 30,
"width": 1920,
"height": 1080,
@ -241,13 +249,13 @@
"outputs": [
{
"proto": "hls",
"path": "${publish_hls_path}",
"path": "${slot:stream_output_main.publish_hls_path}",
"segment_sec": 2
},
{
"proto": "rtsp_server",
"port": "${publish_rtsp_port}",
"path": "${publish_rtsp_path}"
"port": "${slot:stream_output_main.publish_rtsp_port}",
"path": "${slot:stream_output_main.publish_rtsp_path}"
}
]
}

View File

@ -2,14 +2,18 @@
"name": "std_workshop_face_recognition_shoe_alarm",
"description": "1080p 车间全功能识别流程,包含人脸检测与识别、人员跟踪、劳保鞋检测、画面叠加、视频发布、告警上报以及抓拍存证。",
"source": "configs/full_pipeline_1080p_test_alarm.json",
"params": {
"minio_endpoint": "http://10.0.0.49:9000",
"minio_bucket": "myminio",
"minio_access_key": "admin",
"minio_secret_key": "password",
"external_get_token_url": "http://10.0.0.49:8080/api/getToken",
"external_put_message_url": "http://10.0.0.49:8080/api/putMessage",
"tenant_code": "32"
"slots": {
"inputs": [
{"name": "video_input_main", "type": "video_source", "required": true, "description": "主视频输入"}
],
"services": [
{"name": "object_storage_main", "type": "object_storage", "required": false, "description": "抓拍与片段上传"},
{"name": "token_service_main", "type": "token_service", "required": false, "description": "认证服务"},
{"name": "alarm_service_main", "type": "alarm_service", "required": false, "description": "告警服务"}
],
"outputs": [
{"name": "stream_output_main", "type": "stream_publish", "required": true, "description": "主视频输出"}
]
},
"template": {
"executor": {
@ -22,7 +26,7 @@
"type": "input_rtsp",
"role": "source",
"enable": true,
"url": "${rtsp_url}",
"url": "${slot:video_input_main.url}",
"fps": 30,
"width": 1920,
"height": 1080,
@ -320,13 +324,13 @@
"outputs": [
{
"proto": "hls",
"path": "${publish_hls_path}",
"path": "${slot:stream_output_main.publish_hls_path}",
"segment_sec": 2
},
{
"proto": "rtsp_server",
"port": "${publish_rtsp_port}",
"path": "${publish_rtsp_path}"
"port": "${slot:stream_output_main.publish_rtsp_port}",
"path": "${slot:stream_output_main.publish_rtsp_path}"
}
]
},
@ -415,11 +419,11 @@
"quality": 85,
"upload": {
"type": "minio",
"endpoint": "${minio_endpoint}",
"bucket": "${minio_bucket}",
"endpoint": "${slot:object_storage_main.endpoint}",
"bucket": "${slot:object_storage_main.bucket}",
"region": "us-east-1",
"access_key": "${minio_access_key}",
"secret_key": "${minio_secret_key}"
"access_key": "${slot:object_storage_main.access_key}",
"secret_key": "${slot:object_storage_main.secret_key}"
}
},
"clip": {
@ -430,19 +434,19 @@
"fps": 30,
"upload": {
"type": "minio",
"endpoint": "${minio_endpoint}",
"bucket": "${minio_bucket}",
"endpoint": "${slot:object_storage_main.endpoint}",
"bucket": "${slot:object_storage_main.bucket}",
"region": "us-east-1",
"access_key": "${minio_access_key}",
"secret_key": "${minio_secret_key}"
"access_key": "${slot:object_storage_main.access_key}",
"secret_key": "${slot:object_storage_main.secret_key}"
}
},
"external_api": {
"enable": true,
"getTokenUrl": "${external_get_token_url}",
"putMessageUrl": "${external_put_message_url}",
"tenantCode": "${tenant_code}",
"channelNo": "${channel_no}",
"getTokenUrl": "${slot:token_service_main.get_token_url}",
"putMessageUrl": "${slot:alarm_service_main.put_message_url}",
"tenantCode": "${slot:alarm_service_main.tenant_code}",
"channelNo": "${slot:stream_output_main.channel_no}",
"timeout_ms": 3000,
"include_media_url": true,
"token_header": "X-Access-Token",

View File

@ -6,11 +6,13 @@ from __future__ import annotations
import argparse
import copy
import json
import re
from pathlib import Path
from typing import Any
JsonObject = dict[str, Any]
SLOT_TOKEN_RE = re.compile(r"^\$\{slot:([A-Za-z0-9_.-]+)\.([A-Za-z0-9_.-]+)\}$")
def deep_merge(base: Any, override: Any) -> Any:
@ -33,6 +35,7 @@ def load_json(path: Path) -> JsonObject:
return data
def template_name(template_doc: JsonObject, template_path: Path) -> str:
name = str(template_doc.get("name") or template_path.stem).strip()
if not name:
@ -59,6 +62,94 @@ def template_params(template_doc: JsonObject) -> JsonObject:
return copy.deepcopy(params)
def instance_scene_meta(instance: JsonObject) -> JsonObject:
scene_meta = instance.get("scene_meta", {})
if scene_meta is None:
return {}
if not isinstance(scene_meta, dict):
raise ValueError("instance.scene_meta must be an object")
return copy.deepcopy(scene_meta)
def instance_extra_params(instance: JsonObject) -> JsonObject:
params = instance.get("params", {})
if params is None:
return {}
if not isinstance(params, dict):
raise ValueError("instance.params must be a JSON object")
return copy.deepcopy(params)
def binding_group(instance: JsonObject, key: str) -> JsonObject:
value = instance.get(key, {})
if value is None:
return {}
if not isinstance(value, dict):
raise ValueError(f"instance.{key} must be an object")
return copy.deepcopy(value)
def resolved_binding_value(entry: JsonObject) -> JsonObject:
if not isinstance(entry, dict):
raise ValueError("binding entry must be an object")
resolved = entry.get("resolved")
if resolved is None:
return copy.deepcopy(entry)
if not isinstance(resolved, dict):
raise ValueError("binding resolved payload must be an object")
return copy.deepcopy(resolved)
def build_binding_context(instance: JsonObject) -> JsonObject:
context: JsonObject = {
"scene": instance_scene_meta(instance),
}
for group_name in ("input_bindings", "service_bindings", "output_bindings"):
group = binding_group(instance, group_name)
for slot_name, value in group.items():
context[slot_name] = resolved_binding_value(value)
return context
def expand_slot_tokens(value: Any, binding_context: JsonObject) -> Any:
if isinstance(value, dict):
return {key: expand_slot_tokens(item, binding_context) for key, item in value.items()}
if isinstance(value, list):
return [expand_slot_tokens(item, binding_context) for item in value]
if not isinstance(value, str):
return copy.deepcopy(value)
match = SLOT_TOKEN_RE.match(value.strip())
if not match:
return value
slot_name, field_name = match.groups()
slot_values = binding_context.get(slot_name)
if not isinstance(slot_values, dict):
raise ValueError(f"required slot '{slot_name}' is not bound")
if field_name not in slot_values:
raise ValueError(f"required slot field '{slot_name}.{field_name}' is not bound")
return copy.deepcopy(slot_values[field_name])
def render_scene_instance(template_doc: JsonObject, scene_instance: JsonObject) -> tuple[str, JsonObject, JsonObject]:
instance_name = str(scene_instance.get("name") or "").strip()
if not instance_name:
raise ValueError("scene instance name is required")
bound_template_name = f"{template_name(template_doc, Path(instance_name))}__{instance_name}"
binding_context = build_binding_context(scene_instance)
rendered_template = expand_slot_tokens(template_body(template_doc), binding_context)
rendered_instance = {
"name": instance_name,
"template": bound_template_name,
}
scene_meta = instance_scene_meta(scene_instance)
if scene_meta:
rendered_instance["scene_meta"] = scene_meta
extra_params = instance_extra_params(scene_instance)
if extra_params:
rendered_instance["params"] = extra_params
return bound_template_name, rendered_template, rendered_instance
def profile_instances(profile: JsonObject, tpl_name: str) -> list[JsonObject]:
if "instances" in profile:
instances = profile["instances"]
@ -159,10 +250,17 @@ def render(
template_doc = load_json(template_path)
profile = load_json(profile_path)
tpl_name = template_name(template_doc, template_path)
instances = merge_template_params(profile_instances(profile, tpl_name), template_params(template_doc))
rendered_templates: JsonObject = {}
rendered_instances: list[JsonObject] = []
for instance in instances:
bound_name, bound_template, rendered_instance = render_scene_instance(template_doc, instance)
rendered_templates[bound_name] = bound_template
rendered_instances.append(rendered_instance)
root: JsonObject = {
"templates": {tpl_name: template_body(template_doc)},
"instances": merge_template_params(profile_instances(profile, tpl_name), template_params(template_doc)),
"templates": rendered_templates,
"instances": rendered_instances,
}
for key in ("global", "queue"):
if key in profile:
@ -180,9 +278,7 @@ def render(
name = str(instance.get("name") or "").strip()
if name:
instance_names.append(name)
params = instance.get("params", {})
if isinstance(params, dict):
display_name = str(params.get("display_name") or "").strip()
display_name = str(as_map(instance.get("scene_meta")).get("display_name") or "").strip()
if display_name:
instance_display_names.append(display_name)
root["metadata"] = {
@ -200,6 +296,12 @@ def render(
return root
def as_map(value: Any) -> JsonObject:
if isinstance(value, dict):
return value
return {}
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--template", required=True, type=Path)

View File

@ -0,0 +1,79 @@
import tempfile
import unittest
from pathlib import Path
from render_config import render
class RenderConfigSlotTests(unittest.TestCase):
def test_render_config_expands_scene_slot_bindings(self):
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir)
template_path = root / "template.json"
profile_path = root / "profile.json"
template_path.write_text(
"""
{
"name": "std_service_test_stream",
"slots": {
"inputs": [{"name": "video_input_main", "type": "video_source", "required": true}],
"services": [{"name": "object_storage_main", "type": "object_storage", "required": false}],
"outputs": [{"name": "stream_output_main", "type": "stream_publish", "required": true}]
},
"template": {
"nodes": [
{"id": "input_rtsp_main", "type": "input_rtsp", "url": "${slot:video_input_main.url}"},
{"id": "publish_stream", "type": "publish", "outputs": [{"proto": "hls", "path": "${slot:stream_output_main.publish_hls_path}"}]}
],
"edges": []
}
}
""".strip(),
encoding="utf-8",
)
profile_path.write_text(
"""
{
"name": "line_a",
"instances": [
{
"name": "cam1",
"template": "std_service_test_stream",
"scene_meta": {
"display_name": "B厂区通道1"
},
"input_bindings": {
"video_input_main": {
"resolved": {
"url": "rtsp://10.0.0.1/live"
}
}
},
"output_bindings": {
"stream_output_main": {
"publish_hls_path": "./web/hls/cam1/index.m3u8"
}
}
}
]
}
""".strip(),
encoding="utf-8",
)
rendered = render(template_path, profile_path, [])
self.assertIn("templates", rendered)
self.assertIn("instances", rendered)
self.assertEqual(1, len(rendered["instances"]))
instance = rendered["instances"][0]
self.assertEqual("std_service_test_stream__cam1", instance["template"])
template = rendered["templates"][instance["template"]]
self.assertEqual("rtsp://10.0.0.1/live", template["nodes"][0]["url"])
self.assertEqual("./web/hls/cam1/index.m3u8", template["nodes"][1]["outputs"][0]["path"])
if __name__ == "__main__":
unittest.main()