diff options
Diffstat (limited to 'mgrapp/src/com/google/codereview/manager/unpack/PatchSetUploader.java')
-rw-r--r-- | mgrapp/src/com/google/codereview/manager/unpack/PatchSetUploader.java | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/mgrapp/src/com/google/codereview/manager/unpack/PatchSetUploader.java b/mgrapp/src/com/google/codereview/manager/unpack/PatchSetUploader.java new file mode 100644 index 0000000000..160a3d6316 --- /dev/null +++ b/mgrapp/src/com/google/codereview/manager/unpack/PatchSetUploader.java @@ -0,0 +1,256 @@ +// Copyright 2008 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.codereview.manager.unpack; + +import com.google.codereview.internal.CompletePatchset.CompletePatchsetRequest; +import com.google.codereview.internal.CompletePatchset.CompletePatchsetResponse; +import com.google.codereview.internal.UploadPatchsetFile.UploadPatchsetFileRequest; +import com.google.codereview.internal.UploadPatchsetFile.UploadPatchsetFileResponse; +import com.google.codereview.manager.Backend; +import com.google.codereview.manager.StopProcessingException; +import com.google.codereview.rpc.SimpleController; +import com.google.protobuf.ByteString; +import com.google.protobuf.RpcCallback; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.spearce.jgit.lib.Constants; +import org.spearce.jgit.lib.ObjectId; +import org.spearce.jgit.lib.ObjectLoader; +import org.spearce.jgit.lib.Repository; +import org.spearce.jgit.revwalk.RevCommit; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.security.MessageDigest; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.InflaterInputStream; + + +class PatchSetUploader implements Runnable { + private static final Log LOG = LogFactory.getLog(PatchSetUploader.class); + private static final int MAX_DATA_SIZE = 1022 * 1024; // bytes + private static final String EMPTY_BLOB_ID; + private static final ByteString EMPTY_DEFLATE; + + static { + final MessageDigest md = Constants.newMessageDigest(); + md.update(Constants.encodeASCII("blob 0\0")); + EMPTY_BLOB_ID = ObjectId.fromRaw(md.digest()).name(); + EMPTY_DEFLATE = deflate(new byte[0]); + } + + private final Backend server; + private final Repository db; + private final RevCommit commit; + private final String commitName; + private final String patchsetKey; + private ByteString.Output compressedFilenames; + private Writer filenameOut; + + PatchSetUploader(final Backend be, final Repository sourceRepo, + final RevCommit sourceCommit, final String destPatchsetKey) { + server = be; + db = sourceRepo; + commit = sourceCommit; + commitName = commit.getId().name(); + patchsetKey = destPatchsetKey; + } + + private String logkey() { + return db.getDirectory().getAbsolutePath() + " " + commitName; + } + + public void run() { + LOG.debug(logkey() + " begin"); + try { + runImpl(); + } catch (RuntimeException e) { + LOG.fatal(logkey() + " failure", e); + } catch (Error e) { + LOG.fatal(logkey() + " failure", e); + } + } + + private void runImpl() { + try { + compressedFilenames = ByteString.newOutput(); + filenameOut = + new OutputStreamWriter(new DeflaterOutputStream(compressedFilenames, + new Deflater(Deflater.DEFAULT_COMPRESSION)), "UTF-8"); + } catch (IOException e) { + LOG.error(logkey() + " cannot initialize filename compression", e); + return; + } + + try { + final DiffReader dr = new DiffReader(db, commit); + try { + boolean first = true; + FileDiff file; + while ((file = dr.next()) != null) { + storeOneDiff(file); + + if (first) { + first = false; + } else { + filenameOut.write('\0'); + } + filenameOut.write(file.getFilename()); + } + } finally { + dr.close(); + } + filenameOut.close(); + } catch (StopProcessingException halt) { + return; + } catch (IOException err) { + LOG.error(logkey() + " diff failed", err); + return; + } + + final CompletePatchsetRequest.Builder req; + req = CompletePatchsetRequest.newBuilder(); + req.setPatchsetKey(patchsetKey); + req.setCompressedFilenames(compressedFilenames.toByteString()); + final SimpleController ctrl = new SimpleController(); + server.getChangeService().completePatchset(ctrl, req.build(), + new RpcCallback<CompletePatchsetResponse>() { + public void run(final CompletePatchsetResponse rsp) { + LOG.debug(logkey() + " complete"); + } + }); + if (ctrl.failed()) { + final String why = ctrl.errorText(); + LOG.error(logkey() + " completing failed: " + why); + } + } + + private void storeOneDiff(final FileDiff diff) throws StopProcessingException { + final UploadPatchsetFileRequest req = toFileRequest(diff); + final SimpleController ctrl = new SimpleController(); + server.getChangeService().uploadPatchsetFile(ctrl, req, + new RpcCallback<UploadPatchsetFileResponse>() { + public void run(final UploadPatchsetFileResponse rsp) { + final UploadPatchsetFileResponse.CodeType sc = rsp.getStatusCode(); + final String fn = req.getFileName(); + final String pk = req.getPatchsetKey(); + + if (sc == UploadPatchsetFileResponse.CodeType.CREATED) { + LOG.debug(logkey() + " uploaded " + fn); + } else if (sc == UploadPatchsetFileResponse.CodeType.CLOSED) { + ctrl.setFailed("patchset closed " + pk); + } else if (sc == UploadPatchsetFileResponse.CodeType.UNKNOWN_PATCHSET) { + ctrl.setFailed("patchset unknown " + pk); + } else if (sc == UploadPatchsetFileResponse.CodeType.PATCHING_ERROR) { + ctrl.setFailed("server cannot apply patch"); + } else { + ctrl.setFailed("Unknown status " + sc.name() + " " + pk); + } + } + }); + if (ctrl.failed()) { + final String fn = req.getFileName(); + final String why = ctrl.errorText(); + LOG.error(logkey() + " uploading " + fn + " failed: " + why); + throw new StopProcessingException(why); + } + } + + private UploadPatchsetFileRequest toFileRequest(final FileDiff diff) { + final UploadPatchsetFileRequest.Builder req; + + req = UploadPatchsetFileRequest.newBuilder(); + req.setPatchsetKey(patchsetKey); + req.setFileName(diff.getFilename()); + req.setStatus(diff.getStatus()); + + ByteString patchz = deflate(diff.getPatch()); + + if (!diff.isBinary() && !diff.isTruncated()) { + final ObjectId baseId = diff.getBaseId(); + + if (baseId == null || ObjectId.equals(baseId, ObjectId.zeroId())) { + req.setBaseId(EMPTY_BLOB_ID); + req.setBaseZ(EMPTY_DEFLATE); + } else { + try { + final ObjectLoader ldr = db.openBlob(baseId); + if (ldr == null) { + LOG.fatal(logkey() + " missing " + baseId.name()); + throw new StopProcessingException("No " + baseId.name()); + } + + final ByteString basez = deflate(ldr.getCachedBytes()); + if (basez.size() + patchz.size() > MAX_DATA_SIZE) { + diff.truncatePatch(); + patchz = deflate(diff.getPatch()); + } else { + req.setBaseId(baseId.name()); + req.setBaseZ(basez); + } + } catch (IOException err) { + LOG.fatal(logkey() + " cannot read base " + baseId.name(), err); + throw new StopProcessingException("No " + baseId.name()); + } + } + } + + if (!diff.isBinary() && !diff.isTruncated()) { + final ObjectId finalId = diff.getFinalId(); + if (finalId == null || ObjectId.equals(finalId, ObjectId.zeroId())) { + req.setFinalId(EMPTY_BLOB_ID); + } else { + req.setFinalId(finalId.name()); + } + } + + req.setPatchZ(patchz); + req.setPatchId(hashOfInflated(patchz)); + + return req.build(); + } + + private static ByteString deflate(final byte[] buf) { + final ByteString.Output r = ByteString.newOutput(); + final DeflaterOutputStream out = new DeflaterOutputStream(r); + try { + out.write(buf); + out.close(); + } catch (IOException err) { + // This should not happen. + throw new StopProcessingException("Unexpected IO error", err); + } + return r.toByteString(); + } + + private static String hashOfInflated(final ByteString in) { + final MessageDigest md = Constants.newMessageDigest(); + final byte[] tmp = new byte[512]; + final InflaterInputStream iis = new InflaterInputStream(in.newInput()); + int cnt; + try { + while ((cnt = iis.read(tmp)) > 0) { + md.update(tmp, 0, cnt); + } + } catch (IOException err) { + // This should not happen. + throw new StopProcessingException("Unexpected IO error", err); + } + return ObjectId.fromRaw(md.digest()).name(); + } +} |