去除掉全局模块

This commit is contained in:
boyce
2019-02-27 11:58:51 +08:00
parent 06ed7d3380
commit 4e541e0851
9 changed files with 123 additions and 126 deletions

View File

@@ -1,12 +1,13 @@
{ {
"NodeList":[ "PublicServiceList":["logiclog"],
"NodeList":[
{ {
"NodeID":1, "NodeID":1,
"NodeName":"N_Node1", "NodeName":"N_Node1",
"ServerAddr":"127.0.0.1:8080", "ServerAddr":"127.0.0.1:8080",
"ServiceList":["WSServerService","CTest","HttpServerService"], "ServiceList":["CTest"],
"ClusterNode":["N_Node2"] "ClusterNode":["N_Node2"]
}, },

View File

@@ -3,50 +3,94 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"net/http"
"os"
"time" "time"
"github.com/duanhf2012/origin/cluster" "github.com/duanhf2012/origin/sysmodule"
"github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/originnode" "github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysservice" "github.com/duanhf2012/origin/sysservice"
"github.com/gorilla/websocket"
) )
type CMessageReceiver struct { //子模块
type CTestModule struct {
service.BaseModule
} }
func (slf *CMessageReceiver) OnConnected(webServer network.IWebsocketServer, clientid uint64) { func (ws *CTestModule) DoSomething() {
fmt.Printf("%d\n", clientid) fmt.Printf("CTestModule do some thing!\n")
} }
func (slf *CMessageReceiver) OnDisconnect(webServer network.IWebsocketServer, clientid uint64, err error) { func (ws *CTestModule) OnInit() error {
fmt.Printf("%d\n", clientid) fmt.Printf("CTestModule.OnInit\n")
fmt.Print(err) return nil
} }
func (slf *CMessageReceiver) OnRecvMsg(webServer network.IWebsocketServer, clientid uint64, msgtype int, data []byte) { func (ws *CTestModule) OnRun() bool {
fmt.Printf("%d,%d\n", clientid, msgtype) time.Sleep(2 * time.Second)
fmt.Print(string(data)) fmt.Printf("CTestModule.OnRun\n")
return true
webServer.SendMsg(clientid, websocket.TextMessage, data)
} }
func Test(res http.ResponseWriter, req *http.Request) { func (ws *CTestModule) OnEndRun() {
io.WriteString(res, "test..........!\n") fmt.Printf("CTestModule.OnEndRun\n")
} }
//CTest服务定义
type CTest struct { type CTest struct {
service.BaseService service.BaseService
tmp int tmp int
} }
func (ws *CTest) OnInit() error { func (ws *CTest) OnInit() error {
fmt.Printf("CTest.OnInit\n")
testModule := CTestModule{}
moduleId := ws.AddModule(&testModule)
pTmpModule := ws.GetModuleById(moduleId)
pTmpModuleTest := pTmpModule.(*CTestModule)
pTmpModuleTest.DoSomething()
return nil
}
func (ws *CTest) OnRun() bool {
ws.tmp = ws.tmp + 1
time.Sleep(1 * time.Second)
logic := service.InstanceServiceMgr().FindService("logiclog")
logic.(sysmodule.ILogger).Printf(sysmodule.LEVER_DEBUG, "CTest.OnRun\n")
fmt.Printf("CTest.OnRun\n")
/*
//if ws.tmp%10 == 0 {
var test CTestData
test.Bbbb = 1111
test.Cccc = 111
test.Ddd = "1111"
var test2 CTestData
err := cluster.Call("_CTest.RPC_LogTicker2\n", &test, &test2)
fmt.Print(err, test2)
//}
//模块的示例
testModule := CTestModule{}
moduleId := ws.AddModule(&testModule)
pTmpModule := ws.GetModuleById(moduleId)
pTmpModuleTest := pTmpModule.(*CTestModule)
pTmpModuleTest.DoSomething()
pservice := testModule.GetOwnerService()
fmt.Printf("%T", pservice)
*/
return true
}
func (ws *CTest) OnEndRun() {
fmt.Printf("CTest.OnEndRun\n")
}
func (ws *CTest) RPC_LogTicker2(args *CTestData, quo *CTestData) error {
*quo = *args
return nil return nil
} }
@@ -56,20 +100,6 @@ type CTestData struct {
Ddd string Ddd string
} }
type CTestModule struct {
service.BaseModule
}
func (ws *CTestModule) DoSomething() {
fmt.Printf("CTestModule do some thing!")
}
func (ws *CTest) RPC_LogTicker2(args *CTestData, quo *CTestData) error {
*quo = *args
return nil
}
func (ws *CTest) HTTP_LogTicker2(request *sysservice.HttpRequest, resp *sysservice.HttpRespone) error { func (ws *CTest) HTTP_LogTicker2(request *sysservice.HttpRequest, resp *sysservice.HttpRespone) error {
data := CTestData{111, 333, "34444"} data := CTestData{111, 333, "34444"}
@@ -77,68 +107,17 @@ func (ws *CTest) HTTP_LogTicker2(request *sysservice.HttpRequest, resp *sysservi
return nil return nil
} }
func (ws *CTest) OnRun() error {
ws.tmp = ws.tmp + 1
time.Sleep(1 * time.Second)
//if ws.tmp%10 == 0 {
var test CTestData
test.Bbbb = 1111
test.Cccc = 111
test.Ddd = "1111"
var test2 CTestData
err := cluster.Call("_CTest.RPC_LogTicker2", &test, &test2)
fmt.Print(err, test2)
//}
//模块的示例
testModule := CTestModule{}
testModule.SetModuleType(1)
ws.AddModule(&testModule)
pTmpModule := ws.GetModuleByType(1)
pTmpModuleTest := pTmpModule.(*CTestModule)
pTmpModuleTest.DoSomething()
pservice := testModule.GetOwnerService()
fmt.Printf("%T", pservice)
return nil
}
func NewCTest(servicetype int) *CTest {
wss := new(CTest)
wss.Init(wss, servicetype)
return wss
}
func checkFileIsExist(filename string) bool {
var exist = true
if _, err := os.Stat(filename); os.IsNotExist(err) {
exist = false
}
return exist
}
func (ws *CTest) OnDestory() error {
return nil
}
func main() { func main() {
node := originnode.NewOrginNode() node := originnode.NewOrginNode()
if node == nil { if node == nil {
return return
} }
var module CTestModule test := CTest{}
module.SetModuleType(1) logiclogservice := &sysservice.LogService{}
originnode.AddModule(&module) logiclogservice.InitLog("logiclog", sysmodule.LEVER_DEBUG)
ptest := originnode.GetModuleByType(1)
fmt.Print(ptest)
var receiver CMessageReceiver node.SetupService(logiclogservice, &test)
wsservice := sysservice.NewWSServerService("/ws", 1314, &receiver, false)
test := NewCTest(0)
httpserver := sysservice.NewHttpServerService(9120)
node.SetupService(test, httpserver, wsservice)
node.Init() node.Init()
node.Start() node.Start()

View File

@@ -22,8 +22,9 @@ type CNode struct {
} }
type ClusterConfig struct { type ClusterConfig struct {
NodeList []CNodeCfg //配置列表 PublicServiceList []string
currentNode CNode //当前node NodeList []CNodeCfg //配置列表
currentNode CNode //当前node
mapIdNode map[int]CNode //map[nodeid] CNode mapIdNode map[int]CNode //map[nodeid] CNode
mapClusterNodeService map[string][]CNode //map[nodename] []CNode mapClusterNodeService map[string][]CNode //map[nodename] []CNode
@@ -89,6 +90,10 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) {
} }
} }
//向c.currentNode中加入公共服务
for _, servicename := range c.PublicServiceList {
c.currentNode.ServiceList[servicename] = true
}
return c, nil return c, nil
} }

View File

@@ -2,13 +2,9 @@ package originnode
import ( import (
_ "net/http/pprof" _ "net/http/pprof"
"sync"
"github.com/duanhf2012/origin/sysmodule"
"github.com/duanhf2012/origin/service"
) )
/*
type GlobalModule struct { type GlobalModule struct {
service.BaseModule service.BaseModule
} }
@@ -40,3 +36,4 @@ func InitGlobalModule(exit chan bool, pwaitGroup *sync.WaitGroup) {
func RunGlobalModule() { func RunGlobalModule() {
go g_module.RunModule(&g_module) go g_module.RunModule(&g_module)
} }
*/

View File

@@ -14,6 +14,7 @@ import (
"github.com/duanhf2012/origin/cluster" "github.com/duanhf2012/origin/cluster"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysmodule" "github.com/duanhf2012/origin/sysmodule"
"github.com/duanhf2012/origin/sysservice"
) )
type CExitCtl struct { type CExitCtl struct {
@@ -30,8 +31,8 @@ type COriginNode struct {
func (s *COriginNode) Init() { func (s *COriginNode) Init() {
//初始化全局模块 //初始化全局模块
imodule := g_module.GetModuleById(sysmodule.SYS_LOG) logger := service.InstanceServiceMgr().FindService("syslog").(service.ILogger)
service.InstanceServiceMgr().Init(imodule.(service.ILogger), s.exitChan, s.waitGroup) service.InstanceServiceMgr().Init(logger, s.exitChan, s.waitGroup)
s.sigs = make(chan os.Signal, 1) s.sigs = make(chan os.Signal, 1)
signal.Notify(s.sigs, syscall.SIGINT, syscall.SIGTERM) signal.Notify(s.sigs, syscall.SIGINT, syscall.SIGTERM)
@@ -83,9 +84,6 @@ func (s *COriginNode) Start() {
//开始运行集群 //开始运行集群
cluster.InstanceClusterMgr().Start() cluster.InstanceClusterMgr().Start()
//运行全局模块
RunGlobalModule()
//开启所有服务 //开启所有服务
service.InstanceServiceMgr().Start() service.InstanceServiceMgr().Start()
@@ -112,12 +110,10 @@ func NewOrginNode() *COriginNode {
node.exitChan = make(chan bool) node.exitChan = make(chan bool)
node.waitGroup = &sync.WaitGroup{} node.waitGroup = &sync.WaitGroup{}
//初始化全局模块 //安装系统服务
InitGlobalModule(node.exitChan, node.waitGroup) syslogservice := &sysservice.LogService{}
var syslogmodule sysmodule.LogModule syslogservice.InitLog("syslog", sysmodule.LEVER_INFO)
syslogmodule.Init("system", sysmodule.LEVER_INFO) service.InstanceServiceMgr().Setup(syslogservice)
syslogmodule.SetModuleId(sysmodule.SYS_LOG)
AddModule(&syslogmodule)
//初始化集群对象 //初始化集群对象
err := cluster.InstanceClusterMgr().Init() err := cluster.InstanceClusterMgr().Init()

View File

@@ -256,29 +256,34 @@ func (slf *BaseModule) getBaseModule() *BaseModule {
} }
func (slf *BaseModule) RunModule(module IModule) { func (slf *BaseModule) RunModule(module IModule) {
module.OnInit() err := module.OnInit()
if err != nil {
GetLogger().Printf(LEVER_ERROR, "Start module %T id is %d is fail,reason:%v...", module, module.GetModuleId(), err)
} else {
GetLogger().Printf(LEVER_INFO, "Start module %T id is %d...", module, module.GetModuleId())
}
//运行所有子模块 //运行所有子模块
slf.WaitGroup.Add(1) slf.WaitGroup.Add(1)
defer slf.WaitGroup.Done() defer slf.WaitGroup.Done()
for { for {
if atomic.LoadInt32(&slf.corouterstatus) != 0 { if atomic.LoadInt32(&slf.corouterstatus) != 0 {
slf.OnEndRun() module.OnEndRun()
GetLogger().Printf(LEVER_INFO, "Stopping module %T id is %d...", slf.GetSelf(), module.GetModuleId()) GetLogger().Printf(LEVER_INFO, "Stopping module %T id is %d...", module, module.GetModuleId())
break break
} }
select { select {
case <-slf.ExitChan: case <-slf.ExitChan:
slf.OnEndRun() module.OnEndRun()
GetLogger().Printf(LEVER_INFO, "Stopping module %T...", slf.GetSelf()) GetLogger().Printf(LEVER_INFO, "Stopping module %T...", module)
fmt.Println("Stopping module %T...", slf.GetSelf()) fmt.Printf("Stopping module %T...\n", module)
return return
default: default:
} }
if module.OnRun() == false { if module.OnRun() == false {
slf.OnEndRun() module.OnEndRun()
return return
} }
} }

View File

@@ -65,16 +65,19 @@ func (slf *BaseService) OnRemoveService(iservice IService) {
func (slf *BaseService) Init(iservice IService) error { func (slf *BaseService) Init(iservice IService) error {
slf.ownerService = iservice slf.ownerService = iservice
slf.servicename = fmt.Sprintf("%T", iservice)
parts := strings.Split(slf.servicename, ".") if iservice.GetServiceName() == "" {
if len(parts) != 2 { slf.servicename = fmt.Sprintf("%T", iservice)
GetLogger().Printf(LEVER_ERROR, "BaseService.Init: service name is error: %q", slf.servicename) parts := strings.Split(slf.servicename, ".")
err := fmt.Errorf("BaseService.Init: service name is error: %q", slf.servicename) if len(parts) != 2 {
return err GetLogger().Printf(LEVER_ERROR, "BaseService.Init: service name is error: %q", slf.servicename)
err := fmt.Errorf("BaseService.Init: service name is error: %q", slf.servicename)
return err
}
slf.servicename = parts[1]
} }
slf.servicename = parts[1]
slf.serviceid = InstanceServiceMgr().GenServiceID() slf.serviceid = InstanceServiceMgr().GenServiceID()
return nil return nil
} }

View File

@@ -18,7 +18,6 @@ type CServiceManager struct {
} }
func (slf *CServiceManager) Setup(s IService) bool { func (slf *CServiceManager) Setup(s IService) bool {
slf.localserviceMap[s.GetServiceName()] = s slf.localserviceMap[s.GetServiceName()] = s
return true return true
} }
@@ -43,7 +42,6 @@ func (slf *CServiceManager) FetchService(s FetchService) IService {
} }
func (slf *CServiceManager) Init(logger ILogger, exit chan bool, pwaitGroup *sync.WaitGroup) bool { func (slf *CServiceManager) Init(logger ILogger, exit chan bool, pwaitGroup *sync.WaitGroup) bool {
slf.logger = logger slf.logger = logger
for _, s := range slf.localserviceMap { for _, s := range slf.localserviceMap {
(s.(IModule)).InitModule(exit, pwaitGroup) (s.(IModule)).InitModule(exit, pwaitGroup)

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"sync"
"time" "time"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
@@ -34,6 +35,7 @@ type LogModule struct {
logger [LEVEL_MAX]*log.Logger logger [LEVEL_MAX]*log.Logger
logFile *os.File logFile *os.File
openLevel uint openLevel uint
locker sync.Mutex
} }
func (slf *LogModule) GetCurrentFileName() string { func (slf *LogModule) GetCurrentFileName() string {
@@ -41,7 +43,15 @@ func (slf *LogModule) GetCurrentFileName() string {
} }
func (slf *LogModule) CheckAndGenFile() { func (slf *LogModule) CheckAndGenFile() {
slf.locker.Lock()
if time.Now().Day() != slf.currentDay { if time.Now().Day() != slf.currentDay {
if time.Now().Day() == slf.currentDay {
slf.locker.Unlock()
return
}
slf.currentDay = time.Now().Day() slf.currentDay = time.Now().Day()
if slf.logFile != nil { if slf.logFile != nil {
slf.logFile.Close() slf.logFile.Close()
@@ -51,13 +61,16 @@ func (slf *LogModule) CheckAndGenFile() {
slf.logFile, err = os.OpenFile(slf.GetCurrentFileName(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) slf.logFile, err = os.OpenFile(slf.GetCurrentFileName(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)
if err != nil { if err != nil {
fmt.Printf("create log file %+v error!", slf.GetCurrentFileName()) fmt.Printf("create log file %+v error!", slf.GetCurrentFileName())
slf.locker.Unlock()
return return
} }
for level := 0; level < LEVEL_MAX; level++ { for level := 0; level < LEVEL_MAX; level++ {
slf.logger[level] = log.New(slf.logFile, LogPrefix[level], log.Lshortfile|log.LstdFlags) slf.logger[level] = log.New(slf.logFile, LogPrefix[level], log.Lshortfile|log.LstdFlags)
} }
} }
slf.locker.Unlock()
} }
func (slf *LogModule) Init(logfilename string, openLevel uint) { func (slf *LogModule) Init(logfilename string, openLevel uint) {