1042 lines
39 KiB
Python
1042 lines
39 KiB
Python
"""
|
|
API routes for CAE Mesh Generator
|
|
"""
|
|
import os
|
|
import uuid
|
|
import time
|
|
from datetime import datetime
|
|
from flask import Blueprint, request, jsonify, current_app
|
|
from werkzeug.utils import secure_filename
|
|
from pathlib import Path
|
|
|
|
from backend.models.data_models import UploadedFile, ProcessingStatus
|
|
from backend.utils.file_validator import validate_step_file, get_file_info
|
|
from backend.utils.state_manager import state_manager
|
|
from backend.utils.mesh_processor import process_blade_mesh_with_state_updates, ProcessingStep
|
|
from backend.utils.visualization_exporter import VisualizationExporter, VisualizationSettings
|
|
from backend.utils.error_handler import (
|
|
handle_api_error, handle_ansys_error, validate_file_upload,
|
|
FileUploadError, ANSYSError, MeshGenerationError, ValidationError,
|
|
error_reporter, log_processing_step
|
|
)
|
|
from config import ALLOWED_EXTENSIONS, UPLOAD_FOLDER
|
|
import threading
|
|
|
|
# Create API blueprint
|
|
api_bp = Blueprint('api', __name__)
|
|
|
|
def allowed_file(filename):
|
|
"""Check if file extension is allowed"""
|
|
return Path(filename).suffix.lower() in ALLOWED_EXTENSIONS
|
|
|
|
def get_file_size(file_path):
|
|
"""Get file size in bytes"""
|
|
try:
|
|
return os.path.getsize(file_path)
|
|
except OSError:
|
|
return 0
|
|
|
|
@api_bp.route('/upload', methods=['POST'])
|
|
@handle_api_error
|
|
def upload_file():
|
|
"""
|
|
Handle file upload
|
|
POST /api/upload
|
|
"""
|
|
log_processing_step("file_upload", "started")
|
|
|
|
# Check if file is in request
|
|
if 'file' not in request.files:
|
|
raise FileUploadError("No file provided")
|
|
|
|
file = request.files['file']
|
|
|
|
# Validate file upload
|
|
validate_file_upload(file)
|
|
|
|
# Validate file extension
|
|
if not allowed_file(file.filename):
|
|
raise FileUploadError(
|
|
f'Invalid file format. Only {", ".join(ALLOWED_EXTENSIONS)} files are supported.',
|
|
details={'provided_filename': file.filename, 'allowed_extensions': ALLOWED_EXTENSIONS}
|
|
)
|
|
|
|
# Generate unique filename
|
|
file_id = str(uuid.uuid4())
|
|
original_filename = secure_filename(file.filename)
|
|
file_extension = Path(original_filename).suffix
|
|
unique_filename = f"{file_id}{file_extension}"
|
|
|
|
# Ensure upload directory exists
|
|
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
|
|
|
|
# Save file
|
|
file_path = os.path.join(UPLOAD_FOLDER, unique_filename)
|
|
file.save(file_path)
|
|
|
|
# Validate uploaded file
|
|
is_valid, validation_error = validate_step_file(file_path)
|
|
if not is_valid:
|
|
# Remove invalid file
|
|
try:
|
|
os.remove(file_path)
|
|
except:
|
|
pass
|
|
raise FileUploadError(f'File validation failed: {validation_error}')
|
|
|
|
# Get file information
|
|
file_info = get_file_info(file_path)
|
|
|
|
# Create file record
|
|
uploaded_file = UploadedFile(
|
|
id=file_id,
|
|
filename=original_filename,
|
|
file_path=file_path,
|
|
upload_time=datetime.now(),
|
|
status='UPLOADED'
|
|
)
|
|
|
|
# Update state manager
|
|
state_manager.set_current_file(uploaded_file)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'file': uploaded_file.to_dict(),
|
|
'file_info': file_info,
|
|
'message': 'File uploaded and validated successfully'
|
|
}), 200
|
|
|
|
@api_bp.route('/files/current', methods=['GET'])
|
|
def get_current_file():
|
|
"""
|
|
Get current uploaded file information
|
|
GET /api/files/current
|
|
"""
|
|
current_file = state_manager.get_current_file()
|
|
|
|
if current_file is None:
|
|
return jsonify({
|
|
'success': False,
|
|
'message': 'No file uploaded'
|
|
}), 404
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'file': current_file.to_dict()
|
|
}), 200
|
|
|
|
@api_bp.route('/mesh/status', methods=['GET'])
|
|
def get_mesh_status():
|
|
"""
|
|
Get current processing status
|
|
GET /api/mesh/status
|
|
"""
|
|
processing_status = state_manager.get_processing_status()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'status': processing_status.to_dict()
|
|
}), 200
|
|
|
|
@api_bp.route('/mesh/result', methods=['GET'])
|
|
def get_mesh_result():
|
|
"""
|
|
Get comprehensive mesh generation result with statistics and visualization
|
|
GET /api/mesh/result
|
|
|
|
Query parameters:
|
|
- include_visualization: bool - Include visualization data (default: false)
|
|
- include_quality_details: bool - Include detailed quality metrics (default: false)
|
|
- format: str - Response format (json, summary) (default: json)
|
|
"""
|
|
try:
|
|
# Get query parameters
|
|
include_visualization = request.args.get('include_visualization', 'false').lower() == 'true'
|
|
include_quality_details = request.args.get('include_quality_details', 'false').lower() == 'true'
|
|
response_format = request.args.get('format', 'json').lower()
|
|
|
|
# Get basic mesh result
|
|
mesh_result = state_manager.get_mesh_result()
|
|
|
|
if mesh_result is None:
|
|
return jsonify({
|
|
'success': False,
|
|
'message': 'No mesh result available'
|
|
}), 404
|
|
|
|
# Get processing status for additional context
|
|
processing_status = state_manager.get_processing_status()
|
|
current_file = state_manager.get_current_file()
|
|
|
|
# Build comprehensive result
|
|
result_data = {
|
|
'basic_info': mesh_result.to_dict(),
|
|
'processing_info': {
|
|
'status': processing_status.status if processing_status else 'unknown',
|
|
'progress_percentage': processing_status.progress_percentage if processing_status else 0,
|
|
'started_at': processing_status.start_time.isoformat() if processing_status and processing_status.start_time else None,
|
|
'completed_at': processing_status.completed_at.isoformat() if processing_status and processing_status.completed_at else None,
|
|
'total_time': (processing_status.completed_at - processing_status.start_time).total_seconds() if processing_status and processing_status.start_time and processing_status.completed_at else 0
|
|
},
|
|
'file_info': {
|
|
'filename': current_file.filename if current_file else 'unknown',
|
|
'file_size': get_file_size(current_file.file_path) if current_file and current_file.file_path else 0,
|
|
'upload_time': current_file.upload_time.isoformat() if current_file and current_file.upload_time else None
|
|
}
|
|
}
|
|
|
|
# Add detailed quality information if requested
|
|
if include_quality_details:
|
|
result_data['quality_details'] = _get_detailed_quality_info(mesh_result)
|
|
|
|
# Add visualization data if requested
|
|
if include_visualization:
|
|
result_data['visualization'] = _get_visualization_info()
|
|
|
|
# Format response based on requested format
|
|
if response_format == 'summary':
|
|
return jsonify({
|
|
'success': True,
|
|
'summary': _format_result_summary(result_data)
|
|
}), 200
|
|
else:
|
|
return jsonify({
|
|
'success': True,
|
|
'result': result_data,
|
|
'metadata': {
|
|
'retrieved_at': datetime.now().isoformat(),
|
|
'include_visualization': include_visualization,
|
|
'include_quality_details': include_quality_details,
|
|
'format': response_format
|
|
}
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"Get mesh result error: {str(e)}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Failed to retrieve mesh result: {str(e)}'
|
|
}), 500
|
|
|
|
@api_bp.route('/system/state', methods=['GET'])
|
|
def get_system_state():
|
|
"""
|
|
Get complete system state
|
|
GET /api/system/state
|
|
"""
|
|
system_state = state_manager.get_system_state()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'state': system_state
|
|
}), 200
|
|
|
|
@api_bp.route('/system/reset', methods=['POST'])
|
|
def reset_system():
|
|
"""
|
|
Reset system state
|
|
POST /api/system/reset
|
|
"""
|
|
try:
|
|
state_manager.clear_current_file()
|
|
state_manager.clear_session_data()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'System state reset successfully'
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"System reset error: {str(e)}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Reset failed: {str(e)}'
|
|
}), 500
|
|
|
|
@api_bp.route('/mesh/ready', methods=['GET'])
|
|
def check_mesh_ready():
|
|
"""
|
|
Check if system is ready for mesh generation
|
|
GET /api/mesh/ready
|
|
"""
|
|
is_ready = state_manager.is_ready_for_processing()
|
|
current_file = state_manager.get_current_file()
|
|
processing_status = state_manager.get_processing_status()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'ready': is_ready,
|
|
'file_uploaded': current_file is not None,
|
|
'processing_status': processing_status.status,
|
|
'message': 'Ready for mesh generation' if is_ready else 'Not ready for mesh generation'
|
|
}), 200
|
|
|
|
@api_bp.route('/health', methods=['GET'])
|
|
def health_check():
|
|
"""
|
|
Health check endpoint
|
|
GET /api/health
|
|
"""
|
|
system_state = state_manager.get_system_state()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'CAE Mesh Generator API is running',
|
|
'timestamp': datetime.now().isoformat(),
|
|
'system_status': {
|
|
'has_file': system_state['current_file'] is not None,
|
|
'processing_status': system_state['processing_status']['status'],
|
|
'ready_for_processing': system_state['is_ready_for_processing']
|
|
}
|
|
}), 200
|
|
|
|
@api_bp.route('/mesh/generate', methods=['POST'])
|
|
@handle_api_error
|
|
@handle_ansys_error
|
|
def generate_mesh():
|
|
"""
|
|
Start mesh generation for uploaded file
|
|
POST /api/mesh/generate
|
|
"""
|
|
log_processing_step("mesh_generation", "requested")
|
|
|
|
# Check if system is ready for processing
|
|
if not state_manager.is_ready_for_processing():
|
|
current_file = state_manager.get_current_file()
|
|
processing_status = state_manager.get_processing_status()
|
|
|
|
if current_file is None:
|
|
raise ValidationError("No file uploaded")
|
|
elif state_manager.is_processing():
|
|
raise ValidationError(
|
|
"Processing already in progress",
|
|
details={'current_status': processing_status.status}
|
|
)
|
|
else:
|
|
raise ValidationError(
|
|
f'System not ready for processing. Current status: {processing_status.status}',
|
|
details={'current_status': processing_status.status}
|
|
)
|
|
|
|
# Get simulation mode from request (optional)
|
|
simulation_mode = False
|
|
if request.is_json and request.json:
|
|
simulation_mode = request.json.get('simulation_mode', False)
|
|
|
|
# Get current file and app for background thread
|
|
current_file = state_manager.get_current_file()
|
|
file_path = current_file.file_path
|
|
app = current_app._get_current_object() # Get the actual app instance
|
|
|
|
# Start processing in background thread
|
|
def background_processing():
|
|
"""Background thread for mesh processing"""
|
|
with app.app_context(): # Use the captured app instance
|
|
try:
|
|
app.logger.info(f"Starting mesh generation for file: {current_file.filename}")
|
|
|
|
# Start processing status
|
|
state_manager.start_processing(f"Starting mesh generation for {current_file.filename}")
|
|
|
|
# Process mesh
|
|
result = process_blade_mesh_with_state_updates(
|
|
file_path=file_path,
|
|
simulation_mode=simulation_mode
|
|
)
|
|
|
|
if result.success:
|
|
app.logger.info(f"✓ Mesh generation completed successfully: {result.element_count} elements")
|
|
state_manager.complete_processing("Mesh generation completed successfully")
|
|
else:
|
|
app.logger.error(f"✗ Mesh generation failed: {result.error_message}")
|
|
state_manager.set_processing_error(result.error_message)
|
|
|
|
except Exception as e:
|
|
app.logger.error(f"Background processing error: {str(e)}")
|
|
state_manager.set_processing_error(f"Processing error: {str(e)}")
|
|
|
|
# Start background thread
|
|
processing_thread = threading.Thread(target=background_processing, daemon=True)
|
|
processing_thread.start()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Mesh generation started',
|
|
'file_id': current_file.id,
|
|
'filename': current_file.filename,
|
|
'simulation_mode': simulation_mode,
|
|
'started_at': datetime.now().isoformat()
|
|
}), 202 # Accepted - processing started
|
|
|
|
@api_bp.route('/mesh/progress', methods=['GET'])
|
|
def get_mesh_progress():
|
|
"""
|
|
Get mesh generation progress
|
|
GET /api/mesh/progress
|
|
"""
|
|
try:
|
|
processing_status = state_manager.get_processing_status()
|
|
current_file = state_manager.get_current_file()
|
|
|
|
response_data = {
|
|
'success': True,
|
|
'status': processing_status.status,
|
|
'message': processing_status.message,
|
|
'progress_percentage': getattr(processing_status, 'progress_percentage', 0.0),
|
|
'current_operation': getattr(processing_status, 'current_operation', None),
|
|
'last_updated': getattr(processing_status, 'last_updated', processing_status.start_time),
|
|
'file_info': {
|
|
'id': current_file.id if current_file else None,
|
|
'filename': current_file.filename if current_file else None
|
|
} if current_file else None
|
|
}
|
|
|
|
# Add timing information
|
|
if processing_status.start_time:
|
|
response_data['started_at'] = processing_status.start_time.isoformat()
|
|
|
|
if processing_status.status in ['COMPLETED', 'ERROR']:
|
|
end_time = getattr(processing_status, 'completed_at', None) or processing_status.end_time
|
|
if end_time:
|
|
response_data['completed_at'] = end_time.isoformat()
|
|
processing_time = (end_time - processing_status.start_time).total_seconds()
|
|
response_data['processing_time'] = processing_time
|
|
else:
|
|
# Calculate current processing time
|
|
current_time = datetime.now()
|
|
processing_time = (current_time - processing_status.start_time).total_seconds()
|
|
response_data['current_processing_time'] = processing_time
|
|
|
|
# Add error information if failed
|
|
if processing_status.status == 'ERROR' and processing_status.error_message:
|
|
response_data['error_message'] = processing_status.error_message
|
|
|
|
return jsonify(response_data), 200
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"Progress check error: {str(e)}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Failed to get progress: {str(e)}'
|
|
}), 500
|
|
|
|
@api_bp.route('/mesh/cancel', methods=['POST'])
|
|
def cancel_mesh_generation():
|
|
"""
|
|
Cancel ongoing mesh generation
|
|
POST /api/mesh/cancel
|
|
"""
|
|
try:
|
|
processing_status = state_manager.get_processing_status()
|
|
|
|
if processing_status.status != 'PROCESSING':
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'No processing to cancel. Current status: {processing_status.status}'
|
|
}), 400
|
|
|
|
# Set status to cancelled (the background thread should handle this gracefully)
|
|
state_manager.set_processing_status('CANCELLED', 'Mesh generation cancelled by user')
|
|
|
|
current_app.logger.info("Mesh generation cancellation requested")
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Mesh generation cancellation requested',
|
|
'cancelled_at': datetime.now().isoformat()
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"Cancellation error: {str(e)}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Failed to cancel: {str(e)}'
|
|
}), 500
|
|
# Helper functions for detailed result processing
|
|
|
|
def _get_detailed_quality_info(mesh_result):
|
|
"""
|
|
Get detailed quality information from mesh result
|
|
|
|
Args:
|
|
mesh_result: MeshResult object
|
|
|
|
Returns:
|
|
Dictionary with detailed quality information
|
|
"""
|
|
try:
|
|
quality_details = {
|
|
'overall_score': mesh_result.quality_score,
|
|
'overall_status': mesh_result.quality_status,
|
|
'quality_breakdown': {
|
|
'element_quality': {
|
|
'score': mesh_result.quality_score,
|
|
'status': mesh_result.quality_status,
|
|
'threshold': 0.2,
|
|
'description': 'Minimum element quality measure'
|
|
},
|
|
'mesh_density': {
|
|
'elements_per_volume': mesh_result.element_count / 1000 if mesh_result.element_count > 0 else 0,
|
|
'nodes_per_element': mesh_result.node_count / mesh_result.element_count if mesh_result.element_count > 0 else 0,
|
|
'description': 'Mesh density characteristics'
|
|
}
|
|
},
|
|
'recommendations': _get_quality_recommendations(mesh_result),
|
|
'quality_metrics': {
|
|
'element_count': mesh_result.element_count,
|
|
'node_count': mesh_result.node_count,
|
|
'quality_score': mesh_result.quality_score,
|
|
'generation_time': mesh_result.generation_time
|
|
}
|
|
}
|
|
|
|
return quality_details
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"Error getting detailed quality info: {str(e)}")
|
|
return {
|
|
'error': f'Failed to get detailed quality info: {str(e)}',
|
|
'overall_score': mesh_result.quality_score if mesh_result else 0,
|
|
'overall_status': mesh_result.quality_status if mesh_result else 'unknown'
|
|
}
|
|
|
|
def _get_quality_recommendations(mesh_result):
|
|
"""
|
|
Generate quality improvement recommendations
|
|
|
|
Args:
|
|
mesh_result: MeshResult object
|
|
|
|
Returns:
|
|
List of recommendation strings
|
|
"""
|
|
recommendations = []
|
|
|
|
try:
|
|
if mesh_result.quality_score < 50:
|
|
recommendations.append("Consider reducing global element size for better quality")
|
|
recommendations.append("Add local refinement to high-curvature areas")
|
|
elif mesh_result.quality_score < 70:
|
|
recommendations.append("Mesh quality is acceptable but could be improved")
|
|
recommendations.append("Consider adding inflation layers for better boundary resolution")
|
|
else:
|
|
recommendations.append("Excellent mesh quality achieved")
|
|
recommendations.append("Mesh is suitable for accurate analysis")
|
|
|
|
if mesh_result.element_count > 100000:
|
|
recommendations.append("High element count - consider optimizing for computational efficiency")
|
|
elif mesh_result.element_count < 10000:
|
|
recommendations.append("Low element count - consider refining for better accuracy")
|
|
|
|
return recommendations
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"Error generating recommendations: {str(e)}")
|
|
return ["Unable to generate recommendations due to error"]
|
|
|
|
def _get_visualization_info():
|
|
"""
|
|
Get visualization information and generate images if needed
|
|
|
|
Returns:
|
|
Dictionary with visualization information
|
|
"""
|
|
try:
|
|
# Initialize visualization exporter
|
|
viz_exporter = VisualizationExporter(output_dir="static/visualizations")
|
|
|
|
visualization_info = {
|
|
'available_views': viz_exporter.get_available_views(),
|
|
'available_formats': viz_exporter.get_available_formats(),
|
|
'default_settings': {
|
|
'width': 1280,
|
|
'height': 720,
|
|
'background': 'white',
|
|
'camera_view': 'isometric',
|
|
'format': 'PNG'
|
|
},
|
|
'images': [],
|
|
'export_summary': viz_exporter.get_export_summary()
|
|
}
|
|
|
|
# Try to generate basic mesh visualization
|
|
try:
|
|
settings = VisualizationSettings(
|
|
width=800,
|
|
height=600,
|
|
camera_view='isometric',
|
|
background_color='white'
|
|
)
|
|
|
|
result = viz_exporter.export_mesh_image(
|
|
filename='current_mesh_preview.png',
|
|
settings=settings
|
|
)
|
|
|
|
if result.success:
|
|
visualization_info['images'].append({
|
|
'type': 'mesh_preview',
|
|
'path': result.image_path,
|
|
'size': result.image_size,
|
|
'file_size': result.file_size,
|
|
'description': 'Current mesh visualization'
|
|
})
|
|
|
|
except Exception as img_error:
|
|
current_app.logger.warning(f"Could not generate mesh preview: {str(img_error)}")
|
|
visualization_info['images'].append({
|
|
'type': 'mesh_preview',
|
|
'error': str(img_error),
|
|
'description': 'Mesh preview generation failed'
|
|
})
|
|
|
|
return visualization_info
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"Error getting visualization info: {str(e)}")
|
|
return {
|
|
'error': f'Failed to get visualization info: {str(e)}',
|
|
'available_views': [],
|
|
'available_formats': [],
|
|
'images': []
|
|
}
|
|
|
|
def _format_result_summary(result_data):
|
|
"""
|
|
Format result data as a concise summary
|
|
|
|
Args:
|
|
result_data: Complete result data dictionary
|
|
|
|
Returns:
|
|
Dictionary with summary information
|
|
"""
|
|
try:
|
|
basic_info = result_data.get('basic_info', {})
|
|
processing_info = result_data.get('processing_info', {})
|
|
file_info = result_data.get('file_info', {})
|
|
|
|
summary = {
|
|
'mesh_statistics': {
|
|
'elements': basic_info.get('element_count', 0),
|
|
'nodes': basic_info.get('node_count', 0),
|
|
'quality_score': basic_info.get('quality_score', 0),
|
|
'quality_status': basic_info.get('quality_status', 'unknown')
|
|
},
|
|
'processing_summary': {
|
|
'status': processing_info.get('status', 'unknown'),
|
|
'total_time': processing_info.get('total_time', 0),
|
|
'completed': processing_info.get('status') == 'completed'
|
|
},
|
|
'file_summary': {
|
|
'filename': file_info.get('filename', 'unknown'),
|
|
'file_size_mb': round(file_info.get('file_size', 0) / (1024 * 1024), 2)
|
|
},
|
|
'success_indicators': {
|
|
'mesh_generated': basic_info.get('element_count', 0) > 0,
|
|
'quality_acceptable': basic_info.get('quality_score', 0) >= 50,
|
|
'processing_completed': processing_info.get('status') == 'completed'
|
|
}
|
|
}
|
|
|
|
return summary
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"Error formatting result summary: {str(e)}")
|
|
return {
|
|
'error': f'Failed to format summary: {str(e)}',
|
|
'mesh_statistics': {'elements': 0, 'nodes': 0, 'quality_score': 0},
|
|
'processing_summary': {'status': 'error', 'completed': False},
|
|
'success_indicators': {'mesh_generated': False, 'quality_acceptable': False, 'processing_completed': False}
|
|
}
|
|
|
|
@api_bp.route('/mesh/visualization', methods=['GET'])
|
|
def get_mesh_visualization():
|
|
"""
|
|
Generate and return mesh visualization
|
|
GET /api/mesh/visualization
|
|
|
|
Query parameters:
|
|
- view: str - Camera view (isometric, front, side, top) (default: isometric)
|
|
- width: int - Image width (default: 800)
|
|
- height: int - Image height (default: 600)
|
|
- format: str - Image format (PNG, JPG) (default: PNG)
|
|
- quality_metric: str - Quality metric to visualize (optional)
|
|
"""
|
|
try:
|
|
# Get query parameters
|
|
view = request.args.get('view', 'isometric')
|
|
width = int(request.args.get('width', 800))
|
|
height = int(request.args.get('height', 600))
|
|
img_format = request.args.get('format', 'PNG').upper()
|
|
quality_metric = request.args.get('quality_metric', None)
|
|
|
|
# Initialize visualization exporter
|
|
viz_exporter = VisualizationExporter(output_dir="static/visualizations")
|
|
|
|
# Create visualization settings
|
|
settings = VisualizationSettings(
|
|
width=width,
|
|
height=height,
|
|
camera_view=view,
|
|
image_format=img_format,
|
|
background_color='white',
|
|
show_edges=True
|
|
)
|
|
|
|
# Generate appropriate visualization
|
|
if quality_metric:
|
|
result = viz_exporter.export_quality_visualization(
|
|
quality_metric=quality_metric
|
|
)
|
|
else:
|
|
result = viz_exporter.export_mesh_image(settings=settings)
|
|
|
|
if result.success:
|
|
return jsonify({
|
|
'success': True,
|
|
'visualization': {
|
|
'image_path': result.image_path,
|
|
'image_size': result.image_size,
|
|
'file_size': result.file_size,
|
|
'export_time': result.export_time,
|
|
'settings': {
|
|
'view': view,
|
|
'width': width,
|
|
'height': height,
|
|
'format': img_format,
|
|
'quality_metric': quality_metric
|
|
}
|
|
},
|
|
'warnings': result.warnings
|
|
}), 200
|
|
else:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': result.error_message,
|
|
'warnings': result.warnings
|
|
}), 500
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"Visualization generation error: {str(e)}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Failed to generate visualization: {str(e)}'
|
|
}), 500
|
|
|
|
@api_bp.route('/mesh/export', methods=['POST'])
|
|
def export_mesh_data():
|
|
"""
|
|
Export mesh data in various formats
|
|
POST /api/mesh/export
|
|
|
|
JSON body:
|
|
{
|
|
"format": "json|summary|csv",
|
|
"include_visualization": bool,
|
|
"include_quality_details": bool
|
|
}
|
|
"""
|
|
try:
|
|
data = request.get_json() or {}
|
|
export_format = data.get('format', 'json').lower()
|
|
include_visualization = data.get('include_visualization', False)
|
|
include_quality_details = data.get('include_quality_details', True)
|
|
|
|
# Get mesh result
|
|
mesh_result = state_manager.get_mesh_result()
|
|
if mesh_result is None:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'No mesh result available for export'
|
|
}), 404
|
|
|
|
# Build export data
|
|
export_data = {
|
|
'export_info': {
|
|
'format': export_format,
|
|
'exported_at': datetime.now().isoformat(),
|
|
'include_visualization': include_visualization,
|
|
'include_quality_details': include_quality_details
|
|
},
|
|
'mesh_data': mesh_result.to_dict()
|
|
}
|
|
|
|
# Add additional data based on options
|
|
if include_quality_details:
|
|
export_data['quality_details'] = _get_detailed_quality_info(mesh_result)
|
|
|
|
if include_visualization:
|
|
export_data['visualization'] = _get_visualization_info()
|
|
|
|
# Format based on requested format
|
|
if export_format == 'summary':
|
|
return jsonify({
|
|
'success': True,
|
|
'export': _format_result_summary(export_data)
|
|
}), 200
|
|
elif export_format == 'csv':
|
|
# For CSV format, return structured data that can be converted to CSV
|
|
csv_data = {
|
|
'mesh_statistics': [
|
|
['Metric', 'Value'],
|
|
['Elements', mesh_result.element_count],
|
|
['Nodes', mesh_result.node_count],
|
|
['Quality Score', mesh_result.quality_score],
|
|
['Quality Status', mesh_result.quality_status],
|
|
['Generation Time', mesh_result.generation_time]
|
|
]
|
|
}
|
|
return jsonify({
|
|
'success': True,
|
|
'export': csv_data,
|
|
'format': 'csv'
|
|
}), 200
|
|
else:
|
|
return jsonify({
|
|
'success': True,
|
|
'export': export_data
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"Export error: {str(e)}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Export failed: {str(e)}'
|
|
}), 500
|
|
|
|
# Download endpoints
|
|
@api_bp.route('/mesh/download/mesh', methods=['GET'])
|
|
def download_mesh_file():
|
|
"""
|
|
Download mesh file by copying from ANSYS temp directories
|
|
GET /api/mesh/download/mesh
|
|
"""
|
|
try:
|
|
import glob
|
|
import shutil
|
|
import time
|
|
|
|
current_app.logger.info("Starting mesh file download process...")
|
|
|
|
# Ensure results directory exists
|
|
results_dir = Path("results/mesh_files")
|
|
results_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Get current file information for naming
|
|
current_file = state_manager.get_current_file()
|
|
if current_file:
|
|
step_file_name = Path(current_file.filename).stem
|
|
else:
|
|
step_file_name = "blade"
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
output_filename = f"{step_file_name}_mesh_{timestamp}.mechdb"
|
|
output_path = results_dir / output_filename
|
|
|
|
# Try to find the actual ANSYS .mechdb file using patterns from blade_mesh_cli.py
|
|
mechdb_copied = False
|
|
|
|
# Common ANSYS temp directories to search (from blade_mesh_cli.py)
|
|
temp_patterns = [
|
|
os.path.expanduser("~/AppData/Local/Temp/ANSYS.*/AnsysMech*/Project_Mech_Files/*.mechdb"),
|
|
"C:/Users/*/AppData/Local/Temp/ANSYS.*/AnsysMech*/Project_Mech_Files/*.mechdb",
|
|
os.path.expanduser("~/AppData/Local/Temp/ANSYS.*/AnsysMech*/*.mechdb"),
|
|
"C:/temp/ANSYS*/*.mechdb",
|
|
"./temp/*.mechdb"
|
|
]
|
|
|
|
current_app.logger.info("Searching for ANSYS mesh database file in temp directories...")
|
|
|
|
for pattern in temp_patterns:
|
|
try:
|
|
mechdb_files = glob.glob(pattern, recursive=True)
|
|
if mechdb_files:
|
|
# Sort by modification time, get the most recent
|
|
mechdb_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
|
|
source_file = mechdb_files[0]
|
|
|
|
# Check if file is recent (within last 24 hours)
|
|
file_age = time.time() - os.path.getmtime(source_file)
|
|
if file_age < 86400: # 24 hours
|
|
current_app.logger.info(f"Found recent ANSYS database: {os.path.basename(source_file)}")
|
|
|
|
# Copy the file
|
|
shutil.copy2(source_file, output_path)
|
|
mechdb_copied = True
|
|
|
|
file_size = os.path.getsize(output_path)
|
|
current_app.logger.info(f"Mesh database copied: {output_filename} ({file_size:,} bytes)")
|
|
break
|
|
|
|
except Exception as search_error:
|
|
current_app.logger.debug(f"Search pattern failed: {pattern} - {search_error}")
|
|
continue
|
|
|
|
if not mechdb_copied:
|
|
current_app.logger.warning("Could not find recent ANSYS .mechdb file, creating informative placeholder")
|
|
|
|
# Get mesh result for placeholder content
|
|
mesh_result = state_manager.get_mesh_result()
|
|
|
|
# Create a more informative placeholder file
|
|
placeholder_content = f"""ANSYS Mechanical Database Placeholder
|
|
|
|
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
|
|
Source File: {current_file.filename if current_file else 'unknown'}
|
|
Status: Mesh generation completed but database file not found in ANSYS temp directories
|
|
|
|
MESH STATISTICS:
|
|
Elements: {mesh_result.element_count if mesh_result else 'N/A'}
|
|
Nodes: {mesh_result.node_count if mesh_result else 'N/A'}
|
|
Quality Score: {mesh_result.quality_score if mesh_result else 'N/A'}
|
|
Generation Time: {mesh_result.generation_time if mesh_result else 'N/A'} seconds
|
|
|
|
NOTE: This is a placeholder file. In a production environment with ANSYS installed,
|
|
the actual .mechdb file would be copied from the ANSYS working directory.
|
|
|
|
SEARCHED LOCATIONS:
|
|
{chr(10).join(temp_patterns)}
|
|
|
|
To get the actual mesh database file:
|
|
1. Locate the ANSYS Mechanical project directory
|
|
2. Find the .mechdb file in the Project_Mech_Files subfolder
|
|
3. The file will be named something like "file.mechdb" or "solver_files.mechdb"
|
|
"""
|
|
|
|
with open(output_path, 'w', encoding='utf-8') as f:
|
|
f.write(placeholder_content)
|
|
|
|
current_app.logger.info(f"Placeholder created: {output_filename}")
|
|
|
|
# Check if we have any mesh files to send
|
|
if output_path.exists():
|
|
file_size = os.path.getsize(output_path)
|
|
|
|
from flask import send_file
|
|
return send_file(
|
|
output_path,
|
|
as_attachment=True,
|
|
download_name=output_filename,
|
|
mimetype='application/octet-stream'
|
|
)
|
|
else:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to create mesh file for download'
|
|
}), 500
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"Mesh download error: {str(e)}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Download failed: {str(e)}'
|
|
}), 500
|
|
|
|
@api_bp.route('/mesh/download/image', methods=['GET'])
|
|
def download_mesh_image():
|
|
"""
|
|
Download mesh visualization image with proper content
|
|
GET /api/mesh/download/image
|
|
"""
|
|
try:
|
|
current_app.logger.info("Starting mesh image download process...")
|
|
|
|
# Ensure visualization directory exists
|
|
viz_dir = Path("frontend/static/visualizations")
|
|
viz_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Check for existing visualization images first
|
|
image_files = (
|
|
list(viz_dir.glob("*.png")) +
|
|
list(viz_dir.glob("*.jpg")) +
|
|
list(viz_dir.glob("*.jpeg"))
|
|
)
|
|
|
|
latest_image = None
|
|
if image_files:
|
|
# Get the most recent image file
|
|
latest_image = max(image_files, key=os.path.getmtime)
|
|
|
|
# Check if the image file is not empty and recent (within 1 hour)
|
|
if latest_image.stat().st_size > 1000: # More than 1KB
|
|
file_age = time.time() - latest_image.stat().st_mtime
|
|
if file_age < 3600: # Within 1 hour
|
|
current_app.logger.info(f"Found recent valid image: {latest_image.name} ({latest_image.stat().st_size} bytes)")
|
|
else:
|
|
current_app.logger.info(f"Image file is too old: {file_age/60:.1f} minutes")
|
|
latest_image = None
|
|
else:
|
|
current_app.logger.info(f"Image file is too small: {latest_image.stat().st_size} bytes")
|
|
latest_image = None
|
|
|
|
# If no valid image found, generate a new one
|
|
if not latest_image:
|
|
current_app.logger.info("No valid visualization image found, generating new one...")
|
|
|
|
try:
|
|
from backend.utils.visualization_exporter import VisualizationExporter, VisualizationSettings
|
|
|
|
# Create visualization exporter
|
|
viz_exporter = VisualizationExporter(
|
|
mechanical_session=None, # Use simulation mode
|
|
output_dir=str(viz_dir)
|
|
)
|
|
|
|
# Generate new image with proper settings
|
|
viz_settings = VisualizationSettings(
|
|
width=1280,
|
|
height=720,
|
|
image_format="PNG",
|
|
camera_view="isometric",
|
|
show_edges=True,
|
|
background_color="white"
|
|
)
|
|
|
|
# Generate filename with timestamp
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
filename = f"mesh_visualization_{timestamp}.png"
|
|
|
|
viz_result = viz_exporter.export_mesh_image(
|
|
filename=filename,
|
|
settings=viz_settings
|
|
)
|
|
|
|
if viz_result.success and viz_result.image_path:
|
|
latest_image = Path(viz_result.image_path)
|
|
current_app.logger.info(f"Generated new visualization image: {filename} ({viz_result.file_size} bytes)")
|
|
else:
|
|
current_app.logger.error(f"Failed to generate visualization: {viz_result.error_message}")
|
|
raise Exception(f"Visualization generation failed: {viz_result.error_message}")
|
|
|
|
except ImportError as e:
|
|
current_app.logger.error(f"Visualization exporter not available: {e}")
|
|
raise Exception("Visualization system not available")
|
|
except Exception as e:
|
|
current_app.logger.error(f"Error generating visualization: {e}")
|
|
raise Exception(f"Failed to generate visualization: {str(e)}")
|
|
|
|
# Send the image file
|
|
if latest_image and latest_image.exists():
|
|
file_size = latest_image.stat().st_size
|
|
current_app.logger.info(f"Sending image file: {latest_image.name} ({file_size} bytes)")
|
|
|
|
from flask import send_file
|
|
return send_file(
|
|
latest_image,
|
|
as_attachment=True,
|
|
download_name=f"mesh_visualization_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png",
|
|
mimetype='image/png'
|
|
)
|
|
else:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'No visualization images available and unable to generate new one'
|
|
}), 404
|
|
|
|
except Exception as e:
|
|
current_app.logger.error(f"Image download error: {str(e)}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Download failed: {str(e)}'
|
|
}), 500 |