Files
origin/rpc/client.go
2020-04-01 14:42:33 +08:00

206 lines
4.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package rpc
import (
"fmt"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/network"
"math"
"reflect"
"strings"
"sync"
"time"
)
// Client is an RPC client built on top of network.TCPClient. It keeps a table
// of in-flight calls keyed by sequence number so that responses read in Run
// can be matched back to the call that issued them.
type Client struct {
// blocalhost is set by Connect when the address starts with "localhost";
// in that case Connect does not start the TCP client.
blocalhost bool
network.TCPClient
// conn is the connection handed to NewClientAgent. It is nil until
// NewClientAgent has been called.
conn *network.TCPConn
// pendingLock is held around accesses to startSeq and pending in Go,
// AsycGo, Run and Connect.
pendingLock sync.RWMutex
// startSeq is the last sequence number handed out; it is incremented for
// every new call.
startSeq uint64
// pending maps a sequence number to the call still waiting for its response.
pending map[uint64]*Call
}
// NewClientAgent records conn as the client's active connection, starts with
// a fresh (empty) pending-call table and returns the client itself as the
// network.Agent for that connection. Connect installs it as the NewAgent
// factory of the embedded TCP client.
//
// NOTE(review): calls still registered in the old table are dropped without
// being completed; anything waiting on them is not woken up here.
func (slf *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
	slf.conn = conn

	// The pending table is shared with Go, AsycGo and Run, all of which take
	// pendingLock before touching it, so the swap must be done under the
	// same lock.
	slf.pendingLock.Lock()
	slf.pending = map[uint64]*Call{}
	slf.pendingLock.Unlock()

	return slf
}
// Connect stores addr and starts the underlying TCP client.
//
// If addr begins with "localhost" the client is only flagged as local
// (blocalhost) and no TCP client is started. Otherwise the embedded
// network.TCPClient is configured (one connection, 2s connect interval,
// auto-reconnect, 2-byte length prefix, messages of 2..math.MaxUint16 bytes),
// every call still in the pending table is failed with a "node is
// disconnect." error, the table is reset, and Start is called.
//
// Connect always returns nil.
func (slf *Client) Connect(addr string) error {
	slf.Addr = addr
	if strings.HasPrefix(addr, "localhost") {
		slf.blocalhost = true
		return nil
	}

	slf.ConnNum = 1
	slf.ConnectInterval = time.Second * 2
	slf.PendingWriteNum = 10000
	slf.AutoReconnect = true
	slf.LenMsgLen = 2
	slf.MinMsgLen = 2
	slf.MaxMsgLen = math.MaxUint16
	slf.NewAgent = slf.NewClientAgent
	slf.LittleEndian = LittleEndian

	slf.pendingLock.Lock()
	for _, v := range slf.pending {
		v.Err = fmt.Errorf("node is disconnect.")
		// Calls registered through AsycGo have no done channel. Sending on
		// a nil channel blocks forever, which here would also mean never
		// releasing pendingLock, so only signal calls that have a channel.
		// NOTE(review): asynchronous calls are dropped without their
		// callback being invoked — confirm whether they should be
		// dispatched the way Run does it.
		if v.done != nil {
			v.done <- v
		}
	}
	slf.pending = map[uint64]*Call{}
	slf.pendingLock.Unlock()

	slf.Start()
	return nil
}
// AsycGo issues an asynchronous RPC request for serviceMethod.
//
// The call is registered in the pending table under a fresh sequence number
// together with callback and rpcHandler; when Run reads the matching response
// it hands the call to rpcHandler's callback queue. replyParam is the value
// the response body is decoded into. mutiCoroutine is accepted but not used
// by this method.
//
// It returns a non-nil error when the arguments or the request cannot be
// encoded, when there is no connection, or when writing to the connection
// fails. In every error case the call is removed from the pending table
// again, so a failed request does not leave a stale entry behind.
func (slf *Client) AsycGo(rpcHandler IRpcHandler, mutiCoroutine bool, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error {
	// Encode the arguments first: if this fails nothing has been registered.
	inParam, err := processor.Marshal(args)
	if err != nil {
		return err
	}

	call := new(Call)
	call.Reply = replyParam
	call.callback = &callback
	call.rpcHandler = rpcHandler
	call.Arg = args

	request := &RpcRequest{}
	request.NoReply = false
	request.ServiceMethod = serviceMethod
	request.InParam = inParam

	slf.pendingLock.Lock()
	if slf.pending == nil {
		// Writing to a nil map panics; the table is nil until Connect or
		// NewClientAgent has run (e.g. for a localhost client).
		slf.pending = map[uint64]*Call{}
	}
	slf.startSeq++
	call.Seq = slf.startSeq
	request.Seq = call.Seq
	slf.pending[call.Seq] = call
	slf.pendingLock.Unlock()

	bytes, err := processor.Marshal(request)
	if err == nil {
		if slf.conn == nil {
			err = fmt.Errorf("Rpc server is disconnect,call %s is fail!", serviceMethod)
		} else {
			err = slf.conn.WriteMsg(bytes)
		}
	}
	if err != nil {
		// The request never reached the peer, so no response will ever
		// arrive to clear the entry; remove it here.
		slf.pendingLock.Lock()
		delete(slf.pending, call.Seq)
		slf.pendingLock.Unlock()
		call.Err = err
	}
	return err
}
// Go issues an RPC request for serviceMethod and returns the Call tracking it.
//
// The returned call owns a buffered done channel (capacity 1) on which Run
// delivers the call once the matching response has been read; reply is the
// value the response body is decoded into. noReply is forwarded to the peer
// in the request header.
//
// On failure (arguments or request cannot be encoded, no connection, write
// error) the error is stored in call.Err, the call is removed from the
// pending table, and the done channel is NOT signalled — callers must check
// call.Err before waiting on the call.
//
// NOTE(review): calls with noReply set are still registered as pending; if
// the peer never answers such requests their entries are never removed —
// confirm against the server side.
func (slf *Client) Go(noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
	call := new(Call)
	call.done = make(chan *Call, 1)
	call.Reply = reply
	call.Arg = args

	// Encode the arguments first: if this fails nothing has been registered.
	inParam, err := processor.Marshal(args)
	if err != nil {
		call.Err = err
		return call
	}

	request := &RpcRequest{}
	request.NoReply = noReply
	request.ServiceMethod = serviceMethod
	request.InParam = inParam

	slf.pendingLock.Lock()
	if slf.pending == nil {
		// Writing to a nil map panics; the table is nil until Connect or
		// NewClientAgent has run (e.g. for a localhost client).
		slf.pending = map[uint64]*Call{}
	}
	slf.startSeq++
	call.Seq = slf.startSeq
	request.Seq = call.Seq
	slf.pending[call.Seq] = call
	slf.pendingLock.Unlock()

	bytes, err := processor.Marshal(request)
	if err == nil {
		if slf.conn == nil {
			// Same guard and message as AsycGo; without it a call made
			// while disconnected dereferences a nil connection.
			err = fmt.Errorf("Rpc server is disconnect,call %s is fail!", serviceMethod)
		} else {
			err = slf.conn.WriteMsg(bytes)
		}
	}
	if err != nil {
		// The request never reached the peer, so no response will ever
		// arrive to clear the entry; remove it here.
		slf.pendingLock.Lock()
		delete(slf.pending, call.Seq)
		slf.pendingLock.Unlock()
		call.Err = err
	}
	return call
}
// RequestHandler is a callback that receives a call's return value and its
// RPC error. It is the type of RpcRequest.requestHandle.
// NOTE(review): presumably invoked on the serving side once a request has
// been processed — confirm against the code that sets requestHandle.
type RequestHandler func(Returns interface{},Err *RpcError)
// RpcRequest is the request message built by Client.Go and Client.AsycGo and
// encoded with processor.Marshal before being written to the connection.
type RpcRequest struct {
// Packet head.
Seq uint64 // sequence number chosen by client
ServiceMethod string // format: "Service.Method"
NoReply bool // original note: "whether a reply is needed"; the name suggests true means no reply is expected — TODO confirm
// Packet body: the call arguments, already encoded by processor.Marshal.
InParam []byte
// Other data. These fields are unexported and are not set by the client code
// in this file.
localReply interface{}
localParam interface{} // parameter list for a local call
requestHandle RequestHandler
callback *reflect.Value
}
// RpcResponse is the response message decoded by Client.Run with
// processor.Unmarshal.
type RpcResponse struct {
// Head.
Seq uint64 // sequence number chosen by client
Err *RpcError // non-nil when the response carries an error; Run copies it into the call
// Returns is the encoded return value; Run decodes it into the call's Reply
// when it is non-empty.
Returns []byte
}
// Run reads responses from the connection until ReadMsg fails, and completes
// the pending call that each response belongs to.
//
// For every message it decodes the RpcResponse, removes the call with the
// same sequence number from the pending table, decodes the return bytes (if
// any) into the call's Reply, copies the response error into the call, and
// then delivers the call: asynchronous calls (those carrying a valid
// callback) go to their handler's callback queue, all others to the call's
// done channel. Messages that cannot be decoded, or whose sequence number is
// unknown, are logged and skipped.
func (slf *Client) Run() {
	for {
		bytes, err := slf.conn.ReadMsg()
		if err != nil {
			log.Error("rpcClient %s ReadMsg error:%+v", slf.Addr, err)
			return
		}

		// 1. Decode the response.
		response := &RpcResponse{}
		if err = processor.Unmarshal(bytes, response); err != nil {
			log.Error("rpcClient Unmarshal head error,error:%+v", err)
			continue
		}

		// 2. Claim the matching call; the lock covers only the table access.
		slf.pendingLock.Lock()
		call, ok := slf.pending[response.Seq]
		if ok {
			delete(slf.pending, response.Seq)
		}
		slf.pendingLock.Unlock()
		if !ok {
			log.Error("rpcClient cannot find seq %d in pending", response.Seq)
			continue
		}

		// 3. Decode the return value. The peer may send no return bytes at
		// all (e.g. when the requested service method does not exist), in
		// which case Reply is left untouched.
		if len(response.Returns) > 0 {
			if err = processor.Unmarshal(response.Returns, call.Reply); err != nil {
				log.Error("rpcClient Unmarshal body error,error:%+v", err)
				call.Err = err
			}
		}
		// Only assign when non-nil: storing a nil *RpcError in the call's
		// error field would make that field compare as non-nil.
		if response.Err != nil {
			call.Err = response.Err
		}

		// 4. Deliver the call. callback is a pointer that is only set by
		// AsycGo, so it must be nil-checked before IsValid is called —
		// calls created by Go would otherwise panic here. Likewise a nil
		// done channel must never be sent on, as that would block this
		// loop forever.
		switch {
		case call.callback != nil && call.callback.IsValid():
			call.rpcHandler.(*RpcHandler).callResponeCallBack <- call
		case call.done != nil:
			call.done <- call
		default:
			log.Error("rpcClient call seq %d has no callback and no done channel", response.Seq)
		}
	}
}
// OnClose intentionally does nothing.
// NOTE(review): presumably required so that *Client satisfies network.Agent
// (NewClientAgent returns the client as one) — confirm against that interface.
func (slf *Client) OnClose() {}