summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEdwin Kempin <edwin.kempin@sap.com>2012-10-31 13:57:31 +0100
committerEdwin Kempin <edwin.kempin@sap.com>2012-10-31 14:04:56 +0100
commit31f124e8ac837fb427c1b015ede96cb03bfc0a24 (patch)
tree7cc5ab74853fe997bebbdaedc0610facfde94f37
parent0f13254156adf367daad830e4248fb15c644fe78 (diff)
Revert "Perform change update on multiple threads"
This reverts commit c545c0901241314190cac02a24aa95f831dd0572. This commit is reverted because it breaks pushing via HTTP. Change-Id: Ica20fb9ce606f425e57737ebe92eccd2c8739fb9 Signed-off-by: Edwin Kempin <edwin.kempin@sap.com>
-rw-r--r--Documentation/config-gerrit.txt12
-rw-r--r--gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java35
-rw-r--r--gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeUpdateExecutor.java31
-rw-r--r--gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java112
-rw-r--r--gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java32
5 files changed, 33 insertions, 189 deletions
diff --git a/Documentation/config-gerrit.txt b/Documentation/config-gerrit.txt
index 1959995f7b..de2aa021ee 100644
--- a/Documentation/config-gerrit.txt
+++ b/Documentation/config-gerrit.txt
@@ -1839,18 +1839,6 @@ processed.
+
Defaults to the number of available CPUs according to the Java runtime.
-[[receive.changeUpdateThreads]]receive.changeUpdateThreads::
-+
-Number of threads to perform change creation or patch set updates
-concurrently. Each thread uses its own database connection from
-the database connection pool, and if all threads are busy then
-main receive thread will also perform a change creation or patch
-set update.
-+
-Defaults to 1, using only the main receive thread. This feature is for
-databases with very high latency that can benfit from concurrent
-operations when multiple changes are impacted at once.
-
[[receive.timeout]]receive.timeout::
+
Overall timeout on the time taken to process the change data in
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java b/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java
index d7a487d631..a295c49d0e 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java
@@ -29,6 +29,7 @@ import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gwtorm.server.OrmException;
import com.google.inject.Inject;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -73,29 +74,16 @@ public class ApprovalsUtil {
*
* @param change Change to update
* @throws OrmException
+ * @throws IOException
* @return List<PatchSetApproval> The previous approvals
*/
public List<PatchSetApproval> copyVetosToLatestPatchSet(Change change)
- throws OrmException {
- return copyVetosToLatestPatchSet(db, change);
- }
-
- /**
- * Moves the PatchSetApprovals to the last PatchSet on the change while
- * keeping the vetos.
- *
- * @param db database connection to use for updates.
- * @param change Change to update
- * @throws OrmException
- * @return List<PatchSetApproval> The previous approvals
- */
- public List<PatchSetApproval> copyVetosToLatestPatchSet(ReviewDb db,
- Change change) throws OrmException {
+ throws OrmException, IOException {
PatchSet.Id source;
if (change.getNumberOfPatchSets() > 1) {
source = new PatchSet.Id(change.getId(), change.getNumberOfPatchSets() - 1);
} else {
- throw new OrmException("Previous patch set could not be found");
+ throw new IOException("Previous patch set could not be found");
}
PatchSet.Id dest = change.currPatchSetId();
@@ -115,9 +103,18 @@ public class ApprovalsUtil {
return patchSetApprovals;
}
- public void addReviewers(ReviewDb db, Change change, PatchSet ps,
- PatchSetInfo info, Set<Id> wantReviewers,
- Set<Account.Id> existingReviewers) throws OrmException {
+
+ /** Attach reviewers to a change. */
+ public void addReviewers(Change change, PatchSet ps, PatchSetInfo info,
+ Set<Account.Id> wantReviewers) throws OrmException {
+ Set<Id> existing = Sets.<Account.Id> newHashSet();
+ addReviewers(change, ps, info, wantReviewers, existing);
+ }
+
+ /** Attach reviewers to a change. */
+ public void addReviewers(Change change, PatchSet ps, PatchSetInfo info,
+ Set<Account.Id> wantReviewers, Set<Account.Id> existingReviewers)
+ throws OrmException {
List<ApprovalType> allTypes = approvalTypes.getApprovalTypes();
if (allTypes.isEmpty()) {
return;
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeUpdateExecutor.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeUpdateExecutor.java
deleted file mode 100644
index 3452bb056f..0000000000
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeUpdateExecutor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Copyright (C) 2012 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.gerrit.server.git;
-
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.inject.BindingAnnotation;
-
-import java.lang.annotation.Retention;
-
-/**
- * Marker on the global {@link ListeningExecutorService} used by
- * {@link ReceiveCommits} to create or replace changes.
- */
-@Retention(RUNTIME)
-@BindingAnnotation
-public @interface ChangeUpdateExecutor {
-}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java
index 6a6f908851..981f0d80aa 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java
@@ -22,7 +22,6 @@ import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_MISSING_
import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_NONFASTFORWARD;
import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON;
-import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.LinkedListMultimap;
@@ -31,9 +30,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.common.PageLinks;
import com.google.gerrit.common.data.Capable;
@@ -73,7 +69,6 @@ import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.gwtorm.server.AtomicUpdate;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.ResultSet;
-import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -113,7 +108,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -216,23 +210,11 @@ public class ReceiveCommits {
}
}
- private static final Function<Exception, OrmException> ORM_EXCEPTION =
- new Function<Exception, OrmException>() {
- @Override
- public OrmException apply(Exception input) {
- if (input instanceof OrmException) {
- return (OrmException) input;
- }
- return new OrmException("Error updating database", input);
- }
- };
-
private final Set<Account.Id> reviewerId = new HashSet<Account.Id>();
private final Set<Account.Id> ccId = new HashSet<Account.Id>();
private final IdentifiedUser currentUser;
private final ReviewDb db;
- private final SchemaFactory<ReviewDb> schemaFactory;
private final AccountResolver accountResolver;
private final CreateChangeSender.Factory createChangeSenderFactory;
private final MergedSender.Factory mergedSenderFactory;
@@ -248,7 +230,6 @@ public class ReceiveCommits {
private final TrackingFooters trackingFooters;
private final TagCache tagCache;
private final WorkQueue workQueue;
- private final ListeningExecutorService changeUpdateExector;
private final RequestScopePropagator requestScopePropagator;
private final ProjectControl projectControl;
@@ -284,7 +265,6 @@ public class ReceiveCommits {
@Inject
ReceiveCommits(final ReviewDb db,
- final SchemaFactory<ReviewDb> schemaFactory,
final AccountResolver accountResolver,
final CreateChangeSender.Factory createChangeSenderFactory,
final MergedSender.Factory mergedSenderFactory,
@@ -300,7 +280,6 @@ public class ReceiveCommits {
@GerritPersonIdent final PersonIdent gerritIdent,
final TrackingFooters trackingFooters,
final WorkQueue workQueue,
- @ChangeUpdateExecutor ListeningExecutorService changeUpdateExector,
final RequestScopePropagator requestScopePropagator,
@Assisted final ProjectControl projectControl,
@@ -308,7 +287,6 @@ public class ReceiveCommits {
final SubmoduleOp.Factory subOpFactory) throws IOException {
this.currentUser = (IdentifiedUser) projectControl.getCurrentUser();
this.db = db;
- this.schemaFactory = schemaFactory;
this.accountResolver = accountResolver;
this.createChangeSenderFactory = createChangeSenderFactory;
this.mergedSenderFactory = mergedSenderFactory;
@@ -324,7 +302,6 @@ public class ReceiveCommits {
this.trackingFooters = trackingFooters;
this.tagCache = tagCache;
this.workQueue = workQueue;
- this.changeUpdateExector = changeUpdateExector;
this.requestScopePropagator = requestScopePropagator;
this.projectControl = projectControl;
@@ -618,7 +595,7 @@ public class ReceiveCommits {
}
} else if (replace.cmd != null && replace.cmd.getResult() == OK) {
try {
- if (replace.insertPatchSet().checkedGet() != null) {
+ if (replace.insertPatchSet() != null) {
replace.inputCommand.setResult(OK);
}
} catch (IOException err) {
@@ -660,19 +637,14 @@ public class ReceiveCommits {
}
try {
- List<CheckedFuture<?, OrmException>> futures = Lists.newArrayList();
for (ReplaceRequest replace : replaceByChange.values()) {
if (replace.inputCommand == newChange) {
- futures.add(replace.insertPatchSet());
+ replace.insertPatchSet();
}
}
for (CreateRequest create : newChanges) {
- futures.add(create.insertChange());
- }
-
- for (CheckedFuture<?, OrmException> f : futures) {
- f.checkedGet();
+ create.insertChange();
}
newChange.setResult(OK);
} catch (OrmException err) {
@@ -1283,35 +1255,10 @@ public class ReceiveCommits {
cmd = new ReceiveCommand(ObjectId.zeroId(), c, ps.getRefName());
}
- CheckedFuture<Void, OrmException> insertChange() throws IOException {
+ void insertChange() throws IOException, OrmException {
rp.getRevWalk().parseBody(commit);
warnMalformedMessage(commit);
- final Thread caller = Thread.currentThread();
- ListenableFuture<Void> future = changeUpdateExector.submit(
- requestScopePropagator.wrap(new Callable<Void>() {
- @Override
- public Void call() throws OrmException {
- if (caller == Thread.currentThread()) {
- insertChange(db);
- } else {
- ReviewDb db = schemaFactory.open();
- try {
- insertChange(db);
- } finally {
- db.close();
- }
- }
- synchronized (newProgress) {
- newProgress.update(1);
- }
- return null;
- }
- }));
- return Futures.makeChecked(future, ORM_EXCEPTION);
- }
-
- private void insertChange(ReviewDb db) throws OrmException {
final Account.Id me = currentUser.getAccountId();
final Set<Account.Id> reviewers = new HashSet<Account.Id>(reviewerId);
final Set<Account.Id> cc = new HashSet<Account.Id>(ccId);
@@ -1336,12 +1283,11 @@ public class ReceiveCommits {
db.changes().beginTransaction(change.getId());
try {
- insertAncestors(db, ps.getId(), commit);
+ insertAncestors(ps.getId(), commit);
db.patchSets().insert(Collections.singleton(ps));
db.changes().insert(Collections.singleton(change));
ChangeUtil.updateTrackingIds(db, change, trackingFooters, footerLines);
- approvalsUtil.addReviewers(db, change, ps, info,
- reviewers, Collections.<Account.Id> emptySet());
+ approvalsUtil.addReviewers(change, ps, info, reviewers);
db.commit();
} finally {
db.rollback();
@@ -1350,6 +1296,7 @@ public class ReceiveCommits {
created = true;
replication.fire(project.getNameKey(), ps.getRefName());
hooks.doPatchsetCreatedHook(change, ps, db);
+ newProgress.update(1);
workQueue.getDefaultQueue()
.submit(requestScopePropagator.wrap(new Runnable() {
@Override
@@ -1594,38 +1541,10 @@ public class ReceiveCommits {
return true;
}
- CheckedFuture<PatchSet.Id, OrmException> insertPatchSet()
- throws IOException {
+ PatchSet.Id insertPatchSet() throws IOException, OrmException {
rp.getRevWalk().parseBody(newCommit);
warnMalformedMessage(newCommit);
- final Thread caller = Thread.currentThread();
- ListenableFuture<PatchSet.Id> future = changeUpdateExector.submit(
- requestScopePropagator.wrap(new Callable<PatchSet.Id>() {
- @Override
- public PatchSet.Id call() throws OrmException {
- try {
- if (caller == Thread.currentThread()) {
- return insertPatchSet(db);
- } else {
- ReviewDb db = schemaFactory.open();
- try {
- return insertPatchSet(db);
- } finally {
- db.close();
- }
- }
- } finally {
- synchronized (newProgress) {
- replaceProgress.update(1);
- }
- }
- }
- }));
- return Futures.makeChecked(future, ORM_EXCEPTION);
- }
-
- PatchSet.Id insertPatchSet(ReviewDb db) throws OrmException {
final Account.Id me = currentUser.getAccountId();
final Set<Account.Id> reviewers = new HashSet<Account.Id>(reviewerId);
final Set<Account.Id> cc = new HashSet<Account.Id>(ccId);
@@ -1667,7 +1586,7 @@ public class ReceiveCommits {
return null;
}
- insertAncestors(db, newPatchSet.getId(), newCommit);
+ insertAncestors(newPatchSet.getId(), newCommit);
db.patchSets().insert(Collections.singleton(newPatchSet));
if (checkMergedInto) {
@@ -1675,8 +1594,7 @@ public class ReceiveCommits {
mergedIntoRef = mergedInto != null ? mergedInto.getName() : null;
}
- List<PatchSetApproval> patchSetApprovals =
- approvalsUtil.copyVetosToLatestPatchSet(db, change);
+ List<PatchSetApproval> patchSetApprovals = approvalsUtil.copyVetosToLatestPatchSet(change);
final Set<Account.Id> haveApprovals = new HashSet<Account.Id>();
oldReviewers.clear();
@@ -1691,8 +1609,7 @@ public class ReceiveCommits {
}
}
- approvalsUtil.addReviewers(db, change, newPatchSet, info,
- reviewers, haveApprovals);
+ approvalsUtil.addReviewers(change, newPatchSet, info, reviewers, haveApprovals);
msg =
new ChangeMessage(new ChangeMessage.Key(change.getId(), ChangeUtil
@@ -1753,6 +1670,7 @@ public class ReceiveCommits {
replication.fire(project.getNameKey(), newPatchSet.getRefName());
hooks.doPatchsetCreatedHook(change, newPatchSet, db);
+ replaceProgress.update(1);
if (mergedIntoRef != null) {
hooks.doChangeMergedHook(
change, currentUser.getAccount(), newPatchSet, db);
@@ -2156,9 +2074,7 @@ public class ReceiveCommits {
}
for (final ReplaceRequest req : toClose) {
- final PatchSet.Id psi = req.validate(true)
- ? req.insertPatchSet().checkedGet()
- : null;
+ final PatchSet.Id psi = req.validate(true) ? req.insertPatchSet() : null;
if (psi != null) {
closeChange(req.inputCommand, psi, req.newCommit);
closeProgress.update(1);
@@ -2305,7 +2221,7 @@ public class ReceiveCommits {
}));
}
- private void insertAncestors(ReviewDb db, PatchSet.Id id, RevCommit src)
+ private void insertAncestors(PatchSet.Id id, RevCommit src)
throws OrmException {
final int cnt = src.getParentCount();
List<PatchSetAncestor> toInsert = new ArrayList<PatchSetAncestor>(cnt);
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java
index 1cbd227c4c..063db2d841 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java
@@ -14,20 +14,15 @@
package com.google.gerrit.server.git;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.git.WorkQueue.Executor;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import org.eclipse.jgit.lib.Config;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
/** Module providing the {@link ReceiveCommitsExecutor}. */
public class ReceiveCommitsExecutorModule extends AbstractModule {
@Override
@@ -37,31 +32,10 @@ public class ReceiveCommitsExecutorModule extends AbstractModule {
@Provides
@Singleton
@ReceiveCommitsExecutor
- public WorkQueue.Executor createReceiveCommitsExecutor(
- @GerritServerConfig Config config,
+ public Executor getReceiveCommitsExecutor(@GerritServerConfig Config config,
WorkQueue queues) {
int poolSize = config.getInt("receive", null, "threadPoolSize",
Runtime.getRuntime().availableProcessors());
return queues.createQueue(poolSize, "ReceiveCommits");
}
-
- @Provides
- @Singleton
- @ChangeUpdateExecutor
- public ListeningExecutorService createChangeUpdateExecutor(@GerritServerConfig Config config) {
- int poolSize = config.getInt("receive", null, "changeUpdateThreads", 1);
- if (poolSize <= 1) {
- return MoreExecutors.sameThreadExecutor();
- }
- return MoreExecutors.listeningDecorator(
- MoreExecutors.getExitingExecutorService(
- new ThreadPoolExecutor(1, poolSize,
- 10, TimeUnit.MINUTES,
- new ArrayBlockingQueue<Runnable>(poolSize),
- new ThreadFactoryBuilder()
- .setNameFormat("ChangeUpdate-%d")
- .setDaemon(true)
- .build(),
- new ThreadPoolExecutor.CallerRunsPolicy())));
- }
}