diff options
Diffstat (limited to 'chromium/mojo/system/message_pipe_dispatcher_unittest.cc')
-rw-r--r-- | chromium/mojo/system/message_pipe_dispatcher_unittest.cc | 598 |
1 files changed, 598 insertions, 0 deletions
diff --git a/chromium/mojo/system/message_pipe_dispatcher_unittest.cc b/chromium/mojo/system/message_pipe_dispatcher_unittest.cc new file mode 100644 index 00000000000..291a2c152f5 --- /dev/null +++ b/chromium/mojo/system/message_pipe_dispatcher_unittest.cc @@ -0,0 +1,598 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a +// heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to +// increase tolerance and reduce observed flakiness (though doing so reduces the +// meaningfulness of the test). + +#include "mojo/system/message_pipe_dispatcher.h" + +#include <string.h> + +#include <limits> + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_vector.h" +#include "base/rand_util.h" +#include "base/threading/platform_thread.h" // For |Sleep()|. +#include "base/threading/simple_thread.h" +#include "base/time/time.h" +#include "mojo/system/message_pipe.h" +#include "mojo/system/test_utils.h" +#include "mojo/system/waiter.h" +#include "mojo/system/waiter_test_utils.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace system { +namespace { + +TEST(MessagePipeDispatcherTest, Basic) { + test::Stopwatch stopwatch; + int32_t buffer[1]; + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); + uint32_t buffer_size; + + // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. + for (unsigned i = 0; i < 2; i++) { + scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType()); + scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + d0->Init(mp, i); // 0, 1. + d1->Init(mp, i ^ 1); // 1, 0. + } + Waiter w; + uint32_t context = 0; + + // Try adding a writable waiter when already writable. + w.Init(); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0)); + // Shouldn't need to remove the waiter (it was not added). + + // Add a readable waiter to |d0|, then make it readable (by writing to + // |d1|), then wait. + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, + d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1)); + buffer[0] = 123456789; + EXPECT_EQ(MOJO_RESULT_OK, + d1->WriteMessage(buffer, kBufferSize, + NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context)); + EXPECT_EQ(1u, context); + EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); + d0->RemoveWaiter(&w); + + // Try adding a readable waiter when already readable (from above). + w.Init(); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2)); + // Shouldn't need to remove the waiter (it was not added). + + // Make |d0| no longer readable (by reading from it). + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + d0->ReadMessage(buffer, &buffer_size, + 0, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(kBufferSize, buffer_size); + EXPECT_EQ(123456789, buffer[0]); + + // Wait for zero time for readability on |d0| (will time out). + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, + d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3)); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, NULL)); + EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); + d0->RemoveWaiter(&w); + + // Wait for non-zero, finite time for readability on |d0| (will time out). + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, + d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3)); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, + w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), NULL)); + base::TimeDelta elapsed = stopwatch.Elapsed(); + EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout()); + EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout()); + d0->RemoveWaiter(&w); + + EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); + EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); + } +} + +TEST(MessagePipeDispatcherTest, InvalidParams) { + char buffer[1]; + + scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + d0->Init(mp, 0); + d1->Init(mp, 1); + } + + // |WriteMessage|: + // Null buffer with nonzero buffer size. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + d0->WriteMessage(NULL, 1, + NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + // Huge buffer size. + EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, + d0->WriteMessage(buffer, std::numeric_limits<uint32_t>::max(), + NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // |ReadMessage|: + // Null buffer with nonzero buffer size. + uint32_t buffer_size = 1; + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + d0->ReadMessage(NULL, &buffer_size, + 0, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + + EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); + EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); +} + +// Test what happens when one end is closed (single-threaded test). +TEST(MessagePipeDispatcherTest, BasicClosed) { + int32_t buffer[1]; + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); + uint32_t buffer_size; + + // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. + for (unsigned i = 0; i < 2; i++) { + scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + d0->Init(mp, i); // 0, 1. + d1->Init(mp, i ^ 1); // 1, 0. + } + Waiter w; + + // Write (twice) to |d1|. + buffer[0] = 123456789; + EXPECT_EQ(MOJO_RESULT_OK, + d1->WriteMessage(buffer, kBufferSize, + NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + buffer[0] = 234567890; + EXPECT_EQ(MOJO_RESULT_OK, + d1->WriteMessage(buffer, kBufferSize, + NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Try waiting for readable on |d0|; should fail (already satisfied). + w.Init(); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0)); + + // Try reading from |d1|; should fail (nothing to read). + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, + d1->ReadMessage(buffer, &buffer_size, + 0, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + + // Close |d1|. + EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); + + // Try waiting for readable on |d0|; should fail (already satisfied). + w.Init(); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1)); + + // Read from |d0|. + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + d0->ReadMessage(buffer, &buffer_size, + 0, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(kBufferSize, buffer_size); + EXPECT_EQ(123456789, buffer[0]); + + // Try waiting for readable on |d0|; should fail (already satisfied). + w.Init(); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2)); + + // Read again from |d0|. + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + d0->ReadMessage(buffer, &buffer_size, + 0, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(kBufferSize, buffer_size); + EXPECT_EQ(234567890, buffer[0]); + + // Try waiting for readable on |d0|; should fail (unsatisfiable). + w.Init(); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3)); + + // Try waiting for writable on |d0|; should fail (unsatisfiable). + w.Init(); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4)); + + // Try reading from |d0|; should fail (nothing to read and other end + // closed). + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + d0->ReadMessage(buffer, &buffer_size, + 0, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + + // Try writing to |d0|; should fail (other end closed). + buffer[0] = 345678901; + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + d0->WriteMessage(buffer, kBufferSize, + NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); + } +} + +TEST(MessagePipeDispatcherTest, BasicThreaded) { + test::Stopwatch stopwatch; + int32_t buffer[1]; + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); + uint32_t buffer_size; + base::TimeDelta elapsed; + bool did_wait; + MojoResult result; + uint32_t context; + + // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. + for (unsigned i = 0; i < 2; i++) { + scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + d0->Init(mp, i); // 0, 1. + d1->Init(mp, i ^ 1); // 1, 0. + } + + // Wait for readable on |d1|, which will become readable after some time. + { + test::WaiterThread thread(d1, + MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, + 1, + &did_wait, &result, &context); + stopwatch.Start(); + thread.Start(); + base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); + // Wake it up by writing to |d0|. + buffer[0] = 123456789; + EXPECT_EQ(MOJO_RESULT_OK, + d0->WriteMessage(buffer, kBufferSize, + NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + } // Joins the thread. + elapsed = stopwatch.Elapsed(); + EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout()); + EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout()); + EXPECT_TRUE(did_wait); + EXPECT_EQ(MOJO_RESULT_OK, result); + EXPECT_EQ(1u, context); + + // Now |d1| is already readable. Try waiting for it again. + { + test::WaiterThread thread(d1, + MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, + 2, + &did_wait, &result, &context); + stopwatch.Start(); + thread.Start(); + } // Joins the thread. + EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); + EXPECT_FALSE(did_wait); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); + + // Consume what we wrote to |d0|. + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + d1->ReadMessage(buffer, &buffer_size, + 0, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(kBufferSize, buffer_size); + EXPECT_EQ(123456789, buffer[0]); + + // Wait for readable on |d1| and close |d0| after some time, which should + // cancel that wait. + { + test::WaiterThread thread(d1, + MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, + 3, + &did_wait, &result, &context); + stopwatch.Start(); + thread.Start(); + base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); + EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); + } // Joins the thread. + elapsed = stopwatch.Elapsed(); + EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout()); + EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout()); + EXPECT_TRUE(did_wait); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); + EXPECT_EQ(3u, context); + + EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); + } + + for (unsigned i = 0; i < 2; i++) { + scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + d0->Init(mp, i); // 0, 1. + d1->Init(mp, i ^ 1); // 1, 0. + } + + // Wait for readable on |d1| and close |d1| after some time, which should + // cancel that wait. + { + test::WaiterThread thread(d1, + MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, + 4, + &did_wait, &result, &context); + stopwatch.Start(); + thread.Start(); + base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); + EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); + } // Joins the thread. + elapsed = stopwatch.Elapsed(); + EXPECT_GT(elapsed, (2-1) * test::EpsilonTimeout()); + EXPECT_LT(elapsed, (2+1) * test::EpsilonTimeout()); + EXPECT_TRUE(did_wait); + EXPECT_EQ(MOJO_RESULT_CANCELLED, result); + EXPECT_EQ(4u, context); + + EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); + } +} + +// Stress test ----------------------------------------------------------------- + +const size_t kMaxMessageSize = 2000; + +class WriterThread : public base::SimpleThread { + public: + // |*messages_written| and |*bytes_written| belong to the thread while it's + // alive. + WriterThread(scoped_refptr<Dispatcher> write_dispatcher, + size_t* messages_written, size_t* bytes_written) + : base::SimpleThread("writer_thread"), + write_dispatcher_(write_dispatcher), + messages_written_(messages_written), + bytes_written_(bytes_written) { + *messages_written_ = 0; + *bytes_written_ = 0; + } + + virtual ~WriterThread() { + Join(); + } + + private: + virtual void Run() OVERRIDE { + // Make some data to write. + unsigned char buffer[kMaxMessageSize]; + for (size_t i = 0; i < kMaxMessageSize; i++) + buffer[i] = static_cast<unsigned char>(i); + + // Number of messages to write. + *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000)); + + // Write messages. + for (size_t i = 0; i < *messages_written_; i++) { + uint32_t bytes_to_write = static_cast<uint32_t>( + base::RandInt(1, static_cast<int>(kMaxMessageSize))); + EXPECT_EQ(MOJO_RESULT_OK, + write_dispatcher_->WriteMessage(buffer, bytes_to_write, + NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + *bytes_written_ += bytes_to_write; + } + + // Write one last "quit" message. + EXPECT_EQ(MOJO_RESULT_OK, + write_dispatcher_->WriteMessage("quit", 4, + NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + } + + const scoped_refptr<Dispatcher> write_dispatcher_; + size_t* const messages_written_; + size_t* const bytes_written_; + + DISALLOW_COPY_AND_ASSIGN(WriterThread); +}; + +class ReaderThread : public base::SimpleThread { + public: + // |*messages_read| and |*bytes_read| belong to the thread while it's alive. + ReaderThread(scoped_refptr<Dispatcher> read_dispatcher, + size_t* messages_read, size_t* bytes_read) + : base::SimpleThread("reader_thread"), + read_dispatcher_(read_dispatcher), + messages_read_(messages_read), + bytes_read_(bytes_read) { + *messages_read_ = 0; + *bytes_read_ = 0; + } + + virtual ~ReaderThread() { + Join(); + } + + private: + virtual void Run() OVERRIDE { + unsigned char buffer[kMaxMessageSize]; + MojoResult result; + Waiter w; + + // Read messages. + for (;;) { + // Wait for it to be readable. + w.Init(); + result = read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0); + EXPECT_TRUE(result == MOJO_RESULT_OK || + result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result; + if (result == MOJO_RESULT_OK) { + // Actually need to wait. + EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, NULL)); + read_dispatcher_->RemoveWaiter(&w); + } + + // Now, try to do the read. + // Clear the buffer so that we can check the result. + memset(buffer, 0, sizeof(buffer)); + uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); + result = read_dispatcher_->ReadMessage(buffer, &buffer_size, + 0, NULL, + MOJO_READ_MESSAGE_FLAG_NONE); + EXPECT_TRUE(result == MOJO_RESULT_OK || + result == MOJO_RESULT_SHOULD_WAIT) << "result: " << result; + // We're racing with others to read, so maybe we failed. + if (result == MOJO_RESULT_SHOULD_WAIT) + continue; // In which case, try again. + // Check for quit. + if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0) + return; + EXPECT_GE(buffer_size, 1u); + EXPECT_LE(buffer_size, kMaxMessageSize); + EXPECT_TRUE(IsValidMessage(buffer, buffer_size)); + + (*messages_read_)++; + *bytes_read_ += buffer_size; + } + } + + static bool IsValidMessage(const unsigned char* buffer, + uint32_t message_size) { + size_t i; + for (i = 0; i < message_size; i++) { + if (buffer[i] != static_cast<unsigned char>(i)) + return false; + } + // Check that the remaining bytes weren't stomped on. + for (; i < kMaxMessageSize; i++) { + if (buffer[i] != 0) + return false; + } + return true; + } + + const scoped_refptr<Dispatcher> read_dispatcher_; + size_t* const messages_read_; + size_t* const bytes_read_; + + DISALLOW_COPY_AND_ASSIGN(ReaderThread); +}; + +TEST(MessagePipeDispatcherTest, Stress) { + static const size_t kNumWriters = 30; + static const size_t kNumReaders = kNumWriters; + + scoped_refptr<MessagePipeDispatcher> d_write(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + scoped_refptr<MessagePipeDispatcher> d_read(new MessagePipeDispatcher( + MessagePipeDispatcher::kDefaultCreateOptions)); + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + d_write->Init(mp, 0); + d_read->Init(mp, 1); + } + + size_t messages_written[kNumWriters]; + size_t bytes_written[kNumWriters]; + size_t messages_read[kNumReaders]; + size_t bytes_read[kNumReaders]; + { + // Make writers. + ScopedVector<WriterThread> writers; + for (size_t i = 0; i < kNumWriters; i++) { + writers.push_back( + new WriterThread(d_write, &messages_written[i], &bytes_written[i])); + } + + // Make readers. + ScopedVector<ReaderThread> readers; + for (size_t i = 0; i < kNumReaders; i++) { + readers.push_back( + new ReaderThread(d_read, &messages_read[i], &bytes_read[i])); + } + + // Start writers. + for (size_t i = 0; i < kNumWriters; i++) + writers[i]->Start(); + + // Start readers. + for (size_t i = 0; i < kNumReaders; i++) + readers[i]->Start(); + + // TODO(vtl): Maybe I should have an event that triggers all the threads to + // start doing stuff for real (so that the first ones created/started aren't + // advantaged). + } // Joins all the threads. + + size_t total_messages_written = 0; + size_t total_bytes_written = 0; + for (size_t i = 0; i < kNumWriters; i++) { + total_messages_written += messages_written[i]; + total_bytes_written += bytes_written[i]; + } + size_t total_messages_read = 0; + size_t total_bytes_read = 0; + for (size_t i = 0; i < kNumReaders; i++) { + total_messages_read += messages_read[i]; + total_bytes_read += bytes_read[i]; + // We'd have to be really unlucky to have read no messages on a thread. + EXPECT_GT(messages_read[i], 0u) << "reader: " << i; + EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i; + } + EXPECT_EQ(total_messages_written, total_messages_read); + EXPECT_EQ(total_bytes_written, total_bytes_read); + + EXPECT_EQ(MOJO_RESULT_OK, d_write->Close()); + EXPECT_EQ(MOJO_RESULT_OK, d_read->Close()); +} + +} // namespace +} // namespace system +} // namespace mojo |