diff options
author | Matthias Sohn <matthias.sohn@sap.com> | 2020-08-01 14:59:30 +0200 |
---|---|---|
committer | Matthias Sohn <matthias.sohn@sap.com> | 2020-10-07 00:10:14 +0200 |
commit | 834e718ccc41c187acc3e253ab023a62fdacd0e3 (patch) | |
tree | 7b649016423c5a0476a428c8edbf2608f124d82d | |
parent | 88653e3269265eb92cee240a9141adb4b8b2b26f (diff) |
NoteDbMigrator: shuffle slices to reduce concurrency on same repo
Migrating many slices of the same repository concurrently increases
pressure on the thread trying to pack refs of the repository. Hence
shuffle all slices of all projects to decrease concurrency per
repository.
When testing migration of a large project (370k changes, >1m refs) using
80 threads I observed that there were always 20-30k loose refs despite
the fact that refs were constantly packed which took 1-3 minutes for
each repacking.
Change-Id: I39e1d99995d7e543cba8eedcd921706ca1655b5c
-rw-r--r-- | java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java | 130 |
1 files changed, 88 insertions, 42 deletions
diff --git a/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java b/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java index cdc84ec0de..9d2f1e8823 100644 --- a/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java +++ b/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java @@ -89,11 +89,13 @@ import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Predicate; import org.eclipse.jgit.errors.ConfigInvalidException; @@ -429,6 +431,38 @@ public class NoteDbMigrator implements AutoCloseable { } } + private static class ProjectContext { + final ReentrantLock gcLock; + final Project.NameKey project; + int sliceCount; + int changeCount; + final AtomicLong changesMigratedCount; + + ProjectContext(Project.NameKey project, int sliceCount) { + this.gcLock = new ReentrantLock(); + this.project = project; + this.sliceCount = sliceCount; + this.changesMigratedCount = new AtomicLong(); + } + } + + private static class ProjectSlice { + final ProjectContext ctx; + final List<Id> changes; + int sliceNumber = 0; + + ProjectSlice(ProjectContext ctx, List<Id> changes, int sliceNumber) { + this.ctx = ctx; + this.changes = changes; + this.sliceNumber = sliceNumber; + } + + @Override + public String toString() { + return "Slice [project=" + ctx.project + "]"; + } + } + private final FileBasedConfig gerritConfig; private final FileBasedConfig noteDbConfig; private final SchemaFactory<ReviewDb> schemaFactory; @@ -459,6 +493,7 @@ public class NoteDbMigrator implements AutoCloseable { private final boolean verbose; private final AtomicLong globalChangeCounter = new AtomicLong(); + private long totalChangeCount; private NoteDbMigrator( SitePaths sitePaths, @@ -824,6 +859,27 @@ public class NoteDbMigrator implements AutoCloseable { } } + private List<ProjectSlice> slice() throws OrmException { + ImmutableListMultimap<Project.NameKey, Change.Id> changesByProject = getChangesByProject(); + List<Project.NameKey> projectNames = + Ordering.usingToString().sortedCopy(changesByProject.keySet()); + List<ProjectSlice> slices = Lists.newArrayList(); + for (Project.NameKey project : projectNames) { + int sliceNumber = 1; + List<List<Id>> projectSlices = + Lists.partition(changesByProject.get(project), PROJECT_SLICE_MAX_REFS); + ProjectContext ctx = new ProjectContext(project, projectSlices.size()); + ctx.changeCount = changesByProject.get(project).size(); + for (List<Id> s : projectSlices) { + ProjectSlice ps = new ProjectSlice(ctx, s, sliceNumber++); + slices.add(ps); + } + } + Collections.shuffle(slices); + totalChangeCount = changesByProject.size(); + return slices; + } + public void rebuild() throws MigrationException, OrmException { if (!globalNotesMigration.commitChangeWrites()) { throw new MigrationException("Cannot rebuild without noteDb.changes.write=true"); @@ -831,37 +887,25 @@ public class NoteDbMigrator implements AutoCloseable { Stopwatch sw = Stopwatch.createStarted(); logger.atInfo().log("Rebuilding changes in NoteDb"); - ImmutableListMultimap<Project.NameKey, Change.Id> changesByProject = getChangesByProject(); + List<ProjectSlice> slices = slice(); List<ListenableFuture<Boolean>> futures = new ArrayList<>(); - List<Project.NameKey> projectNames = - Ordering.usingToString().sortedCopy(changesByProject.keySet()); - for (Project.NameKey project : projectNames) { - List<List<Id>> slices = - Lists.partition(changesByProject.get(project), PROJECT_SLICE_MAX_REFS); - int count = slices.size(); - int sliceNumber = 1; - for (List<Change.Id> slice : slices) { - int sn = sliceNumber++; - ListenableFuture<Boolean> future = - executor.submit( - () -> { - try (ContextHelper contextHelper = new ContextHelper()) { - return rebuildProjectSlice( - contextHelper.getReviewDb(), project, slice, sn, count); - } catch (Exception e) { - logger.atSevere().withCause(e).log("Error rebuilding project %s", project); - return false; - } - }); - futures.add(future); - } + for (ProjectSlice slice : slices) { + ListenableFuture<Boolean> future = + executor.submit( + () -> { + try (ContextHelper contextHelper = new ContextHelper()) { + return rebuildProjectSlice(contextHelper.getReviewDb(), slice); + } catch (Exception e) { + logger.atSevere().withCause(e).log("Error rebuilding project %s", slice.ctx); + return false; + } + }); + futures.add(future); } - boolean ok = futuresToBoolean(futures, "Error rebuilding projects"); double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d; logger.atInfo().log( - "Rebuilt %d changes in %.01fs (%.01f/s)\n", - changesByProject.size(), t, changesByProject.size() / t); + "Rebuilt %d changes in %.01fs (%.01f/s)\n", totalChangeCount, t, totalChangeCount / t); if (!ok) { throw new MigrationException("Rebuilding some changes failed, see log"); } @@ -906,16 +950,13 @@ public class NoteDbMigrator implements AutoCloseable { return ins; } - private boolean rebuildProjectSlice( - ReviewDb db, - Project.NameKey project, - List<Change.Id> slice, - int sliceNumber, - int sliceCount) { + private boolean rebuildProjectSlice(ReviewDb db, ProjectSlice slice) { + ProjectContext ctx = slice.ctx; boolean ok = true; ProgressMonitor pm = new TextProgressMonitor( new PrintWriter(new BufferedWriter(new OutputStreamWriter(progressOut, UTF_8)))); + Project.NameKey project = ctx.project; try (Repository changeRepo = repoManager.openRepository(project); // Only use a PackInserter for the change repo, not All-Users. // @@ -939,15 +980,17 @@ public class NoteDbMigrator implements AutoCloseable { pm.beginTask( FormatUtil.elide( String.format( - "Rebuilding project %s, slice %d/%d", project.get(), sliceNumber, sliceCount), + "Rebuilding project %s slice %d/%d", + project.get(), slice.sliceNumber, ctx.sliceCount), 60), - slice.size()); + slice.changes.size()); int toSave = 0; try { logger.atInfo().log( - "Starting to rebuild changes from project %s, slice %d/%d", - project.get(), sliceNumber, sliceCount); - for (Change.Id changeId : slice) { + "Starting to rebuild changes from project %s slice %d/%d", + project.get(), slice.sliceNumber, ctx.sliceCount); + long pc = 0; + for (Change.Id changeId : slice.changes) { // NoteDbUpdateManager assumes that all commands in its OpenRepo were added by itself, so // we can't share the top-level ChainedReceiveCommands. Use a new set of commands sharing // the same underlying repo, and copy commands back to the top-level @@ -992,15 +1035,18 @@ public class NoteDbMigrator implements AutoCloseable { if (verbose) { logger.atInfo().log("Rebuilt change %s", changeId.get()); } - long cnt = globalChangeCounter.incrementAndGet(); - if (cnt % 1000 == 0) { - logger.atInfo().log("Total number of rebuilt changes %d", cnt); + long c = globalChangeCounter.incrementAndGet(); + if (c % 1000 == 0) { + logger.atInfo().log( + "Total number of rebuilt changes %d/%d (%.01f%%)", + c, totalChangeCount, (100.0 * c) / totalChangeCount); } + pc = ctx.changesMigratedCount.incrementAndGet(); pm.update(1); } logger.atInfo().log( - "Finished rebuilding changes of project %s, slice %d/%d", - project.get(), sliceNumber, sliceCount); + "Finished rebuilding changes of project %s, slice %d/%d, changes %d/%d)", + project.get(), slice.sliceNumber, ctx.sliceCount, pc, ctx.changeCount); } finally { pm.endTask(); } |