优化本结点与跨结点Rpc结构&简化原始Rpc接口

This commit is contained in:
duanhf2012
2023-01-31 13:50:41 +08:00
parent 541abd93b4
commit a7c6b45764
12 changed files with 549 additions and 460 deletions

View File

@@ -110,15 +110,13 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
break
}
rpc.client.Lock()
//正在连接中不主动断开,只断开没有连接中的
if rpc.client.IsConnected() {
nodeInfo.status = Discard
rpc.client.Unlock()
log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr)
return
}
rpc.client.Unlock()
break
}
@@ -194,20 +192,17 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
if _, rpcInfoOK := cls.mapRpc[nodeInfo.NodeId]; rpcInfoOK == true {
return
}
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = *nodeInfo
rpcInfo.client = &rpc.Client{}
rpcInfo.client.TriggerRpcEvent = cls.triggerRpcEvent
rpcInfo.client.Connect(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen)
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.triggerRpcEvent)
cls.mapRpc[nodeInfo.NodeId] = rpcInfo
}
func (cls *Cluster) buildLocalRpc() {
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = &rpc.Client{}
rpcInfo.client.Connect(rpcInfo.nodeInfo.NodeId, "", 0)
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId)
cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo
}
@@ -358,10 +353,10 @@ func (cls *Cluster) IsNodeConnected(nodeId int) bool {
return pClient != nil && pClient.IsConnected()
}
func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int) {
func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId int) {
cls.locker.Lock()
nodeInfo, ok := cls.mapRpc[nodeId]
if ok == false || nodeInfo.client == nil || nodeInfo.client.GetClientSeq() != clientSeq {
if ok == false || nodeInfo.client == nil || nodeInfo.client.GetClientId() != clientId {
cls.locker.Unlock()
return
}
@@ -383,7 +378,6 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int)
}
}
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId int, serviceName []string) {
cls.rpcEventLocker.Lock()
defer cls.rpcEventLocker.Unlock()

View File

@@ -40,6 +40,8 @@ func (p *MsgParser) init(){
if p.MaxMsgLen > max {
p.MaxMsgLen = max
}
p.INetMempool = NewMemAreaPool()
}

View File

@@ -3,91 +3,58 @@ package rpc
import (
"container/list"
"errors"
"fmt"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/network"
"math"
"reflect"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
)
type Client struct {
clientSeq uint32
id int
bSelfNode bool
network.TCPClient
conn *network.TCPConn
const MaxCheckCallRpcCount = 1000
const MaxPendingWriteNum = 200000
const ConnectInterval = 2*time.Second
const RpcConnNum = 1
const RpcLenMsgLen = 4
const RpcMinMsgLen = 2
const CheckRpcCallTimeoutInterval = 5*time.Second
const DefaultRpcTimeout = 15*time.Second
var clientSeq uint32
type IRealClient interface {
SetConn(conn *network.TCPConn)
Close(waitDone bool)
AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error
Go(rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
IsConnected() bool
Run()
OnClose()
}
type Client struct {
clientId uint32
nodeId int
pendingLock sync.RWMutex
startSeq uint64
pending map[uint64]*list.Element
pendingTimer *list.List
callRpcTimeout time.Duration
maxCheckCallRpcCount int
TriggerRpcEvent
IRealClient
}
const MaxCheckCallRpcCount = 1000
const MaxPendingWriteNum = 200000
const ConnectInterval = 2*time.Second
var clientSeq uint32
func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
client.conn = conn
client.ResetPending()
client.SetConn(conn)
return client
}
func (client *Client) Connect(id int, addr string, maxRpcParamLen uint32) error {
client.clientSeq = atomic.AddUint32(&clientSeq, 1)
client.id = id
client.Addr = addr
client.maxCheckCallRpcCount = MaxCheckCallRpcCount
client.callRpcTimeout = 15 * time.Second
client.ConnectInterval = ConnectInterval
client.PendingWriteNum = MaxPendingWriteNum
client.AutoReconnect = true
client.ConnNum = 1
client.LenMsgLen = 4
client.MinMsgLen = 2
client.ReadDeadline = Default_ReadWriteDeadline
client.WriteDeadline = Default_ReadWriteDeadline
if maxRpcParamLen > 0 {
client.MaxMsgLen = maxRpcParamLen
} else {
client.MaxMsgLen = math.MaxUint32
}
client.NewAgent = client.NewClientAgent
client.LittleEndian = LittleEndian
client.ResetPending()
go client.startCheckRpcCallTimer()
if addr == "" {
client.bSelfNode = true
return nil
}
client.Start()
return nil
}
func (client *Client) startCheckRpcCallTimer() {
for {
time.Sleep(5 * time.Second)
client.checkRpcCallTimeout()
}
}
func (client *Client) makeCallFail(call *Call) {
client.removePending(call.Seq)
func (bc *Client) makeCallFail(call *Call) {
bc.removePending(call.Seq)
if call.callback != nil && call.callback.IsValid() {
call.rpcHandler.PushRpcResponse(call)
} else {
@@ -95,29 +62,38 @@ func (client *Client) makeCallFail(call *Call) {
}
}
func (client *Client) checkRpcCallTimeout() {
now := time.Now()
func (bc *Client) checkRpcCallTimeout() {
for{
time.Sleep(CheckRpcCallTimeoutInterval)
now := time.Now()
for i := 0; i < client.maxCheckCallRpcCount; i++ {
client.pendingLock.Lock()
pElem := client.pendingTimer.Front()
if pElem == nil {
client.pendingLock.Unlock()
for i := 0; i < bc.maxCheckCallRpcCount; i++ {
bc.pendingLock.Lock()
if bc.pendingTimer == nil {
bc.pendingLock.Unlock()
break
}
pElem := bc.pendingTimer.Front()
if pElem == nil {
bc.pendingLock.Unlock()
break
}
pCall := pElem.Value.(*Call)
if now.Sub(pCall.callTime) > bc.callRpcTimeout {
strTimeout := strconv.FormatInt(int64(bc.callRpcTimeout/time.Second), 10)
pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds")
bc.makeCallFail(pCall)
bc.pendingLock.Unlock()
continue
}
bc.pendingLock.Unlock()
break
}
pCall := pElem.Value.(*Call)
if now.Sub(pCall.callTime) > client.callRpcTimeout {
strTimeout := strconv.FormatInt(int64(client.callRpcTimeout/time.Second), 10)
pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds")
client.makeCallFail(pCall)
client.pendingLock.Unlock()
continue
}
client.pendingLock.Unlock()
}
}
func (client *Client) ResetPending() {
func (client *Client) InitPending() {
client.pendingLock.Lock()
if client.pending != nil {
for _, v := range client.pending {
@@ -131,235 +107,62 @@ func (client *Client) ResetPending() {
client.pendingLock.Unlock()
}
func (client *Client) AddPending(call *Call) {
client.pendingLock.Lock()
func (bc *Client) AddPending(call *Call) {
bc.pendingLock.Lock()
call.callTime = time.Now()
elemTimer := client.pendingTimer.PushBack(call)
client.pending[call.Seq] = elemTimer //如果下面发送失败,将会一一直存在这里
client.pendingLock.Unlock()
elemTimer := bc.pendingTimer.PushBack(call)
bc.pending[call.Seq] = elemTimer //如果下面发送失败,将会一一直存在这里
bc.pendingLock.Unlock()
}
func (client *Client) RemovePending(seq uint64) *Call {
if seq == 0 {
func (bc *Client) RemovePending(seq uint64) *Call {
if seq == 0 {
return nil
}
client.pendingLock.Lock()
call := client.removePending(seq)
client.pendingLock.Unlock()
bc.pendingLock.Lock()
call := bc.removePending(seq)
bc.pendingLock.Unlock()
return call
}
func (client *Client) removePending(seq uint64) *Call {
v, ok := client.pending[seq]
func (bc *Client) removePending(seq uint64) *Call {
v, ok := bc.pending[seq]
if ok == false {
return nil
}
call := v.Value.(*Call)
client.pendingTimer.Remove(v)
delete(client.pending, seq)
bc.pendingTimer.Remove(v)
delete(bc.pending, seq)
return call
}
func (client *Client) FindPending(seq uint64) *Call {
func (bc *Client) FindPending(seq uint64) *Call {
if seq == 0 {
return nil
}
client.pendingLock.Lock()
v, ok := client.pending[seq]
bc.pendingLock.Lock()
v, ok := bc.pending[seq]
if ok == false {
client.pendingLock.Unlock()
bc.pendingLock.Unlock()
return nil
}
pCall := v.Value.(*Call)
client.pendingLock.Unlock()
bc.pendingLock.Unlock()
return pCall
}
func (client *Client) generateSeq() uint64 {
return atomic.AddUint64(&client.startSeq, 1)
func (bc *Client) generateSeq() uint64 {
return atomic.AddUint64(&bc.startSeq, 1)
}
func (client *Client) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error {
processorType, processor := GetProcessorType(args)
InParam, herr := processor.Marshal(args)
if herr != nil {
return herr
}
seq := client.generateSeq()
request := MakeRpcRequest(processor, seq, 0, serviceMethod, false, InParam)
bytes, err := processor.Marshal(request.RpcRequestData)
ReleaseRpcRequest(request)
if err != nil {
return err
}
if client.conn == nil {
return errors.New("Rpc server is disconnect,call " + serviceMethod)
}
call := MakeCall()
call.Reply = replyParam
call.callback = &callback
call.rpcHandler = rpcHandler
call.ServiceMethod = serviceMethod
call.Seq = seq
client.AddPending(call)
err = client.conn.WriteMsg([]byte{uint8(processorType)}, bytes)
if err != nil {
client.RemovePending(call.Seq)
ReleaseCall(call)
return err
}
return nil
func (client *Client) GetNodeId() int {
return client.nodeId
}
func (client *Client) RawGo(processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, args []byte, reply interface{}) *Call {
call := MakeCall()
call.ServiceMethod = serviceMethod
call.Reply = reply
call.Seq = client.generateSeq()
request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, args)
bytes, err := processor.Marshal(request.RpcRequestData)
ReleaseRpcRequest(request)
if err != nil {
call.Seq = 0
call.Err = err
return call
}
if client.conn == nil {
call.Seq = 0
call.Err = errors.New(serviceMethod + " was called failed,rpc client is disconnect")
return call
}
if noReply == false {
client.AddPending(call)
}
err = client.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes)
if err != nil {
client.RemovePending(call.Seq)
call.Seq = 0
call.Err = err
}
return call
}
func (client *Client) Go(noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
_, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args)
if err != nil {
call := MakeCall()
call.Err = err
return call
}
return client.RawGo(processor, noReply, 0, serviceMethod, InParam, reply)
}
func (client *Client) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
}
}()
client.TriggerRpcEvent(true, client.GetClientSeq(), client.GetId())
for {
bytes, err := client.conn.ReadMsg()
if err != nil {
log.SError("rpcClient ", client.Addr, " ReadMsg error:", err.Error())
return
}
processor := GetProcessor(bytes[0])
if processor == nil {
client.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", client.Addr, " ReadMsg head error:", err.Error())
return
}
//1.解析head
response := RpcResponse{}
response.RpcResponseData = processor.MakeRpcResponse(0, "", nil)
err = processor.Unmarshal(bytes[1:], response.RpcResponseData)
client.conn.ReleaseReadMsg(bytes)
if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData)
log.SError("rpcClient Unmarshal head error:", err.Error())
continue
}
v := client.RemovePending(response.RpcResponseData.GetSeq())
if v == nil {
log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending")
} else {
v.Err = nil
if len(response.RpcResponseData.GetReply()) > 0 {
err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply)
if err != nil {
log.SError("rpcClient Unmarshal body error:", err.Error())
v.Err = err
}
}
if response.RpcResponseData.GetErr() != nil {
v.Err = response.RpcResponseData.GetErr()
}
if v.callback != nil && v.callback.IsValid() {
v.rpcHandler.PushRpcResponse(v)
} else {
v.done <- v
}
}
processor.ReleaseRpcResponse(response.RpcResponseData)
}
}
func (client *Client) OnClose() {
client.TriggerRpcEvent(false, client.GetClientSeq(), client.GetId())
}
func (client *Client) IsConnected() bool {
return client.bSelfNode || (client.conn != nil && client.conn.IsConnected() == true)
}
func (client *Client) GetId() int {
return client.id
}
func (client *Client) Close(waitDone bool) {
client.TCPClient.Close(waitDone)
client.pendingLock.Lock()
for {
pElem := client.pendingTimer.Front()
if pElem == nil {
break
}
pCall := pElem.Value.(*Call)
pCall.Err = errors.New("nodeid is disconnect ")
client.makeCallFail(pCall)
}
client.pendingLock.Unlock()
}
func (client *Client) GetClientSeq() uint32 {
return client.clientSeq
func (client *Client) GetClientId() uint32 {
return client.clientId
}

View File

@@ -3,6 +3,7 @@ package rpc
import (
"github.com/duanhf2012/origin/util/sync"
"github.com/gogo/protobuf/proto"
"fmt"
)
type GoGoPBProcessor struct {
@@ -73,6 +74,15 @@ func (slf *GoGoPBProcessor) GetProcessorType() RpcProcessorType{
return RpcProcessorGoGoPB
}
func (slf *GoGoPBProcessor) Clone(src interface{}) (interface{},error){
srcMsg,ok := src.(proto.Message)
if ok == false {
return nil,fmt.Errorf("param is not of proto.message type")
}
return proto.Clone(srcMsg),nil
}
func (slf *GoGoPBRpcRequestData) IsNoReply() bool{
return slf.GetNoReply()
}
@@ -91,5 +101,3 @@ func (slf *GoGoPBRpcResponseData) GetErr() *RpcError {

View File

@@ -3,6 +3,7 @@ package rpc
import (
"github.com/duanhf2012/origin/util/sync"
jsoniter "github.com/json-iterator/go"
"reflect"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
@@ -119,6 +120,22 @@ func (jsonRpcResponseData *JsonRpcResponseData) GetReply() []byte{
}
func (jsonProcessor *JsonProcessor) Clone(src interface{}) (interface{},error){
dstValue := reflect.New(reflect.ValueOf(src).Type().Elem())
bytes,err := json.Marshal(src)
if err != nil {
return nil,err
}
dst := dstValue.Interface()
err = json.Unmarshal(bytes,dst)
if err != nil {
return nil,err
}
return dst,nil
}

131
rpc/lclient.go Normal file
View File

@@ -0,0 +1,131 @@
package rpc
import (
"errors"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/network"
"reflect"
"strings"
"sync/atomic"
)
//本结点的Client
type LClient struct {
selfClient *Client
}
func (rc *LClient) Lock(){
}
func (rc *LClient) Unlock(){
}
func (lc *LClient) Run(){
}
func (lc *LClient) OnClose(){
}
func (lc *LClient) IsConnected() bool {
return true
}
func (lc *LClient) SetConn(conn *network.TCPConn){
}
func (lc *LClient) Close(waitDone bool){
}
func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
pLocalRpcServer := rpcHandler.GetRpcServer()()
//判断是否是同一服务
findIndex := strings.Index(serviceMethod, ".")
if findIndex == -1 {
sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!")
log.SError(sErr.Error())
call := MakeCall()
call.Err = sErr
return call
}
serviceName := serviceMethod[:findIndex]
if serviceName == rpcHandler.GetName() { //自己服务调用
//调用自己rpcHandler处理器
err := pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args, requestHandlerNull,reply)
call := MakeCall()
if err != nil {
call.Err = err
return call
}
call.done<-call
return call
}
//其他的rpcHandler的处理器
return pLocalRpcServer.selfNodeRpcHandlerGo(nil, lc.selfClient, noReply, serviceName, 0, serviceMethod, args, reply, nil)
}
func (rc *LClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call {
pLocalRpcServer := rpcHandler.GetRpcServer()()
call := MakeCall()
call.ServiceMethod = serviceName
call.Reply = reply
//服务自我调用
if serviceName == rpcHandler.GetName() {
err := pLocalRpcServer.myselfRpcHandlerGo(rc.selfClient,serviceName, serviceName, rawArgs, requestHandlerNull,nil)
call.Err = err
call.done <- call
return call
}
//其他的rpcHandler的处理器
return pLocalRpcServer.selfNodeRpcHandlerGo(processor,rc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
}
func (lc *LClient) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}) error {
pLocalRpcServer := rpcHandler.GetRpcServer()()
//判断是否是同一服务
findIndex := strings.Index(serviceMethod, ".")
if findIndex == -1 {
err := errors.New("Call serviceMethod " + serviceMethod + " is error!")
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError(err.Error())
return nil
}
serviceName := serviceMethod[:findIndex]
//调用自己rpcHandler处理器
if serviceName == rpcHandler.GetName() { //自己服务调用
return pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args,callback ,reply)
}
//其他的rpcHandler的处理器
err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback)
if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
}
return nil
}
func NewLClient(nodeId int) *Client{
client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId
client.maxCheckCallRpcCount = MaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout
lClient := &LClient{}
lClient.selfClient = client
client.IRealClient = lClient
client.InitPending()
go client.checkRpcCallTimeout()
return client
}

View File

@@ -1,6 +1,7 @@
package rpc
type IRpcProcessor interface {
Clone(src interface{}) (interface{},error)
Marshal(v interface{}) ([]byte, error) //b表示自定义缓冲区可以填nil由系统自动分配
Unmarshal(data []byte, v interface{}) error
MakeRpcRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData

261
rpc/rclient.go Normal file
View File

@@ -0,0 +1,261 @@
package rpc
import (
"errors"
"fmt"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/network"
"math"
"reflect"
"runtime"
"sync/atomic"
)
//跨结点连接的Client
type RClient struct {
selfClient *Client
network.TCPClient
conn *network.TCPConn
TriggerRpcConnEvent
}
func (rc *RClient) IsConnected() bool {
rc.Lock()
defer rc.Unlock()
return rc.conn != nil && rc.conn.IsConnected() == true
}
func (rc *RClient) GetConn() *network.TCPConn{
rc.Lock()
conn := rc.conn
rc.Unlock()
return conn
}
func (rc *RClient) SetConn(conn *network.TCPConn){
rc.Lock()
rc.conn = conn
rc.Unlock()
}
func (rc *RClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
_, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args)
if err != nil {
call := MakeCall()
call.Err = err
return call
}
return rc.RawGo(rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply)
}
func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call {
call := MakeCall()
call.ServiceMethod = serviceMethod
call.Reply = reply
call.Seq = rc.selfClient.generateSeq()
request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs)
bytes, err := processor.Marshal(request.RpcRequestData)
ReleaseRpcRequest(request)
if err != nil {
call.Seq = 0
call.Err = err
return call
}
conn := rc.GetConn()
if conn == nil || conn.IsConnected()==false {
call.Seq = 0
call.Err = errors.New(serviceMethod + " was called failed,rpc client is disconnect")
return call
}
if noReply == false {
rc.selfClient.AddPending(call)
}
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes)
if err != nil {
rc.selfClient.RemovePending(call.Seq)
call.Seq = 0
call.Err = err
}
return call
}
func (rc *RClient) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error {
err := rc.asyncCall(rpcHandler, serviceMethod, callback, args, replyParam)
if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
}
return nil
}
func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error {
processorType, processor := GetProcessorType(args)
InParam, herr := processor.Marshal(args)
if herr != nil {
return herr
}
seq := rc.selfClient.generateSeq()
request := MakeRpcRequest(processor, seq, 0, serviceMethod, false, InParam)
bytes, err := processor.Marshal(request.RpcRequestData)
ReleaseRpcRequest(request)
if err != nil {
return err
}
conn := rc.GetConn()
if conn == nil || conn.IsConnected()==false {
return errors.New("Rpc server is disconnect,call " + serviceMethod)
}
call := MakeCall()
call.Reply = replyParam
call.callback = &callback
call.rpcHandler = rpcHandler
call.ServiceMethod = serviceMethod
call.Seq = seq
rc.selfClient.AddPending(call)
err = conn.WriteMsg([]byte{uint8(processorType)}, bytes)
if err != nil {
rc.selfClient.RemovePending(call.Seq)
ReleaseCall(call)
return err
}
return nil
}
func (rc *RClient) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
}
}()
rc.TriggerRpcConnEvent(true, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId())
for {
bytes, err := rc.conn.ReadMsg()
if err != nil {
log.SError("rpcClient ", rc.Addr, " ReadMsg error:", err.Error())
return
}
processor := GetProcessor(bytes[0])
if processor == nil {
rc.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error())
return
}
//1.解析head
response := RpcResponse{}
response.RpcResponseData = processor.MakeRpcResponse(0, "", nil)
err = processor.Unmarshal(bytes[1:], response.RpcResponseData)
rc.conn.ReleaseReadMsg(bytes)
if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData)
log.SError("rpcClient Unmarshal head error:", err.Error())
continue
}
v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq())
if v == nil {
log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending")
} else {
v.Err = nil
if len(response.RpcResponseData.GetReply()) > 0 {
err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply)
if err != nil {
log.SError("rpcClient Unmarshal body error:", err.Error())
v.Err = err
}
}
if response.RpcResponseData.GetErr() != nil {
v.Err = response.RpcResponseData.GetErr()
}
if v.callback != nil && v.callback.IsValid() {
v.rpcHandler.PushRpcResponse(v)
} else {
v.done <- v
}
}
processor.ReleaseRpcResponse(response.RpcResponseData)
}
}
func (rc *RClient) OnClose() {
rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId())
}
func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEvent TriggerRpcConnEvent) *Client{
client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId
client.maxCheckCallRpcCount = MaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout
c:= &RClient{}
c.selfClient = client
c.Addr = addr
c.ConnectInterval = ConnectInterval
c.PendingWriteNum = MaxPendingWriteNum
c.AutoReconnect = true
c.TriggerRpcConnEvent = triggerRpcConnEvent
c.ConnNum = RpcConnNum
c.LenMsgLen = RpcLenMsgLen
c.MinMsgLen = RpcMinMsgLen
c.ReadDeadline = Default_ReadWriteDeadline
c.WriteDeadline = Default_ReadWriteDeadline
c.LittleEndian = LittleEndian
c.NewAgent = client.NewClientAgent
if maxRpcParamLen > 0 {
c.MaxMsgLen = maxRpcParamLen
} else {
c.MaxMsgLen = math.MaxUint32
}
client.IRealClient = c
client.InitPending()
go client.checkRpcCallTimeout()
c.Start()
return client
}
func (rc *RClient) Close(waitDone bool) {
rc.TCPClient.Close(waitDone)
rc.selfClient.pendingLock.Lock()
for {
pElem := rc.selfClient.pendingTimer.Front()
if pElem == nil {
break
}
pCall := pElem.Value.(*Call)
pCall.Err = errors.New("nodeid is disconnect ")
rc.selfClient.makeCallFail(pCall)
}
rc.selfClient.pendingLock.Unlock()
}

View File

@@ -51,12 +51,6 @@ type IRpcResponseData interface {
GetReply() []byte
}
type IRawInputArgs interface {
GetRawData() []byte //获取原始数据
DoFree() //处理完成,回收内存
DoEscape() //逃逸,GC自动回收
}
type RpcHandleFinder interface {
FindRpcHandler(serviceMethod string) IRpcHandler
}

View File

@@ -6,7 +6,6 @@ import (
"github.com/duanhf2012/origin/log"
"reflect"
"runtime"
"strconv"
"strings"
"unicode"
"unicode/utf8"
@@ -17,6 +16,7 @@ const maxClusterNode int = 128
type FuncRpcClient func(nodeId int, serviceMethod string, client []*Client) (error, int)
type FuncRpcServer func() *Server
var nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())
type RpcError string
@@ -45,10 +45,7 @@ type RpcMethodInfo struct {
rpcProcessorType RpcProcessorType
}
type RawRpcCallBack interface {
Unmarshal(data []byte) (interface{}, error)
CB(data interface{})
}
type RawRpcCallBack func(rawData []byte)
type IRpcHandlerChannel interface {
PushRpcResponse(call *Call) error
@@ -67,7 +64,7 @@ type RpcHandler struct {
pClientList []*Client
}
type TriggerRpcEvent func(bConnect bool, clientSeq uint32, nodeId int)
type TriggerRpcConnEvent func(bConnect bool, clientSeq uint32, nodeId int)
type INodeListener interface {
OnNodeConnected(nodeId int)
OnNodeDisconnect(nodeId int)
@@ -92,10 +89,11 @@ type IRpcHandler interface {
AsyncCallNode(nodeId int, serviceMethod string, args interface{}, callback interface{}) error
CallNode(nodeId int, serviceMethod string, args interface{}, reply interface{}) error
GoNode(nodeId int, serviceMethod string, args interface{}) error
RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs IRawInputArgs) error
RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs []byte) error
CastGo(serviceMethod string, args interface{}) error
IsSingleCoroutine() bool
UnmarshalInParam(rpcProcessor IRpcProcessor, serviceMethod string, rawRpcMethodId uint32, inParam []byte) (interface{}, error)
GetRpcServer() FuncRpcServer
}
func reqHandlerNull(Returns interface{}, Err RpcError) {
@@ -244,8 +242,13 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
log.SError("RpcHandler cannot find request rpc id", rawRpcId)
return
}
rawData,ok := request.inParam.([]byte)
if ok == false {
log.SError("RpcHandler " + handler.rpcHandler.GetName()," cannot convert in param to []byte", rawRpcId)
return
}
v.CB(request.inParam)
v(rawData)
return
}
@@ -427,36 +430,8 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int
}
//2.rpcClient调用
//如果调用本结点服务
for i := 0; i < count; i++ {
if pClientList[i].bSelfNode == true {
pLocalRpcServer := handler.funcRpcServer()
//判断是否是同一服务
findIndex := strings.Index(serviceMethod, ".")
if findIndex == -1 {
sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!")
log.SError(sErr.Error())
err = sErr
continue
}
serviceName := serviceMethod[:findIndex]
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
//调用自己rpcHandler处理器
return pLocalRpcServer.myselfRpcHandlerGo(pClientList[i],serviceName, serviceMethod, args, requestHandlerNull,nil)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor, pClientList[i], true, serviceName, 0, serviceMethod, args, nil, nil)
if pCall.Err != nil {
err = pCall.Err
}
pClientList[i].RemovePending(pCall.Seq)
ReleaseCall(pCall)
continue
}
//跨node调用
pCall := pClientList[i].Go(true, serviceMethod, args, nil)
pCall := pClientList[i].Go(handler.rpcHandler,true, serviceMethod, args, nil)
if pCall.Err != nil {
err = pCall.Err
}
@@ -482,38 +457,14 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf
return errors.New("cannot call more then 1 node")
}
//2.rpcClient调用
//如果调用本结点服务
pClient := pClientList[0]
if pClient.bSelfNode == true {
pLocalRpcServer := handler.funcRpcServer()
//判断是否是同一服务
findIndex := strings.Index(serviceMethod, ".")
if findIndex == -1 {
err := errors.New("Call serviceMethod " + serviceMethod + "is error!")
log.SError(err.Error())
return err
}
serviceName := serviceMethod[:findIndex]
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
//调用自己rpcHandler处理器
return pLocalRpcServer.myselfRpcHandlerGo(pClient,serviceName, serviceMethod, args,requestHandlerNull, reply)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil, pClient, false, serviceName, 0, serviceMethod, args, reply, nil)
err = pCall.Done().Err
pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall)
return err
}
//跨node调用
pCall := pClient.Go(false, serviceMethod, args, reply)
pCall := pClient.Go(handler.rpcHandler,false, serviceMethod, args, reply)
if pCall.Err != nil {
err = pCall.Err
ReleaseCall(pCall)
return err
}
err = pCall.Done().Err
pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall)
@@ -541,12 +492,11 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i
}
reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
var pClientList [maxClusterNode]*Client
var pClientList [2]*Client
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
if count == 0 || err != nil {
strNodeId := strconv.Itoa(nodeId)
if err == nil {
err = errors.New("cannot find rpcClient from nodeId " + strNodeId + " " + serviceMethod)
err = fmt.Errorf("cannot find %s from nodeId %d",serviceMethod,nodeId)
}
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError("Call serviceMethod is error:", err.Error())
@@ -563,35 +513,9 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i
//2.rpcClient调用
//如果调用本结点服务
pClient := pClientList[0]
if pClient.bSelfNode == true {
pLocalRpcServer := handler.funcRpcServer()
//判断是否是同一服务
findIndex := strings.Index(serviceMethod, ".")
if findIndex == -1 {
err := errors.New("Call serviceMethod " + serviceMethod + " is error!")
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError(err.Error())
return nil
}
serviceName := serviceMethod[:findIndex]
//调用自己rpcHandler处理器
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
return pLocalRpcServer.myselfRpcHandlerGo(pClient,serviceName, serviceMethod, args,fVal ,reply)
}
pClient.AsyncCall(handler.rpcHandler, serviceMethod, fVal, args, reply)
//其他的rpcHandler的处理器
err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient, handler, false, serviceName, serviceMethod, args, reply, fVal)
if err != nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
}
return nil
}
//跨node调用
err = pClient.AsyncCall(handler, serviceMethod, fVal, args, reply)
if err != nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
}
return nil
}
@@ -631,16 +555,14 @@ func (handler *RpcHandler) CastGo(serviceMethod string, args interface{}) error
return handler.goRpc(nil, true, 0, serviceMethod, args)
}
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs IRawInputArgs) error {
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs []byte) error {
processor := GetProcessor(uint8(rpcProcessorType))
err, count := handler.funcRpcClient(nodeId, serviceName, handler.pClientList)
if count == 0 || err != nil {
//args.DoGc()
log.SError("Call serviceMethod is error:", err.Error())
return err
}
if count > 1 {
//args.DoGc()
err := errors.New("cannot call more then 1 node")
log.SError(err.Error())
return err
@@ -649,32 +571,12 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i
//2.rpcClient调用
//如果调用本结点服务
for i := 0; i < count; i++ {
if handler.pClientList[i].bSelfNode == true {
pLocalRpcServer := handler.funcRpcServer()
//调用自己rpcHandler处理器
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
err := pLocalRpcServer.myselfRpcHandlerGo(handler.pClientList[i],serviceName, serviceName, rawArgs.GetRawData(), requestHandlerNull,nil)
//args.DoGc()
return err
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor, handler.pClientList[i], true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs.GetRawData())
rawArgs.DoEscape()
if pCall.Err != nil {
err = pCall.Err
}
handler.pClientList[i].RemovePending(pCall.Seq)
ReleaseCall(pCall)
continue
}
//跨node调用
pCall := handler.pClientList[i].RawGo(processor, true, rpcMethodId, serviceName, rawArgs.GetRawData(), nil)
rawArgs.DoFree()
pCall := handler.pClientList[i].RawGo(handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
if pCall.Err != nil {
err = pCall.Err
}
handler.pClientList[i].RemovePending(pCall.Seq)
ReleaseCall(pCall)
}
@@ -688,23 +590,7 @@ func (handler *RpcHandler) RegRawRpc(rpcMethodId uint32, rawRpcCB RawRpcCallBack
func (handler *RpcHandler) UnmarshalInParam(rpcProcessor IRpcProcessor, serviceMethod string, rawRpcMethodId uint32, inParam []byte) (interface{}, error) {
if rawRpcMethodId > 0 {
v, ok := handler.mapRawFunctions[rawRpcMethodId]
if ok == false {
strRawRpcMethodId := strconv.FormatUint(uint64(rawRpcMethodId), 10)
err := errors.New("RpcHandler cannot find request rpc id " + strRawRpcMethodId)
log.SError(err.Error())
return nil, err
}
msg, err := v.Unmarshal(inParam)
if err != nil {
strRawRpcMethodId := strconv.FormatUint(uint64(rawRpcMethodId), 10)
err := errors.New("RpcHandler cannot Unmarshal rpc id " + strRawRpcMethodId)
log.SError(err.Error())
return nil, err
}
return msg, err
return inParam,nil
}
v, ok := handler.mapFunctions[serviceMethod]
@@ -717,3 +603,8 @@ func (handler *RpcHandler) UnmarshalInParam(rpcProcessor IRpcProcessor, serviceM
err = rpcProcessor.Unmarshal(inParam, param)
return param, err
}
func (handler *RpcHandler) GetRpcServer() FuncRpcServer{
return handler.funcRpcServer
}

View File

@@ -19,7 +19,6 @@ const (
RpcProcessorGoGoPB RpcProcessorType = 1
)
//var processor IRpcProcessor = &JsonProcessor{}
var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &GoGoPBProcessor{}}
var arrayProcessorLen uint8 = 2
var LittleEndian bool
@@ -245,12 +244,12 @@ func (server *Server) myselfRpcHandlerGo(client *Client,handlerName string, serv
log.SError(err.Error())
return err
}
return rpcHandler.CallMethod(client,serviceMethod, args,callBack, reply)
}
func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call {
pCall := MakeCall()
pCall.Seq = client.generateSeq()
@@ -266,22 +265,13 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
}
var iParam interface{}
if processor == nil {
_, processor = GetProcessorType(args)
}
if args != nil {
inParamValue := reflect.New(reflect.ValueOf(args).Type().Elem())
//args
//复制输入参数
iParam = inParamValue.Interface()
bytes,err := processor.Marshal(args)
if err == nil {
err = processor.Unmarshal(bytes,iParam)
}
var err error
iParam,err = processor.Clone(args)
if err != nil {
pCall.Seq = 0
pCall.Err = errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
@@ -359,15 +349,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
}
_, processor := GetProcessorType(args)
inParamValue := reflect.New(reflect.ValueOf(args).Type().Elem())
//args
//复制输入参数
iParam := inParamValue.Interface()
bytes,err := processor.Marshal(args)
if err == nil {
err = processor.Unmarshal(bytes,iParam)
}
iParam,err := processor.Clone(args)
if err != nil {
errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(errM.Error())

View File

@@ -273,6 +273,11 @@ func (m *Module) SafeNewTicker(tickerId *uint64, d time.Duration, AdditionData i
}
func (m *Module) CancelTimerId(timerId *uint64) bool {
if timerId==nil || *timerId == 0 {
log.SWarning("timerId is invalid")
return false
}
if m.mapActiveIdTimer == nil {
log.SError("mapActiveIdTimer is nil")
return false
@@ -280,7 +285,7 @@ func (m *Module) CancelTimerId(timerId *uint64) bool {
t, ok := m.mapActiveIdTimer[*timerId]
if ok == false {
log.SError("cannot find timer id ", timerId)
log.SStack("cannot find timer id ", timerId)
return false
}