diff options
Diffstat (limited to 'java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java')
-rw-r--r-- | java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java b/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java new file mode 100644 index 0000000000..93542099f3 --- /dev/null +++ b/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java @@ -0,0 +1,246 @@ +// Copyright (C) 2010 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.pgm.http.jetty; + +import static com.google.gerrit.server.config.ConfigUtil.getTimeUnit; +import static java.util.concurrent.TimeUnit.MINUTES; +import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE; + +import com.google.gerrit.server.CurrentUser; +import com.google.gerrit.server.account.AccountLimits; +import com.google.gerrit.server.config.GerritServerConfig; +import com.google.gerrit.server.git.QueueProvider; +import com.google.gerrit.server.git.WorkQueue.CancelableRunnable; +import com.google.gerrit.sshd.CommandExecutorQueueProvider; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import com.google.inject.servlet.ServletModule; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.continuation.Continuation; +import org.eclipse.jetty.continuation.ContinuationListener; +import org.eclipse.jetty.continuation.ContinuationSupport; +import org.eclipse.jgit.lib.Config; + +/** + * Use Jetty continuations to defer execution until threads are available. + * + * <p>We actually schedule a task into the same execution queue as the SSH daemon uses for command + * execution, and then park the web request in a continuation until an execution thread is + * available. This ensures that the overall JVM process doesn't exceed the configured limit on + * concurrent Git requests. + * + * <p>During Git request execution however we have to use the Jetty service thread, not the thread + * from the SSH execution queue. Trying to complete the request on the SSH execution queue caused + * Jetty's HTTP parser to crash, so we instead block the SSH execution queue thread and ask Jetty to + * resume processing on the web service thread. + */ +@SuppressWarnings("deprecation") +@Singleton +public class ProjectQoSFilter implements Filter { + private static final String ATT_SPACE = ProjectQoSFilter.class.getName(); + private static final String TASK = ATT_SPACE + "/TASK"; + private static final String CANCEL = ATT_SPACE + "/CANCEL"; + + private static final String FILTER_RE = "^/(.*)/(git-upload-pack|git-receive-pack)$"; + private static final Pattern URI_PATTERN = Pattern.compile(FILTER_RE); + + public static class Module extends ServletModule { + @Override + protected void configureServlets() { + bind(QueueProvider.class).to(CommandExecutorQueueProvider.class); + filterRegex(FILTER_RE).through(ProjectQoSFilter.class); + } + } + + private final AccountLimits.Factory limitsFactory; + private final Provider<CurrentUser> user; + private final QueueProvider queue; + private final ServletContext context; + private final long maxWait; + + @Inject + ProjectQoSFilter( + AccountLimits.Factory limitsFactory, + Provider<CurrentUser> user, + QueueProvider queue, + ServletContext context, + @GerritServerConfig Config cfg) { + this.limitsFactory = limitsFactory; + this.user = user; + this.queue = queue; + this.context = context; + this.maxWait = MINUTES.toMillis(getTimeUnit(cfg, "httpd", null, "maxwait", 5, MINUTES)); + } + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + final HttpServletRequest req = (HttpServletRequest) request; + final HttpServletResponse rsp = (HttpServletResponse) response; + final Continuation cont = ContinuationSupport.getContinuation(req); + + if (cont.isInitial()) { + TaskThunk task = new TaskThunk(cont, req); + if (maxWait > 0) { + cont.setTimeout(maxWait); + } + cont.suspend(rsp); + cont.setAttribute(TASK, task); + + Future<?> f = getExecutor().submit(task); + cont.addContinuationListener(new Listener(f)); + } else if (cont.isExpired()) { + rsp.sendError(SC_SERVICE_UNAVAILABLE); + + } else if (cont.isResumed() && cont.getAttribute(CANCEL) == Boolean.TRUE) { + rsp.sendError(SC_SERVICE_UNAVAILABLE); + + } else if (cont.isResumed()) { + TaskThunk task = (TaskThunk) cont.getAttribute(TASK); + try { + task.begin(Thread.currentThread()); + chain.doFilter(req, rsp); + } finally { + task.end(); + Thread.interrupted(); + } + + } else { + context.log("Unexpected QoS continuation state, aborting request"); + rsp.sendError(SC_SERVICE_UNAVAILABLE); + } + } + + private ScheduledThreadPoolExecutor getExecutor() { + QueueProvider.QueueType qt = limitsFactory.create(user.get()).getQueueType(); + return queue.getQueue(qt); + } + + @Override + public void init(FilterConfig config) {} + + @Override + public void destroy() {} + + private static final class Listener implements ContinuationListener { + final Future<?> future; + + Listener(Future<?> future) { + this.future = future; + } + + @Override + public void onComplete(Continuation self) {} + + @Override + public void onTimeout(Continuation self) { + future.cancel(true); + } + } + + private final class TaskThunk implements CancelableRunnable { + private final Continuation cont; + private final String name; + private final Object lock = new Object(); + private boolean done; + private Thread worker; + + TaskThunk(Continuation cont, HttpServletRequest req) { + this.cont = cont; + this.name = generateName(req); + } + + @Override + public void run() { + cont.resume(); + + synchronized (lock) { + while (!done) { + try { + lock.wait(); + } catch (InterruptedException e) { + if (worker != null) { + worker.interrupt(); + } else { + break; + } + } + } + } + } + + void begin(Thread thread) { + synchronized (lock) { + worker = thread; + } + } + + void end() { + synchronized (lock) { + worker = null; + done = true; + lock.notifyAll(); + } + } + + @Override + public void cancel() { + cont.setAttribute(CANCEL, Boolean.TRUE); + cont.resume(); + } + + @Override + public String toString() { + return name; + } + + private String generateName(HttpServletRequest req) { + String userName = ""; + + CurrentUser who = user.get(); + if (who.isIdentifiedUser()) { + Optional<String> name = who.asIdentifiedUser().getUserName(); + if (name.isPresent()) { + userName = " (" + name.get() + ")"; + } + } + + String uri = req.getServletPath(); + Matcher m = URI_PATTERN.matcher(uri); + if (m.matches()) { + String path = m.group(1); + String cmd = m.group(2); + return cmd + " " + path + userName; + } + + return req.getMethod() + " " + uri + userName; + } + } +} |