调用本地service时,验证是否完成初始化

This commit is contained in:
boyce
2019-03-22 12:56:12 +08:00
parent 8ff8f4aeaf
commit 589c463e7c
4 changed files with 40 additions and 7 deletions

View File

@@ -219,7 +219,8 @@ func (slf *CCluster) Start() error {
//_servicename.methodname
func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply interface{}) error {
var callServiceName string
nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName)
var serviceName string
nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName, &serviceName)
if len(nodeidList) > 1 || len(nodeidList) < 1 {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s) find nodes count %d is error.", NodeServiceMethod, len(nodeidList))
return fmt.Errorf("CCluster.Call(%s) find nodes count %d is error.", NodeServiceMethod, len(nodeidList))
@@ -227,6 +228,12 @@ func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply inte
nodeid := nodeidList[0]
if nodeid == GetNodeId() {
//判断服务是否已经完成初始化
iService := service.InstanceServiceMgr().FindService(serviceName)
if iService.IsInit() == false {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid)
return fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid)
}
return slf.LocalRpcClient.Call(callServiceName, args, reply)
} else {
pclient := slf.GetClusterClient(nodeid)
@@ -245,7 +252,7 @@ func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply inte
return fmt.Errorf("CCluster.Call(%s) fail.", NodeServiceMethod)
}
func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *string) []int {
func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *string, rpcServiceName *string) []int {
var nodename string
var servicename string
var methodname string
@@ -275,13 +282,21 @@ func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *stri
nodeidList = slf.GetIdByNodeService(nodename, servicename)
}
*rpcServerMethod = servicename + "." + methodname
if rpcServiceName != nil {
*rpcServiceName = servicename
}
if rpcServerMethod != nil {
*rpcServerMethod = servicename + "." + methodname
}
return nodeidList
}
func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}) error {
var callServiceName string
nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName)
var serviceName string
nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName, &serviceName)
if len(nodeidList) < 1 {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) not find nodes.", NodeServiceMethod)
return fmt.Errorf("CCluster.Go(%s) not find nodes.", NodeServiceMethod)
@@ -293,6 +308,12 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{})
for _, nodeid := range nodeidList {
if nodeid == GetNodeId() {
iService := service.InstanceServiceMgr().FindService(serviceName)
if iService.IsInit() == false {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid)
return fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid)
}
replyCall := slf.LocalRpcClient.Go(callServiceName, args, nil, nil)
if replyCall.Error != nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error)
@@ -392,6 +413,10 @@ func (slf *CCluster) HasLocalService(serviceName string) bool {
return slf.cfg.HasLocalService(serviceName)
}
func (slf *CCluster) HasInit(serviceName string) bool {
return slf.cfg.HasLocalService(serviceName)
}
func GetNodeId() int {
return _self.cfg.currentNode.NodeID
}

View File

@@ -36,6 +36,7 @@ type IModule interface {
RunModule(module IModule) //手动运行Module
InitModule(exit chan bool, pwaitGroup *sync.WaitGroup) error //手动初始化Module
getBaseModule() *BaseModule //获取BaseModule指针
IsInit() bool
}
type BaseModule struct {
@@ -52,6 +53,7 @@ type BaseModule struct {
rwModuleLocker *sync.RWMutex
ExitChan chan bool
WaitGroup *sync.WaitGroup
bInit bool
}
func (slf *BaseModule) GetRoot() IModule {
@@ -257,14 +259,19 @@ func (slf *BaseModule) getBaseModule() *BaseModule {
return slf
}
func (slf *BaseModule) IsInit() bool {
return slf.bInit
}
func (slf *BaseModule) RunModule(module IModule) {
err := module.OnInit()
if err != nil {
GetLogger().Printf(LEVER_ERROR, "Start module %T id is %d is fail,reason:%v...", module, module.GetModuleId(), err)
os.Exit(-1)
} else {
GetLogger().Printf(LEVER_INFO, "Start module %T ...", module)
}
GetLogger().Printf(LEVER_INFO, "Start module %T ...", module)
slf.bInit = true
//运行所有子模块
timer := util.Timer{}

View File

@@ -26,6 +26,7 @@ type IService interface {
GetServiceId() int
GetStatus() int
IsInit() bool
}
type BaseService struct {

View File

@@ -39,7 +39,7 @@ type SyncCacheData struct {
func (slf *CSyncCacheService) OnInit() error {
slf.syncQueue = util.NewSyncQueue()
var callServiceName string
slf.nodeIdList = cluster.InstanceClusterMgr().GetNodeList("CSyncCacheService.RPC_SyncString", &callServiceName)
slf.nodeIdList = cluster.InstanceClusterMgr().GetNodeList("CSyncCacheService.RPC_SyncString", &callServiceName, nil)
for _, nodeId := range slf.nodeIdList {
syncCacheData := make(chan *SyncCacheData, MAX_SYNC_DATA_CHAN_NUM)
slf.syncDataChanList = append(slf.syncDataChanList, syncCacheData)