From 59efc05d2476bccbbef4124b0c8e58699fba05d7 Mon Sep 17 00:00:00 2001 From: boyce Date: Tue, 23 Apr 2024 10:44:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Erpc=20nats=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 35 +++- cluster/configdiscovery.go | 2 +- cluster/origindiscovery.go | 5 +- cluster/parsecfg.go | 70 ++++++- network/tcp_server.go | 19 +- node/node.go | 7 +- rpc/callset.go | 146 +++++++++++++++ rpc/client.go | 362 +++++++++++++++++++++++-------------- rpc/lclient.go | 20 +- rpc/lserver.go | 291 +++++++++++++++++++++++++++++ rpc/natsclient.go | 89 +++++++++ rpc/natsserver.go | 119 ++++++++++++ rpc/rclient.go | 224 +++-------------------- rpc/rpchandler.go | 10 +- rpc/rpcnats.go | 41 +++++ rpc/server.go | 319 ++++---------------------------- 16 files changed, 1101 insertions(+), 658 deletions(-) create mode 100644 rpc/callset.go create mode 100644 rpc/lserver.go create mode 100644 rpc/natsclient.go create mode 100644 rpc/natsserver.go create mode 100644 rpc/rpcnats.go diff --git a/cluster/cluster.go b/cluster/cluster.go index 168ea47..cc2a8a3 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -52,19 +52,21 @@ type Cluster struct { localNodeInfo NodeInfo //本结点配置信息 discoveryInfo DiscoveryInfo //服务发现配置 + rpcMode RpcMode //masterDiscoveryNodeList []NodeInfo //配置发现Master结点 globalCfg interface{} //全局配置 localServiceCfg map[string]interface{} //map[serviceName]配置数据* serviceDiscovery IServiceDiscovery //服务发现接口 - locker sync.RWMutex //结点与服务关系保护锁 mapRpc map[string]*NodeRpcInfo //nodeId - //mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo mapServiceNode map[string]map[string]struct{} //map[serviceName]map[NodeId] - rpcServer rpc.Server + callSet rpc.CallSet + rpcNats rpc.RpcNats + rpcServer rpc.IServer + rpcEventLocker sync.RWMutex //Rpc事件监听保护锁 mapServiceListenRpcEvent map[string]struct{} //ServiceName mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName @@ -82,11 +84,12 @@ func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) { cluster.serviceDiscovery = serviceDiscovery } -func (cls *Cluster) Start() { - cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen) +func (cls *Cluster) Start() error{ + return cls.rpcServer.Start() } func (cls *Cluster) Stop() { + cls.rpcServer.Stop() } func (cls *Cluster) DiscardNode(nodeId string) { @@ -191,7 +194,12 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) { //不存在时,则建立连接 rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = *nodeInfo - rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent) + + if cls.IsNatsMode() { + rpcInfo.client = cls.rpcNats.NewNatsClient(nodeInfo.NodeId, cls.GetLocalNodeInfo().NodeId,&cls.callSet) + }else{ + rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent,&cls.callSet) + } cls.mapRpc[nodeInfo.NodeId] = &rpcInfo log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr)) } @@ -204,7 +212,15 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er return err } - cls.rpcServer.Init(cls) + cls.callSet.Init() + if cls.IsNatsMode() { + cls.rpcNats.Init(cls.rpcMode.Nats.NatsUrl,cls.rpcMode.Nats.NoRandomize,cls.GetLocalNodeInfo().NodeId,cls.localNodeInfo.CompressBytesLen,cls) + cls.rpcServer = &cls.rpcNats + }else{ + s := &rpc.Server{} + s.Init(cls.localNodeInfo.ListenAddr,cls.localNodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls) + cls.rpcServer = s + } //2.安装服务发现结点 err = cls.setupDiscovery(localNodeId, setupServiceFun) @@ -225,7 +241,6 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er return nil } - func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler { pService := service.GetService(serviceName) if pService == nil { @@ -276,8 +291,8 @@ func GetRpcClient(nodeId string, serviceMethod string,filterRetire bool, clientL return GetCluster().GetNodeIdByService(serviceName, clientList, filterRetire) } -func GetRpcServer() *rpc.Server { - return &cluster.rpcServer +func GetRpcServer() rpc.IServer { + return cluster.rpcServer } func (cls *Cluster) IsNodeConnected(nodeId string) bool { diff --git a/cluster/configdiscovery.go b/cluster/configdiscovery.go index 17e18b9..2c28b8b 100644 --- a/cluster/configdiscovery.go +++ b/cluster/configdiscovery.go @@ -15,7 +15,7 @@ func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode Fu discovery.funSetNode = funSetNode //解析本地其他服务配置 - _,nodeInfoList,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull) + _,nodeInfoList,_,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull) if err != nil { return err } diff --git a/cluster/origindiscovery.go b/cluster/origindiscovery.go index da0c80a..34fb9c3 100644 --- a/cluster/origindiscovery.go +++ b/cluster/origindiscovery.go @@ -318,7 +318,6 @@ func (dc *OriginDiscoveryClient) addDiscoveryMaster() { continue } dc.funSetNode(&discoveryNodeList.MasterNodeList[i]) - } } @@ -556,6 +555,10 @@ func (cls *Cluster) IsOriginMasterDiscoveryNode(nodeId string) bool { } func (cls *Cluster) getOriginMasterDiscoveryNodeInfo(nodeId string) *NodeInfo { + if cls.discoveryInfo.Origin == nil { + return nil + } + for i := 0; i < len(cls.discoveryInfo.Origin.MasterNodeList); i++ { if cls.discoveryInfo.Origin.MasterNodeList[i].NodeId == nodeId { return &cls.discoveryInfo.Origin.MasterNodeList[i] diff --git a/cluster/parsecfg.go b/cluster/parsecfg.go index b28635f..d8f8207 100644 --- a/cluster/parsecfg.go +++ b/cluster/parsecfg.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strings" "time" + "errors" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary @@ -44,7 +45,19 @@ type DiscoveryInfo struct { Origin *OriginDiscovery //orign } +type NatsConfig struct { + NatsUrl string + NoRandomize bool +} + +type RpcMode struct { + Typ string `json:"Type"` + Nats NatsConfig + +} + type NodeInfoList struct { + RpcMode RpcMode Discovery DiscoveryInfo NodeList []NodeInfo } @@ -183,14 +196,41 @@ func (cls *Cluster) readServiceConfig(filepath string) (interface{}, map[string] return GlobalCfg, serviceConfig, mapNodeService, nil } -func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []NodeInfo, error) { +func (cls *Cluster) SetRpcMode(cfgRpcMode *RpcMode,rpcMode *RpcMode) error { + //忽略掉没有设置的配置 + if cfgRpcMode.Typ == "" { + return nil + } + + //不允许重复的配置Rpc模式 + + if cfgRpcMode.Typ != "" && rpcMode.Typ != ""{ + return errors.New("repeat config RpcMode") + } + + //检查Typ是否合法 + if cfgRpcMode.Typ!="Nats" && cfgRpcMode.Typ!="Default" { + return fmt.Errorf("RpcMode %s is not support", rpcMode.Typ) + } + + if cfgRpcMode.Typ == "Nats" && len(cfgRpcMode.Nats.NatsUrl)==0 { + return fmt.Errorf("nats rpc mode config NatsUrl is empty") + } + + *rpcMode = *cfgRpcMode + + return nil +} + +func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []NodeInfo,RpcMode, error) { var nodeInfoList []NodeInfo var discoveryInfo DiscoveryInfo + var rpcMode RpcMode clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster" fileInfoList, err := os.ReadDir(clusterCfgPath) if err != nil { - return discoveryInfo, nil, fmt.Errorf("Read dir %s is fail :%+v", clusterCfgPath, err) + return discoveryInfo, nil,rpcMode, fmt.Errorf("Read dir %s is fail :%+v", clusterCfgPath, err) } //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 @@ -200,12 +240,17 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node fileNodeInfoList, rerr := cls.ReadClusterConfig(filePath) if rerr != nil { - return discoveryInfo, nil, fmt.Errorf("read file path %s is error:%+v", filePath, rerr) + return discoveryInfo, nil,rpcMode, fmt.Errorf("read file path %s is error:%+v", filePath, rerr) + } + + err = cls.SetRpcMode(&fileNodeInfoList.RpcMode,&rpcMode) + if err != nil { + return discoveryInfo, nil,rpcMode, err } err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery) if err != nil { - return discoveryInfo,nil,err + return discoveryInfo,nil,rpcMode,err } for _, nodeInfo := range fileNodeInfoList.NodeList { @@ -217,7 +262,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node } if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) { - return discoveryInfo, nil, fmt.Errorf("%d configurations were found for the configuration with node ID %d!", len(nodeInfoList), nodeId) + return discoveryInfo, nil,rpcMode, fmt.Errorf("%d configurations were found for the configuration with node ID %d!", len(nodeInfoList), nodeId) } for i, _ := range nodeInfoList { @@ -231,7 +276,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node } } - return discoveryInfo, nodeInfoList, nil + return discoveryInfo, nodeInfoList, rpcMode,nil } func (cls *Cluster) readLocalService(localNodeId string) error { @@ -325,7 +370,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error { func (cls *Cluster) parseLocalCfg() { rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = cls.localNodeInfo - rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId) + rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId,&cls.callSet) cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo @@ -338,18 +383,27 @@ func (cls *Cluster) parseLocalCfg() { } } +func (cls *Cluster) IsNatsMode() bool { + return cls.rpcMode.Typ == "Nats" +} + +func (cls *Cluster) GetNatsUrl() string { + return cls.rpcMode.Nats.NatsUrl +} + func (cls *Cluster) InitCfg(localNodeId string) error { cls.localServiceCfg = map[string]interface{}{} cls.mapRpc = map[string]*NodeRpcInfo{} cls.mapServiceNode = map[string]map[string]struct{}{} //加载本地结点的NodeList配置 - discoveryInfo, nodeInfoList, err := cls.readLocalClusterConfig(localNodeId) + discoveryInfo, nodeInfoList,rpcMode, err := cls.readLocalClusterConfig(localNodeId) if err != nil { return err } cls.localNodeInfo = nodeInfoList[0] cls.discoveryInfo = discoveryInfo + cls.rpcMode = rpcMode //读取本地服务配置 err = cls.readLocalService(localNodeId) diff --git a/network/tcp_server.go b/network/tcp_server.go index 2b9b745..227f30e 100644 --- a/network/tcp_server.go +++ b/network/tcp_server.go @@ -6,6 +6,8 @@ import ( "net" "sync" "time" + "fmt" + "errors" ) const( @@ -36,15 +38,20 @@ type TCPServer struct { MsgParser } -func (server *TCPServer) Start() { - server.init() +func (server *TCPServer) Start() error{ + err := server.init() + if err != nil { + return err + } go server.run() + + return nil } -func (server *TCPServer) init() { +func (server *TCPServer) init() error{ ln, err := net.Listen("tcp", server.Addr) if err != nil { - log.Fatal("Listen tcp fail",log.String("error", err.Error())) + return fmt.Errorf("Listen tcp fail,error:%s",err.Error()) } if server.MaxConnNum <= 0 { @@ -89,12 +96,14 @@ func (server *TCPServer) init() { } if server.NewAgent == nil { - log.Fatal("NewAgent must not be nil") + return errors.New("NewAgent must not be nil") } server.ln = ln server.conns = make(ConnSet) server.MsgParser.init() + + return nil } func (server *TCPServer) SetNetMempool(mempool bytespool.IBytesMempool){ diff --git a/node/node.go b/node/node.go index c8c5baa..abd750c 100644 --- a/node/node.go +++ b/node/node.go @@ -326,9 +326,7 @@ func startNode(args interface{}) error { //5.运行集群 cluster.GetCluster().Start() - - - + //6.监听程序退出信号&性能报告 bRun := true var pProfilerTicker *time.Ticker = &time.Ticker{} @@ -352,9 +350,10 @@ func startNode(args interface{}) error { } } - cluster.GetCluster().Stop() + //7.退出 service.StopAllService() + cluster.GetCluster().Stop() log.Info("Server is stop.") log.Close() diff --git a/rpc/callset.go b/rpc/callset.go new file mode 100644 index 0000000..7e25b03 --- /dev/null +++ b/rpc/callset.go @@ -0,0 +1,146 @@ +package rpc + +import ( + "errors" + + "strconv" + "sync" + "sync/atomic" + "time" + "github.com/duanhf2012/origin/v2/log" +) + +type CallSet struct { + pendingLock sync.RWMutex + startSeq uint64 + pending map[uint64]*Call + callRpcTimeout time.Duration + maxCheckCallRpcCount int + + callTimerHeap CallTimerHeap +} + + + +func (cs *CallSet) Init(){ + cs.pendingLock.Lock() + cs.callTimerHeap.Init() + cs.pending = make(map[uint64]*Call,4096) + + cs.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount + cs.callRpcTimeout = DefaultRpcTimeout + + go cs.checkRpcCallTimeout() + cs.pendingLock.Unlock() +} + +func (bc *CallSet) makeCallFail(call *Call) { + if call.callback != nil && call.callback.IsValid() { + call.rpcHandler.PushRpcResponse(call) + } else { + call.done <- call + } +} + +func (bc *CallSet) checkRpcCallTimeout() { + for{ + time.Sleep(DefaultCheckRpcCallTimeoutInterval) + for i := 0; i < bc.maxCheckCallRpcCount; i++ { + bc.pendingLock.Lock() + + callSeq := bc.callTimerHeap.PopTimeout() + if callSeq == 0 { + bc.pendingLock.Unlock() + break + } + + pCall := bc.pending[callSeq] + if pCall == nil { + bc.pendingLock.Unlock() + log.Error("call seq is not find",log.Uint64("seq", callSeq)) + continue + } + + delete(bc.pending,callSeq) + strTimeout := strconv.FormatInt(int64(pCall.TimeOut.Seconds()), 10) + pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds,method is "+pCall.ServiceMethod) + log.Error("call timeout",log.String("error",pCall.Err.Error())) + bc.makeCallFail(pCall) + bc.pendingLock.Unlock() + continue + } + } +} + +func (bc *CallSet) AddPending(call *Call) { + bc.pendingLock.Lock() + + if call.Seq == 0 { + bc.pendingLock.Unlock() + log.Stack("call is error.") + return + } + + bc.pending[call.Seq] = call + bc.callTimerHeap.AddTimer(call.Seq,call.TimeOut) + + bc.pendingLock.Unlock() +} + +func (bc *CallSet) RemovePending(seq uint64) *Call { + if seq == 0 { + return nil + } + bc.pendingLock.Lock() + call := bc.removePending(seq) + bc.pendingLock.Unlock() + return call +} + +func (bc *CallSet) removePending(seq uint64) *Call { + v, ok := bc.pending[seq] + if ok == false { + return nil + } + + bc.callTimerHeap.Cancel(seq) + delete(bc.pending, seq) + return v +} + +func (bc *CallSet) FindPending(seq uint64) (pCall *Call) { + if seq == 0 { + return nil + } + + bc.pendingLock.Lock() + pCall = bc.pending[seq] + bc.pendingLock.Unlock() + + return pCall +} + +func (bc *CallSet) cleanPending(){ + bc.pendingLock.Lock() + for { + callSeq := bc.callTimerHeap.PopFirst() + if callSeq == 0 { + break + } + pCall := bc.pending[callSeq] + if pCall == nil { + log.Error("call Seq is not find",log.Uint64("seq",callSeq)) + continue + } + + delete(bc.pending,callSeq) + pCall.Err = errors.New("nodeid is disconnect ") + bc.makeCallFail(pCall) + } + + bc.pendingLock.Unlock() +} + +func (bc *CallSet) generateSeq() uint64 { + return atomic.AddUint64(&bc.startSeq, 1) +} diff --git a/rpc/client.go b/rpc/client.go index 9d85dde..b9600c3 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -4,11 +4,9 @@ import ( "errors" "github.com/duanhf2012/origin/v2/network" "reflect" - "strconv" - "sync" - "sync/atomic" "time" "github.com/duanhf2012/origin/v2/log" + "fmt" ) const( @@ -17,8 +15,7 @@ const( DefaultRpcMinMsgLen = 2 DefaultMaxCheckCallRpcCount = 1000 DefaultMaxPendingWriteNum = 200000 - - + DefaultConnectInterval = 2*time.Second DefaultCheckRpcCallTimeoutInterval = 1*time.Second DefaultRpcTimeout = 15*time.Second @@ -26,29 +23,34 @@ const( var clientSeq uint32 +type IWriter interface { + WriteMsg (nodeId string,args ...[]byte) error + IsConnected() bool +} + type IRealClient interface { SetConn(conn *network.TCPConn) Close(waitDone bool) - AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) - Go(timeout time.Duration,rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call - RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call + AsyncCall(NodeId string,timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) + Go(NodeId string,timeout time.Duration,rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call + RawGo(NodeId string,timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call IsConnected() bool Run() OnClose() + + Bind(server IServer) } + + type Client struct { clientId uint32 - nodeId string - pendingLock sync.RWMutex - startSeq uint64 - pending map[uint64]*Call - callRpcTimeout time.Duration - maxCheckCallRpcCount int + targetNodeId string + compressBytesLen int - callTimerHeap CallTimerHeap + *CallSet IRealClient } @@ -58,128 +60,218 @@ func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent { return client } -func (bc *Client) makeCallFail(call *Call) { - if call.callback != nil && call.callback.IsValid() { - call.rpcHandler.PushRpcResponse(call) - } else { - call.done <- call - } -} - -func (bc *Client) checkRpcCallTimeout() { - for{ - time.Sleep(DefaultCheckRpcCallTimeoutInterval) - for i := 0; i < bc.maxCheckCallRpcCount; i++ { - bc.pendingLock.Lock() - - callSeq := bc.callTimerHeap.PopTimeout() - if callSeq == 0 { - bc.pendingLock.Unlock() - break - } - - pCall := bc.pending[callSeq] - if pCall == nil { - bc.pendingLock.Unlock() - log.Error("call seq is not find",log.Uint64("seq", callSeq)) - continue - } - - delete(bc.pending,callSeq) - strTimeout := strconv.FormatInt(int64(pCall.TimeOut.Seconds()), 10) - pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds,method is "+pCall.ServiceMethod) - log.Error("call timeout",log.String("error",pCall.Err.Error())) - bc.makeCallFail(pCall) - bc.pendingLock.Unlock() - continue - } - } -} - -func (client *Client) InitPending() { - client.pendingLock.Lock() - client.callTimerHeap.Init() - client.pending = make(map[uint64]*Call,4096) - client.pendingLock.Unlock() -} - -func (bc *Client) AddPending(call *Call) { - bc.pendingLock.Lock() - - if call.Seq == 0 { - bc.pendingLock.Unlock() - log.Stack("call is error.") - return - } - - bc.pending[call.Seq] = call - bc.callTimerHeap.AddTimer(call.Seq,call.TimeOut) - - bc.pendingLock.Unlock() -} - -func (bc *Client) RemovePending(seq uint64) *Call { - if seq == 0 { - return nil - } - bc.pendingLock.Lock() - call := bc.removePending(seq) - bc.pendingLock.Unlock() - return call -} - -func (bc *Client) removePending(seq uint64) *Call { - v, ok := bc.pending[seq] - if ok == false { - return nil - } - - bc.callTimerHeap.Cancel(seq) - delete(bc.pending, seq) - return v -} - -func (bc *Client) FindPending(seq uint64) (pCall *Call) { - if seq == 0 { - return nil - } - - bc.pendingLock.Lock() - pCall = bc.pending[seq] - bc.pendingLock.Unlock() - - return pCall -} - -func (bc *Client) cleanPending(){ - bc.pendingLock.Lock() - for { - callSeq := bc.callTimerHeap.PopFirst() - if callSeq == 0 { - break - } - pCall := bc.pending[callSeq] - if pCall == nil { - log.Error("call Seq is not find",log.Uint64("seq",callSeq)) - continue - } - - delete(bc.pending,callSeq) - pCall.Err = errors.New("nodeid is disconnect ") - bc.makeCallFail(pCall) - } - - bc.pendingLock.Unlock() -} - -func (bc *Client) generateSeq() uint64 { - return atomic.AddUint64(&bc.startSeq, 1) -} - -func (client *Client) GetNodeId() string { - return client.nodeId +func (client *Client) GetTargetNodeId() string { + return client.targetNodeId } func (client *Client) GetClientId() uint32 { return client.clientId } + + +func (client *Client) processRpcResponse(responseData []byte) error{ + bCompress := (responseData[0]>>7) > 0 + processor := GetProcessor(responseData[0]&0x7f) + if processor == nil { + //rc.conn.ReleaseReadMsg(responseData) + err:= errors.New(fmt.Sprintf("cannot find process %d",responseData[0]&0x7f)) + log.Error(err.Error()) + return err + } + + //1.解析head + response := RpcResponse{} + response.RpcResponseData = processor.MakeRpcResponse(0, "", nil) + + //解压缩 + byteData := responseData[1:] + var compressBuff []byte + + if bCompress == true { + var unCompressErr error + compressBuff,unCompressErr = compressor.UncompressBlock(byteData) + if unCompressErr!= nil { + //rc.conn.ReleaseReadMsg(responseData) + err := fmt.Errorf("uncompressBlock failed,err :%s",unCompressErr.Error()) + return err + } + + byteData = compressBuff + } + + err := processor.Unmarshal(byteData, response.RpcResponseData) + if cap(compressBuff) > 0 { + compressor.UnCompressBufferCollection(compressBuff) + } + + //rc.conn.ReleaseReadMsg(bytes) + if err != nil { + processor.ReleaseRpcResponse(response.RpcResponseData) + log.Error("rpcClient Unmarshal head error",log.ErrorAttr("error",err)) + return nil + } + + v := client.RemovePending(response.RpcResponseData.GetSeq()) + if v == nil { + log.Error("rpcClient cannot find seq",log.Uint64("seq",response.RpcResponseData.GetSeq())) + } else { + v.Err = nil + if len(response.RpcResponseData.GetReply()) > 0 { + err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply) + if err != nil { + log.Error("rpcClient Unmarshal body failed",log.ErrorAttr("error",err)) + v.Err = err + } + } + + if response.RpcResponseData.GetErr() != nil { + v.Err = response.RpcResponseData.GetErr() + } + + if v.callback != nil && v.callback.IsValid() { + v.rpcHandler.PushRpcResponse(v) + } else { + v.done <- v + } + } + + processor.ReleaseRpcResponse(response.RpcResponseData) + + return nil +} + + +//func (rc *Client) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { +// _, processor := GetProcessorType(args) +// InParam, err := processor.Marshal(args) +// if err != nil { +// log.Error("Marshal is fail",log.ErrorAttr("error",err)) +// call := MakeCall() +// call.DoError(err) +// return call +// } +// +// return rc.RawGo(timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply) +//} + +func (rc *Client) rawGo(nodeId string,w IWriter,timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call { + call := MakeCall() + call.ServiceMethod = serviceMethod + call.Reply = reply + call.Seq = rc.generateSeq() + call.TimeOut = timeout + + request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs) + bytes, err := processor.Marshal(request.RpcRequestData) + ReleaseRpcRequest(request) + + if err != nil { + call.Seq = 0 + log.Error("marshal is fail",log.String("error",err.Error())) + call.DoError(err) + return call + } + + if w == nil || w.IsConnected()==false { + call.Seq = 0 + sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect") + log.Error("conn is disconnect",log.String("error",sErr.Error())) + call.DoError(sErr) + return call + } + + var compressBuff[]byte + bCompress := uint8(0) + if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen { + var cErr error + compressBuff,cErr = compressor.CompressBlock(bytes) + if cErr != nil { + call.Seq = 0 + log.Error("compress fail",log.String("error",cErr.Error())) + call.DoError(cErr) + return call + } + if len(compressBuff) < len(bytes) { + bytes = compressBuff + bCompress = 1<<7 + } + } + + if noReply == false { + rc.AddPending(call) + } + + err = w.WriteMsg(nodeId,[]byte{uint8(processor.GetProcessorType())|bCompress}, bytes) + if cap(compressBuff) >0 { + compressor.CompressBufferCollection(compressBuff) + } + if err != nil { + rc.RemovePending(call.Seq) + log.Error("WiteMsg is fail",log.ErrorAttr("error",err)) + call.Seq = 0 + call.DoError(err) + } + + return call +} + +func (rc *Client) asyncCall(nodeId string,w IWriter,timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) { + processorType, processor := GetProcessorType(args) + InParam, herr := processor.Marshal(args) + if herr != nil { + return emptyCancelRpc,herr + } + + seq := rc.generateSeq() + request := MakeRpcRequest(processor, seq, 0, serviceMethod, false, InParam) + bytes, err := processor.Marshal(request.RpcRequestData) + ReleaseRpcRequest(request) + if err != nil { + return emptyCancelRpc,err + } + + if w == nil || w.IsConnected()==false { + return emptyCancelRpc,errors.New("Rpc server is disconnect,call " + serviceMethod) + } + + var compressBuff[]byte + bCompress := uint8(0) + if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen { + var cErr error + compressBuff,cErr = compressor.CompressBlock(bytes) + if cErr != nil { + return emptyCancelRpc,cErr + } + + if len(compressBuff) < len(bytes) { + bytes = compressBuff + bCompress = 1<<7 + } + } + + call := MakeCall() + call.Reply = replyParam + call.callback = &callback + call.rpcHandler = rpcHandler + call.ServiceMethod = serviceMethod + call.Seq = seq + call.TimeOut = timeout + rc.AddPending(call) + + err = w.WriteMsg(nodeId,[]byte{uint8(processorType)|bCompress}, bytes) + if cap(compressBuff) >0 { + compressor.CompressBufferCollection(compressBuff) + } + if err != nil { + rc.RemovePending(call.Seq) + ReleaseCall(call) + return emptyCancelRpc,err + } + + if cancelable { + rpcCancel := RpcCancel{CallSeq:seq,Cli: rc} + return rpcCancel.CancelRpc,nil + } + + return emptyCancelRpc,nil +} \ No newline at end of file diff --git a/rpc/lclient.go b/rpc/lclient.go index dd8320f..8285d4d 100644 --- a/rpc/lclient.go +++ b/rpc/lclient.go @@ -37,7 +37,7 @@ func (lc *LClient) SetConn(conn *network.TCPConn){ func (lc *LClient) Close(waitDone bool){ } -func (lc *LClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { +func (lc *LClient) Go(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { pLocalRpcServer := rpcHandler.GetRpcServer()() //判断是否是同一服务 findIndex := strings.Index(serviceMethod, ".") @@ -70,7 +70,7 @@ func (lc *LClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, } -func (rc *LClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call { +func (rc *LClient) RawGo(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call { pLocalRpcServer := rpcHandler.GetRpcServer()() //服务自我调用 @@ -92,7 +92,7 @@ func (rc *LClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor } -func (lc *LClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{},cancelable bool) (CancelRpc,error) { +func (lc *LClient) AsyncCall(nodeId string,timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{},cancelable bool) (CancelRpc,error) { pLocalRpcServer := rpcHandler.GetRpcServer()() //判断是否是同一服务 @@ -119,17 +119,19 @@ func (lc *LClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi return calcelRpc,nil } -func NewLClient(nodeId string) *Client{ +func NewLClient(localNodeId string,callSet *CallSet) *Client{ client := &Client{} client.clientId = atomic.AddUint32(&clientSeq, 1) - client.nodeId = nodeId - client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount - client.callRpcTimeout = DefaultRpcTimeout + client.targetNodeId = localNodeId + //client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount + //client.callRpcTimeout = DefaultRpcTimeout lClient := &LClient{} lClient.selfClient = client client.IRealClient = lClient - client.InitPending() - go client.checkRpcCallTimeout() + client.CallSet = callSet return client } + +func (rc *LClient) Bind(server IServer){ +} \ No newline at end of file diff --git a/rpc/lserver.go b/rpc/lserver.go new file mode 100644 index 0000000..e0e29be --- /dev/null +++ b/rpc/lserver.go @@ -0,0 +1,291 @@ +package rpc +import ( + "errors" + "github.com/duanhf2012/origin/v2/log" + "reflect" + "time" + "strings" + "fmt" +) + + +type BaseServer struct { + localNodeId string + compressBytesLen int + + rpcHandleFinder RpcHandleFinder + iServer IServer +} + +func (ls *BaseServer) initBaseServer(compressBytesLen int,rpcHandleFinder RpcHandleFinder){ + ls.compressBytesLen = compressBytesLen + ls.rpcHandleFinder = rpcHandleFinder +} + +func (ls *BaseServer) myselfRpcHandlerGo(client *Client,handlerName string, serviceMethod string, args interface{},callBack reflect.Value, reply interface{}) error { + rpcHandler := ls.rpcHandleFinder.FindRpcHandler(handlerName) + if rpcHandler == nil { + err := errors.New("service method " + serviceMethod + " not config!") + log.Error("service method not config",log.String("serviceMethod",serviceMethod)) + return err + } + + return rpcHandler.CallMethod(client,serviceMethod, args,callBack, reply) +} + +func (ls *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call { + pCall := MakeCall() + pCall.Seq = client.generateSeq() + pCall.TimeOut = timeout + pCall.ServiceMethod = serviceMethod + + rpcHandler := ls.rpcHandleFinder.FindRpcHandler(handlerName) + if rpcHandler == nil { + err := errors.New("service method " + serviceMethod + " not config!") + log.Error("service method not config",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err)) + pCall.Seq = 0 + pCall.DoError(err) + + return pCall + } + + var iParam interface{} + if processor == nil { + _, processor = GetProcessorType(args) + } + + if args != nil { + var err error + iParam,err = processor.Clone(args) + if err != nil { + sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) + log.Error("deep copy inParam is failed",log.String("handlerName",handlerName),log.String("serviceMethod",serviceMethod)) + pCall.Seq = 0 + pCall.DoError(sErr) + + return pCall + } + } + + req := MakeRpcRequest(processor, 0, rpcMethodId, serviceMethod, noReply, nil) + req.inParam = iParam + req.localReply = reply + if rawArgs != nil { + var err error + req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs) + if err != nil { + log.Error("unmarshalInParam is failed",log.String("serviceMethod",serviceMethod),log.Uint32("rpcMethodId",rpcMethodId),log.ErrorAttr("error",err)) + pCall.Seq = 0 + pCall.DoError(err) + ReleaseRpcRequest(req) + return pCall + } + } + + if noReply == false { + client.AddPending(pCall) + callSeq := pCall.Seq + req.requestHandle = func(Returns interface{}, Err RpcError) { + if reply != nil && Returns != reply && Returns != nil { + byteReturns, err := req.rpcProcessor.Marshal(Returns) + if err != nil { + Err = ConvertError(err) + log.Error("returns data cannot be marshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err)) + }else{ + err = req.rpcProcessor.Unmarshal(byteReturns, reply) + if err != nil { + Err = ConvertError(err) + log.Error("returns data cannot be Unmarshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err)) + } + } + } + + ReleaseRpcRequest(req) + v := client.RemovePending(callSeq) + if v == nil { + log.Error("rpcClient cannot find seq",log.Uint64("seq",callSeq)) + return + } + + if len(Err) == 0 { + v.Err = nil + v.DoOK() + } else { + log.Error(Err.Error()) + v.DoError(Err) + } + } + } + + err := rpcHandler.PushRpcRequest(req) + if err != nil { + log.Error(err.Error()) + pCall.DoError(err) + ReleaseRpcRequest(req) + } + + return pCall +} + +func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value,cancelable bool) (CancelRpc,error) { + rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) + if rpcHandler == nil { + err := errors.New("service method " + serviceMethod + " not config!") + log.Error(err.Error()) + return emptyCancelRpc,err + } + + _, processor := GetProcessorType(args) + iParam,err := processor.Clone(args) + if err != nil { + errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) + log.Error(errM.Error()) + return emptyCancelRpc,errM + } + + req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil) + req.inParam = iParam + req.localReply = reply + + cancelRpc := emptyCancelRpc + var callSeq uint64 + if noReply == false { + callSeq = client.generateSeq() + pCall := MakeCall() + pCall.Seq = callSeq + pCall.rpcHandler = callerRpcHandler + pCall.callback = &callback + pCall.Reply = reply + pCall.ServiceMethod = serviceMethod + pCall.TimeOut = timeout + client.AddPending(pCall) + rpcCancel := RpcCancel{CallSeq: callSeq,Cli: client} + cancelRpc = rpcCancel.CancelRpc + + req.requestHandle = func(Returns interface{}, Err RpcError) { + v := client.RemovePending(callSeq) + if v == nil { + ReleaseRpcRequest(req) + return + } + if len(Err) == 0 { + v.Err = nil + } else { + v.Err = Err + } + + if Returns != nil { + v.Reply = Returns + } + v.rpcHandler.PushRpcResponse(v) + ReleaseRpcRequest(req) + } + } + + err = rpcHandler.PushRpcRequest(req) + if err != nil { + ReleaseRpcRequest(req) + if callSeq > 0 { + client.RemovePending(callSeq) + } + return emptyCancelRpc,err + } + + return cancelRpc,nil +} + +func (bs *BaseServer) processRpcRequest(data []byte,connTag string,wrResponse writeResponse) error{ + bCompress := (data[0]>>7) > 0 + processor := GetProcessor(data[0]&0x7f) + if processor == nil { + return errors.New("cannot find processor") + } + + //解析head + var compressBuff []byte + byteData := data[1:] + if bCompress == true { + var unCompressErr error + + compressBuff,unCompressErr = compressor.UncompressBlock(byteData) + if unCompressErr!= nil { + return errors.New("UncompressBlock failed") + } + + byteData = compressBuff + } + + req := MakeRpcRequest(processor, 0, 0, "", false, nil) + err := processor.Unmarshal(byteData, req.RpcRequestData) + if cap(compressBuff) > 0 { + compressor.UnCompressBufferCollection(compressBuff) + } + + if err != nil { + if req.RpcRequestData.GetSeq() > 0 { + rpcError := RpcError(err.Error()) + if req.RpcRequestData.IsNoReply() == false { + wrResponse(processor,connTag, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) + } + } + + ReleaseRpcRequest(req) + return err + } + + //交给程序处理 + serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(), ".") + if len(serviceMethod) < 1 { + rpcError := RpcError("rpc request req.ServiceMethod is error") + if req.RpcRequestData.IsNoReply() == false { + wrResponse(processor,connTag, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) + } + ReleaseRpcRequest(req) + log.Error("rpc request req.ServiceMethod is error") + return nil + } + + rpcHandler := bs.rpcHandleFinder.FindRpcHandler(serviceMethod[0]) + if rpcHandler == nil { + rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod())) + if req.RpcRequestData.IsNoReply() == false { + wrResponse(processor,connTag, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) + } + log.Error("serviceMethod not config",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod())) + ReleaseRpcRequest(req) + return nil + } + + if req.RpcRequestData.IsNoReply() == false { + req.requestHandle = func(Returns interface{}, Err RpcError) { + wrResponse(processor,connTag, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), Returns, Err) + ReleaseRpcRequest(req) + } + } + + req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam()) + if err != nil { + rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error() + log.Error("call rpc param error",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()),log.ErrorAttr("error",err)) + if req.requestHandle != nil { + req.requestHandle(nil, RpcError(rErr)) + } else { + ReleaseRpcRequest(req) + } + + return nil + } + + err = rpcHandler.PushRpcRequest(req) + if err != nil { + rpcError := RpcError(err.Error()) + + if req.RpcRequestData.IsNoReply() { + wrResponse(processor, connTag,req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) + } + + ReleaseRpcRequest(req) + } + + return nil +} \ No newline at end of file diff --git a/rpc/natsclient.go b/rpc/natsclient.go new file mode 100644 index 0000000..994fe11 --- /dev/null +++ b/rpc/natsclient.go @@ -0,0 +1,89 @@ +package rpc +import ( + "github.com/duanhf2012/origin/v2/network" + "reflect" + "time" + "github.com/nats-io/nats.go" + "github.com/duanhf2012/origin/v2/log" +) + +//跨结点连接的Client +type NatsClient struct { + localNodeId string + + natsConn *nats.Conn + client *Client +} + +func (nc *NatsClient) Start(natsConn *nats.Conn) error{ + nc.natsConn = natsConn + _,err := nc.natsConn.QueueSubscribe("oc."+nc.localNodeId, "oc",nc.onSubscribe) + + return err +} + +func (nc *NatsClient) onSubscribe(msg *nats.Msg){ + //处理消息 + nc.client.processRpcResponse(msg.Data) +} + +func (nc *NatsClient) SetConn(conn *network.TCPConn){ +} + +func (nc *NatsClient) Close(waitDone bool){ +} + +func (nc *NatsClient) Run(){ +} + +func (nc *NatsClient) OnClose(){ +} + +func (rc *NatsClient) Bind(server IServer){ + s := server.(*NatsServer) + rc.natsConn = s.natsConn +} + +func (rc *NatsClient) Go(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { + _, processor := GetProcessorType(args) + InParam, err := processor.Marshal(args) + if err != nil { + log.Error("Marshal is fail",log.ErrorAttr("error",err)) + call := MakeCall() + call.DoError(err) + return call + } + + return rc.client.rawGo(nodeId,rc,timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply) +} + +func (rc *NatsClient) RawGo(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call { + return rc.client.rawGo(nodeId,rc,timeout,rpcHandler,processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply) +} + +func (rc *NatsClient) AsyncCall(nodeId string,timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) { + cancelRpc,err := rc.client.asyncCall(nodeId,rc,timeout,rpcHandler, serviceMethod, callback, args, replyParam,cancelable) + if err != nil { + callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)}) + } + + return cancelRpc,nil +} + +func (rc *NatsClient) WriteMsg (nodeId string,args ...[]byte) error{ + buff := make([]byte,0,4096) + for _,ar := range args { + buff = append(buff,ar...) + } + + var msg nats.Msg + msg.Subject = "os."+nodeId + msg.Data = buff + msg.Header = nats.Header{} + msg.Header.Set("fnode",rc.localNodeId) + return rc.natsConn.PublishMsg(&msg) +} + +func (rc *NatsClient) IsConnected() bool{ + return rc.natsConn.Status() == nats.CONNECTED +} diff --git a/rpc/natsserver.go b/rpc/natsserver.go new file mode 100644 index 0000000..875d260 --- /dev/null +++ b/rpc/natsserver.go @@ -0,0 +1,119 @@ +package rpc + +import ( + "github.com/nats-io/nats.go" + "github.com/duanhf2012/origin/v2/log" + "time" +) + +type NatsServer struct { + BaseServer + natsUrl string + + natsConn *nats.Conn + NoRandomize bool + + nodeSubTopic string + compressBytesLen int +} + +func (ns *NatsServer) Start() error{ + var err error + var options []nats.Option + + options = append(options,nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + log.Error("nats is disconnect",log.String("connUrl",nc.ConnectedUrl())) + // handle disconnect error event + })) + + options = append(options,nats.ReconnectHandler(func(nc *nats.Conn) { + log.Error("nats is reconnection",log.String("connUrl",nc.ConnectedUrl())) + // handle reconnect event + })) + + options = append(options,nats.MaxReconnects(-1)) + + options = append(options,nats.ReconnectWait(time.Second*3)) + + if ns.NoRandomize { + options = append(options,nats.DontRandomize()) + } + + ns.natsConn,err = nats.Connect(ns.natsUrl,options...) + if err != nil { + log.Error("Connect to nats fail",log.String("natsUrl",ns.natsUrl),log.ErrorAttr("err",err)) + return err + } + + //开始订阅 + _,err = ns.natsConn.QueueSubscribe(ns.nodeSubTopic,"os", func(msg *nats.Msg) { + ns.processRpcRequest(msg.Data,msg.Header.Get("fnode"),ns.WriteResponse) + }) + + return err +} + +func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string,serviceMethod string, seq uint64, reply interface{}, rpcError RpcError){ + var mReply []byte + var err error + + if reply != nil { + mReply, err = processor.Marshal(reply) + if err != nil { + rpcError = ConvertError(err) + } + } + + var rpcResponse RpcResponse + rpcResponse.RpcResponseData = processor.MakeRpcResponse(seq, rpcError, mReply) + bytes, err := processor.Marshal(rpcResponse.RpcResponseData) + defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData) + + if err != nil { + log.Error("mashal RpcResponseData failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err)) + return + } + + var compressBuff[]byte + bCompress := uint8(0) + if ns.compressBytesLen >0 && len(bytes) >= ns.compressBytesLen { + compressBuff,err = compressor.CompressBlock(bytes) + if err != nil { + log.Error("CompressBlock failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err)) + return + } + if len(compressBuff) < len(bytes) { + bytes = compressBuff + bCompress = 1<<7 + } + } + + sendData := make([]byte,0,4096) + byteTypeAndCompress := []byte{uint8(processor.GetProcessorType())|bCompress} + sendData = append(sendData,byteTypeAndCompress...) + sendData = append(sendData,bytes...) + err = ns.natsConn.PublishMsg(&nats.Msg{Subject: "oc."+nodeId, Data: sendData}) + + if cap(compressBuff) >0 { + compressor.CompressBufferCollection(compressBuff) + } + + if err != nil { + log.Error("WriteMsg error,Rpc return is fail",log.String("nodeId",nodeId),log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err)) + } +} + +func (ns *NatsServer) Stop(){ + if ns.natsConn != nil { + ns.natsConn.Close() + } +} + +func (ns *NatsServer) initServer(natsUrl string, noRandomize bool,localNodeId string,compressBytesLen int,rpcHandleFinder RpcHandleFinder){ + ns.natsUrl = natsUrl + ns.NoRandomize = noRandomize + ns.localNodeId = localNodeId + ns.compressBytesLen = compressBytesLen + ns.initBaseServer(compressBytesLen,rpcHandleFinder) + ns.nodeSubTopic = "os."+localNodeId //服务器 +} diff --git a/rpc/rclient.go b/rpc/rclient.go index 31992e4..cdb3429 100644 --- a/rpc/rclient.go +++ b/rpc/rclient.go @@ -1,7 +1,6 @@ package rpc import ( - "errors" "fmt" "github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/network" @@ -14,7 +13,6 @@ import ( //跨结点连接的Client type RClient struct { - compressBytesLen int selfClient *Client network.TCPClient conn *network.TCPConn @@ -42,7 +40,11 @@ func (rc *RClient) SetConn(conn *network.TCPConn){ rc.Unlock() } -func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { +func (rc *RClient) WriteMsg (nodeId string,args ...[]byte) error{ + return rc.conn.WriteMsg(args...) +} + +func (rc *RClient) Go(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { _, processor := GetProcessorType(args) InParam, err := processor.Marshal(args) if err != nil { @@ -52,74 +54,15 @@ func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, return call } - return rc.RawGo(timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply) + return rc.selfClient.rawGo(nodeId,rc,timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply) } -func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call { - call := MakeCall() - call.ServiceMethod = serviceMethod - call.Reply = reply - call.Seq = rc.selfClient.generateSeq() - call.TimeOut = timeout - - request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs) - bytes, err := processor.Marshal(request.RpcRequestData) - ReleaseRpcRequest(request) - - if err != nil { - call.Seq = 0 - log.Error("marshal is fail",log.String("error",err.Error())) - call.DoError(err) - return call - } - - conn := rc.GetConn() - if conn == nil || conn.IsConnected()==false { - call.Seq = 0 - sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect") - log.Error("conn is disconnect",log.String("error",sErr.Error())) - call.DoError(sErr) - return call - } - - var compressBuff[]byte - bCompress := uint8(0) - if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen { - var cErr error - compressBuff,cErr = compressor.CompressBlock(bytes) - if cErr != nil { - call.Seq = 0 - log.Error("compress fail",log.String("error",cErr.Error())) - call.DoError(cErr) - return call - } - if len(compressBuff) < len(bytes) { - bytes = compressBuff - bCompress = 1<<7 - } - } - - if noReply == false { - rc.selfClient.AddPending(call) - } - - err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes) - if cap(compressBuff) >0 { - compressor.CompressBufferCollection(compressBuff) - } - if err != nil { - rc.selfClient.RemovePending(call.Seq) - log.Error("WiteMsg is fail",log.ErrorAttr("error",err)) - call.Seq = 0 - call.DoError(err) - } - - return call +func (rc *RClient) RawGo(nodeId string,timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call { + return rc.selfClient.rawGo(nodeId,rc,timeout,rpcHandler,processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply) } - -func (rc *RClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) { - cancelRpc,err := rc.asyncCall(timeout,rpcHandler, serviceMethod, callback, args, replyParam,cancelable) +func (rc *RClient) AsyncCall(nodeId string,timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) { + cancelRpc,err := rc.selfClient.asyncCall(nodeId,rc,timeout,rpcHandler, serviceMethod, callback, args, replyParam,cancelable) if err != nil { callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)}) } @@ -127,68 +70,6 @@ func (rc *RClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi return cancelRpc,nil } -func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) { - processorType, processor := GetProcessorType(args) - InParam, herr := processor.Marshal(args) - if herr != nil { - return emptyCancelRpc,herr - } - - seq := rc.selfClient.generateSeq() - request := MakeRpcRequest(processor, seq, 0, serviceMethod, false, InParam) - bytes, err := processor.Marshal(request.RpcRequestData) - ReleaseRpcRequest(request) - if err != nil { - return emptyCancelRpc,err - } - - conn := rc.GetConn() - if conn == nil || conn.IsConnected()==false { - return emptyCancelRpc,errors.New("Rpc server is disconnect,call " + serviceMethod) - } - - var compressBuff[]byte - bCompress := uint8(0) - if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen { - var cErr error - compressBuff,cErr = compressor.CompressBlock(bytes) - if cErr != nil { - return emptyCancelRpc,cErr - } - - if len(compressBuff) < len(bytes) { - bytes = compressBuff - bCompress = 1<<7 - } - } - - call := MakeCall() - call.Reply = replyParam - call.callback = &callback - call.rpcHandler = rpcHandler - call.ServiceMethod = serviceMethod - call.Seq = seq - call.TimeOut = timeout - rc.selfClient.AddPending(call) - - err = conn.WriteMsg([]byte{uint8(processorType)|bCompress}, bytes) - if cap(compressBuff) >0 { - compressor.CompressBufferCollection(compressBuff) - } - if err != nil { - rc.selfClient.RemovePending(call.Seq) - ReleaseCall(call) - return emptyCancelRpc,err - } - - if cancelable { - rpcCancel := RpcCancel{CallSeq:seq,Cli: rc.selfClient} - return rpcCancel.CancelRpc,nil - } - - return emptyCancelRpc,nil -} - func (rc *RClient) Run() { defer func() { if r := recover(); r != nil { @@ -199,7 +80,7 @@ func (rc *RClient) Run() { } }() - rc.TriggerRpcConnEvent(true, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId()) + rc.TriggerRpcConnEvent(true, rc.selfClient.GetClientId(), rc.selfClient.GetTargetNodeId()) for { bytes, err := rc.conn.ReadMsg() if err != nil { @@ -207,85 +88,27 @@ func (rc *RClient) Run() { return } - bCompress := (bytes[0]>>7) > 0 - processor := GetProcessor(bytes[0]&0x7f) - if processor == nil { - rc.conn.ReleaseReadMsg(bytes) - log.Error("cannot find process",log.Uint8("process type",bytes[0]&0x7f)) - return - } - - //1.解析head - response := RpcResponse{} - response.RpcResponseData = processor.MakeRpcResponse(0, "", nil) - - //解压缩 - byteData := bytes[1:] - var compressBuff []byte - - if bCompress == true { - var unCompressErr error - compressBuff,unCompressErr = compressor.UncompressBlock(byteData) - if unCompressErr!= nil { - rc.conn.ReleaseReadMsg(bytes) - log.Error("uncompressBlock failed",log.ErrorAttr("error",unCompressErr)) - return - } - byteData = compressBuff - } - - err = processor.Unmarshal(byteData, response.RpcResponseData) - if cap(compressBuff) > 0 { - compressor.UnCompressBufferCollection(compressBuff) - } - + err = rc.selfClient.processRpcResponse(bytes) rc.conn.ReleaseReadMsg(bytes) if err != nil { - processor.ReleaseRpcResponse(response.RpcResponseData) - log.Error("rpcClient Unmarshal head error",log.ErrorAttr("error",err)) - continue + return } - - v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq()) - if v == nil { - log.Error("rpcClient cannot find seq",log.Uint64("seq",response.RpcResponseData.GetSeq())) - } else { - v.Err = nil - if len(response.RpcResponseData.GetReply()) > 0 { - err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply) - if err != nil { - log.Error("rpcClient Unmarshal body failed",log.ErrorAttr("error",err)) - v.Err = err - } - } - - if response.RpcResponseData.GetErr() != nil { - v.Err = response.RpcResponseData.GetErr() - } - - if v.callback != nil && v.callback.IsValid() { - v.rpcHandler.PushRpcResponse(v) - } else { - v.done <- v - } - } - - processor.ReleaseRpcResponse(response.RpcResponseData) } } func (rc *RClient) OnClose() { - rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId()) + rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetTargetNodeId()) } -func NewRClient(nodeId string, addr string, maxRpcParamLen uint32,compressBytesLen int,triggerRpcConnEvent TriggerRpcConnEvent) *Client{ +func NewRClient(targetNodeId string, addr string, maxRpcParamLen uint32,compressBytesLen int,triggerRpcConnEvent TriggerRpcConnEvent,callSet *CallSet) *Client{ client := &Client{} client.clientId = atomic.AddUint32(&clientSeq, 1) - client.nodeId = nodeId - client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount - client.callRpcTimeout = DefaultRpcTimeout + client.targetNodeId = targetNodeId + //client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount + //client.callRpcTimeout = DefaultRpcTimeout + client.compressBytesLen = compressBytesLen + c:= &RClient{} - c.compressBytesLen = compressBytesLen c.selfClient = client c.Addr = addr c.ConnectInterval = DefaultConnectInterval @@ -306,8 +129,7 @@ func NewRClient(nodeId string, addr string, maxRpcParamLen uint32,compressBytesL c.MaxMsgLen = math.MaxUint32 } client.IRealClient = c - client.InitPending() - go client.checkRpcCallTimeout() + client.CallSet = callSet c.Start() return client } @@ -318,3 +140,7 @@ func (rc *RClient) Close(waitDone bool) { rc.selfClient.cleanPending() } + +func (rc *RClient) Bind(server IServer){ + +} \ No newline at end of file diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 876281b..64c5c25 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -15,7 +15,7 @@ import ( const maxClusterNode int = 128 type FuncRpcClient func(nodeId string, serviceMethod string,filterRetire bool, client []*Client) (error, int) -type FuncRpcServer func() *Server +type FuncRpcServer func() IServer const NodeIdNull = "" var nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) @@ -445,7 +445,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str //2.rpcClient调用 for i := 0; i < count; i++ { - pCall := pClientList[i].Go(DefaultRpcTimeout,handler.rpcHandler,true, serviceMethod, args, nil) + pCall := pClientList[i].Go(pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,true, serviceMethod, args, nil) if pCall.Err != nil { err = pCall.Err } @@ -472,7 +472,7 @@ func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId string, serviceM } pClient := pClientList[0] - pCall := pClient.Go(timeout,handler.rpcHandler,false, serviceMethod, args, reply) + pCall := pClient.Go(pClient.GetTargetNodeId(),timeout,handler.rpcHandler,false, serviceMethod, args, reply) err = pCall.Done().Err pClient.RemovePending(pCall.Seq) @@ -525,7 +525,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId string, ser //2.rpcClient调用 //如果调用本结点服务 - return pClientList[0].AsyncCall(timeout,handler.rpcHandler, serviceMethod, fVal, args, reply,false) + return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(),timeout,handler.rpcHandler, serviceMethod, fVal, args, reply,false) } func (handler *RpcHandler) GetName() string { @@ -600,7 +600,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId s //如果调用本结点服务 for i := 0; i < count; i++ { //跨node调用 - pCall := handler.pClientList[i].RawGo(DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil) + pCall := handler.pClientList[i].RawGo(handler.pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil) if pCall.Err != nil { err = pCall.Err } diff --git a/rpc/rpcnats.go b/rpc/rpcnats.go new file mode 100644 index 0000000..e820545 --- /dev/null +++ b/rpc/rpcnats.go @@ -0,0 +1,41 @@ +package rpc + +import "sync/atomic" + +type RpcNats struct { + NatsServer + NatsClient +} + +func (rn *RpcNats) Start() error{ + err := rn.NatsServer.Start() + if err != nil { + return err + } + + return rn.NatsClient.Start(rn.NatsServer.natsConn) +} + +func (rn *RpcNats) Init(natsUrl string, noRandomize bool, nodeId string,compressBytesLen int,rpcHandleFinder RpcHandleFinder){ + rn.NatsClient.localNodeId = nodeId + rn.NatsServer.initServer(natsUrl,noRandomize, nodeId,compressBytesLen,rpcHandleFinder) + rn.NatsServer.iServer = rn +} + +func (rn *RpcNats) NewNatsClient(targetNodeId string,localNodeId string,callSet *CallSet) *Client{ + var client Client + + client.clientId = atomic.AddUint32(&clientSeq, 1) + client.targetNodeId = targetNodeId + //client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount + //client.callRpcTimeout = DefaultRpcTimeout + + natsClient := &rn.NatsClient + natsClient.localNodeId = localNodeId + natsClient.client = &client + + client.IRealClient = natsClient + client.CallSet = callSet + + return &client +} \ No newline at end of file diff --git a/rpc/server.go b/rpc/server.go index dc70cca..fb4fb06 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -1,7 +1,6 @@ package rpc import ( - "errors" "fmt" "github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/network" @@ -13,6 +12,7 @@ import ( "runtime" ) +const Default_ReadWriteDeadline = 15*time.Second type RpcProcessorType uint8 const ( @@ -24,12 +24,25 @@ var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &PBProcessor{}} var arrayProcessorLen uint8 = 2 var LittleEndian bool +type IServer interface { + Start() error + Stop() + + selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call + myselfRpcHandlerGo(client *Client,handlerName string, serviceMethod string, args interface{},callBack reflect.Value, reply interface{}) error + selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value,cancelable bool) (CancelRpc,error) +} + +type writeResponse func(processor IRpcProcessor,connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError) + type Server struct { + BaseServer + functions map[interface{}]interface{} - rpcHandleFinder RpcHandleFinder rpcServer *network.TCPServer - compressBytesLen int + listenAddr string + maxRpcParamLen uint32 } type RpcAgent struct { @@ -60,24 +73,25 @@ func GetProcessor(processorType uint8) IRpcProcessor { return arrayProcessor[processorType] } -func (server *Server) Init(rpcHandleFinder RpcHandleFinder) { - server.rpcHandleFinder = rpcHandleFinder +func (server *Server) Init(listenAddr string, maxRpcParamLen uint32,compressBytesLen int,rpcHandleFinder RpcHandleFinder) { + server.initBaseServer(compressBytesLen,rpcHandleFinder) + server.listenAddr = listenAddr + server.maxRpcParamLen = maxRpcParamLen + server.rpcServer = &network.TCPServer{} } -const Default_ReadWriteDeadline = 15*time.Second - -func (server *Server) Start(listenAddr string, maxRpcParamLen uint32,compressBytesLen int) { - splitAddr := strings.Split(listenAddr, ":") +func (server *Server) Start() error{ + splitAddr := strings.Split(server.listenAddr, ":") if len(splitAddr) != 2 { - log.Fatal("listen addr is failed", log.String("listenAddr",listenAddr)) + return fmt.Errorf("listen addr is failed,listenAddr:%s", server.listenAddr) } server.rpcServer.Addr = ":" + splitAddr[1] server.rpcServer.MinMsgLen = 2 - server.compressBytesLen = compressBytesLen - if maxRpcParamLen > 0 { - server.rpcServer.MaxMsgLen = maxRpcParamLen + server.compressBytesLen = server.compressBytesLen + if server.maxRpcParamLen > 0 { + server.rpcServer.MaxMsgLen = server.maxRpcParamLen } else { server.rpcServer.MaxMsgLen = math.MaxUint32 } @@ -90,12 +104,16 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32,compressByt server.rpcServer.ReadDeadline = Default_ReadWriteDeadline server.rpcServer.LenMsgLen = DefaultRpcLenMsgLen - server.rpcServer.Start() + return server.rpcServer.Start() +} + +func (server *Server) Stop(){ + server.rpcServer.Close() } func (agent *RpcAgent) OnDestroy() {} -func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError) { +func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError) { var mReply []byte var errM error @@ -159,102 +177,12 @@ func (agent *RpcAgent) Run() { break } - bCompress := (data[0]>>7) > 0 - processor := GetProcessor(data[0]&0x7f) - if processor == nil { - agent.conn.ReleaseReadMsg(data) - log.Warning("cannot find processor",log.String("RemoteAddr",agent.conn.RemoteAddr().String())) - return - } - - //解析head - var compressBuff []byte - byteData := data[1:] - if bCompress == true { - var unCompressErr error - - compressBuff,unCompressErr = compressor.UncompressBlock(byteData) - if unCompressErr!= nil { - agent.conn.ReleaseReadMsg(data) - log.Error("UncompressBlock failed",log.String("RemoteAddr",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",unCompressErr)) - return - } - byteData = compressBuff - } - - req := MakeRpcRequest(processor, 0, 0, "", false, nil) - err = processor.Unmarshal(byteData, req.RpcRequestData) - if cap(compressBuff) > 0 { - compressor.UnCompressBufferCollection(compressBuff) - } - agent.conn.ReleaseReadMsg(data) + defer agent.conn.ReleaseReadMsg(data) + err = agent.rpcServer.processRpcRequest( data,"",agent.WriteResponse) if err != nil { - log.Error("Unmarshal failed",log.String("RemoteAddr",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err)) - if req.RpcRequestData.GetSeq() > 0 { - rpcError := RpcError(err.Error()) - if req.RpcRequestData.IsNoReply() == false { - agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) - } - ReleaseRpcRequest(req) - continue - } else { - ReleaseRpcRequest(req) - break - } - } - - //交给程序处理 - serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(), ".") - if len(serviceMethod) < 1 { - rpcError := RpcError("rpc request req.ServiceMethod is error") - if req.RpcRequestData.IsNoReply() == false { - agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) - } - ReleaseRpcRequest(req) - log.Error("rpc request req.ServiceMethod is error") - continue - } - - rpcHandler := agent.rpcServer.rpcHandleFinder.FindRpcHandler(serviceMethod[0]) - if rpcHandler == nil { - rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod())) - if req.RpcRequestData.IsNoReply() == false { - agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) - } - log.Error("serviceMethod not config",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod())) - ReleaseRpcRequest(req) - continue - } - - if req.RpcRequestData.IsNoReply() == false { - req.requestHandle = func(Returns interface{}, Err RpcError) { - agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), Returns, Err) - ReleaseRpcRequest(req) - } - } - - req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam()) - if err != nil { - rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error() - log.Error("call rpc param error",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()),log.ErrorAttr("error",err)) - if req.requestHandle != nil { - req.requestHandle(nil, RpcError(rErr)) - } else { - ReleaseRpcRequest(req) - } - - continue - } - - err = rpcHandler.PushRpcRequest(req) - if err != nil { - rpcError := RpcError(err.Error()) - - if req.RpcRequestData.IsNoReply() { - agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) - } - - ReleaseRpcRequest(req) + log.Error("processRpcRequest is error",log.String("remoteAddress",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err)) + //will close tcpconn + break } } } @@ -287,174 +215,3 @@ func (server *Server) NewAgent(c *network.TCPConn) network.Agent { return agent } -func (server *Server) myselfRpcHandlerGo(client *Client,handlerName string, serviceMethod string, args interface{},callBack reflect.Value, reply interface{}) error { - rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) - if rpcHandler == nil { - err := errors.New("service method " + serviceMethod + " not config!") - log.Error("service method not config",log.String("serviceMethod",serviceMethod)) - return err - } - - return rpcHandler.CallMethod(client,serviceMethod, args,callBack, reply) -} - -func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call { - pCall := MakeCall() - pCall.Seq = client.generateSeq() - pCall.TimeOut = timeout - pCall.ServiceMethod = serviceMethod - - rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) - if rpcHandler == nil { - err := errors.New("service method " + serviceMethod + " not config!") - log.Error("service method not config",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err)) - pCall.Seq = 0 - pCall.DoError(err) - - return pCall - } - - var iParam interface{} - if processor == nil { - _, processor = GetProcessorType(args) - } - - if args != nil { - var err error - iParam,err = processor.Clone(args) - if err != nil { - sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) - log.Error("deep copy inParam is failed",log.String("handlerName",handlerName),log.String("serviceMethod",serviceMethod)) - pCall.Seq = 0 - pCall.DoError(sErr) - - return pCall - } - } - - req := MakeRpcRequest(processor, 0, rpcMethodId, serviceMethod, noReply, nil) - req.inParam = iParam - req.localReply = reply - if rawArgs != nil { - var err error - req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs) - if err != nil { - log.Error("unmarshalInParam is failed",log.String("serviceMethod",serviceMethod),log.Uint32("rpcMethodId",rpcMethodId),log.ErrorAttr("error",err)) - pCall.Seq = 0 - pCall.DoError(err) - ReleaseRpcRequest(req) - return pCall - } - } - - if noReply == false { - client.AddPending(pCall) - callSeq := pCall.Seq - req.requestHandle = func(Returns interface{}, Err RpcError) { - if reply != nil && Returns != reply && Returns != nil { - byteReturns, err := req.rpcProcessor.Marshal(Returns) - if err != nil { - Err = ConvertError(err) - log.Error("returns data cannot be marshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err)) - }else{ - err = req.rpcProcessor.Unmarshal(byteReturns, reply) - if err != nil { - Err = ConvertError(err) - log.Error("returns data cannot be Unmarshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err)) - } - } - } - - ReleaseRpcRequest(req) - v := client.RemovePending(callSeq) - if v == nil { - log.Error("rpcClient cannot find seq",log.Uint64("seq",callSeq)) - return - } - - if len(Err) == 0 { - v.Err = nil - v.DoOK() - } else { - log.Error(Err.Error()) - v.DoError(Err) - } - } - } - - err := rpcHandler.PushRpcRequest(req) - if err != nil { - log.Error(err.Error()) - pCall.DoError(err) - ReleaseRpcRequest(req) - } - - return pCall -} - -func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value,cancelable bool) (CancelRpc,error) { - rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) - if rpcHandler == nil { - err := errors.New("service method " + serviceMethod + " not config!") - log.Error(err.Error()) - return emptyCancelRpc,err - } - - _, processor := GetProcessorType(args) - iParam,err := processor.Clone(args) - if err != nil { - errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) - log.Error(errM.Error()) - return emptyCancelRpc,errM - } - - req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil) - req.inParam = iParam - req.localReply = reply - - cancelRpc := emptyCancelRpc - var callSeq uint64 - if noReply == false { - callSeq = client.generateSeq() - pCall := MakeCall() - pCall.Seq = callSeq - pCall.rpcHandler = callerRpcHandler - pCall.callback = &callback - pCall.Reply = reply - pCall.ServiceMethod = serviceMethod - pCall.TimeOut = timeout - client.AddPending(pCall) - rpcCancel := RpcCancel{CallSeq: callSeq,Cli: client} - cancelRpc = rpcCancel.CancelRpc - - req.requestHandle = func(Returns interface{}, Err RpcError) { - v := client.RemovePending(callSeq) - if v == nil { - ReleaseRpcRequest(req) - return - } - if len(Err) == 0 { - v.Err = nil - } else { - v.Err = Err - } - - if Returns != nil { - v.Reply = Returns - } - v.rpcHandler.PushRpcResponse(v) - ReleaseRpcRequest(req) - } - } - - err = rpcHandler.PushRpcRequest(req) - if err != nil { - ReleaseRpcRequest(req) - if callSeq > 0 { - client.RemovePending(callSeq) - } - return emptyCancelRpc,err - } - - return cancelRpc,nil -}