diff options
author | Kaushik Lingarkar <kaushikl@codeaurora.org> | 2021-11-10 11:58:13 -0800 |
---|---|---|
committer | Kaushik Lingarkar <kaushikl@codeaurora.org> | 2021-11-16 10:41:38 -0800 |
commit | 13fdc81e113f8d3491275e652a542d96cd6dcf1a (patch) | |
tree | ca9b333899f2b34e8baef926590ce541197f9fcc | |
parent | 35fcc7c5373286596fb4e117157777e2d14f5850 (diff) |
AllChangesIndexer: Parallelize project slice creation
Slice creation can become an overhead on NFS based repositories with
many loose refs. Parallelizing it will help bring down the overall
reindex time.
On a test-site with ~160k changes across 4 repos on NFS, with loose
refs equal to the number of changes (created by notedb migration) and
with caches (change_kind, diff, diff_summary) populated, this change
helps bring down the reindex time from ~20mins to ~18mins. Also, if
the repos are repacked, this change does not degrade the performance.
On larger test-sites (~3.5m changes and ~15k projects) with similar
setup and state as the 160k site, this change brings down reindex
time from ~6.5hrs to ~2hrs.
Change-Id: Ie3972a84838f1fb17b66504b933676152294bd7b
-rw-r--r-- | java/com/google/gerrit/server/index/change/AllChangesIndexer.java | 207 |
1 files changed, 122 insertions, 85 deletions
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java index 4e403cbfd2..494aa842d5 100644 --- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java +++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java @@ -20,8 +20,11 @@ import static com.google.common.util.concurrent.Futures.transform; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH; +import com.google.auto.value.AutoValue; import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; import com.google.common.flogger.FluentLogger; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.UncheckedExecutionException; @@ -43,9 +46,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.eclipse.jgit.lib.ProgressMonitor; import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.lib.TextProgressMonitor; @@ -59,6 +66,14 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final int PROJECT_SLICE_MAX_REFS = 1000; + private static class ProjectsCollectionFailure extends Exception { + private static final long serialVersionUID = 1L; + + public ProjectsCollectionFailure(String message) { + super(message); + } + } + private final 
ChangeData.Factory changeDataFactory; private final GitRepositoryManager repoManager; private final ListeningExecutorService executor; @@ -82,92 +97,53 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change this.projectCache = projectCache; } - private static class ProjectSlice { - private final Project.NameKey name; - private final int slice; - private final int slices; - private final ScanResult sr; + @AutoValue + public abstract static class ProjectSlice { + public abstract Project.NameKey name(); - ProjectSlice(Project.NameKey name, int slice, int slices, ScanResult sr) { - this.name = name; - this.slice = slice; - this.slices = slices; - this.sr = sr; - } + public abstract int slice(); - public Project.NameKey getName() { - return name; - } + public abstract int slices(); - public int getSlice() { - return slice; - } - - public int getSlices() { - return slices; - } + public abstract ScanResult scanResult(); - public ScanResult getScanResult() { - return sr; + private static ProjectSlice create(Project.NameKey name, int slice, int slices, ScanResult sr) { + return new AutoValue_AllChangesIndexer_ProjectSlice(name, slice, slices, sr); } } @Override public Result indexAll(ChangeIndex index) { - ProgressMonitor pm = new TextProgressMonitor(); - pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN); - List<ProjectSlice> projectSlices = new ArrayList<>(); - int changeCount = 0; + // The simplest approach to distribute indexing would be to let each thread grab a project + // and index it fully. But if a site has one big project and 100s of small projects, then + // in the beginning all CPUs would be busy reindexing projects. But soon enough all small + // projects have been reindexed, and only the thread that reindexes the big project is + // still working. The other threads would idle. Reindexing the big project on a single + // thread becomes the critical path. Bringing in more CPUs would not speed up things. 
+ // + // To avoid such situations, we split big repos into smaller parts and let + // the thread pool index these smaller parts. This splitting introduces an overhead in the + // workload setup and there might be additional slow-downs from multiple threads + // concurrently working on different parts of the same project. But for Wikimedia's Gerrit, + // which had 2 big projects, many middle sized ones, and lots of smaller ones, the + // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes + // in 2020. + Stopwatch sw = Stopwatch.createStarted(); - int projectsFailed = 0; - for (Project.NameKey name : projectCache.all()) { - try (Repository repo = repoManager.openRepository(name)) { - // The simplest approach to distribute indexing would be to let each thread grab a project - // and index it fully. But if a site has one big project and 100s of small projects, then - // in the beginning all CPUs would be busy reindexing projects. But soon enough all small - // projects have been reindexed, and only the thread that reindexes the big project is - // still working. The other threads would idle. Reindexing the big project on a single - // thread becomes the critical path. Bringing in more CPUs would not speed up things. - // - // To avoid such situations, we split big repos into smaller parts and let - // the thread pool index these smaller parts. This splitting introduces an overhead in the - // workload setup and there might be additional slow-downs from multiple threads - // concurrently working on different parts of the same project. But for Wikimedia's Gerrit, - // which had 2 big projects, many middle sized ones, and lots of smaller ones, the - // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes - // in 2020. 
- ScanResult sr = ChangeNotes.Factory.scanChangeIds(repo); - int size = sr.all().size(); - if (size == 0) { - pm.update(1); - continue; - } - changeCount += size; - int slices = 1 + size / PROJECT_SLICE_MAX_REFS; - if (slices > 1) { - verboseWriter.println("Submitting " + name + " for indexing in " + slices + " slices"); - } - for (int slice = 0; slice < slices; slice++) { - projectSlices.add(new ProjectSlice(name, slice, slices, sr)); - } - } catch (IOException e) { - logger.atSevere().withCause(e).log("Error collecting project %s", name); - projectsFailed++; - if (projectsFailed > projectCache.all().size() / 2) { - logger.atSevere().log("Over 50%% of the projects could not be collected: aborted"); - return Result.create(sw, false, 0, 0); - } - } - pm.update(1); + List<ProjectSlice> projectSlices; + try { + projectSlices = new SliceCreator().create(); + } catch (ProjectsCollectionFailure | InterruptedException | ExecutionException e) { + logger.atSevere().log(e.getMessage()); + return Result.create(sw, false, 0, 0); } - pm.endTask(); - setTotalWork(changeCount); - - // projectSlices are currently grouped by projects. First all slices for project1, followed - // by all slices for project2, and so on. As workers pick tasks sequentially, multiple threads - // would typically work concurrently on different slices of the same project. While this is not - // a big issue, shuffling the list beforehand helps with ungrouping the project slices, so - // different slices are less likely to be worked on concurrently. + + // Since project slices are created in parallel, they are somewhat shuffled already. However, + // the number of threads used to create the project slices doesn't guarantee good randomization. + // If the slices are not shuffled well, then multiple threads would typically work concurrently + // on different slices of the same project. 
While this is not a big issue, shuffling the list + // beforehand helps with ungrouping the project slices, so different slices are less likely to + // be worked on concurrently. // This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020. Collections.shuffle(projectSlices); return indexAll(index, projectSlices); @@ -185,9 +161,9 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change AtomicBoolean ok = new AtomicBoolean(true); for (ProjectSlice projectSlice : projectSlices) { - Project.NameKey name = projectSlice.getName(); - int slice = projectSlice.getSlice(); - int slices = projectSlice.getSlices(); + Project.NameKey name = projectSlice.name(); + int slice = projectSlice.slice(); + int slices = projectSlice.slices(); ListenableFuture<?> future = executor.submit( reindexProject( @@ -195,7 +171,7 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change name, slice, slices, - projectSlice.getScanResult(), + projectSlice.scanResult(), doneTask, failedTask)); String description = "project " + name + " (" + slice + "/" + slices + ")"; @@ -248,10 +224,10 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change Project.NameKey project, int slice, int slices, - ScanResult sr, + ScanResult scanResult, Task done, Task failed) { - return new ProjectIndexer(indexer, project, slice, slices, sr, done, failed); + return new ProjectIndexer(indexer, project, slice, slices, scanResult, done, failed); } private class ProjectIndexer implements Callable<Void> { @@ -259,7 +235,7 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change private final Project.NameKey project; private final int slice; private final int slices; - private final ScanResult sr; + private final ScanResult scanResult; private final ProgressMonitor done; private final ProgressMonitor failed; @@ -268,14 +244,14 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, 
Change Project.NameKey project, int slice, int slices, - ScanResult sr, + ScanResult scanResult, ProgressMonitor done, ProgressMonitor failed) { this.indexer = indexer; this.project = project; this.slice = slice; this.slices = slices; - this.sr = sr; + this.scanResult = scanResult; this.done = done; this.failed = failed; } @@ -288,7 +264,9 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change // It does mean that reindexing after invalidating the DiffSummary cache will be expensive, // but the goal is to invalidate that cache as infrequently as we possibly can. And besides, // we don't have concrete proof that improving packfile locality would help. - notesFactory.scan(sr, project, id -> (id.get() % slices) == slice).forEach(r -> index(r)); + notesFactory + .scan(scanResult, project, id -> (id.get() % slices) == slice) + .forEach(r -> index(r)); OnlineReindexMode.end(); return null; } @@ -329,4 +307,63 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change return "Index all changes of project " + project.get(); } } + + private class SliceCreator { + final Set<ProjectSlice> projectSlices = Sets.newConcurrentHashSet(); + final AtomicInteger changeCount = new AtomicInteger(0); + final AtomicInteger projectsFailed = new AtomicInteger(0); + final ProgressMonitor pm = new TextProgressMonitor(); + + private List<ProjectSlice> create() + throws ProjectsCollectionFailure, InterruptedException, ExecutionException { + List<ListenableFuture<?>> futures = new ArrayList<>(); + pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN); + for (Project.NameKey name : projectCache.all()) { + futures.add(executor.submit(new ProjectSliceCreator(name))); + } + + Futures.allAsList(futures).get(); + + if (projectsFailed.get() > projectCache.all().size() / 2) { + throw new ProjectsCollectionFailure( + "Over 50%% of the projects could not be collected: aborted"); + } + + pm.endTask(); + setTotalWork(changeCount.get()); + return 
projectSlices.stream().collect(Collectors.toList()); + } + + private class ProjectSliceCreator implements Callable<Void> { + final Project.NameKey name; + + public ProjectSliceCreator(Project.NameKey name) { + this.name = name; + } + + @Override + public Void call() throws IOException { + try (Repository repo = repoManager.openRepository(name)) { + ScanResult sr = ChangeNotes.Factory.scanChangeIds(repo); + int size = sr.all().size(); + if (size > 0) { + changeCount.addAndGet(size); + int slices = 1 + size / PROJECT_SLICE_MAX_REFS; + if (slices > 1) { + verboseWriter.println( + "Submitting " + name + " for indexing in " + slices + " slices"); + } + for (int slice = 0; slice < slices; slice++) { + projectSlices.add(ProjectSlice.create(name, slice, slices, sr)); + } + } + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error collecting project %s", name); + projectsFailed.incrementAndGet(); + } + pm.update(1); + return null; + } + } + } } |