From fad6a0aee23b9f65d75d860fa207df26a8197166 Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 23 Jul 2025 16:49:17 +0200 Subject: [PATCH] Added Python Test tool --- tools/main.py | 111 +++++++++++++++++++++++ tools/parser.py | 170 +++++++++++++++++++++++++++++++++++ tools/payload_parser.py | 192 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 473 insertions(+) create mode 100644 tools/main.py create mode 100644 tools/parser.py create mode 100644 tools/payload_parser.py diff --git a/tools/main.py b/tools/main.py new file mode 100644 index 0000000..a13621e --- /dev/null +++ b/tools/main.py @@ -0,0 +1,111 @@ +import serial +import time +from parser import UartMessageParser, ParserError +from message_builder import MessageBuilder, MessageBuilderError, PayloadTooLargeError, BufferOverflowError +import payload_parser + +SERIAL_PORT = "/dev/ttyUSB0" +BAUDRATE = 115200 +WRITE_TIMEOUT = 0.5 +READ_TIMEOUT = 1.0 + +payload_parser = payload_parser.PayloadParser() + + +def on_message_received_from_uart(message_id: int, payload: bytes, payload_length: int): + """ + Callback-Funktion, die aufgerufen wird, wenn der Parser eine vollständige, + gültige Nachricht empfangen hat. + """ + print(f"\n[MAIN] Nachricht erfolgreich empfangen! ID: 0x{ + message_id:02X}") + print(f"[MAIN] Payload ({payload_length} Bytes): { + payload[:payload_length].hex().upper()}") + + parsed_object = payload_parser.parse_payload( + message_id, payload[:payload_length]) + + print(parsed_object) + + +def on_message_fail_from_uart(message_id: int, current_message_buffer: bytes, + current_index: int, error_type: ParserError): + """ + Callback-Funktion, die aufgerufen wird, wenn der Parser einen Fehler + beim Empfang einer Nachricht feststellt. + """ + print(f"\n[MAIN] Fehler beim Parsen der Nachricht! ID: 0x{ + message_id:02X}") + print(f"[MAIN] Fehler: {error_type.name}") + print(f"[MAIN] Bisheriger Puffer ({current_index} Bytes): { + current_message_buffer[:current_index].hex().upper()}") + + +def run_uart_test(): + """ + Führt den UART-Test durch: Sendet eine Nachricht und liest alle Antworten. + """ + ser = None + + parser = UartMessageParser( + on_message_received_callback=on_message_received_from_uart, + on_message_fail_callback=on_message_fail_from_uart + ) + message_builder = MessageBuilder() + + try: + ser = serial.Serial( + port=SERIAL_PORT, + baudrate=BAUDRATE, + timeout=READ_TIMEOUT, + write_timeout=WRITE_TIMEOUT + ) + print(f"Serielle Schnittstelle { + SERIAL_PORT} mit Baudrate {BAUDRATE} geöffnet.") + + try: + # Baue die Nachricht + message_to_send = message_builder.build_message( + 0x3, + b'', + 255 + ) + print(f"\n[MAIN] Gebaute Nachricht zum Senden: { + message_to_send.hex().upper()}") + bytes_written = ser.write(message_to_send) + print(f"[MAIN] {bytes_written} Bytes gesendet.") + except (PayloadTooLargeError, BufferOverflowError) as e: + print(f"[MAIN] Fehler beim Bauen der Nachricht: {e}") + return # Beende die Funktion, wenn die Nachricht nicht gebaut werden kann + except Exception as e: + print( + f"[MAIN] Ein unerwarteter Fehler beim Bauen der Nachricht ist aufgetreten: {e}") + return + + time.sleep(0.1) + + received_data = ser.read_all() + + if received_data: + print(f"Empfangene Daten ({len(received_data)} Bytes): { + received_data.hex().upper()}") + for byte_val in received_data: + parser.parse_byte(byte_val) + else: + print("Keine Daten empfangen.") + + except serial.SerialException as e: + print(f"Fehler beim Zugriff auf die serielle Schnittstelle: {e}") + print(f"Stelle sicher, dass '{ + SERIAL_PORT}' der korrekte Port ist und nicht von einer anderen Anwendung verwendet wird.") + except Exception as e: + print(f"Ein unerwarteter Fehler ist aufgetreten: {e}") + finally: + if ser and ser.is_open: + ser.close() + print("Serielle Schnittstelle geschlossen.") + + +# Führe den Test aus +if __name__ == "__main__": + run_uart_test() diff --git a/tools/parser.py b/tools/parser.py new file mode 100644 index 0000000..b06601b --- /dev/null +++ b/tools/parser.py @@ -0,0 +1,170 @@ +import enum + +# --- Konstanten für das UART-Protokoll --- +# Diese Werte müssen mit denen auf deinem Embedded-System übereinstimmen +START_BYTE = 0xAA +END_BYTE = 0xCC +ESCAPE_BYTE = 0x7D # Beispielwert, bitte an dein Protokoll anpassen + +MAX_PAYLOAD_LENGTH = 255 # Maximale Länge des Nachrichten-Payloads (ohne Message ID und Checksumme) +# MAX_TOTAL_CONTENT_LENGTH in C beinhaltet Message ID, Payload und Checksumme. +# Hier definieren wir MAX_PAYLOAD_LENGTH, da der Parser den Payload sammelt. +# Die Gesamtgröße des empfangenen Puffers (message + checksum) darf MAX_PAYLOAD_LENGTH + 1 nicht überschreiten, +# da die Checksumme als letztes Byte des Payloads behandelt wird. + +# --- Enumerationen für Parser-Zustände und Fehler --- +class ParserState(enum.Enum): + WAITING_FOR_START_BYTE = 0 + GET_MESSAGE_TYPE = 1 + ESCAPED_MESSAGE_TYPE = 2 + IN_PAYLOAD = 3 + ESCAPE_PAYLOAD_BYTE = 4 + +class ParserError(enum.Enum): + NO_ERROR = 0 + UNEXPECTED_COMMAND_BYTE = 1 + WRONG_CHECKSUM = 2 + MESSAGE_TOO_LONG = 3 + +class UartMessageParser: + """ + Ein State-Machine-Parser für UART-Nachrichten basierend auf der bereitgestellten C-Logik. + + Nachrichtenformat (angenommen): + [START_BYTE] [MESSAGE_ID] [PAYLOAD_BYTES...] [CHECKSUM_BYTE] [END_BYTE] + + Escape-Sequenzen: + Wenn START_BYTE, END_BYTE oder ESCAPE_BYTE im MESSAGE_ID oder PAYLOAD vorkommen, + werden sie durch ESCAPE_BYTE gefolgt vom ursprünglichen Byte (nicht XORed) ersetzt. + Die Checksumme wird über die unescaped Bytes berechnet. + """ + + def __init__(self, on_message_received_callback=None, on_message_fail_callback=None): + """ + Initialisiert den UART-Nachrichten-Parser. + + Args: + on_message_received_callback (callable, optional): Eine Funktion, die aufgerufen wird, + wenn eine gültige Nachricht empfangen wurde. + Signatur: on_message_received(message_id: int, payload: bytes, payload_length: int) + on_message_fail_callback (callable, optional): Eine Funktion, die aufgerufen wird, + wenn ein Nachrichtenfehler auftritt. + Signatur: on_message_fail(message_id: int, current_message_buffer: bytes, + current_index: int, error_type: ParserError) + """ + self.state = ParserState.WAITING_FOR_START_BYTE + self.index = 0 + self.checksum = 0 + self.message_id = 0 + self.message_buffer = bytearray(MAX_PAYLOAD_LENGTH + 1) # +1 für Checksummen-Byte + self.error = ParserError.NO_ERROR + + # Callbacks für die Anwendung. Standardmäßig None oder einfache Print-Funktionen. + self.on_message_received = on_message_received_callback if on_message_received_callback else self._default_on_message_received + self.on_message_fail = on_message_fail_callback if on_message_fail_callback else self._default_on_message_fail + + def _default_on_message_received(self, message_id, payload, payload_length): + """Standard-Callback für empfangene Nachrichten, falls keiner angegeben ist.""" + print(f"Parser: Nachricht empfangen! ID: 0x{message_id:02X}, " + f"Payload ({payload_length} Bytes): {payload[:payload_length].hex().upper()}") + + def _default_on_message_fail(self, message_id, current_message_buffer, current_index, error_type): + """Standard-Callback für Nachrichtenfehler, falls keiner angegeben ist.""" + print(f"Parser: Fehler bei Nachricht! ID: 0x{message_id:02X}, " + f"Fehler: {error_type.name}, " + f"Bisheriger Puffer ({current_index} Bytes): {current_message_buffer[:current_index].hex().upper()}") + + def parse_byte(self, pbyte: int): + """ + Verarbeitet ein einzelnes empfangenes Byte. + + Args: + pbyte (int): Das empfangene Byte (0-255). + """ + # Sicherstellen, dass pbyte ein Integer im Bereich 0-255 ist + if not isinstance(pbyte, int) or not (0 <= pbyte <= 255): + print(f"Parser: Ungültiges Byte empfangen: {pbyte}. Muss ein Integer von 0-255 sein.") + return + + current_state = self.state # Für bessere Lesbarkeit + + if current_state == ParserState.WAITING_FOR_START_BYTE: + if pbyte == START_BYTE: + self.index = 0 + self.checksum = 0 + self.message_id = 0 # Reset message_id + self.error = ParserError.NO_ERROR # Reset error + self.state = ParserState.GET_MESSAGE_TYPE + # Andernfalls ignorieren wir Bytes, bis ein Start-Byte gefunden wird + + elif current_state == ParserState.ESCAPED_MESSAGE_TYPE: + self.message_id = pbyte + self.checksum ^= pbyte + self.state = ParserState.IN_PAYLOAD + + elif current_state == ParserState.GET_MESSAGE_TYPE: + if pbyte == ESCAPE_BYTE: + self.state = ParserState.ESCAPED_MESSAGE_TYPE + return # Dieses Byte wurde als Escape-Sequenz verarbeitet, nicht zum Payload hinzufügen + if pbyte == START_BYTE or pbyte == END_BYTE: + self.state = ParserState.WAITING_FOR_START_BYTE + self.error = ParserError.UNEXPECTED_COMMAND_BYTE + self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) + return + self.message_id = pbyte + self.checksum ^= pbyte + self.state = ParserState.IN_PAYLOAD + + elif current_state == ParserState.ESCAPE_PAYLOAD_BYTE: + # Das escapte Byte ist Teil des Payloads + if self.index < MAX_PAYLOAD_LENGTH + 1: # +1 für Checksummen-Byte + self.message_buffer[self.index] = pbyte + self.index += 1 + self.checksum ^= pbyte + self.state = ParserState.IN_PAYLOAD + else: + self.state = ParserState.WAITING_FOR_START_BYTE + self.error = ParserError.MESSAGE_TOO_LONG + self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) + return + + elif current_state == ParserState.IN_PAYLOAD: + if pbyte == ESCAPE_BYTE: + self.state = ParserState.ESCAPE_PAYLOAD_BYTE + return # Dieses Byte wurde als Escape-Sequenz verarbeitet + if pbyte == START_BYTE: + self.state = ParserState.WAITING_FOR_START_BYTE + self.error = ParserError.UNEXPECTED_COMMAND_BYTE + self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) + return + if pbyte == END_BYTE: + if self.checksum != 0x00: + # Checksummenfehler: Die Checksumme wurde bis zum End-Byte XORed. + # Wenn die empfangene Checksumme korrekt war, sollte das Ergebnis 0 sein. + self.state = ParserState.WAITING_FOR_START_BYTE + self.error = ParserError.WRONG_CHECKSUM + self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) + return + + # Erfolgreich empfangen! Die Checksumme ist das letzte Byte im Puffer. + # Die Länge des Payloads ist index - 1 (da das letzte Byte die Checksumme war). + payload_length = self.index - 1 + if payload_length < 0: # Falls nur Message ID und Checksumme, aber kein Payload + payload_length = 0 + + self.on_message_received(self.message_id, self.message_buffer, payload_length) + self.state = ParserState.WAITING_FOR_START_BYTE + return # EndByte wurde verarbeitet, nicht zum Payload hinzufügen + + # Normales Payload-Byte + if self.index < MAX_PAYLOAD_LENGTH + 1: # +1 für Checksummen-Byte + self.message_buffer[self.index] = pbyte + self.index += 1 + self.checksum ^= pbyte + else: + # Nachricht zu lang + self.state = ParserState.WAITING_FOR_START_BYTE + self.error = ParserError.MESSAGE_TOO_LONG + self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) + return + diff --git a/tools/payload_parser.py b/tools/payload_parser.py new file mode 100644 index 0000000..5739f87 --- /dev/null +++ b/tools/payload_parser.py @@ -0,0 +1,192 @@ +import dataclasses +import struct +from typing import Optional, Union, List + + +@dataclasses.dataclass +class StatusMessage: + """ + Repräsentiert eine Status-Nachricht (z.B. Message ID 0x01). + Payload-Format: [status_code: uint8], [battery_level: uint8], [uptime_seconds: uint16] + """ + status_code: int + battery_level: int # 0-100% + uptime_seconds: int + + +@dataclasses.dataclass +class SensorDataMessage: + """ + Repräsentiert eine Sensor-Daten-Nachricht (z.B. Message ID 0x02). + Payload-Format: [temperature_celsius: int16], [humidity_percent: uint16] + """ + temperature_celsius: int # signed short + humidity_percent: int # unsigned short + + +@dataclasses.dataclass +class ClientEntry: + """ + Repräsentiert die Informationen für einen einzelnen Client innerhalb der ClientInfoMessage. + Payload-Format: + [client_id: uint8] + [is_available: uint8] (0=false, >0=true) + [is_slot_used: uint8] (0=false, >0=true) + [mac_address: bytes (6)] + [unoccupied_value_1: uint32] + [unoccupied_value_2: uint32] + Gesamt: 1 + 1 + 1 + 6 + 4 + 4 = 17 Bytes pro Eintrag. + """ + client_id: int + is_available: bool + is_slot_used: bool + mac_address: bytes # 6 Bytes MAC-Adresse + last_ping: int # 4 Bytes, unbelegt + last_successfull_ping: int # 4 Bytes, unbelegt + + +@dataclasses.dataclass +class ClientInfoMessage: + """ + Repräsentiert eine Nachricht mit Client-Informationen (Message ID 0x03). + Payload-Format: + [num_clients: uint8] + [client_entry_1: ClientEntry] + [client_entry_2: ClientEntry] + ... + """ + num_clients: int + clients: List[ClientEntry] + + +@dataclasses.dataclass +class UnknownMessage: + """ + Repräsentiert eine Nachricht mit unbekannter ID oder fehlerhaftem Payload. + """ + message_id: int + raw_payload: bytes + error_message: str + +# --- Payload Parser Klasse --- + + +class PayloadParser: + """ + Interpretiert den Payload einer UART-Nachricht basierend auf ihrer Message ID + und wandelt ihn in ein strukturiertes Python-Objekt (dataclass) um. + """ + + def __init__(self): + # Ein Dictionary, das Message IDs auf ihre entsprechenden Parsing-Funktionen abbildet. + self._parser_map = { + 0x01: self._parse_status_message, + 0x02: self._parse_sensor_data_message, + # Aktualisiert für die neue 0x03 Struktur + 0x03: self._parse_client_info_message, + 0x04: self._parse_client_info_message, + # Füge hier weitere Message IDs und ihre Parsing-Funktionen hinzu + } + + def _parse_status_message(self, payload: bytes) -> Union[StatusMessage, UnknownMessage]: + """Parsen des Payloads für Message ID 0x01 (StatusMessage).""" + # Erwartetes Format: 1 Byte Status, 1 Byte Battery, 2 Bytes Uptime (Little-Endian) + if len(payload) != 4: + return UnknownMessage(0x01, payload, f"Falsche Payload-Länge für StatusMessage: Erwartet 4, Got {len(payload)}") + try: + # ' Union[SensorDataMessage, UnknownMessage]: + """Parsen des Payloads für Message ID 0x02 (SensorDataMessage).""" + # Erwartetes Format: 2 Bytes Temperatur (signed short), 2 Bytes Feuchtigkeit (unsigned short) (Little-Endian) + if len(payload) != 4: + return UnknownMessage(0x02, payload, f"Falsche Payload-Länge für SensorDataMessage: Erwartet 4, Got {len(payload)}") + try: + # ' Union[ClientInfoMessage, UnknownMessage]: + """Parsen des Payloads für Message ID 0x03 (ClientInfoMessage).""" + if not payload: + # Wenn der Payload leer ist, aber num_clients erwartet wird, ist das ein Fehler + return UnknownMessage(0x03, payload, "Payload für ClientInfoMessage ist leer, aber num_clients erwartet.") + + try: + # Das erste Byte ist die Anzahl der Clients + num_clients = payload[0] + # Die restlichen Bytes sind die Client-Einträge + client_data_bytes = payload[1:] + + # 1 (ID) + 1 (Avail) + 1 (Used) + 6 (MAC) + 4 (Val1) + 4 (Val2) + EXPECTED_CLIENT_ENTRY_SIZE = 17 + + if len(client_data_bytes) != num_clients * EXPECTED_CLIENT_ENTRY_SIZE: + return UnknownMessage(0x03, payload, + f"Falsche Payload-Länge für Client-Einträge: Erwartet { + num_clients * EXPECTED_CLIENT_ENTRY_SIZE}, " + f"Got {len(client_data_bytes)} nach num_clients.") + + clients_list: List[ClientEntry] = [] + # Formatstring für einen Client-Eintrag: + # < : Little-Endian + # B : uint8 (client_id, is_available, is_slot_used) + # 6s: 6 Bytes (mac_address) + # I : uint32 (unoccupied_value_1, unoccupied_value_2) + CLIENT_ENTRY_FORMAT = ' Union[StatusMessage, SensorDataMessage, ClientInfoMessage, UnknownMessage]: + """ + Interpretiert den gegebenen Payload basierend auf der Message ID. + + Args: + message_id (int): Die ID der Nachricht. + payload (bytes): Die rohen Nutzdaten der Nachricht. + + Returns: + Union[StatusMessage, SensorDataMessage, ClientInfoMessage, UnknownMessage]: + Ein dataclass-Objekt, das die dekodierten Daten repräsentiert, + oder ein UnknownMessage-Objekt bei unbekannter ID oder Parsing-Fehler. + """ + parser_func = self._parser_map.get(message_id) + if parser_func: + return parser_func(payload) + else: + return UnknownMessage(message_id, payload, "Unbekannte Message ID.")