summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYang Zhenhui <zhenhui.yang@sonymobile.com>2012-09-24 13:56:21 +0800
committerOswald Buddenhagen <oswald.buddenhagen@theqtcompany.com>2015-02-10 21:03:17 +0000
commitd8a178acff21f6828b7693525492296b3838eff5 (patch)
tree88789d6e0a14a1f764f98626c15757194235f7ed
parent2afe9249cf818987e1e8a20c3f6691ca79370735 (diff)
Show the replication status after running the replication command
The actual replication work is done asynchronously and it is split into many subtasks, this patch adds the functionality that traces every subtask execution, once the task is finished, we can get replication status of all subtasks. This makes it possible for users to start the replication ssh command, and wait to see information about replication progress and errors in their consoles before the command exits. For the gerrit triggered replication of receiving a new patchset, we can also add some features in the future, for example, send stream events for a finished replication to a node. Change-Id: I0fc64e93faa9157fc936379de29691282388ba26 (cherry picked from commit 05f71912f4289480533773ffd05cb5cfcbe191bf) Reviewed-by: Ismo Haataja <ismo.haataja@digia.com> Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@theqtcompany.com>
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java26
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java4
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java12
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java158
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java129
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java16
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java136
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationType.java26
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java63
-rw-r--r--src/main/java/com/googlesource/gerrit/plugins/replication/WrappedLogger.java54
-rw-r--r--src/main/resources/Documentation/cmd-start.md11
11 files changed, 575 insertions, 60 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 4bf12d5..fe5969c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -51,6 +51,7 @@ import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.transport.RemoteConfig;
import org.eclipse.jgit.transport.URIish;
+import org.slf4j.Logger;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -62,6 +63,9 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
class Destination {
+ private static final Logger log = ReplicationQueue.log;
+ private static final WrappedLogger wrappedLog = new WrappedLogger(log);
+
private final int poolThreads;
private final String poolName;
@@ -110,7 +114,7 @@ class Destination {
if (g != null) {
builder.add(g.getUUID());
} else {
- ReplicationQueue.log.warn(String.format(
+ log.warn(String.format(
"Group \"%s\" not recognized, removing from authGroup", name));
}
}
@@ -181,7 +185,7 @@ class Destination {
}
void schedule(final Project.NameKey project, final String ref,
- final URIish uri) {
+ final URIish uri, ReplicationState state) {
try {
boolean visible = threadScoper.scope(new Callable<Boolean>(){
@Override
@@ -193,8 +197,8 @@ class Destination {
return;
}
} catch (NoSuchProjectException err) {
- ReplicationQueue.log.error(String.format(
- "source project %s not available", project), err);
+ wrappedLog.error(String.format(
+ "source project %s not available", project), err, state);
return;
} catch (Exception e) {
throw Throwables.propagate(e);
@@ -210,8 +214,8 @@ class Destination {
try {
git = gitManager.openRepository(project);
} catch (IOException err) {
- ReplicationQueue.log.error(String.format(
- "source project %s not available", project), err);
+ wrappedLog.error(String.format(
+ "source project %s not available", project), err, state);
return;
}
try {
@@ -222,8 +226,8 @@ class Destination {
return;
}
} catch (IOException err) {
- ReplicationQueue.log.error(String.format(
- "cannot check type of project %s", project), err);
+ wrappedLog.error(String.format(
+ "cannot check type of project %s", project), err, state);
return;
} finally {
git.close();
@@ -239,6 +243,8 @@ class Destination {
pending.put(uri, e);
}
e.addRef(ref);
+ state.increasePushTaskCount();
+ e.addState(ref, state);
}
}
@@ -290,6 +296,8 @@ class Destination {
// here, find out replication to its URI is already pending
// for retry (blocking).
pendingPushOp.addRefs(pushOp.getRefs());
+ pendingPushOp.addStates(pushOp.getStates());
+ pushOp.removeStates();
} else {
// The one pending is one that is NOT retrying, it was just
@@ -308,6 +316,8 @@ class Destination {
pending.remove(uri);
pushOp.addRefs(pendingPushOp.getRefs());
+ pushOp.addStates(pendingPushOp.getStates());
+ pendingPushOp.removeStates();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
index 90d4644..c752edc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
@@ -46,7 +46,9 @@ class OnStartStop implements LifecycleListener {
if (srvInfo.getState() == ServerInformation.State.STARTUP
&& queue.replicateAllOnPluginStart) {
- pushAllFuture.set(pushAll.create(null).schedule(30, TimeUnit.SECONDS));
+ ReplicationState state =
+ new ReplicationState(ReplicationType.STARTUP);
+ pushAllFuture.set(pushAll.create(null, state).schedule(30, TimeUnit.SECONDS));
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
index 6d99536..301a861 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
@@ -30,23 +30,26 @@ import javax.annotation.Nullable;
class PushAll implements Runnable {
private static final Logger log = LoggerFactory.getLogger(PushAll.class);
+ private static final WrappedLogger wrappedLog = new WrappedLogger(log);
interface Factory {
- PushAll create(String urlMatch);
+ PushAll create(String urlMatch, ReplicationState state);
}
private final WorkQueue workQueue;
private final ProjectCache projectCache;
private final ReplicationQueue replication;
private final String urlMatch;
+ private final ReplicationState state;
@Inject
PushAll(WorkQueue wq, ProjectCache projectCache, ReplicationQueue rq,
- @Assisted @Nullable String urlMatch) {
+ @Assisted @Nullable String urlMatch, @Assisted ReplicationState state) {
this.workQueue = wq;
this.projectCache = projectCache;
this.replication = rq;
this.urlMatch = urlMatch;
+ this.state = state;
}
Future<?> schedule(long delay, TimeUnit unit) {
@@ -57,11 +60,12 @@ class PushAll implements Runnable {
public void run() {
try {
for (Project.NameKey nameKey : projectCache.all()) {
- replication.scheduleFullSync(nameKey, urlMatch);
+ replication.scheduleFullSync(nameKey, urlMatch, state);
}
} catch (Exception e) {
- log.error("Cannot enumerate known projects", e);
+ wrappedLog.error("Cannot enumerate known projects", e, state);
}
+ state.markAllPushTasksScheduled();
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 10d1013..6ad44e9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -15,8 +15,10 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.common.base.Throwables;
+import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.reviewdb.client.Project;
@@ -34,6 +36,8 @@ import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+
import com.jcraft.jsch.JSchException;
import org.eclipse.jgit.errors.NoRemoteRepositoryException;
@@ -56,7 +60,9 @@ import org.eclipse.jgit.transport.URIish;
import org.slf4j.Logger;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -70,6 +76,7 @@ import java.util.concurrent.Callable;
*/
class PushOne implements ProjectRunnable {
private static final Logger log = ReplicationQueue.log;
+ private static final WrappedLogger wrappedLog = new WrappedLogger(log);
static final String ALL_REFS = "..all..";
interface Factory {
@@ -92,6 +99,8 @@ class PushOne implements ProjectRunnable {
private Repository git;
private boolean retrying;
private boolean canceled;
+ private final Multimap<String,ReplicationState> stateMap =
+ LinkedListMultimap.create();
@Inject
PushOne(final GitRepositoryManager grm,
@@ -177,6 +186,42 @@ class PushOne implements ProjectRunnable {
}
}
+ void addState(String ref, ReplicationState state) {
+ stateMap.put(ref, state);
+ }
+
+ Multimap<String,ReplicationState> getStates() {
+ return stateMap;
+ }
+
+ ReplicationState[] getStatesAsArray() {
+ Set<ReplicationState> statesSet = new HashSet<ReplicationState>();
+ statesSet.addAll(stateMap.values());
+ return statesSet.toArray(new ReplicationState[statesSet.size()]);
+ }
+
+ ReplicationState[] getStatesByRef(String ref) {
+ Collection<ReplicationState> states = stateMap.get(ref);
+ return states.toArray(new ReplicationState[states.size()]);
+ }
+
+ void addStates(Multimap<String,ReplicationState> states) {
+ stateMap.putAll(states);
+ }
+
+ void removeStates() {
+ stateMap.clear();
+ }
+
+ private void statesCleanUp() {
+ if (!stateMap.isEmpty() && !isRetrying()) {
+ for (Map.Entry<String,ReplicationState> entry : stateMap.entries()) {
+ entry.getValue().notifyRefReplicated(projectName.get(), entry.getKey(), uri,
+ RefPushResult.FAILED);
+ }
+ }
+ }
+
@Override
public void run() {
try {
@@ -189,6 +234,8 @@ class PushOne implements ProjectRunnable {
}).call();
} catch (Exception e) {
throw Throwables.propagate(e);
+ } finally {
+ statesCleanUp();
}
}
@@ -206,16 +253,16 @@ class PushOne implements ProjectRunnable {
git = gitManager.openRepository(projectName);
runImpl();
} catch (RepositoryNotFoundException e) {
- log.error("Cannot replicate " + projectName + "; " + e.getMessage());
+ wrappedLog.error("Cannot replicate " + projectName + "; " + e.getMessage(), getStatesAsArray());
} catch (RemoteRepositoryException e) {
log.error("Cannot replicate " + projectName + "; " + e.getMessage());
} catch (NoRemoteRepositoryException e) {
- log.error("Cannot replicate to " + uri + "; repository not found");
+ wrappedLog.error("Cannot replicate to " + uri + "; repository not found", getStatesAsArray());
} catch (NotSupportedException e) {
- log.error("Cannot replicate to " + uri, e);
+ wrappedLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
} catch (TransportException e) {
Throwable cause = e.getCause();
@@ -229,13 +276,13 @@ class PushOne implements ProjectRunnable {
// The remote push operation should be retried.
pool.reschedule(this);
} catch (IOException e) {
- log.error("Cannot replicate to " + uri, e);
+ wrappedLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
} catch (RuntimeException e) {
- log.error("Unexpected error during replication to " + uri, e);
+ wrappedLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray());
} catch (Error e) {
- log.error("Unexpected error during replication to " + uri, e);
+ wrappedLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray());
} finally {
if (git != null) {
@@ -258,36 +305,7 @@ class PushOne implements ProjectRunnable {
}
}
- for (RemoteRefUpdate u : res.getRemoteUpdates()) {
- switch (u.getStatus()) {
- case OK:
- case UP_TO_DATE:
- case NON_EXISTING:
- break;
-
- case NOT_ATTEMPTED:
- case AWAITING_REPORT:
- case REJECTED_NODELETE:
- case REJECTED_NONFASTFORWARD:
- case REJECTED_REMOTE_CHANGED:
- log.error(String.format("Failed replicate of %s to %s: status %s",
- u.getRemoteName(), uri, u.getStatus()));
- break;
-
- case REJECTED_OTHER_REASON:
- if ("non-fast-forward".equals(u.getMessage())) {
- log.error(String.format("Failed replicate of %s to %s"
- + ", remote rejected non-fast-forward push."
- + " Check receive.denyNonFastForwards variable in config file"
- + " of destination repository.", u.getRemoteName(), uri));
- } else {
- log.error(String.format(
- "Failed replicate of %s to %s, reason: %s",
- u.getRemoteName(), uri, u.getMessage()));
- }
- break;
- }
- }
+ updateStates(res.getRemoteUpdates());
}
private PushResult pushVia(Transport tn)
@@ -336,7 +354,7 @@ class PushOne implements ProjectRunnable {
try {
db = schema.open();
} catch (OrmException e) {
- log.error("Cannot read database to replicate to " + projectName, e);
+ wrappedLog.error("Cannot read database to replicate to " + projectName, e, getStatesAsArray());
return Collections.emptyList();
}
try {
@@ -445,4 +463,70 @@ class PushOne implements ProjectRunnable {
boolean force = spec.isForceUpdate();
cmds.add(new RemoteRefUpdate(git, (Ref) null, dst, force, null, null));
}
+
+ private void updateStates(Collection<RemoteRefUpdate> refUpdates) {
+ Set<String> doneRefs = new HashSet<String>();
+ boolean anyRefFailed = false;
+
+ for (RemoteRefUpdate u : refUpdates) {
+ RefPushResult pushStatus = RefPushResult.SUCCEEDED;
+ Set<ReplicationState> logStates = new HashSet<ReplicationState>();
+
+ logStates.addAll(stateMap.get(u.getSrcRef()));
+ logStates.addAll(stateMap.get(ALL_REFS));
+ ReplicationState[] logStatesArray = logStates.toArray(new ReplicationState[logStates.size()]);
+
+ doneRefs.add(u.getSrcRef());
+ switch (u.getStatus()) {
+ case OK:
+ case UP_TO_DATE:
+ case NON_EXISTING:
+ break;
+
+ case NOT_ATTEMPTED:
+ case AWAITING_REPORT:
+ case REJECTED_NODELETE:
+ case REJECTED_NONFASTFORWARD:
+ case REJECTED_REMOTE_CHANGED:
+ wrappedLog.error(String.format("Failed replicate of %s to %s: status %s",
+ u.getRemoteName(), uri, u.getStatus()), logStatesArray);
+ pushStatus = RefPushResult.FAILED;
+ anyRefFailed = true;
+ break;
+
+ case REJECTED_OTHER_REASON:
+ if ("non-fast-forward".equals(u.getMessage())) {
+ wrappedLog.error(String.format("Failed replicate of %s to %s"
+ + ", remote rejected non-fast-forward push."
+ + " Check receive.denyNonFastForwards variable in config file"
+ + " of destination repository.", u.getRemoteName(), uri), logStatesArray);
+ } else {
+ wrappedLog.error(String.format(
+ "Failed replicate of %s to %s, reason: %s",
+ u.getRemoteName(), uri, u.getMessage()), logStatesArray);
+ }
+ pushStatus = RefPushResult.FAILED;
+ anyRefFailed = true;
+ break;
+ }
+
+ for (ReplicationState rs : getStatesByRef(u.getSrcRef())) {
+ rs.notifyRefReplicated(projectName.get(), u.getSrcRef(),
+ uri, pushStatus);
+ }
+ }
+
+ doneRefs.add(ALL_REFS);
+ for (ReplicationState rs : getStatesByRef(ALL_REFS)) {
+ rs.notifyRefReplicated(projectName.get(), ALL_REFS,
+ uri, anyRefFailed ? RefPushResult.FAILED : RefPushResult.SUCCEEDED);
+ }
+ for (Map.Entry<String,ReplicationState> entry : stateMap.entries()) {
+ if (!doneRefs.contains(entry.getKey())) {
+ entry.getValue().notifyRefReplicated(projectName.get(), entry.getKey(), uri,
+ RefPushResult.NOT_ATTEMPTED);
+ }
+ }
+ stateMap.clear();
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
new file mode 100644
index 0000000..27c8b85
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -0,0 +1,129 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+
+import org.eclipse.jgit.transport.URIish;
+
+import java.lang.ref.WeakReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class PushResultProcessing {
+ abstract void onOneNodeReplicated(String project, String ref, URIish uri, RefPushResult status);
+
+ abstract void onAllNodesReplicated(int totalPushTasksCount);
+
+ void writeStdOut(final String message) {
+ // Default doing nothing
+ }
+
+ void writeStdErr(final String message) {
+ // Default doing nothing
+ }
+
+ public static class CommandProcessing extends PushResultProcessing {
+ private WeakReference<StartCommand> sshCommand;
+ private AtomicBoolean hasError = new AtomicBoolean();
+
+ CommandProcessing(StartCommand sshCommand) {
+ this.sshCommand = new WeakReference<StartCommand>(sshCommand);
+ }
+
+ @Override
+ void onOneNodeReplicated(String project, String ref, URIish uri,
+ RefPushResult status) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Replicate ");
+ sb.append(project);
+ sb.append(" to ");
+ if (uri.getHost() != null) {
+ sb.append(uri.getHost());
+ } else {
+ sb.append("localhost");
+ }
+ sb.append(", ");
+ switch (status) {
+ case SUCCEEDED:
+ sb.append("Succeeded!");
+ break;
+ case FAILED:
+ sb.append("FAILED!");
+ hasError.compareAndSet(false, true);
+ break;
+ case NOT_ATTEMPTED:
+ sb.append("NOT ATTEMPTED!");
+ break;
+ default:
+ sb.append("UNKNOWN RESULT!");
+ break;
+ }
+ writeStdOut(sb.toString());
+ }
+
+ @Override
+ void onAllNodesReplicated(int totalPushTasksCount) {
+ if (totalPushTasksCount == 0) {
+ return;
+ }
+ writeStdOut("----------------------------------------------");
+ if (hasError.get()) {
+ writeStdOut("Replication completed with some errors!");
+ } else {
+ writeStdOut("Replication completed successfully!");
+ }
+ }
+
+ @Override
+ void writeStdOut(final String message) {
+ StartCommand command = sshCommand.get();
+ if (command != null) {
+ command.writeStdOutSync(message);
+ }
+ }
+
+ @Override
+ void writeStdErr(final String message) {
+ StartCommand command = sshCommand.get();
+ if (command != null) {
+ command.writeStdErrSync(message);
+ }
+ }
+ }
+
+ public static class GitUpdateProcessing extends PushResultProcessing {
+ @Override
+ void onOneNodeReplicated(String project, String ref, URIish uri,
+ RefPushResult status) {
+ //TODO: send stream events
+ }
+
+ @Override
+ void onAllNodesReplicated(int totalPushTasksCount) {
+ //TODO: send stream events
+ }
+ }
+
+ public static class NoopProcessing extends PushResultProcessing {
+ @Override
+ void onOneNodeReplicated(String project, String ref, URIish uri,
+ RefPushResult status) {
+ }
+
+ @Override
+ void onAllNodesReplicated(int totalPushTasksCount) {
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index ba4c0db..a39d61c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -63,6 +63,7 @@ class ReplicationQueue implements
GitReferenceUpdatedListener,
NewProjectCreatedListener {
static final Logger log = LoggerFactory.getLogger(ReplicationQueue.class);
+ private static final WrappedLogger wrappedLog = new WrappedLogger(log);
static String replaceName(String in, String name, boolean keyIsOptional) {
String key = "${name}";
@@ -125,16 +126,17 @@ class ReplicationQueue implements
}
}
- void scheduleFullSync(final Project.NameKey project, final String urlMatch) {
+ void scheduleFullSync(final Project.NameKey project, final String urlMatch,
+ ReplicationState state) {
if (!running) {
- log.warn("Replication plugin did not finish startup before event");
+ wrappedLog.warn("Replication plugin did not finish startup before event", state);
return;
}
for (Destination cfg : configs) {
if (cfg.wouldPushProject(project)) {
for (URIish uri : cfg.getURIs(project, urlMatch)) {
- cfg.schedule(project, PushOne.ALL_REFS, uri);
+ cfg.schedule(project, PushOne.ALL_REFS, uri, state);
}
}
}
@@ -142,8 +144,11 @@ class ReplicationQueue implements
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
+ ReplicationState state =
+ new ReplicationState(ReplicationType.GIT_UPDATED);
+
if (!running) {
- log.warn("Replication plugin did not finish startup before event");
+ wrappedLog.warn("Replication plugin did not finish startup before event", state);
return;
}
@@ -152,11 +157,12 @@ class ReplicationQueue implements
for (Destination cfg : configs) {
if (cfg.wouldPushProject(project) && cfg.wouldPushRef(u.getRefName())) {
for (URIish uri : cfg.getURIs(project, null)) {
- cfg.schedule(project, u.getRefName(), uri);
+ cfg.schedule(project, u.getRefName(), uri, state);
}
}
}
}
+ state.markAllPushTasksScheduled();
}
private List<Destination> allDestinations(File cfgPath)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
new file mode 100644
index 0000000..35f086f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -0,0 +1,136 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import org.eclipse.jgit.transport.URIish;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.googlesource.gerrit.plugins.replication.PushResultProcessing.CommandProcessing;
+import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
+import com.googlesource.gerrit.plugins.replication.PushResultProcessing.NoopProcessing;
+
+public class ReplicationState {
+ private boolean allScheduled;
+ private final PushResultProcessing pushResultProcessing;
+
+ private final Lock countingLock = new ReentrantLock();
+ private final CountDownLatch allPushTasksFinished = new CountDownLatch(1);
+
+ private int totalPushTasksCount;
+ private int finishedPushTasksCount;
+
+ public ReplicationState(ReplicationType type) {
+ this(type, null);
+ }
+
+ public ReplicationState(ReplicationType type, StartCommand sshCommand) {
+ switch(type) {
+ case COMMAND:
+ pushResultProcessing = new CommandProcessing(sshCommand);
+ break;
+ case GIT_UPDATED:
+ pushResultProcessing = new GitUpdateProcessing();
+ break;
+ case STARTUP:
+ default:
+ pushResultProcessing = new NoopProcessing();
+ break;
+ }
+ }
+
+ public void increasePushTaskCount() {
+ countingLock.lock();
+ try {
+ totalPushTasksCount++;
+ } finally {
+ countingLock.unlock();
+ }
+ }
+
+ public boolean hasPushTask() {
+ return totalPushTasksCount != 0;
+ }
+
+ public void notifyRefReplicated(String project, String ref, URIish uri,
+ RefPushResult status) {
+ pushResultProcessing.onOneNodeReplicated(project, ref, uri, status);
+
+ countingLock.lock();
+ try {
+ finishedPushTasksCount++;
+ if (!allScheduled) {
+ return;
+ }
+ if (finishedPushTasksCount < totalPushTasksCount) {
+ return;
+ }
+ } finally {
+ countingLock.unlock();
+ }
+
+ doAllPushTasksCompleted();
+ }
+
+ public void markAllPushTasksScheduled() {
+ countingLock.lock();
+ try {
+ allScheduled = true;
+ if (finishedPushTasksCount < totalPushTasksCount) {
+ return;
+ }
+ } finally {
+ countingLock.unlock();
+ }
+
+ doAllPushTasksCompleted();
+ }
+
+ private void doAllPushTasksCompleted() {
+ pushResultProcessing.onAllNodesReplicated(totalPushTasksCount);
+ allPushTasksFinished.countDown();
+ }
+
+ public void waitForReplication() throws InterruptedException {
+ allPushTasksFinished.await();
+ }
+
+ public void writeStdOut(final String message) {
+ pushResultProcessing.writeStdOut(message);
+ }
+
+ public void writeStdErr(final String message) {
+ pushResultProcessing.writeStdErr(message);
+ }
+
+ public enum RefPushResult {
+ /**
+ * The ref is not replicated to slave.
+ */
+ FAILED,
+
+ /**
+ * The ref is not configured to be replicated.
+ */
+ NOT_ATTEMPTED,
+
+ /**
+ * ref was successfully replicated.
+ */
+ SUCCEEDED;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationType.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationType.java
new file mode 100644
index 0000000..ff88b87
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationType.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+public enum ReplicationType {
+ /** Replicate all after gerrit startup. */
+ STARTUP,
+
+ /** Invoke ssh command to replicate. */
+ COMMAND,
+
+ /** After a git reference is updated, run the replicaton. */
+ GIT_UPDATED;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
index 5bcb7cb..bcdc3d9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -24,20 +24,30 @@ import com.google.inject.Inject;
import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@RequiresCapability(GlobalCapability.START_REPLICATION)
@CommandMetaData(name="start", descr="Start replication for specific project or all projects")
final class StartCommand extends SshCommand {
+ private static final Logger log = LoggerFactory.getLogger(StartCommand.class);
+ private static final WrappedLogger wrappedLog = new WrappedLogger(log);
@Option(name = "--all", usage = "push all known projects")
private boolean all;
@Option(name = "--url", metaVar = "PATTERN", usage = "pattern to match URL on")
private String urlMatch;
+ @Option(name = "--wait",
+ usage = "wait for replication to finish before exiting")
+ private boolean wait;
+
@Argument(index = 0, multiValued = true, metaVar = "PROJECT", usage = "project name")
private List<String> projectNames = new ArrayList<String>(2);
@@ -56,17 +66,62 @@ final class StartCommand extends SshCommand {
throw new UnloggedFailure(1, "error: cannot combine --all and PROJECT");
}
+ ReplicationState state =
+ new ReplicationState(ReplicationType.COMMAND, this);
+ Future<?> future = null;
if (all) {
- pushAllFactory.create(urlMatch).schedule(0, TimeUnit.SECONDS);
-
+ future = pushAllFactory.create(urlMatch, state).schedule(0, TimeUnit.SECONDS);
} else {
for (String name : projectNames) {
Project.NameKey key = new Project.NameKey(name);
if (projectCache.get(key) != null) {
- replication.scheduleFullSync(key, urlMatch);
+ replication.scheduleFullSync(key, urlMatch, state);
} else {
- throw new UnloggedFailure(1, "error: '" + name + "': not a Gerrit project");
+ writeStdErrSync("error: '" + name + "': not a Gerrit project");
+ }
+ }
+ state.markAllPushTasksScheduled();
+ }
+
+ if (wait) {
+ if (future != null) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ wrappedLog.error("Thread was interrupted while waiting for PushAll operation to finish", e, state);
+ return;
+ } catch (ExecutionException e) {
+ wrappedLog.error("An exception was thrown in PushAll operation", e, state);
+ return;
+ }
+ }
+
+ if (state.hasPushTask()) {
+ try {
+ state.waitForReplication();
+ } catch (InterruptedException e) {
+ writeStdErrSync("We are interrupted while waiting replication to complete");
}
+ } else {
+ writeStdOutSync("Nothing to replicate");
+ }
+ }
+ }
+
+ public void writeStdOutSync(final String message) {
+ if (wait) {
+ synchronized (stdout) {
+ stdout.println(message);
+ stdout.flush();
+ }
+ }
+ }
+
+ public void writeStdErrSync(final String message) {
+ if (wait) {
+ synchronized (stderr) {
+ stderr.println(message);
+ stderr.flush();
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/WrappedLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/WrappedLogger.java
new file mode 100644
index 0000000..7bef7e7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/WrappedLogger.java
@@ -0,0 +1,54 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import org.slf4j.Logger;
+
+public class WrappedLogger {
+
+ private final Logger logger;
+
+ public WrappedLogger(Logger logger) {
+ this.logger = logger;
+ }
+
+ public void warn(String msg, ReplicationState... states) {
+ stateWriteErr("Warning: " + msg, states);
+ logger.warn(msg);
+ }
+
+ public void warn(String msg, Throwable t, ReplicationState... states) {
+ stateWriteErr("Warning: " + msg, states);
+ logger.warn(msg, t);
+ }
+
+ public void error(String msg, ReplicationState... states) {
+ stateWriteErr("Error: " + msg, states);
+ logger.error(msg);
+ }
+
+ public void error(String msg, Throwable t, ReplicationState... states) {
+ stateWriteErr("Error: " + msg, states);
+ logger.error(msg, t);
+ }
+
+ private void stateWriteErr(String msg, ReplicationState[] states) {
+ for (ReplicationState rs : states) {
+ if (rs != null) {
+ rs.writeStdErr(msg);
+ }
+ }
+ }
+}
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
index 54548f6..a6fc5ae 100644
--- a/src/main/resources/Documentation/cmd-start.md
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -9,6 +9,7 @@ SYNOPSIS
--------
```
ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start
+ [--wait]
[--url <PATTERN>]
{--all | <PROJECT> ...}
```
@@ -54,6 +55,11 @@ other reasons why an administrator may wish to trigger replication:
replication on just the affected project can update the
mirrors.
+If you get message "Nothing to replicate" while running this command,
+it may be caused by several reasons, such as you give a wrong url
+pattern in command options, or the authGroup in the replication.config
+has no read access for the replicated projects.
+
ACCESS
------
Caller must be a member of the privileged 'Administrators' group,
@@ -68,8 +74,11 @@ This command is intended to be used in scripts.
OPTIONS
-------
+`--wait`
+: Wait for replication to finish before exiting.
+
`--all`
-: Schedule replicating for all projects.
+: Schedule replication for all projects.
`--url <PATTERN>`
: Replicate only to replication destinations whose URL contains