diff options
Diffstat (limited to 'src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java')
-rw-r--r-- | src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java | 283 |
1 files changed, 151 insertions, 132 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index ed361a9..f8e2d7b 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java @@ -14,15 +14,20 @@ package com.googlesource.gerrit.plugins.replication; +import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName; + import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet.Builder; import com.google.common.collect.Lists; +import com.google.gerrit.common.EventDispatcher; import com.google.gerrit.common.data.GroupReference; import com.google.gerrit.extensions.config.FactoryModule; +import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.reviewdb.client.AccountGroup; +import com.google.gerrit.reviewdb.client.Branch; import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.reviewdb.client.RefNames; import com.google.gerrit.reviewdb.server.ReviewDb; @@ -45,16 +50,6 @@ import com.google.inject.Provider; import com.google.inject.Provides; import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.servlet.RequestScoped; - -import org.apache.commons.io.FilenameUtils; -import org.eclipse.jgit.lib.Constants; -import org.eclipse.jgit.lib.Ref; -import org.eclipse.jgit.lib.Repository; -import org.eclipse.jgit.transport.RefSpec; -import org.eclipse.jgit.transport.RemoteConfig; -import org.eclipse.jgit.transport.URIish; -import org.slf4j.Logger; - import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; @@ -63,6 +58,14 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FilenameUtils; +import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.transport.RefSpec; +import org.eclipse.jgit.transport.RemoteConfig; +import org.eclipse.jgit.transport.URIish; +import org.slf4j.Logger; public class Destination { private static final Logger repLog = ReplicationQueue.repLog; @@ -76,31 +79,36 @@ public class Destination { private volatile WorkQueue.Executor pool; private final PerThreadRequestScope.Scoper threadScoper; private final DestinationConfiguration config; + private final DynamicItem<EventDispatcher> eventDispatcher; protected enum RetryReason { - TRANSPORT_ERROR, COLLISION, REPOSITORY_MISSING; + TRANSPORT_ERROR, + COLLISION, + REPOSITORY_MISSING; } public static class QueueInfo { public final Map<URIish, PushOne> pending; public final Map<URIish, PushOne> inFlight; - public QueueInfo(Map<URIish, PushOne> pending, - Map<URIish, PushOne> inFlight) { + public QueueInfo(Map<URIish, PushOne> pending, Map<URIish, PushOne> inFlight) { this.pending = ImmutableMap.copyOf(pending); this.inFlight = ImmutableMap.copyOf(inFlight); } } - protected Destination(Injector injector, + protected Destination( + Injector injector, DestinationConfiguration cfg, RemoteSiteUser.Factory replicationUserFactory, PluginUser pluginUser, GitRepositoryManager gitRepositoryManager, GroupBackend groupBackend, ReplicationStateListener stateLog, - GroupIncludeCache groupIncludeCache) { + GroupIncludeCache groupIncludeCache, + DynamicItem<EventDispatcher> eventDispatcher) { config = cfg; + this.eventDispatcher = eventDispatcher; gitManager = gitRepositoryManager; this.stateLog = stateLog; @@ -113,59 +121,62 @@ public class Destination { builder.add(g.getUUID()); addRecursiveParents(g.getUUID(), builder, groupIncludeCache); } else { - repLog.warn(String.format( - "Group \"%s\" not recognized, removing from authGroup", name)); + repLog.warn(String.format("Group \"%s\" not recognized, removing from authGroup", name)); } } - remoteUser = replicationUserFactory.create( - new ListGroupMembership(builder.build())); + remoteUser = replicationUserFactory.create(new ListGroupMembership(builder.build())); } else { remoteUser = pluginUser; } - Injector child = injector.createChildInjector(new FactoryModule() { - @Override - protected void configure() { - bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST); - bind(PerThreadRequestScope.Propagator.class); - bind(PerRequestProjectControlCache.class).in(RequestScoped.class); - - bind(Destination.class).toInstance(Destination.this); - bind(RemoteConfig.class).toInstance(config.getRemoteConfig()); - install(new FactoryModuleBuilder().build(PushOne.Factory.class)); - } - - @Provides - public PerThreadRequestScope.Scoper provideScoper( - final PerThreadRequestScope.Propagator propagator, - final Provider<RequestScopedReviewDbProvider> dbProvider) { - final RequestContext requestContext = new RequestContext() { - @Override - public CurrentUser getUser() { - return remoteUser; - } - - @Override - public Provider<ReviewDb> getReviewDbProvider() { - return dbProvider.get(); - } - }; - return new PerThreadRequestScope.Scoper() { - @Override - public <T> Callable<T> scope(Callable<T> callable) { - return propagator.scope(requestContext, callable); - } - }; - } - }); + Injector child = + injector.createChildInjector( + new FactoryModule() { + @Override + protected void configure() { + bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST); + bind(PerThreadRequestScope.Propagator.class); + bind(PerRequestProjectControlCache.class).in(RequestScoped.class); + + bind(Destination.class).toInstance(Destination.this); + bind(RemoteConfig.class).toInstance(config.getRemoteConfig()); + install(new FactoryModuleBuilder().build(PushOne.Factory.class)); + } + + @Provides + public PerThreadRequestScope.Scoper provideScoper( + final PerThreadRequestScope.Propagator propagator, + final Provider<RequestScopedReviewDbProvider> dbProvider) { + final RequestContext requestContext = + new RequestContext() { + @Override + public CurrentUser getUser() { + return remoteUser; + } + + @Override + public Provider<ReviewDb> getReviewDbProvider() { + return dbProvider.get(); + } + }; + return new PerThreadRequestScope.Scoper() { + @Override + public <T> Callable<T> scope(Callable<T> callable) { + return propagator.scope(requestContext, callable); + } + }; + } + }); projectControlFactory = child.getInstance(ProjectControl.Factory.class); opFactory = child.getInstance(PushOne.Factory.class); threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class); } - private void addRecursiveParents(AccountGroup.UUID g, - Builder<AccountGroup.UUID> builder, GroupIncludeCache groupIncludeCache) { + private void addRecursiveParents( + AccountGroup.UUID g, + Builder<AccountGroup.UUID> builder, + GroupIncludeCache groupIncludeCache) { for (AccountGroup.UUID p : groupIncludeCache.parentGroupsOf(g)) { if (builder.build().contains(p)) { continue; @@ -200,52 +211,55 @@ public class Destination { } private boolean shouldReplicate(ProjectControl projectControl) { - return projectControl.isReadable() && (!projectControl.isHidden() - || config.replicateHiddenProjects()); + return projectControl.isReadable() + && (!projectControl.isHidden() || config.replicateHiddenProjects()); } - private boolean shouldReplicate(final Project.NameKey project, final String ref, - ReplicationState... states) { + private boolean shouldReplicate( + final Project.NameKey project, final String ref, ReplicationState... states) { try { - return threadScoper.scope(new Callable<Boolean>() { - @Override - public Boolean call() throws NoSuchProjectException { - ProjectControl projectControl = controlFor(project); - return shouldReplicate(projectControl) - && (PushOne.ALL_REFS.equals(ref) - || projectControl.controlForRef(ref).isVisible()); - } - }).call(); + return threadScoper + .scope( + new Callable<Boolean>() { + @Override + public Boolean call() throws NoSuchProjectException { + ProjectControl projectControl = controlFor(project); + return shouldReplicate(projectControl) + && (PushOne.ALL_REFS.equals(ref) + || projectControl.controlForRef(ref).isVisible()); + } + }) + .call(); } catch (NoSuchProjectException err) { - stateLog.error(String.format("source project %s not available", project), - err, states); + stateLog.error(String.format("source project %s not available", project), err, states); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } return false; } - private boolean shouldReplicate(final Project.NameKey project, - ReplicationState... states) { + private boolean shouldReplicate(final Project.NameKey project, ReplicationState... states) { try { - return threadScoper.scope(new Callable<Boolean>() { - @Override - public Boolean call() throws NoSuchProjectException { - return shouldReplicate(controlFor(project)); - } - }).call(); + return threadScoper + .scope( + new Callable<Boolean>() { + @Override + public Boolean call() throws NoSuchProjectException { + return shouldReplicate(controlFor(project)); + } + }) + .call(); } catch (NoSuchProjectException err) { - stateLog.error(String.format("source project %s not available", project), - err, states); + stateLog.error(String.format("source project %s not available", project), err, states); } catch (Exception e) { - Throwables.propagateIfPossible(e); + Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } return false; } - void schedule(Project.NameKey project, String ref, URIish uri, - ReplicationState state) { + void schedule(Project.NameKey project, String ref, URIish uri, ReplicationState state) { repLog.info("scheduling replication {}:{} => {}", project, ref, uri); if (!shouldReplicate(project, ref, state)) { return; @@ -266,13 +280,11 @@ public class Destination { return; } } catch (IOException err) { - stateLog.error(String.format( - "cannot check type of project %s", project), err, state); + stateLog.error(String.format("cannot check type of project %s", project), err, state); return; } } catch (IOException err) { - stateLog.error(String.format( - "source project %s not available", project), err, state); + stateLog.error(String.format("source project %s not available", project), err, state); return; } } @@ -282,14 +294,16 @@ public class Destination { PushOne e = pending.get(uri); if (e == null) { e = opFactory.create(project, uri); + addRef(e, ref); + e.addState(ref, state); pool.schedule(e, config.getDelay(), TimeUnit.SECONDS); pending.put(uri, e); + } else if (!e.getRefs().contains(ref)) { + addRef(e, ref); + e.addState(ref, state); } - e.addRef(ref); state.increasePushTaskCount(project.get(), ref); - e.addState(ref, state); - repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, - e, config.getDelay()); + repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, e, config.getDelay()); } } @@ -300,32 +314,29 @@ public class Destination { } } + private void addRef(PushOne e, String ref) { + e.addRef(ref); + postEvent(e, ref); + } + /** * It schedules again a PushOp instance. - * <p> - * If the reason for rescheduling is to avoid a collision - * with an in-flight push to the same URI, we don't - * mark the operation as "retrying," and we schedule - * using the replication delay, rather than the retry - * delay. Otherwise, the operation is marked as - * "retrying" and scheduled to run following the - * minutes count determined by class attribute retryDelay. - * <p> - * In case the PushOp instance to be scheduled has same - * URI than one marked as "retrying," it adds to the one - * pending the refs list of the parameter instance. - * <p> - * In case the PushOp instance to be scheduled has the - * same URI as one pending, but not marked "retrying," it - * indicates the one pending should be canceled when it - * starts executing, removes it from pending list, and - * adds its refs to the parameter instance. The parameter - * instance is scheduled for retry. - * <p> - * Notice all operations to indicate a PushOp should be - * canceled, or it is retrying, or remove/add it from/to - * pending Map should be protected by synchronizing on the - * stateLock object. + * + * <p>If the reason for rescheduling is to avoid a collision with an in-flight push to the same + * URI, we don't mark the operation as "retrying," and we schedule using the replication delay, + * rather than the retry delay. Otherwise, the operation is marked as "retrying" and scheduled to + * run following the minutes count determined by class attribute retryDelay. + * + * <p>In case the PushOp instance to be scheduled has same URI than one marked as "retrying," it + * adds to the one pending the refs list of the parameter instance. + * + * <p>In case the PushOp instance to be scheduled has the same URI as one pending, but not marked + * "retrying," it indicates the one pending should be canceled when it starts executing, removes + * it from pending list, and adds its refs to the parameter instance. The parameter instance is + * scheduled for retry. + * + * <p>Notice all operations to indicate a PushOp should be canceled, or it is retrying, or + * remove/add it from/to pending Map should be protected by synchronizing on the stateLock object. * * @param pushOp The PushOp instance to be scheduled. */ @@ -378,13 +389,19 @@ public class Destination { pending.put(uri, pushOp); switch (reason) { case COLLISION: - pool.schedule(pushOp, config.getDelay(), TimeUnit.SECONDS); + pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS); break; case TRANSPORT_ERROR: case REPOSITORY_MISSING: default: if (pushOp.setToRetry()) { pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES); + } else { + pushOp.canceledByReplication(); + pending.remove(uri); + stateLog.error( + "Push to " + pushOp.getURI() + " cancelled after maximum number of retries", + pushOp.getStatesAsArray()); } break; } @@ -392,8 +409,7 @@ public class Destination { } } - ProjectControl controlFor(Project.NameKey project) - throws NoSuchProjectException { + ProjectControl controlFor(Project.NameKey project) throws NoSuchProjectException { return projectControlFactory.controlFor(project); } @@ -474,8 +490,7 @@ public class Destination { } List<URIish> getURIs(Project.NameKey project, String urlMatch) { - List<URIish> r = Lists.newArrayListWithCapacity( - config.getRemoteConfig().getURIs().size()); + List<URIish> r = Lists.newArrayListWithCapacity(config.getRemoteConfig().getURIs().size()); for (URIish uri : config.getRemoteConfig().getURIs()) { if (matches(uri, urlMatch)) { String name = project.get(); @@ -490,12 +505,11 @@ public class Destination { } else if (remoteNameStyle.equals("basenameOnly")) { name = FilenameUtils.getBaseName(name); } else if (!remoteNameStyle.equals("slash")) { - repLog.debug(String.format( - "Unknown remoteNameStyle: %s, falling back to slash", - remoteNameStyle)); + repLog.debug( + String.format("Unknown remoteNameStyle: %s, falling back to slash", remoteNameStyle)); } - String replacedPath = ReplicationQueue.replaceName(uri.getPath(), name, - isSingleProjectMatch()); + String replacedPath = + ReplicationQueue.replaceName(uri.getPath(), name, isSingleProjectMatch()); if (replacedPath != null) { uri = uri.setPath(replacedPath); r.add(uri); @@ -507,8 +521,8 @@ public class Destination { static boolean needsUrlEncoding(URIish uri) { return "http".equalsIgnoreCase(uri.getScheme()) - || "https".equalsIgnoreCase(uri.getScheme()) - || "amazon-s3".equalsIgnoreCase(uri.getScheme()); + || "https".equalsIgnoreCase(uri.getScheme()) + || "amazon-s3".equalsIgnoreCase(uri.getScheme()); } static String encode(String str) { @@ -518,9 +532,7 @@ public class Destination { // path used to the repository. Space is incorrectly encoded as '+' for this // context. In the path part of a URI space should be %20, but in form data // space is '+'. Our cleanup replace fixes these two issues. - return URLEncoder.encode(str, "UTF-8") - .replaceAll("%2[fF]", "/") - .replace("+", "%20"); + return URLEncoder.encode(str, "UTF-8").replaceAll("%2[fF]", "/").replace("+", "%20"); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } @@ -560,4 +572,11 @@ public class Destination { } return uri.toString().contains(urlMatch); } + + private void postEvent(PushOne pushOp, String ref) { + Project.NameKey project = pushOp.getProjectNameKey(); + String targetNode = resolveNodeName(pushOp.getURI()); + ReplicationScheduledEvent event = new ReplicationScheduledEvent(project.get(), ref, targetNode); + eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event); + } } |