修改子网集群示例

This commit is contained in:
boyce
2019-09-04 15:36:00 +08:00
parent 8d20108a82
commit 527286f8ee
9 changed files with 254 additions and 165 deletions

34
Test/SubNet1_Service.go Normal file
View File

@@ -0,0 +1,34 @@
package main
import (
"fmt"
"github.com/duanhf2012/origin/cluster"
"github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/service"
)
type SubNet1_Service struct {
service.BaseService
}
func init() {
originnode.InitService(&SubNet1_Service{})
}
//OnInit ...
func (ws *SubNet1_Service) OnInit() error {
return nil
}
//OnRun ...
func (ws *SubNet1_Service) OnRun() bool {
var in InputData
var ret int
in.A1 = 10
in.A2 = 20
err := cluster.Call("SubNet2_Service1.RPC_Multi", &in, &ret)
fmt.Printf("%+v", err)
return false
}

40
Test/SubNet1_Service1.go Normal file
View File

@@ -0,0 +1,40 @@
package main
import (
"github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/service"
)
type InputData struct {
A1 int
A2 int
}
type SubNet1_Service1 struct {
service.BaseService
}
func init() {
originnode.InitService(&SubNet1_Service1{})
}
//OnInit ...
func (ws *SubNet1_Service1) OnInit() error {
return nil
}
//OnRun ...
func (ws *SubNet1_Service1) OnRun() bool {
return false
}
//服务要对外的接口规划如下:
//RPC_MethodName(arg *DataType1, ret *DataType2) error
//如果不符合规范,在加载服务时,该函数将不会被映射,其他服务将不允能调用。
func (slf *SubNet1_Service1) RPC_Add(arg *InputData, ret *int) error {
*ret = arg.A1 + arg.A2
return nil
}

31
Test/SubNet1_Service2.go Normal file
View File

@@ -0,0 +1,31 @@
package main
import (
"github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/service"
)
type SubNet1_Service2 struct {
service.BaseService
}
func init() {
originnode.InitService(&SubNet1_Service2{})
}
//OnInit ...
func (ws *SubNet1_Service2) OnInit() error {
return nil
}
//OnRun ...
func (ws *SubNet1_Service2) OnRun() bool {
return false
}
func (slf *SubNet1_Service2) RPC_Sub(arg *InputData, ret *int) error {
*ret = arg.A1 - arg.A2
return nil
}

31
Test/SubNet2_Service1.go Normal file
View File

@@ -0,0 +1,31 @@
package main
import (
"github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/service"
)
type SubNet2_Service1 struct {
service.BaseService
}
func init() {
originnode.InitService(&SubNet2_Service1{})
}
//OnInit ...
func (ws *SubNet2_Service1) OnInit() error {
return nil
}
//OnRun ...
func (ws *SubNet2_Service1) OnRun() bool {
return false
}
func (slf *SubNet2_Service1) RPC_Multi(arg *InputData, ret *int) error {
*ret = arg.A1 * arg.A2
return nil
}

31
Test/SubNet2_Service2.go Normal file
View File

@@ -0,0 +1,31 @@
package main
import (
"github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/service"
)
type SubNet2_Service2 struct {
service.BaseService
}
func init() {
originnode.InitService(&SubNet2_Service2{})
}
//OnInit ...
func (ws *SubNet2_Service2) OnInit() error {
return nil
}
//OnRun ...
func (ws *SubNet2_Service2) OnRun() bool {
return false
}
func (slf *SubNet2_Service2) RPC_Div(arg *InputData, ret *int) error {
*ret = arg.A1 / arg.A2
return nil
}

View File

@@ -1,20 +1,65 @@
{ {
"PublicServiceList":["logiclog"], "SubNet": [{
"Remark": "Manual,Full,Auto",
"//":" Auto or Manual", "SubNetMode": "Full",
"ClusterPattern":"Auto", "SubNetName": "SubNet1",
"MasterNodeId":1, "PublicServiceList": ["logiclog"],
"NodeList":[ "NodeList": [{
{ "NodeID": 1,
"NodeID":1, "NodeName": "N_Node1",
"NodeName":"N_Node1", "ServiceList": [
"ServerAddr":"127.0.0.1:10000", "HttpServerService",
"ServiceList":["CTestService1","CTestService2","HttpServerService","WSServerService","CWebSockService"], "SubNet1_Service",
"ClusterNode":["N_Node2"] "SubNet1_Service1"
} ],
] "ClusterNode":["SubNet2.N_Node1"]
},
{
"NodeID": 2,
"NodeName": "N_Node2",
"ServiceList": [
"SubNet1_Service2"
],
"ClusterNode":[]
}
]
},
{
"Remark": "Manual,Full,Auto",
"SubNetMode": "Full",
"SubNetName": "SubNet2",
"PublicServiceList": ["logiclog"],
"NodeList": [{
"NodeID": 30,
"NodeName": "N_Node1",
"ServiceList": [
"SubNet2_Service1"
],
"ClusterNode":["URQ.N_Node1"]
},
{
"NodeID": 40,
"NodeName": "N_Node4",
"ServiceList": [
"SubNet2_Service2"
],
"ClusterNode":[]
}
]
}
]
} }

View File

@@ -1,112 +1,25 @@
package main package main
import ( import (
"fmt"
"time"
"github.com/duanhf2012/origin/cluster" "github.com/duanhf2012/origin/cluster"
"github.com/duanhf2012/origin/sysservice"
"github.com/duanhf2012/origin/originnode" "github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/sysservice"
) )
type CTestService1 struct {
service.BaseService
}
func (slf *CTestService1) OnInit() error {
fmt.Println("CTestService1.OnInit")
return nil
}
//RPC回调函数
type InputData struct {
A1 int
A2 int
}
func (slf *CTestService1) OnRun() bool {
fmt.Println("CTestService1.OnRun")
var ret int
input := InputData{100, 11}
err := cluster.Call("CTestService2.RPC_Add", &input, &ret)
fmt.Print(err, "\n", ret, "\n")
return false
}
func (slf *CTestService1) OnEndRun() {
fmt.Println("CTestService1.OnEndRun")
}
//所有的服务如果想对外提供HTTP服务接口规范必需为以下格式
//HTTP_MethodName(request *sysservice.HttpRequest, resp *sysservice.HttpRespone) error
//访问方式http://127.0.0.1:9120/ServiceName/MethodName
//则以下接口访问方式https://proxy.atbc.com:9120/CTestService1/GetInfo
func (slf *CTestService1) HTTP_GetInfo(request *sysservice.HttpRequest, resp *sysservice.HttpRespone) error {
strRet := "{a:\"hello,world!\"}"
resp.Respone = []byte(strRet)
return nil
}
type CTestService2 struct {
service.BaseService
}
func (slf *CTestService2) OnInit() error {
fmt.Println("CTestService2.OnInit")
return nil
}
func (slf *CTestService2) OnRun() bool {
fmt.Println("CTestService2.OnRun")
time.Sleep(time.Second * 5)
return true
}
func (slf *CTestService2) OnEndRun() {
fmt.Println("CTestService2.OnEndRun")
}
//服务要对外的接口规划如下:
//RPC_MethodName(arg *DataType1, ret *DataType2) error
//如果不符合规范,在加载服务时,该函数将不会被映射,其他服务将不允能调用。
func (slf *CTestService2) RPC_Add(arg *InputData, ret *int) error {
*ret = arg.A1 + arg.A2
return nil
}
func main() { func main() {
node := originnode.NewOriginNode() node := originnode.NewOriginNode()
if node == nil { if node == nil {
return return
} }
//1.新增http服务 nodeCfg, _ := cluster.ReadNodeConfig("./config/nodeconfig.json", cluster.GetNodeId())
//该服务比较特殊安装加载完成后会将所有的service符合规范的HTTP_接口映射 httpserver := sysservice.NewHttpServerService(nodeCfg.HttpPort) // http服务
//将能被外部调用 for _, ca := range nodeCfg.CAFile {
httpserver := sysservice.NewHttpServerService(9120) httpserver.SetHttps(ca.CertFile, ca.KeyFile)
}
httpserver.SetPrintRequestTime(true)
//2.新建websocket服务 node.SetupService(httpserver)
//新建websocket消息回调服务网络i/o发生事件时回调OnConnected,OnDisconnect,OnRecvMsg接口。
wss := NewWebSockService()
//新建websocket监听服务
pWS := sysservice.NewWSServerService(9121)
//设置以下证书文件支持https
//pWS.SetWSS("/root/1884337_proxy.atbc.com.pem", "/root/1884337_proxy.atbc.com.key")
//将回调服务安装到websocket监听服务中
pWS.SetupReciver("/wss", wss, false)
//设置以下证书支持wss
//httpserver.SetHttps("/root/1884337_proxy.atbc.com.pem", "/root/1884337_proxy.atbc.com.key")
node.SetupService(&CTestService1{}, &CTestService2{}, httpserver, wss, pWS)
node.Init() node.Init()
node.Start() node.Start()
} }

View File

@@ -1,47 +0,0 @@
package main
import (
"fmt"
"github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/service"
"github.com/gorilla/websocket"
)
//CRHDataService ...
type CWebSockService struct {
service.BaseService
network.BaseMessageReciver
//websockServer network.IWebsocketServer
}
//NewCRHService ...
func NewWebSockService() *CWebSockService {
wss := new(CWebSockService)
return wss
}
//OnInit ...
func (ws *CWebSockService) OnInit() error {
return nil
}
//OnRun ...
func (ws *CWebSockService) OnRun() bool {
return false
}
func (ws *CWebSockService) OnConnected(clientid uint64) {
date := []byte("CWebSockService OnConnected!..")
ws.WsServer.SendMsg(clientid, websocket.TextMessage, date)
}
func (ws *CWebSockService) OnDisconnect(clientid uint64, err error) {
fmt.Print("CWebSockService OnDisconnect")
}
func (ws *CWebSockService) OnRecvMsg(clientid uint64, msgtype int, data []byte) {
date := []byte("OnRecvMsg!..CWebSockService")
ws.WsServer.SendMsg(clientid, websocket.TextMessage, date)
}

View File

@@ -52,6 +52,17 @@ func GenNodeName(subnetName string, nodename string) string {
return nodename return nodename
} }
func AddCluster(clusterNodeNameList *[]string, nodename string) bool {
for _, n := range *clusterNodeNameList {
if n == nodename {
return false
}
}
*clusterNodeNameList = append(*clusterNodeNameList, nodename)
return true
}
func ReadCfg(path string, nodeid int, mapNodeData map[int]NodeData) (*ClusterConfig, error) { func ReadCfg(path string, nodeid int, mapNodeData map[int]NodeData) (*ClusterConfig, error) {
clsCfg := &ClusterConfig{} clsCfg := &ClusterConfig{}
clsCfg.mapIdNode = map[int]CNode{} clsCfg.mapIdNode = map[int]CNode{}
@@ -72,7 +83,7 @@ func ReadCfg(path string, nodeid int, mapNodeData map[int]NodeData) (*ClusterCon
} }
//存储所有的nodeid对应cnode信息 //存储所有的nodeid对应cnode信息
var custerNodeNameList []string var clusterNodeNameList []string
for _, c := range clsCfg.SubNet { for _, c := range clsCfg.SubNet {
for _, v := range c.NodeList { for _, v := range c.NodeList {
mapservice := make(map[string]bool, 1) mapservice := make(map[string]bool, 1)
@@ -94,9 +105,9 @@ func ReadCfg(path string, nodeid int, mapNodeData map[int]NodeData) (*ClusterCon
} }
for _, nodename := range v.ClusterNode { for _, nodename := range v.ClusterNode {
custerNodeNameList = append(custerNodeNameList, GenNodeName(c.SubNetName, nodename)) AddCluster(&clusterNodeNameList, GenNodeName(c.SubNetName, nodename))
} }
custerNodeNameList = append(custerNodeNameList, GenNodeName(c.SubNetName, v.NodeName)) AddCluster(&clusterNodeNameList, GenNodeName(c.SubNetName, v.NodeName))
} }
} }
} }
@@ -110,13 +121,13 @@ func ReadCfg(path string, nodeid int, mapNodeData map[int]NodeData) (*ClusterCon
for _, subnet := range clsCfg.SubNet { for _, subnet := range clsCfg.SubNet {
if subnet.SubNetName == clsCfg.currentNode.SubNetName { if subnet.SubNetName == clsCfg.currentNode.SubNetName {
for _, nodes := range subnet.NodeList { for _, nodes := range subnet.NodeList {
custerNodeNameList = append(custerNodeNameList, subnet.SubNetName+"."+nodes.NodeName) AddCluster(&clusterNodeNameList, GenNodeName(subnet.SubNetName, nodes.NodeName))
} }
} }
} }
} }
for _, clusternodename := range custerNodeNameList { for _, clusternodename := range clusterNodeNameList {
for _, c := range clsCfg.SubNet { for _, c := range clsCfg.SubNet {
for _, nodecfg := range c.NodeList { for _, nodecfg := range c.NodeList {
if clusternodename != c.SubNetName+"."+nodecfg.NodeName { if clusternodename != c.SubNetName+"."+nodecfg.NodeName {
@@ -126,7 +137,7 @@ func ReadCfg(path string, nodeid int, mapNodeData map[int]NodeData) (*ClusterCon
if ok == false { if ok == false {
return nil, errors.New(fmt.Sprintf("Cannot find NodeId %d in cluster.json!", nodecfg.NodeID)) return nil, errors.New(fmt.Sprintf("Cannot find NodeId %d in cluster.json!", nodecfg.NodeID))
} }
clsCfg.mapClusterNodeService[clusternodename] = append(clsCfg.mapClusterNodeService[clusternodename], n) clsCfg.mapClusterNodeService[nodecfg.NodeName] = append(clsCfg.mapClusterNodeService[nodecfg.NodeName], n)
for _, sname := range nodecfg.ServiceList { for _, sname := range nodecfg.ServiceList {
clsCfg.mapClusterServiceNode[sname] = append(clsCfg.mapClusterServiceNode[sname], n) clsCfg.mapClusterServiceNode[sname] = append(clsCfg.mapClusterServiceNode[sname], n)
} }