""" Resource management utilities for CAE Mesh Generator """ import os import shutil import threading import time import atexit from datetime import datetime, timedelta from pathlib import Path from typing import List, Dict, Optional import logging from backend.utils.error_handler import log_processing_step, error_reporter logger = logging.getLogger(__name__) class ResourceManager: """Manages system resources including files, ANSYS sessions, and cleanup""" def __init__(self): self.temp_files: List[str] = [] self.temp_directories: List[str] = [] self.ansys_sessions: List = [] self.cleanup_lock = threading.Lock() self.auto_cleanup_enabled = True self.cleanup_interval = 3600 # 1 hour self.max_file_age = 24 * 3600 # 24 hours # Start cleanup thread self._start_cleanup_thread() # Register cleanup on exit atexit.register(self.cleanup_all) def register_temp_file(self, file_path: str) -> None: """Register a temporary file for cleanup""" with self.cleanup_lock: if file_path not in self.temp_files: self.temp_files.append(file_path) logger.debug(f"Registered temp file: {file_path}") def register_temp_directory(self, dir_path: str) -> None: """Register a temporary directory for cleanup""" with self.cleanup_lock: if dir_path not in self.temp_directories: self.temp_directories.append(dir_path) logger.debug(f"Registered temp directory: {dir_path}") def register_ansys_session(self, session) -> None: """Register an ANSYS session for cleanup""" with self.cleanup_lock: if session not in self.ansys_sessions: self.ansys_sessions.append(session) logger.debug(f"Registered ANSYS session: {session}") def cleanup_temp_files(self) -> Dict[str, int]: """Clean up temporary files""" cleaned_files = 0 failed_files = 0 with self.cleanup_lock: files_to_remove = self.temp_files.copy() self.temp_files.clear() for file_path in files_to_remove: try: if os.path.exists(file_path): os.remove(file_path) cleaned_files += 1 logger.debug(f"Cleaned temp file: {file_path}") except Exception as e: failed_files += 1 logger.warning(f"Failed to clean temp file {file_path}: {e}") error_reporter.add_warning(f"Failed to clean temp file: {file_path}", {'error': str(e)}) return {'cleaned': cleaned_files, 'failed': failed_files} def cleanup_temp_directories(self) -> Dict[str, int]: """Clean up temporary directories""" cleaned_dirs = 0 failed_dirs = 0 with self.cleanup_lock: dirs_to_remove = self.temp_directories.copy() self.temp_directories.clear() for dir_path in dirs_to_remove: try: if os.path.exists(dir_path): shutil.rmtree(dir_path) cleaned_dirs += 1 logger.debug(f"Cleaned temp directory: {dir_path}") except Exception as e: failed_dirs += 1 logger.warning(f"Failed to clean temp directory {dir_path}: {e}") error_reporter.add_warning(f"Failed to clean temp directory: {dir_path}", {'error': str(e)}) return {'cleaned': cleaned_dirs, 'failed': failed_dirs} def cleanup_ansys_sessions(self) -> Dict[str, int]: """Clean up ANSYS sessions""" closed_sessions = 0 failed_sessions = 0 with self.cleanup_lock: sessions_to_close = self.ansys_sessions.copy() self.ansys_sessions.clear() for session in sessions_to_close: try: if hasattr(session, 'exit'): session.exit() closed_sessions += 1 logger.debug(f"Closed ANSYS session: {session}") elif hasattr(session, 'close'): session.close() closed_sessions += 1 logger.debug(f"Closed ANSYS session: {session}") except Exception as e: failed_sessions += 1 logger.warning(f"Failed to close ANSYS session {session}: {e}") error_reporter.add_warning(f"Failed to close ANSYS session", {'error': str(e)}) return {'closed': closed_sessions, 'failed': failed_sessions} def cleanup_old_files(self, directories: List[str] = None) -> Dict[str, int]: """Clean up old files in specified directories""" if directories is None: directories = ['uploads', 'results', 'temp', 'static/visualizations'] cleaned_files = 0 failed_files = 0 total_size_freed = 0 cutoff_time = datetime.now() - timedelta(seconds=self.max_file_age) for directory in directories: if not os.path.exists(directory): continue try: for root, dirs, files in os.walk(directory): for file in files: file_path = os.path.join(root, file) try: # Check file age file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path)) if file_mtime < cutoff_time: file_size = os.path.getsize(file_path) os.remove(file_path) cleaned_files += 1 total_size_freed += file_size logger.debug(f"Cleaned old file: {file_path}") except Exception as e: failed_files += 1 logger.warning(f"Failed to clean old file {file_path}: {e}") except Exception as e: logger.warning(f"Failed to process directory {directory}: {e}") return { 'cleaned': cleaned_files, 'failed': failed_files, 'size_freed_mb': round(total_size_freed / (1024 * 1024), 2) } def cleanup_all(self) -> Dict[str, Dict]: """Perform complete cleanup of all resources""" log_processing_step("resource_cleanup", "started") results = { 'temp_files': self.cleanup_temp_files(), 'temp_directories': self.cleanup_temp_directories(), 'ansys_sessions': self.cleanup_ansys_sessions(), 'old_files': self.cleanup_old_files() } total_cleaned = ( results['temp_files']['cleaned'] + results['temp_directories']['cleaned'] + results['ansys_sessions']['closed'] + results['old_files']['cleaned'] ) total_failed = ( results['temp_files']['failed'] + results['temp_directories']['failed'] + results['ansys_sessions']['failed'] + results['old_files']['failed'] ) logger.info(f"Resource cleanup completed: {total_cleaned} items cleaned, {total_failed} failed") log_processing_step("resource_cleanup", "completed", { 'total_cleaned': total_cleaned, 'total_failed': total_failed, 'results': results }) return results def get_resource_status(self) -> Dict[str, any]: """Get current resource status""" with self.cleanup_lock: return { 'temp_files_count': len(self.temp_files), 'temp_directories_count': len(self.temp_directories), 'ansys_sessions_count': len(self.ansys_sessions), 'auto_cleanup_enabled': self.auto_cleanup_enabled, 'cleanup_interval': self.cleanup_interval, 'max_file_age_hours': self.max_file_age / 3600 } def _start_cleanup_thread(self): """Start background cleanup thread""" def cleanup_worker(): while self.auto_cleanup_enabled: try: time.sleep(self.cleanup_interval) if self.auto_cleanup_enabled: logger.info("Starting automatic resource cleanup") self.cleanup_old_files() except Exception as e: logger.error(f"Error in cleanup thread: {e}") cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True) cleanup_thread.start() logger.info("Resource cleanup thread started") def stop_auto_cleanup(self): """Stop automatic cleanup""" self.auto_cleanup_enabled = False logger.info("Automatic resource cleanup stopped") def start_auto_cleanup(self): """Start automatic cleanup""" if not self.auto_cleanup_enabled: self.auto_cleanup_enabled = True self._start_cleanup_thread() logger.info("Automatic resource cleanup started") class ANSYSSessionManager: """Manages ANSYS sessions with automatic cleanup""" def __init__(self, resource_manager: ResourceManager): self.resource_manager = resource_manager self.active_sessions = {} self.session_lock = threading.Lock() def create_session(self, session_id: str = None, **kwargs): """Create a new ANSYS session with automatic cleanup registration""" if session_id is None: session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}" try: # Import ANSYS modules import ansys.mechanical.core as mech # Create session session = mech.launch_mechanical(batch=True, **kwargs) with self.session_lock: self.active_sessions[session_id] = session self.resource_manager.register_ansys_session(session) logger.info(f"Created ANSYS session: {session_id}") return session_id, session except ImportError: logger.warning("ANSYS Mechanical not available") raise except Exception as e: logger.error(f"Failed to create ANSYS session: {e}") raise def get_session(self, session_id: str): """Get an existing ANSYS session""" with self.session_lock: return self.active_sessions.get(session_id) def close_session(self, session_id: str) -> bool: """Close a specific ANSYS session""" with self.session_lock: session = self.active_sessions.pop(session_id, None) if session: try: if hasattr(session, 'exit'): session.exit() elif hasattr(session, 'close'): session.close() logger.info(f"Closed ANSYS session: {session_id}") return True except Exception as e: logger.warning(f"Error closing ANSYS session {session_id}: {e}") return False return False def close_all_sessions(self) -> Dict[str, bool]: """Close all active ANSYS sessions""" results = {} with self.session_lock: session_ids = list(self.active_sessions.keys()) for session_id in session_ids: results[session_id] = self.close_session(session_id) return results def get_active_sessions(self) -> List[str]: """Get list of active session IDs""" with self.session_lock: return list(self.active_sessions.keys()) class FileManager: """Manages file operations with automatic cleanup""" def __init__(self, resource_manager: ResourceManager): self.resource_manager = resource_manager def create_temp_file(self, suffix: str = '', prefix: str = 'temp_', directory: str = 'temp') -> str: """Create a temporary file with automatic cleanup registration""" os.makedirs(directory, exist_ok=True) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f') filename = f"{prefix}{timestamp}{suffix}" file_path = os.path.join(directory, filename) # Create empty file Path(file_path).touch() # Register for cleanup self.resource_manager.register_temp_file(file_path) logger.debug(f"Created temp file: {file_path}") return file_path def create_temp_directory(self, prefix: str = 'temp_', parent_dir: str = 'temp') -> str: """Create a temporary directory with automatic cleanup registration""" os.makedirs(parent_dir, exist_ok=True) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f') dir_name = f"{prefix}{timestamp}" dir_path = os.path.join(parent_dir, dir_name) os.makedirs(dir_path) # Register for cleanup self.resource_manager.register_temp_directory(dir_path) logger.debug(f"Created temp directory: {dir_path}") return dir_path def safe_copy(self, src: str, dst: str) -> bool: """Safely copy a file with error handling""" try: # Ensure destination directory exists os.makedirs(os.path.dirname(dst), exist_ok=True) shutil.copy2(src, dst) logger.debug(f"Copied file: {src} -> {dst}") return True except Exception as e: logger.error(f"Failed to copy file {src} -> {dst}: {e}") error_reporter.add_error(f"File copy failed: {src} -> {dst}", details={'error': str(e)}) return False def safe_move(self, src: str, dst: str) -> bool: """Safely move a file with error handling""" try: # Ensure destination directory exists os.makedirs(os.path.dirname(dst), exist_ok=True) shutil.move(src, dst) logger.debug(f"Moved file: {src} -> {dst}") return True except Exception as e: logger.error(f"Failed to move file {src} -> {dst}: {e}") error_reporter.add_error(f"File move failed: {src} -> {dst}", details={'error': str(e)}) return False def safe_delete(self, file_path: str) -> bool: """Safely delete a file with error handling""" try: if os.path.exists(file_path): os.remove(file_path) logger.debug(f"Deleted file: {file_path}") return True return True # File doesn't exist, consider it deleted except Exception as e: logger.error(f"Failed to delete file {file_path}: {e}") error_reporter.add_error(f"File deletion failed: {file_path}", details={'error': str(e)}) return False # Global resource manager instance resource_manager = ResourceManager() ansys_session_manager = ANSYSSessionManager(resource_manager) file_manager = FileManager(resource_manager)