mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-03 22:45:13 +08:00
@@ -2,7 +2,6 @@ package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/duanhf2012/origin/log"
|
||||
"github.com/duanhf2012/origin/rpc"
|
||||
"github.com/duanhf2012/origin/service"
|
||||
"strings"
|
||||
@@ -108,39 +107,24 @@ func (slf *Cluster) GetRpcClient(nodeid int) *rpc.Client {
|
||||
return c.client
|
||||
}
|
||||
|
||||
func GetRpcClient(nodeId int,serviceMethod string) ([]*rpc.Client,error) {
|
||||
var rpcClientList []*rpc.Client
|
||||
func GetRpcClient(nodeId int,serviceMethod string,clientList *[]*rpc.Client) error {
|
||||
if nodeId>0 {
|
||||
pClient := GetCluster().GetRpcClient(nodeId)
|
||||
if pClient==nil {
|
||||
return rpcClientList,fmt.Errorf("cannot find nodeid %d!",nodeId)
|
||||
return fmt.Errorf("cannot find nodeid %d!",nodeId)
|
||||
}
|
||||
rpcClientList = append(rpcClientList,pClient)
|
||||
return rpcClientList,nil
|
||||
*clientList = append(*clientList,pClient)
|
||||
return nil
|
||||
}
|
||||
|
||||
serviceAndMethod := strings.Split(serviceMethod,".")
|
||||
if len(serviceAndMethod)!=2 {
|
||||
return nil,fmt.Errorf("servicemethod param %s is error!",serviceMethod)
|
||||
return fmt.Errorf("servicemethod param %s is error!",serviceMethod)
|
||||
}
|
||||
|
||||
//1.找到对应的rpcnodeid
|
||||
|
||||
nodeidList := GetCluster().GetNodeIdByService(serviceAndMethod[0])
|
||||
if len(nodeidList) ==0 {
|
||||
return rpcClientList,fmt.Errorf("Cannot Find %s nodeid",serviceMethod)
|
||||
}
|
||||
|
||||
for _,nodeid:= range nodeidList {
|
||||
pClient := GetCluster().GetRpcClient(nodeid)
|
||||
if pClient==nil {
|
||||
log.Error("Cannot connect node id %d",nodeid)
|
||||
continue
|
||||
}
|
||||
rpcClientList = append(rpcClientList,pClient)
|
||||
}
|
||||
|
||||
return rpcClientList,nil
|
||||
GetCluster().GetNodeIdByService(serviceAndMethod[0],clientList)
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetRpcServer() *rpc.Server{
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/duanhf2012/origin/log"
|
||||
"github.com/duanhf2012/origin/rpc"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
)
|
||||
@@ -195,18 +196,23 @@ func (slf *Cluster) IsConfigService(servicename string) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
func (slf *Cluster) GetNodeIdByService(servicename string) []int{
|
||||
var nodelist []int
|
||||
|
||||
|
||||
func (slf *Cluster) GetNodeIdByService(servicename string,rpcClientList *[]*rpc.Client) {
|
||||
nodeInfoList,ok := slf.localSubNetMapService[servicename]
|
||||
if ok == true {
|
||||
for _,node := range nodeInfoList {
|
||||
nodelist = append(nodelist,node.NodeId)
|
||||
pClient := GetCluster().GetRpcClient(node.NodeId)
|
||||
if pClient==nil {
|
||||
log.Error("Cannot connect node id %d",node.NodeId)
|
||||
continue
|
||||
}
|
||||
*rpcClientList = append(*rpcClientList,pClient)
|
||||
}
|
||||
}
|
||||
|
||||
return nodelist
|
||||
}
|
||||
|
||||
|
||||
func (slf *Cluster) getServiceCfg(servicename string) interface{}{
|
||||
v,ok := slf.localServiceCfg[servicename]
|
||||
if ok == false {
|
||||
|
||||
@@ -105,7 +105,7 @@ func (slf *Client) generateSeq() uint64{
|
||||
return atomic.AddUint64(&slf.startSeq,1)
|
||||
}
|
||||
|
||||
func (slf *Client) AsycGo(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error {
|
||||
func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error {
|
||||
call := MakeCall()
|
||||
call.Reply = replyParam
|
||||
call.callback = &callback
|
||||
@@ -161,21 +161,27 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply
|
||||
request := &RpcRequest{}
|
||||
call.Arg = args
|
||||
call.Seq = slf.generateSeq()
|
||||
if noReply == false {
|
||||
slf.AddPending(call)
|
||||
}
|
||||
request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,noReply,InParam)
|
||||
bytes,err := processor.Marshal(request.RpcRequestData)
|
||||
processor.ReleaseRpcRequest(request.RpcRequestData)
|
||||
if err != nil {
|
||||
call.Err = err
|
||||
slf.RemovePending(call.Seq)
|
||||
return call
|
||||
}
|
||||
|
||||
if slf.conn == nil {
|
||||
call.Err = fmt.Errorf("call %s is fail,rpc client is disconnect.",serviceMethod)
|
||||
slf.RemovePending(call.Seq)
|
||||
return call
|
||||
}
|
||||
|
||||
err = slf.conn.WriteMsg(bytes)
|
||||
if err != nil {
|
||||
slf.RemovePending(call.Seq)
|
||||
call.Err = err
|
||||
}
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ func (slf *JsonProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){
|
||||
rpcJsonResponeDataPool.Put(rpcRequestData)
|
||||
}
|
||||
|
||||
func (slf *JsonRpcRequestData) IsReply() bool{
|
||||
func (slf *JsonRpcRequestData) IsNoReply() bool{
|
||||
return slf.NoReply
|
||||
}
|
||||
|
||||
|
||||
@@ -84,7 +84,7 @@ func (slf *MsgpProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){
|
||||
rpcResponeDataPool.Put(rpcRequestData)
|
||||
}
|
||||
|
||||
func (slf *MsgpRpcRequestData) IsReply() bool{
|
||||
func (slf *MsgpRpcRequestData) IsNoReply() bool{
|
||||
return slf.NoReply
|
||||
}
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ func (slf *PBProcessor) ReleaseRpcRespose(rpcRequestData IRpcRequestData){
|
||||
}
|
||||
|
||||
|
||||
func (slf *PBRpcRequestData) IsReply() bool{
|
||||
func (slf *PBRpcRequestData) IsNoReply() bool{
|
||||
return slf.GetNoReply()
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ type IRpcRequestData interface {
|
||||
GetSeq() uint64
|
||||
GetServiceMethod() string
|
||||
GetInParam() []byte
|
||||
IsReply() bool
|
||||
IsNoReply() bool
|
||||
}
|
||||
|
||||
type IRpcResponseData interface {
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
type FuncRpcClient func(nodeid int,serviceMethod string) ([]*Client,error)
|
||||
type FuncRpcClient func(nodeid int,serviceMethod string,client *[]*Client) error
|
||||
type FuncRpcServer func() (*Server)
|
||||
var NilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())
|
||||
|
||||
@@ -279,7 +279,8 @@ func (slf *RpcHandler) CallMethod(ServiceMethod string,param interface{},reply i
|
||||
}
|
||||
|
||||
func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args interface{}) error {
|
||||
pClientList,err := slf.funcRpcClient(nodeId,serviceMethod)
|
||||
var pClientList []*Client
|
||||
err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList)
|
||||
if err != nil {
|
||||
log.Error("Call serviceMethod is error:%+v!",err)
|
||||
return err
|
||||
@@ -319,7 +320,7 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int
|
||||
}
|
||||
|
||||
//跨node调用
|
||||
pCall := pClient.Go(false,serviceMethod,args,nil)
|
||||
pCall := pClient.Go(true,serviceMethod,args,nil)
|
||||
if pCall.Err!=nil {
|
||||
err = pCall.Err
|
||||
}
|
||||
@@ -330,7 +331,8 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int
|
||||
}
|
||||
|
||||
func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},reply interface{}) error {
|
||||
pClientList,err := slf.funcRpcClient(nodeId,serviceMethod)
|
||||
var pClientList []*Client
|
||||
err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList)
|
||||
if err != nil {
|
||||
log.Error("Call serviceMethod is error:%+v!",err)
|
||||
return err
|
||||
@@ -367,6 +369,7 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},
|
||||
//跨node调用
|
||||
pCall := pClient.Go(false,serviceMethod,args,reply)
|
||||
if pCall.Err != nil {
|
||||
ReleaseCall(pCall)
|
||||
return pCall.Err
|
||||
}
|
||||
err = pCall.Done().Err
|
||||
@@ -395,14 +398,15 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
|
||||
}
|
||||
|
||||
reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
|
||||
pClientList,err := slf.funcRpcClient(nodeid,serviceMethod)
|
||||
var pClientList []*Client
|
||||
err := slf.funcRpcClient(nodeid,serviceMethod,&pClientList)
|
||||
if err != nil {
|
||||
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
|
||||
log.Error("Call serviceMethod is error:%+v!",err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(pClientList) > 1 {
|
||||
if pClientList== nil || len(pClientList) > 1 {
|
||||
err := fmt.Errorf("Cannot call more then 1 node!")
|
||||
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
|
||||
log.Error("Cannot call more then 1 node!")
|
||||
@@ -449,7 +453,7 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
|
||||
}
|
||||
|
||||
//跨node调用
|
||||
err = pClient.AsycGo(slf,serviceMethod,fVal,args,reply)
|
||||
err = pClient.AsycCall(slf,serviceMethod,fVal,args,reply)
|
||||
if err != nil {
|
||||
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
|
||||
}
|
||||
|
||||
@@ -140,7 +140,7 @@ func (agent *RpcAgent) Run() {
|
||||
continue
|
||||
}
|
||||
|
||||
if req.RpcRequestData.IsReply()== false {
|
||||
if req.RpcRequestData.IsNoReply()==false {
|
||||
req.requestHandle = func(Returns interface{},Err *RpcError){
|
||||
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err)
|
||||
}
|
||||
@@ -149,7 +149,11 @@ func (agent *RpcAgent) Run() {
|
||||
err = rpcHandler.PushRequest(req)
|
||||
if err != nil {
|
||||
rpcError := RpcError(err.Error())
|
||||
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
|
||||
|
||||
if req.RpcRequestData.IsNoReply() {
|
||||
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
|
||||
}
|
||||
|
||||
processor.ReleaseRpcRequest(req.RpcRequestData)
|
||||
ReleaseRpcRequest(req)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user