优化网络连接Id生成规则&优化WebSocket服务

This commit is contained in:
orgin
2022-06-02 16:09:16 +08:00
parent a4f425bd69
commit 776b234022
6 changed files with 82 additions and 25 deletions

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/duanhf2012/origin/network"
"reflect"
"github.com/duanhf2012/origin/log"
)
type MessageJsonInfo struct {
@@ -104,15 +105,25 @@ func (jsonProcessor *JsonProcessor) MakeRawMsg(msgType uint16,msg []byte) *JsonP
}
func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){
if jsonProcessor.unknownMessageHandler==nil {
log.SDebug("Unknown message received from ",clientId)
return
}
jsonProcessor.unknownMessageHandler(clientId,msg.([]byte))
}
func (jsonProcessor *JsonProcessor) ConnectedRoute(clientId uint64){
jsonProcessor.connectHandler(clientId)
if jsonProcessor.connectHandler != nil {
jsonProcessor.connectHandler(clientId)
}
}
func (jsonProcessor *JsonProcessor) DisConnectedRoute(clientId uint64){
jsonProcessor.disconnectHandler(clientId)
if jsonProcessor.disconnectHandler != nil {
jsonProcessor.disconnectHandler(clientId)
}
}
func (jsonProcessor *JsonProcessor) RegisterUnknownMsg(unknownMessageHandler UnknownMessageJsonHandler){

View File

@@ -21,6 +21,7 @@ type WSClient struct {
cons WebsocketConnSet
wg sync.WaitGroup
closeFlag bool
messageType int
}
func (client *WSClient) Start() {
@@ -62,7 +63,7 @@ func (client *WSClient) init() {
if client.cons != nil {
log.SFatal("client is running")
}
client.messageType = websocket.TextMessage
client.cons = make(WebsocketConnSet)
client.closeFlag = false
client.dialer = websocket.Dialer{
@@ -83,6 +84,9 @@ func (client *WSClient) dial() *websocket.Conn {
}
}
func (client *WSClient) SetMessageType(messageType int){
client.messageType = messageType
}
func (client *WSClient) connect() {
defer client.wg.Done()
@@ -102,7 +106,7 @@ reconnect:
client.cons[conn] = struct{}{}
client.Unlock()
wsConn := newWSConn(conn, client.PendingWriteNum, client.MaxMsgLen)
wsConn := newWSConn(conn, client.PendingWriteNum, client.MaxMsgLen,client.messageType)
agent := client.NewAgent(wsConn)
agent.Run()

View File

@@ -18,7 +18,7 @@ type WSConn struct {
closeFlag bool
}
func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32) *WSConn {
func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32,messageType int) *WSConn {
wsConn := new(WSConn)
wsConn.conn = conn
wsConn.writeChan = make(chan []byte, pendingWriteNum)
@@ -30,7 +30,7 @@ func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32) *WSC
break
}
err := conn.WriteMessage(websocket.BinaryMessage, b)
err := conn.WriteMessage(messageType, b)
if err != nil {
break
}

View File

@@ -21,6 +21,7 @@ type WSServer struct {
NewAgent func(*WSConn) Agent
ln net.Listener
handler *WSHandler
messageType int
}
type WSHandler struct {
@@ -32,6 +33,11 @@ type WSHandler struct {
conns WebsocketConnSet
mutexConns sync.Mutex
wg sync.WaitGroup
messageType int
}
func (handler *WSHandler) SetMessageType(messageType int){
handler.messageType = messageType
}
func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -45,6 +51,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
conn.SetReadLimit(int64(handler.maxMsgLen))
handler.messageType = websocket.TextMessage
handler.wg.Add(1)
defer handler.wg.Done()
@@ -64,7 +71,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
handler.conns[conn] = struct{}{}
handler.mutexConns.Unlock()
wsConn := newWSConn(conn, handler.pendingWriteNum, handler.maxMsgLen)
wsConn := newWSConn(conn, handler.pendingWriteNum, handler.maxMsgLen,handler.messageType)
agent := handler.newAgent(wsConn)
agent.Run()
@@ -76,6 +83,13 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
agent.OnClose()
}
func (server *WSServer) SetMessageType(messageType int){
server.messageType = messageType
if server.handler!= nil {
server.handler.SetMessageType(messageType)
}
}
func (server *WSServer) Start() {
ln, err := net.Listen("tcp", server.Addr)
if err != nil {

View File

@@ -8,6 +8,7 @@ import (
"github.com/duanhf2012/origin/network/processor"
"github.com/duanhf2012/origin/node"
"github.com/duanhf2012/origin/service"
"sync/atomic"
"sync"
"time"
"runtime"
@@ -42,12 +43,12 @@ const Default_ReadDeadline = 180 //30s
const Default_WriteDeadline = 180 //30s
const (
MaxNodeId = 1<<10 - 1 //Uint10
MaxSeed = 1<<22 - 1 //MaxUint24
MaxNodeId = 1<<14 - 1 //最大值 16383
MaxSeed = 1<<19 - 1 //最大值 524287
MaxTime = 1<<31 - 1 //最大值 2147483647
)
var seed uint32
var seedLocker sync.Mutex
type TcpPack struct {
Type TcpPackType //0表示连接 1表示断开 2表示数据
@@ -66,14 +67,12 @@ func (tcpService *TcpService) genId() uint64 {
panic("nodeId exceeds the maximum!")
}
seedLocker.Lock()
seed = (seed+1)%MaxSeed
seedLocker.Unlock()
nowTime := uint64(time.Now().Second())
return (uint64(node.GetNodeId())<<54)|(nowTime<<22)|uint64(seed)
newSeed := atomic.AddUint32(&seed,1) % MaxSeed
nowTime := uint64(time.Now().Unix())%MaxTime
return (uint64(node.GetNodeId())<<50)|(nowTime<<19)|uint64(newSeed)
}
func GetNodeId(agentId uint64) int {
return int(agentId>>54)
}

View File

@@ -7,19 +7,27 @@ import (
"github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/network/processor"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/node"
"sync"
"sync/atomic"
"time"
)
type WSService struct {
service.Service
wsServer network.WSServer
mapClientLocker sync.RWMutex
mapClient map[uint64] *WSClient
initClientId uint64
process processor.IProcessor
}
var seed uint32
type WSPackType int8
const(
WPT_Connected WSPackType = 0
@@ -32,6 +40,12 @@ const Default_WS_MaxConnNum = 3000
const Default_WS_PendingWriteNum = 10000
const Default_WS_MaxMsgLen = 65535
const (
MaxNodeId = 1<<14 - 1 //最大值 16383
MaxSeed = 1<<19 - 1 //最大值 524287
MaxTime = 1<<31 - 1 //最大值 2147483647
)
type WSClient struct {
id uint64
wsConn *network.WSConn
@@ -46,6 +60,7 @@ type WSPack struct {
}
func (ws *WSService) OnInit() error{
iConfig := ws.GetServiceCfg()
if iConfig == nil {
return fmt.Errorf("%s service config is error!", ws.GetName())
@@ -80,6 +95,10 @@ func (ws *WSService) OnInit() error{
return nil
}
func (ws *WSService) SetMessageType(messageType int){
ws.wsServer.SetMessageType(messageType)
}
func (ws *WSService) WSEventHandler(ev event.IEvent) {
pack := ev.(*event.Event).Data.(*WSPack)
switch pack.Type {
@@ -88,9 +107,9 @@ func (ws *WSService) WSEventHandler(ev event.IEvent) {
case WPT_DisConnected:
pack.MsgProcessor.DisConnectedRoute(pack.ClientId)
case WPT_UnknownPack:
pack.MsgProcessor.UnknownMsgRoute(pack.Data,pack.ClientId)
pack.MsgProcessor.UnknownMsgRoute(pack.ClientId,pack.Data)
case WPT_Pack:
pack.MsgProcessor.MsgRoute(pack.Data, pack.ClientId)
pack.MsgProcessor.MsgRoute(pack.ClientId,pack.Data)
}
}
@@ -99,20 +118,30 @@ func (ws *WSService) SetProcessor(process processor.IProcessor,handler event.IEv
ws.RegEventReceiverFunc(event.Sys_Event_WebSocket,handler, ws.WSEventHandler)
}
func (ws *WSService) genId() uint64 {
if node.GetNodeId()>MaxNodeId{
panic("nodeId exceeds the maximum!")
}
newSeed := atomic.AddUint32(&seed,1) % MaxSeed
nowTime := uint64(time.Now().Unix())%MaxTime
return (uint64(node.GetNodeId())<<50)|(nowTime<<19)|uint64(newSeed)
}
func (ws *WSService) NewWSClient(conn *network.WSConn) network.Agent {
ws.mapClientLocker.Lock()
defer ws.mapClientLocker.Unlock()
for {
ws.initClientId+=1
_,ok := ws.mapClient[ws.initClientId]
clientId := ws.genId()
_,ok := ws.mapClient[clientId]
if ok == true {
continue
}
pClient := &WSClient{wsConn:conn, id: ws.initClientId}
pClient := &WSClient{wsConn:conn, id: clientId}
pClient.wsService = ws
ws.mapClient[ws.initClientId] = pClient
ws.mapClient[clientId] = pClient
return pClient
}
@@ -131,7 +160,7 @@ func (slf *WSClient) Run() {
log.Debug("read client id %d is error:%+v",slf.id,err)
break
}
data,err:=slf.wsService.process.Unmarshal(bytes)
data,err:=slf.wsService.process.Unmarshal(slf.id,bytes)
if err != nil {
slf.wsService.NotifyEvent(&event.Event{Type:event.Sys_Event_WebSocket,Data:&WSPack{ClientId:slf.id,Type:WPT_UnknownPack,Data:bytes,MsgProcessor:slf.wsService.process}})
continue
@@ -156,7 +185,7 @@ func (ws *WSService) SendMsg(clientid uint64,msg interface{}) error{
}
ws.mapClientLocker.Unlock()
bytes,err := ws.process.Marshal(msg)
bytes,err := ws.process.Marshal(clientid,msg)
if err != nil {
return err
}