462 lines
18 KiB
Python
462 lines
18 KiB
Python
"""
|
|
Error Reporter for CAE Mesh Generator
|
|
|
|
This module provides error reporting and management capabilities,
|
|
including error collection, analysis, and reporting functionality.
|
|
"""
|
|
import logging
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Any, List, Optional
|
|
from pathlib import Path
|
|
import threading
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ErrorReporter:
    """
    Centralized error reporter for collecting and managing errors.

    This class provides functionality to collect, store, and analyze
    errors from various components of the CAE Mesh Generator.  Errors are
    kept in memory for the current session (``session_errors``) and
    mirrored to a JSON log file (``<log_directory>/errors.json``) so they
    survive across sessions.

    Thread-safety: mutations of ``session_errors`` and writes triggered by
    ``report_error``/``mark_error_resolved`` are serialized via ``self.lock``.
    """

    # Cap on the number of records kept in the persistent JSON log file.
    MAX_PERSISTED_ERRORS = 1000

    def __init__(self, log_directory: str = "logs"):
        """
        Initialize error reporter.

        Args:
            log_directory: Directory to store error logs (created if missing).
        """
        self.log_directory = Path(log_directory)
        # parents=True: a nested path such as "var/run/logs" previously
        # failed because intermediate directories were not created.
        self.log_directory.mkdir(parents=True, exist_ok=True)

        self.error_log_file = self.log_directory / "errors.json"
        self.session_errors: List[Dict[str, Any]] = []
        self.lock = threading.Lock()

        # ANSYS-specific diagnosis is optional; degrade gracefully when the
        # handler package is not installed.
        try:
            from backend.pymechanical.ansys_error_handler import ANSYSErrorHandler
            self.ansys_error_handler = ANSYSErrorHandler()
        except ImportError:
            logger.warning("ANSYS error handler not available")
            self.ansys_error_handler = None

        logger.info(f"Error Reporter initialized with log directory: {self.log_directory}")

    def report_error(self, error_type: str, error_message: str,
                     context: Optional[Dict[str, Any]] = None, severity: str = "medium") -> str:
        """
        Report an error to the error management system.

        Args:
            error_type: Type of error (ansys, file_io, validation, etc.)
            error_message: Error message
            context: Additional context information
            severity: Error severity (critical, high, medium, low)

        Returns:
            Error ID for tracking, or the sentinel string
            "error_reporting_failed" if reporting itself failed.
        """
        try:
            # Microsecond component keeps IDs unique within one second.
            error_id = f"error_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"

            error_record = {
                'error_id': error_id,
                'timestamp': datetime.now().isoformat(),
                'error_type': error_type,
                'error_message': error_message,
                'severity': severity,
                'context': context or {},
                'resolved': False,
                'diagnosis': None
            }

            # Add ANSYS-specific analysis if applicable (best effort).
            if error_type.lower() == 'ansys' and self.ansys_error_handler:
                error_record['diagnosis'] = self._diagnose_ansys_error(error_message, context)

            # Store error in session and persistent storage.
            with self.lock:
                self.session_errors.append(error_record)
                self._persist_error(error_record)

            logger.info(f"Error reported: {error_id} ({error_type}, {severity})")
            return error_id

        except Exception as e:
            logger.error(f"Failed to report error: {str(e)}")
            return "error_reporting_failed"

    def _diagnose_ansys_error(self, error_message: str,
                              context: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """
        Run the optional ANSYS analyzer on an error message.

        Returns a JSON-serializable diagnosis dict, or None when analysis
        is unavailable or fails (failure is logged, never raised).
        """
        try:
            from backend.pymechanical.ansys_error_handler import ErrorContext

            ctx = context or {}
            ansys_context = ErrorContext(
                operation_type=ctx.get('operation', 'unknown'),
                file_path=ctx.get('file_path'),
                system_info=ctx.get('system_info')
            )

            diagnosis = self.ansys_error_handler.analyze_error(error_message, ansys_context)
            return {
                'error_id': diagnosis.error_id,
                'category': diagnosis.category.value,
                'severity': diagnosis.severity.value,
                'title': diagnosis.title,
                'description': diagnosis.description,
                'root_cause': diagnosis.root_cause,
                'immediate_solutions': diagnosis.immediate_solutions,
                'preventive_measures': diagnosis.preventive_measures,
                'recovery_possible': diagnosis.recovery_possible,
                'estimated_fix_time': diagnosis.estimated_fix_time,
                'confidence_level': diagnosis.confidence_level
            }
        except Exception as analysis_error:
            logger.warning(f"Error analysis failed: {str(analysis_error)}")
            return None

    @staticmethod
    def _truncate(message: str, limit: int = 100) -> str:
        """Truncate long messages for summary display (appends '...')."""
        return message[:limit] + '...' if len(message) > limit else message

    def get_error_summary(self, hours: int = 24) -> Dict[str, Any]:
        """
        Get error summary for specified time period.

        Args:
            hours: Number of hours to look back

        Returns:
            Dictionary with aggregate counts, the most recent error,
            critical errors and recommendations; ``{'error': msg}`` on
            failure.
        """
        try:
            cutoff_time = datetime.now() - timedelta(hours=hours)

            # Filter recent errors from the in-memory session log.
            recent_errors = []
            with self.lock:
                for error in self.session_errors:
                    error_time = datetime.fromisoformat(error['timestamp'])
                    if error_time >= cutoff_time:
                        recent_errors.append(error)

            # Load additional errors from persistent storage if needed.
            persistent_errors = self._load_recent_errors(hours)

            # Combine and deduplicate.  Set membership replaces the previous
            # quadratic list scan per persistent record.
            seen_ids = {r['error_id'] for r in recent_errors}
            all_errors = recent_errors + [
                e for e in persistent_errors if e['error_id'] not in seen_ids
            ]

            summary = {
                'total_errors': len(all_errors),
                'time_period_hours': hours,
                'error_types': {},
                'severities': {},
                'resolved_count': 0,
                'unresolved_count': 0,
                'most_recent': None,
                'critical_errors': [],
                'recommendations': []
            }

            if all_errors:
                # Count by type and severity.
                for error in all_errors:
                    error_type = error['error_type']
                    severity = error['severity']

                    summary['error_types'][error_type] = summary['error_types'].get(error_type, 0) + 1
                    summary['severities'][severity] = summary['severities'].get(severity, 0) + 1

                    if error['resolved']:
                        summary['resolved_count'] += 1
                    else:
                        summary['unresolved_count'] += 1

                    if severity == 'critical':
                        summary['critical_errors'].append({
                            'error_id': error['error_id'],
                            'message': self._truncate(error['error_message']),
                            'timestamp': error['timestamp']
                        })

                # Most recent error.  ISO-8601 timestamps compare correctly
                # as strings, so no parsing is needed here.
                most_recent = max(all_errors, key=lambda x: x['timestamp'])
                summary['most_recent'] = {
                    'error_id': most_recent['error_id'],
                    'type': most_recent['error_type'],
                    'message': self._truncate(most_recent['error_message']),
                    'timestamp': most_recent['timestamp']
                }

                # Generate recommendations.
                summary['recommendations'] = self._generate_error_recommendations(all_errors)

            return summary

        except Exception as e:
            logger.error(f"Failed to generate error summary: {str(e)}")
            return {'error': str(e)}

    def get_error_details(self, error_id: str) -> Optional[Dict[str, Any]]:
        """
        Get detailed information about a specific error.

        Args:
            error_id: Error ID to retrieve

        Returns:
            Error details or None if not found
        """
        try:
            # Search in session errors first (copy so callers cannot mutate
            # the live session record).
            with self.lock:
                for error in self.session_errors:
                    if error['error_id'] == error_id:
                        return error.copy()

            # Fall back to persistent storage.
            persistent_error = self._load_error_by_id(error_id)
            if persistent_error:
                return persistent_error

            return None

        except Exception as e:
            logger.error(f"Failed to get error details for {error_id}: {str(e)}")
            return None

    def mark_error_resolved(self, error_id: str, resolution_notes: Optional[str] = None) -> bool:
        """
        Mark an error as resolved.

        Args:
            error_id: Error ID to mark as resolved
            resolution_notes: Optional notes about the resolution

        Returns:
            True if successfully marked as resolved
        """
        try:
            # Prefer the in-memory session record; keep storage in sync.
            with self.lock:
                for error in self.session_errors:
                    if error['error_id'] == error_id:
                        error['resolved'] = True
                        error['resolved_at'] = datetime.now().isoformat()
                        if resolution_notes:
                            error['resolution_notes'] = resolution_notes

                        # Update persistent storage.
                        self._update_error_in_storage(error)

                        logger.info(f"Error {error_id} marked as resolved")
                        return True

            # Not in session: try to load and update from storage (errors
            # reported by an earlier session).
            persistent_error = self._load_error_by_id(error_id)
            if persistent_error:
                persistent_error['resolved'] = True
                persistent_error['resolved_at'] = datetime.now().isoformat()
                if resolution_notes:
                    persistent_error['resolution_notes'] = resolution_notes

                self._update_error_in_storage(persistent_error)
                logger.info(f"Error {error_id} marked as resolved in storage")
                return True

            logger.warning(f"Error {error_id} not found")
            return False

        except Exception as e:
            logger.error(f"Failed to mark error {error_id} as resolved: {str(e)}")
            return False

    def _persist_error(self, error_record: Dict[str, Any]):
        """Append one error record to the persistent JSON log (best effort)."""
        try:
            # Load existing errors; a corrupted file starts a fresh log
            # rather than aborting the report.
            existing_errors = []
            if self.error_log_file.exists():
                try:
                    with open(self.error_log_file, 'r', encoding='utf-8') as f:
                        existing_errors = json.load(f)
                except json.JSONDecodeError:
                    logger.warning("Error log file corrupted, starting fresh")
                    existing_errors = []

            existing_errors.append(error_record)

            # Bound the file size: keep only the newest records.
            if len(existing_errors) > self.MAX_PERSISTED_ERRORS:
                existing_errors = existing_errors[-self.MAX_PERSISTED_ERRORS:]

            with open(self.error_log_file, 'w', encoding='utf-8') as f:
                json.dump(existing_errors, f, indent=2)

        except Exception as e:
            logger.error(f"Failed to persist error: {str(e)}")

    def _load_recent_errors(self, hours: int) -> List[Dict[str, Any]]:
        """Load errors newer than the cutoff from persistent storage."""
        try:
            if not self.error_log_file.exists():
                return []

            with open(self.error_log_file, 'r', encoding='utf-8') as f:
                all_errors = json.load(f)

            cutoff_time = datetime.now() - timedelta(hours=hours)
            recent_errors = []

            for error in all_errors:
                try:
                    error_time = datetime.fromisoformat(error['timestamp'])
                    if error_time >= cutoff_time:
                        recent_errors.append(error)
                except ValueError:
                    continue  # Skip errors with invalid timestamps

            return recent_errors

        except Exception as e:
            logger.error(f"Failed to load recent errors: {str(e)}")
            return []

    def _load_error_by_id(self, error_id: str) -> Optional[Dict[str, Any]]:
        """Load a specific error by ID from persistent storage, or None."""
        try:
            if not self.error_log_file.exists():
                return None

            with open(self.error_log_file, 'r', encoding='utf-8') as f:
                all_errors = json.load(f)

            for error in all_errors:
                if error['error_id'] == error_id:
                    return error

            return None

        except Exception as e:
            logger.error(f"Failed to load error {error_id}: {str(e)}")
            return None

    def _update_error_in_storage(self, updated_error: Dict[str, Any]):
        """Replace the matching record in persistent storage (best effort)."""
        try:
            if not self.error_log_file.exists():
                return

            with open(self.error_log_file, 'r', encoding='utf-8') as f:
                all_errors = json.load(f)

            # Find and update the error in place.
            for i, error in enumerate(all_errors):
                if error['error_id'] == updated_error['error_id']:
                    all_errors[i] = updated_error
                    break

            with open(self.error_log_file, 'w', encoding='utf-8') as f:
                json.dump(all_errors, f, indent=2)

        except Exception as e:
            logger.error(f"Failed to update error in storage: {str(e)}")

    def _generate_error_recommendations(self, errors: List[Dict[str, Any]]) -> List[str]:
        """Generate human-readable recommendations from error patterns."""
        try:
            recommendations = []

            # Count error types.
            error_types: Dict[str, int] = {}
            for error in errors:
                error_type = error['error_type']
                error_types[error_type] = error_types.get(error_type, 0) + 1

            # Type-specific recommendations (thresholds are heuristic).
            if error_types.get('ansys', 0) > 3:
                recommendations.append("Multiple ANSYS errors detected - consider checking ANSYS installation and license status")

            if error_types.get('file_io', 0) > 2:
                recommendations.append("File I/O errors detected - verify file permissions and disk space")

            if error_types.get('memory', 0) > 1:
                recommendations.append("Memory-related errors detected - consider increasing available RAM or reducing model complexity")

            # Check for critical errors.
            critical_count = sum(1 for error in errors if error['severity'] == 'critical')
            if critical_count > 0:
                recommendations.append(f"{critical_count} critical error(s) require immediate attention")

            # Check resolution rate.
            resolved_count = sum(1 for error in errors if error['resolved'])
            if len(errors) > 0:
                resolution_rate = resolved_count / len(errors)
                if resolution_rate < 0.5:
                    recommendations.append("Low error resolution rate - consider reviewing error handling procedures")

            # Default recommendation if no specific patterns found.
            if not recommendations:
                recommendations.append("Monitor error patterns and address recurring issues")

            return recommendations

        except Exception as e:
            logger.error(f"Failed to generate recommendations: {str(e)}")
            return ["Unable to generate recommendations due to analysis error"]

    def get_reporter_info(self) -> Dict[str, Any]:
        """
        Get information about the error reporter.

        Returns:
            Dictionary with reporter configuration and capabilities.
        """
        return {
            'reporter_type': 'ErrorReporter',
            'log_directory': str(self.log_directory),
            'session_errors_count': len(self.session_errors),
            'ansys_error_handler_available': self.ansys_error_handler is not None,
            'persistent_storage_available': self.error_log_file.exists(),
            'capabilities': [
                'error_collection',
                'error_analysis',
                'error_persistence',
                'error_reporting',
                'resolution_tracking',
                'recommendation_generation'
            ]
        }
|
|
|
|
# Global error reporter instance, shared by log_processing_step below and by
# any external callers that import it.  NOTE: instantiating at import time
# creates the default "logs" directory as a side effect.
error_reporter = ErrorReporter()
|
def log_processing_step(step_name: str, status: str, details: Dict[str, Any] = None):
    """
    Log a processing step for debugging and monitoring.

    A step with status ``failed`` is additionally reported to the global
    ``error_reporter`` as a medium-severity ``processing`` error.

    Args:
        step_name: Name of the processing step
        status: Status (started, completed, failed)
        details: Additional details
    """
    try:
        entry = {
            'step': step_name,
            'status': status,
            'timestamp': datetime.now().isoformat(),
            'details': details or {}
        }

        if status == 'failed':
            # Pull the failure reason out of the details when available.
            if details:
                failure_reason = details.get('error', 'Unknown error')
            else:
                failure_reason = 'Unknown error'
            error_reporter.report_error(
                error_type='processing',
                error_message=f"Processing step '{step_name}' failed: {failure_reason}",
                context={'step': step_name, 'details': details},
                severity='medium'
            )

        # Attach the structured entry to the log record for downstream handlers.
        logger.info(f"Processing step: {step_name} - {status}", extra=entry)

    except Exception as e:
        logger.error(f"Failed to log processing step: {str(e)}")