aboutsummaryrefslogtreecommitdiffstats
path: root/examples/serialbus/modbus/modbusclient/mainwindow.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/serialbus/modbus/modbusclient/mainwindow.py')
-rw-r--r--examples/serialbus/modbus/modbusclient/mainwindow.py331
1 files changed, 331 insertions, 0 deletions
diff --git a/examples/serialbus/modbus/modbusclient/mainwindow.py b/examples/serialbus/modbus/modbusclient/mainwindow.py
new file mode 100644
index 000000000..02f9d478b
--- /dev/null
+++ b/examples/serialbus/modbus/modbusclient/mainwindow.py
@@ -0,0 +1,331 @@
+# Copyright (C) 2022 The Qt Company Ltd.
+# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
+
+import functools
+from enum import IntEnum
+
+from PySide6.QtCore import QUrl, Slot
+from PySide6.QtGui import QStandardItemModel, QStandardItem
+from PySide6.QtWidgets import QMainWindow
+from PySide6.QtSerialBus import (QModbusDataUnit,
+ QModbusDevice, QModbusReply,
+ QModbusRtuSerialClient, QModbusTcpClient)
+
+from ui_mainwindow import Ui_MainWindow
+from settingsdialog import SettingsDialog
+from writeregistermodel import WriteRegisterModel
+
+
+class ModbusConnection(IntEnum):
+ SERIAL = 0
+ TCP = 1
+
+
+class MainWindow(QMainWindow):
+
+ def __init__(self, parent=None):
+ super().__init__(parent)
+ self.ui = Ui_MainWindow()
+ self.ui.setupUi(self)
+
+ self._modbus_device = None
+
+ self._settings_dialog = SettingsDialog(self)
+
+ self.init_actions()
+
+ self._write_model = WriteRegisterModel(self)
+ self._write_model.set_start_address(self.ui.writeAddress.value())
+ self._write_model.set_number_of_values(self.ui.writeSize.currentText())
+
+ self.ui.writeValueTable.setModel(self._write_model)
+ self.ui.writeValueTable.hideColumn(2)
+ vp = self.ui.writeValueTable.viewport()
+ self._write_model.update_viewport.connect(vp.update)
+
+ self.ui.writeTable.addItem("Coils", QModbusDataUnit.Coils)
+ self.ui.writeTable.addItem("Discrete Inputs", QModbusDataUnit.DiscreteInputs)
+ self.ui.writeTable.addItem("Input Registers", QModbusDataUnit.InputRegisters)
+ self.ui.writeTable.addItem("Holding Registers", QModbusDataUnit.HoldingRegisters)
+
+ self.ui.connectType.setCurrentIndex(0)
+ self.onConnectTypeChanged(0)
+
+ self._write_size_model = QStandardItemModel(0, 1, self)
+ for i in range(1, 11):
+ self._write_size_model.appendRow(QStandardItem(f"{i}"))
+ self.ui.writeSize.setModel(self._write_size_model)
+ self.ui.writeSize.setCurrentText("10")
+ self.ui.writeSize.currentTextChanged.connect(self._write_model.set_number_of_values)
+
+ self.ui.writeAddress.valueChanged.connect(self._write_model.set_start_address)
+ self.ui.writeAddress.valueChanged.connect(self._writeAddress)
+
+ @Slot(int)
+ def _writeAddress(self, i):
+ last_possible_index = 0
+ currentIndex = self.ui.writeSize.currentIndex()
+ for ii in range(0, 10):
+ if ii < (10 - i):
+ last_possible_index = ii
+ self._write_size_model.item(ii).setEnabled(True)
+ else:
+ self._write_size_model.item(ii).setEnabled(False)
+ if currentIndex > last_possible_index:
+ self.ui.writeSize.setCurrentIndex(last_possible_index)
+
+ def _close_device(self):
+ if self._modbus_device:
+ self._modbus_device.disconnectDevice()
+ del self._modbus_device
+ self._modbus_device = None
+
+ def closeEvent(self, event):
+ self._close_device()
+ event.accept()
+
+ def init_actions(self):
+ self.ui.actionConnect.setEnabled(True)
+ self.ui.actionDisconnect.setEnabled(False)
+ self.ui.actionExit.setEnabled(True)
+ self.ui.actionOptions.setEnabled(True)
+
+ self.ui.connectButton.clicked.connect(self.onConnectButtonClicked)
+ self.ui.actionConnect.triggered.connect(self.onConnectButtonClicked)
+ self.ui.actionDisconnect.triggered.connect(self.onConnectButtonClicked)
+ self.ui.readButton.clicked.connect(self.onReadButtonClicked)
+ self.ui.writeButton.clicked.connect(self.onWriteButtonClicked)
+ self.ui.readWriteButton.clicked.connect(self.onReadWriteButtonClicked)
+ self.ui.connectType.currentIndexChanged.connect(self.onConnectTypeChanged)
+ self.ui.writeTable.currentIndexChanged.connect(self.onWriteTableChanged)
+
+ self.ui.actionExit.triggered.connect(self.close)
+ self.ui.actionOptions.triggered.connect(self._settings_dialog.show)
+
+ @Slot(int)
+ def onConnectTypeChanged(self, index):
+ self._close_device()
+
+ if index == ModbusConnection.SERIAL:
+ self._modbus_device = QModbusRtuSerialClient(self)
+ elif index == ModbusConnection.TCP:
+ self._modbus_device = QModbusTcpClient(self)
+ if not self.ui.portEdit.text():
+ self.ui.portEdit.setText("127.0.0.1:502")
+
+ self._modbus_device.errorOccurred.connect(self._show_device_errorstring)
+
+ if not self._modbus_device:
+ self.ui.connectButton.setDisabled(True)
+ message = "Could not create Modbus client."
+ self.statusBar().showMessage(message, 5000)
+ else:
+ self._modbus_device.stateChanged.connect(self.onModbusStateChanged)
+
+ @Slot()
+ def _show_device_errorstring(self):
+ self.statusBar().showMessage(self._modbus_device.errorString(), 5000)
+
+ @Slot()
+ def onConnectButtonClicked(self):
+ if not self._modbus_device:
+ return
+
+ self.statusBar().clearMessage()
+ md = self._modbus_device
+ if md.state() != QModbusDevice.ConnectedState:
+ settings = self._settings_dialog.settings()
+ if self.ui.connectType.currentIndex() == ModbusConnection.SERIAL:
+ md.setConnectionParameter(QModbusDevice.SerialPortNameParameter,
+ self.ui.portEdit.text())
+ md.setConnectionParameter(QModbusDevice.SerialParityParameter,
+ settings.parity)
+ md.setConnectionParameter(QModbusDevice.SerialBaudRateParameter,
+ settings.baud)
+ md.setConnectionParameter(QModbusDevice.SerialDataBitsParameter,
+ settings.data_bits)
+ md.setConnectionParameter(QModbusDevice.SerialStopBitsParameter,
+ settings.stop_bits)
+ else:
+ url = QUrl.fromUserInput(self.ui.portEdit.text())
+ md.setConnectionParameter(QModbusDevice.NetworkPortParameter,
+ url.port())
+ md.setConnectionParameter(QModbusDevice.NetworkAddressParameter,
+ url.host())
+
+ md.setTimeout(settings.response_time)
+ md.setNumberOfRetries(settings.number_of_retries)
+ if not md.connectDevice():
+ message = "Connect failed: " + md.errorString()
+ self.statusBar().showMessage(message, 5000)
+ else:
+ self.ui.actionConnect.setEnabled(False)
+ self.ui.actionDisconnect.setEnabled(True)
+
+ else:
+ md.disconnectDevice()
+ self.ui.actionConnect.setEnabled(True)
+ self.ui.actionDisconnect.setEnabled(False)
+
+ @Slot(int)
+ def onModbusStateChanged(self, state):
+ connected = (state != QModbusDevice.UnconnectedState)
+ self.ui.actionConnect.setEnabled(not connected)
+ self.ui.actionDisconnect.setEnabled(connected)
+
+ if state == QModbusDevice.UnconnectedState:
+ self.ui.connectButton.setText("Connect")
+ elif state == QModbusDevice.ConnectedState:
+ self.ui.connectButton.setText("Disconnect")
+
+ @Slot()
+ def onReadButtonClicked(self):
+ if not self._modbus_device:
+ return
+ self.ui.readValue.clear()
+ self.statusBar().clearMessage()
+ reply = self._modbus_device.sendReadRequest(self.read_request(),
+ self.ui.serverEdit.value())
+ if reply:
+ if not reply.isFinished():
+ reply.finished.connect(functools.partial(self.onReadReady, reply))
+ else:
+ del reply # broadcast replies return immediately
+ else:
+ message = "Read error: " + self._modbus_device.errorString()
+ self.statusBar().showMessage(message, 5000)
+
+ @Slot()
+ def onReadReady(self, reply):
+ if not reply:
+ return
+
+ if reply.error() == QModbusDevice.NoError:
+ unit = reply.result()
+ total = unit.valueCount()
+ for i in range(0, total):
+ addr = unit.startAddress() + i
+ value = unit.value(i)
+ if unit.registerType().value <= QModbusDataUnit.Coils.value:
+ entry = f"Address: {addr}, Value: {value}"
+ else:
+ entry = f"Address: {addr}, Value: {value:x}"
+ self.ui.readValue.addItem(entry)
+
+ elif reply.error() == QModbusDevice.ProtocolError:
+ e = reply.errorString()
+ ex = reply.rawResult().exceptionCode()
+ message = f"Read response error: {e} (Modbus exception: 0x{ex:x})"
+ self.statusBar().showMessage(message, 5000)
+ else:
+ e = reply.errorString()
+ code = int(reply.error())
+ message = f"Read response error: {e} (code: 0x{code:x})"
+ self.statusBar().showMessage(message, 5000)
+
+ reply.deleteLater()
+
+ @Slot()
+ def onWriteButtonClicked(self):
+ if not self._modbus_device:
+ return
+ self.statusBar().clearMessage()
+
+ write_unit = self.write_request()
+ total = write_unit.valueCount()
+ table = write_unit.registerType()
+ for i in range(0, total):
+ addr = i + write_unit.startAddress()
+ if table == QModbusDataUnit.Coils:
+ write_unit.setValue(i, self._write_model.m_coils[addr])
+ else:
+ write_unit.setValue(i, self._write_model.m_holdingRegisters[addr])
+
+ reply = self._modbus_device.sendWriteRequest(write_unit,
+ self.ui.serverEdit.value())
+ if reply:
+ if reply.isFinished():
+ # broadcast replies return immediately
+ reply.deleteLater()
+ else:
+ reply.finished.connect(functools.partial(self._write_finished, reply))
+ else:
+ message = "Write error: " + self._modbus_device.errorString()
+ self.statusBar().showMessage(message, 5000)
+
+ @Slot(QModbusReply)
+ def _write_finished(self, reply):
+ error = reply.error()
+ if error == QModbusDevice.ProtocolError:
+ e = reply.errorString()
+ ex = reply.rawResult().exceptionCode()
+ message = f"Write response error: {e} (Modbus exception: 0x{ex:x}"
+ self.statusBar().showMessage(message, 5000)
+ elif error != QModbusDevice.NoError:
+ e = reply.errorString()
+ message = f"Write response error: {e} (code: 0x{error:x})"
+ self.statusBar().showMessage(message, 5000)
+ reply.deleteLater()
+
+ @Slot()
+ def onReadWriteButtonClicked(self):
+ if not self._modbus_device:
+ return
+ self.ui.readValue.clear()
+ self.statusBar().clearMessage()
+
+ write_unit = self.write_request()
+ table = write_unit.registerType()
+ total = write_unit.valueCount()
+ for i in range(0, total):
+ addr = i + write_unit.startAddress()
+ if table == QModbusDataUnit.Coils:
+ write_unit.setValue(i, self._write_model.m_coils[addr])
+ else:
+ write_unit.setValue(i, self._write_model.m_holdingRegisters[addr])
+
+ reply = self._modbus_device.sendReadWriteRequest(self.read_request(),
+ write_unit,
+ self.ui.serverEdit.value())
+ if reply:
+ if not reply.isFinished():
+ reply.finished.connect(functools.partial(self.onReadReady, reply))
+ else:
+ del reply # broadcast replies return immediately
+ else:
+ message = "Read error: " + self._modbus_device.errorString()
+ self.statusBar().showMessage(message, 5000)
+
+ @Slot(int)
+ def onWriteTableChanged(self, index):
+ coils_or_holding = index == 0 or index == 3
+ if coils_or_holding:
+ self.ui.writeValueTable.setColumnHidden(1, index != 0)
+ self.ui.writeValueTable.setColumnHidden(2, index != 3)
+ self.ui.writeValueTable.resizeColumnToContents(0)
+
+ self.ui.readWriteButton.setEnabled(index == 3)
+ self.ui.writeButton.setEnabled(coils_or_holding)
+ self.ui.writeGroupBox.setEnabled(coils_or_holding)
+
+ def read_request(self):
+ table = self.ui.writeTable.currentData()
+
+ start_address = self.ui.readAddress.value()
+ assert start_address >= 0 and start_address < 10
+
+ # do not go beyond 10 entries
+ number_of_entries = min(int(self.ui.readSize.currentText()),
+ 10 - start_address)
+ return QModbusDataUnit(table, start_address, number_of_entries)
+
+ def write_request(self):
+ table = self.ui.writeTable.currentData()
+
+ start_address = self.ui.writeAddress.value()
+ assert start_address >= 0 and start_address < 10
+
+ # do not go beyond 10 entries
+ number_of_entries = min(int(self.ui.writeSize.currentText()),
+ 10 - start_address)
+ return QModbusDataUnit(table, start_address, number_of_entries)