diff --git a/clawgo_test b/clawgo_test index a4e9501..4cbca60 100755 Binary files a/clawgo_test and b/clawgo_test differ diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 1a39fcf..4a513fa 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -299,6 +299,8 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) }) } messages = append(messages, assistantMsg) + // 持久化包含 ToolCalls 的助手消息 + al.sessions.AddMessageFull(msg.SessionKey, assistantMsg) for _, tc := range response.ToolCalls { // Log tool call with arguments preview @@ -321,6 +323,8 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) ToolCallID: tc.ID, } messages = append(messages, toolResultMsg) + // 持久化工具返回结果 + al.sessions.AddMessageFull(msg.SessionKey, toolResultMsg) } } @@ -343,11 +347,13 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) } al.sessions.AddMessage(msg.SessionKey, "user", msg.Content) - // We store the filtered content in history so the model sees what the user saw - // (or we could store full content if we want the model to remember its thoughts) - // The prompt says "filter out ... from the user-facing response". - // I'll store the filtered version to be safe. - al.sessions.AddMessage(msg.SessionKey, "assistant", userContent) + + // 使用 AddMessageFull 存储包含思考过程或工具调用的完整助手消息 + al.sessions.AddMessageFull(msg.SessionKey, providers.Message{ + Role: "assistant", + Content: userContent, + }) + al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey)) // Log response preview (original content) @@ -487,6 +493,8 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe }) } messages = append(messages, assistantMsg) + // 持久化包含 ToolCalls 的助手消息 + al.sessions.AddMessageFull(sessionKey, assistantMsg) for _, tc := range response.ToolCalls { result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments) @@ -500,6 +508,8 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe ToolCallID: tc.ID, } messages = append(messages, toolResultMsg) + // 持久化工具返回结果 + al.sessions.AddMessageFull(sessionKey, toolResultMsg) } } @@ -509,7 +519,15 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe // Save to session with system message marker al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content)) - al.sessions.AddMessage(sessionKey, "assistant", finalContent) + + // 如果 finalContent 中没有包含 tool calls (即最后一次 LLM 返回的结果) + // 我们已经通过循环内部的 AddMessageFull 存储了前面的步骤 + // 这里的 AddMessageFull 会存储最终回复 + al.sessions.AddMessageFull(sessionKey, providers.Message{ + Role: "assistant", + Content: finalContent, + }) + al.sessions.Save(al.sessions.GetOrCreate(sessionKey)) logger.InfoCF("agent", "System message processing completed", diff --git a/pkg/session/manager.go b/pkg/session/manager.go index e1aa0c7..06d5244 100644 --- a/pkg/session/manager.go +++ b/pkg/session/manager.go @@ -1,9 +1,11 @@ package session import ( + "bufio" "encoding/json" "os" "path/filepath" + "strings" "sync" "time" @@ -68,16 +70,43 @@ func (sm *SessionManager) GetOrCreate(key string) *Session { } func (sm *SessionManager) AddMessage(sessionKey, role, content string) { - session := sm.GetOrCreate(sessionKey) - - session.mu.Lock() - defer session.mu.Unlock() - - session.Messages = append(session.Messages, providers.Message{ + sm.AddMessageFull(sessionKey, providers.Message{ Role: role, Content: content, }) +} + +func (sm *SessionManager) AddMessageFull(sessionKey string, msg providers.Message) { + session := sm.GetOrCreate(sessionKey) + + session.mu.Lock() + session.Messages = append(session.Messages, msg) session.Updated = time.Now() + session.mu.Unlock() + + // 立即持久化 (Append-only) + sm.appendMessage(sessionKey, msg) +} + +func (sm *SessionManager) appendMessage(sessionKey string, msg providers.Message) error { + if sm.storage == "" { + return nil + } + + sessionPath := filepath.Join(sm.storage, sessionKey+".jsonl") + f, err := os.OpenFile(sessionPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + + data, err := json.Marshal(msg) + if err != nil { + return err + } + + _, err = f.Write(append(data, '\n')) + return err } func (sm *SessionManager) GetHistory(key string) []providers.Message { @@ -147,21 +176,20 @@ func (sm *SessionManager) TruncateHistory(key string, keepLast int) { } func (sm *SessionManager) Save(session *Session) error { + // 现已通过 AddMessageFull 实时增量持久化 + // 这里保留 Save 方法用于更新 Summary 等元数据 if sm.storage == "" { return nil } - session.mu.RLock() - defer session.mu.RUnlock() - - sessionPath := filepath.Join(sm.storage, session.Key+".json") - - data, err := json.MarshalIndent(session, "", " ") - if err != nil { - return err + metaPath := filepath.Join(sm.storage, session.Key+".meta") + meta := map[string]interface{}{ + "summary": session.Summary, + "updated": session.Updated, + "created": session.Created, } - - return os.WriteFile(sessionPath, data, 0644) + data, _ := json.MarshalIndent(meta, "", " ") + return os.WriteFile(metaPath, data, 0644) } func (sm *SessionManager) loadSessions() error { @@ -175,22 +203,47 @@ func (sm *SessionManager) loadSessions() error { continue } - if filepath.Ext(file.Name()) != ".json" { - continue + // 处理 JSONL 历史消息 + if filepath.Ext(file.Name()) == ".jsonl" { + sessionKey := strings.TrimSuffix(file.Name(), ".jsonl") + session := sm.GetOrCreate(sessionKey) + + f, err := os.Open(filepath.Join(sm.storage, file.Name())) + if err != nil { + continue + } + + scanner := bufio.NewScanner(f) + session.mu.Lock() + for scanner.Scan() { + var msg providers.Message + if err := json.Unmarshal(scanner.Bytes(), &msg); err == nil { + session.Messages = append(session.Messages, msg) + } + } + session.mu.Unlock() + f.Close() } - sessionPath := filepath.Join(sm.storage, file.Name()) - data, err := os.ReadFile(sessionPath) - if err != nil { - continue - } + // 处理元数据 + if filepath.Ext(file.Name()) == ".meta" { + sessionKey := strings.TrimSuffix(file.Name(), ".meta") + session := sm.GetOrCreate(sessionKey) - var session Session - if err := json.Unmarshal(data, &session); err != nil { - continue + data, err := os.ReadFile(filepath.Join(sm.storage, file.Name())) + if err == nil { + var meta struct { + Summary string `json:"summary"` + Updated time.Time `json:"updated"` + Created time.Time `json:"created"` + } + if err := json.Unmarshal(data, &meta); err == nil { + session.Summary = meta.Summary + session.Updated = meta.Updated + session.Created = meta.Created + } + } } - - sm.sessions[session.Key] = &session } return nil