diff options
-rw-r--r-- | CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/jomlib/iocompletionport.cpp | 77 | ||||
-rw-r--r-- | src/jomlib/iocompletionport.h | 37 | ||||
-rw-r--r-- | src/jomlib/jomlib.pro | 6 | ||||
-rw-r--r-- | src/jomlib/process.cpp | 293 | ||||
-rw-r--r-- | src/jomlib/process.h | 12 | ||||
-rw-r--r-- | src/jomlib/targetexecutor.cpp | 17 |
7 files changed, 332 insertions, 111 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index b132c79..f0b9c42 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -51,6 +51,7 @@ set(JOM_SRCS src/jomlib/fileinfo.cpp src/jomlib/filetime.cpp src/jomlib/helperfunctions.cpp + src/jomlib/iocompletionport.cpp src/jomlib/macrotable.cpp src/jomlib/makefile.cpp src/jomlib/makefilefactory.cpp diff --git a/src/jomlib/iocompletionport.cpp b/src/jomlib/iocompletionport.cpp new file mode 100644 index 0000000..2bf21c0 --- /dev/null +++ b/src/jomlib/iocompletionport.cpp @@ -0,0 +1,77 @@ +#include "iocompletionport.h" + +namespace NMakeFile { + +IoCompletionPort::IoCompletionPort() + : hPort(INVALID_HANDLE_VALUE) +{ + setObjectName(QLatin1String("I/O completion port thread")); + HANDLE hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0); + if (!hIOCP) { + qWarning("CreateIoCompletionPort failed with error code %d.\n", GetLastError()); + return; + } + hPort = hIOCP; +} + +IoCompletionPort::~IoCompletionPort() +{ + PostQueuedCompletionStatus(hPort, 0, NULL, NULL); + QThread::wait(); + CloseHandle(hPort); +} + +void IoCompletionPort::registerObserver(IoCompletionPortObserver *observer, HANDLE hFile) +{ + HANDLE hIOCP = CreateIoCompletionPort(hFile, hPort, reinterpret_cast<ULONG_PTR>(observer), 0); + if (!hIOCP) { + qWarning("Can't associate file handle with I/O completion port. Error code %d.\n", GetLastError()); + return; + } + mutex.lock(); + observers.insert(observer); + mutex.unlock(); + if (!QThread::isRunning()) + QThread::start(); +} + +void IoCompletionPort::unregisterObserver(IoCompletionPortObserver *observer) +{ + mutex.lock(); + observers.remove(observer); + mutex.unlock(); +} + +void IoCompletionPort::run() +{ + DWORD dwBytesRead; + ULONG_PTR pulCompletionKey; + OVERLAPPED *overlapped; + + for (;;) { + BOOL success = GetQueuedCompletionStatus(hPort, + &dwBytesRead, + &pulCompletionKey, + &overlapped, + INFINITE); + + DWORD errorCode = success ? ERROR_SUCCESS : GetLastError(); + if (!success && !overlapped) { + printf("GetQueuedCompletionStatus failed with error code %d.\n", errorCode); + return; + } + + if (success && !(dwBytesRead || pulCompletionKey || overlapped)) { + // We've posted null values via PostQueuedCompletionStatus to end this thread. + return; + } + + IoCompletionPortObserver *observer = reinterpret_cast<IoCompletionPortObserver *>(pulCompletionKey); + mutex.lock(); + if (observers.contains(observer)) + observer->completionPortNotified(dwBytesRead, errorCode); + mutex.unlock(); + } +} + +} // namespace NMakeFile diff --git a/src/jomlib/iocompletionport.h b/src/jomlib/iocompletionport.h new file mode 100644 index 0000000..66dff6e --- /dev/null +++ b/src/jomlib/iocompletionport.h @@ -0,0 +1,37 @@ +#ifndef IOCOMPLETIONPORT_H +#define IOCOMPLETIONPORT_H + +#include <qset.h> +#include <qmutex.h> +#include <qthread.h> +#include <qt_windows.h> + +namespace NMakeFile { + +class IoCompletionPortObserver +{ +public: + virtual void completionPortNotified(DWORD numberOfBytes, DWORD errorCode) = 0; +}; + +class IoCompletionPort : protected QThread +{ +public: + IoCompletionPort(); + ~IoCompletionPort(); + + void registerObserver(IoCompletionPortObserver *notifier, HANDLE hFile); + void unregisterObserver(IoCompletionPortObserver *notifier); + +protected: + void run(); + +private: + HANDLE hPort; + QSet<IoCompletionPortObserver *> observers; + QMutex mutex; +}; + +} // namespace NMakeFile + +#endif // IOCOMPLETIONPORT_H diff --git a/src/jomlib/jomlib.pro b/src/jomlib/jomlib.pro index 01330a1..afcb136 100644 --- a/src/jomlib/jomlib.pro +++ b/src/jomlib/jomlib.pro @@ -54,7 +54,8 @@ HEADERS += \ ppexprparser.h \ targetexecutor.h \ commandexecutor.h \ - process.h + process.h \ + iocompletionport.h SOURCES += \ fileinfo.cpp \ @@ -73,4 +74,5 @@ SOURCES += \ ppexprparser.cpp \ targetexecutor.cpp \ commandexecutor.cpp \ - process.cpp + process.cpp \ + iocompletionport.cpp diff --git a/src/jomlib/process.cpp b/src/jomlib/process.cpp index c6b797e..3084286 100644 --- a/src/jomlib/process.cpp +++ b/src/jomlib/process.cpp @@ -22,20 +22,28 @@ ****************************************************************************/ #include "process.h" +#include "iocompletionport.h" + #include <QByteArray> -#include <qt_windows.h> -#include <QMap> -#include <QMetaType> #include <QDir> +#include <QMap> +#include <QMutex> +#include <QTimer> + +#include <qt_windows.h> namespace NMakeFile { +Q_GLOBAL_STATIC(IoCompletionPort, iocp) + struct Pipe { Pipe() : hWrite(INVALID_HANDLE_VALUE) , hRead(INVALID_HANDLE_VALUE) - {} + { + ZeroMemory(&overlapped, sizeof(overlapped)); + } ~Pipe() { @@ -47,6 +55,7 @@ struct Pipe HANDLE hWrite; HANDLE hRead; + OVERLAPPED overlapped; }; static void safelyCloseHandle(HANDLE &h) @@ -57,26 +66,36 @@ static void safelyCloseHandle(HANDLE &h) } } -struct ProcessPrivate +class ProcessPrivate : public IoCompletionPortObserver { - ProcessPrivate() - : hWatcherThread(INVALID_HANDLE_VALUE), +public: + ProcessPrivate(Process *process) + : q(process), hProcess(INVALID_HANDLE_VALUE), - hProcessThread(INVALID_HANDLE_VALUE) - {} + hProcessThread(INVALID_HANDLE_VALUE), + exitCode(STILL_ACTIVE) + { + } + + bool startRead(); + void completionPortNotified(DWORD numberOfBytes, DWORD errorCode); - HANDLE hWatcherThread; + Process *q; HANDLE hProcess; HANDLE hProcessThread; Pipe stdoutPipe; Pipe stderrPipe; Pipe stdinPipe; // we don't use it but some processes demand it (e.g. xcopy) QByteArray outputBuffer; + QMutex outputBufferLock; + QMutex bufferedOutputModeSwitchMutex; + QByteArray intermediateOutputBuffer; + DWORD exitCode; }; Process::Process(QObject *parent) : QObject(parent), - d(new ProcessPrivate), + d(new ProcessPrivate(this)), m_state(NotRunning), m_exitCode(0), m_exitStatus(NormalExit), @@ -93,10 +112,26 @@ Process::Process(QObject *parent) Process::~Process() { - waitForFinished(); + if (m_state == Running) + qWarning("Process: destroyed while process still running."); + printBufferedOutput(); delete d; } +void Process::setBufferedOutput(bool b) +{ + if (m_bufferedOutput == b) + return; + + d->bufferedOutputModeSwitchMutex.lock(); + + m_bufferedOutput = b; + if (!m_bufferedOutput) + printBufferedOutput(); + + d->bufferedOutputModeSwitchMutex.unlock(); +} + void Process::setWorkingDirectory(const QString &path) { m_workingDirectory = path; @@ -188,14 +223,50 @@ enum PipeType { InputPipe, OutputPipe }; static bool setupPipe(Pipe &pipe, SECURITY_ATTRIBUTES *sa, PipeType pt) { - if (!CreatePipe(&pipe.hRead, &pipe.hWrite, sa, 0)) { - qWarning("CreatePipe failed with error code %d.", GetLastError()); + BOOL oldInheritHandle = sa->bInheritHandle; + static DWORD instanceCount = 0; + const size_t maxPipeNameLen = 256; + wchar_t pipeName[maxPipeNameLen]; + swprintf_s(pipeName, maxPipeNameLen, L"\\\\.\\pipe\\jom-%X-%X", + GetCurrentProcessId(), instanceCount++); + + sa->bInheritHandle = (pt == InputPipe); + const DWORD dwPipeBufferSize = 1024 * 1024; + HANDLE hRead; + hRead = CreateNamedPipe(pipeName, + PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS, + 1, // only one pipe instance + 0, // output buffer size + dwPipeBufferSize, // input buffer size + 0, + sa); + if (hRead == INVALID_HANDLE_VALUE) { + qErrnoWarning("Process: CreateNamedPipe failed."); return false; } - if (!SetHandleInformation(pt == InputPipe ? pipe.hWrite : pipe.hRead, HANDLE_FLAG_INHERIT, 0)) { - qWarning("SetHandleInformation failed with error code %d.", GetLastError()); + + sa->bInheritHandle = (pt == OutputPipe); + HANDLE hWrite = INVALID_HANDLE_VALUE; + hWrite = CreateFile(pipeName, + GENERIC_WRITE, + 0, + sa, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + NULL); + if (hWrite == INVALID_HANDLE_VALUE) { + qErrnoWarning("Process: CreateFile failed."); + CloseHandle(hRead); return false; } + + // Wait until connection is in place. + ConnectNamedPipe(hRead, NULL); + + pipe.hRead = hRead; + pipe.hWrite = hWrite; + sa->bInheritHandle = oldInheritHandle; return true; } @@ -207,25 +278,21 @@ void Process::start(const QString &commandLine) sa.nLength = sizeof(sa); sa.bInheritHandle = TRUE; - if (m_bufferedOutput) { - if (!setupPipe(d->stdinPipe, &sa, InputPipe)) - qFatal("Cannot setup pipe for stdin."); - if (!setupPipe(d->stdoutPipe, &sa, OutputPipe)) - qFatal("Cannot setup pipe for stdout."); + if (!setupPipe(d->stdinPipe, &sa, InputPipe)) + qFatal("Cannot setup pipe for stdin."); + if (!setupPipe(d->stdoutPipe, &sa, OutputPipe)) + qFatal("Cannot setup pipe for stdout."); - // Let the child process write to the same handle, like QProcess::MergedChannels. - DuplicateHandle(GetCurrentProcess(), d->stdoutPipe.hWrite, GetCurrentProcess(), - &d->stderrPipe.hWrite, 0, TRUE, DUPLICATE_SAME_ACCESS); - } + // Let the child process write to the same handle, like QProcess::MergedChannels. + DuplicateHandle(GetCurrentProcess(), d->stdoutPipe.hWrite, GetCurrentProcess(), + &d->stderrPipe.hWrite, 0, TRUE, DUPLICATE_SAME_ACCESS); STARTUPINFO si = {0}; si.cb = sizeof(si); - if (m_bufferedOutput) { - si.hStdInput = d->stdinPipe.hRead; - si.hStdOutput = d->stdoutPipe.hWrite; - si.hStdError = d->stderrPipe.hWrite; - si.dwFlags = STARTF_USESTDHANDLES; - } + si.hStdInput = d->stdinPipe.hRead; + si.hStdOutput = d->stdoutPipe.hWrite; + si.hStdError = d->stderrPipe.hWrite; + si.dwFlags = STARTF_USESTDHANDLES; DWORD dwCreationFlags = CREATE_UNICODE_ENVIRONMENT; PROCESS_INFORMATION pi; @@ -248,100 +315,124 @@ void Process::start(const QString &commandLine) } // Close the pipe handles. This process doesn't need them anymore. - if (m_bufferedOutput) { - safelyCloseHandle(d->stdinPipe.hRead); - safelyCloseHandle(d->stdoutPipe.hWrite); - safelyCloseHandle(d->stderrPipe.hWrite); - } + safelyCloseHandle(d->stdinPipe.hRead); + safelyCloseHandle(d->stdinPipe.hWrite); + safelyCloseHandle(d->stdoutPipe.hWrite); + safelyCloseHandle(d->stderrPipe.hWrite); - // TODO: use a global notifier object instead of creating a thread for every starting process d->hProcess = pi.hProcess; d->hProcessThread = pi.hThread; - HANDLE hThread = CreateThread(NULL, 4096, processWatcherThread, this, 0, NULL); - if (!hThread) { - safelyCloseHandle(d->hProcess); - safelyCloseHandle(d->hProcessThread); - m_state = NotRunning; + iocp()->registerObserver(d, d->stdoutPipe.hRead); + if (d->startRead()) { + m_state = Running; + } else { emit error(FailedToStart); - return; } - d->hWatcherThread = hThread; - m_state = Running; } -DWORD WINAPI Process::processWatcherThread(void *lpParameter) +void Process::onProcessFinished() { - Process *process = reinterpret_cast<Process*>(lpParameter); - ProcessPrivate *d = process->d; - - DWORD dwRead; - const size_t buflen = 4096; - char chBuf[buflen]; - BOOL bSuccess = FALSE; - - if (process->m_bufferedOutput) { - for (;;) - { - DWORD bytesAvailable; - if (!PeekNamedPipe(d->stdoutPipe.hRead, 0, 0, 0, &bytesAvailable, 0)) - break; - if (bytesAvailable == 0) { - if (WaitForSingleObject(d->hProcess, 100) == WAIT_TIMEOUT) - continue; - else - break; - } - bSuccess = ReadFile(d->stdoutPipe.hRead, chBuf, buflen, &dwRead, NULL); - if (!bSuccess || dwRead == 0) - break; - - d->outputBuffer.append(QByteArray(chBuf, dwRead)); - } - - safelyCloseHandle(d->stdoutPipe.hRead); - safelyCloseHandle(d->stdinPipe.hWrite); - } else { - WaitForSingleObject(d->hProcess, INFINITE); - } - - DWORD exitCode = 0; - bool crashed; - if (GetExitCodeProcess(d->hProcess, &exitCode)) { - //### for now we assume a crash if exit code is less than -1 or the magic number - crashed = (exitCode == 0xf291 || (int)exitCode < 0); - } + if (m_state != Running) + return; + iocp()->unregisterObserver(d); + safelyCloseHandle(d->stdoutPipe.hRead); + safelyCloseHandle(d->stderrPipe.hRead); safelyCloseHandle(d->hProcess); safelyCloseHandle(d->hProcessThread); + printBufferedOutput(); + m_state = NotRunning; + DWORD exitCode = d->exitCode; + d->exitCode = STILL_ACTIVE; - QMetaObject::invokeMethod(process, "onProcessFinished", Qt::QueuedConnection, Q_ARG(int, exitCode), Q_ARG(bool, crashed)); - return 0; + //### for now we assume a crash if exit code is less than -1 or the magic number + bool crashed = (exitCode == 0xf291 || (int)exitCode < 0); + ExitStatus exitStatus = crashed ? Process::CrashExit : Process::NormalExit; + emit finished(exitCode, exitStatus); } -void Process::onProcessFinished(int exitCode, bool crashed) +bool Process::waitForFinished() { if (m_state != Running) - return; + return true; + //if (WaitForSingleObject(d->hProcess, INFINITE) == WAIT_TIMEOUT) + // return false; + + QEventLoop eventLoop; + connect(this, SIGNAL(finished(int, Process::ExitStatus)), &eventLoop, SLOT(quit())); + eventLoop.exec(); + + m_state = NotRunning; + return true; +} + +/** + * Starts the asynchronous read operation. + * Returns true, if initiating the read operation was successful. + */ +bool ProcessPrivate::startRead() +{ + DWORD dwRead; + BOOL bSuccess; + + const DWORD minReadBufferSize = 4096; + bSuccess = PeekNamedPipe(stdoutPipe.hRead, NULL, 0, NULL, &dwRead, NULL); + if (!bSuccess || dwRead < minReadBufferSize) + dwRead = minReadBufferSize; + + intermediateOutputBuffer.resize(dwRead); + bSuccess = ReadFile(stdoutPipe.hRead, + intermediateOutputBuffer.data(), + intermediateOutputBuffer.size(), + NULL, + &stdoutPipe.overlapped); + if (!bSuccess) { + DWORD dwError = GetLastError(); + if (dwError != ERROR_IO_PENDING) + return false; + } + return true; +} +void Process::printBufferedOutput() +{ + d->outputBufferLock.lock(); if (!d->outputBuffer.isEmpty()) { fputs(d->outputBuffer.data(), stdout); + fflush(stdout); d->outputBuffer.clear(); } - safelyCloseHandle(d->hWatcherThread); - m_state = NotRunning; - ExitStatus exitStatus = crashed ? Process::CrashExit : Process::NormalExit; - emit finished(exitCode, exitStatus); + d->outputBufferLock.unlock(); } -bool Process::waitForFinished() +void ProcessPrivate::completionPortNotified(DWORD numberOfBytes, DWORD errorCode) { - if (m_state != Running) - return true; - if (WaitForSingleObject(d->hWatcherThread, INFINITE) == WAIT_TIMEOUT) - return false; - safelyCloseHandle(d->hWatcherThread); - m_state = NotRunning; - return true; + if (numberOfBytes) { + bufferedOutputModeSwitchMutex.lock(); + + if (q->m_bufferedOutput) { + outputBufferLock.lock(); + outputBuffer.append(intermediateOutputBuffer.data(), numberOfBytes); + outputBufferLock.unlock(); + } else { + intermediateOutputBuffer[(uint)numberOfBytes] = 0; + printf(intermediateOutputBuffer.data()); + fflush(stdout); + } + + bufferedOutputModeSwitchMutex.unlock(); + } + + if (errorCode == ERROR_SUCCESS) + if (startRead()) + return; + + if (exitCode == STILL_ACTIVE) + if (!GetExitCodeProcess(hProcess, &exitCode)) + exitCode = STILL_ACTIVE; + + if (exitCode != STILL_ACTIVE) + QTimer::singleShot(0, q, SLOT(onProcessFinished())); } } // namespace NMakeFile diff --git a/src/jomlib/process.h b/src/jomlib/process.h index 8d2d4e5..5cc535b 100644 --- a/src/jomlib/process.h +++ b/src/jomlib/process.h @@ -56,7 +56,7 @@ public: Running }; - void setBufferedOutput(bool b) { m_bufferedOutput = b; } + void setBufferedOutput(bool b); bool isBufferedOutputSet() const { return m_bufferedOutput; } void setWorkingDirectory(const QString &path); const QString &workingDirectory() const { return m_workingDirectory; } @@ -75,11 +75,13 @@ public slots: bool waitForFinished(); private: - static unsigned long __stdcall processWatcherThread(void *lpParameter); - Q_INVOKABLE void onProcessFinished(int, bool); + void printBufferedOutput(); + +private slots: + void onProcessFinished(); private: - struct ProcessPrivate *d; + class ProcessPrivate *d; QString m_workingDirectory; QStringList m_environment; QByteArray m_envBlock; @@ -87,6 +89,8 @@ private: int m_exitCode; ExitStatus m_exitStatus; bool m_bufferedOutput; + + friend class ProcessPrivate; }; } // namespace NMakeFile diff --git a/src/jomlib/targetexecutor.cpp b/src/jomlib/targetexecutor.cpp index 20c1bd2..6f4cdbf 100644 --- a/src/jomlib/targetexecutor.cpp +++ b/src/jomlib/targetexecutor.cpp @@ -158,10 +158,19 @@ void TargetExecutor::onChildFinished(CommandExecutor* executor, bool abortMakePr { Q_CHECK_PTR(executor->target()); m_depgraph->removeLeaf(executor->target()); - if (executor->isBufferedOutputSet()) - m_availableProcesses.append(executor); - else - m_availableProcesses.prepend(executor); + m_availableProcesses.append(executor); + if (!executor->isBufferedOutputSet()) { + executor->setBufferedOutput(true); + bool found = false; + foreach (CommandExecutor *cmdex, m_processes) { + if (cmdex->isActive()) { + cmdex->setBufferedOutput(false); + found = true; + } + } + if (!found) + m_availableProcesses.first()->setBufferedOutput(false); + } if (!abortMakeProcess && m_blockingCommand && m_blockingCommand == executor) { //qDebug() << "UNBLOCK" << QCoreApplication::applicationPid(); |