1. WebSocket message receiving: track the most time-consuming message type and log it every 15 seconds; any message that takes more than 300ms to process also gets a monitoring log entry.

2. GoQueue processing-queue monitoring: report the maximum backlog once per minute, and watch for dead loops. (A minimal sketch of the shared watchdog pattern follows.)
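Both changes follow the same watchdog shape: stamp when work begins, compare against the stamp later, and log once a threshold is crossed. Below is a minimal, self-contained sketch of that shape; the names (watchdog, enter, leave, report) are illustrative only — the framework's real implementation is service/DeadForMonitor.go, added further down in this commit.

package main

import (
	"log"
	"sync"
	"time"
)

// watchdog is an illustrative stand-in for this commit's monitors:
// work registers itself when it starts, and a periodic check flags
// anything that has been running longer than the limit.
type watchdog struct {
	mu      sync.Mutex
	started map[string]time.Time
}

func (w *watchdog) enter(key string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.started[key] = time.Now()
}

func (w *watchdog) leave(key string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	delete(w.started, key)
}

// report logs every registered task running longer than limit.
func (w *watchdog) report(limit time.Duration) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for key, t := range w.started {
		if d := time.Since(t); d > limit {
			log.Printf("task %s running for %s, possible dead loop", key, d)
		}
	}
}

func main() {
	w := &watchdog{started: map[string]time.Time{}}
	w.enter("demo")
	time.Sleep(10 * time.Millisecond)
	w.report(5 * time.Millisecond) // logs "demo"
	w.leave("demo")
}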
duanhf2012
2020-02-25 16:36:43 +08:00
parent 5680712396
commit 4c6d72fd65
8 changed files with 143 additions and 36 deletions


@@ -7,6 +7,7 @@ import (
"github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/sysservice"
"github.com/duanhf2012/origin/sysservice/originhttp"
"time"
)
@@ -31,6 +32,9 @@ func main() {
return
}
// Enable dead-loop monitoring for Modules
node.EnableMonitorModule(time.Second*5)
nodeCfg, _ := cluster.ReadNodeConfig("./config/nodeconfig.json", cluster.GetNodeId())
httpserver := originhttp.NewHttpServerService(nodeCfg.HttpPort) // http服务
for _, ca := range nodeCfg.CAFile {
@@ -46,6 +50,7 @@ func main() {
httpserver.SetPrintRequestTime(true)
node.SetupService(httpserver,pTcpService)
node.Init()
node.Start()
}


@@ -9,7 +9,8 @@
"NodeName": "N_Node1",
"ServiceList": [
"TestService1",
"TestService2"
"TestService2",
"HttpServerService"
],
"ClusterNode":[]
},


@@ -4,6 +4,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"github.com/duanhf2012/origin/util"
"net/http"
"os"
"runtime/debug"
@@ -79,7 +80,6 @@ const (
)
func (slf *WebsocketServer) Init(port uint16) {
slf.port = port
slf.mapClient = make(map[uint64]*WSClient)
}
@@ -88,7 +88,7 @@ func (slf *WebsocketServer) CreateClient(conn *websocket.Conn) *WSClient {
slf.locker.Lock()
slf.maxClientid++
clientid := slf.maxClientid
-	pclient := &WSClient{clientid, conn, make(chan WSMessage, MAX_MSG_COUNT)}
+	pclient := &WSClient{clientid, conn, make(chan WSMessage, MAX_MSG_COUNT+1)}
slf.mapClient[pclient.clientid] = pclient
slf.locker.Unlock()
@@ -245,6 +245,12 @@ func (slf *BaseMessageReciver) startReadMsg(pclient *WSClient) {
}
}()
	var maxTimeStamp int64
	var maxMsgType int
	logMinMsgTime := time.Millisecond * 300
	statisticsIntervalTm := util.Timer{}
	statisticsIntervalTm.SetupTimer(1000 * 15) // 15-second interval
for {
pclient.conn.SetReadDeadline(time.Now().Add(15 * time.Second))
msgtype, message, err := pclient.conn.ReadMessage()
@@ -255,7 +261,20 @@ func (slf *BaseMessageReciver) startReadMsg(pclient *WSClient) {
return
}
		if statisticsIntervalTm.CheckTimeOut() {
			service.GetLogger().Printf(service.LEVER_INFO, "MaxMsgtype:%d,diff:%d", maxMsgType, maxTimeStamp)
			maxTimeStamp = 0 // reset so each 15-second report covers only its own window
		}
		// record the processing time of this message
		startRecvTm := time.Now().UnixNano()
		slf.messageReciver.OnRecvMsg(pclient.clientid, msgtype, message)
		diff := time.Now().UnixNano() - startRecvTm
		if diff > maxTimeStamp {
			maxTimeStamp = diff
			maxMsgType = msgtype
		}
		if diff >= int64(logMinMsgTime) {
			service.GetLogger().Printf(service.LEVER_WARN, "Process slowly Msgtype:%d,diff:%d", msgtype, diff)
		}
}
}
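The measurement itself is plain wall-clock timing around the handler call. The same idea in isolation, as a sketch — handle and timeHandler are hypothetical names, the commit inlines this logic in startReadMsg:

package main

import (
	"log"
	"time"
)

// timeHandler times one handler invocation and logs it when it exceeds
// slowLimit — the same per-message pattern startReadMsg applies.
func timeHandler(msgType int, data []byte, slowLimit time.Duration, handle func(int, []byte)) time.Duration {
	start := time.Now()
	handle(msgType, data)
	elapsed := time.Since(start)
	if elapsed >= slowLimit {
		log.Printf("slow message: type=%d took=%s", msgType, elapsed)
	}
	return elapsed
}

func main() {
	slow := func(t int, b []byte) { time.Sleep(400 * time.Millisecond) }
	timeHandler(1, nil, 300*time.Millisecond, slow) // logs a slow-message warning
}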


@@ -5,6 +5,7 @@ import (
"log"
"strconv"
"strings"
"time"
"github.com/duanhf2012/origin/util"
@@ -167,3 +168,8 @@ func (s *COriginNode) GetSysLog() *sysservice.LogService {
return logService.(*sysservice.LogService)
}
func (s *COriginNode) EnableMonitorModule(checkInterval time.Duration){
service.EnableDeadForMonitor(checkInterval)
}


@@ -146,6 +146,7 @@ import (
orginservice "github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/util"
"github.com/duanhf2012/origin/util/uuid"
)
const (
@@ -275,43 +276,28 @@ func (server *Server) ProcessQueue(name string) {
}
	// Periodically report when the queue is running overloaded
-	const reportDur = time.Minute
-	chCap := int64(cap(chanRpc))
-	reportLimit := chCap * 1 / 2
-	errNum := int64(0)
-	lastSize := int64(0)
-	maxSize := int64(0)
-	totalSize := int64(0)
-	lastReportTime := time.Now()
+	var checktm util.Timer
+	checktm.SetupTimerEx(time.Minute*1)
+	maxSize := 0
+	uuidkey := uuid.Rand().HexEx()
	for {
		// periodically report whether the channel is overloaded
-		if size := int64(len(chanRpc)); size >= reportLimit || lastSize >= reportLimit {
-			if size >= reportLimit {
-				errNum++
-				totalSize += size
-				if size > maxSize {
-					maxSize = size
-				}
-			}
-			dur := time.Now().Sub(lastReportTime)
-			if dur >= reportDur {
-				avgSize := int64(0)
-				if errNum > 0 {
-					avgSize = totalSize / errNum
-				}
-				orginservice.GetLogger().Printf(orginservice.LEVER_WARN, "RpcServer.ProcessQueue(%s) %d erros when channel size reaches %d/%d maxSize=%d avgSize=%d during last %s",
-					name, errNum, reportLimit, chCap, maxSize, avgSize, dur)
-				errNum = 0
-				maxSize = 0
-				totalSize = 0
-			}
-			lastSize = size
-			lastReportTime = time.Now()
-		}
+		if checktm.CheckTimeOut() {
+			orginservice.GetLogger().Printf(orginservice.LEVER_WARN, "RpcServer.ProcessQueue(%s) max %d", name, maxSize)
+			maxSize = 0
+		} else {
+			curSize := len(chanRpc)
+			if curSize > maxSize {
+				maxSize = curSize
+			}
+		}
		rpcData := <-chanRpc
+		orginservice.MonitorEnter(uuidkey, name)
		rpcData.service.call(rpcData.server, rpcData.sending, rpcData.wg, rpcData.mtype, rpcData.req, rpcData.argv, rpcData.replyv, rpcData.codec)
+		orginservice.MonitorLeave(uuidkey)
	}
}
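The rewrite trades the old windowed average and error counting for something cheaper: sample len(chanRpc) once per loop iteration, keep the maximum, and log it once per minute. A standalone sketch of that sampling loop, with illustrative names (drainAndSample, ch):

package main

import (
	"log"
	"time"
)

// drainAndSample consumes jobs from ch while tracking the largest
// backlog seen, reporting the maximum once per interval.
func drainAndSample(ch chan func(), interval time.Duration) {
	maxSize := 0
	next := time.Now().Add(interval)
	for job := range ch {
		if n := len(ch); n > maxSize {
			maxSize = n
		}
		if now := time.Now(); now.After(next) {
			log.Printf("backlog max=%d over last %s", maxSize, interval)
			maxSize = 0
			next = now.Add(interval)
		}
		job() // a hang here is what MonitorEnter/MonitorLeave detects
	}
}

func main() {
	ch := make(chan func(), 8)
	go drainAndSample(ch, time.Minute)
	for i := 0; i < 4; i++ {
		ch <- func() { time.Sleep(10 * time.Millisecond) }
	}
	time.Sleep(100 * time.Millisecond)
}

As in the commit's version, the backlog is sampled only when a job is dequeued, so an idle queue simply reports a maximum of zero.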

service/DeadForMonitor.go (new file, 78 lines)

@@ -0,0 +1,78 @@
package service
import (
"fmt"
"github.com/duanhf2012/origin/util"
"runtime/pprof"
"time"
)
type ModuleMontior struct {
mapModule *util.MapEx
}
type ModuleInfo struct {
enterStartTm int64
mNameInfo string
}
var moduleMontior ModuleMontior
// MonitorEnter records the start time of a unit of work under its uuid key.
func MonitorEnter(uuid string, strMonitorInfo string) {
if moduleMontior.mapModule == nil {
return
}
moduleMontior.mapModule.Set(uuid, &ModuleInfo{enterStartTm:time.Now().Unix(),mNameInfo:strMonitorInfo})
}
// MonitorLeave removes the entry once the unit of work completes an iteration.
func MonitorLeave(uuid string) {
if moduleMontior.mapModule == nil {
return
}
moduleMontior.mapModule.Del(uuid)
}
// ReportDeadFor logs every registered unit of work that has been running too long.
func ReportDeadFor() {
if moduleMontior.mapModule == nil {
return
}
moduleMontior.mapModule.RLockRange(func(key interface{}, value interface{}) {
if value != nil {
pModuleInfo := value.(*ModuleInfo)
// running for more than 5 minutes is treated as a dead loop
if time.Now().Unix() - pModuleInfo.enterStartTm > 300 {
GetLogger().Printf(LEVER_FATAL, "module is %s, Dead cycle\n", pModuleInfo.mNameInfo)
}
}
})
}
// EnableDeadForMonitor starts the background goroutine that periodically
// runs the dead-loop and pprof reports.
func EnableDeadForMonitor(checkInterval time.Duration) {
moduleMontior.mapModule = util.NewMapEx()
var tmInval util.Timer
tmInval.SetupTimer(int32(checkInterval.Milliseconds()))
go func(){
for {
time.Sleep(time.Second*5)
if tmInval.CheckTimeOut(){
ReportDeadFor()
ReportPprof()
}
}
}()
}
// ReportPprof logs the current count of every runtime pprof profile.
func ReportPprof() {
strReport := ""
for _, p := range pprof.Profiles() {
strReport += fmt.Sprintf("Name %s,count %d\n",p.Name(),p.Count())
}
GetLogger().Printf(LEVER_INFO, "PProf %s\n", strReport)
}
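The call pattern, mirroring how RunModule and ProcessQueue use the monitor elsewhere in this commit: one stable uuid key per goroutine, MonitorEnter/MonitorLeave around each iteration, so a call that never returns stays in the map and gets reported. In this sketch, monitoredLoop, doWork, and "MyWorker" are hypothetical; it assumes EnableDeadForMonitor was called during node startup.

package worker

import (
	"github.com/duanhf2012/origin/service"
	"github.com/duanhf2012/origin/util/uuid"
)

// monitoredLoop brackets each iteration of a long-lived goroutine with
// the dead-loop monitor.
func monitoredLoop(doWork func()) {
	uuidkey := uuid.Rand().HexEx()
	for {
		service.MonitorEnter(uuidkey, "MyWorker")
		doWork() // if this never returns, ReportDeadFor flags "MyWorker" after 5 minutes
		service.MonitorLeave(uuidkey)
	}
}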


@@ -2,11 +2,11 @@ package service
import (
	"fmt"
+	"github.com/duanhf2012/origin/util"
+	"github.com/duanhf2012/origin/util/uuid"
	"runtime/debug"
	"sync"
	"sync/atomic"
-	"github.com/duanhf2012/origin/util"
)
const (
@@ -328,6 +328,9 @@ func (slf *BaseModule) RunModule(module IModule) {
timer.SetupTimer(1000)
slf.WaitGroup.Add(1)
defer slf.WaitGroup.Done()
uuidkey := uuid.Rand().HexEx()
moduleTypeName := fmt.Sprintf("%T",module)
for {
if atomic.LoadInt32(&slf.corouterstatus) != 0 {
module.OnEndRun()
@@ -346,11 +349,14 @@ func (slf *BaseModule) RunModule(module IModule) {
}
}
MonitorEnter(uuidkey,moduleTypeName)
if module.OnRun() == false {
module.OnEndRun()
MonitorLeave(uuidkey)
GetLogger().Printf(LEVER_INFO, "OnEndRun module %T...", module)
return
}
}
MonitorLeave(uuidkey)
}
}


@@ -18,6 +18,12 @@ func (slf *Timer) SetupTimer(ms int32) {
slf.timeinterval = int64(ms) * 1e6
}
func (slf *Timer) SetupTimerEx(tm time.Duration) {
slf.lasttime = time.Now().UnixNano()
slf.timeinterval = int64(tm)
}
func (slf *Timer) SetupTimerDouble() {
	slf.lasttime = time.Now().UnixNano()
	slf.timeinterval *= 2
}
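SetupTimerEx is the Duration-typed twin of SetupTimer: SetupTimer converts milliseconds to nanoseconds (ms * 1e6), while SetupTimerEx stores the time.Duration's nanosecond count directly. Both pair with CheckTimeOut, as the new monitoring code does. A small usage sketch:

package main

import (
	"fmt"
	"time"

	"github.com/duanhf2012/origin/util"
)

func main() {
	var tm util.Timer
	tm.SetupTimerEx(time.Minute) // equivalent to tm.SetupTimer(60 * 1000)
	for {
		time.Sleep(time.Second)
		if tm.CheckTimeOut() { // fires once the interval has elapsed
			fmt.Println("one minute elapsed")
			return
		}
	}
}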