diff options
Diffstat (limited to 'examples/multimedia/audiooutput/audiooutput.py')
-rw-r--r-- | examples/multimedia/audiooutput/audiooutput.py | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/examples/multimedia/audiooutput/audiooutput.py b/examples/multimedia/audiooutput/audiooutput.py new file mode 100644 index 000000000..06d52f68a --- /dev/null +++ b/examples/multimedia/audiooutput/audiooutput.py @@ -0,0 +1,276 @@ +# Copyright (C) 2013 Riverbank Computing Limited. +# Copyright (C) 2022 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause + +"""PySide6 port of the multimedia/audiooutput example from Qt v5.x, originating from PyQt""" + +import sys +from math import pi, sin +from struct import pack + +from PySide6.QtCore import (QByteArray, QIODevice, Qt, QSysInfo, QTimer, + qWarning, Slot) +from PySide6.QtMultimedia import (QAudio, QAudioFormat, + QAudioSink, QMediaDevices) +from PySide6.QtWidgets import (QApplication, QComboBox, QHBoxLayout, QLabel, + QMainWindow, QPushButton, QSlider, + QVBoxLayout, QWidget) + + +class Generator(QIODevice): + + def __init__(self, format, durationUs, sampleRate, parent): + super().__init__(parent) + + self.m_pos = 0 + self.m_buffer = QByteArray() + + self.generate_data(format, durationUs, sampleRate) + + def start(self): + self.open(QIODevice.ReadOnly) + + def stop(self): + self.m_pos = 0 + self.close() + + def generate_data(self, fmt, durationUs, sampleRate): + pack_format = '' + + sample_size = fmt.bytesPerSample() * 8 + if sample_size == 8: + if fmt.sampleFormat() == QAudioFormat.UInt8: + scaler = lambda x: ((1.0 + x) / 2 * 255) # noqa: E731 + pack_format = 'B' + elif fmt.sampleFormat() == QAudioFormat.Int16: + scaler = lambda x: x * 127 # noqa: E731 + pack_format = 'b' + elif sample_size == 16: + little_endian = QSysInfo.ByteOrder == QSysInfo.LittleEndian + if fmt.sampleFormat() == QAudioFormat.UInt8: + scaler = lambda x: (1.0 + x) / 2 * 65535 # noqa: E731 + pack_format = '<H' if little_endian else '>H' + elif fmt.sampleFormat() == QAudioFormat.Int16: + scaler = lambda x: x * 32767 # noqa: E731 + pack_format = '<h' if little_endian else '>h' + + assert pack_format != '' + + channel_bytes = fmt.bytesPerSample() + + length = (fmt.sampleRate() * fmt.channelCount() * channel_bytes) * durationUs // 100000 + + self.m_buffer.clear() + sample_index = 0 + factor = 2 * pi * sampleRate / fmt.sampleRate() + + while length != 0: + x = sin((sample_index % fmt.sampleRate()) * factor) + packed = pack(pack_format, int(scaler(x))) + + for _ in range(fmt.channelCount()): + self.m_buffer.append(packed) + length -= channel_bytes + + sample_index += 1 + + def readData(self, maxlen): + data = QByteArray() + total = 0 + + while maxlen > total: + chunk = min(self.m_buffer.size() - self.m_pos, maxlen - total) + data.append(self.m_buffer.mid(self.m_pos, chunk)) + self.m_pos = (self.m_pos + chunk) % self.m_buffer.size() + total += chunk + + return data.data() + + def writeData(self, data): + return 0 + + def bytesAvailable(self): + return self.m_buffer.size() + super(Generator, self).bytesAvailable() + + +class AudioTest(QMainWindow): + + PUSH_MODE_LABEL = "Enable push mode" + PULL_MODE_LABEL = "Enable pull mode" + SUSPEND_LABEL = "Suspend playback" + RESUME_LABEL = "Resume playback" + + DURATION_SECONDS = 1 + TONE_SAMPLE_RATE_HZ = 600 + DATA_SAMPLE_RATE_HZ = 44100 + + def __init__(self, devices): + super().__init__() + + self.m_devices = devices + self.m_device = self.m_devices[0] + self.m_output = None + + self.initialize_window() + self.initialize_audio() + + def initialize_window(self): + + central_widget = QWidget() + layout = QVBoxLayout(central_widget) + + self.m_deviceBox = QComboBox() + self.m_deviceBox.activated[int].connect(self.device_changed) + for deviceInfo in self.m_devices: + self.m_deviceBox.addItem(deviceInfo.description(), deviceInfo) + + layout.addWidget(self.m_deviceBox) + + self.m_modeButton = QPushButton() + self.m_modeButton.clicked.connect(self.toggle_mode) + self.m_modeButton.setText(self.PUSH_MODE_LABEL) + + layout.addWidget(self.m_modeButton) + + self.m_suspendResumeButton = QPushButton(clicked=self.toggle_suspend_resume) + self.m_suspendResumeButton.setText(self.SUSPEND_LABEL) + + layout.addWidget(self.m_suspendResumeButton) + + volume_box = QHBoxLayout() + volume_label = QLabel("Volume:") + self.m_volumeSlider = QSlider(Qt.Horizontal, minimum=0, maximum=100, singleStep=10) + self.m_volumeSlider.valueChanged.connect(self.volume_changed) + + volume_box.addWidget(volume_label) + volume_box.addWidget(self.m_volumeSlider) + + layout.addLayout(volume_box) + + self.setCentralWidget(central_widget) + + def initialize_audio(self): + self.m_pullTimer = QTimer(self) + self.m_pullTimer.timeout.connect(self.pull_timer_expired) + self.m_pullMode = True + + self.m_format = QAudioFormat() + self.m_format.setSampleRate(self.DATA_SAMPLE_RATE_HZ) + self.m_format.setChannelCount(1) + self.m_format.setSampleFormat(QAudioFormat.Int16) + + info = self.m_devices[0] + if not info.isFormatSupported(self.m_format): + qWarning("Default format not supported - trying to use nearest") + self.m_format = info.nearestFormat(self.m_format) + + self.m_generator = Generator(self.m_format, self.DURATION_SECONDS * 1000000, + self.TONE_SAMPLE_RATE_HZ, self) + + self.create_audio_output() + + def create_audio_output(self): + self.m_audioSink = QAudioSink(self.m_device, self.m_format) + self.m_audioSink.stateChanged.connect(self.handle_state_changed) + + self.m_generator.start() + self.m_audioSink.start(self.m_generator) + self.m_volumeSlider.setValue(self.m_audioSink.volume() * 100) + + def closeEvent(self, e): + self.stop() + e.accept() + + def stop(self): + self.m_pullTimer.stop() + self.m_generator.stop() + self.m_audioSink.stop() + + @Slot(int) + def device_changed(self, index): + self.stop() + self.m_device = self.m_deviceBox.itemData(index) + + self.create_audio_output() + + @Slot(int) + def volume_changed(self, value): + if self.m_audioSink is not None: + self.m_audioSink.setVolume(value / 100.0) + + @Slot() + def notified(self): + bytes_free = self.m_audioSink.bytesFree() + elapsed = self.m_audioSink.elapsedUSecs() + processed = self.m_audioSink.processedUSecs() + qWarning(f"bytesFree = {bytes_free}, " + f"elapsedUSecs = {elapsed}, " + f"processedUSecs = {processed}") + + @Slot() + def pull_timer_expired(self): + if self.m_audioSink is not None and self.m_audioSink.state() != QAudio.StoppedState: + bytes_free = self.m_audioSink.bytesFree() + data = self.m_generator.read(bytes_free) + if data: + self.m_output.write(data) + + @Slot() + def toggle_mode(self): + self.m_pullTimer.stop() + self.m_audioSink.stop() + + if self.m_pullMode: + self.m_modeButton.setText(self.PULL_MODE_LABEL) + self.m_output = self.m_audioSink.start() + self.m_pullMode = False + self.m_pullTimer.start(20) + else: + self.m_modeButton.setText(self.PUSH_MODE_LABEL) + self.m_pullMode = True + self.m_audioSink.start(self.m_generator) + + self.m_suspendResumeButton.setText(self.SUSPEND_LABEL) + + @Slot() + def toggle_suspend_resume(self): + if self.m_audioSink.state() == QAudio.SuspendedState: + qWarning("status: Suspended, resume()") + self.m_audioSink.resume() + self.m_suspendResumeButton.setText(self.SUSPEND_LABEL) + elif self.m_audioSink.state() == QAudio.ActiveState: + qWarning("status: Active, suspend()") + self.m_audioSink.suspend() + self.m_suspendResumeButton.setText(self.RESUME_LABEL) + elif self.m_audioSink.state() == QAudio.StoppedState: + qWarning("status: Stopped, resume()") + self.m_audioSink.resume() + self.m_suspendResumeButton.setText(self.SUSPEND_LABEL) + elif self.m_audioSink.state() == QAudio.IdleState: + qWarning("status: IdleState") + + state_map = { + QAudio.ActiveState: "ActiveState", + QAudio.SuspendedState: "SuspendedState", + QAudio.StoppedState: "StoppedState", + QAudio.IdleState: "IdleState"} + + @Slot("QAudio::State") + def handle_state_changed(self, state): + state = self.state_map.get(state, 'Unknown') + qWarning(f"state = {state}") + + +if __name__ == '__main__': + app = QApplication(sys.argv) + app.setApplicationName("Audio Output Test") + + devices = QMediaDevices.audioOutputs() + if not devices: + print('No audio outputs found.', file=sys.stderr) + sys.exit(-1) + + audio = AudioTest(devices) + audio.show() + + sys.exit(app.exec()) |