summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java28
-rw-r--r--src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java20
2 files changed, 30 insertions, 18 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 5564925..8130051 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -85,19 +85,20 @@ public class ReplicationTasksStorage {
private static Gson GSON = new Gson();
+ private final Path refUpdates;
private final Path buildingUpdates;
private final Path runningUpdates;
private final Path waitingUpdates;
@Inject
ReplicationTasksStorage(ReplicationConfig config) {
- Path refUpdates = config.getEventsDirectory().resolve("ref-updates");
+ refUpdates = config.getEventsDirectory().resolve("ref-updates");
buildingUpdates = refUpdates.resolve("building");
runningUpdates = refUpdates.resolve("running");
waitingUpdates = refUpdates.resolve("waiting");
}
- public String create(ReplicateRefUpdate r) {
+ public synchronized String create(ReplicateRefUpdate r) {
return new Task(r).create();
}
@@ -106,44 +107,49 @@ public class ReplicationTasksStorage {
this.disableDeleteForTesting = deleteDisabled;
}
- public void start(PushOne push) {
+ public synchronized void start(PushOne push) {
for (String ref : push.getRefs()) {
new Task(new ReplicateRefUpdate(push, ref)).start();
}
}
- public void reset(PushOne push) {
+ public synchronized void reset(PushOne push) {
for (String ref : push.getRefs()) {
new Task(new ReplicateRefUpdate(push, ref)).reset();
}
}
- public void resetAll() {
- for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
+ public synchronized void resetAll() {
+ for (ReplicateRefUpdate r : listRunning()) {
new Task(r).reset();
}
}
- public void finish(PushOne push) {
+ public synchronized void finish(PushOne push) {
for (String ref : push.getRefs()) {
new Task(new ReplicateRefUpdate(push, ref)).finish();
}
}
- public List<ReplicateRefUpdate> listWaiting() {
+ public synchronized List<ReplicateRefUpdate> listWaiting() {
return list(createDir(waitingUpdates));
}
@VisibleForTesting
- public List<ReplicateRefUpdate> listRunning() {
+ public synchronized List<ReplicateRefUpdate> listRunning() {
return list(createDir(runningUpdates));
}
@VisibleForTesting
- public List<ReplicateRefUpdate> listBuilding() {
+ public synchronized List<ReplicateRefUpdate> listBuilding() {
return list(createDir(buildingUpdates));
}
+ @VisibleForTesting
+ public synchronized List<ReplicateRefUpdate> list() {
+ return list(createDir(refUpdates));
+ }
+
private List<ReplicateRefUpdate> list(Path tasks) {
List<ReplicateRefUpdate> results = new ArrayList<>();
try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
@@ -151,6 +157,8 @@ public class ReplicationTasksStorage {
if (Files.isRegularFile(e)) {
String json = new String(Files.readAllBytes(e), UTF_8);
results.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+ } else if (Files.isDirectory(e)) {
+ results.addAll(list(e));
}
}
} catch (IOException e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 41f93b3..9cf5489 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -42,7 +42,6 @@ import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import java.util.stream.Stream;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
@@ -400,18 +399,23 @@ public class ReplicationIT extends LightweightPluginDaemonTest {
private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
Pattern refmaskPattern = Pattern.compile(refRegex);
- return Stream.concat(
- tasksStorage.listWaiting().stream(),
- Stream.concat(
- tasksStorage.listBuilding().stream(), tasksStorage.listRunning().stream()))
+ return tasksStorage.list().stream()
.filter(task -> refmaskPattern.matcher(task.ref).matches())
.collect(toList());
}
- private void cleanupReplicationTasks() throws IOException {
- try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+ public void cleanupReplicationTasks() throws IOException {
+ cleanupReplicationTasks(storagePath);
+ }
+
+ private void cleanupReplicationTasks(Path basePath) throws IOException {
+ try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
for (Path path : files) {
- path.toFile().delete();
+ if (Files.isDirectory(path)) {
+ cleanupReplicationTasks(path);
+ } else {
+ path.toFile().delete();
+ }
}
}
}