diff options
Diffstat (limited to 'chromium/mojo/system/local_message_pipe_endpoint.cc')
-rw-r--r-- | chromium/mojo/system/local_message_pipe_endpoint.cc | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/chromium/mojo/system/local_message_pipe_endpoint.cc b/chromium/mojo/system/local_message_pipe_endpoint.cc new file mode 100644 index 00000000000..09decd3fabb --- /dev/null +++ b/chromium/mojo/system/local_message_pipe_endpoint.cc @@ -0,0 +1,165 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/local_message_pipe_endpoint.h" + +#include <string.h> + +#include "base/logging.h" +#include "mojo/system/dispatcher.h" +#include "mojo/system/message_in_transit.h" + +namespace mojo { +namespace system { + +LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() + : is_open_(true), + is_peer_open_(true) { +} + +LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { + DCHECK(!is_open_); + DCHECK(message_queue_.IsEmpty()); // Should be implied by not being open. +} + +MessagePipeEndpoint::Type LocalMessagePipeEndpoint::GetType() const { + return kTypeLocal; +} + +bool LocalMessagePipeEndpoint::OnPeerClose() { + DCHECK(is_open_); + DCHECK(is_peer_open_); + + HandleSignalsState old_state = GetHandleSignalsState(); + is_peer_open_ = false; + HandleSignalsState new_state = GetHandleSignalsState(); + + if (!new_state.equals(old_state)) + waiter_list_.AwakeWaitersForStateChange(new_state); + + return true; +} + +void LocalMessagePipeEndpoint::EnqueueMessage( + scoped_ptr<MessageInTransit> message) { + DCHECK(is_open_); + DCHECK(is_peer_open_); + + bool was_empty = message_queue_.IsEmpty(); + message_queue_.AddMessage(message.Pass()); + if (was_empty) + waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState()); +} + +void LocalMessagePipeEndpoint::Close() { + DCHECK(is_open_); + is_open_ = false; + message_queue_.Clear(); +} + +void LocalMessagePipeEndpoint::CancelAllWaiters() { + DCHECK(is_open_); + waiter_list_.CancelAllWaiters(); +} + +MojoResult LocalMessagePipeEndpoint::ReadMessage(void* bytes, + uint32_t* num_bytes, + DispatcherVector* dispatchers, + uint32_t* num_dispatchers, + MojoReadMessageFlags flags) { + DCHECK(is_open_); + DCHECK(!dispatchers || dispatchers->empty()); + + const uint32_t max_bytes = num_bytes ? *num_bytes : 0; + const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; + + if (message_queue_.IsEmpty()) { + return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT : + MOJO_RESULT_FAILED_PRECONDITION; + } + + // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop + // and release the lock immediately. + bool enough_space = true; + MessageInTransit* message = message_queue_.PeekMessage(); + if (num_bytes) + *num_bytes = message->num_bytes(); + if (message->num_bytes() <= max_bytes) + memcpy(bytes, message->bytes(), message->num_bytes()); + else + enough_space = false; + + if (DispatcherVector* queued_dispatchers = message->dispatchers()) { + if (num_dispatchers) + *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); + if (enough_space) { + if (queued_dispatchers->empty()) { + // Nothing to do. + } else if (queued_dispatchers->size() <= max_num_dispatchers) { + DCHECK(dispatchers); + dispatchers->swap(*queued_dispatchers); + } else { + enough_space = false; + } + } + } else { + if (num_dispatchers) + *num_dispatchers = 0; + } + + message = NULL; + + if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { + message_queue_.DiscardMessage(); + + // Now it's empty, thus no longer readable. + if (message_queue_.IsEmpty()) { + // It's currently not possible to wait for non-readability, but we should + // do the state change anyway. + waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState()); + } + } + + if (!enough_space) + return MOJO_RESULT_RESOURCE_EXHAUSTED; + + return MOJO_RESULT_OK; +} + +MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter, + MojoHandleSignals signals, + uint32_t context) { + DCHECK(is_open_); + + HandleSignalsState state = GetHandleSignalsState(); + if (state.satisfies(signals)) + return MOJO_RESULT_ALREADY_EXISTS; + if (!state.can_satisfy(signals)) + return MOJO_RESULT_FAILED_PRECONDITION; + + waiter_list_.AddWaiter(waiter, signals, context); + return MOJO_RESULT_OK; +} + +void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) { + DCHECK(is_open_); + waiter_list_.RemoveWaiter(waiter); +} + +HandleSignalsState LocalMessagePipeEndpoint::GetHandleSignalsState() { + HandleSignalsState rv; + if (!message_queue_.IsEmpty()) { + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; + } + if (is_peer_open_) { + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; + rv.satisfiable_signals |= + MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE; + } + return rv; +} + +} // namespace system +} // namespace mojo |