summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/jsonbuffer.cpp214
-rw-r--r--src/jsonbuffer_p.h20
-rw-r--r--src/jsonclient.cpp29
-rw-r--r--src/jsonclient.h3
-rw-r--r--src/jsonpipe.cpp18
-rw-r--r--src/jsonpipe.h1
-rw-r--r--src/jsonserverclient.cpp26
-rw-r--r--src/jsonserverclient.h3
-rw-r--r--src/jsonstream.cpp210
-rw-r--r--src/jsonstream.h23
-rw-r--r--tests/auto/jsonstream/testClient/main.cpp8
-rw-r--r--tests/auto/jsonstream/tst_jsonstream.cpp89
12 files changed, 537 insertions, 107 deletions
diff --git a/src/jsonbuffer.cpp b/src/jsonbuffer.cpp
index fde637f..3fe5561 100644
--- a/src/jsonbuffer.cpp
+++ b/src/jsonbuffer.cpp
@@ -73,31 +73,65 @@ JsonBuffer::JsonBuffer(QObject *parent)
, mParserDepth(0)
, mParserOffset(0)
, mParserStartOffset(-1)
+ , mEmittedReadyRead(false)
+ , mMessageAvailable(false)
+ , mMessageSize(0)
+ , mEnabled(true)
{
}
/*!
+ \fn bool JsonBuffer::isEnabled() const
+
+ Returns true if \l{readyReadMessage()} signal notifier is enabled; otherwise returns false.
+
+ \sa setEnabled(), readyReadMessage()
+*/
+
+/*!
+ \fn void JsonBuffer::setEnabled(bool enable)
+
+ If \a enable is true, \l{readyReadMessage()} signal notifier is enabled;
+ otherwise the notifier is disabled.
+
+ The notifier is enabled by default, i.e. it emits the \l{readyReadMessage()}
+ signal whenever a new message is ready.
+
+ The notifier should normally be disabled while user is reading existing messages.
+
+ \sa isEnabled(), readyReadMessage()
+*/
+
+/*!
+ \fn int JsonBuffer::size() const
+
+ Returns the size of the buffer in bytes.
+*/
+
+/*!
Append the contents of a byte array \a data onto the buffer.
- During the execution of this function, the objectReceived
+ During the execution of this function, the \l{readyReadMessage()}
signal may be raised.
*/
void JsonBuffer::append(const QByteArray& data)
{
mBuffer.append(data.data(), data.size());
- processMessages();
+ if (0 < size())
+ processMessages();
}
/*!
Append the \a data pointer with length \a len onto the JsonBuffer.
- During the execution of this function, objectReceived
+ During the execution of this function, the \l{readyReadMessage()}
signal may be raised.
*/
void JsonBuffer::append(const char *data, int len)
{
mBuffer.append(data, len);
- processMessages();
+ if (0 < size())
+ processMessages();
}
/*!
@@ -131,6 +165,7 @@ int JsonBuffer::copyFromFd(int fd)
void JsonBuffer::clear()
{
mBuffer.clear();
+ resetParser();
}
/*!
@@ -177,6 +212,8 @@ void JsonBuffer::resetParser()
mParserDepth = 0;
mParserOffset = 0;
mParserStartOffset = -1;
+ mMessageAvailable = false;
+ mMessageSize = 0;
}
/*!
@@ -185,10 +222,30 @@ void JsonBuffer::resetParser()
void JsonBuffer::processMessages()
{
+ // do not process anything if disabled or if control is still inside readyReadMessage() slot
+ if (mEnabled && !mEmittedReadyRead) {
+ mEmittedReadyRead = true;
+ if (messageAvailable()) {
+ emit readyReadMessage();
+ }
+ mEmittedReadyRead = false;
+ }
+}
+
+/*!
+ \internal
+*/
+bool JsonBuffer::messageAvailable()
+{
+ if (mMessageAvailable) {
+ // already found - no need to check again
+ return true;
+ }
+
if (mFormat == FormatUndefined && mBuffer.size() >= 4) {
- if (strncmp("bson", mBuffer.data(), 4) == 0)
+ if (strncmp("bson", mBuffer.constData(), 4) == 0)
mFormat = FormatBSON;
- else if (QJsonDocument::BinaryFormatTag == *((uint *) mBuffer.data()))
+ else if (QJsonDocument::BinaryFormatTag == *((uint *) mBuffer.constData()))
mFormat = FormatQBJS;
else if (mBuffer.at(0) == 0 &&
mBuffer.at(1) != 0 &&
@@ -211,12 +268,8 @@ void JsonBuffer::processMessages()
for ( ; mParserOffset < mBuffer.size() ; mParserOffset++ ) {
char c = mBuffer.at(mParserOffset);
if (scanUtf(c)) {
- QByteArray msg = mBuffer.mid(mParserStartOffset, mParserOffset - mParserStartOffset);
- QJsonObject obj = QJsonDocument::fromJson(msg).object();
- if (!obj.isEmpty())
- emit objectReceived(obj);
- mBuffer = mBuffer.mid(mParserOffset);
- resetParser();
+ mMessageAvailable = true;
+ return true;
}
}
break;
@@ -224,13 +277,8 @@ void JsonBuffer::processMessages()
for ( ; 2 * mParserOffset < mBuffer.size() ; mParserOffset++ ) {
int16_t c = qFromBigEndian(reinterpret_cast<const int16_t *>(mBuffer.constData())[mParserOffset]);
if (scanUtf(c)) {
- QByteArray msg = mBuffer.mid(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset));
- QString s = QTextCodec::codecForName("UTF-16BE")->toUnicode(msg);
- QJsonObject obj = QJsonDocument::fromJson(s.toUtf8()).object();
- if (!obj.isEmpty())
- emit objectReceived(obj);
- mBuffer = mBuffer.mid(mParserOffset*2);
- resetParser();
+ mMessageAvailable = true;
+ return true;
}
}
break;
@@ -238,42 +286,93 @@ void JsonBuffer::processMessages()
for ( ; 2 * mParserOffset < mBuffer.size() ; mParserOffset++ ) {
int16_t c = qFromLittleEndian(reinterpret_cast<const int16_t *>(mBuffer.constData())[mParserOffset]);
if (scanUtf(c)) {
- QByteArray msg = mBuffer.mid(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset));
- QString s = QTextCodec::codecForName("UTF-16LE")->toUnicode(msg);
- QJsonObject obj = QJsonDocument::fromJson(s.toUtf8()).object();
- if (!obj.isEmpty())
- emit objectReceived(obj);
- mBuffer = mBuffer.mid(mParserOffset*2);
- resetParser();
+ mMessageAvailable = true;
+ return true;
}
}
break;
case FormatBSON:
- while (mBuffer.size() >= 8) {
- qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.data())[1]);
- if (mBuffer.size() < message_size + 4)
- break;
- QByteArray msg = mBuffer.mid(4, message_size);
- QJsonObject obj = QJsonDocument::fromVariant(BsonObject(msg).toMap()).object();
- if (!obj.isEmpty())
- emit objectReceived(obj);
- mBuffer = mBuffer.mid(message_size+4);
+ if (mBuffer.size() >= 8) {
+ qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.constData())[1]);
+ if (mBuffer.size() >= message_size + 4) {
+ mMessageSize = message_size;
+ mMessageAvailable = true;
+ }
}
break;
case FormatQBJS:
- while (mBuffer.size() >= 12) {
+ if (mBuffer.size() >= 12) {
// ### TODO: Should use 'sizeof(Header)'
- qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.data())[2]) + 8;
- if (mBuffer.size() < message_size)
- break;
- QByteArray msg = mBuffer.left(message_size);
- QJsonObject obj = QJsonDocument::fromBinaryData(msg).object();
- if (!obj.isEmpty())
- emit objectReceived(obj);
- mBuffer = mBuffer.mid(message_size);
+ qint32 message_size = qFromLittleEndian(((qint32 *)mBuffer.constData())[2]) + 8;
+ if (mBuffer.size() >= message_size) {
+ mMessageSize = message_size;
+ mMessageAvailable = true;
+ }
}
break;
}
+ return mMessageAvailable;
+}
+
+/*!
+ \internal
+*/
+QJsonObject JsonBuffer::readMessage()
+{
+ QJsonObject obj;
+ if (messageAvailable()) {
+ switch (mFormat) {
+ case FormatUndefined:
+ break;
+ case FormatUTF8:
+ if (mParserStartOffset >= 0) {
+ QByteArray msg = rawData(mParserStartOffset, mParserOffset - mParserStartOffset);
+ obj = QJsonDocument::fromJson(msg).object();
+ // prepare for the next
+ mBuffer.remove(0, mParserOffset);
+ resetParser();
+ }
+ break;
+ case FormatUTF16BE:
+ if (mParserStartOffset >= 0) {
+ QByteArray msg = rawData(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset));
+ QString s = QTextCodec::codecForName("UTF-16BE")->toUnicode(msg);
+ obj = QJsonDocument::fromJson(s.toUtf8()).object();
+ // prepare for the next
+ mBuffer.remove(0, mParserOffset*2);
+ resetParser();
+ }
+ break;
+ case FormatUTF16LE:
+ if (mParserStartOffset >= 0) {
+ QByteArray msg = rawData(mParserStartOffset * 2, 2*(mParserOffset - mParserStartOffset));
+ QString s = QTextCodec::codecForName("UTF-16LE")->toUnicode(msg);
+ obj = QJsonDocument::fromJson(s.toUtf8()).object();
+ // prepare for the next
+ mBuffer.remove(0, mParserOffset*2);
+ resetParser();
+ }
+ break;
+ case FormatBSON:
+ if (mMessageSize > 0) {
+ QByteArray msg = rawData(4, mMessageSize);
+ obj = QJsonDocument::fromVariant(BsonObject(msg).toMap()).object();
+ mBuffer.remove(0, mMessageSize+4);
+ mMessageSize = 0;
+ }
+ break;
+ case FormatQBJS:
+ if (mMessageSize > 0) {
+ QByteArray msg = rawData(0, mMessageSize);
+ obj = QJsonDocument::fromBinaryData(msg).object();
+ mBuffer.remove(0, mMessageSize);
+ mMessageSize = 0;
+ }
+ break;
+ }
+ mMessageAvailable = false;
+ }
+ return obj;
}
/*!
@@ -286,9 +385,30 @@ EncodingFormat JsonBuffer::format() const
}
/*!
- \fn void JsonBuffer::objectReceived(const QJsonObject& object)
- This signal is emitted when a new Qt Binary Json \a object has been received on the
- stream.
+ \fn void JsonBuffer::readyReadMessage()
+
+ This signal is emitted once every time new data is appended to the buffer
+ and a message is ready. \b readMessage() should be used to retrieve a message
+ and \b messageAvailable() to check for next available messages.
+ The client code may look like this:
+
+ \code
+ ...
+ connect(jsonbuffer, SIGNAL(readyReadMessage()), this, SLOT(processMessages()));
+ ...
+
+ void Foo::processMessages()
+ {
+ while (jsonbuffer->messageAvailable()) {
+ QJsonObject obj = jsonbuffer->readMessage();
+ <process message>
+ }
+ }
+ \endcode
+
+ \b readyReadMessage() is not emitted recursively; if you reenter the event loop
+ inside a slot connected to the \b readyReadMessage() signal, the signal will not
+ be reemitted.
*/
#include "moc_jsonbuffer_p.cpp"
diff --git a/src/jsonbuffer_p.h b/src/jsonbuffer_p.h
index 9bce2ce..8069234 100644
--- a/src/jsonbuffer_p.h
+++ b/src/jsonbuffer_p.h
@@ -62,13 +62,22 @@ public:
EncodingFormat format() const;
+ bool messageAvailable();
+ QJsonObject readMessage();
+
+ int size() const { return mBuffer.size(); }
+
+ inline bool isEnabled() const { return mEnabled; }
+ inline void setEnabled(bool enable) { mEnabled = enable; }
+
signals:
- void objectReceived(const QJsonObject& object);
+ void readyReadMessage();
private:
void processMessages();
bool scanUtf(int c);
void resetParser();
+ QByteArray rawData(int _start, int _len) const;
private:
enum UTF8ParsingState { ParseNormal, ParseInString, ParseInBackslash };
@@ -79,8 +88,17 @@ private:
int mParserDepth;
int mParserOffset;
int mParserStartOffset;
+ bool mEmittedReadyRead;
+ bool mMessageAvailable;
+ int mMessageSize;
+ bool mEnabled;
};
+inline QByteArray JsonBuffer::rawData(int _start, int _len) const
+{
+ return QByteArray::fromRawData(mBuffer.constData() + _start, _len);
+}
+
QT_END_NAMESPACE_JSONSTREAM
#endif // _JSON_BUFFER_H
diff --git a/src/jsonclient.cpp b/src/jsonclient.cpp
index 7f1d1f4..b16fd96 100644
--- a/src/jsonclient.cpp
+++ b/src/jsonclient.cpp
@@ -136,8 +136,7 @@ bool JsonClient::connectTCP(const QString& hostname, int port)
connect(&d->mStream, SIGNAL(receive(const QJsonObject&)),
this, SIGNAL(receive(const QJsonObject&)));
- d->mStream.send(d->mRegistrationMessage);
- return true;
+ return d->mStream.send(d->mRegistrationMessage);
}
return false;
@@ -162,11 +161,9 @@ bool JsonClient::connectLocal(const QString& socketname)
connect(socket, SIGNAL(disconnected()), SLOT(handleSocketDisconnected()));
Q_D(JsonClient);
d->mStream.setDevice(socket);
- connect(&d->mStream, SIGNAL(messageReceived(const QJsonObject&)),
- this, SIGNAL(messageReceived(const QJsonObject&)));
+ connect(&d->mStream, SIGNAL(readyReadMessage()), this, SLOT(processMessages()));
// qDebug() << "Sending local socket registration message" << mRegistrationMessage;
- d->mStream.send(d->mRegistrationMessage);
- return true;
+ return d->mStream.send(d->mRegistrationMessage);
}
delete socket;
return false;
@@ -174,16 +171,19 @@ bool JsonClient::connectLocal(const QString& socketname)
/*!
Send a \a message over the socket.
+ Returns true if the entire message was send/buffered or false otherwise.
*/
-void JsonClient::send(const QJsonObject &message)
+bool JsonClient::send(const QJsonObject &message)
{
+ bool ret = false;
Q_D(JsonClient);
if (d->mStream.isOpen()) {
- d->mStream << message;
+ ret = d->mStream.send(message);
} else {
qCritical() << Q_FUNC_INFO << "stream socket is not available";
}
+ return ret;
}
/*!
@@ -214,6 +214,19 @@ void JsonClient::handleSocketDisconnected()
/*!
+ \internal
+*/
+void JsonClient::processMessages()
+{
+ Q_D(JsonClient);
+ while (d->mStream.messageAvailable()) {
+ QJsonObject obj = d->mStream.readMessage();
+ if (!obj.isEmpty())
+ emit messageReceived(obj);
+ }
+}
+
+/*!
\fn void JsonClient::messageReceived(const QJsonObject &message)
This signal is emitted when a \a message is received from the server.
*/
diff --git a/src/jsonclient.h b/src/jsonclient.h
index 70e8e88..2fe683e 100644
--- a/src/jsonclient.h
+++ b/src/jsonclient.h
@@ -63,7 +63,7 @@ public:
bool connectTCP(const QString& hostname, int port);
bool connectLocal(const QString& socketname);
- void send(const QJsonObject&);
+ bool send(const QJsonObject&);
void setFormat( EncodingFormat format );
// Do we really need a "connect with delay or error" facility?
@@ -75,6 +75,7 @@ signals:
private slots:
void handleSocketDisconnected();
+ void processMessages();
private:
Q_DECLARE_PRIVATE(JsonClient)
diff --git a/src/jsonpipe.cpp b/src/jsonpipe.cpp
index 981a25e..18c8f46 100644
--- a/src/jsonpipe.cpp
+++ b/src/jsonpipe.cpp
@@ -107,8 +107,7 @@ JsonPipe::JsonPipe(QObject *parent)
{
Q_D(JsonPipe);
d->mInBuffer = new JsonBuffer(this);
- connect(d->mInBuffer, SIGNAL(objectReceived(const QJsonObject&)),
- SLOT(objectReceived(const QJsonObject&)));
+ connect(d->mInBuffer, SIGNAL(readyReadMessage()), SLOT(processMessages()));
}
/*!
@@ -278,6 +277,21 @@ void JsonPipe::objectReceived(const QJsonObject& object)
}
/*!
+ \internal
+*/
+void JsonPipe::processMessages()
+{
+ Q_D(JsonPipe);
+ d->mInBuffer->setEnabled(false);
+ while (d->mInBuffer->messageAvailable()) {
+ QJsonObject obj = d->mInBuffer->readMessage();
+ if (!obj.isEmpty())
+ objectReceived(obj);
+ }
+ d->mInBuffer->setEnabled(true);
+}
+
+/*!
Return the current JsonPipe::EncodingFormat.
*/
diff --git a/src/jsonpipe.h b/src/jsonpipe.h
index 63bf387..eef9d65 100644
--- a/src/jsonpipe.h
+++ b/src/jsonpipe.h
@@ -80,6 +80,7 @@ signals:
void error(PipeError);
protected slots:
+ void processMessages();
void objectReceived(const QJsonObject& object);
void inReady(int fd);
void outReady(int fd);
diff --git a/src/jsonserverclient.cpp b/src/jsonserverclient.cpp
index 73fff00..5b53e74 100644
--- a/src/jsonserverclient.cpp
+++ b/src/jsonserverclient.cpp
@@ -121,8 +121,7 @@ void JsonServerClient::setSocket(QLocalSocket *socket)
d->m_stream = new JsonStream(socket);
d->m_stream->setParent(this);
connect(socket, SIGNAL(disconnected()), this, SLOT(handleDisconnect()));
- connect(d->m_stream, SIGNAL(messageReceived(const QJsonObject&)),
- this, SLOT(received(const QJsonObject&)));
+ connect(d->m_stream, SIGNAL(readyReadMessage()), this, SLOT(processMessages()));
}
}
@@ -172,8 +171,7 @@ void JsonServerClient::stop()
{
// qDebug() << Q_FUNC_INFO;
Q_D(JsonServerClient);
- disconnect(d->m_stream, SIGNAL(messageReceived(const QJsonObject&)),
- this, SLOT(received(const QJsonObject&)));
+ disconnect(d->m_stream, SIGNAL(readyReadMessage()), this, SLOT(processMessages()));
d->m_socket->disconnectFromServer();
// qDebug() << Q_FUNC_INFO << "done";
}
@@ -209,15 +207,18 @@ void JsonServerClient::received(const QJsonObject& message)
/*!
Send a \a message to the client.
+ Returns true if the entire message was send/buffered or false otherwise.
*/
-void JsonServerClient::send(const QJsonObject &message)
+bool JsonServerClient::send(const QJsonObject &message)
{
+ bool ret = false;
Q_D(JsonServerClient);
if (d->m_stream) {
// qDebug() << "Sending message" << message;
- d->m_stream->send(message);
+ ret = d->m_stream->send(message);
}
+ return ret;
}
void JsonServerClient::handleDisconnect()
@@ -233,6 +234,19 @@ void JsonServerClient::handleDisconnect()
}
/*!
+ \internal
+ */
+void JsonServerClient::processMessages()
+{
+ Q_D(JsonServerClient);
+ while (d->m_stream->messageAvailable()) {
+ QJsonObject obj = d->m_stream->readMessage();
+ if (!obj.isEmpty())
+ received(obj);
+ }
+}
+
+/*!
\fn JsonServerClient::disconnected(const QString& identifier)
This signal is emitted when the client has been disconnected. The
\a identifier property is included for convenience.
diff --git a/src/jsonserverclient.h b/src/jsonserverclient.h
index 00b7394..5c97dfb 100644
--- a/src/jsonserverclient.h
+++ b/src/jsonserverclient.h
@@ -64,7 +64,7 @@ public:
void start();
void stop();
- void send(const QJsonObject &message);
+ bool send(const QJsonObject &message);
void setAuthority(JsonAuthority *authority);
@@ -83,6 +83,7 @@ signals:
private slots:
void received(const QJsonObject& message);
void handleDisconnect();
+ void processMessages();
private:
Q_DECLARE_PRIVATE(JsonServerClient)
diff --git a/src/jsonstream.cpp b/src/jsonstream.cpp
index 348eb2d..feacffc 100644
--- a/src/jsonstream.cpp
+++ b/src/jsonstream.cpp
@@ -74,11 +74,15 @@ class JsonStreamPrivate
public:
JsonStreamPrivate()
: mDevice(0)
- , mFormat(FormatUndefined) {}
+ , mFormat(FormatUndefined)
+ , mReadBufferSize(0)
+ , mWriteBufferSize(0) {}
QIODevice *mDevice;
JsonBuffer *mBuffer;
EncodingFormat mFormat;
+ qint64 mReadBufferSize;
+ qint64 mWriteBufferSize;
};
/****************************************************************************/
@@ -102,8 +106,7 @@ JsonStream::JsonStream(QIODevice *device)
{
Q_D(JsonStream);
d->mBuffer = new JsonBuffer(this);
- connect(d->mBuffer, SIGNAL(objectReceived(const QJsonObject&)),
- SLOT(objectReceived(const QJsonObject&)));
+ connect(d->mBuffer, SIGNAL(readyReadMessage()), SLOT(messageReceived()));
setDevice(device);
}
@@ -160,21 +163,30 @@ void JsonStream::setDevice( QIODevice *device )
Q_D(JsonStream);
if (d->mDevice) {
disconnect(d->mDevice, SIGNAL(readyRead()), this, SLOT(dataReadyOnSocket()));
+ disconnect(d->mDevice, SIGNAL(bytesWritten(qint64)), this, SIGNAL(bytesWritten(qint64)));
disconnect(d->mDevice, SIGNAL(aboutToClose()), this, SIGNAL(aboutToClose()));
}
d->mDevice = device;
if (device) {
connect(device, SIGNAL(readyRead()), this, SLOT(dataReadyOnSocket()));
+ connect(device, SIGNAL(bytesWritten(qint64)), this, SIGNAL(bytesWritten(qint64)));
connect(device, SIGNAL(aboutToClose()), this, SIGNAL(aboutToClose()));
}
}
/*!
- Send a JsonObject \a object over the stream
+ Send a JsonObject \a object over the stream.
+ Returns \b true if the entire message was sent or buffered or \b false otherwise.
+
+ JsonStream does not have a write buffer of its own. Rather, it uses the
+ write buffer of the \l{device()}. It will not cause that buffer to grow
+ larger than \l{writeBufferSize()} at any time. If this would occur, this
+ method will return \b false.
*/
-void JsonStream::send(const QJsonObject& object)
+bool JsonStream::send(const QJsonObject& object)
{
+ bool bRet(false);
QJsonDocument document(object);
Q_D(JsonStream);
@@ -183,40 +195,55 @@ void JsonStream::send(const QJsonObject& object)
d->mFormat = FormatQBJS;
// Deliberate fall through
case FormatQBJS:
- sendInternal( document.toBinaryData() );
+ bRet = sendInternal( document.toBinaryData() );
break;
case FormatUTF8:
- sendInternal( document.toJson() );
+ bRet = sendInternal( document.toJson() );
break;
case FormatUTF16BE:
- sendInternal( QTextCodec::codecForName("UTF-16BE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM
+ bRet = sendInternal( QTextCodec::codecForName("UTF-16BE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM
break;
case FormatUTF16LE:
- sendInternal( QTextCodec::codecForName("UTF-16LE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM
+ bRet = sendInternal( QTextCodec::codecForName("UTF-16LE")->fromUnicode(QString::fromUtf8(document.toJson())).mid(2) ); // Chop off BOM
break;
case FormatBSON:
{
BsonObject bson(document.toVariant().toMap());
- sendInternal(bsonToByteArray(bson));
+ bRet = sendInternal(bsonToByteArray(bson));
break;
}
}
+ return bRet;
}
/*!
\internal
Send raw QByteArray \a byteArray data over the socket.
*/
-
-void JsonStream::sendInternal(const QByteArray& byteArray)
+bool JsonStream::sendInternal(const QByteArray& byteArray)
{
Q_D(JsonStream);
- if (!d->mDevice) {
+ if (!isOpen()) {
qWarning() << Q_FUNC_INFO << "No device in JsonStream";
- return;
+ return false;
+ }
+
+ int nBytes = 0;
+ if (d->mWriteBufferSize <= 0 || d->mDevice->bytesToWrite() + byteArray.size() <= d->mWriteBufferSize) {
+ for (int nSz = byteArray.size(); nSz > 0; ) {
+ int nWrite = d->mDevice->write( byteArray.constData() + nBytes, nSz);
+ if (nWrite <= 0) {
+ // write error
+ qWarning() << Q_FUNC_INFO << __LINE__
+ << QString::fromLatin1("Write error. QIODevice::write() returned %1 (%2).")
+ .arg(nWrite).arg(d->mDevice->errorString());
+ break;
+ }
+ nBytes += nWrite;
+ nSz -= nWrite;
+ }
}
- int nBytes = d->mDevice->write( byteArray.data(), byteArray.size() );
if (QLocalSocket *socket = qobject_cast<QLocalSocket*>(d->mDevice))
socket->flush();
else if (QAbstractSocket *socket = qobject_cast<QAbstractSocket*>(d->mDevice))
@@ -224,22 +251,24 @@ void JsonStream::sendInternal(const QByteArray& byteArray)
else
qWarning() << Q_FUNC_INFO << "Unknown socket type:" << d->mDevice->metaObject()->className();
- if (nBytes != byteArray.size())
+ bool bFail;
+ if ((bFail = (nBytes != byteArray.size())))
qCritical() << Q_FUNC_INFO << __LINE__
<< QString::fromLatin1("Expected to write %1 bytes, actually %2.").arg(byteArray.size()).arg(nBytes);
+ return !bFail;
}
/*!
\internal
- Handle a received Qt Binary Json \a object and emit the correct signals
+ Handle a received readyReadMessage signal and emit the correct signals
*/
-void JsonStream::objectReceived(const QJsonObject& object)
+void JsonStream::messageReceived()
{
Q_D(JsonStream);
if (d->mFormat == FormatUndefined)
d->mFormat = d->mBuffer->format();
- emit messageReceived(object);
+ emit readyReadMessage();
}
/*!
@@ -250,6 +279,26 @@ void JsonStream::objectReceived(const QJsonObject& object)
void JsonStream::dataReadyOnSocket()
{
Q_D(JsonStream);
+ if (d->mReadBufferSize > 0) {
+ while (d->mDevice->bytesAvailable() + d->mBuffer->size() > d->mReadBufferSize) {
+ // can't fit all data into a read buffer - read a part that fits
+ d->mBuffer->append(d->mDevice->read(d->mReadBufferSize - d->mBuffer->size()));
+
+ // if the read buffer is full then emit readBufferOverflow and allow user to increase the buffer size
+ if (d->mBuffer->size() == d->mReadBufferSize) {
+ emit readBufferOverflow(d->mDevice->bytesAvailable() + d->mBuffer->size());
+ if (d->mBuffer->size() == d->mReadBufferSize) {
+ // still can't read anything - close connection
+ d->mDevice->close();
+ return;
+ }
+ else if (0 == d->mReadBufferSize) {
+ // user removed buffer size limitation
+ break;
+ }
+ }
+ }
+ }
d->mBuffer->append( d->mDevice->readAll());
}
@@ -273,23 +322,118 @@ void JsonStream::setFormat( EncodingFormat format )
d->mFormat = format;
}
+/*!
+ Returns a maximum size of the inbound message buffer.
+ */
+qint64 JsonStream::readBufferSize() const
+{
+ Q_D(const JsonStream);
+ return d->mReadBufferSize;
+}
/*!
- \relates JsonStream
+ Sets a maximum size of the inbound message buffer to \a sz thus capping a size
+ of an inbound message.
+ */
+void JsonStream::setReadBufferSize(qint64 sz)
+{
+ if (sz >= 0) {
+ Q_D(JsonStream);
+ d->mReadBufferSize = sz;
+ }
+}
- Sends the \a map via the \a stream.
-*/
-JsonStream& operator<<( JsonStream& stream, const QJsonObject& map )
+/*!
+ Returns a maximum size of the outbound message buffer. A value of 0
+ means the buffer size is unlimited.
+ */
+qint64 JsonStream::writeBufferSize() const
{
- stream.send(map);
- return stream;
+ Q_D(const JsonStream);
+ return d->mWriteBufferSize;
}
+/*!
+ Sets a maximum size of the outbound message buffer to \a sz thus capping a size
+ of an outbound message. A value of 0 means the buffer size is unlimited.
+ */
+void JsonStream::setWriteBufferSize(qint64 sz)
+{
+ if (sz >= 0) {
+ Q_D(JsonStream);
+ d->mWriteBufferSize = sz;
+ }
+}
/*!
- \fn void JsonStream::messageReceived(const QJsonObject& message)
- This signal is emitted when a new \a message has been received on the
- stream.
+ Returns a number of bytes currently in the write buffer. Effectively,
+ if \l{writeBufferSize()} is not unlimited, the largest message you can
+ send at any one time is (\l{writeBufferSize()} - \b bytesToWrite()) bytes.
+ */
+qint64 JsonStream::bytesToWrite() const
+{
+ Q_D(const JsonStream);
+ return (d->mDevice ? d->mDevice->bytesToWrite() : 0);
+}
+
+/*!
+ Returns a JSON object that has been received. If no message is
+ available, an empty JSON object is returned.
+ */
+QJsonObject JsonStream::readMessage()
+{
+ Q_D(JsonStream);
+ return d->mBuffer->readMessage();
+}
+
+/*!
+ Returns \b true if a message is available to be read via \l{readMessage()}
+ or \b false otherwise.
+ */
+bool JsonStream::messageAvailable()
+{
+ Q_D(JsonStream);
+ return d->mBuffer->messageAvailable();
+}
+
+/*! \fn JsonStream::bytesWritten(qint64 bytes)
+
+ This signal is emitted every time a payload of data has been
+ written to the device. The \a bytes argument is set to the number
+ of bytes that were written in this payload.
+
+ If a previous call to \l{send()} returned \b false, you should re-send
+ the message when this signal is emitted, as the write buffer may have been
+ emptied enough to hold the new message.
+*/
+
+/*!
+ \fn void JsonStream::readyReadMessage()
+
+ This signal is emitted once every time new data arrives on the \l{device()}
+ and a message is ready. \l{readMessage()} should be used to retrieve a message
+ and \l{messageAvailable()} to check for next available messages.
+ The client code may look like this:
+
+ \code
+ ...
+ connect(jsonstream, SIGNAL(readyReadMessage()), this, SLOT(processMessages()));
+ ...
+
+ void Foo::processMessages()
+ {
+ while (jsonstream->messageAvailable()) {
+ QJsonObject obj = jsonstream->readMessage();
+ <process message>
+ }
+ }
+ \endcode
+
+ \b readyReadMessage() is not emitted recursively; if you reenter the event loop
+ inside a slot connected to the \b readyReadMessage() signal, the signal will not
+ be reemitted.
+
+ \sa readMessage(), messageAvailable()
*/
/*!
@@ -297,6 +441,16 @@ JsonStream& operator<<( JsonStream& stream, const QJsonObject& map )
This signal is emitted when the underlying \c QIODevice is about to close.
*/
+/*! \fn JsonStream::readBufferOverflow(qint64 bytes)
+
+ This signal is emitted when the read buffer is full of data that has been read
+ from the \l{device()}, \a bytes additional bytes are available on the device,
+ but the message is not complete. The \l{readBufferSize()} mayb e increased
+ to a sufficient size in a slot connected to this signal, in which case more
+ data will be read into the read buffer. If the buffer size is not increased,
+ the connection is closed.
+ */
+
#include "moc_jsonstream.cpp"
QT_END_NAMESPACE_JSONSTREAM
diff --git a/src/jsonstream.h b/src/jsonstream.h
index 3acb204..693dad4 100644
--- a/src/jsonstream.h
+++ b/src/jsonstream.h
@@ -64,29 +64,40 @@ public:
QIODevice *device() const;
void setDevice( QIODevice *device );
- void send(const QJsonObject& message);
+ bool send(const QJsonObject& message);
EncodingFormat format() const;
void setFormat(EncodingFormat format);
+ qint64 readBufferSize() const;
+ void setReadBufferSize(qint64);
+
+ qint64 writeBufferSize() const;
+ void setWriteBufferSize(qint64 sz);
+
+ qint64 bytesToWrite() const;
+
+ bool messageAvailable();
+ QJsonObject readMessage();
+
signals:
- void messageReceived(const QJsonObject& message);
+ void bytesWritten(qint64);
+ void readyReadMessage();
void aboutToClose();
+ void readBufferOverflow(qint64);
protected slots:
void dataReadyOnSocket();
- void objectReceived(const QJsonObject& object);
+ void messageReceived();
protected:
- void sendInternal(const QByteArray& byteArray);
+ bool sendInternal(const QByteArray& byteArray);
private:
Q_DECLARE_PRIVATE(JsonStream)
QScopedPointer<JsonStreamPrivate> d_ptr;
};
-JsonStream& operator<<( JsonStream&, const QJsonObject& );
-
QT_END_NAMESPACE_JSONSTREAM
#endif // _JSON_STREAM_H
diff --git a/tests/auto/jsonstream/testClient/main.cpp b/tests/auto/jsonstream/testClient/main.cpp
index 79e9057..e23e301 100644
--- a/tests/auto/jsonstream/testClient/main.cpp
+++ b/tests/auto/jsonstream/testClient/main.cpp
@@ -127,6 +127,14 @@ void Container::sendSchemaTestMessage()
void Container::received(const QJsonObject& message)
{
+ // test large string size
+ if (message.contains("large") || message.contains("large_size")) {
+ if (message.value("large").toString().size() != message.value("large_size").toDouble()) {
+ qWarning() << "Large string size mismatch" << message.value("large").toString().size() << "!=" << message.value("large_size").toDouble();
+ exit(3);
+ }
+ }
+
QString command = message.value("command").toString();
if (!command.isEmpty()) {
qDebug() << "Received command" << command;
diff --git a/tests/auto/jsonstream/tst_jsonstream.cpp b/tests/auto/jsonstream/tst_jsonstream.cpp
index 14f3b08..8bc12d6 100644
--- a/tests/auto/jsonstream/tst_jsonstream.cpp
+++ b/tests/auto/jsonstream/tst_jsonstream.cpp
@@ -142,6 +142,7 @@ public:
void waitForFinished() {
if (process->state() == QProcess::Running)
QVERIFY(process->waitForFinished(5000));
+ QVERIFY(process->exitCode() == 0);
delete process;
process = 0;
}
@@ -173,7 +174,7 @@ class BasicServer : public QObject {
Q_OBJECT
public:
- BasicServer(const QString& socketname) : socket(0), stream(0) {
+ BasicServer(const QString& socketname, qint64 _sz = 0) : socket(0), stream(0), readBufferSize(_sz) {
QLocalServer::removeServer(socketname);
server = new QLocalServer(this);
connect(server, SIGNAL(newConnection()), SLOT(handleConnection()));
@@ -186,9 +187,8 @@ public:
server = NULL;
}
- void send(const QJsonObject& message) {
- QVERIFY(stream);
- stream->send(message);
+ bool send(const QJsonObject& message) {
+ return stream ? stream->send(message) : false;
}
void waitDisconnect(int timeout=5000) {
@@ -207,15 +207,26 @@ public:
return stream->format();
}
+ JsonStream *jsonStream() const { return stream; }
private slots:
void handleConnection() {
socket = server->nextPendingConnection();
QVERIFY(socket);
stream = new JsonStream(socket);
stream->setParent(socket);
+ if (readBufferSize > 0)
+ stream->setReadBufferSize(readBufferSize);
connect(socket, SIGNAL(disconnected()), SLOT(handleDisconnection()));
- connect(stream, SIGNAL(messageReceived(const QJsonObject&)),
- SIGNAL(messageReceived(const QJsonObject&)));
+ connect(stream, SIGNAL(readyReadMessage()), SLOT(processMessages()));
+ connect(stream, SIGNAL(readBufferOverflow(qint64)), SLOT(handleReadBufferOverflow(qint64)));
+ }
+
+ void processMessages() {
+ while (stream->messageAvailable()) {
+ QJsonObject obj = stream->readMessage();
+ if (!obj.isEmpty())
+ emit messageReceived(obj);
+ }
}
void handleDisconnection() {
@@ -225,13 +236,21 @@ private slots:
stream = NULL;
}
+ void handleReadBufferOverflow(qint64 sz) {
+ QVERIFY(readBufferSize > 0 && sz > readBufferSize);
+ stream->setReadBufferSize(sz);
+ emit readBufferOverflow(sz);
+ }
+
signals:
void messageReceived(const QJsonObject&);
+ void readBufferOverflow(qint64);
private:
QLocalServer *server;
QLocalSocket *socket;
JsonStream *stream;
+ qint64 readBufferSize;
};
/****************************/
@@ -253,6 +272,7 @@ private slots:
void pipeTest();
void pipeFormatTest();
void pipeWaitTest();
+ void bufferSizeTest();
};
void tst_JsonStream::initTestCase()
@@ -417,12 +437,67 @@ void tst_JsonStream::formatTest()
QVERIFY(object.value("item2").toString() == "This is item 2");
msg.insert("command", QLatin1String("exit"));
- server.send(msg);
+ QVERIFY(server.send(msg));
server.waitDisconnect();
child.waitForFinished();
}
}
+void tst_JsonStream::bufferSizeTest()
+{
+ QString socketname = "/tmp/tst_socket";
+
+ BasicServer server(socketname, 100);
+ QSignalSpy spy(&server, SIGNAL(messageReceived(const QJsonObject&)));
+ QSignalSpy spy1(&server, SIGNAL(readBufferOverflow(qint64)));
+ QTime stopWatch;
+
+ Child child("testClient/testClient",
+ QStringList() << "-socket" << socketname);
+
+ stopWatch.start();
+ forever {
+ QTestEventLoop::instance().enterLoop(1);
+ if (stopWatch.elapsed() >= 5000)
+ QFAIL("Timed out");
+ if (spy.count())
+ break;
+ }
+ QVERIFY(spy1.count() == 1); // overflow happend only once
+
+ QJsonObject msg = qvariant_cast<QJsonObject>(spy.last().at(0));
+ QVERIFY(msg.value("text").isString() && msg.value("text").toString() == QLatin1String("Standard text"));
+ QVERIFY(msg.value("int").isDouble() && msg.value("int").toDouble() == 100);
+ QVERIFY(msg.value("float").isDouble() && msg.value("float").toDouble() == 100.0);
+ QVERIFY(msg.value("true").isBool() && msg.value("true").toBool() == true);
+ QVERIFY(msg.value("false").isBool() && msg.value("false").toBool() == false);
+ QVERIFY(msg.value("array").isArray());
+ QJsonArray array = msg.value("array").toArray();
+ QVERIFY(array.size() == 3);
+ QVERIFY(array.at(0).toString() == "one");
+ QVERIFY(array.at(1).toString() == "two");
+ QVERIFY(array.at(2).toString() == "three");
+ QVERIFY(msg.value("object").isObject());
+ QJsonObject object = msg.value("object").toObject();
+ QVERIFY(object.value("item1").toString() == "This is item 1");
+ QVERIFY(object.value("item2").toString() == "This is item 2");
+
+ msg.insert("command", QLatin1String("exit"));
+
+ server.jsonStream()->setWriteBufferSize(100);
+ QVERIFY(!server.send(msg));
+
+ QString strLarge(500000, '*');
+ msg.insert("large", strLarge);
+ msg.insert("large_size", strLarge.size());
+ server.jsonStream()->setWriteBufferSize(0);
+ QVERIFY(server.send(msg));
+
+
+ server.waitDisconnect();
+ child.waitForFinished();
+}
+
void tst_JsonStream::schemaTest()
{
QString strSchemasDir(QDir::currentPath() + "/" + "schemas");