Release v0.1.0 agent topology and runtime refresh

This commit is contained in:
lpf
2026-03-06 17:44:13 +08:00
parent ac5a1bfcb2
commit 7d9ca89476
34 changed files with 1216 additions and 1462 deletions

View File

@@ -69,14 +69,6 @@ func (t *SpawnTool) Parameters() map[string]interface{} {
"type": "integer",
"description": "Optional result size quota in characters.",
},
"pipeline_id": map[string]interface{}{
"type": "string",
"description": "Optional pipeline ID for orchestrated multi-agent workflow",
},
"task_id": map[string]interface{}{
"type": "string",
"description": "Optional task ID under the pipeline",
},
"channel": map[string]interface{}{
"type": "string",
"description": "Optional origin channel override",
@@ -111,8 +103,6 @@ func (t *SpawnTool) Execute(ctx context.Context, args map[string]interface{}) (s
timeoutSec := intArg(args, "timeout_sec")
maxTaskChars := intArg(args, "max_task_chars")
maxResultChars := intArg(args, "max_result_chars")
pipelineID, _ := args["pipeline_id"].(string)
taskID, _ := args["task_id"].(string)
if label == "" && role != "" {
label = role
} else if label == "" && agentID != "" {
@@ -150,8 +140,6 @@ func (t *SpawnTool) Execute(ctx context.Context, args map[string]interface{}) (s
MaxResultChars: maxResultChars,
OriginChannel: originChannel,
OriginChatID: originChatID,
PipelineID: pipelineID,
PipelineTask: taskID,
})
if err != nil {
return "", fmt.Errorf("failed to spawn subagent: %w", err)

View File

@@ -33,8 +33,6 @@ type SubagentTask struct {
MaxTaskChars int `json:"max_task_chars,omitempty"`
MaxResultChars int `json:"max_result_chars,omitempty"`
RetryCount int `json:"retry_count,omitempty"`
PipelineID string `json:"pipeline_id,omitempty"`
PipelineTask string `json:"pipeline_task,omitempty"`
ThreadID string `json:"thread_id,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
ParentRunID string `json:"parent_run_id,omitempty"`
@@ -57,7 +55,6 @@ type SubagentManager struct {
mu sync.RWMutex
provider providers.LLMProvider
bus *bus.MessageBus
orc *Orchestrator
workspace string
nextID int
runFunc SubagentRunFunc
@@ -78,14 +75,12 @@ type SubagentSpawnOptions struct {
MaxResultChars int
OriginChannel string
OriginChatID string
PipelineID string
PipelineTask string
ThreadID string
CorrelationID string
ParentRunID string
}
func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *bus.MessageBus, orc *Orchestrator) *SubagentManager {
func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *bus.MessageBus) *SubagentManager {
store := NewSubagentProfileStore(workspace)
runStore := NewSubagentRunStore(workspace)
mailboxStore := NewAgentMailboxStore(workspace)
@@ -95,7 +90,6 @@ func NewSubagentManager(provider providers.LLMProvider, workspace string, bus *b
archiveAfterMinute: 60,
provider: provider,
bus: bus,
orc: orc,
workspace: workspace,
nextID: 1,
profileStore: store,
@@ -123,9 +117,6 @@ func (sm *SubagentManager) Spawn(ctx context.Context, opts SubagentSpawnOptions)
if task.Role != "" {
desc += fmt.Sprintf(" role=%s", task.Role)
}
if task.PipelineID != "" && task.PipelineTask != "" {
desc += fmt.Sprintf(" (pipeline=%s task=%s)", task.PipelineID, task.PipelineTask)
}
return desc, nil
}
@@ -240,8 +231,6 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
}
originChannel := strings.TrimSpace(opts.OriginChannel)
originChatID := strings.TrimSpace(opts.OriginChatID)
pipelineID := strings.TrimSpace(opts.PipelineID)
pipelineTask := strings.TrimSpace(opts.PipelineTask)
threadID := strings.TrimSpace(opts.ThreadID)
correlationID := strings.TrimSpace(opts.CorrelationID)
parentRunID := strings.TrimSpace(opts.ParentRunID)
@@ -291,8 +280,6 @@ func (sm *SubagentManager) spawnTask(ctx context.Context, opts SubagentSpawnOpti
MaxTaskChars: maxTaskChars,
MaxResultChars: maxResultChars,
RetryCount: 0,
PipelineID: pipelineID,
PipelineTask: pipelineTask,
ThreadID: threadID,
CorrelationID: correlationID,
ParentRunID: parentRunID,
@@ -336,9 +323,6 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
sm.persistTaskLocked(task, "started", "")
sm.mu.Unlock()
if sm.orc != nil && task.PipelineID != "" && task.PipelineTask != "" {
_ = sm.orc.MarkTaskRunning(task.PipelineID, task.PipelineTask)
}
result, runErr := sm.runWithRetry(ctx, task)
sm.mu.Lock()
if runErr != nil {
@@ -359,9 +343,6 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
CreatedAt: task.Updated,
})
sm.persistTaskLocked(task, "completed", task.Result)
if sm.orc != nil && task.PipelineID != "" && task.PipelineTask != "" {
_ = sm.orc.MarkTaskDone(task.PipelineID, task.PipelineTask, task.Result, runErr)
}
} else {
task.Status = "completed"
task.Result = applySubagentResultQuota(result, task.MaxResultChars)
@@ -379,9 +360,6 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
CreatedAt: task.Updated,
})
sm.persistTaskLocked(task, "completed", task.Result)
if sm.orc != nil && task.PipelineID != "" && task.PipelineTask != "" {
_ = sm.orc.MarkTaskDone(task.PipelineID, task.PipelineTask, task.Result, nil)
}
}
sm.mu.Unlock()
@@ -399,9 +377,6 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
}
}
announceContent := fmt.Sprintf("%s.\n\nResult:\n%s", prefix, task.Result)
if task.PipelineID != "" && task.PipelineTask != "" {
announceContent += fmt.Sprintf("\n\nPipeline: %s\nPipeline Task: %s", task.PipelineID, task.PipelineTask)
}
sm.bus.PublishInbound(bus.InboundMessage{
Channel: "system",
SenderID: fmt.Sprintf("subagent:%s", task.ID),
@@ -409,17 +384,15 @@ func (sm *SubagentManager) runTask(ctx context.Context, task *SubagentTask) {
SessionKey: task.SessionKey,
Content: announceContent,
Metadata: map[string]string{
"trigger": "subagent",
"subagent_id": task.ID,
"agent_id": task.AgentID,
"role": task.Role,
"session_key": task.SessionKey,
"memory_ns": task.MemoryNS,
"retry_count": fmt.Sprintf("%d", task.RetryCount),
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
"pipeline_id": task.PipelineID,
"pipeline_task": task.PipelineTask,
"status": task.Status,
"trigger": "subagent",
"subagent_id": task.ID,
"agent_id": task.AgentID,
"role": task.Role,
"session_key": task.SessionKey,
"memory_ns": task.MemoryNS,
"retry_count": fmt.Sprintf("%d", task.RetryCount),
"timeout_sec": fmt.Sprintf("%d", task.TimeoutSec),
"status": task.Status,
},
})
}
@@ -692,8 +665,6 @@ func (sm *SubagentManager) ResumeTask(ctx context.Context, taskID string) (strin
MaxResultChars: t.MaxResultChars,
OriginChannel: t.OriginChannel,
OriginChatID: t.OriginChatID,
PipelineID: t.PipelineID,
PipelineTask: t.PipelineTask,
ThreadID: t.ThreadID,
CorrelationID: t.CorrelationID,
ParentRunID: t.ID,

View File

@@ -382,6 +382,9 @@ func (s *SubagentProfileStore) nodeProfileLocked(agentID string) (SubagentProfil
}
}
for _, node := range nodes.DefaultManager().List() {
if isLocalNode(node.ID) {
continue
}
profile := profileFromNode(node, parentAgentID)
if profile.AgentID == id {
return profile, true
@@ -430,6 +433,9 @@ func (s *SubagentProfileStore) nodeProfilesLocked() []SubagentProfile {
}
out := make([]SubagentProfile, 0, len(nodeItems))
for _, node := range nodeItems {
if isLocalNode(node.ID) {
continue
}
profile := profileFromNode(node, parentAgentID)
if profile.AgentID == "" {
continue
@@ -473,6 +479,10 @@ func nodeBranchAgentID(nodeID string) string {
return "node." + id + ".main"
}
func isLocalNode(nodeID string) bool {
return normalizeSubagentIdentifier(nodeID) == "local"
}
type SubagentProfileTool struct {
store *SubagentProfileStore
}
@@ -504,7 +514,7 @@ func (t *SubagentProfileTool) Parameters() map[string]interface{} {
"status": map[string]interface{}{"type": "string", "description": "active|disabled"},
"tool_allowlist": map[string]interface{}{
"type": "array",
"description": "Tool allowlist entries. Supports tool names, '*'/'all', and grouped tokens like 'group:files_read' or '@pipeline'.",
"description": "Tool allowlist entries. Supports tool names, '*'/'all', and grouped tokens like 'group:files_read'.",
"items": map[string]interface{}{"type": "string"},
},
"max_retries": map[string]interface{}{"type": "integer", "description": "Retry limit for subagent task execution."},

View File

@@ -46,7 +46,7 @@ func TestSubagentProfileStoreNormalization(t *testing.T) {
func TestSubagentManagerSpawnRejectsDisabledProfile(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "ok", nil
})
@@ -74,7 +74,7 @@ func TestSubagentManagerSpawnRejectsDisabledProfile(t *testing.T) {
func TestSubagentManagerSpawnResolvesProfileByRole(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
store := manager.ProfileStore()
if store == nil {
t.Fatalf("expected profile store to be available")
@@ -234,3 +234,44 @@ func TestSubagentProfileStoreIncludesNodeMainBranchProfiles(t *testing.T) {
t.Fatalf("expected node-managed delete to fail")
}
}
func TestSubagentProfileStoreExcludesLocalNodeMainBranchProfile(t *testing.T) {
runtimecfg.Set(config.DefaultConfig())
t.Cleanup(func() {
runtimecfg.Set(config.DefaultConfig())
nodes.DefaultManager().Remove("local")
})
cfg := config.DefaultConfig()
cfg.Agents.Router.Enabled = true
cfg.Agents.Router.MainAgentID = "main"
cfg.Agents.Subagents["main"] = config.SubagentConfig{
Enabled: true,
Type: "router",
SystemPromptFile: "agents/main/AGENT.md",
}
runtimecfg.Set(cfg)
nodes.DefaultManager().Upsert(nodes.NodeInfo{
ID: "local",
Name: "Local",
Online: true,
})
store := NewSubagentProfileStore(t.TempDir())
if profile, ok, err := store.Get(nodeBranchAgentID("local")); err != nil {
t.Fatalf("get failed: %v", err)
} else if ok {
t.Fatalf("expected local node branch profile to be excluded, got %+v", profile)
}
items, err := store.List()
if err != nil {
t.Fatalf("list failed: %v", err)
}
for _, item := range items {
if item.AgentID == nodeBranchAgentID("local") {
t.Fatalf("local node branch profile should not appear in list")
}
}
}

View File

@@ -9,7 +9,7 @@ import (
func TestSubagentRouterDispatchAndWaitReply(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "router-result", nil
})

View File

@@ -17,7 +17,7 @@ func TestSubagentSpawnEnforcesTaskQuota(t *testing.T) {
t.Parallel()
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "ok", nil
})
@@ -45,7 +45,7 @@ func TestSubagentSpawnEnforcesTaskQuota(t *testing.T) {
func TestSubagentRunWithRetryEventuallySucceeds(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
attempts := 0
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
attempts++
@@ -81,7 +81,7 @@ func TestSubagentRunWithRetryEventuallySucceeds(t *testing.T) {
func TestSubagentRunWithTimeoutFails(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
select {
case <-ctx.Done():
@@ -116,7 +116,7 @@ func TestSubagentBroadcastIncludesFailureStatus(t *testing.T) {
msgBus := bus.NewMessageBus()
defer msgBus.Close()
manager := NewSubagentManager(nil, workspace, msgBus, nil)
manager := NewSubagentManager(nil, workspace, msgBus)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "", errors.New("boom")
})
@@ -152,7 +152,7 @@ func TestSubagentBroadcastIncludesFailureStatus(t *testing.T) {
func TestSubagentManagerRestoresPersistedRuns(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "persisted", nil
})
@@ -172,7 +172,7 @@ func TestSubagentManagerRestoresPersistedRuns(t *testing.T) {
t.Fatalf("expected completed task, got %s", task.Status)
}
reloaded := NewSubagentManager(nil, workspace, nil, nil)
reloaded := NewSubagentManager(nil, workspace, nil)
got, ok := reloaded.GetTask(task.ID)
if !ok {
t.Fatalf("expected persisted task to reload")
@@ -207,7 +207,7 @@ func TestSubagentManagerRestoresPersistedRuns(t *testing.T) {
func TestSubagentManagerPersistsEvents(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
time.Sleep(100 * time.Millisecond)
return "ok", nil
@@ -249,7 +249,7 @@ func TestSubagentManagerPersistsEvents(t *testing.T) {
func TestSubagentMailboxStoresThreadAndReplies(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
return "done", nil
})
@@ -294,7 +294,7 @@ func TestSubagentMailboxStoresThreadAndReplies(t *testing.T) {
func TestSubagentMailboxInboxIncludesControlMessages(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
time.Sleep(150 * time.Millisecond)
return "ok", nil
@@ -336,7 +336,7 @@ func TestSubagentMailboxInboxIncludesControlMessages(t *testing.T) {
func TestSubagentMailboxReplyAndAckFlow(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
time.Sleep(150 * time.Millisecond)
return "ok", nil
@@ -405,7 +405,7 @@ func TestSubagentMailboxReplyAndAckFlow(t *testing.T) {
func TestSubagentResumeConsumesQueuedThreadInbox(t *testing.T) {
workspace := t.TempDir()
manager := NewSubagentManager(nil, workspace, nil, nil)
manager := NewSubagentManager(nil, workspace, nil)
observedQueued := make(chan int, 4)
manager.SetRunFunc(func(ctx context.Context, task *SubagentTask) (string, error) {
inbox, err := manager.TaskInbox(task.ID, 10)
@@ -507,7 +507,7 @@ func TestSubagentUsesConfiguredSystemPromptFile(t *testing.T) {
t.Fatalf("write coder AGENT failed: %v", err)
}
provider := &captureProvider{}
manager := NewSubagentManager(provider, workspace, nil, nil)
manager := NewSubagentManager(provider, workspace, nil)
if _, err := manager.ProfileStore().Upsert(SubagentProfile{
AgentID: "coder",
Status: "active",

View File

@@ -43,12 +43,6 @@ var defaultToolAllowlistGroups = []ToolAllowlistGroup{
Aliases: []string{"memory"},
Tools: []string{"memory_search", "memory_get", "memory_write"},
},
{
Name: "pipeline",
Description: "Pipeline orchestration tools",
Aliases: []string{"pipelines"},
Tools: []string{"pipeline_create", "pipeline_status", "pipeline_state_set", "pipeline_dispatch"},
},
{
Name: "subagents",
Description: "Subagent management tools",

View File

@@ -17,7 +17,7 @@ func TestExpandToolAllowlistEntries_GroupPrefix(t *testing.T) {
}
func TestExpandToolAllowlistEntries_BareGroupAndAlias(t *testing.T) {
got := ExpandToolAllowlistEntries([]string{"memory_all", "@pipeline"})
got := ExpandToolAllowlistEntries([]string{"memory_all", "@subagents"})
contains := map[string]bool{}
for _, item := range got {
contains[item] = true
@@ -25,7 +25,7 @@ func TestExpandToolAllowlistEntries_BareGroupAndAlias(t *testing.T) {
if !contains["memory_search"] || !contains["memory_write"] {
t.Fatalf("memory_all expansion missing memory tools: %v", got)
}
if !contains["pipeline_dispatch"] || !contains["pipeline_status"] {
t.Fatalf("pipeline alias expansion missing pipeline tools: %v", got)
if !contains["spawn"] || !contains["subagents"] || !contains["subagent_profile"] {
t.Fatalf("subagents alias expansion missing subagent tools: %v", got)
}
}