TcpService新增设置内存池接口

This commit is contained in:
boyce
2020-12-14 16:05:32 +08:00
parent e25eda14d1
commit 4796ad7404
8 changed files with 41 additions and 16 deletions

View File

@@ -23,6 +23,7 @@ type JsonProcessor struct {
unknownMessageHandler UnknownMessageJsonHandler
connectHandler ConnectJsonHandler
disconnectHandler ConnectJsonHandler
network.INetMempool
}
type JsonPackInfo struct {
@@ -33,6 +34,8 @@ type JsonPackInfo struct {
func NewJsonProcessor() *JsonProcessor {
processor := &JsonProcessor{mapMsg:map[uint16]MessageJsonInfo{}}
processor.INetMempool = network.NewMemAreaPool()
return processor
}
@@ -54,7 +57,7 @@ func (jsonProcessor *JsonProcessor ) MsgRoute(msg interface{},userdata interface
func (jsonProcessor *JsonProcessor) Unmarshal(data []byte) (interface{}, error) {
typeStruct := struct {Type int `json:"typ"`}{}
defer network.ReleaseByteSlice(data)
defer jsonProcessor.ReleaseByteSlice(data)
err := json.Unmarshal(data, &typeStruct)
if err != nil {
return nil, err

View File

@@ -25,6 +25,7 @@ type PBProcessor struct {
unknownMessageHandler UnknownMessageHandler
connectHandler ConnectHandler
disconnectHandler ConnectHandler
network.INetMempool
}
type PBPackInfo struct {
@@ -35,6 +36,7 @@ type PBPackInfo struct {
func NewPBProcessor() *PBProcessor {
processor := &PBProcessor{mapMsg:map[uint16]MessageInfo{}}
processor.INetMempool = network.NewMemAreaPool()
return processor
}
@@ -64,7 +66,7 @@ func (pbProcessor *PBProcessor ) MsgRoute(msg interface{},userdata interface{})
// must goroutine safe
func (pbProcessor *PBProcessor ) Unmarshal(data []byte) (interface{}, error) {
defer network.ReleaseByteSlice(data)
defer pbProcessor.ReleaseByteSlice(data)
var msgType uint16
if pbProcessor.LittleEndian == true {
msgType = binary.LittleEndian.Uint16(data[:2])

View File

@@ -32,3 +32,4 @@ type IRawProcessor interface {
SetConnectedHandler(connectHandler RawConnectHandler)
SetDisConnectedHandler(disconnectHandler RawConnectHandler)
}

View File

@@ -4,6 +4,11 @@ import (
"sync"
)
type INetMempool interface {
MakeByteSlice(size int) []byte
ReleaseByteSlice(byteBuff []byte) bool
}
type memAreaPool struct {
minAreaValue int //最小范围值
maxAreaValue int //最大范围值
@@ -20,6 +25,10 @@ func init(){
}
}
func NewMemAreaPool() *memAreaPool{
return &memAreaPool{}
}
func (areaPool *memAreaPool) makePool(){
poolLen := (areaPool.maxAreaValue - areaPool.minAreaValue+1)/areaPool.growthValue
areaPool.pool = make([]sync.Pool,poolLen)
@@ -50,7 +59,6 @@ func (areaPool *memAreaPool) getPosByteSize(size int) int{
return pos
}
func (areaPool *memAreaPool) releaseByteSlice(byteBuff []byte) bool{
pos := areaPool.getPosByteSize(cap(byteBuff))
if pos > len(areaPool.pool) || pos == -1{
@@ -62,10 +70,9 @@ func (areaPool *memAreaPool) releaseByteSlice(byteBuff []byte) bool{
return true
}
func MakeByteSlice(size int) []byte{
func (areaPool *memAreaPool) MakeByteSlice(size int) []byte{
for i:=0;i<len(memAreaPoolList);i++{
if size <= memAreaPoolList[i].maxAreaValue {
return memAreaPoolList[i].makeByteSlice(size)
}
}
@@ -73,8 +80,7 @@ func MakeByteSlice(size int) []byte{
return nil
}
func ReleaseByteSlice(byteBuff []byte) bool {
func (areaPool *memAreaPool) ReleaseByteSlice(byteBuff []byte) bool {
for i:=0;i<len(memAreaPoolList);i++{
if cap(byteBuff) <= memAreaPoolList[i].maxAreaValue {
return memAreaPoolList[i].releaseByteSlice(byteBuff)
@@ -83,5 +89,3 @@ func ReleaseByteSlice(byteBuff []byte) bool {
return false
}

View File

@@ -38,7 +38,7 @@ func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPCo
break
}
_, err := conn.Write(b)
ReleaseByteSlice(b)
tcpConn.msgParser.ReleaseByteSlice(b)
if err != nil {
break
@@ -126,7 +126,7 @@ func (tcpConn *TCPConn) ReadMsg() ([]byte, error) {
}
func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){
ReleaseByteSlice(byteBuff)
tcpConn.msgParser.ReleaseByteSlice(byteBuff)
}
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {

View File

@@ -15,6 +15,8 @@ type MsgParser struct {
minMsgLen uint32
maxMsgLen uint32
littleEndian bool
INetMempool
}
func NewMsgParser() *MsgParser {
@@ -23,7 +25,7 @@ func NewMsgParser() *MsgParser {
p.minMsgLen = 1
p.maxMsgLen = 4096
p.littleEndian = false
p.INetMempool = NewMemAreaPool()
return p
}
@@ -99,9 +101,9 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) {
// data
//msgData := make([]byte, msgLen)
msgData := MakeByteSlice(int(msgLen))
msgData := p.MakeByteSlice(int(msgLen))
if _, err := io.ReadFull(conn, msgData); err != nil {
ReleaseByteSlice(msgData)
p.ReleaseByteSlice(msgData)
return nil, err
}
@@ -124,7 +126,7 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error {
}
//msg := make([]byte, uint32(p.lenMsgLen)+msgLen)
msg := MakeByteSlice(p.lenMsgLen+int(msgLen))
msg := p.MakeByteSlice(p.lenMsgLen+int(msgLen))
// write len
switch p.lenMsgLen {
case 1:

View File

@@ -24,6 +24,7 @@ type TCPServer struct {
MaxMsgLen uint32
LittleEndian bool
msgParser *MsgParser
netMemPool INetMempool
}
func (server *TCPServer) Start() {
@@ -54,11 +55,19 @@ func (server *TCPServer) init() {
// msg parser
msgParser := NewMsgParser()
if msgParser.INetMempool == nil {
msgParser.INetMempool = NewMemAreaPool()
}
msgParser.SetMsgLen(server.LenMsgLen, server.MinMsgLen, server.MaxMsgLen)
msgParser.SetByteOrder(server.LittleEndian)
server.msgParser = msgParser
}
func (server *TCPServer) SetNetMempool(mempool INetMempool){
server.msgParser.INetMempool = mempool
}
func (server *TCPServer) run() {
server.wgLn.Add(1)
defer server.wgLn.Done()

View File

@@ -270,4 +270,8 @@ func (tcpService *TcpService) GetConnNum() int {
connNum := len(tcpService.mapClient)
tcpService.mapClientLocker.Unlock()
return connNum
}
}
func (server *TcpService) SetNetMempool(mempool network.INetMempool){
server.tcpServer.SetNetMempool(mempool)
}