修改bug
This commit is contained in:
parent
c3c0ff380e
commit
644fb590fa
@ -150,3 +150,158 @@ ss -lntp | grep ':8554'
|
||||
|
||||
- `[rtsp] decoding for stream 0 failed`:FFmpeg 在 RTSP 抖动/重连/首包阶段可能出现(但你后续已经推理、报警,说明最终还是拿到了帧)。
|
||||
- `deprecated pixel format used`:swscale 对某些 YUV 范围提示,通常不影响功能验证。
|
||||
|
||||
---
|
||||
|
||||
## 7. 测试通过后的下一步计划(多路并发 + 分辨率自适应)
|
||||
|
||||
### 7.1 固化“基线验收标准”(先定义通过=什么)
|
||||
|
||||
- 每路 `total_fps` ≥ 目标帧率的 90%
|
||||
- RTSP 输出可连续播放 N 分钟不断流
|
||||
- 报警触发次数符合预期,MinIO 中 `.jpg/.mp4` 可用且数量/大小合理
|
||||
- CPU/NPU/内存占用在可接受范围(尤其是内存不持续增长)
|
||||
|
||||
### 7.2 多路压测:按 2→4→8 逐步加路,不要一次拉满
|
||||
|
||||
- 每加一档路数就记录:每路 fps、掉帧、端到端延迟(拉流→输出)、报警/上传是否还能跟上
|
||||
- 发现瓶颈后先定位/修复,再进入下一档
|
||||
|
||||
### 7.3 逐步切回硬件链路(性能优化主线,按顺序恢复)
|
||||
|
||||
当前为了稳定跑通(避免 CMA/DMA 申请失败)使用了 CPU 路径:`use_mpp=false`、`use_rga=false`。
|
||||
|
||||
建议按“阶段化 + 验收 + 失败回退”来做,每次只动一个开关:
|
||||
|
||||
#### 阶段 0:固定稳定基线(可随时回退点)
|
||||
|
||||
- `input_rtsp.use_mpp=false` + `input_rtsp.use_ffmpeg=true`(FFmpeg CPU 解码)
|
||||
- `pre_cam1.use_rga=false`、`post_cam1.use_rga=false`(swscale)
|
||||
|
||||
验收:fps、报警、上传、RTSP 输出都稳定(建议至少跑 10~30 分钟)。
|
||||
|
||||
#### 阶段 1:先恢复输入硬解(只改一处)
|
||||
|
||||
目标:把输入从 CPU 解码切到 **ffmpeg demux + mpp decode**。
|
||||
|
||||
配置改动(示例):
|
||||
|
||||
```json
|
||||
{
|
||||
"id": "in_cam1",
|
||||
"type": "input_rtsp",
|
||||
"use_mpp": true,
|
||||
"use_ffmpeg": false
|
||||
}
|
||||
```
|
||||
|
||||
说明:`use_mpp=true` 会走 “FFmpeg 解复用 + MPP 解码”;为避免歧义建议把 `use_ffmpeg=false`(否则会同时存在两条可选路径)。
|
||||
|
||||
验收点:
|
||||
|
||||
- 启动日志出现:`(ffmpeg demux + mpp decode)`
|
||||
- `/api/graphs/<graph>` 里 `total_fps` 稳定
|
||||
- 对比基线:CPU 降、温度/功耗更稳(多路时收益更明显)
|
||||
|
||||
失败判定与回退:
|
||||
|
||||
- 若出现 “requested ffmpeg/mpp but not enabled at build time” 或持续拿不到帧:先回退到 `use_mpp=false`,确认编译选项/依赖再继续。
|
||||
|
||||
#### 阶段 2:再恢复 RGA(建议分两步)
|
||||
|
||||
目标:把缩放/色彩转换从 swscale 切到 RGA。
|
||||
|
||||
建议分两步开:
|
||||
|
||||
1) 先只开 `pre_cam1.use_rga=true`(AI 前处理)
|
||||
2) 稳定后再开 `post_cam1.use_rga=true`(输出/编码前处理)
|
||||
|
||||
配置改动(示例):
|
||||
|
||||
```json
|
||||
{ "id": "pre_cam1", "type": "preprocess", "use_rga": true }
|
||||
{ "id": "post_cam1", "type": "preprocess", "use_rga": true }
|
||||
```
|
||||
|
||||
验收点:
|
||||
|
||||
- preprocess 启动日志从 `(swscale)` 变为 `(rga)`
|
||||
- 无以下报错/告警(任意出现都算“资源不足/路径退化”,需要先处理再继续):
|
||||
- `[DmaAlloc] DMA_HEAP_IOCTL_ALLOC failed` / `failed to open dma_heap device`
|
||||
- `[preprocess] DMA alloc failed`
|
||||
- AI 输入尺寸/格式正确(例如 `pre_cam1` 要输出 `dst_format=rgb`),不能出现“RGA 失败直接 passthrough 导致下游格式/尺寸不匹配”。
|
||||
|
||||
若再次出现 CMA/DMA 不足:
|
||||
|
||||
- 系统层:增大 CMA(内核启动参数/设备树),保证 `/dev/dma_heap/cma` 有足够空间
|
||||
- 代码层:做 DMA buffer 复用/池化(当前 RGA 路径会频繁 `DmaAlloc()`,多路更容易打爆)
|
||||
|
||||
回退策略:
|
||||
|
||||
- 先回退 `post_cam1.use_rga=false`,再回退 `pre_cam1.use_rga=false`
|
||||
- 若仍不稳,再回退 `input_rtsp.use_mpp=false` 回到阶段 0
|
||||
|
||||
### 7.4 降低报警/上传对多路性能的影响(多路必须做)
|
||||
|
||||
多路压测时,原则是:**先把触发频率降下来**,再控制媒体时长,最后再看上传/队列的背压与丢弃策略。
|
||||
|
||||
#### 1) 规则侧:避免“每帧触发”
|
||||
|
||||
可直接在 `alarm.rules[]` 里调:
|
||||
|
||||
- `min_duration_ms`:设为非 0(例如 30fps 下 `300ms≈9 帧`,`500ms≈15 帧`)
|
||||
- `cooldown_ms`:适当加大(例如 10s/30s),避免同一目标持续刷屏
|
||||
- `roi`:缩小区域(减少无效触发与后续媒体/上传开销)
|
||||
|
||||
示例:
|
||||
|
||||
```json
|
||||
{
|
||||
"name": "person_in_view",
|
||||
"class_ids": [0],
|
||||
"roi": { "x": 0.2, "y": 0.2, "w": 0.6, "h": 0.6 },
|
||||
"min_duration_ms": 500,
|
||||
"cooldown_ms": 10000
|
||||
}
|
||||
```
|
||||
|
||||
#### 2) 媒体侧:先压测 snapshot,再开 clip
|
||||
|
||||
- 压测阶段建议先 `clip.enable=false`,只开 `snapshot`,先把“检测/触发/上传链路”跑稳。
|
||||
- 需要 clip 时,尽量缩短:`clip.pre_sec/post_sec`。
|
||||
|
||||
备注:当前 clip 动作会等待 `post_sec`(内部会 `sleep(post_sec)` 以收集后置窗口帧),触发太频繁会导致动作堆积、事件队列更容易丢。
|
||||
|
||||
#### 3) 队列侧:区分“图流水线队列”与“报警事件队列”
|
||||
|
||||
- 顶层 `queue`:影响整条图的帧流转(你现在默认 `drop_oldest`)。多路时建议保持丢帧优先(而不是阻塞),避免某一路慢把全局拖死。
|
||||
- `alarm.event_queue`(可选):只影响报警动作执行(snapshot/clip/http 等),不会直接阻塞图处理(除非你把策略设成 `block`)。
|
||||
|
||||
建议给 `alarm` 增加 `event_queue` 来限制动作压力(示例):
|
||||
|
||||
```json
|
||||
{
|
||||
"id": "alarm_cam1",
|
||||
"type": "alarm",
|
||||
"event_queue": { "size": 64, "strategy": "drop_oldest" }
|
||||
}
|
||||
```
|
||||
|
||||
说明:
|
||||
|
||||
- `strategy` 支持:`drop_oldest` / `drop_newest` / `block`
|
||||
- 多路压测阶段不建议 `block`(可能把触发线程卡住,反而放大抖动)
|
||||
|
||||
#### 4) 上传/通知侧:超时与失败策略要心里有数
|
||||
|
||||
- MinIO 上传:当前是同步上传(snapshot/clip 都会直接调用 uploader),失败会打印 `upload failed: ...`,但未看到通用的“自动重试/退避”配置项。
|
||||
- 若使用 presign:可配置 `presign_endpoint` + `presign_timeout_ms`(默认 3000ms)。
|
||||
- HTTP 通知:内部有后台线程与队列,但同样没有重试语义;压测时建议先关 `http.enable`,等主链路稳了再开。
|
||||
|
||||
压测结论建议记录:每路触发频率(每分钟/每小时)、snapshot/clip 成功率、以及队列丢弃(若有)——否则很难判断“算力瓶颈”还是“报警/上传侧把系统拖慢”。
|
||||
|
||||
### 7.5 工程化:可重复部署 + 可观测 + 长稳
|
||||
|
||||
- 固化启动/停止流程(端口冲突处理、配置热更策略)
|
||||
- 用 `/api/graphs` 做在线监控(fps、队列、alarm_total、publish_clients)并接入现有监控
|
||||
- 做 24h 长稳(内存、线程数、句柄、CMA 变化、断流重连)
|
||||
|
||||
@ -13,8 +13,8 @@
|
||||
"fps": 30,
|
||||
"width": 1280,
|
||||
"height": 720,
|
||||
"use_mpp": false,
|
||||
"use_ffmpeg": true,
|
||||
"use_mpp": true,
|
||||
"use_ffmpeg": false,
|
||||
"force_tcp": true,
|
||||
"reconnect_sec": 5,
|
||||
"reconnect_backoff_max_sec": 30
|
||||
@ -28,7 +28,7 @@
|
||||
"dst_h": 640,
|
||||
"dst_format": "rgb",
|
||||
"keep_ratio": false,
|
||||
"use_rga": false
|
||||
"use_rga": true
|
||||
},
|
||||
{
|
||||
"id": "ai_cam1",
|
||||
@ -62,7 +62,7 @@
|
||||
"dst_h": 720,
|
||||
"dst_format": "nv12",
|
||||
"keep_ratio": false,
|
||||
"use_rga": false
|
||||
"use_rga": true
|
||||
},
|
||||
{
|
||||
"id": "alarm_cam1",
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
#include "http_action.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <ctime>
|
||||
#include <iostream>
|
||||
@ -25,6 +26,11 @@ bool HttpAction::Init(const SimpleJson& config) {
|
||||
include_media_url_ = config.ValueOr<bool>("include_media_url", true);
|
||||
method_ = config.ValueOr<std::string>("method", "POST");
|
||||
|
||||
max_queue_size_ = static_cast<size_t>(std::max(0, config.ValueOr<int>("max_queue_size", static_cast<int>(max_queue_size_))));
|
||||
const std::string policy = config.ValueOr<std::string>("queue_policy", "drop_oldest");
|
||||
if (policy == "drop_newest") queue_policy_ = QueuePolicy::DropNewest;
|
||||
else queue_policy_ = QueuePolicy::DropOldest;
|
||||
|
||||
if (url_.empty()) {
|
||||
std::cerr << "[HttpAction] url is required\n";
|
||||
return false;
|
||||
@ -76,7 +82,17 @@ void HttpAction::Execute(AlarmEvent& event, std::shared_ptr<Frame> /*frame*/) {
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(queue_mutex_);
|
||||
request_queue_.push(json_body);
|
||||
if (max_queue_size_ > 0 && request_queue_.size() >= max_queue_size_) {
|
||||
++dropped_total_;
|
||||
if (queue_policy_ == QueuePolicy::DropOldest) {
|
||||
request_queue_.pop();
|
||||
request_queue_.push(json_body);
|
||||
} else {
|
||||
// DropNewest: drop this request
|
||||
}
|
||||
} else {
|
||||
request_queue_.push(json_body);
|
||||
}
|
||||
}
|
||||
queue_cv_.notify_one();
|
||||
}
|
||||
|
||||
@ -23,6 +23,11 @@ private:
|
||||
void WorkerLoop();
|
||||
bool SendRequest(const std::string& json_body);
|
||||
|
||||
enum class QueuePolicy {
|
||||
DropOldest,
|
||||
DropNewest,
|
||||
};
|
||||
|
||||
std::string url_;
|
||||
int timeout_ms_ = 3000;
|
||||
bool include_media_url_ = true;
|
||||
@ -34,6 +39,10 @@ private:
|
||||
std::condition_variable queue_cv_;
|
||||
std::queue<std::string> request_queue_;
|
||||
size_t in_flight_ = 0;
|
||||
|
||||
size_t max_queue_size_ = 0; // 0 = unlimited
|
||||
QueuePolicy queue_policy_ = QueuePolicy::DropOldest;
|
||||
uint64_t dropped_total_ = 0;
|
||||
};
|
||||
|
||||
} // namespace rk3588
|
||||
|
||||
@ -1,12 +1,14 @@
|
||||
#include "minio_uploader.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <ctime>
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "utils/simple_json_writer.h"
|
||||
@ -290,6 +292,11 @@ bool BuildS3SigV4Headers(const std::string& endpoint,
|
||||
}
|
||||
#endif
|
||||
|
||||
bool IsRetryableHttp(long http_code) {
|
||||
if (http_code == 408 || http_code == 429) return true;
|
||||
return http_code >= 500;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
bool MinioUploader::Init(const SimpleJson& config) {
|
||||
@ -300,6 +307,13 @@ bool MinioUploader::Init(const SimpleJson& config) {
|
||||
presign_endpoint_ = config.ValueOr<std::string>("presign_endpoint", "");
|
||||
presign_timeout_ms_ = config.ValueOr<int>("presign_timeout_ms", presign_timeout_ms_);
|
||||
|
||||
max_retries_ = std::max(0, config.ValueOr<int>("max_retries", max_retries_));
|
||||
retry_backoff_ms_ = std::max(0, config.ValueOr<int>("retry_backoff_ms", retry_backoff_ms_));
|
||||
retry_backoff_max_ms_ = std::max(retry_backoff_ms_, config.ValueOr<int>("retry_backoff_max_ms", retry_backoff_max_ms_));
|
||||
retry_jitter_ms_ = std::max(0, config.ValueOr<int>("retry_jitter_ms", retry_jitter_ms_));
|
||||
upload_timeout_sec_ = std::max(1, config.ValueOr<int>("upload_timeout_sec", upload_timeout_sec_));
|
||||
upload_connect_timeout_ms_ = std::max(0, config.ValueOr<int>("upload_connect_timeout_ms", upload_connect_timeout_ms_));
|
||||
|
||||
std::string ak = config.ValueOr<std::string>("access_key", "");
|
||||
std::string sk = config.ValueOr<std::string>("secret_key", "");
|
||||
|
||||
@ -330,103 +344,129 @@ UploadResult MinioUploader::Upload(const std::string& key,
|
||||
#if HAS_CURL
|
||||
const std::string ct = content_type.empty() ? "application/octet-stream" : content_type;
|
||||
|
||||
PresignResult presigned;
|
||||
std::string upload_url;
|
||||
std::vector<std::string> extra_headers;
|
||||
if (!presign_endpoint_.empty()) {
|
||||
std::string perr;
|
||||
if (!GetPresignedPutUrl(presign_endpoint_, bucket_, key, ct, size, presign_timeout_ms_, presigned, perr)) {
|
||||
result.error = perr;
|
||||
return result;
|
||||
const int max_attempts = std::max(1, max_retries_ + 1);
|
||||
for (int attempt = 0; attempt < max_attempts; ++attempt) {
|
||||
if (attempt > 0) {
|
||||
int64_t backoff = retry_backoff_ms_;
|
||||
if (attempt > 1) {
|
||||
const int shift = attempt - 1;
|
||||
const int64_t exp = (shift >= 30) ? static_cast<int64_t>(retry_backoff_max_ms_) : (static_cast<int64_t>(retry_backoff_ms_) << shift);
|
||||
backoff = std::min<int64_t>(retry_backoff_max_ms_, exp);
|
||||
}
|
||||
if (retry_jitter_ms_ > 0) {
|
||||
backoff += (std::rand() % (retry_jitter_ms_ + 1));
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(backoff)));
|
||||
}
|
||||
upload_url = presigned.upload_url;
|
||||
extra_headers = presigned.headers;
|
||||
} else {
|
||||
|
||||
PresignResult presigned;
|
||||
std::string upload_url;
|
||||
std::vector<std::string> extra_headers;
|
||||
if (!presign_endpoint_.empty()) {
|
||||
std::string perr;
|
||||
if (!GetPresignedPutUrl(presign_endpoint_, bucket_, key, ct, size, presign_timeout_ms_, presigned, perr)) {
|
||||
result.error = perr;
|
||||
continue;
|
||||
}
|
||||
upload_url = presigned.upload_url;
|
||||
extra_headers = presigned.headers;
|
||||
} else {
|
||||
#if HAS_OPENSSL
|
||||
if (access_key_.empty() || secret_key_.empty()) {
|
||||
result.error = "missing access_key/secret_key (or configure presign_endpoint)";
|
||||
return result;
|
||||
}
|
||||
const std::string payload_hash = Sha256Hex(data, size);
|
||||
std::string berr;
|
||||
if (!BuildS3SigV4Headers(endpoint_, region_, access_key_, secret_key_, bucket_, key, payload_hash,
|
||||
upload_url, extra_headers, berr)) {
|
||||
result.error = berr;
|
||||
return result;
|
||||
}
|
||||
if (access_key_.empty() || secret_key_.empty()) {
|
||||
result.error = "missing access_key/secret_key (or configure presign_endpoint)";
|
||||
return result;
|
||||
}
|
||||
const std::string payload_hash = Sha256Hex(data, size);
|
||||
std::string berr;
|
||||
if (!BuildS3SigV4Headers(endpoint_, region_, access_key_, secret_key_, bucket_, key, payload_hash,
|
||||
upload_url, extra_headers, berr)) {
|
||||
result.error = berr;
|
||||
continue;
|
||||
}
|
||||
#else
|
||||
result.error = "presign_endpoint is required (or rebuild with OpenSSL to enable SigV4 upload using access_key/secret_key)";
|
||||
return result;
|
||||
result.error = "presign_endpoint is required (or rebuild with OpenSSL to enable SigV4 upload using access_key/secret_key)";
|
||||
return result;
|
||||
#endif
|
||||
}
|
||||
|
||||
if (upload_url.empty()) {
|
||||
result.error = "upload_url is empty";
|
||||
continue;
|
||||
}
|
||||
|
||||
CURL* curl = curl_easy_init();
|
||||
if (!curl) {
|
||||
result.error = "curl_easy_init failed";
|
||||
continue;
|
||||
}
|
||||
|
||||
struct curl_slist* headers = nullptr;
|
||||
headers = curl_slist_append(headers, ("Content-Type: " + ct).c_str());
|
||||
headers = curl_slist_append(headers, "Expect:");
|
||||
for (const auto& h : extra_headers) {
|
||||
headers = curl_slist_append(headers, h.c_str());
|
||||
}
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_URL, upload_url.c_str());
|
||||
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
|
||||
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
|
||||
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(size));
|
||||
curl_easy_setopt(curl, CURLOPT_TIMEOUT, static_cast<long>(upload_timeout_sec_));
|
||||
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, static_cast<long>(upload_connect_timeout_ms_));
|
||||
|
||||
struct ReadData {
|
||||
const uint8_t* data;
|
||||
size_t remaining;
|
||||
} read_data{data, size};
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_READFUNCTION,
|
||||
+[](char* buffer, size_t s, size_t n, void* userp) -> size_t {
|
||||
auto* rd = static_cast<ReadData*>(userp);
|
||||
size_t to_copy = std::min(s * n, rd->remaining);
|
||||
if (to_copy > 0) {
|
||||
std::memcpy(buffer, rd->data, to_copy);
|
||||
rd->data += to_copy;
|
||||
rd->remaining -= to_copy;
|
||||
}
|
||||
return to_copy;
|
||||
});
|
||||
curl_easy_setopt(curl, CURLOPT_READDATA, &read_data);
|
||||
|
||||
CURLcode res = curl_easy_perform(curl);
|
||||
long http_code = 0;
|
||||
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
|
||||
|
||||
curl_slist_free_all(headers);
|
||||
curl_easy_cleanup(curl);
|
||||
|
||||
if (res == CURLE_OK && http_code >= 200 && http_code < 300) {
|
||||
result.success = true;
|
||||
if (!presigned.public_url.empty()) {
|
||||
result.url = presigned.public_url;
|
||||
} else if (!presigned.upload_url.empty()) {
|
||||
result.url = presigned.upload_url;
|
||||
} else {
|
||||
result.url = endpoint_ + "/" + bucket_ + "/" + key;
|
||||
}
|
||||
result.error.clear();
|
||||
return result;
|
||||
}
|
||||
|
||||
if (res != CURLE_OK) {
|
||||
result.error = curl_easy_strerror(res);
|
||||
} else {
|
||||
result.error = "HTTP " + std::to_string(http_code);
|
||||
}
|
||||
|
||||
if (attempt == max_attempts - 1) {
|
||||
return result;
|
||||
}
|
||||
if (res == CURLE_OK && !IsRetryableHttp(http_code)) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
if (upload_url.empty()) {
|
||||
result.error = "upload_url is empty";
|
||||
return result;
|
||||
}
|
||||
|
||||
CURL* curl = curl_easy_init();
|
||||
if (!curl) {
|
||||
result.error = "curl_easy_init failed";
|
||||
return result;
|
||||
}
|
||||
|
||||
struct curl_slist* headers = nullptr;
|
||||
headers = curl_slist_append(headers, ("Content-Type: " + ct).c_str());
|
||||
headers = curl_slist_append(headers, "Expect:");
|
||||
for (const auto& h : extra_headers) {
|
||||
headers = curl_slist_append(headers, h.c_str());
|
||||
}
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_URL, upload_url.c_str());
|
||||
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
|
||||
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
|
||||
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(size));
|
||||
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 30L);
|
||||
|
||||
// Set up data to send
|
||||
struct ReadData {
|
||||
const uint8_t* data;
|
||||
size_t remaining;
|
||||
} read_data{data, size};
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_READFUNCTION,
|
||||
+[](char* buffer, size_t s, size_t n, void* userp) -> size_t {
|
||||
auto* rd = static_cast<ReadData*>(userp);
|
||||
size_t to_copy = std::min(s * n, rd->remaining);
|
||||
if (to_copy > 0) {
|
||||
std::memcpy(buffer, rd->data, to_copy);
|
||||
rd->data += to_copy;
|
||||
rd->remaining -= to_copy;
|
||||
}
|
||||
return to_copy;
|
||||
});
|
||||
curl_easy_setopt(curl, CURLOPT_READDATA, &read_data);
|
||||
|
||||
CURLcode res = curl_easy_perform(curl);
|
||||
long http_code = 0;
|
||||
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
|
||||
|
||||
curl_slist_free_all(headers);
|
||||
curl_easy_cleanup(curl);
|
||||
|
||||
if (res != CURLE_OK) {
|
||||
result.error = curl_easy_strerror(res);
|
||||
return result;
|
||||
}
|
||||
if (http_code < 200 || http_code >= 300) {
|
||||
result.error = "HTTP " + std::to_string(http_code);
|
||||
return result;
|
||||
}
|
||||
|
||||
result.success = true;
|
||||
if (!presigned.public_url.empty()) {
|
||||
result.url = presigned.public_url;
|
||||
} else if (!presigned.upload_url.empty()) {
|
||||
result.url = presigned.upload_url;
|
||||
} else {
|
||||
result.url = endpoint_ + "/" + bucket_ + "/" + key;
|
||||
}
|
||||
return result;
|
||||
#else
|
||||
// Stub for Windows
|
||||
std::cout << "[MinioUploader] would upload " << size << " bytes as key=" << key
|
||||
|
||||
@ -27,6 +27,13 @@ private:
|
||||
std::string presign_endpoint_;
|
||||
int presign_timeout_ms_ = 3000;
|
||||
bool use_ssl_ = false;
|
||||
|
||||
int max_retries_ = 0;
|
||||
int retry_backoff_ms_ = 200;
|
||||
int retry_backoff_max_ms_ = 2000;
|
||||
int retry_jitter_ms_ = 50;
|
||||
int upload_timeout_sec_ = 30;
|
||||
int upload_connect_timeout_ms_ = 5000;
|
||||
};
|
||||
|
||||
} // namespace rk3588
|
||||
|
||||
@ -257,6 +257,14 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
void FallbackToSwscaleOrPassthrough(FramePtr frame) {
|
||||
#if defined(RK3588_ENABLE_FFMPEG)
|
||||
ProcessSwscale(std::move(frame));
|
||||
#else
|
||||
PushToDownstream(std::move(frame));
|
||||
#endif
|
||||
}
|
||||
|
||||
void PushToDownstream(FramePtr frame) {
|
||||
for (auto& q : output_queues_) {
|
||||
q->Push(frame);
|
||||
@ -312,22 +320,22 @@ private:
|
||||
|
||||
if (src_fmt_rga == RK_FORMAT_UNKNOWN || dst_fmt_rga == RK_FORMAT_UNKNOWN) {
|
||||
std::cerr << "[preprocess] unsupported format for RGA\n";
|
||||
PushToDownstream(frame);
|
||||
FallbackToSwscaleOrPassthrough(frame);
|
||||
return;
|
||||
}
|
||||
|
||||
size_t out_size = CalcImageSizeStrided(dst_wstride, dst_hstride, out_fmt);
|
||||
if (out_size == 0) {
|
||||
std::cerr << "[preprocess] invalid output size for RGA\n";
|
||||
PushToDownstream(frame);
|
||||
FallbackToSwscaleOrPassthrough(frame);
|
||||
return;
|
||||
}
|
||||
|
||||
// Use DMA-BUF allocation to avoid >4GB address issue with RGA
|
||||
auto dma_buf = DmaAlloc(out_size);
|
||||
if (!dma_buf || !dma_buf->valid()) {
|
||||
std::cerr << "[preprocess] DMA alloc failed, falling back to std::vector\n";
|
||||
PushToDownstream(frame);
|
||||
std::cerr << "[preprocess] DMA alloc failed\n";
|
||||
FallbackToSwscaleOrPassthrough(frame);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -354,18 +362,18 @@ private:
|
||||
src_dma_buf = DmaAlloc(src_size);
|
||||
if (!src_dma_buf || !src_dma_buf->valid()) {
|
||||
std::cerr << "[preprocess] DMA alloc for src failed\n";
|
||||
PushToDownstream(frame);
|
||||
FallbackToSwscaleOrPassthrough(frame);
|
||||
return;
|
||||
}
|
||||
if (!CopyToStridedBuffer(*frame, src_dma_buf->data(), src_dma_buf->size, src_wstride, src_hstride)) {
|
||||
std::cerr << "[preprocess] copy src to DMA failed\n";
|
||||
PushToDownstream(frame);
|
||||
FallbackToSwscaleOrPassthrough(frame);
|
||||
return;
|
||||
}
|
||||
src_buf = wrapbuffer_fd_t(src_dma_buf->fd, frame->width, frame->height,
|
||||
src_wstride, src_hstride, src_fmt_rga);
|
||||
} else {
|
||||
PushToDownstream(frame);
|
||||
FallbackToSwscaleOrPassthrough(frame);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -380,7 +388,7 @@ private:
|
||||
auto tmp_dma = DmaAlloc(CalcImageSizeStrided(dst_wstride, dst_hstride, frame->format));
|
||||
if (!tmp_dma || !tmp_dma->valid()) {
|
||||
std::cerr << "[preprocess] DMA alloc for tmp failed\n";
|
||||
PushToDownstream(frame);
|
||||
FallbackToSwscaleOrPassthrough(frame);
|
||||
return;
|
||||
}
|
||||
rga_buffer_t tmp = wrapbuffer_fd_t(tmp_dma->fd, out_w, out_h,
|
||||
@ -397,7 +405,7 @@ private:
|
||||
|
||||
if (status != IM_STATUS_SUCCESS) {
|
||||
std::cerr << "[preprocess] RGA failed: " << imStrError(status) << "\n";
|
||||
PushToDownstream(frame);
|
||||
FallbackToSwscaleOrPassthrough(frame);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -1,8 +1,12 @@
|
||||
#include "utils/dma_alloc.h"
|
||||
|
||||
#include <cstdlib>
|
||||
#include <cerrno>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#if defined(__linux__)
|
||||
#include <fcntl.h>
|
||||
@ -37,6 +41,80 @@ struct dma_buf_sync {
|
||||
|
||||
namespace rk3588 {
|
||||
|
||||
namespace {
|
||||
|
||||
struct DmaBufferPool {
|
||||
std::mutex mu;
|
||||
std::unordered_map<size_t, std::vector<DmaBuffer*>> free_by_size;
|
||||
size_t total_bytes = 0;
|
||||
size_t total_buffers = 0;
|
||||
|
||||
bool inited = false;
|
||||
bool enabled = true;
|
||||
size_t max_bytes = 64ULL * 1024ULL * 1024ULL;
|
||||
size_t max_buffers = 16;
|
||||
|
||||
void InitOnce() {
|
||||
if (inited) return;
|
||||
inited = true;
|
||||
if (const char* dis = std::getenv("RK3588_DMA_POOL_DISABLE")) {
|
||||
if (std::strcmp(dis, "1") == 0 || std::strcmp(dis, "true") == 0) enabled = false;
|
||||
}
|
||||
if (const char* mb = std::getenv("RK3588_DMA_POOL_MAX_BYTES")) {
|
||||
const long long v = std::atoll(mb);
|
||||
if (v > 0) max_bytes = static_cast<size_t>(v);
|
||||
}
|
||||
if (const char* mc = std::getenv("RK3588_DMA_POOL_MAX_BUFFERS")) {
|
||||
const long long v = std::atoll(mc);
|
||||
if (v > 0) max_buffers = static_cast<size_t>(v);
|
||||
}
|
||||
}
|
||||
|
||||
DmaBuffer* Take(size_t size) {
|
||||
std::lock_guard<std::mutex> lock(mu);
|
||||
InitOnce();
|
||||
if (!enabled) return nullptr;
|
||||
auto it = free_by_size.find(size);
|
||||
if (it == free_by_size.end() || it->second.empty()) return nullptr;
|
||||
DmaBuffer* b = it->second.back();
|
||||
it->second.pop_back();
|
||||
total_bytes -= size;
|
||||
total_buffers -= 1;
|
||||
return b;
|
||||
}
|
||||
|
||||
void Put(DmaBuffer* b) {
|
||||
if (!b) return;
|
||||
std::lock_guard<std::mutex> lock(mu);
|
||||
InitOnce();
|
||||
if (!enabled) {
|
||||
delete b;
|
||||
return;
|
||||
}
|
||||
const size_t sz = b->size;
|
||||
if (sz == 0) {
|
||||
delete b;
|
||||
return;
|
||||
}
|
||||
|
||||
if (total_buffers + 1 > max_buffers || total_bytes + sz > max_bytes) {
|
||||
delete b;
|
||||
return;
|
||||
}
|
||||
|
||||
free_by_size[sz].push_back(b);
|
||||
total_bytes += sz;
|
||||
total_buffers += 1;
|
||||
}
|
||||
};
|
||||
|
||||
static DmaBufferPool& Pool() {
|
||||
static DmaBufferPool* p = new DmaBufferPool();
|
||||
return *p;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
DmaBuffer::~DmaBuffer() {
|
||||
#if defined(__linux__)
|
||||
if (ptr && size > 0) {
|
||||
@ -80,6 +158,12 @@ DmaBuffer& DmaBuffer::operator=(DmaBuffer&& other) noexcept {
|
||||
|
||||
DmaBufferPtr DmaAlloc(size_t alloc_size) {
|
||||
#if defined(__linux__)
|
||||
if (alloc_size == 0) return nullptr;
|
||||
|
||||
if (auto* reused = Pool().Take(alloc_size)) {
|
||||
return DmaBufferPtr(reused, [](DmaBuffer* b) { Pool().Put(b); });
|
||||
}
|
||||
|
||||
// Try different dma_heap devices - prefer CMA for RGA compatibility (<4GB)
|
||||
const char* heap_paths[] = {
|
||||
"/dev/dma_heap/cma",
|
||||
@ -139,12 +223,11 @@ DmaBufferPtr DmaAlloc(size_t alloc_size) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
auto buf = std::make_shared<DmaBuffer>();
|
||||
auto* buf = new DmaBuffer();
|
||||
buf->fd = dma_fd;
|
||||
buf->ptr = ptr;
|
||||
buf->size = alloc_size;
|
||||
|
||||
return buf;
|
||||
return DmaBufferPtr(buf, [](DmaBuffer* b) { Pool().Put(b); });
|
||||
#else
|
||||
(void)alloc_size;
|
||||
std::cerr << "[DmaAlloc] not supported on this platform\n";
|
||||
|
||||
Loading…
Reference in New Issue
Block a user