mirror of
https://github.com/duanhf2012/origin.git
synced 2026-03-02 11:21:11 +08:00
avoid logging disaster when report channel busy in Queue mode
This commit is contained in:
@@ -138,6 +138,7 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
"unicode"
|
"unicode"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
@@ -273,7 +274,42 @@ func (server *Server) ProcessQueue(name string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//定时报告队列超负荷运行
|
||||||
|
const reportDur = time.Minute
|
||||||
|
chCap := int64(cap(chanRpc))
|
||||||
|
reportLimit := chCap * 1 / 2
|
||||||
|
errNum := int64(0)
|
||||||
|
lastSize := int64(0)
|
||||||
|
maxSize := int64(0)
|
||||||
|
totalSize := int64(0)
|
||||||
|
lastReportTime := time.Now()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
//定时报告channel有没有超负荷运行
|
||||||
|
if size := int64(len(chanRpc)); size >= reportLimit || lastSize >= reportLimit {
|
||||||
|
if size >= reportLimit {
|
||||||
|
errNum++
|
||||||
|
totalSize += size
|
||||||
|
if size > maxSize {
|
||||||
|
maxSize = size
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dur := time.Now().Sub(lastReportTime)
|
||||||
|
if dur >= reportDur {
|
||||||
|
avgSize := int64(0)
|
||||||
|
if errNum > 0 {
|
||||||
|
avgSize = totalSize / errNum
|
||||||
|
}
|
||||||
|
orginservice.GetLogger().Printf(orginservice.LEVER_WARN, "RpcServer.ProcessQueue(%s) %d erros when channel size reaches %d/%d maxSize=%d avgSize=%d during last %s",
|
||||||
|
name, errNum, reportLimit, chCap, maxSize, avgSize, dur)
|
||||||
|
errNum = 0
|
||||||
|
maxSize = 0
|
||||||
|
totalSize = 0
|
||||||
|
lastSize = size
|
||||||
|
lastReportTime = time.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rpcData := <-chanRpc
|
rpcData := <-chanRpc
|
||||||
rpcData.service.call(rpcData.server, rpcData.sending, rpcData.wg, rpcData.mtype, rpcData.req, rpcData.argv, rpcData.replyv, rpcData.codec)
|
rpcData.service.call(rpcData.server, rpcData.sending, rpcData.wg, rpcData.mtype, rpcData.req, rpcData.argv, rpcData.replyv, rpcData.codec)
|
||||||
}
|
}
|
||||||
@@ -560,7 +596,8 @@ func (server *Server) ServeCodec(codec ServerCodec) {
|
|||||||
rpcChan, ok := server.mapCallQueue[service.name]
|
rpcChan, ok := server.mapCallQueue[service.name]
|
||||||
if ok == true {
|
if ok == true {
|
||||||
if len(rpcChan) >= MAX_RPCDATA_QUEUE_COUNT {
|
if len(rpcChan) >= MAX_RPCDATA_QUEUE_COUNT {
|
||||||
orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s chan overload %d", service.name, MAX_RPCDATA_QUEUE_COUNT)
|
//不在这里写日志了 否则RPC繁忙 这里会刷日志把磁盘刷爆 ProcessQueue会记录channel繁忙的日志
|
||||||
|
//orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s chan overload %d", service.name, MAX_RPCDATA_QUEUE_COUNT)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user