""" Real Progress Tracker for CAE Mesh Generator This module provides real-time progress monitoring for ANSYS Mechanical operations using PyMechanical API to track mesh generation and other operations. """ import logging import time import threading from typing import Dict, Any, Optional, Callable, List from datetime import datetime, timedelta from dataclasses import dataclass from enum import Enum logger = logging.getLogger(__name__) class OperationStage(Enum): """ANSYS operation stages""" INITIALIZING = "initializing" GEOMETRY_IMPORT = "geometry_import" MESH_SETUP = "mesh_setup" MESH_GENERATION = "mesh_generation" QUALITY_CHECK = "quality_check" FILE_EXPORT = "file_export" VISUALIZATION = "visualization" COMPLETED = "completed" FAILED = "failed" @dataclass class ProgressInfo: """Progress information container""" stage: OperationStage = OperationStage.INITIALIZING percentage: float = 0.0 message: str = "" current_operation: str = "" estimated_remaining_time: float = 0.0 started_at: datetime = None last_updated: datetime = None stage_start_time: datetime = None detailed_info: Dict[str, Any] = None def __post_init__(self): if self.detailed_info is None: self.detailed_info = {} if self.started_at is None: self.started_at = datetime.now() if self.last_updated is None: self.last_updated = datetime.now() class RealProgressTracker: """ Real-time progress tracker for ANSYS Mechanical operations This class monitors actual ANSYS operations and provides accurate progress information including stage identification and time estimation. """ def __init__(self, mechanical_session): """ Initialize real progress tracker Args: mechanical_session: Active PyMechanical session """ if mechanical_session is None: raise ValueError("Mechanical session is required for progress tracking") self.mechanical = mechanical_session self.current_progress = ProgressInfo() self.progress_callbacks = [] self.is_tracking = False self.tracking_thread = None self.operation_history = [] # Initialize progress data analyzer try: from backend.pymechanical.progress_data_analyzer import ProgressDataAnalyzer self.data_analyzer = ProgressDataAnalyzer() except Exception as e: logger.warning(f"Could not initialize progress data analyzer: {str(e)}") self.data_analyzer = None # Stage timing estimates (in seconds) based on typical operations self.stage_estimates = { OperationStage.INITIALIZING: 5, OperationStage.GEOMETRY_IMPORT: 15, OperationStage.MESH_SETUP: 10, OperationStage.MESH_GENERATION: 120, # Most time-consuming OperationStage.QUALITY_CHECK: 20, OperationStage.FILE_EXPORT: 15, OperationStage.VISUALIZATION: 10 } # Stage progress weights for overall percentage calculation self.stage_weights = { OperationStage.INITIALIZING: 5, OperationStage.GEOMETRY_IMPORT: 15, OperationStage.MESH_SETUP: 10, OperationStage.MESH_GENERATION: 50, OperationStage.QUALITY_CHECK: 10, OperationStage.FILE_EXPORT: 7, OperationStage.VISUALIZATION: 3 } logger.info("Real Progress Tracker initialized") def add_progress_callback(self, callback: Callable[[ProgressInfo], None]): """ Add progress update callback Args: callback: Function to call when progress updates """ self.progress_callbacks.append(callback) logger.debug(f"Progress callback added, total callbacks: {len(self.progress_callbacks)}") def start_tracking(self, operation_name: str = "ANSYS Operation"): """ Start progress tracking Args: operation_name: Name of the operation being tracked """ try: if self.is_tracking: logger.warning("Progress tracking already active") return self.is_tracking = True self.current_progress = ProgressInfo( stage=OperationStage.INITIALIZING, message=f"Starting {operation_name}...", current_operation=operation_name, started_at=datetime.now(), stage_start_time=datetime.now() ) # Start operation analysis if analyzer is available if self.data_analyzer: operation_context = { 'operation_name': operation_name, 'complexity': 'medium', # Default, can be enhanced 'start_time': datetime.now() } self.data_analyzer.start_operation_analysis('mesh_generation', operation_context) # Start background tracking thread self.tracking_thread = threading.Thread( target=self._tracking_loop, args=(operation_name,), daemon=True ) self.tracking_thread.start() logger.info(f"Progress tracking started for: {operation_name}") self._notify_callbacks() except Exception as e: logger.error(f"Failed to start progress tracking: {str(e)}") self.is_tracking = False def stop_tracking(self, success: bool = True, final_message: str = None): """ Stop progress tracking Args: success: Whether the operation completed successfully final_message: Final status message """ try: self.is_tracking = False if success: self.current_progress.stage = OperationStage.COMPLETED self.current_progress.percentage = 100.0 self.current_progress.message = final_message or "Operation completed successfully" else: self.current_progress.stage = OperationStage.FAILED self.current_progress.message = final_message or "Operation failed" self.current_progress.last_updated = datetime.now() self.current_progress.estimated_remaining_time = 0.0 # Complete operation analysis if analyzer is available if self.data_analyzer: try: final_data = { 'final_stage': self.current_progress.stage.value, 'element_count': self.current_progress.detailed_info.get('element_count', 0), 'final_message': self.current_progress.message } self.data_analyzer.complete_operation_analysis(success, final_data) except Exception as analyzer_error: logger.warning(f"Error completing operation analysis: {str(analyzer_error)}") # Add to history operation_record = { 'operation': self.current_progress.current_operation, 'started_at': self.current_progress.started_at, 'completed_at': self.current_progress.last_updated, 'success': success, 'final_stage': self.current_progress.stage.value, 'total_time': (self.current_progress.last_updated - self.current_progress.started_at).total_seconds() } # Add detailed info if available if self.current_progress.detailed_info: operation_record['detailed_info'] = self.current_progress.detailed_info.copy() self.operation_history.append(operation_record) logger.info(f"Progress tracking stopped: {self.current_progress.message}") self._notify_callbacks() except Exception as e: logger.error(f"Error stopping progress tracking: {str(e)}") def update_stage(self, stage: OperationStage, message: str = None, stage_progress: float = 0.0): """ Update current operation stage Args: stage: New operation stage message: Stage-specific message stage_progress: Progress within current stage (0-100) """ try: if not self.is_tracking: return # Update stage information old_stage = self.current_progress.stage self.current_progress.stage = stage self.current_progress.message = message or f"Processing {stage.value.replace('_', ' ')}..." self.current_progress.last_updated = datetime.now() # Reset stage start time if stage changed if old_stage != stage: self.current_progress.stage_start_time = datetime.now() logger.info(f"Stage changed: {old_stage.value} -> {stage.value}") # Use data analyzer for enhanced progress calculation if available if self.data_analyzer: try: # Update analyzer with current progress operation_data = { 'element_count': self.current_progress.detailed_info.get('element_count', 0), 'mesh_status': self.current_progress.detailed_info.get('mesh_status', 'unknown') } progress_report = self.data_analyzer.update_operation_progress( stage.value, stage_progress, operation_data ) # Update progress info with analyzer results self.current_progress.percentage = progress_report.overall_progress self.current_progress.estimated_remaining_time = progress_report.estimated_remaining_time # Add detailed analysis to progress info self.current_progress.detailed_info.update({ 'confidence_level': progress_report.confidence_level, 'operation_velocity': progress_report.operation_velocity, 'performance_metrics': progress_report.performance_metrics, 'recommendations': progress_report.recommendations }) except Exception as analyzer_error: logger.warning(f"Data analyzer error: {str(analyzer_error)}") # Fallback to basic calculation self.current_progress.percentage = self._calculate_overall_progress(stage, stage_progress) self.current_progress.estimated_remaining_time = self._estimate_remaining_time(stage, stage_progress) else: # Basic calculation without analyzer self.current_progress.percentage = self._calculate_overall_progress(stage, stage_progress) self.current_progress.estimated_remaining_time = self._estimate_remaining_time(stage, stage_progress) self._notify_callbacks() except Exception as e: logger.error(f"Error updating stage: {str(e)}") def get_current_progress(self) -> ProgressInfo: """ Get current progress information Returns: Current ProgressInfo """ return self.current_progress def _tracking_loop(self, operation_name: str): """ Background tracking loop that monitors ANSYS operations Args: operation_name: Name of the operation being tracked """ try: logger.info(f"Starting tracking loop for: {operation_name}") while self.is_tracking: try: # Monitor ANSYS status through PyMechanical ansys_status = self._get_ansys_status() if ansys_status: self._process_ansys_status(ansys_status) # Sleep for a short interval time.sleep(2.0) # Check every 2 seconds except Exception as loop_error: logger.warning(f"Error in tracking loop: {str(loop_error)}") time.sleep(5.0) # Wait longer on error logger.info("Tracking loop ended") except Exception as e: logger.error(f"Tracking loop failed: {str(e)}") self.is_tracking = False def _get_ansys_status(self) -> Optional[Dict[str, Any]]: """ Get current ANSYS operation status Returns: Dictionary with ANSYS status information """ try: # Query ANSYS for current operation status status_script = ''' # Get ANSYS operation status try: import time status_info = { "timestamp": time.time(), "is_busy": False, "current_operation": "idle", "mesh_status": "unknown", "element_count": 0, "node_count": 0, "last_message": "" } # Check if mesh generation is in progress try: mesh = Model.Mesh if mesh: # Try to get mesh statistics if hasattr(mesh, 'Elements') and mesh.Elements: if hasattr(mesh.Elements, 'Count'): status_info["element_count"] = mesh.Elements.Count elif hasattr(mesh.Elements, '__len__'): status_info["element_count"] = len(mesh.Elements) if hasattr(mesh, 'Nodes') and mesh.Nodes: if hasattr(mesh.Nodes, 'Count'): status_info["node_count"] = mesh.Nodes.Count elif hasattr(mesh.Nodes, '__len__'): status_info["node_count"] = len(mesh.Nodes) # Determine mesh status if status_info["element_count"] > 0: status_info["mesh_status"] = "generated" status_info["current_operation"] = "mesh_complete" else: status_info["mesh_status"] = "not_generated" status_info["current_operation"] = "mesh_pending" except Exception as mesh_error: status_info["last_message"] = "Error checking mesh: " + str(mesh_error) # Check for active operations (this is simplified - real implementation would be more complex) try: # In a real implementation, you would check ANSYS internal status # For now, we'll use basic heuristics status_info["is_busy"] = False # Simplified except Exception as busy_error: status_info["last_message"] = "Error checking busy status: " + str(busy_error) print("STATUS_INFO_START") print("TIMESTAMP:" + str(status_info["timestamp"])) print("IS_BUSY:" + str(status_info["is_busy"])) print("CURRENT_OPERATION:" + str(status_info["current_operation"])) print("MESH_STATUS:" + str(status_info["mesh_status"])) print("ELEMENT_COUNT:" + str(status_info["element_count"])) print("NODE_COUNT:" + str(status_info["node_count"])) print("LAST_MESSAGE:" + str(status_info["last_message"])) print("STATUS_INFO_END") except Exception as e: print("STATUS_ERROR:" + str(e)) ''' result = self.mechanical.run_python_script(status_script) if result: return self._parse_status_result(result) return None except Exception as e: logger.warning(f"Failed to get ANSYS status: {str(e)}") return None def _parse_status_result(self, result: str) -> Optional[Dict[str, Any]]: """ Parse ANSYS status result from script output Args: result: Script output string Returns: Parsed status dictionary """ try: status_info = {} lines = str(result).split('\\n') in_status_section = False for line in lines: if line.strip() == "STATUS_INFO_START": in_status_section = True continue elif line.strip() == "STATUS_INFO_END": break elif in_status_section and ':' in line: key, value = line.split(':', 1) key = key.strip().lower() value = value.strip() # Convert values to appropriate types if key in ['element_count', 'node_count']: try: status_info[key] = int(value) except ValueError: status_info[key] = 0 elif key == 'timestamp': try: status_info[key] = float(value) except ValueError: status_info[key] = time.time() elif key == 'is_busy': status_info[key] = value.lower() in ['true', '1', 'yes'] else: status_info[key] = value return status_info if status_info else None except Exception as e: logger.warning(f"Failed to parse status result: {str(e)}") return None def _process_ansys_status(self, status: Dict[str, Any]): """ Process ANSYS status and update progress accordingly Args: status: ANSYS status dictionary """ try: current_op = status.get('current_operation', 'unknown') element_count = status.get('element_count', 0) mesh_status = status.get('mesh_status', 'unknown') # Update detailed info self.current_progress.detailed_info.update({ 'ansys_status': status, 'element_count': element_count, 'mesh_status': mesh_status }) # Determine stage based on ANSYS status if current_op == 'mesh_complete' and element_count > 0: if self.current_progress.stage in [OperationStage.MESH_GENERATION, OperationStage.MESH_SETUP]: self.update_stage( OperationStage.QUALITY_CHECK, f"Mesh generated with {element_count} elements, checking quality...", 0.0 ) elif current_op == 'mesh_pending': if self.current_progress.stage == OperationStage.INITIALIZING: self.update_stage( OperationStage.MESH_SETUP, "Setting up mesh parameters...", 0.0 ) elif self.current_progress.stage == OperationStage.MESH_SETUP: self.update_stage( OperationStage.MESH_GENERATION, "Generating mesh...", 0.0 ) except Exception as e: logger.warning(f"Error processing ANSYS status: {str(e)}") def _calculate_overall_progress(self, current_stage: OperationStage, stage_progress: float) -> float: """ Calculate overall progress percentage Args: current_stage: Current operation stage stage_progress: Progress within current stage (0-100) Returns: Overall progress percentage (0-100) """ try: # Get cumulative weight of completed stages completed_weight = 0 for stage in OperationStage: if stage == current_stage: break if stage in self.stage_weights: completed_weight += self.stage_weights[stage] # Add progress within current stage current_stage_weight = self.stage_weights.get(current_stage, 0) current_stage_progress = (stage_progress / 100.0) * current_stage_weight # Calculate total weight total_weight = sum(self.stage_weights.values()) # Calculate overall percentage overall_progress = ((completed_weight + current_stage_progress) / total_weight) * 100.0 return min(100.0, max(0.0, overall_progress)) except Exception as e: logger.warning(f"Error calculating overall progress: {str(e)}") return self.current_progress.percentage # Return current value on error def _estimate_remaining_time(self, current_stage: OperationStage, stage_progress: float) -> float: """ Estimate remaining time for operation Args: current_stage: Current operation stage stage_progress: Progress within current stage (0-100) Returns: Estimated remaining time in seconds """ try: remaining_time = 0.0 # Time remaining in current stage stage_estimate = self.stage_estimates.get(current_stage, 30) stage_remaining = stage_estimate * (1.0 - stage_progress / 100.0) remaining_time += stage_remaining # Time for remaining stages stage_found = False for stage in OperationStage: if stage == current_stage: stage_found = True continue if stage_found and stage in self.stage_estimates: remaining_time += self.stage_estimates[stage] return max(0.0, remaining_time) except Exception as e: logger.warning(f"Error estimating remaining time: {str(e)}") return 0.0 def _notify_callbacks(self): """Notify all registered progress callbacks""" try: for callback in self.progress_callbacks: try: callback(self.current_progress) except Exception as callback_error: logger.warning(f"Progress callback error: {str(callback_error)}") except Exception as e: logger.warning(f"Error notifying callbacks: {str(e)}") def get_operation_history(self) -> List[Dict[str, Any]]: """ Get history of tracked operations Returns: List of operation history records """ return self.operation_history.copy() def get_tracker_info(self) -> Dict[str, Any]: """ Get information about the progress tracker Returns: Dictionary with tracker information """ return { 'tracker_type': 'RealProgressTracker', 'is_tracking': self.is_tracking, 'mechanical_session_active': self.mechanical is not None, 'callback_count': len(self.progress_callbacks), 'operation_history_count': len(self.operation_history), 'supported_stages': [stage.value for stage in OperationStage], 'stage_estimates': {stage.value: estimate for stage, estimate in self.stage_estimates.items()}, 'stage_weights': {stage.value: weight for stage, weight in self.stage_weights.items()} }