esp_alox/tools/main.py
2025-07-23 16:49:17 +02:00

112 lines
3.7 KiB
Python

import serial
import time
from parser import UartMessageParser, ParserError
from message_builder import MessageBuilder, MessageBuilderError, PayloadTooLargeError, BufferOverflowError
import payload_parser
SERIAL_PORT = "/dev/ttyUSB0"
BAUDRATE = 115200
WRITE_TIMEOUT = 0.5
READ_TIMEOUT = 1.0
payload_parser = payload_parser.PayloadParser()
def on_message_received_from_uart(message_id: int, payload: bytes, payload_length: int):
"""
Callback-Funktion, die aufgerufen wird, wenn der Parser eine vollständige,
gültige Nachricht empfangen hat.
"""
print(f"\n[MAIN] Nachricht erfolgreich empfangen! ID: 0x{
message_id:02X}")
print(f"[MAIN] Payload ({payload_length} Bytes): {
payload[:payload_length].hex().upper()}")
parsed_object = payload_parser.parse_payload(
message_id, payload[:payload_length])
print(parsed_object)
def on_message_fail_from_uart(message_id: int, current_message_buffer: bytes,
current_index: int, error_type: ParserError):
"""
Callback-Funktion, die aufgerufen wird, wenn der Parser einen Fehler
beim Empfang einer Nachricht feststellt.
"""
print(f"\n[MAIN] Fehler beim Parsen der Nachricht! ID: 0x{
message_id:02X}")
print(f"[MAIN] Fehler: {error_type.name}")
print(f"[MAIN] Bisheriger Puffer ({current_index} Bytes): {
current_message_buffer[:current_index].hex().upper()}")
def run_uart_test():
"""
Führt den UART-Test durch: Sendet eine Nachricht und liest alle Antworten.
"""
ser = None
parser = UartMessageParser(
on_message_received_callback=on_message_received_from_uart,
on_message_fail_callback=on_message_fail_from_uart
)
message_builder = MessageBuilder()
try:
ser = serial.Serial(
port=SERIAL_PORT,
baudrate=BAUDRATE,
timeout=READ_TIMEOUT,
write_timeout=WRITE_TIMEOUT
)
print(f"Serielle Schnittstelle {
SERIAL_PORT} mit Baudrate {BAUDRATE} geöffnet.")
try:
# Baue die Nachricht
message_to_send = message_builder.build_message(
0x3,
b'',
255
)
print(f"\n[MAIN] Gebaute Nachricht zum Senden: {
message_to_send.hex().upper()}")
bytes_written = ser.write(message_to_send)
print(f"[MAIN] {bytes_written} Bytes gesendet.")
except (PayloadTooLargeError, BufferOverflowError) as e:
print(f"[MAIN] Fehler beim Bauen der Nachricht: {e}")
return # Beende die Funktion, wenn die Nachricht nicht gebaut werden kann
except Exception as e:
print(
f"[MAIN] Ein unerwarteter Fehler beim Bauen der Nachricht ist aufgetreten: {e}")
return
time.sleep(0.1)
received_data = ser.read_all()
if received_data:
print(f"Empfangene Daten ({len(received_data)} Bytes): {
received_data.hex().upper()}")
for byte_val in received_data:
parser.parse_byte(byte_val)
else:
print("Keine Daten empfangen.")
except serial.SerialException as e:
print(f"Fehler beim Zugriff auf die serielle Schnittstelle: {e}")
print(f"Stelle sicher, dass '{
SERIAL_PORT}' der korrekte Port ist und nicht von einer anderen Anwendung verwendet wird.")
except Exception as e:
print(f"Ein unerwarteter Fehler ist aufgetreten: {e}")
finally:
if ser and ser.is_open:
ser.close()
print("Serielle Schnittstelle geschlossen.")
# Führe den Test aus
if __name__ == "__main__":
run_uart_test()