Files
clawgo/pkg/tools/subagent_mailbox.go
2026-04-13 13:41:01 +08:00

589 lines
17 KiB
Go

package tools
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/YspCoder/clawgo/pkg/jsonlog"
)
type AgentThread struct {
ThreadID string `json:"thread_id"`
Owner string `json:"owner"`
Participants []string `json:"participants,omitempty"`
Status string `json:"status"`
Topic string `json:"topic,omitempty"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
type AgentMessage struct {
MessageID string `json:"message_id"`
ThreadID string `json:"thread_id"`
FromAgent string `json:"from_agent"`
ToAgent string `json:"to_agent"`
ReplyTo string `json:"reply_to,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
Type string `json:"type"`
Content string `json:"content"`
RequiresReply bool `json:"requires_reply,omitempty"`
Status string `json:"status"`
CreatedAt int64 `json:"created_at"`
}
type mailboxThreadsMetaFile struct {
Version int `json:"version"`
LastOffset int64 `json:"last_offset,omitempty"`
ThreadSeq int `json:"thread_seq,omitempty"`
UpdatedAt int64 `json:"updated_at,omitempty"`
Threads []AgentThread `json:"threads,omitempty"`
}
type mailboxMessagesMetaFile struct {
Version int `json:"version"`
LastOffset int64 `json:"last_offset,omitempty"`
MsgSeq int `json:"msg_seq,omitempty"`
UpdatedAt int64 `json:"updated_at,omitempty"`
Messages []AgentMessage `json:"messages,omitempty"`
ThreadMessages map[string][]string `json:"thread_messages,omitempty"`
QueuedByAgent map[string][]string `json:"queued_by_agent,omitempty"`
QueuedByThreadAgent map[string][]string `json:"queued_by_thread_agent,omitempty"`
}
type AgentMailboxStore struct {
dir string
threadsPath string
msgsPath string
threadsMetaPath string
msgsMetaPath string
mu sync.RWMutex
threads map[string]*AgentThread
messages map[string]*AgentMessage
threadMessages map[string][]string
queuedByAgent map[string][]string
queuedByThreadAgent map[string][]string
msgSeq int
threadSeq int
}
func NewAgentMailboxStore(workspace string) *AgentMailboxStore {
workspace = strings.TrimSpace(workspace)
if workspace == "" {
return nil
}
dir := filepath.Join(workspace, "agents", "runtime")
s := &AgentMailboxStore{
dir: dir,
threadsPath: filepath.Join(dir, "threads.jsonl"),
msgsPath: filepath.Join(dir, "agent_messages.jsonl"),
threadsMetaPath: filepath.Join(dir, "threads.meta.json"),
msgsMetaPath: filepath.Join(dir, "agent_messages.meta.json"),
threads: map[string]*AgentThread{},
messages: map[string]*AgentMessage{},
threadMessages: map[string][]string{},
queuedByAgent: map[string][]string{},
queuedByThreadAgent: map[string][]string{},
}
_ = os.MkdirAll(dir, 0o755)
_ = s.load()
return s
}
func (s *AgentMailboxStore) load() error {
s.mu.Lock()
defer s.mu.Unlock()
s.resetLocked()
if err := s.loadThreadsLocked(); err != nil {
return err
}
return s.loadMessagesLocked()
}
func (s *AgentMailboxStore) resetLocked() {
s.threads = map[string]*AgentThread{}
s.messages = map[string]*AgentMessage{}
s.threadMessages = map[string][]string{}
s.queuedByAgent = map[string][]string{}
s.queuedByThreadAgent = map[string][]string{}
s.msgSeq = 0
s.threadSeq = 0
}
func (s *AgentMailboxStore) loadThreadsLocked() error {
size, err := jsonlog.FileSize(s.threadsPath)
if err != nil {
return err
}
var meta mailboxThreadsMetaFile
if err := jsonlog.ReadJSON(s.threadsMetaPath, &meta); err == nil && meta.Version > 0 && meta.LastOffset == size {
for _, thread := range meta.Threads {
cp := thread
cp.Participants = append([]string(nil), thread.Participants...)
s.threads[cp.ThreadID] = &cp
}
s.threadSeq = meta.ThreadSeq
return nil
}
if size == 0 {
return s.persistThreadsMetaLocked(size)
}
if err := jsonlog.Scan(s.threadsPath, func(line []byte) error {
var thread AgentThread
if err := json.Unmarshal(line, &thread); err != nil || strings.TrimSpace(thread.ThreadID) == "" {
return nil
}
cp := thread
cp.Participants = append([]string(nil), thread.Participants...)
s.threads[thread.ThreadID] = &cp
if n := parseThreadSequence(thread.ThreadID); n > s.threadSeq {
s.threadSeq = n
}
return nil
}); err != nil {
return err
}
return s.persistThreadsMetaLocked(size)
}
func (s *AgentMailboxStore) loadMessagesLocked() error {
size, err := jsonlog.FileSize(s.msgsPath)
if err != nil {
return err
}
var meta mailboxMessagesMetaFile
if err := jsonlog.ReadJSON(s.msgsMetaPath, &meta); err == nil && meta.Version > 0 && meta.LastOffset == size {
for _, msg := range meta.Messages {
cp := msg
s.messages[msg.MessageID] = &cp
}
s.threadMessages = cloneStringSliceMap(meta.ThreadMessages)
s.queuedByAgent = cloneStringSliceMap(meta.QueuedByAgent)
s.queuedByThreadAgent = cloneStringSliceMap(meta.QueuedByThreadAgent)
s.msgSeq = meta.MsgSeq
s.reconcileThreadsFromMessagesLocked()
return nil
}
if size == 0 {
return s.persistMessagesMetaLocked(size)
}
if err := jsonlog.Scan(s.msgsPath, func(line []byte) error {
var msg AgentMessage
if err := json.Unmarshal(line, &msg); err != nil || strings.TrimSpace(msg.MessageID) == "" {
return nil
}
s.indexMessageLocked(msg)
return nil
}); err != nil {
return err
}
return s.persistMessagesMetaLocked(size)
}
func (s *AgentMailboxStore) EnsureThread(thread AgentThread) (AgentThread, error) {
if s == nil {
return thread, nil
}
s.mu.Lock()
defer s.mu.Unlock()
if err := os.MkdirAll(s.dir, 0o755); err != nil {
return AgentThread{}, err
}
if strings.TrimSpace(thread.ThreadID) == "" {
s.threadSeq++
thread.ThreadID = fmt.Sprintf("thread-%04d", s.threadSeq)
}
thread.Participants = normalizeStringList(thread.Participants)
if strings.TrimSpace(thread.Status) == "" {
thread.Status = "open"
}
if thread.CreatedAt <= 0 {
thread.CreatedAt = thread.UpdatedAt
}
if thread.CreatedAt <= 0 {
thread.CreatedAt = 1
}
if thread.UpdatedAt <= 0 {
thread.UpdatedAt = thread.CreatedAt
}
offset, err := jsonlog.AppendLine(s.threadsPath, thread)
if err != nil {
return AgentThread{}, err
}
cp := thread
cp.Participants = append([]string(nil), thread.Participants...)
s.threads[thread.ThreadID] = &cp
if n := parseThreadSequence(thread.ThreadID); n > s.threadSeq {
s.threadSeq = n
}
if err := s.persistThreadsMetaLocked(offset); err != nil {
return AgentThread{}, err
}
return thread, nil
}
func (s *AgentMailboxStore) AppendMessage(msg AgentMessage) (AgentMessage, error) {
if s == nil {
return msg, nil
}
s.mu.Lock()
defer s.mu.Unlock()
if err := os.MkdirAll(s.dir, 0o755); err != nil {
return AgentMessage{}, err
}
if strings.TrimSpace(msg.MessageID) == "" {
s.msgSeq++
msg.MessageID = fmt.Sprintf("msg-%06d", s.msgSeq)
}
if strings.TrimSpace(msg.Status) == "" {
msg.Status = "queued"
}
offset, err := jsonlog.AppendLine(s.msgsPath, msg)
if err != nil {
return AgentMessage{}, err
}
s.indexMessageLocked(msg)
if err := s.persistMessagesMetaLocked(offset); err != nil {
return AgentMessage{}, err
}
if err := s.persistThreadsMetaLocked(0); err != nil {
return AgentMessage{}, err
}
return msg, nil
}
func (s *AgentMailboxStore) Thread(threadID string) (*AgentThread, bool) {
if s == nil {
return nil, false
}
s.mu.RLock()
defer s.mu.RUnlock()
thread, ok := s.threads[strings.TrimSpace(threadID)]
if !ok {
return nil, false
}
cp := *thread
cp.Participants = append([]string(nil), thread.Participants...)
return &cp, true
}
func (s *AgentMailboxStore) MessagesByThread(threadID string, limit int) ([]AgentMessage, error) {
if s == nil {
return nil, nil
}
return s.currentIndexedMessages(s.threadMessages[strings.TrimSpace(threadID)], limit), nil
}
func (s *AgentMailboxStore) Inbox(agentID string, limit int) ([]AgentMessage, error) {
if s == nil {
return nil, nil
}
return s.currentIndexedMessages(s.queuedByAgent[strings.TrimSpace(agentID)], limit), nil
}
func (s *AgentMailboxStore) ThreadInbox(threadID, agentID string, limit int) ([]AgentMessage, error) {
if s == nil {
return nil, nil
}
return s.currentIndexedMessages(s.queuedByThreadAgent[mailboxThreadAgentKey(threadID, agentID)], limit), nil
}
func (s *AgentMailboxStore) Message(messageID string) (*AgentMessage, bool) {
if s == nil {
return nil, false
}
s.mu.RLock()
defer s.mu.RUnlock()
msg, ok := s.messages[strings.TrimSpace(messageID)]
if !ok {
return nil, false
}
cp := *msg
return &cp, true
}
func (s *AgentMailboxStore) UpdateMessageStatus(messageID, status string, at int64) (*AgentMessage, error) {
if s == nil {
return nil, nil
}
s.mu.RLock()
current, ok := s.messages[strings.TrimSpace(messageID)]
s.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("message not found: %s", messageID)
}
updated := *current
updated.Status = strings.TrimSpace(status)
if updated.Status == "" {
updated.Status = current.Status
}
if at > 0 {
updated.CreatedAt = at
}
msg, err := s.AppendMessage(updated)
if err != nil {
return nil, err
}
return &msg, nil
}
func (s *AgentMailboxStore) currentIndexedMessages(ids []string, limit int) []AgentMessage {
s.mu.RLock()
defer s.mu.RUnlock()
out := make([]AgentMessage, 0, len(ids))
for _, id := range ids {
msg := s.messages[id]
if msg == nil {
continue
}
out = append(out, *msg)
}
sort.Slice(out, func(i, j int) bool {
if out[i].CreatedAt != out[j].CreatedAt {
return out[i].CreatedAt < out[j].CreatedAt
}
return out[i].MessageID < out[j].MessageID
})
if limit > 0 && len(out) > limit {
out = out[len(out)-limit:]
}
return out
}
func (s *AgentMailboxStore) indexMessageLocked(msg AgentMessage) {
msg.MessageID = strings.TrimSpace(msg.MessageID)
if msg.MessageID == "" {
return
}
if n := parseMessageSequence(msg.MessageID); n > s.msgSeq {
s.msgSeq = n
}
if existing := s.messages[msg.MessageID]; existing != nil {
s.removeQueuedIndexesLocked(*existing)
}
cp := msg
s.messages[msg.MessageID] = &cp
s.threadMessages[msg.ThreadID] = appendUniqueString(s.threadMessages[msg.ThreadID], msg.MessageID)
if strings.EqualFold(strings.TrimSpace(msg.Status), "queued") {
s.queuedByAgent[msg.ToAgent] = appendUniqueString(s.queuedByAgent[msg.ToAgent], msg.MessageID)
s.queuedByThreadAgent[mailboxThreadAgentKey(msg.ThreadID, msg.ToAgent)] = appendUniqueString(s.queuedByThreadAgent[mailboxThreadAgentKey(msg.ThreadID, msg.ToAgent)], msg.MessageID)
}
thread := s.ensureThreadStateLocked(msg.ThreadID)
if thread != nil {
if msg.CreatedAt > thread.UpdatedAt {
thread.UpdatedAt = msg.CreatedAt
}
participants := append([]string(nil), thread.Participants...)
participants = append(participants, msg.FromAgent, msg.ToAgent)
thread.Participants = normalizeStringList(participants)
}
}
func (s *AgentMailboxStore) ensureThreadStateLocked(threadID string) *AgentThread {
threadID = strings.TrimSpace(threadID)
if threadID == "" {
return nil
}
thread := s.threads[threadID]
if thread != nil {
return thread
}
thread = &AgentThread{
ThreadID: threadID,
Status: "open",
CreatedAt: 1,
UpdatedAt: 1,
Participants: nil,
}
s.threads[threadID] = thread
if n := parseThreadSequence(threadID); n > s.threadSeq {
s.threadSeq = n
}
return thread
}
func (s *AgentMailboxStore) removeQueuedIndexesLocked(msg AgentMessage) {
if !strings.EqualFold(strings.TrimSpace(msg.Status), "queued") {
return
}
s.queuedByAgent[msg.ToAgent] = removeStringValue(s.queuedByAgent[msg.ToAgent], msg.MessageID)
s.queuedByThreadAgent[mailboxThreadAgentKey(msg.ThreadID, msg.ToAgent)] = removeStringValue(s.queuedByThreadAgent[mailboxThreadAgentKey(msg.ThreadID, msg.ToAgent)], msg.MessageID)
}
func (s *AgentMailboxStore) reconcileThreadsFromMessagesLocked() {
for _, msg := range s.messages {
s.ensureThreadStateLocked(msg.ThreadID)
}
for _, msg := range s.messages {
thread := s.threads[msg.ThreadID]
if thread == nil {
continue
}
if msg.CreatedAt > thread.UpdatedAt {
thread.UpdatedAt = msg.CreatedAt
}
participants := append([]string(nil), thread.Participants...)
participants = append(participants, msg.FromAgent, msg.ToAgent)
thread.Participants = normalizeStringList(participants)
}
}
func (s *AgentMailboxStore) persistThreadsMetaLocked(offset int64) error {
if offset <= 0 {
size, err := jsonlog.FileSize(s.threadsPath)
if err != nil {
return err
}
offset = size
}
meta := mailboxThreadsMetaFile{
Version: 1,
LastOffset: offset,
ThreadSeq: s.threadSeq,
UpdatedAt: time.Now().UnixMilli(),
Threads: make([]AgentThread, 0, len(s.threads)),
}
for _, thread := range s.threads {
cp := *thread
cp.Participants = append([]string(nil), thread.Participants...)
meta.Threads = append(meta.Threads, cp)
}
sort.Slice(meta.Threads, func(i, j int) bool {
if meta.Threads[i].UpdatedAt != meta.Threads[j].UpdatedAt {
return meta.Threads[i].UpdatedAt > meta.Threads[j].UpdatedAt
}
return meta.Threads[i].ThreadID < meta.Threads[j].ThreadID
})
return jsonlog.WriteJSON(s.threadsMetaPath, meta)
}
func (s *AgentMailboxStore) persistMessagesMetaLocked(offset int64) error {
meta := mailboxMessagesMetaFile{
Version: 1,
LastOffset: offset,
MsgSeq: s.msgSeq,
UpdatedAt: time.Now().UnixMilli(),
Messages: make([]AgentMessage, 0, len(s.messages)),
ThreadMessages: cloneStringSliceMap(s.threadMessages),
QueuedByAgent: cloneStringSliceMap(s.queuedByAgent),
QueuedByThreadAgent: cloneStringSliceMap(s.queuedByThreadAgent),
}
for _, msg := range s.messages {
meta.Messages = append(meta.Messages, *msg)
}
sort.Slice(meta.Messages, func(i, j int) bool {
if meta.Messages[i].CreatedAt != meta.Messages[j].CreatedAt {
return meta.Messages[i].CreatedAt < meta.Messages[j].CreatedAt
}
return meta.Messages[i].MessageID < meta.Messages[j].MessageID
})
return jsonlog.WriteJSON(s.msgsMetaPath, meta)
}
func parseThreadSequence(threadID string) int {
threadID = strings.TrimSpace(threadID)
if !strings.HasPrefix(threadID, "thread-") {
return 0
}
n, _ := strconv.Atoi(strings.TrimPrefix(threadID, "thread-"))
return n
}
func threadToThreadRecord(thread *AgentThread) ThreadRecord {
if thread == nil {
return ThreadRecord{}
}
return ThreadRecord{
ID: thread.ThreadID,
OwnerAgentID: thread.Owner,
Participants: append([]string(nil), thread.Participants...),
Status: thread.Status,
Topic: thread.Topic,
CreatedAt: thread.CreatedAt,
UpdatedAt: thread.UpdatedAt,
}
}
func messageToArtifactRecord(msg AgentMessage) ArtifactRecord {
agentID := strings.TrimSpace(msg.FromAgent)
if agentID == "" {
agentID = strings.TrimSpace(msg.ToAgent)
}
return ArtifactRecord{
ID: msg.MessageID,
RunID: msg.CorrelationID,
RequestID: msg.CorrelationID,
ThreadID: msg.ThreadID,
Kind: "message",
Name: msg.Type,
Content: msg.Content,
AgentID: agentID,
FromAgent: msg.FromAgent,
ToAgent: msg.ToAgent,
ReplyTo: msg.ReplyTo,
CorrelationID: msg.CorrelationID,
Status: msg.Status,
RequiresReply: msg.RequiresReply,
CreatedAt: msg.CreatedAt,
Visible: true,
SourceType: "agent_message",
}
}
func parseMessageSequence(messageID string) int {
messageID = strings.TrimSpace(messageID)
if !strings.HasPrefix(messageID, "msg-") {
return 0
}
n, _ := strconv.Atoi(strings.TrimPrefix(messageID, "msg-"))
return n
}
func mailboxThreadAgentKey(threadID, agentID string) string {
return strings.TrimSpace(threadID) + "::" + strings.TrimSpace(agentID)
}
func cloneStringSliceMap(src map[string][]string) map[string][]string {
if len(src) == 0 {
return map[string][]string{}
}
out := make(map[string][]string, len(src))
for key, values := range src {
out[key] = append([]string(nil), values...)
}
return out
}
func appendUniqueString(values []string, value string) []string {
value = strings.TrimSpace(value)
if value == "" {
return values
}
for _, existing := range values {
if existing == value {
return values
}
}
return append(values, value)
}
func removeStringValue(values []string, value string) []string {
if len(values) == 0 {
return values
}
out := values[:0]
for _, existing := range values {
if existing == value {
continue
}
out = append(out, existing)
}
return append([]string(nil), out...)
}