提交FULL集群模式

This commit is contained in:
boyce
2019-09-04 13:30:04 +08:00
parent 89936a887d
commit 8d20108a82
2 changed files with 193 additions and 54 deletions

View File

@@ -9,11 +9,9 @@ import (
"strings"
"time"
"github.com/duanhf2012/origin/sysmodule"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysmodule"
)
type RpcClient struct {
@@ -39,8 +37,12 @@ type CCluster struct {
}
func (slf *CCluster) ReadNodeInfo(nodeid int) error {
var err error
slf.cfg, err = ReadCfg("./config/cluster.json", nodeid)
mapNodeData, err := ReadAllNodeConfig("./config/nodeconfig.json")
if err != nil {
return err
}
slf.cfg, err = ReadCfg("./config/cluster.json", nodeid, mapNodeData)
if err != nil {
return err
}
@@ -126,6 +128,10 @@ func (slf *CPing) Ping(ping *CPing, pong *CPong) error {
return nil
}
func (slf *CCluster) GetClusterMode() string {
return slf.cfg.GetClusterMode()
}
func (slf *CCluster) ConnService() error {
ping := CPing{0}
pong := CPong{0}
@@ -141,6 +147,8 @@ func (slf *CCluster) ConnService() error {
}
}
//判断集群模式
for {
for _, rpcClient := range slf.nodeclient {
@@ -186,7 +194,6 @@ func (slf *CCluster) ConnService() error {
func (slf *CCluster) Init() error {
if len(os.Args) < 2 {
return fmt.Errorf("Param error not find NodeId=number")
}
@@ -203,12 +210,12 @@ func (slf *CCluster) Init() error {
slf.nodeclient = make(map[int]*RpcClient)
//读取配置
ret, err := strconv.Atoi(parts[1])
currentNodeid, err := strconv.Atoi(parts[1])
if err != nil {
return err
}
return slf.ReadNodeInfo(ret)
return slf.ReadNodeInfo(currentNodeid)
}
func (slf *CCluster) Start() error {

View File

@@ -5,35 +5,58 @@ import (
"errors"
"fmt"
"io/ioutil"
"strings"
)
type CNodeCfg struct {
NodeID int
NodeName string
ServerAddr string
ServiceList []string
ClusterNode []string
}
type CNode struct {
SubNetName string
NodeID int
NodeName string
NodeName string //SubNetName.NodeName
ServerAddr string
ServiceList map[string]bool
}
type ClusterConfig struct {
type SubNetNodeInfo struct {
SubNetMode string
SubNetName string
PublicServiceList []string
NodeList []CNodeCfg //配置列表
currentNode CNode //当前node
mapIdNode map[int]CNode //map[nodeid] CNode
//mapClusterNodeService map[string][]CNode //map[nodename] []CNode
//mapClusterServiceNode map[string][]CNode //map[servicename] []CNode
}
type ClusterConfig struct {
SubNet []SubNetNodeInfo
//CurrentSubNetIdx int
mapIdNode map[int]CNode
currentNode CNode //当前node
mapClusterNodeService map[string][]CNode //map[nodename] []CNode
mapClusterServiceNode map[string][]CNode //map[servicename] []CNode
}
func ReadCfg(path string, nodeid int) (*ClusterConfig, error) {
c := &ClusterConfig{}
func GenNodeName(subnetName string, nodename string) string {
parts := strings.Split(nodename, ".")
if len(parts) < 2 {
return subnetName + "." + nodename
}
return nodename
}
func ReadCfg(path string, nodeid int, mapNodeData map[int]NodeData) (*ClusterConfig, error) {
clsCfg := &ClusterConfig{}
clsCfg.mapIdNode = map[int]CNode{}
clsCfg.mapClusterNodeService = make(map[string][]CNode, 1)
clsCfg.mapClusterServiceNode = make(map[string][]CNode, 1)
//1.加载解析配置
d, err := ioutil.ReadFile(path)
@@ -42,67 +65,76 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) {
return nil, err
}
err = json.Unmarshal(d, c)
err = json.Unmarshal(d, clsCfg)
if err != nil {
fmt.Printf("Read File %s ,%s Error!", path, err)
return nil, err
}
c.mapIdNode = make(map[int]CNode, 1)
c.mapClusterNodeService = make(map[string][]CNode, 1)
c.mapClusterServiceNode = make(map[string][]CNode, 1)
//存储所有的nodeid对应cnode信息
var custerNodeNameList []string
for _, c := range clsCfg.SubNet {
for _, v := range c.NodeList {
mapservice := make(map[string]bool, 1)
for _, s := range v.ServiceList {
mapservice[s] = true
}
nodeData, ok := mapNodeData[v.NodeID]
if ok == false {
return nil, errors.New(fmt.Sprintf("Cannot find node id %d in nodeconfig.json file!", v.NodeID))
}
//2.组装mapIdNode
var custerNodeName []string
for _, v := range c.NodeList {
mapservice := make(map[string]bool, 1)
for _, s := range v.ServiceList {
mapservice[s] = true
}
node := CNode{c.SubNetName, v.NodeID, c.SubNetName + "." + v.NodeName, nodeData.NodeAddr, mapservice}
clsCfg.mapIdNode[v.NodeID] = node
node := CNode{v.NodeID, v.NodeName, v.ServerAddr, mapservice}
c.mapIdNode[v.NodeID] = node
if v.NodeID == nodeid {
clsCfg.currentNode = node
for _, servicename := range c.PublicServiceList {
clsCfg.currentNode.ServiceList[servicename] = true
}
if v.NodeID == nodeid {
//保存当前结点
c.currentNode = node
custerNodeName = v.ClusterNode
for _, nodename := range v.ClusterNode {
custerNodeNameList = append(custerNodeNameList, GenNodeName(c.SubNetName, nodename))
}
custerNodeNameList = append(custerNodeNameList, GenNodeName(c.SubNetName, v.NodeName))
}
}
}
if c.currentNode.NodeID == 0 {
if clsCfg.currentNode.NodeID == 0 {
return nil, errors.New(fmt.Sprintf("Cannot find NodeId %d in cluster.json!", nodeid))
}
//3.存入当前Node服务名
c.mapClusterNodeService[c.currentNode.NodeName] = append(c.mapClusterNodeService[c.currentNode.NodeName], c.currentNode)
//4.组装mapClusterNodeService
for _, cn := range custerNodeName {
for _, n := range c.mapIdNode {
if n.NodeName == cn {
nodeList := c.mapClusterNodeService[n.NodeName]
if IsExistsNode(nodeList, &n) == false {
c.mapClusterNodeService[n.NodeName] = append(c.mapClusterNodeService[n.NodeName], n)
//如果集群是FULL模式
if strings.ToUpper(clsCfg.GetClusterMode()) == "FULL" {
for _, subnet := range clsCfg.SubNet {
if subnet.SubNetName == clsCfg.currentNode.SubNetName {
for _, nodes := range subnet.NodeList {
custerNodeNameList = append(custerNodeNameList, subnet.SubNetName+"."+nodes.NodeName)
}
}
}
}
//5.组装mapClusterServiceNode
for _, nodelist := range c.mapClusterNodeService { //[]Node
for _, node := range nodelist { //Node
for s := range node.ServiceList {
c.mapClusterServiceNode[s] = append(c.mapClusterServiceNode[s], node)
for _, clusternodename := range custerNodeNameList {
for _, c := range clsCfg.SubNet {
for _, nodecfg := range c.NodeList {
if clusternodename != c.SubNetName+"."+nodecfg.NodeName {
continue
}
n, ok := clsCfg.mapIdNode[nodecfg.NodeID]
if ok == false {
return nil, errors.New(fmt.Sprintf("Cannot find NodeId %d in cluster.json!", nodecfg.NodeID))
}
clsCfg.mapClusterNodeService[clusternodename] = append(clsCfg.mapClusterNodeService[clusternodename], n)
for _, sname := range nodecfg.ServiceList {
clsCfg.mapClusterServiceNode[sname] = append(clsCfg.mapClusterServiceNode[sname], n)
}
}
}
}
//向c.currentNode中加入公共服务
for _, servicename := range c.PublicServiceList {
c.currentNode.ServiceList[servicename] = true
}
return c, nil
return clsCfg, nil
}
func (slf *ClusterConfig) GetIdByService(serviceName string) []int {
@@ -165,3 +197,103 @@ func (slf *ClusterConfig) GetNodeNameByNodeId(nodeid int) string {
return node.NodeName
}
func (slf *ClusterConfig) GetClusterMode() string {
//SubNet []SubNetNodeInfo
for _, subnet := range slf.SubNet {
if subnet.SubNetName == slf.currentNode.SubNetName {
return subnet.SubNetMode
}
}
return ""
}
type CACfg struct {
CertFile string
KeyFile string
}
//NodeConfig ...
type NodeData struct {
NodeID int
LogLevel uint
HttpPort uint16
WSPort uint16
NodeAddr string
CAFile []CACfg
Environment string
IsListenLog int
IsSendErrorMail int
}
type NodeConfig struct {
Public NodeData
NodeList []NodeData
}
//ReadNodeConfig ...
func ReadAllNodeConfig(path string) (map[int]NodeData, error) {
c := &NodeConfig{}
d, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
err = json.Unmarshal(d, c)
if err != nil {
return nil, err
}
var mapNodeData map[int]NodeData
mapNodeData = map[int]NodeData{}
//data = c.Public
for _, v := range c.NodeList {
mapNodeData[v.NodeID] = v
}
return mapNodeData, nil
}
func ReadNodeConfig(path string, nodeid int) (*NodeData, error) {
c := &NodeConfig{}
d, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
err = json.Unmarshal(d, c)
if err != nil {
return nil, err
}
var data NodeData
data = c.Public
for _, v := range c.NodeList {
if v.NodeID == nodeid {
data = v
if v.Environment == "" && c.Public.Environment != "" {
data.Environment = c.Public.Environment
}
if len(v.CAFile) == 0 && len(c.Public.CAFile) != 0 {
data.CAFile = c.Public.CAFile
}
if v.HttpPort == 0 && c.Public.HttpPort != 0 {
data.HttpPort = c.Public.HttpPort
}
if v.WSPort == 0 && c.Public.WSPort != 0 {
data.WSPort = c.Public.WSPort
}
if v.LogLevel == 0 && c.Public.LogLevel != 0 {
data.LogLevel = c.Public.LogLevel
}
if v.IsListenLog == 0 && c.Public.IsListenLog != 0 {
data.IsListenLog = c.Public.IsListenLog
}
break
}
}
return &data, nil
}