// Copyright 2019 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/message_loop/message_pump_kqueue.h" #include #include "base/auto_reset.h" #include "base/logging.h" #include "base/mac/mac_util.h" #include "base/mac/mach_logging.h" #include "base/mac/scoped_nsautorelease_pool.h" #include "base/posix/eintr_wrapper.h" namespace base { namespace { // Prior to macOS 10.12, a kqueue could not watch individual Mach ports, only // port sets. MessagePumpKqueue will directly use Mach ports in the kqueue if // it is possible. bool KqueueNeedsPortSet() { static bool kqueue_needs_port_set = mac::IsAtMostOS10_11(); return kqueue_needs_port_set; } int ChangeOneEvent(const ScopedFD& kqueue, kevent64_s* event) { return HANDLE_EINTR(kevent64(kqueue.get(), event, 1, nullptr, 0, 0, nullptr)); } } // namespace MessagePumpKqueue::FdWatchController::FdWatchController( const Location& from_here) : FdWatchControllerInterface(from_here) {} MessagePumpKqueue::FdWatchController::~FdWatchController() { StopWatchingFileDescriptor(); } bool MessagePumpKqueue::FdWatchController::StopWatchingFileDescriptor() { if (!pump_) return true; return pump_->StopWatchingFileDescriptor(this); } void MessagePumpKqueue::FdWatchController::Init(WeakPtr pump, int fd, int mode, FdWatcher* watcher) { DCHECK_NE(fd, -1); DCHECK(!watcher_); DCHECK(watcher); DCHECK(pump); fd_ = fd; mode_ = mode; watcher_ = watcher; pump_ = pump; } void MessagePumpKqueue::FdWatchController::Reset() { fd_ = -1; mode_ = 0; watcher_ = nullptr; pump_ = nullptr; } MessagePumpKqueue::MachPortWatchController::MachPortWatchController( const Location& from_here) : from_here_(from_here) {} MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() { StopWatchingMachPort(); } bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() { if (!pump_) return true; return pump_->StopWatchingMachPort(this); } void MessagePumpKqueue::MachPortWatchController::Init( WeakPtr pump, mach_port_t port, MachPortWatcher* watcher) { DCHECK(!watcher_); DCHECK(watcher); DCHECK(pump); port_ = port; watcher_ = watcher; pump_ = pump; } void MessagePumpKqueue::MachPortWatchController::Reset() { port_ = MACH_PORT_NULL; watcher_ = nullptr; pump_ = nullptr; } MessagePumpKqueue::MessagePumpKqueue() : kqueue_(kqueue()), weak_factory_(this) { PCHECK(kqueue_.is_valid()) << "kqueue"; // Create a Mach port that will be used to wake up the pump by sending // a message in response to ScheduleWork(). This is significantly faster than // using an EVFILT_USER event, especially when triggered across threads. kern_return_t kr = mach_port_allocate( mach_task_self(), MACH_PORT_RIGHT_RECEIVE, base::mac::ScopedMachReceiveRight::Receiver(wakeup_).get()); MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate"; kevent64_s event{}; if (KqueueNeedsPortSet()) { kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, mac::ScopedMachPortSet::Receiver(port_set_).get()); MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate PORT_SET"; kr = mach_port_insert_member(mach_task_self(), wakeup_.get(), port_set_.get()); MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_insert_member"; event.ident = port_set_.get(); event.filter = EVFILT_MACHPORT; event.flags = EV_ADD; } else { // When not using a port set, the wakeup port event can be specified to // directly receive the Mach message as part of the kevent64() syscall. // This is not done when using a port set, since that would potentially // receive client MachPortWatchers' messages. event.ident = wakeup_.get(); event.filter = EVFILT_MACHPORT; event.flags = EV_ADD; event.fflags = MACH_RCV_MSG; event.ext[0] = reinterpret_cast(&wakeup_buffer_); event.ext[1] = sizeof(wakeup_buffer_); } int rv = ChangeOneEvent(kqueue_, &event); PCHECK(rv == 0) << "kevent64"; } MessagePumpKqueue::~MessagePumpKqueue() {} void MessagePumpKqueue::Run(Delegate* delegate) { AutoReset reset_keep_running(&keep_running_, true); while (keep_running_) { mac::ScopedNSAutoreleasePool pool; bool do_more_work = DoInternalWork(nullptr); if (!keep_running_) break; Delegate::NextWorkInfo next_work_info = delegate->DoSomeWork(); do_more_work |= next_work_info.is_immediate(); if (!keep_running_) break; if (do_more_work) continue; do_more_work |= delegate->DoIdleWork(); if (!keep_running_) break; if (do_more_work) continue; DoInternalWork(&next_work_info); } } void MessagePumpKqueue::Quit() { keep_running_ = false; ScheduleWork(); } void MessagePumpKqueue::ScheduleWork() { mach_msg_empty_send_t message{}; message.header.msgh_size = sizeof(message); message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE); message.header.msgh_remote_port = wakeup_.get(); kern_return_t kr = mach_msg_send(&message.header); if (kr != KERN_SUCCESS) { // If ScheduleWork() is being called by other threads faster than the pump // can dispatch work, the kernel message queue for the wakeup port can fill // up (this happens under base_perftests, for example). The kernel does // return a SEND_ONCE right in the case of failure, which must be destroyed // to avoid leaking. MACH_DLOG_IF(ERROR, (kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER, kr) << "mach_msg_send"; mach_msg_destroy(&message.header); } } void MessagePumpKqueue::ScheduleDelayedWork( const TimeTicks& delayed_work_time) { // Nothing to do. This MessagePump uses DoSomeWork(). } bool MessagePumpKqueue::WatchMachReceivePort( mach_port_t port, MachPortWatchController* controller, MachPortWatcher* delegate) { DCHECK(port != MACH_PORT_NULL); DCHECK(controller); DCHECK(delegate); if (controller->port() != MACH_PORT_NULL) { DLOG(ERROR) << "Cannot use the same MachPortWatchController while it is active"; return false; } if (KqueueNeedsPortSet()) { kern_return_t kr = mach_port_insert_member(mach_task_self(), port, port_set_.get()); if (kr != KERN_SUCCESS) { MACH_LOG(ERROR, kr) << "mach_port_insert_member"; return false; } } else { kevent64_s event{}; event.ident = port; event.filter = EVFILT_MACHPORT; event.flags = EV_ADD; int rv = ChangeOneEvent(kqueue_, &event); if (rv < 0) { DPLOG(ERROR) << "kevent64"; return false; } ++event_count_; } controller->Init(weak_factory_.GetWeakPtr(), port, delegate); port_controllers_.AddWithID(controller, port); return true; } bool MessagePumpKqueue::WatchFileDescriptor(int fd, bool persistent, int mode, FdWatchController* controller, FdWatcher* delegate) { DCHECK_GE(fd, 0); DCHECK(controller); DCHECK(delegate); DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0); if (controller->fd() != -1 && controller->fd() != fd) { DLOG(ERROR) << "Cannot use the same FdWatchController on two different FDs"; return false; } StopWatchingFileDescriptor(controller); std::vector events; kevent64_s base_event{}; base_event.ident = fd; base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0); if (mode & Mode::WATCH_READ) { base_event.filter = EVFILT_READ; base_event.udata = fd_controllers_.Add(controller); events.push_back(base_event); } if (mode & Mode::WATCH_WRITE) { base_event.filter = EVFILT_WRITE; base_event.udata = fd_controllers_.Add(controller); events.push_back(base_event); } int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(), events.size(), nullptr, 0, 0, nullptr)); if (rv < 0) { DPLOG(ERROR) << "WatchFileDescriptor kevent64"; return false; } event_count_ += events.size(); controller->Init(weak_factory_.GetWeakPtr(), fd, mode, delegate); return true; } bool MessagePumpKqueue::StopWatchingMachPort( MachPortWatchController* controller) { mach_port_t port = controller->port(); controller->Reset(); port_controllers_.Remove(port); if (KqueueNeedsPortSet()) { kern_return_t kr = mach_port_extract_member(mach_task_self(), port, port_set_.get()); if (kr != KERN_SUCCESS) { MACH_LOG(ERROR, kr) << "mach_port_extract_member"; return false; } } else { kevent64_s event{}; event.ident = port; event.filter = EVFILT_MACHPORT; event.flags = EV_DELETE; --event_count_; int rv = ChangeOneEvent(kqueue_, &event); if (rv < 0) { DPLOG(ERROR) << "kevent64"; return false; } } return true; } bool MessagePumpKqueue::StopWatchingFileDescriptor( FdWatchController* controller) { int fd = controller->fd(); int mode = controller->mode(); controller->Reset(); if (fd == -1) return true; std::vector events; kevent64_s base_event{}; base_event.ident = fd; base_event.flags = EV_DELETE; if (mode & Mode::WATCH_READ) { base_event.filter = EVFILT_READ; events.push_back(base_event); } if (mode & Mode::WATCH_WRITE) { base_event.filter = EVFILT_WRITE; events.push_back(base_event); } int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(), events.size(), nullptr, 0, 0, nullptr)); DPLOG_IF(ERROR, rv < 0) << "StopWatchingFileDescriptor kevent64"; // The keys for the IDMap aren't recorded anywhere (they're attached to the // kevent object in the kernel), so locate the entries by controller pointer. for (auto it = IDMap::iterator(&fd_controllers_); !it.IsAtEnd(); it.Advance()) { if (it.GetCurrentValue() == controller) { fd_controllers_.Remove(it.GetCurrentKey()); } } event_count_ -= events.size(); return rv >= 0; } bool MessagePumpKqueue::DoInternalWork(Delegate::NextWorkInfo* next_work_info) { if (events_.size() < event_count_) { events_.resize(event_count_); } bool poll = next_work_info == nullptr; int flags = poll ? KEVENT_FLAG_IMMEDIATE : 0; bool indefinite = next_work_info != nullptr && next_work_info->delayed_run_time.is_max(); int rv = 0; do { timespec timeout{}; if (!indefinite && !poll) { if (rv != 0) { // The wait was interrupted and made |next_work_info|'s view of // TimeTicks::Now() stale. Refresh it before doing another wait. next_work_info->recent_now = TimeTicks::Now(); } timeout = next_work_info->remaining_delay().ToTimeSpec(); } // This does not use HANDLE_EINTR, since retrying the syscall requires // adjusting the timeout to account for time already waited. rv = kevent64(kqueue_.get(), nullptr, 0, events_.data(), events_.size(), flags, indefinite ? nullptr : &timeout); } while (rv < 0 && errno == EINTR); PCHECK(rv >= 0) << "kevent64"; return ProcessEvents(rv); } bool MessagePumpKqueue::ProcessEvents(int count) { bool did_work = false; for (int i = 0; i < count; ++i) { auto* event = &events_[i]; if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) { did_work = true; FdWatchController* controller = fd_controllers_.Lookup(event->udata); if (!controller) { // The controller was removed by some other work callout before // this event could be processed. continue; } FdWatcher* delegate = controller->watcher(); if (event->flags & EV_ONESHOT) { // If this was a one-shot event, the Controller needs to stop tracking // the descriptor, so it is not double-removed when it is told to stop // watching. controller->Reset(); fd_controllers_.Remove(event->udata); --event_count_; } if (event->filter == EVFILT_READ) { delegate->OnFileCanReadWithoutBlocking(event->ident); } else if (event->filter == EVFILT_WRITE) { delegate->OnFileCanWriteWithoutBlocking(event->ident); } } else if (event->filter == EVFILT_MACHPORT) { mach_port_t port = KqueueNeedsPortSet() ? event->data : event->ident; if (port == wakeup_.get()) { // The wakeup event has been received, do not treat this as "doing // work", this just wakes up the pump. if (KqueueNeedsPortSet()) { // When using the kqueue directly, the message can be received // straight into a buffer that was created when adding the event. // But when using a port set, the message must be drained manually. wakeup_buffer_.header.msgh_local_port = port; wakeup_buffer_.header.msgh_size = sizeof(wakeup_buffer_); kern_return_t kr = mach_msg_receive(&wakeup_buffer_.header); MACH_LOG_IF(ERROR, kr != KERN_SUCCESS, kr) << "mach_msg_receive wakeup"; } continue; } did_work = true; MachPortWatchController* controller = port_controllers_.Lookup(port); // The controller could have been removed by some other work callout // before this event could be processed. if (controller) { controller->watcher()->OnMachMessageReceived(port); } } else { NOTREACHED() << "Unexpected event for filter " << event->filter; } } return did_work; } } // namespace base