From 4e541e0851a79ee47fc8c2eb6ecda795c90764be Mon Sep 17 00:00:00 2001 From: boyce Date: Wed, 27 Feb 2019 11:58:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E6=8E=89=E5=85=A8=E5=B1=80?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Test/config/cluster.json | 5 +- Test/main.go | 157 +++++++++++++++++--------------------- cluster/config.go | 9 ++- originnode/global.go | 7 +- originnode/node.go | 18 ++--- service/Module.go | 19 +++-- service/Service.go | 19 +++-- service/servicemanager.go | 2 - sysmodule/LogModule.go | 13 ++++ 9 files changed, 123 insertions(+), 126 deletions(-) diff --git a/Test/config/cluster.json b/Test/config/cluster.json index 00b7c73..dbc121d 100644 --- a/Test/config/cluster.json +++ b/Test/config/cluster.json @@ -1,12 +1,13 @@ { -"NodeList":[ +"PublicServiceList":["logiclog"], +"NodeList":[ { "NodeID":1, "NodeName":"N_Node1", "ServerAddr":"127.0.0.1:8080", - "ServiceList":["WSServerService","CTest","HttpServerService"], + "ServiceList":["CTest"], "ClusterNode":["N_Node2"] }, diff --git a/Test/main.go b/Test/main.go index 2796f54..68de5e5 100644 --- a/Test/main.go +++ b/Test/main.go @@ -3,50 +3,94 @@ package main import ( "encoding/json" "fmt" - "io" - "net/http" - "os" "time" - "github.com/duanhf2012/origin/cluster" - "github.com/duanhf2012/origin/network" + "github.com/duanhf2012/origin/sysmodule" + "github.com/duanhf2012/origin/originnode" "github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/sysservice" - - "github.com/gorilla/websocket" ) -type CMessageReceiver struct { +//子模块 +type CTestModule struct { + service.BaseModule } -func (slf *CMessageReceiver) OnConnected(webServer network.IWebsocketServer, clientid uint64) { - fmt.Printf("%d\n", clientid) +func (ws *CTestModule) DoSomething() { + fmt.Printf("CTestModule do some thing!\n") } -func (slf *CMessageReceiver) OnDisconnect(webServer network.IWebsocketServer, clientid uint64, err error) { - fmt.Printf("%d\n", clientid) - fmt.Print(err) +func (ws *CTestModule) OnInit() error { + fmt.Printf("CTestModule.OnInit\n") + return nil } -func (slf *CMessageReceiver) OnRecvMsg(webServer network.IWebsocketServer, clientid uint64, msgtype int, data []byte) { - fmt.Printf("%d,%d\n", clientid, msgtype) - fmt.Print(string(data)) - - webServer.SendMsg(clientid, websocket.TextMessage, data) +func (ws *CTestModule) OnRun() bool { + time.Sleep(2 * time.Second) + fmt.Printf("CTestModule.OnRun\n") + return true } -func Test(res http.ResponseWriter, req *http.Request) { - io.WriteString(res, "test..........!\n") +func (ws *CTestModule) OnEndRun() { + fmt.Printf("CTestModule.OnEndRun\n") } +//CTest服务定义 type CTest struct { service.BaseService tmp int } func (ws *CTest) OnInit() error { + fmt.Printf("CTest.OnInit\n") + testModule := CTestModule{} + moduleId := ws.AddModule(&testModule) + pTmpModule := ws.GetModuleById(moduleId) + pTmpModuleTest := pTmpModule.(*CTestModule) + pTmpModuleTest.DoSomething() + + return nil +} + +func (ws *CTest) OnRun() bool { + + ws.tmp = ws.tmp + 1 + time.Sleep(1 * time.Second) + logic := service.InstanceServiceMgr().FindService("logiclog") + logic.(sysmodule.ILogger).Printf(sysmodule.LEVER_DEBUG, "CTest.OnRun\n") + fmt.Printf("CTest.OnRun\n") + /* + //if ws.tmp%10 == 0 { + var test CTestData + test.Bbbb = 1111 + test.Cccc = 111 + test.Ddd = "1111" + var test2 CTestData + err := cluster.Call("_CTest.RPC_LogTicker2\n", &test, &test2) + fmt.Print(err, test2) + //} + + //模块的示例 + testModule := CTestModule{} + moduleId := ws.AddModule(&testModule) + pTmpModule := ws.GetModuleById(moduleId) + pTmpModuleTest := pTmpModule.(*CTestModule) + pTmpModuleTest.DoSomething() + pservice := testModule.GetOwnerService() + fmt.Printf("%T", pservice) + */ + return true +} + +func (ws *CTest) OnEndRun() { + fmt.Printf("CTest.OnEndRun\n") +} + +func (ws *CTest) RPC_LogTicker2(args *CTestData, quo *CTestData) error { + + *quo = *args return nil } @@ -56,20 +100,6 @@ type CTestData struct { Ddd string } -type CTestModule struct { - service.BaseModule -} - -func (ws *CTestModule) DoSomething() { - fmt.Printf("CTestModule do some thing!") -} - -func (ws *CTest) RPC_LogTicker2(args *CTestData, quo *CTestData) error { - - *quo = *args - return nil -} - func (ws *CTest) HTTP_LogTicker2(request *sysservice.HttpRequest, resp *sysservice.HttpRespone) error { data := CTestData{111, 333, "34444"} @@ -77,68 +107,17 @@ func (ws *CTest) HTTP_LogTicker2(request *sysservice.HttpRequest, resp *sysservi return nil } -func (ws *CTest) OnRun() error { - - ws.tmp = ws.tmp + 1 - time.Sleep(1 * time.Second) - //if ws.tmp%10 == 0 { - var test CTestData - test.Bbbb = 1111 - test.Cccc = 111 - test.Ddd = "1111" - var test2 CTestData - err := cluster.Call("_CTest.RPC_LogTicker2", &test, &test2) - fmt.Print(err, test2) - //} - - //模块的示例 - testModule := CTestModule{} - testModule.SetModuleType(1) - ws.AddModule(&testModule) - - pTmpModule := ws.GetModuleByType(1) - pTmpModuleTest := pTmpModule.(*CTestModule) - pTmpModuleTest.DoSomething() - pservice := testModule.GetOwnerService() - fmt.Printf("%T", pservice) - return nil -} - -func NewCTest(servicetype int) *CTest { - wss := new(CTest) - wss.Init(wss, servicetype) - return wss -} - -func checkFileIsExist(filename string) bool { - var exist = true - if _, err := os.Stat(filename); os.IsNotExist(err) { - exist = false - } - return exist -} - -func (ws *CTest) OnDestory() error { - return nil -} - func main() { node := originnode.NewOrginNode() if node == nil { return } - var module CTestModule - module.SetModuleType(1) - originnode.AddModule(&module) - ptest := originnode.GetModuleByType(1) - fmt.Print(ptest) + test := CTest{} + logiclogservice := &sysservice.LogService{} + logiclogservice.InitLog("logiclog", sysmodule.LEVER_DEBUG) - var receiver CMessageReceiver - wsservice := sysservice.NewWSServerService("/ws", 1314, &receiver, false) - test := NewCTest(0) - httpserver := sysservice.NewHttpServerService(9120) - node.SetupService(test, httpserver, wsservice) + node.SetupService(logiclogservice, &test) node.Init() node.Start() diff --git a/cluster/config.go b/cluster/config.go index 6b12177..66eedff 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -22,8 +22,9 @@ type CNode struct { } type ClusterConfig struct { - NodeList []CNodeCfg //配置列表 - currentNode CNode //当前node + PublicServiceList []string + NodeList []CNodeCfg //配置列表 + currentNode CNode //当前node mapIdNode map[int]CNode //map[nodeid] CNode mapClusterNodeService map[string][]CNode //map[nodename] []CNode @@ -89,6 +90,10 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) { } } + //向c.currentNode中加入公共服务 + for _, servicename := range c.PublicServiceList { + c.currentNode.ServiceList[servicename] = true + } return c, nil } diff --git a/originnode/global.go b/originnode/global.go index 4914f62..9157465 100644 --- a/originnode/global.go +++ b/originnode/global.go @@ -2,13 +2,9 @@ package originnode import ( _ "net/http/pprof" - "sync" - - "github.com/duanhf2012/origin/sysmodule" - - "github.com/duanhf2012/origin/service" ) +/* type GlobalModule struct { service.BaseModule } @@ -40,3 +36,4 @@ func InitGlobalModule(exit chan bool, pwaitGroup *sync.WaitGroup) { func RunGlobalModule() { go g_module.RunModule(&g_module) } +*/ diff --git a/originnode/node.go b/originnode/node.go index aa39865..8927931 100644 --- a/originnode/node.go +++ b/originnode/node.go @@ -14,6 +14,7 @@ import ( "github.com/duanhf2012/origin/cluster" "github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/sysmodule" + "github.com/duanhf2012/origin/sysservice" ) type CExitCtl struct { @@ -30,8 +31,8 @@ type COriginNode struct { func (s *COriginNode) Init() { //初始化全局模块 - imodule := g_module.GetModuleById(sysmodule.SYS_LOG) - service.InstanceServiceMgr().Init(imodule.(service.ILogger), s.exitChan, s.waitGroup) + logger := service.InstanceServiceMgr().FindService("syslog").(service.ILogger) + service.InstanceServiceMgr().Init(logger, s.exitChan, s.waitGroup) s.sigs = make(chan os.Signal, 1) signal.Notify(s.sigs, syscall.SIGINT, syscall.SIGTERM) @@ -83,9 +84,6 @@ func (s *COriginNode) Start() { //开始运行集群 cluster.InstanceClusterMgr().Start() - //运行全局模块 - RunGlobalModule() - //开启所有服务 service.InstanceServiceMgr().Start() @@ -112,12 +110,10 @@ func NewOrginNode() *COriginNode { node.exitChan = make(chan bool) node.waitGroup = &sync.WaitGroup{} - //初始化全局模块 - InitGlobalModule(node.exitChan, node.waitGroup) - var syslogmodule sysmodule.LogModule - syslogmodule.Init("system", sysmodule.LEVER_INFO) - syslogmodule.SetModuleId(sysmodule.SYS_LOG) - AddModule(&syslogmodule) + //安装系统服务 + syslogservice := &sysservice.LogService{} + syslogservice.InitLog("syslog", sysmodule.LEVER_INFO) + service.InstanceServiceMgr().Setup(syslogservice) //初始化集群对象 err := cluster.InstanceClusterMgr().Init() diff --git a/service/Module.go b/service/Module.go index 06c7965..b71bacd 100644 --- a/service/Module.go +++ b/service/Module.go @@ -256,29 +256,34 @@ func (slf *BaseModule) getBaseModule() *BaseModule { } func (slf *BaseModule) RunModule(module IModule) { - module.OnInit() + err := module.OnInit() + if err != nil { + GetLogger().Printf(LEVER_ERROR, "Start module %T id is %d is fail,reason:%v...", module, module.GetModuleId(), err) + } else { + GetLogger().Printf(LEVER_INFO, "Start module %T id is %d...", module, module.GetModuleId()) + } //运行所有子模块 slf.WaitGroup.Add(1) defer slf.WaitGroup.Done() for { if atomic.LoadInt32(&slf.corouterstatus) != 0 { - slf.OnEndRun() - GetLogger().Printf(LEVER_INFO, "Stopping module %T id is %d...", slf.GetSelf(), module.GetModuleId()) + module.OnEndRun() + GetLogger().Printf(LEVER_INFO, "Stopping module %T id is %d...", module, module.GetModuleId()) break } select { case <-slf.ExitChan: - slf.OnEndRun() - GetLogger().Printf(LEVER_INFO, "Stopping module %T...", slf.GetSelf()) - fmt.Println("Stopping module %T...", slf.GetSelf()) + module.OnEndRun() + GetLogger().Printf(LEVER_INFO, "Stopping module %T...", module) + fmt.Printf("Stopping module %T...\n", module) return default: } if module.OnRun() == false { - slf.OnEndRun() + module.OnEndRun() return } } diff --git a/service/Service.go b/service/Service.go index 740a66c..d60b2a0 100644 --- a/service/Service.go +++ b/service/Service.go @@ -65,16 +65,19 @@ func (slf *BaseService) OnRemoveService(iservice IService) { func (slf *BaseService) Init(iservice IService) error { slf.ownerService = iservice - slf.servicename = fmt.Sprintf("%T", iservice) - parts := strings.Split(slf.servicename, ".") - if len(parts) != 2 { - GetLogger().Printf(LEVER_ERROR, "BaseService.Init: service name is error: %q", slf.servicename) - err := fmt.Errorf("BaseService.Init: service name is error: %q", slf.servicename) - return err + + if iservice.GetServiceName() == "" { + slf.servicename = fmt.Sprintf("%T", iservice) + parts := strings.Split(slf.servicename, ".") + if len(parts) != 2 { + GetLogger().Printf(LEVER_ERROR, "BaseService.Init: service name is error: %q", slf.servicename) + err := fmt.Errorf("BaseService.Init: service name is error: %q", slf.servicename) + return err + } + + slf.servicename = parts[1] } - slf.servicename = parts[1] slf.serviceid = InstanceServiceMgr().GenServiceID() - return nil } diff --git a/service/servicemanager.go b/service/servicemanager.go index f4ef1aa..aa1ca7f 100644 --- a/service/servicemanager.go +++ b/service/servicemanager.go @@ -18,7 +18,6 @@ type CServiceManager struct { } func (slf *CServiceManager) Setup(s IService) bool { - slf.localserviceMap[s.GetServiceName()] = s return true } @@ -43,7 +42,6 @@ func (slf *CServiceManager) FetchService(s FetchService) IService { } func (slf *CServiceManager) Init(logger ILogger, exit chan bool, pwaitGroup *sync.WaitGroup) bool { - slf.logger = logger for _, s := range slf.localserviceMap { (s.(IModule)).InitModule(exit, pwaitGroup) diff --git a/sysmodule/LogModule.go b/sysmodule/LogModule.go index 6e07b03..f8f5432 100644 --- a/sysmodule/LogModule.go +++ b/sysmodule/LogModule.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "os" + "sync" "time" "github.com/duanhf2012/origin/service" @@ -34,6 +35,7 @@ type LogModule struct { logger [LEVEL_MAX]*log.Logger logFile *os.File openLevel uint + locker sync.Mutex } func (slf *LogModule) GetCurrentFileName() string { @@ -41,7 +43,15 @@ func (slf *LogModule) GetCurrentFileName() string { } func (slf *LogModule) CheckAndGenFile() { + + slf.locker.Lock() if time.Now().Day() != slf.currentDay { + + if time.Now().Day() == slf.currentDay { + slf.locker.Unlock() + return + } + slf.currentDay = time.Now().Day() if slf.logFile != nil { slf.logFile.Close() @@ -51,13 +61,16 @@ func (slf *LogModule) CheckAndGenFile() { slf.logFile, err = os.OpenFile(slf.GetCurrentFileName(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) if err != nil { fmt.Printf("create log file %+v error!", slf.GetCurrentFileName()) + slf.locker.Unlock() return } for level := 0; level < LEVEL_MAX; level++ { slf.logger[level] = log.New(slf.logFile, LogPrefix[level], log.Lshortfile|log.LstdFlags) } + } + slf.locker.Unlock() } func (slf *LogModule) Init(logfilename string, openLevel uint) {