mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-17 10:44:52 +08:00
Add internal subagent stream and notify policy
This commit is contained in:
@@ -131,6 +131,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
|
||||
"transport": fallbackString(strings.TrimSpace(subcfg.Transport), "local"),
|
||||
"node_id": strings.TrimSpace(subcfg.NodeID),
|
||||
"parent_agent_id": strings.TrimSpace(subcfg.ParentAgentID),
|
||||
"notify_main_policy": fallbackString(strings.TrimSpace(subcfg.NotifyMainPolicy), "final_only"),
|
||||
"display_name": subcfg.DisplayName,
|
||||
"role": subcfg.Role,
|
||||
"description": subcfg.Description,
|
||||
@@ -157,6 +158,7 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
|
||||
"transport": profile.Transport,
|
||||
"node_id": profile.NodeID,
|
||||
"parent_agent_id": profile.ParentAgentID,
|
||||
"notify_main_policy": fallbackString(strings.TrimSpace(profile.NotifyMainPolicy), "final_only"),
|
||||
"display_name": profile.Name,
|
||||
"role": profile.Role,
|
||||
"description": "Node-registered remote main agent branch",
|
||||
@@ -360,6 +362,37 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
|
||||
return nil, err
|
||||
}
|
||||
return map[string]interface{}{"found": true, "thread": thread, "messages": items}, nil
|
||||
case "stream":
|
||||
taskID, err := resolveSubagentTaskIDForRuntime(sm, runtimeStringArg(args, "id"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
task, ok := sm.GetTask(taskID)
|
||||
if !ok {
|
||||
return map[string]interface{}{"found": false}, nil
|
||||
}
|
||||
events, err := sm.Events(taskID, runtimeIntArg(args, "limit", 100))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var thread *tools.AgentThread
|
||||
var messages []tools.AgentMessage
|
||||
if strings.TrimSpace(task.ThreadID) != "" {
|
||||
if th, ok := sm.Thread(task.ThreadID); ok {
|
||||
thread = th
|
||||
}
|
||||
messages, err = sm.ThreadMessages(task.ThreadID, runtimeIntArg(args, "limit", 100))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
stream := mergeSubagentStream(events, messages)
|
||||
return map[string]interface{}{
|
||||
"found": true,
|
||||
"task": cloneSubagentTask(task),
|
||||
"thread": thread,
|
||||
"items": stream,
|
||||
}, nil
|
||||
case "inbox":
|
||||
agentID := runtimeStringArg(args, "agent_id")
|
||||
if agentID == "" {
|
||||
@@ -386,6 +419,47 @@ func (al *AgentLoop) HandleSubagentRuntime(ctx context.Context, action string, a
|
||||
}
|
||||
}
|
||||
|
||||
func mergeSubagentStream(events []tools.SubagentRunEvent, messages []tools.AgentMessage) []map[string]interface{} {
|
||||
items := make([]map[string]interface{}, 0, len(events)+len(messages))
|
||||
for _, evt := range events {
|
||||
items = append(items, map[string]interface{}{
|
||||
"kind": "event",
|
||||
"at": evt.At,
|
||||
"run_id": evt.RunID,
|
||||
"agent_id": evt.AgentID,
|
||||
"event_type": evt.Type,
|
||||
"status": evt.Status,
|
||||
"message": evt.Message,
|
||||
"retry_count": evt.RetryCount,
|
||||
})
|
||||
}
|
||||
for _, msg := range messages {
|
||||
items = append(items, map[string]interface{}{
|
||||
"kind": "message",
|
||||
"at": msg.CreatedAt,
|
||||
"message_id": msg.MessageID,
|
||||
"thread_id": msg.ThreadID,
|
||||
"from_agent": msg.FromAgent,
|
||||
"to_agent": msg.ToAgent,
|
||||
"reply_to": msg.ReplyTo,
|
||||
"correlation_id": msg.CorrelationID,
|
||||
"message_type": msg.Type,
|
||||
"content": msg.Content,
|
||||
"status": msg.Status,
|
||||
"requires_reply": msg.RequiresReply,
|
||||
})
|
||||
}
|
||||
sort.Slice(items, func(i, j int) bool {
|
||||
left, _ := items[i]["at"].(int64)
|
||||
right, _ := items[j]["at"].(int64)
|
||||
if left != right {
|
||||
return left < right
|
||||
}
|
||||
return fmt.Sprintf("%v", items[i]["kind"]) < fmt.Sprintf("%v", items[j]["kind"])
|
||||
})
|
||||
return items
|
||||
}
|
||||
|
||||
func cloneSubagentTask(in *tools.SubagentTask) *tools.SubagentTask {
|
||||
if in == nil {
|
||||
return nil
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"clawgo/pkg/config"
|
||||
"clawgo/pkg/runtimecfg"
|
||||
@@ -75,6 +76,7 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) {
|
||||
out, err := loop.HandleSubagentRuntime(context.Background(), "upsert_config_subagent", map[string]interface{}{
|
||||
"agent_id": "reviewer",
|
||||
"role": "testing",
|
||||
"notify_main_policy": "internal_only",
|
||||
"display_name": "Review Agent",
|
||||
"system_prompt": "review changes",
|
||||
"system_prompt_file": "agents/reviewer/AGENT.md",
|
||||
@@ -99,6 +101,9 @@ func TestHandleSubagentRuntimeUpsertConfigSubagent(t *testing.T) {
|
||||
if subcfg.SystemPromptFile != "agents/reviewer/AGENT.md" {
|
||||
t.Fatalf("expected system_prompt_file to persist, got %+v", subcfg)
|
||||
}
|
||||
if subcfg.NotifyMainPolicy != "internal_only" {
|
||||
t.Fatalf("expected notify_main_policy to persist, got %+v", subcfg)
|
||||
}
|
||||
if len(reloaded.Agents.Router.Rules) == 0 {
|
||||
t.Fatalf("expected router rules to be persisted")
|
||||
}
|
||||
@@ -316,3 +321,71 @@ func TestHandleSubagentRuntimeProtectsMainAgent(t *testing.T) {
|
||||
t.Fatalf("expected deleting main agent to fail")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSubagentRuntimeStream(t *testing.T) {
|
||||
workspace := t.TempDir()
|
||||
manager := tools.NewSubagentManager(nil, workspace, nil)
|
||||
manager.SetRunFunc(func(ctx context.Context, task *tools.SubagentTask) (string, error) {
|
||||
return "stream-result", nil
|
||||
})
|
||||
loop := &AgentLoop{
|
||||
workspace: workspace,
|
||||
subagentManager: manager,
|
||||
subagentRouter: tools.NewSubagentRouter(manager),
|
||||
}
|
||||
|
||||
out, err := loop.HandleSubagentRuntime(context.Background(), "spawn", map[string]interface{}{
|
||||
"task": "prepare streamable task",
|
||||
"agent_id": "coder",
|
||||
"channel": "webui",
|
||||
"chat_id": "webui",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("spawn failed: %v", err)
|
||||
}
|
||||
payload, ok := out.(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatalf("unexpected spawn payload: %T", out)
|
||||
}
|
||||
_ = payload
|
||||
var task *tools.SubagentTask
|
||||
for i := 0; i < 50; i++ {
|
||||
tasks := manager.ListTasks()
|
||||
if len(tasks) > 0 && tasks[0].Status == "completed" {
|
||||
task = tasks[0]
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
if task == nil {
|
||||
t.Fatalf("expected completed task")
|
||||
}
|
||||
|
||||
out, err = loop.HandleSubagentRuntime(context.Background(), "stream", map[string]interface{}{
|
||||
"id": task.ID,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("stream failed: %v", err)
|
||||
}
|
||||
streamPayload, ok := out.(map[string]interface{})
|
||||
if !ok || streamPayload["found"] != true {
|
||||
t.Fatalf("unexpected stream payload: %#v", out)
|
||||
}
|
||||
items, ok := streamPayload["items"].([]map[string]interface{})
|
||||
if !ok || len(items) == 0 {
|
||||
t.Fatalf("expected merged stream items, got %#v", streamPayload["items"])
|
||||
}
|
||||
foundEvent := false
|
||||
foundMessage := false
|
||||
for _, item := range items {
|
||||
switch item["kind"] {
|
||||
case "event":
|
||||
foundEvent = true
|
||||
case "message":
|
||||
foundMessage = true
|
||||
}
|
||||
}
|
||||
if !foundEvent || !foundMessage {
|
||||
t.Fatalf("expected merged event and message items, got %#v", items)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user