diff options
-rw-r--r-- | src/jsonbuffer.cpp | 214 | ||||
-rw-r--r-- | src/jsonbuffer_p.h | 20 | ||||
-rw-r--r-- | src/jsonclient.cpp | 29 | ||||
-rw-r--r-- | src/jsonclient.h | 3 | ||||
-rw-r--r-- | src/jsonpipe.cpp | 18 | ||||
-rw-r--r-- | src/jsonpipe.h | 1 | ||||
-rw-r--r-- | src/jsonserverclient.cpp | 26 | ||||
-rw-r--r-- | src/jsonserverclient.h | 3 | ||||
-rw-r--r-- | src/jsonstream.cpp | 210 | ||||
-rw-r--r-- | src/jsonstream.h | 23 | ||||
-rw-r--r-- | tests/auto/jsonstream/testClient/main.cpp | 8 | ||||
-rw-r--r-- | tests/auto/jsonstream/tst_jsonstream.cpp | 89 |
12 files changed, 537 insertions, 107 deletions
diff --git a/src/jsonbuffer.cpp b/src/jsonbuffer.cpp index fde637f..3fe5561 100644 --- a/src/jsonbuffer.cpp +++ b/src/jsonbuffer.cpp @@ -73,31 +73,65 @@ JsonBuffer::JsonBuffer(QObject *parent) , mParserDepth(0) , mParserOffset(0) , mParserStartOffset(-1) + , mEmittedReadyRead(false) + , mMessageAvailable(false) + , mMessageSize(0) + , mEnabled(true) { } /*! + \fn bool JsonBuffer::isEnabled() const + + Returns true if \l{readyReadMessage()} signal notifier is enabled; otherwise returns false. + + \sa setEnabled(), readyReadMessage() +*/ + +/*! + \fn void JsonBuffer::setEnabled(bool enable) + + If \a enable is true, \l{readyReadMessage()} signal notifier is enabled; + otherwise the notifier is disabled. + + The notifier is enabled by default, i.e. it emits the \l{readyReadMessage()} + signal whenever a new message is ready. + + The notifier should normally be disabled while user is reading existing messages. + + \sa isEnabled(), readyReadMessage() +*/ + +/*! + \fn int JsonBuffer::size() const + + Returns the size of the buffer in bytes. +*/ + +/*! Append the contents of a byte array \a data onto the buffer. - During the execution of this function, the objectReceived + During the execution of this function, the \l{readyReadMessage()} signal may be raised. */ void JsonBuffer::append(const QByteArray& data) { mBuffer.append(data.data(), data.size()); - processMessages(); + if (0 < size()) + processMessages(); } /*! Append the \a data pointer with length \a len onto the JsonBuffer. - During the execution of this function, objectReceived + During the execution of this function, the \l{readyReadMessage()} signal may be raised. */ void JsonBuffer::append(const char *data, int len) { mBuffer.append(data, len); - processMessages(); + if (0 < size()) + processMessages(); } /*! @@ -131,6 +165,7 @@ int JsonBuffer::copyFromFd(int fd) void JsonBuffer::clear() { mBuffer.clear(); + resetParser(); } /*! @@ -177,6 +212,8 @@ void JsonBuffer::resetParser() mParserDepth = 0; mParserOffset = 0; mParserStartOffset = -1; + mMessageAvailable = false; + mMessageSize = 0; } /*! @@ -185,10 +222,30 @@ void JsonBuffer::resetParser() void JsonBuffer::processMessages() { + // do not process anything if disabled or if control is still inside readyReadMessage() slot + if (mEnabled && !mEmittedReadyRead) { + mEmittedReadyRead = true; + if (messageAvailable()) { + emit readyReadMessage(); + } + mEmittedReadyRead = false; + } +} + +/*! + \internal +*/ +bool JsonBuffer::messageAvailable() +{ + if (mMessageAvailable) { + // already found - no need to check again + return true; + } + if (mFormat == FormatUndefined && mBuffer.size() >= 4) { - if (strncmp("bson", mBuffer.data(), 4) == 0) + if (strncmp("bson", mBuffer.constData(), 4) == 0) mFormat = FormatBSON; - else if (QJsonDocument::BinaryFormatTag == *((uint *) mBuffer.data())) + else if (QJsonDocument::BinaryFormatTag == *((uint *) mBuffer.constData())) mFormat = FormatQBJS; else if (mBuffer.at(0) == 0 && mBuffer.at(1) != 0 && @@ -211,12 +268,8 @@ void JsonBuffer::processMessages() for ( ; mParserOffset < mBuffer.size() ; mParserOffset++ ) { char c = mBuffer.at(mParserOffset); if (scanUtf(c)) { - QByteArray msg = mBuffer.mid(mParserStartOffset, mParserOffset - mParserStartOffset); - QJsonObject obj = QJsonDocument::fromJson(msg).object(); - if (!obj.isEmpty()) - emit objectReceived(obj); - mBuffer = mBuffer.mid(mParserOffset); - resetParser(); + mMessageAvailable = true; + return true; } } break; @@ -224,13 +277,8 @@ void JsonBuffer::processMessages() for ( ; 2 * mParserOffset < mBuffer.size() ; mParserOffset++ ) { int16_t c = qFromBigEndian(reinterpret_cast<const int16_t *>(mBuffer.constData())[mParserOffset]); if (scanUtf(c)) { - QByteArray msg = mBuffer.mid(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset)); - QString s = QTextCodec::codecForName("UTF-16BE")->toUnicode(msg); - QJsonObject obj = QJsonDocument::fromJson(s.toUtf8()).object(); - if (!obj.isEmpty()) - emit objectReceived(obj); - mBuffer = mBuffer.mid(mParserOffset*2); - resetParser(); + mMessageAvailable = true; + return true; } } break; @@ -238,42 +286,93 @@ void JsonBuffer::processMessages() for ( ; 2 * mParserOffset < mBuffer.size() ; mParserOffset++ ) { int16_t c = qFromLittleEndian(reinterpret_cast<const int16_t *>(mBuffer.constData())[mParserOffset]); if (scanUtf(c)) { - QByteArray msg = mBuffer.mid(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset)); - QString s = QTextCodec::codecForName("UTF-16LE")->toUnicode(msg); - QJsonObject obj = QJsonDocument::fromJson(s.toUtf8()).object(); - if (!obj.isEmpty()) - emit objectReceived(obj); - mBuffer = mBuffer.mid(mParserOffset*2); - resetParser(); + mMessageAvailable = true; + return true; } } break; case FormatBSON: - while (mBuffer.size() >= 8) { - qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.data())[1]); - if (mBuffer.size() < message_size + 4) - break; - QByteArray msg = mBuffer.mid(4, message_size); - QJsonObject obj = QJsonDocument::fromVariant(BsonObject(msg).toMap()).object(); - if (!obj.isEmpty()) - emit objectReceived(obj); - mBuffer = mBuffer.mid(message_size+4); + if (mBuffer.size() >= 8) { + qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.constData())[1]); + if (mBuffer.size() >= message_size + 4) { + mMessageSize = message_size; + mMessageAvailable = true; + } } break; case FormatQBJS: - while (mBuffer.size() >= 12) { + if (mBuffer.size() >= 12) { // ### TODO: Should use 'sizeof(Header)' - qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.data())[2]) + 8; - if (mBuffer.size() < message_size) - break; - QByteArray msg = mBuffer.left(message_size); - QJsonObject obj = QJsonDocument::fromBinaryData(msg).object(); - if (!obj.isEmpty()) - emit objectReceived(obj); - mBuffer = mBuffer.mid(message_size); + qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.constData())[2]) + 8; + if (mBuffer.size() >= message_size) { + mMessageSize = message_size; + mMessageAvailable = true; + } } break; } + return mMessageAvailable; +} + +/*! + \internal +*/ +QJsonObject JsonBuffer::readMessage() +{ + QJsonObject obj; + if (messageAvailable()) { + switch (mFormat) { + case FormatUndefined: + break; + case FormatUTF8: + if (mParserStartOffset >= 0) { + QByteArray msg = rawData(mParserStartOffset, mParserOffset - mParserStartOffset); + obj = QJsonDocument::fromJson(msg).object(); + // prepare for the next + mBuffer.remove(0, mParserOffset); + resetParser(); + } + break; + case FormatUTF16BE: + if (mParserStartOffset >= 0) { + QByteArray msg = rawData(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset)); + QString s = QTextCodec::codecForName("UTF-16BE")->toUnicode(msg); + obj = QJsonDocument::fromJson(s.toUtf8()).object(); + // prepare for the next + mBuffer.remove(0, mParserOffset*2); + resetParser(); + } + break; + case FormatUTF16LE: + if (mParserStartOffset >= 0) { + QByteArray msg = rawData(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset)); + QString s = QTextCodec::codecForName("UTF-16LE")->toUnicode(msg); + obj = QJsonDocument::fromJson(s.toUtf8()).object(); + // prepare for the next + mBuffer.remove(0, mParserOffset*2); + resetParser(); + } + break; + case FormatBSON: + if (mMessageSize > 0) { + QByteArray msg = rawData(4, mMessageSize); + obj = QJsonDocument::fromVariant(BsonObject(msg).toMap()).object(); + mBuffer.remove(0, mMessageSize+4); + mMessageSize = 0; + } + break; + case FormatQBJS: + if (mMessageSize > 0) { + QByteArray msg = rawData(0, mMessageSize); + obj = QJsonDocument::fromBinaryData(msg).object(); + mBuffer.remove(0, mMessageSize); + mMessageSize = 0; + } + break; + } + mMessageAvailable = false; + } + return obj; } /*! @@ -286,9 +385,30 @@ EncodingFormat JsonBuffer::format() const } /*! - \fn void JsonBuffer::objectReceived(const QJsonObject& object) - This signal is emitted when a new Qt Binary Json \a object has been received on the - stream. + \fn void JsonBuffer::readyReadMessage() + + This signal is emitted once every time new data is appended to the buffer + and a message is ready. \b readMessage() should be used to retrieve a message + and \b messageAvailable() to check for next available messages. + The client code may look like this: + + \code + ... + connect(jsonbuffer, SIGNAL(readyReadMessage()), this, SLOT(processMessages())); + ... + + void Foo::processMessages() + { + while (jsonbuffer->messageAvailable()) { + QJsonObject obj = jsonbuffer->readMessage(); + <process message> + } + } + \endcode + + \b readyReadMessage() is not emitted recursively; if you reenter the event loop + inside a slot connected to the \b readyReadMessage() signal, the signal will not + be reemitted. */ #include "moc_jsonbuffer_p.cpp" diff --git a/src/jsonbuffer_p.h b/src/jsonbuffer_p.h index 9bce2ce..8069234 100644 --- a/src/jsonbuffer_p.h +++ b/src/jsonbuffer_p.h @@ -62,13 +62,22 @@ public: EncodingFormat format() const; + bool messageAvailable(); + QJsonObject readMessage(); + + int size() const { return mBuffer.size(); } + + inline bool isEnabled() const { return mEnabled; } + inline void setEnabled(bool enable) { mEnabled = enable; } + signals: - void objectReceived(const QJsonObject& object); + void readyReadMessage(); private: void processMessages(); bool scanUtf(int c); void resetParser(); + QByteArray rawData(int _start, int _len) const; private: enum UTF8ParsingState { ParseNormal, ParseInString, ParseInBackslash }; @@ -79,8 +88,17 @@ private: int mParserDepth; int mParserOffset; int mParserStartOffset; + bool mEmittedReadyRead; + bool mMessageAvailable; + int mMessageSize; + bool mEnabled; }; +inline QByteArray JsonBuffer::rawData(int _start, int _len) const +{ + return QByteArray::fromRawData(mBuffer.constData() + _start, _len); +} + QT_END_NAMESPACE_JSONSTREAM #endif // _JSON_BUFFER_H diff --git a/src/jsonclient.cpp b/src/jsonclient.cpp index 7f1d1f4..b16fd96 100644 --- a/src/jsonclient.cpp +++ b/src/jsonclient.cpp @@ -136,8 +136,7 @@ bool JsonClient::connectTCP(const QString& hostname, int port) connect(&d->mStream, SIGNAL(receive(const QJsonObject&)), this, SIGNAL(receive(const QJsonObject&))); - d->mStream.send(d->mRegistrationMessage); - return true; + return d->mStream.send(d->mRegistrationMessage); } return false; @@ -162,11 +161,9 @@ bool JsonClient::connectLocal(const QString& socketname) connect(socket, SIGNAL(disconnected()), SLOT(handleSocketDisconnected())); Q_D(JsonClient); d->mStream.setDevice(socket); - connect(&d->mStream, SIGNAL(messageReceived(const QJsonObject&)), - this, SIGNAL(messageReceived(const QJsonObject&))); + connect(&d->mStream, SIGNAL(readyReadMessage()), this, SLOT(processMessages())); // qDebug() << "Sending local socket registration message" << mRegistrationMessage; - d->mStream.send(d->mRegistrationMessage); - return true; + return d->mStream.send(d->mRegistrationMessage); } delete socket; return false; @@ -174,16 +171,19 @@ bool JsonClient::connectLocal(const QString& socketname) /*! Send a \a message over the socket. + Returns true if the entire message was send/buffered or false otherwise. */ -void JsonClient::send(const QJsonObject &message) +bool JsonClient::send(const QJsonObject &message) { + bool ret = false; Q_D(JsonClient); if (d->mStream.isOpen()) { - d->mStream << message; + ret = d->mStream.send(message); } else { qCritical() << Q_FUNC_INFO << "stream socket is not available"; } + return ret; } /*! @@ -214,6 +214,19 @@ void JsonClient::handleSocketDisconnected() /*! + \internal +*/ +void JsonClient::processMessages() +{ + Q_D(JsonClient); + while (d->mStream.messageAvailable()) { + QJsonObject obj = d->mStream.readMessage(); + if (!obj.isEmpty()) + emit messageReceived(obj); + } +} + +/*! \fn void JsonClient::messageReceived(const QJsonObject &message) This signal is emitted when a \a message is received from the server. */ diff --git a/src/jsonclient.h b/src/jsonclient.h index 70e8e88..2fe683e 100644 --- a/src/jsonclient.h +++ b/src/jsonclient.h @@ -63,7 +63,7 @@ public: bool connectTCP(const QString& hostname, int port); bool connectLocal(const QString& socketname); - void send(const QJsonObject&); + bool send(const QJsonObject&); void setFormat( EncodingFormat format ); // Do we really need a "connect with delay or error" facility? @@ -75,6 +75,7 @@ signals: private slots: void handleSocketDisconnected(); + void processMessages(); private: Q_DECLARE_PRIVATE(JsonClient) diff --git a/src/jsonpipe.cpp b/src/jsonpipe.cpp index 981a25e..18c8f46 100644 --- a/src/jsonpipe.cpp +++ b/src/jsonpipe.cpp @@ -107,8 +107,7 @@ JsonPipe::JsonPipe(QObject *parent) { Q_D(JsonPipe); d->mInBuffer = new JsonBuffer(this); - connect(d->mInBuffer, SIGNAL(objectReceived(const QJsonObject&)), - SLOT(objectReceived(const QJsonObject&))); + connect(d->mInBuffer, SIGNAL(readyReadMessage()), SLOT(processMessages())); } /*! @@ -278,6 +277,21 @@ void JsonPipe::objectReceived(const QJsonObject& object) } /*! + \internal +*/ +void JsonPipe::processMessages() +{ + Q_D(JsonPipe); + d->mInBuffer->setEnabled(false); + while (d->mInBuffer->messageAvailable()) { + QJsonObject obj = d->mInBuffer->readMessage(); + if (!obj.isEmpty()) + objectReceived(obj); + } + d->mInBuffer->setEnabled(true); +} + +/*! Return the current JsonPipe::EncodingFormat. */ diff --git a/src/jsonpipe.h b/src/jsonpipe.h index 63bf387..eef9d65 100644 --- a/src/jsonpipe.h +++ b/src/jsonpipe.h @@ -80,6 +80,7 @@ signals: void error(PipeError); protected slots: + void processMessages(); void objectReceived(const QJsonObject& object); void inReady(int fd); void outReady(int fd); diff --git a/src/jsonserverclient.cpp b/src/jsonserverclient.cpp index 73fff00..5b53e74 100644 --- a/src/jsonserverclient.cpp +++ b/src/jsonserverclient.cpp @@ -121,8 +121,7 @@ void JsonServerClient::setSocket(QLocalSocket *socket) d->m_stream = new JsonStream(socket); d->m_stream->setParent(this); connect(socket, SIGNAL(disconnected()), this, SLOT(handleDisconnect())); - connect(d->m_stream, SIGNAL(messageReceived(const QJsonObject&)), - this, SLOT(received(const QJsonObject&))); + connect(d->m_stream, SIGNAL(readyReadMessage()), this, SLOT(processMessages())); } } @@ -172,8 +171,7 @@ void JsonServerClient::stop() { // qDebug() << Q_FUNC_INFO; Q_D(JsonServerClient); - disconnect(d->m_stream, SIGNAL(messageReceived(const QJsonObject&)), - this, SLOT(received(const QJsonObject&))); + disconnect(d->m_stream, SIGNAL(readyReadMessage()), this, SLOT(processMessages())); d->m_socket->disconnectFromServer(); // qDebug() << Q_FUNC_INFO << "done"; } @@ -209,15 +207,18 @@ void JsonServerClient::received(const QJsonObject& message) /*! Send a \a message to the client. + Returns true if the entire message was send/buffered or false otherwise. */ -void JsonServerClient::send(const QJsonObject &message) +bool JsonServerClient::send(const QJsonObject &message) { + bool ret = false; Q_D(JsonServerClient); if (d->m_stream) { // qDebug() << "Sending message" << message; - d->m_stream->send(message); + ret = d->m_stream->send(message); } + return ret; } void JsonServerClient::handleDisconnect() @@ -233,6 +234,19 @@ void JsonServerClient::handleDisconnect() } /*! + \internal + */ +void JsonServerClient::processMessages() +{ + Q_D(JsonServerClient); + while (d->m_stream->messageAvailable()) { + QJsonObject obj = d->m_stream->readMessage(); + if (!obj.isEmpty()) + received(obj); + } +} + +/*! \fn JsonServerClient::disconnected(const QString& identifier) This signal is emitted when the client has been disconnected. The \a identifier property is included for convenience. diff --git a/src/jsonserverclient.h b/src/jsonserverclient.h index 00b7394..5c97dfb 100644 --- a/src/jsonserverclient.h +++ b/src/jsonserverclient.h @@ -64,7 +64,7 @@ public: void start(); void stop(); - void send(const QJsonObject &message); + bool send(const QJsonObject &message); void setAuthority(JsonAuthority *authority); @@ -83,6 +83,7 @@ signals: private slots: void received(const QJsonObject& message); void handleDisconnect(); + void processMessages(); private: Q_DECLARE_PRIVATE(JsonServerClient) diff --git a/src/jsonstream.cpp b/src/jsonstream.cpp index 348eb2d..feacffc 100644 --- a/src/jsonstream.cpp +++ b/src/jsonstream.cpp @@ -74,11 +74,15 @@ class JsonStreamPrivate public: JsonStreamPrivate() : mDevice(0) - , mFormat(FormatUndefined) {} + , mFormat(FormatUndefined) + , mReadBufferSize(0) + , mWriteBufferSize(0) {} QIODevice *mDevice; JsonBuffer *mBuffer; EncodingFormat mFormat; + qint64 mReadBufferSize; + qint64 mWriteBufferSize; }; /****************************************************************************/ @@ -102,8 +106,7 @@ JsonStream::JsonStream(QIODevice *device) { Q_D(JsonStream); d->mBuffer = new JsonBuffer(this); - connect(d->mBuffer, SIGNAL(objectReceived(const QJsonObject&)), - SLOT(objectReceived(const QJsonObject&))); + connect(d->mBuffer, SIGNAL(readyReadMessage()), SLOT(messageReceived())); setDevice(device); } @@ -160,21 +163,30 @@ void JsonStream::setDevice( QIODevice *device ) Q_D(JsonStream); if (d->mDevice) { disconnect(d->mDevice, SIGNAL(readyRead()), this, SLOT(dataReadyOnSocket())); + disconnect(d->mDevice, SIGNAL(bytesWritten(qint64)), this, SIGNAL(bytesWritten(qint64))); disconnect(d->mDevice, SIGNAL(aboutToClose()), this, SIGNAL(aboutToClose())); } d->mDevice = device; if (device) { connect(device, SIGNAL(readyRead()), this, SLOT(dataReadyOnSocket())); + connect(device, SIGNAL(bytesWritten(qint64)), this, SIGNAL(bytesWritten(qint64))); connect(device, SIGNAL(aboutToClose()), this, SIGNAL(aboutToClose())); } } /*! - Send a JsonObject \a object over the stream + Send a JsonObject \a object over the stream. + Returns \b true if the entire message was sent or buffered or \b false otherwise. + + JsonStream does not have a write buffer of its own. Rather, it uses the + write buffer of the \l{device()}. It will not cause that buffer to grow + larger than \l{writeBufferSize()} at any time. If this would occur, this + method will return \b false. */ -void JsonStream::send(const QJsonObject& object) +bool JsonStream::send(const QJsonObject& object) { + bool bRet(false); QJsonDocument document(object); Q_D(JsonStream); @@ -183,40 +195,55 @@ void JsonStream::send(const QJsonObject& object) d->mFormat = FormatQBJS; // Deliberate fall through case FormatQBJS: - sendInternal( document.toBinaryData() ); + bRet = sendInternal( document.toBinaryData() ); break; case FormatUTF8: - sendInternal( document.toJson() ); + bRet = sendInternal( document.toJson() ); break; case FormatUTF16BE: - sendInternal( QTextCodec::codecForName("UTF-16BE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM + bRet = sendInternal( QTextCodec::codecForName("UTF-16BE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM break; case FormatUTF16LE: - sendInternal( QTextCodec::codecForName("UTF-16LE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM + bRet = sendInternal( QTextCodec::codecForName("UTF-16LE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM break; case FormatBSON: { BsonObject bson(document.toVariant().toMap()); - sendInternal(bsonToByteArray(bson)); + bRet = sendInternal(bsonToByteArray(bson)); break; } } + return bRet; } /*! \internal Send raw QByteArray \a byteArray data over the socket. */ - -void JsonStream::sendInternal(const QByteArray& byteArray) +bool JsonStream::sendInternal(const QByteArray& byteArray) { Q_D(JsonStream); - if (!d->mDevice) { + if (!isOpen()) { qWarning() << Q_FUNC_INFO << "No device in JsonStream"; - return; + return false; + } + + int nBytes = 0; + if (d->mWriteBufferSize <= 0 || d->mDevice->bytesToWrite() + byteArray.size() <= d->mWriteBufferSize) { + for (int nSz = byteArray.size(); nSz > 0; ) { + int nWrite = d->mDevice->write( byteArray.constData() + nBytes, nSz); + if (nWrite <= 0) { + // write error + qWarning() << Q_FUNC_INFO << __LINE__ + << QString::fromLatin1("Write error. QIODevice::write() returned %1 (%2).") + .arg(nWrite).arg(d->mDevice->errorString()); + break; + } + nBytes += nWrite; + nSz -= nWrite; + } } - int nBytes = d->mDevice->write( byteArray.data(), byteArray.size() ); if (QLocalSocket *socket = qobject_cast<QLocalSocket*>(d->mDevice)) socket->flush(); else if (QAbstractSocket *socket = qobject_cast<QAbstractSocket*>(d->mDevice)) @@ -224,22 +251,24 @@ void JsonStream::sendInternal(const QByteArray& byteArray) else qWarning() << Q_FUNC_INFO << "Unknown socket type:" << d->mDevice->metaObject()->className(); - if (nBytes != byteArray.size()) + bool bFail; + if ((bFail = (nBytes != byteArray.size()))) qCritical() << Q_FUNC_INFO << __LINE__ << QString::fromLatin1("Expected to write %1 bytes, actually %2.").arg(byteArray.size()).arg(nBytes); + return !bFail; } /*! \internal - Handle a received Qt Binary Json \a object and emit the correct signals + Handle a received readyReadMessage signal and emit the correct signals */ -void JsonStream::objectReceived(const QJsonObject& object) +void JsonStream::messageReceived() { Q_D(JsonStream); if (d->mFormat == FormatUndefined) d->mFormat = d->mBuffer->format(); - emit messageReceived(object); + emit readyReadMessage(); } /*! @@ -250,6 +279,26 @@ void JsonStream::objectReceived(const QJsonObject& object) void JsonStream::dataReadyOnSocket() { Q_D(JsonStream); + if (d->mReadBufferSize > 0) { + while (d->mDevice->bytesAvailable() + d->mBuffer->size() > d->mReadBufferSize) { + // can't fit all data into a read buffer - read a part that fits + d->mBuffer->append(d->mDevice->read(d->mReadBufferSize - d->mBuffer->size())); + + // if the read buffer is full then emit readBufferOverflow and allow user to increase the buffer size + if (d->mBuffer->size() == d->mReadBufferSize) { + emit readBufferOverflow(d->mDevice->bytesAvailable() + d->mBuffer->size()); + if (d->mBuffer->size() == d->mReadBufferSize) { + // still can't read anything - close connection + d->mDevice->close(); + return; + } + else if (0 == d->mReadBufferSize) { + // user removed buffer size limitation + break; + } + } + } + } d->mBuffer->append( d->mDevice->readAll()); } @@ -273,23 +322,118 @@ void JsonStream::setFormat( EncodingFormat format ) d->mFormat = format; } +/*! + Returns a maximum size of the inbound message buffer. + */ +qint64 JsonStream::readBufferSize() const +{ + Q_D(const JsonStream); + return d->mReadBufferSize; +} /*! - \relates JsonStream + Sets a maximum size of the inbound message buffer to \a sz thus capping a size + of an inbound message. + */ +void JsonStream::setReadBufferSize(qint64 sz) +{ + if (sz >= 0) { + Q_D(JsonStream); + d->mReadBufferSize = sz; + } +} - Sends the \a map via the \a stream. -*/ -JsonStream& operator<<( JsonStream& stream, const QJsonObject& map ) +/*! + Returns a maximum size of the outbound message buffer. A value of 0 + means the buffer size is unlimited. + */ +qint64 JsonStream::writeBufferSize() const { - stream.send(map); - return stream; + Q_D(const JsonStream); + return d->mWriteBufferSize; } +/*! + Sets a maximum size of the outbound message buffer to \a sz thus capping a size + of an outbound message. A value of 0 means the buffer size is unlimited. + */ +void JsonStream::setWriteBufferSize(qint64 sz) +{ + if (sz >= 0) { + Q_D(JsonStream); + d->mWriteBufferSize = sz; + } +} /*! - \fn void JsonStream::messageReceived(const QJsonObject& message) - This signal is emitted when a new \a message has been received on the - stream. + Returns a number of bytes currently in the write buffer. Effectively, + if \l{writeBufferSize()} is not unlimited, the largest message you can + send at any one time is (\l{writeBufferSize()} - \b bytesToWrite()) bytes. + */ +qint64 JsonStream::bytesToWrite() const +{ + Q_D(const JsonStream); + return (d->mDevice ? d->mDevice->bytesToWrite() : 0); +} + +/*! + Returns a JSON object that has been received. If no message is + available, an empty JSON object is returned. + */ +QJsonObject JsonStream::readMessage() +{ + Q_D(JsonStream); + return d->mBuffer->readMessage(); +} + +/*! + Returns \b true if a message is available to be read via \l{readMessage()} + or \b false otherwise. + */ +bool JsonStream::messageAvailable() +{ + Q_D(JsonStream); + return d->mBuffer->messageAvailable(); +} + +/*! \fn JsonStream::bytesWritten(qint64 bytes) + + This signal is emitted every time a payload of data has been + written to the device. The \a bytes argument is set to the number + of bytes that were written in this payload. + + If a previous call to \l{send()} returned \b false, you should re-send + the message when this signal is emitted, as the write buffer may have been + emptied enough to hold the new message. +*/ + +/*! + \fn void JsonStream::readyReadMessage() + + This signal is emitted once every time new data arrives on the \l{device()} + and a message is ready. \l{readMessage()} should be used to retrieve a message + and \l{messageAvailable()} to check for next available messages. + The client code may look like this: + + \code + ... + connect(jsonstream, SIGNAL(readyReadMessage()), this, SLOT(processMessages())); + ... + + void Foo::processMessages() + { + while (jsonstream->messageAvailable()) { + QJsonObject obj = jsonstream->readMessage(); + <process message> + } + } + \endcode + + \b readyReadMessage() is not emitted recursively; if you reenter the event loop + inside a slot connected to the \b readyReadMessage() signal, the signal will not + be reemitted. + + \sa readMessage(), messageAvailable() */ /*! @@ -297,6 +441,16 @@ JsonStream& operator<<( JsonStream& stream, const QJsonObject& map ) This signal is emitted when the underlying \c QIODevice is about to close. */ +/*! \fn JsonStream::readBufferOverflow(qint64 bytes) + + This signal is emitted when the read buffer is full of data that has been read + from the \l{device()}, \a bytes additional bytes are available on the device, + but the message is not complete. The \l{readBufferSize()} mayb e increased + to a sufficient size in a slot connected to this signal, in which case more + data will be read into the read buffer. If the buffer size is not increased, + the connection is closed. + */ + #include "moc_jsonstream.cpp" QT_END_NAMESPACE_JSONSTREAM diff --git a/src/jsonstream.h b/src/jsonstream.h index 3acb204..693dad4 100644 --- a/src/jsonstream.h +++ b/src/jsonstream.h @@ -64,29 +64,40 @@ public: QIODevice *device() const; void setDevice( QIODevice *device ); - void send(const QJsonObject& message); + bool send(const QJsonObject& message); EncodingFormat format() const; void setFormat(EncodingFormat format); + qint64 readBufferSize() const; + void setReadBufferSize(qint64); + + qint64 writeBufferSize() const; + void setWriteBufferSize(qint64 sz); + + qint64 bytesToWrite() const; + + bool messageAvailable(); + QJsonObject readMessage(); + signals: - void messageReceived(const QJsonObject& message); + void bytesWritten(qint64); + void readyReadMessage(); void aboutToClose(); + void readBufferOverflow(qint64); protected slots: void dataReadyOnSocket(); - void objectReceived(const QJsonObject& object); + void messageReceived(); protected: - void sendInternal(const QByteArray& byteArray); + bool sendInternal(const QByteArray& byteArray); private: Q_DECLARE_PRIVATE(JsonStream) QScopedPointer<JsonStreamPrivate> d_ptr; }; -JsonStream& operator<<( JsonStream&, const QJsonObject& ); - QT_END_NAMESPACE_JSONSTREAM #endif // _JSON_STREAM_H diff --git a/tests/auto/jsonstream/testClient/main.cpp b/tests/auto/jsonstream/testClient/main.cpp index 79e9057..e23e301 100644 --- a/tests/auto/jsonstream/testClient/main.cpp +++ b/tests/auto/jsonstream/testClient/main.cpp @@ -127,6 +127,14 @@ void Container::sendSchemaTestMessage() void Container::received(const QJsonObject& message) { + // test large string size + if (message.contains("large") || message.contains("large_size")) { + if (message.value("large").toString().size() != message.value("large_size").toDouble()) { + qWarning() << "Large string size mismatch" << message.value("large").toString().size() << "!=" << message.value("large_size").toDouble(); + exit(3); + } + } + QString command = message.value("command").toString(); if (!command.isEmpty()) { qDebug() << "Received command" << command; diff --git a/tests/auto/jsonstream/tst_jsonstream.cpp b/tests/auto/jsonstream/tst_jsonstream.cpp index 14f3b08..8bc12d6 100644 --- a/tests/auto/jsonstream/tst_jsonstream.cpp +++ b/tests/auto/jsonstream/tst_jsonstream.cpp @@ -142,6 +142,7 @@ public: void waitForFinished() { if (process->state() == QProcess::Running) QVERIFY(process->waitForFinished(5000)); + QVERIFY(process->exitCode() == 0); delete process; process = 0; } @@ -173,7 +174,7 @@ class BasicServer : public QObject { Q_OBJECT public: - BasicServer(const QString& socketname) : socket(0), stream(0) { + BasicServer(const QString& socketname, qint64 _sz = 0) : socket(0), stream(0), readBufferSize(_sz) { QLocalServer::removeServer(socketname); server = new QLocalServer(this); connect(server, SIGNAL(newConnection()), SLOT(handleConnection())); @@ -186,9 +187,8 @@ public: server = NULL; } - void send(const QJsonObject& message) { - QVERIFY(stream); - stream->send(message); + bool send(const QJsonObject& message) { + return stream ? stream->send(message) : false; } void waitDisconnect(int timeout=5000) { @@ -207,15 +207,26 @@ public: return stream->format(); } + JsonStream *jsonStream() const { return stream; } private slots: void handleConnection() { socket = server->nextPendingConnection(); QVERIFY(socket); stream = new JsonStream(socket); stream->setParent(socket); + if (readBufferSize > 0) + stream->setReadBufferSize(readBufferSize); connect(socket, SIGNAL(disconnected()), SLOT(handleDisconnection())); - connect(stream, SIGNAL(messageReceived(const QJsonObject&)), - SIGNAL(messageReceived(const QJsonObject&))); + connect(stream, SIGNAL(readyReadMessage()), SLOT(processMessages())); + connect(stream, SIGNAL(readBufferOverflow(qint64)), SLOT(handleReadBufferOverflow(qint64))); + } + + void processMessages() { + while (stream->messageAvailable()) { + QJsonObject obj = stream->readMessage(); + if (!obj.isEmpty()) + emit messageReceived(obj); + } } void handleDisconnection() { @@ -225,13 +236,21 @@ private slots: stream = NULL; } + void handleReadBufferOverflow(qint64 sz) { + QVERIFY(readBufferSize > 0 && sz > readBufferSize); + stream->setReadBufferSize(sz); + emit readBufferOverflow(sz); + } + signals: void messageReceived(const QJsonObject&); + void readBufferOverflow(qint64); private: QLocalServer *server; QLocalSocket *socket; JsonStream *stream; + qint64 readBufferSize; }; /****************************/ @@ -253,6 +272,7 @@ private slots: void pipeTest(); void pipeFormatTest(); void pipeWaitTest(); + void bufferSizeTest(); }; void tst_JsonStream::initTestCase() @@ -417,12 +437,67 @@ void tst_JsonStream::formatTest() QVERIFY(object.value("item2").toString() == "This is item 2"); msg.insert("command", QLatin1String("exit")); - server.send(msg); + QVERIFY(server.send(msg)); server.waitDisconnect(); child.waitForFinished(); } } +void tst_JsonStream::bufferSizeTest() +{ + QString socketname = "/tmp/tst_socket"; + + BasicServer server(socketname, 100); + QSignalSpy spy(&server, SIGNAL(messageReceived(const QJsonObject&))); + QSignalSpy spy1(&server, SIGNAL(readBufferOverflow(qint64))); + QTime stopWatch; + + Child child("testClient/testClient", + QStringList() << "-socket" << socketname); + + stopWatch.start(); + forever { + QTestEventLoop::instance().enterLoop(1); + if (stopWatch.elapsed() >= 5000) + QFAIL("Timed out"); + if (spy.count()) + break; + } + QVERIFY(spy1.count() == 1); // overflow happend only once + + QJsonObject msg = qvariant_cast<QJsonObject>(spy.last().at(0)); + QVERIFY(msg.value("text").isString() && msg.value("text").toString() == QLatin1String("Standard text")); + QVERIFY(msg.value("int").isDouble() && msg.value("int").toDouble() == 100); + QVERIFY(msg.value("float").isDouble() && msg.value("float").toDouble() == 100.0); + QVERIFY(msg.value("true").isBool() && msg.value("true").toBool() == true); + QVERIFY(msg.value("false").isBool() && msg.value("false").toBool() == false); + QVERIFY(msg.value("array").isArray()); + QJsonArray array = msg.value("array").toArray(); + QVERIFY(array.size() == 3); + QVERIFY(array.at(0).toString() == "one"); + QVERIFY(array.at(1).toString() == "two"); + QVERIFY(array.at(2).toString() == "three"); + QVERIFY(msg.value("object").isObject()); + QJsonObject object = msg.value("object").toObject(); + QVERIFY(object.value("item1").toString() == "This is item 1"); + QVERIFY(object.value("item2").toString() == "This is item 2"); + + msg.insert("command", QLatin1String("exit")); + + server.jsonStream()->setWriteBufferSize(100); + QVERIFY(!server.send(msg)); + + QString strLarge(500000, '*'); + msg.insert("large", strLarge); + msg.insert("large_size", strLarge.size()); + server.jsonStream()->setWriteBufferSize(0); + QVERIFY(server.send(msg)); + + + server.waitDisconnect(); + child.waitForFinished(); +} + void tst_JsonStream::schemaTest() { QString strSchemasDir(QDir::currentPath() + "/" + "schemas"); |