diff options
Diffstat (limited to 'gerrit-server/src/main/java/com/google/gerrit/server/git/PushReplication.java')
-rw-r--r-- | gerrit-server/src/main/java/com/google/gerrit/server/git/PushReplication.java | 433 |
1 files changed, 433 insertions, 0 deletions
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/PushReplication.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/PushReplication.java new file mode 100644 index 0000000000..84d5b002d4 --- /dev/null +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/PushReplication.java @@ -0,0 +1,433 @@ +// Copyright (C) 2009 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.server.git; + +import com.google.gerrit.reviewdb.AccountGroup; +import com.google.gerrit.reviewdb.Project; +import com.google.gerrit.reviewdb.ReviewDb; +import com.google.gerrit.server.CurrentUser; +import com.google.gerrit.server.ReplicationUser; +import com.google.gerrit.server.config.SitePath; +import com.google.gerrit.server.project.NoSuchProjectException; +import com.google.gerrit.server.project.ProjectControl; +import com.google.gwtorm.client.OrmException; +import com.google.gwtorm.client.SchemaFactory; +import com.google.inject.AbstractModule; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Singleton; +import com.google.inject.assistedinject.FactoryProvider; + +import com.jcraft.jsch.Channel; +import com.jcraft.jsch.ChannelExec; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; + +import org.eclipse.jgit.errors.ConfigInvalidException; +import org.eclipse.jgit.lib.Config; +import org.eclipse.jgit.lib.FileBasedConfig; +import org.eclipse.jgit.transport.OpenSshConfig; +import org.eclipse.jgit.transport.RefSpec; +import org.eclipse.jgit.transport.RemoteConfig; +import org.eclipse.jgit.transport.SshConfigSessionFactory; +import org.eclipse.jgit.transport.SshSessionFactory; +import org.eclipse.jgit.transport.URIish; +import org.eclipse.jgit.util.QuotedString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** Manages automatic replication to remote repositories. */ +@Singleton +public class PushReplication implements ReplicationQueue { + static final Logger log = LoggerFactory.getLogger(PushReplication.class); + + static { + // Install our own factory which always runs in batch mode, as we + // have no UI available for interactive prompting. + // + SshSessionFactory.setInstance(new SshConfigSessionFactory() { + @Override + protected void configure(OpenSshConfig.Host hc, Session session) { + // Default configuration is batch mode. + } + }); + } + + private final Injector injector; + private final WorkQueue workQueue; + private final List<ReplicationConfig> configs; + private final SchemaFactory<ReviewDb> database; + private final ReplicationUser.Factory replicationUserFactory; + + @Inject + PushReplication(final Injector i, final WorkQueue wq, + @SitePath final File sitePath, final ReplicationUser.Factory ruf, + final SchemaFactory<ReviewDb> db) throws ConfigInvalidException, + IOException { + injector = i; + workQueue = wq; + database = db; + replicationUserFactory = ruf; + configs = allConfigs(sitePath); + } + + @Override + public boolean isEnabled() { + return configs.size() > 0; + } + + @Override + public void scheduleFullSync(final Project.NameKey project, + final String urlMatch) { + for (final ReplicationConfig cfg : configs) { + for (final URIish uri : cfg.getURIs(project, urlMatch)) { + cfg.schedule(project, PushOp.MIRROR_ALL, uri); + } + } + } + + @Override + public void scheduleUpdate(final Project.NameKey project, final String ref) { + for (final ReplicationConfig cfg : configs) { + if (cfg.wouldPushRef(ref)) { + for (final URIish uri : cfg.getURIs(project, null)) { + cfg.schedule(project, ref, uri); + } + } + } + } + + private static String replace(final String pat, final String key, + final String val) { + final int n = pat.indexOf("${" + key + "}"); + return pat.substring(0, n) + val + pat.substring(n + 3 + key.length()); + } + + private List<ReplicationConfig> allConfigs(final File path) + throws ConfigInvalidException, IOException { + final File cfgFile = new File(path, "replication.config"); + final FileBasedConfig cfg = new FileBasedConfig(cfgFile); + + if (!cfg.getFile().exists()) { + log.warn("No " + cfg.getFile() + "; not replicating"); + return Collections.emptyList(); + } + + try { + cfg.load(); + } catch (ConfigInvalidException e) { + throw new ConfigInvalidException("Config file " + cfg.getFile() + + " is invalid: " + e.getMessage(), e); + } catch (IOException e) { + throw new IOException("Cannot read " + cfgFile + ": " + e.getMessage(), e); + } + + final List<ReplicationConfig> r = new ArrayList<ReplicationConfig>(); + for (final RemoteConfig c : allRemotes(cfg)) { + if (c.getURIs().isEmpty()) { + continue; + } + + for (final URIish u : c.getURIs()) { + if (u.getPath() == null || !u.getPath().contains("${name}")) { + throw new ConfigInvalidException("remote." + c.getName() + ".url" + + " \"" + u + "\" lacks ${name} placeholder in " + cfg.getFile()); + } + } + + if (c.getPushRefSpecs().isEmpty()) { + RefSpec spec = new RefSpec(); + spec = spec.setSourceDestination("refs/*", "refs/*"); + spec = spec.setForceUpdate(true); + c.addPushRefSpec(spec); + } + + r.add(new ReplicationConfig(injector, workQueue, c, cfg, database, + replicationUserFactory)); + } + return Collections.unmodifiableList(r); + } + + private List<RemoteConfig> allRemotes(final FileBasedConfig cfg) + throws ConfigInvalidException { + List<String> names = new ArrayList<String>(cfg.getSubsections("remote")); + Collections.sort(names); + + final List<RemoteConfig> result = new ArrayList<RemoteConfig>(names.size()); + for (final String name : names) { + try { + result.add(new RemoteConfig(cfg, name)); + } catch (URISyntaxException e) { + throw new ConfigInvalidException("remote " + name + + " has invalid URL in " + cfg.getFile()); + } + } + return result; + } + + public void replicateNewProject(Project.NameKey projectName, String head) { + if (!isEnabled()) { + return; + } + + Iterator<ReplicationConfig> configIter = configs.iterator(); + + while (configIter.hasNext()) { + ReplicationConfig rp = configIter.next(); + List<URIish> uriList = rp.getURIs(projectName, "*"); + + Iterator<URIish> uriIter = uriList.iterator(); + + while (uriIter.hasNext()) { + replicateProject(uriIter.next(), head); + } + } + } + + private void replicateProject(final URIish replicateURI, final String head) { + SshSessionFactory sshFactory = SshSessionFactory.getInstance(); + Session sshSession; + String projectPath = QuotedString.BOURNE.quote(replicateURI.getPath()); + + if (!usingSSH(replicateURI)) { + log.warn("Cannot create new project on remote site since the connection " + + "method is not SSH: " + replicateURI.toString()); + return; + } + + OutputStream errStream = createErrStream(); + String cmd = + "mkdir -p " + projectPath + "&& cd " + projectPath + + "&& git init --bare" + "&& git symbolic-ref HEAD " + + QuotedString.BOURNE.quote(head); + + try { + sshSession = + sshFactory.getSession(replicateURI.getUser(), replicateURI.getPass(), + replicateURI.getHost(), replicateURI.getPort()); + sshSession.connect(); + + Channel channel = sshSession.openChannel("exec"); + ((ChannelExec) channel).setCommand(cmd); + + channel.setInputStream(null); + + ((ChannelExec) channel).setErrStream(errStream); + + channel.connect(); + + while (!channel.isClosed()) { + try { + final int delay = 50; + Thread.sleep(delay); + } catch (InterruptedException e) { + } + } + channel.disconnect(); + sshSession.disconnect(); + } catch (JSchException e) { + log.error("Communication error when trying to replicate to: " + + replicateURI.toString() + "\n" + "Error reported: " + + e.getMessage() + "\n" + "Error in communication: " + + errStream.toString()); + } + } + + private OutputStream createErrStream() { + return new OutputStream() { + private StringBuilder all = new StringBuilder(); + private StringBuilder sb = new StringBuilder(); + + public String toString() { + String r = all.toString(); + while (r.endsWith("\n")) + r = r.substring(0, r.length() - 1); + return r; + } + + @Override + public void write(final int b) throws IOException { + if (b == '\r') { + return; + } + + sb.append((char) b); + + if (b == '\n') { + all.append(sb); + sb.setLength(0); + } + } + }; + } + + private boolean usingSSH(final URIish uri) { + final String scheme = uri.getScheme(); + if (!uri.isRemote()) return false; + if (scheme != null && scheme.toLowerCase().contains("ssh")) return true; + if (scheme == null && uri.getHost() != null && uri.getPath() != null) + return true; + return false; + } + + static class ReplicationConfig { + private final RemoteConfig remote; + private final int delay; + private final WorkQueue.Executor pool; + private final Map<URIish, PushOp> pending = new HashMap<URIish, PushOp>(); + private final PushOp.Factory opFactory; + private final ProjectControl.Factory projectControlFactory; + private final boolean authEnabled; + + ReplicationConfig(final Injector injector, final WorkQueue workQueue, + final RemoteConfig rc, final Config cfg, SchemaFactory<ReviewDb> db, + final ReplicationUser.Factory replicationUserFactory) { + + remote = rc; + delay = Math.max(0, getInt(rc, cfg, "replicationdelay", 15)); + + final int poolSize = Math.max(0, getInt(rc, cfg, "threads", 1)); + final String poolName = "ReplicateTo-" + rc.getName(); + pool = workQueue.createQueue(poolSize, poolName); + + String[] authGroupNames = + cfg.getStringList("remote", rc.getName(), "authGroup"); + authEnabled = authGroupNames.length > 0; + Set<AccountGroup.Id> authGroups = groupsFor(db, authGroupNames); + + final ReplicationUser remoteUser = + replicationUserFactory.create(authGroups); + + projectControlFactory = + injector.createChildInjector(new AbstractModule() { + @Override + protected void configure() { + bind(CurrentUser.class).toInstance(remoteUser); + } + }).getInstance(ProjectControl.Factory.class); + + opFactory = injector.createChildInjector(new AbstractModule() { + @Override + protected void configure() { + bind(PushReplication.ReplicationConfig.class).toInstance( + ReplicationConfig.this); + bind(RemoteConfig.class).toInstance(remote); + bind(PushOp.Factory.class).toProvider( + FactoryProvider.newFactory(PushOp.Factory.class, PushOp.class)); + } + }).getInstance(PushOp.Factory.class); + } + + private static Set<AccountGroup.Id> groupsFor( + SchemaFactory<ReviewDb> dbfactory, String[] groupNames) { + final Set<AccountGroup.Id> result = new HashSet<AccountGroup.Id>(); + try { + final ReviewDb db = dbfactory.open(); + try { + for (String name : groupNames) { + AccountGroup group = + db.accountGroups().get(new AccountGroup.NameKey(name)); + if (group == null) { + log.warn("Group \"" + name + "\" not in database," + + " removing from authGroup"); + } else { + result.add(group.getId()); + } + } + } finally { + db.close(); + } + } catch (OrmException e) { + log.error("Database error: " + e); + } + return result; + } + + private int getInt(final RemoteConfig rc, final Config cfg, + final String name, final int defValue) { + return cfg.getInt("remote", rc.getName(), name, defValue); + } + + void schedule(final Project.NameKey project, final String ref, + final URIish uri) { + try { + if (authEnabled + && !projectControlFactory.controlFor(project).isVisible()) { + return; + } + } catch (NoSuchProjectException e1) { + log.error("Internal error: project " + project + + " not found during replication"); + return; + } + synchronized (pending) { + PushOp e = pending.get(uri); + if (e == null) { + e = opFactory.create(project.get(), uri); + pool.schedule(e, delay, TimeUnit.SECONDS); + pending.put(uri, e); + } + e.addRef(ref); + } + } + + void notifyStarting(final PushOp op) { + synchronized (pending) { + pending.remove(op.getURI()); + } + } + + boolean wouldPushRef(final String ref) { + for (final RefSpec s : remote.getPushRefSpecs()) { + if (s.matchSource(ref)) { + return true; + } + } + return false; + } + + List<URIish> getURIs(final Project.NameKey project, final String urlMatch) { + final List<URIish> r = new ArrayList<URIish>(remote.getURIs().size()); + for (URIish uri : remote.getURIs()) { + if (matches(uri, urlMatch)) { + uri = uri.setPath(replace(uri.getPath(), "name", project.get())); + r.add(uri); + } + } + return r; + } + + private boolean matches(URIish uri, final String urlMatch) { + if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) { + return true; + } + return uri.toString().contains(urlMatch); + } + } +} |