diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 0da4270..8e76d8e 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -189,18 +189,119 @@ func gatewayCmd() { registryServer.SetConfigAfterHook(func() { _ = syscall.Kill(os.Getpid(), syscall.SIGHUP) }) - registryServer.SetCronHandler(func(action, id string) (interface{}, error) { + registryServer.SetCronHandler(func(action string, args map[string]interface{}) (interface{}, error) { + getStr := func(k string) string { + v, _ := args[k].(string) + return strings.TrimSpace(v) + } + getBoolPtr := func(k string) *bool { + v, ok := args[k].(bool) + if !ok { + return nil + } + vv := v + return &vv + } switch strings.ToLower(strings.TrimSpace(action)) { case "", "list": return cronService.ListJobs(true), nil + case "get": + id := getStr("id") + if id == "" { + return nil, fmt.Errorf("id required") + } + j := cronService.GetJob(id) + if j == nil { + return nil, fmt.Errorf("job not found: %s", id) + } + return j, nil + case "create": + name := getStr("name") + if name == "" { + name = "webui-cron" + } + kind := getStr("kind") + if kind == "" { + kind = "every" + } + msg := getStr("message") + if msg == "" { + return nil, fmt.Errorf("message required") + } + schedule := cron.CronSchedule{Kind: kind} + if kind == "every" { + everyMS, ok := args["everyMs"].(float64) + if !ok || int64(everyMS) <= 0 { + return nil, fmt.Errorf("everyMs required for kind=every") + } + ev := int64(everyMS) + schedule.EveryMS = &ev + } + if kind == "at" { + atMS, ok := args["atMs"].(float64) + if !ok || int64(atMS) <= 0 { + return nil, fmt.Errorf("atMs required for kind=at") + } + at := int64(atMS) + schedule.AtMS = &at + } + deliver := false + if v, ok := args["deliver"].(bool); ok { + deliver = v + } + return cronService.AddJob(name, schedule, msg, deliver, getStr("channel"), getStr("to")) + case "update": + id := getStr("id") + if id == "" { + return nil, fmt.Errorf("id required") + } + in := cron.UpdateJobInput{} + if v := getStr("name"); v != "" { + in.Name = &v + } + if v := getStr("message"); v != "" { + in.Message = &v + } + if p := getBoolPtr("enabled"); p != nil { + in.Enabled = p + } + if p := getBoolPtr("deliver"); p != nil { + in.Deliver = p + } + if v := getStr("channel"); v != "" { + in.Channel = &v + } + if v := getStr("to"); v != "" { + in.To = &v + } + if kind := getStr("kind"); kind != "" { + s := cron.CronSchedule{Kind: kind} + if kind == "every" { + if everyMS, ok := args["everyMs"].(float64); ok && int64(everyMS) > 0 { + ev := int64(everyMS) + s.EveryMS = &ev + } + } + if kind == "at" { + if atMS, ok := args["atMs"].(float64); ok && int64(atMS) > 0 { + at := int64(atMS) + s.AtMS = &at + } + } + in.Schedule = &s + } + return cronService.UpdateJob(id, in) case "delete": - return map[string]interface{}{"deleted": cronService.RemoveJob(strings.TrimSpace(id)), "id": strings.TrimSpace(id)}, nil + id := getStr("id") + return map[string]interface{}{"deleted": cronService.RemoveJob(id), "id": id}, nil case "enable": - j := cronService.EnableJob(strings.TrimSpace(id), true) - return map[string]interface{}{"ok": j != nil, "id": strings.TrimSpace(id)}, nil + id := getStr("id") + j := cronService.EnableJob(id, true) + return map[string]interface{}{"ok": j != nil, "id": id}, nil case "disable": - j := cronService.EnableJob(strings.TrimSpace(id), false) - return map[string]interface{}{"ok": j != nil, "id": strings.TrimSpace(id)}, nil + id := getStr("id") + j := cronService.EnableJob(id, false) + return map[string]interface{}{"ok": j != nil, "id": id}, nil default: return nil, fmt.Errorf("unsupported cron action: %s", action) } diff --git a/pkg/cron/service.go b/pkg/cron/service.go index 13c16f2..74b8e7a 100644 --- a/pkg/cron/service.go +++ b/pkg/cron/service.go @@ -128,6 +128,17 @@ type CronService struct { runner *lifecycle.LoopRunner } +type UpdateJobInput struct { + Name *string + Enabled *bool + Schedule *CronSchedule + Message *string + Deliver *bool + Channel *string + To *string + DeleteAfterRun *bool +} + func NewCronService(storePath string, onJob JobHandler) *CronService { cs := &CronService{ storePath: storePath, @@ -651,6 +662,73 @@ func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob { return nil } +func (cs *CronService) GetJob(jobID string) *CronJob { + cs.mu.RLock() + defer cs.mu.RUnlock() + for i := range cs.store.Jobs { + if cs.store.Jobs[i].ID == jobID { + job := cs.store.Jobs[i] + return &job + } + } + return nil +} + +func (cs *CronService) UpdateJob(jobID string, in UpdateJobInput) (*CronJob, error) { + cs.mu.Lock() + defer cs.mu.Unlock() + + idx := cs.findJobIndexByIDLocked(jobID) + if idx < 0 { + return nil, fmt.Errorf("job not found: %s", jobID) + } + job := &cs.store.Jobs[idx] + + if in.Name != nil { + job.Name = *in.Name + } + if in.Schedule != nil { + job.Schedule = *in.Schedule + if job.Enabled { + now := time.Now().UnixMilli() + job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, now) + } + } + if in.Message != nil { + job.Payload.Message = *in.Message + } + if in.Deliver != nil { + job.Payload.Deliver = *in.Deliver + } + if in.Channel != nil { + job.Payload.Channel = *in.Channel + } + if in.To != nil { + job.Payload.To = *in.To + } + if in.DeleteAfterRun != nil { + job.DeleteAfterRun = *in.DeleteAfterRun + } + if in.Enabled != nil { + job.Enabled = *in.Enabled + if job.Enabled { + now := time.Now().UnixMilli() + job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, now) + } else { + job.State.NextRunAtMS = nil + } + } + job.UpdatedAtMS = time.Now().UnixMilli() + + if err := cs.saveStore(); err != nil { + cs.lastSaveError = err.Error() + return nil, err + } + cs.lastSaveError = "" + ret := *job + return &ret, nil +} + func (cs *CronService) ListJobs(includeDisabled bool) []CronJob { cs.mu.RLock() defer cs.mu.RUnlock() diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go index 110aab0..d03e437 100644 --- a/pkg/nodes/registry_server.go +++ b/pkg/nodes/registry_server.go @@ -22,7 +22,7 @@ type RegistryServer struct { configPath string onChat func(ctx context.Context, sessionKey, content string) (string, error) onConfigAfter func() - onCron func(action, id string) (interface{}, error) + onCron func(action string, args map[string]interface{}) (interface{}, error) webUIDir string } @@ -42,7 +42,7 @@ func (s *RegistryServer) SetChatHandler(fn func(ctx context.Context, sessionKey, s.onChat = fn } func (s *RegistryServer) SetConfigAfterHook(fn func()) { s.onConfigAfter = fn } -func (s *RegistryServer) SetCronHandler(fn func(action, id string) (interface{}, error)) { +func (s *RegistryServer) SetCronHandler(fn func(action string, args map[string]interface{}) (interface{}, error)) { s.onCron = fn } func (s *RegistryServer) SetWebUIDir(dir string) { s.webUIDir = strings.TrimSpace(dir) } @@ -343,33 +343,53 @@ func (s *RegistryServer) handleWebUICron(w http.ResponseWriter, r *http.Request) http.Error(w, "cron handler not configured", http.StatusInternalServerError) return } - if r.Method == http.MethodGet { - res, err := s.onCron("list", "") + + switch r.Method { + case http.MethodGet: + id := strings.TrimSpace(r.URL.Query().Get("id")) + action := "list" + if id != "" { + action = "get" + } + res, err := s.onCron(action, map[string]interface{}{"id": id}) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "jobs": res}) - return - } - if r.Method == http.MethodPost { - var body struct { - Action string `json:"action"` - ID string `json:"id"` + if action == "list" { + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "jobs": res}) + } else { + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "job": res}) } - if err := json.NewDecoder(r.Body).Decode(&body); err != nil { - http.Error(w, "invalid json", http.StatusBadRequest) - return + case http.MethodPost, http.MethodPut, http.MethodDelete: + args := map[string]interface{}{} + if r.Body != nil { + _ = json.NewDecoder(r.Body).Decode(&args) } - res, err := s.onCron(strings.ToLower(strings.TrimSpace(body.Action)), strings.TrimSpace(body.ID)) + if id := strings.TrimSpace(r.URL.Query().Get("id")); id != "" { + args["id"] = id + } + action := "" + switch r.Method { + case http.MethodPost: + action = "create" + if a, ok := args["action"].(string); ok && strings.TrimSpace(a) != "" { + action = strings.ToLower(strings.TrimSpace(a)) + } + case http.MethodPut: + action = "update" + case http.MethodDelete: + action = "delete" + } + res, err := s.onCron(action, args) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "result": res}) - return + default: + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) } - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) } func (s *RegistryServer) checkAuth(r *http.Request) bool {