From 07cf54538e6e296adcd2d871349a6896bcf98f17 Mon Sep 17 00:00:00 2001 From: DBT Date: Tue, 24 Feb 2026 16:06:44 +0000 Subject: [PATCH] add gateway token-based node registration over host:port and shared nodes manager --- README.md | 2 + README_EN.md | 2 + cmd/clawgo/cmd_gateway.go | 8 ++++ config.example.json | 3 +- pkg/agent/loop.go | 2 +- pkg/config/config.go | 10 +++-- pkg/nodes/manager.go | 4 ++ pkg/nodes/registry_server.go | 75 ++++++++++++++++++++++++++++++++++++ 8 files changed, 100 insertions(+), 6 deletions(-) create mode 100644 pkg/nodes/registry_server.go diff --git a/README.md b/README.md index 2a160a2..1a076a2 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,8 @@ - `action=run|invoke|camera_snap|screen_record|location_get`:已接入路由框架 - `mode=auto|p2p|relay`:默认 `auto`(优先 p2p,失败回退 relay) - relay 已支持 HTTP 节点桥接:按 action 路由到 `/run` `/camera/snap` `/screen/record` `/location/get` `/canvas/*`(未知 action 回退 `/invoke`) +- 主节点网关支持节点注册:`POST http://:/nodes/register` +- 可在 `gateway.token` 配置网关注册令牌;子节点注册需带 `Authorization: Bearer ` - 可在 `NodeInfo` 中配置 `token`,relay 会自动附加 `Authorization: Bearer ` - `nodes` 工具支持设备快捷参数:`facing`、`duration_ms`、`command` diff --git a/README_EN.md b/README_EN.md index 59a4f1a..fd4a0fb 100644 --- a/README_EN.md +++ b/README_EN.md @@ -43,6 +43,8 @@ A `nodes` tool control-plane PoC is now available: - `action=run|invoke|camera_snap|screen_record|location_get`: routing framework is in place - `mode=auto|p2p|relay`: default `auto` (prefer p2p, fallback to relay) - relay now supports HTTP node bridging with action-specific routes: `/run`, `/camera/snap`, `/screen/record`, `/location/get`, `/canvas/*` (unknown action falls back to `/invoke`) +- gateway supports node registration: `POST http://:/nodes/register` +- configure `gateway.token` as registration token; child nodes must send `Authorization: Bearer ` - `NodeInfo.token` is supported; relay automatically sets `Authorization: Bearer ` - `nodes` tool supports device shortcuts: `facing`, `duration_ms`, `command` diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 633c0fd..7d7b6c6 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -23,6 +23,7 @@ import ( "clawgo/pkg/cron" "clawgo/pkg/heartbeat" "clawgo/pkg/logger" + "clawgo/pkg/nodes" "clawgo/pkg/providers" "clawgo/pkg/runtimecfg" "clawgo/pkg/sentinel" @@ -176,6 +177,13 @@ func gatewayCmd() { fmt.Printf("Error starting channels: %v\n", err) } + registryServer := nodes.NewRegistryServer(cfg.Gateway.Host, cfg.Gateway.Port, cfg.Gateway.Token, nodes.DefaultManager()) + if err := registryServer.Start(ctx); err != nil { + fmt.Printf("Error starting node registry server: %v\n", err) + } else { + fmt.Printf("✓ Node registry server started on %s:%d\n", cfg.Gateway.Host, cfg.Gateway.Port) + } + go agentLoop.Run(ctx) go runGatewayStartupCompactionCheck(ctx, agentLoop) go runGatewayBootstrapInit(ctx, cfg, agentLoop) diff --git a/config.example.json b/config.example.json index 9731f27..a562359 100644 --- a/config.example.json +++ b/config.example.json @@ -158,7 +158,8 @@ }, "gateway": { "host": "0.0.0.0", - "port": 18790 + "port": 18790, + "token": "" }, "cron": { "min_sleep_sec": 1, diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 424119d..c5d5171 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -77,7 +77,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers toolsRegistry.Register(tools.NewAliasTool("edit", "Edit file content (OpenClaw-compatible alias of edit_file)", tools.NewEditFileTool(workspace), map[string]string{"file_path": "path", "old_string": "oldText", "new_string": "newText"})) toolsRegistry.Register(tools.NewExecTool(cfg.Tools.Shell, workspace, processManager)) toolsRegistry.Register(tools.NewProcessTool(processManager)) - nodesManager := nodes.NewManager() + nodesManager := nodes.DefaultManager() nodesManager.Upsert(nodes.NodeInfo{ID: "local", Name: "local", Capabilities: nodes.Capabilities{Run: true, Invoke: true, Camera: true, Screen: true, Location: true, Canvas: true}, Online: true}) nodesManager.RegisterHandler("local", func(req nodes.Request) nodes.Response { switch req.Action { diff --git a/pkg/config/config.go b/pkg/config/config.go index 6c0cfe3..dda93ae 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -194,8 +194,9 @@ type ProviderConfig struct { } type GatewayConfig struct { - Host string `json:"host" env:"CLAWGO_GATEWAY_HOST"` - Port int `json:"port" env:"CLAWGO_GATEWAY_PORT"` + Host string `json:"host" env:"CLAWGO_GATEWAY_HOST"` + Port int `json:"port" env:"CLAWGO_GATEWAY_PORT"` + Token string `json:"token" env:"CLAWGO_GATEWAY_TOKEN"` } type CronConfig struct { @@ -423,8 +424,9 @@ func DefaultConfig() *Config { Proxies: map[string]ProviderConfig{}, }, Gateway: GatewayConfig{ - Host: "0.0.0.0", - Port: 18790, + Host: "0.0.0.0", + Port: 18790, + Token: "", }, Cron: CronConfig{ MinSleepSec: 1, diff --git a/pkg/nodes/manager.go b/pkg/nodes/manager.go index 0fa88cc..3a25a42 100644 --- a/pkg/nodes/manager.go +++ b/pkg/nodes/manager.go @@ -16,6 +16,10 @@ type Manager struct { handlers map[string]Handler } +var defaultManager = NewManager() + +func DefaultManager() *Manager { return defaultManager } + func NewManager() *Manager { return &Manager{nodes: map[string]NodeInfo{}, handlers: map[string]Handler{}} } diff --git a/pkg/nodes/registry_server.go b/pkg/nodes/registry_server.go new file mode 100644 index 0000000..d6d1a0e --- /dev/null +++ b/pkg/nodes/registry_server.go @@ -0,0 +1,75 @@ +package nodes + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" +) + +type RegistryServer struct { + addr string + token string + mgr *Manager + server *http.Server +} + +func NewRegistryServer(host string, port int, token string, mgr *Manager) *RegistryServer { + addr := strings.TrimSpace(host) + if addr == "" { + addr = "0.0.0.0" + } + if port <= 0 { + port = 7788 + } + return &RegistryServer{addr: fmt.Sprintf("%s:%d", addr, port), token: strings.TrimSpace(token), mgr: mgr} +} + +func (s *RegistryServer) Start(ctx context.Context) error { + if s.mgr == nil { + return nil + } + mux := http.NewServeMux() + mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + mux.HandleFunc("/nodes/register", s.handleRegister) + s.server = &http.Server{Addr: s.addr, Handler: mux} + go func() { + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = s.server.Shutdown(shutdownCtx) + }() + go func() { _ = s.server.ListenAndServe() }() + return nil +} + +func (s *RegistryServer) handleRegister(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + if s.token != "" { + auth := strings.TrimSpace(r.Header.Get("Authorization")) + if auth != "Bearer "+s.token { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + } + var n NodeInfo + if err := json.NewDecoder(r.Body).Decode(&n); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + if strings.TrimSpace(n.ID) == "" { + http.Error(w, "id required", http.StatusBadRequest) + return + } + s.mgr.Upsert(n) + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "id": n.ID}) +}