diff options
Diffstat (limited to 'chromium/mojo/spy/spy.cc')
-rw-r--r-- | chromium/mojo/spy/spy.cc | 215 |
1 files changed, 215 insertions, 0 deletions
diff --git a/chromium/mojo/spy/spy.cc b/chromium/mojo/spy/spy.cc new file mode 100644 index 00000000000..3fcc49c3d66 --- /dev/null +++ b/chromium/mojo/spy/spy.cc @@ -0,0 +1,215 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/spy/spy.h" + +#include "base/bind.h" +#include "base/compiler_specific.h" +#include "base/location.h" +#include "base/memory/ref_counted.h" +#include "base/strings/string_number_conversions.h" +#include "base/strings/string_split.h" +#include "base/threading/thread.h" +#include "base/threading/worker_pool.h" + +#include "mojo/public/cpp/system/core.h" +#include "mojo/service_manager/service_manager.h" +#include "mojo/spy/websocket_server.h" + +namespace { + +const size_t kMessageBufSize = 2 * 1024; +const size_t kHandleBufSize = 64; +const int kDefaultWebSocketPort = 42424; + +void CloseHandles(MojoHandle* handles, size_t count) { + for (size_t ix = 0; ix != count; ++count) + MojoClose(handles[ix]); +} + +// In charge of processing messages that flow over a +// single message pipe. +class MessageProcessor : + public base::RefCountedThreadSafe<MessageProcessor> { + public: + + MessageProcessor() + : last_result_(MOJO_RESULT_OK), + bytes_transfered_(0) { + + message_count_[0] = 0; + message_count_[1] = 0; + handle_count_[0] = 0; + handle_count_[1] = 0; + } + + void Start(mojo::ScopedMessagePipeHandle client, + mojo::ScopedMessagePipeHandle interceptor) { + std::vector<mojo::MessagePipeHandle> pipes; + pipes.push_back(client.get()); + pipes.push_back(interceptor.get()); + std::vector<MojoHandleSignals> handle_signals; + handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE); + handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE); + + scoped_ptr<char[]> mbuf(new char[kMessageBufSize]); + scoped_ptr<MojoHandle[]> hbuf(new MojoHandle[kHandleBufSize]); + + // Main processing loop: + // 1- Wait for an endpoint to have a message. + // 2- Read the message + // 3- Log data + // 4- Wait until the opposite port is ready for writting + // 4- Write the message to opposite port. + + for (;;) { + int r = WaitMany(pipes, handle_signals, MOJO_DEADLINE_INDEFINITE); + if ((r < 0) || (r > 1)) { + last_result_ = r; + break; + } + + uint32_t bytes_read = kMessageBufSize; + uint32_t handles_read = kHandleBufSize; + + if (!CheckResult(ReadMessageRaw(pipes[r], + mbuf.get(), &bytes_read, + hbuf.get(), &handles_read, + MOJO_READ_MESSAGE_FLAG_NONE))) + break; + + if (!bytes_read && !handles_read) + continue; + + if (handles_read) + handle_count_[r] += handles_read; + + ++message_count_[r]; + bytes_transfered_ += bytes_read; + + mojo::MessagePipeHandle write_handle = (r == 0) ? pipes[1] : pipes[0]; + if (!CheckResult(Wait(write_handle, + MOJO_HANDLE_SIGNAL_WRITABLE, + MOJO_DEADLINE_INDEFINITE))) + break; + + if (!CheckResult(WriteMessageRaw(write_handle, + mbuf.get(), bytes_read, + hbuf.get(), handles_read, + MOJO_WRITE_MESSAGE_FLAG_NONE))) { + // On failure we own the handles. For now just close them. + if (handles_read) + CloseHandles(hbuf.get(), handles_read); + break; + } + } + } + + private: + friend class base::RefCountedThreadSafe<MessageProcessor>; + virtual ~MessageProcessor() {} + + bool CheckResult(MojoResult mr) { + if (mr == MOJO_RESULT_OK) + return true; + last_result_ = mr; + return false; + } + + MojoResult last_result_; + uint32_t bytes_transfered_; + uint32_t message_count_[2]; + uint32_t handle_count_[2]; +}; + +// In charge of intercepting access to the service manager. +class SpyInterceptor : public mojo::ServiceManager::Interceptor { + private: + virtual mojo::ScopedMessagePipeHandle OnConnectToClient( + const GURL& url, mojo::ScopedMessagePipeHandle real_client) OVERRIDE { + if (!MustIntercept(url)) + return real_client.Pass(); + + // You can get an invalid handle if the app (or service) is + // created by unconventional means, for example the command line. + if (!real_client.is_valid()) + return real_client.Pass(); + + mojo::ScopedMessagePipeHandle faux_client; + mojo::ScopedMessagePipeHandle interceptor; + CreateMessagePipe(NULL, &faux_client, &interceptor); + + scoped_refptr<MessageProcessor> processor = new MessageProcessor(); + base::WorkerPool::PostTask( + FROM_HERE, + base::Bind(&MessageProcessor::Start, + processor, + base::Passed(&real_client), base::Passed(&interceptor)), + true); + + return faux_client.Pass(); + } + + bool MustIntercept(const GURL& url) { + // TODO(cpu): manage who and when to intercept. + return true; + } +}; + +spy::WebSocketServer* ws_server = NULL; + +void StartServer(int port) { + // TODO(cpu) figure out lifetime of the server. See Spy() dtor. + ws_server = new spy::WebSocketServer(port); + ws_server->Start(); +} + +struct SpyOptions { + int websocket_port; + + SpyOptions() + : websocket_port(kDefaultWebSocketPort) { + } +}; + +SpyOptions ProcessOptions(const std::string& options) { + SpyOptions spy_options; + if (options.empty()) + return spy_options; + base::StringPairs kv_pairs; + base::SplitStringIntoKeyValuePairs(options, ':', ',', &kv_pairs); + base::StringPairs::iterator it = kv_pairs.begin(); + for (; it != kv_pairs.end(); ++it) { + if (it->first == "port") { + int port; + if (base::StringToInt(it->second, &port)) + spy_options.websocket_port = port; + } + } + return spy_options; +} + +} // namespace + +namespace mojo { + +Spy::Spy(mojo::ServiceManager* service_manager, const std::string& options) { + SpyOptions spy_options = ProcessOptions(options); + // Start the tread what will accept commands from the frontend. + control_thread_.reset(new base::Thread("mojo_spy_control_thread")); + base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0); + control_thread_->StartWithOptions(thread_options); + control_thread_->message_loop_proxy()->PostTask( + FROM_HERE, base::Bind(&StartServer, spy_options.websocket_port)); + + // Start intercepting mojo services. + service_manager->SetInterceptor(new SpyInterceptor()); +} + +Spy::~Spy(){ + // TODO(cpu): Do not leak the interceptor. Lifetime between the + // service_manager and the spy is still unclear hence the leak. +} + +} // namespace mojo |