diff --git a/pkg/channels/base.go b/pkg/channels/base.go index 53c1101..e18212f 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -5,7 +5,9 @@ import ( "fmt" "path/filepath" "strings" + "sync" "sync/atomic" + "time" "clawgo/pkg/bus" "clawgo/pkg/logger" @@ -32,6 +34,8 @@ type BaseChannel struct { running atomic.Bool name string allowList []string + recentMsgMu sync.Mutex + recentMsg map[string]time.Time } func NewBaseChannel(name string, config interface{}, bus *bus.MessageBus, allowList []string) *BaseChannel { @@ -40,6 +44,7 @@ func NewBaseChannel(name string, config interface{}, bus *bus.MessageBus, allowL bus: bus, name: name, allowList: allowList, + recentMsg: map[string]time.Time{}, } } @@ -78,6 +83,29 @@ func (c *BaseChannel) IsAllowed(senderID string) bool { return false } +func (c *BaseChannel) isDuplicateInboundMessage(messageID string) bool { + messageID = strings.TrimSpace(messageID) + if messageID == "" { + return false + } + now := time.Now() + const ttl = 10 * time.Minute + c.recentMsgMu.Lock() + defer c.recentMsgMu.Unlock() + for id, ts := range c.recentMsg { + if now.Sub(ts) > ttl { + delete(c.recentMsg, id) + } + } + if ts, ok := c.recentMsg[messageID]; ok { + if now.Sub(ts) <= ttl { + return true + } + } + c.recentMsg[messageID] = now + return false +} + func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []string, metadata map[string]string) { if !c.IsAllowed(senderID) { logger.WarnCF("channels", "Message rejected by allowlist", map[string]interface{}{ @@ -88,6 +116,19 @@ func (c *BaseChannel) HandleMessage(senderID, chatID, content string, media []st return } + if metadata != nil { + if messageID := strings.TrimSpace(metadata["message_id"]); messageID != "" { + if c.isDuplicateInboundMessage(c.name + ":" + messageID) { + logger.WarnCF("channels", "Duplicate inbound message skipped", map[string]interface{}{ + logger.FieldChannel: c.name, + "message_id": messageID, + logger.FieldChatID: chatID, + }) + return + } + } + } + // Build session key: channel:chatID sessionKey := fmt.Sprintf("%s:%s", c.name, chatID) diff --git a/pkg/channels/feishu.go b/pkg/channels/feishu.go index 22e673b..1c643b2 100644 --- a/pkg/channels/feishu.go +++ b/pkg/channels/feishu.go @@ -41,9 +41,6 @@ type FeishuChannel struct { tenantAccessToken string tenantTokenExpire time.Time tenantTokenErr error - - recentMsgMu sync.Mutex - recentMsg map[string]time.Time } func (c *FeishuChannel) SupportsAction(action string) bool { @@ -62,7 +59,6 @@ func NewFeishuChannel(cfg config.FeishuConfig, bus *bus.MessageBus) (*FeishuChan BaseChannel: base, config: cfg, client: lark.NewClient(cfg.AppID, cfg.AppSecret), - recentMsg: map[string]time.Time{}, }, nil } @@ -226,12 +222,6 @@ func (c *FeishuChannel) handleMessageReceive(ctx context.Context, event *larkim. if sender != nil && sender.TenantKey != nil { metadata["tenant_key"] = *sender.TenantKey } - if messageID := metadata["message_id"]; messageID != "" { - if c.isDuplicateInboundMessage(messageID) { - logger.WarnCF("feishu", "Duplicate Feishu inbound message skipped", map[string]interface{}{"message_id": messageID, logger.FieldChatID: chatID}) - return nil - } - } logger.InfoCF("feishu", "Feishu message received", map[string]interface{}{ logger.FieldSenderID: senderID, @@ -332,29 +322,6 @@ func (c *FeishuChannel) isAllowedChat(chatID, chatType string) bool { return false } -func (c *FeishuChannel) isDuplicateInboundMessage(messageID string) bool { - messageID = strings.TrimSpace(messageID) - if messageID == "" { - return false - } - now := time.Now() - const ttl = 10 * time.Minute - c.recentMsgMu.Lock() - defer c.recentMsgMu.Unlock() - for id, ts := range c.recentMsg { - if now.Sub(ts) > ttl { - delete(c.recentMsg, id) - } - } - if ts, ok := c.recentMsg[messageID]; ok { - if now.Sub(ts) <= ttl { - return true - } - } - c.recentMsg[messageID] = now - return false -} - func (c *FeishuChannel) shouldHandleGroupMessage(chatType, content string) bool { chatType = strings.ToLower(strings.TrimSpace(chatType)) isGroup := chatType != "" && chatType != "p2p" diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go index 509f429..4636f5a 100644 --- a/pkg/nodes/registry_server.go +++ b/pkg/nodes/registry_server.go @@ -1637,8 +1637,18 @@ func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Requ if len(lines) > 3000 { lines = lines[len(lines)-3000:] } + type kv struct { + Key string `json:"key"` + Score float64 `json:"score,omitempty"` + Count int `json:"count,omitempty"` + } providerScore := map[string]float64{} + providerScoreWorkload := map[string]float64{} errSigCount := map[string]int{} + errSigHeartbeat := map[string]int{} + errSigWorkload := map[string]int{} + sourceStats := map[string]int{} + channelStats := map[string]int{} for _, ln := range lines { if strings.TrimSpace(ln) == "" { continue @@ -1650,40 +1660,52 @@ func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Requ provider := strings.TrimSpace(fmt.Sprintf("%v", row["provider"])) status := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["status"]))) errSig := strings.TrimSpace(fmt.Sprintf("%v", row["errsig"])) + source := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["source"]))) + channel := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["channel"]))) + if source == "" { + source = "unknown" + } + if channel == "" { + channel = "unknown" + } + sourceStats[source]++ + channelStats[channel]++ + isHeartbeat := source == "heartbeat" if provider != "" { switch status { case "success": providerScore[provider] += 1 + if !isHeartbeat { providerScoreWorkload[provider] += 1 } case "suppressed": providerScore[provider] += 0.2 + if !isHeartbeat { providerScoreWorkload[provider] += 0.2 } case "error": providerScore[provider] -= 1 + if !isHeartbeat { providerScoreWorkload[provider] -= 1 } } } if errSig != "" { errSigCount[errSig]++ + if isHeartbeat { + errSigHeartbeat[errSig]++ + } else { + errSigWorkload[errSig]++ + } } } - type kv struct { - Key string `json:"key"` - Score float64 `json:"score,omitempty"` - Count int `json:"count,omitempty"` + toTopScore := func(m map[string]float64, n int) []kv { + out := make([]kv, 0, len(m)) + for k, v := range m { out = append(out, kv{Key:k, Score:v}) } + sort.Slice(out, func(i,j int) bool { return out[i].Score > out[j].Score }) + if len(out) > n { out = out[:n] } + return out } - providerTop := make([]kv, 0, len(providerScore)) - for k, v := range providerScore { - providerTop = append(providerTop, kv{Key: k, Score: v}) - } - sort.Slice(providerTop, func(i, j int) bool { return providerTop[i].Score > providerTop[j].Score }) - if len(providerTop) > 5 { - providerTop = providerTop[:5] - } - errTop := make([]kv, 0, len(errSigCount)) - for k, v := range errSigCount { - errTop = append(errTop, kv{Key: k, Count: v}) - } - sort.Slice(errTop, func(i, j int) bool { return errTop[i].Count > errTop[j].Count }) - if len(errTop) > 5 { - errTop = errTop[:5] + toTopCount := func(m map[string]int, n int) []kv { + out := make([]kv, 0, len(m)) + for k, v := range m { out = append(out, kv{Key:k, Count:v}) } + sort.Slice(out, func(i,j int) bool { return out[i].Count > out[j].Count }) + if len(out) > n { out = out[:n] } + return out } escalations := 0 tasksPath := filepath.Join(workspace, "memory", "tasks.json") @@ -1698,10 +1720,15 @@ func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Requ } } _ = json.NewEncoder(w).Encode(map[string]interface{}{ - "ok": true, - "provider_top": providerTop, - "errsig_top": errTop, - "escalation_count": escalations, + "ok": true, + "provider_top": toTopScore(providerScore, 5), + "provider_top_workload": toTopScore(providerScoreWorkload, 5), + "errsig_top": toTopCount(errSigCount, 5), + "errsig_top_heartbeat": toTopCount(errSigHeartbeat, 5), + "errsig_top_workload": toTopCount(errSigWorkload, 5), + "source_stats": sourceStats, + "channel_stats": channelStats, + "escalation_count": escalations, }) } diff --git a/webui/src/pages/TaskAudit.tsx b/webui/src/pages/TaskAudit.tsx index 36c3129..1711cc1 100644 --- a/webui/src/pages/TaskAudit.tsx +++ b/webui/src/pages/TaskAudit.tsx @@ -40,7 +40,12 @@ const TaskAudit: React.FC = () => { const [reportDate, setReportDate] = useState(new Date().toISOString().slice(0,10)); const [showDailyReport, setShowDailyReport] = useState(false); const [ekgProviderTop, setEkgProviderTop] = useState([]); + const [ekgProviderTopWorkload, setEkgProviderTopWorkload] = useState([]); const [ekgErrsigTop, setEkgErrsigTop] = useState([]); + const [ekgErrsigTopHeartbeat, setEkgErrsigTopHeartbeat] = useState([]); + const [ekgErrsigTopWorkload, setEkgErrsigTopWorkload] = useState([]); + const [ekgSourceStats, setEkgSourceStats] = useState>({}); + const [ekgChannelStats, setEkgChannelStats] = useState>({}); const [ekgEscalationCount, setEkgEscalationCount] = useState(0); const fetchData = async () => { @@ -67,7 +72,12 @@ const TaskAudit: React.FC = () => { if (er.ok) { const ej = await er.json(); setEkgProviderTop(Array.isArray(ej.provider_top) ? ej.provider_top : []); + setEkgProviderTopWorkload(Array.isArray(ej.provider_top_workload) ? ej.provider_top_workload : []); setEkgErrsigTop(Array.isArray(ej.errsig_top) ? ej.errsig_top : []); + setEkgErrsigTopHeartbeat(Array.isArray(ej.errsig_top_heartbeat) ? ej.errsig_top_heartbeat : []); + setEkgErrsigTopWorkload(Array.isArray(ej.errsig_top_workload) ? ej.errsig_top_workload : []); + setEkgSourceStats(ej.source_stats && typeof ej.source_stats === 'object' ? ej.source_stats : {}); + setEkgChannelStats(ej.channel_stats && typeof ej.channel_stats === 'object' ? ej.channel_stats : {}); setEkgEscalationCount(Number(ej.escalation_count || 0)); } } catch (e) { @@ -145,8 +155,34 @@ const TaskAudit: React.FC = () => {
Escalations
{ekgEscalationCount}
-
-
Top Providers
+
+
Source Stats
+
+ {Object.keys(ekgSourceStats).length === 0 ?
-
: Object.entries(ekgSourceStats).map(([k, v]) => ( +
{k}: {v}
+ ))} +
+
+
+
Channel Stats
+
+ {Object.keys(ekgChannelStats).length === 0 ?
-
: Object.entries(ekgChannelStats).map(([k, v]) => ( +
{k}: {v}
+ ))} +
+
+
+
+
+
Top Providers (workload)
+
+ {ekgProviderTopWorkload.length === 0 ?
-
: ekgProviderTopWorkload.map((x, i) => ( +
{x.key} ({Number(x.score || 0).toFixed(2)})
+ ))} +
+
+
+
Top Providers (all)
{ekgProviderTop.length === 0 ?
-
: ekgProviderTop.map((x, i) => (
{x.key} ({Number(x.score || 0).toFixed(2)})
@@ -154,12 +190,30 @@ const TaskAudit: React.FC = () => {
-
-
Top Error Signatures
-
- {ekgErrsigTop.length === 0 ?
-
: ekgErrsigTop.map((x, i) => ( -
{x.key} (x{x.count || 0})
- ))} +
+
+
Top Error Signatures (workload)
+
+ {ekgErrsigTopWorkload.length === 0 ?
-
: ekgErrsigTopWorkload.map((x, i) => ( +
{x.key} (x{x.count || 0})
+ ))} +
+
+
+
Top Error Signatures (heartbeat)
+
+ {ekgErrsigTopHeartbeat.length === 0 ?
-
: ekgErrsigTopHeartbeat.map((x, i) => ( +
{x.key} (x{x.count || 0})
+ ))} +
+
+
+
Top Error Signatures (all)
+
+ {ekgErrsigTop.length === 0 ?
-
: ekgErrsigTop.map((x, i) => ( +
{x.key} (x{x.count || 0})
+ ))} +