summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNasser Grainawi <nasser.grainawi@linaro.org>2024-02-13 18:56:29 +0000
committerGerrit Code Review <noreply-gerritcodereview@google.com>2024-02-13 18:56:29 +0000
commita1f8983b106bff490f8d2957a297cee0d1f567cd (patch)
tree5b0d81395b484d2db18304385881761476b95f28
parent4768d4df0bf54c3c8ed4601fd15202e240b439dc (diff)
parent920c412c6db1269023b00e4ec22ce17bee6c3a27 (diff)
Merge "Fix WorkQueue bug to ensure reliable execution of scheduled tasks" into stable-3.8
-rw-r--r--java/com/google/gerrit/server/git/WorkQueue.java40
-rw-r--r--javatests/com/google/gerrit/acceptance/server/util/WorkQueueIT.java71
2 files changed, 106 insertions, 5 deletions
diff --git a/java/com/google/gerrit/server/git/WorkQueue.java b/java/com/google/gerrit/server/git/WorkQueue.java
index e8b7c62f15..86d6c7c7d2 100644
--- a/java/com/google/gerrit/server/git/WorkQueue.java
+++ b/java/com/google/gerrit/server/git/WorkQueue.java
@@ -14,6 +14,7 @@
package com.google.gerrit.server.git;
+import static com.google.common.base.MoreObjects.firstNonNull;
import static java.util.stream.Collectors.toList;
import com.google.common.base.CaseFormat;
@@ -286,6 +287,7 @@ public class WorkQueue {
/** An isolated queue. */
private class Executor extends ScheduledThreadPoolExecutor {
private final ConcurrentHashMap<Integer, Task<?>> all;
+ private final ConcurrentHashMap<Runnable, Long> nanosPeriodByRunnable;
private final String queueName;
Executor(int corePoolSize, final String queueName) {
@@ -310,6 +312,7 @@ public class WorkQueue {
0.75f, // load factor
corePoolSize + 4 // concurrency level
);
+ nanosPeriodByRunnable = new ConcurrentHashMap<>(1, 0.75f, 1);
this.queueName = queueName;
}
@@ -373,12 +376,14 @@ public class WorkQueue {
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
+ nanosPeriodByRunnable.put(command, unit.toNanos(period));
return super.scheduleAtFixedRate(LoggingContext.copy(command), initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ nanosPeriodByRunnable.put(command, unit.toNanos(delay));
return super.scheduleWithFixedDelay(LoggingContext.copy(command), initialDelay, delay, unit);
}
@@ -440,6 +445,18 @@ public class WorkQueue {
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> r) {
r = super.decorateTask(runnable, r);
+
+ // Periodic Tasks may get rescheduled if the previous run has yet to fully complete (and thus
+ // passed to decorateTask() more than once), and there is no need to redecorate them if they
+ // are already decorated.
+ if (runnable instanceof LoggingContextAwareRunnable) {
+ Runnable unwrappedTask = ((LoggingContextAwareRunnable) runnable).unwrap();
+ if (unwrappedTask instanceof Task<?>) {
+ return r;
+ }
+ }
+
+ long nanosPeriod = firstNonNull(nanosPeriodByRunnable.remove(runnable), 0L);
for (; ; ) {
final int id = idGenerator.next();
@@ -450,9 +467,9 @@ public class WorkQueue {
}
if (runnable instanceof ProjectRunnable) {
- task = new ProjectTask<>((ProjectRunnable) runnable, r, this, id);
+ task = new ProjectTask<>((ProjectRunnable) runnable, r, nanosPeriod, this, id);
} else {
- task = new Task<>(runnable, r, this, id);
+ task = new Task<>(runnable, r, nanosPeriod, this, id);
}
if (all.putIfAbsent(task.getTaskId(), task) == null) {
@@ -553,13 +570,20 @@ public class WorkQueue {
private final Executor executor;
private final int taskId;
private final Instant startTime;
+ private final long nanosPeriod;
// runningState is non-null when listener or task code is running in an executor thread
private final AtomicReference<State> runningState = new AtomicReference<>();
- Task(Runnable runnable, RunnableScheduledFuture<V> task, Executor executor, int taskId) {
+ Task(
+ Runnable runnable,
+ RunnableScheduledFuture<V> task,
+ long nanosPeriod,
+ Executor executor,
+ int taskId) {
this.runnable = runnable;
this.task = task;
+ this.nanosPeriod = nanosPeriod;
this.executor = executor;
this.taskId = taskId;
this.startTime = Instant.now();
@@ -684,6 +708,8 @@ public class WorkQueue {
executor.remove(this);
}
}
+ } else {
+ Future<?> unusedFuture = executor.schedule(this, nanosPeriod / 3, TimeUnit.NANOSECONDS);
}
}
@@ -731,8 +757,12 @@ public class WorkQueue {
private final ProjectRunnable runnable;
ProjectTask(
- ProjectRunnable runnable, RunnableScheduledFuture<V> task, Executor executor, int taskId) {
- super(runnable, task, executor, taskId);
+ ProjectRunnable runnable,
+ RunnableScheduledFuture<V> task,
+ long nanosPeriod,
+ Executor executor,
+ int taskId) {
+ super(runnable, task, nanosPeriod, executor, taskId);
this.runnable = runnable;
}
diff --git a/javatests/com/google/gerrit/acceptance/server/util/WorkQueueIT.java b/javatests/com/google/gerrit/acceptance/server/util/WorkQueueIT.java
new file mode 100644
index 0000000000..c43e4101e7
--- /dev/null
+++ b/javatests/com/google/gerrit/acceptance/server/util/WorkQueueIT.java
@@ -0,0 +1,71 @@
+package com.google.gerrit.acceptance.server.util;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.AbstractDaemonTest;
+import com.google.gerrit.extensions.annotations.Exports;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Module;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+public class WorkQueueIT extends AbstractDaemonTest {
+ public static class TestListener implements WorkQueue.TaskListener {
+
+ @Override
+ public void onStart(WorkQueue.Task<?> task) {}
+
+ @Override
+ public void onStop(WorkQueue.Task<?> task) {
+ try {
+ Thread.sleep(FIXED_RATE_SCHEDULE_INTERVAL_MILLI_SEC);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static final Integer FIXED_RATE_SCHEDULE_INITIAL_DELAY = 0;
+ private static final Integer FIXED_RATE_SCHEDULE_INTERVAL_MILLI_SEC = 1000;
+ private static final Integer POOL_CORE_SIZE = 8;
+ private static final String QUEUE_NAME = "test-Queue";
+ private static final Integer EXCEPT_RUN_TIMES = 2;
+ private final CountDownLatch downLatch = new CountDownLatch(EXCEPT_RUN_TIMES);
+ @Inject private WorkQueue workQueue;
+ private TestListener testListener;
+
+ @Override
+ public Module createModule() {
+ return new AbstractModule() {
+ @Override
+ public void configure() {
+ testListener = new TestListener();
+ bind(WorkQueue.TaskListener.class)
+ .annotatedWith(Exports.named("listener"))
+ .toInstance(testListener);
+ }
+ };
+ }
+
+ @Test
+ public void testScheduleAtFixedRate() throws InterruptedException {
+ ScheduledExecutorService testExecutor = workQueue.createQueue(POOL_CORE_SIZE, QUEUE_NAME);
+ ScheduledFuture<?> unusedFuture =
+ testExecutor.scheduleAtFixedRate(
+ downLatch::countDown,
+ FIXED_RATE_SCHEDULE_INITIAL_DELAY,
+ FIXED_RATE_SCHEDULE_INTERVAL_MILLI_SEC,
+ TimeUnit.MILLISECONDS);
+
+ boolean ifRunMoreThanOnce =
+ downLatch.await(
+ EXCEPT_RUN_TIMES * FIXED_RATE_SCHEDULE_INTERVAL_MILLI_SEC, TimeUnit.MILLISECONDS);
+ assertThat(ifRunMoreThanOnce).isTrue();
+ testExecutor.shutdownNow();
+ }
+}