Files
clawgo/pkg/tools/subagent.go

1136 lines
32 KiB
Go

package tools
import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/YspCoder/clawgo/pkg/bus"
	"github.com/YspCoder/clawgo/pkg/providers"
)
// SubagentRun is the record of a single subagent task execution. It carries
// the resolved configuration, progress counters, mailbox linkage and final
// outcome of the run, and is serialized to JSON using the field tags below.
type SubagentRun struct {
// Identity and routing. ID has the form "subagent-<n>" when created by spawnRun.
ID string `json:"id"`
Task string `json:"task"`
Label string `json:"label"`
Role string `json:"role"`
AgentID string `json:"agent_id"`
Transport string `json:"transport,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
SessionKey string `json:"session_key"`
MemoryNS string `json:"memory_ns"`
SystemPromptFile string `json:"system_prompt_file,omitempty"`
ToolAllowlist []string `json:"tool_allowlist,omitempty"`
// Limits resolved at spawn time. RetryBackoff is used as milliseconds and
// TimeoutSec as seconds by runWithRetry.
MaxRetries int `json:"max_retries,omitempty"`
RetryBackoff int `json:"retry_backoff,omitempty"`
TimeoutSec int `json:"timeout_sec,omitempty"`
MaxToolIterations int `json:"max_tool_iterations,omitempty"`
MaxTaskChars int `json:"max_task_chars,omitempty"`
MaxResultChars int `json:"max_result_chars,omitempty"`
// Counters accumulated while the run executes (see applyExecutionStats).
RetryCount int `json:"retry_count,omitempty"`
IterationCount int `json:"iteration_count,omitempty"`
AttemptCount int `json:"attempt_count,omitempty"`
RestartCount int `json:"restart_count,omitempty"`
PromptTokens int `json:"prompt_tokens,omitempty"`
CompletionTokens int `json:"completion_tokens,omitempty"`
TotalTokens int `json:"total_tokens,omitempty"`
LastFailureCode string `json:"last_failure_code,omitempty"`
// Mailbox / correlation linkage.
ThreadID string `json:"thread_id,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
ParentRunID string `json:"parent_run_id,omitempty"`
LastMessageID string `json:"last_message_id,omitempty"`
WaitingReply bool `json:"waiting_for_reply,omitempty"`
SharedState map[string]interface{} `json:"shared_state,omitempty"`
// Where the request originated; used to build the ChatID of the final notification.
OriginChannel string `json:"origin_channel,omitempty"`
OriginChatID string `json:"origin_chat_id,omitempty"`
// Outcome. Status holds one of the RuntimeStatus* values.
Status string `json:"status"`
Result string `json:"result,omitempty"`
Steering []string `json:"steering,omitempty"`
// Created and Updated are Unix timestamps in milliseconds.
Created int64 `json:"created"`
Updated int64 `json:"updated"`
}
// SubagentManager spawns, tracks, persists and resumes subagent runs.
// NOTE(review): mu appears to guard the maps and scalar fields below; some
// run fields are also touched outside the lock — confirm the intended contract.
type SubagentManager struct {
// runs holds known runs keyed by run ID.
runs map[string]*SubagentRun
// cancelFuncs holds the cancel function of each in-flight run, keyed by run ID.
cancelFuncs map[string]context.CancelFunc
// waiters maps a run ID to the set of channels signalled when that run finishes.
waiters map[string]map[chan struct{}]struct{}
// recoverableRunIDs lists persisted runs that were still marked running at construction.
recoverableRunIDs []string
// archiveAfterMinute is the age, in minutes, after which terminal runs are
// pruned from the in-memory map; values <= 0 disable pruning.
archiveAfterMinute int64
mu sync.RWMutex
provider providers.LLMProvider
bus *bus.MessageBus
workspace string
// nextID is the sequence number used for the next "subagent-<n>" run ID.
nextID int
// runFunc, when set, replaces the built-in single provider call in executeRunOnce.
runFunc SubagentRunFunc
profileStore *SubagentProfileStore
runStore *SubagentRunStore
mailboxStore *AgentMailboxStore
}
// SubagentSpawnOptions describes a request to start a subagent run.
// Task is required. In spawnRun, numeric limits greater than zero and a
// non-empty NotifyMainPolicy take precedence over the values of the matched
// subagent profile.
type SubagentSpawnOptions struct {
Task string
Label string
Role string
AgentID string
NotifyMainPolicy string
MaxRetries int
RetryBackoff int
TimeoutSec int
MaxToolIterations int
MaxTaskChars int
MaxResultChars int
OriginChannel string
OriginChatID string
// ThreadID, CorrelationID and ParentRunID link the run to existing mailbox
// or run context; an empty CorrelationID defaults to the new run ID.
ThreadID string
CorrelationID string
ParentRunID string
}
// NewSubagentManager builds a manager for the given workspace, loads any runs
// found in the run store into memory, remembers those still marked running,
// and starts a background pass that attempts to resume them.
func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *bus.MessageBus) *SubagentManager {
	sm := &SubagentManager{
		runs:               map[string]*SubagentRun{},
		cancelFuncs:        map[string]context.CancelFunc{},
		waiters:            map[string]map[chan struct{}]struct{}{},
		archiveAfterMinute: 60,
		provider:           provider,
		bus:                bus,
		workspace:          workspace,
		nextID:             1,
		profileStore:       NewSubagentProfileStore(workspace),
		runStore:           NewSubagentRunStore(workspace),
		mailboxStore:       NewAgentMailboxStore(workspace),
	}

	if persisted := sm.runStore; persisted != nil {
		for _, r := range persisted.List() {
			sm.runs[r.ID] = r
			if r.Status != RuntimeStatusRunning {
				continue
			}
			// Interrupted mid-flight by a previous process; queue for resume.
			sm.recoverableRunIDs = append(sm.recoverableRunIDs, r.ID)
		}
		sm.nextID = persisted.NextIDSeed()
	}

	go sm.resumeRecoveredRuns()
	return sm
}
// Spawn starts a subagent run and returns a one-line human-readable
// description of what was spawned, or the error reported by spawnRun.
func (sm *SubagentManager) Spawn(ctx context.Context, opts SubagentSpawnOptions) (string, error) {
	run, err := sm.spawnRun(ctx, opts)
	if err != nil {
		return "", err
	}

	var desc string
	switch {
	case run.Label != "":
		desc = fmt.Sprintf("Spawned subagent '%s' for task: %s (agent=%s)", run.Label, run.Task, run.AgentID)
	default:
		desc = fmt.Sprintf("Spawned subagent for task: %s (agent=%s)", run.Task, run.AgentID)
	}
	if run.Role == "" {
		return desc, nil
	}
	return desc + fmt.Sprintf(" role=%s", run.Role), nil
}
// SpawnRun starts a subagent run and returns the run record produced by
// spawnRun instead of a textual description.
func (sm *SubagentManager) SpawnRun(ctx context.Context, opts SubagentSpawnOptions) (*SubagentRun, error) {
	run, err := sm.spawnRun(ctx, opts)
	return run, err
}
// spawnRun validates opts, resolves the subagent profile and effective limits,
// registers a new run, records the initial task message in the mailbox,
// persists the run and starts it in a new goroutine.
//
// It returns a clone of the registered run. An error is returned when the task
// is empty, a profile lookup fails, the matched profile is disabled, or the
// task exceeds the effective max_task_chars limit.
func (sm *SubagentManager) spawnRun(ctx context.Context, opts SubagentSpawnOptions) (*SubagentRun, error) {
task := strings.TrimSpace(opts.Task)
if task == "" {
return nil, fmt.Errorf("task is required")
}
label := strings.TrimSpace(opts.Label)
role := strings.TrimSpace(opts.Role)
agentID := normalizeSubagentIdentifier(opts.AgentID)
originalRole := role
// Profile lookup: an explicit agent ID wins; otherwise try to find a profile by role.
var profile *SubagentProfile
if sm.profileStore != nil {
if agentID != "" {
if p, ok, err := sm.profileStore.Get(agentID); err != nil {
return nil, err
} else if ok {
profile = p
}
} else if role != "" {
if p, ok, err := sm.profileStore.FindByRole(role); err != nil {
return nil, err
} else if ok {
profile = p
agentID = normalizeSubagentIdentifier(p.AgentID)
}
}
}
// Fall back to the normalized role, then to "default", as the agent ID.
if agentID == "" {
agentID = normalizeSubagentIdentifier(role)
}
if agentID == "" {
agentID = "default"
}
// Defaults used when no profile supplies a value.
memoryNS := agentID
systemPromptFile := ""
transport := "local"
parentAgentID := ""
notifyMainPolicy := "final_only"
toolAllowlist := []string(nil)
maxRetries := 0
retryBackoff := 1000
timeoutSec := 0
maxToolIterations := 0
maxTaskChars := 0
maxResultChars := 0
// Second lookup by the final agent ID (which may have come from the role or "default").
if profile == nil && sm.profileStore != nil {
if p, ok, err := sm.profileStore.Get(agentID); err != nil {
return nil, err
} else if ok {
profile = p
}
}
if profile != nil {
if strings.EqualFold(strings.TrimSpace(profile.Status), "disabled") {
return nil, fmt.Errorf("subagent profile '%s' is disabled", profile.AgentID)
}
if label == "" {
label = strings.TrimSpace(profile.Name)
}
if role == "" {
role = strings.TrimSpace(profile.Role)
}
if ns := normalizeSubagentIdentifier(profile.MemoryNamespace); ns != "" {
memoryNS = ns
}
transport = strings.TrimSpace(profile.Transport)
if transport == "" {
transport = "local"
}
parentAgentID = strings.TrimSpace(profile.ParentAgentID)
notifyMainPolicy = normalizeNotifyMainPolicy(profile.NotifyMainPolicy)
systemPromptFile = strings.TrimSpace(profile.SystemPromptFile)
// Copy so the run does not share the profile's backing array.
toolAllowlist = append([]string(nil), profile.ToolAllowlist...)
maxRetries = profile.MaxRetries
retryBackoff = profile.RetryBackoff
timeoutSec = profile.TimeoutSec
maxToolIterations = profile.MaxToolIterations
maxTaskChars = profile.MaxTaskChars
maxResultChars = profile.MaxResultChars
}
// Positive per-call options override profile/default values.
if opts.MaxRetries > 0 {
maxRetries = opts.MaxRetries
}
if opts.RetryBackoff > 0 {
retryBackoff = opts.RetryBackoff
}
if opts.TimeoutSec > 0 {
timeoutSec = opts.TimeoutSec
}
if opts.MaxToolIterations > 0 {
maxToolIterations = opts.MaxToolIterations
}
if opts.MaxTaskChars > 0 {
maxTaskChars = opts.MaxTaskChars
}
if opts.MaxResultChars > 0 {
maxResultChars = opts.MaxResultChars
}
// The quota check runs before clamping, against the unclamped limit.
// NOTE(review): len(task) counts bytes, not characters.
if maxTaskChars > 0 && len(task) > maxTaskChars {
return nil, fmt.Errorf("task exceeds max_task_chars quota (%d > %d)", len(task), maxTaskChars)
}
// Clamp every limit into its allowed range.
maxRetries = normalizePositiveBound(maxRetries, 0, 8)
retryBackoff = normalizePositiveBound(retryBackoff, 500, 120000)
timeoutSec = normalizePositiveBound(timeoutSec, 0, 3600)
maxToolIterations = normalizePositiveBound(maxToolIterations, 0, 200)
maxTaskChars = normalizePositiveBound(maxTaskChars, 0, 400000)
maxResultChars = normalizePositiveBound(maxResultChars, 0, 400000)
if role == "" {
role = originalRole
}
originChannel := strings.TrimSpace(opts.OriginChannel)
originChatID := strings.TrimSpace(opts.OriginChatID)
if raw := strings.TrimSpace(opts.NotifyMainPolicy); raw != "" {
notifyMainPolicy = normalizeNotifyMainPolicy(raw)
}
threadID := strings.TrimSpace(opts.ThreadID)
correlationID := strings.TrimSpace(opts.CorrelationID)
parentRunID := strings.TrimSpace(opts.ParentRunID)
// From here on the manager lock is held until return, including the
// mailbox and run-store calls below.
sm.mu.Lock()
defer sm.mu.Unlock()
runID := fmt.Sprintf("subagent-%d", sm.nextID)
sm.nextID++
sessionKey := buildSubagentSessionKey(agentID, runID)
now := time.Now().UnixMilli()
if correlationID == "" {
correlationID = runID
}
if sm.mailboxStore != nil {
thread, err := sm.mailboxStore.EnsureThread(AgentThread{
ThreadID: threadID,
Owner: "main",
Participants: []string{"main", agentID},
Status: "open",
Topic: task,
CreatedAt: now,
UpdatedAt: now,
})
// A failure to ensure the thread is ignored; the caller-supplied threadID is kept.
if err == nil {
threadID = thread.ThreadID
}
}
subagentRun := &SubagentRun{
ID: runID,
Task: task,
Label: label,
Role: role,
AgentID: agentID,
Transport: transport,
ParentAgentID: parentAgentID,
NotifyMainPolicy: notifyMainPolicy,
SessionKey: sessionKey,
MemoryNS: memoryNS,
SystemPromptFile: systemPromptFile,
ToolAllowlist: toolAllowlist,
MaxRetries: maxRetries,
RetryBackoff: retryBackoff,
TimeoutSec: timeoutSec,
MaxToolIterations: maxToolIterations,
MaxTaskChars: maxTaskChars,
MaxResultChars: maxResultChars,
RetryCount: 0,
ThreadID: threadID,
CorrelationID: correlationID,
ParentRunID: parentRunID,
OriginChannel: originChannel,
OriginChatID: originChatID,
Status: RuntimeStatusRouting,
Created: now,
Updated: now,
}
// The run context derives from the caller's ctx, so cancelling ctx also
// cancels the background run.
taskCtx, cancel := context.WithCancel(ctx)
sm.runs[runID] = subagentRun
sm.cancelFuncs[runID] = cancel
sm.recordMailboxMessageLocked(subagentRun, AgentMessage{
ThreadID: threadID,
FromAgent: "main",
ToAgent: agentID,
CorrelationID: correlationID,
Type: "task",
Content: task,
RequiresReply: true,
Status: "queued",
CreatedAt: now,
})
sm.persistRunLocked(subagentRun, "spawned", "")
go sm.runSubagent(taskCtx, subagentRun)
// Hand back a copy so the caller does not share the live run record.
return cloneSubagentRun(subagentRun), nil
}
// runSubagent drives one run to completion: it marks the run as running,
// executes it through runWithRetry, records the outcome (status, result,
// mailbox message, persisted event), wakes any waiters and, depending on the
// run's notify policy, publishes a summary on the message bus.
func (sm *SubagentManager) runSubagent(ctx context.Context, run *SubagentRun) {
	// The cancel handle is dropped once the run is over, whatever the outcome.
	defer func() {
		sm.mu.Lock()
		delete(sm.cancelFuncs, run.ID)
		sm.mu.Unlock()
	}()

	sm.mu.Lock()
	run.Status = RuntimeStatusRunning
	startedAt := time.Now().UnixMilli()
	run.Created = startedAt
	run.Updated = startedAt
	sm.persistRunLocked(run, "started", "")
	sm.mu.Unlock()

	output, runErr := sm.runWithRetry(ctx, run)

	// 1. Record the outcome under the lock.
	sm.mu.Lock()
	eventType := "completed"
	if runErr == nil {
		run.Status = RuntimeStatusCompleted
		run.LastFailureCode = ""
		run.Result = applySubagentResultQuota(output, run.MaxResultChars)
	} else {
		eventType = "failed"
		run.Status = RuntimeStatusFailed
		run.LastFailureCode = classifySubagentFailureCode(runErr)
		run.Result = applySubagentResultQuota(fmt.Sprintf("Error: %v", runErr), run.MaxResultChars)
	}
	run.Updated = time.Now().UnixMilli()
	run.WaitingReply = false
	sm.recordMailboxMessageLocked(run, AgentMessage{
		ThreadID:      run.ThreadID,
		FromAgent:     run.AgentID,
		ToAgent:       "main",
		ReplyTo:       run.LastMessageID,
		CorrelationID: run.CorrelationID,
		Type:          "result",
		Content:       run.Result,
		Status:        "delivered",
		CreatedAt:     run.Updated,
	})
	sm.persistRunLocked(run, eventType, run.Result)
	sm.notifyRunWaitersLocked(run.ID)
	sm.mu.Unlock()

	// 2. Result broadcast.
	if sm.bus == nil || !shouldNotifyMainOnFinal(run.NotifyMainPolicy, runErr, run) {
		return
	}
	content, reason := buildSubagentMainNotification(run, runErr)
	meta := map[string]string{
		"trigger":         "subagent",
		"subagent_id":     run.ID,
		"agent_id":        run.AgentID,
		"role":            run.Role,
		"session_key":     run.SessionKey,
		"memory_ns":       run.MemoryNS,
		"retry_count":     fmt.Sprintf("%d", run.RetryCount),
		"iteration_count": fmt.Sprintf("%d", run.IterationCount),
		"timeout_sec":     fmt.Sprintf("%d", run.TimeoutSec),
		"status":          run.Status,
		"notify_reason":   reason,
	}
	sm.bus.PublishInbound(bus.InboundMessage{
		Channel:    "system",
		SenderID:   fmt.Sprintf("subagent:%s", run.ID),
		ChatID:     fmt.Sprintf("%s:%s", run.OriginChannel, run.OriginChatID),
		SessionKey: run.SessionKey,
		Content:    content,
		Metadata:   meta,
	})
}
// normalizeNotifyMainPolicy lower-cases and trims v and returns it when it is
// one of the recognised policies ("milestone", "on_blocked", "always",
// "internal_only"). Every other input, including the empty string, yields
// "final_only".
func normalizeNotifyMainPolicy(v string) string {
	policy := strings.ToLower(strings.TrimSpace(v))
	switch policy {
	case "milestone", "on_blocked", "always", "internal_only":
		return policy
	}
	return "final_only"
}
// shouldNotifyMainOnFinal reports whether the final outcome of a run should be
// announced to the main agent under the given policy: never for
// "internal_only" and "milestone", only for blocked-looking errors under
// "on_blocked", and always otherwise. The run argument is currently unused.
func shouldNotifyMainOnFinal(policy string, runErr error, run *SubagentRun) bool {
	switch normalizeNotifyMainPolicy(policy) {
	case "internal_only", "milestone":
		return false
	case "on_blocked":
		return isBlockedSubagentError(runErr)
	}
	// "always", "final_only" and anything else.
	return true
}
// buildSubagentMainNotification renders the text announced to the main agent
// when a run finishes and returns it together with the notify reason
// ("final", or "blocked" when runErr looks like a blocking condition).
func buildSubagentMainNotification(run *SubagentRun, runErr error) (string, string) {
	status, reason := "completed", "final"
	switch {
	case runErr == nil:
		// Keep the success defaults.
	case isBlockedSubagentError(runErr):
		status, reason = "blocked", "blocked"
	default:
		status = "failed"
	}

	agent := strings.TrimSpace(run.AgentID)
	runID := strings.TrimSpace(run.ID)
	taskLine := summarizeSubagentText(firstNonEmpty(run.Label, run.Task), 120)
	summaryLine := summarizeSubagentText(run.Result, 280)

	text := fmt.Sprintf(
		"Subagent update\nagent: %s\nrun: %s\nstatus: %s\nreason: %s\ntask: %s\nsummary: %s",
		agent, runID, status, reason, taskLine, summaryLine,
	)
	return text, reason
}
// isBlockedSubagentError reports whether err looks like the subagent was
// blocked rather than plainly failed: a context deadline, or an error message
// containing one of a fixed set of hints (timeouts, quota/rate limiting,
// permission problems, waiting for input). Matching is case-insensitive.
func isBlockedSubagentError(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, context.DeadlineExceeded):
		return true
	}

	text := strings.ToLower(strings.TrimSpace(err.Error()))
	if text == "" {
		return false
	}
	for _, hint := range [...]string{
		"timeout",
		"deadline exceeded",
		"quota",
		"rate limit",
		"too many requests",
		"permission denied",
		"requires input",
		"waiting for reply",
		"blocked",
	} {
		if strings.Contains(text, hint) {
			return true
		}
	}
	return false
}
// summarizeSubagentText collapses s onto a single line (all whitespace runs
// become one space) and, when max > 0, limits it to at most max bytes.
//
// Truncated text ends in "..." when max leaves room for it; for max < 3 the
// text is cut without an ellipsis. The cut point is moved back to a rune
// boundary so the result never ends in a partial UTF-8 sequence. An empty or
// whitespace-only input yields "(empty)".
func summarizeSubagentText(s string, max int) string {
	s = strings.TrimSpace(strings.ReplaceAll(s, "\r\n", "\n"))
	s = strings.ReplaceAll(s, "\n", " ")
	s = strings.Join(strings.Fields(s), " ")
	if s == "" {
		return "(empty)"
	}
	if max <= 0 || len(s) <= max {
		return s
	}

	const ellipsis = "..."
	cut, suffix := max-len(ellipsis), ellipsis
	if cut < 0 {
		// No room for the ellipsis; slicing at a negative index would panic.
		cut, suffix = max, ""
	}
	// cut <= max < len(s), so s[cut] is in range. Step back over UTF-8
	// continuation bytes so a multi-byte rune is never split.
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return strings.TrimSpace(s[:cut]) + suffix
}
// firstNonEmpty returns the first argument that is not blank, with surrounding
// whitespace removed, or "" when every argument is blank.
func firstNonEmpty(values ...string) string {
	for i := range values {
		if trimmed := strings.TrimSpace(values[i]); trimmed != "" {
			return trimmed
		}
	}
	return ""
}
// runWithRetry executes run up to MaxRetries+1 times and returns the first
// successful result, or the last error once attempts are exhausted or the
// error is not retryable. Each attempt gets its own timeout when TimeoutSec
// is set, and failed attempts are separated by a fixed backoff.
//
// NOTE(review): run.LastFailureCode is written here outside sm.mu while other
// run fields are updated under the lock — confirm whether that is intended.
func (sm *SubagentManager) runWithRetry(ctx context.Context, run *SubagentRun) (string, error) {
// Re-clamp the stored limits (they are also clamped in spawnRun).
maxRetries := normalizePositiveBound(run.MaxRetries, 0, 8)
backoffMs := normalizePositiveBound(run.RetryBackoff, 500, 120000)
timeoutSec := normalizePositiveBound(run.TimeoutSec, 0, 3600)
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
// Stop early when an iteration budget is configured and already used up.
if remaining := sm.remainingIterations(run); remaining == 0 && run.MaxToolIterations > 0 {
run.LastFailureCode = "retry_limit"
return "", fmt.Errorf("subagent iteration budget exhausted")
}
runCtx := ctx
var cancel context.CancelFunc
if timeoutSec > 0 {
runCtx, cancel = context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second)
}
result, err := sm.executeRunOnce(runCtx, run)
// Release the per-attempt timeout right away rather than deferring inside the loop.
if cancel != nil {
cancel()
}
if err == nil {
sm.mu.Lock()
run.RetryCount = attempt
run.LastFailureCode = ""
run.Updated = time.Now().UnixMilli()
sm.persistRunLocked(run, "attempt_succeeded", "")
sm.mu.Unlock()
return result, nil
}
lastErr = err
run.LastFailureCode = classifySubagentFailureCode(err)
sm.mu.Lock()
run.RetryCount = attempt
run.Updated = time.Now().UnixMilli()
sm.persistRunLocked(run, "attempt_failed", err.Error())
sm.mu.Unlock()
if attempt >= maxRetries || !shouldRetrySubagentError(err) {
break
}
// Wait out the backoff, but give up immediately if the run context is cancelled.
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(time.Duration(backoffMs) * time.Millisecond):
}
}
// Defensive: the loop always runs at least once, so lastErr is normally set here.
if lastErr == nil {
lastErr = fmt.Errorf("subagent run failed with unknown error")
}
return "", lastErr
}
// executeRunOnce performs a single execution attempt for run.
//
// Pending mailbox messages on the run's thread are claimed first. If a custom
// runFunc is installed it performs the attempt; otherwise a single chat
// request is sent to the configured provider with the system prompt, the task
// and any claimed mailbox content. Claimed messages are acknowledged on
// success and put back to "queued" on failure. Execution statistics are
// folded into run in both paths.
func (sm *SubagentManager) executeRunOnce(ctx context.Context, run *SubagentRun) (string, error) {
if run == nil {
return "", fmt.Errorf("subagent run is nil")
}
pending, consumedIDs := sm.consumeThreadInbox(run)
stats := &SubagentExecutionStats{}
ctx = WithSubagentExecutionStats(ctx, stats)
// Custom executor path.
if sm.runFunc != nil {
// Only attach an iteration budget when one is configured and not yet exhausted.
if remaining := sm.remainingIterations(run); remaining > 0 {
ctx = WithSubagentIterationBudget(ctx, remaining)
}
result, err := sm.runFunc(ctx, run)
sm.applyExecutionStats(run, stats)
if err != nil {
sm.restoreMessageStatuses(consumedIDs)
} else {
sm.ackMessageStatuses(consumedIDs)
}
return result, err
}
// Built-in path: requires a provider.
if sm.provider == nil {
sm.restoreMessageStatuses(consumedIDs)
return "", fmt.Errorf("no llm provider configured for subagent execution")
}
systemPrompt := sm.resolveSystemPrompt(run)
messages := []providers.Message{
{
Role: "system",
Content: systemPrompt,
},
{
Role: "user",
Content: run.Task,
},
}
if strings.TrimSpace(pending) != "" {
messages = append(messages, providers.Message{
Role: "user",
Content: "Mailbox updates on this thread:\n" + pending,
})
}
response, err := sm.provider.Chat(ctx, messages, nil, sm.provider.GetDefaultModel(), map[string]interface{}{
"max_tokens": 4096,
})
// The provider call counts as one attempt whether or not it succeeded.
stats.Attempts++
sm.applyExecutionStats(run, stats)
if err != nil {
sm.restoreMessageStatuses(consumedIDs)
return "", err
}
sm.ackMessageStatuses(consumedIDs)
return response.Content, nil
}
// resolveSystemPrompt builds the system prompt for run. The base prompt is the
// workspace AGENTS.md content when that file is readable, or a generic
// instruction otherwise; the run's own system prompt file, when set and
// readable, is appended as a subagent policy section.
func (sm *SubagentManager) resolveSystemPrompt(run *SubagentRun) string {
	base := "You are a subagent. Follow workspace AGENTS.md and complete the task independently."
	if policy := sm.readWorkspacePromptFile("AGENTS.md"); policy != "" {
		base = "Workspace policy (AGENTS.md):\n" + policy + "\n\nComplete the given task independently and report the result."
	}
	if run == nil {
		return base
	}

	name := strings.TrimSpace(run.SystemPromptFile)
	if name == "" {
		return base
	}
	extra := sm.readWorkspacePromptFile(name)
	if extra == "" {
		return base
	}
	return base + "\n\nSubagent policy (" + name + "):\n" + extra
}
// readWorkspacePromptFile returns the trimmed contents of relPath, resolved
// against the manager's workspace directory. It returns "" when the workspace
// or path is empty, the path is absolute, the path would leave the workspace,
// or the file cannot be read.
//
// NOTE(review): the containment check is lexical only; symlinks inside the
// workspace are not resolved.
func (sm *SubagentManager) readWorkspacePromptFile(relPath string) string {
	ws := strings.TrimSpace(sm.workspace)
	relPath = strings.TrimSpace(relPath)
	if ws == "" || relPath == "" || filepath.IsAbs(relPath) {
		return ""
	}

	fullPath := filepath.Clean(filepath.Join(ws, relPath))
	rel, err := filepath.Rel(ws, fullPath)
	if err != nil {
		return ""
	}
	// filepath.Rel returns a cleaned path, so anything outside the workspace
	// is exactly ".." or begins with "../". Test for the path element rather
	// than a bare ".." prefix so in-workspace names such as "..notes.md" are
	// not rejected.
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return ""
	}

	data, err := os.ReadFile(fullPath)
	if err != nil {
		return ""
	}
	return strings.TrimSpace(string(data))
}
// SubagentRunFunc is a pluggable executor for one attempt of a subagent run.
// When installed via SetRunFunc, executeRunOnce calls it instead of issuing
// the built-in provider chat request.
type SubagentRunFunc func(ctx context.Context, run *SubagentRun) (string, error)
// SetRunFunc installs f as the run executor and starts a background pass that
// resumes any runs still awaiting recovery.
func (sm *SubagentManager) SetRunFunc(f SubagentRunFunc) {
	sm.mu.Lock()
	sm.runFunc = f
	// The goroutine takes sm.mu itself, so it proceeds once we unlock below.
	go sm.resumeRecoveredRuns()
	sm.mu.Unlock()
}
// ProfileStore returns the profile store used by the manager.
func (sm *SubagentManager) ProfileStore() *SubagentProfileStore {
	sm.mu.RLock()
	store := sm.profileStore
	sm.mu.RUnlock()
	return store
}
// resumeRecoveredRuns restarts the runs recorded as recoverable, provided an
// executor (runFunc or provider) is available. The recoverable list is taken
// and cleared under the lock, so each run is resumed at most once; when no
// executor is available yet the list is left untouched for a later call.
//
// NOTE(review): the status check uses the literal "running" while
// NewSubagentManager compares against RuntimeStatusRunning — confirm they match.
func (sm *SubagentManager) resumeRecoveredRuns() {
	if sm == nil {
		return
	}

	sm.mu.Lock()
	canExecute := sm.runFunc != nil || sm.provider != nil
	if !canExecute {
		sm.mu.Unlock()
		return
	}
	pendingIDs := sm.recoverableRunIDs
	sm.recoverableRunIDs = nil
	resumable := make([]*SubagentRun, 0, len(pendingIDs))
	for _, id := range pendingIDs {
		candidate := sm.runs[id]
		if candidate == nil || candidate.Status != "running" {
			continue
		}
		candidate.Updated = time.Now().UnixMilli()
		sm.persistRunLocked(candidate, "recovered", "auto-resumed after restart")
		resumable = append(resumable, candidate)
	}
	sm.mu.Unlock()

	// Resumed runs are detached from any caller context.
	for _, r := range resumable {
		runCtx, cancel := context.WithCancel(context.Background())
		sm.mu.Lock()
		sm.cancelFuncs[r.ID] = cancel
		sm.mu.Unlock()
		go sm.runSubagent(runCtx, r)
	}
}
// NextRunSequence returns the sequence number that the next spawned run will use.
func (sm *SubagentManager) NextRunSequence() int {
	sm.mu.RLock()
	seq := sm.nextID
	sm.mu.RUnlock()
	return seq
}
// listRuns prunes archived runs and then returns every in-memory run followed
// by any persisted run whose ID is not in memory. The returned pointers are
// the live records, not copies.
func (sm *SubagentManager) listRuns() []*SubagentRun {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	sm.pruneArchivedLocked()

	out := make([]*SubagentRun, 0, len(sm.runs))
	inMemory := make(map[string]struct{}, len(sm.runs))
	for _, r := range sm.runs {
		out = append(out, r)
		inMemory[r.ID] = struct{}{}
	}
	if sm.runStore == nil {
		return out
	}
	for _, r := range sm.runStore.List() {
		if _, dup := inMemory[r.ID]; !dup {
			out = append(out, r)
		}
	}
	return out
}
// Events returns the run store's events for runID, passing limit through to
// the store. Without a run store it returns nil, nil.
func (sm *SubagentManager) Events(runID string, limit int) ([]SubagentRunEvent, error) {
	if store := sm.runStore; store != nil {
		return store.Events(runID, limit)
	}
	return nil, nil
}
// RuntimeSnapshot assembles a snapshot of all known runs: one request and one
// run record per run, the run's events, each distinct thread once, and the
// thread's messages as artifacts. limit is passed to the event and message
// lookups. A nil manager yields an empty snapshot.
func (sm *SubagentManager) RuntimeSnapshot(limit int) RuntimeSnapshot {
	if sm == nil {
		return RuntimeSnapshot{}
	}

	runs := sm.listRuns()
	var snap RuntimeSnapshot
	snap.Requests = make([]RequestRecord, 0, len(runs))
	snap.Runs = make([]RunRecord, 0, len(runs))
	threadsSeen := make(map[string]struct{})

	for _, r := range runs {
		snap.Requests = append(snap.Requests, runToRequestRecord(r))
		snap.Runs = append(snap.Runs, runToRunRecord(r))

		// Event lookup failures are skipped, not reported.
		if events, err := sm.Events(r.ID, limit); err == nil {
			for _, e := range events {
				snap.Events = append(snap.Events, EventRecord{
					ID:          EventRecordID(e.RunID, e.Type, e.At),
					RunID:       e.RunID,
					RequestID:   e.RunID,
					AgentID:     e.AgentID,
					Type:        e.Type,
					Status:      e.Status,
					FailureCode: e.FailureCode,
					Message:     e.Message,
					RetryCount:  e.RetryCount,
					At:          e.At,
				})
			}
		}

		tid := strings.TrimSpace(r.ThreadID)
		if tid == "" {
			continue
		}
		// Each thread is looked up once, even when the lookup finds nothing.
		if _, dup := threadsSeen[tid]; !dup {
			threadsSeen[tid] = struct{}{}
			if th, found := sm.Thread(tid); found {
				snap.Threads = append(snap.Threads, threadToThreadRecord(th))
			}
		}
		msgs, err := sm.ThreadMessages(tid, limit)
		if err != nil {
			continue
		}
		for _, m := range msgs {
			snap.Artifacts = append(snap.Artifacts, messageToArtifactRecord(m))
		}
	}
	return snap
}
// Thread looks up a mailbox thread by ID. It reports false when no mailbox
// store is configured.
func (sm *SubagentManager) Thread(threadID string) (*AgentThread, bool) {
	if store := sm.mailboxStore; store != nil {
		return store.Thread(threadID)
	}
	return nil, false
}
// ThreadMessages returns the mailbox store's messages for threadID, passing
// limit through. Without a mailbox store it returns nil, nil.
func (sm *SubagentManager) ThreadMessages(threadID string, limit int) ([]AgentMessage, error) {
	if store := sm.mailboxStore; store != nil {
		return store.MessagesByThread(threadID, limit)
	}
	return nil, nil
}
// Inbox returns the mailbox store's inbox for agentID, passing limit through.
// Without a mailbox store it returns nil, nil.
func (sm *SubagentManager) Inbox(agentID string, limit int) ([]AgentMessage, error) {
	if store := sm.mailboxStore; store != nil {
		return store.Inbox(agentID, limit)
	}
	return nil, nil
}
// Message looks up a single mailbox message by ID. It reports false when no
// mailbox store is configured.
func (sm *SubagentManager) Message(messageID string) (*AgentMessage, bool) {
	if store := sm.mailboxStore; store != nil {
		return store.Message(messageID)
	}
	return nil, false
}
// pruneArchivedLocked drops terminal runs whose last update is older than the
// archive window from the in-memory maps. The caller must hold sm.mu.
// A non-positive archiveAfterMinute disables pruning.
func (sm *SubagentManager) pruneArchivedLocked() {
	if sm.archiveAfterMinute <= 0 {
		return
	}
	retention := time.Duration(sm.archiveAfterMinute) * time.Minute
	cutoff := time.Now().Add(-retention).UnixMilli()
	for id, r := range sm.runs {
		// Runs with no Updated timestamp are never considered expired.
		expired := r.Updated > 0 && r.Updated < cutoff
		if IsTerminalRuntimeStatus(r.Status) && expired {
			delete(sm.runs, id)
			delete(sm.cancelFuncs, id)
		}
	}
}
// normalizePositiveBound clamps v into [min, max]. The upper bound is applied
// only when max is positive; max <= 0 means no upper limit.
func normalizePositiveBound(v, min, max int) int {
	switch {
	case v < min:
		return min
	case max > 0 && v > max:
		return max
	default:
		return v
	}
}
// applySubagentResultQuota enforces the max_result_chars quota on result.
//
// A non-positive maxChars disables the quota. A result longer than maxChars
// bytes is cut and a truncation marker is appended. When maxChars is larger
// than the marker, the kept text is shortened so text plus marker fit within
// maxChars; for smaller quotas the first maxChars bytes are kept and the
// marker is still appended. The cut point is moved back to a rune boundary so
// the kept text never ends in a partial UTF-8 sequence.
func applySubagentResultQuota(result string, maxChars int) string {
	if maxChars <= 0 || len(result) <= maxChars {
		return result
	}

	const suffix = "\n\n[TRUNCATED: result exceeds max_result_chars quota]"
	cut := maxChars
	if maxChars > len(suffix) {
		cut = maxChars - len(suffix)
	}
	// cut <= maxChars < len(result), so result[cut] is in range. Step back
	// over UTF-8 continuation bytes so a multi-byte rune is never split.
	for cut > 0 && !utf8.RuneStart(result[cut]) {
		cut--
	}
	return strings.TrimSpace(result[:cut]) + suffix
}
// remainingIterations returns how many tool iterations run may still use.
// It returns 0 when run is nil, when no iteration limit is configured, or
// when the limit has been reached or exceeded.
func (sm *SubagentManager) remainingIterations(run *SubagentRun) int {
	if run == nil || run.MaxToolIterations <= 0 {
		return 0
	}
	if left := run.MaxToolIterations - run.IterationCount; left > 0 {
		return left
	}
	return 0
}
// applyExecutionStats adds the counters collected for one attempt onto run and,
// when stats carries a non-blank failure code, records it as the run's last
// failure code. Nil arguments are ignored.
func (sm *SubagentManager) applyExecutionStats(run *SubagentRun, stats *SubagentExecutionStats) {
	if run == nil || stats == nil {
		return
	}

	run.IterationCount += stats.Iterations
	run.AttemptCount += stats.Attempts
	run.RestartCount += stats.Restarts

	run.PromptTokens += stats.PromptTokens
	run.CompletionTokens += stats.CompletionTokens
	run.TotalTokens += stats.TotalTokens

	if code := strings.TrimSpace(stats.FailureCode); code != "" {
		run.LastFailureCode = code
	}
}
// shouldRetrySubagentError reports whether a failed attempt is worth retrying.
//
// Cancellation is never retried. The check runs before classification because
// a plain context.Canceled error classifies as the empty code, which is
// otherwise treated as retryable. Exhausted budgets ("continuation_exhausted",
// "retry_limit") are not retried either; every other failure code, including
// "timeout", the stream failures, "context_compacted" and provider-specific
// codes, is retryable.
func shouldRetrySubagentError(err error) bool {
	if errors.Is(err, context.Canceled) {
		return false
	}
	switch classifySubagentFailureCode(err) {
	case "continuation_exhausted", "retry_limit":
		return false
	default:
		return true
	}
}
// classifySubagentFailureCode maps err to a short failure code. A context
// deadline yields "timeout"; a provider execution error with a non-blank Code
// yields that code; otherwise the lower-cased message is matched against known
// phrases in a fixed order. Unrecognised errors, and nil, yield "".
func classifySubagentFailureCode(err error) string {
	if err == nil {
		return ""
	}
	if errors.Is(err, context.DeadlineExceeded) {
		return "timeout"
	}

	var execErr *providers.ProviderExecutionError
	if errors.As(err, &execErr) && execErr != nil {
		if code := strings.TrimSpace(execErr.Code); code != "" {
			return code
		}
	}

	text := strings.ToLower(strings.TrimSpace(err.Error()))
	// Order matters: the first rule with a matching phrase wins.
	rules := []struct {
		code    string
		phrases []string
	}{
		{"retry_limit", []string{"max tool iterations", "iteration budget exhausted"}},
		{"stream_stale", []string{"stream stale"}},
		{"stream_failed", []string{"stream failed"}},
		{"continuation_exhausted", []string{"continuation exhausted", "thinking budget exhausted"}},
		{"context_compacted", []string{"compaction"}},
	}
	for _, rule := range rules {
		for _, phrase := range rule.phrases {
			if strings.Contains(text, phrase) {
				return rule.code
			}
		}
	}
	return ""
}
// normalizeSubagentIdentifier converts in to a canonical identifier: the input
// is lower-cased and trimmed, spaces become '-', any character other than
// a-z, 0-9, '-', '_' and '.' is dropped, and leading/trailing '-', '_' and '.'
// are stripped. The result may be empty.
func normalizeSubagentIdentifier(in string) string {
	lowered := strings.TrimSpace(strings.ToLower(in))
	if lowered == "" {
		return ""
	}
	mapped := strings.Map(func(r rune) rune {
		if r == ' ' {
			return '-'
		}
		isLetter := r >= 'a' && r <= 'z'
		isDigit := r >= '0' && r <= '9'
		isPunct := r == '-' || r == '_' || r == '.'
		if isLetter || isDigit || isPunct {
			return r
		}
		return -1 // drop everything else
	}, lowered)
	return strings.Trim(mapped, "-_.")
}
// buildSubagentSessionKey returns "subagent:<agent>:<run>" built from the
// normalized agent and run identifiers, substituting "default" and "run"
// respectively when an identifier normalizes to the empty string.
func buildSubagentSessionKey(agentID, runID string) string {
	normalizedOr := func(raw, fallback string) string {
		if id := normalizeSubagentIdentifier(raw); id != "" {
			return id
		}
		return fallback
	}
	agentPart := normalizedOr(agentID, "default")
	runPart := normalizedOr(runID, "run")
	return fmt.Sprintf("subagent:%s:%s", agentPart, runPart)
}
// persistRunLocked writes a copy of run and an accompanying event to the run
// store. Store errors are deliberately ignored. It does nothing when run is
// nil or no run store is configured. Judging by its name and call sites, the
// caller is expected to hold sm.mu.
func (sm *SubagentManager) persistRunLocked(run *SubagentRun, eventType, message string) {
	store := sm.runStore
	if run == nil || store == nil {
		return
	}

	snapshot := cloneSubagentRun(run)
	_ = store.AppendRun(snapshot)

	event := SubagentRunEvent{
		RunID:       snapshot.ID,
		AgentID:     snapshot.AgentID,
		Type:        strings.TrimSpace(eventType),
		Status:      snapshot.Status,
		FailureCode: strings.TrimSpace(snapshot.LastFailureCode),
		Message:     strings.TrimSpace(message),
		RetryCount:  snapshot.RetryCount,
		At:          snapshot.Updated,
	}
	_ = store.AppendEvent(event)
}
// waitRun blocks until the run identified by runID reaches a terminal status
// or ctx is done. On success it returns the run and true; a nil ctx is treated
// as context.Background(). An error is returned for a nil manager, a blank run
// ID, or when ctx ends first.
//
// NOTE(review): a run found only in the run store is returned as-is (not
// cloned), and a run ID that is unknown everywhere waits until ctx is done.
func (sm *SubagentManager) waitRun(ctx context.Context, runID string) (*SubagentRun, bool, error) {
if sm == nil {
return nil, false, fmt.Errorf("subagent manager not available")
}
runID = strings.TrimSpace(runID)
if runID == "" {
return nil, false, fmt.Errorf("run id is required")
}
if ctx == nil {
ctx = context.Background()
}
// Buffered so the notifier's non-blocking send is not lost if we are not yet receiving.
ch := make(chan struct{}, 1)
sm.mu.Lock()
sm.pruneArchivedLocked()
run, ok := sm.runs[runID]
// Not in memory (e.g. pruned): fall back to the persisted record.
if !ok && sm.runStore != nil {
if persisted, found := sm.runStore.Get(runID); found && persisted != nil {
if IsTerminalRuntimeStatus(persisted.Status) {
sm.mu.Unlock()
return persisted, true, nil
}
}
}
// Fast path: already finished.
if ok && run != nil && IsTerminalRuntimeStatus(run.Status) {
cp := cloneSubagentRun(run)
sm.mu.Unlock()
return cp, true, nil
}
// Register as a waiter while still holding the lock, so a completion that
// happens right after Unlock is not missed.
waiters := sm.waiters[runID]
if waiters == nil {
waiters = map[chan struct{}]struct{}{}
sm.waiters[runID] = waiters
}
waiters[ch] = struct{}{}
sm.mu.Unlock()
defer sm.removeRunWaiter(runID, ch)
for {
select {
case <-ctx.Done():
return nil, false, ctx.Err()
case <-ch:
// Woken by notifyRunWaitersLocked: re-check the run state.
sm.mu.Lock()
sm.pruneArchivedLocked()
run, ok := sm.runs[runID]
if ok && run != nil && IsTerminalRuntimeStatus(run.Status) {
cp := cloneSubagentRun(run)
sm.mu.Unlock()
return cp, true, nil
}
if !ok && sm.runStore != nil {
if persisted, found := sm.runStore.Get(runID); found && persisted != nil && IsTerminalRuntimeStatus(persisted.Status) {
sm.mu.Unlock()
return persisted, true, nil
}
}
// NOTE(review): notifyRunWaitersLocked removes the waiter set, so if the
// run is not terminal here only ctx can end this loop — confirm.
sm.mu.Unlock()
}
}
}
// removeRunWaiter unregisters ch as a waiter for runID and drops the run's
// waiter set once it is empty.
func (sm *SubagentManager) removeRunWaiter(runID string, ch chan struct{}) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	set := sm.waiters[runID]
	delete(set, ch) // no-op when set is nil or ch is not registered
	if len(set) == 0 {
		delete(sm.waiters, runID)
	}
}
// notifyRunWaitersLocked signals every channel waiting on runID without
// blocking and then forgets the run's waiter set. Judging by its name and
// call sites, the caller is expected to hold sm.mu.
func (sm *SubagentManager) notifyRunWaitersLocked(runID string) {
	set := sm.waiters[runID]
	delete(sm.waiters, runID)
	for ch := range set {
		select {
		case ch <- struct{}{}:
		default:
			// A signal is already pending on this channel.
		}
	}
}
// recordMailboxMessageLocked appends msg to the mailbox, defaulting its thread
// to the run's thread, and on success records the stored message ID on run and
// marks the run as waiting for a reply when the stored message requires one.
// Append errors are ignored and leave run untouched.
func (sm *SubagentManager) recordMailboxMessageLocked(run *SubagentRun, msg AgentMessage) {
	if run == nil || sm.mailboxStore == nil {
		return
	}
	if strings.TrimSpace(msg.ThreadID) == "" {
		msg.ThreadID = run.ThreadID
	}

	saved, err := sm.mailboxStore.AppendMessage(msg)
	if err != nil {
		return
	}
	run.LastMessageID = saved.MessageID
	run.WaitingReply = run.WaitingReply || saved.RequiresReply
}
// consumeThreadInbox claims the messages waiting for the run's agent on the
// run's thread by marking each one "processing". It returns a text digest of
// the claimed messages and their IDs; messages whose status update fails are
// skipped. With no run, no mailbox store, a lookup error or an empty inbox it
// returns "", nil.
func (sm *SubagentManager) consumeThreadInbox(run *SubagentRun) (string, []string) {
	if run == nil || sm.mailboxStore == nil {
		return "", nil
	}
	inbox, err := sm.mailboxStore.ThreadInbox(run.ThreadID, run.AgentID, 0)
	if err != nil || len(inbox) == 0 {
		return "", nil
	}

	var digest strings.Builder
	claimed := make([]string, 0, len(inbox))
	stamp := time.Now().UnixMilli()
	for _, m := range inbox {
		if _, err := sm.mailboxStore.UpdateMessageStatus(m.MessageID, "processing", stamp); err != nil {
			continue
		}
		claimed = append(claimed, m.MessageID)

		fmt.Fprintf(&digest, "- [%s] from=%s type=%s", m.MessageID, m.FromAgent, m.Type)
		if strings.TrimSpace(m.ReplyTo) != "" {
			fmt.Fprintf(&digest, " reply_to=%s", m.ReplyTo)
		}
		digest.WriteString("\n")
		digest.WriteString(strings.TrimSpace(m.Content))
		digest.WriteString("\n")
	}
	return strings.TrimSpace(digest.String()), claimed
}
// restoreMessageStatuses puts the given mailbox messages back to "queued",
// e.g. after a failed attempt. Update errors are deliberately ignored.
func (sm *SubagentManager) restoreMessageStatuses(messageIDs []string) {
	store := sm.mailboxStore
	if store == nil || len(messageIDs) == 0 {
		return
	}
	stamp := time.Now().UnixMilli()
	for i := range messageIDs {
		_, _ = store.UpdateMessageStatus(messageIDs[i], "queued", stamp)
	}
}
// ackMessageStatuses marks the given mailbox messages as "acked" after a
// successful attempt. Update errors are deliberately ignored.
func (sm *SubagentManager) ackMessageStatuses(messageIDs []string) {
	store := sm.mailboxStore
	if store == nil || len(messageIDs) == 0 {
		return
	}
	stamp := time.Now().UnixMilli()
	for i := range messageIDs {
		_, _ = store.UpdateMessageStatus(messageIDs[i], "acked", stamp)
	}
}