Files
clawgo/pkg/tools/subagent.go

1033 lines
28 KiB
Go

package tools
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/YspCoder/clawgo/pkg/bus"
"github.com/YspCoder/clawgo/pkg/providers"
)
type SubagentRun struct {
ID string `json:"id"`
Task string `json:"task"`
Label string `json:"label"`
Role string `json:"role"`
AgentID string `json:"agent_id"`
Transport string `json:"transport,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
NotifyMainPolicy string `json:"notify_main_policy,omitempty"`
SessionKey string `json:"session_key"`
MemoryNS string `json:"memory_ns"`
SystemPromptFile string `json:"system_prompt_file,omitempty"`
ToolAllowlist []string `json:"tool_allowlist,omitempty"`
MaxRetries int `json:"max_retries,omitempty"`
RetryBackoff int `json:"retry_backoff,omitempty"`
TimeoutSec int `json:"timeout_sec,omitempty"`
MaxTaskChars int `json:"max_task_chars,omitempty"`
MaxResultChars int `json:"max_result_chars,omitempty"`
RetryCount int `json:"retry_count,omitempty"`
ThreadID string `json:"thread_id,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
ParentRunID string `json:"parent_run_id,omitempty"`
LastMessageID string `json:"last_message_id,omitempty"`
WaitingReply bool `json:"waiting_for_reply,omitempty"`
SharedState map[string]interface{} `json:"shared_state,omitempty"`
OriginChannel string `json:"origin_channel,omitempty"`
OriginChatID string `json:"origin_chat_id,omitempty"`
Status string `json:"status"`
Result string `json:"result,omitempty"`
Steering []string `json:"steering,omitempty"`
Created int64 `json:"created"`
Updated int64 `json:"updated"`
}
type SubagentManager struct {
runs map[string]*SubagentRun
cancelFuncs map[string]context.CancelFunc
waiters map[string]map[chan struct{}]struct{}
recoverableRunIDs []string
archiveAfterMinute int64
mu sync.RWMutex
provider providers.LLMProvider
bus *bus.MessageBus
workspace string
nextID int
runFunc SubagentRunFunc
profileStore *SubagentProfileStore
runStore *SubagentRunStore
mailboxStore *AgentMailboxStore
}
type SubagentSpawnOptions struct {
Task string
Label string
Role string
AgentID string
NotifyMainPolicy string
MaxRetries int
RetryBackoff int
TimeoutSec int
MaxTaskChars int
MaxResultChars int
OriginChannel string
OriginChatID string
ThreadID string
CorrelationID string
ParentRunID string
}
func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *bus.MessageBus) *SubagentManager {
store := NewSubagentProfileStore(workspace)
runStore := NewSubagentRunStore(workspace)
mailboxStore := NewAgentMailboxStore(workspace)
mgr := &SubagentManager{
runs: make(map[string]*SubagentRun),
cancelFuncs: make(map[string]context.CancelFunc),
waiters: make(map[string]map[chan struct{}]struct{}),
archiveAfterMinute: 60,
provider: provider,
bus: bus,
workspace: workspace,
nextID: 1,
profileStore: store,
runStore: runStore,
mailboxStore: mailboxStore,
}
if runStore != nil {
for _, run := range runStore.List() {
mgr.runs[run.ID] = run
if run.Status == RuntimeStatusRunning {
mgr.recoverableRunIDs = append(mgr.recoverableRunIDs, run.ID)
}
}
mgr.nextID = runStore.NextIDSeed()
}
go mgr.resumeRecoveredRuns()
return mgr
}
func (sm *SubagentManager) Spawn(ctx context.Context, opts SubagentSpawnOptions) (string, error) {
run, err := sm.spawnRun(ctx, opts)
if err != nil {
return "", err
}
desc := fmt.Sprintf("Spawned subagent for task: %s (agent=%s)", run.Task, run.AgentID)
if run.Label != "" {
desc = fmt.Sprintf("Spawned subagent '%s' for task: %s (agent=%s)", run.Label, run.Task, run.AgentID)
}
if run.Role != "" {
desc += fmt.Sprintf(" role=%s", run.Role)
}
return desc, nil
}
func (sm *SubagentManager) SpawnRun(ctx context.Context, opts SubagentSpawnOptions) (*SubagentRun, error) {
return sm.spawnRun(ctx, opts)
}
func (sm *SubagentManager) spawnRun(ctx context.Context, opts SubagentSpawnOptions) (*SubagentRun, error) {
task := strings.TrimSpace(opts.Task)
if task == "" {
return nil, fmt.Errorf("task is required")
}
label := strings.TrimSpace(opts.Label)
role := strings.TrimSpace(opts.Role)
agentID := normalizeSubagentIdentifier(opts.AgentID)
originalRole := role
var profile *SubagentProfile
if sm.profileStore != nil {
if agentID != "" {
if p, ok, err := sm.profileStore.Get(agentID); err != nil {
return nil, err
} else if ok {
profile = p
}
} else if role != "" {
if p, ok, err := sm.profileStore.FindByRole(role); err != nil {
return nil, err
} else if ok {
profile = p
agentID = normalizeSubagentIdentifier(p.AgentID)
}
}
}
if agentID == "" {
agentID = normalizeSubagentIdentifier(role)
}
if agentID == "" {
agentID = "default"
}
memoryNS := agentID
systemPromptFile := ""
transport := "local"
parentAgentID := ""
notifyMainPolicy := "final_only"
toolAllowlist := []string(nil)
maxRetries := 0
retryBackoff := 1000
timeoutSec := 0
maxTaskChars := 0
maxResultChars := 0
if profile == nil && sm.profileStore != nil {
if p, ok, err := sm.profileStore.Get(agentID); err != nil {
return nil, err
} else if ok {
profile = p
}
}
if profile != nil {
if strings.EqualFold(strings.TrimSpace(profile.Status), "disabled") {
return nil, fmt.Errorf("subagent profile '%s' is disabled", profile.AgentID)
}
if label == "" {
label = strings.TrimSpace(profile.Name)
}
if role == "" {
role = strings.TrimSpace(profile.Role)
}
if ns := normalizeSubagentIdentifier(profile.MemoryNamespace); ns != "" {
memoryNS = ns
}
transport = strings.TrimSpace(profile.Transport)
if transport == "" {
transport = "local"
}
parentAgentID = strings.TrimSpace(profile.ParentAgentID)
notifyMainPolicy = normalizeNotifyMainPolicy(profile.NotifyMainPolicy)
systemPromptFile = strings.TrimSpace(profile.SystemPromptFile)
toolAllowlist = append([]string(nil), profile.ToolAllowlist...)
maxRetries = profile.MaxRetries
retryBackoff = profile.RetryBackoff
timeoutSec = profile.TimeoutSec
maxTaskChars = profile.MaxTaskChars
maxResultChars = profile.MaxResultChars
}
if opts.MaxRetries > 0 {
maxRetries = opts.MaxRetries
}
if opts.RetryBackoff > 0 {
retryBackoff = opts.RetryBackoff
}
if opts.TimeoutSec > 0 {
timeoutSec = opts.TimeoutSec
}
if opts.MaxTaskChars > 0 {
maxTaskChars = opts.MaxTaskChars
}
if opts.MaxResultChars > 0 {
maxResultChars = opts.MaxResultChars
}
if maxTaskChars > 0 && len(task) > maxTaskChars {
return nil, fmt.Errorf("task exceeds max_task_chars quota (%d > %d)", len(task), maxTaskChars)
}
maxRetries = normalizePositiveBound(maxRetries, 0, 8)
retryBackoff = normalizePositiveBound(retryBackoff, 500, 120000)
timeoutSec = normalizePositiveBound(timeoutSec, 0, 3600)
maxTaskChars = normalizePositiveBound(maxTaskChars, 0, 400000)
maxResultChars = normalizePositiveBound(maxResultChars, 0, 400000)
if role == "" {
role = originalRole
}
originChannel := strings.TrimSpace(opts.OriginChannel)
originChatID := strings.TrimSpace(opts.OriginChatID)
if raw := strings.TrimSpace(opts.NotifyMainPolicy); raw != "" {
notifyMainPolicy = normalizeNotifyMainPolicy(raw)
}
threadID := strings.TrimSpace(opts.ThreadID)
correlationID := strings.TrimSpace(opts.CorrelationID)
parentRunID := strings.TrimSpace(opts.ParentRunID)
sm.mu.Lock()
defer sm.mu.Unlock()
runID := fmt.Sprintf("subagent-%d", sm.nextID)
sm.nextID++
sessionKey := buildSubagentSessionKey(agentID, runID)
now := time.Now().UnixMilli()
if correlationID == "" {
correlationID = runID
}
if sm.mailboxStore != nil {
thread, err := sm.mailboxStore.EnsureThread(AgentThread{
ThreadID: threadID,
Owner: "main",
Participants: []string{"main", agentID},
Status: "open",
Topic: task,
CreatedAt: now,
UpdatedAt: now,
})
if err == nil {
threadID = thread.ThreadID
}
}
subagentRun := &SubagentRun{
ID: runID,
Task: task,
Label: label,
Role: role,
AgentID: agentID,
Transport: transport,
ParentAgentID: parentAgentID,
NotifyMainPolicy: notifyMainPolicy,
SessionKey: sessionKey,
MemoryNS: memoryNS,
SystemPromptFile: systemPromptFile,
ToolAllowlist: toolAllowlist,
MaxRetries: maxRetries,
RetryBackoff: retryBackoff,
TimeoutSec: timeoutSec,
MaxTaskChars: maxTaskChars,
MaxResultChars: maxResultChars,
RetryCount: 0,
ThreadID: threadID,
CorrelationID: correlationID,
ParentRunID: parentRunID,
OriginChannel: originChannel,
OriginChatID: originChatID,
Status: RuntimeStatusRouting,
Created: now,
Updated: now,
}
taskCtx, cancel := context.WithCancel(ctx)
sm.runs[runID] = subagentRun
sm.cancelFuncs[runID] = cancel
sm.recordMailboxMessageLocked(subagentRun, AgentMessage{
ThreadID: threadID,
FromAgent: "main",
ToAgent: agentID,
CorrelationID: correlationID,
Type: "task",
Content: task,
RequiresReply: true,
Status: "queued",
CreatedAt: now,
})
sm.persistRunLocked(subagentRun, "spawned", "")
go sm.runSubagent(taskCtx, subagentRun)
return cloneSubagentRun(subagentRun), nil
}
func (sm *SubagentManager) runSubagent(ctx context.Context, run *SubagentRun) {
defer func() {
sm.mu.Lock()
delete(sm.cancelFuncs, run.ID)
sm.mu.Unlock()
}()
sm.mu.Lock()
run.Status = RuntimeStatusRunning
run.Created = time.Now().UnixMilli()
run.Updated = run.Created
sm.persistRunLocked(run, "started", "")
sm.mu.Unlock()
result, runErr := sm.runWithRetry(ctx, run)
sm.mu.Lock()
if runErr != nil {
run.Status = RuntimeStatusFailed
run.Result = fmt.Sprintf("Error: %v", runErr)
run.Result = applySubagentResultQuota(run.Result, run.MaxResultChars)
run.Updated = time.Now().UnixMilli()
run.WaitingReply = false
sm.recordMailboxMessageLocked(run, AgentMessage{
ThreadID: run.ThreadID,
FromAgent: run.AgentID,
ToAgent: "main",
ReplyTo: run.LastMessageID,
CorrelationID: run.CorrelationID,
Type: "result",
Content: run.Result,
Status: "delivered",
CreatedAt: run.Updated,
})
sm.persistRunLocked(run, "failed", run.Result)
sm.notifyRunWaitersLocked(run.ID)
} else {
run.Status = RuntimeStatusCompleted
run.Result = applySubagentResultQuota(result, run.MaxResultChars)
run.Updated = time.Now().UnixMilli()
run.WaitingReply = false
sm.recordMailboxMessageLocked(run, AgentMessage{
ThreadID: run.ThreadID,
FromAgent: run.AgentID,
ToAgent: "main",
ReplyTo: run.LastMessageID,
CorrelationID: run.CorrelationID,
Type: "result",
Content: run.Result,
Status: "delivered",
CreatedAt: run.Updated,
})
sm.persistRunLocked(run, "completed", run.Result)
sm.notifyRunWaitersLocked(run.ID)
}
sm.mu.Unlock()
// 2. Result broadcast
if sm.bus != nil && shouldNotifyMainOnFinal(run.NotifyMainPolicy, runErr, run) {
announceContent, notifyReason := buildSubagentMainNotification(run, runErr)
sm.bus.PublishInbound(bus.InboundMessage{
Channel: "system",
SenderID: fmt.Sprintf("subagent:%s", run.ID),
ChatID: fmt.Sprintf("%s:%s", run.OriginChannel, run.OriginChatID),
SessionKey: run.SessionKey,
Content: announceContent,
Metadata: map[string]string{
"trigger": "subagent",
"subagent_id": run.ID,
"agent_id": run.AgentID,
"role": run.Role,
"session_key": run.SessionKey,
"memory_ns": run.MemoryNS,
"retry_count": fmt.Sprintf("%d", run.RetryCount),
"timeout_sec": fmt.Sprintf("%d", run.TimeoutSec),
"status": run.Status,
"notify_reason": notifyReason,
},
})
}
}
func normalizeNotifyMainPolicy(v string) string {
switch strings.ToLower(strings.TrimSpace(v)) {
case "", "final_only":
return "final_only"
case "milestone", "on_blocked", "always", "internal_only":
return strings.ToLower(strings.TrimSpace(v))
default:
return "final_only"
}
}
func shouldNotifyMainOnFinal(policy string, runErr error, run *SubagentRun) bool {
switch normalizeNotifyMainPolicy(policy) {
case "internal_only":
return false
case "always", "final_only":
return true
case "on_blocked":
return isBlockedSubagentError(runErr)
case "milestone":
return false
default:
return true
}
}
func buildSubagentMainNotification(run *SubagentRun, runErr error) (string, string) {
status := "completed"
reason := "final"
if runErr != nil {
status = "failed"
if isBlockedSubagentError(runErr) {
status = "blocked"
reason = "blocked"
}
}
return fmt.Sprintf(
"Subagent update\nagent: %s\nrun: %s\nstatus: %s\nreason: %s\ntask: %s\nsummary: %s",
strings.TrimSpace(run.AgentID),
strings.TrimSpace(run.ID),
status,
reason,
summarizeSubagentText(firstNonEmpty(run.Label, run.Task), 120),
summarizeSubagentText(run.Result, 280),
), reason
}
func isBlockedSubagentError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) {
return true
}
msg := strings.ToLower(strings.TrimSpace(err.Error()))
if msg == "" {
return false
}
blockedHints := []string{
"timeout",
"deadline exceeded",
"quota",
"rate limit",
"too many requests",
"permission denied",
"requires input",
"waiting for reply",
"blocked",
}
for _, hint := range blockedHints {
if strings.Contains(msg, hint) {
return true
}
}
return false
}
func summarizeSubagentText(s string, max int) string {
s = strings.TrimSpace(strings.ReplaceAll(s, "\r\n", "\n"))
s = strings.ReplaceAll(s, "\n", " ")
s = strings.Join(strings.Fields(s), " ")
if s == "" {
return "(empty)"
}
if max > 0 && len(s) > max {
return strings.TrimSpace(s[:max-3]) + "..."
}
return s
}
func firstNonEmpty(values ...string) string {
for _, v := range values {
if strings.TrimSpace(v) != "" {
return strings.TrimSpace(v)
}
}
return ""
}
func (sm *SubagentManager) runWithRetry(ctx context.Context, run *SubagentRun) (string, error) {
maxRetries := normalizePositiveBound(run.MaxRetries, 0, 8)
backoffMs := normalizePositiveBound(run.RetryBackoff, 500, 120000)
timeoutSec := normalizePositiveBound(run.TimeoutSec, 0, 3600)
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
runCtx := ctx
var cancel context.CancelFunc
if timeoutSec > 0 {
runCtx, cancel = context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second)
}
result, err := sm.executeRunOnce(runCtx, run)
if cancel != nil {
cancel()
}
if err == nil {
sm.mu.Lock()
run.RetryCount = attempt
run.Updated = time.Now().UnixMilli()
sm.persistRunLocked(run, "attempt_succeeded", "")
sm.mu.Unlock()
return result, nil
}
lastErr = err
sm.mu.Lock()
run.RetryCount = attempt
run.Updated = time.Now().UnixMilli()
sm.persistRunLocked(run, "attempt_failed", err.Error())
sm.mu.Unlock()
if attempt >= maxRetries {
break
}
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(time.Duration(backoffMs) * time.Millisecond):
}
}
if lastErr == nil {
lastErr = fmt.Errorf("subagent run failed with unknown error")
}
return "", lastErr
}
func (sm *SubagentManager) executeRunOnce(ctx context.Context, run *SubagentRun) (string, error) {
if run == nil {
return "", fmt.Errorf("subagent run is nil")
}
pending, consumedIDs := sm.consumeThreadInbox(run)
if sm.runFunc != nil {
result, err := sm.runFunc(ctx, run)
if err != nil {
sm.restoreMessageStatuses(consumedIDs)
} else {
sm.ackMessageStatuses(consumedIDs)
}
return result, err
}
if sm.provider == nil {
sm.restoreMessageStatuses(consumedIDs)
return "", fmt.Errorf("no llm provider configured for subagent execution")
}
systemPrompt := sm.resolveSystemPrompt(run)
messages := []providers.Message{
{
Role: "system",
Content: systemPrompt,
},
{
Role: "user",
Content: run.Task,
},
}
if strings.TrimSpace(pending) != "" {
messages = append(messages, providers.Message{
Role: "user",
Content: "Mailbox updates on this thread:\n" + pending,
})
}
response, err := sm.provider.Chat(ctx, messages, nil, sm.provider.GetDefaultModel(), map[string]interface{}{
"max_tokens": 4096,
})
if err != nil {
sm.restoreMessageStatuses(consumedIDs)
return "", err
}
sm.ackMessageStatuses(consumedIDs)
return response.Content, nil
}
func (sm *SubagentManager) resolveSystemPrompt(run *SubagentRun) string {
systemPrompt := "You are a subagent. Follow workspace AGENTS.md and complete the task independently."
workspacePrompt := sm.readWorkspacePromptFile("AGENTS.md")
if workspacePrompt != "" {
systemPrompt = "Workspace policy (AGENTS.md):\n" + workspacePrompt + "\n\nComplete the given task independently and report the result."
}
if run == nil {
return systemPrompt
}
if promptFile := strings.TrimSpace(run.SystemPromptFile); promptFile != "" {
if promptText := sm.readWorkspacePromptFile(promptFile); promptText != "" {
return systemPrompt + "\n\nSubagent policy (" + promptFile + "):\n" + promptText
}
}
return systemPrompt
}
func (sm *SubagentManager) readWorkspacePromptFile(relPath string) string {
ws := strings.TrimSpace(sm.workspace)
relPath = strings.TrimSpace(relPath)
if ws == "" || relPath == "" || filepath.IsAbs(relPath) {
return ""
}
fullPath := filepath.Clean(filepath.Join(ws, relPath))
relToWorkspace, err := filepath.Rel(ws, fullPath)
if err != nil || strings.HasPrefix(relToWorkspace, "..") {
return ""
}
data, err := os.ReadFile(fullPath)
if err != nil {
return ""
}
return strings.TrimSpace(string(data))
}
type SubagentRunFunc func(ctx context.Context, run *SubagentRun) (string, error)
func (sm *SubagentManager) SetRunFunc(f SubagentRunFunc) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.runFunc = f
go sm.resumeRecoveredRuns()
}
func (sm *SubagentManager) ProfileStore() *SubagentProfileStore {
sm.mu.RLock()
defer sm.mu.RUnlock()
return sm.profileStore
}
func (sm *SubagentManager) resumeRecoveredRuns() {
if sm == nil {
return
}
sm.mu.Lock()
if sm.runFunc == nil && sm.provider == nil {
sm.mu.Unlock()
return
}
runIDs := append([]string(nil), sm.recoverableRunIDs...)
sm.recoverableRunIDs = nil
toResume := make([]*SubagentRun, 0, len(runIDs))
for _, runID := range runIDs {
run, ok := sm.runs[runID]
if !ok || run == nil || run.Status != "running" {
continue
}
run.Updated = time.Now().UnixMilli()
sm.persistRunLocked(run, "recovered", "auto-resumed after restart")
toResume = append(toResume, run)
}
sm.mu.Unlock()
for _, run := range toResume {
taskCtx, cancel := context.WithCancel(context.Background())
sm.mu.Lock()
sm.cancelFuncs[run.ID] = cancel
sm.mu.Unlock()
go sm.runSubagent(taskCtx, run)
}
}
func (sm *SubagentManager) NextRunSequence() int {
sm.mu.RLock()
defer sm.mu.RUnlock()
return sm.nextID
}
func (sm *SubagentManager) listRuns() []*SubagentRun {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.pruneArchivedLocked()
runs := make([]*SubagentRun, 0, len(sm.runs))
seen := make(map[string]struct{}, len(sm.runs))
for _, run := range sm.runs {
runs = append(runs, run)
seen[run.ID] = struct{}{}
}
if sm.runStore != nil {
for _, run := range sm.runStore.List() {
if _, ok := seen[run.ID]; ok {
continue
}
runs = append(runs, run)
}
}
return runs
}
func (sm *SubagentManager) Events(runID string, limit int) ([]SubagentRunEvent, error) {
if sm.runStore == nil {
return nil, nil
}
return sm.runStore.Events(runID, limit)
}
func (sm *SubagentManager) RuntimeSnapshot(limit int) RuntimeSnapshot {
if sm == nil {
return RuntimeSnapshot{}
}
runs := sm.listRuns()
snapshot := RuntimeSnapshot{
Requests: make([]RequestRecord, 0, len(runs)),
Runs: make([]RunRecord, 0, len(runs)),
}
seenThreads := map[string]struct{}{}
for _, run := range runs {
snapshot.Requests = append(snapshot.Requests, runToRequestRecord(run))
snapshot.Runs = append(snapshot.Runs, runToRunRecord(run))
if evts, err := sm.Events(run.ID, limit); err == nil {
for _, evt := range evts {
snapshot.Events = append(snapshot.Events, EventRecord{
ID: EventRecordID(evt.RunID, evt.Type, evt.At),
RunID: evt.RunID,
RequestID: evt.RunID,
AgentID: evt.AgentID,
Type: evt.Type,
Status: evt.Status,
Message: evt.Message,
RetryCount: evt.RetryCount,
At: evt.At,
})
}
}
threadID := strings.TrimSpace(run.ThreadID)
if threadID == "" {
continue
}
if _, ok := seenThreads[threadID]; !ok {
if thread, found := sm.Thread(threadID); found {
snapshot.Threads = append(snapshot.Threads, threadToThreadRecord(thread))
}
seenThreads[threadID] = struct{}{}
}
if msgs, err := sm.ThreadMessages(threadID, limit); err == nil {
for _, msg := range msgs {
snapshot.Artifacts = append(snapshot.Artifacts, messageToArtifactRecord(msg))
}
}
}
return snapshot
}
func (sm *SubagentManager) Thread(threadID string) (*AgentThread, bool) {
if sm.mailboxStore == nil {
return nil, false
}
return sm.mailboxStore.Thread(threadID)
}
func (sm *SubagentManager) ThreadMessages(threadID string, limit int) ([]AgentMessage, error) {
if sm.mailboxStore == nil {
return nil, nil
}
return sm.mailboxStore.MessagesByThread(threadID, limit)
}
func (sm *SubagentManager) Inbox(agentID string, limit int) ([]AgentMessage, error) {
if sm.mailboxStore == nil {
return nil, nil
}
return sm.mailboxStore.Inbox(agentID, limit)
}
func (sm *SubagentManager) Message(messageID string) (*AgentMessage, bool) {
if sm.mailboxStore == nil {
return nil, false
}
return sm.mailboxStore.Message(messageID)
}
func (sm *SubagentManager) pruneArchivedLocked() {
if sm.archiveAfterMinute <= 0 {
return
}
cutoff := time.Now().Add(-time.Duration(sm.archiveAfterMinute) * time.Minute).UnixMilli()
for id, run := range sm.runs {
if !IsTerminalRuntimeStatus(run.Status) {
continue
}
if run.Updated > 0 && run.Updated < cutoff {
delete(sm.runs, id)
delete(sm.cancelFuncs, id)
}
}
}
func normalizePositiveBound(v, min, max int) int {
if v < min {
return min
}
if max > 0 && v > max {
return max
}
return v
}
func applySubagentResultQuota(result string, maxChars int) string {
if maxChars <= 0 {
return result
}
if len(result) <= maxChars {
return result
}
suffix := "\n\n[TRUNCATED: result exceeds max_result_chars quota]"
trimmed := result[:maxChars]
if len(trimmed)+len(suffix) > maxChars && maxChars > len(suffix) {
trimmed = trimmed[:maxChars-len(suffix)]
}
return strings.TrimSpace(trimmed) + suffix
}
func normalizeSubagentIdentifier(in string) string {
in = strings.TrimSpace(strings.ToLower(in))
if in == "" {
return ""
}
var sb strings.Builder
for _, r := range in {
switch {
case r >= 'a' && r <= 'z':
sb.WriteRune(r)
case r >= '0' && r <= '9':
sb.WriteRune(r)
case r == '-' || r == '_' || r == '.':
sb.WriteRune(r)
case r == ' ':
sb.WriteRune('-')
}
}
out := strings.Trim(sb.String(), "-_.")
if out == "" {
return ""
}
return out
}
func buildSubagentSessionKey(agentID, runID string) string {
a := normalizeSubagentIdentifier(agentID)
if a == "" {
a = "default"
}
t := normalizeSubagentIdentifier(runID)
if t == "" {
t = "run"
}
return fmt.Sprintf("subagent:%s:%s", a, t)
}
func (sm *SubagentManager) persistRunLocked(run *SubagentRun, eventType, message string) {
if run == nil || sm.runStore == nil {
return
}
cp := cloneSubagentRun(run)
_ = sm.runStore.AppendRun(cp)
_ = sm.runStore.AppendEvent(SubagentRunEvent{
RunID: cp.ID,
AgentID: cp.AgentID,
Type: strings.TrimSpace(eventType),
Status: cp.Status,
Message: strings.TrimSpace(message),
RetryCount: cp.RetryCount,
At: cp.Updated,
})
}
func (sm *SubagentManager) waitRun(ctx context.Context, runID string) (*SubagentRun, bool, error) {
if sm == nil {
return nil, false, fmt.Errorf("subagent manager not available")
}
runID = strings.TrimSpace(runID)
if runID == "" {
return nil, false, fmt.Errorf("run id is required")
}
if ctx == nil {
ctx = context.Background()
}
ch := make(chan struct{}, 1)
sm.mu.Lock()
sm.pruneArchivedLocked()
run, ok := sm.runs[runID]
if !ok && sm.runStore != nil {
if persisted, found := sm.runStore.Get(runID); found && persisted != nil {
if IsTerminalRuntimeStatus(persisted.Status) {
sm.mu.Unlock()
return persisted, true, nil
}
}
}
if ok && run != nil && IsTerminalRuntimeStatus(run.Status) {
cp := cloneSubagentRun(run)
sm.mu.Unlock()
return cp, true, nil
}
waiters := sm.waiters[runID]
if waiters == nil {
waiters = map[chan struct{}]struct{}{}
sm.waiters[runID] = waiters
}
waiters[ch] = struct{}{}
sm.mu.Unlock()
defer sm.removeRunWaiter(runID, ch)
for {
select {
case <-ctx.Done():
return nil, false, ctx.Err()
case <-ch:
sm.mu.Lock()
sm.pruneArchivedLocked()
run, ok := sm.runs[runID]
if ok && run != nil && IsTerminalRuntimeStatus(run.Status) {
cp := cloneSubagentRun(run)
sm.mu.Unlock()
return cp, true, nil
}
if !ok && sm.runStore != nil {
if persisted, found := sm.runStore.Get(runID); found && persisted != nil && IsTerminalRuntimeStatus(persisted.Status) {
sm.mu.Unlock()
return persisted, true, nil
}
}
sm.mu.Unlock()
}
}
}
func (sm *SubagentManager) removeRunWaiter(runID string, ch chan struct{}) {
sm.mu.Lock()
defer sm.mu.Unlock()
waiters := sm.waiters[runID]
if len(waiters) == 0 {
delete(sm.waiters, runID)
return
}
delete(waiters, ch)
if len(waiters) == 0 {
delete(sm.waiters, runID)
}
}
func (sm *SubagentManager) notifyRunWaitersLocked(runID string) {
waiters := sm.waiters[runID]
if len(waiters) == 0 {
delete(sm.waiters, runID)
return
}
for ch := range waiters {
select {
case ch <- struct{}{}:
default:
}
}
delete(sm.waiters, runID)
}
func (sm *SubagentManager) recordMailboxMessageLocked(run *SubagentRun, msg AgentMessage) {
if sm.mailboxStore == nil || run == nil {
return
}
if strings.TrimSpace(msg.ThreadID) == "" {
msg.ThreadID = run.ThreadID
}
stored, err := sm.mailboxStore.AppendMessage(msg)
if err != nil {
return
}
run.LastMessageID = stored.MessageID
if stored.RequiresReply {
run.WaitingReply = true
}
}
func (sm *SubagentManager) consumeThreadInbox(run *SubagentRun) (string, []string) {
if run == nil || sm.mailboxStore == nil {
return "", nil
}
msgs, err := sm.mailboxStore.ThreadInbox(run.ThreadID, run.AgentID, 0)
if err != nil || len(msgs) == 0 {
return "", nil
}
var sb strings.Builder
consumed := make([]string, 0, len(msgs))
now := time.Now().UnixMilli()
for _, msg := range msgs {
if _, err := sm.mailboxStore.UpdateMessageStatus(msg.MessageID, "processing", now); err != nil {
continue
}
consumed = append(consumed, msg.MessageID)
sb.WriteString(fmt.Sprintf("- [%s] from=%s type=%s", msg.MessageID, msg.FromAgent, msg.Type))
if strings.TrimSpace(msg.ReplyTo) != "" {
sb.WriteString(fmt.Sprintf(" reply_to=%s", msg.ReplyTo))
}
sb.WriteString("\n")
sb.WriteString(strings.TrimSpace(msg.Content))
sb.WriteString("\n")
}
return strings.TrimSpace(sb.String()), consumed
}
func (sm *SubagentManager) restoreMessageStatuses(messageIDs []string) {
if sm.mailboxStore == nil || len(messageIDs) == 0 {
return
}
now := time.Now().UnixMilli()
for _, messageID := range messageIDs {
_, _ = sm.mailboxStore.UpdateMessageStatus(messageID, "queued", now)
}
}
func (sm *SubagentManager) ackMessageStatuses(messageIDs []string) {
if sm.mailboxStore == nil || len(messageIDs) == 0 {
return
}
now := time.Now().UnixMilli()
for _, messageID := range messageIDs {
_, _ = sm.mailboxStore.UpdateMessageStatus(messageID, "acked", now)
}
}