新增网络模块

This commit is contained in:
boyce
2019-01-26 13:56:26 +08:00
parent 7e17a99f43
commit 674ecae76e
3 changed files with 2 additions and 184 deletions

View File

@@ -1,182 +0,0 @@
package netbase
import (
"errors"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
type Connection struct {
wsConnect *websocket.Conn
inChan chan []byte
outChan chan []byte
closeChan chan byte
mutex sync.Mutex // 对closeChan关闭上锁
isClosed bool // 防止closeChan被关闭多次
}
type CWebSocketServer struct {
Connection
bindurl string
}
func NewWebSocketServer(bindurl string) *CWebSocketServer {
wss := new(CWebSocketServer)
wss.bindurl = bindurl
return wss
}
func (wss *CWebSocketServer) Start() {
http.HandleFunc("/ws", wsHandler)
go http.ListenAndServe(wss.bindurl, nil)
}
var (
upgrader = websocket.Upgrader{
// 允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)
func wsHandler(w http.ResponseWriter, r *http.Request) {
// w.Write([]byte("hello"))
var (
wsConn *websocket.Conn
err error
conn *Connection
data []byte
)
// 完成ws协议的握手操作
// Upgrade:websocket
if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
return
}
if conn, err = InitConnection(wsConn); err != nil {
goto ERR
}
// 启动线程,不断发消息
go func() {
var (
err error
)
for {
if err = conn.WriteMessage([]byte("heartbeat")); err != nil {
return
}
time.Sleep(1 * time.Second)
}
}()
for {
if data, err = conn.ReadMessage(); err != nil {
goto ERR
}
if err = conn.WriteMessage(data); err != nil {
goto ERR
}
}
ERR:
conn.Close()
}
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
conn = &Connection{
wsConnect: wsConn,
inChan: make(chan []byte, 1000),
outChan: make(chan []byte, 1000),
closeChan: make(chan byte, 1),
}
// 启动读协程
go conn.readLoop()
// 启动写协程
go conn.writeLoop()
return
}
func (conn *Connection) ReadMessage() (data []byte, err error) {
select {
case data = <-conn.inChan:
case <-conn.closeChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection) WriteMessage(data []byte) (err error) {
select {
case conn.outChan <- data:
case <-conn.closeChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection) Close() {
// 线程安全,可多次调用
conn.wsConnect.Close()
// 利用标记让closeChan只关闭一次
conn.mutex.Lock()
if !conn.isClosed {
close(conn.closeChan)
conn.isClosed = true
}
conn.mutex.Unlock()
}
// 内部实现
func (conn *Connection) readLoop() {
var (
data []byte
err error
)
for {
if _, data, err = conn.wsConnect.ReadMessage(); err != nil {
goto ERR
}
//阻塞在这里等待inChan有空闲位置
select {
case conn.inChan <- data:
case <-conn.closeChan: // closeChan 感知 conn断开
goto ERR
}
}
ERR:
conn.Close()
}
func (conn *Connection) writeLoop() {
var (
data []byte
err error
)
for {
select {
case data = <-conn.outChan:
case <-conn.closeChan:
goto ERR
}
if err = conn.wsConnect.WriteMessage(websocket.TextMessage, data); err != nil {
goto ERR
}
}
ERR:
conn.Close()
}

View File

@@ -1,4 +1,4 @@
package helper package network
import ( import (
"fmt" "fmt"

View File

@@ -1,4 +1,4 @@
package helper package network
import ( import (
"fmt" "fmt"