From 590a3b6cbbd4b3f2f0ef11f1b32c99ea06e2b78c Mon Sep 17 00:00:00 2001 From: Anthony Stirling <77850077+Frooodle@users.noreply.github.com.> Date: Fri, 21 Feb 2025 14:32:29 +0000 Subject: [PATCH] init --- .../pipeline/PipelineDirectoryProcessor.java | 81 ++++++++++++++----- .../software/SPDF/utils/FileMonitor.java | 40 ++++++++- 2 files changed, 100 insertions(+), 21 deletions(-) diff --git a/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java b/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java index 38550c5c..b5d5fd64 100644 --- a/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java +++ b/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java @@ -5,9 +5,14 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystemException; +import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.BasicFileAttributes; import java.time.LocalDate; import java.time.LocalTime; import java.time.format.DateTimeFormatter; @@ -61,7 +66,7 @@ public class PipelineDirectoryProcessor { @Scheduled(fixedRate = 60000) public void scanFolders() { - Path watchedFolderPath = Paths.get(watchedFoldersDir); + Path watchedFolderPath = Paths.get(watchedFoldersDir).toAbsolutePath(); if (!Files.exists(watchedFolderPath)) { try { Files.createDirectories(watchedFolderPath); @@ -71,19 +76,30 @@ public class PipelineDirectoryProcessor { return; } } - try (Stream paths = Files.walk(watchedFolderPath)) { - paths.filter(Files::isDirectory) - .forEach( - t -> { - try { - if (!t.equals(watchedFolderPath) && !t.endsWith("processing")) { - handleDirectory(t); - } - } catch (Exception e) { - log.error("Error handling directory: {}", t, e); - } - }); - } catch (Exception e) { + + try { + Files.walkFileTree(watchedFolderPath, new SimpleFileVisitor<>() { + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) { + try { + // Skip root directory and "processing" subdirectories + if (!dir.equals(watchedFolderPath) && !dir.endsWith("processing")) { + handleDirectory(dir); + } + } catch (Exception e) { + log.error("Error handling directory: {}", dir, e); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(Path path, IOException exc) { + // Handle broken symlinks or inaccessible directories + log.error("Error accessing path: {}", path, exc); + return FileVisitResult.CONTINUE; + } + }); + } catch (IOException e) { log.error("Error walking through directory: {}", watchedFolderPath, e); } } @@ -128,7 +144,7 @@ public class PipelineDirectoryProcessor { validateOperation(operation); File[] files = collectFilesForProcessing(dir, jsonFile, operation); if (files == null || files.length == 0) { - log.debug("No files detected for {} ", dir); + log.info("No files detected for {} ", dir); return; } List filesToProcess = prepareFilesForProcessing(files, processingDir); @@ -187,6 +203,7 @@ public class PipelineDirectoryProcessor { } return isAllowed; }) + .map(Path::toAbsolutePath) .filter( path -> { boolean isReady = @@ -205,13 +222,39 @@ public class PipelineDirectoryProcessor { } } - private List prepareFilesForProcessing(File[] files, Path processingDir) - throws IOException { + private List prepareFilesForProcessing(File[] files, Path processingDir) throws IOException { List filesToProcess = new ArrayList<>(); for (File file : files) { Path targetPath = resolveUniqueFilePath(processingDir, file.getName()); - Files.move(file.toPath(), targetPath); - filesToProcess.add(targetPath.toFile()); + + // Retry with exponential backoff + int maxRetries = 3; + int retryDelayMs = 500; + boolean moved = false; + + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + Files.move(file.toPath(), targetPath, StandardCopyOption.REPLACE_EXISTING); + moved = true; + break; + } catch (FileSystemException e) { + if (attempt < maxRetries) { + log.info("File move failed (attempt {}), retrying...", attempt); + try { + Thread.sleep(retryDelayMs * (int) Math.pow(2, attempt-1)); + } catch (InterruptedException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + } + } + } + + if (moved) { + filesToProcess.add(targetPath.toFile()); + } else { + log.error("Failed to move file after {} attempts: {}", maxRetries, file.getName()); + } } return filesToProcess; } diff --git a/src/main/java/stirling/software/SPDF/utils/FileMonitor.java b/src/main/java/stirling/software/SPDF/utils/FileMonitor.java index 389d2819..5d9df2de 100644 --- a/src/main/java/stirling/software/SPDF/utils/FileMonitor.java +++ b/src/main/java/stirling/software/SPDF/utils/FileMonitor.java @@ -2,7 +2,12 @@ package stirling.software.SPDF.utils; import static java.nio.file.StandardWatchEventKinds.*; +import java.io.FileInputStream; import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; import java.nio.file.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -43,7 +48,7 @@ public class FileMonitor { this.pathFilter = pathFilter; this.readyForProcessingFiles = ConcurrentHashMap.newKeySet(); this.watchService = FileSystems.getDefault().newWatchService(); - this.rootDir = Path.of(InstallationPathConfig.getPipelineWatchedFoldersDir()); + this.rootDir = Path.of(InstallationPathConfig.getPipelineWatchedFoldersDir()).toAbsolutePath(); } private boolean shouldNotProcess(Path path) { @@ -162,6 +167,37 @@ public class FileMonitor { * @return true if the file is ready for processing, false otherwise */ public boolean isFileReadyForProcessing(Path path) { - return readyForProcessingFiles.contains(path); + // 1. Check FileMonitor's ready list + boolean isReady = readyForProcessingFiles.contains(path.toAbsolutePath()); + + // 2. Check last modified timestamp + if (!isReady) { + try { + long lastModified = Files.getLastModifiedTime(path).toMillis(); + long currentTime = System.currentTimeMillis(); + isReady = (currentTime - lastModified) > 5000; + } catch (IOException e) { + log.info("Timestamp check failed for {}", path, e); + } + } + + // 3. Direct file lock check + if (isReady) { + try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw"); + FileChannel channel = raf.getChannel()) { + // Try acquiring an exclusive lock + FileLock lock = channel.tryLock(); + if (lock == null) { + isReady = false; + } else { + lock.release(); + } + } catch (IOException e) { + log.info("File lock detected on {}", path); + isReady = false; + } + } + + return isReady; } }