package main import ( "encoding/hex" "encoding/json" "errors" "fmt" "log" "sync" "time" "github.com/gorilla/websocket" "powerpod/gotool/pb" ) type MasterView struct { Version uint32 `json:"version"` GitHash string `json:"git_hash"` RunningPartition string `json:"running_partition,omitempty"` Deadzone uint32 `json:"deadzone,omitempty"` OK bool `json:"ok"` Error string `json:"error,omitempty"` } type ClientView struct { ID uint32 `json:"id"` MAC string `json:"mac"` Version uint32 `json:"version"` Deadzone uint32 `json:"deadzone,omitempty"` Available bool `json:"available"` Used bool `json:"used"` LastPing uint32 `json:"last_ping"` LastSuccessPing uint32 `json:"last_success_ping"` AccelValid bool `json:"accel_valid"` AccelX int32 `json:"accel_x"` AccelY int32 `json:"accel_y"` AccelZ int32 `json:"accel_z"` AccelAgeMs uint32 `json:"accel_age_ms"` AccelStream bool `json:"accel_stream"` } type DashboardState struct { UpdatedAt string `json:"updated_at"` SerialPort string `json:"serial_port"` UARTConnected bool `json:"uart_connected"` SerialOK bool `json:"serial_ok"` SerialError string `json:"serial_error,omitempty"` Master MasterView `json:"master"` Clients []ClientView `json:"clients"` } type wsHub struct { mu sync.RWMutex clients map[*websocket.Conn]struct{} state DashboardState } func newWSHub() *wsHub { return &wsHub{clients: make(map[*websocket.Conn]struct{})} } func (h *wsHub) setState(st DashboardState) { h.mu.Lock() prev := h.state.Clients st.Clients = preserveClientAccel(st.Clients, prev) h.state = st conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.Unlock() data, err := json.Marshal(st) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } func (h *wsHub) register(c *websocket.Conn) { h.mu.Lock() h.clients[c] = struct{}{} snap := h.state h.mu.Unlock() if data, err := json.Marshal(snap); err == nil { _ = c.WriteMessage(websocket.TextMessage, data) } } func (h *wsHub) unregister(c *websocket.Conn) { h.mu.Lock() delete(h.clients, c) h.mu.Unlock() } func applyAccelSamples(clients []ClientView, samples []*pb.AccelSample) []ClientView { if len(samples) == 0 { return clients } byID := make(map[uint32]*pb.AccelSample, len(samples)) for _, s := range samples { byID[s.GetClientId()] = s } out := make([]ClientView, len(clients)) for i, c := range clients { out[i] = c if !c.AccelStream { out[i].AccelValid = false continue } s, ok := byID[c.ID] if !ok { continue } out[i].AccelValid = s.GetValid() if s.GetValid() { out[i].AccelX = s.GetX() out[i].AccelY = s.GetY() out[i].AccelZ = s.GetZ() out[i].AccelAgeMs = s.GetAgeMs() } } return out } func preserveClientAccel(newClients, oldClients []ClientView) []ClientView { if len(oldClients) == 0 { return newClients } oldByID := make(map[uint32]ClientView, len(oldClients)) for _, c := range oldClients { oldByID[c.ID] = c } out := make([]ClientView, len(newClients)) for i, c := range newClients { out[i] = c if !c.AccelStream { continue } prev, ok := oldByID[c.ID] if !ok || !prev.AccelValid { continue } if !c.AccelValid { out[i].AccelValid = prev.AccelValid out[i].AccelX = prev.AccelX out[i].AccelY = prev.AccelY out[i].AccelZ = prev.AccelZ out[i].AccelAgeMs = prev.AccelAgeMs } } return out } func anyClientAccelStream(clients []ClientView) bool { for _, c := range clients { if c.AccelStream { return true } } return false } // patchClientAccelStream updates stream flag immediately (e.g. after REST) and pushes WS. func (h *wsHub) patchClientAccelStream(clientID uint32, enabled bool) { h.mu.Lock() for i := range h.state.Clients { if h.state.Clients[i].ID != clientID { continue } h.state.Clients[i].AccelStream = enabled if !enabled { h.state.Clients[i].AccelValid = false h.state.Clients[i].AccelX = 0 h.state.Clients[i].AccelY = 0 h.state.Clients[i].AccelZ = 0 h.state.Clients[i].AccelAgeMs = 0 } break } st := h.state st.UpdatedAt = time.Now().Format(time.RFC3339) conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.Unlock() data, err := json.Marshal(st) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } func (h *wsHub) anyAccelStreamEnabled() bool { h.mu.RLock() defer h.mu.RUnlock() return anyClientAccelStream(h.state.Clients) } // mergeAccel updates cached accel on clients and pushes state to dashboard WebSockets. func (h *wsHub) mergeAccel(samples []*pb.AccelSample) { h.mu.Lock() st := h.state st.Clients = applyAccelSamples(st.Clients, samples) st.UpdatedAt = time.Now().Format(time.RFC3339) h.state = st conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.Unlock() data, err := json.Marshal(st) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } func (h *wsHub) broadcastRaw(v any) { h.mu.RLock() conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.RUnlock() data, err := json.Marshal(v) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } func pollDashboard(link *managedSerial, portName string, last *DashboardState, streamCtl *accelStreamCtl) DashboardState { st := DashboardState{ UpdatedAt: time.Now().Format(time.RFC3339), SerialPort: portName, Clients: []ClientView{}, } ver, err := link.getVersionPoll() if errors.Is(err, errUARTBusy) { return pausedPollState(portName, last) } if err != nil { return disconnectedState(portName, err) } st.UARTConnected = true st.SerialOK = true st.Master = MasterView{ Version: ver.GetVersion(), GitHash: ver.GetGitHash(), RunningPartition: ver.GetRunningPartition(), OK: true, } if dz, err := readDeadzonePoll(link, 0); err == nil { st.Master.Deadzone = dz } clients, err := link.listClientsPoll() if err != nil { if errors.Is(err, errUARTBusy) { return pausedPollState(portName, last) } st.SerialOK = false st.SerialError = err.Error() st.UARTConnected = link.IsConnected() return st } for _, c := range clients { cv := ClientView{ ID: c.GetId(), MAC: formatMAC(c.GetMac()), Version: c.GetVersion(), Available: c.GetAvailable(), Used: c.GetUsed(), LastPing: c.GetLastPing(), LastSuccessPing: c.GetLastSuccessPing(), AccelStream: c.GetAccelStreamEnabled(), } st.Clients = append(st.Clients, cv) } if anyClientAccelStream(st.Clients) { for i := range st.Clients { if !st.Clients[i].AccelStream { continue } if dz, err := readDeadzonePoll(link, st.Clients[i].ID); err == nil { st.Clients[i].Deadzone = dz } } if snap, err := link.readAccelSnapshotPoll(0); err == nil { st.Clients = applyAccelSamples(st.Clients, snap.GetSamples()) } } else { for i, c := range clients { if dz, err := readDeadzonePoll(link, c.GetId()); err == nil { st.Clients[i].Deadzone = dz } } } if streamCtl != nil { streamCtl.SyncFromClients(st.Clients) } return st } func runAccelDashboardPoller(link *managedSerial, hub *wsHub, interval time.Duration, stop <-chan struct{}) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-stop: return case <-ticker.C: if hub.clientCount() == 0 || !hub.anyAccelStreamEnabled() { continue } snap, err := link.readAccelSnapshotPoll(0) if err != nil { continue } hub.mergeAccel(snap.GetSamples()) } } } func (h *wsHub) clientCount() int { h.mu.RLock() n := len(h.clients) h.mu.RUnlock() return n } func pausedPollState(portName string, last *DashboardState) DashboardState { if last != nil && last.UARTConnected { st := *last st.UpdatedAt = time.Now().Format(time.RFC3339) st.SerialPort = portName st.SerialOK = true st.SerialError = "Live-Polling pausiert (OTA läuft)" return st } return disconnectedState(portName, errUARTBusy) } func readDeadzone(link *managedSerial, clientID uint32) (uint32, error) { r, err := link.AccelDeadzone(&pb.AccelDeadzoneRequest{ Write: false, ClientId: clientID, }) if err != nil { return 0, err } if !r.GetSuccess() { return 0, fmt.Errorf("deadzone read failed for client %d", clientID) } return r.GetDeadzone(), nil } func readDeadzonePoll(link *managedSerial, clientID uint32) (uint32, error) { r, err := link.AccelDeadzonePoll(&pb.AccelDeadzoneRequest{ Write: false, ClientId: clientID, }) if err != nil { return 0, err } if !r.GetSuccess() { return 0, fmt.Errorf("deadzone read failed for client %d", clientID) } return r.GetDeadzone(), nil } func formatMAC(mac []byte) string { if len(mac) == 0 { return "" } return hex.EncodeToString(mac) } func runPoller(link *managedSerial, portName string, hub *wsHub, streamCtl *accelStreamCtl, interval time.Duration, stop <-chan struct{}) { // streamCtl kept for external API; dashboard uses hub.state AccelStream flags. ticker := time.NewTicker(interval) defer ticker.Stop() uartUp := false var lastGood DashboardState publish := func() { st := pollDashboard(link, portName, &lastGood, streamCtl) if st.UARTConnected && st.SerialOK { lastGood = st } if st.UARTConnected && !uartUp { log.Printf("UART %s connected", portName) } uartUp = st.UARTConnected hub.setState(st) } publish() for { select { case <-stop: return case <-ticker.C: publish() } } }