From b4c81b4889dbb183d0a0141695831a2cbb6da672 Mon Sep 17 00:00:00 2001 From: boyce Date: Thu, 14 Feb 2019 15:41:13 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E9=9B=86=E7=BE=A4=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E8=AF=BB=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 46 +++++++++++++++++++++++----------------------- cluster/config.go | 44 ++++++++++++++++++++++++++++---------------- 2 files changed, 51 insertions(+), 39 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 36a72af..408f265 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -229,23 +229,20 @@ func (slf *CCluster) Start() error { func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply interface{}) error { var callServiceName string nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName) - if len(nodeidList) > 1 { + if len(nodeidList) > 1 || len(nodeidList) < 1 { return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList)) } - for _, nodeid := range nodeidList { - if nodeid == slf.GetCurrentNodeId() { - return slf.LocalRpcClient.Call(callServiceName, args, reply) - } else { - - pclient := slf.GetClusterClient(nodeid) - - if pclient == nil { - return fmt.Errorf("Call: NodeId %d is not find.", nodeid) - } - err := pclient.Call(callServiceName, args, reply) - return err + nodeid := nodeidList[0] + if nodeid == slf.GetCurrentNodeId() { + return slf.LocalRpcClient.Call(callServiceName, args, reply) + } else { + pclient := slf.GetClusterClient(nodeid) + if pclient == nil { + return fmt.Errorf("Call: NodeId %d is not find.", nodeid) } + err := pclient.Call(callServiceName, args, reply) + return err } return fmt.Errorf("Call: %s fail.", NodeServiceMethod) @@ -285,31 +282,34 @@ func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *stri return nodeidList } -func (slf *CCluster) Go(NodeServiceMethod string, args interface{}) error { +func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}) error { var callServiceName string nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName) - if len(nodeidList) > 1 { + if len(nodeidList) < 1 { return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList)) } + if bCast == false && len(nodeidList) > 1 { + return fmt.Errorf("Call: %s find more nodes %d.", NodeServiceMethod, len(nodeidList)) + } + for _, nodeid := range nodeidList { if nodeid == slf.GetCurrentNodeId() { slf.LocalRpcClient.Go(callServiceName, args, nil, nil) - return nil + //return nil } else { - pclient := slf.GetClusterClient(nodeid) - if pclient == nil { return fmt.Errorf("Call: NodeId %d is not find.", nodeid) } pclient.Go(callServiceName, args, nil, nil) - return nil + //return nil } } - return fmt.Errorf("Call: %s fail.", NodeServiceMethod) + return nil } + func (slf *CCluster) CallNode(nodeid int, servicemethod string, args interface{}, reply interface{}) error { pclient := slf.GetClusterClient(nodeid) if pclient == nil { @@ -351,7 +351,7 @@ func Call(NodeServiceMethod string, args interface{}, reply interface{}) error { } func Go(NodeServiceMethod string, args interface{}) error { - return InstanceClusterMgr().Go(NodeServiceMethod, args) + return InstanceClusterMgr().Go(false, NodeServiceMethod, args) } func CallNode(NodeId int, servicemethod string, args interface{}, reply interface{}) error { @@ -362,8 +362,8 @@ func GoNode(NodeId int, servicemethod string, args interface{}) error { return InstanceClusterMgr().GoNode(NodeId, args, servicemethod) } -func CastCall(NodeServiceMethod string, args interface{}, reply []interface{}) error { - return InstanceClusterMgr().Call(NodeServiceMethod, args, reply) +func CastGo(NodeServiceMethod string, args interface{}) error { + return InstanceClusterMgr().Go(true, NodeServiceMethod, args) } var _self *CCluster diff --git a/cluster/config.go b/cluster/config.go index eb54e4d..f52ba39 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -2,6 +2,7 @@ package cluster import ( "encoding/json" + "fmt" "io/ioutil" ) @@ -31,7 +32,7 @@ type ClusterConfig struct { //map[nodename][ {CNode} ] mapClusterNodeService map[string][]CNode //map[nodename] []CNode mapClusterServiceNode map[string][]CNode //map[servicename] []CNode - mapLocalService map[string]bool //map[servicename] bool + //mapLocalService map[string]bool //map[servicename] bool currentNode CNode } @@ -42,18 +43,21 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) { d, err := ioutil.ReadFile(path) if err != nil { + fmt.Printf("Read File %s Error!", path) return nil, err } + err = json.Unmarshal(d, c) if err != nil { + fmt.Printf("Read File %s ,%s Error!", path, err) return nil, err } c.mapIdNode = make(map[int]CNode, 1) c.mapClusterNodeService = make(map[string][]CNode, 1) - c.mapLocalService = make(map[string]bool) c.mapClusterServiceNode = make(map[string][]CNode, 1) + var custerNodeName []string //组装mapIdNode for _, v := range c.NodeList { mapservice := make(map[string]bool, 1) @@ -61,26 +65,32 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) { mapservice[s] = true } - c.mapIdNode[v.NodeID] = CNode{v.NodeID, v.NodeName, v.ServerAddr, mapservice} + node := CNode{v.NodeID, v.NodeName, v.ServerAddr, mapservice} + c.mapIdNode[v.NodeID] = node + + if v.NodeID == nodeid { + //保存当前结点 + c.currentNode = node + custerNodeName = v.ClusterNode + } } //组装mapClusterNodeService - for _, n := range c.mapIdNode { - c.mapClusterNodeService[n.NodeName] = append(c.mapClusterNodeService[n.NodeName], n) - - //组装mapClusterServiceNode - for s := range n.ServiceList { - c.mapClusterServiceNode[s] = append(c.mapClusterServiceNode[s], n) - - if n.NodeID == nodeid { - c.mapLocalService[s] = true + for _, cn := range custerNodeName { + for _, n := range c.mapIdNode { + if n.NodeName == cn { + c.mapClusterNodeService[n.NodeName] = append(c.mapClusterNodeService[n.NodeName], n) } } + } - if n.NodeID == nodeid { - c.currentNode = n + //组装mapClusterServiceNode + for _, nodelist := range c.mapClusterNodeService { //[]Node + for _, node := range nodelist { //Node + for s := range node.ServiceList { + c.mapClusterServiceNode[s] = append(c.mapClusterServiceNode[s], node) + } } - } return c, nil @@ -124,6 +134,8 @@ func (slf *ClusterConfig) GetIdByNodeService(NodeName string, serviceName string } func (slf *ClusterConfig) HasLocalService(serviceName string) bool { - _, ok := slf.mapLocalService[serviceName] + _, ok := slf.currentNode.ServiceList[serviceName] + + //_, ok := slf.mapLocalService[serviceName] return ok == true }