powerpods/goTool/dashboard.go
simon f512936d97 Add CACHE_STATUS UART poll and dashboard live stream.
Combine cached accel and tap in one low-overhead master command for ~16 ms
host polling. The dashboard uses a single live-stream toggle plus per-slave
accel-stream controls; fix live_stream state so polling is not cleared every
slow client refresh.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 20:57:54 +02:00

737 lines
17 KiB
Go

package main
import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/gorilla/websocket"
"powerpod/gotool/pb"
)
type MasterView struct {
Version uint32 `json:"version"`
GitHash string `json:"git_hash"`
RunningPartition string `json:"running_partition,omitempty"`
Deadzone uint32 `json:"deadzone,omitempty"`
Lipo1 lipoReadingJSON `json:"lipo1"`
Lipo2 lipoReadingJSON `json:"lipo2"`
BatteryAgeMs uint32 `json:"battery_age_ms,omitempty"`
OK bool `json:"ok"`
Error string `json:"error,omitempty"`
}
type ClientView struct {
ID uint32 `json:"id"`
MAC string `json:"mac"`
Version uint32 `json:"version"`
Deadzone uint32 `json:"deadzone,omitempty"`
Available bool `json:"available"`
Used bool `json:"used"`
LastPing uint32 `json:"last_ping"`
LastSuccessPing uint32 `json:"last_success_ping"`
AccelValid bool `json:"accel_valid"`
AccelX int32 `json:"accel_x"`
AccelY int32 `json:"accel_y"`
AccelZ int32 `json:"accel_z"`
AccelAgeMs uint32 `json:"accel_age_ms"`
AccelStream bool `json:"accel_stream"`
TapNotifySingle bool `json:"tap_notify_single"`
TapNotifyDouble bool `json:"tap_notify_double"`
TapNotifyTriple bool `json:"tap_notify_triple"`
LastTap string `json:"last_tap,omitempty"`
LastTapAt int64 `json:"last_tap_at,omitempty"`
Lipo1 lipoReadingJSON `json:"lipo1"`
Lipo2 lipoReadingJSON `json:"lipo2"`
BatteryAgeMs uint32 `json:"battery_age_ms,omitempty"`
}
type DashboardState struct {
UpdatedAt string `json:"updated_at"`
SerialPort string `json:"serial_port"`
UARTConnected bool `json:"uart_connected"`
SerialOK bool `json:"serial_ok"`
SerialError string `json:"serial_error,omitempty"`
/** Host: fast CACHE_STATUS poll (~16 ms) for accel + tap. */
LiveStream bool `json:"live_stream"`
Master MasterView `json:"master"`
Clients []ClientView `json:"clients"`
}
type wsHub struct {
mu sync.RWMutex
clients map[*websocket.Conn]struct{}
state DashboardState
liveStream bool
}
func newWSHub() *wsHub {
return &wsHub{clients: make(map[*websocket.Conn]struct{})}
}
func (h *wsHub) setState(st DashboardState) {
h.mu.Lock()
prev := h.state
st.LiveStream = prev.LiveStream
st.Clients = preserveClientAccel(st.Clients, prev.Clients, st.LiveStream)
st.Clients = preserveClientBattery(st.Clients, prev.Clients)
st.Clients = preserveClientTap(st.Clients, prev.Clients)
if !st.Master.Lipo1.Valid && !st.Master.Lipo2.Valid {
if prev.Master.Lipo1.Valid || prev.Master.Lipo2.Valid {
st.Master.Lipo1 = prev.Master.Lipo1
st.Master.Lipo2 = prev.Master.Lipo2
st.Master.BatteryAgeMs = prev.Master.BatteryAgeMs
}
}
h.liveStream = st.LiveStream
h.state = st
conns := make([]*websocket.Conn, 0, len(h.clients))
for c := range h.clients {
conns = append(conns, c)
}
h.mu.Unlock()
data, err := json.Marshal(st)
if err != nil {
return
}
for _, c := range conns {
_ = c.WriteMessage(websocket.TextMessage, data)
}
}
func (h *wsHub) register(c *websocket.Conn) {
h.mu.Lock()
h.clients[c] = struct{}{}
snap := h.state
h.mu.Unlock()
if data, err := json.Marshal(snap); err == nil {
_ = c.WriteMessage(websocket.TextMessage, data)
}
}
func (h *wsHub) unregister(c *websocket.Conn) {
h.mu.Lock()
delete(h.clients, c)
h.mu.Unlock()
}
func applyAccelSamples(clients []ClientView, samples []*pb.AccelSample) []ClientView {
if len(samples) == 0 {
return clients
}
byID := make(map[uint32]*pb.AccelSample, len(samples))
for _, s := range samples {
byID[s.GetClientId()] = s
}
out := make([]ClientView, len(clients))
for i, c := range clients {
out[i] = c
if !c.AccelStream {
out[i].AccelValid = false
continue
}
s, ok := byID[c.ID]
if !ok {
continue
}
out[i].AccelValid = s.GetValid()
if s.GetValid() {
out[i].AccelX = s.GetX()
out[i].AccelY = s.GetY()
out[i].AccelZ = s.GetZ()
out[i].AccelAgeMs = s.GetAgeMs()
}
}
return out
}
func preserveClientAccel(newClients, oldClients []ClientView, liveStream bool) []ClientView {
if len(oldClients) == 0 {
return newClients
}
oldByID := make(map[uint32]ClientView, len(oldClients))
for _, c := range oldClients {
oldByID[c.ID] = c
}
out := make([]ClientView, len(newClients))
for i, c := range newClients {
out[i] = c
if !liveStream && !c.AccelStream {
continue
}
if liveStream && !c.AccelStream {
out[i].AccelValid = false
out[i].AccelX = 0
out[i].AccelY = 0
out[i].AccelZ = 0
out[i].AccelAgeMs = 0
continue
}
prev, ok := oldByID[c.ID]
if !ok || !prev.AccelValid {
continue
}
if !c.AccelValid {
out[i].AccelValid = prev.AccelValid
out[i].AccelX = prev.AccelX
out[i].AccelY = prev.AccelY
out[i].AccelZ = prev.AccelZ
out[i].AccelAgeMs = prev.AccelAgeMs
}
}
return out
}
func preserveClientBattery(newClients, oldClients []ClientView) []ClientView {
if len(oldClients) == 0 {
return newClients
}
oldByID := make(map[uint32]ClientView, len(oldClients))
for _, c := range oldClients {
oldByID[c.ID] = c
}
out := make([]ClientView, len(newClients))
for i, c := range newClients {
out[i] = c
if c.Lipo1.Valid || c.Lipo2.Valid {
continue
}
prev, ok := oldByID[c.ID]
if !ok {
continue
}
if prev.Lipo1.Valid || prev.Lipo2.Valid {
out[i].Lipo1 = prev.Lipo1
out[i].Lipo2 = prev.Lipo2
out[i].BatteryAgeMs = prev.BatteryAgeMs
}
}
return out
}
func anyClientAccelStream(clients []ClientView) bool {
for _, c := range clients {
if c.AccelStream {
return true
}
}
return false
}
func anyClientTapNotify(clients []ClientView) bool {
for _, c := range clients {
if c.TapNotifySingle || c.TapNotifyDouble || c.TapNotifyTriple {
return true
}
}
return false
}
func tapKindLabelPB(k pb.TapKind) string {
switch k {
case pb.TapKind_TAP_SINGLE:
return "single"
case pb.TapKind_TAP_DOUBLE:
return "double"
case pb.TapKind_TAP_TRIPLE:
return "triple"
default:
return ""
}
}
func applyTapEvents(clients []ClientView, events []*pb.TapEvent) []ClientView {
if len(events) == 0 {
return clients
}
byID := make(map[uint32]*pb.TapEvent, len(events))
for _, e := range events {
if e.GetValid() {
byID[e.GetClientId()] = e
}
}
if len(byID) == 0 {
return clients
}
now := time.Now().UnixMilli()
out := make([]ClientView, len(clients))
for i, c := range clients {
out[i] = c
if !clientTapNotifyAny(c) {
continue
}
e, ok := byID[c.ID]
if !ok {
continue
}
out[i].LastTap = tapKindLabelPB(e.GetKind())
out[i].LastTapAt = now
}
return out
}
const clientTapDisplayMinMs = 2000
func clientTapNotifyAny(c ClientView) bool {
return c.TapNotifySingle || c.TapNotifyDouble || c.TapNotifyTriple
}
func preserveClientTap(newClients, oldClients []ClientView) []ClientView {
if len(oldClients) == 0 {
return newClients
}
oldByID := make(map[uint32]ClientView, len(oldClients))
for _, c := range oldClients {
oldByID[c.ID] = c
}
cutoff := time.Now().Add(-clientTapDisplayMinMs * time.Millisecond).UnixMilli()
out := make([]ClientView, len(newClients))
for i, c := range newClients {
out[i] = c
if c.LastTap != "" {
continue
}
prev, ok := oldByID[c.ID]
if !ok || prev.LastTap == "" || prev.LastTapAt < cutoff {
continue
}
out[i].LastTap = prev.LastTap
out[i].LastTapAt = prev.LastTapAt
}
return out
}
// patchClientAccelStream updates stream flag immediately (e.g. after REST) and pushes WS.
func (h *wsHub) patchClientAccelStream(clientID uint32, enabled bool) {
h.mu.Lock()
for i := range h.state.Clients {
if h.state.Clients[i].ID != clientID {
continue
}
h.state.Clients[i].AccelStream = enabled
if !enabled {
h.state.Clients[i].AccelValid = false
h.state.Clients[i].AccelX = 0
h.state.Clients[i].AccelY = 0
h.state.Clients[i].AccelZ = 0
h.state.Clients[i].AccelAgeMs = 0
}
break
}
st := h.state
st.UpdatedAt = time.Now().Format(time.RFC3339)
conns := make([]*websocket.Conn, 0, len(h.clients))
for c := range h.clients {
conns = append(conns, c)
}
h.mu.Unlock()
data, err := json.Marshal(st)
if err != nil {
return
}
for _, c := range conns {
_ = c.WriteMessage(websocket.TextMessage, data)
}
}
func (h *wsHub) anyAccelStreamEnabled() bool {
h.mu.RLock()
defer h.mu.RUnlock()
return anyClientAccelStream(h.state.Clients)
}
func (h *wsHub) anyTapNotifyEnabled() bool {
h.mu.RLock()
defer h.mu.RUnlock()
return anyClientTapNotify(h.state.Clients)
}
func (h *wsHub) liveStreamEnabled() bool {
h.mu.RLock()
defer h.mu.RUnlock()
return h.liveStream
}
func (h *wsHub) snapshotClients() []ClientView {
h.mu.RLock()
defer h.mu.RUnlock()
out := make([]ClientView, len(h.state.Clients))
copy(out, h.state.Clients)
return out
}
// patchLiveStream toggles host CACHE_STATUS polling (~16 ms).
func (h *wsHub) patchLiveStream(enabled bool) {
h.mu.Lock()
h.liveStream = enabled
st := h.state
st.LiveStream = enabled
if !enabled {
for i := range st.Clients {
st.Clients[i].AccelValid = false
st.Clients[i].AccelX = 0
st.Clients[i].AccelY = 0
st.Clients[i].AccelZ = 0
st.Clients[i].AccelAgeMs = 0
st.Clients[i].LastTap = ""
st.Clients[i].LastTapAt = 0
}
}
h.state = st
conns := make([]*websocket.Conn, 0, len(h.clients))
for c := range h.clients {
conns = append(conns, c)
}
h.mu.Unlock()
data, err := json.Marshal(st)
if err != nil {
return
}
for _, c := range conns {
_ = c.WriteMessage(websocket.TextMessage, data)
}
}
// patchClientTapNotify updates tap notify flags immediately (e.g. after REST) and pushes WS.
func (h *wsHub) patchClientTapNotify(clientID uint32, single, doubleTap, triple bool) {
h.mu.Lock()
for i := range h.state.Clients {
if h.state.Clients[i].ID != clientID {
continue
}
h.state.Clients[i].TapNotifySingle = single
h.state.Clients[i].TapNotifyDouble = doubleTap
h.state.Clients[i].TapNotifyTriple = triple
if !single && !doubleTap && !triple {
h.state.Clients[i].LastTap = ""
h.state.Clients[i].LastTapAt = 0
}
break
}
st := h.state
st.UpdatedAt = time.Now().Format(time.RFC3339)
conns := make([]*websocket.Conn, 0, len(h.clients))
for c := range h.clients {
conns = append(conns, c)
}
h.mu.Unlock()
data, err := json.Marshal(st)
if err != nil {
return
}
for _, c := range conns {
_ = c.WriteMessage(websocket.TextMessage, data)
}
}
// mergeAccel updates cached accel on clients and pushes state to dashboard WebSockets.
func (h *wsHub) mergeAccel(samples []*pb.AccelSample) {
if !h.liveStreamEnabled() {
return
}
h.mu.Lock()
st := h.state
st.Clients = applyAccelSamples(st.Clients, samples)
st.UpdatedAt = time.Now().Format(time.RFC3339)
h.state = st
conns := make([]*websocket.Conn, 0, len(h.clients))
for c := range h.clients {
conns = append(conns, c)
}
h.mu.Unlock()
data, err := json.Marshal(st)
if err != nil {
return
}
for _, c := range conns {
_ = c.WriteMessage(websocket.TextMessage, data)
}
}
func (h *wsHub) mergeTap(events []*pb.TapEvent) {
if len(events) == 0 || !h.liveStreamEnabled() {
return
}
h.mu.Lock()
st := h.state
st.Clients = applyTapEvents(st.Clients, events)
st.UpdatedAt = time.Now().Format(time.RFC3339)
h.state = st
conns := make([]*websocket.Conn, 0, len(h.clients))
for c := range h.clients {
conns = append(conns, c)
}
h.mu.Unlock()
data, err := json.Marshal(st)
if err != nil {
return
}
for _, c := range conns {
_ = c.WriteMessage(websocket.TextMessage, data)
}
}
func (h *wsHub) broadcastRaw(v any) {
h.mu.RLock()
conns := make([]*websocket.Conn, 0, len(h.clients))
for c := range h.clients {
conns = append(conns, c)
}
h.mu.RUnlock()
data, err := json.Marshal(v)
if err != nil {
return
}
for _, c := range conns {
_ = c.WriteMessage(websocket.TextMessage, data)
}
}
func pollDashboard(link *managedSerial, portName string, last *DashboardState, streamCtl *accelStreamCtl, tapCtl *tapNotifyCtl) DashboardState {
st := DashboardState{
UpdatedAt: time.Now().Format(time.RFC3339),
SerialPort: portName,
Clients: []ClientView{},
}
ver, err := link.getVersionPoll()
if errors.Is(err, errUARTBusy) {
return pausedPollState(portName, last)
}
if err != nil {
return disconnectedState(portName, err)
}
st.UARTConnected = true
st.SerialOK = true
st.Master = MasterView{
Version: ver.GetVersion(),
GitHash: ver.GetGitHash(),
RunningPartition: ver.GetRunningPartition(),
OK: true,
}
if dz, err := readDeadzonePoll(link, 0); err == nil {
st.Master.Deadzone = dz
}
clients, err := link.listClientsPoll()
if err != nil {
if errors.Is(err, errUARTBusy) {
return pausedPollState(portName, last)
}
st.SerialOK = false
st.SerialError = err.Error()
st.UARTConnected = link.IsConnected()
return st
}
for _, c := range clients {
cv := ClientView{
ID: c.GetId(),
MAC: formatMAC(c.GetMac()),
Version: c.GetVersion(),
Available: c.GetAvailable(),
Used: c.GetUsed(),
LastPing: c.GetLastPing(),
LastSuccessPing: c.GetLastSuccessPing(),
AccelStream: c.GetAccelStreamEnabled(),
TapNotifySingle: c.GetTapNotifySingle(),
TapNotifyDouble: c.GetTapNotifyDouble(),
TapNotifyTriple: c.GetTapNotifyTriple(),
}
st.Clients = append(st.Clients, cv)
}
applyBatteryToState(link, &st)
if last == nil || !last.LiveStream {
for i, c := range clients {
if dz, err := readDeadzonePoll(link, c.GetId()); err == nil {
st.Clients[i].Deadzone = dz
}
}
}
if last != nil {
st.LiveStream = last.LiveStream
}
if streamCtl != nil {
streamCtl.SyncFromClients(st.Clients)
}
if tapCtl != nil {
tapCtl.SyncFromClients(st.Clients)
}
return st
}
func applyBatteryToState(link *managedSerial, st *DashboardState) {
bat, err := link.BatteryStatusPoll(&pb.BatteryStatusRequest{AllClients: true})
if err != nil {
log.Printf("battery poll: %v", err)
return
}
applyBatterySamplesToState(st, batterySamplesFromPB(bat.GetSamples()))
}
func (h *wsHub) mergeBattery(samples []batterySampleJSON) {
if len(samples) == 0 {
return
}
h.mu.Lock()
st := h.state
applyBatterySamplesToState(&st, samples)
st.UpdatedAt = time.Now().Format(time.RFC3339)
h.state = st
conns := make([]*websocket.Conn, 0, len(h.clients))
for c := range h.clients {
conns = append(conns, c)
}
h.mu.Unlock()
data, err := json.Marshal(st)
if err != nil {
return
}
for _, c := range conns {
_ = c.WriteMessage(websocket.TextMessage, data)
}
}
func runBatteryPoller(link *managedSerial, hub *wsHub, interval time.Duration, stop <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-stop:
return
case <-ticker.C:
if hub.clientCount() == 0 {
continue
}
bat, err := link.BatteryStatusPoll(&pb.BatteryStatusRequest{AllClients: true})
if err != nil {
continue
}
hub.mergeBattery(batterySamplesFromPB(bat.GetSamples()))
}
}
}
func runCacheStatusDashboardPoller(link *managedSerial, hub *wsHub, interval time.Duration, stop <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-stop:
return
case <-ticker.C:
if !hub.liveStreamEnabled() {
continue
}
cache, err := link.readCacheStatusPoll()
if err != nil {
continue
}
hub.mergeAccel(cache.GetAccel())
hub.mergeTap(cache.GetTaps())
}
}
}
func (h *wsHub) clientCount() int {
h.mu.RLock()
n := len(h.clients)
h.mu.RUnlock()
return n
}
func pausedPollState(portName string, last *DashboardState) DashboardState {
if last != nil && last.UARTConnected {
st := *last
st.UpdatedAt = time.Now().Format(time.RFC3339)
st.SerialPort = portName
st.SerialOK = true
st.SerialError = "Live-Polling pausiert (OTA läuft)"
return st
}
return disconnectedState(portName, errUARTBusy)
}
func readDeadzone(link *managedSerial, clientID uint32) (uint32, error) {
r, err := link.AccelDeadzone(&pb.AccelDeadzoneRequest{
Write: false,
ClientId: clientID,
})
if err != nil {
return 0, err
}
if !r.GetSuccess() {
return 0, fmt.Errorf("deadzone read failed for client %d", clientID)
}
return r.GetDeadzone(), nil
}
func readDeadzonePoll(link *managedSerial, clientID uint32) (uint32, error) {
r, err := link.AccelDeadzonePoll(&pb.AccelDeadzoneRequest{
Write: false,
ClientId: clientID,
})
if err != nil {
return 0, err
}
if !r.GetSuccess() {
return 0, fmt.Errorf("deadzone read failed for client %d", clientID)
}
return r.GetDeadzone(), nil
}
func formatMAC(mac []byte) string {
if len(mac) == 0 {
return ""
}
return hex.EncodeToString(mac)
}
func runPoller(link *managedSerial, portName string, hub *wsHub, streamCtl *accelStreamCtl, tapCtl *tapNotifyCtl, interval time.Duration, stop <-chan struct{}) {
// streamCtl / tapCtl kept for external API; dashboard uses hub.state flags.
ticker := time.NewTicker(interval)
defer ticker.Stop()
uartUp := false
var lastGood DashboardState
publish := func() {
st := pollDashboard(link, portName, &lastGood, streamCtl, tapCtl)
hub.setState(st)
if st.UARTConnected && st.SerialOK {
hub.mu.RLock()
lastGood = hub.state
hub.mu.RUnlock()
}
if st.UARTConnected && !uartUp {
log.Printf("UART %s connected", portName)
}
uartUp = st.UARTConnected
}
publish()
for {
select {
case <-stop:
return
case <-ticker.C:
publish()
}
}
}