diff options
5 files changed, 33 insertions, 189 deletions
diff --git a/Documentation/config-gerrit.txt b/Documentation/config-gerrit.txt index 1959995f7b..de2aa021ee 100644 --- a/Documentation/config-gerrit.txt +++ b/Documentation/config-gerrit.txt @@ -1839,18 +1839,6 @@ processed. + Defaults to the number of available CPUs according to the Java runtime. -[[receive.changeUpdateThreads]]receive.changeUpdateThreads:: -+ -Number of threads to perform change creation or patch set updates -concurrently. Each thread uses its own database connection from -the database connection pool, and if all threads are busy then -main receive thread will also perform a change creation or patch -set update. -+ -Defaults to 1, using only the main receive thread. This feature is for -databases with very high latency that can benfit from concurrent -operations when multiple changes are impacted at once. - [[receive.timeout]]receive.timeout:: + Overall timeout on the time taken to process the change data in diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java b/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java index d7a487d631..a295c49d0e 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java @@ -29,6 +29,7 @@ import com.google.gerrit.reviewdb.server.ReviewDb; import com.google.gwtorm.server.OrmException; import com.google.inject.Inject; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Set; @@ -73,29 +74,16 @@ public class ApprovalsUtil { * * @param change Change to update * @throws OrmException + * @throws IOException * @return List<PatchSetApproval> The previous approvals */ public List<PatchSetApproval> copyVetosToLatestPatchSet(Change change) - throws OrmException { - return copyVetosToLatestPatchSet(db, change); - } - - /** - * Moves the PatchSetApprovals to the last PatchSet on the change while - * keeping the vetos. - * - * @param db database connection to use for updates. - * @param change Change to update - * @throws OrmException - * @return List<PatchSetApproval> The previous approvals - */ - public List<PatchSetApproval> copyVetosToLatestPatchSet(ReviewDb db, - Change change) throws OrmException { + throws OrmException, IOException { PatchSet.Id source; if (change.getNumberOfPatchSets() > 1) { source = new PatchSet.Id(change.getId(), change.getNumberOfPatchSets() - 1); } else { - throw new OrmException("Previous patch set could not be found"); + throw new IOException("Previous patch set could not be found"); } PatchSet.Id dest = change.currPatchSetId(); @@ -115,9 +103,18 @@ public class ApprovalsUtil { return patchSetApprovals; } - public void addReviewers(ReviewDb db, Change change, PatchSet ps, - PatchSetInfo info, Set<Id> wantReviewers, - Set<Account.Id> existingReviewers) throws OrmException { + + /** Attach reviewers to a change. */ + public void addReviewers(Change change, PatchSet ps, PatchSetInfo info, + Set<Account.Id> wantReviewers) throws OrmException { + Set<Id> existing = Sets.<Account.Id> newHashSet(); + addReviewers(change, ps, info, wantReviewers, existing); + } + + /** Attach reviewers to a change. */ + public void addReviewers(Change change, PatchSet ps, PatchSetInfo info, + Set<Account.Id> wantReviewers, Set<Account.Id> existingReviewers) + throws OrmException { List<ApprovalType> allTypes = approvalTypes.getApprovalTypes(); if (allTypes.isEmpty()) { return; diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeUpdateExecutor.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeUpdateExecutor.java deleted file mode 100644 index 3452bb056f..0000000000 --- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeUpdateExecutor.java +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (C) 2012 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.gerrit.server.git; - -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.inject.BindingAnnotation; - -import java.lang.annotation.Retention; - -/** - * Marker on the global {@link ListeningExecutorService} used by - * {@link ReceiveCommits} to create or replace changes. - */ -@Retention(RUNTIME) -@BindingAnnotation -public @interface ChangeUpdateExecutor { -} diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java index 6a6f908851..981f0d80aa 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java @@ -22,7 +22,6 @@ import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_MISSING_ import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_NONFASTFORWARD; import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON; -import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.LinkedListMultimap; @@ -31,9 +30,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.gerrit.common.ChangeHooks; import com.google.gerrit.common.PageLinks; import com.google.gerrit.common.data.Capable; @@ -73,7 +69,6 @@ import com.google.gerrit.server.util.RequestScopePropagator; import com.google.gwtorm.server.AtomicUpdate; import com.google.gwtorm.server.OrmException; import com.google.gwtorm.server.ResultSet; -import com.google.gwtorm.server.SchemaFactory; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; @@ -113,7 +108,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -216,23 +210,11 @@ public class ReceiveCommits { } } - private static final Function<Exception, OrmException> ORM_EXCEPTION = - new Function<Exception, OrmException>() { - @Override - public OrmException apply(Exception input) { - if (input instanceof OrmException) { - return (OrmException) input; - } - return new OrmException("Error updating database", input); - } - }; - private final Set<Account.Id> reviewerId = new HashSet<Account.Id>(); private final Set<Account.Id> ccId = new HashSet<Account.Id>(); private final IdentifiedUser currentUser; private final ReviewDb db; - private final SchemaFactory<ReviewDb> schemaFactory; private final AccountResolver accountResolver; private final CreateChangeSender.Factory createChangeSenderFactory; private final MergedSender.Factory mergedSenderFactory; @@ -248,7 +230,6 @@ public class ReceiveCommits { private final TrackingFooters trackingFooters; private final TagCache tagCache; private final WorkQueue workQueue; - private final ListeningExecutorService changeUpdateExector; private final RequestScopePropagator requestScopePropagator; private final ProjectControl projectControl; @@ -284,7 +265,6 @@ public class ReceiveCommits { @Inject ReceiveCommits(final ReviewDb db, - final SchemaFactory<ReviewDb> schemaFactory, final AccountResolver accountResolver, final CreateChangeSender.Factory createChangeSenderFactory, final MergedSender.Factory mergedSenderFactory, @@ -300,7 +280,6 @@ public class ReceiveCommits { @GerritPersonIdent final PersonIdent gerritIdent, final TrackingFooters trackingFooters, final WorkQueue workQueue, - @ChangeUpdateExecutor ListeningExecutorService changeUpdateExector, final RequestScopePropagator requestScopePropagator, @Assisted final ProjectControl projectControl, @@ -308,7 +287,6 @@ public class ReceiveCommits { final SubmoduleOp.Factory subOpFactory) throws IOException { this.currentUser = (IdentifiedUser) projectControl.getCurrentUser(); this.db = db; - this.schemaFactory = schemaFactory; this.accountResolver = accountResolver; this.createChangeSenderFactory = createChangeSenderFactory; this.mergedSenderFactory = mergedSenderFactory; @@ -324,7 +302,6 @@ public class ReceiveCommits { this.trackingFooters = trackingFooters; this.tagCache = tagCache; this.workQueue = workQueue; - this.changeUpdateExector = changeUpdateExector; this.requestScopePropagator = requestScopePropagator; this.projectControl = projectControl; @@ -618,7 +595,7 @@ public class ReceiveCommits { } } else if (replace.cmd != null && replace.cmd.getResult() == OK) { try { - if (replace.insertPatchSet().checkedGet() != null) { + if (replace.insertPatchSet() != null) { replace.inputCommand.setResult(OK); } } catch (IOException err) { @@ -660,19 +637,14 @@ public class ReceiveCommits { } try { - List<CheckedFuture<?, OrmException>> futures = Lists.newArrayList(); for (ReplaceRequest replace : replaceByChange.values()) { if (replace.inputCommand == newChange) { - futures.add(replace.insertPatchSet()); + replace.insertPatchSet(); } } for (CreateRequest create : newChanges) { - futures.add(create.insertChange()); - } - - for (CheckedFuture<?, OrmException> f : futures) { - f.checkedGet(); + create.insertChange(); } newChange.setResult(OK); } catch (OrmException err) { @@ -1283,35 +1255,10 @@ public class ReceiveCommits { cmd = new ReceiveCommand(ObjectId.zeroId(), c, ps.getRefName()); } - CheckedFuture<Void, OrmException> insertChange() throws IOException { + void insertChange() throws IOException, OrmException { rp.getRevWalk().parseBody(commit); warnMalformedMessage(commit); - final Thread caller = Thread.currentThread(); - ListenableFuture<Void> future = changeUpdateExector.submit( - requestScopePropagator.wrap(new Callable<Void>() { - @Override - public Void call() throws OrmException { - if (caller == Thread.currentThread()) { - insertChange(db); - } else { - ReviewDb db = schemaFactory.open(); - try { - insertChange(db); - } finally { - db.close(); - } - } - synchronized (newProgress) { - newProgress.update(1); - } - return null; - } - })); - return Futures.makeChecked(future, ORM_EXCEPTION); - } - - private void insertChange(ReviewDb db) throws OrmException { final Account.Id me = currentUser.getAccountId(); final Set<Account.Id> reviewers = new HashSet<Account.Id>(reviewerId); final Set<Account.Id> cc = new HashSet<Account.Id>(ccId); @@ -1336,12 +1283,11 @@ public class ReceiveCommits { db.changes().beginTransaction(change.getId()); try { - insertAncestors(db, ps.getId(), commit); + insertAncestors(ps.getId(), commit); db.patchSets().insert(Collections.singleton(ps)); db.changes().insert(Collections.singleton(change)); ChangeUtil.updateTrackingIds(db, change, trackingFooters, footerLines); - approvalsUtil.addReviewers(db, change, ps, info, - reviewers, Collections.<Account.Id> emptySet()); + approvalsUtil.addReviewers(change, ps, info, reviewers); db.commit(); } finally { db.rollback(); @@ -1350,6 +1296,7 @@ public class ReceiveCommits { created = true; replication.fire(project.getNameKey(), ps.getRefName()); hooks.doPatchsetCreatedHook(change, ps, db); + newProgress.update(1); workQueue.getDefaultQueue() .submit(requestScopePropagator.wrap(new Runnable() { @Override @@ -1594,38 +1541,10 @@ public class ReceiveCommits { return true; } - CheckedFuture<PatchSet.Id, OrmException> insertPatchSet() - throws IOException { + PatchSet.Id insertPatchSet() throws IOException, OrmException { rp.getRevWalk().parseBody(newCommit); warnMalformedMessage(newCommit); - final Thread caller = Thread.currentThread(); - ListenableFuture<PatchSet.Id> future = changeUpdateExector.submit( - requestScopePropagator.wrap(new Callable<PatchSet.Id>() { - @Override - public PatchSet.Id call() throws OrmException { - try { - if (caller == Thread.currentThread()) { - return insertPatchSet(db); - } else { - ReviewDb db = schemaFactory.open(); - try { - return insertPatchSet(db); - } finally { - db.close(); - } - } - } finally { - synchronized (newProgress) { - replaceProgress.update(1); - } - } - } - })); - return Futures.makeChecked(future, ORM_EXCEPTION); - } - - PatchSet.Id insertPatchSet(ReviewDb db) throws OrmException { final Account.Id me = currentUser.getAccountId(); final Set<Account.Id> reviewers = new HashSet<Account.Id>(reviewerId); final Set<Account.Id> cc = new HashSet<Account.Id>(ccId); @@ -1667,7 +1586,7 @@ public class ReceiveCommits { return null; } - insertAncestors(db, newPatchSet.getId(), newCommit); + insertAncestors(newPatchSet.getId(), newCommit); db.patchSets().insert(Collections.singleton(newPatchSet)); if (checkMergedInto) { @@ -1675,8 +1594,7 @@ public class ReceiveCommits { mergedIntoRef = mergedInto != null ? mergedInto.getName() : null; } - List<PatchSetApproval> patchSetApprovals = - approvalsUtil.copyVetosToLatestPatchSet(db, change); + List<PatchSetApproval> patchSetApprovals = approvalsUtil.copyVetosToLatestPatchSet(change); final Set<Account.Id> haveApprovals = new HashSet<Account.Id>(); oldReviewers.clear(); @@ -1691,8 +1609,7 @@ public class ReceiveCommits { } } - approvalsUtil.addReviewers(db, change, newPatchSet, info, - reviewers, haveApprovals); + approvalsUtil.addReviewers(change, newPatchSet, info, reviewers, haveApprovals); msg = new ChangeMessage(new ChangeMessage.Key(change.getId(), ChangeUtil @@ -1753,6 +1670,7 @@ public class ReceiveCommits { replication.fire(project.getNameKey(), newPatchSet.getRefName()); hooks.doPatchsetCreatedHook(change, newPatchSet, db); + replaceProgress.update(1); if (mergedIntoRef != null) { hooks.doChangeMergedHook( change, currentUser.getAccount(), newPatchSet, db); @@ -2156,9 +2074,7 @@ public class ReceiveCommits { } for (final ReplaceRequest req : toClose) { - final PatchSet.Id psi = req.validate(true) - ? req.insertPatchSet().checkedGet() - : null; + final PatchSet.Id psi = req.validate(true) ? req.insertPatchSet() : null; if (psi != null) { closeChange(req.inputCommand, psi, req.newCommit); closeProgress.update(1); @@ -2305,7 +2221,7 @@ public class ReceiveCommits { })); } - private void insertAncestors(ReviewDb db, PatchSet.Id id, RevCommit src) + private void insertAncestors(PatchSet.Id id, RevCommit src) throws OrmException { final int cnt = src.getParentCount(); List<PatchSetAncestor> toInsert = new ArrayList<PatchSetAncestor>(cnt); diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java index 1cbd227c4c..063db2d841 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java @@ -14,20 +14,15 @@ package com.google.gerrit.server.git; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gerrit.server.config.GerritServerConfig; +import com.google.gerrit.server.git.WorkQueue; +import com.google.gerrit.server.git.WorkQueue.Executor; import com.google.inject.AbstractModule; import com.google.inject.Provides; import com.google.inject.Singleton; import org.eclipse.jgit.lib.Config; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - /** Module providing the {@link ReceiveCommitsExecutor}. */ public class ReceiveCommitsExecutorModule extends AbstractModule { @Override @@ -37,31 +32,10 @@ public class ReceiveCommitsExecutorModule extends AbstractModule { @Provides @Singleton @ReceiveCommitsExecutor - public WorkQueue.Executor createReceiveCommitsExecutor( - @GerritServerConfig Config config, + public Executor getReceiveCommitsExecutor(@GerritServerConfig Config config, WorkQueue queues) { int poolSize = config.getInt("receive", null, "threadPoolSize", Runtime.getRuntime().availableProcessors()); return queues.createQueue(poolSize, "ReceiveCommits"); } - - @Provides - @Singleton - @ChangeUpdateExecutor - public ListeningExecutorService createChangeUpdateExecutor(@GerritServerConfig Config config) { - int poolSize = config.getInt("receive", null, "changeUpdateThreads", 1); - if (poolSize <= 1) { - return MoreExecutors.sameThreadExecutor(); - } - return MoreExecutors.listeningDecorator( - MoreExecutors.getExitingExecutorService( - new ThreadPoolExecutor(1, poolSize, - 10, TimeUnit.MINUTES, - new ArrayBlockingQueue<Runnable>(poolSize), - new ThreadFactoryBuilder() - .setNameFormat("ChangeUpdate-%d") - .setDaemon(true) - .build(), - new ThreadPoolExecutor.CallerRunsPolicy()))); - } } |