| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- <?php
- // Configuración de BD
- $host = 'localhost';
- $port = 3306;
- $database = 'samqa';
- $username = 'root';
- $password = 'root';
- // Archivo lock global
- $lockFile = __DIR__ . '/validation.lock';
- // 1. Verificar lock global
- if (file_exists($lockFile)) {
- echo "[" . date('Y-m-d H:i:s') . "] Otro proceso está en ejecución. Saliendo...\n";
- exit;
- }
- file_put_contents($lockFile, getmypid());
- // 2. Conectar BD
- try {
- $pdo = new PDO("mysql:host=$host;port=$port;dbname=$database", $username, $password);
- $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
- // 3. Buscar job pendiente o en progreso
- $pdo->beginTransaction();
- // Primero buscar un job que quedó a medias
- $stmt = $pdo->prepare("
- SELECT * FROM validation_jobs
- WHERE status = 'processing' AND progress_percentage < 100
- ORDER BY created_at ASC
- LIMIT 1
- FOR UPDATE
- ");
- $stmt->execute();
- $job = $stmt->fetch(PDO::FETCH_ASSOC);
- // Si no hay, tomar uno en cola
- if (!$job) {
- $stmt = $pdo->prepare("
- SELECT * FROM validation_jobs
- WHERE status = 'queued'
- ORDER BY created_at ASC
- LIMIT 1
- FOR UPDATE
- ");
- $stmt->execute();
- $job = $stmt->fetch(PDO::FETCH_ASSOC);
- if ($job) {
- $update = $pdo->prepare("
- UPDATE validation_jobs
- SET status = 'processing', updated_at = NOW()
- WHERE id = ?
- ");
- $update->execute([$job['id']]);
- }
- }
- $pdo->commit();
- if (!$job) {
- echo "[" . date('Y-m-d H:i:s') . "] No hay trabajos pendientes.\n";
- unlink($lockFile);
- exit;
- }
- echo "Procesando Job ID: {$job['id']}\n";
-
- // Enviar notificación WebSocket de inicio
- sendWebSocketNotification($job['user_id'], 'processing', $job['id']);
- // 4. Determinar archivos ya procesados
- $results = json_decode($job['results'], true) ?? ['processed_files' => []];
- $procesados = $results['processed_files'] ?? [];
- // 5. Abrir ZIP y obtener lista completa de archivos
- $zipPath = $job['zip_temp_path'];
- if (!file_exists($zipPath)) {
- throw new Exception("ZIP no encontrado: {$zipPath}");
- }
- $zip = new ZipArchive();
- if ($zip->open($zipPath) !== TRUE) {
- throw new Exception("No se pudo abrir el ZIP: {$zipPath}");
- }
- $totalArchivos = $zip->numFiles;
- $faltantes = [];
- for ($i = 0; $i < $totalArchivos; $i++) {
- $nombreArchivo = $zip->getNameIndex($i);
- if (!in_array($nombreArchivo, $procesados)) {
- $faltantes[] = $nombreArchivo;
- }
- }
- echo "Archivos totales: $totalArchivos, ya procesados: " . count($procesados) . ", faltantes: " . count($faltantes) . "\n";
- // 6. Procesar faltantes
- foreach ($faltantes as $nombreArchivo) {
- echo "Procesando: {$nombreArchivo}\n";
- // Extraer contenido del archivo
- $contenido = $zip->getFromName($nombreArchivo);
- // Marcar archivo como procesado
- $procesados[] = $nombreArchivo;
- $progress = intval((count($procesados) / $totalArchivos) * 100);
- $results['processed_files'] = $procesados;
- $stmt = $pdo->prepare("
- UPDATE validation_jobs
- SET results = ?, progress_percentage = ?, updated_at = NOW()
- WHERE id = ?
- ");
- $stmt->execute([json_encode($results), $progress, $job['id']]);
-
- // Enviar notificación de progreso
- sendWebSocketNotification($job['user_id'], 'progress', $job['id'], $progress);
- echo "Progreso: {$progress}%\n";
- }
- $zip->close();
- // 7. Marcar como completado
- $stmt = $pdo->prepare("
- UPDATE validation_jobs
- SET status = 'completed', progress_percentage = 100, completed_at = NOW(), updated_at = NOW()
- WHERE id = ?
- ");
- $stmt->execute([$job['id']]);
-
- // Enviar notificación de completado
- sendWebSocketNotification($job['user_id'], 'completed', $job['id']);
- echo "Job completado: {$job['id']}\n";
- } catch (Exception $e) {
- echo "ERROR: " . $e->getMessage() . "\n";
-
- // Enviar notificación de error si hay job activo
- if (isset($job)) {
- $stmt = $pdo->prepare("UPDATE validation_jobs SET status = 'failed', error_message = ? WHERE id = ?");
- $stmt->execute([$e->getMessage(), $job['id']]);
- sendWebSocketNotification($job['user_id'], 'failed', $job['id']);
- }
- } finally {
- // 8. Eliminar lock global
- if (file_exists($lockFile)) {
- unlink($lockFile);
- }
- }
- function sendWebSocketNotification($userId, $status, $jobId, $progress = null) {
- try {
- require_once __DIR__ . '/../vendor/autoload.php';
-
- $client = new \ElephantIO\Client(\ElephantIO\Client::engine(\ElephantIO\Client::CLIENT_4X, 'http://localhost:3200'));
- $client->initialize();
- $client->of('/');
-
- $data = [
- 'jobId' => $jobId,
- 'status' => $status,
- 'message' => getStatusMessage($status)
- ];
-
- if ($progress !== null) {
- $data['progress'] = $progress;
- }
-
- $client->emit('laravel_emit', [
- 'event' => 'validation_status',
- 'data' => $data,
- 'userId' => (string)$userId
- ]);
-
- $client->close();
- echo "WebSocket: $status enviado a usuario $userId\n";
- } catch (Exception $e) {
- echo "WebSocket error: " . $e->getMessage() . "\n";
- }
- }
- function getStatusMessage($status) {
- $messages = [
- 'queued' => 'En cola de validación',
- 'processing' => 'Procesando archivos',
- 'progress' => 'Validando archivos',
- 'completed' => 'Validación completada',
- 'failed' => 'Error en validación'
- ];
- return $messages[$status] ?? 'Estado desconocido';
- }
|