diff options
Diffstat (limited to 'src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java')
-rw-r--r-- | src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java | 171 |
1 files changed, 82 insertions, 89 deletions
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index ba8e9cc..9a68d32 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java @@ -26,10 +26,15 @@ import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; import com.google.inject.Provider; - import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing; import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; - +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import org.eclipse.jgit.errors.TransportException; import org.eclipse.jgit.internal.storage.file.FileRepository; import org.eclipse.jgit.lib.Constants; @@ -44,21 +49,13 @@ import org.eclipse.jgit.util.io.StreamCopyThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - /** Manages automatic replication to remote repositories. */ -public class ReplicationQueue implements - LifecycleListener, - GitReferenceUpdatedListener, - NewProjectCreatedListener, - ProjectDeletedListener, - HeadUpdatedListener { +public class ReplicationQueue + implements LifecycleListener, + GitReferenceUpdatedListener, + NewProjectCreatedListener, + ProjectDeletedListener, + HeadUpdatedListener { static final String REPLICATION_LOG_NAME = "replication_log"; static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME); private static final int SSH_REMOTE_TIMEOUT = 120 * 1000; @@ -84,7 +81,8 @@ public class ReplicationQueue implements private volatile boolean running; @Inject - ReplicationQueue(WorkQueue wq, + ReplicationQueue( + WorkQueue wq, ReplicationConfig rc, DynamicItem<EventDispatcher> dis, ReplicationStateListener sl, @@ -107,16 +105,14 @@ public class ReplicationQueue implements running = false; int discarded = config.shutdown(); if (discarded > 0) { - repLog.warn(String.format( - "Canceled %d replication events during shutdown", discarded)); + repLog.warn(String.format("Canceled %d replication events during shutdown", discarded)); } } - void scheduleFullSync(final Project.NameKey project, final String urlMatch, - ReplicationState state) { + void scheduleFullSync( + final Project.NameKey project, final String urlMatch, ReplicationState state) { if (!running) { - stateLog.warn("Replication plugin did not finish startup before event", - state); + stateLog.warn("Replication plugin did not finish startup before event", state); return; } @@ -131,8 +127,7 @@ public class ReplicationQueue implements @Override public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) { - ReplicationState state = - new ReplicationState(new GitUpdateProcessing(dispatcher.get())); + ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); if (!running) { stateLog.warn("Replication plugin did not finish startup before event", state); return; @@ -151,30 +146,28 @@ public class ReplicationQueue implements @Override public void onNewProjectCreated(NewProjectCreatedListener.Event event) { - for (URIish uri : getURIs(new Project.NameKey(event.getProjectName()), - FilterType.PROJECT_CREATION)) { + for (URIish uri : + getURIs(new Project.NameKey(event.getProjectName()), FilterType.PROJECT_CREATION)) { createProject(uri, event.getHeadName()); } } @Override public void onProjectDeleted(ProjectDeletedListener.Event event) { - for (URIish uri : getURIs(new Project.NameKey(event.getProjectName()), - FilterType.PROJECT_DELETION)) { + for (URIish uri : + getURIs(new Project.NameKey(event.getProjectName()), FilterType.PROJECT_DELETION)) { deleteProject(uri); } } @Override public void onHeadUpdated(HeadUpdatedListener.Event event) { - for (URIish uri : getURIs(new Project.NameKey(event.getProjectName()), - FilterType.ALL)) { + for (URIish uri : getURIs(new Project.NameKey(event.getProjectName()), FilterType.ALL)) { updateHead(uri, event.getNewHeadName()); } } - private Set<URIish> getURIs(Project.NameKey projectName, - FilterType filterType) { + private Set<URIish> getURIs(Project.NameKey projectName, FilterType filterType) { if (config.getDestinations(filterType).isEmpty()) { return Collections.emptySet(); } @@ -200,23 +193,19 @@ public class ReplicationQueue implements try { uri = new URIish(url); } catch (URISyntaxException e) { - repLog.warn(String.format("adminURL '%s' is invalid: %s", url, - e.getMessage())); + repLog.warn(String.format("adminURL '%s' is invalid: %s", url, e.getMessage())); continue; } - String path = replaceName(uri.getPath(), projectName.get(), - config.isSingleProjectMatch()); + String path = replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch()); if (path == null) { - repLog.warn(String - .format("adminURL %s does not contain ${name}", uri)); + repLog.warn(String.format("adminURL %s does not contain ${name}", uri)); continue; } uri = uri.setPath(path); if (!isSSH(uri)) { - repLog.warn(String.format( - "adminURL '%s' is invalid: only SSH is supported", uri)); + repLog.warn(String.format("adminURL '%s' is invalid: only SSH is supported", uri)); continue; } @@ -249,9 +238,12 @@ public class ReplicationQueue implements createRemoteSsh(replicateURI, head); repLog.info("Created remote repository: " + replicateURI); } else { - repLog.warn(String.format("Cannot create new project on remote site %s." - + " Only local paths and SSH URLs are supported" - + " for remote repository creation", replicateURI)); + repLog.warn( + String.format( + "Cannot create new project on remote site %s." + + " Only local paths and SSH URLs are supported" + + " for remote repository creation", + replicateURI)); return false; } return true; @@ -267,16 +259,13 @@ public class ReplicationQueue implements u.link(head); } } catch (IOException e) { - repLog.error(String.format( - "Error creating local repository %s:\n", uri.getPath()), e); + repLog.error(String.format("Error creating local repository %s:\n", uri.getPath()), e); } } private void createRemoteSsh(URIish uri, String head) { String quotedPath = QuotedString.BOURNE.quote(uri.getPath()); - String cmd = "mkdir -p " + quotedPath - + " && cd " + quotedPath - + " && git init --bare"; + String cmd = "mkdir -p " + quotedPath + " && cd " + quotedPath + " && git init --bare"; if (head != null) { cmd = cmd + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(head); } @@ -284,12 +273,14 @@ public class ReplicationQueue implements try { executeRemoteSsh(uri, cmd, errStream); } catch (IOException e) { - repLog.error(String.format( - "Error creating remote repository at %s:\n" - + " Exception: %s\n" - + " Command: %s\n" - + " Output: %s", - uri, e, cmd, errStream), e); + repLog.error( + String.format( + "Error creating remote repository at %s:\n" + + " Exception: %s\n" + + " Command: %s\n" + + " Output: %s", + uri, e, cmd, errStream), + e); } } @@ -301,9 +292,12 @@ public class ReplicationQueue implements deleteRemoteSsh(replicateURI); repLog.info("Deleted remote repository: " + replicateURI); } else { - repLog.warn(String.format("Cannot delete project on remote site %s." - + " Only local paths and SSH URLs are supported" - + " for remote repository deletion", replicateURI)); + repLog.warn( + String.format( + "Cannot delete project on remote site %s." + + " Only local paths and SSH URLs are supported" + + " for remote repository deletion", + replicateURI)); } } @@ -311,9 +305,7 @@ public class ReplicationQueue implements try { recursivelyDelete(new File(uri.getPath())); } catch (IOException e) { - repLog.error(String.format( - "Error deleting local repository %s:\n", - uri.getPath()), e); + repLog.error(String.format("Error deleting local repository %s:\n", uri.getPath()), e); } } @@ -342,12 +334,14 @@ public class ReplicationQueue implements try { executeRemoteSsh(uri, cmd, errStream); } catch (IOException e) { - repLog.error(String.format( - "Error deleting remote repository at %s:\n" - + " Exception: %s\n" - + " Command: %s\n" - + " Output: %s", - uri, e, cmd, errStream), e); + repLog.error( + String.format( + "Error deleting remote repository at %s:\n" + + " Exception: %s\n" + + " Command: %s\n" + + " Output: %s", + uri, e, cmd, errStream), + e); } } @@ -357,27 +351,31 @@ public class ReplicationQueue implements } else if (isSSH(replicateURI)) { updateHeadRemoteSsh(replicateURI, newHead); } else { - repLog.warn(String.format( - "Cannot update HEAD of project on remote site %s." - + " Only local paths and SSH URLs are supported" - + " for remote HEAD update.", replicateURI)); + repLog.warn( + String.format( + "Cannot update HEAD of project on remote site %s." + + " Only local paths and SSH URLs are supported" + + " for remote HEAD update.", + replicateURI)); } } private void updateHeadRemoteSsh(URIish uri, String newHead) { String quotedPath = QuotedString.BOURNE.quote(uri.getPath()); - String cmd = "cd " + quotedPath - + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead); + String cmd = + "cd " + quotedPath + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead); OutputStream errStream = newErrorBufferStream(); try { executeRemoteSsh(uri, cmd, errStream); } catch (IOException e) { - repLog.error(String.format( - "Error updating HEAD of remote repository at %s to %s:\n" - + " Exception: %s\n" - + " Command: %s\n" - + " Output: %s", - uri, newHead, e, cmd, errStream), e); + repLog.error( + String.format( + "Error updating HEAD of remote repository at %s to %s:\n" + + " Exception: %s\n" + + " Command: %s\n" + + " Output: %s", + uri, newHead, e, cmd, errStream), + e); } } @@ -389,20 +387,16 @@ public class ReplicationQueue implements } } catch (IOException e) { repLog.error( - String.format("Failed to update HEAD of repository %s to %s", - uri.getPath(), newHead), e); + String.format("Failed to update HEAD of repository %s to %s", uri.getPath(), newHead), e); } } - private void executeRemoteSsh(URIish uri, String cmd, - OutputStream errStream) throws IOException { + private void executeRemoteSsh(URIish uri, String cmd, OutputStream errStream) throws IOException { RemoteSession ssh = connect(uri); Process proc = ssh.exec(cmd, 0); proc.getOutputStream().close(); - StreamCopyThread out = - new StreamCopyThread(proc.getInputStream(), errStream); - StreamCopyThread err = - new StreamCopyThread(proc.getErrorStream(), errStream); + StreamCopyThread out = new StreamCopyThread(proc.getInputStream(), errStream); + StreamCopyThread err = new StreamCopyThread(proc.getErrorStream(), errStream); out.start(); err.start(); try { @@ -416,8 +410,7 @@ public class ReplicationQueue implements } private RemoteSession connect(URIish uri) throws TransportException { - return sshSessionFactoryProvider.get().getSession(uri, null, FS.DETECTED, - SSH_REMOTE_TIMEOUT); + return sshSessionFactoryProvider.get().getSession(uri, null, FS.DETECTED, SSH_REMOTE_TIMEOUT); } private static OutputStream newErrorBufferStream() { |