From 7d257aab8175f3d4fd3b838d4f47457a8d868885 Mon Sep 17 00:00:00 2001 From: Alex Trotsenko Date: Tue, 2 Jun 2015 22:01:44 +0300 Subject: QIODevice: handle incomplete reads Introduce a transaction mechanism that gives the ability to read the data atomically. Current implementation supports transactions for both types of devices. For sequential devices, it records the whole input stream during transaction. For random-access devices, device position is saved when transaction starts. If an error occurs, the application may be able to recover the input stream by rolling back to the start point. Also, QIODevice::peek() was rewritten to make use of transactions internally. The replacement of QIODevicePrivateLinearBuffer by QRingBuffer is closely entangled with that, which makes it unfeasible to do separately. Bump the TypeInformationVersion field in qtHookData, to notify the Qt Creator developers that the offset of QFilePrivate::fileName was changed and dumpers should be adapted. [ChangeLog][QtCore] Added QIODevice's startTransaction(), commitTransaction(), rollbackTransaction(), isTransactionStarted() functions to support the read transactions. Task-number: QTBUG-44418 Change-Id: I3564b343ebeeaaf7c48a1dcdb7ef0a7ffec550f2 Reviewed-by: Joerg Bornemann Reviewed-by: Oswald Buddenhagen --- src/corelib/io/qiodevice.cpp | 244 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 204 insertions(+), 40 deletions(-) (limited to 'src/corelib/io/qiodevice.cpp') diff --git a/src/corelib/io/qiodevice.cpp b/src/corelib/io/qiodevice.cpp index 64078b5c54..00b50af5ca 100644 --- a/src/corelib/io/qiodevice.cpp +++ b/src/corelib/io/qiodevice.cpp @@ -143,7 +143,9 @@ static void checkWarnMessage(const QIODevice *device, const char *function, cons */ QIODevicePrivate::QIODevicePrivate() : openMode(QIODevice::NotOpen), buffer(QIODEVICE_BUFFERSIZE), - pos(0), devicePos(0) + pos(0), devicePos(0), + transactionPos(0), + transactionStarted(false) , baseReadLineDataCalled(false) , accessMode(Unset) #ifdef QT_NO_QOBJECT @@ -264,6 +266,12 @@ QIODevicePrivate::~QIODevicePrivate() subclassing QIODevice, remember to bypass any buffer you may use when the device is open in Unbuffered mode. + Usually, the incoming data stream from an asynchronous device is + fragmented, and chunks of data can arrive at arbitrary points in time. + To handle incomplete reads of data structures, use the transaction + mechanism implemented by QIODevice. See startTransaction() and related + functions for more details. + \sa QBuffer, QFile, QTcpSocket */ @@ -588,6 +596,8 @@ void QIODevice::close() d->openMode = NotOpen; d->errorString.clear(); d->pos = 0; + d->transactionStarted = false; + d->transactionPos = 0; d->buffer.clear(); } @@ -660,18 +670,8 @@ bool QIODevice::seek(qint64 pos) this, pos, d->pos, d->buffer.size()); #endif - qint64 offset = pos - d->pos; - d->pos = pos; d->devicePos = pos; - - if (offset < 0 || offset >= d->buffer.size()) - // When seeking backwards, an operation that is only allowed for - // random-access devices, the buffer is cleared. The next read - // operation will then refill the buffer. We can optimize this, if we - // find that seeking backwards becomes a significant performance hit. - d->buffer.clear(); - else - d->buffer.skip(offset); + d->seekBuffer(pos); #if defined QIODEVICE_DEBUG printf("%p \tafter: d->pos == %lld, d->buffer.size() == %lld\n", this, d->pos, @@ -680,6 +680,24 @@ bool QIODevice::seek(qint64 pos) return true; } +/*! + \internal +*/ +void QIODevicePrivate::seekBuffer(qint64 newPos) +{ + const qint64 offset = newPos - pos; + pos = newPos; + + if (offset < 0 || offset >= buffer.size()) { + // When seeking backwards, an operation that is only allowed for + // random-access devices, the buffer is cleared. The next read + // operation will then refill the buffer. + buffer.clear(); + } else { + buffer.free(offset); + } +} + /*! Returns \c true if the current read and write position is at the end of the device (i.e. there is no more data available for reading on @@ -695,7 +713,7 @@ bool QIODevice::seek(qint64 pos) bool QIODevice::atEnd() const { Q_D(const QIODevice); - const bool result = (d->openMode == NotOpen || (d->buffer.isEmpty() + const bool result = (d->openMode == NotOpen || (d->isBufferEmpty() && bytesAvailable() == 0)); #if defined QIODEVICE_DEBUG printf("%p QIODevice::atEnd() returns %s, d->openMode == %d, d->pos == %lld\n", this, @@ -740,7 +758,7 @@ qint64 QIODevice::bytesAvailable() const Q_D(const QIODevice); if (!d->isSequential()) return qMax(size() - d->pos, qint64(0)); - return d->buffer.size(); + return d->buffer.size() - d->transactionPos; } /*! For buffered devices, this function returns the number of bytes @@ -777,9 +795,10 @@ qint64 QIODevice::read(char *data, qint64 maxSize) #endif const bool sequential = d->isSequential(); + const bool keepDataInBuffer = sequential && d->transactionStarted; // Short circuit for getChar() - if (maxSize == 1) { + if (maxSize == 1 && !keepDataInBuffer) { int chint; while ((chint = d->buffer.getChar()) != -1) { if (!sequential) @@ -806,9 +825,13 @@ qint64 QIODevice::read(char *data, qint64 maxSize) char *readPtr = data; forever { // Try reading from the buffer. - qint64 bufferReadChunkSize = d->buffer.read(data, maxSize); + qint64 bufferReadChunkSize = keepDataInBuffer + ? d->buffer.peek(data, maxSize, d->transactionPos) + : d->buffer.read(data, maxSize); if (bufferReadChunkSize > 0) { - if (!sequential) + if (keepDataInBuffer) + d->transactionPos += bufferReadChunkSize; + else if (!sequential) d->pos += bufferReadChunkSize; readSoFar += bufferReadChunkSize; data += bufferReadChunkSize; @@ -826,7 +849,8 @@ qint64 QIODevice::read(char *data, qint64 maxSize) // Make sure the device is positioned correctly. if (sequential || d->pos == d->devicePos || seek(d->pos)) { madeBufferReadsOnly = false; // fix readData attempt - if (maxSize >= QIODEVICE_BUFFERSIZE || (d->openMode & Unbuffered)) { + if ((maxSize >= QIODEVICE_BUFFERSIZE || (d->openMode & Unbuffered)) + && !keepDataInBuffer) { // Read big chunk directly to output buffer readFromDevice = readData(data, maxSize); deviceAtEof = (readFromDevice != maxSize); @@ -844,7 +868,9 @@ qint64 QIODevice::read(char *data, qint64 maxSize) } } } else { - const qint64 bytesToBuffer = QIODEVICE_BUFFERSIZE; + // Do not read more than maxSize on unbuffered devices + const qint64 bytesToBuffer = (d->openMode & Unbuffered) + ? qMin(maxSize, QIODEVICE_BUFFERSIZE) : QIODEVICE_BUFFERSIZE; // Try to fill QIODevice buffer by single read readFromDevice = readData(d->buffer.reserve(bytesToBuffer), bytesToBuffer); deviceAtEof = (readFromDevice != bytesToBuffer); @@ -907,10 +933,8 @@ qint64 QIODevice::read(char *data, qint64 maxSize) debugBinaryString(data - readSoFar, readSoFar); #endif - if (madeBufferReadsOnly && d->buffer.isEmpty()) { - d->buffer.clear(); + if (madeBufferReadsOnly && d->isBufferEmpty()) readData(data, 0); - } return readSoFar; } @@ -992,7 +1016,9 @@ QByteArray QIODevice::readAll() qint64 readBytes = (d->isSequential() ? Q_INT64_C(0) : size()); if (readBytes == 0) { // Size is unknown, read incrementally. - qint64 readChunkSize = qMax(d->buffer.size(), QIODEVICE_BUFFERSIZE); + qint64 readChunkSize = qMax(QIODEVICE_BUFFERSIZE, + d->isSequential() ? (d->buffer.size() - d->transactionPos) + : d->buffer.size()); qint64 readResult; do { if (readBytes + readChunkSize >= MaxByteArraySize) { @@ -1077,21 +1103,35 @@ qint64 QIODevice::readLine(char *data, qint64 maxSize) --maxSize; const bool sequential = d->isSequential(); + const bool keepDataInBuffer = sequential && d->transactionStarted; qint64 readSoFar = 0; - if (!d->buffer.isEmpty()) { - readSoFar = d->buffer.readLine(data, maxSize); + if (keepDataInBuffer) { + if (d->transactionPos < d->buffer.size()) { + // Peek line from the specified position + const qint64 i = d->buffer.indexOf('\n', maxSize, d->transactionPos); + readSoFar = d->buffer.peek(data, i >= 0 ? (i - d->transactionPos + 1) : maxSize, + d->transactionPos); + d->transactionPos += readSoFar; + if (d->transactionPos == d->buffer.size()) + readData(data, 0); + } + } else if (!d->buffer.isEmpty()) { + // QRingBuffer::readLine() terminates the line with '\0' + readSoFar = d->buffer.readLine(data, maxSize + 1); if (d->buffer.isEmpty()) readData(data,0); if (!sequential) d->pos += readSoFar; + } + + if (readSoFar) { #if defined QIODEVICE_DEBUG printf("%p \tread from buffer: %lld bytes, last character read: %hhx\n", this, readSoFar, data[readSoFar - 1]); - if (readSoFar) - debugBinaryString(data, int(readSoFar)); + debugBinaryString(data, int(readSoFar)); #endif - if (readSoFar && data[readSoFar - 1] == '\n') { + if (data[readSoFar - 1] == '\n') { if (d->openMode & Text) { // QRingBuffer::readLine() isn't Text aware. if (readSoFar > 1 && data[readSoFar - 2] == '\r') { @@ -1107,7 +1147,11 @@ qint64 QIODevice::readLine(char *data, qint64 maxSize) if (d->pos != d->devicePos && !sequential && !seek(d->pos)) return qint64(-1); d->baseReadLineDataCalled = false; - qint64 readBytes = readLineData(data + readSoFar, maxSize - readSoFar); + // Force base implementation for transaction on sequential device + // as it stores the data in internal buffer automatically. + qint64 readBytes = keepDataInBuffer + ? QIODevice::readLineData(data + readSoFar, maxSize - readSoFar) + : readLineData(data + readSoFar, maxSize - readSoFar); #if defined QIODEVICE_DEBUG printf("%p \tread from readLineData: %lld bytes, readSoFar = %lld bytes\n", this, readBytes, readSoFar); @@ -1262,7 +1306,95 @@ qint64 QIODevice::readLineData(char *data, qint64 maxSize) */ bool QIODevice::canReadLine() const { - return d_func()->buffer.canReadLine(); + Q_D(const QIODevice); + return d->buffer.indexOf('\n', d->buffer.size(), + d->isSequential() ? d->transactionPos : Q_INT64_C(0)) >= 0; +} + +/*! + \since 5.7 + + Starts a new read transaction on the device. + + Defines a restorable point within the sequence of read operations. For + sequential devices, read data will be duplicated internally to allow + recovery in case of incomplete reads. For random-access devices, + this function saves the current position. Call commitTransaction() or + rollbackTransaction() to finish the transaction. + + \note Nesting transactions is not supported. + + \sa commitTransaction(), rollbackTransaction() +*/ +void QIODevice::startTransaction() +{ + Q_D(QIODevice); + if (d->transactionStarted) { + checkWarnMessage(this, "startTransaction", "Called while transaction already in progress"); + return; + } + d->transactionPos = d->pos; + d->transactionStarted = true; +} + +/*! + \since 5.7 + + Completes a read transaction. + + For sequential devices, all data recorded in the internal buffer during + the transaction will be discarded. + + \sa startTransaction(), rollbackTransaction() +*/ +void QIODevice::commitTransaction() +{ + Q_D(QIODevice); + if (!d->transactionStarted) { + checkWarnMessage(this, "commitTransaction", "Called while no transaction in progress"); + return; + } + if (d->isSequential()) + d->buffer.free(d->transactionPos); + d->transactionStarted = false; + d->transactionPos = 0; +} + +/*! + \since 5.7 + + Rolls back a read transaction. + + Restores the input stream to the point of the startTransaction() call. + This function is commonly used to rollback the transaction when an + incomplete read was detected prior to committing the transaction. + + \sa startTransaction(), commitTransaction() +*/ +void QIODevice::rollbackTransaction() +{ + Q_D(QIODevice); + if (!d->transactionStarted) { + checkWarnMessage(this, "rollbackTransaction", "Called while no transaction in progress"); + return; + } + if (!d->isSequential()) + d->seekBuffer(d->transactionPos); + d->transactionStarted = false; + d->transactionPos = 0; +} + +/*! + \since 5.7 + + Returns \c true if a transaction is in progress on the device, otherwise + \c false. + + \sa startTransaction() +*/ +bool QIODevice::isTransactionStarted() const +{ + return d_func()->transactionStarted; } /*! @@ -1386,12 +1518,19 @@ qint64 QIODevice::write(const char *data) If \a c was not previously read from the device, the behavior is undefined. + + \note This function is not available while a transaction is in progress. */ void QIODevice::ungetChar(char c) { Q_D(QIODevice); CHECK_READABLE(read, Q_VOID); + if (d->transactionStarted) { + checkWarnMessage(this, "ungetChar", "Called while transaction is in progress"); + return; + } + #if defined QIODEVICE_DEBUG printf("%p QIODevice::ungetChar(0x%hhx '%c')\n", this, c, isprint(c) ? c : '?'); #endif @@ -1426,13 +1565,26 @@ bool QIODevicePrivate::putCharHelper(char c) */ qint64 QIODevicePrivate::peek(char *data, qint64 maxSize) { - qint64 readBytes = q_func()->read(data, maxSize); - if (readBytes <= 0) + Q_Q(QIODevice); + + if (transactionStarted) { + const qint64 savedTransactionPos = transactionPos; + const qint64 savedPos = pos; + + qint64 readBytes = q->read(data, maxSize); + + // Restore initial position + if (isSequential()) + transactionPos = savedTransactionPos; + else + seekBuffer(savedPos); return readBytes; + } + + q->startTransaction(); + qint64 readBytes = q->read(data, maxSize); + q->rollbackTransaction(); - buffer.ungetBlock(data, readBytes); - if (!isSequential()) - pos -= readBytes; return readBytes; } @@ -1441,14 +1593,26 @@ qint64 QIODevicePrivate::peek(char *data, qint64 maxSize) */ QByteArray QIODevicePrivate::peek(qint64 maxSize) { - QByteArray result = q_func()->read(maxSize); + Q_Q(QIODevice); + + if (transactionStarted) { + const qint64 savedTransactionPos = transactionPos; + const qint64 savedPos = pos; + + QByteArray result = q->read(maxSize); - if (result.isEmpty()) + // Restore initial position + if (isSequential()) + transactionPos = savedTransactionPos; + else + seekBuffer(savedPos); return result; + } + + q->startTransaction(); + QByteArray result = q->read(maxSize); + q->rollbackTransaction(); - buffer.ungetBlock(result.constData(), result.size()); - if (!isSequential()) - pos -= result.size(); return result; } -- cgit v1.2.3