diff --git a/cluster/cluster.go b/cluster/cluster.go index 408f265..1a1c71f 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -2,21 +2,19 @@ package cluster import ( "fmt" - "log" "net" "os" "strconv" "strings" "time" + "github.com/duanhf2012/origin/sysmodule" + "github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/rpc" ) -//https://github.com/rocket049/rpc2d/blob/master/rpcnode.go -//http://daizuozhuo.github.io/golang-rpc-practice/ - type RpcClient struct { nodeid int pclient *rpc.Client @@ -36,9 +34,7 @@ type CCluster struct { } func (slf *CCluster) ReadNodeInfo(nodeid int) error { - //连接Server结点 var err error - slf.cfg, err = ReadCfg("./config/cluster.json", nodeid) if err != nil { fmt.Printf("%v", err) @@ -57,39 +53,17 @@ func (slf *CCluster) GetClusterClient(id int) *rpc.Client { return v.pclient } -func (slf *CCluster) GetClusterNode(strNodeName string) *CNodeCfg { - for _, value := range slf.cfg.NodeList { - if value.NodeName == strNodeName { - return &value - } - } - - return nil -} - -func (slf *CCluster) GetBindUrl() (string, error) { - return slf.cfg.currentNode.ServerAddr, nil -} - -type CTestData struct { - Bbbb int64 - Cccc int - Ddd string +func (slf *CCluster) GetBindUrl() string { + return slf.cfg.currentNode.ServerAddr } func (slf *CCluster) AcceptRpc(tpcListen *net.TCPListener) error { - /*slf.reader, slf.writer = net.Pipe() - go rpc.ServeConn(slf.reader) - slf.LocalRpcClient = rpc.NewClient(slf.writer) - */ for { conn, err := tpcListen.Accept() if err != nil { - fmt.Print(err) + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "tpcListen.Accept error:%v", err) return err } - - //使用goroutine单独处理rpc连接请求 go rpc.ServeConn(conn) } @@ -98,19 +72,17 @@ func (slf *CCluster) AcceptRpc(tpcListen *net.TCPListener) error { func (slf *CCluster) ListenService() error { - bindStr, err := slf.GetBindUrl() - if err != nil { - return err - } - + bindStr := slf.GetBindUrl() tcpaddr, err := net.ResolveTCPAddr("tcp4", bindStr) if err != nil { + service.GetLogger().Printf(sysmodule.LEVER_FATAL, "ResolveTCPAddr error:%v", err) os.Exit(1) return err } tcplisten, err2 := net.ListenTCP("tcp", tcpaddr) if err2 != nil { + service.GetLogger().Printf(sysmodule.LEVER_FATAL, "ListenTCP error:%v", err) os.Exit(1) return err2 } @@ -139,7 +111,7 @@ func (slf *CPing) Ping(ping *CPing, pong *CPong) error { func (slf *CCluster) ConnService() error { ping := CPing{0} pong := CPong{0} - fmt.Println(rpc.RegisterName("CPing", "", &ping)) + rpc.RegisterName("CPing", "", &ping) //连接集群服务器 for _, nodeList := range slf.cfg.mapClusterNodeService { @@ -151,7 +123,7 @@ func (slf *CCluster) ConnService() error { for { for _, rpcClient := range slf.nodeclient { - // + //连接状态发送ping if rpcClient.isConnect == true { ping.TimeStamp = 0 err := rpcClient.pclient.Call("CPing.Ping", &ping, &pong) @@ -164,6 +136,7 @@ func (slf *CCluster) ConnService() error { continue } + //非连接状态重新连接 if rpcClient.pclient != nil { rpcClient.pclient.Close() rpcClient.pclient = nil @@ -171,9 +144,10 @@ func (slf *CCluster) ConnService() error { client, err := rpc.Dial("tcp", rpcClient.serverAddr) if err != nil { - log.Println(err) + service.GetLogger().Printf(sysmodule.LEVER_WARN, "Connect nodeid:%d,address:%s fail", rpcClient.nodeid, rpcClient.serverAddr) continue } + service.GetLogger().Printf(sysmodule.LEVER_INFO, "Connect nodeid:%d,address:%s succ", rpcClient.nodeid, rpcClient.serverAddr) v, _ := slf.nodeclient[rpcClient.nodeid] v.pclient = client @@ -188,14 +162,17 @@ func (slf *CCluster) ConnService() error { func (slf *CCluster) Init() error { if len(os.Args) < 2 { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Init error,param not find NodeId=number") return fmt.Errorf("param error not find NodeId=number") } parts := strings.Split(os.Args[1], "=") if len(parts) < 2 { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Init error,param not find NodeId=number") return fmt.Errorf("param error not find NodeId=number") } if parts[0] != "NodeId" { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Init error,param error not find NodeId=number") return fmt.Errorf("param error not find NodeId=number") } @@ -204,6 +181,7 @@ func (slf *CCluster) Init() error { //读取配置 ret, err := strconv.Atoi(parts[1]) if err != nil { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Init parts[1] error,%v", err) return err } @@ -211,7 +189,6 @@ func (slf *CCluster) Init() error { } func (slf *CCluster) Start() error { - service.InstanceServiceMgr().FetchService(slf.OnFetchService) //监听服务 @@ -230,7 +207,8 @@ func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply inte var callServiceName string nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName) if len(nodeidList) > 1 || len(nodeidList) < 1 { - return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList)) + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s) not find nodes.", NodeServiceMethod) + return fmt.Errorf("CCluster.Call(%s) not find nodes.", NodeServiceMethod) } nodeid := nodeidList[0] @@ -239,13 +217,18 @@ func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply inte } else { pclient := slf.GetClusterClient(nodeid) if pclient == nil { - return fmt.Errorf("Call: NodeId %d is not find.", nodeid) + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d is not find.", NodeServiceMethod, nodeid) + return fmt.Errorf("CCluster.Call(%s): NodeId %d is not find.", NodeServiceMethod, nodeid) } err := pclient.Call(callServiceName, args, reply) + if err != nil { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s) is fail:%v.", callServiceName, err) + } return err } - return fmt.Errorf("Call: %s fail.", NodeServiceMethod) + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s) fail.", NodeServiceMethod) + return fmt.Errorf("CCluster.Call(%s) fail.", NodeServiceMethod) } func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *string) []int { @@ -286,24 +269,32 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}) var callServiceName string nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName) if len(nodeidList) < 1 { - return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList)) + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) not find nodes.", NodeServiceMethod) + return fmt.Errorf("CCluster.Go(%s) not find nodes.", NodeServiceMethod) } if bCast == false && len(nodeidList) > 1 { - return fmt.Errorf("Call: %s find more nodes %d.", NodeServiceMethod, len(nodeidList)) + return fmt.Errorf("CCluster.Go(%s) find more nodes", NodeServiceMethod) } for _, nodeid := range nodeidList { if nodeid == slf.GetCurrentNodeId() { - slf.LocalRpcClient.Go(callServiceName, args, nil, nil) - //return nil + replyCall := slf.LocalRpcClient.Go(callServiceName, args, nil, nil) + if replyCall.Error != nil { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error) + } + return replyCall.Error } else { pclient := slf.GetClusterClient(nodeid) if pclient == nil { - return fmt.Errorf("Call: NodeId %d is not find.", nodeid) + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid) + return fmt.Errorf("CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid) } - pclient.Go(callServiceName, args, nil, nil) - //return nil + replyCall := pclient.Go(callServiceName, args, nil, nil) + if replyCall.Error != nil { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error) + } + return replyCall.Error } } @@ -313,28 +304,31 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}) func (slf *CCluster) CallNode(nodeid int, servicemethod string, args interface{}, reply interface{}) error { pclient := slf.GetClusterClient(nodeid) if pclient == nil { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.CallNode(%d,%s) NodeId not find client", nodeid, servicemethod) return fmt.Errorf("Call: NodeId %d is not find.", nodeid) } err := pclient.Call(servicemethod, args, reply) - return err + if err != nil { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.CallNode(%d,%s) fail:%v", nodeid, servicemethod, err) + } + return err } func (slf *CCluster) GoNode(nodeid int, args interface{}, servicemethod string) error { pclient := slf.GetClusterClient(nodeid) if pclient == nil { - return fmt.Errorf("Call: NodeId %d is not find.", nodeid) + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.GoNode(%d,%s) NodeId not find client", nodeid, servicemethod) + return fmt.Errorf("CCluster.GoNode(%d,%s) NodeId not find client", nodeid, servicemethod) } replyCall := pclient.Go(servicemethod, args, nil, nil) - //ret := <-replyCall.Done if replyCall.Error != nil { - fmt.Print(replyCall.Error) + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.GoNode(%d,%s) fail:%v", nodeid, servicemethod, replyCall.Error) } - //fmt.Print(ret) - return nil + return replyCall.Error } diff --git a/cluster/config.go b/cluster/config.go index 8cef0ab..6b12177 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -7,40 +7,33 @@ import ( ) type CNodeCfg struct { - NodeID int - NodeName string - + NodeID int + NodeName string ServerAddr string ServiceList []string ClusterNode []string } type CNode struct { - NodeID int - NodeName string - + NodeID int + NodeName string ServerAddr string ServiceList map[string]bool } type ClusterConfig struct { - NodeList []CNodeCfg + NodeList []CNodeCfg //配置列表 + currentNode CNode //当前node - //通过id获取结点 - mapIdNode map[int]CNode - - //map[nodename][ {CNode} ] + mapIdNode map[int]CNode //map[nodeid] CNode mapClusterNodeService map[string][]CNode //map[nodename] []CNode mapClusterServiceNode map[string][]CNode //map[servicename] []CNode - //mapLocalService map[string]bool //map[servicename] bool - - currentNode CNode } -// ReadCfg ... func ReadCfg(path string, nodeid int) (*ClusterConfig, error) { c := &ClusterConfig{} + //1.加载解析配置 d, err := ioutil.ReadFile(path) if err != nil { fmt.Printf("Read File %s Error!", path) @@ -57,8 +50,8 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) { c.mapClusterNodeService = make(map[string][]CNode, 1) c.mapClusterServiceNode = make(map[string][]CNode, 1) + //2.组装mapIdNode var custerNodeName []string - //组装mapIdNode for _, v := range c.NodeList { mapservice := make(map[string]bool, 1) for _, s := range v.ServiceList { @@ -75,10 +68,10 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) { } } - //存入当前Node服务名 + //3.存入当前Node服务名 c.mapClusterNodeService[c.currentNode.NodeName] = append(c.mapClusterNodeService[c.currentNode.NodeName], c.currentNode) - //组装mapClusterNodeService + //4.组装mapClusterNodeService for _, cn := range custerNodeName { for _, n := range c.mapIdNode { if n.NodeName == cn { @@ -87,7 +80,7 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) { } } - //组装mapClusterServiceNode + //5.组装mapClusterServiceNode for _, nodelist := range c.mapClusterNodeService { //[]Node for _, node := range nodelist { //Node for s := range node.ServiceList { @@ -138,7 +131,5 @@ func (slf *ClusterConfig) GetIdByNodeService(NodeName string, serviceName string func (slf *ClusterConfig) HasLocalService(serviceName string) bool { _, ok := slf.currentNode.ServiceList[serviceName] - - //_, ok := slf.mapLocalService[serviceName] return ok == true } diff --git a/originnode/global.go b/originnode/global.go index 2fbaf22..58016c5 100644 --- a/originnode/global.go +++ b/originnode/global.go @@ -13,6 +13,7 @@ type GlobalModule struct { service.BaseModule } +// 全局模块定义 var g_module GlobalModule func AddModule(module service.IModule) uint32 { diff --git a/originnode/node.go b/originnode/node.go index 904813d..5be229b 100644 --- a/originnode/node.go +++ b/originnode/node.go @@ -17,7 +17,7 @@ import ( ) type CExitCtl struct { - exit chan bool + exitChan chan bool waitGroup *sync.WaitGroup } @@ -32,7 +32,7 @@ func (s *COriginNode) Init() { //初始化全局模块 service.InitLog() imodule := g_module.GetModuleById(sysmodule.SYS_LOG) - service.InstanceServiceMgr().Init(imodule.(service.ILogger), s.exit, s.waitGroup) + service.InstanceServiceMgr().Init(imodule.(service.ILogger), s.exitChan, s.waitGroup) s.sigs = make(chan os.Signal, 1) signal.Notify(s.sigs, syscall.SIGINT, syscall.SIGTERM) @@ -74,38 +74,50 @@ func (s *COriginNode) SetupService(services ...service.IService) { func (s *COriginNode) Start() { if s.debugListenAddress != "" { go func() { - log.Println(http.ListenAndServe(s.debugListenAddress, nil)) }() } + //开始运行集群 cluster.InstanceClusterMgr().Start() + + //运行全局模块 RunGlobalModule() + + //开启所有服务 service.InstanceServiceMgr().Start() + //监听退出信号 select { case <-s.sigs: - fmt.Println("收到信号推出程序") + service.GetLogger().Printf(sysmodule.LEVER_WARN, "Recv stop sig") + fmt.Printf("Recv stop sig") } + //停止运行程序 s.Stop() } func (s *COriginNode) Stop() { - close(s.exit) + close(s.exitChan) s.waitGroup.Wait() } func NewOrginNode() *COriginNode { + + //创建模块 node := new(COriginNode) - node.exit = make(chan bool) + node.exitChan = make(chan bool) node.waitGroup = &sync.WaitGroup{} - InitGlobalModule(node.exit, node.waitGroup) + + //初始化全局模块 + InitGlobalModule(node.exitChan, node.waitGroup) var syslogmodule sysmodule.LogModule syslogmodule.Init("system", sysmodule.LEVER_INFO) syslogmodule.SetModuleId(sysmodule.SYS_LOG) AddModule(&syslogmodule) + //初始化集群对象 err := cluster.InstanceClusterMgr().Init() if err != nil { fmt.Print(err) @@ -114,13 +126,3 @@ func NewOrginNode() *COriginNode { return node } - -func HasCmdParam(param string) bool { - for i := 0; i < len(os.Args); i++ { - if os.Args[i] == param { - return true - } - } - - return false -} diff --git a/service/Service.go b/service/Service.go index a41f12a..90eb964 100644 --- a/service/Service.go +++ b/service/Service.go @@ -22,10 +22,9 @@ type IModule interface { GetModuleId() uint32 GetModuleById(moduleId uint32) IModule AddModule(module IModule) uint32 - //DynamicAddModule(module IModule) uint32 - - RunModule(module IModule) error + RunModule(module IModule) InitModule(exit chan bool, pwaitGroup *sync.WaitGroup) error + OnInit() error OnRun() bool @@ -89,7 +88,6 @@ type BaseModule struct { CurrMaxModuleId uint32 rwModuleLocker sync.RWMutex - selfModule IModule ownerModule IModule } @@ -139,15 +137,6 @@ func (slf *BaseService) Init(iservice IService) error { return nil } -func (slf *BaseService) RPC_CheckServiceTickTimeOut(microSecond int64) error { - - if slf.IsTimeOutTick(microSecond) == true { - // Log.Printf("service:%s is timeout,state:%d", slf.GetServiceName(), slf.GetStatus()) - } - - return nil -} - func (slf *BaseService) IsTimeOutTick(microSecond int64) bool { nowtm := time.Now().UnixNano() / 1e6 @@ -177,7 +166,6 @@ func (slf *BaseModule) GetModuleById(moduleId uint32) IModule { } func (slf *BaseModule) genModuleId() uint32 { - slf.rwModuleLocker.Lock() slf.CurrMaxModuleId++ moduleId := slf.CurrMaxModuleId @@ -190,7 +178,7 @@ func (slf *BaseModule) RemoveModule(moduleId uint32) bool { slf.rwModuleLocker.Lock() _, ok := slf.mapModule[moduleId] if ok == false { - GetLogger().Printf(LEVER_WARN, "RemoveModule fail %d...", moduleId) + GetLogger().Printf(LEVER_WARN, "%T RemoveModule fail %d...", slf.GetOwner().GetModuleById(slf.GetModuleId()), moduleId) slf.rwModuleLocker.Unlock() return false } @@ -205,15 +193,25 @@ func (slf *BaseModule) IsRoot() bool { return slf.GetOwner().GetModuleById(slf.GetModuleId()) == nil } +const ( + //ModuleNone ... + + MAX_ALLOW_SET_MODULE_ID = iota + 100000000 + INIT_AUTO_INCREMENT +) + func (slf *BaseModule) AddModule(module IModule) uint32 { if slf.WaitGroup == nil { - GetLogger().Printf(LEVER_FATAL, "AddModule error %s...", fmt.Sprintf("%T", module)) - } - - if module.GetModuleId() > 100000000 { + GetLogger().Printf(LEVER_FATAL, "AddModule error wait group is nil:%T...", module) return 0 } + //用户设置的id不允许大于MAX_ALLOW_SET_MODULE_ID + if module.GetModuleId() > MAX_ALLOW_SET_MODULE_ID { + return 0 + } + + //如果没有设置,自动生成ModuleId if module.GetModuleId() == 0 { module.SetModuleId(slf.genModuleId()) } @@ -224,17 +222,16 @@ func (slf *BaseModule) AddModule(module IModule) uint32 { } else { module.SetOwner(slf.GetOwner().GetModuleById(slf.GetModuleId())) } - } + //设置模块退出信号捕获 module.InitModule(slf.ExitChan, slf.WaitGroup) + //存入父模块中 slf.rwModuleLocker.Lock() - if slf.mapModule == nil { slf.mapModule = make(map[uint32]IModule) } - _, ok := slf.mapModule[module.GetModuleId()] if ok == true { slf.rwModuleLocker.Unlock() @@ -244,12 +241,13 @@ func (slf *BaseModule) AddModule(module IModule) uint32 { slf.mapModule[module.GetModuleId()] = module slf.rwModuleLocker.Unlock() + //运行模块 go module.RunModule(module) return module.GetModuleId() } func (slf *BaseModule) OnInit() error { - return fmt.Errorf("not implement OnInit moduletype %d ", slf.GetModuleId()) + return nil } func (slf *BaseModule) OnRun() bool { @@ -274,7 +272,7 @@ func (slf *BaseModule) SetOwnerService(iservice IService) { } func (slf *BaseModule) InitModule(exit chan bool, pwaitGroup *sync.WaitGroup) error { - slf.CurrMaxModuleId = 100000 + slf.CurrMaxModuleId = INIT_AUTO_INCREMENT slf.WaitGroup = pwaitGroup slf.ExitChan = exit return nil @@ -284,33 +282,30 @@ func (slf *BaseModule) getBaseModule() *BaseModule { return slf } -func (slf *BaseModule) RunModule(module IModule) error { +func (slf *BaseModule) GetSelf() IModule { + if slf.IsRoot() { + return slf.GetOwner() + } + return slf.GetOwner().GetModuleById(slf.GetModuleId()) +} + +func (slf *BaseModule) RunModule(module IModule) { module.OnInit() //运行所有子模块 - slf.rwModuleLocker.RLock() - - /* - for _, subModule := range slf.mapModule { - go subModule.RunModule(subModule) - }*/ - slf.rwModuleLocker.RUnlock() - slf.WaitGroup.Add(1) defer slf.WaitGroup.Done() for { select { case <-slf.ExitChan: - GetLogger().Printf(LEVER_WARN, "stopping module %s...", fmt.Sprintf("%T", slf)) - fmt.Println("stopping module %s...", fmt.Sprintf("%T", slf)) - return nil + GetLogger().Printf(LEVER_WARN, "Stopping module %T...", slf.GetSelf()) + fmt.Println("Stopping module %T...", slf.GetSelf()) + return } if module.OnRun() == false { break } slf.tickTime = time.Now().UnixNano() / 1e6 } - - return nil } diff --git a/service/servicemanager.go b/service/servicemanager.go index 802e44b..f4ef1aa 100644 --- a/service/servicemanager.go +++ b/service/servicemanager.go @@ -1,9 +1,7 @@ package service import ( - "fmt" "sync" - "time" ) type IServiceManager interface { @@ -49,7 +47,6 @@ func (slf *CServiceManager) Init(logger ILogger, exit chan bool, pwaitGroup *syn slf.logger = logger for _, s := range slf.localserviceMap { (s.(IModule)).InitModule(exit, pwaitGroup) - //(s.(IModule)).OnInit() } return true @@ -63,52 +60,25 @@ func (slf *CServiceManager) Start() bool { return true } -func (slf *CServiceManager) CheckServiceTimeTimeout(exit chan bool, pwaitGroup *sync.WaitGroup) { - defer pwaitGroup.Done() - for { - select { - case <-exit: - fmt.Println("CheckServiceTimeTimeout stopping...") - return - } - - for _, s := range slf.localserviceMap { - - if s.IsTimeOutTick(20000) == true { - Log.Printf("service:%s is timeout,state:%d", s.GetServiceName(), s.GetStatus()) - } - } - time.Sleep(2 * time.Second) - } - -} - func (slf *CServiceManager) GenServiceID() int { slf.genserviceid += 1 return slf.genserviceid } -func (slf *CServiceManager) Get() bool { - for _, s := range slf.localserviceMap { - go s.OnRun() - } - - return true -} - func (slf *CServiceManager) GetLogger() ILogger { return slf.logger } -var _self *CServiceManager +var self *CServiceManager func InstanceServiceMgr() *CServiceManager { - if _self == nil { - _self = new(CServiceManager) - _self.localserviceMap = make(map[string]IService) - return _self + if self == nil { + self = new(CServiceManager) + self.localserviceMap = make(map[string]IService) + return self } - return _self + + return self } func GetLogger() ILogger {