summaryrefslogtreecommitdiffstats
path: root/Source/WTF/wtf/win/WorkQueueWin.cpp
blob: 6e61cde69fb43300d6ee6f19876056dab186516d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/*
* Copyright (C) 2010, 2015 Apple Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
*    notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
*    notice, this list of conditions and the following disclaimer in the
*    documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/

#include "config.h"
#include "WorkQueue.h"

#include <wtf/MathExtras.h>
#include <wtf/Threading.h>
#include <wtf/win/WorkItemWin.h>

namespace WTF {

// Wait callback passed to ::RegisterWaitForSingleObject by registerHandle(). |context| is the
// WorkItemWin that was registered for the handle. The item is appended to its queue's pending
// work, and the work is performed on this thread unless another thread is already registered as
// the queue's work thread.
void WorkQueue::handleCallback(void* context, BOOLEAN timerOrWaitFired)
{
    ASSERT_ARG(context, context);
    ASSERT_ARG(timerOrWaitFired, !timerOrWaitFired);

    WorkItemWin* item = static_cast<WorkItemWin*>(context);
    RefPtr<WorkQueue> queue = item->queue();

    {
        MutexLocker lock(queue->m_workItemQueueLock);

        // performWorkOnRegisteredWorkThread() calls deref() once for every item it runs, so every
        // item placed in the queue must be paired with a ref(), exactly as dispatch() does.
        // Without it, each item queued through this path would over-release the queue.
        queue->ref();
        queue->m_workItemQueue.append(item);

        // If no other thread is performing work, we can do it on this thread.
        if (!queue->tryRegisterAsWorkThread()) {
            // Some other thread is performing work. Since we hold the queue lock, we can be sure
            // that the work thread is not exiting due to an empty queue and will process the work
            // item we just added to it. If we weren't holding the lock we'd have to signal
            // m_performWorkEvent to make sure the work item got picked up.
            return;
        }
    }

    queue->performWorkOnRegisteredWorkThread();
}

// Entry point handed to ::QueueUserWorkItem by dispatch(). |context| is the WorkQueue that
// requested the work thread. Drains the queue only if this thread wins the registration race;
// otherwise some other thread is already doing the work and there is nothing to do here.
// Always returns 0.
DWORD WorkQueue::workThreadCallback(void* context)
{
    ASSERT_ARG(context, context);

    WorkQueue* workQueue = static_cast<WorkQueue*>(context);
    if (workQueue->tryRegisterAsWorkThread())
        workQueue->performWorkOnRegisteredWorkThread();

    return 0;
}

void WorkQueue::performWorkOnRegisteredWorkThread()
{
    ASSERT(m_isWorkThreadRegistered);

    m_workItemQueueLock.lock();

    while (!m_workItemQueue.isEmpty()) {
        Vector<RefPtr<WorkItemWin>> workItemQueue;
        m_workItemQueue.swap(workItemQueue);

        // Allow more work to be scheduled while we're not using the queue directly.
        m_workItemQueueLock.unlock();
        for (auto& workItem : workItemQueue) {
            workItem->function()();
            deref();
        }
        m_workItemQueueLock.lock();
    }

    // One invariant we maintain is that any work scheduled while a work thread is registered will
    // be handled by that work thread. Unregister as the work thread while the queue lock is still
    // held so that no work can be scheduled while we're still registered.
    unregisterAsWorkThread();

    m_workItemQueueLock.unlock();
}

// Windows-specific setup: creates the timer queue used by dispatchAfter() and marks the queue as
// having no registered work thread. The name, type and QOS arguments are not used here.
void WorkQueue::platformInitialize(const char* name, Type, QOS)
{
    m_timerQueue = ::CreateTimerQueue();
    ASSERT_WITH_MESSAGE(m_timerQueue, "::CreateTimerQueue failed with error %lu", ::GetLastError());

    m_isWorkThreadRegistered = 0;
}

bool WorkQueue::tryRegisterAsWorkThread()
{
    LONG result = ::InterlockedCompareExchange(&m_isWorkThreadRegistered, 1, 0);
    ASSERT(!result || result == 1);
    return !result;
}

void WorkQueue::unregisterAsWorkThread()
{
    LONG result = ::InterlockedCompareExchange(&m_isWorkThreadRegistered, 0, 1);
    ASSERT_UNUSED(result, result == 1);
}

// Windows-specific teardown: requests deletion of the timer queue. In assert-enabled builds it
// also checks that no handles are still registered with this queue.
void WorkQueue::platformInvalidate()
{
#if !ASSERT_DISABLED
    MutexLocker handlesLocker(m_handlesLock);
    ASSERT(m_handles.isEmpty());
#endif

    // FIXME: Timer-queue timers that fire after this point must be prevented from touching this
    // WorkQueue <http://webkit.org/b/44690>.
    // A null completion event is passed, so no event is signaled when deletion completes.
    ::DeleteTimerQueueEx(m_timerQueue, nullptr);
}

// Queues |function| to be run by this queue's work thread, spawning a work thread if none is
// currently registered. Takes one reference on the queue per queued item; the work thread drops
// it after running the item.
void WorkQueue::dispatch(std::function<void()> function)
{
    MutexLocker queueLocker(m_workItemQueueLock);

    ref();
    m_workItemQueue.append(WorkItemWin::create(function, this));

    // A registered work thread will pick up the item we just appended, so there is no need to
    // spawn another one. This is only an optimization that avoids extra threads in most cases.
    if (m_isWorkThreadRegistered)
        return;

    // No work thread is registered, so spawn one. A previously spawned thread may not have
    // registered itself yet, in which case we end up spawning a second one here; the
    // registration process ensures that only one of them actually performs the work.
    ::QueueUserWorkItem(workThreadCallback, this, WT_EXECUTEDEFAULT);
}

// State shared between WorkQueue::dispatchAfter(), which fills it in, and
// WorkQueue::timerCallback(), which consumes it when the timer fires.
struct TimerContext : public ThreadSafeRefCounted<TimerContext> {
    static RefPtr<TimerContext> create()
    {
        return adoptRef(new TimerContext);
    }

    // Queue the function is dispatched to when the timer fires. Raw (non-owning) pointer.
    WorkQueue* queue = nullptr;
    // Work to dispatch when the timer fires.
    std::function<void()> function;
    // Guards |timer|, which is written by the creating thread and read by the timer callback.
    Mutex timerMutex;
    // Timer-queue timer handle; null until the timer has been created.
    HANDLE timer = nullptr;

private:
    // Instances are created through create() only.
    TimerContext() { }
};

void WorkQueue::timerCallback(void* context, BOOLEAN timerOrWaitFired)
{
    ASSERT_ARG(context, context);
    ASSERT_UNUSED(timerOrWaitFired, timerOrWaitFired);

    // Balanced by leakRef in scheduleWorkAfterDelay.
    RefPtr<TimerContext> timerContext = adoptRef(static_cast<TimerContext*>(context));

    timerContext->queue->dispatch(timerContext->function);

    MutexLocker lock(timerContext->timerMutex);
    ASSERT(timerContext->timer);
    ASSERT(timerContext->queue->m_timerQueue);
    if (!::DeleteTimerQueueTimer(timerContext->queue->m_timerQueue, timerContext->timer, 0)) {
        // Getting ERROR_IO_PENDING here means that the timer will be destroyed once the callback is done executing.
        ASSERT_WITH_MESSAGE(::GetLastError() == ERROR_IO_PENDING, "::DeleteTimerQueueTimer failed with error %lu", ::GetLastError());
    }
}

void WorkQueue::dispatchAfter(std::chrono::nanoseconds duration, std::function<void()> function)
{
    ASSERT(m_timerQueue);
    ref();

    RefPtr<TimerContext> context = TimerContext::create();
    context->queue = this;
    context->function = function;

    {
        // The timer callback could fire before ::CreateTimerQueueTimer even returns, so we protect
        // context->timer with a mutex to ensure the timer callback doesn't access it before the
        // timer handle has been stored in it.
        MutexLocker lock(context->timerMutex);

        int64_t milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();

        // From empirical testing, we've seen CreateTimerQueueTimer() sometimes fire up to 5+ ms early.
        // This causes havoc for clients of this code that expect to not be called back until the
        // specified duration has expired. Other folks online have also observed some slop in the
        // firing times of CreateTimerQuqueTimer(). From the data posted at
        // http://omeg.pl/blog/2011/11/on-winapi-timers-and-their-resolution, it appears that the slop
        // can be up to about 10 ms. To ensure that we don't fire the timer early, we'll tack on a
        // slop adjustment to the duration, and we'll use double the worst amount of slop observed
        // so far.
        const int64_t slopAdjustment = 20;
        if (milliseconds) 
            milliseconds += slopAdjustment;

        // Since our timer callback is quick, we can execute in the timer thread itself and avoid
        // an extra thread switch over to a worker thread.
        if (!::CreateTimerQueueTimer(&context->timer, m_timerQueue, timerCallback, context.get(), clampTo<DWORD>(milliseconds), 0, WT_EXECUTEINTIMERTHREAD)) {
            ASSERT_WITH_MESSAGE(false, "::CreateTimerQueueTimer failed with error %lu", ::GetLastError());
            return;
        }
    }

    // The timer callback will handle destroying context.
    context.release().leakRef();
}

// Hands |item| to a worker thread, which unregisters its wait and releases it.
void WorkQueue::unregisterWaitAndDestroyItemSoon(PassRefPtr<HandleWorkItem> item)
{
    // Unregistering is done with a blocking call to ::UnregisterWaitEx, which is much simpler
    // than the non-blocking variant. Making that call on the current thread would deadlock if we
    // are inside a callback for the very wait being unregistered, so the work is pushed to some
    // other worker thread instead.

    // Ownership of the leaked reference passes to unregisterWaitAndDestroyItemCallback.
    HandleWorkItem* leakedItem = item.leakRef();
    ::QueueUserWorkItem(unregisterWaitAndDestroyItemCallback, leakedItem, WT_EXECUTEDEFAULT);
}

// Worker-thread half of unregisterWaitAndDestroyItemSoon(). |context| is the HandleWorkItem
// leaked by that function; it is adopted here and released when this function returns.
// Always returns 0.
DWORD WINAPI WorkQueue::unregisterWaitAndDestroyItemCallback(void* context)
{
    ASSERT_ARG(context, context);
    RefPtr<HandleWorkItem> handleItem = adoptRef(static_cast<HandleWorkItem*>(context));

    // We are not inside a callback for the wait being unregistered, so a blocking call to
    // ::UnregisterWaitEx is safe here.
    if (::UnregisterWaitEx(handleItem->waitHandle(), INVALID_HANDLE_VALUE))
        return 0;

    // Captured only so the error code can be inspected in a debugger.
    DWORD error = ::GetLastError();
    ASSERT_NOT_REACHED();

    return 0;
}

#if PLATFORM(QT)
// Associates |handle| with this queue so that |function| is scheduled on the queue whenever the
// handle is signaled. The handle must not already be registered. The created work item adopts
// the handle and is tracked in m_handles until unregisterAndCloseHandle() is called.
void WorkQueue::registerHandle(HANDLE handle, const std::function<void()>& function)
{
    RefPtr<HandleWorkItem> handleItem = HandleWorkItem::createByAdoptingHandle(handle, function, this);

    {
        MutexLocker lock(m_handlesLock);
        ASSERT_ARG(handle, !m_handles.contains(handle));
        m_handles.set(handle, handleItem);
    }

    // Initialized so that a failed registration stores a null wait handle rather than an
    // indeterminate value (ASSERT_NOT_REACHED is the only guard on the failure path below).
    HANDLE waitHandle = nullptr;
    if (!::RegisterWaitForSingleObject(&waitHandle, handle, handleCallback, handleItem.get(), INFINITE, WT_EXECUTEDEFAULT)) {
        // Captured only so the error code can be inspected in a debugger.
        DWORD error = ::GetLastError();
        ASSERT_NOT_REACHED();
    }
    handleItem->setWaitHandle(waitHandle);
}

// Stops tracking |handle|, which must currently be registered, and schedules its work item to
// be unregistered and released on a worker thread.
void WorkQueue::unregisterAndCloseHandle(HANDLE handle)
{
    RefPtr<HandleWorkItem> handleItem;

    {
        // Only the removal from m_handles needs the lock.
        MutexLocker handlesLocker(m_handlesLock);
        ASSERT_ARG(handle, m_handles.contains(handle));
        handleItem = m_handles.take(handle);
    }

    unregisterWaitAndDestroyItemSoon(handleItem.release());
}
#endif

} // namespace WTF