powerpods/goTool/api_stream.go

876 lines
24 KiB
Go

package main
import (
"context"
"encoding/json"
"errors"
"log"
"net/http"
"sync"
"time"
"powerpod/gotool/pb"
"github.com/gorilla/websocket"
)
// Defaults and limits for the external input-stream API.
const (
// Default push interval. Not referenced elsewhere in the visible code;
// presumably handed to runAPIServer by the caller — TODO confirm.
defaultAccelStreamInterval = 16 * time.Millisecond
// Default pre-fetch lead in milliseconds: how long before a scheduled push
// the cache is read. Used for new hubs and reported by the info endpoint.
defaultPreFetchMs = 2
// Lower clamp for per-connection push intervals; also the tick period of
// the streamer loop.
minAPIStreamInterval = 1 * time.Millisecond
// Upper clamp for per-connection push intervals and for pre-fetch values.
maxAPIStreamInterval = 10 * time.Second
// How long (ms) a tap event stays in the hub's recent-tap cache, measured
// from the most recent cache read that contained it (matches dashboard).
apiTapDisplayMinMs = 2000
)
// InputClientSample is one slave's cached accel + tap state on the master,
// serialized as one element of the "clients" array of an input message.
type InputClientSample struct {
// ClientID is the slave's client id as reported in the cache entry.
ClientID uint32 `json:"client_id"`
// Valid mirrors the accel sample's valid flag; X, Y, Z and AccelAgeMs are
// only filled in when it is true.
Valid bool `json:"valid"`
X int32 `json:"x,omitempty"`
Y int32 `json:"y,omitempty"`
Z int32 `json:"z,omitempty"`
// AccelAgeMs is the age of the accel sample as reported by the cache.
AccelAgeMs uint32 `json:"accel_age_ms,omitempty"`
// TapKind and TapAgeMs are set only while the hub holds a recent tap for
// this client; both are omitted from JSON when empty/zero.
TapKind string `json:"tap_kind,omitempty"`
TapAgeMs uint32 `json:"tap_age_ms,omitempty"`
}
// InputStreamMessage is sent to external WebSocket clients (hello + input samples).
// The hello message fills Serial, IntervalMs, PreFetchMs, TapDisplayMinMs,
// Commands and Note; input messages fill T, Success and either Clients or Error.
type InputStreamMessage struct {
Type string `json:"type"` // "hello" | "input"
Serial string `json:"serial_port,omitempty"`
IntervalMs int `json:"interval_ms,omitempty"`
PreFetchMs int `json:"pre_fetch_ms,omitempty"`
TapDisplayMinMs int `json:"tap_display_min_ms,omitempty"`
// Commands lists the command type names the connection accepts.
Commands []string `json:"commands,omitempty"`
Note string `json:"note,omitempty"`
T int64 `json:"t,omitempty"` // Unix nanoseconds
// Success carries omitempty, so a failed input message has no "success"
// key at all rather than "success": false.
Success bool `json:"success,omitempty"`
Clients []InputClientSample `json:"clients,omitempty"`
Error string `json:"error,omitempty"`
}
// StreamStatusMessage is the reply to set_stream / get_stream (this connection).
// It is also used as the generic error reply for invalid JSON and unknown
// command types.
type StreamStatusMessage struct {
Type string `json:"type"` // "stream_status"
// ReceiveInput reports whether input pushes are enabled on this connection.
ReceiveInput bool `json:"receive_input"`
// IntervalMs and PreFetch are the connection's settings in whole milliseconds.
IntervalMs int `json:"interval_ms"`
PreFetch int `json:"pre_fetch"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}
// InputStreamStatusMessage is the reply to set_input_stream / get_input_stream (slave).
// Its fields are copied one-to-one from an accelStreamAPIResponse.
type InputStreamStatusMessage struct {
Type string `json:"type"` // "input_stream_status"
ClientID uint32 `json:"client_id"`
Enabled bool `json:"enabled"`
Success bool `json:"success"`
// SlavesUpdated is omitted from JSON when zero.
SlavesUpdated uint32 `json:"slaves_updated,omitempty"`
Error string `json:"error,omitempty"`
}
// APIClientInfo is one registered slave (or slot) from CLIENT_INFO.
// It is the JSON projection of a pb.ClientInfo entry.
type APIClientInfo struct {
ID uint32 `json:"id"`
// MAC is the formatted MAC address string.
MAC string `json:"mac"`
Version uint32 `json:"version"`
Available bool `json:"available"`
Used bool `json:"used"`
// LastPing / LastSuccessPing are passed through unchanged from the
// protobuf message; their unit is not visible here.
LastPing uint32 `json:"last_ping"`
LastSuccessPing uint32 `json:"last_success_ping"`
// InputStream reflects the slave's accel-stream-enabled flag.
InputStream bool `json:"input_stream"`
TapNotifySingle bool `json:"tap_notify_single"`
TapNotifyDouble bool `json:"tap_notify_double"`
TapNotifyTriple bool `json:"tap_notify_triple"`
}
// ClientListMessage is the reply to list_clients.
// On failure only Type and Error are set; on success Clients holds one
// entry per client returned by the serial link.
type ClientListMessage struct {
Type string `json:"type"` // "client_list"
Success bool `json:"success"`
Clients []APIClientInfo `json:"clients,omitempty"`
Error string `json:"error,omitempty"`
}
// TapNotifyStatusMessage is the reply to set_tap_notify / get_tap_notify (slave).
// Its fields are copied one-to-one from a tapNotifyAPIResponse.
type TapNotifyStatusMessage struct {
Type string `json:"type"` // "tap_notify_status"
ClientID uint32 `json:"client_id"`
// Single / DoubleTap / Triple say which tap kinds are enabled.
Single bool `json:"single"`
DoubleTap bool `json:"double_tap"`
Triple bool `json:"triple"`
Success bool `json:"success"`
// SlavesUpdated is omitted from JSON when zero.
SlavesUpdated uint32 `json:"slaves_updated,omitempty"`
Error string `json:"error,omitempty"`
}
// accelWSCommand is the union of all fields any WebSocket command may carry.
// Pointer fields let the handler tell "absent" apart from false/zero so it
// can reject commands with missing required values.
type accelWSCommand struct {
// Type selects the command (e.g. "set_stream", "list_clients").
Type string `json:"type"`
// ClientID addresses one slave; 0 is treated as "not provided".
ClientID uint32 `json:"client_id"`
Enable *bool `json:"enable"`
IntervalMs *int `json:"interval_ms"`
PreFetch *int `json:"pre_fetch"`
Single *bool `json:"single"`
DoubleTap *bool `json:"double_tap"`
Triple *bool `json:"triple"`
// AllClients makes set_tap_notify apply to every slave instead of ClientID.
AllClients bool `json:"all_clients"`
}
// APIInfoResponse is the JSON document served for GET on "/", "/api/v1" and
// "/api/v1/"; it describes the API and its stream limits.
type APIInfoResponse struct {
Name string `json:"name"`
Version string `json:"version"`
SerialPort string `json:"serial_port"`
// WebSocket is the path of the WebSocket endpoint.
WebSocket string `json:"websocket"`
// All *Ms fields are whole milliseconds.
DefaultIntervalMs int `json:"default_interval_ms"`
DefaultPreFetchMs int `json:"default_pre_fetch_ms"`
MinIntervalMs int `json:"min_interval_ms"`
MaxIntervalMs int `json:"max_interval_ms"`
TapDisplayMinMs int `json:"tap_display_min_ms"`
Description string `json:"description"`
}
// cachedTapEvent is the hub's memory of the latest tap seen for one client.
type cachedTapEvent struct {
// kind is the non-empty label produced by tapKindLabelPB.
kind string
// shownAt is the wall-clock time of the cache read that stored this entry;
// it is refreshed every time a read still contains a tap for the client.
shownAt time.Time
// ageMs is the tap age reported by the cache at shownAt.
ageMs uint32
}
// wsSubscriber is the per-connection stream state kept by accelStreamHub.
// Its mutable fields are read and written under the hub's mutex.
type wsSubscriber struct {
conn *websocket.Conn
// receiveInput is false until the client enables it with set_stream.
receiveInput bool
// interval is the minimum spacing between input pushes to this connection.
interval time.Duration
// preFetch is how long before the next push the cache should be read.
preFetch time.Duration
// lastInputSent is zero until the first input message has been pushed.
lastInputSent time.Time
}
// pendingInputCache holds the outcome of one cache read until the streamer
// delivers it: either cache or readErr is set.
type pendingInputCache struct {
cache *pb.CacheStatusResponse
// readAt is the ticker timestamp of the tick on which the read was started.
readAt time.Time
readErr error
}
// accelStreamHub tracks the external WebSocket subscribers and the shared
// recent-tap cache used when building input messages.
type accelStreamHub struct {
// mu guards clients, the mutable fields of every wsSubscriber, and recentTaps.
mu sync.RWMutex
clients map[*websocket.Conn]*wsSubscriber
// defaultInterval / defaultPreFetch seed each new subscriber; they are set
// once at construction.
defaultInterval time.Duration
defaultPreFetch time.Duration
// configChanged has capacity 1 and is signalled (non-blocking) whenever a
// subscriber's settings change or a subscriber leaves.
configChanged chan struct{}
// recentTaps maps client id to its latest tap; it is created lazily and
// reset to nil when the cache is cleared.
recentTaps map[uint32]cachedTapEvent
}
// newAccelStreamHub returns an empty hub whose new subscribers start with
// defaultInterval as push interval and defaultPreFetchMs as pre-fetch lead.
// recentTaps is left nil; it is allocated on first tap ingest.
func newAccelStreamHub(defaultInterval time.Duration) *accelStreamHub {
	hub := new(accelStreamHub)
	hub.clients = make(map[*websocket.Conn]*wsSubscriber)
	hub.defaultInterval = defaultInterval
	hub.defaultPreFetch = defaultPreFetchMs * time.Millisecond
	// Capacity 1: one queued wake-up is enough to coalesce bursts of changes.
	hub.configChanged = make(chan struct{}, 1)
	return hub
}
// notifyConfigChanged wakes the streamer loop without ever blocking: if a
// wake-up is already queued on configChanged the new one is dropped.
func (h *accelStreamHub) notifyConfigChanged() {
	var signal struct{}
	select {
	case h.configChanged <- signal:
		// Wake-up queued.
	default:
		// A wake-up is already pending; nothing to do.
	}
}
// clampAPIInterval limits d to the closed range
// [minAPIStreamInterval, maxAPIStreamInterval].
func clampAPIInterval(d time.Duration) time.Duration {
	switch {
	case d < minAPIStreamInterval:
		return minAPIStreamInterval
	case d > maxAPIStreamInterval:
		return maxAPIStreamInterval
	default:
		return d
	}
}
// clampPreFetch limits d to the closed range [0, maxAPIStreamInterval];
// negative lead times become 0.
func clampPreFetch(d time.Duration) time.Duration {
	switch {
	case d < 0:
		return 0
	case d > maxAPIStreamInterval:
		return maxAPIStreamInterval
	default:
		return d
	}
}
// register adds conn to the hub with input pushes disabled and the hub's
// default interval / pre-fetch, then sends the "hello" message describing
// the serial port, defaults and supported commands. The hello write is
// best-effort: marshal or write failures are ignored and the subscriber is
// returned regardless.
func (h *accelStreamHub) register(conn *websocket.Conn, portName string) *wsSubscriber {
	sub := new(wsSubscriber)
	sub.conn = conn
	sub.interval = h.defaultInterval
	sub.preFetch = h.defaultPreFetch

	h.mu.Lock()
	h.clients[conn] = sub
	h.mu.Unlock()

	supported := []string{
		"list_clients",
		"set_stream", "get_stream",
		"set_input_stream", "get_input_stream",
		"set_tap_notify", "get_tap_notify",
		"set_led_ring", "get_battery",
	}
	greeting := InputStreamMessage{
		Type:            "hello",
		Serial:          portName,
		IntervalMs:      int(h.defaultInterval / time.Millisecond),
		PreFetchMs:      int(h.defaultPreFetch / time.Millisecond),
		TapDisplayMinMs: apiTapDisplayMinMs,
		Note:            "set_tap_notify configures slave S/D/T only; set_stream enables input polling/push on this connection",
		Commands:        supported,
	}
	payload, err := json.Marshal(greeting)
	if err != nil {
		return sub
	}
	_ = conn.WriteMessage(websocket.TextMessage, payload)
	return sub
}
// unregister removes conn from the hub. If no remaining subscriber has
// input enabled, the recent-tap cache is dropped. The streamer is always
// notified afterwards, outside the lock.
func (h *accelStreamHub) unregister(conn *websocket.Conn) {
	func() {
		h.mu.Lock()
		defer h.mu.Unlock()
		delete(h.clients, conn)
		for _, remaining := range h.clients {
			if remaining.receiveInput {
				// Someone still streams: keep the tap cache.
				return
			}
		}
		h.recentTaps = nil
	}()
	h.notifyConfigChanged()
}
// anyWantsInput reports whether at least one registered subscriber has
// input pushes enabled.
func (h *accelStreamHub) anyWantsInput() bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	wanted := false
	for _, sub := range h.clients {
		if wanted = sub.receiveInput; wanted {
			break
		}
	}
	return wanted
}
// minWantedInterval returns the shortest push interval among subscribers
// with input enabled. Zero doubles as the "nothing found yet" marker, so
// when no subscriber qualifies the hub's default interval is returned.
func (h *accelStreamHub) minWantedInterval() time.Duration {
	h.mu.RLock()
	defer h.mu.RUnlock()
	shortest := time.Duration(0)
	for _, sub := range h.clients {
		if sub.receiveInput && (shortest == 0 || sub.interval < shortest) {
			shortest = sub.interval
		}
	}
	if shortest != 0 {
		return shortest
	}
	return h.defaultInterval
}
// setStream applies a set_stream command to sub and returns the resulting
// status.
//
// enable turns input pushes on or off for this connection. intervalMs and
// preFetchMs are optional (nil leaves the current value); when given they
// are interpreted as milliseconds and clamped with clampAPIInterval /
// clampPreFetch. The streamer is notified after the lock is released.
//
// When a connection disables its stream the shared recent-tap cache is
// dropped only if no other subscriber still has input enabled — the same
// rule unregister uses — so one client switching off does not erase the
// tap display hold of the others.
func (h *accelStreamHub) setStream(sub *wsSubscriber, enable bool, intervalMs, preFetchMs *int) StreamStatusMessage {
	h.mu.Lock()
	sub.receiveInput = enable
	if !enable {
		stillWanted := false
		for _, other := range h.clients {
			if other.receiveInput {
				stillWanted = true
				break
			}
		}
		if !stillWanted {
			h.recentTaps = nil
		}
	}
	if intervalMs != nil {
		sub.interval = clampAPIInterval(time.Duration(*intervalMs) * time.Millisecond)
	}
	if preFetchMs != nil {
		sub.preFetch = clampPreFetch(time.Duration(*preFetchMs) * time.Millisecond)
	}
	// Snapshot the effective values while still holding the lock.
	ms := int(sub.interval / time.Millisecond)
	pf := int(sub.preFetch / time.Millisecond)
	h.mu.Unlock()

	h.notifyConfigChanged()
	return StreamStatusMessage{
		Type:         "stream_status",
		ReceiveInput: enable,
		IntervalMs:   ms,
		PreFetch:     pf,
		Success:      true,
	}
}
// getStream reports sub's current stream settings (enabled flag, interval
// and pre-fetch in whole milliseconds) without changing anything.
func (h *accelStreamHub) getStream(sub *wsSubscriber) StreamStatusMessage {
	var status StreamStatusMessage
	status.Type = "stream_status"
	status.Success = true

	h.mu.RLock()
	status.ReceiveInput = sub.receiveInput
	status.IntervalMs = int(sub.interval / time.Millisecond)
	status.PreFetch = int(sub.preFetch / time.Millisecond)
	h.mu.RUnlock()

	return status
}
// streamTiming decides what the streamer should do at time now.
//
// For every subscriber with input enabled, the next push is due at
// lastInputSent+interval and the cache read is due preFetch earlier; a
// subscriber that has never been pushed to is due for both immediately.
// needRead / needDeliver are true when any subscriber is due for a read /
// a push, and waitPreFetch is the largest preFetch among subscribers that
// are due for a push.
func (h *accelStreamHub) streamTiming(now time.Time) (needRead, needDeliver bool, waitPreFetch time.Duration) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for _, sub := range h.clients {
		if !sub.receiveInput {
			continue
		}
		// Never pushed yet: both read and push are due right away.
		readDue, pushDue := true, true
		if !sub.lastInputSent.IsZero() {
			pushAt := sub.lastInputSent.Add(sub.interval)
			readDue = !now.Before(pushAt.Add(-sub.preFetch))
			pushDue = !now.Before(pushAt)
		}
		if readDue {
			needRead = true
		}
		if pushDue {
			needDeliver = true
			if sub.preFetch > waitPreFetch {
				waitPreFetch = sub.preFetch
			}
		}
	}
	return needRead, needDeliver, waitPreFetch
}
// ingestTapFromCache records, for every client in cache that carries a tap
// with a non-empty kind label, the tap kind, its reported age and the
// current wall-clock time, then prunes entries older than the display
// window. A nil cache is ignored. Takes the hub write lock.
func (h *accelStreamHub) ingestTapFromCache(cache *pb.CacheStatusResponse) {
	if cache == nil {
		return
	}
	h.mu.Lock()
	defer h.mu.Unlock()

	seenAt := time.Now()
	if h.recentTaps == nil {
		h.recentTaps = make(map[uint32]cachedTapEvent)
	}
	for _, client := range cache.GetClients() {
		tap := client.GetTap()
		if tap == nil {
			continue
		}
		if label := tapKindLabelPB(tap.GetKind()); label != "" {
			h.recentTaps[client.GetClientId()] = cachedTapEvent{
				kind:    label,
				shownAt: seenAt,
				ageMs:   tap.GetAgeMs(),
			}
		}
	}
	h.pruneRecentTapsLocked(seenAt)
}
// pruneRecentTapsLocked deletes every recent-tap entry whose shownAt lies
// more than apiTapDisplayMinMs milliseconds before now. It mutates
// recentTaps, so the caller must hold h.mu for writing.
func (h *accelStreamHub) pruneRecentTapsLocked(now time.Time) {
	oldestKept := now.Add(-apiTapDisplayMinMs * time.Millisecond)
	// Ranging over a nil or empty map is a no-op, so no emptiness check is needed.
	for clientID, tap := range h.recentTaps {
		if tap.shownAt.Before(oldestKept) {
			delete(h.recentTaps, clientID)
		}
	}
}
// inputClientsFromCacheLocked converts cache into the per-client samples of
// an input message, as of time now.
//
// Accel fields are copied only when the cached accel sample is valid. Tap
// fields are filled when the hub holds a recent tap for the client: if the
// cache still carries a tap of the same kind its reported age is used,
// otherwise the age is extrapolated from the stored age plus the time
// elapsed since the tap was stored.
//
// The function first prunes expired taps, which deletes from recentTaps,
// so the caller must hold h.mu for WRITING (a read lock is not enough).
func (h *accelStreamHub) inputClientsFromCacheLocked(cache *pb.CacheStatusResponse, now time.Time) []InputClientSample {
	h.pruneRecentTapsLocked(now)
	out := make([]InputClientSample, 0, len(cache.GetClients()))
	for _, c := range cache.GetClients() {
		sample := InputClientSample{
			ClientID: c.GetClientId(),
		}
		if a := c.GetAccel(); a != nil && a.GetValid() {
			sample.Valid = true
			sample.X = a.GetX()
			sample.Y = a.GetY()
			sample.Z = a.GetZ()
			sample.AccelAgeMs = a.GetAgeMs()
		}
		if ev, ok := h.recentTaps[c.GetClientId()]; ok {
			sample.TapKind = ev.kind
			if t := c.GetTap(); t != nil && tapKindLabelPB(t.GetKind()) == ev.kind {
				sample.TapAgeMs = t.GetAgeMs()
			} else {
				// now is supplied by the caller and may be slightly earlier
				// than shownAt (which is taken from time.Now() at ingest).
				// Clamp so a negative span does not wrap to a huge uint32.
				elapsed := now.Sub(ev.shownAt).Milliseconds()
				if elapsed < 0 {
					elapsed = 0
				}
				sample.TapAgeMs = ev.ageMs + uint32(elapsed)
			}
		}
		out = append(out, sample)
	}
	return out
}
// deliverInput marshals msg once and writes it to every subscriber that has
// input enabled and whose interval has elapsed since its last push (or that
// has never been pushed to). lastInputSent is stamped before the write; a
// subscriber whose write fails is removed from the hub and its connection
// closed. If msg cannot be marshalled nothing is sent.
//
// NOTE(review): the writes happen while the hub write lock is held, and
// command replies are written to the same connections from other
// goroutines without that lock. gorilla/websocket allows only one
// concurrent writer per connection — confirm whether a per-connection
// write lock is needed.
func (h *accelStreamHub) deliverInput(msg InputStreamMessage) {
	payload, err := json.Marshal(msg)
	if err != nil {
		return
	}
	sentAt := time.Now()

	h.mu.Lock()
	defer h.mu.Unlock()
	for conn, sub := range h.clients {
		due := sub.lastInputSent.IsZero() || sentAt.Sub(sub.lastInputSent) >= sub.interval
		if !sub.receiveInput || !due {
			continue
		}
		sub.lastInputSent = sentAt
		if conn.WriteMessage(websocket.TextMessage, payload) == nil {
			continue
		}
		// Write failed: drop the subscriber (deleting during range is safe in Go).
		delete(h.clients, conn)
		_ = conn.Close()
	}
}
// runInputStreamer is the polling/push loop behind the external input
// stream. It runs until stop is signalled.
//
// Every minAPIStreamInterval tick it:
//  1. does nothing (and drops any pending read) unless some subscriber
//     wants input and input polling is active;
//  2. asks the hub whether a cache read and/or a push is due;
//  3. reads the cache over the serial link when a read is due and no result
//     is pending, feeding any taps into the hub;
//  4. when a push is due, delivers either the read error (with errUARTBusy
//     mapped to "uart busy") or, once the pre-fetch wait has elapsed since
//     the read, the converted client samples.
//
// A signal on hub.configChanged discards the pending read so the next tick
// re-evaluates with the new settings.
func runInputStreamer(link *managedSerial, hub *accelStreamHub, dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl, stop <-chan struct{}) {
	ticker := time.NewTicker(minAPIStreamInterval)
	defer ticker.Stop()

	var pending *pendingInputCache
	for {
		select {
		case <-stop:
			return
		case <-hub.configChanged:
			pending = nil
		case now := <-ticker.C:
			if !hub.anyWantsInput() || !inputPollingActive(dash, ctl, tapCtl) {
				pending = nil
				continue
			}
			needRead, needDeliver, waitPreFetch := hub.streamTiming(now)
			if needRead && pending == nil {
				cache, err := link.readCacheStatusPoll()
				if err != nil {
					pending = &pendingInputCache{readErr: err, readAt: now}
				} else {
					hub.ingestTapFromCache(cache)
					pending = &pendingInputCache{cache: cache, readAt: now}
				}
			}
			if !needDeliver || pending == nil {
				continue
			}
			ts := now.UnixNano()
			if pending.readErr != nil {
				errMsg := pending.readErr.Error()
				if errors.Is(pending.readErr, errUARTBusy) {
					errMsg = "uart busy"
				}
				hub.deliverInput(InputStreamMessage{
					Type:    "input",
					T:       ts,
					Success: false,
					Error:   errMsg,
				})
				pending = nil
				continue
			}
			if pending.cache != nil && now.Sub(pending.readAt) >= waitPreFetch {
				// inputClientsFromCacheLocked prunes (deletes from) the
				// recent-tap map, so the write lock is required here; a
				// read lock would allow a map write alongside other readers.
				hub.mu.Lock()
				clients := hub.inputClientsFromCacheLocked(pending.cache, now)
				hub.mu.Unlock()
				hub.deliverInput(InputStreamMessage{
					Type:    "input",
					T:       ts,
					Success: true,
					Clients: clients,
				})
				pending = nil
			}
		}
	}
}
// inputPollingActive reports whether anything currently calls for input
// polling: the accel-stream controller, the tap-notify controller, or the
// dashboard hub. Each source is optional (nil is skipped) and they are
// consulted in that order, stopping at the first that says yes.
func inputPollingActive(dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl) bool {
	switch {
	case ctl != nil && ctl.Any():
		return true
	case tapCtl != nil && tapCtl.Any():
		return true
	case dash != nil:
		return dash.anyAccelStreamEnabled()
	default:
		return false
	}
}
// writeStreamStatus sends msg to conn as a JSON text frame. It is
// best-effort: marshal and write errors are ignored.
func writeStreamStatus(conn *websocket.Conn, msg StreamStatusMessage) {
	if payload, err := json.Marshal(msg); err == nil {
		_ = conn.WriteMessage(websocket.TextMessage, payload)
	}
}
// writeBatteryStatus stamps out with type "battery_status" and sends it to
// conn as a JSON text frame. out is a copy, so the caller's value is not
// modified. Marshal and write errors are ignored.
func writeBatteryStatus(conn *websocket.Conn, out batteryAPIResponse) {
	out.Type = "battery_status"
	if payload, err := json.Marshal(out); err == nil {
		_ = conn.WriteMessage(websocket.TextMessage, payload)
	}
}
// writeLedRingStatus stamps out with type "led_ring_status" and sends it to
// conn as a JSON text frame. out is a copy, so the caller's value is not
// modified. Marshal and write errors are ignored.
func writeLedRingStatus(conn *websocket.Conn, out ledRingAPIResponse) {
	out.Type = "led_ring_status"
	if payload, err := json.Marshal(out); err == nil {
		_ = conn.WriteMessage(websocket.TextMessage, payload)
	}
}
// writeInputStreamStatus converts out into an "input_stream_status" message
// and sends it to conn as a JSON text frame. Marshal and write errors are
// ignored.
func writeInputStreamStatus(conn *websocket.Conn, out accelStreamAPIResponse) {
	var reply InputStreamStatusMessage
	reply.Type = "input_stream_status"
	reply.ClientID = out.ClientID
	reply.Enabled = out.Enabled
	reply.Success = out.Success
	reply.SlavesUpdated = out.SlavesUpdated
	reply.Error = out.Error

	if payload, err := json.Marshal(reply); err == nil {
		_ = conn.WriteMessage(websocket.TextMessage, payload)
	}
}
// clientInfoToAPI maps a protobuf ClientInfo onto its JSON-facing form.
// The MAC is rendered with formatMAC; every other field is copied through
// the corresponding getter.
func clientInfoToAPI(c *pb.ClientInfo) APIClientInfo {
	var info APIClientInfo
	info.ID = c.GetId()
	info.MAC = formatMAC(c.GetMac())
	info.Version = c.GetVersion()
	info.Available = c.GetAvailable()
	info.Used = c.GetUsed()
	info.LastPing = c.GetLastPing()
	info.LastSuccessPing = c.GetLastSuccessPing()
	info.InputStream = c.GetAccelStreamEnabled()
	info.TapNotifySingle = c.GetTapNotifySingle()
	info.TapNotifyDouble = c.GetTapNotifyDouble()
	info.TapNotifyTriple = c.GetTapNotifyTriple()
	return info
}
// writeClientList sends msg to conn as a JSON text frame. It is
// best-effort: marshal and write errors are ignored.
func writeClientList(conn *websocket.Conn, msg ClientListMessage) {
	if payload, err := json.Marshal(msg); err == nil {
		_ = conn.WriteMessage(websocket.TextMessage, payload)
	}
}
// writeTapNotifyStatus converts out into a "tap_notify_status" message and
// sends it to conn as a JSON text frame. Marshal and write errors are
// ignored.
func writeTapNotifyStatus(conn *websocket.Conn, out tapNotifyAPIResponse) {
	var reply TapNotifyStatusMessage
	reply.Type = "tap_notify_status"
	reply.ClientID = out.ClientID
	reply.Single = out.Single
	reply.DoubleTap = out.DoubleTap
	reply.Triple = out.Triple
	reply.Success = out.Success
	reply.SlavesUpdated = out.SlavesUpdated
	reply.Error = out.Error

	if payload, err := json.Marshal(reply); err == nil {
		_ = conn.WriteMessage(websocket.TextMessage, payload)
	}
}
// applyTapNotifyClientWS writes the tap-notify configuration (single /
// double / triple) for one slave over the serial link and returns the
// outcome for the WebSocket reply.
//
// On a link error the response carries only the client id and the error
// text. When the device reports success, the requested values are also
// recorded in tapCtl and patched into the dashboard hub (each optional).
// The returned flags are the ones echoed back by the device.
func applyTapNotifyClientWS(link *managedSerial, dash *wsHub, tapCtl *tapNotifyCtl, clientID uint32, single, doubleTap, triple bool) tapNotifyAPIResponse {
	req := &pb.TapNotifyRequest{
		Write:     true,
		ClientId:  clientID,
		Single:    single,
		DoubleTap: doubleTap,
		Triple:    triple,
	}
	resp, err := link.TapNotify(req)
	if err != nil {
		return tapNotifyAPIResponse{ClientID: clientID, Error: err.Error()}
	}

	accepted := resp.GetSuccess()
	if accepted && tapCtl != nil {
		tapCtl.Set(clientID, single, doubleTap, triple)
	}
	if accepted && dash != nil {
		dash.patchClientTapNotify(clientID, single, doubleTap, triple)
	}

	return tapNotifyAPIResponse{
		ClientID:      resp.GetClientId(),
		Success:       accepted,
		SlavesUpdated: resp.GetSlavesUpdated(),
		Single:        resp.GetSingle(),
		DoubleTap:     resp.GetDoubleTap(),
		Triple:        resp.GetTriple(),
	}
}
// handleAccelWSCommand decodes one WebSocket text message as a command and
// writes exactly one reply to conn.
//
// Supported command types and their reply types:
//   - list_clients                          -> client_list
//   - set_stream / get_stream               -> stream_status (this connection)
//   - set_input_stream / get_input_stream   -> input_stream_status (one slave)
//   - set_tap_notify / get_tap_notify       -> tap_notify_status (one slave, or
//     all slaves when all_clients is set on set_tap_notify)
//   - set_led_ring                          -> led_ring_status
//   - get_battery                           -> battery_status
//
// Invalid JSON and unknown types are answered with a stream_status error.
// Missing required fields are answered with an error in the command's own
// reply type. client_id 0 is treated as "not provided".
//
// The get_* commands refresh the local ctl / tapCtl cache from the device
// reply only when the device reports success, matching the write path
// (applyTapNotifyClientWS), so a failed query cannot overwrite known state.
func handleAccelWSCommand(conn *websocket.Conn, sub *wsSubscriber, data []byte, link *managedSerial, dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl, hub *accelStreamHub) {
	var cmd accelWSCommand
	if err := json.Unmarshal(data, &cmd); err != nil {
		writeStreamStatus(conn, StreamStatusMessage{Type: "stream_status", Error: "invalid JSON"})
		return
	}
	switch cmd.Type {
	case "list_clients":
		clients, err := link.listClientsPoll()
		if err != nil {
			writeClientList(conn, ClientListMessage{
				Type:  "client_list",
				Error: err.Error(),
			})
			return
		}
		out := make([]APIClientInfo, 0, len(clients))
		for _, c := range clients {
			out = append(out, clientInfoToAPI(c))
		}
		writeClientList(conn, ClientListMessage{
			Type:    "client_list",
			Success: true,
			Clients: out,
		})
	case "set_stream":
		if cmd.Enable == nil {
			writeStreamStatus(conn, StreamStatusMessage{
				Type:  "stream_status",
				Error: "enable required",
			})
			return
		}
		writeStreamStatus(conn, hub.setStream(sub, *cmd.Enable, cmd.IntervalMs, cmd.PreFetch))
	case "get_stream":
		writeStreamStatus(conn, hub.getStream(sub))
	case "set_input_stream":
		if cmd.ClientID == 0 {
			writeInputStreamStatus(conn, accelStreamAPIResponse{Error: "client_id required"})
			return
		}
		if cmd.Enable == nil {
			writeInputStreamStatus(conn, accelStreamAPIResponse{
				ClientID: cmd.ClientID,
				Error:    "enable required",
			})
			return
		}
		writeInputStreamStatus(conn, applyAccelStreamClient(link, dash, ctl, cmd.ClientID, *cmd.Enable))
	case "get_input_stream":
		if cmd.ClientID == 0 {
			writeInputStreamStatus(conn, accelStreamAPIResponse{Error: "client_id required"})
			return
		}
		resp, err := link.AccelStreamPoll(&pb.AccelStreamRequest{
			Write:    false,
			ClientId: cmd.ClientID,
		})
		if err != nil {
			writeInputStreamStatus(conn, accelStreamAPIResponse{
				ClientID: cmd.ClientID,
				Error:    err.Error(),
			})
			return
		}
		// Only trust the reported state when the device says the query succeeded.
		if ctl != nil && resp.GetSuccess() {
			ctl.Set(cmd.ClientID, resp.GetEnabled())
		}
		writeInputStreamStatus(conn, accelStreamAPIResponse{
			Enabled:  resp.GetEnabled(),
			ClientID: resp.GetClientId(),
			Success:  resp.GetSuccess(),
		})
	case "set_tap_notify":
		if cmd.AllClients {
			if cmd.Single == nil || cmd.DoubleTap == nil || cmd.Triple == nil {
				writeTapNotifyStatus(conn, tapNotifyAPIResponse{Error: "single, double_tap, triple required"})
				return
			}
			updated, err := applyTapNotifyAll(link, dash, tapCtl, *cmd.Single, *cmd.DoubleTap, *cmd.Triple)
			if err != nil {
				writeTapNotifyStatus(conn, tapNotifyAPIResponse{Error: err.Error()})
				return
			}
			writeTapNotifyStatus(conn, tapNotifyAPIResponse{
				Success:       updated > 0,
				SlavesUpdated: updated,
				Single:        *cmd.Single,
				DoubleTap:     *cmd.DoubleTap,
				Triple:        *cmd.Triple,
			})
			return
		}
		if cmd.ClientID == 0 {
			writeTapNotifyStatus(conn, tapNotifyAPIResponse{Error: "client_id required"})
			return
		}
		if cmd.Single == nil || cmd.DoubleTap == nil || cmd.Triple == nil {
			writeTapNotifyStatus(conn, tapNotifyAPIResponse{
				ClientID: cmd.ClientID,
				Error:    "single, double_tap, triple required",
			})
			return
		}
		writeTapNotifyStatus(conn, applyTapNotifyClientWS(link, dash, tapCtl, cmd.ClientID, *cmd.Single, *cmd.DoubleTap, *cmd.Triple))
	case "get_tap_notify":
		if cmd.ClientID == 0 {
			writeTapNotifyStatus(conn, tapNotifyAPIResponse{Error: "client_id required"})
			return
		}
		resp, err := link.TapNotifyPoll(&pb.TapNotifyRequest{
			Write:    false,
			ClientId: cmd.ClientID,
		})
		if err != nil {
			writeTapNotifyStatus(conn, tapNotifyAPIResponse{
				ClientID: cmd.ClientID,
				Error:    err.Error(),
			})
			return
		}
		// Only trust the reported state when the device says the query succeeded.
		if tapCtl != nil && resp.GetSuccess() {
			tapCtl.Set(cmd.ClientID, resp.GetSingle(), resp.GetDoubleTap(), resp.GetTriple())
		}
		writeTapNotifyStatus(conn, tapNotifyAPIResponse{
			ClientID:  cmd.ClientID,
			Success:   resp.GetSuccess(),
			Single:    resp.GetSingle(),
			DoubleTap: resp.GetDoubleTap(),
			Triple:    resp.GetTriple(),
		})
	case "set_led_ring":
		// The LED-ring request has its own schema, so decode the raw message again.
		var body ledRingAPIRequest
		if err := json.Unmarshal(data, &body); err != nil {
			writeLedRingStatus(conn, ledRingAPIResponse{Error: "invalid JSON"})
			return
		}
		if body.Mode == "" {
			writeLedRingStatus(conn, ledRingAPIResponse{Error: "mode required"})
			return
		}
		writeLedRingStatus(conn, applyLedRing(link, body))
	case "get_battery":
		var body batteryAPIRequest
		if err := json.Unmarshal(data, &body); err != nil {
			writeBatteryStatus(conn, batteryAPIResponse{Error: "invalid JSON"})
			return
		}
		// No explicit target means "all clients".
		if !body.AllClients && body.ClientID == 0 {
			body.AllClients = true
		}
		writeBatteryStatus(conn, applyBatteryStatus(link, body))
	default:
		writeStreamStatus(conn, StreamStatusMessage{
			Type:  "stream_status",
			Error: "unknown type (list_clients, set_stream, get_stream, set_input_stream, get_input_stream, set_tap_notify, get_tap_notify, set_led_ring, get_battery)",
		})
	}
}
// serveExternalWS runs one external WebSocket connection: it registers conn
// with the hub (which sends the hello message), then handles incoming
// messages one at a time until a read fails. On exit the connection is
// closed first and then unregistered from the hub.
func serveExternalWS(conn *websocket.Conn, link *managedSerial, dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl, portName string, hub *accelStreamHub) {
	sub := hub.register(conn, portName)
	// Deferred calls run in reverse order: Close, then unregister.
	defer hub.unregister(conn)
	defer conn.Close()

	_, payload, err := conn.ReadMessage()
	for ; err == nil; _, payload, err = conn.ReadMessage() {
		handleAccelWSCommand(conn, sub, payload, link, dash, ctl, tapCtl, hub)
	}
}
// mountExternalAPI registers the external API on mux:
//
//   - "/" answers GET on exactly "/", "/api/v1" and "/api/v1/" with an
//     APIInfoResponse; other paths get 404 and other methods 405.
//   - "/ws" upgrades to a WebSocket and serves it with serveExternalWS;
//     upgrade failures are logged.
func mountExternalAPI(mux *http.ServeMux, portName string, defaultInterval time.Duration, hub *accelStreamHub, link *managedSerial, dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl) {
	// Every field is fixed for the lifetime of the mount, so build it once.
	info := APIInfoResponse{
		Name:              "powerpod-external-api",
		Version:           "1",
		SerialPort:        portName,
		WebSocket:         "/ws",
		DefaultIntervalMs: int(defaultInterval / time.Millisecond),
		DefaultPreFetchMs: defaultPreFetchMs,
		MinIntervalMs:     int(minAPIStreamInterval / time.Millisecond),
		MaxIntervalMs:     int(maxAPIStreamInterval / time.Millisecond),
		TapDisplayMinMs:   apiTapDisplayMinMs,
		Description:       "WebSocket: set_input_stream + set_stream for input (accel + tap); set_tap_notify configures slave tap kinds",
	}

	serveInfo := func(w http.ResponseWriter, r *http.Request) {
		// "/" is a catch-all pattern, so filter the accepted paths by hand.
		switch r.URL.Path {
		case "/", "/api/v1", "/api/v1/":
		default:
			http.NotFound(w, r)
			return
		}
		if r.Method != http.MethodGet {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		writeJSON(w, http.StatusOK, info)
	}

	serveWS := func(w http.ResponseWriter, r *http.Request) {
		conn, err := wsUpgrader.Upgrade(w, r, nil)
		if err != nil {
			log.Printf("api websocket upgrade: %v", err)
			return
		}
		serveExternalWS(conn, link, dash, ctl, tapCtl, portName, hub)
	}

	mux.HandleFunc("/", serveInfo)
	mux.HandleFunc("/ws", serveWS)
}
// runAPIServer starts the external API and returns its http.Server so the
// caller can shut it down later.
//
// It creates the stream hub, starts the input streamer goroutine (which
// ends when stop is signalled), mounts the info/WebSocket, LED-ring and
// battery handlers, and starts listening on addr in a second goroutine.
// Listen errors other than http.ErrServerClosed are logged, not returned.
//
// ReadHeaderTimeout bounds how long a client may take to send its request
// headers, so idle or deliberately slow connections cannot pin server
// goroutines indefinitely. It covers only the header phase, so established
// WebSocket connections are not affected.
func runAPIServer(portName string, link *managedSerial, addr string, defaultInterval time.Duration, dash *wsHub, ctl *accelStreamCtl, tapCtl *tapNotifyCtl, stop <-chan struct{}) *http.Server {
	const readHeaderTimeout = 10 * time.Second

	hub := newAccelStreamHub(defaultInterval)
	go runInputStreamer(link, hub, dash, ctl, tapCtl, stop)

	mux := http.NewServeMux()
	mountExternalAPI(mux, portName, defaultInterval, hub, link, dash, ctl, tapCtl)
	mountLedRingAPI(mux, link)
	mountBatteryAPI(mux, link)

	srv := &http.Server{
		Addr:              addr,
		Handler:           mux,
		ReadHeaderTimeout: readHeaderTimeout,
	}
	go func() {
		log.Printf("external API http://localhost%s WebSocket ws://localhost%s/ws (default stream interval %s, per-client via set_stream)",
			addr, addr, defaultInterval.String())
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("external API server: %v", err)
		}
	}()
	return srv
}
// shutdownAPIServer gracefully shuts srv down, waiting at most two seconds.
// A nil srv is a no-op and the Shutdown error is ignored.
//
// NOTE(review): per the net/http documentation, Shutdown does not close
// hijacked connections such as WebSockets; confirm those are closed
// elsewhere if that matters on exit.
func shutdownAPIServer(srv *http.Server) {
	if srv != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		_ = srv.Shutdown(ctx)
	}
}