From 3f45b19babb69cc79fa43d32822289b8affe8c7c Mon Sep 17 00:00:00 2001 From: boyce Date: Fri, 26 Apr 2024 10:45:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96wsservice=E4=B8=8Etcpservice?= =?UTF-8?q?=E7=9A=84clientid?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/processor/jsonprocessor.go | 20 ++++---- network/processor/pbprocessor.go | 20 ++++---- network/processor/processor.go | 12 ++--- sysservice/tcpservice/tcpservice.go | 72 ++++++++--------------------- sysservice/wsservice/wsservice.go | 63 +++++++------------------ 5 files changed, 62 insertions(+), 125 deletions(-) diff --git a/network/processor/jsonprocessor.go b/network/processor/jsonprocessor.go index df571ff..2b4f7a3 100644 --- a/network/processor/jsonprocessor.go +++ b/network/processor/jsonprocessor.go @@ -13,9 +13,9 @@ type MessageJsonInfo struct { msgHandler MessageJsonHandler } -type MessageJsonHandler func(clientId uint64,msg interface{}) -type ConnectJsonHandler func(clientId uint64) -type UnknownMessageJsonHandler func(clientId uint64,msg []byte) +type MessageJsonHandler func(clientId string,msg interface{}) +type ConnectJsonHandler func(clientId string) +type UnknownMessageJsonHandler func(clientId string,msg []byte) type JsonProcessor struct { mapMsg map[uint16]MessageJsonInfo @@ -45,7 +45,7 @@ func (jsonProcessor *JsonProcessor) SetByteOrder(littleEndian bool) { } // must goroutine safe -func (jsonProcessor *JsonProcessor ) MsgRoute(clientId uint64,msg interface{}) error{ +func (jsonProcessor *JsonProcessor ) MsgRoute(clientId string,msg interface{}) error{ pPackInfo := msg.(*JsonPackInfo) v,ok := jsonProcessor.mapMsg[pPackInfo.typ] if ok == false { @@ -56,7 +56,7 @@ func (jsonProcessor *JsonProcessor ) MsgRoute(clientId uint64,msg interface{}) e return nil } -func (jsonProcessor *JsonProcessor) Unmarshal(clientId uint64,data []byte) (interface{}, error) { +func (jsonProcessor *JsonProcessor) Unmarshal(clientId string,data []byte) (interface{}, error) { typeStruct := struct {Type int `json:"typ"`}{} defer jsonProcessor.ReleaseBytes(data) err := json.Unmarshal(data, &typeStruct) @@ -79,7 +79,7 @@ func (jsonProcessor *JsonProcessor) Unmarshal(clientId uint64,data []byte) (inte return &JsonPackInfo{typ:msgType,msg:msgData},nil } -func (jsonProcessor *JsonProcessor) Marshal(clientId uint64,msg interface{}) ([]byte, error) { +func (jsonProcessor *JsonProcessor) Marshal(clientId string,msg interface{}) ([]byte, error) { rawMsg,err := json.Marshal(msg) if err != nil { return nil,err @@ -104,9 +104,9 @@ func (jsonProcessor *JsonProcessor) MakeRawMsg(msgType uint16,msg []byte) *JsonP return &JsonPackInfo{typ:msgType,rawMsg:msg} } -func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){ +func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId string,msg interface{}){ if jsonProcessor.unknownMessageHandler==nil { - log.Debug("Unknown message",log.Uint64("clientId",clientId)) + log.Debug("Unknown message",log.String("clientId",clientId)) return } @@ -114,13 +114,13 @@ func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId uint64,msg interfac } -func (jsonProcessor *JsonProcessor) ConnectedRoute(clientId uint64){ +func (jsonProcessor *JsonProcessor) ConnectedRoute(clientId string){ if jsonProcessor.connectHandler != nil { jsonProcessor.connectHandler(clientId) } } -func (jsonProcessor *JsonProcessor) DisConnectedRoute(clientId uint64){ +func (jsonProcessor *JsonProcessor) DisConnectedRoute(clientId string){ if jsonProcessor.disconnectHandler != nil { jsonProcessor.disconnectHandler(clientId) } diff --git a/network/processor/pbprocessor.go b/network/processor/pbprocessor.go index 07afc79..44bc439 100644 --- a/network/processor/pbprocessor.go +++ b/network/processor/pbprocessor.go @@ -13,9 +13,9 @@ type MessageInfo struct { msgHandler MessageHandler } -type MessageHandler func(clientId uint64, msg proto.Message) -type ConnectHandler func(clientId uint64) -type UnknownMessageHandler func(clientId uint64, msg []byte) +type MessageHandler func(clientId string, msg proto.Message) +type ConnectHandler func(clientId string) +type UnknownMessageHandler func(clientId string, msg []byte) const MsgTypeSize = 2 @@ -54,7 +54,7 @@ func (slf *PBPackInfo) GetMsg() proto.Message { } // must goroutine safe -func (pbProcessor *PBProcessor) MsgRoute(clientId uint64, msg interface{}) error { +func (pbProcessor *PBProcessor) MsgRoute(clientId string, msg interface{}) error { pPackInfo := msg.(*PBPackInfo) v, ok := pbProcessor.mapMsg[pPackInfo.typ] if ok == false { @@ -66,13 +66,13 @@ func (pbProcessor *PBProcessor) MsgRoute(clientId uint64, msg interface{}) error } // must goroutine safe -func (pbProcessor *PBProcessor) Unmarshal(clientId uint64, data []byte) (interface{}, error) { +func (pbProcessor *PBProcessor) Unmarshal(clientId string, data []byte) (interface{}, error) { defer pbProcessor.ReleaseBytes(data) return pbProcessor.UnmarshalWithOutRelease(clientId, data) } // unmarshal but not release data -func (pbProcessor *PBProcessor) UnmarshalWithOutRelease(clientId uint64, data []byte) (interface{}, error) { +func (pbProcessor *PBProcessor) UnmarshalWithOutRelease(clientId string, data []byte) (interface{}, error) { var msgType uint16 if pbProcessor.LittleEndian == true { msgType = binary.LittleEndian.Uint16(data[:2]) @@ -95,7 +95,7 @@ func (pbProcessor *PBProcessor) UnmarshalWithOutRelease(clientId uint64, data [] } // must goroutine safe -func (pbProcessor *PBProcessor) Marshal(clientId uint64, msg interface{}) ([]byte, error) { +func (pbProcessor *PBProcessor) Marshal(clientId string, msg interface{}) ([]byte, error) { pMsg := msg.(*PBPackInfo) var err error @@ -133,16 +133,16 @@ func (pbProcessor *PBProcessor) MakeRawMsg(msgType uint16, msg []byte) *PBPackIn return &PBPackInfo{typ: msgType, rawMsg: msg} } -func (pbProcessor *PBProcessor) UnknownMsgRoute(clientId uint64, msg interface{}) { +func (pbProcessor *PBProcessor) UnknownMsgRoute(clientId string, msg interface{}) { pbProcessor.unknownMessageHandler(clientId, msg.([]byte)) } // connect event -func (pbProcessor *PBProcessor) ConnectedRoute(clientId uint64) { +func (pbProcessor *PBProcessor) ConnectedRoute(clientId string) { pbProcessor.connectHandler(clientId) } -func (pbProcessor *PBProcessor) DisConnectedRoute(clientId uint64) { +func (pbProcessor *PBProcessor) DisConnectedRoute(clientId string) { pbProcessor.disconnectHandler(clientId) } diff --git a/network/processor/processor.go b/network/processor/processor.go index 19806f8..b28279a 100644 --- a/network/processor/processor.go +++ b/network/processor/processor.go @@ -3,17 +3,17 @@ package processor type IProcessor interface { // must goroutine safe - MsgRoute(clientId uint64,msg interface{}) error + MsgRoute(clientId string,msg interface{}) error //must goroutine safe - UnknownMsgRoute(clientId uint64,msg interface{}) + UnknownMsgRoute(clientId string,msg interface{}) // connect event - ConnectedRoute(clientId uint64) - DisConnectedRoute(clientId uint64) + ConnectedRoute(clientId string) + DisConnectedRoute(clientId string) // must goroutine safe - Unmarshal(clientId uint64,data []byte) (interface{}, error) + Unmarshal(clientId string,data []byte) (interface{}, error) // must goroutine safe - Marshal(clientId uint64,msg interface{}) ([]byte, error) + Marshal(clientId string,msg interface{}) ([]byte, error) } type IRawProcessor interface { diff --git a/sysservice/tcpservice/tcpservice.go b/sysservice/tcpservice/tcpservice.go index 9209ebc..f998165 100644 --- a/sysservice/tcpservice/tcpservice.go +++ b/sysservice/tcpservice/tcpservice.go @@ -10,8 +10,9 @@ import ( "github.com/duanhf2012/origin/v2/util/bytespool" "runtime" "sync" - "sync/atomic" + "github.com/google/uuid" "time" + "strings" ) type TcpService struct { @@ -19,9 +20,8 @@ type TcpService struct { service.Service mapClientLocker sync.RWMutex - mapClient map[uint64] *Client + mapClient map[string] *Client process processor.IProcessor - machineId uint16 } type TcpPackType int8 @@ -32,32 +32,18 @@ const( TPT_UnknownPack TcpPackType = 3 ) -const ( - MaxMachineId = 1<<14 - 1 //最大值 16383 - MaxSeed = 1<<19 - 1 //最大值 524287 - MaxTime = 1<<31 - 1 //最大值 2147483647 -) - -var seed uint32 - type TcpPack struct { Type TcpPackType //0表示连接 1表示断开 2表示数据 - ClientId uint64 + ClientId string Data interface{} } type Client struct { - id uint64 + id string tcpConn *network.TCPConn tcpService *TcpService } -func (tcpService *TcpService) genId() uint64 { - newSeed := atomic.AddUint32(&seed,1) % MaxSeed - nowTime := uint64(time.Now().Unix())%MaxTime - return (uint64(tcpService.machineId)<<50)|(nowTime<<19)|uint64(newSeed) -} - func (tcpService *TcpService) OnInit() error{ iConfig := tcpService.GetServiceCfg() if iConfig == nil { @@ -74,17 +60,7 @@ func (tcpService *TcpService) OnInit() error{ if ok == true { tcpService.tcpServer.MaxConnNum = int(MaxConnNum.(float64)) } - - MachineId,ok := tcpCfg["MachineId"] - if ok == true { - tcpService.machineId = uint16(MachineId.(float64)) - if tcpService.machineId > MaxMachineId { - return fmt.Errorf("MachineId is error!") - } - }else { - return fmt.Errorf("MachineId is error!") - } - + PendingWriteNum,ok := tcpCfg["PendingWriteNum"] if ok == true { tcpService.tcpServer.PendingWriteNum = int(PendingWriteNum.(float64)) @@ -116,7 +92,7 @@ func (tcpService *TcpService) OnInit() error{ tcpService.tcpServer.WriteDeadline = time.Second*time.Duration(writeDeadline.(float64)) } - tcpService.mapClient = make( map[uint64] *Client, tcpService.tcpServer.MaxConnNum) + tcpService.mapClient = make( map[string] *Client, tcpService.tcpServer.MaxConnNum) tcpService.tcpServer.NewAgent = tcpService.NewClient tcpService.tcpServer.Start() @@ -146,24 +122,16 @@ func (tcpService *TcpService) NewClient(conn *network.TCPConn) network.Agent { tcpService.mapClientLocker.Lock() defer tcpService.mapClientLocker.Unlock() - for { - clientId := tcpService.genId() - _,ok := tcpService.mapClient[clientId] - if ok == true { - continue - } + uuId,_ := uuid.NewUUID() + clientId := strings.ReplaceAll(uuId.String(), "-", "") + pClient := &Client{tcpConn: conn, id: clientId} + pClient.tcpService = tcpService + tcpService.mapClient[clientId] = pClient - pClient := &Client{tcpConn:conn, id:clientId} - pClient.tcpService = tcpService - tcpService.mapClient[clientId] = pClient - - return pClient - } - - return nil + return pClient } -func (slf *Client) GetId() uint64 { +func (slf *Client) GetId() string { return slf.id } @@ -186,7 +154,7 @@ func (slf *Client) Run() { slf.tcpConn.SetReadDeadline(slf.tcpService.tcpServer.ReadDeadline) bytes,err := slf.tcpConn.ReadMsg() if err != nil { - log.Debug("read client failed",log.ErrorAttr("error",err),log.Uint64("clientId",slf.id)) + log.Debug("read client failed",log.ErrorAttr("error",err),log.String("clientId",slf.id)) break } data,err:=slf.tcpService.process.Unmarshal(slf.id,bytes) @@ -206,7 +174,7 @@ func (slf *Client) OnClose(){ delete (slf.tcpService.mapClient,slf.GetId()) } -func (tcpService *TcpService) SendMsg(clientId uint64,msg interface{}) error{ +func (tcpService *TcpService) SendMsg(clientId string,msg interface{}) error{ tcpService.mapClientLocker.Lock() client,ok := tcpService.mapClient[clientId] if ok == false{ @@ -222,7 +190,7 @@ func (tcpService *TcpService) SendMsg(clientId uint64,msg interface{}) error{ return client.tcpConn.WriteMsg(bytes) } -func (tcpService *TcpService) Close(clientId uint64) { +func (tcpService *TcpService) Close(clientId string) { tcpService.mapClientLocker.Lock() defer tcpService.mapClientLocker.Unlock() @@ -238,7 +206,7 @@ func (tcpService *TcpService) Close(clientId uint64) { return } -func (tcpService *TcpService) GetClientIp(clientid uint64) string{ +func (tcpService *TcpService) GetClientIp(clientid string) string{ tcpService.mapClientLocker.Lock() defer tcpService.mapClientLocker.Unlock() pClient,ok := tcpService.mapClient[clientid] @@ -250,7 +218,7 @@ func (tcpService *TcpService) GetClientIp(clientid uint64) string{ } -func (tcpService *TcpService) SendRawMsg(clientId uint64,msg []byte) error{ +func (tcpService *TcpService) SendRawMsg(clientId string,msg []byte) error{ tcpService.mapClientLocker.Lock() client,ok := tcpService.mapClient[clientId] if ok == false{ @@ -261,7 +229,7 @@ func (tcpService *TcpService) SendRawMsg(clientId uint64,msg []byte) error{ return client.tcpConn.WriteMsg(msg) } -func (tcpService *TcpService) SendRawData(clientId uint64,data []byte) error{ +func (tcpService *TcpService) SendRawData(clientId string,data []byte) error{ tcpService.mapClientLocker.Lock() client,ok := tcpService.mapClient[clientId] if ok == false{ diff --git a/sysservice/wsservice/wsservice.go b/sysservice/wsservice/wsservice.go index 0f3a0be..bb52b3f 100644 --- a/sysservice/wsservice/wsservice.go +++ b/sysservice/wsservice/wsservice.go @@ -8,24 +8,19 @@ import ( "github.com/duanhf2012/origin/v2/network/processor" "github.com/duanhf2012/origin/v2/service" "sync" - "sync/atomic" - "time" + "github.com/google/uuid" + "strings" ) - - type WSService struct { service.Service wsServer network.WSServer mapClientLocker sync.RWMutex - mapClient map[uint64] *WSClient + mapClient map[string] *WSClient process processor.IProcessor - machineId uint16 } -var seed uint32 - type WSPackType int8 const( WPT_Connected WSPackType = 0 @@ -38,14 +33,8 @@ const Default_WS_MaxConnNum = 3000 const Default_WS_PendingWriteNum = 10000 const Default_WS_MaxMsgLen = 65535 -const ( - MaxMachineId = 1<<14 - 1 //最大值 16383 - MaxSeed = 1<<19 - 1 //最大值 524287 - MaxTime = 1<<31 - 1 //最大值 2147483647 -) - type WSClient struct { - id uint64 + id string wsConn *network.WSConn wsService *WSService } @@ -53,7 +42,7 @@ type WSClient struct { type WSPack struct { Type WSPackType //0表示连接 1表示断开 2表示数据 MsgProcessor processor.IProcessor - ClientId uint64 + ClientId string Data interface{} } @@ -77,15 +66,7 @@ func (ws *WSService) OnInit() error{ if ok == true { ws.wsServer.MaxConnNum = int(MaxConnNum.(float64)) } - MachineId,ok := wsCfg["MachineId"] - if ok == true { - ws.machineId = uint16(MachineId.(float64)) - if ws.machineId > MaxMachineId { - return fmt.Errorf("MachineId is error!") - } - }else { - return fmt.Errorf("MachineId is error!") - } + PendingWriteNum,ok := wsCfg["PendingWriteNum"] if ok == true { ws.wsServer.PendingWriteNum = int(PendingWriteNum.(float64)) @@ -96,7 +77,7 @@ func (ws *WSService) OnInit() error{ ws.wsServer.MaxMsgLen = uint32(MaxMsgLen.(float64)) } - ws.mapClient = make( map[uint64] *WSClient, ws.wsServer.MaxConnNum) + ws.mapClient = make( map[string] *WSClient, ws.wsServer.MaxConnNum) ws.wsServer.NewAgent = ws.NewWSClient ws.wsServer.Start() return nil @@ -125,33 +106,21 @@ func (ws *WSService) SetProcessor(process processor.IProcessor,handler event.IEv ws.RegEventReceiverFunc(event.Sys_Event_WebSocket,handler, ws.WSEventHandler) } -func (ws *WSService) genId() uint64 { - newSeed := atomic.AddUint32(&seed,1) % MaxSeed - nowTime := uint64(time.Now().Unix())%MaxTime - return (uint64(ws.machineId)<<50)|(nowTime<<19)|uint64(newSeed) -} - func (ws *WSService) NewWSClient(conn *network.WSConn) network.Agent { ws.mapClientLocker.Lock() defer ws.mapClientLocker.Unlock() - for { - clientId := ws.genId() - _,ok := ws.mapClient[clientId] - if ok == true { - continue - } - - pClient := &WSClient{wsConn:conn, id: clientId} - pClient.wsService = ws - ws.mapClient[clientId] = pClient - return pClient - } + uuId, _ := uuid.NewUUID() + clientId := strings.ReplaceAll(uuId.String(), "-", "") + pClient := &WSClient{wsConn: conn, id: clientId} + pClient.wsService = ws + ws.mapClient[clientId] = pClient + return pClient return nil } -func (slf *WSClient) GetId() uint64 { +func (slf *WSClient) GetId() string { return slf.id } @@ -179,7 +148,7 @@ func (slf *WSClient) OnClose(){ delete (slf.wsService.mapClient,slf.GetId()) } -func (ws *WSService) SendMsg(clientid uint64,msg interface{}) error{ +func (ws *WSService) SendMsg(clientid string,msg interface{}) error{ ws.mapClientLocker.Lock() client,ok := ws.mapClient[clientid] if ok == false{ @@ -195,7 +164,7 @@ func (ws *WSService) SendMsg(clientid uint64,msg interface{}) error{ return client.wsConn.WriteMsg(bytes) } -func (ws *WSService) Close(clientid uint64) { +func (ws *WSService) Close(clientid string) { ws.mapClientLocker.Lock() defer ws.mapClientLocker.Unlock()