summaryrefslogtreecommitdiffstats
path: root/mgrapp/src/com/google/codereview/manager/unpack/PatchSetUploader.java
diff options
context:
space:
mode:
Diffstat (limited to 'mgrapp/src/com/google/codereview/manager/unpack/PatchSetUploader.java')
-rw-r--r--mgrapp/src/com/google/codereview/manager/unpack/PatchSetUploader.java256
1 files changed, 256 insertions, 0 deletions
diff --git a/mgrapp/src/com/google/codereview/manager/unpack/PatchSetUploader.java b/mgrapp/src/com/google/codereview/manager/unpack/PatchSetUploader.java
new file mode 100644
index 0000000000..160a3d6316
--- /dev/null
+++ b/mgrapp/src/com/google/codereview/manager/unpack/PatchSetUploader.java
@@ -0,0 +1,256 @@
+// Copyright 2008 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.codereview.manager.unpack;
+
+import com.google.codereview.internal.CompletePatchset.CompletePatchsetRequest;
+import com.google.codereview.internal.CompletePatchset.CompletePatchsetResponse;
+import com.google.codereview.internal.UploadPatchsetFile.UploadPatchsetFileRequest;
+import com.google.codereview.internal.UploadPatchsetFile.UploadPatchsetFileResponse;
+import com.google.codereview.manager.Backend;
+import com.google.codereview.manager.StopProcessingException;
+import com.google.codereview.rpc.SimpleController;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcCallback;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.spearce.jgit.lib.Constants;
+import org.spearce.jgit.lib.ObjectId;
+import org.spearce.jgit.lib.ObjectLoader;
+import org.spearce.jgit.lib.Repository;
+import org.spearce.jgit.revwalk.RevCommit;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.security.MessageDigest;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+
+class PatchSetUploader implements Runnable {
+ private static final Log LOG = LogFactory.getLog(PatchSetUploader.class);
+ private static final int MAX_DATA_SIZE = 1022 * 1024; // bytes
+ private static final String EMPTY_BLOB_ID;
+ private static final ByteString EMPTY_DEFLATE;
+
+ static {
+ final MessageDigest md = Constants.newMessageDigest();
+ md.update(Constants.encodeASCII("blob 0\0"));
+ EMPTY_BLOB_ID = ObjectId.fromRaw(md.digest()).name();
+ EMPTY_DEFLATE = deflate(new byte[0]);
+ }
+
+ private final Backend server;
+ private final Repository db;
+ private final RevCommit commit;
+ private final String commitName;
+ private final String patchsetKey;
+ private ByteString.Output compressedFilenames;
+ private Writer filenameOut;
+
+ PatchSetUploader(final Backend be, final Repository sourceRepo,
+ final RevCommit sourceCommit, final String destPatchsetKey) {
+ server = be;
+ db = sourceRepo;
+ commit = sourceCommit;
+ commitName = commit.getId().name();
+ patchsetKey = destPatchsetKey;
+ }
+
+ private String logkey() {
+ return db.getDirectory().getAbsolutePath() + " " + commitName;
+ }
+
+ public void run() {
+ LOG.debug(logkey() + " begin");
+ try {
+ runImpl();
+ } catch (RuntimeException e) {
+ LOG.fatal(logkey() + " failure", e);
+ } catch (Error e) {
+ LOG.fatal(logkey() + " failure", e);
+ }
+ }
+
+ private void runImpl() {
+ try {
+ compressedFilenames = ByteString.newOutput();
+ filenameOut =
+ new OutputStreamWriter(new DeflaterOutputStream(compressedFilenames,
+ new Deflater(Deflater.DEFAULT_COMPRESSION)), "UTF-8");
+ } catch (IOException e) {
+ LOG.error(logkey() + " cannot initialize filename compression", e);
+ return;
+ }
+
+ try {
+ final DiffReader dr = new DiffReader(db, commit);
+ try {
+ boolean first = true;
+ FileDiff file;
+ while ((file = dr.next()) != null) {
+ storeOneDiff(file);
+
+ if (first) {
+ first = false;
+ } else {
+ filenameOut.write('\0');
+ }
+ filenameOut.write(file.getFilename());
+ }
+ } finally {
+ dr.close();
+ }
+ filenameOut.close();
+ } catch (StopProcessingException halt) {
+ return;
+ } catch (IOException err) {
+ LOG.error(logkey() + " diff failed", err);
+ return;
+ }
+
+ final CompletePatchsetRequest.Builder req;
+ req = CompletePatchsetRequest.newBuilder();
+ req.setPatchsetKey(patchsetKey);
+ req.setCompressedFilenames(compressedFilenames.toByteString());
+ final SimpleController ctrl = new SimpleController();
+ server.getChangeService().completePatchset(ctrl, req.build(),
+ new RpcCallback<CompletePatchsetResponse>() {
+ public void run(final CompletePatchsetResponse rsp) {
+ LOG.debug(logkey() + " complete");
+ }
+ });
+ if (ctrl.failed()) {
+ final String why = ctrl.errorText();
+ LOG.error(logkey() + " completing failed: " + why);
+ }
+ }
+
+ private void storeOneDiff(final FileDiff diff) throws StopProcessingException {
+ final UploadPatchsetFileRequest req = toFileRequest(diff);
+ final SimpleController ctrl = new SimpleController();
+ server.getChangeService().uploadPatchsetFile(ctrl, req,
+ new RpcCallback<UploadPatchsetFileResponse>() {
+ public void run(final UploadPatchsetFileResponse rsp) {
+ final UploadPatchsetFileResponse.CodeType sc = rsp.getStatusCode();
+ final String fn = req.getFileName();
+ final String pk = req.getPatchsetKey();
+
+ if (sc == UploadPatchsetFileResponse.CodeType.CREATED) {
+ LOG.debug(logkey() + " uploaded " + fn);
+ } else if (sc == UploadPatchsetFileResponse.CodeType.CLOSED) {
+ ctrl.setFailed("patchset closed " + pk);
+ } else if (sc == UploadPatchsetFileResponse.CodeType.UNKNOWN_PATCHSET) {
+ ctrl.setFailed("patchset unknown " + pk);
+ } else if (sc == UploadPatchsetFileResponse.CodeType.PATCHING_ERROR) {
+ ctrl.setFailed("server cannot apply patch");
+ } else {
+ ctrl.setFailed("Unknown status " + sc.name() + " " + pk);
+ }
+ }
+ });
+ if (ctrl.failed()) {
+ final String fn = req.getFileName();
+ final String why = ctrl.errorText();
+ LOG.error(logkey() + " uploading " + fn + " failed: " + why);
+ throw new StopProcessingException(why);
+ }
+ }
+
+ private UploadPatchsetFileRequest toFileRequest(final FileDiff diff) {
+ final UploadPatchsetFileRequest.Builder req;
+
+ req = UploadPatchsetFileRequest.newBuilder();
+ req.setPatchsetKey(patchsetKey);
+ req.setFileName(diff.getFilename());
+ req.setStatus(diff.getStatus());
+
+ ByteString patchz = deflate(diff.getPatch());
+
+ if (!diff.isBinary() && !diff.isTruncated()) {
+ final ObjectId baseId = diff.getBaseId();
+
+ if (baseId == null || ObjectId.equals(baseId, ObjectId.zeroId())) {
+ req.setBaseId(EMPTY_BLOB_ID);
+ req.setBaseZ(EMPTY_DEFLATE);
+ } else {
+ try {
+ final ObjectLoader ldr = db.openBlob(baseId);
+ if (ldr == null) {
+ LOG.fatal(logkey() + " missing " + baseId.name());
+ throw new StopProcessingException("No " + baseId.name());
+ }
+
+ final ByteString basez = deflate(ldr.getCachedBytes());
+ if (basez.size() + patchz.size() > MAX_DATA_SIZE) {
+ diff.truncatePatch();
+ patchz = deflate(diff.getPatch());
+ } else {
+ req.setBaseId(baseId.name());
+ req.setBaseZ(basez);
+ }
+ } catch (IOException err) {
+ LOG.fatal(logkey() + " cannot read base " + baseId.name(), err);
+ throw new StopProcessingException("No " + baseId.name());
+ }
+ }
+ }
+
+ if (!diff.isBinary() && !diff.isTruncated()) {
+ final ObjectId finalId = diff.getFinalId();
+ if (finalId == null || ObjectId.equals(finalId, ObjectId.zeroId())) {
+ req.setFinalId(EMPTY_BLOB_ID);
+ } else {
+ req.setFinalId(finalId.name());
+ }
+ }
+
+ req.setPatchZ(patchz);
+ req.setPatchId(hashOfInflated(patchz));
+
+ return req.build();
+ }
+
+ private static ByteString deflate(final byte[] buf) {
+ final ByteString.Output r = ByteString.newOutput();
+ final DeflaterOutputStream out = new DeflaterOutputStream(r);
+ try {
+ out.write(buf);
+ out.close();
+ } catch (IOException err) {
+ // This should not happen.
+ throw new StopProcessingException("Unexpected IO error", err);
+ }
+ return r.toByteString();
+ }
+
+ private static String hashOfInflated(final ByteString in) {
+ final MessageDigest md = Constants.newMessageDigest();
+ final byte[] tmp = new byte[512];
+ final InflaterInputStream iis = new InflaterInputStream(in.newInput());
+ int cnt;
+ try {
+ while ((cnt = iis.read(tmp)) > 0) {
+ md.update(tmp, 0, cnt);
+ }
+ } catch (IOException err) {
+ // This should not happen.
+ throw new StopProcessingException("Unexpected IO error", err);
+ }
+ return ObjectId.fromRaw(md.digest()).name();
+ }
+}