add node heartbeat ttl offline reaper and enforce capability checks

This commit is contained in:
DBT
2026-02-24 16:20:43 +00:00
parent 07cf54538e
commit 04cbb22c57
5 changed files with 90 additions and 11 deletions

View File

@@ -7,6 +7,8 @@ import (
"time"
)
const defaultNodeTTL = 60 * time.Second
// Manager keeps paired node metadata and basic routing helpers.
type Handler func(req Request) Response
@@ -14,6 +16,7 @@ type Manager struct {
mu sync.RWMutex
nodes map[string]NodeInfo
handlers map[string]Handler
ttl time.Duration
}
var defaultManager = NewManager()
@@ -21,7 +24,9 @@ var defaultManager = NewManager()
func DefaultManager() *Manager { return defaultManager }
func NewManager() *Manager {
return &Manager{nodes: map[string]NodeInfo{}, handlers: map[string]Handler{}}
m := &Manager{nodes: map[string]NodeInfo{}, handlers: map[string]Handler{}, ttl: defaultNodeTTL}
go m.reaperLoop()
return m
}
func (m *Manager) Upsert(info NodeInfo) {
@@ -85,6 +90,27 @@ func (m *Manager) Invoke(req Request) (Response, bool) {
return resp, true
}
func (m *Manager) SupportsAction(nodeID, action string) bool {
n, ok := m.Get(nodeID)
if !ok || !n.Online {
return false
}
switch strings.ToLower(strings.TrimSpace(action)) {
case "run":
return n.Capabilities.Run
case "camera_snap", "camera_clip":
return n.Capabilities.Camera
case "screen_record", "screen_snapshot":
return n.Capabilities.Screen
case "location_get":
return n.Capabilities.Location
case "canvas_snapshot", "canvas_action":
return n.Capabilities.Canvas
default:
return n.Capabilities.Invoke
}
}
func (m *Manager) PickFor(action string) (NodeInfo, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
@@ -92,7 +118,7 @@ func (m *Manager) PickFor(action string) (NodeInfo, bool) {
if !n.Online {
continue
}
switch action {
switch strings.ToLower(strings.TrimSpace(action)) {
case "run":
if n.Capabilities.Run {
return n, true
@@ -101,7 +127,7 @@ func (m *Manager) PickFor(action string) (NodeInfo, bool) {
if n.Capabilities.Camera {
return n, true
}
case "screen_record":
case "screen_record", "screen_snapshot":
if n.Capabilities.Screen {
return n, true
}
@@ -121,3 +147,19 @@ func (m *Manager) PickFor(action string) (NodeInfo, bool) {
}
return NodeInfo{}, false
}
func (m *Manager) reaperLoop() {
t := time.NewTicker(15 * time.Second)
defer t.Stop()
for range t.C {
cutoff := time.Now().UTC().Add(-m.ttl)
m.mu.Lock()
for id, n := range m.nodes {
if n.Online && !n.LastSeenAt.IsZero() && n.LastSeenAt.Before(cutoff) {
n.Online = false
m.nodes[id] = n
}
}
m.mu.Unlock()
}
}

View File

@@ -37,6 +37,7 @@ func (s *RegistryServer) Start(ctx context.Context) error {
_, _ = w.Write([]byte("ok"))
})
mux.HandleFunc("/nodes/register", s.handleRegister)
mux.HandleFunc("/nodes/heartbeat", s.handleHeartbeat)
s.server = &http.Server{Addr: s.addr, Handler: mux}
go func() {
<-ctx.Done()
@@ -53,12 +54,9 @@ func (s *RegistryServer) handleRegister(w http.ResponseWriter, r *http.Request)
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
if s.token != "" {
auth := strings.TrimSpace(r.Header.Get("Authorization"))
if auth != "Bearer "+s.token {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
var n NodeInfo
if err := json.NewDecoder(r.Body).Decode(&n); err != nil {
@@ -73,3 +71,37 @@ func (s *RegistryServer) handleRegister(w http.ResponseWriter, r *http.Request)
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "id": n.ID})
}
func (s *RegistryServer) handleHeartbeat(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
var body struct{ ID string `json:"id"` }
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.ID) == "" {
http.Error(w, "id required", http.StatusBadRequest)
return
}
n, ok := s.mgr.Get(body.ID)
if !ok {
http.Error(w, "node not found", http.StatusNotFound)
return
}
n.LastSeenAt = time.Now().UTC()
n.Online = true
s.mgr.Upsert(n)
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "id": body.ID})
}
func (s *RegistryServer) checkAuth(r *http.Request) bool {
if s.token == "" {
return true
}
auth := strings.TrimSpace(r.Header.Get("Authorization"))
return auth == "Bearer "+s.token
}

View File

@@ -66,6 +66,9 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s
if nodeID == "" {
return "", fmt.Errorf("no eligible node found for action=%s", action)
}
if !t.manager.SupportsAction(nodeID, action) {
return "", fmt.Errorf("node %s does not support action=%s", nodeID, action)
}
if t.router == nil {
return "", fmt.Errorf("nodes transport router not configured")
}