From bb1e3337f6426d137c9255c1559c6d2ff3d07cab Mon Sep 17 00:00:00 2001 From: Ally Dale Date: Tue, 2 Jul 2019 09:43:19 +0800 Subject: [PATCH 1/5] optimization of bytes conversion --- network/websocketclient.go | 8 ++++---- network/websocketserver.go | 2 ++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/network/websocketclient.go b/network/websocketclient.go index fa687a0..6d82eb9 100644 --- a/network/websocketclient.go +++ b/network/websocketclient.go @@ -38,7 +38,7 @@ type WebsocketClient struct { timeoutsec time.Duration bRun bool - ping string + ping []byte } const ( @@ -71,12 +71,12 @@ func (ws *WebsocketClient) Init(slf IWebsocketClient, strurl, strProxyPath strin } ws.url = strurl - ws.ping = `ping` + ws.ping = []byte(`ping`) return nil } func (ws *WebsocketClient) SetPing(ping string) { - ws.ping = ping + ws.ping = []byte(ping) } //OnRun ... @@ -182,7 +182,7 @@ func (ws *WebsocketClient) writeMsg() error { case <-timerC: if ws.state == 2 { - err := ws.WriteMessage([]byte(ws.ping)) + err := ws.WriteMessage(ws.ping) if err != nil { service.GetLogger().Printf(service.LEVER_WARN, "websocket client is disconnect [%s],information is %v", ws.url, err) ws.state = 0 diff --git a/network/websocketserver.go b/network/websocketserver.go index 145f2ed..1eb16c6 100644 --- a/network/websocketserver.go +++ b/network/websocketserver.go @@ -23,6 +23,8 @@ type IWebsocketServer interface { CreateClient(conn *websocket.Conn) *WSClient Disconnect(clientid uint64) ReleaseClient(pclient *WSClient) + Clients() []uint64 + BroadcastMsg(messageType int, msg []byte) int } type IMessageReceiver interface { From 6b1404e2035418a68914e1e31e6ea4691565d503 Mon Sep 17 00:00:00 2001 From: Ally Dale Date: Tue, 2 Jul 2019 09:57:15 +0800 Subject: [PATCH 2/5] setup default logger --- service/Logger.go | 20 ++++++++++++++++++++ service/servicemanager.go | 6 +++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/service/Logger.go b/service/Logger.go index 7d4c4dc..76bec99 100644 --- a/service/Logger.go +++ b/service/Logger.go @@ -1,5 +1,9 @@ package service +import ( + "fmt" +) + const ( LEVER_UNKNOW = 0 LEVER_DEBUG = 1 @@ -10,8 +14,24 @@ const ( LEVEL_MAX = 6 ) +var defaultLogger = &LoggerFmt{} + type ILogger interface { Printf(level uint, format string, v ...interface{}) Print(level uint, v ...interface{}) SetLogLevel(level uint) } + +type LoggerFmt struct { +} + +func (slf *LoggerFmt) Printf(level uint, format string, v ...interface{}) { + fmt.Printf(format, v...) + fmt.Println("") +} +func (slf *LoggerFmt) Print(level uint, v ...interface{}) { + fmt.Println(v...) +} +func (slf *LoggerFmt) SetLogLevel(level uint) { + //do nothing +} diff --git a/service/servicemanager.go b/service/servicemanager.go index 565e63d..e52744d 100644 --- a/service/servicemanager.go +++ b/service/servicemanager.go @@ -84,7 +84,11 @@ func (slf *CServiceManager) GenServiceID() int { } func (slf *CServiceManager) GetLogger() ILogger { - return slf.logger + ret := slf.logger + if ret == nil { + ret = defaultLogger + } + return ret } var self *CServiceManager From f057166f0956385c84c58867f02e38e1782b67bd Mon Sep 17 00:00:00 2001 From: Ally Dale Date: Tue, 2 Jul 2019 10:16:09 +0800 Subject: [PATCH 3/5] add servicelist --- servicelist/servicelist.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 servicelist/servicelist.go diff --git a/servicelist/servicelist.go b/servicelist/servicelist.go new file mode 100644 index 0000000..0f02d2c --- /dev/null +++ b/servicelist/servicelist.go @@ -0,0 +1,37 @@ +package servicelist + +import ( + "github.com/duanhf2012/origin/originnode" + "github.com/duanhf2012/origin/service" +) + +var node = func() *originnode.COriginNode { + //1.新建OrginNode结点 + node := originnode.NewOriginNode() + if node == nil { + println("originnode.NewOriginNode fail") + return nil + } + return node +}() + +var serviceList []service.IService + +// 增加服务列表 在init中调用 +// 因为是init的时候调用 所以不用锁 +func PushService(s service.IService) { + serviceList = append(serviceList, s) +} + +//在main中调用该函数即可加载所有service +//debugCheckUrl "localhost:6060" +func Start(debugCheckUrl string) { + node.OpenDebugCheck(debugCheckUrl) + node.SetupService(serviceList...) + + //5.初始化结点 + node.Init() + + //6.开始结点 + node.Start() +} From b83c7dea29bd1c036cdf8437d2a33251831b7a56 Mon Sep 17 00:00:00 2001 From: Ally Dale Date: Fri, 5 Jul 2019 19:18:48 +0800 Subject: [PATCH 4/5] add GoQueueEx to avoid Logging in origin --- cluster/cluster.go | 43 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 61f02cc..f987a2a 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -345,12 +345,19 @@ func (slf *CCluster) GetRpcClientByNodeId(nodeid int) *RpcClient { } func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool) error { + return slf.goImpl(bCast, NodeServiceMethod, args, queueModle, true) +} + +func (slf *CCluster) goImpl(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool, log bool) error { var callServiceName string var serviceName string nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName, &serviceName) if len(nodeidList) < 1 { - service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) not find nodes.", NodeServiceMethod) - return fmt.Errorf("CCluster.Go(%s) not find nodes.", NodeServiceMethod) + err := fmt.Errorf("CCluster.Go(%s) not find nodes.", NodeServiceMethod) + if log { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, err.Error()) + } + return err } if bCast == false && len(nodeidList) > 1 { @@ -364,23 +371,38 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}, return fmt.Errorf("CCluster.Go(%s) cannot find service %s", NodeServiceMethod, serviceName) } if iService.IsInit() == false { - service.GetLogger().Printf(sysmodule.LEVER_WARN, "CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid) - return fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid) + err := fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid) + if log { + service.GetLogger().Printf(sysmodule.LEVER_WARN, err.Error()) + } + return err } replyCall := slf.LocalRpcClient.Go(callServiceName, args, nil, nil, queueModle) if replyCall.Error != nil { - service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error) + err := fmt.Errorf("CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error) + if log { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, err.Error()) + } else { + return err + } } } else { pclient := slf.GetClusterClient(nodeid) if pclient == nil { - service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid) - return fmt.Errorf("CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid) + err := fmt.Errorf("CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid) + if log { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, err.Error()) + } + return err } replyCall := pclient.Go(callServiceName, args, nil, nil, queueModle) if replyCall.Error != nil { - service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error) + err := fmt.Errorf("CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error) + if log { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, err.Error()) + } + return err } } } @@ -455,6 +477,11 @@ func GoQueue(NodeServiceMethod string, args interface{}) error { return InstanceClusterMgr().Go(false, NodeServiceMethod, args, true) } +//在GoQueue的基础上增加是否写日志参数 +func GoQueueEx(NodeServiceMethod string, args interface{}, log bool) error { + return InstanceClusterMgr().goImpl(false, NodeServiceMethod, args, true, log) +} + func CastGoQueue(NodeServiceMethod string, args interface{}) error { return InstanceClusterMgr().Go(true, NodeServiceMethod, args, true) } From 22b9642de0661686bf280799cf93ff9283fe84dc Mon Sep 17 00:00:00 2001 From: Ally Dale Date: Wed, 10 Jul 2019 17:10:21 +0800 Subject: [PATCH 5/5] avoid logging disaster when report channel busy in Queue mode --- rpc/server.go | 39 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/rpc/server.go b/rpc/server.go index ab5e9fc..84599c1 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -138,6 +138,7 @@ import ( "reflect" "strings" "sync" + "time" "unicode" "unicode/utf8" @@ -273,7 +274,42 @@ func (server *Server) ProcessQueue(name string) { return } + //定时报告队列超负荷运行 + const reportDur = time.Minute + chCap := int64(cap(chanRpc)) + reportLimit := chCap * 1 / 2 + errNum := int64(0) + lastSize := int64(0) + maxSize := int64(0) + totalSize := int64(0) + lastReportTime := time.Now() + for { + //定时报告channel有没有超负荷运行 + if size := int64(len(chanRpc)); size >= reportLimit || lastSize >= reportLimit { + if size >= reportLimit { + errNum++ + totalSize += size + if size > maxSize { + maxSize = size + } + } + dur := time.Now().Sub(lastReportTime) + if dur >= reportDur { + avgSize := int64(0) + if errNum > 0 { + avgSize = totalSize / errNum + } + orginservice.GetLogger().Printf(orginservice.LEVER_WARN, "RpcServer.ProcessQueue(%s) %d erros when channel size reaches %d/%d maxSize=%d avgSize=%d during last %s", + name, errNum, reportLimit, chCap, maxSize, avgSize, dur) + errNum = 0 + maxSize = 0 + totalSize = 0 + lastSize = size + lastReportTime = time.Now() + } + } + rpcData := <-chanRpc rpcData.service.call(rpcData.server, rpcData.sending, rpcData.wg, rpcData.mtype, rpcData.req, rpcData.argv, rpcData.replyv, rpcData.codec) } @@ -560,7 +596,8 @@ func (server *Server) ServeCodec(codec ServerCodec) { rpcChan, ok := server.mapCallQueue[service.name] if ok == true { if len(rpcChan) >= MAX_RPCDATA_QUEUE_COUNT { - orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s chan overload %d", service.name, MAX_RPCDATA_QUEUE_COUNT) + //不在这里写日志了 否则RPC繁忙 这里会刷日志把磁盘刷爆 ProcessQueue会记录channel繁忙的日志 + //orginservice.GetLogger().Printf(orginservice.LEVER_FATAL, "Rpc Service Name %s chan overload %d", service.name, MAX_RPCDATA_QUEUE_COUNT) continue }