diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 8830d30..d34bfeb 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -179,6 +179,8 @@ func gatewayCmd() { registryServer := nodes.NewRegistryServer(cfg.Gateway.Host, cfg.Gateway.Port, cfg.Gateway.Token, nodes.DefaultManager()) registryServer.SetConfigPath(getConfigPath()) + registryServer.SetWorkspacePath(cfg.WorkspacePath()) + registryServer.SetLogFilePath(cfg.LogFilePath()) registryServer.SetWebUIDir(filepath.Join(cfg.WorkspacePath(), "webui-dist")) registryServer.SetChatHandler(func(cctx context.Context, sessionKey, content string) (string, error) { if strings.TrimSpace(content) == "" { @@ -186,6 +188,9 @@ func gatewayCmd() { } return agentLoop.ProcessDirect(cctx, content, sessionKey) }) + registryServer.SetChannelsHandler(func() interface{} { + return channelManager.GetStatus() + }) registryServer.SetConfigAfterHook(func() { _ = syscall.Kill(os.Getpid(), syscall.SIGHUP) }) diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go index fb96f40..770a161 100644 --- a/pkg/nodes/registry_server.go +++ b/pkg/nodes/registry_server.go @@ -20,9 +20,12 @@ type RegistryServer struct { mgr *Manager server *http.Server configPath string + workspacePath string + logFilePath string onChat func(ctx context.Context, sessionKey, content string) (string, error) onConfigAfter func() onCron func(action string, args map[string]interface{}) (interface{}, error) + onChannels func() interface{} webUIDir string } @@ -38,9 +41,12 @@ func NewRegistryServer(host string, port int, token string, mgr *Manager) *Regis } func (s *RegistryServer) SetConfigPath(path string) { s.configPath = strings.TrimSpace(path) } +func (s *RegistryServer) SetWorkspacePath(path string) { s.workspacePath = strings.TrimSpace(path) } +func (s *RegistryServer) SetLogFilePath(path string) { s.logFilePath = strings.TrimSpace(path) } func (s *RegistryServer) SetChatHandler(fn func(ctx context.Context, sessionKey, content string) (string, error)) { s.onChat = fn } +func (s *RegistryServer) SetChannelsHandler(fn func() interface{}) { s.onChannels = fn } func (s *RegistryServer) SetConfigAfterHook(fn func()) { s.onConfigAfter = fn } func (s *RegistryServer) SetCronHandler(fn func(action string, args map[string]interface{}) (interface{}, error)) { s.onCron = fn @@ -62,9 +68,14 @@ func (s *RegistryServer) Start(ctx context.Context) error { mux.HandleFunc("/webui/", s.handleWebUIAsset) mux.HandleFunc("/webui/api/config", s.handleWebUIConfig) mux.HandleFunc("/webui/api/chat", s.handleWebUIChat) + mux.HandleFunc("/webui/api/chat/stream", s.handleWebUIChatStream) mux.HandleFunc("/webui/api/upload", s.handleWebUIUpload) mux.HandleFunc("/webui/api/nodes", s.handleWebUINodes) mux.HandleFunc("/webui/api/cron", s.handleWebUICron) + mux.HandleFunc("/webui/api/channels", s.handleWebUIChannels) + mux.HandleFunc("/webui/api/skills", s.handleWebUISkills) + mux.HandleFunc("/webui/api/exec_approvals", s.handleWebUIExecApprovals) + mux.HandleFunc("/webui/api/logs/stream", s.handleWebUILogsStream) s.server = &http.Server{Addr: s.addr, Handler: mux} go func() { <-ctx.Done() @@ -332,6 +343,71 @@ func (s *RegistryServer) handleWebUIChat(w http.ResponseWriter, r *http.Request) _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "reply": resp, "session": session}) } +func (s *RegistryServer) handleWebUIChatStream(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + if s.onChat == nil { + http.Error(w, "chat handler not configured", http.StatusInternalServerError) + return + } + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "stream unsupported", http.StatusInternalServerError) + return + } + var body struct { + Session string `json:"session"` + Message string `json:"message"` + Media string `json:"media"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + session := strings.TrimSpace(body.Session) + if session == "" { + session = "webui:default" + } + prompt := strings.TrimSpace(body.Message) + if strings.TrimSpace(body.Media) != "" { + if prompt != "" { + prompt += "\n" + } + prompt += "[file: " + strings.TrimSpace(body.Media) + "]" + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + _, _ = w.Write([]byte("event: start\ndata: {\"ok\":true}\n\n")) + flusher.Flush() + resp, err := s.onChat(r.Context(), session, prompt) + if err != nil { + _, _ = w.Write([]byte("event: error\ndata: {\"error\":\"" + strings.ReplaceAll(err.Error(), "\"", "'") + "\"}\n\n")) + flusher.Flush() + return + } + chunk := 180 + for i := 0; i < len(resp); i += chunk { + end := i + chunk + if end > len(resp) { + end = len(resp) + } + part := strings.ReplaceAll(resp[i:end], "\n", "\\n") + part = strings.ReplaceAll(part, "\"", "'") + _, _ = w.Write([]byte("event: delta\ndata: {\"text\":\"" + part + "\"}\n\n")) + flusher.Flush() + } + _, _ = w.Write([]byte("event: done\ndata: {\"ok\":true}\n\n")) + flusher.Flush() +} + func (s *RegistryServer) handleWebUINodes(w http.ResponseWriter, r *http.Request) { if !s.checkAuth(r) { http.Error(w, "unauthorized", http.StatusUnauthorized) @@ -398,6 +474,214 @@ func (s *RegistryServer) handleWebUICron(w http.ResponseWriter, r *http.Request) } } +func (s *RegistryServer) handleWebUIChannels(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + res := map[string]interface{}{"ok": true, "channels": []interface{}{}} + if s.onChannels != nil { + res["channels"] = s.onChannels() + } + _ = json.NewEncoder(w).Encode(res) +} + +func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + skillsDir := filepath.Join(strings.TrimSpace(s.workspacePath), "skills") + if skillsDir == "" { + http.Error(w, "workspace not configured", http.StatusInternalServerError) + return + } + switch r.Method { + case http.MethodGet: + entries, err := os.ReadDir(skillsDir) + if err != nil { + if os.IsNotExist(err) { + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "skills": []interface{}{}}) + return + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + type skillItem struct { + Name string `json:"name"` + Enabled bool `json:"enabled"` + } + items := make([]skillItem, 0, len(entries)) + for _, e := range entries { + if !e.IsDir() { + continue + } + name := e.Name() + enabled := !strings.HasSuffix(name, ".disabled") + items = append(items, skillItem{Name: strings.TrimSuffix(name, ".disabled"), Enabled: enabled}) + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "skills": items}) + case http.MethodPost: + var body struct { + Action string `json:"action"` + Name string `json:"name"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + action := strings.ToLower(strings.TrimSpace(body.Action)) + name := strings.TrimSpace(body.Name) + if name == "" { + http.Error(w, "name required", http.StatusBadRequest) + return + } + enabledPath := filepath.Join(skillsDir, name) + disabledPath := enabledPath + ".disabled" + switch action { + case "enable": + if _, err := os.Stat(disabledPath); err == nil { + if err := os.Rename(disabledPath, enabledPath); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + case "disable": + if _, err := os.Stat(enabledPath); err == nil { + if err := os.Rename(enabledPath, disabledPath); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + default: + http.Error(w, "unsupported action", http.StatusBadRequest) + return + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true}) + default: + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + } +} + +func (s *RegistryServer) handleWebUIExecApprovals(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if strings.TrimSpace(s.configPath) == "" { + http.Error(w, "config path not set", http.StatusInternalServerError) + return + } + b, err := os.ReadFile(s.configPath) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + var cfg map[string]interface{} + if err := json.Unmarshal(b, &cfg); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if r.Method == http.MethodGet { + toolsMap, _ := cfg["tools"].(map[string]interface{}) + shellMap, _ := toolsMap["shell"].(map[string]interface{}) + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "exec_approvals": shellMap}) + return + } + if r.Method == http.MethodPost { + var body map[string]interface{} + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + toolsMap, _ := cfg["tools"].(map[string]interface{}) + if toolsMap == nil { + toolsMap = map[string]interface{}{} + cfg["tools"] = toolsMap + } + shellMap, _ := toolsMap["shell"].(map[string]interface{}) + if shellMap == nil { + shellMap = map[string]interface{}{} + toolsMap["shell"] = shellMap + } + for k, v := range body { + shellMap[k] = v + } + out, _ := json.MarshalIndent(cfg, "", " ") + tmp := s.configPath + ".tmp" + if err := os.WriteFile(tmp, out, 0644); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if err := os.Rename(tmp, s.configPath); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if s.onConfigAfter != nil { + s.onConfigAfter() + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "reloaded": true}) + return + } + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) +} + +func (s *RegistryServer) handleWebUILogsStream(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + path := strings.TrimSpace(s.logFilePath) + if path == "" { + http.Error(w, "log path not configured", http.StatusInternalServerError) + return + } + f, err := os.Open(path) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer f.Close() + fi, _ := f.Stat() + if fi != nil { + _, _ = f.Seek(fi.Size(), io.SeekStart) + } + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "stream unsupported", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + buf := make([]byte, 4096) + for { + select { + case <-r.Context().Done(): + return + default: + n, err := f.Read(buf) + if n > 0 { + chunk := strings.ReplaceAll(string(buf[:n]), "\n", "\\n") + chunk = strings.ReplaceAll(chunk, "\"", "'") + _, _ = w.Write([]byte("event: log\ndata: {\"chunk\":\"" + chunk + "\"}\n\n")) + flusher.Flush() + } + if err != nil { + time.Sleep(500 * time.Millisecond) + } + } + } +} + func (s *RegistryServer) checkAuth(r *http.Request) bool { if s.token == "" { return true