p0 reliability: add content-hash inbound dedupe, cache ekg stats parsing, and collapse task-queue attempts/log noise

This commit is contained in:
DBT
2026-03-01 11:14:48 +00:00
parent 048d781818
commit b90cc5c40a
3 changed files with 92 additions and 31 deletions

View File

@@ -3,6 +3,7 @@ package channels
import (
"context"
"fmt"
"hash/fnv"
"path/filepath"
"strings"
"sync"
@@ -83,29 +84,35 @@ func (c *BaseChannel) IsAllowed(senderID string) bool {
return false
}
func (c *BaseChannel) isDuplicateInboundMessage(messageID string) bool {
messageID = strings.TrimSpace(messageID)
if messageID == "" {
func (c *BaseChannel) seenRecently(key string, ttl time.Duration) bool {
key = strings.TrimSpace(key)
if key == "" {
return false
}
now := time.Now()
const ttl = 10 * time.Minute
c.recentMsgMu.Lock()
defer c.recentMsgMu.Unlock()
for id, ts := range c.recentMsg {
if now.Sub(ts) > ttl {
if now.Sub(ts) > 10*time.Minute {
delete(c.recentMsg, id)
}
}
if ts, ok := c.recentMsg[messageID]; ok {
if ts, ok := c.recentMsg[key]; ok {
if now.Sub(ts) <= ttl {
return true
}
}
c.recentMsg[messageID] = now
c.recentMsg[key] = now
return false
}
func messageDigest(s string) string {
s = strings.ToLower(strings.TrimSpace(s))
h := fnv.New32a()
_, _ = h.Write([]byte(s))
return fmt.Sprintf("%08x", h.Sum32())
}
func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []string, metadata map[string]string) {
if !c.IsAllowed(senderID) {
logger.WarnCF("channels", "Message rejected by allowlist", map[string]interface{}{
@@ -118,7 +125,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st
if metadata != nil {
if messageID := strings.TrimSpace(metadata["message_id"]); messageID != "" {
if c.isDuplicateInboundMessage(c.name + ":" + messageID) {
if c.seenRecently(c.name+":"+messageID, 10*time.Minute) {
logger.WarnCF("channels", "Duplicate inbound message skipped", map[string]interface{}{
logger.FieldChannel: c.name,
"message_id": messageID,
@@ -128,6 +135,15 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st
}
}
}
// Fallback dedupe when platform omits/changes message_id (short window, same sender/chat/content).
contentKey := c.name + ":content:" + chatID + ":" + senderID + ":" + messageDigest(content)
if c.seenRecently(contentKey, 12*time.Second) {
logger.WarnCF("channels", "Duplicate inbound content skipped", map[string]interface{}{
logger.FieldChannel: c.name,
logger.FieldChatID: chatID,
})
return
}
// Build session key: channel:chatID
sessionKey := fmt.Sprintf("%s:%s", c.name, chatID)

View File

@@ -18,6 +18,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"
@@ -37,6 +38,11 @@ type RegistryServer struct {
onConfigAfter func()
onCron func(action string, args map[string]interface{}) (interface{}, error)
webUIDir string
ekgCacheMu sync.Mutex
ekgCachePath string
ekgCacheStamp time.Time
ekgCacheSize int64
ekgCacheRows []map[string]interface{}
}
func NewRegistryServer(host string, port int, token string, mgr *Manager) *RegistryServer {
@@ -1504,8 +1510,9 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req
}
lines := strings.Split(string(b), "\n")
type agg struct {
Last map[string]interface{}
Logs []string
Last map[string]interface{}
Logs []string
Attempts int
}
m := map[string]*agg{}
for _, ln := range lines {
@@ -1525,11 +1532,18 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req
continue
}
if _, ok := m[id]; !ok {
m[id] = &agg{Last: row, Logs: []string{}}
m[id] = &agg{Last: row, Logs: []string{}, Attempts: 0}
}
m[id].Last = row
if lg := fmt.Sprintf("%v", row["log"]); lg != "" {
m[id].Logs = append(m[id].Logs, lg)
a := m[id]
a.Last = row
a.Attempts++
if lg := strings.TrimSpace(fmt.Sprintf("%v", row["log"])); lg != "" {
if len(a.Logs) == 0 || a.Logs[len(a.Logs)-1] != lg {
a.Logs = append(a.Logs, lg)
if len(a.Logs) > 20 {
a.Logs = a.Logs[len(a.Logs)-20:]
}
}
}
}
items := make([]map[string]interface{}, 0, len(m))
@@ -1537,6 +1551,7 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req
for _, a := range m {
row := a.Last
row["logs"] = a.Logs
row["attempts"] = a.Attempts
items = append(items, row)
if fmt.Sprintf("%v", row["status"]) == "running" {
running = append(running, row)
@@ -1581,6 +1596,7 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req
}
}
sort.Slice(items, func(i, j int) bool { return fmt.Sprintf("%v", items[i]["time"]) > fmt.Sprintf("%v", items[j]["time"]) })
stats := map[string]int{"total": len(items), "running": len(running), "idle_round_budget": 0, "active_user": 0, "manual_pause": 0}
for _, it := range items {
reason := fmt.Sprintf("%v", it["block_reason"])
@@ -1628,6 +1644,48 @@ func (s *RegistryServer) handleWebUITaskDailySummary(w http.ResponseWriter, r *h
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "date": date, "report": report})
}
func (s *RegistryServer) loadEKGRowsCached(path string, maxLines int) []map[string]interface{} {
path = strings.TrimSpace(path)
if path == "" {
return nil
}
fi, err := os.Stat(path)
if err != nil {
return nil
}
s.ekgCacheMu.Lock()
defer s.ekgCacheMu.Unlock()
if s.ekgCachePath == path && s.ekgCacheSize == fi.Size() && s.ekgCacheStamp.Equal(fi.ModTime()) && len(s.ekgCacheRows) > 0 {
return s.ekgCacheRows
}
b, err := os.ReadFile(path)
if err != nil {
return nil
}
lines := strings.Split(string(b), "\n")
if len(lines) > 0 && lines[len(lines)-1] == "" {
lines = lines[:len(lines)-1]
}
if maxLines > 0 && len(lines) > maxLines {
lines = lines[len(lines)-maxLines:]
}
rows := make([]map[string]interface{}, 0, len(lines))
for _, ln := range lines {
if strings.TrimSpace(ln) == "" {
continue
}
var row map[string]interface{}
if json.Unmarshal([]byte(ln), &row) == nil {
rows = append(rows, row)
}
}
s.ekgCachePath = path
s.ekgCacheSize = fi.Size()
s.ekgCacheStamp = fi.ModTime()
s.ekgCacheRows = rows
return rows
}
func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
@@ -1654,14 +1712,7 @@ func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Requ
selectedWindow = "24h"
}
cutoff := time.Now().UTC().Add(-windowDur)
b, _ := os.ReadFile(ekgPath)
lines := strings.Split(string(b), "\n")
if len(lines) > 0 && lines[len(lines)-1] == "" {
lines = lines[:len(lines)-1]
}
if len(lines) > 3000 {
lines = lines[len(lines)-3000:]
}
rows := s.loadEKGRowsCached(ekgPath, 3000)
type kv struct {
Key string `json:"key"`
Score float64 `json:"score,omitempty"`
@@ -1674,14 +1725,7 @@ func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Requ
errSigWorkload := map[string]int{}
sourceStats := map[string]int{}
channelStats := map[string]int{}
for _, ln := range lines {
if strings.TrimSpace(ln) == "" {
continue
}
var row map[string]interface{}
if json.Unmarshal([]byte(ln), &row) != nil {
continue
}
for _, row := range rows {
ts := strings.TrimSpace(fmt.Sprintf("%v", row["time"]))
if ts != "" {
if tm, err := time.Parse(time.RFC3339, ts); err == nil {

View File

@@ -17,6 +17,7 @@ type TaskAuditItem = {
last_pause_at?: string;
duration_ms?: number;
retry_count?: number;
attempts?: number;
error?: string;
provider?: string;
model?: string;
@@ -155,7 +156,7 @@ const TaskAudit: React.FC = () => {
className={`w-full text-left px-3 py-2 border-b border-zinc-800/60 hover:bg-zinc-800/40 ${active ? 'bg-indigo-500/15' : ''}`}
>
<div className="text-sm font-medium text-zinc-100 truncate">{it.task_id || `task-${idx + 1}`}</div>
<div className="text-xs text-zinc-400 truncate">{it.channel || '-'} · {it.status} · {it.duration_ms || 0}ms · retry:{it.retry_count || 0} · {it.source || '-'} · {it.provider || '-'} / {it.model || '-'}</div>
<div className="text-xs text-zinc-400 truncate">{it.channel || '-'} · {it.status} · attempts:{it.attempts || 1} · {it.duration_ms || 0}ms · retry:{it.retry_count || 0} · {it.source || '-'} · {it.provider || '-'} / {it.model || '-'}</div>
<div className="text-[11px] text-zinc-500 truncate">{it.time}</div>
</button>
);