Unify agent topology and subagent memory logging

This commit is contained in:
lpf
2026-03-06 15:14:58 +08:00
parent 86691f75d0
commit cc04d9ab3a
27 changed files with 1408 additions and 791 deletions

View File

@@ -28,6 +28,7 @@ import (
"clawgo/pkg/logger"
"clawgo/pkg/nodes"
"clawgo/pkg/providers"
"clawgo/pkg/runtimecfg"
"clawgo/pkg/scheduling"
"clawgo/pkg/session"
"clawgo/pkg/tools"
@@ -63,8 +64,7 @@ type AgentLoop struct {
orchestrator *tools.Orchestrator
subagentRouter *tools.SubagentRouter
subagentConfigTool *tools.SubagentConfigTool
pendingSubagentDraft map[string]map[string]interface{}
pendingDraftStore *PendingSubagentDraftStore
nodeRouter *nodes.Router
configPath string
}
@@ -188,7 +188,6 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
subagentManager := tools.NewSubagentManager(provider, workspace, msgBus, orchestrator)
subagentRouter := tools.NewSubagentRouter(subagentManager)
subagentConfigTool := tools.NewSubagentConfigTool("")
pendingDraftStore := NewPendingSubagentDraftStore(workspace)
spawnTool := tools.NewSpawnTool(subagentManager)
toolsRegistry.Register(spawnTool)
toolsRegistry.Register(tools.NewSubagentsTool(subagentManager))
@@ -261,11 +260,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
orchestrator: orchestrator,
subagentRouter: subagentRouter,
subagentConfigTool: subagentConfigTool,
pendingSubagentDraft: map[string]map[string]interface{}{},
pendingDraftStore: pendingDraftStore,
}
if pendingDraftStore != nil {
loop.pendingSubagentDraft = pendingDraftStore.All()
nodeRouter: nodesRouter,
}
// Initialize provider fallback chain (primary + proxy_fallbacks).
loop.providerPool = map[string]providers.LLMProvider{}
@@ -309,6 +304,9 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
if task == nil {
return "", fmt.Errorf("subagent task is nil")
}
if strings.EqualFold(strings.TrimSpace(task.Transport), "node") {
return loop.dispatchNodeSubagentTask(ctx, task)
}
sessionKey := strings.TrimSpace(task.SessionKey)
if sessionKey == "" {
sessionKey = fmt.Sprintf("subagent:%s", strings.TrimSpace(task.ID))
@@ -320,6 +318,61 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
return loop
}
func (al *AgentLoop) dispatchNodeSubagentTask(ctx context.Context, task *tools.SubagentTask) (string, error) {
if al == nil || task == nil {
return "", fmt.Errorf("node subagent task is nil")
}
if al.nodeRouter == nil {
return "", fmt.Errorf("node router is not configured")
}
nodeID := strings.TrimSpace(task.NodeID)
if nodeID == "" {
return "", fmt.Errorf("node-backed subagent %q missing node_id", task.AgentID)
}
taskInput := loopTaskInputForNode(task)
resp, err := al.nodeRouter.Dispatch(ctx, nodes.Request{
Action: "agent_task",
Node: nodeID,
Task: taskInput,
}, "auto")
if err != nil {
return "", err
}
if !resp.OK {
if strings.TrimSpace(resp.Error) != "" {
return "", fmt.Errorf("node %s agent_task failed: %s", nodeID, strings.TrimSpace(resp.Error))
}
return "", fmt.Errorf("node %s agent_task failed", nodeID)
}
if result := nodeAgentTaskResult(resp.Payload); result != "" {
return result, nil
}
return fmt.Sprintf("node %s completed agent_task", nodeID), nil
}
func loopTaskInputForNode(task *tools.SubagentTask) string {
if task == nil {
return ""
}
if parent := strings.TrimSpace(task.ParentAgentID); parent != "" {
return fmt.Sprintf("Parent Agent: %s\nSubtree Branch: %s\n\nTask:\n%s", parent, task.AgentID, strings.TrimSpace(task.Task))
}
return strings.TrimSpace(task.Task)
}
func nodeAgentTaskResult(payload map[string]interface{}) string {
if len(payload) == 0 {
return ""
}
if result, _ := payload["result"].(string); strings.TrimSpace(result) != "" {
return strings.TrimSpace(result)
}
if content, _ := payload["content"].(string); strings.TrimSpace(content) != "" {
return strings.TrimSpace(content)
}
return ""
}
func (al *AgentLoop) buildSubagentTaskInput(task *tools.SubagentTask) string {
if task == nil {
return ""
@@ -1109,27 +1162,96 @@ func (al *AgentLoop) appendDailySummaryLog(msg bus.InboundMessage, response stri
if strings.TrimSpace(al.workspace) == "" {
return
}
userText := strings.TrimSpace(msg.Content)
respText := strings.TrimSpace(response)
if userText == "" && respText == "" {
if respText == "" {
return
}
// Avoid noisy heartbeat/system boilerplate.
lc := strings.ToLower(userText)
lc := strings.ToLower(strings.TrimSpace(msg.Content))
if strings.Contains(lc, "heartbeat") && strings.Contains(strings.ToLower(respText), "heartbeat_ok") {
return
}
ms := NewMemoryStore(al.workspace)
line := fmt.Sprintf("- [%s] channel=%s session=%s\n - user: %s\n - result: %s",
time.Now().Format("15:04"),
strings.TrimSpace(msg.Channel),
strings.TrimSpace(msg.SessionKey),
truncate(strings.ReplaceAll(userText, "\n", " "), 180),
truncate(strings.ReplaceAll(respText, "\n", " "), 220),
)
if err := ms.AppendToday(line); err != nil {
namespace := resolveInboundMemoryNamespace(msg)
detailLine := al.buildDailySummaryEntry(msg, namespace, respText)
ms := NewMemoryStoreWithNamespace(al.workspace, namespace)
if err := ms.AppendToday(detailLine); err != nil {
logger.WarnCF("agent", logger.C0158, map[string]interface{}{logger.FieldError: err.Error()})
}
if namespace != "main" {
mainLine := al.buildMainMemorySubagentEntry(msg, namespace, respText)
if err := NewMemoryStore(al.workspace).AppendToday(mainLine); err != nil {
logger.WarnCF("agent", logger.C0158, map[string]interface{}{"target": "main", logger.FieldError: err.Error()})
}
}
}
func (al *AgentLoop) buildDailySummaryEntry(msg bus.InboundMessage, namespace, response string) string {
title := al.dailySummaryTitle(msg, namespace)
return fmt.Sprintf("## %s %s\n\n- Did: %s\n- Channel: %s\n- Session: %s",
time.Now().Format("15:04"),
title,
truncate(strings.ReplaceAll(strings.TrimSpace(response), "\n", " "), 320),
strings.TrimSpace(msg.Channel),
strings.TrimSpace(msg.SessionKey),
)
}
func (al *AgentLoop) buildMainMemorySubagentEntry(msg bus.InboundMessage, namespace, response string) string {
title := al.dailySummaryTitle(msg, namespace)
return fmt.Sprintf("## %s %s\n\n- Subagent: %s\n- Did: %s\n- Session: %s",
time.Now().Format("15:04"),
title,
al.memoryAgentTitle(namespace),
truncate(strings.ReplaceAll(strings.TrimSpace(response), "\n", " "), 220),
strings.TrimSpace(msg.SessionKey),
)
}
func (al *AgentLoop) dailySummaryTitle(msg bus.InboundMessage, namespace string) string {
agentName := al.memoryAgentTitle(namespace)
taskTitle := summarizeMemoryTaskTitle(msg.Content)
if taskTitle == "" {
return agentName
}
return fmt.Sprintf("%s | %s", agentName, taskTitle)
}
func (al *AgentLoop) memoryAgentTitle(namespace string) string {
ns := normalizeMemoryNamespace(namespace)
if ns == "main" {
return "Main Agent"
}
cfg := runtimecfg.Get()
if cfg != nil {
if subcfg, ok := cfg.Agents.Subagents[ns]; ok {
if name := strings.TrimSpace(subcfg.DisplayName); name != "" {
return name
}
}
}
return strings.ReplaceAll(ns, "-", " ")
}
func summarizeMemoryTaskTitle(content string) string {
text := strings.TrimSpace(content)
if text == "" {
return ""
}
if idx := strings.Index(text, "Task:\n"); idx >= 0 {
text = strings.TrimSpace(text[idx+len("Task:\n"):])
}
text = strings.ReplaceAll(text, "\r\n", "\n")
if idx := strings.Index(text, "\n"); idx >= 0 {
text = text[:idx]
}
text = strings.TrimSpace(text)
if text == "" {
return ""
}
if len(text) > 72 {
return strings.TrimSpace(text[:69]) + "..."
}
return text
}
func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {