diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go
index 66f6f01..6121fc8 100644
--- a/pkg/agent/loop.go
+++ b/pkg/agent/loop.go
@@ -19,6 +19,7 @@ import (
 	"strings"
 	"sync"
 	"time"
+	"unicode/utf8"
 
 	"clawgo/pkg/bus"
 	"clawgo/pkg/config"
@@ -452,7 +453,12 @@ func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage)
 		}
 	}
 	if msg.Channel == "telegram" && suppressed {
-		al.bus.PublishOutbound(bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Action: "finalize"})
+		replyID := ""
+		if msg.Metadata != nil {
+			replyID = msg.Metadata["message_id"]
+		}
+		// Final pass uses full formatted content to stabilize rendering after plain streaming.
+		al.bus.PublishOutbound(bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Action: "finalize", Content: response, ReplyToID: replyID})
 	}
 	al.audit.Record(trigger, msg.Channel, msg.SessionKey, suppressed, err)
 	al.appendTaskAudit(taskID, msg, started, err, suppressed)
@@ -840,11 +846,15 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
 			if time.Since(lastPush) < 450*time.Millisecond {
 				return
 			}
+			if !shouldFlushTelegramStreamSnapshot(streamText) {
+				return
+			}
 			lastPush = time.Now()
 			replyID := ""
 			if msg.Metadata != nil {
 				replyID = msg.Metadata["message_id"]
 			}
+			// Stream with formatted rendering once snapshot is syntactically safe.
 			al.bus.PublishOutbound(bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Content: streamText, Action: "stream", ReplyToID: replyID})
 			al.markSessionStreamed(msg.SessionKey)
 		})
@@ -1679,6 +1689,38 @@ func shouldDropNoReply(text string) bool {
 	return strings.EqualFold(t, "NO_REPLY")
 }
 
+// shouldFlushTelegramStreamSnapshot reports whether the partial reply s ends
+// at a point where it can be pushed with formatting enabled: it must end on a
+// newline or sentence punctuation, and code fences, common inline markers and
+// link brackets must all be balanced.
+func shouldFlushTelegramStreamSnapshot(s string) bool {
+	s = strings.TrimRight(s, " \t")
+	if s == "" {
+		return false
+	}
+	last, _ := utf8.DecodeLastRuneInString(s)
+	switch last {
+	// Fullwidth punctuation is spelled as escapes so every case constant stays
+	// distinct from its ASCII twin (U+3002, U+FF01, U+FF1F, U+FF1B, U+FF1A).
+	case '\n', '.', '!', '?', ';', ':', '\u3002', '\uff01', '\uff1f', '\uff1b', '\uff1a':
+	default:
+		return false
+	}
+	// Avoid flushing while code fences are still unbalanced.
+	if strings.Count(s, "```")%2 == 1 {
+		return false
+	}
+	// Avoid flushing while common inline markdown markers are unbalanced.
+	if strings.Count(s, "**")%2 == 1 || strings.Count(s, "__")%2 == 1 || strings.Count(s, "~~")%2 == 1 {
+		return false
+	}
+	// Rough guard for links/images: require bracket balance before flushing.
+	if strings.Count(s, "[") != strings.Count(s, "]") || strings.Count(s, "(") != strings.Count(s, ")") {
+		return false
+	}
+	return true
+}
+
 func parseReplyTag(text string, currentMessageID string) (content string, replyToID string) {
 	t := strings.TrimSpace(text)
 	if !strings.HasPrefix(t, "[[") {
diff --git a/pkg/agent/session_planner.go b/pkg/agent/session_planner.go
index e332974..74d2cbf 100644
--- a/pkg/agent/session_planner.go
+++ b/pkg/agent/session_planner.go
@@ -1,14 +1,19 @@
 package agent
 
 import (
+	"bufio"
 	"context"
+	"encoding/json"
 	"fmt"
+	"os"
+	"path/filepath"
 	"regexp"
 	"sort"
 	"strings"
 	"sync"
 
 	"clawgo/pkg/bus"
+	"clawgo/pkg/ekg"
 	"clawgo/pkg/scheduling"
 )
 
@@ -121,7 +126,7 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage
 		go func(index int, t plannedTask) {
 			defer wg.Done()
 			subMsg := msg
-			subMsg.Content = t.Content
+			subMsg.Content = al.enrichTaskContentWithMemoryAndEKG(ctx, t)
 			subMsg.Metadata = cloneMetadata(msg.Metadata)
 			if subMsg.Metadata == nil {
 				subMsg.Metadata = map[string]string{}
@@ -157,6 +162,201 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage
 	return strings.TrimSpace(b.String()), nil
 }
 
+// enrichTaskContentWithMemoryAndEKG prefixes the task content with memory
+// search results and EKG advice when either is available; otherwise it
+// returns the trimmed task content unchanged.
+func (al *AgentLoop) enrichTaskContentWithMemoryAndEKG(ctx context.Context, task plannedTask) string {
+	base := strings.TrimSpace(task.Content)
+	if base == "" {
+		return base
+	}
+	hints := make([]string, 0, 2)
+	if mem := al.memoryHintForTask(ctx, task); mem != "" {
+		hints = append(hints, "Memory:\n"+mem)
+	}
+	if risk := al.ekgHintForTask(task); risk != "" {
+		hints = append(hints, "EKG:\n"+risk)
+	}
+	if len(hints) == 0 {
+		return base
+	}
+	return strings.TrimSpace(
+		"Task Context (use it as constraints, avoid repeating known failures):\n" +
+			strings.Join(hints, "\n\n") +
+			"\n\nTask:\n" + base,
+	)
+}
+
+// memoryHintForTask runs the memory_search tool for the task content and
+// returns its output passed through truncate(txt, 1200), or "" when the tool
+// set is missing, the call fails, or the tool reports no memory.
+func (al *AgentLoop) memoryHintForTask(ctx context.Context, task plannedTask) string {
+	if al == nil || al.tools == nil {
+		return ""
+	}
+	res, err := al.tools.Execute(ctx, "memory_search", map[string]interface{}{
+		"query":      task.Content,
+		"maxResults": 2,
+	})
+	if err != nil {
+		return ""
+	}
+	txt := strings.TrimSpace(res)
+	if txt == "" || strings.HasPrefix(strings.ToLower(txt), "no memory found") {
+		return ""
+	}
+	return truncate(txt, 1200)
+}
+
+// ekgHintForTask looks up the best-matching failed task in the audit log and,
+// when EKG advises escalation for its error signature, returns a one-line
+// warning that includes the advice reasons. It returns "" otherwise.
+func (al *AgentLoop) ekgHintForTask(task plannedTask) string {
+	if al == nil || al.ekg == nil || strings.TrimSpace(al.workspace) == "" {
+		return ""
+	}
+	evt, ok := al.findRecentRelatedErrorEvent(task.Content)
+	if !ok {
+		return ""
+	}
+	errSig := ekg.NormalizeErrorSignature(evt.Log)
+	if errSig == "" {
+		return ""
+	}
+	advice := al.ekg.GetAdvice(ekg.SignalContext{
+		TaskID:  evt.TaskID,
+		ErrSig:  errSig,
+		Source:  evt.Source,
+		Channel: evt.Channel,
+	})
+	if !advice.ShouldEscalate {
+		return ""
+	}
+	reasons := strings.Join(advice.Reason, ", ")
+	if strings.TrimSpace(reasons) == "" {
+		reasons = "repeated error signature"
+	}
+	return fmt.Sprintf("Related repeated error signature detected (%s); reasons: %s. Suggested retry backoff: %ds. Last error: %s",
+		errSig, reasons, advice.RetryBackoffSec, truncate(strings.TrimSpace(evt.Log), 240))
+}
+
+// taskAuditErrorEvent holds the fields read from one failed row of
+// memory/task-audit.jsonl.
+type taskAuditErrorEvent struct {
+	TaskID  string
+	Source  string
+	Channel string
+	Log     string
+	Preview string
+}
+
+// findRecentRelatedErrorEvent scans <workspace>/memory/task-audit.jsonl for
+// "error" rows that have a task_id and a log, and returns the one whose
+// input_preview shares the most tokens with taskContent (ties go to the
+// later row). It returns false if the file is unreadable or nothing overlaps.
+func (al *AgentLoop) findRecentRelatedErrorEvent(taskContent string) (taskAuditErrorEvent, bool) {
+	// Tokenize first so a task without usable keywords never touches the file.
+	kw := tokenizeTaskText(taskContent)
+	if len(kw) == 0 {
+		return taskAuditErrorEvent{}, false
+	}
+	path := filepath.Join(strings.TrimSpace(al.workspace), "memory", "task-audit.jsonl")
+	f, err := os.Open(path)
+	if err != nil {
+		return taskAuditErrorEvent{}, false
+	}
+	defer f.Close()
+
+	var best taskAuditErrorEvent
+	bestScore := 0
+
+	s := bufio.NewScanner(f)
+	// Rows carry log text; raise bufio.Scanner's 64 KiB default token limit so
+	// one long row does not end the scan early.
+	s.Buffer(make([]byte, 0, 64*1024), 1024*1024)
+	for s.Scan() {
+		line := strings.TrimSpace(s.Text())
+		if line == "" {
+			continue
+		}
+		var row map[string]interface{}
+		if json.Unmarshal([]byte(line), &row) != nil {
+			continue
+		}
+		if !strings.EqualFold(auditField(row, "status"), "error") {
+			continue
+		}
+		taskID := auditField(row, "task_id")
+		logText := auditField(row, "log")
+		if taskID == "" || logText == "" {
+			continue
+		}
+		preview := auditField(row, "input_preview")
+		score := overlapScore(kw, tokenizeTaskText(preview))
+		if score < 1 || score < bestScore {
+			continue
+		}
+		bestScore = score
+		best = taskAuditErrorEvent{
+			TaskID:  taskID,
+			Source:  auditField(row, "source"),
+			Channel: auditField(row, "channel"),
+			Log:     logText,
+			Preview: preview,
+		}
+	}
+	if bestScore == 0 {
+		return taskAuditErrorEvent{}, false
+	}
+	return best, true
+}
+
+// auditField returns row[key] rendered with %v and trimmed. A missing or null
+// field yields "" rather than the literal "<nil>" that fmt prints for nil.
+func auditField(row map[string]interface{}, key string) string {
+	v, ok := row[key]
+	if !ok || v == nil {
+		return ""
+	}
+	return strings.TrimSpace(fmt.Sprintf("%v", v))
+}
+
+// tokenizeTaskText lower-cases s, treats whitespace and the ASCII and
+// fullwidth comma, period, colon and semicolon as separators, and returns
+// the fields that are at least 3 bytes long.
+func tokenizeTaskText(s string) []string {
+	// Fullwidth separators are written as escapes (U+FF0C, U+3002, U+FF1A, U+FF1B).
+	normalized := strings.NewReplacer("\n", " ", "\t", " ", ",", " ", "\uff0c", " ", ".", " ", "\u3002", " ", ":", " ", "\uff1a", " ", ";", " ", "\uff1b", " ").Replace(strings.ToLower(strings.TrimSpace(s)))
+	parts := strings.Fields(normalized)
+	out := make([]string, 0, len(parts))
+	for _, p := range parts {
+		if len(p) < 3 {
+			continue
+		}
+		out = append(out, p)
+	}
+	return out
+}
+
+// overlapScore counts the entries of b that also occur in a; repeated
+// entries in b are counted each time.
+func overlapScore(a, b []string) int {
+	if len(a) == 0 || len(b) == 0 {
+		return 0
+	}
+	set := make(map[string]struct{}, len(a))
+	for _, k := range a {
+		set[k] = struct{}{}
+	}
+	score := 0
+	for _, k := range b {
+		if _, ok := set[k]; ok {
+			score++
+		}
+	}
+	return score
+}
+
 func cloneMetadata(m map[string]string) map[string]string {
 	if len(m) == 0 {
 		return nil
diff --git a/pkg/agent/session_planner_test.go b/pkg/agent/session_planner_test.go
index 165bf2f..11059be 100644
--- a/pkg/agent/session_planner_test.go
+++ b/pkg/agent/session_planner_test.go
@@ -2,9 +2,12 @@ package agent
 
 import (
 	"context"
+	"os"
+	"path/filepath"
 	"testing"
 
 	"clawgo/pkg/bus"
+	"clawgo/pkg/ekg"
 	"clawgo/pkg/providers"
 )
 
@@ -50,3 +53,31 @@ func TestProcessPlannedMessage_AggregatesResults(t *testing.T) {
 		t.Fatalf("expected aggregate response")
 	}
 }
+
+// TestFindRecentRelatedErrorEvent writes one failed audit row and checks that
+// a related task matches it and yields a non-empty EKG hint.
+func TestFindRecentRelatedErrorEvent(t *testing.T) {
+	ws := filepath.Join(t.TempDir(), "workspace")
+	if err := os.MkdirAll(filepath.Join(ws, "memory"), 0o755); err != nil {
+		t.Fatalf("mkdir workspace: %v", err)
+	}
+	line := `{"task_id":"t1","status":"error","log":"open /tmp/a.go failed","input_preview":"修复 pkg/a.go 的读取错误","source":"direct","channel":"cli"}`
+	if err := os.WriteFile(filepath.Join(ws, "memory", "task-audit.jsonl"), []byte(line+"\n"), 0o644); err != nil {
+		t.Fatalf("write audit: %v", err)
+	}
+	loop := &AgentLoop{workspace: ws, ekg: ekg.New(ws)}
+	loop.ekg.Record(ekg.Event{TaskID: "t1", Status: "error", Log: "open /tmp/a.go failed"})
+	loop.ekg.Record(ekg.Event{TaskID: "t1", Status: "error", Log: "open /tmp/a.go failed"})
+	loop.ekg.Record(ekg.Event{TaskID: "t1", Status: "error", Log: "open /tmp/a.go failed"})
+
+	ev, ok := loop.findRecentRelatedErrorEvent("请修复 pkg/a.go 的读取问题")
+	if !ok {
+		t.Fatalf("expected matched recent error event")
+	}
+	if ev.TaskID != "t1" {
+		t.Fatalf("unexpected task id: %s", ev.TaskID)
+	}
+	if hint := loop.ekgHintForTask(plannedTask{Content: "修复 pkg/a.go"}); hint == "" {
+		t.Fatalf("expected non-empty ekg hint")
+	}
+}
diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go
index 1773835..668946f 100644
--- a/pkg/channels/telegram.go
+++ b/pkg/channels/telegram.go
@@ -65,7 +65,7 @@ type telegramRenderedChunk struct {
 
 func (c *TelegramChannel) SupportsAction(action string) bool {
 	switch strings.ToLower(strings.TrimSpace(action)) {
-	case "", "send", "edit", "delete", "react", "stream":
+	case "", "send", "edit", "delete", "react", "stream", "finalize":
 		return true
 	default:
 		return false
@@ -866,6 +866,8 @@ func renderTelegramStreamChunks(content string) []telegramRenderedChunk {
 		parts = splitTelegramMarkdown(body, telegramStreamSplitMaxRunes)
 	case telego.ModeHTML:
 		parts = splitTelegramText(body, telegramSafeHTMLMaxRunes)
+	case "text":
+		parts = splitTelegramText(body, telegramStreamSplitMaxRunes)
 	default:
 		parts = splitTelegramMarkdown(body, telegramStreamSplitMaxRunes)
 		mode = "auto_markdown"
@@ -894,6 +896,14 @@ func renderTelegramStreamChunks(content string) []telegramRenderedChunk {
 			if strings.TrimSpace(payload) != "" {
 				out = append(out, telegramRenderedChunk{payload: payload, parseMode: telego.ModeMarkdownV2})
 			}
+		case "text":
+			payload := trimmed
+			if len([]rune(payload)) > telegramStreamSplitMaxRunes {
+				payload = splitTelegramText(payload, telegramStreamSplitMaxRunes)[0]
+			}
+			if strings.TrimSpace(payload) != "" {
+				out = append(out, telegramRenderedChunk{payload: payload, parseMode: ""})
+			}
 		default:
 			payload := sanitizeTelegramHTML(markdownToTelegramHTML(trimmed))
 			if len([]rune(payload)) > telegramSafeHTMLMaxRunes {
@@ -914,6 +924,8 @@ func detectTelegramStreamMode(content string) (mode string, body string) {
 		return telego.ModeHTML, strings.TrimSpace(trimmed[len("[mode:html]"):])
 	case strings.HasPrefix(strings.ToLower(trimmed), "[mode:markdownv2]"):
 		return telego.ModeMarkdownV2, strings.TrimSpace(trimmed[len("[mode:markdownv2]"):])
+	case strings.HasPrefix(strings.ToLower(trimmed), "[mode:text]"):
+		return "text", strings.TrimSpace(trimmed[len("[mode:text]"):])
 	default:
 		return "auto_markdown", content
 	}
@@ -1068,6 +1080,23 @@ func (c *TelegramChannel) handleAction(ctx context.Context, chatID int64, action
 		return err
 	case "stream":
 		return c.handleStreamAction(ctx, chatID, msg)
+	case "finalize":
+		if strings.TrimSpace(msg.Content) != "" {
+			// Final pass in auto-markdown mode to recover rich formatting after plain streaming.
+			if err := c.handleStreamAction(ctx, chatID, bus.OutboundMessage{
+				ChatID:    msg.ChatID,
+				ReplyToID: msg.ReplyToID,
+				Content:   msg.Content,
+				Action:    "stream",
+			}); err != nil {
+				return err
+			}
+		}
+		streamKey := telegramStreamKey(chatID, msg.ReplyToID)
+		c.streamMu.Lock()
+		delete(c.streamState, streamKey)
+		c.streamMu.Unlock()
+		return nil
 	case "delete":
 		delCtx, cancel := withTelegramAPITimeout(ctx)
 		defer cancel()