From 0ebbe0e31dd4a9c877c115661a0cc8672452fce4 Mon Sep 17 00:00:00 2001 From: duanhf2012 <6549168@qq.com> Date: Thu, 16 Feb 2023 15:59:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=9C=8D=E5=8A=A1=E7=9A=84?= =?UTF-8?q?=E5=90=AF=E5=81=9C=E9=A1=BA=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 12 ++++++------ node/node.go | 30 ++++++++++++++---------------- service/service.go | 25 +++++++++++++++++-------- service/servicemgr.go | 8 +++----- 4 files changed, 40 insertions(+), 35 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index fd4bde9..fcda60f 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -26,7 +26,7 @@ type NodeInfo struct { Private bool ListenAddr string MaxRpcParamLen uint32 //最大Rpc参数长度 - ServiceList []string //所有的服务列表 + ServiceList []string //所有的有序服务列表 PublicServiceList []string //对外公开的服务列表 DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选 NeighborService []string @@ -248,8 +248,9 @@ func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool, bool) { return localMaster, hasMaster } -func (cls *Cluster) appendService(serviceName string, bPublicService bool) { - cls.localNodeInfo.ServiceList = append(cls.localNodeInfo.ServiceList, serviceName) +func (cls *Cluster) AddDynamicDiscoveryService(serviceName string, bPublicService bool) { + addServiceList := append([]string{},serviceName) + cls.localNodeInfo.ServiceList = append(addServiceList,cls.localNodeInfo.ServiceList...) if bPublicService { cls.localNodeInfo.PublicServiceList = append(cls.localNodeInfo.PublicServiceList, serviceName) } @@ -293,11 +294,10 @@ func (cls *Cluster) SetupServiceDiscovery(localNodeId int, setupServiceFun Setup //2.如果为动态服务发现安装本地发现服务 cls.serviceDiscovery = getDynamicDiscovery() + cls.AddDynamicDiscoveryService(DynamicDiscoveryClientName, true) if localMaster == true { - cls.appendService(DynamicDiscoveryMasterName, false) + cls.AddDynamicDiscoveryService(DynamicDiscoveryMasterName, false) } - cls.appendService(DynamicDiscoveryClientName, true) - } func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler { diff --git a/node/node.go b/node/node.go index 57f0dc6..cabc933 100644 --- a/node/node.go +++ b/node/node.go @@ -22,7 +22,6 @@ import ( "time" ) -var closeSig chan bool var sig chan os.Signal var nodeId int var preSetupService []service.IService //预安装 @@ -40,8 +39,6 @@ const( ) func init() { - - closeSig = make(chan bool, 1) sig = make(chan os.Signal, 3) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.Signal(10)) @@ -155,21 +152,23 @@ func initNode(id int) { return } - //2.setup service - for _, s := range preSetupService { - //是否配置的service - if cluster.GetCluster().IsConfigService(s.GetName()) == false { - continue + //2.顺序安装服务 + serviceOrder := cluster.GetCluster().GetLocalNodeInfo().ServiceList + for _,serviceName:= range serviceOrder{ + for _, s := range preSetupService { + if s.GetName() != serviceName { + continue + } + + pServiceCfg := cluster.GetCluster().GetServiceCfg(s.GetName()) + s.Init(s, cluster.GetRpcClient, cluster.GetRpcServer, pServiceCfg) + + service.Setup(s) } - - pServiceCfg := cluster.GetCluster().GetServiceCfg(s.GetName()) - s.Init(s, cluster.GetRpcClient, cluster.GetRpcServer, pServiceCfg) - - service.Setup(s) } //3.service初始化 - service.Init(closeSig) + service.Init() } func initLog() error { @@ -274,8 +273,7 @@ func startNode(args interface{}) error { } cluster.GetCluster().Stop() //7.退出 - close(closeSig) - service.WaitStop() + service.StopAllService() log.SRelease("Server is stop.") return nil diff --git a/service/service.go b/service/service.go index 381d965..12296e5 100644 --- a/service/service.go +++ b/service/service.go @@ -16,13 +16,11 @@ import ( "sync/atomic" ) - -var closeSig chan bool var timerDispatcherLen = 100000 type IService interface { Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) - Wait() + Stop() Start() OnSetup(iService IService) @@ -61,6 +59,8 @@ type Service struct { nodeEventLister rpc.INodeListener discoveryServiceLister rpc.IDiscoveryServiceListener chanEvent chan event.IEvent + + closeSig chan bool } // RpcConnEvent Node结点连接事件 @@ -105,6 +105,7 @@ func (s *Service) OpenProfiler() { } func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) { + s.closeSig = make(chan bool, 1) s.dispatcher =timer.NewDispatcher(timerDispatcherLen) if s.chanEvent == nil { s.chanEvent = make(chan event.IEvent,maxServiceEventChannel) @@ -125,26 +126,31 @@ func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServe s.eventHandler.Init(s.eventProcessor) } - func (s *Service) Start() { s.startStatus = true + var waitRun sync.WaitGroup + for i:=int32(0);i< s.goroutineNum;i++{ s.wg.Add(1) + waitRun.Add(1) go func(){ + log.SRelease(s.GetName()," service is running",) + waitRun.Done() s.Run() }() } + + waitRun.Wait() } func (s *Service) Run() { - log.SDebug("Start running Service ", s.GetName()) defer s.wg.Done() var bStop = false s.self.(IService).OnStart() for{ var analyzer *profiler.Analyzer select { - case <- closeSig: + case <- s.closeSig: bStop = true case ev := <- s.chanEvent: switch ev.GetEventType() { @@ -238,8 +244,8 @@ func (s *Service) Release(){ log.SError("core dump info[",errString,"]\n",string(buf[:l])) } }() + s.self.OnRelease() - log.SDebug("Release Service ", s.GetName()) } func (s *Service) OnRelease(){ @@ -249,8 +255,11 @@ func (s *Service) OnInit() error { return nil } -func (s *Service) Wait(){ +func (s *Service) Stop(){ + log.SRelease("stop ",s.GetName()," service ") + close(s.closeSig) s.wg.Wait() + log.SRelease(s.GetName()," service has been stopped") } func (s *Service) GetServiceCfg()interface{}{ diff --git a/service/servicemgr.go b/service/servicemgr.go index a0e93bf..4f08f2a 100644 --- a/service/servicemgr.go +++ b/service/servicemgr.go @@ -19,9 +19,7 @@ func init(){ setupServiceList = []IService{} } -func Init(chanCloseSig chan bool) { - closeSig=chanCloseSig - +func Init() { for _,s := range setupServiceList { err := s.OnInit() if err != nil { @@ -57,8 +55,8 @@ func Start(){ } } -func WaitStop(){ +func StopAllService(){ for i := len(setupServiceList) - 1; i >= 0; i-- { - setupServiceList[i].Wait() + setupServiceList[i].Stop() } }