diff options
Diffstat (limited to 'chromium/media/filters/decoder_stream.cc')
-rw-r--r-- | chromium/media/filters/decoder_stream.cc | 598 |
1 files changed, 598 insertions, 0 deletions
diff --git a/chromium/media/filters/decoder_stream.cc b/chromium/media/filters/decoder_stream.cc new file mode 100644 index 00000000000..a912398f539 --- /dev/null +++ b/chromium/media/filters/decoder_stream.cc @@ -0,0 +1,598 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/filters/decoder_stream.h" + +#include "base/bind.h" +#include "base/callback_helpers.h" +#include "base/debug/trace_event.h" +#include "base/location.h" +#include "base/logging.h" +#include "base/single_thread_task_runner.h" +#include "media/base/audio_decoder.h" +#include "media/base/bind_to_current_loop.h" +#include "media/base/decoder_buffer.h" +#include "media/base/demuxer_stream.h" +#include "media/base/video_decoder.h" +#include "media/filters/decrypting_demuxer_stream.h" + +namespace media { + +// TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for +// templated classes such as this. +template <DemuxerStream::Type StreamType> +static const char* GetTraceString(); + +#define FUNCTION_DVLOG(level) \ + DVLOG(level) << __FUNCTION__ << \ + "<" << DecoderStreamTraits<StreamType>::ToString() << ">" + +template <> +const char* GetTraceString<DemuxerStream::VIDEO>() { + return "DecoderStream<VIDEO>::Decode"; +} + +template <> +const char* GetTraceString<DemuxerStream::AUDIO>() { + return "DecoderStream<AUDIO>::Decode"; +} + +template <DemuxerStream::Type StreamType> +DecoderStream<StreamType>::DecoderStream( + const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, + ScopedVector<Decoder> decoders, + const SetDecryptorReadyCB& set_decryptor_ready_cb) + : task_runner_(task_runner), + state_(STATE_UNINITIALIZED), + stream_(NULL), + decoder_selector_( + new DecoderSelector<StreamType>(task_runner, + decoders.Pass(), + set_decryptor_ready_cb)), + active_splice_(false), + pending_decode_requests_(0), + weak_factory_(this) {} + +template <DemuxerStream::Type StreamType> +DecoderStream<StreamType>::~DecoderStream() { + DCHECK(state_ == STATE_UNINITIALIZED || state_ == STATE_STOPPED) << state_; +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::Initialize(DemuxerStream* stream, + bool low_delay, + const StatisticsCB& statistics_cb, + const InitCB& init_cb) { + FUNCTION_DVLOG(2); + DCHECK(task_runner_->BelongsToCurrentThread()); + DCHECK_EQ(state_, STATE_UNINITIALIZED) << state_; + DCHECK(init_cb_.is_null()); + DCHECK(!init_cb.is_null()); + + statistics_cb_ = statistics_cb; + init_cb_ = init_cb; + stream_ = stream; + low_delay_ = low_delay; + + state_ = STATE_INITIALIZING; + // TODO(xhwang): DecoderSelector only needs a config to select a decoder. + decoder_selector_->SelectDecoder( + stream, low_delay, + base::Bind(&DecoderStream<StreamType>::OnDecoderSelected, + weak_factory_.GetWeakPtr()), + base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady, + weak_factory_.GetWeakPtr())); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::Read(const ReadCB& read_cb) { + FUNCTION_DVLOG(2); + DCHECK(task_runner_->BelongsToCurrentThread()); + DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_INITIALIZING && + state_ != STATE_STOPPED) << state_; + // No two reads in the flight at any time. + DCHECK(read_cb_.is_null()); + // No read during resetting or stopping process. + DCHECK(reset_cb_.is_null()); + DCHECK(stop_cb_.is_null()); + + if (state_ == STATE_ERROR) { + task_runner_->PostTask( + FROM_HERE, base::Bind(read_cb, DECODE_ERROR, scoped_refptr<Output>())); + return; + } + + if (state_ == STATE_END_OF_STREAM && ready_outputs_.empty()) { + task_runner_->PostTask( + FROM_HERE, base::Bind(read_cb, OK, StreamTraits::CreateEOSOutput())); + return; + } + + if (!ready_outputs_.empty()) { + task_runner_->PostTask(FROM_HERE, + base::Bind(read_cb, OK, ready_outputs_.front())); + ready_outputs_.pop_front(); + } else { + read_cb_ = read_cb; + } + + if (state_ == STATE_NORMAL && CanDecodeMore()) + ReadFromDemuxerStream(); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::Reset(const base::Closure& closure) { + FUNCTION_DVLOG(2); + DCHECK(task_runner_->BelongsToCurrentThread()); + DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_STOPPED) << state_; + DCHECK(reset_cb_.is_null()); + DCHECK(stop_cb_.is_null()); + + reset_cb_ = closure; + + if (!read_cb_.is_null()) { + task_runner_->PostTask(FROM_HERE, base::Bind( + base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>())); + } + + ready_outputs_.clear(); + + // During decoder reinitialization, the Decoder does not need to be and + // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder + // reinitialization. + if (state_ == STATE_REINITIALIZING_DECODER) + return; + + // During pending demuxer read and when not using DecryptingDemuxerStream, + // the Decoder will be reset after demuxer read is returned + // (in OnBufferReady()). + if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_) + return; + + if (decrypting_demuxer_stream_) { + decrypting_demuxer_stream_->Reset(base::Bind( + &DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr())); + return; + } + + ResetDecoder(); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::Stop(const base::Closure& closure) { + FUNCTION_DVLOG(2); + DCHECK(task_runner_->BelongsToCurrentThread()); + DCHECK_NE(state_, STATE_STOPPED) << state_; + DCHECK(stop_cb_.is_null()); + + stop_cb_ = closure; + + if (state_ == STATE_INITIALIZING) { + decoder_selector_->Abort(); + return; + } + + DCHECK(init_cb_.is_null()); + + // All pending callbacks will be dropped. + weak_factory_.InvalidateWeakPtrs(); + + // Post callbacks to prevent reentrance into this object. + if (!read_cb_.is_null()) { + task_runner_->PostTask(FROM_HERE, base::Bind( + base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>())); + } + if (!reset_cb_.is_null()) + task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_)); + + if (decrypting_demuxer_stream_) { + decrypting_demuxer_stream_->Stop(base::Bind( + &DecoderStream<StreamType>::StopDecoder, weak_factory_.GetWeakPtr())); + return; + } + + // We may not have a |decoder_| if Stop() was called during initialization. + if (decoder_) { + StopDecoder(); + return; + } + + state_ = STATE_STOPPED; + stream_ = NULL; + decoder_.reset(); + decrypting_demuxer_stream_.reset(); + task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&stop_cb_)); +} + +template <DemuxerStream::Type StreamType> +bool DecoderStream<StreamType>::CanReadWithoutStalling() const { + DCHECK(task_runner_->BelongsToCurrentThread()); + return !ready_outputs_.empty() || decoder_->CanReadWithoutStalling(); +} + +template <> +bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const { + DCHECK(task_runner_->BelongsToCurrentThread()); + return true; +} + +template <DemuxerStream::Type StreamType> +int DecoderStream<StreamType>::GetMaxDecodeRequests() const { + return decoder_->GetMaxDecodeRequests(); +} + +template <> +int DecoderStream<DemuxerStream::AUDIO>::GetMaxDecodeRequests() const { + return 1; +} + +template <DemuxerStream::Type StreamType> +bool DecoderStream<StreamType>::CanDecodeMore() const { + DCHECK(task_runner_->BelongsToCurrentThread()); + + // Limit total number of outputs stored in |ready_outputs_| and being decoded. + // It only makes sense to saturate decoder completely when output queue is + // empty. + int num_decodes = + static_cast<int>(ready_outputs_.size()) + pending_decode_requests_; + return num_decodes < GetMaxDecodeRequests(); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::OnDecoderSelected( + scoped_ptr<Decoder> selected_decoder, + scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) { + FUNCTION_DVLOG(2); + DCHECK(task_runner_->BelongsToCurrentThread()); + DCHECK_EQ(state_, STATE_INITIALIZING) << state_; + DCHECK(!init_cb_.is_null()); + DCHECK(read_cb_.is_null()); + DCHECK(reset_cb_.is_null()); + + decoder_selector_.reset(); + if (decrypting_demuxer_stream) + stream_ = decrypting_demuxer_stream.get(); + + if (!selected_decoder) { + state_ = STATE_UNINITIALIZED; + StreamTraits::FinishInitialization( + base::ResetAndReturn(&init_cb_), selected_decoder.get(), stream_); + } else { + state_ = STATE_NORMAL; + decoder_ = selected_decoder.Pass(); + decrypting_demuxer_stream_ = decrypting_demuxer_stream.Pass(); + StreamTraits::FinishInitialization( + base::ResetAndReturn(&init_cb_), decoder_.get(), stream_); + } + + // Stop() called during initialization. + if (!stop_cb_.is_null()) { + Stop(base::ResetAndReturn(&stop_cb_)); + return; + } +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::SatisfyRead( + Status status, + const scoped_refptr<Output>& output) { + DCHECK(!read_cb_.is_null()); + base::ResetAndReturn(&read_cb_).Run(status, output); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::Decode( + const scoped_refptr<DecoderBuffer>& buffer) { + FUNCTION_DVLOG(2); + DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_; + DCHECK_LT(pending_decode_requests_, GetMaxDecodeRequests()); + DCHECK(reset_cb_.is_null()); + DCHECK(stop_cb_.is_null()); + DCHECK(buffer); + + int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size(); + + TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this); + ++pending_decode_requests_; + decoder_->Decode(buffer, + base::Bind(&DecoderStream<StreamType>::OnDecodeDone, + weak_factory_.GetWeakPtr(), + buffer_size, + buffer->end_of_stream())); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::FlushDecoder() { + Decode(DecoderBuffer::CreateEOSBuffer()); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::OnDecodeDone(int buffer_size, + bool end_of_stream, + typename Decoder::Status status) { + FUNCTION_DVLOG(2) << status; + DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || + state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR) + << state_; + DCHECK(stop_cb_.is_null()); + DCHECK_GT(pending_decode_requests_, 0); + + --pending_decode_requests_; + + TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this); + + if (state_ == STATE_ERROR) { + DCHECK(read_cb_.is_null()); + return; + } + + // Drop decoding result if Reset() was called during decoding. + // The resetting process will be handled when the decoder is reset. + if (!reset_cb_.is_null()) + return; + + switch (status) { + case Decoder::kDecodeError: + case Decoder::kDecryptError: + state_ = STATE_ERROR; + ready_outputs_.clear(); + if (!read_cb_.is_null()) + SatisfyRead(DECODE_ERROR, NULL); + return; + + case Decoder::kAborted: + // Decoder can return kAborted only when Reset is pending. + NOTREACHED(); + return; + + case Decoder::kOk: + // Any successful decode counts! + if (buffer_size > 0) + StreamTraits::ReportStatistics(statistics_cb_, buffer_size); + + if (state_ == STATE_NORMAL) { + if (end_of_stream) { + state_ = STATE_END_OF_STREAM; + if (ready_outputs_.empty() && !read_cb_.is_null()) + SatisfyRead(OK, StreamTraits::CreateEOSOutput()); + return; + } + + if (CanDecodeMore()) + ReadFromDemuxerStream(); + return; + } + + if (state_ == STATE_FLUSHING_DECODER && !pending_decode_requests_) + ReinitializeDecoder(); + return; + } +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::OnDecodeOutputReady( + const scoped_refptr<Output>& output) { + FUNCTION_DVLOG(2) << ": " << output->timestamp().InMilliseconds() << " ms"; + DCHECK(output); + DCHECK(!output->end_of_stream()); + DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || + state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR) + << state_; + + if (state_ == STATE_ERROR) { + DCHECK(read_cb_.is_null()); + return; + } + + // Drop decoding result if Reset() was called during decoding. + // The resetting process will be handled when the decoder is reset. + if (!reset_cb_.is_null()) + return; + + // TODO(xhwang): VideoDecoder doesn't need to return EOS after it's flushed. + // Fix all decoders and remove this block. + // Store decoded output. + ready_outputs_.push_back(output); + + if (read_cb_.is_null()) + return; + + // Satisfy outstanding read request, if any. + scoped_refptr<Output> read_result = ready_outputs_.front(); + ready_outputs_.pop_front(); + SatisfyRead(OK, output); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::ReadFromDemuxerStream() { + FUNCTION_DVLOG(2); + DCHECK_EQ(state_, STATE_NORMAL) << state_; + DCHECK(CanDecodeMore()); + DCHECK(reset_cb_.is_null()); + DCHECK(stop_cb_.is_null()); + + state_ = STATE_PENDING_DEMUXER_READ; + stream_->Read(base::Bind(&DecoderStream<StreamType>::OnBufferReady, + weak_factory_.GetWeakPtr())); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::OnBufferReady( + DemuxerStream::Status status, + const scoped_refptr<DecoderBuffer>& buffer) { + FUNCTION_DVLOG(2) << ": " << status << ", " + << buffer->AsHumanReadableString(); + + DCHECK(task_runner_->BelongsToCurrentThread()); + DCHECK(state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR || + state_ == STATE_STOPPED) + << state_; + DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status; + DCHECK(stop_cb_.is_null()); + + // Decoding has been stopped (e.g due to an error). + if (state_ != STATE_PENDING_DEMUXER_READ) { + DCHECK(state_ == STATE_ERROR || state_ == STATE_STOPPED); + DCHECK(read_cb_.is_null()); + return; + } + + state_ = STATE_NORMAL; + + if (status == DemuxerStream::kConfigChanged) { + FUNCTION_DVLOG(2) << ": " << "ConfigChanged"; + DCHECK(stream_->SupportsConfigChanges()); + + if (!config_change_observer_cb_.is_null()) + config_change_observer_cb_.Run(); + + state_ = STATE_FLUSHING_DECODER; + if (!reset_cb_.is_null()) { + // If we are using DecryptingDemuxerStream, we already called DDS::Reset() + // which will continue the resetting process in it's callback. + if (!decrypting_demuxer_stream_) + Reset(base::ResetAndReturn(&reset_cb_)); + // Reinitialization will continue after Reset() is done. + } else { + FlushDecoder(); + } + return; + } + + if (!reset_cb_.is_null()) { + // If we are using DecryptingDemuxerStream, we already called DDS::Reset() + // which will continue the resetting process in it's callback. + if (!decrypting_demuxer_stream_) + Reset(base::ResetAndReturn(&reset_cb_)); + return; + } + + if (status == DemuxerStream::kAborted) { + if (!read_cb_.is_null()) + SatisfyRead(DEMUXER_READ_ABORTED, NULL); + return; + } + + if (!splice_observer_cb_.is_null() && !buffer->end_of_stream()) { + const bool has_splice_ts = buffer->splice_timestamp() != kNoTimestamp(); + if (active_splice_ || has_splice_ts) { + splice_observer_cb_.Run(buffer->splice_timestamp()); + active_splice_ = has_splice_ts; + } + } + + DCHECK(status == DemuxerStream::kOk) << status; + Decode(buffer); + + // Read more data if the decoder supports multiple parallel decoding requests. + if (CanDecodeMore() && !buffer->end_of_stream()) + ReadFromDemuxerStream(); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::ReinitializeDecoder() { + FUNCTION_DVLOG(2); + DCHECK(task_runner_->BelongsToCurrentThread()); + DCHECK_EQ(state_, STATE_FLUSHING_DECODER) << state_; + DCHECK_EQ(pending_decode_requests_, 0); + + DCHECK(StreamTraits::GetDecoderConfig(*stream_).IsValidConfig()); + state_ = STATE_REINITIALIZING_DECODER; + DecoderStreamTraits<StreamType>::Initialize( + decoder_.get(), + StreamTraits::GetDecoderConfig(*stream_), + low_delay_, + base::Bind(&DecoderStream<StreamType>::OnDecoderReinitialized, + weak_factory_.GetWeakPtr()), + base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady, + weak_factory_.GetWeakPtr())); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) { + FUNCTION_DVLOG(2); + DCHECK(task_runner_->BelongsToCurrentThread()); + DCHECK_EQ(state_, STATE_REINITIALIZING_DECODER) << state_; + DCHECK(stop_cb_.is_null()); + + // ReinitializeDecoder() can be called in two cases: + // 1, Flushing decoder finished (see OnDecodeOutputReady()). + // 2, Reset() was called during flushing decoder (see OnDecoderReset()). + // Also, Reset() can be called during pending ReinitializeDecoder(). + // This function needs to handle them all! + + state_ = (status == PIPELINE_OK) ? STATE_NORMAL : STATE_ERROR; + + if (!reset_cb_.is_null()) { + base::ResetAndReturn(&reset_cb_).Run(); + return; + } + + if (read_cb_.is_null()) + return; + + if (state_ == STATE_ERROR) { + SatisfyRead(DECODE_ERROR, NULL); + return; + } + + ReadFromDemuxerStream(); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::ResetDecoder() { + FUNCTION_DVLOG(2); + DCHECK(task_runner_->BelongsToCurrentThread()); + DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || + state_ == STATE_ERROR || state_ == STATE_END_OF_STREAM) << state_; + DCHECK(!reset_cb_.is_null()); + + decoder_->Reset(base::Bind(&DecoderStream<StreamType>::OnDecoderReset, + weak_factory_.GetWeakPtr())); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::OnDecoderReset() { + FUNCTION_DVLOG(2); + DCHECK(task_runner_->BelongsToCurrentThread()); + DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || + state_ == STATE_ERROR || state_ == STATE_END_OF_STREAM) << state_; + // If Reset() was called during pending read, read callback should be fired + // before the reset callback is fired. + DCHECK(read_cb_.is_null()); + DCHECK(!reset_cb_.is_null()); + DCHECK(stop_cb_.is_null()); + + if (state_ != STATE_FLUSHING_DECODER) { + state_ = STATE_NORMAL; + active_splice_ = false; + base::ResetAndReturn(&reset_cb_).Run(); + return; + } + + // The resetting process will be continued in OnDecoderReinitialized(). + ReinitializeDecoder(); +} + +template <DemuxerStream::Type StreamType> +void DecoderStream<StreamType>::StopDecoder() { + FUNCTION_DVLOG(2); + DCHECK(task_runner_->BelongsToCurrentThread()); + DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_STOPPED) << state_; + DCHECK(!stop_cb_.is_null()); + + state_ = STATE_STOPPED; + decoder_->Stop(); + stream_ = NULL; + decoder_.reset(); + decrypting_demuxer_stream_.reset(); + // Post |stop_cb_| because pending |read_cb_| and/or |reset_cb_| are also + // posted in Stop(). + task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&stop_cb_)); +} + +template class DecoderStream<DemuxerStream::VIDEO>; +template class DecoderStream<DemuxerStream::AUDIO>; + +} // namespace media |