diff --git a/rpc/rpc.go b/rpc/rpc.go index 372d8e9..ea273ba 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -2,7 +2,7 @@ package rpc import ( "reflect" - "sync" + "github.com/duanhf2012/origin/util/sync" "time" ) @@ -29,8 +29,13 @@ func (r *Responder) IsInvalid() bool { } //var rpcResponsePool sync.Pool -var rpcRequestPool sync.Pool -var rpcCallPool sync.Pool +var rpcRequestPool = sync.NewPoolEx(make(chan sync.IPoolData,10240),func()sync.IPoolData{ + return &RpcRequest{} +}) + +var rpcCallPool = sync.NewPoolEx(make(chan sync.IPoolData,10240),func()sync.IPoolData{ + return &Call{done:make(chan *Call,1)} +}) type IRpcRequestData interface { @@ -72,16 +77,6 @@ type Call struct { callTime time.Time } -func init(){ - rpcRequestPool.New = func() interface{} { - return &RpcRequest{} - } - - rpcCallPool.New = func() interface{} { - return &Call{done:make(chan *Call,1)} - } -} - func (slf *RpcRequest) Clear() *RpcRequest{ slf.RpcRequestData = nil slf.localReply = nil @@ -92,6 +87,22 @@ func (slf *RpcRequest) Clear() *RpcRequest{ return slf } +func (slf *RpcRequest) Reset() { + slf.Clear() +} + +func (slf *RpcRequest) IsRef()bool{ + return slf.ref +} + +func (slf *RpcRequest) Ref(){ + slf.ref = true +} + +func (slf *RpcRequest) UnRef(){ + slf.ref = false +} + func (rpcResponse *RpcResponse) Clear() *RpcResponse{ rpcResponse.RpcResponseData = nil return rpcResponse @@ -113,6 +124,22 @@ func (call *Call) Clear() *Call{ return call } +func (call *Call) Reset() { + call.Clear() +} + +func (call *Call) IsRef()bool{ + return call.ref +} + +func (call *Call) Ref(){ + call.ref = true +} + +func (call *Call) UnRef(){ + call.ref = false +} + func (call *Call) Done() *Call{ return <-call.done } diff --git a/util/sync/MemPool.go b/util/sync/MemPool.go index ae7315b..caecf87 100644 --- a/util/sync/MemPool.go +++ b/util/sync/MemPool.go @@ -1,5 +1,7 @@ package sync +import sysSync "sync" + type Pool struct { New func()interface{} //构建对象函数 C chan interface{} //最大缓存的数量 @@ -12,11 +14,12 @@ type IPoolData interface { UnRef() } -type PoolEx struct{ - New func()IPoolData //构建对象函数 +type poolEx struct{ C chan IPoolData //最大缓存的数量 + syncPool sysSync.Pool } + func (pool *Pool) Get() interface{}{ select { case d := <-pool.C: @@ -35,22 +38,33 @@ func (pool *Pool) Put(data interface{}){ } } -func (pool *PoolEx) Get() IPoolData{ +func NewPoolEx(C chan IPoolData,New func()IPoolData) *poolEx{ + var pool poolEx + pool.C = C + //pool.New = New + pool.syncPool.New = func() interface{} { + return New() + } + return &pool +} + +func (pool *poolEx) Get() IPoolData{ select { case d := <-pool.C: d.Ref() d.Reset() return d default: - data := pool.New() + data := pool.syncPool.Get().(IPoolData) data.Reset() data.Ref() + return data } return nil } -func (pool *PoolEx) Put(data IPoolData){ +func (pool *poolEx) Put(data IPoolData){ if data.IsRef() == false { panic("Repeatedly freeing memory") } @@ -59,6 +73,8 @@ func (pool *PoolEx) Put(data IPoolData){ select { case pool.C <- data: default: + pool.syncPool.Put(data) } } + diff --git a/util/timer/timer.go b/util/timer/timer.go index 4471a2b..27fd036 100644 --- a/util/timer/timer.go +++ b/util/timer/timer.go @@ -2,9 +2,10 @@ package timer import ( "fmt" + "github.com/duanhf2012/origin/util/sync" "reflect" "runtime" - "sync" + "time" ) @@ -18,6 +19,8 @@ type Timer struct { cb func() AdditionData interface{} //定时器附加数据 rOpen bool //是否重新打开 + + ref bool } // Ticker @@ -30,17 +33,17 @@ type Cron struct { Timer } -var timerPool = sync.Pool{New: func() interface{}{ +var timerPool = sync.NewPoolEx(make(chan sync.IPoolData,1000),func() sync.IPoolData{ return &Timer{} -}} +}) -var cronPool = sync.Pool{New: func() interface{}{ +var cronPool = sync.NewPoolEx(make(chan sync.IPoolData,1000),func() sync.IPoolData{ return &Cron{} -}} +}) -var tickerPool = sync.Pool{New: func() interface{}{ +var tickerPool =sync.NewPoolEx(make(chan sync.IPoolData,1000),func() sync.IPoolData{ return &Ticker{} -}} +}) func newTimer(d time.Duration,c chan *Timer,cb func(),name string,additionData interface{}) *Timer{ if c == nil { @@ -116,6 +119,53 @@ func (t *Timer) GetName() string{ return t.name } +func (t *Timer) Reset(){ +} + +func (t *Timer) IsRef()bool{ + return t.ref +} + +func (t *Timer) Ref(){ + t.ref = true +} + +func (t *Timer) UnRef(){ + t.ref = false +} + +func (c *Cron) Reset(){ +} + +func (c *Cron) IsRef()bool{ + return c.ref +} + +func (c *Cron) Ref(){ + c.ref = true +} + +func (c *Cron) UnRef(){ + c.ref = false +} + +func (c *Ticker) Reset(){ +} + +func (c *Ticker) IsRef()bool{ + return c.ref +} + +func (c *Ticker) Ref(){ + c.ref = true +} + +func (c *Ticker) UnRef(){ + c.ref = false +} + + + func NewDispatcher(l int) *Dispatcher { disp := new(Dispatcher) disp.ChanTimer = make(chan *Timer, l)