From 589c463e7cee352cf6ac120255d1f46e01091340 Mon Sep 17 00:00:00 2001 From: boyce Date: Fri, 22 Mar 2019 12:56:12 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E7=94=A8=E6=9C=AC=E5=9C=B0service?= =?UTF-8?q?=E6=97=B6=EF=BC=8C=E9=AA=8C=E8=AF=81=E6=98=AF=E5=90=A6=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 33 ++++++++++++++++--- service/Module.go | 11 +++++-- service/Service.go | 1 + .../synccacheservice/synccacheservice.go | 2 +- 4 files changed, 40 insertions(+), 7 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 5998648..5bb86c1 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -219,7 +219,8 @@ func (slf *CCluster) Start() error { //_servicename.methodname func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply interface{}) error { var callServiceName string - nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName) + var serviceName string + nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName, &serviceName) if len(nodeidList) > 1 || len(nodeidList) < 1 { service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s) find nodes count %d is error.", NodeServiceMethod, len(nodeidList)) return fmt.Errorf("CCluster.Call(%s) find nodes count %d is error.", NodeServiceMethod, len(nodeidList)) @@ -227,6 +228,12 @@ func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply inte nodeid := nodeidList[0] if nodeid == GetNodeId() { + //判断服务是否已经完成初始化 + iService := service.InstanceServiceMgr().FindService(serviceName) + if iService.IsInit() == false { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid) + return fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid) + } return slf.LocalRpcClient.Call(callServiceName, args, reply) } else { pclient := slf.GetClusterClient(nodeid) @@ -245,7 +252,7 @@ func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply inte return fmt.Errorf("CCluster.Call(%s) fail.", NodeServiceMethod) } -func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *string) []int { +func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *string, rpcServiceName *string) []int { var nodename string var servicename string var methodname string @@ -275,13 +282,21 @@ func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *stri nodeidList = slf.GetIdByNodeService(nodename, servicename) } - *rpcServerMethod = servicename + "." + methodname + if rpcServiceName != nil { + *rpcServiceName = servicename + } + + if rpcServerMethod != nil { + *rpcServerMethod = servicename + "." + methodname + } + return nodeidList } func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}) error { var callServiceName string - nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName) + var serviceName string + nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName, &serviceName) if len(nodeidList) < 1 { service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) not find nodes.", NodeServiceMethod) return fmt.Errorf("CCluster.Go(%s) not find nodes.", NodeServiceMethod) @@ -293,6 +308,12 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}) for _, nodeid := range nodeidList { if nodeid == GetNodeId() { + iService := service.InstanceServiceMgr().FindService(serviceName) + if iService.IsInit() == false { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid) + return fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid) + } + replyCall := slf.LocalRpcClient.Go(callServiceName, args, nil, nil) if replyCall.Error != nil { service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error) @@ -392,6 +413,10 @@ func (slf *CCluster) HasLocalService(serviceName string) bool { return slf.cfg.HasLocalService(serviceName) } +func (slf *CCluster) HasInit(serviceName string) bool { + return slf.cfg.HasLocalService(serviceName) +} + func GetNodeId() int { return _self.cfg.currentNode.NodeID } diff --git a/service/Module.go b/service/Module.go index 27fa990..8e3a226 100644 --- a/service/Module.go +++ b/service/Module.go @@ -36,6 +36,7 @@ type IModule interface { RunModule(module IModule) //手动运行Module InitModule(exit chan bool, pwaitGroup *sync.WaitGroup) error //手动初始化Module getBaseModule() *BaseModule //获取BaseModule指针 + IsInit() bool } type BaseModule struct { @@ -52,6 +53,7 @@ type BaseModule struct { rwModuleLocker *sync.RWMutex ExitChan chan bool WaitGroup *sync.WaitGroup + bInit bool } func (slf *BaseModule) GetRoot() IModule { @@ -257,14 +259,19 @@ func (slf *BaseModule) getBaseModule() *BaseModule { return slf } +func (slf *BaseModule) IsInit() bool { + return slf.bInit +} + func (slf *BaseModule) RunModule(module IModule) { err := module.OnInit() if err != nil { GetLogger().Printf(LEVER_ERROR, "Start module %T id is %d is fail,reason:%v...", module, module.GetModuleId(), err) os.Exit(-1) - } else { - GetLogger().Printf(LEVER_INFO, "Start module %T ...", module) } + GetLogger().Printf(LEVER_INFO, "Start module %T ...", module) + + slf.bInit = true //运行所有子模块 timer := util.Timer{} diff --git a/service/Service.go b/service/Service.go index d60b2a0..0029352 100644 --- a/service/Service.go +++ b/service/Service.go @@ -26,6 +26,7 @@ type IService interface { GetServiceId() int GetStatus() int + IsInit() bool } type BaseService struct { diff --git a/sysservice/synccacheservice/synccacheservice.go b/sysservice/synccacheservice/synccacheservice.go index 8e8bcdc..bec5309 100644 --- a/sysservice/synccacheservice/synccacheservice.go +++ b/sysservice/synccacheservice/synccacheservice.go @@ -39,7 +39,7 @@ type SyncCacheData struct { func (slf *CSyncCacheService) OnInit() error { slf.syncQueue = util.NewSyncQueue() var callServiceName string - slf.nodeIdList = cluster.InstanceClusterMgr().GetNodeList("CSyncCacheService.RPC_SyncString", &callServiceName) + slf.nodeIdList = cluster.InstanceClusterMgr().GetNodeList("CSyncCacheService.RPC_SyncString", &callServiceName, nil) for _, nodeId := range slf.nodeIdList { syncCacheData := make(chan *SyncCacheData, MAX_SYNC_DATA_CHAN_NUM) slf.syncDataChanList = append(slf.syncDataChanList, syncCacheData)