summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Stefan Zager <szager@google.com> 2013-03-01 12:19:20 -0800
committer Oswald Buddenhagen <oswald.buddenhagen@theqtcompany.com> 2015-02-10 21:05:03 +0000
commit e5cb89869362bc0d75b786561c77952f3f5896d9 (patch)
tree b15c9a100e1097d3d623b73988dab2080518e655
parent 1aacda9b4a4d3868002c52f8dcb8c6b53562b2d9 (diff)
Serialize pushes to each destination URI.
Scheduling a retry to avoid collision with an in-flight push is differentiated from a retry due to a transport error. In the case of collision avoidance, the job is rescheduled according to the replication delay, rather than the retry delay. Change-Id: I55b592feccfea24e5d05978d749ef6c4054b5bc5 (cherry picked from commit 3fffd16403c1d3a4d6f0b13e9f33f4f78df1f6b4) Reviewed-by: Ismo Haataja <ismo.haataja@digia.com> Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@theqtcompany.com>
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java72
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java82
2 files changed, 91 insertions, 63 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 4271dc9..65e9631 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -75,7 +75,9 @@ class Destination {
private final String[] projects;
private final int delay;
private final int retryDelay;
+ private final Object stateLock = new Object();
private final Map<URIish, PushOne> pending = new HashMap<URIish, PushOne>();
+ private final Map<URIish, PushOne> inFlight = new HashMap<URIish, PushOne>();
private final PushOne.Factory opFactory;
private final ProjectControl.Factory projectControlFactory;
private final GitRepositoryManager gitManager;
@@ -84,6 +86,10 @@ class Destination {
private volatile WorkQueue.Executor pool;
private final PerThreadRequestScope.Scoper threadScoper;
+ protected static enum RetryReason {
+ TRANSPORT_ERROR, COLLISION;
+ }
+
Destination(final Injector injector,
final RemoteConfig rc,
final Config cfg,
@@ -207,7 +213,7 @@ class Destination {
if (!replicatePermissions) {
PushOne e;
- synchronized (pending) {
+ synchronized (stateLock) {
e = pending.get(uri);
}
if (e == null) {
@@ -236,7 +242,7 @@ class Destination {
}
}
- synchronized (pending) {
+ synchronized (stateLock) {
PushOne e = pending.get(uri);
if (e == null) {
e = opFactory.create(project, uri);
@@ -252,17 +258,20 @@ class Destination {
/**
* It schedules again a PushOp instance.
* <p>
- * It is assumed to be previously scheduled and found a
- * transport exception. It will schedule it as a push
- * operation to be retried after the minutes count
- * determined by class attribute retryDelay.
+ * If the reason for rescheduling is to avoid a collision
+ * with an in-flight push to the same URI, we don't
+ * mark the operation as "retrying," and we schedule
+ * using the replication delay, rather than the retry
+ * delay. Otherwise, the operation is marked as
+ * "retrying" and scheduled to run following the
+ * minutes count determined by class attribute retryDelay.
* <p>
* In case the PushOp instance to be scheduled has same
- * URI than one also pending for retry, it adds to the one
+ * URI as one marked as "retrying," it adds to the one
* pending the refs list of the parameter instance.
* <p>
- * In case the PushOp instance to be scheduled has same
- * URI than one pending, but not pending for retry, it
+ * In case the PushOp instance to be scheduled has the
+ * same URI as one pending, but not marked "retrying," it
* indicates the one pending should be canceled when it
* starts executing, removes it from pending list, and
* adds its refs to the parameter instance. The parameter
@@ -270,14 +279,13 @@ class Destination {
* <p>
* Notice all operations to indicate a PushOp should be
* canceled, or it is retrying, or remove/add it from/to
- * pending Map should be protected by the lock on pending
- * Map class instance attribute.
+ * pending Map should be protected by synchronizing on the
+ * stateLock object.
*
* @param pushOp The PushOp instance to be scheduled.
*/
- void reschedule(PushOne pushOp) {
- // It locks access to pending variable.
- synchronized (pending) {
+ void reschedule(PushOne pushOp, RetryReason reason) {
+ synchronized (stateLock) {
URIish uri = pushOp.getURI();
PushOne pendingPushOp = pending.get(uri);
@@ -323,13 +331,17 @@ class Destination {
}
if (pendingPushOp == null || !pendingPushOp.isRetrying()) {
- // The PushOp method param instance should be scheduled for retry.
- // Remember when retrying it should be used different delay.
-
- pushOp.setToRetry();
-
pending.put(uri, pushOp);
- pool.schedule(pushOp, retryDelay, TimeUnit.MINUTES);
+ switch (reason) {
+ case COLLISION:
+ pool.schedule(pushOp, delay, TimeUnit.SECONDS);
+ break;
+ case TRANSPORT_ERROR:
+ default:
+ pushOp.setToRetry();
+ pool.schedule(pushOp, retryDelay, TimeUnit.MINUTES);
+ break;
+ }
}
}
}
@@ -339,11 +351,23 @@ class Destination {
return projectControlFactory.controlFor(project);
}
- void notifyStarting(PushOne op) {
- synchronized (pending) {
- if (!op.wasCanceled()) {
- pending.remove(op.getURI());
+ boolean requestRunway(PushOne op) {
+ synchronized (stateLock) {
+ if (op.wasCanceled()) {
+ return false;
+ }
+ pending.remove(op.getURI());
+ if (inFlight.containsKey(op.getURI())) {
+ return false;
}
+ inFlight.put(op.getURI(), op);
+ }
+ return true;
+ }
+
+ void notifyFinished(PushOne op) {
+ synchronized (stateLock) {
+ inFlight.remove(op.getURI());
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index b4c926b..596d4ac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -249,51 +249,55 @@ class PushOne implements ProjectRunnable {
// we start replication (instead a new instance, with the same URI, is
// created and scheduled for a future point in time.)
//
- pool.notifyStarting(this);
+ if (!pool.requestRunway(this)) {
+ if (!canceled) {
+ log.info("Rescheduling replication to " + uri +
+ " to avoid collision with an in-flight push.");
+ pool.reschedule(this, Destination.RetryReason.COLLISION);
+ }
+ return;
+ }
- // It should only verify if it was canceled after calling notifyStarting,
- // since the canceled flag would be set locking the queue.
- if (!canceled) {
- try {
- git = gitManager.openRepository(projectName);
- runImpl();
- } catch (RepositoryNotFoundException e) {
- wrappedLog.error("Cannot replicate " + projectName + "; " + e.getMessage(), getStatesAsArray());
-
- } catch (RemoteRepositoryException e) {
- log.error("Cannot replicate " + projectName + "; " + e.getMessage());
-
- } catch (NoRemoteRepositoryException e) {
- wrappedLog.error("Cannot replicate to " + uri + "; repository not found", getStatesAsArray());
-
- } catch (NotSupportedException e) {
- wrappedLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
-
- } catch (TransportException e) {
- Throwable cause = e.getCause();
- if (cause instanceof JSchException
- && cause.getMessage().startsWith("UnknownHostKey:")) {
- log.error("Cannot replicate to " + uri + ": " + cause.getMessage());
- } else {
- log.error("Cannot replicate to " + uri, e);
- }
+ try {
+ git = gitManager.openRepository(projectName);
+ runImpl();
+ } catch (RepositoryNotFoundException e) {
+ wrappedLog.error("Cannot replicate " + projectName + "; " + e.getMessage(), getStatesAsArray());
+
+ } catch (RemoteRepositoryException e) {
+ log.error("Cannot replicate " + projectName + "; " + e.getMessage());
+
+ } catch (NoRemoteRepositoryException e) {
+ wrappedLog.error("Cannot replicate to " + uri + "; repository not found", getStatesAsArray());
+
+ } catch (NotSupportedException e) {
+ wrappedLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
+
+ } catch (TransportException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof JSchException
+ && cause.getMessage().startsWith("UnknownHostKey:")) {
+ log.error("Cannot replicate to " + uri + ": " + cause.getMessage());
+ } else {
+ log.error("Cannot replicate to " + uri, e);
+ }
- // The remote push operation should be retried.
- pool.reschedule(this);
- } catch (IOException e) {
- wrappedLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
+ // The remote push operation should be retried.
+ pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
+ } catch (IOException e) {
+ wrappedLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
- } catch (RuntimeException e) {
- wrappedLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray());
+ } catch (RuntimeException e) {
+ wrappedLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray());
- } catch (Error e) {
- wrappedLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray());
+ } catch (Error e) {
+ wrappedLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray());
- } finally {
- if (git != null) {
- git.close();
- }
+ } finally {
+ if (git != null) {
+ git.close();
}
+ pool.notifyFinished(this);
}
}