diff options
author | Zeno Albisser <zeno.albisser@digia.com> | 2013-08-15 21:46:11 +0200 |
---|---|---|
committer | Zeno Albisser <zeno.albisser@digia.com> | 2013-08-15 21:46:11 +0200 |
commit | 679147eead574d186ebf3069647b4c23e8ccace6 (patch) | |
tree | fc247a0ac8ff119f7c8550879ebb6d3dd8d1ff69 /chromium/content/browser/streams/stream.cc |
Initial import.
Diffstat (limited to 'chromium/content/browser/streams/stream.cc')
-rw-r--r-- | chromium/content/browser/streams/stream.cc | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/chromium/content/browser/streams/stream.cc b/chromium/content/browser/streams/stream.cc new file mode 100644 index 00000000000..6026df9e7c7 --- /dev/null +++ b/chromium/content/browser/streams/stream.cc @@ -0,0 +1,156 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/browser/streams/stream.h" + +#include "base/bind.h" +#include "base/location.h" +#include "base/message_loop/message_loop_proxy.h" +#include "content/browser/streams/stream_handle_impl.h" +#include "content/browser/streams/stream_read_observer.h" +#include "content/browser/streams/stream_registry.h" +#include "content/browser/streams/stream_write_observer.h" +#include "net/base/io_buffer.h" + +namespace { +// Start throttling the connection at about 1MB. +const size_t kDeferSizeThreshold = 40 * 32768; +} + +namespace content { + +Stream::Stream(StreamRegistry* registry, + StreamWriteObserver* write_observer, + const GURL& url) + : data_bytes_read_(0), + can_add_data_(true), + url_(url), + data_length_(0), + registry_(registry), + read_observer_(NULL), + write_observer_(write_observer), + stream_handle_(NULL), + weak_ptr_factory_(this) { + CreateByteStream(base::MessageLoopProxy::current(), + base::MessageLoopProxy::current(), + kDeferSizeThreshold, + &writer_, + &reader_); + + // Setup callback for writing. + writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, + weak_ptr_factory_.GetWeakPtr())); + reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, + weak_ptr_factory_.GetWeakPtr())); + + registry_->RegisterStream(this); +} + +Stream::~Stream() { +} + +bool Stream::SetReadObserver(StreamReadObserver* observer) { + if (read_observer_) + return false; + read_observer_ = observer; + return true; +} + +void Stream::RemoveReadObserver(StreamReadObserver* observer) { + DCHECK(observer == read_observer_); + read_observer_ = NULL; +} + +void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { + DCHECK(observer == write_observer_); + write_observer_ = NULL; +} + +void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { + can_add_data_ = writer_->Write(buffer, size); +} + +void Stream::AddData(const char* data, size_t size) { + scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size)); + memcpy(io_buffer->data(), data, size); + can_add_data_ = writer_->Write(io_buffer, size); +} + +void Stream::Finalize() { + writer_->Close(0); + writer_.reset(NULL); + + // Continue asynchronously. + base::MessageLoopProxy::current()->PostTask( + FROM_HERE, + base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); +} + +Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, + int buf_size, + int* bytes_read) { + DCHECK(buf); + DCHECK(bytes_read); + + *bytes_read = 0; + if (!data_.get()) { + data_length_ = 0; + data_bytes_read_ = 0; + ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); + switch (state) { + case ByteStreamReader::STREAM_HAS_DATA: + break; + case ByteStreamReader::STREAM_COMPLETE: + registry_->UnregisterStream(url()); + return STREAM_COMPLETE; + case ByteStreamReader::STREAM_EMPTY: + return STREAM_EMPTY; + } + } + + const size_t remaining_bytes = data_length_ - data_bytes_read_; + size_t to_read = + static_cast<size_t>(buf_size) < remaining_bytes ? + buf_size : remaining_bytes; + memcpy(buf->data(), data_->data() + data_bytes_read_, to_read); + data_bytes_read_ += to_read; + if (data_bytes_read_ >= data_length_) + data_ = NULL; + + *bytes_read = to_read; + return STREAM_HAS_DATA; +} + +scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url, + const std::string& mime_type) { + CHECK(!stream_handle_); + stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(), + original_url, + mime_type); + return scoped_ptr<StreamHandle>(stream_handle_).Pass(); +} + +void Stream::CloseHandle() { + // Prevent deletion until this function ends. + scoped_refptr<Stream> ref(this); + + CHECK(stream_handle_); + stream_handle_ = NULL; + registry_->UnregisterStream(url()); + if (write_observer_) + write_observer_->OnClose(this); +} + +void Stream::OnSpaceAvailable() { + can_add_data_ = true; + if (write_observer_) + write_observer_->OnSpaceAvailable(this); +} + +void Stream::OnDataAvailable() { + if (read_observer_) + read_observer_->OnDataAvailable(this); +} + +} // namespace content |