From eee1f44510b4927ae428fbff7dbb9567fd7a6de3 Mon Sep 17 00:00:00 2001 From: haotian <2421912570@qq.com> Date: Thu, 15 May 2025 16:17:26 +0800 Subject: [PATCH] =?UTF-8?q?=E6=AF=8F=E9=9A=945=E5=88=86=E9=92=9F=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E4=B8=80=E6=AC=A1=E5=8A=9F=E8=83=BD=E5=88=9D=E6=AD=A5?= =?UTF-8?q?=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- alembic.ini | 82 +++++++ alembic/env.py | 78 +++++++ app/config/config.yaml | 30 +++ app/core/config.py | 19 ++ app/core/database.py | 53 +++++ app/crud/base.py | 81 +++++++ app/kangdaApi/event.py | 110 +++++++++ app/kangdaApi/login.py | 57 +++++ app/kangdaApi/parseConfig.py | 8 + app/models/models.py | 121 ++++++++++ app/services/event_sync_service.py | 166 ++++++++++++++ app/util/baiduOcr.py | 73 ++++++ app/util/kangda.py | 170 ++++++++++++++ doc/design.md | 355 +++++++++++++++++++++++++++++ doc/requirement.md | 355 +++++++++++++++++++++++++++++ requirements.txt | 4 + run_sync.py | 6 + test_db.py | 211 +++++++++++++++++ test_t.py | 24 ++ 安装依赖.txt | 6 + 20 files changed, 2009 insertions(+) create mode 100644 alembic.ini create mode 100644 alembic/env.py create mode 100644 app/config/config.yaml create mode 100644 app/core/config.py create mode 100644 app/core/database.py create mode 100644 app/crud/base.py create mode 100644 app/kangdaApi/event.py create mode 100644 app/kangdaApi/login.py create mode 100644 app/kangdaApi/parseConfig.py create mode 100644 app/models/models.py create mode 100644 app/services/event_sync_service.py create mode 100644 app/util/baiduOcr.py create mode 100644 app/util/kangda.py create mode 100644 doc/design.md create mode 100644 doc/requirement.md create mode 100644 requirements.txt create mode 100644 run_sync.py create mode 100644 test_db.py create mode 100644 test_t.py create mode 100644 安装依赖.txt diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..cc215ba --- /dev/null +++ b/alembic.ini @@ -0,0 +1,82 @@ +[alembic] +# path to migration scripts +script_location = alembic + +# template used to generate migration files +file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d%%(second).2d_%%(rev)s_%%(slug)s + +# timezone to use when rendering the date +# within the migration file as well as the filename. +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +timezone = Asia/Shanghai + +# max length of characters to apply to the +# "slug" field +truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +sourceless = false + +# version location specification; this defaults +# to alembic/versions. When using multiple version +# directories, initial revisions must be specified with --version-path +version_locations = %(here)s/alembic/versions + +# the output encoding used when revision files +# are written from script.py.mako +output_encoding = utf-8 + +sqlalchemy.url = driver://user:pass@localhost/dbname + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S \ No newline at end of file diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..93c57ce --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,78 @@ +from logging.config import fileConfig +from sqlalchemy import engine_from_config +from sqlalchemy import pool +from alembic import context +from app.core.config import settings +from app.models.models import Base + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +target_metadata = Base.metadata + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + +def get_url(): + return f"mysql+aiomysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}" + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = get_url() + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + configuration = config.get_section(config.config_ini_section) + configuration["sqlalchemy.url"] = get_url() + connectable = engine_from_config( + configuration, + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() \ No newline at end of file diff --git a/app/config/config.yaml b/app/config/config.yaml new file mode 100644 index 0000000..ed65a0c --- /dev/null +++ b/app/config/config.yaml @@ -0,0 +1,30 @@ + +kangda: + username: cmJzem4= + password: QUJjZDEyMzQ + isLogin: false + cookieFlag: false + deviceId: pc + lang: zh_CN + +url_login : "http://erpapi.concoai.com/basis-api/user/login" +url_event_list : "http://erpapi.concoai.com/robot/event/page" +url_event_detail: "http://erpapi.concoai.com/robot/event/" + +image_save_path: "imagesDownload" + +det_model_dir: "/home/admin-root/haotian/康达瑞贝斯/PaddleOCR-main/output/PP-OCRv4_server_det_inference" +rec_model_dir: "/home/admin-root/haotian/康达瑞贝斯/PaddleOCR-main/output/rec_ppocr_v4_hgnet_kangda_inference" + + +DB_HOST: "10.0.0.17" +DB_PORT: 3306 +DB_USER: "root" +DB_PASSWORD: "root" +DB_NAME: "kangda" +DB_CHARSET: utf8mb4" +DB_POOL_SIZE: 5 +DB_MAX_OVERFLOW: 10 +DB_POOL_TIMEOUT: 30 +DB_POOL_RECYCLE: 1800 + diff --git a/app/core/config.py b/app/core/config.py new file mode 100644 index 0000000..25fe352 --- /dev/null +++ b/app/core/config.py @@ -0,0 +1,19 @@ +from pydantic_settings import BaseSettings + +class Settings(BaseSettings): + # 数据库配置 + DB_HOST: str = "10.0.0.17" + DB_PORT: int = 3306 + DB_USER: str = "root" + DB_PASSWORD: str = "root" + DB_NAME: str = "kangda" + DB_CHARSET: str = "utf8mb4" + DB_POOL_SIZE: int = 5 + DB_MAX_OVERFLOW: int = 10 + DB_POOL_TIMEOUT: int = 30 + DB_POOL_RECYCLE: int = 1800 + + class Config: + env_file = ".env" + +settings = Settings() \ No newline at end of file diff --git a/app/core/database.py b/app/core/database.py new file mode 100644 index 0000000..b2eb2cd --- /dev/null +++ b/app/core/database.py @@ -0,0 +1,53 @@ +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession +from sqlalchemy.orm import sessionmaker, declarative_base +from ..kangdaApi.parseConfig import parse_config +from .config import settings + +# import yaml + +# config_path = "app/config/config.yaml" + +# config = parse_config(config_path) + +# config_kangda = config["kangda"] + +# print("sssss", config_kangda) + +# 创建异步引擎 +engine = create_async_engine( + f"mysql+aiomysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}", + pool_size=settings.DB_POOL_SIZE, + max_overflow=settings.DB_MAX_OVERFLOW, + pool_timeout=settings.DB_POOL_TIMEOUT, + pool_recycle=settings.DB_POOL_RECYCLE, + echo=False +) + +# engine = create_async_engine( +# f"mysql+aiomysql://{config_kangda["DB_USER"]}:{config_kangda["DB_PASSWORD"]}@{config_kangda["DB_HOST"]}:{config_kangda["DB_PORT"]}/{config_kangda["DB_NAME"]}", +# pool_size=config_kangda["DB_POOL_SIZE"], +# max_overflow=config_kangda["DB_MAX_OVERFLOW"], +# pool_timeout=config_kangda["DB_POOL_TIMEOUT"], +# pool_recycle=config_kangda["DB_POOL_RECYCLE"], +# echo=False +# ) + +# 创建异步会话工厂 +async_session = sessionmaker( + engine, class_=AsyncSession, expire_on_commit=False +) + +# 创建基类 +Base = declarative_base() + +# 获取数据库会话 +async def get_db(): + async with async_session() as session: + try: + yield session + await session.commit() + except Exception: + await session.rollback() + raise + finally: + await session.close() \ No newline at end of file diff --git a/app/crud/base.py b/app/crud/base.py new file mode 100644 index 0000000..8b78952 --- /dev/null +++ b/app/crud/base.py @@ -0,0 +1,81 @@ +from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union +from fastapi.encoders import jsonable_encoder +from pydantic import BaseModel +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, update, delete +from app.core.database import Base + +ModelType = TypeVar("ModelType", bound=Base) +CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel) +UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel) + +class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]): + def __init__(self, model: Type[ModelType]): + """ + CRUD对象与SQLAlchemy模型类一起使用 + :param model: SQLAlchemy模型类 + """ + self.model = model + + # 根据id获取对象 + async def get(self, db: AsyncSession, id: Any) -> Optional[ModelType]: + """ + 通过ID获取对象 + """ + query = select(self.model).where(self.model.id == id) + result = await db.execute(query) + return result.scalar_one_or_none() + + # 分页查询 + async def get_multi( + self, db: AsyncSession, *, skip: int = 0, limit: int = 100 + ) -> List[ModelType]: + """ + 获取多个对象 + """ + query = select(self.model).offset(skip).limit(limit) + result = await db.execute(query) + return result.scalars().all() + + async def create(self, db: AsyncSession, *, obj_in: CreateSchemaType) -> ModelType: + """ + 创建对象 + """ + obj_in_data = jsonable_encoder(obj_in) + db_obj = self.model(**obj_in_data) + db.add(db_obj) + await db.commit() + await db.refresh(db_obj) + return db_obj + + async def update( + self, + db: AsyncSession, + *, + db_obj: ModelType, + obj_in: Union[UpdateSchemaType, Dict[str, Any]] + ) -> ModelType: + """ + 更新对象 + """ + obj_data = jsonable_encoder(db_obj) + if isinstance(obj_in, dict): + update_data = obj_in + else: + update_data = obj_in.dict(exclude_unset=True) + for field in obj_data: + if field in update_data: + setattr(db_obj, field, update_data[field]) + db.add(db_obj) + await db.commit() + await db.refresh(db_obj) + return db_obj + + async def remove(self, db: AsyncSession, *, id: Any) -> ModelType: + """ + 删除对象 + """ + obj = await self.get(db=db, id=id) + await db.delete(obj) + await db.commit() + return obj \ No newline at end of file diff --git a/app/kangdaApi/event.py b/app/kangdaApi/event.py new file mode 100644 index 0000000..7df8626 --- /dev/null +++ b/app/kangdaApi/event.py @@ -0,0 +1,110 @@ +import requests +from .login import login_gettoken +from .parseConfig import parse_config + + + +config_path = "app/config/config.yaml" + +config = parse_config(config_path) + +config_kangda = config["kangda"] +# print(config_kangda) + +url_login = config_kangda["url_login"] +url_event_list = config_kangda["url_event_list"] +url_event_detail = config_kangda["url_event_detail"] + + + +def event_get_list(url, token): + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36", + "accept-language": "zh-CN,zh;q=0.9", + "connections": "keep-alive", + "Host": "erpapi.concoai.com", + "accept-encoding": "gzip, deflate", + "upgrade-insecure-requests": "1", + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "token": token, + "refreshToken": token + } + + parse_body = {"pageNo": 1, "pageSize": 10000} + + try: + response = requests.post(url=url, + headers=headers, + json=parse_body) + # 检查请求是否成功 + response.raise_for_status() + + # 解析JSON响应 + result = response.json() + + return result["rows"] + + + except requests.exceptions.RequestException as e: + print("请求出错:", e) + except ValueError as e: + print("解析JSON失败:", e) + +def parse_event_result(results): + for result in results: + print(result.keys()) + break + +def event_get_detail(results, token): + + # 请求头 + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36", + "accept-language": "zh-CN,zh;q=0.9", + "connections": "keep-alive", + "Host": "erpapi.concoai.com", + "accept-encoding": "gzip, deflate", + "upgrade-insecure-requests": "1", + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "token": token, + "refreshToken": token + } + + # 事件详情列表 + back = list() + + for result in results: + # print(result["eventId"]) + try: + response = requests.get( + url=url_event_detail+result["eventId"], + headers=headers + ) + response.raise_for_status() + + # 解析JSON响应 + result = response.json() + + back.append(result["data"]) + + except Exception as e: + print("获取事件详情失败", e) + # break + + + return back + + +# def get_event_list(url_login, config_kangda) + + + + +if __name__ == "__main__": + + token = login_gettoken(url_login, config_kangda) + results = event_get_list(url_event_list, token) + details = event_get_detail(results, token) + for detail in details: + print(detail["imgList"], type(detail["imgList"])) + \ No newline at end of file diff --git a/app/kangdaApi/login.py b/app/kangdaApi/login.py new file mode 100644 index 0000000..e0ac0fb --- /dev/null +++ b/app/kangdaApi/login.py @@ -0,0 +1,57 @@ + +from .parseConfig import parse_config + +import requests + + +config_path = "app/config/config.yaml" + +config = parse_config(config_path) + +config_kangda = config["kangda"] +# print(config_kangda) + +url_login = config_kangda["url_login"] + + +def login_gettoken(url, config): + + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36", + "accept-language": "zh-CN,zh;q=0.9", + "connections": "keep-alive", + "Host": "erpapi.concoai.com", + "accept-encoding": "gzip, deflate", + "upgrade-insecure-requests": "1", + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7" + } + + try: + # 发送POST请求,params设置查询参数,json设置JSON请求体 + response = requests.get( + url, + params=config, + headers=headers + # json=payload + ) + + # 检查请求是否成功 + response.raise_for_status() + + # 解析JSON响应 + result = response.json() + + return result.get("data").get("token") + + + except requests.exceptions.RequestException as e: + print("请求出错:", e) + except ValueError as e: + print("解析JSON失败:", e) + +if __name__ == "__main__": + print(login_gettoken(url_login, config_kangda)) + + + + diff --git a/app/kangdaApi/parseConfig.py b/app/kangdaApi/parseConfig.py new file mode 100644 index 0000000..6b33aaf --- /dev/null +++ b/app/kangdaApi/parseConfig.py @@ -0,0 +1,8 @@ + +import yaml + + +def parse_config(path): + with open(path, "r", encoding="utf-8") as f: + config = yaml.safe_load(f) + return config \ No newline at end of file diff --git a/app/models/models.py b/app/models/models.py new file mode 100644 index 0000000..de4694c --- /dev/null +++ b/app/models/models.py @@ -0,0 +1,121 @@ +from datetime import datetime +from sqlalchemy import Column, String, DateTime, Integer, Text, ForeignKey, Index, BigInteger +from sqlalchemy.orm import relationship +from app.core.database import Base + + +class Event(Base): + __tablename__ = "event" + + eventId = Column(String(50), primary_key=True) + tenantInfoId = Column(String(100)) + reportEventId = Column(String(100)) + number = Column(String(20)) + name = Column(String(20)) + eclassify = Column(String(5)) + operationType = Column(String(5)) + etype = Column(String(20)) + etypeName = Column(String(20)) + enTypeName = Column(String(30)) + hkTypeName = Column(String(20)) + reportStatus = Column(String(5)) + results = Column(String(5)) + insDate = Column(DateTime) + insDateShow = Column(DateTime) + updDate = Column(DateTime) + updDateShow = Column(DateTime) + fileType = Column(String(5)) + area = Column(String(20)) + floor = Column(String(10)) + map = Column(String(20)) + staffId = Column(String(40)) + targetUserId = Column(String(40)) + position = Column(String(100)) + actualStaffName = Column(String(20)) + targetStaffName = Column(String(20)) + routeName = Column(String(20)) + phoneAddress = Column(String(500)) + width = Column(String(10)) + height = Column(String(10)) + resolution = Column(String(10)) + originX = Column(String(20)) + originY = Column(String(20)) + imgList = Column(String(20)) + robotType = Column(String(5)) + eventFloor = Column(String(10)) + floorName = Column(String(10)) + coordId = Column(String(40)) + coord = Column(String(40)) + coordName = Column(String(30)) + positonName = Column(String(20)) + processingRemark = Column(String(300)) + carId = Column(String(40)) + parkingSpaceType = Column(String(10)) + parkingSpaceNumber = Column(String(40)) + carNumber = Column(String(40)) + eno = Column(String(40)) + instrument = Column(String(40)) + evideo = Column(String(40)) + createTime = Column(DateTime, default=datetime.now, comment='本地后台创建时间') + updateTime = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment='本地后台更新时间') + + # 关系 + images = relationship("Image", back_populates="event", cascade="all, delete-orphan") + temperatures = relationship("Temperature", back_populates="event", cascade="all, delete-orphan") + process_logs = relationship("ProcessLog", back_populates="event", cascade="all, delete-orphan") + + +class Image(Base): + __tablename__ = "image" + + imageId = Column(BigInteger, primary_key=True, autoincrement=True, comment='图片ID') + eventId = Column(String(50), ForeignKey('event.eventId'), nullable=False, comment='关联事件ID') + imageUrl = Column(String(500), nullable=False, comment='图片URL') + localPath = Column(String(500), comment='本地存储路径') + createTime = Column(DateTime, default=datetime.now, nullable=False, comment='创建时间') + + # 关系 + event = relationship("Event", back_populates="images" ) + temperatures = relationship("Temperature", back_populates="image") + + __table_args__ = ( + Index('idx_image_event_id', 'eventId'), + ) + + +class Temperature(Base): + __tablename__ = "temperature" + + tempId = Column(BigInteger, primary_key=True, autoincrement=True, comment='温度记录ID') + eventId = Column(String(50), ForeignKey('event.eventId'), nullable=False, comment='关联事件ID') + imageId = Column(BigInteger, ForeignKey('image.imageId'), nullable=False, comment='关联图片ID') + temperature = Column(String(20), nullable=False, comment='温度值') + confidence = Column(String(20), nullable=False, comment='识别置信度') + createTime = Column(DateTime, default=datetime.now, nullable=False, comment='创建时间') + + # 关系 + event = relationship("Event", back_populates="temperatures") + image = relationship("Image", back_populates="temperatures") + + __table_args__ = ( + Index('idx_temp_event_id', 'eventId'), + Index('idx_temp_create_time', 'createTime'), + ) + + +class ProcessLog(Base): + __tablename__ = "process_log" + + logId = Column(BigInteger, primary_key=True, autoincrement=True, comment='日志ID') + eventId = Column(String(50), ForeignKey('event.eventId'), nullable=False, comment='关联事件ID') + processStatus = Column(Integer, nullable=False, comment='处理状态') + errorMessage = Column(Text, comment='错误信息') + createTime = Column(DateTime, default=datetime.now, nullable=False, comment='创建时间') + + # 关系 + event = relationship("Event", back_populates="process_logs") + + __table_args__ = ( + Index('idx_log_event_id', 'eventId'), + Index('idx_log_create_time', 'createTime'), + ) \ No newline at end of file diff --git a/app/services/event_sync_service.py b/app/services/event_sync_service.py new file mode 100644 index 0000000..b94f941 --- /dev/null +++ b/app/services/event_sync_service.py @@ -0,0 +1,166 @@ +import asyncio +from datetime import datetime +from typing import List, Dict, Any +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from app.core.database import async_session +from app.models.models import Event, Image, Temperature +from app.util.kangda import Kangda +from app.util.baiduOcr import BadiduOcr + +class EventSyncService: + def __init__(self): + self.kangda = Kangda() + self.ocr = BadiduOcr() + + async def sync_events(self): + """同步事件数据""" + print("开始同步事件数据...") + + # 获取事件列表 + event_list = self.kangda.get_event_list() + if not event_list: + print("获取事件列表失败") + return + + # 获取新事件列表 + new_events = await self._get_new_events(event_list) + if not new_events: + print("没有新事件需要同步") + return + + print(f"发现 {len(new_events)} 个新事件") + + # 获取事件详情并更新 + await self._update_event_details(new_events) + + print("事件同步完成") + + async def _get_new_events(self, event_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """获取新事件列表""" + new_events = [] + + async with async_session() as session: + for event_data in event_list: + # 检查事件是否已存在 + query = select(Event).where(Event.eventId == event_data["eventId"]) + result = await session.execute(query) + existing_event = result.scalar_one_or_none() + + if not existing_event: + # 创建新事件 + new_event = Event(**event_data) + session.add(new_event) + new_events.append(event_data) + + await session.commit() + + return new_events + + async def _update_event_details(self, event_list: List[Dict[str, Any]]): + """更新事件详情""" + # 获取事件详情 + event_details = self.kangda.get_event_list_detail(event_list) + + async with async_session() as session: + for detail in event_details: + if not detail: + continue + + # 更新事件信息 + await self._update_event(session, detail) + + # 保存图片信息 + image_list = await self._save_images(session, detail) + + # 保存ocr温度信息 + await self._ocr_images(session, image_list) + + + async def _update_event(self, session: AsyncSession, detail: Dict[str, Any]): + """更新事件信息""" + query = select(Event).where(Event.eventId == detail["eventId"]) + result = await session.execute(query) + event = result.scalar_one_or_none() + + if event: + # 更新事件字段 + event.reportEventId = detail.get("reportEventId") + event.fileType = detail.get("fileType") + event.area = detail.get("area") + event.position = detail.get("position") + event.phoneAddress = detail.get("phoneAddress") + event.width = detail.get("width") + event.height = detail.get("height") + event.resolution = detail.get("resolution") + event.originX = detail.get("originX") + event.originY = detail.get("originY") + + await session.commit() + + async def _save_images(self, session: AsyncSession, detail: Dict[str, Any]): + """保存图片信息""" + if not detail.get("imgList"): + return + + image_list = list() + + for image_url in detail["imgList"]: + # 检查图片是否已存在 + query = select(Image).where( + Image.eventId == detail["eventId"], + Image.imageUrl == image_url + ) + result = await session.execute(query) + existing_image = result.scalar_one_or_none() + + if not existing_image: + # 下载图片 + local_path = self.kangda._download_image(image_url) + + # 创建图片记录 + image = Image( + eventId=detail["eventId"], + imageUrl=image_url, + localPath=local_path + ) + session.add(image) + session.refresh(image) + + image_list.append(image) + + await session.commit() + + return image_list + + async def _ocr_images(self, session: AsyncSession, image_list:Dict[str, Any]): + + for image in image_list: + result = self.ocr.image_inference(image.localPath) + value, conf = self.ocr.parse_result(result) + if len(value) > 0: + + # 插入temperature表数据 + temperature = Temperature( + eventId=image.eventId, + imageId=image.imageId, + temperature = ",".join(value), + confidence = ",".join(map(str, conf)) + ) + session.add(temperature) + await session.commit() + +async def run_sync(): + """运行同步任务""" + service = EventSyncService() + while True: + try: + await service.sync_events() + except Exception as e: + print(f"同步过程出错: {str(e)}") + + # 等待5分钟 + await asyncio.sleep(300) + +if __name__ == "__main__": + asyncio.run(run_sync()) \ No newline at end of file diff --git a/app/util/baiduOcr.py b/app/util/baiduOcr.py new file mode 100644 index 0000000..826587b --- /dev/null +++ b/app/util/baiduOcr.py @@ -0,0 +1,73 @@ +import yaml +from PIL import Image +from paddleocr import PaddleOCR, draw_ocr + +class BadiduOcr(): + + def __init__(self, config_path="app/config/config.yaml"): + + self.config_path = config_path + self.config = self._parse_config() + + self.ocr = self._init_ocr() + + + def image_inference(self, img_path, show_result=False): + + result = self.ocr.ocr(img_path, cls=False) + + if show_result: + for idx in range(len(result)): + res = result[idx] + for line in res: + print(line) + + return result + + def draw_result(self, result, img_path, save_path): + if result is not None and result[0] is not None : + + result = result[0] + + image = Image.open(img_path).convert('RGB') + boxes = [line[0] for line in result] + txts = [line[1][0] for line in result] + scores = [line[1][1] for line in result] + + # 最好自己下个字体文件 + im_show = draw_ocr(image, boxes, txts, scores, font_path='/usr/share/fonts/truetype/teluguvijayam/PottiSreeramulu.ttf') + im_show = Image.fromarray(im_show) + im_show.save(save_path) + + + + def _init_ocr(self): + + ocr = PaddleOCR(use_angle_cls=False, + lang="ch", + det_model_dir = self.config["det_model_dir"], + det_max_side_len=1920, det_db_score_mode="slow", + rec_model_dir= self.config["rec_model_dir"] , + drop_score=0.2) + + return ocr + + def _parse_config(self): + with open(self.config_path, "r", encoding="utf-8") as f: + config = yaml.safe_load(f) + return config + + def parse_result(self, result): + + value = list() + conf = list() + + if result is not None and result[0] is not None: + # batch is 1 + result = result[0] + for line in result: + # line[1][0], line[1][1] --> ocr value and confidence + value.append(line[1][0]) + conf.append(round(line[1][1], 2)) + + return value, conf \ No newline at end of file diff --git a/app/util/kangda.py b/app/util/kangda.py new file mode 100644 index 0000000..cd83c57 --- /dev/null +++ b/app/util/kangda.py @@ -0,0 +1,170 @@ +import os +import yaml +import requests + +class Kangda: + def __init__(self, config_path="app/config/config.yaml"): + + self.config_path = config_path + self.config = self._parse_config() + + + # 获取所有事件列表 + def get_event_list(self): + token = self._login_gettoken() + event_list = self._event_get_list(token) + return event_list + + # 获取事件列表详情 + def get_event_list_detail(self, results): + + token = self._login_gettoken() + detail_list = self._event_get_detail(results, token) + return detail_list + + def _parse_config(self): + with open(self.config_path, "r", encoding="utf-8") as f: + config = yaml.safe_load(f) + return config + + def _event_get_list(self, token): + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36", + "accept-language": "zh-CN,zh;q=0.9", + "connections": "keep-alive", + "Host": "erpapi.concoai.com", + "accept-encoding": "gzip, deflate", + "upgrade-insecure-requests": "1", + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "token": token, + "refreshToken": token + } + + parse_body = {"pageNo": 1, "pageSize": 10000} + + try: + response = requests.post(url=self.config["url_event_list"], + headers=headers, + json=parse_body) + # 检查请求是否成功 + response.raise_for_status() + + # 解析JSON响应 + result = response.json() + + return result["rows"] + + + except requests.exceptions.RequestException as e: + print("获取事件列表请求出错:", e) + except ValueError as e: + print("解析JSON失败:", e) + + def _event_get_detail(self, results, token): + + # 请求头 + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36", + "accept-language": "zh-CN,zh;q=0.9", + "connections": "keep-alive", + "Host": "erpapi.concoai.com", + "accept-encoding": "gzip, deflate", + "upgrade-insecure-requests": "1", + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "token": token, + "refreshToken": token + } + + # 事件详情列表 + back = list() + + for result in results: + # print(result["eventId"]) + try: + response = requests.get( + url=self.config["url_event_detail"]+result["eventId"], + headers=headers + ) + response.raise_for_status() + + # 解析JSON响应 + result = response.json() + + back.append(result["data"]) + + except Exception as e: + print("获取事件详情失败", e) + # break + + + return back + + def _login_gettoken(self): + + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36", + "accept-language": "zh-CN,zh;q=0.9", + "connections": "keep-alive", + "Host": "erpapi.concoai.com", + "accept-encoding": "gzip, deflate", + "upgrade-insecure-requests": "1", + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7" + } + + try: + # 发送POST请求,params设置查询参数,json设置JSON请求体 + response = requests.get( + self.config["url_login"], + params=self.config["kangda"], + headers=headers + # json=payload + ) + + # 检查请求是否成功 + response.raise_for_status() + + # 解析JSON响应 + result = response.json() + + return result.get("data").get("token") + + except requests.exceptions.RequestException as e: + print("登录请求出错:", e) + except ValueError as e: + print("解析JSON失败:", e) + + + def _download_image(self, url, save_image=True): + ''' + url: 图片下载地址 + save_path: 保存文件目录 + + return: 返回保存的本地文件路径 + ''' + save_path = self.config["image_save_path"] + + # 确保目录存在 + os.makedirs(save_path, exist_ok=True) + + file_name = url.split("/")[-1] + + + try: + + img_save_path = os.path.join(save_path, file_name) + + if save_image: + response = requests.get(url, stream=True) + response.raise_for_status() # 检查请求是否成功, 不成功会raise错误. + + with open(img_save_path, "wb") as f: + for chunk in response.iter_content(1024): + f.write(chunk) + return img_save_path + + except Exception as e: + print("fail to download image ", url," ", e) + return None + + + \ No newline at end of file diff --git a/doc/design.md b/doc/design.md new file mode 100644 index 0000000..ddd3f68 --- /dev/null +++ b/doc/design.md @@ -0,0 +1,355 @@ +# 温度数据采集系统设计文档 + +## 1. 系统概述 + +### 1.1 系统目标 + +设计并实现一个自动化温度数据采集系统,通过websocket与别的系统通信.当收到websocket消息后,调用第三方API获取事件并保存时事件到数据库.然后使用OCR技术识别图片中的温度数据,保存到数据库中并提供数据查询接口。 + +### 1.2 系统范围 + +- 自动化数据采集 +- 图片处理和OCR识别 +- 数据存储和管理 +- API接口服务 + +## 2. 系统架构 + +### 2.1 整体架构 + +系统采用分层架构设计,主要包含以下层次: + +1. 数据采集层 +2. 业务处理层 +3. 数据持久层 +4. API接口层 + +### 2.2 技术架构 + +``` +├── app/ +│ ├── api/ # API接口层 +│ ├── core/ # 核心配置 +│ ├── models/ # 数据模型 +│ ├── schemas/ # 数据验证 +│ ├── services/ # 业务服务 +│ ├── tasks/ # 定时任务 +│ └── utils/ # 工具函数 +├── tests/ # 测试用例 +├── alembic/ # 数据库迁移 +├── logs/ # 日志文件 +└── config/ # 配置文件 +``` + +## 3. 详细设计 + +### 3.1 数据采集模块 + +```python +class DataCollector: + def __init__(self): + self.token_manager = TokenManager() + self.http_client = AsyncHTTPClient() + + async def collect_events(self): + # 获取token + token = await self.token_manager.get_token() + # 获取事件列表 + events = await self.fetch_events(token) + # 获取事件详情 + for event in events: + await self.process_event(event, token) +``` + +### 3.2 OCR处理模块 + +```python +class OCRProcessor: + def __init__(self): + self.ocr = PaddleOCR() + + async def process_image(self, image_path: str) -> Dict: + # 图片预处理 + image = self.preprocess_image(image_path) + # OCR识别 + result = self.ocr.ocr(image) + # 提取温度数据 + temperature = self.extract_temperature(result) + return temperature +``` + +### 3.3 数据存储模块 + +```python +class DatabaseManager: + def __init__(self): + self.engine = create_engine( + 'mysql+aiomysql://user:password@localhost:3306/temp_db', + pool_size=5, + max_overflow=10, + pool_timeout=30, + pool_recycle=1800 + ) + self.session = SessionLocal() + + async def save_event(self, event_data: Dict): + event = Event(**event_data) + self.session.add(event) + await self.session.commit() +``` + +### 3.4 API接口模块 + +```python +@router.get("/temperatures") +async def get_temperatures( + start_time: datetime, + end_time: datetime, + db: Session = Depends(get_db) +): + return await temperature_service.get_temperatures( + db, start_time, end_time + ) +``` + +## 4. 数据库设计 + +### 4.1 表结构 + +详细表结构见需求文档中的数据库设计部分。 + +### 4.2 索引设计 + +```sql + + +-- 温度数据表索引 +ALTER TABLE temperature ADD INDEX idx_temp_event_id (event_id); +ALTER TABLE temperature ADD INDEX idx_temp_create_time (create_time); + +-- 图片表索引 +ALTER TABLE image ADD INDEX idx_image_event_id (event_id); + +-- 处理日志表索引 +ALTER TABLE process_log ADD INDEX idx_log_event_id (event_id); +ALTER TABLE process_log ADD INDEX idx_log_create_time (create_time); +``` + +### 4.3 数据库配置 + +```ini +[mysql] +host = localhost +port = 3306 +database = temp_db +user = temp_user +password = your_password +charset = utf8mb4 +pool_size = 5 +max_overflow = 10 +pool_timeout = 30 +pool_recycle = 1800 +``` + +## 5. 接口设计 + +### 5.1 内部接口 + +1. 数据采集接口 + - 获取事件列表 + - 获取事件详情 + - 下载图片 +2. 数据处理接口 + - OCR识别接口 + - 数据验证接口 + - 数据存储接口 + +### 5.2 外部接口 + +1. WebSocket接口 + ```python + @router.websocket("/ws") + async def websocket_endpoint(websocket: WebSocket): + await websocket.accept() + try: + while True: + data = await websocket.receive_json() + # 处理接收到的消息 + await process_websocket_message(data) + except WebSocketDisconnect: + pass + ``` + +2. 温度数据查询接口 + ```python + @router.get("/api/v1/temperatures") + async def query_temperatures( + start_time: datetime, + end_time: datetime, + event_type: Optional[str] = None + ) -> List[TemperatureResponse]: + pass + ``` + +3. 处理状态查询接口 + ```python + @router.get("/api/v1/events/{event_id}/status") + async def get_event_status( + event_id: str + ) -> EventStatusResponse: + pass + ``` + +4. 获取事件列表接口 + ```python + @router.get("/api/v1/events") + async def get_events( + skip: int = 0, + limit: int = 100, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None + ) -> List[EventResponse]: + pass + ``` + +5. 搜索事件接口 + ```python + @router.get("/api/v1/events/search") + async def search_events( + keyword: str, + skip: int = 0, + limit: int = 100 + ) -> List[EventResponse]: + pass + ``` + +6. 删除事件接口 + ```python + @router.delete("/api/v1/events/{event_id}") + async def delete_event( + event_id: str + ) -> Dict[str, str]: + pass + ``` + +7. 查看事件详情接口 + ```python + @router.get("/api/v1/events/{event_id}") + async def get_event_detail( + event_id: str + ) -> EventDetailResponse: + pass + ``` + +## 6. 安全设计 + +### 6.1 认证授权 + +- 使用JWT进行API认证 +- 实现基于角色的访问控制 + +### 6.2 数据安全 + +- 敏感数据加密存储 +- 数据传输加密 +- 操作日志记录 + +## 7. 性能设计 + +### 7.1 并发处理 + +- 使用异步IO处理HTTP请求 +- 实现请求限流 +- 使用连接池管理数据库连接 + +### 7.2 缓存策略 + +- 使用Redis缓存token +- 缓存热点数据 +- 实现数据预加载 + +## 8. 监控设计 + +### 8.1 系统监控 + +- 接口响应时间监控 +- 系统资源使用监控 +- 异常监控告警 + +### 8.2 业务监控 + +- 数据采集成功率 +- OCR识别准确率 +- 处理延迟监控 + +## 9. 部署方案 + +### 9.1 环境要求 + +- Python 3.8+ +- MySQL 8.0+ +- Redis 6+ +- Docker 20+ + +### 9.2 部署架构 + +``` + [负载均衡器] + | + +---------------+---------------+ + | | | + [应用服务器1] [应用服务器2] [应用服务器3] + | | | + +---+---------------+---+-----------+ + | | +[MySQL主] [MySQL从] + | | +[Redis集群] [文件存储] +``` + +### 9.3 MySQL配置建议 + +```ini +[mysqld] +# 基本配置 +character-set-server = utf8mb4 +collation-server = utf8mb4_unicode_ci +default-time-zone = '+8:00' + +# 连接配置 +max_connections = 1000 +max_connect_errors = 1000 + +# 缓冲池配置 +innodb_buffer_pool_size = 4G +innodb_buffer_pool_instances = 4 + +# 日志配置 +slow_query_log = 1 +slow_query_log_file = /var/log/mysql/slow.log +long_query_time = 2 + +# 主从复制配置 +server-id = 1 +log-bin = mysql-bin +binlog_format = ROW +``` + +## 10. 测试方案 + +### 10.1 单元测试 + +- 使用pytest进行单元测试 +- 测试覆盖率要求>80% + +### 10.2 集成测试 + +- API接口测试 +- 数据库操作测试 +- OCR识别测试 + +### 10.3 性能测试 + +- 并发请求测试 +- 数据处理能力测试 +- 系统稳定性测试 diff --git a/doc/requirement.md b/doc/requirement.md new file mode 100644 index 0000000..497602d --- /dev/null +++ b/doc/requirement.md @@ -0,0 +1,355 @@ +# 需求: +- 设计一个Python后台服务程序,用于自动化采集和处理温度数据 +- 通过WebSocket接收外部系统的消息触发 +- 支持事件驱动的数据采集和处理流程 + +# 功能: +- WebSocket服务 + - 接收外部系统的消息 + - 处理消息并触发相应的数据采集流程 + - 支持消息的实时响应 +- 数据采集 + - 定时向第三方接口发送请求,获取返回内容中的图片url + - 根据图片url获取图片,并使用OCR技术读取图片中电子显示屏中的数字(温度)信息 + - 将处理结果存储到数据库并提供查询接口 +- 事件处理 + - 支持事件的增删改查 + - 支持事件的状态跟踪 + - 支持事件的搜索和过滤 + +# 系统架构: +- 采用分层架构设计 + - 数据采集层:负责与第三方API交互 + - 业务处理层:负责数据处理和OCR识别 + - 数据持久层:负责数据存储 + - API接口层:提供对外服务接口 + +# 技术选型: +- 开发语言:Python 3.8+ +- Web框架:FastAPI +- 数据库:MySQL 8.0+ +- ORM:SQLAlchemy +- OCR引擎:PaddleOCR +- 定时任务:APScheduler +- HTTP客户端:aiohttp +- 图片处理:Pillow +- 日志:loguru + +# 具体第三方接口: +- 获取token和refresh接口 + - url: + - http://erpapi.concoai.com/basis-api/user/login + - 传递参数: + - Query参数 + - cookie_flag: Optional[str] = None + - device_id: Optional[str] = None + - is_login: Optional[str] = None + - lang: Optional[str] = None + - password: Optional[str] = None + - username: Optional[str] = None + - 响应参数: 重点保存 data中的token和refreshtoken + ```json + { + "code": "2000", + "message": "成功", + "total": null, + "data": { + "token": "xxx", + "refreshToken": "xxx", + "isLogin": null, + "username": null, + "password": null, + "rolePath": "/index/staff-page" + }, + "rows": null + } + ``` +- 获取列表页接口 + - url: + - http://erpapi.concoai.com/robot/event/page + - 传递参数: + - Header 参数 + - token + - refreshToken + - Body 参数 + ```json + {"pageNo": 1, "pageSize": 10000} + ``` + - 响应参数: 获取rows中 的eventId ,insDate, insDateShow + ```json + { + "code": "2000", + "message": "成功", + "total": 1, + "data": null, + "rows": [ + { + "eventId": "a462a29d7481495380cd47035b19edc0", + "tenantInfoId": "4fff5d4bcc4b4239941ff077a0da8958", + "reportEventId": null, + "number": "ROB23100098", + "name": "X32305000019", + "eclassify": "2", + "operationType": "2", + "etype": "E000007", + "etypeName": "日常巡检", + "enTypeName": "Routine patrol", + "hkTypeName": "日常巡檢\r", + "reportStatus": "0", + "results": "2", + "insDate": "2025-05-06T16:44:31", + "insDateShow": "2025-05-06 16:44:31", + "updDate": null, + "updDateShow": null, + "fileType": null, + "area": null, + "floor": null, + "map": null, + "staffId": null, + "targetUserId": null, + "position": null, + "actualStaffName": null, + "targetStaffName": "员工1", + "routeName": "test", + "phoneAddress": null, + "width": null, + "height": null, + "resolution": null, + "originX": null, + "originY": null, + "imgList": null, + "robotType": "08", + "eventFloor": null, + "floorName": null, + "coordId": null, + "coord": null, + "coordName": null, + "positonName": "A3", + "processingRemark": null, + "carId": null, + "parkingSpaceType": null, + "parkingSpaceNumber": null, + "carNumber": null, + "eno": null, + "instrument": null, + "evideo": null + } + ] + } + ``` + +- 获取事件详情接口 + - url: + - http://erpapi.concoai.com/robot/event/{eventId} + - 传递参数: + - 路径变量: + - eventId + - 请求头参数: + - token: + - refreshToken + - 响应参数: 主要获取imgList中的图片url. + ```json + { + "code": "2000", + "message": "成功", + "total": null, + "data": { + "eventId": "a462a29d7481495380cd47035b19edc0", + "tenantInfoId": "4fff5d4bcc4b4239941ff077a0da8958", + "reportEventId": "bb96f28e14d944c79f44321edca4395f", + "number": "ROB23100098", + "name": "X32305000019", + "eclassify": "2", + "operationType": "2", + "etype": "E000007", + "etypeName": "日常巡检", + "enTypeName": null, + "hkTypeName": null, + "reportStatus": "0", + "results": "2", + "insDate": null, + "insDateShow": "2025-05-06 16:44:31", + "updDate": null, + "updDateShow": null, + "fileType": "2", + "area": "区域1", + "floor": "", + "map": null, + "staffId": null, + "targetUserId": null, + "position": "75.104378,90.402679", + "actualStaffName": null, + "targetStaffName": "员工1", + "routeName": "test", + "phoneAddress": "http://file.prod.concoai.com/image/4fff5d4bcc4b4239941ff077a0da8958/4f60fbd391e98327f6ee13df1455f23e.jpg", + "width": "3994.0", + "height": "4816.0", + "resolution": "0.100000", + "originX": "-111.361710", + "originY": " -255.272873", + "imgList": [ + "http://file.prod.concoai.com/image/4fff5d4bcc4b4239941ff077a0da8958/168c66b32421d8e8d5bc13a0bbb87a30.jpeg" + ], + "robotType": "08", + "eventFloor": null, + "floorName": null, + "coordId": null, + "coord": null, + "coordName": null, + "positonName": "A3", + "processingRemark": null, + "carId": null, + "parkingSpaceType": null, + "parkingSpaceNumber": null, + "carNumber": null, + "eno": null, + "instrument": null, + "evideo": null + }, + "rows": null + } + ``` + +# 后台系统功能: +1. WebSocket服务 + - 实现WebSocket服务器 + - 处理WebSocket连接管理 + - 实现消息的接收和响应 + - 支持心跳检测和重连机制 + +2. 认证管理 + - 通过第三方的获取token和refresh接口获取token和refreshToken + - 实现token自动刷新机制 + - token失效自动重试机制 + +3. 数据采集 + - 定时调用列表页接口获取事件数据 + - 实现增量采集,避免重复处理 + - 异常重试机制 + - 并发控制 + +4. 数据处理 + - 图片下载和预处理 + - OCR识别温度数据 + - 数据清洗和验证 + - 异常数据处理 + +5. 数据存储 + - 事件信息表 + - 图片信息表 + - 温度数据表 + - 处理日志表 + +6. API接口 + - WebSocket接口 + - 温度数据查询接口 + - 处理状态查询接口 + - 事件管理接口(增删改查) + - 手动触发处理接口 + - 系统状态监控接口 + +# 数据库设计: +1. 事件表(event) + + - event_id varchar(50) primary key 事件Id + - tenant_info_id varchar(100) + - report_event_id varchar(100) + - number varchar(20) + - name varchar(20) + - eclassify varchar(5) + - operation_type varchar(5) + - etype varchar(10) + - etype_name varchar(20) + - en_type_name varchar(20) + - hk_type_name varchar(20) + - report_status varchar(5) + - results varchar(5) + - ins_date datetime + - ins_date_show datetime + - upd_date datetime + - upd_date_show datetime + - file_type varchar(5) + - area varchar(20) + - floor varchar(10) + - map varchar(20) + - staff_id varchar(40) + - target_user_id varchar(40) + - position varchar(30) + - actual_staff_name varchar(20) + - target_staff_name varchar(20) + - route_name varchar(20) + - phone_address varchar(500) + - width varchar(10) + - height varchar(10) + - resolution varchar(10) + - origin_x varchar(20) + - origin_y varchar(20) + - img_list_id varchar(20) + - robot_type varchar(5) + - event_floor varchar(10) + - floor_name varchar(10) + - coord_id varchar(40) + - coord_name varchar(30) + - position_name varchar(20) + - processing_remark varchar(300) + - card_id varchar(40) + - parking_space_type varchar(10) + - parking_space_number varchar(40) + - car_number varchar(40) + - eno varchar(40) + - instrument varchar(40) + - evideo varchar(40) + - create_time datetime COMMENT '本地后台创建时间' + - update_time datetime DEFAULT CURRENT_TIMESTAMP COMMENT '本地后台更新时间' + +2. 图片表(image) + - image_id: VARCHAR(40) PRIMARY KEY COMMENT '图片ID' + - event_id: VARCHAR(50) NOT NULL COMMENT '关联事件ID' + - image_url: VARCHAR(500) NOT NULL COMMENT '图片URL' + - local_path: VARCHAR(200) COMMENT '本地存储路径' + - create_time: DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' + - INDEX idx_event_id (event_id), + - FOREIGN KEY (event_id) REFERENCES event(event_id) + +3. 温度数据表(temperature) + - temp_id: VARCHAR(32) PRIMARY KEY COMMENT '温度记录ID' + - event_id: VARCHAR(50) NOT NULL COMMENT '关联事件ID' + - image_id: VARCHAR(40) NOT NULL COMMENT '关联图片ID' + - temperature: varchar(20) NOT NULL COMMENT '温度值' + - confidence: varchar(20) NOT NULL COMMENT '识别置信度' + - create_time: DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' + - INDEX idx_event_id (event_id), + - INDEX idx_create_time (create_time), + - FOREIGN KEY (event_id) REFERENCES event(event_id), + - FOREIGN KEY (image_id) REFERENCES image(image_id) + +4. 处理日志表(process_log) + - log_id: VARCHAR(32) PRIMARY KEY COMMENT '日志ID' + - event_id: VARCHAR(32) NOT NULL COMMENT '关联事件ID' + - process_status: TINYINT NOT NULL COMMENT '处理状态' + - error_message: TEXT COMMENT '错误信息' + - create_time: DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' + - INDEX idx_event_id (event_id), + - INDEX idx_create_time (create_time), + - FOREIGN KEY (event_id) REFERENCES event(event_id) + +# 性能要求: +- 系统响应时间:API接口响应时间<500ms +- 并发处理能力:支持10个并发请求 +- 数据处理能力:每分钟处理不少于100张图片 +- 系统可用性:99.9% + +# 安全要求: +- 实现接口认证机制 +- 敏感数据加密存储 +- 操作日志记录 +- 异常监控告警 + +# 部署要求: +- 支持Docker容器化部署 +- 支持多环境配置(开发、测试、生产) +- 提供部署文档和运维手册 + + + + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4b79e1a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +sqlalchemy +pymysql +pydantic-settings +aiomysql \ No newline at end of file diff --git a/run_sync.py b/run_sync.py new file mode 100644 index 0000000..567e6cf --- /dev/null +++ b/run_sync.py @@ -0,0 +1,6 @@ +import asyncio +from app.services.event_sync_service import run_sync + +if __name__ == "__main__": + print("启动事件同步服务...") + asyncio.run(run_sync()) \ No newline at end of file diff --git a/test_db.py b/test_db.py new file mode 100644 index 0000000..d4f17c1 --- /dev/null +++ b/test_db.py @@ -0,0 +1,211 @@ +import asyncio +import uuid +from datetime import datetime +from sqlalchemy import select +from app.core.database import async_session, Base, engine +from app.models.models import Event, Image, Temperature, ProcessLog +from sqlalchemy.orm import selectinload + + +# 开发环境清空并创建数据.. +async def init_db(): + """初始化数据库""" + async with engine.begin() as conn: + + # 删除已注册的表 + await conn.run_sync(Base.metadata.drop_all) + + # 创建表 + await conn.run_sync(Base.metadata.create_all) + +async def test_create(): + """测试创建操作""" + print("\n=== 测试创建操作 ===") + + # 创建事件 + event = Event( + eventId=str(uuid.uuid4())[:30], + number="TEST001", + name="测试事件", + etype="TEST", + etypeName="测试类型", + insDate=datetime.now(), + insDateShow=datetime.now() + ) + + print("测试用例Id", event.eventId) + + async with async_session() as session: + session.add(event) + await session.commit() + await session.refresh(event) + print(f"创建事件成功: {event.eventId}") + + # 创建图片 + image = Image( + eventId=event.eventId, + imageUrl="http://example.com/test.jpg", + localPath="/tmp/test.jpg" + ) + session.add(image) + await session.commit() + await session.refresh(image) + print(f"创建图片成功: {image.imageId}") + + # 创建温度记录 + temperature = Temperature( + eventId=event.eventId, + imageId=image.imageId, + temperature="25.5", + confidence="0.95" + ) + session.add(temperature) + await session.commit() + await session.refresh(temperature) + print(f"创建温度记录成功: {temperature.tempId}") + + # 创建处理日志 + process_log = ProcessLog( + eventId=event.eventId, + processStatus=1, + errorMessage="处理成功" + ) + session.add(process_log) + await session.commit() + await session.refresh(process_log) + print(f"创建处理日志成功: {process_log.logId}") + + return event.eventId + +async def test_read(event_id): + """测试读取操作""" + print("\n=== 测试读取操作 ===") + + async with async_session() as session: + # 查询事件, 这括号可以没有, 完全是为了书写方便, 在一句话中能换行而不用加\. + query = ( + select(Event).where(Event.eventId == event_id).options( + selectinload(Event.images), + selectinload(Event.temperatures), + selectinload(Event.process_logs) + ) + ) + result = await session.execute(query) + event = result.scalar_one_or_none() + print(f"查询事件: {event.eventId if event else 'Not Found'}") + + if event: + # 查询关联的图片 + print(f"关联图片数量: {len(event.images)}") + for image in event.images: + print(f"图片ID: {image.imageId}, URL: {image.imageUrl}") + + # 查询关联的温度记录 + print(f"关联温度记录数量: {len(event.temperatures)}") + for temp in event.temperatures: + print(f"温度ID: {temp.tempId}, 温度值: {temp.temperature}") + + # 查询关联的处理日志 + print(f"关联处理日志数量: {len(event.process_logs)}") + for log in event.process_logs: + print(f"日志ID: {log.logId}, 状态: {log.processStatus}") + +async def test_update(event_id): + """测试更新操作""" + print("\n=== 测试更新操作 ===") + + async with async_session() as session: + # 更新事件 + query = select(Event).where(Event.eventId == event_id).options( + selectinload(Event.images), + selectinload(Event.temperatures), + selectinload(Event.process_logs) + ) + result = await session.execute(query) + event = result.scalar_one_or_none() + + if event: + event.name = "更新后的测试事件" + event.etypeName = "更新后的测试类型" + await session.commit() + print(f"更新事件成功: {event.eventId}") + + # 更新温度记录 + for temp in event.temperatures: + temp.temperature = "26.5" + temp.confidence = "0.98" + await session.commit() + print("更新温度记录成功") + +async def test_delete(event_id): + """测试删除操作""" + print("\n=== 测试删除操作 ===") + + async with async_session() as session: + # 删除事件(级联删除关联数据) + query = select(Event).where(Event.eventId == event_id) + result = await session.execute(query) + event = result.scalar_one_or_none() + + if event: + await session.delete(event) + await session.commit() + print(f"删除事件成功: {event_id}") + +async def test_transaction(): + """测试事务操作""" + print("\n=== 测试事务操作 ===") + + async with async_session() as session: + try: + # 开始事务 + event = Event( + eventId=str(uuid.uuid4()), + number="TEST002", + name="事务测试事件", + etype="TEST", + etypeName="测试类型", + insDate=datetime.now(), + insDateShow=datetime.now() + ) + session.add(event) + await session.flush() + + # 模拟错误 + raise Exception("模拟事务错误") + + await session.commit() + except Exception as e: + await session.rollback() + print(f"事务回滚成功: {str(e)}") + +async def main(): + """主测试函数""" + print("开始数据库测试...") + + # 初始化数据库 + # await init_db() + print("数据库初始化完成") + + # 测试创建 + event_id = await test_create() + + # 测试读取 + await test_read(event_id) + + # 测试更新 + await test_update(event_id) + + # 测试读取更新后的数据 + await test_read(event_id) + + # 测试事务 + await test_transaction() + + # 测试删除 + await test_delete(event_id) + + print("\n数据库测试完成") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/test_t.py b/test_t.py new file mode 100644 index 0000000..8840abb --- /dev/null +++ b/test_t.py @@ -0,0 +1,24 @@ +# from app.uitl.getEventList import test + +from app.util.kangda import Kangda + +from app.util.baiduOcr import BadiduOcr + + +# #-----------------------测试获取kangda接口事件列表-------------------------------- +# kangda = Kangda() +# print(kangda.get_event_list()) +# #-----------------------测试获取kangda接口事件列表end--------------------------- + +#-------------------------测试百度ocr--------------------------------------------- +ocr = BadiduOcr() +result = ocr.image_inference("/home/admin-root/haotian/康达瑞贝斯/imagesKangda/0a7f94b11153fc79ef2dd8b18f9eb8ec.jpeg", True) +ocr.draw_result(result, + "/home/admin-root/haotian/康达瑞贝斯/imagesKangda/0a7f94b11153fc79ef2dd8b18f9eb8ec.jpeg", + "imagesOcr/1.jpeg") +#-------------------------测试百度ocrend----------------------------------------- + + + + + diff --git a/安装依赖.txt b/安装依赖.txt new file mode 100644 index 0000000..3e5539a --- /dev/null +++ b/安装依赖.txt @@ -0,0 +1,6 @@ +1. 安装fastapi + pip install fastapi uvicorn[standard] +2.启动 + uvicorn main:app --reload + 或者 + 程序中 uvicorn.run(app, 参数....), 直接运行程序就行. \ No newline at end of file