mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-06 16:57:29 +08:00
feat: surface node p2p runtime visibility
This commit is contained in:
@@ -41,6 +41,7 @@ type Server struct {
|
||||
nodeConnIDs map[string]string
|
||||
nodeSockets map[string]*nodeSocketConn
|
||||
nodeWebRTC *nodes.WebRTCTransport
|
||||
nodeP2PStatus func() map[string]interface{}
|
||||
gatewayVersion string
|
||||
webuiVersion string
|
||||
configPath string
|
||||
@@ -120,6 +121,9 @@ func (s *Server) SetWebUIVersion(v string) { s.webuiVersion
|
||||
func (s *Server) SetNodeWebRTCTransport(t *nodes.WebRTCTransport) {
|
||||
s.nodeWebRTC = t
|
||||
}
|
||||
func (s *Server) SetNodeP2PStatusHandler(fn func() map[string]interface{}) {
|
||||
s.nodeP2PStatus = fn
|
||||
}
|
||||
|
||||
func (s *Server) rememberNodeConnection(nodeID, connID string) {
|
||||
nodeID = strings.TrimSpace(nodeID)
|
||||
@@ -1066,9 +1070,14 @@ func (s *Server) webUINodesPayload(ctx context.Context) map[string]interface{} {
|
||||
if !matched {
|
||||
list = append([]nodes.NodeInfo{local}, list...)
|
||||
}
|
||||
p2p := map[string]interface{}{}
|
||||
if s.nodeP2PStatus != nil {
|
||||
p2p = s.nodeP2PStatus()
|
||||
}
|
||||
return map[string]interface{}{
|
||||
"nodes": list,
|
||||
"trees": s.buildNodeAgentTrees(ctx, list),
|
||||
"p2p": p2p,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1548,7 +1557,11 @@ func (s *Server) handleWebUINodes(w http.ResponseWriter, r *http.Request) {
|
||||
list = append([]nodes.NodeInfo{local}, list...)
|
||||
}
|
||||
trees := s.buildNodeAgentTrees(r.Context(), list)
|
||||
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "nodes": list, "trees": trees})
|
||||
p2p := map[string]interface{}{}
|
||||
if s.nodeP2PStatus != nil {
|
||||
p2p = s.nodeP2PStatus()
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "nodes": list, "trees": trees, "p2p": p2p})
|
||||
case http.MethodPost:
|
||||
var body struct {
|
||||
Action string `json:"action"`
|
||||
|
||||
@@ -428,3 +428,31 @@ func TestHandleWebUILogsLive(t *testing.T) {
|
||||
t.Fatalf("expected tail-ok entry, got: %+v", entry)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
srv := NewServer("127.0.0.1", 0, "", nodes.NewManager())
|
||||
srv.SetNodeP2PStatusHandler(func() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"enabled": true,
|
||||
"transport": "webrtc",
|
||||
"active_sessions": 2,
|
||||
}
|
||||
})
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/webui/api/nodes", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
srv.handleWebUINodes(rec, req)
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d", rec.Code)
|
||||
}
|
||||
var body map[string]interface{}
|
||||
if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
|
||||
t.Fatalf("decode body: %v", err)
|
||||
}
|
||||
p2p, _ := body["p2p"].(map[string]interface{})
|
||||
if p2p == nil || p2p["transport"] != "webrtc" {
|
||||
t.Fatalf("expected p2p summary, got %+v", body)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,25 +33,44 @@ func (r *Router) Dispatch(ctx context.Context, req Request, mode string) (Respon
|
||||
if r.P2P == nil {
|
||||
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "p2p transport unavailable"}, nil
|
||||
}
|
||||
return r.P2P.Send(ctx, req)
|
||||
resp, err := r.P2P.Send(ctx, req)
|
||||
return annotateTransport(resp, "p2p", r.P2P.Name(), ""), err
|
||||
case "relay":
|
||||
if r.Relay == nil {
|
||||
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay transport unavailable"}, nil
|
||||
}
|
||||
return r.Relay.Send(ctx, req)
|
||||
resp, err := r.Relay.Send(ctx, req)
|
||||
return annotateTransport(resp, "relay", r.Relay.Name(), ""), err
|
||||
default: // auto
|
||||
if r.P2P != nil {
|
||||
if resp, err := r.P2P.Send(ctx, req); err == nil && resp.OK {
|
||||
return resp, nil
|
||||
return annotateTransport(resp, "auto", r.P2P.Name(), ""), nil
|
||||
}
|
||||
}
|
||||
if r.Relay != nil {
|
||||
return r.Relay.Send(ctx, req)
|
||||
resp, err := r.Relay.Send(ctx, req)
|
||||
return annotateTransport(resp, "auto", r.Relay.Name(), "p2p"), err
|
||||
}
|
||||
return Response{}, fmt.Errorf("no transport available")
|
||||
}
|
||||
}
|
||||
|
||||
func annotateTransport(resp Response, mode, usedTransport, fallbackFrom string) Response {
|
||||
if resp.Payload == nil {
|
||||
resp.Payload = map[string]interface{}{}
|
||||
}
|
||||
if strings.TrimSpace(mode) != "" {
|
||||
resp.Payload["dispatch_mode"] = strings.TrimSpace(mode)
|
||||
}
|
||||
if strings.TrimSpace(usedTransport) != "" {
|
||||
resp.Payload["used_transport"] = strings.TrimSpace(usedTransport)
|
||||
}
|
||||
if strings.TrimSpace(fallbackFrom) != "" {
|
||||
resp.Payload["fallback_from"] = strings.TrimSpace(fallbackFrom)
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
// WebsocketP2PTransport uses the persistent node websocket as a request/response tunnel
|
||||
// while the project evolves toward a true peer data channel.
|
||||
type WebsocketP2PTransport struct {
|
||||
|
||||
@@ -71,6 +71,29 @@ func NewWebRTCTransport(stunServers []string) *WebRTCTransport {
|
||||
|
||||
func (t *WebRTCTransport) Name() string { return "p2p-webrtc" }
|
||||
|
||||
func (t *WebRTCTransport) Snapshot() map[string]interface{} {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
nodes := make([]map[string]interface{}, 0, len(t.sessions))
|
||||
active := 0
|
||||
for nodeID, session := range t.sessions {
|
||||
status := "connecting"
|
||||
if session != nil && session.dc != nil && session.dc.ReadyState() == webrtc.DataChannelStateOpen {
|
||||
status = "open"
|
||||
active++
|
||||
}
|
||||
nodes = append(nodes, map[string]interface{}{
|
||||
"node": nodeID,
|
||||
"status": status,
|
||||
})
|
||||
}
|
||||
return map[string]interface{}{
|
||||
"transport": "webrtc",
|
||||
"active_sessions": active,
|
||||
"nodes": nodes,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *WebRTCTransport) BindSignaler(nodeID string, sender WireSender) {
|
||||
nodeID = strings.TrimSpace(nodeID)
|
||||
if nodeID == "" {
|
||||
|
||||
@@ -22,20 +22,20 @@ type NodesTool struct {
|
||||
func NewNodesTool(m *nodes.Manager, r *nodes.Router, auditPath string) *NodesTool {
|
||||
return &NodesTool{manager: m, router: r, auditPath: strings.TrimSpace(auditPath)}
|
||||
}
|
||||
func (t *NodesTool) Name() string { return "nodes" }
|
||||
func (t *NodesTool) Name() string { return "nodes" }
|
||||
func (t *NodesTool) Description() string {
|
||||
return "Manage paired nodes (status/describe/run/invoke/camera/screen/location/canvas)."
|
||||
}
|
||||
func (t *NodesTool) Parameters() map[string]interface{} {
|
||||
return map[string]interface{}{"type": "object", "properties": map[string]interface{}{
|
||||
"action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|agent_task|camera_snap|camera_clip|screen_record|screen_snapshot|location_get|canvas_snapshot|canvas_action"},
|
||||
"node": map[string]interface{}{"type": "string", "description": "target node id"},
|
||||
"mode": map[string]interface{}{"type": "string", "description": "auto|p2p|relay (default auto)"},
|
||||
"args": map[string]interface{}{"type": "object", "description": "action args"},
|
||||
"task": map[string]interface{}{"type": "string", "description": "agent_task content for child node model"},
|
||||
"model": map[string]interface{}{"type": "string", "description": "optional model for agent_task"},
|
||||
"command": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}, "description": "run command array shortcut"},
|
||||
"facing": map[string]interface{}{"type": "string", "description": "camera facing: front|back|both"},
|
||||
"action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|agent_task|camera_snap|camera_clip|screen_record|screen_snapshot|location_get|canvas_snapshot|canvas_action"},
|
||||
"node": map[string]interface{}{"type": "string", "description": "target node id"},
|
||||
"mode": map[string]interface{}{"type": "string", "description": "auto|p2p|relay (default auto)"},
|
||||
"args": map[string]interface{}{"type": "object", "description": "action args"},
|
||||
"task": map[string]interface{}{"type": "string", "description": "agent_task content for child node model"},
|
||||
"model": map[string]interface{}{"type": "string", "description": "optional model for agent_task"},
|
||||
"command": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}, "description": "run command array shortcut"},
|
||||
"facing": map[string]interface{}{"type": "string", "description": "camera facing: front|back|both"},
|
||||
"duration_ms": map[string]interface{}{"type": "integer", "description": "clip/record duration"},
|
||||
}, "required": []string{"action"}}
|
||||
}
|
||||
@@ -144,6 +144,12 @@ func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode stri
|
||||
"error": resp.Error,
|
||||
"duration_ms": durationMs,
|
||||
}
|
||||
if used, _ := resp.Payload["used_transport"].(string); strings.TrimSpace(used) != "" {
|
||||
row["used_transport"] = strings.TrimSpace(used)
|
||||
}
|
||||
if fallback, _ := resp.Payload["fallback_from"].(string); strings.TrimSpace(fallback) != "" {
|
||||
row["fallback_from"] = strings.TrimSpace(fallback)
|
||||
}
|
||||
b, _ := json.Marshal(row)
|
||||
f, err := os.OpenFile(t.auditPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user