diff options
author | Shawn Pearce <sop@google.com> | 2016-06-20 02:45:00 -0400 |
---|---|---|
committer | Hector Oswaldo Caballero <hector.caballero@ericsson.com> | 2016-09-07 16:38:58 -0400 |
commit | fdfc3738fc32eda68f444ca5b7730786f23008ff (patch) | |
tree | 56ab541d00d0f875d866068d99184d5550c6ef7c | |
parent | 0d8786d34485864d0709b62c8871b022c5b65cf6 (diff) |
Write each Lucene index using a dedicated background thread
Backport of I54296d62fd9206b2ed2bbcbd5bbcc941890206a3 to stable-2.12
Like searches, it is not safe to interrupt the IndexWriter. It also
reads from the NIOFSDirectory which closes file handles if the thread
is interrupted (such as by SSH command being Ctrl-C'd).
Although IndexWriter is thread safe it is essentially single threaded.
Each of these methods acquires a lock on entry, manipulates the index,
and releases the lock. There isn't a lot of value in allowing these to
be running on parallel threads borrowed from Gerrit.
Background (and serialize) all writes onto a single thread to prevent
an interrupt on the application thread from passing into Lucene code.
Change-Id: I2678e1a4c106c027b02fce47d0a570f59f6dde57
-rw-r--r-- | gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java | 94 |
1 files changed, 76 insertions, 18 deletions
diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java index bb69533bf8..84a7bda559 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java @@ -18,7 +18,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gerrit.lucene.LuceneChangeIndex.GerritIndexWriterConfig; @@ -40,8 +44,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Path; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -51,10 +57,13 @@ public class SubIndex { private static final Logger log = LoggerFactory.getLogger(SubIndex.class); private final Directory dir; + private final String dirName; private final TrackingIndexWriter writer; + private final ListeningExecutorService writerThread; private final ReferenceManager<IndexSearcher> searcherManager; private final ControlledRealTimeReopenThread<IndexSearcher> reopenThread; private final Set<NrtFuture> notDoneNrtFutures; + private ScheduledThreadPoolExecutor autoCommitExecutor; SubIndex(Path path, GerritIndexWriterConfig writerConfig, SearcherFactory searcherFactory) throws IOException { @@ -66,6 +75,7 @@ public class SubIndex { GerritIndexWriterConfig writerConfig, SearcherFactory searcherFactory) throws IOException { this.dir = dir; + this.dirName = dirName; IndexWriter delegateWriter; long commitPeriod = writerConfig.getCommitWithinMs(); @@ -79,11 +89,11 @@ public class SubIndex { new AutoCommitWriter(dir, writerConfig.getLuceneConfig()); delegateWriter = autoCommitWriter; - new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() - .setNameFormat("Commit-%d " + dirName) - .setDaemon(true) - .build()) - .scheduleAtFixedRate(new Runnable() { + autoCommitExecutor = + new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() + .setNameFormat("Commit-%d " + dirName) + .setDaemon(true).build()); + autoCommitExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { @@ -111,6 +121,13 @@ public class SubIndex { notDoneNrtFutures = Sets.newConcurrentHashSet(); + writerThread = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder() + .setNameFormat("Write-%d " + dirName) + .setDaemon(true) + .build())); + reopenThread = new ControlledRealTimeReopenThread<>( writer, searcherManager, 0.500 /* maximum stale age (seconds) */, @@ -146,6 +163,20 @@ public class SubIndex { } void close() { + if (autoCommitExecutor != null) { + autoCommitExecutor.shutdown(); + } + + writerThread.shutdown(); + try { + if (!writerThread.awaitTermination(5, TimeUnit.SECONDS)) { + log.warn( + "shutting down {} index with pending Lucene writes", dirName); + } + } catch (InterruptedException e) { + log.warn("interrupted waiting for pending Lucene writes of " + dirName + + " index", e); + } reopenThread.close(); // Closing the reopen thread sets its generation to Long.MAX_VALUE, but we @@ -175,16 +206,45 @@ public class SubIndex { } } - ListenableFuture<?> insert(Document doc) throws IOException { - return new NrtFuture(writer.addDocument(doc)); + ListenableFuture<?> insert(final Document doc) { + return submit(new Callable<Long>() { + @Override + public Long call() throws IOException, InterruptedException { + return writer.addDocument(doc); + } + }); } - ListenableFuture<?> replace(Term term, Document doc) throws IOException { - return new NrtFuture(writer.updateDocument(term, doc)); + ListenableFuture<?> replace(final Term term, final Document doc) { + return submit(new Callable<Long>() { + @Override + public Long call() throws IOException, InterruptedException { + return writer.updateDocument(term, doc); + } + }); } - ListenableFuture<?> delete(Term term) throws IOException { - return new NrtFuture(writer.deleteDocuments(term)); + ListenableFuture<?> delete(final Term term) { + return submit(new Callable<Long>() { + @Override + public Long call() throws IOException, InterruptedException { + return writer.deleteDocuments(term); + } + }); + } + + private ListenableFuture<?> submit(Callable<Long> task) { + ListenableFuture<Long> future = + Futures.nonCancellationPropagating(writerThread.submit(task)); + return Futures.transformAsync(future, new AsyncFunction<Long, Void>() { + @Override + public ListenableFuture<Void> apply(Long gen) throws InterruptedException { + // Tell the reopen thread a future is waiting on this + // generation so it uses the min stale time when refreshing. + reopenThread.waitForGeneration(gen, 0); + return new NrtFuture(gen); + } + }); } void deleteAll() throws IOException { @@ -208,9 +268,6 @@ public class SubIndex { NrtFuture(long gen) { this.gen = gen; - // Tell the reopen thread we are waiting on this generation so it uses the - // min stale time when refreshing. - isGenAvailableNowForCurrentSearcher(); } @Override @@ -226,12 +283,10 @@ public class SubIndex { public Void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { if (!isDone()) { - if (reopenThread.waitForGeneration(gen, - (int) MILLISECONDS.convert(timeout, unit))) { - set(null); - } else { + if (!reopenThread.waitForGeneration(gen, (int) unit.toMillis(timeout))) { throw new TimeoutException(); } + set(null); } return super.get(timeout, unit); } @@ -243,6 +298,9 @@ public class SubIndex { } else if (isGenAvailableNowForCurrentSearcher()) { set(null); return true; + } else if (!reopenThread.isAlive()) { + setException(new IllegalStateException("NRT thread is dead")); + return true; } return false; } |