Removed old vibe coded Python Test Tool

This commit is contained in:
simon 2025-07-26 10:39:11 +02:00
parent 441347fc95
commit ebb739a3a0
4 changed files with 0 additions and 725 deletions

View File

@ -1,248 +0,0 @@
import queue # Zum sicheren Datenaustausch zwischen Threads
import serial
import time
import threading
import sys
from parser import UartMessageParser, ParserError
from message_builder import MessageBuilder, MessageBuilderError, PayloadTooLargeError, BufferOverflowError
import payload_parser
from rich.console import Console
from rich.table import Table
SERIAL_PORT = "/dev/ttyUSB0"
BAUDRATE = 115200
WRITE_TIMEOUT = 1.5
READ_TIMEOUT = 2.0
payload_parser = payload_parser.PayloadParser()
def on_message_received_from_uart(parsed_message):
print(f"[CALLBACK] Nachricht empfangen: MSGID=0x{
parsed_message.msgid:02X}, Length={parsed_message.payload_len}")
received_message_queue.put(parsed_message)
def on_message_fail_from_uart(error_message):
print(f"[CALLBACK] Fehler beim Parsen: {error_message}")
class ParsedMessage:
def __init__(self, msgid, payload_len):
self.msgid = msgid
self.payload_len = payload_len
received_message_queue = queue.Queue()
class SerialReader(threading.Thread):
# Ändere den Konstruktor, um eine bereits geöffnete serielle Instanz zu akzeptieren
def __init__(self, ser_instance, read_timeout, parser):
super().__init__()
# Speichere die übergebene serielle Instanz
self.ser = ser_instance
self.read_timeout = read_timeout
self.parser = parser
self.running = False
self.daemon = True # Thread beendet sich mit dem Hauptprogramm
def run(self):
# Überprüfe, ob die serielle Schnittstelle wirklich offen ist, bevor du beginnst
if not self.ser or not self.ser.is_open:
print(
f"[{self.name}] Fehler: Serielle Schnittstelle ist nicht geöffnet.")
return
print(f"[{self.name}] Lese-Thread gestartet. Überwache {self.ser.port}...")
self.ser.timeout = self.read_timeout # Setze den Timeout für byteweises Lesen
self.running = True
while self.running:
try:
byte = self.ser.read(1)
if byte:
self.parser.parse_byte(byte[0])
else:
pass # Timeout, kein Byte verfügbar, Thread läuft weiter
except serial.SerialException as e:
print(f"[{self.name}] Lesefehler: {e}")
self.running = False
except Exception as e:
print(f"[{self.name}] Unerwarteter Fehler im Lese-Thread: {e}")
self.running = False
# Der Thread schließt den Port NICHT mehr, das ist Aufgabe des Hauptprogramms.
print(f"[{self.name}] Lese-Thread beendet.")
def stop(self):
self.running = False
print(f"[{self.name}] Lese-Thread wird beendet...")
def on_message_received_from_uart(message_id: int, payload: bytes, payload_length: int):
"""
Callback-Funktion, die aufgerufen wird, wenn der Parser eine vollständige,
gültige Nachricht empfangen hat.
"""
print(f"\n[MAIN] Nachricht erfolgreich empfangen! ID: 0x{
message_id:02X}")
print(f"[MAIN] Payload ({payload_length} Bytes): {
payload[:payload_length].hex().upper()}")
parsed_object = payload_parser.parse_payload(
message_id, payload[:payload_length])
if message_id == 0x04:
print(parsed_object)
table = Table(title="Clients")
columns = ["ClientId", "IsAvailable",
"IsSlotUsed", "MAC", "LastPing", "LastSuccesfullPing"]
rows = []
for x in parsed_object.clients:
mac_string = ':'.join(f'{byte:02x}' for byte in x.mac_address)
rows.append([str(x.client_id), str(x.is_available), str(x.is_slot_used), mac_string,
str(x.last_ping), str(x.last_successfull_ping)])
for column in columns:
table.add_column(column)
for row in rows:
table.add_row(*row, style='bright_green')
console = Console()
console.print(table)
def on_message_fail_from_uart(message_id: int, current_message_buffer: bytes,
current_index: int, error_type: ParserError):
"""
Callback-Funktion, die aufgerufen wird, wenn der Parser einen Fehler
beim Empfang einer Nachricht feststellt.
"""
print(f"\n[MAIN] Fehler beim Parsen der Nachricht! ID: 0x{
message_id:02X}")
print(f"[MAIN] Fehler: {error_type.name}")
print(f"[MAIN] Bisheriger Puffer ({current_index} Bytes): {
current_message_buffer[:current_index].hex().upper()}")
def run_uart_test():
"""
Führt den UART-Test durch: Sendet eine Nachricht und liest alle Antworten.
"""
ser = None
parser = UartMessageParser(
on_message_received_callback=on_message_received_from_uart,
on_message_fail_callback=on_message_fail_from_uart
)
message_builder = MessageBuilder()
try:
ser = serial.Serial(
port=SERIAL_PORT,
baudrate=BAUDRATE,
timeout=READ_TIMEOUT,
write_timeout=WRITE_TIMEOUT
)
print(f"Serielle Schnittstelle {
SERIAL_PORT} mit Baudrate {BAUDRATE} geöffnet.")
reader_thread = SerialReader(
ser_instance=ser,
read_timeout=10,
parser=parser
)
reader_thread.start() # Starte den Lese-Thread
while not reader_thread.running:
time.sleep(0.1)
print("\n--- UART Testkonsole ---")
print("Gib eine Zahl (1-10) ein, um eine Nachricht zu senden.")
print("Gib 'q' oder 'exit' ein, um das Programm zu beenden.")
while True:
# Warte auf Benutzereingabe
user_input = sys.stdin.readline().strip().lower()
if user_input in ('q', 'exit'):
break
try:
choice = int(user_input)
if choice in MESSAGES:
msg_info = MESSAGES[choice]
print(f"\n[MAIN] Sende Nachricht für Option {
choice} (MSGID: 0x{msg_info['msg_id']:02X})...")
try:
message_to_send = message_builder.build_message(
msg_info["msg_id"],
msg_info["payload"],
255 # Max Payload Length
)
print(f"[MAIN] Gebaute Nachricht zum Senden: {
message_to_send.hex().upper()}")
bytes_written = ser.write(message_to_send)
print(f"[MAIN] {bytes_written} Bytes gesendet.")
except (PayloadTooLargeError, BufferOverflowError) as e:
print(f"[MAIN] Fehler beim Bauen der Nachricht: {e}")
except Exception as e:
print(
f"[MAIN] Ein unerwarteter Fehler beim Senden der Nachricht ist aufgetreten: {e}")
else:
print(
"Ungültige Option. Bitte gib eine Zahl zwischen 1 und 10 ein.")
except ValueError:
print("Ungültige Eingabe. Bitte gib eine Zahl oder 'q' ein.")
except Exception as e:
print(
f"[MAIN] Ein unerwarteter Fehler bei der Eingabeverarbeitung ist aufgetreten: {e}")
# Verarbeite empfangene Nachrichten, die sich in der Queue angesammelt haben
while not received_message_queue.empty():
msg = received_message_queue.get()
print(
f" > [MAIN-Loop] Verarbeitet: MSGID=0x{msg.msgid:02X}, Length={msg.payload_len}")
received_message_queue.task_done()
except serial.SerialException as e:
print(f"Fehler beim Zugriff auf die serielle Schnittstelle: {e}")
print(f"Stelle sicher, dass '{
SERIAL_PORT}' der korrekte Port ist und nicht von einer anderen Anwendung verwendet wird.")
except KeyboardInterrupt:
print("\n[MAIN] Test durch Benutzer abgebrochen (Ctrl+C).")
except Exception as e:
print(f"Ein unerwarteter Fehler im Hauptprogramm ist aufgetreten: {e}")
finally:
if 'reader_thread' in locals() and reader_thread.is_alive():
reader_thread.stop()
reader_thread.join(timeout=5)
if reader_thread.is_alive():
print(
"[MAIN] Warnung: Lese-Thread konnte nicht sauber beendet werden.")
if ser and ser.is_open:
ser.close()
print("Serielle Schnittstelle geschlossen.")
print("[MAIN] Programm beendet.")
# Nachrichten-Mapping
MESSAGES = {
1: {"msg_id": 0x01, "payload": b"Echo Message 1"},
2: {"msg_id": 0x02, "payload": b"Version Request"},
3: {"msg_id": 0x03, "payload": b"Client Info Request"},
4: {"msg_id": 0x04, "payload": b"Custom Data 4"},
5: {"msg_id": 0x05, "payload": b"Custom Data 5"},
6: {"msg_id": 0x06, "payload": b"Custom Data 6"},
7: {"msg_id": 0x07, "payload": b"Custom Data 7"},
8: {"msg_id": 0x08, "payload": b"Custom Data 8"},
9: {"msg_id": 0x09, "payload": b"Custom Data 9"},
10: {"msg_id": 0x0A, "payload": b"Custom Data 10 - Last One!"},
}
# Führe den Test aus
if __name__ == "__main__":
run_uart_test()

View File

@ -1,115 +0,0 @@
import enum
START_BYTE = 0xAA
ESCAPE_BYTE = 0xBB
END_BYTE = 0xCC
class MessageBuilderError(Exception):
"""Basisklasse für Fehler des Message Builders."""
pass
class PayloadTooLargeError(MessageBuilderError):
"""Ausnahme, wenn der Payload zu groß für den Puffer ist."""
def __init__(self, required_size, buffer_size):
super().__init__(f"Payload ({
required_size} bytes) ist größer als der verfügbare Puffer ({buffer_size} bytes).")
self.required_size = required_size
self.buffer_size = buffer_size
class BufferOverflowError(MessageBuilderError):
"""Ausnahme, wenn der Puffer während des Bauens überläuft."""
def __init__(self, current_size, max_size, byte_to_add=None):
msg = f"Pufferüberlauf: Aktuelle Größe {
current_size}, Max. Größe {max_size}."
if byte_to_add is not None:
msg += f" Versuch, Byte 0x{byte_to_add:02X} hinzuzufügen."
super().__init__(msg)
self.current_size = current_size
self.max_size = max_size
self.byte_to_add = byte_to_add
class MessageBuilder:
"""
Klasse zum Aufbau von UART-Nachrichten gemäß dem definierten Protokoll,
inklusive Stuffing und Checksummenberechnung.
"""
def __init__(self):
pass
def _needs_stuffing_byte(self, byte: int) -> bool:
"""
Prüft, ob ein Byte ein Stuffing-Byte benötigt (d.h. ob es ein Steuerbyte ist).
"""
return (byte == START_BYTE or byte == ESCAPE_BYTE or byte == END_BYTE)
def _add_byte_with_length_check(self, byte: int, buffer: bytearray, max_length: int):
"""
Fügt ein Byte zum Puffer hinzu und prüft auf Pufferüberlauf.
Löst BufferOverflowError aus, wenn der Puffer voll ist.
"""
if len(buffer) >= max_length:
raise BufferOverflowError(len(buffer), max_length, byte)
buffer.append(byte)
def build_message(self, msgid: int, payload: bytes, msg_buffer_size: int) -> bytes:
"""
Baut eine vollständige UART-Nachricht.
Args:
msgid (int): Die Message ID (0-255).
payload (bytes): Die Nutzdaten der Nachricht als Byte-Objekt.
msg_buffer_size (int): Die maximale Größe des Ausgabepuffers.
Dies ist die maximale Länge der *fertigen* Nachricht.
Returns:
bytes: Die fertig aufgebaute Nachricht als Byte-Objekt.
Raises:
PayloadTooLargeError: Wenn der Payload (mit Overhead) den Puffer überschreiten würde.
BufferOverflowError: Wenn während des Bauens ein Pufferüberlauf auftritt.
"""
if len(payload) + 4 > msg_buffer_size:
raise PayloadTooLargeError(len(payload) + 4, msg_buffer_size)
checksum = 0
msg_buffer = bytearray()
# 1. StartByte hinzufügen
self._add_byte_with_length_check(
START_BYTE, msg_buffer, msg_buffer_size)
# 2. Message ID hinzufügen (mit Stuffing)
if self._needs_stuffing_byte(msgid):
self._add_byte_with_length_check(
ESCAPE_BYTE, msg_buffer, msg_buffer_size)
self._add_byte_with_length_check(msgid, msg_buffer, msg_buffer_size)
checksum ^= msgid
# 3. Payload-Bytes hinzufügen (mit Stuffing)
for byte_val in payload:
if self._needs_stuffing_byte(byte_val):
self._add_byte_with_length_check(
ESCAPE_BYTE, msg_buffer, msg_buffer_size)
self._add_byte_with_length_check(
byte_val, msg_buffer, msg_buffer_size)
checksum ^= byte_val
# 4. Checksumme hinzufügen (mit Stuffing)
if self._needs_stuffing_byte(checksum):
self._add_byte_with_length_check(
ESCAPE_BYTE, msg_buffer, msg_buffer_size)
self._add_byte_with_length_check(checksum, msg_buffer, msg_buffer_size)
# 5. EndByte hinzufügen
self._add_byte_with_length_check(END_BYTE, msg_buffer, msg_buffer_size)
# Konvertiere bytearray zu unveränderlichem bytes-Objekt
return bytes(msg_buffer)

View File

@ -1,170 +0,0 @@
import enum
# --- Konstanten für das UART-Protokoll ---
# Diese Werte müssen mit denen auf deinem Embedded-System übereinstimmen
START_BYTE = 0xAA
END_BYTE = 0xCC
ESCAPE_BYTE = 0x7D # Beispielwert, bitte an dein Protokoll anpassen
MAX_PAYLOAD_LENGTH = 255 # Maximale Länge des Nachrichten-Payloads (ohne Message ID und Checksumme)
# MAX_TOTAL_CONTENT_LENGTH in C beinhaltet Message ID, Payload und Checksumme.
# Hier definieren wir MAX_PAYLOAD_LENGTH, da der Parser den Payload sammelt.
# Die Gesamtgröße des empfangenen Puffers (message + checksum) darf MAX_PAYLOAD_LENGTH + 1 nicht überschreiten,
# da die Checksumme als letztes Byte des Payloads behandelt wird.
# --- Enumerationen für Parser-Zustände und Fehler ---
class ParserState(enum.Enum):
WAITING_FOR_START_BYTE = 0
GET_MESSAGE_TYPE = 1
ESCAPED_MESSAGE_TYPE = 2
IN_PAYLOAD = 3
ESCAPE_PAYLOAD_BYTE = 4
class ParserError(enum.Enum):
NO_ERROR = 0
UNEXPECTED_COMMAND_BYTE = 1
WRONG_CHECKSUM = 2
MESSAGE_TOO_LONG = 3
class UartMessageParser:
"""
Ein State-Machine-Parser für UART-Nachrichten basierend auf der bereitgestellten C-Logik.
Nachrichtenformat (angenommen):
[START_BYTE] [MESSAGE_ID] [PAYLOAD_BYTES...] [CHECKSUM_BYTE] [END_BYTE]
Escape-Sequenzen:
Wenn START_BYTE, END_BYTE oder ESCAPE_BYTE im MESSAGE_ID oder PAYLOAD vorkommen,
werden sie durch ESCAPE_BYTE gefolgt vom ursprünglichen Byte (nicht XORed) ersetzt.
Die Checksumme wird über die unescaped Bytes berechnet.
"""
def __init__(self, on_message_received_callback=None, on_message_fail_callback=None):
"""
Initialisiert den UART-Nachrichten-Parser.
Args:
on_message_received_callback (callable, optional): Eine Funktion, die aufgerufen wird,
wenn eine gültige Nachricht empfangen wurde.
Signatur: on_message_received(message_id: int, payload: bytes, payload_length: int)
on_message_fail_callback (callable, optional): Eine Funktion, die aufgerufen wird,
wenn ein Nachrichtenfehler auftritt.
Signatur: on_message_fail(message_id: int, current_message_buffer: bytes,
current_index: int, error_type: ParserError)
"""
self.state = ParserState.WAITING_FOR_START_BYTE
self.index = 0
self.checksum = 0
self.message_id = 0
self.message_buffer = bytearray(MAX_PAYLOAD_LENGTH + 1) # +1 für Checksummen-Byte
self.error = ParserError.NO_ERROR
# Callbacks für die Anwendung. Standardmäßig None oder einfache Print-Funktionen.
self.on_message_received = on_message_received_callback if on_message_received_callback else self._default_on_message_received
self.on_message_fail = on_message_fail_callback if on_message_fail_callback else self._default_on_message_fail
def _default_on_message_received(self, message_id, payload, payload_length):
"""Standard-Callback für empfangene Nachrichten, falls keiner angegeben ist."""
print(f"Parser: Nachricht empfangen! ID: 0x{message_id:02X}, "
f"Payload ({payload_length} Bytes): {payload[:payload_length].hex().upper()}")
def _default_on_message_fail(self, message_id, current_message_buffer, current_index, error_type):
"""Standard-Callback für Nachrichtenfehler, falls keiner angegeben ist."""
print(f"Parser: Fehler bei Nachricht! ID: 0x{message_id:02X}, "
f"Fehler: {error_type.name}, "
f"Bisheriger Puffer ({current_index} Bytes): {current_message_buffer[:current_index].hex().upper()}")
def parse_byte(self, pbyte: int):
"""
Verarbeitet ein einzelnes empfangenes Byte.
Args:
pbyte (int): Das empfangene Byte (0-255).
"""
# Sicherstellen, dass pbyte ein Integer im Bereich 0-255 ist
if not isinstance(pbyte, int) or not (0 <= pbyte <= 255):
print(f"Parser: Ungültiges Byte empfangen: {pbyte}. Muss ein Integer von 0-255 sein.")
return
current_state = self.state # Für bessere Lesbarkeit
if current_state == ParserState.WAITING_FOR_START_BYTE:
if pbyte == START_BYTE:
self.index = 0
self.checksum = 0
self.message_id = 0 # Reset message_id
self.error = ParserError.NO_ERROR # Reset error
self.state = ParserState.GET_MESSAGE_TYPE
# Andernfalls ignorieren wir Bytes, bis ein Start-Byte gefunden wird
elif current_state == ParserState.ESCAPED_MESSAGE_TYPE:
self.message_id = pbyte
self.checksum ^= pbyte
self.state = ParserState.IN_PAYLOAD
elif current_state == ParserState.GET_MESSAGE_TYPE:
if pbyte == ESCAPE_BYTE:
self.state = ParserState.ESCAPED_MESSAGE_TYPE
return # Dieses Byte wurde als Escape-Sequenz verarbeitet, nicht zum Payload hinzufügen
if pbyte == START_BYTE or pbyte == END_BYTE:
self.state = ParserState.WAITING_FOR_START_BYTE
self.error = ParserError.UNEXPECTED_COMMAND_BYTE
self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error)
return
self.message_id = pbyte
self.checksum ^= pbyte
self.state = ParserState.IN_PAYLOAD
elif current_state == ParserState.ESCAPE_PAYLOAD_BYTE:
# Das escapte Byte ist Teil des Payloads
if self.index < MAX_PAYLOAD_LENGTH + 1: # +1 für Checksummen-Byte
self.message_buffer[self.index] = pbyte
self.index += 1
self.checksum ^= pbyte
self.state = ParserState.IN_PAYLOAD
else:
self.state = ParserState.WAITING_FOR_START_BYTE
self.error = ParserError.MESSAGE_TOO_LONG
self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error)
return
elif current_state == ParserState.IN_PAYLOAD:
if pbyte == ESCAPE_BYTE:
self.state = ParserState.ESCAPE_PAYLOAD_BYTE
return # Dieses Byte wurde als Escape-Sequenz verarbeitet
if pbyte == START_BYTE:
self.state = ParserState.WAITING_FOR_START_BYTE
self.error = ParserError.UNEXPECTED_COMMAND_BYTE
self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error)
return
if pbyte == END_BYTE:
if self.checksum != 0x00:
# Checksummenfehler: Die Checksumme wurde bis zum End-Byte XORed.
# Wenn die empfangene Checksumme korrekt war, sollte das Ergebnis 0 sein.
self.state = ParserState.WAITING_FOR_START_BYTE
self.error = ParserError.WRONG_CHECKSUM
self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error)
return
# Erfolgreich empfangen! Die Checksumme ist das letzte Byte im Puffer.
# Die Länge des Payloads ist index - 1 (da das letzte Byte die Checksumme war).
payload_length = self.index - 1
if payload_length < 0: # Falls nur Message ID und Checksumme, aber kein Payload
payload_length = 0
self.on_message_received(self.message_id, self.message_buffer, payload_length)
self.state = ParserState.WAITING_FOR_START_BYTE
return # EndByte wurde verarbeitet, nicht zum Payload hinzufügen
# Normales Payload-Byte
if self.index < MAX_PAYLOAD_LENGTH + 1: # +1 für Checksummen-Byte
self.message_buffer[self.index] = pbyte
self.index += 1
self.checksum ^= pbyte
else:
# Nachricht zu lang
self.state = ParserState.WAITING_FOR_START_BYTE
self.error = ParserError.MESSAGE_TOO_LONG
self.on_message_fail(self.message_id, self.message_buffer, self.index, self.error)
return

View File

@ -1,192 +0,0 @@
import dataclasses
import struct
from typing import Optional, Union, List
@dataclasses.dataclass
class StatusMessage:
"""
Repräsentiert eine Status-Nachricht (z.B. Message ID 0x01).
Payload-Format: [status_code: uint8], [battery_level: uint8], [uptime_seconds: uint16]
"""
status_code: int
battery_level: int # 0-100%
uptime_seconds: int
@dataclasses.dataclass
class SensorDataMessage:
"""
Repräsentiert eine Sensor-Daten-Nachricht (z.B. Message ID 0x02).
Payload-Format: [temperature_celsius: int16], [humidity_percent: uint16]
"""
temperature_celsius: int # signed short
humidity_percent: int # unsigned short
@dataclasses.dataclass
class ClientEntry:
"""
Repräsentiert die Informationen für einen einzelnen Client innerhalb der ClientInfoMessage.
Payload-Format:
[client_id: uint8]
[is_available: uint8] (0=false, >0=true)
[is_slot_used: uint8] (0=false, >0=true)
[mac_address: bytes (6)]
[unoccupied_value_1: uint32]
[unoccupied_value_2: uint32]
Gesamt: 1 + 1 + 1 + 6 + 4 + 4 = 17 Bytes pro Eintrag.
"""
client_id: int
is_available: bool
is_slot_used: bool
mac_address: bytes # 6 Bytes MAC-Adresse
last_ping: int # 4 Bytes, unbelegt
last_successfull_ping: int # 4 Bytes, unbelegt
@dataclasses.dataclass
class ClientInfoMessage:
"""
Repräsentiert eine Nachricht mit Client-Informationen (Message ID 0x03).
Payload-Format:
[num_clients: uint8]
[client_entry_1: ClientEntry]
[client_entry_2: ClientEntry]
...
"""
num_clients: int
clients: List[ClientEntry]
@dataclasses.dataclass
class UnknownMessage:
"""
Repräsentiert eine Nachricht mit unbekannter ID oder fehlerhaftem Payload.
"""
message_id: int
raw_payload: bytes
error_message: str
# --- Payload Parser Klasse ---
class PayloadParser:
"""
Interpretiert den Payload einer UART-Nachricht basierend auf ihrer Message ID
und wandelt ihn in ein strukturiertes Python-Objekt (dataclass) um.
"""
def __init__(self):
# Ein Dictionary, das Message IDs auf ihre entsprechenden Parsing-Funktionen abbildet.
self._parser_map = {
0x01: self._parse_status_message,
0x02: self._parse_sensor_data_message,
# Aktualisiert für die neue 0x03 Struktur
0x03: self._parse_client_info_message,
0x04: self._parse_client_info_message,
# Füge hier weitere Message IDs und ihre Parsing-Funktionen hinzu
}
def _parse_status_message(self, payload: bytes) -> Union[StatusMessage, UnknownMessage]:
"""Parsen des Payloads für Message ID 0x01 (StatusMessage)."""
# Erwartetes Format: 1 Byte Status, 1 Byte Battery, 2 Bytes Uptime (Little-Endian)
if len(payload) != 4:
return UnknownMessage(0x01, payload, f"Falsche Payload-Länge für StatusMessage: Erwartet 4, Got {len(payload)}")
try:
# '<BBH' bedeutet: Little-Endian, Byte (unsigned char), Byte (unsigned char), Half-word (unsigned short)
status_code, battery_level, uptime_seconds = struct.unpack(
'<BBH', payload)
return StatusMessage(status_code, battery_level, uptime_seconds)
except struct.error as e:
return UnknownMessage(0x01, payload, f"Fehler beim Entpacken der StatusMessage: {e}")
def _parse_sensor_data_message(self, payload: bytes) -> Union[SensorDataMessage, UnknownMessage]:
"""Parsen des Payloads für Message ID 0x02 (SensorDataMessage)."""
# Erwartetes Format: 2 Bytes Temperatur (signed short), 2 Bytes Feuchtigkeit (unsigned short) (Little-Endian)
if len(payload) != 4:
return UnknownMessage(0x02, payload, f"Falsche Payload-Länge für SensorDataMessage: Erwartet 4, Got {len(payload)}")
try:
# '<hH' bedeutet: Little-Endian, short (signed), unsigned short
temperature_celsius, humidity_percent = struct.unpack(
'<hH', payload)
return SensorDataMessage(temperature_celsius, humidity_percent)
except struct.error as e:
return UnknownMessage(0x02, payload, f"Fehler beim Entpacken der SensorDataMessage: {e}")
def _parse_client_info_message(self, payload: bytes) -> Union[ClientInfoMessage, UnknownMessage]:
"""Parsen des Payloads für Message ID 0x03 (ClientInfoMessage)."""
if not payload:
# Wenn der Payload leer ist, aber num_clients erwartet wird, ist das ein Fehler
return UnknownMessage(0x03, payload, "Payload für ClientInfoMessage ist leer, aber num_clients erwartet.")
try:
# Das erste Byte ist die Anzahl der Clients
num_clients = payload[0]
# Die restlichen Bytes sind die Client-Einträge
client_data_bytes = payload[1:]
# 1 (ID) + 1 (Avail) + 1 (Used) + 6 (MAC) + 4 (Val1) + 4 (Val2)
EXPECTED_CLIENT_ENTRY_SIZE = 17
if len(client_data_bytes) != num_clients * EXPECTED_CLIENT_ENTRY_SIZE:
return UnknownMessage(0x03, payload,
f"Falsche Payload-Länge für Client-Einträge: Erwartet {
num_clients * EXPECTED_CLIENT_ENTRY_SIZE}, "
f"Got {len(client_data_bytes)} nach num_clients.")
clients_list: List[ClientEntry] = []
# Formatstring für einen Client-Eintrag:
# < : Little-Endian
# B : uint8 (client_id, is_available, is_slot_used)
# 6s: 6 Bytes (mac_address)
# I : uint32 (unoccupied_value_1, unoccupied_value_2)
CLIENT_ENTRY_FORMAT = '<BBB6sII'
for i in range(num_clients):
start_index = i * EXPECTED_CLIENT_ENTRY_SIZE
end_index = start_index + EXPECTED_CLIENT_ENTRY_SIZE
entry_bytes = client_data_bytes[start_index:end_index]
# Entpacke die Daten für einen Client-Eintrag
client_id, is_available_byte, is_slot_used_byte, mac_address, val1, val2 = \
struct.unpack(CLIENT_ENTRY_FORMAT, entry_bytes)
# Konvertiere 0/1 Bytes zu boolschen Werten
is_available = bool(is_available_byte)
is_slot_used = bool(is_slot_used_byte)
clients_list.append(ClientEntry(
client_id=client_id,
is_available=is_available,
is_slot_used=is_slot_used,
mac_address=mac_address,
last_ping=val1,
last_successfull_ping=val2
))
return ClientInfoMessage(num_clients=num_clients, clients=clients_list)
except struct.error as e:
return UnknownMessage(0x03, payload, f"Fehler beim Entpacken der ClientInfoMessage-Einträge: {e}")
except Exception as e:
return UnknownMessage(0x03, payload, f"Unerwarteter Fehler beim Parsen der ClientInfoMessage: {e}")
def parse_payload(self, message_id: int, payload: bytes) -> Union[StatusMessage, SensorDataMessage, ClientInfoMessage, UnknownMessage]:
"""
Interpretiert den gegebenen Payload basierend auf der Message ID.
Args:
message_id (int): Die ID der Nachricht.
payload (bytes): Die rohen Nutzdaten der Nachricht.
Returns:
Union[StatusMessage, SensorDataMessage, ClientInfoMessage, UnknownMessage]:
Ein dataclass-Objekt, das die dekodierten Daten repräsentiert,
oder ein UnknownMessage-Objekt bei unbekannter ID oder Parsing-Fehler.
"""
parser_func = self._parser_map.get(message_id)
if parser_func:
return parser_func(payload)
else:
return UnknownMessage(message_id, payload, "Unbekannte Message ID.")