setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); // 3. Buscar job pendiente o en progreso $pdo->beginTransaction(); // Primero buscar un job que quedó a medias $stmt = $pdo->prepare(" SELECT * FROM validation_jobs WHERE status = 'processing' AND progress_percentage < 100 ORDER BY created_at ASC LIMIT 1 FOR UPDATE "); $stmt->execute(); $job = $stmt->fetch(PDO::FETCH_ASSOC); // Si no hay, tomar uno en cola if (!$job) { $stmt = $pdo->prepare(" SELECT * FROM validation_jobs WHERE status = 'queued' ORDER BY created_at ASC LIMIT 1 FOR UPDATE "); $stmt->execute(); $job = $stmt->fetch(PDO::FETCH_ASSOC); if ($job) { $update = $pdo->prepare(" UPDATE validation_jobs SET status = 'processing', updated_at = NOW() WHERE id = ? "); $update->execute([$job['id']]); } } $pdo->commit(); if (!$job) { echo "[" . date('Y-m-d H:i:s') . "] No hay trabajos pendientes.\n"; unlink($lockFile); exit; } echo "Procesando Job ID: {$job['id']}\n"; // Enviar notificación WebSocket de inicio sendWebSocketNotification($job['user_id'], 'processing', $job['id']); // 4. Determinar archivos ya procesados $results = json_decode($job['results'], true) ?? ['processed_files' => []]; $procesados = $results['processed_files'] ?? []; // 5. Abrir ZIP y obtener lista completa de archivos $zipPath = $job['zip_temp_path']; if (!file_exists($zipPath)) { throw new Exception("ZIP no encontrado: {$zipPath}"); } $zip = new ZipArchive(); if ($zip->open($zipPath) !== TRUE) { throw new Exception("No se pudo abrir el ZIP: {$zipPath}"); } $totalArchivos = $zip->numFiles; $faltantes = []; for ($i = 0; $i < $totalArchivos; $i++) { $nombreArchivo = $zip->getNameIndex($i); if (!in_array($nombreArchivo, $procesados)) { $faltantes[] = $nombreArchivo; } } echo "Archivos totales: $totalArchivos, ya procesados: " . count($procesados) . ", faltantes: " . count($faltantes) . "\n"; // 6. Procesar faltantes foreach ($faltantes as $nombreArchivo) { echo "Procesando: {$nombreArchivo}\n"; // Extraer contenido del archivo $contenido = $zip->getFromName($nombreArchivo); // Marcar archivo como procesado $procesados[] = $nombreArchivo; $progress = intval((count($procesados) / $totalArchivos) * 100); $results['processed_files'] = $procesados; $stmt = $pdo->prepare(" UPDATE validation_jobs SET results = ?, progress_percentage = ?, updated_at = NOW() WHERE id = ? "); $stmt->execute([json_encode($results), $progress, $job['id']]); // Enviar notificación de progreso sendWebSocketNotification($job['user_id'], 'progress', $job['id'], $progress); echo "Progreso: {$progress}%\n"; } $zip->close(); // 7. Marcar como completado $stmt = $pdo->prepare(" UPDATE validation_jobs SET status = 'completed', progress_percentage = 100, completed_at = NOW(), updated_at = NOW() WHERE id = ? "); $stmt->execute([$job['id']]); // Enviar notificación de completado sendWebSocketNotification($job['user_id'], 'completed', $job['id']); echo "Job completado: {$job['id']}\n"; } catch (Exception $e) { echo "ERROR: " . $e->getMessage() . "\n"; // Enviar notificación de error si hay job activo if (isset($job)) { $stmt = $pdo->prepare("UPDATE validation_jobs SET status = 'failed', error_message = ? WHERE id = ?"); $stmt->execute([$e->getMessage(), $job['id']]); sendWebSocketNotification($job['user_id'], 'failed', $job['id']); } } finally { // 8. Eliminar lock global if (file_exists($lockFile)) { unlink($lockFile); } } function sendWebSocketNotification($userId, $status, $jobId, $progress = null) { try { require_once __DIR__ . '/../vendor/autoload.php'; $client = new \ElephantIO\Client(\ElephantIO\Client::engine(\ElephantIO\Client::CLIENT_4X, 'http://localhost:3200')); $client->initialize(); $client->of('/'); $data = [ 'jobId' => $jobId, 'status' => $status, 'message' => getStatusMessage($status) ]; if ($progress !== null) { $data['progress'] = $progress; } $client->emit('laravel_emit', [ 'event' => 'validation_status', 'data' => $data, 'userId' => (string)$userId ]); $client->close(); echo "WebSocket: $status enviado a usuario $userId\n"; } catch (Exception $e) { echo "WebSocket error: " . $e->getMessage() . "\n"; } } function getStatusMessage($status) { $messages = [ 'queued' => 'En cola de validación', 'processing' => 'Procesando archivos', 'progress' => 'Validando archivos', 'completed' => 'Validación completada', 'failed' => 'Error en validación' ]; return $messages[$status] ?? 'Estado desconocido'; }