From b9e1efa3efd5ccd944f88773736c404dc3b42f75 Mon Sep 17 00:00:00 2001 From: lpf Date: Thu, 5 Mar 2026 22:17:44 +0800 Subject: [PATCH] fix subagent completion notify path and pipeline origin routing --- pkg/agent/loop.go | 78 +++++++++++++++++----- pkg/agent/loop_system_notify_test.go | 68 +++++++++++++++++++ pkg/tools/pipeline_tools.go | 52 ++++++++++++++- pkg/tools/subagent.go | 10 ++- pkg/tools/subagent_runtime_control_test.go | 42 ++++++++++++ 5 files changed, 228 insertions(+), 22 deletions(-) create mode 100644 pkg/agent/loop_system_notify_test.go diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 75e7e2c..dc48254 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -668,7 +668,11 @@ func loadHeartbeatAckToken(workspace string) string { func (al *AgentLoop) prepareOutbound(msg bus.InboundMessage, response string) (bus.OutboundMessage, bool) { if shouldDropNoReply(response) { - return bus.OutboundMessage{}, false + if fallback, ok := fallbackSubagentNotification(msg); ok { + response = fallback + } else { + return bus.OutboundMessage{}, false + } } currentMsgID := "" if msg.Metadata != nil { @@ -677,19 +681,18 @@ func (al *AgentLoop) prepareOutbound(msg bus.InboundMessage, response string) (b clean, replyToID := parseReplyTag(response, currentMsgID) clean = strings.TrimSpace(clean) if clean == "" { - return bus.OutboundMessage{}, false + if fallback, ok := fallbackSubagentNotification(msg); ok { + clean = fallback + } else { + return bus.OutboundMessage{}, false + } } if al.shouldSuppressOutbound(msg, clean) { return bus.OutboundMessage{}, false } outbound := bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Content: clean, ReplyToID: strings.TrimSpace(replyToID)} if msg.Channel == "system" { - if originChannel, originChatID, ok := strings.Cut(msg.ChatID, ":"); ok && strings.TrimSpace(originChannel) != "" { - outbound.Channel = originChannel - outbound.ChatID = originChatID - } else { - outbound.Channel = "cli" - } + outbound.Channel, outbound.ChatID = resolveSystemOrigin(msg.ChatID) } return outbound, true } @@ -1075,16 +1078,7 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe "chat_id": msg.ChatID, }) - // Parse origin from chat_id (format: "channel:chat_id") - var originChannel, originChatID string - if idx := strings.Index(msg.ChatID, ":"); idx > 0 { - originChannel = msg.ChatID[:idx] - originChatID = msg.ChatID[idx+1:] - } else { - // Fallback - originChannel = "cli" - originChatID = msg.ChatID - } + originChannel, originChatID := resolveSystemOrigin(msg.ChatID) // Use the origin session for context sessionKey := fmt.Sprintf("%s:%s", originChannel, originChatID) @@ -1817,6 +1811,54 @@ func shouldDropNoReply(text string) bool { return strings.EqualFold(t, "NO_REPLY") } +func resolveSystemOrigin(chatID string) (string, string) { + raw := strings.TrimSpace(chatID) + if raw == "" { + return "cli", "direct" + } + originChannel, originChatID, ok := strings.Cut(raw, ":") + if !ok { + return "cli", raw + } + originChannel = strings.TrimSpace(originChannel) + originChatID = strings.TrimSpace(originChatID) + switch { + case originChannel == "" && originChatID == "": + return "cli", "direct" + case originChannel == "": + return "cli", originChatID + case originChatID == "": + return originChannel, "direct" + default: + return originChannel, originChatID + } +} + +func isSubagentSystemMessage(msg bus.InboundMessage) bool { + if msg.Channel != "system" { + return false + } + if msg.Metadata != nil && strings.EqualFold(strings.TrimSpace(msg.Metadata["trigger"]), "subagent") { + return true + } + return strings.HasPrefix(strings.ToLower(strings.TrimSpace(msg.SenderID)), "subagent:") +} + +func fallbackSubagentNotification(msg bus.InboundMessage) (string, bool) { + if !isSubagentSystemMessage(msg) { + return "", false + } + content := strings.TrimSpace(msg.Content) + if content == "" { + id := strings.TrimSpace(strings.TrimPrefix(msg.SenderID, "subagent:")) + if id == "" { + id = "unknown" + } + content = fmt.Sprintf("Subagent %s completed.", id) + } + return content, true +} + func shouldFlushTelegramStreamSnapshot(s string) bool { s = strings.TrimRight(s, " \t") if s == "" { diff --git a/pkg/agent/loop_system_notify_test.go b/pkg/agent/loop_system_notify_test.go new file mode 100644 index 0000000..d052a1d --- /dev/null +++ b/pkg/agent/loop_system_notify_test.go @@ -0,0 +1,68 @@ +package agent + +import ( + "strings" + "testing" + + "clawgo/pkg/bus" +) + +func TestPrepareOutboundSubagentNoReplyFallback(t *testing.T) { + al := &AgentLoop{} + msg := bus.InboundMessage{ + Channel: "system", + SenderID: "subagent:subagent-1", + ChatID: "telegram:9527", + Content: "Task 'coder' completed.\n\nResult:\nOK", + Metadata: map[string]string{ + "trigger": "subagent", + }, + } + + outbound, ok := al.prepareOutbound(msg, "NO_REPLY") + if !ok { + t.Fatalf("expected outbound notification for subagent NO_REPLY fallback") + } + if outbound.Channel != "telegram" || outbound.ChatID != "9527" { + t.Fatalf("unexpected outbound target: %s:%s", outbound.Channel, outbound.ChatID) + } + if strings.TrimSpace(outbound.Content) != strings.TrimSpace(msg.Content) { + t.Fatalf("expected fallback content from system message, got: %q", outbound.Content) + } +} + +func TestPrepareOutboundNoReplySuppressedForNonSubagent(t *testing.T) { + al := &AgentLoop{} + msg := bus.InboundMessage{ + Channel: "cli", + ChatID: "direct", + Content: "hello", + } + + if _, ok := al.prepareOutbound(msg, "NO_REPLY"); ok { + t.Fatalf("expected NO_REPLY to be suppressed for non-subagent messages") + } +} + +func TestPrepareOutboundSubagentNoReplyFallbackWithMissingOrigin(t *testing.T) { + al := &AgentLoop{} + msg := bus.InboundMessage{ + Channel: "system", + SenderID: "subagent:subagent-9", + ChatID: ":", + Metadata: map[string]string{ + "trigger": "subagent", + }, + } + + outbound, ok := al.prepareOutbound(msg, "NO_REPLY") + if !ok { + t.Fatalf("expected outbound notification for malformed system origin") + } + if outbound.Channel != "cli" || outbound.ChatID != "direct" { + t.Fatalf("expected fallback origin cli:direct, got %s:%s", outbound.Channel, outbound.ChatID) + } + if outbound.Content != "Subagent subagent-9 completed." { + t.Fatalf("unexpected fallback content: %q", outbound.Content) + } +} diff --git a/pkg/tools/pipeline_tools.go b/pkg/tools/pipeline_tools.go index f0b5311..5b3e327 100644 --- a/pkg/tools/pipeline_tools.go +++ b/pkg/tools/pipeline_tools.go @@ -53,6 +53,14 @@ func (t *PipelineCreateTool) Parameters() map[string]interface{} { "required": []string{"id", "goal"}, }, }, + "channel": map[string]interface{}{ + "type": "string", + "description": "Optional origin channel for completion notifications (auto-injected in normal chat flow)", + }, + "chat_id": map[string]interface{}{ + "type": "string", + "description": "Optional origin chat ID for completion notifications (auto-injected in normal chat flow)", + }, }, "required": []string{"objective", "tasks"}, } @@ -97,7 +105,8 @@ func (t *PipelineCreateTool) Execute(_ context.Context, args map[string]interfac }) } - p, err := t.orc.CreatePipeline(label, objective, "tool", "tool", specs) + originChannel, originChatID := resolvePipelineOrigin(args, "tool", "tool") + p, err := t.orc.CreatePipeline(label, objective, originChannel, originChatID, specs) if err != nil { return "", err } @@ -227,6 +236,14 @@ func (t *PipelineDispatchTool) Parameters() map[string]interface{} { "description": "Maximum number of tasks to dispatch in this call (default 3)", "default": 3, }, + "channel": map[string]interface{}{ + "type": "string", + "description": "Optional origin channel override for spawned subagents", + }, + "chat_id": map[string]interface{}{ + "type": "string", + "description": "Optional origin chat ID override for spawned subagents", + }, }, "required": []string{"pipeline_id"}, } @@ -247,6 +264,21 @@ func (t *PipelineDispatchTool) Execute(ctx context.Context, args map[string]inte if raw, ok := args["max_dispatch"].(float64); ok && raw > 0 { maxDispatch = int(raw) } + originChannel, originChatID := resolvePipelineOrigin(args, "", "") + if p, ok := t.orc.GetPipeline(pipelineID); ok && p != nil { + if strings.TrimSpace(originChannel) == "" { + originChannel = strings.TrimSpace(p.OriginChannel) + } + if strings.TrimSpace(originChatID) == "" { + originChatID = strings.TrimSpace(p.OriginChatID) + } + } + if strings.TrimSpace(originChannel) == "" { + originChannel = "tool" + } + if strings.TrimSpace(originChatID) == "" { + originChatID = "tool" + } ready, err := t.orc.ReadyTasks(pipelineID) if err != nil { @@ -289,8 +321,8 @@ func (t *PipelineDispatchTool) Execute(ctx context.Context, args map[string]inte Label: label, Role: task.Role, AgentID: agentID, - OriginChannel: "tool", - OriginChatID: "tool", + OriginChannel: originChannel, + OriginChatID: originChatID, PipelineID: pipelineID, PipelineTask: task.ID, }); err != nil { @@ -306,3 +338,17 @@ func (t *PipelineDispatchTool) Execute(ctx context.Context, args map[string]inte } return fmt.Sprintf("Pipeline %s dispatch result:\n%s", pipelineID, strings.Join(lines, "\n")), nil } + +func resolvePipelineOrigin(args map[string]interface{}, defaultChannel, defaultChatID string) (string, string) { + originChannel, _ := args["channel"].(string) + originChatID, _ := args["chat_id"].(string) + originChannel = strings.TrimSpace(originChannel) + originChatID = strings.TrimSpace(originChatID) + if originChannel == "" { + originChannel = strings.TrimSpace(defaultChannel) + } + if originChatID == "" { + originChatID = strings.TrimSpace(defaultChatID) + } + return originChannel, originChatID +} diff --git a/pkg/tools/subagent.go b/pkg/tools/subagent.go index 7afc09e..f889ab2 100644 --- a/pkg/tools/subagent.go +++ b/pkg/tools/subagent.go @@ -275,8 +275,15 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) { // 2. Result broadcast (keep existing behavior) if sm.bus != nil { prefix := "Task completed" + if runErr != nil { + prefix = "Task failed" + } if task.Label != "" { - prefix = fmt.Sprintf("Task '%s' completed", task.Label) + if runErr != nil { + prefix = fmt.Sprintf("Task '%s' failed", task.Label) + } else { + prefix = fmt.Sprintf("Task '%s' completed", task.Label) + } } announceContent := fmt.Sprintf("%s.\n\nResult:\n%s", prefix, task.Result) if task.PipelineID != "" && task.PipelineTask != "" { @@ -299,6 +306,7 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) { "timeout_sec": fmt.Sprintf("%d", task.TimeoutSec), "pipeline_id": task.PipelineID, "pipeline_task": task.PipelineTask, + "status": task.Status, }, }) } diff --git a/pkg/tools/subagent_runtime_control_test.go b/pkg/tools/subagent_runtime_control_test.go index 9d64a02..5ec604a 100644 --- a/pkg/tools/subagent_runtime_control_test.go +++ b/pkg/tools/subagent_runtime_control_test.go @@ -3,8 +3,11 @@ package tools import ( "context" "errors" + "strings" "testing" "time" + + "clawgo/pkg/bus" ) func TestSubagentSpawnEnforcesTaskQuota(t *testing.T) { @@ -105,6 +108,45 @@ func TestSubagentRunWithTimeoutFails(t *testing.T) { } } +func TestSubagentBroadcastIncludesFailureStatus(t *testing.T) { + workspace := t.TempDir() + msgBus := bus.NewMessageBus() + defer msgBus.Close() + + manager := NewSubagentManager(nil, workspace, msgBus, nil) + manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) { + return "", errors.New("boom") + }) + + _, err := manager.Spawn(context.Background(), SubagentSpawnOptions{ + Task: "failing task", + AgentID: "coder", + OriginChannel: "cli", + OriginChatID: "direct", + }) + if err != nil { + t.Fatalf("spawn failed: %v", err) + } + + task := waitSubagentDone(t, manager, 4*time.Second) + if task.Status != "failed" { + t.Fatalf("expected failed task, got %s", task.Status) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + msg, ok := msgBus.ConsumeInbound(ctx) + if !ok { + t.Fatalf("expected subagent completion message") + } + if got := strings.TrimSpace(msg.Metadata["status"]); got != "failed" { + t.Fatalf("expected metadata status=failed, got %q", got) + } + if !strings.Contains(strings.ToLower(msg.Content), "failed") { + t.Fatalf("expected failure wording in content, got %q", msg.Content) + } +} + func waitSubagentDone(t *testing.T, manager *SubagentManager, timeout time.Duration) *SubagentTask { t.Helper() deadline := time.Now().Add(timeout)