from PySide6.QtCore import QObject, QTimer, Signal, Slot, Property from PySide6.QtSerialBus import QModbusTcpClient, QModbusDevice, QModbusDataUnit import struct from functools import partial # verifier l'ordre des registres attendus par l'automate (big endian / little endian) class ConnectionHandler(QObject): # Signals for UI updates connectionStatusChanged = Signal(bool) errorOccurred = Signal(str) vitesseReelUpdated = Signal(float) courantReelUpdated = Signal(float) CpReelUpdated = Signal(float) CmaxReelUpdated = Signal(float) XmaxReelUpdated = Signal(float) penteReelUpdated = Signal(float) WpReelUpdated = Signal(float) T1ReelUpdated = Signal(float) T2ReelUpdated = Signal(float) def __init__(self, parent=None): super().__init__(parent) self._modbus_device = QModbusTcpClient(self) self._is_connected = False self._connection_check_timer = QTimer(self) self._connection_check_timer.setInterval(300) self._connection_check_timer.timeout.connect(self._read_all_registers) # Connect Modbus signals self._modbus_device.stateChanged.connect(self._on_state_changed) self._modbus_device.errorOccurred.connect(self._on_error) # List of registers to decode self._float_map = { 1: self.vitesseReelUpdated, 3: self.courantReelUpdated, 5: self.CpReelUpdated, 7: self.penteReelUpdated, 9: self.T1ReelUpdated, 11: self.T2ReelUpdated, 13: self.XmaxReelUpdated, 15: self.WpReelUpdated, 17: self.CmaxReelUpdated, } # ------------------------------- # Properties # ------------------------------- @Property(bool, notify=connectionStatusChanged) def is_connected(self): return self._is_connected # ------------------------------- # Connection handling # ------------------------------- @Slot(list, int, int) def connect_button_clicked(self, ip_parts, port, server_address): self._server_address = server_address # Validate IP address try: ip_clean = ".".join(str(int(float(p))) for p in ip_parts[:4]) except (ValueError, TypeError): self.errorOccurred.emit("Invalid IP address format") return print(f"[PY] Attempting connection to {ip_clean}:{port} (server={server_address})") # Prevent connection spam state = self._modbus_device.state() if state == QModbusDevice.ConnectingState: self.errorOccurred.emit("Already connecting... please wait.") return if state == QModbusDevice.ConnectedState: print("[PY] Disconnecting from Modbus...") self._modbus_device.disconnectDevice() return # Set connection parameters self._modbus_device.setConnectionParameter(QModbusDevice.NetworkAddressParameter, ip_clean) self._modbus_device.setConnectionParameter(QModbusDevice.NetworkPortParameter, port) self._modbus_device.setTimeout(2000) self._modbus_device.setNumberOfRetries(3) # Initiate connection if not self._modbus_device.connectDevice(): # Only catches immediate internal errors self.errorOccurred.emit("Failed to start Modbus connection") print("[PY] connectDevice() returned False") else: print("[PY] connectDevice() initiated (asynchronous)") def _on_state_changed(self, state): print(f"[PY] Modbus state changed: {state}") connected = (state == QModbusDevice.ConnectedState) if connected != self._is_connected: self._is_connected = connected self.connectionStatusChanged.emit(connected) if connected: print("[PY] Connection successful, starting read timer.") self._connection_check_timer.start() else: print("[PY] Connection closed.") self._connection_check_timer.stop() def _on_error(self, error): if error != QModbusDevice.NoError: msg = self._modbus_device.errorString() print(f"[PY] Modbus error: {msg}") self.errorOccurred.emit(msg) # ------------------------------- # Reading optimized # ------------------------------- def _read_all_registers(self): """Reads 17 registers in one Modbus request""" if self._modbus_device.state() != QModbusDevice.ConnectedState: return start_addr = 1 count = 18 # we need 18 registers so that address 17 has 2 registers unit = QModbusDataUnit(QModbusDataUnit.HoldingRegisters, start_addr, count) reply = self._modbus_device.sendReadRequest(unit, self._server_address) if not reply: print("[PY] Read request failed to start") return if reply.isFinished(): reply.deleteLater() return # Safe connection with partial to avoid lambda capture problems reply.finished.connect(partial(self._handle_block_reply, reply)) # ------------------------------- # Reply decoding # ------------------------------- def _handle_block_reply(self, reply): if reply.error() != QModbusDevice.NoError: msg = reply.errorString() print(f"[PY] Read error: {msg}") self.errorOccurred.emit(msg) reply.deleteLater() return result = reply.result() if not result.isValid(): print("[PY] Invalid Modbus result") self.errorOccurred.emit("Invalid Modbus result") reply.deleteLater() return registers = result.values() # Decode all floats from the block for start_addr, signal in self._float_map.items(): idx = start_addr - 1 # convert Modbus start to list index if idx + 1 >= len(registers): print(f"[PY] Missing registers for address {start_addr}") continue # Build 32-bit raw int from 2 registers raw = (registers[idx + 1] << 16) | registers[idx] # Convert to float32 LE value = struct.unpack("= 2: raw_value = (registers[1] << 16) | registers[0] value = struct.unpack("