diff --git a/agent/internal/httpapi/graph_node_types.go b/agent/internal/httpapi/graph_node_types.go index 83b138d..9432cf9 100644 --- a/agent/internal/httpapi/graph_node_types.go +++ b/agent/internal/httpapi/graph_node_types.go @@ -37,8 +37,8 @@ func (s *Server) handleGraphNodeTypes(w http.ResponseWriter, r *http.Request) { func graphNodeTypesCatalog() []graphNodeTypeInfo { return []graphNodeTypeInfo{ - nodeType("input_rtsp", "RTSP 输入", "输入", "camera", "从网络摄像机或流媒体地址读取视频流。", "source", map[string]any{"url": "${rtsp_url}"}, []graphNodeParam{ - textParam("url", "RTSP 地址", "${rtsp_url}"), + nodeType("input_rtsp", "RTSP 输入", "输入", "camera", "从网络摄像机或流媒体地址读取视频流。", "source", map[string]any{"url": "${slot:video_input_main.url}"}, []graphNodeParam{ + textParam("url", "RTSP 地址", "${slot:video_input_main.url}"), numberParam("fps", "输入帧率", "1"), numberParam("width", "宽度", "1"), numberParam("height", "高度", "1"), diff --git a/agent/internal/httpapi/graph_node_types_test.go b/agent/internal/httpapi/graph_node_types_test.go new file mode 100644 index 0000000..a571262 --- /dev/null +++ b/agent/internal/httpapi/graph_node_types_test.go @@ -0,0 +1,21 @@ +package httpapi + +import "testing" + +func TestGraphNodeTypesDescribeSlotDrivenInputDefaults(t *testing.T) { + items := graphNodeTypesCatalog() + for _, item := range items { + if item.Type != "input_rtsp" { + continue + } + if got := item.Defaults["url"]; got != "${slot:video_input_main.url}" { + t.Fatalf("expected slot-driven input default, got %#v", got) + } + if len(item.Params) == 0 || item.Params[0].Placeholder != "${slot:video_input_main.url}" { + t.Fatalf("expected slot-driven placeholder, got %#v", item.Params) + } + return + } + t.Fatal("expected input_rtsp node type") +} + diff --git a/configs/profiles/local_3588_test.json b/configs/profiles/local_3588_test.json index 8a14d34..78814bd 100644 --- a/configs/profiles/local_3588_test.json +++ b/configs/profiles/local_3588_test.json @@ -4,17 +4,51 @@ "instances": [ { "name": "cam1", - "params": { - "channel_no": "cam1", - "device_code": "rk3588-a-001", + "template": "std_workshop_face_recognition_shoe_alarm", + "scene_meta": { "display_name": "B厂区通道1", - "publish_hls_path": "./web/hls/cam1/index.m3u8", - "publish_rtsp_path": "/live/cam1", - "publish_rtsp_port": 8555, - "rtsp_url": "rtsp://10.0.0.49:8554/cam", - "site_name": "B厂区" + "site_name": "B厂区", + "device_code": "rk3588-a-001" }, - "template": "std_workshop_face_recognition_shoe_alarm" + "input_bindings": { + "video_input_main": { + "resolved": { + "url": "rtsp://10.0.0.49:8554/cam" + } + } + }, + "service_bindings": { + "object_storage_main": { + "resolved": { + "endpoint": "http://10.0.0.49:9000", + "bucket": "myminio", + "access_key": "admin", + "secret_key": "password" + } + }, + "token_service_main": { + "resolved": { + "get_token_url": "http://10.0.0.49:8080/api/getToken", + "tenant_code": "32" + } + }, + "alarm_service_main": { + "resolved": { + "put_message_url": "http://10.0.0.49:8080/api/putMessage", + "tenant_code": "32" + } + } + }, + "output_bindings": { + "stream_output_main": { + "resolved": { + "publish_hls_path": "./web/hls/cam1/index.m3u8", + "publish_rtsp_path": "/live/cam1", + "publish_rtsp_port": 8555, + "channel_no": "cam1" + } + } + } } ], "name": "local_3588_test", diff --git a/configs/templates/std_face_recognition_stream.json b/configs/templates/std_face_recognition_stream.json index a62064f..b36393a 100644 --- a/configs/templates/std_face_recognition_stream.json +++ b/configs/templates/std_face_recognition_stream.json @@ -2,7 +2,15 @@ "name": "std_face_recognition_stream", "description": "1080p 人脸识别流程,包含滑窗人脸检测、人脸识别、画面叠加与视频发布。", "source": "configs/test_scrfd_640_recog.json", - "params": {}, + "slots": { + "inputs": [ + {"name": "video_input_main", "type": "video_source", "required": true, "description": "主视频输入"} + ], + "services": [], + "outputs": [ + {"name": "stream_output_main", "type": "stream_publish", "required": true, "description": "主视频输出"} + ] + }, "template": { "executor": { "batch_size": 2, @@ -14,7 +22,7 @@ "type": "input_rtsp", "role": "source", "enable": true, - "url": "${rtsp_url}", + "url": "${slot:video_input_main.url}", "fps": 30, "width": 1920, "height": 1080, @@ -165,13 +173,13 @@ "outputs": [ { "proto": "hls", - "path": "${publish_hls_path}", + "path": "${slot:stream_output_main.publish_hls_path}", "segment_sec": 2 }, { "proto": "rtsp_server", - "port": "${publish_rtsp_port}", - "path": "${publish_rtsp_path}" + "port": "${slot:stream_output_main.publish_rtsp_port}", + "path": "${slot:stream_output_main.publish_rtsp_path}" } ] } diff --git a/configs/templates/std_service_test_stream.json b/configs/templates/std_service_test_stream.json index 3e797fd..7edbc4c 100644 --- a/configs/templates/std_service_test_stream.json +++ b/configs/templates/std_service_test_stream.json @@ -2,7 +2,15 @@ "name": "std_service_test_stream", "description": "最简服务可用性测试流程,用于验证视频解码、预处理、编码与发布链路是否正常。", "source": "configs/test_face_only.json", - "params": {}, + "slots": { + "inputs": [ + {"name": "video_input_main", "type": "video_source", "required": true, "description": "主视频输入"} + ], + "services": [], + "outputs": [ + {"name": "stream_output_main", "type": "stream_publish", "required": true, "description": "主视频输出"} + ] + }, "template": { "executor": { "batch_size": 1, @@ -14,7 +22,7 @@ "type": "input_rtsp", "role": "source", "enable": true, - "url": "${rtsp_url}", + "url": "${slot:video_input_main.url}", "fps": 25, "width": 1280, "height": 720, @@ -50,13 +58,13 @@ "outputs": [ { "proto": "hls", - "path": "${publish_hls_path}", + "path": "${slot:stream_output_main.publish_hls_path}", "segment_sec": 2 }, { "proto": "rtsp_server", - "port": "${publish_rtsp_port}", - "path": "${publish_rtsp_path}" + "port": "${slot:stream_output_main.publish_rtsp_port}", + "path": "${slot:stream_output_main.publish_rtsp_path}" } ] } diff --git a/configs/templates/std_workshoe_detection_stream.json b/configs/templates/std_workshoe_detection_stream.json index 97b32dd..936d9a9 100644 --- a/configs/templates/std_workshoe_detection_stream.json +++ b/configs/templates/std_workshoe_detection_stream.json @@ -2,7 +2,15 @@ "name": "std_workshoe_detection_stream", "description": "1080p 劳保鞋检测流程,包含人员检测、人员跟踪、劳保鞋规则判断、画面叠加与视频发布。", "source": "configs/full_pipeline_1080p_test_alarm.json", - "params": {}, + "slots": { + "inputs": [ + {"name": "video_input_main", "type": "video_source", "required": true, "description": "主视频输入"} + ], + "services": [], + "outputs": [ + {"name": "stream_output_main", "type": "stream_publish", "required": true, "description": "主视频输出"} + ] + }, "template": { "executor": { "batch_size": 2, @@ -14,7 +22,7 @@ "type": "input_rtsp", "role": "source", "enable": true, - "url": "${rtsp_url}", + "url": "${slot:video_input_main.url}", "fps": 30, "width": 1920, "height": 1080, @@ -241,13 +249,13 @@ "outputs": [ { "proto": "hls", - "path": "${publish_hls_path}", + "path": "${slot:stream_output_main.publish_hls_path}", "segment_sec": 2 }, { "proto": "rtsp_server", - "port": "${publish_rtsp_port}", - "path": "${publish_rtsp_path}" + "port": "${slot:stream_output_main.publish_rtsp_port}", + "path": "${slot:stream_output_main.publish_rtsp_path}" } ] } diff --git a/configs/templates/std_workshop_face_recognition_shoe_alarm.json b/configs/templates/std_workshop_face_recognition_shoe_alarm.json index 0c5d8b0..6fa01d6 100644 --- a/configs/templates/std_workshop_face_recognition_shoe_alarm.json +++ b/configs/templates/std_workshop_face_recognition_shoe_alarm.json @@ -2,14 +2,18 @@ "name": "std_workshop_face_recognition_shoe_alarm", "description": "1080p 车间全功能识别流程,包含人脸检测与识别、人员跟踪、劳保鞋检测、画面叠加、视频发布、告警上报以及抓拍存证。", "source": "configs/full_pipeline_1080p_test_alarm.json", - "params": { - "minio_endpoint": "http://10.0.0.49:9000", - "minio_bucket": "myminio", - "minio_access_key": "admin", - "minio_secret_key": "password", - "external_get_token_url": "http://10.0.0.49:8080/api/getToken", - "external_put_message_url": "http://10.0.0.49:8080/api/putMessage", - "tenant_code": "32" + "slots": { + "inputs": [ + {"name": "video_input_main", "type": "video_source", "required": true, "description": "主视频输入"} + ], + "services": [ + {"name": "object_storage_main", "type": "object_storage", "required": false, "description": "抓拍与片段上传"}, + {"name": "token_service_main", "type": "token_service", "required": false, "description": "认证服务"}, + {"name": "alarm_service_main", "type": "alarm_service", "required": false, "description": "告警服务"} + ], + "outputs": [ + {"name": "stream_output_main", "type": "stream_publish", "required": true, "description": "主视频输出"} + ] }, "template": { "executor": { @@ -22,7 +26,7 @@ "type": "input_rtsp", "role": "source", "enable": true, - "url": "${rtsp_url}", + "url": "${slot:video_input_main.url}", "fps": 30, "width": 1920, "height": 1080, @@ -320,13 +324,13 @@ "outputs": [ { "proto": "hls", - "path": "${publish_hls_path}", + "path": "${slot:stream_output_main.publish_hls_path}", "segment_sec": 2 }, { "proto": "rtsp_server", - "port": "${publish_rtsp_port}", - "path": "${publish_rtsp_path}" + "port": "${slot:stream_output_main.publish_rtsp_port}", + "path": "${slot:stream_output_main.publish_rtsp_path}" } ] }, @@ -415,11 +419,11 @@ "quality": 85, "upload": { "type": "minio", - "endpoint": "${minio_endpoint}", - "bucket": "${minio_bucket}", + "endpoint": "${slot:object_storage_main.endpoint}", + "bucket": "${slot:object_storage_main.bucket}", "region": "us-east-1", - "access_key": "${minio_access_key}", - "secret_key": "${minio_secret_key}" + "access_key": "${slot:object_storage_main.access_key}", + "secret_key": "${slot:object_storage_main.secret_key}" } }, "clip": { @@ -430,19 +434,19 @@ "fps": 30, "upload": { "type": "minio", - "endpoint": "${minio_endpoint}", - "bucket": "${minio_bucket}", + "endpoint": "${slot:object_storage_main.endpoint}", + "bucket": "${slot:object_storage_main.bucket}", "region": "us-east-1", - "access_key": "${minio_access_key}", - "secret_key": "${minio_secret_key}" + "access_key": "${slot:object_storage_main.access_key}", + "secret_key": "${slot:object_storage_main.secret_key}" } }, "external_api": { "enable": true, - "getTokenUrl": "${external_get_token_url}", - "putMessageUrl": "${external_put_message_url}", - "tenantCode": "${tenant_code}", - "channelNo": "${channel_no}", + "getTokenUrl": "${slot:token_service_main.get_token_url}", + "putMessageUrl": "${slot:alarm_service_main.put_message_url}", + "tenantCode": "${slot:alarm_service_main.tenant_code}", + "channelNo": "${slot:stream_output_main.channel_no}", "timeout_ms": 3000, "include_media_url": true, "token_header": "X-Access-Token", diff --git a/tools/render_config.py b/tools/render_config.py index e2b5ae4..cda1549 100644 --- a/tools/render_config.py +++ b/tools/render_config.py @@ -6,11 +6,13 @@ from __future__ import annotations import argparse import copy import json +import re from pathlib import Path from typing import Any JsonObject = dict[str, Any] +SLOT_TOKEN_RE = re.compile(r"^\$\{slot:([A-Za-z0-9_.-]+)\.([A-Za-z0-9_.-]+)\}$") def deep_merge(base: Any, override: Any) -> Any: @@ -33,6 +35,7 @@ def load_json(path: Path) -> JsonObject: return data + def template_name(template_doc: JsonObject, template_path: Path) -> str: name = str(template_doc.get("name") or template_path.stem).strip() if not name: @@ -59,6 +62,94 @@ def template_params(template_doc: JsonObject) -> JsonObject: return copy.deepcopy(params) +def instance_scene_meta(instance: JsonObject) -> JsonObject: + scene_meta = instance.get("scene_meta", {}) + if scene_meta is None: + return {} + if not isinstance(scene_meta, dict): + raise ValueError("instance.scene_meta must be an object") + return copy.deepcopy(scene_meta) + + +def instance_extra_params(instance: JsonObject) -> JsonObject: + params = instance.get("params", {}) + if params is None: + return {} + if not isinstance(params, dict): + raise ValueError("instance.params must be a JSON object") + return copy.deepcopy(params) + + +def binding_group(instance: JsonObject, key: str) -> JsonObject: + value = instance.get(key, {}) + if value is None: + return {} + if not isinstance(value, dict): + raise ValueError(f"instance.{key} must be an object") + return copy.deepcopy(value) + + +def resolved_binding_value(entry: JsonObject) -> JsonObject: + if not isinstance(entry, dict): + raise ValueError("binding entry must be an object") + resolved = entry.get("resolved") + if resolved is None: + return copy.deepcopy(entry) + if not isinstance(resolved, dict): + raise ValueError("binding resolved payload must be an object") + return copy.deepcopy(resolved) + + +def build_binding_context(instance: JsonObject) -> JsonObject: + context: JsonObject = { + "scene": instance_scene_meta(instance), + } + for group_name in ("input_bindings", "service_bindings", "output_bindings"): + group = binding_group(instance, group_name) + for slot_name, value in group.items(): + context[slot_name] = resolved_binding_value(value) + return context + + +def expand_slot_tokens(value: Any, binding_context: JsonObject) -> Any: + if isinstance(value, dict): + return {key: expand_slot_tokens(item, binding_context) for key, item in value.items()} + if isinstance(value, list): + return [expand_slot_tokens(item, binding_context) for item in value] + if not isinstance(value, str): + return copy.deepcopy(value) + match = SLOT_TOKEN_RE.match(value.strip()) + if not match: + return value + slot_name, field_name = match.groups() + slot_values = binding_context.get(slot_name) + if not isinstance(slot_values, dict): + raise ValueError(f"required slot '{slot_name}' is not bound") + if field_name not in slot_values: + raise ValueError(f"required slot field '{slot_name}.{field_name}' is not bound") + return copy.deepcopy(slot_values[field_name]) + + +def render_scene_instance(template_doc: JsonObject, scene_instance: JsonObject) -> tuple[str, JsonObject, JsonObject]: + instance_name = str(scene_instance.get("name") or "").strip() + if not instance_name: + raise ValueError("scene instance name is required") + bound_template_name = f"{template_name(template_doc, Path(instance_name))}__{instance_name}" + binding_context = build_binding_context(scene_instance) + rendered_template = expand_slot_tokens(template_body(template_doc), binding_context) + rendered_instance = { + "name": instance_name, + "template": bound_template_name, + } + scene_meta = instance_scene_meta(scene_instance) + if scene_meta: + rendered_instance["scene_meta"] = scene_meta + extra_params = instance_extra_params(scene_instance) + if extra_params: + rendered_instance["params"] = extra_params + return bound_template_name, rendered_template, rendered_instance + + def profile_instances(profile: JsonObject, tpl_name: str) -> list[JsonObject]: if "instances" in profile: instances = profile["instances"] @@ -159,10 +250,17 @@ def render( template_doc = load_json(template_path) profile = load_json(profile_path) tpl_name = template_name(template_doc, template_path) + instances = merge_template_params(profile_instances(profile, tpl_name), template_params(template_doc)) + rendered_templates: JsonObject = {} + rendered_instances: list[JsonObject] = [] + for instance in instances: + bound_name, bound_template, rendered_instance = render_scene_instance(template_doc, instance) + rendered_templates[bound_name] = bound_template + rendered_instances.append(rendered_instance) root: JsonObject = { - "templates": {tpl_name: template_body(template_doc)}, - "instances": merge_template_params(profile_instances(profile, tpl_name), template_params(template_doc)), + "templates": rendered_templates, + "instances": rendered_instances, } for key in ("global", "queue"): if key in profile: @@ -180,11 +278,9 @@ def render( name = str(instance.get("name") or "").strip() if name: instance_names.append(name) - params = instance.get("params", {}) - if isinstance(params, dict): - display_name = str(params.get("display_name") or "").strip() - if display_name: - instance_display_names.append(display_name) + display_name = str(as_map(instance.get("scene_meta")).get("display_name") or "").strip() + if display_name: + instance_display_names.append(display_name) root["metadata"] = { "template": tpl_name, "template_path": template_path.as_posix(), @@ -200,6 +296,12 @@ def render( return root +def as_map(value: Any) -> JsonObject: + if isinstance(value, dict): + return value + return {} + + def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--template", required=True, type=Path) diff --git a/tools/tests/test_render_config_slots.py b/tools/tests/test_render_config_slots.py new file mode 100644 index 0000000..c4a50ed --- /dev/null +++ b/tools/tests/test_render_config_slots.py @@ -0,0 +1,79 @@ +import tempfile +import unittest +from pathlib import Path + +from render_config import render + + +class RenderConfigSlotTests(unittest.TestCase): + def test_render_config_expands_scene_slot_bindings(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + template_path = root / "template.json" + profile_path = root / "profile.json" + + template_path.write_text( + """ +{ + "name": "std_service_test_stream", + "slots": { + "inputs": [{"name": "video_input_main", "type": "video_source", "required": true}], + "services": [{"name": "object_storage_main", "type": "object_storage", "required": false}], + "outputs": [{"name": "stream_output_main", "type": "stream_publish", "required": true}] + }, + "template": { + "nodes": [ + {"id": "input_rtsp_main", "type": "input_rtsp", "url": "${slot:video_input_main.url}"}, + {"id": "publish_stream", "type": "publish", "outputs": [{"proto": "hls", "path": "${slot:stream_output_main.publish_hls_path}"}]} + ], + "edges": [] + } +} +""".strip(), + encoding="utf-8", + ) + + profile_path.write_text( + """ +{ + "name": "line_a", + "instances": [ + { + "name": "cam1", + "template": "std_service_test_stream", + "scene_meta": { + "display_name": "B厂区通道1" + }, + "input_bindings": { + "video_input_main": { + "resolved": { + "url": "rtsp://10.0.0.1/live" + } + } + }, + "output_bindings": { + "stream_output_main": { + "publish_hls_path": "./web/hls/cam1/index.m3u8" + } + } + } + ] +} +""".strip(), + encoding="utf-8", + ) + + rendered = render(template_path, profile_path, []) + + self.assertIn("templates", rendered) + self.assertIn("instances", rendered) + self.assertEqual(1, len(rendered["instances"])) + instance = rendered["instances"][0] + self.assertEqual("std_service_test_stream__cam1", instance["template"]) + template = rendered["templates"][instance["template"]] + self.assertEqual("rtsp://10.0.0.1/live", template["nodes"][0]["url"]) + self.assertEqual("./web/hls/cam1/index.m3u8", template["nodes"][1]["outputs"][0]["path"]) + + +if __name__ == "__main__": + unittest.main()