From a050bba2f0151beb508fa7ab6f58c25c4e92bfb0 Mon Sep 17 00:00:00 2001 From: Martin Petersson Date: Mon, 7 May 2012 12:41:58 +0200 Subject: QNetworkReply::setReadBufferSize fix for threaded http Added the setReadBufferSize functionallity again by limiting the amount that the delegate read from the channel. Each time that data is fetched from the reply buffer, we communicate back to the thread so that more data can be fetched. Task-number: QTBUG-25327 Change-Id: I2f9950196e64acd09bc8da50c1116f2c9deacad4 Reviewed-by: Shane Kearns Reviewed-by: Lars Knoll --- src/network/access/qhttpnetworkreply.cpp | 32 ++++++++++++++++- src/network/access/qhttpnetworkreply_p.h | 4 +++ src/network/access/qhttpthreaddelegate.cpp | 51 ++++++++++++++++++++++++++-- src/network/access/qhttpthreaddelegate_p.h | 5 +++ src/network/access/qnetworkreplyhttpimpl.cpp | 15 ++++++-- src/network/access/qnetworkreplyhttpimpl_p.h | 2 ++ 6 files changed, 102 insertions(+), 7 deletions(-) diff --git a/src/network/access/qhttpnetworkreply.cpp b/src/network/access/qhttpnetworkreply.cpp index 39204163f2..4f358ed178 100644 --- a/src/network/access/qhttpnetworkreply.cpp +++ b/src/network/access/qhttpnetworkreply.cpp @@ -206,6 +206,19 @@ QByteArray QHttpNetworkReply::readAll() return d->responseData.readAll(); } +QByteArray QHttpNetworkReply::read(qint64 amount) +{ + Q_D(QHttpNetworkReply); + return d->responseData.read(amount); +} + + +qint64 QHttpNetworkReply::sizeNextBlock() +{ + Q_D(QHttpNetworkReply); + return d->responseData.sizeNextBlock(); +} + void QHttpNetworkReply::setDownstreamLimited(bool dsl) { Q_D(QHttpNetworkReply); @@ -213,6 +226,12 @@ void QHttpNetworkReply::setDownstreamLimited(bool dsl) d->connection->d_func()->readMoreLater(this); } +void QHttpNetworkReply::setReadBufferSize(qint64 size) +{ + Q_D(QHttpNetworkReply); + d->readBufferMaxSize = size; +} + bool QHttpNetworkReply::supportsUserProvidedDownloadBuffer() { Q_D(QHttpNetworkReply); @@ -258,7 +277,7 @@ QHttpNetworkReplyPrivate::QHttpNetworkReplyPrivate(const QUrl &newUrl) connectionCloseEnabled(true), forceConnectionCloseEnabled(false), lastChunkRead(false), - currentChunkSize(0), currentChunkRead(0), connection(0), + currentChunkSize(0), currentChunkRead(0), readBufferMaxSize(0), connection(0), autoDecompress(false), responseData(), requestIsPrepared(false) ,pipeliningUsed(false), downstreamLimited(false) ,userProvidedDownloadBuffer(0) @@ -598,6 +617,10 @@ qint64 QHttpNetworkReplyPrivate::readBodyFast(QAbstractSocket *socket, QByteData { qint64 toBeRead = qMin(socket->bytesAvailable(), bodyLength - contentRead); + if (readBufferMaxSize) + toBeRead = qMin(toBeRead, readBufferMaxSize); + + QByteArray bd; bd.resize(toBeRead); qint64 haveRead = socket->read(bd.data(), toBeRead); @@ -698,6 +721,9 @@ qint64 QHttpNetworkReplyPrivate::readReplyBodyRaw(QAbstractSocket *socket, QByte int toBeRead = qMin(128*1024, qMin(size, socket->bytesAvailable())); + if (readBufferMaxSize) + toBeRead = qMin(toBeRead, readBufferMaxSize); + while (toBeRead > 0) { QByteArray byteData; byteData.resize(toBeRead); @@ -723,6 +749,10 @@ qint64 QHttpNetworkReplyPrivate::readReplyBodyChunked(QAbstractSocket *socket, Q { qint64 bytes = 0; while (socket->bytesAvailable()) { + + if (readBufferMaxSize && (bytes > readBufferMaxSize)) + break; + if (!lastChunkRead && currentChunkRead >= currentChunkSize) { // For the first chunk and when we're done with a chunk currentChunkSize = 0; diff --git a/src/network/access/qhttpnetworkreply_p.h b/src/network/access/qhttpnetworkreply_p.h index 97fefc6e1b..b3c16a8258 100644 --- a/src/network/access/qhttpnetworkreply_p.h +++ b/src/network/access/qhttpnetworkreply_p.h @@ -120,7 +120,10 @@ public: bool readAnyAvailable() const; QByteArray readAny(); QByteArray readAll(); + QByteArray read(qint64 amount); + qint64 sizeNextBlock(); void setDownstreamLimited(bool t); + void setReadBufferSize(qint64 size); bool supportsUserProvidedDownloadBuffer(); void setUserProvidedDownloadBuffer(char*); @@ -220,6 +223,7 @@ public: bool lastChunkRead; qint64 currentChunkSize; qint64 currentChunkRead; + qint64 readBufferMaxSize; QPointer connection; QPointer connectionChannel; diff --git a/src/network/access/qhttpthreaddelegate.cpp b/src/network/access/qhttpthreaddelegate.cpp index 634340bb54..1ed9f5d3bd 100644 --- a/src/network/access/qhttpthreaddelegate.cpp +++ b/src/network/access/qhttpthreaddelegate.cpp @@ -194,6 +194,8 @@ QHttpThreadDelegate::QHttpThreadDelegate(QObject *parent) : QObject(parent) , ssl(false) , downloadBufferMaximumSize(0) + , readBufferMaxSize(0) + , bytesEmitted(0) , pendingDownloadData(0) , pendingDownloadProgress(0) , synchronous(false) @@ -349,15 +351,58 @@ void QHttpThreadDelegate::abortRequest() } } +void QHttpThreadDelegate::readBufferSizeChanged(qint64 size) +{ +#ifdef QHTTPTHREADDELEGATE_DEBUG + qDebug() << "QHttpThreadDelegate::readBufferSizeChanged() size " << size; +#endif + if (httpReply) { + httpReply->setDownstreamLimited(size > 0); + httpReply->setReadBufferSize(size); + readBufferMaxSize = size; + } +} + +void QHttpThreadDelegate::readBufferFreed(qint64 size) +{ + if (readBufferMaxSize) { + bytesEmitted -= size; + + QMetaObject::invokeMethod(this, "readyReadSlot", Qt::QueuedConnection); + } +} + void QHttpThreadDelegate::readyReadSlot() { // Don't do in zerocopy case if (!downloadBuffer.isNull()) return; - while (httpReply->readAnyAvailable()) { - pendingDownloadData->fetchAndAddRelease(1); - emit downloadData(httpReply->readAny()); + if (readBufferMaxSize) { + if (bytesEmitted < readBufferMaxSize) { + qint64 sizeEmitted = 0; + while (httpReply->readAnyAvailable() && (sizeEmitted < (readBufferMaxSize-bytesEmitted))) { + if (httpReply->sizeNextBlock() > (readBufferMaxSize-bytesEmitted)) { + sizeEmitted = readBufferMaxSize-bytesEmitted; + bytesEmitted += sizeEmitted; + pendingDownloadData->fetchAndAddRelease(1); + emit downloadData(httpReply->read(sizeEmitted)); + } else { + sizeEmitted = httpReply->sizeNextBlock(); + bytesEmitted += sizeEmitted; + pendingDownloadData->fetchAndAddRelease(1); + emit downloadData(httpReply->readAny()); + } + } + } else { + // We need to wait until we empty data from the read buffer in the reply. + } + + } else { + while (httpReply->readAnyAvailable()) { + pendingDownloadData->fetchAndAddRelease(1); + emit downloadData(httpReply->readAny()); + } } } diff --git a/src/network/access/qhttpthreaddelegate_p.h b/src/network/access/qhttpthreaddelegate_p.h index 036d5b94c6..7ac927ab1f 100644 --- a/src/network/access/qhttpthreaddelegate_p.h +++ b/src/network/access/qhttpthreaddelegate_p.h @@ -93,6 +93,8 @@ public: #endif QHttpNetworkRequest httpRequest; qint64 downloadBufferMaximumSize; + qint64 readBufferMaxSize; + qint64 bytesEmitted; // From backend, modified by us for signal compression QSharedPointer pendingDownloadData; QSharedPointer pendingDownloadProgress; @@ -145,6 +147,9 @@ public slots: // This are called via QueuedConnection from user thread void startRequest(); void abortRequest(); + void readBufferSizeChanged(qint64 size); + void readBufferFreed(qint64 size); + // This is called with a BlockingQueuedConnection from user thread void startRequestSynchronously(); protected slots: diff --git a/src/network/access/qnetworkreplyhttpimpl.cpp b/src/network/access/qnetworkreplyhttpimpl.cpp index c9ec287c2c..25b73e969f 100644 --- a/src/network/access/qnetworkreplyhttpimpl.cpp +++ b/src/network/access/qnetworkreplyhttpimpl.cpp @@ -357,17 +357,22 @@ qint64 QNetworkReplyHttpImpl::readData(char* data, qint64 maxlen) if (maxlen == 1) { // optimization for getChar() *data = d->downloadMultiBuffer.getChar(); + if (readBufferSize()) + emit readBufferFreed(1); return 1; } maxlen = qMin(maxlen, d->downloadMultiBuffer.byteAmount()); - return d->downloadMultiBuffer.read(data, maxlen); + qint64 bytesRead = d->downloadMultiBuffer.read(data, maxlen); + if (readBufferSize()) + emit readBufferFreed(bytesRead); + return bytesRead; } void QNetworkReplyHttpImpl::setReadBufferSize(qint64 size) { - Q_UNUSED(size); - // FIXME, unsupported right now + QNetworkReply::setReadBufferSize(size); + emit readBufferSizeChanged(size); return; } @@ -839,6 +844,10 @@ void QNetworkReplyHttpImplPrivate::postRequest() QObject::connect(q, SIGNAL(startHttpRequest()), delegate, SLOT(startRequest())); QObject::connect(q, SIGNAL(abortHttpRequest()), delegate, SLOT(abortRequest())); + // To throttle the connection. + QObject::connect(q, SIGNAL(readBufferSizeChanged(qint64)), delegate, SLOT(readBufferSizeChanged(qint64))); + QObject::connect(q, SIGNAL(readBufferFreed(qint64)), delegate, SLOT(readBufferFreed(qint64))); + if (uploadByteDevice) { QNonContiguousByteDeviceThreadForwardImpl *forwardUploadDevice = new QNonContiguousByteDeviceThreadForwardImpl(uploadByteDevice->atEnd(), uploadByteDevice->size()); diff --git a/src/network/access/qnetworkreplyhttpimpl_p.h b/src/network/access/qnetworkreplyhttpimpl_p.h index 6c2313448b..fb7dfb5495 100644 --- a/src/network/access/qnetworkreplyhttpimpl_p.h +++ b/src/network/access/qnetworkreplyhttpimpl_p.h @@ -139,6 +139,8 @@ signals: // To HTTP thread: void startHttpRequest(); void abortHttpRequest(); + void readBufferSizeChanged(qint64 size); + void readBufferFreed(qint64 size); void startHttpRequestSynchronously(); -- cgit v1.2.3