From 04cbb22c57fc34ef98d6b4006d4768877efd8063 Mon Sep 17 00:00:00 2001 From: DBT Date: Tue, 24 Feb 2026 16:20:43 +0000 Subject: [PATCH] add node heartbeat ttl offline reaper and enforce capability checks --- README.md | 3 ++- README_EN.md | 3 ++- pkg/nodes/manager.go | 48 +++++++++++++++++++++++++++++++++--- pkg/nodes/registry_server.go | 44 ++++++++++++++++++++++++++++----- pkg/tools/nodes_tool.go | 3 +++ 5 files changed, 90 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 1a076a2..b33396e 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,8 @@ - `mode=auto|p2p|relay`:默认 `auto`(优先 p2p,失败回退 relay) - relay 已支持 HTTP 节点桥接:按 action 路由到 `/run` `/camera/snap` `/screen/record` `/location/get` `/canvas/*`(未知 action 回退 `/invoke`) - 主节点网关支持节点注册:`POST http://:/nodes/register` -- 可在 `gateway.token` 配置网关注册令牌;子节点注册需带 `Authorization: Bearer ` +- 支持节点续租:`POST /nodes/heartbeat`(配合 TTL 自动离线) +- 可在 `gateway.token` 配置网关注册令牌;子节点注册/续租需带 `Authorization: Bearer ` - 可在 `NodeInfo` 中配置 `token`,relay 会自动附加 `Authorization: Bearer ` - `nodes` 工具支持设备快捷参数:`facing`、`duration_ms`、`command` diff --git a/README_EN.md b/README_EN.md index fd4a0fb..ec29f44 100644 --- a/README_EN.md +++ b/README_EN.md @@ -44,7 +44,8 @@ A `nodes` tool control-plane PoC is now available: - `mode=auto|p2p|relay`: default `auto` (prefer p2p, fallback to relay) - relay now supports HTTP node bridging with action-specific routes: `/run`, `/camera/snap`, `/screen/record`, `/location/get`, `/canvas/*` (unknown action falls back to `/invoke`) - gateway supports node registration: `POST http://:/nodes/register` -- configure `gateway.token` as registration token; child nodes must send `Authorization: Bearer ` +- supports node lease renew: `POST /nodes/heartbeat` (TTL-based offline marking) +- configure `gateway.token` as registration token; child nodes must send `Authorization: Bearer ` for register/heartbeat - `NodeInfo.token` is supported; relay automatically sets `Authorization: Bearer ` - `nodes` tool supports device shortcuts: `facing`, `duration_ms`, `command` diff --git a/pkg/nodes/manager.go b/pkg/nodes/manager.go index 3a25a42..04e044f 100644 --- a/pkg/nodes/manager.go +++ b/pkg/nodes/manager.go @@ -7,6 +7,8 @@ import ( "time" ) +const defaultNodeTTL = 60 * time.Second + // Manager keeps paired node metadata and basic routing helpers. type Handler func(req Request) Response @@ -14,6 +16,7 @@ type Manager struct { mu sync.RWMutex nodes map[string]NodeInfo handlers map[string]Handler + ttl time.Duration } var defaultManager = NewManager() @@ -21,7 +24,9 @@ var defaultManager = NewManager() func DefaultManager() *Manager { return defaultManager } func NewManager() *Manager { - return &Manager{nodes: map[string]NodeInfo{}, handlers: map[string]Handler{}} + m := &Manager{nodes: map[string]NodeInfo{}, handlers: map[string]Handler{}, ttl: defaultNodeTTL} + go m.reaperLoop() + return m } func (m *Manager) Upsert(info NodeInfo) { @@ -85,6 +90,27 @@ func (m *Manager) Invoke(req Request) (Response, bool) { return resp, true } +func (m *Manager) SupportsAction(nodeID, action string) bool { + n, ok := m.Get(nodeID) + if !ok || !n.Online { + return false + } + switch strings.ToLower(strings.TrimSpace(action)) { + case "run": + return n.Capabilities.Run + case "camera_snap", "camera_clip": + return n.Capabilities.Camera + case "screen_record", "screen_snapshot": + return n.Capabilities.Screen + case "location_get": + return n.Capabilities.Location + case "canvas_snapshot", "canvas_action": + return n.Capabilities.Canvas + default: + return n.Capabilities.Invoke + } +} + func (m *Manager) PickFor(action string) (NodeInfo, bool) { m.mu.RLock() defer m.mu.RUnlock() @@ -92,7 +118,7 @@ func (m *Manager) PickFor(action string) (NodeInfo, bool) { if !n.Online { continue } - switch action { + switch strings.ToLower(strings.TrimSpace(action)) { case "run": if n.Capabilities.Run { return n, true @@ -101,7 +127,7 @@ func (m *Manager) PickFor(action string) (NodeInfo, bool) { if n.Capabilities.Camera { return n, true } - case "screen_record": + case "screen_record", "screen_snapshot": if n.Capabilities.Screen { return n, true } @@ -121,3 +147,19 @@ func (m *Manager) PickFor(action string) (NodeInfo, bool) { } return NodeInfo{}, false } + +func (m *Manager) reaperLoop() { + t := time.NewTicker(15 * time.Second) + defer t.Stop() + for range t.C { + cutoff := time.Now().UTC().Add(-m.ttl) + m.mu.Lock() + for id, n := range m.nodes { + if n.Online && !n.LastSeenAt.IsZero() && n.LastSeenAt.Before(cutoff) { + n.Online = false + m.nodes[id] = n + } + } + m.mu.Unlock() + } +} diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go index d6d1a0e..ab9dc3a 100644 --- a/pkg/nodes/registry_server.go +++ b/pkg/nodes/registry_server.go @@ -37,6 +37,7 @@ func (s *RegistryServer) Start(ctx context.Context) error { _, _ = w.Write([]byte("ok")) }) mux.HandleFunc("/nodes/register", s.handleRegister) + mux.HandleFunc("/nodes/heartbeat", s.handleHeartbeat) s.server = &http.Server{Addr: s.addr, Handler: mux} go func() { <-ctx.Done() @@ -53,12 +54,9 @@ func (s *RegistryServer) handleRegister(w http.ResponseWriter, r *http.Request) http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } - if s.token != "" { - auth := strings.TrimSpace(r.Header.Get("Authorization")) - if auth != "Bearer "+s.token { - http.Error(w, "unauthorized", http.StatusUnauthorized) - return - } + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return } var n NodeInfo if err := json.NewDecoder(r.Body).Decode(&n); err != nil { @@ -73,3 +71,37 @@ func (s *RegistryServer) handleRegister(w http.ResponseWriter, r *http.Request) w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "id": n.ID}) } + +func (s *RegistryServer) handleHeartbeat(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + var body struct{ ID string `json:"id"` } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.ID) == "" { + http.Error(w, "id required", http.StatusBadRequest) + return + } + n, ok := s.mgr.Get(body.ID) + if !ok { + http.Error(w, "node not found", http.StatusNotFound) + return + } + n.LastSeenAt = time.Now().UTC() + n.Online = true + s.mgr.Upsert(n) + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "id": body.ID}) +} + +func (s *RegistryServer) checkAuth(r *http.Request) bool { + if s.token == "" { + return true + } + auth := strings.TrimSpace(r.Header.Get("Authorization")) + return auth == "Bearer "+s.token +} diff --git a/pkg/tools/nodes_tool.go b/pkg/tools/nodes_tool.go index 339dd5b..7acadbf 100644 --- a/pkg/tools/nodes_tool.go +++ b/pkg/tools/nodes_tool.go @@ -66,6 +66,9 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s if nodeID == "" { return "", fmt.Errorf("no eligible node found for action=%s", action) } + if !t.manager.SupportsAction(nodeID, action) { + return "", fmt.Errorf("node %s does not support action=%s", nodeID, action) + } if t.router == nil { return "", fmt.Errorf("nodes transport router not configured") }