summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java22
-rw-r--r--src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java134
2 files changed, 155 insertions, 1 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 30aff44..4c7bdfc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -15,6 +15,7 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
import com.google.gerrit.common.EventDispatcher;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.extensions.events.HeadUpdatedListener;
@@ -33,6 +34,7 @@ import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Queue;
import java.util.Set;
import org.eclipse.jgit.internal.storage.file.FileRepository;
import org.eclipse.jgit.lib.Constants;
@@ -73,6 +75,7 @@ public class ReplicationQueue
private final ReplicationConfig config;
private final GerritSshApi gerritAdmin;
private volatile boolean running;
+ private final Queue<GitReferenceUpdatedListener.Event> beforeStartupEventsQueue;
@Inject
ReplicationQueue(
@@ -88,12 +91,14 @@ public class ReplicationQueue
config = rc;
stateLog = sl;
gerritAdmin = ga;
+ beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
}
@Override
public void start() {
config.startup(workQueue);
running = true;
+ fireBeforeStartupEvents();
}
@Override
@@ -129,7 +134,10 @@ public class ReplicationQueue
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
if (!running) {
- stateLog.warn("Replication plugin did not finish startup before event", state);
+ stateLog.warn(
+ "Replication plugin did not finish startup before event, event replication is postponed",
+ state);
+ beforeStartupEventsQueue.add(event);
return;
}
@@ -168,6 +176,18 @@ public class ReplicationQueue
}
}
+ private void fireBeforeStartupEvents() {
+ Set<String> eventsReplayed = new HashSet<>();
+ for (GitReferenceUpdatedListener.Event event : beforeStartupEventsQueue) {
+ String eventKey = String.format("%s:%s", event.getProjectName(), event.getRefName());
+ if (!eventsReplayed.contains(eventKey)) {
+ repLog.info("Firing pending task {}", event);
+ onGitReferenceUpdated(event);
+ eventsReplayed.add(eventKey);
+ }
+ }
+ }
+
private Set<URIish> getURIs(Project.NameKey projectName, FilterType filterType) {
if (config.getDestinations(filterType).isEmpty()) {
return Collections.emptySet();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
new file mode 100644
index 0000000..881a282
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
@@ -0,0 +1,134 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.base.Stopwatch;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@UseLocalDisk
+@TestPlugin(
+ name = "replication",
+ sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationQueueIT extends LightweightPluginDaemonTest {
+ private static final Logger logger = LoggerFactory.getLogger(ReplicationQueueIT.class);
+
+ private static final int TEST_REPLICATION_DELAY = 1;
+ private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+
+ @Inject private SitePaths sitePaths;
+ private Path gitPath;
+ private FileBasedConfig config;
+
+ @Override
+ public void setUp() throws Exception {
+ gitPath = sitePaths.site_path.resolve("git");
+ config =
+ new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+
+ setReplicationDestination("foo", "replica");
+ super.setUp();
+ }
+
+ @Test
+ public void shouldNotDropEventsWhenStarting() throws Exception {
+ Project.NameKey targetProject = createProject("projectreplica");
+
+ replicationQueueStop();
+ Result pushResult = createChange();
+ replicationQueueStart();
+
+ RevCommit sourceCommit = pushResult.getCommit();
+ String sourceRef = pushResult.getPatchSet().getRefName();
+
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+ Ref targetBranchRef = getRef(repo, sourceRef);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+ }
+ }
+
+ private Ref getRef(Repository repo, String branchName) throws IOException {
+ return repo.getRefDatabase().exactRef(branchName);
+ }
+
+ private Ref checkedGetRef(Repository repo, String branchName) {
+ try {
+ return repo.getRefDatabase().exactRef(branchName);
+ } catch (Exception e) {
+ logger.error("failed to get ref %s in repo %s", branchName, repo);
+ return null;
+ }
+ }
+
+ private void setReplicationDestination(String remoteName, String replicaSuffix)
+ throws IOException {
+ setReplicationDestination(remoteName, Arrays.asList(replicaSuffix));
+ }
+
+ private void setReplicationDestination(String remoteName, List<String> replicaSuffixes)
+ throws IOException {
+
+ List<String> replicaUrls =
+ replicaSuffixes.stream()
+ .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+ .collect(toList());
+ config.setStringList("remote", remoteName, "url", replicaUrls);
+ config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+ config.save();
+ }
+
+ private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ while (!waitCondition.get()) {
+ if (stopwatch.elapsed().compareTo(TEST_TIMEOUT) > 0) {
+ throw new InterruptedException();
+ }
+ TimeUnit.MILLISECONDS.sleep(50);
+ }
+ }
+
+ private void replicationQueueStart() {
+ plugin.getSysInjector().getInstance(ReplicationQueue.class).start();
+ }
+
+ private void replicationQueueStop() {
+ plugin.getSysInjector().getInstance(ReplicationQueue.class).stop();
+ }
+}