aboutsummaryrefslogtreecommitdiffstats
path: root/examples/bluetooth/heartrate_game/devicehandler.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/bluetooth/heartrate_game/devicehandler.py')
-rw-r--r--examples/bluetooth/heartrate_game/devicehandler.py309
1 files changed, 309 insertions, 0 deletions
diff --git a/examples/bluetooth/heartrate_game/devicehandler.py b/examples/bluetooth/heartrate_game/devicehandler.py
new file mode 100644
index 000000000..df34052b8
--- /dev/null
+++ b/examples/bluetooth/heartrate_game/devicehandler.py
@@ -0,0 +1,309 @@
+# Copyright (C) 2022 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
+
+import struct
+
+from enum import IntEnum
+
+from PySide6.QtBluetooth import (QLowEnergyCharacteristic,
+ QLowEnergyController,
+ QLowEnergyDescriptor,
+ QLowEnergyService,
+ QBluetoothUuid)
+from PySide6.QtQml import QmlElement
+from PySide6.QtCore import (QByteArray, QDateTime, QRandomGenerator, QTimer,
+ Property, Signal, Slot, QEnum)
+
+from bluetoothbaseclass import BluetoothBaseClass
+from heartrate_global import simulator
+
+
+# To be used on the @QmlElement decorator
+# (QML_IMPORT_MINOR_VERSION is optional)
+QML_IMPORT_NAME = "HeartRateGame"
+QML_IMPORT_MAJOR_VERSION = 1
+
+
+@QmlElement
+class DeviceHandler(BluetoothBaseClass):
+
+ @QEnum
+ class AddressType(IntEnum):
+ PUBLIC_ADDRESS = 1
+ RANDOM_ADDRESS = 2
+
+ measuringChanged = Signal()
+ aliveChanged = Signal()
+ statsChanged = Signal()
+
+ def __init__(self, parent=None):
+ super().__init__(parent)
+
+ self.m_control = None
+ self.m_service = None
+ self.m_notificationDesc = QLowEnergyDescriptor()
+ self.m_currentDevice = None
+
+ self.m_foundHeartRateService = False
+ self.m_measuring = False
+ self.m_currentValue = 0
+ self.m_min = 0
+ self.m_max = 0
+ self.m_sum = 0
+ self.m_avg = 0.0
+ self.m_calories = 0.0
+
+ self.m_start = QDateTime()
+ self.m_stop = QDateTime()
+
+ self.m_measurements = []
+ self.m_addressType = QLowEnergyController.PublicAddress
+
+ self.m_demoTimer = QTimer()
+
+ if simulator():
+ self.m_demoTimer.setSingleShot(False)
+ self.m_demoTimer.setInterval(2000)
+ self.m_demoTimer.timeout.connect(self.updateDemoHR)
+ self.m_demoTimer.start()
+ self.updateDemoHR()
+
+ @Property(int)
+ def addressType(self):
+ if self.m_addressType == QLowEnergyController.RandomAddress:
+ return DeviceHandler.AddressType.RANDOM_ADDRESS
+ return DeviceHandler.AddressType.PUBLIC_ADDRESS
+
+ @addressType.setter
+ def addressType(self, type):
+ if type == DeviceHandler.AddressType.PUBLIC_ADDRESS:
+ self.m_addressType = QLowEnergyController.PublicAddress
+ elif type == DeviceHandler.AddressType.RANDOM_ADDRESS:
+ self.m_addressType = QLowEnergyController.RandomAddress
+
+ @Slot(QLowEnergyController.Error)
+ def controllerErrorOccurred(self, device):
+ self.error = "Cannot connect to remote device."
+
+ @Slot()
+ def controllerConnected(self):
+ self.info = "Controller connected. Search services..."
+ self.m_control.discoverServices()
+
+ @Slot()
+ def controllerDisconnected(self):
+ self.error = "LowEnergy controller disconnected"
+
+ def setDevice(self, device):
+ self.clearMessages()
+ self.m_currentDevice = device
+
+ if simulator():
+ self.info = "Demo device connected."
+ return
+
+ # Disconnect and delete old connection
+ if self.m_control:
+ self.m_control.disconnectFromDevice()
+ self.m_control = None
+
+ # Create new controller and connect it if device available
+ if self.m_currentDevice:
+
+ # Make connections
+#! [Connect-Signals-1]
+ self.m_control = QLowEnergyController.createCentral(self.m_currentDevice.device(), self)
+#! [Connect-Signals-1]
+ self.m_control.setRemoteAddressType(self.m_addressType)
+#! [Connect-Signals-2]
+
+ self.m_control.serviceDiscovered.connect(self.serviceDiscovered)
+ self.m_control.discoveryFinished.connect(self.serviceScanDone)
+
+ self.m_control.errorOccurred.connect(self.controllerErrorOccurred)
+ self.m_control.connected.connect(self.controllerConnected)
+ self.m_control.disconnected.connect(self.controllerDisconnected)
+
+ # Connect
+ self.m_control.connectToDevice()
+#! [Connect-Signals-2]
+
+ @Slot()
+ def startMeasurement(self):
+ if self.alive:
+ self.m_start = QDateTime.currentDateTime()
+ self.m_min = 0
+ self.m_max = 0
+ self.m_avg = 0
+ self.m_sum = 0
+ self.m_calories = 0.0
+ self.m_measuring = True
+ self.m_measurements.clear()
+ self.measuringChanged.emit()
+
+ @Slot()
+ def stopMeasurement(self):
+ self.m_measuring = False
+ self.measuringChanged.emit()
+
+#! [Filter HeartRate service 1]
+ @Slot(QBluetoothUuid)
+ def serviceDiscovered(self, gatt):
+ if gatt == QBluetoothUuid(QBluetoothUuid.ServiceClassUuid.HeartRate):
+ self.info = "Heart Rate service discovered. Waiting for service scan to be done..."
+ self.m_foundHeartRateService = True
+
+#! [Filter HeartRate service 1]
+
+ @Slot()
+ def serviceScanDone(self):
+ self.info = "Service scan done."
+
+ # Delete old service if available
+ if self.m_service:
+ self.m_service = None
+
+#! [Filter HeartRate service 2]
+ # If heartRateService found, create new service
+ if self.m_foundHeartRateService:
+ self.m_service = self.m_control.createServiceObject(
+ QBluetoothUuid(QBluetoothUuid.ServiceClassUuid.HeartRate), self)
+
+ if self.m_service:
+ self.m_service.stateChanged.connect(self.serviceStateChanged)
+ self.m_service.characteristicChanged.connect(self.updateHeartRateValue)
+ self.m_service.descriptorWritten.connect(self.confirmedDescriptorWrite)
+ self.m_service.discoverDetails()
+ else:
+ self.error = "Heart Rate Service not found."
+#! [Filter HeartRate service 2]
+
+# Service functions
+#! [Find HRM characteristic]
+ @Slot(QLowEnergyService.ServiceState)
+ def serviceStateChanged(self, switch):
+ if switch == QLowEnergyService.RemoteServiceDiscovering:
+ self.info = "Discovering services..."
+ elif switch == QLowEnergyService.RemoteServiceDiscovered:
+ self.info = "Service discovered."
+ hrChar = self.m_service.characteristic(
+ QBluetoothUuid(QBluetoothUuid.CharacteristicType.HeartRateMeasurement))
+ if hrChar.isValid():
+ self.m_notificationDesc = hrChar.descriptor(
+ QBluetoothUuid.DescriptorType.ClientCharacteristicConfiguration)
+ if self.m_notificationDesc.isValid():
+ self.m_service.writeDescriptor(self.m_notificationDesc,
+ QByteArray.fromHex(b"0100"))
+ else:
+ self.error = "HR Data not found."
+ self.aliveChanged.emit()
+#! [Find HRM characteristic]
+
+#! [Reading value]
+ @Slot(QLowEnergyCharacteristic, QByteArray)
+ def updateHeartRateValue(self, c, value):
+ # ignore any other characteristic change. Shouldn't really happen though
+ if c.uuid() != QBluetoothUuid(QBluetoothUuid.CharacteristicType.HeartRateMeasurement):
+ return
+
+ data = value.data()
+ flags = int(data[0])
+ # Heart Rate
+ hrvalue = 0
+ if flags & 0x1: # HR 16 bit little endian? otherwise 8 bit
+ hrvalue = struct.unpack("<H", data[1:3])[0]
+ else:
+ hrvalue = struct.unpack("B", data[1:2])[0]
+
+ self.addMeasurement(hrvalue)
+
+#! [Reading value]
+ @Slot()
+ def updateDemoHR(self):
+ randomValue = 0
+ if self.m_currentValue < 30: # Initial value
+ randomValue = 55 + QRandomGenerator.global_().bounded(30)
+ elif not self.m_measuring: # Value when relax
+ random = QRandomGenerator.global_().bounded(5)
+ randomValue = self.m_currentValue - 2 + random
+ randomValue = max(min(randomValue, 55), 75)
+ else: # Measuring
+ random = QRandomGenerator.global_().bounded(10)
+ randomValue = self.m_currentValue + random - 2
+
+ self.addMeasurement(randomValue)
+
+ @Slot(QLowEnergyCharacteristic, QByteArray)
+ def confirmedDescriptorWrite(self, d, value):
+ if (d.isValid() and d == self.m_notificationDesc
+ and value == QByteArray.fromHex(b"0000")):
+ # disabled notifications . assume disconnect intent
+ self.m_control.disconnectFromDevice()
+ self.m_service = None
+
+ @Slot()
+ def disconnectService(self):
+ self.m_foundHeartRateService = False
+
+ # disable notifications
+ if (self.m_notificationDesc.isValid() and self.m_service
+ and self.m_notificationDesc.value() == QByteArray.fromHex(b"0100")):
+ self.m_service.writeDescriptor(self.m_notificationDesc,
+ QByteArray.fromHex(b"0000"))
+ else:
+ if self.m_control:
+ self.m_control.disconnectFromDevice()
+ self.m_service = None
+
+ @Property(bool, notify=measuringChanged)
+ def measuring(self):
+ return self.m_measuring
+
+ @Property(bool, notify=aliveChanged)
+ def alive(self):
+ if simulator():
+ return True
+ if self.m_service:
+ return self.m_service.state() == QLowEnergyService.RemoteServiceDiscovered
+ return False
+
+ @Property(int, notify=statsChanged)
+ def hr(self):
+ return self.m_currentValue
+
+ @Property(int, notify=statsChanged)
+ def time(self):
+ return self.m_start.secsTo(self.m_stop)
+
+ @Property(int, notify=statsChanged)
+ def maxHR(self):
+ return self.m_max
+
+ @Property(int, notify=statsChanged)
+ def minHR(self):
+ return self.m_min
+
+ @Property(float, notify=statsChanged)
+ def average(self):
+ return self.m_avg
+
+ @Property(float, notify=statsChanged)
+ def calories(self):
+ return self.m_calories
+
+ def addMeasurement(self, value):
+ self.m_currentValue = value
+
+ # If measuring and value is appropriate
+ if self.m_measuring and value > 30 and value < 250:
+ self.m_stop = QDateTime.currentDateTime()
+ self.m_measurements.append(value)
+
+ self.m_min = value if self.m_min == 0 else min(value, self.m_min)
+ self.m_max = max(value, self.m_max)
+ self.m_sum += value
+ self.m_avg = float(self.m_sum) / len(self.m_measurements)
+ self.m_calories = ((-55.0969 + (0.6309 * self.m_avg) + (0.1988 * 94)
+ + (0.2017 * 24)) / 4.184) * 60 * self.time / 3600
+
+ self.statsChanged.emit()