# AnsysLink/backend/utils/resource_manager.py
#
# 398 lines
# 15 KiB
# Python

"""
Resource management utilities for CAE Mesh Generator
"""
import os
import shutil
import threading
import time
import atexit
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional
import logging
from backend.utils.error_handler import log_processing_step, error_reporter
logger = logging.getLogger(__name__)
class ResourceManager:
    """Track temporary files, directories and ANSYS sessions and clean them up.

    Resources are registered explicitly through the ``register_*`` methods and
    released by the ``cleanup_*`` methods. The three registries are guarded by
    ``cleanup_lock``. Constructing an instance starts a daemon thread that
    periodically purges old files and registers :meth:`cleanup_all` to run at
    interpreter exit.
    """

    def __init__(self):
        self.temp_files: List[str] = []
        self.temp_directories: List[str] = []
        self.ansys_sessions: List = []
        self.cleanup_lock = threading.Lock()
        self.auto_cleanup_enabled = True
        self.cleanup_interval = 3600  # 1 hour
        self.max_file_age = 24 * 3600  # 24 hours
        # Wake-up signal and handle of the current background worker.
        self._stop_event: Optional[threading.Event] = None
        self._cleanup_thread: Optional[threading.Thread] = None
        # Start cleanup thread
        self._start_cleanup_thread()
        # Register cleanup on exit
        atexit.register(self.cleanup_all)

    def register_temp_file(self, file_path: str) -> None:
        """Register a temporary file for cleanup (duplicates are ignored)."""
        with self.cleanup_lock:
            if file_path not in self.temp_files:
                self.temp_files.append(file_path)
                logger.debug(f"Registered temp file: {file_path}")

    def register_temp_directory(self, dir_path: str) -> None:
        """Register a temporary directory for cleanup (duplicates are ignored)."""
        with self.cleanup_lock:
            if dir_path not in self.temp_directories:
                self.temp_directories.append(dir_path)
                logger.debug(f"Registered temp directory: {dir_path}")

    def register_ansys_session(self, session) -> None:
        """Register an ANSYS session for cleanup (duplicates are ignored)."""
        with self.cleanup_lock:
            if session not in self.ansys_sessions:
                self.ansys_sessions.append(session)
                logger.debug(f"Registered ANSYS session: {session}")

    def cleanup_temp_files(self) -> Dict[str, int]:
        """Delete every registered temporary file.

        The registry is emptied whether or not deletion succeeds.

        Returns:
            ``{'cleaned': int, 'failed': int}``. Files that no longer exist
            are counted in neither bucket.
        """
        cleaned_files = 0
        failed_files = 0
        # Snapshot and reset under the lock; do the file I/O outside of it.
        with self.cleanup_lock:
            files_to_remove = self.temp_files.copy()
            self.temp_files.clear()
        for file_path in files_to_remove:
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
                    cleaned_files += 1
                    logger.debug(f"Cleaned temp file: {file_path}")
            except Exception as e:
                failed_files += 1
                logger.warning(f"Failed to clean temp file {file_path}: {e}")
                error_reporter.add_warning(f"Failed to clean temp file: {file_path}", {'error': str(e)})
        return {'cleaned': cleaned_files, 'failed': failed_files}

    def cleanup_temp_directories(self) -> Dict[str, int]:
        """Recursively delete every registered temporary directory.

        The registry is emptied whether or not deletion succeeds.

        Returns:
            ``{'cleaned': int, 'failed': int}``. Directories that no longer
            exist are counted in neither bucket.
        """
        cleaned_dirs = 0
        failed_dirs = 0
        with self.cleanup_lock:
            dirs_to_remove = self.temp_directories.copy()
            self.temp_directories.clear()
        for dir_path in dirs_to_remove:
            try:
                if os.path.exists(dir_path):
                    shutil.rmtree(dir_path)
                    cleaned_dirs += 1
                    logger.debug(f"Cleaned temp directory: {dir_path}")
            except Exception as e:
                failed_dirs += 1
                logger.warning(f"Failed to clean temp directory {dir_path}: {e}")
                error_reporter.add_warning(f"Failed to clean temp directory: {dir_path}", {'error': str(e)})
        return {'cleaned': cleaned_dirs, 'failed': failed_dirs}

    def cleanup_ansys_sessions(self) -> Dict[str, int]:
        """Close every registered ANSYS session.

        ``exit()`` is preferred; ``close()`` is used when ``exit`` is absent.
        The registry is emptied whether or not closing succeeds.

        Returns:
            ``{'closed': int, 'failed': int}``.
        """
        closed_sessions = 0
        failed_sessions = 0
        with self.cleanup_lock:
            sessions_to_close = self.ansys_sessions.copy()
            self.ansys_sessions.clear()
        for session in sessions_to_close:
            try:
                if hasattr(session, 'exit'):
                    session.exit()
                    closed_sessions += 1
                    logger.debug(f"Closed ANSYS session: {session}")
                elif hasattr(session, 'close'):
                    session.close()
                    closed_sessions += 1
                    logger.debug(f"Closed ANSYS session: {session}")
                else:
                    # Previously dropped silently; leave a trace so a leaked
                    # process can be diagnosed. Counts are intentionally
                    # unchanged.
                    logger.warning(f"ANSYS session has no exit/close method, dropped without closing: {session}")
            except Exception as e:
                failed_sessions += 1
                logger.warning(f"Failed to close ANSYS session {session}: {e}")
                error_reporter.add_warning("Failed to close ANSYS session", {'error': str(e)})
        return {'closed': closed_sessions, 'failed': failed_sessions}

    def cleanup_old_files(self, directories: Optional[List[str]] = None) -> Dict[str, int]:
        """Delete files older than ``max_file_age`` seconds.

        Args:
            directories: Directory trees to scan. Defaults to ``uploads``,
                ``results``, ``temp`` and ``static/visualizations`` (relative
                to the current working directory). Missing directories are
                skipped.

        Returns:
            ``{'cleaned': int, 'failed': int, 'size_freed_mb': float}``.
        """
        if directories is None:
            directories = ['uploads', 'results', 'temp', 'static/visualizations']
        cleaned_files = 0
        failed_files = 0
        total_size_freed = 0
        # Compare raw epoch seconds: naive local datetimes are off by an hour
        # across DST transitions.
        cutoff_timestamp = time.time() - self.max_file_age
        for directory in directories:
            if not os.path.exists(directory):
                continue
            try:
                for root, _dirs, files in os.walk(directory):
                    for file_name in files:
                        file_path = os.path.join(root, file_name)
                        try:
                            # Check file age
                            if os.path.getmtime(file_path) < cutoff_timestamp:
                                file_size = os.path.getsize(file_path)
                                os.remove(file_path)
                                cleaned_files += 1
                                total_size_freed += file_size
                                logger.debug(f"Cleaned old file: {file_path}")
                        except Exception as e:
                            failed_files += 1
                            logger.warning(f"Failed to clean old file {file_path}: {e}")
            except Exception as e:
                logger.warning(f"Failed to process directory {directory}: {e}")
        return {
            'cleaned': cleaned_files,
            'failed': failed_files,
            'size_freed_mb': round(total_size_freed / (1024 * 1024), 2)
        }

    def cleanup_all(self) -> Dict[str, Dict]:
        """Run every cleanup routine and return their individual results.

        Returns:
            A dict keyed by ``temp_files``, ``temp_directories``,
            ``ansys_sessions`` and ``old_files`` holding each routine's
            result dict.
        """
        log_processing_step("resource_cleanup", "started")
        results = {
            'temp_files': self.cleanup_temp_files(),
            'temp_directories': self.cleanup_temp_directories(),
            'ansys_sessions': self.cleanup_ansys_sessions(),
            'old_files': self.cleanup_old_files()
        }
        total_cleaned = (
            results['temp_files']['cleaned'] +
            results['temp_directories']['cleaned'] +
            results['ansys_sessions']['closed'] +
            results['old_files']['cleaned']
        )
        total_failed = sum(result['failed'] for result in results.values())
        logger.info(f"Resource cleanup completed: {total_cleaned} items cleaned, {total_failed} failed")
        log_processing_step("resource_cleanup", "completed", {
            'total_cleaned': total_cleaned,
            'total_failed': total_failed,
            'results': results
        })
        return results

    def get_resource_status(self) -> Dict[str, object]:
        """Return registry sizes and the current auto-cleanup settings."""
        with self.cleanup_lock:
            return {
                'temp_files_count': len(self.temp_files),
                'temp_directories_count': len(self.temp_directories),
                'ansys_sessions_count': len(self.ansys_sessions),
                'auto_cleanup_enabled': self.auto_cleanup_enabled,
                'cleanup_interval': self.cleanup_interval,
                'max_file_age_hours': self.max_file_age / 3600
            }

    def _start_cleanup_thread(self):
        """Start the background worker that periodically purges old files.

        Any previously started worker is told to stop first, so at most one
        worker keeps running no matter how often this is called.
        """
        previous_event = getattr(self, '_stop_event', None)
        if previous_event is not None:
            previous_event.set()
        # Each worker owns its event: setting it can only ever stop that
        # worker, never a replacement started later.
        stop_event = threading.Event()
        self._stop_event = stop_event

        def cleanup_worker():
            while self.auto_cleanup_enabled and not stop_event.is_set():
                try:
                    # Interruptible replacement for time.sleep(): wait()
                    # returns True as soon as a stop is requested.
                    if stop_event.wait(self.cleanup_interval):
                        break
                    if self.auto_cleanup_enabled:
                        logger.info("Starting automatic resource cleanup")
                        self.cleanup_old_files()
                except Exception as e:
                    logger.error(f"Error in cleanup thread: {e}")

        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        self._cleanup_thread = cleanup_thread
        cleanup_thread.start()
        logger.info("Resource cleanup thread started")

    def stop_auto_cleanup(self):
        """Stop automatic cleanup and wake the worker so it exits promptly."""
        self.auto_cleanup_enabled = False
        stop_event = getattr(self, '_stop_event', None)
        if stop_event is not None:
            stop_event.set()
        logger.info("Automatic resource cleanup stopped")

    def start_auto_cleanup(self):
        """Start automatic cleanup; a no-op when it is already enabled."""
        if not self.auto_cleanup_enabled:
            self.auto_cleanup_enabled = True
            self._start_cleanup_thread()
            logger.info("Automatic resource cleanup started")
class ANSYSSessionManager:
    """Create, look up and close ANSYS Mechanical sessions by id.

    Every session created here is also registered with the shared resource
    manager so that it is closed by the resource manager's cleanup even if
    the caller never closes it explicitly.
    """

    def __init__(self, resource_manager: "ResourceManager"):
        self.resource_manager = resource_manager
        self.active_sessions = {}  # session_id -> session object
        self.session_lock = threading.Lock()

    def create_session(self, session_id: str = None, **kwargs):
        """Launch a new ANSYS Mechanical session and register it for cleanup.

        Args:
            session_id: Identifier for the session. When omitted, a
                timestamp-based id is generated.
            **kwargs: Forwarded to ``launch_mechanical``.

        Returns:
            Tuple ``(session_id, session)``.

        Raises:
            ImportError: ``ansys.mechanical.core`` is not installed.
            Exception: Whatever ``launch_mechanical`` raises; logged and
                re-raised unchanged.
        """
        if session_id is None:
            # Microsecond resolution (same format FileManager uses) so two
            # sessions created within the same second get distinct ids.
            session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
        try:
            # Import ANSYS modules lazily so this module loads without them.
            import ansys.mechanical.core as mech
            # Create session
            session = mech.launch_mechanical(batch=True, **kwargs)
            with self.session_lock:
                self.active_sessions[session_id] = session
            self.resource_manager.register_ansys_session(session)
            logger.info(f"Created ANSYS session: {session_id}")
            return session_id, session
        except ImportError:
            logger.warning("ANSYS Mechanical not available")
            raise
        except Exception as e:
            logger.error(f"Failed to create ANSYS session: {e}")
            raise

    def get_session(self, session_id: str):
        """Return the session stored under ``session_id``, or ``None``."""
        with self.session_lock:
            return self.active_sessions.get(session_id)

    def close_session(self, session_id: str) -> bool:
        """Close one session and forget it.

        Returns:
            ``True`` when the session existed and was closed without error,
            ``False`` when the id is unknown or closing raised.
        """
        with self.session_lock:
            session = self.active_sessions.pop(session_id, None)
        if session is None:
            return False
        try:
            if hasattr(session, 'exit'):
                session.exit()
            elif hasattr(session, 'close'):
                session.close()
        except Exception as e:
            # Leave the session registered with the resource manager so its
            # final cleanup gets another chance to close it.
            logger.warning(f"Error closing ANSYS session {session_id}: {e}")
            return False
        self._unregister_from_resource_manager(session)
        logger.info(f"Closed ANSYS session: {session_id}")
        return True

    def _unregister_from_resource_manager(self, session) -> None:
        """Drop an already-closed session from the resource manager's registry.

        Without this the resource manager would keep a reference to the
        closed session and try to exit it a second time during cleanup.
        """
        manager = self.resource_manager
        with manager.cleanup_lock:
            try:
                manager.ansys_sessions.remove(session)
            except ValueError:
                # Not (or no longer) registered, e.g. cleanup already ran.
                pass

    def close_all_sessions(self) -> Dict[str, bool]:
        """Close every active session.

        Returns:
            Mapping of session id to the result of :meth:`close_session`.
        """
        # Snapshot the ids first: close_session takes the lock itself.
        with self.session_lock:
            session_ids = list(self.active_sessions)
        return {session_id: self.close_session(session_id) for session_id in session_ids}

    def get_active_sessions(self) -> List[str]:
        """Return the ids of all currently active sessions."""
        with self.session_lock:
            return list(self.active_sessions)
class FileManager:
    """File helpers that register what they create with the resource manager.

    The ``safe_*`` methods never raise: they report failures through the
    logger and ``error_reporter`` and return ``False``.
    """

    def __init__(self, resource_manager: "ResourceManager"):
        self.resource_manager = resource_manager

    @staticmethod
    def _ensure_parent_dir(path: str) -> None:
        """Create the parent directory of ``path`` when it has one.

        ``os.path.dirname`` returns ``''`` for a bare filename, and
        ``os.makedirs('')`` raises, so a destination in the current working
        directory must skip directory creation.
        """
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    def create_temp_file(self, suffix: str = '', prefix: str = 'temp_', directory: str = 'temp') -> str:
        """Create an empty, timestamp-named file and register it for cleanup.

        Args:
            suffix: Text appended after the timestamp (e.g. an extension).
            prefix: Text placed before the timestamp.
            directory: Directory to create the file in; created if missing.

        Returns:
            Path of the new file.
        """
        os.makedirs(directory, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
        filename = f"{prefix}{timestamp}{suffix}"
        file_path = os.path.join(directory, filename)
        # Create empty file
        Path(file_path).touch()
        # Register for cleanup
        self.resource_manager.register_temp_file(file_path)
        logger.debug(f"Created temp file: {file_path}")
        return file_path

    def create_temp_directory(self, prefix: str = 'temp_', parent_dir: str = 'temp') -> str:
        """Create a timestamp-named directory and register it for cleanup.

        Args:
            prefix: Text placed before the timestamp.
            parent_dir: Directory to create the new one in; created if missing.

        Returns:
            Path of the new directory.

        Raises:
            FileExistsError: A directory with the generated name already exists.
        """
        os.makedirs(parent_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
        dir_name = f"{prefix}{timestamp}"
        dir_path = os.path.join(parent_dir, dir_name)
        os.makedirs(dir_path)
        # Register for cleanup
        self.resource_manager.register_temp_directory(dir_path)
        logger.debug(f"Created temp directory: {dir_path}")
        return dir_path

    def safe_copy(self, src: str, dst: str) -> bool:
        """Copy ``src`` to ``dst`` (with metadata), creating parent directories.

        Returns:
            ``True`` on success, ``False`` when the copy failed.
        """
        try:
            # Ensure destination directory exists
            self._ensure_parent_dir(dst)
            shutil.copy2(src, dst)
            logger.debug(f"Copied file: {src} -> {dst}")
            return True
        except Exception as e:
            logger.error(f"Failed to copy file {src} -> {dst}: {e}")
            error_reporter.add_error(f"File copy failed: {src} -> {dst}", details={'error': str(e)})
            return False

    def safe_move(self, src: str, dst: str) -> bool:
        """Move ``src`` to ``dst``, creating parent directories.

        Returns:
            ``True`` on success, ``False`` when the move failed.
        """
        try:
            # Ensure destination directory exists
            self._ensure_parent_dir(dst)
            shutil.move(src, dst)
            logger.debug(f"Moved file: {src} -> {dst}")
            return True
        except Exception as e:
            logger.error(f"Failed to move file {src} -> {dst}: {e}")
            error_reporter.add_error(f"File move failed: {src} -> {dst}", details={'error': str(e)})
            return False

    def safe_delete(self, file_path: str) -> bool:
        """Delete ``file_path`` if it exists.

        Returns:
            ``True`` when the file is gone afterwards (including when it never
            existed), ``False`` when deletion failed.
        """
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
                logger.debug(f"Deleted file: {file_path}")
            # A file that doesn't exist is considered deleted.
            return True
        except Exception as e:
            logger.error(f"Failed to delete file {file_path}: {e}")
            error_reporter.add_error(f"File deletion failed: {file_path}", details={'error': str(e)})
            return False
# Module-level singletons shared by importers of this module.
# NOTE: constructing ResourceManager here means that importing this module
# starts its background cleanup thread and registers cleanup_all() with atexit.
resource_manager = ResourceManager()
# Both helpers receive the shared manager and register what they create with it.
ansys_session_manager = ANSYSSessionManager(resource_manager)
file_manager = FileManager(resource_manager)