diff --git a/README.md b/README.md index 7b7868c..0a08d12 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Hello world! 下面我们来一步步的建立origin服务器,先下载[origin引擎](https://github.com/duanhf2012/origin "origin引擎"),或者使用如下命令: ```go -go get -v -u github.com/duanhf2012/origin +go get -v -u github.com/duanhf2012/origin/v2 ``` [README.md](README.md) 于是下载到GOPATH环境目录中,在src中加入main.go,内容如下: diff --git a/cluster/cluster.go b/cluster/cluster.go index ea5f3ee..cb3f9cf 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -21,13 +21,13 @@ const ( ) type MasterDiscoveryService struct { - MasterNodeId int32 //要筛选的主结点Id,如果不配置或者配置成0,表示针对所有的主结点 + MasterNodeId string //要筛选的主结点Id,如果不配置或者配置成0,表示针对所有的主结点 DiscoveryService []string //只发现的服务列表 } type NodeInfo struct { - NodeId int - NodeName string + NodeId string + //NodeName string Private bool ListenAddr string MaxRpcParamLen uint32 //最大Rpc参数长度 @@ -56,9 +56,9 @@ type Cluster struct { locker sync.RWMutex //结点与服务关系保护锁 - mapRpc map[int]*NodeRpcInfo //nodeId + mapRpc map[string]*NodeRpcInfo //nodeId //mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo - mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId] + mapServiceNode map[string]map[string]struct{} //map[serviceName]map[NodeId] rpcServer rpc.Server rpcEventLocker sync.RWMutex //Rpc事件监听保护锁 @@ -86,7 +86,7 @@ func (cls *Cluster) Stop() { cls.serviceDiscovery.OnNodeStop() } -func (cls *Cluster) DiscardNode(nodeId int) { +func (cls *Cluster) DiscardNode(nodeId string) { cls.locker.Lock() nodeInfo, ok := cls.mapRpc[nodeId] bDel := (ok == true) && nodeInfo.nodeInfo.status == Discard @@ -97,7 +97,7 @@ func (cls *Cluster) DiscardNode(nodeId int) { } } -func (cls *Cluster) DelNode(nodeId int, immediately bool) { +func (cls *Cluster) DelNode(nodeId string, immediately bool) { //MasterDiscover结点与本地结点不删除 if cls.GetMasterDiscoveryNodeInfo(nodeId) != nil || nodeId == cls.localNodeInfo.NodeId { return @@ -114,7 +114,7 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) { //正在连接中不主动断开,只断开没有连接中的 if rpc.client.IsConnected() { rpc.nodeInfo.status = Discard - log.Info("Discard node",log.Int("nodeId",rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr)) + log.Info("Discard node",log.String("nodeId",rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr)) return } } @@ -128,18 +128,18 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) { rpc.client.Close(false) } - log.Info("remove node ",log.Int("NodeId", rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr)) + log.Info("remove node ",log.String("NodeId", rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr)) } -func (cls *Cluster) serviceDiscoveryDelNode(nodeId int, immediately bool) { - if nodeId == 0 { - return - } +func (cls *Cluster) serviceDiscoveryDelNode(nodeId string, immediately bool) { + //if nodeId == "" { + // return + //} cls.DelNode(nodeId, immediately) } -func (cls *Cluster) delServiceNode(serviceName string, nodeId int) { +func (cls *Cluster) delServiceNode(serviceName string, nodeId string) { if nodeId == cls.localNodeInfo.NodeId { return } @@ -178,13 +178,13 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) { } mapDuplicate[serviceName] = nil if _, ok := cls.mapServiceNode[serviceName]; ok == false { - cls.mapServiceNode[serviceName] = make(map[int]struct{}, 1) + cls.mapServiceNode[serviceName] = make(map[string]struct{}, 1) } cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{} } if lastNodeInfo != nil { - log.Info("Discovery nodeId",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire)) + log.Info("Discovery nodeId",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire)) lastNodeInfo.nodeInfo = *nodeInfo return } @@ -194,12 +194,12 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) { rpcInfo.nodeInfo = *nodeInfo rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent) cls.mapRpc[nodeInfo.NodeId] = &rpcInfo - log.Info("Discovery nodeId and new rpc client",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr)) + log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr)) } -func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error { +func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) error { //1.初始化配置 err := cls.InitCfg(localNodeId) if err != nil { @@ -223,7 +223,7 @@ func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error return nil } -func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool, bool) { +func (cls *Cluster) checkDynamicDiscovery(localNodeId string) (bool, bool) { var localMaster bool //本结点是否为Master结点 var hasMaster bool //是否配置Master服务 @@ -247,7 +247,7 @@ func (cls *Cluster) AddDynamicDiscoveryService(serviceName string, bPublicServic } if _, ok := cls.mapServiceNode[serviceName]; ok == false { - cls.mapServiceNode[serviceName] = map[int]struct{}{} + cls.mapServiceNode[serviceName] = map[string]struct{}{} } cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{} } @@ -256,7 +256,7 @@ func (cls *Cluster) GetDiscoveryNodeList() []NodeInfo { return cls.masterDiscoveryNodeList } -func (cls *Cluster) GetMasterDiscoveryNodeInfo(nodeId int) *NodeInfo { +func (cls *Cluster) GetMasterDiscoveryNodeInfo(nodeId string) *NodeInfo { for i := 0; i < len(cls.masterDiscoveryNodeList); i++ { if cls.masterDiscoveryNodeList[i].NodeId == nodeId { return &cls.masterDiscoveryNodeList[i] @@ -270,7 +270,7 @@ func (cls *Cluster) IsMasterDiscoveryNode() bool { return cls.GetMasterDiscoveryNodeInfo(cls.GetLocalNodeInfo().NodeId) != nil } -func (cls *Cluster) SetupServiceDiscovery(localNodeId int, setupServiceFun SetupServiceFun) { +func (cls *Cluster) SetupServiceDiscovery(localNodeId string, setupServiceFun SetupServiceFun) { if cls.serviceDiscovery != nil { return } @@ -300,7 +300,7 @@ func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler { return pService.GetRpcHandler() } -func (cls *Cluster) getRpcClient(nodeId int) (*rpc.Client,bool) { +func (cls *Cluster) getRpcClient(nodeId string) (*rpc.Client,bool) { c, ok := cls.mapRpc[nodeId] if ok == false { return nil,false @@ -309,14 +309,14 @@ func (cls *Cluster) getRpcClient(nodeId int) (*rpc.Client,bool) { return c.client,c.nodeInfo.Retire } -func (cls *Cluster) GetRpcClient(nodeId int) (*rpc.Client,bool) { +func (cls *Cluster) GetRpcClient(nodeId string) (*rpc.Client,bool) { cls.locker.RLock() defer cls.locker.RUnlock() return cls.getRpcClient(nodeId) } -func GetRpcClient(nodeId int, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, int) { - if nodeId > 0 { +func GetRpcClient(nodeId string, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, int) { + if nodeId != rpc.NodeIdNull { pClient,retire := GetCluster().GetRpcClient(nodeId) if pClient == nil { return fmt.Errorf("cannot find nodeid %d!", nodeId), 0 @@ -345,12 +345,12 @@ func GetRpcServer() *rpc.Server { return &cluster.rpcServer } -func (cls *Cluster) IsNodeConnected(nodeId int) bool { +func (cls *Cluster) IsNodeConnected(nodeId string) bool { pClient,_ := cls.GetRpcClient(nodeId) return pClient != nil && pClient.IsConnected() } -func (cls *Cluster) IsNodeRetire(nodeId int) bool { +func (cls *Cluster) IsNodeRetire(nodeId string) bool { cls.locker.RLock() defer cls.locker.RUnlock() @@ -359,7 +359,7 @@ func (cls *Cluster) IsNodeRetire(nodeId int) bool { } -func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId int) { +func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId string) { cls.locker.Lock() nodeInfo, ok := cls.mapRpc[nodeId] if ok == false || nodeInfo.client == nil || nodeInfo.client.GetClientId() != clientId { @@ -384,7 +384,7 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId int) } } -func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId int, serviceName []string) { +func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId string, serviceName []string) { cls.rpcEventLocker.Lock() defer cls.rpcEventLocker.Unlock() @@ -443,7 +443,7 @@ func (cls *Cluster) UnReDiscoveryEvent(serviceName string) { -func HasService(nodeId int, serviceName string) bool { +func HasService(nodeId string, serviceName string) bool { cluster.locker.RLock() defer cluster.locker.RUnlock() @@ -456,7 +456,7 @@ func HasService(nodeId int, serviceName string) bool { return false } -func GetNodeByServiceName(serviceName string) map[int]struct{} { +func GetNodeByServiceName(serviceName string) map[string]struct{} { cluster.locker.RLock() defer cluster.locker.RUnlock() @@ -465,7 +465,7 @@ func GetNodeByServiceName(serviceName string) map[int]struct{} { return nil } - mapNodeId := map[int]struct{}{} + mapNodeId := map[string]struct{}{} for nodeId,_ := range mapNode { mapNodeId[nodeId] = struct{}{} } @@ -477,7 +477,7 @@ func (cls *Cluster) GetGlobalCfg() interface{} { return cls.globalCfg } -func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) { +func (cls *Cluster) GetNodeInfo(nodeId string) (NodeInfo,bool) { cls.locker.RLock() defer cls.locker.RUnlock() diff --git a/cluster/configdiscovery.go b/cluster/configdiscovery.go index 5d2fd96..73db25c 100644 --- a/cluster/configdiscovery.go +++ b/cluster/configdiscovery.go @@ -1,19 +1,21 @@ package cluster +import "github.com/duanhf2012/origin/v2/rpc" + type ConfigDiscovery struct { funDelService FunDelNode funSetService FunSetNodeInfo - localNodeId int + localNodeId string } -func (discovery *ConfigDiscovery) InitDiscovery(localNodeId int,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error{ +func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error{ discovery.localNodeId = localNodeId discovery.funDelService = funDelNode discovery.funSetService = funSetNodeInfo //解析本地其他服务配置 - _,nodeInfoList,err := GetCluster().readLocalClusterConfig(0) + _,nodeInfoList,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull) if err != nil { return err } diff --git a/cluster/dynamicdiscovery.go b/cluster/dynamicdiscovery.go index ab96673..605bb96 100644 --- a/cluster/dynamicdiscovery.go +++ b/cluster/dynamicdiscovery.go @@ -20,7 +20,7 @@ const NodeRetireRpcMethod = DynamicDiscoveryMasterName+".RPC_NodeRetire" type DynamicDiscoveryMaster struct { service.Service - mapNodeInfo map[int32]struct{} + mapNodeInfo map[string]struct{} nodeInfo []*rpc.NodeInfo } @@ -29,9 +29,9 @@ type DynamicDiscoveryClient struct { funDelService FunDelNode funSetService FunSetNodeInfo - localNodeId int + localNodeId string - mapDiscovery map[int32]map[int32]struct{} //map[masterNodeId]map[nodeId]struct{} + mapDiscovery map[string]map[string]struct{} //map[masterNodeId]map[nodeId]struct{} bRetire bool } @@ -47,7 +47,7 @@ func init() { clientService.SetName(DynamicDiscoveryClientName) } -func (ds *DynamicDiscoveryMaster) isRegNode(nodeId int32) bool { +func (ds *DynamicDiscoveryMaster) isRegNode(nodeId string) bool { _, ok := ds.mapNodeInfo[nodeId] return ok } @@ -81,7 +81,7 @@ func (ds *DynamicDiscoveryMaster) addNodeInfo(nInfo *rpc.NodeInfo) { ds.nodeInfo = append(ds.nodeInfo, nodeInfo) } -func (ds *DynamicDiscoveryMaster) removeNodeInfo(nodeId int32) { +func (ds *DynamicDiscoveryMaster) removeNodeInfo(nodeId string) { if _,ok:= ds.mapNodeInfo[nodeId];ok == false { return } @@ -97,7 +97,7 @@ func (ds *DynamicDiscoveryMaster) removeNodeInfo(nodeId int32) { } func (ds *DynamicDiscoveryMaster) OnInit() error { - ds.mapNodeInfo = make(map[int32]struct{}, 20) + ds.mapNodeInfo = make(map[string]struct{}, 20) ds.RegRpcListener(ds) return nil @@ -106,8 +106,7 @@ func (ds *DynamicDiscoveryMaster) OnInit() error { func (ds *DynamicDiscoveryMaster) OnStart() { var nodeInfo rpc.NodeInfo localNodeInfo := cluster.GetLocalNodeInfo() - nodeInfo.NodeId = int32(localNodeInfo.NodeId) - nodeInfo.NodeName = localNodeInfo.NodeName + nodeInfo.NodeId = localNodeInfo.NodeId nodeInfo.ListenAddr = localNodeInfo.ListenAddr nodeInfo.PublicServiceList = localNodeInfo.PublicServiceList nodeInfo.MaxRpcParamLen = localNodeInfo.MaxRpcParamLen @@ -117,9 +116,9 @@ func (ds *DynamicDiscoveryMaster) OnStart() { ds.addNodeInfo(&nodeInfo) } -func (ds *DynamicDiscoveryMaster) OnNodeConnected(nodeId int) { +func (ds *DynamicDiscoveryMaster) OnNodeConnected(nodeId string) { //没注册过结点不通知 - if ds.isRegNode(int32(nodeId)) == false { + if ds.isRegNode(nodeId) == false { return } @@ -127,21 +126,21 @@ func (ds *DynamicDiscoveryMaster) OnNodeConnected(nodeId int) { var notifyDiscover rpc.SubscribeDiscoverNotify notifyDiscover.IsFull = true notifyDiscover.NodeInfo = ds.nodeInfo - notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId) + notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId ds.GoNode(nodeId, SubServiceDiscover, ¬ifyDiscover) } -func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int) { - if ds.isRegNode(int32(nodeId)) == false { +func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId string) { + if ds.isRegNode(nodeId) == false { return } - ds.removeNodeInfo(int32(nodeId)) + ds.removeNodeInfo(nodeId) var notifyDiscover rpc.SubscribeDiscoverNotify - notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId) - notifyDiscover.DelNodeId = int32(nodeId) + notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId + notifyDiscover.DelNodeId = nodeId //删除结点 cluster.DelNode(nodeId, true) @@ -151,17 +150,17 @@ func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int) { func (ds *DynamicDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{}) { for nodeId, _ := range ds.mapNodeInfo { - ds.GoNode(int(nodeId), serviceMethod, args) + ds.GoNode(nodeId, serviceMethod, args) } } func (ds *DynamicDiscoveryMaster) RPC_NodeRetire(req *rpc.NodeRetireReq, res *rpc.Empty) error { - log.Info("node is retire",log.Int32("nodeId",req.NodeInfo.NodeId),log.Bool("retire",req.NodeInfo.Retire)) + log.Info("node is retire",log.String("nodeId",req.NodeInfo.NodeId),log.Bool("retire",req.NodeInfo.Retire)) ds.updateNodeInfo(req.NodeInfo) var notifyDiscover rpc.SubscribeDiscoverNotify - notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId) + notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo, req.NodeInfo) ds.RpcCastGo(SubServiceDiscover, ¬ifyDiscover) @@ -179,7 +178,7 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove //广播给其他所有结点 var notifyDiscover rpc.SubscribeDiscoverNotify - notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId) + notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo, req.NodeInfo) ds.RpcCastGo(SubServiceDiscover, ¬ifyDiscover) @@ -188,8 +187,7 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove //初始化结点信息 var nodeInfo NodeInfo - nodeInfo.NodeId = int(req.NodeInfo.NodeId) - nodeInfo.NodeName = req.NodeInfo.NodeName + nodeInfo.NodeId = req.NodeInfo.NodeId nodeInfo.Private = req.NodeInfo.Private nodeInfo.ServiceList = req.NodeInfo.PublicServiceList nodeInfo.PublicServiceList = req.NodeInfo.PublicServiceList @@ -208,19 +206,19 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove func (dc *DynamicDiscoveryClient) OnInit() error { dc.RegRpcListener(dc) - dc.mapDiscovery = map[int32]map[int32]struct{}{} + dc.mapDiscovery = map[string]map[string]struct{}{} return nil } -func (dc *DynamicDiscoveryClient) addMasterNode(masterNodeId int32, nodeId int32) { +func (dc *DynamicDiscoveryClient) addMasterNode(masterNodeId string, nodeId string) { _, ok := dc.mapDiscovery[masterNodeId] if ok == false { - dc.mapDiscovery[masterNodeId] = map[int32]struct{}{} + dc.mapDiscovery[masterNodeId] = map[string]struct{}{} } dc.mapDiscovery[masterNodeId][nodeId] = struct{}{} } -func (dc *DynamicDiscoveryClient) removeMasterNode(masterNodeId int32, nodeId int32) { +func (dc *DynamicDiscoveryClient) removeMasterNode(masterNodeId string, nodeId string) { mapNodeId, ok := dc.mapDiscovery[masterNodeId] if ok == false { return @@ -229,7 +227,7 @@ func (dc *DynamicDiscoveryClient) removeMasterNode(masterNodeId int32, nodeId in delete(mapNodeId, nodeId) } -func (dc *DynamicDiscoveryClient) findNodeId(nodeId int32) bool { +func (dc *DynamicDiscoveryClient) findNodeId(nodeId string) bool { for _, mapNodeId := range dc.mapDiscovery { _, ok := mapNodeId[nodeId] if ok == true { @@ -255,13 +253,13 @@ func (dc *DynamicDiscoveryClient) addDiscoveryMaster() { } } -func (dc *DynamicDiscoveryClient) fullCompareDiffNode(masterNodeId int32, mapNodeInfo map[int32]*rpc.NodeInfo) []int32 { +func (dc *DynamicDiscoveryClient) fullCompareDiffNode(masterNodeId string, mapNodeInfo map[string]*rpc.NodeInfo) []string { if mapNodeInfo == nil { return nil } - diffNodeIdSlice := make([]int32, 0, len(mapNodeInfo)) - mapNodeId := map[int32]struct{}{} + diffNodeIdSlice := make([]string, 0, len(mapNodeInfo)) + mapNodeId := map[string]struct{}{} mapNodeId, ok := dc.mapDiscovery[masterNodeId] if ok == false { return nil @@ -280,10 +278,10 @@ func (dc *DynamicDiscoveryClient) fullCompareDiffNode(masterNodeId int32, mapNod //订阅发现的服务通知 func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error { - mapNodeInfo := map[int32]*rpc.NodeInfo{} + mapNodeInfo := map[string]*rpc.NodeInfo{} for _, nodeInfo := range req.NodeInfo { //不对本地结点或者不存在任何公开服务的结点 - if int(nodeInfo.NodeId) == dc.localNodeId { + if nodeInfo.NodeId == dc.localNodeId { continue } @@ -298,7 +296,6 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco if nInfo == nil { nInfo = &rpc.NodeInfo{} nInfo.NodeId = nodeInfo.NodeId - nInfo.NodeName = nodeInfo.NodeName nInfo.ListenAddr = nodeInfo.ListenAddr nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen nInfo.Retire = nodeInfo.Retire @@ -312,7 +309,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco } //如果为完整同步,则找出差异的结点 - var willDelNodeId []int32 + var willDelNodeId []string if req.IsFull == true { diffNode := dc.fullCompareDiffNode(req.MasterNodeId, mapNodeInfo) if len(diffNode) > 0 { @@ -321,16 +318,16 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco } //指定删除结点 - if req.DelNodeId > 0 && req.DelNodeId != int32(dc.localNodeId) { + if req.DelNodeId != rpc.NodeIdNull && req.DelNodeId != dc.localNodeId { willDelNodeId = append(willDelNodeId, req.DelNodeId) } //删除不必要的结点 for _, nodeId := range willDelNodeId { - cluster.TriggerDiscoveryEvent(false,int(nodeId),nil) - dc.removeMasterNode(req.MasterNodeId, int32(nodeId)) + cluster.TriggerDiscoveryEvent(false,nodeId,nil) + dc.removeMasterNode(req.MasterNodeId, nodeId) if dc.findNodeId(nodeId) == false { - dc.funDelService(int(nodeId), false) + dc.funDelService(nodeId, false) } } @@ -341,13 +338,13 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco continue } - cluster.TriggerDiscoveryEvent(true,int(nodeInfo.NodeId),nodeInfo.PublicServiceList) + cluster.TriggerDiscoveryEvent(true,nodeInfo.NodeId,nodeInfo.PublicServiceList) } return nil } -func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool { +func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId string) bool { for i := 0; i < len(cluster.masterDiscoveryNodeList); i++ { if cluster.masterDiscoveryNodeList[i].NodeId == nodeId { return true @@ -357,7 +354,7 @@ func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool { return false } -func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { +func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId string) { dc.regServiceDiscover(nodeId) } @@ -369,22 +366,21 @@ func (dc *DynamicDiscoveryClient) OnRetire(){ var nodeRetireReq rpc.NodeRetireReq nodeRetireReq.NodeInfo = &rpc.NodeInfo{} - nodeRetireReq.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId) - nodeRetireReq.NodeInfo.NodeName = cluster.localNodeInfo.NodeName + nodeRetireReq.NodeInfo.NodeId = cluster.localNodeInfo.NodeId nodeRetireReq.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr nodeRetireReq.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen nodeRetireReq.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList nodeRetireReq.NodeInfo.Retire = dc.bRetire nodeRetireReq.NodeInfo.Private = cluster.localNodeInfo.Private - err := dc.GoNode(int(masterNodeList[i].NodeId),NodeRetireRpcMethod,&nodeRetireReq) + err := dc.GoNode(masterNodeList[i].NodeId,NodeRetireRpcMethod,&nodeRetireReq) if err!= nil { log.Error("call "+NodeRetireRpcMethod+" is fail",log.ErrorAttr("err",err)) } } } -func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){ +func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId string){ nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId) if nodeInfo == nil { return @@ -392,8 +388,7 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){ var req rpc.ServiceDiscoverReq req.NodeInfo = &rpc.NodeInfo{} - req.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId) - req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName + req.NodeInfo.NodeId = cluster.localNodeInfo.NodeId req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr req.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList @@ -416,13 +411,13 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){ } } -func (dc *DynamicDiscoveryClient) canDiscoveryService(fromMasterNodeId int32,serviceName string) bool{ +func (dc *DynamicDiscoveryClient) canDiscoveryService(fromMasterNodeId string,serviceName string) bool{ canDiscovery := true for i:=0;i rpc.NodeInfo 0, // 1: rpc.SubscribeDiscoverNotify.nodeInfo:type_name -> rpc.NodeInfo 0, // 2: rpc.NodeRetireReq.nodeInfo:type_name -> rpc.NodeInfo @@ -395,13 +385,13 @@ var file_proto_rpcproto_dynamicdiscover_proto_depIdxs = []int32{ 0, // [0:3] is the sub-list for field type_name } -func init() { file_proto_rpcproto_dynamicdiscover_proto_init() } -func file_proto_rpcproto_dynamicdiscover_proto_init() { - if File_proto_rpcproto_dynamicdiscover_proto != nil { +func init() { file_rpcproto_dynamicdiscover_proto_init() } +func file_rpcproto_dynamicdiscover_proto_init() { + if File_rpcproto_dynamicdiscover_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_proto_rpcproto_dynamicdiscover_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_rpcproto_dynamicdiscover_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*NodeInfo); i { case 0: return &v.state @@ -413,7 +403,7 @@ func file_proto_rpcproto_dynamicdiscover_proto_init() { return nil } } - file_proto_rpcproto_dynamicdiscover_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_rpcproto_dynamicdiscover_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ServiceDiscoverReq); i { case 0: return &v.state @@ -425,7 +415,7 @@ func file_proto_rpcproto_dynamicdiscover_proto_init() { return nil } } - file_proto_rpcproto_dynamicdiscover_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + file_rpcproto_dynamicdiscover_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscribeDiscoverNotify); i { case 0: return &v.state @@ -437,7 +427,7 @@ func file_proto_rpcproto_dynamicdiscover_proto_init() { return nil } } - file_proto_rpcproto_dynamicdiscover_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_rpcproto_dynamicdiscover_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*NodeRetireReq); i { case 0: return &v.state @@ -449,7 +439,7 @@ func file_proto_rpcproto_dynamicdiscover_proto_init() { return nil } } - file_proto_rpcproto_dynamicdiscover_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + file_rpcproto_dynamicdiscover_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Empty); i { case 0: return &v.state @@ -466,18 +456,18 @@ func file_proto_rpcproto_dynamicdiscover_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_proto_rpcproto_dynamicdiscover_proto_rawDesc, + RawDescriptor: file_rpcproto_dynamicdiscover_proto_rawDesc, NumEnums: 0, NumMessages: 5, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_proto_rpcproto_dynamicdiscover_proto_goTypes, - DependencyIndexes: file_proto_rpcproto_dynamicdiscover_proto_depIdxs, - MessageInfos: file_proto_rpcproto_dynamicdiscover_proto_msgTypes, + GoTypes: file_rpcproto_dynamicdiscover_proto_goTypes, + DependencyIndexes: file_rpcproto_dynamicdiscover_proto_depIdxs, + MessageInfos: file_rpcproto_dynamicdiscover_proto_msgTypes, }.Build() - File_proto_rpcproto_dynamicdiscover_proto = out.File - file_proto_rpcproto_dynamicdiscover_proto_rawDesc = nil - file_proto_rpcproto_dynamicdiscover_proto_goTypes = nil - file_proto_rpcproto_dynamicdiscover_proto_depIdxs = nil + File_rpcproto_dynamicdiscover_proto = out.File + file_rpcproto_dynamicdiscover_proto_rawDesc = nil + file_rpcproto_dynamicdiscover_proto_goTypes = nil + file_rpcproto_dynamicdiscover_proto_depIdxs = nil } diff --git a/rpc/dynamicdiscover.proto b/rpc/dynamicdiscover.proto index 6f5e1d8..67f4b9a 100644 --- a/rpc/dynamicdiscover.proto +++ b/rpc/dynamicdiscover.proto @@ -3,13 +3,12 @@ package rpc; option go_package = ".;rpc"; message NodeInfo{ - int32 NodeId = 1; - string NodeName = 2; - string ListenAddr = 3; - uint32 MaxRpcParamLen = 4; - bool Private = 5; - bool Retire = 6; - repeated string PublicServiceList = 7; + string NodeId = 1; + string ListenAddr = 2; + uint32 MaxRpcParamLen = 3; + bool Private = 4; + bool Retire = 5; + repeated string PublicServiceList = 6; } //Client->Master @@ -19,9 +18,9 @@ message ServiceDiscoverReq{ //Master->Client message SubscribeDiscoverNotify{ - int32 MasterNodeId = 1; + string MasterNodeId = 1; bool IsFull = 2; - int32 DelNodeId = 3; + string DelNodeId = 3; repeated NodeInfo nodeInfo = 4; } diff --git a/rpc/lclient.go b/rpc/lclient.go index cf0947a..dd8320f 100644 --- a/rpc/lclient.go +++ b/rpc/lclient.go @@ -119,7 +119,7 @@ func (lc *LClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi return calcelRpc,nil } -func NewLClient(nodeId int) *Client{ +func NewLClient(nodeId string) *Client{ client := &Client{} client.clientId = atomic.AddUint32(&clientSeq, 1) client.nodeId = nodeId diff --git a/rpc/rclient.go b/rpc/rclient.go index f3cef0d..31992e4 100644 --- a/rpc/rclient.go +++ b/rpc/rclient.go @@ -278,7 +278,7 @@ func (rc *RClient) OnClose() { rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId()) } -func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,compressBytesLen int,triggerRpcConnEvent TriggerRpcConnEvent) *Client{ +func NewRClient(nodeId string, addr string, maxRpcParamLen uint32,compressBytesLen int,triggerRpcConnEvent TriggerRpcConnEvent) *Client{ client := &Client{} client.clientId = atomic.AddUint32(&clientSeq, 1) client.nodeId = nodeId diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index fef321f..876281b 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -14,9 +14,9 @@ import ( const maxClusterNode int = 128 -type FuncRpcClient func(nodeId int, serviceMethod string,filterRetire bool, client []*Client) (error, int) +type FuncRpcClient func(nodeId string, serviceMethod string,filterRetire bool, client []*Client) (error, int) type FuncRpcServer func() *Server - +const NodeIdNull = "" var nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) @@ -65,15 +65,15 @@ type RpcHandler struct { pClientList []*Client } -type TriggerRpcConnEvent func(bConnect bool, clientSeq uint32, nodeId int) +type TriggerRpcConnEvent func(bConnect bool, clientSeq uint32, nodeId string) type INodeListener interface { - OnNodeConnected(nodeId int) - OnNodeDisconnect(nodeId int) + OnNodeConnected(nodeId string) + OnNodeDisconnect(nodeId string) } type IDiscoveryServiceListener interface { - OnDiscoveryService(nodeId int, serviceName []string) - OnUnDiscoveryService(nodeId int) + OnDiscoveryService(nodeId string, serviceName []string) + OnUnDiscoveryService(nodeId string) } type CancelRpc func() @@ -89,18 +89,18 @@ type IRpcHandler interface { CallMethod(client *Client,ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error Call(serviceMethod string, args interface{}, reply interface{}) error - CallNode(nodeId int, serviceMethod string, args interface{}, reply interface{}) error + CallNode(nodeId string, serviceMethod string, args interface{}, reply interface{}) error AsyncCall(serviceMethod string, args interface{}, callback interface{}) error - AsyncCallNode(nodeId int, serviceMethod string, args interface{}, callback interface{}) error + AsyncCallNode(nodeId string, serviceMethod string, args interface{}, callback interface{}) error CallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, reply interface{}) error - CallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, reply interface{}) error + CallNodeWithTimeout(timeout time.Duration,nodeId string, serviceMethod string, args interface{}, reply interface{}) error AsyncCallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error) - AsyncCallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error) + AsyncCallNodeWithTimeout(timeout time.Duration,nodeId string, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error) Go(serviceMethod string, args interface{}) error - GoNode(nodeId int, serviceMethod string, args interface{}) error - RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs []byte) error + GoNode(nodeId string, serviceMethod string, args interface{}) error + RawGoNode(rpcProcessorType RpcProcessorType, nodeId string, rpcMethodId uint32, serviceName string, rawArgs []byte) error CastGo(serviceMethod string, args interface{}) error IsSingleCoroutine() bool UnmarshalInParam(rpcProcessor IRpcProcessor, serviceMethod string, rawRpcMethodId uint32, inParam []byte) (interface{}, error) @@ -426,7 +426,7 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param return err } -func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int, serviceMethod string, args interface{}) error { +func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId string, serviceMethod string, args interface{}) error { var pClientList [maxClusterNode]*Client err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:]) if count == 0 { @@ -456,7 +456,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int return err } -func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, reply interface{}) error { +func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId string, serviceMethod string, args interface{}, reply interface{}) error { var pClientList [maxClusterNode]*Client err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:]) if err != nil { @@ -480,7 +480,7 @@ func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId int, serviceMeth return err } -func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error) { +func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId string, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error) { fVal := reflect.ValueOf(callback) if fVal.Kind() != reflect.Func { err := errors.New("call " + serviceMethod + " input callback param is error!") @@ -505,14 +505,14 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId int, servic err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:]) if count == 0 || err != nil { if err == nil { - if nodeId > 0 { + if nodeId != NodeIdNull { err = fmt.Errorf("cannot find %s from nodeId %d",serviceMethod,nodeId) }else { err = fmt.Errorf("No %s service found in the origin network",serviceMethod) } } fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) - log.Error("cannot find serviceMethod from node",log.String("serviceMethod",serviceMethod),log.Int("nodeId",nodeId)) + log.Error("cannot find serviceMethod from node",log.String("serviceMethod",serviceMethod),log.String("nodeId",nodeId)) return emptyCancelRpc,nil } @@ -537,53 +537,53 @@ func (handler *RpcHandler) IsSingleCoroutine() bool { } func (handler *RpcHandler) CallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, reply interface{}) error { - return handler.callRpc(timeout,0, serviceMethod, args, reply) + return handler.callRpc(timeout,NodeIdNull, serviceMethod, args, reply) } -func (handler *RpcHandler) CallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, reply interface{}) error{ +func (handler *RpcHandler) CallNodeWithTimeout(timeout time.Duration,nodeId string, serviceMethod string, args interface{}, reply interface{}) error{ return handler.callRpc(timeout,nodeId, serviceMethod, args, reply) } func (handler *RpcHandler) AsyncCallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error){ - return handler.asyncCallRpc(timeout,0, serviceMethod, args, callback) + return handler.asyncCallRpc(timeout,NodeIdNull, serviceMethod, args, callback) } -func (handler *RpcHandler) AsyncCallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error){ +func (handler *RpcHandler) AsyncCallNodeWithTimeout(timeout time.Duration,nodeId string, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error){ return handler.asyncCallRpc(timeout,nodeId, serviceMethod, args, callback) } func (handler *RpcHandler) AsyncCall(serviceMethod string, args interface{}, callback interface{}) error { - _,err := handler.asyncCallRpc(DefaultRpcTimeout,0, serviceMethod, args, callback) + _,err := handler.asyncCallRpc(DefaultRpcTimeout,NodeIdNull, serviceMethod, args, callback) return err } func (handler *RpcHandler) Call(serviceMethod string, args interface{}, reply interface{}) error { - return handler.callRpc(DefaultRpcTimeout,0, serviceMethod, args, reply) + return handler.callRpc(DefaultRpcTimeout,NodeIdNull, serviceMethod, args, reply) } func (handler *RpcHandler) Go(serviceMethod string, args interface{}) error { - return handler.goRpc(nil, false, 0, serviceMethod, args) + return handler.goRpc(nil, false, NodeIdNull, serviceMethod, args) } -func (handler *RpcHandler) AsyncCallNode(nodeId int, serviceMethod string, args interface{}, callback interface{}) error { +func (handler *RpcHandler) AsyncCallNode(nodeId string, serviceMethod string, args interface{}, callback interface{}) error { _,err:= handler.asyncCallRpc(DefaultRpcTimeout,nodeId, serviceMethod, args, callback) return err } -func (handler *RpcHandler) CallNode(nodeId int, serviceMethod string, args interface{}, reply interface{}) error { +func (handler *RpcHandler) CallNode(nodeId string, serviceMethod string, args interface{}, reply interface{}) error { return handler.callRpc(DefaultRpcTimeout,nodeId, serviceMethod, args, reply) } -func (handler *RpcHandler) GoNode(nodeId int, serviceMethod string, args interface{}) error { +func (handler *RpcHandler) GoNode(nodeId string, serviceMethod string, args interface{}) error { return handler.goRpc(nil, false, nodeId, serviceMethod, args) } func (handler *RpcHandler) CastGo(serviceMethod string, args interface{}) error { - return handler.goRpc(nil, true, 0, serviceMethod, args) + return handler.goRpc(nil, true, NodeIdNull, serviceMethod, args) } -func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs []byte) error { +func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId string, rpcMethodId uint32, serviceName string, rawArgs []byte) error { processor := GetProcessor(uint8(rpcProcessorType)) err, count := handler.funcRpcClient(nodeId, serviceName,false, handler.pClientList) if count == 0 || err != nil { diff --git a/service/service.go b/service/service.go index 3979c4b..08fb8af 100644 --- a/service/service.go +++ b/service/service.go @@ -67,14 +67,14 @@ type Service struct { // RpcConnEvent Node结点连接事件 type RpcConnEvent struct{ IsConnect bool - NodeId int + NodeId string } // DiscoveryServiceEvent 发现服务结点 type DiscoveryServiceEvent struct{ IsDiscovery bool ServiceName []string - NodeId int + NodeId string } func SetMaxServiceChannel(maxEventChannel int){ diff --git a/sysservice/tcpservice/tcpservice.go b/sysservice/tcpservice/tcpservice.go index 1793b41..9209ebc 100644 --- a/sysservice/tcpservice/tcpservice.go +++ b/sysservice/tcpservice/tcpservice.go @@ -6,7 +6,6 @@ import ( "github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/network" "github.com/duanhf2012/origin/v2/network/processor" - "github.com/duanhf2012/origin/v2/node" "github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/util/bytespool" "runtime" @@ -22,6 +21,7 @@ type TcpService struct { mapClientLocker sync.RWMutex mapClient map[uint64] *Client process processor.IProcessor + machineId uint16 } type TcpPackType int8 @@ -33,7 +33,7 @@ const( ) const ( - MaxNodeId = 1<<14 - 1 //最大值 16383 + MaxMachineId = 1<<14 - 1 //最大值 16383 MaxSeed = 1<<19 - 1 //最大值 524287 MaxTime = 1<<31 - 1 //最大值 2147483647 ) @@ -55,7 +55,7 @@ type Client struct { func (tcpService *TcpService) genId() uint64 { newSeed := atomic.AddUint32(&seed,1) % MaxSeed nowTime := uint64(time.Now().Unix())%MaxTime - return (uint64(node.GetNodeId()%MaxNodeId)<<50)|(nowTime<<19)|uint64(newSeed) + return (uint64(tcpService.machineId)<<50)|(nowTime<<19)|uint64(newSeed) } func (tcpService *TcpService) OnInit() error{ @@ -74,6 +74,17 @@ func (tcpService *TcpService) OnInit() error{ if ok == true { tcpService.tcpServer.MaxConnNum = int(MaxConnNum.(float64)) } + + MachineId,ok := tcpCfg["MachineId"] + if ok == true { + tcpService.machineId = uint16(MachineId.(float64)) + if tcpService.machineId > MaxMachineId { + return fmt.Errorf("MachineId is error!") + } + }else { + return fmt.Errorf("MachineId is error!") + } + PendingWriteNum,ok := tcpCfg["PendingWriteNum"] if ok == true { tcpService.tcpServer.PendingWriteNum = int(PendingWriteNum.(float64)) diff --git a/sysservice/wsservice/wsservice.go b/sysservice/wsservice/wsservice.go index b1643d2..0f3a0be 100644 --- a/sysservice/wsservice/wsservice.go +++ b/sysservice/wsservice/wsservice.go @@ -7,7 +7,6 @@ import ( "github.com/duanhf2012/origin/v2/network" "github.com/duanhf2012/origin/v2/network/processor" "github.com/duanhf2012/origin/v2/service" - "github.com/duanhf2012/origin/v2/node" "sync" "sync/atomic" "time" @@ -22,8 +21,7 @@ type WSService struct { mapClientLocker sync.RWMutex mapClient map[uint64] *WSClient process processor.IProcessor - - + machineId uint16 } var seed uint32 @@ -41,7 +39,7 @@ const Default_WS_PendingWriteNum = 10000 const Default_WS_MaxMsgLen = 65535 const ( - MaxNodeId = 1<<14 - 1 //最大值 16383 + MaxMachineId = 1<<14 - 1 //最大值 16383 MaxSeed = 1<<19 - 1 //最大值 524287 MaxTime = 1<<31 - 1 //最大值 2147483647 ) @@ -79,6 +77,15 @@ func (ws *WSService) OnInit() error{ if ok == true { ws.wsServer.MaxConnNum = int(MaxConnNum.(float64)) } + MachineId,ok := wsCfg["MachineId"] + if ok == true { + ws.machineId = uint16(MachineId.(float64)) + if ws.machineId > MaxMachineId { + return fmt.Errorf("MachineId is error!") + } + }else { + return fmt.Errorf("MachineId is error!") + } PendingWriteNum,ok := wsCfg["PendingWriteNum"] if ok == true { ws.wsServer.PendingWriteNum = int(PendingWriteNum.(float64)) @@ -119,13 +126,9 @@ func (ws *WSService) SetProcessor(process processor.IProcessor,handler event.IEv } func (ws *WSService) genId() uint64 { - if node.GetNodeId()>MaxNodeId{ - panic("nodeId exceeds the maximum!") - } - newSeed := atomic.AddUint32(&seed,1) % MaxSeed nowTime := uint64(time.Now().Unix())%MaxTime - return (uint64(node.GetNodeId())<<50)|(nowTime<<19)|uint64(newSeed) + return (uint64(ws.machineId)<<50)|(nowTime<<19)|uint64(newSeed) } func (ws *WSService) NewWSClient(conn *network.WSConn) network.Agent {