diff options
Diffstat (limited to 'gerrit-lucene/src/main/java/com/google/gerrit/lucene/AbstractLuceneIndex.java')
-rw-r--r-- | gerrit-lucene/src/main/java/com/google/gerrit/lucene/AbstractLuceneIndex.java | 419 |
1 files changed, 0 insertions, 419 deletions
diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/AbstractLuceneIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/AbstractLuceneIndex.java deleted file mode 100644 index 9d474dd8ea..0000000000 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/AbstractLuceneIndex.java +++ /dev/null @@ -1,419 +0,0 @@ -// Copyright (C) 2013 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.gerrit.lucene; - -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import com.google.common.base.Joiner; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.gerrit.index.FieldDef; -import com.google.gerrit.index.FieldType; -import com.google.gerrit.index.Index; -import com.google.gerrit.index.Schema; -import com.google.gerrit.index.Schema.Values; -import com.google.gerrit.server.config.SitePaths; -import com.google.gerrit.server.index.IndexUtils; -import java.io.IOException; -import java.sql.Timestamp; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.Field.Store; -import org.apache.lucene.document.IntField; -import org.apache.lucene.document.LongField; -import org.apache.lucene.document.StoredField; -import org.apache.lucene.document.StringField; -import org.apache.lucene.document.TextField; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.Term; -import org.apache.lucene.index.TrackingIndexWriter; -import org.apache.lucene.search.ControlledRealTimeReopenThread; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.ReferenceManager; -import org.apache.lucene.search.ReferenceManager.RefreshListener; -import org.apache.lucene.search.SearcherFactory; -import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.Directory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Basic Lucene index implementation. */ -public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> { - private static final Logger log = LoggerFactory.getLogger(AbstractLuceneIndex.class); - - static String sortFieldName(FieldDef<?, ?> f) { - return f.getName() + "_SORT"; - } - - private final Schema<V> schema; - private final SitePaths sitePaths; - private final Directory dir; - private final String name; - private final ListeningExecutorService writerThread; - private final TrackingIndexWriter writer; - private final ReferenceManager<IndexSearcher> searcherManager; - private final ControlledRealTimeReopenThread<IndexSearcher> reopenThread; - private final Set<NrtFuture> notDoneNrtFutures; - private ScheduledThreadPoolExecutor autoCommitExecutor; - - AbstractLuceneIndex( - Schema<V> schema, - SitePaths sitePaths, - Directory dir, - String name, - String subIndex, - GerritIndexWriterConfig writerConfig, - SearcherFactory searcherFactory) - throws IOException { - this.schema = schema; - this.sitePaths = sitePaths; - this.dir = dir; - this.name = name; - String index = Joiner.on('_').skipNulls().join(name, subIndex); - IndexWriter delegateWriter; - long commitPeriod = writerConfig.getCommitWithinMs(); - - if (commitPeriod < 0) { - delegateWriter = new AutoCommitWriter(dir, writerConfig.getLuceneConfig()); - } else if (commitPeriod == 0) { - delegateWriter = new AutoCommitWriter(dir, writerConfig.getLuceneConfig(), true); - } else { - final AutoCommitWriter autoCommitWriter = - new AutoCommitWriter(dir, writerConfig.getLuceneConfig()); - delegateWriter = autoCommitWriter; - - autoCommitExecutor = - new ScheduledThreadPoolExecutor( - 1, - new ThreadFactoryBuilder() - .setNameFormat(index + " Commit-%d") - .setDaemon(true) - .build()); - @SuppressWarnings("unused") // Error handling within Runnable. - Future<?> possiblyIgnoredError = - autoCommitExecutor.scheduleAtFixedRate( - () -> { - try { - if (autoCommitWriter.hasUncommittedChanges()) { - autoCommitWriter.manualFlush(); - autoCommitWriter.commit(); - } - } catch (IOException e) { - log.error("Error committing " + index + " Lucene index", e); - } catch (OutOfMemoryError e) { - log.error("Error committing " + index + " Lucene index", e); - try { - autoCommitWriter.close(); - } catch (IOException e2) { - log.error( - "SEVERE: Error closing " - + index - + " Lucene index after OOM;" - + " index may be corrupted.", - e); - } - } - }, - commitPeriod, - commitPeriod, - MILLISECONDS); - } - writer = new TrackingIndexWriter(delegateWriter); - searcherManager = new WrappableSearcherManager(writer.getIndexWriter(), true, searcherFactory); - - notDoneNrtFutures = Sets.newConcurrentHashSet(); - - writerThread = - MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool( - 1, - new ThreadFactoryBuilder() - .setNameFormat(index + " Write-%d") - .setDaemon(true) - .build())); - - reopenThread = - new ControlledRealTimeReopenThread<>( - writer, - searcherManager, - 0.500 /* maximum stale age (seconds) */, - 0.010 /* minimum stale age (seconds) */); - reopenThread.setName(index + " NRT"); - reopenThread.setPriority( - Math.min(Thread.currentThread().getPriority() + 2, Thread.MAX_PRIORITY)); - reopenThread.setDaemon(true); - - // This must be added after the reopen thread is created. The reopen thread - // adds its own listener which copies its internally last-refreshed - // generation to the searching generation. removeIfDone() depends on the - // searching generation being up to date when calling - // reopenThread.waitForGeneration(gen, 0), therefore the reopen thread's - // internal listener needs to be called first. - // TODO(dborowitz): This may have been fixed by - // http://issues.apache.org/jira/browse/LUCENE-5461 - searcherManager.addListener( - new RefreshListener() { - @Override - public void beforeRefresh() throws IOException {} - - @Override - public void afterRefresh(boolean didRefresh) throws IOException { - for (NrtFuture f : notDoneNrtFutures) { - f.removeIfDone(); - } - } - }); - - reopenThread.start(); - } - - @Override - public void markReady(boolean ready) throws IOException { - IndexUtils.setReady(sitePaths, name, schema.getVersion(), ready); - } - - @Override - public void close() { - if (autoCommitExecutor != null) { - autoCommitExecutor.shutdown(); - } - - writerThread.shutdown(); - try { - if (!writerThread.awaitTermination(5, TimeUnit.SECONDS)) { - log.warn("shutting down " + name + " index with pending Lucene writes"); - } - } catch (InterruptedException e) { - log.warn("interrupted waiting for pending Lucene writes of " + name + " index", e); - } - reopenThread.close(); - - // Closing the reopen thread sets its generation to Long.MAX_VALUE, but we - // still need to refresh the searcher manager to let pending NrtFutures - // know. - // - // Any futures created after this method (which may happen due to undefined - // shutdown ordering behavior) will finish immediately, even though they may - // not have flushed. - try { - searcherManager.maybeRefreshBlocking(); - } catch (IOException e) { - log.warn("error finishing pending Lucene writes", e); - } - - try { - writer.getIndexWriter().close(); - } catch (AlreadyClosedException e) { - // Ignore. - } catch (IOException e) { - log.warn("error closing Lucene writer", e); - } - try { - dir.close(); - } catch (IOException e) { - log.warn("error closing Lucene directory", e); - } - } - - ListenableFuture<?> insert(Document doc) { - return submit(() -> writer.addDocument(doc)); - } - - ListenableFuture<?> replace(Term term, Document doc) { - return submit(() -> writer.updateDocument(term, doc)); - } - - ListenableFuture<?> delete(Term term) { - return submit(() -> writer.deleteDocuments(term)); - } - - private ListenableFuture<?> submit(Callable<Long> task) { - ListenableFuture<Long> future = Futures.nonCancellationPropagating(writerThread.submit(task)); - return Futures.transformAsync( - future, - gen -> { - // Tell the reopen thread a future is waiting on this - // generation so it uses the min stale time when refreshing. - reopenThread.waitForGeneration(gen, 0); - return new NrtFuture(gen); - }, - directExecutor()); - } - - @Override - public void deleteAll() throws IOException { - writer.deleteAll(); - } - - public TrackingIndexWriter getWriter() { - return writer; - } - - IndexSearcher acquire() throws IOException { - return searcherManager.acquire(); - } - - void release(IndexSearcher searcher) throws IOException { - searcherManager.release(searcher); - } - - Document toDocument(V obj) { - Document result = new Document(); - for (Values<V> vs : schema.buildFields(obj)) { - if (vs.getValues() != null) { - add(result, vs); - } - } - return result; - } - - void add(Document doc, Values<V> values) { - String name = values.getField().getName(); - FieldType<?> type = values.getField().getType(); - Store store = store(values.getField()); - - if (type == FieldType.INTEGER || type == FieldType.INTEGER_RANGE) { - for (Object value : values.getValues()) { - doc.add(new IntField(name, (Integer) value, store)); - } - } else if (type == FieldType.LONG) { - for (Object value : values.getValues()) { - doc.add(new LongField(name, (Long) value, store)); - } - } else if (type == FieldType.TIMESTAMP) { - for (Object value : values.getValues()) { - doc.add(new LongField(name, ((Timestamp) value).getTime(), store)); - } - } else if (type == FieldType.EXACT || type == FieldType.PREFIX) { - for (Object value : values.getValues()) { - doc.add(new StringField(name, (String) value, store)); - } - } else if (type == FieldType.FULL_TEXT) { - for (Object value : values.getValues()) { - doc.add(new TextField(name, (String) value, store)); - } - } else if (type == FieldType.STORED_ONLY) { - for (Object value : values.getValues()) { - doc.add(new StoredField(name, (byte[]) value)); - } - } else { - throw FieldType.badFieldType(type); - } - } - - private static Field.Store store(FieldDef<?, ?> f) { - return f.isStored() ? Field.Store.YES : Field.Store.NO; - } - - private final class NrtFuture extends AbstractFuture<Void> { - private final long gen; - - NrtFuture(long gen) { - this.gen = gen; - } - - @Override - public Void get() throws InterruptedException, ExecutionException { - if (!isDone()) { - reopenThread.waitForGeneration(gen); - set(null); - } - return super.get(); - } - - @Override - public Void get(long timeout, TimeUnit unit) - throws InterruptedException, TimeoutException, ExecutionException { - if (!isDone()) { - if (!reopenThread.waitForGeneration(gen, (int) unit.toMillis(timeout))) { - throw new TimeoutException(); - } - set(null); - } - return super.get(timeout, unit); - } - - @Override - public boolean isDone() { - if (super.isDone()) { - return true; - } else if (isGenAvailableNowForCurrentSearcher()) { - set(null); - return true; - } else if (!reopenThread.isAlive()) { - setException(new IllegalStateException("NRT thread is dead")); - return true; - } - return false; - } - - @Override - public void addListener(Runnable listener, Executor executor) { - if (isGenAvailableNowForCurrentSearcher() && !isCancelled()) { - set(null); - } else if (!isDone()) { - notDoneNrtFutures.add(this); - } - super.addListener(listener, executor); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean result = super.cancel(mayInterruptIfRunning); - if (result) { - notDoneNrtFutures.remove(this); - } - return result; - } - - void removeIfDone() { - if (isGenAvailableNowForCurrentSearcher()) { - notDoneNrtFutures.remove(this); - if (!isCancelled()) { - set(null); - } - } - } - - private boolean isGenAvailableNowForCurrentSearcher() { - try { - return reopenThread.waitForGeneration(gen, 0); - } catch (InterruptedException e) { - log.warn("Interrupted waiting for searcher generation", e); - return false; - } - } - } - - @Override - public Schema<V> getSchema() { - return schema; - } -} |