summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Sohn <matthias.sohn@sap.com>2020-08-01 14:59:30 +0200
committerMatthias Sohn <matthias.sohn@sap.com>2020-10-07 00:10:14 +0200
commit834e718ccc41c187acc3e253ab023a62fdacd0e3 (patch)
tree7b649016423c5a0476a428c8edbf2608f124d82d
parent88653e3269265eb92cee240a9141adb4b8b2b26f (diff)
NoteDbMigrator: shuffle slices to reduce concurrency on same repo
Migrating many slices of the same repository concurrently increases pressure on the thread trying to pack refs of the repository. Hence shuffle all slices of all projects to decrease concurrency per repository. When testing migration of a large project (370k changes, >1m refs) using 80 threads I observed that there were always 20-30k loose refs despite the fact that refs were constantly packed which took 1-3 minutes for each repacking. Change-Id: I39e1d99995d7e543cba8eedcd921706ca1655b5c
-rw-r--r--java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java130
1 files changed, 88 insertions, 42 deletions
diff --git a/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java b/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java
index cdc84ec0de..9d2f1e8823 100644
--- a/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java
+++ b/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java
@@ -89,11 +89,13 @@ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.eclipse.jgit.errors.ConfigInvalidException;
@@ -429,6 +431,38 @@ public class NoteDbMigrator implements AutoCloseable {
}
}
+ private static class ProjectContext {
+ final ReentrantLock gcLock;
+ final Project.NameKey project;
+ int sliceCount;
+ int changeCount;
+ final AtomicLong changesMigratedCount;
+
+ ProjectContext(Project.NameKey project, int sliceCount) {
+ this.gcLock = new ReentrantLock();
+ this.project = project;
+ this.sliceCount = sliceCount;
+ this.changesMigratedCount = new AtomicLong();
+ }
+ }
+
+ private static class ProjectSlice {
+ final ProjectContext ctx;
+ final List<Id> changes;
+ int sliceNumber = 0;
+
+ ProjectSlice(ProjectContext ctx, List<Id> changes, int sliceNumber) {
+ this.ctx = ctx;
+ this.changes = changes;
+ this.sliceNumber = sliceNumber;
+ }
+
+ @Override
+ public String toString() {
+ return "Slice [project=" + ctx.project + "]";
+ }
+ }
+
private final FileBasedConfig gerritConfig;
private final FileBasedConfig noteDbConfig;
private final SchemaFactory<ReviewDb> schemaFactory;
@@ -459,6 +493,7 @@ public class NoteDbMigrator implements AutoCloseable {
private final boolean verbose;
private final AtomicLong globalChangeCounter = new AtomicLong();
+ private long totalChangeCount;
private NoteDbMigrator(
SitePaths sitePaths,
@@ -824,6 +859,27 @@ public class NoteDbMigrator implements AutoCloseable {
}
}
+ private List<ProjectSlice> slice() throws OrmException {
+ ImmutableListMultimap<Project.NameKey, Change.Id> changesByProject = getChangesByProject();
+ List<Project.NameKey> projectNames =
+ Ordering.usingToString().sortedCopy(changesByProject.keySet());
+ List<ProjectSlice> slices = Lists.newArrayList();
+ for (Project.NameKey project : projectNames) {
+ int sliceNumber = 1;
+ List<List<Id>> projectSlices =
+ Lists.partition(changesByProject.get(project), PROJECT_SLICE_MAX_REFS);
+ ProjectContext ctx = new ProjectContext(project, projectSlices.size());
+ ctx.changeCount = changesByProject.get(project).size();
+ for (List<Id> s : projectSlices) {
+ ProjectSlice ps = new ProjectSlice(ctx, s, sliceNumber++);
+ slices.add(ps);
+ }
+ }
+ Collections.shuffle(slices);
+ totalChangeCount = changesByProject.size();
+ return slices;
+ }
+
public void rebuild() throws MigrationException, OrmException {
if (!globalNotesMigration.commitChangeWrites()) {
throw new MigrationException("Cannot rebuild without noteDb.changes.write=true");
@@ -831,37 +887,25 @@ public class NoteDbMigrator implements AutoCloseable {
Stopwatch sw = Stopwatch.createStarted();
logger.atInfo().log("Rebuilding changes in NoteDb");
- ImmutableListMultimap<Project.NameKey, Change.Id> changesByProject = getChangesByProject();
+ List<ProjectSlice> slices = slice();
List<ListenableFuture<Boolean>> futures = new ArrayList<>();
- List<Project.NameKey> projectNames =
- Ordering.usingToString().sortedCopy(changesByProject.keySet());
- for (Project.NameKey project : projectNames) {
- List<List<Id>> slices =
- Lists.partition(changesByProject.get(project), PROJECT_SLICE_MAX_REFS);
- int count = slices.size();
- int sliceNumber = 1;
- for (List<Change.Id> slice : slices) {
- int sn = sliceNumber++;
- ListenableFuture<Boolean> future =
- executor.submit(
- () -> {
- try (ContextHelper contextHelper = new ContextHelper()) {
- return rebuildProjectSlice(
- contextHelper.getReviewDb(), project, slice, sn, count);
- } catch (Exception e) {
- logger.atSevere().withCause(e).log("Error rebuilding project %s", project);
- return false;
- }
- });
- futures.add(future);
- }
+ for (ProjectSlice slice : slices) {
+ ListenableFuture<Boolean> future =
+ executor.submit(
+ () -> {
+ try (ContextHelper contextHelper = new ContextHelper()) {
+ return rebuildProjectSlice(contextHelper.getReviewDb(), slice);
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log("Error rebuilding project %s", slice.ctx);
+ return false;
+ }
+ });
+ futures.add(future);
}
-
boolean ok = futuresToBoolean(futures, "Error rebuilding projects");
double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
logger.atInfo().log(
- "Rebuilt %d changes in %.01fs (%.01f/s)\n",
- changesByProject.size(), t, changesByProject.size() / t);
+ "Rebuilt %d changes in %.01fs (%.01f/s)\n", totalChangeCount, t, totalChangeCount / t);
if (!ok) {
throw new MigrationException("Rebuilding some changes failed, see log");
}
@@ -906,16 +950,13 @@ public class NoteDbMigrator implements AutoCloseable {
return ins;
}
- private boolean rebuildProjectSlice(
- ReviewDb db,
- Project.NameKey project,
- List<Change.Id> slice,
- int sliceNumber,
- int sliceCount) {
+ private boolean rebuildProjectSlice(ReviewDb db, ProjectSlice slice) {
+ ProjectContext ctx = slice.ctx;
boolean ok = true;
ProgressMonitor pm =
new TextProgressMonitor(
new PrintWriter(new BufferedWriter(new OutputStreamWriter(progressOut, UTF_8))));
+ Project.NameKey project = ctx.project;
try (Repository changeRepo = repoManager.openRepository(project);
// Only use a PackInserter for the change repo, not All-Users.
//
@@ -939,15 +980,17 @@ public class NoteDbMigrator implements AutoCloseable {
pm.beginTask(
FormatUtil.elide(
String.format(
- "Rebuilding project %s, slice %d/%d", project.get(), sliceNumber, sliceCount),
+ "Rebuilding project %s slice %d/%d",
+ project.get(), slice.sliceNumber, ctx.sliceCount),
60),
- slice.size());
+ slice.changes.size());
int toSave = 0;
try {
logger.atInfo().log(
- "Starting to rebuild changes from project %s, slice %d/%d",
- project.get(), sliceNumber, sliceCount);
- for (Change.Id changeId : slice) {
+ "Starting to rebuild changes from project %s slice %d/%d",
+ project.get(), slice.sliceNumber, ctx.sliceCount);
+ long pc = 0;
+ for (Change.Id changeId : slice.changes) {
// NoteDbUpdateManager assumes that all commands in its OpenRepo were added by itself, so
// we can't share the top-level ChainedReceiveCommands. Use a new set of commands sharing
// the same underlying repo, and copy commands back to the top-level
@@ -992,15 +1035,18 @@ public class NoteDbMigrator implements AutoCloseable {
if (verbose) {
logger.atInfo().log("Rebuilt change %s", changeId.get());
}
- long cnt = globalChangeCounter.incrementAndGet();
- if (cnt % 1000 == 0) {
- logger.atInfo().log("Total number of rebuilt changes %d", cnt);
+ long c = globalChangeCounter.incrementAndGet();
+ if (c % 1000 == 0) {
+ logger.atInfo().log(
+ "Total number of rebuilt changes %d/%d (%.01f%%)",
+ c, totalChangeCount, (100.0 * c) / totalChangeCount);
}
+ pc = ctx.changesMigratedCount.incrementAndGet();
pm.update(1);
}
logger.atInfo().log(
- "Finished rebuilding changes of project %s, slice %d/%d",
- project.get(), sliceNumber, sliceCount);
+ "Finished rebuilding changes of project %s, slice %d/%d, changes %d/%d)",
+ project.get(), slice.sliceNumber, ctx.sliceCount, pc, ctx.changeCount);
} finally {
pm.endTask();
}