mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-15 02:47:29 +08:00
Remove node runtime and config surface
This commit is contained in:
@@ -27,7 +27,6 @@ import (
|
||||
"github.com/YspCoder/clawgo/pkg/config"
|
||||
"github.com/YspCoder/clawgo/pkg/cron"
|
||||
"github.com/YspCoder/clawgo/pkg/logger"
|
||||
"github.com/YspCoder/clawgo/pkg/nodes"
|
||||
"github.com/YspCoder/clawgo/pkg/providers"
|
||||
"github.com/YspCoder/clawgo/pkg/runtimecfg"
|
||||
"github.com/YspCoder/clawgo/pkg/scheduling"
|
||||
@@ -64,7 +63,6 @@ type AgentLoop struct {
|
||||
sessionStreamed map[string]bool
|
||||
subagentManager *tools.SubagentManager
|
||||
subagentRouter *tools.SubagentRouter
|
||||
nodeRouter *nodes.Router
|
||||
configPath string
|
||||
subagentDigestMu sync.Mutex
|
||||
subagentDigestDelay time.Duration
|
||||
@@ -99,38 +97,6 @@ func (al *AgentLoop) SetConfigPath(path string) {
|
||||
al.configPath = strings.TrimSpace(path)
|
||||
}
|
||||
|
||||
func (al *AgentLoop) SetNodeP2PTransport(t nodes.Transport) {
|
||||
if al == nil || al.nodeRouter == nil {
|
||||
return
|
||||
}
|
||||
al.nodeRouter.P2P = t
|
||||
}
|
||||
|
||||
func (al *AgentLoop) DispatchNodeRequest(ctx context.Context, req nodes.Request, mode string) (nodes.Response, error) {
|
||||
if al == nil || al.tools == nil {
|
||||
return nodes.Response{}, fmt.Errorf("agent loop not ready")
|
||||
}
|
||||
args := map[string]interface{}{
|
||||
"action": req.Action,
|
||||
"node": req.Node,
|
||||
"mode": mode,
|
||||
"task": req.Task,
|
||||
"model": req.Model,
|
||||
}
|
||||
if len(req.Args) > 0 {
|
||||
args["args"] = req.Args
|
||||
}
|
||||
out, err := al.tools.Execute(ctx, "nodes", args)
|
||||
if err != nil {
|
||||
return nodes.Response{}, err
|
||||
}
|
||||
var resp nodes.Response
|
||||
if err := json.Unmarshal([]byte(out), &resp); err != nil {
|
||||
return nodes.Response{}, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// StartupCompactionReport provides startup memory/session maintenance stats.
|
||||
type StartupCompactionReport struct {
|
||||
TotalSessions int `json:"total_sessions"`
|
||||
@@ -154,66 +120,6 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
|
||||
toolsRegistry.Register(tools.NewExecTool(cfg.Tools.Shell, workspace, processManager))
|
||||
toolsRegistry.Register(tools.NewProcessTool(processManager))
|
||||
toolsRegistry.Register(tools.NewSkillExecTool(workspace))
|
||||
nodesManager := nodes.DefaultManager()
|
||||
nodesManager.SetAuditPath(filepath.Join(workspace, "memory", "nodes-audit.jsonl"))
|
||||
nodesManager.SetStatePath(filepath.Join(workspace, "memory", "nodes-state.json"))
|
||||
nodesManager.Upsert(nodes.NodeInfo{ID: "local", Name: "local", Capabilities: nodes.Capabilities{Run: true, Invoke: true, Model: true, Camera: true, Screen: true, Location: true, Canvas: true}, Models: []string{"local-sim"}, Online: true})
|
||||
nodesManager.RegisterHandler("local", func(req nodes.Request) nodes.Response {
|
||||
switch req.Action {
|
||||
case "run":
|
||||
payload := map[string]interface{}{"transport": "relay-local", "simulated": true}
|
||||
if cmdRaw, ok := req.Args["command"].([]interface{}); ok && len(cmdRaw) > 0 {
|
||||
parts := make([]string, 0, len(cmdRaw))
|
||||
for _, x := range cmdRaw {
|
||||
parts = append(parts, fmt.Sprint(x))
|
||||
}
|
||||
payload["command"] = parts
|
||||
}
|
||||
return nodes.Response{OK: true, Code: "ok", Node: "local", Action: req.Action, Payload: payload}
|
||||
case "agent_task":
|
||||
return nodes.Response{OK: true, Code: "ok", Node: "local", Action: req.Action, Payload: map[string]interface{}{"transport": "relay-local", "simulated": true, "model": req.Model, "task": req.Task, "result": "local child-model simulated execution completed"}}
|
||||
case "camera_snap":
|
||||
return nodes.Response{OK: true, Code: "ok", Node: "local", Action: req.Action, Payload: map[string]interface{}{"transport": "relay-local", "media_type": "image", "storage": "inline", "facing": req.Args["facing"], "simulated": true, "meta": map[string]interface{}{"width": 1280, "height": 720}}}
|
||||
case "camera_clip":
|
||||
return nodes.Response{OK: true, Code: "ok", Node: "local", Action: req.Action, Payload: map[string]interface{}{"transport": "relay-local", "media_type": "video", "storage": "path", "path": "/tmp/camera_clip.mp4", "duration_ms": req.Args["duration_ms"], "simulated": true, "meta": map[string]interface{}{"fps": 30}}}
|
||||
case "screen_snapshot":
|
||||
return nodes.Response{OK: true, Code: "ok", Node: "local", Action: req.Action, Payload: map[string]interface{}{"transport": "relay-local", "media_type": "image", "storage": "inline", "simulated": true, "meta": map[string]interface{}{"width": 1920, "height": 1080}}}
|
||||
case "screen_record":
|
||||
return nodes.Response{OK: true, Code: "ok", Node: "local", Action: req.Action, Payload: map[string]interface{}{"transport": "relay-local", "media_type": "video", "storage": "path", "path": "/tmp/screen_record.mp4", "duration_ms": req.Args["duration_ms"], "simulated": true, "meta": map[string]interface{}{"fps": 30}}}
|
||||
case "location_get":
|
||||
return nodes.Response{OK: true, Code: "ok", Node: "local", Action: req.Action, Payload: map[string]interface{}{"transport": "relay-local", "lat": 0.0, "lng": 0.0, "accuracy": "simulated", "meta": map[string]interface{}{"provider": "simulated"}}}
|
||||
case "canvas_snapshot":
|
||||
return nodes.Response{OK: true, Code: "ok", Node: "local", Action: req.Action, Payload: map[string]interface{}{"transport": "relay-local", "image": "data:image/png;base64,<simulated>", "media_type": "image", "storage": "inline", "simulated": true, "meta": map[string]interface{}{"width": 1280, "height": 720}}}
|
||||
case "canvas_action":
|
||||
return nodes.Response{OK: true, Code: "ok", Node: "local", Action: req.Action, Payload: map[string]interface{}{"transport": "relay-local", "applied": true, "simulated": true, "args": req.Args}}
|
||||
default:
|
||||
return nodes.Response{OK: false, Code: "unsupported_action", Node: "local", Action: req.Action, Error: "unsupported local simulated action"}
|
||||
}
|
||||
})
|
||||
nodeDispatchPolicy := nodes.DispatchPolicy{
|
||||
PreferLocal: cfg.Gateway.Nodes.Dispatch.PreferLocal,
|
||||
PreferP2P: cfg.Gateway.Nodes.Dispatch.PreferP2P,
|
||||
AllowRelayFallback: cfg.Gateway.Nodes.Dispatch.AllowRelayFallback,
|
||||
ActionTags: cfg.Gateway.Nodes.Dispatch.ActionTags,
|
||||
AgentTags: cfg.Gateway.Nodes.Dispatch.AgentTags,
|
||||
AllowActions: cfg.Gateway.Nodes.Dispatch.AllowActions,
|
||||
DenyActions: cfg.Gateway.Nodes.Dispatch.DenyActions,
|
||||
AllowAgents: cfg.Gateway.Nodes.Dispatch.AllowAgents,
|
||||
DenyAgents: cfg.Gateway.Nodes.Dispatch.DenyAgents,
|
||||
}
|
||||
nodesManager.SetDispatchPolicy(nodeDispatchPolicy)
|
||||
var nodeP2P nodes.Transport
|
||||
if cfg.Gateway.Nodes.P2P.Enabled {
|
||||
switch strings.ToLower(strings.TrimSpace(cfg.Gateway.Nodes.P2P.Transport)) {
|
||||
case "", "websocket_tunnel":
|
||||
nodeP2P = &nodes.WebsocketP2PTransport{Manager: nodesManager}
|
||||
case "webrtc":
|
||||
// Keep the mode explicit but non-default until a direct data channel is production-ready.
|
||||
nodeP2P = &nodes.WebsocketP2PTransport{Manager: nodesManager}
|
||||
}
|
||||
}
|
||||
nodesRouter := &nodes.Router{P2P: nodeP2P, Relay: &nodes.HTTPRelayTransport{Manager: nodesManager}, Policy: nodesManager.DispatchPolicy()}
|
||||
toolsRegistry.Register(tools.NewNodesTool(nodesManager, nodesRouter, filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl")))
|
||||
|
||||
if cs != nil {
|
||||
toolsRegistry.Register(tools.NewRemindTool(cs))
|
||||
@@ -331,7 +237,6 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
|
||||
telegramStreaming: cfg.Channels.Telegram.Streaming,
|
||||
subagentManager: subagentManager,
|
||||
subagentRouter: subagentRouter,
|
||||
nodeRouter: nodesRouter,
|
||||
subagentDigestDelay: 5 * time.Second,
|
||||
subagentDigests: map[string]*subagentDigestState{},
|
||||
}
|
||||
@@ -415,9 +320,6 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
|
||||
if run == nil {
|
||||
return "", fmt.Errorf("subagent run is nil")
|
||||
}
|
||||
if strings.EqualFold(strings.TrimSpace(run.Transport), "node") {
|
||||
return loop.dispatchNodeSubagentRun(ctx, run)
|
||||
}
|
||||
sessionKey := strings.TrimSpace(run.SessionKey)
|
||||
if sessionKey == "" {
|
||||
sessionKey = fmt.Sprintf("subagent:%s", strings.TrimSpace(run.ID))
|
||||
@@ -453,68 +355,17 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
|
||||
return loop
|
||||
}
|
||||
|
||||
func (al *AgentLoop) dispatchNodeSubagentRun(ctx context.Context, run *tools.SubagentRun) (string, error) {
|
||||
if al == nil || run == nil {
|
||||
return "", fmt.Errorf("node subagent run is nil")
|
||||
}
|
||||
if al.nodeRouter == nil {
|
||||
return "", fmt.Errorf("node router is not configured")
|
||||
}
|
||||
nodeID := strings.TrimSpace(run.NodeID)
|
||||
if nodeID == "" {
|
||||
return "", fmt.Errorf("node-backed subagent %q missing node_id", run.AgentID)
|
||||
}
|
||||
taskInput := loopRunInputForNode(run)
|
||||
reqArgs := map[string]interface{}{}
|
||||
if remoteAgentID := remoteAgentIDForNodeBranch(run.AgentID, nodeID); remoteAgentID != "" {
|
||||
reqArgs["remote_agent_id"] = remoteAgentID
|
||||
}
|
||||
resp, err := al.nodeRouter.Dispatch(ctx, nodes.Request{
|
||||
Action: "agent_task",
|
||||
Node: nodeID,
|
||||
Task: taskInput,
|
||||
Args: reqArgs,
|
||||
}, "auto")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if !resp.OK {
|
||||
if strings.TrimSpace(resp.Error) != "" {
|
||||
return "", fmt.Errorf("node %s agent_task failed: %s", nodeID, strings.TrimSpace(resp.Error))
|
||||
}
|
||||
return "", fmt.Errorf("node %s agent_task failed", nodeID)
|
||||
}
|
||||
if result := nodeAgentTaskResult(resp.Payload); result != "" {
|
||||
return result, nil
|
||||
}
|
||||
return fmt.Sprintf("node %s completed agent_task", nodeID), nil
|
||||
}
|
||||
|
||||
// remoteAgentIDForNodeBranch extracts the remote agent identifier from a
// branch-style agent ID of the form "node.<nodeID>.<remote>". It returns ""
// when either input is blank after trimming, when agentID does not carry the
// "node.<nodeID>." prefix, or when the remainder after the prefix is blank.
func remoteAgentIDForNodeBranch(agentID, nodeID string) string {
	id := strings.TrimSpace(agentID)
	node := strings.TrimSpace(nodeID)
	if id == "" || node == "" {
		return ""
	}
	branchPrefix := "node." + node + "."
	remote := strings.TrimPrefix(id, branchPrefix)
	if remote == id {
		// Prefix absent: not a branch-style ID for this node.
		return ""
	}
	if strings.TrimSpace(remote) == "" {
		return ""
	}
	return remote
}
|
||||
|
||||
func loopRunInputForNode(run *tools.SubagentRun) string {
|
||||
func (al *AgentLoop) buildSubagentRunInput(run *tools.SubagentRun) string {
|
||||
if run == nil {
|
||||
return ""
|
||||
}
|
||||
if parent := strings.TrimSpace(run.ParentAgentID); parent != "" {
|
||||
return fmt.Sprintf("Parent Agent: %s\nSubtree Branch: %s\n\nTask:\n%s", parent, run.AgentID, strings.TrimSpace(run.Task))
|
||||
taskText := strings.TrimSpace(run.Task)
|
||||
if promptFile := strings.TrimSpace(run.SystemPromptFile); promptFile != "" {
|
||||
if promptText := al.readSubagentPromptFile(promptFile); promptText != "" {
|
||||
return fmt.Sprintf("Role Profile Policy (%s):\n%s\n\nTask:\n%s", promptFile, promptText, taskText)
|
||||
}
|
||||
}
|
||||
return strings.TrimSpace(run.Task)
|
||||
return taskText
|
||||
}
|
||||
|
||||
func nodeAgentTaskResult(payload map[string]interface{}) string {
|
||||
@@ -530,19 +381,6 @@ func nodeAgentTaskResult(payload map[string]interface{}) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (al *AgentLoop) buildSubagentRunInput(run *tools.SubagentRun) string {
|
||||
if run == nil {
|
||||
return ""
|
||||
}
|
||||
taskText := strings.TrimSpace(run.Task)
|
||||
if promptFile := strings.TrimSpace(run.SystemPromptFile); promptFile != "" {
|
||||
if promptText := al.readSubagentPromptFile(promptFile); promptText != "" {
|
||||
return fmt.Sprintf("Role Profile Policy (%s):\n%s\n\nTask:\n%s", promptFile, promptText, taskText)
|
||||
}
|
||||
}
|
||||
return taskText
|
||||
}
|
||||
|
||||
func (al *AgentLoop) readSubagentPromptFile(relPath string) string {
|
||||
if al == nil {
|
||||
return ""
|
||||
|
||||
@@ -1,32 +0,0 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/YspCoder/clawgo/pkg/bus"
|
||||
"github.com/YspCoder/clawgo/pkg/config"
|
||||
)
|
||||
|
||||
func TestNewAgentLoopDisablesNodeP2PByDefault(t *testing.T) {
|
||||
cfg := config.DefaultConfig()
|
||||
cfg.Agents.Defaults.Workspace = t.TempDir()
|
||||
|
||||
loop := NewAgentLoop(cfg, bus.NewMessageBus(), stubLLMProvider{}, nil)
|
||||
if loop.nodeRouter == nil {
|
||||
t.Fatalf("expected node router to be configured")
|
||||
}
|
||||
if loop.nodeRouter.P2P != nil {
|
||||
t.Fatalf("expected node p2p transport to be disabled by default")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewAgentLoopEnablesNodeP2PWhenConfigured(t *testing.T) {
|
||||
cfg := config.DefaultConfig()
|
||||
cfg.Agents.Defaults.Workspace = t.TempDir()
|
||||
cfg.Gateway.Nodes.P2P.Enabled = true
|
||||
|
||||
loop := NewAgentLoop(cfg, bus.NewMessageBus(), stubLLMProvider{}, nil)
|
||||
if loop.nodeRouter == nil || loop.nodeRouter.P2P == nil {
|
||||
t.Fatalf("expected node p2p transport to be enabled")
|
||||
}
|
||||
}
|
||||
@@ -1,61 +0,0 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/YspCoder/clawgo/pkg/nodes"
|
||||
"github.com/YspCoder/clawgo/pkg/tools"
|
||||
)
|
||||
|
||||
func TestDispatchNodeSubagentRunUsesNodeAgentTask(t *testing.T) {
|
||||
manager := nodes.NewManager()
|
||||
manager.Upsert(nodes.NodeInfo{
|
||||
ID: "edge-dev",
|
||||
Name: "Edge Dev",
|
||||
Online: true,
|
||||
Capabilities: nodes.Capabilities{
|
||||
Model: true,
|
||||
},
|
||||
})
|
||||
manager.RegisterHandler("edge-dev", func(req nodes.Request) nodes.Response {
|
||||
if req.Action != "agent_task" {
|
||||
t.Fatalf("unexpected action: %s", req.Action)
|
||||
}
|
||||
if got, _ := req.Args["remote_agent_id"].(string); got != "coder" {
|
||||
t.Fatalf("expected remote_agent_id=coder, got %+v", req.Args)
|
||||
}
|
||||
if !strings.Contains(req.Task, "Parent Agent: main") {
|
||||
t.Fatalf("expected parent-agent context in task, got %q", req.Task)
|
||||
}
|
||||
return nodes.Response{
|
||||
OK: true,
|
||||
Action: req.Action,
|
||||
Node: req.Node,
|
||||
Payload: map[string]interface{}{
|
||||
"result": "remote-main-done",
|
||||
},
|
||||
}
|
||||
})
|
||||
|
||||
loop := &AgentLoop{
|
||||
nodeRouter: &nodes.Router{
|
||||
Relay: &nodes.HTTPRelayTransport{Manager: manager},
|
||||
},
|
||||
}
|
||||
out, err := loop.dispatchNodeSubagentRun(context.Background(), &tools.SubagentRun{
|
||||
ID: "subagent-1",
|
||||
AgentID: "node.edge-dev.coder",
|
||||
Transport: "node",
|
||||
NodeID: "edge-dev",
|
||||
ParentAgentID: "main",
|
||||
Task: "Implement fix on remote node",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("dispatchNodeSubagentRun failed: %v", err)
|
||||
}
|
||||
if out != "remote-main-done" {
|
||||
t.Fatalf("unexpected node result: %q", out)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user