validateLoadArchives.php 8.9 KB


  1. <?php
  2. // Configuración de BD
  3. $host = 'localhost';
  4. $port = 3306;
  5. $database = 'samqa';
  6. $username = 'root';
  7. $password = 'root';
  8. // Archivo lock global
  9. $lockFile = __DIR__ . '/validation.lock';
  10. // 1. Verificar lock global
  11. if (file_exists($lockFile)) {
  12. echo "[" . date('Y-m-d H:i:s') . "] Otro proceso está en ejecución. Saliendo...\n";
  13. exit;
  14. }
  15. file_put_contents($lockFile, getmypid());
  16. // 2. Conectar BD
  17. try {
  18. $pdo = new PDO("mysql:host=$host;port=$port;dbname=$database", $username, $password);
  19. $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  20. // 3. Buscar job pendiente, en progreso o fallido
  21. $pdo->beginTransaction();
  22. // Primero buscar un job que quedó a medias
  23. $stmt = $pdo->prepare("
  24. SELECT * FROM validation_jobs
  25. WHERE status = 'processing' AND progress_percentage < 100
  26. ORDER BY created_at ASC
  27. LIMIT 1
  28. FOR UPDATE
  29. ");
  30. $stmt->execute();
  31. $job = $stmt->fetch(PDO::FETCH_ASSOC);
  32. // Si no hay, buscar jobs fallidos para reintentar
  33. if (!$job) {
  34. $stmt = $pdo->prepare("
  35. SELECT * FROM validation_jobs
  36. WHERE status = 'failed'
  37. ORDER BY created_at ASC
  38. LIMIT 1
  39. FOR UPDATE
  40. ");
  41. $stmt->execute();
  42. $job = $stmt->fetch(PDO::FETCH_ASSOC);
  43. if ($job) {
  44. echo "Reintentando job fallido: {$job['id']}\n";
  45. $update = $pdo->prepare("
  46. UPDATE validation_jobs
  47. SET status = 'processing', error_message = NULL, updated_at = NOW()
  48. WHERE id = ?
  49. ");
  50. $update->execute([$job['id']]);
  51. }
  52. }
  53. // Si no hay, tomar uno en cola
  54. if (!$job) {
  55. $stmt = $pdo->prepare("
  56. SELECT * FROM validation_jobs
  57. WHERE status = 'queued'
  58. ORDER BY created_at ASC
  59. LIMIT 1
  60. FOR UPDATE
  61. ");
  62. $stmt->execute();
  63. $job = $stmt->fetch(PDO::FETCH_ASSOC);
  64. if ($job) {
  65. $update = $pdo->prepare("
  66. UPDATE validation_jobs
  67. SET status = 'processing', updated_at = NOW()
  68. WHERE id = ?
  69. ");
  70. $update->execute([$job['id']]);
  71. }
  72. }
  73. $pdo->commit();
  74. if (!$job) {
  75. echo "[" . date('Y-m-d H:i:s') . "] No hay trabajos pendientes.\n";
  76. unlink($lockFile);
  77. exit;
  78. }
  79. echo "Procesando Job ID: {$job['id']}\n";
  80. // Enviar notificación WebSocket de inicio
  81. sendWebSocketNotification($job['user_id'], 'processing', $job['id']);
  82. // 4. Determinar archivos ya procesados
  83. $results = json_decode($job['results'], true) ?? ['processed_files' => []];
  84. $procesados = $results['processed_files'] ?? [];
  85. // 5. Abrir ZIP y obtener lista completa de archivos
  86. $zipPath = $job['zip_temp_path'];
  87. if (!file_exists($zipPath)) {
  88. throw new Exception("ZIP no encontrado: {$zipPath}");
  89. }
  90. $zip = new ZipArchive();
  91. if ($zip->open($zipPath) !== TRUE) {
  92. throw new Exception("No se pudo abrir el ZIP: {$zipPath}");
  93. }
  94. // Crear carpeta de extracción temporal
  95. $extractDir = __DIR__ . '/../storage/app/tempFiles/extracted_' . time();
  96. if (!mkdir($extractDir, 0755, true)) {
  97. throw new Exception("No se pudo crear directorio de extracción: {$extractDir}");
  98. }
  99. // Obtener solo archivos (no directorios)
  100. $archivos = [];
  101. for ($i = 0; $i < $zip->numFiles; $i++) {
  102. $stat = $zip->statIndex($i);
  103. $nombreArchivo = $stat['name'];
  104. // Solo procesar archivos, no directorios
  105. if (substr($nombreArchivo, -1) !== '/') {
  106. $archivos[] = basename($nombreArchivo); // Solo nombre sin ruta
  107. }
  108. }
  109. $totalArchivos = count($archivos);
  110. $faltantes = array_diff($archivos, $procesados);
  111. echo "Archivos totales: $totalArchivos, ya procesados: " . count($procesados) . ", faltantes: " . count($faltantes) . "\n";
  112. // 6. Procesar faltantes
  113. foreach ($faltantes as $nombreArchivo) {
  114. echo "Procesando: {$nombreArchivo}\n";
  115. // Extraer archivo del ZIP
  116. $archivoExtraido = $extractDir . '/' . $nombreArchivo;
  117. $contenido = null;
  118. // Buscar el archivo en el ZIP (puede estar en subdirectorios)
  119. for ($i = 0; $i < $zip->numFiles; $i++) {
  120. $stat = $zip->statIndex($i);
  121. if (basename($stat['name']) === $nombreArchivo) {
  122. $contenido = $zip->getFromIndex($i);
  123. break;
  124. }
  125. }
  126. if ($contenido === null) {
  127. echo "Archivo no encontrado en ZIP: {$nombreArchivo}\n";
  128. continue;
  129. }
  130. // Guardar archivo extraído
  131. file_put_contents($archivoExtraido, $contenido);
  132. // Enviar archivo al endpoint usando multipart/form-data
  133. $ch = curl_init('http://192.168.2.21:8000/api/upload-temp-file');
  134. curl_setopt($ch, CURLOPT_POST, true);
  135. curl_setopt($ch, CURLOPT_POSTFIELDS, [
  136. 'file' => new CURLFile($archivoExtraido, mime_content_type($archivoExtraido), $nombreArchivo),
  137. 'id_user' => $job['user_id'],
  138. 'linea' => $job['linea']
  139. ]);
  140. curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
  141. $response = curl_exec($ch);
  142. $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
  143. curl_close($ch);
  144. // Eliminar archivo temporal
  145. unlink($archivoExtraido);
  146. // Solo marcar como procesado si el endpoint respondió exitosamente
  147. echo "response: {$response}\n";
  148. if ($httpCode >= 200 && $httpCode < 300) {
  149. $procesados[] = $nombreArchivo;
  150. $progress = intval((count($procesados) / $totalArchivos) * 100);
  151. $results['processed_files'] = $procesados;
  152. $stmt = $pdo->prepare("
  153. UPDATE validation_jobs
  154. SET results = ?, progress_percentage = ?, updated_at = NOW()
  155. WHERE id = ?
  156. ");
  157. $stmt->execute([json_encode($results), $progress, $job['id']]);
  158. // Enviar notificación de progreso
  159. sendWebSocketNotification($job['user_id'], 'progress', $job['id'], $progress);
  160. echo "Progreso: {$progress}%\n";
  161. } else {
  162. echo "Error al procesar archivo {$nombreArchivo}: HTTP {$httpCode}\n";
  163. throw new Exception("Error al procesar archivo: {$nombreArchivo}");
  164. }
  165. }
  166. $zip->close();
  167. // Eliminar directorio de extracción
  168. rmdir($extractDir);
  169. // 7. Marcar como completado
  170. $stmt = $pdo->prepare("
  171. UPDATE validation_jobs
  172. SET status = 'completed', progress_percentage = 100, completed_at = NOW(), updated_at = NOW()
  173. WHERE id = ?
  174. ");
  175. $stmt->execute([$job['id']]);
  176. // Enviar notificación de completado
  177. sendWebSocketNotification($job['user_id'], 'completed', $job['id'], 100);
  178. echo "Job completado: {$job['id']}\n";
  179. } catch (Exception $e) {
  180. echo "ERROR: " . $e->getMessage() . "\n";
  181. // Enviar notificación de error si hay job activo
  182. if (isset($job)) {
  183. $stmt = $pdo->prepare("UPDATE validation_jobs SET status = 'failed', error_message = ? WHERE id = ?");
  184. $stmt->execute([$e->getMessage(), $job['id']]);
  185. sendWebSocketNotification($job['user_id'], 'failed', $job['id']);
  186. }
  187. } finally {
  188. // 8. Limpiar y eliminar lock global
  189. if (isset($extractDir) && is_dir($extractDir)) {
  190. $files = glob("$extractDir/*");
  191. foreach ($files as $file) {
  192. if (is_file($file)) {
  193. unlink($file);
  194. }
  195. }
  196. rmdir($extractDir);
  197. }
  198. if (file_exists($lockFile)) {
  199. unlink($lockFile);
  200. }
  201. }
  202. function sendWebSocketNotification($userId, $status, $jobId, $progress = null) {
  203. try {
  204. require_once __DIR__ . '/../vendor/autoload.php';
  205. $client = new \ElephantIO\Client(\ElephantIO\Client::engine(\ElephantIO\Client::CLIENT_4X, 'http://localhost:3200'));
  206. $client->initialize();
  207. $client->of('/');
  208. $data = [
  209. 'jobId' => $jobId,
  210. 'status' => $status,
  211. 'message' => getStatusMessage($status)
  212. ];
  213. if ($progress !== null) {
  214. $data['progress'] = $progress;
  215. }
  216. $client->emit('laravel_emit', [
  217. 'event' => 'validation_status',
  218. 'data' => $data,
  219. 'userId' => (string)$userId
  220. ]);
  221. $client->close();
  222. echo "WebSocket: $status enviado a usuario $userId\n";
  223. } catch (Exception $e) {
  224. echo "WebSocket error: " . $e->getMessage() . "\n";
  225. }
  226. }
  227. function getStatusMessage($status) {
  228. $messages = [
  229. 'queued' => 'En cola de validación',
  230. 'processing' => 'Procesando archivos',
  231. 'progress' => 'Validando archivos',
  232. 'completed' => 'Validación completada',
  233. 'failed' => 'Error en validación'
  234. ];
  235. return $messages[$status] ?? 'Estado desconocido';
  236. }