mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-09 04:27:30 +08:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
09c06786a4 | ||
|
|
57961d2911 | ||
|
|
764fa94ced | ||
|
|
b007a403a1 |
@@ -1,13 +0,0 @@
|
||||
FROM golang:alpine AS builder
|
||||
WORKDIR /app
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
COPY . .
|
||||
# Sync root workspace templates into embed path before build
|
||||
RUN rm -rf ./cmd/clawgo/workspace && mkdir -p ./cmd/clawgo/workspace && cp -a ./workspace/. ./cmd/clawgo/workspace/
|
||||
RUN go build -o clawgo ./cmd/clawgo
|
||||
|
||||
FROM alpine:latest
|
||||
WORKDIR /root/
|
||||
COPY --from=builder /app/clawgo .
|
||||
CMD ["./clawgo"]
|
||||
17
README.md
17
README.md
@@ -19,13 +19,28 @@
|
||||
|
||||
## 并发调度(新)⚙️
|
||||
|
||||
- 🧩 一条复合消息会先自动拆成多个子任务(可配置上限)
|
||||
- 🧩 一条复合消息会先自动拆成多个子任务
|
||||
- 🔀 同一会话内:无资源冲突的子任务可并发执行
|
||||
- 🔒 同一会话内:有资源冲突的子任务自动串行(避免互相踩)
|
||||
- 🏷️ 默认会自动推断 `resource_keys`,无需手动填写
|
||||
|
||||
---
|
||||
|
||||
## 记忆与 EKG 增强(新)🧠
|
||||
|
||||
- 📚 每个子任务执行前会自动检索相关记忆(`memory_search`)
|
||||
- 🚨 会结合 EKG 与任务审计识别“重复错误签名”风险
|
||||
- ⏱️ 高风险任务会自动附带重试退避建议,减少重复踩坑
|
||||
|
||||
---
|
||||
|
||||
## Telegram 流式渲染优化(新)💬
|
||||
|
||||
- 🧷 只在语法安全断点刷新流式内容(减少格式抖动)
|
||||
- ✅ 结束时做一次最终收敛渲染,保证排版稳定
|
||||
|
||||
---
|
||||
|
||||
## 3 分钟上手 ⚡
|
||||
|
||||
### 1) 安装
|
||||
|
||||
17
README_EN.md
17
README_EN.md
@@ -19,13 +19,28 @@ A long-running AI Agent written in Go: lightweight, auditable, and multi-channel
|
||||
|
||||
## Concurrency Scheduling (New) ⚙️
|
||||
|
||||
- 🧩 A composite message is auto-split into sub-tasks (configurable max)
|
||||
- 🧩 A composite message is automatically split into sub-tasks
|
||||
- 🔀 Within the same session: non-conflicting tasks run in parallel
|
||||
- 🔒 Within the same session: conflicting tasks run serially (to avoid collisions)
|
||||
- 🏷️ `resource_keys` are inferred automatically by default (no manual input needed)
|
||||
|
||||
---
|
||||
|
||||
## Memory + EKG Enhancements (New) 🧠
|
||||
|
||||
- 📚 Before each sub-task runs, related memory is fetched via `memory_search`
|
||||
- 🚨 EKG + task-audit are used to detect repeated error-signature risks
|
||||
- ⏱️ High-risk tasks include retry-backoff hints to reduce repeated failures
|
||||
|
||||
---
|
||||
|
||||
## Telegram Streaming Rendering (New) 💬
|
||||
|
||||
- 🧷 Stream flushes happen at syntax-safe boundaries to reduce formatting jitter
|
||||
- ✅ A final convergence render is applied when streaming ends for stable layout
|
||||
|
||||
---
|
||||
|
||||
## 3-Minute Quick Start ⚡
|
||||
|
||||
### 1) Install
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"clawgo/pkg/bus"
|
||||
"clawgo/pkg/cron"
|
||||
)
|
||||
|
||||
func TestNormalizeCronTargetChatID(t *testing.T) {
|
||||
if got := normalizeCronTargetChatID("telegram", "telegram:12345"); got != "12345" {
|
||||
t.Fatalf("expected 12345, got %q", got)
|
||||
}
|
||||
if got := normalizeCronTargetChatID("telegram", "12345"); got != "12345" {
|
||||
t.Fatalf("expected unchanged chat id, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatchCronJob_DeliversTargetedMessageEvenWhenDeliverFalse(t *testing.T) {
|
||||
mb := bus.NewMessageBus()
|
||||
defer mb.Close()
|
||||
|
||||
status := dispatchCronJob(mb, &cron.CronJob{
|
||||
ID: "job-1",
|
||||
Payload: cron.CronPayload{
|
||||
Message: "time to sleep",
|
||||
Deliver: false,
|
||||
Channel: "telegram",
|
||||
To: "telegram:5988738763",
|
||||
},
|
||||
})
|
||||
|
||||
if status != "delivered_targeted" {
|
||||
t.Fatalf("unexpected status: %s", status)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
out, ok := mb.SubscribeOutbound(ctx)
|
||||
if !ok {
|
||||
t.Fatal("expected outbound message")
|
||||
}
|
||||
if out.Channel != "telegram" || out.ChatID != "5988738763" || out.Content != "time to sleep" {
|
||||
t.Fatalf("unexpected outbound: %#v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatchCronJob_FallsBackToSystemInboundWithoutTarget(t *testing.T) {
|
||||
mb := bus.NewMessageBus()
|
||||
defer mb.Close()
|
||||
|
||||
status := dispatchCronJob(mb, &cron.CronJob{ID: "job-2", Payload: cron.CronPayload{Message: "tick"}})
|
||||
if status != "scheduled" {
|
||||
t.Fatalf("unexpected status: %s", status)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
in, ok := mb.ConsumeInbound(ctx)
|
||||
if !ok {
|
||||
t.Fatal("expected inbound message")
|
||||
}
|
||||
if in.Channel != "system" || in.ChatID != "internal:cron" {
|
||||
t.Fatalf("unexpected inbound: %#v", in)
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"clawgo/pkg/bus"
|
||||
"clawgo/pkg/config"
|
||||
@@ -452,7 +453,12 @@ func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage)
|
||||
}
|
||||
}
|
||||
if msg.Channel == "telegram" && suppressed {
|
||||
al.bus.PublishOutbound(bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Action: "finalize"})
|
||||
replyID := ""
|
||||
if msg.Metadata != nil {
|
||||
replyID = msg.Metadata["message_id"]
|
||||
}
|
||||
// Final pass uses full formatted content to stabilize rendering after plain streaming.
|
||||
al.bus.PublishOutbound(bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Action: "finalize", Content: response, ReplyToID: replyID})
|
||||
}
|
||||
al.audit.Record(trigger, msg.Channel, msg.SessionKey, suppressed, err)
|
||||
al.appendTaskAudit(taskID, msg, started, err, suppressed)
|
||||
@@ -840,11 +846,15 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
|
||||
if time.Since(lastPush) < 450*time.Millisecond {
|
||||
return
|
||||
}
|
||||
if !shouldFlushTelegramStreamSnapshot(streamText) {
|
||||
return
|
||||
}
|
||||
lastPush = time.Now()
|
||||
replyID := ""
|
||||
if msg.Metadata != nil {
|
||||
replyID = msg.Metadata["message_id"]
|
||||
}
|
||||
// Stream with formatted rendering once snapshot is syntactically safe.
|
||||
al.bus.PublishOutbound(bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Content: streamText, Action: "stream", ReplyToID: replyID})
|
||||
al.markSessionStreamed(msg.SessionKey)
|
||||
})
|
||||
@@ -1679,6 +1689,32 @@ func shouldDropNoReply(text string) bool {
|
||||
return strings.EqualFold(t, "NO_REPLY")
|
||||
}
|
||||
|
||||
func shouldFlushTelegramStreamSnapshot(s string) bool {
|
||||
s = strings.TrimRight(s, " \t")
|
||||
if s == "" {
|
||||
return false
|
||||
}
|
||||
last, _ := utf8.DecodeLastRuneInString(s)
|
||||
switch last {
|
||||
case '\n', '。', '!', '?', '.', '!', '?', ';', ';', ':', ':':
|
||||
default:
|
||||
return false
|
||||
}
|
||||
// Avoid flushing while code fences are still unbalanced.
|
||||
if strings.Count(s, "```")%2 == 1 {
|
||||
return false
|
||||
}
|
||||
// Avoid flushing while common inline markdown markers are unbalanced.
|
||||
if strings.Count(s, "**")%2 == 1 || strings.Count(s, "__")%2 == 1 || strings.Count(s, "~~")%2 == 1 {
|
||||
return false
|
||||
}
|
||||
// Rough guard for links/images: require bracket balance before flushing.
|
||||
if strings.Count(s, "[") != strings.Count(s, "]") || strings.Count(s, "(") != strings.Count(s, ")") {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func parseReplyTag(text string, currentMessageID string) (content string, replyToID string) {
|
||||
t := strings.TrimSpace(text)
|
||||
if !strings.HasPrefix(t, "[[") {
|
||||
@@ -1724,7 +1760,8 @@ func rewriteSystemMessageContent(content, template string) string {
|
||||
func alSessionListForTool(sm *session.SessionManager, limit int) []tools.SessionInfo {
|
||||
items := sm.List(limit)
|
||||
out := make([]tools.SessionInfo, 0, len(items))
|
||||
for _, s := range items {
|
||||
for i := range items {
|
||||
s := &items[i]
|
||||
out = append(out, tools.SessionInfo{
|
||||
Key: s.Key,
|
||||
Kind: s.Kind,
|
||||
|
||||
@@ -1,14 +1,19 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"clawgo/pkg/bus"
|
||||
"clawgo/pkg/ekg"
|
||||
"clawgo/pkg/scheduling"
|
||||
)
|
||||
|
||||
@@ -121,7 +126,7 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage
|
||||
go func(index int, t plannedTask) {
|
||||
defer wg.Done()
|
||||
subMsg := msg
|
||||
subMsg.Content = t.Content
|
||||
subMsg.Content = al.enrichTaskContentWithMemoryAndEKG(ctx, t)
|
||||
subMsg.Metadata = cloneMetadata(msg.Metadata)
|
||||
if subMsg.Metadata == nil {
|
||||
subMsg.Metadata = map[string]string{}
|
||||
@@ -157,6 +162,165 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage
|
||||
return strings.TrimSpace(b.String()), nil
|
||||
}
|
||||
|
||||
func (al *AgentLoop) enrichTaskContentWithMemoryAndEKG(ctx context.Context, task plannedTask) string {
|
||||
base := strings.TrimSpace(task.Content)
|
||||
if base == "" {
|
||||
return base
|
||||
}
|
||||
hints := make([]string, 0, 2)
|
||||
if mem := al.memoryHintForTask(ctx, task); mem != "" {
|
||||
hints = append(hints, "Memory:\n"+mem)
|
||||
}
|
||||
if risk := al.ekgHintForTask(task); risk != "" {
|
||||
hints = append(hints, "EKG:\n"+risk)
|
||||
}
|
||||
if len(hints) == 0 {
|
||||
return base
|
||||
}
|
||||
return strings.TrimSpace(
|
||||
"Task Context (use it as constraints, avoid repeating known failures):\n" +
|
||||
strings.Join(hints, "\n\n") +
|
||||
"\n\nTask:\n" + base,
|
||||
)
|
||||
}
|
||||
|
||||
func (al *AgentLoop) memoryHintForTask(ctx context.Context, task plannedTask) string {
|
||||
if al == nil || al.tools == nil {
|
||||
return ""
|
||||
}
|
||||
res, err := al.tools.Execute(ctx, "memory_search", map[string]interface{}{
|
||||
"query": task.Content,
|
||||
"maxResults": 2,
|
||||
})
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
txt := strings.TrimSpace(res)
|
||||
if txt == "" || strings.HasPrefix(strings.ToLower(txt), "no memory found") {
|
||||
return ""
|
||||
}
|
||||
return truncate(txt, 1200)
|
||||
}
|
||||
|
||||
func (al *AgentLoop) ekgHintForTask(task plannedTask) string {
|
||||
if al == nil || al.ekg == nil || strings.TrimSpace(al.workspace) == "" {
|
||||
return ""
|
||||
}
|
||||
evt, ok := al.findRecentRelatedErrorEvent(task.Content)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
errSig := ekg.NormalizeErrorSignature(evt.Log)
|
||||
if errSig == "" {
|
||||
return ""
|
||||
}
|
||||
advice := al.ekg.GetAdvice(ekg.SignalContext{
|
||||
TaskID: evt.TaskID,
|
||||
ErrSig: errSig,
|
||||
Source: evt.Source,
|
||||
Channel: evt.Channel,
|
||||
})
|
||||
if !advice.ShouldEscalate {
|
||||
return ""
|
||||
}
|
||||
reasons := strings.Join(advice.Reason, ", ")
|
||||
if strings.TrimSpace(reasons) == "" {
|
||||
reasons = "repeated error signature"
|
||||
}
|
||||
return fmt.Sprintf("Related repeated error signature detected (%s). Suggested retry backoff: %ds. Last error: %s",
|
||||
errSig, advice.RetryBackoffSec, truncate(strings.TrimSpace(evt.Log), 240))
|
||||
}
|
||||
|
||||
type taskAuditErrorEvent struct {
|
||||
TaskID string
|
||||
Source string
|
||||
Channel string
|
||||
Log string
|
||||
Preview string
|
||||
}
|
||||
|
||||
func (al *AgentLoop) findRecentRelatedErrorEvent(taskContent string) (taskAuditErrorEvent, bool) {
|
||||
path := filepath.Join(strings.TrimSpace(al.workspace), "memory", "task-audit.jsonl")
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return taskAuditErrorEvent{}, false
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
kw := tokenizeTaskText(taskContent)
|
||||
if len(kw) == 0 {
|
||||
return taskAuditErrorEvent{}, false
|
||||
}
|
||||
var best taskAuditErrorEvent
|
||||
bestScore := 0
|
||||
|
||||
s := bufio.NewScanner(f)
|
||||
for s.Scan() {
|
||||
line := strings.TrimSpace(s.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
var row map[string]interface{}
|
||||
if json.Unmarshal([]byte(line), &row) != nil {
|
||||
continue
|
||||
}
|
||||
if strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["status"]))) != "error" {
|
||||
continue
|
||||
}
|
||||
logText := strings.TrimSpace(fmt.Sprintf("%v", row["log"]))
|
||||
if logText == "" {
|
||||
continue
|
||||
}
|
||||
preview := strings.TrimSpace(fmt.Sprintf("%v", row["input_preview"]))
|
||||
score := overlapScore(kw, tokenizeTaskText(preview))
|
||||
if score < 1 || score < bestScore {
|
||||
continue
|
||||
}
|
||||
bestScore = score
|
||||
best = taskAuditErrorEvent{
|
||||
TaskID: strings.TrimSpace(fmt.Sprintf("%v", row["task_id"])),
|
||||
Source: strings.TrimSpace(fmt.Sprintf("%v", row["source"])),
|
||||
Channel: strings.TrimSpace(fmt.Sprintf("%v", row["channel"])),
|
||||
Log: logText,
|
||||
Preview: preview,
|
||||
}
|
||||
}
|
||||
if bestScore == 0 || strings.TrimSpace(best.TaskID) == "" {
|
||||
return taskAuditErrorEvent{}, false
|
||||
}
|
||||
return best, true
|
||||
}
|
||||
|
||||
func tokenizeTaskText(s string) []string {
|
||||
normalized := strings.NewReplacer("\n", " ", "\t", " ", ",", " ", ",", " ", ".", " ", "。", " ", ":", " ", ":", " ", ";", " ", ";", " ").Replace(strings.ToLower(strings.TrimSpace(s)))
|
||||
parts := strings.Fields(normalized)
|
||||
out := make([]string, 0, len(parts))
|
||||
for _, p := range parts {
|
||||
if len(p) < 3 {
|
||||
continue
|
||||
}
|
||||
out = append(out, p)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func overlapScore(a, b []string) int {
|
||||
if len(a) == 0 || len(b) == 0 {
|
||||
return 0
|
||||
}
|
||||
set := make(map[string]struct{}, len(a))
|
||||
for _, k := range a {
|
||||
set[k] = struct{}{}
|
||||
}
|
||||
score := 0
|
||||
for _, k := range b {
|
||||
if _, ok := set[k]; ok {
|
||||
score++
|
||||
}
|
||||
}
|
||||
return score
|
||||
}
|
||||
|
||||
func cloneMetadata(m map[string]string) map[string]string {
|
||||
if len(m) == 0 {
|
||||
return nil
|
||||
|
||||
@@ -2,9 +2,12 @@ package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"clawgo/pkg/bus"
|
||||
"clawgo/pkg/ekg"
|
||||
"clawgo/pkg/providers"
|
||||
)
|
||||
|
||||
@@ -50,3 +53,27 @@ func TestProcessPlannedMessage_AggregatesResults(t *testing.T) {
|
||||
t.Fatalf("expected aggregate response")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindRecentRelatedErrorEvent(t *testing.T) {
|
||||
ws := filepath.Join(t.TempDir(), "workspace")
|
||||
_ = os.MkdirAll(filepath.Join(ws, "memory"), 0o755)
|
||||
line := `{"task_id":"t1","status":"error","log":"open /tmp/a.go failed","input_preview":"修复 pkg/a.go 的读取错误","source":"direct","channel":"cli"}`
|
||||
if err := os.WriteFile(filepath.Join(ws, "memory", "task-audit.jsonl"), []byte(line+"\n"), 0o644); err != nil {
|
||||
t.Fatalf("write audit: %v", err)
|
||||
}
|
||||
loop := &AgentLoop{workspace: ws, ekg: ekg.New(ws)}
|
||||
loop.ekg.Record(ekg.Event{TaskID: "t1", Status: "error", Log: "open /tmp/a.go failed"})
|
||||
loop.ekg.Record(ekg.Event{TaskID: "t1", Status: "error", Log: "open /tmp/a.go failed"})
|
||||
loop.ekg.Record(ekg.Event{TaskID: "t1", Status: "error", Log: "open /tmp/a.go failed"})
|
||||
|
||||
ev, ok := loop.findRecentRelatedErrorEvent("请修复 pkg/a.go 的读取问题")
|
||||
if !ok {
|
||||
t.Fatalf("expected matched recent error event")
|
||||
}
|
||||
if ev.TaskID != "t1" {
|
||||
t.Fatalf("unexpected task id: %s", ev.TaskID)
|
||||
}
|
||||
if hint := loop.ekgHintForTask(plannedTask{Content: "修复 pkg/a.go"}); hint == "" {
|
||||
t.Fatalf("expected non-empty ekg hint")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ type telegramRenderedChunk struct {
|
||||
|
||||
func (c *TelegramChannel) SupportsAction(action string) bool {
|
||||
switch strings.ToLower(strings.TrimSpace(action)) {
|
||||
case "", "send", "edit", "delete", "react", "stream":
|
||||
case "", "send", "edit", "delete", "react", "stream", "finalize":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
@@ -866,6 +866,8 @@ func renderTelegramStreamChunks(content string) []telegramRenderedChunk {
|
||||
parts = splitTelegramMarkdown(body, telegramStreamSplitMaxRunes)
|
||||
case telego.ModeHTML:
|
||||
parts = splitTelegramText(body, telegramSafeHTMLMaxRunes)
|
||||
case "text":
|
||||
parts = splitTelegramText(body, telegramStreamSplitMaxRunes)
|
||||
default:
|
||||
parts = splitTelegramMarkdown(body, telegramStreamSplitMaxRunes)
|
||||
mode = "auto_markdown"
|
||||
@@ -894,6 +896,14 @@ func renderTelegramStreamChunks(content string) []telegramRenderedChunk {
|
||||
if strings.TrimSpace(payload) != "" {
|
||||
out = append(out, telegramRenderedChunk{payload: payload, parseMode: telego.ModeMarkdownV2})
|
||||
}
|
||||
case "text":
|
||||
payload := trimmed
|
||||
if len([]rune(payload)) > telegramStreamSplitMaxRunes {
|
||||
payload = splitTelegramText(payload, telegramStreamSplitMaxRunes)[0]
|
||||
}
|
||||
if strings.TrimSpace(payload) != "" {
|
||||
out = append(out, telegramRenderedChunk{payload: payload, parseMode: ""})
|
||||
}
|
||||
default:
|
||||
payload := sanitizeTelegramHTML(markdownToTelegramHTML(trimmed))
|
||||
if len([]rune(payload)) > telegramSafeHTMLMaxRunes {
|
||||
@@ -914,6 +924,8 @@ func detectTelegramStreamMode(content string) (mode string, body string) {
|
||||
return telego.ModeHTML, strings.TrimSpace(trimmed[len("[mode:html]"):])
|
||||
case strings.HasPrefix(strings.ToLower(trimmed), "[mode:markdownv2]"):
|
||||
return telego.ModeMarkdownV2, strings.TrimSpace(trimmed[len("[mode:markdownv2]"):])
|
||||
case strings.HasPrefix(strings.ToLower(trimmed), "[mode:text]"):
|
||||
return "text", strings.TrimSpace(trimmed[len("[mode:text]"):])
|
||||
default:
|
||||
return "auto_markdown", content
|
||||
}
|
||||
@@ -1056,7 +1068,7 @@ func (c *TelegramChannel) deleteTelegramMessageWithRetry(ctx context.Context, ch
|
||||
|
||||
func (c *TelegramChannel) handleAction(ctx context.Context, chatID int64, action string, msg bus.OutboundMessage) error {
|
||||
messageID, ok := parseTelegramMessageID(msg.MessageID)
|
||||
if !ok && action != "send" && action != "stream" {
|
||||
if !ok && action != "send" && action != "stream" && action != "finalize" {
|
||||
return fmt.Errorf("message_id required for action=%s", action)
|
||||
}
|
||||
switch action {
|
||||
@@ -1068,6 +1080,23 @@ func (c *TelegramChannel) handleAction(ctx context.Context, chatID int64, action
|
||||
return err
|
||||
case "stream":
|
||||
return c.handleStreamAction(ctx, chatID, msg)
|
||||
case "finalize":
|
||||
if strings.TrimSpace(msg.Content) != "" {
|
||||
// Final pass in auto-markdown mode to recover rich formatting after plain streaming.
|
||||
if err := c.handleStreamAction(ctx, chatID, bus.OutboundMessage{
|
||||
ChatID: msg.ChatID,
|
||||
ReplyToID: msg.ReplyToID,
|
||||
Content: msg.Content,
|
||||
Action: "stream",
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
streamKey := telegramStreamKey(chatID, msg.ReplyToID)
|
||||
c.streamMu.Lock()
|
||||
delete(c.streamState, streamKey)
|
||||
c.streamMu.Unlock()
|
||||
return nil
|
||||
case "delete":
|
||||
delCtx, cancel := withTelegramAPITimeout(ctx)
|
||||
defer cancel()
|
||||
|
||||
Reference in New Issue
Block a user