新增私有服务功能(结点中服务配置加前缀_)

This commit is contained in:
boyce
2020-11-04 10:56:11 +08:00
parent c7d900ace4
commit 2fc1217b18
5 changed files with 63 additions and 32 deletions

View File

@@ -1,5 +1,6 @@
package cluster package cluster
import "strings"
type ConfigDiscovery struct { type ConfigDiscovery struct {
funDelService FunDelNode funDelService FunDelNode
@@ -7,6 +8,17 @@ type ConfigDiscovery struct {
localNodeId int localNodeId int
} }
func (discovery *ConfigDiscovery) privateService(nodeInfo *NodeInfo){
var serviceList []string
for _,s := range nodeInfo.ServiceList {
if strings.HasPrefix(s,"_") {
continue
}
serviceList = append(serviceList,s)
}
nodeInfo.ServiceList = serviceList
}
func (discovery *ConfigDiscovery) Init(localNodeId int) error{ func (discovery *ConfigDiscovery) Init(localNodeId int) error{
discovery.localNodeId = localNodeId discovery.localNodeId = localNodeId
@@ -20,6 +32,8 @@ func (discovery *ConfigDiscovery) Init(localNodeId int) error{
if nodeInfo.NodeId == localNodeId { if nodeInfo.NodeId == localNodeId {
continue continue
} }
//去除私有服务
discovery.privateService(&nodeInfo)
discovery.funSetService(&nodeInfo) discovery.funSetService(&nodeInfo)
} }

View File

@@ -146,6 +146,12 @@ func (cls *Cluster) parseLocalCfg(){
} }
} }
func (cls *Cluster) localPrivateService(localNodeInfo *NodeInfo){
for i:=0;i<len(localNodeInfo.ServiceList);i++{
localNodeInfo.ServiceList[i] = strings.TrimLeft(localNodeInfo.ServiceList[i],"_")
}
}
func (cls *Cluster) InitCfg(localNodeId int) error{ func (cls *Cluster) InitCfg(localNodeId int) error{
cls.localServiceCfg = map[string]interface{}{} cls.localServiceCfg = map[string]interface{}{}
cls.mapRpc = map[int] NodeRpcInfo{} cls.mapRpc = map[int] NodeRpcInfo{}
@@ -158,6 +164,7 @@ func (cls *Cluster) InitCfg(localNodeId int) error{
return err return err
} }
cls.localNodeInfo = nodeInfoList[0] cls.localNodeInfo = nodeInfoList[0]
cls.localPrivateService(&cls.localNodeInfo)
//读取本地服务配置 //读取本地服务配置
err = cls.readLocalService(localNodeId) err = cls.readLocalService(localNodeId)

View File

@@ -88,6 +88,9 @@ func (slf *RpcRequest) Clear() *RpcRequest{
slf.localParam = nil slf.localParam = nil
slf.requestHandle = nil slf.requestHandle = nil
slf.callback = nil slf.callback = nil
slf.bLocalRequest = false
slf.inputArgs = nil
slf.rpcProcessor = nil
return slf return slf
} }
@@ -101,6 +104,7 @@ func (call *Call) Clear() *Call{
call.ServiceMethod = "" call.ServiceMethod = ""
call.Reply = nil call.Reply = nil
call.Response = nil call.Response = nil
call.done = nil
call.Err = nil call.Err = nil
call.connId = 0 call.connId = 0
call.callback = nil call.callback = nil

View File

@@ -360,8 +360,8 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s
if pClient.bSelfNode == true { if pClient.bSelfNode == true {
pLocalRpcServer:= handler.funcRpcServer() pLocalRpcServer:= handler.funcRpcServer()
//判断是否是同一服务 //判断是否是同一服务
sMethod := strings.Split(serviceMethod,".") findIndex := strings.Index(serviceMethod,".")
if len(sMethod)!=2 { if findIndex==-1 {
sErr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod) sErr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod)
log.Error("%+v", sErr) log.Error("%+v", sErr)
if sErr != nil { if sErr != nil {
@@ -369,13 +369,14 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s
} }
continue continue
} }
//调用自己rpcHandler处理器 serviceName := serviceMethod[:findIndex]
if sMethod[0] == handler.rpcHandler.GetName() { //自己服务调用 method := serviceMethod[findIndex+1:]
// if serviceName == handler.rpcHandler.GetName() { //自己服务调用
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) //调用自己rpcHandler处理器
return pLocalRpcServer.myselfRpcHandlerGo(serviceName,method,args,nil)
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,sMethod[0],sMethod[1],args,nil,nil) pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,serviceName,method,args,nil,nil)
if pCall.Err!=nil { if pCall.Err!=nil {
err = pCall.Err err = pCall.Err
} }
@@ -412,19 +413,20 @@ func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interfac
if pClient.bSelfNode == true { if pClient.bSelfNode == true {
pLocalRpcServer:= handler.funcRpcServer() pLocalRpcServer:= handler.funcRpcServer()
//判断是否是同一服务 //判断是否是同一服务
sMethod := strings.Split(serviceMethod,".") findIndex := strings.Index(serviceMethod,".")
if len(sMethod)!=2 { if findIndex==-1 {
err := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod) err := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod)
log.Error("%+v",err) log.Error("%+v",err)
return err return err
} }
//调用自己rpcHandler处理器 serviceName := serviceMethod[:findIndex]
if sMethod[0] == handler.rpcHandler.GetName() { //自己服务调用 method := serviceMethod[findIndex+1:]
// if serviceName == handler.rpcHandler.GetName() { //自己服务调用
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,reply) //调用自己rpcHandler处理器
return pLocalRpcServer.myselfRpcHandlerGo(serviceName,method,args,reply)
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,sMethod[0],sMethod[1],args,reply,nil) pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,serviceName,method,args,reply,nil)
err = pCall.Done().Err err = pCall.Done().Err
pClient.RemovePending(pCall.Seq) pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall) ReleaseCall(pCall)
@@ -484,16 +486,18 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int
if pClient.bSelfNode == true { if pClient.bSelfNode == true {
pLocalRpcServer:= handler.funcRpcServer() pLocalRpcServer:= handler.funcRpcServer()
//判断是否是同一服务 //判断是否是同一服务
sMethod := strings.Split(serviceMethod,".") findIndex := strings.Index(serviceMethod,".")
if len(sMethod)!=2 { if findIndex==-1 {
err := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod) err := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod)
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
log.Error("%+v",err) log.Error("%+v",err)
return nil return nil
} }
serviceName := serviceMethod[:findIndex]
method := serviceMethod[findIndex+1:]
//调用自己rpcHandler处理器 //调用自己rpcHandler处理器
if sMethod[0] == handler.rpcHandler.GetName() { //自己服务调用 if serviceName == handler.rpcHandler.GetName() { //自己服务调用
err := pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,reply) err := pLocalRpcServer.myselfRpcHandlerGo(serviceName,method,args,reply)
if err == nil { if err == nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply),NilError}) fVal.Call([]reflect.Value{reflect.ValueOf(reply),NilError})
}else{ }else{
@@ -503,13 +507,13 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
if callback!=nil { if callback!=nil {
err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient, handler,false,sMethod[0],sMethod[1],args,reply,fVal) err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient, handler,false,serviceName,method,args,reply,fVal)
if err != nil { if err != nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
} }
return nil return nil
} }
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,sMethod[0],sMethod[1],args,reply,nil) pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,serviceName,method,args,reply,nil)
err = pCall.Done().Err err = pCall.Done().Err
pClient.RemovePending(pCall.Seq) pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall) ReleaseCall(pCall)
@@ -582,8 +586,8 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
if pClient.bSelfNode == true { if pClient.bSelfNode == true {
pLocalRpcServer:= handler.funcRpcServer() pLocalRpcServer:= handler.funcRpcServer()
//判断是否是同一服务 //判断是否是同一服务
sMethod := strings.Split(serviceMethod,".") findIndex := strings.Index(serviceMethod,".")
if len(sMethod)!=2 { if findIndex==-1 {
serr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod) serr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod)
log.Error("%+v",serr) log.Error("%+v",serr)
if serr!= nil { if serr!= nil {
@@ -591,15 +595,17 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
} }
continue continue
} }
serviceName := serviceMethod[:findIndex]
method := serviceMethod[findIndex+1:]
//调用自己rpcHandler处理器 //调用自己rpcHandler处理器
if sMethod[0] == handler.rpcHandler.GetName() { //自己服务调用 if serviceName == handler.rpcHandler.GetName() { //自己服务调用
err:= pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) err:= pLocalRpcServer.myselfRpcHandlerGo(serviceName,method,args,nil)
args.DoGc() args.DoGc()
return err return err
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,sMethod[0],sMethod[1],nil,nil,args) pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,serviceName,method,nil,nil,args)
if pCall.Err!=nil { if pCall.Err!=nil {
err = pCall.Err err = pCall.Err
} }

View File

@@ -183,8 +183,8 @@ func (r *Router) GetRouterId(clientId uint64,serviceName *string) int {
return routerId return routerId
} }
func (r *Router) SetRouterId(clientId uint64,serviceName *string,routerId int){ func (r *Router) SetRouterId(clientId uint64,serviceName string,routerId int){
r.mapClientRouterCache[clientId][*serviceName] = routerId r.mapClientRouterCache[clientId][serviceName] = routerId
} }
type RawInputArgs struct { type RawInputArgs struct {
@@ -212,21 +212,21 @@ func (args RawInputArgs) DoGc() {
network.ReleaseByteSlice(args.rawData) network.ReleaseByteSlice(args.rawData)
} }
func (r *Router) RouterMessage(clientId uint64,msgType uint16,msg []byte) { func (r *Router) RouterMessage(cliId uint64,msgType uint16,msg []byte) {
routerInfo:= r.GetMsgRouterService(msgType) routerInfo:= r.GetMsgRouterService(msgType)
if routerInfo==nil { if routerInfo==nil {
log.Error("The message type is %d with no configured route!",msgType) log.Error("The message type is %d with no configured route!",msgType)
return return
} }
routerId := r.GetRouterId(clientId,&routerInfo.ServiceName) routerId := r.GetRouterId(cliId,&routerInfo.ServiceName)
if routerId ==0 { if routerId ==0 {
routerId = r.loadBalance.SelectNode(routerInfo.ServiceName) routerId = r.loadBalance.SelectNode(routerInfo.ServiceName)
r.SetRouterId(clientId,&routerInfo.ServiceName,routerId) r.SetRouterId(cliId,routerInfo.ServiceName,routerId)
} }
if routerId>0 { if routerId>0 {
r.rpcHandler.RawGoNode(rpc.RpcProcessorPb,routerId,routerInfo.Rpc,RawInputArgs{rawData: msg,clientId: clientId}) r.rpcHandler.RawGoNode(rpc.RpcProcessorPb,routerId,routerInfo.Rpc,RawInputArgs{rawData: msg,clientId: cliId})
} }
} }
@@ -243,7 +243,7 @@ func (r *Router) RouterEvent(clientId uint64,eventType string) bool{
routerId := r.GetRouterId(clientId,&routerInfo.ServiceName) routerId := r.GetRouterId(clientId,&routerInfo.ServiceName)
if routerId ==0 { if routerId ==0 {
routerId = r.loadBalance.SelectNode(routerInfo.ServiceName) routerId = r.loadBalance.SelectNode(routerInfo.ServiceName)
r.SetRouterId(clientId,&routerInfo.ServiceName,routerId) r.SetRouterId(clientId,routerInfo.ServiceName,routerId)
} }
if routerId>0 { if routerId>0 {