diff options
author | Martin Fick <mfick@codeaurora.org> | 2021-03-10 17:04:18 -0700 |
---|---|---|
committer | Kaushik Lingarkar <kaushikl@codeaurora.org> | 2021-03-25 13:25:05 -0700 |
commit | 839bbc509216fd4386c226697d732f96f2cb794c (patch) | |
tree | c1f26226b71d5a1112b810530ed845f2ea506d7d | |
parent | 3e2febe2cb6e26076a7d9d4e974071cdd7e787bf (diff) |
Avoid checking for existing tasks while pruning
When the distributor runs, it now stores a snapshot of pending
pushes and then removes from this snapshot all the RefUpdates
which were found while adding pending persisted tasks. The
remaining pushes in the snapshot can now be pruned without
needing to do an existence check on them since they were no
longer stored persistently (and thus no longer needed to be
executed). This effectively makes pruning I/O less, thereby
reducing the load put by distributor on disk I/O.
Change-Id: I0916a57b302fd7d207fd31ec26df65d262a76124
-rw-r--r-- | src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java | 14 | ||||
-rw-r--r-- | src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java | 36 |
2 files changed, 31 insertions, 19 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index dfe7e79..8ef21d0 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java @@ -63,6 +63,7 @@ import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.servlet.RequestScoped; import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult; +import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState; import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent; import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent; @@ -70,7 +71,6 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -608,15 +608,15 @@ public class Destination { } } - public Set<String> getPrunableTaskNames() { - Set<String> names = new HashSet<>(); + public Map<ReplicateRefUpdate, String> getTaskNamesByReplicateRefUpdate() { + Map<ReplicateRefUpdate, String> taskNameByReplicateRefUpdate = new HashMap<>(); for (PushOne push : pending.values()) { - if (!replicationTasksStorage.get().isWaiting(push)) { - repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI()); - names.add(push.toString()); + String taskName = push.toString(); + for (ReplicateRefUpdate refUpdate : push.getReplicateRefUpdates()) { + taskNameByReplicateRefUpdate.put(refUpdate, taskName); } } - return names; + return taskNameByReplicateRefUpdate; } boolean wouldPush(URIish uri, Project.NameKey project, String ref) { diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index ed474ae..4abb295 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; @@ -69,6 +70,11 @@ public class ReplicationQueue private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue; private Distributor distributor; + protected enum Prune { + TRUE, + FALSE; + } + @Inject ReplicationQueue( ReplicationConfig rc, @@ -94,7 +100,7 @@ public class ReplicationQueue destinations.get().startup(workQueue); running = true; replicationTasksStorage.recoverAll(); - firePendingEvents(); + synchronizePendingEvents(Prune.FALSE); fireBeforeStartupEvents(); distributor = new Distributor(workQueue); } @@ -193,8 +199,14 @@ public class ReplicationQueue } } - private void firePendingEvents() { + private void synchronizePendingEvents(Prune prune) { if (replaying.compareAndSet(false, true)) { + final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>(); + if (Prune.TRUE.equals(prune)) { + for (Destination destination : destinations.get().getAll(FilterType.ALL)) { + taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate()); + } + } new ChainedScheduler.StreamScheduler<>( workQueue.getDefaultQueue(), replicationTasksStorage.streamWaiting(), @@ -203,6 +215,9 @@ public class ReplicationQueue public void run(ReplicationTasksStorage.ReplicateRefUpdate u) { try { fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref()); + if (Prune.TRUE.equals(prune)) { + taskNamesByReplicateRefUpdate.remove(u); + } } catch (URISyntaxException e) { repLog.atSevere().withCause(e).log( "Encountered malformed URI for persisted event %s", u); @@ -213,6 +228,9 @@ public class ReplicationQueue @Override public void onDone() { + if (Prune.TRUE.equals(prune)) { + pruneNoLongerPending(taskNamesByReplicateRefUpdate.values()); + } replaying.set(false); } @@ -224,17 +242,12 @@ public class ReplicationQueue } } - private void pruneCompleted() { + private void pruneNoLongerPending(Collection<String> prunableTaskNames) { // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes. // We also cannot access them by taskId since PushOnes don't have a taskId, they do have - // and Id, but it not the id assigned to the task in the queues. The tasks in the queue - // do use the same name as returned by toString() though, so that be used to correlate + // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue + // do use the same name as returned by toString() though, so that can be used to correlate // PushOnes with queue tasks despite their wrappers. - Set<String> prunableTaskNames = new HashSet<>(); - for (Destination destination : destinations.get().getAll(FilterType.ALL)) { - prunableTaskNames.addAll(destination.getPrunableTaskNames()); - } - for (WorkQueue.Task<?> task : workQueue.getTasks()) { WorkQueue.Task.State state = task.getState(); if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) { @@ -310,8 +323,7 @@ public class ReplicationQueue return; } try { - firePendingEvents(); - pruneCompleted(); + synchronizePendingEvents(Prune.TRUE); } catch (Exception e) { repLog.atSevere().withCause(e).log("error distributing tasks"); } |