diff --git a/README.md b/README.md index a1d7e9b..c6b3d7c 100644 --- a/README.md +++ b/README.md @@ -30,11 +30,11 @@ go get -v -u github.com/duanhf2012/origin/v2 package main import ( - "github.com/duanhf2012/origin/v2/node" + "github.com/duanhf2012/origin/v2/node" ) func main() { - node.Start() + node.Start() } ``` @@ -64,7 +64,7 @@ origin的配置文件以json格式,主要包含Discovery、RpcMode、NodeList "Discovery":{}, "RpcMode":{}, "NodeList":[], - "Service":{}, + "Service":{}, "Global": {} } ``` @@ -138,9 +138,9 @@ MasterNodeList:指定哪些Node为服务发现Master结点,需要配置NodeI ```json { - "RpcMode":{ - "Type": "Default" - } + "RpcMode":{ + "Type": "Default" + } } ``` @@ -152,14 +152,14 @@ Nats模式 ```json { - "RpcMode":{ - "Type": "Nats", - "remark": "support Default or Nats", - "Nats": { - "NatsUrl":"127.0.0.1:4222", - "NoRandomize": true - } - } + "RpcMode":{ + "Type": "Nats", + "remark": "support Default or Nats", + "Nats": { + "NatsUrl":"127.0.0.1:4222", + "NoRandomize": true + } + } } ``` @@ -221,69 +221,68 @@ service.json如下: ``` { "Service":{ - "HttpService":{ - "ListenAddr":"0.0.0.0:9402", - "ReadTimeout":10000, - "WriteTimeout":10000, - "ProcessTimeout":10000, - "ManualStart": false, - "CAFile":[ - { - "Certfile":"", - "Keyfile":"" - } - ] - - }, - "TcpService":{ - "ListenAddr":"0.0.0.0:9030", - "MaxConnNum":3000, - "PendingWriteNum":10000, - "LittleEndian":false, - "MinMsgLen":4, - "MaxMsgLen":65535 - }, - "WSService":{ - "ListenAddr":"0.0.0.0:9031", - "MaxConnNum":3000, - "PendingWriteNum":10000, - "MaxMsgLen":65535 - } + "HttpService":{ + "ListenAddr":"0.0.0.0:9402", + "ReadTimeout":10000, + "WriteTimeout":10000, + "ProcessTimeout":10000, + "ManualStart": false, + "CAFile":[ + { + "Certfile":"", + "Keyfile":"" + }] + + }, + "TcpService":{ + "ListenAddr":"0.0.0.0:9030", + "MaxConnNum":3000, + "PendingWriteNum":10000, + "LittleEndian":false, + "MinMsgLen":4, + "MaxMsgLen":65535 + }, + "WSService":{ + "ListenAddr":"0.0.0.0:9031", + "MaxConnNum":3000, + "PendingWriteNum":10000, + "MaxMsgLen":65535 + } }, "NodeService":[ { "NodeId":1, - "TcpService":{ - "ListenAddr":"0.0.0.0:9830", - "MaxConnNum":3000, - "PendingWriteNum":10000, - "LittleEndian":false, - "MinMsgLen":4, - "MaxMsgLen":65535 - }, - "WSService":{ - "ListenAddr":"0.0.0.0:9031", - "MaxConnNum":3000, - "PendingWriteNum":10000, - "MaxMsgLen":65535 - } + "TcpService":{ + "ListenAddr":"0.0.0.0:9830", + "MaxConnNum":3000, + "PendingWriteNum":10000, + "LittleEndian":false, + "MinMsgLen":4, + "MaxMsgLen":65535 + }, + "WSService":{ + "ListenAddr":"0.0.0.0:9031", + "MaxConnNum":3000, + "PendingWriteNum":10000, + "MaxMsgLen":65535 + } }, { "NodeId":2, - "TcpService":{ - "ListenAddr":"0.0.0.0:9030", - "MaxConnNum":3000, - "PendingWriteNum":10000, - "LittleEndian":false, - "MinMsgLen":4, - "MaxMsgLen":65535 - }, - "WSService":{ - "ListenAddr":"0.0.0.0:9031", - "MaxConnNum":3000, - "PendingWriteNum":10000, - "MaxMsgLen":65535 - } + "TcpService":{ + "ListenAddr":"0.0.0.0:9030", + "MaxConnNum":3000, + "PendingWriteNum":10000, + "LittleEndian":false, + "MinMsgLen":4, + "MaxMsgLen":65535 + }, + "WSService":{ + "ListenAddr":"0.0.0.0:9031", + "MaxConnNum":3000, + "PendingWriteNum":10000, + "MaxMsgLen":65535 + } } ] @@ -325,9 +324,9 @@ service.json如下: ```json { -"Global": { - "AreaId": 1 - } + "Global": { + "AreaId": 1 + } } ``` @@ -337,7 +336,7 @@ service.json如下: globalCfg := cluster.GetCluster().GetGlobalCfg() mapGlobal, ok := globalCfg.(map[string]interface{}) if ok == false { - return fmt.Errorf("Canot find Global from config.") + return fmt.Errorf("Canot find Global from config.") } areaId, ok := mapGlobal["AreaId"] @@ -358,26 +357,26 @@ simple_service/TestService1.go如下: package simple_service import ( - "github.com/duanhf2012/origin/v2/node" - "github.com/duanhf2012/origin/v2/service" + "github.com/duanhf2012/origin/v2/node" + "github.com/duanhf2012/origin/v2/service" ) //模块加载时自动安装TestService1服务 func init(){ - node.Setup(&TestService1{}) + node.Setup(&TestService1{}) } //新建自定义服务TestService1 type TestService1 struct { - //所有的自定义服务必需加入service.Service基服务 - //那么该自定义服务将有各种功能特性 - //例如: Rpc,事件驱动,定时器等 - service.Service + //所有的自定义服务必需加入service.Service基服务 + //那么该自定义服务将有各种功能特性 + //例如: Rpc,事件驱动,定时器等 + service.Service } //服务初始化函数,在安装服务时,服务将自动调用OnInit函数 func (slf *TestService1) OnInit() error { - return nil + return nil } @@ -387,20 +386,20 @@ simple_service/TestService2.go如下: ``` import ( - "github.com/duanhf2012/origin/v2/node" - "github.com/duanhf2012/origin/v2/service" + "github.com/duanhf2012/origin/v2/node" + "github.com/duanhf2012/origin/v2/service" ) func init(){ - node.Setup(&TestService2{}) + node.Setup(&TestService2{}) } type TestService2 struct { - service.Service + service.Service } func (slf *TestService2) OnInit() error { - return nil + return nil } @@ -412,13 +411,13 @@ func (slf *TestService2) OnInit() error { package main import ( - "github.com/duanhf2012/origin/v2/node" - //导入simple_service模块 - _"orginserver/simple_service" + "github.com/duanhf2012/origin/v2/node" + //导入simple_service模块 + _"orginserver/simple_service" ) func main(){ - node.Start() + node.Start() } ``` @@ -432,7 +431,7 @@ func main(){ "NodeId": "nodeid_1", "Private": false, "ListenAddr":"127.0.0.1:8001", - "remark":"//以_打头的,表示只在本机进程,不对整个子网开发", + "remark":"//以_打头的,表示只在本机进程,不对整个子网开发", "ServiceList": ["TestService1","TestService2"] } ] @@ -459,14 +458,14 @@ TestService2 OnInit. ``` func (slf *TestService2) OnInit() error { - fmt.Printf("TestService2 OnInit.\n") - slf.AfterFunc(time.Second*1,slf.OnSecondTick) - return nil + fmt.Printf("TestService2 OnInit.\n") + slf.AfterFunc(time.Second*1,slf.OnSecondTick) + return nil } func (slf *TestService2) OnSecondTick(){ - fmt.Printf("tick.\n") - slf.AfterFunc(time.Second*1,slf.OnSecondTick) + fmt.Printf("tick.\n") + slf.AfterFunc(time.Second*1,slf.OnSecondTick) } ``` @@ -477,19 +476,19 @@ func (slf *TestService2) OnSecondTick(){ ``` func (slf *TestService2) OnInit() error { - fmt.Printf("TestService2 OnInit.\n") + fmt.Printf("TestService2 OnInit.\n") - //crontab模式定时触发 - //NewCronExpr的参数分别代表:Seconds Minutes Hours DayOfMonth Month DayOfWeek - //以下为每换分钟时触发 - cron,_:=timer.NewCronExpr("0 * * * * *") - slf.CronFunc(cron,slf.OnCron) - return nil + //crontab模式定时触发 + //NewCronExpr的参数分别代表:Seconds Minutes Hours DayOfMonth Month DayOfWeek + //以下为每换分钟时触发 + cron,_:=timer.NewCronExpr("0 * * * * *") + slf.CronFunc(cron,slf.OnCron) + return nil } func (slf *TestService2) OnCron(cron *timer.Cron){ - fmt.Printf(":A minute passed!\n") + fmt.Printf(":A minute passed!\n") } ``` @@ -503,11 +502,11 @@ func (slf *TestService2) OnCron(cron *timer.Cron){ ``` func (slf *TestService1) OnInit() error { - fmt.Printf("TestService1 OnInit.\n") + fmt.Printf("TestService1 OnInit.\n") - //打开多线程处理模式,10个协程并发处理 - slf.SetGoRoutineNum(10) - return nil + //打开多线程处理模式,10个协程并发处理 + slf.SetGoRoutineNum(10) + return nil } ``` @@ -519,33 +518,33 @@ func (slf *TestService1) OnInit() error { ``` func (slf *TestService1) OnInit() error { - fmt.Printf("TestService1 OnInit.\n") - //打开性能分析工具 - slf.OpenProfiler() - //监控超过1秒的慢处理 - slf.GetProfiler().SetOverTime(time.Second*1) - //监控超过10秒的超慢处理,您可以用它来定位是否存在死循环 - //比如以下设置10秒,我的应用中是不会发生超过10秒的一次函数调用 - //所以设置为10秒。 - slf.GetProfiler().SetMaxOverTime(time.Second*10) + fmt.Printf("TestService1 OnInit.\n") + //打开性能分析工具 + slf.OpenProfiler() + //监控超过1秒的慢处理 + slf.GetProfiler().SetOverTime(time.Second*1) + //监控超过10秒的超慢处理,您可以用它来定位是否存在死循环 + //比如以下设置10秒,我的应用中是不会发生超过10秒的一次函数调用 + //所以设置为10秒。 + slf.GetProfiler().SetMaxOverTime(time.Second*10) - slf.AfterFunc(time.Second*2,slf.Loop) - //打开多线程处理模式,10个协程并发处理 - //slf.SetGoRoutineNum(10) - return nil + slf.AfterFunc(time.Second*2,slf.Loop) + //打开多线程处理模式,10个协程并发处理 + //slf.SetGoRoutineNum(10) + return nil } func (slf *TestService1) Loop(){ - for { - time.Sleep(time.Second*1) - } + for { + time.Sleep(time.Second*1) + } } func main(){ - //打开性能分析报告功能,并设置10秒汇报一次 - node.OpenProfilerReport(time.Second*10) - node.Start() + //打开性能分析报告功能,并设置10秒汇报一次 + node.OpenProfilerReport(time.Second*10) + node.Start() } ``` @@ -565,9 +564,9 @@ too slow process:Timer_orginserver/simple_service.(*TestService1).Loop-fm is tak ``` func (ts *TestService) OnInit() error{ - ts.RegRpcListener(ts) + ts.RegRpcListener(ts) - return nil + return nil } func (ts *TestService) OnNodeConnected(nodeId int){ @@ -589,60 +588,60 @@ Module创建与销毁: package simple_module import ( - "fmt" - "github.com/duanhf2012/origin/v2/node" - "github.com/duanhf2012/origin/v2/service" + "fmt" + "github.com/duanhf2012/origin/v2/node" + "github.com/duanhf2012/origin/v2/service" ) func init(){ - node.Setup(&TestService3{}) + node.Setup(&TestService3{}) } type TestService3 struct { - service.Service + service.Service } type Module1 struct { - service.Module + service.Module } type Module2 struct { - service.Module + service.Module } func (slf *Module1) OnInit()error{ - fmt.Printf("Module1 OnInit.\n") - return nil + fmt.Printf("Module1 OnInit.\n") + return nil } func (slf *Module1) OnRelease(){ - fmt.Printf("Module1 Release.\n") + fmt.Printf("Module1 Release.\n") } func (slf *Module2) OnInit()error{ - fmt.Printf("Module2 OnInit.\n") - return nil + fmt.Printf("Module2 OnInit.\n") + return nil } func (slf *Module2) OnRelease(){ - fmt.Printf("Module2 Release.\n") + fmt.Printf("Module2 Release.\n") } func (slf *TestService3) OnInit() error { - //新建两个Module对象 - module1 := &Module1{} - module2 := &Module2{} - //将module1添加到服务中 - module1Id,_ := slf.AddModule(module1) - //在module1中添加module2模块 - module1.AddModule(module2) - fmt.Printf("module1 id is %d, module2 id is %d",module1Id,module2.GetModuleId()) + //新建两个Module对象 + module1 := &Module1{} + module2 := &Module2{} + //将module1添加到服务中 + module1Id,_ := slf.AddModule(module1) + //在module1中添加module2模块 + module1.AddModule(module2) + fmt.Printf("module1 id is %d, module2 id is %d",module1Id,module2.GetModuleId()) - //释放模块module1 - slf.ReleaseModule(module1Id) - fmt.Printf("xxxxxxxxxxx") - return nil + //释放模块module1 + slf.ReleaseModule(module1Id) + fmt.Printf("xxxxxxxxxxx") + return nil } ``` @@ -670,39 +669,39 @@ Module1 Release. package simple_event import ( - "github.com/duanhf2012/origin/v2/event" - "github.com/duanhf2012/origin/v2/node" - "github.com/duanhf2012/origin/v2/service" - "time" + "github.com/duanhf2012/origin/v2/event" + "github.com/duanhf2012/origin/v2/node" + "github.com/duanhf2012/origin/v2/service" + "time" ) const ( - //自定义事件类型,必需从event.Sys_Event_User_Define开始 - //event.Sys_Event_User_Define以内给系统预留 - EVENT1 event.EventType =event.Sys_Event_User_Define+1 + //自定义事件类型,必需从event.Sys_Event_User_Define开始 + //event.Sys_Event_User_Define以内给系统预留 + EVENT1 event.EventType =event.Sys_Event_User_Define+1 ) func init(){ - node.Setup(&TestService4{}) + node.Setup(&TestService4{}) } type TestService4 struct { - service.Service + service.Service } func (slf *TestService4) OnInit() error { - //10秒后触发广播事件 - slf.AfterFunc(time.Second*10,slf.TriggerEvent) - return nil + //10秒后触发广播事件 + slf.AfterFunc(time.Second*10,slf.TriggerEvent) + return nil } func (slf *TestService4) TriggerEvent(){ - //广播事件,传入event.Event对象,类型为EVENT1,Data可以自定义任何数据 - //这样,所有监听者都可以收到该事件 - slf.GetEventHandler().NotifyEvent(&event.Event{ - Type: EVENT1, - Data: "event data.", - }) + //广播事件,传入event.Event对象,类型为EVENT1,Data可以自定义任何数据 + //这样,所有监听者都可以收到该事件 + slf.GetEventHandler().NotifyEvent(&event.Event{ + Type: EVENT1, + Data: "event data.", + }) } @@ -714,53 +713,53 @@ func (slf *TestService4) TriggerEvent(){ package simple_event import ( - "fmt" - "github.com/duanhf2012/origin/v2/event" - "github.com/duanhf2012/origin/v2/node" - "github.com/duanhf2012/origin/v2/service" + "fmt" + "github.com/duanhf2012/origin/v2/event" + "github.com/duanhf2012/origin/v2/node" + "github.com/duanhf2012/origin/v2/service" ) func init(){ - node.Setup(&TestService5{}) + node.Setup(&TestService5{}) } type TestService5 struct { - service.Service + service.Service } type TestModule struct { - service.Module + service.Module } func (slf *TestModule) OnInit() error{ - //在当前node中查找TestService4 - pService := node.GetService("TestService4") + //在当前node中查找TestService4 + pService := node.GetService("TestService4") - //在TestModule中,往TestService4中注册EVENT1类型事件监听 - pService.(*TestService4).GetEventProcessor().RegEventReciverFunc(EVENT1,slf.GetEventHandler(),slf.OnModuleEvent) - return nil + //在TestModule中,往TestService4中注册EVENT1类型事件监听 + pService.(*TestService4).GetEventProcessor().RegEventReciverFunc(EVENT1,slf.GetEventHandler(),slf.OnModuleEvent) + return nil } func (slf *TestModule) OnModuleEvent(ev event.IEvent){ - event := ev.(*event.Event) - fmt.Printf("OnModuleEvent type :%d data:%+v\n",event.GetEventType(),event.Data) + event := ev.(*event.Event) + fmt.Printf("OnModuleEvent type :%d data:%+v\n",event.GetEventType(),event.Data) } //服务初始化函数,在安装服务时,服务将自动调用OnInit函数 func (slf *TestService5) OnInit() error { - //通过服务名获取服务对象 - pService := node.GetService("TestService4") + //通过服务名获取服务对象 + pService := node.GetService("TestService4") - ////在TestModule中,往TestService4中注册EVENT1类型事件监听 - pService.(*TestService4).GetEventProcessor().RegEventReciverFunc(EVENT1,slf.GetEventHandler(),slf.OnServiceEvent) - slf.AddModule(&TestModule{}) - return nil + ////在TestModule中,往TestService4中注册EVENT1类型事件监听 + pService.(*TestService4).GetEventProcessor().RegEventReciverFunc(EVENT1,slf.GetEventHandler(),slf.OnServiceEvent) + slf.AddModule(&TestModule{}) + return nil } func (slf *TestService5) OnServiceEvent(ev event.IEvent){ - event := ev.(*event.Event) - fmt.Printf("OnServiceEvent type :%d data:%+v\n",event.Type,event.Data) + event := ev.(*event.Event) + fmt.Printf("OnServiceEvent type :%d data:%+v\n",event.Type,event.Data) } @@ -786,31 +785,31 @@ simple_rpc/TestService6.go文件如下: package simple_rpc import ( - "github.com/duanhf2012/origin/v2/node" - "github.com/duanhf2012/origin/v2/service" + "github.com/duanhf2012/origin/v2/node" + "github.com/duanhf2012/origin/v2/service" ) func init(){ - node.Setup(&TestService6{}) + node.Setup(&TestService6{}) } type TestService6 struct { - service.Service + service.Service } func (slf *TestService6) OnInit() error { - return nil + return nil } type InputData struct { - A int - B int + A int + B int } // 注意RPC函数名的格式必需为RPC_FunctionName或者是RPCFunctionName,如下的RPC_Sum也可以写成RPCSum func (slf *TestService6) RPC_Sum(input *InputData,output *int) error{ - *output = input.A+input.B - return nil + *output = input.A+input.B + return nil } ``` @@ -821,96 +820,96 @@ simple_rpc/TestService7.go文件如下: package simple_rpc import ( - "fmt" - "github.com/duanhf2012/origin/v2/node" - "github.com/duanhf2012/origin/v2/service" - "time" + "fmt" + "github.com/duanhf2012/origin/v2/node" + "github.com/duanhf2012/origin/v2/service" + "time" ) func init(){ - node.Setup(&TestService7{}) + node.Setup(&TestService7{}) } type TestService7 struct { - service.Service + service.Service } func (slf *TestService7) OnInit() error { - slf.AfterFunc(time.Second*2,slf.CallTest) - slf.AfterFunc(time.Second*2,slf.AsyncCallTest) - slf.AfterFunc(time.Second*2,slf.GoTest) - return nil + slf.AfterFunc(time.Second*2,slf.CallTest) + slf.AfterFunc(time.Second*2,slf.AsyncCallTest) + slf.AfterFunc(time.Second*2,slf.GoTest) + return nil } func (slf *TestService7) CallTest(){ - var input InputData - input.A = 300 - input.B = 600 - var output int + var input InputData + input.A = 300 + input.B = 600 + var output int - //同步调用其他服务的rpc,input为传入的rpc,output为输出参数 - err := slf.Call("TestService6.RPC_Sum",&input,&output) - if err != nil { - fmt.Printf("Call error :%+v\n",err) - }else{ - fmt.Printf("Call output %d\n",output) - } - - - //自定义超时,默认rpc超时时间为15s,以下设置1秒钟超过 - err = slf.CallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, &output) - if err != nil { - fmt.Printf("Call error :%+v\n", err) - } else { - fmt.Printf("Call output %d\n", output) - } + //同步调用其他服务的rpc,input为传入的rpc,output为输出参数 + err := slf.Call("TestService6.RPC_Sum",&input,&output) + if err != nil { + fmt.Printf("Call error :%+v\n",err) + }else{ + fmt.Printf("Call output %d\n",output) + } + + + //自定义超时,默认rpc超时时间为15s,以下设置1秒钟超过 + err = slf.CallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, &output) + if err != nil { + fmt.Printf("Call error :%+v\n", err) + } else { + fmt.Printf("Call output %d\n", output) + } } func (slf *TestService7) AsyncCallTest(){ - var input InputData - input.A = 300 - input.B = 600 - /*slf.AsyncCallNode(1,"TestService6.RPC_Sum",&input,func(output *int,err error){ - })*/ - //异步调用,在数据返回时,会回调传入函数 - //注意函数的第一个参数一定是RPC_Sum函数的第二个参数,err error为RPC_Sum返回值 - err := slf.AsyncCall("TestService6.RPC_Sum", &input, func(output *int, err error) { - if err != nil { - fmt.Printf("AsyncCall error :%+v\n", err) - } else { - fmt.Printf("AsyncCall output %d\n", *output) - } - }) - fmt.Println(err) + var input InputData + input.A = 300 + input.B = 600 + /*slf.AsyncCallNode(1,"TestService6.RPC_Sum",&input,func(output *int,err error){ + })*/ + //异步调用,在数据返回时,会回调传入函数 + //注意函数的第一个参数一定是RPC_Sum函数的第二个参数,err error为RPC_Sum返回值 + err := slf.AsyncCall("TestService6.RPC_Sum", &input, func(output *int, err error) { + if err != nil { + fmt.Printf("AsyncCall error :%+v\n", err) + } else { + fmt.Printf("AsyncCall output %d\n", *output) + } + }) + fmt.Println(err) - //自定义超时,返回一个cancel函数,可以在业务需要时取消rpc调用 - rpcCancel, err := slf.AsyncCallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, func(output *int, err error) { - //如果下面注释的rpcCancel()函数被调用,这里可能将不再返回 - if err != nil { - fmt.Printf("AsyncCall error :%+v\n", err) - } else { - fmt.Printf("AsyncCall output %d\n", *output) - } - }) - //rpcCancel() - fmt.Println(err, rpcCancel) - + //自定义超时,返回一个cancel函数,可以在业务需要时取消rpc调用 + rpcCancel, err := slf.AsyncCallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, func(output *int, err error) { + //如果下面注释的rpcCancel()函数被调用,这里可能将不再返回 + if err != nil { + fmt.Printf("AsyncCall error :%+v\n", err) + } else { + fmt.Printf("AsyncCall output %d\n", *output) + } + }) + //rpcCancel() + fmt.Println(err, rpcCancel) + } func (slf *TestService7) GoTest(){ - var input InputData - input.A = 300 - input.B = 600 + var input InputData + input.A = 300 + input.B = 600 - //在某些应用场景下不需要数据返回可以使用Go,它是不阻塞的,只需要填入输入参数 - err := slf.Go("TestService6.RPC_Sum",&input) - if err != nil { - fmt.Printf("Go error :%+v\n",err) - } + //在某些应用场景下不需要数据返回可以使用Go,它是不阻塞的,只需要填入输入参数 + err := slf.Go("TestService6.RPC_Sum",&input) + if err != nil { + fmt.Printf("Go error :%+v\n",err) + } - //以下是广播方式,如果在同一个子网中有多个同名的服务名,CastGo将会广播给所有的node - //slf.CastGo("TestService6.RPC_Sum",&input) + //以下是广播方式,如果在同一个子网中有多个同名的服务名,CastGo将会广播给所有的node + //slf.CastGo("TestService6.RPC_Sum",&input) } ``` @@ -922,65 +921,65 @@ func (slf *TestService7) GoTest(){ --------------- 在开发中经常会有将某些任务放到其他协程中并发执行,执行完成后,将服务的工作线程去回调。使用方式很简单,先打开该功能如下代码: ``` - //以下通过cpu数量来定开启协程并发数量,建议:(1)cpu密集型计算使用1.0 (2)i/o密集型使用2.0或者更高 - slf.OpenConcurrentByNumCPU(1.0) - - //以下通过函数打开并发协程数,以下协程数最小5,最大10,任务管道的cap数量1000000 - //origin会根据任务的数量在最小与最大协程数间动态伸缩 - //slf.OpenConcurrent(5, 10, 1000000) + //以下通过cpu数量来定开启协程并发数量,建议:(1)cpu密集型计算使用1.0 (2)i/o密集型使用2.0或者更高 + slf.OpenConcurrentByNumCPU(1.0) + + //以下通过函数打开并发协程数,以下协程数最小5,最大10,任务管道的cap数量1000000 + //origin会根据任务的数量在最小与最大协程数间动态伸缩 + //slf.OpenConcurrent(5, 10, 1000000) ``` 使用示例如下: ``` func (slf *TestService13) testAsyncDo() { - var context struct { - data int64 - } + var context struct { + data int64 + } - //1.示例普通使用 - //参数一的函数在其他协程池中执行完成,将执行完成事件放入服务工作协程, - //参数二的函数在服务协程中执行,是协程安全的。 - slf.AsyncDo(func() bool { - //该函数回调在协程池中执行 - context.data = 100 - return true - }, func(err error) { - //函数将在服务协程中执行 - fmt.Print(context.data) //显示100 - }) + //1.示例普通使用 + //参数一的函数在其他协程池中执行完成,将执行完成事件放入服务工作协程, + //参数二的函数在服务协程中执行,是协程安全的。 + slf.AsyncDo(func() bool { + //该函数回调在协程池中执行 + context.data = 100 + return true + }, func(err error) { + //函数将在服务协程中执行 + fmt.Print(context.data) //显示100 + }) - //2.示例按队列顺序 - //参数一传入队列Id,同一个队列Id将在协程池中被排队执行 - //以下进行两次调用,因为两次都传入参数queueId都为1,所以它们会都进入queueId为1的排队执行 - queueId := int64(1) - for i := 0; i < 2; i++ { - slf.AsyncDoByQueue(queueId, func() bool { - //该函数会被2次调用,但是会排队执行 - return true - }, func(err error) { - //函数将在服务协程中执行 - }) - } + //2.示例按队列顺序 + //参数一传入队列Id,同一个队列Id将在协程池中被排队执行 + //以下进行两次调用,因为两次都传入参数queueId都为1,所以它们会都进入queueId为1的排队执行 + queueId := int64(1) + for i := 0; i < 2; i++ { + slf.AsyncDoByQueue(queueId, func() bool { + //该函数会被2次调用,但是会排队执行 + return true + }, func(err error) { + //函数将在服务协程中执行 + }) + } - //3.函数参数可以某中一个为空 - //参数二函数将被延迟执行 - slf.AsyncDo(nil, func(err error) { - //将在下 - }) + //3.函数参数可以某中一个为空 + //参数二函数将被延迟执行 + slf.AsyncDo(nil, func(err error) { + //将在下 + }) - //参数一函数在协程池中执行,但没有在服务协程中回调 - slf.AsyncDo(func() bool { - return true - }, nil) + //参数一函数在协程池中执行,但没有在服务协程中回调 + slf.AsyncDo(func() bool { + return true + }, nil) - //4.函数返回值控制不进行回调 - slf.AsyncDo(func() bool { - //返回false时,参数二函数将不会被执行; 为true时,则会被执行 - return false - }, func(err error) { - //该函数将不会被执行 - }) + //4.函数返回值控制不进行回调 + slf.AsyncDo(func() bool { + //返回false时,参数二函数将不会被执行; 为true时,则会被执行 + return false + }, func(err error) { + //该函数将不会被执行 + }) } ``` @@ -992,21 +991,21 @@ origin引擎默认使用读取所有结点配置的进行确认结点有哪些Se ``` { - "NodeList": [{ - "NodeId": "nodeid_test", - "ListenAddr": "127.0.0.1:8801", - "MaxRpcParamLen": 409600, - "Private": false, - "remark": "//以_打头的,表示只在本机进程,不对整个子网开发", - "ServiceList": ["_TestService1", "TestService9", "TestService10"], - "DiscoveryService": [ - { + "NodeList": [{ + "NodeId": "nodeid_test", + "ListenAddr": "127.0.0.1:8801", + "MaxRpcParamLen": 409600, + "Private": false, + "remark": "//以_打头的,表示只在本机进程,不对整个子网开发", + "ServiceList": ["_TestService1", "TestService9", "TestService10"], + "DiscoveryService": [ + { "MasterNodeId": "nodeid_1", "NetworkName":"networkname1" "DiscoveryService": ["TestService8"] } - ] - }] + ] + }] } ``` DiscoveryService:在当前nodeid为nodeid_test的结点中,只发现 MasterNodeId为nodeid_1或NetworkName为networkname1网络中的TestService8服务。 @@ -1026,68 +1025,68 @@ simple_http/TestHttpService.go文件如下: package simple_http import ( - "fmt" - "github.com/duanhf2012/origin/v2/node" - "github.com/duanhf2012/origin/v2/service" - "github.com/duanhf2012/origin/v2/sysservice" - "net/http" + "fmt" + "github.com/duanhf2012/origin/v2/node" + "github.com/duanhf2012/origin/v2/service" + "github.com/duanhf2012/origin/v2/sysservice" + "net/http" ) func init(){ - node.Setup(&sysservice.HttpService{}) - node.Setup(&TestHttpService{}) + node.Setup(&sysservice.HttpService{}) + node.Setup(&TestHttpService{}) } //新建自定义服务TestService1 type TestHttpService struct { - service.Service + service.Service } func (slf *TestHttpService) OnInit() error { - //获取系统httpservice服务 - httpservice := node.GetService("HttpService").(*sysservice.HttpService) + //获取系统httpservice服务 + httpservice := node.GetService("HttpService").(*sysservice.HttpService) - //新建并设置路由对象 - httpRouter := sysservice.NewHttpHttpRouter() - httpservice.SetHttpRouter(httpRouter,slf.GetEventHandler()) + //新建并设置路由对象 + httpRouter := sysservice.NewHttpHttpRouter() + httpservice.SetHttpRouter(httpRouter,slf.GetEventHandler()) - //GET方法,请求url:http://127.0.0.1:9402/get/query?nickname=boyce - //并header中新增key为uid,value为1000的头,则用postman测试返回结果为: - //head uid:1000, nickname:boyce - httpRouter.GET("/get/query", slf.HttpGet) + //GET方法,请求url:http://127.0.0.1:9402/get/query?nickname=boyce + //并header中新增key为uid,value为1000的头,则用postman测试返回结果为: + //head uid:1000, nickname:boyce + httpRouter.GET("/get/query", slf.HttpGet) - //POST方法 请求url:http://127.0.0.1:9402/post/query - //返回结果为:{"msg":"hello world"} - httpRouter.POST("/post/query", slf.HttpPost) + //POST方法 请求url:http://127.0.0.1:9402/post/query + //返回结果为:{"msg":"hello world"} + httpRouter.POST("/post/query", slf.HttpPost) - //GET方式获取目录下的资源,http://127.0.0.1:port/img/head/a.jpg - httpRouter.SetServeFile(sysservice.METHOD_GET,"/img/head/","d:/img") + //GET方式获取目录下的资源,http://127.0.0.1:port/img/head/a.jpg + httpRouter.SetServeFile(sysservice.METHOD_GET,"/img/head/","d:/img") - //如果配置"ManualStart": true配置为true,则使用以下方法进行开启http监听 - //httpservice.StartListen() - return nil + //如果配置"ManualStart": true配置为true,则使用以下方法进行开启http监听 + //httpservice.StartListen() + return nil } func (slf *TestHttpService) HttpGet(session *sysservice.HttpSession){ - //从头中获取key为uid对应的值 - uid := session.GetHeader("uid") - //从url参数中获取key为nickname对应的值 - nickname,_ := session.Query("nickname") - //向body部分写入数据 - session.Write([]byte(fmt.Sprintf("head uid:%s, nickname:%s",uid,nickname))) - //写入http状态 - session.WriteStatusCode(http.StatusOK) - //完成返回 - session.Done() + //从头中获取key为uid对应的值 + uid := session.GetHeader("uid") + //从url参数中获取key为nickname对应的值 + nickname,_ := session.Query("nickname") + //向body部分写入数据 + session.Write([]byte(fmt.Sprintf("head uid:%s, nickname:%s",uid,nickname))) + //写入http状态 + session.WriteStatusCode(http.StatusOK) + //完成返回 + session.Done() } type HttpRespone struct { - Msg string `json:"msg"` + Msg string `json:"msg"` } func (slf *TestHttpService) HttpPost(session *sysservice.HttpSession){ - //也可以采用直接返回数据对象方式,如下: - session.WriteJsonDone(http.StatusOK,&HttpRespone{Msg: "hello world"}) + //也可以采用直接返回数据对象方式,如下: + session.WriteJsonDone(http.StatusOK,&HttpRespone{Msg: "hello world"}) } ``` @@ -1105,76 +1104,80 @@ simple_tcp/TestTcpService.go文件如下: package simple_tcp import ( - "fmt" - "github.com/duanhf2012/origin/v2/network/processor" - "github.com/duanhf2012/origin/v2/node" - "github.com/duanhf2012/origin/v2/service" - "github.com/duanhf2012/origin/v2/sysservice" - "github.com/golang/protobuf/proto" - "orginserver/simple_tcp/msgpb" + "fmt" + "github.com/duanhf2012/origin/v2/network/processor" + "github.com/duanhf2012/origin/v2/node" + "github.com/duanhf2012/origin/v2/service" + "github.com/duanhf2012/origin/v2/sysservice" + "github.com/golang/protobuf/proto" + "orginserver/simple_tcp/msgpb" ) func init(){ - node.Setup(&sysservice.TcpService{}) - node.Setup(&TestTcpService{}) + node.Setup(&sysservice.TcpService{}) + node.Setup(&TestTcpService{}) } //新建自定义服务TestService1 type TestTcpService struct { - service.Service - processor *processor.PBProcessor - tcpService *sysservice.TcpService + service.Service + processor *processor.PBProcessor + tcpService *sysservice.TcpService } func (slf *TestTcpService) OnInit() error { - //获取安装好了的TcpService对象 - slf.tcpService = node.GetService("TcpService").(*sysservice.TcpService) + //获取安装好了的TcpService对象 + slf.tcpService = node.GetService("TcpService").(*sysservice.TcpService) - //新建内置的protobuf处理器,您也可以自定义路由器,比如json,后续会补充 - slf.processor = processor.NewPBProcessor() + //新建内置的protobuf处理器,您也可以自定义路由器,比如json,后续会补充 + slf.processor = processor.NewPBProcessor() - //注册监听客户连接断开事件 - slf.processor.RegisterDisConnected(slf.OnDisconnected) - //注册监听客户连接事件 - slf.processor.RegisterConnected(slf.OnConnected) - //注册监听消息类型MsgType_MsgReq,并注册回调 - slf.processor.Register(uint16(msgpb.MsgType_MsgReq),&msgpb.Req{},slf.OnRequest) - //将protobuf消息处理器设置到TcpService服务中 - slf.tcpService.SetProcessor(slf.processor,slf.GetEventHandler()) + //注册监听客户连接断开事件 + slf.processor.RegisterDisConnected(slf.OnDisconnected) + //注册监听客户连接事件 + slf.processor.RegisterConnected(slf.OnConnected) + //注册监听消息类型MsgType_MsgReq,并注册回调 + slf.processor.Register(uint16(msgpb.MsgType_MsgReq),&msgpb.Req{},slf.OnRequest) + //将protobuf消息处理器设置到TcpService服务中 + slf.tcpService.SetProcessor(slf.processor,slf.GetEventHandler()) - return nil + return nil } func (slf *TestTcpService) OnConnected(clientid string){ - fmt.Printf("client id %s connected\n",clientid) + fmt.Printf("client id %s connected\n",clientid) } func (slf *TestTcpService) OnDisconnected(clientid string){ - fmt.Printf("client id %s disconnected\n",clientid) + fmt.Printf("client id %s disconnected\n",clientid) } func (slf *TestTcpService) OnRequest (clientid string,msg proto.Message){ - //解析客户端发过来的数据 - pReq := msg.(*msgpb.Req) - //发送数据给客户端 - err := slf.tcpService.SendMsg(clientid,&msgpb.Req{ - Msg: proto.String(pReq.GetMsg()), - }) - if err != nil { - fmt.Printf("send msg is fail %+v!",err) - } + //解析客户端发过来的数据 + pReq := msg.(*msgpb.Req) + //发送数据给客户端 + err := slf.tcpService.SendMsg(clientid,&msgpb.Req{ + Msg: proto.String(pReq.GetMsg()), + }) + if err != nil { + fmt.Printf("send msg is fail %+v!",err) + } } ``` 第十章:其他系统模块介绍 ------------------------ -* sysservice/wsservice.go:支持了WebSocket协议,使用方法与TcpService类似 -* sysmodule/DBModule.go:对mysql数据库操作 -* sysmodule/RedisModule.go:对Redis数据进行操作 -* sysmodule/HttpClientPoolModule.go:Http客户端请求封装 +* sysservice/wsservice/:支持了WebSocket协议,使用方法与TcpService类似 +* sysservice/messagequeueservice/:自定义的消息队列 +* sysservice/rankservice/:排行榜服务,采用跳表数据结构实现 +* sysmodule/mysqlmodule/:对mysql数据库操作 +* sysmodule/redismodule/:对Redis数据进行操作 +* sysmodule/httpclientmodule/:Http客户端请求封装 +* sysmodule/ginmodule/:对gin模块的封装,支持服务协程处理 +* sysmodule/kafkamodule/:对kafka的封装 * log/log.go:日志的封装,可以使用它构建对象记录业务文件日志 * util:在该目录下,有常用的uuid,hash,md5,协程封装等工具库 * https://github.com/duanhf2012/originservice: 其他扩展支持的服务可以在该工程上看到,目前支持firebase推送的封装。