author    Kaushik Lingarkar <kaushikl@codeaurora.org>  2021-11-10 11:58:13 -0800
committer Kaushik Lingarkar <kaushikl@codeaurora.org>  2021-11-16 10:41:38 -0800
commit    13fdc81e113f8d3491275e652a542d96cd6dcf1a (patch)
tree      ca9b333899f2b34e8baef926590ce541197f9fcc
parent    35fcc7c5373286596fb4e117157777e2d14f5850 (diff)
AllChangesIndexer: Parallelize project slice creation
Slice creation can become an overhead on NFS-based repositories with many loose refs. Parallelizing it helps bring down the overall reindex time.

On a test site with ~160k changes across 4 repos on NFS, with loose refs equal to the number of changes (created by the NoteDb migration) and with the change_kind, diff, and diff_summary caches populated, this change brings the reindex time down from ~20mins to ~18mins. If the repos are repacked, this change does not degrade performance. On larger test sites (~3.5m changes and ~15k projects) with a similar setup and state as the 160k site, this change brings the reindex time down from ~6.5hrs to ~2hrs.

Change-Id: Ie3972a84838f1fb17b66504b933676152294bd7b
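The heart of the change is a simple fan-out: slice creation for each project is submitted as a task to the batch executor, and the results are gathered into a thread-safe set once all futures complete. Below is a minimal, self-contained sketch of that pattern; it uses a plain fixed thread pool and string placeholders instead of Gerrit's repository scan, and the names in it (SliceCreationDemo, the project count, the slice strings) are illustrative only, not part of the change.

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;

public class SliceCreationDemo {
  public static void main(String[] args) throws Exception {
    ListeningExecutorService executor =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
    // Thread-safe collector, mirroring the Sets.newConcurrentHashSet() in the change.
    Set<String> slices = Sets.newConcurrentHashSet();
    List<ListenableFuture<?>> futures = new ArrayList<>();
    for (int p = 0; p < 10; p++) {
      final int project = p;
      futures.add(
          executor.submit(
              () -> {
                // Stand-in for scanning one repository and creating its slices.
                slices.add("project-" + project + "/slice-0");
              }));
    }
    // Block until every per-project task has finished, propagating failures.
    Futures.allAsList(futures).get();
    System.out.println(slices.size() + " slices collected");
    executor.shutdown();
  }
}

Collecting into a concurrent set rather than appending to a shared list avoids per-task locking, and ordering does not matter here because the slices are shuffled before indexing anyway.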
-rw-r--r--  java/com/google/gerrit/server/index/change/AllChangesIndexer.java  207
1 file changed, 122 insertions(+), 85 deletions(-)
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
index 4e403cbfd2..494aa842d5 100644
--- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
+++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
@@ -20,8 +20,11 @@ import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
+import com.google.auto.value.AutoValue;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.UncheckedExecutionException;
@@ -43,9 +46,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.TextProgressMonitor;
@@ -59,6 +66,14 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final int PROJECT_SLICE_MAX_REFS = 1000;
+ private static class ProjectsCollectionFailure extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public ProjectsCollectionFailure(String message) {
+ super(message);
+ }
+ }
+
private final ChangeData.Factory changeDataFactory;
private final GitRepositoryManager repoManager;
private final ListeningExecutorService executor;
@@ -82,92 +97,53 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
this.projectCache = projectCache;
}
- private static class ProjectSlice {
- private final Project.NameKey name;
- private final int slice;
- private final int slices;
- private final ScanResult sr;
+ @AutoValue
+ public abstract static class ProjectSlice {
+ public abstract Project.NameKey name();
- ProjectSlice(Project.NameKey name, int slice, int slices, ScanResult sr) {
- this.name = name;
- this.slice = slice;
- this.slices = slices;
- this.sr = sr;
- }
+ public abstract int slice();
- public Project.NameKey getName() {
- return name;
- }
+ public abstract int slices();
- public int getSlice() {
- return slice;
- }
-
- public int getSlices() {
- return slices;
- }
+ public abstract ScanResult scanResult();
- public ScanResult getScanResult() {
- return sr;
+ private static ProjectSlice create(Project.NameKey name, int slice, int slices, ScanResult sr) {
+ return new AutoValue_AllChangesIndexer_ProjectSlice(name, slice, slices, sr);
}
}
@Override
public Result indexAll(ChangeIndex index) {
- ProgressMonitor pm = new TextProgressMonitor();
- pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
- List<ProjectSlice> projectSlices = new ArrayList<>();
- int changeCount = 0;
+ // The simplest approach to distribute indexing would be to let each thread grab a project
+ // and index it fully. But if a site has one big project and 100s of small projects, then
+ // in the beginning all CPUs would be busy reindexing projects. But soon enough all small
+ // projects have been reindexed, and only the thread that reindexes the big project is
+ // still working. The other threads would idle. Reindexing the big project on a single
+ // thread becomes the critical path. Bringing in more CPUs would not speed up things.
+ //
+ // To avoid such situations, we split big repos into smaller parts and let
+ // the thread pool index these smaller parts. This splitting introduces an overhead in the
+ // workload setup and there might be additional slow-downs from multiple threads
+ // concurrently working on different parts of the same project. But for Wikimedia's Gerrit,
+ // which had 2 big projects, many middle sized ones, and lots of smaller ones, the
+ // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes
+ // in 2020.
+
Stopwatch sw = Stopwatch.createStarted();
- int projectsFailed = 0;
- for (Project.NameKey name : projectCache.all()) {
- try (Repository repo = repoManager.openRepository(name)) {
- // The simplest approach to distribute indexing would be to let each thread grab a project
- // and index it fully. But if a site has one big project and 100s of small projects, then
- // in the beginning all CPUs would be busy reindexing projects. But soon enough all small
- // projects have been reindexed, and only the thread that reindexes the big project is
- // still working. The other threads would idle. Reindexing the big project on a single
- // thread becomes the critical path. Bringing in more CPUs would not speed up things.
- //
- // To avoid such situations, we split big repos into smaller parts and let
- // the thread pool index these smaller parts. This splitting introduces an overhead in the
- // workload setup and there might be additional slow-downs from multiple threads
- // concurrently working on different parts of the same project. But for Wikimedia's Gerrit,
- // which had 2 big projects, many middle sized ones, and lots of smaller ones, the
- // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes
- // in 2020.
- ScanResult sr = ChangeNotes.Factory.scanChangeIds(repo);
- int size = sr.all().size();
- if (size == 0) {
- pm.update(1);
- continue;
- }
- changeCount += size;
- int slices = 1 + size / PROJECT_SLICE_MAX_REFS;
- if (slices > 1) {
- verboseWriter.println("Submitting " + name + " for indexing in " + slices + " slices");
- }
- for (int slice = 0; slice < slices; slice++) {
- projectSlices.add(new ProjectSlice(name, slice, slices, sr));
- }
- } catch (IOException e) {
- logger.atSevere().withCause(e).log("Error collecting project %s", name);
- projectsFailed++;
- if (projectsFailed > projectCache.all().size() / 2) {
- logger.atSevere().log("Over 50%% of the projects could not be collected: aborted");
- return Result.create(sw, false, 0, 0);
- }
- }
- pm.update(1);
+ List<ProjectSlice> projectSlices;
+ try {
+ projectSlices = new SliceCreator().create();
+ } catch (ProjectsCollectionFailure | InterruptedException | ExecutionException e) {
+ logger.atSevere().log(e.getMessage());
+ return Result.create(sw, false, 0, 0);
}
- pm.endTask();
- setTotalWork(changeCount);
-
- // projectSlices are currently grouped by projects. First all slices for project1, followed
- // by all slices for project2, and so on. As workers pick tasks sequentially, multiple threads
- // would typically work concurrently on different slices of the same project. While this is not
- // a big issue, shuffling the list beforehand helps with ungrouping the project slices, so
- // different slices are less likely to be worked on concurrently.
+
+ // Since project slices are created in parallel, they are somewhat shuffled already. However,
+ // the number of threads used to create the project slices doesn't guarantee good randomization.
+ // If the slices are not shuffled well, then multiple threads would typically work concurrently
+ // on different slices of the same project. While this is not a big issue, shuffling the list
+ // beforehand helps with ungrouping the project slices, so different slices are less likely to
+ // be worked on concurrently.
// This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020.
Collections.shuffle(projectSlices);
return indexAll(index, projectSlices);
@@ -185,9 +161,9 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
AtomicBoolean ok = new AtomicBoolean(true);
for (ProjectSlice projectSlice : projectSlices) {
- Project.NameKey name = projectSlice.getName();
- int slice = projectSlice.getSlice();
- int slices = projectSlice.getSlices();
+ Project.NameKey name = projectSlice.name();
+ int slice = projectSlice.slice();
+ int slices = projectSlice.slices();
ListenableFuture<?> future =
executor.submit(
reindexProject(
@@ -195,7 +171,7 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
name,
slice,
slices,
- projectSlice.getScanResult(),
+ projectSlice.scanResult(),
doneTask,
failedTask));
String description = "project " + name + " (" + slice + "/" + slices + ")";
@@ -248,10 +224,10 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
Project.NameKey project,
int slice,
int slices,
- ScanResult sr,
+ ScanResult scanResult,
Task done,
Task failed) {
- return new ProjectIndexer(indexer, project, slice, slices, sr, done, failed);
+ return new ProjectIndexer(indexer, project, slice, slices, scanResult, done, failed);
}
private class ProjectIndexer implements Callable<Void> {
@@ -259,7 +235,7 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
private final Project.NameKey project;
private final int slice;
private final int slices;
- private final ScanResult sr;
+ private final ScanResult scanResult;
private final ProgressMonitor done;
private final ProgressMonitor failed;
@@ -268,14 +244,14 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
Project.NameKey project,
int slice,
int slices,
- ScanResult sr,
+ ScanResult scanResult,
ProgressMonitor done,
ProgressMonitor failed) {
this.indexer = indexer;
this.project = project;
this.slice = slice;
this.slices = slices;
- this.sr = sr;
+ this.scanResult = scanResult;
this.done = done;
this.failed = failed;
}
@@ -288,7 +264,9 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
// It does mean that reindexing after invalidating the DiffSummary cache will be expensive,
// but the goal is to invalidate that cache as infrequently as we possibly can. And besides,
// we don't have concrete proof that improving packfile locality would help.
- notesFactory.scan(sr, project, id -> (id.get() % slices) == slice).forEach(r -> index(r));
+ notesFactory
+ .scan(scanResult, project, id -> (id.get() % slices) == slice)
+ .forEach(r -> index(r));
OnlineReindexMode.end();
return null;
}
@@ -329,4 +307,63 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
return "Index all changes of project " + project.get();
}
}
+
+ private class SliceCreator {
+ final Set<ProjectSlice> projectSlices = Sets.newConcurrentHashSet();
+ final AtomicInteger changeCount = new AtomicInteger(0);
+ final AtomicInteger projectsFailed = new AtomicInteger(0);
+ final ProgressMonitor pm = new TextProgressMonitor();
+
+ private List<ProjectSlice> create()
+ throws ProjectsCollectionFailure, InterruptedException, ExecutionException {
+ List<ListenableFuture<?>> futures = new ArrayList<>();
+ pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
+ for (Project.NameKey name : projectCache.all()) {
+ futures.add(executor.submit(new ProjectSliceCreator(name)));
+ }
+
+ Futures.allAsList(futures).get();
+
+ if (projectsFailed.get() > projectCache.all().size() / 2) {
+ throw new ProjectsCollectionFailure(
+ "Over 50%% of the projects could not be collected: aborted");
+ }
+
+ pm.endTask();
+ setTotalWork(changeCount.get());
+ return projectSlices.stream().collect(Collectors.toList());
+ }
+
+ private class ProjectSliceCreator implements Callable<Void> {
+ final Project.NameKey name;
+
+ public ProjectSliceCreator(Project.NameKey name) {
+ this.name = name;
+ }
+
+ @Override
+ public Void call() throws IOException {
+ try (Repository repo = repoManager.openRepository(name)) {
+ ScanResult sr = ChangeNotes.Factory.scanChangeIds(repo);
+ int size = sr.all().size();
+ if (size > 0) {
+ changeCount.addAndGet(size);
+ int slices = 1 + size / PROJECT_SLICE_MAX_REFS;
+ if (slices > 1) {
+ verboseWriter.println(
+ "Submitting " + name + " for indexing in " + slices + " slices");
+ }
+ for (int slice = 0; slice < slices; slice++) {
+ projectSlices.add(ProjectSlice.create(name, slice, slices, sr));
+ }
+ }
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error collecting project %s", name);
+ projectsFailed.incrementAndGet();
+ }
+ pm.update(1);
+ return null;
+ }
+ }
+ }
}
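
For reference, the slice arithmetic itself is unchanged by this commit: a project with `size` changes is split into `1 + size / PROJECT_SLICE_MAX_REFS` slices, and the ProjectIndexer above indexes exactly the change IDs with `id % slices == slice`. A small standalone sketch (the change count of 2500 is made up for illustration) shows that every change lands in exactly one slice:

public class SlicePartitionDemo {
  private static final int PROJECT_SLICE_MAX_REFS = 1000; // same constant as in the class above

  public static void main(String[] args) {
    int size = 2500; // hypothetical number of changes in one project
    int slices = 1 + size / PROJECT_SLICE_MAX_REFS; // 2500 changes -> 3 slices
    int[] perSlice = new int[slices];
    for (int id = 1; id <= size; id++) {
      perSlice[id % slices]++; // the predicate (id % slices) == slice picks one slice per change
    }
    for (int slice = 0; slice < slices; slice++) {
      System.out.println("slice " + slice + "/" + slices + ": " + perSlice[slice] + " changes");
    }
  }
}

Because the assignment is a pure function of the change ID, slices of the same project can be indexed concurrently without coordination, which is what makes both the per-slice indexing and the parallel slice creation in this change safe.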