mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-13 18:07:36 +08:00
feat: implement high-performance JSONL session persistence with incremental append support
This commit is contained in:
BIN
clawgo_test
BIN
clawgo_test
Binary file not shown.
@@ -299,6 +299,8 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
|
||||
})
|
||||
}
|
||||
messages = append(messages, assistantMsg)
|
||||
// 持久化包含 ToolCalls 的助手消息
|
||||
al.sessions.AddMessageFull(msg.SessionKey, assistantMsg)
|
||||
|
||||
for _, tc := range response.ToolCalls {
|
||||
// Log tool call with arguments preview
|
||||
@@ -321,6 +323,8 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
|
||||
ToolCallID: tc.ID,
|
||||
}
|
||||
messages = append(messages, toolResultMsg)
|
||||
// 持久化工具返回结果
|
||||
al.sessions.AddMessageFull(msg.SessionKey, toolResultMsg)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -343,11 +347,13 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
|
||||
}
|
||||
|
||||
al.sessions.AddMessage(msg.SessionKey, "user", msg.Content)
|
||||
// We store the filtered content in history so the model sees what the user saw
|
||||
// (or we could store full content if we want the model to remember its thoughts)
|
||||
// The prompt says "filter out ... from the user-facing response".
|
||||
// I'll store the filtered version to be safe.
|
||||
al.sessions.AddMessage(msg.SessionKey, "assistant", userContent)
|
||||
|
||||
// 使用 AddMessageFull 存储包含思考过程或工具调用的完整助手消息
|
||||
al.sessions.AddMessageFull(msg.SessionKey, providers.Message{
|
||||
Role: "assistant",
|
||||
Content: userContent,
|
||||
})
|
||||
|
||||
al.sessions.Save(al.sessions.GetOrCreate(msg.SessionKey))
|
||||
|
||||
// Log response preview (original content)
|
||||
@@ -487,6 +493,8 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
|
||||
})
|
||||
}
|
||||
messages = append(messages, assistantMsg)
|
||||
// 持久化包含 ToolCalls 的助手消息
|
||||
al.sessions.AddMessageFull(sessionKey, assistantMsg)
|
||||
|
||||
for _, tc := range response.ToolCalls {
|
||||
result, err := al.tools.Execute(ctx, tc.Name, tc.Arguments)
|
||||
@@ -500,6 +508,8 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
|
||||
ToolCallID: tc.ID,
|
||||
}
|
||||
messages = append(messages, toolResultMsg)
|
||||
// 持久化工具返回结果
|
||||
al.sessions.AddMessageFull(sessionKey, toolResultMsg)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -509,7 +519,15 @@ func (al *AgentLoop) processSystemMessage(ctx context.Context, msg bus.InboundMe
|
||||
|
||||
// Save to session with system message marker
|
||||
al.sessions.AddMessage(sessionKey, "user", fmt.Sprintf("[System: %s] %s", msg.SenderID, msg.Content))
|
||||
al.sessions.AddMessage(sessionKey, "assistant", finalContent)
|
||||
|
||||
// 如果 finalContent 中没有包含 tool calls (即最后一次 LLM 返回的结果)
|
||||
// 我们已经通过循环内部的 AddMessageFull 存储了前面的步骤
|
||||
// 这里的 AddMessageFull 会存储最终回复
|
||||
al.sessions.AddMessageFull(sessionKey, providers.Message{
|
||||
Role: "assistant",
|
||||
Content: finalContent,
|
||||
})
|
||||
|
||||
al.sessions.Save(al.sessions.GetOrCreate(sessionKey))
|
||||
|
||||
logger.InfoCF("agent", "System message processing completed",
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -68,16 +70,43 @@ func (sm *SessionManager) GetOrCreate(key string) *Session {
|
||||
}
|
||||
|
||||
func (sm *SessionManager) AddMessage(sessionKey, role, content string) {
|
||||
session := sm.GetOrCreate(sessionKey)
|
||||
|
||||
session.mu.Lock()
|
||||
defer session.mu.Unlock()
|
||||
|
||||
session.Messages = append(session.Messages, providers.Message{
|
||||
sm.AddMessageFull(sessionKey, providers.Message{
|
||||
Role: role,
|
||||
Content: content,
|
||||
})
|
||||
}
|
||||
|
||||
func (sm *SessionManager) AddMessageFull(sessionKey string, msg providers.Message) {
|
||||
session := sm.GetOrCreate(sessionKey)
|
||||
|
||||
session.mu.Lock()
|
||||
session.Messages = append(session.Messages, msg)
|
||||
session.Updated = time.Now()
|
||||
session.mu.Unlock()
|
||||
|
||||
// 立即持久化 (Append-only)
|
||||
sm.appendMessage(sessionKey, msg)
|
||||
}
|
||||
|
||||
func (sm *SessionManager) appendMessage(sessionKey string, msg providers.Message) error {
|
||||
if sm.storage == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
sessionPath := filepath.Join(sm.storage, sessionKey+".jsonl")
|
||||
f, err := os.OpenFile(sessionPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
data, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = f.Write(append(data, '\n'))
|
||||
return err
|
||||
}
|
||||
|
||||
func (sm *SessionManager) GetHistory(key string) []providers.Message {
|
||||
@@ -147,21 +176,20 @@ func (sm *SessionManager) TruncateHistory(key string, keepLast int) {
|
||||
}
|
||||
|
||||
func (sm *SessionManager) Save(session *Session) error {
|
||||
// 现已通过 AddMessageFull 实时增量持久化
|
||||
// 这里保留 Save 方法用于更新 Summary 等元数据
|
||||
if sm.storage == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
session.mu.RLock()
|
||||
defer session.mu.RUnlock()
|
||||
|
||||
sessionPath := filepath.Join(sm.storage, session.Key+".json")
|
||||
|
||||
data, err := json.MarshalIndent(session, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
metaPath := filepath.Join(sm.storage, session.Key+".meta")
|
||||
meta := map[string]interface{}{
|
||||
"summary": session.Summary,
|
||||
"updated": session.Updated,
|
||||
"created": session.Created,
|
||||
}
|
||||
|
||||
return os.WriteFile(sessionPath, data, 0644)
|
||||
data, _ := json.MarshalIndent(meta, "", " ")
|
||||
return os.WriteFile(metaPath, data, 0644)
|
||||
}
|
||||
|
||||
func (sm *SessionManager) loadSessions() error {
|
||||
@@ -175,22 +203,47 @@ func (sm *SessionManager) loadSessions() error {
|
||||
continue
|
||||
}
|
||||
|
||||
if filepath.Ext(file.Name()) != ".json" {
|
||||
continue
|
||||
// 处理 JSONL 历史消息
|
||||
if filepath.Ext(file.Name()) == ".jsonl" {
|
||||
sessionKey := strings.TrimSuffix(file.Name(), ".jsonl")
|
||||
session := sm.GetOrCreate(sessionKey)
|
||||
|
||||
f, err := os.Open(filepath.Join(sm.storage, file.Name()))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
session.mu.Lock()
|
||||
for scanner.Scan() {
|
||||
var msg providers.Message
|
||||
if err := json.Unmarshal(scanner.Bytes(), &msg); err == nil {
|
||||
session.Messages = append(session.Messages, msg)
|
||||
}
|
||||
}
|
||||
session.mu.Unlock()
|
||||
f.Close()
|
||||
}
|
||||
|
||||
sessionPath := filepath.Join(sm.storage, file.Name())
|
||||
data, err := os.ReadFile(sessionPath)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
// 处理元数据
|
||||
if filepath.Ext(file.Name()) == ".meta" {
|
||||
sessionKey := strings.TrimSuffix(file.Name(), ".meta")
|
||||
session := sm.GetOrCreate(sessionKey)
|
||||
|
||||
var session Session
|
||||
if err := json.Unmarshal(data, &session); err != nil {
|
||||
continue
|
||||
data, err := os.ReadFile(filepath.Join(sm.storage, file.Name()))
|
||||
if err == nil {
|
||||
var meta struct {
|
||||
Summary string `json:"summary"`
|
||||
Updated time.Time `json:"updated"`
|
||||
Created time.Time `json:"created"`
|
||||
}
|
||||
if err := json.Unmarshal(data, &meta); err == nil {
|
||||
session.Summary = meta.Summary
|
||||
session.Updated = meta.Updated
|
||||
session.Created = meta.Created
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sm.sessions[session.Key] = &session
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user