补提交文件

This commit is contained in:
duanhf2012
2020-03-28 10:00:53 +08:00
parent 7906599709
commit 4ce56964b0
2 changed files with 318 additions and 0 deletions

192
service/module.go Normal file
View File

@@ -0,0 +1,192 @@
package service
import (
"fmt"
"github.com/duanhf2012/originnet/event"
"github.com/duanhf2012/originnet/util/timer"
"time"
)
const InitModuleId = 1e18
type IModule interface {
SetModuleId(moduleId int64) bool
GetModuleId() int64
AddModule(module IModule) (int64,error)
GetModule(moduleId int64) IModule
GetAncestor()IModule
ReleaseModule(moduleId int64)
NewModuleId() int64
GetParent()IModule
OnInit() error
OnRelease()
getBaseModule() IModule
GetService() IService
GetEventChan() chan *event.Event
}
//1.管理各模块树层关系
//2.提供定时器常用工具
type Module struct {
moduleId int64
parent IModule //父亲
self IModule //父亲
child map[int64]IModule //孩子们
mapActiveTimer map[*timer.Timer]interface{}
mapActiveCron map[*timer.Cron]interface{}
dispatcher *timer.Dispatcher //timer
//根结点
ancestor IModule //始祖
seedModuleId int64 //模块id种子
descendants map[int64]IModule//始祖的后裔们
//事件管道
event.EventProcessor
//eventChan chan *SEvent
}
func (slf *Module) SetModuleId(moduleId int64) bool{
if moduleId > 0 {
return false
}
slf.moduleId = moduleId
return true
}
func (slf *Module) GetModuleId() int64{
return slf.moduleId
}
func (slf *Module) OnInit() error{
return nil
}
func (slf *Module) AddModule(module IModule) (int64,error){
pAddModule := module.getBaseModule().(*Module)
if pAddModule.GetModuleId()==0 {
pAddModule.moduleId = slf.NewModuleId()
}
if slf.child == nil {
slf.child = map[int64]IModule{}
}
_,ok := slf.child[module.GetModuleId()]
if ok == true {
return 0,fmt.Errorf("Exists module id %d",module.GetModuleId())
}
pAddModule.self = module
pAddModule.parent = slf.self
pAddModule.dispatcher = slf.GetAncestor().getBaseModule().(*Module).dispatcher
pAddModule.ancestor = slf.ancestor
err := module.OnInit()
if err != nil {
return 0,err
}
slf.child[module.GetModuleId()] = module
slf.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
return module.GetModuleId(),nil
}
func (slf *Module) ReleaseModule(moduleId int64){
//pBaseModule := slf.GetModule(moduleId).getBaseModule().(*Module)
pModule := slf.GetModule(moduleId).getBaseModule().(*Module)
//释放子孙
for id,_ := range pModule.child {
slf.ReleaseModule(id)
}
pModule.self.OnRelease()
for pTimer,_ := range pModule.mapActiveTimer {
pTimer.Stop()
}
for pCron,_ := range pModule.mapActiveCron {
pCron.Stop()
}
delete(slf.child,moduleId)
delete (slf.ancestor.getBaseModule().(*Module).descendants,moduleId)
//清理被删除的Module
pModule.self = nil
pModule.parent = nil
pModule.child = nil
pModule.mapActiveTimer = nil
pModule.mapActiveCron = nil
pModule.dispatcher = nil
pModule.ancestor = nil
pModule.descendants = nil
}
func (slf *Module) NewModuleId() int64{
slf.ancestor.getBaseModule().(*Module).seedModuleId+=1
return slf.ancestor.getBaseModule().(*Module).seedModuleId
}
func (slf *Module) GetAncestor()IModule{
return slf.ancestor
}
func (slf *Module) GetModule(moduleId int64) IModule{
iModule,ok := slf.GetAncestor().getBaseModule().(*Module).descendants[moduleId]
if ok == false{
return nil
}
return iModule
}
func (slf *Module) getBaseModule() IModule{
return slf
}
func (slf *Module) GetParent()IModule{
return slf.parent
}
func (slf *Module) AfterFunc(d time.Duration, cb func()) *timer.Timer {
if slf.mapActiveTimer == nil {
slf.mapActiveTimer =map[*timer.Timer]interface{}{}
}
tm := slf.dispatcher.AfterFuncEx(d,func(t *timer.Timer){
cb()
delete(slf.mapActiveTimer,t)
})
slf.mapActiveTimer[tm] = nil
return tm
}
func (slf *Module) CronFunc(cronExpr *timer.CronExpr, cb func()) *timer.Cron {
if slf.mapActiveCron == nil {
slf.mapActiveCron =map[*timer.Cron]interface{}{}
}
cron := slf.dispatcher.CronFuncEx(cronExpr, func(cron *timer.Cron) {
cb()
delete(slf.mapActiveCron,cron)
})
slf.mapActiveCron[cron] = nil
return cron
}
func (slf *Module) OnRelease(){
}
func (slf *Module) GetService() IService {
return slf.GetAncestor().(IService)
}

126
service/service.go Normal file
View File

@@ -0,0 +1,126 @@
package service
import (
"github.com/duanhf2012/originnet/rpc"
"github.com/duanhf2012/originnet/util/timer"
"reflect"
"sync"
"sync/atomic"
)
var closeSig chan bool
var timerDispatcherLen = 10
type IService interface {
Init(iservice IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{})
GetName() string
OnInit() error
OnRelease()
Wait()
Start()
GetRpcHandler() rpc.IRpcHandler
GetServiceCfg()interface{}
}
type Service struct {
Module
rpc.RpcHandler //rpc
name string //service name
closeSig chan bool
wg sync.WaitGroup
this IService
serviceCfg interface{}
gorouterNum int32
startStatus bool
}
func (slf *Service) Init(iservice IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) {
slf.name = reflect.Indirect(reflect.ValueOf(iservice)).Type().Name()
slf.dispatcher =timer.NewDispatcher(timerDispatcherLen)
slf.this = iservice
slf.InitRpcHandler(iservice.(rpc.IRpcHandler),getClientFun,getServerFun)
//初始化祖先
slf.ancestor = iservice.(IModule)
slf.seedModuleId =InitModuleId
slf.descendants = map[int64]IModule{}
slf.serviceCfg = serviceCfg
slf.gorouterNum = 1
slf.this.OnInit()
}
func (slf *Service) SetGoRouterNum(gorouterNum int32) bool {
//已经开始状态不允许修改协程数量
if slf.startStatus == true {
return false
}
slf.gorouterNum = gorouterNum
return true
}
func (slf *Service) Start() {
slf.startStatus = true
for i:=int32(0);i<slf.gorouterNum;i++{
slf.wg.Add(1)
go func(){
slf.Run()
}()
}
}
func (slf *Service) Run() {
defer slf.wg.Done()
var bStop = false
for{
rpcRequestChan := slf.GetRpcRequestChan()
rpcResponeCallBack := slf.GetRpcResponeChan()
eventChan := slf.GetEventChan()
select {
case <- closeSig:
bStop = true
case rpcRequest :=<- rpcRequestChan:
slf.GetRpcHandler().HandlerRpcRequest(rpcRequest)
case rpcResponeCB := <- rpcResponeCallBack:
slf.GetRpcHandler().HandlerRpcResponeCB(rpcResponeCB)
case event := <- eventChan:
slf.OnEventHandler(event)
case t := <- slf.dispatcher.ChanTimer:
t.Cb()
}
if bStop == true {
if atomic.AddInt32(&slf.gorouterNum,-1)<=0 {
slf.startStatus = false
slf.OnRelease()
}
break
}
}
}
func (slf *Service) GetName() string{
return slf.name
}
func (slf *Service) OnRelease(){
}
func (slf *Service) OnInit() error {
return nil
}
func (slf *Service) Wait(){
slf.wg.Wait()
}
func (slf *Service) GetServiceCfg()interface{}{
return slf.serviceCfg
}