diff options
-rw-r--r-- | src/network/access/qhttpnetworkreply.cpp | 32 | ||||
-rw-r--r-- | src/network/access/qhttpnetworkreply_p.h | 4 | ||||
-rw-r--r-- | src/network/access/qhttpthreaddelegate.cpp | 51 | ||||
-rw-r--r-- | src/network/access/qhttpthreaddelegate_p.h | 5 | ||||
-rw-r--r-- | src/network/access/qnetworkreplyhttpimpl.cpp | 15 | ||||
-rw-r--r-- | src/network/access/qnetworkreplyhttpimpl_p.h | 2 |
6 files changed, 102 insertions, 7 deletions
diff --git a/src/network/access/qhttpnetworkreply.cpp b/src/network/access/qhttpnetworkreply.cpp index 39204163f2..4f358ed178 100644 --- a/src/network/access/qhttpnetworkreply.cpp +++ b/src/network/access/qhttpnetworkreply.cpp @@ -206,6 +206,19 @@ QByteArray QHttpNetworkReply::readAll() return d->responseData.readAll(); } +QByteArray QHttpNetworkReply::read(qint64 amount) +{ + Q_D(QHttpNetworkReply); + return d->responseData.read(amount); +} + + +qint64 QHttpNetworkReply::sizeNextBlock() +{ + Q_D(QHttpNetworkReply); + return d->responseData.sizeNextBlock(); +} + void QHttpNetworkReply::setDownstreamLimited(bool dsl) { Q_D(QHttpNetworkReply); @@ -213,6 +226,12 @@ void QHttpNetworkReply::setDownstreamLimited(bool dsl) d->connection->d_func()->readMoreLater(this); } +void QHttpNetworkReply::setReadBufferSize(qint64 size) +{ + Q_D(QHttpNetworkReply); + d->readBufferMaxSize = size; +} + bool QHttpNetworkReply::supportsUserProvidedDownloadBuffer() { Q_D(QHttpNetworkReply); @@ -258,7 +277,7 @@ QHttpNetworkReplyPrivate::QHttpNetworkReplyPrivate(const QUrl &newUrl) connectionCloseEnabled(true), forceConnectionCloseEnabled(false), lastChunkRead(false), - currentChunkSize(0), currentChunkRead(0), connection(0), + currentChunkSize(0), currentChunkRead(0), readBufferMaxSize(0), connection(0), autoDecompress(false), responseData(), requestIsPrepared(false) ,pipeliningUsed(false), downstreamLimited(false) ,userProvidedDownloadBuffer(0) @@ -598,6 +617,10 @@ qint64 QHttpNetworkReplyPrivate::readBodyFast(QAbstractSocket *socket, QByteData { qint64 toBeRead = qMin(socket->bytesAvailable(), bodyLength - contentRead); + if (readBufferMaxSize) + toBeRead = qMin(toBeRead, readBufferMaxSize); + + QByteArray bd; bd.resize(toBeRead); qint64 haveRead = socket->read(bd.data(), toBeRead); @@ -698,6 +721,9 @@ qint64 QHttpNetworkReplyPrivate::readReplyBodyRaw(QAbstractSocket *socket, QByte int toBeRead = qMin<qint64>(128*1024, qMin<qint64>(size, socket->bytesAvailable())); + if (readBufferMaxSize) + toBeRead = qMin<qint64>(toBeRead, readBufferMaxSize); + while (toBeRead > 0) { QByteArray byteData; byteData.resize(toBeRead); @@ -723,6 +749,10 @@ qint64 QHttpNetworkReplyPrivate::readReplyBodyChunked(QAbstractSocket *socket, Q { qint64 bytes = 0; while (socket->bytesAvailable()) { + + if (readBufferMaxSize && (bytes > readBufferMaxSize)) + break; + if (!lastChunkRead && currentChunkRead >= currentChunkSize) { // For the first chunk and when we're done with a chunk currentChunkSize = 0; diff --git a/src/network/access/qhttpnetworkreply_p.h b/src/network/access/qhttpnetworkreply_p.h index 97fefc6e1b..b3c16a8258 100644 --- a/src/network/access/qhttpnetworkreply_p.h +++ b/src/network/access/qhttpnetworkreply_p.h @@ -120,7 +120,10 @@ public: bool readAnyAvailable() const; QByteArray readAny(); QByteArray readAll(); + QByteArray read(qint64 amount); + qint64 sizeNextBlock(); void setDownstreamLimited(bool t); + void setReadBufferSize(qint64 size); bool supportsUserProvidedDownloadBuffer(); void setUserProvidedDownloadBuffer(char*); @@ -220,6 +223,7 @@ public: bool lastChunkRead; qint64 currentChunkSize; qint64 currentChunkRead; + qint64 readBufferMaxSize; QPointer<QHttpNetworkConnection> connection; QPointer<QHttpNetworkConnectionChannel> connectionChannel; diff --git a/src/network/access/qhttpthreaddelegate.cpp b/src/network/access/qhttpthreaddelegate.cpp index 634340bb54..1ed9f5d3bd 100644 --- a/src/network/access/qhttpthreaddelegate.cpp +++ b/src/network/access/qhttpthreaddelegate.cpp @@ -194,6 +194,8 @@ QHttpThreadDelegate::QHttpThreadDelegate(QObject *parent) : QObject(parent) , ssl(false) , downloadBufferMaximumSize(0) + , readBufferMaxSize(0) + , bytesEmitted(0) , pendingDownloadData(0) , pendingDownloadProgress(0) , synchronous(false) @@ -349,15 +351,58 @@ void QHttpThreadDelegate::abortRequest() } } +void QHttpThreadDelegate::readBufferSizeChanged(qint64 size) +{ +#ifdef QHTTPTHREADDELEGATE_DEBUG + qDebug() << "QHttpThreadDelegate::readBufferSizeChanged() size " << size; +#endif + if (httpReply) { + httpReply->setDownstreamLimited(size > 0); + httpReply->setReadBufferSize(size); + readBufferMaxSize = size; + } +} + +void QHttpThreadDelegate::readBufferFreed(qint64 size) +{ + if (readBufferMaxSize) { + bytesEmitted -= size; + + QMetaObject::invokeMethod(this, "readyReadSlot", Qt::QueuedConnection); + } +} + void QHttpThreadDelegate::readyReadSlot() { // Don't do in zerocopy case if (!downloadBuffer.isNull()) return; - while (httpReply->readAnyAvailable()) { - pendingDownloadData->fetchAndAddRelease(1); - emit downloadData(httpReply->readAny()); + if (readBufferMaxSize) { + if (bytesEmitted < readBufferMaxSize) { + qint64 sizeEmitted = 0; + while (httpReply->readAnyAvailable() && (sizeEmitted < (readBufferMaxSize-bytesEmitted))) { + if (httpReply->sizeNextBlock() > (readBufferMaxSize-bytesEmitted)) { + sizeEmitted = readBufferMaxSize-bytesEmitted; + bytesEmitted += sizeEmitted; + pendingDownloadData->fetchAndAddRelease(1); + emit downloadData(httpReply->read(sizeEmitted)); + } else { + sizeEmitted = httpReply->sizeNextBlock(); + bytesEmitted += sizeEmitted; + pendingDownloadData->fetchAndAddRelease(1); + emit downloadData(httpReply->readAny()); + } + } + } else { + // We need to wait until we empty data from the read buffer in the reply. + } + + } else { + while (httpReply->readAnyAvailable()) { + pendingDownloadData->fetchAndAddRelease(1); + emit downloadData(httpReply->readAny()); + } } } diff --git a/src/network/access/qhttpthreaddelegate_p.h b/src/network/access/qhttpthreaddelegate_p.h index 036d5b94c6..7ac927ab1f 100644 --- a/src/network/access/qhttpthreaddelegate_p.h +++ b/src/network/access/qhttpthreaddelegate_p.h @@ -93,6 +93,8 @@ public: #endif QHttpNetworkRequest httpRequest; qint64 downloadBufferMaximumSize; + qint64 readBufferMaxSize; + qint64 bytesEmitted; // From backend, modified by us for signal compression QSharedPointer<QAtomicInt> pendingDownloadData; QSharedPointer<QAtomicInt> pendingDownloadProgress; @@ -145,6 +147,9 @@ public slots: // This are called via QueuedConnection from user thread void startRequest(); void abortRequest(); + void readBufferSizeChanged(qint64 size); + void readBufferFreed(qint64 size); + // This is called with a BlockingQueuedConnection from user thread void startRequestSynchronously(); protected slots: diff --git a/src/network/access/qnetworkreplyhttpimpl.cpp b/src/network/access/qnetworkreplyhttpimpl.cpp index c9ec287c2c..25b73e969f 100644 --- a/src/network/access/qnetworkreplyhttpimpl.cpp +++ b/src/network/access/qnetworkreplyhttpimpl.cpp @@ -357,17 +357,22 @@ qint64 QNetworkReplyHttpImpl::readData(char* data, qint64 maxlen) if (maxlen == 1) { // optimization for getChar() *data = d->downloadMultiBuffer.getChar(); + if (readBufferSize()) + emit readBufferFreed(1); return 1; } maxlen = qMin<qint64>(maxlen, d->downloadMultiBuffer.byteAmount()); - return d->downloadMultiBuffer.read(data, maxlen); + qint64 bytesRead = d->downloadMultiBuffer.read(data, maxlen); + if (readBufferSize()) + emit readBufferFreed(bytesRead); + return bytesRead; } void QNetworkReplyHttpImpl::setReadBufferSize(qint64 size) { - Q_UNUSED(size); - // FIXME, unsupported right now + QNetworkReply::setReadBufferSize(size); + emit readBufferSizeChanged(size); return; } @@ -839,6 +844,10 @@ void QNetworkReplyHttpImplPrivate::postRequest() QObject::connect(q, SIGNAL(startHttpRequest()), delegate, SLOT(startRequest())); QObject::connect(q, SIGNAL(abortHttpRequest()), delegate, SLOT(abortRequest())); + // To throttle the connection. + QObject::connect(q, SIGNAL(readBufferSizeChanged(qint64)), delegate, SLOT(readBufferSizeChanged(qint64))); + QObject::connect(q, SIGNAL(readBufferFreed(qint64)), delegate, SLOT(readBufferFreed(qint64))); + if (uploadByteDevice) { QNonContiguousByteDeviceThreadForwardImpl *forwardUploadDevice = new QNonContiguousByteDeviceThreadForwardImpl(uploadByteDevice->atEnd(), uploadByteDevice->size()); diff --git a/src/network/access/qnetworkreplyhttpimpl_p.h b/src/network/access/qnetworkreplyhttpimpl_p.h index 6c2313448b..fb7dfb5495 100644 --- a/src/network/access/qnetworkreplyhttpimpl_p.h +++ b/src/network/access/qnetworkreplyhttpimpl_p.h @@ -139,6 +139,8 @@ signals: // To HTTP thread: void startHttpRequest(); void abortHttpRequest(); + void readBufferSizeChanged(qint64 size); + void readBufferFreed(qint64 size); void startHttpRequestSynchronously(); |