diff options
-rw-r--r-- | src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java | 28 | ||||
-rw-r--r-- | src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java | 20 |
2 files changed, 30 insertions, 18 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index 5564925..8130051 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java @@ -85,19 +85,20 @@ public class ReplicationTasksStorage { private static Gson GSON = new Gson(); + private final Path refUpdates; private final Path buildingUpdates; private final Path runningUpdates; private final Path waitingUpdates; @Inject ReplicationTasksStorage(ReplicationConfig config) { - Path refUpdates = config.getEventsDirectory().resolve("ref-updates"); + refUpdates = config.getEventsDirectory().resolve("ref-updates"); buildingUpdates = refUpdates.resolve("building"); runningUpdates = refUpdates.resolve("running"); waitingUpdates = refUpdates.resolve("waiting"); } - public String create(ReplicateRefUpdate r) { + public synchronized String create(ReplicateRefUpdate r) { return new Task(r).create(); } @@ -106,44 +107,49 @@ public class ReplicationTasksStorage { this.disableDeleteForTesting = deleteDisabled; } - public void start(PushOne push) { + public synchronized void start(PushOne push) { for (String ref : push.getRefs()) { new Task(new ReplicateRefUpdate(push, ref)).start(); } } - public void reset(PushOne push) { + public synchronized void reset(PushOne push) { for (String ref : push.getRefs()) { new Task(new ReplicateRefUpdate(push, ref)).reset(); } } - public void resetAll() { - for (ReplicateRefUpdate r : list(createDir(runningUpdates))) { + public synchronized void resetAll() { + for (ReplicateRefUpdate r : listRunning()) { new Task(r).reset(); } } - public void finish(PushOne push) { + public synchronized void finish(PushOne push) { for (String ref : push.getRefs()) { new Task(new ReplicateRefUpdate(push, ref)).finish(); } } - public List<ReplicateRefUpdate> listWaiting() { + public synchronized List<ReplicateRefUpdate> listWaiting() { return list(createDir(waitingUpdates)); } @VisibleForTesting - public List<ReplicateRefUpdate> listRunning() { + public synchronized List<ReplicateRefUpdate> listRunning() { return list(createDir(runningUpdates)); } @VisibleForTesting - public List<ReplicateRefUpdate> listBuilding() { + public synchronized List<ReplicateRefUpdate> listBuilding() { return list(createDir(buildingUpdates)); } + @VisibleForTesting + public synchronized List<ReplicateRefUpdate> list() { + return list(createDir(refUpdates)); + } + private List<ReplicateRefUpdate> list(Path tasks) { List<ReplicateRefUpdate> results = new ArrayList<>(); try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) { @@ -151,6 +157,8 @@ public class ReplicationTasksStorage { if (Files.isRegularFile(e)) { String json = new String(Files.readAllBytes(e), UTF_8); results.add(GSON.fromJson(json, ReplicateRefUpdate.class)); + } else if (Files.isDirectory(e)) { + results.addAll(list(e)); } } } catch (IOException e) { diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index 41f93b3..9cf5489 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java @@ -42,7 +42,6 @@ import java.util.List; import java.util.Optional; import java.util.function.Supplier; import java.util.regex.Pattern; -import java.util.stream.Stream; import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.lib.Repository; @@ -400,18 +399,23 @@ public class ReplicationIT extends LightweightPluginDaemonTest { private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) { Pattern refmaskPattern = Pattern.compile(refRegex); - return Stream.concat( - tasksStorage.listWaiting().stream(), - Stream.concat( - tasksStorage.listBuilding().stream(), tasksStorage.listRunning().stream())) + return tasksStorage.list().stream() .filter(task -> refmaskPattern.matcher(task.ref).matches()) .collect(toList()); } - private void cleanupReplicationTasks() throws IOException { - try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) { + public void cleanupReplicationTasks() throws IOException { + cleanupReplicationTasks(storagePath); + } + + private void cleanupReplicationTasks(Path basePath) throws IOException { + try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) { for (Path path : files) { - path.toFile().delete(); + if (Files.isDirectory(path)) { + cleanupReplicationTasks(path); + } else { + path.toFile().delete(); + } } } } |