summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKaushik Lingarkar <kaushikl@codeaurora.org>2021-12-02 11:33:42 -0800
committerKaushik Lingarkar <kaushikl@codeaurora.org>2021-12-16 10:32:48 -0800
commit9878980fd69b8e2d61d84066ab2556beba68d96f (patch)
tree106904431a1ce8cea5e3b90b9595b5da5decb612
parent739555e5e9daccba9d0352830fbe78820e55420d (diff)
AllChangesIndexer: Schedule slices immediately
If a site has one huge repository and several small/medium sized repos, then slice creation for the large repo will effectively block other smaller repos from starting to reindex their changes. With this change, we schedule slices without waiting for any other slice creation. With this change, we can no longer shuffle slices. However, there is some randomness to how slices are processed based on the thread count. Progress monitors are also updated to report progress despite not knowing the total work. The monitors add a '+' to the stats when the total work is not finalized. For example: project-slices: 32% (2167/6649+) changes: 18% (591857/3234259+) On large test-sites with ~3.5m changes and ~15k projects repos on NFS, with loose refs equal to number of changes(created by notedb migration) and with caches (change_kind, diff, diff_summary) populated, this change brings down reindex time from ~165mins to ~125mins. Change-Id: If3187ed9c9953177c270761da243b186627d8638
-rw-r--r--java/com/google/gerrit/server/git/MultiProgressMonitor.java78
-rw-r--r--java/com/google/gerrit/server/index/change/AllChangesIndexer.java136
2 files changed, 144 insertions, 70 deletions
diff --git a/java/com/google/gerrit/server/git/MultiProgressMonitor.java b/java/com/google/gerrit/server/git/MultiProgressMonitor.java
index 5cdd504100..7e5c99fc86 100644
--- a/java/com/google/gerrit/server/git/MultiProgressMonitor.java
+++ b/java/com/google/gerrit/server/git/MultiProgressMonitor.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ProgressMonitor;
@@ -123,6 +125,64 @@ public class MultiProgressMonitor {
return count;
}
}
+
+ public int getTotal() {
+ return total;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getTotalDisplay(int total) {
+ return String.valueOf(total);
+ }
+ }
+
+ /** Handle for a sub-task whose total work can be updated while the task is in progress. */
+ public class VolatileTask extends Task {
+ protected AtomicInteger volatileTotal;
+ protected AtomicBoolean isTotalFinalized = new AtomicBoolean(false);
+
+ public VolatileTask(String subTaskName) {
+ super(subTaskName, UNKNOWN);
+ volatileTotal = new AtomicInteger(UNKNOWN);
+ }
+
+ /**
+ * Update the total work for this sub-task.
+ *
+ * <p>Intended to be called from a worker thread.
+ *
+ * @param workUnits number of work units to be added to existing total work.
+ */
+ public void updateTotal(int workUnits) {
+ if (!isTotalFinalized.get()) {
+ volatileTotal.addAndGet(workUnits);
+ } else {
+ logger.atWarning().log(
+ "Total work has been finalized on sub-task " + getName() + " and cannot be updated");
+ }
+ }
+
+ /**
+ * Mark the total on this sub-task as unmodifiable.
+ *
+ * <p>Intended to be called from a worker thread.
+ */
+ public void finalizeTotal() {
+ isTotalFinalized.set(true);
+ }
+
+ @Override
+ public int getTotal() {
+ return volatileTotal.get();
+ }
+
+ @Override
+ public String getTotalDisplay(int total) {
+ return super.getTotalDisplay(total) + (isTotalFinalized.get() ? "" : "+");
+ }
}
private final OutputStream out;
@@ -305,6 +365,18 @@ public class MultiProgressMonitor {
}
/**
+ * Begin a sub-task whose total work can be updated.
+ *
+ * @param subTask sub-task name.
+ * @return sub-task handle.
+ */
+ public VolatileTask beginVolatileSubTask(String subTask) {
+ VolatileTask task = new VolatileTask(subTask);
+ tasks.add(task);
+ return task;
+ }
+
+ /**
* End the overall task.
*
* <p>Must be called from a worker thread.
@@ -347,6 +419,7 @@ public class MultiProgressMonitor {
boolean first = true;
for (Task t : tasks) {
int count = t.getCount();
+ int total = t.getTotal();
if (count == 0) {
continue;
}
@@ -361,10 +434,11 @@ public class MultiProgressMonitor {
if (!Strings.isNullOrEmpty(t.name)) {
s.append(t.name).append(": ");
}
- if (t.total == UNKNOWN) {
+ if (total == UNKNOWN) {
s.append(count);
} else {
- s.append(String.format("%d%% (%d/%d)", count * 100 / t.total, count, t.total));
+ s.append(
+ String.format("%d%% (%d/%s)", count * 100 / total, count, t.getTotalDisplay(total)));
}
}
}
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
index f466ad6e4d..d6b8ef9fc7 100644
--- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
+++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
@@ -14,7 +14,6 @@
package com.google.gerrit.server.index.change;
-import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.successfulAsList;
import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
@@ -22,9 +21,8 @@ import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
import com.google.auto.value.AutoValue;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
+import com.google.common.collect.ImmutableSortedSet;
import com.google.common.flogger.FluentLogger;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.UncheckedExecutionException;
@@ -34,6 +32,7 @@ import com.google.gerrit.index.SiteIndexer;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
+import com.google.gerrit.server.git.MultiProgressMonitor.VolatileTask;
import com.google.gerrit.server.index.IndexExecutor;
import com.google.gerrit.server.index.OnlineReindexMode;
import com.google.gerrit.server.notedb.ChangeNotes;
@@ -44,18 +43,13 @@ import com.google.gerrit.server.query.change.ChangeData;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.lib.TextProgressMonitor;
/**
* Implementation that can index all changes on a host or within a project. Used by Gerrit's
@@ -64,6 +58,9 @@ import org.eclipse.jgit.lib.TextProgressMonitor;
*/
public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, ChangeIndex> {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private MultiProgressMonitor mpm;
+ private VolatileTask doneTask;
+ private Task failedTask;
private static final int PROJECT_SLICE_MAX_REFS = 1000;
private static class ProjectsCollectionFailure extends Exception {
@@ -130,55 +127,18 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
// in 2020.
Stopwatch sw = Stopwatch.createStarted();
- List<ProjectSlice> projectSlices;
+ AtomicBoolean ok = new AtomicBoolean(true);
+ mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
+ doneTask = mpm.beginVolatileSubTask("changes");
+ failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
+ List<ListenableFuture<?>> futures;
try {
- projectSlices = new SliceCreator().create();
- } catch (ProjectsCollectionFailure | InterruptedException | ExecutionException e) {
+ futures = new SliceScheduler(index, ok).schedule();
+ } catch (ProjectsCollectionFailure e) {
logger.atSevere().log(e.getMessage());
return Result.create(sw, false, 0, 0);
}
- // Since project slices are created in parallel, they are somewhat shuffled already. However,
- // the number of threads used to create the project slices doesn't guarantee good randomization.
- // If the slices are not shuffled well, then multiple threads would typically work concurrently
- // on different slices of the same project. While this is not a big issue, shuffling the list
- // beforehand helps with ungrouping the project slices, so different slices are less likely to
- // be worked on concurrently.
- // This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020.
- Collections.shuffle(projectSlices);
- return indexAll(index, projectSlices);
- }
-
- private SiteIndexer.Result indexAll(ChangeIndex index, List<ProjectSlice> projectSlices) {
- Stopwatch sw = Stopwatch.createStarted();
- MultiProgressMonitor mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
- Task projTask = mpm.beginSubTask("project-slices", projectSlices.size());
- checkState(totalWork >= 0);
- Task doneTask = mpm.beginSubTask(null, totalWork);
- Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
-
- List<ListenableFuture<?>> futures = new ArrayList<>();
- AtomicBoolean ok = new AtomicBoolean(true);
-
- for (ProjectSlice projectSlice : projectSlices) {
- Project.NameKey name = projectSlice.name();
- int slice = projectSlice.slice();
- int slices = projectSlice.slices();
- ListenableFuture<?> future =
- executor.submit(
- reindexProject(
- indexerFactory.create(executor, index),
- name,
- slice,
- slices,
- projectSlice.scanResult(),
- doneTask,
- failedTask));
- String description = "project " + name + " (" + slice + "/" + slices + ")";
- addErrorListener(future, description, projTask, ok);
- futures.add(future);
- }
-
try {
mpm.waitFor(
transform(
@@ -308,30 +268,53 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
}
}
- private class SliceCreator {
- final Set<ProjectSlice> projectSlices = Sets.newConcurrentHashSet();
+ private class SliceScheduler {
+ final ChangeIndex index;
+ final AtomicBoolean ok;
final AtomicInteger changeCount = new AtomicInteger(0);
final AtomicInteger projectsFailed = new AtomicInteger(0);
- final ProgressMonitor pm = new TextProgressMonitor();
-
- private List<ProjectSlice> create()
- throws ProjectsCollectionFailure, InterruptedException, ExecutionException {
- List<ListenableFuture<?>> futures = new ArrayList<>();
- pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
- for (Project.NameKey name : projectCache.all()) {
- futures.add(executor.submit(new ProjectSliceCreator(name)));
+ final List<ListenableFuture<?>> sliceIndexerFutures = new ArrayList<>();
+ final List<ListenableFuture<?>> sliceCreationFutures = new ArrayList<>();
+ VolatileTask projTask = mpm.beginVolatileSubTask("project-slices");
+ Task slicingProjects;
+
+ public SliceScheduler(ChangeIndex index, AtomicBoolean ok) {
+ this.index = index;
+ this.ok = ok;
+ }
+
+ private List<ListenableFuture<?>> schedule() throws ProjectsCollectionFailure {
+ ImmutableSortedSet<Project.NameKey> projects = projectCache.all();
+ int projectCount = projects.size();
+ slicingProjects = mpm.beginSubTask("Slicing projects", projectCount);
+ for (Project.NameKey name : projects) {
+ sliceCreationFutures.add(executor.submit(new ProjectSliceCreator(name)));
}
- Futures.allAsList(futures).get();
+ try {
+ mpm.waitForNonFinalTask(
+ transform(
+ successfulAsList(sliceCreationFutures),
+ x -> {
+ projTask.finalizeTotal();
+ doneTask.finalizeTotal();
+ return null;
+ },
+ directExecutor()));
+ } catch (UncheckedExecutionException e) {
+ logger.atSevere().withCause(e).log("Error project slice creation");
+ ok.set(false);
+ }
- if (projectsFailed.get() > projectCache.all().size() / 2) {
+ if (projectsFailed.get() > projectCount / 2) {
throw new ProjectsCollectionFailure(
"Over 50%% of the projects could not be collected: aborted");
}
- pm.endTask();
+ slicingProjects.endTask();
setTotalWork(changeCount.get());
- return projectSlices.stream().collect(Collectors.toList());
+
+ return sliceIndexerFutures;
}
private class ProjectSliceCreator implements Callable<Void> {
@@ -353,15 +336,32 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
verboseWriter.println(
"Submitting " + name + " for indexing in " + slices + " slices");
}
+
+ doneTask.updateTotal(size);
+ projTask.updateTotal(slices);
+
for (int slice = 0; slice < slices; slice++) {
- projectSlices.add(ProjectSlice.create(name, slice, slices, sr));
+ ProjectSlice projectSlice = ProjectSlice.create(name, slice, slices, sr);
+ ListenableFuture<?> future =
+ executor.submit(
+ reindexProject(
+ indexerFactory.create(executor, index),
+ name,
+ slice,
+ slices,
+ projectSlice.scanResult(),
+ doneTask,
+ failedTask));
+ String description = "project " + name + " (" + slice + "/" + slices + ")";
+ addErrorListener(future, description, projTask, ok);
+ sliceIndexerFutures.add(future);
}
}
} catch (IOException e) {
logger.atSevere().withCause(e).log("Error collecting project %s", name);
projectsFailed.incrementAndGet();
}
- pm.update(1);
+ slicingProjects.update(1);
return null;
}
}