summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLuca Milanesio <luca.milanesio@gmail.com>2019-12-09 22:03:12 +0000
committerLuca Milanesio <luca.milanesio@gmail.com>2019-12-09 23:21:14 +0000
commit0d91ffd189297f511c93d40a864978b5774805c1 (patch)
tree150d53254d13c382418ee96e7a849a3449847b03
parent505f2c63c41100d215e706c7df6932854d213bd7 (diff)
Synchronize access to ReplicationTasksStoragev3.1.2v3.1.1
The ReplicationTasksStorage can be subject to concurrency issues when a replication task is moved across directories (waiting/running/building) concurrently with the listing. The result of the uncontrolled concurrency could be lead to: 1. Flaky tests because of the replication tasks found two or more times in different directories 2. Flaky tests because of the failure to list replication tasks that are escaping across directories because of the rename 3. File-based exceptions when replication tasks are moved concurrently by two threads to different directories. The replication tasks storage is supposed to contain only small files and only in-flight operations: the overhead of the additional synchronisation is thus negligible compared to the overall latency of the replication itself. To eliminate all residual latency, cleanup all the replication tasks on all subdirectories at the start of the tests. Bug: Issue 11843 Change-Id: I5f6293b3f22f0943df79f8ab2cb2c217210e5236
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java28
-rw-r--r--src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java20
2 files changed, 30 insertions, 18 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 5564925..8130051 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -85,19 +85,20 @@ public class ReplicationTasksStorage {
private static Gson GSON = new Gson();
+ private final Path refUpdates;
private final Path buildingUpdates;
private final Path runningUpdates;
private final Path waitingUpdates;
@Inject
ReplicationTasksStorage(ReplicationConfig config) {
- Path refUpdates = config.getEventsDirectory().resolve("ref-updates");
+ refUpdates = config.getEventsDirectory().resolve("ref-updates");
buildingUpdates = refUpdates.resolve("building");
runningUpdates = refUpdates.resolve("running");
waitingUpdates = refUpdates.resolve("waiting");
}
- public String create(ReplicateRefUpdate r) {
+ public synchronized String create(ReplicateRefUpdate r) {
return new Task(r).create();
}
@@ -106,44 +107,49 @@ public class ReplicationTasksStorage {
this.disableDeleteForTesting = deleteDisabled;
}
- public void start(PushOne push) {
+ public synchronized void start(PushOne push) {
for (String ref : push.getRefs()) {
new Task(new ReplicateRefUpdate(push, ref)).start();
}
}
- public void reset(PushOne push) {
+ public synchronized void reset(PushOne push) {
for (String ref : push.getRefs()) {
new Task(new ReplicateRefUpdate(push, ref)).reset();
}
}
- public void resetAll() {
- for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
+ public synchronized void resetAll() {
+ for (ReplicateRefUpdate r : listRunning()) {
new Task(r).reset();
}
}
- public void finish(PushOne push) {
+ public synchronized void finish(PushOne push) {
for (String ref : push.getRefs()) {
new Task(new ReplicateRefUpdate(push, ref)).finish();
}
}
- public List<ReplicateRefUpdate> listWaiting() {
+ public synchronized List<ReplicateRefUpdate> listWaiting() {
return list(createDir(waitingUpdates));
}
@VisibleForTesting
- public List<ReplicateRefUpdate> listRunning() {
+ public synchronized List<ReplicateRefUpdate> listRunning() {
return list(createDir(runningUpdates));
}
@VisibleForTesting
- public List<ReplicateRefUpdate> listBuilding() {
+ public synchronized List<ReplicateRefUpdate> listBuilding() {
return list(createDir(buildingUpdates));
}
+ @VisibleForTesting
+ public synchronized List<ReplicateRefUpdate> list() {
+ return list(createDir(refUpdates));
+ }
+
private List<ReplicateRefUpdate> list(Path tasks) {
List<ReplicateRefUpdate> results = new ArrayList<>();
try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
@@ -151,6 +157,8 @@ public class ReplicationTasksStorage {
if (Files.isRegularFile(e)) {
String json = new String(Files.readAllBytes(e), UTF_8);
results.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+ } else if (Files.isDirectory(e)) {
+ results.addAll(list(e));
}
}
} catch (IOException e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 41f93b3..9cf5489 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -42,7 +42,6 @@ import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import java.util.stream.Stream;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
@@ -400,18 +399,23 @@ public class ReplicationIT extends LightweightPluginDaemonTest {
private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
Pattern refmaskPattern = Pattern.compile(refRegex);
- return Stream.concat(
- tasksStorage.listWaiting().stream(),
- Stream.concat(
- tasksStorage.listBuilding().stream(), tasksStorage.listRunning().stream()))
+ return tasksStorage.list().stream()
.filter(task -> refmaskPattern.matcher(task.ref).matches())
.collect(toList());
}
- private void cleanupReplicationTasks() throws IOException {
- try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+ public void cleanupReplicationTasks() throws IOException {
+ cleanupReplicationTasks(storagePath);
+ }
+
+ private void cleanupReplicationTasks(Path basePath) throws IOException {
+ try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
for (Path path : files) {
- path.toFile().delete();
+ if (Files.isDirectory(path)) {
+ cleanupReplicationTasks(path);
+ } else {
+ path.toFile().delete();
+ }
}
}
}