feat: add session auto-planning and resource-based concurrency scheduling

This commit is contained in:
lpf
2026-03-04 12:44:31 +08:00
parent ba8cfbe131
commit 154ab3f7f9
16 changed files with 1193 additions and 486 deletions

View File

@@ -27,6 +27,7 @@ import (
"clawgo/pkg/logger"
"clawgo/pkg/nodes"
"clawgo/pkg/providers"
"clawgo/pkg/scheduling"
"clawgo/pkg/session"
"clawgo/pkg/tools"
)
@@ -53,12 +54,13 @@ type AgentLoop struct {
runtimeCompactionNote string
startupCompactionNote string
systemRewriteTemplate string
sessionAutoPlan bool
sessionAutoPlanMax int
audit *triggerAudit
running bool
intentMu sync.RWMutex
intentHints map[string]string
sessionRunMu sync.Mutex
sessionRunLocks map[string]*sync.Mutex
sessionScheduler *SessionScheduler
providerNames []string
providerPool map[string]providers.LLMProvider
providerResponses map[string]config.ProviderResponsesConfig
@@ -240,16 +242,24 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
runtimeCompactionNote: cfg.Agents.Defaults.Texts.RuntimeCompactionNote,
startupCompactionNote: cfg.Agents.Defaults.Texts.StartupCompactionNote,
systemRewriteTemplate: cfg.Agents.Defaults.Texts.SystemRewriteTemplate,
sessionAutoPlan: cfg.Agents.Defaults.RuntimeControl.SessionAutoPlanEnabled,
sessionAutoPlanMax: cfg.Agents.Defaults.RuntimeControl.SessionAutoPlanMaxTasks,
audit: newTriggerAudit(workspace),
running: false,
intentHints: map[string]string{},
sessionRunLocks: map[string]*sync.Mutex{},
sessionScheduler: NewSessionScheduler(cfg.Agents.Defaults.RuntimeControl.SessionMaxParallelRuns),
ekg: ekg.New(workspace),
sessionProvider: map[string]string{},
sessionStreamed: map[string]bool{},
providerResponses: map[string]config.ProviderResponsesConfig{},
telegramStreaming: cfg.Channels.Telegram.Streaming,
}
if !cfg.Agents.Defaults.RuntimeControl.SessionResourceSchedulingEnabled {
loop.sessionScheduler = nil
}
if loop.sessionAutoPlanMax <= 0 {
loop.sessionAutoPlanMax = 4
}
// Initialize provider fallback chain (primary + proxy_fallbacks).
loop.providerPool = map[string]providers.LLMProvider{}
@@ -345,22 +355,6 @@ func (al *AgentLoop) buildSessionShards(ctx context.Context) []chan bus.InboundM
return shards
}
func (al *AgentLoop) lockSessionRun(sessionKey string) func() {
key := strings.TrimSpace(sessionKey)
if key == "" {
key = "default"
}
al.sessionRunMu.Lock()
mu, ok := al.sessionRunLocks[key]
if !ok {
mu = &sync.Mutex{}
al.sessionRunLocks[key] = mu
}
al.sessionRunMu.Unlock()
mu.Lock()
return func() { mu.Unlock() }
}
func (al *AgentLoop) tryFallbackProviders(ctx context.Context, msg bus.InboundMessage, messages []providers.Message, toolDefs []providers.ToolDefinition, options map[string]interface{}, primaryErr error) (*providers.LLMResponse, string, error) {
if len(al.providerNames) <= 1 {
return nil, "", primaryErr
@@ -452,7 +446,7 @@ func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage)
started := time.Now()
al.appendTaskAuditEvent(taskID, msg, "running", started, 0, "started", false)
response, err := al.processMessage(ctx, msg)
response, err := al.processPlannedMessage(ctx, msg)
if err != nil {
response = fmt.Sprintf("Error processing message: %v", err)
}
@@ -676,7 +670,7 @@ func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey stri
SessionKey: sessionKey,
}
return al.processMessage(ctx, msg)
return al.processPlannedMessage(ctx, msg)
}
func (al *AgentLoop) GetSessionHistory(sessionKey string) []providers.Message {
@@ -687,8 +681,11 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
if msg.SessionKey == "" {
msg.SessionKey = "main"
}
unlock := al.lockSessionRun(msg.SessionKey)
defer unlock()
release, err := al.acquireSessionResources(ctx, &msg)
if err != nil {
return "", err
}
defer release()
if len(al.providerNames) > 0 {
al.setSessionProvider(msg.SessionKey, al.providerNames[0])
}
@@ -1038,6 +1035,36 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
return userContent, nil
}
func (al *AgentLoop) acquireSessionResources(ctx context.Context, msg *bus.InboundMessage) (func(), error) {
if al == nil || msg == nil || al.sessionScheduler == nil {
return func() {}, nil
}
keys, cleaned := al.resolveMessageResourceKeys(msg)
msg.Content = cleaned
return al.sessionScheduler.Acquire(ctx, msg.SessionKey, keys)
}
func (al *AgentLoop) resolveMessageResourceKeys(msg *bus.InboundMessage) ([]string, string) {
if msg == nil {
return nil, ""
}
content := msg.Content
if msg.Metadata != nil {
if raw := strings.TrimSpace(msg.Metadata["resource_keys"]); raw != "" {
if explicit := scheduling.ParseResourceKeyList(raw); len(explicit) > 0 {
return explicit, content
}
}
}
if explicit, cleaned, ok := scheduling.ExtractResourceKeysDirective(content); ok {
if strings.TrimSpace(cleaned) != "" {
content = cleaned
}
return explicit, content
}
return scheduling.DeriveResourceKeys(content), content
}
func (al *AgentLoop) appendDailySummaryLog(msg bus.InboundMessage, response string) {
if strings.TrimSpace(al.workspace) == "" {
return