diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 79db7c5..e2ce00f 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -310,18 +310,14 @@ func (al *AgentLoop) lockSessionRun(sessionKey string) func() { } func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) { - taskID, noticeCount, stopNotice := al.startLongRunNotice(ctx, msg) - defer stopNotice() + taskID := fmt.Sprintf("%s-%d", shortSessionKey(msg.SessionKey), time.Now().Unix()%100000) + started := time.Now() response, err := al.processMessage(ctx, msg) if err != nil { response = fmt.Sprintf("Error processing message: %v", err) } - if noticeCount != nil && *noticeCount > 0 && response != "" { - response = fmt.Sprintf("任务ID %s 已完成。\n\n%s", taskID, response) - } - trigger := al.getTrigger(msg) suppressed := false if response != "" { @@ -335,40 +331,7 @@ func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) al.bus.PublishOutbound(bus.OutboundMessage{Channel: msg.Channel, ChatID: msg.ChatID, Action: "finalize"}) } al.audit.Record(trigger, msg.Channel, msg.SessionKey, suppressed, err) -} - -func (al *AgentLoop) startLongRunNotice(ctx context.Context, msg bus.InboundMessage) (string, *int, func()) { - first := 45 * time.Second - interval := 45 * time.Second - if v := os.Getenv("CLAWGO_LONGRUN_NOTICE_SEC"); v != "" { - if n, err := strconv.Atoi(v); err == nil && n > 0 { - first = time.Duration(n) * time.Second - interval = first - } - } - taskID := fmt.Sprintf("%s-%d", shortSessionKey(msg.SessionKey), time.Now().Unix()%100000) - stop := make(chan struct{}) - notified := 0 - go func() { - t := time.NewTimer(first) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-stop: - return - case <-t.C: - notified++ - text := fmt.Sprintf("任务ID %s 正在执行中(第%d次进度通知)…", taskID, notified) - if outbound, ok := al.prepareOutbound(msg, text); ok { - al.bus.PublishOutbound(outbound) - } - t.Reset(interval) - } - } - }() - return taskID, ¬ified, func() { close(stop) } + al.appendTaskAudit(taskID, msg, started, err, suppressed) } func shortSessionKey(s string) string { @@ -378,6 +341,39 @@ func shortSessionKey(s string) string { return s[:8] } +func (al *AgentLoop) appendTaskAudit(taskID string, msg bus.InboundMessage, started time.Time, runErr error, suppressed bool) { + if al.workspace == "" { + return + } + path := filepath.Join(al.workspace, "memory", "task-audit.jsonl") + _ = os.MkdirAll(filepath.Dir(path), 0755) + status := "success" + if runErr != nil { + status = "error" + } else if suppressed { + status = "suppressed" + } + row := map[string]interface{}{ + "task_id": taskID, + "time": time.Now().UTC().Format(time.RFC3339), + "channel": msg.Channel, + "session": msg.SessionKey, + "chat_id": msg.ChatID, + "sender_id": msg.SenderID, + "status": status, + "duration_ms": int(time.Since(started).Milliseconds()), + "error": func() string { if runErr != nil { return runErr.Error() }; return "" }(), + "input_preview": truncate(strings.ReplaceAll(msg.Content, "\n", " "), 180), + } + b, _ := json.Marshal(row) + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return + } + defer f.Close() + _, _ = f.Write(append(b, '\n')) +} + func sessionShardCount() int { if v := strings.TrimSpace(os.Getenv("CLAWGO_SESSION_SHARDS")); v != "" { if n, err := strconv.Atoi(v); err == nil && n > 0 { diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go index 881b7d0..780dd41 100644 --- a/pkg/nodes/registry_server.go +++ b/pkg/nodes/registry_server.go @@ -88,6 +88,7 @@ func (s *RegistryServer) Start(ctx context.Context) error { mux.HandleFunc("/webui/api/skills", s.handleWebUISkills) mux.HandleFunc("/webui/api/sessions", s.handleWebUISessions) mux.HandleFunc("/webui/api/memory", s.handleWebUIMemory) + mux.HandleFunc("/webui/api/task_audit", s.handleWebUITaskAudit) mux.HandleFunc("/webui/api/exec_approvals", s.handleWebUIExecApprovals) mux.HandleFunc("/webui/api/logs/stream", s.handleWebUILogsStream) mux.HandleFunc("/webui/api/logs/recent", s.handleWebUILogsRecent) @@ -1339,6 +1340,50 @@ func (s *RegistryServer) handleWebUIMemory(w http.ResponseWriter, r *http.Reques } } +func (s *RegistryServer) handleWebUITaskAudit(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + path := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task-audit.jsonl") + limit := 100 + if v := r.URL.Query().Get("limit"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + if n > 500 { + n = 500 + } + limit = n + } + } + b, err := os.ReadFile(path) + if err != nil { + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "items": []map[string]interface{}{}}) + return + } + lines := strings.Split(string(b), "\n") + if len(lines) > 0 && lines[len(lines)-1] == "" { + lines = lines[:len(lines)-1] + } + if len(lines) > limit { + lines = lines[len(lines)-limit:] + } + items := make([]map[string]interface{}, 0, len(lines)) + for _, ln := range lines { + if ln == "" { + continue + } + var row map[string]interface{} + if err := json.Unmarshal([]byte(ln), &row); err == nil { + items = append(items, row) + } + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "items": items}) +} + func (s *RegistryServer) handleWebUIExecApprovals(w http.ResponseWriter, r *http.Request) { if !s.checkAuth(r) { http.Error(w, "unauthorized", http.StatusUnauthorized)