package session import ( "crypto/sha1" "encoding/hex" "encoding/json" "fmt" "os" "path/filepath" "regexp" "sort" "strconv" "strings" "sync" "time" "unicode" "github.com/YspCoder/clawgo/pkg/jsonlog" "github.com/YspCoder/clawgo/pkg/providers" ) const ( defaultSessionSegmentMaxMessages = 200 defaultSessionSegmentMaxBytes = 2 * 1024 * 1024 defaultSessionPromptLoadSegments = 2 maxTokenRefsPerSessionToken = 48 maxSearchSnippetsPerSession = 3 ) var archiveSegmentRe = regexp.MustCompile(`^(?P.+)\.(?P\d{4})\.jsonl$`) type Session struct { Key string `json:"key"` SessionID string `json:"session_id,omitempty"` Kind string `json:"kind,omitempty"` Messages []providers.Message `json:"messages"` Summary string `json:"summary,omitempty"` CompactionCount int `json:"compaction_count,omitempty"` LastLanguage string `json:"last_language,omitempty"` PreferredLanguage string `json:"preferred_language,omitempty"` Created time.Time `json:"created"` Updated time.Time `json:"updated"` mu sync.RWMutex segments []sessionSegmentMeta nextSeq int index *sessionIndexFile } type SessionManager struct { sessions map[string]*Session mu sync.RWMutex storage string segmentMaxMessages int segmentMaxBytes int64 promptLoadSegments int } type openClawEvent struct { Type string `json:"type"` Timestamp string `json:"timestamp,omitempty"` Message *struct { Role string `json:"role"` Content []struct { Type string `json:"type"` Text string `json:"text,omitempty"` } `json:"content,omitempty"` ToolCallID string `json:"toolCallId,omitempty"` ToolName string `json:"toolName,omitempty"` ToolCalls []providers.ToolCall `json:"toolCalls,omitempty"` } `json:"message,omitempty"` } type sessionSegmentMeta struct { Name string `json:"name"` Archived bool `json:"archived,omitempty"` FirstSeq int `json:"first_seq,omitempty"` LastSeq int `json:"last_seq,omitempty"` MessageCount int `json:"message_count,omitempty"` LastOffset int64 `json:"last_offset,omitempty"` UpdatedAt int64 `json:"updated_at,omitempty"` } type sessionMetaFile struct { Version int `json:"version"` SessionKey string `json:"session_key"` SessionID string `json:"session_id,omitempty"` Kind string `json:"kind,omitempty"` Summary string `json:"summary,omitempty"` CompactionCount int `json:"compaction_count,omitempty"` LastLanguage string `json:"last_language,omitempty"` PreferredLanguage string `json:"preferred_language,omitempty"` MessageCount int `json:"message_count,omitempty"` CreatedAt int64 `json:"created_at,omitempty"` UpdatedAt int64 `json:"updated_at,omitempty"` NextSeq int `json:"next_seq,omitempty"` Segments []sessionSegmentMeta `json:"segments,omitempty"` } type sessionIndexRef struct { Seq int `json:"seq"` Role string `json:"role,omitempty"` Segment string `json:"segment,omitempty"` Snippet string `json:"snippet,omitempty"` } type sessionIndexFile struct { Version int `json:"version"` SessionKey string `json:"session_key"` LastSeq int `json:"last_seq,omitempty"` LastOffset int64 `json:"last_offset,omitempty"` Segment string `json:"segment,omitempty"` UpdatedAt int64 `json:"updated_at,omitempty"` Tokens map[string][]sessionIndexRef `json:"tokens,omitempty"` } type SessionSearchSnippet struct { Seq int `json:"seq"` Role string `json:"role,omitempty"` Segment string `json:"segment,omitempty"` Content string `json:"content,omitempty"` } type SessionSearchResult struct { Key string `json:"key"` Kind string `json:"kind,omitempty"` Summary string `json:"summary,omitempty"` UpdatedAt time.Time `json:"updated_at"` Score int `json:"score"` Snippets []SessionSearchSnippet `json:"snippets,omitempty"` } type SessionCompactionSnapshot struct { Key string History []providers.Message Summary string NextSeq int } type sessionsIndexEntry struct { SessionID string `json:"sessionId"` SessionKey string `json:"sessionKey"` UpdatedAt int64 `json:"updatedAt"` Kind string `json:"kind"` ChatType string `json:"chatType"` CompactionCount int `json:"compactionCount"` SessionFile string `json:"sessionFile,omitempty"` Summary string `json:"summary,omitempty"` LastLanguage string `json:"lastLanguage,omitempty"` PreferredLanguage string `json:"preferredLanguage,omitempty"` } type appendMessageResult struct { refreshSessionsIndex bool } func NewSessionManager(storage string) *SessionManager { sm := &SessionManager{ sessions: make(map[string]*Session), storage: storage, segmentMaxMessages: readPositiveIntEnv("CLAWGO_SESSION_SEGMENT_MAX_MESSAGES", defaultSessionSegmentMaxMessages), segmentMaxBytes: int64(readPositiveIntEnv("CLAWGO_SESSION_SEGMENT_MAX_BYTES", defaultSessionSegmentMaxBytes)), promptLoadSegments: readPositiveIntEnv("CLAWGO_SESSION_PROMPT_LOAD_SEGMENTS", defaultSessionPromptLoadSegments), } if storage != "" { _ = os.MkdirAll(storage, 0755) sm.cleanupArchivedSessions() sm.loadSessions() } return sm } func (sm *SessionManager) GetOrCreate(key string) *Session { session, _ := sm.getOrCreate(key) return session } func (sm *SessionManager) getOrCreate(key string) (*Session, bool) { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() if ok { return session, false } sm.mu.Lock() defer sm.mu.Unlock() if session, ok = sm.sessions[key]; ok { return session, false } now := time.Now() session = &Session{ Key: key, SessionID: deriveSessionID(key), Kind: detectSessionKind(key), Messages: []providers.Message{}, Created: now, Updated: now, nextSeq: 1, } sm.sessions[key] = session return session, true } func (sm *SessionManager) AddMessage(sessionKey, role, content string) { sm.AddMessageFull(sessionKey, providers.Message{Role: role, Content: content}) } func (sm *SessionManager) AddMessageFull(sessionKey string, msg providers.Message) { session, created := sm.getOrCreate(sessionKey) persisted := false refreshIndex := created session.mu.Lock() session.Messages = append(session.Messages, msg) session.Updated = time.Now() if session.Created.IsZero() { session.Created = session.Updated } appendResult, err := sm.appendMessageLocked(session, msg) persisted = err == nil refreshIndex = refreshIndex || appendResult.refreshSessionsIndex session.mu.Unlock() if persisted && refreshIndex { _ = sm.writeOpenClawSessionsIndex() } } func (sm *SessionManager) GetPromptHistory(key string) []providers.Message { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() if !ok { return nil } session.mu.RLock() defer session.mu.RUnlock() history := make([]providers.Message, len(session.Messages)) copy(history, session.Messages) return history } func (sm *SessionManager) GetHistory(key string) []providers.Message { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() if !ok { return []providers.Message{} } session.mu.RLock() segments := append([]sessionSegmentMeta(nil), session.segments...) loaded := make([]providers.Message, len(session.Messages)) copy(loaded, session.Messages) session.mu.RUnlock() if len(segments) == 0 { return loaded } all, err := sm.loadMessagesForSegments(segments) if err != nil || len(all) == 0 { return loaded } return all } func (sm *SessionManager) GetSummary(key string) string { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() if !ok { return "" } session.mu.RLock() defer session.mu.RUnlock() return session.Summary } func (sm *SessionManager) SetSummary(key, summary string) { session := sm.GetOrCreate(key) session.mu.Lock() trimmed := strings.TrimSpace(summary) if session.Summary == trimmed { session.mu.Unlock() return } session.Summary = trimmed session.Updated = time.Now() _ = sm.persistSidecarsLocked(session) session.mu.Unlock() _ = sm.writeOpenClawSessionsIndex() } func (sm *SessionManager) ApplyCompaction(key string, keep []providers.Message, summary string) bool { session := sm.GetOrCreate(key) session.mu.Lock() applied := sm.applyCompactionLocked(session, keep, summary) session.mu.Unlock() if applied { _ = sm.writeOpenClawSessionsIndex() } return applied } func (sm *SessionManager) ApplyCompactionIfUnchanged(key string, baseNextSeq int, baseSummary string, keep []providers.Message, summary string) bool { session := sm.GetOrCreate(key) session.mu.Lock() if session.nextSeq != baseNextSeq || session.Summary != baseSummary { session.mu.Unlock() return false } applied := sm.applyCompactionLocked(session, keep, summary) session.mu.Unlock() if applied { _ = sm.writeOpenClawSessionsIndex() } return applied } func (sm *SessionManager) applyCompactionLocked(session *Session, keep []providers.Message, summary string) bool { if session == nil { return false } if len(session.Messages) == 0 && len(keep) == 0 { return false } session.Messages = append([]providers.Message(nil), keep...) if trimmed := strings.TrimSpace(summary); trimmed != "" { session.Summary = trimmed } session.CompactionCount++ session.Updated = time.Now() if err := sm.persistSidecarsLocked(session); err != nil { return false } return true } func (sm *SessionManager) CompactSession(key string, keepLast int, note string) bool { session := sm.GetOrCreate(key) session.mu.Lock() if keepLast <= 0 || len(session.Messages) <= keepLast { session.mu.Unlock() return false } session.Messages = append([]providers.Message(nil), session.Messages[len(session.Messages)-keepLast:]...) session.CompactionCount++ if trimmed := strings.TrimSpace(note); trimmed != "" { if strings.TrimSpace(session.Summary) == "" { session.Summary = trimmed } else { session.Summary += "\n" + trimmed } } session.Updated = time.Now() err := sm.persistSidecarsLocked(session) session.mu.Unlock() if err == nil { _ = sm.writeOpenClawSessionsIndex() } return true } func (sm *SessionManager) GetLanguagePreferences(key string) (preferred string, last string) { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() if !ok { return "", "" } session.mu.RLock() defer session.mu.RUnlock() return session.PreferredLanguage, session.LastLanguage } func (sm *SessionManager) CompactionSnapshot(key string) SessionCompactionSnapshot { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() if !ok || session == nil { return SessionCompactionSnapshot{Key: key} } session.mu.RLock() defer session.mu.RUnlock() history := make([]providers.Message, len(session.Messages)) copy(history, session.Messages) return SessionCompactionSnapshot{ Key: key, History: history, Summary: session.Summary, NextSeq: session.nextSeq, } } func (sm *SessionManager) SetLastLanguage(key, lang string) { if strings.TrimSpace(lang) == "" { return } session := sm.GetOrCreate(key) session.mu.Lock() if session.LastLanguage == lang { session.mu.Unlock() return } session.LastLanguage = lang session.Updated = time.Now() _ = sm.persistSidecarsLocked(session) session.mu.Unlock() _ = sm.writeOpenClawSessionsIndex() } func (sm *SessionManager) SetPreferredLanguage(key, lang string) { if strings.TrimSpace(lang) == "" { return } session := sm.GetOrCreate(key) session.mu.Lock() if session.PreferredLanguage == lang { session.mu.Unlock() return } session.PreferredLanguage = lang session.Updated = time.Now() _ = sm.persistSidecarsLocked(session) session.mu.Unlock() _ = sm.writeOpenClawSessionsIndex() } func (sm *SessionManager) Save(session *Session) error { if sm.storage == "" || session == nil { return nil } session.mu.Lock() if err := sm.persistSidecarsLocked(session); err != nil { session.mu.Unlock() return err } session.mu.Unlock() return sm.writeOpenClawSessionsIndex() } func (sm *SessionManager) Count() int { sm.mu.RLock() defer sm.mu.RUnlock() return len(sm.sessions) } func (sm *SessionManager) Keys() []string { sm.mu.RLock() defer sm.mu.RUnlock() keys := make([]string, 0, len(sm.sessions)) for k := range sm.sessions { keys = append(keys, k) } sort.Strings(keys) return keys } func (sm *SessionManager) List(limit int) []Session { sm.mu.RLock() defer sm.mu.RUnlock() items := make([]Session, 0, len(sm.sessions)) for _, s := range sm.sessions { s.mu.RLock() items = append(items, Session{ Key: s.Key, SessionID: s.SessionID, Kind: s.Kind, Summary: s.Summary, CompactionCount: s.CompactionCount, LastLanguage: s.LastLanguage, PreferredLanguage: s.PreferredLanguage, Created: s.Created, Updated: s.Updated, }) s.mu.RUnlock() } sort.Slice(items, func(i, j int) bool { return items[i].Updated.After(items[j].Updated) }) if limit > 0 && len(items) > limit { items = items[:limit] } return items } func (sm *SessionManager) Search(query string, kinds []string, excludeKey string, limit int) []SessionSearchResult { terms := tokenizeQueryText(query) if len(terms) == 0 { return nil } if limit <= 0 { limit = 5 } kindSet := make(map[string]struct{}, len(kinds)) for _, item := range kinds { if v := strings.ToLower(strings.TrimSpace(item)); v != "" { kindSet[v] = struct{}{} } } sm.mu.RLock() keys := make([]string, 0, len(sm.sessions)) for key := range sm.sessions { keys = append(keys, key) } sm.mu.RUnlock() sort.Strings(keys) results := make([]SessionSearchResult, 0, len(keys)) for _, key := range keys { if key == strings.TrimSpace(excludeKey) { continue } sm.mu.RLock() session := sm.sessions[key] sm.mu.RUnlock() if session == nil { continue } session.mu.RLock() kind := session.Kind summary := session.Summary updated := session.Updated index := cloneSessionIndex(session.index) session.mu.RUnlock() if len(kindSet) > 0 { if _, ok := kindSet[strings.ToLower(strings.TrimSpace(kind))]; !ok { continue } } if index != nil { if result, ok := searchSessionIndex(key, kind, summary, updated, terms, index); ok { results = append(results, result) continue } } indexPath := sm.sessionIndexPath(key) if index, err := sm.readIndexFile(indexPath); err == nil { if result, ok := searchSessionIndex(key, kind, summary, updated, terms, index); ok { results = append(results, result) continue } } if result, ok := sm.searchSessionByScan(key, kind, summary, updated, terms); ok { results = append(results, result) } } sort.Slice(results, func(i, j int) bool { if results[i].Score != results[j].Score { return results[i].Score > results[j].Score } if !results[i].UpdatedAt.Equal(results[j].UpdatedAt) { return results[i].UpdatedAt.After(results[j].UpdatedAt) } return results[i].Key < results[j].Key }) if len(results) > limit { results = results[:limit] } return results } func toOpenClawMessageEvent(msg providers.Message) openClawEvent { role := strings.TrimSpace(strings.ToLower(msg.Role)) mappedRole := role if role == "tool" { mappedRole = "toolResult" } e := openClawEvent{ Type: "message", Timestamp: time.Now().UTC().Format(time.RFC3339Nano), Message: &struct { Role string `json:"role"` Content []struct { Type string `json:"type"` Text string `json:"text,omitempty"` } `json:"content,omitempty"` ToolCallID string `json:"toolCallId,omitempty"` ToolName string `json:"toolName,omitempty"` ToolCalls []providers.ToolCall `json:"toolCalls,omitempty"` }{ Role: mappedRole, Content: []struct { Type string `json:"type"` Text string `json:"text,omitempty"` }{ {Type: "text", Text: msg.Content}, }, ToolCallID: msg.ToolCallID, ToolCalls: msg.ToolCalls, }, } return e } func fromJSONLLine(line []byte) (providers.Message, bool) { var raw providers.Message if err := json.Unmarshal(line, &raw); err == nil && strings.TrimSpace(raw.Role) != "" { return raw, true } var event openClawEvent if err := json.Unmarshal(line, &event); err != nil { return providers.Message{}, false } if event.Type != "message" || event.Message == nil { return providers.Message{}, false } role := strings.TrimSpace(strings.ToLower(event.Message.Role)) if role == "toolresult" { role = "tool" } var content string for _, part := range event.Message.Content { if strings.TrimSpace(strings.ToLower(part.Type)) == "text" { if content != "" { content += "\n" } content += part.Text } } return providers.Message{Role: role, Content: content, ToolCallID: event.Message.ToolCallID, ToolCalls: event.Message.ToolCalls}, true } func deriveSessionID(key string) string { sum := sha1.Sum([]byte("clawgo-session:" + key)) h := hex.EncodeToString(sum[:]) return h[0:8] + "-" + h[8:12] + "-" + h[12:16] + "-" + h[16:20] + "-" + h[20:32] } func detectSessionKind(key string) string { k := strings.TrimSpace(strings.ToLower(key)) switch { case strings.HasPrefix(k, "cron:"): return "cron" case strings.HasPrefix(k, "subagent:") || strings.Contains(k, ":subagent:"): return "subagent" case strings.HasPrefix(k, "hook:"): return "hook" case strings.Contains(k, ":"): return "main" default: return "other" } } func (sm *SessionManager) writeOpenClawSessionsIndex() error { if sm.storage == "" { return nil } sm.mu.RLock() defer sm.mu.RUnlock() index := map[string]sessionsIndexEntry{} for key, s := range sm.sessions { s.mu.RLock() sid := strings.TrimSpace(s.SessionID) if sid == "" { sid = deriveSessionID(key) } entry := sessionsIndexEntry{ SessionID: sid, SessionKey: key, UpdatedAt: s.Updated.UnixMilli(), ChatType: mapKindToChatType(s.Kind), Kind: s.Kind, CompactionCount: s.CompactionCount, SessionFile: filepath.Join(sm.storage, activeSegmentFilename(key)), Summary: s.Summary, LastLanguage: s.LastLanguage, PreferredLanguage: s.PreferredLanguage, } s.mu.RUnlock() index[key] = entry } data, err := json.MarshalIndent(index, "", " ") if err != nil { return err } return os.WriteFile(filepath.Join(sm.storage, "sessions.json"), data, 0644) } func (sm *SessionManager) cleanupArchivedSessions() { if sm.storage == "" { return } days := 30 if v := strings.TrimSpace(os.Getenv("CLAWGO_SESSION_ARCHIVE_RETENTION_DAYS")); v != "" { if n, err := strconv.Atoi(v); err == nil && n >= 1 { days = n } } cutoff := time.Now().Add(-time.Duration(days) * 24 * time.Hour) entries, err := os.ReadDir(sm.storage) if err != nil { return } for _, e := range entries { if e.IsDir() { continue } name := e.Name() if !strings.Contains(name, ".jsonl.deleted.") { continue } full := filepath.Join(sm.storage, name) fi, err := os.Stat(full) if err != nil { continue } if fi.ModTime().Before(cutoff) { _ = os.Remove(full) } } } func mapKindToChatType(kind string) string { switch strings.ToLower(strings.TrimSpace(kind)) { case "main": return "direct" case "cron", "subagent", "hook": return "internal" default: return "unknown" } } func (sm *SessionManager) loadSessions() error { fallbackIndex := sm.readSessionsIndex() keys, err := sm.discoverSessionKeys() if err != nil { return err } for _, key := range keys { session, loadErr := sm.loadSessionFromDisk(key, fallbackIndex[key]) if loadErr != nil { return loadErr } if session == nil { continue } sm.sessions[key] = session } return sm.writeOpenClawSessionsIndex() } func (sm *SessionManager) readSessionsIndex() map[string]sessionsIndexEntry { if sm.storage == "" { return nil } data, err := os.ReadFile(filepath.Join(sm.storage, "sessions.json")) if err != nil { return nil } var index map[string]sessionsIndexEntry if err := json.Unmarshal(data, &index); err != nil { return nil } return index } func (sm *SessionManager) loadSessionFromDisk(key string, fallback sessionsIndexEntry) (*Session, error) { meta, err := sm.loadOrRebuildMeta(key) if err != nil { return nil, err } index, indexErr := sm.readIndexFile(sm.sessionIndexPath(key)) if indexErr != nil { index = nil } if meta == nil { if strings.TrimSpace(fallback.SessionKey) == "" && key == "" { return nil, nil } now := time.UnixMilli(fallback.UpdatedAt) if fallback.UpdatedAt <= 0 { now = time.Now() } return &Session{ Key: key, SessionID: firstNonEmpty(fallback.SessionID, deriveSessionID(key)), Kind: firstNonEmpty(fallback.Kind, detectSessionKind(key)), Summary: fallback.Summary, CompactionCount: fallback.CompactionCount, LastLanguage: fallback.LastLanguage, PreferredLanguage: fallback.PreferredLanguage, Created: now, Updated: now, nextSeq: 1, index: index, }, nil } workingMessages, err := sm.loadWorkingSet(meta.Segments) if err != nil { return nil, err } created := time.UnixMilli(meta.CreatedAt) if meta.CreatedAt <= 0 { created = time.Now() } updated := time.UnixMilli(meta.UpdatedAt) if meta.UpdatedAt <= 0 { updated = created } if strings.TrimSpace(meta.Summary) == "" && strings.TrimSpace(fallback.Summary) != "" { meta.Summary = fallback.Summary } if strings.TrimSpace(meta.LastLanguage) == "" && strings.TrimSpace(fallback.LastLanguage) != "" { meta.LastLanguage = fallback.LastLanguage } if strings.TrimSpace(meta.PreferredLanguage) == "" && strings.TrimSpace(fallback.PreferredLanguage) != "" { meta.PreferredLanguage = fallback.PreferredLanguage } return &Session{ Key: key, SessionID: firstNonEmpty(meta.SessionID, fallback.SessionID, deriveSessionID(key)), Kind: firstNonEmpty(meta.Kind, fallback.Kind, detectSessionKind(key)), Messages: workingMessages, Summary: firstNonEmpty(meta.Summary, fallback.Summary), CompactionCount: maxInt(meta.CompactionCount, fallback.CompactionCount), LastLanguage: firstNonEmpty(meta.LastLanguage, fallback.LastLanguage), PreferredLanguage: firstNonEmpty(meta.PreferredLanguage, fallback.PreferredLanguage), Created: created, Updated: updated, segments: append([]sessionSegmentMeta(nil), meta.Segments...), nextSeq: maxInt(meta.NextSeq, meta.MessageCount+1, 1), index: index, }, nil } func (sm *SessionManager) loadOrRebuildMeta(key string) (*sessionMetaFile, error) { metaPath := sm.sessionMetaPath(key) indexPath := sm.sessionIndexPath(key) var meta sessionMetaFile metaValid := false if err := jsonlog.ReadJSON(metaPath, &meta); err == nil && meta.Version > 0 { if sm.metaMatchesStorage(key, &meta) { metaValid = true } } if metaValid { if _, err := sm.readIndexFile(indexPath); err == nil { return &meta, nil } } rebuilt, err := sm.rebuildSidecars(key, &meta) if err != nil { return nil, err } return rebuilt, nil } func (sm *SessionManager) metaMatchesStorage(key string, meta *sessionMetaFile) bool { if meta == nil || len(meta.Segments) == 0 { return false } for _, segment := range meta.Segments { size, err := jsonlog.FileSize(filepath.Join(sm.storage, segment.Name)) if err != nil { return false } if strings.EqualFold(segment.Name, activeSegmentFilename(key)) && size != segment.LastOffset { return false } } return true } func (sm *SessionManager) rebuildSidecars(key string, seed *sessionMetaFile) (*sessionMetaFile, error) { segments, err := sm.discoverSessionSegments(key) if err != nil { return nil, err } if len(segments) == 0 { return nil, nil } meta := &sessionMetaFile{ Version: 1, SessionKey: key, SessionID: deriveSessionID(key), Kind: detectSessionKind(key), Summary: strings.TrimSpace(seedValue(seed, func(v *sessionMetaFile) string { return v.Summary })), CompactionCount: seedInt(seed, func(v *sessionMetaFile) int { return v.CompactionCount }), LastLanguage: seedValue(seed, func(v *sessionMetaFile) string { return v.LastLanguage }), PreferredLanguage: seedValue(seed, func(v *sessionMetaFile) string { return v.PreferredLanguage }), Segments: make([]sessionSegmentMeta, 0, len(segments)), } index := sessionIndexFile{ Version: 1, SessionKey: key, LastSeq: 0, LastOffset: 0, Segment: "", UpdatedAt: time.Now().UnixMilli(), Tokens: map[string][]sessionIndexRef{}, } now := time.Now().UnixMilli() seq := 0 for _, name := range segments { fullPath := filepath.Join(sm.storage, name) size, err := jsonlog.FileSize(fullPath) if err != nil { return nil, err } seg := sessionSegmentMeta{ Name: name, Archived: !strings.EqualFold(name, activeSegmentFilename(key)), LastOffset: size, UpdatedAt: now, } if st, err := os.Stat(fullPath); err == nil { seg.UpdatedAt = st.ModTime().UnixMilli() if meta.CreatedAt == 0 || st.ModTime().UnixMilli() < meta.CreatedAt { meta.CreatedAt = st.ModTime().UnixMilli() } if st.ModTime().UnixMilli() > meta.UpdatedAt { meta.UpdatedAt = st.ModTime().UnixMilli() } } if err := jsonlog.Scan(fullPath, func(line []byte) error { msg, ok := fromJSONLLine(line) if !ok { return nil } seq++ if seg.FirstSeq == 0 { seg.FirstSeq = seq } seg.LastSeq = seq seg.MessageCount++ meta.MessageCount++ appendTokens(index.Tokens, tokenizeIndexText(msg.Content), sessionIndexRef{ Seq: seq, Role: strings.ToLower(strings.TrimSpace(msg.Role)), Segment: name, Snippet: messageSnippet(msg.Content), }) return nil }); err != nil { return nil, err } meta.Segments = append(meta.Segments, seg) } if meta.CreatedAt == 0 { meta.CreatedAt = now } if meta.UpdatedAt == 0 { meta.UpdatedAt = now } meta.NextSeq = seq + 1 if len(meta.Segments) > 0 { last := meta.Segments[len(meta.Segments)-1] index.LastSeq = last.LastSeq index.LastOffset = last.LastOffset index.Segment = last.Name } if err := sm.writeSidecarFiles(key, meta, &index); err != nil { return nil, err } return meta, nil } func (sm *SessionManager) loadWorkingSet(segments []sessionSegmentMeta) ([]providers.Message, error) { if len(segments) == 0 { return nil, nil } start := 0 if sm.promptLoadSegments > 0 && len(segments) > sm.promptLoadSegments { start = len(segments) - sm.promptLoadSegments } return sm.loadMessagesForSegments(segments[start:]) } func (sm *SessionManager) loadMessagesForSegments(segments []sessionSegmentMeta) ([]providers.Message, error) { out := make([]providers.Message, 0) for _, segment := range segments { path := filepath.Join(sm.storage, segment.Name) if err := jsonlog.Scan(path, func(line []byte) error { msg, ok := fromJSONLLine(line) if ok { out = append(out, msg) } return nil }); err != nil { if os.IsNotExist(err) { continue } return nil, err } } return out, nil } func (sm *SessionManager) GetHistoryWindow(key string, around, before, after, limit int) []providers.Message { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() if !ok { return nil } session.mu.RLock() segments := append([]sessionSegmentMeta(nil), session.segments...) session.mu.RUnlock() if len(segments) == 0 { return nil } startSeq, endSeq := computeHistorySeqWindow(segments, around, before, after, limit) if endSeq < startSeq { return nil } selected := make([]sessionSegmentMeta, 0, len(segments)) for _, segment := range segments { if segment.LastSeq < startSeq || segment.FirstSeq > endSeq { continue } selected = append(selected, segment) } if len(selected) == 0 { return nil } all, err := sm.loadMessagesForSegments(selected) if err != nil { return nil } out := make([]providers.Message, 0, len(all)) seq := 0 for _, segment := range selected { for i := 0; i < segment.MessageCount && seq < len(all); i++ { currentSeq := segment.FirstSeq + i msg := all[seq] seq++ if currentSeq < startSeq || currentSeq > endSeq { continue } out = append(out, msg) } } return out } func (sm *SessionManager) appendMessageLocked(session *Session, msg providers.Message) (appendMessageResult, error) { if sm.storage == "" { return appendMessageResult{}, nil } if err := sm.ensureActiveSegmentLocked(session); err != nil { return appendMessageResult{}, err } result := appendMessageResult{} if sm.shouldRolloverLocked(session) { if err := sm.rolloverLocked(session); err != nil { return appendMessageResult{}, err } result.refreshSessionsIndex = true if err := sm.ensureActiveSegmentLocked(session); err != nil { return appendMessageResult{}, err } } active := sm.activeSegmentLocked(session) if active == nil { return appendMessageResult{}, fmt.Errorf("active session segment unavailable") } offset, err := jsonlog.AppendLine(filepath.Join(sm.storage, active.Name), toOpenClawMessageEvent(msg)) if err != nil { return appendMessageResult{}, err } if active.FirstSeq == 0 { active.FirstSeq = session.nextSeq } active.LastSeq = session.nextSeq active.MessageCount++ active.LastOffset = offset active.UpdatedAt = time.Now().UnixMilli() sm.appendIndexLocked(session, msg, active.Name, active.LastOffset) session.nextSeq++ return result, sm.persistSidecarsLocked(session) } func (sm *SessionManager) ensureActiveSegmentLocked(session *Session) error { if session == nil { return nil } for i := range session.segments { if strings.EqualFold(session.segments[i].Name, activeSegmentFilename(session.Key)) { return nil } } session.segments = append(session.segments, sessionSegmentMeta{ Name: activeSegmentFilename(session.Key), Archived: false, LastOffset: 0, UpdatedAt: time.Now().UnixMilli(), }) return nil } func (sm *SessionManager) activeSegmentLocked(session *Session) *sessionSegmentMeta { if session == nil { return nil } for i := range session.segments { if strings.EqualFold(session.segments[i].Name, activeSegmentFilename(session.Key)) { return &session.segments[i] } } return nil } func (sm *SessionManager) shouldRolloverLocked(session *Session) bool { active := sm.activeSegmentLocked(session) if active == nil { return false } if sm.segmentMaxMessages > 0 && active.MessageCount >= sm.segmentMaxMessages { return true } if sm.segmentMaxBytes > 0 && active.LastOffset >= sm.segmentMaxBytes { return true } return false } func (sm *SessionManager) rolloverLocked(session *Session) error { active := sm.activeSegmentLocked(session) if active == nil || active.MessageCount == 0 { return nil } nextSeq := 1 for _, segment := range session.segments { if seq, ok := parseArchiveSegmentFilename(segment.Name, session.Key); ok && seq >= nextSeq { nextSeq = seq + 1 } } archiveName := archivedSegmentFilename(session.Key, nextSeq) if err := os.Rename(filepath.Join(sm.storage, active.Name), filepath.Join(sm.storage, archiveName)); err != nil { return err } active.Name = archiveName active.Archived = true if err := sm.ensureActiveSegmentLocked(session); err != nil { return err } newActive := sm.activeSegmentLocked(session) if newActive != nil { newActive.Archived = false newActive.FirstSeq = 0 newActive.LastSeq = 0 newActive.MessageCount = 0 newActive.LastOffset = 0 newActive.UpdatedAt = time.Now().UnixMilli() } rebuilt, err := sm.rebuildSidecars(session.Key, &sessionMetaFile{ Summary: session.Summary, CompactionCount: session.CompactionCount, LastLanguage: session.LastLanguage, PreferredLanguage: session.PreferredLanguage, }) if err != nil { return err } if rebuilt != nil { session.segments = append([]sessionSegmentMeta(nil), rebuilt.Segments...) session.nextSeq = maxInt(rebuilt.NextSeq, session.nextSeq) } return nil } func (sm *SessionManager) persistSidecarsLocked(session *Session) error { if sm.storage == "" || session == nil { return nil } meta := sessionMetaFile{ Version: 1, SessionKey: session.Key, SessionID: firstNonEmpty(session.SessionID, deriveSessionID(session.Key)), Kind: firstNonEmpty(session.Kind, detectSessionKind(session.Key)), Summary: strings.TrimSpace(session.Summary), CompactionCount: session.CompactionCount, LastLanguage: strings.TrimSpace(session.LastLanguage), PreferredLanguage: strings.TrimSpace(session.PreferredLanguage), MessageCount: sessionMessageCount(session), CreatedAt: session.Created.UnixMilli(), UpdatedAt: session.Updated.UnixMilli(), NextSeq: maxInt(session.nextSeq, sessionMessageCount(session)+1), Segments: append([]sessionSegmentMeta(nil), session.segments...), } if session.index == nil { index, err := sm.buildIndexForSessionLocked(session, meta.NextSeq-1) if err != nil { return err } session.index = index } return sm.writeSidecarFiles(session.Key, &meta, session.index) } func (sm *SessionManager) buildIndexForSessionLocked(session *Session, seqEnd int) (*sessionIndexFile, error) { messages, err := sm.loadMessagesForSegments(session.segments) if err != nil { return nil, err } index := &sessionIndexFile{ Version: 1, SessionKey: session.Key, LastSeq: 0, LastOffset: 0, Segment: "", UpdatedAt: time.Now().UnixMilli(), Tokens: map[string][]sessionIndexRef{}, } for i, msg := range messages { ref := sessionIndexRef{ Seq: i + 1, Role: strings.ToLower(strings.TrimSpace(msg.Role)), Segment: segmentNameForSeq(session.segments, i+1), Snippet: messageSnippet(msg.Content), } appendTokens(index.Tokens, tokenizeIndexText(msg.Content), ref) } index.LastSeq = seqEnd if active := sm.activeSegmentLocked(session); active != nil { index.LastOffset = active.LastOffset index.Segment = active.Name } return index, nil } func segmentNameForSeq(segments []sessionSegmentMeta, seq int) string { for _, segment := range segments { if seq >= segment.FirstSeq && seq <= segment.LastSeq { return segment.Name } } return "" } func sessionMessageCount(session *Session) int { count := 0 for _, segment := range session.segments { count += segment.MessageCount } if count == 0 && len(session.Messages) > 0 { count = len(session.Messages) } return count } func (sm *SessionManager) writeSidecarFiles(key string, meta *sessionMetaFile, index *sessionIndexFile) error { if err := jsonlog.WriteJSON(sm.sessionMetaPath(key), meta); err != nil { return err } return jsonlog.WriteJSON(sm.sessionIndexPath(key), index) } func (sm *SessionManager) readIndexFile(path string) (*sessionIndexFile, error) { var index sessionIndexFile if err := jsonlog.ReadJSON(path, &index); err != nil { return nil, err } if index.Version <= 0 { return nil, fmt.Errorf("invalid index version") } return &index, nil } func (sm *SessionManager) searchSessionByScan(key, kind, summary string, updated time.Time, terms []string) (SessionSearchResult, bool) { session := SessionSearchResult{ Key: key, Kind: kind, Summary: summary, UpdatedAt: updated, } all, err := sm.loadMessagesForSegments(sm.sessionSegments(key)) if err != nil { return SessionSearchResult{}, false } type scored struct { score int snippet SessionSearchSnippet } matches := make([]scored, 0) for idx, msg := range all { score := messageMatchScore(msg, terms) if score == 0 { continue } matches = append(matches, scored{ score: score, snippet: SessionSearchSnippet{ Seq: idx + 1, Role: strings.ToLower(strings.TrimSpace(msg.Role)), Content: messageSnippet(msg.Content), }, }) } if len(matches) == 0 { return SessionSearchResult{}, false } sort.Slice(matches, func(i, j int) bool { if matches[i].score != matches[j].score { return matches[i].score > matches[j].score } return matches[i].snippet.Seq > matches[j].snippet.Seq }) for i, item := range matches { if i >= maxSearchSnippetsPerSession { break } session.Score += item.score session.Snippets = append(session.Snippets, item.snippet) } return session, true } func (sm *SessionManager) sessionSegments(key string) []sessionSegmentMeta { sm.mu.RLock() session := sm.sessions[key] sm.mu.RUnlock() if session == nil { return nil } session.mu.RLock() defer session.mu.RUnlock() return append([]sessionSegmentMeta(nil), session.segments...) } func searchSessionIndex(key, kind, summary string, updated time.Time, terms []string, index *sessionIndexFile) (SessionSearchResult, bool) { type aggregate struct { score int ref sessionIndexRef } hits := map[int]*aggregate{} for _, term := range terms { for _, ref := range index.Tokens[term] { item := hits[ref.Seq] if item == nil { item = &aggregate{ref: ref} hits[ref.Seq] = item } item.score++ } } if len(hits) == 0 { return SessionSearchResult{}, false } aggs := make([]aggregate, 0, len(hits)) for _, item := range hits { aggs = append(aggs, *item) } sort.Slice(aggs, func(i, j int) bool { if aggs[i].score != aggs[j].score { return aggs[i].score > aggs[j].score } return aggs[i].ref.Seq > aggs[j].ref.Seq }) result := SessionSearchResult{ Key: key, Kind: kind, Summary: summary, UpdatedAt: updated, } for i, agg := range aggs { if i >= maxSearchSnippetsPerSession { break } result.Score += agg.score result.Snippets = append(result.Snippets, SessionSearchSnippet{ Seq: agg.ref.Seq, Role: agg.ref.Role, Segment: agg.ref.Segment, Content: agg.ref.Snippet, }) } return result, true } func appendTokens(dst map[string][]sessionIndexRef, tokens []string, ref sessionIndexRef) { for _, token := range tokens { items := dst[token] if len(items) >= maxTokenRefsPerSessionToken { continue } items = append(items, ref) dst[token] = items } } func (sm *SessionManager) appendIndexLocked(session *Session, msg providers.Message, segment string, offset int64) { if session == nil { return } if session.index == nil { session.index = &sessionIndexFile{ Version: 1, SessionKey: session.Key, Tokens: map[string][]sessionIndexRef{}, } } if session.index.Tokens == nil { session.index.Tokens = map[string][]sessionIndexRef{} } ref := sessionIndexRef{ Seq: session.nextSeq, Role: strings.ToLower(strings.TrimSpace(msg.Role)), Segment: segment, Snippet: messageSnippet(msg.Content), } appendTokens(session.index.Tokens, tokenizeIndexText(msg.Content), ref) session.index.LastSeq = session.nextSeq session.index.LastOffset = offset session.index.Segment = segment session.index.UpdatedAt = time.Now().UnixMilli() } func messageMatchScore(msg providers.Message, terms []string) int { if len(terms) == 0 { return 0 } content := strings.ToLower(msg.Content) score := 0 for _, term := range terms { if strings.Contains(content, term) { score++ } } return score } func messageSnippet(content string) string { content = strings.TrimSpace(strings.ReplaceAll(content, "\n", " ")) if len(content) <= 180 { return content } return content[:180] + "..." } func tokenizeIndexText(text string) []string { return tokenizeSearchText(text, false) } func tokenizeQueryText(text string) []string { return tokenizeSearchText(text, true) } func tokenizeSearchText(text string, includeSingleHan bool) []string { text = strings.ToLower(strings.TrimSpace(text)) if text == "" { return nil } out := make([]string, 0, 16) seen := map[string]struct{}{} var asciiBuf strings.Builder flushASCII := func() { if asciiBuf.Len() == 0 { return } fields := strings.FieldsFunc(asciiBuf.String(), func(r rune) bool { return !unicode.IsLetter(r) && !unicode.IsNumber(r) }) for _, field := range fields { field = strings.TrimSpace(field) if len(field) < 2 { continue } if _, ok := seen[field]; ok { continue } seen[field] = struct{}{} out = append(out, field) } asciiBuf.Reset() } var hanRunes []rune flushHan := func() { if len(hanRunes) == 0 { return } if includeSingleHan && len(hanRunes) == 1 { token := string(hanRunes[0]) if _, ok := seen[token]; !ok { seen[token] = struct{}{} out = append(out, token) } } if len(hanRunes) >= 2 { for i := 0; i < len(hanRunes)-1; i++ { token := string(hanRunes[i : i+2]) if _, ok := seen[token]; ok { continue } seen[token] = struct{}{} out = append(out, token) } } hanRunes = hanRunes[:0] } for _, r := range text { switch { case unicode.Is(unicode.Han, r): flushASCII() hanRunes = append(hanRunes, r) case unicode.IsLetter(r) || unicode.IsNumber(r): flushHan() asciiBuf.WriteRune(r) default: flushASCII() flushHan() } } flushASCII() flushHan() return out } func (sm *SessionManager) discoverSessionKeys() ([]string, error) { if sm.storage == "" { return nil, nil } entries, err := os.ReadDir(sm.storage) if err != nil { return nil, err } keys := map[string]struct{}{} for _, entry := range entries { if entry.IsDir() { continue } if key, ok := sessionKeyFromFilename(entry.Name()); ok { keys[key] = struct{}{} } } out := make([]string, 0, len(keys)) for key := range keys { out = append(out, key) } sort.Strings(out) return out, nil } func (sm *SessionManager) discoverSessionSegments(key string) ([]string, error) { entries, err := os.ReadDir(sm.storage) if err != nil { return nil, err } names := make([]string, 0) for _, entry := range entries { if entry.IsDir() { continue } name := entry.Name() switch { case name == activeSegmentFilename(key): names = append(names, name) case name == legacySegmentFilename(key): names = append(names, name) default: if _, ok := parseArchiveSegmentFilename(name, key); ok { names = append(names, name) } } } sort.Slice(names, func(i, j int) bool { return compareSegmentNames(key, names[i], names[j]) }) return names, nil } func compareSegmentNames(key, left, right string) bool { if left == right { return false } if left == legacySegmentFilename(key) { return true } if right == legacySegmentFilename(key) { return false } if left == activeSegmentFilename(key) { return false } if right == activeSegmentFilename(key) { return true } li, _ := parseArchiveSegmentFilename(left, key) ri, _ := parseArchiveSegmentFilename(right, key) return li < ri } func sessionKeyFromFilename(name string) (string, bool) { switch { case strings.HasSuffix(name, ".meta.json"): return strings.TrimSuffix(name, ".meta.json"), true case strings.HasSuffix(name, ".index.json"): return strings.TrimSuffix(name, ".index.json"), true case strings.HasSuffix(name, ".active.jsonl"): return strings.TrimSuffix(name, ".active.jsonl"), true case strings.HasSuffix(name, ".jsonl") && !strings.Contains(name, ".deleted."): if m := archiveSegmentRe.FindStringSubmatch(name); len(m) == 3 { return m[1], true } return strings.TrimSuffix(name, ".jsonl"), true default: return "", false } } func parseArchiveSegmentFilename(name, key string) (int, bool) { m := archiveSegmentRe.FindStringSubmatch(name) if len(m) != 3 || m[1] != key { return 0, false } n, err := strconv.Atoi(m[2]) if err != nil { return 0, false } return n, true } func activeSegmentFilename(key string) string { return key + ".active.jsonl" } func legacySegmentFilename(key string) string { return key + ".jsonl" } func archivedSegmentFilename(key string, seq int) string { return fmt.Sprintf("%s.%04d.jsonl", key, seq) } func (sm *SessionManager) sessionMetaPath(key string) string { return filepath.Join(sm.storage, key+".meta.json") } func (sm *SessionManager) sessionIndexPath(key string) string { return filepath.Join(sm.storage, key+".index.json") } func readPositiveIntEnv(name string, fallback int) int { if value := strings.TrimSpace(os.Getenv(name)); value != "" { if n, err := strconv.Atoi(value); err == nil && n > 0 { return n } } return fallback } func firstNonEmpty(values ...string) string { for _, value := range values { if strings.TrimSpace(value) != "" { return value } } return "" } func seedValue(seed *sessionMetaFile, get func(*sessionMetaFile) string) string { if seed == nil { return "" } return get(seed) } func seedInt(seed *sessionMetaFile, get func(*sessionMetaFile) int) int { if seed == nil { return 0 } return get(seed) } func maxInt(values ...int) int { best := 0 for _, value := range values { if value > best { best = value } } return best } func cloneSessionIndex(index *sessionIndexFile) *sessionIndexFile { if index == nil { return nil } out := &sessionIndexFile{ Version: index.Version, SessionKey: index.SessionKey, LastSeq: index.LastSeq, LastOffset: index.LastOffset, Segment: index.Segment, UpdatedAt: index.UpdatedAt, Tokens: make(map[string][]sessionIndexRef, len(index.Tokens)), } for key, refs := range index.Tokens { out.Tokens[key] = append([]sessionIndexRef(nil), refs...) } return out } func computeHistorySeqWindow(segments []sessionSegmentMeta, around, before, after, limit int) (int, int) { total := 0 for _, segment := range segments { if segment.LastSeq > total { total = segment.LastSeq } } if total <= 0 { return 1, 0 } if limit <= 0 { limit = 50 } start := 1 end := total if around > 0 { half := limit / 2 if half < 1 { half = 1 } start = around - half end = around + half if start < 1 { start = 1 } if end > total { end = total } } else { if after > 0 { start = after + 1 } if before > 0 { end = before - 1 } } if start < 1 { start = 1 } if end > total { end = total } if end < start { return 1, 0 } if end-start+1 > limit { start = end - limit + 1 if start < 1 { start = 1 } } return start, end }