Compare commits

...

7 Commits

Author SHA1 Message Date
orgin
3763f7d848 整理优化cluster 2022-07-11 10:55:57 +08:00
orgin
769f680b17 新增服务发现事件监听 2022-07-07 13:38:38 +08:00
orgin
77988906f8 优化记录rpc的方法名称 2022-06-30 17:56:16 +08:00
orgin
ae0ba1d966 优化循环队列test 2022-06-30 10:38:05 +08:00
orgin
f61fd5d1be 新增循环队列 2022-06-30 09:50:32 +08:00
orgin
eb1867c5fd 扩展IService新增接口 2022-06-29 11:15:22 +08:00
orgin
8823d5fba4 扩展服务,新增自定义服务事件管道大小接口 2022-06-28 17:33:03 +08:00
9 changed files with 396 additions and 31 deletions

View File

@@ -46,16 +46,18 @@ type Cluster struct {
globalCfg interface{} //全局配置
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
mapRpc map[int]NodeRpcInfo //nodeId
serviceDiscovery IServiceDiscovery //服务发现接口
locker sync.RWMutex //结点与服务关系保护锁
mapRpc map[int]NodeRpcInfo //nodeId
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId]
rpcServer rpc.Server
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
mapServiceListenRpcEvent map[string]struct{} //ServiceName
mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName
}
func GetCluster() *Cluster {
@@ -94,9 +96,10 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
return
}
cls.locker.Lock()
defer cls.locker.Unlock()
nodeInfo, ok := cls.mapIdNode[nodeId]
if ok == false {
cls.locker.Unlock()
return
}
@@ -112,7 +115,6 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
if rpc.client.IsConnected() {
nodeInfo.status = Discard
rpc.client.Unlock()
cls.locker.Unlock()
log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr)
return
}
@@ -126,7 +128,6 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
delete(cls.mapIdNode, nodeId)
delete(cls.mapRpc, nodeId)
cls.locker.Unlock()
if ok == true {
rpc.client.Close(false)
}
@@ -224,6 +225,9 @@ func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error
//2.安装服务发现结点
cls.SetupServiceDiscovery(localNodeId, setupServiceFun)
service.RegRpcEventFun = cls.RegRpcEvent
service.UnRegRpcEventFun = cls.UnRegRpcEvent
service.RegDiscoveryServiceEventFun = cls.RegDiscoveryEvent
service.UnRegDiscoveryServiceEventFun = cls.UnReDiscoveryEvent
err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo)
if err != nil {
@@ -364,6 +368,7 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int)
cls.locker.Unlock()
cls.rpcEventLocker.Lock()
defer cls.rpcEventLocker.Unlock()
for serviceName, _ := range cls.mapServiceListenRpcEvent {
ser := service.GetService(serviceName)
if ser == nil {
@@ -376,7 +381,27 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int)
eventData.NodeId = nodeId
ser.(service.IModule).NotifyEvent(&eventData)
}
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId int, serviceName []string) {
cls.rpcEventLocker.Lock()
defer cls.rpcEventLocker.Unlock()
for sName, _ := range cls.mapServiceListenDiscoveryEvent {
ser := service.GetService(sName)
if ser == nil {
log.SError("cannot find service name ", serviceName)
continue
}
var eventData service.DiscoveryServiceEvent
eventData.IsDiscovery = bDiscovery
eventData.NodeId = nodeId
eventData.ServiceName = serviceName
ser.(service.IModule).NotifyEvent(&eventData)
}
}
func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
@@ -399,14 +424,25 @@ func (cls *Cluster) UnRegRpcEvent(serviceName string) {
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)) {
cls.locker.Lock()
for nodeId, _ := range cls.mapIdNode {
fetchFun(nodeId)
func (cls *Cluster) RegDiscoveryEvent(serviceName string) {
cls.rpcEventLocker.Lock()
if cls.mapServiceListenDiscoveryEvent == nil {
cls.mapServiceListenDiscoveryEvent = map[string]struct{}{}
}
cls.locker.Unlock()
cls.mapServiceListenDiscoveryEvent[serviceName] = struct{}{}
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) UnReDiscoveryEvent(serviceName string) {
cls.rpcEventLocker.Lock()
delete(cls.mapServiceListenDiscoveryEvent, serviceName)
cls.rpcEventLocker.Unlock()
}
func HasService(nodeId int, serviceName string) bool {
cluster.locker.RLock()
defer cluster.locker.RUnlock()
@@ -420,6 +456,32 @@ func HasService(nodeId int, serviceName string) bool {
return false
}
func GetNodeByServiceName(serviceName string) map[int]struct{} {
cluster.locker.RLock()
defer cluster.locker.RUnlock()
mapNode, ok := cluster.mapServiceNode[serviceName]
if ok == false {
return nil
}
var mapNodeId map[int]struct{}
for nodeId,_ := range mapNode {
mapNodeId[nodeId] = struct{}{}
}
return mapNodeId
}
func (cls *Cluster) GetGlobalCfg() interface{} {
return cls.globalCfg
}
func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) {
cls.locker.RLock()
defer cls.locker.RUnlock()
nodeInfo,ok:= cls.mapIdNode[nodeId]
return nodeInfo,ok
}

View File

@@ -290,6 +290,8 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
//删除不必要的结点
for _, nodeId := range willDelNodeId {
nodeInfo,_ := cluster.GetNodeInfo(int(nodeId))
cluster.TriggerDiscoveryEvent(false,int(nodeId),nodeInfo.PublicServiceList)
dc.removeMasterNode(req.MasterNodeId, int32(nodeId))
if dc.findNodeId(nodeId) == false {
dc.funDelService(int(nodeId), false)
@@ -300,6 +302,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
for _, nodeInfo := range mapNodeInfo {
dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId)
dc.setNodeInfo(nodeInfo)
cluster.TriggerDiscoveryEvent(true,int(nodeInfo.NodeId),nodeInfo.PublicServiceList)
}
return nil

View File

@@ -9,8 +9,9 @@ const (
Sys_Event_Tcp EventType = -3
Sys_Event_Http_Event EventType = -4
Sys_Event_WebSocket EventType = -5
Sys_Event_Rpc_Event EventType = -6
Sys_Event_WebSocket EventType = -5
Sys_Event_Node_Event EventType = -6
Sys_Event_DiscoverService EventType = -7
Sys_Event_User_Define EventType = 1
)

View File

@@ -68,11 +68,16 @@ type RpcHandler struct {
}
type TriggerRpcEvent func(bConnect bool, clientSeq uint32, nodeId int)
type IRpcListener interface {
type INodeListener interface {
OnNodeConnected(nodeId int)
OnNodeDisconnect(nodeId int)
}
type IDiscoveryServiceListener interface {
OnDiscoveryService(nodeId int, serviceName []string)
OnUnDiscoveryService(nodeId int, serviceName []string)
}
type IRpcHandler interface {
IRpcHandlerChannel
GetName() string

View File

@@ -338,7 +338,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
pCall.rpcHandler = callerRpcHandler
pCall.callback = &callback
pCall.Reply = reply
pCall.ServiceMethod = serviceMethod
client.AddPending(pCall)
req.requestHandle = func(Returns interface{}, Err RpcError) {
v := client.RemovePending(callSeq)

View File

@@ -22,18 +22,24 @@ var timerDispatcherLen = 100000
type IService interface {
Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{})
SetName(serviceName string)
GetName() string
Wait()
Start()
OnSetup(iService IService)
OnInit() error
OnStart()
OnRelease()
Wait()
Start()
SetName(serviceName string)
GetName() string
GetRpcHandler() rpc.IRpcHandler
GetServiceCfg()interface{}
OpenProfiler()
GetProfiler() *profiler.Profiler
GetServiceEventChannelNum() int
GetServiceTimerChannelNum() int
SetEventChannelNum(num int)
OpenProfiler()
}
// eventPool的内存池,缓存Event
@@ -52,7 +58,8 @@ type Service struct {
startStatus bool
eventProcessor event.IEventProcessor
profiler *profiler.Profiler //性能分析器
rpcEventLister rpc.IRpcListener
nodeEventLister rpc.INodeListener
discoveryServiceLister rpc.IDiscoveryServiceListener
chanEvent chan event.IEvent
}
@@ -62,6 +69,13 @@ type RpcConnEvent struct{
NodeId int
}
// DiscoveryServiceEvent 发现服务结点
type DiscoveryServiceEvent struct{
IsDiscovery bool
ServiceName []string
NodeId int
}
func SetMaxServiceChannel(maxEventChannel int){
maxServiceEventChannel = maxEventChannel
eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData {
@@ -69,8 +83,12 @@ func SetMaxServiceChannel(maxEventChannel int){
})
}
func (rpcEventData *DiscoveryServiceEvent) GetEventType() event.EventType{
return event.Sys_Event_DiscoverService
}
func (rpcEventData *RpcConnEvent) GetEventType() event.EventType{
return event.Sys_Event_Rpc_Event
return event.Sys_Event_Node_Event
}
func (s *Service) OnSetup(iService IService){
@@ -88,7 +106,10 @@ func (s *Service) OpenProfiler() {
func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) {
s.dispatcher =timer.NewDispatcher(timerDispatcherLen)
s.chanEvent = make(chan event.IEvent,maxServiceEventChannel)
if s.chanEvent == nil {
s.chanEvent = make(chan event.IEvent,maxServiceEventChannel)
}
s.rpcHandler.InitRpcHandler(iService.(rpc.IRpcHandler),getClientFun,getServerFun,iService.(rpc.IRpcHandlerChannel))
s.IRpcHandler = &s.rpcHandler
s.self = iService.(IModule)
@@ -259,24 +280,44 @@ func (s *Service) RegRawRpc(rpcMethodId uint32,rawRpcCB rpc.RawRpcCallBack){
func (s *Service) OnStart(){
}
func (s *Service) OnRpcEvent(ev event.IEvent){
func (s *Service) OnNodeEvent(ev event.IEvent){
event := ev.(*RpcConnEvent)
if event.IsConnect {
s.rpcEventLister.OnNodeConnected(event.NodeId)
s.nodeEventLister.OnNodeConnected(event.NodeId)
}else{
s.rpcEventLister.OnNodeDisconnect(event.NodeId)
s.nodeEventLister.OnNodeDisconnect(event.NodeId)
}
}
func (s *Service) RegRpcListener(rpcEventLister rpc.IRpcListener) {
s.rpcEventLister = rpcEventLister
s.RegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler(),s.OnRpcEvent)
func (s *Service) OnDiscoverServiceEvent(ev event.IEvent){
event := ev.(*DiscoveryServiceEvent)
if event.IsDiscovery {
s.discoveryServiceLister.OnDiscoveryService(event.NodeId,event.ServiceName)
}else{
s.discoveryServiceLister.OnUnDiscoveryService(event.NodeId,event.ServiceName)
}
}
func (s *Service) RegRpcListener(rpcEventLister rpc.INodeListener) {
s.nodeEventLister = rpcEventLister
s.RegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler(),s.OnNodeEvent)
RegRpcEventFun(s.GetName())
}
func (s *Service) UnRegRpcListener(rpcLister rpc.IRpcListener) {
s.UnRegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler())
RegRpcEventFun(s.GetName())
func (s *Service) UnRegRpcListener(rpcLister rpc.INodeListener) {
s.UnRegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler())
UnRegRpcEventFun(s.GetName())
}
func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoveryServiceListener) {
s.discoveryServiceLister = discoveryServiceListener
s.RegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler(),s.OnDiscoverServiceEvent)
RegDiscoveryServiceEventFun(s.GetName())
}
func (s *Service) UnRegDiscoverListener(rpcLister rpc.INodeListener) {
s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler())
UnRegDiscoveryServiceEventFun(s.GetName())
}
@@ -311,6 +352,21 @@ func (s *Service) pushEvent(ev event.IEvent) error{
return nil
}
func (s *Service) GetServiceEventChannelNum() int{
return len(s.chanEvent)
}
func (s *Service) GetServiceTimerChannelNum() int{
return len(s.dispatcher.ChanTimer)
}
func (s *Service) SetEventChannelNum(num int){
if s.chanEvent == nil {
s.chanEvent = make(chan event.IEvent,num)
}else {
panic("this stage cannot be set")
}
}
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程

View File

@@ -5,7 +5,12 @@ var mapServiceName map[string]IService
var setupServiceList []IService
type RegRpcEventFunType func(serviceName string)
type RegDiscoveryServiceEventFunType func(serviceName string)
var RegRpcEventFun RegRpcEventFunType
var UnRegRpcEventFun RegRpcEventFunType
var RegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
var UnRegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
func init(){
mapServiceName = map[string]IService{}

167
util/queue/squeue.go Normal file
View File

@@ -0,0 +1,167 @@
package queue
import (
"sync"
)
/*
这是一个循环队列
*/
type SQueue[ElementType any] struct {
elements []ElementType
head int
tail int
locker sync.RWMutex
}
//游标,通过该游标获取数据
type SCursor[ElementType any] struct {
pos int
squeue *SQueue[ElementType]
}
func NewSQueue[ElementType any](maxElementNum int) *SQueue[ElementType]{
queue := &SQueue[ElementType]{}
queue.elements = make([]ElementType,maxElementNum+1)
return queue
}
//游标移动到队首
func (s *SCursor[ElementType]) First(){
s.squeue.locker.RLock()
defer s.squeue.locker.RUnlock()
s.pos = s.squeue.head
}
//从当前位置移动游标,注意如果在多协程读或者pop时可能会导致游标失效
func (s *SCursor[ElementType]) Next() (elem ElementType,ret bool){
s.squeue.locker.RLock()
defer s.squeue.locker.RUnlock()
if s.pos == s.squeue.tail {
return
}
s.pos++
s.pos = (s.pos)%(len(s.squeue.elements))
return s.squeue.elements[s.pos],true
}
//获取队列元数个数
func (s *SQueue[ElementType]) Len() int {
s.locker.RLock()
defer s.locker.RUnlock()
return s.len()
}
func (s *SQueue[ElementType]) len() int {
if s.head <= s.tail {
return s.tail - s.head
}
//(len(s.elements)-1-s.head)+(s.tail+1)
return len(s.elements)-s.head+s.tail
}
//获取游标,默认是队首
func (s *SQueue[ElementType]) GetCursor() (cur SCursor[ElementType]){
s.locker.RLock()
defer s.locker.RUnlock()
cur.squeue = s
cur.pos = s.head
return
}
//获取指定位置的游标
func (s *SQueue[ElementType]) GetPosCursor(pos int) (cur SCursor[ElementType],ret bool){
s.locker.RLock()
defer s.locker.RUnlock()
if s.head < s.tail {
if pos<=s.head || pos>s.tail{
return
}
ret = true
cur.squeue = s
cur.pos = pos
return
}
if pos >s.tail && pos <=s.head {
return
}
cur.squeue = s
cur.pos = pos
return
}
//从队首移除掉指定数量元素
func (s *SQueue[ElementType]) RemoveElement(elementNum int) (removeNum int) {
s.locker.Lock()
defer s.locker.Unlock()
lens := s.len()
if elementNum > lens{
removeNum = lens
}else{
removeNum = elementNum
}
s.head = (s.head + removeNum)%len(s.elements)
return
}
//从队首Pop元素
func (s *SQueue[ElementType]) Pop() (elem ElementType,ret bool){
s.locker.Lock()
defer s.locker.Unlock()
if s.head == s.tail {
return
}
s.head++
s.head = s.head%len(s.elements)
return s.elements[s.head],true
}
//从队尾Push数据
func (s *SQueue[ElementType]) Push(elem ElementType) bool {
s.locker.Lock()
defer s.locker.Unlock()
nextPos := (s.tail+1) % len(s.elements)
if nextPos == s.head {
//is full
return false
}
s.tail = nextPos
s.elements[s.tail] = elem
return true
}
//判断队列是否为空
func (s *SQueue[ElementType]) IsEmpty() bool{
s.locker.RLock()
defer s.locker.RUnlock()
return s.head == s.tail
}
//判断队列是否已满
func (s *SQueue[ElementType]) IsFull() bool{
s.locker.RLock()
defer s.locker.RUnlock()
nextPos := (s.tail+1) % len(s.elements)
return nextPos == s.head
}

View File

@@ -0,0 +1,66 @@
package queue
import (
"testing"
)
func Test_Example(t *testing.T) {
//1.创建阶列
queue := NewSQueue[int](5)
//2.判断是否为空
t.Log("is empty :", queue.IsEmpty())
t.Log("is full :", queue.IsFull())
//3.游标使用,打印所有数据
cursor := queue.GetCursor()
cursor.First()
for {
elem, ret := cursor.Next()
if ret == false {
break
}
t.Log("elem:", elem)
}
//4.push数据塞满队列
for i := 0; i < 6; i++ {
t.Log("push:", queue.Push(i))
}
t.Log("is empty :", queue.IsEmpty())
t.Log("is full :", queue.IsFull())
//5.使用游标遍历所有数据
cursor.First()
for {
elem, ret := cursor.Next()
if ret == false {
break
}
t.Log("elem:", elem)
}
//6.删除2个元素
removeNum := queue.RemoveElement(2)
t.Log("Remove Num:", removeNum)
//7.游标遍历
cursor.First()
for {
elem, ret := cursor.Next()
if ret == false {
break
}
t.Log("elem:", elem)
}
//8.pop数据所有
for i := 0; i < 6; i++ {
elem, ret := queue.Pop()
t.Log("pop:", elem, "-", ret, " len:", queue.Len())
}
t.Log("is empty :", queue.IsEmpty())
t.Log("is full :", queue.IsFull())
}