优化服务的启停顺序

This commit is contained in:
duanhf2012
2023-02-16 15:59:07 +08:00
parent e326e342f2
commit 0ebbe0e31d
4 changed files with 40 additions and 35 deletions

View File

@@ -26,7 +26,7 @@ type NodeInfo struct {
Private bool Private bool
ListenAddr string ListenAddr string
MaxRpcParamLen uint32 //最大Rpc参数长度 MaxRpcParamLen uint32 //最大Rpc参数长度
ServiceList []string //所有的服务列表 ServiceList []string //所有的有序服务列表
PublicServiceList []string //对外公开的服务列表 PublicServiceList []string //对外公开的服务列表
DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选 DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选
NeighborService []string NeighborService []string
@@ -248,8 +248,9 @@ func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool, bool) {
return localMaster, hasMaster return localMaster, hasMaster
} }
func (cls *Cluster) appendService(serviceName string, bPublicService bool) { func (cls *Cluster) AddDynamicDiscoveryService(serviceName string, bPublicService bool) {
cls.localNodeInfo.ServiceList = append(cls.localNodeInfo.ServiceList, serviceName) addServiceList := append([]string{},serviceName)
cls.localNodeInfo.ServiceList = append(addServiceList,cls.localNodeInfo.ServiceList...)
if bPublicService { if bPublicService {
cls.localNodeInfo.PublicServiceList = append(cls.localNodeInfo.PublicServiceList, serviceName) cls.localNodeInfo.PublicServiceList = append(cls.localNodeInfo.PublicServiceList, serviceName)
} }
@@ -293,11 +294,10 @@ func (cls *Cluster) SetupServiceDiscovery(localNodeId int, setupServiceFun Setup
//2.如果为动态服务发现安装本地发现服务 //2.如果为动态服务发现安装本地发现服务
cls.serviceDiscovery = getDynamicDiscovery() cls.serviceDiscovery = getDynamicDiscovery()
cls.AddDynamicDiscoveryService(DynamicDiscoveryClientName, true)
if localMaster == true { if localMaster == true {
cls.appendService(DynamicDiscoveryMasterName, false) cls.AddDynamicDiscoveryService(DynamicDiscoveryMasterName, false)
} }
cls.appendService(DynamicDiscoveryClientName, true)
} }
func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler { func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler {

View File

@@ -22,7 +22,6 @@ import (
"time" "time"
) )
var closeSig chan bool
var sig chan os.Signal var sig chan os.Signal
var nodeId int var nodeId int
var preSetupService []service.IService //预安装 var preSetupService []service.IService //预安装
@@ -40,8 +39,6 @@ const(
) )
func init() { func init() {
closeSig = make(chan bool, 1)
sig = make(chan os.Signal, 3) sig = make(chan os.Signal, 3)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.Signal(10)) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.Signal(10))
@@ -155,21 +152,23 @@ func initNode(id int) {
return return
} }
//2.setup service //2.顺序安装服务
for _, s := range preSetupService { serviceOrder := cluster.GetCluster().GetLocalNodeInfo().ServiceList
//是否配置的service for _,serviceName:= range serviceOrder{
if cluster.GetCluster().IsConfigService(s.GetName()) == false { for _, s := range preSetupService {
continue if s.GetName() != serviceName {
continue
}
pServiceCfg := cluster.GetCluster().GetServiceCfg(s.GetName())
s.Init(s, cluster.GetRpcClient, cluster.GetRpcServer, pServiceCfg)
service.Setup(s)
} }
pServiceCfg := cluster.GetCluster().GetServiceCfg(s.GetName())
s.Init(s, cluster.GetRpcClient, cluster.GetRpcServer, pServiceCfg)
service.Setup(s)
} }
//3.service初始化 //3.service初始化
service.Init(closeSig) service.Init()
} }
func initLog() error { func initLog() error {
@@ -274,8 +273,7 @@ func startNode(args interface{}) error {
} }
cluster.GetCluster().Stop() cluster.GetCluster().Stop()
//7.退出 //7.退出
close(closeSig) service.StopAllService()
service.WaitStop()
log.SRelease("Server is stop.") log.SRelease("Server is stop.")
return nil return nil

View File

@@ -16,13 +16,11 @@ import (
"sync/atomic" "sync/atomic"
) )
var closeSig chan bool
var timerDispatcherLen = 100000 var timerDispatcherLen = 100000
type IService interface { type IService interface {
Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{})
Wait() Stop()
Start() Start()
OnSetup(iService IService) OnSetup(iService IService)
@@ -61,6 +59,8 @@ type Service struct {
nodeEventLister rpc.INodeListener nodeEventLister rpc.INodeListener
discoveryServiceLister rpc.IDiscoveryServiceListener discoveryServiceLister rpc.IDiscoveryServiceListener
chanEvent chan event.IEvent chanEvent chan event.IEvent
closeSig chan bool
} }
// RpcConnEvent Node结点连接事件 // RpcConnEvent Node结点连接事件
@@ -105,6 +105,7 @@ func (s *Service) OpenProfiler() {
} }
func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) { func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) {
s.closeSig = make(chan bool, 1)
s.dispatcher =timer.NewDispatcher(timerDispatcherLen) s.dispatcher =timer.NewDispatcher(timerDispatcherLen)
if s.chanEvent == nil { if s.chanEvent == nil {
s.chanEvent = make(chan event.IEvent,maxServiceEventChannel) s.chanEvent = make(chan event.IEvent,maxServiceEventChannel)
@@ -125,26 +126,31 @@ func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServe
s.eventHandler.Init(s.eventProcessor) s.eventHandler.Init(s.eventProcessor)
} }
func (s *Service) Start() { func (s *Service) Start() {
s.startStatus = true s.startStatus = true
var waitRun sync.WaitGroup
for i:=int32(0);i< s.goroutineNum;i++{ for i:=int32(0);i< s.goroutineNum;i++{
s.wg.Add(1) s.wg.Add(1)
waitRun.Add(1)
go func(){ go func(){
log.SRelease(s.GetName()," service is running",)
waitRun.Done()
s.Run() s.Run()
}() }()
} }
waitRun.Wait()
} }
func (s *Service) Run() { func (s *Service) Run() {
log.SDebug("Start running Service ", s.GetName())
defer s.wg.Done() defer s.wg.Done()
var bStop = false var bStop = false
s.self.(IService).OnStart() s.self.(IService).OnStart()
for{ for{
var analyzer *profiler.Analyzer var analyzer *profiler.Analyzer
select { select {
case <- closeSig: case <- s.closeSig:
bStop = true bStop = true
case ev := <- s.chanEvent: case ev := <- s.chanEvent:
switch ev.GetEventType() { switch ev.GetEventType() {
@@ -238,8 +244,8 @@ func (s *Service) Release(){
log.SError("core dump info[",errString,"]\n",string(buf[:l])) log.SError("core dump info[",errString,"]\n",string(buf[:l]))
} }
}() }()
s.self.OnRelease() s.self.OnRelease()
log.SDebug("Release Service ", s.GetName())
} }
func (s *Service) OnRelease(){ func (s *Service) OnRelease(){
@@ -249,8 +255,11 @@ func (s *Service) OnInit() error {
return nil return nil
} }
func (s *Service) Wait(){ func (s *Service) Stop(){
log.SRelease("stop ",s.GetName()," service ")
close(s.closeSig)
s.wg.Wait() s.wg.Wait()
log.SRelease(s.GetName()," service has been stopped")
} }
func (s *Service) GetServiceCfg()interface{}{ func (s *Service) GetServiceCfg()interface{}{

View File

@@ -19,9 +19,7 @@ func init(){
setupServiceList = []IService{} setupServiceList = []IService{}
} }
func Init(chanCloseSig chan bool) { func Init() {
closeSig=chanCloseSig
for _,s := range setupServiceList { for _,s := range setupServiceList {
err := s.OnInit() err := s.OnInit()
if err != nil { if err != nil {
@@ -57,8 +55,8 @@ func Start(){
} }
} }
func WaitStop(){ func StopAllService(){
for i := len(setupServiceList) - 1; i >= 0; i-- { for i := len(setupServiceList) - 1; i >= 0; i-- {
setupServiceList[i].Wait() setupServiceList[i].Stop()
} }
} }