mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-29 09:57:31 +08:00
feat: ship subagent runtime and remove autonomy/task legacy
This commit is contained in:
@@ -108,7 +108,6 @@ func printHelp() {
|
||||
fmt.Println(" clawgo gateway # register service")
|
||||
fmt.Println(" clawgo gateway start|stop|restart|status")
|
||||
fmt.Println(" clawgo gateway run # run foreground")
|
||||
fmt.Println(" clawgo gateway autonomy on|off|status")
|
||||
fmt.Println()
|
||||
fmt.Println("Uninstall:")
|
||||
fmt.Println(" clawgo uninstall # remove gateway service")
|
||||
|
||||
@@ -2,7 +2,6 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
@@ -15,7 +14,6 @@ import (
|
||||
|
||||
"clawgo/pkg/agent"
|
||||
"clawgo/pkg/api"
|
||||
"clawgo/pkg/autonomy"
|
||||
"clawgo/pkg/bus"
|
||||
"clawgo/pkg/channels"
|
||||
"clawgo/pkg/config"
|
||||
@@ -46,15 +44,9 @@ func gatewayCmd() {
|
||||
os.Exit(1)
|
||||
}
|
||||
return
|
||||
case "autonomy":
|
||||
if err := gatewayAutonomyControlCmd(args[1:]); err != nil {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
return
|
||||
default:
|
||||
fmt.Printf("Unknown gateway command: %s\n", args[0])
|
||||
fmt.Println("Usage: clawgo gateway [run|start|stop|restart|status|autonomy on|off|status]")
|
||||
fmt.Println("Usage: clawgo gateway [run|start|stop|restart|status]")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -75,7 +67,6 @@ func gatewayCmd() {
|
||||
})
|
||||
configureCronServiceRuntime(cronService, cfg)
|
||||
heartbeatService := buildHeartbeatService(cfg, msgBus)
|
||||
autonomyEngine := buildAutonomyEngine(cfg, msgBus)
|
||||
sentinelService := sentinel.NewService(
|
||||
getConfigPath(),
|
||||
cfg.WorkspacePath(),
|
||||
@@ -128,10 +119,6 @@ func gatewayCmd() {
|
||||
fmt.Printf("Error starting heartbeat service: %v\n", err)
|
||||
}
|
||||
fmt.Println("✓ Heartbeat service started")
|
||||
autonomyEngine.Start()
|
||||
if cfg.Agents.Defaults.Autonomy.Enabled {
|
||||
fmt.Println("✓ Autonomy engine started")
|
||||
}
|
||||
if cfg.Sentinel.Enabled {
|
||||
sentinelService.Start()
|
||||
fmt.Println("✓ Sentinel service started")
|
||||
@@ -170,6 +157,12 @@ func gatewayCmd() {
|
||||
registryServer.SetConfigAfterHook(func() {
|
||||
_ = requestGatewayReloadSignal()
|
||||
})
|
||||
registryServer.SetSubagentHandler(func(cctx context.Context, action string, args map[string]interface{}) (interface{}, error) {
|
||||
return agentLoop.HandleSubagentRuntime(cctx, action, args)
|
||||
})
|
||||
registryServer.SetPipelineHandler(func(cctx context.Context, action string, args map[string]interface{}) (interface{}, error) {
|
||||
return agentLoop.HandlePipelineRuntime(cctx, action, args)
|
||||
})
|
||||
registryServer.SetCronHandler(func(action string, args map[string]interface{}) (interface{}, error) {
|
||||
getStr := func(k string) string {
|
||||
v, _ := args[k].(string)
|
||||
@@ -334,13 +327,10 @@ func gatewayCmd() {
|
||||
}
|
||||
configureCronServiceRuntime(cronService, newCfg)
|
||||
heartbeatService.Stop()
|
||||
autonomyEngine.Stop()
|
||||
heartbeatService = buildHeartbeatService(newCfg, msgBus)
|
||||
autonomyEngine = buildAutonomyEngine(newCfg, msgBus)
|
||||
if err := heartbeatService.Start(); err != nil {
|
||||
fmt.Printf("Error starting heartbeat service: %v\n", err)
|
||||
}
|
||||
autonomyEngine.Start()
|
||||
|
||||
if reflect.DeepEqual(cfg, newCfg) {
|
||||
fmt.Println("✓ Config unchanged, skip reload")
|
||||
@@ -352,8 +342,6 @@ func gatewayCmd() {
|
||||
reflect.DeepEqual(cfg.Tools, newCfg.Tools) &&
|
||||
reflect.DeepEqual(cfg.Channels, newCfg.Channels)
|
||||
|
||||
autonomyChanges := summarizeAutonomyChanges(cfg, newCfg)
|
||||
|
||||
if runtimeSame {
|
||||
configureLogging(newCfg)
|
||||
sentinelService.Stop()
|
||||
@@ -378,9 +366,6 @@ func gatewayCmd() {
|
||||
}
|
||||
cfg = newCfg
|
||||
runtimecfg.Set(cfg)
|
||||
if len(autonomyChanges) > 0 {
|
||||
fmt.Printf("↻ Autonomy changes: %s\n", strings.Join(autonomyChanges, ", "))
|
||||
}
|
||||
fmt.Println("✓ Config hot-reload applied (logging/metadata only)")
|
||||
continue
|
||||
}
|
||||
@@ -424,15 +409,11 @@ func gatewayCmd() {
|
||||
continue
|
||||
}
|
||||
go agentLoop.Run(ctx)
|
||||
if len(autonomyChanges) > 0 {
|
||||
fmt.Printf("↻ Autonomy changes: %s\n", strings.Join(autonomyChanges, ", "))
|
||||
}
|
||||
fmt.Println("✓ Config hot-reload applied")
|
||||
default:
|
||||
fmt.Println("\nShutting down...")
|
||||
cancel()
|
||||
heartbeatService.Stop()
|
||||
autonomyEngine.Stop()
|
||||
sentinelService.Stop()
|
||||
cronService.Stop()
|
||||
agentLoop.Stop()
|
||||
@@ -443,126 +424,6 @@ func gatewayCmd() {
|
||||
}
|
||||
}
|
||||
|
||||
func gatewayAutonomyControlCmd(args []string) error {
|
||||
if len(args) < 1 {
|
||||
return fmt.Errorf("usage: clawgo gateway autonomy [on|off|status]")
|
||||
}
|
||||
cfg, err := loadConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
memDir := filepath.Join(cfg.WorkspacePath(), "memory")
|
||||
if err := os.MkdirAll(memDir, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
pausePath := filepath.Join(memDir, "autonomy.pause")
|
||||
ctrlPath := filepath.Join(memDir, "autonomy.control.json")
|
||||
|
||||
type autonomyControl struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
Source string `json:"source"`
|
||||
}
|
||||
|
||||
writeControl := func(enabled bool) error {
|
||||
c := autonomyControl{Enabled: enabled, UpdatedAt: time.Now().UTC().Format(time.RFC3339), Source: "manual_cli"}
|
||||
data, err := json.MarshalIndent(c, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(ctrlPath, append(data, '\n'), 0644)
|
||||
}
|
||||
|
||||
switch strings.ToLower(strings.TrimSpace(args[0])) {
|
||||
case "on":
|
||||
_ = os.Remove(pausePath)
|
||||
if err := writeControl(true); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Println("✓ Autonomy enabled")
|
||||
return nil
|
||||
case "off":
|
||||
if err := writeControl(false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.WriteFile(pausePath, []byte(time.Now().UTC().Format(time.RFC3339)+"\n"), 0644); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Println("✓ Autonomy disabled (paused)")
|
||||
return nil
|
||||
case "status":
|
||||
enabled := true
|
||||
reason := "default"
|
||||
updatedAt := ""
|
||||
source := ""
|
||||
if data, err := os.ReadFile(ctrlPath); err == nil {
|
||||
var c autonomyControl
|
||||
if json.Unmarshal(data, &c) == nil {
|
||||
enabled = c.Enabled
|
||||
updatedAt = c.UpdatedAt
|
||||
source = c.Source
|
||||
if !c.Enabled {
|
||||
reason = "control_file"
|
||||
}
|
||||
}
|
||||
}
|
||||
if _, err := os.Stat(pausePath); err == nil {
|
||||
enabled = false
|
||||
reason = "pause_file"
|
||||
}
|
||||
fmt.Printf("Autonomy status: %v (%s)\n", enabled, reason)
|
||||
if strings.TrimSpace(updatedAt) != "" {
|
||||
fmt.Printf("Last switch: %s", updatedAt)
|
||||
if strings.TrimSpace(source) != "" {
|
||||
fmt.Printf(" via %s", source)
|
||||
}
|
||||
fmt.Println()
|
||||
}
|
||||
fmt.Printf("Control file: %s\n", ctrlPath)
|
||||
fmt.Printf("Pause file: %s\n", pausePath)
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("usage: clawgo gateway autonomy [on|off|status]")
|
||||
}
|
||||
}
|
||||
|
||||
func summarizeAutonomyChanges(oldCfg, newCfg *config.Config) []string {
|
||||
if oldCfg == nil || newCfg == nil {
|
||||
return nil
|
||||
}
|
||||
o := oldCfg.Agents.Defaults.Autonomy
|
||||
n := newCfg.Agents.Defaults.Autonomy
|
||||
changes := make([]string, 0)
|
||||
if o.Enabled != n.Enabled {
|
||||
changes = append(changes, "enabled")
|
||||
}
|
||||
if o.TickIntervalSec != n.TickIntervalSec {
|
||||
changes = append(changes, "tick_interval_sec")
|
||||
}
|
||||
if o.MinRunIntervalSec != n.MinRunIntervalSec {
|
||||
changes = append(changes, "min_run_interval_sec")
|
||||
}
|
||||
if o.UserIdleResumeSec != n.UserIdleResumeSec {
|
||||
changes = append(changes, "user_idle_resume_sec")
|
||||
}
|
||||
if o.WaitingResumeDebounceSec != n.WaitingResumeDebounceSec {
|
||||
changes = append(changes, "waiting_resume_debounce_sec")
|
||||
}
|
||||
if o.IdleRoundBudgetReleaseSec != n.IdleRoundBudgetReleaseSec {
|
||||
changes = append(changes, "idle_round_budget_release_sec")
|
||||
}
|
||||
if strings.TrimSpace(o.QuietHours) != strings.TrimSpace(n.QuietHours) {
|
||||
changes = append(changes, "quiet_hours")
|
||||
}
|
||||
if o.NotifyCooldownSec != n.NotifyCooldownSec {
|
||||
changes = append(changes, "notify_cooldown_sec")
|
||||
}
|
||||
if o.NotifySameReasonCooldownSec != n.NotifySameReasonCooldownSec {
|
||||
changes = append(changes, "notify_same_reason_cooldown_sec")
|
||||
}
|
||||
return changes
|
||||
}
|
||||
|
||||
func runGatewayStartupCompactionCheck(parent context.Context, agentLoop *agent.AgentLoop) {
|
||||
if agentLoop == nil {
|
||||
return
|
||||
@@ -910,74 +771,3 @@ func buildHeartbeatService(cfg *config.Config, msgBus *bus.MessageBus) *heartbea
|
||||
return "queued", nil
|
||||
}, hbInterval, cfg.Agents.Defaults.Heartbeat.Enabled, cfg.Agents.Defaults.Heartbeat.PromptTemplate)
|
||||
}
|
||||
|
||||
func buildAutonomyEngine(cfg *config.Config, msgBus *bus.MessageBus) *autonomy.Engine {
|
||||
a := cfg.Agents.Defaults.Autonomy
|
||||
idleRoundBudgetReleaseSec := a.IdleRoundBudgetReleaseSec
|
||||
if idleRoundBudgetReleaseSec == 0 {
|
||||
idleRoundBudgetReleaseSec = 1800
|
||||
}
|
||||
notifyChannel, notifyChatID := inferAutonomyNotifyTarget(cfg)
|
||||
notifyAllowFrom := []string{}
|
||||
switch notifyChannel {
|
||||
case "telegram":
|
||||
notifyAllowFrom = append(notifyAllowFrom, cfg.Channels.Telegram.AllowFrom...)
|
||||
case "feishu":
|
||||
notifyAllowFrom = append(notifyAllowFrom, cfg.Channels.Feishu.AllowFrom...)
|
||||
case "whatsapp":
|
||||
notifyAllowFrom = append(notifyAllowFrom, cfg.Channels.WhatsApp.AllowFrom...)
|
||||
case "discord":
|
||||
notifyAllowFrom = append(notifyAllowFrom, cfg.Channels.Discord.AllowFrom...)
|
||||
case "qq":
|
||||
notifyAllowFrom = append(notifyAllowFrom, cfg.Channels.QQ.AllowFrom...)
|
||||
case "dingtalk":
|
||||
notifyAllowFrom = append(notifyAllowFrom, cfg.Channels.DingTalk.AllowFrom...)
|
||||
}
|
||||
return autonomy.NewEngine(autonomy.Options{
|
||||
Enabled: a.Enabled,
|
||||
TickIntervalSec: a.TickIntervalSec,
|
||||
MinRunIntervalSec: a.MinRunIntervalSec,
|
||||
MaxPendingDurationSec: a.MaxPendingDurationSec,
|
||||
MaxConsecutiveStalls: a.MaxConsecutiveStalls,
|
||||
MaxDispatchPerTick: a.MaxDispatchPerTick,
|
||||
NotifyCooldownSec: a.NotifyCooldownSec,
|
||||
NotifySameReasonCooldownSec: a.NotifySameReasonCooldownSec,
|
||||
QuietHours: a.QuietHours,
|
||||
UserIdleResumeSec: a.UserIdleResumeSec,
|
||||
MaxRoundsWithoutUser: a.MaxRoundsWithoutUser,
|
||||
TaskHistoryRetentionDays: a.TaskHistoryRetentionDays,
|
||||
WaitingResumeDebounceSec: a.WaitingResumeDebounceSec,
|
||||
IdleRoundBudgetReleaseSec: idleRoundBudgetReleaseSec,
|
||||
AllowedTaskKeywords: a.AllowedTaskKeywords,
|
||||
EKGConsecutiveErrorThreshold: a.EKGConsecutiveErrorThreshold,
|
||||
Workspace: cfg.WorkspacePath(),
|
||||
DefaultNotifyChannel: notifyChannel,
|
||||
DefaultNotifyChatID: notifyChatID,
|
||||
NotifyAllowFrom: notifyAllowFrom,
|
||||
}, msgBus)
|
||||
}
|
||||
|
||||
func inferAutonomyNotifyTarget(cfg *config.Config) (string, string) {
|
||||
if cfg == nil {
|
||||
return "", ""
|
||||
}
|
||||
if cfg.Channels.Telegram.Enabled && len(cfg.Channels.Telegram.AllowFrom) > 0 {
|
||||
return "telegram", strings.TrimSpace(cfg.Channels.Telegram.AllowFrom[0])
|
||||
}
|
||||
if cfg.Channels.Feishu.Enabled && len(cfg.Channels.Feishu.AllowFrom) > 0 {
|
||||
return "feishu", strings.TrimSpace(cfg.Channels.Feishu.AllowFrom[0])
|
||||
}
|
||||
if cfg.Channels.WhatsApp.Enabled && len(cfg.Channels.WhatsApp.AllowFrom) > 0 {
|
||||
return "whatsapp", strings.TrimSpace(cfg.Channels.WhatsApp.AllowFrom[0])
|
||||
}
|
||||
if cfg.Channels.Discord.Enabled && len(cfg.Channels.Discord.AllowFrom) > 0 {
|
||||
return "discord", strings.TrimSpace(cfg.Channels.Discord.AllowFrom[0])
|
||||
}
|
||||
if cfg.Channels.QQ.Enabled && len(cfg.Channels.QQ.AllowFrom) > 0 {
|
||||
return "qq", strings.TrimSpace(cfg.Channels.QQ.AllowFrom[0])
|
||||
}
|
||||
if cfg.Channels.DingTalk.Enabled && len(cfg.Channels.DingTalk.AllowFrom) > 0 {
|
||||
return "dingtalk", strings.TrimSpace(cfg.Channels.DingTalk.AllowFrom[0])
|
||||
}
|
||||
return "", ""
|
||||
}
|
||||
|
||||
@@ -94,9 +94,6 @@ func statusCmd() {
|
||||
triggerStats := filepath.Join(workspace, "memory", "trigger-stats.json")
|
||||
if data, err := os.ReadFile(triggerStats); err == nil {
|
||||
fmt.Printf("Trigger Stats: %s\n", strings.TrimSpace(string(data)))
|
||||
if summary := summarizeAutonomyActions(data); summary != "" {
|
||||
fmt.Printf("Autonomy Action Stats: %s\n", summary)
|
||||
}
|
||||
}
|
||||
auditPath := filepath.Join(workspace, "memory", "trigger-audit.jsonl")
|
||||
if errs, err := collectRecentTriggerErrors(auditPath, 5); err == nil && len(errs) > 0 {
|
||||
@@ -138,26 +135,6 @@ func statusCmd() {
|
||||
fmt.Printf(" - %s\n", key)
|
||||
}
|
||||
}
|
||||
fmt.Printf("Autonomy Config: idle_resume=%ds waiting_debounce=%ds notify_cooldown=%ds same_reason_cooldown=%ds\n",
|
||||
cfg.Agents.Defaults.Autonomy.UserIdleResumeSec,
|
||||
cfg.Agents.Defaults.Autonomy.WaitingResumeDebounceSec,
|
||||
cfg.Agents.Defaults.Autonomy.NotifyCooldownSec,
|
||||
cfg.Agents.Defaults.Autonomy.NotifySameReasonCooldownSec,
|
||||
)
|
||||
if summary, prio, reasons, nextRetry, dedupeHits, waitingLocks, lockKeys, err := collectAutonomyTaskSummary(filepath.Join(workspace, "memory", "tasks.json")); err == nil {
|
||||
fmt.Printf("Autonomy Tasks: todo=%d doing=%d waiting=%d blocked=%d done=%d dedupe_hits=%d\n", summary["todo"], summary["doing"], summary["waiting"], summary["blocked"], summary["done"], dedupeHits)
|
||||
fmt.Printf("Autonomy Priority: high=%d normal=%d low=%d\n", prio["high"], prio["normal"], prio["low"])
|
||||
if reasons["active_user"] > 0 || reasons["manual_pause"] > 0 || reasons["max_consecutive_stalls"] > 0 || reasons["resource_lock"] > 0 {
|
||||
fmt.Printf("Autonomy Block Reasons: active_user=%d manual_pause=%d max_stalls=%d resource_lock=%d\n", reasons["active_user"], reasons["manual_pause"], reasons["max_consecutive_stalls"], reasons["resource_lock"])
|
||||
}
|
||||
if waitingLocks > 0 || lockKeys > 0 {
|
||||
fmt.Printf("Autonomy Locks: waiting=%d unique_keys=%d\n", waitingLocks, lockKeys)
|
||||
}
|
||||
if nextRetry != "" {
|
||||
fmt.Printf("Autonomy Next Retry: %s\n", nextRetry)
|
||||
}
|
||||
fmt.Printf("Autonomy Control: %s\n", autonomyControlState(workspace))
|
||||
}
|
||||
ns := nodes.DefaultManager().List()
|
||||
if len(ns) > 0 {
|
||||
online := 0
|
||||
@@ -197,81 +174,6 @@ func statusCmd() {
|
||||
}
|
||||
}
|
||||
|
||||
func summarizeAutonomyActions(statsJSON []byte) string {
|
||||
var payload struct {
|
||||
Counts map[string]int `json:"counts"`
|
||||
}
|
||||
if err := json.Unmarshal(statsJSON, &payload); err != nil || payload.Counts == nil {
|
||||
return ""
|
||||
}
|
||||
keys := []string{"autonomy:dispatch", "autonomy:waiting", "autonomy:resume", "autonomy:blocked", "autonomy:complete"}
|
||||
parts := make([]string, 0, len(keys)+1)
|
||||
total := 0
|
||||
for _, k := range keys {
|
||||
if v, ok := payload.Counts[k]; ok {
|
||||
parts = append(parts, fmt.Sprintf("%s=%d", strings.TrimPrefix(k, "autonomy:"), v))
|
||||
total += v
|
||||
}
|
||||
}
|
||||
if total > 0 {
|
||||
d := payload.Counts["autonomy:dispatch"]
|
||||
w := payload.Counts["autonomy:waiting"]
|
||||
b := payload.Counts["autonomy:blocked"]
|
||||
parts = append(parts, fmt.Sprintf("ratios(dispatch/waiting/blocked)=%.2f/%.2f/%.2f", float64(d)/float64(total), float64(w)/float64(total), float64(b)/float64(total)))
|
||||
}
|
||||
wa := payload.Counts["autonomy:waiting:active_user"]
|
||||
wm := payload.Counts["autonomy:waiting:manual_pause"]
|
||||
ra := payload.Counts["autonomy:resume:active_user"]
|
||||
rm := payload.Counts["autonomy:resume:manual_pause"]
|
||||
if wa+wm+ra+rm > 0 {
|
||||
parts = append(parts, fmt.Sprintf("wait_resume(active_user=%d/%d manual_pause=%d/%d)", wa, ra, wm, rm))
|
||||
waitTotal := wa + wm
|
||||
resumeTotal := ra + rm
|
||||
if waitTotal >= 8 {
|
||||
parts = append(parts, fmt.Sprintf("flap_risk=%s", flapRisk(waitTotal, resumeTotal)))
|
||||
}
|
||||
}
|
||||
return strings.Join(parts, " ")
|
||||
}
|
||||
|
||||
// flapRisk classifies how unbalanced the wait and resume counts are.
//
// It returns "low" when waitTotal is not positive and "high(no_resume)" when
// there are waits but resumeTotal is zero. Otherwise it rates the ratio
// waitTotal/resumeTotal: "high" at or beyond 2.0 or 0.5, "medium" at or
// beyond 1.5 or 0.67, and "low" in between.
func flapRisk(waitTotal, resumeTotal int) string {
	switch {
	case waitTotal <= 0:
		return "low"
	case resumeTotal == 0:
		return "high(no_resume)"
	}

	ratio := float64(waitTotal) / float64(resumeTotal)
	switch {
	case ratio <= 0.5, ratio >= 2.0:
		return "high"
	case ratio <= 0.67, ratio >= 1.5:
		return "medium"
	default:
		return "low"
	}
}
|
||||
|
||||
// autonomyControlState describes the manual autonomy switch stored under
// <workspace>/memory.
//
// The autonomy.pause marker is checked first and yields
// "paused (autonomy.pause)". Otherwise a readable, decodable
// autonomy.control.json yields "enabled" or "disabled (control file)"
// according to its "enabled" field. In every other case the result is
// "default".
func autonomyControlState(workspace string) string {
	dir := filepath.Join(workspace, "memory")

	if _, statErr := os.Stat(filepath.Join(dir, "autonomy.pause")); statErr == nil {
		return "paused (autonomy.pause)"
	}

	raw, readErr := os.ReadFile(filepath.Join(dir, "autonomy.control.json"))
	if readErr != nil {
		return "default"
	}
	var control struct {
		Enabled bool `json:"enabled"`
	}
	if json.Unmarshal(raw, &control) != nil {
		return "default"
	}
	if !control.Enabled {
		return "disabled (control file)"
	}
	return "enabled"
}
|
||||
|
||||
func collectSessionKindCounts(sessionsDir string) (map[string]int, error) {
|
||||
indexPath := filepath.Join(sessionsDir, "sessions.json")
|
||||
data, err := os.ReadFile(indexPath)
|
||||
@@ -360,70 +262,6 @@ func collectTriggerErrorCounts(path string) (map[string]int, error) {
|
||||
return counts, nil
|
||||
}
|
||||
|
||||
// collectAutonomyTaskSummary reads the JSON task list at path and aggregates
// it for status output.
//
// Results, in order:
//  1. task counts by status (todo, doing, waiting, blocked, done); other
//     statuses are not counted
//  2. task counts by priority (high, normal, low); a missing or unrecognised
//     priority counts as normal
//  3. task counts by block reason (active_user, manual_pause,
//     max_consecutive_stalls, resource_lock)
//  4. the earliest parseable retry_after, formatted as RFC3339, or "" if none
//  5. the sum of dedupe_hits over all tasks
//  6. the number of tasks that are waiting because of resource_lock
//  7. the number of distinct resource keys (case-insensitive, trimmed) held
//     by those tasks
//  8. an error if the file cannot be read or decoded
//
// Status, priority and block reason are matched case-insensitively after
// trimming whitespace. A file that does not exist is not an error: zeroed
// maps and zero values are returned.
func collectAutonomyTaskSummary(path string) (map[string]int, map[string]int, map[string]int, string, int, int, int, error) {
	summary := map[string]int{"todo": 0, "doing": 0, "waiting": 0, "blocked": 0, "done": 0}
	priorities := map[string]int{"high": 0, "normal": 0, "low": 0}
	reasons := map[string]int{"active_user": 0, "manual_pause": 0, "max_consecutive_stalls": 0, "resource_lock": 0}

	data, err := os.ReadFile(path)
	if err != nil {
		if os.IsNotExist(err) {
			return summary, priorities, reasons, "", 0, 0, 0, nil
		}
		return nil, nil, nil, "", 0, 0, 0, err
	}
	var items []struct {
		Status       string   `json:"status"`
		Priority     string   `json:"priority"`
		BlockReason  string   `json:"block_reason"`
		RetryAfter   string   `json:"retry_after"`
		DedupeHits   int      `json:"dedupe_hits"`
		ResourceKeys []string `json:"resource_keys"`
	}
	if err := json.Unmarshal(data, &items); err != nil {
		return nil, nil, nil, "", 0, 0, 0, err
	}

	nextRetry := ""
	nextRetryAt := time.Time{}
	totalDedupe := 0
	waitingLocks := 0
	lockKeySet := map[string]struct{}{}
	for _, it := range items {
		status := strings.ToLower(strings.TrimSpace(it.Status))
		if _, ok := summary[status]; ok {
			summary[status]++
		}
		totalDedupe += it.DedupeHits

		reason := strings.ToLower(strings.TrimSpace(it.BlockReason))
		if _, ok := reasons[reason]; ok {
			reasons[reason]++
		}
		if status == "waiting" && reason == "resource_lock" {
			waitingLocks++
			for _, k := range it.ResourceKeys {
				if key := strings.TrimSpace(strings.ToLower(k)); key != "" {
					lockKeySet[key] = struct{}{}
				}
			}
		}

		priority := strings.ToLower(strings.TrimSpace(it.Priority))
		if _, ok := priorities[priority]; !ok {
			priority = "normal"
		}
		priorities[priority]++

		// Parse the trimmed value: the emptiness check already ignores
		// surrounding whitespace, so a padded timestamp must not be dropped.
		retryAfter := strings.TrimSpace(it.RetryAfter)
		if retryAfter == "" {
			continue
		}
		t, err := time.Parse(time.RFC3339, retryAfter)
		if err != nil {
			// Unparseable retry times are ignored rather than failing the summary.
			continue
		}
		if nextRetryAt.IsZero() || t.Before(nextRetryAt) {
			nextRetryAt = t
			nextRetry = t.Format(time.RFC3339)
		}
	}
	return summary, priorities, reasons, nextRetry, totalDedupe, waitingLocks, len(lockKeySet), nil
}
|
||||
|
||||
func collectNodeDispatchStats(path string) (int, int, int, string, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user