From 4c6d72fd658244b0dcfe43c1979d2d1a1cff6c48 Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Tue, 25 Feb 2020 16:36:43 +0800 Subject: [PATCH] =?UTF-8?q?1.websocket=E6=8E=A5=E5=8F=97=E6=B6=88=E6=81=AF?= =?UTF-8?q?=EF=BC=8C=E6=AF=8F15=E7=A7=92=E7=9B=91=E6=8E=A7=E6=9C=80?= =?UTF-8?q?=E5=A4=A7=E8=80=97=E6=97=B6=E6=B6=88=E6=81=AF=E7=B1=BB=E5=9E=8B?= =?UTF-8?q?,=E6=9F=90=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E8=B6=85?= =?UTF-8?q?=E8=BF=87300ms=E4=BC=9A=E6=9C=89=E7=9B=91=E6=8E=A7=E6=97=A5?= =?UTF-8?q?=E5=BF=97=202.GoQueue=E5=A4=84=E7=90=86=E9=98=9F=E5=88=97?= =?UTF-8?q?=E7=9B=91=E6=8E=A7=EF=BC=8C=E6=AF=8F1=E5=88=86=E9=92=9F?= =?UTF-8?q?=E6=B1=87=E6=8A=A5=E7=A7=AF=E5=8E=8B=E5=A4=84=E7=90=86=E7=9A=84?= =?UTF-8?q?=E6=9C=80=E5=A4=A7=E9=87=8F=E3=80=82=E5=B9=B6=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E6=AD=BB=E5=BE=AA=E7=8E=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Test/main.go | 5 ++ Test2/Helloworld/config/cluster.json | 3 +- network/websocketserver.go | 23 +++++++- originnode/node.go | 6 +++ rpc/server.go | 46 ++++++---------- service/DeadForMonitor.go | 78 ++++++++++++++++++++++++++++ service/Module.go | 12 +++-- util/Timer.go | 6 +++ 8 files changed, 143 insertions(+), 36 deletions(-) create mode 100644 service/DeadForMonitor.go diff --git a/Test/main.go b/Test/main.go index feaea83..d87bbf2 100644 --- a/Test/main.go +++ b/Test/main.go @@ -7,6 +7,7 @@ import ( "github.com/duanhf2012/origin/originnode" "github.com/duanhf2012/origin/sysservice" "github.com/duanhf2012/origin/sysservice/originhttp" + "time" ) @@ -31,6 +32,9 @@ func main() { return } + //打开Module死循环监控 + node.EnableMonitorModule(time.Second*5) + nodeCfg, _ := cluster.ReadNodeConfig("./config/nodeconfig.json", cluster.GetNodeId()) httpserver := originhttp.NewHttpServerService(nodeCfg.HttpPort) // http服务 for _, ca := range nodeCfg.CAFile { @@ -46,6 +50,7 @@ func main() { httpserver.SetPrintRequestTime(true) node.SetupService(httpserver,pTcpService) + node.Init() node.Start() } diff --git a/Test2/Helloworld/config/cluster.json b/Test2/Helloworld/config/cluster.json index b515de2..3980558 100644 --- a/Test2/Helloworld/config/cluster.json +++ b/Test2/Helloworld/config/cluster.json @@ -9,7 +9,8 @@ "NodeName": "N_Node1", "ServiceList": [ "TestService1", - "TestService2" + "TestService2", + "HttpServerService" ], "ClusterNode":[] }, diff --git a/network/websocketserver.go b/network/websocketserver.go index 1eb16c6..f42f952 100644 --- a/network/websocketserver.go +++ b/network/websocketserver.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "errors" "fmt" + "github.com/duanhf2012/origin/util" "net/http" "os" "runtime/debug" @@ -79,7 +80,6 @@ const ( ) func (slf *WebsocketServer) Init(port uint16) { - slf.port = port slf.mapClient = make(map[uint64]*WSClient) } @@ -88,7 +88,7 @@ func (slf *WebsocketServer) CreateClient(conn *websocket.Conn) *WSClient { slf.locker.Lock() slf.maxClientid++ clientid := slf.maxClientid - pclient := &WSClient{clientid, conn, make(chan WSMessage, MAX_MSG_COUNT)} + pclient := &WSClient{clientid, conn, make(chan WSMessage, MAX_MSG_COUNT+1)} slf.mapClient[pclient.clientid] = pclient slf.locker.Unlock() @@ -245,6 +245,12 @@ func (slf *BaseMessageReciver) startReadMsg(pclient *WSClient) { } }() + var maxTimeStamp int64 + var maxMsgType int + logMinMsgTime :=time.Millisecond*300 + + statisticsIntervalTm := util.Timer{} + statisticsIntervalTm.SetupTimer(1000 * 15)//15秒间隔 for { pclient.conn.SetReadDeadline(time.Now().Add(15 * time.Second)) msgtype, message, err := pclient.conn.ReadMessage() @@ -255,7 +261,20 @@ func (slf *BaseMessageReciver) startReadMsg(pclient *WSClient) { return } + if statisticsIntervalTm.CheckTimeOut() { + service.GetLogger().Printf(service.LEVER_INFO, "MaxMsgtype:%d,diff:%d",maxMsgType,maxTimeStamp) + } + //记录处理时间 + startRecvTm := time.Now().UnixNano() slf.messageReciver.OnRecvMsg(pclient.clientid, msgtype, message) + diff := time.Now().UnixNano() - startRecvTm + if diff> maxTimeStamp{ + maxTimeStamp = diff + maxMsgType = msgtype + } + if diff >= int64(logMinMsgTime) { + service.GetLogger().Printf(service.LEVER_WARN, "Process slowly MaxMsgtype:%d,diff:%d",maxMsgType,maxTimeStamp) + } } } diff --git a/originnode/node.go b/originnode/node.go index 2ac77f9..b4133f4 100644 --- a/originnode/node.go +++ b/originnode/node.go @@ -5,6 +5,7 @@ import ( "log" "strconv" "strings" + "time" "github.com/duanhf2012/origin/util" @@ -167,3 +168,8 @@ func (s *COriginNode) GetSysLog() *sysservice.LogService { return logService.(*sysservice.LogService) } + +func (s *COriginNode) EnableMonitorModule(checkInterval time.Duration){ + service.EnableDeadForMonitor(checkInterval) +} + diff --git a/rpc/server.go b/rpc/server.go index 8f9eb4e..4f8e4f3 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -146,6 +146,7 @@ import ( orginservice "github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/util" + "github.com/duanhf2012/origin/util/uuid" ) const ( @@ -275,43 +276,28 @@ func (server *Server) ProcessQueue(name string) { } //定时报告队列超负荷运行 - const reportDur = time.Minute - chCap := int64(cap(chanRpc)) - reportLimit := chCap * 1 / 2 - errNum := int64(0) - lastSize := int64(0) - maxSize := int64(0) - totalSize := int64(0) - lastReportTime := time.Now() + var checktm util.Timer + checktm.SetupTimerEx(time.Minute*1) + maxSize := 0 + uuidkey := uuid.Rand().HexEx() for { - //定时报告channel有没有超负荷运行 - if size := int64(len(chanRpc)); size >= reportLimit || lastSize >= reportLimit { - if size >= reportLimit { - errNum++ - totalSize += size - if size > maxSize { - maxSize = size - } - } - dur := time.Now().Sub(lastReportTime) - if dur >= reportDur { - avgSize := int64(0) - if errNum > 0 { - avgSize = totalSize / errNum - } - orginservice.GetLogger().Printf(orginservice.LEVER_WARN, "RpcServer.ProcessQueue(%s) %d erros when channel size reaches %d/%d maxSize=%d avgSize=%d during last %s", - name, errNum, reportLimit, chCap, maxSize, avgSize, dur) - errNum = 0 - maxSize = 0 - totalSize = 0 - lastSize = size - lastReportTime = time.Now() + if checktm.CheckTimeOut() { + orginservice.GetLogger().Printf(orginservice.LEVER_WARN, "RpcServer.ProcessQueue(%s) max %d",name,maxSize) + maxSize = 0 + }else { + curSize := len(chanRpc) + if curSize > maxSize { + maxSize = curSize } } rpcData := <-chanRpc + + + orginservice.MonitorEnter(uuidkey,name) rpcData.service.call(rpcData.server, rpcData.sending, rpcData.wg, rpcData.mtype, rpcData.req, rpcData.argv, rpcData.replyv, rpcData.codec) + orginservice.MonitorLeave(uuidkey) } } diff --git a/service/DeadForMonitor.go b/service/DeadForMonitor.go new file mode 100644 index 0000000..4e0a0eb --- /dev/null +++ b/service/DeadForMonitor.go @@ -0,0 +1,78 @@ +package service + +import ( + "fmt" + "github.com/duanhf2012/origin/util" + "runtime/pprof" + "time" +) + +type ModuleMontior struct { + mapModule *util.MapEx +} + +type ModuleInfo struct { + enterStartTm int64 + mNameInfo string +} + +var moduleMontior ModuleMontior + + + +func MonitorEnter(uuid string,strMonitorInfo string){ + if moduleMontior.mapModule == nil { + return + } + moduleMontior.mapModule.Set(uuid, &ModuleInfo{enterStartTm:time.Now().Unix(),mNameInfo:strMonitorInfo}) +} + +func MonitorLeave(uuid string){ + if moduleMontior.mapModule == nil { + return + } + + moduleMontior.mapModule.Del(uuid) +} + + + +func ReportDeadFor(){ + if moduleMontior.mapModule == nil { + return + } + moduleMontior.mapModule.RLockRange(func(key interface{}, value interface{}) { + if value != nil { + pModuleInfo := value.(*ModuleInfo) + //超过5分钟认为dead for + if time.Now().Unix() - pModuleInfo.enterStartTm > 300 { + GetLogger().Printf(LEVER_FATAL, "module is %s, Dead cycle\n", pModuleInfo.mNameInfo) + } + } + }) +} + +func EnableDeadForMonitor(checkInterval time.Duration){ + moduleMontior.mapModule = util.NewMapEx() + var tmInval util.Timer + tmInval.SetupTimer(int32(checkInterval.Milliseconds())) + go func(){ + for { + time.Sleep(time.Second*5) + if tmInval.CheckTimeOut(){ + ReportDeadFor() + ReportPprof() + } + } + }() +} + + +func ReportPprof(){ + strReport := "" + for _, p := range pprof.Profiles() { + strReport += fmt.Sprintf("Name %s,count %d\n",p.Name(),p.Count()) + } + + GetLogger().Printf(LEVER_INFO, "PProf %s\n", strReport) +} \ No newline at end of file diff --git a/service/Module.go b/service/Module.go index ca7fd16..fc5e458 100644 --- a/service/Module.go +++ b/service/Module.go @@ -2,11 +2,11 @@ package service import ( "fmt" + "github.com/duanhf2012/origin/util" + "github.com/duanhf2012/origin/util/uuid" "runtime/debug" "sync" "sync/atomic" - - "github.com/duanhf2012/origin/util" ) const ( @@ -328,6 +328,9 @@ func (slf *BaseModule) RunModule(module IModule) { timer.SetupTimer(1000) slf.WaitGroup.Add(1) defer slf.WaitGroup.Done() + + uuidkey := uuid.Rand().HexEx() + moduleTypeName := fmt.Sprintf("%T",module) for { if atomic.LoadInt32(&slf.corouterstatus) != 0 { module.OnEndRun() @@ -346,11 +349,14 @@ func (slf *BaseModule) RunModule(module IModule) { } } + MonitorEnter(uuidkey,moduleTypeName) if module.OnRun() == false { module.OnEndRun() + MonitorLeave(uuidkey) GetLogger().Printf(LEVER_INFO, "OnEndRun module %T...", module) return } - } + MonitorLeave(uuidkey) + } } diff --git a/util/Timer.go b/util/Timer.go index 23eae0d..b016d45 100644 --- a/util/Timer.go +++ b/util/Timer.go @@ -18,6 +18,12 @@ func (slf *Timer) SetupTimer(ms int32) { slf.timeinterval = int64(ms) * 1e6 } +func (slf *Timer) SetupTimerEx(tm time.Duration) { + slf.lasttime = time.Now().UnixNano() + slf.timeinterval = int64(tm) +} + + func (slf *Timer) SetupTimerDouble() { slf.lasttime = time.Now().UnixNano() slf.timeinterval *= 2