summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexei Rousskikh <ext-alexei.rousskikh@nokia.com>2012-03-19 12:27:04 -0400
committerChris Craig <ext-chris.craig@nokia.com>2012-03-23 19:51:15 +0100
commitcc4e1a955a9566b1a2f66d47f0ca05a0926dc98e (patch)
treeeb78be8e33c812a46a26687516b33c2bb57c075e
parent76131f514514b85baeb47a7e0bf133ad3092c565 (diff)
JsonStream & JsonBuffer API redesign
The JsonStream API has been redesigned to support more control over the size of read and write buffers. In addition, the API for notification and reading of new messages has been changed to a more QIODevice-like flavor (readyReadMessage(), messageAvailable(), and readMessage()). Change-Id: I90104713c0cf785db86c2ff716f91c0bd8bb22a0 Reviewed-by: Alexei Rousskikh <ext-alexei.rousskikh@nokia.com> Reviewed-by: Chris Craig <ext-chris.craig@nokia.com>
-rw-r--r--src/jsonbuffer.cpp214
-rw-r--r--src/jsonbuffer_p.h20
-rw-r--r--src/jsonclient.cpp29
-rw-r--r--src/jsonclient.h3
-rw-r--r--src/jsonpipe.cpp18
-rw-r--r--src/jsonpipe.h1
-rw-r--r--src/jsonserverclient.cpp26
-rw-r--r--src/jsonserverclient.h3
-rw-r--r--src/jsonstream.cpp210
-rw-r--r--src/jsonstream.h23
-rw-r--r--tests/auto/jsonstream/testClient/main.cpp8
-rw-r--r--tests/auto/jsonstream/tst_jsonstream.cpp89
12 files changed, 537 insertions, 107 deletions
diff --git a/src/jsonbuffer.cpp b/src/jsonbuffer.cpp
index fde637f..3fe5561 100644
--- a/src/jsonbuffer.cpp
+++ b/src/jsonbuffer.cpp
@@ -73,31 +73,65 @@ JsonBuffer::JsonBuffer(QObject *parent)
, mParserDepth(0)
, mParserOffset(0)
, mParserStartOffset(-1)
+ , mEmittedReadyRead(false)
+ , mMessageAvailable(false)
+ , mMessageSize(0)
+ , mEnabled(true)
{
}
/*!
+ \fn bool JsonBuffer::isEnabled() const
+
+ Returns true if \l{readyReadMessage()} signal notifier is enabled; otherwise returns false.
+
+ \sa setEnabled(), readyReadMessage()
+*/
+
+/*!
+ \fn void JsonBuffer::setEnabled(bool enable)
+
+ If \a enable is true, \l{readyReadMessage()} signal notifier is enabled;
+ otherwise the notifier is disabled.
+
+ The notifier is enabled by default, i.e. it emits the \l{readyReadMessage()}
+ signal whenever a new message is ready.
+
+ The notifier should normally be disabled while user is reading existing messages.
+
+ \sa isEnabled(), readyReadMessage()
+*/
+
+/*!
+ \fn int JsonBuffer::size() const
+
+ Returns the size of the buffer in bytes.
+*/
+
+/*!
Append the contents of a byte array \a data onto the buffer.
- During the execution of this function, the objectReceived
+ During the execution of this function, the \l{readyReadMessage()}
signal may be raised.
*/
void JsonBuffer::append(const QByteArray& data)
{
mBuffer.append(data.data(), data.size());
- processMessages();
+ if (0 < size())
+ processMessages();
}
/*!
Append the \a data pointer with length \a len onto the JsonBuffer.
- During the execution of this function, objectReceived
+ During the execution of this function, the \l{readyReadMessage()}
signal may be raised.
*/
void JsonBuffer::append(const char *data, int len)
{
mBuffer.append(data, len);
- processMessages();
+ if (0 < size())
+ processMessages();
}
/*!
@@ -131,6 +165,7 @@ int JsonBuffer::copyFromFd(int fd)
void JsonBuffer::clear()
{
mBuffer.clear();
+ resetParser();
}
/*!
@@ -177,6 +212,8 @@ void JsonBuffer::resetParser()
mParserDepth = 0;
mParserOffset = 0;
mParserStartOffset = -1;
+ mMessageAvailable = false;
+ mMessageSize = 0;
}
/*!
@@ -185,10 +222,30 @@ void JsonBuffer::resetParser()
void JsonBuffer::processMessages()
{
+ // do not process anything if disabled or if control is still inside readyReadMessage() slot
+ if (mEnabled && !mEmittedReadyRead) {
+ mEmittedReadyRead = true;
+ if (messageAvailable()) {
+ emit readyReadMessage();
+ }
+ mEmittedReadyRead = false;
+ }
+}
+
+/*!
+ \internal
+*/
+bool JsonBuffer::messageAvailable()
+{
+ if (mMessageAvailable) {
+ // already found - no need to check again
+ return true;
+ }
+
if (mFormat == FormatUndefined && mBuffer.size() >= 4) {
- if (strncmp("bson", mBuffer.data(), 4) == 0)
+ if (strncmp("bson", mBuffer.constData(), 4) == 0)
mFormat = FormatBSON;
- else if (QJsonDocument::BinaryFormatTag == *((uint *) mBuffer.data()))
+ else if (QJsonDocument::BinaryFormatTag == *((uint *) mBuffer.constData()))
mFormat = FormatQBJS;
else if (mBuffer.at(0) == 0 &&
mBuffer.at(1) != 0 &&
@@ -211,12 +268,8 @@ void JsonBuffer::processMessages()
for ( ; mParserOffset < mBuffer.size() ; mParserOffset++ ) {
char c = mBuffer.at(mParserOffset);
if (scanUtf(c)) {
- QByteArray msg = mBuffer.mid(mParserStartOffset, mParserOffset - mParserStartOffset);
- QJsonObject obj = QJsonDocument::fromJson(msg).object();
- if (!obj.isEmpty())
- emit objectReceived(obj);
- mBuffer = mBuffer.mid(mParserOffset);
- resetParser();
+ mMessageAvailable = true;
+ return true;
}
}
break;
@@ -224,13 +277,8 @@ void JsonBuffer::processMessages()
for ( ; 2 * mParserOffset < mBuffer.size() ; mParserOffset++ ) {
int16_t c = qFromBigEndian(reinterpret_cast<const int16_t *>(mBuffer.constData())[mParserOffset]);
if (scanUtf(c)) {
- QByteArray msg = mBuffer.mid(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset));
- QString s = QTextCodec::codecForName("UTF-16BE")->toUnicode(msg);
- QJsonObject obj = QJsonDocument::fromJson(s.toUtf8()).object();
- if (!obj.isEmpty())
- emit objectReceived(obj);
- mBuffer = mBuffer.mid(mParserOffset*2);
- resetParser();
+ mMessageAvailable = true;
+ return true;
}
}
break;
@@ -238,42 +286,93 @@ void JsonBuffer::processMessages()
for ( ; 2 * mParserOffset < mBuffer.size() ; mParserOffset++ ) {
int16_t c = qFromLittleEndian(reinterpret_cast<const int16_t *>(mBuffer.constData())[mParserOffset]);
if (scanUtf(c)) {
- QByteArray msg = mBuffer.mid(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset));
- QString s = QTextCodec::codecForName("UTF-16LE")->toUnicode(msg);
- QJsonObject obj = QJsonDocument::fromJson(s.toUtf8()).object();
- if (!obj.isEmpty())
- emit objectReceived(obj);
- mBuffer = mBuffer.mid(mParserOffset*2);
- resetParser();
+ mMessageAvailable = true;
+ return true;
}
}
break;
case FormatBSON:
- while (mBuffer.size() >= 8) {
- qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.data())[1]);
- if (mBuffer.size() < message_size + 4)
- break;
- QByteArray msg = mBuffer.mid(4, message_size);
- QJsonObject obj = QJsonDocument::fromVariant(BsonObject(msg).toMap()).object();
- if (!obj.isEmpty())
- emit objectReceived(obj);
- mBuffer = mBuffer.mid(message_size+4);
+ if (mBuffer.size() >= 8) {
+ qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.constData())[1]);
+ if (mBuffer.size() >= message_size + 4) {
+ mMessageSize = message_size;
+ mMessageAvailable = true;
+ }
}
break;
case FormatQBJS:
- while (mBuffer.size() >= 12) {
+ if (mBuffer.size() >= 12) {
// ### TODO: Should use 'sizeof(Header)'
- qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.data())[2]) + 8;
- if (mBuffer.size() < message_size)
- break;
- QByteArray msg = mBuffer.left(message_size);
- QJsonObject obj = QJsonDocument::fromBinaryData(msg).object();
- if (!obj.isEmpty())
- emit objectReceived(obj);
- mBuffer = mBuffer.mid(message_size);
+ qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.constData())[2]) + 8;
+ if (mBuffer.size() >= message_size) {
+ mMessageSize = message_size;
+ mMessageAvailable = true;
+ }
}
break;
}
+ return mMessageAvailable;
+}
+
+/*!
+ \internal
+*/
+QJsonObject JsonBuffer::readMessage()
+{
+ QJsonObject obj;
+ if (messageAvailable()) {
+ switch (mFormat) {
+ case FormatUndefined:
+ break;
+ case FormatUTF8:
+ if (mParserStartOffset >= 0) {
+ QByteArray msg = rawData(mParserStartOffset, mParserOffset - mParserStartOffset);
+ obj = QJsonDocument::fromJson(msg).object();
+ // prepare for the next
+ mBuffer.remove(0, mParserOffset);
+ resetParser();
+ }
+ break;
+ case FormatUTF16BE:
+ if (mParserStartOffset >= 0) {
+ QByteArray msg = rawData(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset));
+ QString s = QTextCodec::codecForName("UTF-16BE")->toUnicode(msg);
+ obj = QJsonDocument::fromJson(s.toUtf8()).object();
+ // prepare for the next
+ mBuffer.remove(0, mParserOffset*2);
+ resetParser();
+ }
+ break;
+ case FormatUTF16LE:
+ if (mParserStartOffset >= 0) {
+ QByteArray msg = rawData(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset));
+ QString s = QTextCodec::codecForName("UTF-16LE")->toUnicode(msg);
+ obj = QJsonDocument::fromJson(s.toUtf8()).object();
+ // prepare for the next
+ mBuffer.remove(0, mParserOffset*2);
+ resetParser();
+ }
+ break;
+ case FormatBSON:
+ if (mMessageSize > 0) {
+ QByteArray msg = rawData(4, mMessageSize);
+ obj = QJsonDocument::fromVariant(BsonObject(msg).toMap()).object();
+ mBuffer.remove(0, mMessageSize+4);
+ mMessageSize = 0;
+ }
+ break;
+ case FormatQBJS:
+ if (mMessageSize > 0) {
+ QByteArray msg = rawData(0, mMessageSize);
+ obj = QJsonDocument::fromBinaryData(msg).object();
+ mBuffer.remove(0, mMessageSize);
+ mMessageSize = 0;
+ }
+ break;
+ }
+ mMessageAvailable = false;
+ }
+ return obj;
}
/*!
@@ -286,9 +385,30 @@ EncodingFormat JsonBuffer::format() const
}
/*!
- \fn void JsonBuffer::objectReceived(const QJsonObject& object)
- This signal is emitted when a new Qt Binary Json \a object has been received on the
- stream.
+ \fn void JsonBuffer::readyReadMessage()
+
+ This signal is emitted once every time new data is appended to the buffer
+ and a message is ready. \b readMessage() should be used to retrieve a message
+ and \b messageAvailable() to check for next available messages.
+ The client code may look like this:
+
+ \code
+ ...
+ connect(jsonbuffer, SIGNAL(readyReadMessage()), this, SLOT(processMessages()));
+ ...
+
+ void Foo::processMessages()
+ {
+ while (jsonbuffer->messageAvailable()) {
+ QJsonObject obj = jsonbuffer->readMessage();
+ <process message>
+ }
+ }
+ \endcode
+
+ \b readyReadMessage() is not emitted recursively; if you reenter the event loop
+ inside a slot connected to the \b readyReadMessage() signal, the signal will not
+ be reemitted.
*/
#include "moc_jsonbuffer_p.cpp"
diff --git a/src/jsonbuffer_p.h b/src/jsonbuffer_p.h
index 9bce2ce..8069234 100644
--- a/src/jsonbuffer_p.h
+++ b/src/jsonbuffer_p.h
@@ -62,13 +62,22 @@ public:
EncodingFormat format() const;
+ bool messageAvailable();
+ QJsonObject readMessage();
+
+ int size() const { return mBuffer.size(); }
+
+ inline bool isEnabled() const { return mEnabled; }
+ inline void setEnabled(bool enable) { mEnabled = enable; }
+
signals:
- void objectReceived(const QJsonObject& object);
+ void readyReadMessage();
private:
void processMessages();
bool scanUtf(int c);
void resetParser();
+ QByteArray rawData(int _start, int _len) const;
private:
enum UTF8ParsingState { ParseNormal, ParseInString, ParseInBackslash };
@@ -79,8 +88,17 @@ private:
int mParserDepth;
int mParserOffset;
int mParserStartOffset;
+ bool mEmittedReadyRead;
+ bool mMessageAvailable;
+ int mMessageSize;
+ bool mEnabled;
};
+inline QByteArray JsonBuffer::rawData(int _start, int _len) const
+{
+ return QByteArray::fromRawData(mBuffer.constData() + _start, _len);
+}
+
QT_END_NAMESPACE_JSONSTREAM
#endif // _JSON_BUFFER_H
diff --git a/src/jsonclient.cpp b/src/jsonclient.cpp
index 7f1d1f4..b16fd96 100644
--- a/src/jsonclient.cpp
+++ b/src/jsonclient.cpp
@@ -136,8 +136,7 @@ bool JsonClient::connectTCP(const QString& hostname, int port)
connect(&d->mStream, SIGNAL(receive(const QJsonObject&)),
this, SIGNAL(receive(const QJsonObject&)));
- d->mStream.send(d->mRegistrationMessage);
- return true;
+ return d->mStream.send(d->mRegistrationMessage);
}
return false;
@@ -162,11 +161,9 @@ bool JsonClient::connectLocal(const QString& socketname)
connect(socket, SIGNAL(disconnected()), SLOT(handleSocketDisconnected()));
Q_D(JsonClient);
d->mStream.setDevice(socket);
- connect(&d->mStream, SIGNAL(messageReceived(const QJsonObject&)),
- this, SIGNAL(messageReceived(const QJsonObject&)));
+ connect(&d->mStream, SIGNAL(readyReadMessage()), this, SLOT(processMessages()));
// qDebug() << "Sending local socket registration message" << mRegistrationMessage;
- d->mStream.send(d->mRegistrationMessage);
- return true;
+ return d->mStream.send(d->mRegistrationMessage);
}
delete socket;
return false;
@@ -174,16 +171,19 @@ bool JsonClient::connectLocal(const QString& socketname)
/*!
Send a \a message over the socket.
+ Returns true if the entire message was send/buffered or false otherwise.
*/
-void JsonClient::send(const QJsonObject &message)
+bool JsonClient::send(const QJsonObject &message)
{
+ bool ret = false;
Q_D(JsonClient);
if (d->mStream.isOpen()) {
- d->mStream << message;
+ ret = d->mStream.send(message);
} else {
qCritical() << Q_FUNC_INFO << "stream socket is not available";
}
+ return ret;
}
/*!
@@ -214,6 +214,19 @@ void JsonClient::handleSocketDisconnected()
/*!
+ \internal
+*/
+void JsonClient::processMessages()
+{
+ Q_D(JsonClient);
+ while (d->mStream.messageAvailable()) {
+ QJsonObject obj = d->mStream.readMessage();
+ if (!obj.isEmpty())
+ emit messageReceived(obj);
+ }
+}
+
+/*!
\fn void JsonClient::messageReceived(const QJsonObject &message)
This signal is emitted when a \a message is received from the server.
*/
diff --git a/src/jsonclient.h b/src/jsonclient.h
index 70e8e88..2fe683e 100644
--- a/src/jsonclient.h
+++ b/src/jsonclient.h
@@ -63,7 +63,7 @@ public:
bool connectTCP(const QString& hostname, int port);
bool connectLocal(const QString& socketname);
- void send(const QJsonObject&);
+ bool send(const QJsonObject&);
void setFormat( EncodingFormat format );
// Do we really need a "connect with delay or error" facility?
@@ -75,6 +75,7 @@ signals:
private slots:
void handleSocketDisconnected();
+ void processMessages();
private:
Q_DECLARE_PRIVATE(JsonClient)
diff --git a/src/jsonpipe.cpp b/src/jsonpipe.cpp
index 981a25e..18c8f46 100644
--- a/src/jsonpipe.cpp
+++ b/src/jsonpipe.cpp
@@ -107,8 +107,7 @@ JsonPipe::JsonPipe(QObject *parent)
{
Q_D(JsonPipe);
d->mInBuffer = new JsonBuffer(this);
- connect(d->mInBuffer, SIGNAL(objectReceived(const QJsonObject&)),
- SLOT(objectReceived(const QJsonObject&)));
+ connect(d->mInBuffer, SIGNAL(readyReadMessage()), SLOT(processMessages()));
}
/*!
@@ -278,6 +277,21 @@ void JsonPipe::objectReceived(const QJsonObject& object)
}
/*!
+ \internal
+*/
+void JsonPipe::processMessages()
+{
+ Q_D(JsonPipe);
+ d->mInBuffer->setEnabled(false);
+ while (d->mInBuffer->messageAvailable()) {
+ QJsonObject obj = d->mInBuffer->readMessage();
+ if (!obj.isEmpty())
+ objectReceived(obj);
+ }
+ d->mInBuffer->setEnabled(true);
+}
+
+/*!
Return the current JsonPipe::EncodingFormat.
*/
diff --git a/src/jsonpipe.h b/src/jsonpipe.h
index 63bf387..eef9d65 100644
--- a/src/jsonpipe.h
+++ b/src/jsonpipe.h
@@ -80,6 +80,7 @@ signals:
void error(PipeError);
protected slots:
+ void processMessages();
void objectReceived(const QJsonObject& object);
void inReady(int fd);
void outReady(int fd);
diff --git a/src/jsonserverclient.cpp b/src/jsonserverclient.cpp
index 73fff00..5b53e74 100644
--- a/src/jsonserverclient.cpp
+++ b/src/jsonserverclient.cpp
@@ -121,8 +121,7 @@ void JsonServerClient::setSocket(QLocalSocket *socket)
d->m_stream = new JsonStream(socket);
d->m_stream->setParent(this);
connect(socket, SIGNAL(disconnected()), this, SLOT(handleDisconnect()));
- connect(d->m_stream, SIGNAL(messageReceived(const QJsonObject&)),
- this, SLOT(received(const QJsonObject&)));
+ connect(d->m_stream, SIGNAL(readyReadMessage()), this, SLOT(processMessages()));
}
}
@@ -172,8 +171,7 @@ void JsonServerClient::stop()
{
// qDebug() << Q_FUNC_INFO;
Q_D(JsonServerClient);
- disconnect(d->m_stream, SIGNAL(messageReceived(const QJsonObject&)),
- this, SLOT(received(const QJsonObject&)));
+ disconnect(d->m_stream, SIGNAL(readyReadMessage()), this, SLOT(processMessages()));
d->m_socket->disconnectFromServer();
// qDebug() << Q_FUNC_INFO << "done";
}
@@ -209,15 +207,18 @@ void JsonServerClient::received(const QJsonObject& message)
/*!
Send a \a message to the client.
+ Returns true if the entire message was send/buffered or false otherwise.
*/
-void JsonServerClient::send(const QJsonObject &message)
+bool JsonServerClient::send(const QJsonObject &message)
{
+ bool ret = false;
Q_D(JsonServerClient);
if (d->m_stream) {
// qDebug() << "Sending message" << message;
- d->m_stream->send(message);
+ ret = d->m_stream->send(message);
}
+ return ret;
}
void JsonServerClient::handleDisconnect()
@@ -233,6 +234,19 @@ void JsonServerClient::handleDisconnect()
}
/*!
+ \internal
+ */
+void JsonServerClient::processMessages()
+{
+ Q_D(JsonServerClient);
+ while (d->m_stream->messageAvailable()) {
+ QJsonObject obj = d->m_stream->readMessage();
+ if (!obj.isEmpty())
+ received(obj);
+ }
+}
+
+/*!
\fn JsonServerClient::disconnected(const QString& identifier)
This signal is emitted when the client has been disconnected. The
\a identifier property is included for convenience.
diff --git a/src/jsonserverclient.h b/src/jsonserverclient.h
index 00b7394..5c97dfb 100644
--- a/src/jsonserverclient.h
+++ b/src/jsonserverclient.h
@@ -64,7 +64,7 @@ public:
void start();
void stop();
- void send(const QJsonObject &message);
+ bool send(const QJsonObject &message);
void setAuthority(JsonAuthority *authority);
@@ -83,6 +83,7 @@ signals:
private slots:
void received(const QJsonObject& message);
void handleDisconnect();
+ void processMessages();
private:
Q_DECLARE_PRIVATE(JsonServerClient)
diff --git a/src/jsonstream.cpp b/src/jsonstream.cpp
index 348eb2d..feacffc 100644
--- a/src/jsonstream.cpp
+++ b/src/jsonstream.cpp
@@ -74,11 +74,15 @@ class JsonStreamPrivate
public:
JsonStreamPrivate()
: mDevice(0)
- , mFormat(FormatUndefined) {}
+ , mFormat(FormatUndefined)
+ , mReadBufferSize(0)
+ , mWriteBufferSize(0) {}
QIODevice *mDevice;
JsonBuffer *mBuffer;
EncodingFormat mFormat;
+ qint64 mReadBufferSize;
+ qint64 mWriteBufferSize;
};
/****************************************************************************/
@@ -102,8 +106,7 @@ JsonStream::JsonStream(QIODevice *device)
{
Q_D(JsonStream);
d->mBuffer = new JsonBuffer(this);
- connect(d->mBuffer, SIGNAL(objectReceived(const QJsonObject&)),
- SLOT(objectReceived(const QJsonObject&)));
+ connect(d->mBuffer, SIGNAL(readyReadMessage()), SLOT(messageReceived()));
setDevice(device);
}
@@ -160,21 +163,30 @@ void JsonStream::setDevice( QIODevice *device )
Q_D(JsonStream);
if (d->mDevice) {
disconnect(d->mDevice, SIGNAL(readyRead()), this, SLOT(dataReadyOnSocket()));
+ disconnect(d->mDevice, SIGNAL(bytesWritten(qint64)), this, SIGNAL(bytesWritten(qint64)));
disconnect(d->mDevice, SIGNAL(aboutToClose()), this, SIGNAL(aboutToClose()));
}
d->mDevice = device;
if (device) {
connect(device, SIGNAL(readyRead()), this, SLOT(dataReadyOnSocket()));
+ connect(device, SIGNAL(bytesWritten(qint64)), this, SIGNAL(bytesWritten(qint64)));
connect(device, SIGNAL(aboutToClose()), this, SIGNAL(aboutToClose()));
}
}
/*!
- Send a JsonObject \a object over the stream
+ Send a JsonObject \a object over the stream.
+ Returns \b true if the entire message was sent or buffered or \b false otherwise.
+
+ JsonStream does not have a write buffer of its own. Rather, it uses the
+ write buffer of the \l{device()}. It will not cause that buffer to grow
+ larger than \l{writeBufferSize()} at any time. If this would occur, this
+ method will return \b false.
*/
-void JsonStream::send(const QJsonObject& object)
+bool JsonStream::send(const QJsonObject& object)
{
+ bool bRet(false);
QJsonDocument document(object);
Q_D(JsonStream);
@@ -183,40 +195,55 @@ void JsonStream::send(const QJsonObject& object)
d->mFormat = FormatQBJS;
// Deliberate fall through
case FormatQBJS:
- sendInternal( document.toBinaryData() );
+ bRet = sendInternal( document.toBinaryData() );
break;
case FormatUTF8:
- sendInternal( document.toJson() );
+ bRet = sendInternal( document.toJson() );
break;
case FormatUTF16BE:
- sendInternal( QTextCodec::codecForName("UTF-16BE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM
+ bRet = sendInternal( QTextCodec::codecForName("UTF-16BE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM
break;
case FormatUTF16LE:
- sendInternal( QTextCodec::codecForName("UTF-16LE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM
+ bRet = sendInternal( QTextCodec::codecForName("UTF-16LE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM
break;
case FormatBSON:
{
BsonObject bson(document.toVariant().toMap());
- sendInternal(bsonToByteArray(bson));
+ bRet = sendInternal(bsonToByteArray(bson));
break;
}
}
+ return bRet;
}
/*!
\internal
Send raw QByteArray \a byteArray data over the socket.
*/
-
-void JsonStream::sendInternal(const QByteArray& byteArray)
+bool JsonStream::sendInternal(const QByteArray& byteArray)
{
Q_D(JsonStream);
- if (!d->mDevice) {
+ if (!isOpen()) {
qWarning() << Q_FUNC_INFO << "No device in JsonStream";
- return;
+ return false;
+ }
+
+ int nBytes = 0;
+ if (d->mWriteBufferSize <= 0 || d->mDevice->bytesToWrite() + byteArray.size() <= d->mWriteBufferSize) {
+ for (int nSz = byteArray.size(); nSz > 0; ) {
+ int nWrite = d->mDevice->write( byteArray.constData() + nBytes, nSz);
+ if (nWrite <= 0) {
+ // write error
+ qWarning() << Q_FUNC_INFO << __LINE__
+ << QString::fromLatin1("Write error. QIODevice::write() returned %1 (%2).")
+ .arg(nWrite).arg(d->mDevice->errorString());
+ break;
+ }
+ nBytes += nWrite;
+ nSz -= nWrite;
+ }
}
- int nBytes = d->mDevice->write( byteArray.data(), byteArray.size() );
if (QLocalSocket *socket = qobject_cast<QLocalSocket*>(d->mDevice))
socket->flush();
else if (QAbstractSocket *socket = qobject_cast<QAbstractSocket*>(d->mDevice))
@@ -224,22 +251,24 @@ void JsonStream::sendInternal(const QByteArray& byteArray)
else
qWarning() << Q_FUNC_INFO << "Unknown socket type:" << d->mDevice->metaObject()->className();
- if (nBytes != byteArray.size())
+ bool bFail;
+ if ((bFail = (nBytes != byteArray.size())))
qCritical() << Q_FUNC_INFO << __LINE__
<< QString::fromLatin1("Expected to write %1 bytes, actually %2.").arg(byteArray.size()).arg(nBytes);
+ return !bFail;
}
/*!
\internal
- Handle a received Qt Binary Json \a object and emit the correct signals
+ Handle a received readyReadMessage signal and emit the correct signals
*/
-void JsonStream::objectReceived(const QJsonObject& object)
+void JsonStream::messageReceived()
{
Q_D(JsonStream);
if (d->mFormat == FormatUndefined)
d->mFormat = d->mBuffer->format();
- emit messageReceived(object);
+ emit readyReadMessage();
}
/*!
@@ -250,6 +279,26 @@ void JsonStream::objectReceived(const QJsonObject& object)
void JsonStream::dataReadyOnSocket()
{
Q_D(JsonStream);
+ if (d->mReadBufferSize > 0) {
+ while (d->mDevice->bytesAvailable() + d->mBuffer->size() > d->mReadBufferSize) {
+ // can't fit all data into a read buffer - read a part that fits
+ d->mBuffer->append(d->mDevice->read(d->mReadBufferSize - d->mBuffer->size()));
+
+ // if the read buffer is full then emit readBufferOverflow and allow user to increase the buffer size
+ if (d->mBuffer->size() == d->mReadBufferSize) {
+ emit readBufferOverflow(d->mDevice->bytesAvailable() + d->mBuffer->size());
+ if (d->mBuffer->size() == d->mReadBufferSize) {
+ // still can't read anything - close connection
+ d->mDevice->close();
+ return;
+ }
+ else if (0 == d->mReadBufferSize) {
+ // user removed buffer size limitation
+ break;
+ }
+ }
+ }
+ }
d->mBuffer->append( d->mDevice->readAll());
}
@@ -273,23 +322,118 @@ void JsonStream::setFormat( EncodingFormat format )
d->mFormat = format;
}
+/*!
+ Returns a maximum size of the inbound message buffer.
+ */
+qint64 JsonStream::readBufferSize() const
+{
+ Q_D(const JsonStream);
+ return d->mReadBufferSize;
+}
/*!
- \relates JsonStream
+ Sets a maximum size of the inbound message buffer to \a sz thus capping a size
+ of an inbound message.
+ */
+void JsonStream::setReadBufferSize(qint64 sz)
+{
+ if (sz >= 0) {
+ Q_D(JsonStream);
+ d->mReadBufferSize = sz;
+ }
+}
- Sends the \a map via the \a stream.
-*/
-JsonStream& operator<<( JsonStream& stream, const QJsonObject& map )
+/*!
+ Returns a maximum size of the outbound message buffer. A value of 0
+ means the buffer size is unlimited.
+ */
+qint64 JsonStream::writeBufferSize() const
{
- stream.send(map);
- return stream;
+ Q_D(const JsonStream);
+ return d->mWriteBufferSize;
}
+/*!
+ Sets a maximum size of the outbound message buffer to \a sz thus capping a size
+ of an outbound message. A value of 0 means the buffer size is unlimited.
+ */
+void JsonStream::setWriteBufferSize(qint64 sz)
+{
+ if (sz >= 0) {
+ Q_D(JsonStream);
+ d->mWriteBufferSize = sz;
+ }
+}
/*!
- \fn void JsonStream::messageReceived(const QJsonObject& message)
- This signal is emitted when a new \a message has been received on the
- stream.
+ Returns a number of bytes currently in the write buffer. Effectively,
+ if \l{writeBufferSize()} is not unlimited, the largest message you can
+ send at any one time is (\l{writeBufferSize()} - \b bytesToWrite()) bytes.
+ */
+qint64 JsonStream::bytesToWrite() const
+{
+ Q_D(const JsonStream);
+ return (d->mDevice ? d->mDevice->bytesToWrite() : 0);
+}
+
+/*!
+ Returns a JSON object that has been received. If no message is
+ available, an empty JSON object is returned.
+ */
+QJsonObject JsonStream::readMessage()
+{
+ Q_D(JsonStream);
+ return d->mBuffer->readMessage();
+}
+
+/*!
+ Returns \b true if a message is available to be read via \l{readMessage()}
+ or \b false otherwise.
+ */
+bool JsonStream::messageAvailable()
+{
+ Q_D(JsonStream);
+ return d->mBuffer->messageAvailable();
+}
+
+/*! \fn JsonStream::bytesWritten(qint64 bytes)
+
+ This signal is emitted every time a payload of data has been
+ written to the device. The \a bytes argument is set to the number
+ of bytes that were written in this payload.
+
+ If a previous call to \l{send()} returned \b false, you should re-send
+ the message when this signal is emitted, as the write buffer may have been
+ emptied enough to hold the new message.
+*/
+
+/*!
+ \fn void JsonStream::readyReadMessage()
+
+ This signal is emitted once every time new data arrives on the \l{device()}
+ and a message is ready. \l{readMessage()} should be used to retrieve a message
+ and \l{messageAvailable()} to check for next available messages.
+ The client code may look like this:
+
+ \code
+ ...
+ connect(jsonstream, SIGNAL(readyReadMessage()), this, SLOT(processMessages()));
+ ...
+
+ void Foo::processMessages()
+ {
+ while (jsonstream->messageAvailable()) {
+ QJsonObject obj = jsonstream->readMessage();
+ <process message>
+ }
+ }
+ \endcode
+
+ \b readyReadMessage() is not emitted recursively; if you reenter the event loop
+ inside a slot connected to the \b readyReadMessage() signal, the signal will not
+ be reemitted.
+
+ \sa readMessage(), messageAvailable()
*/
/*!
@@ -297,6 +441,16 @@ JsonStream& operator<<( JsonStream& stream, const QJsonObject& map )
This signal is emitted when the underlying \c QIODevice is about to close.
*/
+/*! \fn JsonStream::readBufferOverflow(qint64 bytes)
+
+ This signal is emitted when the read buffer is full of data that has been read
+ from the \l{device()}, \a bytes additional bytes are available on the device,
+ but the message is not complete. The \l{readBufferSize()} mayb e increased
+ to a sufficient size in a slot connected to this signal, in which case more
+ data will be read into the read buffer. If the buffer size is not increased,
+ the connection is closed.
+ */
+
#include "moc_jsonstream.cpp"
QT_END_NAMESPACE_JSONSTREAM
diff --git a/src/jsonstream.h b/src/jsonstream.h
index 3acb204..693dad4 100644
--- a/src/jsonstream.h
+++ b/src/jsonstream.h
@@ -64,29 +64,40 @@ public:
QIODevice *device() const;
void setDevice( QIODevice *device );
- void send(const QJsonObject& message);
+ bool send(const QJsonObject& message);
EncodingFormat format() const;
void setFormat(EncodingFormat format);
+ qint64 readBufferSize() const;
+ void setReadBufferSize(qint64);
+
+ qint64 writeBufferSize() const;
+ void setWriteBufferSize(qint64 sz);
+
+ qint64 bytesToWrite() const;
+
+ bool messageAvailable();
+ QJsonObject readMessage();
+
signals:
- void messageReceived(const QJsonObject& message);
+ void bytesWritten(qint64);
+ void readyReadMessage();
void aboutToClose();
+ void readBufferOverflow(qint64);
protected slots:
void dataReadyOnSocket();
- void objectReceived(const QJsonObject& object);
+ void messageReceived();
protected:
- void sendInternal(const QByteArray& byteArray);
+ bool sendInternal(const QByteArray& byteArray);
private:
Q_DECLARE_PRIVATE(JsonStream)
QScopedPointer<JsonStreamPrivate> d_ptr;
};
-JsonStream& operator<<( JsonStream&, const QJsonObject& );
-
QT_END_NAMESPACE_JSONSTREAM
#endif // _JSON_STREAM_H
diff --git a/tests/auto/jsonstream/testClient/main.cpp b/tests/auto/jsonstream/testClient/main.cpp
index 79e9057..e23e301 100644
--- a/tests/auto/jsonstream/testClient/main.cpp
+++ b/tests/auto/jsonstream/testClient/main.cpp
@@ -127,6 +127,14 @@ void Container::sendSchemaTestMessage()
void Container::received(const QJsonObject& message)
{
+ // test large string size
+ if (message.contains("large") || message.contains("large_size")) {
+ if (message.value("large").toString().size() != message.value("large_size").toDouble()) {
+ qWarning() << "Large string size mismatch" << message.value("large").toString().size() << "!=" << message.value("large_size").toDouble();
+ exit(3);
+ }
+ }
+
QString command = message.value("command").toString();
if (!command.isEmpty()) {
qDebug() << "Received command" << command;
diff --git a/tests/auto/jsonstream/tst_jsonstream.cpp b/tests/auto/jsonstream/tst_jsonstream.cpp
index 14f3b08..8bc12d6 100644
--- a/tests/auto/jsonstream/tst_jsonstream.cpp
+++ b/tests/auto/jsonstream/tst_jsonstream.cpp
@@ -142,6 +142,7 @@ public:
void waitForFinished() {
if (process->state() == QProcess::Running)
QVERIFY(process->waitForFinished(5000));
+ QVERIFY(process->exitCode() == 0);
delete process;
process = 0;
}
@@ -173,7 +174,7 @@ class BasicServer : public QObject {
Q_OBJECT
public:
- BasicServer(const QString& socketname) : socket(0), stream(0) {
+ BasicServer(const QString& socketname, qint64 _sz = 0) : socket(0), stream(0), readBufferSize(_sz) {
QLocalServer::removeServer(socketname);
server = new QLocalServer(this);
connect(server, SIGNAL(newConnection()), SLOT(handleConnection()));
@@ -186,9 +187,8 @@ public:
server = NULL;
}
- void send(const QJsonObject& message) {
- QVERIFY(stream);
- stream->send(message);
+ bool send(const QJsonObject& message) {
+ return stream ? stream->send(message) : false;
}
void waitDisconnect(int timeout=5000) {
@@ -207,15 +207,26 @@ public:
return stream->format();
}
+ JsonStream *jsonStream() const { return stream; }
private slots:
void handleConnection() {
socket = server->nextPendingConnection();
QVERIFY(socket);
stream = new JsonStream(socket);
stream->setParent(socket);
+ if (readBufferSize > 0)
+ stream->setReadBufferSize(readBufferSize);
connect(socket, SIGNAL(disconnected()), SLOT(handleDisconnection()));
- connect(stream, SIGNAL(messageReceived(const QJsonObject&)),
- SIGNAL(messageReceived(const QJsonObject&)));
+ connect(stream, SIGNAL(readyReadMessage()), SLOT(processMessages()));
+ connect(stream, SIGNAL(readBufferOverflow(qint64)), SLOT(handleReadBufferOverflow(qint64)));
+ }
+
+ void processMessages() {
+ while (stream->messageAvailable()) {
+ QJsonObject obj = stream->readMessage();
+ if (!obj.isEmpty())
+ emit messageReceived(obj);
+ }
}
void handleDisconnection() {
@@ -225,13 +236,21 @@ private slots:
stream = NULL;
}
+ void handleReadBufferOverflow(qint64 sz) {
+ QVERIFY(readBufferSize > 0 && sz > readBufferSize);
+ stream->setReadBufferSize(sz);
+ emit readBufferOverflow(sz);
+ }
+
signals:
void messageReceived(const QJsonObject&);
+ void readBufferOverflow(qint64);
private:
QLocalServer *server;
QLocalSocket *socket;
JsonStream *stream;
+ qint64 readBufferSize;
};
/****************************/
@@ -253,6 +272,7 @@ private slots:
void pipeTest();
void pipeFormatTest();
void pipeWaitTest();
+ void bufferSizeTest();
};
void tst_JsonStream::initTestCase()
@@ -417,12 +437,67 @@ void tst_JsonStream::formatTest()
QVERIFY(object.value("item2").toString() == "This is item 2");
msg.insert("command", QLatin1String("exit"));
- server.send(msg);
+ QVERIFY(server.send(msg));
server.waitDisconnect();
child.waitForFinished();
}
}
+void tst_JsonStream::bufferSizeTest()
+{
+ QString socketname = "/tmp/tst_socket";
+
+ BasicServer server(socketname, 100);
+ QSignalSpy spy(&server, SIGNAL(messageReceived(const QJsonObject&)));
+ QSignalSpy spy1(&server, SIGNAL(readBufferOverflow(qint64)));
+ QTime stopWatch;
+
+ Child child("testClient/testClient",
+ QStringList() << "-socket" << socketname);
+
+ stopWatch.start();
+ forever {
+ QTestEventLoop::instance().enterLoop(1);
+ if (stopWatch.elapsed() >= 5000)
+ QFAIL("Timed out");
+ if (spy.count())
+ break;
+ }
+ QVERIFY(spy1.count() == 1); // overflow happend only once
+
+ QJsonObject msg = qvariant_cast<QJsonObject>(spy.last().at(0));
+ QVERIFY(msg.value("text").isString() && msg.value("text").toString() == QLatin1String("Standard text"));
+ QVERIFY(msg.value("int").isDouble() && msg.value("int").toDouble() == 100);
+ QVERIFY(msg.value("float").isDouble() && msg.value("float").toDouble() == 100.0);
+ QVERIFY(msg.value("true").isBool() && msg.value("true").toBool() == true);
+ QVERIFY(msg.value("false").isBool() && msg.value("false").toBool() == false);
+ QVERIFY(msg.value("array").isArray());
+ QJsonArray array = msg.value("array").toArray();
+ QVERIFY(array.size() == 3);
+ QVERIFY(array.at(0).toString() == "one");
+ QVERIFY(array.at(1).toString() == "two");
+ QVERIFY(array.at(2).toString() == "three");
+ QVERIFY(msg.value("object").isObject());
+ QJsonObject object = msg.value("object").toObject();
+ QVERIFY(object.value("item1").toString() == "This is item 1");
+ QVERIFY(object.value("item2").toString() == "This is item 2");
+
+ msg.insert("command", QLatin1String("exit"));
+
+ server.jsonStream()->setWriteBufferSize(100);
+ QVERIFY(!server.send(msg));
+
+ QString strLarge(500000, '*');
+ msg.insert("large", strLarge);
+ msg.insert("large_size", strLarge.size());
+ server.jsonStream()->setWriteBufferSize(0);
+ QVERIFY(server.send(msg));
+
+
+ server.waitDisconnect();
+ child.waitForFinished();
+}
+
void tst_JsonStream::schemaTest()
{
QString strSchemasDir(QDir::currentPath() + "/" + "schemas");