summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Aistleitner <christian@quelltextlich.at>2020-06-14 22:23:24 +0200
committerKaushik Lingarkar <kaushikl@codeaurora.org>2020-07-10 14:52:46 -0700
commit20784548c3fb643785e1e17acdd90b6410932333 (patch)
tree6db798eda52c7ea0f00ed3a66e4740b443e244c6
parent9407880fa6783a7601b1ee24463ee4081744729c (diff)
When reindexing changes, use multiple threads per project
Each project's changes were reindexed on a single thread. This might leave most threads of the pool idling when reindexing a site with one big and many small projects. In the beginning, all CPUs are busy reindexing projects. But once the small projects have been reindexed, one thread is still working alone on the big project, while the other threads are idle. To avoid this idling we split the big projects into smaller parts and let the thread pool index these parts. Thereby also the reindexing of big projects can take advantage of more CPUs. Change-Id: Ic7b36b5b8badab502370d79085f329f9b8c70d9d (cherry picked from commit 3679947c7f7de39183a0f5b8d2c16d5e6a3cec4e)
-rw-r--r--java/com/google/gerrit/server/index/change/AllChangesIndexer.java128
-rw-r--r--java/com/google/gerrit/server/notedb/ChangeNotes.java37
2 files changed, 128 insertions, 37 deletions
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
index 2f23ad857d..7267ae2916 100644
--- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
+++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
@@ -21,8 +21,8 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.ComparisonChain;
import com.google.common.flogger.FluentLogger;
+import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.gerrit.index.SiteIndexer;
@@ -43,10 +43,9 @@ import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.SortedSet;
-import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
@@ -58,6 +57,7 @@ import org.eclipse.jgit.lib.TextProgressMonitor;
public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, ChangeIndex> {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private static final int PROJECT_SLICE_MAX_REFS = 1000;
private final SchemaFactory<ReviewDb> schemaFactory;
private final ChangeData.Factory changeDataFactory;
@@ -85,22 +85,27 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
this.projectCache = projectCache;
}
- private static class ProjectHolder implements Comparable<ProjectHolder> {
- final Project.NameKey name;
- private final long size;
+ private static class ProjectSlice {
+ private final Project.NameKey name;
+ private final int slice;
+ private final int slices;
- ProjectHolder(Project.NameKey name, long size) {
+ ProjectSlice(Project.NameKey name, int slice, int slices) {
this.name = name;
- this.size = size;
+ this.slice = slice;
+ this.slices = slices;
}
- @Override
- public int compareTo(ProjectHolder other) {
- // Sort projects based on size first to maximize utilization of threads early on.
- return ComparisonChain.start()
- .compare(other.size, size)
- .compare(other.name.get(), name.get())
- .result();
+ public Project.NameKey getName() {
+ return name;
+ }
+
+ public int getSlice() {
+ return slice;
+ }
+
+ public int getSlices() {
+ return slices;
}
}
@@ -108,19 +113,39 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
public Result indexAll(ChangeIndex index) {
ProgressMonitor pm = new TextProgressMonitor();
pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
- SortedSet<ProjectHolder> projects = new TreeSet<>();
+ List<ProjectSlice> projectSlices = new ArrayList<>();
int changeCount = 0;
Stopwatch sw = Stopwatch.createStarted();
int projectsFailed = 0;
for (Project.NameKey name : projectCache.all()) {
try (Repository repo = repoManager.openRepository(name)) {
- long size = estimateSize(repo);
+ // The simplest approach to distribute indexing would be to let each thread grab a project
+ // and index it fully. But if a site has one big project and 100s of small projects, then
+ // in the beginning all CPUs would be busy reindexing projects. But soon enough all small
+ // projects have been reindexed, and only the thread that reindexes the big project is
+ // still working. The other threads would idle. Reindexing the big project on a single
+ // thread becomes the critical path. Bringing in more CPUs would not speed up things.
+ //
+ // To avoid such situations, we split big repos into smaller parts and let
+ // the thread pool index these smaller parts. This splitting introduces an overhead in the
+ // workload setup and there might be additional slow-downs from multiple threads
+ // concurrently working on different parts of the same project. But for Wikimedia's Gerrit,
+ // which had 2 big projects, many middle sized ones, and lots of smaller ones, the
+ // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes
+ // in 2020.
+ int size = estimateSize(repo);
changeCount += size;
- projects.add(new ProjectHolder(name, size));
+ int slices = 1 + size / PROJECT_SLICE_MAX_REFS;
+ if (slices > 1) {
+ verboseWriter.println("Submitting " + name + " for indexing in " + slices + " slices");
+ }
+ for (int slice = 0; slice < slices; slice++) {
+ projectSlices.add(new ProjectSlice(name, slice, slices));
+ }
} catch (IOException e) {
logger.atSevere().withCause(e).log("Error collecting project %s", name);
projectsFailed++;
- if (projectsFailed > projects.size() / 2) {
+ if (projectsFailed > projectCache.all().size() / 2) {
logger.atSevere().log("Over 50%% of the projects could not be collected: aborted");
return new Result(sw, false, 0, 0);
}
@@ -129,24 +154,34 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
}
pm.endTask();
setTotalWork(changeCount);
- return indexAll(index, projects);
+
+ // projectSlices are currently grouped by projects. First all slices for project1, followed
+ // by all slices for project2, and so on. As workers pick tasks sequentially, multiple threads
+ // would typically work concurrently on different slices of the same project. While this is not
+ // a big issue, shuffling the list beforehand helps with ungrouping the project slices, so
+ // different slices are less likely to be worked on concurrently.
+ // This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020.
+ Collections.shuffle(projectSlices);
+ return indexAll(index, projectSlices);
}
- private long estimateSize(Repository repo) throws IOException {
+ private int estimateSize(Repository repo) throws IOException {
// Estimate size based on IDs that show up in ref names. This is not perfect, since patch set
// refs may exist for changes whose metadata was never successfully stored. But that's ok, as
// the estimate is just used as a heuristic for sorting projects.
- return repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES).stream()
- .map(r -> Change.Id.fromRef(r.getName()))
- .filter(Objects::nonNull)
- .distinct()
- .count();
+ long size =
+ repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES).stream()
+ .map(r -> Change.Id.fromRef(r.getName()))
+ .filter(Objects::nonNull)
+ .distinct()
+ .count();
+ return Ints.saturatedCast(size);
}
- private SiteIndexer.Result indexAll(ChangeIndex index, SortedSet<ProjectHolder> projects) {
+ private SiteIndexer.Result indexAll(ChangeIndex index, List<ProjectSlice> projectSlices) {
Stopwatch sw = Stopwatch.createStarted();
MultiProgressMonitor mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
- Task projTask = mpm.beginSubTask("projects", projects.size());
+ Task projTask = mpm.beginSubTask("project-slices", projectSlices.size());
checkState(totalWork >= 0);
Task doneTask = mpm.beginSubTask(null, totalWork);
Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
@@ -154,12 +189,21 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
List<ListenableFuture<?>> futures = new ArrayList<>();
AtomicBoolean ok = new AtomicBoolean(true);
- for (ProjectHolder project : projects) {
+ for (ProjectSlice projectSlice : projectSlices) {
+ Project.NameKey name = projectSlice.getName();
+ int slice = projectSlice.getSlice();
+ int slices = projectSlice.getSlices();
ListenableFuture<?> future =
executor.submit(
reindexProject(
- indexerFactory.create(executor, index), project.name, doneTask, failedTask));
- addErrorListener(future, "project " + project.name, projTask, ok);
+ indexerFactory.create(executor, index),
+ name,
+ slice,
+ slices,
+ doneTask,
+ failedTask));
+ String description = "project " + name + " (" + slice + "/" + slices + ")";
+ addErrorListener(future, description, projTask, ok);
futures.add(future);
}
@@ -194,22 +238,38 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
public Callable<Void> reindexProject(
ChangeIndexer indexer, Project.NameKey project, Task done, Task failed) {
- return new ProjectIndexer(indexer, project, done, failed);
+ return reindexProject(indexer, project, 0, 1, done, failed);
+ }
+
+ public Callable<Void> reindexProject(
+ ChangeIndexer indexer,
+ Project.NameKey project,
+ int slice,
+ int slices,
+ Task done,
+ Task failed) {
+ return new ProjectIndexer(indexer, project, slice, slices, done, failed);
}
private class ProjectIndexer implements Callable<Void> {
private final ChangeIndexer indexer;
private final Project.NameKey project;
+ private final int slice;
+ private final int slices;
private final ProgressMonitor done;
private final ProgressMonitor failed;
private ProjectIndexer(
ChangeIndexer indexer,
Project.NameKey project,
+ int slice,
+ int slices,
ProgressMonitor done,
ProgressMonitor failed) {
this.indexer = indexer;
this.project = project;
+ this.slice = slice;
+ this.slices = slices;
this.done = done;
this.failed = failed;
}
@@ -225,7 +285,9 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
// It does mean that reindexing after invalidating the DiffSummary cache will be expensive,
// but the goal is to invalidate that cache as infrequently as we possibly can. And besides,
// we don't have concrete proof that improving packfile locality would help.
- notesFactory.scan(repo, db, project).forEach(r -> index(db, r));
+ notesFactory
+ .scan(repo, db, project, id -> (id.get() % slices) == slice)
+ .forEach(r -> index(db, r));
} catch (RepositoryNotFoundException rnfe) {
logger.atSevere().log(rnfe.getMessage());
} finally {
diff --git a/java/com/google/gerrit/server/notedb/ChangeNotes.java b/java/com/google/gerrit/server/notedb/ChangeNotes.java
index 086b2e28f2..d548bf3fa9 100644
--- a/java/com/google/gerrit/server/notedb/ChangeNotes.java
+++ b/java/com/google/gerrit/server/notedb/ChangeNotes.java
@@ -305,19 +305,35 @@ public class ChangeNotes extends AbstractChangeNotes<ChangeNotes> {
public Stream<ChangeNotesResult> scan(Repository repo, ReviewDb db, Project.NameKey project)
throws IOException {
- return args.migration.readChanges() ? scanNoteDb(repo, db, project) : scanReviewDb(repo, db);
+ return scan(repo, db, project, null);
}
- private Stream<ChangeNotesResult> scanReviewDb(Repository repo, ReviewDb db)
+ public Stream<ChangeNotesResult> scan(
+ Repository repo,
+ ReviewDb db,
+ Project.NameKey project,
+ Predicate<Change.Id> changeIdPredicate)
throws IOException {
+ return args.migration.readChanges()
+ ? scanNoteDb(repo, db, project, changeIdPredicate)
+ : scanReviewDb(repo, db, changeIdPredicate);
+ }
+
+ private Stream<ChangeNotesResult> scanReviewDb(
+ Repository repo, ReviewDb db, Predicate<Change.Id> changeIdPredicate) throws IOException {
// Scan IDs that might exist in ReviewDb, assuming that each change has at least one patch set
// ref. Not all changes might exist: some patch set refs might have been written where the
// corresponding ReviewDb write failed. These will be silently filtered out by the batch get
// call below, which is intended.
Set<Change.Id> ids = scanChangeIds(repo).fromPatchSetRefs();
+ Stream<Change.Id> idStream = ids.stream();
+ if (changeIdPredicate != null) {
+ idStream = idStream.filter(changeIdPredicate);
+ }
+
// A batch size of N may overload get(Iterable), so use something smaller, but still >1.
- return Streams.stream(Iterators.partition(ids.iterator(), 30))
+ return Streams.stream(Iterators.partition(idStream.iterator(), 30))
.flatMap(
batch -> {
try {
@@ -333,10 +349,23 @@ public class ChangeNotes extends AbstractChangeNotes<ChangeNotes> {
private Stream<ChangeNotesResult> scanNoteDb(
Repository repo, ReviewDb db, Project.NameKey project) throws IOException {
+ return scanNoteDb(repo, db, project, null);
+ }
+
+ private Stream<ChangeNotesResult> scanNoteDb(
+ Repository repo,
+ ReviewDb db,
+ Project.NameKey project,
+ Predicate<Change.Id> changeIdPredicate)
+ throws IOException {
ScanResult sr = scanChangeIds(repo);
PrimaryStorage defaultStorage = args.migration.changePrimaryStorage();
- return sr.all().stream()
+ Stream<Change.Id> idStream = sr.all().stream();
+ if (changeIdPredicate != null) {
+ idStream = idStream.filter(changeIdPredicate);
+ }
+ return idStream
.map(id -> scanOneNoteDbChange(db, project, sr, defaultStorage, id))
.filter(Objects::nonNull);
}