summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java')
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java33
1 files changed, 28 insertions, 5 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 4c7bdfc..c79363b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -74,6 +74,8 @@ public class ReplicationQueue
private final DynamicItem<EventDispatcher> dispatcher;
private final ReplicationConfig config;
private final GerritSshApi gerritAdmin;
+ private final ReplicationState.Factory replicationStateFactory;
+ private final EventsStorage eventsStorage;
private volatile boolean running;
private final Queue<GitReferenceUpdatedListener.Event> beforeStartupEventsQueue;
@@ -84,13 +86,17 @@ public class ReplicationQueue
GerritSshApi ga,
ReplicationConfig rc,
DynamicItem<EventDispatcher> dis,
- ReplicationStateListener sl) {
+ ReplicationStateListener sl,
+ ReplicationState.Factory rsf,
+ EventsStorage es) {
workQueue = wq;
sshHelper = sh;
dispatcher = dis;
config = rc;
stateLog = sl;
gerritAdmin = ga;
+ replicationStateFactory = rsf;
+ eventsStorage = es;
beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
}
@@ -98,6 +104,7 @@ public class ReplicationQueue
public void start() {
config.startup(workQueue);
running = true;
+ firePendingEvents();
fireBeforeStartupEvents();
}
@@ -132,7 +139,8 @@ public class ReplicationQueue
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
- ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+ ReplicationState state =
+ replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
if (!running) {
stateLog.warn(
"Replication plugin did not finish startup before event, event replication is postponed",
@@ -140,18 +148,33 @@ public class ReplicationQueue
beforeStartupEventsQueue.add(event);
return;
}
+ onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+ }
- Project.NameKey project = new Project.NameKey(event.getProjectName());
+ private void onGitReferenceUpdated(String projectName, String refName) {
+ ReplicationState state =
+ replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
+
+ Project.NameKey project = new Project.NameKey(projectName);
for (Destination cfg : config.getDestinations(FilterType.ALL)) {
- if (cfg.wouldPushProject(project) && cfg.wouldPushRef(event.getRefName())) {
+ if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
+ String eventKey = eventsStorage.persist(projectName, refName);
+ state.setEventKey(eventKey);
for (URIish uri : cfg.getURIs(project, null)) {
- cfg.schedule(project, event.getRefName(), uri, state);
+ cfg.schedule(project, refName, uri, state);
}
}
}
state.markAllPushTasksScheduled();
}
+ private void firePendingEvents() {
+ for (EventsStorage.ReplicateRefUpdate e : eventsStorage.list()) {
+ repLog.info("Firing pending event {}", e);
+ onGitReferenceUpdated(e.project, e.ref);
+ }
+ }
+
@Override
public void onNewProjectCreated(NewProjectCreatedListener.Event event) {
Project.NameKey projectName = new Project.NameKey(event.getProjectName());