diff --git a/rpc/server.go b/rpc/server.go index ab5e9fc..84599c1 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -138,6 +138,7 @@ import ( "reflect" "strings" "sync" + "time" "unicode" "unicode/utf8" @@ -273,7 +274,42 @@ func (server *Server) ProcessQueue(name string) { return } + //定时报告队列超负荷运行 + const reportDur = time.Minute + chCap := int64(cap(chanRpc)) + reportLimit := chCap * 1 / 2 + errNum := int64(0) + lastSize := int64(0) + maxSize := int64(0) + totalSize := int64(0) + lastReportTime := time.Now() + for { + //定时报告channel有没有超负荷运行 + if size := int64(len(chanRpc)); size >= reportLimit || lastSize >= reportLimit { + if size >= reportLimit { + errNum++ + totalSize += size + if size > maxSize { + maxSize = size + } + } + dur := time.Now().Sub(lastReportTime) + if dur >= reportDur { + avgSize := int64(0) + if errNum > 0 { + avgSize = totalSize / errNum + } + orginservice.GetLogger().Printf(orginservice.LEVER_WARN, "RpcServer.ProcessQueue(%s) %d erros when channel size reaches %d/%d maxSize=%d avgSize=%d during last %s", + name, errNum, reportLimit, chCap, maxSize, avgSize, dur) + errNum = 0 + maxSize = 0 + totalSize = 0 + lastSize = size + lastReportTime = time.Now() + } + } + rpcData := <-chanRpc rpcData.service.call(rpcData.server, rpcData.sending, rpcData.wg, rpcData.mtype, rpcData.req, rpcData.argv, rpcData.replyv, rpcData.codec) } @@ -560,7 +596,8 @@ func (server *Server) ServeCodec(codec ServerCodec) { rpcChan, ok := server.mapCallQueue[service.name] if ok == true { if len(rpcChan) >= MAX_RPCDATA_QUEUE_COUNT { - orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s chan overload %d", service.name, MAX_RPCDATA_QUEUE_COUNT) + //不在这里写日志了 否则RPC繁忙 这里会刷日志把磁盘刷爆 ProcessQueue会记录channel繁忙的日志 + //orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s chan overload %d", service.name, MAX_RPCDATA_QUEUE_COUNT) continue }