mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-19 21:07:30 +08:00
fix subagent completion notify path and pipeline origin routing
This commit is contained in:
@@ -668,7 +668,11 @@ func loadHeartbeatAckToken(workspace string) string {
|
|||||||
|
|
||||||
func (al *AgentLoop) prepareOutbound(msg bus.InboundMessage, response string) (bus.OutboundMessage, bool) {
|
func (al *AgentLoop) prepareOutbound(msg bus.InboundMessage, response string) (bus.OutboundMessage, bool) {
|
||||||
if shouldDropNoReply(response) {
|
if shouldDropNoReply(response) {
|
||||||
return bus.OutboundMessage{}, false
|
if fallback, ok := fallbackSubagentNotification(msg); ok {
|
||||||
|
response = fallback
|
||||||
|
} else {
|
||||||
|
return bus.OutboundMessage{}, false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
currentMsgID := ""
|
currentMsgID := ""
|
||||||
if msg.Metadata != nil {
|
if msg.Metadata != nil {
|
||||||
@@ -677,19 +681,18 @@ func (al *AgentLoop) prepareOutbound(msg bus.InboundMessage, response string) (b
|
|||||||
clean, replyToID := parseReplyTag(response, currentMsgID)
|
clean, replyToID := parseReplyTag(response, currentMsgID)
|
||||||
clean = strings.TrimSpace(clean)
|
clean = strings.TrimSpace(clean)
|
||||||
if clean == "" {
|
if clean == "" {
|
||||||
return bus.OutboundMessage{}, false
|
if fallback, ok := fallbackSubagentNotification(msg); ok {
|
||||||
|
clean = fallback
|
||||||
|
} else {
|
||||||
|
return bus.OutboundMessage{}, false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if al.shouldSuppressOutbound(msg, clean) {
|
if al.shouldSuppressOutbound(msg, clean) {
|
||||||
return bus.OutboundMessage{}, false
|
return bus.OutboundMessage{}, false
|
||||||
}
|
}
|
||||||
outbound := bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Content: clean, ReplyToID: strings.TrimSpace(replyToID)}
|
outbound := bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Content: clean, ReplyToID: strings.TrimSpace(replyToID)}
|
||||||
if msg.Channel == "system" {
|
if msg.Channel == "system" {
|
||||||
if originChannel, originChatID, ok := strings.Cut(msg.ChatID, ":"); ok && strings.TrimSpace(originChannel) != "" {
|
outbound.Channel, outbound.ChatID = resolveSystemOrigin(msg.ChatID)
|
||||||
outbound.Channel = originChannel
|
|
||||||
outbound.ChatID = originChatID
|
|
||||||
} else {
|
|
||||||
outbound.Channel = "cli"
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return outbound, true
|
return outbound, true
|
||||||
}
|
}
|
||||||
@@ -1075,16 +1078,7 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
|
|||||||
"chat_id": msg.ChatID,
|
"chat_id": msg.ChatID,
|
||||||
})
|
})
|
||||||
|
|
||||||
// Parse origin from chat_id (format: "channel:chat_id")
|
originChannel, originChatID := resolveSystemOrigin(msg.ChatID)
|
||||||
var originChannel, originChatID string
|
|
||||||
if idx := strings.Index(msg.ChatID, ":"); idx > 0 {
|
|
||||||
originChannel = msg.ChatID[:idx]
|
|
||||||
originChatID = msg.ChatID[idx+1:]
|
|
||||||
} else {
|
|
||||||
// Fallback
|
|
||||||
originChannel = "cli"
|
|
||||||
originChatID = msg.ChatID
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use the origin session for context
|
// Use the origin session for context
|
||||||
sessionKey := fmt.Sprintf("%s:%s", originChannel, originChatID)
|
sessionKey := fmt.Sprintf("%s:%s", originChannel, originChatID)
|
||||||
@@ -1817,6 +1811,54 @@ func shouldDropNoReply(text string) bool {
|
|||||||
return strings.EqualFold(t, "NO_REPLY")
|
return strings.EqualFold(t, "NO_REPLY")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func resolveSystemOrigin(chatID string) (string, string) {
|
||||||
|
raw := strings.TrimSpace(chatID)
|
||||||
|
if raw == "" {
|
||||||
|
return "cli", "direct"
|
||||||
|
}
|
||||||
|
originChannel, originChatID, ok := strings.Cut(raw, ":")
|
||||||
|
if !ok {
|
||||||
|
return "cli", raw
|
||||||
|
}
|
||||||
|
originChannel = strings.TrimSpace(originChannel)
|
||||||
|
originChatID = strings.TrimSpace(originChatID)
|
||||||
|
switch {
|
||||||
|
case originChannel == "" && originChatID == "":
|
||||||
|
return "cli", "direct"
|
||||||
|
case originChannel == "":
|
||||||
|
return "cli", originChatID
|
||||||
|
case originChatID == "":
|
||||||
|
return originChannel, "direct"
|
||||||
|
default:
|
||||||
|
return originChannel, originChatID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func isSubagentSystemMessage(msg bus.InboundMessage) bool {
|
||||||
|
if msg.Channel != "system" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if msg.Metadata != nil && strings.EqualFold(strings.TrimSpace(msg.Metadata["trigger"]), "subagent") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return strings.HasPrefix(strings.ToLower(strings.TrimSpace(msg.SenderID)), "subagent:")
|
||||||
|
}
|
||||||
|
|
||||||
|
func fallbackSubagentNotification(msg bus.InboundMessage) (string, bool) {
|
||||||
|
if !isSubagentSystemMessage(msg) {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
content := strings.TrimSpace(msg.Content)
|
||||||
|
if content == "" {
|
||||||
|
id := strings.TrimSpace(strings.TrimPrefix(msg.SenderID, "subagent:"))
|
||||||
|
if id == "" {
|
||||||
|
id = "unknown"
|
||||||
|
}
|
||||||
|
content = fmt.Sprintf("Subagent %s completed.", id)
|
||||||
|
}
|
||||||
|
return content, true
|
||||||
|
}
|
||||||
|
|
||||||
func shouldFlushTelegramStreamSnapshot(s string) bool {
|
func shouldFlushTelegramStreamSnapshot(s string) bool {
|
||||||
s = strings.TrimRight(s, " \t")
|
s = strings.TrimRight(s, " \t")
|
||||||
if s == "" {
|
if s == "" {
|
||||||
|
|||||||
68
pkg/agent/loop_system_notify_test.go
Normal file
68
pkg/agent/loop_system_notify_test.go
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"clawgo/pkg/bus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPrepareOutboundSubagentNoReplyFallback(t *testing.T) {
|
||||||
|
al := &AgentLoop{}
|
||||||
|
msg := bus.InboundMessage{
|
||||||
|
Channel: "system",
|
||||||
|
SenderID: "subagent:subagent-1",
|
||||||
|
ChatID: "telegram:9527",
|
||||||
|
Content: "Task 'coder' completed.\n\nResult:\nOK",
|
||||||
|
Metadata: map[string]string{
|
||||||
|
"trigger": "subagent",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
outbound, ok := al.prepareOutbound(msg, "NO_REPLY")
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expected outbound notification for subagent NO_REPLY fallback")
|
||||||
|
}
|
||||||
|
if outbound.Channel != "telegram" || outbound.ChatID != "9527" {
|
||||||
|
t.Fatalf("unexpected outbound target: %s:%s", outbound.Channel, outbound.ChatID)
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(outbound.Content) != strings.TrimSpace(msg.Content) {
|
||||||
|
t.Fatalf("expected fallback content from system message, got: %q", outbound.Content)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrepareOutboundNoReplySuppressedForNonSubagent(t *testing.T) {
|
||||||
|
al := &AgentLoop{}
|
||||||
|
msg := bus.InboundMessage{
|
||||||
|
Channel: "cli",
|
||||||
|
ChatID: "direct",
|
||||||
|
Content: "hello",
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := al.prepareOutbound(msg, "NO_REPLY"); ok {
|
||||||
|
t.Fatalf("expected NO_REPLY to be suppressed for non-subagent messages")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrepareOutboundSubagentNoReplyFallbackWithMissingOrigin(t *testing.T) {
|
||||||
|
al := &AgentLoop{}
|
||||||
|
msg := bus.InboundMessage{
|
||||||
|
Channel: "system",
|
||||||
|
SenderID: "subagent:subagent-9",
|
||||||
|
ChatID: ":",
|
||||||
|
Metadata: map[string]string{
|
||||||
|
"trigger": "subagent",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
outbound, ok := al.prepareOutbound(msg, "NO_REPLY")
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expected outbound notification for malformed system origin")
|
||||||
|
}
|
||||||
|
if outbound.Channel != "cli" || outbound.ChatID != "direct" {
|
||||||
|
t.Fatalf("expected fallback origin cli:direct, got %s:%s", outbound.Channel, outbound.ChatID)
|
||||||
|
}
|
||||||
|
if outbound.Content != "Subagent subagent-9 completed." {
|
||||||
|
t.Fatalf("unexpected fallback content: %q", outbound.Content)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -53,6 +53,14 @@ func (t *PipelineCreateTool) Parameters() map[string]interface{} {
|
|||||||
"required": []string{"id", "goal"},
|
"required": []string{"id", "goal"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"channel": map[string]interface{}{
|
||||||
|
"type": "string",
|
||||||
|
"description": "Optional origin channel for completion notifications (auto-injected in normal chat flow)",
|
||||||
|
},
|
||||||
|
"chat_id": map[string]interface{}{
|
||||||
|
"type": "string",
|
||||||
|
"description": "Optional origin chat ID for completion notifications (auto-injected in normal chat flow)",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"required": []string{"objective", "tasks"},
|
"required": []string{"objective", "tasks"},
|
||||||
}
|
}
|
||||||
@@ -97,7 +105,8 @@ func (t *PipelineCreateTool) Execute(_ context.Context, args map[string]interfac
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
p, err := t.orc.CreatePipeline(label, objective, "tool", "tool", specs)
|
originChannel, originChatID := resolvePipelineOrigin(args, "tool", "tool")
|
||||||
|
p, err := t.orc.CreatePipeline(label, objective, originChannel, originChatID, specs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -227,6 +236,14 @@ func (t *PipelineDispatchTool) Parameters() map[string]interface{} {
|
|||||||
"description": "Maximum number of tasks to dispatch in this call (default 3)",
|
"description": "Maximum number of tasks to dispatch in this call (default 3)",
|
||||||
"default": 3,
|
"default": 3,
|
||||||
},
|
},
|
||||||
|
"channel": map[string]interface{}{
|
||||||
|
"type": "string",
|
||||||
|
"description": "Optional origin channel override for spawned subagents",
|
||||||
|
},
|
||||||
|
"chat_id": map[string]interface{}{
|
||||||
|
"type": "string",
|
||||||
|
"description": "Optional origin chat ID override for spawned subagents",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"required": []string{"pipeline_id"},
|
"required": []string{"pipeline_id"},
|
||||||
}
|
}
|
||||||
@@ -247,6 +264,21 @@ func (t *PipelineDispatchTool) Execute(ctx context.Context, args map[string]inte
|
|||||||
if raw, ok := args["max_dispatch"].(float64); ok && raw > 0 {
|
if raw, ok := args["max_dispatch"].(float64); ok && raw > 0 {
|
||||||
maxDispatch = int(raw)
|
maxDispatch = int(raw)
|
||||||
}
|
}
|
||||||
|
originChannel, originChatID := resolvePipelineOrigin(args, "", "")
|
||||||
|
if p, ok := t.orc.GetPipeline(pipelineID); ok && p != nil {
|
||||||
|
if strings.TrimSpace(originChannel) == "" {
|
||||||
|
originChannel = strings.TrimSpace(p.OriginChannel)
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(originChatID) == "" {
|
||||||
|
originChatID = strings.TrimSpace(p.OriginChatID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(originChannel) == "" {
|
||||||
|
originChannel = "tool"
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(originChatID) == "" {
|
||||||
|
originChatID = "tool"
|
||||||
|
}
|
||||||
|
|
||||||
ready, err := t.orc.ReadyTasks(pipelineID)
|
ready, err := t.orc.ReadyTasks(pipelineID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -289,8 +321,8 @@ func (t *PipelineDispatchTool) Execute(ctx context.Context, args map[string]inte
|
|||||||
Label: label,
|
Label: label,
|
||||||
Role: task.Role,
|
Role: task.Role,
|
||||||
AgentID: agentID,
|
AgentID: agentID,
|
||||||
OriginChannel: "tool",
|
OriginChannel: originChannel,
|
||||||
OriginChatID: "tool",
|
OriginChatID: originChatID,
|
||||||
PipelineID: pipelineID,
|
PipelineID: pipelineID,
|
||||||
PipelineTask: task.ID,
|
PipelineTask: task.ID,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
@@ -306,3 +338,17 @@ func (t *PipelineDispatchTool) Execute(ctx context.Context, args map[string]inte
|
|||||||
}
|
}
|
||||||
return fmt.Sprintf("Pipeline %s dispatch result:\n%s", pipelineID, strings.Join(lines, "\n")), nil
|
return fmt.Sprintf("Pipeline %s dispatch result:\n%s", pipelineID, strings.Join(lines, "\n")), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func resolvePipelineOrigin(args map[string]interface{}, defaultChannel, defaultChatID string) (string, string) {
|
||||||
|
originChannel, _ := args["channel"].(string)
|
||||||
|
originChatID, _ := args["chat_id"].(string)
|
||||||
|
originChannel = strings.TrimSpace(originChannel)
|
||||||
|
originChatID = strings.TrimSpace(originChatID)
|
||||||
|
if originChannel == "" {
|
||||||
|
originChannel = strings.TrimSpace(defaultChannel)
|
||||||
|
}
|
||||||
|
if originChatID == "" {
|
||||||
|
originChatID = strings.TrimSpace(defaultChatID)
|
||||||
|
}
|
||||||
|
return originChannel, originChatID
|
||||||
|
}
|
||||||
|
|||||||
@@ -275,8 +275,15 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
|
|||||||
// 2. Result broadcast (keep existing behavior)
|
// 2. Result broadcast (keep existing behavior)
|
||||||
if sm.bus != nil {
|
if sm.bus != nil {
|
||||||
prefix := "Task completed"
|
prefix := "Task completed"
|
||||||
|
if runErr != nil {
|
||||||
|
prefix = "Task failed"
|
||||||
|
}
|
||||||
if task.Label != "" {
|
if task.Label != "" {
|
||||||
prefix = fmt.Sprintf("Task '%s' completed", task.Label)
|
if runErr != nil {
|
||||||
|
prefix = fmt.Sprintf("Task '%s' failed", task.Label)
|
||||||
|
} else {
|
||||||
|
prefix = fmt.Sprintf("Task '%s' completed", task.Label)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
announceContent := fmt.Sprintf("%s.\n\nResult:\n%s", prefix, task.Result)
|
announceContent := fmt.Sprintf("%s.\n\nResult:\n%s", prefix, task.Result)
|
||||||
if task.PipelineID != "" && task.PipelineTask != "" {
|
if task.PipelineID != "" && task.PipelineTask != "" {
|
||||||
@@ -299,6 +306,7 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
|
|||||||
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
|
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
|
||||||
"pipeline_id": task.PipelineID,
|
"pipeline_id": task.PipelineID,
|
||||||
"pipeline_task": task.PipelineTask,
|
"pipeline_task": task.PipelineTask,
|
||||||
|
"status": task.Status,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,8 +3,11 @@ package tools
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"clawgo/pkg/bus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSubagentSpawnEnforcesTaskQuota(t *testing.T) {
|
func TestSubagentSpawnEnforcesTaskQuota(t *testing.T) {
|
||||||
@@ -105,6 +108,45 @@ func TestSubagentRunWithTimeoutFails(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSubagentBroadcastIncludesFailureStatus(t *testing.T) {
|
||||||
|
workspace := t.TempDir()
|
||||||
|
msgBus := bus.NewMessageBus()
|
||||||
|
defer msgBus.Close()
|
||||||
|
|
||||||
|
manager := NewSubagentManager(nil, workspace, msgBus, nil)
|
||||||
|
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
|
||||||
|
return "", errors.New("boom")
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := manager.Spawn(context.Background(), SubagentSpawnOptions{
|
||||||
|
Task: "failing task",
|
||||||
|
AgentID: "coder",
|
||||||
|
OriginChannel: "cli",
|
||||||
|
OriginChatID: "direct",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("spawn failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
task := waitSubagentDone(t, manager, 4*time.Second)
|
||||||
|
if task.Status != "failed" {
|
||||||
|
t.Fatalf("expected failed task, got %s", task.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
msg, ok := msgBus.ConsumeInbound(ctx)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expected subagent completion message")
|
||||||
|
}
|
||||||
|
if got := strings.TrimSpace(msg.Metadata["status"]); got != "failed" {
|
||||||
|
t.Fatalf("expected metadata status=failed, got %q", got)
|
||||||
|
}
|
||||||
|
if !strings.Contains(strings.ToLower(msg.Content), "failed") {
|
||||||
|
t.Fatalf("expected failure wording in content, got %q", msg.Content)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func waitSubagentDone(t *testing.T, manager *SubagentManager, timeout time.Duration) *SubagentTask {
|
func waitSubagentDone(t *testing.T, manager *SubagentManager, timeout time.Duration) *SubagentTask {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
deadline := time.Now().Add(timeout)
|
deadline := time.Now().Add(timeout)
|
||||||
|
|||||||
Reference in New Issue
Block a user