esp_alox/tools/payload_parser.py
2025-07-23 16:49:17 +02:00

193 lines
7.8 KiB
Python

import dataclasses
import struct
from typing import Optional, Union, List
@dataclasses.dataclass
class StatusMessage:
"""
Repräsentiert eine Status-Nachricht (z.B. Message ID 0x01).
Payload-Format: [status_code: uint8], [battery_level: uint8], [uptime_seconds: uint16]
"""
status_code: int
battery_level: int # 0-100%
uptime_seconds: int
@dataclasses.dataclass
class SensorDataMessage:
"""
Repräsentiert eine Sensor-Daten-Nachricht (z.B. Message ID 0x02).
Payload-Format: [temperature_celsius: int16], [humidity_percent: uint16]
"""
temperature_celsius: int # signed short
humidity_percent: int # unsigned short
@dataclasses.dataclass
class ClientEntry:
"""
Repräsentiert die Informationen für einen einzelnen Client innerhalb der ClientInfoMessage.
Payload-Format:
[client_id: uint8]
[is_available: uint8] (0=false, >0=true)
[is_slot_used: uint8] (0=false, >0=true)
[mac_address: bytes (6)]
[unoccupied_value_1: uint32]
[unoccupied_value_2: uint32]
Gesamt: 1 + 1 + 1 + 6 + 4 + 4 = 17 Bytes pro Eintrag.
"""
client_id: int
is_available: bool
is_slot_used: bool
mac_address: bytes # 6 Bytes MAC-Adresse
last_ping: int # 4 Bytes, unbelegt
last_successfull_ping: int # 4 Bytes, unbelegt
@dataclasses.dataclass
class ClientInfoMessage:
"""
Repräsentiert eine Nachricht mit Client-Informationen (Message ID 0x03).
Payload-Format:
[num_clients: uint8]
[client_entry_1: ClientEntry]
[client_entry_2: ClientEntry]
...
"""
num_clients: int
clients: List[ClientEntry]
@dataclasses.dataclass
class UnknownMessage:
"""
Repräsentiert eine Nachricht mit unbekannter ID oder fehlerhaftem Payload.
"""
message_id: int
raw_payload: bytes
error_message: str
# --- Payload Parser Klasse ---
class PayloadParser:
"""
Interpretiert den Payload einer UART-Nachricht basierend auf ihrer Message ID
und wandelt ihn in ein strukturiertes Python-Objekt (dataclass) um.
"""
def __init__(self):
# Ein Dictionary, das Message IDs auf ihre entsprechenden Parsing-Funktionen abbildet.
self._parser_map = {
0x01: self._parse_status_message,
0x02: self._parse_sensor_data_message,
# Aktualisiert für die neue 0x03 Struktur
0x03: self._parse_client_info_message,
0x04: self._parse_client_info_message,
# Füge hier weitere Message IDs und ihre Parsing-Funktionen hinzu
}
def _parse_status_message(self, payload: bytes) -> Union[StatusMessage, UnknownMessage]:
"""Parsen des Payloads für Message ID 0x01 (StatusMessage)."""
# Erwartetes Format: 1 Byte Status, 1 Byte Battery, 2 Bytes Uptime (Little-Endian)
if len(payload) != 4:
return UnknownMessage(0x01, payload, f"Falsche Payload-Länge für StatusMessage: Erwartet 4, Got {len(payload)}")
try:
# '<BBH' bedeutet: Little-Endian, Byte (unsigned char), Byte (unsigned char), Half-word (unsigned short)
status_code, battery_level, uptime_seconds = struct.unpack(
'<BBH', payload)
return StatusMessage(status_code, battery_level, uptime_seconds)
except struct.error as e:
return UnknownMessage(0x01, payload, f"Fehler beim Entpacken der StatusMessage: {e}")
def _parse_sensor_data_message(self, payload: bytes) -> Union[SensorDataMessage, UnknownMessage]:
"""Parsen des Payloads für Message ID 0x02 (SensorDataMessage)."""
# Erwartetes Format: 2 Bytes Temperatur (signed short), 2 Bytes Feuchtigkeit (unsigned short) (Little-Endian)
if len(payload) != 4:
return UnknownMessage(0x02, payload, f"Falsche Payload-Länge für SensorDataMessage: Erwartet 4, Got {len(payload)}")
try:
# '<hH' bedeutet: Little-Endian, short (signed), unsigned short
temperature_celsius, humidity_percent = struct.unpack(
'<hH', payload)
return SensorDataMessage(temperature_celsius, humidity_percent)
except struct.error as e:
return UnknownMessage(0x02, payload, f"Fehler beim Entpacken der SensorDataMessage: {e}")
def _parse_client_info_message(self, payload: bytes) -> Union[ClientInfoMessage, UnknownMessage]:
"""Parsen des Payloads für Message ID 0x03 (ClientInfoMessage)."""
if not payload:
# Wenn der Payload leer ist, aber num_clients erwartet wird, ist das ein Fehler
return UnknownMessage(0x03, payload, "Payload für ClientInfoMessage ist leer, aber num_clients erwartet.")
try:
# Das erste Byte ist die Anzahl der Clients
num_clients = payload[0]
# Die restlichen Bytes sind die Client-Einträge
client_data_bytes = payload[1:]
# 1 (ID) + 1 (Avail) + 1 (Used) + 6 (MAC) + 4 (Val1) + 4 (Val2)
EXPECTED_CLIENT_ENTRY_SIZE = 17
if len(client_data_bytes) != num_clients * EXPECTED_CLIENT_ENTRY_SIZE:
return UnknownMessage(0x03, payload,
f"Falsche Payload-Länge für Client-Einträge: Erwartet {
num_clients * EXPECTED_CLIENT_ENTRY_SIZE}, "
f"Got {len(client_data_bytes)} nach num_clients.")
clients_list: List[ClientEntry] = []
# Formatstring für einen Client-Eintrag:
# < : Little-Endian
# B : uint8 (client_id, is_available, is_slot_used)
# 6s: 6 Bytes (mac_address)
# I : uint32 (unoccupied_value_1, unoccupied_value_2)
CLIENT_ENTRY_FORMAT = '<BBB6sII'
for i in range(num_clients):
start_index = i * EXPECTED_CLIENT_ENTRY_SIZE
end_index = start_index + EXPECTED_CLIENT_ENTRY_SIZE
entry_bytes = client_data_bytes[start_index:end_index]
# Entpacke die Daten für einen Client-Eintrag
client_id, is_available_byte, is_slot_used_byte, mac_address, val1, val2 = \
struct.unpack(CLIENT_ENTRY_FORMAT, entry_bytes)
# Konvertiere 0/1 Bytes zu boolschen Werten
is_available = bool(is_available_byte)
is_slot_used = bool(is_slot_used_byte)
clients_list.append(ClientEntry(
client_id=client_id,
is_available=is_available,
is_slot_used=is_slot_used,
mac_address=mac_address,
last_ping=val1,
last_successfull_ping=val2
))
return ClientInfoMessage(num_clients=num_clients, clients=clients_list)
except struct.error as e:
return UnknownMessage(0x03, payload, f"Fehler beim Entpacken der ClientInfoMessage-Einträge: {e}")
except Exception as e:
return UnknownMessage(0x03, payload, f"Unerwarteter Fehler beim Parsen der ClientInfoMessage: {e}")
def parse_payload(self, message_id: int, payload: bytes) -> Union[StatusMessage, SensorDataMessage, ClientInfoMessage, UnknownMessage]:
"""
Interpretiert den gegebenen Payload basierend auf der Message ID.
Args:
message_id (int): Die ID der Nachricht.
payload (bytes): Die rohen Nutzdaten der Nachricht.
Returns:
Union[StatusMessage, SensorDataMessage, ClientInfoMessage, UnknownMessage]:
Ein dataclass-Objekt, das die dekodierten Daten repräsentiert,
oder ein UnknownMessage-Objekt bei unbekannter ID oder Parsing-Fehler.
"""
parser_func = self._parser_map.get(message_id)
if parser_func:
return parser_func(payload)
else:
return UnknownMessage(message_id, payload, "Unbekannte Message ID.")