| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- <?php
- // Configuración de BD
- $host = 'localhost';
- $port = 3306;
- $database = 'samqa';
- $username = 'root';
- $password = 'root';
- // Archivo lock global
- $lockFile = __DIR__ . '/validation.lock';
- // 1. Verificar lock global
- if (file_exists($lockFile)) {
- echo "[" . date('Y-m-d H:i:s') . "] Otro proceso está en ejecución. Saliendo...\n";
- exit;
- }
- file_put_contents($lockFile, getmypid());
- // 2. Conectar BD
- try {
- $pdo = new PDO("mysql:host=$host;port=$port;dbname=$database", $username, $password);
- $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
- // 3. Buscar job pendiente, en progreso o fallido
- $pdo->beginTransaction();
- // Primero buscar un job que quedó a medias
- $stmt = $pdo->prepare("
- SELECT * FROM validation_jobs
- WHERE status = 'processing' AND progress_percentage < 100
- ORDER BY created_at ASC
- LIMIT 1
- FOR UPDATE
- ");
- $stmt->execute();
- $job = $stmt->fetch(PDO::FETCH_ASSOC);
- // Si no hay, buscar jobs fallidos para reintentar
- if (!$job) {
- $stmt = $pdo->prepare("
- SELECT * FROM validation_jobs
- WHERE status = 'failed'
- ORDER BY created_at ASC
- LIMIT 1
- FOR UPDATE
- ");
- $stmt->execute();
- $job = $stmt->fetch(PDO::FETCH_ASSOC);
-
- if ($job) {
- echo "Reintentando job fallido: {$job['id']}\n";
- $update = $pdo->prepare("
- UPDATE validation_jobs
- SET status = 'processing', error_message = NULL, updated_at = NOW()
- WHERE id = ?
- ");
- $update->execute([$job['id']]);
- }
- }
- // Si no hay, tomar uno en cola
- if (!$job) {
- $stmt = $pdo->prepare("
- SELECT * FROM validation_jobs
- WHERE status = 'queued'
- ORDER BY created_at ASC
- LIMIT 1
- FOR UPDATE
- ");
- $stmt->execute();
- $job = $stmt->fetch(PDO::FETCH_ASSOC);
- if ($job) {
- $update = $pdo->prepare("
- UPDATE validation_jobs
- SET status = 'processing', updated_at = NOW()
- WHERE id = ?
- ");
- $update->execute([$job['id']]);
- }
- }
- $pdo->commit();
- if (!$job) {
- echo "[" . date('Y-m-d H:i:s') . "] No hay trabajos pendientes.\n";
- unlink($lockFile);
- exit;
- }
- echo "Procesando Job ID: {$job['id']}\n";
-
- // Enviar notificación WebSocket de inicio
- sendWebSocketNotification($job['user_id'], 'processing', $job['id']);
- // 4. Determinar archivos ya procesados
- $results = json_decode($job['results'], true) ?? ['processed_files' => []];
- $procesados = $results['processed_files'] ?? [];
- // 5. Buscar ruta del ZIP en base de datos
- $stmt = $pdo->prepare("SELECT ARTE_UBTE FROM s002v01tarte WHERE ARTE_IDAR = ?");
- $stmt->execute([$job['zip_temp_id']]);
- $zipData = $stmt->fetch(PDO::FETCH_ASSOC);
-
- if (!$zipData) {
- throw new Exception("ZIP ID no encontrado: {$job['zip_temp_id']}");
- }
-
- $zipPath = $zipData['ARTE_UBTE'];
- if (!file_exists($zipPath)) {
- throw new Exception("ZIP no encontrado: {$zipPath}");
- }
-
- // Guardar IDs de archivos temporales
- $tempFiles = [
- 'excel' => getEncrypt($job['excel_temp_id']),
- 'zip' => getEncrypt($job['zip_temp_id'])
- ];
-
- $individualTempFiles = [];
- $zip = new ZipArchive();
- if ($zip->open($zipPath) !== TRUE) {
- throw new Exception("No se pudo abrir el ZIP: {$zipPath}");
- }
- // Crear carpeta de extracción temporal
- $extractDir = __DIR__ . '/../storage/app/tempFiles/extracted_' . time();
- if (!mkdir($extractDir, 0755, true)) {
- throw new Exception("No se pudo crear directorio de extracción: {$extractDir}");
- }
- // Obtener solo archivos (no directorios)
- $archivos = [];
- for ($i = 0; $i < $zip->numFiles; $i++) {
- $stat = $zip->statIndex($i);
- $nombreArchivo = $stat['name'];
-
- // Solo procesar archivos, no directorios
- if (substr($nombreArchivo, -1) !== '/') {
- $archivos[] = basename($nombreArchivo); // Solo nombre sin ruta
- }
- }
- $totalArchivos = count($archivos);
- $faltantes = array_diff($archivos, $procesados);
- echo "Archivos totales: $totalArchivos, ya procesados: " . count($procesados) . ", faltantes: " . count($faltantes) . "\n";
- // 6. Procesar faltantes
- foreach ($faltantes as $nombreArchivo) {
- echo "Procesando: {$nombreArchivo}\n";
- // Extraer archivo del ZIP
- $archivoExtraido = $extractDir . '/' . $nombreArchivo;
- $contenido = null;
-
- // Buscar el archivo en el ZIP (puede estar en subdirectorios)
- for ($i = 0; $i < $zip->numFiles; $i++) {
- $stat = $zip->statIndex($i);
- if (basename($stat['name']) === $nombreArchivo) {
- $contenido = $zip->getFromIndex($i);
- break;
- }
- }
-
- if ($contenido === null) {
- echo "Archivo no encontrado en ZIP: {$nombreArchivo}\n";
- continue;
- }
-
- // Guardar archivo extraído
- file_put_contents($archivoExtraido, $contenido);
- // Enviar archivo al endpoint usando multipart/form-data
- $ch = curl_init('http://192.168.2.21:8000/api/upload-temp-file');
- curl_setopt($ch, CURLOPT_POST, true);
- curl_setopt($ch, CURLOPT_POSTFIELDS, [
- 'file' => new CURLFile($archivoExtraido, mime_content_type($archivoExtraido), $nombreArchivo),
- 'id_user' => $job['user_id'],
- 'linea' => $job['linea']
- ]);
- curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
- $response = curl_exec($ch);
- $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
- curl_close($ch);
-
- // Eliminar archivo temporal
- unlink($archivoExtraido);
-
- // Solo marcar como procesado si el endpoint respondió exitosamente
- if ($httpCode >= 200 && $httpCode < 300) {
- $responseData = json_decode($response, true);
- if (isset($responseData['response']['idArchivo'])) {
- $individualTempFiles[] = [
- 'original_name' => $nombreArchivo,
- 'temp_id' => $responseData['response']['idArchivo']
- ];
- }
-
- $procesados[] = $nombreArchivo;
- $progress = intval((count($procesados) / $totalArchivos) * 100);
- $results['processed_files'] = $procesados;
- $stmt = $pdo->prepare("
- UPDATE validation_jobs
- SET results = ?, progress_percentage = ?, updated_at = NOW()
- WHERE id = ?
- ");
- $stmt->execute([json_encode($results), $progress, $job['id']]);
-
- // Enviar notificación de progreso
- sendWebSocketNotification($job['user_id'], 'progress', $job['id'], $progress);
- //echo "Progreso: {$progress}%\n";
- } else {
- echo "Error al procesar archivo {$nombreArchivo}: HTTP {$httpCode}\n";
- throw new Exception("Error al procesar archivo: {$nombreArchivo}");
- }
- }
- $zip->close();
-
- // Eliminar directorio de extracción
- rmdir($extractDir);
- // 7. Marcar como completado
- $stmt = $pdo->prepare("
- UPDATE validation_jobs
- SET status = 'completed', progress_percentage = 100, completed_at = NOW(), updated_at = NOW()
- WHERE id = ?
- ");
- $stmt->execute([$job['id']]);
-
- // Enviar notificación de completado con archivos temporales
- $completionData = [
- 'temp_files' => $tempFiles,
- 'individual_temp_files' => $individualTempFiles
- ];
- sendWebSocketNotification($job['user_id'], 'completed', $job['id'], 100, $completionData);
- echo "Job completado: {$job['id']}\n";
- } catch (Exception $e) {
- echo "ERROR: " . $e->getMessage() . "\n";
-
- // Enviar notificación de error si hay job activo
- if (isset($job)) {
- $stmt = $pdo->prepare("UPDATE validation_jobs SET status = 'failed', error_message = ? WHERE id = ?");
- $stmt->execute([$e->getMessage(), $job['id']]);
- sendWebSocketNotification($job['user_id'], 'failed', $job['id']);
- }
- } finally {
- // 8. Limpiar y eliminar lock global
- if (isset($extractDir) && is_dir($extractDir)) {
- $files = glob("$extractDir/*");
- foreach ($files as $file) {
- if (is_file($file)) {
- unlink($file);
- }
- }
- rmdir($extractDir);
- }
- if (file_exists($lockFile)) {
- unlink($lockFile);
- }
- }
- function sendWebSocketNotification($userId, $status, $jobId, $progress = 0, $completionData = []) {
- try {
- require_once __DIR__ . '/../vendor/autoload.php';
-
- $client = new \ElephantIO\Client(\ElephantIO\Client::engine(\ElephantIO\Client::CLIENT_4X, 'http://localhost:3200'));
- $client->initialize();
- $client->of('/');
-
- $data = [
- 'jobId' => $jobId,
- 'status' => $status,
- 'message' => getStatusMessage($status)
- ];
-
- if ($progress !== null) {
- $data['progress'] = $progress;
- }
-
- if (!empty($completionData)) {
- $data = array_merge($data, $completionData);
- }
- $client->emit('laravel_emit', [
- 'event' => 'validation_status',
- 'data' => $data,
- 'completionData' => $completionData,
- 'userId' => (string)$userId
- ]);
-
- $client->close();
- //echo "WebSocket: $status enviado a usuario $userId\n";
- } catch (Exception $e) {
- echo "WebSocket error: " . $e->getMessage() . "\n";
- }
- }
- function getStatusMessage($status) {
- $messages = [
- 'queued' => 'En cola de validación',
- 'processing' => 'Procesando archivos',
- 'progress' => 'Validando archivos',
- 'completed' => 'Validación completada',
- 'failed' => 'Error en validación'
- ];
- return $messages[$status] ?? 'Estado desconocido';
- }
- function getEncrypt($value){
- $ch = curl_init('http://192.168.2.21:8000/api/encrypt');
- curl_setopt($ch, CURLOPT_POST, true);
- curl_setopt($ch, CURLOPT_POSTFIELDS, ['value' => $value]);
- curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
- $response = curl_exec($ch);
- curl_close($ch);
-
- $responseData = json_decode($response, true);
- return $responseData['response']['encrypted'];
- }
|