diff --git a/cluster/cluster.go b/cluster/cluster.go index 492be9a..3981cdd 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -9,11 +9,9 @@ import ( "strings" "time" - "github.com/duanhf2012/origin/sysmodule" - - "github.com/duanhf2012/origin/service" - "github.com/duanhf2012/origin/rpc" + "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/sysmodule" ) type RpcClient struct { @@ -39,8 +37,12 @@ type CCluster struct { } func (slf *CCluster) ReadNodeInfo(nodeid int) error { - var err error - slf.cfg, err = ReadCfg("./config/cluster.json", nodeid) + mapNodeData, err := ReadAllNodeConfig("./config/nodeconfig.json") + if err != nil { + return err + } + + slf.cfg, err = ReadCfg("./config/cluster.json", nodeid, mapNodeData) if err != nil { return err } @@ -126,6 +128,10 @@ func (slf *CPing) Ping(ping *CPing, pong *CPong) error { return nil } +func (slf *CCluster) GetClusterMode() string { + return slf.cfg.GetClusterMode() +} + func (slf *CCluster) ConnService() error { ping := CPing{0} pong := CPong{0} @@ -141,6 +147,8 @@ func (slf *CCluster) ConnService() error { } } + //判断集群模式 + for { for _, rpcClient := range slf.nodeclient { @@ -186,7 +194,6 @@ func (slf *CCluster) ConnService() error { func (slf *CCluster) Init() error { if len(os.Args) < 2 { - return fmt.Errorf("Param error not find NodeId=number") } @@ -203,12 +210,12 @@ func (slf *CCluster) Init() error { slf.nodeclient = make(map[int]*RpcClient) //读取配置 - ret, err := strconv.Atoi(parts[1]) + currentNodeid, err := strconv.Atoi(parts[1]) if err != nil { return err } - return slf.ReadNodeInfo(ret) + return slf.ReadNodeInfo(currentNodeid) } func (slf *CCluster) Start() error { diff --git a/cluster/config.go b/cluster/config.go index 3828483..aba2cd5 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -5,35 +5,58 @@ import ( "errors" "fmt" "io/ioutil" + "strings" ) type CNodeCfg struct { NodeID int NodeName string - ServerAddr string ServiceList []string ClusterNode []string } type CNode struct { + SubNetName string NodeID int - NodeName string + NodeName string //SubNetName.NodeName ServerAddr string ServiceList map[string]bool } -type ClusterConfig struct { +type SubNetNodeInfo struct { + SubNetMode string + SubNetName string PublicServiceList []string NodeList []CNodeCfg //配置列表 - currentNode CNode //当前node - mapIdNode map[int]CNode //map[nodeid] CNode + //mapClusterNodeService map[string][]CNode //map[nodename] []CNode + //mapClusterServiceNode map[string][]CNode //map[servicename] []CNode +} + +type ClusterConfig struct { + SubNet []SubNetNodeInfo + //CurrentSubNetIdx int + mapIdNode map[int]CNode + currentNode CNode //当前node + mapClusterNodeService map[string][]CNode //map[nodename] []CNode mapClusterServiceNode map[string][]CNode //map[servicename] []CNode } -func ReadCfg(path string, nodeid int) (*ClusterConfig, error) { - c := &ClusterConfig{} +func GenNodeName(subnetName string, nodename string) string { + parts := strings.Split(nodename, ".") + if len(parts) < 2 { + return subnetName + "." + nodename + } + + return nodename +} + +func ReadCfg(path string, nodeid int, mapNodeData map[int]NodeData) (*ClusterConfig, error) { + clsCfg := &ClusterConfig{} + clsCfg.mapIdNode = map[int]CNode{} + clsCfg.mapClusterNodeService = make(map[string][]CNode, 1) + clsCfg.mapClusterServiceNode = make(map[string][]CNode, 1) //1.加载解析配置 d, err := ioutil.ReadFile(path) @@ -42,67 +65,76 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) { return nil, err } - err = json.Unmarshal(d, c) + err = json.Unmarshal(d, clsCfg) if err != nil { fmt.Printf("Read File %s ,%s Error!", path, err) return nil, err } - c.mapIdNode = make(map[int]CNode, 1) - c.mapClusterNodeService = make(map[string][]CNode, 1) - c.mapClusterServiceNode = make(map[string][]CNode, 1) + //存储所有的nodeid对应cnode信息 + var custerNodeNameList []string + for _, c := range clsCfg.SubNet { + for _, v := range c.NodeList { + mapservice := make(map[string]bool, 1) + for _, s := range v.ServiceList { + mapservice[s] = true + } + nodeData, ok := mapNodeData[v.NodeID] + if ok == false { + return nil, errors.New(fmt.Sprintf("Cannot find node id %d in nodeconfig.json file!", v.NodeID)) + } - //2.组装mapIdNode - var custerNodeName []string - for _, v := range c.NodeList { - mapservice := make(map[string]bool, 1) - for _, s := range v.ServiceList { - mapservice[s] = true - } + node := CNode{c.SubNetName, v.NodeID, c.SubNetName + "." + v.NodeName, nodeData.NodeAddr, mapservice} + clsCfg.mapIdNode[v.NodeID] = node - node := CNode{v.NodeID, v.NodeName, v.ServerAddr, mapservice} - c.mapIdNode[v.NodeID] = node + if v.NodeID == nodeid { + clsCfg.currentNode = node + for _, servicename := range c.PublicServiceList { + clsCfg.currentNode.ServiceList[servicename] = true + } - if v.NodeID == nodeid { - //保存当前结点 - c.currentNode = node - custerNodeName = v.ClusterNode + for _, nodename := range v.ClusterNode { + custerNodeNameList = append(custerNodeNameList, GenNodeName(c.SubNetName, nodename)) + } + custerNodeNameList = append(custerNodeNameList, GenNodeName(c.SubNetName, v.NodeName)) + } } } - if c.currentNode.NodeID == 0 { + if clsCfg.currentNode.NodeID == 0 { return nil, errors.New(fmt.Sprintf("Cannot find NodeId %d in cluster.json!", nodeid)) } - //3.存入当前Node服务名 - c.mapClusterNodeService[c.currentNode.NodeName] = append(c.mapClusterNodeService[c.currentNode.NodeName], c.currentNode) - - //4.组装mapClusterNodeService - for _, cn := range custerNodeName { - for _, n := range c.mapIdNode { - if n.NodeName == cn { - nodeList := c.mapClusterNodeService[n.NodeName] - if IsExistsNode(nodeList, &n) == false { - c.mapClusterNodeService[n.NodeName] = append(c.mapClusterNodeService[n.NodeName], n) + //如果集群是FULL模式 + if strings.ToUpper(clsCfg.GetClusterMode()) == "FULL" { + for _, subnet := range clsCfg.SubNet { + if subnet.SubNetName == clsCfg.currentNode.SubNetName { + for _, nodes := range subnet.NodeList { + custerNodeNameList = append(custerNodeNameList, subnet.SubNetName+"."+nodes.NodeName) } } } } - //5.组装mapClusterServiceNode - for _, nodelist := range c.mapClusterNodeService { //[]Node - for _, node := range nodelist { //Node - for s := range node.ServiceList { - c.mapClusterServiceNode[s] = append(c.mapClusterServiceNode[s], node) + for _, clusternodename := range custerNodeNameList { + for _, c := range clsCfg.SubNet { + for _, nodecfg := range c.NodeList { + if clusternodename != c.SubNetName+"."+nodecfg.NodeName { + continue + } + n, ok := clsCfg.mapIdNode[nodecfg.NodeID] + if ok == false { + return nil, errors.New(fmt.Sprintf("Cannot find NodeId %d in cluster.json!", nodecfg.NodeID)) + } + clsCfg.mapClusterNodeService[clusternodename] = append(clsCfg.mapClusterNodeService[clusternodename], n) + for _, sname := range nodecfg.ServiceList { + clsCfg.mapClusterServiceNode[sname] = append(clsCfg.mapClusterServiceNode[sname], n) + } } } } - //向c.currentNode中加入公共服务 - for _, servicename := range c.PublicServiceList { - c.currentNode.ServiceList[servicename] = true - } - return c, nil + return clsCfg, nil } func (slf *ClusterConfig) GetIdByService(serviceName string) []int { @@ -165,3 +197,103 @@ func (slf *ClusterConfig) GetNodeNameByNodeId(nodeid int) string { return node.NodeName } + +func (slf *ClusterConfig) GetClusterMode() string { + //SubNet []SubNetNodeInfo + for _, subnet := range slf.SubNet { + if subnet.SubNetName == slf.currentNode.SubNetName { + return subnet.SubNetMode + } + } + + return "" +} + +type CACfg struct { + CertFile string + KeyFile string +} + +//NodeConfig ... +type NodeData struct { + NodeID int + LogLevel uint + HttpPort uint16 + WSPort uint16 + NodeAddr string + CAFile []CACfg + + Environment string + IsListenLog int + IsSendErrorMail int +} + +type NodeConfig struct { + Public NodeData + NodeList []NodeData +} + +//ReadNodeConfig ... +func ReadAllNodeConfig(path string) (map[int]NodeData, error) { + c := &NodeConfig{} + d, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + err = json.Unmarshal(d, c) + if err != nil { + return nil, err + } + + var mapNodeData map[int]NodeData + mapNodeData = map[int]NodeData{} + + //data = c.Public + for _, v := range c.NodeList { + mapNodeData[v.NodeID] = v + } + + return mapNodeData, nil +} +func ReadNodeConfig(path string, nodeid int) (*NodeData, error) { + + c := &NodeConfig{} + d, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + err = json.Unmarshal(d, c) + if err != nil { + return nil, err + } + + var data NodeData + data = c.Public + for _, v := range c.NodeList { + if v.NodeID == nodeid { + data = v + if v.Environment == "" && c.Public.Environment != "" { + data.Environment = c.Public.Environment + } + if len(v.CAFile) == 0 && len(c.Public.CAFile) != 0 { + data.CAFile = c.Public.CAFile + } + + if v.HttpPort == 0 && c.Public.HttpPort != 0 { + data.HttpPort = c.Public.HttpPort + } + if v.WSPort == 0 && c.Public.WSPort != 0 { + data.WSPort = c.Public.WSPort + } + if v.LogLevel == 0 && c.Public.LogLevel != 0 { + data.LogLevel = c.Public.LogLevel + } + if v.IsListenLog == 0 && c.Public.IsListenLog != 0 { + data.IsListenLog = c.Public.IsListenLog + } + break + } + } + + return &data, nil +}