整理系统日志

This commit is contained in:
boyce
2019-02-21 18:19:48 +08:00
parent bcace08cf0
commit ee61b6145e
6 changed files with 122 additions and 169 deletions

View File

@@ -2,21 +2,19 @@ package cluster
import ( import (
"fmt" "fmt"
"log"
"net" "net"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/duanhf2012/origin/sysmodule"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/rpc"
) )
//https://github.com/rocket049/rpc2d/blob/master/rpcnode.go
//http://daizuozhuo.github.io/golang-rpc-practice/
type RpcClient struct { type RpcClient struct {
nodeid int nodeid int
pclient *rpc.Client pclient *rpc.Client
@@ -36,9 +34,7 @@ type CCluster struct {
} }
func (slf *CCluster) ReadNodeInfo(nodeid int) error { func (slf *CCluster) ReadNodeInfo(nodeid int) error {
//连接Server结点
var err error var err error
slf.cfg, err = ReadCfg("./config/cluster.json", nodeid) slf.cfg, err = ReadCfg("./config/cluster.json", nodeid)
if err != nil { if err != nil {
fmt.Printf("%v", err) fmt.Printf("%v", err)
@@ -57,39 +53,17 @@ func (slf *CCluster) GetClusterClient(id int) *rpc.Client {
return v.pclient return v.pclient
} }
func (slf *CCluster) GetClusterNode(strNodeName string) *CNodeCfg { func (slf *CCluster) GetBindUrl() string {
for _, value := range slf.cfg.NodeList { return slf.cfg.currentNode.ServerAddr
if value.NodeName == strNodeName {
return &value
}
}
return nil
}
func (slf *CCluster) GetBindUrl() (string, error) {
return slf.cfg.currentNode.ServerAddr, nil
}
type CTestData struct {
Bbbb int64
Cccc int
Ddd string
} }
func (slf *CCluster) AcceptRpc(tpcListen *net.TCPListener) error { func (slf *CCluster) AcceptRpc(tpcListen *net.TCPListener) error {
/*slf.reader, slf.writer = net.Pipe()
go rpc.ServeConn(slf.reader)
slf.LocalRpcClient = rpc.NewClient(slf.writer)
*/
for { for {
conn, err := tpcListen.Accept() conn, err := tpcListen.Accept()
if err != nil { if err != nil {
fmt.Print(err) service.GetLogger().Printf(sysmodule.LEVER_ERROR, "tpcListen.Accept error:%v", err)
return err return err
} }
//使用goroutine单独处理rpc连接请求
go rpc.ServeConn(conn) go rpc.ServeConn(conn)
} }
@@ -98,19 +72,17 @@ func (slf *CCluster) AcceptRpc(tpcListen *net.TCPListener) error {
func (slf *CCluster) ListenService() error { func (slf *CCluster) ListenService() error {
bindStr, err := slf.GetBindUrl() bindStr := slf.GetBindUrl()
if err != nil {
return err
}
tcpaddr, err := net.ResolveTCPAddr("tcp4", bindStr) tcpaddr, err := net.ResolveTCPAddr("tcp4", bindStr)
if err != nil { if err != nil {
service.GetLogger().Printf(sysmodule.LEVER_FATAL, "ResolveTCPAddr error:%v", err)
os.Exit(1) os.Exit(1)
return err return err
} }
tcplisten, err2 := net.ListenTCP("tcp", tcpaddr) tcplisten, err2 := net.ListenTCP("tcp", tcpaddr)
if err2 != nil { if err2 != nil {
service.GetLogger().Printf(sysmodule.LEVER_FATAL, "ListenTCP error:%v", err)
os.Exit(1) os.Exit(1)
return err2 return err2
} }
@@ -139,7 +111,7 @@ func (slf *CPing) Ping(ping *CPing, pong *CPong) error {
func (slf *CCluster) ConnService() error { func (slf *CCluster) ConnService() error {
ping := CPing{0} ping := CPing{0}
pong := CPong{0} pong := CPong{0}
fmt.Println(rpc.RegisterName("CPing", "", &ping)) rpc.RegisterName("CPing", "", &ping)
//连接集群服务器 //连接集群服务器
for _, nodeList := range slf.cfg.mapClusterNodeService { for _, nodeList := range slf.cfg.mapClusterNodeService {
@@ -151,7 +123,7 @@ func (slf *CCluster) ConnService() error {
for { for {
for _, rpcClient := range slf.nodeclient { for _, rpcClient := range slf.nodeclient {
// //连接状态发送ping
if rpcClient.isConnect == true { if rpcClient.isConnect == true {
ping.TimeStamp = 0 ping.TimeStamp = 0
err := rpcClient.pclient.Call("CPing.Ping", &ping, &pong) err := rpcClient.pclient.Call("CPing.Ping", &ping, &pong)
@@ -164,6 +136,7 @@ func (slf *CCluster) ConnService() error {
continue continue
} }
//非连接状态重新连接
if rpcClient.pclient != nil { if rpcClient.pclient != nil {
rpcClient.pclient.Close() rpcClient.pclient.Close()
rpcClient.pclient = nil rpcClient.pclient = nil
@@ -171,9 +144,10 @@ func (slf *CCluster) ConnService() error {
client, err := rpc.Dial("tcp", rpcClient.serverAddr) client, err := rpc.Dial("tcp", rpcClient.serverAddr)
if err != nil { if err != nil {
log.Println(err) service.GetLogger().Printf(sysmodule.LEVER_WARN, "Connect nodeid:%d,address:%s fail", rpcClient.nodeid, rpcClient.serverAddr)
continue continue
} }
service.GetLogger().Printf(sysmodule.LEVER_INFO, "Connect nodeid:%d,address:%s succ", rpcClient.nodeid, rpcClient.serverAddr)
v, _ := slf.nodeclient[rpcClient.nodeid] v, _ := slf.nodeclient[rpcClient.nodeid]
v.pclient = client v.pclient = client
@@ -188,14 +162,17 @@ func (slf *CCluster) ConnService() error {
func (slf *CCluster) Init() error { func (slf *CCluster) Init() error {
if len(os.Args) < 2 { if len(os.Args) < 2 {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Init error,param not find NodeId=number")
return fmt.Errorf("param error not find NodeId=number") return fmt.Errorf("param error not find NodeId=number")
} }
parts := strings.Split(os.Args[1], "=") parts := strings.Split(os.Args[1], "=")
if len(parts) < 2 { if len(parts) < 2 {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Init error,param not find NodeId=number")
return fmt.Errorf("param error not find NodeId=number") return fmt.Errorf("param error not find NodeId=number")
} }
if parts[0] != "NodeId" { if parts[0] != "NodeId" {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Init error,param error not find NodeId=number")
return fmt.Errorf("param error not find NodeId=number") return fmt.Errorf("param error not find NodeId=number")
} }
@@ -204,6 +181,7 @@ func (slf *CCluster) Init() error {
//读取配置 //读取配置
ret, err := strconv.Atoi(parts[1]) ret, err := strconv.Atoi(parts[1])
if err != nil { if err != nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Init parts[1] error,%v", err)
return err return err
} }
@@ -211,7 +189,6 @@ func (slf *CCluster) Init() error {
} }
func (slf *CCluster) Start() error { func (slf *CCluster) Start() error {
service.InstanceServiceMgr().FetchService(slf.OnFetchService) service.InstanceServiceMgr().FetchService(slf.OnFetchService)
//监听服务 //监听服务
@@ -230,7 +207,8 @@ func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply inte
var callServiceName string var callServiceName string
nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName) nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName)
if len(nodeidList) > 1 || len(nodeidList) < 1 { if len(nodeidList) > 1 || len(nodeidList) < 1 {
return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList)) service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s) not find nodes.", NodeServiceMethod)
return fmt.Errorf("CCluster.Call(%s) not find nodes.", NodeServiceMethod)
} }
nodeid := nodeidList[0] nodeid := nodeidList[0]
@@ -239,13 +217,18 @@ func (slf *CCluster) Call(NodeServiceMethod string, args interface{}, reply inte
} else { } else {
pclient := slf.GetClusterClient(nodeid) pclient := slf.GetClusterClient(nodeid)
if pclient == nil { if pclient == nil {
return fmt.Errorf("Call: NodeId %d is not find.", nodeid) service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d is not find.", NodeServiceMethod, nodeid)
return fmt.Errorf("CCluster.Call(%s): NodeId %d is not find.", NodeServiceMethod, nodeid)
} }
err := pclient.Call(callServiceName, args, reply) err := pclient.Call(callServiceName, args, reply)
if err != nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s) is fail:%v.", callServiceName, err)
}
return err return err
} }
return fmt.Errorf("Call: %s fail.", NodeServiceMethod) service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s) fail.", NodeServiceMethod)
return fmt.Errorf("CCluster.Call(%s) fail.", NodeServiceMethod)
} }
func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *string) []int { func (slf *CCluster) GetNodeList(NodeServiceMethod string, rpcServerMethod *string) []int {
@@ -286,24 +269,32 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{})
var callServiceName string var callServiceName string
nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName) nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName)
if len(nodeidList) < 1 { if len(nodeidList) < 1 {
return fmt.Errorf("Call: %s find %d nodes.", NodeServiceMethod, len(nodeidList)) service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) not find nodes.", NodeServiceMethod)
return fmt.Errorf("CCluster.Go(%s) not find nodes.", NodeServiceMethod)
} }
if bCast == false && len(nodeidList) > 1 { if bCast == false && len(nodeidList) > 1 {
return fmt.Errorf("Call: %s find more nodes %d.", NodeServiceMethod, len(nodeidList)) return fmt.Errorf("CCluster.Go(%s) find more nodes", NodeServiceMethod)
} }
for _, nodeid := range nodeidList { for _, nodeid := range nodeidList {
if nodeid == slf.GetCurrentNodeId() { if nodeid == slf.GetCurrentNodeId() {
slf.LocalRpcClient.Go(callServiceName, args, nil, nil) replyCall := slf.LocalRpcClient.Go(callServiceName, args, nil, nil)
//return nil if replyCall.Error != nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error)
}
return replyCall.Error
} else { } else {
pclient := slf.GetClusterClient(nodeid) pclient := slf.GetClusterClient(nodeid)
if pclient == nil { if pclient == nil {
return fmt.Errorf("Call: NodeId %d is not find.", nodeid) service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid)
return fmt.Errorf("CCluster.Go(%s) NodeId %d not find client", NodeServiceMethod, nodeid)
} }
pclient.Go(callServiceName, args, nil, nil) replyCall := pclient.Go(callServiceName, args, nil, nil)
//return nil if replyCall.Error != nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Go(%s) fail:%v.", NodeServiceMethod, replyCall.Error)
}
return replyCall.Error
} }
} }
@@ -313,28 +304,31 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{})
func (slf *CCluster) CallNode(nodeid int, servicemethod string, args interface{}, reply interface{}) error { func (slf *CCluster) CallNode(nodeid int, servicemethod string, args interface{}, reply interface{}) error {
pclient := slf.GetClusterClient(nodeid) pclient := slf.GetClusterClient(nodeid)
if pclient == nil { if pclient == nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.CallNode(%d,%s) NodeId not find client", nodeid, servicemethod)
return fmt.Errorf("Call: NodeId %d is not find.", nodeid) return fmt.Errorf("Call: NodeId %d is not find.", nodeid)
} }
err := pclient.Call(servicemethod, args, reply) err := pclient.Call(servicemethod, args, reply)
return err if err != nil {
service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.CallNode(%d,%s) fail:%v", nodeid, servicemethod, err)
}
return err
} }
func (slf *CCluster) GoNode(nodeid int, args interface{}, servicemethod string) error { func (slf *CCluster) GoNode(nodeid int, args interface{}, servicemethod string) error {
pclient := slf.GetClusterClient(nodeid) pclient := slf.GetClusterClient(nodeid)
if pclient == nil { if pclient == nil {
return fmt.Errorf("Call: NodeId %d is not find.", nodeid) service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.GoNode(%d,%s) NodeId not find client", nodeid, servicemethod)
return fmt.Errorf("CCluster.GoNode(%d,%s) NodeId not find client", nodeid, servicemethod)
} }
replyCall := pclient.Go(servicemethod, args, nil, nil) replyCall := pclient.Go(servicemethod, args, nil, nil)
//ret := <-replyCall.Done
if replyCall.Error != nil { if replyCall.Error != nil {
fmt.Print(replyCall.Error) service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.GoNode(%d,%s) fail:%v", nodeid, servicemethod, replyCall.Error)
} }
//fmt.Print(ret) return replyCall.Error
return nil
} }

View File

@@ -7,40 +7,33 @@ import (
) )
type CNodeCfg struct { type CNodeCfg struct {
NodeID int NodeID int
NodeName string NodeName string
ServerAddr string ServerAddr string
ServiceList []string ServiceList []string
ClusterNode []string ClusterNode []string
} }
type CNode struct { type CNode struct {
NodeID int NodeID int
NodeName string NodeName string
ServerAddr string ServerAddr string
ServiceList map[string]bool ServiceList map[string]bool
} }
type ClusterConfig struct { type ClusterConfig struct {
NodeList []CNodeCfg NodeList []CNodeCfg //配置列表
currentNode CNode //当前node
//通过id获取结点 mapIdNode map[int]CNode //map[nodeid] CNode
mapIdNode map[int]CNode
//map[nodename][ {CNode} ]
mapClusterNodeService map[string][]CNode //map[nodename] []CNode mapClusterNodeService map[string][]CNode //map[nodename] []CNode
mapClusterServiceNode map[string][]CNode //map[servicename] []CNode mapClusterServiceNode map[string][]CNode //map[servicename] []CNode
//mapLocalService map[string]bool //map[servicename] bool
currentNode CNode
} }
// ReadCfg ...
func ReadCfg(path string, nodeid int) (*ClusterConfig, error) { func ReadCfg(path string, nodeid int) (*ClusterConfig, error) {
c := &ClusterConfig{} c := &ClusterConfig{}
//1.加载解析配置
d, err := ioutil.ReadFile(path) d, err := ioutil.ReadFile(path)
if err != nil { if err != nil {
fmt.Printf("Read File %s Error!", path) fmt.Printf("Read File %s Error!", path)
@@ -57,8 +50,8 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) {
c.mapClusterNodeService = make(map[string][]CNode, 1) c.mapClusterNodeService = make(map[string][]CNode, 1)
c.mapClusterServiceNode = make(map[string][]CNode, 1) c.mapClusterServiceNode = make(map[string][]CNode, 1)
//2.组装mapIdNode
var custerNodeName []string var custerNodeName []string
//组装mapIdNode
for _, v := range c.NodeList { for _, v := range c.NodeList {
mapservice := make(map[string]bool, 1) mapservice := make(map[string]bool, 1)
for _, s := range v.ServiceList { for _, s := range v.ServiceList {
@@ -75,10 +68,10 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) {
} }
} }
//存入当前Node服务名 //3.存入当前Node服务名
c.mapClusterNodeService[c.currentNode.NodeName] = append(c.mapClusterNodeService[c.currentNode.NodeName], c.currentNode) c.mapClusterNodeService[c.currentNode.NodeName] = append(c.mapClusterNodeService[c.currentNode.NodeName], c.currentNode)
//组装mapClusterNodeService //4.组装mapClusterNodeService
for _, cn := range custerNodeName { for _, cn := range custerNodeName {
for _, n := range c.mapIdNode { for _, n := range c.mapIdNode {
if n.NodeName == cn { if n.NodeName == cn {
@@ -87,7 +80,7 @@ func ReadCfg(path string, nodeid int) (*ClusterConfig, error) {
} }
} }
//组装mapClusterServiceNode //5.组装mapClusterServiceNode
for _, nodelist := range c.mapClusterNodeService { //[]Node for _, nodelist := range c.mapClusterNodeService { //[]Node
for _, node := range nodelist { //Node for _, node := range nodelist { //Node
for s := range node.ServiceList { for s := range node.ServiceList {
@@ -138,7 +131,5 @@ func (slf *ClusterConfig) GetIdByNodeService(NodeName string, serviceName string
func (slf *ClusterConfig) HasLocalService(serviceName string) bool { func (slf *ClusterConfig) HasLocalService(serviceName string) bool {
_, ok := slf.currentNode.ServiceList[serviceName] _, ok := slf.currentNode.ServiceList[serviceName]
//_, ok := slf.mapLocalService[serviceName]
return ok == true return ok == true
} }

View File

@@ -13,6 +13,7 @@ type GlobalModule struct {
service.BaseModule service.BaseModule
} }
// 全局模块定义
var g_module GlobalModule var g_module GlobalModule
func AddModule(module service.IModule) uint32 { func AddModule(module service.IModule) uint32 {

View File

@@ -17,7 +17,7 @@ import (
) )
type CExitCtl struct { type CExitCtl struct {
exit chan bool exitChan chan bool
waitGroup *sync.WaitGroup waitGroup *sync.WaitGroup
} }
@@ -32,7 +32,7 @@ func (s *COriginNode) Init() {
//初始化全局模块 //初始化全局模块
service.InitLog() service.InitLog()
imodule := g_module.GetModuleById(sysmodule.SYS_LOG) imodule := g_module.GetModuleById(sysmodule.SYS_LOG)
service.InstanceServiceMgr().Init(imodule.(service.ILogger), s.exit, s.waitGroup) service.InstanceServiceMgr().Init(imodule.(service.ILogger), s.exitChan, s.waitGroup)
s.sigs = make(chan os.Signal, 1) s.sigs = make(chan os.Signal, 1)
signal.Notify(s.sigs, syscall.SIGINT, syscall.SIGTERM) signal.Notify(s.sigs, syscall.SIGINT, syscall.SIGTERM)
@@ -74,38 +74,50 @@ func (s *COriginNode) SetupService(services ...service.IService) {
func (s *COriginNode) Start() { func (s *COriginNode) Start() {
if s.debugListenAddress != "" { if s.debugListenAddress != "" {
go func() { go func() {
log.Println(http.ListenAndServe(s.debugListenAddress, nil)) log.Println(http.ListenAndServe(s.debugListenAddress, nil))
}() }()
} }
//开始运行集群
cluster.InstanceClusterMgr().Start() cluster.InstanceClusterMgr().Start()
//运行全局模块
RunGlobalModule() RunGlobalModule()
//开启所有服务
service.InstanceServiceMgr().Start() service.InstanceServiceMgr().Start()
//监听退出信号
select { select {
case <-s.sigs: case <-s.sigs:
fmt.Println("收到信号推出程序") service.GetLogger().Printf(sysmodule.LEVER_WARN, "Recv stop sig")
fmt.Printf("Recv stop sig")
} }
//停止运行程序
s.Stop() s.Stop()
} }
func (s *COriginNode) Stop() { func (s *COriginNode) Stop() {
close(s.exit) close(s.exitChan)
s.waitGroup.Wait() s.waitGroup.Wait()
} }
func NewOrginNode() *COriginNode { func NewOrginNode() *COriginNode {
//创建模块
node := new(COriginNode) node := new(COriginNode)
node.exit = make(chan bool) node.exitChan = make(chan bool)
node.waitGroup = &sync.WaitGroup{} node.waitGroup = &sync.WaitGroup{}
InitGlobalModule(node.exit, node.waitGroup)
//初始化全局模块
InitGlobalModule(node.exitChan, node.waitGroup)
var syslogmodule sysmodule.LogModule var syslogmodule sysmodule.LogModule
syslogmodule.Init("system", sysmodule.LEVER_INFO) syslogmodule.Init("system", sysmodule.LEVER_INFO)
syslogmodule.SetModuleId(sysmodule.SYS_LOG) syslogmodule.SetModuleId(sysmodule.SYS_LOG)
AddModule(&syslogmodule) AddModule(&syslogmodule)
//初始化集群对象
err := cluster.InstanceClusterMgr().Init() err := cluster.InstanceClusterMgr().Init()
if err != nil { if err != nil {
fmt.Print(err) fmt.Print(err)
@@ -114,13 +126,3 @@ func NewOrginNode() *COriginNode {
return node return node
} }
func HasCmdParam(param string) bool {
for i := 0; i < len(os.Args); i++ {
if os.Args[i] == param {
return true
}
}
return false
}

View File

@@ -22,10 +22,9 @@ type IModule interface {
GetModuleId() uint32 GetModuleId() uint32
GetModuleById(moduleId uint32) IModule GetModuleById(moduleId uint32) IModule
AddModule(module IModule) uint32 AddModule(module IModule) uint32
//DynamicAddModule(module IModule) uint32 RunModule(module IModule)
RunModule(module IModule) error
InitModule(exit chan bool, pwaitGroup *sync.WaitGroup) error InitModule(exit chan bool, pwaitGroup *sync.WaitGroup) error
OnInit() error OnInit() error
OnRun() bool OnRun() bool
@@ -89,7 +88,6 @@ type BaseModule struct {
CurrMaxModuleId uint32 CurrMaxModuleId uint32
rwModuleLocker sync.RWMutex rwModuleLocker sync.RWMutex
selfModule IModule
ownerModule IModule ownerModule IModule
} }
@@ -139,15 +137,6 @@ func (slf *BaseService) Init(iservice IService) error {
return nil return nil
} }
func (slf *BaseService) RPC_CheckServiceTickTimeOut(microSecond int64) error {
if slf.IsTimeOutTick(microSecond) == true {
// Log.Printf("service:%s is timeout,state:%d", slf.GetServiceName(), slf.GetStatus())
}
return nil
}
func (slf *BaseService) IsTimeOutTick(microSecond int64) bool { func (slf *BaseService) IsTimeOutTick(microSecond int64) bool {
nowtm := time.Now().UnixNano() / 1e6 nowtm := time.Now().UnixNano() / 1e6
@@ -177,7 +166,6 @@ func (slf *BaseModule) GetModuleById(moduleId uint32) IModule {
} }
func (slf *BaseModule) genModuleId() uint32 { func (slf *BaseModule) genModuleId() uint32 {
slf.rwModuleLocker.Lock() slf.rwModuleLocker.Lock()
slf.CurrMaxModuleId++ slf.CurrMaxModuleId++
moduleId := slf.CurrMaxModuleId moduleId := slf.CurrMaxModuleId
@@ -190,7 +178,7 @@ func (slf *BaseModule) RemoveModule(moduleId uint32) bool {
slf.rwModuleLocker.Lock() slf.rwModuleLocker.Lock()
_, ok := slf.mapModule[moduleId] _, ok := slf.mapModule[moduleId]
if ok == false { if ok == false {
GetLogger().Printf(LEVER_WARN, "RemoveModule fail %d...", moduleId) GetLogger().Printf(LEVER_WARN, "%T RemoveModule fail %d...", slf.GetOwner().GetModuleById(slf.GetModuleId()), moduleId)
slf.rwModuleLocker.Unlock() slf.rwModuleLocker.Unlock()
return false return false
} }
@@ -205,15 +193,25 @@ func (slf *BaseModule) IsRoot() bool {
return slf.GetOwner().GetModuleById(slf.GetModuleId()) == nil return slf.GetOwner().GetModuleById(slf.GetModuleId()) == nil
} }
const (
//ModuleNone ...
MAX_ALLOW_SET_MODULE_ID = iota + 100000000
INIT_AUTO_INCREMENT
)
func (slf *BaseModule) AddModule(module IModule) uint32 { func (slf *BaseModule) AddModule(module IModule) uint32 {
if slf.WaitGroup == nil { if slf.WaitGroup == nil {
GetLogger().Printf(LEVER_FATAL, "AddModule error %s...", fmt.Sprintf("%T", module)) GetLogger().Printf(LEVER_FATAL, "AddModule error wait group is nil:%T...", module)
}
if module.GetModuleId() > 100000000 {
return 0 return 0
} }
//用户设置的id不允许大于MAX_ALLOW_SET_MODULE_ID
if module.GetModuleId() > MAX_ALLOW_SET_MODULE_ID {
return 0
}
//如果没有设置自动生成ModuleId
if module.GetModuleId() == 0 { if module.GetModuleId() == 0 {
module.SetModuleId(slf.genModuleId()) module.SetModuleId(slf.genModuleId())
} }
@@ -224,17 +222,16 @@ func (slf *BaseModule) AddModule(module IModule) uint32 {
} else { } else {
module.SetOwner(slf.GetOwner().GetModuleById(slf.GetModuleId())) module.SetOwner(slf.GetOwner().GetModuleById(slf.GetModuleId()))
} }
} }
//设置模块退出信号捕获
module.InitModule(slf.ExitChan, slf.WaitGroup) module.InitModule(slf.ExitChan, slf.WaitGroup)
//存入父模块中
slf.rwModuleLocker.Lock() slf.rwModuleLocker.Lock()
if slf.mapModule == nil { if slf.mapModule == nil {
slf.mapModule = make(map[uint32]IModule) slf.mapModule = make(map[uint32]IModule)
} }
_, ok := slf.mapModule[module.GetModuleId()] _, ok := slf.mapModule[module.GetModuleId()]
if ok == true { if ok == true {
slf.rwModuleLocker.Unlock() slf.rwModuleLocker.Unlock()
@@ -244,12 +241,13 @@ func (slf *BaseModule) AddModule(module IModule) uint32 {
slf.mapModule[module.GetModuleId()] = module slf.mapModule[module.GetModuleId()] = module
slf.rwModuleLocker.Unlock() slf.rwModuleLocker.Unlock()
//运行模块
go module.RunModule(module) go module.RunModule(module)
return module.GetModuleId() return module.GetModuleId()
} }
func (slf *BaseModule) OnInit() error { func (slf *BaseModule) OnInit() error {
return fmt.Errorf("not implement OnInit moduletype %d ", slf.GetModuleId()) return nil
} }
func (slf *BaseModule) OnRun() bool { func (slf *BaseModule) OnRun() bool {
@@ -274,7 +272,7 @@ func (slf *BaseModule) SetOwnerService(iservice IService) {
} }
func (slf *BaseModule) InitModule(exit chan bool, pwaitGroup *sync.WaitGroup) error { func (slf *BaseModule) InitModule(exit chan bool, pwaitGroup *sync.WaitGroup) error {
slf.CurrMaxModuleId = 100000 slf.CurrMaxModuleId = INIT_AUTO_INCREMENT
slf.WaitGroup = pwaitGroup slf.WaitGroup = pwaitGroup
slf.ExitChan = exit slf.ExitChan = exit
return nil return nil
@@ -284,33 +282,30 @@ func (slf *BaseModule) getBaseModule() *BaseModule {
return slf return slf
} }
func (slf *BaseModule) RunModule(module IModule) error { func (slf *BaseModule) GetSelf() IModule {
if slf.IsRoot() {
return slf.GetOwner()
}
return slf.GetOwner().GetModuleById(slf.GetModuleId())
}
func (slf *BaseModule) RunModule(module IModule) {
module.OnInit() module.OnInit()
//运行所有子模块 //运行所有子模块
slf.rwModuleLocker.RLock()
/*
for _, subModule := range slf.mapModule {
go subModule.RunModule(subModule)
}*/
slf.rwModuleLocker.RUnlock()
slf.WaitGroup.Add(1) slf.WaitGroup.Add(1)
defer slf.WaitGroup.Done() defer slf.WaitGroup.Done()
for { for {
select { select {
case <-slf.ExitChan: case <-slf.ExitChan:
GetLogger().Printf(LEVER_WARN, "stopping module %s...", fmt.Sprintf("%T", slf)) GetLogger().Printf(LEVER_WARN, "Stopping module %T...", slf.GetSelf())
fmt.Println("stopping module %s...", fmt.Sprintf("%T", slf)) fmt.Println("Stopping module %T...", slf.GetSelf())
return nil return
} }
if module.OnRun() == false { if module.OnRun() == false {
break break
} }
slf.tickTime = time.Now().UnixNano() / 1e6 slf.tickTime = time.Now().UnixNano() / 1e6
} }
return nil
} }

View File

@@ -1,9 +1,7 @@
package service package service
import ( import (
"fmt"
"sync" "sync"
"time"
) )
type IServiceManager interface { type IServiceManager interface {
@@ -49,7 +47,6 @@ func (slf *CServiceManager) Init(logger ILogger, exit chan bool, pwaitGroup *syn
slf.logger = logger slf.logger = logger
for _, s := range slf.localserviceMap { for _, s := range slf.localserviceMap {
(s.(IModule)).InitModule(exit, pwaitGroup) (s.(IModule)).InitModule(exit, pwaitGroup)
//(s.(IModule)).OnInit()
} }
return true return true
@@ -63,52 +60,25 @@ func (slf *CServiceManager) Start() bool {
return true return true
} }
func (slf *CServiceManager) CheckServiceTimeTimeout(exit chan bool, pwaitGroup *sync.WaitGroup) {
defer pwaitGroup.Done()
for {
select {
case <-exit:
fmt.Println("CheckServiceTimeTimeout stopping...")
return
}
for _, s := range slf.localserviceMap {
if s.IsTimeOutTick(20000) == true {
Log.Printf("service:%s is timeout,state:%d", s.GetServiceName(), s.GetStatus())
}
}
time.Sleep(2 * time.Second)
}
}
func (slf *CServiceManager) GenServiceID() int { func (slf *CServiceManager) GenServiceID() int {
slf.genserviceid += 1 slf.genserviceid += 1
return slf.genserviceid return slf.genserviceid
} }
func (slf *CServiceManager) Get() bool {
for _, s := range slf.localserviceMap {
go s.OnRun()
}
return true
}
func (slf *CServiceManager) GetLogger() ILogger { func (slf *CServiceManager) GetLogger() ILogger {
return slf.logger return slf.logger
} }
var _self *CServiceManager var self *CServiceManager
func InstanceServiceMgr() *CServiceManager { func InstanceServiceMgr() *CServiceManager {
if _self == nil { if self == nil {
_self = new(CServiceManager) self = new(CServiceManager)
_self.localserviceMap = make(map[string]IService) self.localserviceMap = make(map[string]IService)
return _self return self
} }
return _self
return self
} }
func GetLogger() ILogger { func GetLogger() ILogger {