From ba4afb4e3079abbeaa2f2c0a6694df81918f9727 Mon Sep 17 00:00:00 2001 From: boyce Date: Fri, 12 Apr 2019 15:25:00 +0800 Subject: [PATCH] =?UTF-8?q?websocket=20client=20=E9=87=8D=E8=BF=9E?= =?UTF-8?q?=E7=BC=93=E5=86=B2=E5=8C=BA=E6=95=B0=E6=8D=AE=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E6=B8=85=E7=A9=BAbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/websocketclient.go | 42 ++++++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/network/websocketclient.go b/network/websocketclient.go index 90bae53..fa687a0 100644 --- a/network/websocketclient.go +++ b/network/websocketclient.go @@ -1,6 +1,7 @@ package network import ( + "errors" "fmt" "net/http" "net/url" @@ -32,6 +33,7 @@ type WebsocketClient struct { url string state int //0未连接状态 1正在重连 2连接状态 bwritemsg chan []byte + closer chan bool slf IWebsocketClient timeoutsec time.Duration @@ -39,6 +41,10 @@ type WebsocketClient struct { ping string } +const ( + MAX_WRITE_MSG = 10240 +) + //Init ... func (ws *WebsocketClient) Init(slf IWebsocketClient, strurl, strProxyPath string, timeoutsec time.Duration) error { @@ -65,7 +71,6 @@ func (ws *WebsocketClient) Init(slf IWebsocketClient, strurl, strProxyPath strin } ws.url = strurl - ws.bwritemsg = make(chan []byte, 1000) ws.ping = `ping` return nil } @@ -94,8 +99,9 @@ func (ws *WebsocketClient) OnRun() error { time.Sleep(2 * time.Second) ws.StartConnect() } else if ws.state == 1 { - ws.conn.Close() ws.state = 0 + close(ws.closer) + ws.conn.Close() ws.slf.OnDisconnect() } else if ws.state == 2 { ws.conn.SetReadDeadline(time.Now().Add(ws.timeoutsec * time.Second)) @@ -105,6 +111,7 @@ func (ws *WebsocketClient) OnRun() error { service.GetLogger().Printf(service.LEVER_WARN, "websocket client is disconnect [%s],information is %v", ws.url, err) ws.conn.Close() ws.state = 0 + close(ws.closer) ws.slf.OnDisconnect() continue } @@ -125,7 +132,8 @@ func (ws *WebsocketClient) StartConnect() error { if err != nil { return err } - + ws.closer = make(chan bool) + ws.bwritemsg = make(chan []byte, MAX_WRITE_MSG) ws.state = 2 ws.slf.OnConnected() @@ -167,6 +175,11 @@ func (ws *WebsocketClient) writeMsg() error { continue } select { + case _, ok := <-ws.closer: + if ok == false { + break + } + case <-timerC: if ws.state == 2 { err := ws.WriteMessage([]byte(ws.ping)) @@ -177,8 +190,8 @@ func (ws *WebsocketClient) writeMsg() error { ws.slf.OnDisconnect() } } - case msg := <-ws.bwritemsg: - if ws.state == 2 { + case msg, ok := <-ws.bwritemsg: + if ok == true && ws.state == 2 { ws.conn.SetWriteDeadline(time.Now().Add(ws.timeoutsec * time.Second)) err := ws.conn.WriteMessage(websocket.TextMessage, msg) if err != nil { @@ -201,7 +214,24 @@ func (ws *WebsocketClient) ReConnect() { //WriteMessage ... func (ws *WebsocketClient) WriteMessage(msg []byte) error { - ws.bwritemsg <- msg + if ws.closer == nil || ws.bwritemsg == nil { + service.GetLogger().Printf(service.LEVER_WARN, "WriteMessage data fail,websocket client is disconnect.") + return errors.New("riteMessage data fail,websocket client is disconnect.") + } + select { + case <-ws.closer: + service.GetLogger().Printf(service.LEVER_WARN, "WriteMessage data fail,websocket client is disconnect.") + return errors.New("riteMessage data fail,websocket client is disconnect.") + default: + if len(ws.bwritemsg) < MAX_WRITE_MSG { + ws.bwritemsg <- msg + } else { + service.GetLogger().Printf(service.LEVER_ERROR, "WriteMessage data fail,bwriteMsg is overload.") + return errors.New("WriteMessage data fail,bwriteMsg is overload.") + } + + } + return nil }