summaryrefslogtreecommitdiffstats
path: root/chromium/mojo/system/raw_channel_posix.cc
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/mojo/system/raw_channel_posix.cc')
-rw-r--r--chromium/mojo/system/raw_channel_posix.cc469
1 files changed, 469 insertions, 0 deletions
diff --git a/chromium/mojo/system/raw_channel_posix.cc b/chromium/mojo/system/raw_channel_posix.cc
new file mode 100644
index 00000000000..ba23599f8f7
--- /dev/null
+++ b/chromium/mojo/system/raw_channel_posix.cc
@@ -0,0 +1,469 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/raw_channel.h"
+
+#include <errno.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <deque>
+
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/compiler_specific.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/synchronization/lock.h"
+#include "mojo/embedder/platform_channel_utils_posix.h"
+#include "mojo/embedder/platform_handle.h"
+#include "mojo/embedder/platform_handle_vector.h"
+#include "mojo/system/transport_data.h"
+
+namespace mojo {
+namespace system {
+
+namespace {
+
+class RawChannelPosix : public RawChannel,
+ public base::MessageLoopForIO::Watcher {
+ public:
+ explicit RawChannelPosix(embedder::ScopedPlatformHandle handle);
+ virtual ~RawChannelPosix();
+
+ // |RawChannel| public methods:
+ virtual size_t GetSerializedPlatformHandleSize() const OVERRIDE;
+
+ private:
+ // |RawChannel| protected methods:
+ // Actually override this so that we can send multiple messages with (only)
+ // FDs if necessary.
+ virtual void EnqueueMessageNoLock(
+ scoped_ptr<MessageInTransit> message) OVERRIDE;
+ // Override this to handle those extra FD-only messages.
+ virtual bool OnReadMessageForRawChannel(
+ const MessageInTransit::View& message_view) OVERRIDE;
+ virtual IOResult Read(size_t* bytes_read) OVERRIDE;
+ virtual IOResult ScheduleRead() OVERRIDE;
+ virtual embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
+ size_t num_platform_handles,
+ const void* platform_handle_table) OVERRIDE;
+ virtual IOResult WriteNoLock(size_t* platform_handles_written,
+ size_t* bytes_written) OVERRIDE;
+ virtual IOResult ScheduleWriteNoLock() OVERRIDE;
+ virtual bool OnInit() OVERRIDE;
+ virtual void OnShutdownNoLock(
+ scoped_ptr<ReadBuffer> read_buffer,
+ scoped_ptr<WriteBuffer> write_buffer) OVERRIDE;
+
+ // |base::MessageLoopForIO::Watcher| implementation:
+ virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
+ virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
+
+ // Watches for |fd_| to become writable. Must be called on the I/O thread.
+ void WaitToWrite();
+
+ embedder::ScopedPlatformHandle fd_;
+
+ // The following members are only used on the I/O thread:
+ scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
+ scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
+
+ bool pending_read_;
+
+ std::deque<embedder::PlatformHandle> read_platform_handles_;
+
+ // The following members are used on multiple threads and protected by
+ // |write_lock()|:
+ bool pending_write_;
+
+ // This is used for posting tasks from write threads to the I/O thread. It
+ // must only be accessed under |write_lock_|. The weak pointers it produces
+ // are only used/invalidated on the I/O thread.
+ base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
+};
+
+RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle)
+ : fd_(handle.Pass()),
+ pending_read_(false),
+ pending_write_(false),
+ weak_ptr_factory_(this) {
+ DCHECK(fd_.is_valid());
+}
+
+RawChannelPosix::~RawChannelPosix() {
+ DCHECK(!pending_read_);
+ DCHECK(!pending_write_);
+
+ // No need to take the |write_lock()| here -- if there are still weak pointers
+ // outstanding, then we're hosed anyway (since we wouldn't be able to
+ // invalidate them cleanly, since we might not be on the I/O thread).
+ DCHECK(!weak_ptr_factory_.HasWeakPtrs());
+
+ // These must have been shut down/destroyed on the I/O thread.
+ DCHECK(!read_watcher_);
+ DCHECK(!write_watcher_);
+
+ embedder::CloseAllPlatformHandles(&read_platform_handles_);
+}
+
+size_t RawChannelPosix::GetSerializedPlatformHandleSize() const {
+ // We don't actually need any space on POSIX (since we just send FDs).
+ return 0;
+}
+
+void RawChannelPosix::EnqueueMessageNoLock(
+ scoped_ptr<MessageInTransit> message) {
+ if (message->transport_data()) {
+ embedder::PlatformHandleVector* const platform_handles =
+ message->transport_data()->platform_handles();
+ if (platform_handles &&
+ platform_handles->size() > embedder::kPlatformChannelMaxNumHandles) {
+ // We can't attach all the FDs to a single message, so we have to "split"
+ // the message. Send as many control messages as needed first with FDs
+ // attached (and no data).
+ size_t i = 0;
+ for (; platform_handles->size() - i >
+ embedder::kPlatformChannelMaxNumHandles;
+ i += embedder::kPlatformChannelMaxNumHandles) {
+ scoped_ptr<MessageInTransit> fd_message(
+ new MessageInTransit(
+ MessageInTransit::kTypeRawChannel,
+ MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles,
+ 0,
+ NULL));
+ embedder::ScopedPlatformHandleVectorPtr fds(
+ new embedder::PlatformHandleVector(
+ platform_handles->begin() + i,
+ platform_handles->begin() + i +
+ embedder::kPlatformChannelMaxNumHandles));
+ fd_message->SetTransportData(
+ make_scoped_ptr(new TransportData(fds.Pass())));
+ RawChannel::EnqueueMessageNoLock(fd_message.Pass());
+ }
+
+ // Remove the handles that we "moved" into the other messages.
+ platform_handles->erase(platform_handles->begin(),
+ platform_handles->begin() + i);
+ }
+ }
+
+ RawChannel::EnqueueMessageNoLock(message.Pass());
+}
+
+bool RawChannelPosix::OnReadMessageForRawChannel(
+ const MessageInTransit::View& message_view) {
+ DCHECK_EQ(message_view.type(), MessageInTransit::kTypeRawChannel);
+
+ if (message_view.subtype() ==
+ MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles) {
+ // We don't need to do anything. |RawChannel| won't extract the platform
+ // handles, and they'll be accumulated in |Read()|.
+ return true;
+ }
+
+ return RawChannel::OnReadMessageForRawChannel(message_view);
+}
+
+RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(!pending_read_);
+
+ char* buffer = NULL;
+ size_t bytes_to_read = 0;
+ read_buffer()->GetBuffer(&buffer, &bytes_to_read);
+
+ size_t old_num_platform_handles = read_platform_handles_.size();
+ ssize_t read_result =
+ embedder::PlatformChannelRecvmsg(fd_.get(),
+ buffer,
+ bytes_to_read,
+ &read_platform_handles_);
+ if (read_platform_handles_.size() > old_num_platform_handles) {
+ DCHECK_LE(read_platform_handles_.size() - old_num_platform_handles,
+ embedder::kPlatformChannelMaxNumHandles);
+
+ // We should never accumulate more than |TransportData::kMaxPlatformHandles
+ // + embedder::kPlatformChannelMaxNumHandles| handles. (The latter part is
+ // possible because we could have accumulated all the handles for a message,
+ // then received the message data plus the first set of handles for the next
+ // message in the subsequent |recvmsg()|.)
+ if (read_platform_handles_.size() > (TransportData::kMaxPlatformHandles +
+ embedder::kPlatformChannelMaxNumHandles)) {
+ LOG(WARNING) << "Received too many platform handles";
+ embedder::CloseAllPlatformHandles(&read_platform_handles_);
+ read_platform_handles_.clear();
+ return IO_FAILED;
+ }
+ }
+
+ if (read_result > 0) {
+ *bytes_read = static_cast<size_t>(read_result);
+ return IO_SUCCEEDED;
+ }
+
+ // |read_result == 0| means "end of file".
+ if (read_result == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
+ PLOG_IF(WARNING, read_result != 0) << "recvmsg";
+
+ // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
+ read_watcher_.reset();
+
+ return IO_FAILED;
+ }
+
+ return ScheduleRead();
+}
+
+RawChannel::IOResult RawChannelPosix::ScheduleRead() {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(!pending_read_);
+
+ pending_read_ = true;
+
+ return IO_PENDING;
+}
+
+embedder::ScopedPlatformHandleVectorPtr RawChannelPosix::GetReadPlatformHandles(
+ size_t num_platform_handles,
+ const void* /*platform_handle_table*/) {
+ DCHECK_GT(num_platform_handles, 0u);
+
+ if (read_platform_handles_.size() < num_platform_handles) {
+ embedder::CloseAllPlatformHandles(&read_platform_handles_);
+ read_platform_handles_.clear();
+ return embedder::ScopedPlatformHandleVectorPtr();
+ }
+
+ embedder::ScopedPlatformHandleVectorPtr rv(
+ new embedder::PlatformHandleVector(num_platform_handles));
+ rv->assign(read_platform_handles_.begin(),
+ read_platform_handles_.begin() + num_platform_handles);
+ read_platform_handles_.erase(
+ read_platform_handles_.begin(),
+ read_platform_handles_.begin() + num_platform_handles);
+ return rv.Pass();
+}
+
+RawChannel::IOResult RawChannelPosix::WriteNoLock(
+ size_t* platform_handles_written,
+ size_t* bytes_written) {
+ write_lock().AssertAcquired();
+
+ DCHECK(!pending_write_);
+
+ size_t num_platform_handles = 0;
+ ssize_t write_result;
+ if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
+ embedder::PlatformHandle* platform_handles;
+ void* serialization_data; // Actually unused.
+ write_buffer_no_lock()->GetPlatformHandlesToSend(&num_platform_handles,
+ &platform_handles,
+ &serialization_data);
+ DCHECK_GT(num_platform_handles, 0u);
+ DCHECK_LE(num_platform_handles, embedder::kPlatformChannelMaxNumHandles);
+ DCHECK(platform_handles);
+
+ // TODO(vtl): Reduce code duplication. (This is duplicated from below.)
+ std::vector<WriteBuffer::Buffer> buffers;
+ write_buffer_no_lock()->GetBuffers(&buffers);
+ DCHECK(!buffers.empty());
+ const size_t kMaxBufferCount = 10;
+ iovec iov[kMaxBufferCount];
+ size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
+ for (size_t i = 0; i < buffer_count; ++i) {
+ iov[i].iov_base = const_cast<char*>(buffers[i].addr);
+ iov[i].iov_len = buffers[i].size;
+ }
+
+ write_result = embedder::PlatformChannelSendmsgWithHandles(
+ fd_.get(), iov, buffer_count, platform_handles, num_platform_handles);
+ for (size_t i = 0; i < num_platform_handles; i++)
+ platform_handles[i].CloseIfNecessary();
+ } else {
+ std::vector<WriteBuffer::Buffer> buffers;
+ write_buffer_no_lock()->GetBuffers(&buffers);
+ DCHECK(!buffers.empty());
+
+ if (buffers.size() == 1) {
+ write_result = embedder::PlatformChannelWrite(fd_.get(), buffers[0].addr,
+ buffers[0].size);
+ } else {
+ const size_t kMaxBufferCount = 10;
+ iovec iov[kMaxBufferCount];
+ size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
+ for (size_t i = 0; i < buffer_count; ++i) {
+ iov[i].iov_base = const_cast<char*>(buffers[i].addr);
+ iov[i].iov_len = buffers[i].size;
+ }
+
+ write_result = embedder::PlatformChannelWritev(fd_.get(), iov,
+ buffer_count);
+ }
+ }
+
+ if (write_result >= 0) {
+ *platform_handles_written = num_platform_handles;
+ *bytes_written = static_cast<size_t>(write_result);
+ return IO_SUCCEEDED;
+ }
+
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PLOG(ERROR) << "sendmsg/write/writev";
+ return IO_FAILED;
+ }
+
+ return ScheduleWriteNoLock();
+}
+
+RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
+ write_lock().AssertAcquired();
+
+ DCHECK(!pending_write_);
+
+ // Set up to wait for the FD to become writable.
+ // If we're not on the I/O thread, we have to post a task to do this.
+ if (base::MessageLoop::current() != message_loop_for_io()) {
+ message_loop_for_io()->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannelPosix::WaitToWrite,
+ weak_ptr_factory_.GetWeakPtr()));
+ pending_write_ = true;
+ return IO_PENDING;
+ }
+
+ if (message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, false,
+ base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this)) {
+ pending_write_ = true;
+ return IO_PENDING;
+ }
+
+ return IO_FAILED;
+}
+
+bool RawChannelPosix::OnInit() {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+
+ DCHECK(!read_watcher_);
+ read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
+ DCHECK(!write_watcher_);
+ write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
+
+ if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true,
+ base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
+ // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
+ // (in the sense of returning the message loop's state to what it was before
+ // it was called).
+ read_watcher_.reset();
+ write_watcher_.reset();
+ return false;
+ }
+
+ return true;
+}
+
+void RawChannelPosix::OnShutdownNoLock(
+ scoped_ptr<ReadBuffer> /*read_buffer*/,
+ scoped_ptr<WriteBuffer> /*write_buffer*/) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ write_lock().AssertAcquired();
+
+ read_watcher_.reset(); // This will stop watching (if necessary).
+ write_watcher_.reset(); // This will stop watching (if necessary).
+
+ pending_read_ = false;
+ pending_write_ = false;
+
+ DCHECK(fd_.is_valid());
+ fd_.reset();
+
+ weak_ptr_factory_.InvalidateWeakPtrs();
+}
+
+void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
+ DCHECK_EQ(fd, fd_.get().fd);
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+
+ if (!pending_read_) {
+ NOTREACHED();
+ return;
+ }
+
+ pending_read_ = false;
+ size_t bytes_read = 0;
+ IOResult result = Read(&bytes_read);
+ if (result != IO_PENDING)
+ OnReadCompleted(result == IO_SUCCEEDED, bytes_read);
+
+ // On failure, |read_watcher_| must have been reset; on success,
+ // we assume that |OnReadCompleted()| always schedules another read.
+ // Otherwise, we could end up spinning -- getting
+ // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual
+ // read.
+ // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't
+ // schedule a new read. But that code won't be reached under the current
+ // RawChannel implementation.
+ DCHECK(!read_watcher_ || pending_read_);
+}
+
+void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
+ DCHECK_EQ(fd, fd_.get().fd);
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+
+ IOResult result = IO_FAILED;
+ size_t platform_handles_written = 0;
+ size_t bytes_written = 0;
+ {
+ base::AutoLock locker(write_lock());
+
+ DCHECK(pending_write_);
+
+ pending_write_ = false;
+ result = WriteNoLock(&platform_handles_written, &bytes_written);
+ }
+
+ if (result != IO_PENDING) {
+ OnWriteCompleted(result == IO_SUCCEEDED,
+ platform_handles_written,
+ bytes_written);
+ }
+}
+
+void RawChannelPosix::WaitToWrite() {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+
+ DCHECK(write_watcher_);
+
+ if (!message_loop_for_io()->WatchFileDescriptor(
+ fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
+ write_watcher_.get(), this)) {
+ {
+ base::AutoLock locker(write_lock());
+
+ DCHECK(pending_write_);
+ pending_write_ = false;
+ }
+ OnWriteCompleted(false, 0, 0);
+ }
+}
+
+} // namespace
+
+// -----------------------------------------------------------------------------
+
+// Static factory method declared in raw_channel.h.
+// static
+scoped_ptr<RawChannel> RawChannel::Create(
+ embedder::ScopedPlatformHandle handle) {
+ return scoped_ptr<RawChannel>(new RawChannelPosix(handle.Pass()));
+}
+
+} // namespace system
+} // namespace mojo