From 3db78e05777292d9dcd7c3544344df66bc4ff687 Mon Sep 17 00:00:00 2001 From: lpf Date: Sun, 8 Mar 2026 23:04:35 +0800 Subject: [PATCH] feat: expose webrtc session health signals --- pkg/nodes/transport_test.go | 12 ++++ pkg/nodes/webrtc.go | 117 ++++++++++++++++++++++++++++++------ 2 files changed, 109 insertions(+), 20 deletions(-) diff --git a/pkg/nodes/transport_test.go b/pkg/nodes/transport_test.go index 92fbf7f..9cd3cc6 100644 --- a/pkg/nodes/transport_test.go +++ b/pkg/nodes/transport_test.go @@ -209,4 +209,16 @@ func TestWebRTCTransportSendEndToEnd(t *testing.T) { if resp.Payload["used_transport"] != nil { t.Fatalf("transport annotations should not be added at transport layer: %+v", resp.Payload) } + + snapshot := transport.Snapshot() + if snapshot["active_sessions"] != 1 { + t.Fatalf("expected one active session, got %+v", snapshot) + } + nodesRaw, _ := snapshot["nodes"].([]map[string]interface{}) + if len(nodesRaw) == 0 { + t.Fatalf("expected node snapshots, got %+v", snapshot) + } + if nodesRaw[0]["status"] != "open" { + t.Fatalf("expected open status, got %+v", nodesRaw[0]) + } } diff --git a/pkg/nodes/webrtc.go b/pkg/nodes/webrtc.go index a02b855..9049a83 100644 --- a/pkg/nodes/webrtc.go +++ b/pkg/nodes/webrtc.go @@ -12,18 +12,28 @@ import ( ) type gatewayRTCSession struct { - nodeID string - pc *webrtc.PeerConnection - dc *webrtc.DataChannel - ready chan struct{} - readyMu sync.Once - writeMu sync.Mutex - pending map[string]chan Response - mu sync.Mutex - nextID uint64 + nodeID string + pc *webrtc.PeerConnection + dc *webrtc.DataChannel + ready chan struct{} + readyMu sync.Once + writeMu sync.Mutex + pending map[string]chan Response + mu sync.Mutex + nextID uint64 + status string + lastError string + retryCount int + createdAt time.Time + lastAttempt time.Time + lastReadyAt time.Time } func (s *gatewayRTCSession) markReady() { + s.mu.Lock() + s.status = "open" + s.lastReadyAt = time.Now().UTC() + s.mu.Unlock() s.readyMu.Do(func() { close(s.ready) }) } @@ -47,6 +57,40 @@ func (s *gatewayRTCSession) nextRequestID() string { return fmt.Sprintf("rtc-%s-%d", s.nodeID, s.nextID) } +func (s *gatewayRTCSession) setStatus(status string) { + s.mu.Lock() + defer s.mu.Unlock() + s.status = strings.TrimSpace(status) +} + +func (s *gatewayRTCSession) setLastError(err error) { + s.mu.Lock() + defer s.mu.Unlock() + if err == nil { + s.lastError = "" + return + } + s.lastError = strings.TrimSpace(err.Error()) +} + +func (s *gatewayRTCSession) snapshot() map[string]interface{} { + s.mu.Lock() + defer s.mu.Unlock() + status := s.status + if status == "" { + status = "connecting" + } + return map[string]interface{}{ + "node": s.nodeID, + "status": status, + "last_error": s.lastError, + "retry_count": s.retryCount, + "created_at": s.createdAt, + "last_attempt": s.lastAttempt, + "last_ready_at": s.lastReadyAt, + } +} + type WebRTCTransport struct { stunServers []string @@ -77,15 +121,14 @@ func (t *WebRTCTransport) Snapshot() map[string]interface{} { nodes := make([]map[string]interface{}, 0, len(t.sessions)) active := 0 for nodeID, session := range t.sessions { - status := "connecting" if session != nil && session.dc != nil && session.dc.ReadyState() == webrtc.DataChannelStateOpen { - status = "open" active++ } - nodes = append(nodes, map[string]interface{}{ - "node": nodeID, - "status": status, - }) + if session == nil { + nodes = append(nodes, map[string]interface{}{"node": nodeID, "status": "unknown"}) + continue + } + nodes = append(nodes, session.snapshot()) } return map[string]interface{}{ "transport": "webrtc", @@ -115,6 +158,7 @@ func (t *WebRTCTransport) UnbindSignaler(nodeID string) { delete(t.sessions, nodeID) t.mu.Unlock() if session != nil && session.pc != nil { + session.setStatus("offline") _ = session.pc.Close() } } @@ -157,12 +201,17 @@ func (t *WebRTCTransport) Send(ctx context.Context, req Request) (Response, erro if err != nil { return Response{OK: false, Code: "p2p_unavailable", Node: req.Node, Action: req.Action, Error: err.Error()}, nil } + session.setLastError(nil) select { case <-ctx.Done(): + session.setStatus("cancelled") + session.setLastError(ctx.Err()) return Response{}, ctx.Err() case <-session.ready: case <-time.After(8 * time.Second): + session.setStatus("timeout") + session.setLastError(fmt.Errorf("webrtc session not ready")) return Response{OK: false, Code: "p2p_timeout", Node: req.Node, Action: req.Action, Error: "webrtc session not ready"}, nil } @@ -181,6 +230,8 @@ func (t *WebRTCTransport) Send(ctx context.Context, req Request) (Response, erro session.mu.Lock() delete(session.pending, reqID) session.mu.Unlock() + session.setStatus("send_failed") + session.setLastError(err) return Response{OK: false, Code: "p2p_send_failed", Node: req.Node, Action: req.Action, Error: err.Error()}, nil } @@ -189,8 +240,17 @@ func (t *WebRTCTransport) Send(ctx context.Context, req Request) (Response, erro session.mu.Lock() delete(session.pending, reqID) session.mu.Unlock() + session.setStatus("cancelled") + session.setLastError(ctx.Err()) return Response{}, ctx.Err() case resp := <-respCh: + if resp.OK { + session.setStatus("open") + session.setLastError(nil) + } else if strings.TrimSpace(resp.Error) != "" { + session.setStatus("remote_error") + session.setLastError(fmt.Errorf("%s", strings.TrimSpace(resp.Error))) + } return resp, nil } } @@ -203,6 +263,10 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro t.mu.Lock() if session := t.sessions[nodeID]; session != nil { + session.mu.Lock() + session.retryCount++ + session.lastAttempt = time.Now().UTC() + session.mu.Unlock() t.mu.Unlock() return session, nil } @@ -225,11 +289,14 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro return nil, err } session := &gatewayRTCSession{ - nodeID: nodeID, - pc: pc, - dc: dc, - ready: make(chan struct{}), - pending: map[string]chan Response{}, + nodeID: nodeID, + pc: pc, + dc: dc, + ready: make(chan struct{}), + pending: map[string]chan Response{}, + status: "connecting", + createdAt: time.Now().UTC(), + lastAttempt: time.Now().UTC(), } pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { @@ -249,8 +316,10 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro }) }) pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + session.setStatus(strings.ToLower(strings.TrimSpace(state.String()))) switch state { case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed, webrtc.PeerConnectionStateDisconnected: + session.setLastError(fmt.Errorf("peer connection state: %s", state.String())) t.mu.Lock() if t.sessions[nodeID] == session { delete(t.sessions, nodeID) @@ -261,6 +330,10 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro dc.OnOpen(func() { session.markReady() }) + dc.OnError(func(err error) { + session.setStatus("channel_error") + session.setLastError(err) + }) dc.OnMessage(func(message webrtc.DataChannelMessage) { var msg WireMessage if err := json.Unmarshal(message.Data, &msg); err != nil { @@ -304,6 +377,8 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro t.mu.Lock() delete(t.sessions, nodeID) t.mu.Unlock() + session.setStatus("signal_unavailable") + session.setLastError(fmt.Errorf("node %s signaling unavailable", nodeID)) _ = pc.Close() return nil, fmt.Errorf("node %s signaling unavailable", nodeID) } @@ -317,6 +392,8 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro t.mu.Lock() delete(t.sessions, nodeID) t.mu.Unlock() + session.setStatus("signal_failed") + session.setLastError(err) _ = pc.Close() return nil, err }