diff options
Diffstat (limited to 'gerrit-server/src/main/java/com/google/gerrit/server/git/WorkQueue.java')
-rw-r--r-- | gerrit-server/src/main/java/com/google/gerrit/server/git/WorkQueue.java | 632 |
1 files changed, 0 insertions, 632 deletions
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/WorkQueue.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/WorkQueue.java deleted file mode 100644 index 72bc805aa6..0000000000 --- a/gerrit-server/src/main/java/com/google/gerrit/server/git/WorkQueue.java +++ /dev/null @@ -1,632 +0,0 @@ -// Copyright (C) 2009 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.gerrit.server.git; - -import com.google.common.base.CaseFormat; -import com.google.common.base.Supplier; -import com.google.gerrit.extensions.events.LifecycleListener; -import com.google.gerrit.lifecycle.LifecycleModule; -import com.google.gerrit.metrics.Description; -import com.google.gerrit.metrics.MetricMaker; -import com.google.gerrit.reviewdb.client.Project; -import com.google.gerrit.server.config.GerritServerConfig; -import com.google.gerrit.server.util.IdGenerator; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import java.lang.Thread.UncaughtExceptionHandler; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.RunnableScheduledFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.eclipse.jgit.lib.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Delayed execution of tasks using a background thread pool. */ -@Singleton -public class WorkQueue { - public static class Lifecycle implements LifecycleListener { - private final WorkQueue workQueue; - - @Inject - Lifecycle(WorkQueue workQeueue) { - this.workQueue = workQeueue; - } - - @Override - public void start() {} - - @Override - public void stop() { - workQueue.stop(); - } - } - - public static class Module extends LifecycleModule { - @Override - protected void configure() { - bind(WorkQueue.class); - listener().to(Lifecycle.class); - } - } - - private static final Logger log = LoggerFactory.getLogger(WorkQueue.class); - private static final UncaughtExceptionHandler LOG_UNCAUGHT_EXCEPTION = - new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - log.error("WorkQueue thread " + t.getName() + " threw exception", e); - } - }; - - private final ScheduledExecutorService defaultQueue; - private final IdGenerator idGenerator; - private final MetricMaker metrics; - private final CopyOnWriteArrayList<Executor> queues; - - @Inject - WorkQueue(IdGenerator idGenerator, @GerritServerConfig Config cfg, MetricMaker metrics) { - this(idGenerator, cfg.getInt("execution", "defaultThreadPoolSize", 1), metrics); - } - - /** Constructor to allow binding the WorkQueue more explicitly in a vhost setup. */ - public WorkQueue(IdGenerator idGenerator, int defaultThreadPoolSize, MetricMaker metrics) { - this.idGenerator = idGenerator; - this.metrics = metrics; - this.queues = new CopyOnWriteArrayList<>(); - this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue", true); - } - - /** Get the default work queue, for miscellaneous tasks. */ - public ScheduledExecutorService getDefaultQueue() { - return defaultQueue; - } - - /** - * Create a new executor queue. - * - * <p>Creates a new executor queue without associated metrics. This method is suitable for use by - * plugins. - * - * <p>If metrics are needed, use {@link #createQueue(int, String, int, boolean)} instead. - * - * @param poolsize the size of the pool. - * @param queueName the name of the queue. - */ - public ScheduledExecutorService createQueue(int poolsize, String queueName) { - return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, false); - } - - /** - * Create a new executor queue, with default priority, optionally with metrics. - * - * <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be - * requested for queues created by plugins. - * - * @param poolsize the size of the pool. - * @param queueName the name of the queue. - * @param withMetrics whether to create metrics. - */ - public ScheduledThreadPoolExecutor createQueue( - int poolsize, String queueName, boolean withMetrics) { - return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, withMetrics); - } - - /** - * Create a new executor queue, optionally with metrics. - * - * <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be - * requested for queues created by plugins. - * - * @param poolsize the size of the pool. - * @param queueName the name of the queue. - * @param threadPriority thread priority. - * @param withMetrics whether to create metrics. - */ - public ScheduledThreadPoolExecutor createQueue( - int poolsize, String queueName, int threadPriority, boolean withMetrics) { - Executor executor = new Executor(poolsize, queueName); - if (withMetrics) { - log.info("Adding metrics for '{}' queue", queueName); - executor.buildMetrics(queueName); - } - executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true); - queues.add(executor); - if (threadPriority != Thread.NORM_PRIORITY) { - ThreadFactory parent = executor.getThreadFactory(); - executor.setThreadFactory( - task -> { - Thread t = parent.newThread(task); - t.setPriority(threadPriority); - return t; - }); - } - - return executor; - } - - /** Get all of the tasks currently scheduled in any work queue. */ - public List<Task<?>> getTasks() { - final List<Task<?>> r = new ArrayList<>(); - for (Executor e : queues) { - e.addAllTo(r); - } - return r; - } - - public <T> List<T> getTaskInfos(TaskInfoFactory<T> factory) { - List<T> taskInfos = new ArrayList<>(); - for (Executor exe : queues) { - for (Task<?> task : exe.getTasks()) { - taskInfos.add(factory.getTaskInfo(task)); - } - } - return taskInfos; - } - - /** Locate a task by its unique id, null if no task matches. */ - public Task<?> getTask(int id) { - Task<?> result = null; - for (Executor e : queues) { - final Task<?> t = e.getTask(id); - if (t != null) { - if (result != null) { - // Don't return the task if we have a duplicate. Lie instead. - return null; - } - result = t; - } - } - return result; - } - - public ScheduledThreadPoolExecutor getExecutor(String queueName) { - for (Executor e : queues) { - if (e.queueName.equals(queueName)) { - return e; - } - } - return null; - } - - private void stop() { - for (Executor p : queues) { - p.shutdown(); - boolean isTerminated; - do { - try { - isTerminated = p.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException ie) { - isTerminated = false; - } - } while (!isTerminated); - } - queues.clear(); - } - - /** An isolated queue. */ - private class Executor extends ScheduledThreadPoolExecutor { - private final ConcurrentHashMap<Integer, Task<?>> all; - private final String queueName; - - Executor(int corePoolSize, final String queueName) { - super( - corePoolSize, - new ThreadFactory() { - private final ThreadFactory parent = Executors.defaultThreadFactory(); - private final AtomicInteger tid = new AtomicInteger(1); - - @Override - public Thread newThread(Runnable task) { - final Thread t = parent.newThread(task); - t.setName(queueName + "-" + tid.getAndIncrement()); - t.setUncaughtExceptionHandler(LOG_UNCAUGHT_EXCEPTION); - return t; - } - }); - - all = - new ConcurrentHashMap<>( // - corePoolSize << 1, // table size - 0.75f, // load factor - corePoolSize + 4 // concurrency level - ); - this.queueName = queueName; - } - - @Override - protected void terminated() { - super.terminated(); - queues.remove(this); - } - - private void buildMetrics(String queueName) { - metrics.newCallbackMetric( - getMetricName(queueName, "max_pool_size"), - Long.class, - new Description("Maximum allowed number of threads in the pool") - .setGauge() - .setUnit("threads"), - new Supplier<Long>() { - @Override - public Long get() { - return (long) getMaximumPoolSize(); - } - }); - metrics.newCallbackMetric( - getMetricName(queueName, "pool_size"), - Long.class, - new Description("Current number of threads in the pool").setGauge().setUnit("threads"), - new Supplier<Long>() { - @Override - public Long get() { - return (long) getPoolSize(); - } - }); - metrics.newCallbackMetric( - getMetricName(queueName, "active_threads"), - Long.class, - new Description("Number number of threads that are actively executing tasks") - .setGauge() - .setUnit("threads"), - new Supplier<Long>() { - @Override - public Long get() { - return (long) getActiveCount(); - } - }); - metrics.newCallbackMetric( - getMetricName(queueName, "scheduled_tasks"), - Integer.class, - new Description("Number of scheduled tasks in the queue").setGauge().setUnit("tasks"), - new Supplier<Integer>() { - @Override - public Integer get() { - return getQueue().size(); - } - }); - metrics.newCallbackMetric( - getMetricName(queueName, "total_scheduled_tasks_count"), - Long.class, - new Description("Total number of tasks that have been scheduled for execution") - .setCumulative() - .setUnit("tasks"), - new Supplier<Long>() { - @Override - public Long get() { - return (long) getTaskCount(); - } - }); - metrics.newCallbackMetric( - getMetricName(queueName, "total_completed_tasks_count"), - Long.class, - new Description("Total number of tasks that have completed execution") - .setCumulative() - .setUnit("tasks"), - new Supplier<Long>() { - @Override - public Long get() { - return (long) getCompletedTaskCount(); - } - }); - } - - private String getMetricName(String queueName, String metricName) { - String name = - CaseFormat.UPPER_CAMEL.to( - CaseFormat.LOWER_UNDERSCORE, - queueName.replaceFirst("SSH", "Ssh").replaceAll("-", "")); - return metrics.sanitizeMetricName(String.format("queue/%s/%s", name, metricName)); - } - - @Override - protected <V> RunnableScheduledFuture<V> decorateTask( - Runnable runnable, RunnableScheduledFuture<V> r) { - r = super.decorateTask(runnable, r); - for (; ; ) { - final int id = idGenerator.next(); - - Task<V> task; - - if (runnable instanceof ProjectRunnable) { - task = new ProjectTask<>((ProjectRunnable) runnable, r, this, id); - } else { - task = new Task<>(runnable, r, this, id); - } - - if (all.putIfAbsent(task.getTaskId(), task) == null) { - return task; - } - } - } - - @Override - protected <V> RunnableScheduledFuture<V> decorateTask( - Callable<V> callable, RunnableScheduledFuture<V> task) { - throw new UnsupportedOperationException("Callable not implemented"); - } - - void remove(Task<?> task) { - all.remove(task.getTaskId(), task); - } - - Task<?> getTask(int id) { - return all.get(id); - } - - void addAllTo(List<Task<?>> list) { - list.addAll(all.values()); // iterator is thread safe - } - - Collection<Task<?>> getTasks() { - return all.values(); - } - } - - /** - * Runnable needing to know it was canceled. Note that cancel is called only in case the task is - * not in progress already. - */ - public interface CancelableRunnable extends Runnable { - /** Notifies the runnable it was canceled. */ - void cancel(); - } - - /** - * Base interface handles the case when task was canceled before actual execution and in case it - * was started cancel method is not called yet the task itself will be destroyed anyway (it will - * result in resource opening errors). This interface gives a chance to implementing classes for - * handling such scenario and act accordingly. - */ - public interface CanceledWhileRunning extends CancelableRunnable { - /** Notifies the runnable it was canceled during execution. * */ - void setCanceledWhileRunning(); - } - - /** A wrapper around a scheduled Runnable, as maintained in the queue. */ - public static class Task<V> implements RunnableScheduledFuture<V> { - /** - * Summarized status of a single task. - * - * <p>Tasks have the following state flow: - * - * <ol> - * <li>{@link #SLEEPING}: if scheduled with a non-zero delay. - * <li>{@link #READY}: waiting for an available worker thread. - * <li>{@link #RUNNING}: actively executing on a worker thread. - * <li>{@link #DONE}: finished executing, if not periodic. - * </ol> - */ - public enum State { - // Ordered like this so ordinal matches the order we would - // prefer to see tasks sorted in: done before running, - // running before ready, ready before sleeping. - // - DONE, - CANCELLED, - RUNNING, - READY, - SLEEPING, - OTHER - } - - private final Runnable runnable; - private final RunnableScheduledFuture<V> task; - private final Executor executor; - private final int taskId; - private final AtomicBoolean running; - private final Date startTime; - - Task(Runnable runnable, RunnableScheduledFuture<V> task, Executor executor, int taskId) { - this.runnable = runnable; - this.task = task; - this.executor = executor; - this.taskId = taskId; - this.running = new AtomicBoolean(); - this.startTime = new Date(); - } - - public int getTaskId() { - return taskId; - } - - public State getState() { - if (isCancelled()) { - return State.CANCELLED; - } else if (isDone() && !isPeriodic()) { - return State.DONE; - } else if (running.get()) { - return State.RUNNING; - } - - final long delay = getDelay(TimeUnit.MILLISECONDS); - if (delay <= 0) { - return State.READY; - } - return State.SLEEPING; - } - - public Date getStartTime() { - return startTime; - } - - public String getQueueName() { - return executor.queueName; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (task.cancel(mayInterruptIfRunning)) { - // Tiny abuse of running: if the task needs to know it was - // canceled (to clean up resources) and it hasn't started - // yet the task's run method won't execute. So we tag it - // as running and allow it to clean up. This ensures we do - // not invoke cancel twice. - // - if (runnable instanceof CancelableRunnable) { - if (running.compareAndSet(false, true)) { - ((CancelableRunnable) runnable).cancel(); - } else if (runnable instanceof CanceledWhileRunning) { - ((CanceledWhileRunning) runnable).setCanceledWhileRunning(); - } - } - if (runnable instanceof Future<?>) { - // Creating new futures eventually passes through - // AbstractExecutorService#schedule, which will convert the Guava - // Future to a Runnable, thereby making it impossible for the - // cancellation to propagate from ScheduledThreadPool's task back to - // the Guava future, so kludge it here. - ((Future<?>) runnable).cancel(mayInterruptIfRunning); - } - - executor.remove(this); - executor.purge(); - return true; - } - return false; - } - - @Override - public int compareTo(Delayed o) { - return task.compareTo(o); - } - - @Override - public V get() throws InterruptedException, ExecutionException { - return task.get(); - } - - @Override - public V get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return task.get(timeout, unit); - } - - @Override - public long getDelay(TimeUnit unit) { - return task.getDelay(unit); - } - - @Override - public boolean isCancelled() { - return task.isCancelled(); - } - - @Override - public boolean isDone() { - return task.isDone(); - } - - @Override - public boolean isPeriodic() { - return task.isPeriodic(); - } - - @Override - public void run() { - if (running.compareAndSet(false, true)) { - try { - task.run(); - } finally { - if (isPeriodic()) { - running.set(false); - } else { - executor.remove(this); - } - } - } - } - - @Override - public String toString() { - // This is a workaround to be able to print a proper name when the task - // is wrapped into a TrustedListenableFutureTask. - try { - if (runnable - .getClass() - .isAssignableFrom( - Class.forName("com.google.common.util.concurrent.TrustedListenableFutureTask"))) { - Class<?> trustedFutureInterruptibleTask = - Class.forName( - "com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask"); - for (Field field : runnable.getClass().getDeclaredFields()) { - if (field.getType().isAssignableFrom(trustedFutureInterruptibleTask)) { - field.setAccessible(true); - Object innerObj = field.get(runnable); - if (innerObj != null) { - for (Field innerField : innerObj.getClass().getDeclaredFields()) { - if (innerField.getType().isAssignableFrom(Callable.class)) { - innerField.setAccessible(true); - return ((Callable<?>) innerField.get(innerObj)).toString(); - } - } - } - } - } - } - } catch (ClassNotFoundException | IllegalArgumentException | IllegalAccessException e) { - log.debug("Cannot get a proper name for TrustedListenableFutureTask: {}", e.getMessage()); - } - return runnable.toString(); - } - } - - /** - * Same as Task class, but with a reference to ProjectRunnable, used to retrieve the project name - * from the operation queued - */ - public static class ProjectTask<V> extends Task<V> implements ProjectRunnable { - - private final ProjectRunnable runnable; - - ProjectTask( - ProjectRunnable runnable, RunnableScheduledFuture<V> task, Executor executor, int taskId) { - super(runnable, task, executor, taskId); - this.runnable = runnable; - } - - @Override - public Project.NameKey getProjectNameKey() { - return runnable.getProjectNameKey(); - } - - @Override - public String getRemoteName() { - return runnable.getRemoteName(); - } - - @Override - public boolean hasCustomizedPrint() { - return runnable.hasCustomizedPrint(); - } - } -} |