summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java16
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java110
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java9
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java3
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java18
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java1
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java33
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java25
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java3
-rw-r--r--src/main/resources/Documentation/config.md11
-rw-r--r--src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java5
11 files changed, 221 insertions, 13 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 5d6e409..e73e049 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -14,11 +14,13 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.gerrit.common.FileUtil;
+import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.List;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.slf4j.Logger;
@@ -33,13 +35,18 @@ public class AutoReloadConfigDecorator implements ReplicationConfig {
private final SitePaths site;
private final WorkQueue workQueue;
private final DestinationFactory destinationFactory;
+ private final Path pluginDataDir;
@Inject
public AutoReloadConfigDecorator(
- SitePaths site, WorkQueue workQueue, DestinationFactory destinationFactory)
+ SitePaths site,
+ WorkQueue workQueue,
+ DestinationFactory destinationFactory,
+ @PluginData Path pluginDataDir)
throws ConfigInvalidException, IOException {
this.site = site;
this.destinationFactory = destinationFactory;
+ this.pluginDataDir = pluginDataDir;
this.currentConfig = loadConfig();
this.currentConfigTs = getLastModified(currentConfig);
this.workQueue = workQueue;
@@ -50,7 +57,7 @@ public class AutoReloadConfigDecorator implements ReplicationConfig {
}
private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException {
- return new ReplicationFileBasedConfig(site, destinationFactory);
+ return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir);
}
private synchronized boolean isAutoReload() {
@@ -102,6 +109,11 @@ public class AutoReloadConfigDecorator implements ReplicationConfig {
}
@Override
+ public Path getEventsDirectory() {
+ return currentConfig.getEventsDirectory();
+ }
+
+ @Override
public synchronized int shutdown() {
return currentConfig.shutdown();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
new file mode 100644
index 0000000..dc2e6e5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
@@ -0,0 +1,110 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.hash.Hashing;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.eclipse.jgit.lib.ObjectId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class EventsStorage {
+ private static final Logger log = LoggerFactory.getLogger(EventsStorage.class);
+
+ public static class ReplicateRefUpdate {
+ public String project;
+ public String ref;
+ }
+
+ private static Gson GSON = new Gson();
+
+ private final Path refUpdates;
+
+ @Inject
+ EventsStorage(ReplicationConfig config) {
+ refUpdates = config.getEventsDirectory().resolve("ref-updates");
+ }
+
+ public String persist(String project, String ref) {
+ ReplicateRefUpdate r = new ReplicateRefUpdate();
+ r.project = project;
+ r.ref = ref;
+
+ String json = GSON.toJson(r);
+ String eventKey = sha1(json).name();
+ Path file = refUpdates().resolve(eventKey);
+
+ if (Files.exists(file)) {
+ return eventKey;
+ }
+
+ try {
+ Files.write(file, json.getBytes(UTF_8));
+ } catch (IOException e) {
+ log.warn("Couldn't persist event {}", json);
+ }
+ return eventKey;
+ }
+
+ public void delete(String eventKey) {
+ if (eventKey != null) {
+ try {
+ Files.delete(refUpdates().resolve(eventKey));
+ } catch (IOException e) {
+ log.error("Error while deleting event {}", eventKey);
+ }
+ }
+ }
+
+ public List<ReplicateRefUpdate> list() {
+ ArrayList<ReplicateRefUpdate> result = new ArrayList<>();
+ try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
+ for (Path e : events) {
+ if (Files.isRegularFile(e)) {
+ String json = new String(Files.readAllBytes(e), UTF_8);
+ result.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+ Files.delete(e);
+ }
+ }
+ } catch (IOException e) {
+ log.error("Error when firing pending events", e);
+ }
+ return result;
+ }
+
+ private ObjectId sha1(String s) {
+ return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
+ }
+
+ private Path refUpdates() {
+ try {
+ return Files.createDirectories(refUpdates);
+ } catch (IOException e) {
+ throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
index 227804d..2d60d86 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
@@ -21,6 +21,7 @@ import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.systemstatus.ServerInformation;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
+import com.googlesource.gerrit.plugins.replication.ReplicationState.Factory;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -32,6 +33,7 @@ public class OnStartStop implements LifecycleListener {
private final ReplicationQueue queue;
private final ReplicationConfig config;
private final DynamicItem<EventDispatcher> eventDispatcher;
+ private final Factory replicationStateFactory;
@Inject
protected OnStartStop(
@@ -39,12 +41,14 @@ public class OnStartStop implements LifecycleListener {
PushAll.Factory pushAll,
ReplicationQueue queue,
ReplicationConfig config,
- DynamicItem<EventDispatcher> eventDispatcher) {
+ DynamicItem<EventDispatcher> eventDispatcher,
+ ReplicationState.Factory replicationStateFactory) {
this.srvInfo = srvInfo;
this.pushAll = pushAll;
this.queue = queue;
this.config = config;
this.eventDispatcher = eventDispatcher;
+ this.replicationStateFactory = replicationStateFactory;
this.pushAllFuture = Atomics.newReference();
}
@@ -54,7 +58,8 @@ public class OnStartStop implements LifecycleListener {
if (srvInfo.getState() == ServerInformation.State.STARTUP
&& config.isReplicateAllOnPluginStart()) {
- ReplicationState state = new ReplicationState(new GitUpdateProcessing(eventDispatcher.get()));
+ ReplicationState state =
+ replicationStateFactory.create(new GitUpdateProcessing(eventDispatcher.get()));
pushAllFuture.set(
pushAll
.create(null, ReplicationFilter.all(), state, false)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 869a49b..c9531e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.gerrit.server.git.WorkQueue;
+import java.nio.file.Path;
import java.util.List;
public interface ReplicationConfig {
@@ -32,6 +33,8 @@ public interface ReplicationConfig {
boolean isEmpty();
+ Path getEventsDirectory();
+
int shutdown();
void startup(WorkQueue workQueue);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index db9f35d..1bba96d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -17,8 +17,10 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.git.WorkQueue;
@@ -47,19 +49,24 @@ public class ReplicationFileBasedConfig implements ReplicationConfig {
private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
private List<Destination> destinations;
+ private final SitePaths site;
private Path cfgPath;
private boolean replicateAllOnPluginStart;
private boolean defaultForceUpdate;
private int sshCommandTimeout;
private int sshConnectionTimeout = DEFAULT_SSH_CONNECTION_TIMEOUT_MS;
private final FileBasedConfig config;
+ private final Path pluginDataDir;
@Inject
- public ReplicationFileBasedConfig(SitePaths site, DestinationFactory destinationFactory)
+ public ReplicationFileBasedConfig(
+ SitePaths site, DestinationFactory destinationFactory, @PluginData Path pluginDataDir)
throws ConfigInvalidException, IOException {
+ this.site = site;
this.cfgPath = site.etc_dir.resolve("replication.config");
this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED);
this.destinations = allDestinations(destinationFactory);
+ this.pluginDataDir = pluginDataDir;
}
/*
@@ -199,6 +206,15 @@ public class ReplicationFileBasedConfig implements ReplicationConfig {
return destinations.isEmpty();
}
+ @Override
+ public Path getEventsDirectory() {
+ String eventsDirectory = config.getString("replication", null, "eventsDirectory");
+ if (!Strings.isNullOrEmpty(eventsDirectory)) {
+ return site.resolve(eventsDirectory);
+ }
+ return pluginDataDir;
+ }
+
Path getCfgPath() {
return cfgPath;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index f30e13d..5e7c978 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -56,6 +56,7 @@ class ReplicationModule extends AbstractModule {
install(new FactoryModuleBuilder().build(PushAll.Factory.class));
install(new FactoryModuleBuilder().build(RemoteSiteUser.Factory.class));
+ install(new FactoryModuleBuilder().build(ReplicationState.Factory.class));
bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
bind(ReplicationStateListener.class).to(ReplicationStateLogger.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 4c7bdfc..c79363b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -74,6 +74,8 @@ public class ReplicationQueue
private final DynamicItem<EventDispatcher> dispatcher;
private final ReplicationConfig config;
private final GerritSshApi gerritAdmin;
+ private final ReplicationState.Factory replicationStateFactory;
+ private final EventsStorage eventsStorage;
private volatile boolean running;
private final Queue<GitReferenceUpdatedListener.Event> beforeStartupEventsQueue;
@@ -84,13 +86,17 @@ public class ReplicationQueue
GerritSshApi ga,
ReplicationConfig rc,
DynamicItem<EventDispatcher> dis,
- ReplicationStateListener sl) {
+ ReplicationStateListener sl,
+ ReplicationState.Factory rsf,
+ EventsStorage es) {
workQueue = wq;
sshHelper = sh;
dispatcher = dis;
config = rc;
stateLog = sl;
gerritAdmin = ga;
+ replicationStateFactory = rsf;
+ eventsStorage = es;
beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
}
@@ -98,6 +104,7 @@ public class ReplicationQueue
public void start() {
config.startup(workQueue);
running = true;
+ firePendingEvents();
fireBeforeStartupEvents();
}
@@ -132,7 +139,8 @@ public class ReplicationQueue
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
- ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+ ReplicationState state =
+ replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
if (!running) {
stateLog.warn(
"Replication plugin did not finish startup before event, event replication is postponed",
@@ -140,18 +148,33 @@ public class ReplicationQueue
beforeStartupEventsQueue.add(event);
return;
}
+ onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+ }
- Project.NameKey project = new Project.NameKey(event.getProjectName());
+ private void onGitReferenceUpdated(String projectName, String refName) {
+ ReplicationState state =
+ replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
+
+ Project.NameKey project = new Project.NameKey(projectName);
for (Destination cfg : config.getDestinations(FilterType.ALL)) {
- if (cfg.wouldPushProject(project) && cfg.wouldPushRef(event.getRefName())) {
+ if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
+ String eventKey = eventsStorage.persist(projectName, refName);
+ state.setEventKey(eventKey);
for (URIish uri : cfg.getURIs(project, null)) {
- cfg.schedule(project, event.getRefName(), uri, state);
+ cfg.schedule(project, refName, uri, state);
}
}
}
state.markAllPushTasksScheduled();
}
+ private void firePendingEvents() {
+ for (EventsStorage.ReplicateRefUpdate e : eventsStorage.list()) {
+ repLog.info("Firing pending event {}", e);
+ onGitReferenceUpdated(e.project, e.ref);
+ }
+ }
+
@Override
public void onNewProjectCreated(NewProjectCreatedListener.Event event) {
Project.NameKey projectName = new Project.NameKey(event.getProjectName());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 86557e2..6f0803a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -16,6 +16,8 @@ package com.googlesource.gerrit.plugins.replication;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -23,7 +25,13 @@ import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.URIish;
public class ReplicationState {
+
+ public interface Factory {
+ ReplicationState create(PushResultProcessing processing);
+ }
+
private boolean allScheduled;
+ private final EventsStorage eventsStorage;
private final PushResultProcessing pushResultProcessing;
private final Lock countingLock = new ReentrantLock();
@@ -49,7 +57,11 @@ public class ReplicationState {
private int totalPushTasksCount;
private int finishedPushTasksCount;
- public ReplicationState(PushResultProcessing processing) {
+ private String eventKey;
+
+ @AssistedInject
+ ReplicationState(EventsStorage storage, @Assisted PushResultProcessing processing) {
+ eventsStorage = storage;
pushResultProcessing = processing;
statusByProjectRef = HashBasedTable.create();
}
@@ -74,6 +86,7 @@ public class ReplicationState {
URIish uri,
RefPushResult status,
RemoteRefUpdate.Status refUpdateStatus) {
+ deleteEvent();
pushResultProcessing.onRefReplicatedToOneNode(project, ref, uri, status, refUpdateStatus);
RefReplicationStatus completedRefStatus = null;
@@ -103,6 +116,12 @@ public class ReplicationState {
}
}
+ private void deleteEvent() {
+ if (eventKey != null) {
+ eventsStorage.delete(eventKey);
+ }
+ }
+
public void markAllPushTasksScheduled() {
countingLock.lock();
try {
@@ -173,4 +192,8 @@ public class ReplicationState {
return name().toLowerCase().replace("_", "-");
}
}
+
+ public void setEventKey(String eventKey) {
+ this.eventKey = eventKey;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
index 7115d5b..2dbc7b7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -52,6 +52,7 @@ final class StartCommand extends SshCommand {
@Inject private PushAll.Factory pushFactory;
private final Object lock = new Object();
+ @Inject private ReplicationState.Factory replicationStateFactory;
@Override
protected void run() throws Failure {
@@ -59,7 +60,7 @@ final class StartCommand extends SshCommand {
throw new UnloggedFailure(1, "error: cannot combine --all and PROJECT");
}
- ReplicationState state = new ReplicationState(new CommandProcessing(this));
+ ReplicationState state = replicationStateFactory.create(new CommandProcessing(this));
Future<?> future = null;
ReplicationFilter projectFilter;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index e70094c..81a0613 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -124,6 +124,17 @@ replication.maxRetries
By default, pushes are retried indefinitely.
+replication.eventsDirectory
+: Directory where replication events are persisted
+
+ When scheduling a replication, the replication event is persisted
+ under this directory. When the replication is done, the event is deleted.
+ If plugin is stopped before all scheduled replications are done, the
+ persisted events will not be deleted. When the plugin is started again,
+ it will trigger all replications found under this directory.
+
+ When not set, defaults to the plugin's data directory.
+
remote.NAME.url
: Address of the remote server to push to. Multiple URLs may be
specified within a single remote block, listing different
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index 193af1e..cf6715e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -32,12 +32,15 @@ public class ReplicationStateTest {
private ReplicationState replicationState;
private PushResultProcessing pushResultProcessingMock;
+ private EventsStorage eventsStorage;
@Before
public void setUp() throws Exception {
pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
replay(pushResultProcessingMock);
- replicationState = new ReplicationState(pushResultProcessingMock);
+ eventsStorage = createNiceMock(EventsStorage.class);
+ replay(eventsStorage);
+ replicationState = new ReplicationState(eventsStorage, pushResultProcessingMock);
}
@Test