Add multi-agent config and registry runtime flow

This commit is contained in:
lpf
2026-03-06 12:47:05 +08:00
parent 959870e6f7
commit 6902f65c54
29 changed files with 4654 additions and 76 deletions

View File

@@ -31,6 +31,11 @@ type SubagentTask struct {
RetryCount int `json:"retry_count,omitempty"`
PipelineID string `json:"pipeline_id,omitempty"`
PipelineTask string `json:"pipeline_task,omitempty"`
ThreadID string `json:"thread_id,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
ParentRunID string `json:"parent_run_id,omitempty"`
LastMessageID string `json:"last_message_id,omitempty"`
WaitingReply bool `json:"waiting_for_reply,omitempty"`
SharedState map[string]interface{} `json:"shared_state,omitempty"`
OriginChannel string `json:"origin_channel,omitempty"`
OriginChatID string `json:"origin_chat_id,omitempty"`
@@ -53,6 +58,8 @@ type SubagentManager struct {
nextID int
runFunc SubagentRunFunc
profileStore *SubagentProfileStore
runStore *SubagentRunStore
mailboxStore *AgentMailboxStore
}
type SubagentSpawnOptions struct {
@@ -69,11 +76,16 @@ type SubagentSpawnOptions struct {
OriginChatID string
PipelineID string
PipelineTask string
ThreadID string
CorrelationID string
ParentRunID string
}
func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *bus.MessageBus, orc *Orchestrator) *SubagentManager {
store := NewSubagentProfileStore(workspace)
return &SubagentManager{
runStore := NewSubagentRunStore(workspace)
mailboxStore := NewAgentMailboxStore(workspace)
mgr := &SubagentManager{
tasks: make(map[string]*SubagentTask),
cancelFuncs: make(map[string]context.CancelFunc),
archiveAfterMinute: 60,
@@ -83,13 +95,44 @@ func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *b
workspace: workspace,
nextID: 1,
profileStore: store,
runStore: runStore,
mailboxStore: mailboxStore,
}
if runStore != nil {
for _, task := range runStore.List() {
mgr.tasks[task.ID] = task
}
mgr.nextID = runStore.NextIDSeed()
}
return mgr
}
// Spawn starts a subagent task via spawnTask and returns a human-readable
// one-line summary of what was started.
//
// The summary always names the task text and agent ID; it additionally
// includes the label (when set), the role (when set), and the pipeline
// ID/task pair (only when both are set). Any error from spawnTask is
// returned unchanged together with an empty summary.
func (sm *SubagentManager) Spawn(ctx context.Context, opts SubagentSpawnOptions) (string, error) {
	spawned, err := sm.spawnTask(ctx, opts)
	if err != nil {
		return "", err
	}

	var summary strings.Builder
	if spawned.Label == "" {
		fmt.Fprintf(&summary, "Spawned subagent for task: %s (agent=%s)", spawned.Task, spawned.AgentID)
	} else {
		fmt.Fprintf(&summary, "Spawned subagent '%s' for task: %s (agent=%s)", spawned.Label, spawned.Task, spawned.AgentID)
	}
	if spawned.Role != "" {
		fmt.Fprintf(&summary, " role=%s", spawned.Role)
	}
	// Pipeline details are only meaningful as a pair.
	if spawned.PipelineID != "" && spawned.PipelineTask != "" {
		fmt.Fprintf(&summary, " (pipeline=%s task=%s)", spawned.PipelineID, spawned.PipelineTask)
	}
	return summary.String(), nil
}
// SpawnTask is the exported entry point to spawnTask: it starts a subagent
// task and hands back whatever task value and error spawnTask produces,
// instead of the textual summary that Spawn returns.
func (sm *SubagentManager) SpawnTask(ctx context.Context, opts SubagentSpawnOptions) (*SubagentTask, error) {
	spawned, err := sm.spawnTask(ctx, opts)
	return spawned, err
}
func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOptions) (*SubagentTask, error) {
task := strings.TrimSpace(opts.Task)
if task == "" {
return "", fmt.Errorf("task is required")
return nil, fmt.Errorf("task is required")
}
label := strings.TrimSpace(opts.Label)
role := strings.TrimSpace(opts.Role)
@@ -99,13 +142,13 @@ func (sm *SubagentManager) Spawn(ctx context.Context, opts SubagentSpawnOptions)
if sm.profileStore != nil {
if agentID != "" {
if p, ok, err := sm.profileStore.Get(agentID); err != nil {
return "", err
return nil, err
} else if ok {
profile = p
}
} else if role != "" {
if p, ok, err := sm.profileStore.FindByRole(role); err != nil {
return "", err
return nil, err
} else if ok {
profile = p
agentID = normalizeSubagentIdentifier(p.AgentID)
@@ -128,14 +171,14 @@ func (sm *SubagentManager) Spawn(ctx context.Context, opts SubagentSpawnOptions)
maxResultChars := 0
if profile == nil && sm.profileStore != nil {
if p, ok, err := sm.profileStore.Get(agentID); err != nil {
return "", err
return nil, err
} else if ok {
profile = p
}
}
if profile != nil {
if strings.EqualFold(strings.TrimSpace(profile.Status), "disabled") {
return "", fmt.Errorf("subagent profile '%s' is disabled", profile.AgentID)
return nil, fmt.Errorf("subagent profile '%s' is disabled", profile.AgentID)
}
if label == "" {
label = strings.TrimSpace(profile.Name)
@@ -170,7 +213,7 @@ func (sm *SubagentManager) Spawn(ctx context.Context, opts SubagentSpawnOptions)
maxResultChars = opts.MaxResultChars
}
if maxTaskChars > 0 && len(task) > maxTaskChars {
return "", fmt.Errorf("task exceeds max_task_chars quota (%d > %d)", len(task), maxTaskChars)
return nil, fmt.Errorf("task exceeds max_task_chars quota (%d > %d)", len(task), maxTaskChars)
}
maxRetries = normalizePositiveBound(maxRetries, 0, 8)
retryBackoff = normalizePositiveBound(retryBackoff, 500, 120000)
@@ -184,6 +227,9 @@ func (sm *SubagentManager) Spawn(ctx context.Context, opts SubagentSpawnOptions)
originChatID := strings.TrimSpace(opts.OriginChatID)
pipelineID := strings.TrimSpace(opts.PipelineID)
pipelineTask := strings.TrimSpace(opts.PipelineTask)
threadID := strings.TrimSpace(opts.ThreadID)
correlationID := strings.TrimSpace(opts.CorrelationID)
parentRunID := strings.TrimSpace(opts.ParentRunID)
sm.mu.Lock()
defer sm.mu.Unlock()
@@ -193,6 +239,23 @@ func (sm *SubagentManager) Spawn(ctx context.Context, opts SubagentSpawnOptions)
sessionKey := buildSubagentSessionKey(agentID, taskID)
now := time.Now().UnixMilli()
if correlationID == "" {
correlationID = taskID
}
if sm.mailboxStore != nil {
thread, err := sm.mailboxStore.EnsureThread(AgentThread{
ThreadID: threadID,
Owner: "main",
Participants: []string{"main", agentID},
Status: "open",
Topic: task,
CreatedAt: now,
UpdatedAt: now,
})
if err == nil {
threadID = thread.ThreadID
}
}
subagentTask := &SubagentTask{
ID: taskID,
Task: task,
@@ -211,6 +274,9 @@ func (sm *SubagentManager) Spawn(ctx context.Context, opts SubagentSpawnOptions)
RetryCount: 0,
PipelineID: pipelineID,
PipelineTask: pipelineTask,
ThreadID: threadID,
CorrelationID: correlationID,
ParentRunID: parentRunID,
OriginChannel: originChannel,
OriginChatID: originChatID,
Status: "running",
@@ -220,20 +286,21 @@ func (sm *SubagentManager) Spawn(ctx context.Context, opts SubagentSpawnOptions)
taskCtx, cancel := context.WithCancel(ctx)
sm.tasks[taskID] = subagentTask
sm.cancelFuncs[taskID] = cancel
sm.recordMailboxMessageLocked(subagentTask, AgentMessage{
ThreadID: threadID,
FromAgent: "main",
ToAgent: agentID,
CorrelationID: correlationID,
Type: "task",
Content: task,
RequiresReply: true,
Status: "queued",
CreatedAt: now,
})
sm.persistTaskLocked(subagentTask, "spawned", "")
go sm.runTask(taskCtx, subagentTask)
desc := fmt.Sprintf("Spawned subagent for task: %s (agent=%s)", task, agentID)
if label != "" {
desc = fmt.Sprintf("Spawned subagent '%s' for task: %s (agent=%s)", label, task, agentID)
}
if role != "" {
desc += fmt.Sprintf(" role=%s", role)
}
if pipelineID != "" && pipelineTask != "" {
desc += fmt.Sprintf(" (pipeline=%s task=%s)", pipelineID, pipelineTask)
}
return desc, nil
return cloneSubagentTask(subagentTask), nil
}
func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
@@ -247,6 +314,7 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
task.Status = "running"
task.Created = time.Now().UnixMilli()
task.Updated = task.Created
sm.persistTaskLocked(task, "started", "")
sm.mu.Unlock()
if sm.orc != nil && task.PipelineID != "" && task.PipelineTask != "" {
@@ -259,6 +327,19 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
task.Result = fmt.Sprintf("Error: %v", runErr)
task.Result = applySubagentResultQuota(task.Result, task.MaxResultChars)
task.Updated = time.Now().UnixMilli()
task.WaitingReply = false
sm.recordMailboxMessageLocked(task, AgentMessage{
ThreadID: task.ThreadID,
FromAgent: task.AgentID,
ToAgent: "main",
ReplyTo: task.LastMessageID,
CorrelationID: task.CorrelationID,
Type: "result",
Content: task.Result,
Status: "delivered",
CreatedAt: task.Updated,
})
sm.persistTaskLocked(task, "completed", task.Result)
if sm.orc != nil && task.PipelineID != "" && task.PipelineTask != "" {
_ = sm.orc.MarkTaskDone(task.PipelineID, task.PipelineTask, task.Result, runErr)
}
@@ -266,6 +347,19 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
task.Status = "completed"
task.Result = applySubagentResultQuota(result, task.MaxResultChars)
task.Updated = time.Now().UnixMilli()
task.WaitingReply = false
sm.recordMailboxMessageLocked(task, AgentMessage{
ThreadID: task.ThreadID,
FromAgent: task.AgentID,
ToAgent: "main",
ReplyTo: task.LastMessageID,
CorrelationID: task.CorrelationID,
Type: "result",
Content: task.Result,
Status: "delivered",
CreatedAt: task.Updated,
})
sm.persistTaskLocked(task, "completed", task.Result)
if sm.orc != nil && task.PipelineID != "" && task.PipelineTask != "" {
_ = sm.orc.MarkTaskDone(task.PipelineID, task.PipelineTask, task.Result, nil)
}
@@ -331,6 +425,7 @@ func (sm *SubagentManager) runWithRetry(ctx context.Context, task *SubagentTask)
sm.mu.Lock()
task.RetryCount = attempt
task.Updated = time.Now().UnixMilli()
sm.persistTaskLocked(task, "attempt_succeeded", "")
sm.mu.Unlock()
return result, nil
}
@@ -338,6 +433,7 @@ func (sm *SubagentManager) runWithRetry(ctx context.Context, task *SubagentTask)
sm.mu.Lock()
task.RetryCount = attempt
task.Updated = time.Now().UnixMilli()
sm.persistTaskLocked(task, "attempt_failed", err.Error())
sm.mu.Unlock()
if attempt >= maxRetries {
break
@@ -358,10 +454,18 @@ func (sm *SubagentManager) executeTaskOnce(ctx context.Context, task *SubagentTa
if task == nil {
return "", fmt.Errorf("subagent task is nil")
}
pending, consumedIDs := sm.consumeThreadInbox(task)
if sm.runFunc != nil {
return sm.runFunc(ctx, task)
result, err := sm.runFunc(ctx, task)
if err != nil {
sm.restoreMessageStatuses(consumedIDs)
} else {
sm.ackMessageStatuses(consumedIDs)
}
return result, err
}
if sm.provider == nil {
sm.restoreMessageStatuses(consumedIDs)
return "", fmt.Errorf("no llm provider configured for subagent execution")
}
@@ -388,13 +492,21 @@ func (sm *SubagentManager) executeTaskOnce(ctx context.Context, task *SubagentTa
Content: task.Task,
},
}
if strings.TrimSpace(pending) != "" {
messages = append(messages, providers.Message{
Role: "user",
Content: "Mailbox updates on this thread:\n" + pending,
})
}
response, err := sm.provider.Chat(ctx, messages, nil, sm.provider.GetDefaultModel(), map[string]interface{}{
"max_tokens": 4096,
})
if err != nil {
sm.restoreMessageStatuses(consumedIDs)
return "", err
}
sm.ackMessageStatuses(consumedIDs)
return response.Content, nil
}
@@ -412,11 +524,20 @@ func (sm *SubagentManager) ProfileStore() *SubagentProfileStore {
return sm.profileStore
}
// NextTaskSequence returns the manager's current nextID counter, read
// while holding the read lock.
func (sm *SubagentManager) NextTaskSequence() int {
	sm.mu.RLock()
	seq := sm.nextID
	sm.mu.RUnlock()
	return seq
}
// GetTask looks up a task by ID.
//
// Archived tasks are pruned from the in-memory table first. If the task is
// still in memory it is returned directly; otherwise the lookup falls back
// to the run store when one is configured. The boolean reports whether a
// task was found.
func (sm *SubagentManager) GetTask(taskID string) (*SubagentTask, bool) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	sm.pruneArchivedLocked()

	if found, present := sm.tasks[taskID]; present {
		return found, true
	}
	if sm.runStore == nil {
		return nil, false
	}
	// Not in memory: consult the persisted runs.
	return sm.runStore.Get(taskID)
}
@@ -426,8 +547,18 @@ func (sm *SubagentManager) ListTasks() []*SubagentTask {
sm.pruneArchivedLocked()
tasks := make([]*SubagentTask, 0, len(sm.tasks))
seen := make(map[string]struct{}, len(sm.tasks))
for _, task := range sm.tasks {
tasks = append(tasks, task)
seen[task.ID] = struct{}{}
}
if sm.runStore != nil {
for _, task := range sm.runStore.List() {
if _, ok := seen[task.ID]; ok {
continue
}
tasks = append(tasks, task)
}
}
return tasks
}
@@ -445,24 +576,46 @@ func (sm *SubagentManager) KillTask(taskID string) bool {
}
if t.Status == "running" {
t.Status = "killed"
t.WaitingReply = false
t.Updated = time.Now().UnixMilli()
sm.persistTaskLocked(t, "killed", "")
}
return true
}
// SteerTask sends a "control" message from "main" to the given task.
// No reply is requested and no explicit reply-to ID is supplied.
// It reports whether the message was accepted by sendTaskMessage.
func (sm *SubagentManager) SteerTask(taskID, message string) bool {
	const (
		sender  = "main"
		msgKind = "control"
	)
	return sm.sendTaskMessage(taskID, sender, msgKind, message, false, "")
}
// SendTaskMessage sends a plain "message" from "main" to the given task.
// No reply is requested and no explicit reply-to ID is supplied.
// It reports whether the message was accepted by sendTaskMessage.
func (sm *SubagentManager) SendTaskMessage(taskID, message string) bool {
	const (
		sender  = "main"
		msgKind = "message"
	)
	return sm.sendTaskMessage(taskID, sender, msgKind, message, false, "")
}
// ReplyToTask sends a "reply" message from "main" to the given task,
// passing replyToMessageID through as the reply-to reference.
// No reply is requested in turn. It reports whether the message was
// accepted by sendTaskMessage.
func (sm *SubagentManager) ReplyToTask(taskID, replyToMessageID, message string) bool {
	const (
		sender  = "main"
		msgKind = "reply"
	)
	return sm.sendTaskMessage(taskID, sender, msgKind, message, false, replyToMessageID)
}
func (sm *SubagentManager) AckTaskMessage(taskID, messageID string) bool {
sm.mu.Lock()
defer sm.mu.Unlock()
t, ok := sm.tasks[taskID]
if !ok {
return false
}
message = strings.TrimSpace(message)
if message == "" {
if sm.mailboxStore == nil {
return false
}
if strings.TrimSpace(messageID) == "" {
return false
}
t.Steering = append(t.Steering, message)
t.Updated = time.Now().UnixMilli()
msg, err := sm.mailboxStore.UpdateMessageStatus(messageID, "acked", t.Updated)
if err != nil {
return false
}
t.LastMessageID = msg.MessageID
t.WaitingReply = false
sm.persistTaskLocked(t, "acked", messageID)
return true
}
@@ -496,13 +649,66 @@ func (sm *SubagentManager) ResumeTask(ctx context.Context, taskID string) (strin
OriginChatID: t.OriginChatID,
PipelineID: t.PipelineID,
PipelineTask: t.PipelineTask,
ThreadID: t.ThreadID,
CorrelationID: t.CorrelationID,
ParentRunID: t.ID,
})
if err != nil {
return "", false
}
sm.mu.Lock()
if original, ok := sm.tasks[taskID]; ok {
sm.persistTaskLocked(original, "resumed", label)
}
sm.mu.Unlock()
return label, true
}
// Events returns the run events recorded for taskID, delegating to the run
// store with the given limit. When no run store is configured it returns
// (nil, nil).
func (sm *SubagentManager) Events(taskID string, limit int) ([]SubagentRunEvent, error) {
	if store := sm.runStore; store != nil {
		return store.Events(taskID, limit)
	}
	return nil, nil
}
// Thread looks up a mailbox thread by ID via the mailbox store.
// When no mailbox store is configured it returns (nil, false).
func (sm *SubagentManager) Thread(threadID string) (*AgentThread, bool) {
	if store := sm.mailboxStore; store != nil {
		return store.Thread(threadID)
	}
	return nil, false
}
// ThreadMessages returns the messages of a thread, delegating to the
// mailbox store with the given limit. When no mailbox store is configured
// it returns (nil, nil).
func (sm *SubagentManager) ThreadMessages(threadID string, limit int) ([]AgentMessage, error) {
	if store := sm.mailboxStore; store != nil {
		return store.MessagesByThread(threadID, limit)
	}
	return nil, nil
}
// Inbox returns the mailbox store's inbox for agentID with the given
// limit. When no mailbox store is configured it returns (nil, nil).
func (sm *SubagentManager) Inbox(agentID string, limit int) ([]AgentMessage, error) {
	if store := sm.mailboxStore; store != nil {
		return store.Inbox(agentID, limit)
	}
	return nil, nil
}
// TaskInbox returns the thread inbox for the agent that owns taskID,
// using the task's ThreadID and AgentID.
//
// Only the in-memory task table is consulted. When the task is unknown or
// no mailbox store is configured it returns (nil, nil).
func (sm *SubagentManager) TaskInbox(taskID string, limit int) ([]AgentMessage, error) {
	// Hold the read lock only for the map lookup.
	sm.mu.RLock()
	entry, found := sm.tasks[taskID]
	sm.mu.RUnlock()

	if sm.mailboxStore == nil || !found {
		return nil, nil
	}
	return sm.mailboxStore.ThreadInbox(entry.ThreadID, entry.AgentID, limit)
}
// Message looks up a single mailbox message by ID via the mailbox store.
// When no mailbox store is configured it returns (nil, false).
func (sm *SubagentManager) Message(messageID string) (*AgentMessage, bool) {
	if store := sm.mailboxStore; store != nil {
		return store.Message(messageID)
	}
	return nil, false
}
func (sm *SubagentManager) pruneArchivedLocked() {
if sm.archiveAfterMinute <= 0 {
return
@@ -580,3 +786,133 @@ func buildSubagentSessionKey(agentID, taskID string) string {
}
return fmt.Sprintf("subagent:%s:%s", a, t)
}
// persistTaskLocked writes a snapshot of task to the run store and appends
// a matching run event.
//
// It is a no-op when task is nil or no run store is configured. eventType
// and message are whitespace-trimmed before being stored; the event's
// status, retry count and timestamp are taken from the snapshot. Store
// errors are deliberately ignored: persistence is best-effort.
func (sm *SubagentManager) persistTaskLocked(task *SubagentTask, eventType, message string) {
	if sm.runStore == nil || task == nil {
		return
	}

	// Persist a clone of the task rather than the live value.
	snapshot := cloneSubagentTask(task)
	_ = sm.runStore.AppendRun(snapshot)

	event := SubagentRunEvent{
		RunID:      snapshot.ID,
		AgentID:    snapshot.AgentID,
		Type:       strings.TrimSpace(eventType),
		Status:     snapshot.Status,
		Message:    strings.TrimSpace(message),
		RetryCount: snapshot.RetryCount,
		At:         snapshot.Updated,
	}
	_ = sm.runStore.AppendEvent(event)
}
// recordMailboxMessageLocked appends msg to the mailbox store and updates
// the task's mailbox bookkeeping.
//
// A blank msg.ThreadID defaults to the task's thread. On a successful
// append the task's LastMessageID becomes the stored message's ID, and
// WaitingReply is set when the stored message requires a reply (it is
// never cleared here). Nothing happens when task is nil, no mailbox store
// is configured, or the append fails.
func (sm *SubagentManager) recordMailboxMessageLocked(task *SubagentTask, msg AgentMessage) {
	if task == nil || sm.mailboxStore == nil {
		return
	}
	if strings.TrimSpace(msg.ThreadID) == "" {
		msg.ThreadID = task.ThreadID
	}

	saved, err := sm.mailboxStore.AppendMessage(msg)
	if err != nil {
		// Best-effort: leave the task untouched on store failure.
		return
	}

	task.LastMessageID = saved.MessageID
	task.WaitingReply = task.WaitingReply || saved.RequiresReply
}
// sendTaskMessage records a mailbox message on an in-memory task's thread
// and persists a matching run event.
//
// It returns false when the task is unknown or the trimmed message is
// empty. A blank fromAgent is treated as "main". Messages from "main" are
// addressed to the task's agent and also appended to the task's Steering
// list; messages from any other sender are addressed to "main". A blank
// replyTo falls back to the task's LastMessageID. The persisted event type
// is "steered" for msgType "control", "reply_sent" for "reply", and
// "message_sent" otherwise.
func (sm *SubagentManager) sendTaskMessage(taskID, fromAgent, msgType, message string, requiresReply bool, replyTo string) bool {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	target, found := sm.tasks[taskID]
	if !found {
		return false
	}
	body := strings.TrimSpace(message)
	if body == "" {
		return false
	}
	sender := strings.TrimSpace(fromAgent)
	if sender == "" {
		sender = "main"
	}
	fromMain := sender == "main"

	target.Updated = time.Now().UnixMilli()

	// Route the message: main talks to the agent, everyone else to main.
	recipient := "main"
	if fromMain {
		target.Steering = append(target.Steering, body)
		recipient = target.AgentID
	}
	if strings.TrimSpace(replyTo) == "" {
		replyTo = target.LastMessageID
	}

	sm.recordMailboxMessageLocked(target, AgentMessage{
		ThreadID:      target.ThreadID,
		FromAgent:     sender,
		ToAgent:       recipient,
		ReplyTo:       replyTo,
		CorrelationID: target.CorrelationID,
		Type:          msgType,
		Content:       body,
		RequiresReply: requiresReply,
		Status:        "queued",
		CreatedAt:     target.Updated,
	})

	event := "message_sent"
	switch msgType {
	case "control":
		event = "steered"
	case "reply":
		event = "reply_sent"
	}
	sm.persistTaskLocked(target, event, body)
	return true
}
// consumeThreadInbox claims the messages in the task's thread inbox and
// renders them as a text digest.
//
// Each message whose status can be switched to "processing" is added to
// the returned ID list and rendered as a "- [id] from=… type=…" header
// (plus " reply_to=…" when set) followed by its trimmed content. Messages
// whose status update fails are skipped. It returns ("", nil) when task is
// nil, no mailbox store is configured, the inbox lookup fails, or the
// inbox is empty.
func (sm *SubagentManager) consumeThreadInbox(task *SubagentTask) (string, []string) {
	if sm.mailboxStore == nil || task == nil {
		return "", nil
	}
	inbox, err := sm.mailboxStore.ThreadInbox(task.ThreadID, task.AgentID, 0)
	if err != nil || len(inbox) == 0 {
		return "", nil
	}

	var digest strings.Builder
	claimed := make([]string, 0, len(inbox))
	stamp := time.Now().UnixMilli()

	for _, m := range inbox {
		// Only include messages we managed to mark as in-flight.
		if _, claimErr := sm.mailboxStore.UpdateMessageStatus(m.MessageID, "processing", stamp); claimErr != nil {
			continue
		}
		claimed = append(claimed, m.MessageID)

		fmt.Fprintf(&digest, "- [%s] from=%s type=%s", m.MessageID, m.FromAgent, m.Type)
		if strings.TrimSpace(m.ReplyTo) != "" {
			fmt.Fprintf(&digest, " reply_to=%s", m.ReplyTo)
		}
		digest.WriteString("\n")
		digest.WriteString(strings.TrimSpace(m.Content))
		digest.WriteString("\n")
	}
	return strings.TrimSpace(digest.String()), claimed
}
// restoreMessageStatuses sets every listed mailbox message back to
// "queued", using one shared timestamp. It does nothing when no mailbox
// store is configured or the list is empty. Individual update failures
// are ignored.
func (sm *SubagentManager) restoreMessageStatuses(messageIDs []string) {
	store := sm.mailboxStore
	if store == nil || len(messageIDs) == 0 {
		return
	}
	stamp := time.Now().UnixMilli()
	for i := range messageIDs {
		// Best-effort: a failed restore must not abort the rest.
		_, _ = store.UpdateMessageStatus(messageIDs[i], "queued", stamp)
	}
}
// ackMessageStatuses marks every listed mailbox message as "acked", using
// one shared timestamp. It does nothing when no mailbox store is
// configured or the list is empty. Individual update failures are ignored.
func (sm *SubagentManager) ackMessageStatuses(messageIDs []string) {
	store := sm.mailboxStore
	if store == nil || len(messageIDs) == 0 {
		return
	}
	stamp := time.Now().UnixMilli()
	for i := range messageIDs {
		// Best-effort: a failed ack must not abort the rest.
		_, _ = store.UpdateMessageStatus(messageIDs[i], "acked", stamp)
	}
}