diff --git a/network/websocketserver.go b/network/websocketserver.go index ffb8113..114b31b 100644 --- a/network/websocketserver.go +++ b/network/websocketserver.go @@ -1,9 +1,11 @@ package network import ( + "errors" "fmt" "net/http" "os" + "runtime/debug" "sync" "time" @@ -90,6 +92,8 @@ func (slf *WebsocketServer) ReleaseClient(pclient *WSClient) { slf.locker.Lock() delete(slf.mapClient, pclient.clientid) slf.locker.Unlock() + //关闭写管道 + close(pclient.bwritemsg) } func (slf *WebsocketServer) SetupReciver(pattern string, messageReciver IMessageReceiver, bEnableCompression bool) { @@ -127,8 +131,16 @@ func (slf *WebsocketServer) startListen() { func (slf *WSClient) startSendMsg() { for { - msgbuf := <-slf.bwritemsg - slf.conn.WriteMessage(msgbuf.msgtype, msgbuf.bwritemsg) + msgbuf, ok := <-slf.bwritemsg + if ok == false { + break + } + + err := slf.conn.WriteMessage(msgbuf.msgtype, msgbuf.bwritemsg) + if err != nil { + service.GetLogger().Printf(sysmodule.LEVER_INFO, "write client id %d is error :%v\n", slf.clientid, err) + break + } } } @@ -146,6 +158,7 @@ func (slf *WebsocketServer) SendMsg(clientid uint64, messageType int, msg []byte } value.bwritemsg <- WSMessage{messageType, msg} + return true } @@ -164,6 +177,23 @@ func (slf *WebsocketServer) Stop() { } func (slf *BaseMessageReciver) startReadMsg(pclient *WSClient) { + defer func() { + if r := recover(); r != nil { + var coreInfo string + str, ok := r.(string) + if ok { + coreInfo = string(debug.Stack()) + } else { + coreInfo = "Panic!" + } + + coreInfo += "\n" + fmt.Sprintf("core information is %s\n", str) + service.GetLogger().Printf(service.LEVER_FATAL, coreInfo) + slf.messageReciver.OnDisconnect(pclient.clientid, errors.New("core dump")) + slf.WsServer.ReleaseClient(pclient) + } + }() + for { msgtype, message, err := pclient.conn.ReadMessage() if err != nil {