AnsysLink/backend/pymechanical/real_progress_tracker.py
2025-08-11 13:58:59 +08:00

605 lines
24 KiB
Python

"""
Real Progress Tracker for CAE Mesh Generator
This module provides real-time progress monitoring for ANSYS Mechanical operations
using PyMechanical API to track mesh generation and other operations.
"""
import logging
import time
import threading
from typing import Dict, Any, Optional, Callable, List
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class OperationStage(Enum):
"""ANSYS operation stages"""
INITIALIZING = "initializing"
GEOMETRY_IMPORT = "geometry_import"
MESH_SETUP = "mesh_setup"
MESH_GENERATION = "mesh_generation"
QUALITY_CHECK = "quality_check"
FILE_EXPORT = "file_export"
VISUALIZATION = "visualization"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class ProgressInfo:
"""Progress information container"""
stage: OperationStage = OperationStage.INITIALIZING
percentage: float = 0.0
message: str = ""
current_operation: str = ""
estimated_remaining_time: float = 0.0
started_at: datetime = None
last_updated: datetime = None
stage_start_time: datetime = None
detailed_info: Dict[str, Any] = None
def __post_init__(self):
if self.detailed_info is None:
self.detailed_info = {}
if self.started_at is None:
self.started_at = datetime.now()
if self.last_updated is None:
self.last_updated = datetime.now()
class RealProgressTracker:
"""
Real-time progress tracker for ANSYS Mechanical operations
This class monitors actual ANSYS operations and provides accurate
progress information including stage identification and time estimation.
"""
def __init__(self, mechanical_session):
"""
Initialize real progress tracker
Args:
mechanical_session: Active PyMechanical session
"""
if mechanical_session is None:
raise ValueError("Mechanical session is required for progress tracking")
self.mechanical = mechanical_session
self.current_progress = ProgressInfo()
self.progress_callbacks = []
self.is_tracking = False
self.tracking_thread = None
self.operation_history = []
# Initialize progress data analyzer
try:
from backend.pymechanical.progress_data_analyzer import ProgressDataAnalyzer
self.data_analyzer = ProgressDataAnalyzer()
except Exception as e:
logger.warning(f"Could not initialize progress data analyzer: {str(e)}")
self.data_analyzer = None
# Stage timing estimates (in seconds) based on typical operations
self.stage_estimates = {
OperationStage.INITIALIZING: 5,
OperationStage.GEOMETRY_IMPORT: 15,
OperationStage.MESH_SETUP: 10,
OperationStage.MESH_GENERATION: 120, # Most time-consuming
OperationStage.QUALITY_CHECK: 20,
OperationStage.FILE_EXPORT: 15,
OperationStage.VISUALIZATION: 10
}
# Stage progress weights for overall percentage calculation
self.stage_weights = {
OperationStage.INITIALIZING: 5,
OperationStage.GEOMETRY_IMPORT: 15,
OperationStage.MESH_SETUP: 10,
OperationStage.MESH_GENERATION: 50,
OperationStage.QUALITY_CHECK: 10,
OperationStage.FILE_EXPORT: 7,
OperationStage.VISUALIZATION: 3
}
logger.info("Real Progress Tracker initialized")
def add_progress_callback(self, callback: Callable[[ProgressInfo], None]):
"""
Add progress update callback
Args:
callback: Function to call when progress updates
"""
self.progress_callbacks.append(callback)
logger.debug(f"Progress callback added, total callbacks: {len(self.progress_callbacks)}")
def start_tracking(self, operation_name: str = "ANSYS Operation"):
"""
Start progress tracking
Args:
operation_name: Name of the operation being tracked
"""
try:
if self.is_tracking:
logger.warning("Progress tracking already active")
return
self.is_tracking = True
self.current_progress = ProgressInfo(
stage=OperationStage.INITIALIZING,
message=f"Starting {operation_name}...",
current_operation=operation_name,
started_at=datetime.now(),
stage_start_time=datetime.now()
)
# Start operation analysis if analyzer is available
if self.data_analyzer:
operation_context = {
'operation_name': operation_name,
'complexity': 'medium', # Default, can be enhanced
'start_time': datetime.now()
}
self.data_analyzer.start_operation_analysis('mesh_generation', operation_context)
# Start background tracking thread
self.tracking_thread = threading.Thread(
target=self._tracking_loop,
args=(operation_name,),
daemon=True
)
self.tracking_thread.start()
logger.info(f"Progress tracking started for: {operation_name}")
self._notify_callbacks()
except Exception as e:
logger.error(f"Failed to start progress tracking: {str(e)}")
self.is_tracking = False
def stop_tracking(self, success: bool = True, final_message: str = None):
"""
Stop progress tracking
Args:
success: Whether the operation completed successfully
final_message: Final status message
"""
try:
self.is_tracking = False
if success:
self.current_progress.stage = OperationStage.COMPLETED
self.current_progress.percentage = 100.0
self.current_progress.message = final_message or "Operation completed successfully"
else:
self.current_progress.stage = OperationStage.FAILED
self.current_progress.message = final_message or "Operation failed"
self.current_progress.last_updated = datetime.now()
self.current_progress.estimated_remaining_time = 0.0
# Complete operation analysis if analyzer is available
if self.data_analyzer:
try:
final_data = {
'final_stage': self.current_progress.stage.value,
'element_count': self.current_progress.detailed_info.get('element_count', 0),
'final_message': self.current_progress.message
}
self.data_analyzer.complete_operation_analysis(success, final_data)
except Exception as analyzer_error:
logger.warning(f"Error completing operation analysis: {str(analyzer_error)}")
# Add to history
operation_record = {
'operation': self.current_progress.current_operation,
'started_at': self.current_progress.started_at,
'completed_at': self.current_progress.last_updated,
'success': success,
'final_stage': self.current_progress.stage.value,
'total_time': (self.current_progress.last_updated - self.current_progress.started_at).total_seconds()
}
# Add detailed info if available
if self.current_progress.detailed_info:
operation_record['detailed_info'] = self.current_progress.detailed_info.copy()
self.operation_history.append(operation_record)
logger.info(f"Progress tracking stopped: {self.current_progress.message}")
self._notify_callbacks()
except Exception as e:
logger.error(f"Error stopping progress tracking: {str(e)}")
def update_stage(self, stage: OperationStage, message: str = None, stage_progress: float = 0.0):
"""
Update current operation stage
Args:
stage: New operation stage
message: Stage-specific message
stage_progress: Progress within current stage (0-100)
"""
try:
if not self.is_tracking:
return
# Update stage information
old_stage = self.current_progress.stage
self.current_progress.stage = stage
self.current_progress.message = message or f"Processing {stage.value.replace('_', ' ')}..."
self.current_progress.last_updated = datetime.now()
# Reset stage start time if stage changed
if old_stage != stage:
self.current_progress.stage_start_time = datetime.now()
logger.info(f"Stage changed: {old_stage.value} -> {stage.value}")
# Use data analyzer for enhanced progress calculation if available
if self.data_analyzer:
try:
# Update analyzer with current progress
operation_data = {
'element_count': self.current_progress.detailed_info.get('element_count', 0),
'mesh_status': self.current_progress.detailed_info.get('mesh_status', 'unknown')
}
progress_report = self.data_analyzer.update_operation_progress(
stage.value, stage_progress, operation_data
)
# Update progress info with analyzer results
self.current_progress.percentage = progress_report.overall_progress
self.current_progress.estimated_remaining_time = progress_report.estimated_remaining_time
# Add detailed analysis to progress info
self.current_progress.detailed_info.update({
'confidence_level': progress_report.confidence_level,
'operation_velocity': progress_report.operation_velocity,
'performance_metrics': progress_report.performance_metrics,
'recommendations': progress_report.recommendations
})
except Exception as analyzer_error:
logger.warning(f"Data analyzer error: {str(analyzer_error)}")
# Fallback to basic calculation
self.current_progress.percentage = self._calculate_overall_progress(stage, stage_progress)
self.current_progress.estimated_remaining_time = self._estimate_remaining_time(stage, stage_progress)
else:
# Basic calculation without analyzer
self.current_progress.percentage = self._calculate_overall_progress(stage, stage_progress)
self.current_progress.estimated_remaining_time = self._estimate_remaining_time(stage, stage_progress)
self._notify_callbacks()
except Exception as e:
logger.error(f"Error updating stage: {str(e)}")
def get_current_progress(self) -> ProgressInfo:
"""
Get current progress information
Returns:
Current ProgressInfo
"""
return self.current_progress
def _tracking_loop(self, operation_name: str):
"""
Background tracking loop that monitors ANSYS operations
Args:
operation_name: Name of the operation being tracked
"""
try:
logger.info(f"Starting tracking loop for: {operation_name}")
while self.is_tracking:
try:
# Monitor ANSYS status through PyMechanical
ansys_status = self._get_ansys_status()
if ansys_status:
self._process_ansys_status(ansys_status)
# Sleep for a short interval
time.sleep(2.0) # Check every 2 seconds
except Exception as loop_error:
logger.warning(f"Error in tracking loop: {str(loop_error)}")
time.sleep(5.0) # Wait longer on error
logger.info("Tracking loop ended")
except Exception as e:
logger.error(f"Tracking loop failed: {str(e)}")
self.is_tracking = False
def _get_ansys_status(self) -> Optional[Dict[str, Any]]:
"""
Get current ANSYS operation status
Returns:
Dictionary with ANSYS status information
"""
try:
# Query ANSYS for current operation status
status_script = '''
# Get ANSYS operation status
try:
import time
status_info = {
"timestamp": time.time(),
"is_busy": False,
"current_operation": "idle",
"mesh_status": "unknown",
"element_count": 0,
"node_count": 0,
"last_message": ""
}
# Check if mesh generation is in progress
try:
mesh = Model.Mesh
if mesh:
# Try to get mesh statistics
if hasattr(mesh, 'Elements') and mesh.Elements:
if hasattr(mesh.Elements, 'Count'):
status_info["element_count"] = mesh.Elements.Count
elif hasattr(mesh.Elements, '__len__'):
status_info["element_count"] = len(mesh.Elements)
if hasattr(mesh, 'Nodes') and mesh.Nodes:
if hasattr(mesh.Nodes, 'Count'):
status_info["node_count"] = mesh.Nodes.Count
elif hasattr(mesh.Nodes, '__len__'):
status_info["node_count"] = len(mesh.Nodes)
# Determine mesh status
if status_info["element_count"] > 0:
status_info["mesh_status"] = "generated"
status_info["current_operation"] = "mesh_complete"
else:
status_info["mesh_status"] = "not_generated"
status_info["current_operation"] = "mesh_pending"
except Exception as mesh_error:
status_info["last_message"] = "Error checking mesh: " + str(mesh_error)
# Check for active operations (this is simplified - real implementation would be more complex)
try:
# In a real implementation, you would check ANSYS internal status
# For now, we'll use basic heuristics
status_info["is_busy"] = False # Simplified
except Exception as busy_error:
status_info["last_message"] = "Error checking busy status: " + str(busy_error)
print("STATUS_INFO_START")
print("TIMESTAMP:" + str(status_info["timestamp"]))
print("IS_BUSY:" + str(status_info["is_busy"]))
print("CURRENT_OPERATION:" + str(status_info["current_operation"]))
print("MESH_STATUS:" + str(status_info["mesh_status"]))
print("ELEMENT_COUNT:" + str(status_info["element_count"]))
print("NODE_COUNT:" + str(status_info["node_count"]))
print("LAST_MESSAGE:" + str(status_info["last_message"]))
print("STATUS_INFO_END")
except Exception as e:
print("STATUS_ERROR:" + str(e))
'''
result = self.mechanical.run_python_script(status_script)
if result:
return self._parse_status_result(result)
return None
except Exception as e:
logger.warning(f"Failed to get ANSYS status: {str(e)}")
return None
def _parse_status_result(self, result: str) -> Optional[Dict[str, Any]]:
"""
Parse ANSYS status result from script output
Args:
result: Script output string
Returns:
Parsed status dictionary
"""
try:
status_info = {}
lines = str(result).split('\\n')
in_status_section = False
for line in lines:
if line.strip() == "STATUS_INFO_START":
in_status_section = True
continue
elif line.strip() == "STATUS_INFO_END":
break
elif in_status_section and ':' in line:
key, value = line.split(':', 1)
key = key.strip().lower()
value = value.strip()
# Convert values to appropriate types
if key in ['element_count', 'node_count']:
try:
status_info[key] = int(value)
except ValueError:
status_info[key] = 0
elif key == 'timestamp':
try:
status_info[key] = float(value)
except ValueError:
status_info[key] = time.time()
elif key == 'is_busy':
status_info[key] = value.lower() in ['true', '1', 'yes']
else:
status_info[key] = value
return status_info if status_info else None
except Exception as e:
logger.warning(f"Failed to parse status result: {str(e)}")
return None
def _process_ansys_status(self, status: Dict[str, Any]):
"""
Process ANSYS status and update progress accordingly
Args:
status: ANSYS status dictionary
"""
try:
current_op = status.get('current_operation', 'unknown')
element_count = status.get('element_count', 0)
mesh_status = status.get('mesh_status', 'unknown')
# Update detailed info
self.current_progress.detailed_info.update({
'ansys_status': status,
'element_count': element_count,
'mesh_status': mesh_status
})
# Determine stage based on ANSYS status
if current_op == 'mesh_complete' and element_count > 0:
if self.current_progress.stage in [OperationStage.MESH_GENERATION, OperationStage.MESH_SETUP]:
self.update_stage(
OperationStage.QUALITY_CHECK,
f"Mesh generated with {element_count} elements, checking quality...",
0.0
)
elif current_op == 'mesh_pending':
if self.current_progress.stage == OperationStage.INITIALIZING:
self.update_stage(
OperationStage.MESH_SETUP,
"Setting up mesh parameters...",
0.0
)
elif self.current_progress.stage == OperationStage.MESH_SETUP:
self.update_stage(
OperationStage.MESH_GENERATION,
"Generating mesh...",
0.0
)
except Exception as e:
logger.warning(f"Error processing ANSYS status: {str(e)}")
def _calculate_overall_progress(self, current_stage: OperationStage, stage_progress: float) -> float:
"""
Calculate overall progress percentage
Args:
current_stage: Current operation stage
stage_progress: Progress within current stage (0-100)
Returns:
Overall progress percentage (0-100)
"""
try:
# Get cumulative weight of completed stages
completed_weight = 0
for stage in OperationStage:
if stage == current_stage:
break
if stage in self.stage_weights:
completed_weight += self.stage_weights[stage]
# Add progress within current stage
current_stage_weight = self.stage_weights.get(current_stage, 0)
current_stage_progress = (stage_progress / 100.0) * current_stage_weight
# Calculate total weight
total_weight = sum(self.stage_weights.values())
# Calculate overall percentage
overall_progress = ((completed_weight + current_stage_progress) / total_weight) * 100.0
return min(100.0, max(0.0, overall_progress))
except Exception as e:
logger.warning(f"Error calculating overall progress: {str(e)}")
return self.current_progress.percentage # Return current value on error
def _estimate_remaining_time(self, current_stage: OperationStage, stage_progress: float) -> float:
"""
Estimate remaining time for operation
Args:
current_stage: Current operation stage
stage_progress: Progress within current stage (0-100)
Returns:
Estimated remaining time in seconds
"""
try:
remaining_time = 0.0
# Time remaining in current stage
stage_estimate = self.stage_estimates.get(current_stage, 30)
stage_remaining = stage_estimate * (1.0 - stage_progress / 100.0)
remaining_time += stage_remaining
# Time for remaining stages
stage_found = False
for stage in OperationStage:
if stage == current_stage:
stage_found = True
continue
if stage_found and stage in self.stage_estimates:
remaining_time += self.stage_estimates[stage]
return max(0.0, remaining_time)
except Exception as e:
logger.warning(f"Error estimating remaining time: {str(e)}")
return 0.0
def _notify_callbacks(self):
"""Notify all registered progress callbacks"""
try:
for callback in self.progress_callbacks:
try:
callback(self.current_progress)
except Exception as callback_error:
logger.warning(f"Progress callback error: {str(callback_error)}")
except Exception as e:
logger.warning(f"Error notifying callbacks: {str(e)}")
def get_operation_history(self) -> List[Dict[str, Any]]:
"""
Get history of tracked operations
Returns:
List of operation history records
"""
return self.operation_history.copy()
def get_tracker_info(self) -> Dict[str, Any]:
"""
Get information about the progress tracker
Returns:
Dictionary with tracker information
"""
return {
'tracker_type': 'RealProgressTracker',
'is_tracking': self.is_tracking,
'mechanical_session_active': self.mechanical is not None,
'callback_count': len(self.progress_callbacks),
'operation_history_count': len(self.operation_history),
'supported_stages': [stage.value for stage in OperationStage],
'stage_estimates': {stage.value: estimate for stage, estimate in self.stage_estimates.items()},
'stage_weights': {stage.value: weight for stage, weight in self.stage_weights.items()}
}