diff --git a/pkg/channels/base.go b/pkg/channels/base.go index e18212f..c988f33 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -3,6 +3,7 @@ package channels import ( "context" "fmt" + "hash/fnv" "path/filepath" "strings" "sync" @@ -83,29 +84,35 @@ func (c *BaseChannel) IsAllowed(senderID string) bool { return false } -func (c *BaseChannel) isDuplicateInboundMessage(messageID string) bool { - messageID = strings.TrimSpace(messageID) - if messageID == "" { +func (c *BaseChannel) seenRecently(key string, ttl time.Duration) bool { + key = strings.TrimSpace(key) + if key == "" { return false } now := time.Now() - const ttl = 10 * time.Minute c.recentMsgMu.Lock() defer c.recentMsgMu.Unlock() for id, ts := range c.recentMsg { - if now.Sub(ts) > ttl { + if now.Sub(ts) > 10*time.Minute { delete(c.recentMsg, id) } } - if ts, ok := c.recentMsg[messageID]; ok { + if ts, ok := c.recentMsg[key]; ok { if now.Sub(ts) <= ttl { return true } } - c.recentMsg[messageID] = now + c.recentMsg[key] = now return false } +func messageDigest(s string) string { + s = strings.ToLower(strings.TrimSpace(s)) + h := fnv.New32a() + _, _ = h.Write([]byte(s)) + return fmt.Sprintf("%08x", h.Sum32()) +} + func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []string, metadata map[string]string) { if !c.IsAllowed(senderID) { logger.WarnCF("channels", "Message rejected by allowlist", map[string]interface{}{ @@ -118,7 +125,7 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st if metadata != nil { if messageID := strings.TrimSpace(metadata["message_id"]); messageID != "" { - if c.isDuplicateInboundMessage(c.name + ":" + messageID) { + if c.seenRecently(c.name+":"+messageID, 10*time.Minute) { logger.WarnCF("channels", "Duplicate inbound message skipped", map[string]interface{}{ logger.FieldChannel: c.name, "message_id": messageID, @@ -128,6 +135,15 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st } } } + // Fallback dedupe when platform omits/changes message_id (short window, same sender/chat/content). + contentKey := c.name + ":content:" + chatID + ":" + senderID + ":" + messageDigest(content) + if c.seenRecently(contentKey, 12*time.Second) { + logger.WarnCF("channels", "Duplicate inbound content skipped", map[string]interface{}{ + logger.FieldChannel: c.name, + logger.FieldChatID: chatID, + }) + return + } // Build session key: channel:chatID sessionKey := fmt.Sprintf("%s:%s", c.name, chatID) diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go index 83f25b4..b92238f 100644 --- a/pkg/nodes/registry_server.go +++ b/pkg/nodes/registry_server.go @@ -18,6 +18,7 @@ import ( "sort" "strconv" "strings" + "sync" "syscall" "time" @@ -37,6 +38,11 @@ type RegistryServer struct { onConfigAfter func() onCron func(action string, args map[string]interface{}) (interface{}, error) webUIDir string + ekgCacheMu sync.Mutex + ekgCachePath string + ekgCacheStamp time.Time + ekgCacheSize int64 + ekgCacheRows []map[string]interface{} } func NewRegistryServer(host string, port int, token string, mgr *Manager) *RegistryServer { @@ -1504,8 +1510,9 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req } lines := strings.Split(string(b), "\n") type agg struct { - Last map[string]interface{} - Logs []string + Last map[string]interface{} + Logs []string + Attempts int } m := map[string]*agg{} for _, ln := range lines { @@ -1525,11 +1532,18 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req continue } if _, ok := m[id]; !ok { - m[id] = &agg{Last: row, Logs: []string{}} + m[id] = &agg{Last: row, Logs: []string{}, Attempts: 0} } - m[id].Last = row - if lg := fmt.Sprintf("%v", row["log"]); lg != "" { - m[id].Logs = append(m[id].Logs, lg) + a := m[id] + a.Last = row + a.Attempts++ + if lg := strings.TrimSpace(fmt.Sprintf("%v", row["log"])); lg != "" { + if len(a.Logs) == 0 || a.Logs[len(a.Logs)-1] != lg { + a.Logs = append(a.Logs, lg) + if len(a.Logs) > 20 { + a.Logs = a.Logs[len(a.Logs)-20:] + } + } } } items := make([]map[string]interface{}, 0, len(m)) @@ -1537,6 +1551,7 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req for _, a := range m { row := a.Last row["logs"] = a.Logs + row["attempts"] = a.Attempts items = append(items, row) if fmt.Sprintf("%v", row["status"]) == "running" { running = append(running, row) @@ -1581,6 +1596,7 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req } } + sort.Slice(items, func(i, j int) bool { return fmt.Sprintf("%v", items[i]["time"]) > fmt.Sprintf("%v", items[j]["time"]) }) stats := map[string]int{"total": len(items), "running": len(running), "idle_round_budget": 0, "active_user": 0, "manual_pause": 0} for _, it := range items { reason := fmt.Sprintf("%v", it["block_reason"]) @@ -1628,6 +1644,48 @@ func (s *RegistryServer) handleWebUITaskDailySummary(w http.ResponseWriter, r *h _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "date": date, "report": report}) } +func (s *RegistryServer) loadEKGRowsCached(path string, maxLines int) []map[string]interface{} { + path = strings.TrimSpace(path) + if path == "" { + return nil + } + fi, err := os.Stat(path) + if err != nil { + return nil + } + s.ekgCacheMu.Lock() + defer s.ekgCacheMu.Unlock() + if s.ekgCachePath == path && s.ekgCacheSize == fi.Size() && s.ekgCacheStamp.Equal(fi.ModTime()) && len(s.ekgCacheRows) > 0 { + return s.ekgCacheRows + } + b, err := os.ReadFile(path) + if err != nil { + return nil + } + lines := strings.Split(string(b), "\n") + if len(lines) > 0 && lines[len(lines)-1] == "" { + lines = lines[:len(lines)-1] + } + if maxLines > 0 && len(lines) > maxLines { + lines = lines[len(lines)-maxLines:] + } + rows := make([]map[string]interface{}, 0, len(lines)) + for _, ln := range lines { + if strings.TrimSpace(ln) == "" { + continue + } + var row map[string]interface{} + if json.Unmarshal([]byte(ln), &row) == nil { + rows = append(rows, row) + } + } + s.ekgCachePath = path + s.ekgCacheSize = fi.Size() + s.ekgCacheStamp = fi.ModTime() + s.ekgCacheRows = rows + return rows +} + func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Request) { if !s.checkAuth(r) { http.Error(w, "unauthorized", http.StatusUnauthorized) @@ -1654,14 +1712,7 @@ func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Requ selectedWindow = "24h" } cutoff := time.Now().UTC().Add(-windowDur) - b, _ := os.ReadFile(ekgPath) - lines := strings.Split(string(b), "\n") - if len(lines) > 0 && lines[len(lines)-1] == "" { - lines = lines[:len(lines)-1] - } - if len(lines) > 3000 { - lines = lines[len(lines)-3000:] - } + rows := s.loadEKGRowsCached(ekgPath, 3000) type kv struct { Key string `json:"key"` Score float64 `json:"score,omitempty"` @@ -1674,14 +1725,7 @@ func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Requ errSigWorkload := map[string]int{} sourceStats := map[string]int{} channelStats := map[string]int{} - for _, ln := range lines { - if strings.TrimSpace(ln) == "" { - continue - } - var row map[string]interface{} - if json.Unmarshal([]byte(ln), &row) != nil { - continue - } + for _, row := range rows { ts := strings.TrimSpace(fmt.Sprintf("%v", row["time"])) if ts != "" { if tm, err := time.Parse(time.RFC3339, ts); err == nil { diff --git a/webui/src/pages/TaskAudit.tsx b/webui/src/pages/TaskAudit.tsx index 435e465..d692a33 100644 --- a/webui/src/pages/TaskAudit.tsx +++ b/webui/src/pages/TaskAudit.tsx @@ -17,6 +17,7 @@ type TaskAuditItem = { last_pause_at?: string; duration_ms?: number; retry_count?: number; + attempts?: number; error?: string; provider?: string; model?: string; @@ -155,7 +156,7 @@ const TaskAudit: React.FC = () => { className={`w-full text-left px-3 py-2 border-b border-zinc-800/60 hover:bg-zinc-800/40 ${active ? 'bg-indigo-500/15' : ''}`} >