diff options
51 files changed, 2910 insertions, 1242 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java index acbf763..f3d2103 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java @@ -14,7 +14,7 @@ package com.googlesource.gerrit.plugins.replication; -import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.entities.Project; public interface AdminApi { public boolean createProject(Project.NameKey project, String head); diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java index a43d7d9..fe5dbad 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java @@ -14,133 +14,46 @@ package com.googlesource.gerrit.plugins.replication; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Multimap; -import com.google.common.flogger.FluentLogger; -import com.google.gerrit.common.FileUtil; -import com.google.gerrit.extensions.annotations.PluginData; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import com.google.gerrit.extensions.annotations.PluginName; -import com.google.gerrit.reviewdb.client.Project; -import com.google.gerrit.server.config.SitePaths; +import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; -import com.google.inject.Provider; import com.google.inject.Singleton; -import java.io.IOException; import java.nio.file.Path; -import java.util.List; -import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.eclipse.jgit.errors.ConfigInvalidException; -import org.eclipse.jgit.transport.URIish; +import org.eclipse.jgit.lib.Config; @Singleton -public class AutoReloadConfigDecorator implements ReplicationConfig { - private static final FluentLogger logger = FluentLogger.forEnclosingClass(); +public class AutoReloadConfigDecorator implements ReplicationConfig, LifecycleListener { private static final long RELOAD_DELAY = 120; private static final long RELOAD_INTERVAL = 60; - private volatile ReplicationFileBasedConfig currentConfig; - private long currentConfigTs; - private long lastFailedConfigTs; + private volatile ReplicationConfig currentConfig; - private final SitePaths site; - private final Destination.Factory destinationFactory; - private final Path pluginDataDir; - // Use Provider<> instead of injecting the ReplicationQueue because of circular dependency with - // ReplicationConfig - private final Provider<ReplicationQueue> replicationQueue; private final ScheduledExecutorService autoReloadExecutor; private ScheduledFuture<?> autoReloadRunnable; - - private volatile boolean shuttingDown; + private final AutoReloadRunnable reloadRunner; @Inject public AutoReloadConfigDecorator( - SitePaths site, - Destination.Factory destinationFactory, - Provider<ReplicationQueue> replicationQueue, - @PluginData Path pluginDataDir, @PluginName String pluginName, - WorkQueue workQueue) - throws ConfigInvalidException, IOException { - this.site = site; - this.destinationFactory = destinationFactory; - this.pluginDataDir = pluginDataDir; - this.currentConfig = loadConfig(); - this.currentConfigTs = getLastModified(currentConfig); - this.replicationQueue = replicationQueue; + WorkQueue workQueue, + @MainReplicationConfig ReplicationConfig replicationConfig, + AutoReloadRunnable reloadRunner, + EventBus eventBus) { + this.currentConfig = replicationConfig; this.autoReloadExecutor = workQueue.createQueue(1, pluginName + "_auto-reload-config"); - } - - private static long getLastModified(ReplicationFileBasedConfig cfg) { - return FileUtil.lastModified(cfg.getCfgPath()); - } - - private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException { - return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir); - } - - private synchronized boolean isAutoReload() { - return currentConfig.getConfig().getBoolean("gerrit", "autoReload", false); - } - - @Override - public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) { - reloadIfNeeded(); - return currentConfig.getDestinations(uri, project, ref); - } - - @Override - public synchronized List<Destination> getDestinations(FilterType filterType) { - return currentConfig.getDestinations(filterType); - } - - @Override - public synchronized Multimap<Destination, URIish> getURIs( - Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) { - return currentConfig.getURIs(remoteName, projectName, filterType); - } - - private synchronized void reloadIfNeeded() { - reload(false); + this.reloadRunner = reloadRunner; + eventBus.register(this); } @VisibleForTesting - public void forceReload() { - reload(true); - } - - private void reload(boolean force) { - if (force || isAutoReload()) { - ReplicationQueue queue = replicationQueue.get(); - - long lastModified = getLastModified(currentConfig); - try { - if (force - || (!shuttingDown - && lastModified > currentConfigTs - && lastModified > lastFailedConfigTs - && queue.isRunning() - && !queue.isReplaying())) { - queue.stop(); - currentConfig = loadConfig(); - currentConfigTs = lastModified; - lastFailedConfigTs = 0; - logger.atInfo().log( - "Configuration reloaded: %d destinations", - currentConfig.getDestinations(FilterType.ALL).size()); - } - } catch (Exception e) { - logger.atSevere().withCause(e).log( - "Cannot reload replication configuration: keeping existing settings"); - lastFailedConfigTs = lastModified; - return; - } finally { - queue.start(); - } - } + public void reload() { + reloadRunner.reload(); } @Override @@ -159,45 +72,36 @@ public class AutoReloadConfigDecorator implements ReplicationConfig { } @Override - public synchronized boolean isEmpty() { - return currentConfig.isEmpty(); + public Path getEventsDirectory() { + return currentConfig.getEventsDirectory(); } @Override - public Path getEventsDirectory() { - return currentConfig.getEventsDirectory(); + public synchronized void start() { + autoReloadRunnable = + autoReloadExecutor.scheduleAtFixedRate( + reloadRunner, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS); } - /* shutdown() cannot be set as a synchronized method because - * it may need to wait for pending events to complete; - * e.g. when enabling the drain of replication events before - * shutdown. - * - * As a rule of thumb for synchronized methods, because they - * implicitly define a critical section and associated lock, - * they should never hold waiting for another resource, otherwise - * the risk of deadlock is very high. - * - * See more background about deadlocks, what they are and how to - * prevent them at: https://en.wikipedia.org/wiki/Deadlock - */ @Override - public int shutdown() { - this.shuttingDown = true; + public synchronized void stop() { if (autoReloadRunnable != null) { - autoReloadRunnable.cancel(false); + if (!autoReloadRunnable.cancel(true)) { + throw new IllegalStateException( + "Unable to cancel replication reload task: cannot guarantee orderly shutdown"); + } autoReloadRunnable = null; } - return currentConfig.shutdown(); } @Override - public synchronized void startup(WorkQueue workQueue) { - shuttingDown = false; - currentConfig.startup(workQueue); - autoReloadRunnable = - autoReloadExecutor.scheduleAtFixedRate( - this::reloadIfNeeded, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS); + public String getVersion() { + return currentConfig.getVersion(); + } + + @Subscribe + public void onReload(ReplicationConfig newConfig) { + currentConfig = newConfig; } @Override @@ -209,4 +113,9 @@ public class AutoReloadConfigDecorator implements ReplicationConfig { public synchronized int getSshCommandTimeout() { return currentConfig.getSshCommandTimeout(); } + + @Override + public Config getConfig() { + return currentConfig.getConfig(); + } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java new file mode 100644 index 0000000..71f7c67 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java @@ -0,0 +1,79 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import com.google.common.eventbus.EventBus; +import com.google.common.flogger.FluentLogger; +import com.google.inject.Inject; +import com.google.inject.Provider; +import java.util.List; + +public class AutoReloadRunnable implements Runnable { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final EventBus eventBus; + private final Provider<ObservableQueue> queueObserverProvider; + private final ConfigParser configParser; + private ReplicationConfig loadedConfig; + private Provider<ReplicationConfig> replicationConfigProvider; + private String loadedConfigVersion; + private String lastFailedConfigVersion; + + @Inject + public AutoReloadRunnable( + ConfigParser configParser, + @MainReplicationConfig Provider<ReplicationConfig> replicationConfigProvider, + EventBus eventBus, + Provider<ObservableQueue> queueObserverProvider) { + this.replicationConfigProvider = replicationConfigProvider; + this.loadedConfig = replicationConfigProvider.get(); + this.loadedConfigVersion = loadedConfig.getVersion(); + this.lastFailedConfigVersion = ""; + this.eventBus = eventBus; + this.queueObserverProvider = queueObserverProvider; + this.configParser = configParser; + } + + @Override + public synchronized void run() { + String pendingConfigVersion = loadedConfig.getVersion(); + ObservableQueue queue = queueObserverProvider.get(); + if (pendingConfigVersion.equals(loadedConfigVersion) + || pendingConfigVersion.equals(lastFailedConfigVersion) + || !queue.isRunning() + || queue.isReplaying()) { + return; + } + + reload(); + } + + synchronized void reload() { + String pendingConfigVersion = loadedConfig.getVersion(); + try { + ReplicationConfig newConfig = replicationConfigProvider.get(); + final List<RemoteConfiguration> newValidDestinations = + configParser.parseRemotes(newConfig.getConfig()); + loadedConfig = newConfig; + loadedConfigVersion = newConfig.getVersion(); + lastFailedConfigVersion = ""; + eventBus.post(newValidDestinations); + } catch (Exception e) { + logger.atSevere().withCause(e).log( + "Cannot reload replication configuration: keeping existing settings"); + lastFailedConfigVersion = pendingConfigVersion; + } + } +} diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java index 98f364d..5bae0af 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java @@ -31,11 +31,10 @@ public class AutoReloadSecureCredentialsFactoryDecorator implements CredentialsF private final AtomicReference<SecureCredentialsFactory> secureCredentialsFactory; private volatile long secureCredentialsFactoryLoadTs; private final SitePaths site; - private ReplicationFileBasedConfig config; + private ReplicationConfig config; @Inject - public AutoReloadSecureCredentialsFactoryDecorator( - SitePaths site, ReplicationFileBasedConfig config) + public AutoReloadSecureCredentialsFactoryDecorator(SitePaths site, ReplicationConfig config) throws ConfigInvalidException, IOException { this.site = site; this.config = config; diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java new file mode 100644 index 0000000..29ea706 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java @@ -0,0 +1,32 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import java.util.List; +import org.eclipse.jgit.errors.ConfigInvalidException; +import org.eclipse.jgit.lib.Config; + +/** Parser for parsing {@link Config} to a collection of {@link RemoteConfiguration} objects */ +public interface ConfigParser { + + /** + * parse the new replication config + * + * @param config new configuration to parse + * @return List of parsed {@link RemoteConfiguration} + * @throws ConfigInvalidException if the new configuration is not valid. + */ + List<RemoteConfiguration> parseRemotes(Config config) throws ConfigInvalidException; +} diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java index a8dede3..fa26e82 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java @@ -16,8 +16,8 @@ package com.googlesource.gerrit.plugins.replication; import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; +import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.registration.DynamicItem; -import com.google.gerrit.reviewdb.client.Project; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; @@ -31,7 +31,7 @@ public class CreateProjectTask { } private final RemoteConfig config; - private final ReplicationConfig replicationConfig; + private final DestinationsCollection destinations; private final DynamicItem<AdminApiFactory> adminApiFactory; private final Project.NameKey project; private final String head; @@ -39,21 +39,20 @@ public class CreateProjectTask { @Inject CreateProjectTask( RemoteConfig config, - ReplicationConfig replicationConfig, + DestinationsCollection destinations, DynamicItem<AdminApiFactory> adminApiFactory, @Assisted Project.NameKey project, @Assisted String head) { this.config = config; - this.replicationConfig = replicationConfig; + this.destinations = destinations; this.adminApiFactory = adminApiFactory; this.project = project; this.head = head; } public boolean create() { - return replicationConfig - .getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION).values() - .stream() + return destinations.getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION) + .values().stream() .map(u -> createProject(u, project, head)) .reduce(true, (a, b) -> a && b); } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java index f9b2ad7..4617672 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java @@ -16,8 +16,8 @@ package com.googlesource.gerrit.plugins.replication; import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; +import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.registration.DynamicItem; -import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.server.ioutil.HexFormat; import com.google.gerrit.server.util.IdGenerator; import com.google.inject.Inject; diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 3b8208b..35470eb 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java @@ -26,13 +26,13 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet.Builder; import com.google.common.collect.Lists; import com.google.gerrit.common.data.GroupReference; +import com.google.gerrit.entities.AccountGroup; +import com.google.gerrit.entities.BranchNameKey; +import com.google.gerrit.entities.Project; +import com.google.gerrit.entities.RefNames; import com.google.gerrit.extensions.config.FactoryModule; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.restapi.AuthException; -import com.google.gerrit.reviewdb.client.AccountGroup; -import com.google.gerrit.reviewdb.client.Branch; -import com.google.gerrit.reviewdb.client.Project; -import com.google.gerrit.reviewdb.client.RefNames; import com.google.gerrit.server.CurrentUser; import com.google.gerrit.server.PluginUser; import com.google.gerrit.server.account.GroupBackend; @@ -60,7 +60,6 @@ import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.servlet.RequestScoped; import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult; -import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; @@ -554,6 +553,7 @@ public class Destination { postReplicationFailedEvent(pushOp, status); if (pushOp.setToRetry()) { postReplicationScheduledEvent(pushOp); + replicationTasksStorage.get().reset(pushOp); @SuppressWarnings("unused") ScheduledFuture<?> ignored2 = pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES); @@ -580,36 +580,21 @@ public class Destination { if (inFlightOp != null) { return RunwayStatus.denied(inFlightOp.getId()); } + replicationTasksStorage.get().start(op); inFlight.put(op.getURI(), op); } return RunwayStatus.allowed(); } - void notifyFinished(PushOne task) { + void notifyFinished(PushOne op) { synchronized (stateLock) { - inFlight.remove(task.getURI()); - if (!task.wasCanceled()) { - for (String ref : task.getRefs()) { - if (!refHasPendingPush(task.getURI(), ref)) { - replicationTasksStorage - .get() - .delete( - new ReplicateRefUpdate( - task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName())); - } - } + if (!op.isRetrying()) { + replicationTasksStorage.get().finish(op); } + inFlight.remove(op.getURI()); } } - private boolean refHasPendingPush(URIish opUri, String ref) { - return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref); - } - - private boolean pushContainsRef(PushOne op, String ref) { - return op != null && op.getRefs().contains(ref); - } - boolean wouldPush(URIish uri, Project.NameKey project, String ref) { return matches(uri, project) && wouldPushProject(project) && wouldPushRef(ref); } @@ -634,21 +619,7 @@ public class Destination { } boolean isSingleProjectMatch() { - List<String> projects = config.getProjects(); - boolean ret = (projects.size() == 1); - if (ret) { - String projectMatch = projects.get(0); - if (ReplicationFilter.getPatternType(projectMatch) - != ReplicationFilter.PatternType.EXACT_MATCH) { - // projectMatch is either regular expression, or wild-card. - // - // Even though they might refer to a single project now, they need not - // after new projects have been created. Hence, we do not treat them as - // matching a single project. - ret = false; - } - } - return ret; + return config.isSingleProjectMatch(); } boolean wouldPushRef(String ref) { @@ -775,6 +746,10 @@ public class Destination { return config.getDelay() * 1000L; } + int getSlowLatencyThreshold() { + return config.getSlowLatencyThreshold(); + } + private static boolean matches(URIish uri, String urlMatch) { if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) { return true; @@ -794,7 +769,7 @@ public class Destination { ReplicationScheduledEvent event = new ReplicationScheduledEvent(project.get(), ref, targetNode); try { - eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event); + eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event); } catch (PermissionBackendException e) { repLog.error("error posting event", e); } @@ -808,7 +783,7 @@ public class Destination { RefReplicatedEvent event = new RefReplicatedEvent(project.get(), ref, targetNode, RefPushResult.FAILED, status); try { - eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event); + eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event); } catch (PermissionBackendException e) { repLog.error("error posting event", e); } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java new file mode 100644 index 0000000..4050c9c --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java @@ -0,0 +1,103 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.flogger.FluentLogger; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import org.eclipse.jgit.errors.ConfigInvalidException; +import org.eclipse.jgit.lib.Config; +import org.eclipse.jgit.transport.RefSpec; +import org.eclipse.jgit.transport.RemoteConfig; +import org.eclipse.jgit.transport.URIish; + +/** + * Implementation of {@link ConfigParser} for parsing {@link Config} to a collection of {@link + * DestinationConfiguration} objects + */ +public class DestinationConfigParser implements ConfigParser { + + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + /* (non-Javadoc) + * @see com.googlesource.gerrit.plugins.replication.ConfigParser#parseRemotes(org.eclipse.jgit.lib.Config) + */ + @Override + public List<RemoteConfiguration> parseRemotes(Config config) throws ConfigInvalidException { + + if (config.getSections().isEmpty()) { + logger.atWarning().log("Replication config does not exist or it's empty; not replicating"); + return Collections.emptyList(); + } + + boolean defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false); + + ImmutableList.Builder<RemoteConfiguration> confs = ImmutableList.builder(); + for (RemoteConfig c : allRemotes(config)) { + if (c.getURIs().isEmpty()) { + continue; + } + + // If destination for push is not set assume equal to source. + for (RefSpec ref : c.getPushRefSpecs()) { + if (ref.getDestination() == null) { + ref.setDestination(ref.getSource()); + } + } + + if (c.getPushRefSpecs().isEmpty()) { + c.addPushRefSpec( + new RefSpec() + .setSourceDestination("refs/*", "refs/*") + .setForceUpdate(defaultForceUpdate)); + } + + DestinationConfiguration destinationConfiguration = new DestinationConfiguration(c, config); + + if (!destinationConfiguration.isSingleProjectMatch()) { + for (URIish u : c.getURIs()) { + if (u.getPath() == null || !u.getPath().contains("${name}")) { + throw new ConfigInvalidException( + String.format( + "remote.%s.url \"%s\" lacks ${name} placeholder in %s", + c.getName(), u, config)); + } + } + } + + confs.add(destinationConfiguration); + } + + return confs.build(); + } + + private static List<RemoteConfig> allRemotes(Config cfg) throws ConfigInvalidException { + Set<String> names = cfg.getSubsections("remote"); + List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size()); + for (String name : names) { + try { + result.add(new RemoteConfig(cfg, name)); + } catch (URISyntaxException e) { + throw new ConfigInvalidException( + String.format("remote %s has invalid URL in %s", name, cfg), e); + } + } + return result; + } +} diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java index f688cfc..4b757ea 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java @@ -16,13 +16,16 @@ package com.googlesource.gerrit.plugins.replication; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; +import com.google.gerrit.server.config.ConfigUtil; +import java.util.concurrent.TimeUnit; import org.eclipse.jgit.lib.Config; import org.eclipse.jgit.transport.RemoteConfig; -public class DestinationConfiguration { +public class DestinationConfiguration implements RemoteConfiguration { static final int DEFAULT_REPLICATION_DELAY = 15; static final int DEFAULT_RESCHEDULE_DELAY = 3; static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0; + private static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900; private final int delay; private final int rescheduleDelay; @@ -41,6 +44,7 @@ public class DestinationConfiguration { private final ImmutableList<String> authGroupNames; private final RemoteConfig remoteConfig; private final int maxRetries; + private final int slowLatencyThreshold; protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) { this.remoteConfig = remoteConfig; @@ -67,16 +71,29 @@ public class DestinationConfiguration { maxRetries = getInt( remoteConfig, cfg, "replicationMaxRetries", cfg.getInt("replication", "maxRetries", 0)); + + slowLatencyThreshold = + (int) + ConfigUtil.getTimeUnit( + cfg, + "remote", + remoteConfig.getName(), + "slowLatencyThreshold", + DEFAULT_SLOW_LATENCY_THRESHOLD_SECS, + TimeUnit.SECONDS); } + @Override public int getDelay() { return delay; } + @Override public int getRescheduleDelay() { return rescheduleDelay; } + @Override public int getRetryDelay() { return retryDelay; } @@ -93,26 +110,32 @@ public class DestinationConfiguration { return lockErrorMaxRetries; } + @Override public ImmutableList<String> getUrls() { return urls; } + @Override public ImmutableList<String> getAdminUrls() { return adminUrls; } + @Override public ImmutableList<String> getProjects() { return projects; } + @Override public ImmutableList<String> getAuthGroupNames() { return authGroupNames; } + @Override public String getRemoteNameStyle() { return remoteNameStyle; } + @Override public boolean replicatePermissions() { return replicatePermissions; } @@ -129,10 +152,12 @@ public class DestinationConfiguration { return replicateHiddenProjects; } + @Override public RemoteConfig getRemoteConfig() { return remoteConfig; } + @Override public int getMaxRetries() { return maxRetries; } @@ -140,4 +165,9 @@ public class DestinationConfiguration { private static int getInt(RemoteConfig rc, Config cfg, String name, int defValue) { return cfg.getInt("remote", rc.getName(), name, defValue); } + + @Override + public int getSlowLatencyThreshold() { + return slowLatencyThreshold; + } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java new file mode 100644 index 0000000..eaf5b27 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java @@ -0,0 +1,269 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit; +import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp; +import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH; +import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName; +import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; +import static java.util.stream.Collectors.toList; + +import com.google.common.base.Strings; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.SetMultimap; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import com.google.common.flogger.FluentLogger; +import com.google.gerrit.entities.Project; +import com.google.gerrit.server.git.WorkQueue; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import com.googlesource.gerrit.plugins.replication.Destination.Factory; +import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Predicate; +import org.eclipse.jgit.errors.ConfigInvalidException; +import org.eclipse.jgit.transport.URIish; + +@Singleton +public class DestinationsCollection implements ReplicationDestinations { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final Factory destinationFactory; + private final Provider<ReplicationQueue> replicationQueue; + private volatile List<Destination> destinations; + private boolean shuttingDown; + + public static class EventQueueNotEmptyException extends Exception { + private static final long serialVersionUID = 1L; + + public EventQueueNotEmptyException(String errorMessage) { + super(errorMessage); + } + } + + @Inject + public DestinationsCollection( + Destination.Factory destinationFactory, + Provider<ReplicationQueue> replicationQueue, + ReplicationConfig replicationConfig, + ConfigParser configParser, + EventBus eventBus) + throws ConfigInvalidException { + this.destinationFactory = destinationFactory; + this.replicationQueue = replicationQueue; + this.destinations = + allDestinations( + destinationFactory, configParser.parseRemotes(replicationConfig.getConfig())); + eventBus.register(this); + } + + @Override + public Multimap<Destination, URIish> getURIs( + Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) { + if (getAll(filterType).isEmpty()) { + return ImmutableMultimap.of(); + } + + SetMultimap<Destination, URIish> uris = HashMultimap.create(); + for (Destination config : getAll(filterType)) { + if (filterType != FilterType.PROJECT_DELETION && !config.wouldPushProject(projectName)) { + continue; + } + + if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) { + continue; + } + + boolean adminURLUsed = false; + + for (String url : config.getAdminUrls()) { + if (Strings.isNullOrEmpty(url)) { + continue; + } + + URIish uri; + try { + uri = new URIish(url); + } catch (URISyntaxException e) { + repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage()); + continue; + } + + if (!isGerrit(uri) && !isGerritHttp(uri)) { + String path = + replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch()); + if (path == null) { + repLog.warn("adminURL {} does not contain ${name}", uri); + continue; + } + + uri = uri.setPath(path); + if (!isSSH(uri)) { + repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri); + continue; + } + } + uris.put(config, uri); + adminURLUsed = true; + } + + if (!adminURLUsed) { + for (URIish uri : config.getURIs(projectName, "*")) { + uris.put(config, uri); + } + } + } + return uris; + } + + @Override + public List<Destination> getAll(FilterType filterType) { + Predicate<? super Destination> filter; + switch (filterType) { + case PROJECT_CREATION: + filter = dest -> dest.isCreateMissingRepos(); + break; + case PROJECT_DELETION: + filter = dest -> dest.isReplicateProjectDeletions(); + break; + case ALL: + default: + filter = dest -> true; + break; + } + return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList()); + } + + @Override + public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) { + List<Destination> dests = new ArrayList<>(); + for (Destination dest : getAll(FilterType.ALL)) { + if (dest.wouldPush(uri, project, ref)) { + dests.add(dest); + } + } + return dests; + } + + @Override + public boolean isEmpty() { + return destinations.isEmpty(); + } + + @Override + public synchronized void startup(WorkQueue workQueue) { + shuttingDown = false; + for (Destination cfg : destinations) { + cfg.start(workQueue); + } + } + + /* shutdown() cannot be set as a synchronized method because + * it may need to wait for pending events to complete; + * e.g. when enabling the drain of replication events before + * shutdown. + * + * As a rule of thumb for synchronized methods, because they + * implicitly define a critical section and associated lock, + * they should never hold waiting for another resource, otherwise + * the risk of deadlock is very high. + * + * See more background about deadlocks, what they are and how to + * prevent them at: https://en.wikipedia.org/wiki/Deadlock + */ + @Override + public int shutdown() { + synchronized (this) { + shuttingDown = true; + } + + int discarded = 0; + for (Destination cfg : destinations) { + try { + drainReplicationEvents(cfg); + } catch (EventQueueNotEmptyException e) { + logger.atWarning().log("Event queue not empty: %s", e.getMessage()); + } finally { + discarded += cfg.shutdown(); + } + } + return discarded; + } + + void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException { + int drainQueueAttempts = destination.getDrainQueueAttempts(); + if (drainQueueAttempts == 0) { + return; + } + int pending = destination.getQueueInfo().pending.size(); + int inFlight = destination.getQueueInfo().inFlight.size(); + while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) { + try { + logger.atInfo().log( + "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d", + inFlight, pending); + Thread.sleep(destination.getReplicationDelayMilliseconds()); + } catch (InterruptedException ie) { + logger.atWarning().withCause(ie).log( + "Wait for replication events to drain has been interrupted"); + } + pending = destination.getQueueInfo().pending.size(); + inFlight = destination.getQueueInfo().inFlight.size(); + drainQueueAttempts--; + } + if (pending > 0 || inFlight > 0) { + throw new EventQueueNotEmptyException( + String.format("Pending: %d - InFlight: %d", pending, inFlight)); + } + } + + @Subscribe + public synchronized void onReload(List<RemoteConfiguration> remoteConfigurations) { + if (shuttingDown) { + logger.atWarning().log("Shutting down: configuration reload ignored"); + return; + } + + try { + replicationQueue.get().stop(); + destinations = allDestinations(destinationFactory, remoteConfigurations); + logger.atInfo().log("Configuration reloaded: %d destinations", getAll(FilterType.ALL).size()); + } finally { + replicationQueue.get().start(); + } + } + + private List<Destination> allDestinations( + Destination.Factory destinationFactory, List<RemoteConfiguration> remoteConfigurations) { + + ImmutableList.Builder<Destination> dest = ImmutableList.builder(); + for (RemoteConfiguration c : remoteConfigurations) { + if (c instanceof DestinationConfiguration) { + dest.add(destinationFactory.create((DestinationConfiguration) c)); + } + } + return dest.build(); + } +} diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java new file mode 100644 index 0000000..4cc9974 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java @@ -0,0 +1,182 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.io.Files.getNameWithoutExtension; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.collect.Lists; +import com.google.common.flogger.FluentLogger; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.gerrit.extensions.annotations.PluginData; +import com.google.gerrit.server.config.SitePaths; +import com.google.inject.Inject; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; +import org.eclipse.jgit.errors.ConfigInvalidException; +import org.eclipse.jgit.internal.storage.file.FileSnapshot; +import org.eclipse.jgit.lib.Config; +import org.eclipse.jgit.storage.file.FileBasedConfig; +import org.eclipse.jgit.util.FS; + +public class FanoutReplicationConfig implements ReplicationConfig { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final ReplicationFileBasedConfig replicationConfig; + private final Config config; + private final Path remoteConfigsDirPath; + + @Inject + public FanoutReplicationConfig(SitePaths site, @PluginData Path pluginDataDir) + throws IOException, ConfigInvalidException { + + remoteConfigsDirPath = site.etc_dir.resolve("replication"); + replicationConfig = new ReplicationFileBasedConfig(site, pluginDataDir); + config = replicationConfig.getConfig(); + removeRemotes(config); + + try (Stream<Path> files = Files.list(remoteConfigsDirPath)) { + files + .filter(Files::isRegularFile) + .filter(FanoutReplicationConfig::isConfig) + .map(FanoutReplicationConfig::loadConfig) + .filter(Optional::isPresent) + .map(Optional::get) + .filter(FanoutReplicationConfig::isValid) + .forEach(cfg -> addRemoteConfig(cfg, config)); + } catch (IllegalStateException e) { + throw new ConfigInvalidException(e.getMessage()); + } + } + + private static void removeRemotes(Config config) { + Set<String> remoteNames = config.getSubsections("remote"); + if (remoteNames.size() > 0) { + logger.atSevere().log( + "When replication directory is present replication.config file cannot contain remote configuration. Ignoring: %s", + String.join(",", remoteNames)); + + for (String name : remoteNames) { + config.unsetSection("remote", name); + } + } + } + + private static void addRemoteConfig(FileBasedConfig source, Config destination) { + String remoteName = getNameWithoutExtension(source.getFile().getName()); + for (String name : source.getNames("remote")) { + destination.setStringList( + "remote", + remoteName, + name, + Lists.newArrayList(source.getStringList("remote", null, name))); + } + } + + private static boolean isValid(Config cfg) { + if (cfg.getSections().size() != 1 || !cfg.getSections().contains("remote")) { + logger.atSevere().log( + "Remote replication configuration file %s must contain only one remote section.", cfg); + return false; + } + if (cfg.getSubsections("remote").size() > 0) { + logger.atSevere().log( + "Remote replication configuration file %s cannot contain remote subsections.", cfg); + return false; + } + + return true; + } + + private static Optional<FileBasedConfig> loadConfig(Path path) { + FileBasedConfig cfg = new FileBasedConfig(path.toFile(), FS.DETECTED); + try { + cfg.load(); + } catch (IOException | ConfigInvalidException e) { + logger.atSevere().withCause(e).log( + "Cannot load remote replication configuration file %s.", path); + return Optional.empty(); + } + return Optional.of(cfg); + } + + private static boolean isConfig(Path p) { + return p.toString().endsWith(".config"); + } + + @Override + public boolean isReplicateAllOnPluginStart() { + return replicationConfig.isReplicateAllOnPluginStart(); + } + + @Override + public boolean isDefaultForceUpdate() { + return replicationConfig.isDefaultForceUpdate(); + } + + @Override + public int getMaxRefsToLog() { + return replicationConfig.getMaxRefsToLog(); + } + + @Override + public Path getEventsDirectory() { + return replicationConfig.getEventsDirectory(); + } + + @Override + public int getSshConnectionTimeout() { + return replicationConfig.getSshConnectionTimeout(); + } + + @Override + public int getSshCommandTimeout() { + return replicationConfig.getSshCommandTimeout(); + } + + @Override + public String getVersion() { + Hasher hasher = Hashing.murmur3_128().newHasher(); + hasher.putString(replicationConfig.getVersion(), UTF_8); + try (Stream<Path> files = Files.list(remoteConfigsDirPath)) { + files + .filter(Files::isRegularFile) + .filter(FanoutReplicationConfig::isConfig) + .sorted() + .map(Path::toFile) + .map(FileSnapshot::save) + .forEach( + fileSnapshot -> + // hashCode is based on file size, file key and last modified time + hasher.putInt(fileSnapshot.hashCode())); + return hasher.hash().toString(); + } catch (IOException e) { + logger.atSevere().withCause(e).log( + "Cannot list remote configuration files from %s. Returning replication.config file version", + remoteConfigsDirPath); + return replicationConfig.getVersion(); + } + } + + @Override + public Config getConfig() { + return config; + } +} diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java index eac56df..66130f9 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java @@ -18,8 +18,8 @@ import static com.googlesource.gerrit.plugins.replication.GerritSshApi.GERRIT_AD import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; import static java.nio.charset.StandardCharsets.UTF_8; +import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.restapi.Url; -import com.google.gerrit.reviewdb.client.Project; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import java.io.IOException; diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java index 6dcc80e..d195aa3 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java @@ -15,7 +15,7 @@ package com.googlesource.gerrit.plugins.replication; import com.google.common.flogger.FluentLogger; -import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.entities.Project; import com.google.gerrit.server.ssh.SshAddressesModule; import java.io.IOException; import java.io.OutputStream; diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java index 07978ab..9264d9b 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java @@ -40,11 +40,11 @@ final class ListCommand extends SshCommand { @Option(name = "--json", usage = "output in json format") private boolean json; - @Inject private ReplicationConfig config; + @Inject private ReplicationDestinations destinations; @Override protected void run() { - for (Destination d : config.getDestinations(FilterType.ALL)) { + for (Destination d : destinations.getAll(FilterType.ALL)) { if (matches(d.getRemoteConfigName())) { printRemote(d); } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java index aa6e16c..da960e6 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java @@ -16,7 +16,7 @@ package com.googlesource.gerrit.plugins.replication; import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; -import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.entities.Project; import java.io.File; import java.io.IOException; import org.eclipse.jgit.internal.storage.file.FileRepository; diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/MainReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/MainReplicationConfig.java new file mode 100644 index 0000000..e8d95ec --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/MainReplicationConfig.java @@ -0,0 +1,23 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import com.google.inject.BindingAnnotation; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +@BindingAnnotation +@Retention(RetentionPolicy.RUNTIME) +public @interface MainReplicationConfig {} diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java new file mode 100644 index 0000000..a347f3a --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java @@ -0,0 +1,55 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import java.io.IOException; +import java.util.Locale; + +/** Some NFS utilities */ +public class Nfs { + /** + * Determine if a throwable or a cause in its causal chain is a Stale NFS File Handle + * + * @param throwable + * @return a boolean true if the throwable or a cause in its causal chain is a Stale NFS File + * Handle + */ + public static boolean isStaleFileHandleInCausalChain(Throwable throwable) { + while (throwable != null) { + if (throwable instanceof IOException && isStaleFileHandle((IOException) throwable)) { + return true; + } + throwable = throwable.getCause(); + } + return false; + } + + /** + * Determine if an IOException is a Stale NFS File Handle + * + * @param ioe + * @return a boolean true if the IOException is a Stale NFS FIle Handle + */ + public static boolean isStaleFileHandle(IOException ioe) { + String msg = ioe.getMessage(); + return msg != null && msg.toLowerCase(Locale.ROOT).matches(".*stale .*file .*handle.*"); + } + + public static <T extends Throwable> void throwIfNotStaleFileHandle(T e) throws T { + if (!isStaleFileHandleInCausalChain(e)) { + throw e; + } + } +} diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java new file mode 100644 index 0000000..1007ae5 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java @@ -0,0 +1,32 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +/** Allows a queue activity to be observed */ +public interface ObservableQueue { + /** + * Indicates whether the observed queue is running + * + * @return true, when the queue is running, false otherwise + */ + boolean isRunning(); + + /** + * Indicates whether the observed queue is replaying queued events + * + * @return true, when the queue is replaying, false otherwise + */ + boolean isReplaying(); +} diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java index 833b02b..4f60319 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java @@ -15,7 +15,7 @@ package com.googlesource.gerrit.plugins.replication; import com.google.gerrit.common.Nullable; -import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.entities.Project; import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.project.ProjectCache; import com.google.inject.Inject; diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index b488264..634f44d 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java @@ -16,6 +16,7 @@ package com.googlesource.gerrit.plugins.replication; import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toMap; @@ -24,13 +25,13 @@ import com.google.common.base.Throwables; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Sets; +import com.google.gerrit.entities.Project; +import com.google.gerrit.entities.RefNames; import com.google.gerrit.extensions.events.GitReferenceUpdatedListener; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.restapi.AuthException; import com.google.gerrit.extensions.restapi.ResourceConflictException; import com.google.gerrit.metrics.Timer1; -import com.google.gerrit.reviewdb.client.Project; -import com.google.gerrit.reviewdb.client.RefNames; import com.google.gerrit.server.git.GitRepositoryManager; import com.google.gerrit.server.git.PerThreadRequestScope; import com.google.gerrit.server.git.ProjectRunnable; @@ -217,6 +218,10 @@ class PushOne implements ProjectRunnable, CanceledWhileRunning { return maxRetries == 0 || retryCount <= maxRetries; } + private void retryDone() { + this.retrying = false; + } + void canceledByReplication() { canceled = true; } @@ -334,14 +339,20 @@ class PushOne implements ProjectRunnable, CanceledWhileRunning { } repLog.info("Replication to {} started...", uri); - Timer1.Context context = metrics.start(config.getName()); + Timer1.Context<String> destinationContext = metrics.start(config.getName()); try { - long startedAt = context.getStartTime(); + long startedAt = destinationContext.getStartTime(); long delay = NANOSECONDS.toMillis(startedAt - createdAt); metrics.record(config.getName(), delay, retryCount); git = gitManager.openRepository(projectName); runImpl(); - long elapsed = NANOSECONDS.toMillis(context.stop()); + long elapsed = NANOSECONDS.toMillis(destinationContext.stop()); + + if (elapsed > SECONDS.toMillis(pool.getSlowLatencyThreshold())) { + metrics.recordSlowProjectReplication( + config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed); + } + retryDone(); repLog.info( "Replication to {} completed in {}ms, {}ms delay, {} retries", uri, diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java index fccdb7b..d1ab790 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java @@ -14,9 +14,10 @@ package com.googlesource.gerrit.plugins.replication; -import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.entities.Project; import com.google.gerrit.server.events.RefEvent; import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult; +import java.util.Objects; import org.eclipse.jgit.transport.RemoteRefUpdate; import org.eclipse.jgit.transport.RemoteRefUpdate.Status; @@ -45,11 +46,40 @@ public class RefReplicatedEvent extends RefEvent { @Override public Project.NameKey getProjectNameKey() { - return new Project.NameKey(project); + return Project.nameKey(project); } @Override public String getRefName() { return ref; } + + @Override + public boolean equals(Object other) { + if (!(other instanceof RefReplicatedEvent)) { + return false; + } + RefReplicatedEvent event = (RefReplicatedEvent) other; + if (!Objects.equals(event.project, this.project)) { + return false; + } + if (!Objects.equals(event.ref, this.ref)) { + return false; + } + if (!Objects.equals(event.targetNode, this.targetNode)) { + return false; + } + if (!Objects.equals(event.status, this.status)) { + return false; + } + if (!Objects.equals(event.refStatus, this.refStatus)) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return super.hashCode(); + } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java index 4789a96..e663194 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java @@ -14,8 +14,9 @@ package com.googlesource.gerrit.plugins.replication; -import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.entities.Project; import com.google.gerrit.server.events.RefEvent; +import java.util.Objects; public class RefReplicationDoneEvent extends RefEvent { public static final String TYPE = "ref-replication-done"; @@ -33,11 +34,35 @@ public class RefReplicationDoneEvent extends RefEvent { @Override public Project.NameKey getProjectNameKey() { - return new Project.NameKey(project); + return Project.nameKey(project); } @Override public String getRefName() { return ref; } + + @Override + public boolean equals(Object other) { + if (!(other instanceof RefReplicationDoneEvent)) { + return false; + } + + RefReplicationDoneEvent event = (RefReplicationDoneEvent) other; + if (!Objects.equals(event.project, this.project)) { + return false; + } + if (!Objects.equals(event.ref, this.ref)) { + return false; + } + if (event.nodesCount != this.nodesCount) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return super.hashCode(); + } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java new file mode 100644 index 0000000..b66e73c --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java @@ -0,0 +1,122 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.googlesource.gerrit.plugins.replication; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.eclipse.jgit.transport.RemoteConfig; + +/** Remote configuration for a replication endpoint */ +public interface RemoteConfiguration { + /** + * Time to wait before scheduling a remote replication operation. Setting to 0 effectively + * disables the delay. + * + * @return the delay value in seconds + */ + int getDelay(); + /** + * Time to wait before rescheduling a remote replication operation, which might have failed the + * first time round. Setting to 0 effectively disables the delay. + * + * @return the delay value in seconds + */ + int getRescheduleDelay(); + /** + * Time to wait before retrying a failed remote replication operation, Setting to 0 effectively + * disables the delay. + * + * @return the delay value in seconds + */ + int getRetryDelay(); + /** + * List of the remote endpoint addresses used for replication. + * + * @return list of remote URL strings + */ + ImmutableList<String> getUrls(); + /** + * List of alternative remote endpoint addresses, used for admin operations, such as repository + * creation + * + * @return list of remote URL strings + */ + ImmutableList<String> getAdminUrls(); + /** + * List of repositories that should be replicated + * + * @return list of project strings + */ + ImmutableList<String> getProjects(); + /** + * List of groups that should be used to access the repositories. + * + * @return list of group strings + */ + ImmutableList<String> getAuthGroupNames(); + /** + * Influence how the name of the remote repository should be computed. + * + * @return a string representing a remote style name + */ + String getRemoteNameStyle(); + /** + * If true, permissions-only projects and the refs/meta/config branch will also be replicated + * + * @return a string representing a remote style name + */ + boolean replicatePermissions(); + /** + * the JGIT remote configuration representing the replication for this endpoint + * + * @return The remote config {@link RemoteConfig} + */ + RemoteConfig getRemoteConfig(); + /** + * Number of times to retry a replication operation + * + * @return the number of retries + */ + int getMaxRetries(); + + /** + * the time duration after which the replication for a project should be considered “slow” + * + * @return the slow latency threshold + */ + int getSlowLatencyThreshold(); + + /** + * Whether the remote configuration is for a single project only + * + * @return true, when configuration is for a single project, false otherwise + */ + default boolean isSingleProjectMatch() { + List<String> projects = getProjects(); + boolean ret = (projects.size() == 1); + if (ret) { + String projectMatch = projects.get(0); + if (ReplicationFilter.getPatternType(projectMatch) + != ReplicationFilter.PatternType.EXACT_MATCH) { + // projectMatch is either regular expression, or wild-card. + // + // Even though they might refer to a single project now, they need not + // after new projects have been created. Hence, we do not treat them as + // matching a single project. + ret = false; + } + } + return ret; + } +} diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java index e685215..7538298 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java @@ -16,7 +16,7 @@ package com.googlesource.gerrit.plugins.replication; import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; -import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.entities.Project; import java.io.IOException; import java.io.OutputStream; import org.eclipse.jgit.transport.URIish; diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java index ccdead8..b978952 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java @@ -11,46 +11,81 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package com.googlesource.gerrit.plugins.replication; -import com.google.common.collect.Multimap; -import com.google.gerrit.reviewdb.client.Project; -import com.google.gerrit.server.git.WorkQueue; import java.nio.file.Path; -import java.util.List; -import java.util.Optional; -import org.eclipse.jgit.transport.URIish; +import org.eclipse.jgit.lib.Config; +/** Configuration of all the replication end points. */ public interface ReplicationConfig { + /** Filter for accessing replication projects. */ enum FilterType { PROJECT_CREATION, PROJECT_DELETION, ALL } - List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref); - - List<Destination> getDestinations(FilterType filterType); - - Multimap<Destination, URIish> getURIs( - Optional<String> remoteName, Project.NameKey projectName, FilterType filterType); - + /** + * Returns current replication configuration of whether to replicate or not all the projects when + * the plugin starts. + * + * @return true if replication at plugin start, false otherwise. + */ boolean isReplicateAllOnPluginStart(); + /** + * Returns the default behaviour of the replication plugin when pushing to remote replication + * ends. Even though the property name has the 'update' suffix, it actually refers to Git push + * operation and not to a Git update. + * + * @return true if forced push is the default, false otherwise. + */ boolean isDefaultForceUpdate(); + /** + * Returns the maximum number of ref-specs to log into the replication_log whenever a push + * operation is completed against a replication end. + * + * @return maximum number of refs to log, zero if unlimited. + */ int getMaxRefsToLog(); - boolean isEmpty(); - + /** + * Configured location where the replication events are stored on the filesystem for being resumed + * and kept across restarts. + * + * @return path to store persisted events. + */ Path getEventsDirectory(); - int shutdown(); - - void startup(WorkQueue workQueue); - + /** + * Timeout for establishing SSH connection to remote. + * + * @return connection timeout, zero if infinite. + */ int getSshConnectionTimeout(); + /** + * Timeout for executing an SSH command on remote. + * + * @return command timeout, zero if infinite. + */ int getSshCommandTimeout(); + + /** + * Current logical version string of the current configuration loaded in memory, depending on the + * actual implementation of the configuration on the persistent storage. + * + * @return current logical version number. + */ + String getVersion(); + + /** + * Return a copy of the current config. + * + * @return the config. + */ + Config getConfig(); } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java new file mode 100644 index 0000000..18ccc66 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java @@ -0,0 +1,73 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import com.google.common.collect.Multimap; +import com.google.gerrit.entities.Project; +import com.google.gerrit.server.git.WorkQueue; +import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; +import java.util.List; +import java.util.Optional; +import org.eclipse.jgit.transport.URIish; + +/** Git destinations currently active for replication. */ +public interface ReplicationDestinations { + + /** + * Return all the URIs associated to a project and a filter criteria. + * + * @param remoteName name of the replication end or empty if selecting all ends. + * @param projectName name of the project + * @param filterType type of filter criteria for selecting projects + * @return the multi-map of destinations and the associated replication URIs + */ + Multimap<Destination, URIish> getURIs( + Optional<String> remoteName, Project.NameKey projectName, FilterType filterType); + + /** + * List of currently active replication destinations. + * + * @param filterType type project filtering + * @return the list of active destinations + */ + List<Destination> getAll(FilterType filterType); + + /** + * Return the active replication destinations for a uri/project/ref triplet. + * + * @param uriish uri of the destinations + * @param project name of the project + * @param ref ref name + * @return the list of active destinations + */ + List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref); + + /** @return true if there are no destinations, false otherwise. */ + boolean isEmpty(); + + /** + * Start replicating to all destinations. + * + * @param workQueue execution queue for scheduling the replication events. + */ + void startup(WorkQueue workQueue); + + /** + * Stop the replication to all destinations. + * + * @return number of events cancelled during shutdown. + */ + int shutdown(); +} diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java index 3094929..e99f6b1 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java @@ -13,52 +13,22 @@ // limitations under the License. package com.googlesource.gerrit.plugins.replication; -import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit; -import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp; -import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH; import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; -import static java.util.stream.Collectors.toList; import com.google.common.base.Strings; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import com.google.common.collect.SetMultimap; -import com.google.common.flogger.FluentLogger; import com.google.gerrit.extensions.annotations.PluginData; -import com.google.gerrit.reviewdb.client.Project; -import com.google.gerrit.server.config.ConfigUtil; import com.google.gerrit.server.config.SitePaths; -import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; -import com.google.inject.Singleton; import java.io.IOException; -import java.net.URISyntaxException; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.function.Predicate; import org.eclipse.jgit.errors.ConfigInvalidException; +import org.eclipse.jgit.lib.Config; import org.eclipse.jgit.storage.file.FileBasedConfig; -import org.eclipse.jgit.transport.RefSpec; -import org.eclipse.jgit.transport.RemoteConfig; -import org.eclipse.jgit.transport.URIish; import org.eclipse.jgit.util.FS; -@Singleton public class ReplicationFileBasedConfig implements ReplicationConfig { - private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes - private List<Destination> destinations; private final SitePaths site; private Path cfgPath; private boolean replicateAllOnPluginStart; @@ -70,188 +40,24 @@ public class ReplicationFileBasedConfig implements ReplicationConfig { private final Path pluginDataDir; @Inject - public ReplicationFileBasedConfig( - SitePaths site, Destination.Factory destinationFactory, @PluginData Path pluginDataDir) - throws ConfigInvalidException, IOException { + public ReplicationFileBasedConfig(SitePaths site, @PluginData Path pluginDataDir) { this.site = site; this.cfgPath = site.etc_dir.resolve("replication.config"); this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED); - this.destinations = allDestinations(destinationFactory); - this.pluginDataDir = pluginDataDir; - } - - @Override - public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) { - List<Destination> dests = new ArrayList<>(); - for (Destination dest : getDestinations(FilterType.ALL)) { - if (dest.wouldPush(uri, project, ref)) { - dests.add(dest); - } - } - return dests; - } - - /* - * (non-Javadoc) - * @see - * com.googlesource.gerrit.plugins.replication.ReplicationConfig#getDestinations - * (com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType) - */ - @Override - public List<Destination> getDestinations(FilterType filterType) { - Predicate<? super Destination> filter; - switch (filterType) { - case PROJECT_CREATION: - filter = dest -> dest.isCreateMissingRepos(); - break; - case PROJECT_DELETION: - filter = dest -> dest.isReplicateProjectDeletions(); - break; - case ALL: - default: - filter = dest -> true; - break; - } - return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList()); - } - - private List<Destination> allDestinations(Destination.Factory destinationFactory) - throws ConfigInvalidException, IOException { - if (!config.getFile().exists()) { - logger.atWarning().log("Config file %s does not exist; not replicating", config.getFile()); - return Collections.emptyList(); - } - if (config.getFile().length() == 0) { - logger.atInfo().log("Config file %s is empty; not replicating", config.getFile()); - return Collections.emptyList(); - } - try { config.load(); } catch (ConfigInvalidException e) { - throw new ConfigInvalidException( - String.format("Config file %s is invalid: %s", config.getFile(), e.getMessage()), e); + repLog.error("Config file {} is invalid: {}", cfgPath, e.getMessage(), e); } catch (IOException e) { - throw new IOException( - String.format("Cannot read %s: %s", config.getFile(), e.getMessage()), e); - } - - replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false); - - defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false); - - maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0); - - sshCommandTimeout = - (int) ConfigUtil.getTimeUnit(config, "gerrit", null, "sshCommandTimeout", 0, SECONDS); - sshConnectionTimeout = - (int) - ConfigUtil.getTimeUnit( - config, - "gerrit", - null, - "sshConnectionTimeout", - DEFAULT_SSH_CONNECTION_TIMEOUT_MS, - MILLISECONDS); - - ImmutableList.Builder<Destination> dest = ImmutableList.builder(); - for (RemoteConfig c : allRemotes(config)) { - if (c.getURIs().isEmpty()) { - continue; - } - - // If destination for push is not set assume equal to source. - for (RefSpec ref : c.getPushRefSpecs()) { - if (ref.getDestination() == null) { - ref.setDestination(ref.getSource()); - } - } - - if (c.getPushRefSpecs().isEmpty()) { - c.addPushRefSpec( - new RefSpec() - .setSourceDestination("refs/*", "refs/*") - .setForceUpdate(defaultForceUpdate)); - } - - Destination destination = destinationFactory.create(new DestinationConfiguration(c, config)); - - if (!destination.isSingleProjectMatch()) { - for (URIish u : c.getURIs()) { - if (u.getPath() == null || !u.getPath().contains("${name}")) { - throw new ConfigInvalidException( - String.format( - "remote.%s.url \"%s\" lacks ${name} placeholder in %s", - c.getName(), u, config.getFile())); - } - } - } - - dest.add(destination); - } - return dest.build(); - } - - @Override - public Multimap<Destination, URIish> getURIs( - Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) { - if (getDestinations(filterType).isEmpty()) { - return ImmutableMultimap.of(); + repLog.error("Cannot read {}: {}", cfgPath, e.getMessage(), e); } - - SetMultimap<Destination, URIish> uris = HashMultimap.create(); - for (Destination config : getDestinations(filterType)) { - if (filterType != FilterType.PROJECT_DELETION && !config.wouldPushProject(projectName)) { - continue; - } - - if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) { - continue; - } - - boolean adminURLUsed = false; - - for (String url : config.getAdminUrls()) { - if (Strings.isNullOrEmpty(url)) { - continue; - } - - URIish uri; - try { - uri = new URIish(url); - } catch (URISyntaxException e) { - repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage()); - continue; - } - - if (!isGerrit(uri) && !isGerritHttp(uri)) { - String path = - replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch()); - if (path == null) { - repLog.warn("adminURL {} does not contain ${name}", uri); - continue; - } - - uri = uri.setPath(path); - if (!isSSH(uri)) { - repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri); - continue; - } - } - uris.put(config, uri); - adminURLUsed = true; - } - - if (!adminURLUsed) { - for (URIish uri : config.getURIs(projectName, "*")) { - uris.put(config, uri); - } - } - } - return uris; + this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false); + this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false); + this.maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0); + this.pluginDataDir = pluginDataDir; } - static String replaceName(String in, String name, boolean keyIsOptional) { + public static String replaceName(String in, String name, boolean keyIsOptional) { String key = "${name}"; int n = in.indexOf(key); if (0 <= n) { @@ -284,28 +90,6 @@ public class ReplicationFileBasedConfig implements ReplicationConfig { return maxRefsToLog; } - private static List<RemoteConfig> allRemotes(FileBasedConfig cfg) throws ConfigInvalidException { - Set<String> names = cfg.getSubsections("remote"); - List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size()); - for (String name : names) { - try { - result.add(new RemoteConfig(cfg, name)); - } catch (URISyntaxException e) { - throw new ConfigInvalidException( - String.format("remote %s has invalid URL in %s", name, cfg.getFile()), e); - } - } - return result; - } - - /* (non-Javadoc) - * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isEmpty() - */ - @Override - public boolean isEmpty() { - return destinations.isEmpty(); - } - @Override public Path getEventsDirectory() { String eventsDirectory = config.getString("replication", null, "eventsDirectory"); @@ -320,66 +104,13 @@ public class ReplicationFileBasedConfig implements ReplicationConfig { } @Override - public int shutdown() { - int discarded = 0; - for (Destination cfg : destinations) { - try { - drainReplicationEvents(cfg); - } catch (EventQueueNotEmptyException e) { - logger.atWarning().log("Event queue not empty: %s", e.getMessage()); - } finally { - discarded += cfg.shutdown(); - } - } - return discarded; - } - - void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException { - int drainQueueAttempts = destination.getDrainQueueAttempts(); - if (drainQueueAttempts == 0) { - return; - } - int pending = destination.getQueueInfo().pending.size(); - int inFlight = destination.getQueueInfo().inFlight.size(); - - while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) { - try { - logger.atInfo().log( - "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d", - inFlight, pending); - Thread.sleep(destination.getReplicationDelayMilliseconds()); - } catch (InterruptedException ie) { - logger.atWarning().withCause(ie).log( - "Wait for replication events to drain has been interrupted"); - } - pending = destination.getQueueInfo().pending.size(); - inFlight = destination.getQueueInfo().inFlight.size(); - drainQueueAttempts--; - } - - if (pending > 0 || inFlight > 0) { - throw new EventQueueNotEmptyException( - String.format("Pending: %d - InFlight: %d", pending, inFlight)); - } - } - - public static class EventQueueNotEmptyException extends Exception { - private static final long serialVersionUID = 1L; - - public EventQueueNotEmptyException(String errorMessage) { - super(errorMessage); - } - } - - FileBasedConfig getConfig() { + public Config getConfig() { return config; } @Override - public void startup(WorkQueue workQueue) { - for (Destination cfg : destinations) { - cfg.start(workQueue); - } + public String getVersion() { + return Long.toString(config.getFile().lastModified()); } @Override diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java index 05bbb03..5b4204e 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java @@ -15,7 +15,7 @@ package com.googlesource.gerrit.plugins.replication; import com.google.gerrit.common.data.AccessSection; -import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.entities.Project; import java.util.Collections; import java.util.List; diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java index afc7926..1bc17ec 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java @@ -14,11 +14,14 @@ package com.googlesource.gerrit.plugins.replication; +import com.google.gerrit.extensions.annotations.PluginName; import com.google.gerrit.metrics.Description; import com.google.gerrit.metrics.Field; import com.google.gerrit.metrics.Histogram1; +import com.google.gerrit.metrics.Histogram3; import com.google.gerrit.metrics.MetricMaker; import com.google.gerrit.metrics.Timer1; +import com.google.gerrit.server.logging.PluginMetadata; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -27,10 +30,37 @@ public class ReplicationMetrics { private final Timer1<String> executionTime; private final Histogram1<String> executionDelay; private final Histogram1<String> executionRetries; + private final Histogram3<Integer, String, String> slowProjectReplicationLatency; @Inject - ReplicationMetrics(MetricMaker metricMaker) { - Field<String> DEST_FIELD = Field.ofString("destination"); + ReplicationMetrics(@PluginName String pluginName, MetricMaker metricMaker) { + Field<String> DEST_FIELD = + Field.ofString( + "destination", + (metadataBuilder, fieldValue) -> + metadataBuilder + .pluginName(pluginName) + .addPluginMetadata(PluginMetadata.create("destination", fieldValue))) + .build(); + + Field<String> PROJECT_FIELD = + Field.ofString( + "project", + (metadataBuilder, fieldValue) -> + metadataBuilder + .pluginName(pluginName) + .addPluginMetadata(PluginMetadata.create("project", fieldValue))) + .build(); + + Field<Integer> SLOW_THRESHOLD_FIELD = + Field.ofInteger( + "slow_threshold", + (metadataBuilder, fieldValue) -> + metadataBuilder + .pluginName(pluginName) + .addPluginMetadata( + PluginMetadata.create("slow_threshold", fieldValue.toString()))) + .build(); executionTime = metricMaker.newTimer( @@ -55,6 +85,17 @@ public class ReplicationMetrics { .setCumulative() .setUnit("retries"), DEST_FIELD); + + slowProjectReplicationLatency = + metricMaker.newHistogram( + "latency_slower_than_threshold" + "", + new Description( + "latency for project to destination, where latency was slower than threshold") + .setCumulative() + .setUnit(Description.Units.MILLISECONDS), + SLOW_THRESHOLD_FIELD, + PROJECT_FIELD, + DEST_FIELD); } /** @@ -63,7 +104,7 @@ public class ReplicationMetrics { * @param name the destination name. * @return the timer context. */ - Timer1.Context start(String name) { + Timer1.Context<String> start(String name) { return executionTime.start(name); } @@ -78,4 +119,17 @@ public class ReplicationMetrics { executionDelay.record(name, delay); executionRetries.record(name, retries); } + + /** + * Record replication latency for project to destination, where latency was slower than threshold + * + * @param destinationName the destination name. + * @param projectName the project name. + * @param slowThreshold replication initialDelay in milliseconds. + * @param latency number of retries. + */ + void recordSlowProjectReplication( + String destinationName, String projectName, Integer slowThreshold, long latency) { + slowProjectReplicationLatency.record(slowThreshold, destinationName, projectName, latency); + } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java index 835d068..c2b96a1 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java @@ -16,6 +16,7 @@ package com.googlesource.gerrit.plugins.replication; import static com.googlesource.gerrit.plugins.replication.StartReplicationCapability.START_REPLICATION; +import com.google.common.eventbus.EventBus; import com.google.gerrit.extensions.annotations.Exports; import com.google.gerrit.extensions.config.CapabilityDefinition; import com.google.gerrit.extensions.events.GitReferenceUpdatedListener; @@ -23,18 +24,38 @@ import com.google.gerrit.extensions.events.HeadUpdatedListener; import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.registration.DynamicSet; +import com.google.gerrit.server.config.SitePaths; import com.google.gerrit.server.events.EventTypes; import com.google.inject.AbstractModule; +import com.google.inject.Inject; +import com.google.inject.ProvisionException; import com.google.inject.Scopes; import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.internal.UniqueAnnotations; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import org.eclipse.jgit.errors.ConfigInvalidException; +import org.eclipse.jgit.storage.file.FileBasedConfig; import org.eclipse.jgit.transport.SshSessionFactory; +import org.eclipse.jgit.util.FS; class ReplicationModule extends AbstractModule { + private final SitePaths site; + private final Path cfgPath; + + @Inject + public ReplicationModule(SitePaths site) { + this.site = site; + cfgPath = site.etc_dir.resolve("replication.config"); + } + @Override protected void configure() { install(new FactoryModuleBuilder().build(Destination.Factory.class)); bind(ReplicationQueue.class).in(Scopes.SINGLETON); + bind(ObservableQueue.class).to(ReplicationQueue.class); bind(LifecycleListener.class) .annotatedWith(UniqueAnnotations.create()) .to(ReplicationQueue.class); @@ -57,7 +78,22 @@ class ReplicationModule extends AbstractModule { install(new FactoryModuleBuilder().build(PushAll.Factory.class)); - bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class); + bind(EventBus.class).in(Scopes.SINGLETON); + bind(ReplicationDestinations.class).to(DestinationsCollection.class); + bind(ConfigParser.class).to(DestinationConfigParser.class).in(Scopes.SINGLETON); + + if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) { + bind(ReplicationConfig.class) + .annotatedWith(MainReplicationConfig.class) + .to(getReplicationConfigClass()); + bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class).in(Scopes.SINGLETON); + bind(LifecycleListener.class) + .annotatedWith(UniqueAnnotations.create()) + .to(AutoReloadConfigDecorator.class); + } else { + bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON); + } + DynamicSet.setOf(binder(), ReplicationStateListener.class); DynamicSet.bind(binder(), ReplicationStateListener.class).to(ReplicationStateLogger.class); @@ -68,4 +104,22 @@ class ReplicationModule extends AbstractModule { bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON); } + + private FileBasedConfig getReplicationConfig() { + File replicationConfigFile = cfgPath.toFile(); + FileBasedConfig config = new FileBasedConfig(replicationConfigFile, FS.DETECTED); + try { + config.load(); + } catch (IOException | ConfigInvalidException e) { + throw new ProvisionException("Unable to load " + replicationConfigFile.getAbsolutePath(), e); + } + return config; + } + + private Class<? extends ReplicationConfig> getReplicationConfigClass() { + if (Files.exists(site.etc_dir.resolve("replication"))) { + return FanoutReplicationConfig.class; + } + return ReplicationFileBasedConfig.class; + } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index cf344f8..6a89e80 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java @@ -18,15 +18,16 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import com.google.gerrit.common.UsedAt; +import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.events.GitReferenceUpdatedListener; import com.google.gerrit.extensions.events.HeadUpdatedListener; import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.registration.DynamicItem; -import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; +import com.google.inject.Provider; import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing; import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; @@ -41,7 +42,8 @@ import org.slf4j.LoggerFactory; /** Manages automatic replication to remote repositories. */ public class ReplicationQueue - implements LifecycleListener, + implements ObservableQueue, + LifecycleListener, GitReferenceUpdatedListener, ProjectDeletedListener, HeadUpdatedListener { @@ -52,7 +54,7 @@ public class ReplicationQueue private final WorkQueue workQueue; private final DynamicItem<EventDispatcher> dispatcher; - private final ReplicationConfig config; + private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency private final ReplicationTasksStorage replicationTasksStorage; private volatile boolean running; private volatile boolean replaying; @@ -61,13 +63,13 @@ public class ReplicationQueue @Inject ReplicationQueue( WorkQueue wq, - ReplicationConfig rc, + Provider<ReplicationDestinations> rd, DynamicItem<EventDispatcher> dis, ReplicationStateListeners sl, ReplicationTasksStorage rts) { workQueue = wq; dispatcher = dis; - config = rc; + destinations = rd; stateLog = sl; replicationTasksStorage = rts; beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue(); @@ -76,8 +78,9 @@ public class ReplicationQueue @Override public void start() { if (!running) { - config.startup(workQueue); + destinations.get().startup(workQueue); running = true; + replicationTasksStorage.resetAll(); firePendingEvents(); fireBeforeStartupEvents(); } @@ -86,16 +89,18 @@ public class ReplicationQueue @Override public void stop() { running = false; - int discarded = config.shutdown(); + int discarded = destinations.get().shutdown(); if (discarded > 0) { repLog.warn("Canceled {} replication events during shutdown", discarded); } } + @Override public boolean isRunning() { return running; } + @Override public boolean isReplaying() { return replaying; } @@ -107,48 +112,42 @@ public class ReplicationQueue @VisibleForTesting public void scheduleFullSync( Project.NameKey project, String urlMatch, ReplicationState state, boolean now) { - if (!running) { - stateLog.warn("Replication plugin did not finish startup before event", state); - return; - } - - for (Destination cfg : config.getDestinations(FilterType.ALL)) { - if (cfg.wouldPushProject(project)) { - for (URIish uri : cfg.getURIs(project, urlMatch)) { - cfg.schedule(project, PushOne.ALL_REFS, uri, state, now); - replicationTasksStorage.persist( - new ReplicateRefUpdate( - project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName())); - } - } - } + fire(project, urlMatch, PushOne.ALL_REFS, state, now); } @Override public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) { - onGitReferenceUpdated(event.getProjectName(), event.getRefName()); + fire(event.getProjectName(), event.getRefName()); } - private void onGitReferenceUpdated(String projectName, String refName) { + private void fire(String projectName, String refName) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); + fire(Project.nameKey(projectName), null, refName, state, false); + state.markAllPushTasksScheduled(); + } + + private void fire( + Project.NameKey project, + String urlMatch, + String refName, + ReplicationState state, + boolean now) { if (!running) { stateLog.warn( "Replication plugin did not finish startup before event, event replication is postponed", state); - beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(projectName, refName)); + beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName)); return; } - Project.NameKey project = new Project.NameKey(projectName); - for (Destination cfg : config.getDestinations(FilterType.ALL)) { - pushReference(cfg, project, refName, state); + for (Destination cfg : destinations.get().getAll(FilterType.ALL)) { + pushReference(cfg, project, urlMatch, refName, state, now); } - state.markAllPushTasksScheduled(); } private void fire(URIish uri, Project.NameKey project, String refName) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); - for (Destination dest : config.getDestinations(uri, project, refName)) { + for (Destination dest : destinations.get().getDestinations(uri, project, refName)) { dest.schedule(project, refName, uri, state); } state.markAllPushTasksScheduled(); @@ -156,26 +155,29 @@ public class ReplicationQueue @UsedAt(UsedAt.Project.COLLABNET) public void pushReference(Destination cfg, Project.NameKey project, String refName) { - pushReference(cfg, project, refName, null); + pushReference(cfg, project, null, refName, null, true); } private void pushReference( - Destination cfg, Project.NameKey project, String refName, ReplicationState state) { + Destination cfg, + Project.NameKey project, + String urlMatch, + String refName, + ReplicationState state, + boolean now) { boolean withoutState = state == null; if (withoutState) { state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); } - if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) { - for (URIish uri : cfg.getURIs(project, null)) { - replicationTasksStorage.persist( + for (URIish uri : cfg.getURIs(project, urlMatch)) { + replicationTasksStorage.create( new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName())); - cfg.schedule(project, refName, uri, state); + cfg.schedule(project, refName, uri, state, now); } } else { repLog.debug("Skipping ref {} on project {}", refName, project.get()); } - if (withoutState) { state.markAllPushTasksScheduled(); } @@ -185,13 +187,13 @@ public class ReplicationQueue replaying = true; try { replaying = true; - for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) { + for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) { if (t == null) { repLog.warn("Encountered null replication event in ReplicationTasksStorage"); continue; } try { - fire(new URIish(t.uri), new Project.NameKey(t.project), t.ref); + fire(new URIish(t.uri), Project.nameKey(t.project), t.ref); } catch (URISyntaxException e) { repLog.error("Encountered malformed URI for persisted event %s", t); } @@ -203,15 +205,15 @@ public class ReplicationQueue @Override public void onProjectDeleted(ProjectDeletedListener.Event event) { - Project.NameKey p = new Project.NameKey(event.getProjectName()); - config.getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream() + Project.NameKey p = Project.nameKey(event.getProjectName()); + destinations.get().getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream() .forEach(e -> e.getKey().scheduleDeleteProject(e.getValue(), p)); } @Override public void onHeadUpdated(HeadUpdatedListener.Event event) { - Project.NameKey p = new Project.NameKey(event.getProjectName()); - config.getURIs(Optional.empty(), p, FilterType.ALL).entries().stream() + Project.NameKey p = Project.nameKey(event.getProjectName()); + destinations.get().getURIs(Optional.empty(), p, FilterType.ALL).entries().stream() .forEach(e -> e.getKey().scheduleUpdateHead(e.getValue(), p, event.getNewHeadName())); } @@ -221,7 +223,7 @@ public class ReplicationQueue String eventKey = String.format("%s:%s", event.projectName(), event.refName()); if (!eventsReplayed.contains(eventKey)) { repLog.info("Firing pending task {}", event); - onGitReferenceUpdated(event.projectName(), event.refName()); + fire(event.projectName(), event.refName()); eventsReplayed.add(eventKey); } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java index aa965fe..28f6b6b 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java @@ -14,7 +14,7 @@ package com.googlesource.gerrit.plugins.replication; -import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.entities.Project; import com.google.gerrit.server.events.RefEvent; public class ReplicationScheduledEvent extends RefEvent { @@ -38,6 +38,6 @@ public class ReplicationScheduledEvent extends RefEvent { @Override public Project.NameKey getProjectNameKey() { - return new Project.NameKey(project); + return Project.nameKey(project); } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index b1fbb10..98bc40f 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java @@ -24,15 +24,38 @@ import com.google.inject.Inject; import com.google.inject.ProvisionException; import com.google.inject.Singleton; import java.io.IOException; +import java.nio.file.DirectoryIteratorException; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.List; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.transport.URIish; +/** + * A persistent store for replication tasks. + * + * <p>The data of this store lives under <replication_data>/ref-updates where replication_data is + * determined by the replication.eventsDirectory config option and defaults to + * <site_dir>/data/replication. Atomic renames must be supported from anywhere within the store to + * anywhere within the store. This generally means that all the contents of the store needs to live + * on the same filesystem. + * + * <p>Individual tasks are stored in files under the following directories using the sha1 of the + * task: + * + * <p><code> + * .../building/<tmp_name> new replication tasks under construction + * .../running/<sha1> running replication tasks + * .../waiting/<sha1> outstanding replication tasks + * </code> + * + * <p>Tasks are moved atomically via a rename between those directories to indicate the current + * state of each task. + */ @Singleton public class ReplicationTasksStorage { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @@ -45,6 +68,10 @@ public class ReplicationTasksStorage { public final String uri; public final String remote; + public ReplicateRefUpdate(PushOne push, String ref) { + this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName()); + } + public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) { this.project = project; this.ref = ref; @@ -61,29 +88,20 @@ public class ReplicationTasksStorage { private static final Gson GSON = new Gson(); private final Path refUpdates; + private final Path buildingUpdates; + private final Path runningUpdates; + private final Path waitingUpdates; @Inject ReplicationTasksStorage(ReplicationConfig config) { refUpdates = config.getEventsDirectory().resolve("ref-updates"); + buildingUpdates = refUpdates.resolve("building"); + runningUpdates = refUpdates.resolve("running"); + waitingUpdates = refUpdates.resolve("waiting"); } - public String persist(ReplicateRefUpdate r) { - String json = GSON.toJson(r) + "\n"; - String key = r.project + "\n" + r.ref + "\n" + r.uri + "\n" + r.remote; - String eventKey = sha1(key).name(); - Path file = refUpdates().resolve(eventKey); - - if (Files.exists(file)) { - return eventKey; - } - - try { - logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri); - Files.write(file, json.getBytes(UTF_8)); - } catch (IOException e) { - logger.atWarning().withCause(e).log("Couldn't persist event %s", json); - } - return eventKey; + public synchronized String create(ReplicateRefUpdate r) { + return new Task(r).create(); } @VisibleForTesting @@ -91,27 +109,57 @@ public class ReplicationTasksStorage { this.disableDeleteForTesting = deleteDisabled; } + @VisibleForTesting public void delete(ReplicateRefUpdate r) { - String key = r.project + "\n" + r.ref + "\n" + r.uri + "\n" + r.remote; - String taskKey = sha1(key).name(); - Path file = refUpdates().resolve(taskKey); + new Task(r).delete(); + } - if (disableDeleteForTesting) { - logger.atFine().log("DELETE %s (%s:%s => %s) DISABLED", file, r.project, r.ref, r.uri); - return; + public synchronized void start(PushOne push) { + for (String ref : push.getRefs()) { + new Task(new ReplicateRefUpdate(push, ref)).start(); } + } - try { - logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri); - Files.delete(file); - } catch (IOException e) { - logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey); + public synchronized void reset(PushOne push) { + for (String ref : push.getRefs()) { + new Task(new ReplicateRefUpdate(push, ref)).reset(); } } - public List<ReplicateRefUpdate> list() { + public synchronized void resetAll() { + for (ReplicateRefUpdate r : listRunning()) { + new Task(r).reset(); + } + } + + public synchronized void finish(PushOne push) { + for (String ref : push.getRefs()) { + new Task(new ReplicateRefUpdate(push, ref)).finish(); + } + } + + public synchronized List<ReplicateRefUpdate> listWaiting() { + return list(createDir(waitingUpdates)); + } + + @VisibleForTesting + public synchronized List<ReplicateRefUpdate> listRunning() { + return list(createDir(runningUpdates)); + } + + @VisibleForTesting + public synchronized List<ReplicateRefUpdate> listBuilding() { + return list(createDir(buildingUpdates)); + } + + @VisibleForTesting + public synchronized List<ReplicateRefUpdate> list() { + return list(createDir(refUpdates)); + } + + private List<ReplicateRefUpdate> list(Path tasks) { List<ReplicateRefUpdate> results = new ArrayList<>(); - try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) { + try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) { for (Path path : events) { if (Files.isRegularFile(path)) { try { @@ -124,10 +172,17 @@ public class ReplicationTasksStorage { } catch (IOException e) { logger.atSevere().withCause(e).log("Error when firing pending event %s", path); } + } else if (Files.isDirectory(path)) { + try { + results.addAll(list(path)); + } catch (DirectoryIteratorException d) { + // iterating over the sub-directories is expected to have dirs disappear + Nfs.throwIfNotStaleFileHandle(d.getCause()); + } } } } catch (IOException e) { - logger.atSevere().withCause(e).log("Error when firing pending events"); + logger.atSevere().withCause(e).log("Error while listing tasks"); } return results; } @@ -137,11 +192,89 @@ public class ReplicationTasksStorage { return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes()); } - private Path refUpdates() { + private static Path createDir(Path dir) { try { - return Files.createDirectories(refUpdates); + return Files.createDirectories(dir); } catch (IOException e) { - throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e); + throw new ProvisionException(String.format("Couldn't create %s", dir), e); + } + } + + private class Task { + public final ReplicateRefUpdate update; + public final String json; + public final String taskKey; + public final Path running; + public final Path waiting; + + public Task(ReplicateRefUpdate update) { + this.update = update; + json = GSON.toJson(update) + "\n"; + String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote; + taskKey = sha1(key).name(); + running = createDir(runningUpdates).resolve(taskKey); + waiting = createDir(waitingUpdates).resolve(taskKey); + } + + public String create() { + if (Files.exists(waiting)) { + return taskKey; + } + + try { + Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null); + logger.atFine().log("CREATE %s %s", tmp, updateLog()); + Files.write(tmp, json.getBytes(UTF_8)); + logger.atFine().log("RENAME %s %s %s", tmp, waiting, updateLog()); + rename(tmp, waiting); + } catch (IOException e) { + logger.atWarning().withCause(e).log("Couldn't create task %s", json); + } + return taskKey; + } + + public void start() { + rename(waiting, running); + } + + public void reset() { + rename(running, waiting); + } + + public void finish() { + if (disableDeleteForTesting) { + logger.atFine().log("DELETE %s %s DISABLED", running, updateLog()); + return; + } + + try { + logger.atFine().log("DELETE %s %s", running, updateLog()); + Files.delete(running); + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey); + } + } + + public void delete() { + try { + Files.deleteIfExists(waiting); + Files.deleteIfExists(running); + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey); + } + } + + private void rename(Path from, Path to) { + try { + logger.atFine().log("RENAME %s to %s %s", from, to, updateLog()); + Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey); + } + } + + private String updateLog() { + return String.format("(%s:%s => %s)", update.project, update.ref, update.uri); } } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java index 70452b4..ffa6be1 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java @@ -16,8 +16,8 @@ package com.googlesource.gerrit.plugins.replication; import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; +import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.registration.DynamicItem; -import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.server.ioutil.HexFormat; import com.google.gerrit.server.util.IdGenerator; import com.google.inject.Inject; diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index 194238c..0aad73b 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md @@ -142,6 +142,11 @@ replication.eventsDirectory persisted events will not be deleted. When the plugin is started again, it will trigger all replications found under this directory. + For replication to work, is is important that atomic renames be possible + from within any subdirectory of the eventsDirectory to within any other + subdirectory of the eventsDirectory. This generally means that the entire + contents of the eventsDirectory should live on the same filesystem. + When not set, defaults to the plugin's data directory. remote.NAME.url @@ -442,6 +447,88 @@ remote.NAME.remoteNameStyle By default, replicates without matching, i.e. replicates everything to all remotes. +remote.NAME.slowLatencyThreshold +: the time duration after which the replication of a project to this + destination will be considered "slow". A slow project replication + will cause additional metrics to be exposed for further investigation. + See [metrics.md](metrics.md) for further details. + + default: 15 minutes + +Directory `replication` +-------------------- +The optional directory `$site_path/etc/replication` contains Git-style +config files that controls the replication settings for the replication +plugin. When present all `remote` sections from `replication.config` file are +ignored. + +Files are composed of one `remote` section. Multiple `remote` sections or any +other section makes the file invalid and skipped by the replication plugin. +File name defines remote section name. Each section provides common configuration +settings for one or more destination URLs. For more details how to setup `remote` +sections please refer to the `replication.config` section. + +### Configuration example: + +Static configuration in `$site_path/etc/replication.config`: + +``` +[gerrit] + autoReload = true + replicateOnStartup = false +[replication] + lockErrorMaxRetries = 5 + maxRetries = 5 +``` + +Remote sections in `$site_path/etc/replication` directory: + +* File `$site_path/etc/replication/host-one.config` + + ``` + [remote] + url = gerrit2@host-one.example.com:/some/path/${name}.git + ``` + + +* File `$site_path/etc/replication/pubmirror.config` + + ``` + [remote] + url = mirror1.us.some.org:/pub/git/${name}.git + url = mirror2.us.some.org:/pub/git/${name}.git + url = mirror3.us.some.org:/pub/git/${name}.git + push = +refs/heads/*:refs/heads/* + push = +refs/tags/*:refs/tags/* + threads = 3 + authGroup = Public Mirror Group + authGroup = Second Public Mirror Group + ``` + +Replication plugin resolves config files to the following configuration: + +``` +[gerrit] + autoReload = true + replicateOnStartup = false +[replication] + lockErrorMaxRetries = 5 + maxRetries = 5 + +[remote "host-one"] + url = gerrit2@host-one.example.com:/some/path/${name}.git + +[remote "pubmirror"] + url = mirror1.us.some.org:/pub/git/${name}.git + url = mirror2.us.some.org:/pub/git/${name}.git + url = mirror3.us.some.org:/pub/git/${name}.git + push = +refs/heads/*:refs/heads/* + push = +refs/tags/*:refs/tags/* + threads = 3 + authGroup = Public Mirror Group + authGroup = Second Public Mirror Group +``` + File `secure.config` -------------------- diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md new file mode 100644 index 0000000..ef8b150 --- /dev/null +++ b/src/main/resources/Documentation/metrics.md @@ -0,0 +1,61 @@ +# Metrics + +Some metrics are emitted when replication occurs to a remote destination. +The granularity of the metrics recorded is at destination level, however when a particular project replication is flagged +as slow. This happens when the replication took longer than allowed threshold (see _remote.NAME.slowLatencyThreshold_ in [config.md](config.md)) + +The reason only slow metrics are published, rather than all, is to contain their number, which, on a big Gerrit installation +could potentially be considerably big. + +### Project level + +* plugins_replication_latency_slower_than_<threshold>_<destinationName>_<ProjectName> - Time spent pushing <ProjectName> to remote <destinationName> (in ms) + +### Destination level + +* plugins_replication_replication_delay_<destinationName> - Time spent waiting before pushing to remote <destinationName> (in ms) +* plugins_replication_replication_retries_<destinationName> - Number of retries when pushing to remote <destinationName> +* plugins_replication_replication_latency_<destinationName> - Time spent pushing to remote <destinationName> (in ms) + +### Example +``` +# HELP plugins_replication_replication_delay_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_delay/destination, type=com.codahale.metrics.Histogram) +# TYPE plugins_replication_replication_delay_destination summary +plugins_replication_replication_delay_destinationName{quantile="0.5",} 65726.0 +plugins_replication_replication_delay_destinationName{quantile="0.75",} 65726.0 +plugins_replication_replication_delay_destinationName{quantile="0.95",} 65726.0 +plugins_replication_replication_delay_destinationName{quantile="0.98",} 65726.0 +plugins_replication_replication_delay_destinationName{quantile="0.99",} 65726.0 +plugins_replication_replication_delay_destinationName{quantile="0.999",} 65726.0 +plugins_replication_replication_delay_destinationName_count 3.0 + +# HELP plugins_replication_replication_retries_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_retries/destination, type=com.codahale.metrics.Histogram) +# TYPE plugins_replication_replication_retries_destination summary +plugins_replication_replication_retries_destinationName{quantile="0.5",} 1.0 +plugins_replication_replication_retries_destinationName{quantile="0.75",} 1.0 +plugins_replication_replication_retries_destinationName{quantile="0.95",} 1.0 +plugins_replication_replication_retries_destinationName{quantile="0.98",} 1.0 +plugins_replication_replication_retries_destinationName{quantile="0.99",} 1.0 +plugins_replication_replication_retries_destinationName{quantile="0.999",} 1.0 +plugins_replication_replication_retries_destinationName_count 3.0 + +# HELP plugins_replication_replication_latency_destinationName Generated from Dropwizard metric import (metric=plugins/replication/replication_latency/destinationName, type=com.codahale.metrics.Timer) +# TYPE plugins_replication_replication_latency_destinationName summary +plugins_replication_replication_latency_destinationName{quantile="0.5",} 0.21199641400000002 +plugins_replication_replication_latency_destinationName{quantile="0.75",} 0.321083881 +plugins_replication_replication_latency_destinationName{quantile="0.95",} 0.321083881 +plugins_replication_replication_latency_destinationName{quantile="0.98",} 0.321083881 +plugins_replication_replication_latency_destinationName{quantile="0.99",} 0.321083881 +plugins_replication_replication_latency_destinationName{quantile="0.999",} 0.321083881 +plugins_replication_replication_latency_destinationName_count 2.0 + +# HELP plugins_replication_latency_slower_than_60_destinationName_projectName Generated from Dropwizard metric import (metric=plugins/replication/latency_slower_than/60/destinationName/projectName, type=com.codahale.metrics.Histogram) +# TYPE plugins_replication_latency_slower_than_60_destinationName_projectName summary +plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.5",} 278.0 +plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.75",} 278.0 +plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.95",} 278.0 +plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.98",} 278.0 +plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.99",} 278.0 +plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.999",} 278.0 +plugins_replication_latency_slower_than_60_destinationName_projectName 1.0 +```
\ No newline at end of file diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java index 77dc1cc..2b6a8c4 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java @@ -16,26 +16,28 @@ package com.googlesource.gerrit.plugins.replication; import static com.google.common.truth.Truth.assertThat; import static java.nio.file.Files.createTempDirectory; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.getCurrentArguments; -import static org.easymock.EasyMock.isA; -import static org.easymock.EasyMock.replay; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import com.google.common.eventbus.EventBus; import com.google.gerrit.server.config.SitePaths; +import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Injector; import com.google.inject.Module; +import com.google.inject.util.Providers; import java.io.IOException; import java.nio.file.Path; import java.util.List; import java.util.stream.Collectors; -import org.easymock.IAnswer; +import org.eclipse.jgit.errors.ConfigInvalidException; import org.eclipse.jgit.storage.file.FileBasedConfig; import org.eclipse.jgit.util.FS; import org.junit.Before; import org.junit.Ignore; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @Ignore public abstract class AbstractConfigTest { @@ -43,6 +45,11 @@ public abstract class AbstractConfigTest { protected final SitePaths sitePaths; protected final Destination.Factory destinationFactoryMock; protected final Path pluginDataPath; + protected ReplicationQueue replicationQueueMock; + protected WorkQueue workQueueMock; + protected EventBus eventBus = new EventBus(); + protected FakeExecutorService executorService = new FakeExecutorService(); + protected ConfigParser configParser; static class FakeDestination extends Destination { public final DestinationConfiguration config; @@ -53,11 +60,9 @@ public abstract class AbstractConfigTest { } private static Injector injectorMock() { - Injector injector = createNiceMock(Injector.class); - Injector childInjectorMock = createNiceMock(Injector.class); - expect(injector.createChildInjector((Module) anyObject())).andReturn(childInjectorMock); - replay(childInjectorMock); - replay(injector); + Injector injector = mock(Injector.class); + Injector childInjectorMock = mock(Injector.class); + when(injector.createChildInjector(any(Module.class))).thenReturn(childInjectorMock); return injector; } } @@ -66,21 +71,26 @@ public abstract class AbstractConfigTest { sitePath = createTempPath("site"); sitePaths = new SitePaths(sitePath); pluginDataPath = createTempPath("data"); - destinationFactoryMock = createMock(Destination.Factory.class); + destinationFactoryMock = mock(Destination.Factory.class); + configParser = new DestinationConfigParser(); } @Before public void setup() { - expect(destinationFactoryMock.create(isA(DestinationConfiguration.class))) - .andAnswer( - new IAnswer<Destination>() { + when(destinationFactoryMock.create(any(DestinationConfiguration.class))) + .thenAnswer( + new Answer<Destination>() { @Override - public Destination answer() throws Throwable { - return new FakeDestination((DestinationConfiguration) getCurrentArguments()[0]); + public Destination answer(InvocationOnMock invocation) throws Throwable { + return new FakeDestination((DestinationConfiguration) invocation.getArguments()[0]); } - }) - .anyTimes(); - replay(destinationFactoryMock); + }); + + replicationQueueMock = mock(ReplicationQueue.class); + when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE); + + workQueueMock = mock(WorkQueue.class); + when(workQueueMock.createQueue(anyInt(), any(String.class))).thenReturn(executorService); } protected static Path createTempPath(String prefix) throws IOException { @@ -88,8 +98,12 @@ public abstract class AbstractConfigTest { } protected FileBasedConfig newReplicationConfig() { + return newReplicationConfig("replication.config"); + } + + protected FileBasedConfig newReplicationConfig(String path) { FileBasedConfig replicationConfig = - new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); + new FileBasedConfig(sitePaths.etc_dir.resolve(path).toFile(), FS.DETECTED); return replicationConfig; } @@ -113,4 +127,18 @@ public abstract class AbstractConfigTest { assertThatIsDestination(matchingDestinations.get(0), remoteName, remoteUrls); } + + protected DestinationsCollection newDestinationsCollections(ReplicationConfig replicationConfig) + throws ConfigInvalidException { + return new DestinationsCollection( + destinationFactoryMock, + Providers.of(replicationQueueMock), + replicationConfig, + configParser, + eventBus); + } + + protected ReplicationConfig newReplicationFileBasedConfig() { + return new ReplicationFileBasedConfig(sitePaths, pluginDataPath); + } } diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java index 211cafa..b1b9453 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java @@ -15,238 +15,237 @@ package com.googlesource.gerrit.plugins.replication; import static com.google.common.truth.Truth.assertThat; -import static org.easymock.EasyMock.anyInt; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import com.google.gerrit.server.git.WorkQueue; +import com.google.inject.Provider; import com.google.inject.util.Providers; import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; import java.io.IOException; -import java.util.Collection; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import org.eclipse.jgit.errors.ConfigInvalidException; import org.eclipse.jgit.storage.file.FileBasedConfig; -import org.junit.Before; import org.junit.Test; public class AutoReloadConfigDecoratorTest extends AbstractConfigTest { - private AutoReloadConfigDecorator autoReloadConfig; - private ReplicationQueue replicationQueueMock; - private WorkQueue workQueueMock; - private FakeExecutorService executorService = new FakeExecutorService(); - - public class FakeExecutorService implements ScheduledExecutorService { - public Runnable refreshCommand; - - @Override - public void shutdown() {} - - @Override - public List<Runnable> shutdownNow() { - return null; - } - - @Override - public boolean isShutdown() { - return false; - } - - @Override - public boolean isTerminated() { - return false; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return false; - } - - @Override - public <T> Future<T> submit(Callable<T> task) { - return null; - } - - @Override - public <T> Future<T> submit(Runnable task, T result) { - return null; - } - - @Override - public Future<?> submit(Runnable task) { - return null; - } - - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) - throws InterruptedException { - return null; - } - - @Override - public <T> List<Future<T>> invokeAll( - Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException { - return null; - } - - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> tasks) - throws InterruptedException, ExecutionException { - return null; - } - - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return null; - } - - @Override - public void execute(Runnable command) {} - - @Override - public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { - return null; - } - - @Override - public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { - return null; - } - - @Override - public ScheduledFuture<?> scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { - refreshCommand = command; - return null; - } - - @Override - public ScheduledFuture<?> scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - return null; - } - } + ReplicationConfig replicationConfig; public AutoReloadConfigDecoratorTest() throws IOException { super(); } - @Override - @Before - public void setup() { - super.setup(); + @Test + public void shouldAutoReloadReplicationConfig() throws Exception { + FileBasedConfig fileConfig = newReplicationConfig(); + fileConfig.setBoolean("gerrit", null, "autoReload", true); + String remoteName1 = "foo"; + String remoteUrl1 = "ssh://git@git.foo.com/${name}"; + fileConfig.setString("remote", remoteName1, "url", remoteUrl1); + fileConfig.save(); + + replicationConfig = newReplicationFileBasedConfig(); - setupMocks(); - } + newAutoReloadConfig(() -> newReplicationFileBasedConfig()).start(); - private void setupMocks() { - replicationQueueMock = createNiceMock(ReplicationQueue.class); - expect(replicationQueueMock.isRunning()).andReturn(true); - replay(replicationQueueMock); + DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig); + destinationsCollections.startup(workQueueMock); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(1); + assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1); + + TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS + + String remoteName2 = "bar"; + String remoteUrl2 = "ssh://git@git.bar.com/${name}"; + fileConfig.setString("remote", remoteName2, "url", remoteUrl2); + fileConfig.save(); + executorService.refreshCommand.run(); - workQueueMock = createNiceMock(WorkQueue.class); - expect(workQueueMock.createQueue(anyInt(), anyObject(String.class))).andReturn(executorService); - replay(workQueueMock); + destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(2); + assertThatContainsDestination(destinations, remoteName1, remoteUrl1); + assertThatContainsDestination(destinations, remoteName2, remoteUrl2); } @Test - public void shouldLoadNotEmptyInitialReplicationConfig() throws Exception { - FileBasedConfig replicationConfig = newReplicationConfig(); - String remoteName = "foo"; - String remoteUrl = "ssh://git@git.somewhere.com/${name}"; - replicationConfig.setString("remote", remoteName, "url", remoteUrl); - replicationConfig.save(); - - autoReloadConfig = - new AutoReloadConfigDecorator( - sitePaths, - destinationFactoryMock, - Providers.of(replicationQueueMock), - pluginDataPath, - "replication", - workQueueMock); - - List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL); + public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsAdded() throws Exception { + String remoteName1 = "foo"; + String remoteUrl1 = "ssh://git@git.foo.com/${name}"; + FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config"); + remoteConfig.setString("remote", null, "url", remoteUrl1); + remoteConfig.save(); + + replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath); + + newAutoReloadConfig( + () -> { + try { + return new FanoutReplicationConfig(sitePaths, pluginDataPath); + } catch (IOException | ConfigInvalidException e) { + throw new RuntimeException(e); + } + }) + .start(); + + DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig); + destinationsCollections.startup(workQueueMock); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); assertThat(destinations).hasSize(1); - assertThatIsDestination(destinations.get(0), remoteName, remoteUrl); + assertThatContainsDestination(destinations, remoteName1, remoteUrl1); + + TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS + + String remoteName2 = "foobar"; + String remoteUrl2 = "ssh://git@git.foobar.com/${name}"; + remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config"); + remoteConfig.setString("remote", null, "url", remoteUrl2); + remoteConfig.save(); + executorService.refreshCommand.run(); + + destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(2); + assertThatContainsDestination(destinations, remoteName1, remoteUrl1); + assertThatContainsDestination(destinations, remoteName2, remoteUrl2); } @Test - public void shouldAutoReloadReplicationConfig() throws Exception { - FileBasedConfig replicationConfig = newReplicationConfig(); - replicationConfig.setBoolean("gerrit", null, "autoReload", true); + public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsRemoved() throws Exception { String remoteName1 = "foo"; String remoteUrl1 = "ssh://git@git.foo.com/${name}"; - replicationConfig.setString("remote", remoteName1, "url", remoteUrl1); - replicationConfig.save(); - - autoReloadConfig = - new AutoReloadConfigDecorator( - sitePaths, - destinationFactoryMock, - Providers.of(replicationQueueMock), - pluginDataPath, - "replication", - workQueueMock); - autoReloadConfig.startup(workQueueMock); - - List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL); - assertThat(destinations).hasSize(1); - assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1); + FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config"); + remoteConfig.setString("remote", null, "url", remoteUrl1); + remoteConfig.save(); + + String remoteName2 = "foobar"; + String remoteUrl2 = "ssh://git@git.foobar.com/${name}"; + remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config"); + remoteConfig.setString("remote", null, "url", remoteUrl2); + remoteConfig.save(); + + replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath); + + newAutoReloadConfig( + () -> { + try { + return new FanoutReplicationConfig(sitePaths, pluginDataPath); + } catch (IOException | ConfigInvalidException e) { + throw new RuntimeException(e); + } + }) + .start(); + + DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig); + destinationsCollections.startup(workQueueMock); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(2); + assertThatContainsDestination(destinations, remoteName1, remoteUrl1); + assertThatContainsDestination(destinations, remoteName2, remoteUrl2); TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS + assertThat( + sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".config").toFile().delete()) + .isTrue(); + + executorService.refreshCommand.run(); + + destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(1); + assertThatContainsDestination(destinations, remoteName1, remoteUrl1); + } + + @Test + public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsModified() throws Exception { + String remoteName1 = "foo"; + String remoteUrl1 = "ssh://git@git.foo.com/${name}"; + FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config"); + remoteConfig.setString("remote", null, "url", remoteUrl1); + remoteConfig.save(); + String remoteName2 = "bar"; String remoteUrl2 = "ssh://git@git.bar.com/${name}"; - replicationConfig.setString("remote", remoteName2, "url", remoteUrl2); - replicationConfig.save(); + remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config"); + remoteConfig.setString("remote", null, "url", remoteUrl2); + remoteConfig.save(); + + replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath); + + newAutoReloadConfig( + () -> { + try { + return new FanoutReplicationConfig(sitePaths, pluginDataPath); + } catch (IOException | ConfigInvalidException e) { + throw new RuntimeException(e); + } + }) + .start(); + + DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig); + destinationsCollections.startup(workQueueMock); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(2); + assertThatContainsDestination(destinations, remoteName1, remoteUrl1); + assertThatContainsDestination(destinations, remoteName2, remoteUrl2); + + TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS + + String remoteUrl3 = "ssh://git@git.foobar.com/${name}"; + remoteConfig.setString("remote", null, "url", remoteUrl3); + remoteConfig.save(); + executorService.refreshCommand.run(); - destinations = autoReloadConfig.getDestinations(FilterType.ALL); + destinations = destinationsCollections.getAll(FilterType.ALL); assertThat(destinations).hasSize(2); assertThatContainsDestination(destinations, remoteName1, remoteUrl1); - assertThatContainsDestination(destinations, remoteName2, remoteUrl2); + assertThatContainsDestination(destinations, remoteName2, remoteUrl3); } @Test public void shouldNotAutoReloadReplicationConfigIfDisabled() throws Exception { String remoteName1 = "foo"; String remoteUrl1 = "ssh://git@git.foo.com/${name}"; - FileBasedConfig replicationConfig = newReplicationConfig(); - replicationConfig.setBoolean("gerrit", null, "autoReload", false); - replicationConfig.setString("remote", remoteName1, "url", remoteUrl1); - replicationConfig.save(); - - autoReloadConfig = - new AutoReloadConfigDecorator( - sitePaths, - destinationFactoryMock, - Providers.of(replicationQueueMock), - pluginDataPath, - "replication", - workQueueMock); - autoReloadConfig.startup(workQueueMock); - - List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL); + FileBasedConfig fileConfig = newReplicationConfig(); + fileConfig.setBoolean("gerrit", null, "autoReload", false); + fileConfig.setString("remote", remoteName1, "url", remoteUrl1); + fileConfig.save(); + + replicationConfig = newReplicationFileBasedConfig(); + + DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig); + destinationsCollections.startup(workQueueMock); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); assertThat(destinations).hasSize(1); assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1); TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS - replicationConfig.setString("remote", "bar", "url", "ssh://git@git.bar.com/${name}"); - replicationConfig.save(); + fileConfig.setString("remote", "bar", "url", "ssh://git@git.bar.com/${name}"); + fileConfig.save(); executorService.refreshCommand.run(); - assertThat(autoReloadConfig.getDestinations(FilterType.ALL)).isEqualTo(destinations); + assertThat(destinationsCollections.getAll(FilterType.ALL)).isEqualTo(destinations); + } + + private AutoReloadConfigDecorator newAutoReloadConfig( + Supplier<ReplicationConfig> configSupplier) { + AutoReloadRunnable autoReloadRunnable = + new AutoReloadRunnable( + configParser, + new Provider<ReplicationConfig>() { + + @Override + public ReplicationConfig get() { + return configSupplier.get(); + } + }, + eventBus, + Providers.of(replicationQueueMock)); + return new AutoReloadConfigDecorator( + "replication", + workQueueMock, + newReplicationFileBasedConfig(), + autoReloadRunnable, + eventBus); } } diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java new file mode 100644 index 0000000..e7339d9 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java @@ -0,0 +1,122 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import com.google.gerrit.server.config.SitePaths; +import com.google.inject.Provider; +import com.google.inject.util.Providers; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import org.eclipse.jgit.errors.ConfigInvalidException; +import org.eclipse.jgit.lib.Config; +import org.junit.Before; +import org.junit.Test; + +public class AutoReloadRunnableTest { + + private SitePaths sitePaths; + private EventBus eventBus; + private ReloadTrackerSubscriber onReloadSubscriber; + private String pluginName; + private ReplicationQueue replicationQueueMock; + + @Before + public void setUp() throws IOException { + Path tmp = Files.createTempFile(pluginName, "_site"); + Files.deleteIfExists(tmp); + sitePaths = new SitePaths(tmp); + pluginName = "replication"; + eventBus = new EventBus(); + onReloadSubscriber = new ReloadTrackerSubscriber(); + eventBus.register(onReloadSubscriber); + + replicationQueueMock = mock(ReplicationQueue.class); + when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE); + } + + @Test + public void configurationIsReloadedWhenParsingSucceeds() { + ConfigParser parser = new TestValidConfigurationListener(); + + attemptAutoReload(parser); + + assertThat(onReloadSubscriber.reloaded).isTrue(); + } + + @Test + public void configurationIsNotReloadedWhenParsingFails() { + ConfigParser parser = new TestInvalidConfigurationListener(); + + attemptAutoReload(parser); + + assertThat(onReloadSubscriber.reloaded).isFalse(); + } + + private void attemptAutoReload(ConfigParser validator) { + final AutoReloadRunnable autoReloadRunnable = + new AutoReloadRunnable( + validator, newVersionConfigProvider(), eventBus, Providers.of(replicationQueueMock)); + + autoReloadRunnable.run(); + } + + private Provider<ReplicationConfig> newVersionConfigProvider() { + return new Provider<ReplicationConfig>() { + @Override + public ReplicationConfig get() { + return new ReplicationFileBasedConfig(sitePaths, sitePaths.data_dir) { + @Override + public String getVersion() { + return String.format("%s", System.nanoTime()); + } + }; + } + }; + } + + private static class ReloadTrackerSubscriber { + public boolean reloaded = false; + + @Subscribe + public void onReload( + @SuppressWarnings("unused") List<DestinationConfiguration> destinationConfigurations) { + reloaded = true; + } + } + + private static class TestValidConfigurationListener implements ConfigParser { + @Override + public List<RemoteConfiguration> parseRemotes(Config newConfig) { + return Collections.emptyList(); + } + } + + private static class TestInvalidConfigurationListener implements ConfigParser { + @Override + public List<RemoteConfiguration> parseRemotes(Config configurationChangeEvent) + throws ConfigInvalidException { + throw new ConfigInvalidException("expected test failure"); + } + } +} diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java new file mode 100644 index 0000000..2f7059e --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java @@ -0,0 +1,118 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class FakeExecutorService implements ScheduledExecutorService { + public Runnable refreshCommand = () -> {}; + + @Override + public void shutdown() {} + + @Override + public List<Runnable> shutdownNow() { + return null; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return null; + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return null; + } + + @Override + public Future<?> submit(Runnable task) { + return null; + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return null; + } + + @Override + public <T> List<Future<T>> invokeAll( + Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return null; + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return null; + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return null; + } + + @Override + public void execute(Runnable command) {} + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return null; + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return null; + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + refreshCommand = command; + return null; + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit) { + return null; + } +} diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java new file mode 100644 index 0000000..8cba4bb --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java @@ -0,0 +1,286 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.io.MoreFiles; +import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; +import java.io.IOException; +import java.util.List; +import org.eclipse.jgit.errors.ConfigInvalidException; +import org.eclipse.jgit.storage.file.FileBasedConfig; +import org.eclipse.jgit.util.FS; +import org.junit.Before; +import org.junit.Test; + +public class FanoutReplicationConfigTest extends AbstractConfigTest { + + public FanoutReplicationConfigTest() throws IOException { + super(); + } + + String remoteName1 = "foo"; + String remoteUrl1 = "ssh://git@git.somewhere.com/${name}"; + String remoteName2 = "bar"; + String remoteUrl2 = "ssh://git@git.elsewhere.com/${name}"; + + @Before + public void setupTests() { + FileBasedConfig config = newReplicationConfig(); + try { + config.save(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void shouldSkipRemoteConfigFromReplicationConfig() throws Exception { + String remoteName = "foo"; + String remoteUrl = "ssh://git@git.somewhere.com/${name}"; + + FileBasedConfig config = newReplicationConfig(); + config.setString("remote", remoteName, "url", remoteUrl); + config.save(); + + config = newRemoteConfig(remoteName2); + config.setString("remote", null, "url", remoteUrl2); + config.save(); + + DestinationsCollection destinationsCollections = + newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath)); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(1); + + assertThatIsDestination(destinations.get(0), remoteName2, remoteUrl2); + } + + @Test + public void shouldLoadDestinationsFromMultipleFiles() throws Exception { + + FileBasedConfig config = newRemoteConfig(remoteName1); + config.setString("remote", null, "url", remoteUrl1); + config.save(); + + config = newRemoteConfig(remoteName2); + config.setString("remote", null, "url", remoteUrl2); + config.save(); + + DestinationsCollection destinationsCollections = + newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath)); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(2); + + assertThatContainsDestination(destinations, remoteName1, remoteUrl1); + assertThatContainsDestination(destinations, remoteName2, remoteUrl2); + } + + @Test + public void shouldIgnoreDestinationsFromSubdirectories() throws Exception { + + FileBasedConfig config = newRemoteConfig(remoteName1); + config.setString("remote", null, "url", remoteUrl1); + config.save(); + + config = newRemoteConfig("subdirectory/" + remoteName2); + config.setString("remote", null, "url", remoteUrl2); + config.save(); + + DestinationsCollection destinationsCollections = + newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath)); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(1); + + assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1); + } + + @Test + public void shouldIgnoreNonConfigFiles() throws Exception { + + FileBasedConfig config = newRemoteConfig(remoteName1); + config.setString("remote", null, "url", remoteUrl1); + config.save(); + + config = + new FileBasedConfig( + sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".yaml").toFile(), + FS.DETECTED); + config.setString("remote", null, "url", remoteUrl2); + config.save(); + + DestinationsCollection destinationsCollections = + newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath)); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(1); + + assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1); + } + + @Test(expected = ConfigInvalidException.class) + public void shouldThrowConfigInvalidExceptionWhenUrlIsMissingName() throws Exception { + FileBasedConfig config = newRemoteConfig(remoteName1); + config.setString("remote", null, "url", "ssh://git@git.elsewhere.com/name"); + config.save(); + + newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath)); + } + + @Test + public void shouldIgnoreEmptyConfigFile() throws Exception { + FileBasedConfig config = newRemoteConfig(remoteName1); + config.save(); + + DestinationsCollection destinationsCollections = + newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath)); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(0); + } + + @Test + public void shouldIgnoreConfigWhenMoreThanOneRemoteInASingleFile() throws Exception { + FileBasedConfig config = newRemoteConfig(remoteName1); + config.setString("remote", null, "url", remoteUrl1); + config.setString("remote", remoteName2, "url", remoteUrl2); + config.save(); + + DestinationsCollection destinationsCollections = + newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath)); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(0); + } + + @Test + public void shouldIgnoreConfigRemoteSection() throws Exception { + FileBasedConfig config = newRemoteConfig(remoteName1); + config.setString("replication", null, "url", remoteUrl1); + config.save(); + + DestinationsCollection destinationsCollections = + newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath)); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(0); + } + + @Test + public void shouldReturnSameVersionWhenNoChanges() throws Exception { + + FileBasedConfig config = newRemoteConfig(remoteName1); + config.setString("remote", null, "url", remoteUrl1); + config.save(); + + config = newRemoteConfig(remoteName2); + config.setString("remote", null, "url", remoteUrl2); + config.save(); + + FanoutReplicationConfig objectUnderTest = + new FanoutReplicationConfig(sitePaths, pluginDataPath); + + String version = objectUnderTest.getVersion(); + + objectUnderTest = new FanoutReplicationConfig(sitePaths, pluginDataPath); + + assertThat(objectUnderTest.getVersion()).isEqualTo(version); + } + + @Test + public void shouldReturnNewVersionWhenConfigFileAdded() throws Exception { + + FileBasedConfig config = newRemoteConfig(remoteName1); + config.setString("remote", null, "url", remoteUrl1); + config.save(); + + FanoutReplicationConfig objectUnderTest = + new FanoutReplicationConfig(sitePaths, pluginDataPath); + + String version = objectUnderTest.getVersion(); + + config = newRemoteConfig(remoteName2); + config.setString("remote", null, "url", remoteUrl2); + config.save(); + + assertThat(objectUnderTest.getVersion()).isNotEqualTo(version); + } + + @Test + public void shouldReturnNewVersionWhenConfigFileIsModified() throws Exception { + + FileBasedConfig config = newRemoteConfig(remoteName1); + config.setString("remote", null, "url", remoteUrl1); + config.save(); + + FanoutReplicationConfig objectUnderTest = + new FanoutReplicationConfig(sitePaths, pluginDataPath); + + String version = objectUnderTest.getVersion(); + + config.setString("remote", null, "url", remoteUrl2); + config.save(); + + assertThat(objectUnderTest.getVersion()).isNotEqualTo(version); + } + + @Test + public void shouldReturnNewVersionWhenConfigFileRemoved() throws Exception { + + FileBasedConfig config = newRemoteConfig(remoteName1); + config.setString("remote", null, "url", remoteUrl1); + config.save(); + + config = newRemoteConfig(remoteName2); + config.setString("remote", null, "url", remoteUrl2); + config.save(); + + FanoutReplicationConfig objectUnderTest = + new FanoutReplicationConfig(sitePaths, pluginDataPath); + + String version = objectUnderTest.getVersion(); + assertThat( + sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".config").toFile().delete()) + .isTrue(); + + assertThat(objectUnderTest.getVersion()).isNotEqualTo(version); + } + + @Test + public void shouldReturnReplicationConfigVersionWhenReplicationConfigDirectoryRemoved() + throws Exception { + + FileBasedConfig config = newRemoteConfig(remoteName1); + config.setString("remote", null, "url", remoteUrl1); + config.save(); + + config = newRemoteConfig(remoteName2); + config.setString("remote", null, "url", remoteUrl2); + config.save(); + + FanoutReplicationConfig objectUnderTest = + new FanoutReplicationConfig(sitePaths, pluginDataPath); + + String replicationConfigVersion = + new ReplicationFileBasedConfig(sitePaths, pluginDataPath).getVersion(); + + MoreFiles.deleteRecursively(sitePaths.etc_dir.resolve("replication"), ALLOW_INSECURE); + + assertThat(objectUnderTest.getVersion()).isEqualTo(replicationConfigVersion); + } + + protected FileBasedConfig newRemoteConfig(String configFileName) { + return new FileBasedConfig( + sitePaths.etc_dir.resolve("replication/" + configFileName + ".config").toFile(), + FS.DETECTED); + } +} diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java index 0f6d629..2ee9a39 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java @@ -14,11 +14,10 @@ package com.googlesource.gerrit.plugins.replication; -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.reset; -import static org.easymock.EasyMock.verify; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.permissions.PermissionBackendException; @@ -36,14 +35,12 @@ public class GitUpdateProcessingTest { @Before public void setUp() throws Exception { - dispatcherMock = createMock(EventDispatcher.class); - replay(dispatcherMock); + dispatcherMock = mock(EventDispatcher.class); gitUpdateProcessing = new GitUpdateProcessing(dispatcherMock); } @Test public void headRefReplicated() throws URISyntaxException, PermissionBackendException { - reset(dispatcherMock); RefReplicatedEvent expectedEvent = new RefReplicatedEvent( "someProject", @@ -51,9 +48,6 @@ public class GitUpdateProcessingTest { "someHost", RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent)); - expectLastCall().once(); - replay(dispatcherMock); gitUpdateProcessing.onRefReplicatedToOneNode( "someProject", @@ -61,12 +55,11 @@ public class GitUpdateProcessingTest { new URIish("git://someHost/someProject.git"), RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - verify(dispatcherMock); + verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent)); } @Test public void changeRefReplicated() throws URISyntaxException, PermissionBackendException { - reset(dispatcherMock); RefReplicatedEvent expectedEvent = new RefReplicatedEvent( "someProject", @@ -74,9 +67,6 @@ public class GitUpdateProcessingTest { "someHost", RefPushResult.FAILED, RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD); - dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent)); - expectLastCall().once(); - replay(dispatcherMock); gitUpdateProcessing.onRefReplicatedToOneNode( "someProject", @@ -84,19 +74,15 @@ public class GitUpdateProcessingTest { new URIish("git://someHost/someProject.git"), RefPushResult.FAILED, RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD); - verify(dispatcherMock); + verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent)); } @Test public void onAllNodesReplicated() throws PermissionBackendException { - reset(dispatcherMock); RefReplicationDoneEvent expectedDoneEvent = new RefReplicationDoneEvent("someProject", "refs/heads/master", 5); - dispatcherMock.postEvent(RefReplicationDoneEventEquals.eqEvent(expectedDoneEvent)); - expectLastCall().once(); - replay(dispatcherMock); gitUpdateProcessing.onRefReplicatedToAllNodes("someProject", "refs/heads/master", 5); - verify(dispatcherMock); + verify(dispatcherMock, times(1)).postEvent(eq(expectedDoneEvent)); } } diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java index c010ddb..c09fcd1 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java @@ -14,19 +14,18 @@ package com.googlesource.gerrit.plugins.replication; -import static com.googlesource.gerrit.plugins.replication.RemoteRefUpdateCollectionMatcher.eqRemoteRef; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.getCurrentArguments; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; import static org.eclipse.jgit.lib.Ref.Storage.NEW; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; +import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.metrics.Timer1; -import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.server.git.GitRepositoryManager; import com.google.gerrit.server.git.PerThreadRequestScope; import com.google.gerrit.server.permissions.PermissionBackend; @@ -45,8 +44,6 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import junit.framework.AssertionFailedError; -import org.easymock.IAnswer; import org.eclipse.jgit.errors.NotSupportedException; import org.eclipse.jgit.errors.TransportException; import org.eclipse.jgit.lib.Config; @@ -68,6 +65,8 @@ import org.eclipse.jgit.transport.URIish; import org.eclipse.jgit.util.FS; import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class PushOneTest { private static final int TEST_PUSH_TIMEOUT_SECS = 10; @@ -86,7 +85,7 @@ public class PushOneTest { private IdGenerator idGeneratorMock; private ReplicationStateListeners replicationStateListenersMock; private ReplicationMetrics replicationMetricsMock; - private Timer1.Context timerContextMock; + private Timer1.Context<String> timerContextMock; private ProjectCache projectCacheMock; private TransportFactory transportFactoryMock; private Transport transportMock; @@ -108,7 +107,7 @@ public class PushOneTest { @Before public void setup() throws Exception { - projectNameKey = new Project.NameKey("fooProject"); + projectNameKey = Project.nameKey("fooProject"); urish = new URIish("http://foo.com/fooProject.git"); newLocalRef = @@ -126,6 +125,7 @@ public class PushOneTest { setupMocks(); } + @SuppressWarnings("unchecked") private void setupMocks() throws Exception { FileBasedConfig config = new FileBasedConfig(new Config(), new File("/foo"), FS.DETECTED); config.setString("remote", "Replication", "push", "foo"); @@ -134,8 +134,8 @@ public class PushOneTest { setupRepositoryMock(config); setupGitRepoManagerMock(); - projectStateMock = createNiceMock(ProjectState.class); - forProjectMock = createNiceMock(ForProject.class); + projectStateMock = mock(ProjectState.class); + forProjectMock = mock(ForProject.class); setupWithUserMock(); setupPermissionBackedMock(); @@ -144,46 +144,22 @@ public class PushOneTest { setupRefSpecMock(); setupRemoteConfigMock(); - credentialsFactory = createNiceMock(CredentialsFactory.class); + credentialsFactory = mock(CredentialsFactory.class); setupFetchConnectionMock(); setupPushConnectionMock(); setupRequestScopeMock(); - idGeneratorMock = createNiceMock(IdGenerator.class); - replicationStateListenersMock = createNiceMock(ReplicationStateListeners.class); + idGeneratorMock = mock(IdGenerator.class); + replicationStateListenersMock = mock(ReplicationStateListeners.class); - timerContextMock = createNiceMock(Timer1.Context.class); + timerContextMock = mock(Timer1.Context.class); setupReplicationMetricsMock(); setupTransportMock(); setupProjectCacheMock(); - replicationConfigMock = createNiceMock(ReplicationConfig.class); - - replay( - gitRepositoryManagerMock, - refUpdateMock, - repositoryMock, - permissionBackendMock, - destinationMock, - remoteConfigMock, - credentialsFactory, - threadRequestScoperMock, - idGeneratorMock, - replicationStateListenersMock, - replicationMetricsMock, - projectCacheMock, - timerContextMock, - transportFactoryMock, - projectStateMock, - withUserMock, - forProjectMock, - fetchConnection, - pushConnection, - refSpecMock, - refDatabaseMock, - replicationConfigMock); + replicationConfigMock = mock(ReplicationConfig.class); } @Test @@ -212,10 +188,7 @@ public class PushOneTest { PushResult pushResult = new PushResult(); - expect(transportMock.push(anyObject(), eqRemoteRef(expectedUpdates))) - .andReturn(pushResult) - .once(); - replay(transportMock); + when(transportMock.push(any(), eq(expectedUpdates))).thenReturn(pushResult); PushOne pushOne = createPushOne(replicationPushFilter); @@ -223,8 +196,6 @@ public class PushOneTest { pushOne.run(); isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS); - - verify(transportMock); } @Test @@ -241,12 +212,6 @@ public class PushOneTest { } }); - // easymock way to check if method was never called - expect(transportMock.push(anyObject(), anyObject())) - .andThrow(new AssertionFailedError()) - .anyTimes(); - replay(transportMock); - PushOne pushOne = createPushOne(replicationPushFilter); pushOne.addRef(PushOne.ALL_REFS); @@ -254,7 +219,7 @@ public class PushOneTest { isCallFinished.await(10, TimeUnit.SECONDS); - verify(transportMock); + verify(transportMock, never()).push(any(), any()); } private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) { @@ -281,31 +246,31 @@ public class PushOneTest { } private void setupProjectCacheMock() throws IOException { - projectCacheMock = createNiceMock(ProjectCache.class); - expect(projectCacheMock.checkedGet(projectNameKey)).andReturn(projectStateMock); + projectCacheMock = mock(ProjectCache.class); + when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock); } private void setupTransportMock() throws NotSupportedException, TransportException { - transportMock = createNiceMock(Transport.class); - expect(transportMock.openFetch()).andReturn(fetchConnection); - transportFactoryMock = createNiceMock(TransportFactory.class); - expect(transportFactoryMock.open(repositoryMock, urish)).andReturn(transportMock).anyTimes(); + transportMock = mock(Transport.class); + when(transportMock.openFetch()).thenReturn(fetchConnection); + transportFactoryMock = mock(TransportFactory.class); + when(transportFactoryMock.open(repositoryMock, urish)).thenReturn(transportMock); } private void setupReplicationMetricsMock() { - replicationMetricsMock = createNiceMock(ReplicationMetrics.class); - expect(replicationMetricsMock.start(anyObject())).andReturn(timerContextMock); + replicationMetricsMock = mock(ReplicationMetrics.class); + when(replicationMetricsMock.start(any())).thenReturn(timerContextMock); } private void setupRequestScopeMock() { - threadRequestScoperMock = createNiceMock(PerThreadRequestScope.Scoper.class); - expect(threadRequestScoperMock.scope(anyObject())) - .andAnswer( - new IAnswer<Callable<Object>>() { + threadRequestScoperMock = mock(PerThreadRequestScope.Scoper.class); + when(threadRequestScoperMock.scope(any())) + .thenAnswer( + new Answer<Callable<Object>>() { @SuppressWarnings("unchecked") @Override - public Callable<Object> answer() throws Throwable { - Callable<Object> originalCall = (Callable<Object>) getCurrentArguments()[0]; + public Callable<Object> answer(InvocationOnMock invocation) throws Throwable { + Callable<Object> originalCall = (Callable<Object>) invocation.getArguments()[0]; return new Callable<Object>() { @Override @@ -316,66 +281,64 @@ public class PushOneTest { } }; } - }) - .anyTimes(); + }); } private void setupPushConnectionMock() { - pushConnection = createNiceMock(PushConnection.class); - expect(pushConnection.getRefsMap()).andReturn(remoteRefs); + pushConnection = mock(PushConnection.class); + when(pushConnection.getRefsMap()).thenReturn(remoteRefs); } private void setupFetchConnectionMock() { - fetchConnection = createNiceMock(FetchConnection.class); - expect(fetchConnection.getRefsMap()).andReturn(remoteRefs); + fetchConnection = mock(FetchConnection.class); + when(fetchConnection.getRefsMap()).thenReturn(remoteRefs); } private void setupRemoteConfigMock() { - remoteConfigMock = createNiceMock(RemoteConfig.class); - expect(remoteConfigMock.getPushRefSpecs()).andReturn(ImmutableList.of(refSpecMock)); + remoteConfigMock = mock(RemoteConfig.class); + when(remoteConfigMock.getPushRefSpecs()).thenReturn(ImmutableList.of(refSpecMock)); } private void setupRefSpecMock() { - refSpecMock = createNiceMock(RefSpec.class); - expect(refSpecMock.matchSource(anyObject(String.class))).andReturn(true); - expect(refSpecMock.expandFromSource(anyObject(String.class))).andReturn(refSpecMock); - expect(refSpecMock.getDestination()).andReturn("fooProject").anyTimes(); - expect(refSpecMock.isForceUpdate()).andReturn(false).anyTimes(); + refSpecMock = mock(RefSpec.class); + when(refSpecMock.matchSource(any(String.class))).thenReturn(true); + when(refSpecMock.expandFromSource(any(String.class))).thenReturn(refSpecMock); + when(refSpecMock.getDestination()).thenReturn("fooProject"); + when(refSpecMock.isForceUpdate()).thenReturn(false); } private void setupDestinationMock() { - destinationMock = createNiceMock(Destination.class); - expect(destinationMock.requestRunway(anyObject())).andReturn(RunwayStatus.allowed()); + destinationMock = mock(Destination.class); + when(destinationMock.requestRunway(any())).thenReturn(RunwayStatus.allowed()); } private void setupPermissionBackedMock() { - permissionBackendMock = createNiceMock(PermissionBackend.class); - expect(permissionBackendMock.currentUser()).andReturn(withUserMock); + permissionBackendMock = mock(PermissionBackend.class); + when(permissionBackendMock.currentUser()).thenReturn(withUserMock); } private void setupWithUserMock() { - withUserMock = createNiceMock(WithUser.class); - expect(withUserMock.project(projectNameKey)).andReturn(forProjectMock); + withUserMock = mock(WithUser.class); + when(withUserMock.project(projectNameKey)).thenReturn(forProjectMock); } private void setupGitRepoManagerMock() throws IOException { - gitRepositoryManagerMock = createNiceMock(GitRepositoryManager.class); - expect(gitRepositoryManagerMock.openRepository(projectNameKey)).andReturn(repositoryMock); + gitRepositoryManagerMock = mock(GitRepositoryManager.class); + when(gitRepositoryManagerMock.openRepository(projectNameKey)).thenReturn(repositoryMock); } private void setupRepositoryMock(FileBasedConfig config) throws IOException { - repositoryMock = createNiceMock(Repository.class); - refDatabaseMock = createNiceMock(RefDatabase.class); - expect(repositoryMock.getConfig()).andReturn(config).anyTimes(); - expect(repositoryMock.getRefDatabase()).andReturn(refDatabaseMock); - expect(refDatabaseMock.getRefs()).andReturn(localRefs); - expect(repositoryMock.updateRef("fooProject")).andReturn(refUpdateMock); + repositoryMock = mock(Repository.class); + refDatabaseMock = mock(RefDatabase.class); + when(repositoryMock.getConfig()).thenReturn(config); + when(repositoryMock.getRefDatabase()).thenReturn(refDatabaseMock); + when(refDatabaseMock.getRefs()).thenReturn(localRefs); + when(repositoryMock.updateRef("fooProject")).thenReturn(refUpdateMock); } private void setupRefUpdateMock() { - refUpdateMock = createNiceMock(RefUpdate.class); - expect(refUpdateMock.getOldObjectId()) - .andReturn(ObjectId.fromString("0000000000000000000000000000000000000001")) - .anyTimes(); + refUpdateMock = mock(RefUpdate.class); + when(refUpdateMock.getOldObjectId()) + .thenReturn(ObjectId.fromString("0000000000000000000000000000000000000001")); } } diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java deleted file mode 100644 index 983e97f..0000000 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright (C) 2013 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.googlesource.gerrit.plugins.replication; - -import org.easymock.EasyMock; -import org.easymock.IArgumentMatcher; - -public class RefReplicatedEventEquals implements IArgumentMatcher { - - private RefReplicatedEvent expected; - - public RefReplicatedEventEquals(RefReplicatedEvent expected) { - this.expected = expected; - } - - public static final RefReplicatedEvent eqEvent(RefReplicatedEvent refReplicatedEvent) { - EasyMock.reportMatcher(new RefReplicatedEventEquals(refReplicatedEvent)); - return null; - } - - @Override - public boolean matches(Object actual) { - if (!(actual instanceof RefReplicatedEvent)) { - return false; - } - RefReplicatedEvent actualRefReplicatedEvent = (RefReplicatedEvent) actual; - if (!equals(expected.project, actualRefReplicatedEvent.project)) { - return false; - } - if (!equals(expected.ref, actualRefReplicatedEvent.ref)) { - return false; - } - if (!equals(expected.targetNode, actualRefReplicatedEvent.targetNode)) { - return false; - } - if (!equals(expected.status, actualRefReplicatedEvent.status)) { - return false; - } - if (!equals(expected.refStatus, actualRefReplicatedEvent.refStatus)) { - return false; - } - return true; - } - - private static boolean equals(Object object1, Object object2) { - if (object1 == object2) { - return true; - } - if (object1 != null && !object1.equals(object2)) { - return false; - } - return true; - } - - @Override - public void appendTo(StringBuffer buffer) { - buffer.append("eqEvent("); - buffer.append(expected.getClass().getName()); - buffer.append(" with project \""); - buffer.append(expected.project); - buffer.append("\" and ref \""); - buffer.append(expected.ref); - buffer.append("\" and targetNode \""); - buffer.append(expected.targetNode); - buffer.append("\" and status \""); - buffer.append(expected.status); - buffer.append("\" and refStatus \""); - buffer.append(expected.refStatus); - buffer.append("\")"); - } -} diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java deleted file mode 100644 index d1284e1..0000000 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (C) 2013 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.googlesource.gerrit.plugins.replication; - -import org.easymock.EasyMock; -import org.easymock.IArgumentMatcher; - -public class RefReplicationDoneEventEquals implements IArgumentMatcher { - - private RefReplicationDoneEvent expected; - - public RefReplicationDoneEventEquals(RefReplicationDoneEvent expected) { - this.expected = expected; - } - - public static final RefReplicationDoneEvent eqEvent(RefReplicationDoneEvent refReplicatedEvent) { - EasyMock.reportMatcher(new RefReplicationDoneEventEquals(refReplicatedEvent)); - return null; - } - - @Override - public boolean matches(Object actual) { - if (!(actual instanceof RefReplicationDoneEvent)) { - return false; - } - RefReplicationDoneEvent actualRefReplicatedDoneEvent = (RefReplicationDoneEvent) actual; - if (!equals(expected.project, actualRefReplicatedDoneEvent.project)) { - return false; - } - if (!equals(expected.ref, actualRefReplicatedDoneEvent.ref)) { - return false; - } - if (expected.nodesCount != actualRefReplicatedDoneEvent.nodesCount) { - return false; - } - return true; - } - - private static boolean equals(Object object1, Object object2) { - if (object1 == object2) { - return true; - } - if (object1 != null && !object1.equals(object2)) { - return false; - } - return true; - } - - @Override - public void appendTo(StringBuffer buffer) { - buffer.append("eqEvent("); - buffer.append(expected.getClass().getName()); - buffer.append(" with project \""); - buffer.append(expected.project); - buffer.append("\" and ref \""); - buffer.append(expected.ref); - buffer.append("\" and nodesCount \""); - buffer.append(expected.nodesCount); - buffer.append("\")"); - } -} diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java deleted file mode 100644 index 111a792..0000000 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (C) 2019 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.googlesource.gerrit.plugins.replication; - -import java.util.Collection; -import java.util.Objects; -import org.easymock.EasyMock; -import org.easymock.IArgumentMatcher; -import org.eclipse.jgit.transport.RemoteRefUpdate; - -public class RemoteRefUpdateCollectionMatcher implements IArgumentMatcher { - Collection<RemoteRefUpdate> expectedRemoteRefs; - - public static Collection<RemoteRefUpdate> eqRemoteRef( - Collection<RemoteRefUpdate> expectedRemoteRefs) { - EasyMock.reportMatcher(new RemoteRefUpdateCollectionMatcher(expectedRemoteRefs)); - return null; - } - - public RemoteRefUpdateCollectionMatcher(Collection<RemoteRefUpdate> expectedRemoteRefs) { - this.expectedRemoteRefs = expectedRemoteRefs; - } - - @Override - public boolean matches(Object argument) { - if (!(argument instanceof Collection)) return false; - - @SuppressWarnings("unchecked") - Collection<RemoteRefUpdate> refs = (Collection<RemoteRefUpdate>) argument; - - if (expectedRemoteRefs.size() != refs.size()) return false; - return refs.stream() - .allMatch( - ref -> expectedRemoteRefs.stream().anyMatch(expectedRef -> compare(ref, expectedRef))); - } - - @Override - public void appendTo(StringBuffer buffer) { - buffer.append("expected:" + expectedRemoteRefs.toString()); - } - - private boolean compare(RemoteRefUpdate ref, RemoteRefUpdate expectedRef) { - return Objects.equals(ref.getRemoteName(), expectedRef.getRemoteName()) - && Objects.equals(ref.getStatus(), expectedRef.getStatus()) - && Objects.equals(ref.getExpectedOldObjectId(), expectedRef.getExpectedOldObjectId()) - && Objects.equals(ref.getNewObjectId(), expectedRef.getNewObjectId()) - && Objects.equals(ref.isFastForward(), expectedRef.isFastForward()) - && Objects.equals(ref.getSrcRef(), expectedRef.getSrcRef()) - && Objects.equals(ref.isForceUpdate(), expectedRef.isForceUpdate()) - && Objects.equals(ref.getMessage(), expectedRef.getMessage()); - } -} diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java new file mode 100644 index 0000000..adb7fc8 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java @@ -0,0 +1,266 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.truth.Truth.assertThat; +import static java.util.stream.Collectors.toList; + +import com.google.common.flogger.FluentLogger; +import com.google.common.io.MoreFiles; +import com.google.common.io.RecursiveDeleteOption; +import com.google.gerrit.acceptance.LightweightPluginDaemonTest; +import com.google.gerrit.acceptance.PushOneCommit.Result; +import com.google.gerrit.acceptance.TestPlugin; +import com.google.gerrit.acceptance.UseLocalDisk; +import com.google.gerrit.acceptance.testsuite.project.ProjectOperations; +import com.google.gerrit.entities.Project; +import com.google.gerrit.extensions.annotations.PluginData; +import com.google.gerrit.extensions.api.projects.BranchInput; +import com.google.gerrit.server.config.SitePaths; +import com.google.inject.Inject; +import com.google.inject.Key; +import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; +import java.util.regex.Pattern; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.storage.file.FileBasedConfig; +import org.eclipse.jgit.util.FS; +import org.junit.After; +import org.junit.Test; + +@UseLocalDisk +@TestPlugin( + name = "replication", + sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule") +public class ReplicationFanoutIT extends LightweightPluginDaemonTest { + private static final Optional<String> ALL_PROJECTS = Optional.empty(); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + private static final int TEST_REPLICATION_DELAY = 1; + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2); + + @Inject private SitePaths sitePaths; + @Inject private ProjectOperations projectOperations; + private Path pluginDataDir; + private Path gitPath; + private Path storagePath; + private FileBasedConfig config; + private ReplicationTasksStorage tasksStorage; + + @Override + public void setUpTestPlugin() throws Exception { + gitPath = sitePaths.site_path.resolve("git"); + + config = + new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); + setAutoReload(); + config.save(); + + setReplicationDestination("remote1", "suffix1", Optional.of("not-used-project")); + + super.setUpTestPlugin(); + + pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class)); + storagePath = pluginDataDir.resolve("ref-updates"); + tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class); + cleanupReplicationTasks(); + tasksStorage.disableDeleteForTesting(true); + } + + @After + public void cleanUp() throws IOException { + if (Files.exists(sitePaths.etc_dir.resolve("replication"))) { + MoreFiles.deleteRecursively( + sitePaths.etc_dir.resolve("replication"), RecursiveDeleteOption.ALLOW_INSECURE); + } + } + + @Test + public void shouldReplicateNewBranch() throws Exception { + setReplicationDestination("foo", "replica", ALL_PROJECTS); + reloadConfig(); + + Project.NameKey targetProject = createTestProject(project + "replica"); + String newBranch = "refs/heads/mybranch"; + String master = "refs/heads/master"; + BranchInput input = new BranchInput(); + input.revision = master; + gApi.projects().name(project.get()).branch(newBranch).create(input); + + assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2); + + try (Repository repo = repoManager.openRepository(targetProject); + Repository sourceRepo = repoManager.openRepository(project)) { + waitUntil(() -> checkedGetRef(repo, newBranch) != null); + + Ref masterRef = getRef(sourceRepo, master); + Ref targetBranchRef = getRef(repo, newBranch); + assertThat(targetBranchRef).isNotNull(); + assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId()); + } + } + + @Test + public void shouldReplicateNewBranchToTwoRemotes() throws Exception { + Project.NameKey targetProject1 = createTestProject(project + "replica1"); + Project.NameKey targetProject2 = createTestProject(project + "replica2"); + + setReplicationDestination("foo1", "replica1", ALL_PROJECTS); + setReplicationDestination("foo2", "replica2", ALL_PROJECTS); + reloadConfig(); + + Result pushResult = createChange(); + RevCommit sourceCommit = pushResult.getCommit(); + String sourceRef = pushResult.getPatchSet().refName(); + + assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2); + + try (Repository repo1 = repoManager.openRepository(targetProject1); + Repository repo2 = repoManager.openRepository(targetProject2)) { + waitUntil( + () -> + (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null)); + + Ref targetBranchRef1 = getRef(repo1, sourceRef); + assertThat(targetBranchRef1).isNotNull(); + assertThat(targetBranchRef1.getObjectId()).isEqualTo(sourceCommit.getId()); + + Ref targetBranchRef2 = getRef(repo2, sourceRef); + assertThat(targetBranchRef2).isNotNull(); + assertThat(targetBranchRef2.getObjectId()).isEqualTo(sourceCommit.getId()); + } + } + + @Test + public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception { + List<String> replicaSuffixes = Arrays.asList("replica1", "replica2"); + createTestProject(project + "replica1"); + createTestProject(project + "replica2"); + + setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS); + setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS); + config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100); + config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100); + reloadConfig(); + + createChange(); + + assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4); + + setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS); + setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS); + } + + private Ref getRef(Repository repo, String branchName) throws IOException { + return repo.getRefDatabase().exactRef(branchName); + } + + private Ref checkedGetRef(Repository repo, String branchName) { + try { + return repo.getRefDatabase().exactRef(branchName); + } catch (Exception e) { + logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo); + return null; + } + } + + private void setReplicationDestination( + String remoteName, String replicaSuffix, Optional<String> project) throws IOException { + setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project); + } + + private void setReplicationDestination( + String remoteName, List<String> replicaSuffixes, Optional<String> allProjects) + throws IOException { + FileBasedConfig remoteConfig = + new FileBasedConfig( + sitePaths.etc_dir.resolve("replication/" + remoteName + ".config").toFile(), + FS.DETECTED); + + setReplicationDestination(remoteConfig, replicaSuffixes, allProjects); + } + + private void setAutoReload() throws IOException { + config.setBoolean("gerrit", null, "autoReload", true); + config.save(); + } + + private void setReplicationDestination( + FileBasedConfig config, List<String> replicaSuffixes, Optional<String> project) + throws IOException { + + List<String> replicaUrls = + replicaSuffixes.stream() + .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString()) + .collect(toList()); + config.setStringList("remote", null, "url", replicaUrls); + config.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY); + project.ifPresent(prj -> config.setString("remote", null, "projects", prj)); + + config.save(); + } + + private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException { + WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT); + } + + private void reloadConfig() { + getAutoReloadConfigDecoratorInstance().reload(); + } + + private AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() { + return getInstance(AutoReloadConfigDecorator.class); + } + + private <T> T getInstance(Class<T> classObj) { + return plugin.getSysInjector().getInstance(classObj); + } + + private Project.NameKey createTestProject(String name) throws Exception { + return projectOperations.newProject().name(name).create(); + } + + private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) { + Pattern refmaskPattern = Pattern.compile(refRegex); + return tasksStorage.list().stream() + .filter(task -> refmaskPattern.matcher(task.ref).matches()) + .collect(toList()); + } + + public void cleanupReplicationTasks() throws IOException { + cleanupReplicationTasks(storagePath); + } + + private void cleanupReplicationTasks(Path basePath) throws IOException { + try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) { + for (Path path : files) { + if (Files.isDirectory(path)) { + cleanupReplicationTasks(path); + } else { + path.toFile().delete(); + } + } + } + } +} diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java index 36cc209..efacae7 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java @@ -19,7 +19,6 @@ import static com.google.common.truth.Truth.assertThat; import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; import java.io.IOException; import java.util.List; -import org.eclipse.jgit.errors.ConfigInvalidException; import org.eclipse.jgit.storage.file.FileBasedConfig; import org.junit.Test; @@ -37,8 +36,10 @@ public class ReplicationFileBasedConfigTest extends AbstractConfigTest { config.setString("remote", remoteName, "url", remoteUrl); config.save(); - ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig(); - List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL); + DestinationsCollection destinationsCollections = + newDestinationsCollections(newReplicationFileBasedConfig()); + destinationsCollections.startup(workQueueMock); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); assertThat(destinations).hasSize(1); assertThatIsDestination(destinations.get(0), remoteName, remoteUrl); @@ -55,18 +56,13 @@ public class ReplicationFileBasedConfigTest extends AbstractConfigTest { config.setString("remote", remoteName2, "url", remoteUrl2); config.save(); - ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig(); - List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL); + DestinationsCollection destinationsCollections = + newDestinationsCollections(newReplicationFileBasedConfig()); + destinationsCollections.startup(workQueueMock); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); assertThat(destinations).hasSize(2); - assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1); - assertThatIsDestination(destinations.get(1), remoteName2, remoteUrl2); - } - - private ReplicationFileBasedConfig newReplicationFileBasedConfig() - throws ConfigInvalidException, IOException { - ReplicationFileBasedConfig replicationConfig = - new ReplicationFileBasedConfig(sitePaths, destinationFactoryMock, pluginDataPath); - return replicationConfig; + assertThatContainsDestination(destinations, remoteName1, remoteUrl1); + assertThatContainsDestination(destinations, remoteName2, remoteUrl2); } } diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index 0074075..31cd75d 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java @@ -25,13 +25,13 @@ import com.google.gerrit.acceptance.PushOneCommit.Result; import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; import com.google.gerrit.acceptance.testsuite.project.ProjectOperations; +import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.annotations.PluginData; import com.google.gerrit.extensions.api.changes.NotifyHandling; import com.google.gerrit.extensions.api.projects.BranchInput; import com.google.gerrit.extensions.common.ProjectInfo; import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.registration.DynamicSet; -import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.server.config.SitePaths; import com.google.inject.Inject; import com.google.inject.Key; @@ -106,7 +106,7 @@ public class ReplicationIT extends LightweightPluginDaemonTest { assertThat(listReplicationTasks("refs/meta/config")).hasSize(1); - waitUntil(() -> nonEmptyProjectExists(new Project.NameKey(sourceProject + "replica"))); + waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git"))); ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get(); assertThat(replicaProject).isNotNull(); @@ -149,7 +149,7 @@ public class ReplicationIT extends LightweightPluginDaemonTest { Result pushResult = createChange(); RevCommit sourceCommit = pushResult.getCommit(); - String sourceRef = pushResult.getPatchSet().getRefName(); + String sourceRef = pushResult.getPatchSet().refName(); assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1); @@ -198,7 +198,7 @@ public class ReplicationIT extends LightweightPluginDaemonTest { Result pushResult = createChange(); RevCommit sourceCommit = pushResult.getCommit(); - String sourceRef = pushResult.getPatchSet().getRefName(); + String sourceRef = pushResult.getPatchSet().refName(); assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2); @@ -329,6 +329,8 @@ public class ReplicationIT extends LightweightPluginDaemonTest { } private void replicateBranchDeletion(boolean mirror) throws Exception { + tasksStorage.disableDeleteForTesting(false); + setReplicationDestination("foo", "replica", ALL_PROJECTS, mirror); reloadConfig(); @@ -371,10 +373,10 @@ public class ReplicationIT extends LightweightPluginDaemonTest { setReplicationDestination(remoteName, "replica", ALL_PROJECTS); Result pushResult = createChange(); - shutdownConfig(); + shutdownDestinations(); pushResult.getCommit(); - String sourceRef = pushResult.getPatchSet().getRefName(); + String sourceRef = pushResult.getPatchSet().refName(); assertThrows( InterruptedException.class, @@ -397,10 +399,10 @@ public class ReplicationIT extends LightweightPluginDaemonTest { reloadConfig(); Result pushResult = createChange(); - shutdownConfig(); + shutdownDestinations(); RevCommit sourceCommit = pushResult.getCommit(); - String sourceRef = pushResult.getPatchSet().getRefName(); + String sourceRef = pushResult.getPatchSet().refName(); try (Repository repo = repoManager.openRepository(targetProject)) { waitUntil(() -> checkedGetRef(repo, sourceRef) != null); @@ -422,7 +424,7 @@ public class ReplicationIT extends LightweightPluginDaemonTest { replicationQueueStart(); RevCommit sourceCommit = pushResult.getCommit(); - String sourceRef = pushResult.getPatchSet().getRefName(); + String sourceRef = pushResult.getPatchSet().refName(); try (Repository repo = repoManager.openRepository(targetProject)) { waitUntil(() -> checkedGetRef(repo, sourceRef) != null); @@ -433,6 +435,20 @@ public class ReplicationIT extends LightweightPluginDaemonTest { } @Test + public void shouldCleanupTasksAfterNewProjectReplication() throws Exception { + tasksStorage.disableDeleteForTesting(false); + setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS); + config.setInt("remote", "task_cleanup_project", "replicationRetry", 0); + config.save(); + reloadConfig(); + assertThat(tasksStorage.listRunning()).hasSize(0); + Project.NameKey sourceProject = createTestProject("task_cleanup_project"); + + waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git"))); + waitUntil(() -> tasksStorage.listRunning().size() == 0); + } + + @Test public void shouldFirePendingOnlyToStoredUri() throws Exception { String suffix1 = "replica1"; String suffix2 = "replica2"; @@ -444,7 +460,7 @@ public class ReplicationIT extends LightweightPluginDaemonTest { setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE, false); reloadConfig(); - String changeRef = createChange().getPatchSet().getRefName(); + String changeRef = createChange().getPatchSet().refName(); tasksStorage.disableDeleteForTesting(false); changeReplicationTasksForRemote(changeRef, remote1).forEach(tasksStorage::delete); @@ -531,6 +547,7 @@ public class ReplicationIT extends LightweightPluginDaemonTest { config.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY); config.setBoolean("remote", remoteName, "mirror", mirror); project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj)); + config.setBoolean("gerrit", null, "autoReload", true); config.save(); } @@ -545,11 +562,11 @@ public class ReplicationIT extends LightweightPluginDaemonTest { } private void reloadConfig() { - getAutoReloadConfigDecoratorInstance().forceReload(); + getAutoReloadConfigDecoratorInstance().reload(); } - private void shutdownConfig() { - getAutoReloadConfigDecoratorInstance().shutdown(); + private void shutdownDestinations() { + getInstance(DestinationsCollection.class).shutdown(); } private void replicationQueueStart() { @@ -590,10 +607,18 @@ public class ReplicationIT extends LightweightPluginDaemonTest { .collect(toList()); } - private void cleanupReplicationTasks() throws IOException { - try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) { + public void cleanupReplicationTasks() throws IOException { + cleanupReplicationTasks(storagePath); + } + + private void cleanupReplicationTasks(Path basePath) throws IOException { + try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) { for (Path path : files) { - path.toFile().delete(); + if (Files.isDirectory(path)) { + cleanupReplicationTasks(path); + } else { + path.toFile().delete(); + } } } } diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java index 193af1e..e0de577 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java @@ -15,11 +15,9 @@ package com.googlesource.gerrit.plugins.replication; import static com.google.common.truth.Truth.assertThat; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.resetToDefault; -import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult; import java.net.URISyntaxException; @@ -35,8 +33,7 @@ public class ReplicationStateTest { @Before public void setUp() throws Exception { - pushResultProcessingMock = createNiceMock(PushResultProcessing.class); - replay(pushResultProcessingMock); + pushResultProcessingMock = mock(PushResultProcessing.class); replicationState = new ReplicationState(pushResultProcessingMock); } @@ -53,52 +50,36 @@ public class ReplicationStateTest { @Test public void shouldFireOneReplicationEventWhenNothingToReplicate() { - resetToDefault(pushResultProcessingMock); - - // expected event - pushResultProcessingMock.onAllRefsReplicatedToAllNodes(0); - replay(pushResultProcessingMock); - // actual test replicationState.markAllPushTasksScheduled(); - verify(pushResultProcessingMock); + + // expected event + verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(0); } @Test public void shouldFireEventsForReplicationOfOneRefToOneNode() throws URISyntaxException { - resetToDefault(pushResultProcessingMock); URIish uri = new URIish("git://someHost/someRepo.git"); - // expected events - pushResultProcessingMock.onRefReplicatedToOneNode( - "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 1); - pushResultProcessingMock.onAllRefsReplicatedToAllNodes(1); - replay(pushResultProcessingMock); - // actual test replicationState.increasePushTaskCount("someProject", "someRef"); replicationState.markAllPushTasksScheduled(); replicationState.notifyRefReplicated( "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - verify(pushResultProcessingMock); + + // expected events + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); + verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 1); + verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(1); } @Test public void shouldFireEventsForReplicationOfOneRefToMultipleNodes() throws URISyntaxException { - resetToDefault(pushResultProcessingMock); URIish uri1 = new URIish("git://someHost1/someRepo.git"); URIish uri2 = new URIish("git://someHost2/someRepo.git"); - // expected events - pushResultProcessingMock.onRefReplicatedToOneNode( - "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - pushResultProcessingMock.onRefReplicatedToOneNode( - "someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING); - pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 2); - pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2); - replay(pushResultProcessingMock); - // actual test replicationState.increasePushTaskCount("someProject", "someRef"); replicationState.increasePushTaskCount("someProject", "someRef"); @@ -107,33 +88,29 @@ public class ReplicationStateTest { "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); replicationState.notifyRefReplicated( "someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING); - verify(pushResultProcessingMock); + + // expected events + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "someProject", + "someRef", + uri2, + RefPushResult.FAILED, + RemoteRefUpdate.Status.NON_EXISTING); + verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 2); + verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2); } @Test public void shouldFireEventsForReplicationOfMultipleRefsToMultipleNodes() throws URISyntaxException { - resetToDefault(pushResultProcessingMock); URIish uri1 = new URIish("git://host1/someRepo.git"); URIish uri2 = new URIish("git://host2/someRepo.git"); URIish uri3 = new URIish("git://host3/someRepo.git"); - // expected events - pushResultProcessingMock.onRefReplicatedToOneNode( - "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - pushResultProcessingMock.onRefReplicatedToOneNode( - "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - pushResultProcessingMock.onRefReplicatedToOneNode( - "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - pushResultProcessingMock.onRefReplicatedToOneNode( - "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - pushResultProcessingMock.onRefReplicatedToOneNode( - "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 3); - pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 2); - pushResultProcessingMock.onAllRefsReplicatedToAllNodes(5); - replay(pushResultProcessingMock); - // actual test replicationState.increasePushTaskCount("someProject", "ref1"); replicationState.increasePushTaskCount("someProject", "ref1"); @@ -151,24 +128,32 @@ public class ReplicationStateTest { "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); replicationState.notifyRefReplicated( "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - verify(pushResultProcessingMock); + + // expected events + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); + verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 3); + verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 2); + verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(5); } @Test public void shouldFireEventsForReplicationSameRefDifferentProjects() throws URISyntaxException { - resetToDefault(pushResultProcessingMock); URIish uri = new URIish("git://host1/someRepo.git"); - // expected events - pushResultProcessingMock.onRefReplicatedToOneNode( - "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - pushResultProcessingMock.onRefReplicatedToOneNode( - "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - pushResultProcessingMock.onRefReplicatedToAllNodes("project1", "ref1", 1); - pushResultProcessingMock.onRefReplicatedToAllNodes("project2", "ref2", 1); - pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2); - replay(pushResultProcessingMock); - // actual test replicationState.increasePushTaskCount("project1", "ref1"); replicationState.increasePushTaskCount("project2", "ref2"); @@ -177,25 +162,24 @@ public class ReplicationStateTest { "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); replicationState.notifyRefReplicated( "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - verify(pushResultProcessingMock); + + // expected events + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); + verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project1", "ref1", 1); + verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project2", "ref2", 1); + verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2); } @Test public void shouldFireEventsWhenSomeReplicationCompleteBeforeAllTasksAreScheduled() throws URISyntaxException { - resetToDefault(pushResultProcessingMock); URIish uri1 = new URIish("git://host1/someRepo.git"); - // expected events - pushResultProcessingMock.onRefReplicatedToOneNode( - "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - pushResultProcessingMock.onRefReplicatedToOneNode( - "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); - pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 1); - pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 1); - pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2); - replay(pushResultProcessingMock); - // actual test replicationState.increasePushTaskCount("someProject", "ref1"); replicationState.increasePushTaskCount("someProject", "ref2"); @@ -204,7 +188,17 @@ public class ReplicationStateTest { replicationState.notifyRefReplicated( "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); replicationState.markAllPushTasksScheduled(); - verify(pushResultProcessingMock); + + // expected events + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); + verify(pushResultProcessingMock) + .onRefReplicatedToOneNode( + "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); + verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 1); + verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 1); + verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2); } @Test |