AnsysLink/backend/pymechanical/progress_data_analyzer.py
2025-08-11 13:58:59 +08:00

852 lines
34 KiB
Python

"""
Progress Data Analyzer for CAE Mesh Generator
This module provides advanced progress data analysis and reporting capabilities
for ANSYS Mechanical operations, including accurate progress calculation and
time estimation based on real operation patterns.
"""
import logging
import time
import statistics
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
@dataclass
class OperationPattern:
"""Pattern data for operation timing analysis"""
operation_type: str
stage: str
typical_duration: float
min_duration: float
max_duration: float
sample_count: int
last_updated: datetime
@dataclass
class ProgressReport:
"""Comprehensive progress report"""
current_stage: str
overall_progress: float
stage_progress: float
estimated_remaining_time: float
estimated_completion_time: datetime
confidence_level: float
operation_velocity: float # elements/second or similar
performance_metrics: Dict[str, Any]
historical_comparison: Dict[str, Any]
bottleneck_analysis: List[str]
recommendations: List[str]
class ProgressDataAnalyzer:
"""
Advanced progress data analyzer for ANSYS operations
This class analyzes ANSYS operation patterns, provides accurate progress
calculations, and generates intelligent time estimates based on historical
data and current performance metrics.
"""
def __init__(self):
"""Initialize progress data analyzer"""
self.operation_patterns = {}
self.current_operation_data = {}
self.historical_data = []
self.performance_baselines = {}
# Initialize default operation patterns based on typical ANSYS behavior
self._initialize_default_patterns()
logger.info("Progress Data Analyzer initialized")
def _initialize_default_patterns(self):
"""Initialize default operation timing patterns"""
try:
# Default patterns based on typical ANSYS Mechanical operations
default_patterns = {
'geometry_import': {
'small_model': {'duration': 10, 'variance': 5},
'medium_model': {'duration': 30, 'variance': 15},
'large_model': {'duration': 60, 'variance': 30}
},
'mesh_setup': {
'simple_mesh': {'duration': 15, 'variance': 8},
'complex_mesh': {'duration': 45, 'variance': 20},
'advanced_mesh': {'duration': 90, 'variance': 40}
},
'mesh_generation': {
'coarse_mesh': {'duration': 60, 'variance': 30},
'medium_mesh': {'duration': 180, 'variance': 60},
'fine_mesh': {'duration': 600, 'variance': 200},
'very_fine_mesh': {'duration': 1800, 'variance': 600}
},
'quality_check': {
'basic_check': {'duration': 20, 'variance': 10},
'detailed_check': {'duration': 60, 'variance': 25}
},
'file_export': {
'small_file': {'duration': 10, 'variance': 5},
'large_file': {'duration': 30, 'variance': 15}
}
}
for operation_type, patterns in default_patterns.items():
self.operation_patterns[operation_type] = {}
for pattern_name, timing in patterns.items():
self.operation_patterns[operation_type][pattern_name] = OperationPattern(
operation_type=operation_type,
stage=pattern_name,
typical_duration=timing['duration'],
min_duration=max(1, timing['duration'] - timing['variance']),
max_duration=timing['duration'] + timing['variance'],
sample_count=1, # Default pattern
last_updated=datetime.now()
)
logger.info("Default operation patterns initialized")
except Exception as e:
logger.error(f"Failed to initialize default patterns: {str(e)}")
def start_operation_analysis(self, operation_type: str, operation_context: Dict[str, Any]):
"""
Start analyzing a new operation
Args:
operation_type: Type of operation (mesh_generation, quality_check, etc.)
operation_context: Context information (model size, complexity, etc.)
"""
try:
self.current_operation_data = {
'operation_type': operation_type,
'context': operation_context,
'start_time': datetime.now(),
'stages': [],
'performance_data': {},
'progress_history': []
}
logger.info(f"Started operation analysis: {operation_type}")
except Exception as e:
logger.error(f"Failed to start operation analysis: {str(e)}")
def update_operation_progress(self, stage: str, stage_progress: float,
operation_data: Dict[str, Any] = None) -> ProgressReport:
"""
Update operation progress and generate comprehensive report
Args:
stage: Current operation stage
stage_progress: Progress within current stage (0-100)
operation_data: Additional operation data (element count, etc.)
Returns:
ProgressReport with detailed analysis
"""
try:
if not self.current_operation_data:
logger.warning("No active operation for progress update")
return self._create_default_report(stage, stage_progress)
# Update current operation data
current_time = datetime.now()
self.current_operation_data['last_update'] = current_time
# Record stage transition if changed
if not self.current_operation_data['stages'] or self.current_operation_data['stages'][-1]['stage'] != stage:
self.current_operation_data['stages'].append({
'stage': stage,
'start_time': current_time,
'progress_at_start': stage_progress
})
# Update performance data
if operation_data:
self.current_operation_data['performance_data'].update(operation_data)
# Record progress history
self.current_operation_data['progress_history'].append({
'timestamp': current_time,
'stage': stage,
'progress': stage_progress,
'data': operation_data or {}
})
# Generate comprehensive progress report
report = self._generate_progress_report(stage, stage_progress)
return report
except Exception as e:
logger.error(f"Failed to update operation progress: {str(e)}")
return self._create_default_report(stage, stage_progress)
def _generate_progress_report(self, current_stage: str, stage_progress: float) -> ProgressReport:
"""
Generate comprehensive progress report with analysis
Args:
current_stage: Current operation stage
stage_progress: Progress within current stage
Returns:
ProgressReport with detailed analysis
"""
try:
# Calculate overall progress
overall_progress = self._calculate_overall_progress(current_stage, stage_progress)
# Estimate remaining time
remaining_time, confidence = self._estimate_remaining_time(current_stage, stage_progress)
# Calculate completion time
completion_time = datetime.now() + timedelta(seconds=remaining_time)
# Analyze operation velocity
velocity = self._calculate_operation_velocity()
# Generate performance metrics
performance_metrics = self._analyze_performance_metrics()
# Compare with historical data
historical_comparison = self._compare_with_historical_data()
# Identify bottlenecks
bottlenecks = self._identify_bottlenecks()
# Generate recommendations
recommendations = self._generate_recommendations(current_stage, performance_metrics)
report = ProgressReport(
current_stage=current_stage,
overall_progress=overall_progress,
stage_progress=stage_progress,
estimated_remaining_time=remaining_time,
estimated_completion_time=completion_time,
confidence_level=confidence,
operation_velocity=velocity,
performance_metrics=performance_metrics,
historical_comparison=historical_comparison,
bottleneck_analysis=bottlenecks,
recommendations=recommendations
)
return report
except Exception as e:
logger.error(f"Failed to generate progress report: {str(e)}")
return self._create_default_report(current_stage, stage_progress)
def _calculate_overall_progress(self, current_stage: str, stage_progress: float) -> float:
"""
Calculate overall operation progress
Args:
current_stage: Current operation stage
stage_progress: Progress within current stage
Returns:
Overall progress percentage (0-100)
"""
try:
# Define stage weights based on typical operation flow
stage_weights = {
'initializing': 5,
'geometry_import': 15,
'mesh_setup': 10,
'mesh_generation': 50,
'quality_check': 10,
'file_export': 7,
'visualization': 3
}
# Calculate completed stages weight
completed_weight = 0
stage_order = list(stage_weights.keys())
try:
current_stage_index = stage_order.index(current_stage)
for i in range(current_stage_index):
completed_weight += stage_weights[stage_order[i]]
except ValueError:
# Stage not in predefined order, estimate based on name
if 'mesh' in current_stage.lower():
completed_weight = 30 # Assume past initial stages
elif 'quality' in current_stage.lower():
completed_weight = 80 # Assume past mesh generation
else:
completed_weight = 10 # Conservative estimate
# Add current stage progress
current_stage_weight = stage_weights.get(current_stage, 10)
current_stage_contribution = (stage_progress / 100.0) * current_stage_weight
# Calculate total weight
total_weight = sum(stage_weights.values())
# Calculate overall progress
overall_progress = ((completed_weight + current_stage_contribution) / total_weight) * 100.0
return min(100.0, max(0.0, overall_progress))
except Exception as e:
logger.warning(f"Error calculating overall progress: {str(e)}")
return stage_progress # Fallback to stage progress
def _estimate_remaining_time(self, current_stage: str, stage_progress: float) -> Tuple[float, float]:
"""
Estimate remaining time with confidence level
Args:
current_stage: Current operation stage
stage_progress: Progress within current stage
Returns:
Tuple of (remaining_time_seconds, confidence_level)
"""
try:
if not self.current_operation_data:
return 60.0, 0.3 # Default estimate with low confidence
# Get operation context for better estimation
context = self.current_operation_data.get('context', {})
operation_type = self.current_operation_data.get('operation_type', 'unknown')
# Estimate based on current stage and historical patterns
stage_remaining_time = self._estimate_stage_remaining_time(current_stage, stage_progress, context)
# Estimate time for remaining stages
remaining_stages_time = self._estimate_remaining_stages_time(current_stage, context)
total_remaining_time = stage_remaining_time + remaining_stages_time
# Calculate confidence based on data quality
confidence = self._calculate_time_estimate_confidence(current_stage, context)
return max(0.0, total_remaining_time), confidence
except Exception as e:
logger.warning(f"Error estimating remaining time: {str(e)}")
return 60.0, 0.3 # Default fallback
def _estimate_stage_remaining_time(self, stage: str, progress: float, context: Dict[str, Any]) -> float:
"""
Estimate remaining time for current stage
Args:
stage: Current stage name
progress: Current stage progress (0-100)
context: Operation context
Returns:
Estimated remaining time for current stage in seconds
"""
try:
# Get pattern for current stage
pattern = self._get_best_matching_pattern(stage, context)
if pattern:
# Calculate remaining time based on pattern and current progress
stage_total_time = pattern.typical_duration
elapsed_ratio = progress / 100.0
remaining_ratio = 1.0 - elapsed_ratio
return stage_total_time * remaining_ratio
else:
# Fallback estimation
default_times = {
'mesh_generation': 120,
'quality_check': 30,
'file_export': 15,
'visualization': 10
}
stage_time = default_times.get(stage, 30)
remaining_ratio = (100.0 - progress) / 100.0
return stage_time * remaining_ratio
except Exception as e:
logger.warning(f"Error estimating stage remaining time: {str(e)}")
return 30.0 # Default fallback
def _estimate_remaining_stages_time(self, current_stage: str, context: Dict[str, Any]) -> float:
"""
Estimate time for all remaining stages after current one
Args:
current_stage: Current stage name
context: Operation context
Returns:
Estimated time for remaining stages in seconds
"""
try:
# Define typical stage sequence and default times
stage_sequence = [
('initializing', 5),
('geometry_import', 15),
('mesh_setup', 10),
('mesh_generation', 120),
('quality_check', 30),
('file_export', 15),
('visualization', 10)
]
# Find current stage position
current_found = False
remaining_time = 0.0
for stage_name, default_time in stage_sequence:
if current_found:
# This is a remaining stage
pattern = self._get_best_matching_pattern(stage_name, context)
if pattern:
remaining_time += pattern.typical_duration
else:
remaining_time += default_time
elif stage_name == current_stage or current_stage in stage_name:
current_found = True
return remaining_time
except Exception as e:
logger.warning(f"Error estimating remaining stages time: {str(e)}")
return 60.0 # Default fallback
def _get_best_matching_pattern(self, stage: str, context: Dict[str, Any]) -> Optional[OperationPattern]:
"""
Get best matching operation pattern for given stage and context
Args:
stage: Stage name
context: Operation context
Returns:
Best matching OperationPattern or None
"""
try:
# Determine operation category
if 'mesh' in stage.lower():
operation_type = 'mesh_generation'
elif 'quality' in stage.lower():
operation_type = 'quality_check'
elif 'export' in stage.lower():
operation_type = 'file_export'
elif 'import' in stage.lower():
operation_type = 'geometry_import'
else:
return None
if operation_type not in self.operation_patterns:
return None
# Select best pattern based on context
patterns = self.operation_patterns[operation_type]
# Simple heuristic based on context
element_count = context.get('element_count', 0)
model_complexity = context.get('complexity', 'medium')
if operation_type == 'mesh_generation':
if element_count > 100000 or model_complexity == 'high':
return patterns.get('fine_mesh') or patterns.get('medium_mesh')
elif element_count > 50000 or model_complexity == 'medium':
return patterns.get('medium_mesh')
else:
return patterns.get('coarse_mesh')
else:
# Return first available pattern for other operations
return next(iter(patterns.values()), None)
except Exception as e:
logger.warning(f"Error getting best matching pattern: {str(e)}")
return None
def _calculate_time_estimate_confidence(self, stage: str, context: Dict[str, Any]) -> float:
"""
Calculate confidence level for time estimates
Args:
stage: Current stage
context: Operation context
Returns:
Confidence level (0.0 to 1.0)
"""
try:
confidence = 0.5 # Base confidence
# Increase confidence based on available data
if self.current_operation_data.get('progress_history'):
history_length = len(self.current_operation_data['progress_history'])
confidence += min(0.3, history_length * 0.05) # More history = higher confidence
# Increase confidence if we have matching patterns
pattern = self._get_best_matching_pattern(stage, context)
if pattern and pattern.sample_count > 1:
confidence += min(0.2, pattern.sample_count * 0.02)
# Decrease confidence for complex operations
if context.get('complexity') == 'high':
confidence -= 0.1
return max(0.1, min(1.0, confidence))
except Exception as e:
logger.warning(f"Error calculating confidence: {str(e)}")
return 0.5
def _calculate_operation_velocity(self) -> float:
"""
Calculate current operation velocity (elements/second or similar metric)
Returns:
Operation velocity
"""
try:
if not self.current_operation_data or not self.current_operation_data.get('progress_history'):
return 0.0
history = self.current_operation_data['progress_history']
if len(history) < 2:
return 0.0
# Calculate velocity based on progress over time
recent_entries = history[-5:] # Use last 5 entries
if len(recent_entries) >= 2:
time_diff = (recent_entries[-1]['timestamp'] - recent_entries[0]['timestamp']).total_seconds()
progress_diff = recent_entries[-1]['progress'] - recent_entries[0]['progress']
if time_diff > 0:
return progress_diff / time_diff # Progress units per second
return 0.0
except Exception as e:
logger.warning(f"Error calculating operation velocity: {str(e)}")
return 0.0
def _analyze_performance_metrics(self) -> Dict[str, Any]:
"""
Analyze current operation performance metrics
Returns:
Dictionary with performance analysis
"""
try:
metrics = {
'operation_efficiency': 'normal',
'resource_utilization': 'unknown',
'bottleneck_detected': False,
'performance_trend': 'stable'
}
if not self.current_operation_data:
return metrics
# Analyze progress velocity trend
velocity = self._calculate_operation_velocity()
if velocity > 0:
metrics['operation_efficiency'] = 'good' if velocity > 1.0 else 'normal'
metrics['performance_trend'] = 'improving' if velocity > 0.5 else 'stable'
# Check for performance issues
history = self.current_operation_data.get('progress_history', [])
if len(history) > 3:
recent_progress = [entry['progress'] for entry in history[-3:]]
if len(set(recent_progress)) == 1: # No progress change
metrics['bottleneck_detected'] = True
metrics['operation_efficiency'] = 'poor'
return metrics
except Exception as e:
logger.warning(f"Error analyzing performance metrics: {str(e)}")
return {'error': str(e)}
def _compare_with_historical_data(self) -> Dict[str, Any]:
"""
Compare current operation with historical data
Returns:
Dictionary with historical comparison
"""
try:
comparison = {
'faster_than_average': None,
'typical_performance': True,
'historical_data_available': len(self.historical_data) > 0
}
if not self.historical_data:
return comparison
# Simple comparison logic (can be enhanced)
current_duration = (datetime.now() - self.current_operation_data.get('start_time', datetime.now())).total_seconds()
similar_operations = [
op for op in self.historical_data
if op.get('operation_type') == self.current_operation_data.get('operation_type')
]
if similar_operations:
avg_duration = statistics.mean([op.get('total_duration', 0) for op in similar_operations])
comparison['faster_than_average'] = current_duration < avg_duration
comparison['typical_performance'] = abs(current_duration - avg_duration) < (avg_duration * 0.3)
return comparison
except Exception as e:
logger.warning(f"Error comparing with historical data: {str(e)}")
return {'error': str(e)}
def _identify_bottlenecks(self) -> List[str]:
"""
Identify potential bottlenecks in current operation
Returns:
List of identified bottlenecks
"""
try:
bottlenecks = []
if not self.current_operation_data:
return bottlenecks
# Check for stalled progress
history = self.current_operation_data.get('progress_history', [])
if len(history) > 3:
recent_progress = [entry['progress'] for entry in history[-3:]]
if len(set(recent_progress)) == 1:
bottlenecks.append("Progress appears stalled - no advancement in recent updates")
# Check for slow stages
stages = self.current_operation_data.get('stages', [])
for stage_info in stages:
stage_duration = (datetime.now() - stage_info['start_time']).total_seconds()
if stage_duration > 300: # More than 5 minutes
bottlenecks.append(f"Stage '{stage_info['stage']}' is taking longer than expected")
return bottlenecks
except Exception as e:
logger.warning(f"Error identifying bottlenecks: {str(e)}")
return []
def _generate_recommendations(self, current_stage: str, performance_metrics: Dict[str, Any]) -> List[str]:
"""
Generate recommendations based on current progress and performance
Args:
current_stage: Current operation stage
performance_metrics: Performance analysis results
Returns:
List of recommendations
"""
try:
recommendations = []
# Performance-based recommendations
if performance_metrics.get('bottleneck_detected'):
recommendations.append("Consider checking system resources - operation may be resource-constrained")
recommendations.append("Monitor ANSYS process for potential issues")
if performance_metrics.get('operation_efficiency') == 'poor':
recommendations.append("Operation is running slower than expected - consider optimizing mesh settings")
# Stage-specific recommendations
if 'mesh_generation' in current_stage.lower():
recommendations.append("Mesh generation in progress - avoid interrupting the process")
recommendations.append("Monitor memory usage during mesh generation")
elif 'quality' in current_stage.lower():
recommendations.append("Quality check in progress - results will be available soon")
# General recommendations
if not recommendations:
recommendations.append("Operation is progressing normally")
recommendations.append("Estimated completion time is based on current performance")
return recommendations
except Exception as e:
logger.warning(f"Error generating recommendations: {str(e)}")
return ["Unable to generate recommendations due to analysis error"]
def _create_default_report(self, stage: str, progress: float) -> ProgressReport:
"""
Create default progress report when analysis fails
Args:
stage: Current stage
progress: Current progress
Returns:
Default ProgressReport
"""
return ProgressReport(
current_stage=stage,
overall_progress=progress,
stage_progress=progress,
estimated_remaining_time=60.0,
estimated_completion_time=datetime.now() + timedelta(seconds=60),
confidence_level=0.3,
operation_velocity=0.0,
performance_metrics={'status': 'unknown'},
historical_comparison={'available': False},
bottleneck_analysis=[],
recommendations=["Limited analysis available - using default estimates"]
)
def complete_operation_analysis(self, success: bool, final_data: Dict[str, Any] = None):
"""
Complete current operation analysis and store results
Args:
success: Whether operation completed successfully
final_data: Final operation data
"""
try:
if not self.current_operation_data:
return
# Calculate total operation time
end_time = datetime.now()
total_duration = (end_time - self.current_operation_data['start_time']).total_seconds()
# Create historical record
historical_record = {
'operation_type': self.current_operation_data['operation_type'],
'context': self.current_operation_data['context'],
'start_time': self.current_operation_data['start_time'],
'end_time': end_time,
'total_duration': total_duration,
'success': success,
'stages': self.current_operation_data['stages'],
'final_data': final_data or {}
}
# Add to historical data
self.historical_data.append(historical_record)
# Update operation patterns based on this operation
self._update_operation_patterns(historical_record)
# Clear current operation data
self.current_operation_data = {}
logger.info(f"Operation analysis completed: {total_duration:.1f}s, Success: {success}")
except Exception as e:
logger.error(f"Error completing operation analysis: {str(e)}")
def _update_operation_patterns(self, historical_record: Dict[str, Any]):
"""
Update operation patterns based on completed operation
Args:
historical_record: Completed operation record
"""
try:
operation_type = historical_record['operation_type']
total_duration = historical_record['total_duration']
context = historical_record['context']
# Determine pattern category
pattern_key = self._determine_pattern_key(operation_type, context)
if operation_type not in self.operation_patterns:
self.operation_patterns[operation_type] = {}
if pattern_key in self.operation_patterns[operation_type]:
# Update existing pattern
pattern = self.operation_patterns[operation_type][pattern_key]
# Simple moving average update
old_weight = pattern.sample_count
new_weight = old_weight + 1
pattern.typical_duration = (
(pattern.typical_duration * old_weight + total_duration) / new_weight
)
pattern.min_duration = min(pattern.min_duration, total_duration)
pattern.max_duration = max(pattern.max_duration, total_duration)
pattern.sample_count = new_weight
pattern.last_updated = datetime.now()
else:
# Create new pattern
self.operation_patterns[operation_type][pattern_key] = OperationPattern(
operation_type=operation_type,
stage=pattern_key,
typical_duration=total_duration,
min_duration=total_duration,
max_duration=total_duration,
sample_count=1,
last_updated=datetime.now()
)
logger.debug(f"Updated operation pattern: {operation_type}/{pattern_key}")
except Exception as e:
logger.warning(f"Error updating operation patterns: {str(e)}")
def _determine_pattern_key(self, operation_type: str, context: Dict[str, Any]) -> str:
"""
Determine pattern key based on operation type and context
Args:
operation_type: Type of operation
context: Operation context
Returns:
Pattern key string
"""
try:
element_count = context.get('element_count', 0)
complexity = context.get('complexity', 'medium')
if operation_type == 'mesh_generation':
if element_count > 100000:
return 'fine_mesh'
elif element_count > 50000:
return 'medium_mesh'
else:
return 'coarse_mesh'
elif operation_type == 'quality_check':
return 'detailed_check' if complexity == 'high' else 'basic_check'
else:
return f"{complexity}_operation"
except Exception as e:
logger.warning(f"Error determining pattern key: {str(e)}")
return 'default'
def get_analyzer_info(self) -> Dict[str, Any]:
"""
Get information about the progress analyzer
Returns:
Dictionary with analyzer information
"""
return {
'analyzer_type': 'ProgressDataAnalyzer',
'operation_patterns_count': sum(len(patterns) for patterns in self.operation_patterns.values()),
'historical_operations_count': len(self.historical_data),
'current_operation_active': bool(self.current_operation_data),
'supported_operations': list(self.operation_patterns.keys()),
'analysis_capabilities': [
'progress_calculation',
'time_estimation',
'performance_analysis',
'bottleneck_detection',
'historical_comparison',
'recommendation_generation'
]
}