summaryrefslogtreecommitdiffstats
path: root/src/core/net/custom_url_loader_factory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/net/custom_url_loader_factory.cpp')
-rw-r--r--src/core/net/custom_url_loader_factory.cpp98
1 files changed, 65 insertions, 33 deletions
diff --git a/src/core/net/custom_url_loader_factory.cpp b/src/core/net/custom_url_loader_factory.cpp
index 8aa0c0578..555eccf42 100644
--- a/src/core/net/custom_url_loader_factory.cpp
+++ b/src/core/net/custom_url_loader_factory.cpp
@@ -45,7 +45,7 @@
#include "content/public/browser/browser_thread.h"
#include "mojo/public/cpp/bindings/binding_set.h"
#include "mojo/public/cpp/system/data_pipe.h"
-#include "mojo/public/cpp/system/data_pipe_producer.h"
+#include "mojo/public/cpp/system/simple_watcher.h"
#include "net/base/net_errors.h"
#include "net/http/http_status_code.h"
#include "net/http/http_util.h"
@@ -123,8 +123,9 @@ private:
, m_client(std::move(client_info))
, m_request(request)
{
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
m_binding.set_connection_error_handler(
- base::BindOnce(&CustomURLLoader::OnConnectionError, base::Unretained(this)));
+ base::BindOnce(&CustomURLLoader::OnConnectionError, m_weakPtrFactory.GetWeakPtr()));
m_firstBytePosition = 0;
m_device = nullptr;
m_error = 0;
@@ -136,6 +137,7 @@ private:
void Start()
{
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
m_head.request_start = base::TimeTicks::Now();
if (!m_pipe.consumer_handle.is_valid())
@@ -166,12 +168,14 @@ private:
void CompleteWithFailure(net::Error net_error)
{
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
m_client->OnComplete(network::URLLoaderCompletionStatus(net_error));
ClearProxyAndClient(false);
}
void OnConnectionError()
{
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
m_binding.Close();
if (m_client.is_bound())
ClearProxyAndClient(false);
@@ -181,6 +185,7 @@ private:
void OnTransferComplete(MojoResult result)
{
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
if (result == MOJO_RESULT_OK) {
network::URLLoaderCompletionStatus status(net::OK);
status.encoded_data_length = m_totalBytesRead + m_head.headers->raw_headers().length();
@@ -195,6 +200,7 @@ private:
void ClearProxyAndClient(bool wait_for_loader_error = false)
{
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
m_proxy->m_client = nullptr;
m_client.reset();
if (m_device && m_device->isOpen())
@@ -210,15 +216,12 @@ private:
// URLRequestCustomJobProxy::Client:
void notifyExpectedContentSize(qint64 size) override
{
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
m_head.content_length = size;
}
void notifyHeadersComplete() override
{
- m_taskRunner->PostTask(FROM_HERE,
- base::BindOnce(&CustomURLLoader::reportHeadersComplete, base::Unretained(this)));
- }
- void reportHeadersComplete()
- {
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
DCHECK(!m_error);
m_head.response_start = base::TimeTicks::Now();
@@ -267,17 +270,28 @@ private:
m_client->OnStartLoadingResponseBody(std::move(m_pipe.consumer_handle));
readAvailableData();
+ if (m_device) {
+ m_watcher = std::make_unique<mojo::SimpleWatcher>(
+ FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC, m_taskRunner);
+ m_watcher->Watch(m_pipe.producer_handle.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
+ MOJO_WATCH_CONDITION_SATISFIED,
+ base::BindRepeating(&CustomURLLoader::notifyReadyWrite,
+ m_weakPtrFactory.GetWeakPtr()));
+ }
}
void notifyCanceled() override
{
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
OnTransferComplete(MOJO_RESULT_CANCELLED);
}
void notifyAborted() override
{
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
notifyStartFailure(net::ERR_ABORTED);
}
void notifyStartFailure(int error) override
{
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
m_head.response_start = base::TimeTicks::Now();
std::string headers;
switch (error) {
@@ -308,56 +322,74 @@ private:
}
void notifyReadyRead() override
{
- m_taskRunner->PostTask(FROM_HERE,
- base::BindOnce(&CustomURLLoader::readAvailableData, base::Unretained(this)));
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
+ readAvailableData();
}
- void readAvailableData()
+ void notifyReadyWrite(MojoResult result, const mojo::HandleSignalsState &state)
{
- if (m_error) {
- CompleteWithFailure(net::Error(m_error));
- return;
- }
- if (!m_device) {
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
+ if (result != MOJO_RESULT_OK) {
CompleteWithFailure(net::ERR_FAILED);
return;
}
- char buffer[2048];
- do {
- int read_size = m_device->read(buffer, 2048);
- if (m_error) {
- CompleteWithFailure(net::Error(m_error));
- return;
- }
- if (read_size > 0) {
- uint32_t read_bytes = read_size;
- m_pipe.producer_handle->WriteData(buffer, &read_bytes, MOJO_WRITE_DATA_FLAG_NONE);
- m_totalBytesRead += read_bytes;
- } else if (read_size < 0 && !m_device->atEnd()) {
- CompleteWithFailure(net::ERR_FAILED);
+ readAvailableData();
+ }
+ void readAvailableData()
+ {
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
+ for (;;) {
+ if (m_error || !m_device)
+ break;
+
+ void *buffer = nullptr;
+ uint32_t bufferSize = 0;
+ MojoResult beginResult = m_pipe.producer_handle->BeginWriteData(
+ &buffer, &bufferSize, MOJO_BEGIN_WRITE_DATA_FLAG_NONE);
+ if (beginResult == MOJO_RESULT_SHOULD_WAIT)
+ return; // Wait for pipe watcher
+ if (beginResult != MOJO_RESULT_OK)
+ break;
+
+ int readResult = m_device->read(static_cast<char *>(buffer), bufferSize);
+ uint32_t bytesRead = std::max(readResult, 0);
+ m_pipe.producer_handle->EndWriteData(bytesRead);
+ m_totalBytesRead += bytesRead;
+ m_client->OnTransferSizeUpdated(m_totalBytesRead);
+
+ if (m_device->atEnd()) {
+ OnTransferComplete(MOJO_RESULT_OK);
return;
}
- } while (m_device->bytesAvailable());
- m_client->OnTransferSizeUpdated(m_totalBytesRead);
- if (m_device->atEnd())
- OnTransferComplete(MOJO_RESULT_OK);
+
+ if (readResult == 0)
+ return; // Wait for readyRead
+ if (readResult < 0)
+ break;
+ }
+
+ CompleteWithFailure(m_error ? net::Error(m_error) : net::ERR_FAILED);
}
base::TaskRunner *taskRunner() override
{
+ DCHECK(m_taskRunner->RunsTasksInCurrentSequence());
return m_taskRunner.get();
}
- scoped_refptr<base::TaskRunner> m_taskRunner;
+ scoped_refptr<base::SequencedTaskRunner> m_taskRunner;
scoped_refptr<URLRequestCustomJobProxy> m_proxy;
mojo::Binding<network::mojom::URLLoader> m_binding;
network::mojom::URLLoaderClientPtr m_client;
mojo::DataPipe m_pipe;
+ std::unique_ptr<mojo::SimpleWatcher> m_watcher;
network::ResourceRequest m_request;
network::ResourceResponseHead m_head;
qint64 m_totalBytesRead = 0;
bool m_corsEnabled;
+ base::WeakPtrFactory<CustomURLLoader> m_weakPtrFactory{this};
+
DISALLOW_COPY_AND_ASSIGN(CustomURLLoader);
};