Improved Tool for Testing with Threading

This commit is contained in:
simon 2025-07-23 17:36:28 +02:00
parent fad6a0aee2
commit 73d3e24786

View File

@ -1,17 +1,85 @@
import queue # Zum sicheren Datenaustausch zwischen Threads
import serial
import time
import threading
import sys
from parser import UartMessageParser, ParserError
from message_builder import MessageBuilder, MessageBuilderError, PayloadTooLargeError, BufferOverflowError
import payload_parser
from rich.console import Console
from rich.table import Table
SERIAL_PORT = "/dev/ttyUSB0"
BAUDRATE = 115200
WRITE_TIMEOUT = 0.5
READ_TIMEOUT = 1.0
WRITE_TIMEOUT = 1.5
READ_TIMEOUT = 2.0
payload_parser = payload_parser.PayloadParser()
def on_message_received_from_uart(parsed_message):
print(f"[CALLBACK] Nachricht empfangen: MSGID=0x{
parsed_message.msgid:02X}, Length={parsed_message.payload_len}")
received_message_queue.put(parsed_message)
def on_message_fail_from_uart(error_message):
print(f"[CALLBACK] Fehler beim Parsen: {error_message}")
class ParsedMessage:
def __init__(self, msgid, payload_len):
self.msgid = msgid
self.payload_len = payload_len
received_message_queue = queue.Queue()
class SerialReader(threading.Thread):
# Ändere den Konstruktor, um eine bereits geöffnete serielle Instanz zu akzeptieren
def __init__(self, ser_instance, read_timeout, parser):
super().__init__()
# Speichere die übergebene serielle Instanz
self.ser = ser_instance
self.read_timeout = read_timeout
self.parser = parser
self.running = False
self.daemon = True # Thread beendet sich mit dem Hauptprogramm
def run(self):
# Überprüfe, ob die serielle Schnittstelle wirklich offen ist, bevor du beginnst
if not self.ser or not self.ser.is_open:
print(
f"[{self.name}] Fehler: Serielle Schnittstelle ist nicht geöffnet.")
return
print(f"[{self.name}] Lese-Thread gestartet. Überwache {self.ser.port}...")
self.ser.timeout = self.read_timeout # Setze den Timeout für byteweises Lesen
self.running = True
while self.running:
try:
byte = self.ser.read(1)
if byte:
self.parser.parse_byte(byte[0])
else:
pass # Timeout, kein Byte verfügbar, Thread läuft weiter
except serial.SerialException as e:
print(f"[{self.name}] Lesefehler: {e}")
self.running = False
except Exception as e:
print(f"[{self.name}] Unerwarteter Fehler im Lese-Thread: {e}")
self.running = False
# Der Thread schließt den Port NICHT mehr, das ist Aufgabe des Hauptprogramms.
print(f"[{self.name}] Lese-Thread beendet.")
def stop(self):
self.running = False
print(f"[{self.name}] Lese-Thread wird beendet...")
def on_message_received_from_uart(message_id: int, payload: bytes, payload_length: int):
"""
Callback-Funktion, die aufgerufen wird, wenn der Parser eine vollständige,
@ -25,7 +93,26 @@ def on_message_received_from_uart(message_id: int, payload: bytes, payload_lengt
parsed_object = payload_parser.parse_payload(
message_id, payload[:payload_length])
print(parsed_object)
if message_id == 0x04:
print(parsed_object)
table = Table(title="Clients")
columns = ["ClientId", "IsAvailable",
"IsSlotUsed", "MAC", "LastPing", "LastSuccesfullPing"]
rows = []
for x in parsed_object.clients:
mac_string = ':'.join(f'{byte:02x}' for byte in x.mac_address)
rows.append([str(x.client_id), str(x.is_available), str(x.is_slot_used), mac_string,
str(x.last_ping), str(x.last_successfull_ping)])
for column in columns:
table.add_column(column)
for row in rows:
table.add_row(*row, style='bright_green')
console = Console()
console.print(table)
def on_message_fail_from_uart(message_id: int, current_message_buffer: bytes,
@ -63,49 +150,99 @@ def run_uart_test():
print(f"Serielle Schnittstelle {
SERIAL_PORT} mit Baudrate {BAUDRATE} geöffnet.")
try:
# Baue die Nachricht
message_to_send = message_builder.build_message(
0x3,
b'',
255
)
print(f"\n[MAIN] Gebaute Nachricht zum Senden: {
message_to_send.hex().upper()}")
bytes_written = ser.write(message_to_send)
print(f"[MAIN] {bytes_written} Bytes gesendet.")
except (PayloadTooLargeError, BufferOverflowError) as e:
print(f"[MAIN] Fehler beim Bauen der Nachricht: {e}")
return # Beende die Funktion, wenn die Nachricht nicht gebaut werden kann
except Exception as e:
print(
f"[MAIN] Ein unerwarteter Fehler beim Bauen der Nachricht ist aufgetreten: {e}")
return
reader_thread = SerialReader(
ser_instance=ser,
read_timeout=10,
parser=parser
)
reader_thread.start() # Starte den Lese-Thread
time.sleep(0.1)
while not reader_thread.running:
time.sleep(0.1)
received_data = ser.read_all()
print("\n--- UART Testkonsole ---")
print("Gib eine Zahl (1-10) ein, um eine Nachricht zu senden.")
print("Gib 'q' oder 'exit' ein, um das Programm zu beenden.")
if received_data:
print(f"Empfangene Daten ({len(received_data)} Bytes): {
received_data.hex().upper()}")
for byte_val in received_data:
parser.parse_byte(byte_val)
else:
print("Keine Daten empfangen.")
while True:
# Warte auf Benutzereingabe
user_input = sys.stdin.readline().strip().lower()
if user_input in ('q', 'exit'):
break
try:
choice = int(user_input)
if choice in MESSAGES:
msg_info = MESSAGES[choice]
print(f"\n[MAIN] Sende Nachricht für Option {
choice} (MSGID: 0x{msg_info['msg_id']:02X})...")
try:
message_to_send = message_builder.build_message(
msg_info["msg_id"],
msg_info["payload"],
255 # Max Payload Length
)
print(f"[MAIN] Gebaute Nachricht zum Senden: {
message_to_send.hex().upper()}")
bytes_written = ser.write(message_to_send)
print(f"[MAIN] {bytes_written} Bytes gesendet.")
except (PayloadTooLargeError, BufferOverflowError) as e:
print(f"[MAIN] Fehler beim Bauen der Nachricht: {e}")
except Exception as e:
print(
f"[MAIN] Ein unerwarteter Fehler beim Senden der Nachricht ist aufgetreten: {e}")
else:
print(
"Ungültige Option. Bitte gib eine Zahl zwischen 1 und 10 ein.")
except ValueError:
print("Ungültige Eingabe. Bitte gib eine Zahl oder 'q' ein.")
except Exception as e:
print(
f"[MAIN] Ein unerwarteter Fehler bei der Eingabeverarbeitung ist aufgetreten: {e}")
# Verarbeite empfangene Nachrichten, die sich in der Queue angesammelt haben
while not received_message_queue.empty():
msg = received_message_queue.get()
print(
f" > [MAIN-Loop] Verarbeitet: MSGID=0x{msg.msgid:02X}, Length={msg.payload_len}")
received_message_queue.task_done()
except serial.SerialException as e:
print(f"Fehler beim Zugriff auf die serielle Schnittstelle: {e}")
print(f"Stelle sicher, dass '{
SERIAL_PORT}' der korrekte Port ist und nicht von einer anderen Anwendung verwendet wird.")
except KeyboardInterrupt:
print("\n[MAIN] Test durch Benutzer abgebrochen (Ctrl+C).")
except Exception as e:
print(f"Ein unerwarteter Fehler ist aufgetreten: {e}")
print(f"Ein unerwarteter Fehler im Hauptprogramm ist aufgetreten: {e}")
finally:
if 'reader_thread' in locals() and reader_thread.is_alive():
reader_thread.stop()
reader_thread.join(timeout=5)
if reader_thread.is_alive():
print(
"[MAIN] Warnung: Lese-Thread konnte nicht sauber beendet werden.")
if ser and ser.is_open:
ser.close()
print("Serielle Schnittstelle geschlossen.")
print("[MAIN] Programm beendet.")
# Nachrichten-Mapping
MESSAGES = {
1: {"msg_id": 0x01, "payload": b"Echo Message 1"},
2: {"msg_id": 0x02, "payload": b"Version Request"},
3: {"msg_id": 0x03, "payload": b"Client Info Request"},
4: {"msg_id": 0x04, "payload": b"Custom Data 4"},
5: {"msg_id": 0x05, "payload": b"Custom Data 5"},
6: {"msg_id": 0x06, "payload": b"Custom Data 6"},
7: {"msg_id": 0x07, "payload": b"Custom Data 7"},
8: {"msg_id": 0x08, "payload": b"Custom Data 8"},
9: {"msg_id": 0x09, "payload": b"Custom Data 9"},
10: {"msg_id": 0x0A, "payload": b"Custom Data 10 - Last One!"},
}
# Führe den Test aus
if __name__ == "__main__":
run_uart_test()