mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-13 04:27:28 +08:00
feat: expose webrtc session health signals
This commit is contained in:
@@ -209,4 +209,16 @@ func TestWebRTCTransportSendEndToEnd(t *testing.T) {
|
||||
if resp.Payload["used_transport"] != nil {
|
||||
t.Fatalf("transport annotations should not be added at transport layer: %+v", resp.Payload)
|
||||
}
|
||||
|
||||
snapshot := transport.Snapshot()
|
||||
if snapshot["active_sessions"] != 1 {
|
||||
t.Fatalf("expected one active session, got %+v", snapshot)
|
||||
}
|
||||
nodesRaw, _ := snapshot["nodes"].([]map[string]interface{})
|
||||
if len(nodesRaw) == 0 {
|
||||
t.Fatalf("expected node snapshots, got %+v", snapshot)
|
||||
}
|
||||
if nodesRaw[0]["status"] != "open" {
|
||||
t.Fatalf("expected open status, got %+v", nodesRaw[0])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,18 +12,28 @@ import (
|
||||
)
|
||||
|
||||
type gatewayRTCSession struct {
|
||||
nodeID string
|
||||
pc *webrtc.PeerConnection
|
||||
dc *webrtc.DataChannel
|
||||
ready chan struct{}
|
||||
readyMu sync.Once
|
||||
writeMu sync.Mutex
|
||||
pending map[string]chan Response
|
||||
mu sync.Mutex
|
||||
nextID uint64
|
||||
nodeID string
|
||||
pc *webrtc.PeerConnection
|
||||
dc *webrtc.DataChannel
|
||||
ready chan struct{}
|
||||
readyMu sync.Once
|
||||
writeMu sync.Mutex
|
||||
pending map[string]chan Response
|
||||
mu sync.Mutex
|
||||
nextID uint64
|
||||
status string
|
||||
lastError string
|
||||
retryCount int
|
||||
createdAt time.Time
|
||||
lastAttempt time.Time
|
||||
lastReadyAt time.Time
|
||||
}
|
||||
|
||||
func (s *gatewayRTCSession) markReady() {
|
||||
s.mu.Lock()
|
||||
s.status = "open"
|
||||
s.lastReadyAt = time.Now().UTC()
|
||||
s.mu.Unlock()
|
||||
s.readyMu.Do(func() { close(s.ready) })
|
||||
}
|
||||
|
||||
@@ -47,6 +57,40 @@ func (s *gatewayRTCSession) nextRequestID() string {
|
||||
return fmt.Sprintf("rtc-%s-%d", s.nodeID, s.nextID)
|
||||
}
|
||||
|
||||
func (s *gatewayRTCSession) setStatus(status string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.status = strings.TrimSpace(status)
|
||||
}
|
||||
|
||||
func (s *gatewayRTCSession) setLastError(err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if err == nil {
|
||||
s.lastError = ""
|
||||
return
|
||||
}
|
||||
s.lastError = strings.TrimSpace(err.Error())
|
||||
}
|
||||
|
||||
func (s *gatewayRTCSession) snapshot() map[string]interface{} {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
status := s.status
|
||||
if status == "" {
|
||||
status = "connecting"
|
||||
}
|
||||
return map[string]interface{}{
|
||||
"node": s.nodeID,
|
||||
"status": status,
|
||||
"last_error": s.lastError,
|
||||
"retry_count": s.retryCount,
|
||||
"created_at": s.createdAt,
|
||||
"last_attempt": s.lastAttempt,
|
||||
"last_ready_at": s.lastReadyAt,
|
||||
}
|
||||
}
|
||||
|
||||
type WebRTCTransport struct {
|
||||
stunServers []string
|
||||
|
||||
@@ -77,15 +121,14 @@ func (t *WebRTCTransport) Snapshot() map[string]interface{} {
|
||||
nodes := make([]map[string]interface{}, 0, len(t.sessions))
|
||||
active := 0
|
||||
for nodeID, session := range t.sessions {
|
||||
status := "connecting"
|
||||
if session != nil && session.dc != nil && session.dc.ReadyState() == webrtc.DataChannelStateOpen {
|
||||
status = "open"
|
||||
active++
|
||||
}
|
||||
nodes = append(nodes, map[string]interface{}{
|
||||
"node": nodeID,
|
||||
"status": status,
|
||||
})
|
||||
if session == nil {
|
||||
nodes = append(nodes, map[string]interface{}{"node": nodeID, "status": "unknown"})
|
||||
continue
|
||||
}
|
||||
nodes = append(nodes, session.snapshot())
|
||||
}
|
||||
return map[string]interface{}{
|
||||
"transport": "webrtc",
|
||||
@@ -115,6 +158,7 @@ func (t *WebRTCTransport) UnbindSignaler(nodeID string) {
|
||||
delete(t.sessions, nodeID)
|
||||
t.mu.Unlock()
|
||||
if session != nil && session.pc != nil {
|
||||
session.setStatus("offline")
|
||||
_ = session.pc.Close()
|
||||
}
|
||||
}
|
||||
@@ -157,12 +201,17 @@ func (t *WebRTCTransport) Send(ctx context.Context, req Request) (Response, erro
|
||||
if err != nil {
|
||||
return Response{OK: false, Code: "p2p_unavailable", Node: req.Node, Action: req.Action, Error: err.Error()}, nil
|
||||
}
|
||||
session.setLastError(nil)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
session.setStatus("cancelled")
|
||||
session.setLastError(ctx.Err())
|
||||
return Response{}, ctx.Err()
|
||||
case <-session.ready:
|
||||
case <-time.After(8 * time.Second):
|
||||
session.setStatus("timeout")
|
||||
session.setLastError(fmt.Errorf("webrtc session not ready"))
|
||||
return Response{OK: false, Code: "p2p_timeout", Node: req.Node, Action: req.Action, Error: "webrtc session not ready"}, nil
|
||||
}
|
||||
|
||||
@@ -181,6 +230,8 @@ func (t *WebRTCTransport) Send(ctx context.Context, req Request) (Response, erro
|
||||
session.mu.Lock()
|
||||
delete(session.pending, reqID)
|
||||
session.mu.Unlock()
|
||||
session.setStatus("send_failed")
|
||||
session.setLastError(err)
|
||||
return Response{OK: false, Code: "p2p_send_failed", Node: req.Node, Action: req.Action, Error: err.Error()}, nil
|
||||
}
|
||||
|
||||
@@ -189,8 +240,17 @@ func (t *WebRTCTransport) Send(ctx context.Context, req Request) (Response, erro
|
||||
session.mu.Lock()
|
||||
delete(session.pending, reqID)
|
||||
session.mu.Unlock()
|
||||
session.setStatus("cancelled")
|
||||
session.setLastError(ctx.Err())
|
||||
return Response{}, ctx.Err()
|
||||
case resp := <-respCh:
|
||||
if resp.OK {
|
||||
session.setStatus("open")
|
||||
session.setLastError(nil)
|
||||
} else if strings.TrimSpace(resp.Error) != "" {
|
||||
session.setStatus("remote_error")
|
||||
session.setLastError(fmt.Errorf("%s", strings.TrimSpace(resp.Error)))
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
}
|
||||
@@ -203,6 +263,10 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro
|
||||
|
||||
t.mu.Lock()
|
||||
if session := t.sessions[nodeID]; session != nil {
|
||||
session.mu.Lock()
|
||||
session.retryCount++
|
||||
session.lastAttempt = time.Now().UTC()
|
||||
session.mu.Unlock()
|
||||
t.mu.Unlock()
|
||||
return session, nil
|
||||
}
|
||||
@@ -225,11 +289,14 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro
|
||||
return nil, err
|
||||
}
|
||||
session := &gatewayRTCSession{
|
||||
nodeID: nodeID,
|
||||
pc: pc,
|
||||
dc: dc,
|
||||
ready: make(chan struct{}),
|
||||
pending: map[string]chan Response{},
|
||||
nodeID: nodeID,
|
||||
pc: pc,
|
||||
dc: dc,
|
||||
ready: make(chan struct{}),
|
||||
pending: map[string]chan Response{},
|
||||
status: "connecting",
|
||||
createdAt: time.Now().UTC(),
|
||||
lastAttempt: time.Now().UTC(),
|
||||
}
|
||||
|
||||
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
||||
@@ -249,8 +316,10 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro
|
||||
})
|
||||
})
|
||||
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
||||
session.setStatus(strings.ToLower(strings.TrimSpace(state.String())))
|
||||
switch state {
|
||||
case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed, webrtc.PeerConnectionStateDisconnected:
|
||||
session.setLastError(fmt.Errorf("peer connection state: %s", state.String()))
|
||||
t.mu.Lock()
|
||||
if t.sessions[nodeID] == session {
|
||||
delete(t.sessions, nodeID)
|
||||
@@ -261,6 +330,10 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro
|
||||
dc.OnOpen(func() {
|
||||
session.markReady()
|
||||
})
|
||||
dc.OnError(func(err error) {
|
||||
session.setStatus("channel_error")
|
||||
session.setLastError(err)
|
||||
})
|
||||
dc.OnMessage(func(message webrtc.DataChannelMessage) {
|
||||
var msg WireMessage
|
||||
if err := json.Unmarshal(message.Data, &msg); err != nil {
|
||||
@@ -304,6 +377,8 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro
|
||||
t.mu.Lock()
|
||||
delete(t.sessions, nodeID)
|
||||
t.mu.Unlock()
|
||||
session.setStatus("signal_unavailable")
|
||||
session.setLastError(fmt.Errorf("node %s signaling unavailable", nodeID))
|
||||
_ = pc.Close()
|
||||
return nil, fmt.Errorf("node %s signaling unavailable", nodeID)
|
||||
}
|
||||
@@ -317,6 +392,8 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro
|
||||
t.mu.Lock()
|
||||
delete(t.sessions, nodeID)
|
||||
t.mu.Unlock()
|
||||
session.setStatus("signal_failed")
|
||||
session.setLastError(err)
|
||||
_ = pc.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user