优化rpc内存池

This commit is contained in:
boyce
2021-01-08 10:21:54 +08:00
parent f88423c7ab
commit 7e288ccdd8
5 changed files with 38 additions and 49 deletions

View File

@@ -1,26 +1,20 @@
package rpc package rpc
import ( import (
"github.com/duanhf2012/origin/util/sync"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"sync"
) )
type GoGoPBProcessor struct { type GoGoPBProcessor struct {
} }
var rpcGoGoPbResponseDataPool sync.Pool var rpcGoGoPbResponseDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{
var rpcGoGoPbRequestDataPool sync.Pool return &GoGoPBRpcResponseData{}
})
var rpcGoGoPbRequestDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{
func init(){ return &GoGoPBRpcRequestData{}
rpcGoGoPbResponseDataPool.New = func()interface{}{ })
return &GoGoPBRpcResponseData{}
}
rpcGoGoPbRequestDataPool.New = func()interface{}{
return &GoGoPBRpcRequestData{}
}
}
func (slf *GoGoPBRpcRequestData) MakeRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) *GoGoPBRpcRequestData{ func (slf *GoGoPBRpcRequestData) MakeRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) *GoGoPBRpcRequestData{
slf.Seq = seq slf.Seq = seq

View File

@@ -2,7 +2,7 @@ package rpc
import ( import (
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"sync" "github.com/duanhf2012/origin/util/sync"
) )
var json = jsoniter.ConfigCompatibleWithStandardLibrary var json = jsoniter.ConfigCompatibleWithStandardLibrary
@@ -29,18 +29,13 @@ type JsonRpcResponseData struct {
Reply []byte Reply []byte
} }
var rpcJsonResponseDataPool sync.Pool var rpcJsonResponseDataPool=sync.NewPool(make(chan interface{},10240), func()interface{}{
var rpcJsonRequestDataPool sync.Pool return &JsonRpcResponseData{}
})
func init(){ var rpcJsonRequestDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{
rpcJsonResponseDataPool.New = func()interface{}{ return &JsonRpcRequestData{}
return &JsonRpcResponseData{} })
}
rpcJsonRequestDataPool.New = func()interface{}{
return &JsonRpcRequestData{}
}
}
func (jsonProcessor *JsonProcessor) Marshal(v interface{}) ([]byte, error){ func (jsonProcessor *JsonProcessor) Marshal(v interface{}) ([]byte, error){
return json.Marshal(v) return json.Marshal(v)

View File

@@ -28,7 +28,6 @@ func (r *Responder) IsInvalid() bool {
return reflect.ValueOf(*r).Pointer() == reflect.ValueOf(reqHandlerNull).Pointer() return reflect.ValueOf(*r).Pointer() == reflect.ValueOf(reqHandlerNull).Pointer()
} }
//var rpcResponsePool sync.Pool
var rpcRequestPool = sync.NewPoolEx(make(chan sync.IPoolData,10240),func()sync.IPoolData{ var rpcRequestPool = sync.NewPoolEx(make(chan sync.IPoolData,10240),func()sync.IPoolData{
return &RpcRequest{} return &RpcRequest{}
}) })
@@ -148,31 +147,20 @@ func MakeRpcRequest(rpcProcessor IRpcProcessor,seq uint64,rpcMethodId uint32,ser
rpcRequest := rpcRequestPool.Get().(*RpcRequest).Clear() rpcRequest := rpcRequestPool.Get().(*RpcRequest).Clear()
rpcRequest.rpcProcessor = rpcProcessor rpcRequest.rpcProcessor = rpcProcessor
rpcRequest.RpcRequestData = rpcRequest.rpcProcessor.MakeRpcRequest(seq,rpcMethodId,serviceMethod,noReply,inParam) rpcRequest.RpcRequestData = rpcRequest.rpcProcessor.MakeRpcRequest(seq,rpcMethodId,serviceMethod,noReply,inParam)
rpcRequest.ref = true
return rpcRequest return rpcRequest
} }
func ReleaseRpcRequest(rpcRequest *RpcRequest){ func ReleaseRpcRequest(rpcRequest *RpcRequest){
if rpcRequest.ref == false {
panic("Duplicate memory release!")
}
rpcRequest.ref = false
rpcRequest.rpcProcessor.ReleaseRpcRequest(rpcRequest.RpcRequestData) rpcRequest.rpcProcessor.ReleaseRpcRequest(rpcRequest.RpcRequestData)
rpcRequestPool.Put(rpcRequest) rpcRequestPool.Put(rpcRequest)
} }
func MakeCall() *Call { func MakeCall() *Call {
call := rpcCallPool.Get().(*Call).Clear() return rpcCallPool.Get().(*Call)
call.ref = true
return call
} }
func ReleaseCall(call *Call){ func ReleaseCall(call *Call){
if call.ref == false {
panic("Duplicate memory release!")
}
call.ref = false
rpcCallPool.Put(call) rpcCallPool.Put(call)
} }

View File

@@ -3,8 +3,8 @@ package sync
import sysSync "sync" import sysSync "sync"
type Pool struct { type Pool struct {
New func()interface{} //构建对象函数
C chan interface{} //最大缓存的数量 C chan interface{} //最大缓存的数量
syncPool sysSync.Pool
} }
type IPoolData interface { type IPoolData interface {
@@ -14,18 +14,17 @@ type IPoolData interface {
UnRef() UnRef()
} }
type poolEx struct{ type PoolEx struct{
C chan IPoolData //最大缓存的数量 C chan IPoolData //最大缓存的数量
syncPool sysSync.Pool syncPool sysSync.Pool
} }
func (pool *Pool) Get() interface{}{ func (pool *Pool) Get() interface{}{
select { select {
case d := <-pool.C: case d := <-pool.C:
return d return d
default: default:
return pool.New() return pool.syncPool.Get()
} }
return nil return nil
@@ -35,11 +34,19 @@ func (pool *Pool) Put(data interface{}){
select { select {
case pool.C <- data: case pool.C <- data:
default: default:
pool.syncPool.Put(data)
} }
} }
func NewPoolEx(C chan IPoolData,New func()IPoolData) *poolEx{ func NewPool(C chan interface{},New func()interface{}) *Pool{
var pool poolEx var p Pool
p.C = C
p.syncPool.New = New
return &p
}
func NewPoolEx(C chan IPoolData,New func()IPoolData) *PoolEx{
var pool PoolEx
pool.C = C pool.C = C
//pool.New = New //pool.New = New
pool.syncPool.New = func() interface{} { pool.syncPool.New = func() interface{} {
@@ -48,15 +55,13 @@ func NewPoolEx(C chan IPoolData,New func()IPoolData) *poolEx{
return &pool return &pool
} }
func (pool *poolEx) Get() IPoolData{ func (pool *PoolEx) Get() IPoolData{
select { select {
case d := <-pool.C: case d := <-pool.C:
d.Ref() d.Ref()
d.Reset()
return d return d
default: default:
data := pool.syncPool.Get().(IPoolData) data := pool.syncPool.Get().(IPoolData)
data.Reset()
data.Ref() data.Ref()
return data return data
} }
@@ -64,11 +69,11 @@ func (pool *poolEx) Get() IPoolData{
return nil return nil
} }
func (pool *poolEx) Put(data IPoolData){ func (pool *PoolEx) Put(data IPoolData){
if data.IsRef() == false { if data.IsRef() == false {
panic("Repeatedly freeing memory") panic("Repeatedly freeing memory")
} }
data.Reset()
data.UnRef() data.UnRef()
select { select {
case pool.C <- data: case pool.C <- data:

View File

@@ -120,6 +120,13 @@ func (t *Timer) GetName() string{
} }
func (t *Timer) Reset(){ func (t *Timer) Reset(){
t.name = ""
t.cancelled = false
t.C = nil
t.interval = 0
t.cb = nil
t.AdditionData = nil
t.rOpen = false
} }
func (t *Timer) IsRef()bool{ func (t *Timer) IsRef()bool{