From 4172a57b391f49d30995c785313c59f10e93b802 Mon Sep 17 00:00:00 2001 From: lpf Date: Sun, 8 Mar 2026 22:22:49 +0800 Subject: [PATCH] feat: unify websocket runtime and harden node control --- cmd/clawgo/cli_common.go | 1 + cmd/clawgo/cmd_node.go | 551 +++++++++++++++ cmd/clawgo/cmd_node_test.go | 132 ++++ cmd/clawgo/main.go | 2 + pkg/api/server.go | 738 ++++++++++++++++++++- pkg/api/server_test.go | 255 +++++++ pkg/nodes/types.go | 15 + pkg/tools/subagent_runtime_control_test.go | 1 + webui/package-lock.json | 36 +- webui/package.json | 4 +- webui/server.ts | 52 +- webui/src/context/AppContext.tsx | 124 +++- webui/src/pages/Chat.tsx | 108 ++- webui/src/pages/Logs.tsx | 67 +- webui/src/pages/Subagents.tsx | 120 +++- 15 files changed, 2082 insertions(+), 124 deletions(-) create mode 100644 cmd/clawgo/cmd_node.go create mode 100644 cmd/clawgo/cmd_node_test.go diff --git a/cmd/clawgo/cli_common.go b/cmd/clawgo/cli_common.go index c14d2f7..7164837 100644 --- a/cmd/clawgo/cli_common.go +++ b/cmd/clawgo/cli_common.go @@ -96,6 +96,7 @@ func printHelp() { fmt.Println(" config Get/set config values") fmt.Println(" cron Manage scheduled tasks") fmt.Println(" channel Test and manage messaging channels") + fmt.Println(" node Register remote node metadata and heartbeat") fmt.Println(" skills Manage skills (install, list, remove)") fmt.Println(" uninstall Uninstall clawgo components") fmt.Println(" version Show version information") diff --git a/cmd/clawgo/cmd_node.go b/cmd/clawgo/cmd_node.go new file mode 100644 index 0000000..2833a67 --- /dev/null +++ b/cmd/clawgo/cmd_node.go @@ -0,0 +1,551 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "runtime" + "strconv" + "strings" + "time" + + "clawgo/pkg/config" + "clawgo/pkg/nodes" + "github.com/gorilla/websocket" +) + +type nodeRegisterOptions struct { + GatewayBase string + Token string + ID string + Name string + Endpoint string + OS string + Arch string + Version 
string + Actions []string + Models []string + Capabilities nodes.Capabilities + Watch bool + HeartbeatSec int +} + +type nodeHeartbeatOptions struct { + GatewayBase string + Token string + ID string +} + +func nodeCmd() { + args := os.Args[2:] + if len(args) == 0 { + printNodeHelp() + return + } + switch strings.ToLower(strings.TrimSpace(args[0])) { + case "register": + nodeRegisterCmd(args[1:]) + case "heartbeat": + nodeHeartbeatCmd(args[1:]) + case "help", "--help", "-h": + printNodeHelp() + default: + fmt.Printf("Unknown node command: %s\n", args[0]) + printNodeHelp() + } +} + +func printNodeHelp() { + fmt.Println("Node commands:") + fmt.Println(" clawgo node register [options]") + fmt.Println(" clawgo node heartbeat [options]") + fmt.Println() + fmt.Println("Register options:") + fmt.Println(" --gateway Gateway base URL, e.g. http://host:18790") + fmt.Println(" --token Gateway token (optional when gateway.token is empty)") + fmt.Println(" --id Node ID (default: hostname)") + fmt.Println(" --name Node name (default: hostname)") + fmt.Println(" --endpoint Public endpoint of this node") + fmt.Println(" --os Reported OS (default: current runtime)") + fmt.Println(" --arch Reported arch (default: current runtime)") + fmt.Println(" --version Reported node version (default: current clawgo version)") + fmt.Println(" --actions Supported actions, e.g. run,agent_task") + fmt.Println(" --models Supported models, e.g. 
gpt-4o-mini") + fmt.Println(" --capabilities Capability flags: run,invoke,model,camera,screen,location,canvas") + fmt.Println(" --watch Keep a websocket connection open and send heartbeats") + fmt.Println(" --heartbeat-sec Heartbeat interval in seconds when --watch is set (default: 30)") + fmt.Println() + fmt.Println("Heartbeat options:") + fmt.Println(" --gateway Gateway base URL") + fmt.Println(" --token Gateway token") + fmt.Println(" --id Node ID") +} + +func nodeRegisterCmd(args []string) { + cfg, _ := loadConfig() + opts, err := parseNodeRegisterArgs(args, cfg) + if err != nil { + fmt.Printf("Error: %v\n", err) + printNodeHelp() + os.Exit(1) + } + client := &http.Client{Timeout: 20 * time.Second} + info := buildNodeInfo(opts) + ctx := context.Background() + if err := postNodeRegister(ctx, client, opts.GatewayBase, opts.Token, info); err != nil { + fmt.Printf("Error registering node: %v\n", err) + os.Exit(1) + } + fmt.Printf("✓ Node registered: %s -> %s\n", info.ID, opts.GatewayBase) + if !opts.Watch { + return + } + fmt.Printf("✓ Heartbeat loop started: every %ds\n", opts.HeartbeatSec) + if err := runNodeHeartbeatLoop(client, opts, info); err != nil { + fmt.Printf("Heartbeat loop stopped: %v\n", err) + os.Exit(1) + } +} + +func nodeHeartbeatCmd(args []string) { + cfg, _ := loadConfig() + opts, err := parseNodeHeartbeatArgs(args, cfg) + if err != nil { + fmt.Printf("Error: %v\n", err) + printNodeHelp() + os.Exit(1) + } + client := &http.Client{Timeout: 20 * time.Second} + if err := postNodeHeartbeat(context.Background(), client, opts.GatewayBase, opts.Token, opts.ID); err != nil { + fmt.Printf("Error sending heartbeat: %v\n", err) + os.Exit(1) + } + fmt.Printf("✓ Heartbeat sent: %s -> %s\n", opts.ID, opts.GatewayBase) +} + +func parseNodeRegisterArgs(args []string, cfg *config.Config) (nodeRegisterOptions, error) { + host, _ := os.Hostname() + host = strings.TrimSpace(host) + if host == "" { + host = "node" + } + opts := nodeRegisterOptions{ + GatewayBase: 
defaultGatewayBase(cfg), + Token: defaultGatewayToken(cfg), + ID: host, + Name: host, + OS: runtime.GOOS, + Arch: runtime.GOARCH, + Version: version, + HeartbeatSec: 30, + Capabilities: capabilitiesFromCSV("run,invoke,model"), + } + for i := 0; i < len(args); i++ { + arg := strings.TrimSpace(args[i]) + next := func() (string, error) { + if i+1 >= len(args) { + return "", fmt.Errorf("missing value for %s", arg) + } + i++ + return strings.TrimSpace(args[i]), nil + } + switch arg { + case "--gateway": + v, err := next() + if err != nil { + return opts, err + } + opts.GatewayBase = v + case "--token": + v, err := next() + if err != nil { + return opts, err + } + opts.Token = v + case "--id": + v, err := next() + if err != nil { + return opts, err + } + opts.ID = v + case "--name": + v, err := next() + if err != nil { + return opts, err + } + opts.Name = v + case "--endpoint": + v, err := next() + if err != nil { + return opts, err + } + opts.Endpoint = v + case "--os": + v, err := next() + if err != nil { + return opts, err + } + opts.OS = v + case "--arch": + v, err := next() + if err != nil { + return opts, err + } + opts.Arch = v + case "--version": + v, err := next() + if err != nil { + return opts, err + } + opts.Version = v + case "--actions": + v, err := next() + if err != nil { + return opts, err + } + opts.Actions = splitCSV(v) + case "--models": + v, err := next() + if err != nil { + return opts, err + } + opts.Models = splitCSV(v) + case "--capabilities": + v, err := next() + if err != nil { + return opts, err + } + opts.Capabilities = capabilitiesFromCSV(v) + case "--watch": + opts.Watch = true + case "--heartbeat-sec": + v, err := next() + if err != nil { + return opts, err + } + n, convErr := strconv.Atoi(v) + if convErr != nil || n <= 0 { + return opts, fmt.Errorf("invalid --heartbeat-sec: %s", v) + } + opts.HeartbeatSec = n + default: + return opts, fmt.Errorf("unknown option: %s", arg) + } + } + if strings.TrimSpace(opts.GatewayBase) == "" { + return 
opts, fmt.Errorf("--gateway is required") + } + if strings.TrimSpace(opts.ID) == "" { + return opts, fmt.Errorf("--id is required") + } + opts.GatewayBase = normalizeGatewayBase(opts.GatewayBase) + return opts, nil +} + +func parseNodeHeartbeatArgs(args []string, cfg *config.Config) (nodeHeartbeatOptions, error) { + host, _ := os.Hostname() + host = strings.TrimSpace(host) + if host == "" { + host = "node" + } + opts := nodeHeartbeatOptions{ + GatewayBase: defaultGatewayBase(cfg), + Token: defaultGatewayToken(cfg), + ID: host, + } + for i := 0; i < len(args); i++ { + arg := strings.TrimSpace(args[i]) + next := func() (string, error) { + if i+1 >= len(args) { + return "", fmt.Errorf("missing value for %s", arg) + } + i++ + return strings.TrimSpace(args[i]), nil + } + switch arg { + case "--gateway": + v, err := next() + if err != nil { + return opts, err + } + opts.GatewayBase = v + case "--token": + v, err := next() + if err != nil { + return opts, err + } + opts.Token = v + case "--id": + v, err := next() + if err != nil { + return opts, err + } + opts.ID = v + default: + return opts, fmt.Errorf("unknown option: %s", arg) + } + } + if strings.TrimSpace(opts.GatewayBase) == "" { + return opts, fmt.Errorf("--gateway is required") + } + if strings.TrimSpace(opts.ID) == "" { + return opts, fmt.Errorf("--id is required") + } + opts.GatewayBase = normalizeGatewayBase(opts.GatewayBase) + return opts, nil +} + +func buildNodeInfo(opts nodeRegisterOptions) nodes.NodeInfo { + return nodes.NodeInfo{ + ID: strings.TrimSpace(opts.ID), + Name: strings.TrimSpace(opts.Name), + OS: strings.TrimSpace(opts.OS), + Arch: strings.TrimSpace(opts.Arch), + Version: strings.TrimSpace(opts.Version), + Endpoint: strings.TrimSpace(opts.Endpoint), + Capabilities: opts.Capabilities, + Actions: append([]string(nil), opts.Actions...), + Models: append([]string(nil), opts.Models...), + } +} + +func runNodeHeartbeatLoop(client *http.Client, opts nodeRegisterOptions, info nodes.NodeInfo) error { + 
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + defer stop() + for { + if err := runNodeHeartbeatSocket(ctx, opts, info); err != nil { + if ctx.Err() != nil { + fmt.Println("✓ Node heartbeat stopped") + return nil + } + fmt.Printf("Warning: node socket closed for %s: %v\n", info.ID, err) + } + if ctx.Err() != nil { + fmt.Println("✓ Node heartbeat stopped") + return nil + } + if regErr := postNodeRegister(ctx, client, opts.GatewayBase, opts.Token, info); regErr != nil { + fmt.Printf("Warning: re-register failed for %s: %v\n", info.ID, regErr) + } else { + fmt.Printf("✓ Node re-registered: %s\n", info.ID) + } + select { + case <-ctx.Done(): + fmt.Println("✓ Node heartbeat stopped") + return nil + case <-time.After(2 * time.Second): + } + } +} + +func runNodeHeartbeatSocket(ctx context.Context, opts nodeRegisterOptions, info nodes.NodeInfo) error { + wsURL := nodeWebsocketURL(opts.GatewayBase) + headers := http.Header{} + if strings.TrimSpace(opts.Token) != "" { + headers.Set("Authorization", "Bearer "+strings.TrimSpace(opts.Token)) + } + conn, _, err := websocket.DefaultDialer.DialContext(ctx, wsURL, headers) + if err != nil { + return err + } + defer conn.Close() + + _ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := conn.WriteJSON(nodes.WireMessage{Type: "register", Node: &info}); err != nil { + return err + } + if err := readNodeAck(conn, "registered", info.ID); err != nil { + return err + } + fmt.Printf("✓ Node socket connected: %s\n", info.ID) + + ticker := time.NewTicker(time.Duration(opts.HeartbeatSec) * time.Second) + pingTicker := time.NewTicker(nodeSocketPingInterval(opts.HeartbeatSec)) + defer ticker.Stop() + defer pingTicker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-pingTicker.C: + if err := conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(10*time.Second)); err != nil { + return err + } + case <-ticker.C: + _ = conn.SetWriteDeadline(time.Now().Add(10 * 
// nodeSocketPingInterval returns how often the node should send websocket
// ping frames for a given heartbeat period (in seconds).
//
// The interval is half the heartbeat period, clamped to the range
// [10s, 25s]. A non-positive heartbeat period yields the 25s upper bound.
func nodeSocketPingInterval(heartbeatSec int) time.Duration {
	const (
		floor   = 10 * time.Second
		ceiling = 25 * time.Second
	)
	if heartbeatSec <= 0 {
		return ceiling
	}
	half := time.Duration(heartbeatSec) * time.Second / 2
	switch {
	case half < floor:
		return floor
	case half > ceiling:
		return ceiling
	default:
		return half
	}
}
// hasSchemePrefix reports whether s starts with prefix, ignoring ASCII case.
// URL schemes are case-insensitive, so "HTTPS://" must be treated like
// "https://".
func hasSchemePrefix(s, prefix string) bool {
	return len(s) >= len(prefix) && strings.EqualFold(s[:len(prefix)], prefix)
}

// normalizeGatewayBase canonicalizes a gateway base URL.
//
// Surrounding whitespace and trailing slashes are removed. An http:// or
// https:// scheme is recognized case-insensitively and rewritten in lower
// case; input without either scheme is assumed to be plain http. An empty
// (or all-whitespace) input yields the empty string.
func normalizeGatewayBase(raw string) string {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return ""
	}
	switch {
	case hasSchemePrefix(raw, "https://"):
		raw = "https://" + raw[len("https://"):]
	case hasSchemePrefix(raw, "http://"):
		raw = "http://" + raw[len("http://"):]
	default:
		raw = "http://" + raw
	}
	return strings.TrimRight(raw, "/")
}

// nodeWebsocketURL derives the node websocket endpoint from a gateway base
// URL: https gateways map to wss://, everything else to ws://, and the
// "/nodes/connect" path is appended.
//
// The scheme is taken from the normalized base rather than the raw input so
// that mixed-case schemes (e.g. "HTTPS://host") select wss:// correctly.
func nodeWebsocketURL(gatewayBase string) string {
	base := normalizeGatewayBase(gatewayBase)
	if rest := strings.TrimPrefix(base, "https://"); rest != base {
		return "wss://" + rest + "/nodes/connect"
	}
	return "ws://" + strings.TrimPrefix(base, "http://") + "/nodes/connect"
}
// splitCSV breaks a comma-separated string into its trimmed, non-empty
// fields. Duplicate fields are dropped, keeping the first occurrence, and the
// original order is preserved. The result is never nil; input with no usable
// fields produces an empty slice.
func splitCSV(raw string) []string {
	fields := strings.Split(raw, ",")
	uniq := make([]string, 0, len(fields))
	known := make(map[string]bool)
	for i := range fields {
		token := strings.TrimSpace(fields[i])
		if token != "" && !known[token] {
			known[token] = true
			uniq = append(uniq, token)
		}
	}
	return uniq
}
*testing.T) { + t.Parallel() + + var gotAuth string + var got nodes.NodeInfo + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/nodes/register" { + t.Fatalf("unexpected path: %s", r.URL.Path) + } + gotAuth = r.Header.Get("Authorization") + if err := json.NewDecoder(r.Body).Decode(&got); err != nil { + t.Fatalf("decode body: %v", err) + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"ok":true}`)) + })) + defer srv.Close() + + info := nodes.NodeInfo{ + ID: "edge-dev", + Name: "Edge Dev", + Endpoint: "http://edge.example:18790", + Capabilities: nodes.Capabilities{ + Run: true, Invoke: true, Model: true, + }, + Actions: []string{"run", "agent_task"}, + Models: []string{"gpt-4o-mini"}, + } + client := &http.Client{Timeout: 2 * time.Second} + if err := postNodeRegister(context.Background(), client, srv.URL, "secret", info); err != nil { + t.Fatalf("postNodeRegister failed: %v", err) + } + if gotAuth != "Bearer secret" { + t.Fatalf("unexpected auth header: %s", gotAuth) + } + if got.ID != "edge-dev" || got.Endpoint != "http://edge.example:18790" { + t.Fatalf("unexpected node payload: %+v", got) + } + if len(got.Actions) != 2 || got.Actions[1] != "agent_task" { + t.Fatalf("unexpected actions: %+v", got.Actions) + } +} + +func TestPostNodeHeartbeatSendsNodeID(t *testing.T) { + t.Parallel() + + var body map[string]string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/nodes/heartbeat" { + t.Fatalf("unexpected path: %s", r.URL.Path) + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatalf("decode body: %v", err) + } + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + client := &http.Client{Timeout: 2 * time.Second} + if err := postNodeHeartbeat(context.Background(), client, srv.URL, "", "edge-dev"); err != nil { + t.Fatalf("postNodeHeartbeat failed: %v", err) + } + if strings.TrimSpace(body["id"]) != 
"edge-dev" { + t.Fatalf("unexpected heartbeat body: %+v", body) + } +} + +func TestNodeWebsocketURL(t *testing.T) { + t.Parallel() + + if got := nodeWebsocketURL("http://gateway.example:18790"); got != "ws://gateway.example:18790/nodes/connect" { + t.Fatalf("unexpected ws url: %s", got) + } + if got := nodeWebsocketURL("https://gateway.example"); got != "wss://gateway.example/nodes/connect" { + t.Fatalf("unexpected wss url: %s", got) + } +} + +func TestNodeSocketPingInterval(t *testing.T) { + t.Parallel() + + if got := nodeSocketPingInterval(120); got != 25*time.Second { + t.Fatalf("expected 25s cap, got %s", got) + } + if got := nodeSocketPingInterval(20); got != 10*time.Second { + t.Fatalf("expected 10s floor, got %s", got) + } + if got := nodeSocketPingInterval(30); got != 15*time.Second { + t.Fatalf("expected half heartbeat, got %s", got) + } +} diff --git a/cmd/clawgo/main.go b/cmd/clawgo/main.go index 27a2f5d..bf326e0 100644 --- a/cmd/clawgo/main.go +++ b/cmd/clawgo/main.go @@ -67,6 +67,8 @@ func main() { cronCmd() case "channel": channelCmd() + case "node": + nodeCmd() case "skills": skillsCmd() case "version", "--version", "-v": diff --git a/pkg/api/server.go b/pkg/api/server.go index be50356..c09c999 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -29,6 +29,7 @@ import ( cfgpkg "clawgo/pkg/config" "clawgo/pkg/nodes" "clawgo/pkg/tools" + "github.com/gorilla/websocket" ) type Server struct { @@ -36,6 +37,8 @@ type Server struct { token string mgr *nodes.Manager server *http.Server + nodeConnMu sync.Mutex + nodeConnIDs map[string]string gatewayVersion string webuiVersion string configPath string @@ -55,6 +58,10 @@ type Server struct { ekgCacheRows []map[string]interface{} } +var nodesWebsocketUpgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, +} + func NewServer(host string, port int, token string, mgr *nodes.Manager) *Server { addr := strings.TrimSpace(host) if addr == "" { @@ -63,7 +70,12 @@ func 
NewServer(host string, port int, token string, mgr *nodes.Manager) *Server if port <= 0 { port = 7788 } - return &Server{addr: fmt.Sprintf("%s:%d", addr, port), token: strings.TrimSpace(token), mgr: mgr} + return &Server{ + addr: fmt.Sprintf("%s:%d", addr, port), + token: strings.TrimSpace(token), + mgr: mgr, + nodeConnIDs: map[string]string{}, + } } func (s *Server) SetConfigPath(path string) { s.configPath = strings.TrimSpace(path) } @@ -87,6 +99,32 @@ func (s *Server) SetWebUIDir(dir string) { s.webUIDir = st func (s *Server) SetGatewayVersion(v string) { s.gatewayVersion = strings.TrimSpace(v) } func (s *Server) SetWebUIVersion(v string) { s.webuiVersion = strings.TrimSpace(v) } +func (s *Server) rememberNodeConnection(nodeID, connID string) { + nodeID = strings.TrimSpace(nodeID) + connID = strings.TrimSpace(connID) + if nodeID == "" || connID == "" { + return + } + s.nodeConnMu.Lock() + defer s.nodeConnMu.Unlock() + s.nodeConnIDs[nodeID] = connID +} + +func (s *Server) releaseNodeConnection(nodeID, connID string) bool { + nodeID = strings.TrimSpace(nodeID) + connID = strings.TrimSpace(connID) + if nodeID == "" || connID == "" { + return false + } + s.nodeConnMu.Lock() + defer s.nodeConnMu.Unlock() + if s.nodeConnIDs[nodeID] != connID { + return false + } + delete(s.nodeConnIDs, nodeID) + return true +} + func (s *Server) Start(ctx context.Context) error { if s.mgr == nil { return nil @@ -98,12 +136,15 @@ func (s *Server) Start(ctx context.Context) error { }) mux.HandleFunc("/nodes/register", s.handleRegister) mux.HandleFunc("/nodes/heartbeat", s.handleHeartbeat) + mux.HandleFunc("/nodes/connect", s.handleNodeConnect) mux.HandleFunc("/webui", s.handleWebUI) mux.HandleFunc("/webui/", s.handleWebUIAsset) mux.HandleFunc("/webui/api/config", s.handleWebUIConfig) mux.HandleFunc("/webui/api/chat", s.handleWebUIChat) mux.HandleFunc("/webui/api/chat/history", s.handleWebUIChatHistory) mux.HandleFunc("/webui/api/chat/stream", s.handleWebUIChatStream) + 
mux.HandleFunc("/webui/api/chat/live", s.handleWebUIChatLive) + mux.HandleFunc("/webui/api/runtime", s.handleWebUIRuntime) mux.HandleFunc("/webui/api/version", s.handleWebUIVersion) mux.HandleFunc("/webui/api/upload", s.handleWebUIUpload) mux.HandleFunc("/webui/api/nodes", s.handleWebUINodes) @@ -113,6 +154,7 @@ func (s *Server) Start(ctx context.Context) error { mux.HandleFunc("/webui/api/memory", s.handleWebUIMemory) mux.HandleFunc("/webui/api/subagent_profiles", s.handleWebUISubagentProfiles) mux.HandleFunc("/webui/api/subagents_runtime", s.handleWebUISubagentsRuntime) + mux.HandleFunc("/webui/api/subagents_runtime/live", s.handleWebUISubagentsRuntimeLive) mux.HandleFunc("/webui/api/tool_allowlist_groups", s.handleWebUIToolAllowlistGroups) mux.HandleFunc("/webui/api/tools", s.handleWebUITools) mux.HandleFunc("/webui/api/mcp/install", s.handleWebUIMCPInstall) @@ -121,6 +163,7 @@ func (s *Server) Start(ctx context.Context) error { mux.HandleFunc("/webui/api/ekg_stats", s.handleWebUIEKGStats) mux.HandleFunc("/webui/api/exec_approvals", s.handleWebUIExecApprovals) mux.HandleFunc("/webui/api/logs/stream", s.handleWebUILogsStream) + mux.HandleFunc("/webui/api/logs/live", s.handleWebUILogsLive) mux.HandleFunc("/webui/api/logs/recent", s.handleWebUILogsRecent) s.server = &http.Server{Addr: s.addr, Handler: mux} go func() { @@ -184,6 +227,89 @@ func (s *Server) handleHeartbeat(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "id": body.ID}) } +func (s *Server) handleNodeConnect(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if s.mgr == nil { + http.Error(w, "nodes manager unavailable", http.StatusInternalServerError) + return + } + conn, err := nodesWebsocketUpgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer conn.Close() + + var connectedID string + connID := fmt.Sprintf("%d", time.Now().UnixNano()) + _ = 
conn.SetReadDeadline(time.Now().Add(90 * time.Second)) + conn.SetPongHandler(func(string) error { + return conn.SetReadDeadline(time.Now().Add(90 * time.Second)) + }) + + writeAck := func(ack nodes.WireAck) error { + _ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + return conn.WriteJSON(ack) + } + + defer func() { + if strings.TrimSpace(connectedID) != "" && s.releaseNodeConnection(connectedID, connID) { + s.mgr.MarkOffline(connectedID) + } + }() + + for { + var msg nodes.WireMessage + if err := conn.ReadJSON(&msg); err != nil { + return + } + _ = conn.SetReadDeadline(time.Now().Add(90 * time.Second)) + switch strings.ToLower(strings.TrimSpace(msg.Type)) { + case "register": + if msg.Node == nil || strings.TrimSpace(msg.Node.ID) == "" { + _ = writeAck(nodes.WireAck{OK: false, Type: "register", Error: "node.id required"}) + continue + } + s.mgr.Upsert(*msg.Node) + connectedID = strings.TrimSpace(msg.Node.ID) + s.rememberNodeConnection(connectedID, connID) + if err := writeAck(nodes.WireAck{OK: true, Type: "registered", ID: connectedID}); err != nil { + return + } + case "heartbeat": + id := strings.TrimSpace(msg.ID) + if id == "" { + id = connectedID + } + if id == "" { + _ = writeAck(nodes.WireAck{OK: false, Type: "heartbeat", Error: "id required"}) + continue + } + if msg.Node != nil && strings.TrimSpace(msg.Node.ID) != "" { + s.mgr.Upsert(*msg.Node) + connectedID = strings.TrimSpace(msg.Node.ID) + s.rememberNodeConnection(connectedID, connID) + } else if n, ok := s.mgr.Get(id); ok { + s.mgr.Upsert(n) + connectedID = id + s.rememberNodeConnection(connectedID, connID) + } else { + _ = writeAck(nodes.WireAck{OK: false, Type: "heartbeat", ID: id, Error: "node not found"}) + continue + } + if err := writeAck(nodes.WireAck{OK: true, Type: "heartbeat", ID: connectedID}); err != nil { + return + } + default: + if err := writeAck(nodes.WireAck{OK: false, Type: msg.Type, ID: msg.ID, Error: "unsupported message type"}); err != nil { + return + } + } + } +} + 
func (s *Server) handleWebUI(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) @@ -555,6 +681,8 @@ func (s *Server) handleWebUIChatHistory(w http.ResponseWriter, r *http.Request) } func (s *Server) handleWebUIChatStream(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Deprecation", "true") + w.Header().Set("X-Clawgo-Replaced-By", "/webui/api/chat/live") if !s.checkAuth(r) { http.Error(w, "unauthorized", http.StatusUnauthorized) return @@ -616,6 +744,77 @@ func (s *Server) handleWebUIChatStream(w http.ResponseWriter, r *http.Request) { } } +func (s *Server) handleWebUIChatLive(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + if s.onChat == nil { + http.Error(w, "chat handler not configured", http.StatusInternalServerError) + return + } + conn, err := nodesWebsocketUpgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer conn.Close() + + var body struct { + Session string `json:"session"` + Message string `json:"message"` + Media string `json:"media"` + } + if err := conn.ReadJSON(&body); err != nil { + _ = conn.WriteJSON(map[string]interface{}{"ok": false, "type": "chat_error", "error": "invalid json"}) + return + } + session := body.Session + if session == "" { + session = r.URL.Query().Get("session") + } + if session == "" { + session = "main" + } + prompt := body.Message + if body.Media != "" { + if prompt != "" { + prompt += "\n" + } + prompt += "[file: " + body.Media + "]" + } + resp, err := s.onChat(r.Context(), session, prompt) + if err != nil { + _ = conn.WriteJSON(map[string]interface{}{"ok": false, "type": "chat_error", "error": err.Error(), "session": session}) + return + } + chunk := 180 + for i := 0; i < len(resp); i += chunk { + end := i + chunk 
+ if end > len(resp) { + end = len(resp) + } + _ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := conn.WriteJSON(map[string]interface{}{ + "ok": true, + "type": "chat_chunk", + "session": session, + "delta": resp[i:end], + }); err != nil { + return + } + } + _ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + _ = conn.WriteJSON(map[string]interface{}{ + "ok": true, + "type": "chat_done", + "session": session, + }) +} + func (s *Server) handleWebUIVersion(w http.ResponseWriter, r *http.Request) { if !s.checkAuth(r) { http.Error(w, "unauthorized", http.StatusUnauthorized) @@ -632,6 +831,378 @@ func (s *Server) handleWebUIVersion(w http.ResponseWriter, r *http.Request) { }) } +func (s *Server) handleWebUIRuntime(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + conn, err := nodesWebsocketUpgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer conn.Close() + + ctx := r.Context() + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + sendSnapshot := func() error { + payload := map[string]interface{}{ + "ok": true, + "type": "runtime_snapshot", + "snapshot": s.buildWebUIRuntimeSnapshot(ctx), + } + _ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + return conn.WriteJSON(payload) + } + + if err := sendSnapshot(); err != nil { + return + } + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := sendSnapshot(); err != nil { + return + } + } + } +} + +func (s *Server) buildWebUIRuntimeSnapshot(ctx context.Context) map[string]interface{} { + return map[string]interface{}{ + "version": s.webUIVersionPayload(), + "nodes": s.webUINodesPayload(ctx), + "sessions": s.webUISessionsPayload(), + "task_queue": s.webUITaskQueuePayload(false), + "ekg": s.webUIEKGSummaryPayload("24h"), + "subagents": s.webUISubagentsRuntimePayload(ctx), + } +} + +func (s *Server) webUISubagentsRuntimePayload(ctx context.Context) 
map[string]interface{} { + if s.onSubagents == nil { + return map[string]interface{}{ + "items": []interface{}{}, + "registry": []interface{}{}, + "stream": []interface{}{}, + } + } + call := func(action string, args map[string]interface{}) interface{} { + res, err := s.onSubagents(ctx, action, args) + if err != nil { + return []interface{}{} + } + if m, ok := res.(map[string]interface{}); ok { + if items, ok := m["items"]; ok { + return items + } + } + return []interface{}{} + } + return map[string]interface{}{ + "items": call("list", map[string]interface{}{}), + "registry": call("registry", map[string]interface{}{}), + "stream": call("stream_all", map[string]interface{}{"limit": 300, "task_limit": 36}), + } +} + +func (s *Server) webUIVersionPayload() map[string]interface{} { + return map[string]interface{}{ + "gateway_version": firstNonEmptyString(s.gatewayVersion, gatewayBuildVersion()), + "webui_version": firstNonEmptyString(s.webuiVersion, detectWebUIVersion(strings.TrimSpace(s.webUIDir))), + } +} + +func (s *Server) webUINodesPayload(ctx context.Context) map[string]interface{} { + list := []nodes.NodeInfo{} + if s.mgr != nil { + list = s.mgr.List() + } + host, _ := os.Hostname() + local := nodes.NodeInfo{ID: "local", Name: "local", Endpoint: "gateway", Version: gatewayBuildVersion(), LastSeenAt: time.Now(), Online: true} + if strings.TrimSpace(host) != "" { + local.Name = host + } + if ip := detectLocalIP(); ip != "" { + local.Endpoint = ip + } + hostLower := strings.ToLower(strings.TrimSpace(host)) + matched := false + for i := range list { + id := strings.ToLower(strings.TrimSpace(list[i].ID)) + name := strings.ToLower(strings.TrimSpace(list[i].Name)) + if id == "local" || name == "local" || (hostLower != "" && name == hostLower) { + list[i].ID = "local" + list[i].Online = true + list[i].Version = local.Version + if strings.TrimSpace(local.Endpoint) != "" { + list[i].Endpoint = local.Endpoint + } + if strings.TrimSpace(local.Name) != "" { + list[i].Name = 
local.Name + } + list[i].LastSeenAt = time.Now() + matched = true + break + } + } + if !matched { + list = append([]nodes.NodeInfo{local}, list...) + } + return map[string]interface{}{ + "nodes": list, + "trees": s.buildNodeAgentTrees(ctx, list), + } +} + +func (s *Server) webUISessionsPayload() map[string]interface{} { + sessionsDir := filepath.Join(filepath.Dir(s.workspacePath), "agents", "main", "sessions") + _ = os.MkdirAll(sessionsDir, 0755) + type item struct { + Key string `json:"key"` + Channel string `json:"channel,omitempty"` + } + out := make([]item, 0, 16) + entries, err := os.ReadDir(sessionsDir) + if err == nil { + seen := map[string]struct{}{} + for _, e := range entries { + if e.IsDir() { + continue + } + name := e.Name() + if !strings.HasSuffix(name, ".jsonl") || strings.Contains(name, ".deleted.") { + continue + } + key := strings.TrimSuffix(name, ".jsonl") + if strings.TrimSpace(key) == "" { + continue + } + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + channel := "" + if i := strings.Index(key, ":"); i > 0 { + channel = key[:i] + } + out = append(out, item{Key: key, Channel: channel}) + } + } + if len(out) == 0 { + out = append(out, item{Key: "main", Channel: "main"}) + } + return map[string]interface{}{"sessions": out} +} + +func (s *Server) webUITaskQueuePayload(includeHeartbeat bool) map[string]interface{} { + path := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task-audit.jsonl") + b, err := os.ReadFile(path) + lines := []string{} + if err == nil { + lines = strings.Split(string(b), "\n") + } + type agg struct { + Last map[string]interface{} + Logs []string + Attempts int + } + m := map[string]*agg{} + for _, ln := range lines { + if ln == "" { + continue + } + var row map[string]interface{} + if err := json.Unmarshal([]byte(ln), &row); err != nil { + continue + } + source := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["source"]))) + if !includeHeartbeat && source == "heartbeat" { + 
continue + } + id := fmt.Sprintf("%v", row["task_id"]) + if id == "" { + continue + } + if _, ok := m[id]; !ok { + m[id] = &agg{Last: row, Logs: []string{}, Attempts: 0} + } + a := m[id] + a.Last = row + a.Attempts++ + if lg := strings.TrimSpace(fmt.Sprintf("%v", row["log"])); lg != "" { + if len(a.Logs) == 0 || a.Logs[len(a.Logs)-1] != lg { + a.Logs = append(a.Logs, lg) + if len(a.Logs) > 20 { + a.Logs = a.Logs[len(a.Logs)-20:] + } + } + } + } + items := make([]map[string]interface{}, 0, len(m)) + running := make([]map[string]interface{}, 0) + for _, a := range m { + row := a.Last + row["logs"] = a.Logs + row["attempts"] = a.Attempts + items = append(items, row) + if fmt.Sprintf("%v", row["status"]) == "running" { + running = append(running, row) + } + } + queuePath := filepath.Join(strings.TrimSpace(s.workspacePath), "memory", "task_queue.json") + if qb, qErr := os.ReadFile(queuePath); qErr == nil { + var q map[string]interface{} + if json.Unmarshal(qb, &q) == nil { + if arr, ok := q["running"].([]interface{}); ok { + for _, it := range arr { + if row, ok := it.(map[string]interface{}); ok { + running = append(running, row) + } + } + } + } + } + sort.Slice(items, func(i, j int) bool { + return fmt.Sprintf("%v", items[i]["updated_at"]) > fmt.Sprintf("%v", items[j]["updated_at"]) + }) + sort.Slice(running, func(i, j int) bool { + return fmt.Sprintf("%v", running[i]["updated_at"]) > fmt.Sprintf("%v", running[j]["updated_at"]) + }) + if len(items) > 30 { + items = items[:30] + } + return map[string]interface{}{"items": items, "running": running} +} + +func (s *Server) webUIEKGSummaryPayload(window string) map[string]interface{} { + workspace := strings.TrimSpace(s.workspacePath) + ekgPath := filepath.Join(workspace, "memory", "ekg-events.jsonl") + window = strings.ToLower(strings.TrimSpace(window)) + windowDur := 24 * time.Hour + switch window { + case "6h": + windowDur = 6 * time.Hour + case "24h", "": + windowDur = 24 * time.Hour + case "7d": + windowDur = 7 * 24 * 
time.Hour + } + selectedWindow := window + if selectedWindow == "" { + selectedWindow = "24h" + } + cutoff := time.Now().UTC().Add(-windowDur) + rows := s.loadEKGRowsCached(ekgPath, 3000) + type kv struct { + Key string `json:"key"` + Score float64 `json:"score,omitempty"` + Count int `json:"count,omitempty"` + } + providerScore := map[string]float64{} + providerScoreWorkload := map[string]float64{} + errSigCount := map[string]int{} + errSigHeartbeat := map[string]int{} + errSigWorkload := map[string]int{} + sourceStats := map[string]int{} + channelStats := map[string]int{} + for _, row := range rows { + ts := strings.TrimSpace(fmt.Sprintf("%v", row["time"])) + if ts != "" { + if tm, err := time.Parse(time.RFC3339, ts); err == nil && tm.Before(cutoff) { + continue + } + } + provider := strings.TrimSpace(fmt.Sprintf("%v", row["provider"])) + status := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["status"]))) + errSig := strings.TrimSpace(fmt.Sprintf("%v", row["errsig"])) + source := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["source"]))) + channel := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", row["channel"]))) + if source == "heartbeat" { + continue + } + if source == "" { + source = "unknown" + } + if channel == "" { + channel = "unknown" + } + sourceStats[source]++ + channelStats[channel]++ + if provider != "" { + providerScoreWorkload[provider] += 1 + if status == "success" { + providerScore[provider] += 1 + } else if status == "error" { + providerScore[provider] -= 2 + } + } + if errSig != "" { + errSigWorkload[errSig]++ + if source == "heartbeat" { + errSigHeartbeat[errSig]++ + } else if status == "error" { + errSigCount[errSig]++ + } + } + } + toTopScore := func(m map[string]float64, limit int) []kv { + out := make([]kv, 0, len(m)) + for k, v := range m { + out = append(out, kv{Key: k, Score: v}) + } + sort.Slice(out, func(i, j int) bool { + if out[i].Score == out[j].Score { + return out[i].Key < out[j].Key + } + return 
// mapFromFloatCounts converts a float-valued tally into an int-valued one,
// truncating each value toward zero. The result is always a non-nil map.
func mapFromFloatCounts(src map[string]float64) map[string]int {
	counts := make(map[string]int, len(src))
	for key := range src {
		counts[key] = int(src[key])
	}
	return counts
}
previewTaskID := strings.TrimSpace(r.URL.Query().Get("preview_task_id")) + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + sendSnapshot := func() error { + payload := map[string]interface{}{ + "ok": true, + "type": "subagents_live", + "payload": s.buildSubagentsLivePayload(ctx, taskID, previewTaskID), + } + _ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + return conn.WriteJSON(payload) + } + + if err := sendSnapshot(); err != nil { + return + } + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := sendSnapshot(); err != nil { + return + } + } + } +} + +func (s *Server) buildSubagentsLivePayload(ctx context.Context, taskID, previewTaskID string) map[string]interface{} { + call := func(action string, args map[string]interface{}) map[string]interface{} { + res, err := s.onSubagents(ctx, action, args) + if err != nil { + return map[string]interface{}{} + } + if m, ok := res.(map[string]interface{}); ok { + return m + } + return map[string]interface{}{} + } + payload := map[string]interface{}{} + taskID = strings.TrimSpace(taskID) + previewTaskID = strings.TrimSpace(previewTaskID) + if taskID != "" { + payload["thread"] = call("thread", map[string]interface{}{"id": taskID, "limit": 50}) + payload["inbox"] = call("inbox", map[string]interface{}{"id": taskID, "limit": 50}) + } + if previewTaskID != "" { + payload["preview"] = call("stream", map[string]interface{}{"id": previewTaskID, "limit": 12}) + } + return payload +} + func (s *Server) handleWebUIMemory(w http.ResponseWriter, r *http.Request) { if !s.checkAuth(r) { http.Error(w, "unauthorized", http.StatusUnauthorized) @@ -3244,23 +3885,85 @@ func (s *Server) handleWebUILogsRecent(w http.ResponseWriter, r *http.Request) { } out := make([]map[string]interface{}, 0, limit) for _, ln := range lines[start:] { - ln = strings.TrimSpace(ln) - if ln == "" { - continue + if parsed, ok := parseLogLine(ln); ok { + out = append(out, parsed) } - if json.Valid([]byte(ln)) { 
// parseLogLine turns one raw log line into a structured record.
//
// Blank lines yield (nil, false). A line holding a JSON object is returned as
// the decoded object. Anything else — plain text, or JSON that is not an
// object such as null, a number or an array — is wrapped in a synthetic INFO
// record stamped with the current UTC time and carrying the trimmed line as
// "msg". When ok is true the returned map is never nil.
func parseLogLine(line string) (map[string]interface{}, bool) {
	line = strings.TrimSpace(line)
	if line == "" {
		return nil, false
	}
	// Only a line starting with '{' can decode into a map; checking first
	// avoids parsing every plain-text line as JSON.
	if line[0] == '{' {
		var m map[string]interface{}
		if err := json.Unmarshal([]byte(line), &m); err == nil && m != nil {
			return m, true
		}
	}
	return map[string]interface{}{
		"time":  time.Now().UTC().Format(time.RFC3339),
		"level": "INFO",
		"msg":   line,
	}, true
}
+} + func (s *Server) handleWebUILogsStream(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Deprecation", "true") + w.Header().Set("X-Clawgo-Replaced-By", "/webui/api/logs/live") if !s.checkAuth(r) { http.Error(w, "unauthorized", http.StatusUnauthorized) return @@ -3301,14 +4004,9 @@ func (s *Server) handleWebUILogsStream(w http.ResponseWriter, r *http.Request) { default: line, err := reader.ReadString('\n') if len(line) > 0 { - trim := strings.TrimRight(line, "\r\n") - if trim != "" { - if json.Valid([]byte(trim)) { - _, _ = w.Write([]byte(trim + "\n")) - } else { - fallback, _ := json.Marshal(map[string]interface{}{"time": time.Now().UTC().Format(time.RFC3339), "level": "INFO", "msg": trim}) - _, _ = w.Write(append(fallback, '\n')) - } + if parsed, ok := parseLogLine(line); ok { + b, _ := json.Marshal(parsed) + _, _ = w.Write(append(b, '\n')) flusher.Flush() } } diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index bbaa950..10066da 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -2,14 +2,19 @@ package api import ( "bytes" + "context" "encoding/json" "net/http" "net/http/httptest" + "os" "path/filepath" "strings" "testing" + "time" cfgpkg "clawgo/pkg/config" + "clawgo/pkg/nodes" + "github.com/gorilla/websocket" ) func TestHandleWebUIConfigRequiresConfirmForProviderAPIBaseChange(t *testing.T) { @@ -107,3 +112,253 @@ func TestHandleWebUIConfigRequiresConfirmForCustomProviderSecretChange(t *testin t.Fatalf("expected providers.proxies.backup.api_key in changed_fields, got: %s", rec.Body.String()) } } + +func TestHandleNodeConnectRegistersAndHeartbeatsNode(t *testing.T) { + t.Parallel() + + mgr := nodes.NewManager() + srv := NewServer("127.0.0.1", 0, "", mgr) + mux := http.NewServeMux() + mux.HandleFunc("/nodes/connect", srv.handleNodeConnect) + httpSrv := httptest.NewServer(mux) + defer httpSrv.Close() + + wsURL := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/nodes/connect" + conn, _, err := 
websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("dial websocket: %v", err) + } + defer conn.Close() + + info := nodes.NodeInfo{ + ID: "edge-dev", + Name: "Edge Dev", + Endpoint: "http://edge.example:18790", + Capabilities: nodes.Capabilities{ + Run: true, Invoke: true, Model: true, + }, + } + if err := conn.WriteJSON(nodes.WireMessage{Type: "register", Node: &info}); err != nil { + t.Fatalf("write register: %v", err) + } + var regAck nodes.WireAck + if err := conn.ReadJSON(®Ack); err != nil { + t.Fatalf("read register ack: %v", err) + } + if !regAck.OK || regAck.Type != "registered" || regAck.ID != "edge-dev" { + t.Fatalf("unexpected register ack: %+v", regAck) + } + + stored, ok := mgr.Get("edge-dev") + if !ok || !stored.Online { + t.Fatalf("expected node to be online after register, got %+v ok=%v", stored, ok) + } + + if err := conn.WriteJSON(nodes.WireMessage{Type: "heartbeat", ID: "edge-dev"}); err != nil { + t.Fatalf("write heartbeat: %v", err) + } + var hbAck nodes.WireAck + if err := conn.ReadJSON(&hbAck); err != nil { + t.Fatalf("read heartbeat ack: %v", err) + } + if !hbAck.OK || hbAck.Type != "heartbeat" || hbAck.ID != "edge-dev" { + t.Fatalf("unexpected heartbeat ack: %+v", hbAck) + } +} + +func TestHandleNodeConnectReconnectKeepsNewestSessionOnline(t *testing.T) { + t.Parallel() + + mgr := nodes.NewManager() + srv := NewServer("127.0.0.1", 0, "", mgr) + mux := http.NewServeMux() + mux.HandleFunc("/nodes/connect", srv.handleNodeConnect) + httpSrv := httptest.NewServer(mux) + defer httpSrv.Close() + + wsURL := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/nodes/connect" + connect := func() *websocket.Conn { + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("dial websocket: %v", err) + } + if err := conn.WriteJSON(nodes.WireMessage{Type: "register", Node: &nodes.NodeInfo{ID: "edge-dev", Name: "Edge Dev"}}); err != nil { + t.Fatalf("write register: %v", err) + } + var ack nodes.WireAck + if 
err := conn.ReadJSON(&ack); err != nil { + t.Fatalf("read register ack: %v", err) + } + if !ack.OK { + t.Fatalf("unexpected register ack: %+v", ack) + } + return conn + } + + first := connect() + second := connect() + + if err := first.Close(); err != nil { + t.Fatalf("close first connection: %v", err) + } + time.Sleep(100 * time.Millisecond) + + got, ok := mgr.Get("edge-dev") + if !ok || !got.Online { + t.Fatalf("expected newest session to keep node online, got %+v ok=%v", got, ok) + } + + _ = second.Close() +} + +func TestHandleWebUISubagentsRuntimeLive(t *testing.T) { + t.Parallel() + + srv := NewServer("127.0.0.1", 0, "", nodes.NewManager()) + srv.SetSubagentHandler(func(ctx context.Context, action string, args map[string]interface{}) (interface{}, error) { + switch action { + case "thread": + return map[string]interface{}{ + "thread": map[string]interface{}{"thread_id": "thread-1"}, + "messages": []map[string]interface{}{ + {"message_id": "msg-1", "content": "hello"}, + }, + }, nil + case "inbox": + return map[string]interface{}{ + "messages": []map[string]interface{}{ + {"message_id": "msg-2", "content": "reply"}, + }, + }, nil + case "stream": + return map[string]interface{}{ + "task": map[string]interface{}{"id": "subagent-1"}, + "items": []map[string]interface{}{ + {"kind": "event", "message": "progress"}, + }, + }, nil + default: + return map[string]interface{}{}, nil + } + }) + + mux := http.NewServeMux() + mux.HandleFunc("/webui/api/subagents_runtime/live", srv.handleWebUISubagentsRuntimeLive) + httpSrv := httptest.NewServer(mux) + defer httpSrv.Close() + + wsURL := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/webui/api/subagents_runtime/live?task_id=subagent-1&preview_task_id=subagent-1" + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("dial websocket: %v", err) + } + defer conn.Close() + + var msg map[string]interface{} + if err := conn.ReadJSON(&msg); err != nil { + t.Fatalf("read live snapshot: %v", err) 
+ } + payload, _ := msg["payload"].(map[string]interface{}) + thread, _ := payload["thread"].(map[string]interface{}) + inbox, _ := payload["inbox"].(map[string]interface{}) + preview, _ := payload["preview"].(map[string]interface{}) + if thread == nil || inbox == nil || preview == nil { + t.Fatalf("expected thread/inbox/preview payload, got: %+v", msg) + } +} + +func TestHandleWebUIChatLive(t *testing.T) { + t.Parallel() + + srv := NewServer("127.0.0.1", 0, "", nodes.NewManager()) + srv.SetChatHandler(func(ctx context.Context, sessionKey, content string) (string, error) { + if sessionKey != "main" { + t.Fatalf("unexpected session key: %s", sessionKey) + } + if content != "hello" { + t.Fatalf("unexpected content: %s", content) + } + return "world", nil + }) + + mux := http.NewServeMux() + mux.HandleFunc("/webui/api/chat/live", srv.handleWebUIChatLive) + httpSrv := httptest.NewServer(mux) + defer httpSrv.Close() + + wsURL := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/webui/api/chat/live" + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("dial websocket: %v", err) + } + defer conn.Close() + + if err := conn.WriteJSON(map[string]interface{}{"session": "main", "message": "hello"}); err != nil { + t.Fatalf("write chat request: %v", err) + } + + _ = conn.SetReadDeadline(time.Now().Add(3 * time.Second)) + var chunk map[string]interface{} + if err := conn.ReadJSON(&chunk); err != nil { + t.Fatalf("read chat chunk: %v", err) + } + if chunk["type"] != "chat_chunk" || chunk["delta"] != "world" { + t.Fatalf("unexpected chat chunk: %+v", chunk) + } + + var done map[string]interface{} + if err := conn.ReadJSON(&done); err != nil { + t.Fatalf("read chat done: %v", err) + } + if done["type"] != "chat_done" { + t.Fatalf("unexpected chat done: %+v", done) + } +} + +func TestHandleWebUILogsLive(t *testing.T) { + t.Parallel() + + tmp := t.TempDir() + logPath := filepath.Join(tmp, "app.log") + if err := os.WriteFile(logPath, []byte(""), 
// WireMessage is the websocket envelope for node lifecycle messages.
type WireMessage struct {
	// Type names the operation. The gateway's node connect handler matches it
	// case-insensitively after trimming and accepts "register" and "heartbeat".
	Type string `json:"type"`
	// ID identifies the node for a heartbeat; the handler falls back to the ID
	// registered on the same connection when it is empty.
	ID string `json:"id,omitempty"`
	// Node carries the node record. The handler requires it, with a non-empty
	// ID, for "register"; on "heartbeat" it is optional.
	Node *NodeInfo `json:"node,omitempty"`
}
type WireAck struct {
	// OK reports whether the message was accepted.
	OK bool `json:"ok"`
	// Type labels the acknowledgement. The gateway's node connect handler sends
	// "registered" for an accepted register, "register" for a rejected one,
	// "heartbeat" for heartbeats, and echoes the received type when it is
	// unsupported.
	Type string `json:"type"`
	// ID is the node ID the acknowledgement refers to, when known.
	ID string `json:"id,omitempty"`
	// Error describes the failure when OK is false.
	Error string `json:"error,omitempty"`
}
"https://registry.npmjs.org/@vitejs/plugin-react/-/plugin-react-5.1.4.tgz", @@ -3380,7 +3390,6 @@ "version": "6.21.0", "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz", "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==", - "dev": true, "license": "MIT" }, "node_modules/unpipe": { @@ -4024,6 +4033,27 @@ "node": ">=0.10.0" } }, + "node_modules/ws": { + "version": "8.19.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.19.0.tgz", + "integrity": "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/xtend": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", diff --git a/webui/package.json b/webui/package.json index c462fad..9907999 100644 --- a/webui/package.json +++ b/webui/package.json @@ -12,6 +12,7 @@ "start": "node server.ts" }, "dependencies": { + "@types/ws": "^8.18.1", "express": "^4.21.2", "i18next": "^25.8.13", "i18next-browser-languagedetector": "^8.2.1", @@ -21,7 +22,8 @@ "react": "^19.0.0", "react-dom": "^19.0.0", "react-i18next": "^16.5.4", - "react-router-dom": "^7.13.1" + "react-router-dom": "^7.13.1", + "ws": "^8.19.0" }, "devDependencies": { "@tailwindcss/vite": "^4.1.14", diff --git a/webui/server.ts b/webui/server.ts index a1481af..18973fe 100644 --- a/webui/server.ts +++ b/webui/server.ts @@ -3,10 +3,14 @@ import { createServer as createViteServer } from "vite"; import { EventEmitter } from "events"; import multer from "multer"; import fs from "fs"; +import http from "http"; +import { WebSocketServer } from "ws"; const app = express(); const PORT = 3000; const logEmitter = new EventEmitter(); 
// Websocket endpoints of the local dev server, dispatched on the request URL
// prefix. Connections to any other URL are accepted but receive no handlers.
wss.on("connection", (socket, req) => {
  if (!req.url) return;

  // Log tail: forward every "log" event from logEmitter as a log_entry frame.
  if (req.url.startsWith("/webui/api/logs/live")) {
    const onLog = (entry: any) => {
      if (socket.readyState === socket.OPEN) {
        socket.send(JSON.stringify({ ok: true, type: "log_entry", entry }));
      }
    };
    logEmitter.on("log", onLog);
    // Send a greeting entry immediately so the client sees the stream is live.
    onLog({ time: new Date().toISOString(), level: "INFO", msg: "Log stream connected" });
    // Unsubscribe on close so the emitter does not keep a dead socket's listener.
    socket.on("close", () => {
      logEmitter.off("log", onLog);
    });
    return;
  }

  // Chat: every inbound message is answered with a simulated word-by-word
  // stream of chat_chunk frames followed by chat_done.
  if (req.url.startsWith("/webui/api/chat/live")) {
    socket.on("message", async (payload) => {
      let message = "";
      try {
        const body = JSON.parse(String(payload || "{}"));
        message = String(body?.message || "");
      } catch {
        if (socket.readyState === socket.OPEN) {
          socket.send(JSON.stringify({ ok: false, type: "chat_error", error: "invalid json" }));
        }
        return;
      }
      const words = `Simulated streaming response: ${message}`.split(" ");
      for (const word of words) {
        // Stop streaming as soon as the client goes away.
        if (socket.readyState !== socket.OPEN) return;
        socket.send(JSON.stringify({ ok: true, type: "chat_chunk", delta: `${word} ` }));
        // 40 ms pause between words to imitate token streaming.
        await new Promise((r) => setTimeout(r, 40));
      }
      if (socket.readyState === socket.OPEN) {
        socket.send(JSON.stringify({ ok: true, type: "chat_done" }));
      }
    });
  }
});
React.ReactNode }> = ({ children const [sessions, setSessions] = useState([{ key: 'main', title: 'main' }]); const [taskQueueItems, setTaskQueueItems] = useState([]); const [ekgSummary, setEkgSummary] = useState>({}); + const [subagentRuntimeItems, setSubagentRuntimeItems] = useState([]); + const [subagentRegistryItems, setSubagentRegistryItems] = useState([]); + const [subagentStreamItems, setSubagentStreamItems] = useState([]); const [gatewayVersion, setGatewayVersion] = useState('unknown'); const [webuiVersion, setWebuiVersion] = useState('unknown'); const [hotReloadFields, setHotReloadFields] = useState([]); @@ -218,17 +250,86 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children useEffect(() => { refreshAll(); - const interval = setInterval(() => { - loadConfig(); - refreshCron(); - refreshNodes(); - refreshSkills(); - refreshSessions(); - refreshVersion(); - refreshTaskQueue(); - refreshEKGSummary(); - }, 10000); - return () => clearInterval(interval); + }, [token, refreshAll]); + + useEffect(() => { + let disposed = false; + let socket: WebSocket | null = null; + let retryTimer: number | null = null; + + const applySnapshot = (snapshot: RuntimeSnapshot) => { + if (snapshot.version) { + setGatewayVersion(snapshot.version.gateway_version || 'unknown'); + setWebuiVersion(snapshot.version.webui_version || 'unknown'); + } + if (snapshot.nodes) { + setNodes(JSON.stringify(Array.isArray(snapshot.nodes.nodes) ? snapshot.nodes.nodes : [], null, 2)); + setNodeTrees(JSON.stringify(Array.isArray(snapshot.nodes.trees) ? snapshot.nodes.trees : [], null, 2)); + } + if (snapshot.sessions) { + const arr = Array.isArray(snapshot.sessions.sessions) ? snapshot.sessions.sessions : []; + setSessions(arr.map((s) => ({ key: s.key, title: s.title || s.key }))); + } + if (snapshot.task_queue) { + setTaskQueueItems(Array.isArray(snapshot.task_queue.items) ? 
snapshot.task_queue.items : []); + } + if (snapshot.ekg && typeof snapshot.ekg === 'object') { + setEkgSummary(snapshot.ekg); + } + if (snapshot.subagents) { + setSubagentRuntimeItems(Array.isArray(snapshot.subagents.items) ? snapshot.subagents.items : []); + setSubagentRegistryItems(Array.isArray(snapshot.subagents.registry) ? snapshot.subagents.registry : []); + setSubagentStreamItems(Array.isArray(snapshot.subagents.stream) ? snapshot.subagents.stream : []); + } + }; + + const connect = () => { + try { + const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const url = new URL(`${proto}//${window.location.host}/webui/api/runtime`); + if (token) url.searchParams.set('token', token); + socket = new WebSocket(url.toString()); + socket.onopen = () => { + setIsGatewayOnline(true); + }; + socket.onmessage = (event) => { + try { + const msg = JSON.parse(event.data); + if (msg?.type === 'runtime_snapshot' && msg.snapshot) { + applySnapshot(msg.snapshot as RuntimeSnapshot); + setIsGatewayOnline(true); + } + } catch (err) { + console.error(err); + } + }; + socket.onerror = () => { + setIsGatewayOnline(false); + }; + socket.onclose = () => { + socket = null; + if (disposed) return; + setIsGatewayOnline(false); + retryTimer = window.setTimeout(connect, 3000); + }; + } catch (err) { + console.error(err); + setIsGatewayOnline(false); + retryTimer = window.setTimeout(connect, 3000); + } + }; + + connect(); + + return () => { + disposed = true; + if (retryTimer !== null) { + window.clearTimeout(retryTimer); + } + if (socket) { + socket.close(); + } + }; }, [token, refreshAll, loadConfig, refreshCron, refreshNodes, refreshSkills, refreshSessions, refreshVersion, refreshTaskQueue, refreshEKGSummary]); useEffect(() => { @@ -246,6 +347,7 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children cron, setCron, skills, setSkills, clawhubInstalled, clawhubPath, sessions, setSessions, taskQueueItems, setTaskQueueItems, ekgSummary, 
setEkgSummary, + subagentRuntimeItems, setSubagentRuntimeItems, subagentRegistryItems, setSubagentRegistryItems, subagentStreamItems, setSubagentStreamItems, refreshAll, refreshCron, refreshNodes, refreshSkills, refreshSessions, refreshTaskQueue, refreshEKGSummary, refreshVersion, loadConfig, gatewayVersion, webuiVersion, hotReloadFields, hotReloadFieldDetails, q }}> diff --git a/webui/src/pages/Chat.tsx b/webui/src/pages/Chat.tsx index b9b54b8..433cfce 100644 --- a/webui/src/pages/Chat.tsx +++ b/webui/src/pages/Chat.tsx @@ -110,7 +110,7 @@ function collectActors(items: StreamItem[]): string[] { const Chat: React.FC = () => { const { t } = useTranslation(); - const { q, sessions } = useAppContext(); + const { q, sessions, subagentRuntimeItems, subagentRegistryItems, subagentStreamItems } = useAppContext(); const ui = useUI(); const [mainChat, setMainChat] = useState([]); const [subagentStream, setSubagentStream] = useState([]); @@ -204,6 +204,10 @@ const Chat: React.FC = () => { const loadSubagentGroup = async () => { try { + if (subagentStreamItems.length > 0) { + setSubagentStream(subagentStreamItems); + return; + } shouldAutoScrollRef.current = isNearBottom() || chatTab !== 'subagents'; const r = await fetch(`/webui/api/subagents_runtime${q}`, { method: 'POST', @@ -221,6 +225,14 @@ const Chat: React.FC = () => { const loadRegistryAgents = async () => { try { + if (subagentRegistryItems.length > 0) { + const filtered = subagentRegistryItems.filter((item: RegistryAgent) => item?.agent_id && item.enabled !== false); + setRegistryAgents(filtered); + if (!dispatchAgentID && filtered.length > 0) { + setDispatchAgentID(String(filtered[0].agent_id || '')); + } + return; + } const r = await fetch(`/webui/api/subagents_runtime${q}`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, @@ -241,6 +253,10 @@ const Chat: React.FC = () => { const loadRuntimeTasks = async () => { try { + if (subagentRuntimeItems.length > 0) { + 
setRuntimeTasks(subagentRuntimeItems); + return; + } const r = await fetch(`/webui/api/subagents_runtime${q}`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, @@ -292,16 +308,10 @@ const Chat: React.FC = () => { if (input) input.value = ''; try { - const response = await fetch(`/webui/api/chat/stream${q}`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ session: sessionKey, message: currentMsg, media }), - }); - - if (!response.ok || !response.body) throw new Error('Chat request failed'); - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); + const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const url = new URL(`${proto}//${window.location.host}/webui/api/chat/live`); + const token = new URLSearchParams(q.startsWith('?') ? q.slice(1) : q).get('token'); + if (token) url.searchParams.set('token', token); let assistantText = ''; setMainChat((prev) => [...prev, { @@ -314,20 +324,55 @@ const Chat: React.FC = () => { avatarClassName: 'bg-emerald-600/80 text-white', }]); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - const chunk = decoder.decode(value, { stream: true }); - assistantText += chunk; - setMainChat((prev) => { - const next = [...prev]; - next[next.length - 1] = { - ...next[next.length - 1], - text: assistantText, - }; - return next; - }); - } + await new Promise((resolve, reject) => { + const ws = new WebSocket(url.toString()); + let settled = false; + ws.onopen = () => { + ws.send(JSON.stringify({ session: sessionKey, message: currentMsg, media })); + }; + ws.onmessage = (event) => { + try { + const payload = JSON.parse(event.data); + if (payload?.type === 'chat_chunk' && typeof payload?.delta === 'string') { + assistantText += payload.delta; + setMainChat((prev) => { + const next = [...prev]; + next[next.length - 1] = { + ...next[next.length - 1], + text: assistantText, + }; + return next; + }); + 
return; + } + if (payload?.type === 'chat_done') { + settled = true; + ws.close(); + resolve(); + return; + } + if (payload?.type === 'chat_error') { + settled = true; + ws.close(); + reject(new Error(payload?.error || 'Chat request failed')); + } + } catch (e) { + settled = true; + ws.close(); + reject(e); + } + }; + ws.onerror = () => { + settled = true; + ws.close(); + reject(new Error('Chat request failed')); + }; + ws.onclose = () => { + if (!settled && !assistantText) { + reject(new Error('Chat request failed')); + } + }; + }); loadHistory(); } catch (e) { @@ -354,16 +399,7 @@ const Chat: React.FC = () => { loadSubagentGroup(); loadRegistryAgents(); loadRuntimeTasks(); - }, [q, chatTab, sessionKey]); - - useEffect(() => { - if (chatTab !== 'subagents') return; - const timer = window.setInterval(() => { - loadSubagentGroup(); - loadRuntimeTasks(); - }, 5000); - return () => window.clearInterval(timer); - }, [q, chatTab]); + }, [q, chatTab, sessionKey, subagentRuntimeItems, subagentRegistryItems, subagentStreamItems]); const userSessions = (sessions || []).filter((s: any) => !String(s?.key || '').startsWith('subagent:')); @@ -538,7 +574,7 @@ const Chat: React.FC = () => { )} - + {chatTab === 'subagents' && ( diff --git a/webui/src/pages/Logs.tsx b/webui/src/pages/Logs.tsx index 5513217..97430d3 100644 --- a/webui/src/pages/Logs.tsx +++ b/webui/src/pages/Logs.tsx @@ -15,7 +15,7 @@ const Logs: React.FC = () => { const [isStreaming, setIsStreaming] = useState(true); const [showRaw, setShowRaw] = useState(false); const logEndRef = useRef(null); - const abortControllerRef = useRef(null); + const socketRef = useRef(null); const loadRecent = async () => { try { @@ -30,42 +30,39 @@ const Logs: React.FC = () => { } }; - const startStreaming = async () => { - if (abortControllerRef.current) abortControllerRef.current.abort(); - abortControllerRef.current = new AbortController(); + const closeSocket = () => { + if (socketRef.current) { + socketRef.current.close(); + 
socketRef.current = null; + } + }; - try { - const response = await fetch(`/webui/api/logs/stream${q}`, { - signal: abortControllerRef.current.signal, - }); + const startStreaming = () => { + closeSocket(); + const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const url = new URL(`${proto}//${window.location.host}/webui/api/logs/live`); + const token = new URLSearchParams(q.startsWith('?') ? q.slice(1) : q).get('token'); + if (token) url.searchParams.set('token', token); - if (!response.body) return; - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - - while (true) { - const { value, done } = await reader.read(); - if (done) break; - - const chunk = decoder.decode(value, { stream: true }); - const lines = chunk.split('\n').filter(line => line.trim()); - - lines.forEach(line => { - try { - const log = normalizeLog(JSON.parse(line)); - setLogs(prev => [...prev.slice(-1000), log]); - } catch (e) { - // Fallback for non-JSON logs - setLogs(prev => [...prev.slice(-1000), normalizeLog({ time: new Date().toISOString(), level: 'INFO', msg: line })]); - } - }); - } - } catch (e: any) { - if (e.name !== 'AbortError') { + const ws = new WebSocket(url.toString()); + socketRef.current = ws; + ws.onmessage = (event) => { + try { + const payload = JSON.parse(event.data); + const log = normalizeLog(payload?.entry ?? 
payload); + setLogs(prev => [...prev.slice(-1000), log]); + } catch (e) { console.error('L0097', e); } - } + }; + ws.onerror = (e) => { + console.error('L0097', e); + }; + ws.onclose = () => { + if (socketRef.current === ws) { + socketRef.current = null; + } + }; }; const loadCodeMap = async () => { @@ -100,11 +97,11 @@ const Logs: React.FC = () => { if (isStreaming) { startStreaming(); } else { - if (abortControllerRef.current) abortControllerRef.current.abort(); + closeSocket(); } return () => { - if (abortControllerRef.current) abortControllerRef.current.abort(); + closeSocket(); }; }, [isStreaming, q]); diff --git a/webui/src/pages/Subagents.tsx b/webui/src/pages/Subagents.tsx index 7f25929..1fa7f07 100644 --- a/webui/src/pages/Subagents.tsx +++ b/webui/src/pages/Subagents.tsx @@ -233,6 +233,14 @@ function summarizePreviewText(value?: string, limit = 180): string { return compact.length > limit ? `${compact.slice(0, limit - 3)}...` : compact; } +function tokenFromQuery(q: string): string { + const raw = String(q || '').trim(); + if (!raw) return ''; + const search = raw.startsWith('?') ? raw.slice(1) : raw; + const params = new URLSearchParams(search); + return params.get('token') || ''; +} + function bezierCurve(x1: number, y1: number, x2: number, y2: number): string { const offset = Math.max(Math.abs(y2 - y1) * 0.5, 60); return `M ${x1} ${y1} C ${x1} ${y1 + offset} ${x2} ${y2 - offset} ${x2} ${y2}`; @@ -352,7 +360,7 @@ function GraphCard({ const Subagents: React.FC = () => { const { t } = useTranslation(); - const { q, nodeTrees } = useAppContext(); + const { q, nodeTrees, subagentRuntimeItems, subagentRegistryItems } = useAppContext(); const ui = useUI(); const [items, setItems] = useState([]); @@ -426,6 +434,24 @@ const Subagents: React.FC = () => { const load = async () => { try { + if (subagentRuntimeItems.length > 0 || subagentRegistryItems.length > 0) { + const arr = Array.isArray(subagentRuntimeItems) ? 
subagentRuntimeItems : []; + const registry = Array.isArray(subagentRegistryItems) ? subagentRegistryItems : []; + setItems(arr); + setRegistryItems(registry); + if (registry.length === 0) { + setSelectedAgentID(''); + setSelectedId(''); + } else { + const nextAgentID = selectedAgentID && registry.find((x: RegistrySubagent) => x.agent_id === selectedAgentID) + ? selectedAgentID + : (registry[0]?.agent_id || ''); + setSelectedAgentID(nextAgentID); + const nextTask = arr.find((x: SubagentTask) => x.agent_id === nextAgentID); + setSelectedId(nextTask?.id || ''); + } + return; + } const [tasksRes, registryRes] = await Promise.all([ fetch(withAction('list')), fetch(withAction('registry')), @@ -459,14 +485,7 @@ const Subagents: React.FC = () => { useEffect(() => { load().catch(() => { }); - }, [q, selectedAgentID]); - - useEffect(() => { - const timer = window.setInterval(() => { - load().catch(() => { }); - }, 5000); - return () => window.clearInterval(timer); - }, [q, selectedAgentID]); + }, [q, selectedAgentID, subagentRuntimeItems, subagentRegistryItems]); const selected = useMemo(() => items.find((x) => x.id === selectedId) || null, [items, selectedId]); const parsedNodeTrees = useMemo(() => { @@ -947,10 +966,6 @@ const Subagents: React.FC = () => { } catch (e) { } }; - useEffect(() => { - loadThreadAndInbox(selected).catch(() => { }); - }, [selectedId, q, items]); - const loadStreamPreview = async (agentID: string, task: SubagentTask | null) => { const taskID = task?.id || ''; if (!agentID) return; @@ -1000,10 +1015,81 @@ const Subagents: React.FC = () => { }; useEffect(() => { - if (!topologyTooltip?.agentID || topologyTooltip.transportType !== 'local') return; - const latestTask = recentTaskByAgent[topologyTooltip.agentID] || null; - loadStreamPreview(topologyTooltip.agentID, latestTask).catch(() => { }); - }, [topologyTooltip?.agentID, topologyTooltip?.transportType, recentTaskByAgent, q]); + const selectedTaskID = String(selected?.id || '').trim(); + const 
previewAgentID = topologyTooltip?.transportType === 'local' ? String(topologyTooltip.agentID || '').trim() : ''; + const previewTask = previewAgentID ? recentTaskByAgent[previewAgentID] || null : null; + const previewTaskID = String(previewTask?.id || '').trim(); + + if (!selectedTaskID) { + setThreadDetail(null); + setThreadMessages([]); + setInboxMessages([]); + } + if (!previewAgentID) { + return; + } + setStreamPreviewByAgent((prev) => ({ + ...prev, + [previewAgentID]: { + task: previewTask, + items: prev[previewAgentID]?.items || [], + taskID: previewTaskID, + loading: !!previewTaskID, + }, + })); + + if (!selectedTaskID && !previewTaskID) return; + + const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const url = new URL(`${proto}//${window.location.host}/webui/api/subagents_runtime/live`); + if (tokenFromQuery(q)) url.searchParams.set('token', tokenFromQuery(q)); + if (selectedTaskID) url.searchParams.set('task_id', selectedTaskID); + if (previewTaskID) url.searchParams.set('preview_task_id', previewTaskID); + + const ws = new WebSocket(url.toString()); + ws.onmessage = (event) => { + try { + const msg = JSON.parse(event.data); + const payload = msg?.payload || {}; + if (payload.thread) { + setThreadDetail(payload.thread.thread || null); + setThreadMessages(Array.isArray(payload.thread.messages) ? payload.thread.messages : []); + } + if (payload.inbox) { + setInboxMessages(Array.isArray(payload.inbox.messages) ? payload.inbox.messages : []); + } + if (previewAgentID && payload.preview) { + setStreamPreviewByAgent((prev) => ({ + ...prev, + [previewAgentID]: { + task: payload.preview.task || previewTask, + items: Array.isArray(payload.preview.items) ? 
payload.preview.items : [], + taskID: previewTaskID, + loading: false, + }, + })); + } + } catch (err) { + console.error(err); + } + }; + ws.onerror = () => { + if (previewAgentID) { + setStreamPreviewByAgent((prev) => ({ + ...prev, + [previewAgentID]: { + task: previewTask, + items: prev[previewAgentID]?.items || [], + taskID: previewTaskID, + loading: false, + }, + })); + } + }; + return () => { + ws.close(); + }; + }, [selected?.id, topologyTooltip?.agentID, topologyTooltip?.transportType, recentTaskByAgent, q]); return (