setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); // 3. Buscar job pendiente, en progreso o fallido $pdo->beginTransaction(); // Primero buscar un job que quedó a medias $stmt = $pdo->prepare(" SELECT * FROM validation_jobs WHERE status = 'processing' AND progress_percentage < 100 ORDER BY created_at ASC LIMIT 1 FOR UPDATE "); $stmt->execute(); $job = $stmt->fetch(PDO::FETCH_ASSOC); // Si no hay, buscar jobs fallidos para reintentar if (!$job) { $stmt = $pdo->prepare(" SELECT * FROM validation_jobs WHERE status = 'failed' ORDER BY created_at ASC LIMIT 1 FOR UPDATE "); $stmt->execute(); $job = $stmt->fetch(PDO::FETCH_ASSOC); if ($job) { echo "Reintentando job fallido: {$job['id']}\n"; $update = $pdo->prepare(" UPDATE validation_jobs SET status = 'processing', error_message = NULL, updated_at = NOW() WHERE id = ? "); $update->execute([$job['id']]); } } // Si no hay, tomar uno en cola if (!$job) { $stmt = $pdo->prepare(" SELECT * FROM validation_jobs WHERE status = 'queued' ORDER BY created_at ASC LIMIT 1 FOR UPDATE "); $stmt->execute(); $job = $stmt->fetch(PDO::FETCH_ASSOC); if ($job) { $update = $pdo->prepare(" UPDATE validation_jobs SET status = 'processing', updated_at = NOW() WHERE id = ? "); $update->execute([$job['id']]); } } $pdo->commit(); if (!$job) { echo "[" . date('Y-m-d H:i:s') . "] No hay trabajos pendientes.\n"; unlink($lockFile); exit; } echo "Procesando Job ID: {$job['id']}\n"; // Enviar notificación WebSocket de inicio sendWebSocketNotification($job['user_id'], 'processing', $job['id']); // 4. Determinar archivos ya procesados $results = json_decode($job['results'], true) ?? ['processed_files' => []]; $procesados = $results['processed_files'] ?? []; // 5. Buscar ruta del ZIP en base de datos $stmt = $pdo->prepare("SELECT ARTE_UBTE FROM s002v01tarte WHERE ARTE_IDAR = ?"); $stmt->execute([$job['zip_temp_id']]); $zipData = $stmt->fetch(PDO::FETCH_ASSOC); if (!$zipData) { throw new Exception("ZIP ID no encontrado: {$job['zip_temp_id']}"); } $zipPath = $zipData['ARTE_UBTE']; if (!file_exists($zipPath)) { throw new Exception("ZIP no encontrado: {$zipPath}"); } // Guardar IDs de archivos temporales $tempFiles = [ 'excel' => getEncrypt($job['excel_temp_id']), 'zip' => getEncrypt($job['zip_temp_id']) ]; $individualTempFiles = []; $zip = new ZipArchive(); if ($zip->open($zipPath) !== TRUE) { throw new Exception("No se pudo abrir el ZIP: {$zipPath}"); } // Crear carpeta de extracción temporal $extractDir = __DIR__ . '/../storage/app/tempFiles/extracted_' . time(); if (!mkdir($extractDir, 0755, true)) { throw new Exception("No se pudo crear directorio de extracción: {$extractDir}"); } // Obtener solo archivos (no directorios) $archivos = []; for ($i = 0; $i < $zip->numFiles; $i++) { $stat = $zip->statIndex($i); $nombreArchivo = $stat['name']; // Solo procesar archivos, no directorios if (substr($nombreArchivo, -1) !== '/') { $archivos[] = basename($nombreArchivo); // Solo nombre sin ruta } } $totalArchivos = count($archivos); $faltantes = array_diff($archivos, $procesados); echo "Archivos totales: $totalArchivos, ya procesados: " . count($procesados) . ", faltantes: " . count($faltantes) . "\n"; // 6. Procesar faltantes foreach ($faltantes as $nombreArchivo) { echo "Procesando: {$nombreArchivo}\n"; // Extraer archivo del ZIP $archivoExtraido = $extractDir . '/' . $nombreArchivo; $contenido = null; // Buscar el archivo en el ZIP (puede estar en subdirectorios) for ($i = 0; $i < $zip->numFiles; $i++) { $stat = $zip->statIndex($i); if (basename($stat['name']) === $nombreArchivo) { $contenido = $zip->getFromIndex($i); break; } } if ($contenido === null) { echo "Archivo no encontrado en ZIP: {$nombreArchivo}\n"; continue; } // Guardar archivo extraído file_put_contents($archivoExtraido, $contenido); // Enviar archivo al endpoint usando multipart/form-data $ch = curl_init('http://192.168.2.21:8000/api/upload-temp-file'); curl_setopt($ch, CURLOPT_POST, true); curl_setopt($ch, CURLOPT_POSTFIELDS, [ 'file' => new CURLFile($archivoExtraido, mime_content_type($archivoExtraido), $nombreArchivo), 'id_user' => $job['user_id'], 'linea' => $job['linea'] ]); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); $response = curl_exec($ch); $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE); curl_close($ch); // Eliminar archivo temporal unlink($archivoExtraido); // Solo marcar como procesado si el endpoint respondió exitosamente if ($httpCode >= 200 && $httpCode < 300) { $responseData = json_decode($response, true); if (isset($responseData['response']['idArchivo'])) { $individualTempFiles[] = [ 'original_name' => $nombreArchivo, 'temp_id' => $responseData['response']['idArchivo'] ]; } $procesados[] = $nombreArchivo; $progress = intval((count($procesados) / $totalArchivos) * 100); $results['processed_files'] = $procesados; $stmt = $pdo->prepare(" UPDATE validation_jobs SET results = ?, progress_percentage = ?, updated_at = NOW() WHERE id = ? "); $stmt->execute([json_encode($results), $progress, $job['id']]); // Enviar notificación de progreso sendWebSocketNotification($job['user_id'], 'progress', $job['id'], $progress); //echo "Progreso: {$progress}%\n"; } else { echo "Error al procesar archivo {$nombreArchivo}: HTTP {$httpCode}\n"; throw new Exception("Error al procesar archivo: {$nombreArchivo}"); } } $zip->close(); // Eliminar directorio de extracción rmdir($extractDir); // 7. Marcar como completado $stmt = $pdo->prepare(" UPDATE validation_jobs SET status = 'completed', progress_percentage = 100, completed_at = NOW(), updated_at = NOW() WHERE id = ? "); $stmt->execute([$job['id']]); // Enviar notificación de completado con archivos temporales $completionData = [ 'temp_files' => $tempFiles, 'individual_temp_files' => $individualTempFiles ]; echo ("completionData: " . json_encode($completionData) . "\n"); sendWebSocketNotification($job['user_id'], 'completed', $job['id'], 100, $completionData); echo "Job completado: {$job['id']}\n"; } catch (Exception $e) { echo "ERROR: " . $e->getMessage() . "\n"; // Enviar notificación de error si hay job activo if (isset($job)) { $stmt = $pdo->prepare("UPDATE validation_jobs SET status = 'failed', error_message = ? WHERE id = ?"); $stmt->execute([$e->getMessage(), $job['id']]); sendWebSocketNotification($job['user_id'], 'failed', $job['id']); } } finally { // 8. Limpiar y eliminar lock global if (isset($extractDir) && is_dir($extractDir)) { $files = glob("$extractDir/*"); foreach ($files as $file) { if (is_file($file)) { unlink($file); } } rmdir($extractDir); } if (file_exists($lockFile)) { unlink($lockFile); } } function sendWebSocketNotification($userId, $status, $jobId, $progress = 0, $completionData = []) { try { require_once __DIR__ . '/../vendor/autoload.php'; $client = new \ElephantIO\Client(\ElephantIO\Client::engine(\ElephantIO\Client::CLIENT_4X, 'http://localhost:3200')); $client->initialize(); $client->of('/'); $data = [ 'jobId' => $jobId, 'status' => $status, 'message' => getStatusMessage($status) ]; if ($progress !== null) { $data['progress'] = $progress; } if (!empty($completionData)) { $data = array_merge($data, $completionData); } $client->emit('laravel_emit', [ 'event' => 'validation_status', 'data' => $data, 'completionData' => $completionData, 'userId' => (string)$userId ]); $client->close(); //echo "WebSocket: $status enviado a usuario $userId\n"; } catch (Exception $e) { echo "WebSocket error: " . $e->getMessage() . "\n"; } } function getStatusMessage($status) { $messages = [ 'queued' => 'En cola de validación', 'processing' => 'Procesando archivos', 'progress' => 'Validando archivos', 'completed' => 'Validación completada', 'failed' => 'Error en validación' ]; return $messages[$status] ?? 'Estado desconocido'; } function getEncrypt($value){ $ch = curl_init('http://192.168.2.21:8000/api/encrypt'); curl_setopt($ch, CURLOPT_POST, true); curl_setopt($ch, CURLOPT_POSTFIELDS, ['value' => $value]); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); $response = curl_exec($ch); curl_close($ch); $responseData = json_decode($response, true); return $responseData['response']['encrypted']; }