From acbd79996d7dedd65cda98bd1b0f15690f0271e4 Mon Sep 17 00:00:00 2001 From: Louai Al-Khanji Date: Fri, 16 Oct 2015 16:19:53 +0300 Subject: QEventDispatcherUNIX: Use poll instead of select This allows us to support file descriptors >= FD_SETSIZE. Change-Id: I7e4a35333446a587cfd13c077fa5e19fa3d1abc4 Reviewed-by: Thiago Macieira --- src/corelib/kernel/qeventdispatcher_unix.cpp | 440 ++++++++++----------------- src/corelib/kernel/qeventdispatcher_unix_p.h | 90 +++--- 2 files changed, 206 insertions(+), 324 deletions(-) (limited to 'src/corelib') diff --git a/src/corelib/kernel/qeventdispatcher_unix.cpp b/src/corelib/kernel/qeventdispatcher_unix.cpp index 87478e0e01..7fd18dc9c1 100644 --- a/src/corelib/kernel/qeventdispatcher_unix.cpp +++ b/src/corelib/kernel/qeventdispatcher_unix.cpp @@ -66,7 +66,7 @@ # define _POSIX_MONOTONIC_CLOCK 1 # endif # include -# include +# include #endif #if (_POSIX_MONOTONIC_CLOCK-0 <= 0) || defined(QT_BOOTSTRAPPED) @@ -75,6 +75,26 @@ QT_BEGIN_NAMESPACE +static const char *socketType(QSocketNotifier::Type type) +{ + switch (type) { + case QSocketNotifier::Read: + return "Read"; + case QSocketNotifier::Write: + return "Write"; + case QSocketNotifier::Exception: + return "Exception"; + } + + Q_UNREACHABLE(); +} + +static pollfd make_pollfd(int fd, short events) +{ + pollfd pfd = { fd, events, 0 }; + return pfd; +} + #if defined(Q_OS_VXWORKS) static void initThreadPipeFD(int fd) { @@ -133,8 +153,6 @@ QEventDispatcherUNIXPrivate::QEventDispatcherUNIXPrivate() if (Q_UNLIKELY(pipefail)) qFatal("QEventDispatcherUNIXPrivate(): Can not continue without a thread pipe"); - - sn_highest = -1; } QEventDispatcherUNIXPrivate::~QEventDispatcherUNIXPrivate() @@ -159,116 +177,11 @@ QEventDispatcherUNIXPrivate::~QEventDispatcherUNIXPrivate() qDeleteAll(timerList); } -int QEventDispatcherUNIXPrivate::doSelect(QEventLoop::ProcessEventsFlags flags, timespec *timeout) -{ - Q_Q(QEventDispatcherUNIX); - - // needed in QEventDispatcherUNIX::select() - timerList.updateCurrentTime(); - - int nsel; - do { - // Process timers and socket notifiers - the common UNIX stuff - int highest = 0; - if (! (flags & QEventLoop::ExcludeSocketNotifiers) && (sn_highest >= 0)) { - // return the highest fd we can wait for input on - sn_vec[0].select_fds = sn_vec[0].enabled_fds; - sn_vec[1].select_fds = sn_vec[1].enabled_fds; - sn_vec[2].select_fds = sn_vec[2].enabled_fds; - highest = sn_highest; - } else { - FD_ZERO(&sn_vec[0].select_fds); - FD_ZERO(&sn_vec[1].select_fds); - FD_ZERO(&sn_vec[2].select_fds); - } - - int wakeUpFd = initThreadWakeUp(); - highest = qMax(highest, wakeUpFd); - - nsel = q->select(highest + 1, - &sn_vec[0].select_fds, - &sn_vec[1].select_fds, - &sn_vec[2].select_fds, - timeout); - } while (nsel == -1 && (errno == EINTR || errno == EAGAIN)); - - if (nsel == -1) { - if (errno == EBADF) { - // it seems a socket notifier has a bad fd... find out - // which one it is and disable it - fd_set fdset; - timeval tm; - tm.tv_sec = tm.tv_usec = 0l; - - for (int type = 0; type < 3; ++type) { - QSockNotType::List &list = sn_vec[type].list; - if (list.size() == 0) - continue; - - for (int i = 0; i < list.size(); ++i) { - QSockNot *sn = list[i]; - - FD_ZERO(&fdset); - FD_SET(sn->fd, &fdset); - - int ret = -1; - do { - switch (type) { - case 0: // read - ret = select(sn->fd + 1, &fdset, 0, 0, &tm); - break; - case 1: // write - ret = select(sn->fd + 1, 0, &fdset, 0, &tm); - break; - case 2: // except - ret = select(sn->fd + 1, 0, 0, &fdset, &tm); - break; - } - } while (ret == -1 && (errno == EINTR || errno == EAGAIN)); - - if (ret == -1 && errno == EBADF) { - // disable the invalid socket notifier - static const char *t[] = { "Read", "Write", "Exception" }; - qWarning("QSocketNotifier: Invalid socket %d and type '%s', disabling...", - sn->fd, t[type]); - sn->obj->setEnabled(false); - } - } - } - } else { - // EINVAL... shouldn't happen, so let's complain to stderr - // and hope someone sends us a bug report - perror("select"); - } - } - - int nevents = processThreadWakeUp(nsel); - - // activate socket notifiers - if (! (flags & QEventLoop::ExcludeSocketNotifiers) && nsel > 0 && sn_highest >= 0) { - // if select says data is ready on any socket, then set the socket notifier - // to pending - for (int i=0; i<3; i++) { - QSockNotType::List &list = sn_vec[i].list; - for (int j = 0; j < list.size(); ++j) { - QSockNot *sn = list[j]; - if (FD_ISSET(sn->fd, &sn_vec[i].select_fds)) - q->setSocketNotifierPending(sn->obj); - } - } - } - return (nevents + q->activateSocketNotifiers()); -} - -int QEventDispatcherUNIXPrivate::initThreadWakeUp() +int QEventDispatcherUNIXPrivate::processThreadWakeUp(const pollfd &pfd) { - FD_SET(thread_pipe[0], &sn_vec[0].select_fds); - return thread_pipe[0]; -} + Q_ASSERT(pfd.fd == thread_pipe[0]); -int QEventDispatcherUNIXPrivate::processThreadWakeUp(int nsel) -{ - if (nsel > 0 && FD_ISSET(thread_pipe[0], &sn_vec[0].select_fds)) { + if (pfd.revents & POLLIN) { // some other thread woke us up... consume the data on the thread pipe so that // select doesn't immediately return next time #if defined(Q_OS_VXWORKS) @@ -298,6 +211,80 @@ int QEventDispatcherUNIXPrivate::processThreadWakeUp(int nsel) return 0; } +void QEventDispatcherUNIXPrivate::setSocketNotifierPending(QSocketNotifier *notifier) +{ + Q_ASSERT(notifier); + + if (pendingNotifiers.contains(notifier)) + return; + + pendingNotifiers << notifier; +} + +int QEventDispatcherUNIXPrivate::activateTimers() +{ + return timerList.activateTimers(); +} + +void QEventDispatcherUNIXPrivate::markPendingSocketNotifiers() +{ + for (const pollfd &pfd : qAsConst(pollfds)) { + if (pfd.fd < 0 || pfd.revents == 0) + continue; + + auto it = socketNotifiers.find(pfd.fd); + Q_ASSERT(it != socketNotifiers.end()); + + const QSocketNotifierSetUNIX &sn_set = it.value(); + + static const struct { + QSocketNotifier::Type type; + short flags; + } notifiers[] = { + { QSocketNotifier::Read, POLLIN | POLLHUP | POLLERR }, + { QSocketNotifier::Write, POLLOUT | POLLHUP | POLLERR }, + { QSocketNotifier::Exception, POLLPRI | POLLHUP | POLLERR } + }; + + for (const auto &n : notifiers) { + QSocketNotifier *notifier = sn_set.notifiers[n.type]; + + if (!notifier) + continue; + + if (pfd.revents & POLLNVAL) { + qWarning("QSocketNotifier: Invalid socket %d with type %s, disabling...", + it.key(), socketType(n.type)); + notifier->setEnabled(false); + } + + if (pfd.revents & n.flags) + setSocketNotifierPending(notifier); + } + } + + pollfds.resize(0); +} + +int QEventDispatcherUNIXPrivate::activateSocketNotifiers() +{ + markPendingSocketNotifiers(); + + if (pendingNotifiers.isEmpty()) + return 0; + + int n_activated = 0; + QEvent event(QEvent::SockAct); + + while (!pendingNotifiers.isEmpty()) { + QSocketNotifier *notifier = pendingNotifiers.takeFirst(); + QCoreApplication::sendEvent(notifier, &event); + ++n_activated; + } + + return n_activated; +} + QEventDispatcherUNIX::QEventDispatcherUNIX(QObject *parent) : QAbstractEventDispatcher(*new QEventDispatcherUNIXPrivate, parent) { } @@ -307,14 +294,7 @@ QEventDispatcherUNIX::QEventDispatcherUNIX(QEventDispatcherUNIXPrivate &dd, QObj { } QEventDispatcherUNIX::~QEventDispatcherUNIX() -{ -} - -int QEventDispatcherUNIX::select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, - timespec *timeout) -{ - return qt_safe_select(nfds, readfds, writefds, exceptfds, timeout); -} +{ } /*! \internal @@ -385,22 +365,6 @@ QEventDispatcherUNIX::registeredTimers(QObject *object) const return d->timerList.registeredTimers(object); } -/***************************************************************************** - Socket notifier type - *****************************************************************************/ -QSockNotType::QSockNotType() -{ - FD_ZERO(&select_fds); - FD_ZERO(&enabled_fds); - FD_ZERO(&pending_fds); -} - -QSockNotType::~QSockNotType() -{ - for (int i = 0; i < list.size(); ++i) - delete list[i]; -} - /***************************************************************************** QEventDispatcher implementations for UNIX *****************************************************************************/ @@ -409,160 +373,59 @@ void QEventDispatcherUNIX::registerSocketNotifier(QSocketNotifier *notifier) { Q_ASSERT(notifier); int sockfd = notifier->socket(); - int type = notifier->type(); + QSocketNotifier::Type type = notifier->type(); #ifndef QT_NO_DEBUG - if (sockfd < 0 - || unsigned(sockfd) >= FD_SETSIZE) { - qWarning("QSocketNotifier: Internal error"); - return; - } else if (notifier->thread() != thread() - || thread() != QThread::currentThread()) { + if (notifier->thread() != thread() || thread() != QThread::currentThread()) { qWarning("QSocketNotifier: socket notifiers cannot be enabled from another thread"); return; } #endif Q_D(QEventDispatcherUNIX); - QSockNotType::List &list = d->sn_vec[type].list; - fd_set *fds = &d->sn_vec[type].enabled_fds; - QSockNot *sn; - - sn = new QSockNot; - sn->obj = notifier; - sn->fd = sockfd; - sn->queue = &d->sn_vec[type].pending_fds; - - int i; - for (i = 0; i < list.size(); ++i) { - QSockNot *p = list[i]; - if (p->fd < sockfd) - break; - if (p->fd == sockfd) { - static const char *t[] = { "Read", "Write", "Exception" }; - qWarning("QSocketNotifier: Multiple socket notifiers for " - "same socket %d and type %s", sockfd, t[type]); - } - } - list.insert(i, sn); + QSocketNotifierSetUNIX &sn_set = d->socketNotifiers[sockfd]; - FD_SET(sockfd, fds); - d->sn_highest = qMax(d->sn_highest, sockfd); + if (sn_set.notifiers[type] && sn_set.notifiers[type] != notifier) + qWarning("%s: Multiple socket notifiers for same socket %d and type %s", + Q_FUNC_INFO, sockfd, socketType(type)); + + sn_set.notifiers[type] = notifier; } void QEventDispatcherUNIX::unregisterSocketNotifier(QSocketNotifier *notifier) { Q_ASSERT(notifier); int sockfd = notifier->socket(); - int type = notifier->type(); + QSocketNotifier::Type type = notifier->type(); #ifndef QT_NO_DEBUG - if (sockfd < 0 - || unsigned(sockfd) >= FD_SETSIZE) { - qWarning("QSocketNotifier: Internal error"); - return; - } else if (notifier->thread() != thread() - || thread() != QThread::currentThread()) { + if (notifier->thread() != thread() || thread() != QThread::currentThread()) { qWarning("QSocketNotifier: socket notifiers cannot be disabled from another thread"); return; } #endif Q_D(QEventDispatcherUNIX); - QSockNotType::List &list = d->sn_vec[type].list; - fd_set *fds = &d->sn_vec[type].enabled_fds; - QSockNot *sn = 0; - int i; - for (i = 0; i < list.size(); ++i) { - sn = list[i]; - if(sn->obj == notifier && sn->fd == sockfd) - break; - } - if (i == list.size()) // not found - return; - FD_CLR(sockfd, fds); // clear fd bit - FD_CLR(sockfd, sn->queue); - d->sn_pending_list.removeAll(sn); // remove from activation list - list.removeAt(i); // remove notifier found above - delete sn; - - if (d->sn_highest == sockfd) { // find highest fd - d->sn_highest = -1; - for (int i=0; i<3; i++) { - if (!d->sn_vec[i].list.isEmpty()) - d->sn_highest = qMax(d->sn_highest, // list is fd-sorted - d->sn_vec[i].list[0]->fd); - } - } -} + d->pendingNotifiers.removeOne(notifier); -void QEventDispatcherUNIX::setSocketNotifierPending(QSocketNotifier *notifier) -{ - Q_ASSERT(notifier); - int sockfd = notifier->socket(); - int type = notifier->type(); -#ifndef QT_NO_DEBUG - if (sockfd < 0 - || unsigned(sockfd) >= FD_SETSIZE) { - qWarning("QSocketNotifier: Internal error"); + auto i = d->socketNotifiers.find(sockfd); + if (i == d->socketNotifiers.end()) return; - } - Q_ASSERT(notifier->thread() == thread() && thread() == QThread::currentThread()); -#endif - Q_D(QEventDispatcherUNIX); - QSockNotType::List &list = d->sn_vec[type].list; - QSockNot *sn = 0; - int i; - for (i = 0; i < list.size(); ++i) { - sn = list[i]; - if(sn->obj == notifier && sn->fd == sockfd) - break; - } - if (i == list.size()) // not found + QSocketNotifierSetUNIX &sn_set = i.value(); + + if (sn_set.notifiers[type] == nullptr) return; - // We choose a random activation order to be more fair under high load. - // If a constant order is used and a peer early in the list can - // saturate the IO, it might grab our attention completely. - // Also, if we're using a straight list, the callback routines may - // delete other entries from the list before those other entries are - // processed. - if (! FD_ISSET(sn->fd, sn->queue)) { - if (d->sn_pending_list.isEmpty()) { - d->sn_pending_list.append(sn); - } else { - d->sn_pending_list.insert((qrand() & 0xff) % - (d->sn_pending_list.size()+1), sn); - } - FD_SET(sn->fd, sn->queue); + if (sn_set.notifiers[type] != notifier) { + qWarning("%s: Multiple socket notifiers for same socket %d and type %s", + Q_FUNC_INFO, sockfd, socketType(type)); + return; } -} - -int QEventDispatcherUNIX::activateTimers() -{ - Q_ASSERT(thread() == QThread::currentThread()); - Q_D(QEventDispatcherUNIX); - return d->timerList.activateTimers(); -} -int QEventDispatcherUNIX::activateSocketNotifiers() -{ - Q_D(QEventDispatcherUNIX); - if (d->sn_pending_list.isEmpty()) - return 0; + sn_set.notifiers[type] = nullptr; - // activate entries - int n_act = 0; - QEvent event(QEvent::SockAct); - while (!d->sn_pending_list.isEmpty()) { - QSockNot *sn = d->sn_pending_list.takeFirst(); - if (FD_ISSET(sn->fd, sn->queue)) { - FD_CLR(sn->fd, sn->queue); - QCoreApplication::sendEvent(sn->obj, &event); - ++n_act; - } - } - return n_act; + if (sn_set.isEmpty()) + d->socketNotifiers.erase(i); } bool QEventDispatcherUNIX::processEvents(QEventLoop::ProcessEventsFlags flags) @@ -574,39 +437,54 @@ bool QEventDispatcherUNIX::processEvents(QEventLoop::ProcessEventsFlags flags) emit awake(); QCoreApplicationPrivate::sendPostedEvents(0, 0, d->threadData); - int nevents = 0; + const bool include_timers = (flags & QEventLoop::X11ExcludeTimers) == 0; + const bool include_notifiers = (flags & QEventLoop::ExcludeSocketNotifiers) == 0; + const bool wait_for_events = flags & QEventLoop::WaitForMoreEvents; + const bool canWait = (d->threadData->canWaitLocked() && !d->interrupt.load() - && (flags & QEventLoop::WaitForMoreEvents)); + && wait_for_events); if (canWait) emit aboutToBlock(); - if (!d->interrupt.load()) { - // return the maximum time we can wait for an event. - timespec *tm = 0; - timespec wait_tm = { 0l, 0l }; - if (!(flags & QEventLoop::X11ExcludeTimers)) { - if (d->timerList.timerWait(wait_tm)) - tm = &wait_tm; - } + if (d->interrupt.load()) + return false; - if (!canWait) { - if (!tm) - tm = &wait_tm; + timespec *tm = nullptr; + timespec wait_tm = { 0, 0 }; - // no time to wait - tm->tv_sec = 0l; - tm->tv_nsec = 0l; - } + if (!canWait || (include_timers && d->timerList.timerWait(wait_tm))) + tm = &wait_tm; - nevents = d->doSelect(flags, tm); + d->pollfds.reserve(1 + (include_notifiers ? d->socketNotifiers.size() : 0)); + d->pollfds.resize(0); - // activate timers - if (! (flags & QEventLoop::X11ExcludeTimers)) { - nevents += activateTimers(); - } + if (include_notifiers) + for (auto it = d->socketNotifiers.cbegin(); it != d->socketNotifiers.cend(); ++it) + d->pollfds.append(make_pollfd(it.key(), it.value().events())); + + // This must be last, as it's popped off the end below + d->pollfds.append(make_pollfd(d->thread_pipe[0], POLLIN)); + + int nevents = 0; + + switch (qt_safe_poll(d->pollfds.data(), d->pollfds.size(), tm)) { + case -1: + perror("qt_safe_poll"); + break; + case 0: + break; + default: + nevents += d->processThreadWakeUp(d->pollfds.takeLast()); + if (include_notifiers) + nevents += d->activateSocketNotifiers(); + break; } + + if (include_timers) + nevents += d->activateTimers(); + // return true if we handled events, false otherwise return (nevents > 0); } diff --git a/src/corelib/kernel/qeventdispatcher_unix_p.h b/src/corelib/kernel/qeventdispatcher_unix_p.h index 0ed57d12f8..b401c07040 100644 --- a/src/corelib/kernel/qeventdispatcher_unix_p.h +++ b/src/corelib/kernel/qeventdispatcher_unix_p.h @@ -59,38 +59,22 @@ #include "QtCore/qvarlengtharray.h" #include "private/qtimerinfo_unix_p.h" -#if !defined(Q_OS_VXWORKS) -# include -# if (!defined(Q_OS_HPUX) || defined(__ia64)) && !defined(Q_OS_NACL) -# include -# endif -#endif - QT_BEGIN_NAMESPACE -struct QSockNot -{ - QSocketNotifier *obj; - int fd; - fd_set *queue; -}; +class QEventDispatcherUNIXPrivate; -class QSockNotType +struct Q_CORE_EXPORT QSocketNotifierSetUNIX Q_DECL_FINAL { -public: - QSockNotType(); - ~QSockNotType(); - - typedef QPodList List; + inline QSocketNotifierSetUNIX() Q_DECL_NOTHROW; - List list; - fd_set select_fds; - fd_set enabled_fds; - fd_set pending_fds; + inline bool isEmpty() const Q_DECL_NOTHROW; + inline short events() const Q_DECL_NOTHROW; + QSocketNotifier *notifiers[3]; }; -class QEventDispatcherUNIXPrivate; +Q_DECLARE_TYPEINFO(QSocketNotifierSetUNIX, Q_PRIMITIVE_TYPE); + class Q_CORE_EXPORT QEventDispatcherUNIX : public QAbstractEventDispatcher { @@ -120,15 +104,6 @@ public: protected: QEventDispatcherUNIX(QEventDispatcherUNIXPrivate &dd, QObject *parent = 0); - - void setSocketNotifierPending(QSocketNotifier *notifier); - - int activateTimers(); - int activateSocketNotifiers(); - - virtual int select(int nfds, - fd_set *readfds, fd_set *writefds, fd_set *exceptfds, - timespec *timeout); }; class Q_CORE_EXPORT QEventDispatcherUNIXPrivate : public QAbstractEventDispatcherPrivate @@ -139,28 +114,57 @@ public: QEventDispatcherUNIXPrivate(); ~QEventDispatcherUNIXPrivate(); - int doSelect(QEventLoop::ProcessEventsFlags flags, timespec *timeout); - virtual int initThreadWakeUp() Q_DECL_FINAL; - virtual int processThreadWakeUp(int nsel) Q_DECL_FINAL; + int processThreadWakeUp(const pollfd &pfd); + + int activateTimers(); + + void markPendingSocketNotifiers(); + int activateSocketNotifiers(); + void setSocketNotifierPending(QSocketNotifier *notifier); // note for eventfd(7) support: // if thread_pipe[1] is -1, then eventfd(7) is in use and is stored in thread_pipe[0] int thread_pipe[2]; - // highest fd for all socket notifiers - int sn_highest; - // 3 socket notifier types - read, write and exception - QSockNotType sn_vec[3]; + QVector pollfds; - QTimerInfoList timerList; + QHash socketNotifiers; + QVector pendingNotifiers; - // pending socket notifiers list - QSockNotType::List sn_pending_list; + QTimerInfoList timerList; QAtomicInt wakeUps; QAtomicInt interrupt; // bool }; +inline QSocketNotifierSetUNIX::QSocketNotifierSetUNIX() Q_DECL_NOTHROW +{ + notifiers[0] = 0; + notifiers[1] = 0; + notifiers[2] = 0; +} + +inline bool QSocketNotifierSetUNIX::isEmpty() const Q_DECL_NOTHROW +{ + return !notifiers[0] && !notifiers[1] && !notifiers[2]; +} + +inline short QSocketNotifierSetUNIX::events() const Q_DECL_NOTHROW +{ + short result = 0; + + if (notifiers[0]) + result |= POLLIN; + + if (notifiers[1]) + result |= POLLOUT; + + if (notifiers[2]) + result |= POLLPRI; + + return result; +} + QT_END_NAMESPACE #endif // QEVENTDISPATCHER_UNIX_P_H -- cgit v1.2.3