Compare commits

...

5 Commits

8 changed files with 436 additions and 90 deletions

View File

@@ -40,7 +40,11 @@
已新增 `nodes` 工具控制平面PoC
- `action=status|describe`:查看已配对节点状态与能力矩阵
- `action=run|invoke|camera_snap|screen_record|location_get`:已接入路由框架(下一阶段接数据平面传输)
- `action=run|invoke|camera_snap|screen_record|location_get`:已接入路由框架
- `mode=auto|p2p|relay`:默认 `auto`(优先 p2p失败回退 relay
- relay 已支持 HTTP 节点桥接:按 action 路由到 `/run` `/camera/snap` `/screen/record` `/location/get` `/canvas/*`(未知 action 回退 `/invoke`
- 可在 `NodeInfo` 中配置 `token`relay 会自动附加 `Authorization: Bearer <token>`
- `nodes` 工具支持设备快捷参数:`facing``duration_ms``command`
实现位置:
- `pkg/nodes/types.go`

View File

@@ -40,7 +40,11 @@ These changes improve stability, observability, and maintainability under concur
A `nodes` tool control-plane PoC is now available:
- `action=status|describe`: inspect paired node status and capability matrix
- `action=run|invoke|camera_snap|screen_record|location_get`: routing framework is in place (data-plane bridge lands in next phase)
- `action=run|invoke|camera_snap|screen_record|location_get`: routing framework is in place
- `mode=auto|p2p|relay`: default `auto` (prefer p2p, fallback to relay)
- relay now supports HTTP node bridging with action-specific routes: `/run`, `/camera/snap`, `/screen/record`, `/location/get`, `/canvas/*` (unknown action falls back to `/invoke`)
- `NodeInfo.token` is supported; relay automatically sets `Authorization: Bearer <token>`
- `nodes` tool supports device shortcuts: `facing`, `duration_ms`, `command`
Implementation:
- `pkg/nodes/types.go`

View File

@@ -78,7 +78,27 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
toolsRegistry.Register(tools.NewExecTool(cfg.Tools.Shell, workspace, processManager))
toolsRegistry.Register(tools.NewProcessTool(processManager))
nodesManager := nodes.NewManager()
toolsRegistry.Register(tools.NewNodesTool(nodesManager))
nodesManager.Upsert(nodes.NodeInfo{ID: "local", Name: "local", Capabilities: nodes.Capabilities{Run: true, Invoke: true, Camera: true, Screen: true, Location: true, Canvas: true}, Online: true})
nodesManager.RegisterHandler("local", func(req nodes.Request) nodes.Response {
switch req.Action {
case "run":
payload := map[string]interface{}{"transport": "relay-local"}
if cmdRaw, ok := req.Args["command"].([]interface{}); ok && len(cmdRaw) > 0 {
parts := make([]string, 0, len(cmdRaw))
for _, x := range cmdRaw {
parts = append(parts, fmt.Sprint(x))
}
payload["command"] = parts
}
return nodes.Response{OK: true, Node: "local", Action: req.Action, Payload: payload}
case "camera_snap", "camera_clip", "screen_record", "screen_snapshot", "location_get", "canvas_snapshot", "canvas_action":
return nodes.Response{OK: true, Node: "local", Action: req.Action, Payload: map[string]interface{}{"transport": "relay-local", "simulated": true, "args": req.Args}}
default:
return nodes.Response{OK: true, Node: "local", Action: req.Action, Payload: map[string]interface{}{"echo": req.Args, "transport": "relay-local"}}
}
})
nodesRouter := &nodes.Router{P2P: &nodes.StubP2PTransport{}, Relay: &nodes.HTTPRelayTransport{Manager: nodesManager}}
toolsRegistry.Register(tools.NewNodesTool(nodesManager, nodesRouter))
if cs != nil {
toolsRegistry.Register(tools.NewRemindTool(cs))

View File

@@ -2,18 +2,22 @@ package nodes
import (
"sort"
"strings"
"sync"
"time"
)
// Manager keeps paired node metadata and basic routing helpers.
type Handler func(req Request) Response
type Manager struct {
mu sync.RWMutex
nodes map[string]NodeInfo
mu sync.RWMutex
nodes map[string]NodeInfo
handlers map[string]Handler
}
func NewManager() *Manager {
return &Manager{nodes: map[string]NodeInfo{}}
return &Manager{nodes: map[string]NodeInfo{}, handlers: map[string]Handler{}}
}
func (m *Manager) Upsert(info NodeInfo) {
@@ -51,6 +55,32 @@ func (m *Manager) List() []NodeInfo {
return out
}
func (m *Manager) RegisterHandler(nodeID string, h Handler) {
m.mu.Lock()
defer m.mu.Unlock()
if strings.TrimSpace(nodeID) == "" || h == nil {
return
}
m.handlers[nodeID] = h
}
func (m *Manager) Invoke(req Request) (Response, bool) {
m.mu.RLock()
h, ok := m.handlers[req.Node]
m.mu.RUnlock()
if !ok {
return Response{}, false
}
resp := h(req)
if strings.TrimSpace(resp.Node) == "" {
resp.Node = req.Node
}
if strings.TrimSpace(resp.Action) == "" {
resp.Action = req.Action
}
return resp, true
}
func (m *Manager) PickFor(action string) (NodeInfo, bool) {
m.mu.RLock()
defer m.mu.RUnlock()

143
pkg/nodes/transport.go Normal file
View File

@@ -0,0 +1,143 @@
package nodes
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// Transport abstracts node data-plane delivery.
type Transport interface {
Name() string
Send(ctx context.Context, req Request) (Response, error)
}
// Router prefers p2p transport and falls back to relay.
type Router struct {
P2P Transport
Relay Transport
}
func (r *Router) Dispatch(ctx context.Context, req Request, mode string) (Response, error) {
m := strings.ToLower(strings.TrimSpace(mode))
if m == "" {
m = "auto"
}
switch m {
case "p2p":
if r.P2P == nil {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "p2p transport unavailable"}, nil
}
return r.P2P.Send(ctx, req)
case "relay":
if r.Relay == nil {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay transport unavailable"}, nil
}
return r.Relay.Send(ctx, req)
default: // auto
if r.P2P != nil {
if resp, err := r.P2P.Send(ctx, req); err == nil && resp.OK {
return resp, nil
}
}
if r.Relay != nil {
return r.Relay.Send(ctx, req)
}
return Response{}, fmt.Errorf("no transport available")
}
}
// StubP2PTransport provides phase-2 negotiation scaffold.
type StubP2PTransport struct{}
func (s *StubP2PTransport) Name() string { return "p2p" }
func (s *StubP2PTransport) Send(ctx context.Context, req Request) (Response, error) {
_ = ctx
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "p2p session not established yet"}, nil
}
// HTTPRelayTransport dispatches requests to node-agent endpoints over HTTP.
type HTTPRelayTransport struct {
Manager *Manager
Client *http.Client
}
func (s *HTTPRelayTransport) Name() string { return "relay" }
func actionHTTPPath(action string) string {
switch strings.ToLower(strings.TrimSpace(action)) {
case "run":
return "/run"
case "invoke":
return "/invoke"
case "camera_snap":
return "/camera/snap"
case "camera_clip":
return "/camera/clip"
case "screen_record":
return "/screen/record"
case "screen_snapshot":
return "/screen/snapshot"
case "location_get":
return "/location/get"
case "canvas_snapshot":
return "/canvas/snapshot"
case "canvas_action":
return "/canvas/action"
default:
return "/invoke"
}
}
func (s *HTTPRelayTransport) Send(ctx context.Context, req Request) (Response, error) {
if s.Manager == nil {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay manager not configured"}, nil
}
if resp, ok := s.Manager.Invoke(req); ok {
return resp, nil
}
n, ok := s.Manager.Get(req.Node)
if !ok {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "node not found"}, nil
}
endpoint := strings.TrimRight(strings.TrimSpace(n.Endpoint), "/")
if endpoint == "" {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "node endpoint not configured"}, nil
}
client := s.Client
if client == nil {
client = &http.Client{Timeout: 20 * time.Second}
}
body, _ := json.Marshal(req)
path := actionHTTPPath(req.Action)
hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint+path, bytes.NewReader(body))
if err != nil {
return Response{}, err
}
hreq.Header.Set("Content-Type", "application/json")
if tok := strings.TrimSpace(n.Token); tok != "" {
hreq.Header.Set("Authorization", "Bearer "+tok)
}
hresp, err := client.Do(hreq)
if err != nil {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: err.Error()}, nil
}
defer hresp.Body.Close()
payload, _ := io.ReadAll(io.LimitReader(hresp.Body, 1<<20))
var resp Response
if err := json.Unmarshal(payload, &resp); err != nil {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: fmt.Sprintf("invalid node response: %s", strings.TrimSpace(string(payload)))}, nil
}
if strings.TrimSpace(resp.Node) == "" {
resp.Node = req.Node
}
if strings.TrimSpace(resp.Action) == "" {
resp.Action = req.Action
}
return resp, nil
}

View File

@@ -19,6 +19,8 @@ type NodeInfo struct {
OS string `json:"os,omitempty"`
Arch string `json:"arch,omitempty"`
Version string `json:"version,omitempty"`
Endpoint string `json:"endpoint,omitempty"`
Token string `json:"token,omitempty"`
Capabilities Capabilities `json:"capabilities"`
LastSeenAt time.Time `json:"last_seen_at"`
Online bool `json:"online"`

View File

@@ -6,17 +6,35 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
)
type MemorySearchTool struct {
workspace string
mu sync.RWMutex
cache map[string]cachedMemoryFile
}
type memoryBlock struct {
lineNum int
content string
heading string
lower string
}
type cachedMemoryFile struct {
modTime time.Time
blocks []memoryBlock
tokenIndex map[string][]int
}
func NewMemorySearchTool(workspace string) *MemorySearchTool {
return &MemorySearchTool{
workspace: workspace,
cache: make(map[string]cachedMemoryFile),
}
}
@@ -63,6 +81,9 @@ func (t *MemorySearchTool) Execute(ctx context.Context, args map[string]interfac
if m, ok := args["maxResults"].(float64); ok {
maxResults = int(m)
}
if maxResults <= 0 {
maxResults = 5
}
keywords := strings.Fields(strings.ToLower(query))
if len(keywords) == 0 {
@@ -74,11 +95,16 @@ func (t *MemorySearchTool) Execute(ctx context.Context, args map[string]interfac
resultsChan := make(chan []searchResult, len(files))
var wg sync.WaitGroup
// 并发搜索所有文件
for _, file := range files {
if ctx.Err() != nil {
break
}
wg.Add(1)
go func(f string) {
defer wg.Done()
if ctx.Err() != nil {
return
}
matches, err := t.searchFile(f, keywords)
if err == nil {
resultsChan <- matches
@@ -86,7 +112,6 @@ func (t *MemorySearchTool) Execute(ctx context.Context, args map[string]interfac
}(file)
}
// 异步关闭通道
go func() {
wg.Wait()
close(resultsChan)
@@ -97,14 +122,15 @@ func (t *MemorySearchTool) Execute(ctx context.Context, args map[string]interfac
allResults = append(allResults, matches...)
}
// Simple ranking: sort by score (number of keyword matches) desc
for i := 0; i < len(allResults); i++ {
for j := i + 1; j < len(allResults); j++ {
if allResults[j].score > allResults[i].score {
allResults[i], allResults[j] = allResults[j], allResults[i]
}
sort.SliceStable(allResults, func(i, j int) bool {
if allResults[i].score != allResults[j].score {
return allResults[i].score > allResults[j].score
}
}
if allResults[i].file != allResults[j].file {
return allResults[i].file < allResults[j].file
}
return allResults[i].lineNum < allResults[j].lineNum
})
if len(allResults) > maxResults {
allResults = allResults[:maxResults]
@@ -179,101 +205,192 @@ func dedupeStrings(items []string) []string {
return out
}
// searchFile parses the markdown file into blocks (paragraphs/list items) and searches them
// searchFile searches parsed markdown blocks with cache by file modtime.
func (t *MemorySearchTool) searchFile(path string, keywords []string) ([]searchResult, error) {
file, err := os.Open(path)
blocks, tokenIndex, err := t.getOrParseBlocks(path)
if err != nil {
return nil, err
}
defer file.Close()
var results []searchResult
scanner := bufio.NewScanner(file)
var currentBlock strings.Builder
var blockStartLine int = 1
var currentLineNum int = 0
var lastHeading string
processBlock := func() {
content := strings.TrimSpace(currentBlock.String())
if content != "" {
lowerContent := strings.ToLower(content)
score := 0
// Calculate score: how many keywords are present?
for _, kw := range keywords {
if strings.Contains(lowerContent, kw) {
score++
}
candidate := candidateBlockIndexes(tokenIndex, keywords, len(blocks))
results := make([]searchResult, 0, 8)
for _, idx := range candidate {
if idx < 0 || idx >= len(blocks) {
continue
}
b := blocks[idx]
score := 0
for _, kw := range keywords {
if strings.Contains(b.lower, kw) {
score++
}
// Add bonus if heading matches
if lastHeading != "" {
lowerHeading := strings.ToLower(lastHeading)
for _, kw := range keywords {
if strings.Contains(lowerHeading, kw) {
score++
}
}
// Prepend heading context if not already part of block
if !strings.HasPrefix(content, "#") {
content = fmt.Sprintf("[%s]\n%s", lastHeading, content)
}
}
// Only keep if at least one keyword matched
if score > 0 {
results = append(results, searchResult{
file: path,
lineNum: blockStartLine,
content: content,
score: score,
})
if b.heading != "" && strings.Contains(strings.ToLower(b.heading), kw) {
score++
}
}
currentBlock.Reset()
if score == 0 {
continue
}
content := b.content
if b.heading != "" && !strings.HasPrefix(strings.TrimSpace(content), "#") {
content = fmt.Sprintf("[%s]\n%s", b.heading, content)
}
results = append(results, searchResult{file: path, lineNum: b.lineNum, content: content, score: score})
}
return results, nil
}
func (t *MemorySearchTool) getOrParseBlocks(path string) ([]memoryBlock, map[string][]int, error) {
st, err := os.Stat(path)
if err != nil {
return nil, nil, err
}
mod := st.ModTime()
t.mu.RLock()
if c, ok := t.cache[path]; ok && c.modTime.Equal(mod) {
blocks := c.blocks
idx := c.tokenIndex
t.mu.RUnlock()
return blocks, idx, nil
}
t.mu.RUnlock()
blocks, tokenIndex, err := parseMarkdownBlocks(path)
if err != nil {
return nil, nil, err
}
t.mu.Lock()
t.cache[path] = cachedMemoryFile{modTime: mod, blocks: blocks, tokenIndex: tokenIndex}
t.mu.Unlock()
return blocks, tokenIndex, nil
}
func parseMarkdownBlocks(path string) ([]memoryBlock, map[string][]int, error) {
file, err := os.Open(path)
if err != nil {
return nil, nil, err
}
defer file.Close()
blocks := make([]memoryBlock, 0, 32)
scanner := bufio.NewScanner(file)
var current strings.Builder
blockStartLine := 1
currentLine := 0
lastHeading := ""
flush := func() {
content := strings.TrimSpace(current.String())
if content == "" {
current.Reset()
return
}
blocks = append(blocks, memoryBlock{lineNum: blockStartLine, content: content, heading: lastHeading, lower: strings.ToLower(content)})
current.Reset()
}
for scanner.Scan() {
currentLineNum++
currentLine++
line := scanner.Text()
trimmed := strings.TrimSpace(line)
// Markdown Block Logic:
// 1. Headers start new blocks
// 2. Empty lines separate blocks
// 3. List items start new blocks (optional, but good for logs)
isHeader := strings.HasPrefix(trimmed, "#")
isEmpty := trimmed == ""
isList := strings.HasPrefix(trimmed, "- ") || strings.HasPrefix(trimmed, "* ") || (len(trimmed) > 3 && trimmed[1] == '.' && trimmed[2] == ' ')
if isHeader {
processBlock() // Flush previous
flush()
lastHeading = strings.TrimLeft(trimmed, "# ")
blockStartLine = currentLineNum
currentBlock.WriteString(line + "\n")
processBlock() // Headers are their own blocks too
blockStartLine = currentLine
current.WriteString(line + "\n")
flush()
continue
}
if isEmpty {
processBlock() // Flush previous
blockStartLine = currentLineNum + 1
flush()
blockStartLine = currentLine + 1
continue
}
if isList {
processBlock() // Flush previous (treat list items as atomic for better granularity)
blockStartLine = currentLineNum
flush()
blockStartLine = currentLine
}
if currentBlock.Len() == 0 {
blockStartLine = currentLineNum
if current.Len() == 0 {
blockStartLine = currentLine
}
currentBlock.WriteString(line + "\n")
current.WriteString(line + "\n")
}
processBlock() // Flush last block
return results, nil
flush()
tokenIndex := buildTokenIndex(blocks)
return blocks, tokenIndex, nil
}
func buildTokenIndex(blocks []memoryBlock) map[string][]int {
idx := make(map[string][]int, 256)
for i, b := range blocks {
seen := map[string]struct{}{}
for _, tok := range tokenizeForIndex(b.lower + " " + strings.ToLower(b.heading)) {
if _, ok := seen[tok]; ok {
continue
}
seen[tok] = struct{}{}
idx[tok] = append(idx[tok], i)
}
}
return idx
}
func tokenizeForIndex(s string) []string {
s = strings.ToLower(strings.TrimSpace(s))
if s == "" {
return nil
}
s = strings.NewReplacer("\n", " ", "\t", " ", ",", " ", ".", " ", ":", " ", ";", " ", "(", " ", ")", " ", "[", " ", "]", " ", "{", " ", "}", " ", "`", " ", "\"", " ", "'", " ").Replace(s)
parts := strings.Fields(s)
out := make([]string, 0, len(parts))
for _, p := range parts {
if len(p) < 2 {
continue
}
out = append(out, p)
}
return out
}
func candidateBlockIndexes(tokenIndex map[string][]int, keywords []string, total int) []int {
if total <= 0 {
return nil
}
if len(keywords) == 0 || len(tokenIndex) == 0 {
out := make([]int, 0, total)
for i := 0; i < total; i++ {
out = append(out, i)
}
return out
}
candMap := map[int]int{}
for _, kw := range keywords {
kw = strings.TrimSpace(strings.ToLower(kw))
if kw == "" {
continue
}
for tok, ids := range tokenIndex {
if strings.Contains(tok, kw) {
for _, id := range ids {
candMap[id]++
}
}
}
}
if len(candMap) == 0 {
out := make([]int, 0, total)
for i := 0; i < total; i++ {
out = append(out, i)
}
return out
}
out := make([]int, 0, len(candMap))
for id := range candMap {
out = append(out, id)
}
sort.Slice(out, func(i, j int) bool { return candMap[out[i]] > candMap[out[j]] })
return out
}

View File

@@ -12,18 +12,23 @@ import (
// NodesTool provides an OpenClaw-style control surface for paired nodes.
type NodesTool struct {
manager *nodes.Manager
router *nodes.Router
}
func NewNodesTool(m *nodes.Manager) *NodesTool { return &NodesTool{manager: m} }
func NewNodesTool(m *nodes.Manager, r *nodes.Router) *NodesTool { return &NodesTool{manager: m, router: r} }
func (t *NodesTool) Name() string { return "nodes" }
func (t *NodesTool) Description() string {
return "Manage paired nodes (status/describe/run/invoke/camera/screen/location)."
return "Manage paired nodes (status/describe/run/invoke/camera/screen/location/canvas)."
}
func (t *NodesTool) Parameters() map[string]interface{} {
return map[string]interface{}{"type": "object", "properties": map[string]interface{}{
"action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|camera_snap|screen_record|location_get"},
"action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|camera_snap|camera_clip|screen_record|screen_snapshot|location_get|canvas_snapshot|canvas_action"},
"node": map[string]interface{}{"type": "string", "description": "target node id"},
"mode": map[string]interface{}{"type": "string", "description": "auto|p2p|relay (default auto)"},
"args": map[string]interface{}{"type": "object", "description": "action args"},
"command": map[string]interface{}{"type": "array", "description": "run command array shortcut"},
"facing": map[string]interface{}{"type": "string", "description": "camera facing: front|back|both"},
"duration_ms": map[string]interface{}{"type": "integer", "description": "clip/record duration"},
}, "required": []string{"action"}}
}
@@ -35,6 +40,7 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s
return "", fmt.Errorf("action is required")
}
nodeID, _ := args["node"].(string)
mode, _ := args["mode"].(string)
if t.manager == nil {
return "", fmt.Errorf("nodes manager not configured")
}
@@ -52,7 +58,6 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s
b, _ := json.Marshal(t.manager.List())
return string(b), nil
default:
// Phase-1: control-plane exists, data-plane RPC bridge lands in next step.
if nodeID == "" {
if picked, ok := t.manager.PickFor(action); ok {
nodeID = picked.ID
@@ -61,7 +66,28 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s
if nodeID == "" {
return "", fmt.Errorf("no eligible node found for action=%s", action)
}
resp := nodes.Response{OK: false, Node: nodeID, Action: action, Error: "node transport bridge not implemented yet"}
if t.router == nil {
return "", fmt.Errorf("nodes transport router not configured")
}
reqArgs := map[string]interface{}{}
if raw, ok := args["args"].(map[string]interface{}); ok {
for k, v := range raw {
reqArgs[k] = v
}
}
if cmd, ok := args["command"].([]interface{}); ok && len(cmd) > 0 {
reqArgs["command"] = cmd
}
if facing, _ := args["facing"].(string); strings.TrimSpace(facing) != "" {
reqArgs["facing"] = strings.TrimSpace(facing)
}
if d, ok := args["duration_ms"].(float64); ok && d > 0 {
reqArgs["duration_ms"] = int(d)
}
resp, err := t.router.Dispatch(ctx, nodes.Request{Action: action, Node: nodeID, Args: reqArgs}, mode)
if err != nil {
return "", err
}
b, _ := json.Marshal(resp)
return string(b), nil
}