mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-04 15:04:45 +08:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6f87cda07f | ||
|
|
2e4863d073 | ||
|
|
330644cebb | ||
|
|
ef7ee0ab8e | ||
|
|
976efe0c04 | ||
|
|
b61906fb24 | ||
|
|
29dcf2bfeb |
@@ -411,13 +411,13 @@ func GetNodeByServiceName(serviceName string) map[string]struct{} {
|
||||
return mapNodeId
|
||||
}
|
||||
|
||||
// GetNodeByTemplateServiceName 通过模板服务名获取服务名,返回 map[serviceName真实服务名]NodeId
|
||||
func GetNodeByTemplateServiceName(templateServiceName string) map[string]string {
|
||||
// GetNodeByTemplateServiceName 通过模板服务名获取服务名,返回 map[serviceName真实服务名][]NodeId
|
||||
func GetNodeByTemplateServiceName(templateServiceName string) map[string][]string {
|
||||
cluster.locker.RLock()
|
||||
defer cluster.locker.RUnlock()
|
||||
|
||||
mapServiceName := cluster.mapTemplateServiceNode[templateServiceName]
|
||||
mapNodeId := make(map[string]string, 9)
|
||||
mapNodeId := make(map[string][]string, 9)
|
||||
for serviceName := range mapServiceName {
|
||||
mapNode, ok := cluster.mapServiceNode[serviceName]
|
||||
if ok == false {
|
||||
@@ -425,7 +425,9 @@ func GetNodeByTemplateServiceName(templateServiceName string) map[string]string
|
||||
}
|
||||
|
||||
for nodeId := range mapNode {
|
||||
mapNodeId[serviceName] = nodeId
|
||||
nodes := mapNodeId[serviceName]
|
||||
nodes = append(nodes, nodeId)
|
||||
mapNodeId[serviceName] = nodes
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -507,6 +507,30 @@ func (cls *Cluster) IsConfigService(serviceName string) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
// GetServiceInfoByTemplateService 通过模板服务名,获取map[服务名]map[NodeId]struct{}
|
||||
func (cls *Cluster) GetServiceInfoByTemplateService(templateServiceName string) map[string]map[string]struct{} {
|
||||
mapService := map[string]map[string]struct{}{}
|
||||
cls.locker.RLock()
|
||||
defer cls.locker.RUnlock()
|
||||
|
||||
mapServiceName := cls.mapTemplateServiceNode[templateServiceName]
|
||||
for serviceName := range mapServiceName {
|
||||
mapNodeId, ok := cls.mapServiceNode[serviceName]
|
||||
if ok == true {
|
||||
for nodeId := range mapNodeId{
|
||||
mapNodeIds:=mapService[serviceName]
|
||||
if mapNodeIds==nil {
|
||||
mapNodeIds = map[string]struct{}{}
|
||||
mapService[serviceName] = mapNodeIds
|
||||
}
|
||||
mapNodeIds[nodeId] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return mapService
|
||||
}
|
||||
|
||||
func (cls *Cluster) GetNodeIdByTemplateService(templateServiceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
|
||||
cls.locker.RLock()
|
||||
defer cls.locker.RUnlock()
|
||||
|
||||
2
go.mod
2
go.mod
@@ -6,7 +6,7 @@ toolchain go1.22.7
|
||||
|
||||
require (
|
||||
github.com/IBM/sarama v1.43.3
|
||||
github.com/duanhf2012/rotatelogs v0.0.0-20250124024205-39765c212d8a
|
||||
github.com/duanhf2012/rotatelogs v0.0.0-20260202065658-f38ef69c6a39
|
||||
github.com/gin-gonic/gin v1.10.0
|
||||
github.com/go-sql-driver/mysql v1.6.0
|
||||
github.com/goccy/go-json v0.10.2
|
||||
|
||||
4
go.sum
4
go.sum
@@ -20,8 +20,8 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/duanhf2012/rotatelogs v0.0.0-20250124024205-39765c212d8a h1:BVmZrOSKTg9ry1YjqY6IjVXmBDsFdX/W+pnvO5cPUDc=
|
||||
github.com/duanhf2012/rotatelogs v0.0.0-20250124024205-39765c212d8a/go.mod h1:S/NNkpdnXps6VXaYVVDFtqQAm/NKayHxxOAhsrFnCgg=
|
||||
github.com/duanhf2012/rotatelogs v0.0.0-20260202065658-f38ef69c6a39 h1:T+lS1jdEUNgkx3gG6MrO4rgkf8jGJNHSfRLKsLz31MM=
|
||||
github.com/duanhf2012/rotatelogs v0.0.0-20260202065658-f38ef69c6a39/go.mod h1:S/NNkpdnXps6VXaYVVDFtqQAm/NKayHxxOAhsrFnCgg=
|
||||
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
|
||||
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
|
||||
|
||||
@@ -142,7 +142,7 @@ func (logger *Logger) NewRotatelogsWriter() zapcore.WriteSyncer {
|
||||
options = append(options, rotatelogs.WithRotationTime(time.Hour*24))
|
||||
|
||||
fileName := strings.TrimRight(logger.FileName, filepath.Ext(logger.FileName))
|
||||
rotateLogs, err := rotatelogs.NewRotateLogs(LogPath, "20060102/"+fileName+"_20060102_150405", options...)
|
||||
rotateLogs, err := rotatelogs.NewRotateLogs(LogPath, fileName,"20060102","_20060102_150405", options...)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
@@ -69,15 +69,15 @@ func (client *KCPClient) init() {
|
||||
if client.MinMsgLen == 0 {
|
||||
client.MinMsgLen = Default_MinMsgLen
|
||||
}
|
||||
if client.MaxMsgLen == 0 {
|
||||
client.MaxMsgLen = Default_MaxMsgLen
|
||||
if client.MaxReadMsgLen == 0 {
|
||||
client.MaxReadMsgLen = Default_MaxReadMsgLen
|
||||
}
|
||||
if client.LenMsgLen == 0 {
|
||||
client.LenMsgLen = Default_LenMsgLen
|
||||
}
|
||||
maxMsgLen := client.MsgParser.getMaxMsgLen()
|
||||
if client.MaxMsgLen > maxMsgLen {
|
||||
client.MaxMsgLen = maxMsgLen
|
||||
if client.MaxReadMsgLen > maxMsgLen {
|
||||
client.MaxReadMsgLen = maxMsgLen
|
||||
log.Info("invalid MaxMsgLen", log.Uint32("reset", maxMsgLen))
|
||||
}
|
||||
|
||||
|
||||
@@ -81,7 +81,8 @@ type KcpCfg struct {
|
||||
LittleEndian bool //是否小端序
|
||||
LenMsgLen int //消息头占用byte数量,只能是1byte,2byte,4byte。如果是4byte,意味着消息最大可以是math.MaxUint32(4GB)
|
||||
MinMsgLen uint32 //最小消息长度
|
||||
MaxMsgLen uint32 //最大消息长度,超过判定不合法,断开连接
|
||||
MaxReadMsgLen uint32 //最大读消息长度,超过判定不合法,断开连接
|
||||
MaxWriteMsgLen uint32 // 最大写消息长度
|
||||
PendingWriteNum int //写channel最大消息数量
|
||||
}
|
||||
|
||||
@@ -89,7 +90,8 @@ func (kp *KCPServer) Init(kcpCfg *KcpCfg) {
|
||||
kp.kcpCfg = kcpCfg
|
||||
kp.msgParser.Init()
|
||||
kp.msgParser.LenMsgLen = kp.kcpCfg.LenMsgLen
|
||||
kp.msgParser.MaxMsgLen = kp.kcpCfg.MaxMsgLen
|
||||
kp.msgParser.MaxReadMsgLen = kp.kcpCfg.MaxReadMsgLen
|
||||
kp.msgParser.MaxWriteMsgLen = kp.kcpCfg.MaxWriteMsgLen
|
||||
kp.msgParser.MinMsgLen = kp.kcpCfg.MinMsgLen
|
||||
kp.msgParser.LittleEndian = kp.kcpCfg.LittleEndian
|
||||
|
||||
|
||||
@@ -45,9 +45,8 @@ func (jsonProcessor *JsonProcessor) SetByteOrder(littleEndian bool) {
|
||||
}
|
||||
|
||||
// MsgRoute must goroutine safe
|
||||
func (jsonProcessor *JsonProcessor) MsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) error {
|
||||
func (jsonProcessor *JsonProcessor) MsgRoute(clientId string, msg interface{}) error {
|
||||
pPackInfo := msg.(*JsonPackInfo)
|
||||
defer recyclerReaderBytes(pPackInfo.rawMsg)
|
||||
|
||||
v, ok := jsonProcessor.mapMsg[pPackInfo.typ]
|
||||
if ok == false {
|
||||
@@ -107,8 +106,7 @@ func (jsonProcessor *JsonProcessor) MakeRawMsg(msgType uint16, msg []byte) *Json
|
||||
return &JsonPackInfo{typ: msgType, rawMsg: msg}
|
||||
}
|
||||
|
||||
func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) {
|
||||
defer recyclerReaderBytes(msg.([]byte))
|
||||
func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId string, msg interface{}) {
|
||||
if jsonProcessor.unknownMessageHandler == nil {
|
||||
log.Debug("Unknown message", log.String("clientId", clientId))
|
||||
return
|
||||
|
||||
@@ -54,10 +54,8 @@ func (slf *PBPackInfo) GetMsg() proto.Message {
|
||||
}
|
||||
|
||||
// MsgRoute must goroutine safe
|
||||
func (pbProcessor *PBProcessor) MsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) error {
|
||||
func (pbProcessor *PBProcessor) MsgRoute(clientId string, msg interface{}) error {
|
||||
pPackInfo := msg.(*PBPackInfo)
|
||||
defer recyclerReaderBytes(pPackInfo.rawMsg)
|
||||
|
||||
v, ok := pbProcessor.mapMsg[pPackInfo.typ]
|
||||
if ok == false {
|
||||
return fmt.Errorf("cannot find msgtype %d is register", pPackInfo.typ)
|
||||
@@ -134,9 +132,8 @@ func (pbProcessor *PBProcessor) MakeRawMsg(msgType uint16, msg []byte) *PBPackIn
|
||||
return &PBPackInfo{typ: msgType, rawMsg: msg}
|
||||
}
|
||||
|
||||
func (pbProcessor *PBProcessor) UnknownMsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) {
|
||||
func (pbProcessor *PBProcessor) UnknownMsgRoute(clientId string, msg interface{}) {
|
||||
pbProcessor.unknownMessageHandler(clientId, msg.([]byte))
|
||||
recyclerReaderBytes(msg.([]byte))
|
||||
}
|
||||
|
||||
func (pbProcessor *PBProcessor) ConnectedRoute(clientId string) {
|
||||
|
||||
@@ -10,7 +10,7 @@ type RawMessageInfo struct {
|
||||
msgHandler RawMessageHandler
|
||||
}
|
||||
|
||||
type RawMessageHandler func(clientId string, packType uint16, msg []byte)
|
||||
type RawMessageHandler func(clientId string, packType uint16,additionData any, msg []byte)
|
||||
type RawConnectHandler func(clientId string)
|
||||
type UnknownRawMessageHandler func(clientId string, msg []byte)
|
||||
|
||||
@@ -39,10 +39,9 @@ func (pbRawProcessor *PBRawProcessor) SetByteOrder(littleEndian bool) {
|
||||
}
|
||||
|
||||
// MsgRoute must goroutine safe
|
||||
func (pbRawProcessor *PBRawProcessor) MsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) error {
|
||||
func (pbRawProcessor *PBRawProcessor) MsgRoute(clientId string, msg interface{}) error {
|
||||
pPackInfo := msg.(*PBRawPackInfo)
|
||||
pbRawProcessor.msgHandler(clientId, pPackInfo.typ, pPackInfo.rawMsg)
|
||||
recyclerReaderBytes(pPackInfo.rawMsg)
|
||||
pbRawProcessor.msgHandler(clientId, pPackInfo.typ, nil, pPackInfo.rawMsg)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -83,8 +82,7 @@ func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16, msg []byte, pbR
|
||||
pbRawPackInfo.rawMsg = msg
|
||||
}
|
||||
|
||||
func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) {
|
||||
defer recyclerReaderBytes(msg.([]byte))
|
||||
func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId string, msg interface{}) {
|
||||
if pbRawProcessor.unknownMessageHandler == nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -2,9 +2,9 @@ package processor
|
||||
|
||||
type IProcessor interface {
|
||||
// MsgRoute must goroutine safe
|
||||
MsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) error
|
||||
MsgRoute(clientId string, msg interface{}) error
|
||||
// UnknownMsgRoute must goroutine safe
|
||||
UnknownMsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte))
|
||||
UnknownMsgRoute(clientId string, msg interface{})
|
||||
// ConnectedRoute connect event
|
||||
ConnectedRoute(clientId string)
|
||||
DisConnectedRoute(clientId string)
|
||||
@@ -20,7 +20,6 @@ type IRawProcessor interface {
|
||||
|
||||
SetByteOrder(littleEndian bool)
|
||||
SetRawMsgHandler(handle RawMessageHandler)
|
||||
MakeRawMsg(msgType uint16, msg []byte, pbRawPackInfo *PBRawPackInfo)
|
||||
SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler)
|
||||
SetConnectedHandler(connectHandler RawConnectHandler)
|
||||
SetDisConnectedHandler(disconnectHandler RawConnectHandler)
|
||||
|
||||
@@ -68,18 +68,19 @@ func (client *TCPClient) init() {
|
||||
if client.MinMsgLen == 0 {
|
||||
client.MinMsgLen = Default_MinMsgLen
|
||||
}
|
||||
if client.MaxMsgLen == 0 {
|
||||
client.MaxMsgLen = Default_MaxMsgLen
|
||||
if client.MaxReadMsgLen == 0 {
|
||||
client.MaxReadMsgLen = Default_MaxReadMsgLen
|
||||
}
|
||||
if client.LenMsgLen == 0 {
|
||||
client.LenMsgLen = Default_LenMsgLen
|
||||
}
|
||||
maxMsgLen := client.MsgParser.getMaxMsgLen()
|
||||
if client.MaxMsgLen > maxMsgLen {
|
||||
client.MaxMsgLen = maxMsgLen
|
||||
if client.MaxReadMsgLen > maxMsgLen {
|
||||
client.MaxReadMsgLen = maxMsgLen
|
||||
log.Info("invalid MaxMsgLen", log.Uint32("reset", maxMsgLen))
|
||||
}
|
||||
|
||||
|
||||
client.cons = make(ConnSet)
|
||||
client.closeFlag = false
|
||||
client.MsgParser.Init()
|
||||
|
||||
@@ -14,7 +14,8 @@ import (
|
||||
type MsgParser struct {
|
||||
LenMsgLen int
|
||||
MinMsgLen uint32
|
||||
MaxMsgLen uint32
|
||||
MaxReadMsgLen uint32
|
||||
MaxWriteMsgLen uint32
|
||||
LittleEndian bool
|
||||
|
||||
bytespool.IBytesMemPool
|
||||
@@ -67,7 +68,7 @@ func (p *MsgParser) Read(r io.Reader) ([]byte, error) {
|
||||
}
|
||||
|
||||
// check len
|
||||
if msgLen > p.MaxMsgLen {
|
||||
if msgLen > p.MaxReadMsgLen {
|
||||
return nil, errors.New("message too long")
|
||||
} else if msgLen < p.MinMsgLen {
|
||||
return nil, errors.New("message too short")
|
||||
@@ -92,7 +93,7 @@ func (p *MsgParser) Write(conn io.Writer, args ...[]byte) error {
|
||||
}
|
||||
|
||||
// check len
|
||||
if msgLen > p.MaxMsgLen {
|
||||
if p.MaxWriteMsgLen > 0 && msgLen > p.MaxWriteMsgLen {
|
||||
return errors.New("message too long")
|
||||
} else if msgLen < p.MinMsgLen {
|
||||
return errors.New("message too short")
|
||||
|
||||
@@ -11,13 +11,13 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
Default_ReadDeadline = time.Second * 30 //默认读超时30s
|
||||
Default_WriteDeadline = time.Second * 30 //默认写超时30s
|
||||
Default_MaxConnNum = 1000000 //默认最大连接数
|
||||
Default_PendingWriteNum = 100000 //单连接写消息Channel容量
|
||||
Default_MinMsgLen = 2 //最小消息长度2byte
|
||||
Default_LenMsgLen = 2 //包头字段长度占用2byte
|
||||
Default_MaxMsgLen = 65535 //最大消息长度
|
||||
Default_ReadDeadline = time.Second * 30 // 默认读超时30s
|
||||
Default_WriteDeadline = time.Second * 30 // 默认写超时30s
|
||||
Default_MaxConnNum = 1000000 // 默认最大连接数
|
||||
Default_PendingWriteNum = 100000 // 单连接写消息Channel容量
|
||||
Default_MinMsgLen = 2 // 最小消息长度2byte
|
||||
Default_LenMsgLen = 2 // 包头字段长度占用2byte
|
||||
Default_MaxReadMsgLen = 65535 // 最大读消息长度
|
||||
)
|
||||
|
||||
type TCPServer struct {
|
||||
@@ -70,14 +70,14 @@ func (server *TCPServer) init() error {
|
||||
log.Info("invalid LenMsgLen", log.Int("reset", server.LenMsgLen))
|
||||
}
|
||||
|
||||
if server.MaxMsgLen <= 0 {
|
||||
server.MaxMsgLen = Default_MaxMsgLen
|
||||
log.Info("invalid MaxMsgLen", log.Uint32("reset to", server.MaxMsgLen))
|
||||
if server.MaxReadMsgLen <= 0 {
|
||||
server.MaxReadMsgLen = Default_MaxReadMsgLen
|
||||
log.Info("invalid MaxMsgLen", log.Uint32("reset to", server.MaxReadMsgLen))
|
||||
}
|
||||
|
||||
maxMsgLen := server.MsgParser.getMaxMsgLen()
|
||||
if server.MaxMsgLen > maxMsgLen {
|
||||
server.MaxMsgLen = maxMsgLen
|
||||
if server.MaxReadMsgLen > maxMsgLen {
|
||||
server.MaxReadMsgLen = maxMsgLen
|
||||
log.Info("invalid MaxMsgLen", log.Uint32("reset", maxMsgLen))
|
||||
}
|
||||
|
||||
|
||||
@@ -15,16 +15,16 @@ type WSConn struct {
|
||||
sync.Mutex
|
||||
conn *websocket.Conn
|
||||
writeChan chan []byte
|
||||
maxMsgLen uint32
|
||||
maxWriteMsgLen uint32
|
||||
closeFlag bool
|
||||
header http.Header
|
||||
}
|
||||
|
||||
func newWSConn(conn *websocket.Conn, header http.Header, pendingWriteNum int, maxMsgLen uint32, messageType int) *WSConn {
|
||||
func newWSConn(conn *websocket.Conn, header http.Header, pendingWriteNum int, maxWriteMsgLen uint32, messageType int) *WSConn {
|
||||
wsConn := new(WSConn)
|
||||
wsConn.conn = conn
|
||||
wsConn.writeChan = make(chan []byte, pendingWriteNum)
|
||||
wsConn.maxMsgLen = maxMsgLen
|
||||
wsConn.maxWriteMsgLen = maxWriteMsgLen
|
||||
wsConn.header = header
|
||||
|
||||
go func() {
|
||||
@@ -118,7 +118,7 @@ func (wsConn *WSConn) WriteMsg(args ...[]byte) error {
|
||||
}
|
||||
|
||||
// check len
|
||||
if msgLen > wsConn.maxMsgLen {
|
||||
if wsConn.maxWriteMsgLen > 0 && msgLen > wsConn.maxWriteMsgLen {
|
||||
return errors.New("message too long")
|
||||
} else if msgLen < 1 {
|
||||
return errors.New("message too short")
|
||||
|
||||
@@ -16,7 +16,8 @@ type WSServer struct {
|
||||
Addr string
|
||||
MaxConnNum int
|
||||
PendingWriteNum int
|
||||
MaxMsgLen uint32
|
||||
MaxReadMsgLen uint32
|
||||
MaxWriteMsgLen uint32
|
||||
CertFile string
|
||||
KeyFile string
|
||||
NewAgent func(*WSConn) Agent
|
||||
@@ -32,7 +33,8 @@ type WSServer struct {
|
||||
type WSHandler struct {
|
||||
maxConnNum int
|
||||
pendingWriteNum int
|
||||
maxMsgLen uint32
|
||||
maxReadMsgLen uint32
|
||||
maxWriteMsgLen uint32
|
||||
newAgent func(*WSConn) Agent
|
||||
upgrader websocket.Upgrader
|
||||
conns WebsocketConnSet
|
||||
@@ -55,7 +57,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
log.Error("upgrade fail", log.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
conn.SetReadLimit(int64(handler.maxMsgLen))
|
||||
conn.SetReadLimit(int64(handler.maxReadMsgLen))
|
||||
if handler.messageType == 0 {
|
||||
handler.messageType = websocket.TextMessage
|
||||
}
|
||||
@@ -93,7 +95,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
c.SetLinger(0)
|
||||
c.SetNoDelay(true)
|
||||
wsConn := newWSConn(conn, r.Header, handler.pendingWriteNum, handler.maxMsgLen, handler.messageType)
|
||||
wsConn := newWSConn(conn, r.Header, handler.pendingWriteNum, handler.maxWriteMsgLen, handler.messageType)
|
||||
agent := handler.newAgent(wsConn)
|
||||
agent.Run()
|
||||
|
||||
@@ -118,7 +120,6 @@ func (server *WSServer) Start() error {
|
||||
log.Error("WSServer Listen fail", log.String("error", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
if server.MaxConnNum <= 0 {
|
||||
server.MaxConnNum = 100
|
||||
log.Info("invalid MaxConnNum", log.Int("reset", server.MaxConnNum))
|
||||
@@ -127,9 +128,9 @@ func (server *WSServer) Start() error {
|
||||
server.PendingWriteNum = 100
|
||||
log.Info("invalid PendingWriteNum", log.Int("reset", server.PendingWriteNum))
|
||||
}
|
||||
if server.MaxMsgLen <= 0 {
|
||||
server.MaxMsgLen = 4096
|
||||
log.Info("invalid MaxMsgLen", log.Uint32("reset", server.MaxMsgLen))
|
||||
if server.MaxReadMsgLen <= 0 {
|
||||
server.MaxReadMsgLen = 4096
|
||||
log.Info("invalid MaxReadMsgLen", log.Uint32("reset", server.MaxReadMsgLen))
|
||||
}
|
||||
if server.HandshakeTimeout <= 0 {
|
||||
server.HandshakeTimeout = 15 * time.Second
|
||||
@@ -167,7 +168,8 @@ func (server *WSServer) Start() error {
|
||||
server.handler = &WSHandler{
|
||||
maxConnNum: server.MaxConnNum,
|
||||
pendingWriteNum: server.PendingWriteNum,
|
||||
maxMsgLen: server.MaxMsgLen,
|
||||
maxReadMsgLen: server.MaxReadMsgLen,
|
||||
maxWriteMsgLen: server.MaxWriteMsgLen,
|
||||
newAgent: server.NewAgent,
|
||||
conns: make(WebsocketConnSet),
|
||||
messageType: server.messageType,
|
||||
|
||||
@@ -127,9 +127,11 @@ func NewRClient(targetNodeId string, addr string, maxRpcParamLen uint32, compres
|
||||
c.NewAgent = client.NewClientAgent
|
||||
|
||||
if maxRpcParamLen > 0 {
|
||||
c.MaxMsgLen = maxRpcParamLen
|
||||
c.MaxReadMsgLen = maxRpcParamLen
|
||||
c.MaxWriteMsgLen = maxRpcParamLen
|
||||
} else {
|
||||
c.MaxMsgLen = math.MaxUint32
|
||||
c.MaxReadMsgLen = math.MaxUint32
|
||||
c.MaxWriteMsgLen = math.MaxUint32
|
||||
}
|
||||
client.IRealClient = c
|
||||
client.CallSet = callSet
|
||||
|
||||
@@ -91,9 +91,11 @@ func (server *Server) Start() error {
|
||||
server.rpcServer.Addr = ":" + splitAddr[1]
|
||||
server.rpcServer.MinMsgLen = 2
|
||||
if server.maxRpcParamLen > 0 {
|
||||
server.rpcServer.MaxMsgLen = server.maxRpcParamLen
|
||||
server.rpcServer.MaxReadMsgLen = server.maxRpcParamLen
|
||||
server.rpcServer.MaxWriteMsgLen = server.maxRpcParamLen
|
||||
} else {
|
||||
server.rpcServer.MaxMsgLen = math.MaxUint32
|
||||
server.rpcServer.MaxReadMsgLen = math.MaxUint32
|
||||
server.rpcServer.MaxWriteMsgLen = math.MaxUint32
|
||||
}
|
||||
|
||||
server.rpcServer.MaxConnNum = 100000
|
||||
|
||||
@@ -19,6 +19,7 @@ type KcpModule struct {
|
||||
mapClientLocker sync.RWMutex
|
||||
mapClient map[string]*Client
|
||||
process processor.IRawProcessor
|
||||
newClientIdHandler func() string
|
||||
|
||||
kcpServer network.KCPServer
|
||||
kcpCfg *network.KcpCfg
|
||||
@@ -56,7 +57,11 @@ func (km *KcpModule) OnInit() error {
|
||||
km.process.SetByteOrder(km.kcpCfg.LittleEndian)
|
||||
km.kcpServer.Init(km.kcpCfg)
|
||||
km.kcpServer.NewAgent = km.NewAgent
|
||||
|
||||
if km.newClientIdHandler == nil {
|
||||
km.newClientIdHandler = func()string{
|
||||
return primitive.NewObjectID().Hex()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -65,6 +70,10 @@ func (km *KcpModule) Init(kcpCfg *network.KcpCfg, process processor.IRawProcesso
|
||||
km.process = process
|
||||
}
|
||||
|
||||
func (km *KcpModule) SetNewClientIdHandler(newClientIdHandler func() string){
|
||||
km.newClientIdHandler = newClientIdHandler
|
||||
}
|
||||
|
||||
func (km *KcpModule) Start() error {
|
||||
return km.kcpServer.Start()
|
||||
}
|
||||
@@ -77,9 +86,9 @@ func (km *KcpModule) kcpEventHandler(ev event.IEvent) {
|
||||
case KPTDisConnected:
|
||||
km.process.DisConnectedRoute(e.StringExt[0])
|
||||
case KPTUnknownPack:
|
||||
km.process.UnknownMsgRoute(e.StringExt[0], e.Data, e.AnyExt[0].(func(data []byte)))
|
||||
km.process.UnknownMsgRoute(e.StringExt[0], e.Data)
|
||||
case KPTPack:
|
||||
km.process.MsgRoute(e.StringExt[0], e.Data, e.AnyExt[0].(func(data []byte)))
|
||||
km.process.MsgRoute(e.StringExt[0], e.Data)
|
||||
}
|
||||
|
||||
event.DeleteEvent(ev)
|
||||
@@ -111,7 +120,7 @@ func (km *KcpModule) newClient(conn network.Conn) *Client {
|
||||
km.mapClientLocker.Lock()
|
||||
defer km.mapClientLocker.Unlock()
|
||||
|
||||
pClient := &Client{kcpConn: conn.(*network.NetConn), id: primitive.NewObjectID().Hex()}
|
||||
pClient := &Client{kcpConn: conn.(*network.NetConn), id: km.newClientIdHandler()}
|
||||
pClient.kcpModule = km
|
||||
km.mapClient[pClient.id] = pClient
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ type TcpModule struct {
|
||||
mapClient map[string]*Client
|
||||
process processor.IRawProcessor
|
||||
tcpCfg *TcpCfg
|
||||
newClientIdHandler func() string
|
||||
}
|
||||
|
||||
type TcpPackType int8
|
||||
@@ -35,6 +36,7 @@ type TcpPack struct {
|
||||
Type TcpPackType //0表示连接 1表示断开 2表示数据
|
||||
ClientId string
|
||||
Data interface{}
|
||||
rawData []byte
|
||||
RecyclerReaderBytes func(data []byte)
|
||||
}
|
||||
|
||||
@@ -51,7 +53,8 @@ type TcpCfg struct {
|
||||
LittleEndian bool //是否小端序
|
||||
LenMsgLen int //消息头占用byte数量,只能是1byte,2byte,4byte。如果是4byte,意味着消息最大可以是math.MaxUint32(4GB)
|
||||
MinMsgLen uint32 //最小消息长度
|
||||
MaxMsgLen uint32 //最大消息长度,超过判定不合法,断开连接
|
||||
MaxReadMsgLen uint32 //最大消息长度,超过判定不合法,断开连接
|
||||
MaxWriteMsgLen uint32 // 最大写消息长度
|
||||
ReadDeadlineSecond time.Duration //读超时
|
||||
WriteDeadlineSecond time.Duration //写超时
|
||||
}
|
||||
@@ -68,11 +71,17 @@ func (tm *TcpModule) OnInit() error {
|
||||
tm.tcpServer.LittleEndian = tm.tcpCfg.LittleEndian
|
||||
tm.tcpServer.LenMsgLen = tm.tcpCfg.LenMsgLen
|
||||
tm.tcpServer.MinMsgLen = tm.tcpCfg.MinMsgLen
|
||||
tm.tcpServer.MaxMsgLen = tm.tcpCfg.MaxMsgLen
|
||||
tm.tcpServer.MaxReadMsgLen = tm.tcpCfg.MaxReadMsgLen
|
||||
tm.tcpServer.MaxWriteMsgLen = tm.tcpCfg.MaxWriteMsgLen
|
||||
tm.tcpServer.ReadDeadline = tm.tcpCfg.ReadDeadlineSecond * time.Second
|
||||
tm.tcpServer.WriteDeadline = tm.tcpCfg.WriteDeadlineSecond * time.Second
|
||||
tm.mapClient = make(map[string]*Client, tm.tcpServer.MaxConnNum)
|
||||
tm.tcpServer.NewAgent = tm.NewClient
|
||||
if tm.newClientIdHandler == nil {
|
||||
tm.newClientIdHandler = func()string{
|
||||
return primitive.NewObjectID().Hex()
|
||||
}
|
||||
}
|
||||
|
||||
//3.设置解析处理器
|
||||
tm.process.SetByteOrder(tm.tcpCfg.LittleEndian)
|
||||
@@ -87,6 +96,10 @@ func (tm *TcpModule) Init(tcpCfg *TcpCfg, process processor.IRawProcessor) {
|
||||
tm.process = process
|
||||
}
|
||||
|
||||
func (tm *TcpModule) SetNewClientIdHandler(newClientIdHandler func() string){
|
||||
tm.newClientIdHandler = newClientIdHandler
|
||||
}
|
||||
|
||||
func (tm *TcpModule) Start() error {
|
||||
return tm.tcpServer.Start()
|
||||
}
|
||||
@@ -99,9 +112,11 @@ func (tm *TcpModule) tcpEventHandler(ev event.IEvent) {
|
||||
case TPTDisConnected:
|
||||
tm.process.DisConnectedRoute(pack.ClientId)
|
||||
case TPTUnknownPack:
|
||||
tm.process.UnknownMsgRoute(pack.ClientId, pack.Data, pack.RecyclerReaderBytes)
|
||||
tm.process.UnknownMsgRoute(pack.ClientId, pack.Data)
|
||||
pack.RecyclerReaderBytes(pack.rawData)
|
||||
case TPTPack:
|
||||
tm.process.MsgRoute(pack.ClientId, pack.Data, pack.RecyclerReaderBytes)
|
||||
tm.process.MsgRoute(pack.ClientId, pack.Data)
|
||||
pack.RecyclerReaderBytes(pack.rawData)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,7 +124,7 @@ func (tm *TcpModule) NewClient(conn network.Conn) network.Agent {
|
||||
tm.mapClientLocker.Lock()
|
||||
defer tm.mapClientLocker.Unlock()
|
||||
|
||||
clientId := primitive.NewObjectID().Hex()
|
||||
clientId := tm.newClientIdHandler()
|
||||
pClient := &Client{tcpConn: conn.(*network.NetConn), id: clientId}
|
||||
pClient.tcpModule = tm
|
||||
tm.mapClient[clientId] = pClient
|
||||
@@ -138,10 +153,10 @@ func (slf *Client) Run() {
|
||||
}
|
||||
data, err := slf.tcpModule.process.Unmarshal(slf.id, bytes)
|
||||
if err != nil {
|
||||
slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTUnknownPack, Data: bytes, RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}})
|
||||
slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTUnknownPack, Data: data,rawData: bytes,RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}})
|
||||
continue
|
||||
}
|
||||
slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTPack, Data: data, RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}})
|
||||
slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTPack, Data: data,rawData: bytes, RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ type WSModule struct {
|
||||
mapClient map[string]*WSClient
|
||||
process processor.IRawProcessor
|
||||
wsCfg *WSCfg
|
||||
newClientIdHandler func() string
|
||||
}
|
||||
|
||||
type WSClient struct {
|
||||
@@ -34,7 +35,8 @@ type WSCfg struct {
|
||||
ListenAddr string
|
||||
MaxConnNum int
|
||||
PendingWriteNum int
|
||||
MaxMsgLen uint32
|
||||
MaxReadMsgLen uint32
|
||||
MaxWriteMsgLen uint32
|
||||
LittleEndian bool //是否小端序
|
||||
KeyFile string
|
||||
CertFile string
|
||||
@@ -67,12 +69,17 @@ func (ws *WSModule) OnInit() error {
|
||||
|
||||
ws.WSServer.MaxConnNum = ws.wsCfg.MaxConnNum
|
||||
ws.WSServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
|
||||
ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
|
||||
ws.WSServer.MaxReadMsgLen = ws.wsCfg.MaxReadMsgLen
|
||||
ws.WSServer.MaxWriteMsgLen = ws.wsCfg.MaxWriteMsgLen
|
||||
ws.WSServer.Addr = ws.wsCfg.ListenAddr
|
||||
ws.WSServer.HandshakeTimeout = ws.wsCfg.HandshakeTimeoutSecond*time.Second
|
||||
ws.WSServer.ReadTimeout = ws.wsCfg.ReadTimeoutSecond*time.Second
|
||||
ws.WSServer.WriteTimeout = ws.wsCfg.WriteTimeoutSecond*time.Second
|
||||
|
||||
if ws.newClientIdHandler == nil {
|
||||
ws.newClientIdHandler = func()string{
|
||||
return primitive.NewObjectID().Hex()
|
||||
}
|
||||
}
|
||||
if ws.wsCfg.KeyFile != "" && ws.wsCfg.CertFile != "" {
|
||||
ws.WSServer.KeyFile = ws.wsCfg.KeyFile
|
||||
ws.WSServer.CertFile = ws.wsCfg.CertFile
|
||||
@@ -95,6 +102,10 @@ func (ws *WSModule) Init(wsCfg *WSCfg, process processor.IRawProcessor) {
|
||||
ws.process = process
|
||||
}
|
||||
|
||||
func (ws *WSModule) SetNewClientIdHandler(newClientIdHandler func() string){
|
||||
ws.newClientIdHandler = newClientIdHandler
|
||||
}
|
||||
|
||||
func (ws *WSModule) Start() error {
|
||||
return ws.WSServer.Start()
|
||||
}
|
||||
@@ -107,9 +118,9 @@ func (ws *WSModule) wsEventHandler(ev event.IEvent) {
|
||||
case WPTDisConnected:
|
||||
ws.process.DisConnectedRoute(pack.ClientId)
|
||||
case WPTUnknownPack:
|
||||
ws.process.UnknownMsgRoute(pack.ClientId, pack.Data, ws.recyclerReaderBytes)
|
||||
ws.process.UnknownMsgRoute(pack.ClientId, pack.Data)
|
||||
case WPTPack:
|
||||
ws.process.MsgRoute(pack.ClientId, pack.Data, ws.recyclerReaderBytes)
|
||||
ws.process.MsgRoute(pack.ClientId, pack.Data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,7 +130,7 @@ func (ws *WSModule) recyclerReaderBytes([]byte) {
|
||||
func (ws *WSModule) NewWSClient(conn *network.WSConn) network.Agent {
|
||||
ws.mapClientLocker.Lock()
|
||||
defer ws.mapClientLocker.Unlock()
|
||||
pClient := &WSClient{wsConn: conn, id: primitive.NewObjectID().Hex()}
|
||||
pClient := &WSClient{wsConn: conn, id: ws.newClientIdHandler()}
|
||||
pClient.wsModule = ws
|
||||
ws.mapClient[pClient.id] = pClient
|
||||
|
||||
@@ -140,7 +151,7 @@ func (wc *WSClient) Run() {
|
||||
}
|
||||
data, err := wc.wsModule.process.Unmarshal(wc.id, bytes)
|
||||
if err != nil {
|
||||
wc.wsModule.NotifyEvent(&event.Event{Type: event.Sys_Event_WebSocket, Data: &WSPack{ClientId: wc.id, Type: WPTUnknownPack, Data: bytes}})
|
||||
wc.wsModule.NotifyEvent(&event.Event{Type: event.Sys_Event_WebSocket, Data: &WSPack{ClientId: wc.id, Type: WPTUnknownPack, Data: data}})
|
||||
continue
|
||||
}
|
||||
wc.wsModule.NotifyEvent(&event.Event{Type: event.Sys_Event_WebSocket, Data: &WSPack{ClientId: wc.id, Type: WPTPack, Data: data}})
|
||||
|
||||
Reference in New Issue
Block a user