diff options
11 files changed, 221 insertions, 13 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java index 5d6e409..e73e049 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java @@ -14,11 +14,13 @@ package com.googlesource.gerrit.plugins.replication; import com.google.gerrit.common.FileUtil; +import com.google.gerrit.extensions.annotations.PluginData; import com.google.gerrit.server.config.SitePaths; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; import com.google.inject.Singleton; import java.io.IOException; +import java.nio.file.Path; import java.util.List; import org.eclipse.jgit.errors.ConfigInvalidException; import org.slf4j.Logger; @@ -33,13 +35,18 @@ public class AutoReloadConfigDecorator implements ReplicationConfig { private final SitePaths site; private final WorkQueue workQueue; private final DestinationFactory destinationFactory; + private final Path pluginDataDir; @Inject public AutoReloadConfigDecorator( - SitePaths site, WorkQueue workQueue, DestinationFactory destinationFactory) + SitePaths site, + WorkQueue workQueue, + DestinationFactory destinationFactory, + @PluginData Path pluginDataDir) throws ConfigInvalidException, IOException { this.site = site; this.destinationFactory = destinationFactory; + this.pluginDataDir = pluginDataDir; this.currentConfig = loadConfig(); this.currentConfigTs = getLastModified(currentConfig); this.workQueue = workQueue; @@ -50,7 +57,7 @@ public class AutoReloadConfigDecorator implements ReplicationConfig { } private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException { - return new ReplicationFileBasedConfig(site, destinationFactory); + return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir); } private synchronized boolean isAutoReload() { @@ -102,6 +109,11 @@ public class AutoReloadConfigDecorator implements ReplicationConfig { } @Override + public Path getEventsDirectory() { + return currentConfig.getEventsDirectory(); + } + + @Override public synchronized int shutdown() { return currentConfig.shutdown(); } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java new file mode 100644 index 0000000..dc2e6e5 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java @@ -0,0 +1,110 @@ +// Copyright (C) 2018 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.hash.Hashing; +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.ProvisionException; +import com.google.inject.Singleton; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import org.eclipse.jgit.lib.ObjectId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Singleton +public class EventsStorage { + private static final Logger log = LoggerFactory.getLogger(EventsStorage.class); + + public static class ReplicateRefUpdate { + public String project; + public String ref; + } + + private static Gson GSON = new Gson(); + + private final Path refUpdates; + + @Inject + EventsStorage(ReplicationConfig config) { + refUpdates = config.getEventsDirectory().resolve("ref-updates"); + } + + public String persist(String project, String ref) { + ReplicateRefUpdate r = new ReplicateRefUpdate(); + r.project = project; + r.ref = ref; + + String json = GSON.toJson(r); + String eventKey = sha1(json).name(); + Path file = refUpdates().resolve(eventKey); + + if (Files.exists(file)) { + return eventKey; + } + + try { + Files.write(file, json.getBytes(UTF_8)); + } catch (IOException e) { + log.warn("Couldn't persist event {}", json); + } + return eventKey; + } + + public void delete(String eventKey) { + if (eventKey != null) { + try { + Files.delete(refUpdates().resolve(eventKey)); + } catch (IOException e) { + log.error("Error while deleting event {}", eventKey); + } + } + } + + public List<ReplicateRefUpdate> list() { + ArrayList<ReplicateRefUpdate> result = new ArrayList<>(); + try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) { + for (Path e : events) { + if (Files.isRegularFile(e)) { + String json = new String(Files.readAllBytes(e), UTF_8); + result.add(GSON.fromJson(json, ReplicateRefUpdate.class)); + Files.delete(e); + } + } + } catch (IOException e) { + log.error("Error when firing pending events", e); + } + return result; + } + + private ObjectId sha1(String s) { + return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes()); + } + + private Path refUpdates() { + try { + return Files.createDirectories(refUpdates); + } catch (IOException e) { + throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e); + } + } +} diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java index 227804d..2d60d86 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java @@ -21,6 +21,7 @@ import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.systemstatus.ServerInformation; import com.google.inject.Inject; import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing; +import com.googlesource.gerrit.plugins.replication.ReplicationState.Factory; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -32,6 +33,7 @@ public class OnStartStop implements LifecycleListener { private final ReplicationQueue queue; private final ReplicationConfig config; private final DynamicItem<EventDispatcher> eventDispatcher; + private final Factory replicationStateFactory; @Inject protected OnStartStop( @@ -39,12 +41,14 @@ public class OnStartStop implements LifecycleListener { PushAll.Factory pushAll, ReplicationQueue queue, ReplicationConfig config, - DynamicItem<EventDispatcher> eventDispatcher) { + DynamicItem<EventDispatcher> eventDispatcher, + ReplicationState.Factory replicationStateFactory) { this.srvInfo = srvInfo; this.pushAll = pushAll; this.queue = queue; this.config = config; this.eventDispatcher = eventDispatcher; + this.replicationStateFactory = replicationStateFactory; this.pushAllFuture = Atomics.newReference(); } @@ -54,7 +58,8 @@ public class OnStartStop implements LifecycleListener { if (srvInfo.getState() == ServerInformation.State.STARTUP && config.isReplicateAllOnPluginStart()) { - ReplicationState state = new ReplicationState(new GitUpdateProcessing(eventDispatcher.get())); + ReplicationState state = + replicationStateFactory.create(new GitUpdateProcessing(eventDispatcher.get())); pushAllFuture.set( pushAll .create(null, ReplicationFilter.all(), state, false) diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java index 869a49b..c9531e3 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java @@ -14,6 +14,7 @@ package com.googlesource.gerrit.plugins.replication; import com.google.gerrit.server.git.WorkQueue; +import java.nio.file.Path; import java.util.List; public interface ReplicationConfig { @@ -32,6 +33,8 @@ public interface ReplicationConfig { boolean isEmpty(); + Path getEventsDirectory(); + int shutdown(); void startup(WorkQueue workQueue); diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java index db9f35d..1bba96d 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java @@ -17,8 +17,10 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.gerrit.extensions.annotations.PluginData; import com.google.gerrit.server.config.ConfigUtil; import com.google.gerrit.server.config.SitePaths; import com.google.gerrit.server.git.WorkQueue; @@ -47,19 +49,24 @@ public class ReplicationFileBasedConfig implements ReplicationConfig { private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes private List<Destination> destinations; + private final SitePaths site; private Path cfgPath; private boolean replicateAllOnPluginStart; private boolean defaultForceUpdate; private int sshCommandTimeout; private int sshConnectionTimeout = DEFAULT_SSH_CONNECTION_TIMEOUT_MS; private final FileBasedConfig config; + private final Path pluginDataDir; @Inject - public ReplicationFileBasedConfig(SitePaths site, DestinationFactory destinationFactory) + public ReplicationFileBasedConfig( + SitePaths site, DestinationFactory destinationFactory, @PluginData Path pluginDataDir) throws ConfigInvalidException, IOException { + this.site = site; this.cfgPath = site.etc_dir.resolve("replication.config"); this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED); this.destinations = allDestinations(destinationFactory); + this.pluginDataDir = pluginDataDir; } /* @@ -199,6 +206,15 @@ public class ReplicationFileBasedConfig implements ReplicationConfig { return destinations.isEmpty(); } + @Override + public Path getEventsDirectory() { + String eventsDirectory = config.getString("replication", null, "eventsDirectory"); + if (!Strings.isNullOrEmpty(eventsDirectory)) { + return site.resolve(eventsDirectory); + } + return pluginDataDir; + } + Path getCfgPath() { return cfgPath; } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java index f30e13d..5e7c978 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java @@ -56,6 +56,7 @@ class ReplicationModule extends AbstractModule { install(new FactoryModuleBuilder().build(PushAll.Factory.class)); install(new FactoryModuleBuilder().build(RemoteSiteUser.Factory.class)); + install(new FactoryModuleBuilder().build(ReplicationState.Factory.class)); bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class); bind(ReplicationStateListener.class).to(ReplicationStateLogger.class); diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 4c7bdfc..c79363b 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java @@ -74,6 +74,8 @@ public class ReplicationQueue private final DynamicItem<EventDispatcher> dispatcher; private final ReplicationConfig config; private final GerritSshApi gerritAdmin; + private final ReplicationState.Factory replicationStateFactory; + private final EventsStorage eventsStorage; private volatile boolean running; private final Queue<GitReferenceUpdatedListener.Event> beforeStartupEventsQueue; @@ -84,13 +86,17 @@ public class ReplicationQueue GerritSshApi ga, ReplicationConfig rc, DynamicItem<EventDispatcher> dis, - ReplicationStateListener sl) { + ReplicationStateListener sl, + ReplicationState.Factory rsf, + EventsStorage es) { workQueue = wq; sshHelper = sh; dispatcher = dis; config = rc; stateLog = sl; gerritAdmin = ga; + replicationStateFactory = rsf; + eventsStorage = es; beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue(); } @@ -98,6 +104,7 @@ public class ReplicationQueue public void start() { config.startup(workQueue); running = true; + firePendingEvents(); fireBeforeStartupEvents(); } @@ -132,7 +139,8 @@ public class ReplicationQueue @Override public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) { - ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); + ReplicationState state = + replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get())); if (!running) { stateLog.warn( "Replication plugin did not finish startup before event, event replication is postponed", @@ -140,18 +148,33 @@ public class ReplicationQueue beforeStartupEventsQueue.add(event); return; } + onGitReferenceUpdated(event.getProjectName(), event.getRefName()); + } - Project.NameKey project = new Project.NameKey(event.getProjectName()); + private void onGitReferenceUpdated(String projectName, String refName) { + ReplicationState state = + replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get())); + + Project.NameKey project = new Project.NameKey(projectName); for (Destination cfg : config.getDestinations(FilterType.ALL)) { - if (cfg.wouldPushProject(project) && cfg.wouldPushRef(event.getRefName())) { + if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) { + String eventKey = eventsStorage.persist(projectName, refName); + state.setEventKey(eventKey); for (URIish uri : cfg.getURIs(project, null)) { - cfg.schedule(project, event.getRefName(), uri, state); + cfg.schedule(project, refName, uri, state); } } } state.markAllPushTasksScheduled(); } + private void firePendingEvents() { + for (EventsStorage.ReplicateRefUpdate e : eventsStorage.list()) { + repLog.info("Firing pending event {}", e); + onGitReferenceUpdated(e.project, e.ref); + } + } + @Override public void onNewProjectCreated(NewProjectCreatedListener.Event event) { Project.NameKey projectName = new Project.NameKey(event.getProjectName()); diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java index 86557e2..6f0803a 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java @@ -16,6 +16,8 @@ package com.googlesource.gerrit.plugins.replication; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -23,7 +25,13 @@ import org.eclipse.jgit.transport.RemoteRefUpdate; import org.eclipse.jgit.transport.URIish; public class ReplicationState { + + public interface Factory { + ReplicationState create(PushResultProcessing processing); + } + private boolean allScheduled; + private final EventsStorage eventsStorage; private final PushResultProcessing pushResultProcessing; private final Lock countingLock = new ReentrantLock(); @@ -49,7 +57,11 @@ public class ReplicationState { private int totalPushTasksCount; private int finishedPushTasksCount; - public ReplicationState(PushResultProcessing processing) { + private String eventKey; + + @AssistedInject + ReplicationState(EventsStorage storage, @Assisted PushResultProcessing processing) { + eventsStorage = storage; pushResultProcessing = processing; statusByProjectRef = HashBasedTable.create(); } @@ -74,6 +86,7 @@ public class ReplicationState { URIish uri, RefPushResult status, RemoteRefUpdate.Status refUpdateStatus) { + deleteEvent(); pushResultProcessing.onRefReplicatedToOneNode(project, ref, uri, status, refUpdateStatus); RefReplicationStatus completedRefStatus = null; @@ -103,6 +116,12 @@ public class ReplicationState { } } + private void deleteEvent() { + if (eventKey != null) { + eventsStorage.delete(eventKey); + } + } + public void markAllPushTasksScheduled() { countingLock.lock(); try { @@ -173,4 +192,8 @@ public class ReplicationState { return name().toLowerCase().replace("_", "-"); } } + + public void setEventKey(String eventKey) { + this.eventKey = eventKey; + } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java index 7115d5b..2dbc7b7 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java @@ -52,6 +52,7 @@ final class StartCommand extends SshCommand { @Inject private PushAll.Factory pushFactory; private final Object lock = new Object(); + @Inject private ReplicationState.Factory replicationStateFactory; @Override protected void run() throws Failure { @@ -59,7 +60,7 @@ final class StartCommand extends SshCommand { throw new UnloggedFailure(1, "error: cannot combine --all and PROJECT"); } - ReplicationState state = new ReplicationState(new CommandProcessing(this)); + ReplicationState state = replicationStateFactory.create(new CommandProcessing(this)); Future<?> future = null; ReplicationFilter projectFilter; diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index e70094c..81a0613 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md @@ -124,6 +124,17 @@ replication.maxRetries By default, pushes are retried indefinitely. +replication.eventsDirectory +: Directory where replication events are persisted + + When scheduling a replication, the replication event is persisted + under this directory. When the replication is done, the event is deleted. + If plugin is stopped before all scheduled replications are done, the + persisted events will not be deleted. When the plugin is started again, + it will trigger all replications found under this directory. + + When not set, defaults to the plugin's data directory. + remote.NAME.url : Address of the remote server to push to. Multiple URLs may be specified within a single remote block, listing different diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java index 193af1e..cf6715e 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java @@ -32,12 +32,15 @@ public class ReplicationStateTest { private ReplicationState replicationState; private PushResultProcessing pushResultProcessingMock; + private EventsStorage eventsStorage; @Before public void setUp() throws Exception { pushResultProcessingMock = createNiceMock(PushResultProcessing.class); replay(pushResultProcessingMock); - replicationState = new ReplicationState(pushResultProcessingMock); + eventsStorage = createNiceMock(EventsStorage.class); + replay(eventsStorage); + replicationState = new ReplicationState(eventsStorage, pushResultProcessingMock); } @Test |