7.0 KiB
7.0 KiB
温度数据采集系统设计文档
1. 系统概述
1.1 系统目标
设计并实现一个自动化温度数据采集系统,通过websocket与别的系统通信.当收到websocket消息后,调用第三方API获取事件并保存时事件到数据库.然后使用OCR技术识别图片中的温度数据,保存到数据库中并提供数据查询接口。
1.2 系统范围
- 自动化数据采集
- 图片处理和OCR识别
- 数据存储和管理
- API接口服务
2. 系统架构
2.1 整体架构
系统采用分层架构设计,主要包含以下层次:
- 数据采集层
- 业务处理层
- 数据持久层
- API接口层
2.2 技术架构
├── app/
│ ├── config/ # 配置
│ ├── core/ # 数据库核心配置
│ ├── crud/ # 通用数据库操作
│ ├── kandaApi/ # 第三方平台的一些测试接口
│ ├── models/ # 数据库模型
│ ├── service/ # 同步数据库数据
│ └── utils/ # 工具函数
├── doc/ # 项目文档
├── alembic/ # 数据库迁移
├── ocrModel/ # ocr模型
├── logs/ # 日志文件
├── imagesDownload/ # 下载的图片
└── imagesOcr/ # ocr结果图片
3. 详细设计
3.1 数据同步模块
class EventSyncService:
def __init__(self):
self.kangda = Kangda()
self.ocr = BadiduOcr()
async def sync_events(self):
"""同步事件数据"""
print("开始同步事件数据...")
# 获取事件列表
event_list = self.kangda.get_event_list()
if not event_list:
print("获取事件列表失败")
return
# 获取新事件列表
new_events = await self._get_new_events(event_list)
if not new_events:
print("没有新事件需要同步")
return
print(f"发现 {len(new_events)} 个新事件")
# 获取事件详情并更新
await self._update_event_details(new_events)
print("事件同步完成
3.2 API接口模块
@router.get("/temperatures")
async def get_temperatures(
start_time: datetime,
end_time: datetime,
db: Session = Depends(get_db)
):
return await temperature_service.get_temperatures(
db, start_time, end_time
)
4. 数据库设计
4.1 表结构
详细表结构见需求文档中的数据库设计部分。
4.2 索引设计
-- 温度数据表索引
ALTER TABLE temperature ADD INDEX idx_temp_event_id (event_id);
ALTER TABLE temperature ADD INDEX idx_temp_create_time (create_time);
-- 图片表索引
ALTER TABLE image ADD INDEX idx_image_event_id (event_id);
-- 处理日志表索引
ALTER TABLE process_log ADD INDEX idx_log_event_id (event_id);
ALTER TABLE process_log ADD INDEX idx_log_create_time (create_time);
4.3 数据库配置
[mysql]
host = localhost
port = 3306
database = temp_db
user = temp_user
password = your_password
charset = utf8mb4
pool_size = 5
max_overflow = 10
pool_timeout = 30
pool_recycle = 1800
5. 接口设计
5.1 内部接口
- 数据采集接口
- 获取事件列表
- 获取事件详情
- 下载图片
- 数据处理接口
- OCR识别接口
- 数据验证接口
- 数据存储接口
5.2 外部接口
-
WebSocket接口
@router.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() try: while True: data = await websocket.receive_json() # 处理接收到的消息 await process_websocket_message(data) except WebSocketDisconnect: pass -
温度数据查询接口
@router.get("/api/v1/temperatures") async def query_temperatures( start_time: datetime, end_time: datetime, event_type: Optional[str] = None ) -> List[TemperatureResponse]: pass -
处理状态查询接口
@router.get("/api/v1/events/{event_id}/status") async def get_event_status( event_id: str ) -> EventStatusResponse: pass -
获取事件列表接口
@router.get("/api/v1/events") async def get_events( skip: int = 0, limit: int = 100, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None ) -> List[EventResponse]: pass -
搜索事件接口
@router.get("/api/v1/events/search") async def search_events( keyword: str, skip: int = 0, limit: int = 100 ) -> List[EventResponse]: pass -
删除事件接口
@router.delete("/api/v1/events/{event_id}") async def delete_event( event_id: str ) -> Dict[str, str]: pass -
查看事件详情接口
@router.get("/api/v1/events/{event_id}") async def get_event_detail( event_id: str ) -> EventDetailResponse: pass
6. 安全设计
6.1 认证授权
- 使用JWT进行API认证
- 实现基于角色的访问控制
6.2 数据安全
- 敏感数据加密存储
- 数据传输加密
- 操作日志记录
7. 性能设计
7.1 并发处理
- 使用异步IO处理HTTP请求
- 实现请求限流
- 使用连接池管理数据库连接
7.2 缓存策略
- 使用Redis缓存token
- 缓存热点数据
- 实现数据预加载
8. 监控设计
8.1 系统监控
- 接口响应时间监控
- 系统资源使用监控
- 异常监控告警
8.2 业务监控
- 数据采集成功率
- OCR识别准确率
- 处理延迟监控
9. 部署方案
9.1 环境要求
- Python 3.8+
- MySQL 8.0+
- Redis 6+
- Docker 20+
9.2 部署架构
[负载均衡器]
|
+---------------+---------------+
| | |
[应用服务器1] [应用服务器2] [应用服务器3]
| | |
+---+---------------+---+-----------+
| |
[MySQL主] [MySQL从]
| |
[Redis集群] [文件存储]
9.3 MySQL配置建议
[mysqld]
# 基本配置
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
default-time-zone = '+8:00'
# 连接配置
max_connections = 1000
max_connect_errors = 1000
# 缓冲池配置
innodb_buffer_pool_size = 4G
innodb_buffer_pool_instances = 4
# 日志配置
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2
# 主从复制配置
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
10. 测试方案
10.1 单元测试
- 使用pytest进行单元测试
- 测试覆盖率要求>80%
10.2 集成测试
- API接口测试
- 数据库操作测试
- OCR识别测试
10.3 性能测试
- 并发请求测试
- 数据处理能力测试
- 系统稳定性测试