// Package netbase provides a small WebSocket echo server built on
// gorilla/websocket, plus a channel-backed Connection wrapper.
package netbase

import (
	"errors"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

// heartbeatInterval is the pause between two heartbeat frames sent by wsHandler.
const heartbeatInterval = 1 * time.Second

// ErrWebSocketClosed is returned by ReadMessage and WriteMessage once the
// connection has been closed.
var ErrWebSocketClosed = errors.New("connection is closed")

// wsRegisterOnce makes sure the "/ws" route is registered on the default mux
// only once; registering the same pattern twice makes net/http panic.
var wsRegisterOnce sync.Once

// Connection wraps a websocket connection with buffered inbound and outbound
// queues. One goroutine (readLoop) is the only reader of the socket and one
// goroutine (writeLoop) is the only writer; callers talk to them through
// ReadMessage and WriteMessage.
type Connection struct {
	wsConnect *websocket.Conn
	inChan    chan []byte // messages read from the socket, consumed by ReadMessage
	outChan   chan []byte // messages queued by WriteMessage, drained by writeLoop
	closeChan chan byte   // closed (never written to) to signal shutdown

	mutex    sync.Mutex // guards the closing of closeChan
	isClosed bool       // prevents closeChan from being closed more than once
}

// CWebSocketServer serves the WebSocket endpoint on bindurl.
//
// The embedded Connection is not initialised by NewWebSocketServer; per-client
// connections are created inside the request handler instead.
type CWebSocketServer struct {
	Connection
	bindurl string
}

// NewWebSocketServer returns a server that will listen on bindurl
// (a host:port address as accepted by http.ListenAndServe).
func NewWebSocketServer(bindurl string) *CWebSocketServer {
	wss := new(CWebSocketServer)
	wss.bindurl = bindurl
	return wss
}

// Start registers the "/ws" handler on the default HTTP mux and starts
// listening on the configured address in a background goroutine. It returns
// immediately; a listen/serve failure is logged rather than returned.
func (wss *CWebSocketServer) Start() {
	// Guard the registration so that calling Start more than once (or on
	// several servers) does not panic on a duplicate pattern.
	wsRegisterOnce.Do(func() {
		http.HandleFunc("/ws", wsHandler)
	})

	go func() {
		if err := http.ListenAndServe(wss.bindurl, nil); err != nil {
			log.Printf("netbase: websocket server on %q stopped: %v", wss.bindurl, err)
		}
	}()
}

var (
	upgrader = websocket.Upgrader{
		// Allow cross-origin requests.
		// NOTE(review): this accepts every origin; restrict it if the
		// endpoint must not be reachable from arbitrary web pages.
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
)

// wsHandler upgrades the request to a WebSocket, starts a heartbeat sender and
// then echoes every received message back to the client until the connection
// fails or is closed.
func wsHandler(w http.ResponseWriter, r *http.Request) {
	// Complete the WebSocket handshake (Upgrade: websocket).
	wsConn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		return
	}

	conn, err := InitConnection(wsConn)
	if err != nil {
		// conn is nil on this path, so close the raw socket directly.
		// Best effort: there is nobody to report a close error to.
		_ = wsConn.Close()
		return
	}
	defer conn.Close()

	// Keep sending heartbeat messages in the background.
	go heartbeatLoop(conn)

	for {
		data, err := conn.ReadMessage()
		if err != nil {
			return
		}
		if err := conn.WriteMessage(data); err != nil {
			return
		}
	}
}

// heartbeatLoop queues a "heartbeat" message immediately and then once per
// heartbeatInterval, stopping as soon as the connection is closed.
func heartbeatLoop(conn *Connection) {
	ticker := time.NewTicker(heartbeatInterval)
	defer ticker.Stop()

	for {
		if err := conn.WriteMessage([]byte("heartbeat")); err != nil {
			return
		}
		select {
		case <-ticker.C:
		case <-conn.closeChan:
			return
		}
	}
}

// InitConnection wraps wsConn in a Connection and starts its read and write
// goroutines. The returned error is currently always nil.
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
	conn = &Connection{
		wsConnect: wsConn,
		inChan:    make(chan []byte, 1000),
		outChan:   make(chan []byte, 1000),
		closeChan: make(chan byte, 1),
	}
	// Start the reader goroutine.
	go conn.readLoop()
	// Start the writer goroutine.
	go conn.writeLoop()
	return conn, nil
}

// ReadMessage blocks until a message is available or the connection is
// closed, in which case it returns ErrWebSocketClosed.
func (conn *Connection) ReadMessage() (data []byte, err error) {
	select {
	case data = <-conn.inChan:
		return data, nil
	case <-conn.closeChan:
		return nil, ErrWebSocketClosed
	}
}

// WriteMessage queues data for sending. It blocks while the outbound queue is
// full and returns ErrWebSocketClosed if the connection is closed.
func (conn *Connection) WriteMessage(data []byte) (err error) {
	select {
	case conn.outChan <- data:
		return nil
	case <-conn.closeChan:
		return ErrWebSocketClosed
	}
}

// Close shuts the connection down. It is safe to call from several goroutines
// and may be called more than once.
func (conn *Connection) Close() {
	// A zero-value Connection (such as the one embedded in CWebSocketServer)
	// has no socket; skip it instead of dereferencing nil.
	if conn.wsConnect != nil {
		// Best effort: repeated closes of the socket report an error that
		// callers of Close have no use for.
		_ = conn.wsConnect.Close()
	}

	// Use the flag so that closeChan is closed exactly once.
	conn.mutex.Lock()
	defer conn.mutex.Unlock()
	if conn.isClosed {
		return
	}
	conn.isClosed = true
	if conn.closeChan != nil {
		close(conn.closeChan)
	}
}

// Internal implementation.

// readLoop moves messages from the socket into inChan until a read fails or
// the connection is closed, then closes the connection.
func (conn *Connection) readLoop() {
	defer conn.Close()

	for {
		_, data, err := conn.wsConnect.ReadMessage()
		if err != nil {
			return
		}
		// Block here until inChan has room.
		select {
		case conn.inChan <- data:
		case <-conn.closeChan:
			// closeChan signals that the connection has gone away.
			return
		}
	}
}

// writeLoop sends queued messages from outChan as text frames until a write
// fails or the connection is closed, then closes the connection.
func (conn *Connection) writeLoop() {
	defer conn.Close()

	for {
		var data []byte
		select {
		case data = <-conn.outChan:
		case <-conn.closeChan:
			return
		}
		if err := conn.wsConnect.WriteMessage(websocket.TextMessage, data); err != nil {
			return
		}
	}
}