summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorShawn Pearce <sop@google.com>2016-06-20 02:45:00 -0400
committerHector Oswaldo Caballero <hector.caballero@ericsson.com>2016-09-07 16:38:58 -0400
commitfdfc3738fc32eda68f444ca5b7730786f23008ff (patch)
tree56ab541d00d0f875d866068d99184d5550c6ef7c
parent0d8786d34485864d0709b62c8871b022c5b65cf6 (diff)
Write each Lucene index using a dedicated background thread
Backport of I54296d62fd9206b2ed2bbcbd5bbcc941890206a3 to stable-2.12 Like searches, it is not safe to interrupt the IndexWriter. It also reads from the NIOFSDirectory which closes file handles if the thread is interrupted (such as by SSH command being Ctrl-C'd). Although IndexWriter is thread safe it is essentially single threaded. Each of these methods acquires a lock on entry, manipulates the index, and releases the lock. There isn't a lot of value in allowing these to be running on parallel threads borrowed from Gerrit. Background (and serialize) all writes onto a single thread to prevent an interrupt on the application thread from passing into Lucene code. Change-Id: I2678e1a4c106c027b02fce47d0a570f59f6dde57
-rw-r--r--gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java94
1 files changed, 76 insertions, 18 deletions
diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java
index bb69533bf8..84a7bda559 100644
--- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java
+++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java
@@ -18,7 +18,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.lucene.LuceneChangeIndex.GerritIndexWriterConfig;
@@ -40,8 +44,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -51,10 +57,13 @@ public class SubIndex {
private static final Logger log = LoggerFactory.getLogger(SubIndex.class);
private final Directory dir;
+ private final String dirName;
private final TrackingIndexWriter writer;
+ private final ListeningExecutorService writerThread;
private final ReferenceManager<IndexSearcher> searcherManager;
private final ControlledRealTimeReopenThread<IndexSearcher> reopenThread;
private final Set<NrtFuture> notDoneNrtFutures;
+ private ScheduledThreadPoolExecutor autoCommitExecutor;
SubIndex(Path path, GerritIndexWriterConfig writerConfig,
SearcherFactory searcherFactory) throws IOException {
@@ -66,6 +75,7 @@ public class SubIndex {
GerritIndexWriterConfig writerConfig,
SearcherFactory searcherFactory) throws IOException {
this.dir = dir;
+ this.dirName = dirName;
IndexWriter delegateWriter;
long commitPeriod = writerConfig.getCommitWithinMs();
@@ -79,11 +89,11 @@ public class SubIndex {
new AutoCommitWriter(dir, writerConfig.getLuceneConfig());
delegateWriter = autoCommitWriter;
- new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
- .setNameFormat("Commit-%d " + dirName)
- .setDaemon(true)
- .build())
- .scheduleAtFixedRate(new Runnable() {
+ autoCommitExecutor =
+ new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
+ .setNameFormat("Commit-%d " + dirName)
+ .setDaemon(true).build());
+ autoCommitExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
@@ -111,6 +121,13 @@ public class SubIndex {
notDoneNrtFutures = Sets.newConcurrentHashSet();
+ writerThread = MoreExecutors.listeningDecorator(
+ Executors.newFixedThreadPool(1,
+ new ThreadFactoryBuilder()
+ .setNameFormat("Write-%d " + dirName)
+ .setDaemon(true)
+ .build()));
+
reopenThread = new ControlledRealTimeReopenThread<>(
writer, searcherManager,
0.500 /* maximum stale age (seconds) */,
@@ -146,6 +163,20 @@ public class SubIndex {
}
void close() {
+ if (autoCommitExecutor != null) {
+ autoCommitExecutor.shutdown();
+ }
+
+ writerThread.shutdown();
+ try {
+ if (!writerThread.awaitTermination(5, TimeUnit.SECONDS)) {
+ log.warn(
+ "shutting down {} index with pending Lucene writes", dirName);
+ }
+ } catch (InterruptedException e) {
+ log.warn("interrupted waiting for pending Lucene writes of " + dirName +
+ " index", e);
+ }
reopenThread.close();
// Closing the reopen thread sets its generation to Long.MAX_VALUE, but we
@@ -175,16 +206,45 @@ public class SubIndex {
}
}
- ListenableFuture<?> insert(Document doc) throws IOException {
- return new NrtFuture(writer.addDocument(doc));
+ ListenableFuture<?> insert(final Document doc) {
+ return submit(new Callable<Long>() {
+ @Override
+ public Long call() throws IOException, InterruptedException {
+ return writer.addDocument(doc);
+ }
+ });
}
- ListenableFuture<?> replace(Term term, Document doc) throws IOException {
- return new NrtFuture(writer.updateDocument(term, doc));
+ ListenableFuture<?> replace(final Term term, final Document doc) {
+ return submit(new Callable<Long>() {
+ @Override
+ public Long call() throws IOException, InterruptedException {
+ return writer.updateDocument(term, doc);
+ }
+ });
}
- ListenableFuture<?> delete(Term term) throws IOException {
- return new NrtFuture(writer.deleteDocuments(term));
+ ListenableFuture<?> delete(final Term term) {
+ return submit(new Callable<Long>() {
+ @Override
+ public Long call() throws IOException, InterruptedException {
+ return writer.deleteDocuments(term);
+ }
+ });
+ }
+
+ private ListenableFuture<?> submit(Callable<Long> task) {
+ ListenableFuture<Long> future =
+ Futures.nonCancellationPropagating(writerThread.submit(task));
+ return Futures.transformAsync(future, new AsyncFunction<Long, Void>() {
+ @Override
+ public ListenableFuture<Void> apply(Long gen) throws InterruptedException {
+ // Tell the reopen thread a future is waiting on this
+ // generation so it uses the min stale time when refreshing.
+ reopenThread.waitForGeneration(gen, 0);
+ return new NrtFuture(gen);
+ }
+ });
}
void deleteAll() throws IOException {
@@ -208,9 +268,6 @@ public class SubIndex {
NrtFuture(long gen) {
this.gen = gen;
- // Tell the reopen thread we are waiting on this generation so it uses the
- // min stale time when refreshing.
- isGenAvailableNowForCurrentSearcher();
}
@Override
@@ -226,12 +283,10 @@ public class SubIndex {
public Void get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException {
if (!isDone()) {
- if (reopenThread.waitForGeneration(gen,
- (int) MILLISECONDS.convert(timeout, unit))) {
- set(null);
- } else {
+ if (!reopenThread.waitForGeneration(gen, (int) unit.toMillis(timeout))) {
throw new TimeoutException();
}
+ set(null);
}
return super.get(timeout, unit);
}
@@ -243,6 +298,9 @@ public class SubIndex {
} else if (isGenAvailableNowForCurrentSearcher()) {
set(null);
return true;
+ } else if (!reopenThread.isAlive()) {
+ setException(new IllegalStateException("NRT thread is dead"));
+ return true;
}
return false;
}