diff --git a/pkg/session/manager.go b/pkg/session/manager.go index 68af46c..b95756d 100644 --- a/pkg/session/manager.go +++ b/pkg/session/manager.go @@ -7,36 +7,124 @@ import ( "os" "path/filepath" "sort" + "strconv" "strings" "sync" "time" "clawgo/pkg/logger" "clawgo/pkg/providers" + + "github.com/google/uuid" +) + +const ( + sessionsIndexFile = "sessions.json" + openClawVersion = 3 ) type Session struct { - Key string `json:"key"` - Messages []providers.Message `json:"messages"` - Summary string `json:"summary,omitempty"` - TokenIn int `json:"token_in,omitempty"` - TokenOut int `json:"token_out,omitempty"` - TokenSum int `json:"token_sum,omitempty"` - Created time.Time `json:"created"` - Updated time.Time `json:"updated"` - mu sync.RWMutex + Key string `json:"key"` + SessionID string `json:"session_id,omitempty"` + Messages []providers.Message `json:"messages"` + Summary string `json:"summary,omitempty"` + TokenIn int `json:"token_in,omitempty"` + TokenOut int `json:"token_out,omitempty"` + TokenSum int `json:"token_sum,omitempty"` + Created time.Time `json:"created"` + Updated time.Time `json:"updated"` + mu sync.RWMutex +} + +type SessionMeta struct { + SessionID string `json:"sessionId"` + SessionFile string `json:"sessionFile"` + UpdatedAt int64 `json:"updatedAt"` + Extra map[string]interface{} `json:"-"` +} + +func (m *SessionMeta) UnmarshalJSON(data []byte) error { + var raw map[string]json.RawMessage + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + m.Extra = make(map[string]interface{}) + for k, v := range raw { + switch k { + case "sessionId": + _ = json.Unmarshal(v, &m.SessionID) + case "sessionFile": + _ = json.Unmarshal(v, &m.SessionFile) + case "updatedAt": + if err := unmarshalUpdatedAt(v, &m.UpdatedAt); err != nil { + return err + } + default: + var anyVal interface{} + if err := json.Unmarshal(v, &anyVal); err == nil { + m.Extra[k] = anyVal + } + } + } + return nil +} + +func (m SessionMeta) MarshalJSON() ([]byte, error) { + obj := make(map[string]interface{}) + for k, v := range m.Extra { + obj[k] = v + } + obj["sessionId"] = m.SessionID + obj["sessionFile"] = m.SessionFile + obj["updatedAt"] = m.UpdatedAt + return json.Marshal(obj) +} + +type openClawContentPart struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + Thinking string `json:"thinking,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Arguments map[string]interface{} `json:"arguments,omitempty"` + ToolCallID string `json:"toolCallId,omitempty"` + ToolName string `json:"toolName,omitempty"` + IsError bool `json:"isError,omitempty"` +} + +type openClawMessage struct { + Role string `json:"role"` + Content []openClawContentPart `json:"content,omitempty"` + Timestamp int64 `json:"timestamp,omitempty"` +} + +type sessionEvent struct { + Type string `json:"type"` + Version int `json:"version,omitempty"` + ID string `json:"id,omitempty"` + ParentID *string `json:"parentId,omitempty"` + Timestamp string `json:"timestamp"` + Cwd string `json:"cwd,omitempty"` + Message *openClawMessage `json:"message,omitempty"` } type SessionManager struct { - sessions map[string]*Session - mu sync.RWMutex - storage string + sessions map[string]*Session + sessionIndex map[string]SessionMeta + sessionKeyByID map[string]string + lastEventBySessionID map[string]string + mu sync.RWMutex + storage string } func NewSessionManager(storage string) *SessionManager { sm := &SessionManager{ - sessions: make(map[string]*Session), - storage: storage, + sessions: make(map[string]*Session), + sessionIndex: make(map[string]SessionMeta), + sessionKeyByID: make(map[string]string), + lastEventBySessionID: make(map[string]string), + storage: storage, } if storage != "" { @@ -56,35 +144,38 @@ func (sm *SessionManager) GetOrCreate(key string) *Session { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() - if ok { return session } + now := time.Now().UTC() + meta, err := sm.ensureSessionMeta(key, now) + if err != nil { + logger.WarnCF("session", "Failed to ensure session meta", map[string]interface{}{ + "session_key": key, + logger.FieldError: err.Error(), + }) + } + sm.mu.Lock() defer sm.mu.Unlock() - - // Re-check existence after acquiring Write lock if session, ok = sm.sessions[key]; ok { return session } session = &Session{ - Key: key, - Messages: []providers.Message{}, - Created: time.Now(), - Updated: time.Now(), + Key: key, + SessionID: meta.SessionID, + Messages: []providers.Message{}, + Created: now, + Updated: now, } sm.sessions[key] = session - return session } func (sm *SessionManager) AddMessage(sessionKey, role, content string) { - sm.AddMessageFull(sessionKey, providers.Message{ - Role: role, - Content: content, - }) + sm.AddMessageFull(sessionKey, providers.Message{Role: role, Content: content}) } func (sm *SessionManager) AddMessageFull(sessionKey string, msg providers.Message) { @@ -92,10 +183,9 @@ func (sm *SessionManager) AddMessageFull(sessionKey string, msg providers.Messag session.mu.Lock() session.Messages = append(session.Messages, msg) - session.Updated = time.Now() + session.Updated = time.Now().UTC() session.mu.Unlock() - // Persist immediately (append-only) if err := sm.appendMessage(sessionKey, msg); err != nil { logger.ErrorCF("session", "Failed to persist session message", map[string]interface{}{ "session_key": sessionKey, @@ -109,20 +199,17 @@ func (sm *SessionManager) appendMessage(sessionKey string, msg providers.Message return nil } - sessionPath := filepath.Join(sm.storage, sessionKey+".jsonl") - f, err := os.OpenFile(sessionPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + now := time.Now().UTC() + meta, err := sm.ensureSessionMeta(sessionKey, now) if err != nil { return err } - defer f.Close() - - data, err := json.Marshal(msg) - if err != nil { + if err := sm.appendSessionHistory(meta.SessionID, msg); err != nil { return err } - _, err = f.Write(append(data, '\n')) - return err + meta.UpdatedAt = toUnixMs(now) + return sm.saveSessionMeta(sessionKey, meta) } func (sm *SessionManager) rewriteHistory(sessionKey string, messages []providers.Message) error { @@ -130,42 +217,70 @@ func (sm *SessionManager) rewriteHistory(sessionKey string, messages []providers return nil } - sessionPath := filepath.Join(sm.storage, sessionKey+".jsonl") + meta, ok := sm.getSession(sessionKey) + if !ok { + now := time.Now().UTC() + var err error + meta, err = sm.ensureSessionMeta(sessionKey, now) + if err != nil { + return err + } + } + + sessionPath := sm.resolveSessionFile(meta) tmpPath := sessionPath + ".tmp" + if err := sm.ensureSessionHeader(meta.SessionID, sessionPath); err != nil { + return err + } f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) if err != nil { return err } - enc := json.NewEncoder(f) + header := sm.buildSessionHeaderEvent(meta.SessionID, time.Now().UTC()) + if err := writeEventLine(f, header); err != nil { + _ = f.Close() + _ = os.Remove(tmpPath) + return err + } + lastID := header.ID for _, msg := range messages { - if err := enc.Encode(msg); err != nil { + ev := sm.buildMessageEvent(msg, time.Now().UTC(), lastID) + if err := writeEventLine(f, ev); err != nil { _ = f.Close() _ = os.Remove(tmpPath) return err } + lastID = ev.ID } if err := f.Close(); err != nil { _ = os.Remove(tmpPath) return err } - return os.Rename(tmpPath, sessionPath) + if err := os.Rename(tmpPath, sessionPath); err != nil { + return err + } + + sm.mu.Lock() + sm.lastEventBySessionID[meta.SessionID] = lastID + sm.mu.Unlock() + + meta.UpdatedAt = toUnixMs(time.Now().UTC()) + return sm.saveSessionMeta(sessionKey, meta) } func (sm *SessionManager) GetHistory(key string) []providers.Message { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() - if !ok { return []providers.Message{} } session.mu.RLock() defer session.mu.RUnlock() - history := make([]providers.Message, len(session.Messages)) copy(history, session.Messages) return history @@ -175,14 +290,12 @@ func (sm *SessionManager) GetSummary(key string) string { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() - if !ok { return "" } session.mu.RLock() defer session.mu.RUnlock() - return session.Summary } @@ -190,34 +303,31 @@ func (sm *SessionManager) SetSummary(key string, summary string) { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() - - if ok { - session.mu.Lock() - defer session.mu.Unlock() - - session.Summary = summary - session.Updated = time.Now() + if !ok { + return } + + session.mu.Lock() + session.Summary = summary + session.Updated = time.Now().UTC() + session.mu.Unlock() } func (sm *SessionManager) TruncateHistory(key string, keepLast int) { sm.mu.RLock() session, ok := sm.sessions[key] sm.mu.RUnlock() - if !ok { return } session.mu.Lock() defer session.mu.Unlock() - if len(session.Messages) <= keepLast { return } - session.Messages = session.Messages[len(session.Messages)-keepLast:] - session.Updated = time.Now() + session.Updated = time.Now().UTC() } func (sm *SessionManager) MessageCount(key string) int { @@ -227,7 +337,6 @@ func (sm *SessionManager) MessageCount(key string) int { if !ok { return 0 } - session.mu.RLock() defer session.mu.RUnlock() return len(session.Messages) @@ -236,7 +345,6 @@ func (sm *SessionManager) MessageCount(key string) int { func (sm *SessionManager) ListSessionKeys() []string { sm.mu.RLock() defer sm.mu.RUnlock() - keys := make([]string, 0, len(sm.sessions)) for k := range sm.sessions { k = strings.TrimSpace(k) @@ -256,7 +364,6 @@ func (sm *SessionManager) CompactHistory(key, summary string, keepLast int) (int if !ok { return 0, 0, fmt.Errorf("session not found: %s", key) } - if keepLast < 0 { keepLast = 0 } @@ -267,7 +374,7 @@ func (sm *SessionManager) CompactHistory(key, summary string, keepLast int) (int session.Messages = session.Messages[before-keepLast:] } session.Summary = summary - session.Updated = time.Now() + session.Updated = time.Now().UTC() after := len(session.Messages) msgs := make([]providers.Message, after) copy(msgs, session.Messages) @@ -280,35 +387,23 @@ func (sm *SessionManager) CompactHistory(key, summary string, keepLast int) (int } func (sm *SessionManager) Save(session *Session) error { - // Messages are now persisted incrementally via AddMessageFull. - // Keep Save for summary and other metadata updates. - if sm.storage == "" { + if sm.storage == "" || session == nil { return nil } session.mu.RLock() - summary := session.Summary updated := session.Updated - created := session.Created - tokenIn := session.TokenIn - tokenOut := session.TokenOut - tokenSum := session.TokenSum session.mu.RUnlock() - - metaPath := filepath.Join(sm.storage, session.Key+".meta") - meta := map[string]interface{}{ - "summary": summary, - "updated": updated, - "created": created, - "token_in": tokenIn, - "token_out": tokenOut, - "token_sum": tokenSum, + if updated.IsZero() { + updated = time.Now().UTC() } - data, err := json.MarshalIndent(meta, "", " ") + + meta, err := sm.ensureSessionMeta(session.Key, updated) if err != nil { return err } - return os.WriteFile(metaPath, data, 0644) + meta.UpdatedAt = toUnixMs(updated) + return sm.saveSessionMeta(session.Key, meta) } func (sm *SessionManager) AddTokenUsage(sessionKey string, in, out, sum int) { @@ -321,7 +416,7 @@ func (sm *SessionManager) AddTokenUsage(sessionKey string, in, out, sum int) { session.TokenIn += in session.TokenOut += out session.TokenSum += sum - session.Updated = time.Now() + session.Updated = time.Now().UTC() session.mu.Unlock() if sm.storage != "" { @@ -334,70 +429,556 @@ func (sm *SessionManager) AddTokenUsage(sessionKey string, in, out, sum int) { } } -func (sm *SessionManager) loadSessions() error { - files, err := os.ReadDir(sm.storage) +func (sm *SessionManager) GetSession(sessionKey string) (SessionMeta, bool) { + return sm.getSession(sessionKey) +} + +func (sm *SessionManager) SaveSessionMeta(sessionKey string, sessionMeta SessionMeta) error { + return sm.saveSessionMeta(sessionKey, sessionMeta) +} + +func (sm *SessionManager) AppendSessionHistory(sessionID string, event providers.Message) error { + return sm.appendSessionHistory(sessionID, event) +} + +func (sm *SessionManager) LoadSessionHistory(sessionID string, limit int) ([]providers.Message, error) { + messages, _, err := sm.loadSessionHistoryWithState(sessionID, limit) + return messages, err +} + +func (sm *SessionManager) getSession(sessionKey string) (SessionMeta, bool) { + sm.mu.RLock() + defer sm.mu.RUnlock() + meta, ok := sm.sessionIndex[sessionKey] + return meta, ok +} + +func (sm *SessionManager) saveSessionMeta(sessionKey string, sessionMeta SessionMeta) error { + if sm.storage == "" { + return nil + } + + sessionKey = strings.TrimSpace(sessionKey) + if sessionKey == "" { + return fmt.Errorf("sessionKey is required") + } + sessionMeta.SessionID = strings.TrimSpace(sessionMeta.SessionID) + if sessionMeta.SessionID == "" { + sessionMeta.SessionID = uuid.NewString() + } + if strings.TrimSpace(sessionMeta.SessionFile) == "" { + sessionMeta.SessionFile = filepath.Join(sm.storage, sessionMeta.SessionID+".jsonl") + } else if !filepath.IsAbs(sessionMeta.SessionFile) { + sessionMeta.SessionFile = filepath.Join(sm.storage, sessionMeta.SessionFile) + } + if sessionMeta.UpdatedAt == 0 { + sessionMeta.UpdatedAt = toUnixMs(time.Now().UTC()) + } + if sessionMeta.Extra == nil { + sessionMeta.Extra = make(map[string]interface{}) + } + + sm.mu.Lock() + sm.sessionIndex[sessionKey] = sessionMeta + sm.sessionKeyByID[sessionMeta.SessionID] = sessionKey + if sess, ok := sm.sessions[sessionKey]; ok { + sess.SessionID = sessionMeta.SessionID + } + err := sm.writeSessionsIndexLocked() + sm.mu.Unlock() if err != nil { return err } - for _, file := range files { - if file.IsDir() { + return sm.ensureSessionHeader(sessionMeta.SessionID, sessionMeta.SessionFile) +} + +func (sm *SessionManager) appendSessionHistory(sessionID string, event providers.Message) error { + if sm.storage == "" { + return nil + } + sessionID = strings.TrimSpace(sessionID) + if sessionID == "" { + return fmt.Errorf("sessionID is required") + } + + sessionPath := filepath.Join(sm.storage, sessionID+".jsonl") + sm.mu.RLock() + if key, ok := sm.sessionKeyByID[sessionID]; ok { + if meta, ok := sm.sessionIndex[key]; ok { + sessionPath = sm.resolveSessionFile(meta) + } + } + parent := sm.lastEventBySessionID[sessionID] + sm.mu.RUnlock() + + if err := sm.ensureSessionHeader(sessionID, sessionPath); err != nil { + return err + } + + f, err := os.OpenFile(sessionPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + + ev := sm.buildMessageEvent(event, time.Now().UTC(), parent) + if err := writeEventLine(f, ev); err != nil { + return err + } + + sm.mu.Lock() + sm.lastEventBySessionID[sessionID] = ev.ID + sm.mu.Unlock() + return nil +} + +func (sm *SessionManager) loadSessionHistory(sessionID string, limit int) ([]providers.Message, error) { + messages, _, err := sm.loadSessionHistoryWithState(sessionID, limit) + return messages, err +} + +func (sm *SessionManager) loadSessionHistoryWithState(sessionID string, limit int) ([]providers.Message, string, error) { + if sm.storage == "" || strings.TrimSpace(sessionID) == "" { + return []providers.Message{}, "", nil + } + + sessionPath := filepath.Join(sm.storage, sessionID+".jsonl") + sm.mu.RLock() + if key, ok := sm.sessionKeyByID[sessionID]; ok { + if meta, ok := sm.sessionIndex[key]; ok { + sessionPath = sm.resolveSessionFile(meta) + } + } + sm.mu.RUnlock() + + f, err := os.Open(sessionPath) + if err != nil { + if os.IsNotExist(err) { + return []providers.Message{}, "", nil + } + return nil, "", err + } + defer f.Close() + + messages := make([]providers.Message, 0) + var lastEventID string + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { continue } - - // Load JSONL history messages - if filepath.Ext(file.Name()) == ".jsonl" { - sessionKey := strings.TrimSuffix(file.Name(), ".jsonl") - session := sm.GetOrCreate(sessionKey) - - f, err := os.Open(filepath.Join(sm.storage, file.Name())) - if err != nil { - continue - } - scanner := bufio.NewScanner(f) - session.mu.Lock() - for scanner.Scan() { - var msg providers.Message - if err := json.Unmarshal(scanner.Bytes(), &msg); err == nil { - session.Messages = append(session.Messages, msg) - } - } - session.mu.Unlock() - if err := scanner.Err(); err != nil { - logger.WarnCF("session", "Error while scanning session history", map[string]interface{}{ - "file": file.Name(), - logger.FieldError: err.Error(), - }) - } - _ = f.Close() + var ev sessionEvent + if err := json.Unmarshal([]byte(line), &ev); err != nil { + logger.WarnCF("session", "Skip malformed session event", map[string]interface{}{ + "session_id": sessionID, + logger.FieldError: err.Error(), + }) + continue } - - // Load metadata - if filepath.Ext(file.Name()) == ".meta" { - sessionKey := strings.TrimSuffix(file.Name(), ".meta") - session := sm.GetOrCreate(sessionKey) - - data, err := os.ReadFile(filepath.Join(sm.storage, file.Name())) - if err == nil { - var meta struct { - Summary string `json:"summary"` - Updated time.Time `json:"updated"` - Created time.Time `json:"created"` - TokenIn int `json:"token_in"` - TokenOut int `json:"token_out"` - TokenSum int `json:"token_sum"` - } - if err := json.Unmarshal(data, &meta); err == nil { - session.Summary = meta.Summary - session.Updated = meta.Updated - session.Created = meta.Created - session.TokenIn = meta.TokenIn - session.TokenOut = meta.TokenOut - session.TokenSum = meta.TokenSum - } - } + if strings.TrimSpace(ev.ID) != "" { + lastEventID = ev.ID } + if ev.Type != "message" || ev.Message == nil { + continue + } + messages = append(messages, toProviderMessage(*ev.Message)) + } + if err := scanner.Err(); err != nil { + return nil, "", err + } + + if limit > 0 && len(messages) > limit { + messages = messages[len(messages)-limit:] + } + return messages, lastEventID, nil +} + +func (sm *SessionManager) ensureSessionMeta(sessionKey string, updatedAt time.Time) (SessionMeta, error) { + if sm.storage == "" { + return SessionMeta{}, nil + } + + sessionKey = strings.TrimSpace(sessionKey) + if sessionKey == "" { + return SessionMeta{}, fmt.Errorf("sessionKey is required") + } + + if meta, ok := sm.getSession(sessionKey); ok { + if !updatedAt.IsZero() { + meta.UpdatedAt = toUnixMs(updatedAt) + } + if err := sm.saveSessionMeta(sessionKey, meta); err != nil { + return SessionMeta{}, err + } + return meta, nil + } + + if updatedAt.IsZero() { + updatedAt = time.Now().UTC() + } + meta := SessionMeta{ + SessionID: uuid.NewString(), + SessionFile: filepath.Join(sm.storage, uuid.NewString()+".jsonl"), + UpdatedAt: toUnixMs(updatedAt), + } + meta.SessionFile = filepath.Join(sm.storage, meta.SessionID+".jsonl") + if err := sm.saveSessionMeta(sessionKey, meta); err != nil { + return SessionMeta{}, err + } + meta, ok := sm.getSession(sessionKey) + if !ok { + return SessionMeta{}, fmt.Errorf("session meta not found after save: %s", sessionKey) + } + return meta, nil +} + +func (sm *SessionManager) loadSessions() error { + if sm.storage == "" { + return nil + } + if err := sm.readSessionsIndex(); err != nil { + return err + } + + sm.mu.Lock() + for sessionKey, meta := range sm.sessionIndex { + sm.sessionKeyByID[meta.SessionID] = sessionKey + updated := fromUnixMs(meta.UpdatedAt) + sm.sessions[sessionKey] = &Session{ + Key: sessionKey, + SessionID: meta.SessionID, + Messages: []providers.Message{}, + Created: updated, + Updated: updated, + } + } + sm.mu.Unlock() + + sm.mu.RLock() + indexCopy := make(map[string]SessionMeta, len(sm.sessionIndex)) + for k, v := range sm.sessionIndex { + indexCopy[k] = v + } + sm.mu.RUnlock() + + for sessionKey, meta := range indexCopy { + messages, lastID, err := sm.loadSessionHistoryWithState(meta.SessionID, 0) + if err != nil { + logger.WarnCF("session", "Failed to load session history", map[string]interface{}{ + "session_key": sessionKey, + "session_id": meta.SessionID, + logger.FieldError: err.Error(), + }) + continue + } + sm.mu.Lock() + sm.lastEventBySessionID[meta.SessionID] = lastID + session := sm.sessions[sessionKey] + sm.mu.Unlock() + if session == nil { + continue + } + session.mu.Lock() + session.Messages = append(session.Messages, messages...) + session.mu.Unlock() } return nil } + +func (sm *SessionManager) readSessionsIndex() error { + indexPath := filepath.Join(sm.storage, sessionsIndexFile) + data, err := os.ReadFile(indexPath) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + if len(strings.TrimSpace(string(data))) == 0 { + return nil + } + + parsed := make(map[string]SessionMeta) + if err := json.Unmarshal(data, &parsed); err != nil { + return err + } + + sm.mu.Lock() + for k, meta := range parsed { + k = strings.TrimSpace(k) + if k == "" { + continue + } + meta.SessionID = strings.TrimSpace(meta.SessionID) + if meta.SessionID == "" { + continue + } + if strings.TrimSpace(meta.SessionFile) == "" { + meta.SessionFile = filepath.Join(sm.storage, meta.SessionID+".jsonl") + } else if !filepath.IsAbs(meta.SessionFile) { + meta.SessionFile = filepath.Join(sm.storage, meta.SessionFile) + } + if meta.UpdatedAt == 0 { + meta.UpdatedAt = toUnixMs(time.Now().UTC()) + } + if meta.Extra == nil { + meta.Extra = make(map[string]interface{}) + } + sm.sessionIndex[k] = meta + } + sm.mu.Unlock() + return nil +} + +func (sm *SessionManager) writeSessionsIndexLocked() error { + if sm.storage == "" { + return nil + } + + indexPath := filepath.Join(sm.storage, sessionsIndexFile) + tmpPath := indexPath + ".tmp" + + indexCopy := make(map[string]SessionMeta, len(sm.sessionIndex)) + for k, v := range sm.sessionIndex { + indexCopy[k] = v + } + + data, err := json.MarshalIndent(indexCopy, "", " ") + if err != nil { + return err + } + if err := os.WriteFile(tmpPath, data, 0644); err != nil { + return err + } + return os.Rename(tmpPath, indexPath) +} + +func (sm *SessionManager) ensureSessionHeader(sessionID, sessionFile string) error { + if sm.storage == "" { + return nil + } + sessionID = strings.TrimSpace(sessionID) + if sessionID == "" { + return fmt.Errorf("sessionID is required") + } + sessionPath := sessionFile + if strings.TrimSpace(sessionPath) == "" { + sessionPath = filepath.Join(sm.storage, sessionID+".jsonl") + } + if !filepath.IsAbs(sessionPath) { + sessionPath = filepath.Join(sm.storage, sessionPath) + } + + if err := os.MkdirAll(filepath.Dir(sessionPath), 0755); err != nil { + return err + } + + st, err := os.Stat(sessionPath) + if err == nil && st.Size() > 0 { + return nil + } + if err != nil && !os.IsNotExist(err) { + return err + } + + f, err := os.OpenFile(sessionPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + + ev := sm.buildSessionHeaderEvent(sessionID, time.Now().UTC()) + if err := writeEventLine(f, ev); err != nil { + return err + } + + sm.mu.Lock() + sm.lastEventBySessionID[sessionID] = ev.ID + sm.mu.Unlock() + return nil +} + +func (sm *SessionManager) buildSessionHeaderEvent(sessionID string, ts time.Time) sessionEvent { + cwd, _ := os.Getwd() + return sessionEvent{ + Type: "session", + Version: openClawVersion, + ID: sessionID, + Timestamp: ts.UTC().Format(time.RFC3339Nano), + Cwd: cwd, + } +} + +func (sm *SessionManager) buildMessageEvent(msg providers.Message, ts time.Time, parentID string) sessionEvent { + evID := newEventID() + var parent *string + parentID = strings.TrimSpace(parentID) + if parentID != "" { + p := parentID + parent = &p + } + return sessionEvent{ + Type: "message", + ID: evID, + ParentID: parent, + Timestamp: ts.UTC().Format(time.RFC3339Nano), + Message: toOpenClawMessage(msg, ts), + } +} + +func toOpenClawMessage(msg providers.Message, ts time.Time) *openClawMessage { + parts := make([]openClawContentPart, 0, 1+len(msg.ContentParts)+len(msg.ToolCalls)) + for _, p := range msg.ContentParts { + if p.Type == "" { + continue + } + part := openClawContentPart{Type: p.Type, Text: p.Text} + parts = append(parts, part) + } + if strings.TrimSpace(msg.Content) != "" { + partType := "text" + if msg.Role == "assistant" && strings.Contains(msg.Content, "") { + partType = "thinking" + } + parts = append(parts, openClawContentPart{Type: partType, Text: msg.Content}) + } + for _, tc := range msg.ToolCalls { + part := openClawContentPart{ + Type: "toolCall", + ID: tc.ID, + Name: tc.Name, + Arguments: tc.Arguments, + } + if part.Name == "" && tc.Function != nil { + part.Name = tc.Function.Name + } + parts = append(parts, part) + } + if msg.Role == "tool" { + parts = []openClawContentPart{{ + Type: "toolResult", + ToolCallID: msg.ToolCallID, + ToolName: "tool", + Text: msg.Content, + }} + } + + return &openClawMessage{ + Role: msg.Role, + Content: parts, + Timestamp: toUnixMs(ts), + } +} + +func toProviderMessage(msg openClawMessage) providers.Message { + out := providers.Message{Role: msg.Role} + textParts := make([]string, 0) + toolCalls := make([]providers.ToolCall, 0) + for _, p := range msg.Content { + switch p.Type { + case "text", "thinking": + if strings.TrimSpace(p.Text) != "" { + textParts = append(textParts, p.Text) + } + case "toolCall": + toolCalls = append(toolCalls, providers.ToolCall{ + ID: p.ID, + Type: "function", + Name: p.Name, + Arguments: p.Arguments, + }) + case "toolResult": + if strings.TrimSpace(p.Text) != "" { + textParts = append(textParts, p.Text) + } + if strings.TrimSpace(p.ToolCallID) != "" { + out.ToolCallID = p.ToolCallID + } + } + } + if len(toolCalls) > 0 { + out.ToolCalls = toolCalls + } + if len(textParts) > 0 { + out.Content = strings.Join(textParts, "\n") + } + return out +} + +func (sm *SessionManager) resolveSessionFile(meta SessionMeta) string { + if strings.TrimSpace(meta.SessionFile) == "" { + return filepath.Join(sm.storage, meta.SessionID+".jsonl") + } + if filepath.IsAbs(meta.SessionFile) { + return meta.SessionFile + } + return filepath.Join(sm.storage, meta.SessionFile) +} + +func writeEventLine(f *os.File, ev sessionEvent) error { + b, err := json.Marshal(ev) + if err != nil { + return err + } + _, err = f.Write(append(b, '\n')) + return err +} + +func unmarshalUpdatedAt(raw json.RawMessage, out *int64) error { + if len(raw) == 0 { + *out = 0 + return nil + } + + var asInt int64 + if err := json.Unmarshal(raw, &asInt); err == nil { + *out = asInt + return nil + } + + var asFloat float64 + if err := json.Unmarshal(raw, &asFloat); err == nil { + *out = int64(asFloat) + return nil + } + + var asString string + if err := json.Unmarshal(raw, &asString); err == nil { + if asString == "" { + *out = 0 + return nil + } + if n, err := strconv.ParseInt(asString, 10, 64); err == nil { + *out = n + return nil + } + if ts, err := time.Parse(time.RFC3339Nano, asString); err == nil { + *out = toUnixMs(ts) + return nil + } + } + + return fmt.Errorf("invalid updatedAt value: %s", string(raw)) +} + +func toUnixMs(t time.Time) int64 { + return t.UTC().UnixNano() / int64(time.Millisecond) +} + +func fromUnixMs(ms int64) time.Time { + if ms <= 0 { + return time.Now().UTC() + } + return time.Unix(0, ms*int64(time.Millisecond)).UTC() +} + +func newEventID() string { + id := strings.ReplaceAll(uuid.NewString(), "-", "") + if len(id) > 8 { + return id[:8] + } + return id +} diff --git a/pkg/session/manager_test.go b/pkg/session/manager_test.go new file mode 100644 index 0000000..f82b36a --- /dev/null +++ b/pkg/session/manager_test.go @@ -0,0 +1,146 @@ +package session + +import ( + "bufio" + "encoding/json" + "os" + "path/filepath" + "testing" + + "clawgo/pkg/providers" +) + +func TestSessionIndexReadWriteOpenClawFormat(t *testing.T) { + dir := t.TempDir() + sm := NewSessionManager(dir) + + meta := SessionMeta{ + SessionID: "sid-1", + SessionFile: filepath.Join(dir, "sid-1.jsonl"), + UpdatedAt: 1770962127556, + } + if err := sm.saveSessionMeta("channel:chat", meta); err != nil { + t.Fatalf("saveSessionMeta failed: %v", err) + } + + indexPath := filepath.Join(dir, sessionsIndexFile) + data, err := os.ReadFile(indexPath) + if err != nil { + t.Fatalf("failed to read sessions index: %v", err) + } + + raw := map[string]map[string]interface{}{} + if err := json.Unmarshal(data, &raw); err != nil { + t.Fatalf("failed to parse sessions index: %v", err) + } + entry, ok := raw["channel:chat"] + if !ok { + t.Fatalf("sessions index missing key") + } + + if _, ok := entry["sessionId"].(string); !ok { + t.Fatalf("sessionId should be string, got %T", entry["sessionId"]) + } + if _, ok := entry["sessionFile"].(string); !ok { + t.Fatalf("sessionFile should be string, got %T", entry["sessionFile"]) + } + if _, ok := entry["updatedAt"].(float64); !ok { + t.Fatalf("updatedAt should be number(ms), got %T", entry["updatedAt"]) + } +} + +func TestAppendAndLoadSessionHistoryOpenClawJSONL(t *testing.T) { + dir := t.TempDir() + sm := NewSessionManager(dir) + + sessionKey := "telegram:chat-1" + sm.AddMessage(sessionKey, "user", "hello") + sm.AddMessage(sessionKey, "assistant", "world") + + meta, ok := sm.getSession(sessionKey) + if !ok { + t.Fatalf("expected session meta for key") + } + + f, err := os.Open(meta.SessionFile) + if err != nil { + t.Fatalf("failed to open session file: %v", err) + } + defer f.Close() + + scanner := bufio.NewScanner(f) + lines := make([]string, 0) + for scanner.Scan() { + lines = append(lines, scanner.Text()) + } + if err := scanner.Err(); err != nil { + t.Fatalf("scan failed: %v", err) + } + if len(lines) < 3 { + t.Fatalf("expected at least 3 lines(session + 2 messages), got %d", len(lines)) + } + + var first map[string]interface{} + if err := json.Unmarshal([]byte(lines[0]), &first); err != nil { + t.Fatalf("invalid first event json: %v", err) + } + if first["type"] != "session" { + t.Fatalf("first event should be session, got %v", first["type"]) + } + + all, err := sm.loadSessionHistory(meta.SessionID, 0) + if err != nil { + t.Fatalf("loadSessionHistory failed: %v", err) + } + if len(all) != 2 { + t.Fatalf("unexpected history len: got %d want 2", len(all)) + } + if all[0].Content != "hello" || all[1].Content != "world" { + t.Fatalf("unexpected loaded content: %+v", all) + } + + limited, err := sm.loadSessionHistory(meta.SessionID, 1) + if err != nil { + t.Fatalf("loadSessionHistory(limit) failed: %v", err) + } + if len(limited) != 1 || limited[0].Content != "world" { + t.Fatalf("unexpected limited history: %+v", limited) + } +} + +func TestMultipleEventsUnderSameSessionKeyOpenClaw(t *testing.T) { + dir := t.TempDir() + sm := NewSessionManager(dir) + + sessionKey := "discord:room-1" + events := []providers.Message{ + {Role: "user", Content: "first"}, + {Role: "assistant", Content: "second"}, + {Role: "user", Content: "third"}, + } + for _, e := range events { + sm.AddMessageFull(sessionKey, e) + } + + meta, ok := sm.getSession(sessionKey) + if !ok { + t.Fatalf("expected session meta") + } + + history, err := sm.loadSessionHistory(meta.SessionID, 0) + if err != nil { + t.Fatalf("loadSessionHistory failed: %v", err) + } + if len(history) != 3 { + t.Fatalf("unexpected history len: got %d want 3", len(history)) + } + if history[2].Content != "third" { + t.Fatalf("expected latest content third, got %q", history[2].Content) + } + + sm2 := NewSessionManager(dir) + reloaded := sm2.GetHistory(sessionKey) + if len(reloaded) != 3 { + t.Fatalf("unexpected reloaded history len: got %d want 3", len(reloaded)) + } +}