diff --git a/cluster/cluster.go b/cluster/cluster.go index 6791bb5..ee35ee2 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -9,9 +9,7 @@ import ( "sync" ) -var configdir = "./config/" - - +var configDir = "./config/" type NodeInfo struct { NodeId int @@ -21,83 +19,77 @@ type NodeInfo struct { } type NodeRpcInfo struct { - nodeinfo NodeInfo + nodeInfo NodeInfo client *rpc.Client } - var cluster Cluster - type Cluster struct { - localNodeInfo NodeInfo //× - localServiceCfg map[string]interface{} //map[servicename]配置数据* - - mapRpc map[int] NodeRpcInfo//nodeid - rpcServer rpc.Server - serviceDiscovery IServiceDiscovery //服务发现接口 - - mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo - mapServiceNode map[string][]int //map[serviceName]NodeInfo - + localNodeInfo NodeInfo + localServiceCfg map[string]interface{} //map[serviceName]配置数据* + mapRpc map[int] NodeRpcInfo //nodeId + serviceDiscovery IServiceDiscovery //服务发现接口 + mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo + mapServiceNode map[string][]int //map[serviceName]NodeInfo locker sync.RWMutex + rpcServer rpc.Server } -func SetConfigDir(cfgdir string){ - configdir = cfgdir +func SetConfigDir(cfgDir string){ + configDir = cfgDir } func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) { cluster.serviceDiscovery = serviceDiscovery } -func (slf *Cluster) serviceDiscoveryDelNode (nodeId int){ - slf.locker.Lock() - defer slf.locker.Unlock() +func (cls *Cluster) serviceDiscoveryDelNode (nodeId int){ + cls.locker.Lock() + defer cls.locker.Unlock() - slf.delNode(nodeId) + cls.delNode(nodeId) } -func (slf *Cluster) delNode(nodeId int){ +func (cls *Cluster) delNode(nodeId int){ //删除rpc连接关系 - rpc,ok := slf.mapRpc[nodeId] + rpc,ok := cls.mapRpc[nodeId] if ok == true { - delete(slf.mapRpc,nodeId) + delete(cls.mapRpc,nodeId) rpc.client.Close(false) } - nodeInfo,ok := slf.mapIdNode[nodeId] + nodeInfo,ok := cls.mapIdNode[nodeId] if ok == false { return } for _,serviceName := range nodeInfo.ServiceList{ - slf.delServiceNode(serviceName,nodeId) + cls.delServiceNode(serviceName,nodeId) } - delete(slf.mapIdNode,nodeId) + delete(cls.mapIdNode,nodeId) } -func (slf *Cluster) delServiceNode(serviceName string,nodeId int){ - nodeList := slf.mapServiceNode[serviceName] +func (cls *Cluster) delServiceNode(serviceName string,nodeId int){ + nodeList := cls.mapServiceNode[serviceName] for idx,nId := range nodeList { if nId == nodeId { - slf.mapServiceNode[serviceName] = append(nodeList[idx:],nodeList[idx+1:]...) + cls.mapServiceNode[serviceName] = append(nodeList[idx:],nodeList[idx+1:]...) return } } } - -func (slf *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ - if nodeInfo.NodeId == slf.localNodeInfo.NodeId { +func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ + if nodeInfo.NodeId == cls.localNodeInfo.NodeId { return } - slf.locker.Lock() - defer slf.locker.Unlock() + cls.locker.Lock() + defer cls.locker.Unlock() //先清理删除 - slf.delNode(nodeInfo.NodeId) + cls.delNode(nodeInfo.NodeId) //再重新组装 mapDuplicate := map[string]interface{}{} //预防重复数据 @@ -108,58 +100,58 @@ func (slf *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ continue } - slf.mapServiceNode[serviceName] = append(slf.mapServiceNode[serviceName],nodeInfo.NodeId) + cls.mapServiceNode[serviceName] = append(cls.mapServiceNode[serviceName],nodeInfo.NodeId) } - slf.mapIdNode[nodeInfo.NodeId] = *nodeInfo + cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo rpcInfo := NodeRpcInfo{} - rpcInfo.nodeinfo = *nodeInfo + rpcInfo.nodeInfo = *nodeInfo rpcInfo.client = &rpc.Client{} rpcInfo.client.Connect(nodeInfo.ListenAddr) - slf.mapRpc[nodeInfo.NodeId] = rpcInfo + cls.mapRpc[nodeInfo.NodeId] = rpcInfo } -func (slf *Cluster) buildLocalRpc(){ +func (cls *Cluster) buildLocalRpc(){ rpcInfo := NodeRpcInfo{} - rpcInfo.nodeinfo = slf.localNodeInfo + rpcInfo.nodeInfo = cls.localNodeInfo rpcInfo.client = &rpc.Client{} rpcInfo.client.Connect("") - slf.mapRpc[slf.localNodeInfo.NodeId] = rpcInfo + cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo } -func (slf *Cluster) Init(localNodeId int) error{ - slf.locker.Lock() - +func (cls *Cluster) Init(localNodeId int) error{ + cls.locker.Lock() //1.处理服务发现接口 - if slf.serviceDiscovery == nil { - slf.serviceDiscovery = &ConfigDiscovery{} + if cls.serviceDiscovery == nil { + cls.serviceDiscovery = &ConfigDiscovery{} } //2.初始化配置 - err := slf.InitCfg(localNodeId) + err := cls.InitCfg(localNodeId) if err != nil { - slf.locker.Unlock() + cls.locker.Unlock() return err } - slf.rpcServer.Init(slf) - slf.buildLocalRpc() + cls.rpcServer.Init(cls) + cls.buildLocalRpc() - slf.serviceDiscovery.RegFunDelNode(slf.serviceDiscoveryDelNode) - slf.serviceDiscovery.RegFunSetNode(slf.serviceDiscoverySetNodeInfo) - slf.locker.Unlock() + cls.serviceDiscovery.RegFunDelNode(cls.serviceDiscoveryDelNode) + cls.serviceDiscovery.RegFunSetNode(cls.serviceDiscoverySetNodeInfo) + cls.locker.Unlock() - err = slf.serviceDiscovery.Init(localNodeId) + err = cls.serviceDiscovery.Init(localNodeId) if err != nil { return err } + return nil } -func (slf *Cluster) FindRpcHandler(servicename string) rpc.IRpcHandler { - pService := service.GetService(servicename) +func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler { + pService := service.GetService(serviceName) if pService == nil { return nil } @@ -167,22 +159,22 @@ func (slf *Cluster) FindRpcHandler(servicename string) rpc.IRpcHandler { return pService.GetRpcHandler() } -func (slf *Cluster) Start() { - slf.rpcServer.Start(slf.localNodeInfo.ListenAddr) +func (cls *Cluster) Start() { + cls.rpcServer.Start(cls.localNodeInfo.ListenAddr) } -func (slf *Cluster) Stop() { - slf.serviceDiscovery.OnNodeStop() +func (cls *Cluster) Stop() { + cls.serviceDiscovery.OnNodeStop() } func GetCluster() *Cluster{ return &cluster } -func (slf *Cluster) GetRpcClient(nodeid int) *rpc.Client { - slf.locker.RLock() - defer slf.locker.RUnlock() - c,ok := slf.mapRpc[nodeid] +func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client { + cls.locker.RLock() + defer cls.locker.RUnlock() + c,ok := cls.mapRpc[nodeId] if ok == false { return nil } @@ -205,7 +197,7 @@ func GetRpcClient(nodeId int,serviceMethod string,clientList *[]*rpc.Client) err return fmt.Errorf("servicemethod param %s is error!",serviceMethod) } - //1.找到对应的rpcnodeid + //1.找到对应的rpcNodeid GetCluster().GetNodeIdByService(serviceAndMethod[0],clientList) return nil } @@ -214,7 +206,7 @@ func GetRpcServer() *rpc.Server{ return &cluster.rpcServer } -func (slf *Cluster) IsNodeConnected (nodeId int) bool { - pClient := slf.GetRpcClient(nodeId) +func (cls *Cluster) IsNodeConnected (nodeId int) bool { + pClient := cls.GetRpcClient(nodeId) return pClient!=nil && pClient.IsConnected() } diff --git a/cluster/configdiscovery.go b/cluster/configdiscovery.go index 26609e4..a3b1685 100644 --- a/cluster/configdiscovery.go +++ b/cluster/configdiscovery.go @@ -7,8 +7,8 @@ type ConfigDiscovery struct { localNodeId int } -func (slf *ConfigDiscovery) Init(localNodeId int) error{ - slf.localNodeId = localNodeId +func (discovery *ConfigDiscovery) Init(localNodeId int) error{ + discovery.localNodeId = localNodeId //解析本地其他服务配置 nodeInfoList,err := GetCluster().readLocalClusterConfig(0) @@ -20,21 +20,19 @@ func (slf *ConfigDiscovery) Init(localNodeId int) error{ if nodeInfo.NodeId == localNodeId { continue } - slf.funSetService(&nodeInfo) + discovery.funSetService(&nodeInfo) } return nil } - -func (slf *ConfigDiscovery) OnNodeStop(){ +func (discovery *ConfigDiscovery) OnNodeStop(){ } - -func (slf *ConfigDiscovery) RegFunDelNode(funDelNode FunDelNode){ - slf.funDelService = funDelNode +func (discovery *ConfigDiscovery) RegFunDelNode(funDelNode FunDelNode){ + discovery.funDelService = funDelNode } -func (slf *ConfigDiscovery) RegFunSetNode(funSetNodeInfo FunSetNodeInfo){ - slf.funSetService = funSetNodeInfo +func (discovery *ConfigDiscovery) RegFunSetNode(funSetNodeInfo FunSetNodeInfo){ + discovery.funSetService = funSetNodeInfo } diff --git a/cluster/parsecfg.go b/cluster/parsecfg.go index a780328..346ea6a 100644 --- a/cluster/parsecfg.go +++ b/cluster/parsecfg.go @@ -14,8 +14,7 @@ type NodeInfoList struct { NodeList []NodeInfo } - -func (slf *Cluster) ReadClusterConfig(filepath string) (*NodeInfoList,error) { +func (cls *Cluster) ReadClusterConfig(filepath string) (*NodeInfoList,error) { c := &NodeInfoList{} d, err := ioutil.ReadFile(filepath) if err != nil { @@ -29,9 +28,7 @@ func (slf *Cluster) ReadClusterConfig(filepath string) (*NodeInfoList,error) { return c,nil } - -func (slf *Cluster) readServiceConfig(filepath string) (map[string]interface{},map[int]map[string]interface{},error) { - +func (cls *Cluster) readServiceConfig(filepath string) (map[string]interface{},map[int]map[string]interface{},error) { c := map[string]interface{}{} //读取配置 d, err := ioutil.ReadFile(filepath) @@ -65,10 +62,9 @@ func (slf *Cluster) readServiceConfig(filepath string) (map[string]interface{}, return serviceConfig,mapNodeService,nil } - -func (slf *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) { +func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) { var nodeInfoList [] NodeInfo - clusterCfgPath :=strings.TrimRight(configdir,"/") +"/cluster" + clusterCfgPath :=strings.TrimRight(configDir,"/") +"/cluster" fileInfoList,err := ioutil.ReadDir(clusterCfgPath) if err != nil { return nil,fmt.Errorf("Read dir %s is fail :%+v",clusterCfgPath,err) @@ -78,7 +74,7 @@ func (slf *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) { for _,f := range fileInfoList{ if f.IsDir() == false { filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath,"/"),"\\")+"/"+f.Name() - localNodeInfoList,err := slf.ReadClusterConfig(filePath) + localNodeInfoList,err := cls.ReadClusterConfig(filePath) if err != nil { return nil,fmt.Errorf("read file path %s is error:%+v" ,filePath,err) } @@ -86,7 +82,6 @@ func (slf *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) { for _,nodeInfo := range localNodeInfoList.NodeList { if nodeInfo.NodeId == nodeId || nodeId == 0 { nodeInfoList = append(nodeInfoList,nodeInfo) - //slf.localNodeInfo = nodeInfo } } } @@ -99,8 +94,8 @@ func (slf *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) { return nodeInfoList,nil } -func (slf *Cluster) readLocalService(localNodeId int) error { - clusterCfgPath :=strings.TrimRight(configdir,"/") +"/cluster" +func (cls *Cluster) readLocalService(localNodeId int) error { + clusterCfgPath :=strings.TrimRight(configDir,"/") +"/cluster" fileInfoList,err := ioutil.ReadDir(clusterCfgPath) if err != nil { return fmt.Errorf("Read dir %s is fail :%+v",clusterCfgPath,err) @@ -110,17 +105,17 @@ func (slf *Cluster) readLocalService(localNodeId int) error { for _,f := range fileInfoList { if f.IsDir() == false { filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name() - serviceConfig,mapNodeService,err := slf.readServiceConfig(filePath) + serviceConfig,mapNodeService,err := cls.readServiceConfig(filePath) if err != nil { continue } - for _,s := range slf.localNodeInfo.ServiceList{ + for _,s := range cls.localNodeInfo.ServiceList{ for{ //取公共服务配置 pubCfg,ok := serviceConfig[s] if ok == true { - slf.localServiceCfg[s] = pubCfg + cls.localServiceCfg[s] = pubCfg } //如果结点也配置了该服务,则覆盖之 @@ -133,7 +128,7 @@ func (slf *Cluster) readLocalService(localNodeId int) error { break } - slf.localServiceCfg[s] = sCfg + cls.localServiceCfg[s] = sCfg break } } @@ -143,49 +138,48 @@ func (slf *Cluster) readLocalService(localNodeId int) error { return nil } -func (slf *Cluster) parseLocalCfg(){ - slf.mapIdNode[slf.localNodeInfo.NodeId] = slf.localNodeInfo +func (cls *Cluster) parseLocalCfg(){ + cls.mapIdNode[cls.localNodeInfo.NodeId] = cls.localNodeInfo - for _,sName := range slf.localNodeInfo.ServiceList{ - slf.mapServiceNode[sName] = append(slf.mapServiceNode[sName],slf.localNodeInfo.NodeId) + for _,sName := range cls.localNodeInfo.ServiceList{ + cls.mapServiceNode[sName] = append(cls.mapServiceNode[sName], cls.localNodeInfo.NodeId) } } -func (slf *Cluster) InitCfg(localNodeId int) error{ - slf.localServiceCfg = map[string]interface{}{} - slf.mapRpc = map[int] NodeRpcInfo{} - slf.mapIdNode = map[int]NodeInfo{} - slf.mapServiceNode = map[string][]int{} +func (cls *Cluster) InitCfg(localNodeId int) error{ + cls.localServiceCfg = map[string]interface{}{} + cls.mapRpc = map[int] NodeRpcInfo{} + cls.mapIdNode = map[int]NodeInfo{} + cls.mapServiceNode = map[string][]int{} //加载本地结点的NodeList配置 - nodeInfoList,err := slf.readLocalClusterConfig(localNodeId) + nodeInfoList,err := cls.readLocalClusterConfig(localNodeId) if err != nil { return err } - slf.localNodeInfo = nodeInfoList[0] + cls.localNodeInfo = nodeInfoList[0] //读取本地服务配置 - err = slf.readLocalService(localNodeId) + err = cls.readLocalService(localNodeId) if err != nil { return err } //本地配置服务加到全局map信息中 - slf.parseLocalCfg() + cls.parseLocalCfg() return nil } - -func (slf *Cluster) IsConfigService(serviceName string) bool { - slf.locker.RLock() - defer slf.locker.RUnlock() - nodeList,ok := slf.mapServiceNode[serviceName] +func (cls *Cluster) IsConfigService(serviceName string) bool { + cls.locker.RLock() + defer cls.locker.RUnlock() + nodeList,ok := cls.mapServiceNode[serviceName] if ok == false { return false } for _,nodeId := range nodeList{ - if slf.localNodeInfo.NodeId == nodeId { + if cls.localNodeInfo.NodeId == nodeId { return true } } @@ -193,10 +187,10 @@ func (slf *Cluster) IsConfigService(serviceName string) bool { return false } -func (slf *Cluster) GetNodeIdByService(servicename string,rpcClientList *[]*rpc.Client) { - slf.locker.RLock() - defer slf.locker.RUnlock() - nodeIdList,ok := slf.mapServiceNode[servicename] +func (cls *Cluster) GetNodeIdByService(serviceName string,rpcClientList *[]*rpc.Client) { + cls.locker.RLock() + defer cls.locker.RUnlock() + nodeIdList,ok := cls.mapServiceNode[serviceName] if ok == true { for _,nodeId := range nodeIdList { pClient := GetCluster().GetRpcClient(nodeId) @@ -209,9 +203,8 @@ func (slf *Cluster) GetNodeIdByService(servicename string,rpcClientList *[]*rpc. } } - -func (slf *Cluster) getServiceCfg(servicename string) interface{}{ - v,ok := slf.localServiceCfg[servicename] +func (cls *Cluster) getServiceCfg(serviceName string) interface{}{ + v,ok := cls.localServiceCfg[serviceName] if ok == false { return nil } @@ -219,8 +212,8 @@ func (slf *Cluster) getServiceCfg(servicename string) interface{}{ return v } -func (slf *Cluster) GetServiceCfg(serviceName string) interface{}{ - serviceCfg,ok := slf.localServiceCfg[serviceName] +func (cls *Cluster) GetServiceCfg(serviceName string) interface{}{ + serviceCfg,ok := cls.localServiceCfg[serviceName] if ok == false { return nil } diff --git a/cluster/servicediscovery.go b/cluster/servicediscovery.go index 70b66ac..77adf88 100644 --- a/cluster/servicediscovery.go +++ b/cluster/servicediscovery.go @@ -8,7 +8,6 @@ type FunSetNodeInfo func(nodeInfo *NodeInfo) type IServiceDiscovery interface { Init(localNodeId int) error OnNodeStop() - RegFunDelNode(funDelNode FunDelNode) RegFunSetNode(funSetNodeInfo FunSetNodeInfo) } diff --git a/console/command.go b/console/command.go index 0e4803e..b256474 100644 --- a/console/command.go +++ b/console/command.go @@ -9,6 +9,7 @@ import ( type valueType int type CommandFunctionCB func(args interface{}) error var commandList []*command +var programName string const( boolType valueType = iota stringType valueType = iota @@ -29,15 +30,12 @@ func (cmd *command) execute() error{ }else if cmd.valType == stringType { return cmd.fn(cmd.strValue) }else{ - return fmt.Errorf("unknow command type!") + return fmt.Errorf("Unknow command type.") } return nil } - -var programName string - func Run(args []string) error { flag.Parse() programName = args[0] @@ -57,7 +55,11 @@ func Run(args []string) error { } } - return startCmd.execute() + if startCmd != nil { + return startCmd.execute() + } + + return fmt.Errorf("Command input parameter error,try `%s -help` for help",args[0]) } func RegisterCommandBool(cmdName string, defaultValue bool, usage string,fn CommandFunctionCB){ diff --git a/event/event.go b/event/event.go index c3133bc..00b9fcd 100644 --- a/event/event.go +++ b/event/event.go @@ -7,7 +7,7 @@ import ( "sync" ) -const Default_EventChannelLen = 10000 +const DefaultEventChannelLen = 10000 //事件接受器 type EventCallBack func(event *Event) @@ -22,7 +22,7 @@ type IEventHandler interface { Init(processor IEventProcessor) GetEventProcessor() IEventProcessor //获得事件 NotifyEvent(*Event) - Desctory() + Destroy() //注册了事件 addRegInfo(eventType EventType,eventProcessor IEventProcessor) removeRegInfo(eventType EventType,eventProcessor IEventProcessor) @@ -77,135 +77,134 @@ func NewEventProcessor() IEventProcessor{ return &ep } -func (slf *EventHandler) addRegInfo(eventType EventType,eventProcessor IEventProcessor){ - slf.locker.Lock() - defer slf.locker.Unlock() - if slf.mapRegEvent == nil { - slf.mapRegEvent = map[EventType]map[IEventProcessor]interface{}{} +func (handler *EventHandler) addRegInfo(eventType EventType,eventProcessor IEventProcessor){ + handler.locker.Lock() + defer handler.locker.Unlock() + if handler.mapRegEvent == nil { + handler.mapRegEvent = map[EventType]map[IEventProcessor]interface{}{} } - if _,ok := slf.mapRegEvent[eventType] ;ok == false{ - slf.mapRegEvent[eventType] = map[IEventProcessor]interface{}{} + if _,ok := handler.mapRegEvent[eventType] ;ok == false{ + handler.mapRegEvent[eventType] = map[IEventProcessor]interface{}{} } - slf.mapRegEvent[eventType][eventProcessor] = nil + handler.mapRegEvent[eventType][eventProcessor] = nil } -func (slf *EventHandler) removeRegInfo(eventType EventType,eventProcessor IEventProcessor){ - if _,ok :=slf.mapRegEvent[eventType];ok == true { - delete(slf.mapRegEvent[eventType],eventProcessor) +func (handler *EventHandler) removeRegInfo(eventType EventType,eventProcessor IEventProcessor){ + if _,ok := handler.mapRegEvent[eventType];ok == true { + delete(handler.mapRegEvent[eventType],eventProcessor) } } -func (slf *EventHandler) GetEventProcessor() IEventProcessor{ - return slf.eventProcessor +func (handler *EventHandler) GetEventProcessor() IEventProcessor{ + return handler.eventProcessor } -func (slf *EventHandler) NotifyEvent(ev *Event){ - slf.GetEventProcessor().castEvent(ev) +func (handler *EventHandler) NotifyEvent(ev *Event){ + handler.GetEventProcessor().castEvent(ev) } -func (slf *EventHandler) Init(processor IEventProcessor){ - slf.eventProcessor = processor - slf.mapRegEvent =map[EventType]map[IEventProcessor]interface{}{} +func (handler *EventHandler) Init(processor IEventProcessor){ + handler.eventProcessor = processor + handler.mapRegEvent =map[EventType]map[IEventProcessor]interface{}{} } -func (slf *EventProcessor) SetEventChannel(channelNum int) bool{ - slf.locker.Lock() - defer slf.locker.Unlock() - if slf.eventChannel!=nil { +func (processor *EventProcessor) SetEventChannel(channelNum int) bool{ + processor.locker.Lock() + defer processor.locker.Unlock() + if processor.eventChannel!=nil { return false } if channelNum == 0 { - channelNum = Default_EventChannelLen + channelNum = DefaultEventChannelLen } - slf.eventChannel = make(chan *Event,channelNum) + processor.eventChannel = make(chan *Event,channelNum) return true } -func (slf *EventProcessor) addBindEvent(eventType EventType,reciver IEventHandler,callback EventCallBack){ - slf.locker.Lock() - defer slf.locker.Unlock() +func (processor *EventProcessor) addBindEvent(eventType EventType,reciver IEventHandler,callback EventCallBack){ + processor.locker.Lock() + defer processor.locker.Unlock() - if _,ok := slf.mapBindHandlerEvent[eventType]; ok == false { - slf.mapBindHandlerEvent[eventType] = map[IEventHandler]EventCallBack{} + if _,ok := processor.mapBindHandlerEvent[eventType]; ok == false { + processor.mapBindHandlerEvent[eventType] = map[IEventHandler]EventCallBack{} } - slf.mapBindHandlerEvent[eventType][reciver] = callback + processor.mapBindHandlerEvent[eventType][reciver] = callback } -func (slf *EventProcessor) addListen(eventType EventType,reciver IEventHandler){ - slf.locker.Lock() - defer slf.locker.Unlock() +func (processor *EventProcessor) addListen(eventType EventType,reciver IEventHandler){ + processor.locker.Lock() + defer processor.locker.Unlock() - if _,ok :=slf.mapListenerEvent[eventType];ok == false{ - slf.mapListenerEvent[eventType] = map[IEventProcessor]int{} + if _,ok := processor.mapListenerEvent[eventType];ok == false{ + processor.mapListenerEvent[eventType] = map[IEventProcessor]int{} } - slf.mapListenerEvent[eventType][reciver.GetEventProcessor()] += 1 + processor.mapListenerEvent[eventType][reciver.GetEventProcessor()] += 1 } -func (slf *EventProcessor) removeBindEvent(eventType EventType,reciver IEventHandler){ - slf.locker.Lock() - defer slf.locker.Unlock() - if _,ok := slf.mapBindHandlerEvent[eventType];ok == true{ - delete(slf.mapBindHandlerEvent[eventType],reciver) +func (processor *EventProcessor) removeBindEvent(eventType EventType,reciver IEventHandler){ + processor.locker.Lock() + defer processor.locker.Unlock() + if _,ok := processor.mapBindHandlerEvent[eventType];ok == true{ + delete(processor.mapBindHandlerEvent[eventType],reciver) } } -func (slf *EventProcessor) removeListen(eventType EventType,reciver IEventHandler){ - slf.locker.Lock() - defer slf.locker.Unlock() - if _,ok := slf.mapListenerEvent[eventType];ok == true{ - slf.mapListenerEvent[eventType][reciver.GetEventProcessor()]-=1 - if slf.mapListenerEvent[eventType][reciver.GetEventProcessor()] <= 0 { - delete(slf.mapListenerEvent[eventType],reciver.GetEventProcessor()) +func (processor *EventProcessor) removeListen(eventType EventType,reciver IEventHandler){ + processor.locker.Lock() + defer processor.locker.Unlock() + if _,ok := processor.mapListenerEvent[eventType];ok == true{ + processor.mapListenerEvent[eventType][reciver.GetEventProcessor()]-=1 + if processor.mapListenerEvent[eventType][reciver.GetEventProcessor()] <= 0 { + delete(processor.mapListenerEvent[eventType],reciver.GetEventProcessor()) } } } -func (slf *EventProcessor) RegEventReciverFunc(eventType EventType,reciver IEventHandler,callback EventCallBack){ +func (processor *EventProcessor) RegEventReciverFunc(eventType EventType,reciver IEventHandler,callback EventCallBack){ //记录reciver自己注册过的事件 - reciver.addRegInfo(eventType,slf) + reciver.addRegInfo(eventType, processor) //记录当前所属IEventProcessor注册的回调 reciver.GetEventProcessor().addBindEvent(eventType,reciver,callback) //将注册加入到监听中 - slf.addListen(eventType,reciver) + processor.addListen(eventType,reciver) } -func (slf *EventProcessor) UnRegEventReciverFun(eventType EventType,reciver IEventHandler) { - slf.removeListen(eventType,reciver) +func (processor *EventProcessor) UnRegEventReciverFun(eventType EventType,reciver IEventHandler) { + processor.removeListen(eventType,reciver) reciver.GetEventProcessor().removeBindEvent(eventType,reciver) - reciver.removeRegInfo(eventType,slf) + reciver.removeRegInfo(eventType, processor) } -func (slf *EventHandler) Desctory(){ - for eventTyp,mapEventProcess := range slf.mapRegEvent { +func (handler *EventHandler) Destroy(){ + for eventTyp,mapEventProcess := range handler.mapRegEvent { if mapEventProcess == nil { continue } - //map[IEventProcessor]interface{} for eventProcess,_ := range mapEventProcess { - eventProcess.UnRegEventReciverFun(eventTyp,slf) + eventProcess.UnRegEventReciverFun(eventTyp, handler) } } } -func (slf *EventProcessor) GetEventChan() chan *Event{ - slf.locker.Lock() - defer slf.locker.Unlock() +func (processor *EventProcessor) GetEventChan() chan *Event{ + processor.locker.Lock() + defer processor.locker.Unlock() - if slf.eventChannel == nil { - slf.eventChannel =make(chan *Event,Default_EventChannelLen) + if processor.eventChannel == nil { + processor.eventChannel =make(chan *Event,DefaultEventChannelLen) } - return slf.eventChannel + return processor.eventChannel } -func (slf *EventProcessor) EventHandler(ev *Event) { +func (processor *EventProcessor) EventHandler(ev *Event) { defer func() { if r := recover(); r != nil { buf := make([]byte, 4096) @@ -215,7 +214,7 @@ func (slf *EventProcessor) EventHandler(ev *Event) { } }() - mapCallBack,ok := slf.mapBindHandlerEvent[ev.Type] + mapCallBack,ok := processor.mapBindHandlerEvent[ev.Type] if ok == false { return } @@ -227,28 +226,28 @@ func (slf *EventProcessor) EventHandler(ev *Event) { -func (slf *EventProcessor) pushEvent(event *Event){ - if len(slf.eventChannel)>=cap(slf.eventChannel){ +func (processor *EventProcessor) pushEvent(event *Event){ + if len(processor.eventChannel)>=cap(processor.eventChannel){ log.Error("event process channel is full.") return } - slf.eventChannel<-event + processor.eventChannel<-event } -func (slf *EventProcessor) castEvent(event *Event){ - if slf.mapListenerEvent == nil{ +func (processor *EventProcessor) castEvent(event *Event){ + if processor.mapListenerEvent == nil{ log.Error("mapListenerEvent not init!") return } - processor,ok :=slf.mapListenerEvent[event.Type] + eventProcessor,ok := processor.mapListenerEvent[event.Type] if ok == false || processor == nil{ log.Debug("event type %d not listen.",event.Type) return } - for proc,_ := range processor { + for proc,_ := range eventProcessor { proc.pushEvent(event) } } diff --git a/service/module.go b/service/module.go index b360729..a0838b7 100644 --- a/service/module.go +++ b/service/module.go @@ -124,7 +124,7 @@ func (slf *Module) ReleaseModule(moduleId int64){ slf.ReleaseModule(id) } - pModule.GetEventHandler().Desctory() + pModule.GetEventHandler().Destroy() pModule.self.OnRelease() log.Debug("Release module %s.",slf.GetModuleName()) for pTimer,_ := range pModule.mapActiveTimer {