diff options
author | Nasser Grainawi <nasser.grainawi@linaro.org> | 2024-02-13 18:56:29 +0000 |
---|---|---|
committer | Gerrit Code Review <noreply-gerritcodereview@google.com> | 2024-02-13 18:56:29 +0000 |
commit | a1f8983b106bff490f8d2957a297cee0d1f567cd (patch) | |
tree | 5b0d81395b484d2db18304385881761476b95f28 | |
parent | 4768d4df0bf54c3c8ed4601fd15202e240b439dc (diff) | |
parent | 920c412c6db1269023b00e4ec22ce17bee6c3a27 (diff) |
Merge "Fix WorkQueue bug to ensure reliable execution of scheduled tasks" into stable-3.8
-rw-r--r-- | java/com/google/gerrit/server/git/WorkQueue.java | 40 | ||||
-rw-r--r-- | javatests/com/google/gerrit/acceptance/server/util/WorkQueueIT.java | 71 |
2 files changed, 106 insertions, 5 deletions
diff --git a/java/com/google/gerrit/server/git/WorkQueue.java b/java/com/google/gerrit/server/git/WorkQueue.java index e8b7c62f15..86d6c7c7d2 100644 --- a/java/com/google/gerrit/server/git/WorkQueue.java +++ b/java/com/google/gerrit/server/git/WorkQueue.java @@ -14,6 +14,7 @@ package com.google.gerrit.server.git; +import static com.google.common.base.MoreObjects.firstNonNull; import static java.util.stream.Collectors.toList; import com.google.common.base.CaseFormat; @@ -286,6 +287,7 @@ public class WorkQueue { /** An isolated queue. */ private class Executor extends ScheduledThreadPoolExecutor { private final ConcurrentHashMap<Integer, Task<?>> all; + private final ConcurrentHashMap<Runnable, Long> nanosPeriodByRunnable; private final String queueName; Executor(int corePoolSize, final String queueName) { @@ -310,6 +312,7 @@ public class WorkQueue { 0.75f, // load factor corePoolSize + 4 // concurrency level ); + nanosPeriodByRunnable = new ConcurrentHashMap<>(1, 0.75f, 1); this.queueName = queueName; } @@ -373,12 +376,14 @@ public class WorkQueue { @Override public ScheduledFuture<?> scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit) { + nanosPeriodByRunnable.put(command, unit.toNanos(period)); return super.scheduleAtFixedRate(LoggingContext.copy(command), initialDelay, period, unit); } @Override public ScheduledFuture<?> scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit) { + nanosPeriodByRunnable.put(command, unit.toNanos(delay)); return super.scheduleWithFixedDelay(LoggingContext.copy(command), initialDelay, delay, unit); } @@ -440,6 +445,18 @@ public class WorkQueue { protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> r) { r = super.decorateTask(runnable, r); + + // Periodic Tasks may get rescheduled if the previous run has yet to fully complete (and thus + // passed to decorateTask() more than once), and there is no need to redecorate them if they + // are already decorated. + if (runnable instanceof LoggingContextAwareRunnable) { + Runnable unwrappedTask = ((LoggingContextAwareRunnable) runnable).unwrap(); + if (unwrappedTask instanceof Task<?>) { + return r; + } + } + + long nanosPeriod = firstNonNull(nanosPeriodByRunnable.remove(runnable), 0L); for (; ; ) { final int id = idGenerator.next(); @@ -450,9 +467,9 @@ public class WorkQueue { } if (runnable instanceof ProjectRunnable) { - task = new ProjectTask<>((ProjectRunnable) runnable, r, this, id); + task = new ProjectTask<>((ProjectRunnable) runnable, r, nanosPeriod, this, id); } else { - task = new Task<>(runnable, r, this, id); + task = new Task<>(runnable, r, nanosPeriod, this, id); } if (all.putIfAbsent(task.getTaskId(), task) == null) { @@ -553,13 +570,20 @@ public class WorkQueue { private final Executor executor; private final int taskId; private final Instant startTime; + private final long nanosPeriod; // runningState is non-null when listener or task code is running in an executor thread private final AtomicReference<State> runningState = new AtomicReference<>(); - Task(Runnable runnable, RunnableScheduledFuture<V> task, Executor executor, int taskId) { + Task( + Runnable runnable, + RunnableScheduledFuture<V> task, + long nanosPeriod, + Executor executor, + int taskId) { this.runnable = runnable; this.task = task; + this.nanosPeriod = nanosPeriod; this.executor = executor; this.taskId = taskId; this.startTime = Instant.now(); @@ -684,6 +708,8 @@ public class WorkQueue { executor.remove(this); } } + } else { + Future<?> unusedFuture = executor.schedule(this, nanosPeriod / 3, TimeUnit.NANOSECONDS); } } @@ -731,8 +757,12 @@ public class WorkQueue { private final ProjectRunnable runnable; ProjectTask( - ProjectRunnable runnable, RunnableScheduledFuture<V> task, Executor executor, int taskId) { - super(runnable, task, executor, taskId); + ProjectRunnable runnable, + RunnableScheduledFuture<V> task, + long nanosPeriod, + Executor executor, + int taskId) { + super(runnable, task, nanosPeriod, executor, taskId); this.runnable = runnable; } diff --git a/javatests/com/google/gerrit/acceptance/server/util/WorkQueueIT.java b/javatests/com/google/gerrit/acceptance/server/util/WorkQueueIT.java new file mode 100644 index 0000000000..c43e4101e7 --- /dev/null +++ b/javatests/com/google/gerrit/acceptance/server/util/WorkQueueIT.java @@ -0,0 +1,71 @@ +package com.google.gerrit.acceptance.server.util; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.gerrit.acceptance.AbstractDaemonTest; +import com.google.gerrit.extensions.annotations.Exports; +import com.google.gerrit.server.git.WorkQueue; +import com.google.inject.AbstractModule; +import com.google.inject.Inject; +import com.google.inject.Module; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.junit.Test; + +public class WorkQueueIT extends AbstractDaemonTest { + public static class TestListener implements WorkQueue.TaskListener { + + @Override + public void onStart(WorkQueue.Task<?> task) {} + + @Override + public void onStop(WorkQueue.Task<?> task) { + try { + Thread.sleep(FIXED_RATE_SCHEDULE_INTERVAL_MILLI_SEC); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private static final Integer FIXED_RATE_SCHEDULE_INITIAL_DELAY = 0; + private static final Integer FIXED_RATE_SCHEDULE_INTERVAL_MILLI_SEC = 1000; + private static final Integer POOL_CORE_SIZE = 8; + private static final String QUEUE_NAME = "test-Queue"; + private static final Integer EXCEPT_RUN_TIMES = 2; + private final CountDownLatch downLatch = new CountDownLatch(EXCEPT_RUN_TIMES); + @Inject private WorkQueue workQueue; + private TestListener testListener; + + @Override + public Module createModule() { + return new AbstractModule() { + @Override + public void configure() { + testListener = new TestListener(); + bind(WorkQueue.TaskListener.class) + .annotatedWith(Exports.named("listener")) + .toInstance(testListener); + } + }; + } + + @Test + public void testScheduleAtFixedRate() throws InterruptedException { + ScheduledExecutorService testExecutor = workQueue.createQueue(POOL_CORE_SIZE, QUEUE_NAME); + ScheduledFuture<?> unusedFuture = + testExecutor.scheduleAtFixedRate( + downLatch::countDown, + FIXED_RATE_SCHEDULE_INITIAL_DELAY, + FIXED_RATE_SCHEDULE_INTERVAL_MILLI_SEC, + TimeUnit.MILLISECONDS); + + boolean ifRunMoreThanOnce = + downLatch.await( + EXCEPT_RUN_TIMES * FIXED_RATE_SCHEDULE_INTERVAL_MILLI_SEC, TimeUnit.MILLISECONDS); + assertThat(ifRunMoreThanOnce).isTrue(); + testExecutor.shutdownNow(); + } +} |