mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-13 20:47:49 +08:00
add Automatically compacted context
This commit is contained in:
@@ -44,6 +44,7 @@ type AgentLoop struct {
|
||||
tools *tools.ToolRegistry
|
||||
orchestrator *tools.Orchestrator
|
||||
running atomic.Bool
|
||||
compactionCfg config.ContextCompactionConfig
|
||||
}
|
||||
|
||||
func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider, cs *cron.CronService) *AgentLoop {
|
||||
@@ -125,6 +126,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
|
||||
contextBuilder: NewContextBuilder(workspace, cfg.Memory, func() []string { return toolsRegistry.GetSummaries() }),
|
||||
tools: toolsRegistry,
|
||||
orchestrator: orchestrator,
|
||||
compactionCfg: cfg.Agents.Defaults.ContextCompaction,
|
||||
}
|
||||
|
||||
// 注入递归运行逻辑,使 subagent 具备 full tool-calling 能力
|
||||
@@ -259,7 +261,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
|
||||
Content: userContent,
|
||||
})
|
||||
|
||||
if err := al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)); err != nil {
|
||||
if err := al.persistSessionWithCompaction(ctx, msg.SessionKey); err != nil {
|
||||
logger.WarnCF("agent", "Failed to save session metadata", map[string]interface{}{
|
||||
"session_key": msg.SessionKey,
|
||||
logger.FieldError: err.Error(),
|
||||
@@ -348,7 +350,7 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
|
||||
Content: finalContent,
|
||||
})
|
||||
|
||||
if err := al.sessions.Save(al.sessions.GetOrCreate(sessionKey)); err != nil {
|
||||
if err := al.persistSessionWithCompaction(ctx, sessionKey); err != nil {
|
||||
logger.WarnCF("agent", "Failed to save session metadata", map[string]interface{}{
|
||||
"session_key": sessionKey,
|
||||
logger.FieldError: err.Error(),
|
||||
@@ -718,6 +720,124 @@ func formatToolsForLog(tools []providers.ToolDefinition) string {
|
||||
return result
|
||||
}
|
||||
|
||||
func (al *AgentLoop) persistSessionWithCompaction(ctx context.Context, sessionKey string) error {
|
||||
if err := al.maybeCompactContext(ctx, sessionKey); err != nil {
|
||||
logger.WarnCF("agent", "Context compaction skipped due to error", map[string]interface{}{
|
||||
"session_key": sessionKey,
|
||||
logger.FieldError: err.Error(),
|
||||
})
|
||||
}
|
||||
return al.sessions.Save(al.sessions.GetOrCreate(sessionKey))
|
||||
}
|
||||
|
||||
func (al *AgentLoop) maybeCompactContext(ctx context.Context, sessionKey string) error {
|
||||
cfg := al.compactionCfg
|
||||
if !cfg.Enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
messageCount := al.sessions.MessageCount(sessionKey)
|
||||
if messageCount < cfg.TriggerMessages {
|
||||
return nil
|
||||
}
|
||||
|
||||
history := al.sessions.GetHistory(sessionKey)
|
||||
if len(history) < cfg.TriggerMessages {
|
||||
return nil
|
||||
}
|
||||
if cfg.KeepRecentMessages >= len(history) {
|
||||
return nil
|
||||
}
|
||||
|
||||
summary := al.sessions.GetSummary(sessionKey)
|
||||
compactUntil := len(history) - cfg.KeepRecentMessages
|
||||
compactCtx, cancel := context.WithTimeout(ctx, 25*time.Second)
|
||||
defer cancel()
|
||||
newSummary, err := al.buildCompactedSummary(compactCtx, summary, history[:compactUntil], cfg.MaxTranscriptChars)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newSummary = strings.TrimSpace(newSummary)
|
||||
if newSummary == "" {
|
||||
return nil
|
||||
}
|
||||
if len(newSummary) > cfg.MaxSummaryChars {
|
||||
newSummary = truncateString(newSummary, cfg.MaxSummaryChars)
|
||||
}
|
||||
|
||||
before, after, err := al.sessions.CompactHistory(sessionKey, newSummary, cfg.KeepRecentMessages)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.InfoCF("agent", "Context compacted automatically", map[string]interface{}{
|
||||
"session_key": sessionKey,
|
||||
"before_messages": before,
|
||||
"after_messages": after,
|
||||
"kept_recent": cfg.KeepRecentMessages,
|
||||
"summary_chars": len(newSummary),
|
||||
"trigger_messages": cfg.TriggerMessages,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (al *AgentLoop) buildCompactedSummary(
|
||||
ctx context.Context,
|
||||
existingSummary string,
|
||||
messages []providers.Message,
|
||||
maxTranscriptChars int,
|
||||
) (string, error) {
|
||||
transcript := formatCompactionTranscript(messages, maxTranscriptChars)
|
||||
if strings.TrimSpace(transcript) == "" {
|
||||
return strings.TrimSpace(existingSummary), nil
|
||||
}
|
||||
|
||||
systemPrompt := "You are a conversation compactor. Merge prior summary and transcript into a concise, factual memory for future turns. Keep user preferences, constraints, decisions, unresolved tasks, and key technical context. Do not include speculative content."
|
||||
userPrompt := fmt.Sprintf("Existing summary:\n%s\n\nTranscript to compact:\n%s\n\nReturn a compact markdown summary with sections: Key Facts, Decisions, Open Items, Next Steps.",
|
||||
strings.TrimSpace(existingSummary), transcript)
|
||||
|
||||
resp, err := al.callLLMWithModelFallback(ctx, []providers.Message{
|
||||
{Role: "system", Content: systemPrompt},
|
||||
{Role: "user", Content: userPrompt},
|
||||
}, nil, map[string]interface{}{
|
||||
"max_tokens": 1200,
|
||||
"temperature": 0.2,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return resp.Content, nil
|
||||
}
|
||||
|
||||
func formatCompactionTranscript(messages []providers.Message, maxChars int) string {
|
||||
if maxChars <= 0 || len(messages) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
var sb strings.Builder
|
||||
used := 0
|
||||
for _, m := range messages {
|
||||
role := strings.TrimSpace(m.Role)
|
||||
if role == "" {
|
||||
role = "unknown"
|
||||
}
|
||||
line := fmt.Sprintf("[%s] %s\n", role, strings.TrimSpace(m.Content))
|
||||
if len(line) > 1200 {
|
||||
line = truncateString(line, 1200) + "\n"
|
||||
}
|
||||
if used+len(line) > maxChars {
|
||||
remain := maxChars - used
|
||||
if remain > 16 {
|
||||
sb.WriteString(truncateString(line, remain))
|
||||
}
|
||||
break
|
||||
}
|
||||
sb.WriteString(line)
|
||||
used += len(line)
|
||||
}
|
||||
return strings.TrimSpace(sb.String())
|
||||
}
|
||||
|
||||
func (al *AgentLoop) handleSlashCommand(content string) (bool, string, error) {
|
||||
text := strings.TrimSpace(content)
|
||||
if !strings.HasPrefix(text, "/") {
|
||||
|
||||
@@ -27,12 +27,21 @@ type AgentsConfig struct {
|
||||
}
|
||||
|
||||
type AgentDefaults struct {
|
||||
Workspace string `json:"workspace" env:"CLAWGO_AGENTS_DEFAULTS_WORKSPACE"`
|
||||
Model string `json:"model" env:"CLAWGO_AGENTS_DEFAULTS_MODEL"`
|
||||
ModelFallbacks []string `json:"model_fallbacks" env:"CLAWGO_AGENTS_DEFAULTS_MODEL_FALLBACKS"`
|
||||
MaxTokens int `json:"max_tokens" env:"CLAWGO_AGENTS_DEFAULTS_MAX_TOKENS"`
|
||||
Temperature float64 `json:"temperature" env:"CLAWGO_AGENTS_DEFAULTS_TEMPERATURE"`
|
||||
MaxToolIterations int `json:"max_tool_iterations" env:"CLAWGO_AGENTS_DEFAULTS_MAX_TOOL_ITERATIONS"`
|
||||
Workspace string `json:"workspace" env:"CLAWGO_AGENTS_DEFAULTS_WORKSPACE"`
|
||||
Model string `json:"model" env:"CLAWGO_AGENTS_DEFAULTS_MODEL"`
|
||||
ModelFallbacks []string `json:"model_fallbacks" env:"CLAWGO_AGENTS_DEFAULTS_MODEL_FALLBACKS"`
|
||||
MaxTokens int `json:"max_tokens" env:"CLAWGO_AGENTS_DEFAULTS_MAX_TOKENS"`
|
||||
Temperature float64 `json:"temperature" env:"CLAWGO_AGENTS_DEFAULTS_TEMPERATURE"`
|
||||
MaxToolIterations int `json:"max_tool_iterations" env:"CLAWGO_AGENTS_DEFAULTS_MAX_TOOL_ITERATIONS"`
|
||||
ContextCompaction ContextCompactionConfig `json:"context_compaction"`
|
||||
}
|
||||
|
||||
type ContextCompactionConfig struct {
|
||||
Enabled bool `json:"enabled" env:"CLAWGO_AGENTS_DEFAULTS_CONTEXT_COMPACTION_ENABLED"`
|
||||
TriggerMessages int `json:"trigger_messages" env:"CLAWGO_AGENTS_DEFAULTS_CONTEXT_COMPACTION_TRIGGER_MESSAGES"`
|
||||
KeepRecentMessages int `json:"keep_recent_messages" env:"CLAWGO_AGENTS_DEFAULTS_CONTEXT_COMPACTION_KEEP_RECENT_MESSAGES"`
|
||||
MaxSummaryChars int `json:"max_summary_chars" env:"CLAWGO_AGENTS_DEFAULTS_CONTEXT_COMPACTION_MAX_SUMMARY_CHARS"`
|
||||
MaxTranscriptChars int `json:"max_transcript_chars" env:"CLAWGO_AGENTS_DEFAULTS_CONTEXT_COMPACTION_MAX_TRANSCRIPT_CHARS"`
|
||||
}
|
||||
|
||||
type ChannelsConfig struct {
|
||||
@@ -213,6 +222,13 @@ func DefaultConfig() *Config {
|
||||
MaxTokens: 8192,
|
||||
Temperature: 0.7,
|
||||
MaxToolIterations: 20,
|
||||
ContextCompaction: ContextCompactionConfig{
|
||||
Enabled: true,
|
||||
TriggerMessages: 60,
|
||||
KeepRecentMessages: 20,
|
||||
MaxSummaryChars: 6000,
|
||||
MaxTranscriptChars: 20000,
|
||||
},
|
||||
},
|
||||
},
|
||||
Channels: ChannelsConfig{
|
||||
|
||||
@@ -17,6 +17,24 @@ func Validate(cfg *Config) []error {
|
||||
if cfg.Agents.Defaults.MaxToolIterations <= 0 {
|
||||
errs = append(errs, fmt.Errorf("agents.defaults.max_tool_iterations must be > 0"))
|
||||
}
|
||||
if cfg.Agents.Defaults.ContextCompaction.Enabled {
|
||||
cc := cfg.Agents.Defaults.ContextCompaction
|
||||
if cc.TriggerMessages <= 0 {
|
||||
errs = append(errs, fmt.Errorf("agents.defaults.context_compaction.trigger_messages must be > 0 when enabled=true"))
|
||||
}
|
||||
if cc.KeepRecentMessages <= 0 {
|
||||
errs = append(errs, fmt.Errorf("agents.defaults.context_compaction.keep_recent_messages must be > 0 when enabled=true"))
|
||||
}
|
||||
if cc.TriggerMessages > 0 && cc.KeepRecentMessages >= cc.TriggerMessages {
|
||||
errs = append(errs, fmt.Errorf("agents.defaults.context_compaction.keep_recent_messages must be < trigger_messages"))
|
||||
}
|
||||
if cc.MaxSummaryChars <= 0 {
|
||||
errs = append(errs, fmt.Errorf("agents.defaults.context_compaction.max_summary_chars must be > 0 when enabled=true"))
|
||||
}
|
||||
if cc.MaxTranscriptChars <= 0 {
|
||||
errs = append(errs, fmt.Errorf("agents.defaults.context_compaction.max_transcript_chars must be > 0 when enabled=true"))
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.Providers.Proxy.APIBase == "" {
|
||||
errs = append(errs, fmt.Errorf("providers.proxy.api_base is required"))
|
||||
|
||||
@@ -3,6 +3,7 @@ package session
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@@ -120,6 +121,35 @@ func (sm *SessionManager) appendMessage(sessionKey string, msg providers.Message
|
||||
return err
|
||||
}
|
||||
|
||||
func (sm *SessionManager) rewriteHistory(sessionKey string, messages []providers.Message) error {
|
||||
if sm.storage == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
sessionPath := filepath.Join(sm.storage, sessionKey+".jsonl")
|
||||
tmpPath := sessionPath + ".tmp"
|
||||
|
||||
f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
enc := json.NewEncoder(f)
|
||||
for _, msg := range messages {
|
||||
if err := enc.Encode(msg); err != nil {
|
||||
_ = f.Close()
|
||||
_ = os.Remove(tmpPath)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := f.Close(); err != nil {
|
||||
_ = os.Remove(tmpPath)
|
||||
return err
|
||||
}
|
||||
return os.Rename(tmpPath, sessionPath)
|
||||
}
|
||||
|
||||
func (sm *SessionManager) GetHistory(key string) []providers.Message {
|
||||
sm.mu.RLock()
|
||||
session, ok := sm.sessions[key]
|
||||
@@ -186,6 +216,49 @@ func (sm *SessionManager) TruncateHistory(key string, keepLast int) {
|
||||
session.Updated = time.Now()
|
||||
}
|
||||
|
||||
func (sm *SessionManager) MessageCount(key string) int {
|
||||
sm.mu.RLock()
|
||||
session, ok := sm.sessions[key]
|
||||
sm.mu.RUnlock()
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
|
||||
session.mu.RLock()
|
||||
defer session.mu.RUnlock()
|
||||
return len(session.Messages)
|
||||
}
|
||||
|
||||
func (sm *SessionManager) CompactHistory(key, summary string, keepLast int) (int, int, error) {
|
||||
sm.mu.RLock()
|
||||
session, ok := sm.sessions[key]
|
||||
sm.mu.RUnlock()
|
||||
if !ok {
|
||||
return 0, 0, fmt.Errorf("session not found: %s", key)
|
||||
}
|
||||
|
||||
if keepLast < 0 {
|
||||
keepLast = 0
|
||||
}
|
||||
|
||||
session.mu.Lock()
|
||||
before := len(session.Messages)
|
||||
if keepLast < before {
|
||||
session.Messages = session.Messages[before-keepLast:]
|
||||
}
|
||||
session.Summary = summary
|
||||
session.Updated = time.Now()
|
||||
after := len(session.Messages)
|
||||
msgs := make([]providers.Message, after)
|
||||
copy(msgs, session.Messages)
|
||||
session.mu.Unlock()
|
||||
|
||||
if err := sm.rewriteHistory(key, msgs); err != nil {
|
||||
return before, after, err
|
||||
}
|
||||
return before, after, nil
|
||||
}
|
||||
|
||||
func (sm *SessionManager) Save(session *Session) error {
|
||||
// 现已通过 AddMessageFull 实时增量持久化
|
||||
// 这里保留 Save 方法用于更新 Summary 等元数据
|
||||
|
||||
Reference in New Issue
Block a user