Unify external WebSocket push into a single input stream.

Replace separate accel/tap commands and messages with set_input_stream and input pushes that combine accel and tap per client, including pre_fetch timing.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
simon 2026-05-31 13:47:39 +02:00
parent 498b89d7ba
commit 41a66d4417
2 changed files with 321 additions and 463 deletions

View File

@ -9,55 +9,61 @@ import (
"sync"
"time"
"github.com/gorilla/websocket"
"powerpod/gotool/pb"
"github.com/gorilla/websocket"
)
const (
defaultAccelStreamInterval = 16 * time.Millisecond
defaultPreFetchMs = 2
minAPIStreamInterval = 1 * time.Millisecond
maxAPIStreamInterval = 10 * time.Second
// How long tap events stay in API push/cache after first sight (matches dashboard).
apiTapDisplayMinMs = 2000
)
// AccelClientSample is one slave's cached accel on the master.
type AccelClientSample struct {
ClientID uint32 `json:"client_id"`
Valid bool `json:"valid"`
X int32 `json:"x,omitempty"`
Y int32 `json:"y,omitempty"`
Z int32 `json:"z,omitempty"`
AgeMs uint32 `json:"age_ms,omitempty"`
// InputClientSample is one slave's cached accel + tap state on the master.
type InputClientSample struct {
ClientID uint32 `json:"client_id"`
Valid bool `json:"valid"`
X int32 `json:"x,omitempty"`
Y int32 `json:"y,omitempty"`
Z int32 `json:"z,omitempty"`
AccelAgeMs uint32 `json:"accel_age_ms,omitempty"`
TapKind string `json:"tap_kind"`
TapAgeMs uint32 `json:"tap_age_ms,omitempty"`
}
// AccelStreamMessage is sent to external WebSocket clients (hello + accel samples).
type AccelStreamMessage struct {
Type string `json:"type"` // "hello" | "accel"
// InputStreamMessage is sent to external WebSocket clients (hello + input samples).
type InputStreamMessage struct {
Type string `json:"type"` // "hello" | "input"
Serial string `json:"serial_port,omitempty"`
IntervalMs int `json:"interval_ms,omitempty"`
PreFetchMs int `json:"pre_fetch_ms,omitempty"`
TapDisplayMinMs int `json:"tap_display_min_ms,omitempty"`
Commands []string `json:"commands,omitempty"`
Note string `json:"note,omitempty"`
T int64 `json:"t,omitempty"` // Unix nanoseconds
Success bool `json:"success,omitempty"`
Clients []AccelClientSample `json:"clients,omitempty"`
Clients []InputClientSample `json:"clients,omitempty"`
Error string `json:"error,omitempty"`
}
// StreamStatusMessage is the reply to set_stream / get_stream (this connection).
type StreamStatusMessage struct {
Type string `json:"type"` // "stream_status"
ReceiveAccel bool `json:"receive_accel"`
IntervalMs int `json:"interval_ms"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Type string `json:"type"` // "stream_status"
ReceiveInput bool `json:"receive_input"`
IntervalMs int `json:"interval_ms"`
PreFetch int `json:"pre_fetch"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}
// AccelStreamStatusMessage is the reply to set_accel_stream / get_accel_stream (slave).
type AccelStreamStatusMessage struct {
Type string `json:"type"` // "accel_stream_status"
// InputStreamStatusMessage is the reply to set_input_stream / get_input_stream (slave).
type InputStreamStatusMessage struct {
Type string `json:"type"` // "input_stream_status"
ClientID uint32 `json:"client_id"`
Enabled bool `json:"enabled"`
Success bool `json:"success"`
@ -65,33 +71,6 @@ type AccelStreamStatusMessage struct {
Error string `json:"error,omitempty"`
}
// TapClientEvent is one tap visible to API clients (fresh or within tap_display_min_ms).
type TapClientEvent struct {
ClientID uint32 `json:"client_id"`
Valid bool `json:"valid"`
Kind string `json:"kind,omitempty"` // single | double | triple
AgeMs uint32 `json:"age_ms,omitempty"`
ShownAtMs int64 `json:"shown_at_ms,omitempty"` // Unix ms when API first saw this tap
}
// TapStreamMessage is pushed to external WebSocket clients when receive_tap is on.
type TapStreamMessage struct {
Type string `json:"type"` // "tap"
T int64 `json:"t,omitempty"`
Success bool `json:"success,omitempty"`
Events []TapClientEvent `json:"events,omitempty"`
Error string `json:"error,omitempty"`
}
// TapStreamStatusMessage is the reply to set_tap_stream / get_tap_stream (this connection).
type TapStreamStatusMessage struct {
Type string `json:"type"` // "tap_stream_status"
ReceiveTap bool `json:"receive_tap"`
IntervalMs int `json:"interval_ms"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}
// APIClientInfo is one registered slave (or slot) from CLIENT_INFO.
type APIClientInfo struct {
ID uint32 `json:"id"`
@ -101,7 +80,7 @@ type APIClientInfo struct {
Used bool `json:"used"`
LastPing uint32 `json:"last_ping"`
LastSuccessPing uint32 `json:"last_success_ping"`
AccelStream bool `json:"accel_stream"`
InputStream bool `json:"input_stream"`
TapNotifySingle bool `json:"tap_notify_single"`
TapNotifyDouble bool `json:"tap_notify_double"`
TapNotifyTriple bool `json:"tap_notify_triple"`
@ -132,6 +111,7 @@ type accelWSCommand struct {
ClientID uint32 `json:"client_id"`
Enable *bool `json:"enable"`
IntervalMs *int `json:"interval_ms"`
PreFetch *int `json:"pre_fetch"`
Single *bool `json:"single"`
DoubleTap *bool `json:"double_tap"`
Triple *bool `json:"triple"`
@ -144,6 +124,7 @@ type APIInfoResponse struct {
SerialPort string `json:"serial_port"`
WebSocket string `json:"websocket"`
DefaultIntervalMs int `json:"default_interval_ms"`
DefaultPreFetchMs int `json:"default_pre_fetch_ms"`
MinIntervalMs int `json:"min_interval_ms"`
MaxIntervalMs int `json:"max_interval_ms"`
TapDisplayMinMs int `json:"tap_display_min_ms"`
@ -153,21 +134,28 @@ type APIInfoResponse struct {
type cachedTapEvent struct {
kind string
shownAt time.Time
ageMs uint32
}
type wsSubscriber struct {
conn *websocket.Conn
receiveAccel bool
receiveTap bool
receiveInput bool
interval time.Duration
lastAccelSent time.Time
lastTapSent time.Time
preFetch time.Duration
lastInputSent time.Time
}
type pendingInputCache struct {
cache *pb.CacheStatusResponse
readAt time.Time
readErr error
}
type accelStreamHub struct {
mu sync.RWMutex
clients map[*websocket.Conn]*wsSubscriber
defaultInterval time.Duration
defaultPreFetch time.Duration
configChanged chan struct{}
recentTaps map[uint32]cachedTapEvent
}
@ -176,6 +164,7 @@ func newAccelStreamHub(defaultInterval time.Duration) *accelStreamHub {
return &accelStreamHub{
clients: make(map[*websocket.Conn]*wsSubscriber),
defaultInterval: defaultInterval,
defaultPreFetch: defaultPreFetchMs * time.Millisecond,
configChanged: make(chan struct{}, 1),
}
}
@ -197,26 +186,39 @@ func clampAPIInterval(d time.Duration) time.Duration {
return d
}
func clampPreFetch(d time.Duration) time.Duration {
if d < 0 {
return 0
}
if d > maxAPIStreamInterval {
return maxAPIStreamInterval
}
return d
}
func (h *accelStreamHub) register(conn *websocket.Conn, portName string) *wsSubscriber {
sub := &wsSubscriber{
conn: conn,
receiveAccel: false,
receiveInput: false,
interval: h.defaultInterval,
preFetch: h.defaultPreFetch,
}
h.mu.Lock()
h.clients[conn] = sub
h.mu.Unlock()
hello := AccelStreamMessage{
hello := InputStreamMessage{
Type: "hello",
Serial: portName,
IntervalMs: int(h.defaultInterval / time.Millisecond),
PreFetchMs: int(h.defaultPreFetch / time.Millisecond),
TapDisplayMinMs: apiTapDisplayMinMs,
Note: "set_tap_notify configures slave S/D/T only; set_tap_stream enables tap polling/push",
Note: "set_tap_notify configures slave S/D/T only; set_stream enables input polling/push on this connection",
Commands: []string{
"list_clients",
"set_stream", "get_stream", "set_accel_stream", "get_accel_stream",
"set_tap_stream", "get_tap_stream", "set_tap_notify", "get_tap_notify",
"set_stream", "get_stream",
"set_input_stream", "get_input_stream",
"set_tap_notify", "get_tap_notify",
"set_led_ring", "get_battery",
},
}
@ -229,36 +231,25 @@ func (h *accelStreamHub) register(conn *websocket.Conn, portName string) *wsSubs
func (h *accelStreamHub) unregister(conn *websocket.Conn) {
h.mu.Lock()
delete(h.clients, conn)
anyTap := false
anyInput := false
for _, sub := range h.clients {
if sub.receiveTap {
anyTap = true
if sub.receiveInput {
anyInput = true
break
}
}
if !anyTap {
if !anyInput {
h.recentTaps = nil
}
h.mu.Unlock()
h.notifyConfigChanged()
}
func (h *accelStreamHub) anyWantsAccel() bool {
func (h *accelStreamHub) anyWantsInput() bool {
h.mu.RLock()
defer h.mu.RUnlock()
for _, sub := range h.clients {
if sub.receiveAccel {
return true
}
}
return false
}
func (h *accelStreamHub) anyWantsTap() bool {
h.mu.RLock()
defer h.mu.RUnlock()
for _, sub := range h.clients {
if sub.receiveTap {
if sub.receiveInput {
return true
}
}
@ -270,7 +261,7 @@ func (h *accelStreamHub) minWantedInterval() time.Duration {
defer h.mu.RUnlock()
var min time.Duration
for _, sub := range h.clients {
if !sub.receiveAccel && !sub.receiveTap {
if !sub.receiveInput {
continue
}
if min == 0 || sub.interval < min {
@ -283,20 +274,28 @@ func (h *accelStreamHub) minWantedInterval() time.Duration {
return min
}
func (h *accelStreamHub) setStream(sub *wsSubscriber, enable bool, intervalMs *int) StreamStatusMessage {
func (h *accelStreamHub) setStream(sub *wsSubscriber, enable bool, intervalMs, preFetchMs *int) StreamStatusMessage {
h.mu.Lock()
sub.receiveAccel = enable
sub.receiveInput = enable
if !enable {
h.recentTaps = nil
}
if intervalMs != nil {
sub.interval = clampAPIInterval(time.Duration(*intervalMs) * time.Millisecond)
}
if preFetchMs != nil {
sub.preFetch = clampPreFetch(time.Duration(*preFetchMs) * time.Millisecond)
}
ms := int(sub.interval / time.Millisecond)
pf := int(sub.preFetch / time.Millisecond)
h.mu.Unlock()
h.notifyConfigChanged()
return StreamStatusMessage{
Type: "stream_status",
ReceiveAccel: enable,
ReceiveInput: enable,
IntervalMs: ms,
PreFetch: pf,
Success: true,
}
}
@ -306,45 +305,47 @@ func (h *accelStreamHub) getStream(sub *wsSubscriber) StreamStatusMessage {
defer h.mu.RUnlock()
return StreamStatusMessage{
Type: "stream_status",
ReceiveAccel: sub.receiveAccel,
ReceiveInput: sub.receiveInput,
IntervalMs: int(sub.interval / time.Millisecond),
PreFetch: int(sub.preFetch / time.Millisecond),
Success: true,
}
}
func (h *accelStreamHub) setTapStream(sub *wsSubscriber, enable bool, intervalMs *int) TapStreamStatusMessage {
h.mu.Lock()
sub.receiveTap = enable
if !enable {
h.recentTaps = nil
}
if intervalMs != nil {
sub.interval = clampAPIInterval(time.Duration(*intervalMs) * time.Millisecond)
}
ms := int(sub.interval / time.Millisecond)
h.mu.Unlock()
h.notifyConfigChanged()
return TapStreamStatusMessage{
Type: "tap_stream_status",
ReceiveTap: enable,
IntervalMs: ms,
Success: true,
}
}
func (h *accelStreamHub) getTapStream(sub *wsSubscriber) TapStreamStatusMessage {
func (h *accelStreamHub) streamTiming(now time.Time) (needRead, needDeliver bool, waitPreFetch time.Duration) {
h.mu.RLock()
defer h.mu.RUnlock()
return TapStreamStatusMessage{
Type: "tap_stream_status",
ReceiveTap: sub.receiveTap,
IntervalMs: int(sub.interval / time.Millisecond),
Success: true,
for _, sub := range h.clients {
if !sub.receiveInput {
continue
}
if sub.lastInputSent.IsZero() {
needRead = true
needDeliver = true
if sub.preFetch > waitPreFetch {
waitPreFetch = sub.preFetch
}
continue
}
nextPush := sub.lastInputSent.Add(sub.interval)
readAt := nextPush.Add(-sub.preFetch)
if !now.Before(readAt) {
needRead = true
}
if !now.Before(nextPush) {
needDeliver = true
if sub.preFetch > waitPreFetch {
waitPreFetch = sub.preFetch
}
}
}
return needRead, needDeliver, waitPreFetch
}
func (h *accelStreamHub) ingestTapEvents(incoming []TapClientEvent) []TapClientEvent {
func (h *accelStreamHub) ingestTapFromCache(cache *pb.CacheStatusResponse) {
if cache == nil {
return
}
h.mu.Lock()
defer h.mu.Unlock()
@ -352,39 +353,67 @@ func (h *accelStreamHub) ingestTapEvents(incoming []TapClientEvent) []TapClientE
if h.recentTaps == nil {
h.recentTaps = make(map[uint32]cachedTapEvent)
}
for _, e := range incoming {
if !e.Valid || e.Kind == "" {
for _, c := range cache.GetClients() {
t := c.GetTap()
if t == nil {
continue
}
h.recentTaps[e.ClientID] = cachedTapEvent{kind: e.Kind, shownAt: now}
kind := tapKindLabelPB(t.GetKind())
if kind == "" {
continue
}
h.recentTaps[c.GetClientId()] = cachedTapEvent{
kind: kind,
shownAt: now,
ageMs: t.GetAgeMs(),
}
}
return h.activeTapEventsLocked(now)
h.pruneRecentTapsLocked(now)
}
func (h *accelStreamHub) activeTapEventsLocked(now time.Time) []TapClientEvent {
func (h *accelStreamHub) pruneRecentTapsLocked(now time.Time) {
if len(h.recentTaps) == 0 {
return nil
return
}
cutoff := now.Add(-apiTapDisplayMinMs * time.Millisecond)
out := make([]TapClientEvent, 0, len(h.recentTaps))
for id, ev := range h.recentTaps {
if ev.shownAt.Before(cutoff) {
delete(h.recentTaps, id)
continue
}
shownAtMs := ev.shownAt.UnixMilli()
out = append(out, TapClientEvent{
ClientID: id,
Valid: true,
Kind: ev.kind,
AgeMs: uint32(now.Sub(ev.shownAt).Milliseconds()),
ShownAtMs: shownAtMs,
})
}
}
func (h *accelStreamHub) inputClientsFromCacheLocked(cache *pb.CacheStatusResponse, now time.Time) []InputClientSample {
h.pruneRecentTapsLocked(now)
out := make([]InputClientSample, 0, len(cache.GetClients()))
for _, c := range cache.GetClients() {
sample := InputClientSample{
ClientID: c.GetClientId(),
TapKind: "none",
}
if a := c.GetAccel(); a != nil {
sample.Valid = a.GetValid()
if a.GetValid() {
sample.X = a.GetX()
sample.Y = a.GetY()
sample.Z = a.GetZ()
sample.AccelAgeMs = a.GetAgeMs()
}
}
if ev, ok := h.recentTaps[c.GetClientId()]; ok {
sample.TapKind = ev.kind
if t := c.GetTap(); t != nil && tapKindLabelPB(t.GetKind()) == ev.kind {
sample.TapAgeMs = t.GetAgeMs()
} else {
sample.TapAgeMs = ev.ageMs + uint32(now.Sub(ev.shownAt).Milliseconds())
}
}
out = append(out, sample)
}
return out
}
func (h *accelStreamHub) deliver(msg AccelStreamMessage) {
func (h *accelStreamHub) deliverInput(msg InputStreamMessage) {
data, err := json.Marshal(msg)
if err != nil {
return
@ -394,13 +423,13 @@ func (h *accelStreamHub) deliver(msg AccelStreamMessage) {
h.mu.Lock()
defer h.mu.Unlock()
for conn, sub := range h.clients {
if !sub.receiveAccel {
if !sub.receiveInput {
continue
}
if !sub.lastAccelSent.IsZero() && now.Sub(sub.lastAccelSent) < sub.interval {
if !sub.lastInputSent.IsZero() && now.Sub(sub.lastInputSent) < sub.interval {
continue
}
sub.lastAccelSent = now
sub.lastInputSent = now
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
delete(h.clients, conn)
_ = conn.Close()
@ -408,155 +437,79 @@ func (h *accelStreamHub) deliver(msg AccelStreamMessage) {
}
}
func (h *accelStreamHub) deliverTap(msg TapStreamMessage) {
data, err := json.Marshal(msg)
if err != nil {
return
}
func runInputStreamer(link *managedSerial, hub *accelStreamHub, dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl, stop <-chan struct{}) {
ticker := time.NewTicker(minAPIStreamInterval)
defer ticker.Stop()
now := time.Now()
h.mu.Lock()
defer h.mu.Unlock()
for conn, sub := range h.clients {
if !sub.receiveTap {
continue
}
if !sub.lastTapSent.IsZero() && now.Sub(sub.lastTapSent) < sub.interval {
continue
}
sub.lastTapSent = now
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
delete(h.clients, conn)
_ = conn.Close()
}
}
}
func runAccelStreamer(link *managedSerial, hub *accelStreamHub, dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl, stop <-chan struct{}) {
var ticker *time.Ticker
var tick <-chan time.Time
resetTicker := func() {
if ticker != nil {
ticker.Stop()
}
interval := hub.minWantedInterval()
ticker = time.NewTicker(interval)
tick = ticker.C
}
resetTicker()
defer func() {
if ticker != nil {
ticker.Stop()
}
}()
var pending *pendingInputCache
for {
select {
case <-stop:
return
case <-hub.configChanged:
resetTicker()
case <-tick:
wantAccel := hub.anyWantsAccel() && accelStreamPollingActive(dash, ctl)
wantTap := hub.anyWantsTap()
if !wantAccel && !wantTap {
pending = nil
case now := <-ticker.C:
if !hub.anyWantsInput() || !inputPollingActive(dash, ctl, tapCtl) {
pending = nil
continue
}
now := time.Now().UnixNano()
cache, err := link.readCacheStatusPoll()
if errors.Is(err, errUARTBusy) {
if wantAccel {
hub.deliver(AccelStreamMessage{
Type: "accel",
T: now,
Success: false,
Error: "uart busy",
})
needRead, needDeliver, waitPreFetch := hub.streamTiming(now)
if needRead && pending == nil {
cache, err := link.readCacheStatusPoll()
if err != nil {
pending = &pendingInputCache{readErr: err, readAt: now}
} else {
hub.ingestTapFromCache(cache)
pending = &pendingInputCache{cache: cache, readAt: now}
}
if wantTap {
hub.deliverTap(TapStreamMessage{
Type: "tap",
T: now,
Success: false,
Error: "uart busy",
})
}
continue
}
if err != nil {
if wantAccel {
hub.deliver(AccelStreamMessage{
Type: "accel",
T: now,
Success: false,
Error: err.Error(),
})
}
if wantTap {
hub.deliverTap(TapStreamMessage{
Type: "tap",
T: now,
Success: false,
Error: err.Error(),
})
}
if !needDeliver || pending == nil {
continue
}
if wantAccel {
samples := accelSamplesFromCacheStatus(cache)
clients := make([]AccelClientSample, 0, len(samples))
for _, s := range samples {
clients = append(clients, AccelClientSample{
ClientID: s.GetClientId(),
Valid: s.GetValid(),
X: s.GetX(),
Y: s.GetY(),
Z: s.GetZ(),
AgeMs: s.GetAgeMs(),
})
ts := now.UnixNano()
if pending.readErr != nil {
errMsg := pending.readErr.Error()
if errors.Is(pending.readErr, errUARTBusy) {
errMsg = "uart busy"
}
hub.deliver(AccelStreamMessage{
Type: "accel",
T: now,
hub.deliverInput(InputStreamMessage{
Type: "input",
T: ts,
Success: false,
Error: errMsg,
})
pending = nil
continue
}
if pending.cache != nil && now.Sub(pending.readAt) >= waitPreFetch {
hub.mu.RLock()
clients := hub.inputClientsFromCacheLocked(pending.cache, now)
hub.mu.RUnlock()
hub.deliverInput(InputStreamMessage{
Type: "input",
T: ts,
Success: true,
Clients: clients,
})
}
if wantTap {
events := tapEventsFromCacheStatus(cache)
fresh := make([]TapClientEvent, 0, len(events))
for _, e := range events {
if !e.GetValid() {
continue
}
fresh = append(fresh, TapClientEvent{
ClientID: e.GetClientId(),
Valid: true,
Kind: tapKindLabelPB(e.GetKind()),
AgeMs: e.GetAgeMs(),
})
}
visible := hub.ingestTapEvents(fresh)
if len(visible) > 0 {
hub.deliverTap(TapStreamMessage{
Type: "tap",
T: now,
Success: true,
Events: visible,
})
}
pending = nil
}
}
}
}
func accelStreamPollingActive(dash *wsHub, ctl *accelStreamCtl) bool {
func inputPollingActive(dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl) bool {
if ctl != nil && ctl.Any() {
return true
}
if tapCtl != nil && tapCtl.Any() {
return true
}
return dash != nil && dash.anyAccelStreamEnabled()
}
@ -586,9 +539,9 @@ func writeLedRingStatus(conn *websocket.Conn, out ledRingAPIResponse) {
_ = conn.WriteMessage(websocket.TextMessage, data)
}
func writeAccelStreamStatus(conn *websocket.Conn, out accelStreamAPIResponse) {
msg := AccelStreamStatusMessage{
Type: "accel_stream_status",
func writeInputStreamStatus(conn *websocket.Conn, out accelStreamAPIResponse) {
msg := InputStreamStatusMessage{
Type: "input_stream_status",
ClientID: out.ClientID,
Enabled: out.Enabled,
Success: out.Success,
@ -602,14 +555,6 @@ func writeAccelStreamStatus(conn *websocket.Conn, out accelStreamAPIResponse) {
_ = conn.WriteMessage(websocket.TextMessage, data)
}
func writeTapStreamStatus(conn *websocket.Conn, msg TapStreamStatusMessage) {
data, err := json.Marshal(msg)
if err != nil {
return
}
_ = conn.WriteMessage(websocket.TextMessage, data)
}
func clientInfoToAPI(c *pb.ClientInfo) APIClientInfo {
return APIClientInfo{
ID: c.GetId(),
@ -619,7 +564,7 @@ func clientInfoToAPI(c *pb.ClientInfo) APIClientInfo {
Used: c.GetUsed(),
LastPing: c.GetLastPing(),
LastSuccessPing: c.GetLastSuccessPing(),
AccelStream: c.GetAccelStreamEnabled(),
InputStream: c.GetAccelStreamEnabled(),
TapNotifySingle: c.GetTapNotifySingle(),
TapNotifyDouble: c.GetTapNotifyDouble(),
TapNotifyTriple: c.GetTapNotifyTriple(),
@ -717,28 +662,28 @@ func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte,
})
return
}
writeStreamStatus(conn, hub.setStream(sub, *cmd.Enable, cmd.IntervalMs))
writeStreamStatus(conn, hub.setStream(sub, *cmd.Enable, cmd.IntervalMs, cmd.PreFetch))
case "get_stream":
writeStreamStatus(conn, hub.getStream(sub))
case "set_accel_stream":
case "set_input_stream":
if cmd.ClientID == 0 {
writeAccelStreamStatus(conn, accelStreamAPIResponse{Error: "client_id required"})
writeInputStreamStatus(conn, accelStreamAPIResponse{Error: "client_id required"})
return
}
if cmd.Enable == nil {
writeAccelStreamStatus(conn, accelStreamAPIResponse{
writeInputStreamStatus(conn, accelStreamAPIResponse{
ClientID: cmd.ClientID,
Error: "enable required",
})
return
}
writeAccelStreamStatus(conn, applyAccelStreamClient(link, dash, ctl, cmd.ClientID, *cmd.Enable))
writeInputStreamStatus(conn, applyAccelStreamClient(link, dash, ctl, cmd.ClientID, *cmd.Enable))
case "get_accel_stream":
case "get_input_stream":
if cmd.ClientID == 0 {
writeAccelStreamStatus(conn, accelStreamAPIResponse{Error: "client_id required"})
writeInputStreamStatus(conn, accelStreamAPIResponse{Error: "client_id required"})
return
}
resp, err := link.AccelStreamPoll(&pb.AccelStreamRequest{
@ -746,7 +691,7 @@ func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte,
ClientId: cmd.ClientID,
})
if err != nil {
writeAccelStreamStatus(conn, accelStreamAPIResponse{
writeInputStreamStatus(conn, accelStreamAPIResponse{
ClientID: cmd.ClientID,
Error: err.Error(),
})
@ -755,25 +700,12 @@ func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte,
if ctl != nil {
ctl.Set(cmd.ClientID, resp.GetEnabled())
}
writeAccelStreamStatus(conn, accelStreamAPIResponse{
writeInputStreamStatus(conn, accelStreamAPIResponse{
Enabled: resp.GetEnabled(),
ClientID: resp.GetClientId(),
Success: resp.GetSuccess(),
})
case "set_tap_stream":
if cmd.Enable == nil {
writeTapStreamStatus(conn, TapStreamStatusMessage{
Type: "tap_stream_status",
Error: "enable required",
})
return
}
writeTapStreamStatus(conn, hub.setTapStream(sub, *cmd.Enable, cmd.IntervalMs))
case "get_tap_stream":
writeTapStreamStatus(conn, hub.getTapStream(sub))
case "set_tap_notify":
if cmd.AllClients {
if cmd.Single == nil || cmd.DoubleTap == nil || cmd.Triple == nil {
@ -827,11 +759,11 @@ func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte,
tapCtl.Set(cmd.ClientID, resp.GetSingle(), resp.GetDoubleTap(), resp.GetTriple())
}
writeTapNotifyStatus(conn, tapNotifyAPIResponse{
ClientID: cmd.ClientID,
Success: resp.GetSuccess(),
Single: resp.GetSingle(),
ClientID: cmd.ClientID,
Success: resp.GetSuccess(),
Single: resp.GetSingle(),
DoubleTap: resp.GetDoubleTap(),
Triple: resp.GetTriple(),
Triple: resp.GetTriple(),
})
case "set_led_ring":
@ -860,7 +792,7 @@ func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte,
default:
writeStreamStatus(conn, StreamStatusMessage{
Type: "stream_status",
Error: "unknown type (list_clients, set_stream, get_stream, set_accel_stream, get_accel_stream, set_tap_stream, get_tap_stream, set_tap_notify, get_tap_notify, set_led_ring, get_battery)",
Error: "unknown type (list_clients, set_stream, get_stream, set_input_stream, get_input_stream, set_tap_notify, get_tap_notify, set_led_ring, get_battery)",
})
}
}
@ -896,10 +828,11 @@ func mountExternalAPI(mux *http.ServeMux, portName string, defaultInterval time.
SerialPort: portName,
WebSocket: "/ws",
DefaultIntervalMs: defMs,
DefaultPreFetchMs: defaultPreFetchMs,
MinIntervalMs: int(minAPIStreamInterval / time.Millisecond),
MaxIntervalMs: int(maxAPIStreamInterval / time.Millisecond),
TapDisplayMinMs: apiTapDisplayMinMs,
Description: "WebSocket: set_accel_stream + set_stream for accel; set_tap_notify (slave S/D/T) then set_tap_stream for tap events (shown ≥2s)",
Description: "WebSocket: set_input_stream + set_stream for input (accel + tap); set_tap_notify configures slave tap kinds",
})
})
@ -915,7 +848,7 @@ func mountExternalAPI(mux *http.ServeMux, portName string, defaultInterval time.
func runAPIServer(portName string, link *managedSerial, addr string, defaultInterval time.Duration, dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl, stop <-chan struct{}) *http.Server {
hub := newAccelStreamHub(defaultInterval)
go runAccelStreamer(link, hub, dash, ctl, tapCtl, stop)
go runInputStreamer(link, hub, dash, ctl, tapCtl, stop)
mux := http.NewServeMux()
mountExternalAPI(mux, portName, defaultInterval, hub, link, dash, ctl, tapCtl)
@ -924,7 +857,7 @@ func runAPIServer(portName string, link *managedSerial, addr string, defaultInte
srv := &http.Server{Addr: addr, Handler: mux}
go func() {
log.Printf("external API http://localhost%s WebSocket ws://localhost%s/ws (default stream interval %s, per-client via set_stream / set_tap_stream)",
log.Printf("external API http://localhost%s WebSocket ws://localhost%s/ws (default stream interval %s, per-client via set_stream)",
addr, addr, defaultInterval.String())
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Printf("external API server: %v", err)

View File

@ -1,15 +1,10 @@
# WebSocket API
`go run . -port /dev/ttyUSB0 serve` exposes two WebSocket endpoints. They share the same UART link but serve different purposes.
`go run . -port /dev/ttyUSB0 serve` exposes the WebSocket enpoint
| URL | Port (default) | Role |
|-----|----------------|------|
| `ws://localhost:8080/ws` | Dashboard (`-addr`) | Server → client only: full `DashboardState` JSON (~2 s poll + live-stream accel/tap) |
| `ws://localhost:8081/ws` | External API (`-api-addr`) | Request/response commands + optional **accel** / **tap** push streams |
Disable the external server with `-api-addr ""`.
CLI overview and UART commands: [`../README.md`](../README.md). HTTP endpoints: [`API_REST.md`](API_REST.md).
| `ws://localhost:8081/ws` | External API (`-api-addr`) | Request/response commands + optional **input** push stream |
---
@ -19,44 +14,49 @@ CLI overview and UART commands: [`../README.md`](../README.md). HTTP endpoints:
1. Connect → server sends **`hello`** (receive off; lists available commands).
2. Send JSON commands → server replies with a matching `*_status` or `client_list` message (one reply per command).
3. After `set_stream` / `set_tap_stream` with `enable: true`, the server may send **`accel`** and/or **`tap`** messages **without** a prior command (push stream).
3. After `set_stream` with `enable: true`, the server may send **`input`** messages **without** a prior command (push stream).
Commands and stream pushes are multiplexed on one socket. While streaming, always parse `type` and branch (status vs sample vs error).
### Two layers (accel and tap)
### Two layers (firmware vs host)
| Layer | Commands | Effect |
|-------|----------|--------|
| **Firmware (ESP-NOW)** | `set_accel_stream`, `set_tap_notify` | Per `client_id`: slave sends accel or tap kinds to the master |
| **This connection (host)** | `set_stream`, `set_tap_stream` | Whether **you** receive push JSON and at what rate (`interval_ms`, 1 ms … 10 s) |
| **Firmware (ESP-NOW)** | `set_input_stream`, `set_tap_notify` | Per `client_id`: slave sends accel samples and/or tap events to the master |
| **This connection (host)** | `set_stream` | Whether **you** receive push JSON, at what rate (`interval_ms`, 1 ms … 10 s), and how early the UART read starts (`pre_fetch`) |
- **Accel UART polling** runs only if at least one connection has `receive_accel: true` **and** at least one slave streams accel (`set_accel_stream` or dashboard).
- **Tap UART polling** runs only if at least one connection has `receive_tap: true` (`set_tap_stream`). `set_tap_notify` alone does **not** poll.
- **UART polling** runs only if at least one connection has `receive_input: true` (`set_stream`) **and** at least one slave streams input (`set_input_stream`) or has tap notify enabled (`set_tap_notify`).
- **`set_tap_notify` alone** configures which tap kinds the slave reports; it does **not** enable host push by itself — you still need `set_stream`.
Typical sequence:
1. `list_clients` → slave IDs
2. Per slave: `set_accel_stream` / `set_tap_notify` as needed
3. `set_stream` and/or `set_tap_stream` with `"enable": true`
4. Read push messages in a loop
2. Per slave: `set_input_stream` and/or `set_tap_notify` as needed
3. `set_stream` with `"enable": true`
4. Read **`input`** messages in a loop
There is **no per-slave filter** on push messages: each `accel` contains all cached slaves; each `tap` contains all visible events. Filter by `client_id` in your app.
There is **no per-slave filter** on push messages: each `input` contains all cached slaves. Filter by `client_id` in your app.
---
## Push stream messages
These are the samples you get after enabling receive. Interval is per WebSocket connection; the server UART poll uses the **minimum** `interval_ms` among all subscribers that want accel or tap.
These are the samples you get after enabling receive. Timing is per WebSocket connection:
### `accel` (type `"accel"`)
- **`interval_ms`** — minimum time between consecutive `input` pushes on this socket.
- **`pre_fetch`** — milliseconds **before** each scheduled push when the host sends the UART cache read, so the master has time to collect data from all slaves before the JSON goes out.
Sent only when `set_stream` has `enable: true`, a slave streams accel, and the poll tick fires for this connection.
The server UART poll uses the **minimum** `interval_ms` among all subscribers with `receive_input: true`.
**Success** — all slaves with a cache entry on the master (not only those with `valid: true`):
### `input` (type `"input"`)
Sent when `set_stream` has `enable: true` and the poll tick fires for this connection (after the UART read started `pre_fetch` ms earlier). Each message combines the latest accel cache and visible tap state for every slave slot on the master.
**Success** — all slaves with a cache entry (not only those with `valid: true`):
```json
{
"type": "accel",
"type": "input",
"t": 1716900123456789012,
"success": true,
"clients": [
@ -66,11 +66,14 @@ Sent only when `set_stream` has `enable: true`, a slave streams accel, and the p
"x": 12,
"y": -34,
"z": 16384,
"age_ms": 8
"accel_age_ms": 8,
"tap_kind": "single",
"tap_age_ms": 3
},
{
"client_id": 42,
"valid": false
"valid": false,
"tap_kind": "none"
}
]
}
@ -82,15 +85,19 @@ Sent only when `set_stream` has `enable: true`, a slave streams accel, and the p
| `success` | `true` if `CACHE_STATUS` succeeded |
| `clients[]` | One entry per slave slot in the master cache |
| `client_id` | ESP-NOW client id (same as `list_clients`) |
| `valid` | `false` if no sample yet or stale; omit `x`/`y`/`z` when false |
| `valid` | `false` if no accel sample yet or stale; omit `x`/`y`/`z` when false |
| `x`, `y`, `z` | Raw accelerometer LSB (BMA456, ±2 g scale on the pod) |
| `age_ms` | Milliseconds since the master received this sample |
| `accel_age_ms` | Milliseconds since the master received this accel sample |
| `tap_kind` | `"none"`, `"single"`, `"double"`, or `"triple"` |
| `tap_age_ms` | Milliseconds since the tap was seen in the master cache; omit when `tap_kind` is `"none"` |
Tap events stay visible for **`tap_display_min_ms`** (2000 ms, also in `hello`) after the API first saw them, even if the hardware age grows.
**Failure** (e.g. UART busy):
```json
{
"type": "accel",
"type": "input",
"t": 1716900123456789012,
"success": false,
"error": "uart busy"
@ -99,53 +106,6 @@ Sent only when `set_stream` has `enable: true`, a slave streams accel, and the p
No `clients` array on failure.
### `tap` (type `"tap"`)
Sent only when `set_tap_stream` has `enable: true` and there is at least one event to show.
Events appear when the master cache reports a new tap. Each event stays in push payloads for **`tap_display_min_ms`** (2000 ms, also in `hello`) after the API first saw it, even if the hardware age grows.
**Success**:
```json
{
"type": "tap",
"t": 1716900123456789012,
"success": true,
"events": [
{
"client_id": 16,
"valid": true,
"kind": "single",
"age_ms": 3,
"shown_at_ms": 1717000000123
}
]
}
```
| Field | Meaning |
|-------|---------|
| `t` | Unix timestamp in **nanoseconds** (poll time) |
| `events[]` | All taps currently “on screen” for the API |
| `client_id` | Slave that tapped |
| `kind` | `"single"`, `"double"`, or `"triple"` |
| `age_ms` | Age in the master cache when read |
| `shown_at_ms` | Unix **milliseconds** when this host first included the event |
If no events are visible, **no** `tap` message is sent on that tick (unlike accel, which can send empty `clients` only on success with cache data).
**Failure**:
```json
{
"type": "tap",
"t": 1716900123456789012,
"success": false,
"error": "uart busy"
}
```
---
## Commands (request → response)
@ -159,13 +119,13 @@ Send one JSON object per message. Field `type` selects the command.
"type": "hello",
"serial_port": "/dev/ttyUSB0",
"interval_ms": 16,
"pre_fetch_ms": 2,
"tap_display_min_ms": 2000,
"note": "set_tap_notify configures slave S/D/T only; set_tap_stream enables tap polling/push",
"note": "set_tap_notify configures slave S/D/T only; set_stream enables input polling/push on this connection",
"commands": [
"list_clients",
"set_stream", "get_stream",
"set_accel_stream", "get_accel_stream",
"set_tap_stream", "get_tap_stream",
"set_input_stream", "get_input_stream",
"set_tap_notify", "get_tap_notify",
"set_led_ring", "get_battery"
]
@ -191,7 +151,7 @@ Response `client_list`:
"used": true,
"last_ping": 1234,
"last_success_ping": 1200,
"accel_stream": false,
"input_stream": false,
"tap_notify_single": false,
"tap_notify_double": false,
"tap_notify_triple": false
@ -200,45 +160,38 @@ Response `client_list`:
}
```
### `set_stream` / `get_stream` (receive accel on this connection)
### `set_stream` / `get_stream` (receive input on this connection)
```json
{"type":"set_stream","enable":true,"interval_ms":32}
{"type":"set_stream","enable":true,"interval_ms":32,"pre_fetch":2}
{"type":"get_stream"}
```
| Field | Meaning |
|-------|---------|
| `enable` | Turn push stream on/off for this connection |
| `interval_ms` | Minimum time between `input` pushes (1 … 10000) |
| `pre_fetch` | Milliseconds before each push when the host starts the UART cache read; optional, default in `hello` (`pre_fetch_ms`) |
Response `stream_status`:
```json
{"type":"stream_status","receive_accel":true,"interval_ms":32,"success":true}
{"type":"stream_status","receive_input":true,"interval_ms":32,"pre_fetch":2,"success":true}
```
### `set_accel_stream` / `get_accel_stream` (firmware, per slave)
### `set_input_stream` / `get_input_stream` (firmware, per slave)
`client_id` required (> 0).
`client_id` required (> 0). Enables accel streaming from the slave to the master.
```json
{"type":"set_accel_stream","client_id":16,"enable":true}
{"type":"get_accel_stream","client_id":16}
{"type":"set_input_stream","client_id":16,"enable":true}
{"type":"get_input_stream","client_id":16}
```
Response `accel_stream_status`:
Response `input_stream_status`:
```json
{"type":"accel_stream_status","client_id":16,"enabled":true,"success":true}
```
### `set_tap_stream` / `get_tap_stream` (receive tap on this connection)
```json
{"type":"set_tap_stream","enable":true,"interval_ms":16}
{"type":"get_tap_stream"}
```
Response `tap_stream_status`:
```json
{"type":"tap_stream_status","receive_tap":true,"interval_ms":16,"success":true}
{"type":"input_stream_status","client_id":16,"enabled":true,"success":true}
```
### `set_tap_notify` / `get_tap_notify` (firmware, per slave)
@ -266,83 +219,55 @@ Response `tap_notify_status`:
### `set_led_ring`
Same JSON body as [`POST /api/led-ring`](API_REST.md#led-ring) with `"type":"set_led_ring"` added. Reply: `led_ring_status`.
Control the LED ring on the master or a slave.
```json
{"type":"set_led_ring","mode":"color","client_id":16,"r":255,"g":0,"b":0,"intensity":128}
{"type":"set_led_ring","mode":"digit","client_id":0,"digit":3,"r":0,"g":255,"b":0}
{"type":"set_led_ring","mode":"find-me","all_clients":true,"slaves_only":true}
```
| `mode` | Notes |
|--------|--------|
| `clear` | Turn off |
| `color` | Full ring RGB + `intensity` |
| `progress` | `progress` 0100 |
| `digit` | `digit` 010 |
| `blink` | `blink_ms`, `blink_count` |
| `find-me` | Locate pod |
Use `client_id` (`0` = master) or `all_clients` (+ optional `slaves_only`) for broadcast.
Response `led_ring_status`:
```json
{"type":"led_ring_status","success":true,"mode":5,"client_id":16,"slaves_updated":1}
```
### `get_battery`
Body: `{"type":"get_battery","all_clients":true}` or `"client_id":16`. Default if omitted: all clients.
Read cached battery samples from the master. Slaves push battery every **30 s**; this command reads the master cache.
Reply: `battery_status` with `samples[]` (see REST doc).
---
## Examples
### Accel stream
```python
import asyncio, json, websockets
async def main():
async with websockets.connect("ws://127.0.0.1:8081/ws") as ws:
print(await ws.recv()) # hello
await ws.send(json.dumps({"type": "list_clients"}))
clients = json.loads(await ws.recv())["clients"]
for c in clients:
if not c.get("available"):
continue
await ws.send(json.dumps({
"type": "set_accel_stream", "client_id": c["id"], "enable": True
}))
await ws.recv() # accel_stream_status
await ws.send(json.dumps({"type": "set_stream", "enable": True, "interval_ms": 16}))
await ws.recv() # stream_status
while True:
msg = json.loads(await ws.recv())
if msg.get("type") != "accel":
continue
if not msg.get("success"):
print("error:", msg.get("error"))
continue
for c in msg.get("clients", []):
if c.get("valid"):
print(c["client_id"], c["x"], c["y"], c["z"], "age", c.get("age_ms"))
asyncio.run(main())
```json
{"type":"get_battery","all_clients":true}
{"type":"get_battery","client_id":16}
```
### Tap stream
Default if omitted: all clients.
```python
import asyncio, json, websockets
Response `battery_status`:
async def main():
async with websockets.connect("ws://127.0.0.1:8081/ws") as ws:
print(await ws.recv()) # hello
await ws.send(json.dumps({
"type": "set_tap_notify", "client_id": 16,
"single": True, "double_tap": False, "triple": False
}))
await ws.recv() # tap_notify_status
await ws.send(json.dumps({"type": "set_tap_stream", "enable": True, "interval_ms": 16}))
await ws.recv() # tap_stream_status
while True:
msg = json.loads(await ws.recv())
if msg.get("type") == "tap" and msg.get("events"):
for e in msg["events"]:
print(e["client_id"], e["kind"], "age", e.get("age_ms"))
asyncio.run(main())
```json
{
"type": "battery_status",
"success": true,
"samples": [
{
"client_id": 16,
"lipo1": {"valid": true, "voltage_mv": 3850, "percent": 71},
"lipo2": {"valid": false},
"age_ms": 1200
}
]
}
```
---
## Dashboard WebSocket (`:8080/ws`)
Read-only from the browsers perspective: the server pushes JSON whenever state changes. Clients do not send commands on this socket (messages are ignored).
Payload shape: `DashboardState``updated_at`, `serial_port`, `uart_connected`, `live_stream`, `master`, `clients[]` (id, mac, accel, tap notify flags, battery, etc.). Accel/tap samples appear here when **Live stream** is enabled in the UI (`PUT /api/live-stream`).
During OTA, additional messages with `"type":"ota_progress"` may appear on the same socket.
Configure slaves via REST on `:8080` ([`API_REST.md`](API_REST.md)), not via this WebSocket.