# AnsysLink/backend/utils/diagnostic_collector.py
# 2025-08-11 13:58:59 +08:00
#
# 626 lines
# 27 KiB
# Python

"""
Diagnostic Information Collector for CAE Mesh Generator
This module provides comprehensive diagnostic information collection
for troubleshooting and system monitoring purposes.
"""
import logging
import os
import platform
import psutil
import subprocess
import json
from datetime import datetime
from typing import Dict, Any, List, Optional
from pathlib import Path
import threading
import time
# Module-level logger, named after this module via __name__; every method of
# DiagnosticCollector below reports progress and failures through it.
logger = logging.getLogger(__name__)
class DiagnosticCollector:
    """
    Comprehensive diagnostic information collector.

    Gathers platform/CPU/memory details, Python and ANSYS environment data,
    live performance counters, disk, network and process statistics, plus the
    application's recent error summary, and can render the result as a
    plain-text report.

    Every ``_collect_*`` method is best-effort: on failure it logs the problem
    and returns ``{'error': <message>}`` instead of raising, so one broken
    probe never aborts the whole collection.
    """

    # Maximum age (seconds) of a system-info snapshot that may be reused.
    STATIC_INFO_CACHE_SECONDS = 300
    # Maximum number of connections included in the network sample.
    MAX_SAMPLE_CONNECTIONS = 10
    # Number of entries in each "top processes" list.
    TOP_PROCESS_COUNT = 5

    def __init__(self):
        """Initialize diagnostic collector with an empty cache."""
        # Serializes full collections started from different threads.
        self.collection_lock = threading.Lock()
        # Completion time of the most recent successful full collection.
        self.last_collection_time = None
        # Last system-info snapshot, reused while it is still fresh.
        self.cached_static_info = None
        logger.info("Diagnostic Collector initialized")

    def collect_comprehensive_diagnostics(self, include_performance: bool = True,
                                          include_ansys_env: bool = True) -> Dict[str, Any]:
        """
        Collect comprehensive diagnostic information.

        Args:
            include_performance: Include performance metrics (sampling the
                CPU blocks for about one second).
            include_ansys_env: Include ANSYS environment information.

        Returns:
            Dictionary with one entry per diagnostic section. Sections that
            were skipped stay empty dicts. If the collection itself fails, a
            reduced dictionary with ``collection_info`` (timestamp and error
            text) and a top-level ``error`` message is returned instead.
        """
        try:
            with self.collection_lock:
                logger.info("Starting comprehensive diagnostic collection...")
                diagnostics = {
                    'collection_info': {
                        'timestamp': datetime.now().isoformat(),
                        'collector_version': '1.0',
                        'collection_duration': 0.0
                    },
                    'system_info': {},
                    'python_environment': {},
                    'ansys_environment': {},
                    'performance_metrics': {},
                    'disk_info': {},
                    'network_info': {},
                    'process_info': {},
                    'error_summary': {}
                }
                # perf_counter is monotonic, so the measured duration cannot
                # go negative or jump when the wall clock is adjusted.
                start_time = time.perf_counter()

                diagnostics['system_info'] = self._collect_system_info()
                diagnostics['python_environment'] = self._collect_python_environment()
                if include_ansys_env:
                    diagnostics['ansys_environment'] = self._collect_ansys_environment()
                if include_performance:
                    diagnostics['performance_metrics'] = self._collect_performance_metrics()
                diagnostics['disk_info'] = self._collect_disk_info()
                diagnostics['network_info'] = self._collect_network_info()
                diagnostics['process_info'] = self._collect_process_info()
                diagnostics['error_summary'] = self._collect_error_summary()

                collection_duration = time.perf_counter() - start_time
                diagnostics['collection_info']['collection_duration'] = collection_duration
                self.last_collection_time = datetime.now()
                logger.info("Diagnostic collection completed in %.2fs", collection_duration)
                return diagnostics
        except Exception as e:
            logger.error("Comprehensive diagnostic collection failed: %s", e)
            return {
                'collection_info': {
                    'timestamp': datetime.now().isoformat(),
                    'error': str(e)
                },
                'error': 'Diagnostic collection failed'
            }

    def _collect_system_info(self) -> Dict[str, Any]:
        """
        Collect platform, CPU, memory and key environment-variable details.

        The previous snapshot is returned unchanged when the last full
        collection finished less than ``STATIC_INFO_CACHE_SECONDS`` ago.

        Returns:
            System information dictionary, or ``{'error': ...}`` on failure.
        """
        try:
            if self.cached_static_info is not None and self.last_collection_time is not None:
                # total_seconds() covers the whole elapsed time; the
                # ``.seconds`` attribute drops whole days, which would make a
                # day-old snapshot look fresh again.
                cache_age = (datetime.now() - self.last_collection_time).total_seconds()
                if cache_age < self.STATIC_INFO_CACHE_SECONDS:
                    return self.cached_static_info

            # Take one snapshot of each psutil source so the reported fields
            # are consistent with each other.
            cpu_freq = psutil.cpu_freq()
            memory = psutil.virtual_memory()
            gib = 1024 ** 3

            system_info = {
                'platform': {
                    'system': platform.system(),
                    'release': platform.release(),
                    'version': platform.version(),
                    'machine': platform.machine(),
                    'processor': platform.processor(),
                    'architecture': platform.architecture(),
                    'platform_string': platform.platform()
                },
                'cpu': {
                    'physical_cores': psutil.cpu_count(logical=False),
                    'logical_cores': psutil.cpu_count(logical=True),
                    'max_frequency': cpu_freq.max if cpu_freq else 'Unknown',
                    'current_frequency': cpu_freq.current if cpu_freq else 'Unknown'
                },
                'memory': {
                    'total_gb': round(memory.total / gib, 2),
                    'available_gb': round(memory.available / gib, 2),
                    'used_gb': round(memory.used / gib, 2),
                    'percentage_used': memory.percent
                },
                'environment_variables': {
                    'PATH': os.environ.get('PATH', 'Not set'),
                    'PYTHONPATH': os.environ.get('PYTHONPATH', 'Not set'),
                    'TEMP': os.environ.get('TEMP', 'Not set'),
                    # USER is the POSIX name, USERNAME the Windows one.
                    'USER': os.environ.get('USER', os.environ.get('USERNAME', 'Unknown'))
                }
            }
            self.cached_static_info = system_info
            return system_info
        except Exception as e:
            logger.error("System info collection failed: %s", e)
            return {'error': str(e)}

    def _collect_python_environment(self) -> Dict[str, Any]:
        """
        Collect interpreter details and versions of a few key packages.

        Returns:
            Python environment dictionary, or ``{'error': ...}`` on failure.
        """
        try:
            import sys
            # importlib.metadata is part of the standard library, so the
            # lookup no longer depends on setuptools' deprecated
            # pkg_resources being installed.
            from importlib import metadata

            python_info = {
                'version': sys.version,
                'version_info': {
                    'major': sys.version_info.major,
                    'minor': sys.version_info.minor,
                    'micro': sys.version_info.micro
                },
                'executable': sys.executable,
                'path': sys.path[:5],  # First 5 paths to avoid too much data
                'installed_packages': {}
            }

            # Names are looked up as installed distributions; 'pathlib' is a
            # standard-library module, so it normally reports 'Not installed'.
            key_packages = ['flask', 'psutil', 'pathlib', 'requests', 'numpy', 'scipy']
            for package_name in key_packages:
                try:
                    package_version = metadata.version(package_name)
                except metadata.PackageNotFoundError:
                    package_version = 'Not installed'
                python_info['installed_packages'][package_name] = package_version
            return python_info
        except Exception as e:
            logger.error("Python environment collection failed: %s", e)
            return {'error': str(e)}

    def _collect_ansys_environment(self) -> Dict[str, Any]:
        """
        Collect ANSYS installation, version, PyMechanical and license hints.

        Detection is heuristic: it looks at well-known environment variables
        and installation directories and never starts any ANSYS product.

        Returns:
            ANSYS environment dictionary, or ``{'error': ...}`` on failure.
        """
        try:
            ansys_info = {
                'installation_detected': False,
                'version_info': {},
                'license_info': {},
                'environment_variables': {},
                'installation_paths': []
            }

            ansys_env_vars = [
                'ANSYS_DIR', 'ANSYSLIC_DIR', 'ANSYS_SYSDIR',
                'AWP_ROOT', 'ANSYS_INC', 'ANSYS_PRODUCT_PATH'
            ]
            for var in ansys_env_vars:
                value = os.environ.get(var)
                if value:
                    ansys_info['environment_variables'][var] = value
                    ansys_info['installation_detected'] = True

            # Installers export versioned roots (AWP_ROOT<version>), which the
            # exact-name list above cannot match.
            for var, value in os.environ.items():
                if value and var.upper().startswith('AWP_ROOT'):
                    ansys_info['environment_variables'][var] = value
                    ansys_info['installation_detected'] = True

            common_paths = [
                'C:\\Program Files\\ANSYS Inc',
                'C:\\ANSYS Inc',
                '/usr/ansys_inc',
                '/opt/ansys_inc'
            ]
            detected_versions = []
            for path in common_paths:
                if not os.path.exists(path):
                    continue
                ansys_info['installation_paths'].append(path)
                ansys_info['installation_detected'] = True
                # Version directories are named like "v241" or "v24.1".
                try:
                    version_dirs = [
                        entry for entry in os.listdir(path)
                        if os.path.isdir(os.path.join(path, entry))
                        and entry.startswith('v')
                        and entry[1:].replace('.', '').isdigit()
                    ]
                except OSError as e:
                    logger.debug("Could not list ANSYS directory %s: %s", path, e)
                    continue
                # Accumulate across every installation root instead of letting
                # the last root overwrite earlier findings.
                for version_dir in version_dirs:
                    if version_dir not in detected_versions:
                        detected_versions.append(version_dir)
            if detected_versions:
                ansys_info['version_info']['detected_versions'] = detected_versions

            # Really probe for the PyMechanical package; find_spec locates it
            # without importing the (heavy) package itself. A missing parent
            # package surfaces as ModuleNotFoundError (an ImportError).
            try:
                import importlib.util
                pymechanical_found = importlib.util.find_spec('ansys.mechanical.core') is not None
            except (ImportError, ValueError):
                pymechanical_found = False
            ansys_info['pymechanical_available'] = pymechanical_found
            ansys_info['version_info']['pymechanical_status'] = (
                'Available' if pymechanical_found else 'Not available'
            )

            # Only the configured value is reported; no connection is attempted.
            license_server = (
                os.environ.get('ANSYSLIC_DIR')
                or os.environ.get('LM_LICENSE_FILE')
                or os.environ.get('ANSYSLMD_LICENSE_FILE')
            )
            if license_server:
                ansys_info['license_info']['license_server'] = license_server
                ansys_info['license_info']['connectivity_status'] = 'Unknown'  # Would need actual test
            return ansys_info
        except Exception as e:
            logger.error("ANSYS environment collection failed: %s", e)
            return {'error': str(e)}

    def _collect_performance_metrics(self) -> Dict[str, Any]:
        """
        Collect current CPU, memory, swap, load-average and boot-time data.

        Blocks for about one second while the CPU is sampled.

        Returns:
            Performance dictionary, or ``{'error': ...}`` on failure.
        """
        try:
            # A single one-second sample yields the per-CPU figures; the
            # overall value is their mean, which avoids a second blocking
            # one-second sample.
            per_cpu = psutil.cpu_percent(interval=1, percpu=True)
            overall_cpu = round(sum(per_cpu) / len(per_cpu), 1) if per_cpu else 0.0

            memory = psutil.virtual_memory()
            swap = psutil.swap_memory()

            # os.getloadavg is missing on Windows and may raise OSError when
            # the load average is unobtainable.
            try:
                load_average = list(os.getloadavg())
            except (AttributeError, OSError):
                load_average = [0, 0, 0]

            return {
                'cpu_usage': {
                    'current_percent': overall_cpu,
                    'per_cpu': per_cpu
                },
                'memory_usage': {
                    'virtual_memory': {
                        'total': memory.total,
                        'available': memory.available,
                        'percent': memory.percent,
                        'used': memory.used,
                        'free': memory.free
                    },
                    'swap_memory': {
                        'total': swap.total,
                        'used': swap.used,
                        'free': swap.free,
                        'percent': swap.percent
                    }
                },
                'load_average': load_average,
                'boot_time': datetime.fromtimestamp(psutil.boot_time()).isoformat()
            }
        except Exception as e:
            logger.error("Performance metrics collection failed: %s", e)
            return {'error': str(e)}

    def _collect_disk_info(self) -> Dict[str, Any]:
        """
        Collect per-partition disk usage and global disk I/O counters.

        Returns:
            Disk dictionary, or ``{'error': ...}`` on failure.
        """
        try:
            disk_info = {
                'disk_usage': {},
                'disk_io': {}
            }
            gib = 1024 ** 3

            for partition in psutil.disk_partitions():
                try:
                    usage = psutil.disk_usage(partition.mountpoint)
                except OSError:
                    # Covers PermissionError as well as drives that are not
                    # ready (e.g. an empty optical drive); skip those.
                    continue
                # Zero-size volumes would otherwise raise ZeroDivisionError
                # and discard the data of every other partition.
                percent_used = round((usage.used / usage.total) * 100, 2) if usage.total else 0.0
                disk_info['disk_usage'][partition.device] = {
                    'mountpoint': partition.mountpoint,
                    'fstype': partition.fstype,
                    'total_gb': round(usage.total / gib, 2),
                    'used_gb': round(usage.used / gib, 2),
                    'free_gb': round(usage.free / gib, 2),
                    'percent_used': percent_used
                }

            try:
                disk_io = psutil.disk_io_counters()
                if disk_io:
                    disk_info['disk_io'] = {
                        'read_count': disk_io.read_count,
                        'write_count': disk_io.write_count,
                        'read_bytes': disk_io.read_bytes,
                        'write_bytes': disk_io.write_bytes,
                        'read_time': disk_io.read_time,
                        'write_time': disk_io.write_time
                    }
            except Exception:
                disk_info['disk_io'] = {'error': 'Could not collect disk I/O stats'}
            return disk_info
        except Exception as e:
            logger.error("Disk info collection failed: %s", e)
            return {'error': str(e)}

    def _collect_network_info(self) -> Dict[str, Any]:
        """
        Collect interface addresses, I/O counters and a connection sample.

        Returns:
            Network dictionary, or ``{'error': ...}`` on failure.
        """
        try:
            network_info = {
                'network_interfaces': {},
                'network_connections': {},
                'network_io': {}
            }

            for interface_name, addresses in psutil.net_if_addrs().items():
                network_info['network_interfaces'][interface_name] = [
                    {
                        'family': str(addr.family),
                        'address': addr.address,
                        'netmask': addr.netmask,
                        'broadcast': addr.broadcast
                    } for addr in addresses
                ]

            try:
                net_io = psutil.net_io_counters()
                if net_io:
                    network_info['network_io'] = {
                        'bytes_sent': net_io.bytes_sent,
                        'bytes_recv': net_io.bytes_recv,
                        'packets_sent': net_io.packets_sent,
                        'packets_recv': net_io.packets_recv,
                        'errin': net_io.errin,
                        'errout': net_io.errout,
                        'dropin': net_io.dropin,
                        'dropout': net_io.dropout
                    }
            except Exception:
                network_info['network_io'] = {'error': 'Could not collect network I/O stats'}

            try:
                # Enumerate once so the count and the sample describe the same
                # snapshot (and the costly system call is not repeated).
                all_connections = psutil.net_connections(kind='inet')
                network_info['network_connections'] = {
                    'active_connections_count': len(all_connections),
                    'sample_connections': [
                        {
                            'family': str(conn.family),
                            'type': str(conn.type),
                            'local_address': f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else None,
                            'remote_address': f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else None,
                            'status': conn.status,
                            'pid': conn.pid
                        } for conn in all_connections[:self.MAX_SAMPLE_CONNECTIONS]
                    ]
                }
            except Exception:
                # Kept broad on purpose: any failure here degrades to an error
                # marker instead of discarding the rest of the network data.
                network_info['network_connections'] = {'error': 'Could not collect connection info'}
            return network_info
        except Exception as e:
            logger.error("Network info collection failed: %s", e)
            return {'error': str(e)}

    def _collect_process_info(self) -> Dict[str, Any]:
        """
        Collect data on this process, the busiest processes and ANSYS ones.

        Returns:
            Process dictionary, or ``{'error': ...}`` on failure.
        """
        try:
            process_info = {
                'current_process': {},
                'system_processes': {},
                'ansys_processes': []
            }

            current_proc = psutil.Process()
            memory = current_proc.memory_info()
            process_info['current_process'] = {
                'pid': current_proc.pid,
                'name': current_proc.name(),
                # NOTE(review): called without an interval, so the value is
                # relative to the previous call on this Process object --
                # presumably 0.0 for a fresh object; confirm against psutil.
                'cpu_percent': current_proc.cpu_percent(),
                'memory_percent': current_proc.memory_percent(),
                'memory_info': {
                    'rss': memory.rss,
                    'vms': memory.vms
                },
                'create_time': datetime.fromtimestamp(current_proc.create_time()).isoformat(),
                'num_threads': current_proc.num_threads()
            }

            all_processes = list(psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']))
            process_info['system_processes'] = {
                'total_processes': len(all_processes),
                'top_cpu_processes': [],
                'top_memory_processes': []
            }

            try:
                # Missing values (None) are ranked as 0.
                cpu_sorted = sorted(
                    all_processes,
                    key=lambda p: p.info['cpu_percent'] or 0,
                    reverse=True
                )[:self.TOP_PROCESS_COUNT]
                memory_sorted = sorted(
                    all_processes,
                    key=lambda p: p.info['memory_percent'] or 0,
                    reverse=True
                )[:self.TOP_PROCESS_COUNT]
                process_info['system_processes']['top_cpu_processes'] = [
                    {
                        'pid': p.info['pid'],
                        'name': p.info['name'],
                        'cpu_percent': p.info['cpu_percent']
                    } for p in cpu_sorted
                ]
                process_info['system_processes']['top_memory_processes'] = [
                    {
                        'pid': p.info['pid'],
                        'name': p.info['name'],
                        'memory_percent': p.info['memory_percent']
                    } for p in memory_sorted
                ]
            except Exception as e:
                # Best-effort: the top lists stay empty, but leave a trace.
                logger.debug("Top process ranking failed: %s", e)

            ansys_keywords = ['ansys', 'mechanical', 'fluent', 'cfx', 'mapdl']
            for proc in all_processes:
                # The name may be None when it could not be read.
                proc_name = (proc.info['name'] or '').lower()
                if any(keyword in proc_name for keyword in ansys_keywords):
                    process_info['ansys_processes'].append({
                        'pid': proc.info['pid'],
                        'name': proc.info['name'],
                        'cpu_percent': proc.info['cpu_percent'],
                        'memory_percent': proc.info['memory_percent']
                    })
            return process_info
        except Exception as e:
            logger.error("Process info collection failed: %s", e)
            return {'error': str(e)}

    def _collect_error_summary(self) -> Dict[str, Any]:
        """
        Collect the last 24 hours' error summary from the error reporter.

        Returns:
            Whatever ``error_reporter.get_error_summary(hours=24)`` returns,
            ``{'error': 'Error reporter not available'}`` when the reporter
            module cannot be imported, or ``{'error': ...}`` on other failures.
        """
        try:
            try:
                # Imported lazily so this module still works without it.
                from backend.utils.error_reporter import error_reporter
                return error_reporter.get_error_summary(hours=24)
            except ImportError:
                return {'error': 'Error reporter not available'}
        except Exception as e:
            logger.error("Error summary collection failed: %s", e)
            return {'error': str(e)}

    @staticmethod
    def _format_report(diagnostics: Dict[str, Any]) -> str:
        """Render a diagnostics dictionary as a newline-separated text report."""
        heavy_rule = "=" * 80
        light_rule = "-" * 40

        # A failed collection has no 'collection_duration' (and may lack whole
        # sections), so every lookup below tolerates missing keys.
        collection_info = diagnostics.get('collection_info', {})
        lines = [
            heavy_rule,
            "CAE MESH GENERATOR - DIAGNOSTIC REPORT",
            heavy_rule,
            f"Generated: {collection_info.get('timestamp', 'Unknown')}",
            f"Collection Duration: {collection_info.get('collection_duration', 0.0):.2f}s",
        ]
        if 'error' in collection_info:
            lines.append(f"Collection Error: {collection_info['error']}")
        lines.append("")

        # System Information
        lines.append("SYSTEM INFORMATION")
        lines.append(light_rule)
        sys_info = diagnostics.get('system_info', {})
        if 'platform' in sys_info:
            platform_info = sys_info['platform']
            lines.append(f"Operating System: {platform_info.get('system')} {platform_info.get('release')}")
            lines.append(f"Architecture: {platform_info.get('architecture')}")
            lines.append(f"Processor: {platform_info.get('processor')}")
        if 'cpu' in sys_info:
            cpu_info = sys_info['cpu']
            lines.append(f"CPU Cores: {cpu_info.get('physical_cores')} physical, {cpu_info.get('logical_cores')} logical")
        if 'memory' in sys_info:
            mem_info = sys_info['memory']
            lines.append(f"Memory: {mem_info.get('total_gb')}GB total, {mem_info.get('available_gb')}GB available ({mem_info.get('percentage_used')}% used)")
        lines.append("")

        # ANSYS Environment
        lines.append("ANSYS ENVIRONMENT")
        lines.append(light_rule)
        ansys_info = diagnostics.get('ansys_environment', {})
        lines.append(f"Installation Detected: {ansys_info.get('installation_detected', False)}")
        lines.append(f"PyMechanical Available: {ansys_info.get('pymechanical_available', False)}")
        if ansys_info.get('installation_paths'):
            lines.append(f"Installation Paths: {', '.join(ansys_info['installation_paths'])}")
        detected_versions = ansys_info.get('version_info', {}).get('detected_versions')
        if detected_versions:
            lines.append(f"Detected Versions: {', '.join(detected_versions)}")
        lines.append("")

        # Performance Metrics
        lines.append("PERFORMANCE METRICS")
        lines.append(light_rule)
        perf_info = diagnostics.get('performance_metrics', {})
        if 'cpu_usage' in perf_info:
            lines.append(f"CPU Usage: {perf_info['cpu_usage'].get('current_percent', 0)}%")
        if 'memory_usage' in perf_info and 'virtual_memory' in perf_info['memory_usage']:
            vm = perf_info['memory_usage']['virtual_memory']
            lines.append(f"Memory Usage: {vm.get('percent', 0)}%")
        lines.append("")

        # Error Summary
        lines.append("ERROR SUMMARY (Last 24 Hours)")
        lines.append(light_rule)
        error_info = diagnostics.get('error_summary', {})
        lines.append(f"Total Errors: {error_info.get('total_errors', 0)}")
        lines.append(f"Resolved: {error_info.get('resolved_count', 0)}")
        lines.append(f"Unresolved: {error_info.get('unresolved_count', 0)}")
        if error_info.get('error_types'):
            lines.append("Error Types:")
            for error_type, count in error_info['error_types'].items():
                lines.append(f" - {error_type}: {count}")
        lines.append("")
        lines.append(heavy_rule)

        # A real newline: joining with "\\n" would emit a literal backslash-n
        # and collapse the whole report into a single line.
        return "\n".join(lines)

    def generate_diagnostic_report(self, output_file: Optional[str] = None) -> str:
        """
        Generate comprehensive diagnostic report.

        Args:
            output_file: Optional output file path. When given, the report is
                also written there (UTF-8); a write failure is logged and
                does not prevent the report from being returned.

        Returns:
            Report content as string, or a one-line failure message if the
            report could not be generated.
        """
        try:
            logger.info("Generating diagnostic report...")
            diagnostics = self.collect_comprehensive_diagnostics()
            report_content = self._format_report(diagnostics)

            if output_file:
                try:
                    with open(output_file, 'w', encoding='utf-8') as f:
                        f.write(report_content)
                    logger.info("Diagnostic report saved to: %s", output_file)
                except Exception as e:
                    # Saving is best-effort; the caller still gets the text.
                    logger.error("Failed to save report to file: %s", e)
            return report_content
        except Exception as e:
            logger.error("Diagnostic report generation failed: %s", e)
            return f"Diagnostic report generation failed: {str(e)}"

    def get_collector_info(self) -> Dict[str, Any]:
        """
        Get information about the diagnostic collector.

        Returns:
            Dictionary with the collector type, the ISO timestamp of the last
            completed collection (``None`` if there was none), whether a
            system-info snapshot is cached, and the list of capabilities.
        """
        last_collection = self.last_collection_time
        return {
            'collector_type': 'DiagnosticCollector',
            'last_collection_time': last_collection.isoformat() if last_collection else None,
            'cached_static_info_available': self.cached_static_info is not None,
            'collection_capabilities': [
                'system_information',
                'python_environment',
                'ansys_environment',
                'performance_metrics',
                'disk_information',
                'network_information',
                'process_information',
                'error_summary',
                'diagnostic_report_generation'
            ]
        }
# Global diagnostic collector instance. It is created at import time, so
# importing this module runs DiagnosticCollector.__init__ (which logs an
# initialization message).
diagnostic_collector = DiagnosticCollector()