171 lines
8.1 KiB
Python
171 lines
8.1 KiB
Python
import enum
|
|
|
|
# --- Konstanten für das UART-Protokoll ---
|
|
# Diese Werte müssen mit denen auf deinem Embedded-System übereinstimmen
|
|
START_BYTE = 0xAA
|
|
END_BYTE = 0xCC
|
|
ESCAPE_BYTE = 0x7D # Beispielwert, bitte an dein Protokoll anpassen
|
|
|
|
MAX_PAYLOAD_LENGTH = 255 # Maximale Länge des Nachrichten-Payloads (ohne Message ID und Checksumme)
|
|
# MAX_TOTAL_CONTENT_LENGTH in C beinhaltet Message ID, Payload und Checksumme.
|
|
# Hier definieren wir MAX_PAYLOAD_LENGTH, da der Parser den Payload sammelt.
|
|
# Die Gesamtgröße des empfangenen Puffers (message + checksum) darf MAX_PAYLOAD_LENGTH + 1 nicht überschreiten,
|
|
# da die Checksumme als letztes Byte des Payloads behandelt wird.
|
|
|
|
# --- Enumerationen für Parser-Zustände und Fehler ---
|
|
class ParserState(enum.Enum):
|
|
WAITING_FOR_START_BYTE = 0
|
|
GET_MESSAGE_TYPE = 1
|
|
ESCAPED_MESSAGE_TYPE = 2
|
|
IN_PAYLOAD = 3
|
|
ESCAPE_PAYLOAD_BYTE = 4
|
|
|
|
class ParserError(enum.Enum):
|
|
NO_ERROR = 0
|
|
UNEXPECTED_COMMAND_BYTE = 1
|
|
WRONG_CHECKSUM = 2
|
|
MESSAGE_TOO_LONG = 3
|
|
|
|
class UartMessageParser:
|
|
"""
|
|
Ein State-Machine-Parser für UART-Nachrichten basierend auf der bereitgestellten C-Logik.
|
|
|
|
Nachrichtenformat (angenommen):
|
|
[START_BYTE] [MESSAGE_ID] [PAYLOAD_BYTES...] [CHECKSUM_BYTE] [END_BYTE]
|
|
|
|
Escape-Sequenzen:
|
|
Wenn START_BYTE, END_BYTE oder ESCAPE_BYTE im MESSAGE_ID oder PAYLOAD vorkommen,
|
|
werden sie durch ESCAPE_BYTE gefolgt vom ursprünglichen Byte (nicht XORed) ersetzt.
|
|
Die Checksumme wird über die unescaped Bytes berechnet.
|
|
"""
|
|
|
|
def __init__(self, on_message_received_callback=None, on_message_fail_callback=None):
|
|
"""
|
|
Initialisiert den UART-Nachrichten-Parser.
|
|
|
|
Args:
|
|
on_message_received_callback (callable, optional): Eine Funktion, die aufgerufen wird,
|
|
wenn eine gültige Nachricht empfangen wurde.
|
|
Signatur: on_message_received(message_id: int, payload: bytes, payload_length: int)
|
|
on_message_fail_callback (callable, optional): Eine Funktion, die aufgerufen wird,
|
|
wenn ein Nachrichtenfehler auftritt.
|
|
Signatur: on_message_fail(message_id: int, current_message_buffer: bytes,
|
|
current_index: int, error_type: ParserError)
|
|
"""
|
|
self.state = ParserState.WAITING_FOR_START_BYTE
|
|
self.index = 0
|
|
self.checksum = 0
|
|
self.message_id = 0
|
|
self.message_buffer = bytearray(MAX_PAYLOAD_LENGTH + 1) # +1 für Checksummen-Byte
|
|
self.error = ParserError.NO_ERROR
|
|
|
|
# Callbacks für die Anwendung. Standardmäßig None oder einfache Print-Funktionen.
|
|
self.on_message_received = on_message_received_callback if on_message_received_callback else self._default_on_message_received
|
|
self.on_message_fail = on_message_fail_callback if on_message_fail_callback else self._default_on_message_fail
|
|
|
|
def _default_on_message_received(self, message_id, payload, payload_length):
|
|
"""Standard-Callback für empfangene Nachrichten, falls keiner angegeben ist."""
|
|
print(f"Parser: Nachricht empfangen! ID: 0x{message_id:02X}, "
|
|
f"Payload ({payload_length} Bytes): {payload[:payload_length].hex().upper()}")
|
|
|
|
def _default_on_message_fail(self, message_id, current_message_buffer, current_index, error_type):
|
|
"""Standard-Callback für Nachrichtenfehler, falls keiner angegeben ist."""
|
|
print(f"Parser: Fehler bei Nachricht! ID: 0x{message_id:02X}, "
|
|
f"Fehler: {error_type.name}, "
|
|
f"Bisheriger Puffer ({current_index} Bytes): {current_message_buffer[:current_index].hex().upper()}")
|
|
|
|
def parse_byte(self, pbyte: int):
|
|
"""
|
|
Verarbeitet ein einzelnes empfangenes Byte.
|
|
|
|
Args:
|
|
pbyte (int): Das empfangene Byte (0-255).
|
|
"""
|
|
# Sicherstellen, dass pbyte ein Integer im Bereich 0-255 ist
|
|
if not isinstance(pbyte, int) or not (0 <= pbyte <= 255):
|
|
print(f"Parser: Ungültiges Byte empfangen: {pbyte}. Muss ein Integer von 0-255 sein.")
|
|
return
|
|
|
|
current_state = self.state # Für bessere Lesbarkeit
|
|
|
|
if current_state == ParserState.WAITING_FOR_START_BYTE:
|
|
if pbyte == START_BYTE:
|
|
self.index = 0
|
|
self.checksum = 0
|
|
self.message_id = 0 # Reset message_id
|
|
self.error = ParserError.NO_ERROR # Reset error
|
|
self.state = ParserState.GET_MESSAGE_TYPE
|
|
# Andernfalls ignorieren wir Bytes, bis ein Start-Byte gefunden wird
|
|
|
|
elif current_state == ParserState.ESCAPED_MESSAGE_TYPE:
|
|
self.message_id = pbyte
|
|
self.checksum ^= pbyte
|
|
self.state = ParserState.IN_PAYLOAD
|
|
|
|
elif current_state == ParserState.GET_MESSAGE_TYPE:
|
|
if pbyte == ESCAPE_BYTE:
|
|
self.state = ParserState.ESCAPED_MESSAGE_TYPE
|
|
return # Dieses Byte wurde als Escape-Sequenz verarbeitet, nicht zum Payload hinzufügen
|
|
if pbyte == START_BYTE or pbyte == END_BYTE:
|
|
self.state = ParserState.WAITING_FOR_START_BYTE
|
|
self.error = ParserError.UNEXPECTED_COMMAND_BYTE
|
|
self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error)
|
|
return
|
|
self.message_id = pbyte
|
|
self.checksum ^= pbyte
|
|
self.state = ParserState.IN_PAYLOAD
|
|
|
|
elif current_state == ParserState.ESCAPE_PAYLOAD_BYTE:
|
|
# Das escapte Byte ist Teil des Payloads
|
|
if self.index < MAX_PAYLOAD_LENGTH + 1: # +1 für Checksummen-Byte
|
|
self.message_buffer[self.index] = pbyte
|
|
self.index += 1
|
|
self.checksum ^= pbyte
|
|
self.state = ParserState.IN_PAYLOAD
|
|
else:
|
|
self.state = ParserState.WAITING_FOR_START_BYTE
|
|
self.error = ParserError.MESSAGE_TOO_LONG
|
|
self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error)
|
|
return
|
|
|
|
elif current_state == ParserState.IN_PAYLOAD:
|
|
if pbyte == ESCAPE_BYTE:
|
|
self.state = ParserState.ESCAPE_PAYLOAD_BYTE
|
|
return # Dieses Byte wurde als Escape-Sequenz verarbeitet
|
|
if pbyte == START_BYTE:
|
|
self.state = ParserState.WAITING_FOR_START_BYTE
|
|
self.error = ParserError.UNEXPECTED_COMMAND_BYTE
|
|
self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error)
|
|
return
|
|
if pbyte == END_BYTE:
|
|
if self.checksum != 0x00:
|
|
# Checksummenfehler: Die Checksumme wurde bis zum End-Byte XORed.
|
|
# Wenn die empfangene Checksumme korrekt war, sollte das Ergebnis 0 sein.
|
|
self.state = ParserState.WAITING_FOR_START_BYTE
|
|
self.error = ParserError.WRONG_CHECKSUM
|
|
self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error)
|
|
return
|
|
|
|
# Erfolgreich empfangen! Die Checksumme ist das letzte Byte im Puffer.
|
|
# Die Länge des Payloads ist index - 1 (da das letzte Byte die Checksumme war).
|
|
payload_length = self.index - 1
|
|
if payload_length < 0: # Falls nur Message ID und Checksumme, aber kein Payload
|
|
payload_length = 0
|
|
|
|
self.on_message_received(self.message_id, self.message_buffer, payload_length)
|
|
self.state = ParserState.WAITING_FOR_START_BYTE
|
|
return # EndByte wurde verarbeitet, nicht zum Payload hinzufügen
|
|
|
|
# Normales Payload-Byte
|
|
if self.index < MAX_PAYLOAD_LENGTH + 1: # +1 für Checksummen-Byte
|
|
self.message_buffer[self.index] = pbyte
|
|
self.index += 1
|
|
self.checksum ^= pbyte
|
|
else:
|
|
# Nachricht zu lang
|
|
self.state = ParserState.WAITING_FOR_START_BYTE
|
|
self.error = ParserError.MESSAGE_TOO_LONG
|
|
self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error)
|
|
return
|
|
|