diff --git a/goTool/api_stream.go b/goTool/api_stream.go index 3d3b6d1..0a17bb3 100644 --- a/goTool/api_stream.go +++ b/goTool/api_stream.go @@ -9,55 +9,61 @@ import ( "sync" "time" - "github.com/gorilla/websocket" "powerpod/gotool/pb" + + "github.com/gorilla/websocket" ) const ( defaultAccelStreamInterval = 16 * time.Millisecond + defaultPreFetchMs = 2 minAPIStreamInterval = 1 * time.Millisecond maxAPIStreamInterval = 10 * time.Second // How long tap events stay in API push/cache after first sight (matches dashboard). apiTapDisplayMinMs = 2000 ) -// AccelClientSample is one slave's cached accel on the master. -type AccelClientSample struct { - ClientID uint32 `json:"client_id"` - Valid bool `json:"valid"` - X int32 `json:"x,omitempty"` - Y int32 `json:"y,omitempty"` - Z int32 `json:"z,omitempty"` - AgeMs uint32 `json:"age_ms,omitempty"` +// InputClientSample is one slave's cached accel + tap state on the master. +type InputClientSample struct { + ClientID uint32 `json:"client_id"` + Valid bool `json:"valid"` + X int32 `json:"x,omitempty"` + Y int32 `json:"y,omitempty"` + Z int32 `json:"z,omitempty"` + AccelAgeMs uint32 `json:"accel_age_ms,omitempty"` + TapKind string `json:"tap_kind"` + TapAgeMs uint32 `json:"tap_age_ms,omitempty"` } -// AccelStreamMessage is sent to external WebSocket clients (hello + accel samples). -type AccelStreamMessage struct { - Type string `json:"type"` // "hello" | "accel" +// InputStreamMessage is sent to external WebSocket clients (hello + input samples). +type InputStreamMessage struct { + Type string `json:"type"` // "hello" | "input" Serial string `json:"serial_port,omitempty"` IntervalMs int `json:"interval_ms,omitempty"` + PreFetchMs int `json:"pre_fetch_ms,omitempty"` TapDisplayMinMs int `json:"tap_display_min_ms,omitempty"` Commands []string `json:"commands,omitempty"` Note string `json:"note,omitempty"` T int64 `json:"t,omitempty"` // Unix nanoseconds Success bool `json:"success,omitempty"` - Clients []AccelClientSample `json:"clients,omitempty"` + Clients []InputClientSample `json:"clients,omitempty"` Error string `json:"error,omitempty"` } // StreamStatusMessage is the reply to set_stream / get_stream (this connection). type StreamStatusMessage struct { - Type string `json:"type"` // "stream_status" - ReceiveAccel bool `json:"receive_accel"` - IntervalMs int `json:"interval_ms"` - Success bool `json:"success"` - Error string `json:"error,omitempty"` + Type string `json:"type"` // "stream_status" + ReceiveInput bool `json:"receive_input"` + IntervalMs int `json:"interval_ms"` + PreFetch int `json:"pre_fetch"` + Success bool `json:"success"` + Error string `json:"error,omitempty"` } -// AccelStreamStatusMessage is the reply to set_accel_stream / get_accel_stream (slave). -type AccelStreamStatusMessage struct { - Type string `json:"type"` // "accel_stream_status" +// InputStreamStatusMessage is the reply to set_input_stream / get_input_stream (slave). +type InputStreamStatusMessage struct { + Type string `json:"type"` // "input_stream_status" ClientID uint32 `json:"client_id"` Enabled bool `json:"enabled"` Success bool `json:"success"` @@ -65,33 +71,6 @@ type AccelStreamStatusMessage struct { Error string `json:"error,omitempty"` } -// TapClientEvent is one tap visible to API clients (fresh or within tap_display_min_ms). -type TapClientEvent struct { - ClientID uint32 `json:"client_id"` - Valid bool `json:"valid"` - Kind string `json:"kind,omitempty"` // single | double | triple - AgeMs uint32 `json:"age_ms,omitempty"` - ShownAtMs int64 `json:"shown_at_ms,omitempty"` // Unix ms when API first saw this tap -} - -// TapStreamMessage is pushed to external WebSocket clients when receive_tap is on. -type TapStreamMessage struct { - Type string `json:"type"` // "tap" - T int64 `json:"t,omitempty"` - Success bool `json:"success,omitempty"` - Events []TapClientEvent `json:"events,omitempty"` - Error string `json:"error,omitempty"` -} - -// TapStreamStatusMessage is the reply to set_tap_stream / get_tap_stream (this connection). -type TapStreamStatusMessage struct { - Type string `json:"type"` // "tap_stream_status" - ReceiveTap bool `json:"receive_tap"` - IntervalMs int `json:"interval_ms"` - Success bool `json:"success"` - Error string `json:"error,omitempty"` -} - // APIClientInfo is one registered slave (or slot) from CLIENT_INFO. type APIClientInfo struct { ID uint32 `json:"id"` @@ -101,7 +80,7 @@ type APIClientInfo struct { Used bool `json:"used"` LastPing uint32 `json:"last_ping"` LastSuccessPing uint32 `json:"last_success_ping"` - AccelStream bool `json:"accel_stream"` + InputStream bool `json:"input_stream"` TapNotifySingle bool `json:"tap_notify_single"` TapNotifyDouble bool `json:"tap_notify_double"` TapNotifyTriple bool `json:"tap_notify_triple"` @@ -132,6 +111,7 @@ type accelWSCommand struct { ClientID uint32 `json:"client_id"` Enable *bool `json:"enable"` IntervalMs *int `json:"interval_ms"` + PreFetch *int `json:"pre_fetch"` Single *bool `json:"single"` DoubleTap *bool `json:"double_tap"` Triple *bool `json:"triple"` @@ -144,6 +124,7 @@ type APIInfoResponse struct { SerialPort string `json:"serial_port"` WebSocket string `json:"websocket"` DefaultIntervalMs int `json:"default_interval_ms"` + DefaultPreFetchMs int `json:"default_pre_fetch_ms"` MinIntervalMs int `json:"min_interval_ms"` MaxIntervalMs int `json:"max_interval_ms"` TapDisplayMinMs int `json:"tap_display_min_ms"` @@ -153,21 +134,28 @@ type APIInfoResponse struct { type cachedTapEvent struct { kind string shownAt time.Time + ageMs uint32 } type wsSubscriber struct { conn *websocket.Conn - receiveAccel bool - receiveTap bool + receiveInput bool interval time.Duration - lastAccelSent time.Time - lastTapSent time.Time + preFetch time.Duration + lastInputSent time.Time +} + +type pendingInputCache struct { + cache *pb.CacheStatusResponse + readAt time.Time + readErr error } type accelStreamHub struct { mu sync.RWMutex clients map[*websocket.Conn]*wsSubscriber defaultInterval time.Duration + defaultPreFetch time.Duration configChanged chan struct{} recentTaps map[uint32]cachedTapEvent } @@ -176,6 +164,7 @@ func newAccelStreamHub(defaultInterval time.Duration) *accelStreamHub { return &accelStreamHub{ clients: make(map[*websocket.Conn]*wsSubscriber), defaultInterval: defaultInterval, + defaultPreFetch: defaultPreFetchMs * time.Millisecond, configChanged: make(chan struct{}, 1), } } @@ -197,26 +186,39 @@ func clampAPIInterval(d time.Duration) time.Duration { return d } +func clampPreFetch(d time.Duration) time.Duration { + if d < 0 { + return 0 + } + if d > maxAPIStreamInterval { + return maxAPIStreamInterval + } + return d +} + func (h *accelStreamHub) register(conn *websocket.Conn, portName string) *wsSubscriber { sub := &wsSubscriber{ conn: conn, - receiveAccel: false, + receiveInput: false, interval: h.defaultInterval, + preFetch: h.defaultPreFetch, } h.mu.Lock() h.clients[conn] = sub h.mu.Unlock() - hello := AccelStreamMessage{ + hello := InputStreamMessage{ Type: "hello", Serial: portName, IntervalMs: int(h.defaultInterval / time.Millisecond), + PreFetchMs: int(h.defaultPreFetch / time.Millisecond), TapDisplayMinMs: apiTapDisplayMinMs, - Note: "set_tap_notify configures slave S/D/T only; set_tap_stream enables tap polling/push", + Note: "set_tap_notify configures slave S/D/T only; set_stream enables input polling/push on this connection", Commands: []string{ "list_clients", - "set_stream", "get_stream", "set_accel_stream", "get_accel_stream", - "set_tap_stream", "get_tap_stream", "set_tap_notify", "get_tap_notify", + "set_stream", "get_stream", + "set_input_stream", "get_input_stream", + "set_tap_notify", "get_tap_notify", "set_led_ring", "get_battery", }, } @@ -229,36 +231,25 @@ func (h *accelStreamHub) register(conn *websocket.Conn, portName string) *wsSubs func (h *accelStreamHub) unregister(conn *websocket.Conn) { h.mu.Lock() delete(h.clients, conn) - anyTap := false + anyInput := false for _, sub := range h.clients { - if sub.receiveTap { - anyTap = true + if sub.receiveInput { + anyInput = true break } } - if !anyTap { + if !anyInput { h.recentTaps = nil } h.mu.Unlock() h.notifyConfigChanged() } -func (h *accelStreamHub) anyWantsAccel() bool { +func (h *accelStreamHub) anyWantsInput() bool { h.mu.RLock() defer h.mu.RUnlock() for _, sub := range h.clients { - if sub.receiveAccel { - return true - } - } - return false -} - -func (h *accelStreamHub) anyWantsTap() bool { - h.mu.RLock() - defer h.mu.RUnlock() - for _, sub := range h.clients { - if sub.receiveTap { + if sub.receiveInput { return true } } @@ -270,7 +261,7 @@ func (h *accelStreamHub) minWantedInterval() time.Duration { defer h.mu.RUnlock() var min time.Duration for _, sub := range h.clients { - if !sub.receiveAccel && !sub.receiveTap { + if !sub.receiveInput { continue } if min == 0 || sub.interval < min { @@ -283,20 +274,28 @@ func (h *accelStreamHub) minWantedInterval() time.Duration { return min } -func (h *accelStreamHub) setStream(sub *wsSubscriber, enable bool, intervalMs *int) StreamStatusMessage { +func (h *accelStreamHub) setStream(sub *wsSubscriber, enable bool, intervalMs, preFetchMs *int) StreamStatusMessage { h.mu.Lock() - sub.receiveAccel = enable + sub.receiveInput = enable + if !enable { + h.recentTaps = nil + } if intervalMs != nil { sub.interval = clampAPIInterval(time.Duration(*intervalMs) * time.Millisecond) } + if preFetchMs != nil { + sub.preFetch = clampPreFetch(time.Duration(*preFetchMs) * time.Millisecond) + } ms := int(sub.interval / time.Millisecond) + pf := int(sub.preFetch / time.Millisecond) h.mu.Unlock() h.notifyConfigChanged() return StreamStatusMessage{ Type: "stream_status", - ReceiveAccel: enable, + ReceiveInput: enable, IntervalMs: ms, + PreFetch: pf, Success: true, } } @@ -306,45 +305,47 @@ func (h *accelStreamHub) getStream(sub *wsSubscriber) StreamStatusMessage { defer h.mu.RUnlock() return StreamStatusMessage{ Type: "stream_status", - ReceiveAccel: sub.receiveAccel, + ReceiveInput: sub.receiveInput, IntervalMs: int(sub.interval / time.Millisecond), + PreFetch: int(sub.preFetch / time.Millisecond), Success: true, } } -func (h *accelStreamHub) setTapStream(sub *wsSubscriber, enable bool, intervalMs *int) TapStreamStatusMessage { - h.mu.Lock() - sub.receiveTap = enable - if !enable { - h.recentTaps = nil - } - if intervalMs != nil { - sub.interval = clampAPIInterval(time.Duration(*intervalMs) * time.Millisecond) - } - ms := int(sub.interval / time.Millisecond) - h.mu.Unlock() - h.notifyConfigChanged() - - return TapStreamStatusMessage{ - Type: "tap_stream_status", - ReceiveTap: enable, - IntervalMs: ms, - Success: true, - } -} - -func (h *accelStreamHub) getTapStream(sub *wsSubscriber) TapStreamStatusMessage { +func (h *accelStreamHub) streamTiming(now time.Time) (needRead, needDeliver bool, waitPreFetch time.Duration) { h.mu.RLock() defer h.mu.RUnlock() - return TapStreamStatusMessage{ - Type: "tap_stream_status", - ReceiveTap: sub.receiveTap, - IntervalMs: int(sub.interval / time.Millisecond), - Success: true, + for _, sub := range h.clients { + if !sub.receiveInput { + continue + } + if sub.lastInputSent.IsZero() { + needRead = true + needDeliver = true + if sub.preFetch > waitPreFetch { + waitPreFetch = sub.preFetch + } + continue + } + nextPush := sub.lastInputSent.Add(sub.interval) + readAt := nextPush.Add(-sub.preFetch) + if !now.Before(readAt) { + needRead = true + } + if !now.Before(nextPush) { + needDeliver = true + if sub.preFetch > waitPreFetch { + waitPreFetch = sub.preFetch + } + } } + return needRead, needDeliver, waitPreFetch } -func (h *accelStreamHub) ingestTapEvents(incoming []TapClientEvent) []TapClientEvent { +func (h *accelStreamHub) ingestTapFromCache(cache *pb.CacheStatusResponse) { + if cache == nil { + return + } h.mu.Lock() defer h.mu.Unlock() @@ -352,39 +353,67 @@ func (h *accelStreamHub) ingestTapEvents(incoming []TapClientEvent) []TapClientE if h.recentTaps == nil { h.recentTaps = make(map[uint32]cachedTapEvent) } - for _, e := range incoming { - if !e.Valid || e.Kind == "" { + for _, c := range cache.GetClients() { + t := c.GetTap() + if t == nil { continue } - h.recentTaps[e.ClientID] = cachedTapEvent{kind: e.Kind, shownAt: now} + kind := tapKindLabelPB(t.GetKind()) + if kind == "" { + continue + } + h.recentTaps[c.GetClientId()] = cachedTapEvent{ + kind: kind, + shownAt: now, + ageMs: t.GetAgeMs(), + } } - return h.activeTapEventsLocked(now) + h.pruneRecentTapsLocked(now) } -func (h *accelStreamHub) activeTapEventsLocked(now time.Time) []TapClientEvent { +func (h *accelStreamHub) pruneRecentTapsLocked(now time.Time) { if len(h.recentTaps) == 0 { - return nil + return } cutoff := now.Add(-apiTapDisplayMinMs * time.Millisecond) - out := make([]TapClientEvent, 0, len(h.recentTaps)) for id, ev := range h.recentTaps { if ev.shownAt.Before(cutoff) { delete(h.recentTaps, id) - continue } - shownAtMs := ev.shownAt.UnixMilli() - out = append(out, TapClientEvent{ - ClientID: id, - Valid: true, - Kind: ev.kind, - AgeMs: uint32(now.Sub(ev.shownAt).Milliseconds()), - ShownAtMs: shownAtMs, - }) + } +} + +func (h *accelStreamHub) inputClientsFromCacheLocked(cache *pb.CacheStatusResponse, now time.Time) []InputClientSample { + h.pruneRecentTapsLocked(now) + out := make([]InputClientSample, 0, len(cache.GetClients())) + for _, c := range cache.GetClients() { + sample := InputClientSample{ + ClientID: c.GetClientId(), + TapKind: "none", + } + if a := c.GetAccel(); a != nil { + sample.Valid = a.GetValid() + if a.GetValid() { + sample.X = a.GetX() + sample.Y = a.GetY() + sample.Z = a.GetZ() + sample.AccelAgeMs = a.GetAgeMs() + } + } + if ev, ok := h.recentTaps[c.GetClientId()]; ok { + sample.TapKind = ev.kind + if t := c.GetTap(); t != nil && tapKindLabelPB(t.GetKind()) == ev.kind { + sample.TapAgeMs = t.GetAgeMs() + } else { + sample.TapAgeMs = ev.ageMs + uint32(now.Sub(ev.shownAt).Milliseconds()) + } + } + out = append(out, sample) } return out } -func (h *accelStreamHub) deliver(msg AccelStreamMessage) { +func (h *accelStreamHub) deliverInput(msg InputStreamMessage) { data, err := json.Marshal(msg) if err != nil { return @@ -394,13 +423,13 @@ func (h *accelStreamHub) deliver(msg AccelStreamMessage) { h.mu.Lock() defer h.mu.Unlock() for conn, sub := range h.clients { - if !sub.receiveAccel { + if !sub.receiveInput { continue } - if !sub.lastAccelSent.IsZero() && now.Sub(sub.lastAccelSent) < sub.interval { + if !sub.lastInputSent.IsZero() && now.Sub(sub.lastInputSent) < sub.interval { continue } - sub.lastAccelSent = now + sub.lastInputSent = now if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { delete(h.clients, conn) _ = conn.Close() @@ -408,155 +437,79 @@ func (h *accelStreamHub) deliver(msg AccelStreamMessage) { } } -func (h *accelStreamHub) deliverTap(msg TapStreamMessage) { - data, err := json.Marshal(msg) - if err != nil { - return - } +func runInputStreamer(link *managedSerial, hub *accelStreamHub, dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl, stop <-chan struct{}) { + ticker := time.NewTicker(minAPIStreamInterval) + defer ticker.Stop() - now := time.Now() - h.mu.Lock() - defer h.mu.Unlock() - for conn, sub := range h.clients { - if !sub.receiveTap { - continue - } - if !sub.lastTapSent.IsZero() && now.Sub(sub.lastTapSent) < sub.interval { - continue - } - sub.lastTapSent = now - if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { - delete(h.clients, conn) - _ = conn.Close() - } - } -} - -func runAccelStreamer(link *managedSerial, hub *accelStreamHub, dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl, stop <-chan struct{}) { - var ticker *time.Ticker - var tick <-chan time.Time - - resetTicker := func() { - if ticker != nil { - ticker.Stop() - } - interval := hub.minWantedInterval() - ticker = time.NewTicker(interval) - tick = ticker.C - } - resetTicker() - defer func() { - if ticker != nil { - ticker.Stop() - } - }() + var pending *pendingInputCache for { select { case <-stop: return case <-hub.configChanged: - resetTicker() - case <-tick: - wantAccel := hub.anyWantsAccel() && accelStreamPollingActive(dash, ctl) - wantTap := hub.anyWantsTap() - if !wantAccel && !wantTap { + pending = nil + case now := <-ticker.C: + if !hub.anyWantsInput() || !inputPollingActive(dash, ctl, tapCtl) { + pending = nil continue } - now := time.Now().UnixNano() - cache, err := link.readCacheStatusPoll() - if errors.Is(err, errUARTBusy) { - if wantAccel { - hub.deliver(AccelStreamMessage{ - Type: "accel", - T: now, - Success: false, - Error: "uart busy", - }) + needRead, needDeliver, waitPreFetch := hub.streamTiming(now) + + if needRead && pending == nil { + cache, err := link.readCacheStatusPoll() + if err != nil { + pending = &pendingInputCache{readErr: err, readAt: now} + } else { + hub.ingestTapFromCache(cache) + pending = &pendingInputCache{cache: cache, readAt: now} } - if wantTap { - hub.deliverTap(TapStreamMessage{ - Type: "tap", - T: now, - Success: false, - Error: "uart busy", - }) - } - continue } - if err != nil { - if wantAccel { - hub.deliver(AccelStreamMessage{ - Type: "accel", - T: now, - Success: false, - Error: err.Error(), - }) - } - if wantTap { - hub.deliverTap(TapStreamMessage{ - Type: "tap", - T: now, - Success: false, - Error: err.Error(), - }) - } + + if !needDeliver || pending == nil { continue } - if wantAccel { - samples := accelSamplesFromCacheStatus(cache) - clients := make([]AccelClientSample, 0, len(samples)) - for _, s := range samples { - clients = append(clients, AccelClientSample{ - ClientID: s.GetClientId(), - Valid: s.GetValid(), - X: s.GetX(), - Y: s.GetY(), - Z: s.GetZ(), - AgeMs: s.GetAgeMs(), - }) + ts := now.UnixNano() + if pending.readErr != nil { + errMsg := pending.readErr.Error() + if errors.Is(pending.readErr, errUARTBusy) { + errMsg = "uart busy" } - hub.deliver(AccelStreamMessage{ - Type: "accel", - T: now, + hub.deliverInput(InputStreamMessage{ + Type: "input", + T: ts, + Success: false, + Error: errMsg, + }) + pending = nil + continue + } + + if pending.cache != nil && now.Sub(pending.readAt) >= waitPreFetch { + hub.mu.RLock() + clients := hub.inputClientsFromCacheLocked(pending.cache, now) + hub.mu.RUnlock() + hub.deliverInput(InputStreamMessage{ + Type: "input", + T: ts, Success: true, Clients: clients, }) - } - if wantTap { - events := tapEventsFromCacheStatus(cache) - fresh := make([]TapClientEvent, 0, len(events)) - for _, e := range events { - if !e.GetValid() { - continue - } - fresh = append(fresh, TapClientEvent{ - ClientID: e.GetClientId(), - Valid: true, - Kind: tapKindLabelPB(e.GetKind()), - AgeMs: e.GetAgeMs(), - }) - } - visible := hub.ingestTapEvents(fresh) - if len(visible) > 0 { - hub.deliverTap(TapStreamMessage{ - Type: "tap", - T: now, - Success: true, - Events: visible, - }) - } + pending = nil } } } } -func accelStreamPollingActive(dash *wsHub, ctl *accelStreamCtl) bool { +func inputPollingActive(dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl) bool { if ctl != nil && ctl.Any() { return true } + if tapCtl != nil && tapCtl.Any() { + return true + } return dash != nil && dash.anyAccelStreamEnabled() } @@ -586,9 +539,9 @@ func writeLedRingStatus(conn *websocket.Conn, out ledRingAPIResponse) { _ = conn.WriteMessage(websocket.TextMessage, data) } -func writeAccelStreamStatus(conn *websocket.Conn, out accelStreamAPIResponse) { - msg := AccelStreamStatusMessage{ - Type: "accel_stream_status", +func writeInputStreamStatus(conn *websocket.Conn, out accelStreamAPIResponse) { + msg := InputStreamStatusMessage{ + Type: "input_stream_status", ClientID: out.ClientID, Enabled: out.Enabled, Success: out.Success, @@ -602,14 +555,6 @@ func writeAccelStreamStatus(conn *websocket.Conn, out accelStreamAPIResponse) { _ = conn.WriteMessage(websocket.TextMessage, data) } -func writeTapStreamStatus(conn *websocket.Conn, msg TapStreamStatusMessage) { - data, err := json.Marshal(msg) - if err != nil { - return - } - _ = conn.WriteMessage(websocket.TextMessage, data) -} - func clientInfoToAPI(c *pb.ClientInfo) APIClientInfo { return APIClientInfo{ ID: c.GetId(), @@ -619,7 +564,7 @@ func clientInfoToAPI(c *pb.ClientInfo) APIClientInfo { Used: c.GetUsed(), LastPing: c.GetLastPing(), LastSuccessPing: c.GetLastSuccessPing(), - AccelStream: c.GetAccelStreamEnabled(), + InputStream: c.GetAccelStreamEnabled(), TapNotifySingle: c.GetTapNotifySingle(), TapNotifyDouble: c.GetTapNotifyDouble(), TapNotifyTriple: c.GetTapNotifyTriple(), @@ -717,28 +662,28 @@ func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte, }) return } - writeStreamStatus(conn, hub.setStream(sub, *cmd.Enable, cmd.IntervalMs)) + writeStreamStatus(conn, hub.setStream(sub, *cmd.Enable, cmd.IntervalMs, cmd.PreFetch)) case "get_stream": writeStreamStatus(conn, hub.getStream(sub)) - case "set_accel_stream": + case "set_input_stream": if cmd.ClientID == 0 { - writeAccelStreamStatus(conn, accelStreamAPIResponse{Error: "client_id required"}) + writeInputStreamStatus(conn, accelStreamAPIResponse{Error: "client_id required"}) return } if cmd.Enable == nil { - writeAccelStreamStatus(conn, accelStreamAPIResponse{ + writeInputStreamStatus(conn, accelStreamAPIResponse{ ClientID: cmd.ClientID, Error: "enable required", }) return } - writeAccelStreamStatus(conn, applyAccelStreamClient(link, dash, ctl, cmd.ClientID, *cmd.Enable)) + writeInputStreamStatus(conn, applyAccelStreamClient(link, dash, ctl, cmd.ClientID, *cmd.Enable)) - case "get_accel_stream": + case "get_input_stream": if cmd.ClientID == 0 { - writeAccelStreamStatus(conn, accelStreamAPIResponse{Error: "client_id required"}) + writeInputStreamStatus(conn, accelStreamAPIResponse{Error: "client_id required"}) return } resp, err := link.AccelStreamPoll(&pb.AccelStreamRequest{ @@ -746,7 +691,7 @@ func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte, ClientId: cmd.ClientID, }) if err != nil { - writeAccelStreamStatus(conn, accelStreamAPIResponse{ + writeInputStreamStatus(conn, accelStreamAPIResponse{ ClientID: cmd.ClientID, Error: err.Error(), }) @@ -755,25 +700,12 @@ func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte, if ctl != nil { ctl.Set(cmd.ClientID, resp.GetEnabled()) } - writeAccelStreamStatus(conn, accelStreamAPIResponse{ + writeInputStreamStatus(conn, accelStreamAPIResponse{ Enabled: resp.GetEnabled(), ClientID: resp.GetClientId(), Success: resp.GetSuccess(), }) - case "set_tap_stream": - if cmd.Enable == nil { - writeTapStreamStatus(conn, TapStreamStatusMessage{ - Type: "tap_stream_status", - Error: "enable required", - }) - return - } - writeTapStreamStatus(conn, hub.setTapStream(sub, *cmd.Enable, cmd.IntervalMs)) - - case "get_tap_stream": - writeTapStreamStatus(conn, hub.getTapStream(sub)) - case "set_tap_notify": if cmd.AllClients { if cmd.Single == nil || cmd.DoubleTap == nil || cmd.Triple == nil { @@ -827,11 +759,11 @@ func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte, tapCtl.Set(cmd.ClientID, resp.GetSingle(), resp.GetDoubleTap(), resp.GetTriple()) } writeTapNotifyStatus(conn, tapNotifyAPIResponse{ - ClientID: cmd.ClientID, - Success: resp.GetSuccess(), - Single: resp.GetSingle(), + ClientID: cmd.ClientID, + Success: resp.GetSuccess(), + Single: resp.GetSingle(), DoubleTap: resp.GetDoubleTap(), - Triple: resp.GetTriple(), + Triple: resp.GetTriple(), }) case "set_led_ring": @@ -860,7 +792,7 @@ func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte, default: writeStreamStatus(conn, StreamStatusMessage{ Type: "stream_status", - Error: "unknown type (list_clients, set_stream, get_stream, set_accel_stream, get_accel_stream, set_tap_stream, get_tap_stream, set_tap_notify, get_tap_notify, set_led_ring, get_battery)", + Error: "unknown type (list_clients, set_stream, get_stream, set_input_stream, get_input_stream, set_tap_notify, get_tap_notify, set_led_ring, get_battery)", }) } } @@ -896,10 +828,11 @@ func mountExternalAPI(mux *http.ServeMux, portName string, defaultInterval time. SerialPort: portName, WebSocket: "/ws", DefaultIntervalMs: defMs, + DefaultPreFetchMs: defaultPreFetchMs, MinIntervalMs: int(minAPIStreamInterval / time.Millisecond), MaxIntervalMs: int(maxAPIStreamInterval / time.Millisecond), TapDisplayMinMs: apiTapDisplayMinMs, - Description: "WebSocket: set_accel_stream + set_stream for accel; set_tap_notify (slave S/D/T) then set_tap_stream for tap events (shown ≥2s)", + Description: "WebSocket: set_input_stream + set_stream for input (accel + tap); set_tap_notify configures slave tap kinds", }) }) @@ -915,7 +848,7 @@ func mountExternalAPI(mux *http.ServeMux, portName string, defaultInterval time. func runAPIServer(portName string, link *managedSerial, addr string, defaultInterval time.Duration, dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl, stop <-chan struct{}) *http.Server { hub := newAccelStreamHub(defaultInterval) - go runAccelStreamer(link, hub, dash, ctl, tapCtl, stop) + go runInputStreamer(link, hub, dash, ctl, tapCtl, stop) mux := http.NewServeMux() mountExternalAPI(mux, portName, defaultInterval, hub, link, dash, ctl, tapCtl) @@ -924,7 +857,7 @@ func runAPIServer(portName string, link *managedSerial, addr string, defaultInte srv := &http.Server{Addr: addr, Handler: mux} go func() { - log.Printf("external API http://localhost%s WebSocket ws://localhost%s/ws (default stream interval %s, per-client via set_stream / set_tap_stream)", + log.Printf("external API http://localhost%s WebSocket ws://localhost%s/ws (default stream interval %s, per-client via set_stream)", addr, addr, defaultInterval.String()) if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { log.Printf("external API server: %v", err) diff --git a/goTool/docs/API_WEBSOCKET.md b/goTool/docs/API_WEBSOCKET.md index c7613e9..c263071 100644 --- a/goTool/docs/API_WEBSOCKET.md +++ b/goTool/docs/API_WEBSOCKET.md @@ -1,15 +1,10 @@ # WebSocket API -`go run . -port /dev/ttyUSB0 serve` exposes two WebSocket endpoints. They share the same UART link but serve different purposes. +`go run . -port /dev/ttyUSB0 serve` exposes the WebSocket enpoint | URL | Port (default) | Role | |-----|----------------|------| -| `ws://localhost:8080/ws` | Dashboard (`-addr`) | Server → client only: full `DashboardState` JSON (~2 s poll + live-stream accel/tap) | -| `ws://localhost:8081/ws` | External API (`-api-addr`) | Request/response commands + optional **accel** / **tap** push streams | - -Disable the external server with `-api-addr ""`. - -CLI overview and UART commands: [`../README.md`](../README.md). HTTP endpoints: [`API_REST.md`](API_REST.md). +| `ws://localhost:8081/ws` | External API (`-api-addr`) | Request/response commands + optional **input** push stream | --- @@ -19,44 +14,49 @@ CLI overview and UART commands: [`../README.md`](../README.md). HTTP endpoints: 1. Connect → server sends **`hello`** (receive off; lists available commands). 2. Send JSON commands → server replies with a matching `*_status` or `client_list` message (one reply per command). -3. After `set_stream` / `set_tap_stream` with `enable: true`, the server may send **`accel`** and/or **`tap`** messages **without** a prior command (push stream). +3. After `set_stream` with `enable: true`, the server may send **`input`** messages **without** a prior command (push stream). Commands and stream pushes are multiplexed on one socket. While streaming, always parse `type` and branch (status vs sample vs error). -### Two layers (accel and tap) +### Two layers (firmware vs host) | Layer | Commands | Effect | |-------|----------|--------| -| **Firmware (ESP-NOW)** | `set_accel_stream`, `set_tap_notify` | Per `client_id`: slave sends accel or tap kinds to the master | -| **This connection (host)** | `set_stream`, `set_tap_stream` | Whether **you** receive push JSON and at what rate (`interval_ms`, 1 ms … 10 s) | +| **Firmware (ESP-NOW)** | `set_input_stream`, `set_tap_notify` | Per `client_id`: slave sends accel samples and/or tap events to the master | +| **This connection (host)** | `set_stream` | Whether **you** receive push JSON, at what rate (`interval_ms`, 1 ms … 10 s), and how early the UART read starts (`pre_fetch`) | -- **Accel UART polling** runs only if at least one connection has `receive_accel: true` **and** at least one slave streams accel (`set_accel_stream` or dashboard). -- **Tap UART polling** runs only if at least one connection has `receive_tap: true` (`set_tap_stream`). `set_tap_notify` alone does **not** poll. +- **UART polling** runs only if at least one connection has `receive_input: true` (`set_stream`) **and** at least one slave streams input (`set_input_stream`) or has tap notify enabled (`set_tap_notify`). +- **`set_tap_notify` alone** configures which tap kinds the slave reports; it does **not** enable host push by itself — you still need `set_stream`. Typical sequence: 1. `list_clients` → slave IDs -2. Per slave: `set_accel_stream` / `set_tap_notify` as needed -3. `set_stream` and/or `set_tap_stream` with `"enable": true` -4. Read push messages in a loop +2. Per slave: `set_input_stream` and/or `set_tap_notify` as needed +3. `set_stream` with `"enable": true` +4. Read **`input`** messages in a loop -There is **no per-slave filter** on push messages: each `accel` contains all cached slaves; each `tap` contains all visible events. Filter by `client_id` in your app. +There is **no per-slave filter** on push messages: each `input` contains all cached slaves. Filter by `client_id` in your app. --- ## Push stream messages -These are the samples you get after enabling receive. Interval is per WebSocket connection; the server UART poll uses the **minimum** `interval_ms` among all subscribers that want accel or tap. +These are the samples you get after enabling receive. Timing is per WebSocket connection: -### `accel` (type `"accel"`) +- **`interval_ms`** — minimum time between consecutive `input` pushes on this socket. +- **`pre_fetch`** — milliseconds **before** each scheduled push when the host sends the UART cache read, so the master has time to collect data from all slaves before the JSON goes out. -Sent only when `set_stream` has `enable: true`, a slave streams accel, and the poll tick fires for this connection. +The server UART poll uses the **minimum** `interval_ms` among all subscribers with `receive_input: true`. -**Success** — all slaves with a cache entry on the master (not only those with `valid: true`): +### `input` (type `"input"`) + +Sent when `set_stream` has `enable: true` and the poll tick fires for this connection (after the UART read started `pre_fetch` ms earlier). Each message combines the latest accel cache and visible tap state for every slave slot on the master. + +**Success** — all slaves with a cache entry (not only those with `valid: true`): ```json { - "type": "accel", + "type": "input", "t": 1716900123456789012, "success": true, "clients": [ @@ -66,11 +66,14 @@ Sent only when `set_stream` has `enable: true`, a slave streams accel, and the p "x": 12, "y": -34, "z": 16384, - "age_ms": 8 + "accel_age_ms": 8, + "tap_kind": "single", + "tap_age_ms": 3 }, { "client_id": 42, - "valid": false + "valid": false, + "tap_kind": "none" } ] } @@ -82,15 +85,19 @@ Sent only when `set_stream` has `enable: true`, a slave streams accel, and the p | `success` | `true` if `CACHE_STATUS` succeeded | | `clients[]` | One entry per slave slot in the master cache | | `client_id` | ESP-NOW client id (same as `list_clients`) | -| `valid` | `false` if no sample yet or stale; omit `x`/`y`/`z` when false | +| `valid` | `false` if no accel sample yet or stale; omit `x`/`y`/`z` when false | | `x`, `y`, `z` | Raw accelerometer LSB (BMA456, ±2 g scale on the pod) | -| `age_ms` | Milliseconds since the master received this sample | +| `accel_age_ms` | Milliseconds since the master received this accel sample | +| `tap_kind` | `"none"`, `"single"`, `"double"`, or `"triple"` | +| `tap_age_ms` | Milliseconds since the tap was seen in the master cache; omit when `tap_kind` is `"none"` | + +Tap events stay visible for **`tap_display_min_ms`** (2000 ms, also in `hello`) after the API first saw them, even if the hardware age grows. **Failure** (e.g. UART busy): ```json { - "type": "accel", + "type": "input", "t": 1716900123456789012, "success": false, "error": "uart busy" @@ -99,53 +106,6 @@ Sent only when `set_stream` has `enable: true`, a slave streams accel, and the p No `clients` array on failure. -### `tap` (type `"tap"`) - -Sent only when `set_tap_stream` has `enable: true` and there is at least one event to show. - -Events appear when the master cache reports a new tap. Each event stays in push payloads for **`tap_display_min_ms`** (2000 ms, also in `hello`) after the API first saw it, even if the hardware age grows. - -**Success**: - -```json -{ - "type": "tap", - "t": 1716900123456789012, - "success": true, - "events": [ - { - "client_id": 16, - "valid": true, - "kind": "single", - "age_ms": 3, - "shown_at_ms": 1717000000123 - } - ] -} -``` - -| Field | Meaning | -|-------|---------| -| `t` | Unix timestamp in **nanoseconds** (poll time) | -| `events[]` | All taps currently “on screen” for the API | -| `client_id` | Slave that tapped | -| `kind` | `"single"`, `"double"`, or `"triple"` | -| `age_ms` | Age in the master cache when read | -| `shown_at_ms` | Unix **milliseconds** when this host first included the event | - -If no events are visible, **no** `tap` message is sent on that tick (unlike accel, which can send empty `clients` only on success with cache data). - -**Failure**: - -```json -{ - "type": "tap", - "t": 1716900123456789012, - "success": false, - "error": "uart busy" -} -``` - --- ## Commands (request → response) @@ -159,13 +119,13 @@ Send one JSON object per message. Field `type` selects the command. "type": "hello", "serial_port": "/dev/ttyUSB0", "interval_ms": 16, + "pre_fetch_ms": 2, "tap_display_min_ms": 2000, - "note": "set_tap_notify configures slave S/D/T only; set_tap_stream enables tap polling/push", + "note": "set_tap_notify configures slave S/D/T only; set_stream enables input polling/push on this connection", "commands": [ "list_clients", "set_stream", "get_stream", - "set_accel_stream", "get_accel_stream", - "set_tap_stream", "get_tap_stream", + "set_input_stream", "get_input_stream", "set_tap_notify", "get_tap_notify", "set_led_ring", "get_battery" ] @@ -191,7 +151,7 @@ Response `client_list`: "used": true, "last_ping": 1234, "last_success_ping": 1200, - "accel_stream": false, + "input_stream": false, "tap_notify_single": false, "tap_notify_double": false, "tap_notify_triple": false @@ -200,45 +160,38 @@ Response `client_list`: } ``` -### `set_stream` / `get_stream` (receive accel on this connection) +### `set_stream` / `get_stream` (receive input on this connection) ```json -{"type":"set_stream","enable":true,"interval_ms":32} +{"type":"set_stream","enable":true,"interval_ms":32,"pre_fetch":2} {"type":"get_stream"} ``` +| Field | Meaning | +|-------|---------| +| `enable` | Turn push stream on/off for this connection | +| `interval_ms` | Minimum time between `input` pushes (1 … 10000) | +| `pre_fetch` | Milliseconds before each push when the host starts the UART cache read; optional, default in `hello` (`pre_fetch_ms`) | + Response `stream_status`: ```json -{"type":"stream_status","receive_accel":true,"interval_ms":32,"success":true} +{"type":"stream_status","receive_input":true,"interval_ms":32,"pre_fetch":2,"success":true} ``` -### `set_accel_stream` / `get_accel_stream` (firmware, per slave) +### `set_input_stream` / `get_input_stream` (firmware, per slave) -`client_id` required (> 0). +`client_id` required (> 0). Enables accel streaming from the slave to the master. ```json -{"type":"set_accel_stream","client_id":16,"enable":true} -{"type":"get_accel_stream","client_id":16} +{"type":"set_input_stream","client_id":16,"enable":true} +{"type":"get_input_stream","client_id":16} ``` -Response `accel_stream_status`: +Response `input_stream_status`: ```json -{"type":"accel_stream_status","client_id":16,"enabled":true,"success":true} -``` - -### `set_tap_stream` / `get_tap_stream` (receive tap on this connection) - -```json -{"type":"set_tap_stream","enable":true,"interval_ms":16} -{"type":"get_tap_stream"} -``` - -Response `tap_stream_status`: - -```json -{"type":"tap_stream_status","receive_tap":true,"interval_ms":16,"success":true} +{"type":"input_stream_status","client_id":16,"enabled":true,"success":true} ``` ### `set_tap_notify` / `get_tap_notify` (firmware, per slave) @@ -266,83 +219,55 @@ Response `tap_notify_status`: ### `set_led_ring` -Same JSON body as [`POST /api/led-ring`](API_REST.md#led-ring) with `"type":"set_led_ring"` added. Reply: `led_ring_status`. +Control the LED ring on the master or a slave. + +```json +{"type":"set_led_ring","mode":"color","client_id":16,"r":255,"g":0,"b":0,"intensity":128} +{"type":"set_led_ring","mode":"digit","client_id":0,"digit":3,"r":0,"g":255,"b":0} +{"type":"set_led_ring","mode":"find-me","all_clients":true,"slaves_only":true} +``` + +| `mode` | Notes | +|--------|--------| +| `clear` | Turn off | +| `color` | Full ring RGB + `intensity` | +| `progress` | `progress` 0–100 | +| `digit` | `digit` 0–10 | +| `blink` | `blink_ms`, `blink_count` | +| `find-me` | Locate pod | + +Use `client_id` (`0` = master) or `all_clients` (+ optional `slaves_only`) for broadcast. + +Response `led_ring_status`: + +```json +{"type":"led_ring_status","success":true,"mode":5,"client_id":16,"slaves_updated":1} +``` ### `get_battery` -Body: `{"type":"get_battery","all_clients":true}` or `"client_id":16`. Default if omitted: all clients. +Read cached battery samples from the master. Slaves push battery every **30 s**; this command reads the master cache. -Reply: `battery_status` with `samples[]` (see REST doc). - ---- - -## Examples - -### Accel stream - -```python -import asyncio, json, websockets - -async def main(): - async with websockets.connect("ws://127.0.0.1:8081/ws") as ws: - print(await ws.recv()) # hello - await ws.send(json.dumps({"type": "list_clients"})) - clients = json.loads(await ws.recv())["clients"] - for c in clients: - if not c.get("available"): - continue - await ws.send(json.dumps({ - "type": "set_accel_stream", "client_id": c["id"], "enable": True - })) - await ws.recv() # accel_stream_status - await ws.send(json.dumps({"type": "set_stream", "enable": True, "interval_ms": 16})) - await ws.recv() # stream_status - while True: - msg = json.loads(await ws.recv()) - if msg.get("type") != "accel": - continue - if not msg.get("success"): - print("error:", msg.get("error")) - continue - for c in msg.get("clients", []): - if c.get("valid"): - print(c["client_id"], c["x"], c["y"], c["z"], "age", c.get("age_ms")) - -asyncio.run(main()) +```json +{"type":"get_battery","all_clients":true} +{"type":"get_battery","client_id":16} ``` -### Tap stream +Default if omitted: all clients. -```python -import asyncio, json, websockets +Response `battery_status`: -async def main(): - async with websockets.connect("ws://127.0.0.1:8081/ws") as ws: - print(await ws.recv()) # hello - await ws.send(json.dumps({ - "type": "set_tap_notify", "client_id": 16, - "single": True, "double_tap": False, "triple": False - })) - await ws.recv() # tap_notify_status - await ws.send(json.dumps({"type": "set_tap_stream", "enable": True, "interval_ms": 16})) - await ws.recv() # tap_stream_status - while True: - msg = json.loads(await ws.recv()) - if msg.get("type") == "tap" and msg.get("events"): - for e in msg["events"]: - print(e["client_id"], e["kind"], "age", e.get("age_ms")) - -asyncio.run(main()) +```json +{ + "type": "battery_status", + "success": true, + "samples": [ + { + "client_id": 16, + "lipo1": {"valid": true, "voltage_mv": 3850, "percent": 71}, + "lipo2": {"valid": false}, + "age_ms": 1200 + } + ] +} ``` - ---- - -## Dashboard WebSocket (`:8080/ws`) - -Read-only from the browser’s perspective: the server pushes JSON whenever state changes. Clients do not send commands on this socket (messages are ignored). - -Payload shape: `DashboardState` — `updated_at`, `serial_port`, `uart_connected`, `live_stream`, `master`, `clients[]` (id, mac, accel, tap notify flags, battery, etc.). Accel/tap samples appear here when **Live stream** is enabled in the UI (`PUT /api/live-stream`). - -During OTA, additional messages with `"type":"ota_progress"` may appear on the same socket. - -Configure slaves via REST on `:8080` ([`API_REST.md`](API_REST.md)), not via this WebSocket.