diff options
author | Nasser Grainawi <nasser@codeaurora.org> | 2022-03-03 10:17:33 -0700 |
---|---|---|
committer | Nasser Grainawi <nasser@codeaurora.org> | 2022-03-08 13:13:37 -0700 |
commit | 44171b65a0c1c390764cdd045aff1cd138c4b802 (patch) | |
tree | 450cdf34a1fd5ff56e6d6661193f6e178da40df1 | |
parent | 9a76d4505e0f327db5d2b2a650b1c1985c26077b (diff) |
Ensure states are updated for canceled replication tasksv3.4.4
When using 'replication start --wait' we need
ReplicationState.waitForReplication() to return when the task we're
waiting on has been canceled, either through an admin action or because
the replication distributor determined another node already completed
it.
Add a couple tests for PushAll that confirm this behavior was previously
broken and is fixed now.
Change-Id: I36320ae079af5d7673e05d20ddc94b42a9b04347
-rw-r--r-- | src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java | 1 | ||||
-rw-r--r-- | src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java | 78 |
2 files changed, 79 insertions, 0 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 8ef21d0..baf0328 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java @@ -459,6 +459,7 @@ public class Destination { synchronized (stateLock) { URIish uri = pushOp.getURI(); pending.remove(uri); + pushOp.notifyNotAttempted(pushOp.getRefs()); } } diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index a174e91..17c8933 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java @@ -27,10 +27,19 @@ import com.google.gerrit.extensions.api.projects.BranchInput; import com.google.gerrit.extensions.common.ProjectInfo; import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.registration.DynamicSet; +import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.Ref; @@ -217,6 +226,75 @@ public class ReplicationIT extends ReplicationDaemon { } @Test + public void pushAllWait() throws Exception { + createTestProject(project + "replica"); + + setReplicationDestination("foo", "replica", ALL_PROJECTS); + reloadConfig(); + + ReplicationState state = new ReplicationState(NO_OP); + + Future<?> future = + plugin + .getSysInjector() + .getInstance(PushAll.Factory.class) + .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false) + .schedule(0, TimeUnit.SECONDS); + + future.get(); + state.waitForReplication(); + } + + @Test + public void pushAllWaitCancelNotRunningTask() throws Exception { + createTestProject(project + "replica"); + + setReplicationDestination("foo", "replica", ALL_PROJECTS); + reloadConfig(); + + ReplicationState state = new ReplicationState(NO_OP); + + Future<?> future = + plugin + .getSysInjector() + .getInstance(PushAll.Factory.class) + .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false) + .schedule(0, TimeUnit.SECONDS); + + CountDownLatch latch = new CountDownLatch(1); + Executor service = Executors.newSingleThreadExecutor(); + service.execute( + new Runnable() { + @Override + public void run() { + try { + future.get(); + state.waitForReplication(); + latch.countDown(); + } catch (Exception e) { + // fails the test because we don't countDown + } + } + }); + + // Cancel the replication task + waitUntil(() -> getProjectTasks().size() != 0); + WorkQueue.Task<?> task = getProjectTasks().get(0); + assertThat(task.getState()).isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING); + task.cancel(false); + + // Confirm our waiting thread completed + boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout + assertThat(receivedSignal).isTrue(); + } + + private List<WorkQueue.Task<?>> getProjectTasks() { + return getInstance(WorkQueue.class).getTasks().stream() + .filter(t -> t instanceof WorkQueue.ProjectTask) + .collect(Collectors.toList()); + } + + @Test public void shouldReplicateHeadUpdate() throws Exception { setReplicationDestination("foo", "replica", ALL_PROJECTS); reloadConfig(); |