""" Session Manager for CAE Mesh Generator This module provides session management, timeout handling, and resource cleanup for ANSYS Mechanical sessions and other system resources. """ import logging import threading import time import weakref from datetime import datetime, timedelta from typing import Dict, Any, List, Optional, Callable from dataclasses import dataclass from enum import Enum logger = logging.getLogger(__name__) class SessionStatus(Enum): """Session status enumeration""" ACTIVE = "active" IDLE = "idle" TIMEOUT = "timeout" TERMINATED = "terminated" ERROR = "error" @dataclass class SessionInfo: """Session information container""" session_id: str session_type: str created_at: datetime last_activity: datetime status: SessionStatus timeout_minutes: int resource_info: Dict[str, Any] cleanup_callbacks: List[Callable] class SessionTimeoutManager: """ Session timeout and resource cleanup manager This class manages session timeouts, monitors resource usage, and performs cleanup operations for ANSYS sessions and other resources. """ def __init__(self, default_timeout_minutes: int = 30): """ Initialize session timeout manager Args: default_timeout_minutes: Default session timeout in minutes """ self.default_timeout_minutes = default_timeout_minutes self.sessions = {} # session_id -> SessionInfo self.cleanup_callbacks = {} # session_id -> list of cleanup functions self.monitoring_thread = None self.monitoring_active = False self.lock = threading.Lock() # Resource monitoring self.resource_monitors = [] self.cleanup_history = [] # Start monitoring thread self.start_monitoring() logger.info(f"Session Timeout Manager initialized with {default_timeout_minutes}min default timeout") def register_session(self, session_id: str, session_type: str, session_object: Any = None, timeout_minutes: int = None, cleanup_callbacks: List[Callable] = None) -> bool: """ Register a session for timeout monitoring Args: session_id: Unique session identifier session_type: Type of session (ansys, file_processing, etc.) session_object: The actual session object (stored as weak reference) timeout_minutes: Custom timeout for this session cleanup_callbacks: List of cleanup functions to call on timeout Returns: True if successfully registered """ try: with self.lock: if session_id in self.sessions: logger.warning(f"Session {session_id} already registered, updating...") # Create session info session_info = SessionInfo( session_id=session_id, session_type=session_type, created_at=datetime.now(), last_activity=datetime.now(), status=SessionStatus.ACTIVE, timeout_minutes=timeout_minutes or self.default_timeout_minutes, resource_info={ 'session_object_ref': weakref.ref(session_object) if session_object else None, 'memory_usage': 0, 'cpu_usage': 0 }, cleanup_callbacks=cleanup_callbacks or [] ) self.sessions[session_id] = session_info logger.info(f"Session registered: {session_id} ({session_type}, timeout: {session_info.timeout_minutes}min)") return True except Exception as e: logger.error(f"Failed to register session {session_id}: {str(e)}") return False def update_session_activity(self, session_id: str) -> bool: """ Update session activity timestamp Args: session_id: Session identifier Returns: True if successfully updated """ try: with self.lock: if session_id in self.sessions: session_info = self.sessions[session_id] session_info.last_activity = datetime.now() # Reactivate session if it was idle if session_info.status == SessionStatus.IDLE: session_info.status = SessionStatus.ACTIVE logger.debug(f"Session {session_id} reactivated") return True else: logger.warning(f"Session {session_id} not found for activity update") return False except Exception as e: logger.error(f"Failed to update session activity for {session_id}: {str(e)}") return False def unregister_session(self, session_id: str, perform_cleanup: bool = True) -> bool: """ Unregister a session and optionally perform cleanup Args: session_id: Session identifier perform_cleanup: Whether to perform cleanup callbacks Returns: True if successfully unregistered """ try: with self.lock: if session_id not in self.sessions: logger.warning(f"Session {session_id} not found for unregistration") return False session_info = self.sessions[session_id] # Perform cleanup if requested if perform_cleanup: self._perform_session_cleanup(session_info) # Remove from sessions del self.sessions[session_id] logger.info(f"Session unregistered: {session_id}") return True except Exception as e: logger.error(f"Failed to unregister session {session_id}: {str(e)}") return False def start_monitoring(self): """Start the session monitoring thread""" try: if self.monitoring_active: logger.warning("Session monitoring already active") return self.monitoring_active = True self.monitoring_thread = threading.Thread( target=self._monitoring_loop, daemon=True, name="SessionTimeoutMonitor" ) self.monitoring_thread.start() logger.info("Session monitoring started") except Exception as e: logger.error(f"Failed to start session monitoring: {str(e)}") self.monitoring_active = False def stop_monitoring(self): """Stop the session monitoring thread""" try: self.monitoring_active = False if self.monitoring_thread and self.monitoring_thread.is_alive(): self.monitoring_thread.join(timeout=5) logger.info("Session monitoring stopped") except Exception as e: logger.error(f"Failed to stop session monitoring: {str(e)}") def _monitoring_loop(self): """Main monitoring loop""" try: logger.info("Session monitoring loop started") while self.monitoring_active: try: # Check for timeouts self._check_session_timeouts() # Update resource usage self._update_resource_usage() # Perform periodic cleanup self._perform_periodic_cleanup() # Sleep for monitoring interval time.sleep(30) # Check every 30 seconds except Exception as loop_error: logger.error(f"Error in monitoring loop: {str(loop_error)}") time.sleep(60) # Wait longer on error logger.info("Session monitoring loop ended") except Exception as e: logger.error(f"Session monitoring loop failed: {str(e)}") self.monitoring_active = False def _check_session_timeouts(self): """Check for session timeouts and handle them""" try: current_time = datetime.now() timed_out_sessions = [] with self.lock: for session_id, session_info in self.sessions.items(): # Skip already terminated sessions if session_info.status in [SessionStatus.TERMINATED, SessionStatus.ERROR]: continue # Calculate time since last activity time_since_activity = current_time - session_info.last_activity timeout_threshold = timedelta(minutes=session_info.timeout_minutes) if time_since_activity > timeout_threshold: # Session has timed out session_info.status = SessionStatus.TIMEOUT timed_out_sessions.append(session_id) logger.warning(f"Session timeout detected: {session_id} (inactive for {time_since_activity})") elif time_since_activity > timeout_threshold * 0.8: # Session is approaching timeout if session_info.status == SessionStatus.ACTIVE: session_info.status = SessionStatus.IDLE logger.info(f"Session marked as idle: {session_id}") # Handle timed out sessions outside the lock for session_id in timed_out_sessions: self._handle_session_timeout(session_id) except Exception as e: logger.error(f"Session timeout check failed: {str(e)}") def _handle_session_timeout(self, session_id: str): """Handle a session timeout""" try: with self.lock: if session_id not in self.sessions: return session_info = self.sessions[session_id] logger.warning(f"Handling timeout for session: {session_id} ({session_info.session_type})") # Perform cleanup cleanup_success = self._perform_session_cleanup(session_info) # Update session status session_info.status = SessionStatus.TERMINATED if cleanup_success else SessionStatus.ERROR # Record cleanup in history self.cleanup_history.append({ 'session_id': session_id, 'session_type': session_info.session_type, 'cleanup_time': datetime.now(), 'reason': 'timeout', 'success': cleanup_success }) # Keep only recent cleanup history if len(self.cleanup_history) > 100: self.cleanup_history = self.cleanup_history[-100:] except Exception as e: logger.error(f"Failed to handle timeout for session {session_id}: {str(e)}") def _perform_session_cleanup(self, session_info: SessionInfo) -> bool: """Perform cleanup for a session""" try: cleanup_success = True logger.info(f"Performing cleanup for session: {session_info.session_id}") # Call registered cleanup callbacks for cleanup_callback in session_info.cleanup_callbacks: try: cleanup_callback() logger.debug(f"Cleanup callback executed for session: {session_info.session_id}") except Exception as callback_error: logger.error(f"Cleanup callback failed for session {session_info.session_id}: {str(callback_error)}") cleanup_success = False # Cleanup session object if it still exists if session_info.resource_info.get('session_object_ref'): session_obj_ref = session_info.resource_info['session_object_ref'] session_obj = session_obj_ref() if session_obj: try: # Try to close/cleanup the session object if hasattr(session_obj, 'close'): session_obj.close() elif hasattr(session_obj, 'cleanup'): session_obj.cleanup() elif hasattr(session_obj, 'terminate'): session_obj.terminate() logger.debug(f"Session object cleaned up for: {session_info.session_id}") except Exception as obj_cleanup_error: logger.error(f"Session object cleanup failed for {session_info.session_id}: {str(obj_cleanup_error)}") cleanup_success = False # Perform ANSYS-specific cleanup if applicable if session_info.session_type.lower() == 'ansys': cleanup_success &= self._perform_ansys_cleanup(session_info) return cleanup_success except Exception as e: logger.error(f"Session cleanup failed for {session_info.session_id}: {str(e)}") return False def _perform_ansys_cleanup(self, session_info: SessionInfo) -> bool: """Perform ANSYS-specific cleanup""" try: logger.info(f"Performing ANSYS cleanup for session: {session_info.session_id}") # Try to terminate ANSYS processes try: import psutil # Look for ANSYS processes ansys_keywords = ['ansys', 'mechanical', 'mapdl', 'fluent'] terminated_processes = [] for proc in psutil.process_iter(['pid', 'name', 'cmdline']): try: proc_info = proc.info proc_name = proc_info['name'].lower() # Check if this is an ANSYS process if any(keyword in proc_name for keyword in ansys_keywords): # Additional check to avoid terminating unrelated processes cmdline = proc_info.get('cmdline', []) if cmdline and any('ansys' in arg.lower() for arg in cmdline): proc.terminate() terminated_processes.append(proc_info['pid']) logger.info(f"Terminated ANSYS process: {proc_info['name']} (PID: {proc_info['pid']})") except (psutil.NoSuchProcess, psutil.AccessDenied): continue # Wait for processes to terminate if terminated_processes: time.sleep(2) # Force kill if still running for pid in terminated_processes: try: proc = psutil.Process(pid) if proc.is_running(): proc.kill() logger.warning(f"Force killed ANSYS process: PID {pid}") except psutil.NoSuchProcess: pass # Process already terminated except ImportError: logger.warning("psutil not available for ANSYS process cleanup") except Exception as proc_error: logger.error(f"ANSYS process cleanup failed: {str(proc_error)}") return False # Clean up temporary files try: import tempfile import glob temp_dir = tempfile.gettempdir() ansys_temp_patterns = [ 'ansys_*', 'mechanical_*', '*.mechdb', '*.rst', '*.rth' ] cleaned_files = 0 for pattern in ansys_temp_patterns: temp_files = glob.glob(os.path.join(temp_dir, pattern)) for temp_file in temp_files: try: # Only remove files older than 1 hour to be safe file_age = time.time() - os.path.getmtime(temp_file) if file_age > 3600: # 1 hour os.remove(temp_file) cleaned_files += 1 except Exception: continue if cleaned_files > 0: logger.info(f"Cleaned up {cleaned_files} ANSYS temporary files") except Exception as file_cleanup_error: logger.warning(f"ANSYS file cleanup failed: {str(file_cleanup_error)}") return True except Exception as e: logger.error(f"ANSYS cleanup failed: {str(e)}") return False def _update_resource_usage(self): """Update resource usage for active sessions""" try: import psutil with self.lock: for session_info in self.sessions.values(): if session_info.status in [SessionStatus.ACTIVE, SessionStatus.IDLE]: # Update basic resource info session_info.resource_info['memory_usage'] = psutil.virtual_memory().percent session_info.resource_info['cpu_usage'] = psutil.cpu_percent(interval=None) except Exception as e: logger.debug(f"Resource usage update failed: {str(e)}") def _perform_periodic_cleanup(self): """Perform periodic cleanup tasks""" try: # Clean up terminated sessions older than 1 hour current_time = datetime.now() cleanup_threshold = timedelta(hours=1) sessions_to_remove = [] with self.lock: for session_id, session_info in self.sessions.items(): if (session_info.status == SessionStatus.TERMINATED and current_time - session_info.last_activity > cleanup_threshold): sessions_to_remove.append(session_id) # Remove old terminated sessions for session_id in sessions_to_remove: with self.lock: if session_id in self.sessions: del self.sessions[session_id] logger.debug(f"Removed old terminated session: {session_id}") except Exception as e: logger.debug(f"Periodic cleanup failed: {str(e)}") def get_session_status(self, session_id: str) -> Optional[Dict[str, Any]]: """ Get status information for a session Args: session_id: Session identifier Returns: Dictionary with session status or None if not found """ try: with self.lock: if session_id not in self.sessions: return None session_info = self.sessions[session_id] return { 'session_id': session_info.session_id, 'session_type': session_info.session_type, 'status': session_info.status.value, 'created_at': session_info.created_at.isoformat(), 'last_activity': session_info.last_activity.isoformat(), 'timeout_minutes': session_info.timeout_minutes, 'time_until_timeout': self._calculate_time_until_timeout(session_info), 'resource_info': session_info.resource_info.copy() } except Exception as e: logger.error(f"Failed to get session status for {session_id}: {str(e)}") return None def _calculate_time_until_timeout(self, session_info: SessionInfo) -> float: """Calculate time until session timeout in minutes""" try: if session_info.status in [SessionStatus.TERMINATED, SessionStatus.ERROR]: return 0.0 time_since_activity = datetime.now() - session_info.last_activity timeout_threshold = timedelta(minutes=session_info.timeout_minutes) remaining_time = timeout_threshold - time_since_activity return max(0.0, remaining_time.total_seconds() / 60.0) except Exception: return 0.0 def get_all_sessions_status(self) -> Dict[str, Any]: """ Get status of all sessions Returns: Dictionary with all sessions status """ try: with self.lock: sessions_status = {} for session_id, session_info in self.sessions.items(): sessions_status[session_id] = { 'session_type': session_info.session_type, 'status': session_info.status.value, 'created_at': session_info.created_at.isoformat(), 'last_activity': session_info.last_activity.isoformat(), 'timeout_minutes': session_info.timeout_minutes, 'time_until_timeout': self._calculate_time_until_timeout(session_info) } # Summary statistics status_counts = {} for session_info in self.sessions.values(): status = session_info.status.value status_counts[status] = status_counts.get(status, 0) + 1 return { 'sessions': sessions_status, 'summary': { 'total_sessions': len(self.sessions), 'status_counts': status_counts, 'monitoring_active': self.monitoring_active, 'cleanup_history_count': len(self.cleanup_history) } } except Exception as e: logger.error(f"Failed to get all sessions status: {str(e)}") return {'error': str(e)} def force_cleanup_session(self, session_id: str) -> bool: """ Force cleanup of a specific session Args: session_id: Session identifier Returns: True if cleanup was successful """ try: with self.lock: if session_id not in self.sessions: logger.warning(f"Session {session_id} not found for force cleanup") return False session_info = self.sessions[session_id] logger.info(f"Force cleanup requested for session: {session_id}") # Perform cleanup cleanup_success = self._perform_session_cleanup(session_info) # Update session status with self.lock: if session_id in self.sessions: session_info.status = SessionStatus.TERMINATED if cleanup_success else SessionStatus.ERROR return cleanup_success except Exception as e: logger.error(f"Force cleanup failed for session {session_id}: {str(e)}") return False def get_manager_info(self) -> Dict[str, Any]: """ Get information about the session manager Returns: Dictionary with manager information """ return { 'manager_type': 'SessionTimeoutManager', 'default_timeout_minutes': self.default_timeout_minutes, 'monitoring_active': self.monitoring_active, 'total_sessions': len(self.sessions), 'cleanup_history_count': len(self.cleanup_history), 'capabilities': [ 'session_registration', 'timeout_monitoring', 'automatic_cleanup', 'resource_monitoring', 'ansys_specific_cleanup', 'force_cleanup', 'session_status_tracking' ] } # Global session timeout manager instance session_timeout_manager = SessionTimeoutManager()