refactor(api): move webui endpoints out of nodes and fix office state updates

This commit is contained in:
lpf
2026-03-05 16:09:36 +08:00
parent e4da6d2141
commit 3553be2d53
6 changed files with 738 additions and 168 deletions

12
pkg/api/reload_unix.go Normal file
View File

@@ -0,0 +1,12 @@
//go:build !windows
package api
import (
"os"
"syscall"
)
func requestSelfReloadSignal() error {
return syscall.Kill(os.Getpid(), syscall.SIGHUP)
}

View File

@@ -0,0 +1,8 @@
//go:build windows
package api
// requestSelfReloadSignal is a no-op on Windows (no SIGHUP semantics).
func requestSelfReloadSignal() error {
return nil
}

View File

@@ -1,4 +1,4 @@
package nodes
package api
import (
"archive/tar"
@@ -27,12 +27,13 @@ import (
"time"
cfgpkg "clawgo/pkg/config"
"clawgo/pkg/nodes"
)
type RegistryServer struct {
type Server struct {
addr string
token string
mgr *Manager
mgr *nodes.Manager
server *http.Server
configPath string
workspacePath string
@@ -49,7 +50,7 @@ type RegistryServer struct {
ekgCacheRows []map[string]interface{}
}
func NewRegistryServer(host string, port int, token string, mgr *Manager) *RegistryServer {
func NewServer(host string, port int, token string, mgr *nodes.Manager) *Server {
addr := strings.TrimSpace(host)
if addr == "" {
addr = "0.0.0.0"
@@ -57,25 +58,25 @@ func NewRegistryServer(host string, port int, token string, mgr *Manager) *Regis
if port <= 0 {
port = 7788
}
return &RegistryServer{addr: fmt.Sprintf("%s:%d", addr, port), token: strings.TrimSpace(token), mgr: mgr}
return &Server{addr: fmt.Sprintf("%s:%d", addr, port), token: strings.TrimSpace(token), mgr: mgr}
}
func (s *RegistryServer) SetConfigPath(path string) { s.configPath = strings.TrimSpace(path) }
func (s *RegistryServer) SetWorkspacePath(path string) { s.workspacePath = strings.TrimSpace(path) }
func (s *RegistryServer) SetLogFilePath(path string) { s.logFilePath = strings.TrimSpace(path) }
func (s *RegistryServer) SetChatHandler(fn func(ctx context.Context, sessionKey, content string) (string, error)) {
func (s *Server) SetConfigPath(path string) { s.configPath = strings.TrimSpace(path) }
func (s *Server) SetWorkspacePath(path string) { s.workspacePath = strings.TrimSpace(path) }
func (s *Server) SetLogFilePath(path string) { s.logFilePath = strings.TrimSpace(path) }
func (s *Server) SetChatHandler(fn func(ctx context.Context, sessionKey, content string) (string, error)) {
s.onChat = fn
}
func (s *RegistryServer) SetChatHistoryHandler(fn func(sessionKey string) []map[string]interface{}) {
func (s *Server) SetChatHistoryHandler(fn func(sessionKey string) []map[string]interface{}) {
s.onChatHistory = fn
}
func (s *RegistryServer) SetConfigAfterHook(fn func()) { s.onConfigAfter = fn }
func (s *RegistryServer) SetCronHandler(fn func(action string, args map[string]interface{}) (interface{}, error)) {
func (s *Server) SetConfigAfterHook(fn func()) { s.onConfigAfter = fn }
func (s *Server) SetCronHandler(fn func(action string, args map[string]interface{}) (interface{}, error)) {
s.onCron = fn
}
func (s *RegistryServer) SetWebUIDir(dir string) { s.webUIDir = strings.TrimSpace(dir) }
func (s *Server) SetWebUIDir(dir string) { s.webUIDir = strings.TrimSpace(dir) }
func (s *RegistryServer) Start(ctx context.Context) error {
func (s *Server) Start(ctx context.Context) error {
if s.mgr == nil {
return nil
}
@@ -119,7 +120,7 @@ func (s *RegistryServer) Start(ctx context.Context) error {
return nil
}
func (s *RegistryServer) handleRegister(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleRegister(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
@@ -128,7 +129,7 @@ func (s *RegistryServer) handleRegister(w http.ResponseWriter, r *http.Request)
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
var n NodeInfo
var n nodes.NodeInfo
if err := json.NewDecoder(r.Body).Decode(&n); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
@@ -142,7 +143,7 @@ func (s *RegistryServer) handleRegister(w http.ResponseWriter, r *http.Request)
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "id": n.ID})
}
func (s *RegistryServer) handleHeartbeat(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleHeartbeat(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
@@ -170,7 +171,7 @@ func (s *RegistryServer) handleHeartbeat(w http.ResponseWriter, r *http.Request)
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "id": body.ID})
}
func (s *RegistryServer) handleWebUI(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUI(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
@@ -196,7 +197,7 @@ func (s *RegistryServer) handleWebUI(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(webUIHTML))
}
func (s *RegistryServer) handleWebUIAsset(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUIAsset(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
@@ -215,7 +216,7 @@ func (s *RegistryServer) handleWebUIAsset(w http.ResponseWriter, r *http.Request
http.NotFound(w, r)
}
func (s *RegistryServer) tryServeWebUIDist(w http.ResponseWriter, r *http.Request, reqPath string) bool {
func (s *Server) tryServeWebUIDist(w http.ResponseWriter, r *http.Request, reqPath string) bool {
dir := strings.TrimSpace(s.webUIDir)
if dir == "" {
return false
@@ -237,7 +238,7 @@ func (s *RegistryServer) tryServeWebUIDist(w http.ResponseWriter, r *http.Reques
return true
}
func (s *RegistryServer) handleWebUIConfig(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUIConfig(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -404,7 +405,7 @@ func getPathValue(m map[string]interface{}, path string) interface{} {
return cur
}
func (s *RegistryServer) handleWebUIUpload(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUIUpload(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -440,7 +441,7 @@ func (s *RegistryServer) handleWebUIUpload(w http.ResponseWriter, r *http.Reques
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "path": path, "name": h.Filename})
}
func (s *RegistryServer) handleWebUIChat(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUIChat(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -484,7 +485,7 @@ func (s *RegistryServer) handleWebUIChat(w http.ResponseWriter, r *http.Request)
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "reply": resp, "session": session})
}
func (s *RegistryServer) handleWebUIChatHistory(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUIChatHistory(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -504,7 +505,7 @@ func (s *RegistryServer) handleWebUIChatHistory(w http.ResponseWriter, r *http.R
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "session": session, "messages": s.onChatHistory(session)})
}
func (s *RegistryServer) handleWebUIChatStream(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUIChatStream(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -566,7 +567,7 @@ func (s *RegistryServer) handleWebUIChatStream(w http.ResponseWriter, r *http.Re
}
}
func (s *RegistryServer) handleWebUIVersion(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUIVersion(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -582,19 +583,19 @@ func (s *RegistryServer) handleWebUIVersion(w http.ResponseWriter, r *http.Reque
})
}
func (s *RegistryServer) handleWebUINodes(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUINodes(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
switch r.Method {
case http.MethodGet:
list := []NodeInfo{}
list := []nodes.NodeInfo{}
if s.mgr != nil {
list = s.mgr.List()
}
host, _ := os.Hostname()
local := NodeInfo{ID: "local", Name: "local", Endpoint: "gateway", Version: gatewayBuildVersion(), LastSeenAt: time.Now(), Online: true}
local := nodes.NodeInfo{ID: "local", Name: "local", Endpoint: "gateway", Version: gatewayBuildVersion(), LastSeenAt: time.Now(), Online: true}
if strings.TrimSpace(host) != "" {
local.Name = host
}
@@ -623,7 +624,7 @@ func (s *RegistryServer) handleWebUINodes(w http.ResponseWriter, r *http.Request
}
}
if !matched {
list = append([]NodeInfo{local}, list...)
list = append([]nodes.NodeInfo{local}, list...)
}
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "nodes": list})
case http.MethodPost:
@@ -652,7 +653,7 @@ func (s *RegistryServer) handleWebUINodes(w http.ResponseWriter, r *http.Request
}
}
func (s *RegistryServer) handleWebUICron(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUICron(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -702,7 +703,7 @@ func (s *RegistryServer) handleWebUICron(w http.ResponseWriter, r *http.Request)
}
}
func (s *RegistryServer) handleWebUISkills(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUISkills(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -1802,7 +1803,7 @@ func anyToString(v interface{}) string {
}
}
func (s *RegistryServer) handleWebUISessions(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUISessions(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -1850,7 +1851,7 @@ func (s *RegistryServer) handleWebUISessions(w http.ResponseWriter, r *http.Requ
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "sessions": out})
}
func (s *RegistryServer) handleWebUIMemory(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUIMemory(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -1925,7 +1926,7 @@ func (s *RegistryServer) handleWebUIMemory(w http.ResponseWriter, r *http.Reques
}
}
func (s *RegistryServer) handleWebUITaskAudit(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUITaskAudit(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -2041,7 +2042,7 @@ func (s *RegistryServer) handleWebUITaskAudit(w http.ResponseWriter, r *http.Req
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "items": items})
}
func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUITaskQueue(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -2258,7 +2259,7 @@ func (s *RegistryServer) handleWebUITaskQueue(w http.ResponseWriter, r *http.Req
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "running": running, "items": items, "stats": stats})
}
func (s *RegistryServer) handleWebUITaskDailySummary(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUITaskDailySummary(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -2290,7 +2291,7 @@ func (s *RegistryServer) handleWebUITaskDailySummary(w http.ResponseWriter, r *h
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "date": date, "report": report})
}
func (s *RegistryServer) loadEKGRowsCached(path string, maxLines int) []map[string]interface{} {
func (s *Server) loadEKGRowsCached(path string, maxLines int) []map[string]interface{} {
path = strings.TrimSpace(path)
if path == "" {
return nil
@@ -2332,7 +2333,7 @@ func (s *RegistryServer) loadEKGRowsCached(path string, maxLines int) []map[stri
return rows
}
func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUIEKGStats(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -2470,7 +2471,7 @@ func (s *RegistryServer) handleWebUIEKGStats(w http.ResponseWriter, r *http.Requ
})
}
func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUIOfficeState(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -2496,6 +2497,40 @@ func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.R
}
return time.Time{}
}
normalizeTaskStatus := func(raw string) string {
st := strings.ToLower(strings.TrimSpace(raw))
switch st {
case "running", "doing", "in_progress", "in-progress", "executing", "processing", "active":
return "running"
case "waiting", "queued", "queue", "todo", "pending", "paused", "idle":
return "waiting"
case "blocked":
return "blocked"
case "error", "failed", "fail":
return "error"
case "success", "done", "completed", "complete":
return "success"
case "suppressed", "skip", "skipped":
return "suppressed"
default:
return st
}
}
isFreshTaskState := func(status string, ts time.Time) bool {
if ts.IsZero() {
return false
}
window := 30 * time.Minute
switch status {
case "running", "waiting":
window = 2 * time.Hour
case "blocked", "error":
window = 6 * time.Hour
case "success", "suppressed":
window = 30 * time.Minute
}
return !ts.Before(now.Add(-window))
}
ipv4Pattern := regexp.MustCompile(`\b(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})\b`)
maskIPv4 := func(text string) string {
if strings.TrimSpace(text) == "" {
@@ -2537,6 +2572,11 @@ func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.R
continue
}
t := parseTime(fmt.Sprintf("%v", row["time"]))
row["status"] = normalizeTaskStatus(fmt.Sprintf("%v", row["status"]))
st := fmt.Sprintf("%v", row["status"])
if !isFreshTaskState(st, t) {
continue
}
prev, ok := latestTimeByTask[taskID]
if ok && !t.IsZero() && t.Before(prev) {
continue
@@ -2559,12 +2599,16 @@ func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.R
row := map[string]interface{}{
"task_id": id,
"time": fmt.Sprintf("%v", t["updated_at"]),
"status": fmt.Sprintf("%v", t["status"]),
"status": normalizeTaskStatus(fmt.Sprintf("%v", t["status"])),
"source": fmt.Sprintf("%v", t["source"]),
"input_preview": fmt.Sprintf("%v", t["content"]),
"log": fmt.Sprintf("%v", t["block_reason"]),
}
tm := parseTime(fmt.Sprintf("%v", row["time"]))
st := fmt.Sprintf("%v", row["status"])
if !isFreshTaskState(st, tm) {
continue
}
prev, ok := latestTimeByTask[id]
if !ok || prev.IsZero() || (!tm.IsZero() && tm.After(prev)) {
latestByTask[id] = row
@@ -2604,7 +2648,7 @@ func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.R
"suppressed": 0,
}
for _, row := range items {
st := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["status"])))
st := normalizeTaskStatus(fmt.Sprintf("%v", row["status"]))
if _, ok := stats[st]; ok {
stats[st]++
}
@@ -2651,7 +2695,7 @@ func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.R
}
}
for _, row := range items {
st := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["status"])))
st := normalizeTaskStatus(fmt.Sprintf("%v", row["status"]))
if isMainStatus(st) {
mainTaskID = strings.TrimSpace(fmt.Sprintf("%v", row["task_id"]))
mainDetail = strings.TrimSpace(fmt.Sprintf("%v", row["input_preview"]))
@@ -2675,7 +2719,7 @@ func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.R
}
}
nodeState := func(n NodeInfo) string {
nodeState := func(n nodes.NodeInfo) string {
if !n.Online {
return "offline"
}
@@ -2685,7 +2729,7 @@ func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.R
}
return "online"
}
nodeZone := func(n NodeInfo) string {
nodeZone := func(n nodes.NodeInfo) string {
if !n.Online {
return "bug"
}
@@ -2697,7 +2741,7 @@ func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.R
}
return "breakroom"
}
nodeDetail := func(n NodeInfo) string {
nodeDetail := func(n nodes.NodeInfo) string {
parts := make([]string, 0, 4)
if ep := strings.TrimSpace(n.Endpoint); ep != "" {
parts = append(parts, maskIPv4(ep))
@@ -2722,12 +2766,12 @@ func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.R
return maskIPv4(strings.Join(parts, " · "))
}
allNodes := []NodeInfo{}
allNodes := []nodes.NodeInfo{}
if s.mgr != nil {
allNodes = s.mgr.List()
}
host, _ := os.Hostname()
localNode := NodeInfo{ID: "local", Name: "local", Endpoint: "gateway", Version: gatewayBuildVersion(), LastSeenAt: now, Online: true}
localNode := nodes.NodeInfo{ID: "local", Name: "local", Endpoint: "gateway", Version: gatewayBuildVersion(), LastSeenAt: now, Online: true}
if strings.TrimSpace(host) != "" {
localNode.Name = strings.TrimSpace(host)
}
@@ -2736,7 +2780,7 @@ func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.R
}
hostLower := strings.ToLower(strings.TrimSpace(host))
mainNode := localNode
otherNodes := make([]NodeInfo, 0, len(allNodes))
otherNodes := make([]nodes.NodeInfo, 0, len(allNodes))
for _, n := range allNodes {
idLower := strings.ToLower(strings.TrimSpace(n.ID))
nameLower := strings.ToLower(strings.TrimSpace(n.Name))
@@ -2852,7 +2896,7 @@ func (s *RegistryServer) handleWebUIOfficeState(w http.ResponseWriter, r *http.R
})
}
func (s *RegistryServer) handleWebUITasks(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUITasks(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -2959,7 +3003,7 @@ func (s *RegistryServer) handleWebUITasks(w http.ResponseWriter, r *http.Request
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true})
}
func (s *RegistryServer) handleWebUIExecApprovals(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUIExecApprovals(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -3022,7 +3066,7 @@ func (s *RegistryServer) handleWebUIExecApprovals(w http.ResponseWriter, r *http
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
func (s *RegistryServer) handleWebUILogsRecent(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUILogsRecent(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -3073,7 +3117,7 @@ func (s *RegistryServer) handleWebUILogsRecent(w http.ResponseWriter, r *http.Re
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "logs": out})
}
func (s *RegistryServer) handleWebUILogsStream(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleWebUILogsStream(w http.ResponseWriter, r *http.Request) {
if !s.checkAuth(r) {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
@@ -3132,7 +3176,7 @@ func (s *RegistryServer) handleWebUILogsStream(w http.ResponseWriter, r *http.Re
}
}
func (s *RegistryServer) checkAuth(r *http.Request) bool {
func (s *Server) checkAuth(r *http.Request) bool {
if s.token == "" {
return true
}