diff options
-rw-r--r-- | src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java | 22 | ||||
-rw-r--r-- | src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java | 134 |
2 files changed, 155 insertions, 1 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 30aff44..4c7bdfc 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java @@ -15,6 +15,7 @@ package com.googlesource.gerrit.plugins.replication; import com.google.common.base.Strings; +import com.google.common.collect.Queues; import com.google.gerrit.common.EventDispatcher; import com.google.gerrit.extensions.events.GitReferenceUpdatedListener; import com.google.gerrit.extensions.events.HeadUpdatedListener; @@ -33,6 +34,7 @@ import java.io.OutputStream; import java.net.URISyntaxException; import java.util.Collections; import java.util.HashSet; +import java.util.Queue; import java.util.Set; import org.eclipse.jgit.internal.storage.file.FileRepository; import org.eclipse.jgit.lib.Constants; @@ -73,6 +75,7 @@ public class ReplicationQueue private final ReplicationConfig config; private final GerritSshApi gerritAdmin; private volatile boolean running; + private final Queue<GitReferenceUpdatedListener.Event> beforeStartupEventsQueue; @Inject ReplicationQueue( @@ -88,12 +91,14 @@ public class ReplicationQueue config = rc; stateLog = sl; gerritAdmin = ga; + beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue(); } @Override public void start() { config.startup(workQueue); running = true; + fireBeforeStartupEvents(); } @Override @@ -129,7 +134,10 @@ public class ReplicationQueue public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); if (!running) { - stateLog.warn("Replication plugin did not finish startup before event", state); + stateLog.warn( + "Replication plugin did not finish startup before event, event replication is postponed", + state); + beforeStartupEventsQueue.add(event); return; } @@ -168,6 +176,18 @@ public class ReplicationQueue } } + private void fireBeforeStartupEvents() { + Set<String> eventsReplayed = new HashSet<>(); + for (GitReferenceUpdatedListener.Event event : beforeStartupEventsQueue) { + String eventKey = String.format("%s:%s", event.getProjectName(), event.getRefName()); + if (!eventsReplayed.contains(eventKey)) { + repLog.info("Firing pending task {}", event); + onGitReferenceUpdated(event); + eventsReplayed.add(eventKey); + } + } + } + private Set<URIish> getURIs(Project.NameKey projectName, FilterType filterType) { if (config.getDestinations(filterType).isEmpty()) { return Collections.emptySet(); diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java new file mode 100644 index 0000000..881a282 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java @@ -0,0 +1,134 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.truth.Truth.assertThat; +import static java.util.stream.Collectors.toList; + +import com.google.common.base.Stopwatch; +import com.google.gerrit.acceptance.LightweightPluginDaemonTest; +import com.google.gerrit.acceptance.PushOneCommit.Result; +import com.google.gerrit.acceptance.TestPlugin; +import com.google.gerrit.acceptance.UseLocalDisk; +import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.server.config.SitePaths; +import com.google.inject.Inject; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.storage.file.FileBasedConfig; +import org.eclipse.jgit.util.FS; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@UseLocalDisk +@TestPlugin( + name = "replication", + sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule") +public class ReplicationQueueIT extends LightweightPluginDaemonTest { + private static final Logger logger = LoggerFactory.getLogger(ReplicationQueueIT.class); + + private static final int TEST_REPLICATION_DELAY = 1; + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2); + + @Inject private SitePaths sitePaths; + private Path gitPath; + private FileBasedConfig config; + + @Override + public void setUp() throws Exception { + gitPath = sitePaths.site_path.resolve("git"); + config = + new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); + + setReplicationDestination("foo", "replica"); + super.setUp(); + } + + @Test + public void shouldNotDropEventsWhenStarting() throws Exception { + Project.NameKey targetProject = createProject("projectreplica"); + + replicationQueueStop(); + Result pushResult = createChange(); + replicationQueueStart(); + + RevCommit sourceCommit = pushResult.getCommit(); + String sourceRef = pushResult.getPatchSet().getRefName(); + + try (Repository repo = repoManager.openRepository(targetProject)) { + waitUntil(() -> checkedGetRef(repo, sourceRef) != null); + Ref targetBranchRef = getRef(repo, sourceRef); + assertThat(targetBranchRef).isNotNull(); + assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId()); + } + } + + private Ref getRef(Repository repo, String branchName) throws IOException { + return repo.getRefDatabase().exactRef(branchName); + } + + private Ref checkedGetRef(Repository repo, String branchName) { + try { + return repo.getRefDatabase().exactRef(branchName); + } catch (Exception e) { + logger.error("failed to get ref %s in repo %s", branchName, repo); + return null; + } + } + + private void setReplicationDestination(String remoteName, String replicaSuffix) + throws IOException { + setReplicationDestination(remoteName, Arrays.asList(replicaSuffix)); + } + + private void setReplicationDestination(String remoteName, List<String> replicaSuffixes) + throws IOException { + + List<String> replicaUrls = + replicaSuffixes.stream() + .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString()) + .collect(toList()); + config.setStringList("remote", remoteName, "url", replicaUrls); + config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY); + config.save(); + } + + private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException { + Stopwatch stopwatch = Stopwatch.createStarted(); + while (!waitCondition.get()) { + if (stopwatch.elapsed().compareTo(TEST_TIMEOUT) > 0) { + throw new InterruptedException(); + } + TimeUnit.MILLISECONDS.sleep(50); + } + } + + private void replicationQueueStart() { + plugin.getSysInjector().getInstance(ReplicationQueue.class).start(); + } + + private void replicationQueueStop() { + plugin.getSysInjector().getInstance(ReplicationQueue.class).stop(); + } +} |