From a6ea25bba0d9870b7f96483e6d56def646e6ca66 Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Tue, 20 Jul 2021 14:35:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=88=E5=B9=B6Service=E4=B8=AD=E7=9A=84Even?= =?UTF-8?q?t=E3=80=81RpcResponse=E3=80=81RpcRequest=E7=AE=A1=E9=81=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 2 +- event/event.go | 124 +++++++++++++++------------------ event/eventtype.go | 14 ++-- rpc/client.go | 10 +-- rpc/rpchandler.go | 44 ++++-------- rpc/server.go | 9 ++- service/module.go | 11 ++- service/service.go | 170 ++++++++++++++++++++++++++++++++------------- 8 files changed, 215 insertions(+), 169 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 4c98f58..c5d28bc 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -370,7 +370,7 @@ func (cls *Cluster) triggerRpcEvent (bConnect bool,clientSeq uint32,nodeId int) continue } - var eventData service.RpcEventData + var eventData service.RpcConnEvent eventData.IsConnect = bConnect eventData.NodeId = nodeId ser.(service.IModule).NotifyEvent(&eventData) diff --git a/event/event.go b/event/event.go index de9d564..6a4608f 100644 --- a/event/event.go +++ b/event/event.go @@ -6,7 +6,6 @@ import ( "sync" ) -const DefaultEventChannelLen = 1000000 //事件接受器 type EventCallBack func(event IEvent) @@ -18,6 +17,24 @@ type IEvent interface { type Event struct { Type EventType Data interface{} + ref bool +} + +var emptyEvent Event +func (e *Event) Reset() { + *e = emptyEvent +} + +func (e *Event) IsRef() bool { + return e.ref +} + +func (e *Event) Ref() { + e.ref = true +} + +func (e *Event) UnRef() { + e.ref = false } func (e *Event) GetEventType() EventType{ @@ -34,20 +51,23 @@ type IEventHandler interface { removeRegInfo(eventType EventType,eventProcessor IEventProcessor) } +type IEventChannel interface { + PushEvent(ev IEvent) error +} + type IEventProcessor interface { + IEventChannel + + Init(eventChannel IEventChannel) EventHandler(ev IEvent) - //同一个IEventHandler,只能接受一个EventType类型回调 - RegEventReciverFunc(eventType EventType,reciver IEventHandler,callback EventCallBack) - UnRegEventReciverFun(eventType EventType,reciver IEventHandler) - SetEventChannel(channelNum int) bool + RegEventReceiverFunc(eventType EventType, receiver IEventHandler,callback EventCallBack) + UnRegEventReceiverFun(eventType EventType, receiver IEventHandler) castEvent(event IEvent) //广播事件 - pushEvent(event IEvent) - addBindEvent(eventType EventType,reciver IEventHandler,callback EventCallBack) - addListen(eventType EventType,reciver IEventHandler) - removeBindEvent(eventType EventType,reciver IEventHandler) - removeListen(eventType EventType,reciver IEventHandler) - GetEventChan() chan IEvent + addBindEvent(eventType EventType, receiver IEventHandler,callback EventCallBack) + addListen(eventType EventType, receiver IEventHandler) + removeBindEvent(eventType EventType, receiver IEventHandler) + removeListen(eventType EventType, receiver IEventHandler) } type EventHandler struct { @@ -59,9 +79,8 @@ type EventHandler struct { mapRegEvent map[EventType]map[IEventProcessor]interface{} //向其他事件处理器监听的事件类型 } - type EventProcessor struct { - eventChannel chan IEvent + IEventChannel locker sync.RWMutex mapListenerEvent map[EventType]map[IEventProcessor]int //监听者信息 @@ -115,22 +134,12 @@ func (handler *EventHandler) Init(processor IEventProcessor){ handler.mapRegEvent =map[EventType]map[IEventProcessor]interface{}{} } -func (processor *EventProcessor) SetEventChannel(channelNum int) bool{ - processor.locker.Lock() - defer processor.locker.Unlock() - if processor.eventChannel!=nil { - return false - } - if channelNum == 0 { - channelNum = DefaultEventChannelLen - } - - processor.eventChannel = make(chan IEvent,channelNum) - return true +func (processor *EventProcessor) Init(eventChannel IEventChannel){ + processor.IEventChannel = eventChannel } -func (processor *EventProcessor) addBindEvent(eventType EventType,reciver IEventHandler,callback EventCallBack){ +func (processor *EventProcessor) addBindEvent(eventType EventType, receiver IEventHandler,callback EventCallBack){ processor.locker.Lock() defer processor.locker.Unlock() @@ -138,10 +147,10 @@ func (processor *EventProcessor) addBindEvent(eventType EventType,reciver IEvent processor.mapBindHandlerEvent[eventType] = map[IEventHandler]EventCallBack{} } - processor.mapBindHandlerEvent[eventType][reciver] = callback + processor.mapBindHandlerEvent[eventType][receiver] = callback } -func (processor *EventProcessor) addListen(eventType EventType,reciver IEventHandler){ +func (processor *EventProcessor) addListen(eventType EventType, receiver IEventHandler){ processor.locker.Lock() defer processor.locker.Unlock() @@ -149,41 +158,41 @@ func (processor *EventProcessor) addListen(eventType EventType,reciver IEventHan processor.mapListenerEvent[eventType] = map[IEventProcessor]int{} } - processor.mapListenerEvent[eventType][reciver.GetEventProcessor()] += 1 + processor.mapListenerEvent[eventType][receiver.GetEventProcessor()] += 1 } -func (processor *EventProcessor) removeBindEvent(eventType EventType,reciver IEventHandler){ +func (processor *EventProcessor) removeBindEvent(eventType EventType, receiver IEventHandler){ processor.locker.Lock() defer processor.locker.Unlock() if _,ok := processor.mapBindHandlerEvent[eventType];ok == true{ - delete(processor.mapBindHandlerEvent[eventType],reciver) + delete(processor.mapBindHandlerEvent[eventType], receiver) } } -func (processor *EventProcessor) removeListen(eventType EventType,reciver IEventHandler){ +func (processor *EventProcessor) removeListen(eventType EventType, receiver IEventHandler){ processor.locker.Lock() defer processor.locker.Unlock() if _,ok := processor.mapListenerEvent[eventType];ok == true{ - processor.mapListenerEvent[eventType][reciver.GetEventProcessor()]-=1 - if processor.mapListenerEvent[eventType][reciver.GetEventProcessor()] <= 0 { - delete(processor.mapListenerEvent[eventType],reciver.GetEventProcessor()) + processor.mapListenerEvent[eventType][receiver.GetEventProcessor()]-=1 + if processor.mapListenerEvent[eventType][receiver.GetEventProcessor()] <= 0 { + delete(processor.mapListenerEvent[eventType], receiver.GetEventProcessor()) } } } -func (processor *EventProcessor) RegEventReciverFunc(eventType EventType,reciver IEventHandler,callback EventCallBack){ - //记录reciver自己注册过的事件 - reciver.addRegInfo(eventType, processor) +func (processor *EventProcessor) RegEventReceiverFunc(eventType EventType, receiver IEventHandler,callback EventCallBack){ + //记录receiver自己注册过的事件 + receiver.addRegInfo(eventType, processor) //记录当前所属IEventProcessor注册的回调 - reciver.GetEventProcessor().addBindEvent(eventType,reciver,callback) + receiver.GetEventProcessor().addBindEvent(eventType, receiver,callback) //将注册加入到监听中 - processor.addListen(eventType,reciver) + processor.addListen(eventType, receiver) } -func (processor *EventProcessor) UnRegEventReciverFun(eventType EventType,reciver IEventHandler) { - processor.removeListen(eventType,reciver) - reciver.GetEventProcessor().removeBindEvent(eventType,reciver) - reciver.removeRegInfo(eventType, processor) +func (processor *EventProcessor) UnRegEventReceiverFun(eventType EventType, receiver IEventHandler) { + processor.removeListen(eventType, receiver) + receiver.GetEventProcessor().removeBindEvent(eventType, receiver) + receiver.removeRegInfo(eventType, processor) } func (handler *EventHandler) Destroy(){ @@ -194,23 +203,12 @@ func (handler *EventHandler) Destroy(){ continue } - for eventProcess,_ := range mapEventProcess { - eventProcess.UnRegEventReciverFun(eventTyp, handler) + for eventProcess := range mapEventProcess { + eventProcess.UnRegEventReceiverFun(eventTyp, handler) } } } -func (processor *EventProcessor) GetEventChan() chan IEvent{ - processor.locker.Lock() - defer processor.locker.Unlock() - - if processor.eventChannel == nil { - processor.eventChannel =make(chan IEvent,DefaultEventChannelLen) - } - - return processor.eventChannel -} - func (processor *EventProcessor) EventHandler(ev IEvent) { defer func() { if r := recover(); r != nil { @@ -229,14 +227,6 @@ func (processor *EventProcessor) EventHandler(ev IEvent) { } } -func (processor *EventProcessor) pushEvent(event IEvent){ - if len(processor.eventChannel)>=cap(processor.eventChannel){ - log.SError("event process channel is full,data:",event.GetEventType()) - return - } - - processor.eventChannel<-event -} func (processor *EventProcessor) castEvent(event IEvent){ if processor.mapListenerEvent == nil { @@ -250,7 +240,7 @@ func (processor *EventProcessor) castEvent(event IEvent){ return } - for proc,_ := range eventProcessor { - proc.pushEvent(event) + for proc := range eventProcessor { + proc.PushEvent(event) } } diff --git a/event/eventtype.go b/event/eventtype.go index 1680db5..026d34e 100644 --- a/event/eventtype.go +++ b/event/eventtype.go @@ -4,10 +4,14 @@ type EventType int //大于Sys_Event_User_Define给用户定义 const ( - Sys_Event_Tcp EventType = 1 - Sys_Event_Http_Event EventType = 2 - Sys_Event_WebSocket EventType = 3 - Sys_Event_Rpc_Event EventType = 4 - Sys_Event_User_Define EventType = 1000 + ServiceRpcRequestEvent EventType = -1 + ServiceRpcResponseEvent EventType = -2 + + Sys_Event_Tcp EventType = -3 + Sys_Event_Http_Event EventType = -4 + Sys_Event_WebSocket EventType = -5 + Sys_Event_Rpc_Event EventType = -6 + + Sys_Event_User_Define EventType = 1 ) diff --git a/rpc/client.go b/rpc/client.go index e54f05f..967b978 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -67,11 +67,11 @@ func (client *Client) Connect(id int,addr string) error { } func (client *Client) startCheckRpcCallTimer(){ - t:=timer.NewTimer(3*time.Second) + t:=timer.NewTimer(5*time.Second) for{ select { - case timer:=<- t.C: - timer.SetupTimer(time.Now()) + case cTimer:=<- t.C: + cTimer.SetupTimer(time.Now()) client.checkRpcCallTimeout() } } @@ -83,7 +83,7 @@ func (client *Client) startCheckRpcCallTimer(){ func (client *Client) makeCallFail(call *Call){ client.removePending(call.Seq) if call.callback!=nil && call.callback.IsValid() { - call.rpcHandler.(*RpcHandler).callResponseCallBack <-call + call.rpcHandler.PushRpcResponse(call) }else{ call.done <- call } @@ -310,7 +310,7 @@ func (client *Client) Run(){ } if v.callback!=nil && v.callback.IsValid() { - v.rpcHandler.(*RpcHandler).callResponseCallBack <-v + v.rpcHandler.PushRpcResponse(v) }else{ v.done <- v } diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index c817f2c..6fbf66b 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -46,15 +46,19 @@ type RawRpcCallBack interface { CB(data interface{}) } +type IRpcHandlerChannel interface { + PushRpcResponse(call *Call) error + PushRpcRequest(rpcRequest *RpcRequest) error +} + type RpcHandler struct { - callRequest chan *RpcRequest + IRpcHandlerChannel + rpcHandler IRpcHandler mapFunctions map[string]RpcMethodInfo mapRawFunctions map[uint32] RawRpcCallBack funcRpcClient FuncRpcClient funcRpcServer FuncRpcServer - - callResponseCallBack chan *Call //异步返回的回调 } type TriggerRpcEvent func(bConnect bool,clientSeq uint32,nodeId int) @@ -64,17 +68,13 @@ type IRpcListener interface { } type IRpcHandler interface { + IRpcHandlerChannel GetName() string - InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer) + InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer,rpcHandlerChannel IRpcHandlerChannel) GetRpcHandler() IRpcHandler - PushRequest(callInfo *RpcRequest) error HandlerRpcRequest(request *RpcRequest) HandlerRpcResponseCB(call *Call) - - GetRpcRequestChan() chan *RpcRequest - GetRpcResponseChan() chan *Call CallMethod(ServiceMethod string,param interface{},reply interface{}) error - AsyncCall(serviceMethod string,args interface{},callback interface{}) error Call(serviceMethod string,args interface{},reply interface{}) error Go(serviceMethod string,args interface{}) error @@ -100,9 +100,8 @@ func (handler *RpcHandler) GetRpcHandler() IRpcHandler{ return handler.rpcHandler } -func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer) { - handler.callRequest = make(chan *RpcRequest,1000000) - handler.callResponseCallBack = make(chan *Call,1000000) +func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer,rpcHandlerChannel IRpcHandlerChannel) { + handler.IRpcHandlerChannel = rpcHandlerChannel handler.mapRawFunctions = make(map[uint32] RawRpcCallBack) handler.rpcHandler = rpcHandler handler.mapFunctions = map[string]RpcMethodInfo{} @@ -114,8 +113,8 @@ func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler,getClientFun Fu // Is this an exported - upper case - name? func isExported(name string) bool { - rune, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(rune) + r, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(r) } // Is this type exported or a builtin? @@ -190,23 +189,6 @@ func (handler *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error { return nil } -func (handler *RpcHandler) PushRequest(req *RpcRequest) error{ - if len(handler.callRequest) >= cap(handler.callRequest){ - return fmt.Errorf("RpcHandler %s Rpc Channel is full", handler.GetName()) - } - - handler.callRequest <- req - return nil -} - -func (handler *RpcHandler) GetRpcRequestChan() chan *RpcRequest { - return handler.callRequest -} - -func (handler *RpcHandler) GetRpcResponseChan() chan *Call{ - return handler.callResponseCallBack -} - func (handler *RpcHandler) HandlerRpcResponseCB(call *Call){ defer func() { if r := recover(); r != nil { diff --git a/rpc/server.go b/rpc/server.go index 9317461..52e4bb9 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -185,7 +185,7 @@ func (agent *RpcAgent) Run() { continue } - err = rpcHandler.PushRequest(req) + err = rpcHandler.PushRpcRequest(req) if err != nil { rpcError := RpcError(err.Error()) @@ -268,7 +268,6 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien return pCall } } - //req.inputArgs = inputArgs if noReply == false { client.AddPending(pCall) @@ -303,7 +302,7 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien } } - err := rpcHandler.PushRequest(req) + err := rpcHandler.PushRpcRequest(req) if err != nil { ReleaseRpcRequest(req) pCall.Err = err @@ -352,12 +351,12 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler if Returns!=nil { pCall.Reply = Returns } - pCall.rpcHandler.(*RpcHandler).callResponseCallBack <-pCall + pCall.rpcHandler.PushRpcResponse(pCall) ReleaseRpcRequest(req) } } - err := rpcHandler.PushRequest(req) + err := rpcHandler.PushRpcRequest(req) if err != nil { ReleaseRpcRequest(req) return err diff --git a/service/module.go b/service/module.go index 79920a5..49416e9 100644 --- a/service/module.go +++ b/service/module.go @@ -35,8 +35,6 @@ type IModuleTimer interface { NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker } -//1.管理各模块树层关系 -//2.提供定时器常用工具 type Module struct { rpcHandle.IRpcHandler moduleId int64 //模块Id @@ -80,8 +78,9 @@ func (m *Module) OnInit() error{ func (m *Module) AddModule(module IModule) (int64,error){ //没有事件处理器不允许加入其他模块 if m.GetEventProcessor() == nil { - return 0,fmt.Errorf("module %+v is not Event Processor is nil", m.self) + return 0,fmt.Errorf("module %+v Event Processor is nil", m.self) } + pAddModule := module.getBaseModule().(*Module) if pAddModule.GetModuleId()==0 { pAddModule.moduleId = m.NewModuleId() @@ -92,7 +91,7 @@ func (m *Module) AddModule(module IModule) (int64,error){ } _,ok := m.child[module.GetModuleId()] if ok == true { - return 0,fmt.Errorf("Exists module id %d",module.GetModuleId()) + return 0,fmt.Errorf("exists module id %d",module.GetModuleId()) } pAddModule.IRpcHandler = m.IRpcHandler pAddModule.self = module @@ -118,14 +117,14 @@ func (m *Module) ReleaseModule(moduleId int64){ pModule := m.GetModule(moduleId).getBaseModule().(*Module) //释放子孙 - for id,_ := range pModule.child { + for id := range pModule.child { m.ReleaseModule(id) } pModule.GetEventHandler().Destroy() pModule.self.OnRelease() log.SDebug("Release module ", pModule.GetModuleName()) - for pTimer,_ := range pModule.mapActiveTimer { + for pTimer := range pModule.mapActiveTimer { pTimer.Cancel() } diff --git a/service/service.go b/service/service.go index cf154e1..f0b6b8e 100644 --- a/service/service.go +++ b/service/service.go @@ -1,14 +1,16 @@ package service import ( - "fmt" + "errors" "github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/profiler" "github.com/duanhf2012/origin/rpc" + originSync "github.com/duanhf2012/origin/util/sync" "github.com/duanhf2012/origin/util/timer" "reflect" "runtime" + "strconv" "sync" "sync/atomic" ) @@ -33,6 +35,12 @@ type IService interface { GetProfiler() *profiler.Profiler } +// eventPool的内存池,缓存Event +var maxServiceEventChannel = 2000000 +var eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData { + return &event.Event{} +}) + type Service struct { Module rpcHandler rpc.RpcHandler //rpc @@ -44,14 +52,23 @@ type Service struct { eventProcessor event.IEventProcessor profiler *profiler.Profiler //性能分析器 rpcEventLister rpc.IRpcListener + chanEvent chan event.IEvent } -type RpcEventData struct{ +// RpcConnEvent Node结点连接事件 +type RpcConnEvent struct{ IsConnect bool NodeId int } -func (rpcEventData *RpcEventData) GetEventType() event.EventType{ +func SetMaxServiceChannel(maxEventChannel int){ + maxServiceEventChannel = maxEventChannel + eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData { + return &event.Event{} + }) +} + +func (rpcEventData *RpcConnEvent) GetEventType() event.EventType{ return event.Sys_Event_Rpc_Event } @@ -70,8 +87,8 @@ func (s *Service) OpenProfiler() { func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) { s.dispatcher =timer.NewDispatcher(timerDispatcherLen) - - s.rpcHandler.InitRpcHandler(iService.(rpc.IRpcHandler),getClientFun,getServerFun) + s.chanEvent = make(chan event.IEvent,maxServiceEventChannel) + s.rpcHandler.InitRpcHandler(iService.(rpc.IRpcHandler),getClientFun,getServerFun,iService.(rpc.IRpcHandlerChannel)) s.IRpcHandler = &s.rpcHandler s.self = iService.(IModule) //初始化祖先 @@ -81,25 +98,14 @@ func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServe s.serviceCfg = serviceCfg s.goroutineNum = 1 s.eventProcessor = event.NewEventProcessor() + s.eventProcessor.Init(s) s.eventHandler = event.NewEventHandler() s.eventHandler.Init(s.eventProcessor) } -func (s *Service) SetGoRoutineNum(goroutineNum int32) bool { - //已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程 - if s.startStatus == true || s.profiler!=nil { - log.SError("open profiler mode is not allowed to set Multi-coroutine.") - return false - } - - s.goroutineNum = goroutineNum - return true -} func (s *Service) Start() { s.startStatus = true - s.eventProcessor.SetEventChannel(0) - for i:=int32(0);i< s.goroutineNum;i++{ s.wg.Add(1) go func(){ @@ -114,41 +120,64 @@ func (s *Service) Run() { var bStop = false s.self.(IService).OnStart() for{ - rpcRequestChan := s.GetRpcRequestChan() - rpcResponseCallBack := s.GetRpcResponseChan() - eventChan := s.eventProcessor.GetEventChan() var analyzer *profiler.Analyzer select { case <- closeSig: bStop = true - case rpcRequest :=<- rpcRequestChan: - if s.profiler!=nil { - analyzer = s.profiler.Push("[Req]"+rpcRequest.RpcRequestData.GetServiceMethod()) + case ev := <- s.chanEvent: + switch ev.GetEventType() { + case event.ServiceRpcRequestEvent: + cEvent,ok := ev.(*event.Event) + if ok == false { + log.SError("Type event conversion error") + break + } + rpcRequest,ok := cEvent.Data.(*rpc.RpcRequest) + if ok == false { + log.SError("Type *rpc.RpcRequest conversion error") + break + } + if s.profiler!=nil { + analyzer = s.profiler.Push("[Req]"+rpcRequest.RpcRequestData.GetServiceMethod()) + } + + s.GetRpcHandler().HandlerRpcRequest(rpcRequest) + if analyzer!=nil { + analyzer.Pop() + analyzer = nil + } + eventPool.Put(cEvent) + case event.ServiceRpcResponseEvent: + cEvent,ok := ev.(*event.Event) + if ok == false { + log.SError("Type event conversion error") + break + } + rpcResponseCB,ok := cEvent.Data.(*rpc.Call) + if ok == false { + log.SError("Type *rpc.Call conversion error") + break + } + if s.profiler!=nil { + analyzer = s.profiler.Push("[Res]" + rpcResponseCB.ServiceMethod) + } + s.GetRpcHandler().HandlerRpcResponseCB(rpcResponseCB) + if analyzer!=nil { + analyzer.Pop() + analyzer = nil + } + eventPool.Put(cEvent) + default: + if s.profiler!=nil { + analyzer = s.profiler.Push("[SEvent]"+strconv.Itoa(int(ev.GetEventType()))) + } + s.eventProcessor.EventHandler(ev) + if analyzer!=nil { + analyzer.Pop() + analyzer = nil + } } - s.GetRpcHandler().HandlerRpcRequest(rpcRequest) - if analyzer!=nil { - analyzer.Pop() - analyzer = nil - } - case rpcResponseCB := <-rpcResponseCallBack: - if s.profiler!=nil { - analyzer = s.profiler.Push("[Res]" + rpcResponseCB.ServiceMethod) - } - s.GetRpcHandler().HandlerRpcResponseCB(rpcResponseCB) - if analyzer!=nil { - analyzer.Pop() - analyzer = nil - } - case ev := <- eventChan: - if s.profiler!=nil { - analyzer = s.profiler.Push(fmt.Sprintf("[Event]%d", int(ev.GetEventType()))) - } - s.eventProcessor.EventHandler(ev) - if analyzer!=nil { - analyzer.Pop() - analyzer = nil - } case t := <- s.dispatcher.ChanTimer: if s.profiler != nil { analyzer = s.profiler.Push("[timer]"+t.GetName()) @@ -211,11 +240,11 @@ func (s *Service) GetProfiler() *profiler.Profiler{ } func (s *Service) RegEventReceiverFunc(eventType event.EventType, receiver event.IEventHandler,callback event.EventCallBack){ - s.eventProcessor.RegEventReciverFunc(eventType, receiver,callback) + s.eventProcessor.RegEventReceiverFunc(eventType, receiver,callback) } func (s *Service) UnRegEventReceiverFunc(eventType event.EventType, receiver event.IEventHandler){ - s.eventProcessor.UnRegEventReciverFun(eventType, receiver) + s.eventProcessor.UnRegEventReceiverFun(eventType, receiver) } func (s *Service) IsSingleCoroutine() bool { @@ -230,7 +259,7 @@ func (s *Service) OnStart(){ } func (s *Service) OnRpcEvent(ev event.IEvent){ - event := ev.(*RpcEventData) + event := ev.(*RpcConnEvent) if event.IsConnect { s.rpcEventLister.OnNodeConnected(event.NodeId) }else{ @@ -249,3 +278,46 @@ func (s *Service) UnRegRpcListener(rpcLister rpc.IRpcListener) { RegRpcEventFun(s.GetName()) } + +func (s *Service) PushRpcRequest(rpcRequest *rpc.RpcRequest) error{ + ev := eventPool.Get().(*event.Event) + ev.Type = event.ServiceRpcRequestEvent + ev.Data = rpcRequest + + return s.pushEvent(ev) +} + +func (s *Service) PushRpcResponse(call *rpc.Call) error{ + ev := eventPool.Get().(*event.Event) + ev.Type = event.ServiceRpcResponseEvent + ev.Data = call + + return s.pushEvent(ev) +} + +func (s *Service) PushEvent(ev event.IEvent) error{ + return s.pushEvent(ev) +} + +func (s *Service) pushEvent(ev event.IEvent) error{ + if len(s.chanEvent) >= maxServiceEventChannel { + err := errors.New("The event channel in the service is full") + log.SError(err.Error()) + return err + } + + s.chanEvent <- ev + return nil +} + + +func (s *Service) SetGoRoutineNum(goroutineNum int32) bool { + //已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程 + if s.startStatus == true || s.profiler!=nil { + log.SError("open profiler mode is not allowed to set Multi-coroutine.") + return false + } + + s.goroutineNum = goroutineNum + return true +}