diff options
author | Stefan Zager <szager@google.com> | 2013-03-01 12:19:20 -0800 |
---|---|---|
committer | Oswald Buddenhagen <oswald.buddenhagen@theqtcompany.com> | 2015-02-10 21:05:03 +0000 |
commit | e5cb89869362bc0d75b786561c77952f3f5896d9 (patch) | |
tree | b15c9a100e1097d3d623b73988dab2080518e655 | |
parent | 1aacda9b4a4d3868002c52f8dcb8c6b53562b2d9 (diff) |
Serialize pushes to each destination URI.
Scheduling a retry to avoid collision with an in-flight
push is differentiated from a retry due to a transport
error. In the case of collision avoidance, the job is
rescheduled according to the replication delay, rather
than the retry delay.
Change-Id: I55b592feccfea24e5d05978d749ef6c4054b5bc5
(cherry picked from commit 3fffd16403c1d3a4d6f0b13e9f33f4f78df1f6b4)
Reviewed-by: Ismo Haataja <ismo.haataja@digia.com>
Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@theqtcompany.com>
-rw-r--r-- | src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java | 72 | ||||
-rw-r--r-- | src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java | 82 |
2 files changed, 91 insertions, 63 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 4271dc9..65e9631 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java @@ -75,7 +75,9 @@ class Destination { private final String[] projects; private final int delay; private final int retryDelay; + private final Object stateLock = new Object(); private final Map<URIish, PushOne> pending = new HashMap<URIish, PushOne>(); + private final Map<URIish, PushOne> inFlight = new HashMap<URIish, PushOne>(); private final PushOne.Factory opFactory; private final ProjectControl.Factory projectControlFactory; private final GitRepositoryManager gitManager; @@ -84,6 +86,10 @@ class Destination { private volatile WorkQueue.Executor pool; private final PerThreadRequestScope.Scoper threadScoper; + protected static enum RetryReason { + TRANSPORT_ERROR, COLLISION; + } + Destination(final Injector injector, final RemoteConfig rc, final Config cfg, @@ -207,7 +213,7 @@ class Destination { if (!replicatePermissions) { PushOne e; - synchronized (pending) { + synchronized (stateLock) { e = pending.get(uri); } if (e == null) { @@ -236,7 +242,7 @@ class Destination { } } - synchronized (pending) { + synchronized (stateLock) { PushOne e = pending.get(uri); if (e == null) { e = opFactory.create(project, uri); @@ -252,17 +258,20 @@ class Destination { /** * It schedules again a PushOp instance. * <p> - * It is assumed to be previously scheduled and found a - * transport exception. It will schedule it as a push - * operation to be retried after the minutes count - * determined by class attribute retryDelay. 
+ * If the reason for rescheduling is to avoid a collision + * with an in-flight push to the same URI, we don't + * mark the operation as "retrying," and we schedule + * using the replication delay, rather than the retry + * delay. Otherwise, the operation is marked as + * "retrying" and scheduled to run following the + * minutes count determined by class attribute retryDelay. * <p> * In case the PushOp instance to be scheduled has same - * URI than one also pending for retry, it adds to the one + * URI than one marked as "retrying," it adds to the one * pending the refs list of the parameter instance. * <p> - * In case the PushOp instance to be scheduled has same - * URI than one pending, but not pending for retry, it + * In case the PushOp instance to be scheduled has the + * same URI as one pending, but not marked "retrying," it * indicates the one pending should be canceled when it * starts executing, removes it from pending list, and * adds its refs to the parameter instance. The parameter @@ -270,14 +279,13 @@ class Destination { * <p> * Notice all operations to indicate a PushOp should be * canceled, or it is retrying, or remove/add it from/to - * pending Map should be protected by the lock on pending - * Map class instance attribute. + * pending Map should be protected by synchronizing on the + * stateLock object. * * @param pushOp The PushOp instance to be scheduled. */ - void reschedule(PushOne pushOp) { - // It locks access to pending variable. - synchronized (pending) { + void reschedule(PushOne pushOp, RetryReason reason) { + synchronized (stateLock) { URIish uri = pushOp.getURI(); PushOne pendingPushOp = pending.get(uri); @@ -323,13 +331,17 @@ class Destination { } if (pendingPushOp == null || !pendingPushOp.isRetrying()) { - // The PushOp method param instance should be scheduled for retry. - // Remember when retrying it should be used different delay. 
- - pushOp.setToRetry(); - pending.put(uri, pushOp); - pool.schedule(pushOp, retryDelay, TimeUnit.MINUTES); + switch (reason) { + case COLLISION: + pool.schedule(pushOp, delay, TimeUnit.SECONDS); + break; + case TRANSPORT_ERROR: + default: + pushOp.setToRetry(); + pool.schedule(pushOp, retryDelay, TimeUnit.MINUTES); + break; + } } } } @@ -339,11 +351,23 @@ class Destination { return projectControlFactory.controlFor(project); } - void notifyStarting(PushOne op) { - synchronized (pending) { - if (!op.wasCanceled()) { - pending.remove(op.getURI()); + boolean requestRunway(PushOne op) { + synchronized (stateLock) { + if (op.wasCanceled()) { + return false; + } + pending.remove(op.getURI()); + if (inFlight.containsKey(op.getURI())) { + return false; } + inFlight.put(op.getURI(), op); + } + return true; + } + + void notifyFinished(PushOne op) { + synchronized (stateLock) { + inFlight.remove(op.getURI()); } } diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index b4c926b..596d4ac 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java @@ -249,51 +249,55 @@ class PushOne implements ProjectRunnable { // we start replication (instead a new instance, with the same URI, is // created and scheduled for a future point in time.) // - pool.notifyStarting(this); + if (!pool.requestRunway(this)) { + if (!canceled) { + log.info("Rescheduling replication to " + uri + + " to avoid collision with an in-flight push."); + pool.reschedule(this, Destination.RetryReason.COLLISION); + } + return; + } - // It should only verify if it was canceled after calling notifyStarting, - // since the canceled flag would be set locking the queue. 
- if (!canceled) { - try { - git = gitManager.openRepository(projectName); - runImpl(); - } catch (RepositoryNotFoundException e) { - wrappedLog.error("Cannot replicate " + projectName + "; " + e.getMessage(), getStatesAsArray()); - - } catch (RemoteRepositoryException e) { - log.error("Cannot replicate " + projectName + "; " + e.getMessage()); - - } catch (NoRemoteRepositoryException e) { - wrappedLog.error("Cannot replicate to " + uri + "; repository not found", getStatesAsArray()); - - } catch (NotSupportedException e) { - wrappedLog.error("Cannot replicate to " + uri, e, getStatesAsArray()); - - } catch (TransportException e) { - Throwable cause = e.getCause(); - if (cause instanceof JSchException - && cause.getMessage().startsWith("UnknownHostKey:")) { - log.error("Cannot replicate to " + uri + ": " + cause.getMessage()); - } else { - log.error("Cannot replicate to " + uri, e); - } + try { + git = gitManager.openRepository(projectName); + runImpl(); + } catch (RepositoryNotFoundException e) { + wrappedLog.error("Cannot replicate " + projectName + "; " + e.getMessage(), getStatesAsArray()); + + } catch (RemoteRepositoryException e) { + log.error("Cannot replicate " + projectName + "; " + e.getMessage()); + + } catch (NoRemoteRepositoryException e) { + wrappedLog.error("Cannot replicate to " + uri + "; repository not found", getStatesAsArray()); + + } catch (NotSupportedException e) { + wrappedLog.error("Cannot replicate to " + uri, e, getStatesAsArray()); + + } catch (TransportException e) { + Throwable cause = e.getCause(); + if (cause instanceof JSchException + && cause.getMessage().startsWith("UnknownHostKey:")) { + log.error("Cannot replicate to " + uri + ": " + cause.getMessage()); + } else { + log.error("Cannot replicate to " + uri, e); + } - // The remote push operation should be retried. 
- pool.reschedule(this); - } catch (IOException e) { - wrappedLog.error("Cannot replicate to " + uri, e, getStatesAsArray()); + // The remote push operation should be retried. + pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR); + } catch (IOException e) { + wrappedLog.error("Cannot replicate to " + uri, e, getStatesAsArray()); - } catch (RuntimeException e) { - wrappedLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray()); + } catch (RuntimeException e) { + wrappedLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray()); - } catch (Error e) { - wrappedLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray()); + } catch (Error e) { + wrappedLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray()); - } finally { - if (git != null) { - git.close(); - } + } finally { + if (git != null) { + git.close(); } + pool.notifyFinished(this); } } |