diff options
Diffstat (limited to 'gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java')
-rw-r--r-- | gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java index 0d3afdeed4..0ac1af2a18 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java @@ -16,24 +16,200 @@ package com.google.gerrit.server.patch; import com.google.gerrit.server.cache.EntryCreator; +import com.google.gerrit.server.config.ConfigUtil; +import com.google.gerrit.server.config.GerritServerConfig; +import com.google.inject.Inject; import org.eclipse.jgit.diff.Edit; import org.eclipse.jgit.diff.MyersDiff; import org.eclipse.jgit.diff.ReplaceEdit; +import org.eclipse.jgit.lib.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; class IntraLineLoader extends EntryCreator<IntraLineDiffKey, IntraLineDiff> { + private static final Logger log = LoggerFactory + .getLogger(IntraLineLoader.class); + private static final Pattern BLANK_LINE_RE = Pattern .compile("^[ \\t]*(|[{}]|/\\*\\*?|\\*)[ \\t]*$"); private static final Pattern CONTROL_BLOCK_START_RE = Pattern .compile("[{:][ \\t]*$"); + private final BlockingQueue<Worker> workerPool; + private final long timeoutMillis; + + @Inject + IntraLineLoader(final @GerritServerConfig Config cfg) { + final int workers = + cfg.getInt("cache", PatchListCacheImpl.INTRA_NAME, "maxIdleWorkers", + Runtime.getRuntime().availableProcessors() * 3 / 2); + workerPool = new ArrayBlockingQueue<Worker>(workers, true /* fair */); + + timeoutMillis = + ConfigUtil.getTimeUnit(cfg, "cache", PatchListCacheImpl.INTRA_NAME, + "timeout", TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS), + TimeUnit.MILLISECONDS); + } + @Override public IntraLineDiff createEntry(IntraLineDiffKey key) throws Exception { + Worker w = workerPool.poll(); + if (w == null) { + w = new Worker(); + } + + Worker.Result r = w.computeWithTimeout(key, timeoutMillis); + + if (r == Worker.Result.TIMEOUT) { + // Don't keep this thread. We have to murder it unsafely, which + // means its unable to be reused in the future. Return back a + // null result, indicating the cache cannot load this key. + // + return new IntraLineDiff(IntraLineDiff.Status.TIMEOUT); + } + + if (!workerPool.offer(w)) { + // If the idle worker pool is full, terminate this thread. + // + w.end(); + } + + if (r.error != null) { + // If there was an error computing the result, carry it + // up to the caller so the cache knows this key is invalid. + // + throw r.error; + } + + return r.diff; + } + + private static class Worker { + private static final AtomicInteger count = new AtomicInteger(1); + + private final ArrayBlockingQueue<Input> input; + private final ArrayBlockingQueue<Result> result; + private final Thread thread; + + Worker() { + input = new ArrayBlockingQueue<Input>(1); + result = new ArrayBlockingQueue<Result>(1); + + thread = new Thread(new Runnable() { + public void run() { + workerLoop(); + } + }); + thread.setName("IntraLineDiff-" + count.getAndIncrement()); + thread.setDaemon(true); + thread.start(); + } + + Result computeWithTimeout(IntraLineDiffKey key, long timeoutMillis) + throws Exception { + if (!input.offer(new Input(key))) { + log.error("Cannot enqueue task to thread " + thread.getName()); + return null; + } + + Result r = result.poll(timeoutMillis, TimeUnit.MILLISECONDS); + if (r != null) { + return r; + } else { + log.warn(timeoutMillis + " ms timeout reached for IntraLineDiff" + + " in project " + key.getProject().get() // + + " on commit " + key.getCommit().name() // + + " for path " + key.getPath() // + + " comparing " + key.getBlobA().name() // + + ".." + key.getBlobB().name() // + + ". Killing " + thread.getName()); + try { + thread.stop(); + } catch (Throwable error) { + // Ignore any reason the thread won't stop. + log.error("Cannot stop runaway thread " + thread.getName(), error); + } + return Result.TIMEOUT; + } + } + + void end() { + if (!input.offer(Input.END_THREAD)) { + log.error("Cannot gracefully stop thread " + thread.getName()); + } + } + + private void workerLoop() { + try { + for (;;) { + Input in; + try { + in = input.take(); + } catch (InterruptedException e) { + log.error("Unexpected interrupt on " + thread.getName()); + continue; + } + + if (in == Input.END_THREAD) { + return; + } + + Result r; + try { + r = new Result(IntraLineLoader.compute(in.key)); + } catch (Exception error) { + r = new Result(error); + } + + if (!result.offer(r)) { + log.error("Cannot return result from " + thread.getName()); + } + } + } catch (ThreadDeath iHaveBeenShot) { + // Handle thread death by gracefully returning to the caller, + // allowing the thread to be destroyed. + } + } + + private static class Input { + static final Input END_THREAD = new Input(null); + + final IntraLineDiffKey key; + + Input(IntraLineDiffKey key) { + this.key = key; + } + } + + static class Result { + static final Result TIMEOUT = new Result((IntraLineDiff) null); + + final IntraLineDiff diff; + final Exception error; + + Result(IntraLineDiff diff) { + this.diff = diff; + this.error = null; + } + + Result(Exception error) { + this.diff = null; + this.error = error; + } + } + } + + private static IntraLineDiff compute(IntraLineDiffKey key) throws Exception { List<Edit> edits = new ArrayList<Edit>(key.getEdits()); Text aContent = key.getTextA(); Text bContent = key.getTextB(); |