add phase-1 nodes control plane scaffold and register nodes tool

This commit is contained in:
DBT
2026-02-24 14:31:29 +00:00
parent 9ecb5c9b47
commit 7fa0e629e8
6 changed files with 225 additions and 0 deletions

View File

@@ -35,6 +35,18 @@
这些优化提升了高并发场景下的稳定性、可观测性与可维护性。
### 多节点 / 设备控制Phase-1
已新增 `nodes` 工具控制平面PoC
- `action=status|describe`:查看已配对节点状态与能力矩阵
- `action=run|invoke|camera_snap|screen_record|location_get`:已接入路由框架(下一阶段接数据平面传输)
实现位置:
- `pkg/nodes/types.go`
- `pkg/nodes/manager.go`
- `pkg/tools/nodes_tool.go`
### 并行任务冲突控制Autonomy
支持基于 `resource_keys` 的锁调度。任务可在内容中显式声明资源键,提升并行判冲突精度:

View File

@@ -35,6 +35,18 @@ A recent architecture pass leveraged core Go strengths:
These changes improve stability, observability, and maintainability under concurrency.
### Multi-node / device control (Phase-1)
A `nodes` tool control-plane PoC is now available:
- `action=status|describe`: inspect paired node status and capability matrix
- `action=run|invoke|camera_snap|screen_record|location_get`: routing framework is in place (data-plane bridge lands in next phase)
Implementation:
- `pkg/nodes/types.go`
- `pkg/nodes/manager.go`
- `pkg/tools/nodes_tool.go`
### Parallel task conflict control (Autonomy)
Autonomy now supports lock scheduling via `resource_keys`. You can explicitly declare keys in task text for precise conflict detection:

View File

@@ -19,6 +19,7 @@ import (
"clawgo/pkg/config"
"clawgo/pkg/cron"
"clawgo/pkg/logger"
"clawgo/pkg/nodes"
"clawgo/pkg/providers"
"clawgo/pkg/session"
"clawgo/pkg/tools"
@@ -76,6 +77,8 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
toolsRegistry.Register(tools.NewAliasTool("edit", "Edit file content (OpenClaw-compatible alias of edit_file)", tools.NewEditFileTool(workspace), map[string]string{"file_path": "path", "old_string": "oldText", "new_string": "newText"}))
toolsRegistry.Register(tools.NewExecTool(cfg.Tools.Shell, workspace, processManager))
toolsRegistry.Register(tools.NewProcessTool(processManager))
nodesManager := nodes.NewManager()
toolsRegistry.Register(tools.NewNodesTool(nodesManager))
if cs != nil {
toolsRegistry.Register(tools.NewRemindTool(cs))

89
pkg/nodes/manager.go Normal file
View File

@@ -0,0 +1,89 @@
package nodes
import (
"sort"
"sync"
"time"
)
// Manager keeps paired node metadata and basic routing helpers.
type Manager struct {
mu sync.RWMutex
nodes map[string]NodeInfo
}
func NewManager() *Manager {
return &Manager{nodes: map[string]NodeInfo{}}
}
func (m *Manager) Upsert(info NodeInfo) {
m.mu.Lock()
defer m.mu.Unlock()
info.LastSeenAt = time.Now().UTC()
info.Online = true
m.nodes[info.ID] = info
}
func (m *Manager) MarkOffline(id string) {
m.mu.Lock()
defer m.mu.Unlock()
if n, ok := m.nodes[id]; ok {
n.Online = false
m.nodes[id] = n
}
}
func (m *Manager) Get(id string) (NodeInfo, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
n, ok := m.nodes[id]
return n, ok
}
func (m *Manager) List() []NodeInfo {
m.mu.RLock()
defer m.mu.RUnlock()
out := make([]NodeInfo, 0, len(m.nodes))
for _, n := range m.nodes {
out = append(out, n)
}
sort.Slice(out, func(i, j int) bool { return out[i].LastSeenAt.After(out[j].LastSeenAt) })
return out
}
func (m *Manager) PickFor(action string) (NodeInfo, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, n := range m.nodes {
if !n.Online {
continue
}
switch action {
case "run":
if n.Capabilities.Run {
return n, true
}
case "camera_snap", "camera_clip":
if n.Capabilities.Camera {
return n, true
}
case "screen_record":
if n.Capabilities.Screen {
return n, true
}
case "location_get":
if n.Capabilities.Location {
return n, true
}
case "canvas_snapshot", "canvas_action":
if n.Capabilities.Canvas {
return n, true
}
default:
if n.Capabilities.Invoke {
return n, true
}
}
}
return NodeInfo{}, false
}

41
pkg/nodes/types.go Normal file
View File

@@ -0,0 +1,41 @@
package nodes
import "time"
// Capability matrix reported by each node agent.
type Capabilities struct {
Run bool `json:"run"`
Invoke bool `json:"invoke"`
Camera bool `json:"camera"`
Screen bool `json:"screen"`
Location bool `json:"location"`
Canvas bool `json:"canvas"`
}
// NodeInfo is the runtime descriptor for cross-device scheduling.
type NodeInfo struct {
ID string `json:"id"`
Name string `json:"name,omitempty"`
OS string `json:"os,omitempty"`
Arch string `json:"arch,omitempty"`
Version string `json:"version,omitempty"`
Capabilities Capabilities `json:"capabilities"`
LastSeenAt time.Time `json:"last_seen_at"`
Online bool `json:"online"`
}
// Envelope for node commands.
type Request struct {
Action string `json:"action"`
Node string `json:"node,omitempty"`
Args map[string]interface{} `json:"args,omitempty"`
}
// Envelope for node responses.
type Response struct {
OK bool `json:"ok"`
Error string `json:"error,omitempty"`
Node string `json:"node,omitempty"`
Action string `json:"action,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty"`
}

68
pkg/tools/nodes_tool.go Normal file
View File

@@ -0,0 +1,68 @@
package tools
import (
"context"
"encoding/json"
"fmt"
"strings"
"clawgo/pkg/nodes"
)
// NodesTool provides an OpenClaw-style control surface for paired nodes.
type NodesTool struct {
manager *nodes.Manager
}
func NewNodesTool(m *nodes.Manager) *NodesTool { return &NodesTool{manager: m} }
func (t *NodesTool) Name() string { return "nodes" }
func (t *NodesTool) Description() string {
return "Manage paired nodes (status/describe/run/invoke/camera/screen/location)."
}
func (t *NodesTool) Parameters() map[string]interface{} {
return map[string]interface{}{"type": "object", "properties": map[string]interface{}{
"action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|camera_snap|screen_record|location_get"},
"node": map[string]interface{}{"type": "string", "description": "target node id"},
"args": map[string]interface{}{"type": "object", "description": "action args"},
}, "required": []string{"action"}}
}
func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (string, error) {
_ = ctx
action, _ := args["action"].(string)
action = strings.TrimSpace(strings.ToLower(action))
if action == "" {
return "", fmt.Errorf("action is required")
}
nodeID, _ := args["node"].(string)
if t.manager == nil {
return "", fmt.Errorf("nodes manager not configured")
}
switch action {
case "status", "describe":
if nodeID != "" {
n, ok := t.manager.Get(nodeID)
if !ok {
return "", fmt.Errorf("node not found: %s", nodeID)
}
b, _ := json.Marshal(n)
return string(b), nil
}
b, _ := json.Marshal(t.manager.List())
return string(b), nil
default:
// Phase-1: control-plane exists, data-plane RPC bridge lands in next step.
if nodeID == "" {
if picked, ok := t.manager.PickFor(action); ok {
nodeID = picked.ID
}
}
if nodeID == "" {
return "", fmt.Errorf("no eligible node found for action=%s", action)
}
resp := nodes.Response{OK: false, Node: nodeID, Action: action, Error: "node transport bridge not implemented yet"}
b, _ := json.Marshal(resp)
return string(b), nil
}
}