commit c33595f6d6b5c22bb5e73dd0ca3b78248383fb8b Author: sladro Date: Tue Dec 9 17:31:21 2025 +0800 first diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..3bb07fe --- /dev/null +++ b/config.yaml @@ -0,0 +1,89 @@ +# 服务器基本配置 +server: + port: ":8080" # HTTP 服务器监听端口 + read_timeout: 10s # HTTP 请求读取超时时间,建议 10-30 秒 + write_timeout: 5s # HTTP 响应写入超时时间,建议 5-15 秒 + max_header_bytes: 1048576 # HTTP 请求头最大字节数(1MB),防止头部攻击 + +# FFmpeg 转码配置 +ffmpeg: + max_concurrent_streams: 10 # 最大并发流数量,根据服务器性能调整 + preset: "medium" # 编码速度预设,可选:ultrafast, superfast, veryfast, faster, fast, medium + # ultrafast: 最低延迟,较低质量 + # veryfast: 推荐值,平衡延迟和质量 + # medium: 更好的质量,但延迟更高 + bitrate: "2000k" # 视频码率,影响画质和带宽使用 + # 1080p 建议 2000k-4000k + # 720p 建议 1500k-2500k + maxrate: "3000k" # 最大码率,建议设置为 bitrate 的 1.25-1.5 倍 + buffer_size: "4000k" # 编码器缓冲区大小,建议设置为 bitrate 的 2 倍 + + # 编码参数 + keyint: 60 # 关键帧间隔,影响延迟和定位能力 + # 建议值:帧率的 1-2 倍 + min_keyint: 60 # 最小关键帧间隔,通常与 keyint 相同 + threads: 4 # 编码线程数,建议设置为 CPU 核心数的一半 + frame_rate: 25 # 输出帧率,常用值:24, 25, 30, 60 + gop_size: 60 # GOP 大小,建议与 keyint 相同 + + # 输出分辨率 + scale_width: 1920 # 输出视频宽度(像素) + scale_height: 1080 # 输出视频高度(像素) + # 常用分辨率: + # 1080p: 1920x1080 + # 720p: 1280x720 + # 480p: 854x480 + +# 重试策略配置 +retry: + max_retries: 3 # 连接失败时的最大重试次数 + retry_interval: 2s # 重试间隔时间 + +# Prometheus 监控配置 +metrics: + enabled: true # 是否启用 Prometheus 指标收集 + path: "/metrics" # Prometheus 指标访问路径 + +# 缓存配置 +cache: + enabled: true # 是否启用缓存 + max_size: 1000 # 最大缓存条目数 + expire_time: 300s # 缓存过期时间,建议 5-10 分钟 + +# 针对不同场景的推荐配置: + +# 1. 低延迟场景(如实时监控): +# ffmpeg: +# preset: "ultrafast" +# bitrate: "1500k" +# maxrate: "2000k" +# buffer_size: "2000k" +# keyint: 30 +# min_keyint: 30 +# frame_rate: 30 +# scale_width: 854 +# scale_height: 480 + +# 2. 高质量场景(如视频会议): +# ffmpeg: +# preset: "medium" +# bitrate: "4000k" +# maxrate: "5000k" +# buffer_size: "8000k" +# keyint: 60 +# min_keyint: 60 +# frame_rate: 30 +# scale_width: 1920 +# scale_height: 1080 + +# 3. 资源受限场景(如低配置服务器): +# ffmpeg: +# max_concurrent_streams: 5 +# preset: "ultrafast" +# bitrate: "1000k" +# maxrate: "1500k" +# buffer_size: "2000k" +# threads: 2 +# frame_rate: 25 +# scale_width: 854 +# scale_height: 480 \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9f9e687 --- /dev/null +++ b/go.mod @@ -0,0 +1,46 @@ +module test + +go 1.23.3 + +require ( + github.com/patrickmn/go-cache v2.1.0+incompatible + github.com/pion/webrtc/v3 v3.3.4 + github.com/prometheus/client_golang v1.20.5 + gopkg.in/yaml.v2 v2.4.0 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pion/datachannel v1.5.8 // indirect + github.com/pion/dtls/v2 v2.2.12 // indirect + github.com/pion/ice/v2 v2.3.36 // indirect + github.com/pion/interceptor v0.1.29 // indirect + github.com/pion/logging v0.2.2 // indirect + github.com/pion/mdns v0.0.12 // indirect + github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.14 // indirect + github.com/pion/rtp v1.8.7 // indirect + github.com/pion/sctp v1.8.19 // indirect + github.com/pion/sdp/v3 v3.0.9 // indirect + github.com/pion/srtp/v2 v2.0.20 // indirect + github.com/pion/stun v0.6.1 // indirect + github.com/pion/transport/v2 v2.2.10 // indirect + github.com/pion/turn/v2 v2.1.6 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + github.com/stretchr/testify v1.9.0 // indirect + github.com/wlynxg/anet v0.0.3 // indirect + golang.org/x/crypto v0.24.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/time v0.8.0 + google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d55b762 --- /dev/null +++ b/go.sum @@ -0,0 +1,163 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pion/datachannel v1.5.8 h1:ph1P1NsGkazkjrvyMfhRBUAWMxugJjq2HfQifaOoSNo= +github.com/pion/datachannel v1.5.8/go.mod h1:PgmdpoaNBLX9HNzNClmdki4DYW5JtI7Yibu8QzbL3tI= +github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= +github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk= +github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= +github.com/pion/ice/v2 v2.3.36 h1:SopeXiVbbcooUg2EIR8sq4b13RQ8gzrkkldOVg+bBsc= +github.com/pion/ice/v2 v2.3.36/go.mod h1:mBF7lnigdqgtB+YHkaY/Y6s6tsyRyo4u4rPGRuOjUBQ= +github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= +github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/mdns v0.0.12 h1:CiMYlY+O0azojWDmxdNr7ADGrnZ+V6Ilfner+6mSVK8= +github.com/pion/mdns v0.0.12/go.mod h1:VExJjv8to/6Wqm1FXK+Ii/Z9tsVk/F5sD/N70cnYFbk= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= +github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= +github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE= +github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= +github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/rtp v1.8.7 h1:qslKkG8qxvQ7hqaxkmL7Pl0XcUm+/Er7nMnu6Vq+ZxM= +github.com/pion/rtp v1.8.7/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/sctp v1.8.19 h1:2CYuw+SQ5vkQ9t0HdOPccsCz1GQMDuVy5PglLgKVBW8= +github.com/pion/sctp v1.8.19/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE= +github.com/pion/sdp/v3 v3.0.9 h1:pX++dCHoHUwq43kuwf3PyJfHlwIj4hXA7Vrifiq0IJY= +github.com/pion/sdp/v3 v3.0.9/go.mod h1:B5xmvENq5IXJimIO4zfp6LAe1fD9N+kFv+V/1lOdz8M= +github.com/pion/srtp/v2 v2.0.20 h1:HNNny4s+OUmG280ETrCdgFndp4ufx3/uy85EawYEhTk= +github.com/pion/srtp/v2 v2.0.20/go.mod h1:0KJQjA99A6/a0DOVTu1PhDSw0CXF2jTkqOoMg3ODqdA= +github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4= +github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8= +github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g= +github.com/pion/transport/v2 v2.2.3/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0= +github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0= +github.com/pion/transport/v2 v2.2.10 h1:ucLBLE8nuxiHfvkFKnkDQRYWYfp8ejf4YBOPfaQpw6Q= +github.com/pion/transport/v2 v2.2.10/go.mod h1:sq1kSLWs+cHW9E+2fJP95QudkzbK7wscs8yYgQToO5E= +github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0= +github.com/pion/transport/v3 v3.0.2 h1:r+40RJR25S9w3jbA6/5uEPTzcdn7ncyU44RWCbHkLg4= +github.com/pion/transport/v3 v3.0.2/go.mod h1:nIToODoOlb5If2jF9y2Igfx3PFYWfuXi37m0IlWa/D0= +github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= +github.com/pion/turn/v2 v2.1.6 h1:Xr2niVsiPTB0FPtt+yAWKFUkU1eotQbGgpTIld4x1Gc= +github.com/pion/turn/v2 v2.1.6/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= +github.com/pion/webrtc/v3 v3.3.4 h1:v2heQVnXTSqNRXcaFQVOhIOYkLMxOu1iJG8uy1djvkk= +github.com/pion/webrtc/v3 v3.3.4/go.mod h1:liNa+E1iwyzyXqNUwvoMRNQ10x8h8FOeJKL8RkIbamE= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= +github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/wlynxg/anet v0.0.3 h1:PvR53psxFXstc12jelG6f1Lv4MWqE0tI76/hHGjh9rg= +github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..ac5b71e --- /dev/null +++ b/main.go @@ -0,0 +1,957 @@ +package main + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/url" + "os" + "os/exec" + "os/signal" + "runtime" + "runtime/debug" + "strings" + "sync" + "syscall" + "time" + + "github.com/patrickmn/go-cache" + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/time/rate" + "gopkg.in/yaml.v3" +) + +type StreamConfig struct { + URL string `json:"url"` + Name string `json:"name"` +} + +type OfferRequest struct { + Offer webrtc.SessionDescription `json:"offer"` + StreamConfig StreamConfig `json:"streamConfig"` +} + +// 添加配置结构 +type Config struct { + Server struct { + Port string `yaml:"port"` + ReadTimeout time.Duration `yaml:"read_timeout"` + WriteTimeout time.Duration `yaml:"write_timeout"` + MaxHeaderBytes int `yaml:"max_header_bytes"` + } `yaml:"server"` + FFmpeg struct { + MaxConcurrentStreams int `yaml:"max_concurrent_streams"` + Preset string `yaml:"preset"` + Bitrate string `yaml:"bitrate"` + Maxrate string `yaml:"maxrate"` + BufferSize string `yaml:"buffer_size"` + Keyint int `yaml:"keyint"` + MinKeyint int `yaml:"min_keyint"` + Threads int `yaml:"threads"` + FrameRate int `yaml:"frame_rate"` + GopSize int `yaml:"gop_size"` + ScaleWidth int `yaml:"scale_width"` + ScaleHeight int `yaml:"scale_height"` + } `yaml:"ffmpeg"` + Retry struct { + MaxRetries int `yaml:"max_retries"` + RetryInterval time.Duration `yaml:"retry_interval"` + } `yaml:"retry"` + Metrics struct { + Enabled bool `yaml:"enabled"` + Path string `yaml:"path"` + } `yaml:"metrics"` + Cache struct { + Enabled bool `yaml:"enabled"` + MaxSize int `yaml:"max_size"` + ExpireTime time.Duration `yaml:"expire_time"` + } `yaml:"cache"` +} + +// 添加 Prometheus 指标 +var ( + activeStreamsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "webrtc_active_streams", + Help: "Number of active WebRTC streams", + }) + + ffmpegErrorsCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "ffmpeg_errors_total", + Help: "Total number of FFmpeg errors", + }, []string{"stream_name"}) + + streamLatencyHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "stream_processing_latency_seconds", + Help: "Latency of stream processing in seconds", + Buckets: prometheus.LinearBuckets(0, 0.1, 10), // 0-1s, 100ms buckets + }, []string{"stream_name"}) + + writeTimeoutCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "write_timeouts_total", + Help: "Total number of write timeouts", + }, []string{"stream_name"}) + + streamProcessingDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "stream_processing_duration_seconds", + Help: "Time spent processing each stream", + Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), + }, + []string{"stream_name"}, + ) + + requestDurationHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "http_request_duration_seconds", + Help: "HTTP request duration in seconds", + Buckets: prometheus.DefBuckets, + }, + []string{"path", "method", "status"}, + ) + + memoryUsageGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "app_memory_usage_bytes", + Help: "Current memory usage in bytes", + }) +) + +var ( + appCache *cache.Cache +) + +// 添加自定义错误类型 +type StreamError struct { + Code int + Message string + Err error +} + +func (e *StreamError) Error() string { + if e.Err != nil { + return fmt.Sprintf("%s: %v", e.Message, e.Err) + } + return e.Message +} + +func loadConfig() (*Config, error) { + f, err := os.Open("config.yaml") + if err != nil { + return nil, fmt.Errorf("error opening config file: %v", err) + } + defer f.Close() + + var cfg Config + decoder := yaml.NewDecoder(f) + if err := decoder.Decode(&cfg); err != nil { + return nil, fmt.Errorf("error decoding config file: %v", err) + } + + // 添加配置验证 + if err := validateConfig(&cfg); err != nil { + return nil, fmt.Errorf("invalid configuration: %v", err) + } + + // 打印当前配置 + log.Printf("Server Configuration:") + log.Printf(" Port: %s", cfg.Server.Port) + log.Printf(" Read Timeout: %v", cfg.Server.ReadTimeout) + log.Printf(" Write Timeout: %v", cfg.Server.WriteTimeout) + log.Printf("FFmpeg Configuration:") + log.Printf(" Max Concurrent Streams: %d", cfg.FFmpeg.MaxConcurrentStreams) + log.Printf(" Preset: %s", cfg.FFmpeg.Preset) + log.Printf(" Bitrate: %s", cfg.FFmpeg.Bitrate) + // ... 其他配置项 + + return &cfg, nil +} + +func validateConfig(cfg *Config) error { + if cfg.Server.Port == "" { + return fmt.Errorf("server port is required") + } + if cfg.Server.ReadTimeout <= 0 { + return fmt.Errorf("server read timeout must be positive") + } + if cfg.Server.WriteTimeout <= 0 { + return fmt.Errorf("server write timeout must be positive") + } + if cfg.FFmpeg.MaxConcurrentStreams <= 0 { + return fmt.Errorf("max concurrent streams must be positive") + } + if cfg.FFmpeg.FrameRate <= 0 { + return fmt.Errorf("frame rate must be positive") + } + // 添加 FFmpeg 配置验证 + if cfg.FFmpeg.Preset == "" { + return fmt.Errorf("ffmpeg preset is required") + } + if cfg.FFmpeg.Bitrate == "" { + return fmt.Errorf("ffmpeg bitrate is required") + } + if cfg.FFmpeg.Maxrate == "" { + return fmt.Errorf("ffmpeg maxrate is required") + } + if cfg.FFmpeg.BufferSize == "" { + return fmt.Errorf("ffmpeg buffer size is required") + } + if cfg.FFmpeg.ScaleWidth <= 0 { + return fmt.Errorf("ffmpeg scale width must be positive") + } + if cfg.FFmpeg.ScaleHeight <= 0 { + return fmt.Errorf("ffmpeg scale height must be positive") + } + // 添加重试配置验证 + if cfg.Retry.MaxRetries < 0 { + return fmt.Errorf("retry max retries must be non-negative") + } + if cfg.Retry.RetryInterval <= 0 { + return fmt.Errorf("retry interval must be positive") + } + // ... 其他验证 + return nil +} + +func initMetrics() { + prometheus.MustRegister(activeStreamsGauge) + prometheus.MustRegister(ffmpegErrorsCounter) + prometheus.MustRegister(streamLatencyHistogram) + prometheus.MustRegister(writeTimeoutCounter) + prometheus.MustRegister(streamProcessingDuration) + prometheus.MustRegister(requestDurationHistogram) + prometheus.MustRegister(memoryUsageGauge) +} + +func recoveryMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer func() { + if err := recover(); err != nil { + log.Printf("Panic recovered: %v\nStack trace: %s", err, debug.Stack()) + http.Error(w, "Internal server error", http.StatusInternalServerError) + } + }() + next.ServeHTTP(w, r) + }) +} + +type LogEntry struct { + Level string `json:"level"` + Timestamp time.Time `json:"timestamp"` + Message string `json:"message"` + StreamID string `json:"stream_id,omitempty"` + Error string `json:"error,omitempty"` +} + +func logError(format string, v ...interface{}) { + entry := LogEntry{ + Level: "ERROR", + Timestamp: time.Now(), + Message: fmt.Sprintf(format, v...), + } + json.NewEncoder(os.Stderr).Encode(entry) +} + +func logInfo(format string, v ...interface{}) { + entry := LogEntry{ + Level: "INFO", + Timestamp: time.Now(), + Message: fmt.Sprintf(format, v...), + } + json.NewEncoder(os.Stdout).Encode(entry) +} + +func securityHeaders(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Content-Type-Options", "nosniff") + w.Header().Set("X-Frame-Options", "DENY") + w.Header().Set("X-XSS-Protection", "1; mode=block") + next.ServeHTTP(w, r) + }) +} + +func gracefulShutdown(server *http.Server, timeout time.Duration) { + done := make(chan bool) + go func() { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) + <-signalChan + + log.Println("Shutdown signal received") + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + server.SetKeepAlivesEnabled(false) + if err := server.Shutdown(ctx); err != nil { + log.Printf("Could not gracefully shutdown the server: %v\n", err) + } + close(done) + }() +} + +// 添加 CORS 中间件 +func corsMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // 允许特定的源 + allowedOrigins := []string{ + "http://127.0.0.1:5500", + "http://localhost:5500", + // 添加其他需要的源 + } + + origin := r.Header.Get("Origin") + for _, allowedOrigin := range allowedOrigins { + if origin == allowedOrigin { + w.Header().Set("Access-Control-Allow-Origin", origin) + break + } + } + + // 允许的请求方法 + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") + + // 允许的请求头 + w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization") + + // 允许凭证 + w.Header().Set("Access-Control-Allow-Credentials", "true") + + // 设置预检请求的缓存时间 + w.Header().Set("Access-Control-Max-Age", "86400") + + // 处理预检请求 + if r.Method == "OPTIONS" { + w.WriteHeader(http.StatusOK) + return + } + + next.ServeHTTP(w, r) + }) +} + +func main() { + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) + + mediaEngine := webrtc.MediaEngine{} + if err := mediaEngine.RegisterDefaultCodecs(); err != nil { + log.Fatal("Failed to register default codecs:", err) + } + + api := webrtc.NewAPI(webrtc.WithMediaEngine(&mediaEngine)) + + // 加载配置 + cfg, err := loadConfig() + if err != nil { + log.Fatalf("Failed to load config: %v", err) + } + + // 初始化指标 + if cfg.Metrics.Enabled { + initMetrics() + } + + // 初始化缓存 + if cfg.Cache.Enabled { + appCache = cache.New(cfg.Cache.ExpireTime, 2*cfg.Cache.ExpireTime) + log.Println("Cache initialized") + } + + // 修改资源限制部分 + var ( + activeStreams = make(chan struct{}, cfg.FFmpeg.MaxConcurrentStreams) + streamsMutex sync.RWMutex + activeStreamCount int + ) + + // 添加 Prometheus metrics endpoint + if cfg.Metrics.Enabled { + http.Handle(cfg.Metrics.Path, promhttp.Handler()) + } + + // 创建用于优雅关闭的信号处理 + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + <-signalChan + log.Println("Received shutdown signal, cleaning up...") + cancel() + }() + + // 创建 mux 并应用中间件 + mux := http.NewServeMux() + + // 注册路由 + mux.HandleFunc("/offer", func(w http.ResponseWriter, r *http.Request) { + requestID := fmt.Sprintf("%d", time.Now().UnixNano()) + log.Printf("[%s] Received new offer request", requestID) + + // 在处理新请求前检是否达到最大并发数 + select { + case activeStreams <- struct{}{}: // 获取令牌 + streamsMutex.Lock() + activeStreamCount++ + activeStreamsGauge.Set(float64(activeStreamCount)) + streamsMutex.Unlock() + default: + http.Error(w, "Max concurrent streams reached", http.StatusServiceUnavailable) + return + } + + log.Println("Received /offer request") + + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusOK) + return + } + + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + const ( + maxRequestBodySize = 1 << 20 // 1MB + ) + + // 在处理请求时添加大小限制 + body, err := io.ReadAll(io.LimitReader(r.Body, maxRequestBodySize)) + if err != nil { + log.Println("Failed to read request body:", err) + http.Error(w, "Failed to read request body", http.StatusBadRequest) + return + } + defer r.Body.Close() + + var offerReq OfferRequest + if err := json.Unmarshal(body, &offerReq); err != nil { + log.Println("Failed to parse request:", err) + http.Error(w, "Invalid request format", http.StatusBadRequest) + return + } + + // 添加流配置验证 + if err := validateStreamConfig(offerReq.StreamConfig); err != nil { + log.Printf("Invalid stream config: %v", err) + http.Error(w, fmt.Sprintf("Invalid stream config: %v", err), http.StatusBadRequest) + return + } + + // 使用缓存储或检某些数据(例如,已处理的流) + if cfg.Cache.Enabled { + cacheKey := fmt.Sprintf("%s_%s", offerReq.StreamConfig.URL, offerReq.StreamConfig.Name) + if _, found := appCache.Get(cacheKey); found { + log.Printf("Stream %s is already being processed\n", offerReq.StreamConfig.Name) + http.Error(w, "Stream already being processed", http.StatusConflict) + <-activeStreams // 释放令牌 + return + } + // 使用新的缓存键 + appCache.Set(cacheKey, true, cache.DefaultExpiration) + } + + // 在处理请求前先测试 RTSP 流是否可访问 + if err := testRTSPStream(offerReq.StreamConfig.URL); err != nil { + log.Printf("RTSP stream test failed: %v", err) + http.Error(w, fmt.Sprintf("RTSP stream test failed: %v", err), http.StatusBadRequest) + return + } + + configuration := webrtc.Configuration{} + peerConnection, err := api.NewPeerConnection(configuration) + if err != nil { + log.Println("Failed to create peer connection:", err) + http.Error(w, "Failed to create peer connection", http.StatusInternalServerError) + if cfg.Cache.Enabled { + appCache.Delete(offerReq.StreamConfig.Name) // 处理失败,移除缓存 + } + return + } + + log.Printf("[%s] Peer connection created successfully", requestID) + + var wg sync.WaitGroup + done := make(chan struct{}) + cmdDone := make(chan struct{}) + var cmd *exec.Cmd + var cmdMutex sync.Mutex + + var cleanupOnce sync.Once + var cleanupMutex sync.Mutex + isCleanedUp := false + + cleanup := func() { + cleanupOnce.Do(func() { + cleanupMutex.Lock() + if isCleanedUp { + cleanupMutex.Unlock() + return + } + isCleanedUp = true + cleanupMutex.Unlock() + + log.Printf("Cleanup triggered for stream: %s\n", offerReq.StreamConfig.URL) + + // 使用相同的缓存键进行删除 + if cfg.Cache.Enabled { + cacheKey := fmt.Sprintf("%s_%s", offerReq.StreamConfig.URL, offerReq.StreamConfig.Name) + appCache.Delete(cacheKey) + } + + select { + case <-done: + // channel 已经关闭 + default: + close(done) + } + + // 等待 FFmpeg 处理完成 + select { + case <-cmdDone: + case <-time.After(5 * time.Second): + log.Println("FFmpeg cleanup timed out") + } + + wg.Wait() + + if err := peerConnection.Close(); err != nil { + log.Printf("Failed to close peer connection: %v", err) + } + + if cfg.Cache.Enabled { + appCache.Delete(offerReq.StreamConfig.Name) + } + + // 确 FFmpeg 进程被终止 + cmdMutex.Lock() + if cmd != nil && cmd.Process != nil { + if err := cmd.Process.Kill(); err != nil { + if !strings.Contains(err.Error(), "process already finished") { + log.Printf("Error killing FFmpeg process: %v", err) + } + } + // 等待进程完全退出 + if err := cmd.Wait(); err != nil { + log.Printf("Error waiting for FFmpeg process to exit: %v", err) + } + } + cmdMutex.Unlock() + + log.Printf("Cleanup completed for stream: %s\n", offerReq.StreamConfig.URL) + }) + } + + // 修改这里:使用符合规范的 streamID + streamID := fmt.Sprintf("stream_%s", strings.ReplaceAll(offerReq.StreamConfig.Name, " ", "_")) + videoTrack, err := webrtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, + "video", + streamID, + ) + if err != nil { + log.Println("Failed to create video track:", err) + cleanup() + http.Error(w, "Failed to create video track", http.StatusInternalServerError) + return + } + + rtpSender, err := peerConnection.AddTrack(videoTrack) + if err != nil { + log.Println("Failed to add video track:", err) + cleanup() + http.Error(w, "Failed to add video track", http.StatusInternalServerError) + return + } + + wg.Add(1) + go func() { + defer wg.Done() + rtcpBuf := make([]byte, 1500) + for { + select { + case <-done: + return + default: + if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { + if rtcpErr != io.EOF { + log.Printf("rtcp error: %v", rtcpErr) + } + return + } + } + } + }() + + peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { + log.Printf("Peer Connection State has changed to %s for stream: %s\n", s.String(), offerReq.StreamConfig.URL) + switch s { + case webrtc.PeerConnectionStateFailed, + webrtc.PeerConnectionStateClosed, + webrtc.PeerConnectionStateDisconnected: + log.Printf("Peer Connection %s, cleaning up...\n", s.String()) + cleanup() + } + }) + + peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { + log.Printf("ICE Connection State has changed: %s\n", connectionState.String()) + }) + + log.Printf("[%s] Setting remote description", requestID) + if err = peerConnection.SetRemoteDescription(offerReq.Offer); err != nil { + log.Printf("[%s] Failed to set remote description: %v", requestID, err) + cleanup() + http.Error(w, fmt.Sprintf("Failed to set remote description: %v", err), http.StatusInternalServerError) + return + } + log.Printf("[%s] Remote description set successfully", requestID) + + log.Printf("[%s] Creating answer", requestID) + answer, err := peerConnection.CreateAnswer(nil) + if err != nil { + log.Printf("[%s] Failed to create answer: %v", requestID, err) + cleanup() + http.Error(w, fmt.Sprintf("Failed to create answer: %v", err), http.StatusInternalServerError) + return + } + log.Printf("[%s] Answer created successfully", requestID) + + log.Printf("[%s] Setting local description", requestID) + if err = peerConnection.SetLocalDescription(answer); err != nil { + log.Printf("[%s] Failed to set local description: %v", requestID, err) + cleanup() + http.Error(w, fmt.Sprintf("Failed to set local description: %v", err), http.StatusInternalServerError) + return + } + log.Printf("[%s] Local description set successfully", requestID) + + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) + select { + case <-gatherComplete: + log.Println("ICE gathering completed") + case <-time.After(3 * time.Second): + log.Println("ICE gathering timed out") + cleanup() + http.Error(w, "ICE gathering timed out", http.StatusInternalServerError) + return + } + + response := peerConnection.LocalDescription() + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + log.Println("Failed to encode response:", err) + cleanup() + http.Error(w, "Failed to encode response", http.StatusInternalServerError) + return + } + + wg.Add(1) + go func() { + defer wg.Done() + defer func() { + select { + case <-cmdDone: + default: + close(cmdDone) + } + // 释放流资源 + <-activeStreams + streamsMutex.Lock() + activeStreamCount-- + activeStreamsGauge.Set(float64(activeStreamCount)) + streamsMutex.Unlock() + }() + + // 使用带缓冲的管道来控制数据流 + bufferPool := sync.Pool{ + New: func() interface{} { + return make([]byte, 1024*1024) + }, + } + + startTime := time.Now() + defer func() { + duration := time.Since(startTime).Seconds() + streamLatencyHistogram.WithLabelValues(offerReq.StreamConfig.Name).Observe(duration) + }() + + cmdMutex.Lock() + cmd = exec.CommandContext(ctx, "ffmpeg", + "-i", offerReq.StreamConfig.URL, + "-c:v", "libx264", + "-preset", "ultrafast", + "-tune", "zerolatency", + "-profile:v", "baseline", + "-level", "3.0", + "-x264-params", "keyint=30:min-keyint=30:scenecut=0:bframes=0", + "-b:v", "1000k", + "-maxrate", "1500k", + "-bufsize", "2000k", + "-r", "25", + "-g", "30", + "-threads", "4", + "-f", "h264", + "-pix_fmt", "yuv420p", + "-vf", "scale=640:480", + "-movflags", "+faststart", + "-") + cmdMutex.Unlock() + + // 添加错误输出捕获 + var errBuf bytes.Buffer + cmd.Stderr = &errBuf + + ffmpegStdout, err := cmd.StdoutPipe() + if err != nil { + log.Printf("Failed to create stdout pipe: %v", err) + return + } + + if err := cmd.Start(); err != nil { + ffmpegErrorsCounter.WithLabelValues(offerReq.StreamConfig.Name).Inc() + log.Printf("Failed to start FFmpeg: %v\nFFmpeg error: %s", err, errBuf.String()) + return + } + + reader := bufio.NewReaderSize(ffmpegStdout, 8*1024*1024) // 使用8MB缓冲区 + var lastFrameTime time.Time + + // 在 FFmpeg 处理部分添加帧控制 + targetFPS := cfg.FFmpeg.FrameRate + frameInterval := time.Second / time.Duration(targetFPS) + + // 在视频处理循环中添加帧率控制 + for { + select { + case <-ctx.Done(): + log.Printf("[%s] Context cancelled", streamID) + return + case <-done: + log.Printf("[%s] Done signal received", streamID) + return + default: + // 控制帧率 + elapsed := time.Since(lastFrameTime) + if elapsed < frameInterval { + time.Sleep(frameInterval - elapsed) + } + lastFrameTime = time.Now() + + buffer := bufferPool.Get().([]byte) + n, err := reader.Read(buffer) + + if err != nil { + bufferPool.Put(buffer) + if err != io.EOF { + log.Printf("[%s] Error reading from FFmpeg: %v", streamID, err) + } + return + } + + writeCtx, writeCancel := context.WithTimeout(ctx, cfg.Server.WriteTimeout) + select { + case <-writeCtx.Done(): + writeCancel() + bufferPool.Put(buffer) + log.Printf("[%s] Write timeout", streamID) + return + default: + err = videoTrack.WriteSample(media.Sample{ + Data: buffer[:n], + Duration: frameInterval, + }) + writeCancel() + bufferPool.Put(buffer) + + if err != nil { + log.Printf("[%s] Error writing video sample: %v", streamID, err) + return + } + } + } + } + + // 在函数结束时清理 FFmpeg 进程 + defer func() { + cmdMutex.Lock() + if cmd != nil && cmd.Process != nil { + if err := cmd.Process.Kill(); err != nil { + if !strings.Contains(err.Error(), "process already finished") { + log.Printf("Error killing process: %v", err) + } + } + cmd.Wait() + } + cmdMutex.Unlock() + }() + }() + }) + + type HealthStatus struct { + Status string `json:"status"` + ActiveStreams int `json:"active_streams"` + MaxStreams int `json:"max_streams"` + MemStats runtime.MemStats `json:"mem_stats"` + FFmpegStatus string `json:"ffmpeg_status"` + UptimeSeconds int64 `json:"uptime_seconds"` + Version string `json:"version"` + } + + var startTime = time.Now() + + http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + streamsMutex.RLock() + health := HealthStatus{ + Status: "healthy", + ActiveStreams: activeStreamCount, + MaxStreams: cfg.FFmpeg.MaxConcurrentStreams, + UptimeSeconds: int64(time.Since(startTime).Seconds()), + Version: "1.0.0", // 添加版本信息 + } + streamsMutex.RUnlock() + + // 检查 FFmpeg 是否可用 + cmd := exec.Command("ffmpeg", "-version") + if err := cmd.Run(); err != nil { + health.FFmpegStatus = "unavailable" + health.Status = "degraded" + } else { + health.FFmpegStatus = "available" + } + + runtime.ReadMemStats(&health.MemStats) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(health) + }) + + // 修改中间件链 + handler := chainMiddleware( + mux, + recoveryMiddleware, + corsMiddleware, // 添加 CORS 中间件 + rateLimitMiddleware, + metricsMiddleware, + securityHeaders, + ) + + server := &http.Server{ + Addr: cfg.Server.Port, + Handler: handler, + ReadTimeout: cfg.Server.ReadTimeout, + WriteTimeout: cfg.Server.WriteTimeout, + MaxHeaderBytes: cfg.Server.MaxHeaderBytes, + } + + go func() { + <-ctx.Done() + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer shutdownCancel() + server.Shutdown(shutdownCtx) + }() + + log.Printf("WebRTC Server running at %s\n", cfg.Server.Port) + if err := server.ListenAndServe(); err != http.ErrServerClosed { + log.Fatal("Failed to start HTTP server:", err) + } +} + +func validateStreamConfig(cfg StreamConfig) error { + if cfg.Name == "" { + return fmt.Errorf("stream name is required") + } + if cfg.URL == "" { + return fmt.Errorf("stream URL is required") + } + // 验证 URL 格式 + if _, err := url.Parse(cfg.URL); err != nil { + return fmt.Errorf("invalid stream URL: %v", err) + } + return nil +} + +// 添加速率限制中间件 +func rateLimitMiddleware(next http.Handler) http.Handler { + limiter := rate.NewLimiter(rate.Every(time.Second), 100) // 每秒100个请求 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !limiter.Allow() { + http.Error(w, "Too many requests", http.StatusTooManyRequests) + return + } + next.ServeHTTP(w, r) + }) +} + +// 在处理请求前先测试 RTSP 流是否可访问 +func testRTSPStream(url string) error { + cmd := exec.Command("ffprobe", + "-v", "error", + "-select_streams", "v:0", // 只选择第一个视频流 + "-show_entries", "stream=width,height,codec_name,r_frame_rate", + "-of", "json", + "-i", url) + + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to connect to RTSP stream: %v, output: %s", err, string(output)) + } + + log.Printf("RTSP stream info: %s", string(output)) + return nil +} + +// 添加中间件链函数 +func chainMiddleware(handler http.Handler, middlewares ...func(http.Handler) http.Handler) http.Handler { + for _, middleware := range middlewares { + handler = middleware(handler) + } + return handler +} + +// 添加请求追踪中间件 +func metricsMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + + // 包装 ResponseWriter 以捕获状态码 + wrapped := &responseWriter{ResponseWriter: w, status: http.StatusOK} + next.ServeHTTP(wrapped, r) + + duration := time.Since(start).Seconds() + requestDurationHistogram.WithLabelValues( + r.URL.Path, + r.Method, + fmt.Sprintf("%d", wrapped.status), + ).Observe(duration) + + // 更新内存使用指标 + var m runtime.MemStats + runtime.ReadMemStats(&m) + memoryUsageGauge.Set(float64(m.Alloc)) + }) +} + +// 添加 ResponseWriter 包装器 +type responseWriter struct { + http.ResponseWriter + status int +} + +func (rw *responseWriter) WriteHeader(code int) { + rw.status = code + rw.ResponseWriter.WriteHeader(code) +} diff --git a/stream_server.exe b/stream_server.exe new file mode 100644 index 0000000..85b8918 Binary files /dev/null and b/stream_server.exe differ diff --git a/test.html b/test.html new file mode 100644 index 0000000..aa4493d --- /dev/null +++ b/test.html @@ -0,0 +1,201 @@ + + + + + Multi-Stream WebRTC RTSP Player + + + +
+ + + + + \ No newline at end of file