diff --git a/rpc/gogopbprocessor.go b/rpc/gogopbprocessor.go index 173a9a0..40bf39f 100644 --- a/rpc/gogopbprocessor.go +++ b/rpc/gogopbprocessor.go @@ -1,26 +1,20 @@ package rpc import ( + "github.com/duanhf2012/origin/util/sync" "github.com/gogo/protobuf/proto" - "sync" ) type GoGoPBProcessor struct { } -var rpcGoGoPbResponseDataPool sync.Pool -var rpcGoGoPbRequestDataPool sync.Pool +var rpcGoGoPbResponseDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{ + return &GoGoPBRpcResponseData{} +}) - -func init(){ - rpcGoGoPbResponseDataPool.New = func()interface{}{ - return &GoGoPBRpcResponseData{} - } - - rpcGoGoPbRequestDataPool.New = func()interface{}{ - return &GoGoPBRpcRequestData{} - } -} +var rpcGoGoPbRequestDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{ + return &GoGoPBRpcRequestData{} +}) func (slf *GoGoPBRpcRequestData) MakeRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) *GoGoPBRpcRequestData{ slf.Seq = seq diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index d63a450..fe48f1a 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -2,7 +2,7 @@ package rpc import ( jsoniter "github.com/json-iterator/go" - "sync" + "github.com/duanhf2012/origin/util/sync" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary @@ -29,18 +29,13 @@ type JsonRpcResponseData struct { Reply []byte } -var rpcJsonResponseDataPool sync.Pool -var rpcJsonRequestDataPool sync.Pool +var rpcJsonResponseDataPool=sync.NewPool(make(chan interface{},10240), func()interface{}{ + return &JsonRpcResponseData{} +}) -func init(){ - rpcJsonResponseDataPool.New = func()interface{}{ - return &JsonRpcResponseData{} - } - - rpcJsonRequestDataPool.New = func()interface{}{ - return &JsonRpcRequestData{} - } -} +var rpcJsonRequestDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{ + return &JsonRpcRequestData{} +}) func (jsonProcessor *JsonProcessor) Marshal(v interface{}) ([]byte, error){ return json.Marshal(v) diff --git a/rpc/rpc.go b/rpc/rpc.go index ea273ba..296f496 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -28,7 +28,6 @@ func (r *Responder) IsInvalid() bool { return reflect.ValueOf(*r).Pointer() == reflect.ValueOf(reqHandlerNull).Pointer() } -//var rpcResponsePool sync.Pool var rpcRequestPool = sync.NewPoolEx(make(chan sync.IPoolData,10240),func()sync.IPoolData{ return &RpcRequest{} }) @@ -148,31 +147,20 @@ func MakeRpcRequest(rpcProcessor IRpcProcessor,seq uint64,rpcMethodId uint32,ser rpcRequest := rpcRequestPool.Get().(*RpcRequest).Clear() rpcRequest.rpcProcessor = rpcProcessor rpcRequest.RpcRequestData = rpcRequest.rpcProcessor.MakeRpcRequest(seq,rpcMethodId,serviceMethod,noReply,inParam) - rpcRequest.ref = true return rpcRequest } func ReleaseRpcRequest(rpcRequest *RpcRequest){ - if rpcRequest.ref == false { - panic("Duplicate memory release!") - } - rpcRequest.ref = false rpcRequest.rpcProcessor.ReleaseRpcRequest(rpcRequest.RpcRequestData) rpcRequestPool.Put(rpcRequest) } func MakeCall() *Call { - call := rpcCallPool.Get().(*Call).Clear() - call.ref = true - return call + return rpcCallPool.Get().(*Call) } func ReleaseCall(call *Call){ - if call.ref == false { - panic("Duplicate memory release!") - } - call.ref = false rpcCallPool.Put(call) } diff --git a/util/sync/MemPool.go b/util/sync/MemPool.go index caecf87..4a97b70 100644 --- a/util/sync/MemPool.go +++ b/util/sync/MemPool.go @@ -3,8 +3,8 @@ package sync import sysSync "sync" type Pool struct { - New func()interface{} //构建对象函数 C chan interface{} //最大缓存的数量 + syncPool sysSync.Pool } type IPoolData interface { @@ -14,18 +14,17 @@ type IPoolData interface { UnRef() } -type poolEx struct{ +type PoolEx struct{ C chan IPoolData //最大缓存的数量 syncPool sysSync.Pool } - func (pool *Pool) Get() interface{}{ select { case d := <-pool.C: return d default: - return pool.New() + return pool.syncPool.Get() } return nil @@ -35,11 +34,19 @@ func (pool *Pool) Put(data interface{}){ select { case pool.C <- data: default: + pool.syncPool.Put(data) } } -func NewPoolEx(C chan IPoolData,New func()IPoolData) *poolEx{ - var pool poolEx +func NewPool(C chan interface{},New func()interface{}) *Pool{ + var p Pool + p.C = C + p.syncPool.New = New + return &p +} + +func NewPoolEx(C chan IPoolData,New func()IPoolData) *PoolEx{ + var pool PoolEx pool.C = C //pool.New = New pool.syncPool.New = func() interface{} { @@ -48,15 +55,13 @@ func NewPoolEx(C chan IPoolData,New func()IPoolData) *poolEx{ return &pool } -func (pool *poolEx) Get() IPoolData{ +func (pool *PoolEx) Get() IPoolData{ select { case d := <-pool.C: d.Ref() - d.Reset() return d default: data := pool.syncPool.Get().(IPoolData) - data.Reset() data.Ref() return data } @@ -64,11 +69,11 @@ func (pool *poolEx) Get() IPoolData{ return nil } -func (pool *poolEx) Put(data IPoolData){ +func (pool *PoolEx) Put(data IPoolData){ if data.IsRef() == false { panic("Repeatedly freeing memory") } - + data.Reset() data.UnRef() select { case pool.C <- data: diff --git a/util/timer/timer.go b/util/timer/timer.go index 27fd036..8145ba6 100644 --- a/util/timer/timer.go +++ b/util/timer/timer.go @@ -120,6 +120,13 @@ func (t *Timer) GetName() string{ } func (t *Timer) Reset(){ + t.name = "" + t.cancelled = false + t.C = nil + t.interval = 0 + t.cb = nil + t.AdditionData = nil + t.rOpen = false } func (t *Timer) IsRef()bool{