summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSaša Živkov <sasa.zivkov@sap.com>2024-04-02 12:17:17 +0200
committerSaša Živkov <sasa.zivkov@sap.com>2024-04-02 13:48:50 +0200
commit012f04240eafe6dfa21fd94e012e97498881c621 (patch)
treecaabf11346d53f6567639897a3dadd786e9c43bf
parentcc47d53d4870e2c285c3b72202ebe6f3fdd67b57 (diff)
Remove remaining event posting from synchronized sections
Posting events sometimes can take significant time, mainly due to the event visibility checks. When this is done from a synchronized block then all other threads trying to schedule replication to the same (locked) Destination get blocked. Even worse, the synchronization happens on the Destination level, so even operations on unrelated repositories block each other. Change-Id: I7498c7b36ee4b0b2d915a414a0fb1054160e8a1c
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java35
1 files changed, 21 insertions, 14 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 5316e35..6054a4a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -73,7 +73,6 @@ import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -395,13 +394,13 @@ public class Destination {
void schedule(
Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state, boolean now) {
- Set<String> refsToSchedule = new HashSet<>();
+ ImmutableSet.Builder<String> toSchedule = ImmutableSet.builder();
for (String ref : refs) {
if (!shouldReplicate(project, ref, state)) {
repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri);
continue;
}
- refsToSchedule.add(ref);
+ toSchedule.add(ref);
}
repLog.atInfo().log("scheduling replication %s:%s => %s", project, refs, uri);
@@ -430,11 +429,13 @@ public class Destination {
}
}
+ ImmutableSet<String> refsToSchedule = toSchedule.build();
+ PushOne task;
synchronized (stateLock) {
- PushOne task = getPendingPush(uri);
+ task = getPendingPush(uri);
if (task == null) {
task = opFactory.create(project, uri);
- addRefs(task, ImmutableSet.copyOf(refsToSchedule));
+ task.addRefBatch(refsToSchedule);
task.addState(refsToSchedule, state);
@SuppressWarnings("unused")
ScheduledFuture<?> ignored =
@@ -444,7 +445,7 @@ public class Destination {
"scheduled %s:%s => %s to run %s",
project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s");
} else {
- addRefs(task, ImmutableSet.copyOf(refsToSchedule));
+ task.addRefBatch(refsToSchedule);
task.addState(refsToSchedule, state);
repLog.atInfo().log(
"consolidated %s:%s => %s with an existing pending push",
@@ -454,6 +455,7 @@ public class Destination {
state.increasePushTaskCount(project.get(), ref);
}
}
+ postReplicationScheduledEvent(task, refsToSchedule);
}
@Nullable
@@ -489,11 +491,6 @@ public class Destination {
pool.schedule(updateHeadFactory.create(uri, project, newHead), 0, TimeUnit.SECONDS);
}
- private void addRefs(PushOne e, ImmutableSet<String> refs) {
- e.addRefBatch(refs);
- postReplicationScheduledEvent(e, refs);
- }
-
/**
* It schedules again a PushOp instance.
*
@@ -516,6 +513,10 @@ public class Destination {
* @param pushOp The PushOp instance to be scheduled.
*/
void reschedule(PushOne pushOp, RetryReason reason) {
+ boolean isRescheduled = false;
+ boolean isFailed = false;
+ RemoteRefUpdate.Status failedStatus = null;
+
synchronized (stateLock) {
URIish uri = pushOp.getURI();
PushOne pendingPushOp = getPendingPush(uri);
@@ -571,13 +572,13 @@ public class Destination {
case TRANSPORT_ERROR:
case REPOSITORY_MISSING:
default:
- RemoteRefUpdate.Status status =
+ failedStatus =
RetryReason.REPOSITORY_MISSING.equals(reason)
? NON_EXISTING
: REJECTED_OTHER_REASON;
- postReplicationFailedEvent(pushOp, status);
+ isFailed = true;
if (pushOp.setToRetry()) {
- postReplicationScheduledEvent(pushOp);
+ isRescheduled = true;
replicationTasksStorage.get().reset(pushOp);
@SuppressWarnings("unused")
ScheduledFuture<?> ignored2 =
@@ -594,6 +595,12 @@ public class Destination {
}
}
}
+ if (isFailed) {
+ postReplicationFailedEvent(pushOp, failedStatus);
+ }
+ if (isRescheduled) {
+ postReplicationScheduledEvent(pushOp);
+ }
}
RunwayStatus requestRunway(PushOne op) {