mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-04-13 13:17:28 +08:00
274 lines
8.1 KiB
Go
274 lines
8.1 KiB
Go
package api
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/YspCoder/clawgo/pkg/nodes"
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
func (s *Server) SetNodeWebRTCTransport(t *nodes.WebRTCTransport) {
|
|
s.nodeWebRTC = t
|
|
}
|
|
|
|
func (s *Server) SetNodeP2PStatusHandler(fn func() map[string]interface{}) {
|
|
s.nodeP2PStatus = fn
|
|
}
|
|
|
|
func (s *Server) rememberNodeConnection(nodeID, connID string) {
|
|
nodeID = strings.TrimSpace(nodeID)
|
|
connID = strings.TrimSpace(connID)
|
|
if nodeID == "" || connID == "" {
|
|
return
|
|
}
|
|
s.nodeConnMu.Lock()
|
|
defer s.nodeConnMu.Unlock()
|
|
s.nodeConnIDs[nodeID] = connID
|
|
}
|
|
|
|
func (s *Server) bindNodeSocket(nodeID, connID string, conn *websocket.Conn) {
|
|
nodeID = strings.TrimSpace(nodeID)
|
|
connID = strings.TrimSpace(connID)
|
|
if nodeID == "" || connID == "" || conn == nil {
|
|
return
|
|
}
|
|
next := &nodeSocketConn{connID: connID, conn: conn}
|
|
s.nodeConnMu.Lock()
|
|
prev := s.nodeSockets[nodeID]
|
|
s.nodeSockets[nodeID] = next
|
|
s.nodeConnMu.Unlock()
|
|
if s.mgr != nil {
|
|
s.mgr.RegisterWireSender(nodeID, next)
|
|
}
|
|
if s.nodeWebRTC != nil {
|
|
s.nodeWebRTC.BindSignaler(nodeID, next)
|
|
}
|
|
if prev != nil && prev.connID != connID {
|
|
_ = prev.conn.Close()
|
|
}
|
|
}
|
|
|
|
func (s *Server) releaseNodeConnection(nodeID, connID string) bool {
|
|
nodeID = strings.TrimSpace(nodeID)
|
|
connID = strings.TrimSpace(connID)
|
|
if nodeID == "" || connID == "" {
|
|
return false
|
|
}
|
|
s.nodeConnMu.Lock()
|
|
defer s.nodeConnMu.Unlock()
|
|
if s.nodeConnIDs[nodeID] != connID {
|
|
return false
|
|
}
|
|
delete(s.nodeConnIDs, nodeID)
|
|
if sock := s.nodeSockets[nodeID]; sock != nil && sock.connID == connID {
|
|
delete(s.nodeSockets, nodeID)
|
|
}
|
|
if s.mgr != nil {
|
|
s.mgr.RegisterWireSender(nodeID, nil)
|
|
}
|
|
if s.nodeWebRTC != nil {
|
|
s.nodeWebRTC.UnbindSignaler(nodeID)
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s *Server) getNodeSocket(nodeID string) *nodeSocketConn {
|
|
nodeID = strings.TrimSpace(nodeID)
|
|
if nodeID == "" {
|
|
return nil
|
|
}
|
|
s.nodeConnMu.Lock()
|
|
defer s.nodeConnMu.Unlock()
|
|
return s.nodeSockets[nodeID]
|
|
}
|
|
|
|
func (s *Server) sendNodeSocketMessage(nodeID string, msg nodes.WireMessage) error {
|
|
sock := s.getNodeSocket(nodeID)
|
|
if sock == nil || sock.conn == nil {
|
|
return fmt.Errorf("node %s not connected", strings.TrimSpace(nodeID))
|
|
}
|
|
return sock.writeJSON(msg)
|
|
}
|
|
|
|
func (s *Server) handleNodeConnect(w http.ResponseWriter, r *http.Request) {
|
|
if !s.checkAuth(r) {
|
|
http.Error(w, "unauthorized", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
if s.mgr == nil {
|
|
http.Error(w, "nodes manager unavailable", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
conn, err := s.websocketUpgrader().Upgrade(w, r, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
|
|
var connectedID string
|
|
connID := fmt.Sprintf("%d", time.Now().UnixNano())
|
|
_ = conn.SetReadDeadline(time.Now().Add(90 * time.Second))
|
|
conn.SetPongHandler(func(string) error {
|
|
return conn.SetReadDeadline(time.Now().Add(90 * time.Second))
|
|
})
|
|
|
|
writeAck := func(ack nodes.WireAck) error {
|
|
if strings.TrimSpace(connectedID) != "" {
|
|
if sock := s.getNodeSocket(connectedID); sock != nil && sock.connID == connID && sock.conn == conn {
|
|
return sock.writeJSON(ack)
|
|
}
|
|
}
|
|
_ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
return conn.WriteJSON(ack)
|
|
}
|
|
|
|
defer func() {
|
|
if strings.TrimSpace(connectedID) != "" && s.releaseNodeConnection(connectedID, connID) {
|
|
s.mgr.MarkOffline(connectedID)
|
|
}
|
|
}()
|
|
|
|
for {
|
|
var msg nodes.WireMessage
|
|
if err := conn.ReadJSON(&msg); err != nil {
|
|
return
|
|
}
|
|
_ = conn.SetReadDeadline(time.Now().Add(90 * time.Second))
|
|
if s.mgr != nil && s.mgr.HandleWireMessage(msg) {
|
|
continue
|
|
}
|
|
type nodeSocketHandler func(nodes.WireMessage) bool
|
|
handlers := map[string]nodeSocketHandler{
|
|
"register": func(msg nodes.WireMessage) bool {
|
|
if msg.Node == nil || strings.TrimSpace(msg.Node.ID) == "" {
|
|
_ = writeAck(nodes.WireAck{OK: false, Type: "register", Error: "node.id required"})
|
|
return true
|
|
}
|
|
s.mgr.Upsert(*msg.Node)
|
|
connectedID = strings.TrimSpace(msg.Node.ID)
|
|
s.rememberNodeConnection(connectedID, connID)
|
|
s.bindNodeSocket(connectedID, connID, conn)
|
|
return writeAck(nodes.WireAck{OK: true, Type: "registered", ID: connectedID}) == nil
|
|
},
|
|
"heartbeat": func(msg nodes.WireMessage) bool {
|
|
id := strings.TrimSpace(msg.ID)
|
|
if id == "" {
|
|
id = connectedID
|
|
}
|
|
if id == "" {
|
|
_ = writeAck(nodes.WireAck{OK: false, Type: "heartbeat", Error: "id required"})
|
|
return true
|
|
}
|
|
if msg.Node != nil && strings.TrimSpace(msg.Node.ID) != "" {
|
|
s.mgr.Upsert(*msg.Node)
|
|
connectedID = strings.TrimSpace(msg.Node.ID)
|
|
s.rememberNodeConnection(connectedID, connID)
|
|
s.bindNodeSocket(connectedID, connID, conn)
|
|
} else if n, ok := s.mgr.Get(id); ok {
|
|
s.mgr.Upsert(n)
|
|
connectedID = id
|
|
s.rememberNodeConnection(connectedID, connID)
|
|
s.bindNodeSocket(connectedID, connID, conn)
|
|
} else {
|
|
_ = writeAck(nodes.WireAck{OK: false, Type: "heartbeat", ID: id, Error: "node not found"})
|
|
return true
|
|
}
|
|
return writeAck(nodes.WireAck{OK: true, Type: "heartbeat", ID: connectedID}) == nil
|
|
},
|
|
"signal_offer": func(msg nodes.WireMessage) bool { return s.handleNodeSignalMessage(msg, connectedID, writeAck) },
|
|
"signal_answer": func(msg nodes.WireMessage) bool { return s.handleNodeSignalMessage(msg, connectedID, writeAck) },
|
|
"signal_candidate": func(msg nodes.WireMessage) bool { return s.handleNodeSignalMessage(msg, connectedID, writeAck) },
|
|
}
|
|
if handler := handlers[strings.ToLower(strings.TrimSpace(msg.Type))]; handler != nil {
|
|
if !handler(msg) {
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
if err := writeAck(nodes.WireAck{OK: false, Type: msg.Type, ID: msg.ID, Error: "unsupported message type"}); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) handleNodeSignalMessage(msg nodes.WireMessage, connectedID string, writeAck func(nodes.WireAck) error) bool {
|
|
targetID := strings.TrimSpace(msg.To)
|
|
if s.nodeWebRTC != nil && (targetID == "" || strings.EqualFold(targetID, "gateway")) {
|
|
if err := s.nodeWebRTC.HandleSignal(msg); err != nil {
|
|
_ = writeAck(nodes.WireAck{OK: false, Type: msg.Type, ID: msg.ID, Error: err.Error()})
|
|
return true
|
|
}
|
|
return writeAck(nodes.WireAck{OK: true, Type: "signaled", ID: msg.ID}) == nil
|
|
}
|
|
if strings.TrimSpace(connectedID) == "" {
|
|
_ = writeAck(nodes.WireAck{OK: false, Type: msg.Type, Error: "node not registered"})
|
|
return true
|
|
}
|
|
if targetID == "" {
|
|
_ = writeAck(nodes.WireAck{OK: false, Type: msg.Type, ID: msg.ID, Error: "target node required"})
|
|
return true
|
|
}
|
|
msg.From = connectedID
|
|
if err := s.sendNodeSocketMessage(targetID, msg); err != nil {
|
|
_ = writeAck(nodes.WireAck{OK: false, Type: msg.Type, ID: msg.ID, Error: err.Error()})
|
|
return true
|
|
}
|
|
return writeAck(nodes.WireAck{OK: true, Type: "relayed", ID: msg.ID}) == nil
|
|
}
|
|
|
|
func (s *Server) handleWebUINodes(w http.ResponseWriter, r *http.Request) {
|
|
if !s.checkAuth(r) {
|
|
http.Error(w, "unauthorized", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
switch r.Method {
|
|
case http.MethodGet:
|
|
payload := s.webUINodesPayload(r.Context())
|
|
payload["ok"] = true
|
|
writeJSON(w, payload)
|
|
case http.MethodPost:
|
|
var body struct {
|
|
Action string `json:"action"`
|
|
ID string `json:"id"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
|
http.Error(w, "invalid json", http.StatusBadRequest)
|
|
return
|
|
}
|
|
action := strings.ToLower(body.Action)
|
|
if action != "delete" {
|
|
http.Error(w, "unsupported action", http.StatusBadRequest)
|
|
return
|
|
}
|
|
if s.mgr == nil {
|
|
http.Error(w, "nodes manager unavailable", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
id := body.ID
|
|
ok := s.mgr.Remove(id)
|
|
writeJSON(w, map[string]interface{}{"ok": true, "deleted": ok, "id": id})
|
|
default:
|
|
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
|
}
|
|
}
|
|
|
|
func (s *Server) handleWebUINodeDispatches(w http.ResponseWriter, r *http.Request) {
|
|
if !s.checkAuth(r) {
|
|
http.Error(w, "unauthorized", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
limit := queryBoundedPositiveInt(r, "limit", 50, 500)
|
|
writeJSON(w, map[string]interface{}{
|
|
"ok": true,
|
|
"items": s.webUINodesDispatchPayload(limit),
|
|
})
|
|
}
|