diff options
Diffstat (limited to 'chromium/mojo/system/raw_channel_posix.cc')
-rw-r--r-- | chromium/mojo/system/raw_channel_posix.cc | 469 |
1 files changed, 469 insertions, 0 deletions
diff --git a/chromium/mojo/system/raw_channel_posix.cc b/chromium/mojo/system/raw_channel_posix.cc new file mode 100644 index 00000000000..ba23599f8f7 --- /dev/null +++ b/chromium/mojo/system/raw_channel_posix.cc @@ -0,0 +1,469 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/raw_channel.h" + +#include <errno.h> +#include <sys/uio.h> +#include <unistd.h> + +#include <algorithm> +#include <deque> + +#include "base/basictypes.h" +#include "base/bind.h" +#include "base/compiler_specific.h" +#include "base/location.h" +#include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" +#include "base/message_loop/message_loop.h" +#include "base/synchronization/lock.h" +#include "mojo/embedder/platform_channel_utils_posix.h" +#include "mojo/embedder/platform_handle.h" +#include "mojo/embedder/platform_handle_vector.h" +#include "mojo/system/transport_data.h" + +namespace mojo { +namespace system { + +namespace { + +class RawChannelPosix : public RawChannel, + public base::MessageLoopForIO::Watcher { + public: + explicit RawChannelPosix(embedder::ScopedPlatformHandle handle); + virtual ~RawChannelPosix(); + + // |RawChannel| public methods: + virtual size_t GetSerializedPlatformHandleSize() const OVERRIDE; + + private: + // |RawChannel| protected methods: + // Actually override this so that we can send multiple messages with (only) + // FDs if necessary. + virtual void EnqueueMessageNoLock( + scoped_ptr<MessageInTransit> message) OVERRIDE; + // Override this to handle those extra FD-only messages. + virtual bool OnReadMessageForRawChannel( + const MessageInTransit::View& message_view) OVERRIDE; + virtual IOResult Read(size_t* bytes_read) OVERRIDE; + virtual IOResult ScheduleRead() OVERRIDE; + virtual embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles( + size_t num_platform_handles, + const void* platform_handle_table) OVERRIDE; + virtual IOResult WriteNoLock(size_t* platform_handles_written, + size_t* bytes_written) OVERRIDE; + virtual IOResult ScheduleWriteNoLock() OVERRIDE; + virtual bool OnInit() OVERRIDE; + virtual void OnShutdownNoLock( + scoped_ptr<ReadBuffer> read_buffer, + scoped_ptr<WriteBuffer> write_buffer) OVERRIDE; + + // |base::MessageLoopForIO::Watcher| implementation: + virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; + virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; + + // Watches for |fd_| to become writable. Must be called on the I/O thread. + void WaitToWrite(); + + embedder::ScopedPlatformHandle fd_; + + // The following members are only used on the I/O thread: + scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; + scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; + + bool pending_read_; + + std::deque<embedder::PlatformHandle> read_platform_handles_; + + // The following members are used on multiple threads and protected by + // |write_lock()|: + bool pending_write_; + + // This is used for posting tasks from write threads to the I/O thread. It + // must only be accessed under |write_lock_|. The weak pointers it produces + // are only used/invalidated on the I/O thread. + base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; + + DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); +}; + +RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle) + : fd_(handle.Pass()), + pending_read_(false), + pending_write_(false), + weak_ptr_factory_(this) { + DCHECK(fd_.is_valid()); +} + +RawChannelPosix::~RawChannelPosix() { + DCHECK(!pending_read_); + DCHECK(!pending_write_); + + // No need to take the |write_lock()| here -- if there are still weak pointers + // outstanding, then we're hosed anyway (since we wouldn't be able to + // invalidate them cleanly, since we might not be on the I/O thread). + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); + + // These must have been shut down/destroyed on the I/O thread. + DCHECK(!read_watcher_); + DCHECK(!write_watcher_); + + embedder::CloseAllPlatformHandles(&read_platform_handles_); +} + +size_t RawChannelPosix::GetSerializedPlatformHandleSize() const { + // We don't actually need any space on POSIX (since we just send FDs). + return 0; +} + +void RawChannelPosix::EnqueueMessageNoLock( + scoped_ptr<MessageInTransit> message) { + if (message->transport_data()) { + embedder::PlatformHandleVector* const platform_handles = + message->transport_data()->platform_handles(); + if (platform_handles && + platform_handles->size() > embedder::kPlatformChannelMaxNumHandles) { + // We can't attach all the FDs to a single message, so we have to "split" + // the message. Send as many control messages as needed first with FDs + // attached (and no data). + size_t i = 0; + for (; platform_handles->size() - i > + embedder::kPlatformChannelMaxNumHandles; + i += embedder::kPlatformChannelMaxNumHandles) { + scoped_ptr<MessageInTransit> fd_message( + new MessageInTransit( + MessageInTransit::kTypeRawChannel, + MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles, + 0, + NULL)); + embedder::ScopedPlatformHandleVectorPtr fds( + new embedder::PlatformHandleVector( + platform_handles->begin() + i, + platform_handles->begin() + i + + embedder::kPlatformChannelMaxNumHandles)); + fd_message->SetTransportData( + make_scoped_ptr(new TransportData(fds.Pass()))); + RawChannel::EnqueueMessageNoLock(fd_message.Pass()); + } + + // Remove the handles that we "moved" into the other messages. + platform_handles->erase(platform_handles->begin(), + platform_handles->begin() + i); + } + } + + RawChannel::EnqueueMessageNoLock(message.Pass()); +} + +bool RawChannelPosix::OnReadMessageForRawChannel( + const MessageInTransit::View& message_view) { + DCHECK_EQ(message_view.type(), MessageInTransit::kTypeRawChannel); + + if (message_view.subtype() == + MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles) { + // We don't need to do anything. |RawChannel| won't extract the platform + // handles, and they'll be accumulated in |Read()|. + return true; + } + + return RawChannel::OnReadMessageForRawChannel(message_view); +} + +RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + DCHECK(!pending_read_); + + char* buffer = NULL; + size_t bytes_to_read = 0; + read_buffer()->GetBuffer(&buffer, &bytes_to_read); + + size_t old_num_platform_handles = read_platform_handles_.size(); + ssize_t read_result = + embedder::PlatformChannelRecvmsg(fd_.get(), + buffer, + bytes_to_read, + &read_platform_handles_); + if (read_platform_handles_.size() > old_num_platform_handles) { + DCHECK_LE(read_platform_handles_.size() - old_num_platform_handles, + embedder::kPlatformChannelMaxNumHandles); + + // We should never accumulate more than |TransportData::kMaxPlatformHandles + // + embedder::kPlatformChannelMaxNumHandles| handles. (The latter part is + // possible because we could have accumulated all the handles for a message, + // then received the message data plus the first set of handles for the next + // message in the subsequent |recvmsg()|.) + if (read_platform_handles_.size() > (TransportData::kMaxPlatformHandles + + embedder::kPlatformChannelMaxNumHandles)) { + LOG(WARNING) << "Received too many platform handles"; + embedder::CloseAllPlatformHandles(&read_platform_handles_); + read_platform_handles_.clear(); + return IO_FAILED; + } + } + + if (read_result > 0) { + *bytes_read = static_cast<size_t>(read_result); + return IO_SUCCEEDED; + } + + // |read_result == 0| means "end of file". + if (read_result == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) { + PLOG_IF(WARNING, read_result != 0) << "recvmsg"; + + // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. + read_watcher_.reset(); + + return IO_FAILED; + } + + return ScheduleRead(); +} + +RawChannel::IOResult RawChannelPosix::ScheduleRead() { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + DCHECK(!pending_read_); + + pending_read_ = true; + + return IO_PENDING; +} + +embedder::ScopedPlatformHandleVectorPtr RawChannelPosix::GetReadPlatformHandles( + size_t num_platform_handles, + const void* /*platform_handle_table*/) { + DCHECK_GT(num_platform_handles, 0u); + + if (read_platform_handles_.size() < num_platform_handles) { + embedder::CloseAllPlatformHandles(&read_platform_handles_); + read_platform_handles_.clear(); + return embedder::ScopedPlatformHandleVectorPtr(); + } + + embedder::ScopedPlatformHandleVectorPtr rv( + new embedder::PlatformHandleVector(num_platform_handles)); + rv->assign(read_platform_handles_.begin(), + read_platform_handles_.begin() + num_platform_handles); + read_platform_handles_.erase( + read_platform_handles_.begin(), + read_platform_handles_.begin() + num_platform_handles); + return rv.Pass(); +} + +RawChannel::IOResult RawChannelPosix::WriteNoLock( + size_t* platform_handles_written, + size_t* bytes_written) { + write_lock().AssertAcquired(); + + DCHECK(!pending_write_); + + size_t num_platform_handles = 0; + ssize_t write_result; + if (write_buffer_no_lock()->HavePlatformHandlesToSend()) { + embedder::PlatformHandle* platform_handles; + void* serialization_data; // Actually unused. + write_buffer_no_lock()->GetPlatformHandlesToSend(&num_platform_handles, + &platform_handles, + &serialization_data); + DCHECK_GT(num_platform_handles, 0u); + DCHECK_LE(num_platform_handles, embedder::kPlatformChannelMaxNumHandles); + DCHECK(platform_handles); + + // TODO(vtl): Reduce code duplication. (This is duplicated from below.) + std::vector<WriteBuffer::Buffer> buffers; + write_buffer_no_lock()->GetBuffers(&buffers); + DCHECK(!buffers.empty()); + const size_t kMaxBufferCount = 10; + iovec iov[kMaxBufferCount]; + size_t buffer_count = std::min(buffers.size(), kMaxBufferCount); + for (size_t i = 0; i < buffer_count; ++i) { + iov[i].iov_base = const_cast<char*>(buffers[i].addr); + iov[i].iov_len = buffers[i].size; + } + + write_result = embedder::PlatformChannelSendmsgWithHandles( + fd_.get(), iov, buffer_count, platform_handles, num_platform_handles); + for (size_t i = 0; i < num_platform_handles; i++) + platform_handles[i].CloseIfNecessary(); + } else { + std::vector<WriteBuffer::Buffer> buffers; + write_buffer_no_lock()->GetBuffers(&buffers); + DCHECK(!buffers.empty()); + + if (buffers.size() == 1) { + write_result = embedder::PlatformChannelWrite(fd_.get(), buffers[0].addr, + buffers[0].size); + } else { + const size_t kMaxBufferCount = 10; + iovec iov[kMaxBufferCount]; + size_t buffer_count = std::min(buffers.size(), kMaxBufferCount); + for (size_t i = 0; i < buffer_count; ++i) { + iov[i].iov_base = const_cast<char*>(buffers[i].addr); + iov[i].iov_len = buffers[i].size; + } + + write_result = embedder::PlatformChannelWritev(fd_.get(), iov, + buffer_count); + } + } + + if (write_result >= 0) { + *platform_handles_written = num_platform_handles; + *bytes_written = static_cast<size_t>(write_result); + return IO_SUCCEEDED; + } + + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PLOG(ERROR) << "sendmsg/write/writev"; + return IO_FAILED; + } + + return ScheduleWriteNoLock(); +} + +RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { + write_lock().AssertAcquired(); + + DCHECK(!pending_write_); + + // Set up to wait for the FD to become writable. + // If we're not on the I/O thread, we have to post a task to do this. + if (base::MessageLoop::current() != message_loop_for_io()) { + message_loop_for_io()->PostTask( + FROM_HERE, + base::Bind(&RawChannelPosix::WaitToWrite, + weak_ptr_factory_.GetWeakPtr())); + pending_write_ = true; + return IO_PENDING; + } + + if (message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, false, + base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this)) { + pending_write_ = true; + return IO_PENDING; + } + + return IO_FAILED; +} + +bool RawChannelPosix::OnInit() { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + + DCHECK(!read_watcher_); + read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); + DCHECK(!write_watcher_); + write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); + + if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, + base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { + // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly + // (in the sense of returning the message loop's state to what it was before + // it was called). + read_watcher_.reset(); + write_watcher_.reset(); + return false; + } + + return true; +} + +void RawChannelPosix::OnShutdownNoLock( + scoped_ptr<ReadBuffer> /*read_buffer*/, + scoped_ptr<WriteBuffer> /*write_buffer*/) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + write_lock().AssertAcquired(); + + read_watcher_.reset(); // This will stop watching (if necessary). + write_watcher_.reset(); // This will stop watching (if necessary). + + pending_read_ = false; + pending_write_ = false; + + DCHECK(fd_.is_valid()); + fd_.reset(); + + weak_ptr_factory_.InvalidateWeakPtrs(); +} + +void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { + DCHECK_EQ(fd, fd_.get().fd); + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + + if (!pending_read_) { + NOTREACHED(); + return; + } + + pending_read_ = false; + size_t bytes_read = 0; + IOResult result = Read(&bytes_read); + if (result != IO_PENDING) + OnReadCompleted(result == IO_SUCCEEDED, bytes_read); + + // On failure, |read_watcher_| must have been reset; on success, + // we assume that |OnReadCompleted()| always schedules another read. + // Otherwise, we could end up spinning -- getting + // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual + // read. + // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't + // schedule a new read. But that code won't be reached under the current + // RawChannel implementation. + DCHECK(!read_watcher_ || pending_read_); +} + +void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { + DCHECK_EQ(fd, fd_.get().fd); + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + + IOResult result = IO_FAILED; + size_t platform_handles_written = 0; + size_t bytes_written = 0; + { + base::AutoLock locker(write_lock()); + + DCHECK(pending_write_); + + pending_write_ = false; + result = WriteNoLock(&platform_handles_written, &bytes_written); + } + + if (result != IO_PENDING) { + OnWriteCompleted(result == IO_SUCCEEDED, + platform_handles_written, + bytes_written); + } +} + +void RawChannelPosix::WaitToWrite() { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + + DCHECK(write_watcher_); + + if (!message_loop_for_io()->WatchFileDescriptor( + fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, + write_watcher_.get(), this)) { + { + base::AutoLock locker(write_lock()); + + DCHECK(pending_write_); + pending_write_ = false; + } + OnWriteCompleted(false, 0, 0); + } +} + +} // namespace + +// ----------------------------------------------------------------------------- + +// Static factory method declared in raw_channel.h. +// static +scoped_ptr<RawChannel> RawChannel::Create( + embedder::ScopedPlatformHandle handle) { + return scoped_ptr<RawChannel>(new RawChannelPosix(handle.Pass())); +} + +} // namespace system +} // namespace mojo |