summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java')
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java283
1 files changed, 151 insertions, 132 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index ed361a9..f8e2d7b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -14,15 +14,20 @@
package com.googlesource.gerrit.plugins.replication;
+import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
+
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Lists;
+import com.google.gerrit.common.EventDispatcher;
import com.google.gerrit.common.data.GroupReference;
import com.google.gerrit.extensions.config.FactoryModule;
+import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.reviewdb.client.AccountGroup;
+import com.google.gerrit.reviewdb.client.Branch;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.reviewdb.server.ReviewDb;
@@ -45,16 +50,6 @@ import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.servlet.RequestScoped;
-
-import org.apache.commons.io.FilenameUtils;
-import org.eclipse.jgit.lib.Constants;
-import org.eclipse.jgit.lib.Ref;
-import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.transport.RemoteConfig;
-import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
-
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
@@ -63,6 +58,14 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FilenameUtils;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+import org.slf4j.Logger;
public class Destination {
private static final Logger repLog = ReplicationQueue.repLog;
@@ -76,31 +79,36 @@ public class Destination {
private volatile WorkQueue.Executor pool;
private final PerThreadRequestScope.Scoper threadScoper;
private final DestinationConfiguration config;
+ private final DynamicItem<EventDispatcher> eventDispatcher;
protected enum RetryReason {
- TRANSPORT_ERROR, COLLISION, REPOSITORY_MISSING;
+ TRANSPORT_ERROR,
+ COLLISION,
+ REPOSITORY_MISSING;
}
public static class QueueInfo {
public final Map<URIish, PushOne> pending;
public final Map<URIish, PushOne> inFlight;
- public QueueInfo(Map<URIish, PushOne> pending,
- Map<URIish, PushOne> inFlight) {
+ public QueueInfo(Map<URIish, PushOne> pending, Map<URIish, PushOne> inFlight) {
this.pending = ImmutableMap.copyOf(pending);
this.inFlight = ImmutableMap.copyOf(inFlight);
}
}
- protected Destination(Injector injector,
+ protected Destination(
+ Injector injector,
DestinationConfiguration cfg,
RemoteSiteUser.Factory replicationUserFactory,
PluginUser pluginUser,
GitRepositoryManager gitRepositoryManager,
GroupBackend groupBackend,
ReplicationStateListener stateLog,
- GroupIncludeCache groupIncludeCache) {
+ GroupIncludeCache groupIncludeCache,
+ DynamicItem<EventDispatcher> eventDispatcher) {
config = cfg;
+ this.eventDispatcher = eventDispatcher;
gitManager = gitRepositoryManager;
this.stateLog = stateLog;
@@ -113,59 +121,62 @@ public class Destination {
builder.add(g.getUUID());
addRecursiveParents(g.getUUID(), builder, groupIncludeCache);
} else {
- repLog.warn(String.format(
- "Group \"%s\" not recognized, removing from authGroup", name));
+ repLog.warn(String.format("Group \"%s\" not recognized, removing from authGroup", name));
}
}
- remoteUser = replicationUserFactory.create(
- new ListGroupMembership(builder.build()));
+ remoteUser = replicationUserFactory.create(new ListGroupMembership(builder.build()));
} else {
remoteUser = pluginUser;
}
- Injector child = injector.createChildInjector(new FactoryModule() {
- @Override
- protected void configure() {
- bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
- bind(PerThreadRequestScope.Propagator.class);
- bind(PerRequestProjectControlCache.class).in(RequestScoped.class);
-
- bind(Destination.class).toInstance(Destination.this);
- bind(RemoteConfig.class).toInstance(config.getRemoteConfig());
- install(new FactoryModuleBuilder().build(PushOne.Factory.class));
- }
-
- @Provides
- public PerThreadRequestScope.Scoper provideScoper(
- final PerThreadRequestScope.Propagator propagator,
- final Provider<RequestScopedReviewDbProvider> dbProvider) {
- final RequestContext requestContext = new RequestContext() {
- @Override
- public CurrentUser getUser() {
- return remoteUser;
- }
-
- @Override
- public Provider<ReviewDb> getReviewDbProvider() {
- return dbProvider.get();
- }
- };
- return new PerThreadRequestScope.Scoper() {
- @Override
- public <T> Callable<T> scope(Callable<T> callable) {
- return propagator.scope(requestContext, callable);
- }
- };
- }
- });
+ Injector child =
+ injector.createChildInjector(
+ new FactoryModule() {
+ @Override
+ protected void configure() {
+ bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
+ bind(PerThreadRequestScope.Propagator.class);
+ bind(PerRequestProjectControlCache.class).in(RequestScoped.class);
+
+ bind(Destination.class).toInstance(Destination.this);
+ bind(RemoteConfig.class).toInstance(config.getRemoteConfig());
+ install(new FactoryModuleBuilder().build(PushOne.Factory.class));
+ }
+
+ @Provides
+ public PerThreadRequestScope.Scoper provideScoper(
+ final PerThreadRequestScope.Propagator propagator,
+ final Provider<RequestScopedReviewDbProvider> dbProvider) {
+ final RequestContext requestContext =
+ new RequestContext() {
+ @Override
+ public CurrentUser getUser() {
+ return remoteUser;
+ }
+
+ @Override
+ public Provider<ReviewDb> getReviewDbProvider() {
+ return dbProvider.get();
+ }
+ };
+ return new PerThreadRequestScope.Scoper() {
+ @Override
+ public <T> Callable<T> scope(Callable<T> callable) {
+ return propagator.scope(requestContext, callable);
+ }
+ };
+ }
+ });
projectControlFactory = child.getInstance(ProjectControl.Factory.class);
opFactory = child.getInstance(PushOne.Factory.class);
threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
}
- private void addRecursiveParents(AccountGroup.UUID g,
- Builder<AccountGroup.UUID> builder, GroupIncludeCache groupIncludeCache) {
+ private void addRecursiveParents(
+ AccountGroup.UUID g,
+ Builder<AccountGroup.UUID> builder,
+ GroupIncludeCache groupIncludeCache) {
for (AccountGroup.UUID p : groupIncludeCache.parentGroupsOf(g)) {
if (builder.build().contains(p)) {
continue;
@@ -200,52 +211,55 @@ public class Destination {
}
private boolean shouldReplicate(ProjectControl projectControl) {
- return projectControl.isReadable() && (!projectControl.isHidden()
- || config.replicateHiddenProjects());
+ return projectControl.isReadable()
+ && (!projectControl.isHidden() || config.replicateHiddenProjects());
}
- private boolean shouldReplicate(final Project.NameKey project, final String ref,
- ReplicationState... states) {
+ private boolean shouldReplicate(
+ final Project.NameKey project, final String ref, ReplicationState... states) {
try {
- return threadScoper.scope(new Callable<Boolean>() {
- @Override
- public Boolean call() throws NoSuchProjectException {
- ProjectControl projectControl = controlFor(project);
- return shouldReplicate(projectControl)
- && (PushOne.ALL_REFS.equals(ref)
- || projectControl.controlForRef(ref).isVisible());
- }
- }).call();
+ return threadScoper
+ .scope(
+ new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws NoSuchProjectException {
+ ProjectControl projectControl = controlFor(project);
+ return shouldReplicate(projectControl)
+ && (PushOne.ALL_REFS.equals(ref)
+ || projectControl.controlForRef(ref).isVisible());
+ }
+ })
+ .call();
} catch (NoSuchProjectException err) {
- stateLog.error(String.format("source project %s not available", project),
- err, states);
+ stateLog.error(String.format("source project %s not available", project), err, states);
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
return false;
}
- private boolean shouldReplicate(final Project.NameKey project,
- ReplicationState... states) {
+ private boolean shouldReplicate(final Project.NameKey project, ReplicationState... states) {
try {
- return threadScoper.scope(new Callable<Boolean>() {
- @Override
- public Boolean call() throws NoSuchProjectException {
- return shouldReplicate(controlFor(project));
- }
- }).call();
+ return threadScoper
+ .scope(
+ new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws NoSuchProjectException {
+ return shouldReplicate(controlFor(project));
+ }
+ })
+ .call();
} catch (NoSuchProjectException err) {
- stateLog.error(String.format("source project %s not available", project),
- err, states);
+ stateLog.error(String.format("source project %s not available", project), err, states);
} catch (Exception e) {
- Throwables.propagateIfPossible(e);
+ Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
return false;
}
- void schedule(Project.NameKey project, String ref, URIish uri,
- ReplicationState state) {
+ void schedule(Project.NameKey project, String ref, URIish uri, ReplicationState state) {
repLog.info("scheduling replication {}:{} => {}", project, ref, uri);
if (!shouldReplicate(project, ref, state)) {
return;
@@ -266,13 +280,11 @@ public class Destination {
return;
}
} catch (IOException err) {
- stateLog.error(String.format(
- "cannot check type of project %s", project), err, state);
+ stateLog.error(String.format("cannot check type of project %s", project), err, state);
return;
}
} catch (IOException err) {
- stateLog.error(String.format(
- "source project %s not available", project), err, state);
+ stateLog.error(String.format("source project %s not available", project), err, state);
return;
}
}
@@ -282,14 +294,16 @@ public class Destination {
PushOne e = pending.get(uri);
if (e == null) {
e = opFactory.create(project, uri);
+ addRef(e, ref);
+ e.addState(ref, state);
pool.schedule(e, config.getDelay(), TimeUnit.SECONDS);
pending.put(uri, e);
+ } else if (!e.getRefs().contains(ref)) {
+ addRef(e, ref);
+ e.addState(ref, state);
}
- e.addRef(ref);
state.increasePushTaskCount(project.get(), ref);
- e.addState(ref, state);
- repLog.info("scheduled {}:{} => {} to run after {}s", project, ref,
- e, config.getDelay());
+ repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, e, config.getDelay());
}
}
@@ -300,32 +314,29 @@ public class Destination {
}
}
+ private void addRef(PushOne e, String ref) {
+ e.addRef(ref);
+ postEvent(e, ref);
+ }
+
/**
* It schedules again a PushOp instance.
- * <p>
- * If the reason for rescheduling is to avoid a collision
- * with an in-flight push to the same URI, we don't
- * mark the operation as "retrying," and we schedule
- * using the replication delay, rather than the retry
- * delay. Otherwise, the operation is marked as
- * "retrying" and scheduled to run following the
- * minutes count determined by class attribute retryDelay.
- * <p>
- * In case the PushOp instance to be scheduled has same
- * URI than one marked as "retrying," it adds to the one
- * pending the refs list of the parameter instance.
- * <p>
- * In case the PushOp instance to be scheduled has the
- * same URI as one pending, but not marked "retrying," it
- * indicates the one pending should be canceled when it
- * starts executing, removes it from pending list, and
- * adds its refs to the parameter instance. The parameter
- * instance is scheduled for retry.
- * <p>
- * Notice all operations to indicate a PushOp should be
- * canceled, or it is retrying, or remove/add it from/to
- * pending Map should be protected by synchronizing on the
- * stateLock object.
+ *
+ * <p>If the reason for rescheduling is to avoid a collision with an in-flight push to the same
+ * URI, we don't mark the operation as "retrying," and we schedule using the replication delay,
+ * rather than the retry delay. Otherwise, the operation is marked as "retrying" and scheduled to
+ * run following the minutes count determined by class attribute retryDelay.
+ *
+ * <p>In case the PushOp instance to be scheduled has same URI than one marked as "retrying," it
+ * adds to the one pending the refs list of the parameter instance.
+ *
+ * <p>In case the PushOp instance to be scheduled has the same URI as one pending, but not marked
+ * "retrying," it indicates the one pending should be canceled when it starts executing, removes
+ * it from pending list, and adds its refs to the parameter instance. The parameter instance is
+ * scheduled for retry.
+ *
+ * <p>Notice all operations to indicate a PushOp should be canceled, or it is retrying, or
+ * remove/add it from/to pending Map should be protected by synchronizing on the stateLock object.
*
* @param pushOp The PushOp instance to be scheduled.
*/
@@ -378,13 +389,19 @@ public class Destination {
pending.put(uri, pushOp);
switch (reason) {
case COLLISION:
- pool.schedule(pushOp, config.getDelay(), TimeUnit.SECONDS);
+ pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
break;
case TRANSPORT_ERROR:
case REPOSITORY_MISSING:
default:
if (pushOp.setToRetry()) {
pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
+ } else {
+ pushOp.canceledByReplication();
+ pending.remove(uri);
+ stateLog.error(
+ "Push to " + pushOp.getURI() + " cancelled after maximum number of retries",
+ pushOp.getStatesAsArray());
}
break;
}
@@ -392,8 +409,7 @@ public class Destination {
}
}
- ProjectControl controlFor(Project.NameKey project)
- throws NoSuchProjectException {
+ ProjectControl controlFor(Project.NameKey project) throws NoSuchProjectException {
return projectControlFactory.controlFor(project);
}
@@ -474,8 +490,7 @@ public class Destination {
}
List<URIish> getURIs(Project.NameKey project, String urlMatch) {
- List<URIish> r = Lists.newArrayListWithCapacity(
- config.getRemoteConfig().getURIs().size());
+ List<URIish> r = Lists.newArrayListWithCapacity(config.getRemoteConfig().getURIs().size());
for (URIish uri : config.getRemoteConfig().getURIs()) {
if (matches(uri, urlMatch)) {
String name = project.get();
@@ -490,12 +505,11 @@ public class Destination {
} else if (remoteNameStyle.equals("basenameOnly")) {
name = FilenameUtils.getBaseName(name);
} else if (!remoteNameStyle.equals("slash")) {
- repLog.debug(String.format(
- "Unknown remoteNameStyle: %s, falling back to slash",
- remoteNameStyle));
+ repLog.debug(
+ String.format("Unknown remoteNameStyle: %s, falling back to slash", remoteNameStyle));
}
- String replacedPath = ReplicationQueue.replaceName(uri.getPath(), name,
- isSingleProjectMatch());
+ String replacedPath =
+ ReplicationQueue.replaceName(uri.getPath(), name, isSingleProjectMatch());
if (replacedPath != null) {
uri = uri.setPath(replacedPath);
r.add(uri);
@@ -507,8 +521,8 @@ public class Destination {
static boolean needsUrlEncoding(URIish uri) {
return "http".equalsIgnoreCase(uri.getScheme())
- || "https".equalsIgnoreCase(uri.getScheme())
- || "amazon-s3".equalsIgnoreCase(uri.getScheme());
+ || "https".equalsIgnoreCase(uri.getScheme())
+ || "amazon-s3".equalsIgnoreCase(uri.getScheme());
}
static String encode(String str) {
@@ -518,9 +532,7 @@ public class Destination {
// path used to the repository. Space is incorrectly encoded as '+' for this
// context. In the path part of a URI space should be %20, but in form data
// space is '+'. Our cleanup replace fixes these two issues.
- return URLEncoder.encode(str, "UTF-8")
- .replaceAll("%2[fF]", "/")
- .replace("+", "%20");
+ return URLEncoder.encode(str, "UTF-8").replaceAll("%2[fF]", "/").replace("+", "%20");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
@@ -560,4 +572,11 @@ public class Destination {
}
return uri.toString().contains(urlMatch);
}
+
+ private void postEvent(PushOne pushOp, String ref) {
+ Project.NameKey project = pushOp.getProjectNameKey();
+ String targetNode = resolveNodeName(pushOp.getURI());
+ ReplicationScheduledEvent event = new ReplicationScheduledEvent(project.get(), ref, targetNode);
+ eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+ }
}