# AnsysLink/backend/api/routes.py
# 2025-08-11 13:58:59 +08:00
# 3254 lines, 127 KiB, Python

"""
API routes for CAE Mesh Generator
"""
import os
import uuid
import time
from datetime import datetime, timedelta
from flask import Blueprint, request, jsonify, current_app
from werkzeug.utils import secure_filename
from pathlib import Path
from backend.models.data_models import UploadedFile, ProcessingStatus
from backend.utils.file_validator import validate_step_file, get_file_info
from backend.utils.state_manager import state_manager
from backend.utils.mesh_processor import process_blade_mesh_with_state_updates, ProcessingStep
from backend.utils.visualization_exporter import VisualizationExporter, VisualizationSettings
from backend.utils.error_handler import (
handle_api_error, handle_ansys_error, validate_file_upload,
FileUploadError, ANSYSError, MeshGenerationError, ValidationError,
error_reporter, log_processing_step
)
from config import ALLOWED_EXTENSIONS, UPLOAD_FOLDER
import threading
# Create API blueprint.
# Every route below registers on this blueprint; it is mounted by the app
# factory (presumably under /api, matching the docstrings — confirm there).
api_bp = Blueprint('api', __name__)
def allowed_file(filename):
    """Return True when *filename*'s extension is in ALLOWED_EXTENSIONS (case-insensitive)."""
    extension = Path(filename).suffix
    return extension.lower() in ALLOWED_EXTENSIONS
def get_file_size(file_path):
    """Return the size of *file_path* in bytes, or 0 if it cannot be stat'ed."""
    try:
        size = os.path.getsize(file_path)
    except OSError:
        return 0
    return size
@api_bp.route('/upload', methods=['POST'])
@handle_api_error
def upload_file():
    """
    Handle file upload.

    POST /api/upload

    Expects a multipart form with a 'file' field. The file is checked against
    the extension whitelist, stored under UPLOAD_FOLDER with a UUID-based
    name, content-validated as a STEP file, and recorded in the shared state
    manager.

    Returns:
        JSON with the file record and file info, HTTP 200 on success.
    Raises:
        FileUploadError: on any validation failure (converted to an API
            response by @handle_api_error).
    """
    log_processing_step("file_upload", "started")
    # The multipart field must be named 'file'
    if 'file' not in request.files:
        raise FileUploadError("No file provided")
    file = request.files['file']
    # Generic upload checks (delegated to the shared error-handler helper)
    validate_file_upload(file)
    # Validate file extension against the configured whitelist
    if not allowed_file(file.filename):
        raise FileUploadError(
            f'Invalid file format. Only {", ".join(ALLOWED_EXTENSIONS)} files are supported.',
            details={'provided_filename': file.filename, 'allowed_extensions': ALLOWED_EXTENSIONS}
        )
    # Generate a unique on-disk filename so concurrent uploads never collide
    file_id = str(uuid.uuid4())
    original_filename = secure_filename(file.filename)
    file_extension = Path(original_filename).suffix
    unique_filename = f"{file_id}{file_extension}"
    # Ensure upload directory exists
    os.makedirs(UPLOAD_FOLDER, exist_ok=True)
    # Save file
    file_path = os.path.join(UPLOAD_FOLDER, unique_filename)
    file.save(file_path)
    # Validate uploaded file content
    is_valid, validation_error = validate_step_file(file_path)
    if not is_valid:
        # Best-effort cleanup of the rejected file. BUGFIX: the previous bare
        # `except:` swallowed everything (including KeyboardInterrupt /
        # SystemExit); only filesystem errors should be ignored here.
        try:
            os.remove(file_path)
        except OSError:
            pass
        raise FileUploadError(f'File validation failed: {validation_error}')
    # Get file information
    file_info = get_file_info(file_path)
    # Create file record
    uploaded_file = UploadedFile(
        id=file_id,
        filename=original_filename,
        file_path=file_path,
        upload_time=datetime.now(),
        status='UPLOADED'
    )
    # Update state manager
    state_manager.set_current_file(uploaded_file)
    return jsonify({
        'success': True,
        'file': uploaded_file.to_dict(),
        'file_info': file_info,
        'message': 'File uploaded and validated successfully'
    }), 200
@api_bp.route('/files/current', methods=['GET'])
def get_current_file():
    """
    Get current uploaded file information.

    GET /api/files/current
    """
    active_file = state_manager.get_current_file()
    if active_file is None:
        payload = {'success': False, 'message': 'No file uploaded'}
        return jsonify(payload), 404
    payload = {'success': True, 'file': active_file.to_dict()}
    return jsonify(payload), 200
@api_bp.route('/mesh/status', methods=['GET'])
def get_mesh_status():
    """
    Get current processing status.

    GET /api/mesh/status
    """
    status = state_manager.get_processing_status()
    body = {'success': True, 'status': status.to_dict()}
    return jsonify(body), 200
@api_bp.route('/mesh/result', methods=['GET'])
def get_mesh_result():
    """
    Get comprehensive mesh generation result with statistics and visualization
    GET /api/mesh/result
    Query parameters:
    - include_visualization: bool - Include visualization data (default: false)
    - include_quality_details: bool - Include detailed quality metrics (default: false)
    - format: str - Response format (json, summary) (default: json)
    """
    try:
        # Get query parameters: only the literal string 'true' enables a flag
        include_visualization = request.args.get('include_visualization', 'false').lower() == 'true'
        include_quality_details = request.args.get('include_quality_details', 'false').lower() == 'true'
        response_format = request.args.get('format', 'json').lower()
        # Get basic mesh result; without one there is nothing to report
        mesh_result = state_manager.get_mesh_result()
        if mesh_result is None:
            return jsonify({
                'success': False,
                'message': 'No mesh result available'
            }), 404
        # Get processing status for additional context
        processing_status = state_manager.get_processing_status()
        current_file = state_manager.get_current_file()
        # Build comprehensive result; every field access is None-guarded because
        # the status/file objects may be absent or only partially populated
        result_data = {
            'basic_info': mesh_result.to_dict(),
            'processing_info': {
                'status': processing_status.status if processing_status else 'unknown',
                'progress_percentage': processing_status.progress_percentage if processing_status else 0,
                'started_at': processing_status.start_time.isoformat() if processing_status and processing_status.start_time else None,
                'completed_at': processing_status.completed_at.isoformat() if processing_status and processing_status.completed_at else None,
                # total_time falls back to 0 when either timestamp is missing
                'total_time': (processing_status.completed_at - processing_status.start_time).total_seconds() if processing_status and processing_status.start_time and processing_status.completed_at else 0
            },
            'file_info': {
                'filename': current_file.filename if current_file else 'unknown',
                'file_size': get_file_size(current_file.file_path) if current_file and current_file.file_path else 0,
                'upload_time': current_file.upload_time.isoformat() if current_file and current_file.upload_time else None
            }
        }
        # Add detailed quality information if requested
        if include_quality_details:
            result_data['quality_details'] = _get_detailed_quality_info(mesh_result)
        # Add visualization data if requested (may trigger an image export)
        if include_visualization:
            result_data['visualization'] = _get_visualization_info()
        # Format response based on requested format
        if response_format == 'summary':
            return jsonify({
                'success': True,
                'summary': _format_result_summary(result_data)
            }), 200
        else:
            return jsonify({
                'success': True,
                'result': result_data,
                'metadata': {
                    'retrieved_at': datetime.now().isoformat(),
                    'include_visualization': include_visualization,
                    'include_quality_details': include_quality_details,
                    'format': response_format
                }
            }), 200
    except Exception as e:
        current_app.logger.error(f"Get mesh result error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to retrieve mesh result: {str(e)}'
        }), 500
@api_bp.route('/system/state', methods=['GET'])
def get_system_state():
    """
    Get complete system state.

    GET /api/system/state
    """
    snapshot = state_manager.get_system_state()
    return jsonify({'success': True, 'state': snapshot}), 200
@api_bp.route('/system/reset', methods=['POST'])
def reset_system():
    """
    Reset system state: drop the current file and all session data.

    POST /api/system/reset
    """
    try:
        state_manager.clear_current_file()
        state_manager.clear_session_data()
    except Exception as e:
        current_app.logger.error(f"System reset error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Reset failed: {str(e)}'
        }), 500
    return jsonify({
        'success': True,
        'message': 'System state reset successfully'
    }), 200
@api_bp.route('/mesh/ready', methods=['GET'])
def check_mesh_ready():
    """
    Check if system is ready for mesh generation.

    GET /api/mesh/ready
    """
    ready = state_manager.is_ready_for_processing()
    has_file = state_manager.get_current_file() is not None
    status = state_manager.get_processing_status()
    if ready:
        note = 'Ready for mesh generation'
    else:
        note = 'Not ready for mesh generation'
    return jsonify({
        'success': True,
        'ready': ready,
        'file_uploaded': has_file,
        'processing_status': status.status,
        'message': note
    }), 200
@api_bp.route('/health', methods=['GET'])
def health_check():
    """
    Health check endpoint.

    GET /api/health
    """
    snapshot = state_manager.get_system_state()
    status_summary = {
        'has_file': snapshot['current_file'] is not None,
        'processing_status': snapshot['processing_status']['status'],
        'ready_for_processing': snapshot['is_ready_for_processing']
    }
    return jsonify({
        'success': True,
        'message': 'CAE Mesh Generator API is running',
        'timestamp': datetime.now().isoformat(),
        'system_status': status_summary
    }), 200
@api_bp.route('/mesh/generate', methods=['POST'])
@handle_api_error
@handle_ansys_error
def generate_mesh():
    """
    Start mesh generation for uploaded file
    POST /api/mesh/generate

    Kicks off meshing in a daemon background thread and returns immediately
    with HTTP 202; progress/completion is published through the shared
    state manager. Raises ValidationError (handled by the decorators) when
    no file is uploaded or processing is already running.
    """
    log_processing_step("mesh_generation", "requested")
    # Check if system is ready for processing; pick the most specific error
    if not state_manager.is_ready_for_processing():
        current_file = state_manager.get_current_file()
        processing_status = state_manager.get_processing_status()
        if current_file is None:
            raise ValidationError("No file uploaded")
        elif state_manager.is_processing():
            raise ValidationError(
                "Processing already in progress",
                details={'current_status': processing_status.status}
            )
        else:
            raise ValidationError(
                f'System not ready for processing. Current status: {processing_status.status}',
                details={'current_status': processing_status.status}
            )
    # Always use real ANSYS integration
    # Get current file and app for background thread
    current_file = state_manager.get_current_file()
    file_path = current_file.file_path
    # current_app is a request-bound proxy; capture the real app object so the
    # worker thread can open its own app context
    app = current_app._get_current_object()  # Get the actual app instance
    # Start processing in background thread
    def background_processing():
        """Background thread for mesh processing"""
        with app.app_context():  # Use the captured app instance
            try:
                app.logger.info(f"Starting mesh generation for file: {current_file.filename}")
                # Start processing status
                state_manager.start_processing(f"Starting mesh generation for {current_file.filename}")
                # Process mesh (blocking; runs the full ANSYS pipeline)
                result = process_blade_mesh_with_state_updates(
                    file_path=file_path
                )
                if result.success:
                    app.logger.info(f"✓ Mesh generation completed successfully: {result.element_count} elements")
                    state_manager.complete_processing("Mesh generation completed successfully")
                else:
                    app.logger.error(f"✗ Mesh generation failed: {result.error_message}")
                    state_manager.set_processing_error(result.error_message)
            except Exception as e:
                # Any unexpected failure is surfaced via the state manager so
                # the /mesh/progress endpoint reports ERROR
                app.logger.error(f"Background processing error: {str(e)}")
                state_manager.set_processing_error(f"Processing error: {str(e)}")
    # Daemon thread: will not block interpreter shutdown
    processing_thread = threading.Thread(target=background_processing, daemon=True)
    processing_thread.start()
    return jsonify({
        'success': True,
        'message': 'Mesh generation started',
        'file_id': current_file.id,
        'filename': current_file.filename,
        'started_at': datetime.now().isoformat()
    }), 202  # Accepted - processing started
@api_bp.route('/mesh/progress', methods=['GET'])
def get_mesh_progress():
    """
    Get comprehensive mesh generation progress with real-time analysis.

    GET /api/mesh/progress
    Query parameters:
    - detailed: bool - Include detailed progress analysis (default: false)
    - include_recommendations: bool - Include performance recommendations (default: false)
    """
    try:
        # Query flags: only the literal string 'true' enables them
        include_detailed = request.args.get('detailed', 'false').lower() == 'true'
        include_recommendations = request.args.get('include_recommendations', 'false').lower() == 'true'
        processing_status = state_manager.get_processing_status()
        current_file = state_manager.get_current_file()
        # Basic response; getattr() guards fields that older/simpler status
        # objects may not carry
        response_data = {
            'success': True,
            'status': processing_status.status,
            'message': processing_status.message,
            'progress_percentage': getattr(processing_status, 'progress_percentage', 0.0),
            'current_operation': getattr(processing_status, 'current_operation', None),
            'last_updated': getattr(processing_status, 'last_updated', processing_status.start_time),
            'file_info': {
                'id': current_file.id if current_file else None,
                'filename': current_file.filename if current_file else None
            } if current_file else None
        }
        # Enhanced progress information, when the status object provides it
        if hasattr(processing_status, 'current_stage') and processing_status.current_stage:
            response_data['current_stage'] = processing_status.current_stage
        if hasattr(processing_status, 'estimated_remaining_time'):
            response_data['estimated_remaining_time'] = processing_status.estimated_remaining_time
            # Project an absolute completion timestamp from the remaining
            # seconds (timedelta is imported at module level; the previous
            # redundant local re-import was removed)
            if processing_status.estimated_remaining_time > 0:
                estimated_completion = datetime.now() + timedelta(seconds=processing_status.estimated_remaining_time)
                response_data['estimated_completion_time'] = estimated_completion.isoformat()
        # Add timing information
        if processing_status.start_time:
            response_data['started_at'] = processing_status.start_time.isoformat()
            if processing_status.status in ['COMPLETED', 'ERROR']:
                # BUGFIX: end_time was previously read as a plain attribute and
                # raised AttributeError (-> 500) on status objects without it;
                # use the same getattr guard as every other optional field here
                end_time = getattr(processing_status, 'completed_at', None) or getattr(processing_status, 'end_time', None)
                if end_time:
                    response_data['completed_at'] = end_time.isoformat()
                    processing_time = (end_time - processing_status.start_time).total_seconds()
                    response_data['processing_time'] = processing_time
            else:
                # Still running: report elapsed time so far
                current_time = datetime.now()
                processing_time = (current_time - processing_status.start_time).total_seconds()
                response_data['current_processing_time'] = processing_time
        # Add detailed progress analysis if requested
        if include_detailed:
            detailed_info = {
                'operation_velocity': getattr(processing_status, 'operation_velocity', 0.0),
                'confidence_level': getattr(processing_status, 'confidence_level', 0.0),
                'performance_metrics': {},
                'stage_history': []
            }
            # Merge any richer data the status object collected while running
            if hasattr(processing_status, 'detailed_info') and processing_status.detailed_info:
                detailed_info.update(processing_status.detailed_info)
            response_data['detailed_analysis'] = detailed_info
        # Add performance recommendations if requested
        if include_recommendations:
            recommendations = []
            # Generate recommendations based on current status
            if processing_status.status == 'PROCESSING':
                if hasattr(processing_status, 'detailed_info') and processing_status.detailed_info:
                    recommendations.extend(processing_status.detailed_info.get('recommendations', []))
                if not recommendations:
                    recommendations = [
                        "Operation is in progress - avoid interrupting the process",
                        "Monitor system resources during mesh generation",
                        "Estimated completion time is based on current performance"
                    ]
            elif processing_status.status == 'COMPLETED':
                recommendations = [
                    "Mesh generation completed successfully",
                    "Review quality metrics and visualization",
                    "Consider exporting mesh files if needed"
                ]
            elif processing_status.status == 'ERROR':
                recommendations = [
                    "Check error message for specific issues",
                    "Verify input file format and content",
                    "Consider adjusting mesh parameters and retrying"
                ]
            else:
                recommendations = [
                    "Ready to start mesh generation",
                    "Upload a valid STEP file to begin"
                ]
            response_data['recommendations'] = recommendations
        # Add error information if failed
        if processing_status.status == 'ERROR' and processing_status.error_message:
            response_data['error_message'] = processing_status.error_message
        # Add progress quality indicators
        response_data['progress_quality'] = {
            'data_source': 'real_time_tracking',
            'confidence_available': hasattr(processing_status, 'confidence_level'),
            'detailed_analysis_available': hasattr(processing_status, 'detailed_info'),
            'time_estimation_available': hasattr(processing_status, 'estimated_remaining_time')
        }
        return jsonify(response_data), 200
    except Exception as e:
        current_app.logger.error(f"Progress check error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get progress: {str(e)}'
        }), 500
@api_bp.route('/mesh/cancel', methods=['POST'])
def cancel_mesh_generation():
    """
    Cancel ongoing mesh generation.

    POST /api/mesh/cancel
    """
    try:
        status = state_manager.get_processing_status()
        if status.status != 'PROCESSING':
            payload = {
                'success': False,
                'error': f'No processing to cancel. Current status: {status.status}'
            }
            return jsonify(payload), 400
        # Flag the cancellation; the background worker is expected to notice
        # the state change and stop gracefully
        state_manager.set_processing_status('CANCELLED', 'Mesh generation cancelled by user')
        current_app.logger.info("Mesh generation cancellation requested")
        payload = {
            'success': True,
            'message': 'Mesh generation cancellation requested',
            'cancelled_at': datetime.now().isoformat()
        }
        return jsonify(payload), 200
    except Exception as e:
        current_app.logger.error(f"Cancellation error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to cancel: {str(e)}'
        }), 500
# Helper functions for detailed result processing
def _get_detailed_quality_info(mesh_result):
    """
    Build a detailed quality report for a mesh result.

    Args:
        mesh_result: MeshResult object
    Returns:
        Dictionary with detailed quality information (or an error payload
        if anything goes wrong while reading the result).
    """
    try:
        elements = mesh_result.element_count
        nodes = mesh_result.node_count
        element_quality = {
            'score': mesh_result.quality_score,
            'status': mesh_result.quality_status,
            'threshold': 0.2,
            'description': 'Minimum element quality measure'
        }
        mesh_density = {
            'elements_per_volume': elements / 1000 if elements > 0 else 0,
            'nodes_per_element': nodes / elements if elements > 0 else 0,
            'description': 'Mesh density characteristics'
        }
        return {
            'overall_score': mesh_result.quality_score,
            'overall_status': mesh_result.quality_status,
            'quality_breakdown': {
                'element_quality': element_quality,
                'mesh_density': mesh_density
            },
            'recommendations': _get_quality_recommendations(mesh_result),
            'quality_metrics': {
                'element_count': elements,
                'node_count': nodes,
                'quality_score': mesh_result.quality_score,
                'generation_time': mesh_result.generation_time
            }
        }
    except Exception as e:
        current_app.logger.error(f"Error getting detailed quality info: {str(e)}")
        return {
            'error': f'Failed to get detailed quality info: {str(e)}',
            'overall_score': mesh_result.quality_score if mesh_result else 0,
            'overall_status': mesh_result.quality_status if mesh_result else 'unknown'
        }
def _get_quality_recommendations(mesh_result):
"""
Generate quality improvement recommendations
Args:
mesh_result: MeshResult object
Returns:
List of recommendation strings
"""
recommendations = []
try:
if mesh_result.quality_score < 50:
recommendations.append("Consider reducing global element size for better quality")
recommendations.append("Add local refinement to high-curvature areas")
elif mesh_result.quality_score < 70:
recommendations.append("Mesh quality is acceptable but could be improved")
recommendations.append("Consider adding inflation layers for better boundary resolution")
else:
recommendations.append("Excellent mesh quality achieved")
recommendations.append("Mesh is suitable for accurate analysis")
if mesh_result.element_count > 100000:
recommendations.append("High element count - consider optimizing for computational efficiency")
elif mesh_result.element_count < 10000:
recommendations.append("Low element count - consider refining for better accuracy")
return recommendations
except Exception as e:
current_app.logger.error(f"Error generating recommendations: {str(e)}")
return ["Unable to generate recommendations due to error"]
def _get_visualization_info():
    """
    Collect visualization capability info and try to render a mesh preview.

    Returns:
        Dictionary with visualization information; on total failure an
        error payload with empty view/format/image lists.
    """
    try:
        exporter = VisualizationExporter(output_dir="static/visualizations")
        info = {
            'available_views': exporter.get_available_views(),
            'available_formats': exporter.get_available_formats(),
            'default_settings': {
                'width': 1280,
                'height': 720,
                'background': 'white',
                'camera_view': 'isometric',
                'format': 'PNG'
            },
            'images': [],
            'export_summary': exporter.get_export_summary()
        }
        # Best effort: render a small preview; a failure is recorded in the
        # payload rather than raised to the caller
        try:
            preview_settings = VisualizationSettings(
                width=800,
                height=600,
                camera_view='isometric',
                background_color='white'
            )
            preview = exporter.export_mesh_image(
                filename='current_mesh_preview.png',
                settings=preview_settings
            )
            if preview.success:
                info['images'].append({
                    'type': 'mesh_preview',
                    'path': preview.image_path,
                    'size': preview.image_size,
                    'file_size': preview.file_size,
                    'description': 'Current mesh visualization'
                })
        except Exception as img_error:
            current_app.logger.warning(f"Could not generate mesh preview: {str(img_error)}")
            info['images'].append({
                'type': 'mesh_preview',
                'error': str(img_error),
                'description': 'Mesh preview generation failed'
            })
        return info
    except Exception as e:
        current_app.logger.error(f"Error getting visualization info: {str(e)}")
        return {
            'error': f'Failed to get visualization info: {str(e)}',
            'available_views': [],
            'available_formats': [],
            'images': []
        }
def _format_result_summary(result_data):
"""
Format result data as a concise summary
Args:
result_data: Complete result data dictionary
Returns:
Dictionary with summary information
"""
try:
basic_info = result_data.get('basic_info', {})
processing_info = result_data.get('processing_info', {})
file_info = result_data.get('file_info', {})
summary = {
'mesh_statistics': {
'elements': basic_info.get('element_count', 0),
'nodes': basic_info.get('node_count', 0),
'quality_score': basic_info.get('quality_score', 0),
'quality_status': basic_info.get('quality_status', 'unknown')
},
'processing_summary': {
'status': processing_info.get('status', 'unknown'),
'total_time': processing_info.get('total_time', 0),
'completed': processing_info.get('status') == 'completed'
},
'file_summary': {
'filename': file_info.get('filename', 'unknown'),
'file_size_mb': round(file_info.get('file_size', 0) / (1024 * 1024), 2)
},
'success_indicators': {
'mesh_generated': basic_info.get('element_count', 0) > 0,
'quality_acceptable': basic_info.get('quality_score', 0) >= 50,
'processing_completed': processing_info.get('status') == 'completed'
}
}
return summary
except Exception as e:
current_app.logger.error(f"Error formatting result summary: {str(e)}")
return {
'error': f'Failed to format summary: {str(e)}',
'mesh_statistics': {'elements': 0, 'nodes': 0, 'quality_score': 0},
'processing_summary': {'status': 'error', 'completed': False},
'success_indicators': {'mesh_generated': False, 'quality_acceptable': False, 'processing_completed': False}
}
@api_bp.route('/mesh/visualization', methods=['GET'])
def get_mesh_visualization():
    """
    Generate and return simple mesh visualization.

    GET /api/mesh/visualization
    Query parameters:
    - width: int - Image width (default: 800)
    - height: int - Image height (default: 600)
    - regenerate: bool - Force regenerate image (default: false)

    Returns 400 for invalid parameters or wrong processing state, 404 when
    no mesh result exists, 200 with visualization metadata otherwise.
    """
    try:
        # Parse query parameters. BUGFIX: a malformed number is a client
        # error (400); previously int() raised and produced a generic 500.
        try:
            width = int(request.args.get('width', 800))
            height = int(request.args.get('height', 600))
        except (TypeError, ValueError):
            return jsonify({
                'success': False,
                'error': 'width and height must be integers'
            }), 400
        regenerate = request.args.get('regenerate', 'false').lower() == 'true'
        # Check if mesh generation is completed
        processing_status = state_manager.get_processing_status()
        if processing_status.status != 'COMPLETED':
            return jsonify({
                'success': False,
                'error': f'Cannot generate visualization. Current status: {processing_status.status}',
                'suggestion': 'Complete mesh generation first'
            }), 400
        # Get mesh result to check for existing visualization
        mesh_result = state_manager.get_mesh_result()
        if mesh_result is None:
            return jsonify({
                'success': False,
                'error': 'No mesh result available for visualization'
            }), 404
        # Reuse the image produced during mesh generation unless regeneration
        # was requested (os is imported at module level; the redundant local
        # `import os` was removed)
        if hasattr(mesh_result, 'visualization_image') and mesh_result.visualization_image and not regenerate:
            if os.path.exists(mesh_result.visualization_image):
                file_size = os.path.getsize(mesh_result.visualization_image)
                return jsonify({
                    'success': True,
                    'visualization': {
                        'image_path': mesh_result.visualization_image,
                        'image_size': (width, height),  # Default size from generation
                        'file_size': file_size,
                        'export_time': 0.0,  # Already generated
                        'source': 'mesh_generation',
                        'settings': {
                            'width': width,
                            'height': height,
                            'format': 'PNG'
                        }
                    },
                    'message': 'Using visualization from mesh generation'
                }), 200
        # For now, simulate visualization generation
        # In a full implementation, you would need access to the active ANSYS session
        visualization_result = {
            'success': True,
            'visualization': {
                'image_path': 'static/visualizations/mesh_preview.png',
                'image_size': (width, height),
                'file_size': 256000,  # Simulated 256KB
                'export_time': 1.5,
                'source': 'on_demand',
                'settings': {
                    'width': width,
                    'height': height,
                    'format': 'PNG'
                }
            },
            'message': 'Mesh visualization generated successfully',
            'note': 'Visualization generated - requires active ANSYS session for real-time generation'
        }
        return jsonify(visualization_result), 200
    except Exception as e:
        current_app.logger.error(f"Visualization generation error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to generate visualization: {str(e)}'
        }), 500
@api_bp.route('/mesh/export', methods=['POST'])
def export_mesh_data():
    """
    Export mesh data in various formats.

    POST /api/mesh/export
    JSON body:
    {
        "format": "json|summary|csv",
        "include_visualization": bool,
        "include_quality_details": bool
    }
    """
    try:
        payload = request.get_json() or {}
        export_format = payload.get('format', 'json').lower()
        include_visualization = payload.get('include_visualization', False)
        include_quality_details = payload.get('include_quality_details', True)

        mesh_result = state_manager.get_mesh_result()
        if mesh_result is None:
            return jsonify({
                'success': False,
                'error': 'No mesh result available for export'
            }), 404

        export_data = {
            'export_info': {
                'format': export_format,
                'exported_at': datetime.now().isoformat(),
                'include_visualization': include_visualization,
                'include_quality_details': include_quality_details
            },
            'mesh_data': mesh_result.to_dict()
        }
        if include_quality_details:
            export_data['quality_details'] = _get_detailed_quality_info(mesh_result)
        if include_visualization:
            export_data['visualization'] = _get_visualization_info()

        if export_format == 'summary':
            return jsonify({
                'success': True,
                'export': _format_result_summary(export_data)
            }), 200
        if export_format == 'csv':
            # Row-oriented data that a client can serialize straight to CSV
            rows = [
                ['Metric', 'Value'],
                ['Elements', mesh_result.element_count],
                ['Nodes', mesh_result.node_count],
                ['Quality Score', mesh_result.quality_score],
                ['Quality Status', mesh_result.quality_status],
                ['Generation Time', mesh_result.generation_time]
            ]
            return jsonify({
                'success': True,
                'export': {'mesh_statistics': rows},
                'format': 'csv'
            }), 200
        return jsonify({
            'success': True,
            'export': export_data
        }), 200
    except Exception as e:
        current_app.logger.error(f"Export error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Export failed: {str(e)}'
        }), 500
# Download endpoints
@api_bp.route('/mesh/download/mesh', methods=['GET'])
def download_mesh_file():
    """
    Download mesh file by copying from ANSYS temp directories
    GET /api/mesh/download/mesh

    Searches well-known ANSYS temp locations for a recent .mechdb database
    and streams a timestamped copy; if none is found, a plain-text
    placeholder describing the situation is sent instead.
    """
    try:
        import glob
        import shutil
        import time
        current_app.logger.info("Starting mesh file download process...")
        # Ensure results directory exists
        results_dir = Path("results/mesh_files")
        results_dir.mkdir(parents=True, exist_ok=True)
        # Get current file information for naming the download
        current_file = state_manager.get_current_file()
        if current_file:
            step_file_name = Path(current_file.filename).stem
        else:
            step_file_name = "blade"
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_filename = f"{step_file_name}_mesh_{timestamp}.mechdb"
        output_path = results_dir / output_filename
        # Try to find the actual ANSYS .mechdb file using patterns from blade_mesh_cli.py
        mechdb_copied = False
        # Common ANSYS temp directories to search (from blade_mesh_cli.py);
        # Windows-centric paths — presumably this deployment targets Windows
        temp_patterns = [
            os.path.expanduser("~/AppData/Local/Temp/ANSYS.*/AnsysMech*/Project_Mech_Files/*.mechdb"),
            "C:/Users/*/AppData/Local/Temp/ANSYS.*/AnsysMech*/Project_Mech_Files/*.mechdb",
            os.path.expanduser("~/AppData/Local/Temp/ANSYS.*/AnsysMech*/*.mechdb"),
            "C:/temp/ANSYS*/*.mechdb",
            "./temp/*.mechdb"
        ]
        current_app.logger.info("Searching for ANSYS mesh database file in temp directories...")
        for pattern in temp_patterns:
            try:
                mechdb_files = glob.glob(pattern, recursive=True)
                if mechdb_files:
                    # Sort by modification time, get the most recent
                    mechdb_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
                    source_file = mechdb_files[0]
                    # Check if file is recent (within last 24 hours)
                    file_age = time.time() - os.path.getmtime(source_file)
                    if file_age < 86400:  # 24 hours
                        current_app.logger.info(f"Found recent ANSYS database: {os.path.basename(source_file)}")
                        # Copy the file
                        shutil.copy2(source_file, output_path)
                        mechdb_copied = True
                        file_size = os.path.getsize(output_path)
                        current_app.logger.info(f"Mesh database copied: {output_filename} ({file_size:,} bytes)")
                        break
            except Exception as search_error:
                # A failing pattern must not abort the search of the others
                current_app.logger.debug(f"Search pattern failed: {pattern} - {search_error}")
                continue
        if not mechdb_copied:
            current_app.logger.warning("Could not find recent ANSYS .mechdb file, creating informative placeholder")
            # Get mesh result for placeholder content
            mesh_result = state_manager.get_mesh_result()
            # Create a more informative placeholder file
            placeholder_content = f"""ANSYS Mechanical Database Placeholder
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Source File: {current_file.filename if current_file else 'unknown'}
Status: Mesh generation completed but database file not found in ANSYS temp directories
MESH STATISTICS:
Elements: {mesh_result.element_count if mesh_result else 'N/A'}
Nodes: {mesh_result.node_count if mesh_result else 'N/A'}
Quality Score: {mesh_result.quality_score if mesh_result else 'N/A'}
Generation Time: {mesh_result.generation_time if mesh_result else 'N/A'} seconds
NOTE: This is a placeholder file. In a production environment with ANSYS installed,
the actual .mechdb file would be copied from the ANSYS working directory.
SEARCHED LOCATIONS:
{chr(10).join(temp_patterns)}
To get the actual mesh database file:
1. Locate the ANSYS Mechanical project directory
2. Find the .mechdb file in the Project_Mech_Files subfolder
3. The file will be named something like "file.mechdb" or "solver_files.mechdb"
"""
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(placeholder_content)
            current_app.logger.info(f"Placeholder created: {output_filename}")
        # Check if we have any mesh files to send
        if output_path.exists():
            file_size = os.path.getsize(output_path)
            from flask import send_file
            return send_file(
                output_path,
                as_attachment=True,
                download_name=output_filename,
                mimetype='application/octet-stream'
            )
        else:
            return jsonify({
                'success': False,
                'error': 'Failed to create mesh file for download'
            }), 500
    except Exception as e:
        current_app.logger.error(f"Mesh download error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Download failed: {str(e)}'
        }), 500
@api_bp.route('/mesh/download/image', methods=['GET'])
def download_mesh_image():
    """
    Download mesh visualization image with proper content
    GET /api/mesh/download/image

    Reuses the newest image under frontend/static/visualizations when it is
    non-trivial (>1KB) and fresh (<1 hour old); otherwise attempts to render
    a new one before sending it as an attachment.
    """
    try:
        current_app.logger.info("Starting mesh image download process...")
        # Ensure visualization directory exists
        viz_dir = Path("frontend/static/visualizations")
        viz_dir.mkdir(parents=True, exist_ok=True)
        # Check for existing visualization images first
        image_files = (
            list(viz_dir.glob("*.png")) +
            list(viz_dir.glob("*.jpg")) +
            list(viz_dir.glob("*.jpeg"))
        )
        latest_image = None
        if image_files:
            # Get the most recent image file
            latest_image = max(image_files, key=os.path.getmtime)
            # Check if the image file is not empty and recent (within 1 hour)
            if latest_image.stat().st_size > 1000:  # More than 1KB
                file_age = time.time() - latest_image.stat().st_mtime
                if file_age < 3600:  # Within 1 hour
                    current_app.logger.info(f"Found recent valid image: {latest_image.name} ({latest_image.stat().st_size} bytes)")
                else:
                    current_app.logger.info(f"Image file is too old: {file_age/60:.1f} minutes")
                    latest_image = None
            else:
                current_app.logger.info(f"Image file is too small: {latest_image.stat().st_size} bytes")
                latest_image = None
        # If no valid image found, generate a new one
        if not latest_image:
            current_app.logger.info("No valid visualization image found, generating new one...")
            try:
                from backend.utils.visualization_exporter import VisualizationExporter, VisualizationSettings
                # Create visualization exporter
                viz_exporter = VisualizationExporter(
                    mechanical_session=None,  # No active session available
                    output_dir=str(viz_dir)
                )
                # Generate new image with proper settings
                viz_settings = VisualizationSettings(
                    width=1280,
                    height=720,
                    image_format="PNG",
                    camera_view="isometric",
                    show_edges=True,
                    background_color="white"
                )
                # Generate filename with timestamp
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"mesh_visualization_{timestamp}.png"
                viz_result = viz_exporter.export_mesh_image(
                    filename=filename,
                    settings=viz_settings
                )
                if viz_result.success and viz_result.image_path:
                    latest_image = Path(viz_result.image_path)
                    # NOTE(review): "(unknown)" here looks like a redacted
                    # filename — presumably should log the generated path; verify
                    current_app.logger.info(f"Generated new visualization image: (unknown) ({viz_result.file_size} bytes)")
                else:
                    current_app.logger.error(f"Failed to generate visualization: {viz_result.error_message}")
                    raise Exception(f"Visualization generation failed: {viz_result.error_message}")
            except ImportError as e:
                current_app.logger.error(f"Visualization exporter not available: {e}")
                raise Exception("Visualization system not available")
            except Exception as e:
                # Re-raised so the outer handler converts this to a 500
                current_app.logger.error(f"Error generating visualization: {e}")
                raise Exception(f"Failed to generate visualization: {str(e)}")
        # Send the image file
        if latest_image and latest_image.exists():
            file_size = latest_image.stat().st_size
            current_app.logger.info(f"Sending image file: {latest_image.name} ({file_size} bytes)")
            from flask import send_file
            return send_file(
                latest_image,
                as_attachment=True,
                download_name=f"mesh_visualization_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png",
                mimetype='image/png'
            )
        else:
            return jsonify({
                'success': False,
                'error': 'No visualization images available and unable to generate new one'
            }), 404
    except Exception as e:
        current_app.logger.error(f"Image download error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Download failed: {str(e)}'
        }), 500
# NOTE(review): the SOURCE chunk is truncated here; '@api_' below is the
# fused start of the next route's decorator.
@api_
bp.route('/mesh/files', methods=['GET'])
def get_mesh_files():
    """
    List mesh files exported for the current mesh result.

    GET /api/mesh/files

    Query parameters:
        format: str - only return files of this format (optional)
        recent: bool - only return files created in the last 24 h (default: false)

    Returns:
        200 with a (possibly empty) file list, 404 when no mesh result exists,
        500 on unexpected errors.
    """
    try:
        format_filter = request.args.get('format', None)
        recent_only = request.args.get('recent', 'false').lower() == 'true'

        # Get mesh result to check for exported files
        mesh_result = state_manager.get_mesh_result()
        if mesh_result is None:
            return jsonify({
                'success': False,
                'message': 'No mesh result available',
                'files': []
            }), 404

        exported_files = getattr(mesh_result, 'exported_files', {})
        if not exported_files:
            return jsonify({
                'success': True,
                'message': 'No mesh files have been exported yet',
                'files': [],
                'total_files': 0
            }), 200

        # Build one info record per exported file; files missing from disk are
        # reported as unavailable instead of failing the whole request.
        files_info = []
        for file_format, file_path in exported_files.items():
            if format_filter and file_format != format_filter:
                continue
            try:
                if os.path.exists(file_path):
                    file_stat = os.stat(file_path)
                    file_info = {
                        'format': file_format,
                        'filename': os.path.basename(file_path),
                        'file_path': file_path,
                        'file_size': file_stat.st_size,
                        'file_size_mb': round(file_stat.st_size / (1024 * 1024), 2),
                        'created_at': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                        'modified_at': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                        'available': True
                    }
                else:
                    file_info = {
                        'format': file_format,
                        'filename': os.path.basename(file_path) if file_path else 'unknown',
                        'file_path': file_path,
                        'file_size': 0,
                        'file_size_mb': 0,
                        'created_at': None,
                        'modified_at': None,
                        'available': False,
                        'error': 'File not found'
                    }
                files_info.append(file_info)
            except Exception as file_error:
                # Bug fix: this module has no bare `logger`; use the app logger.
                current_app.logger.warning(f"Error getting file info for {file_path}: {str(file_error)}")
                files_info.append({
                    'format': file_format,
                    'filename': os.path.basename(file_path) if file_path else 'unknown',
                    'file_path': file_path,
                    'available': False,
                    'error': str(file_error)
                })

        # Sort newest first. 'created_at' is None for unavailable files, so
        # coalesce to '' to keep the comparison str-vs-str (None < str raises).
        files_info.sort(key=lambda x: x.get('created_at') or '', reverse=True)

        # Filter recent files if requested (last 24 hours)
        if recent_only:
            cutoff_time = datetime.now() - timedelta(hours=24)
            files_info = [
                f for f in files_info
                if f.get('created_at') and datetime.fromisoformat(f['created_at']) > cutoff_time
            ]

        return jsonify({
            'success': True,
            'files': files_info,
            'total_files': len(files_info),
            'available_formats': list(set(f['format'] for f in files_info if f.get('available', False))),
            'filter_applied': {
                'format': format_filter,
                'recent_only': recent_only
            }
        }), 200
    except Exception as e:
        current_app.logger.error(f"Get mesh files error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get mesh files: {str(e)}'
        }), 500
@api_bp.route('/mesh/files/<file_format>', methods=['GET'])
def download_mesh_file(file_format):
    """
    Download an exported mesh file in a specific format.

    GET /api/mesh/files/<format>

    Path parameters:
        file_format: format of the file to download (cdb, msh, bdf, inp, unv),
            case-insensitive.

    Returns:
        The file as an attachment (200), 400 for an unknown format, 404 when
        the mesh result or file is missing, 500 on send failures.
    """
    try:
        # Validate format (case-insensitive)
        valid_formats = ['cdb', 'msh', 'bdf', 'inp', 'unv']
        if file_format.lower() not in valid_formats:
            return jsonify({
                'success': False,
                'error': f'Invalid format. Supported formats: {", ".join(valid_formats)}'
            }), 400
        # Bug fix: validation accepted e.g. "CDB" but the lookup below was
        # case-sensitive; normalize to the lowercase keys used at export time.
        file_format = file_format.lower()

        mesh_result = state_manager.get_mesh_result()
        if mesh_result is None:
            return jsonify({
                'success': False,
                'error': 'No mesh result available'
            }), 404

        exported_files = getattr(mesh_result, 'exported_files', {})
        if file_format not in exported_files:
            return jsonify({
                'success': False,
                'error': f'No {file_format} file available. Available formats: {list(exported_files.keys())}'
            }), 404

        file_path = exported_files[file_format]
        if not os.path.exists(file_path):
            return jsonify({
                'success': False,
                'error': f'File not found: {file_path}'
            }), 404

        from flask import send_file
        try:
            return send_file(
                file_path,
                as_attachment=True,
                download_name=os.path.basename(file_path),
                mimetype='application/octet-stream'
            )
        except Exception as send_error:
            # Bug fix: this module has no bare `logger`; use the app logger.
            current_app.logger.error(f"File send error: {str(send_error)}")
            return jsonify({
                'success': False,
                'error': f'Failed to send file: {str(send_error)}'
            }), 500
    except Exception as e:
        current_app.logger.error(f"Download mesh file error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to download file: {str(e)}'
        }), 500
@api_bp.route('/mesh/export', methods=['POST'])
def export_mesh_files():
    """
    Export mesh files to specified formats (currently not implemented).

    POST /api/mesh/export

    JSON body:
        {
            "formats": ["cdb", "msh", "bdf"],   // optional, defaults to ["cdb", "msh"]
            "filename_prefix": "custom_mesh"    // optional, currently unused
        }

    Returns:
        501 (files are exported automatically during generation), 400 for bad
        input or wrong processing state, 404 without a mesh result.
    """
    try:
        data = request.get_json() or {}
        formats = data.get('formats', ['cdb', 'msh'])
        filename_prefix = data.get('filename_prefix', None)

        # Validate requested formats (case-insensitive)
        valid_formats = ['cdb', 'msh', 'bdf', 'inp', 'unv']
        invalid_formats = [f for f in formats if f.lower() not in valid_formats]
        if invalid_formats:
            return jsonify({
                'success': False,
                'error': f'Invalid formats: {invalid_formats}. Supported: {valid_formats}'
            }), 400

        # NOTE(review): this handler compares against 'completed' while others in
        # this module use 'COMPLETED' -- confirm the canonical status casing.
        processing_status = state_manager.get_processing_status()
        if processing_status.status != 'completed':
            return jsonify({
                'success': False,
                'error': f'Cannot export mesh. Current status: {processing_status.status}'
            }), 400

        mesh_result = state_manager.get_mesh_result()
        if mesh_result is None:
            return jsonify({
                'success': False,
                'error': 'No mesh result available for export'
            }), 404

        # Manual export requires a live ANSYS session, which is not retained
        # between requests; point callers at the automatically exported files.
        try:
            return jsonify({
                'success': False,
                'error': 'Manual mesh export not yet implemented. Mesh files are automatically exported during generation.',
                'suggestion': 'Use GET /api/mesh/files to see available exported files',
                'available_files': list(getattr(mesh_result, 'exported_files', {}).keys())
            }), 501  # Not Implemented
        except Exception as export_error:
            # Bug fix: this module has no bare `logger`; use the app logger.
            current_app.logger.error(f"Mesh export error: {str(export_error)}")
            return jsonify({
                'success': False,
                'error': f'Export failed: {str(export_error)}'
            }), 500
    except Exception as e:
        current_app.logger.error(f"Export mesh files error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to export mesh files: {str(e)}'
        }), 500
@api_bp.route('/mesh/formats', methods=['GET'])
def get_supported_formats():
"""
Get list of supported mesh export formats
GET /api/mesh/formats
"""
try:
# Import here to avoid circular imports
from backend.pymechanical.mesh_file_exporter import RealMeshFileExporter
# Create a temporary exporter to get format information
# In a real implementation, this might be cached or stored as static data
formats_info = [
{
'format': 'cdb',
'name': 'ANSYS Database',
'description': 'ANSYS native database format (.cdb)',
'extension': '.cdb',
'supported': True
},
{
'format': 'msh',
'name': 'ANSYS Mesh',
'description': 'ANSYS mesh format (.msh)',
'extension': '.msh',
'supported': True
},
{
'format': 'bdf',
'name': 'Nastran Bulk Data',
'description': 'Nastran bulk data format (.bdf)',
'extension': '.bdf',
'supported': True
},
{
'format': 'inp',
'name': 'Abaqus Input',
'description': 'Abaqus input format (.inp)',
'extension': '.inp',
'supported': True
},
{
'format': 'unv',
'name': 'Universal Format',
'description': 'Universal mesh format (.unv)',
'extension': '.unv',
'supported': True
}
]
return jsonify({
'success': True,
'formats': formats_info,
'total_formats': len(formats_info),
'default_formats': ['cdb', 'msh']
}), 200
except Exception as e:
logger.error(f"Get supported formats error: {str(e)}")
return jsonify({
'success': False,
'error': f'Failed to get supported formats: {str(e)}'
}), 500@api_
bp.route('/mesh/quality/detailed', methods=['GET'])
def get_detailed_quality_metrics():
    """
    Get detailed mesh quality metrics and analysis.

    GET /api/mesh/quality/detailed

    Query parameters:
        include_distributions: bool - include quality value distributions
            (default: false; currently parsed but unused -- see NOTE below)
        include_recommendations: bool - include improvement recommendations
            (default: true; currently parsed but unused -- see NOTE below)
        format: str - response format (json, summary, report) (default: json)

    Returns:
        200 with basic quality data (detailed analysis needs an active ANSYS
        session), 400 in the wrong processing state, 404 without a mesh result.
    """
    try:
        # NOTE(review): these two flags are parsed but never used below --
        # presumably reserved for the full analysis path; confirm.
        include_distributions = request.args.get('include_distributions', 'false').lower() == 'true'
        include_recommendations = request.args.get('include_recommendations', 'true').lower() == 'true'
        response_format = request.args.get('format', 'json').lower()

        # NOTE(review): lowercase 'completed' here vs 'COMPLETED' in other
        # handlers of this module -- confirm the canonical status casing.
        processing_status = state_manager.get_processing_status()
        if processing_status.status != 'completed':
            return jsonify({
                'success': False,
                'error': f'Cannot get quality metrics. Current status: {processing_status.status}'
            }), 400

        mesh_result = state_manager.get_mesh_result()
        if mesh_result is None:
            return jsonify({
                'success': False,
                'error': 'No mesh result available for quality analysis'
            }), 404

        # Detailed per-element analysis needs an active ANSYS session; only the
        # metrics captured at generation time are available here.
        try:
            basic_quality = {
                'success': True,
                'basic_metrics': {
                    'quality_score': mesh_result.quality_score,
                    'quality_status': mesh_result.quality_status,
                    'element_count': mesh_result.element_count,
                    'node_count': mesh_result.node_count,
                    'generation_time': mesh_result.generation_time
                },
                'message': 'Detailed quality analysis requires active ANSYS session',
                'suggestion': 'Use basic quality metrics from mesh generation result',
                'available_data': {
                    'overall_score': mesh_result.quality_score,
                    'status': mesh_result.quality_status,
                    'mesh_size': {
                        'elements': mesh_result.element_count,
                        'nodes': mesh_result.node_count
                    }
                },
                'limitations': [
                    'Detailed quality distributions not available without active ANSYS session',
                    'Element-by-element analysis requires PyMechanical connection',
                    'Real-time quality metrics need active mesh data'
                ]
            }

            if response_format == 'summary':
                return jsonify({
                    'success': True,
                    'summary': {
                        'overall_grade': _get_quality_grade(mesh_result.quality_score),
                        'score': round(mesh_result.quality_score, 1),
                        'total_elements': mesh_result.element_count,
                        'status': mesh_result.quality_status,
                        'analysis_available': False,
                        'reason': 'Requires active ANSYS session for detailed analysis'
                    }
                }), 200
            elif response_format == 'report':
                # Basic markdown report built from generation-time metrics only.
                report_content = f"""# Basic Mesh Quality Report
## Overall Assessment
- **Quality Score**: {mesh_result.quality_score:.1f}/100
- **Status**: {mesh_result.quality_status}
- **Total Elements**: {mesh_result.element_count:,}
- **Total Nodes**: {mesh_result.node_count:,}
- **Generation Time**: {mesh_result.generation_time:.1f}s
## Limitations
This is a basic quality report based on mesh generation results.
For detailed quality analysis including:
- Element quality distributions
- Aspect ratio analysis
- Skewness metrics
- Problem area identification
- Specific recommendations
An active ANSYS Mechanical session is required.
## Next Steps
1. Maintain ANSYS session during analysis
2. Implement session state management
3. Enable real-time quality metric extraction
*Report generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""
                return jsonify({
                    'success': True,
                    'report': {
                        'content': report_content,
                        'format': 'markdown',
                        'generated_at': datetime.now().isoformat()
                    }
                }), 200
            else:
                # Default JSON format
                return jsonify(basic_quality), 200
        except Exception as analysis_error:
            # Bug fix: this module has no bare `logger`; use the app logger.
            current_app.logger.error(f"Quality analysis error: {str(analysis_error)}")
            return jsonify({
                'success': False,
                'error': f'Quality analysis failed: {str(analysis_error)}'
            }), 500
    except Exception as e:
        current_app.logger.error(f"Get detailed quality metrics error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get quality metrics: {str(e)}'
        }), 500
@api_bp.route('/mesh/quality/report', methods=['GET'])
def generate_quality_report():
    """
    Generate and download a mesh quality report.

    GET /api/mesh/quality/report

    Query parameters:
        format: str - report format (markdown, html, pdf) (default: markdown;
            only markdown is implemented)
        detailed: bool - include detailed analysis (default: false; currently
            parsed but unused -- see NOTE below)

    Returns:
        The report as a file attachment (200), 400 for bad input or wrong
        processing state, 404 without a mesh result, 501 for html/pdf.
    """
    try:
        report_format = request.args.get('format', 'markdown').lower()
        # NOTE(review): parsed but never used below -- presumably reserved for
        # the session-backed detailed report; confirm.
        detailed = request.args.get('detailed', 'false').lower() == 'true'

        valid_formats = ['markdown', 'html', 'pdf']
        if report_format not in valid_formats:
            return jsonify({
                'success': False,
                'error': f'Invalid format. Supported formats: {", ".join(valid_formats)}'
            }), 400

        # NOTE(review): lowercase 'completed' here vs 'COMPLETED' in other
        # handlers of this module -- confirm the canonical status casing.
        processing_status = state_manager.get_processing_status()
        if processing_status.status != 'completed':
            return jsonify({
                'success': False,
                'error': f'Cannot generate report. Current status: {processing_status.status}'
            }), 400

        mesh_result = state_manager.get_mesh_result()
        if mesh_result is None:
            return jsonify({
                'success': False,
                'error': 'No mesh result available for report generation'
            }), 404

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if report_format == 'markdown':
            report_content = f"""# Mesh Quality Report
## Basic Information
- **Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- **Mesh Elements**: {mesh_result.element_count:,}
- **Mesh Nodes**: {mesh_result.node_count:,}
- **Generation Time**: {mesh_result.generation_time:.1f} seconds
- **Quality Score**: {mesh_result.quality_score:.1f}/100
- **Quality Status**: {mesh_result.quality_status}
## Quality Assessment
- **Overall Grade**: {_get_quality_grade(mesh_result.quality_score)}
- **Mesh Density**: {mesh_result.element_count / 1000:.1f}K elements
## Exported Files
"""
            # Append a table of exported file formats, if any.
            exported_files = getattr(mesh_result, 'exported_files', {})
            if exported_files:
                report_content += "| Format | Status |\n|--------|--------|\n"
                for file_format in exported_files:
                    report_content += f"| {file_format.upper()} | Available |\n"
            else:
                report_content += "No mesh files have been exported.\n"
            report_content += f"""
## Limitations
This is a basic quality report. For detailed analysis including:
- Element quality distributions
- Aspect ratio and skewness metrics
- Problem area identification
- Specific improvement recommendations
Please ensure an active ANSYS Mechanical session is maintained during analysis.
---
*Generated by CAE Mesh Generator*
"""
            # Bug fix: the previous implementation wrote a temp file and
            # unlinked it in a `finally` while send_file still held it open
            # (fails on Windows, leaked on error). Serve from memory instead.
            from io import BytesIO
            from flask import send_file
            return send_file(
                BytesIO(report_content.encode('utf-8')),
                as_attachment=True,
                download_name=f'mesh_quality_report_{timestamp}.md',
                mimetype='text/markdown'
            )
        else:
            # HTML and PDF formats not implemented yet
            return jsonify({
                'success': False,
                'error': f'{report_format.upper()} format not yet implemented',
                'available_formats': ['markdown'],
                'suggestion': 'Use format=markdown for now'
            }), 501
    except Exception as e:
        # Bug fix: this module has no bare `logger`; use the app logger.
        current_app.logger.error(f"Generate quality report error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to generate report: {str(e)}'
        }), 500
@api_bp.route('/mesh/quality/thresholds', methods=['GET'])
def get_quality_thresholds():
    """
    Get mesh quality thresholds and grading criteria.

    GET /api/mesh/quality/thresholds

    Returns:
        200 with the configured thresholds plus human-readable descriptions,
        500 when the configuration cannot be loaded.
    """
    try:
        from config import MESH_QUALITY_THRESHOLDS

        thresholds = {
            'success': True,
            # Copy so callers cannot mutate the module-level config mapping.
            'thresholds': dict(MESH_QUALITY_THRESHOLDS),
            'descriptions': {
                'min_element_quality': 'Minimum acceptable element quality (0-1 scale)',
                'max_aspect_ratio': 'Maximum acceptable aspect ratio',
                'max_skewness': 'Maximum acceptable skewness (0-1 scale)',
                'min_orthogonal_quality': 'Minimum acceptable orthogonal quality (0-1 scale)'
            },
            'quality_grades': {
                'EXCELLENT': 'Quality score >= 80',
                'GOOD': 'Quality score 60-79',
                'ACCEPTABLE': 'Quality score 40-59',
                'POOR': 'Quality score 20-39',
                'CRITICAL': 'Quality score < 20'
            },
            'recommendations': {
                'element_quality': 'Higher values indicate better element shape',
                'aspect_ratio': 'Lower values indicate more regular element shapes',
                'skewness': 'Lower values indicate less element distortion',
                'orthogonal_quality': 'Higher values indicate better element orthogonality'
            }
        }
        return jsonify(thresholds), 200
    except Exception as e:
        # Bug fix: this module has no bare `logger`; use the app logger.
        current_app.logger.error(f"Get quality thresholds error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get quality thresholds: {str(e)}'
        }), 500
def _get_quality_grade(quality_score: float) -> str:
"""
Get quality grade based on score
Args:
quality_score: Quality score (0-100)
Returns:
Quality grade string
"""
if quality_score >= 80:
return "EXCELLENT"
elif quality_score >= 60:
return "GOOD"
elif quality_score >= 40:
return "ACCEPTABLE"
elif quality_score >= 20:
return "POOR"
else:
return "CRITICAL"
],
'processing_info': [
['Metric', 'Value'],
['Status', export_data.get('processing_info', {}).get('status', 'unknown')],
['Total Time', export_data.get('processing_info', {}).get('total_time', 0)]
]
}
return jsonify({
'success': True,
'export': csv_data
}), 200
else:
# Default JSON format
return jsonify({
'success': True,
'export': export_data
}), 200
except Exception as e:
current_app.logger.error(f"Export error: {str(e)}")
return jsonify({
'success': False,
'error': f'Failed to export mesh data: {str(e)}'
}), 500
# NOTE(review): this handler duplicated both the '/mesh/quality/detailed' URL
# and the function name `get_detailed_quality_metrics` defined earlier in this
# module; the duplicate endpoint name made Flask raise at blueprint
# registration (app crashed on import). The function is renamed so registration
# succeeds, but the earlier route still shadows this URL -- the two
# implementations should be consolidated into one.
@api_bp.route('/mesh/quality/detailed', methods=['GET'])
def get_detailed_quality_metrics_simulated():
    """
    Get detailed (simulated) mesh quality metrics with comprehensive analysis.

    GET /api/mesh/quality/detailed

    Query parameters:
        metric: str - specific metric to analyze (optional)
        include_distributions: bool - include statistical distributions (default: true)
        include_recommendations: bool - include improvement recommendations (default: true)

    Returns:
        200 with simulated detailed metrics, 400 for wrong state or an unknown
        metric, 404 without a mesh result.
    """
    try:
        metric_filter = request.args.get('metric', None)
        include_distributions = request.args.get('include_distributions', 'true').lower() == 'true'
        include_recommendations = request.args.get('include_recommendations', 'true').lower() == 'true'

        # NOTE(review): uppercase 'COMPLETED' here vs 'completed' in other
        # handlers of this module -- confirm the canonical status casing.
        processing_status = state_manager.get_processing_status()
        if processing_status.status != 'COMPLETED':
            return jsonify({
                'success': False,
                'error': f'Cannot analyze quality. Current status: {processing_status.status}',
                'suggestion': 'Complete mesh generation first'
            }), 400

        mesh_result = state_manager.get_mesh_result()
        if mesh_result is None:
            return jsonify({
                'success': False,
                'error': 'No mesh result available for quality analysis'
            }), 404

        # Simulated analysis: a real implementation needs a live ANSYS session
        # and quality checker, which are not retained between requests.
        element_count = getattr(mesh_result, 'element_count', 0)
        detailed_quality_result = {
            'success': True,
            'analysis_type': 'detailed',
            'analysis_time': 2.5,
            'total_elements': element_count,
            'total_nodes': getattr(mesh_result, 'node_count', 0),
            'element_types': {
                'SOLID': element_count
            },
            'overall_quality_score': getattr(mesh_result, 'quality_score', 0.65),
            'overall_status': getattr(mesh_result, 'quality_status', 'GOOD'),
            'failed_elements_total': max(0, int(element_count * 0.05)),
            'warning_elements_total': max(0, int(element_count * 0.15)),
            'analyzed_at': datetime.now().isoformat(),
            'recommendations': [
                "Mesh quality is generally good for turbomachinery applications",
                "Consider local refinement in high-stress areas for improved accuracy",
                "Monitor aspect ratios in thin sections of the blade"
            ]
        }

        if include_distributions:
            # Canned per-metric summary statistics and (failed, warning, good)
            # element fractions; one table instead of four near-identical dicts.
            stat_keys = (
                'min_value', 'max_value', 'mean_value', 'median_value',
                'std_deviation', 'percentile_25', 'percentile_75',
                'percentile_95', 'percentile_99'
            )
            metric_specs = {
                'element_quality': ((0.15, 0.95, 0.65, 0.68, 0.18, 0.52, 0.78, 0.88, 0.92), (0.05, 0.15, 0.80)),
                'aspect_ratio': ((1.2, 25.8, 8.5, 6.2, 5.2, 3.8, 11.2, 18.5, 22.1), (0.03, 0.12, 0.85)),
                'skewness': ((0.02, 0.78, 0.32, 0.28, 0.15, 0.18, 0.42, 0.58, 0.68), (0.02, 0.18, 0.80)),
                'orthogonal_quality': ((0.12, 0.98, 0.72, 0.75, 0.16, 0.62, 0.85, 0.92, 0.96), (0.04, 0.16, 0.80)),
            }
            distributions = {}
            for m_type, (stats, fractions) in metric_specs.items():
                entry = {'metric_type': m_type}
                entry.update(zip(stat_keys, stats))
                entry['total_elements'] = element_count
                entry['failed_elements'] = max(0, int(element_count * fractions[0]))
                entry['warning_elements'] = max(0, int(element_count * fractions[1]))
                entry['good_elements'] = max(0, int(element_count * fractions[2]))
                distributions[m_type] = entry
            detailed_quality_result['quality_distributions'] = distributions

        # Narrow the distributions to the requested metric, if any.
        if metric_filter and include_distributions:
            if metric_filter in detailed_quality_result['quality_distributions']:
                detailed_quality_result['quality_distributions'] = {
                    metric_filter: detailed_quality_result['quality_distributions'][metric_filter]
                }
            else:
                return jsonify({
                    'success': False,
                    'error': f'Metric "{metric_filter}" not available',
                    'available_metrics': list(detailed_quality_result['quality_distributions'].keys())
                }), 400

        if not include_recommendations:
            detailed_quality_result.pop('recommendations', None)

        return jsonify(detailed_quality_result), 200
    except Exception as e:
        current_app.logger.error(f"Detailed quality analysis error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get detailed quality metrics: {str(e)}'
        }), 500
# Simulated per-metric distribution catalogue for the mock quality endpoints.
# Each entry carries the display name, description, value range, the nine
# summary statistics (order matches _QUALITY_STAT_KEYS), the (failed, warning,
# good) element fractions, and the grading thresholds.
_QUALITY_DISTRIBUTION_SPECS = {
    'element_quality': {
        'name': 'Element Quality',
        'description': 'Overall element quality (0-1, higher is better)',
        'range': [0.0, 1.0],
        'statistics': (0.15, 0.95, 0.65, 0.68, 0.18, 0.52, 0.78, 0.88, 0.92),
        'fractions': (0.05, 0.15, 0.80),
        'thresholds': {'excellent': 0.8, 'good': 0.5, 'acceptable': 0.2, 'poor': 0.1},
    },
    'aspect_ratio': {
        'name': 'Aspect Ratio',
        'description': 'Element aspect ratio (1+, lower is better)',
        'range': [1.0, 50.0],
        'statistics': (1.2, 25.8, 8.5, 6.2, 5.2, 3.8, 11.2, 18.5, 22.1),
        'fractions': (0.03, 0.12, 0.85),
        'thresholds': {'excellent': 3.0, 'good': 10.0, 'acceptable': 20.0, 'poor': 50.0},
    },
    'skewness': {
        'name': 'Skewness',
        'description': 'Element skewness (0-1, lower is better)',
        'range': [0.0, 1.0],
        'statistics': (0.02, 0.78, 0.32, 0.28, 0.15, 0.18, 0.42, 0.58, 0.68),
        'fractions': (0.02, 0.18, 0.80),
        'thresholds': {'excellent': 0.25, 'good': 0.5, 'acceptable': 0.8, 'poor': 0.95},
    },
    'orthogonal_quality': {
        'name': 'Orthogonal Quality',
        'description': 'Orthogonal quality (0-1, higher is better)',
        'range': [0.0, 1.0],
        'statistics': (0.12, 0.98, 0.72, 0.75, 0.16, 0.62, 0.85, 0.92, 0.96),
        'fractions': (0.04, 0.16, 0.80),
        'thresholds': {'excellent': 0.8, 'good': 0.5, 'acceptable': 0.15, 'poor': 0.05},
    },
}

# Key order matches the 'statistics' tuples above.
_QUALITY_STAT_KEYS = (
    'min_value', 'max_value', 'mean_value', 'median_value', 'std_deviation',
    'percentile_25', 'percentile_75', 'percentile_95', 'percentile_99'
)


def _sample_quality_metric(metric_type, size):
    """Draw synthetic sample values whose shape mimics the given metric."""
    import numpy as np
    if metric_type == 'element_quality':
        # Beta distribution shifted into [0.15, 0.95]
        return np.random.beta(2, 1.5, size=size) * 0.8 + 0.15
    if metric_type == 'aspect_ratio':
        # Log-normal, clipped to the documented [1, 50] range
        data = np.random.lognormal(mean=1.8, sigma=0.6, size=size)
        return np.clip(data, 1.0, 50.0)
    if metric_type == 'skewness':
        return np.random.beta(1.5, 3, size=size) * 0.8
    # orthogonal_quality: beta skewed towards higher values
    return np.random.beta(3, 1.5, size=size) * 0.8 + 0.1


@api_bp.route('/mesh/quality/distribution/<metric_type>', methods=['GET'])
def get_quality_distribution(metric_type):
    """
    Get the (simulated) quality distribution for a specific metric.

    GET /api/mesh/quality/distribution/<metric_type>

    Path parameters:
        metric_type: one of element_quality, aspect_ratio, skewness,
            orthogonal_quality

    Query parameters:
        format: str - response format (json, histogram) (default: json)
        bins: int - number of histogram bins, >= 1 (default: 20)

    Returns:
        200 with the distribution data, 400 for bad input or wrong processing
        state, 404 without a mesh result.
    """
    try:
        response_format = request.args.get('format', 'json').lower()
        # Bug fix: a non-numeric or non-positive 'bins' used to bubble up as a
        # 500 from int()/np.histogram; reject it explicitly with a 400.
        try:
            bins = int(request.args.get('bins', 20))
        except ValueError:
            return jsonify({
                'success': False,
                'error': 'Invalid bins parameter: must be an integer'
            }), 400
        if bins < 1:
            return jsonify({
                'success': False,
                'error': 'Invalid bins parameter: must be >= 1'
            }), 400

        # Validate metric type against the catalogue
        spec = _QUALITY_DISTRIBUTION_SPECS.get(metric_type)
        if spec is None:
            return jsonify({
                'success': False,
                'error': f'Invalid metric type: {metric_type}',
                'valid_metrics': list(_QUALITY_DISTRIBUTION_SPECS)
            }), 400

        # NOTE(review): uppercase 'COMPLETED' here vs 'completed' in other
        # handlers of this module -- confirm the canonical status casing.
        processing_status = state_manager.get_processing_status()
        if processing_status.status != 'COMPLETED':
            return jsonify({
                'success': False,
                'error': f'Cannot analyze quality. Current status: {processing_status.status}'
            }), 400

        mesh_result = state_manager.get_mesh_result()
        if mesh_result is None:
            return jsonify({
                'success': False,
                'error': 'No mesh result available'
            }), 404

        # Assemble the response from the static spec plus the real element count
        element_count = getattr(mesh_result, 'element_count', 0)
        failed_frac, warning_frac, good_frac = spec['fractions']
        element_counts = {
            'total_elements': element_count,
            'failed_elements': max(0, int(element_count * failed_frac)),
            'warning_elements': max(0, int(element_count * warning_frac)),
            'good_elements': max(0, int(element_count * good_frac))
        }
        distribution_data = {
            'metric_type': metric_type,
            'name': spec['name'],
            'description': spec['description'],
            'range': spec['range'],
            'statistics': dict(zip(_QUALITY_STAT_KEYS, spec['statistics'])),
            'element_counts': element_counts,
            'thresholds': spec['thresholds']
        }

        # Percentages of the total in each quality band (all 0 when empty)
        total_elements = element_counts['total_elements']
        if total_elements > 0:
            distribution_data['quality_percentages'] = {
                'failed_percentage': (element_counts['failed_elements'] / total_elements) * 100,
                'warning_percentage': (element_counts['warning_elements'] / total_elements) * 100,
                'good_percentage': (element_counts['good_elements'] / total_elements) * 100
            }
        else:
            distribution_data['quality_percentages'] = {
                'failed_percentage': 0,
                'warning_percentage': 0,
                'good_percentage': 0
            }

        # Synthesize histogram data on demand
        if response_format == 'histogram':
            import numpy as np
            np.random.seed(42)  # reproducible synthetic samples
            sample_data = _sample_quality_metric(metric_type, min(element_count, 10000))
            hist_counts, bin_edges = np.histogram(sample_data, bins=bins)
            bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
            distribution_data['histogram'] = {
                'bins': bins,
                'bin_edges': bin_edges.tolist(),
                'bin_centers': bin_centers.tolist(),
                'counts': hist_counts.tolist(),
                'sample_size': len(sample_data)
            }

        return jsonify({
            'success': True,
            'distribution': distribution_data,
            'metadata': {
                'retrieved_at': datetime.now().isoformat(),
                'format': response_format,
                'bins': bins if response_format == 'histogram' else None
            }
        }), 200
    except Exception as e:
        current_app.logger.error(f"Quality distribution error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get quality distribution: {str(e)}'
        }), 500
@api_bp.route('/mesh/quality/recommendations', methods=['GET'])
def get_quality_recommendations():
    """
    Get quality improvement recommendations
    GET /api/mesh/quality/recommendations
    Query parameters:
    - priority: str - Filter by priority level (critical, high, medium, low) (optional)
    - metric: str - Filter by specific metric (optional)
    Returns 400 if mesh generation has not completed or a filter value is
    invalid, 404 if no mesh result is stored, otherwise 200 with a
    'recommendations' dict containing assessment, issues and suggestions.
    """
    try:
        priority_filter = request.args.get('priority', None)
        metric_filter = request.args.get('metric', None)
        # Check if mesh generation is completed
        processing_status = state_manager.get_processing_status()
        if processing_status.status != 'COMPLETED':
            return jsonify({
                'success': False,
                'error': f'Cannot generate recommendations. Current status: {processing_status.status}'
            }), 400
        # Get mesh result
        mesh_result = state_manager.get_mesh_result()
        if mesh_result is None:
            return jsonify({
                'success': False,
                'error': 'No mesh result available'
            }), 404
        # Generate comprehensive recommendations
        # getattr defaults guard against partially populated result objects
        element_count = getattr(mesh_result, 'element_count', 0)
        quality_score = getattr(mesh_result, 'quality_score', 0)
        recommendations = {
            'overall_assessment': {
                'quality_score': quality_score,
                'quality_status': getattr(mesh_result, 'quality_status', 'UNKNOWN'),
                'element_count': element_count,
                'assessment_summary': _get_overall_assessment(quality_score, element_count)
            },
            'priority_issues': [],
            'general_recommendations': [],
            'metric_specific_recommendations': {},
            'generated_at': datetime.now().isoformat()
        }
        # Generate priority issues
        # Heuristic thresholds: score < 30 is critical; > 200k elements is a
        # performance concern; < 5k elements is an accuracy concern.
        if quality_score < 30:
            recommendations['priority_issues'].append({
                'severity': 'CRITICAL',
                'metric_type': 'overall',
                'description': f'Very low overall quality score ({quality_score})',
                'recommendation': 'Consider regenerating mesh with different settings',
                'affected_elements': int(element_count * 0.3),
                'percentage_affected': 30.0
            })
        if element_count > 200000:
            recommendations['priority_issues'].append({
                'severity': 'HIGH',
                'metric_type': 'performance',
                'description': f'Very high element count ({element_count})',
                'recommendation': 'Consider optimizing mesh density for computational efficiency',
                'affected_elements': element_count,
                'percentage_affected': 100.0
            })
        elif element_count < 5000:
            recommendations['priority_issues'].append({
                'severity': 'MEDIUM',
                'metric_type': 'accuracy',
                'description': f'Low element count ({element_count})',
                'recommendation': 'Consider refining mesh for better accuracy',
                'affected_elements': element_count,
                'percentage_affected': 100.0
            })
        # Generate general recommendations
        recommendations['general_recommendations'] = [
            "Verify mesh quality in high-stress regions",
            "Consider using inflation layers for boundary layer capture",
            "Check element quality near geometric features",
            "Validate mesh independence through convergence study"
        ]
        # A score-dependent headline is inserted at the front of the list
        if quality_score >= 70:
            recommendations['general_recommendations'].insert(0, "Excellent mesh quality achieved - suitable for analysis")
        elif quality_score >= 50:
            recommendations['general_recommendations'].insert(0, "Good mesh quality - minor improvements possible")
        else:
            recommendations['general_recommendations'].insert(0, "Mesh quality needs improvement before analysis")
        # Generate metric-specific recommendations
        recommendations['metric_specific_recommendations'] = {
            'element_quality': [
                "Focus on improving element shapes in critical areas",
                "Consider using different element types for complex geometry",
                "Apply local mesh refinement to poor quality regions"
            ],
            'aspect_ratio': [
                "Reduce element stretching in high-gradient regions",
                "Use structured meshing where possible",
                "Consider anisotropic mesh adaptation"
            ],
            'skewness': [
                "Improve geometry preparation to reduce sharp angles",
                "Use smoothing algorithms to reduce element skewness",
                "Consider remeshing highly skewed regions"
            ],
            'orthogonal_quality': [
                "Improve mesh orthogonality near boundaries",
                "Use inflation layers for better boundary resolution",
                "Consider hybrid mesh approaches"
            ]
        }
        # Apply filters
        # Priority filter is case-insensitive and validated against a fixed set
        if priority_filter:
            valid_priorities = ['critical', 'high', 'medium', 'low']
            if priority_filter.lower() not in valid_priorities:
                return jsonify({
                    'success': False,
                    'error': f'Invalid priority filter: {priority_filter}',
                    'valid_priorities': valid_priorities
                }), 400
            recommendations['priority_issues'] = [
                issue for issue in recommendations['priority_issues']
                if issue['severity'].lower() == priority_filter.lower()
            ]
        # Metric filter narrows the metric-specific section to one key
        if metric_filter:
            if metric_filter in recommendations['metric_specific_recommendations']:
                recommendations['metric_specific_recommendations'] = {
                    metric_filter: recommendations['metric_specific_recommendations'][metric_filter]
                }
            else:
                return jsonify({
                    'success': False,
                    'error': f'Invalid metric filter: {metric_filter}',
                    'available_metrics': list(recommendations['metric_specific_recommendations'].keys())
                }), 400
        return jsonify({
            'success': True,
            'recommendations': recommendations
        }), 200
    except Exception as e:
        current_app.logger.error(f"Quality recommendations error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get quality recommendations: {str(e)}'
        }), 500
def _get_overall_assessment(quality_score, element_count):
"""
Generate overall mesh assessment summary
Args:
quality_score: Overall quality score
element_count: Total number of elements
Returns:
String with assessment summary
"""
try:
assessment_parts = []
# Quality assessment
if quality_score >= 80:
assessment_parts.append("Excellent mesh quality")
elif quality_score >= 60:
assessment_parts.append("Good mesh quality")
elif quality_score >= 40:
assessment_parts.append("Acceptable mesh quality")
elif quality_score >= 20:
assessment_parts.append("Poor mesh quality")
else:
assessment_parts.append("Very poor mesh quality")
# Element count assessment
if element_count > 100000:
assessment_parts.append("high element density")
elif element_count > 50000:
assessment_parts.append("moderate element density")
elif element_count > 10000:
assessment_parts.append("reasonable element density")
else:
assessment_parts.append("low element density")
# Suitability assessment
if quality_score >= 50:
assessment_parts.append("suitable for analysis")
else:
assessment_parts.append("requires improvement before analysis")
return f"{assessment_parts[0]} with {assessment_parts[1]}, {assessment_parts[2]}"
except Exception as e:
return f"Assessment generation failed: {str(e)}"
@api_bp.route('/mesh/files', methods=['GET'])
def get_mesh_files():
    """
    Get list of available mesh files
    GET /api/mesh/files
    Query parameters:
    - format: str - Filter by file format (msh, cdb, etc.) (optional)
    """
    try:
        format_filter = request.args.get('format', None)
        # Mesh files only exist once generation has finished
        processing_status = state_manager.get_processing_status()
        if processing_status.status != 'COMPLETED':
            return jsonify({
                'success': False,
                'error': f'No mesh files available. Current status: {processing_status.status}'
            }), 400
        # A stored mesh result is required before exported files can be listed
        if state_manager.get_mesh_result() is None:
            return jsonify({
                'success': False,
                'error': 'No mesh result available'
            }), 404
        # Simulated catalogue (format, filename, size in bytes, description).
        # In a real implementation, this would check the actual file system.
        catalogue = [
            ('msh', 'blade_mesh.msh', 2048576, 'ANSYS Fluent mesh format'),
            ('cdb', 'blade_mesh.cdb', 1536000, 'ANSYS Mechanical database format'),
            ('dat', 'blade_mesh.dat', 1024000, 'Generic mesh data format'),
        ]
        available_files = [
            {
                'format': fmt,
                'filename': name,
                'file_path': f'/exports/{name}',
                'file_size': size,
                'created_at': datetime.now().isoformat(),
                'description': description,
                'download_url': f'/api/mesh/files/{fmt}'
            }
            for fmt, name, size, description in catalogue
        ]
        # Apply case-insensitive format filter if specified
        if format_filter:
            wanted = format_filter.lower()
            available_files = [
                entry for entry in available_files
                if entry['format'].lower() == wanted
            ]
            if not available_files:
                return jsonify({
                    'success': False,
                    'error': f'No files available in format: {format_filter}',
                    'available_formats': ['msh', 'cdb', 'dat']
                }), 404
        return jsonify({
            'success': True,
            'files': available_files,
            'total_files': len(available_files),
            'total_size': sum(entry['file_size'] for entry in available_files),
            'available_formats': list(set(entry['format'] for entry in available_files)),
            'generated_at': datetime.now().isoformat()
        }), 200
    except Exception as e:
        current_app.logger.error(f"Get mesh files error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get mesh files: {str(e)}'
        }), 500
@api_bp.route('/mesh/files/<file_format>', methods=['GET'])
def download_mesh_file(file_format):
    """
    Download mesh file in specified format
    GET /api/mesh/files/<format>
    Path parameters:
    - file_format: str - File format (msh, cdb, dat)
    Returns metadata describing the requested file; actual file streaming is
    not implemented yet (see the 'note' field in the response).
    """
    # Fix: the final `}), 500` had the next route's decorator text fused onto
    # it (`500@ap`), which was a syntax error; the trailer is removed here.
    try:
        # Validate format before touching any processing state
        valid_formats = ['msh', 'cdb', 'dat']
        if file_format.lower() not in valid_formats:
            return jsonify({
                'success': False,
                'error': f'Invalid file format: {file_format}',
                'valid_formats': valid_formats
            }), 400
        # Files only exist once mesh generation has completed
        processing_status = state_manager.get_processing_status()
        if processing_status.status != 'COMPLETED':
            return jsonify({
                'success': False,
                'error': f'No mesh files available. Current status: {processing_status.status}'
            }), 400
        # For now, return information about the file that would be downloaded.
        # In a real implementation, this would serve the actual file.
        file_info = {
            'msh': {
                'filename': 'blade_mesh.msh',
                'content_type': 'application/octet-stream',
                'description': 'ANSYS Fluent mesh format'
            },
            'cdb': {
                'filename': 'blade_mesh.cdb',
                'content_type': 'application/octet-stream',
                'description': 'ANSYS Mechanical database format'
            },
            'dat': {
                'filename': 'blade_mesh.dat',
                'content_type': 'text/plain',
                'description': 'Generic mesh data format'
            }
        }
        selected_file = file_info[file_format.lower()]
        return jsonify({
            'success': True,
            'message': f'File download would start for: {selected_file["filename"]}',
            'file_info': {
                'filename': selected_file['filename'],
                'format': file_format.lower(),
                'content_type': selected_file['content_type'],
                'description': selected_file['description'],
                'download_ready': True,
                'download_url': f'/api/mesh/files/{file_format.lower()}'
            },
            'note': 'File download endpoint - actual file serving requires implementation'
        }), 200
    except Exception as e:
        current_app.logger.error(f"Download mesh file error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to download mesh file: {str(e)}'
        }), 500
@api_bp.route('/mesh/progress/detailed', methods=['GET'])
def get_detailed_mesh_progress():
    """
    Get comprehensive detailed mesh generation progress analysis
    GET /api/mesh/progress/detailed
    This endpoint provides in-depth progress analysis including:
    - Real-time performance metrics
    - Historical comparison
    - Bottleneck analysis
    - Intelligent recommendations
    """
    # Fix: the decorator line was corrupted to `i_bp.route(...)` (its `@ap`
    # prefix was fused onto the previous route's `500`); restored here.
    try:
        processing_status = state_manager.get_processing_status()
        current_file = state_manager.get_current_file()
        # Analysis only makes sense for an in-flight or finished operation
        if processing_status.status not in ['PROCESSING', 'COMPLETED']:
            return jsonify({
                'success': False,
                'error': 'No active or completed operation to analyze',
                'current_status': processing_status.status
            }), 400
        # Build comprehensive progress analysis skeleton
        detailed_analysis = {
            'success': True,
            'operation_info': {
                'status': processing_status.status,
                'current_stage': getattr(processing_status, 'current_stage', 'unknown'),
                'progress_percentage': processing_status.progress_percentage,
                'current_operation': processing_status.current_operation,
                'file_name': current_file.filename if current_file else None
            },
            'timing_analysis': {},
            'performance_metrics': {},
            'quality_indicators': {},
            'recommendations': [],
            'historical_comparison': {},
            'bottleneck_analysis': []
        }
        # Add timing analysis (only when a start time is recorded)
        if processing_status.start_time:
            current_time = datetime.now()
            elapsed_time = (current_time - processing_status.start_time).total_seconds()
            detailed_analysis['timing_analysis'] = {
                'started_at': processing_status.start_time.isoformat(),
                'elapsed_time': elapsed_time,
                'estimated_remaining_time': getattr(processing_status, 'estimated_remaining_time', 0.0),
                'confidence_level': getattr(processing_status, 'confidence_level', 0.0)
            }
            if processing_status.status == 'COMPLETED':
                # Fall back to "now" when no completion timestamp was stored
                end_time = getattr(processing_status, 'completed_at', current_time)
                total_time = (end_time - processing_status.start_time).total_seconds()
                detailed_analysis['timing_analysis']['completed_at'] = end_time.isoformat()
                detailed_analysis['timing_analysis']['total_time'] = total_time
        # Add performance metrics
        operation_velocity = getattr(processing_status, 'operation_velocity', 0.0)
        detailed_analysis['performance_metrics'] = {
            'operation_velocity': operation_velocity,
            'velocity_unit': 'progress_units_per_second',
            'performance_rating': _calculate_performance_rating(operation_velocity),
            'efficiency_score': _calculate_efficiency_score(processing_status)
        }
        # Add quality indicators
        detailed_analysis['quality_indicators'] = {
            'progress_consistency': _analyze_progress_consistency(processing_status),
            'time_estimation_accuracy': getattr(processing_status, 'confidence_level', 0.0),
            'data_completeness': _assess_data_completeness(processing_status)
        }
        # Merge richer data from detailed_info when the tracker provides it
        if hasattr(processing_status, 'detailed_info') and processing_status.detailed_info:
            detailed_info = processing_status.detailed_info
            if 'performance_metrics' in detailed_info:
                detailed_analysis['performance_metrics'].update(detailed_info['performance_metrics'])
            if 'recommendations' in detailed_info:
                detailed_analysis['recommendations'].extend(detailed_info['recommendations'])
            if 'historical_comparison' in detailed_info:
                detailed_analysis['historical_comparison'] = detailed_info['historical_comparison']
            if 'bottleneck_analysis' in detailed_info:
                detailed_analysis['bottleneck_analysis'] = detailed_info['bottleneck_analysis']
        # Generate fallback recommendations if none were provided
        if not detailed_analysis['recommendations']:
            detailed_analysis['recommendations'] = _generate_progress_recommendations(processing_status)
        # Add metadata
        detailed_analysis['analysis_metadata'] = {
            'generated_at': datetime.now().isoformat(),
            'analysis_version': '1.0',
            'data_sources': ['real_time_tracking', 'performance_analysis'],
            'confidence_level': getattr(processing_status, 'confidence_level', 0.5)
        }
        return jsonify(detailed_analysis), 200
    except Exception as e:
        current_app.logger.error(f"Detailed progress analysis error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to generate detailed progress analysis: {str(e)}'
        }), 500
def _calculate_performance_rating(velocity: float) -> str:
"""Calculate performance rating based on operation velocity"""
if velocity >= 2.0:
return 'excellent'
elif velocity >= 1.0:
return 'good'
elif velocity >= 0.5:
return 'fair'
else:
return 'poor'
def _calculate_efficiency_score(processing_status) -> float:
"""Calculate efficiency score based on processing status"""
try:
base_score = 0.5
# Increase score based on progress consistency
if hasattr(processing_status, 'operation_velocity'):
velocity = processing_status.operation_velocity
if velocity > 0:
base_score += min(0.3, velocity * 0.15)
# Increase score based on confidence level
if hasattr(processing_status, 'confidence_level'):
confidence = processing_status.confidence_level
base_score += confidence * 0.2
return min(1.0, max(0.0, base_score))
except Exception:
return 0.5
def _analyze_progress_consistency(processing_status) -> str:
"""Analyze progress consistency"""
try:
# Simple heuristic based on available data
if hasattr(processing_status, 'operation_velocity'):
velocity = processing_status.operation_velocity
if velocity > 1.0:
return 'consistent'
elif velocity > 0.1:
return 'moderate'
else:
return 'inconsistent'
return 'unknown'
except Exception:
return 'unknown'
def _assess_data_completeness(processing_status) -> float:
"""Assess completeness of progress data"""
try:
completeness_score = 0.0
total_checks = 5
# Check for basic fields
if hasattr(processing_status, 'progress_percentage'):
completeness_score += 1
if hasattr(processing_status, 'current_operation'):
completeness_score += 1
if hasattr(processing_status, 'estimated_remaining_time'):
completeness_score += 1
if hasattr(processing_status, 'operation_velocity'):
completeness_score += 1
if hasattr(processing_status, 'detailed_info') and processing_status.detailed_info:
completeness_score += 1
return completeness_score / total_checks
except Exception:
return 0.2
def _generate_progress_recommendations(processing_status) -> List[str]:
"""Generate progress-specific recommendations"""
try:
recommendations = []
if processing_status.status == 'PROCESSING':
# Check progress velocity
velocity = getattr(processing_status, 'operation_velocity', 0.0)
if velocity < 0.1:
recommendations.append("Operation appears to be running slowly - consider checking system resources")
# Check elapsed time
if processing_status.start_time:
elapsed = (datetime.now() - processing_status.start_time).total_seconds()
if elapsed > 600: # More than 10 minutes
recommendations.append("Operation is taking longer than typical - monitor for potential issues")
recommendations.append("Avoid interrupting the mesh generation process")
recommendations.append("Monitor system memory usage during operation")
elif processing_status.status == 'COMPLETED':
recommendations.append("Operation completed successfully")
recommendations.append("Review mesh quality metrics and visualization")
recommendations.append("Consider exporting mesh files for further analysis")
return recommendations
except Exception:
return ["Unable to generate specific recommendations"]
@api_bp.route('/mesh/progress/history', methods=['GET'])
def get_progress_history():
    """
    Get progress history and performance trends
    GET /api/mesh/progress/history
    Query parameters:
    - limit: int - Maximum number of historical records (default: 10)
    Returns 400 for a non-integer limit, otherwise 200 with simulated history.
    """
    # Fix: the final `}), 500` had the next route's decorator text fused onto
    # it (`500@a`), a syntax error; the trailer is removed here. Also, an
    # invalid limit now yields a 400 instead of falling through to a 500.
    try:
        try:
            limit = int(request.args.get('limit', 10))
        except (TypeError, ValueError):
            return jsonify({
                'success': False,
                'error': 'Invalid limit parameter - must be an integer'
            }), 400
        # This would typically come from a database or persistent storage.
        # For now, we'll return a simulated response.
        history_data = {
            'success': True,
            'history': [],
            'performance_trends': {
                'average_completion_time': 180.0,
                'success_rate': 0.95,
                'performance_trend': 'stable',
                'common_bottlenecks': [
                    'Complex geometry processing',
                    'High element count generation'
                ]
            },
            'recommendations': [
                "Historical data shows consistent performance",
                "Typical mesh generation takes 2-4 minutes",
                "Consider optimizing geometry for faster processing"
            ],
            'metadata': {
                'generated_at': datetime.now().isoformat(),
                'records_available': 0,
                'limit_applied': limit
            }
        }
        return jsonify(history_data), 200
    except Exception as e:
        current_app.logger.error(f"Progress history error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get progress history: {str(e)}'
        }), 500
@api_bp.route('/system/diagnostics', methods=['GET'])
def get_system_diagnostics():
    """
    Get comprehensive system diagnostics
    GET /api/system/diagnostics
    Query parameters:
    - include_performance: bool - Include performance metrics (default: true)
    - include_ansys: bool - Include ANSYS environment info (default: true)
    - format: str - Response format (json, report) (default: json)
    """
    # Fix: the decorator line was corrupted to `pi_bp.route(...)` (its `@a`
    # prefix was fused onto the previous route's `500`); restored here.
    try:
        # Get query parameters (booleans arrive as the strings 'true'/'false')
        include_performance = request.args.get('include_performance', 'true').lower() == 'true'
        include_ansys = request.args.get('include_ansys', 'true').lower() == 'true'
        response_format = request.args.get('format', 'json').lower()
        # Collect diagnostics; the collector is an optional dependency
        try:
            from backend.utils.diagnostic_collector import diagnostic_collector
            diagnostics = diagnostic_collector.collect_comprehensive_diagnostics(
                include_performance=include_performance,
                include_ansys_env=include_ansys
            )
            if response_format == 'report':
                # Generate text report
                report_content = diagnostic_collector.generate_diagnostic_report()
                return jsonify({
                    'success': True,
                    'format': 'report',
                    'report': report_content,
                    'generated_at': datetime.now().isoformat()
                }), 200
            else:
                # Return JSON format
                return jsonify({
                    'success': True,
                    'diagnostics': diagnostics,
                    'metadata': {
                        'include_performance': include_performance,
                        'include_ansys': include_ansys,
                        'format': response_format
                    }
                }), 200
        except ImportError:
            return jsonify({
                'success': False,
                'error': 'Diagnostic collector not available'
            }), 500
    except Exception as e:
        current_app.logger.error(f"System diagnostics error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to collect system diagnostics: {str(e)}'
        }), 500
@api_bp.route('/system/sessions', methods=['GET'])
def get_system_sessions():
    """
    Get information about active sessions
    GET /api/system/sessions
    """
    try:
        # Session manager is an optional dependency; import lazily
        from backend.utils.session_manager import session_timeout_manager
        status_payload = session_timeout_manager.get_all_sessions_status()
        response_body = {
            'success': True,
            'sessions': status_payload,
            'retrieved_at': datetime.now().isoformat()
        }
        return jsonify(response_body), 200
    except ImportError:
        return jsonify({
            'success': False,
            'error': 'Session manager not available'
        }), 500
    except Exception as e:
        current_app.logger.error(f"System sessions error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get system sessions: {str(e)}'
        }), 500
@api_bp.route('/system/sessions/<session_id>', methods=['GET'])
def get_session_status(session_id):
    """
    Get status of a specific session
    GET /api/system/sessions/<session_id>
    """
    try:
        # Session manager is an optional dependency; import lazily
        from backend.utils.session_manager import session_timeout_manager
        status_info = session_timeout_manager.get_session_status(session_id)
        # Unknown session id -> 404
        if status_info is None:
            return jsonify({
                'success': False,
                'error': f'Session {session_id} not found'
            }), 404
        return jsonify({
            'success': True,
            'session': status_info,
            'retrieved_at': datetime.now().isoformat()
        }), 200
    except ImportError:
        return jsonify({
            'success': False,
            'error': 'Session manager not available'
        }), 500
    except Exception as e:
        current_app.logger.error(f"Session status error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get session status: {str(e)}'
        }), 500
@api_bp.route('/system/sessions/<session_id>/cleanup', methods=['POST'])
def force_session_cleanup(session_id):
    """
    Force cleanup of a specific session
    POST /api/system/sessions/<session_id>/cleanup
    """
    try:
        # Session manager is an optional dependency; import lazily
        from backend.utils.session_manager import session_timeout_manager
        if session_timeout_manager.force_cleanup_session(session_id):
            return jsonify({
                'success': True,
                'message': f'Session {session_id} cleanup completed',
                'cleaned_at': datetime.now().isoformat()
            }), 200
        # Cleanup failed; report whether the session exists at all
        still_registered = session_timeout_manager.get_session_status(session_id) is not None
        return jsonify({
            'success': False,
            'error': f'Failed to cleanup session {session_id}',
            'session_found': still_registered
        }), 400
    except ImportError:
        return jsonify({
            'success': False,
            'error': 'Session manager not available'
        }), 500
    except Exception as e:
        current_app.logger.error(f"Session cleanup error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to cleanup session: {str(e)}'
        }), 500
@api_bp.route('/system/errors', methods=['GET'])
def get_system_errors():
    """
    Get system error summary and history
    GET /api/system/errors
    Query parameters:
    - hours: int - Number of hours to look back (default: 24)
    - include_resolved: bool - Include resolved errors (default: true)
    """
    try:
        hours = int(request.args.get('hours', 24))
        include_resolved = request.args.get('include_resolved', 'true').lower() == 'true'
        # Error reporter is an optional dependency; the inner try isolates
        # its ImportError from the outer generic handler
        try:
            from backend.utils.error_reporter import error_reporter
            summary = error_reporter.get_error_summary(hours=hours)
            # Simplified filter - full implementation would filter actual error list
            if not include_resolved and 'resolved_count' in summary:
                summary['note'] = 'Resolved errors excluded from summary'
            payload = {
                'success': True,
                'error_summary': summary,
                'parameters': {
                    'hours': hours,
                    'include_resolved': include_resolved
                },
                'retrieved_at': datetime.now().isoformat()
            }
            return jsonify(payload), 200
        except ImportError:
            return jsonify({
                'success': False,
                'error': 'Error reporter not available'
            }), 500
    except Exception as e:
        current_app.logger.error(f"System errors error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get system errors: {str(e)}'
        }), 500
@api_bp.route('/system/errors/<error_id>', methods=['GET'])
def get_error_details(error_id):
    """
    Get detailed information about a specific error
    GET /api/system/errors/<error_id>
    """
    try:
        # Error reporter is an optional dependency; import lazily
        from backend.utils.error_reporter import error_reporter
        details = error_reporter.get_error_details(error_id)
        # Unknown error id -> 404
        if details is None:
            return jsonify({
                'success': False,
                'error': f'Error {error_id} not found'
            }), 404
        return jsonify({
            'success': True,
            'error_details': details,
            'retrieved_at': datetime.now().isoformat()
        }), 200
    except ImportError:
        return jsonify({
            'success': False,
            'error': 'Error reporter not available'
        }), 500
    except Exception as e:
        current_app.logger.error(f"Error details error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get error details: {str(e)}'
        }), 500
@api_bp.route('/system/errors/<error_id>/resolve', methods=['POST'])
def mark_error_resolved(error_id):
    """
    Mark an error as resolved
    POST /api/system/errors/<error_id>/resolve
    JSON body:
    {
        "resolution_notes": "Optional notes about the resolution"
    }
    """
    try:
        # Body is optional; fall back to an empty dict
        payload = request.get_json() or {}
        resolution_notes = payload.get('resolution_notes', '')
        from backend.utils.error_reporter import error_reporter
        if error_reporter.mark_error_resolved(error_id, resolution_notes):
            return jsonify({
                'success': True,
                'message': f'Error {error_id} marked as resolved',
                'resolved_at': datetime.now().isoformat(),
                'resolution_notes': resolution_notes
            }), 200
        # Resolution failed; indicate whether the error id exists at all
        return jsonify({
            'success': False,
            'error': f'Failed to mark error {error_id} as resolved',
            'error_found': error_reporter.get_error_details(error_id) is not None
        }), 400
    except ImportError:
        return jsonify({
            'success': False,
            'error': 'Error reporter not available'
        }), 500
    except Exception as e:
        current_app.logger.error(f"Mark error resolved error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to mark error as resolved: {str(e)}'
        }), 500
@api_bp.route('/system/health/comprehensive', methods=['GET'])
def get_comprehensive_health():
    """
    Get comprehensive system health information
    GET /api/system/health/comprehensive
    This endpoint combines diagnostics, session status, error summary,
    and other health indicators into a single comprehensive report.
    Overall status escalation: 'critical' always overrides; 'warning' only
    upgrades from 'healthy'. Each subsystem is optional - an ImportError
    marks its component 'unavailable' without failing the request.
    """
    try:
        health_report = {
            'success': True,
            'timestamp': datetime.now().isoformat(),
            'overall_status': 'healthy',
            'components': {},
            'alerts': [],
            'recommendations': []
        }
        # Check system diagnostics
        try:
            from backend.utils.diagnostic_collector import diagnostic_collector
            diagnostics = diagnostic_collector.collect_comprehensive_diagnostics(
                include_performance=True,
                include_ansys_env=True
            )
            # Chained .get() calls default to 0/False when any level is missing
            health_report['components']['system_diagnostics'] = {
                'status': 'healthy',
                'memory_usage_percent': diagnostics.get('performance_metrics', {}).get('memory_usage', {}).get('virtual_memory', {}).get('percent', 0),
                'cpu_usage_percent': diagnostics.get('performance_metrics', {}).get('cpu_usage', {}).get('current_percent', 0),
                'ansys_detected': diagnostics.get('ansys_environment', {}).get('installation_detected', False)
            }
            # Check for resource alerts (memory >90% critical, >80% warning; CPU >95% critical)
            memory_usage = health_report['components']['system_diagnostics']['memory_usage_percent']
            cpu_usage = health_report['components']['system_diagnostics']['cpu_usage_percent']
            if memory_usage > 90:
                health_report['alerts'].append({
                    'severity': 'critical',
                    'component': 'memory',
                    'message': f'High memory usage: {memory_usage}%'
                })
                health_report['overall_status'] = 'critical'
            elif memory_usage > 80:
                health_report['alerts'].append({
                    'severity': 'warning',
                    'component': 'memory',
                    'message': f'Elevated memory usage: {memory_usage}%'
                })
                # Warnings never downgrade an already-critical status
                if health_report['overall_status'] == 'healthy':
                    health_report['overall_status'] = 'warning'
            if cpu_usage > 95:
                health_report['alerts'].append({
                    'severity': 'critical',
                    'component': 'cpu',
                    'message': f'High CPU usage: {cpu_usage}%'
                })
                health_report['overall_status'] = 'critical'
        except ImportError:
            health_report['components']['system_diagnostics'] = {
                'status': 'unavailable',
                'error': 'Diagnostic collector not available'
            }
        # Check session status
        try:
            from backend.utils.session_manager import session_timeout_manager
            sessions_status = session_timeout_manager.get_all_sessions_status()
            session_summary = sessions_status.get('summary', {})
            health_report['components']['session_management'] = {
                'status': 'healthy',
                'total_sessions': session_summary.get('total_sessions', 0),
                'monitoring_active': session_summary.get('monitoring_active', False),
                'status_counts': session_summary.get('status_counts', {})
            }
            # Check for session alerts: inactive monitoring is a warning
            if not session_summary.get('monitoring_active', False):
                health_report['alerts'].append({
                    'severity': 'warning',
                    'component': 'sessions',
                    'message': 'Session monitoring is not active'
                })
                if health_report['overall_status'] == 'healthy':
                    health_report['overall_status'] = 'warning'
        except ImportError:
            health_report['components']['session_management'] = {
                'status': 'unavailable',
                'error': 'Session manager not available'
            }
        # Check error status
        try:
            from backend.utils.error_reporter import error_reporter
            error_summary = error_reporter.get_error_summary(hours=24)
            health_report['components']['error_management'] = {
                'status': 'healthy',
                'total_errors_24h': error_summary.get('total_errors', 0),
                'unresolved_errors': error_summary.get('unresolved_count', 0),
                'critical_errors': len(error_summary.get('critical_errors', []))
            }
            # Check for error alerts: any critical error escalates to critical,
            # otherwise more than 5 unresolved errors triggers a warning
            critical_count = health_report['components']['error_management']['critical_errors']
            unresolved_count = health_report['components']['error_management']['unresolved_errors']
            if critical_count > 0:
                health_report['alerts'].append({
                    'severity': 'critical',
                    'component': 'errors',
                    'message': f'{critical_count} critical error(s) in last 24 hours'
                })
                health_report['overall_status'] = 'critical'
            elif unresolved_count > 5:
                health_report['alerts'].append({
                    'severity': 'warning',
                    'component': 'errors',
                    'message': f'{unresolved_count} unresolved errors in last 24 hours'
                })
                if health_report['overall_status'] == 'healthy':
                    health_report['overall_status'] = 'warning'
        except ImportError:
            health_report['components']['error_management'] = {
                'status': 'unavailable',
                'error': 'Error reporter not available'
            }
        # Generate recommendations based on the final overall status
        if health_report['overall_status'] == 'critical':
            health_report['recommendations'].append('Immediate attention required - check critical alerts')
        elif health_report['overall_status'] == 'warning':
            health_report['recommendations'].append('Monitor system closely - warning conditions detected')
        else:
            health_report['recommendations'].append('System is operating normally')
        # Add component-specific recommendations
        if health_report['components'].get('system_diagnostics', {}).get('memory_usage_percent', 0) > 80:
            health_report['recommendations'].append('Consider reducing memory usage or increasing available RAM')
        if not health_report['components'].get('system_diagnostics', {}).get('ansys_detected', True):
            health_report['recommendations'].append('ANSYS installation not detected - verify ANSYS setup')
        return jsonify(health_report), 200
    except Exception as e:
        current_app.logger.error(f"Comprehensive health check error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to get comprehensive health status: {str(e)}',
            'timestamp': datetime.now().isoformat()
        }), 500