新增服务发现事件监听

This commit is contained in:
orgin
2022-07-07 13:38:38 +08:00
parent 77988906f8
commit 769f680b17
6 changed files with 128 additions and 14 deletions

View File

@@ -56,6 +56,7 @@ type Cluster struct {
rpcServer rpc.Server rpcServer rpc.Server
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁 rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
mapServiceListenRpcEvent map[string]struct{} //ServiceName mapServiceListenRpcEvent map[string]struct{} //ServiceName
mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName
} }
func GetCluster() *Cluster { func GetCluster() *Cluster {
@@ -224,6 +225,9 @@ func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error
//2.安装服务发现结点 //2.安装服务发现结点
cls.SetupServiceDiscovery(localNodeId, setupServiceFun) cls.SetupServiceDiscovery(localNodeId, setupServiceFun)
service.RegRpcEventFun = cls.RegRpcEvent service.RegRpcEventFun = cls.RegRpcEvent
service.UnRegRpcEventFun = cls.UnRegRpcEvent
service.RegDiscoveryServiceEventFun = cls.RegDiscoveryEvent
service.UnRegDiscoveryServiceEventFun = cls.UnReDiscoveryEvent
err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo) err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo)
if err != nil { if err != nil {
@@ -379,6 +383,27 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int)
cls.rpcEventLocker.Unlock() cls.rpcEventLocker.Unlock()
} }
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId int, serviceName []string) {
cls.rpcEventLocker.Lock()
defer cls.rpcEventLocker.Unlock()
for sName, _ := range cls.mapServiceListenDiscoveryEvent {
ser := service.GetService(sName)
if ser == nil {
log.SError("cannot find service name ", serviceName)
continue
}
var eventData service.DiscoveryServiceEvent
eventData.IsDiscovery = bDiscovery
eventData.NodeId = nodeId
eventData.ServiceName = serviceName
ser.(service.IModule).NotifyEvent(&eventData)
}
}
func (cls *Cluster) GetLocalNodeInfo() *NodeInfo { func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
return &cls.localNodeInfo return &cls.localNodeInfo
} }
@@ -399,6 +424,23 @@ func (cls *Cluster) UnRegRpcEvent(serviceName string) {
cls.rpcEventLocker.Unlock() cls.rpcEventLocker.Unlock()
} }
func (cls *Cluster) RegDiscoveryEvent(serviceName string) {
cls.rpcEventLocker.Lock()
if cls.mapServiceListenDiscoveryEvent == nil {
cls.mapServiceListenDiscoveryEvent = map[string]struct{}{}
}
cls.mapServiceListenDiscoveryEvent[serviceName] = struct{}{}
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) UnReDiscoveryEvent(serviceName string) {
cls.rpcEventLocker.Lock()
delete(cls.mapServiceListenDiscoveryEvent, serviceName)
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)) { func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)) {
cls.locker.Lock() cls.locker.Lock()
for nodeId, _ := range cls.mapIdNode { for nodeId, _ := range cls.mapIdNode {
@@ -420,6 +462,32 @@ func HasService(nodeId int, serviceName string) bool {
return false return false
} }
func GetNodeByServiceName(serviceName string) map[int]struct{} {
cluster.locker.RLock()
defer cluster.locker.RUnlock()
mapNode, ok := cluster.mapServiceNode[serviceName]
if ok == false {
return nil
}
var mapNodeId map[int]struct{}
for nodeId,_ := range mapNode {
mapNodeId[nodeId] = struct{}{}
}
return mapNodeId
}
func (cls *Cluster) GetGlobalCfg() interface{} { func (cls *Cluster) GetGlobalCfg() interface{} {
return cls.globalCfg return cls.globalCfg
} }
func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) {
cls.locker.Lock()
defer cls.locker.Unlock()
nodeInfo,ok:= cls.mapIdNode[nodeId]
return nodeInfo,ok
}

View File

@@ -290,6 +290,8 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
//删除不必要的结点 //删除不必要的结点
for _, nodeId := range willDelNodeId { for _, nodeId := range willDelNodeId {
nodeInfo,_ := cluster.GetNodeInfo(int(nodeId))
cluster.TriggerDiscoveryEvent(false,int(nodeId),nodeInfo.PublicServiceList)
dc.removeMasterNode(req.MasterNodeId, int32(nodeId)) dc.removeMasterNode(req.MasterNodeId, int32(nodeId))
if dc.findNodeId(nodeId) == false { if dc.findNodeId(nodeId) == false {
dc.funDelService(int(nodeId), false) dc.funDelService(int(nodeId), false)
@@ -300,6 +302,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
for _, nodeInfo := range mapNodeInfo { for _, nodeInfo := range mapNodeInfo {
dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId) dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId)
dc.setNodeInfo(nodeInfo) dc.setNodeInfo(nodeInfo)
cluster.TriggerDiscoveryEvent(true,int(nodeInfo.NodeId),nodeInfo.PublicServiceList)
} }
return nil return nil

View File

@@ -9,8 +9,9 @@ const (
Sys_Event_Tcp EventType = -3 Sys_Event_Tcp EventType = -3
Sys_Event_Http_Event EventType = -4 Sys_Event_Http_Event EventType = -4
Sys_Event_WebSocket EventType = -5 Sys_Event_WebSocket EventType = -5
Sys_Event_Rpc_Event EventType = -6 Sys_Event_Node_Event EventType = -6
Sys_Event_DiscoverService EventType = -7
Sys_Event_User_Define EventType = 1 Sys_Event_User_Define EventType = 1
) )

View File

@@ -68,11 +68,16 @@ type RpcHandler struct {
} }
type TriggerRpcEvent func(bConnect bool, clientSeq uint32, nodeId int) type TriggerRpcEvent func(bConnect bool, clientSeq uint32, nodeId int)
type IRpcListener interface { type INodeListener interface {
OnNodeConnected(nodeId int) OnNodeConnected(nodeId int)
OnNodeDisconnect(nodeId int) OnNodeDisconnect(nodeId int)
} }
type IDiscoveryServiceListener interface {
OnDiscoveryService(nodeId int, serviceName []string)
OnUnDiscoveryService(nodeId int, serviceName []string)
}
type IRpcHandler interface { type IRpcHandler interface {
IRpcHandlerChannel IRpcHandlerChannel
GetName() string GetName() string

View File

@@ -58,7 +58,8 @@ type Service struct {
startStatus bool startStatus bool
eventProcessor event.IEventProcessor eventProcessor event.IEventProcessor
profiler *profiler.Profiler //性能分析器 profiler *profiler.Profiler //性能分析器
rpcEventLister rpc.IRpcListener nodeEventLister rpc.INodeListener
discoveryServiceLister rpc.IDiscoveryServiceListener
chanEvent chan event.IEvent chanEvent chan event.IEvent
} }
@@ -68,6 +69,13 @@ type RpcConnEvent struct{
NodeId int NodeId int
} }
// DiscoveryServiceEvent 发现服务结点
type DiscoveryServiceEvent struct{
IsDiscovery bool
ServiceName []string
NodeId int
}
func SetMaxServiceChannel(maxEventChannel int){ func SetMaxServiceChannel(maxEventChannel int){
maxServiceEventChannel = maxEventChannel maxServiceEventChannel = maxEventChannel
eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData { eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData {
@@ -75,8 +83,12 @@ func SetMaxServiceChannel(maxEventChannel int){
}) })
} }
func (rpcEventData *DiscoveryServiceEvent) GetEventType() event.EventType{
return event.Sys_Event_DiscoverService
}
func (rpcEventData *RpcConnEvent) GetEventType() event.EventType{ func (rpcEventData *RpcConnEvent) GetEventType() event.EventType{
return event.Sys_Event_Rpc_Event return event.Sys_Event_Node_Event
} }
func (s *Service) OnSetup(iService IService){ func (s *Service) OnSetup(iService IService){
@@ -268,24 +280,44 @@ func (s *Service) RegRawRpc(rpcMethodId uint32,rawRpcCB rpc.RawRpcCallBack){
func (s *Service) OnStart(){ func (s *Service) OnStart(){
} }
func (s *Service) OnRpcEvent(ev event.IEvent){ func (s *Service) OnNodeEvent(ev event.IEvent){
event := ev.(*RpcConnEvent) event := ev.(*RpcConnEvent)
if event.IsConnect { if event.IsConnect {
s.rpcEventLister.OnNodeConnected(event.NodeId) s.nodeEventLister.OnNodeConnected(event.NodeId)
}else{ }else{
s.rpcEventLister.OnNodeDisconnect(event.NodeId) s.nodeEventLister.OnNodeDisconnect(event.NodeId)
} }
} }
func (s *Service) RegRpcListener(rpcEventLister rpc.IRpcListener) { func (s *Service) OnDiscoverServiceEvent(ev event.IEvent){
s.rpcEventLister = rpcEventLister event := ev.(*DiscoveryServiceEvent)
s.RegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler(),s.OnRpcEvent) if event.IsDiscovery {
s.discoveryServiceLister.OnDiscoveryService(event.NodeId,event.ServiceName)
}else{
s.discoveryServiceLister.OnUnDiscoveryService(event.NodeId,event.ServiceName)
}
}
func (s *Service) RegRpcListener(rpcEventLister rpc.INodeListener) {
s.nodeEventLister = rpcEventLister
s.RegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler(),s.OnNodeEvent)
RegRpcEventFun(s.GetName()) RegRpcEventFun(s.GetName())
} }
func (s *Service) UnRegRpcListener(rpcLister rpc.IRpcListener) { func (s *Service) UnRegRpcListener(rpcLister rpc.INodeListener) {
s.UnRegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler()) s.UnRegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler())
RegRpcEventFun(s.GetName()) UnRegRpcEventFun(s.GetName())
}
func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoveryServiceListener) {
s.discoveryServiceLister = discoveryServiceListener
s.RegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler(),s.OnDiscoverServiceEvent)
RegDiscoveryServiceEventFun(s.GetName())
}
func (s *Service) UnRegDiscoverListener(rpcLister rpc.INodeListener) {
s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler())
UnRegDiscoveryServiceEventFun(s.GetName())
} }

View File

@@ -5,7 +5,12 @@ var mapServiceName map[string]IService
var setupServiceList []IService var setupServiceList []IService
type RegRpcEventFunType func(serviceName string) type RegRpcEventFunType func(serviceName string)
type RegDiscoveryServiceEventFunType func(serviceName string)
var RegRpcEventFun RegRpcEventFunType var RegRpcEventFun RegRpcEventFunType
var UnRegRpcEventFun RegRpcEventFunType
var RegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
var UnRegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
func init(){ func init(){
mapServiceName = map[string]IService{} mapServiceName = map[string]IService{}