This commit is contained in:
Anthony Stirling
2025-02-21 14:32:29 +00:00
parent 5deb27cc12
commit 590a3b6cbb
2 changed files with 100 additions and 21 deletions

View File

@@ -5,9 +5,14 @@ import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystemException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalTime; import java.time.LocalTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
@@ -61,7 +66,7 @@ public class PipelineDirectoryProcessor {
@Scheduled(fixedRate = 60000) @Scheduled(fixedRate = 60000)
public void scanFolders() { public void scanFolders() {
Path watchedFolderPath = Paths.get(watchedFoldersDir); Path watchedFolderPath = Paths.get(watchedFoldersDir).toAbsolutePath();
if (!Files.exists(watchedFolderPath)) { if (!Files.exists(watchedFolderPath)) {
try { try {
Files.createDirectories(watchedFolderPath); Files.createDirectories(watchedFolderPath);
@@ -71,19 +76,30 @@ public class PipelineDirectoryProcessor {
return; return;
} }
} }
try (Stream<Path> paths = Files.walk(watchedFolderPath)) {
paths.filter(Files::isDirectory) try {
.forEach( Files.walkFileTree(watchedFolderPath, new SimpleFileVisitor<>() {
t -> { @Override
try { public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
if (!t.equals(watchedFolderPath) && !t.endsWith("processing")) { try {
handleDirectory(t); // Skip root directory and "processing" subdirectories
} if (!dir.equals(watchedFolderPath) && !dir.endsWith("processing")) {
} catch (Exception e) { handleDirectory(dir);
log.error("Error handling directory: {}", t, e); }
} } catch (Exception e) {
}); log.error("Error handling directory: {}", dir, e);
} catch (Exception e) { }
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFileFailed(Path path, IOException exc) {
// Handle broken symlinks or inaccessible directories
log.error("Error accessing path: {}", path, exc);
return FileVisitResult.CONTINUE;
}
});
} catch (IOException e) {
log.error("Error walking through directory: {}", watchedFolderPath, e); log.error("Error walking through directory: {}", watchedFolderPath, e);
} }
} }
@@ -128,7 +144,7 @@ public class PipelineDirectoryProcessor {
validateOperation(operation); validateOperation(operation);
File[] files = collectFilesForProcessing(dir, jsonFile, operation); File[] files = collectFilesForProcessing(dir, jsonFile, operation);
if (files == null || files.length == 0) { if (files == null || files.length == 0) {
log.debug("No files detected for {} ", dir); log.info("No files detected for {} ", dir);
return; return;
} }
List<File> filesToProcess = prepareFilesForProcessing(files, processingDir); List<File> filesToProcess = prepareFilesForProcessing(files, processingDir);
@@ -187,6 +203,7 @@ public class PipelineDirectoryProcessor {
} }
return isAllowed; return isAllowed;
}) })
.map(Path::toAbsolutePath)
.filter( .filter(
path -> { path -> {
boolean isReady = boolean isReady =
@@ -205,13 +222,39 @@ public class PipelineDirectoryProcessor {
} }
} }
private List<File> prepareFilesForProcessing(File[] files, Path processingDir) private List<File> prepareFilesForProcessing(File[] files, Path processingDir) throws IOException {
throws IOException {
List<File> filesToProcess = new ArrayList<>(); List<File> filesToProcess = new ArrayList<>();
for (File file : files) { for (File file : files) {
Path targetPath = resolveUniqueFilePath(processingDir, file.getName()); Path targetPath = resolveUniqueFilePath(processingDir, file.getName());
Files.move(file.toPath(), targetPath);
filesToProcess.add(targetPath.toFile()); // Retry with exponential backoff
int maxRetries = 3;
int retryDelayMs = 500;
boolean moved = false;
for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
Files.move(file.toPath(), targetPath, StandardCopyOption.REPLACE_EXISTING);
moved = true;
break;
} catch (FileSystemException e) {
if (attempt < maxRetries) {
log.info("File move failed (attempt {}), retrying...", attempt);
try {
Thread.sleep(retryDelayMs * (int) Math.pow(2, attempt-1));
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
}
if (moved) {
filesToProcess.add(targetPath.toFile());
} else {
log.error("Failed to move file after {} attempts: {}", maxRetries, file.getName());
}
} }
return filesToProcess; return filesToProcess;
} }

View File

@@ -2,7 +2,12 @@ package stirling.software.SPDF.utils;
import static java.nio.file.StandardWatchEventKinds.*; import static java.nio.file.StandardWatchEventKinds.*;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.*; import java.nio.file.*;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -43,7 +48,7 @@ public class FileMonitor {
this.pathFilter = pathFilter; this.pathFilter = pathFilter;
this.readyForProcessingFiles = ConcurrentHashMap.newKeySet(); this.readyForProcessingFiles = ConcurrentHashMap.newKeySet();
this.watchService = FileSystems.getDefault().newWatchService(); this.watchService = FileSystems.getDefault().newWatchService();
this.rootDir = Path.of(InstallationPathConfig.getPipelineWatchedFoldersDir()); this.rootDir = Path.of(InstallationPathConfig.getPipelineWatchedFoldersDir()).toAbsolutePath();
} }
private boolean shouldNotProcess(Path path) { private boolean shouldNotProcess(Path path) {
@@ -162,6 +167,37 @@ public class FileMonitor {
* @return true if the file is ready for processing, false otherwise * @return true if the file is ready for processing, false otherwise
*/ */
public boolean isFileReadyForProcessing(Path path) { public boolean isFileReadyForProcessing(Path path) {
return readyForProcessingFiles.contains(path); // 1. Check FileMonitor's ready list
boolean isReady = readyForProcessingFiles.contains(path.toAbsolutePath());
// 2. Check last modified timestamp
if (!isReady) {
try {
long lastModified = Files.getLastModifiedTime(path).toMillis();
long currentTime = System.currentTimeMillis();
isReady = (currentTime - lastModified) > 5000;
} catch (IOException e) {
log.info("Timestamp check failed for {}", path, e);
}
}
// 3. Direct file lock check
if (isReady) {
try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
FileChannel channel = raf.getChannel()) {
// Try acquiring an exclusive lock
FileLock lock = channel.tryLock();
if (lock == null) {
isReady = false;
} else {
lock.release();
}
} catch (IOException e) {
log.info("File lock detected on {}", path);
isReady = false;
}
}
return isReady;
} }
} }