From ebb739a3a00f060002e5c459eabc5235280891a7 Mon Sep 17 00:00:00 2001 From: simon Date: Sat, 26 Jul 2025 10:39:11 +0200 Subject: [PATCH] Removed old vibe coded Python Test Tool --- tools/main.py | 248 --------------------------------------- tools/message_builder.py | 115 ------------------ tools/parser.py | 170 --------------------------- tools/payload_parser.py | 192 ------------------------------ 4 files changed, 725 deletions(-) delete mode 100644 tools/main.py delete mode 100644 tools/message_builder.py delete mode 100644 tools/parser.py delete mode 100644 tools/payload_parser.py diff --git a/tools/main.py b/tools/main.py deleted file mode 100644 index 15c1651..0000000 --- a/tools/main.py +++ /dev/null @@ -1,248 +0,0 @@ -import queue # Zum sicheren Datenaustausch zwischen Threads -import serial -import time -import threading -import sys -from parser import UartMessageParser, ParserError -from message_builder import MessageBuilder, MessageBuilderError, PayloadTooLargeError, BufferOverflowError -import payload_parser -from rich.console import Console -from rich.table import Table - -SERIAL_PORT = "/dev/ttyUSB0" -BAUDRATE = 115200 -WRITE_TIMEOUT = 1.5 -READ_TIMEOUT = 2.0 - -payload_parser = payload_parser.PayloadParser() - - -def on_message_received_from_uart(parsed_message): - print(f"[CALLBACK] Nachricht empfangen: MSGID=0x{ - parsed_message.msgid:02X}, Length={parsed_message.payload_len}") - received_message_queue.put(parsed_message) - - -def on_message_fail_from_uart(error_message): - print(f"[CALLBACK] Fehler beim Parsen: {error_message}") - - -class ParsedMessage: - def __init__(self, msgid, payload_len): - self.msgid = msgid - self.payload_len = payload_len - - -received_message_queue = queue.Queue() - - -class SerialReader(threading.Thread): - # Ändere den Konstruktor, um eine bereits geöffnete serielle Instanz zu akzeptieren - def __init__(self, ser_instance, read_timeout, parser): - super().__init__() - # Speichere die übergebene serielle Instanz - self.ser = ser_instance - self.read_timeout = read_timeout - self.parser = parser - self.running = False - self.daemon = True # Thread beendet sich mit dem Hauptprogramm - - def run(self): - # Überprüfe, ob die serielle Schnittstelle wirklich offen ist, bevor du beginnst - if not self.ser or not self.ser.is_open: - print( - f"[{self.name}] Fehler: Serielle Schnittstelle ist nicht geöffnet.") - return - - print(f"[{self.name}] Lese-Thread gestartet. Überwache {self.ser.port}...") - self.ser.timeout = self.read_timeout # Setze den Timeout für byteweises Lesen - self.running = True - - while self.running: - try: - byte = self.ser.read(1) - if byte: - self.parser.parse_byte(byte[0]) - else: - pass # Timeout, kein Byte verfügbar, Thread läuft weiter - except serial.SerialException as e: - print(f"[{self.name}] Lesefehler: {e}") - self.running = False - except Exception as e: - print(f"[{self.name}] Unerwarteter Fehler im Lese-Thread: {e}") - self.running = False - - # Der Thread schließt den Port NICHT mehr, das ist Aufgabe des Hauptprogramms. - print(f"[{self.name}] Lese-Thread beendet.") - - def stop(self): - self.running = False - print(f"[{self.name}] Lese-Thread wird beendet...") - - -def on_message_received_from_uart(message_id: int, payload: bytes, payload_length: int): - """ - Callback-Funktion, die aufgerufen wird, wenn der Parser eine vollständige, - gültige Nachricht empfangen hat. - """ - print(f"\n[MAIN] Nachricht erfolgreich empfangen! ID: 0x{ - message_id:02X}") - print(f"[MAIN] Payload ({payload_length} Bytes): { - payload[:payload_length].hex().upper()}") - - parsed_object = payload_parser.parse_payload( - message_id, payload[:payload_length]) - - if message_id == 0x04: - print(parsed_object) - table = Table(title="Clients") - columns = ["ClientId", "IsAvailable", - "IsSlotUsed", "MAC", "LastPing", "LastSuccesfullPing"] - - rows = [] - for x in parsed_object.clients: - mac_string = ':'.join(f'{byte:02x}' for byte in x.mac_address) - rows.append([str(x.client_id), str(x.is_available), str(x.is_slot_used), mac_string, - str(x.last_ping), str(x.last_successfull_ping)]) - - for column in columns: - table.add_column(column) - - for row in rows: - table.add_row(*row, style='bright_green') - - console = Console() - console.print(table) - - -def on_message_fail_from_uart(message_id: int, current_message_buffer: bytes, - current_index: int, error_type: ParserError): - """ - Callback-Funktion, die aufgerufen wird, wenn der Parser einen Fehler - beim Empfang einer Nachricht feststellt. - """ - print(f"\n[MAIN] Fehler beim Parsen der Nachricht! ID: 0x{ - message_id:02X}") - print(f"[MAIN] Fehler: {error_type.name}") - print(f"[MAIN] Bisheriger Puffer ({current_index} Bytes): { - current_message_buffer[:current_index].hex().upper()}") - - -def run_uart_test(): - """ - Führt den UART-Test durch: Sendet eine Nachricht und liest alle Antworten. - """ - ser = None - - parser = UartMessageParser( - on_message_received_callback=on_message_received_from_uart, - on_message_fail_callback=on_message_fail_from_uart - ) - message_builder = MessageBuilder() - - try: - ser = serial.Serial( - port=SERIAL_PORT, - baudrate=BAUDRATE, - timeout=READ_TIMEOUT, - write_timeout=WRITE_TIMEOUT - ) - print(f"Serielle Schnittstelle { - SERIAL_PORT} mit Baudrate {BAUDRATE} geöffnet.") - - reader_thread = SerialReader( - ser_instance=ser, - read_timeout=10, - parser=parser - ) - reader_thread.start() # Starte den Lese-Thread - - while not reader_thread.running: - time.sleep(0.1) - - print("\n--- UART Testkonsole ---") - print("Gib eine Zahl (1-10) ein, um eine Nachricht zu senden.") - print("Gib 'q' oder 'exit' ein, um das Programm zu beenden.") - - while True: - # Warte auf Benutzereingabe - user_input = sys.stdin.readline().strip().lower() - - if user_input in ('q', 'exit'): - break - - try: - choice = int(user_input) - if choice in MESSAGES: - msg_info = MESSAGES[choice] - print(f"\n[MAIN] Sende Nachricht für Option { - choice} (MSGID: 0x{msg_info['msg_id']:02X})...") - try: - message_to_send = message_builder.build_message( - msg_info["msg_id"], - msg_info["payload"], - 255 # Max Payload Length - ) - print(f"[MAIN] Gebaute Nachricht zum Senden: { - message_to_send.hex().upper()}") - bytes_written = ser.write(message_to_send) - print(f"[MAIN] {bytes_written} Bytes gesendet.") - except (PayloadTooLargeError, BufferOverflowError) as e: - print(f"[MAIN] Fehler beim Bauen der Nachricht: {e}") - except Exception as e: - print( - f"[MAIN] Ein unerwarteter Fehler beim Senden der Nachricht ist aufgetreten: {e}") - else: - print( - "Ungültige Option. Bitte gib eine Zahl zwischen 1 und 10 ein.") - except ValueError: - print("Ungültige Eingabe. Bitte gib eine Zahl oder 'q' ein.") - except Exception as e: - print( - f"[MAIN] Ein unerwarteter Fehler bei der Eingabeverarbeitung ist aufgetreten: {e}") - - # Verarbeite empfangene Nachrichten, die sich in der Queue angesammelt haben - while not received_message_queue.empty(): - msg = received_message_queue.get() - print( - f" > [MAIN-Loop] Verarbeitet: MSGID=0x{msg.msgid:02X}, Length={msg.payload_len}") - received_message_queue.task_done() - - except serial.SerialException as e: - print(f"Fehler beim Zugriff auf die serielle Schnittstelle: {e}") - print(f"Stelle sicher, dass '{ - SERIAL_PORT}' der korrekte Port ist und nicht von einer anderen Anwendung verwendet wird.") - except KeyboardInterrupt: - print("\n[MAIN] Test durch Benutzer abgebrochen (Ctrl+C).") - except Exception as e: - print(f"Ein unerwarteter Fehler im Hauptprogramm ist aufgetreten: {e}") - finally: - if 'reader_thread' in locals() and reader_thread.is_alive(): - reader_thread.stop() - reader_thread.join(timeout=5) - if reader_thread.is_alive(): - print( - "[MAIN] Warnung: Lese-Thread konnte nicht sauber beendet werden.") - if ser and ser.is_open: - ser.close() - print("Serielle Schnittstelle geschlossen.") - print("[MAIN] Programm beendet.") - - -# Nachrichten-Mapping -MESSAGES = { - 1: {"msg_id": 0x01, "payload": b"Echo Message 1"}, - 2: {"msg_id": 0x02, "payload": b"Version Request"}, - 3: {"msg_id": 0x03, "payload": b"Client Info Request"}, - 4: {"msg_id": 0x04, "payload": b"Custom Data 4"}, - 5: {"msg_id": 0x05, "payload": b"Custom Data 5"}, - 6: {"msg_id": 0x06, "payload": b"Custom Data 6"}, - 7: {"msg_id": 0x07, "payload": b"Custom Data 7"}, - 8: {"msg_id": 0x08, "payload": b"Custom Data 8"}, - 9: {"msg_id": 0x09, "payload": b"Custom Data 9"}, - 10: {"msg_id": 0x0A, "payload": b"Custom Data 10 - Last One!"}, -} - -# Führe den Test aus -if __name__ == "__main__": - run_uart_test() diff --git a/tools/message_builder.py b/tools/message_builder.py deleted file mode 100644 index efcf9c1..0000000 --- a/tools/message_builder.py +++ /dev/null @@ -1,115 +0,0 @@ -import enum - -START_BYTE = 0xAA -ESCAPE_BYTE = 0xBB -END_BYTE = 0xCC - - -class MessageBuilderError(Exception): - """Basisklasse für Fehler des Message Builders.""" - pass - - -class PayloadTooLargeError(MessageBuilderError): - """Ausnahme, wenn der Payload zu groß für den Puffer ist.""" - - def __init__(self, required_size, buffer_size): - super().__init__(f"Payload ({ - required_size} bytes) ist größer als der verfügbare Puffer ({buffer_size} bytes).") - self.required_size = required_size - self.buffer_size = buffer_size - - -class BufferOverflowError(MessageBuilderError): - """Ausnahme, wenn der Puffer während des Bauens überläuft.""" - - def __init__(self, current_size, max_size, byte_to_add=None): - msg = f"Pufferüberlauf: Aktuelle Größe { - current_size}, Max. Größe {max_size}." - if byte_to_add is not None: - msg += f" Versuch, Byte 0x{byte_to_add:02X} hinzuzufügen." - super().__init__(msg) - self.current_size = current_size - self.max_size = max_size - self.byte_to_add = byte_to_add - - -class MessageBuilder: - """ - Klasse zum Aufbau von UART-Nachrichten gemäß dem definierten Protokoll, - inklusive Stuffing und Checksummenberechnung. - """ - - def __init__(self): - pass - - def _needs_stuffing_byte(self, byte: int) -> bool: - """ - Prüft, ob ein Byte ein Stuffing-Byte benötigt (d.h. ob es ein Steuerbyte ist). - """ - return (byte == START_BYTE or byte == ESCAPE_BYTE or byte == END_BYTE) - - def _add_byte_with_length_check(self, byte: int, buffer: bytearray, max_length: int): - """ - Fügt ein Byte zum Puffer hinzu und prüft auf Pufferüberlauf. - Löst BufferOverflowError aus, wenn der Puffer voll ist. - """ - if len(buffer) >= max_length: - raise BufferOverflowError(len(buffer), max_length, byte) - buffer.append(byte) - - def build_message(self, msgid: int, payload: bytes, msg_buffer_size: int) -> bytes: - """ - Baut eine vollständige UART-Nachricht. - - Args: - msgid (int): Die Message ID (0-255). - payload (bytes): Die Nutzdaten der Nachricht als Byte-Objekt. - msg_buffer_size (int): Die maximale Größe des Ausgabepuffers. - Dies ist die maximale Länge der *fertigen* Nachricht. - - Returns: - bytes: Die fertig aufgebaute Nachricht als Byte-Objekt. - - Raises: - PayloadTooLargeError: Wenn der Payload (mit Overhead) den Puffer überschreiten würde. - BufferOverflowError: Wenn während des Bauens ein Pufferüberlauf auftritt. - """ - - if len(payload) + 4 > msg_buffer_size: - raise PayloadTooLargeError(len(payload) + 4, msg_buffer_size) - - checksum = 0 - msg_buffer = bytearray() - - # 1. StartByte hinzufügen - self._add_byte_with_length_check( - START_BYTE, msg_buffer, msg_buffer_size) - - # 2. Message ID hinzufügen (mit Stuffing) - if self._needs_stuffing_byte(msgid): - self._add_byte_with_length_check( - ESCAPE_BYTE, msg_buffer, msg_buffer_size) - self._add_byte_with_length_check(msgid, msg_buffer, msg_buffer_size) - checksum ^= msgid - - # 3. Payload-Bytes hinzufügen (mit Stuffing) - for byte_val in payload: - if self._needs_stuffing_byte(byte_val): - self._add_byte_with_length_check( - ESCAPE_BYTE, msg_buffer, msg_buffer_size) - self._add_byte_with_length_check( - byte_val, msg_buffer, msg_buffer_size) - checksum ^= byte_val - - # 4. Checksumme hinzufügen (mit Stuffing) - if self._needs_stuffing_byte(checksum): - self._add_byte_with_length_check( - ESCAPE_BYTE, msg_buffer, msg_buffer_size) - self._add_byte_with_length_check(checksum, msg_buffer, msg_buffer_size) - - # 5. EndByte hinzufügen - self._add_byte_with_length_check(END_BYTE, msg_buffer, msg_buffer_size) - - # Konvertiere bytearray zu unveränderlichem bytes-Objekt - return bytes(msg_buffer) diff --git a/tools/parser.py b/tools/parser.py deleted file mode 100644 index b06601b..0000000 --- a/tools/parser.py +++ /dev/null @@ -1,170 +0,0 @@ -import enum - -# --- Konstanten für das UART-Protokoll --- -# Diese Werte müssen mit denen auf deinem Embedded-System übereinstimmen -START_BYTE = 0xAA -END_BYTE = 0xCC -ESCAPE_BYTE = 0x7D # Beispielwert, bitte an dein Protokoll anpassen - -MAX_PAYLOAD_LENGTH = 255 # Maximale Länge des Nachrichten-Payloads (ohne Message ID und Checksumme) -# MAX_TOTAL_CONTENT_LENGTH in C beinhaltet Message ID, Payload und Checksumme. -# Hier definieren wir MAX_PAYLOAD_LENGTH, da der Parser den Payload sammelt. -# Die Gesamtgröße des empfangenen Puffers (message + checksum) darf MAX_PAYLOAD_LENGTH + 1 nicht überschreiten, -# da die Checksumme als letztes Byte des Payloads behandelt wird. - -# --- Enumerationen für Parser-Zustände und Fehler --- -class ParserState(enum.Enum): - WAITING_FOR_START_BYTE = 0 - GET_MESSAGE_TYPE = 1 - ESCAPED_MESSAGE_TYPE = 2 - IN_PAYLOAD = 3 - ESCAPE_PAYLOAD_BYTE = 4 - -class ParserError(enum.Enum): - NO_ERROR = 0 - UNEXPECTED_COMMAND_BYTE = 1 - WRONG_CHECKSUM = 2 - MESSAGE_TOO_LONG = 3 - -class UartMessageParser: - """ - Ein State-Machine-Parser für UART-Nachrichten basierend auf der bereitgestellten C-Logik. - - Nachrichtenformat (angenommen): - [START_BYTE] [MESSAGE_ID] [PAYLOAD_BYTES...] [CHECKSUM_BYTE] [END_BYTE] - - Escape-Sequenzen: - Wenn START_BYTE, END_BYTE oder ESCAPE_BYTE im MESSAGE_ID oder PAYLOAD vorkommen, - werden sie durch ESCAPE_BYTE gefolgt vom ursprünglichen Byte (nicht XORed) ersetzt. - Die Checksumme wird über die unescaped Bytes berechnet. - """ - - def __init__(self, on_message_received_callback=None, on_message_fail_callback=None): - """ - Initialisiert den UART-Nachrichten-Parser. - - Args: - on_message_received_callback (callable, optional): Eine Funktion, die aufgerufen wird, - wenn eine gültige Nachricht empfangen wurde. - Signatur: on_message_received(message_id: int, payload: bytes, payload_length: int) - on_message_fail_callback (callable, optional): Eine Funktion, die aufgerufen wird, - wenn ein Nachrichtenfehler auftritt. - Signatur: on_message_fail(message_id: int, current_message_buffer: bytes, - current_index: int, error_type: ParserError) - """ - self.state = ParserState.WAITING_FOR_START_BYTE - self.index = 0 - self.checksum = 0 - self.message_id = 0 - self.message_buffer = bytearray(MAX_PAYLOAD_LENGTH + 1) # +1 für Checksummen-Byte - self.error = ParserError.NO_ERROR - - # Callbacks für die Anwendung. Standardmäßig None oder einfache Print-Funktionen. - self.on_message_received = on_message_received_callback if on_message_received_callback else self._default_on_message_received - self.on_message_fail = on_message_fail_callback if on_message_fail_callback else self._default_on_message_fail - - def _default_on_message_received(self, message_id, payload, payload_length): - """Standard-Callback für empfangene Nachrichten, falls keiner angegeben ist.""" - print(f"Parser: Nachricht empfangen! ID: 0x{message_id:02X}, " - f"Payload ({payload_length} Bytes): {payload[:payload_length].hex().upper()}") - - def _default_on_message_fail(self, message_id, current_message_buffer, current_index, error_type): - """Standard-Callback für Nachrichtenfehler, falls keiner angegeben ist.""" - print(f"Parser: Fehler bei Nachricht! ID: 0x{message_id:02X}, " - f"Fehler: {error_type.name}, " - f"Bisheriger Puffer ({current_index} Bytes): {current_message_buffer[:current_index].hex().upper()}") - - def parse_byte(self, pbyte: int): - """ - Verarbeitet ein einzelnes empfangenes Byte. - - Args: - pbyte (int): Das empfangene Byte (0-255). - """ - # Sicherstellen, dass pbyte ein Integer im Bereich 0-255 ist - if not isinstance(pbyte, int) or not (0 <= pbyte <= 255): - print(f"Parser: Ungültiges Byte empfangen: {pbyte}. Muss ein Integer von 0-255 sein.") - return - - current_state = self.state # Für bessere Lesbarkeit - - if current_state == ParserState.WAITING_FOR_START_BYTE: - if pbyte == START_BYTE: - self.index = 0 - self.checksum = 0 - self.message_id = 0 # Reset message_id - self.error = ParserError.NO_ERROR # Reset error - self.state = ParserState.GET_MESSAGE_TYPE - # Andernfalls ignorieren wir Bytes, bis ein Start-Byte gefunden wird - - elif current_state == ParserState.ESCAPED_MESSAGE_TYPE: - self.message_id = pbyte - self.checksum ^= pbyte - self.state = ParserState.IN_PAYLOAD - - elif current_state == ParserState.GET_MESSAGE_TYPE: - if pbyte == ESCAPE_BYTE: - self.state = ParserState.ESCAPED_MESSAGE_TYPE - return # Dieses Byte wurde als Escape-Sequenz verarbeitet, nicht zum Payload hinzufügen - if pbyte == START_BYTE or pbyte == END_BYTE: - self.state = ParserState.WAITING_FOR_START_BYTE - self.error = ParserError.UNEXPECTED_COMMAND_BYTE - self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) - return - self.message_id = pbyte - self.checksum ^= pbyte - self.state = ParserState.IN_PAYLOAD - - elif current_state == ParserState.ESCAPE_PAYLOAD_BYTE: - # Das escapte Byte ist Teil des Payloads - if self.index < MAX_PAYLOAD_LENGTH + 1: # +1 für Checksummen-Byte - self.message_buffer[self.index] = pbyte - self.index += 1 - self.checksum ^= pbyte - self.state = ParserState.IN_PAYLOAD - else: - self.state = ParserState.WAITING_FOR_START_BYTE - self.error = ParserError.MESSAGE_TOO_LONG - self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) - return - - elif current_state == ParserState.IN_PAYLOAD: - if pbyte == ESCAPE_BYTE: - self.state = ParserState.ESCAPE_PAYLOAD_BYTE - return # Dieses Byte wurde als Escape-Sequenz verarbeitet - if pbyte == START_BYTE: - self.state = ParserState.WAITING_FOR_START_BYTE - self.error = ParserError.UNEXPECTED_COMMAND_BYTE - self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) - return - if pbyte == END_BYTE: - if self.checksum != 0x00: - # Checksummenfehler: Die Checksumme wurde bis zum End-Byte XORed. - # Wenn die empfangene Checksumme korrekt war, sollte das Ergebnis 0 sein. - self.state = ParserState.WAITING_FOR_START_BYTE - self.error = ParserError.WRONG_CHECKSUM - self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) - return - - # Erfolgreich empfangen! Die Checksumme ist das letzte Byte im Puffer. - # Die Länge des Payloads ist index - 1 (da das letzte Byte die Checksumme war). - payload_length = self.index - 1 - if payload_length < 0: # Falls nur Message ID und Checksumme, aber kein Payload - payload_length = 0 - - self.on_message_received(self.message_id, self.message_buffer, payload_length) - self.state = ParserState.WAITING_FOR_START_BYTE - return # EndByte wurde verarbeitet, nicht zum Payload hinzufügen - - # Normales Payload-Byte - if self.index < MAX_PAYLOAD_LENGTH + 1: # +1 für Checksummen-Byte - self.message_buffer[self.index] = pbyte - self.index += 1 - self.checksum ^= pbyte - else: - # Nachricht zu lang - self.state = ParserState.WAITING_FOR_START_BYTE - self.error = ParserError.MESSAGE_TOO_LONG - self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) - return - diff --git a/tools/payload_parser.py b/tools/payload_parser.py deleted file mode 100644 index 5739f87..0000000 --- a/tools/payload_parser.py +++ /dev/null @@ -1,192 +0,0 @@ -import dataclasses -import struct -from typing import Optional, Union, List - - -@dataclasses.dataclass -class StatusMessage: - """ - Repräsentiert eine Status-Nachricht (z.B. Message ID 0x01). - Payload-Format: [status_code: uint8], [battery_level: uint8], [uptime_seconds: uint16] - """ - status_code: int - battery_level: int # 0-100% - uptime_seconds: int - - -@dataclasses.dataclass -class SensorDataMessage: - """ - Repräsentiert eine Sensor-Daten-Nachricht (z.B. Message ID 0x02). - Payload-Format: [temperature_celsius: int16], [humidity_percent: uint16] - """ - temperature_celsius: int # signed short - humidity_percent: int # unsigned short - - -@dataclasses.dataclass -class ClientEntry: - """ - Repräsentiert die Informationen für einen einzelnen Client innerhalb der ClientInfoMessage. - Payload-Format: - [client_id: uint8] - [is_available: uint8] (0=false, >0=true) - [is_slot_used: uint8] (0=false, >0=true) - [mac_address: bytes (6)] - [unoccupied_value_1: uint32] - [unoccupied_value_2: uint32] - Gesamt: 1 + 1 + 1 + 6 + 4 + 4 = 17 Bytes pro Eintrag. - """ - client_id: int - is_available: bool - is_slot_used: bool - mac_address: bytes # 6 Bytes MAC-Adresse - last_ping: int # 4 Bytes, unbelegt - last_successfull_ping: int # 4 Bytes, unbelegt - - -@dataclasses.dataclass -class ClientInfoMessage: - """ - Repräsentiert eine Nachricht mit Client-Informationen (Message ID 0x03). - Payload-Format: - [num_clients: uint8] - [client_entry_1: ClientEntry] - [client_entry_2: ClientEntry] - ... - """ - num_clients: int - clients: List[ClientEntry] - - -@dataclasses.dataclass -class UnknownMessage: - """ - Repräsentiert eine Nachricht mit unbekannter ID oder fehlerhaftem Payload. - """ - message_id: int - raw_payload: bytes - error_message: str - -# --- Payload Parser Klasse --- - - -class PayloadParser: - """ - Interpretiert den Payload einer UART-Nachricht basierend auf ihrer Message ID - und wandelt ihn in ein strukturiertes Python-Objekt (dataclass) um. - """ - - def __init__(self): - # Ein Dictionary, das Message IDs auf ihre entsprechenden Parsing-Funktionen abbildet. - self._parser_map = { - 0x01: self._parse_status_message, - 0x02: self._parse_sensor_data_message, - # Aktualisiert für die neue 0x03 Struktur - 0x03: self._parse_client_info_message, - 0x04: self._parse_client_info_message, - # Füge hier weitere Message IDs und ihre Parsing-Funktionen hinzu - } - - def _parse_status_message(self, payload: bytes) -> Union[StatusMessage, UnknownMessage]: - """Parsen des Payloads für Message ID 0x01 (StatusMessage).""" - # Erwartetes Format: 1 Byte Status, 1 Byte Battery, 2 Bytes Uptime (Little-Endian) - if len(payload) != 4: - return UnknownMessage(0x01, payload, f"Falsche Payload-Länge für StatusMessage: Erwartet 4, Got {len(payload)}") - try: - # ' Union[SensorDataMessage, UnknownMessage]: - """Parsen des Payloads für Message ID 0x02 (SensorDataMessage).""" - # Erwartetes Format: 2 Bytes Temperatur (signed short), 2 Bytes Feuchtigkeit (unsigned short) (Little-Endian) - if len(payload) != 4: - return UnknownMessage(0x02, payload, f"Falsche Payload-Länge für SensorDataMessage: Erwartet 4, Got {len(payload)}") - try: - # ' Union[ClientInfoMessage, UnknownMessage]: - """Parsen des Payloads für Message ID 0x03 (ClientInfoMessage).""" - if not payload: - # Wenn der Payload leer ist, aber num_clients erwartet wird, ist das ein Fehler - return UnknownMessage(0x03, payload, "Payload für ClientInfoMessage ist leer, aber num_clients erwartet.") - - try: - # Das erste Byte ist die Anzahl der Clients - num_clients = payload[0] - # Die restlichen Bytes sind die Client-Einträge - client_data_bytes = payload[1:] - - # 1 (ID) + 1 (Avail) + 1 (Used) + 6 (MAC) + 4 (Val1) + 4 (Val2) - EXPECTED_CLIENT_ENTRY_SIZE = 17 - - if len(client_data_bytes) != num_clients * EXPECTED_CLIENT_ENTRY_SIZE: - return UnknownMessage(0x03, payload, - f"Falsche Payload-Länge für Client-Einträge: Erwartet { - num_clients * EXPECTED_CLIENT_ENTRY_SIZE}, " - f"Got {len(client_data_bytes)} nach num_clients.") - - clients_list: List[ClientEntry] = [] - # Formatstring für einen Client-Eintrag: - # < : Little-Endian - # B : uint8 (client_id, is_available, is_slot_used) - # 6s: 6 Bytes (mac_address) - # I : uint32 (unoccupied_value_1, unoccupied_value_2) - CLIENT_ENTRY_FORMAT = ' Union[StatusMessage, SensorDataMessage, ClientInfoMessage, UnknownMessage]: - """ - Interpretiert den gegebenen Payload basierend auf der Message ID. - - Args: - message_id (int): Die ID der Nachricht. - payload (bytes): Die rohen Nutzdaten der Nachricht. - - Returns: - Union[StatusMessage, SensorDataMessage, ClientInfoMessage, UnknownMessage]: - Ein dataclass-Objekt, das die dekodierten Daten repräsentiert, - oder ein UnknownMessage-Objekt bei unbekannter ID oder Parsing-Fehler. - """ - parser_func = self._parser_map.get(message_id) - if parser_func: - return parser_func(payload) - else: - return UnknownMessage(message_id, payload, "Unbekannte Message ID.")