From 776b23402208c79f4bab48176d12bc4b6e009517 Mon Sep 17 00:00:00 2001 From: orgin Date: Thu, 2 Jun 2022 16:09:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BD=91=E7=BB=9C=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5Id=E7=94=9F=E6=88=90=E8=A7=84=E5=88=99&=E4=BC=98?= =?UTF-8?q?=E5=8C=96WebSocket=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/processor/jsonprocessor.go | 15 +++++++-- network/ws_client.go | 8 +++-- network/ws_conn.go | 4 +-- network/ws_server.go | 16 +++++++++- sysservice/tcpservice/tcpservice.go | 17 +++++------ sysservice/wsservice/wsservice.go | 47 +++++++++++++++++++++++------ 6 files changed, 82 insertions(+), 25 deletions(-) diff --git a/network/processor/jsonprocessor.go b/network/processor/jsonprocessor.go index 7d32912..661a6ab 100644 --- a/network/processor/jsonprocessor.go +++ b/network/processor/jsonprocessor.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/duanhf2012/origin/network" "reflect" + "github.com/duanhf2012/origin/log" ) type MessageJsonInfo struct { @@ -104,15 +105,25 @@ func (jsonProcessor *JsonProcessor) MakeRawMsg(msgType uint16,msg []byte) *JsonP } func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){ + if jsonProcessor.unknownMessageHandler==nil { + log.SDebug("Unknown message received from ",clientId) + return + } + jsonProcessor.unknownMessageHandler(clientId,msg.([]byte)) + } func (jsonProcessor *JsonProcessor) ConnectedRoute(clientId uint64){ - jsonProcessor.connectHandler(clientId) + if jsonProcessor.connectHandler != nil { + jsonProcessor.connectHandler(clientId) + } } func (jsonProcessor *JsonProcessor) DisConnectedRoute(clientId uint64){ - jsonProcessor.disconnectHandler(clientId) + if jsonProcessor.disconnectHandler != nil { + jsonProcessor.disconnectHandler(clientId) + } } func (jsonProcessor *JsonProcessor) RegisterUnknownMsg(unknownMessageHandler UnknownMessageJsonHandler){ diff --git a/network/ws_client.go b/network/ws_client.go index 6c06b6b..7e469d2 100644 --- a/network/ws_client.go +++ b/network/ws_client.go @@ -21,6 +21,7 @@ type WSClient struct { cons WebsocketConnSet wg sync.WaitGroup closeFlag bool + messageType int } func (client *WSClient) Start() { @@ -62,7 +63,7 @@ func (client *WSClient) init() { if client.cons != nil { log.SFatal("client is running") } - + client.messageType = websocket.TextMessage client.cons = make(WebsocketConnSet) client.closeFlag = false client.dialer = websocket.Dialer{ @@ -83,6 +84,9 @@ func (client *WSClient) dial() *websocket.Conn { } } +func (client *WSClient) SetMessageType(messageType int){ + client.messageType = messageType +} func (client *WSClient) connect() { defer client.wg.Done() @@ -102,7 +106,7 @@ reconnect: client.cons[conn] = struct{}{} client.Unlock() - wsConn := newWSConn(conn, client.PendingWriteNum, client.MaxMsgLen) + wsConn := newWSConn(conn, client.PendingWriteNum, client.MaxMsgLen,client.messageType) agent := client.NewAgent(wsConn) agent.Run() diff --git a/network/ws_conn.go b/network/ws_conn.go index 6c7f2f1..2c4fc29 100644 --- a/network/ws_conn.go +++ b/network/ws_conn.go @@ -18,7 +18,7 @@ type WSConn struct { closeFlag bool } -func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32) *WSConn { +func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32,messageType int) *WSConn { wsConn := new(WSConn) wsConn.conn = conn wsConn.writeChan = make(chan []byte, pendingWriteNum) @@ -30,7 +30,7 @@ func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32) *WSC break } - err := conn.WriteMessage(websocket.BinaryMessage, b) + err := conn.WriteMessage(messageType, b) if err != nil { break } diff --git a/network/ws_server.go b/network/ws_server.go index 8546f01..170a62f 100644 --- a/network/ws_server.go +++ b/network/ws_server.go @@ -21,6 +21,7 @@ type WSServer struct { NewAgent func(*WSConn) Agent ln net.Listener handler *WSHandler + messageType int } type WSHandler struct { @@ -32,6 +33,11 @@ type WSHandler struct { conns WebsocketConnSet mutexConns sync.Mutex wg sync.WaitGroup + messageType int +} + +func (handler *WSHandler) SetMessageType(messageType int){ + handler.messageType = messageType } func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -45,6 +51,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } conn.SetReadLimit(int64(handler.maxMsgLen)) + handler.messageType = websocket.TextMessage handler.wg.Add(1) defer handler.wg.Done() @@ -64,7 +71,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { handler.conns[conn] = struct{}{} handler.mutexConns.Unlock() - wsConn := newWSConn(conn, handler.pendingWriteNum, handler.maxMsgLen) + wsConn := newWSConn(conn, handler.pendingWriteNum, handler.maxMsgLen,handler.messageType) agent := handler.newAgent(wsConn) agent.Run() @@ -76,6 +83,13 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { agent.OnClose() } +func (server *WSServer) SetMessageType(messageType int){ + server.messageType = messageType + if server.handler!= nil { + server.handler.SetMessageType(messageType) + } +} + func (server *WSServer) Start() { ln, err := net.Listen("tcp", server.Addr) if err != nil { diff --git a/sysservice/tcpservice/tcpservice.go b/sysservice/tcpservice/tcpservice.go index 7732440..dcc64dd 100644 --- a/sysservice/tcpservice/tcpservice.go +++ b/sysservice/tcpservice/tcpservice.go @@ -8,6 +8,7 @@ import ( "github.com/duanhf2012/origin/network/processor" "github.com/duanhf2012/origin/node" "github.com/duanhf2012/origin/service" + "sync/atomic" "sync" "time" "runtime" @@ -42,12 +43,12 @@ const Default_ReadDeadline = 180 //30s const Default_WriteDeadline = 180 //30s const ( - MaxNodeId = 1<<10 - 1 //Uint10 - MaxSeed = 1<<22 - 1 //MaxUint24 + MaxNodeId = 1<<14 - 1 //最大值 16383 + MaxSeed = 1<<19 - 1 //最大值 524287 + MaxTime = 1<<31 - 1 //最大值 2147483647 ) var seed uint32 -var seedLocker sync.Mutex type TcpPack struct { Type TcpPackType //0表示连接 1表示断开 2表示数据 @@ -66,14 +67,12 @@ func (tcpService *TcpService) genId() uint64 { panic("nodeId exceeds the maximum!") } - seedLocker.Lock() - seed = (seed+1)%MaxSeed - seedLocker.Unlock() - - nowTime := uint64(time.Now().Second()) - return (uint64(node.GetNodeId())<<54)|(nowTime<<22)|uint64(seed) + newSeed := atomic.AddUint32(&seed,1) % MaxSeed + nowTime := uint64(time.Now().Unix())%MaxTime + return (uint64(node.GetNodeId())<<50)|(nowTime<<19)|uint64(newSeed) } + func GetNodeId(agentId uint64) int { return int(agentId>>54) } diff --git a/sysservice/wsservice/wsservice.go b/sysservice/wsservice/wsservice.go index b9d8d9f..6cb42dc 100644 --- a/sysservice/wsservice/wsservice.go +++ b/sysservice/wsservice/wsservice.go @@ -7,19 +7,27 @@ import ( "github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/network/processor" "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/node" "sync" + "sync/atomic" + "time" ) + + type WSService struct { service.Service wsServer network.WSServer mapClientLocker sync.RWMutex mapClient map[uint64] *WSClient - initClientId uint64 process processor.IProcessor + + } +var seed uint32 + type WSPackType int8 const( WPT_Connected WSPackType = 0 @@ -32,6 +40,12 @@ const Default_WS_MaxConnNum = 3000 const Default_WS_PendingWriteNum = 10000 const Default_WS_MaxMsgLen = 65535 +const ( + MaxNodeId = 1<<14 - 1 //最大值 16383 + MaxSeed = 1<<19 - 1 //最大值 524287 + MaxTime = 1<<31 - 1 //最大值 2147483647 +) + type WSClient struct { id uint64 wsConn *network.WSConn @@ -46,6 +60,7 @@ type WSPack struct { } func (ws *WSService) OnInit() error{ + iConfig := ws.GetServiceCfg() if iConfig == nil { return fmt.Errorf("%s service config is error!", ws.GetName()) @@ -80,6 +95,10 @@ func (ws *WSService) OnInit() error{ return nil } +func (ws *WSService) SetMessageType(messageType int){ + ws.wsServer.SetMessageType(messageType) +} + func (ws *WSService) WSEventHandler(ev event.IEvent) { pack := ev.(*event.Event).Data.(*WSPack) switch pack.Type { @@ -88,9 +107,9 @@ func (ws *WSService) WSEventHandler(ev event.IEvent) { case WPT_DisConnected: pack.MsgProcessor.DisConnectedRoute(pack.ClientId) case WPT_UnknownPack: - pack.MsgProcessor.UnknownMsgRoute(pack.Data,pack.ClientId) + pack.MsgProcessor.UnknownMsgRoute(pack.ClientId,pack.Data) case WPT_Pack: - pack.MsgProcessor.MsgRoute(pack.Data, pack.ClientId) + pack.MsgProcessor.MsgRoute(pack.ClientId,pack.Data) } } @@ -99,20 +118,30 @@ func (ws *WSService) SetProcessor(process processor.IProcessor,handler event.IEv ws.RegEventReceiverFunc(event.Sys_Event_WebSocket,handler, ws.WSEventHandler) } +func (ws *WSService) genId() uint64 { + if node.GetNodeId()>MaxNodeId{ + panic("nodeId exceeds the maximum!") + } + + newSeed := atomic.AddUint32(&seed,1) % MaxSeed + nowTime := uint64(time.Now().Unix())%MaxTime + return (uint64(node.GetNodeId())<<50)|(nowTime<<19)|uint64(newSeed) +} + func (ws *WSService) NewWSClient(conn *network.WSConn) network.Agent { ws.mapClientLocker.Lock() defer ws.mapClientLocker.Unlock() for { - ws.initClientId+=1 - _,ok := ws.mapClient[ws.initClientId] + clientId := ws.genId() + _,ok := ws.mapClient[clientId] if ok == true { continue } - pClient := &WSClient{wsConn:conn, id: ws.initClientId} + pClient := &WSClient{wsConn:conn, id: clientId} pClient.wsService = ws - ws.mapClient[ws.initClientId] = pClient + ws.mapClient[clientId] = pClient return pClient } @@ -131,7 +160,7 @@ func (slf *WSClient) Run() { log.Debug("read client id %d is error:%+v",slf.id,err) break } - data,err:=slf.wsService.process.Unmarshal(bytes) + data,err:=slf.wsService.process.Unmarshal(slf.id,bytes) if err != nil { slf.wsService.NotifyEvent(&event.Event{Type:event.Sys_Event_WebSocket,Data:&WSPack{ClientId:slf.id,Type:WPT_UnknownPack,Data:bytes,MsgProcessor:slf.wsService.process}}) continue @@ -156,7 +185,7 @@ func (ws *WSService) SendMsg(clientid uint64,msg interface{}) error{ } ws.mapClientLocker.Unlock() - bytes,err := ws.process.Marshal(msg) + bytes,err := ws.process.Marshal(clientid,msg) if err != nil { return err }