summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Fick <mfick@codeaurora.org>2021-03-10 17:04:18 -0700
committerKaushik Lingarkar <kaushikl@codeaurora.org>2021-03-25 13:25:05 -0700
commit839bbc509216fd4386c226697d732f96f2cb794c (patch)
treec1f26226b71d5a1112b810530ed845f2ea506d7d
parent3e2febe2cb6e26076a7d9d4e974071cdd7e787bf (diff)
Avoid checking for existing tasks while pruning
When the distributor runs, it now stores a snapshot of pending pushes and then removes from this snapshot all the RefUpdates which were found while adding pending persisted tasks. The remaining pushes in the snapshot can now be pruned without needing to do an existence check on them since they were no longer stored persistently (and thus no longer needed to be executed). This effectively makes pruning I/O less, thereby reducing the load put by distributor on disk I/O. Change-Id: I0916a57b302fd7d207fd31ec26df65d262a76124
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java14
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java36
2 files changed, 31 insertions, 19 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index dfe7e79..8ef21d0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -63,6 +63,7 @@ import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.servlet.RequestScoped;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState;
import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
@@ -70,7 +71,6 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -608,15 +608,15 @@ public class Destination {
}
}
- public Set<String> getPrunableTaskNames() {
- Set<String> names = new HashSet<>();
+ public Map<ReplicateRefUpdate, String> getTaskNamesByReplicateRefUpdate() {
+ Map<ReplicateRefUpdate, String> taskNameByReplicateRefUpdate = new HashMap<>();
for (PushOne push : pending.values()) {
- if (!replicationTasksStorage.get().isWaiting(push)) {
- repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI());
- names.add(push.toString());
+ String taskName = push.toString();
+ for (ReplicateRefUpdate refUpdate : push.getReplicateRefUpdates()) {
+ taskNameByReplicateRefUpdate.put(refUpdate, taskName);
}
}
- return names;
+ return taskNameByReplicateRefUpdate;
}
boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index ed474ae..4abb295 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -41,6 +41,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,6 +70,11 @@ public class ReplicationQueue
private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
private Distributor distributor;
+ protected enum Prune {
+ TRUE,
+ FALSE;
+ }
+
@Inject
ReplicationQueue(
ReplicationConfig rc,
@@ -94,7 +100,7 @@ public class ReplicationQueue
destinations.get().startup(workQueue);
running = true;
replicationTasksStorage.recoverAll();
- firePendingEvents();
+ synchronizePendingEvents(Prune.FALSE);
fireBeforeStartupEvents();
distributor = new Distributor(workQueue);
}
@@ -193,8 +199,14 @@ public class ReplicationQueue
}
}
- private void firePendingEvents() {
+ private void synchronizePendingEvents(Prune prune) {
if (replaying.compareAndSet(false, true)) {
+ final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>();
+ if (Prune.TRUE.equals(prune)) {
+ for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+ taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
+ }
+ }
new ChainedScheduler.StreamScheduler<>(
workQueue.getDefaultQueue(),
replicationTasksStorage.streamWaiting(),
@@ -203,6 +215,9 @@ public class ReplicationQueue
public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
try {
fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
+ if (Prune.TRUE.equals(prune)) {
+ taskNamesByReplicateRefUpdate.remove(u);
+ }
} catch (URISyntaxException e) {
repLog.atSevere().withCause(e).log(
"Encountered malformed URI for persisted event %s", u);
@@ -213,6 +228,9 @@ public class ReplicationQueue
@Override
public void onDone() {
+ if (Prune.TRUE.equals(prune)) {
+ pruneNoLongerPending(taskNamesByReplicateRefUpdate.values());
+ }
replaying.set(false);
}
@@ -224,17 +242,12 @@ public class ReplicationQueue
}
}
- private void pruneCompleted() {
+ private void pruneNoLongerPending(Collection<String> prunableTaskNames) {
// Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
// We also cannot access them by taskId since PushOnes don't have a taskId, they do have
- // and Id, but it not the id assigned to the task in the queues. The tasks in the queue
- // do use the same name as returned by toString() though, so that be used to correlate
+ // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
+ // do use the same name as returned by toString() though, so that can be used to correlate
// PushOnes with queue tasks despite their wrappers.
- Set<String> prunableTaskNames = new HashSet<>();
- for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
- prunableTaskNames.addAll(destination.getPrunableTaskNames());
- }
-
for (WorkQueue.Task<?> task : workQueue.getTasks()) {
WorkQueue.Task.State state = task.getState();
if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
@@ -310,8 +323,7 @@ public class ReplicationQueue
return;
}
try {
- firePendingEvents();
- pruneCompleted();
+ synchronizePendingEvents(Prune.TRUE);
} catch (Exception e) {
repLog.atSevere().withCause(e).log("error distributing tasks");
}