diff options
Diffstat (limited to 'examples/serialbus/modbus/modbusclient/mainwindow.py')
-rw-r--r-- | examples/serialbus/modbus/modbusclient/mainwindow.py | 331 |
1 files changed, 331 insertions, 0 deletions
diff --git a/examples/serialbus/modbus/modbusclient/mainwindow.py b/examples/serialbus/modbus/modbusclient/mainwindow.py new file mode 100644 index 000000000..02f9d478b --- /dev/null +++ b/examples/serialbus/modbus/modbusclient/mainwindow.py @@ -0,0 +1,331 @@ +# Copyright (C) 2022 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause + +import functools +from enum import IntEnum + +from PySide6.QtCore import QUrl, Slot +from PySide6.QtGui import QStandardItemModel, QStandardItem +from PySide6.QtWidgets import QMainWindow +from PySide6.QtSerialBus import (QModbusDataUnit, + QModbusDevice, QModbusReply, + QModbusRtuSerialClient, QModbusTcpClient) + +from ui_mainwindow import Ui_MainWindow +from settingsdialog import SettingsDialog +from writeregistermodel import WriteRegisterModel + + +class ModbusConnection(IntEnum): + SERIAL = 0 + TCP = 1 + + +class MainWindow(QMainWindow): + + def __init__(self, parent=None): + super().__init__(parent) + self.ui = Ui_MainWindow() + self.ui.setupUi(self) + + self._modbus_device = None + + self._settings_dialog = SettingsDialog(self) + + self.init_actions() + + self._write_model = WriteRegisterModel(self) + self._write_model.set_start_address(self.ui.writeAddress.value()) + self._write_model.set_number_of_values(self.ui.writeSize.currentText()) + + self.ui.writeValueTable.setModel(self._write_model) + self.ui.writeValueTable.hideColumn(2) + vp = self.ui.writeValueTable.viewport() + self._write_model.update_viewport.connect(vp.update) + + self.ui.writeTable.addItem("Coils", QModbusDataUnit.Coils) + self.ui.writeTable.addItem("Discrete Inputs", QModbusDataUnit.DiscreteInputs) + self.ui.writeTable.addItem("Input Registers", QModbusDataUnit.InputRegisters) + self.ui.writeTable.addItem("Holding Registers", QModbusDataUnit.HoldingRegisters) + + self.ui.connectType.setCurrentIndex(0) + self.onConnectTypeChanged(0) + + self._write_size_model = QStandardItemModel(0, 1, self) + for i in range(1, 11): + self._write_size_model.appendRow(QStandardItem(f"{i}")) + self.ui.writeSize.setModel(self._write_size_model) + self.ui.writeSize.setCurrentText("10") + self.ui.writeSize.currentTextChanged.connect(self._write_model.set_number_of_values) + + self.ui.writeAddress.valueChanged.connect(self._write_model.set_start_address) + self.ui.writeAddress.valueChanged.connect(self._writeAddress) + + @Slot(int) + def _writeAddress(self, i): + last_possible_index = 0 + currentIndex = self.ui.writeSize.currentIndex() + for ii in range(0, 10): + if ii < (10 - i): + last_possible_index = ii + self._write_size_model.item(ii).setEnabled(True) + else: + self._write_size_model.item(ii).setEnabled(False) + if currentIndex > last_possible_index: + self.ui.writeSize.setCurrentIndex(last_possible_index) + + def _close_device(self): + if self._modbus_device: + self._modbus_device.disconnectDevice() + del self._modbus_device + self._modbus_device = None + + def closeEvent(self, event): + self._close_device() + event.accept() + + def init_actions(self): + self.ui.actionConnect.setEnabled(True) + self.ui.actionDisconnect.setEnabled(False) + self.ui.actionExit.setEnabled(True) + self.ui.actionOptions.setEnabled(True) + + self.ui.connectButton.clicked.connect(self.onConnectButtonClicked) + self.ui.actionConnect.triggered.connect(self.onConnectButtonClicked) + self.ui.actionDisconnect.triggered.connect(self.onConnectButtonClicked) + self.ui.readButton.clicked.connect(self.onReadButtonClicked) + self.ui.writeButton.clicked.connect(self.onWriteButtonClicked) + self.ui.readWriteButton.clicked.connect(self.onReadWriteButtonClicked) + self.ui.connectType.currentIndexChanged.connect(self.onConnectTypeChanged) + self.ui.writeTable.currentIndexChanged.connect(self.onWriteTableChanged) + + self.ui.actionExit.triggered.connect(self.close) + self.ui.actionOptions.triggered.connect(self._settings_dialog.show) + + @Slot(int) + def onConnectTypeChanged(self, index): + self._close_device() + + if index == ModbusConnection.SERIAL: + self._modbus_device = QModbusRtuSerialClient(self) + elif index == ModbusConnection.TCP: + self._modbus_device = QModbusTcpClient(self) + if not self.ui.portEdit.text(): + self.ui.portEdit.setText("127.0.0.1:502") + + self._modbus_device.errorOccurred.connect(self._show_device_errorstring) + + if not self._modbus_device: + self.ui.connectButton.setDisabled(True) + message = "Could not create Modbus client." + self.statusBar().showMessage(message, 5000) + else: + self._modbus_device.stateChanged.connect(self.onModbusStateChanged) + + @Slot() + def _show_device_errorstring(self): + self.statusBar().showMessage(self._modbus_device.errorString(), 5000) + + @Slot() + def onConnectButtonClicked(self): + if not self._modbus_device: + return + + self.statusBar().clearMessage() + md = self._modbus_device + if md.state() != QModbusDevice.ConnectedState: + settings = self._settings_dialog.settings() + if self.ui.connectType.currentIndex() == ModbusConnection.SERIAL: + md.setConnectionParameter(QModbusDevice.SerialPortNameParameter, + self.ui.portEdit.text()) + md.setConnectionParameter(QModbusDevice.SerialParityParameter, + settings.parity) + md.setConnectionParameter(QModbusDevice.SerialBaudRateParameter, + settings.baud) + md.setConnectionParameter(QModbusDevice.SerialDataBitsParameter, + settings.data_bits) + md.setConnectionParameter(QModbusDevice.SerialStopBitsParameter, + settings.stop_bits) + else: + url = QUrl.fromUserInput(self.ui.portEdit.text()) + md.setConnectionParameter(QModbusDevice.NetworkPortParameter, + url.port()) + md.setConnectionParameter(QModbusDevice.NetworkAddressParameter, + url.host()) + + md.setTimeout(settings.response_time) + md.setNumberOfRetries(settings.number_of_retries) + if not md.connectDevice(): + message = "Connect failed: " + md.errorString() + self.statusBar().showMessage(message, 5000) + else: + self.ui.actionConnect.setEnabled(False) + self.ui.actionDisconnect.setEnabled(True) + + else: + md.disconnectDevice() + self.ui.actionConnect.setEnabled(True) + self.ui.actionDisconnect.setEnabled(False) + + @Slot(int) + def onModbusStateChanged(self, state): + connected = (state != QModbusDevice.UnconnectedState) + self.ui.actionConnect.setEnabled(not connected) + self.ui.actionDisconnect.setEnabled(connected) + + if state == QModbusDevice.UnconnectedState: + self.ui.connectButton.setText("Connect") + elif state == QModbusDevice.ConnectedState: + self.ui.connectButton.setText("Disconnect") + + @Slot() + def onReadButtonClicked(self): + if not self._modbus_device: + return + self.ui.readValue.clear() + self.statusBar().clearMessage() + reply = self._modbus_device.sendReadRequest(self.read_request(), + self.ui.serverEdit.value()) + if reply: + if not reply.isFinished(): + reply.finished.connect(functools.partial(self.onReadReady, reply)) + else: + del reply # broadcast replies return immediately + else: + message = "Read error: " + self._modbus_device.errorString() + self.statusBar().showMessage(message, 5000) + + @Slot() + def onReadReady(self, reply): + if not reply: + return + + if reply.error() == QModbusDevice.NoError: + unit = reply.result() + total = unit.valueCount() + for i in range(0, total): + addr = unit.startAddress() + i + value = unit.value(i) + if unit.registerType().value <= QModbusDataUnit.Coils.value: + entry = f"Address: {addr}, Value: {value}" + else: + entry = f"Address: {addr}, Value: {value:x}" + self.ui.readValue.addItem(entry) + + elif reply.error() == QModbusDevice.ProtocolError: + e = reply.errorString() + ex = reply.rawResult().exceptionCode() + message = f"Read response error: {e} (Modbus exception: 0x{ex:x})" + self.statusBar().showMessage(message, 5000) + else: + e = reply.errorString() + code = int(reply.error()) + message = f"Read response error: {e} (code: 0x{code:x})" + self.statusBar().showMessage(message, 5000) + + reply.deleteLater() + + @Slot() + def onWriteButtonClicked(self): + if not self._modbus_device: + return + self.statusBar().clearMessage() + + write_unit = self.write_request() + total = write_unit.valueCount() + table = write_unit.registerType() + for i in range(0, total): + addr = i + write_unit.startAddress() + if table == QModbusDataUnit.Coils: + write_unit.setValue(i, self._write_model.m_coils[addr]) + else: + write_unit.setValue(i, self._write_model.m_holdingRegisters[addr]) + + reply = self._modbus_device.sendWriteRequest(write_unit, + self.ui.serverEdit.value()) + if reply: + if reply.isFinished(): + # broadcast replies return immediately + reply.deleteLater() + else: + reply.finished.connect(functools.partial(self._write_finished, reply)) + else: + message = "Write error: " + self._modbus_device.errorString() + self.statusBar().showMessage(message, 5000) + + @Slot(QModbusReply) + def _write_finished(self, reply): + error = reply.error() + if error == QModbusDevice.ProtocolError: + e = reply.errorString() + ex = reply.rawResult().exceptionCode() + message = f"Write response error: {e} (Modbus exception: 0x{ex:x}" + self.statusBar().showMessage(message, 5000) + elif error != QModbusDevice.NoError: + e = reply.errorString() + message = f"Write response error: {e} (code: 0x{error:x})" + self.statusBar().showMessage(message, 5000) + reply.deleteLater() + + @Slot() + def onReadWriteButtonClicked(self): + if not self._modbus_device: + return + self.ui.readValue.clear() + self.statusBar().clearMessage() + + write_unit = self.write_request() + table = write_unit.registerType() + total = write_unit.valueCount() + for i in range(0, total): + addr = i + write_unit.startAddress() + if table == QModbusDataUnit.Coils: + write_unit.setValue(i, self._write_model.m_coils[addr]) + else: + write_unit.setValue(i, self._write_model.m_holdingRegisters[addr]) + + reply = self._modbus_device.sendReadWriteRequest(self.read_request(), + write_unit, + self.ui.serverEdit.value()) + if reply: + if not reply.isFinished(): + reply.finished.connect(functools.partial(self.onReadReady, reply)) + else: + del reply # broadcast replies return immediately + else: + message = "Read error: " + self._modbus_device.errorString() + self.statusBar().showMessage(message, 5000) + + @Slot(int) + def onWriteTableChanged(self, index): + coils_or_holding = index == 0 or index == 3 + if coils_or_holding: + self.ui.writeValueTable.setColumnHidden(1, index != 0) + self.ui.writeValueTable.setColumnHidden(2, index != 3) + self.ui.writeValueTable.resizeColumnToContents(0) + + self.ui.readWriteButton.setEnabled(index == 3) + self.ui.writeButton.setEnabled(coils_or_holding) + self.ui.writeGroupBox.setEnabled(coils_or_holding) + + def read_request(self): + table = self.ui.writeTable.currentData() + + start_address = self.ui.readAddress.value() + assert start_address >= 0 and start_address < 10 + + # do not go beyond 10 entries + number_of_entries = min(int(self.ui.readSize.currentText()), + 10 - start_address) + return QModbusDataUnit(table, start_address, number_of_entries) + + def write_request(self): + table = self.ui.writeTable.currentData() + + start_address = self.ui.writeAddress.value() + assert start_address >= 0 and start_address < 10 + + # do not go beyond 10 entries + number_of_entries = min(int(self.ui.writeSize.currentText()), + 10 - start_address) + return QModbusDataUnit(table, start_address, number_of_entries) |