mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-04 06:54:45 +08:00
183 lines
3.0 KiB
Go
183 lines
3.0 KiB
Go
package netbase
|
||
|
||
import (
|
||
"errors"
|
||
"net/http"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/gorilla/websocket"
|
||
)
|
||
|
||
type Connection struct {
|
||
wsConnect *websocket.Conn
|
||
inChan chan []byte
|
||
outChan chan []byte
|
||
closeChan chan byte
|
||
|
||
mutex sync.Mutex // 对closeChan关闭上锁
|
||
isClosed bool // 防止closeChan被关闭多次
|
||
}
|
||
|
||
type CWebSocketServer struct {
|
||
Connection
|
||
bindurl string
|
||
}
|
||
|
||
func NewWebSocketServer(bindurl string) *CWebSocketServer {
|
||
wss := new(CWebSocketServer)
|
||
wss.bindurl = bindurl
|
||
return wss
|
||
}
|
||
|
||
func (wss *CWebSocketServer) Start() {
|
||
|
||
http.HandleFunc("/ws", wsHandler)
|
||
go http.ListenAndServe(wss.bindurl, nil)
|
||
}
|
||
|
||
var (
|
||
upgrader = websocket.Upgrader{
|
||
// 允许跨域
|
||
CheckOrigin: func(r *http.Request) bool {
|
||
return true
|
||
},
|
||
}
|
||
)
|
||
|
||
func wsHandler(w http.ResponseWriter, r *http.Request) {
|
||
// w.Write([]byte("hello"))
|
||
var (
|
||
wsConn *websocket.Conn
|
||
err error
|
||
conn *Connection
|
||
data []byte
|
||
)
|
||
// 完成ws协议的握手操作
|
||
// Upgrade:websocket
|
||
if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
|
||
return
|
||
}
|
||
|
||
if conn, err = InitConnection(wsConn); err != nil {
|
||
goto ERR
|
||
}
|
||
|
||
// 启动线程,不断发消息
|
||
go func() {
|
||
var (
|
||
err error
|
||
)
|
||
for {
|
||
if err = conn.WriteMessage([]byte("heartbeat")); err != nil {
|
||
return
|
||
}
|
||
time.Sleep(1 * time.Second)
|
||
}
|
||
}()
|
||
|
||
for {
|
||
if data, err = conn.ReadMessage(); err != nil {
|
||
goto ERR
|
||
}
|
||
if err = conn.WriteMessage(data); err != nil {
|
||
goto ERR
|
||
}
|
||
}
|
||
|
||
ERR:
|
||
conn.Close()
|
||
|
||
}
|
||
|
||
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
|
||
conn = &Connection{
|
||
wsConnect: wsConn,
|
||
inChan: make(chan []byte, 1000),
|
||
outChan: make(chan []byte, 1000),
|
||
closeChan: make(chan byte, 1),
|
||
}
|
||
// 启动读协程
|
||
go conn.readLoop()
|
||
// 启动写协程
|
||
go conn.writeLoop()
|
||
return
|
||
}
|
||
|
||
func (conn *Connection) ReadMessage() (data []byte, err error) {
|
||
|
||
select {
|
||
case data = <-conn.inChan:
|
||
case <-conn.closeChan:
|
||
err = errors.New("connection is closeed")
|
||
}
|
||
return
|
||
}
|
||
|
||
func (conn *Connection) WriteMessage(data []byte) (err error) {
|
||
|
||
select {
|
||
case conn.outChan <- data:
|
||
case <-conn.closeChan:
|
||
err = errors.New("connection is closeed")
|
||
}
|
||
return
|
||
}
|
||
|
||
func (conn *Connection) Close() {
|
||
// 线程安全,可多次调用
|
||
conn.wsConnect.Close()
|
||
// 利用标记,让closeChan只关闭一次
|
||
conn.mutex.Lock()
|
||
if !conn.isClosed {
|
||
close(conn.closeChan)
|
||
conn.isClosed = true
|
||
}
|
||
conn.mutex.Unlock()
|
||
}
|
||
|
||
// 内部实现
|
||
func (conn *Connection) readLoop() {
|
||
var (
|
||
data []byte
|
||
err error
|
||
)
|
||
for {
|
||
if _, data, err = conn.wsConnect.ReadMessage(); err != nil {
|
||
goto ERR
|
||
}
|
||
//阻塞在这里,等待inChan有空闲位置
|
||
select {
|
||
case conn.inChan <- data:
|
||
case <-conn.closeChan: // closeChan 感知 conn断开
|
||
goto ERR
|
||
}
|
||
|
||
}
|
||
|
||
ERR:
|
||
conn.Close()
|
||
}
|
||
|
||
func (conn *Connection) writeLoop() {
|
||
var (
|
||
data []byte
|
||
err error
|
||
)
|
||
|
||
for {
|
||
select {
|
||
case data = <-conn.outChan:
|
||
case <-conn.closeChan:
|
||
goto ERR
|
||
}
|
||
if err = conn.wsConnect.WriteMessage(websocket.TextMessage, data); err != nil {
|
||
goto ERR
|
||
}
|
||
}
|
||
|
||
ERR:
|
||
conn.Close()
|
||
|
||
}
|