package main import ( "encoding/hex" "encoding/json" "errors" "fmt" "log" "sync" "time" "github.com/gorilla/websocket" "powerpod/gotool/pb" ) type MasterView struct { Version uint32 `json:"version"` GitHash string `json:"git_hash"` RunningPartition string `json:"running_partition,omitempty"` Deadzone uint32 `json:"deadzone,omitempty"` Lipo1 lipoReadingJSON `json:"lipo1"` Lipo2 lipoReadingJSON `json:"lipo2"` BatteryAgeMs uint32 `json:"battery_age_ms,omitempty"` OK bool `json:"ok"` Error string `json:"error,omitempty"` } type ClientView struct { ID uint32 `json:"id"` MAC string `json:"mac"` Version uint32 `json:"version"` Deadzone uint32 `json:"deadzone,omitempty"` Available bool `json:"available"` Used bool `json:"used"` LastPing uint32 `json:"last_ping"` LastSuccessPing uint32 `json:"last_success_ping"` AccelValid bool `json:"accel_valid"` AccelX int32 `json:"accel_x"` AccelY int32 `json:"accel_y"` AccelZ int32 `json:"accel_z"` AccelAgeMs uint32 `json:"accel_age_ms"` AccelStream bool `json:"accel_stream"` TapNotifySingle bool `json:"tap_notify_single"` TapNotifyDouble bool `json:"tap_notify_double"` TapNotifyTriple bool `json:"tap_notify_triple"` LastTap string `json:"last_tap,omitempty"` LastTapAt int64 `json:"last_tap_at,omitempty"` Lipo1 lipoReadingJSON `json:"lipo1"` Lipo2 lipoReadingJSON `json:"lipo2"` BatteryAgeMs uint32 `json:"battery_age_ms,omitempty"` } type DashboardState struct { UpdatedAt string `json:"updated_at"` SerialPort string `json:"serial_port"` UARTConnected bool `json:"uart_connected"` SerialOK bool `json:"serial_ok"` SerialError string `json:"serial_error,omitempty"` /** Host: fast CACHE_STATUS poll (~16 ms) for accel + tap. */ LiveStream bool `json:"live_stream"` Master MasterView `json:"master"` Clients []ClientView `json:"clients"` } type wsHub struct { mu sync.RWMutex clients map[*websocket.Conn]struct{} state DashboardState liveStream bool } func newWSHub() *wsHub { return &wsHub{clients: make(map[*websocket.Conn]struct{})} } func (h *wsHub) setState(st DashboardState) { h.mu.Lock() prev := h.state st.LiveStream = prev.LiveStream st.Clients = preserveClientAccel(st.Clients, prev.Clients, st.LiveStream) st.Clients = preserveClientBattery(st.Clients, prev.Clients) st.Clients = preserveClientTap(st.Clients, prev.Clients) if !st.Master.Lipo1.Valid && !st.Master.Lipo2.Valid { if prev.Master.Lipo1.Valid || prev.Master.Lipo2.Valid { st.Master.Lipo1 = prev.Master.Lipo1 st.Master.Lipo2 = prev.Master.Lipo2 st.Master.BatteryAgeMs = prev.Master.BatteryAgeMs } } h.liveStream = st.LiveStream h.state = st conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.Unlock() data, err := json.Marshal(st) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } func (h *wsHub) register(c *websocket.Conn) { h.mu.Lock() h.clients[c] = struct{}{} snap := h.state h.mu.Unlock() if data, err := json.Marshal(snap); err == nil { _ = c.WriteMessage(websocket.TextMessage, data) } } func (h *wsHub) unregister(c *websocket.Conn) { h.mu.Lock() delete(h.clients, c) h.mu.Unlock() } func applyAccelSamples(clients []ClientView, samples []*pb.AccelSample) []ClientView { if len(samples) == 0 { return clients } byID := make(map[uint32]*pb.AccelSample, len(samples)) for _, s := range samples { byID[s.GetClientId()] = s } out := make([]ClientView, len(clients)) for i, c := range clients { out[i] = c if !c.AccelStream { out[i].AccelValid = false continue } s, ok := byID[c.ID] if !ok { continue } out[i].AccelValid = s.GetValid() if s.GetValid() { out[i].AccelX = s.GetX() out[i].AccelY = s.GetY() out[i].AccelZ = s.GetZ() out[i].AccelAgeMs = s.GetAgeMs() } } return out } func preserveClientAccel(newClients, oldClients []ClientView, liveStream bool) []ClientView { if len(oldClients) == 0 { return newClients } oldByID := make(map[uint32]ClientView, len(oldClients)) for _, c := range oldClients { oldByID[c.ID] = c } out := make([]ClientView, len(newClients)) for i, c := range newClients { out[i] = c if !liveStream && !c.AccelStream { continue } if liveStream && !c.AccelStream { out[i].AccelValid = false out[i].AccelX = 0 out[i].AccelY = 0 out[i].AccelZ = 0 out[i].AccelAgeMs = 0 continue } prev, ok := oldByID[c.ID] if !ok || !prev.AccelValid { continue } if !c.AccelValid { out[i].AccelValid = prev.AccelValid out[i].AccelX = prev.AccelX out[i].AccelY = prev.AccelY out[i].AccelZ = prev.AccelZ out[i].AccelAgeMs = prev.AccelAgeMs } } return out } func preserveClientBattery(newClients, oldClients []ClientView) []ClientView { if len(oldClients) == 0 { return newClients } oldByID := make(map[uint32]ClientView, len(oldClients)) for _, c := range oldClients { oldByID[c.ID] = c } out := make([]ClientView, len(newClients)) for i, c := range newClients { out[i] = c if c.Lipo1.Valid || c.Lipo2.Valid { continue } prev, ok := oldByID[c.ID] if !ok { continue } if prev.Lipo1.Valid || prev.Lipo2.Valid { out[i].Lipo1 = prev.Lipo1 out[i].Lipo2 = prev.Lipo2 out[i].BatteryAgeMs = prev.BatteryAgeMs } } return out } func anyClientAccelStream(clients []ClientView) bool { for _, c := range clients { if c.AccelStream { return true } } return false } func anyClientTapNotify(clients []ClientView) bool { for _, c := range clients { if c.TapNotifySingle || c.TapNotifyDouble || c.TapNotifyTriple { return true } } return false } func tapKindLabelPB(k pb.TapKind) string { switch k { case pb.TapKind_TAP_SINGLE: return "single" case pb.TapKind_TAP_DOUBLE: return "double" case pb.TapKind_TAP_TRIPLE: return "triple" default: return "" } } func applyTapEvents(clients []ClientView, events []*pb.TapEvent) []ClientView { if len(events) == 0 { return clients } byID := make(map[uint32]*pb.TapEvent, len(events)) for _, e := range events { if e.GetValid() { byID[e.GetClientId()] = e } } if len(byID) == 0 { return clients } now := time.Now().UnixMilli() out := make([]ClientView, len(clients)) for i, c := range clients { out[i] = c if !clientTapNotifyAny(c) { continue } e, ok := byID[c.ID] if !ok { continue } out[i].LastTap = tapKindLabelPB(e.GetKind()) out[i].LastTapAt = now } return out } const clientTapDisplayMinMs = 2000 func clientTapNotifyAny(c ClientView) bool { return c.TapNotifySingle || c.TapNotifyDouble || c.TapNotifyTriple } func preserveClientTap(newClients, oldClients []ClientView) []ClientView { if len(oldClients) == 0 { return newClients } oldByID := make(map[uint32]ClientView, len(oldClients)) for _, c := range oldClients { oldByID[c.ID] = c } cutoff := time.Now().Add(-clientTapDisplayMinMs * time.Millisecond).UnixMilli() out := make([]ClientView, len(newClients)) for i, c := range newClients { out[i] = c if c.LastTap != "" { continue } prev, ok := oldByID[c.ID] if !ok || prev.LastTap == "" || prev.LastTapAt < cutoff { continue } out[i].LastTap = prev.LastTap out[i].LastTapAt = prev.LastTapAt } return out } // patchClientAccelStream updates stream flag immediately (e.g. after REST) and pushes WS. func (h *wsHub) patchClientAccelStream(clientID uint32, enabled bool) { h.mu.Lock() for i := range h.state.Clients { if h.state.Clients[i].ID != clientID { continue } h.state.Clients[i].AccelStream = enabled if !enabled { h.state.Clients[i].AccelValid = false h.state.Clients[i].AccelX = 0 h.state.Clients[i].AccelY = 0 h.state.Clients[i].AccelZ = 0 h.state.Clients[i].AccelAgeMs = 0 } break } st := h.state st.UpdatedAt = time.Now().Format(time.RFC3339) conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.Unlock() data, err := json.Marshal(st) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } func (h *wsHub) anyAccelStreamEnabled() bool { h.mu.RLock() defer h.mu.RUnlock() return anyClientAccelStream(h.state.Clients) } func (h *wsHub) anyTapNotifyEnabled() bool { h.mu.RLock() defer h.mu.RUnlock() return anyClientTapNotify(h.state.Clients) } func (h *wsHub) liveStreamEnabled() bool { h.mu.RLock() defer h.mu.RUnlock() return h.liveStream } func (h *wsHub) snapshotClients() []ClientView { h.mu.RLock() defer h.mu.RUnlock() out := make([]ClientView, len(h.state.Clients)) copy(out, h.state.Clients) return out } // patchLiveStream toggles host CACHE_STATUS polling (~16 ms). func (h *wsHub) patchLiveStream(enabled bool) { h.mu.Lock() h.liveStream = enabled st := h.state st.LiveStream = enabled if !enabled { for i := range st.Clients { st.Clients[i].AccelValid = false st.Clients[i].AccelX = 0 st.Clients[i].AccelY = 0 st.Clients[i].AccelZ = 0 st.Clients[i].AccelAgeMs = 0 st.Clients[i].LastTap = "" st.Clients[i].LastTapAt = 0 } } h.state = st conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.Unlock() data, err := json.Marshal(st) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } // patchClientTapNotify updates tap notify flags immediately (e.g. after REST) and pushes WS. func (h *wsHub) patchClientTapNotify(clientID uint32, single, doubleTap, triple bool) { h.mu.Lock() for i := range h.state.Clients { if h.state.Clients[i].ID != clientID { continue } h.state.Clients[i].TapNotifySingle = single h.state.Clients[i].TapNotifyDouble = doubleTap h.state.Clients[i].TapNotifyTriple = triple if !single && !doubleTap && !triple { h.state.Clients[i].LastTap = "" h.state.Clients[i].LastTapAt = 0 } break } st := h.state st.UpdatedAt = time.Now().Format(time.RFC3339) conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.Unlock() data, err := json.Marshal(st) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } // mergeAccel updates cached accel on clients and pushes state to dashboard WebSockets. func (h *wsHub) mergeAccel(samples []*pb.AccelSample) { if !h.liveStreamEnabled() { return } h.mu.Lock() st := h.state st.Clients = applyAccelSamples(st.Clients, samples) st.UpdatedAt = time.Now().Format(time.RFC3339) h.state = st conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.Unlock() data, err := json.Marshal(st) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } func (h *wsHub) mergeTap(events []*pb.TapEvent) { if len(events) == 0 || !h.liveStreamEnabled() { return } h.mu.Lock() st := h.state st.Clients = applyTapEvents(st.Clients, events) st.UpdatedAt = time.Now().Format(time.RFC3339) h.state = st conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.Unlock() data, err := json.Marshal(st) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } func (h *wsHub) broadcastRaw(v any) { h.mu.RLock() conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.RUnlock() data, err := json.Marshal(v) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } func pollDashboard(link *managedSerial, portName string, last *DashboardState, streamCtl *accelStreamCtl, tapCtl *tapNotifyCtl) DashboardState { st := DashboardState{ UpdatedAt: time.Now().Format(time.RFC3339), SerialPort: portName, Clients: []ClientView{}, } ver, err := link.getVersionPoll() if errors.Is(err, errUARTBusy) { return pausedPollState(portName, last) } if err != nil { return disconnectedState(portName, err) } st.UARTConnected = true st.SerialOK = true st.Master = MasterView{ Version: ver.GetVersion(), GitHash: ver.GetGitHash(), RunningPartition: ver.GetRunningPartition(), OK: true, } if dz, err := readDeadzonePoll(link, 0); err == nil { st.Master.Deadzone = dz } clients, err := link.listClientsPoll() if err != nil { if errors.Is(err, errUARTBusy) { return pausedPollState(portName, last) } st.SerialOK = false st.SerialError = err.Error() st.UARTConnected = link.IsConnected() return st } for _, c := range clients { cv := ClientView{ ID: c.GetId(), MAC: formatMAC(c.GetMac()), Version: c.GetVersion(), Available: c.GetAvailable(), Used: c.GetUsed(), LastPing: c.GetLastPing(), LastSuccessPing: c.GetLastSuccessPing(), AccelStream: c.GetAccelStreamEnabled(), TapNotifySingle: c.GetTapNotifySingle(), TapNotifyDouble: c.GetTapNotifyDouble(), TapNotifyTriple: c.GetTapNotifyTriple(), } st.Clients = append(st.Clients, cv) } applyBatteryToState(link, &st) if last == nil || !last.LiveStream { for i, c := range clients { if dz, err := readDeadzonePoll(link, c.GetId()); err == nil { st.Clients[i].Deadzone = dz } } } if last != nil { st.LiveStream = last.LiveStream } if streamCtl != nil { streamCtl.SyncFromClients(st.Clients) } if tapCtl != nil { tapCtl.SyncFromClients(st.Clients) } return st } func applyBatteryToState(link *managedSerial, st *DashboardState) { bat, err := link.BatteryStatusPoll(&pb.BatteryStatusRequest{AllClients: true}) if err != nil { log.Printf("battery poll: %v", err) return } applyBatterySamplesToState(st, batterySamplesFromPB(bat.GetSamples())) } func (h *wsHub) mergeBattery(samples []batterySampleJSON) { if len(samples) == 0 { return } h.mu.Lock() st := h.state applyBatterySamplesToState(&st, samples) st.UpdatedAt = time.Now().Format(time.RFC3339) h.state = st conns := make([]*websocket.Conn, 0, len(h.clients)) for c := range h.clients { conns = append(conns, c) } h.mu.Unlock() data, err := json.Marshal(st) if err != nil { return } for _, c := range conns { _ = c.WriteMessage(websocket.TextMessage, data) } } func runBatteryPoller(link *managedSerial, hub *wsHub, interval time.Duration, stop <-chan struct{}) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-stop: return case <-ticker.C: if hub.clientCount() == 0 { continue } bat, err := link.BatteryStatusPoll(&pb.BatteryStatusRequest{AllClients: true}) if err != nil { continue } hub.mergeBattery(batterySamplesFromPB(bat.GetSamples())) } } } func runCacheStatusDashboardPoller(link *managedSerial, hub *wsHub, interval time.Duration, stop <-chan struct{}) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-stop: return case <-ticker.C: if !hub.liveStreamEnabled() { continue } cache, err := link.readCacheStatusPoll() if err != nil { continue } hub.mergeAccel(accelSamplesFromCacheStatus(cache)) hub.mergeTap(tapEventsFromCacheStatus(cache)) } } } func (h *wsHub) clientCount() int { h.mu.RLock() n := len(h.clients) h.mu.RUnlock() return n } func pausedPollState(portName string, last *DashboardState) DashboardState { if last != nil && last.UARTConnected { st := *last st.UpdatedAt = time.Now().Format(time.RFC3339) st.SerialPort = portName st.SerialOK = true st.SerialError = "Live-Polling pausiert (OTA läuft)" return st } return disconnectedState(portName, errUARTBusy) } func readDeadzone(link *managedSerial, clientID uint32) (uint32, error) { r, err := link.AccelDeadzone(&pb.AccelDeadzoneRequest{ Write: false, ClientId: clientID, }) if err != nil { return 0, err } if !r.GetSuccess() { return 0, fmt.Errorf("deadzone read failed for client %d", clientID) } return r.GetDeadzone(), nil } func readDeadzonePoll(link *managedSerial, clientID uint32) (uint32, error) { r, err := link.AccelDeadzonePoll(&pb.AccelDeadzoneRequest{ Write: false, ClientId: clientID, }) if err != nil { return 0, err } if !r.GetSuccess() { return 0, fmt.Errorf("deadzone read failed for client %d", clientID) } return r.GetDeadzone(), nil } func formatMAC(mac []byte) string { if len(mac) == 0 { return "" } return hex.EncodeToString(mac) } func runPoller(link *managedSerial, portName string, hub *wsHub, streamCtl *accelStreamCtl, tapCtl *tapNotifyCtl, interval time.Duration, stop <-chan struct{}) { // streamCtl / tapCtl kept for external API; dashboard uses hub.state flags. ticker := time.NewTicker(interval) defer ticker.Stop() uartUp := false var lastGood DashboardState publish := func() { st := pollDashboard(link, portName, &lastGood, streamCtl, tapCtl) hub.setState(st) if st.UARTConnected && st.SerialOK { hub.mu.RLock() lastGood = hub.state hub.mu.RUnlock() } if st.UARTConnected && !uartUp { log.Printf("UART %s connected", portName) } uartUp = st.UARTConnected } publish() for { select { case <-stop: return case <-ticker.C: publish() } } }