From 8bcb2af8b00db4cf880c74146a4dbec9302b01ac Mon Sep 17 00:00:00 2001 From: Ally Dale Date: Tue, 17 Sep 2019 13:45:06 +0800 Subject: [PATCH] add auto balancing info --- cluster/cluster.go | 45 ++++++++++++++++++++++++++++++++++++++++-- cluster/config.go | 38 +++++++++++++++++++++++++++++++---- sysmodule/LogModule.go | 13 +++++++++--- 3 files changed, 87 insertions(+), 9 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index ee98ced..d028e95 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -5,6 +5,7 @@ import ( "math/rand" "net" "os" + "sort" "strings" "time" @@ -274,7 +275,7 @@ func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *stri servicename = servicename[1:] nodeidList = append(nodeidList, GetNodeId()) } else { - nodeidList = slf.cfg.GetIdByService(servicename) + nodeidList = slf.cfg.GetIdByService(servicename, "") } } else { nodeidList = slf.GetIdByNodeService(nodename, servicename) @@ -293,7 +294,7 @@ func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *stri //GetNodeIdByServiceName 根据服务名查找nodeid servicename服务名 bOnline是否需要查找在线服务 func (slf *CCluster) GetNodeIdByServiceName(servicename string, bOnline bool) []int { - nodeIDList := slf.cfg.GetIdByService(servicename) + nodeIDList := slf.cfg.GetIdByService(servicename, "") if bOnline { ret := make([]int, 0, len(nodeIDList)) @@ -308,6 +309,40 @@ func (slf *CCluster) GetNodeIdByServiceName(servicename string, bOnline bool) [] return nodeIDList } +//根据Service获取负载均衡信息 +//负载均衡的策略是从配置获取所有配置了该服务的NodeId 并按NodeId排序 每个node负责处理数组index所在的那一部分 +func (slf *CCluster) GetBalancingInfo(currentNodeId int, servicename string, inSubNet bool) (*BalancingInfo, error) { + subNetName := "" + if inSubNet { + if node, ok := slf.cfg.mapIdNode[currentNodeId]; ok { + subNetName = node.SubNetName + } else { + return nil, fmt.Errorf("[cluster.GetBalancingInfo] cannot find node %d", currentNodeId) + } + } + lst := slf.cfg.GetIdByService(servicename, subNetName) + // if len(lst) <= 0 { + // return nil, fmt.Errorf("[cluster.GetBalancingInfo] cannot find service %s in any node", servicename) + // } + sort.Ints(lst) + ret := &BalancingInfo{ + NodeId: currentNodeId, + ServiceName: servicename, + TotalNum: len(lst), + MyIndex: -1, + NodeList: lst, + } + if _, ok := slf.cfg.mapIdNode[currentNodeId]; ok { + for i, v := range lst { + if v == currentNodeId { + ret.MyIndex = i + break + } + } + } + return ret, nil +} + func (slf *CCluster) CheckNodeIsConnectedByID(nodeid int) bool { if nodeid == GetNodeId() { return true @@ -522,6 +557,12 @@ func GetNodeIdByServiceName(serviceName string, bOnline bool) []int { return InstanceClusterMgr().GetNodeIdByServiceName(serviceName, bOnline) } +//获取服务的负载均衡信息 +//负载均衡的策略是从配置获取所有配置了该服务的NodeId 并按NodeId排序 每个node负责处理数组index所在的那一部分 +func GetBalancingInfo(currentNodeId int, servicename string, inSubNet bool) (*BalancingInfo, error) { + return InstanceClusterMgr().GetBalancingInfo(currentNodeId, servicename, inSubNet) +} + //随机选择在线的node发送 func CallRandomService(NodeServiceMethod string, args interface{}, reply interface{}) error { return InstanceClusterMgr().CallRandomService(NodeServiceMethod, args, reply) diff --git a/cluster/config.go b/cluster/config.go index d51a56a..1b66701 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -8,6 +8,34 @@ import ( "strings" ) +//负载均衡信息 +type BalancingInfo struct { + NodeId int //我的nodeId + ServiceName string //负载均衡的ServiceName + + TotalNum int //总共有多少个协同Node + MyIndex int //负责的index [0, TotalNum) + NodeList []int //所有协同的node列表 按NodeId升序排列 +} + +//判断hash后的Id是否命中我的NodeId +func (slf *BalancingInfo) Hit(hashId int) bool { + if hashId >= 0 && slf.TotalNum > 0 && slf.MyIndex >= 0 { + return hashId%slf.TotalNum == slf.MyIndex + } + return false +} + +//判断命中的NodeId,-1表示无法取得 +func (slf *BalancingInfo) GetHitNodeId(hashId int) int { + if hashId >= 0 && slf.TotalNum > 0 { + if idx := hashId % slf.TotalNum; idx >= 0 && idx < len(slf.NodeList) { + return slf.NodeList[idx] + } + } + return -1 +} + type CNodeCfg struct { NodeID int NodeName string @@ -148,14 +176,16 @@ func ReadCfg(path string, nodeid int, mapNodeData map[int]NodeData) (*ClusterCon return clsCfg, nil } -func (slf *ClusterConfig) GetIdByService(serviceName string) []int { - var nodeidlist []int - nodeidlist = make([]int, 0) +func (slf *ClusterConfig) GetIdByService(serviceName, subNetName string) []int { + var nodeidlist = []int{} nodeList, ok := slf.mapClusterServiceNode[serviceName] if ok == true { + nodeidlist = make([]int, 0, len(nodeList)) for _, v := range nodeList { - nodeidlist = append(nodeidlist, v.NodeID) + if subNetName == "" || subNetName == v.SubNetName { + nodeidlist = append(nodeidlist, v.NodeID) + } } } diff --git a/sysmodule/LogModule.go b/sysmodule/LogModule.go index ba9ff6e..ff6cc14 100644 --- a/sysmodule/LogModule.go +++ b/sysmodule/LogModule.go @@ -55,7 +55,13 @@ func (slf *LogModule) GetCurrentFileName() string { now := time.Now() fpath := filepath.Join("logs") os.MkdirAll(fpath, os.ModePerm) - fname := slf.logfilename + "-" + now.Format("20060102-150405") + ".log" + y, m, d := now.Date() + h := now.Hour() + mm := now.Minute() + mm -= mm % 15 //15分钟内使用同一个日志文件 + dt := y*10000 + int(m)*100 + d + tm := h*100 + mm + fname := fmt.Sprintf("%s-%d-%d.log", slf.logfilename, dt, tm) ret := filepath.Join(fpath, fname) return ret } @@ -84,9 +90,10 @@ func (slf *LogModule) CheckAndGenFile(fileline string) (newFile bool) { } var err error - slf.logFile, err = os.OpenFile(slf.GetCurrentFileName(), os.O_RDWR|os.O_CREATE|os.O_APPEND, os.ModePerm) + filename := slf.GetCurrentFileName() + slf.logFile, err = os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, os.ModePerm) if err != nil { - fmt.Printf("create log file %+v error!", slf.GetCurrentFileName()) + fmt.Printf("create log file %+v error!", filename) slf.locker.Unlock() return false }