From 769f680b17cf25cf0660f4af5238b6583ae1f39d Mon Sep 17 00:00:00 2001 From: orgin Date: Thu, 7 Jul 2022 13:38:38 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=9C=8D=E5=8A=A1=E5=8F=91?= =?UTF-8?q?=E7=8E=B0=E4=BA=8B=E4=BB=B6=E7=9B=91=E5=90=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 68 +++++++++++++++++++++++++++++++++++++ cluster/dynamicdiscovery.go | 3 ++ event/eventtype.go | 5 +-- rpc/rpchandler.go | 7 +++- service/service.go | 54 +++++++++++++++++++++++------ service/servicemgr.go | 5 +++ 6 files changed, 128 insertions(+), 14 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 1a157a4..9d44dfb 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -56,6 +56,7 @@ type Cluster struct { rpcServer rpc.Server rpcEventLocker sync.RWMutex //Rpc事件监听保护锁 mapServiceListenRpcEvent map[string]struct{} //ServiceName + mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName } func GetCluster() *Cluster { @@ -224,6 +225,9 @@ func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error //2.安装服务发现结点 cls.SetupServiceDiscovery(localNodeId, setupServiceFun) service.RegRpcEventFun = cls.RegRpcEvent + service.UnRegRpcEventFun = cls.UnRegRpcEvent + service.RegDiscoveryServiceEventFun = cls.RegDiscoveryEvent + service.UnRegDiscoveryServiceEventFun = cls.UnReDiscoveryEvent err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo) if err != nil { @@ -379,6 +383,27 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int) cls.rpcEventLocker.Unlock() } + +func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId int, serviceName []string) { + cls.rpcEventLocker.Lock() + defer cls.rpcEventLocker.Unlock() + + for sName, _ := range cls.mapServiceListenDiscoveryEvent { + ser := service.GetService(sName) + if ser == nil { + log.SError("cannot find service name ", serviceName) + continue + } + + var eventData service.DiscoveryServiceEvent + eventData.IsDiscovery = bDiscovery + eventData.NodeId = nodeId + eventData.ServiceName = serviceName + ser.(service.IModule).NotifyEvent(&eventData) + } + +} + func (cls *Cluster) GetLocalNodeInfo() *NodeInfo { return &cls.localNodeInfo } @@ -399,6 +424,23 @@ func (cls *Cluster) UnRegRpcEvent(serviceName string) { cls.rpcEventLocker.Unlock() } + +func (cls *Cluster) RegDiscoveryEvent(serviceName string) { + cls.rpcEventLocker.Lock() + if cls.mapServiceListenDiscoveryEvent == nil { + cls.mapServiceListenDiscoveryEvent = map[string]struct{}{} + } + + cls.mapServiceListenDiscoveryEvent[serviceName] = struct{}{} + cls.rpcEventLocker.Unlock() +} + +func (cls *Cluster) UnReDiscoveryEvent(serviceName string) { + cls.rpcEventLocker.Lock() + delete(cls.mapServiceListenDiscoveryEvent, serviceName) + cls.rpcEventLocker.Unlock() +} + func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)) { cls.locker.Lock() for nodeId, _ := range cls.mapIdNode { @@ -420,6 +462,32 @@ func HasService(nodeId int, serviceName string) bool { return false } +func GetNodeByServiceName(serviceName string) map[int]struct{} { + cluster.locker.RLock() + defer cluster.locker.RUnlock() + + mapNode, ok := cluster.mapServiceNode[serviceName] + if ok == false { + return nil + } + + var mapNodeId map[int]struct{} + for nodeId,_ := range mapNode { + mapNodeId[nodeId] = struct{}{} + } + + return mapNodeId +} + func (cls *Cluster) GetGlobalCfg() interface{} { return cls.globalCfg } + + +func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) { + cls.locker.Lock() + defer cls.locker.Unlock() + + nodeInfo,ok:= cls.mapIdNode[nodeId] + return nodeInfo,ok +} diff --git a/cluster/dynamicdiscovery.go b/cluster/dynamicdiscovery.go index 4f984a6..6293cd4 100644 --- a/cluster/dynamicdiscovery.go +++ b/cluster/dynamicdiscovery.go @@ -290,6 +290,8 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco //删除不必要的结点 for _, nodeId := range willDelNodeId { + nodeInfo,_ := cluster.GetNodeInfo(int(nodeId)) + cluster.TriggerDiscoveryEvent(false,int(nodeId),nodeInfo.PublicServiceList) dc.removeMasterNode(req.MasterNodeId, int32(nodeId)) if dc.findNodeId(nodeId) == false { dc.funDelService(int(nodeId), false) @@ -300,6 +302,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco for _, nodeInfo := range mapNodeInfo { dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId) dc.setNodeInfo(nodeInfo) + cluster.TriggerDiscoveryEvent(true,int(nodeInfo.NodeId),nodeInfo.PublicServiceList) } return nil diff --git a/event/eventtype.go b/event/eventtype.go index 026d34e..daac65d 100644 --- a/event/eventtype.go +++ b/event/eventtype.go @@ -9,8 +9,9 @@ const ( Sys_Event_Tcp EventType = -3 Sys_Event_Http_Event EventType = -4 - Sys_Event_WebSocket EventType = -5 - Sys_Event_Rpc_Event EventType = -6 + Sys_Event_WebSocket EventType = -5 + Sys_Event_Node_Event EventType = -6 + Sys_Event_DiscoverService EventType = -7 Sys_Event_User_Define EventType = 1 ) diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index a1bbd1a..ba0438d 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -68,11 +68,16 @@ type RpcHandler struct { } type TriggerRpcEvent func(bConnect bool, clientSeq uint32, nodeId int) -type IRpcListener interface { +type INodeListener interface { OnNodeConnected(nodeId int) OnNodeDisconnect(nodeId int) } +type IDiscoveryServiceListener interface { + OnDiscoveryService(nodeId int, serviceName []string) + OnUnDiscoveryService(nodeId int, serviceName []string) +} + type IRpcHandler interface { IRpcHandlerChannel GetName() string diff --git a/service/service.go b/service/service.go index 0436302..381d965 100644 --- a/service/service.go +++ b/service/service.go @@ -58,7 +58,8 @@ type Service struct { startStatus bool eventProcessor event.IEventProcessor profiler *profiler.Profiler //性能分析器 - rpcEventLister rpc.IRpcListener + nodeEventLister rpc.INodeListener + discoveryServiceLister rpc.IDiscoveryServiceListener chanEvent chan event.IEvent } @@ -68,6 +69,13 @@ type RpcConnEvent struct{ NodeId int } +// DiscoveryServiceEvent 发现服务结点 +type DiscoveryServiceEvent struct{ + IsDiscovery bool + ServiceName []string + NodeId int +} + func SetMaxServiceChannel(maxEventChannel int){ maxServiceEventChannel = maxEventChannel eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData { @@ -75,8 +83,12 @@ func SetMaxServiceChannel(maxEventChannel int){ }) } +func (rpcEventData *DiscoveryServiceEvent) GetEventType() event.EventType{ + return event.Sys_Event_DiscoverService +} + func (rpcEventData *RpcConnEvent) GetEventType() event.EventType{ - return event.Sys_Event_Rpc_Event + return event.Sys_Event_Node_Event } func (s *Service) OnSetup(iService IService){ @@ -268,24 +280,44 @@ func (s *Service) RegRawRpc(rpcMethodId uint32,rawRpcCB rpc.RawRpcCallBack){ func (s *Service) OnStart(){ } -func (s *Service) OnRpcEvent(ev event.IEvent){ +func (s *Service) OnNodeEvent(ev event.IEvent){ event := ev.(*RpcConnEvent) if event.IsConnect { - s.rpcEventLister.OnNodeConnected(event.NodeId) + s.nodeEventLister.OnNodeConnected(event.NodeId) }else{ - s.rpcEventLister.OnNodeDisconnect(event.NodeId) + s.nodeEventLister.OnNodeDisconnect(event.NodeId) } } -func (s *Service) RegRpcListener(rpcEventLister rpc.IRpcListener) { - s.rpcEventLister = rpcEventLister - s.RegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler(),s.OnRpcEvent) +func (s *Service) OnDiscoverServiceEvent(ev event.IEvent){ + event := ev.(*DiscoveryServiceEvent) + if event.IsDiscovery { + s.discoveryServiceLister.OnDiscoveryService(event.NodeId,event.ServiceName) + }else{ + s.discoveryServiceLister.OnUnDiscoveryService(event.NodeId,event.ServiceName) + } +} + +func (s *Service) RegRpcListener(rpcEventLister rpc.INodeListener) { + s.nodeEventLister = rpcEventLister + s.RegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler(),s.OnNodeEvent) RegRpcEventFun(s.GetName()) } -func (s *Service) UnRegRpcListener(rpcLister rpc.IRpcListener) { - s.UnRegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler()) - RegRpcEventFun(s.GetName()) +func (s *Service) UnRegRpcListener(rpcLister rpc.INodeListener) { + s.UnRegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler()) + UnRegRpcEventFun(s.GetName()) +} + +func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoveryServiceListener) { + s.discoveryServiceLister = discoveryServiceListener + s.RegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler(),s.OnDiscoverServiceEvent) + RegDiscoveryServiceEventFun(s.GetName()) +} + +func (s *Service) UnRegDiscoverListener(rpcLister rpc.INodeListener) { + s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler()) + UnRegDiscoveryServiceEventFun(s.GetName()) } diff --git a/service/servicemgr.go b/service/servicemgr.go index 2654ec2..8bab399 100644 --- a/service/servicemgr.go +++ b/service/servicemgr.go @@ -5,7 +5,12 @@ var mapServiceName map[string]IService var setupServiceList []IService type RegRpcEventFunType func(serviceName string) +type RegDiscoveryServiceEventFunType func(serviceName string) var RegRpcEventFun RegRpcEventFunType +var UnRegRpcEventFun RegRpcEventFunType + +var RegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType +var UnRegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType func init(){ mapServiceName = map[string]IService{}