esp_alox/tools/message_builder.py
2025-07-23 16:48:55 +02:00

116 lines
4.1 KiB
Python

import enum
START_BYTE = 0xAA
ESCAPE_BYTE = 0xBB
END_BYTE = 0xCC
class MessageBuilderError(Exception):
"""Basisklasse für Fehler des Message Builders."""
pass
class PayloadTooLargeError(MessageBuilderError):
"""Ausnahme, wenn der Payload zu groß für den Puffer ist."""
def __init__(self, required_size, buffer_size):
super().__init__(f"Payload ({
required_size} bytes) ist größer als der verfügbare Puffer ({buffer_size} bytes).")
self.required_size = required_size
self.buffer_size = buffer_size
class BufferOverflowError(MessageBuilderError):
"""Ausnahme, wenn der Puffer während des Bauens überläuft."""
def __init__(self, current_size, max_size, byte_to_add=None):
msg = f"Pufferüberlauf: Aktuelle Größe {
current_size}, Max. Größe {max_size}."
if byte_to_add is not None:
msg += f" Versuch, Byte 0x{byte_to_add:02X} hinzuzufügen."
super().__init__(msg)
self.current_size = current_size
self.max_size = max_size
self.byte_to_add = byte_to_add
class MessageBuilder:
"""
Klasse zum Aufbau von UART-Nachrichten gemäß dem definierten Protokoll,
inklusive Stuffing und Checksummenberechnung.
"""
def __init__(self):
pass
def _needs_stuffing_byte(self, byte: int) -> bool:
"""
Prüft, ob ein Byte ein Stuffing-Byte benötigt (d.h. ob es ein Steuerbyte ist).
"""
return (byte == START_BYTE or byte == ESCAPE_BYTE or byte == END_BYTE)
def _add_byte_with_length_check(self, byte: int, buffer: bytearray, max_length: int):
"""
Fügt ein Byte zum Puffer hinzu und prüft auf Pufferüberlauf.
Löst BufferOverflowError aus, wenn der Puffer voll ist.
"""
if len(buffer) >= max_length:
raise BufferOverflowError(len(buffer), max_length, byte)
buffer.append(byte)
def build_message(self, msgid: int, payload: bytes, msg_buffer_size: int) -> bytes:
"""
Baut eine vollständige UART-Nachricht.
Args:
msgid (int): Die Message ID (0-255).
payload (bytes): Die Nutzdaten der Nachricht als Byte-Objekt.
msg_buffer_size (int): Die maximale Größe des Ausgabepuffers.
Dies ist die maximale Länge der *fertigen* Nachricht.
Returns:
bytes: Die fertig aufgebaute Nachricht als Byte-Objekt.
Raises:
PayloadTooLargeError: Wenn der Payload (mit Overhead) den Puffer überschreiten würde.
BufferOverflowError: Wenn während des Bauens ein Pufferüberlauf auftritt.
"""
if len(payload) + 4 > msg_buffer_size:
raise PayloadTooLargeError(len(payload) + 4, msg_buffer_size)
checksum = 0
msg_buffer = bytearray()
# 1. StartByte hinzufügen
self._add_byte_with_length_check(
START_BYTE, msg_buffer, msg_buffer_size)
# 2. Message ID hinzufügen (mit Stuffing)
if self._needs_stuffing_byte(msgid):
self._add_byte_with_length_check(
ESCAPE_BYTE, msg_buffer, msg_buffer_size)
self._add_byte_with_length_check(msgid, msg_buffer, msg_buffer_size)
checksum ^= msgid
# 3. Payload-Bytes hinzufügen (mit Stuffing)
for byte_val in payload:
if self._needs_stuffing_byte(byte_val):
self._add_byte_with_length_check(
ESCAPE_BYTE, msg_buffer, msg_buffer_size)
self._add_byte_with_length_check(
byte_val, msg_buffer, msg_buffer_size)
checksum ^= byte_val
# 4. Checksumme hinzufügen (mit Stuffing)
if self._needs_stuffing_byte(checksum):
self._add_byte_with_length_check(
ESCAPE_BYTE, msg_buffer, msg_buffer_size)
self._add_byte_with_length_check(checksum, msg_buffer, msg_buffer_size)
# 5. EndByte hinzufügen
self._add_byte_with_length_check(END_BYTE, msg_buffer, msg_buffer_size)
# Konvertiere bytearray zu unveränderlichem bytes-Objekt
return bytes(msg_buffer)