合并Service中的Event、RpcResponse、RpcRequest管道

This commit is contained in:
duanhf2012
2021-07-20 14:35:30 +08:00
parent c9f47d796c
commit a6ea25bba0
8 changed files with 215 additions and 169 deletions

View File

@@ -370,7 +370,7 @@ func (cls *Cluster) triggerRpcEvent (bConnect bool,clientSeq uint32,nodeId int)
continue continue
} }
var eventData service.RpcEventData var eventData service.RpcConnEvent
eventData.IsConnect = bConnect eventData.IsConnect = bConnect
eventData.NodeId = nodeId eventData.NodeId = nodeId
ser.(service.IModule).NotifyEvent(&eventData) ser.(service.IModule).NotifyEvent(&eventData)

View File

@@ -6,7 +6,6 @@ import (
"sync" "sync"
) )
const DefaultEventChannelLen = 1000000
//事件接受器 //事件接受器
type EventCallBack func(event IEvent) type EventCallBack func(event IEvent)
@@ -18,6 +17,24 @@ type IEvent interface {
type Event struct { type Event struct {
Type EventType Type EventType
Data interface{} Data interface{}
ref bool
}
var emptyEvent Event
func (e *Event) Reset() {
*e = emptyEvent
}
func (e *Event) IsRef() bool {
return e.ref
}
func (e *Event) Ref() {
e.ref = true
}
func (e *Event) UnRef() {
e.ref = false
} }
func (e *Event) GetEventType() EventType{ func (e *Event) GetEventType() EventType{
@@ -34,20 +51,23 @@ type IEventHandler interface {
removeRegInfo(eventType EventType,eventProcessor IEventProcessor) removeRegInfo(eventType EventType,eventProcessor IEventProcessor)
} }
type IEventChannel interface {
PushEvent(ev IEvent) error
}
type IEventProcessor interface { type IEventProcessor interface {
IEventChannel
Init(eventChannel IEventChannel)
EventHandler(ev IEvent) EventHandler(ev IEvent)
//同一个IEventHandler只能接受一个EventType类型回调 RegEventReceiverFunc(eventType EventType, receiver IEventHandler,callback EventCallBack)
RegEventReciverFunc(eventType EventType,reciver IEventHandler,callback EventCallBack) UnRegEventReceiverFun(eventType EventType, receiver IEventHandler)
UnRegEventReciverFun(eventType EventType,reciver IEventHandler)
SetEventChannel(channelNum int) bool
castEvent(event IEvent) //广播事件 castEvent(event IEvent) //广播事件
pushEvent(event IEvent) addBindEvent(eventType EventType, receiver IEventHandler,callback EventCallBack)
addBindEvent(eventType EventType,reciver IEventHandler,callback EventCallBack) addListen(eventType EventType, receiver IEventHandler)
addListen(eventType EventType,reciver IEventHandler) removeBindEvent(eventType EventType, receiver IEventHandler)
removeBindEvent(eventType EventType,reciver IEventHandler) removeListen(eventType EventType, receiver IEventHandler)
removeListen(eventType EventType,reciver IEventHandler)
GetEventChan() chan IEvent
} }
type EventHandler struct { type EventHandler struct {
@@ -59,9 +79,8 @@ type EventHandler struct {
mapRegEvent map[EventType]map[IEventProcessor]interface{} //向其他事件处理器监听的事件类型 mapRegEvent map[EventType]map[IEventProcessor]interface{} //向其他事件处理器监听的事件类型
} }
type EventProcessor struct { type EventProcessor struct {
eventChannel chan IEvent IEventChannel
locker sync.RWMutex locker sync.RWMutex
mapListenerEvent map[EventType]map[IEventProcessor]int //监听者信息 mapListenerEvent map[EventType]map[IEventProcessor]int //监听者信息
@@ -115,22 +134,12 @@ func (handler *EventHandler) Init(processor IEventProcessor){
handler.mapRegEvent =map[EventType]map[IEventProcessor]interface{}{} handler.mapRegEvent =map[EventType]map[IEventProcessor]interface{}{}
} }
func (processor *EventProcessor) SetEventChannel(channelNum int) bool{
processor.locker.Lock()
defer processor.locker.Unlock()
if processor.eventChannel!=nil {
return false
}
if channelNum == 0 { func (processor *EventProcessor) Init(eventChannel IEventChannel){
channelNum = DefaultEventChannelLen processor.IEventChannel = eventChannel
}
processor.eventChannel = make(chan IEvent,channelNum)
return true
} }
func (processor *EventProcessor) addBindEvent(eventType EventType,reciver IEventHandler,callback EventCallBack){ func (processor *EventProcessor) addBindEvent(eventType EventType, receiver IEventHandler,callback EventCallBack){
processor.locker.Lock() processor.locker.Lock()
defer processor.locker.Unlock() defer processor.locker.Unlock()
@@ -138,10 +147,10 @@ func (processor *EventProcessor) addBindEvent(eventType EventType,reciver IEvent
processor.mapBindHandlerEvent[eventType] = map[IEventHandler]EventCallBack{} processor.mapBindHandlerEvent[eventType] = map[IEventHandler]EventCallBack{}
} }
processor.mapBindHandlerEvent[eventType][reciver] = callback processor.mapBindHandlerEvent[eventType][receiver] = callback
} }
func (processor *EventProcessor) addListen(eventType EventType,reciver IEventHandler){ func (processor *EventProcessor) addListen(eventType EventType, receiver IEventHandler){
processor.locker.Lock() processor.locker.Lock()
defer processor.locker.Unlock() defer processor.locker.Unlock()
@@ -149,41 +158,41 @@ func (processor *EventProcessor) addListen(eventType EventType,reciver IEventHan
processor.mapListenerEvent[eventType] = map[IEventProcessor]int{} processor.mapListenerEvent[eventType] = map[IEventProcessor]int{}
} }
processor.mapListenerEvent[eventType][reciver.GetEventProcessor()] += 1 processor.mapListenerEvent[eventType][receiver.GetEventProcessor()] += 1
} }
func (processor *EventProcessor) removeBindEvent(eventType EventType,reciver IEventHandler){ func (processor *EventProcessor) removeBindEvent(eventType EventType, receiver IEventHandler){
processor.locker.Lock() processor.locker.Lock()
defer processor.locker.Unlock() defer processor.locker.Unlock()
if _,ok := processor.mapBindHandlerEvent[eventType];ok == true{ if _,ok := processor.mapBindHandlerEvent[eventType];ok == true{
delete(processor.mapBindHandlerEvent[eventType],reciver) delete(processor.mapBindHandlerEvent[eventType], receiver)
} }
} }
func (processor *EventProcessor) removeListen(eventType EventType,reciver IEventHandler){ func (processor *EventProcessor) removeListen(eventType EventType, receiver IEventHandler){
processor.locker.Lock() processor.locker.Lock()
defer processor.locker.Unlock() defer processor.locker.Unlock()
if _,ok := processor.mapListenerEvent[eventType];ok == true{ if _,ok := processor.mapListenerEvent[eventType];ok == true{
processor.mapListenerEvent[eventType][reciver.GetEventProcessor()]-=1 processor.mapListenerEvent[eventType][receiver.GetEventProcessor()]-=1
if processor.mapListenerEvent[eventType][reciver.GetEventProcessor()] <= 0 { if processor.mapListenerEvent[eventType][receiver.GetEventProcessor()] <= 0 {
delete(processor.mapListenerEvent[eventType],reciver.GetEventProcessor()) delete(processor.mapListenerEvent[eventType], receiver.GetEventProcessor())
} }
} }
} }
func (processor *EventProcessor) RegEventReciverFunc(eventType EventType,reciver IEventHandler,callback EventCallBack){ func (processor *EventProcessor) RegEventReceiverFunc(eventType EventType, receiver IEventHandler,callback EventCallBack){
//记录reciver自己注册过的事件 //记录receiver自己注册过的事件
reciver.addRegInfo(eventType, processor) receiver.addRegInfo(eventType, processor)
//记录当前所属IEventProcessor注册的回调 //记录当前所属IEventProcessor注册的回调
reciver.GetEventProcessor().addBindEvent(eventType,reciver,callback) receiver.GetEventProcessor().addBindEvent(eventType, receiver,callback)
//将注册加入到监听中 //将注册加入到监听中
processor.addListen(eventType,reciver) processor.addListen(eventType, receiver)
} }
func (processor *EventProcessor) UnRegEventReciverFun(eventType EventType,reciver IEventHandler) { func (processor *EventProcessor) UnRegEventReceiverFun(eventType EventType, receiver IEventHandler) {
processor.removeListen(eventType,reciver) processor.removeListen(eventType, receiver)
reciver.GetEventProcessor().removeBindEvent(eventType,reciver) receiver.GetEventProcessor().removeBindEvent(eventType, receiver)
reciver.removeRegInfo(eventType, processor) receiver.removeRegInfo(eventType, processor)
} }
func (handler *EventHandler) Destroy(){ func (handler *EventHandler) Destroy(){
@@ -194,23 +203,12 @@ func (handler *EventHandler) Destroy(){
continue continue
} }
for eventProcess,_ := range mapEventProcess { for eventProcess := range mapEventProcess {
eventProcess.UnRegEventReciverFun(eventTyp, handler) eventProcess.UnRegEventReceiverFun(eventTyp, handler)
} }
} }
} }
func (processor *EventProcessor) GetEventChan() chan IEvent{
processor.locker.Lock()
defer processor.locker.Unlock()
if processor.eventChannel == nil {
processor.eventChannel =make(chan IEvent,DefaultEventChannelLen)
}
return processor.eventChannel
}
func (processor *EventProcessor) EventHandler(ev IEvent) { func (processor *EventProcessor) EventHandler(ev IEvent) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@@ -229,14 +227,6 @@ func (processor *EventProcessor) EventHandler(ev IEvent) {
} }
} }
func (processor *EventProcessor) pushEvent(event IEvent){
if len(processor.eventChannel)>=cap(processor.eventChannel){
log.SError("event process channel is full,data:",event.GetEventType())
return
}
processor.eventChannel<-event
}
func (processor *EventProcessor) castEvent(event IEvent){ func (processor *EventProcessor) castEvent(event IEvent){
if processor.mapListenerEvent == nil { if processor.mapListenerEvent == nil {
@@ -250,7 +240,7 @@ func (processor *EventProcessor) castEvent(event IEvent){
return return
} }
for proc,_ := range eventProcessor { for proc := range eventProcessor {
proc.pushEvent(event) proc.PushEvent(event)
} }
} }

View File

@@ -4,10 +4,14 @@ type EventType int
//大于Sys_Event_User_Define给用户定义 //大于Sys_Event_User_Define给用户定义
const ( const (
Sys_Event_Tcp EventType = 1 ServiceRpcRequestEvent EventType = -1
Sys_Event_Http_Event EventType = 2 ServiceRpcResponseEvent EventType = -2
Sys_Event_WebSocket EventType = 3
Sys_Event_Rpc_Event EventType = 4 Sys_Event_Tcp EventType = -3
Sys_Event_User_Define EventType = 1000 Sys_Event_Http_Event EventType = -4
Sys_Event_WebSocket EventType = -5
Sys_Event_Rpc_Event EventType = -6
Sys_Event_User_Define EventType = 1
) )

View File

@@ -67,11 +67,11 @@ func (client *Client) Connect(id int,addr string) error {
} }
func (client *Client) startCheckRpcCallTimer(){ func (client *Client) startCheckRpcCallTimer(){
t:=timer.NewTimer(3*time.Second) t:=timer.NewTimer(5*time.Second)
for{ for{
select { select {
case timer:=<- t.C: case cTimer:=<- t.C:
timer.SetupTimer(time.Now()) cTimer.SetupTimer(time.Now())
client.checkRpcCallTimeout() client.checkRpcCallTimeout()
} }
} }
@@ -83,7 +83,7 @@ func (client *Client) startCheckRpcCallTimer(){
func (client *Client) makeCallFail(call *Call){ func (client *Client) makeCallFail(call *Call){
client.removePending(call.Seq) client.removePending(call.Seq)
if call.callback!=nil && call.callback.IsValid() { if call.callback!=nil && call.callback.IsValid() {
call.rpcHandler.(*RpcHandler).callResponseCallBack <-call call.rpcHandler.PushRpcResponse(call)
}else{ }else{
call.done <- call call.done <- call
} }
@@ -310,7 +310,7 @@ func (client *Client) Run(){
} }
if v.callback!=nil && v.callback.IsValid() { if v.callback!=nil && v.callback.IsValid() {
v.rpcHandler.(*RpcHandler).callResponseCallBack <-v v.rpcHandler.PushRpcResponse(v)
}else{ }else{
v.done <- v v.done <- v
} }

View File

@@ -46,15 +46,19 @@ type RawRpcCallBack interface {
CB(data interface{}) CB(data interface{})
} }
type IRpcHandlerChannel interface {
PushRpcResponse(call *Call) error
PushRpcRequest(rpcRequest *RpcRequest) error
}
type RpcHandler struct { type RpcHandler struct {
callRequest chan *RpcRequest IRpcHandlerChannel
rpcHandler IRpcHandler rpcHandler IRpcHandler
mapFunctions map[string]RpcMethodInfo mapFunctions map[string]RpcMethodInfo
mapRawFunctions map[uint32] RawRpcCallBack mapRawFunctions map[uint32] RawRpcCallBack
funcRpcClient FuncRpcClient funcRpcClient FuncRpcClient
funcRpcServer FuncRpcServer funcRpcServer FuncRpcServer
callResponseCallBack chan *Call //异步返回的回调
} }
type TriggerRpcEvent func(bConnect bool,clientSeq uint32,nodeId int) type TriggerRpcEvent func(bConnect bool,clientSeq uint32,nodeId int)
@@ -64,17 +68,13 @@ type IRpcListener interface {
} }
type IRpcHandler interface { type IRpcHandler interface {
IRpcHandlerChannel
GetName() string GetName() string
InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer) InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer,rpcHandlerChannel IRpcHandlerChannel)
GetRpcHandler() IRpcHandler GetRpcHandler() IRpcHandler
PushRequest(callInfo *RpcRequest) error
HandlerRpcRequest(request *RpcRequest) HandlerRpcRequest(request *RpcRequest)
HandlerRpcResponseCB(call *Call) HandlerRpcResponseCB(call *Call)
GetRpcRequestChan() chan *RpcRequest
GetRpcResponseChan() chan *Call
CallMethod(ServiceMethod string,param interface{},reply interface{}) error CallMethod(ServiceMethod string,param interface{},reply interface{}) error
AsyncCall(serviceMethod string,args interface{},callback interface{}) error AsyncCall(serviceMethod string,args interface{},callback interface{}) error
Call(serviceMethod string,args interface{},reply interface{}) error Call(serviceMethod string,args interface{},reply interface{}) error
Go(serviceMethod string,args interface{}) error Go(serviceMethod string,args interface{}) error
@@ -100,9 +100,8 @@ func (handler *RpcHandler) GetRpcHandler() IRpcHandler{
return handler.rpcHandler return handler.rpcHandler
} }
func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer) { func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer,rpcHandlerChannel IRpcHandlerChannel) {
handler.callRequest = make(chan *RpcRequest,1000000) handler.IRpcHandlerChannel = rpcHandlerChannel
handler.callResponseCallBack = make(chan *Call,1000000)
handler.mapRawFunctions = make(map[uint32] RawRpcCallBack) handler.mapRawFunctions = make(map[uint32] RawRpcCallBack)
handler.rpcHandler = rpcHandler handler.rpcHandler = rpcHandler
handler.mapFunctions = map[string]RpcMethodInfo{} handler.mapFunctions = map[string]RpcMethodInfo{}
@@ -114,8 +113,8 @@ func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler,getClientFun Fu
// Is this an exported - upper case - name? // Is this an exported - upper case - name?
func isExported(name string) bool { func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name) r, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune) return unicode.IsUpper(r)
} }
// Is this type exported or a builtin? // Is this type exported or a builtin?
@@ -190,23 +189,6 @@ func (handler *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error {
return nil return nil
} }
func (handler *RpcHandler) PushRequest(req *RpcRequest) error{
if len(handler.callRequest) >= cap(handler.callRequest){
return fmt.Errorf("RpcHandler %s Rpc Channel is full", handler.GetName())
}
handler.callRequest <- req
return nil
}
func (handler *RpcHandler) GetRpcRequestChan() chan *RpcRequest {
return handler.callRequest
}
func (handler *RpcHandler) GetRpcResponseChan() chan *Call{
return handler.callResponseCallBack
}
func (handler *RpcHandler) HandlerRpcResponseCB(call *Call){ func (handler *RpcHandler) HandlerRpcResponseCB(call *Call){
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {

View File

@@ -185,7 +185,7 @@ func (agent *RpcAgent) Run() {
continue continue
} }
err = rpcHandler.PushRequest(req) err = rpcHandler.PushRpcRequest(req)
if err != nil { if err != nil {
rpcError := RpcError(err.Error()) rpcError := RpcError(err.Error())
@@ -268,7 +268,6 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien
return pCall return pCall
} }
} }
//req.inputArgs = inputArgs
if noReply == false { if noReply == false {
client.AddPending(pCall) client.AddPending(pCall)
@@ -303,7 +302,7 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien
} }
} }
err := rpcHandler.PushRequest(req) err := rpcHandler.PushRpcRequest(req)
if err != nil { if err != nil {
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
pCall.Err = err pCall.Err = err
@@ -352,12 +351,12 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler
if Returns!=nil { if Returns!=nil {
pCall.Reply = Returns pCall.Reply = Returns
} }
pCall.rpcHandler.(*RpcHandler).callResponseCallBack <-pCall pCall.rpcHandler.PushRpcResponse(pCall)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
} }
} }
err := rpcHandler.PushRequest(req) err := rpcHandler.PushRpcRequest(req)
if err != nil { if err != nil {
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
return err return err

View File

@@ -35,8 +35,6 @@ type IModuleTimer interface {
NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker
} }
//1.管理各模块树层关系
//2.提供定时器常用工具
type Module struct { type Module struct {
rpcHandle.IRpcHandler rpcHandle.IRpcHandler
moduleId int64 //模块Id moduleId int64 //模块Id
@@ -80,8 +78,9 @@ func (m *Module) OnInit() error{
func (m *Module) AddModule(module IModule) (int64,error){ func (m *Module) AddModule(module IModule) (int64,error){
//没有事件处理器不允许加入其他模块 //没有事件处理器不允许加入其他模块
if m.GetEventProcessor() == nil { if m.GetEventProcessor() == nil {
return 0,fmt.Errorf("module %+v is not Event Processor is nil", m.self) return 0,fmt.Errorf("module %+v Event Processor is nil", m.self)
} }
pAddModule := module.getBaseModule().(*Module) pAddModule := module.getBaseModule().(*Module)
if pAddModule.GetModuleId()==0 { if pAddModule.GetModuleId()==0 {
pAddModule.moduleId = m.NewModuleId() pAddModule.moduleId = m.NewModuleId()
@@ -92,7 +91,7 @@ func (m *Module) AddModule(module IModule) (int64,error){
} }
_,ok := m.child[module.GetModuleId()] _,ok := m.child[module.GetModuleId()]
if ok == true { if ok == true {
return 0,fmt.Errorf("Exists module id %d",module.GetModuleId()) return 0,fmt.Errorf("exists module id %d",module.GetModuleId())
} }
pAddModule.IRpcHandler = m.IRpcHandler pAddModule.IRpcHandler = m.IRpcHandler
pAddModule.self = module pAddModule.self = module
@@ -118,14 +117,14 @@ func (m *Module) ReleaseModule(moduleId int64){
pModule := m.GetModule(moduleId).getBaseModule().(*Module) pModule := m.GetModule(moduleId).getBaseModule().(*Module)
//释放子孙 //释放子孙
for id,_ := range pModule.child { for id := range pModule.child {
m.ReleaseModule(id) m.ReleaseModule(id)
} }
pModule.GetEventHandler().Destroy() pModule.GetEventHandler().Destroy()
pModule.self.OnRelease() pModule.self.OnRelease()
log.SDebug("Release module ", pModule.GetModuleName()) log.SDebug("Release module ", pModule.GetModuleName())
for pTimer,_ := range pModule.mapActiveTimer { for pTimer := range pModule.mapActiveTimer {
pTimer.Cancel() pTimer.Cancel()
} }

View File

@@ -1,14 +1,16 @@
package service package service
import ( import (
"fmt" "errors"
"github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/event"
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/profiler" "github.com/duanhf2012/origin/profiler"
"github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/rpc"
originSync "github.com/duanhf2012/origin/util/sync"
"github.com/duanhf2012/origin/util/timer" "github.com/duanhf2012/origin/util/timer"
"reflect" "reflect"
"runtime" "runtime"
"strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
) )
@@ -33,6 +35,12 @@ type IService interface {
GetProfiler() *profiler.Profiler GetProfiler() *profiler.Profiler
} }
// eventPool的内存池,缓存Event
var maxServiceEventChannel = 2000000
var eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData {
return &event.Event{}
})
type Service struct { type Service struct {
Module Module
rpcHandler rpc.RpcHandler //rpc rpcHandler rpc.RpcHandler //rpc
@@ -44,14 +52,23 @@ type Service struct {
eventProcessor event.IEventProcessor eventProcessor event.IEventProcessor
profiler *profiler.Profiler //性能分析器 profiler *profiler.Profiler //性能分析器
rpcEventLister rpc.IRpcListener rpcEventLister rpc.IRpcListener
chanEvent chan event.IEvent
} }
type RpcEventData struct{ // RpcConnEvent Node结点连接事件
type RpcConnEvent struct{
IsConnect bool IsConnect bool
NodeId int NodeId int
} }
func (rpcEventData *RpcEventData) GetEventType() event.EventType{ func SetMaxServiceChannel(maxEventChannel int){
maxServiceEventChannel = maxEventChannel
eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData {
return &event.Event{}
})
}
func (rpcEventData *RpcConnEvent) GetEventType() event.EventType{
return event.Sys_Event_Rpc_Event return event.Sys_Event_Rpc_Event
} }
@@ -70,8 +87,8 @@ func (s *Service) OpenProfiler() {
func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) { func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) {
s.dispatcher =timer.NewDispatcher(timerDispatcherLen) s.dispatcher =timer.NewDispatcher(timerDispatcherLen)
s.chanEvent = make(chan event.IEvent,maxServiceEventChannel)
s.rpcHandler.InitRpcHandler(iService.(rpc.IRpcHandler),getClientFun,getServerFun) s.rpcHandler.InitRpcHandler(iService.(rpc.IRpcHandler),getClientFun,getServerFun,iService.(rpc.IRpcHandlerChannel))
s.IRpcHandler = &s.rpcHandler s.IRpcHandler = &s.rpcHandler
s.self = iService.(IModule) s.self = iService.(IModule)
//初始化祖先 //初始化祖先
@@ -81,25 +98,14 @@ func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServe
s.serviceCfg = serviceCfg s.serviceCfg = serviceCfg
s.goroutineNum = 1 s.goroutineNum = 1
s.eventProcessor = event.NewEventProcessor() s.eventProcessor = event.NewEventProcessor()
s.eventProcessor.Init(s)
s.eventHandler = event.NewEventHandler() s.eventHandler = event.NewEventHandler()
s.eventHandler.Init(s.eventProcessor) s.eventHandler.Init(s.eventProcessor)
} }
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程
if s.startStatus == true || s.profiler!=nil {
log.SError("open profiler mode is not allowed to set Multi-coroutine.")
return false
}
s.goroutineNum = goroutineNum
return true
}
func (s *Service) Start() { func (s *Service) Start() {
s.startStatus = true s.startStatus = true
s.eventProcessor.SetEventChannel(0)
for i:=int32(0);i< s.goroutineNum;i++{ for i:=int32(0);i< s.goroutineNum;i++{
s.wg.Add(1) s.wg.Add(1)
go func(){ go func(){
@@ -114,41 +120,64 @@ func (s *Service) Run() {
var bStop = false var bStop = false
s.self.(IService).OnStart() s.self.(IService).OnStart()
for{ for{
rpcRequestChan := s.GetRpcRequestChan()
rpcResponseCallBack := s.GetRpcResponseChan()
eventChan := s.eventProcessor.GetEventChan()
var analyzer *profiler.Analyzer var analyzer *profiler.Analyzer
select { select {
case <- closeSig: case <- closeSig:
bStop = true bStop = true
case rpcRequest :=<- rpcRequestChan: case ev := <- s.chanEvent:
if s.profiler!=nil { switch ev.GetEventType() {
analyzer = s.profiler.Push("[Req]"+rpcRequest.RpcRequestData.GetServiceMethod()) case event.ServiceRpcRequestEvent:
cEvent,ok := ev.(*event.Event)
if ok == false {
log.SError("Type event conversion error")
break
}
rpcRequest,ok := cEvent.Data.(*rpc.RpcRequest)
if ok == false {
log.SError("Type *rpc.RpcRequest conversion error")
break
}
if s.profiler!=nil {
analyzer = s.profiler.Push("[Req]"+rpcRequest.RpcRequestData.GetServiceMethod())
}
s.GetRpcHandler().HandlerRpcRequest(rpcRequest)
if analyzer!=nil {
analyzer.Pop()
analyzer = nil
}
eventPool.Put(cEvent)
case event.ServiceRpcResponseEvent:
cEvent,ok := ev.(*event.Event)
if ok == false {
log.SError("Type event conversion error")
break
}
rpcResponseCB,ok := cEvent.Data.(*rpc.Call)
if ok == false {
log.SError("Type *rpc.Call conversion error")
break
}
if s.profiler!=nil {
analyzer = s.profiler.Push("[Res]" + rpcResponseCB.ServiceMethod)
}
s.GetRpcHandler().HandlerRpcResponseCB(rpcResponseCB)
if analyzer!=nil {
analyzer.Pop()
analyzer = nil
}
eventPool.Put(cEvent)
default:
if s.profiler!=nil {
analyzer = s.profiler.Push("[SEvent]"+strconv.Itoa(int(ev.GetEventType())))
}
s.eventProcessor.EventHandler(ev)
if analyzer!=nil {
analyzer.Pop()
analyzer = nil
}
} }
s.GetRpcHandler().HandlerRpcRequest(rpcRequest)
if analyzer!=nil {
analyzer.Pop()
analyzer = nil
}
case rpcResponseCB := <-rpcResponseCallBack:
if s.profiler!=nil {
analyzer = s.profiler.Push("[Res]" + rpcResponseCB.ServiceMethod)
}
s.GetRpcHandler().HandlerRpcResponseCB(rpcResponseCB)
if analyzer!=nil {
analyzer.Pop()
analyzer = nil
}
case ev := <- eventChan:
if s.profiler!=nil {
analyzer = s.profiler.Push(fmt.Sprintf("[Event]%d", int(ev.GetEventType())))
}
s.eventProcessor.EventHandler(ev)
if analyzer!=nil {
analyzer.Pop()
analyzer = nil
}
case t := <- s.dispatcher.ChanTimer: case t := <- s.dispatcher.ChanTimer:
if s.profiler != nil { if s.profiler != nil {
analyzer = s.profiler.Push("[timer]"+t.GetName()) analyzer = s.profiler.Push("[timer]"+t.GetName())
@@ -211,11 +240,11 @@ func (s *Service) GetProfiler() *profiler.Profiler{
} }
func (s *Service) RegEventReceiverFunc(eventType event.EventType, receiver event.IEventHandler,callback event.EventCallBack){ func (s *Service) RegEventReceiverFunc(eventType event.EventType, receiver event.IEventHandler,callback event.EventCallBack){
s.eventProcessor.RegEventReciverFunc(eventType, receiver,callback) s.eventProcessor.RegEventReceiverFunc(eventType, receiver,callback)
} }
func (s *Service) UnRegEventReceiverFunc(eventType event.EventType, receiver event.IEventHandler){ func (s *Service) UnRegEventReceiverFunc(eventType event.EventType, receiver event.IEventHandler){
s.eventProcessor.UnRegEventReciverFun(eventType, receiver) s.eventProcessor.UnRegEventReceiverFun(eventType, receiver)
} }
func (s *Service) IsSingleCoroutine() bool { func (s *Service) IsSingleCoroutine() bool {
@@ -230,7 +259,7 @@ func (s *Service) OnStart(){
} }
func (s *Service) OnRpcEvent(ev event.IEvent){ func (s *Service) OnRpcEvent(ev event.IEvent){
event := ev.(*RpcEventData) event := ev.(*RpcConnEvent)
if event.IsConnect { if event.IsConnect {
s.rpcEventLister.OnNodeConnected(event.NodeId) s.rpcEventLister.OnNodeConnected(event.NodeId)
}else{ }else{
@@ -249,3 +278,46 @@ func (s *Service) UnRegRpcListener(rpcLister rpc.IRpcListener) {
RegRpcEventFun(s.GetName()) RegRpcEventFun(s.GetName())
} }
func (s *Service) PushRpcRequest(rpcRequest *rpc.RpcRequest) error{
ev := eventPool.Get().(*event.Event)
ev.Type = event.ServiceRpcRequestEvent
ev.Data = rpcRequest
return s.pushEvent(ev)
}
func (s *Service) PushRpcResponse(call *rpc.Call) error{
ev := eventPool.Get().(*event.Event)
ev.Type = event.ServiceRpcResponseEvent
ev.Data = call
return s.pushEvent(ev)
}
func (s *Service) PushEvent(ev event.IEvent) error{
return s.pushEvent(ev)
}
func (s *Service) pushEvent(ev event.IEvent) error{
if len(s.chanEvent) >= maxServiceEventChannel {
err := errors.New("The event channel in the service is full")
log.SError(err.Error())
return err
}
s.chanEvent <- ev
return nil
}
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程
if s.startStatus == true || s.profiler!=nil {
log.SError("open profiler mode is not allowed to set Multi-coroutine.")
return false
}
s.goroutineNum = goroutineNum
return true
}