diff options
10 files changed, 409 insertions, 5 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 400d6ff..00a46de 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java @@ -774,6 +774,10 @@ public class Destination { return config.getSlowLatencyThreshold(); } + int getPushBatchSize() { + return config.getPushBatchSize(); + } + private static boolean matches(URIish uri, String urlMatch) { if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) { return true; diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java index 1b39374..a74d198 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java @@ -14,10 +14,14 @@ package com.googlesource.gerrit.plugins.replication; +import static com.google.common.base.Suppliers.memoize; +import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; + import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.gerrit.server.config.ConfigUtil; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.eclipse.jgit.lib.Config; import org.eclipse.jgit.transport.RemoteConfig; @@ -45,6 +49,7 @@ public class DestinationConfiguration implements RemoteConfiguration { private final RemoteConfig remoteConfig; private final int maxRetries; private final int slowLatencyThreshold; + private final Supplier<Integer> pushBatchSize; protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) { this.remoteConfig = remoteConfig; @@ -84,6 +89,31 @@ public class DestinationConfiguration implements RemoteConfiguration { "slowLatencyThreshold", DEFAULT_SLOW_LATENCY_THRESHOLD_SECS, TimeUnit.SECONDS); + + pushBatchSize = + memoize( + () -> { + int configuredBatchSize = + Math.max( + 0, + getInt( + remoteConfig, + cfg, + "pushBatchSize", + cfg.getInt("gerrit", "pushBatchSize", 0))); + if (configuredBatchSize > 0) { + int distributionInterval = cfg.getInt("replication", "distributionInterval", 0); + if (distributionInterval > 0) { + repLog.atWarning().log( + "Push in batches cannot be turned on for remote (%s) when 'Cluster" + + " Replication' (replication.distributionInterval) is configured", + name); + return 0; + } + return configuredBatchSize; + } + return 0; + }); } @Override @@ -173,4 +203,9 @@ public class DestinationConfiguration implements RemoteConfiguration { public int getSlowLatencyThreshold() { return slowLatencyThreshold; } + + @Override + public int getPushBatchSize() { + return pushBatchSize.get(); + } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index 87c35ee..8b3c9e2 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java @@ -19,12 +19,14 @@ import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLo import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import com.google.common.base.MoreObjects; import com.google.common.base.Throwables; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.gerrit.entities.Project; import com.google.gerrit.entities.RefNames; @@ -564,7 +566,36 @@ class PushOne implements ProjectRunnable, CanceledWhileRunning, UriUpdates { lazy(() -> refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog())))); } - return tn.push(NullProgressMonitor.INSTANCE, todo); + return pushInBatches(tn, todo); + } + + private PushResult pushInBatches(Transport tn, List<RemoteRefUpdate> todo) + throws NotSupportedException, TransportException { + int batchSize = pool.getPushBatchSize(); + if (batchSize == 0 || todo.size() <= batchSize) { + return tn.push(NullProgressMonitor.INSTANCE, todo); + } + + List<List<RemoteRefUpdate>> batches = Lists.partition(todo, batchSize); + repLog.atInfo().log("Push to %s in %d batches", uri, batches.size()); + AggregatedPushResult result = new AggregatedPushResult(); + int completedBatch = 1; + for (List<RemoteRefUpdate> batch : batches) { + repLog.atInfo().log( + "Pushing %d/%d batches for replication to %s", completedBatch, batches.size(), uri); + result.addResult(tn.push(NullProgressMonitor.INSTANCE, batch)); + + // check if push should be no longer continued + if (wasCanceled()) { + repLog.atInfo().log( + "Push for replication to %s was canceled after %d completed batch and thus won't be" + + " rescheduled", + uri, completedBatch); + break; + } + completedBatch++; + } + return result; } private static String refUpdatesForLogging(List<RemoteRefUpdate> refUpdates) { @@ -836,4 +867,24 @@ class PushOne implements ProjectRunnable, CanceledWhileRunning, UriUpdates { super(uri, message); } } + + /** + * Internal class used to aggregate PushResult objects from all push batches. See {@link + * PushOne#pushInBatches} for usage. + */ + private static class AggregatedPushResult extends PushResult { + private final List<PushResult> results = new ArrayList<>(); + + void addResult(PushResult result) { + results.add(result); + } + + @Override + public Collection<RemoteRefUpdate> getRemoteUpdates() { + return results.stream() + .map(PushResult::getRemoteUpdates) + .flatMap(Collection::stream) + .collect(toList()); + } + } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java index b66e73c..5fe0323 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java @@ -98,6 +98,13 @@ public interface RemoteConfiguration { int getSlowLatencyThreshold(); /** + * Returns the maximum number of refs that can be pushed in a single push operation. + * + * @return batch size, zero if unlimited. + */ + int getPushBatchSize(); + + /** * Whether the remote configuration is for a single project only * * @return true, when configuration is for a single project, false otherwise diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index 2d02022..5c8e2c7 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md @@ -148,6 +148,20 @@ gerrit.maxRefsToShow `(retry 1) push aaa.com:/git/test.git [refs/heads/b1 refs/heads/b2 (+2)]` +gerrit.pushBatchSize +: Max number of refs that are pushed in a single push operation. If more + than pushBatchSize are to be pushed then they are divided into batches + and pushed sequentially one-by-one. + + Can be overridden at remote-level by setting pushBatchSize. + + By default, `0`, which means that there are no limitations on number of + refs to be transferred in a single push operation. Note that negative + values are treated as `0`. + + Note that `pushBatchSize` is ignored when *Cluster Replication* is configured + - when `replication.distributionInterval` has value > 0. + gerrit.sshCommandTimeout : Timeout for SSH command execution. If 0, there is no timeout and the client waits indefinitely. By default, 0. @@ -551,6 +565,19 @@ remote.NAME.slowLatencyThreshold default: 15 minutes +remote.NAME.pushBatchSize +: Max number of refs that are pushed in a single push operation to this + destination. If more than `pushBatchSize` are to be pushed then they are + divided into batches and pushed sequentially one-by-one. + + By default it falls back to `gerrit.pushBatchSize` value (which is `0` if + not set, which means that there are no limitations on number of refs to + be transferred in a single push operation). Note that negative values are + treated as `0`. + + Note that `pushBatchSize` is ignored when *Cluster Replication* is configured + - when `replication.distributionInterval` has value > 0. + Directory `replication` -------------------- The optional directory `$site_path/etc/replication` contains Git-style diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java new file mode 100644 index 0000000..646f915 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java @@ -0,0 +1,101 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.when; + +import org.eclipse.jgit.lib.Config; +import org.eclipse.jgit.transport.RemoteConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class DestinationConfigurationTest { + private static final String REMOTE = "foo"; + + @Mock private RemoteConfig remoteConfigMock; + @Mock private Config cfgMock; + + private DestinationConfiguration objectUnderTest; + + @Before + public void setUp() { + when(remoteConfigMock.getName()).thenReturn(REMOTE); + when(cfgMock.getStringList(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenReturn(new String[] {}); + objectUnderTest = new DestinationConfiguration(remoteConfigMock, cfgMock); + } + + @Test + public void shouldIgnoreRemotePushBatchSizeWhenClusterReplicationIsConfigured() { + // given + when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1); + when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1); + + // when + int actual = objectUnderTest.getPushBatchSize(); + + // then + assertThat(actual).isEqualTo(0); + } + + @Test + public void shouldIgnoreGlobalPushBatchSizeWhenClusterReplicationIsConfigured() { + // given + int globalPushBatchSize = 1; + when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize); + when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize)) + .thenReturn(globalPushBatchSize); + when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1); + + // when + int actual = objectUnderTest.getPushBatchSize(); + + // then + assertThat(actual).isEqualTo(0); + } + + @Test + public void shouldReturnRemotePushBatchSizeWhenClusterReplicationIsNotConfigured() { + // given + when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1); + + // when + int actual = objectUnderTest.getPushBatchSize(); + + // then + assertThat(actual).isEqualTo(1); + } + + @Test + public void shouldReturnGlobalPushBatchSizeWhenClusterReplicationIsNotConfigured() { + // given + int globalPushBatchSize = 1; + when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize); + when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize)) + .thenReturn(globalPushBatchSize); + + // when + int actual = objectUnderTest.getPushBatchSize(); + + // then + assertThat(actual).isEqualTo(globalPushBatchSize); + } +} diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java index 94f0dc4..bb3e886 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java @@ -17,8 +17,10 @@ package com.googlesource.gerrit.plugins.replication; import static org.eclipse.jgit.lib.Ref.Storage.NEW; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -36,6 +38,7 @@ import com.google.gerrit.server.project.ProjectState; import com.google.gerrit.server.util.IdGenerator; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -115,7 +118,8 @@ public class PushOneTest { new ObjectIdRef.Unpeeled( NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001")); - localRefs = Arrays.asList(newLocalRef); + localRefs = new ArrayList<>(); + localRefs.add(newLocalRef); Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId()); remoteRefs = new HashMap<>(); @@ -223,6 +227,52 @@ public class PushOneTest { verify(transportMock, never()).push(any(), any()); } + @Test + public void shouldPushInSingleOperationWhenPushBatchSizeIsNotConfigured() + throws InterruptedException, IOException { + replicateTwoRefs(createPushOne(null)); + verify(transportMock).push(any(), any()); + } + + @Test + public void shouldPushInBatchesWhenPushBatchSizeIsConfigured() + throws InterruptedException, IOException { + when(destinationMock.getPushBatchSize()).thenReturn(1); + replicateTwoRefs(createPushOne(null)); + verify(transportMock, times(2)).push(any(), any()); + } + + @Test + public void shouldStopPushingInBatchesWhenPushOperationGetsCanceled() + throws InterruptedException, IOException { + when(destinationMock.getPushBatchSize()).thenReturn(1); + PushOne pushOne = createPushOne(null); + + // cancel replication during the first push + doAnswer( + invocation -> { + pushOne.setCanceledWhileRunning(); + return new PushResult(); + }) + .when(transportMock) + .push(any(), any()); + + replicateTwoRefs(pushOne); + verify(transportMock, times(1)).push(any(), any()); + } + + private void replicateTwoRefs(PushOne pushOne) throws InterruptedException { + ObjectIdRef barLocalRef = + new ObjectIdRef.Unpeeled( + NEW, "bar", ObjectId.fromString("0000000000000000000000000000000000000001")); + localRefs.add(barLocalRef); + + pushOne.addRef(PushOne.ALL_REFS); + pushOne.run(); + + isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS); + } + private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) { PushOne push = new PushOne( diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java index 9606371..b6a3ed1 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java @@ -91,6 +91,18 @@ public class ReplicationDaemon extends LightweightPluginDaemonTest { } protected void setReplicationDestination( + String remoteName, String replicaSuffix, Optional<String> project, Integer pushBatchSize) + throws IOException { + setReplicationDestination( + remoteName, + Arrays.asList(replicaSuffix), + project, + TEST_REPLICATION_DELAY_SECONDS, + false, + Optional.ofNullable(pushBatchSize)); + } + + protected void setReplicationDestination( String remoteName, String replicaSuffix, Optional<String> project, boolean mirror) throws IOException { setReplicationDestination( @@ -125,13 +137,37 @@ public class ReplicationDaemon extends LightweightPluginDaemonTest { protected void setReplicationDestination( String remoteName, + String replicaSuffix, + Optional<String> project, + int replicationDelay, + boolean mirror, + Optional<Integer> pushBatchSize) + throws IOException { + setReplicationDestination( + remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror, pushBatchSize); + } + + protected void setReplicationDestination( + String remoteName, List<String> replicaSuffixes, Optional<String> project, int replicationDelay, boolean mirror) throws IOException { setReplicationDestination( - config, remoteName, replicaSuffixes, project, replicationDelay, mirror); + remoteName, replicaSuffixes, project, replicationDelay, mirror, Optional.empty()); + } + + protected void setReplicationDestination( + String remoteName, + List<String> replicaSuffixes, + Optional<String> project, + int replicationDelay, + boolean mirror, + Optional<Integer> pushBatchSize) + throws IOException { + setReplicationDestination( + config, remoteName, replicaSuffixes, project, replicationDelay, mirror, pushBatchSize); config.setBoolean("gerrit", null, "autoReload", true); config.save(); } @@ -142,7 +178,8 @@ public class ReplicationDaemon extends LightweightPluginDaemonTest { Optional<String> project, int replicationDelay) throws IOException { - setReplicationDestination(config, null, replicaSuffixes, project, replicationDelay, false); + setReplicationDestination( + config, null, replicaSuffixes, project, replicationDelay, false, Optional.empty()); } protected void setReplicationDestination( @@ -151,7 +188,8 @@ public class ReplicationDaemon extends LightweightPluginDaemonTest { List<String> replicaSuffixes, Optional<String> project, int replicationDelay, - boolean mirror) + boolean mirror, + Optional<Integer> pushBatchSize) throws IOException { List<String> replicaUrls = @@ -163,6 +201,7 @@ public class ReplicationDaemon extends LightweightPluginDaemonTest { remoteConfig.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES); remoteConfig.setBoolean("remote", remoteName, "mirror", mirror); project.ifPresent(prj -> remoteConfig.setString("remote", remoteName, "projects", prj)); + pushBatchSize.ifPresent(pbs -> remoteConfig.setInt("remote", remoteName, "pushBatchSize", pbs)); remoteConfig.save(); } diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index a174e91..33bd91d 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java @@ -348,6 +348,28 @@ public class ReplicationIT extends ReplicationDaemon { } } + @Test + public void shouldReplicateWithPushBatchSizeSetForRemote() throws Exception { + Project.NameKey targetProject = createTestProject(project + "replica"); + + setReplicationDestination("foo", "replica", ALL_PROJECTS, 1); + reloadConfig(); + + // creating a change results in 2 refs creation therefore it already qualifies for push in two + // batches of size 1 each + Result pushResult = createChange(); + RevCommit sourceCommit = pushResult.getCommit(); + String sourceRef = pushResult.getPatchSet().refName(); + + try (Repository repo = repoManager.openRepository(targetProject)) { + WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60)); + + Ref targetBranchRef = getRef(repo, sourceRef); + assertThat(targetBranchRef).isNotNull(); + assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId()); + } + } + private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException { WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT); } diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java new file mode 100644 index 0000000..cf8dbe3 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java @@ -0,0 +1,68 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.gerrit.acceptance.PushOneCommit.Result; +import com.google.gerrit.acceptance.TestPlugin; +import com.google.gerrit.acceptance.WaitUtil; +import com.google.gerrit.entities.Project; +import java.time.Duration; +import java.util.Optional; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.junit.Test; + +@TestPlugin( + name = "replication", + sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule") +public class ReplicationPushInBatchesIT extends ReplicationDaemon { + + @Override + public void setUpTestPlugin() throws Exception { + initConfig(); + config.setInt("gerrit", null, "pushBatchSize", 1); + config.save(); + setReplicationDestination( + "remote1", + "suffix1", + Optional.of("not-used-project")); // Simulates a full replication.config initialization + super.setUpTestPlugin(); + } + + @Test + public void shouldReplicateWithPushBatchSizeSetGlobaly() throws Exception { + Project.NameKey targetProject = createTestProject(project + "replica"); + + setReplicationDestination("foo", "replica", ALL_PROJECTS); + reloadConfig(); + + // creating a change results in 2 refs creation therefore it already qualifies for push in two + // batches of size 1 each + Result pushResult = createChange(); + RevCommit sourceCommit = pushResult.getCommit(); + String sourceRef = pushResult.getPatchSet().refName(); + + try (Repository repo = repoManager.openRepository(targetProject)) { + WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60)); + + Ref targetBranchRef = getRef(repo, sourceRef); + assertThat(targetBranchRef).isNotNull(); + assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId()); + } + } +} |