diff options
author | Jøger Hansegård <joger.hansegard@qt.io> | 2023-10-06 22:07:10 +0200 |
---|---|---|
committer | Qt Cherry-pick Bot <cherrypick_bot@qt-project.org> | 2023-10-10 06:11:35 +0000 |
commit | 1218d0108c336b56119a643539f8c3d8c5386d94 (patch) | |
tree | 9b03de20eade75f9f4e86a1158ecf0d532c6a5d1 | |
parent | c8232180b6828a8eff186d8e0fd74a5a428560f4 (diff) |
Make FFmpeg::Thread easier to read
* Remove unused code related to timeout handling and simplify sleep/
notify mechanism.
* Rename to ConsumerThread because it is not a general purpose thread,
but a thread that is waken to consume available incoming data.
* Rename functions and variables to better match intent.
* Reduce scope of mutex that appears to only protect a single stop flag.
We previously held a mutex while calling functions on derived classes,
which could increase risk of deadlocks.
Change-Id: I9e4835a4084ca717ff67e35a0b93454bd1f1ada6
Reviewed-by: Artem Dyomin <artem.dyomin@qt.io>
(cherry picked from commit 66fbfc5ca448d8668c43932aacf57d6ec4b96880)
Reviewed-by: Qt Cherry-pick Bot <cherrypick_bot@qt-project.org>
(cherry picked from commit 77414af998aceeb602a5579489828eca428969a2)
-rw-r--r-- | src/plugins/multimedia/ffmpeg/qffmpegencoder.cpp | 34 | ||||
-rw-r--r-- | src/plugins/multimedia/ffmpeg/qffmpegencoder_p.h | 16 | ||||
-rw-r--r-- | src/plugins/multimedia/ffmpeg/qffmpegthread.cpp | 51 | ||||
-rw-r--r-- | src/plugins/multimedia/ffmpeg/qffmpegthread_p.h | 63 |
4 files changed, 88 insertions, 76 deletions
diff --git a/src/plugins/multimedia/ffmpeg/qffmpegencoder.cpp b/src/plugins/multimedia/ffmpeg/qffmpegencoder.cpp index 5ad8f259a..b1b757ff9 100644 --- a/src/plugins/multimedia/ffmpeg/qffmpegencoder.cpp +++ b/src/plugins/multimedia/ffmpeg/qffmpegencoder.cpp @@ -142,10 +142,10 @@ EncodingFinalizer::EncodingFinalizer(Encoder *e) : encoder(e) { void EncodingFinalizer::run() { if (encoder->audioEncode) - encoder->audioEncode->kill(); + encoder->audioEncode->stopAndDelete(); for (auto &videoEncoder : encoder->videoEncoders) - videoEncoder->kill(); - encoder->muxer->kill(); + videoEncoder->stopAndDelete(); + encoder->muxer->stopAndDelete(); if (encoder->isHeaderWritten) { const int res = av_write_trailer(encoder->formatContext); @@ -218,7 +218,7 @@ void Muxer::addPacket(AVPacketUPtr packet) } // qCDebug(qLcFFmpegEncoder) << "Muxer::addPacket" << packet->pts << packet->stream_index; - wake(); + dataReady(); } AVPacketUPtr Muxer::takePacket() @@ -236,13 +236,13 @@ void Muxer::cleanup() { } -bool QFFmpeg::Muxer::shouldWait() const +bool QFFmpeg::Muxer::hasData() const { QMutexLocker locker(&queueMutex); - return packetQueue.empty(); + return !packetQueue.empty(); } -void Muxer::loop() +void Muxer::processOne() { auto packet = takePacket(); // qCDebug(qLcFFmpegEncoder) << "writing packet to file" << packet->pts << packet->duration << @@ -385,7 +385,7 @@ void AudioEncoder::addBuffer(const QAudioBuffer &buffer) if (!paused.loadRelaxed()) { audioBufferQueue.push(buffer); locker.unlock(); - wake(); + dataReady(); } } @@ -407,16 +407,16 @@ void AudioEncoder::init() void AudioEncoder::cleanup() { while (!audioBufferQueue.empty()) - loop(); + processOne(); while (avcodec_send_frame(codecContext.get(), nullptr) == AVERROR(EAGAIN)) retrievePackets(); retrievePackets(); } -bool AudioEncoder::shouldWait() const +bool AudioEncoder::hasData() const { QMutexLocker locker(&queueMutex); - return audioBufferQueue.empty(); + return !audioBufferQueue.empty(); } void AudioEncoder::retrievePackets() @@ -441,7 +441,7 @@ void AudioEncoder::retrievePackets() } } -void AudioEncoder::loop() +void AudioEncoder::processOne() { QAudioBuffer buffer = takeBuffer(); if (!buffer.isValid() || paused.loadAcquire()) @@ -535,7 +535,7 @@ void VideoEncoder::addFrame(const QVideoFrame &frame) locker.unlock(); // Avoid context switch on wake wake-up - wake(); + dataReady(); } } @@ -564,7 +564,7 @@ void VideoEncoder::init() void VideoEncoder::cleanup() { while (!videoFrameQueue.empty()) - loop(); + processOne(); if (frameEncoder) { while (frameEncoder->sendFrame(nullptr) == AVERROR(EAGAIN)) retrievePackets(); @@ -572,10 +572,10 @@ void VideoEncoder::cleanup() } } -bool VideoEncoder::shouldWait() const +bool VideoEncoder::hasData() const { QMutexLocker locker(&queueMutex); - return videoFrameQueue.empty(); + return !videoFrameQueue.empty(); } struct QVideoFrameHolder @@ -589,7 +589,7 @@ static void freeQVideoFrame(void *opaque, uint8_t *) delete reinterpret_cast<QVideoFrameHolder *>(opaque); } -void VideoEncoder::loop() +void VideoEncoder::processOne() { if (paused.loadAcquire()) return; diff --git a/src/plugins/multimedia/ffmpeg/qffmpegencoder_p.h b/src/plugins/multimedia/ffmpeg/qffmpegencoder_p.h index 66470dfe5..f29d9811f 100644 --- a/src/plugins/multimedia/ffmpeg/qffmpegencoder_p.h +++ b/src/plugins/multimedia/ffmpeg/qffmpegencoder_p.h @@ -105,7 +105,7 @@ private: }; -class Muxer : public Thread +class Muxer : public ConsumerThread { mutable QMutex queueMutex; std::queue<AVPacketUPtr> packetQueue; @@ -120,13 +120,13 @@ private: void init() override; void cleanup() override; - bool shouldWait() const override; - void loop() override; + bool hasData() const override; + void processOne() override; Encoder *encoder; }; -class EncoderThread : public Thread +class EncoderThread : public ConsumerThread { public: virtual void setPaused(bool b) @@ -158,8 +158,8 @@ private: void init() override; void cleanup() override; - bool shouldWait() const override; - void loop() override; + bool hasData() const override; + void processOne() override; AVStream *stream = nullptr; AVCodecContextUPtr codecContext; @@ -200,8 +200,8 @@ private: void init() override; void cleanup() override; - bool shouldWait() const override; - void loop() override; + bool hasData() const override; + void processOne() override; std::unique_ptr<VideoFrameEncoder> frameEncoder; diff --git a/src/plugins/multimedia/ffmpeg/qffmpegthread.cpp b/src/plugins/multimedia/ffmpeg/qffmpegthread.cpp index 804ba424f..c098bb68c 100644 --- a/src/plugins/multimedia/ffmpeg/qffmpegthread.cpp +++ b/src/plugins/multimedia/ffmpeg/qffmpegthread.cpp @@ -3,54 +3,45 @@ #include "qffmpegthread_p.h" -#include <qloggingcategory.h> QT_BEGIN_NAMESPACE using namespace QFFmpeg; -void Thread::kill() +void ConsumerThread::stopAndDelete() { { - QMutexLocker locker(&mutex); - exit.storeRelease(true); - killHelper(); + QMutexLocker locker(&exitMutex); + exit = true; } - wake(); + dataReady(); wait(); delete this; } -void Thread::maybePause() +void ConsumerThread::dataReady() { - while (timeOut > 0 || shouldWait()) { - if (exit.loadAcquire()) - break; - - QElapsedTimer timer; - timer.start(); - if (condition.wait(&mutex, QDeadlineTimer(timeOut, Qt::PreciseTimer))) { - if (timeOut >= 0) { - timeOut -= timer.elapsed(); - if (timeOut < 0) - timeOut = -1; - } - } else { - timeOut = -1; - } - } + condition.wakeAll(); } -void Thread::run() +void ConsumerThread::run() { init(); - QMutexLocker locker(&mutex); - while (1) { - maybePause(); - if (exit.loadAcquire()) - break; - loop(); + + while (true) { + + { + QMutexLocker locker(&exitMutex); + while (!hasData() && !exit) + condition.wait(&exitMutex); + + if (exit) + break; + } + + processOne(); } + cleanup(); } diff --git a/src/plugins/multimedia/ffmpeg/qffmpegthread_p.h b/src/plugins/multimedia/ffmpeg/qffmpegthread_p.h index e5c87e237..a9f80c36f 100644 --- a/src/plugins/multimedia/ffmpeg/qffmpegthread_p.h +++ b/src/plugins/multimedia/ffmpeg/qffmpegthread_p.h @@ -27,38 +27,59 @@ class QAudioSink; namespace QFFmpeg { -class Thread : public QThread +/*! + FFmpeg thread that is used to implement a consumer pattern. + + This thread processes work items until no more data is available. + When no more data is available, it sleeps until it is notified about + more available data. + */ +class ConsumerThread : public QThread { public: - mutable QMutex mutex; - qint64 timeOut = -1; -private: - QWaitCondition condition; + /*! + Stops the thread and deletes this object + */ + void stopAndDelete(); protected: - QAtomicInteger<bool> exit = false; -public: - // public API is thread-safe + /*! + Called on this thread when thread starts + */ + virtual void init() = 0; - void kill(); - virtual void killHelper() {} + /*! + Called on this thread before thread exits + */ + virtual void cleanup() = 0; - void wake() { - condition.wakeAll(); - } + /*! + Process one work item. Called repeatedly until hasData() returns + false, in which case the thread sleeps until the next dataReady() + notification. -protected: - virtual void init() {} - virtual void cleanup() {} - // loop() should never block, all blocking has to happen in shouldWait() - virtual void loop() = 0; - virtual bool shouldWait() const { return false; } + Note: processOne() should never block. + */ + virtual void processOne() = 0; + + /*! + Wake thread from sleep and process data until + hasData() returns false. + */ + void dataReady(); + + /*! + Must return true when data is available for processing + */ + virtual bool hasData() const = 0; private: - void maybePause(); + void run() final; - void run() override; + QMutex exitMutex; // Protects exit flag. + QWaitCondition condition; + bool exit = false; }; } |