From a7c6b45764f3563c21f9dc000a16c8af8706d044 Mon Sep 17 00:00:00 2001 From: duanhf2012 <6549168@qq.com> Date: Tue, 31 Jan 2023 13:50:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=9C=AC=E7=BB=93=E7=82=B9?= =?UTF-8?q?=E4=B8=8E=E8=B7=A8=E7=BB=93=E7=82=B9Rpc=E7=BB=93=E6=9E=84&?= =?UTF-8?q?=E7=AE=80=E5=8C=96=E5=8E=9F=E5=A7=8BRpc=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 18 +- network/tcp_msg.go | 2 + rpc/client.go | 361 ++++++++++------------------------------- rpc/gogopbprocessor.go | 12 +- rpc/jsonprocessor.go | 17 ++ rpc/lclient.go | 131 +++++++++++++++ rpc/processor.go | 1 + rpc/rclient.go | 261 +++++++++++++++++++++++++++++ rpc/rpc.go | 6 - rpc/rpchandler.go | 163 +++---------------- rpc/server.go | 30 +--- service/module.go | 7 +- 12 files changed, 549 insertions(+), 460 deletions(-) create mode 100644 rpc/lclient.go create mode 100644 rpc/rclient.go diff --git a/cluster/cluster.go b/cluster/cluster.go index b578d52..fd4bde9 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -110,15 +110,13 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) { break } - rpc.client.Lock() //正在连接中不主动断开,只断开没有连接中的 if rpc.client.IsConnected() { nodeInfo.status = Discard - rpc.client.Unlock() log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr) return } - rpc.client.Unlock() + break } @@ -194,20 +192,17 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) { if _, rpcInfoOK := cls.mapRpc[nodeInfo.NodeId]; rpcInfoOK == true { return } + rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = *nodeInfo - rpcInfo.client = &rpc.Client{} - rpcInfo.client.TriggerRpcEvent = cls.triggerRpcEvent - rpcInfo.client.Connect(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen) + rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.triggerRpcEvent) cls.mapRpc[nodeInfo.NodeId] = rpcInfo - } func (cls *Cluster) buildLocalRpc() { rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = cls.localNodeInfo - rpcInfo.client = &rpc.Client{} - rpcInfo.client.Connect(rpcInfo.nodeInfo.NodeId, "", 0) + rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId) cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo } @@ -358,10 +353,10 @@ func (cls *Cluster) IsNodeConnected(nodeId int) bool { return pClient != nil && pClient.IsConnected() } -func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int) { +func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId int) { cls.locker.Lock() nodeInfo, ok := cls.mapRpc[nodeId] - if ok == false || nodeInfo.client == nil || nodeInfo.client.GetClientSeq() != clientSeq { + if ok == false || nodeInfo.client == nil || nodeInfo.client.GetClientId() != clientId { cls.locker.Unlock() return } @@ -383,7 +378,6 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int) } } - func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId int, serviceName []string) { cls.rpcEventLocker.Lock() defer cls.rpcEventLocker.Unlock() diff --git a/network/tcp_msg.go b/network/tcp_msg.go index 4f128fd..f55f177 100644 --- a/network/tcp_msg.go +++ b/network/tcp_msg.go @@ -40,6 +40,8 @@ func (p *MsgParser) init(){ if p.MaxMsgLen > max { p.MaxMsgLen = max } + + p.INetMempool = NewMemAreaPool() } diff --git a/rpc/client.go b/rpc/client.go index 9fc7282..3e11791 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -3,91 +3,58 @@ package rpc import ( "container/list" "errors" - "fmt" - "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/network" - "math" "reflect" - "runtime" "strconv" "sync" "sync/atomic" "time" ) -type Client struct { - clientSeq uint32 - id int - bSelfNode bool - network.TCPClient - conn *network.TCPConn +const MaxCheckCallRpcCount = 1000 +const MaxPendingWriteNum = 200000 +const ConnectInterval = 2*time.Second +const RpcConnNum = 1 +const RpcLenMsgLen = 4 +const RpcMinMsgLen = 2 +const CheckRpcCallTimeoutInterval = 5*time.Second +const DefaultRpcTimeout = 15*time.Second +var clientSeq uint32 +type IRealClient interface { + SetConn(conn *network.TCPConn) + Close(waitDone bool) + + AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error + Go(rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call + RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call + IsConnected() bool + + Run() + OnClose() +} + +type Client struct { + clientId uint32 + nodeId int pendingLock sync.RWMutex startSeq uint64 pending map[uint64]*list.Element pendingTimer *list.List callRpcTimeout time.Duration maxCheckCallRpcCount int - TriggerRpcEvent + + IRealClient } -const MaxCheckCallRpcCount = 1000 -const MaxPendingWriteNum = 200000 -const ConnectInterval = 2*time.Second -var clientSeq uint32 - func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent { - client.conn = conn - client.ResetPending() + client.SetConn(conn) return client } - -func (client *Client) Connect(id int, addr string, maxRpcParamLen uint32) error { - client.clientSeq = atomic.AddUint32(&clientSeq, 1) - client.id = id - client.Addr = addr - client.maxCheckCallRpcCount = MaxCheckCallRpcCount - client.callRpcTimeout = 15 * time.Second - client.ConnectInterval = ConnectInterval - client.PendingWriteNum = MaxPendingWriteNum - client.AutoReconnect = true - - client.ConnNum = 1 - client.LenMsgLen = 4 - client.MinMsgLen = 2 - client.ReadDeadline = Default_ReadWriteDeadline - client.WriteDeadline = Default_ReadWriteDeadline - - if maxRpcParamLen > 0 { - client.MaxMsgLen = maxRpcParamLen - } else { - client.MaxMsgLen = math.MaxUint32 - } - - client.NewAgent = client.NewClientAgent - client.LittleEndian = LittleEndian - client.ResetPending() - go client.startCheckRpcCallTimer() - if addr == "" { - client.bSelfNode = true - return nil - } - - client.Start() - return nil -} - -func (client *Client) startCheckRpcCallTimer() { - for { - time.Sleep(5 * time.Second) - client.checkRpcCallTimeout() - } -} - -func (client *Client) makeCallFail(call *Call) { - client.removePending(call.Seq) +func (bc *Client) makeCallFail(call *Call) { + bc.removePending(call.Seq) if call.callback != nil && call.callback.IsValid() { call.rpcHandler.PushRpcResponse(call) } else { @@ -95,29 +62,38 @@ func (client *Client) makeCallFail(call *Call) { } } -func (client *Client) checkRpcCallTimeout() { - now := time.Now() +func (bc *Client) checkRpcCallTimeout() { + for{ + time.Sleep(CheckRpcCallTimeoutInterval) + now := time.Now() - for i := 0; i < client.maxCheckCallRpcCount; i++ { - client.pendingLock.Lock() - pElem := client.pendingTimer.Front() - if pElem == nil { - client.pendingLock.Unlock() + for i := 0; i < bc.maxCheckCallRpcCount; i++ { + bc.pendingLock.Lock() + if bc.pendingTimer == nil { + bc.pendingLock.Unlock() + break + } + + pElem := bc.pendingTimer.Front() + if pElem == nil { + bc.pendingLock.Unlock() + break + } + pCall := pElem.Value.(*Call) + if now.Sub(pCall.callTime) > bc.callRpcTimeout { + strTimeout := strconv.FormatInt(int64(bc.callRpcTimeout/time.Second), 10) + pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds") + bc.makeCallFail(pCall) + bc.pendingLock.Unlock() + continue + } + bc.pendingLock.Unlock() break } - pCall := pElem.Value.(*Call) - if now.Sub(pCall.callTime) > client.callRpcTimeout { - strTimeout := strconv.FormatInt(int64(client.callRpcTimeout/time.Second), 10) - pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds") - client.makeCallFail(pCall) - client.pendingLock.Unlock() - continue - } - client.pendingLock.Unlock() } } -func (client *Client) ResetPending() { +func (client *Client) InitPending() { client.pendingLock.Lock() if client.pending != nil { for _, v := range client.pending { @@ -131,235 +107,62 @@ func (client *Client) ResetPending() { client.pendingLock.Unlock() } -func (client *Client) AddPending(call *Call) { - client.pendingLock.Lock() + +func (bc *Client) AddPending(call *Call) { + bc.pendingLock.Lock() call.callTime = time.Now() - elemTimer := client.pendingTimer.PushBack(call) - client.pending[call.Seq] = elemTimer //如果下面发送失败,将会一一直存在这里 - client.pendingLock.Unlock() + elemTimer := bc.pendingTimer.PushBack(call) + bc.pending[call.Seq] = elemTimer //如果下面发送失败,将会一一直存在这里 + bc.pendingLock.Unlock() } -func (client *Client) RemovePending(seq uint64) *Call { - if seq == 0 { +func (bc *Client) RemovePending(seq uint64) *Call { + if seq == 0 { return nil } - client.pendingLock.Lock() - call := client.removePending(seq) - client.pendingLock.Unlock() + bc.pendingLock.Lock() + call := bc.removePending(seq) + bc.pendingLock.Unlock() return call } -func (client *Client) removePending(seq uint64) *Call { - v, ok := client.pending[seq] +func (bc *Client) removePending(seq uint64) *Call { + v, ok := bc.pending[seq] if ok == false { return nil } call := v.Value.(*Call) - client.pendingTimer.Remove(v) - delete(client.pending, seq) + bc.pendingTimer.Remove(v) + delete(bc.pending, seq) return call } -func (client *Client) FindPending(seq uint64) *Call { +func (bc *Client) FindPending(seq uint64) *Call { if seq == 0 { return nil } - client.pendingLock.Lock() - v, ok := client.pending[seq] + bc.pendingLock.Lock() + v, ok := bc.pending[seq] if ok == false { - client.pendingLock.Unlock() + bc.pendingLock.Unlock() return nil } pCall := v.Value.(*Call) - client.pendingLock.Unlock() + bc.pendingLock.Unlock() return pCall } -func (client *Client) generateSeq() uint64 { - return atomic.AddUint64(&client.startSeq, 1) +func (bc *Client) generateSeq() uint64 { + return atomic.AddUint64(&bc.startSeq, 1) } -func (client *Client) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error { - processorType, processor := GetProcessorType(args) - InParam, herr := processor.Marshal(args) - if herr != nil { - return herr - } - - seq := client.generateSeq() - request := MakeRpcRequest(processor, seq, 0, serviceMethod, false, InParam) - bytes, err := processor.Marshal(request.RpcRequestData) - ReleaseRpcRequest(request) - if err != nil { - return err - } - - if client.conn == nil { - return errors.New("Rpc server is disconnect,call " + serviceMethod) - } - - call := MakeCall() - call.Reply = replyParam - call.callback = &callback - call.rpcHandler = rpcHandler - call.ServiceMethod = serviceMethod - call.Seq = seq - client.AddPending(call) - - err = client.conn.WriteMsg([]byte{uint8(processorType)}, bytes) - if err != nil { - client.RemovePending(call.Seq) - ReleaseCall(call) - return err - } - - return nil +func (client *Client) GetNodeId() int { + return client.nodeId } -func (client *Client) RawGo(processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, args []byte, reply interface{}) *Call { - call := MakeCall() - call.ServiceMethod = serviceMethod - call.Reply = reply - call.Seq = client.generateSeq() - - request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, args) - bytes, err := processor.Marshal(request.RpcRequestData) - ReleaseRpcRequest(request) - if err != nil { - call.Seq = 0 - call.Err = err - return call - } - - if client.conn == nil { - call.Seq = 0 - call.Err = errors.New(serviceMethod + " was called failed,rpc client is disconnect") - return call - } - - if noReply == false { - client.AddPending(call) - } - - err = client.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes) - if err != nil { - client.RemovePending(call.Seq) - call.Seq = 0 - call.Err = err - } - - return call -} - -func (client *Client) Go(noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { - _, processor := GetProcessorType(args) - InParam, err := processor.Marshal(args) - if err != nil { - call := MakeCall() - call.Err = err - return call - } - - return client.RawGo(processor, noReply, 0, serviceMethod, InParam, reply) -} - -func (client *Client) Run() { - defer func() { - if r := recover(); r != nil { - buf := make([]byte, 4096) - l := runtime.Stack(buf, false) - errString := fmt.Sprint(r) - log.SError("core dump info[", errString, "]\n", string(buf[:l])) - } - }() - - client.TriggerRpcEvent(true, client.GetClientSeq(), client.GetId()) - for { - bytes, err := client.conn.ReadMsg() - if err != nil { - log.SError("rpcClient ", client.Addr, " ReadMsg error:", err.Error()) - return - } - - processor := GetProcessor(bytes[0]) - if processor == nil { - client.conn.ReleaseReadMsg(bytes) - log.SError("rpcClient ", client.Addr, " ReadMsg head error:", err.Error()) - return - } - - //1.解析head - response := RpcResponse{} - response.RpcResponseData = processor.MakeRpcResponse(0, "", nil) - - err = processor.Unmarshal(bytes[1:], response.RpcResponseData) - client.conn.ReleaseReadMsg(bytes) - if err != nil { - processor.ReleaseRpcResponse(response.RpcResponseData) - log.SError("rpcClient Unmarshal head error:", err.Error()) - continue - } - - v := client.RemovePending(response.RpcResponseData.GetSeq()) - if v == nil { - log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending") - } else { - v.Err = nil - if len(response.RpcResponseData.GetReply()) > 0 { - err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply) - if err != nil { - log.SError("rpcClient Unmarshal body error:", err.Error()) - v.Err = err - } - } - - if response.RpcResponseData.GetErr() != nil { - v.Err = response.RpcResponseData.GetErr() - } - - if v.callback != nil && v.callback.IsValid() { - v.rpcHandler.PushRpcResponse(v) - } else { - v.done <- v - } - } - - processor.ReleaseRpcResponse(response.RpcResponseData) - } -} - -func (client *Client) OnClose() { - client.TriggerRpcEvent(false, client.GetClientSeq(), client.GetId()) -} - -func (client *Client) IsConnected() bool { - return client.bSelfNode || (client.conn != nil && client.conn.IsConnected() == true) -} - -func (client *Client) GetId() int { - return client.id -} - -func (client *Client) Close(waitDone bool) { - client.TCPClient.Close(waitDone) - - client.pendingLock.Lock() - for { - pElem := client.pendingTimer.Front() - if pElem == nil { - break - } - - pCall := pElem.Value.(*Call) - pCall.Err = errors.New("nodeid is disconnect ") - client.makeCallFail(pCall) - } - client.pendingLock.Unlock() -} - -func (client *Client) GetClientSeq() uint32 { - return client.clientSeq +func (client *Client) GetClientId() uint32 { + return client.clientId } diff --git a/rpc/gogopbprocessor.go b/rpc/gogopbprocessor.go index 2c2892f..9df0dd5 100644 --- a/rpc/gogopbprocessor.go +++ b/rpc/gogopbprocessor.go @@ -3,6 +3,7 @@ package rpc import ( "github.com/duanhf2012/origin/util/sync" "github.com/gogo/protobuf/proto" + "fmt" ) type GoGoPBProcessor struct { @@ -73,6 +74,15 @@ func (slf *GoGoPBProcessor) GetProcessorType() RpcProcessorType{ return RpcProcessorGoGoPB } +func (slf *GoGoPBProcessor) Clone(src interface{}) (interface{},error){ + srcMsg,ok := src.(proto.Message) + if ok == false { + return nil,fmt.Errorf("param is not of proto.message type") + } + + return proto.Clone(srcMsg),nil +} + func (slf *GoGoPBRpcRequestData) IsNoReply() bool{ return slf.GetNoReply() } @@ -91,5 +101,3 @@ func (slf *GoGoPBRpcResponseData) GetErr() *RpcError { - - diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index 01bf6c1..1ff7355 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -3,6 +3,7 @@ package rpc import ( "github.com/duanhf2012/origin/util/sync" jsoniter "github.com/json-iterator/go" + "reflect" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary @@ -119,6 +120,22 @@ func (jsonRpcResponseData *JsonRpcResponseData) GetReply() []byte{ } +func (jsonProcessor *JsonProcessor) Clone(src interface{}) (interface{},error){ + dstValue := reflect.New(reflect.ValueOf(src).Type().Elem()) + bytes,err := json.Marshal(src) + if err != nil { + return nil,err + } + + dst := dstValue.Interface() + err = json.Unmarshal(bytes,dst) + if err != nil { + return nil,err + } + + return dst,nil +} + diff --git a/rpc/lclient.go b/rpc/lclient.go new file mode 100644 index 0000000..202a1d4 --- /dev/null +++ b/rpc/lclient.go @@ -0,0 +1,131 @@ +package rpc + +import ( + "errors" + "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/network" + "reflect" + "strings" + "sync/atomic" +) + +//本结点的Client +type LClient struct { + selfClient *Client +} + +func (rc *LClient) Lock(){ +} + +func (rc *LClient) Unlock(){ +} + +func (lc *LClient) Run(){ +} + +func (lc *LClient) OnClose(){ +} + +func (lc *LClient) IsConnected() bool { + return true +} + +func (lc *LClient) SetConn(conn *network.TCPConn){ +} + +func (lc *LClient) Close(waitDone bool){ +} + +func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { + pLocalRpcServer := rpcHandler.GetRpcServer()() + //判断是否是同一服务 + findIndex := strings.Index(serviceMethod, ".") + if findIndex == -1 { + sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!") + log.SError(sErr.Error()) + call := MakeCall() + call.Err = sErr + return call + } + + serviceName := serviceMethod[:findIndex] + if serviceName == rpcHandler.GetName() { //自己服务调用 + //调用自己rpcHandler处理器 + err := pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args, requestHandlerNull,reply) + call := MakeCall() + if err != nil { + call.Err = err + return call + } + + call.done<-call + return call + } + + //其他的rpcHandler的处理器 + return pLocalRpcServer.selfNodeRpcHandlerGo(nil, lc.selfClient, noReply, serviceName, 0, serviceMethod, args, reply, nil) +} + + +func (rc *LClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call { + pLocalRpcServer := rpcHandler.GetRpcServer()() + + call := MakeCall() + call.ServiceMethod = serviceName + call.Reply = reply + + //服务自我调用 + if serviceName == rpcHandler.GetName() { + err := pLocalRpcServer.myselfRpcHandlerGo(rc.selfClient,serviceName, serviceName, rawArgs, requestHandlerNull,nil) + call.Err = err + call.done <- call + + return call + } + + //其他的rpcHandler的处理器 + return pLocalRpcServer.selfNodeRpcHandlerGo(processor,rc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs) +} + + +func (lc *LClient) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}) error { + pLocalRpcServer := rpcHandler.GetRpcServer()() + + //判断是否是同一服务 + findIndex := strings.Index(serviceMethod, ".") + if findIndex == -1 { + err := errors.New("Call serviceMethod " + serviceMethod + " is error!") + callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) + log.SError(err.Error()) + return nil + } + + serviceName := serviceMethod[:findIndex] + //调用自己rpcHandler处理器 + if serviceName == rpcHandler.GetName() { //自己服务调用 + return pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args,callback ,reply) + } + + //其他的rpcHandler的处理器 + err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback) + if err != nil { + callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) + } + + return nil +} + +func NewLClient(nodeId int) *Client{ + client := &Client{} + client.clientId = atomic.AddUint32(&clientSeq, 1) + client.nodeId = nodeId + client.maxCheckCallRpcCount = MaxCheckCallRpcCount + client.callRpcTimeout = DefaultRpcTimeout + + lClient := &LClient{} + lClient.selfClient = client + client.IRealClient = lClient + client.InitPending() + go client.checkRpcCallTimeout() + return client +} diff --git a/rpc/processor.go b/rpc/processor.go index 9d278bb..c07f67b 100644 --- a/rpc/processor.go +++ b/rpc/processor.go @@ -1,6 +1,7 @@ package rpc type IRpcProcessor interface { + Clone(src interface{}) (interface{},error) Marshal(v interface{}) ([]byte, error) //b表示自定义缓冲区,可以填nil,由系统自动分配 Unmarshal(data []byte, v interface{}) error MakeRpcRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData diff --git a/rpc/rclient.go b/rpc/rclient.go new file mode 100644 index 0000000..16778a0 --- /dev/null +++ b/rpc/rclient.go @@ -0,0 +1,261 @@ +package rpc + +import ( + "errors" + "fmt" + "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/network" + "math" + "reflect" + "runtime" + "sync/atomic" +) + +//跨结点连接的Client +type RClient struct { + selfClient *Client + network.TCPClient + conn *network.TCPConn + TriggerRpcConnEvent +} + +func (rc *RClient) IsConnected() bool { + rc.Lock() + defer rc.Unlock() + + return rc.conn != nil && rc.conn.IsConnected() == true +} + +func (rc *RClient) GetConn() *network.TCPConn{ + rc.Lock() + conn := rc.conn + rc.Unlock() + + return conn +} + +func (rc *RClient) SetConn(conn *network.TCPConn){ + rc.Lock() + rc.conn = conn + rc.Unlock() +} + +func (rc *RClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { + _, processor := GetProcessorType(args) + InParam, err := processor.Marshal(args) + if err != nil { + call := MakeCall() + call.Err = err + return call + } + + return rc.RawGo(rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply) +} + + +func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call { + call := MakeCall() + call.ServiceMethod = serviceMethod + call.Reply = reply + call.Seq = rc.selfClient.generateSeq() + + request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs) + bytes, err := processor.Marshal(request.RpcRequestData) + ReleaseRpcRequest(request) + + if err != nil { + call.Seq = 0 + call.Err = err + return call + } + + conn := rc.GetConn() + if conn == nil || conn.IsConnected()==false { + call.Seq = 0 + call.Err = errors.New(serviceMethod + " was called failed,rpc client is disconnect") + return call + } + + if noReply == false { + rc.selfClient.AddPending(call) + } + + err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes) + if err != nil { + rc.selfClient.RemovePending(call.Seq) + call.Seq = 0 + call.Err = err + } + + return call +} + + +func (rc *RClient) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error { + err := rc.asyncCall(rpcHandler, serviceMethod, callback, args, replyParam) + if err != nil { + callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)}) + } + + return nil +} + +func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error { + processorType, processor := GetProcessorType(args) + InParam, herr := processor.Marshal(args) + if herr != nil { + return herr + } + + seq := rc.selfClient.generateSeq() + request := MakeRpcRequest(processor, seq, 0, serviceMethod, false, InParam) + bytes, err := processor.Marshal(request.RpcRequestData) + ReleaseRpcRequest(request) + if err != nil { + return err + } + + conn := rc.GetConn() + if conn == nil || conn.IsConnected()==false { + return errors.New("Rpc server is disconnect,call " + serviceMethod) + } + + call := MakeCall() + call.Reply = replyParam + call.callback = &callback + call.rpcHandler = rpcHandler + call.ServiceMethod = serviceMethod + call.Seq = seq + rc.selfClient.AddPending(call) + + err = conn.WriteMsg([]byte{uint8(processorType)}, bytes) + if err != nil { + rc.selfClient.RemovePending(call.Seq) + ReleaseCall(call) + return err + } + + return nil +} + +func (rc *RClient) Run() { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + l := runtime.Stack(buf, false) + errString := fmt.Sprint(r) + log.SError("core dump info[", errString, "]\n", string(buf[:l])) + } + }() + + rc.TriggerRpcConnEvent(true, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId()) + for { + bytes, err := rc.conn.ReadMsg() + if err != nil { + log.SError("rpcClient ", rc.Addr, " ReadMsg error:", err.Error()) + return + } + + processor := GetProcessor(bytes[0]) + if processor == nil { + rc.conn.ReleaseReadMsg(bytes) + log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error()) + return + } + + //1.解析head + response := RpcResponse{} + response.RpcResponseData = processor.MakeRpcResponse(0, "", nil) + + err = processor.Unmarshal(bytes[1:], response.RpcResponseData) + rc.conn.ReleaseReadMsg(bytes) + if err != nil { + processor.ReleaseRpcResponse(response.RpcResponseData) + log.SError("rpcClient Unmarshal head error:", err.Error()) + continue + } + + v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq()) + if v == nil { + log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending") + } else { + v.Err = nil + if len(response.RpcResponseData.GetReply()) > 0 { + err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply) + if err != nil { + log.SError("rpcClient Unmarshal body error:", err.Error()) + v.Err = err + } + } + + if response.RpcResponseData.GetErr() != nil { + v.Err = response.RpcResponseData.GetErr() + } + + if v.callback != nil && v.callback.IsValid() { + v.rpcHandler.PushRpcResponse(v) + } else { + v.done <- v + } + } + + processor.ReleaseRpcResponse(response.RpcResponseData) + } +} + +func (rc *RClient) OnClose() { + rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId()) +} + +func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEvent TriggerRpcConnEvent) *Client{ + client := &Client{} + client.clientId = atomic.AddUint32(&clientSeq, 1) + client.nodeId = nodeId + client.maxCheckCallRpcCount = MaxCheckCallRpcCount + client.callRpcTimeout = DefaultRpcTimeout + + c:= &RClient{} + c.selfClient = client + c.Addr = addr + c.ConnectInterval = ConnectInterval + c.PendingWriteNum = MaxPendingWriteNum + c.AutoReconnect = true + c.TriggerRpcConnEvent = triggerRpcConnEvent + c.ConnNum = RpcConnNum + c.LenMsgLen = RpcLenMsgLen + c.MinMsgLen = RpcMinMsgLen + c.ReadDeadline = Default_ReadWriteDeadline + c.WriteDeadline = Default_ReadWriteDeadline + c.LittleEndian = LittleEndian + c.NewAgent = client.NewClientAgent + + if maxRpcParamLen > 0 { + c.MaxMsgLen = maxRpcParamLen + } else { + c.MaxMsgLen = math.MaxUint32 + } + client.IRealClient = c + client.InitPending() + go client.checkRpcCallTimeout() + c.Start() + return client +} + + +func (rc *RClient) Close(waitDone bool) { + rc.TCPClient.Close(waitDone) + + rc.selfClient.pendingLock.Lock() + for { + pElem := rc.selfClient.pendingTimer.Front() + if pElem == nil { + break + } + + pCall := pElem.Value.(*Call) + pCall.Err = errors.New("nodeid is disconnect ") + rc.selfClient.makeCallFail(pCall) + } + rc.selfClient.pendingLock.Unlock() +} + diff --git a/rpc/rpc.go b/rpc/rpc.go index 1b8252a..e16753e 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -51,12 +51,6 @@ type IRpcResponseData interface { GetReply() []byte } -type IRawInputArgs interface { - GetRawData() []byte //获取原始数据 - DoFree() //处理完成,回收内存 - DoEscape() //逃逸,GC自动回收 -} - type RpcHandleFinder interface { FindRpcHandler(serviceMethod string) IRpcHandler } diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 00605a6..7e4c19d 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -6,7 +6,6 @@ import ( "github.com/duanhf2012/origin/log" "reflect" "runtime" - "strconv" "strings" "unicode" "unicode/utf8" @@ -17,6 +16,7 @@ const maxClusterNode int = 128 type FuncRpcClient func(nodeId int, serviceMethod string, client []*Client) (error, int) type FuncRpcServer func() *Server + var nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) type RpcError string @@ -45,10 +45,7 @@ type RpcMethodInfo struct { rpcProcessorType RpcProcessorType } -type RawRpcCallBack interface { - Unmarshal(data []byte) (interface{}, error) - CB(data interface{}) -} +type RawRpcCallBack func(rawData []byte) type IRpcHandlerChannel interface { PushRpcResponse(call *Call) error @@ -67,7 +64,7 @@ type RpcHandler struct { pClientList []*Client } -type TriggerRpcEvent func(bConnect bool, clientSeq uint32, nodeId int) +type TriggerRpcConnEvent func(bConnect bool, clientSeq uint32, nodeId int) type INodeListener interface { OnNodeConnected(nodeId int) OnNodeDisconnect(nodeId int) @@ -92,10 +89,11 @@ type IRpcHandler interface { AsyncCallNode(nodeId int, serviceMethod string, args interface{}, callback interface{}) error CallNode(nodeId int, serviceMethod string, args interface{}, reply interface{}) error GoNode(nodeId int, serviceMethod string, args interface{}) error - RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs IRawInputArgs) error + RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs []byte) error CastGo(serviceMethod string, args interface{}) error IsSingleCoroutine() bool UnmarshalInParam(rpcProcessor IRpcProcessor, serviceMethod string, rawRpcMethodId uint32, inParam []byte) (interface{}, error) + GetRpcServer() FuncRpcServer } func reqHandlerNull(Returns interface{}, Err RpcError) { @@ -244,8 +242,13 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { log.SError("RpcHandler cannot find request rpc id", rawRpcId) return } + rawData,ok := request.inParam.([]byte) + if ok == false { + log.SError("RpcHandler " + handler.rpcHandler.GetName()," cannot convert in param to []byte", rawRpcId) + return + } - v.CB(request.inParam) + v(rawData) return } @@ -427,36 +430,8 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int } //2.rpcClient调用 - //如果调用本结点服务 for i := 0; i < count; i++ { - if pClientList[i].bSelfNode == true { - pLocalRpcServer := handler.funcRpcServer() - //判断是否是同一服务 - findIndex := strings.Index(serviceMethod, ".") - if findIndex == -1 { - sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!") - log.SError(sErr.Error()) - err = sErr - - continue - } - serviceName := serviceMethod[:findIndex] - if serviceName == handler.rpcHandler.GetName() { //自己服务调用 - //调用自己rpcHandler处理器 - return pLocalRpcServer.myselfRpcHandlerGo(pClientList[i],serviceName, serviceMethod, args, requestHandlerNull,nil) - } - //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor, pClientList[i], true, serviceName, 0, serviceMethod, args, nil, nil) - if pCall.Err != nil { - err = pCall.Err - } - pClientList[i].RemovePending(pCall.Seq) - ReleaseCall(pCall) - continue - } - - //跨node调用 - pCall := pClientList[i].Go(true, serviceMethod, args, nil) + pCall := pClientList[i].Go(handler.rpcHandler,true, serviceMethod, args, nil) if pCall.Err != nil { err = pCall.Err } @@ -482,38 +457,14 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf return errors.New("cannot call more then 1 node") } - //2.rpcClient调用 - //如果调用本结点服务 pClient := pClientList[0] - if pClient.bSelfNode == true { - pLocalRpcServer := handler.funcRpcServer() - //判断是否是同一服务 - findIndex := strings.Index(serviceMethod, ".") - if findIndex == -1 { - err := errors.New("Call serviceMethod " + serviceMethod + "is error!") - log.SError(err.Error()) - return err - } - serviceName := serviceMethod[:findIndex] - if serviceName == handler.rpcHandler.GetName() { //自己服务调用 - //调用自己rpcHandler处理器 - return pLocalRpcServer.myselfRpcHandlerGo(pClient,serviceName, serviceMethod, args,requestHandlerNull, reply) - } - //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil, pClient, false, serviceName, 0, serviceMethod, args, reply, nil) - err = pCall.Done().Err - pClient.RemovePending(pCall.Seq) - ReleaseCall(pCall) - return err - } - - //跨node调用 - pCall := pClient.Go(false, serviceMethod, args, reply) + pCall := pClient.Go(handler.rpcHandler,false, serviceMethod, args, reply) if pCall.Err != nil { err = pCall.Err ReleaseCall(pCall) return err } + err = pCall.Done().Err pClient.RemovePending(pCall.Seq) ReleaseCall(pCall) @@ -541,12 +492,11 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i } reply := reflect.New(fVal.Type().In(0).Elem()).Interface() - var pClientList [maxClusterNode]*Client + var pClientList [2]*Client err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:]) if count == 0 || err != nil { - strNodeId := strconv.Itoa(nodeId) if err == nil { - err = errors.New("cannot find rpcClient from nodeId " + strNodeId + " " + serviceMethod) + err = fmt.Errorf("cannot find %s from nodeId %d",serviceMethod,nodeId) } fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) log.SError("Call serviceMethod is error:", err.Error()) @@ -563,35 +513,9 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i //2.rpcClient调用 //如果调用本结点服务 pClient := pClientList[0] - if pClient.bSelfNode == true { - pLocalRpcServer := handler.funcRpcServer() - //判断是否是同一服务 - findIndex := strings.Index(serviceMethod, ".") - if findIndex == -1 { - err := errors.New("Call serviceMethod " + serviceMethod + " is error!") - fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) - log.SError(err.Error()) - return nil - } - serviceName := serviceMethod[:findIndex] - //调用自己rpcHandler处理器 - if serviceName == handler.rpcHandler.GetName() { //自己服务调用 - return pLocalRpcServer.myselfRpcHandlerGo(pClient,serviceName, serviceMethod, args,fVal ,reply) - } + pClient.AsyncCall(handler.rpcHandler, serviceMethod, fVal, args, reply) - //其他的rpcHandler的处理器 - err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient, handler, false, serviceName, serviceMethod, args, reply, fVal) - if err != nil { - fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) - } - return nil - } - - //跨node调用 - err = pClient.AsyncCall(handler, serviceMethod, fVal, args, reply) - if err != nil { - fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) - } + return nil } @@ -631,16 +555,14 @@ func (handler *RpcHandler) CastGo(serviceMethod string, args interface{}) error return handler.goRpc(nil, true, 0, serviceMethod, args) } -func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs IRawInputArgs) error { +func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs []byte) error { processor := GetProcessor(uint8(rpcProcessorType)) err, count := handler.funcRpcClient(nodeId, serviceName, handler.pClientList) if count == 0 || err != nil { - //args.DoGc() log.SError("Call serviceMethod is error:", err.Error()) return err } if count > 1 { - //args.DoGc() err := errors.New("cannot call more then 1 node") log.SError(err.Error()) return err @@ -649,32 +571,12 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i //2.rpcClient调用 //如果调用本结点服务 for i := 0; i < count; i++ { - if handler.pClientList[i].bSelfNode == true { - pLocalRpcServer := handler.funcRpcServer() - //调用自己rpcHandler处理器 - if serviceName == handler.rpcHandler.GetName() { //自己服务调用 - err := pLocalRpcServer.myselfRpcHandlerGo(handler.pClientList[i],serviceName, serviceName, rawArgs.GetRawData(), requestHandlerNull,nil) - //args.DoGc() - return err - } - - //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor, handler.pClientList[i], true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs.GetRawData()) - rawArgs.DoEscape() - if pCall.Err != nil { - err = pCall.Err - } - handler.pClientList[i].RemovePending(pCall.Seq) - ReleaseCall(pCall) - continue - } - //跨node调用 - pCall := handler.pClientList[i].RawGo(processor, true, rpcMethodId, serviceName, rawArgs.GetRawData(), nil) - rawArgs.DoFree() + pCall := handler.pClientList[i].RawGo(handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil) if pCall.Err != nil { err = pCall.Err } + handler.pClientList[i].RemovePending(pCall.Seq) ReleaseCall(pCall) } @@ -688,23 +590,7 @@ func (handler *RpcHandler) RegRawRpc(rpcMethodId uint32, rawRpcCB RawRpcCallBack func (handler *RpcHandler) UnmarshalInParam(rpcProcessor IRpcProcessor, serviceMethod string, rawRpcMethodId uint32, inParam []byte) (interface{}, error) { if rawRpcMethodId > 0 { - v, ok := handler.mapRawFunctions[rawRpcMethodId] - if ok == false { - strRawRpcMethodId := strconv.FormatUint(uint64(rawRpcMethodId), 10) - err := errors.New("RpcHandler cannot find request rpc id " + strRawRpcMethodId) - log.SError(err.Error()) - return nil, err - } - - msg, err := v.Unmarshal(inParam) - if err != nil { - strRawRpcMethodId := strconv.FormatUint(uint64(rawRpcMethodId), 10) - err := errors.New("RpcHandler cannot Unmarshal rpc id " + strRawRpcMethodId) - log.SError(err.Error()) - return nil, err - } - - return msg, err + return inParam,nil } v, ok := handler.mapFunctions[serviceMethod] @@ -717,3 +603,8 @@ func (handler *RpcHandler) UnmarshalInParam(rpcProcessor IRpcProcessor, serviceM err = rpcProcessor.Unmarshal(inParam, param) return param, err } + + +func (handler *RpcHandler) GetRpcServer() FuncRpcServer{ + return handler.funcRpcServer +} diff --git a/rpc/server.go b/rpc/server.go index 7a2146c..a268956 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -19,7 +19,6 @@ const ( RpcProcessorGoGoPB RpcProcessorType = 1 ) -//var processor IRpcProcessor = &JsonProcessor{} var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &GoGoPBProcessor{}} var arrayProcessorLen uint8 = 2 var LittleEndian bool @@ -245,12 +244,12 @@ func (server *Server) myselfRpcHandlerGo(client *Client,handlerName string, serv log.SError(err.Error()) return err } - - - + return rpcHandler.CallMethod(client,serviceMethod, args,callBack, reply) } + + func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call { pCall := MakeCall() pCall.Seq = client.generateSeq() @@ -266,22 +265,13 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie } var iParam interface{} - - if processor == nil { _, processor = GetProcessorType(args) } if args != nil { - inParamValue := reflect.New(reflect.ValueOf(args).Type().Elem()) - //args - //复制输入参数 - iParam = inParamValue.Interface() - bytes,err := processor.Marshal(args) - if err == nil { - err = processor.Unmarshal(bytes,iParam) - } - + var err error + iParam,err = processor.Clone(args) if err != nil { pCall.Seq = 0 pCall.Err = errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) @@ -359,15 +349,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler } _, processor := GetProcessorType(args) - inParamValue := reflect.New(reflect.ValueOf(args).Type().Elem()) - //args - //复制输入参数 - iParam := inParamValue.Interface() - bytes,err := processor.Marshal(args) - if err == nil { - err = processor.Unmarshal(bytes,iParam) - } - + iParam,err := processor.Clone(args) if err != nil { errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) log.SError(errM.Error()) diff --git a/service/module.go b/service/module.go index 35db9b8..5df993e 100644 --- a/service/module.go +++ b/service/module.go @@ -273,6 +273,11 @@ func (m *Module) SafeNewTicker(tickerId *uint64, d time.Duration, AdditionData i } func (m *Module) CancelTimerId(timerId *uint64) bool { + if timerId==nil || *timerId == 0 { + log.SWarning("timerId is invalid") + return false + } + if m.mapActiveIdTimer == nil { log.SError("mapActiveIdTimer is nil") return false @@ -280,7 +285,7 @@ func (m *Module) CancelTimerId(timerId *uint64) bool { t, ok := m.mapActiveIdTimer[*timerId] if ok == false { - log.SError("cannot find timer id ", timerId) + log.SStack("cannot find timer id ", timerId) return false }