diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index d86e40d..04e1457 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -226,6 +226,9 @@ func gatewayCmd() { if kind == "" { kind = "every" } + if kind == "once" { + kind = "at" + } msg := getStr("message") if msg == "" { return nil, fmt.Errorf("message required") @@ -241,10 +244,12 @@ func gatewayCmd() { } if kind == "at" { atMS, ok := args["atMs"].(float64) + var at int64 if !ok || int64(atMS) <= 0 { - return nil, fmt.Errorf("atMs required for kind=at") + at = time.Now().Add(1 * time.Minute).UnixMilli() + } else { + at = int64(atMS) } - at := int64(atMS) schedule.AtMS = &at } if kind == "cron" { @@ -284,6 +289,9 @@ func gatewayCmd() { in.To = &v } if kind := getStr("kind"); kind != "" { + if kind == "once" { + kind = "at" + } s := cron.CronSchedule{Kind: kind} if kind == "every" { if everyMS, ok := args["everyMs"].(float64); ok && int64(everyMS) > 0 { @@ -295,6 +303,9 @@ func gatewayCmd() { if atMS, ok := args["atMs"].(float64); ok && int64(atMS) > 0 { at := int64(atMS) s.AtMS = &at + } else { + at := time.Now().Add(1 * time.Minute).UnixMilli() + s.AtMS = &at } } if kind == "cron" { diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go index aa2c6ff..afc0e8c 100644 --- a/pkg/nodes/registry_server.go +++ b/pkg/nodes/registry_server.go @@ -1,6 +1,7 @@ package nodes import ( + "bufio" "context" "encoding/json" "fmt" @@ -10,6 +11,7 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "strings" "syscall" "time" @@ -380,14 +382,12 @@ func (s *RegistryServer) handleWebUIChatStream(w http.ResponseWriter, r *http.Re prompt += "[file: " + strings.TrimSpace(body.Media) + "]" } - w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - _, _ = w.Write([]byte("event: start\ndata: {\"ok\":true}\n\n")) - flusher.Flush() resp, err := s.onChat(r.Context(), session, prompt) if err != nil { - _, _ = w.Write([]byte("event: error\ndata: {\"error\":\"" + strings.ReplaceAll(err.Error(), "\"", "'") + "\"}\n\n")) + _, _ = w.Write([]byte("Error: " + err.Error())) flusher.Flush() return } @@ -397,13 +397,9 @@ func (s *RegistryServer) handleWebUIChatStream(w http.ResponseWriter, r *http.Re if end > len(resp) { end = len(resp) } - part := strings.ReplaceAll(resp[i:end], "\n", "\\n") - part = strings.ReplaceAll(part, "\"", "'") - _, _ = w.Write([]byte("event: delta\ndata: {\"text\":\"" + part + "\"}\n\n")) + _, _ = w.Write([]byte(resp[i:end])) flusher.Flush() } - _, _ = w.Write([]byte("event: done\ndata: {\"ok\":true}\n\n")) - flusher.Flush() } func (s *RegistryServer) handleWebUINodes(w http.ResponseWriter, r *http.Request) { @@ -467,9 +463,9 @@ func (s *RegistryServer) handleWebUICron(w http.ResponseWriter, r *http.Request) return } if action == "list" { - _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "jobs": res}) + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "jobs": normalizeCronJobs(res)}) } else { - _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "job": res}) + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "job": normalizeCronJob(res)}) } case http.MethodPost: args := map[string]interface{}{} @@ -488,7 +484,7 @@ func (s *RegistryServer) handleWebUICron(w http.ResponseWriter, r *http.Request) http.Error(w, err.Error(), http.StatusInternalServerError) return } - _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "result": res}) + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "result": normalizeCronJob(res)}) default: http.Error(w, "method not allowed", http.StatusMethodNotAllowed) } @@ -500,28 +496,30 @@ func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Reques return } skillsDir := filepath.Join(strings.TrimSpace(s.workspacePath), "skills") - if skillsDir == "" { + if strings.TrimSpace(skillsDir) == "" { http.Error(w, "workspace not configured", http.StatusInternalServerError) return } + _ = os.MkdirAll(skillsDir, 0755) + switch r.Method { case http.MethodGet: entries, err := os.ReadDir(skillsDir) if err != nil { - if os.IsNotExist(err) { - _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "skills": []interface{}{}}) - return - } http.Error(w, err.Error(), http.StatusInternalServerError) return } type skillItem struct { - Name string `json:"name"` - Enabled bool `json:"enabled"` - UpdateChecked bool `json:"update_checked"` - RemoteFound bool `json:"remote_found,omitempty"` - RemoteVersion string `json:"remote_version,omitempty"` - CheckError string `json:"check_error,omitempty"` + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Tools []string `json:"tools"` + SystemPrompt string `json:"system_prompt,omitempty"` + Enabled bool `json:"enabled"` + UpdateChecked bool `json:"update_checked"` + RemoteFound bool `json:"remote_found,omitempty"` + RemoteVersion string `json:"remote_version,omitempty"` + CheckError string `json:"check_error,omitempty"` } items := make([]skillItem, 0, len(entries)) checkUpdates := strings.TrimSpace(r.URL.Query().Get("check_updates")) != "0" @@ -532,7 +530,14 @@ func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Reques name := e.Name() enabled := !strings.HasSuffix(name, ".disabled") baseName := strings.TrimSuffix(name, ".disabled") - it := skillItem{Name: baseName, Enabled: enabled, UpdateChecked: checkUpdates} + desc, tools, sys := readSkillMeta(filepath.Join(skillsDir, name, "SKILL.md")) + if desc == "" || len(tools) == 0 || sys == "" { + d2, t2, s2 := readSkillMeta(filepath.Join(skillsDir, baseName, "SKILL.md")) + if desc == "" { desc = d2 } + if len(tools) == 0 { tools = t2 } + if sys == "" { sys = s2 } + } + it := skillItem{ID: baseName, Name: baseName, Description: desc, Tools: tools, SystemPrompt: sys, Enabled: enabled, UpdateChecked: checkUpdates} if checkUpdates { found, version, checkErr := queryClawHubSkillVersion(r.Context(), baseName) it.RemoteFound = found @@ -545,22 +550,26 @@ func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Reques } _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "skills": items, "source": "clawhub"}) case http.MethodPost: - var body struct { - Action string `json:"action"` - Name string `json:"name"` - } + var body map[string]interface{} if err := json.NewDecoder(r.Body).Decode(&body); err != nil { http.Error(w, "invalid json", http.StatusBadRequest) return } - action := strings.ToLower(strings.TrimSpace(body.Action)) - name := strings.TrimSpace(body.Name) + action, _ := body["action"].(string) + action = strings.ToLower(strings.TrimSpace(action)) + id, _ := body["id"].(string) + name, _ := body["name"].(string) + if strings.TrimSpace(name) == "" { + name = id + } + name = strings.TrimSpace(name) if name == "" { http.Error(w, "name required", http.StatusBadRequest) return } enabledPath := filepath.Join(skillsDir, name) disabledPath := enabledPath + ".disabled" + switch action { case "enable": if _, err := os.Stat(disabledPath); err == nil { @@ -569,6 +578,7 @@ func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Reques return } } + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true}) case "disable": if _, err := os.Stat(enabledPath); err == nil { if err := os.Rename(enabledPath, disabledPath); err != nil { @@ -576,16 +586,158 @@ func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Reques return } } + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true}) + case "create", "update": + desc, _ := body["description"].(string) + sys, _ := body["system_prompt"].(string) + var toolsList []string + if arr, ok := body["tools"].([]interface{}); ok { + for _, v := range arr { + if sv, ok := v.(string); ok && strings.TrimSpace(sv) != "" { + toolsList = append(toolsList, strings.TrimSpace(sv)) + } + } + } + if action == "create" { + if _, err := os.Stat(enabledPath); err == nil { + http.Error(w, "skill already exists", http.StatusBadRequest) + return + } + } + if err := os.MkdirAll(filepath.Join(enabledPath, "scripts"), 0755); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + skillMD := buildSkillMarkdown(name, desc, toolsList, sys) + if err := os.WriteFile(filepath.Join(enabledPath, "SKILL.md"), []byte(skillMD), 0644); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true}) default: http.Error(w, "unsupported action", http.StatusBadRequest) + } + case http.MethodDelete: + id := strings.TrimSpace(r.URL.Query().Get("id")) + if id == "" { + http.Error(w, "id required", http.StatusBadRequest) return } - _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true}) + pathA := filepath.Join(skillsDir, id) + pathB := pathA + ".disabled" + deleted := false + if err := os.RemoveAll(pathA); err == nil { + deleted = true + } + if err := os.RemoveAll(pathB); err == nil { + deleted = true + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "deleted": deleted, "id": id}) default: http.Error(w, "method not allowed", http.StatusMethodNotAllowed) } } +func buildSkillMarkdown(name, desc string, tools []string, systemPrompt string) string { + if strings.TrimSpace(desc) == "" { + desc = "No description provided." + } + t := strings.Join(tools, ", ") + return fmt.Sprintf(`--- +name: %s +description: %s +--- + +# %s + +%s + +## Tools +%s + +## System Prompt +%s +`, name, desc, name, desc, t, systemPrompt) +} + +func readSkillMeta(path string) (desc string, tools []string, systemPrompt string) { + b, err := os.ReadFile(path) + if err != nil { + return "", nil, "" + } + s := string(b) + reDesc := regexp.MustCompile(`(?m)^description:\s*(.+)$`) + reTools := regexp.MustCompile(`(?m)^##\s*Tools\s*$`) + rePrompt := regexp.MustCompile(`(?m)^##\s*System Prompt\s*$`) + if m := reDesc.FindStringSubmatch(s); len(m) > 1 { + desc = strings.TrimSpace(m[1]) + } + if loc := reTools.FindStringIndex(s); loc != nil { + block := s[loc[1]:] + if p := rePrompt.FindStringIndex(block); p != nil { + block = block[:p[0]] + } + for _, line := range strings.Split(block, "\n") { + v := strings.TrimSpace(strings.TrimPrefix(strings.TrimSpace(line), "-")) + if v != "" { + tools = append(tools, v) + } + } + } + if loc := rePrompt.FindStringIndex(s); loc != nil { + systemPrompt = strings.TrimSpace(s[loc[1]:]) + } + return +} + + +func normalizeCronJob(v interface{}) map[string]interface{} { + if v == nil { + return map[string]interface{}{} + } + b, err := json.Marshal(v) + if err != nil { + return map[string]interface{}{"raw": fmt.Sprintf("%v", v)} + } + var m map[string]interface{} + if err := json.Unmarshal(b, &m); err != nil { + return map[string]interface{}{"raw": string(b)} + } + out := map[string]interface{}{} + for k, val := range m { + out[k] = val + } + if sch, ok := m["schedule"].(map[string]interface{}); ok { + if kind, ok := sch["kind"]; ok { out["kind"] = kind } + if every, ok := sch["everyMs"]; ok { out["everyMs"] = every } + if expr, ok := sch["expr"]; ok { out["expr"] = expr } + if at, ok := sch["atMs"]; ok { out["atMs"] = at } + } + if payload, ok := m["payload"].(map[string]interface{}); ok { + if msg, ok := payload["message"]; ok { out["message"] = msg } + if d, ok := payload["deliver"]; ok { out["deliver"] = d } + if c, ok := payload["channel"]; ok { out["channel"] = c } + if to, ok := payload["to"]; ok { out["to"] = to } + } + return out +} + +func normalizeCronJobs(v interface{}) []map[string]interface{} { + b, err := json.Marshal(v) + if err != nil { + return []map[string]interface{}{} + } + var arr []interface{} + if err := json.Unmarshal(b, &arr); err != nil { + return []map[string]interface{}{} + } + out := make([]map[string]interface{}, 0, len(arr)) + for _, it := range arr { + out = append(out, normalizeCronJob(it)) + } + return out +} + func queryClawHubSkillVersion(ctx context.Context, skill string) (found bool, version string, err error) { skill = strings.TrimSpace(skill) if skill == "" { @@ -743,21 +895,28 @@ func (s *RegistryServer) handleWebUILogsStream(w http.ResponseWriter, r *http.Re http.Error(w, "stream unsupported", http.StatusInternalServerError) return } - w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Content-Type", "application/x-ndjson") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - buf := make([]byte, 4096) + + reader := bufio.NewReader(f) for { select { case <-r.Context().Done(): return default: - n, err := f.Read(buf) - if n > 0 { - chunk := strings.ReplaceAll(string(buf[:n]), "\n", "\\n") - chunk = strings.ReplaceAll(chunk, "\"", "'") - _, _ = w.Write([]byte("event: log\ndata: {\"chunk\":\"" + chunk + "\"}\n\n")) - flusher.Flush() + line, err := reader.ReadString('\n') + if len(line) > 0 { + trim := strings.TrimRight(line, "\r\n") + if trim != "" { + if json.Valid([]byte(trim)) { + _, _ = w.Write([]byte(trim + "\n")) + } else { + fallback, _ := json.Marshal(map[string]interface{}{"time": time.Now().UTC().Format(time.RFC3339), "level": "INFO", "msg": trim}) + _, _ = w.Write(append(fallback, '\n')) + } + flusher.Flush() + } } if err != nil { time.Sleep(500 * time.Millisecond)