powerpods/goTool/api_stream.go
simon eb67a46158 Add LED ring control per client and broadcast over REST and WebSocket.
Solid color mode fills all ring LEDs; master routes UART commands to slaves
via ESPNOW_LED_RING. goTool exposes POST /api/led-ring, WebSocket set_led_ring,
and a dashboard LED panel with master/slave/all targets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 19:24:55 +02:00

501 lines
13 KiB
Go

package main
import (
"context"
"encoding/json"
"errors"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"powerpod/gotool/pb"
)
// Interval settings for the external accel stream API.
const (
	// defaultAccelStreamInterval is a 16 ms stream interval. It is not
	// referenced in this part of the file; presumably callers pass it as the
	// hub's default interval (confirm against the call site).
	defaultAccelStreamInterval = 16 * time.Millisecond

	// minAPIStreamInterval is the lower bound enforced by clampAPIInterval.
	minAPIStreamInterval = time.Millisecond

	// maxAPIStreamInterval is the upper bound enforced by clampAPIInterval.
	maxAPIStreamInterval = 10 * time.Second
)
// AccelClientSample is the accelerometer reading the master has cached for a
// single slave, as serialized to external WebSocket clients.
//
// Every field except ClientID and Valid is tagged omitempty, so zero-valued
// axes and a zero age are left out of the JSON.
type AccelClientSample struct {
	ClientID uint32 `json:"client_id"`
	Valid    bool   `json:"valid"`
	X        int32  `json:"x,omitempty"`
	Y        int32  `json:"y,omitempty"`
	Z        int32  `json:"z,omitempty"`
	AgeMs    uint32 `json:"age_ms,omitempty"`
}
// AccelStreamMessage is the envelope pushed to external WebSocket clients.
//
// The same struct carries both the one-off "hello" greeting (Serial,
// IntervalMs, Commands) and the periodic "accel" frames (T, Success, Clients,
// Error). Because Success is tagged omitempty, a failed frame carries no
// "success" key at all; consumers see only the "error" string.
type AccelStreamMessage struct {
	Type       string              `json:"type"` // "hello" | "accel"
	Serial     string              `json:"serial_port,omitempty"`
	IntervalMs int                 `json:"interval_ms,omitempty"`
	Commands   []string            `json:"commands,omitempty"`
	T          int64               `json:"t,omitempty"` // Unix nanoseconds
	Success    bool                `json:"success,omitempty"`
	Clients    []AccelClientSample `json:"clients,omitempty"`
	Error      string              `json:"error,omitempty"`
}
// StreamStatusMessage answers the set_stream and get_stream commands and
// describes the stream settings of the connection that sent the command.
// It is also used for generic error replies (invalid JSON, unknown type).
type StreamStatusMessage struct {
	Type         string `json:"type"` // "stream_status"
	ReceiveAccel bool   `json:"receive_accel"`
	IntervalMs   int    `json:"interval_ms"`
	Success      bool   `json:"success"`
	Error        string `json:"error,omitempty"`
}
// AccelStreamStatusMessage answers the set_accel_stream and get_accel_stream
// commands, which address the accel stream setting of a slave by client ID.
type AccelStreamStatusMessage struct {
	Type          string `json:"type"` // "accel_stream_status"
	ClientID      uint32 `json:"client_id"`
	Enabled       bool   `json:"enabled"`
	Success       bool   `json:"success"`
	SlavesUpdated uint32 `json:"slaves_updated,omitempty"`
	Error         string `json:"error,omitempty"`
}
// accelWSCommand is the JSON command an external WebSocket client sends.
//
// Enable and IntervalMs are pointers so that a key missing from the JSON
// (nil) can be told apart from an explicit false / 0.
type accelWSCommand struct {
	Type       string `json:"type"`
	ClientID   uint32 `json:"client_id"`
	Enable     *bool  `json:"enable"`
	IntervalMs *int   `json:"interval_ms"`
}
// APIInfoResponse is the JSON document served on the API root; it describes
// the service, the serial port in use, the WebSocket path and the supported
// stream interval range in milliseconds.
type APIInfoResponse struct {
	Name              string `json:"name"`
	Version           string `json:"version"`
	SerialPort        string `json:"serial_port"`
	WebSocket         string `json:"websocket"`
	DefaultIntervalMs int    `json:"default_interval_ms"`
	MinIntervalMs     int    `json:"min_interval_ms"`
	MaxIntervalMs     int    `json:"max_interval_ms"`
	Description       string `json:"description"`
}
// wsSubscriber is the per-connection state the hub keeps for one external
// WebSocket client. The hub methods in this file read and write these fields
// while holding accelStreamHub.mu.
type wsSubscriber struct {
	conn         *websocket.Conn
	receiveAccel bool          // deliver skips subscribers where this is false
	interval     time.Duration // minimum spacing between frames sent by deliver
	lastSent     time.Time     // when deliver last attempted a send; zero means never
}
// accelStreamHub tracks the connected external WebSocket clients and fans
// accel frames out to those that asked for them.
type accelStreamHub struct {
	mu              sync.RWMutex                     // guards clients and the subscribers' fields
	clients         map[*websocket.Conn]*wsSubscriber // one entry per registered connection
	defaultInterval time.Duration                    // interval given to newly registered subscribers
	configChanged   chan struct{}                    // signals the streamer to recompute its ticker
}
// newAccelStreamHub returns an empty hub whose new subscribers start with
// defaultInterval. The change-notification channel has capacity 1 so that a
// single pending signal can be queued without blocking the sender.
func newAccelStreamHub(defaultInterval time.Duration) *accelStreamHub {
	hub := new(accelStreamHub)
	hub.clients = make(map[*websocket.Conn]*wsSubscriber)
	hub.defaultInterval = defaultInterval
	hub.configChanged = make(chan struct{}, 1)
	return hub
}
// notifyConfigChanged tells the streamer that subscriber settings changed.
// The send never blocks: if a signal is already queued the new one is dropped,
// since one pending signal is enough to trigger a recomputation.
func (h *accelStreamHub) notifyConfigChanged() {
	select {
	case h.configChanged <- struct{}{}:
		// Signal queued for the streamer.
	default:
		// Channel full: a wake-up is already pending.
	}
}
// clampAPIInterval limits d to the inclusive range
// [minAPIStreamInterval, maxAPIStreamInterval].
func clampAPIInterval(d time.Duration) time.Duration {
	switch {
	case d < minAPIStreamInterval:
		return minAPIStreamInterval
	case d > maxAPIStreamInterval:
		return maxAPIStreamInterval
	default:
		return d
	}
}
// register adds conn to the hub and greets it with a "hello" message that
// names the serial port, the hub's default interval and the supported
// commands. The returned subscriber starts with accel delivery switched off
// and the hub's default interval.
func (h *accelStreamHub) register(conn *websocket.Conn, portName string) *wsSubscriber {
	// receiveAccel is left at its zero value (false): clients opt in via set_stream.
	sub := &wsSubscriber{conn: conn, interval: h.defaultInterval}

	h.mu.Lock()
	h.clients[conn] = sub
	h.mu.Unlock()

	greeting, err := json.Marshal(AccelStreamMessage{
		Type:       "hello",
		Serial:     portName,
		IntervalMs: int(h.defaultInterval / time.Millisecond),
		Commands: []string{
			"set_stream", "get_stream", "set_accel_stream", "get_accel_stream", "set_led_ring",
		},
	})
	if err != nil {
		return sub
	}
	// Best effort: the write error is ignored here, as in the rest of the file.
	_ = conn.WriteMessage(websocket.TextMessage, greeting)
	return sub
}
// unregister drops conn from the hub and signals the streamer so it can
// recompute its polling interval without this client.
func (h *accelStreamHub) unregister(conn *websocket.Conn) {
	func() {
		h.mu.Lock()
		defer h.mu.Unlock()
		delete(h.clients, conn)
	}()
	// Notify only after the lock is released.
	h.notifyConfigChanged()
}
// anyWantsAccel reports whether at least one registered subscriber has accel
// delivery enabled.
func (h *accelStreamHub) anyWantsAccel() bool {
	h.mu.RLock()
	defer h.mu.RUnlock()

	wanted := false
	for _, sub := range h.clients {
		if sub.receiveAccel {
			wanted = true
			break
		}
	}
	return wanted
}
// minWantedInterval returns the shortest interval among subscribers that have
// accel delivery enabled, or the hub's default interval when no such value
// was found.
//
// Zero doubles as the "nothing chosen yet" marker, so a subscriber interval
// of exactly zero is treated as unset.
func (h *accelStreamHub) minWantedInterval() time.Duration {
	h.mu.RLock()
	defer h.mu.RUnlock()

	shortest := time.Duration(0)
	for _, sub := range h.clients {
		if sub.receiveAccel && (shortest == 0 || sub.interval < shortest) {
			shortest = sub.interval
		}
	}
	if shortest != 0 {
		return shortest
	}
	return h.defaultInterval
}
// setStream updates this connection's stream settings and returns the
// resulting status.
//
// enable switches accel delivery on or off for sub. intervalMs is optional:
// nil keeps the current interval, otherwise the value (milliseconds) is
// limited to the supported range. The streamer is notified afterwards so it
// can adapt its polling interval.
func (h *accelStreamHub) setStream(sub *wsSubscriber, enable bool, intervalMs *int) StreamStatusMessage {
	h.mu.Lock()
	sub.receiveAccel = enable
	if intervalMs != nil {
		// Bound the request while it is still in milliseconds. Multiplying an
		// arbitrary client-supplied int by time.Millisecond can overflow
		// int64 and wrap negative, which would turn a huge request into the
		// *minimum* interval instead of the maximum.
		const maxMs = int64(maxAPIStreamInterval / time.Millisecond)
		ms := int64(*intervalMs)
		switch {
		case ms < 0:
			ms = 0
		case ms > maxMs:
			ms = maxMs
		}
		sub.interval = clampAPIInterval(time.Duration(ms) * time.Millisecond)
	}
	ms := int(sub.interval / time.Millisecond)
	h.mu.Unlock()

	// Notify outside the lock.
	h.notifyConfigChanged()
	return StreamStatusMessage{
		Type:         "stream_status",
		ReceiveAccel: enable,
		IntervalMs:   ms,
		Success:      true,
	}
}
// getStream reports the current stream settings of sub without changing them.
func (h *accelStreamHub) getStream(sub *wsSubscriber) StreamStatusMessage {
	h.mu.RLock()
	enabled := sub.receiveAccel
	ms := int(sub.interval / time.Millisecond)
	h.mu.RUnlock()

	status := StreamStatusMessage{Type: "stream_status", Success: true}
	status.ReceiveAccel = enabled
	status.IntervalMs = ms
	return status
}
// deliver serializes msg once and sends it to every subscriber that has accel
// delivery enabled and whose own interval has elapsed since its last send.
// A connection whose write fails is removed from the hub and closed.
//
// The hub lock is held for the whole fan-out, including the network writes.
// NOTE(review): command replies elsewhere in this file write to the same
// connection from the reader goroutine without this lock; confirm that
// concurrent writes to one connection cannot occur.
func (h *accelStreamHub) deliver(msg AccelStreamMessage) {
	payload, err := json.Marshal(msg)
	if err != nil {
		return
	}
	now := time.Now()

	h.mu.Lock()
	defer h.mu.Unlock()
	for conn, sub := range h.clients {
		// A subscriber that was never sent anything is always due.
		due := sub.lastSent.IsZero() || now.Sub(sub.lastSent) >= sub.interval
		if !sub.receiveAccel || !due {
			continue
		}
		// Stamped before the write, so a failed attempt also counts.
		sub.lastSent = now
		if writeErr := conn.WriteMessage(websocket.TextMessage, payload); writeErr != nil {
			// Deleting the current key while ranging over a map is permitted in Go.
			delete(h.clients, conn)
			_ = conn.Close()
		}
	}
}
// runAccelStreamer polls the serial link for accel snapshots and hands each
// result to hub.deliver until stop fires.
//
// The poll ticker runs at hub.minWantedInterval() and is rebuilt whenever the
// hub signals configChanged. A tick is skipped when no subscriber wants accel
// data or when accelStreamPollingActive reports false. Poll errors are
// delivered as "accel" messages with Success false and an Error string.
func runAccelStreamer(link *managedSerial, hub *accelStreamHub, dash *wsHub, ctl *accelStreamCtl, stop <-chan struct{}) {
	var ticker *time.Ticker
	var tick <-chan time.Time
	resetTicker := func() {
		if ticker != nil {
			ticker.Stop()
		}
		interval := hub.minWantedInterval()
		if interval <= 0 {
			// time.NewTicker panics on a non-positive duration, and the hub's
			// default interval comes from the caller unvalidated.
			interval = defaultAccelStreamInterval
		}
		ticker = time.NewTicker(interval)
		tick = ticker.C
	}
	resetTicker()
	defer func() {
		if ticker != nil {
			ticker.Stop()
		}
	}()

	for {
		select {
		case <-stop:
			return
		case <-hub.configChanged:
			resetTicker()
		case <-tick:
			if !hub.anyWantsAccel() {
				continue
			}
			if !accelStreamPollingActive(dash, ctl) {
				continue
			}
			// Timestamp is taken before the read, so T marks the poll start.
			now := time.Now().UnixNano()
			resp, err := link.readAccelSnapshotPoll(0)
			if err != nil {
				// A busy UART is reported with a fixed string; any other
				// failure carries the error's own text.
				reason := err.Error()
				if errors.Is(err, errUARTBusy) {
					reason = "uart busy"
				}
				hub.deliver(AccelStreamMessage{
					Type:    "accel",
					T:       now,
					Success: false,
					Error:   reason,
				})
				continue
			}
			samples := resp.GetSamples()
			clients := make([]AccelClientSample, 0, len(samples))
			for _, s := range samples {
				clients = append(clients, AccelClientSample{
					ClientID: s.GetClientId(),
					Valid:    s.GetValid(),
					X:        s.GetX(),
					Y:        s.GetY(),
					Z:        s.GetZ(),
					AgeMs:    s.GetAgeMs(),
				})
			}
			hub.deliver(AccelStreamMessage{
				Type:    "accel",
				T:       now,
				Success: true,
				Clients: clients,
			})
		}
	}
}
// accelStreamPollingActive reports whether accel polling should run: true
// when ctl is non-nil and ctl.Any() holds, otherwise whatever a non-nil dash
// reports via anyAccelStreamEnabled. Both arguments may be nil.
func accelStreamPollingActive(dash *wsHub, ctl *accelStreamCtl) bool {
	switch {
	case ctl != nil && ctl.Any():
		return true
	case dash == nil:
		return false
	default:
		return dash.anyAccelStreamEnabled()
	}
}
// writeStreamStatus sends msg to conn as a JSON text frame. It is best
// effort: marshal and write failures are dropped without being reported.
func writeStreamStatus(conn *websocket.Conn, msg StreamStatusMessage) {
	if payload, err := json.Marshal(msg); err == nil {
		_ = conn.WriteMessage(websocket.TextMessage, payload)
	}
}
// writeLedRingStatus sets the message type to "led_ring_status" on out and
// sends it to conn as a JSON text frame. It is best effort: marshal and write
// failures are dropped without being reported.
func writeLedRingStatus(conn *websocket.Conn, out ledRingAPIResponse) {
	out.Type = "led_ring_status"
	if payload, err := json.Marshal(out); err == nil {
		_ = conn.WriteMessage(websocket.TextMessage, payload)
	}
}
// writeAccelStreamStatus converts out into an "accel_stream_status" message
// and sends it to conn as a JSON text frame. It is best effort: marshal and
// write failures are dropped without being reported.
func writeAccelStreamStatus(conn *websocket.Conn, out accelStreamAPIResponse) {
	var status AccelStreamStatusMessage
	status.Type = "accel_stream_status"
	status.ClientID = out.ClientID
	status.Enabled = out.Enabled
	status.Success = out.Success
	status.SlavesUpdated = out.SlavesUpdated
	status.Error = out.Error

	if payload, err := json.Marshal(status); err == nil {
		_ = conn.WriteMessage(websocket.TextMessage, payload)
	}
}
// handleAccelWSCommand decodes one JSON command received on conn and writes
// exactly one reply back to it.
//
// Supported command types:
//   - set_stream / get_stream: this connection's own accel delivery settings
//     (reply: stream_status).
//   - set_accel_stream / get_accel_stream: the accel stream setting of the
//     slave named by client_id (reply: accel_stream_status).
//   - set_led_ring: the same payload is decoded again as an LED ring request
//     (reply: led_ring_status).
//
// Malformed JSON and unknown types are answered with a stream_status error.
func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte, link *managedSerial, dash *wsHub, ctl *accelStreamCtl, hub *accelStreamHub) {
	// Small reply helpers for the two error shapes used below.
	streamErr := func(reason string) {
		writeStreamStatus(conn, StreamStatusMessage{Type: "stream_status", Error: reason})
	}
	accelErr := func(clientID uint32, reason string) {
		writeAccelStreamStatus(conn, accelStreamAPIResponse{ClientID: clientID, Error: reason})
	}

	var req accelWSCommand
	if err := json.Unmarshal(data, &req); err != nil {
		streamErr("invalid JSON")
		return
	}

	switch req.Type {
	case "set_stream":
		if req.Enable == nil {
			streamErr("enable required")
			return
		}
		writeStreamStatus(conn, hub.setStream(sub, *req.Enable, req.IntervalMs))

	case "get_stream":
		writeStreamStatus(conn, hub.getStream(sub))

	case "set_accel_stream":
		switch {
		case req.ClientID == 0:
			accelErr(0, "client_id required")
		case req.Enable == nil:
			accelErr(req.ClientID, "enable required")
		default:
			writeAccelStreamStatus(conn, applyAccelStreamClient(link, dash, ctl, req.ClientID, *req.Enable))
		}

	case "get_accel_stream":
		if req.ClientID == 0 {
			accelErr(0, "client_id required")
			return
		}
		query := &pb.AccelStreamRequest{Write: false, ClientId: req.ClientID}
		resp, err := link.AccelStreamPoll(query)
		if err != nil {
			accelErr(req.ClientID, err.Error())
			return
		}
		// Record the state reported for this client in ctl, when present.
		if ctl != nil {
			ctl.Set(req.ClientID, resp.GetEnabled())
		}
		writeAccelStreamStatus(conn, accelStreamAPIResponse{
			Enabled:  resp.GetEnabled(),
			ClientID: resp.GetClientId(),
			Success:  resp.GetSuccess(),
		})

	case "set_led_ring":
		var ring ledRingAPIRequest
		if err := json.Unmarshal(data, &ring); err != nil {
			writeLedRingStatus(conn, ledRingAPIResponse{Error: "invalid JSON"})
			return
		}
		if ring.Mode == "" {
			writeLedRingStatus(conn, ledRingAPIResponse{Error: "mode required"})
			return
		}
		writeLedRingStatus(conn, applyLedRing(link, ring))

	default:
		streamErr("unknown type (set_stream, get_stream, set_accel_stream, get_accel_stream, set_led_ring)")
	}
}
// serveExternalWS runs the read loop for one external WebSocket client. It
// registers conn with the hub, dispatches every received message to
// handleAccelWSCommand, and on the first read error closes the connection and
// unregisters it.
func serveExternalWS(conn *websocket.Conn, link *managedSerial, dash *wsHub, ctl *accelStreamCtl, portName string, hub *accelStreamHub) {
	sub := hub.register(conn, portName)
	// Deferred calls run in reverse order: Close first, then unregister.
	defer hub.unregister(conn)
	defer conn.Close()

	for {
		_, payload, err := conn.ReadMessage()
		if err != nil {
			break
		}
		handleAccelWSCommand(conn, sub, payload, link, dash, ctl, hub)
	}
}
// mountExternalAPI registers the external API routes on mux:
//
//   - "/" answers GET on "/", "/api/v1" and "/api/v1/" with an
//     APIInfoResponse; any other path is 404, any other method is 405.
//   - "/ws" upgrades the request to a WebSocket and serves it with
//     serveExternalWS until the client goes away.
func mountExternalAPI(mux *http.ServeMux, portName string, defaultInterval time.Duration, hub *accelStreamHub, link *managedSerial, dash *wsHub, ctl *accelStreamCtl) {
	// Every value in the info document is fixed at mount time, so build it once.
	info := APIInfoResponse{
		Name:              "powerpod-external-api",
		Version:           "1",
		SerialPort:        portName,
		WebSocket:         "/ws",
		DefaultIntervalMs: int(defaultInterval / time.Millisecond),
		MinIntervalMs:     int(minAPIStreamInterval / time.Millisecond),
		MaxIntervalMs:     int(maxAPIStreamInterval / time.Millisecond),
		Description:       "WebSocket: accel stream + set_led_ring (modes: clear, color, progress, digit, blink, find-me)",
	}

	serveInfo := func(w http.ResponseWriter, r *http.Request) {
		// "/" is a catch-all pattern in ServeMux, so filter the paths here.
		switch r.URL.Path {
		case "/", "/api/v1", "/api/v1/":
		default:
			http.NotFound(w, r)
			return
		}
		if r.Method != http.MethodGet {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		writeJSON(w, http.StatusOK, info)
	}

	serveWS := func(w http.ResponseWriter, r *http.Request) {
		conn, err := wsUpgrader.Upgrade(w, r, nil)
		if err != nil {
			log.Printf("api websocket upgrade: %v", err)
			return
		}
		serveExternalWS(conn, link, dash, ctl, portName, hub)
	}

	mux.HandleFunc("/", serveInfo)
	mux.HandleFunc("/ws", serveWS)
}
// runAPIServer starts the external API and returns its HTTP server so the
// caller can stop it later (see shutdownAPIServer).
//
// It creates the accel stream hub, starts the streamer goroutine (which ends
// when stop fires), mounts the info, WebSocket and LED ring routes, and
// serves them on addr in a second goroutine. Listen failures are logged, not
// returned.
func runAPIServer(portName string, link *managedSerial, addr string, defaultInterval time.Duration, dash *wsHub, ctl *accelStreamCtl, stop <-chan struct{}) *http.Server {
	hub := newAccelStreamHub(defaultInterval)
	go runAccelStreamer(link, hub, dash, ctl, stop)

	mux := http.NewServeMux()
	mountExternalAPI(mux, portName, defaultInterval, hub, link, dash, ctl)
	mountLedRingAPI(mux, link)

	srv := &http.Server{
		Addr:    addr,
		Handler: mux,
		// Bound how long a client may take to send its request headers so
		// that idle or deliberately slow connections cannot pile up. This
		// only covers the header phase; upgraded WebSocket connections are
		// hijacked from the server and are not affected.
		ReadHeaderTimeout: 5 * time.Second,
	}
	go func() {
		log.Printf("external API http://localhost%s  WebSocket ws://localhost%s/ws (default accel interval %s, per-client via set_stream)",
			addr, addr, defaultInterval.String())
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("external API server: %v", err)
		}
	}()
	return srv
}
// shutdownAPIServer gracefully stops srv, waiting at most two seconds.
// A nil srv is a no-op. The Shutdown error is deliberately discarded.
func shutdownAPIServer(srv *http.Server) {
	if srv != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		_ = srv.Shutdown(ctx)
	}
}