summaryrefslogtreecommitdiffstats
path: root/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java
diff options
context:
space:
mode:
Diffstat (limited to 'gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java')
-rw-r--r--gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java176
1 files changed, 176 insertions, 0 deletions
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java
index 0d3afdeed4..0ac1af2a18 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java
@@ -16,24 +16,200 @@
package com.google.gerrit.server.patch;
import com.google.gerrit.server.cache.EntryCreator;
+import com.google.gerrit.server.config.ConfigUtil;
+import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.inject.Inject;
import org.eclipse.jgit.diff.Edit;
import org.eclipse.jgit.diff.MyersDiff;
import org.eclipse.jgit.diff.ReplaceEdit;
+import org.eclipse.jgit.lib.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
class IntraLineLoader extends EntryCreator<IntraLineDiffKey, IntraLineDiff> {
+ private static final Logger log = LoggerFactory
+ .getLogger(IntraLineLoader.class);
+
private static final Pattern BLANK_LINE_RE = Pattern
.compile("^[ \\t]*(|[{}]|/\\*\\*?|\\*)[ \\t]*$");
private static final Pattern CONTROL_BLOCK_START_RE = Pattern
.compile("[{:][ \\t]*$");
+ private final BlockingQueue<Worker> workerPool;
+ private final long timeoutMillis;
+
+ @Inject
+ IntraLineLoader(final @GerritServerConfig Config cfg) {
+ final int workers =
+ cfg.getInt("cache", PatchListCacheImpl.INTRA_NAME, "maxIdleWorkers",
+ Runtime.getRuntime().availableProcessors() * 3 / 2);
+ workerPool = new ArrayBlockingQueue<Worker>(workers, true /* fair */);
+
+ timeoutMillis =
+ ConfigUtil.getTimeUnit(cfg, "cache", PatchListCacheImpl.INTRA_NAME,
+ "timeout", TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS),
+ TimeUnit.MILLISECONDS);
+ }
+
@Override
public IntraLineDiff createEntry(IntraLineDiffKey key) throws Exception {
+ Worker w = workerPool.poll();
+ if (w == null) {
+ w = new Worker();
+ }
+
+ Worker.Result r = w.computeWithTimeout(key, timeoutMillis);
+
+ if (r == Worker.Result.TIMEOUT) {
+ // Don't keep this thread. We have to murder it unsafely, which
+ // means its unable to be reused in the future. Return back a
+ // null result, indicating the cache cannot load this key.
+ //
+ return new IntraLineDiff(IntraLineDiff.Status.TIMEOUT);
+ }
+
+ if (!workerPool.offer(w)) {
+ // If the idle worker pool is full, terminate this thread.
+ //
+ w.end();
+ }
+
+ if (r.error != null) {
+ // If there was an error computing the result, carry it
+ // up to the caller so the cache knows this key is invalid.
+ //
+ throw r.error;
+ }
+
+ return r.diff;
+ }
+
+ private static class Worker {
+ private static final AtomicInteger count = new AtomicInteger(1);
+
+ private final ArrayBlockingQueue<Input> input;
+ private final ArrayBlockingQueue<Result> result;
+ private final Thread thread;
+
+ Worker() {
+ input = new ArrayBlockingQueue<Input>(1);
+ result = new ArrayBlockingQueue<Result>(1);
+
+ thread = new Thread(new Runnable() {
+ public void run() {
+ workerLoop();
+ }
+ });
+ thread.setName("IntraLineDiff-" + count.getAndIncrement());
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ Result computeWithTimeout(IntraLineDiffKey key, long timeoutMillis)
+ throws Exception {
+ if (!input.offer(new Input(key))) {
+ log.error("Cannot enqueue task to thread " + thread.getName());
+ return null;
+ }
+
+ Result r = result.poll(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (r != null) {
+ return r;
+ } else {
+ log.warn(timeoutMillis + " ms timeout reached for IntraLineDiff"
+ + " in project " + key.getProject().get() //
+ + " on commit " + key.getCommit().name() //
+ + " for path " + key.getPath() //
+ + " comparing " + key.getBlobA().name() //
+ + ".." + key.getBlobB().name() //
+ + ". Killing " + thread.getName());
+ try {
+ thread.stop();
+ } catch (Throwable error) {
+ // Ignore any reason the thread won't stop.
+ log.error("Cannot stop runaway thread " + thread.getName(), error);
+ }
+ return Result.TIMEOUT;
+ }
+ }
+
+ void end() {
+ if (!input.offer(Input.END_THREAD)) {
+ log.error("Cannot gracefully stop thread " + thread.getName());
+ }
+ }
+
+ private void workerLoop() {
+ try {
+ for (;;) {
+ Input in;
+ try {
+ in = input.take();
+ } catch (InterruptedException e) {
+ log.error("Unexpected interrupt on " + thread.getName());
+ continue;
+ }
+
+ if (in == Input.END_THREAD) {
+ return;
+ }
+
+ Result r;
+ try {
+ r = new Result(IntraLineLoader.compute(in.key));
+ } catch (Exception error) {
+ r = new Result(error);
+ }
+
+ if (!result.offer(r)) {
+ log.error("Cannot return result from " + thread.getName());
+ }
+ }
+ } catch (ThreadDeath iHaveBeenShot) {
+ // Handle thread death by gracefully returning to the caller,
+ // allowing the thread to be destroyed.
+ }
+ }
+
+ private static class Input {
+ static final Input END_THREAD = new Input(null);
+
+ final IntraLineDiffKey key;
+
+ Input(IntraLineDiffKey key) {
+ this.key = key;
+ }
+ }
+
+ static class Result {
+ static final Result TIMEOUT = new Result((IntraLineDiff) null);
+
+ final IntraLineDiff diff;
+ final Exception error;
+
+ Result(IntraLineDiff diff) {
+ this.diff = diff;
+ this.error = null;
+ }
+
+ Result(Exception error) {
+ this.diff = null;
+ this.error = error;
+ }
+ }
+ }
+
+ private static IntraLineDiff compute(IntraLineDiffKey key) throws Exception {
List<Edit> edits = new ArrayList<Edit>(key.getEdits());
Text aContent = key.getTextA();
Text bContent = key.getTextB();