This commit is contained in:
sladro 2025-12-09 17:31:21 +08:00
commit c33595f6d6
6 changed files with 1456 additions and 0 deletions

89
config.yaml Normal file
View File

@ -0,0 +1,89 @@
# 服务器基本配置
server:
port: ":8080" # HTTP 服务器监听端口
read_timeout: 10s # HTTP 请求读取超时时间,建议 10-30 秒
write_timeout: 5s # HTTP 响应写入超时时间,建议 5-15 秒
max_header_bytes: 1048576 # HTTP 请求头最大字节数1MB防止头部攻击
# FFmpeg 转码配置
ffmpeg:
max_concurrent_streams: 10 # 最大并发流数量,根据服务器性能调整
preset: "medium" # 编码速度预设可选ultrafast, superfast, veryfast, faster, fast, medium
# ultrafast: 最低延迟,较低质量
# veryfast: 推荐值,平衡延迟和质量
# medium: 更好的质量,但延迟更高
bitrate: "2000k" # 视频码率,影响画质和带宽使用
# 1080p 建议 2000k-4000k
# 720p 建议 1500k-2500k
maxrate: "3000k" # 最大码率,建议设置为 bitrate 的 1.25-1.5 倍
buffer_size: "4000k" # 编码器缓冲区大小,建议设置为 bitrate 的 2 倍
# 编码参数
keyint: 60 # 关键帧间隔,影响延迟和定位能力
# 建议值:帧率的 1-2 倍
min_keyint: 60 # 最小关键帧间隔,通常与 keyint 相同
threads: 4 # 编码线程数,建议设置为 CPU 核心数的一半
frame_rate: 25 # 输出帧率常用值24, 25, 30, 60
gop_size: 60 # GOP 大小,建议与 keyint 相同
# 输出分辨率
scale_width: 1920 # 输出视频宽度(像素)
scale_height: 1080 # 输出视频高度(像素)
# 常用分辨率:
# 1080p: 1920x1080
# 720p: 1280x720
# 480p: 854x480
# 重试策略配置
retry:
max_retries: 3 # 连接失败时的最大重试次数
retry_interval: 2s # 重试间隔时间
# Prometheus 监控配置
metrics:
enabled: true # 是否启用 Prometheus 指标收集
path: "/metrics" # Prometheus 指标访问路径
# 缓存配置
cache:
enabled: true # 是否启用缓存
max_size: 1000 # 最大缓存条目数
expire_time: 300s # 缓存过期时间,建议 5-10 分钟
# 针对不同场景的推荐配置:
# 1. 低延迟场景(如实时监控):
# ffmpeg:
# preset: "ultrafast"
# bitrate: "1500k"
# maxrate: "2000k"
# buffer_size: "2000k"
# keyint: 30
# min_keyint: 30
# frame_rate: 30
# scale_width: 854
# scale_height: 480
# 2. 高质量场景(如视频会议):
# ffmpeg:
# preset: "medium"
# bitrate: "4000k"
# maxrate: "5000k"
# buffer_size: "8000k"
# keyint: 60
# min_keyint: 60
# frame_rate: 30
# scale_width: 1920
# scale_height: 1080
# 3. 资源受限场景(如低配置服务器):
# ffmpeg:
# max_concurrent_streams: 5
# preset: "ultrafast"
# bitrate: "1000k"
# maxrate: "1500k"
# buffer_size: "2000k"
# threads: 2
# frame_rate: 25
# scale_width: 854
# scale_height: 480

46
go.mod Normal file
View File

@ -0,0 +1,46 @@
module test
go 1.23.3
require (
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pion/webrtc/v3 v3.3.4
github.com/prometheus/client_golang v1.20.5
gopkg.in/yaml.v2 v2.4.0
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pion/datachannel v1.5.8 // indirect
github.com/pion/dtls/v2 v2.2.12 // indirect
github.com/pion/ice/v2 v2.3.36 // indirect
github.com/pion/interceptor v0.1.29 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.14 // indirect
github.com/pion/rtp v1.8.7 // indirect
github.com/pion/sctp v1.8.19 // indirect
github.com/pion/sdp/v3 v3.0.9 // indirect
github.com/pion/srtp/v2 v2.0.20 // indirect
github.com/pion/stun v0.6.1 // indirect
github.com/pion/transport/v2 v2.2.10 // indirect
github.com/pion/turn/v2 v2.1.6 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/wlynxg/anet v0.0.3 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/time v0.8.0
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

163
go.sum Normal file
View File

@ -0,0 +1,163 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pion/datachannel v1.5.8 h1:ph1P1NsGkazkjrvyMfhRBUAWMxugJjq2HfQifaOoSNo=
github.com/pion/datachannel v1.5.8/go.mod h1:PgmdpoaNBLX9HNzNClmdki4DYW5JtI7Yibu8QzbL3tI=
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.36 h1:SopeXiVbbcooUg2EIR8sq4b13RQ8gzrkkldOVg+bBsc=
github.com/pion/ice/v2 v2.3.36/go.mod h1:mBF7lnigdqgtB+YHkaY/Y6s6tsyRyo4u4rPGRuOjUBQ=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.12 h1:CiMYlY+O0azojWDmxdNr7ADGrnZ+V6Ilfner+6mSVK8=
github.com/pion/mdns v0.0.12/go.mod h1:VExJjv8to/6Wqm1FXK+Ii/Z9tsVk/F5sD/N70cnYFbk=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE=
github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.7 h1:qslKkG8qxvQ7hqaxkmL7Pl0XcUm+/Er7nMnu6Vq+ZxM=
github.com/pion/rtp v1.8.7/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/sctp v1.8.19 h1:2CYuw+SQ5vkQ9t0HdOPccsCz1GQMDuVy5PglLgKVBW8=
github.com/pion/sctp v1.8.19/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE=
github.com/pion/sdp/v3 v3.0.9 h1:pX++dCHoHUwq43kuwf3PyJfHlwIj4hXA7Vrifiq0IJY=
github.com/pion/sdp/v3 v3.0.9/go.mod h1:B5xmvENq5IXJimIO4zfp6LAe1fD9N+kFv+V/1lOdz8M=
github.com/pion/srtp/v2 v2.0.20 h1:HNNny4s+OUmG280ETrCdgFndp4ufx3/uy85EawYEhTk=
github.com/pion/srtp/v2 v2.0.20/go.mod h1:0KJQjA99A6/a0DOVTu1PhDSw0CXF2jTkqOoMg3ODqdA=
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8=
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
github.com/pion/transport/v2 v2.2.3/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v2 v2.2.10 h1:ucLBLE8nuxiHfvkFKnkDQRYWYfp8ejf4YBOPfaQpw6Q=
github.com/pion/transport/v2 v2.2.10/go.mod h1:sq1kSLWs+cHW9E+2fJP95QudkzbK7wscs8yYgQToO5E=
github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0=
github.com/pion/transport/v3 v3.0.2 h1:r+40RJR25S9w3jbA6/5uEPTzcdn7ncyU44RWCbHkLg4=
github.com/pion/transport/v3 v3.0.2/go.mod h1:nIToODoOlb5If2jF9y2Igfx3PFYWfuXi37m0IlWa/D0=
github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pion/turn/v2 v2.1.6 h1:Xr2niVsiPTB0FPtt+yAWKFUkU1eotQbGgpTIld4x1Gc=
github.com/pion/turn/v2 v2.1.6/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pion/webrtc/v3 v3.3.4 h1:v2heQVnXTSqNRXcaFQVOhIOYkLMxOu1iJG8uy1djvkk=
github.com/pion/webrtc/v3 v3.3.4/go.mod h1:liNa+E1iwyzyXqNUwvoMRNQ10x8h8FOeJKL8RkIbamE=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/wlynxg/anet v0.0.3 h1:PvR53psxFXstc12jelG6f1Lv4MWqE0tI76/hHGjh9rg=
github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

957
main.go Normal file
View File

@ -0,0 +1,957 @@
package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"os/exec"
"os/signal"
"runtime"
"runtime/debug"
"strings"
"sync"
"syscall"
"time"
"github.com/patrickmn/go-cache"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/time/rate"
"gopkg.in/yaml.v3"
)
type StreamConfig struct {
URL string `json:"url"`
Name string `json:"name"`
}
type OfferRequest struct {
Offer webrtc.SessionDescription `json:"offer"`
StreamConfig StreamConfig `json:"streamConfig"`
}
// 添加配置结构
type Config struct {
Server struct {
Port string `yaml:"port"`
ReadTimeout time.Duration `yaml:"read_timeout"`
WriteTimeout time.Duration `yaml:"write_timeout"`
MaxHeaderBytes int `yaml:"max_header_bytes"`
} `yaml:"server"`
FFmpeg struct {
MaxConcurrentStreams int `yaml:"max_concurrent_streams"`
Preset string `yaml:"preset"`
Bitrate string `yaml:"bitrate"`
Maxrate string `yaml:"maxrate"`
BufferSize string `yaml:"buffer_size"`
Keyint int `yaml:"keyint"`
MinKeyint int `yaml:"min_keyint"`
Threads int `yaml:"threads"`
FrameRate int `yaml:"frame_rate"`
GopSize int `yaml:"gop_size"`
ScaleWidth int `yaml:"scale_width"`
ScaleHeight int `yaml:"scale_height"`
} `yaml:"ffmpeg"`
Retry struct {
MaxRetries int `yaml:"max_retries"`
RetryInterval time.Duration `yaml:"retry_interval"`
} `yaml:"retry"`
Metrics struct {
Enabled bool `yaml:"enabled"`
Path string `yaml:"path"`
} `yaml:"metrics"`
Cache struct {
Enabled bool `yaml:"enabled"`
MaxSize int `yaml:"max_size"`
ExpireTime time.Duration `yaml:"expire_time"`
} `yaml:"cache"`
}
// 添加 Prometheus 指标
var (
activeStreamsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "webrtc_active_streams",
Help: "Number of active WebRTC streams",
})
ffmpegErrorsCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ffmpeg_errors_total",
Help: "Total number of FFmpeg errors",
}, []string{"stream_name"})
streamLatencyHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "stream_processing_latency_seconds",
Help: "Latency of stream processing in seconds",
Buckets: prometheus.LinearBuckets(0, 0.1, 10), // 0-1s, 100ms buckets
}, []string{"stream_name"})
writeTimeoutCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "write_timeouts_total",
Help: "Total number of write timeouts",
}, []string{"stream_name"})
streamProcessingDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "stream_processing_duration_seconds",
Help: "Time spent processing each stream",
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"stream_name"},
)
requestDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "HTTP request duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"path", "method", "status"},
)
memoryUsageGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "app_memory_usage_bytes",
Help: "Current memory usage in bytes",
})
)
var (
appCache *cache.Cache
)
// 添加自定义错误类型
type StreamError struct {
Code int
Message string
Err error
}
func (e *StreamError) Error() string {
if e.Err != nil {
return fmt.Sprintf("%s: %v", e.Message, e.Err)
}
return e.Message
}
func loadConfig() (*Config, error) {
f, err := os.Open("config.yaml")
if err != nil {
return nil, fmt.Errorf("error opening config file: %v", err)
}
defer f.Close()
var cfg Config
decoder := yaml.NewDecoder(f)
if err := decoder.Decode(&cfg); err != nil {
return nil, fmt.Errorf("error decoding config file: %v", err)
}
// 添加配置验证
if err := validateConfig(&cfg); err != nil {
return nil, fmt.Errorf("invalid configuration: %v", err)
}
// 打印当前配置
log.Printf("Server Configuration:")
log.Printf(" Port: %s", cfg.Server.Port)
log.Printf(" Read Timeout: %v", cfg.Server.ReadTimeout)
log.Printf(" Write Timeout: %v", cfg.Server.WriteTimeout)
log.Printf("FFmpeg Configuration:")
log.Printf(" Max Concurrent Streams: %d", cfg.FFmpeg.MaxConcurrentStreams)
log.Printf(" Preset: %s", cfg.FFmpeg.Preset)
log.Printf(" Bitrate: %s", cfg.FFmpeg.Bitrate)
// ... 其他配置项
return &cfg, nil
}
func validateConfig(cfg *Config) error {
if cfg.Server.Port == "" {
return fmt.Errorf("server port is required")
}
if cfg.Server.ReadTimeout <= 0 {
return fmt.Errorf("server read timeout must be positive")
}
if cfg.Server.WriteTimeout <= 0 {
return fmt.Errorf("server write timeout must be positive")
}
if cfg.FFmpeg.MaxConcurrentStreams <= 0 {
return fmt.Errorf("max concurrent streams must be positive")
}
if cfg.FFmpeg.FrameRate <= 0 {
return fmt.Errorf("frame rate must be positive")
}
// 添加 FFmpeg 配置验证
if cfg.FFmpeg.Preset == "" {
return fmt.Errorf("ffmpeg preset is required")
}
if cfg.FFmpeg.Bitrate == "" {
return fmt.Errorf("ffmpeg bitrate is required")
}
if cfg.FFmpeg.Maxrate == "" {
return fmt.Errorf("ffmpeg maxrate is required")
}
if cfg.FFmpeg.BufferSize == "" {
return fmt.Errorf("ffmpeg buffer size is required")
}
if cfg.FFmpeg.ScaleWidth <= 0 {
return fmt.Errorf("ffmpeg scale width must be positive")
}
if cfg.FFmpeg.ScaleHeight <= 0 {
return fmt.Errorf("ffmpeg scale height must be positive")
}
// 添加重试配置验证
if cfg.Retry.MaxRetries < 0 {
return fmt.Errorf("retry max retries must be non-negative")
}
if cfg.Retry.RetryInterval <= 0 {
return fmt.Errorf("retry interval must be positive")
}
// ... 其他验证
return nil
}
func initMetrics() {
prometheus.MustRegister(activeStreamsGauge)
prometheus.MustRegister(ffmpegErrorsCounter)
prometheus.MustRegister(streamLatencyHistogram)
prometheus.MustRegister(writeTimeoutCounter)
prometheus.MustRegister(streamProcessingDuration)
prometheus.MustRegister(requestDurationHistogram)
prometheus.MustRegister(memoryUsageGauge)
}
func recoveryMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err != nil {
log.Printf("Panic recovered: %v\nStack trace: %s", err, debug.Stack())
http.Error(w, "Internal server error", http.StatusInternalServerError)
}
}()
next.ServeHTTP(w, r)
})
}
type LogEntry struct {
Level string `json:"level"`
Timestamp time.Time `json:"timestamp"`
Message string `json:"message"`
StreamID string `json:"stream_id,omitempty"`
Error string `json:"error,omitempty"`
}
func logError(format string, v ...interface{}) {
entry := LogEntry{
Level: "ERROR",
Timestamp: time.Now(),
Message: fmt.Sprintf(format, v...),
}
json.NewEncoder(os.Stderr).Encode(entry)
}
func logInfo(format string, v ...interface{}) {
entry := LogEntry{
Level: "INFO",
Timestamp: time.Now(),
Message: fmt.Sprintf(format, v...),
}
json.NewEncoder(os.Stdout).Encode(entry)
}
func securityHeaders(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("X-Frame-Options", "DENY")
w.Header().Set("X-XSS-Protection", "1; mode=block")
next.ServeHTTP(w, r)
})
}
func gracefulShutdown(server *http.Server, timeout time.Duration) {
done := make(chan bool)
go func() {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
<-signalChan
log.Println("Shutdown signal received")
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
server.SetKeepAlivesEnabled(false)
if err := server.Shutdown(ctx); err != nil {
log.Printf("Could not gracefully shutdown the server: %v\n", err)
}
close(done)
}()
}
// 添加 CORS 中间件
func corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// 允许特定的源
allowedOrigins := []string{
"http://127.0.0.1:5500",
"http://localhost:5500",
// 添加其他需要的源
}
origin := r.Header.Get("Origin")
for _, allowedOrigin := range allowedOrigins {
if origin == allowedOrigin {
w.Header().Set("Access-Control-Allow-Origin", origin)
break
}
}
// 允许的请求方法
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
// 允许的请求头
w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization")
// 允许凭证
w.Header().Set("Access-Control-Allow-Credentials", "true")
// 设置预检请求的缓存时间
w.Header().Set("Access-Control-Max-Age", "86400")
// 处理预检请求
if r.Method == "OPTIONS" {
w.WriteHeader(http.StatusOK)
return
}
next.ServeHTTP(w, r)
})
}
func main() {
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
mediaEngine := webrtc.MediaEngine{}
if err := mediaEngine.RegisterDefaultCodecs(); err != nil {
log.Fatal("Failed to register default codecs:", err)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(&mediaEngine))
// 加载配置
cfg, err := loadConfig()
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// 初始化指标
if cfg.Metrics.Enabled {
initMetrics()
}
// 初始化缓存
if cfg.Cache.Enabled {
appCache = cache.New(cfg.Cache.ExpireTime, 2*cfg.Cache.ExpireTime)
log.Println("Cache initialized")
}
// 修改资源限制部分
var (
activeStreams = make(chan struct{}, cfg.FFmpeg.MaxConcurrentStreams)
streamsMutex sync.RWMutex
activeStreamCount int
)
// 添加 Prometheus metrics endpoint
if cfg.Metrics.Enabled {
http.Handle(cfg.Metrics.Path, promhttp.Handler())
}
// 创建用于优雅关闭的信号处理
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-signalChan
log.Println("Received shutdown signal, cleaning up...")
cancel()
}()
// 创建 mux 并应用中间件
mux := http.NewServeMux()
// 注册路由
mux.HandleFunc("/offer", func(w http.ResponseWriter, r *http.Request) {
requestID := fmt.Sprintf("%d", time.Now().UnixNano())
log.Printf("[%s] Received new offer request", requestID)
// 在处理新请求前检是否达到最大并发数
select {
case activeStreams <- struct{}{}: // 获取令牌
streamsMutex.Lock()
activeStreamCount++
activeStreamsGauge.Set(float64(activeStreamCount))
streamsMutex.Unlock()
default:
http.Error(w, "Max concurrent streams reached", http.StatusServiceUnavailable)
return
}
log.Println("Received /offer request")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)
return
}
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
const (
maxRequestBodySize = 1 << 20 // 1MB
)
// 在处理请求时添加大小限制
body, err := io.ReadAll(io.LimitReader(r.Body, maxRequestBodySize))
if err != nil {
log.Println("Failed to read request body:", err)
http.Error(w, "Failed to read request body", http.StatusBadRequest)
return
}
defer r.Body.Close()
var offerReq OfferRequest
if err := json.Unmarshal(body, &offerReq); err != nil {
log.Println("Failed to parse request:", err)
http.Error(w, "Invalid request format", http.StatusBadRequest)
return
}
// 添加流配置验证
if err := validateStreamConfig(offerReq.StreamConfig); err != nil {
log.Printf("Invalid stream config: %v", err)
http.Error(w, fmt.Sprintf("Invalid stream config: %v", err), http.StatusBadRequest)
return
}
// 使用缓存储或检某些数据(例如,已处理的流)
if cfg.Cache.Enabled {
cacheKey := fmt.Sprintf("%s_%s", offerReq.StreamConfig.URL, offerReq.StreamConfig.Name)
if _, found := appCache.Get(cacheKey); found {
log.Printf("Stream %s is already being processed\n", offerReq.StreamConfig.Name)
http.Error(w, "Stream already being processed", http.StatusConflict)
<-activeStreams // 释放令牌
return
}
// 使用新的缓存键
appCache.Set(cacheKey, true, cache.DefaultExpiration)
}
// 在处理请求前先测试 RTSP 流是否可访问
if err := testRTSPStream(offerReq.StreamConfig.URL); err != nil {
log.Printf("RTSP stream test failed: %v", err)
http.Error(w, fmt.Sprintf("RTSP stream test failed: %v", err), http.StatusBadRequest)
return
}
configuration := webrtc.Configuration{}
peerConnection, err := api.NewPeerConnection(configuration)
if err != nil {
log.Println("Failed to create peer connection:", err)
http.Error(w, "Failed to create peer connection", http.StatusInternalServerError)
if cfg.Cache.Enabled {
appCache.Delete(offerReq.StreamConfig.Name) // 处理失败,移除缓存
}
return
}
log.Printf("[%s] Peer connection created successfully", requestID)
var wg sync.WaitGroup
done := make(chan struct{})
cmdDone := make(chan struct{})
var cmd *exec.Cmd
var cmdMutex sync.Mutex
var cleanupOnce sync.Once
var cleanupMutex sync.Mutex
isCleanedUp := false
cleanup := func() {
cleanupOnce.Do(func() {
cleanupMutex.Lock()
if isCleanedUp {
cleanupMutex.Unlock()
return
}
isCleanedUp = true
cleanupMutex.Unlock()
log.Printf("Cleanup triggered for stream: %s\n", offerReq.StreamConfig.URL)
// 使用相同的缓存键进行删除
if cfg.Cache.Enabled {
cacheKey := fmt.Sprintf("%s_%s", offerReq.StreamConfig.URL, offerReq.StreamConfig.Name)
appCache.Delete(cacheKey)
}
select {
case <-done:
// channel 已经关闭
default:
close(done)
}
// 等待 FFmpeg 处理完成
select {
case <-cmdDone:
case <-time.After(5 * time.Second):
log.Println("FFmpeg cleanup timed out")
}
wg.Wait()
if err := peerConnection.Close(); err != nil {
log.Printf("Failed to close peer connection: %v", err)
}
if cfg.Cache.Enabled {
appCache.Delete(offerReq.StreamConfig.Name)
}
// 确 FFmpeg 进程被终止
cmdMutex.Lock()
if cmd != nil && cmd.Process != nil {
if err := cmd.Process.Kill(); err != nil {
if !strings.Contains(err.Error(), "process already finished") {
log.Printf("Error killing FFmpeg process: %v", err)
}
}
// 等待进程完全退出
if err := cmd.Wait(); err != nil {
log.Printf("Error waiting for FFmpeg process to exit: %v", err)
}
}
cmdMutex.Unlock()
log.Printf("Cleanup completed for stream: %s\n", offerReq.StreamConfig.URL)
})
}
// 修改这里:使用符合规范的 streamID
streamID := fmt.Sprintf("stream_%s", strings.ReplaceAll(offerReq.StreamConfig.Name, " ", "_"))
videoTrack, err := webrtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264},
"video",
streamID,
)
if err != nil {
log.Println("Failed to create video track:", err)
cleanup()
http.Error(w, "Failed to create video track", http.StatusInternalServerError)
return
}
rtpSender, err := peerConnection.AddTrack(videoTrack)
if err != nil {
log.Println("Failed to add video track:", err)
cleanup()
http.Error(w, "Failed to add video track", http.StatusInternalServerError)
return
}
wg.Add(1)
go func() {
defer wg.Done()
rtcpBuf := make([]byte, 1500)
for {
select {
case <-done:
return
default:
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
if rtcpErr != io.EOF {
log.Printf("rtcp error: %v", rtcpErr)
}
return
}
}
}
}()
peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
log.Printf("Peer Connection State has changed to %s for stream: %s\n", s.String(), offerReq.StreamConfig.URL)
switch s {
case webrtc.PeerConnectionStateFailed,
webrtc.PeerConnectionStateClosed,
webrtc.PeerConnectionStateDisconnected:
log.Printf("Peer Connection %s, cleaning up...\n", s.String())
cleanup()
}
})
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
log.Printf("ICE Connection State has changed: %s\n", connectionState.String())
})
log.Printf("[%s] Setting remote description", requestID)
if err = peerConnection.SetRemoteDescription(offerReq.Offer); err != nil {
log.Printf("[%s] Failed to set remote description: %v", requestID, err)
cleanup()
http.Error(w, fmt.Sprintf("Failed to set remote description: %v", err), http.StatusInternalServerError)
return
}
log.Printf("[%s] Remote description set successfully", requestID)
log.Printf("[%s] Creating answer", requestID)
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Printf("[%s] Failed to create answer: %v", requestID, err)
cleanup()
http.Error(w, fmt.Sprintf("Failed to create answer: %v", err), http.StatusInternalServerError)
return
}
log.Printf("[%s] Answer created successfully", requestID)
log.Printf("[%s] Setting local description", requestID)
if err = peerConnection.SetLocalDescription(answer); err != nil {
log.Printf("[%s] Failed to set local description: %v", requestID, err)
cleanup()
http.Error(w, fmt.Sprintf("Failed to set local description: %v", err), http.StatusInternalServerError)
return
}
log.Printf("[%s] Local description set successfully", requestID)
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
select {
case <-gatherComplete:
log.Println("ICE gathering completed")
case <-time.After(3 * time.Second):
log.Println("ICE gathering timed out")
cleanup()
http.Error(w, "ICE gathering timed out", http.StatusInternalServerError)
return
}
response := peerConnection.LocalDescription()
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(response); err != nil {
log.Println("Failed to encode response:", err)
cleanup()
http.Error(w, "Failed to encode response", http.StatusInternalServerError)
return
}
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
select {
case <-cmdDone:
default:
close(cmdDone)
}
// 释放流资源
<-activeStreams
streamsMutex.Lock()
activeStreamCount--
activeStreamsGauge.Set(float64(activeStreamCount))
streamsMutex.Unlock()
}()
// 使用带缓冲的管道来控制数据流
bufferPool := sync.Pool{
New: func() interface{} {
return make([]byte, 1024*1024)
},
}
startTime := time.Now()
defer func() {
duration := time.Since(startTime).Seconds()
streamLatencyHistogram.WithLabelValues(offerReq.StreamConfig.Name).Observe(duration)
}()
cmdMutex.Lock()
cmd = exec.CommandContext(ctx, "ffmpeg",
"-i", offerReq.StreamConfig.URL,
"-c:v", "libx264",
"-preset", "ultrafast",
"-tune", "zerolatency",
"-profile:v", "baseline",
"-level", "3.0",
"-x264-params", "keyint=30:min-keyint=30:scenecut=0:bframes=0",
"-b:v", "1000k",
"-maxrate", "1500k",
"-bufsize", "2000k",
"-r", "25",
"-g", "30",
"-threads", "4",
"-f", "h264",
"-pix_fmt", "yuv420p",
"-vf", "scale=640:480",
"-movflags", "+faststart",
"-")
cmdMutex.Unlock()
// 添加错误输出捕获
var errBuf bytes.Buffer
cmd.Stderr = &errBuf
ffmpegStdout, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Failed to create stdout pipe: %v", err)
return
}
if err := cmd.Start(); err != nil {
ffmpegErrorsCounter.WithLabelValues(offerReq.StreamConfig.Name).Inc()
log.Printf("Failed to start FFmpeg: %v\nFFmpeg error: %s", err, errBuf.String())
return
}
reader := bufio.NewReaderSize(ffmpegStdout, 8*1024*1024) // 使用8MB缓冲区
var lastFrameTime time.Time
// 在 FFmpeg 处理部分添加帧控制
targetFPS := cfg.FFmpeg.FrameRate
frameInterval := time.Second / time.Duration(targetFPS)
// 在视频处理循环中添加帧率控制
for {
select {
case <-ctx.Done():
log.Printf("[%s] Context cancelled", streamID)
return
case <-done:
log.Printf("[%s] Done signal received", streamID)
return
default:
// 控制帧率
elapsed := time.Since(lastFrameTime)
if elapsed < frameInterval {
time.Sleep(frameInterval - elapsed)
}
lastFrameTime = time.Now()
buffer := bufferPool.Get().([]byte)
n, err := reader.Read(buffer)
if err != nil {
bufferPool.Put(buffer)
if err != io.EOF {
log.Printf("[%s] Error reading from FFmpeg: %v", streamID, err)
}
return
}
writeCtx, writeCancel := context.WithTimeout(ctx, cfg.Server.WriteTimeout)
select {
case <-writeCtx.Done():
writeCancel()
bufferPool.Put(buffer)
log.Printf("[%s] Write timeout", streamID)
return
default:
err = videoTrack.WriteSample(media.Sample{
Data: buffer[:n],
Duration: frameInterval,
})
writeCancel()
bufferPool.Put(buffer)
if err != nil {
log.Printf("[%s] Error writing video sample: %v", streamID, err)
return
}
}
}
}
// 在函数结束时清理 FFmpeg 进程
defer func() {
cmdMutex.Lock()
if cmd != nil && cmd.Process != nil {
if err := cmd.Process.Kill(); err != nil {
if !strings.Contains(err.Error(), "process already finished") {
log.Printf("Error killing process: %v", err)
}
}
cmd.Wait()
}
cmdMutex.Unlock()
}()
}()
})
type HealthStatus struct {
Status string `json:"status"`
ActiveStreams int `json:"active_streams"`
MaxStreams int `json:"max_streams"`
MemStats runtime.MemStats `json:"mem_stats"`
FFmpegStatus string `json:"ffmpeg_status"`
UptimeSeconds int64 `json:"uptime_seconds"`
Version string `json:"version"`
}
var startTime = time.Now()
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
streamsMutex.RLock()
health := HealthStatus{
Status: "healthy",
ActiveStreams: activeStreamCount,
MaxStreams: cfg.FFmpeg.MaxConcurrentStreams,
UptimeSeconds: int64(time.Since(startTime).Seconds()),
Version: "1.0.0", // 添加版本信息
}
streamsMutex.RUnlock()
// 检查 FFmpeg 是否可用
cmd := exec.Command("ffmpeg", "-version")
if err := cmd.Run(); err != nil {
health.FFmpegStatus = "unavailable"
health.Status = "degraded"
} else {
health.FFmpegStatus = "available"
}
runtime.ReadMemStats(&health.MemStats)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(health)
})
// 修改中间件链
handler := chainMiddleware(
mux,
recoveryMiddleware,
corsMiddleware, // 添加 CORS 中间件
rateLimitMiddleware,
metricsMiddleware,
securityHeaders,
)
server := &http.Server{
Addr: cfg.Server.Port,
Handler: handler,
ReadTimeout: cfg.Server.ReadTimeout,
WriteTimeout: cfg.Server.WriteTimeout,
MaxHeaderBytes: cfg.Server.MaxHeaderBytes,
}
go func() {
<-ctx.Done()
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
server.Shutdown(shutdownCtx)
}()
log.Printf("WebRTC Server running at %s\n", cfg.Server.Port)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal("Failed to start HTTP server:", err)
}
}
func validateStreamConfig(cfg StreamConfig) error {
if cfg.Name == "" {
return fmt.Errorf("stream name is required")
}
if cfg.URL == "" {
return fmt.Errorf("stream URL is required")
}
// 验证 URL 格式
if _, err := url.Parse(cfg.URL); err != nil {
return fmt.Errorf("invalid stream URL: %v", err)
}
return nil
}
// 添加速率限制中间件
func rateLimitMiddleware(next http.Handler) http.Handler {
limiter := rate.NewLimiter(rate.Every(time.Second), 100) // 每秒100个请求
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
http.Error(w, "Too many requests", http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}
// 在处理请求前先测试 RTSP 流是否可访问
func testRTSPStream(url string) error {
cmd := exec.Command("ffprobe",
"-v", "error",
"-select_streams", "v:0", // 只选择第一个视频流
"-show_entries", "stream=width,height,codec_name,r_frame_rate",
"-of", "json",
"-i", url)
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("failed to connect to RTSP stream: %v, output: %s", err, string(output))
}
log.Printf("RTSP stream info: %s", string(output))
return nil
}
// 添加中间件链函数
func chainMiddleware(handler http.Handler, middlewares ...func(http.Handler) http.Handler) http.Handler {
for _, middleware := range middlewares {
handler = middleware(handler)
}
return handler
}
// 添加请求追踪中间件
func metricsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// 包装 ResponseWriter 以捕获状态码
wrapped := &responseWriter{ResponseWriter: w, status: http.StatusOK}
next.ServeHTTP(wrapped, r)
duration := time.Since(start).Seconds()
requestDurationHistogram.WithLabelValues(
r.URL.Path,
r.Method,
fmt.Sprintf("%d", wrapped.status),
).Observe(duration)
// 更新内存使用指标
var m runtime.MemStats
runtime.ReadMemStats(&m)
memoryUsageGauge.Set(float64(m.Alloc))
})
}
// 添加 ResponseWriter 包装器
type responseWriter struct {
http.ResponseWriter
status int
}
func (rw *responseWriter) WriteHeader(code int) {
rw.status = code
rw.ResponseWriter.WriteHeader(code)
}

BIN
stream_server.exe Normal file

Binary file not shown.

201
test.html Normal file
View File

@ -0,0 +1,201 @@
<!DOCTYPE html>
<html>
<head>
<title>Multi-Stream WebRTC RTSP Player</title>
<style>
.video-container {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(320px, 1fr));
gap: 10px;
padding: 10px;
}
.video-wrapper {
position: relative;
}
video {
width: 100%;
height: auto;
border: 1px solid #ccc;
}
.stream-name {
position: absolute;
top: 10px;
left: 10px;
background: rgba(0,0,0,0.7);
color: white;
padding: 5px;
border-radius: 3px;
}
.stream-status {
position: absolute;
bottom: 10px;
right: 10px;
background: rgba(0,0,0,0.7);
color: white;
padding: 5px;
border-radius: 3px;
}
</style>
</head>
<body>
<div class="video-container" id="videoContainer"></div>
<script>
// 配置多个流
const streams = [
{ url: 'rtsp://10.0.0.17:8554/camera_test/2', name: 'Camera_2_A' },
{ url: 'rtsp://10.0.0.17:8554/camera_test/2', name: 'Camera_2_B' },
{ url: 'rtsp://10.0.0.17:8554/camera_test/2', name: 'Camera_2_C' },
// 添加更多流
];
function createVideoElement(streamConfig) {
const wrapper = document.createElement('div');
wrapper.className = 'video-wrapper';
const video = document.createElement('video');
video.autoplay = true;
video.playsinline = true;
video.id = `video-${streamConfig.name}`;
const nameLabel = document.createElement('div');
nameLabel.className = 'stream-name';
nameLabel.textContent = streamConfig.name;
const statusLabel = document.createElement('div');
statusLabel.className = 'stream-status';
statusLabel.style.position = 'absolute';
statusLabel.style.bottom = '10px';
statusLabel.style.right = '10px';
statusLabel.style.background = 'rgba(0,0,0,0.7)';
statusLabel.style.color = 'white';
statusLabel.style.padding = '5px';
statusLabel.style.borderRadius = '3px';
statusLabel.textContent = 'Connecting...';
wrapper.appendChild(video);
wrapper.appendChild(nameLabel);
wrapper.appendChild(statusLabel);
document.getElementById('videoContainer').appendChild(wrapper);
return { video, statusLabel };
}
async function startStream(streamConfig) {
const { video, statusLabel } = createVideoElement(streamConfig);
const pc = new RTCPeerConnection();
const maxRetries = 3;
let retryCount = 0;
// 添加更详细的连接状态监控
pc.onconnectionstatechange = () => {
const state = pc.connectionState;
statusLabel.textContent = `Connection: ${state}`;
console.log(`[${streamConfig.name}] Connection state changed to: ${state}`);
};
pc.oniceconnectionstatechange = () => {
const state = pc.iceConnectionState;
console.log(`[${streamConfig.name}] ICE state changed to: ${state}`);
};
pc.onicegatheringstatechange = () => {
console.log(`[${streamConfig.name}] ICE gathering state: ${pc.iceGatheringState}`);
};
pc.onicecandidate = event => {
if (event.candidate) {
console.log(`[${streamConfig.name}] New ICE candidate: ${event.candidate.candidate}`);
}
};
// 添加更详细的轨道监控
pc.ontrack = (event) => {
console.log(`[${streamConfig.name}] Track received:`, event.track);
console.log(`Track settings:`, event.track.getSettings());
console.log(`Track constraints:`, event.track.getConstraints());
if (event.track.kind === 'video') {
video.srcObject = event.streams[0];
video.onloadedmetadata = () => {
console.log(`[${streamConfig.name}] Video metadata loaded`);
statusLabel.textContent = 'Playing';
video.play().catch(e => {
console.error(`[${streamConfig.name}] Play error:`, e);
statusLabel.textContent = 'Play failed';
});
};
// 添加视频元素事件监听
video.onplay = () => console.log(`[${streamConfig.name}] Video started playing`);
video.onpause = () => console.log(`[${streamConfig.name}] Video paused`);
video.onwaiting = () => console.log(`[${streamConfig.name}] Video buffering`);
video.onerror = () => console.log(`[${streamConfig.name}] Video error:`, video.error);
}
};
const tryConnect = async () => {
try {
statusLabel.textContent = 'Creating offer...';
const offer = await pc.createOffer({
offerToReceiveVideo: true,
offerToReceiveAudio: false
});
statusLabel.textContent = 'Setting local description...';
await pc.setLocalDescription(offer);
statusLabel.textContent = 'Sending offer to server...';
const response = await fetch('http://localhost:8080/offer', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
offer: offer,
streamConfig: streamConfig
})
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`HTTP error! status: ${response.status}, message: ${errorText}`);
}
statusLabel.textContent = 'Processing server response...';
const answer = await response.json();
statusLabel.textContent = 'Setting remote description...';
await pc.setRemoteDescription(answer);
console.log(`[${streamConfig.name}] Connection setup completed`);
} catch (e) {
console.error(`[${streamConfig.name}] Error:`, e);
statusLabel.textContent = `Error: ${e.message}`;
if (retryCount < maxRetries) {
retryCount++;
statusLabel.textContent = `Retrying (${retryCount}/${maxRetries})...`;
await new Promise(resolve => setTimeout(resolve, 2000)); // 等待2秒后重试
return tryConnect();
}
throw e;
}
};
try {
await tryConnect();
} catch (e) {
statusLabel.textContent = 'Failed to connect';
console.error(`[${streamConfig.name}] Final error:`, e);
}
}
// 启动所有流
streams.forEach(stream => startStream(stream));
</script>
</body>
</html>