diff options
Diffstat (limited to 'chromium/net/spdy/spdy_stream.cc')
-rw-r--r-- | chromium/net/spdy/spdy_stream.cc | 402 |
1 files changed, 190 insertions, 212 deletions
diff --git a/chromium/net/spdy/spdy_stream.cc b/chromium/net/spdy/spdy_stream.cc index ccf48ecd7d2..40ec654bba4 100644 --- a/chromium/net/spdy/spdy_stream.cc +++ b/chromium/net/spdy/spdy_stream.cc @@ -85,32 +85,26 @@ SpdyStream::SpdyStream(SpdyStreamType type, int32 initial_recv_window_size, const BoundNetLog& net_log) : type_(type), - weak_ptr_factory_(this), - in_do_loop_(false), - continue_buffering_data_(type_ == SPDY_PUSH_STREAM), stream_id_(0), url_(url), priority_(priority), - slot_(0), send_stalled_by_flow_control_(false), send_window_size_(initial_send_window_size), recv_window_size_(initial_recv_window_size), unacked_recv_window_bytes_(0), session_(session), delegate_(NULL), - send_status_( - (type_ == SPDY_PUSH_STREAM) ? - NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND), + pending_send_status_(MORE_DATA_TO_SEND), request_time_(base::Time::Now()), response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE), - io_state_((type_ == SPDY_PUSH_STREAM) ? STATE_IDLE : STATE_NONE), + io_state_(STATE_IDLE), response_status_(OK), net_log_(net_log), raw_received_bytes_(0), send_bytes_(0), recv_bytes_(0), - just_completed_frame_type_(DATA), - just_completed_frame_size_(0) { + write_handler_guard_(false), + weak_ptr_factory_(this) { CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM || type_ == SPDY_REQUEST_RESPONSE_STREAM || type_ == SPDY_PUSH_STREAM); @@ -119,7 +113,7 @@ SpdyStream::SpdyStream(SpdyStreamType type, } SpdyStream::~SpdyStream() { - CHECK(!in_do_loop_); + CHECK(!write_handler_guard_); UpdateHistograms(); } @@ -128,20 +122,25 @@ void SpdyStream::SetDelegate(Delegate* delegate) { CHECK(delegate); delegate_ = delegate; - if (type_ == SPDY_PUSH_STREAM) { - DCHECK(continue_buffering_data_); + CHECK(io_state_ == STATE_IDLE || + io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED || + io_state_ == STATE_RESERVED_REMOTE); + + if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) { + DCHECK_EQ(type_, SPDY_PUSH_STREAM); base::MessageLoop::current()->PostTask( FROM_HERE, - base::Bind(&SpdyStream::PushedStreamReplayData, GetWeakPtr())); + base::Bind(&SpdyStream::PushedStreamReplay, GetWeakPtr())); } } -void SpdyStream::PushedStreamReplayData() { +void SpdyStream::PushedStreamReplay() { DCHECK_EQ(type_, SPDY_PUSH_STREAM); DCHECK_NE(stream_id_, 0u); - DCHECK(continue_buffering_data_); + CHECK_EQ(stream_id_ % 2, 0u); - continue_buffering_data_ = false; + CHECK_EQ(io_state_, STATE_HALF_CLOSED_LOCAL_UNCLAIMED); + io_state_ = STATE_HALF_CLOSED_LOCAL; // The delegate methods called below may delete |this|, so use // |weak_this| to detect that. @@ -156,7 +155,7 @@ void SpdyStream::PushedStreamReplayData() { // we're waiting for another HEADERS frame, and we had better not // have any pending data frames. CHECK(weak_this); - if (!pending_buffers_.empty()) { + if (!pending_recv_data_.empty()) { LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "Data received with incomplete headers."); session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); @@ -170,10 +169,10 @@ void SpdyStream::PushedStreamReplayData() { response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE; - while (!pending_buffers_.empty()) { - // Take ownership of the first element of |pending_buffers_|. - scoped_ptr<SpdyBuffer> buffer(pending_buffers_.front()); - pending_buffers_.weak_erase(pending_buffers_.begin()); + while (!pending_recv_data_.empty()) { + // Take ownership of the first element of |pending_recv_data_|. + scoped_ptr<SpdyBuffer> buffer(pending_recv_data_.front()); + pending_recv_data_.weak_erase(pending_recv_data_.begin()); bool eof = (buffer == NULL); @@ -185,31 +184,30 @@ void SpdyStream::PushedStreamReplayData() { return; if (eof) { - DCHECK(pending_buffers_.empty()); + DCHECK(pending_recv_data_.empty()); session_->CloseActiveStream(stream_id_, OK); DCHECK(!weak_this); - // |pending_buffers_| is invalid at this point. + // |pending_recv_data_| is invalid at this point. break; } } } scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() { - CHECK_EQ(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE); + CHECK_EQ(io_state_, STATE_IDLE); CHECK(request_headers_); CHECK_GT(stream_id_, 0u); SpdyControlFlags flags = - (send_status_ == NO_MORE_DATA_TO_SEND) ? + (pending_send_status_ == NO_MORE_DATA_TO_SEND) ? CONTROL_FLAG_FIN : CONTROL_FLAG_NONE; scoped_ptr<SpdyFrame> frame(session_->CreateSynStream( - stream_id_, priority_, slot_, flags, *request_headers_)); + stream_id_, priority_, flags, *request_headers_)); send_time_ = base::TimeTicks::Now(); return frame.Pass(); } void SpdyStream::DetachDelegate() { - CHECK(!in_do_loop_); DCHECK(!IsClosed()); delegate_ = NULL; Cancel(); @@ -399,7 +397,7 @@ int SpdyStream::OnInitialResponseHeadersReceived( case SPDY_BIDIRECTIONAL_STREAM: // For a bidirectional stream, we're ready for the response // headers once we've finished sending the request headers. - if (io_state_ < STATE_IDLE) { + if (io_state_ == STATE_IDLE) { session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, "Response received before request sent"); return ERR_SPDY_PROTOCOL_ERROR; @@ -408,10 +406,8 @@ int SpdyStream::OnInitialResponseHeadersReceived( case SPDY_REQUEST_RESPONSE_STREAM: // For a request/response stream, we're ready for the response - // headers once we've finished sending the request headers and - // the request body (if we have one). - if ((io_state_ < STATE_IDLE) || (send_status_ == MORE_DATA_TO_SEND) || - pending_send_data_.get()) { + // headers once we've finished sending the request headers. + if (io_state_ == STATE_IDLE) { session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, "Response received before request sent"); return ERR_SPDY_PROTOCOL_ERROR; @@ -419,15 +415,21 @@ int SpdyStream::OnInitialResponseHeadersReceived( break; case SPDY_PUSH_STREAM: - // For a push stream, we're ready immediately. - DCHECK_EQ(send_status_, NO_MORE_DATA_TO_SEND); - DCHECK_EQ(io_state_, STATE_IDLE); + // Push streams transition to a locally half-closed state upon headers. + // We must continue to buffer data while waiting for a call to + // SetDelegate() (which may not ever happen). + CHECK_EQ(io_state_, STATE_RESERVED_REMOTE); + if (!delegate_) { + io_state_ = STATE_HALF_CLOSED_LOCAL_UNCLAIMED; + } else { + io_state_ = STATE_HALF_CLOSED_LOCAL; + } break; } metrics_.StartStream(); - DCHECK_EQ(io_state_, STATE_IDLE); + DCHECK_NE(io_state_, STATE_IDLE); response_time_ = response_time; recv_first_byte_time_ = recv_first_byte_time; @@ -451,20 +453,30 @@ int SpdyStream::OnAdditionalResponseHeadersReceived( return MergeWithResponseHeaders(additional_response_headers); } +void SpdyStream::OnPushPromiseHeadersReceived(const SpdyHeaderBlock& headers) { + CHECK(!request_headers_.get()); + CHECK_EQ(io_state_, STATE_IDLE); + CHECK_EQ(type_, SPDY_PUSH_STREAM); + DCHECK(!delegate_); + + io_state_ = STATE_RESERVED_REMOTE; + request_headers_.reset(new SpdyHeaderBlock(headers)); +} + void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { DCHECK(session_->IsStreamActive(stream_id_)); // If we're still buffering data for a push stream, we will do the // check for data received with incomplete headers in // PushedStreamReplayData(). - if (!delegate_ || continue_buffering_data_) { + if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) { DCHECK_EQ(type_, SPDY_PUSH_STREAM); // It should be valid for this to happen in the server push case. // We'll return received data when delegate gets attached to the stream. if (buffer) { - pending_buffers_.push_back(buffer.release()); + pending_recv_data_.push_back(buffer.release()); } else { - pending_buffers_.push_back(NULL); + pending_recv_data_.push_back(NULL); metrics_.StopStream(); // Note: we leave the stream open in the session until the stream // is claimed. @@ -485,8 +497,15 @@ void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { if (!buffer) { metrics_.StopStream(); - // Deletes |this|. - session_->CloseActiveStream(stream_id_, OK); + if (io_state_ == STATE_OPEN) { + io_state_ = STATE_HALF_CLOSED_REMOTE; + } else if (io_state_ == STATE_HALF_CLOSED_LOCAL) { + io_state_ = STATE_CLOSED; + // Deletes |this|. + session_->CloseActiveStream(stream_id_, OK); + } else { + NOTREACHED() << io_state_; + } return; } @@ -509,16 +528,81 @@ void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type, size_t frame_size) { + DCHECK_NE(type_, SPDY_PUSH_STREAM); + if (frame_size < session_->GetFrameMinimumSize() || frame_size > session_->GetFrameMaximumSize()) { NOTREACHED(); return; } - if (IsClosed()) + CHECK(frame_type == SYN_STREAM || + frame_type == DATA) << frame_type; + + int result = (frame_type == SYN_STREAM) ? + OnRequestHeadersSent() : OnDataSent(frame_size); + if (result == ERR_IO_PENDING) { + // The write operation hasn't completed yet. return; - just_completed_frame_type_ = frame_type; - just_completed_frame_size_ = frame_size; - DoLoop(OK); + } + + if (pending_send_status_ == NO_MORE_DATA_TO_SEND) { + if(io_state_ == STATE_OPEN) { + io_state_ = STATE_HALF_CLOSED_LOCAL; + } else if(io_state_ == STATE_HALF_CLOSED_REMOTE) { + io_state_ = STATE_CLOSED; + } else { + NOTREACHED() << io_state_; + } + } + // Notify delegate of write completion. Must not destroy |this|. + CHECK(delegate_); + { + base::WeakPtr<SpdyStream> weak_this = GetWeakPtr(); + write_handler_guard_ = true; + if (frame_type == SYN_STREAM) { + delegate_->OnRequestHeadersSent(); + } else { + delegate_->OnDataSent(); + } + CHECK(weak_this); + write_handler_guard_ = false; + } + + if (io_state_ == STATE_CLOSED) { + // Deletes |this|. + session_->CloseActiveStream(stream_id_, OK); + } +} + +int SpdyStream::OnRequestHeadersSent() { + CHECK_EQ(io_state_, STATE_IDLE); + CHECK_NE(stream_id_, 0u); + + io_state_ = STATE_OPEN; + return OK; +} + +int SpdyStream::OnDataSent(size_t frame_size) { + CHECK(io_state_ == STATE_OPEN || + io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_; + + size_t frame_payload_size = frame_size - session_->GetDataFrameMinimumSize(); + + CHECK_GE(frame_size, session_->GetDataFrameMinimumSize()); + CHECK_LE(frame_payload_size, session_->GetDataFrameMaximumPayload()); + + send_bytes_ += frame_payload_size; + + // If more data is available to send, dispatch it and + // return that the write operation is still ongoing. + pending_send_data_->DidConsume(frame_payload_size); + if (pending_send_data_->BytesRemaining() > 0) { + QueueNextDataFrame(); + return ERR_IO_PENDING; + } else { + pending_send_data_ = NULL; + return OK; + } } SpdyMajorVersion SpdyStream::GetProtocolVersion() const { @@ -532,7 +616,8 @@ void SpdyStream::LogStreamError(int status, const std::string& description) { } void SpdyStream::OnClose(int status) { - CHECK(!in_do_loop_); + // In most cases, the stream should already be CLOSED. The exception is when a + // SpdySession is shutting down while the stream is in an intermediate state. io_state_ = STATE_CLOSED; response_status_ = status; Delegate* delegate = delegate_; @@ -544,7 +629,6 @@ void SpdyStream::OnClose(int status) { } void SpdyStream::Cancel() { - CHECK(!in_do_loop_); // We may be called again from a delegate's OnClose(). if (io_state_ == STATE_CLOSED) return; @@ -558,7 +642,6 @@ void SpdyStream::Cancel() { } void SpdyStream::Close() { - CHECK(!in_do_loop_); // We may be called again from a delegate's OnClose(). if (io_state_ == STATE_CLOSED) return; @@ -578,25 +661,29 @@ base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() { int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers, SpdySendStatus send_status) { CHECK_NE(type_, SPDY_PUSH_STREAM); - CHECK_EQ(send_status_, MORE_DATA_TO_SEND); + CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND); CHECK(!request_headers_); CHECK(!pending_send_data_.get()); - CHECK_EQ(io_state_, STATE_NONE); + CHECK_EQ(io_state_, STATE_IDLE); request_headers_ = request_headers.Pass(); - send_status_ = send_status; - io_state_ = STATE_SEND_REQUEST_HEADERS; - return DoLoop(OK); + pending_send_status_ = send_status; + session_->EnqueueStreamWrite( + GetWeakPtr(), SYN_STREAM, + scoped_ptr<SpdyBufferProducer>( + new SynStreamBufferProducer(GetWeakPtr()))); + return ERR_IO_PENDING; } void SpdyStream::SendData(IOBuffer* data, int length, SpdySendStatus send_status) { CHECK_NE(type_, SPDY_PUSH_STREAM); - CHECK_EQ(send_status_, MORE_DATA_TO_SEND); - CHECK_GE(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE); + CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND); + CHECK(io_state_ == STATE_OPEN || + io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_; CHECK(!pending_send_data_.get()); pending_send_data_ = new DrainableIOBuffer(data, length); - send_status_ = send_status; + pending_send_status_ = send_status; QueueNextDataFrame(); } @@ -612,8 +699,9 @@ bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) { } void SpdyStream::PossiblyResumeIfSendStalled() { - DCHECK(!IsClosed()); - + if (IsLocallyClosed()) { + return; + } if (send_stalled_by_flow_control_ && !session_->IsSendStalled() && send_window_size_ > 0) { net_log_.AddEvent( @@ -628,10 +716,24 @@ bool SpdyStream::IsClosed() const { return io_state_ == STATE_CLOSED; } +bool SpdyStream::IsLocallyClosed() const { + return io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED || + io_state_ == STATE_HALF_CLOSED_LOCAL || + io_state_ == STATE_CLOSED; +} + bool SpdyStream::IsIdle() const { return io_state_ == STATE_IDLE; } +bool SpdyStream::IsOpen() const { + return io_state_ == STATE_OPEN; +} + +bool SpdyStream::IsReservedRemote() const { + return io_state_ == STATE_RESERVED_REMOTE; +} + NextProto SpdyStream::GetProtocol() const { return session_->protocol(); } @@ -644,165 +746,17 @@ bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const { } GURL SpdyStream::GetUrlFromHeaders() const { - if (type_ != SPDY_PUSH_STREAM && !request_headers_) + if (!request_headers_) return GURL(); - const SpdyHeaderBlock& headers = - (type_ == SPDY_PUSH_STREAM) ? response_headers_ : *request_headers_; - return GetUrlFromHeaderBlock(headers, GetProtocolVersion(), - type_ == SPDY_PUSH_STREAM); + return GetUrlFromHeaderBlock( + *request_headers_, GetProtocolVersion(), type_ == SPDY_PUSH_STREAM); } bool SpdyStream::HasUrlFromHeaders() const { return !GetUrlFromHeaders().is_empty(); } -int SpdyStream::DoLoop(int result) { - CHECK(!in_do_loop_); - in_do_loop_ = true; - - do { - State state = io_state_; - io_state_ = STATE_NONE; - switch (state) { - case STATE_SEND_REQUEST_HEADERS: - CHECK_EQ(result, OK); - result = DoSendRequestHeaders(); - break; - case STATE_SEND_REQUEST_HEADERS_COMPLETE: - CHECK_EQ(result, OK); - result = DoSendRequestHeadersComplete(); - break; - - // For request/response streams, no data is sent from the client - // while in the OPEN state, so OnFrameWriteComplete is never - // called here. The HTTP body is handled in the OnDataReceived - // callback, which does not call into DoLoop. - // - // For bidirectional streams, we'll send and receive data once - // the connection is established. Received data is handled in - // OnDataReceived. Sent data is handled in - // OnFrameWriteComplete, which calls DoOpen(). - case STATE_IDLE: - CHECK_EQ(result, OK); - result = DoOpen(); - break; - - case STATE_CLOSED: - DCHECK_NE(result, ERR_IO_PENDING); - break; - default: - NOTREACHED() << io_state_; - break; - } - } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE && - io_state_ != STATE_IDLE); - - CHECK(in_do_loop_); - in_do_loop_ = false; - - return result; -} - -int SpdyStream::DoSendRequestHeaders() { - DCHECK_NE(type_, SPDY_PUSH_STREAM); - io_state_ = STATE_SEND_REQUEST_HEADERS_COMPLETE; - - session_->EnqueueStreamWrite( - GetWeakPtr(), SYN_STREAM, - scoped_ptr<SpdyBufferProducer>( - new SynStreamBufferProducer(GetWeakPtr()))); - return ERR_IO_PENDING; -} - -namespace { - -// Assuming we're in STATE_IDLE, maps the given type (which must not -// be SPDY_PUSH_STREAM) and send status to a result to return from -// DoSendRequestHeadersComplete() or DoOpen(). -int GetOpenStateResult(SpdyStreamType type, SpdySendStatus send_status) { - switch (type) { - case SPDY_BIDIRECTIONAL_STREAM: - // For bidirectional streams, there's nothing else to do. - DCHECK_EQ(send_status, MORE_DATA_TO_SEND); - return OK; - - case SPDY_REQUEST_RESPONSE_STREAM: - // For request/response streams, wait for the delegate to send - // data if there's request data to send; we'll get called back - // when the send finishes. - if (send_status == MORE_DATA_TO_SEND) - return ERR_IO_PENDING; - - return OK; - - case SPDY_PUSH_STREAM: - // This should never be called for push streams. - break; - } - - CHECK(false); - return ERR_UNEXPECTED; -} - -} // namespace - -int SpdyStream::DoSendRequestHeadersComplete() { - DCHECK_NE(type_, SPDY_PUSH_STREAM); - DCHECK_EQ(just_completed_frame_type_, SYN_STREAM); - DCHECK_NE(stream_id_, 0u); - - io_state_ = STATE_IDLE; - - CHECK(delegate_); - // Must not close |this|; if it does, it will trigger the |in_do_loop_| - // check in the destructor. - delegate_->OnRequestHeadersSent(); - - return GetOpenStateResult(type_, send_status_); -} - -int SpdyStream::DoOpen() { - DCHECK_NE(type_, SPDY_PUSH_STREAM); - - if (just_completed_frame_type_ != DATA) { - NOTREACHED(); - return ERR_UNEXPECTED; - } - - if (just_completed_frame_size_ < session_->GetDataFrameMinimumSize()) { - NOTREACHED(); - return ERR_UNEXPECTED; - } - - size_t frame_payload_size = - just_completed_frame_size_ - session_->GetDataFrameMinimumSize(); - if (frame_payload_size > session_->GetDataFrameMaximumPayload()) { - NOTREACHED(); - return ERR_UNEXPECTED; - } - - // Set |io_state_| first as |delegate_| may check it. - io_state_ = STATE_IDLE; - - send_bytes_ += frame_payload_size; - - pending_send_data_->DidConsume(frame_payload_size); - if (pending_send_data_->BytesRemaining() > 0) { - QueueNextDataFrame(); - return ERR_IO_PENDING; - } - - pending_send_data_ = NULL; - - CHECK(delegate_); - // Must not close |this|; if it does, it will trigger the - // |in_do_loop_| check in the destructor. - delegate_->OnDataSent(); - - return GetOpenStateResult(type_, send_status_); -} - void SpdyStream::UpdateHistograms() { // We need at least the receive timers to be filled in, as otherwise // metrics can be bogus. @@ -836,13 +790,14 @@ void SpdyStream::UpdateHistograms() { void SpdyStream::QueueNextDataFrame() { // Until the request has been completely sent, we cannot be sure // that our stream_id is correct. - DCHECK_GT(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE); + CHECK(io_state_ == STATE_OPEN || + io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_; CHECK_GT(stream_id_, 0u); CHECK(pending_send_data_.get()); CHECK_GT(pending_send_data_->BytesRemaining(), 0); SpdyDataFlags flags = - (send_status_ == NO_MORE_DATA_TO_SEND) ? + (pending_send_status_ == NO_MORE_DATA_TO_SEND) ? DATA_FLAG_FIN : DATA_FLAG_NONE; scoped_ptr<SpdyBuffer> data_buffer( session_->CreateDataBuffer(stream_id_, @@ -931,4 +886,27 @@ int SpdyStream::MergeWithResponseHeaders( return OK; } +#define STATE_CASE(s) \ + case s: \ + description = base::StringPrintf("%s (0x%08X)", #s, s); \ + break + +std::string SpdyStream::DescribeState(State state) { + std::string description; + switch (state) { + STATE_CASE(STATE_IDLE); + STATE_CASE(STATE_OPEN); + STATE_CASE(STATE_HALF_CLOSED_LOCAL_UNCLAIMED); + STATE_CASE(STATE_HALF_CLOSED_LOCAL); + STATE_CASE(STATE_CLOSED); + default: + description = base::StringPrintf("Unknown state 0x%08X (%u)", state, + state); + break; + } + return description; +} + +#undef STATE_CASE + } // namespace net |