diff --git a/pkg/agent/session_planner.go b/pkg/agent/session_planner.go index f0cf2fb..62963e6 100644 --- a/pkg/agent/session_planner.go +++ b/pkg/agent/session_planner.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "math" "os" "path/filepath" "regexp" @@ -123,6 +124,11 @@ func splitPlannedSegments(content string) []string { func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage, tasks []plannedTask) (string, error) { results := make([]plannedTaskResult, len(tasks)) var wg sync.WaitGroup + var progressMu sync.Mutex + completed := 0 + failed := 0 + milestones := plannedProgressMilestones(len(tasks)) + notified := make(map[int]struct{}, len(milestones)) for i, task := range tasks { wg.Add(1) go func(index int, t plannedTask) { @@ -142,7 +148,22 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage res.ErrText = err.Error() } results[index] = res - al.publishPlannedTaskProgress(msg, len(tasks), res) + progressMu.Lock() + completed++ + if res.ErrText != "" { + failed++ + } + snapshotCompleted := completed + snapshotFailed := failed + shouldNotify := shouldPublishPlannedTaskProgress(len(tasks), snapshotCompleted, res, milestones, notified) + if shouldNotify && res.ErrText == "" { + notified[snapshotCompleted] = struct{}{} + } + progressMu.Unlock() + + if shouldNotify { + al.publishPlannedTaskProgress(msg, len(tasks), snapshotCompleted, snapshotFailed, res) + } }(i, task) } wg.Wait() @@ -163,7 +184,50 @@ func (al *AgentLoop) runPlannedTasks(ctx context.Context, msg bus.InboundMessage return strings.TrimSpace(b.String()), nil } -func (al *AgentLoop) publishPlannedTaskProgress(msg bus.InboundMessage, total int, res plannedTaskResult) { +func plannedProgressMilestones(total int) []int { + if total <= 3 { + return nil + } + points := []float64{0.33, 0.66} + out := make([]int, 0, len(points)) + seen := map[int]struct{}{} + for _, p := range points { + step := int(math.Round(float64(total) * p)) + if step <= 0 || step >= total { + continue + } + if _, ok := seen[step]; ok { + continue + } + seen[step] = struct{}{} + out = append(out, step) + } + return out +} + +func shouldPublishPlannedTaskProgress(total, completed int, res plannedTaskResult, milestones []int, notified map[int]struct{}) bool { + if total <= 1 { + return false + } + if strings.TrimSpace(res.ErrText) != "" { + return true + } + if completed >= total { + return false + } + for _, step := range milestones { + if completed != step { + continue + } + if _, ok := notified[step]; ok { + return false + } + return true + } + return false +} + +func (al *AgentLoop) publishPlannedTaskProgress(msg bus.InboundMessage, total, completed, failed int, res plannedTaskResult) { if al == nil || al.bus == nil || total <= 1 { return } @@ -184,7 +248,7 @@ func (al *AgentLoop) publishPlannedTaskProgress(msg bus.InboundMessage, total in body = "(无输出)" } body = summarizePlannedTaskProgressBody(body, 6, 320) - content := fmt.Sprintf("进度 %d/%d:任务%d已%s\n%s", idx, total, idx, status, body) + content := fmt.Sprintf("阶段进度 %d/%d(失败 %d)\n最近任务:%d 已%s\n%s", completed, total, failed, idx, status, body) al.bus.PublishOutbound(bus.OutboundMessage{ Channel: msg.Channel, ChatID: msg.ChatID, diff --git a/pkg/agent/session_planner_progress_test.go b/pkg/agent/session_planner_progress_test.go new file mode 100644 index 0000000..3567580 --- /dev/null +++ b/pkg/agent/session_planner_progress_test.go @@ -0,0 +1,35 @@ +package agent + +import "testing" + +func TestPlannedProgressMilestones(t *testing.T) { + t.Parallel() + + got := plannedProgressMilestones(12) + if len(got) != 2 || got[0] != 4 || got[1] != 8 { + t.Fatalf("unexpected milestones: %#v", got) + } +} + +func TestShouldPublishPlannedTaskProgress(t *testing.T) { + t.Parallel() + + milestones := plannedProgressMilestones(12) + notified := map[int]struct{}{} + if shouldPublishPlannedTaskProgress(12, 1, plannedTaskResult{}, milestones, notified) { + t.Fatalf("did not expect early success notification") + } + if !shouldPublishPlannedTaskProgress(12, 4, plannedTaskResult{}, milestones, notified) { + t.Fatalf("expected milestone notification") + } + notified[4] = struct{}{} + if shouldPublishPlannedTaskProgress(12, 4, plannedTaskResult{}, milestones, notified) { + t.Fatalf("did not expect duplicate milestone notification") + } + if !shouldPublishPlannedTaskProgress(12, 5, plannedTaskResult{ErrText: "boom"}, milestones, notified) { + t.Fatalf("expected failure notification") + } + if shouldPublishPlannedTaskProgress(3, 3, plannedTaskResult{}, plannedProgressMilestones(3), map[int]struct{}{}) { + t.Fatalf("did not expect final success notification") + } +}