import enum # --- Konstanten für das UART-Protokoll --- # Diese Werte müssen mit denen auf deinem Embedded-System übereinstimmen START_BYTE = 0xAA END_BYTE = 0xCC ESCAPE_BYTE = 0x7D # Beispielwert, bitte an dein Protokoll anpassen MAX_PAYLOAD_LENGTH = 255 # Maximale Länge des Nachrichten-Payloads (ohne Message ID und Checksumme) # MAX_TOTAL_CONTENT_LENGTH in C beinhaltet Message ID, Payload und Checksumme. # Hier definieren wir MAX_PAYLOAD_LENGTH, da der Parser den Payload sammelt. # Die Gesamtgröße des empfangenen Puffers (message + checksum) darf MAX_PAYLOAD_LENGTH + 1 nicht überschreiten, # da die Checksumme als letztes Byte des Payloads behandelt wird. # --- Enumerationen für Parser-Zustände und Fehler --- class ParserState(enum.Enum): WAITING_FOR_START_BYTE = 0 GET_MESSAGE_TYPE = 1 ESCAPED_MESSAGE_TYPE = 2 IN_PAYLOAD = 3 ESCAPE_PAYLOAD_BYTE = 4 class ParserError(enum.Enum): NO_ERROR = 0 UNEXPECTED_COMMAND_BYTE = 1 WRONG_CHECKSUM = 2 MESSAGE_TOO_LONG = 3 class UartMessageParser: """ Ein State-Machine-Parser für UART-Nachrichten basierend auf der bereitgestellten C-Logik. Nachrichtenformat (angenommen): [START_BYTE] [MESSAGE_ID] [PAYLOAD_BYTES...] [CHECKSUM_BYTE] [END_BYTE] Escape-Sequenzen: Wenn START_BYTE, END_BYTE oder ESCAPE_BYTE im MESSAGE_ID oder PAYLOAD vorkommen, werden sie durch ESCAPE_BYTE gefolgt vom ursprünglichen Byte (nicht XORed) ersetzt. Die Checksumme wird über die unescaped Bytes berechnet. """ def __init__(self, on_message_received_callback=None, on_message_fail_callback=None): """ Initialisiert den UART-Nachrichten-Parser. Args: on_message_received_callback (callable, optional): Eine Funktion, die aufgerufen wird, wenn eine gültige Nachricht empfangen wurde. Signatur: on_message_received(message_id: int, payload: bytes, payload_length: int) on_message_fail_callback (callable, optional): Eine Funktion, die aufgerufen wird, wenn ein Nachrichtenfehler auftritt. Signatur: on_message_fail(message_id: int, current_message_buffer: bytes, current_index: int, error_type: ParserError) """ self.state = ParserState.WAITING_FOR_START_BYTE self.index = 0 self.checksum = 0 self.message_id = 0 self.message_buffer = bytearray(MAX_PAYLOAD_LENGTH + 1) # +1 für Checksummen-Byte self.error = ParserError.NO_ERROR # Callbacks für die Anwendung. Standardmäßig None oder einfache Print-Funktionen. self.on_message_received = on_message_received_callback if on_message_received_callback else self._default_on_message_received self.on_message_fail = on_message_fail_callback if on_message_fail_callback else self._default_on_message_fail def _default_on_message_received(self, message_id, payload, payload_length): """Standard-Callback für empfangene Nachrichten, falls keiner angegeben ist.""" print(f"Parser: Nachricht empfangen! ID: 0x{message_id:02X}, " f"Payload ({payload_length} Bytes): {payload[:payload_length].hex().upper()}") def _default_on_message_fail(self, message_id, current_message_buffer, current_index, error_type): """Standard-Callback für Nachrichtenfehler, falls keiner angegeben ist.""" print(f"Parser: Fehler bei Nachricht! ID: 0x{message_id:02X}, " f"Fehler: {error_type.name}, " f"Bisheriger Puffer ({current_index} Bytes): {current_message_buffer[:current_index].hex().upper()}") def parse_byte(self, pbyte: int): """ Verarbeitet ein einzelnes empfangenes Byte. Args: pbyte (int): Das empfangene Byte (0-255). """ # Sicherstellen, dass pbyte ein Integer im Bereich 0-255 ist if not isinstance(pbyte, int) or not (0 <= pbyte <= 255): print(f"Parser: Ungültiges Byte empfangen: {pbyte}. Muss ein Integer von 0-255 sein.") return current_state = self.state # Für bessere Lesbarkeit if current_state == ParserState.WAITING_FOR_START_BYTE: if pbyte == START_BYTE: self.index = 0 self.checksum = 0 self.message_id = 0 # Reset message_id self.error = ParserError.NO_ERROR # Reset error self.state = ParserState.GET_MESSAGE_TYPE # Andernfalls ignorieren wir Bytes, bis ein Start-Byte gefunden wird elif current_state == ParserState.ESCAPED_MESSAGE_TYPE: self.message_id = pbyte self.checksum ^= pbyte self.state = ParserState.IN_PAYLOAD elif current_state == ParserState.GET_MESSAGE_TYPE: if pbyte == ESCAPE_BYTE: self.state = ParserState.ESCAPED_MESSAGE_TYPE return # Dieses Byte wurde als Escape-Sequenz verarbeitet, nicht zum Payload hinzufügen if pbyte == START_BYTE or pbyte == END_BYTE: self.state = ParserState.WAITING_FOR_START_BYTE self.error = ParserError.UNEXPECTED_COMMAND_BYTE self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) return self.message_id = pbyte self.checksum ^= pbyte self.state = ParserState.IN_PAYLOAD elif current_state == ParserState.ESCAPE_PAYLOAD_BYTE: # Das escapte Byte ist Teil des Payloads if self.index < MAX_PAYLOAD_LENGTH + 1: # +1 für Checksummen-Byte self.message_buffer[self.index] = pbyte self.index += 1 self.checksum ^= pbyte self.state = ParserState.IN_PAYLOAD else: self.state = ParserState.WAITING_FOR_START_BYTE self.error = ParserError.MESSAGE_TOO_LONG self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) return elif current_state == ParserState.IN_PAYLOAD: if pbyte == ESCAPE_BYTE: self.state = ParserState.ESCAPE_PAYLOAD_BYTE return # Dieses Byte wurde als Escape-Sequenz verarbeitet if pbyte == START_BYTE: self.state = ParserState.WAITING_FOR_START_BYTE self.error = ParserError.UNEXPECTED_COMMAND_BYTE self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) return if pbyte == END_BYTE: if self.checksum != 0x00: # Checksummenfehler: Die Checksumme wurde bis zum End-Byte XORed. # Wenn die empfangene Checksumme korrekt war, sollte das Ergebnis 0 sein. self.state = ParserState.WAITING_FOR_START_BYTE self.error = ParserError.WRONG_CHECKSUM self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) return # Erfolgreich empfangen! Die Checksumme ist das letzte Byte im Puffer. # Die Länge des Payloads ist index - 1 (da das letzte Byte die Checksumme war). payload_length = self.index - 1 if payload_length < 0: # Falls nur Message ID und Checksumme, aber kein Payload payload_length = 0 self.on_message_received(self.message_id, self.message_buffer, payload_length) self.state = ParserState.WAITING_FOR_START_BYTE return # EndByte wurde verarbeitet, nicht zum Payload hinzufügen # Normales Payload-Byte if self.index < MAX_PAYLOAD_LENGTH + 1: # +1 für Checksummen-Byte self.message_buffer[self.index] = pbyte self.index += 1 self.checksum ^= pbyte else: # Nachricht zu lang self.state = ParserState.WAITING_FOR_START_BYTE self.error = ParserError.MESSAGE_TOO_LONG self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error) return