summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNasser Grainawi <nasser@codeaurora.org>2022-03-03 10:17:33 -0700
committerNasser Grainawi <nasser@codeaurora.org>2022-03-08 13:13:37 -0700
commit44171b65a0c1c390764cdd045aff1cd138c4b802 (patch)
tree450cdf34a1fd5ff56e6d6661193f6e178da40df1
parent9a76d4505e0f327db5d2b2a650b1c1985c26077b (diff)
Ensure states are updated for canceled replication tasksv3.4.4
When using 'replication start --wait' we need ReplicationState.waitForReplication() to return when the task we're waiting on has been canceled, either through an admin action or because the replication distributor determined another node already completed it. Add a couple tests for PushAll that confirm this behavior was previously broken and is fixed now. Change-Id: I36320ae079af5d7673e05d20ddc94b42a9b04347
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java1
-rw-r--r--src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java78
2 files changed, 79 insertions, 0 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 8ef21d0..baf0328 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -459,6 +459,7 @@ public class Destination {
synchronized (stateLock) {
URIish uri = pushOp.getURI();
pending.remove(uri);
+ pushOp.notifyNotAttempted(pushOp.getRefs());
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index a174e91..17c8933 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -27,10 +27,19 @@ import com.google.gerrit.extensions.api.projects.BranchInput;
import com.google.gerrit.extensions.common.ProjectInfo;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
@@ -217,6 +226,75 @@ public class ReplicationIT extends ReplicationDaemon {
}
@Test
+ public void pushAllWait() throws Exception {
+ createTestProject(project + "replica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ ReplicationState state = new ReplicationState(NO_OP);
+
+ Future<?> future =
+ plugin
+ .getSysInjector()
+ .getInstance(PushAll.Factory.class)
+ .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false)
+ .schedule(0, TimeUnit.SECONDS);
+
+ future.get();
+ state.waitForReplication();
+ }
+
+ @Test
+ public void pushAllWaitCancelNotRunningTask() throws Exception {
+ createTestProject(project + "replica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ ReplicationState state = new ReplicationState(NO_OP);
+
+ Future<?> future =
+ plugin
+ .getSysInjector()
+ .getInstance(PushAll.Factory.class)
+ .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false)
+ .schedule(0, TimeUnit.SECONDS);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ Executor service = Executors.newSingleThreadExecutor();
+ service.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ state.waitForReplication();
+ latch.countDown();
+ } catch (Exception e) {
+ // fails the test because we don't countDown
+ }
+ }
+ });
+
+ // Cancel the replication task
+ waitUntil(() -> getProjectTasks().size() != 0);
+ WorkQueue.Task<?> task = getProjectTasks().get(0);
+ assertThat(task.getState()).isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING);
+ task.cancel(false);
+
+ // Confirm our waiting thread completed
+ boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout
+ assertThat(receivedSignal).isTrue();
+ }
+
+ private List<WorkQueue.Task<?>> getProjectTasks() {
+ return getInstance(WorkQueue.class).getTasks().stream()
+ .filter(t -> t instanceof WorkQueue.ProjectTask)
+ .collect(Collectors.toList());
+ }
+
+ @Test
public void shouldReplicateHeadUpdate() throws Exception {
setReplicationDestination("foo", "replica", ALL_PROJECTS);
reloadConfig();