From d1a671b698516847798d4041d4358c485833f8c3 Mon Sep 17 00:00:00 2001 From: Joerg Bornemann Date: Sun, 11 Dec 2011 17:09:10 +0100 Subject: extract QWindowsPipeReader from qlocalsocket_win.cpp The code for reading named pipes can now be used in other places as well. Change-Id: Id734617a3927e369491a6c5daf965169ceb01f74 Reviewed-by: Oswald Buddenhagen --- src/network/socket/qlocalsocket.h | 2 +- src/network/socket/qlocalsocket_p.h | 18 +-- src/network/socket/qlocalsocket_win.cpp | 243 +++++--------------------------- 3 files changed, 36 insertions(+), 227 deletions(-) (limited to 'src/network/socket') diff --git a/src/network/socket/qlocalsocket.h b/src/network/socket/qlocalsocket.h index a30f37011a..74c54bf873 100644 --- a/src/network/socket/qlocalsocket.h +++ b/src/network/socket/qlocalsocket.h @@ -131,9 +131,9 @@ private: Q_PRIVATE_SLOT(d_func(), void _q_stateChanged(QAbstractSocket::SocketState)) Q_PRIVATE_SLOT(d_func(), void _q_error(QAbstractSocket::SocketError)) #elif defined(Q_OS_WIN) - Q_PRIVATE_SLOT(d_func(), void _q_notified()) Q_PRIVATE_SLOT(d_func(), void _q_canWrite()) Q_PRIVATE_SLOT(d_func(), void _q_pipeClosed()) + Q_PRIVATE_SLOT(d_func(), void _q_winError(ulong, const QString &)) #else Q_PRIVATE_SLOT(d_func(), void _q_stateChanged(QAbstractSocket::SocketState)) Q_PRIVATE_SLOT(d_func(), void _q_error(QAbstractSocket::SocketError)) diff --git a/src/network/socket/qlocalsocket_p.h b/src/network/socket/qlocalsocket_p.h index 32781789b0..b256f84325 100644 --- a/src/network/socket/qlocalsocket_p.h +++ b/src/network/socket/qlocalsocket_p.h @@ -63,8 +63,8 @@ #if defined(QT_LOCALSOCKET_TCP) # include "qtcpsocket.h" #elif defined(Q_OS_WIN) +# include "private/qwindowspipereader_p.h" # include "private/qwindowspipewriter_p.h" -# include "private/qringbuffer_p.h" # include #else # include "private/qabstractsocketengine_p.h" @@ -131,25 +131,13 @@ public: ~QLocalSocketPrivate(); void destroyPipeHandles(); void setErrorString(const QString &function); - void _q_notified(); void _q_canWrite(); void _q_pipeClosed(); - DWORD checkPipeState(); - void startAsyncRead(); - bool completeAsyncRead(); - void checkReadyRead(); + void _q_winError(ulong windowsError, const QString &function); HANDLE handle; - OVERLAPPED overlapped; QWindowsPipeWriter *pipeWriter; - qint64 readBufferMaxSize; - QRingBuffer readBuffer; - int actualReadBufferSize; - QWinEventNotifier *dataReadNotifier; + QWindowsPipeReader *pipeReader; QLocalSocket::LocalSocketError error; - bool readSequenceStarted; - QTimer *emitReadyReadTimer; - bool pipeClosed; - static const qint64 initialReadBufferSize = 4096; #else QLocalUnixSocket unixSocket; QString generateErrorString(QLocalSocket::LocalSocketError, const QString &function) const; diff --git a/src/network/socket/qlocalsocket_win.cpp b/src/network/socket/qlocalsocket_win.cpp index 9d7fb4ef42..1b0ee0d9a0 100644 --- a/src/network/socket/qlocalsocket_win.cpp +++ b/src/network/socket/qlocalsocket_win.cpp @@ -50,19 +50,21 @@ QT_BEGIN_NAMESPACE void QLocalSocketPrivate::init() { Q_Q(QLocalSocket); - emitReadyReadTimer = new QTimer(q); - emitReadyReadTimer->setSingleShot(true); - QObject::connect(emitReadyReadTimer, SIGNAL(timeout()), q, SIGNAL(readyRead())); - memset(&overlapped, 0, sizeof(overlapped)); - overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); - dataReadNotifier = new QWinEventNotifier(overlapped.hEvent, q); - q->connect(dataReadNotifier, SIGNAL(activated(HANDLE)), q, SLOT(_q_notified())); + pipeReader = new QWindowsPipeReader(q); + q->connect(pipeReader, SIGNAL(readyRead()), SIGNAL(readyRead())); + q->connect(pipeReader, SIGNAL(pipeClosed()), SLOT(_q_pipeClosed()), Qt::QueuedConnection); + q->connect(pipeReader, SIGNAL(winError(ulong, const QString &)), SLOT(_q_winError(ulong, const QString &))); } void QLocalSocketPrivate::setErrorString(const QString &function) +{ + DWORD windowsError = GetLastError(); + _q_winError(windowsError, function); +} + +void QLocalSocketPrivate::_q_winError(ulong windowsError, const QString &function) { Q_Q(QLocalSocket); - BOOL windowsError = GetLastError(); QLocalSocket::LocalSocketState currentState = state; // If the connectToServer fails due to WaitNamedPipe() time-out, assume ConnectionError @@ -106,13 +108,9 @@ void QLocalSocketPrivate::setErrorString(const QString &function) QLocalSocketPrivate::QLocalSocketPrivate() : QIODevicePrivate(), handle(INVALID_HANDLE_VALUE), + pipeReader(0), pipeWriter(0), - readBufferMaxSize(0), - actualReadBufferSize(0), error(QLocalSocket::UnknownSocketError), - readSequenceStarted(false), - emitReadyReadTimer(0), - pipeClosed(false), state(QLocalSocket::UnconnectedState) { } @@ -120,7 +118,6 @@ QLocalSocketPrivate::QLocalSocketPrivate() : QIODevicePrivate(), QLocalSocketPrivate::~QLocalSocketPrivate() { destroyPipeHandles(); - CloseHandle(overlapped.hEvent); } void QLocalSocketPrivate::destroyPipeHandles() @@ -200,129 +197,7 @@ qint64 QLocalSocket::readData(char *data, qint64 maxSize) { Q_D(QLocalSocket); - if (d->pipeClosed && d->actualReadBufferSize == 0) - return -1; // signal EOF - - qint64 readSoFar; - // If startAsyncRead() read data, copy it to its destination. - if (maxSize == 1 && d->actualReadBufferSize > 0) { - *data = d->readBuffer.getChar(); - d->actualReadBufferSize--; - readSoFar = 1; - } else { - qint64 bytesToRead = qMin(qint64(d->actualReadBufferSize), maxSize); - readSoFar = 0; - while (readSoFar < bytesToRead) { - const char *ptr = d->readBuffer.readPointer(); - int bytesToReadFromThisBlock = qMin(bytesToRead - readSoFar, - qint64(d->readBuffer.nextDataBlockSize())); - memcpy(data + readSoFar, ptr, bytesToReadFromThisBlock); - readSoFar += bytesToReadFromThisBlock; - d->readBuffer.free(bytesToReadFromThisBlock); - d->actualReadBufferSize -= bytesToReadFromThisBlock; - } - } - - if (!d->pipeClosed) { - if (!d->actualReadBufferSize) - d->emitReadyReadTimer->stop(); - if (!d->readSequenceStarted) - d->startAsyncRead(); - } - - return readSoFar; -} - -/*! - \internal - Reads data from the socket into the readbuffer - */ -void QLocalSocketPrivate::startAsyncRead() -{ - do { - DWORD bytesToRead = checkPipeState(); - if (pipeClosed) - return; - - if (bytesToRead == 0) { - // There are no bytes in the pipe but we need to - // start the overlapped read with some buffer size. - bytesToRead = initialReadBufferSize; - } - - if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { - bytesToRead = readBufferMaxSize - readBuffer.size(); - if (bytesToRead == 0) { - // Buffer is full. User must read data from the buffer - // before we can read more from the pipe. - return; - } - } - - char *ptr = readBuffer.reserve(bytesToRead); - - readSequenceStarted = true; - if (ReadFile(handle, ptr, bytesToRead, NULL, &overlapped)) { - completeAsyncRead(); - } else { - switch (GetLastError()) { - case ERROR_IO_PENDING: - // This is not an error. We're getting notified, when data arrives. - return; - case ERROR_MORE_DATA: - // This is not an error. The synchronous read succeeded. - // We're connected to a message mode pipe and the message - // didn't fit into the pipe's system buffer. - completeAsyncRead(); - break; - case ERROR_PIPE_NOT_CONNECTED: - { - // It may happen, that the other side closes the connection directly - // after writing data. Then we must set the appropriate socket state. - pipeClosed = true; - Q_Q(QLocalSocket); - QTimer::singleShot(0, q, SLOT(_q_pipeClosed())); - return; - } - default: - setErrorString(QLatin1String("QLocalSocketPrivate::startAsyncRead")); - return; - } - } - } while (!readSequenceStarted); -} - -/*! - \internal - Sets the correct size of the read buffer after a read operation. - Returns false, if an error occurred or the connection dropped. - */ -bool QLocalSocketPrivate::completeAsyncRead() -{ - ResetEvent(overlapped.hEvent); - readSequenceStarted = false; - - DWORD bytesRead; - if (!GetOverlappedResult(handle, &overlapped, &bytesRead, TRUE)) { - switch (GetLastError()) { - case ERROR_MORE_DATA: - // This is not an error. We're connected to a message mode - // pipe and the message didn't fit into the pipe's system - // buffer. We will read the remaining data in the next call. - break; - case ERROR_PIPE_NOT_CONNECTED: - return false; - default: - setErrorString(QLatin1String("QLocalSocketPrivate::completeAsyncRead")); - return false; - } - } - - actualReadBufferSize += bytesRead; - readBuffer.truncate(actualReadBufferSize); - if (!emitReadyReadTimer->isActive()) - emitReadyReadTimer->start(); - return true; + return d->pipeReader->read(data, maxSize); } qint64 QLocalSocket::writeData(const char *data, qint64 maxSize) @@ -347,26 +222,6 @@ void QLocalSocket::abort() close(); } -/*! - \internal - Returns the number of available bytes in the pipe. - Sets QLocalSocketPrivate::pipeClosed to true if the connection is broken. - */ -DWORD QLocalSocketPrivate::checkPipeState() -{ - Q_Q(QLocalSocket); - DWORD bytes; - if (PeekNamedPipe(handle, NULL, 0, NULL, &bytes, NULL)) { - return bytes; - } else { - if (!pipeClosed) { - pipeClosed = true; - QTimer::singleShot(0, q, SLOT(_q_pipeClosed())); - } - } - return 0; -} - void QLocalSocketPrivate::_q_pipeClosed() { Q_Q(QLocalSocket); @@ -384,10 +239,9 @@ void QLocalSocketPrivate::_q_pipeClosed() emit q->stateChanged(state); emit q->disconnected(); - readSequenceStarted = false; + pipeReader->stop(); destroyPipeHandles(); handle = INVALID_HANDLE_VALUE; - ResetEvent(overlapped.hEvent); if (pipeWriter) { delete pipeWriter; @@ -399,7 +253,7 @@ qint64 QLocalSocket::bytesAvailable() const { Q_D(const QLocalSocket); qint64 available = QIODevice::bytesAvailable(); - available += (qint64) d->actualReadBufferSize; + available += d->pipeReader->bytesAvailable(); return available; } @@ -412,8 +266,7 @@ qint64 QLocalSocket::bytesToWrite() const bool QLocalSocket::canReadLine() const { Q_D(const QLocalSocket); - return (QIODevice::canReadLine() - || d->readBuffer.indexOf('\n', d->actualReadBufferSize) != -1); + return QIODevice::canReadLine() || d->pipeReader->canReadLine(); } void QLocalSocket::close() @@ -475,15 +328,14 @@ bool QLocalSocket::setSocketDescriptor(quintptr socketDescriptor, LocalSocketState socketState, OpenMode openMode) { Q_D(QLocalSocket); - d->readBuffer.clear(); - d->actualReadBufferSize = 0; - QIODevice::open(openMode); - d->handle = (int*)socketDescriptor; + d->pipeReader->stop(); + d->handle = reinterpret_cast(socketDescriptor); d->state = socketState; - d->pipeClosed = false; + d->pipeReader->setHandle(d->handle); + QIODevice::open(openMode); emit stateChanged(d->state); if (d->state == ConnectedState && openMode.testFlag(QIODevice::ReadOnly)) - d->startAsyncRead(); + d->pipeReader->startAsyncRead(); return true; } @@ -494,19 +346,6 @@ void QLocalSocketPrivate::_q_canWrite() q->close(); } -void QLocalSocketPrivate::_q_notified() -{ - Q_Q(QLocalSocket); - if (!completeAsyncRead()) { - pipeClosed = true; - QTimer::singleShot(0, q, SLOT(_q_pipeClosed())); - return; - } - startAsyncRead(); - emitReadyReadTimer->stop(); - emit q->readyRead(); -} - quintptr QLocalSocket::socketDescriptor() const { Q_D(const QLocalSocket); @@ -516,13 +355,13 @@ quintptr QLocalSocket::socketDescriptor() const qint64 QLocalSocket::readBufferSize() const { Q_D(const QLocalSocket); - return d->readBufferMaxSize; + return d->pipeReader->maxReadBufferSize(); } void QLocalSocket::setReadBufferSize(qint64 size) { Q_D(QLocalSocket); - d->readBufferMaxSize = size; + d->pipeReader->setMaxReadBufferSize(size); } bool QLocalSocket::waitForConnected(int msecs) @@ -540,18 +379,10 @@ bool QLocalSocket::waitForDisconnected(int msecs) qWarning("QLocalSocket::waitForDisconnected isn't supported for write only pipes."); return false; } - QIncrementalSleepTimer timer(msecs); - forever { - d->checkPipeState(); - if (d->pipeClosed) - d->_q_pipeClosed(); - if (state() == UnconnectedState) - return true; - Sleep(timer.nextSleepTime()); - if (timer.hasTimedOut()) - break; + if (d->pipeReader->waitForPipeClosed(msecs)) { + d->_q_pipeClosed(); + return true; } - return false; } @@ -572,28 +403,18 @@ bool QLocalSocket::waitForReadyRead(int msecs) return false; // We already know that the pipe is gone, but did not enter the event loop yet. - if (d->pipeClosed) { + if (d->pipeReader->isPipeClosed()) { d->_q_pipeClosed(); return false; } - Q_ASSERT(d->readSequenceStarted); - DWORD result = WaitForSingleObject(d->overlapped.hEvent, msecs == -1 ? INFINITE : msecs); - switch (result) { - case WAIT_OBJECT_0: - d->_q_notified(); - // We just noticed that the pipe is gone. - if (d->pipeClosed) { - d->_q_pipeClosed(); - return false; - } - return true; - case WAIT_TIMEOUT: - return false; - } + bool result = d->pipeReader->waitForReadyRead(msecs); - qWarning("QLocalSocket::waitForReadyRead WaitForSingleObject failed with error code %d.", int(GetLastError())); - return false; + // We just noticed that the pipe is gone. + if (d->pipeReader->isPipeClosed()) + d->_q_pipeClosed(); + + return result; } bool QLocalSocket::waitForBytesWritten(int msecs) -- cgit v1.2.3