summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSaša Živkov <sasa.zivkov@sap.com>2018-04-18 12:20:15 +0200
committerNguyen Tuan Khang Phan <nguyen.tuan.khang.phan@ericsson.com>2020-02-14 14:16:21 -0500
commit8b5547da463789631721a23919d0270c95ca8f50 (patch)
treec155cc5477a72698a5a411c6a3b2b8f99937f442
parent3248582915e897c0d568160465a2ca905f38df3e (diff)
Don't lose ref-updated events on plugin restart
When a ref-updated event is received, persist the event in the directory defined by the replication.eventsDirectory. When the updated ref is replicated deleted the persisted event. If replication queue is non-empty and plugin gets stopped, ref updates will not be replicated and, therefore, the persisted events will not get deleted. When the plugin starts it will schedule replication for all persisted events and delete them. This change provides two benefits: * no ref-updated events are lost on plugin restart * eliminate need for the replicateOnStartup=true setting which schedules replication of all refs for all projects and typically creates a humongous replication queue on every plugin restart. Change-Id: Ieacd084fabe703333241ffda11c8b6c78cced37a (cherry picked from commit bdaea910694dd5a3474dbc051b298aaee9d77950)
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java16
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java110
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java9
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java3
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java18
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java1
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java33
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java25
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java3
-rw-r--r--src/main/resources/Documentation/config.md11
-rw-r--r--src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java5
11 files changed, 221 insertions, 13 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 5d6e409..e73e049 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -14,11 +14,13 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.gerrit.common.FileUtil;
+import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.List;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.slf4j.Logger;
@@ -33,13 +35,18 @@ public class AutoReloadConfigDecorator implements ReplicationConfig {
private final SitePaths site;
private final WorkQueue workQueue;
private final DestinationFactory destinationFactory;
+ private final Path pluginDataDir;
@Inject
public AutoReloadConfigDecorator(
- SitePaths site, WorkQueue workQueue, DestinationFactory destinationFactory)
+ SitePaths site,
+ WorkQueue workQueue,
+ DestinationFactory destinationFactory,
+ @PluginData Path pluginDataDir)
throws ConfigInvalidException, IOException {
this.site = site;
this.destinationFactory = destinationFactory;
+ this.pluginDataDir = pluginDataDir;
this.currentConfig = loadConfig();
this.currentConfigTs = getLastModified(currentConfig);
this.workQueue = workQueue;
@@ -50,7 +57,7 @@ public class AutoReloadConfigDecorator implements ReplicationConfig {
}
private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException {
- return new ReplicationFileBasedConfig(site, destinationFactory);
+ return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir);
}
private synchronized boolean isAutoReload() {
@@ -102,6 +109,11 @@ public class AutoReloadConfigDecorator implements ReplicationConfig {
}
@Override
+ public Path getEventsDirectory() {
+ return currentConfig.getEventsDirectory();
+ }
+
+ @Override
public synchronized int shutdown() {
return currentConfig.shutdown();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
new file mode 100644
index 0000000..dc2e6e5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
@@ -0,0 +1,110 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.hash.Hashing;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.eclipse.jgit.lib.ObjectId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class EventsStorage {
+ private static final Logger log = LoggerFactory.getLogger(EventsStorage.class);
+
+ public static class ReplicateRefUpdate {
+ public String project;
+ public String ref;
+ }
+
+ private static Gson GSON = new Gson();
+
+ private final Path refUpdates;
+
+ @Inject
+ EventsStorage(ReplicationConfig config) {
+ refUpdates = config.getEventsDirectory().resolve("ref-updates");
+ }
+
+ public String persist(String project, String ref) {
+ ReplicateRefUpdate r = new ReplicateRefUpdate();
+ r.project = project;
+ r.ref = ref;
+
+ String json = GSON.toJson(r);
+ String eventKey = sha1(json).name();
+ Path file = refUpdates().resolve(eventKey);
+
+ if (Files.exists(file)) {
+ return eventKey;
+ }
+
+ try {
+ Files.write(file, json.getBytes(UTF_8));
+ } catch (IOException e) {
+ log.warn("Couldn't persist event {}", json);
+ }
+ return eventKey;
+ }
+
+ public void delete(String eventKey) {
+ if (eventKey != null) {
+ try {
+ Files.delete(refUpdates().resolve(eventKey));
+ } catch (IOException e) {
+ log.error("Error while deleting event {}", eventKey);
+ }
+ }
+ }
+
+ public List<ReplicateRefUpdate> list() {
+ ArrayList<ReplicateRefUpdate> result = new ArrayList<>();
+ try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
+ for (Path e : events) {
+ if (Files.isRegularFile(e)) {
+ String json = new String(Files.readAllBytes(e), UTF_8);
+ result.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+ Files.delete(e);
+ }
+ }
+ } catch (IOException e) {
+ log.error("Error when firing pending events", e);
+ }
+ return result;
+ }
+
+ private ObjectId sha1(String s) {
+ return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
+ }
+
+ private Path refUpdates() {
+ try {
+ return Files.createDirectories(refUpdates);
+ } catch (IOException e) {
+ throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
index 227804d..2d60d86 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
@@ -21,6 +21,7 @@ import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.systemstatus.ServerInformation;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
+import com.googlesource.gerrit.plugins.replication.ReplicationState.Factory;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -32,6 +33,7 @@ public class OnStartStop implements LifecycleListener {
private final ReplicationQueue queue;
private final ReplicationConfig config;
private final DynamicItem<EventDispatcher> eventDispatcher;
+ private final Factory replicationStateFactory;
@Inject
protected OnStartStop(
@@ -39,12 +41,14 @@ public class OnStartStop implements LifecycleListener {
PushAll.Factory pushAll,
ReplicationQueue queue,
ReplicationConfig config,
- DynamicItem<EventDispatcher> eventDispatcher) {
+ DynamicItem<EventDispatcher> eventDispatcher,
+ ReplicationState.Factory replicationStateFactory) {
this.srvInfo = srvInfo;
this.pushAll = pushAll;
this.queue = queue;
this.config = config;
this.eventDispatcher = eventDispatcher;
+ this.replicationStateFactory = replicationStateFactory;
this.pushAllFuture = Atomics.newReference();
}
@@ -54,7 +58,8 @@ public class OnStartStop implements LifecycleListener {
if (srvInfo.getState() == ServerInformation.State.STARTUP
&& config.isReplicateAllOnPluginStart()) {
- ReplicationState state = new ReplicationState(new GitUpdateProcessing(eventDispatcher.get()));
+ ReplicationState state =
+ replicationStateFactory.create(new GitUpdateProcessing(eventDispatcher.get()));
pushAllFuture.set(
pushAll
.create(null, ReplicationFilter.all(), state, false)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 869a49b..c9531e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.gerrit.server.git.WorkQueue;
+import java.nio.file.Path;
import java.util.List;
public interface ReplicationConfig {
@@ -32,6 +33,8 @@ public interface ReplicationConfig {
boolean isEmpty();
+ Path getEventsDirectory();
+
int shutdown();
void startup(WorkQueue workQueue);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index db9f35d..1bba96d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -17,8 +17,10 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.git.WorkQueue;
@@ -47,19 +49,24 @@ public class ReplicationFileBasedConfig implements ReplicationConfig {
private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
private List<Destination> destinations;
+ private final SitePaths site;
private Path cfgPath;
private boolean replicateAllOnPluginStart;
private boolean defaultForceUpdate;
private int sshCommandTimeout;
private int sshConnectionTimeout = DEFAULT_SSH_CONNECTION_TIMEOUT_MS;
private final FileBasedConfig config;
+ private final Path pluginDataDir;
@Inject
- public ReplicationFileBasedConfig(SitePaths site, DestinationFactory destinationFactory)
+ public ReplicationFileBasedConfig(
+ SitePaths site, DestinationFactory destinationFactory, @PluginData Path pluginDataDir)
throws ConfigInvalidException, IOException {
+ this.site = site;
this.cfgPath = site.etc_dir.resolve("replication.config");
this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED);
this.destinations = allDestinations(destinationFactory);
+ this.pluginDataDir = pluginDataDir;
}
/*
@@ -199,6 +206,15 @@ public class ReplicationFileBasedConfig implements ReplicationConfig {
return destinations.isEmpty();
}
+ @Override
+ public Path getEventsDirectory() {
+ String eventsDirectory = config.getString("replication", null, "eventsDirectory");
+ if (!Strings.isNullOrEmpty(eventsDirectory)) {
+ return site.resolve(eventsDirectory);
+ }
+ return pluginDataDir;
+ }
+
Path getCfgPath() {
return cfgPath;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index f30e13d..5e7c978 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -56,6 +56,7 @@ class ReplicationModule extends AbstractModule {
install(new FactoryModuleBuilder().build(PushAll.Factory.class));
install(new FactoryModuleBuilder().build(RemoteSiteUser.Factory.class));
+ install(new FactoryModuleBuilder().build(ReplicationState.Factory.class));
bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
bind(ReplicationStateListener.class).to(ReplicationStateLogger.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 4c7bdfc..c79363b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -74,6 +74,8 @@ public class ReplicationQueue
private final DynamicItem<EventDispatcher> dispatcher;
private final ReplicationConfig config;
private final GerritSshApi gerritAdmin;
+ private final ReplicationState.Factory replicationStateFactory;
+ private final EventsStorage eventsStorage;
private volatile boolean running;
private final Queue<GitReferenceUpdatedListener.Event> beforeStartupEventsQueue;
@@ -84,13 +86,17 @@ public class ReplicationQueue
GerritSshApi ga,
ReplicationConfig rc,
DynamicItem<EventDispatcher> dis,
- ReplicationStateListener sl) {
+ ReplicationStateListener sl,
+ ReplicationState.Factory rsf,
+ EventsStorage es) {
workQueue = wq;
sshHelper = sh;
dispatcher = dis;
config = rc;
stateLog = sl;
gerritAdmin = ga;
+ replicationStateFactory = rsf;
+ eventsStorage = es;
beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
}
@@ -98,6 +104,7 @@ public class ReplicationQueue
public void start() {
config.startup(workQueue);
running = true;
+ firePendingEvents();
fireBeforeStartupEvents();
}
@@ -132,7 +139,8 @@ public class ReplicationQueue
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
- ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+ ReplicationState state =
+ replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
if (!running) {
stateLog.warn(
"Replication plugin did not finish startup before event, event replication is postponed",
@@ -140,18 +148,33 @@ public class ReplicationQueue
beforeStartupEventsQueue.add(event);
return;
}
+ onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+ }
- Project.NameKey project = new Project.NameKey(event.getProjectName());
+ private void onGitReferenceUpdated(String projectName, String refName) {
+ ReplicationState state =
+ replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
+
+ Project.NameKey project = new Project.NameKey(projectName);
for (Destination cfg : config.getDestinations(FilterType.ALL)) {
- if (cfg.wouldPushProject(project) && cfg.wouldPushRef(event.getRefName())) {
+ if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
+ String eventKey = eventsStorage.persist(projectName, refName);
+ state.setEventKey(eventKey);
for (URIish uri : cfg.getURIs(project, null)) {
- cfg.schedule(project, event.getRefName(), uri, state);
+ cfg.schedule(project, refName, uri, state);
}
}
}
state.markAllPushTasksScheduled();
}
+ private void firePendingEvents() {
+ for (EventsStorage.ReplicateRefUpdate e : eventsStorage.list()) {
+ repLog.info("Firing pending event {}", e);
+ onGitReferenceUpdated(e.project, e.ref);
+ }
+ }
+
@Override
public void onNewProjectCreated(NewProjectCreatedListener.Event event) {
Project.NameKey projectName = new Project.NameKey(event.getProjectName());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 86557e2..6f0803a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -16,6 +16,8 @@ package com.googlesource.gerrit.plugins.replication;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -23,7 +25,13 @@ import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.URIish;
public class ReplicationState {
+
+ public interface Factory {
+ ReplicationState create(PushResultProcessing processing);
+ }
+
private boolean allScheduled;
+ private final EventsStorage eventsStorage;
private final PushResultProcessing pushResultProcessing;
private final Lock countingLock = new ReentrantLock();
@@ -49,7 +57,11 @@ public class ReplicationState {
private int totalPushTasksCount;
private int finishedPushTasksCount;
- public ReplicationState(PushResultProcessing processing) {
+ private String eventKey;
+
+ @AssistedInject
+ ReplicationState(EventsStorage storage, @Assisted PushResultProcessing processing) {
+ eventsStorage = storage;
pushResultProcessing = processing;
statusByProjectRef = HashBasedTable.create();
}
@@ -74,6 +86,7 @@ public class ReplicationState {
URIish uri,
RefPushResult status,
RemoteRefUpdate.Status refUpdateStatus) {
+ deleteEvent();
pushResultProcessing.onRefReplicatedToOneNode(project, ref, uri, status, refUpdateStatus);
RefReplicationStatus completedRefStatus = null;
@@ -103,6 +116,12 @@ public class ReplicationState {
}
}
+ private void deleteEvent() {
+ if (eventKey != null) {
+ eventsStorage.delete(eventKey);
+ }
+ }
+
public void markAllPushTasksScheduled() {
countingLock.lock();
try {
@@ -173,4 +192,8 @@ public class ReplicationState {
return name().toLowerCase().replace("_", "-");
}
}
+
+ public void setEventKey(String eventKey) {
+ this.eventKey = eventKey;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
index 7115d5b..2dbc7b7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -52,6 +52,7 @@ final class StartCommand extends SshCommand {
@Inject private PushAll.Factory pushFactory;
private final Object lock = new Object();
+ @Inject private ReplicationState.Factory replicationStateFactory;
@Override
protected void run() throws Failure {
@@ -59,7 +60,7 @@ final class StartCommand extends SshCommand {
throw new UnloggedFailure(1, "error: cannot combine --all and PROJECT");
}
- ReplicationState state = new ReplicationState(new CommandProcessing(this));
+ ReplicationState state = replicationStateFactory.create(new CommandProcessing(this));
Future<?> future = null;
ReplicationFilter projectFilter;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index e70094c..81a0613 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -124,6 +124,17 @@ replication.maxRetries
By default, pushes are retried indefinitely.
+replication.eventsDirectory
+: Directory where replication events are persisted
+
+ When scheduling a replication, the replication event is persisted
+ under this directory. When the replication is done, the event is deleted.
+ If plugin is stopped before all scheduled replications are done, the
+ persisted events will not be deleted. When the plugin is started again,
+ it will trigger all replications found under this directory.
+
+ When not set, defaults to the plugin's data directory.
+
remote.NAME.url
: Address of the remote server to push to. Multiple URLs may be
specified within a single remote block, listing different
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index 193af1e..cf6715e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -32,12 +32,15 @@ public class ReplicationStateTest {
private ReplicationState replicationState;
private PushResultProcessing pushResultProcessingMock;
+ private EventsStorage eventsStorage;
@Before
public void setUp() throws Exception {
pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
replay(pushResultProcessingMock);
- replicationState = new ReplicationState(pushResultProcessingMock);
+ eventsStorage = createNiceMock(EventsStorage.class);
+ replay(eventsStorage);
+ replicationState = new ReplicationState(eventsStorage, pushResultProcessingMock);
}
@Test