diff options
author | Kaushik Lingarkar <kaushikl@codeaurora.org> | 2020-07-17 12:13:33 -0700 |
---|---|---|
committer | Kaushik Lingarkar <kaushikl@codeaurora.org> | 2020-07-17 12:13:33 -0700 |
commit | 8c09beda773527b9ed539ff65f7a51a55ff26982 (patch) | |
tree | 37d51e90cbc660ee894a7bc2e3170372e0950c17 | |
parent | 756cbad22570dd57c7f6a402574ebbb00e66aa0f (diff) | |
parent | 413a4761df8d70f252c332c789f0e572bb55f336 (diff) |
Merge branch 'stable-3.0' into stable-3.1
* stable-3.0:
Update git submodules
When reindexing changes, use multiple threads per project
Change-Id: I9f7ae6cef8010e0ff54e0a806aabe49e940d31ea
-rw-r--r-- | java/com/google/gerrit/server/index/change/AllChangesIndexer.java | 109 | ||||
-rw-r--r-- | java/com/google/gerrit/server/notedb/ChangeNotes.java | 13 |
2 files changed, 94 insertions, 28 deletions
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java index 8434c10109..005f4c5691 100644 --- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java +++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java @@ -21,7 +21,6 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH; import com.google.common.base.Stopwatch; -import com.google.common.collect.ComparisonChain; import com.google.common.flogger.FluentLogger; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.ListenableFuture; @@ -42,10 +41,9 @@ import com.google.gerrit.server.query.change.ChangeData; import com.google.inject.Inject; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; @@ -57,6 +55,7 @@ import org.eclipse.jgit.lib.TextProgressMonitor; public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, ChangeIndex> { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + private static final int PROJECT_SLICE_MAX_REFS = 1000; private final ChangeData.Factory changeDataFactory; private final GitRepositoryManager repoManager; @@ -81,22 +80,27 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change this.projectCache = projectCache; } - private static class ProjectHolder implements Comparable<ProjectHolder> { - final Project.NameKey name; - private final long size; + private static class ProjectSlice { + private final Project.NameKey name; + private final int slice; + private final int slices; - ProjectHolder(Project.NameKey name, long size) { + ProjectSlice(Project.NameKey name, int slice, int slices) { this.name = name; - this.size = size; + this.slice = slice; + this.slices = slices; } - @Override - public int compareTo(ProjectHolder other) { - // Sort projects based on size first to maximize utilization of threads early on. - return ComparisonChain.start() - .compare(other.size, size) - .compare(other.name.get(), name.get()) - .result(); + public Project.NameKey getName() { + return name; + } + + public int getSlice() { + return slice; + } + + public int getSlices() { + return slices; } } @@ -104,19 +108,39 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change public Result indexAll(ChangeIndex index) { ProgressMonitor pm = new TextProgressMonitor(); pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN); - SortedSet<ProjectHolder> projects = new TreeSet<>(); + List<ProjectSlice> projectSlices = new ArrayList<>(); int changeCount = 0; Stopwatch sw = Stopwatch.createStarted(); int projectsFailed = 0; for (Project.NameKey name : projectCache.all()) { try (Repository repo = repoManager.openRepository(name)) { + // The simplest approach to distribute indexing would be to let each thread grab a project + // and index it fully. But if a site has one big project and 100s of small projects, then + // in the beginning all CPUs would be busy reindexing projects. But soon enough all small + // projects have been reindexed, and only the thread that reindexes the big project is + // still working. The other threads would idle. Reindexing the big project on a single + // thread becomes the critical path. Bringing in more CPUs would not speed up things. + // + // To avoid such situations, we split big repos into smaller parts and let + // the thread pool index these smaller parts. This splitting introduces an overhead in the + // workload setup and there might be additional slow-downs from multiple threads + // concurrently working on different parts of the same project. But for Wikimedia's Gerrit, + // which had 2 big projects, many middle sized ones, and lots of smaller ones, the + // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes + // in 2020. int size = estimateSize(repo); changeCount += size; - projects.add(new ProjectHolder(name, size)); + int slices = 1 + size / PROJECT_SLICE_MAX_REFS; + if (slices > 1) { + verboseWriter.println("Submitting " + name + " for indexing in " + slices + " slices"); + } + for (int slice = 0; slice < slices; slice++) { + projectSlices.add(new ProjectSlice(name, slice, slices)); + } } catch (IOException e) { logger.atSevere().withCause(e).log("Error collecting project %s", name); projectsFailed++; - if (projectsFailed > projects.size() / 2) { + if (projectsFailed > projectCache.all().size() / 2) { logger.atSevere().log("Over 50%% of the projects could not be collected: aborted"); return new Result(sw, false, 0, 0); } @@ -125,7 +149,15 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change } pm.endTask(); setTotalWork(changeCount); - return indexAll(index, projects); + + // projectSlices are currently grouped by projects. First all slices for project1, followed + // by all slices for project2, and so on. As workers pick tasks sequentially, multiple threads + // would typically work concurrently on different slices of the same project. While this is not + // a big issue, shuffling the list beforehand helps with ungrouping the project slices, so + // different slices are less likely to be worked on concurrently. + // This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020. + Collections.shuffle(projectSlices); + return indexAll(index, projectSlices); } private int estimateSize(Repository repo) throws IOException { @@ -141,10 +173,10 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change return Ints.saturatedCast(size); } - private SiteIndexer.Result indexAll(ChangeIndex index, SortedSet<ProjectHolder> projects) { + private SiteIndexer.Result indexAll(ChangeIndex index, List<ProjectSlice> projectSlices) { Stopwatch sw = Stopwatch.createStarted(); MultiProgressMonitor mpm = new MultiProgressMonitor(progressOut, "Reindexing changes"); - Task projTask = mpm.beginSubTask("projects", projects.size()); + Task projTask = mpm.beginSubTask("project-slices", projectSlices.size()); checkState(totalWork >= 0); Task doneTask = mpm.beginSubTask(null, totalWork); Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN); @@ -152,12 +184,21 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change List<ListenableFuture<?>> futures = new ArrayList<>(); AtomicBoolean ok = new AtomicBoolean(true); - for (ProjectHolder project : projects) { + for (ProjectSlice projectSlice : projectSlices) { + Project.NameKey name = projectSlice.getName(); + int slice = projectSlice.getSlice(); + int slices = projectSlice.getSlices(); ListenableFuture<?> future = executor.submit( reindexProject( - indexerFactory.create(executor, index), project.name, doneTask, failedTask)); - addErrorListener(future, "project " + project.name, projTask, ok); + indexerFactory.create(executor, index), + name, + slice, + slices, + doneTask, + failedTask)); + String description = "project " + name + " (" + slice + "/" + slices + ")"; + addErrorListener(future, description, projTask, ok); futures.add(future); } @@ -192,22 +233,38 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change public Callable<Void> reindexProject( ChangeIndexer indexer, Project.NameKey project, Task done, Task failed) { - return new ProjectIndexer(indexer, project, done, failed); + return reindexProject(indexer, project, 0, 1, done, failed); + } + + public Callable<Void> reindexProject( + ChangeIndexer indexer, + Project.NameKey project, + int slice, + int slices, + Task done, + Task failed) { + return new ProjectIndexer(indexer, project, slice, slices, done, failed); } private class ProjectIndexer implements Callable<Void> { private final ChangeIndexer indexer; private final Project.NameKey project; + private final int slice; + private final int slices; private final ProgressMonitor done; private final ProgressMonitor failed; private ProjectIndexer( ChangeIndexer indexer, Project.NameKey project, + int slice, + int slices, ProgressMonitor done, ProgressMonitor failed) { this.indexer = indexer; this.project = project; + this.slice = slice; + this.slices = slices; this.done = done; this.failed = failed; } @@ -222,7 +279,7 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change // It does mean that reindexing after invalidating the DiffSummary cache will be expensive, // but the goal is to invalidate that cache as infrequently as we possibly can. And besides, // we don't have concrete proof that improving packfile locality would help. - notesFactory.scan(repo, project).forEach(r -> index(r)); + notesFactory.scan(repo, project, id -> (id.get() % slices) == slice).forEach(r -> index(r)); } catch (RepositoryNotFoundException rnfe) { logger.atSevere().log(rnfe.getMessage()); } finally { diff --git a/java/com/google/gerrit/server/notedb/ChangeNotes.java b/java/com/google/gerrit/server/notedb/ChangeNotes.java index b7ce2a8220..bd673d6dec 100644 --- a/java/com/google/gerrit/server/notedb/ChangeNotes.java +++ b/java/com/google/gerrit/server/notedb/ChangeNotes.java @@ -206,9 +206,18 @@ public class ChangeNotes extends AbstractChangeNotes<ChangeNotes> { public Stream<ChangeNotesResult> scan(Repository repo, Project.NameKey project) throws IOException { - ScanResult sr = scanChangeIds(repo); + return scan(repo, project, null); + } - return sr.all().stream().map(id -> scanOneChange(project, sr, id)).filter(Objects::nonNull); + public Stream<ChangeNotesResult> scan( + Repository repo, Project.NameKey project, Predicate<Change.Id> changeIdPredicate) + throws IOException { + ScanResult sr = scanChangeIds(repo); + Stream<Change.Id> idStream = sr.all().stream(); + if (changeIdPredicate != null) { + idStream = idStream.filter(changeIdPredicate); + } + return idStream.map(id -> scanOneChange(project, sr, id)).filter(Objects::nonNull); } @Nullable |