diff options
author | Saša Živkov <sasa.zivkov@sap.com> | 2024-04-02 12:17:17 +0200 |
---|---|---|
committer | Saša Živkov <sasa.zivkov@sap.com> | 2024-04-02 13:48:50 +0200 |
commit | 012f04240eafe6dfa21fd94e012e97498881c621 (patch) | |
tree | caabf11346d53f6567639897a3dadd786e9c43bf | |
parent | cc47d53d4870e2c285c3b72202ebe6f3fdd67b57 (diff) |
Remove remaining event posting from synchronized sections
Posting events sometimes can take significant time, mainly due to the
event visibility checks. When this is done from a synchronized block
then all other threads trying to schedule replication to the same
(locked) Destination get blocked. Even worse, the synchronization
happens on the Destination level, so even operations on unrelated
repositories block each other.
Change-Id: I7498c7b36ee4b0b2d915a414a0fb1054160e8a1c
-rw-r--r-- | src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java | 35 |
1 files changed, 21 insertions, 14 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 5316e35..6054a4a 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java @@ -73,7 +73,6 @@ import java.net.URISyntaxException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -395,13 +394,13 @@ public class Destination { void schedule( Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state, boolean now) { - Set<String> refsToSchedule = new HashSet<>(); + ImmutableSet.Builder<String> toSchedule = ImmutableSet.builder(); for (String ref : refs) { if (!shouldReplicate(project, ref, state)) { repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri); continue; } - refsToSchedule.add(ref); + toSchedule.add(ref); } repLog.atInfo().log("scheduling replication %s:%s => %s", project, refs, uri); @@ -430,11 +429,13 @@ public class Destination { } } + ImmutableSet<String> refsToSchedule = toSchedule.build(); + PushOne task; synchronized (stateLock) { - PushOne task = getPendingPush(uri); + task = getPendingPush(uri); if (task == null) { task = opFactory.create(project, uri); - addRefs(task, ImmutableSet.copyOf(refsToSchedule)); + task.addRefBatch(refsToSchedule); task.addState(refsToSchedule, state); @SuppressWarnings("unused") ScheduledFuture<?> ignored = @@ -444,7 +445,7 @@ public class Destination { "scheduled %s:%s => %s to run %s", project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s"); } else { - addRefs(task, ImmutableSet.copyOf(refsToSchedule)); + task.addRefBatch(refsToSchedule); task.addState(refsToSchedule, state); repLog.atInfo().log( "consolidated %s:%s => %s with an existing pending push", @@ -454,6 +455,7 @@ public class Destination { state.increasePushTaskCount(project.get(), ref); } } + postReplicationScheduledEvent(task, refsToSchedule); } @Nullable @@ -489,11 +491,6 @@ public class Destination { pool.schedule(updateHeadFactory.create(uri, project, newHead), 0, TimeUnit.SECONDS); } - private void addRefs(PushOne e, ImmutableSet<String> refs) { - e.addRefBatch(refs); - postReplicationScheduledEvent(e, refs); - } - /** * It schedules again a PushOp instance. * @@ -516,6 +513,10 @@ public class Destination { * @param pushOp The PushOp instance to be scheduled. */ void reschedule(PushOne pushOp, RetryReason reason) { + boolean isRescheduled = false; + boolean isFailed = false; + RemoteRefUpdate.Status failedStatus = null; + synchronized (stateLock) { URIish uri = pushOp.getURI(); PushOne pendingPushOp = getPendingPush(uri); @@ -571,13 +572,13 @@ public class Destination { case TRANSPORT_ERROR: case REPOSITORY_MISSING: default: - RemoteRefUpdate.Status status = + failedStatus = RetryReason.REPOSITORY_MISSING.equals(reason) ? NON_EXISTING : REJECTED_OTHER_REASON; - postReplicationFailedEvent(pushOp, status); + isFailed = true; if (pushOp.setToRetry()) { - postReplicationScheduledEvent(pushOp); + isRescheduled = true; replicationTasksStorage.get().reset(pushOp); @SuppressWarnings("unused") ScheduledFuture<?> ignored2 = @@ -594,6 +595,12 @@ public class Destination { } } } + if (isFailed) { + postReplicationFailedEvent(pushOp, failedStatus); + } + if (isRescheduled) { + postReplicationScheduledEvent(pushOp); + } } RunwayStatus requestRunway(PushOne op) { |