From 9e128759f6d0756007a368e9ea0e10a68a31f07f Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Thu, 2 Apr 2020 15:25:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=80=A7=E8=83=BD=E5=88=86?= =?UTF-8?q?=E6=9E=90=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 4 +- example/main.go | 5 ++ node/node.go | 13 ++++ profiler/profiler.go | 180 +++++++++++++++++++++++++++++++++++++++++++ rpc/client.go | 3 +- rpc/server.go | 2 +- service/module.go | 4 +- service/service.go | 48 +++++++++++- util/timer/timer.go | 13 +++- 9 files changed, 262 insertions(+), 10 deletions(-) create mode 100644 profiler/profiler.go diff --git a/cluster/cluster.go b/cluster/cluster.go index c8c5be5..aa9470d 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -69,8 +69,8 @@ func (slf *Cluster) Init(currentNodeId int) error{ rpcinfo.nodeinfo = nodeinfo rpcinfo.client = &rpc.Client{} if nodeinfo.NodeId == currentNodeId { - rpcinfo.client.Connect("localhost") - //rpcinfo.client.Connect(nodeinfo.ListenAddr) + //rpcinfo.client.Connect("localhost") + rpcinfo.client.Connect(nodeinfo.ListenAddr) }else{ rpcinfo.client.Connect(nodeinfo.ListenAddr) } diff --git a/example/main.go b/example/main.go index 7171bfe..48eed8f 100644 --- a/example/main.go +++ b/example/main.go @@ -92,6 +92,8 @@ func (slf *Module4) OnRelease() { } func (slf *TestServiceCall) OnInit() error { + slf.OpenProfiler() + //slf.AfterFunc(time.Second*1,slf.Run) slf.AfterFunc(time.Second*1,slf.Test) moduleid1,_ = slf.AddModule(&Module1{}) @@ -182,6 +184,7 @@ func (slf *TestService1) RPC_Test(a *Param,b *Param) error { } func (slf *TestService1) OnInit() error { + slf.OpenProfiler() return nil } /* @@ -213,6 +216,7 @@ func (slf *TestServiceCall) TestDB() { } func (slf *TestService2) OnInit() error { + slf.OpenProfiler() return nil } @@ -221,6 +225,7 @@ func main(){ gateService := &GateService.GateService{} tcpService.SetEventReciver(gateService) node.Setup(tcpService,gateService) + node.OpenProfilerReport(time.Second*10) node.Start() } diff --git a/node/node.go b/node/node.go index f51d9f4..961658b 100644 --- a/node/node.go +++ b/node/node.go @@ -5,18 +5,21 @@ import ( "github.com/duanhf2012/origin/cluster" "github.com/duanhf2012/origin/console" "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/profiler" "github.com/duanhf2012/origin/service" "io/ioutil" "os" "os/signal" "strconv" "syscall" + "time" ) var closeSig chan bool var sigs chan os.Signal var nodeId int var preSetupService []service.IService //预安装 +var profilerInterval time.Duration func init() { closeSig = make(chan bool,1) @@ -111,12 +114,18 @@ func startNode(paramNodeId interface{}) error { service.Start() writeProcessPid() bRun := true + var pProfilerTicker *time.Ticker = &time.Ticker{} + if profilerInterval>0 { + pProfilerTicker = time.NewTicker(profilerInterval) + } for bRun { select { case <-sigs: log.Debug("receipt stop signal.") bRun = false + case <- pProfilerTicker.C: + profiler.Report() } } @@ -147,3 +156,7 @@ func SetSysLog(strLevel string, pathname string, flag int){ logs,_:= log.New(strLevel,pathname,flag) log.Export(logs) } + +func OpenProfilerReport(interval time.Duration){ + profilerInterval = interval +} diff --git a/profiler/profiler.go b/profiler/profiler.go new file mode 100644 index 0000000..8fb0ed3 --- /dev/null +++ b/profiler/profiler.go @@ -0,0 +1,180 @@ +package profiler + +import ( + "container/list" + "fmt" + "github.com/duanhf2012/origin/log" + "sync" + "time" +) + +//最大超长时间,一般可以认为是死锁或者死循环,或者极差的性能问题 +var Default_MaxOverTime time.Duration = 5*time.Second +//超过该时间将会监控报告 +var Default_OverTime time.Duration = 10*time.Millisecond +var Default_MaxRecordNum int = 100 //最大记录条数 + +type Element struct { + tagName string + pushTime time.Time +} + +type RecordType int +const ( + MaxOverTime_Type = 1 + OverTime_Type =2 + ) + + +type Record struct { + RType RecordType + CostTime time.Duration + RecordName string +} + +type Profiler struct { + stack *list.List //Element + stackLocker sync.RWMutex + + record *list.List //Record + + callNum int //调用次数 + totalCostTime time.Duration //总消费时间长 + + maxOverTime time.Duration + overTime time.Duration + maxRecordNum int +} + +var mapProfiler map[string]*Profiler + +func init(){ + mapProfiler = map[string]*Profiler{} +} + +func RegProfiler(profilerName string) *Profiler { + if _,ok :=mapProfiler[profilerName];ok==true { + return nil + } + + pProfiler := &Profiler{stack:list.New(),record:list.New(),maxOverTime:Default_MaxOverTime,overTime:Default_OverTime} + mapProfiler[profilerName] =pProfiler + return pProfiler +} + +func (slf *Profiler) Push(tag string) { + slf.stackLocker.Lock() + slf.stack.PushBack(&Element{tagName:tag,pushTime:time.Now()}) + slf.stackLocker.Unlock() +} + +func (slf *Profiler) pushRecordLog(record *Record){ + if slf.record.Len()>=Default_MaxRecordNum{ + front := slf.stack.Front() + if front!=nil { + slf.stack.Remove(front) + } + } + + slf.record.PushBack(record) +} + +func (slf *Profiler) check(pElem *Element) (*Record,time.Duration) { + if pElem == nil { + return nil,0 + } + + subTm := time.Now().Sub(pElem.pushTime) + if subTm < slf.overTime { + return nil,subTm + } + + record := Record{ + RType: OverTime_Type, + CostTime: subTm, + RecordName: pElem.tagName, + } + + if subTm>slf.maxOverTime { + record.RType = MaxOverTime_Type + } + + return &record,subTm +} + +func (slf *Profiler) Pop() { + slf.stackLocker.Lock() + + front := slf.stack.Front() + if front!=nil && front.Value!=nil { + pElement := front.Value.(*Element) + pElem,subTm := slf.check(pElement) + slf.callNum+=1 + slf.totalCostTime += subTm + if pElem != nil { + slf.pushRecordLog(pElem) + } + slf.stack.Remove(front) + } + + slf.stackLocker.Unlock() +} + +type ReportFunType func(name string,callNum int,costTime time.Duration,record *list.List) + +var reportFunc ReportFunType =DefaultReportFunction + +func SetReportFunction(reportFun ReportFunType) { + reportFunc = reportFun +} + +func DefaultReportFunction(name string,callNum int,costTime time.Duration,record *list.List){ + if record.Len()<=0 { + return + } + + var strReport string + strReport = "Profiler report tag "+name+":\n" + strReport += fmt.Sprintf("process count %d,take time %d Milliseconds,average %d Milliseconds/per.\n",callNum,costTime.Milliseconds(),costTime.Milliseconds()/int64(callNum)) + elem := record.Front() + var strTypes string + for elem!=nil { + pRecord := elem.Value.(*Record) + if pRecord.RType == MaxOverTime_Type { + strTypes = "very slow process" + }else{ + strTypes = "slow process" + } + + strReport += fmt.Sprintf("%s:%s is take %d Milliseconds\n",strTypes,pRecord.RecordName,pRecord.CostTime.Milliseconds()) + elem = elem.Next() + } + + log.Release(strReport) +} + +func Report() { + var record *list.List + for name,prof := range mapProfiler{ + prof.stackLocker.RLock() + if prof.record.Len() == 0 { + prof.stackLocker.RUnlock() + continue + } + + //取栈的队首,是否存在异常MaxOverTime数据 + pElem := prof.stack.Front() + if pElem!=nil && pElem.Value!=nil{ + pRecord,_ := prof.check(pElem.Value.(*Element)) + if pRecord!=nil { + prof.pushRecordLog(pRecord) + } + } + record = prof.record + prof.record = list.New() + prof.stackLocker.RUnlock() + + DefaultReportFunction(name,prof.callNum,prof.totalCostTime,record) + } +} + diff --git a/rpc/client.go b/rpc/client.go index 12c68a1..ed261a0 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -59,6 +59,7 @@ func (slf *Client) AsycGo(rpcHandler IRpcHandler,serviceMethod string,callback r call.Reply = replyParam call.callback = &callback call.rpcHandler = rpcHandler + call.ServiceMethod = serviceMethod request := &RpcRequest{} request.NoReply = false @@ -95,7 +96,7 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply call := new(Call) call.done = make(chan *Call,1) call.Reply = reply - + call.ServiceMethod = serviceMethod request := &RpcRequest{} request.NoReply = noReply call.Arg = args diff --git a/rpc/server.go b/rpc/server.go index 5483c92..64b1239 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -15,7 +15,7 @@ var LittleEndian bool type Call struct { Seq uint64 - //ServiceMethod string + ServiceMethod string Arg interface{} Reply interface{} Respone *RpcResponse diff --git a/service/module.go b/service/module.go index 2a4e048..8873ba1 100644 --- a/service/module.go +++ b/service/module.go @@ -6,6 +6,7 @@ import ( "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/util/timer" "reflect" + "runtime" "time" ) @@ -170,7 +171,8 @@ func (slf *Module) AfterFunc(d time.Duration, cb func()) *timer.Timer { slf.mapActiveTimer =map[*timer.Timer]interface{}{} } - tm := slf.dispatcher.AfterFuncEx(d,func(t *timer.Timer){ + funName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() + tm := slf.dispatcher.AfterFuncEx(funName,d,func(t *timer.Timer){ cb() delete(slf.mapActiveTimer,t) }) diff --git a/service/service.go b/service/service.go index f778055..ae88db6 100644 --- a/service/service.go +++ b/service/service.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/profiler" "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/util/timer" "reflect" @@ -27,6 +28,8 @@ type IService interface { Start() GetRpcHandler() rpc.IRpcHandler GetServiceCfg()interface{} + OpenProfiler() + GetProfiler() *profiler.Profiler } @@ -41,6 +44,7 @@ type Service struct { gorouterNum int32 startStatus bool + profiler *profiler.Profiler //性能分析器 } func (slf *Service) OnSetup(iservice IService){ @@ -49,8 +53,14 @@ func (slf *Service) OnSetup(iservice IService){ } } -func (slf *Service) Init(iservice IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) { +func (slf *Service) OpenProfiler(){ + slf.profiler = profiler.RegProfiler(slf.GetName()) + if slf.profiler==nil { + log.Fatal("rofiler.RegProfiler %s fail.",slf.GetName()) + } +} +func (slf *Service) Init(iservice IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) { slf.dispatcher =timer.NewDispatcher(timerDispatcherLen) slf.this = iservice slf.InitRpcHandler(iservice.(rpc.IRpcHandler),getClientFun,getServerFun) @@ -96,13 +106,38 @@ func (slf *Service) Run() { case <- closeSig: bStop = true case rpcRequest :=<- rpcRequestChan: + if slf.profiler!=nil { + slf.profiler.Push("Req_"+rpcRequest.ServiceMethod) + } + slf.GetRpcHandler().HandlerRpcRequest(rpcRequest) + if slf.profiler!=nil { + slf.profiler.Pop() + } case rpcResponeCB := <- rpcResponeCallBack: - slf.GetRpcHandler().HandlerRpcResponeCB(rpcResponeCB) + if slf.profiler!=nil { + slf.profiler.Push("Res_" + rpcResponeCB.ServiceMethod) + } + slf.GetRpcHandler().HandlerRpcResponeCB(rpcResponeCB) + if slf.profiler!=nil { + slf.profiler.Pop() + } case ev := <- eventChan: - slf.EventHandler(slf.this.(event.IEventProcessor),ev) + if slf.profiler!=nil { + slf.profiler.Push(fmt.Sprintf("Event_%d", int(ev.Type))) + } + slf.EventHandler(slf.this.(event.IEventProcessor),ev) + if slf.profiler!=nil { + slf.profiler.Pop() + } case t := <- slf.dispatcher.ChanTimer: + if slf.profiler!=nil { + slf.profiler.Push(fmt.Sprintf("Timer_%s", t.GetFunctionName())) + } t.Cb() + if slf.profiler!=nil { + slf.profiler.Pop() + } } if bStop == true { @@ -114,7 +149,6 @@ func (slf *Service) Run() { break } } - } func (slf *Service) GetName() string{ @@ -149,3 +183,9 @@ func (slf *Service) Wait(){ func (slf *Service) GetServiceCfg()interface{}{ return slf.serviceCfg } + + +func (slf *Service) GetProfiler() *profiler.Profiler{ + return slf.profiler +} + diff --git a/util/timer/timer.go b/util/timer/timer.go index 435babc..fa5d335 100644 --- a/util/timer/timer.go +++ b/util/timer/timer.go @@ -3,6 +3,7 @@ package timer import ( "fmt" "github.com/duanhf2012/origin/log" + "reflect" "runtime" "time" ) @@ -23,6 +24,7 @@ type Timer struct { t *time.Timer cb func() cbex func(*Timer) + name string } func (t *Timer) Stop() { @@ -30,6 +32,10 @@ func (t *Timer) Stop() { t.cb = nil } +func (t *Timer) GetFunctionName() string { + return t.name +} + func (t *Timer) Cb() { defer func() { if r := recover(); r != nil { @@ -51,15 +57,20 @@ func (t *Timer) Cb() { func (disp *Dispatcher) AfterFunc(d time.Duration, cb func()) *Timer { t := new(Timer) t.cb = cb + t.name = reflect.TypeOf(cb).Name() + t.t = time.AfterFunc(d, func() { disp.ChanTimer <- t }) + return t } -func (disp *Dispatcher) AfterFuncEx(d time.Duration, cbex func(timer *Timer)) *Timer { +func (disp *Dispatcher) AfterFuncEx(funName string,d time.Duration, cbex func(timer *Timer)) *Timer { t := new(Timer) t.cbex = cbex + t.name = funName//reflect.TypeOf(cbex).Name() + //t.name = runtime.FuncForPC(reflect.ValueOf(cbex).Pointer()).Name() t.t = time.AfterFunc(d, func() { disp.ChanTimer <- t })