Added many Eventbus Structs for the Frontend Handling

This commit is contained in:
simon 2026-02-08 23:49:22 +01:00
parent 7534459e73
commit f8aa19d331
5 changed files with 198 additions and 50 deletions

View File

@ -9,13 +9,20 @@ const (
CmdInitState = "init_state" CmdInitState = "init_state"
CmdConnect = "connect" CmdConnect = "connect"
CmdDisconnect = "disconnect" CmdDisconnect = "disconnect"
CmdSendMessage = "send"
CmdRX = "uart_rx" CmdRX = "uart_rx"
CmdTX = "uart_tx" CmdTX = "uart_tx"
) )
var MessageReceiveRegistry = map[string]func() any{
CmdConnect: func() any { return &WsUartConnect{} },
CmdDisconnect: func() any { return &WsUartDisconnect{} },
CmdSendMessage: func() any { return &WsUartSendMessage{} },
}
type WsMessage struct { type WsMessage struct {
Cmd string `json:"cmd"` Cmd string `json:"cmd"`
Payload any `json:"payload"` Payload []byte `json:"payload,omitempty"`
} }
type SystemState struct { type SystemState struct {
@ -28,13 +35,14 @@ type SystemState struct {
type WsUartConnect struct { type WsUartConnect struct {
SelectedAdapter string `json:"selected_adapter"` SelectedAdapter string `json:"selected_adapter"`
Baudrates string `json:"baudrates"` Baudrate int `json:"baudrate"`
} }
type WsUartDisconnect struct { type WsUartDisconnect struct {
} }
type WsUartSendMessage struct { type WsUartSendMessage struct {
MsgId byte `json:"msg_id"`
Data []byte `json:"data"` Data []byte `json:"data"`
} }

View File

@ -86,3 +86,25 @@ type PayloadOtaPayload struct {
type PayloadOtaStartEspNow struct { type PayloadOtaStartEspNow struct {
Data []byte Data []byte
} }
type ActionUartConnect struct {
Adapter string
Baudrate int
}
type ActionUartConnected struct {
Adapter string
Baudrate int
Error error
}
type ActionUartDisconnect struct {
}
type ActionUartDisconnected struct {
}
type ActionUartSendMessage struct {
MsgId byte
Data []byte
}

View File

@ -3,6 +3,7 @@ package frontend
import ( import (
"context" "context"
"embed" "embed"
"encoding/json"
"io/fs" "io/fs"
"log" "log"
"net/http" "net/http"
@ -33,16 +34,15 @@ func New(bus eventbus.EventBus) *FServer {
} }
func (fsrv *FServer) routes() { func (fsrv *FServer) routes() {
// Statische Dateien aus dem Embed-FS // Static files from the Embed-FS
// "www" Präfix entfernen, damit index.html unter / verfügbar ist // remove "www" prefix, so index.html is reachable over /
root, _ := fs.Sub(staticFiles, "www") root, _ := fs.Sub(staticFiles, "www")
fsrv.mux.Handle("/", http.FileServer(http.FS(root))) fsrv.mux.Handle("/", http.FileServer(http.FS(root)))
// WebSocket Endpunkt
fsrv.mux.HandleFunc("/ws", fsrv.handleWS) fsrv.mux.HandleFunc("/ws", fsrv.handleWS)
} }
func (fsrv *FServer) handleWS(w http.ResponseWriter, r *http.Request) { func (fs *FServer) handleWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil) conn, err := upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
log.Printf("Upgrade error: %v", err) log.Printf("Upgrade error: %v", err)
@ -50,53 +50,126 @@ func (fsrv *FServer) handleWS(w http.ResponseWriter, r *http.Request) {
} }
defer conn.Close() defer conn.Close()
// Kanäle für die Hardware-Events abonnieren
rxChan := fsrv.bus.Subscribe(api.TopicUARTRx)
txChan := fsrv.bus.Subscribe(api.TopicUARTTx)
// Context nutzen, um Goroutinen zu stoppen, wenn die Verbindung abreißt // Context nutzen, um Goroutinen zu stoppen, wenn die Verbindung abreißt
ctx, cancel := context.WithCancel(r.Context()) ctx, cancel := context.WithCancel(r.Context())
defer cancel() defer cancel()
// WRITER: Send Events to Browser // WRITER: Send Events to Browser
go func() { go fs.HandleAppEvents(ctx, conn)
for {
select {
case <-ctx.Done():
return
case f := <-rxChan:
if err := conn.WriteJSON(map[string]any{"type": "rx", "frame": f}); err != nil {
return
}
case f := <-txChan:
if err := conn.WriteJSON(map[string]any{"type": "tx", "frame": f}); err != nil {
return
}
}
}
}()
// READER: Commands from Browser // READER: Commands from Browser
for { // This Function is Blocking
var cmd api.WsMessage fs.GetFrontendEvents(ctx, conn)
if err := conn.ReadJSON(&cmd); err != nil {
log.Printf("WS Read Error: %v", err)
break
} }
fsrv.bus.Publish(api.TopicFrontendCmd, cmd) func (fs *FServer) Start(addr string) error {
log.Printf("Browser Action: %s auf with %v", cmd.Cmd, cmd.Payload)
}
}
func (fsrv *FServer) Start(addr string) error {
server := &http.Server{ server := &http.Server{
Addr: addr, Addr: addr,
Handler: fsrv.mux, Handler: fs.mux,
ReadTimeout: 5 * time.Second, ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second,
} }
ctx, cancle := context.WithCancel(context.Background())
defer cancle()
go fs.HandleFrontendEvents(ctx)
log.Printf("Frontend Server gestartet auf %s", addr) log.Printf("Frontend Server gestartet auf %s", addr)
return server.ListenAndServe() return server.ListenAndServe()
} }
func (fs *FServer) HandleAppEvents(ctx context.Context, conn *websocket.Conn) error {
// Kanäle für die Hardware-Events abonnieren
rxChan := fs.bus.Subscribe(api.TopicUARTRx)
txChan := fs.bus.Subscribe(api.TopicUARTTx)
UartActions := fs.bus.Subscribe(api.TopicUartAction)
for {
select {
case <-ctx.Done():
return nil
case f := <-rxChan:
if err := conn.WriteJSON(map[string]any{"type": "rx", "frame": f}); err != nil {
return nil
}
case f := <-txChan:
if err := conn.WriteJSON(map[string]any{"type": "tx", "frame": f}); err != nil {
return nil
}
case msgT := <-UartActions:
switch msg := msgT.(type) {
case api.ActionUartConnected:
// TODO: nicht hier die daten nachhaltig speichern damit sie ans frontend gesendet werden können
// TODO: das muss irgendwo central passieren nicht für jeden client
if msg.Error != nil {
}
continue
case api.ActionUartDisconnected:
continue
}
}
}
}
func (fs *FServer) GetFrontendEvents(ctx context.Context, conn *websocket.Conn) error {
for {
select {
case <-ctx.Done():
return nil
default:
var cmd api.WsMessage
if err := conn.ReadJSON(&cmd); err != nil {
log.Printf("WS Read Error: %v", err)
return err
}
val, ok := api.MessageReceiveRegistry[cmd.Cmd]
if !ok {
log.Printf("No Message Type mapped to %v", cmd.Cmd)
continue
}
valM := val()
err := json.Unmarshal(cmd.Payload, valM)
if err != nil {
log.Printf("Could not Unmarshal payload %v", cmd.Payload)
}
fs.bus.Publish(api.TopicFrontendCmd, valM)
log.Printf("Browser Action: %s auf with %v", cmd.Cmd, cmd.Payload)
}
}
}
func (fs *FServer) HandleFrontendEvents(ctx context.Context) error {
fChan := fs.bus.Subscribe(api.TopicFrontendCmd)
for {
select {
case <-ctx.Done():
return nil
case msg := <-fChan:
switch msgT := msg.(type) {
case api.WsUartSendMessage:
log.Printf("Sending Uart Data % X", msgT.Data)
fs.bus.Publish(api.TopicUartAction, api.ActionUartSendMessage{
MsgId: msgT.MsgId,
Data: msgT.Data,
})
continue
case api.WsUartConnect:
log.Printf("Connect with %s : %d", msgT.SelectedAdapter, msgT.Baudrate)
fs.bus.Publish(api.TopicUartAction, api.ActionUartConnect{
Adapter: msgT.SelectedAdapter,
Baudrate: msgT.Baudrate,
})
continue
case api.WsUartDisconnect:
log.Printf("Disconnect from Uart Adapter")
fs.bus.Publish(api.TopicUartAction, api.ActionUartDisconnect{})
continue
}
}
}
}

View File

@ -58,7 +58,18 @@ func StartTests(config Config) {
func StartApp(config Config) { func StartApp(config Config) {
bus := eventbus.New() bus := eventbus.New()
com, err := uart.Connect(bus, config.UartPort, config.Baudrate) com, err := uart.NewCom(bus)
ctx, cancle := context.WithCancel(context.Background())
defer cancle()
go com.EventbusHandler(ctx)
if err != nil {
log.Printf("Could not Create Com with Uart Device %v", err)
return
}
err = com.Connect(config.UartPort, config.Baudrate)
if err != nil { if err != nil {
log.Printf("Could not Connect with Uart Device %v", err) log.Printf("Could not Connect with Uart Device %v", err)
} }
@ -72,8 +83,6 @@ func StartApp(config Config) {
updateSlices := SliceUpdate(update, 200) updateSlices := SliceUpdate(update, 200)
oManager := NewOTAManager(bus, com, updateSlices) oManager := NewOTAManager(bus, com, updateSlices)
ctx, cancle := context.WithCancel(context.Background())
defer cancle()
StartMessageHandling(ctx, bus) StartMessageHandling(ctx, bus)
oManager.StartUpdateHandler(ctx) oManager.StartUpdateHandler(ctx)

View File

@ -2,6 +2,7 @@ package uart
import ( import (
"context" "context"
"fmt"
"log" "log"
"time" "time"
@ -16,15 +17,26 @@ type Com struct {
cancel context.CancelFunc cancel context.CancelFunc
} }
func Connect(bus eventbus.EventBus, portName string, baudrate int) (*Com, error) { func NewCom(bus eventbus.EventBus) (*Com, error) {
return &Com{
bus: bus,
port: nil,
cancel: nil,
}, nil
}
func (c *Com) Connect(portName string, baudrate int) error {
if c.port != nil {
return fmt.Errorf("Port already connected")
}
mode := &serial.Mode{BaudRate: baudrate} mode := &serial.Mode{BaudRate: baudrate}
port, err := serial.Open(portName, mode) port, err := serial.Open(portName, mode)
if err != nil { if err != nil {
return nil, err return err
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
drv := New(bus) drv := New(c.bus)
go func() { go func() {
buff := make([]byte, 1024) buff := make([]byte, 1024)
@ -48,11 +60,9 @@ func Connect(bus eventbus.EventBus, portName string, baudrate int) (*Com, error)
} }
}() }()
return &Com{ c.port = port
bus: bus, c.cancel = cancel
port: port, return nil
cancel: cancel,
}, nil
} }
func (c *Com) Close() { func (c *Com) Close() {
@ -100,3 +110,29 @@ func (c *Com) Send(id byte, payload []byte) error {
return err return err
} }
func (c *Com) EventbusHandler(ctx context.Context) error {
UActions := c.bus.Subscribe(api.TopicUartAction)
for {
select {
case <-ctx.Done():
return nil
case msgT := <-UActions:
switch msg := msgT.(type) {
case api.ActionUartConnect:
err := c.Connect(msg.Adapter, msg.Baudrate)
c.bus.Publish(api.TopicUartAction, api.ActionUartConnected{
Adapter: msg.Adapter,
Baudrate: msg.Baudrate,
Error: err,
})
case api.ActionUartDisconnect:
c.Close()
c.bus.Publish(api.TopicUartAction, api.ActionUartDisconnected{})
case api.ActionUartSendMessage:
c.Send(msg.MsgId, msg.Data)
}
}
}
}