summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLuca Milanesio <luca.milanesio@gmail.com>2023-01-02 17:01:14 +0000
committerLuca Milanesio <luca.milanesio@gmail.com>2023-01-15 12:31:34 +0000
commit82ea7309752a6c250dbd15ac091ddbd341ecee26 (patch)
tree9f0833def4540a8f000bacd8fb2f16e39c1ad93a
parent456a2c2f44ffcd1dc550d119df21b59835d4065f (diff)
Introduce cache.threads option to enable a custom cache executor
Since the introduction of Caffeine as alternative to Guava in Change 244612, the execution of the cache event listeners moved to a background thread, run by the ForkJoinPool's common pool [1]. The subtle difference has caused issues to the plugins that are registering listeners, like the high-availability and multi-site: the consequences have been quite serious because of the inability to understand if the eviction was caused by a forwarded cache eviction event or by an execution of Gerrit API or other internals that caused the removal of the entry. The use of the JVM common pool has several disadvantages and, under certain conditions [2], it may even lead to deadlocks or unexpected blockages. By introducing the cache.threads option, decouple the cache background threads execution and allow to configure an explicit separate thread pool which can be tuned and decoupled from the rest of the JVM common threads. Also, allow to restore the plugins' cache listeners legacy behaviour without losing the ability to leverage the performance of Caffeine cache vs. the traditional Guava. By default, this change is a NO-OP because it preserves the current behaviour of background execution tasks of the Caffeine cache. Introduce DefaultMemoryCacheFactoryTest class from stable-3.4 for avoiding further conflicts when merging upstream. References: [1] https://github.com/ben-manes/caffeine/wiki/Guava#asynchronous-notifications [2] https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool Release-Notes: introduce cache.threads option to allow custom executors for Caffeine caches Bug: Issue 16565 Change-Id: I204abd1bdbf2bbed5b3d982d14cbc5549ac96ace
-rw-r--r--Documentation/config-gerrit.txt13
-rw-r--r--java/com/google/gerrit/server/cache/ForwardingRemovalListener.java4
-rw-r--r--java/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactory.java24
-rw-r--r--javatests/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactoryTest.java263
4 files changed, 302 insertions, 2 deletions
diff --git a/Documentation/config-gerrit.txt b/Documentation/config-gerrit.txt
index 41435e9894..a050021b94 100644
--- a/Documentation/config-gerrit.txt
+++ b/Documentation/config-gerrit.txt
@@ -701,6 +701,19 @@ By default, true.
[[cache]]
=== Section cache
+[[cache.threads]]cache.threads::
++
+Number of threads to use when running asynchronous cache tasks.
+The threads executor is delegated to when sending removal notifications to listeners,
+when asynchronous computations like refresh, refreshAfterWrite are performed, or when
+performing periodic maintenance.
++
+**NOTE**: Setting it to 0 disables the dedicated thread pool and indexing will be done in the
+same thread as the operation. This may result in evictions taking longer because the
+listeners are executed in the caller's thread.
++
+By default, the JVM common ForkJoinPool is used.
+
[[cache.directory]]cache.directory::
+
Path to a local directory where Gerrit can write cached entities for
diff --git a/java/com/google/gerrit/server/cache/ForwardingRemovalListener.java b/java/com/google/gerrit/server/cache/ForwardingRemovalListener.java
index ee672cd0c2..357cbbb545 100644
--- a/java/com/google/gerrit/server/cache/ForwardingRemovalListener.java
+++ b/java/com/google/gerrit/server/cache/ForwardingRemovalListener.java
@@ -14,6 +14,7 @@
package com.google.gerrit.server.cache;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
@@ -40,7 +41,8 @@ public class ForwardingRemovalListener<K, V> implements RemovalListener<K, V> {
private String pluginName = PluginName.GERRIT;
@Inject
- ForwardingRemovalListener(
+ @VisibleForTesting
+ protected ForwardingRemovalListener(
PluginSetContext<CacheRemovalListener> listeners, @Assisted String cacheName) {
this.listeners = listeners;
this.cacheName = cacheName;
diff --git a/java/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactory.java b/java/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactory.java
index 9906b3dc40..afed2f7098 100644
--- a/java/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactory.java
+++ b/java/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactory.java
@@ -27,6 +27,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.server.cache.CacheBackend;
import com.google.gerrit.server.cache.CacheDef;
@@ -34,20 +35,37 @@ import com.google.gerrit.server.cache.ForwardingRemovalListener;
import com.google.gerrit.server.cache.MemoryCacheFactory;
import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import java.time.Duration;
+import java.util.concurrent.Executor;
import org.eclipse.jgit.lib.Config;
class DefaultMemoryCacheFactory implements MemoryCacheFactory {
+ static final String CACHE_EXECUTOR_PREFIX = "InMemoryCacheExecutor";
+ private static final int DEFAULT_CACHE_EXECUTOR_THREADS = -1;
+
private final Config cfg;
private final ForwardingRemovalListener.Factory forwardingRemovalListenerFactory;
+ private int executorThreads;
+ private final Executor executor;
@Inject
DefaultMemoryCacheFactory(
@GerritServerConfig Config config,
- ForwardingRemovalListener.Factory forwardingRemovalListenerFactory) {
+ ForwardingRemovalListener.Factory forwardingRemovalListenerFactory,
+ WorkQueue workQueue) {
this.cfg = config;
this.forwardingRemovalListenerFactory = forwardingRemovalListenerFactory;
+ this.executorThreads = config.getInt("cache", "threads", DEFAULT_CACHE_EXECUTOR_THREADS);
+
+ if (executorThreads == 0) {
+ executor = MoreExecutors.newDirectExecutorService();
+ } else if (executorThreads > DEFAULT_CACHE_EXECUTOR_THREADS) {
+ executor = workQueue.createQueue(executorThreads, CACHE_EXECUTOR_PREFIX);
+ } else {
+ executor = null;
+ }
}
@Override
@@ -114,6 +132,10 @@ class DefaultMemoryCacheFactory implements MemoryCacheFactory {
builder.maximumWeight(
cfg.getLong("cache", def.configKey(), "memoryLimit", def.maximumWeight()));
builder = builder.removalListener(newRemovalListener(def.name()));
+
+ if (executor != null) {
+ builder.executor(executor);
+ }
builder.weigher(newWeigher(def.weigher()));
Duration expireAfterWrite = def.expireAfterWrite();
diff --git a/javatests/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactoryTest.java b/javatests/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactoryTest.java
new file mode 100644
index 0000000000..9e345c0af4
--- /dev/null
+++ b/javatests/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactoryTest.java
@@ -0,0 +1,263 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gerrit.server.cache.mem;
+
+import static com.google.common.base.Functions.identity;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
+import com.google.common.collect.ImmutableMap;
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.server.cache.CacheBackend;
+import com.google.gerrit.server.cache.CacheDef;
+import com.google.gerrit.server.cache.ForwardingRemovalListener;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.util.IdGenerator;
+import com.google.inject.Guice;
+import com.google.inject.TypeLiteral;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.mina.util.ConcurrentHashSet;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DefaultMemoryCacheFactoryTest {
+
+ private static final String TEST_CACHE = "test-cache";
+ private static final long TEST_TIMEOUT_SEC = 1;
+ private static final int TEST_CACHE_KEY = 1;
+ private static final int TEST_CACHE_VALUE = 2;
+
+ private DefaultMemoryCacheFactory memoryCacheFactory;
+ private DefaultMemoryCacheFactory memoryCacheFactoryDirectExecutor;
+ private DefaultMemoryCacheFactory memoryCacheFactoryWithThreadPool;
+ private Config memoryCacheConfig;
+ private Config memoryCacheConfigDirectExecutor;
+ private Config memoryCacheConfigWithThreadPool;
+ private CyclicBarrier cacheGetStarted;
+ private CyclicBarrier cacheGetCompleted;
+ private CyclicBarrier evictionReceived;
+ private ForwardingRemovalTrackerListener forwardingRemovalListener;
+ private WorkQueue workQueue;
+
+ @Before
+ public void setUp() {
+
+ IdGenerator idGenerator = Guice.createInjector().getInstance(IdGenerator.class);
+ workQueue = new WorkQueue(idGenerator, 10, new DisabledMetricMaker());
+
+ memoryCacheConfig = new Config();
+ memoryCacheConfigDirectExecutor = new Config();
+ memoryCacheConfigDirectExecutor.setInt("cache", null, "threads", 0);
+ memoryCacheConfigWithThreadPool = new Config();
+ memoryCacheConfigWithThreadPool.setInt("cache", null, "threads", 1);
+
+ forwardingRemovalListener = new ForwardingRemovalTrackerListener();
+ memoryCacheFactory =
+ new DefaultMemoryCacheFactory(
+ memoryCacheConfig, (cache) -> forwardingRemovalListener, workQueue);
+ memoryCacheFactoryDirectExecutor =
+ new DefaultMemoryCacheFactory(
+ memoryCacheConfigDirectExecutor, (cache) -> forwardingRemovalListener, workQueue);
+ memoryCacheFactoryWithThreadPool =
+ new DefaultMemoryCacheFactory(
+ memoryCacheConfigWithThreadPool, (cache) -> forwardingRemovalListener, workQueue);
+ cacheGetStarted = new CyclicBarrier(2);
+ cacheGetCompleted = new CyclicBarrier(2);
+ evictionReceived = new CyclicBarrier(2);
+ }
+
+ @Test
+ public void shouldRunEvictionListenerInBackgroundByDefault() throws Exception {
+ shouldRunEvictionListenerInThreadPool(memoryCacheFactory, "ForkJoinPool");
+ }
+
+ @Test
+ public void shouldRunEvictionListenerInThreadPool() throws Exception {
+ shouldRunEvictionListenerInThreadPool(
+ memoryCacheFactoryWithThreadPool, DefaultMemoryCacheFactory.CACHE_EXECUTOR_PREFIX);
+ }
+
+ private void shouldRunEvictionListenerInThreadPool(
+ DefaultMemoryCacheFactory cacheFactory, String threadPoolPrefix) throws Exception {
+ LoadingCache<Integer, Integer> cache =
+ cacheFactory.build(newCacheDef(1), newCacheLoader(identity()), CacheBackend.CAFFEINE);
+
+ cache.put(TEST_CACHE_KEY, TEST_CACHE_VALUE);
+ cache.invalidate(TEST_CACHE_KEY);
+
+ assertThat(forwardingRemovalListener.contains(TEST_CACHE_KEY, TEST_CACHE_VALUE)).isFalse();
+
+ evictionReceived.await(TEST_TIMEOUT_SEC, TimeUnit.SECONDS);
+
+ assertThat(forwardingRemovalListener.contains(TEST_CACHE_KEY, TEST_CACHE_VALUE)).isTrue();
+ assertThat(forwardingRemovalListener.removalThreadName(TEST_CACHE_KEY))
+ .startsWith(threadPoolPrefix);
+ }
+
+ @Test
+ public void shouldRunEvictionListenerWithDirectExecutor() throws Exception {
+ LoadingCache<Integer, Integer> cache =
+ memoryCacheFactoryDirectExecutor.build(
+ newCacheDef(1), newCacheLoader(identity()), CacheBackend.CAFFEINE);
+
+ cache.put(TEST_CACHE_KEY, TEST_CACHE_VALUE);
+ cache.invalidate(TEST_CACHE_KEY);
+
+ assertThat(forwardingRemovalListener.contains(TEST_CACHE_KEY, TEST_CACHE_VALUE)).isTrue();
+ }
+
+ @Test
+ public void shouldLoadAllKeysWithDisabledCache() throws Exception {
+ LoadingCache<Integer, Integer> disabledCache =
+ memoryCacheFactory.build(newCacheDef(0), newCacheLoader(identity()), CacheBackend.CAFFEINE);
+
+ List<Integer> keys = Arrays.asList(1, 2);
+ ImmutableMap<Integer, Integer> entries = disabledCache.getAll(keys);
+
+ assertThat(entries).containsExactly(1, 1, 2, 2);
+ }
+
+ private CacheLoader<Integer, Integer> newCacheLoader(Function<Integer, Integer> loadFunc) {
+ return new CacheLoader<Integer, Integer>() {
+
+ @Override
+ public Integer load(Integer n) throws Exception {
+ Integer v = 0;
+ try {
+ cacheGetStarted.await(TEST_TIMEOUT_SEC, TimeUnit.SECONDS);
+ v = loadFunc.apply(n);
+ cacheGetCompleted.await(TEST_TIMEOUT_SEC, TimeUnit.SECONDS);
+ } catch (TimeoutException | BrokenBarrierException e) {
+ }
+ return v;
+ }
+
+ @Override
+ public Map<Integer, Integer> loadAll(Iterable<? extends Integer> keys) throws Exception {
+ return StreamSupport.stream(keys.spliterator(), false)
+ .collect(Collectors.toMap(identity(), identity()));
+ }
+ };
+ }
+
+ private class ForwardingRemovalTrackerListener extends ForwardingRemovalListener<Object, Object> {
+ private final ConcurrentHashMap<Object, Set<Object>> removalEvents;
+ private final ConcurrentHashMap<Object, String> removalThreads;
+
+ public ForwardingRemovalTrackerListener() {
+ super(null, null);
+
+ removalEvents = new ConcurrentHashMap<>();
+ removalThreads = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void onRemoval(RemovalNotification<Object, Object> notification) {
+ Set<Object> setOfValues =
+ removalEvents.computeIfAbsent(
+ notification.getKey(),
+ (key) -> {
+ Set<Object> elements = new ConcurrentHashSet<>();
+ return elements;
+ });
+ setOfValues.add(notification.getValue());
+
+ removalThreads.put(notification.getKey(), Thread.currentThread().getName());
+
+ try {
+ evictionReceived.await(TEST_TIMEOUT_SEC, TimeUnit.SECONDS);
+ } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private boolean contains(Object key, Object value) {
+ return Optional.ofNullable(removalEvents.get(key))
+ .map(sv -> sv.contains(value))
+ .orElse(false);
+ }
+
+ private String removalThreadName(Object key) {
+ return removalThreads.get(key);
+ }
+ }
+
+ private CacheDef<Integer, Integer> newCacheDef(long maximumWeight) {
+ return new CacheDef<Integer, Integer>() {
+
+ @Override
+ public String name() {
+ return TEST_CACHE;
+ }
+
+ @Override
+ public String configKey() {
+ return TEST_CACHE;
+ }
+
+ @Override
+ public TypeLiteral<Integer> keyType() {
+ return null;
+ }
+
+ @Override
+ public TypeLiteral<Integer> valueType() {
+ return null;
+ }
+
+ @Override
+ public long maximumWeight() {
+ return maximumWeight;
+ }
+
+ @Override
+ public Duration expireAfterWrite() {
+ return null;
+ }
+
+ @Override
+ public Duration expireFromMemoryAfterAccess() {
+ return null;
+ }
+
+ @Override
+ public Weigher<Integer, Integer> weigher() {
+ return null;
+ }
+
+ @Override
+ public CacheLoader<Integer, Integer> loader() {
+ return null;
+ }
+ };
+ }
+}