diff --git a/cluster/cluster.go b/cluster/cluster.go index c92a1e2..6791bb5 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -2,22 +2,21 @@ package cluster import ( "fmt" + "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/service" "strings" + "sync" ) var configdir = "./config/" -type SubNet struct { - SubNetName string - NodeList []NodeInfo -} + type NodeInfo struct { NodeId int - ListenAddr string NodeName string + ListenAddr string ServiceList []string } @@ -27,55 +26,135 @@ type NodeRpcInfo struct { } - var cluster Cluster type Cluster struct { - localsubnet SubNet //本子网 - mapSubNetInfo map[string] SubNet //子网名称,子网信息 - - mapSubNetNodeInfo map[string]map[int]NodeInfo //map[子网名称]map[NodeId]NodeInfo - localSubNetMapNode map[int]NodeInfo //本子网内 map[NodeId]NodeInfo - localSubNetMapService map[string][]NodeInfo //本子网内所有ServiceName对应的结点列表 - localNodeMapService map[string]interface{} //本Node支持的服务 - localNodeInfo NodeInfo - - localServiceCfg map[string]interface{} //map[servicename]数据 - localNodeServiceCfg map[int]map[string]interface{} //map[nodeid]map[servicename]数据 + localNodeInfo NodeInfo //× + localServiceCfg map[string]interface{} //map[servicename]配置数据* mapRpc map[int] NodeRpcInfo//nodeid - rpcServer rpc.Server -} + serviceDiscovery IServiceDiscovery //服务发现接口 + mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo + mapServiceNode map[string][]int //map[serviceName]NodeInfo + + locker sync.RWMutex +} func SetConfigDir(cfgdir string){ configdir = cfgdir } -func (slf *Cluster) Init(currentNodeId int) error{ - //1.初始化配置 - err := slf.InitCfg(currentNodeId) +func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) { + cluster.serviceDiscovery = serviceDiscovery +} + +func (slf *Cluster) serviceDiscoveryDelNode (nodeId int){ + slf.locker.Lock() + defer slf.locker.Unlock() + + slf.delNode(nodeId) +} + +func (slf *Cluster) delNode(nodeId int){ + //删除rpc连接关系 + rpc,ok := slf.mapRpc[nodeId] + if ok == true { + delete(slf.mapRpc,nodeId) + rpc.client.Close(false) + } + + nodeInfo,ok := slf.mapIdNode[nodeId] + if ok == false { + return + } + + for _,serviceName := range nodeInfo.ServiceList{ + slf.delServiceNode(serviceName,nodeId) + } + + delete(slf.mapIdNode,nodeId) +} + +func (slf *Cluster) delServiceNode(serviceName string,nodeId int){ + nodeList := slf.mapServiceNode[serviceName] + for idx,nId := range nodeList { + if nId == nodeId { + slf.mapServiceNode[serviceName] = append(nodeList[idx:],nodeList[idx+1:]...) + return + } + } +} + + +func (slf *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ + if nodeInfo.NodeId == slf.localNodeInfo.NodeId { + return + } + + slf.locker.Lock() + defer slf.locker.Unlock() + + //先清理删除 + slf.delNode(nodeInfo.NodeId) + + //再重新组装 + mapDuplicate := map[string]interface{}{} //预防重复数据 + for _,serviceName := range nodeInfo.ServiceList { + if _,ok := mapDuplicate[serviceName];ok == true { + //存在重复 + log.Error("Bad duplicate Service Cfg.") + continue + } + + slf.mapServiceNode[serviceName] = append(slf.mapServiceNode[serviceName],nodeInfo.NodeId) + } + + slf.mapIdNode[nodeInfo.NodeId] = *nodeInfo + rpcInfo := NodeRpcInfo{} + rpcInfo.nodeinfo = *nodeInfo + rpcInfo.client = &rpc.Client{} + rpcInfo.client.Connect(nodeInfo.ListenAddr) + slf.mapRpc[nodeInfo.NodeId] = rpcInfo +} + +func (slf *Cluster) buildLocalRpc(){ + rpcInfo := NodeRpcInfo{} + rpcInfo.nodeinfo = slf.localNodeInfo + rpcInfo.client = &rpc.Client{} + rpcInfo.client.Connect("") + + slf.mapRpc[slf.localNodeInfo.NodeId] = rpcInfo +} + +func (slf *Cluster) Init(localNodeId int) error{ + slf.locker.Lock() + + + //1.处理服务发现接口 + if slf.serviceDiscovery == nil { + slf.serviceDiscovery = &ConfigDiscovery{} + } + + //2.初始化配置 + err := slf.InitCfg(localNodeId) if err != nil { + slf.locker.Unlock() return err } slf.rpcServer.Init(slf) + slf.buildLocalRpc() - //2.建议rpc连接 - slf.mapRpc = map[int] NodeRpcInfo{} - for _,nodeinfo := range slf.localSubNetMapNode { - rpcinfo := NodeRpcInfo{} - rpcinfo.nodeinfo = nodeinfo - rpcinfo.client = &rpc.Client{} - if nodeinfo.NodeId == currentNodeId { - rpcinfo.client.Connect("") - }else{ - rpcinfo.client.Connect(nodeinfo.ListenAddr) - } - slf.mapRpc[nodeinfo.NodeId] = rpcinfo + slf.serviceDiscovery.RegFunDelNode(slf.serviceDiscoveryDelNode) + slf.serviceDiscovery.RegFunSetNode(slf.serviceDiscoverySetNodeInfo) + slf.locker.Unlock() + + err = slf.serviceDiscovery.Init(localNodeId) + if err != nil { + return err } - return nil } @@ -92,12 +171,17 @@ func (slf *Cluster) Start() { slf.rpcServer.Start(slf.localNodeInfo.ListenAddr) } +func (slf *Cluster) Stop() { + slf.serviceDiscovery.OnNodeStop() +} func GetCluster() *Cluster{ return &cluster } func (slf *Cluster) GetRpcClient(nodeid int) *rpc.Client { + slf.locker.RLock() + defer slf.locker.RUnlock() c,ok := slf.mapRpc[nodeid] if ok == false { return nil diff --git a/cluster/configdiscovery.go b/cluster/configdiscovery.go new file mode 100644 index 0000000..26609e4 --- /dev/null +++ b/cluster/configdiscovery.go @@ -0,0 +1,40 @@ +package cluster + + +type ConfigDiscovery struct { + funDelService FunDelNode + funSetService FunSetNodeInfo + localNodeId int +} + +func (slf *ConfigDiscovery) Init(localNodeId int) error{ + slf.localNodeId = localNodeId + + //解析本地其他服务配置 + nodeInfoList,err := GetCluster().readLocalClusterConfig(0) + if err != nil { + return err + } + + for _,nodeInfo := range nodeInfoList { + if nodeInfo.NodeId == localNodeId { + continue + } + slf.funSetService(&nodeInfo) + } + + return nil +} + + +func (slf *ConfigDiscovery) OnNodeStop(){ +} + + +func (slf *ConfigDiscovery) RegFunDelNode(funDelNode FunDelNode){ + slf.funDelService = funDelNode +} + +func (slf *ConfigDiscovery) RegFunSetNode(funSetNodeInfo FunSetNodeInfo){ + slf.funSetService = funSetNodeInfo +} diff --git a/cluster/parsecfg.go b/cluster/parsecfg.go index 2a62b49..d6aa00a 100644 --- a/cluster/parsecfg.go +++ b/cluster/parsecfg.go @@ -10,9 +10,13 @@ import ( ) var json = jsoniter.ConfigCompatibleWithStandardLibrary +type NodeInfoList struct { + NodeList []NodeInfo +} -func (slf *Cluster) ReadClusterConfig(filepath string) (*SubNet,error) { - c := &SubNet{} + +func (slf *Cluster) ReadClusterConfig(filepath string) (*NodeInfoList,error) { + c := &NodeInfoList{} d, err := ioutil.ReadFile(filepath) if err != nil { return nil, err @@ -26,10 +30,10 @@ func (slf *Cluster) ReadClusterConfig(filepath string) (*SubNet,error) { } -func (slf *Cluster) ReadServiceConfig(filepath string) (map[string]interface{},map[int]map[string]interface{},error) { +func (slf *Cluster) readServiceConfig(filepath string) (map[string]interface{},map[int]map[string]interface{},error) { c := map[string]interface{}{} - + //读取配置 d, err := ioutil.ReadFile(filepath) if err != nil { return nil,nil, err @@ -51,162 +55,156 @@ func (slf *Cluster) ReadServiceConfig(filepath string) (map[string]interface{}, nodeServiceList := nodeServiceCfg.([]interface{}) for _,v := range nodeServiceList{ serviceCfg :=v.(map[string]interface{}) - nodeid,ok := serviceCfg["NodeId"] + nodeId,ok := serviceCfg["NodeId"] if ok == false { - log.Fatal("nodeservice list not find nodeid field: %+v",nodeServiceList) + log.Fatal("NodeService list not find nodeId field: %+v",nodeServiceList) } - mapNodeService[int(nodeid.(float64))] = serviceCfg + mapNodeService[int(nodeId.(float64))] = serviceCfg } } return serviceConfig,mapNodeService,nil } -func (slf *Cluster) ReadAllSubNetConfig() error { + +func (slf *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) { + var nodeInfoList [] NodeInfo + clusterCfgPath :=strings.TrimRight(configdir,"/") +"/cluster" + fileInfoList,err := ioutil.ReadDir(clusterCfgPath) + if err != nil { + return nil,fmt.Errorf("Read dir %s is fail :%+v",clusterCfgPath,err) + } + + //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 + for _,f := range fileInfoList{ + if f.IsDir() == false { + filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath,"/"),"\\")+"/"+f.Name() + localNodeInfoList,err := slf.ReadClusterConfig(filePath) + if err != nil { + return nil,fmt.Errorf("read file path %s is error:%+v" ,filePath,err) + } + + for _,nodeInfo := range localNodeInfoList.NodeList { + if nodeInfo.NodeId == nodeId || nodeId == 0 { + nodeInfoList = append(nodeInfoList,nodeInfo) + //slf.localNodeInfo = nodeInfo + } + } + } + } + + if nodeId != 0 && (len(nodeInfoList)!=1){ + return nil,fmt.Errorf("%d configurations were found for the configuration with node ID %d!",len(nodeInfoList),nodeId) + } + + return nodeInfoList,nil +} + +func (slf *Cluster) readLocalService(localNodeId int) error { clusterCfgPath :=strings.TrimRight(configdir,"/") +"/cluster" fileInfoList,err := ioutil.ReadDir(clusterCfgPath) if err != nil { return fmt.Errorf("Read dir %s is fail :%+v",clusterCfgPath,err) } - slf.mapSubNetInfo =map[string] SubNet{} - for _,f := range fileInfoList{ - if f.IsDir() == true { - filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath,"/"),"\\")+"/"+f.Name()+"/"+"cluster.json" - subnetinfo,err:=slf.ReadClusterConfig(filePath) + //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 + for _,f := range fileInfoList { + if f.IsDir() == false { + filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name() + serviceConfig,mapNodeService,err := slf.readServiceConfig(filePath) if err != nil { - return fmt.Errorf("read file path %s is error:%+v" ,filePath,err) + continue + } + + for _,s := range slf.localNodeInfo.ServiceList{ + for{ + //取公共服务配置 + pubCfg,ok := serviceConfig[s] + if ok == true { + slf.localServiceCfg[s] = pubCfg + } + + //如果结点也配置了该服务,则覆盖之 + nodeService,ok := mapNodeService[localNodeId] + if ok == false { + break + } + sCfg,ok := nodeService[s] + if ok == false{ + break + } + + slf.localServiceCfg[s] = sCfg + break + } } - slf.mapSubNetInfo[f.Name()] = *subnetinfo } } + if len(slf.localServiceCfg)==0{ + return fmt.Errorf("No service configuration was found.") + } return nil } -func (slf *Cluster) ReadLocalSubNetServiceConfig(subnet string) error { - clusterCfgPath :=strings.TrimRight(configdir,"/") +"/cluster" - fileInfoList,err := ioutil.ReadDir(clusterCfgPath) +func (slf *Cluster) parseLocalCfg(){ + slf.mapIdNode[slf.localNodeInfo.NodeId] = slf.localNodeInfo + + for _,sName := range slf.localNodeInfo.ServiceList{ + slf.mapServiceNode[sName] = append(slf.mapServiceNode[sName],slf.localNodeInfo.NodeId) + } +} + +func (slf *Cluster) InitCfg(localNodeId int) error{ + slf.localServiceCfg = map[string]interface{}{} + slf.mapRpc = map[int] NodeRpcInfo{} + slf.mapIdNode = map[int]NodeInfo{} + slf.mapServiceNode = map[string][]int{} + + //加载本地结点的NodeList配置 + nodeInfoList,err := slf.readLocalClusterConfig(localNodeId) if err != nil { - return fmt.Errorf("Read %s dir is fail:%+v ",clusterCfgPath,err) + return err } + slf.localNodeInfo = nodeInfoList[0] - slf.mapSubNetInfo =map[string] SubNet{} - for _,f := range fileInfoList{ - if f.IsDir() == true && f.Name()==subnet{ //同一子网 - filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath,"/"),"\\")+"/"+f.Name()+"/"+"service.json" - localServiceCfg,localNodeServiceCfg,err:=slf.ReadServiceConfig(filePath) - if err != nil { - return fmt.Errorf("Read file %s is fail :%+v",filePath,err) - } - slf.localServiceCfg = localServiceCfg - slf.localNodeServiceCfg =localNodeServiceCfg - } - } - - return nil -} - - - -func (slf *Cluster) InitCfg(currentNodeId int) error{ - //mapSubNetInfo := map[string] SubNet{} //子网名称,子网信息 - mapSubNetNodeInfo := map[string]map[int]NodeInfo{} //map[子网名称]map[NodeId]NodeInfo - localSubNetMapNode := map[int]NodeInfo{} //本子网内 map[NodeId]NodeInfo - localSubNetMapService := map[string][]NodeInfo{} //本子网内所有ServiceName对应的结点列表 - localNodeMapService := map[string]interface{}{} //本Node支持的服务 - localNodeInfo := NodeInfo{} - - err := slf.ReadAllSubNetConfig() + //读取本地服务配置 + err = slf.readLocalService(localNodeId) if err != nil { return err } - //分析配置 - var localSubnetName string - for subnetName,subnetInfo := range slf.mapSubNetInfo { - for _,nodeinfo := range subnetInfo.NodeList { - //装载slf.mapNodeInfo - _,ok := mapSubNetNodeInfo[subnetName] - if ok == false { - mapnodeInfo := make(map[int]NodeInfo,1) - mapnodeInfo[nodeinfo.NodeId] = nodeinfo - mapSubNetNodeInfo[subnetName] = mapnodeInfo - }else{ - mapSubNetNodeInfo[subnetName][nodeinfo.NodeId] = nodeinfo - } - - //判断本进程的子网 - if nodeinfo.NodeId == currentNodeId { - localSubnetName = subnetName - } - } - } + //本地配置服务加到全局map信息中 + slf.parseLocalCfg() + return nil +} - //装载 - subnet,ok := slf.mapSubNetInfo[localSubnetName] +func (slf *Cluster) IsConfigService(serviceName string) bool { + slf.locker.RLock() + defer slf.locker.RUnlock() + nodeList,ok := slf.mapServiceNode[serviceName] if ok == false { - return fmt.Errorf("NodeId %d not in any subnet",currentNodeId) + return false } - subnet.SubNetName = localSubnetName - for _,nodeinfo := range subnet.NodeList { - localSubNetMapNode[nodeinfo.NodeId] = nodeinfo - //装载本Node进程所有的服务 - if nodeinfo.NodeId == currentNodeId { - for _,s := range nodeinfo.ServiceList { - servicename := s - if strings.Index(s,"_") == 0 { - servicename = s[1:] - } - localNodeMapService[servicename] = nil - } - localNodeInfo = nodeinfo - } - - for _,s := range nodeinfo.ServiceList { - //以_打头的,表示只在本机进程,不对整个子网开发 - if strings.Index(s,"_") == 0 { - continue - } - - if _,ok := localSubNetMapService[s];ok== true{ - localSubNetMapService[s] = []NodeInfo{} - } - localSubNetMapService[s] = append(localSubNetMapService[s],nodeinfo) + for _,nodeId := range nodeList{ + if slf.localNodeInfo.NodeId == nodeId { + return true } } - if localNodeInfo.NodeId == 0 { - return fmt.Errorf("Canoot find NodeId %d not in any config file.",currentNodeId) - } - - slf.mapSubNetNodeInfo=mapSubNetNodeInfo - slf.localSubNetMapNode=localSubNetMapNode - slf.localSubNetMapService = localSubNetMapService - slf.localNodeMapService = localNodeMapService - slf.localsubnet = subnet - slf.localNodeInfo =localNodeInfo - - //读取服务 - return slf.ReadLocalSubNetServiceConfig(slf.localsubnet.SubNetName) + return false } - -func (slf *Cluster) IsConfigService(servicename string) bool { - _,ok := slf.localNodeMapService[servicename] - return ok -} - - - func (slf *Cluster) GetNodeIdByService(servicename string,rpcClientList *[]*rpc.Client) { - nodeInfoList,ok := slf.localSubNetMapService[servicename] + slf.locker.RLock() + defer slf.locker.RUnlock() + nodeIdList,ok := slf.mapServiceNode[servicename] if ok == true { - for _,node := range nodeInfoList { - pClient := GetCluster().GetRpcClient(node.NodeId) + for _,nodeId := range nodeIdList { + pClient := GetCluster().GetRpcClient(nodeId) if pClient==nil { - log.Error("Cannot connect node id %d",node.NodeId) + log.Error("Cannot connect node id %d",nodeId) continue } *rpcClientList = append(*rpcClientList,pClient) @@ -224,16 +222,11 @@ func (slf *Cluster) getServiceCfg(servicename string) interface{}{ return v } -func (slf *Cluster) GetServiceCfg(nodeid int,servicename string) interface{}{ - nodeService,ok := slf.localNodeServiceCfg[nodeid] +func (slf *Cluster) GetServiceCfg(serviceName string) interface{}{ + serviceCfg,ok := slf.localServiceCfg[serviceName] if ok == false { - return slf.getServiceCfg(servicename) + return nil } - v,ok := nodeService[servicename] - if ok == false{ - return slf.getServiceCfg(servicename) - } - - return v + return serviceCfg } diff --git a/cluster/servicediscovery.go b/cluster/servicediscovery.go new file mode 100644 index 0000000..70b66ac --- /dev/null +++ b/cluster/servicediscovery.go @@ -0,0 +1,15 @@ +package cluster + +type OperType int + +type FunDelNode func (nodeId int) +type FunSetNodeInfo func(nodeInfo *NodeInfo) + +type IServiceDiscovery interface { + Init(localNodeId int) error + OnNodeStop() + + RegFunDelNode(funDelNode FunDelNode) + RegFunSetNode(funSetNodeInfo FunSetNodeInfo) +} + diff --git a/network/tcp_client.go b/network/tcp_client.go index 635d1fd..89d473c 100644 --- a/network/tcp_client.go +++ b/network/tcp_client.go @@ -117,7 +117,7 @@ reconnect: } } -func (client *TCPClient) Close() { +func (client *TCPClient) Close(waitDone bool) { client.Lock() client.closeFlag = true for conn := range client.conns { @@ -126,5 +126,8 @@ func (client *TCPClient) Close() { client.conns = nil client.Unlock() - client.wg.Wait() + if waitDone == true{ + client.wg.Wait() + } } + diff --git a/node/node.go b/node/node.go index 49dc906..a32186b 100644 --- a/node/node.go +++ b/node/node.go @@ -79,7 +79,7 @@ func initNode(id int){ continue } - pServiceCfg := cluster.GetCluster().GetServiceCfg(nodeId,s.GetName()) + pServiceCfg := cluster.GetCluster().GetServiceCfg(s.GetName()) s.Init(s,cluster.GetRpcClient,cluster.GetRpcServer,pServiceCfg) service.Setup(s) @@ -153,7 +153,7 @@ func startNode(args []string) error { profiler.Report() } } - + cluster.GetCluster().Stop() //7.退出 close(closeSig) service.WaitStop() @@ -186,3 +186,4 @@ func SetSysLog(strLevel string, pathname string, flag int){ func OpenProfilerReport(interval time.Duration){ profilerInterval = interval } + diff --git a/service/module.go b/service/module.go index 51dd486..3ce48cc 100644 --- a/service/module.go +++ b/service/module.go @@ -38,7 +38,7 @@ type IModule interface { type Module struct { moduleId int64 parent IModule //父亲 - self IModule //父亲 + self IModule //自己 child map[int64]IModule //孩子们 mapActiveTimer map[*timer.Timer]interface{} mapActiveCron map[*timer.Cron]interface{}