优化工程结构

This commit is contained in:
boyce
2020-10-30 16:32:37 +08:00
parent 3025eaebd5
commit d2f52b382d
33 changed files with 1087 additions and 1210 deletions

View File

@@ -18,146 +18,146 @@ type Client struct {
network.TCPClient
conn *network.TCPConn
pendingLock sync.RWMutex
startSeq uint64
pending map[uint64]*list.Element
pendingTimer *list.List
callRpcTimerout time.Duration
pendingLock sync.RWMutex
startSeq uint64
pending map[uint64]*list.Element
pendingTimer *list.List
callRpcTimeout time.Duration
maxCheckCallRpcCount int
}
func (slf *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
slf.conn = conn
slf.ResetPending()
func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
client.conn = conn
client.ResetPending()
return slf
return client
}
func (slf *Client) Connect(addr string) error {
slf.Addr = addr
slf.maxCheckCallRpcCount = 100
slf.callRpcTimerout = 15*time.Second
slf.ConnNum = 1
slf.ConnectInterval = time.Second*2
slf.PendingWriteNum = 2000000
slf.AutoReconnect = true
slf.LenMsgLen = 2
slf.MinMsgLen = 2
slf.MaxMsgLen = math.MaxUint16
slf.NewAgent = slf.NewClientAgent
slf.LittleEndian = LittleEndian
slf.ResetPending()
go slf.startCheckRpcCallTimer()
func (client *Client) Connect(addr string) error {
client.Addr = addr
client.maxCheckCallRpcCount = 100
client.callRpcTimeout = 15*time.Second
client.ConnNum = 1
client.ConnectInterval = time.Second*2
client.PendingWriteNum = 2000000
client.AutoReconnect = true
client.LenMsgLen = 2
client.MinMsgLen = 2
client.MaxMsgLen = math.MaxUint16
client.NewAgent = client.NewClientAgent
client.LittleEndian = LittleEndian
client.ResetPending()
go client.startCheckRpcCallTimer()
if addr == "" {
slf.bSelfNode = true
client.bSelfNode = true
return nil
}
slf.Start()
client.Start()
return nil
}
func (slf *Client) startCheckRpcCallTimer(){
func (client *Client) startCheckRpcCallTimer(){
tick :=time.NewTicker( 3 * time.Second)
for{
select {
case <- tick.C:
slf.checkRpcCallTimerout()
client.checkRpcCallTimeout()
}
}
tick.Stop()
}
func (slf *Client) makeCallFail(call *Call){
func (client *Client) makeCallFail(call *Call){
if call.callback!=nil && call.callback.IsValid() {
call.rpcHandler.(*RpcHandler).callResponeCallBack<-call
call.rpcHandler.(*RpcHandler).callResponseCallBack <-call
}else{
call.done <- call
}
slf.removePending(call.Seq)
client.removePending(call.Seq)
}
func (slf *Client) checkRpcCallTimerout(){
tnow := time.Now()
func (client *Client) checkRpcCallTimeout(){
now := time.Now()
for i:=0;i<slf.maxCheckCallRpcCount;i++ {
slf.pendingLock.Lock()
pElem := slf.pendingTimer.Front()
for i:=0;i< client.maxCheckCallRpcCount;i++ {
client.pendingLock.Lock()
pElem := client.pendingTimer.Front()
if pElem == nil {
slf.pendingLock.Unlock()
client.pendingLock.Unlock()
break
}
pCall := pElem.Value.(*Call)
if tnow.Sub(pCall.calltime) > slf.callRpcTimerout {
pCall.Err = fmt.Errorf("RPC call takes more than %d seconds!",slf.callRpcTimerout/time.Second)
slf.makeCallFail(pCall)
slf.pendingLock.Unlock()
if now.Sub(pCall.callTime) > client.callRpcTimeout {
pCall.Err = fmt.Errorf("RPC call takes more than %d seconds!", client.callRpcTimeout/time.Second)
client.makeCallFail(pCall)
client.pendingLock.Unlock()
continue
}
slf.pendingLock.Unlock()
client.pendingLock.Unlock()
}
}
func (slf *Client) ResetPending(){
slf.pendingLock.Lock()
if slf.pending != nil {
for _,v := range slf.pending {
func (client *Client) ResetPending(){
client.pendingLock.Lock()
if client.pending != nil {
for _,v := range client.pending {
v.Value.(*Call).Err = fmt.Errorf("node is disconnect.")
v.Value.(*Call).done <- v.Value.(*Call)
}
}
slf.pending = map[uint64]*list.Element{}
slf.pendingTimer = list.New()
slf.pendingLock.Unlock()
client.pending = map[uint64]*list.Element{}
client.pendingTimer = list.New()
client.pendingLock.Unlock()
}
func (slf *Client) AddPending(call *Call){
slf.pendingLock.Lock()
call.calltime = time.Now()
elemTimer := slf.pendingTimer.PushBack(call)
slf.pending[call.Seq] = elemTimer//如果下面发送失败,将会一一直存在这里
slf.pendingLock.Unlock()
func (client *Client) AddPending(call *Call){
client.pendingLock.Lock()
call.callTime = time.Now()
elemTimer := client.pendingTimer.PushBack(call)
client.pending[call.Seq] = elemTimer //如果下面发送失败,将会一一直存在这里
client.pendingLock.Unlock()
}
func (slf *Client) RemovePending(seq uint64) *Call{
slf.pendingLock.Lock()
call := slf.removePending(seq)
slf.pendingLock.Unlock()
func (client *Client) RemovePending(seq uint64) *Call{
client.pendingLock.Lock()
call := client.removePending(seq)
client.pendingLock.Unlock()
return call
}
func (slf *Client) removePending(seq uint64) *Call{
v,ok := slf.pending[seq]
func (client *Client) removePending(seq uint64) *Call{
v,ok := client.pending[seq]
if ok == false{
return nil
}
slf.pendingTimer.Remove(v)
delete(slf.pending,seq)
client.pendingTimer.Remove(v)
delete(client.pending,seq)
return v.Value.(*Call)
}
func (slf *Client) FindPending(seq uint64) *Call{
slf.pendingLock.Lock()
v,ok := slf.pending[seq]
func (client *Client) FindPending(seq uint64) *Call{
client.pendingLock.Lock()
v,ok := client.pending[seq]
if ok == false {
slf.pendingLock.Unlock()
client.pendingLock.Unlock()
return nil
}
pCall := v.Value.(*Call)
slf.pendingLock.Unlock()
client.pendingLock.Unlock()
return pCall
}
func (slf *Client) generateSeq() uint64{
return atomic.AddUint64(&slf.startSeq,1)
func (client *Client) generateSeq() uint64{
return atomic.AddUint64(&client.startSeq,1)
}
func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error {
func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error {
call := MakeCall()
call.Reply = replyParam
call.callback = &callback
@@ -173,69 +173,69 @@ func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback
request := &RpcRequest{}
call.Arg = args
call.Seq = slf.generateSeq()
request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,false,InParam,nil)
slf.AddPending(call)
call.Seq = client.generateSeq()
request.RpcRequestData = processor.MakeRpcRequest(client.startSeq,serviceMethod,false,InParam,nil)
client.AddPending(call)
bytes,err := processor.Marshal(request.RpcRequestData)
processor.ReleaseRpcRequest(request.RpcRequestData)
if err != nil {
slf.RemovePending(call.Seq)
client.RemovePending(call.Seq)
ReleaseCall(call)
return err
}
if slf.conn == nil {
slf.RemovePending(call.Seq)
if client.conn == nil {
client.RemovePending(call.Seq)
ReleaseCall(call)
return fmt.Errorf("Rpc server is disconnect,call %s is fail!",serviceMethod)
}
err = slf.conn.WriteMsg([]byte{uint8(processorType)},bytes)
err = client.conn.WriteMsg([]byte{uint8(processorType)},bytes)
if err != nil {
slf.RemovePending(call.Seq)
client.RemovePending(call.Seq)
ReleaseCall(call)
}
return err
}
func (slf *Client) RawGo(processor IRpcProcessor,noReply bool,serviceMethod string,args []byte,additionParam interface{},reply interface{}) *Call {
func (client *Client) RawGo(processor IRpcProcessor,noReply bool,serviceMethod string,args []byte,additionParam interface{},reply interface{}) *Call {
call := MakeCall()
call.ServiceMethod = serviceMethod
call.Reply = reply
request := &RpcRequest{}
call.Arg = args
call.Seq = slf.generateSeq()
call.Seq = client.generateSeq()
if noReply == false {
slf.AddPending(call)
client.AddPending(call)
}
request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,noReply,args,additionParam)
request.RpcRequestData = processor.MakeRpcRequest(client.startSeq,serviceMethod,noReply,args,additionParam)
bytes,err := processor.Marshal(request.RpcRequestData)
processor.ReleaseRpcRequest(request.RpcRequestData)
if err != nil {
call.Err = err
slf.RemovePending(call.Seq)
client.RemovePending(call.Seq)
return call
}
if slf.conn == nil {
if client.conn == nil {
call.Err = fmt.Errorf("call %s is fail,rpc client is disconnect.",serviceMethod)
slf.RemovePending(call.Seq)
client.RemovePending(call.Seq)
return call
}
err = slf.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())},bytes)
err = client.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())},bytes)
if err != nil {
slf.RemovePending(call.Seq)
client.RemovePending(call.Seq)
call.Err = err
}
return call
}
func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply interface{}) *Call {
func (client *Client) Go(noReply bool,serviceMethod string, args interface{},reply interface{}) *Call {
_,processor := GetProcessorType(args)
InParam,err := processor.Marshal(args)
if err != nil {
@@ -243,10 +243,10 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply
call.Err = err
}
return slf.RawGo(processor,noReply,serviceMethod,InParam,nil,reply)
return client.RawGo(processor,noReply,serviceMethod,InParam,nil,reply)
}
func (slf *Client) Run(){
func (client *Client) Run(){
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
@@ -257,62 +257,62 @@ func (slf *Client) Run(){
}()
for {
bytes,err := slf.conn.ReadMsg()
bytes,err := client.conn.ReadMsg()
if err != nil {
log.Error("rpcClient %s ReadMsg error:%+v",slf.Addr,err)
log.Error("rpcClient %s ReadMsg error:%+v", client.Addr,err)
return
}
processor := GetProcessor(uint8(bytes[0]))
if processor==nil {
slf.conn.ReleaseReadMsg(bytes)
log.Error("rpcClient %s ReadMsg head error:%+v",slf.Addr,err)
client.conn.ReleaseReadMsg(bytes)
log.Error("rpcClient %s ReadMsg head error:%+v", client.Addr,err)
return
}
//1.解析head
respone := &RpcResponse{}
respone.RpcResponeData =processor.MakeRpcResponse(0,nil,nil)
respone.RpcResponseData =processor.MakeRpcResponse(0,nil,nil)
err = processor.Unmarshal(bytes[1:],respone.RpcResponeData)
slf.conn.ReleaseReadMsg(bytes)
err = processor.Unmarshal(bytes[1:],respone.RpcResponseData)
client.conn.ReleaseReadMsg(bytes)
if err != nil {
processor.ReleaseRpcRespose(respone.RpcResponeData)
processor.ReleaseRpcRespose(respone.RpcResponseData)
log.Error("rpcClient Unmarshal head error,error:%+v",err)
continue
}
v := slf.RemovePending(respone.RpcResponeData.GetSeq())
v := client.RemovePending(respone.RpcResponseData.GetSeq())
if v == nil {
log.Error("rpcClient cannot find seq %d in pending",respone.RpcResponeData.GetSeq())
log.Error("rpcClient cannot find seq %d in pending",respone.RpcResponseData.GetSeq())
}else {
v.Err = nil
if len(respone.RpcResponeData.GetReply()) >0 {
err = processor.Unmarshal(respone.RpcResponeData.GetReply(),v.Reply)
if len(respone.RpcResponseData.GetReply()) >0 {
err = processor.Unmarshal(respone.RpcResponseData.GetReply(),v.Reply)
if err != nil {
log.Error("rpcClient Unmarshal body error,error:%+v",err)
v.Err = err
}
}
if respone.RpcResponeData.GetErr() != nil {
v.Err= respone.RpcResponeData.GetErr()
if respone.RpcResponseData.GetErr() != nil {
v.Err= respone.RpcResponseData.GetErr()
}
if v.callback!=nil && v.callback.IsValid() {
v.rpcHandler.(*RpcHandler).callResponeCallBack<-v
v.rpcHandler.(*RpcHandler).callResponseCallBack <-v
}else{
v.done <- v
}
}
processor.ReleaseRpcRespose(respone.RpcResponeData)
processor.ReleaseRpcRespose(respone.RpcResponseData)
}
}
func (slf *Client) OnClose(){
func (client *Client) OnClose(){
}
func (slf *Client) IsConnected() bool {
return slf.conn!=nil && slf.conn.IsConnected()==true
func (client *Client) IsConnected() bool {
return client.conn!=nil && client.conn.IsConnected()==true
}

View File

@@ -20,7 +20,6 @@ type JsonRpcRequestData struct {
AdditionParam interface{}
}
type JsonRpcResponseData struct {
//head
Seq uint64 // sequence number chosen by client
@@ -30,14 +29,11 @@ type JsonRpcResponseData struct {
Reply []byte
}
var rpcJsonResponeDataPool sync.Pool
var rpcJsonResponseDataPool sync.Pool
var rpcJsonRequestDataPool sync.Pool
func init(){
rpcJsonResponeDataPool.New = func()interface{}{
rpcJsonResponseDataPool.New = func()interface{}{
return &JsonRpcResponseData{}
}
@@ -46,16 +42,15 @@ func init(){
}
}
func (slf *JsonProcessor) Marshal(v interface{}) ([]byte, error){
return json.Marshal(v)
func (jsonProcessor *JsonProcessor) Marshal(v interface{}) ([]byte, error){
return jsonProcessor.Marshal(v)
}
func (slf *JsonProcessor) Unmarshal(data []byte, v interface{}) error{
func (jsonProcessor *JsonProcessor) Unmarshal(data []byte, v interface{}) error{
return json.Unmarshal(data,v)
}
func (slf *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte,additionParam interface{}) IRpcRequestData{
func (jsonProcessor *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte,additionParam interface{}) IRpcRequestData{
jsonRpcRequestData := rpcJsonRequestDataPool.Get().(*JsonRpcRequestData)
jsonRpcRequestData.Seq = seq
jsonRpcRequestData.ServiceMethod = serviceMethod
@@ -65,75 +60,72 @@ func (slf *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply
return jsonRpcRequestData
}
func (slf *JsonProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData {
jsonRpcResponseData := rpcJsonResponeDataPool.Get().(*JsonRpcResponseData)
func (jsonProcessor *JsonProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData {
jsonRpcResponseData := rpcJsonResponseDataPool.Get().(*JsonRpcResponseData)
jsonRpcResponseData.Seq = seq
jsonRpcResponseData.Err = err.Error()
jsonRpcResponseData.Reply = reply
return jsonRpcResponseData
}
func (slf *JsonProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){
func (jsonProcessor *JsonProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){
rpcJsonRequestDataPool.Put(rpcRequestData)
}
func (slf *JsonProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){
rpcJsonResponeDataPool.Put(rpcRequestData)
func (jsonProcessor *JsonProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){
rpcJsonResponseDataPool.Put(rpcRequestData)
}
func (slf *JsonProcessor) IsParse(param interface{}) bool {
func (jsonProcessor *JsonProcessor) IsParse(param interface{}) bool {
_,err := json.Marshal(param)
return err==nil
}
func (slf *JsonProcessor) GetProcessorType() RpcProcessorType{
return RPC_PROCESSOR_JSON
func (jsonProcessor *JsonProcessor) GetProcessorType() RpcProcessorType{
return RpcProcessorJson
}
func (slf *JsonRpcRequestData) IsNoReply() bool{
return slf.NoReply
func (jsonRpcRequestData *JsonRpcRequestData) IsNoReply() bool{
return jsonRpcRequestData.NoReply
}
func (slf *JsonRpcRequestData) GetSeq() uint64{
return slf.Seq
func (jsonRpcRequestData *JsonRpcRequestData) GetSeq() uint64{
return jsonRpcRequestData.Seq
}
func (slf *JsonRpcRequestData) GetServiceMethod() string{
return slf.ServiceMethod
func (jsonRpcRequestData *JsonRpcRequestData) GetServiceMethod() string{
return jsonRpcRequestData.ServiceMethod
}
func (slf *JsonRpcRequestData) GetInParam() []byte{
return slf.InParam
func (jsonRpcRequestData *JsonRpcRequestData) GetInParam() []byte{
return jsonRpcRequestData.InParam
}
func (slf *JsonRpcRequestData) GetParamValue() interface{}{
return slf.AdditionParam
func (jsonRpcRequestData *JsonRpcRequestData) GetParamValue() interface{}{
return jsonRpcRequestData.AdditionParam
}
func (slf *JsonRpcRequestData) GetAdditionParams() IRawAdditionParam{
return slf
func (jsonRpcRequestData *JsonRpcRequestData) GetAdditionParams() IRawAdditionParam{
return jsonRpcRequestData
}
func (slf *JsonRpcResponseData) GetSeq() uint64 {
return slf.Seq
func (jsonRpcResponseData *JsonRpcResponseData) GetSeq() uint64 {
return jsonRpcResponseData.Seq
}
func (slf *JsonRpcResponseData) GetErr() *RpcError {
if slf.Err == ""{
func (jsonRpcResponseData *JsonRpcResponseData) GetErr() *RpcError {
if jsonRpcResponseData.Err == ""{
return nil
}
return Errorf(slf.Err)
return Errorf(jsonRpcResponseData.Err)
}
func (slf *JsonRpcResponseData) GetReply() []byte{
return slf.Reply
func (jsonRpcResponseData *JsonRpcResponseData) GetReply() []byte{
return jsonRpcResponseData.Reply
}

View File

@@ -9,12 +9,12 @@ import (
type PBProcessor struct {
}
var rpcPbResponeDataPool sync.Pool
var rpcPbResponseDataPool sync.Pool
var rpcPbRequestDataPool sync.Pool
func init(){
rpcPbResponeDataPool.New = func()interface{}{
rpcPbResponseDataPool.New = func()interface{}{
return &PBRpcResponseData{}
}
@@ -115,7 +115,7 @@ func (slf *PBProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply b
}
func (slf *PBProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData {
pPBRpcResponseData := rpcPbResponeDataPool.Get().(*PBRpcResponseData)
pPBRpcResponseData := rpcPbResponseDataPool.Get().(*PBRpcResponseData)
pPBRpcResponseData.MakeRespone(seq,err,reply)
return pPBRpcResponseData
}
@@ -125,7 +125,7 @@ func (slf *PBProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){
}
func (slf *PBProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){
rpcPbResponeDataPool.Put(rpcRequestData)
rpcPbResponseDataPool.Put(rpcRequestData)
}
func (slf *PBProcessor) IsParse(param interface{}) bool {
@@ -135,7 +135,7 @@ func (slf *PBProcessor) IsParse(param interface{}) bool {
func (slf *PBProcessor) GetProcessorType() RpcProcessorType{
return RPC_PROCESSOR_PB
return RpcProcessorPb
}

View File

@@ -19,22 +19,12 @@ type RpcRequest struct {
}
type RpcResponse struct {
RpcResponeData IRpcResponseData
RpcResponseData IRpcResponseData
}
func (slf *RpcRequest) Clear() *RpcRequest{
slf.RpcRequestData = nil
slf.localReply = nil
slf.localParam = nil
slf.requestHandle = nil
slf.callback = nil
return slf
}
func (slf *RpcResponse) Clear() *RpcResponse{
slf.RpcResponeData = nil
return slf
}
var rpcResponsePool sync.Pool
var rpcRequestPool sync.Pool
var rpcCallPool sync.Pool
type IRawAdditionParam interface {
GetParamValue() interface{}
@@ -54,60 +44,30 @@ type IRpcResponseData interface {
GetReply() []byte
}
type RequestHandler func(Returns interface{},Err *RpcError)
type RawAdditionParamNull struct {
}
func (slf *RawAdditionParamNull) GetParamValue() interface{}{
return nil
}
type Call struct {
Seq uint64
ServiceMethod string
Arg interface{}
Reply interface{}
Respone *RpcResponse
Err error
done chan *Call // Strobes when call is complete.
connid int
callback *reflect.Value
rpcHandler IRpcHandler
calltime time.Time
}
func (slf *Call) Clear() *Call{
slf.Seq = 0
slf.ServiceMethod = ""
slf.Arg = nil
slf.Reply = nil
slf.Respone = nil
slf.Err = nil
slf.connid = 0
slf.callback = nil
slf.rpcHandler = nil
return slf
}
func (slf *Call) Done() *Call{
return <-slf.done
}
type RpcHandleFinder interface {
FindRpcHandler(serviceMethod string) IRpcHandler
}
type RequestHandler func(Returns interface{},Err *RpcError)
type RawAdditionParamNull struct {
}
var rpcResponePool sync.Pool
var rpcRequestPool sync.Pool
var rpcCallPool sync.Pool
type Call struct {
Seq uint64
ServiceMethod string
Arg interface{}
Reply interface{}
Response *RpcResponse
Err error
done chan *Call // Strobes when call is complete.
connId int
callback *reflect.Value
rpcHandler IRpcHandler
callTime time.Time
}
func init(){
rpcResponePool.New = func()interface{}{
rpcResponsePool.New = func()interface{}{
return &RpcResponse{}
}
@@ -120,8 +80,39 @@ func init(){
}
}
func (slf *RpcRequest) Clear() *RpcRequest{
slf.RpcRequestData = nil
slf.localReply = nil
slf.localParam = nil
slf.requestHandle = nil
slf.callback = nil
return slf
}
func (rpcResponse *RpcResponse) Clear() *RpcResponse{
rpcResponse.RpcResponseData = nil
return rpcResponse
}
func (call *Call) Clear() *Call{
call.Seq = 0
call.ServiceMethod = ""
call.Arg = nil
call.Reply = nil
call.Response = nil
call.Err = nil
call.connId = 0
call.callback = nil
call.rpcHandler = nil
return call
}
func (call *Call) Done() *Call{
return <-call.done
}
func MakeRpcResponse() *RpcResponse{
return rpcResponePool.Get().(*RpcResponse).Clear()
return rpcResponsePool.Get().(*RpcResponse).Clear()
}
func MakeRpcRequest() *RpcRequest{
@@ -132,8 +123,8 @@ func MakeCall() *Call {
return rpcCallPool.Get().(*Call).Clear()
}
func ReleaseRpcResponse(rpcRespone *RpcResponse){
rpcResponePool.Put(rpcRespone)
func ReleaseRpcResponse(rpcResponse *RpcResponse){
rpcResponsePool.Put(rpcResponse)
}
func ReleaseRpcRequest(rpcRequest *RpcRequest){
@@ -142,4 +133,8 @@ func ReleaseRpcRequest(rpcRequest *RpcRequest){
func ReleaseCall(call *Call){
rpcCallPool.Put(call)
}
}
func (slf *RawAdditionParamNull) GetParamValue() interface{}{
return nil
}

View File

@@ -10,7 +10,7 @@ import (
"unicode/utf8"
)
type FuncRpcClient func(nodeid int,serviceMethod string,client *[]*Client) error
type FuncRpcClient func(nodeId int,serviceMethod string,client *[]*Client) error
type FuncRpcServer func() (*Server)
var NilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())
@@ -35,42 +35,40 @@ func ConvertError(e error) *RpcError{
func Errorf(format string, a ...interface{}) *RpcError {
rpcErr := RpcError(fmt.Sprintf(format,a...))
return &rpcErr
}
type RpcMethodInfo struct {
method reflect.Method
iparam reflect.Value
iInParam interface{}
oParam reflect.Value
iOutParam interface{}
inParamValue reflect.Value
inParam interface{}
outParamValue reflect.Value
additionParam reflect.Value
//addition *IRawAdditionParam
hashAdditionParam bool
hasAdditionParam bool
rpcProcessorType RpcProcessorType
}
type RpcHandler struct {
callRequest chan *RpcRequest
rpcHandler IRpcHandler
mapfunctons map[string]RpcMethodInfo
callRequest chan *RpcRequest
rpcHandler IRpcHandler
mapFunctions map[string]RpcMethodInfo
funcRpcClient FuncRpcClient
funcRpcServer FuncRpcServer
callResponeCallBack chan *Call //异步返回的回调
callResponseCallBack chan *Call //异步返回的回调
}
type IRpcHandler interface {
GetName() string
InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer)
GetRpcHandler() IRpcHandler
PushRequest(callinfo *RpcRequest) error
PushRequest(callInfo *RpcRequest) error
HandlerRpcRequest(request *RpcRequest)
HandlerRpcResponeCB(call *Call)
HandlerRpcResponseCB(call *Call)
GetRpcRequestChan() chan *RpcRequest
GetRpcResponeChan() chan *Call
GetRpcResponseChan() chan *Call
CallMethod(ServiceMethod string,param interface{},reply interface{}) error
AsyncCall(serviceMethod string,args interface{},callback interface{}) error
@@ -88,20 +86,20 @@ var rawAdditionParamValueNull reflect.Value
func init(){
rawAdditionParamValueNull = reflect.ValueOf(&RawAdditionParamNull{})
}
func (slf *RpcHandler) GetRpcHandler() IRpcHandler{
return slf.rpcHandler
func (handler *RpcHandler) GetRpcHandler() IRpcHandler{
return handler.rpcHandler
}
func (slf *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer) {
slf.callRequest = make(chan *RpcRequest,1000000)
slf.callResponeCallBack = make(chan *Call,1000000)
func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer) {
handler.callRequest = make(chan *RpcRequest,1000000)
handler.callResponseCallBack = make(chan *Call,1000000)
slf.rpcHandler = rpcHandler
slf.mapfunctons = map[string]RpcMethodInfo{}
slf.funcRpcClient = getClientFun
slf.funcRpcServer = getServerFun
handler.rpcHandler = rpcHandler
handler.mapFunctions = map[string]RpcMethodInfo{}
handler.funcRpcClient = getClientFun
handler.funcRpcServer = getServerFun
slf.RegisterRpc(rpcHandler)
handler.RegisterRpc(rpcHandler)
}
// Is this an exported - upper case - name?
@@ -111,7 +109,7 @@ func isExported(name string) bool {
}
// Is this type exported or a builtin?
func (slf *RpcHandler) isExportedOrBuiltinType(t reflect.Type) bool {
func (handler *RpcHandler) isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
@@ -120,8 +118,7 @@ func (slf *RpcHandler) isExportedOrBuiltinType(t reflect.Type) bool {
return isExported(t.Name()) || t.PkgPath() == ""
}
func (slf *RpcHandler) suitableMethods(method reflect.Method) error {
func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
//只有RPC_开头的才能被调用
if strings.Index(method.Name,"RPC_")!=0 {
return nil
@@ -146,35 +143,35 @@ func (slf *RpcHandler) suitableMethods(method reflect.Method) error {
var parIdx int = 1
if typ.In(parIdx).String() == "rpc.IRawAdditionParam" {
parIdx += 1
rpcMethodInfo.hashAdditionParam = true
rpcMethodInfo.hasAdditionParam = true
}
for i:= parIdx ;i<typ.NumIn();i++{
if slf.isExportedOrBuiltinType(typ.In(i)) == false {
if handler.isExportedOrBuiltinType(typ.In(i)) == false {
return fmt.Errorf("%s Unsupported parameter types!",method.Name)
}
}
rpcMethodInfo.iparam = reflect.New(typ.In(parIdx).Elem()) //append(rpcMethodInfo.iparam,)
rpcMethodInfo.iInParam = reflect.New(typ.In(parIdx).Elem()).Interface()
pt,_ := GetProcessorType(rpcMethodInfo.iparam.Interface())
rpcMethodInfo.inParamValue = reflect.New(typ.In(parIdx).Elem()) //append(rpcMethodInfo.iparam,)
rpcMethodInfo.inParam = reflect.New(typ.In(parIdx).Elem()).Interface()
pt,_ := GetProcessorType(rpcMethodInfo.inParamValue.Interface())
rpcMethodInfo.rpcProcessorType = pt
parIdx++
if parIdx< typ.NumIn() {
rpcMethodInfo.oParam = reflect.New(typ.In(parIdx).Elem())
rpcMethodInfo.outParamValue = reflect.New(typ.In(parIdx).Elem())
}
rpcMethodInfo.method = method
slf.mapfunctons[slf.rpcHandler.GetName()+"."+method.Name] = rpcMethodInfo
handler.mapFunctions[handler.rpcHandler.GetName()+"."+method.Name] = rpcMethodInfo
return nil
}
func (slf *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error {
func (handler *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error {
typ := reflect.TypeOf(rpcHandler)
for m:=0;m<typ.NumMethod();m++{
method := typ.Method(m)
err := slf.suitableMethods(method)
err := handler.suitableMethods(method)
if err != nil {
panic(err)
}
@@ -183,24 +180,24 @@ func (slf *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error {
return nil
}
func (slf *RpcHandler) PushRequest(req *RpcRequest) error{
if len(slf.callRequest) >= cap(slf.callRequest){
return fmt.Errorf("RpcHandler %s Rpc Channel is full.",slf.GetName())
func (handler *RpcHandler) PushRequest(req *RpcRequest) error{
if len(handler.callRequest) >= cap(handler.callRequest){
return fmt.Errorf("RpcHandler %s Rpc Channel is full.", handler.GetName())
}
slf.callRequest <- req
handler.callRequest <- req
return nil
}
func (slf *RpcHandler) GetRpcRequestChan() (chan *RpcRequest) {
return slf.callRequest
func (handler *RpcHandler) GetRpcRequestChan() (chan *RpcRequest) {
return handler.callRequest
}
func (slf *RpcHandler) GetRpcResponeChan() chan *Call{
return slf.callResponeCallBack
func (handler *RpcHandler) GetRpcResponseChan() chan *Call{
return handler.callResponseCallBack
}
func (slf *RpcHandler) HandlerRpcResponeCB(call *Call){
func (handler *RpcHandler) HandlerRpcResponseCB(call *Call){
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
@@ -218,8 +215,7 @@ func (slf *RpcHandler) HandlerRpcResponeCB(call *Call){
ReleaseCall(call)
}
func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
@@ -235,9 +231,9 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
defer ReleaseRpcRequest(request)
defer request.rpcProcessor.ReleaseRpcRequest(request.RpcRequestData)
v,ok := slf.mapfunctons[request.RpcRequestData.GetServiceMethod()]
v,ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()]
if ok == false {
err := Errorf("RpcHandler %s cannot find %s",slf.rpcHandler.GetName(),request.RpcRequestData.GetServiceMethod())
err := Errorf("RpcHandler %s cannot find %s", handler.rpcHandler.GetName(),request.RpcRequestData.GetServiceMethod())
log.Error("%s",err.Error())
if request.requestHandle!=nil {
request.requestHandle(nil,err)
@@ -249,10 +245,10 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
var err error
var iParam interface{}
//单协程下减少gc
if slf.IsSingleCoroutine(){
iParam = v.iInParam
if handler.IsSingleCoroutine(){
iParam = v.inParam
}else{
iParam = reflect.New(v.iparam.Type().Elem()).Interface()
iParam = reflect.New(v.inParamValue.Type().Elem()).Interface()
}
if request.bLocalRequest == false {
@@ -269,10 +265,10 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
if request.localRawParam!=nil {
err = request.rpcProcessor.Unmarshal(request.localRawParam,iParam)
if err!=nil {
rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err)
log.Error("%s",rerr.Error())
rErr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err)
log.Error("%s", rErr.Error())
if request.requestHandle!=nil {
request.requestHandle(nil, rerr)
request.requestHandle(nil, rErr)
}
return
}
@@ -281,9 +277,9 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
}
}
paramList = append(paramList,reflect.ValueOf(slf.GetRpcHandler())) //接受者
paramList = append(paramList,reflect.ValueOf(handler.GetRpcHandler())) //接受者
additionParams := request.RpcRequestData.GetAdditionParams()
if v.hashAdditionParam == true{
if v.hasAdditionParam == true{
if additionParams!=nil && additionParams.GetParamValue()!=nil{
additionVal := reflect.ValueOf(additionParams)
paramList = append(paramList,additionVal)
@@ -294,19 +290,19 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
paramList = append(paramList,reflect.ValueOf(iParam))
var oParam reflect.Value
if v.oParam.IsValid() {
if v.outParamValue.IsValid() {
if request.localReply!=nil {
oParam = reflect.ValueOf(request.localReply) //输出参数
}else if slf.IsSingleCoroutine()==true{
oParam = v.oParam
}else if handler.IsSingleCoroutine()==true{
oParam = v.outParamValue
}else{
oParam = reflect.New(v.oParam.Type().Elem())
oParam = reflect.New(v.outParamValue.Type().Elem())
}
paramList = append(paramList,oParam) //输出参数
}else if(request.requestHandle!=nil){ //调用方有返回值,但被调用函数没有返回参数
rerr := Errorf("Call Rpc %s without return parameter!",request.RpcRequestData.GetServiceMethod())
log.Error("%s",rerr.Error())
request.requestHandle(nil, rerr)
}else if(request.requestHandle != nil){ //调用方有返回值,但被调用函数没有返回参数
rErr := Errorf("Call Rpc %s without return parameter!",request.RpcRequestData.GetServiceMethod())
log.Error("%s",rErr.Error())
request.requestHandle(nil, rErr)
return
}
returnValues := v.method.Func.Call(paramList)
@@ -320,18 +316,18 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
}
}
func (slf *RpcHandler) CallMethod(ServiceMethod string,param interface{},reply interface{}) error{
func (handler *RpcHandler) CallMethod(ServiceMethod string,param interface{},reply interface{}) error{
var err error
v,ok := slf.mapfunctons[ServiceMethod]
v,ok := handler.mapFunctions[ServiceMethod]
if ok == false {
err = fmt.Errorf("RpcHandler %s cannot find %s",slf.rpcHandler.GetName(),ServiceMethod)
err = fmt.Errorf("RpcHandler %s cannot find %s", handler.rpcHandler.GetName(),ServiceMethod)
log.Error("%s",err.Error())
return err
}
var paramList []reflect.Value
paramList = append(paramList,reflect.ValueOf(slf.GetRpcHandler())) //接受者
paramList = append(paramList,reflect.ValueOf(handler.GetRpcHandler())) //接受者
paramList = append(paramList,reflect.ValueOf(param))
paramList = append(paramList,reflect.ValueOf(reply)) //输出参数
@@ -344,9 +340,9 @@ func (slf *RpcHandler) CallMethod(ServiceMethod string,param interface{},reply i
return err
}
func (slf *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args interface{}) error {
func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args interface{}) error {
var pClientList []*Client
err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList)
err := handler.funcRpcClient(nodeId,serviceMethod,&pClientList)
if err != nil {
log.Error("Call serviceMethod is error:%+v!",err)
return err
@@ -360,19 +356,19 @@ func (slf *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,servi
//如果调用本结点服务
for _,pClient := range pClientList {
if pClient.bSelfNode == true {
pLocalRpcServer:=slf.funcRpcServer()
pLocalRpcServer:= handler.funcRpcServer()
//判断是否是同一服务
sMethod := strings.Split(serviceMethod,".")
if len(sMethod)!=2 {
serr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod)
log.Error("%+v",serr)
if serr!= nil {
err = serr
sErr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod)
log.Error("%+v", sErr)
if sErr != nil {
err = sErr
}
continue
}
//调用自己rpcHandler处理器
if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用
if sMethod[0] == handler.rpcHandler.GetName() { //自己服务调用
//
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil)
}
@@ -396,11 +392,9 @@ func (slf *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,servi
return err
}
func (slf *RpcHandler) rawGoRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error {
func (handler *RpcHandler) rawGoRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error {
var pClientList []*Client
err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList)
err := handler.funcRpcClient(nodeId,serviceMethod,&pClientList)
if err != nil {
log.Error("Call serviceMethod is error:%+v!",err)
return err
@@ -414,7 +408,7 @@ func (slf *RpcHandler) rawGoRpc(processor IRpcProcessor,bCast bool,nodeId int,se
//如果调用本结点服务
for _,pClient := range pClientList {
if pClient.bSelfNode == true {
pLocalRpcServer:=slf.funcRpcServer()
pLocalRpcServer:= handler.funcRpcServer()
//判断是否是同一服务
sMethod := strings.Split(serviceMethod,".")
if len(sMethod)!=2 {
@@ -426,7 +420,7 @@ func (slf *RpcHandler) rawGoRpc(processor IRpcProcessor,bCast bool,nodeId int,se
continue
}
//调用自己rpcHandler处理器
if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用
if sMethod[0] == handler.rpcHandler.GetName() { //自己服务调用
//
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil)
}
@@ -451,9 +445,9 @@ func (slf *RpcHandler) rawGoRpc(processor IRpcProcessor,bCast bool,nodeId int,se
}
func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},reply interface{}) error {
func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},reply interface{}) error {
var pClientList []*Client
err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList)
err := handler.funcRpcClient(nodeId,serviceMethod,&pClientList)
if err != nil {
log.Error("Call serviceMethod is error:%+v!",err)
return err
@@ -467,7 +461,7 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},
//如果调用本结点服务
pClient := pClientList[0]
if pClient.bSelfNode == true {
pLocalRpcServer:=slf.funcRpcServer()
pLocalRpcServer:= handler.funcRpcServer()
//判断是否是同一服务
sMethod := strings.Split(serviceMethod,".")
if len(sMethod)!=2 {
@@ -476,7 +470,7 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},
return err
}
//调用自己rpcHandler处理器
if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用
if sMethod[0] == handler.rpcHandler.GetName() { //自己服务调用
//
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,reply)
}
@@ -499,7 +493,7 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},
return err
}
func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interface{},callback interface{}) error {
func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interface{},callback interface{}) error {
fVal := reflect.ValueOf(callback)
if fVal.Kind()!=reflect.Func{
err := fmt.Errorf("call %s input callback param is error!",serviceMethod)
@@ -521,7 +515,7 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
var pClientList []*Client
err := slf.funcRpcClient(nodeid,serviceMethod,&pClientList)
err := handler.funcRpcClient(nodeid,serviceMethod,&pClientList)
if err != nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
log.Error("Call serviceMethod is error:%+v!",err)
@@ -539,7 +533,7 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
//如果调用本结点服务
pClient := pClientList[0]
if pClient.bSelfNode == true {
pLocalRpcServer:=slf.funcRpcServer()
pLocalRpcServer:= handler.funcRpcServer()
//判断是否是同一服务
sMethod := strings.Split(serviceMethod,".")
if len(sMethod)!=2 {
@@ -549,7 +543,7 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
return nil
}
//调用自己rpcHandler处理器
if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用
if sMethod[0] == handler.rpcHandler.GetName() { //自己服务调用
err := pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,reply)
if err == nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply),NilError})
@@ -560,7 +554,7 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
//其他的rpcHandler的处理器
if callback!=nil {
err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient,slf,false,sMethod[0],sMethod[1],args,reply,fVal)
err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient, handler,false,sMethod[0],sMethod[1],args,reply,fVal)
if err != nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
}
@@ -575,54 +569,54 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
}
//跨node调用
err = pClient.AsycCall(slf,serviceMethod,fVal,args,reply)
err = pClient.AsyncCall(handler,serviceMethod,fVal,args,reply)
if err != nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
}
return nil
}
func (slf *RpcHandler) GetName() string{
return slf.rpcHandler.GetName()
func (handler *RpcHandler) GetName() string{
return handler.rpcHandler.GetName()
}
func (slf *RpcHandler) IsSingleCoroutine() bool{
return slf.rpcHandler.IsSingleCoroutine()
func (handler *RpcHandler) IsSingleCoroutine() bool{
return handler.rpcHandler.IsSingleCoroutine()
}
func (slf *RpcHandler) AsyncCall(serviceMethod string,args interface{},callback interface{}) error {
return slf.asyncCallRpc(0,serviceMethod,args,callback)
func (handler *RpcHandler) AsyncCall(serviceMethod string,args interface{},callback interface{}) error {
return handler.asyncCallRpc(0,serviceMethod,args,callback)
}
func (slf *RpcHandler) Call(serviceMethod string,args interface{},reply interface{}) error {
return slf.callRpc(0,serviceMethod,args,reply)
func (handler *RpcHandler) Call(serviceMethod string,args interface{},reply interface{}) error {
return handler.callRpc(0,serviceMethod,args,reply)
}
func (slf *RpcHandler) Go(serviceMethod string,args interface{}) error {
return slf.goRpc(nil,false,0,serviceMethod,args)
func (handler *RpcHandler) Go(serviceMethod string,args interface{}) error {
return handler.goRpc(nil,false,0,serviceMethod,args)
}
func (slf *RpcHandler) AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error {
return slf.asyncCallRpc(nodeId,serviceMethod,args,callback)
func (handler *RpcHandler) AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error {
return handler.asyncCallRpc(nodeId,serviceMethod,args,callback)
}
func (slf *RpcHandler) CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error {
return slf.callRpc(nodeId,serviceMethod,args,reply)
func (handler *RpcHandler) CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error {
return handler.callRpc(nodeId,serviceMethod,args,reply)
}
func (slf *RpcHandler) GoNode(nodeId int,serviceMethod string,args interface{}) error {
return slf.goRpc(nil,false,nodeId,serviceMethod,args)
func (handler *RpcHandler) GoNode(nodeId int,serviceMethod string,args interface{}) error {
return handler.goRpc(nil,false,nodeId,serviceMethod,args)
}
func (slf *RpcHandler) CastGo(serviceMethod string,args interface{}) {
slf.goRpc(nil,true,0,serviceMethod,args)
func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) {
handler.goRpc(nil,true,0,serviceMethod,args)
}
func (slf *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error {
return slf.rawGoRpc(GetProcessor(uint8(rpcProcessorType)),false,nodeId,serviceMethod,args,additionParam)
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error {
return handler.rawGoRpc(GetProcessor(uint8(rpcProcessorType)),false,nodeId,serviceMethod,args,additionParam)
}
func (slf *RpcHandler) RawCastGo(rpcProcessorType RpcProcessorType,serviceMethod string,args []byte,additionParam interface{}) {
slf.goRpc(GetProcessor(uint8(rpcProcessorType)),true,0,serviceMethod,args)
func (handler *RpcHandler) RawCastGo(rpcProcessorType RpcProcessorType,serviceMethod string,args []byte,additionParam interface{}) {
handler.goRpc(GetProcessor(uint8(rpcProcessorType)),true,0,serviceMethod,args)
}

View File

@@ -12,8 +12,8 @@ import (
type RpcProcessorType uint8
const (
RPC_PROCESSOR_JSON RpcProcessorType = 0
RPC_PROCESSOR_PB RpcProcessorType = 1
RpcProcessorJson RpcProcessorType = 0
RpcProcessorPb RpcProcessorType = 1
)
//var processor IRpcProcessor = &JsonProcessor{}
@@ -22,10 +22,16 @@ var arrayProcessorLen uint8 = 2
var LittleEndian bool
type Server struct {
functions map[interface{}]interface{}
cmdchannel chan *Call
functions map[interface{}]interface{}
cmdChannel chan *Call
rpcHandleFinder RpcHandleFinder
rpcserver *network.TCPServer
rpcServer *network.TCPServer
}
type RpcAgent struct {
conn network.Conn
rpcServer *Server
userData interface{}
}
func AppendProcessor(rpcProcessor IRpcProcessor) {
@@ -40,7 +46,7 @@ func GetProcessorType(param interface{}) (RpcProcessorType,IRpcProcessor){
}
}
return RPC_PROCESSOR_JSON,arrayProcessor[RPC_PROCESSOR_JSON]
return RpcProcessorJson,arrayProcessor[RpcProcessorJson]
}
func GetProcessor(processorType uint8) IRpcProcessor{
@@ -50,39 +56,32 @@ func GetProcessor(processorType uint8) IRpcProcessor{
return arrayProcessor[processorType]
}
func (slf *Server) Init(rpcHandleFinder RpcHandleFinder) {
slf.cmdchannel = make(chan *Call,100000)
slf.rpcHandleFinder = rpcHandleFinder
slf.rpcserver = &network.TCPServer{}
func (server *Server) Init(rpcHandleFinder RpcHandleFinder) {
server.cmdChannel = make(chan *Call,100000)
server.rpcHandleFinder = rpcHandleFinder
server.rpcServer = &network.TCPServer{}
}
func (slf *Server) Start(listenAddr string) {
func (server *Server) Start(listenAddr string) {
splitAddr := strings.Split(listenAddr,":")
if len(splitAddr)!=2{
log.Fatal("listen addr is error :%s",listenAddr)
}
slf.rpcserver.Addr = ":"+splitAddr[1]
slf.rpcserver.LenMsgLen = 2 //uint16
slf.rpcserver.MinMsgLen = 2
slf.rpcserver.MaxMsgLen = math.MaxUint16
slf.rpcserver.MaxConnNum = 10000
slf.rpcserver.PendingWriteNum = 2000000
slf.rpcserver.NewAgent =slf.NewAgent
slf.rpcserver.LittleEndian = LittleEndian
slf.rpcserver.Start()
server.rpcServer.Addr = ":"+splitAddr[1]
server.rpcServer.LenMsgLen = 2 //uint16
server.rpcServer.MinMsgLen = 2
server.rpcServer.MaxMsgLen = math.MaxUint16
server.rpcServer.MaxConnNum = 10000
server.rpcServer.PendingWriteNum = 2000000
server.rpcServer.NewAgent = server.NewAgent
server.rpcServer.LittleEndian = LittleEndian
server.rpcServer.Start()
}
func (agent *RpcAgent) OnDestroy() {}
func (gate *RpcAgent) OnDestroy() {}
type RpcAgent struct {
conn network.Conn
rpcserver *Server
userData interface{}
}
func (agent *RpcAgent) WriteRespone(processor IRpcProcessor,serviceMethod string,seq uint64,reply interface{},err *RpcError) {
func (agent *RpcAgent) WriteResponse(processor IRpcProcessor,serviceMethod string,seq uint64,reply interface{},err *RpcError) {
var mReply []byte
var rpcError *RpcError
var errM error
@@ -99,9 +98,9 @@ func (agent *RpcAgent) WriteRespone(processor IRpcProcessor,serviceMethod string
}
var rpcResponse RpcResponse
rpcResponse.RpcResponeData = processor.MakeRpcResponse(seq,rpcError,mReply)
bytes,errM := processor.Marshal(rpcResponse.RpcResponeData)
defer processor.ReleaseRpcRespose(rpcResponse.RpcResponeData)
rpcResponse.RpcResponseData = processor.MakeRpcResponse(seq,rpcError,mReply)
bytes,errM := processor.Marshal(rpcResponse.RpcResponseData)
defer processor.ReleaseRpcRespose(rpcResponse.RpcResponseData)
if errM != nil {
log.Error("service method %s %+v Marshal error:%+v!", serviceMethod,rpcResponse,errM)
@@ -114,7 +113,6 @@ func (agent *RpcAgent) WriteRespone(processor IRpcProcessor,serviceMethod string
}
}
func (agent *RpcAgent) Run() {
for {
data,err := agent.conn.ReadMsg()
@@ -123,6 +121,7 @@ func (agent *RpcAgent) Run() {
//will close tcpconn
break
}
processor := GetProcessor(uint8(data[0]))
if processor==nil {
agent.conn.ReleaseReadMsg(data)
@@ -140,7 +139,7 @@ func (agent *RpcAgent) Run() {
log.Error("rpc Unmarshal request is error: %v", err)
if req.RpcRequestData.GetSeq()>0 {
rpcError := RpcError(err.Error())
agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
continue
@@ -156,17 +155,17 @@ func (agent *RpcAgent) Run() {
serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(),".")
if len(serviceMethod)!=2 {
rpcError := RpcError("rpc request req.ServiceMethod is error")
agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
log.Debug("rpc request req.ServiceMethod is error")
continue
}
rpcHandler := agent.rpcserver.rpcHandleFinder.FindRpcHandler(serviceMethod[0])
rpcHandler := agent.rpcServer.rpcHandleFinder.FindRpcHandler(serviceMethod[0])
if rpcHandler== nil {
rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod()))
agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod())
@@ -175,7 +174,7 @@ func (agent *RpcAgent) Run() {
if req.RpcRequestData.IsNoReply()==false {
req.requestHandle = func(Returns interface{},Err *RpcError){
agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err)
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err)
}
}
@@ -184,7 +183,7 @@ func (agent *RpcAgent) Run() {
rpcError := RpcError(err.Error())
if req.RpcRequestData.IsNoReply() {
agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
}
processor.ReleaseRpcRequest(req.RpcRequestData)
@@ -215,15 +214,14 @@ func (agent *RpcAgent) Destroy() {
agent.conn.Destroy()
}
func (slf *Server) NewAgent(conn *network.TCPConn) network.Agent {
agent := &RpcAgent{conn: conn, rpcserver: slf}
func (server *Server) NewAgent(conn *network.TCPConn) network.Agent {
agent := &RpcAgent{conn: conn, rpcServer: server}
return agent
}
func (slf *Server) myselfRpcHandlerGo(handlerName string,methodName string, args interface{},reply interface{}) error {
rpcHandler := slf.rpcHandleFinder.FindRpcHandler(handlerName)
func (server *Server) myselfRpcHandlerGo(handlerName string,methodName string, args interface{},reply interface{}) error {
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler== nil {
err := fmt.Errorf("service method %s.%s not config!", handlerName,methodName)
log.Error("%s",err.Error())
@@ -234,11 +232,11 @@ func (slf *Server) myselfRpcHandlerGo(handlerName string,methodName string, args
}
func (slf *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,noReply bool,handlerName string,methodName string, args interface{},rawArgs []byte,reply interface{},additionParam interface{}) *Call {
func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,noReply bool,handlerName string,methodName string, args interface{},rawArgs []byte,reply interface{},additionParam interface{}) *Call {
pCall := MakeCall()
pCall.Seq = client.generateSeq()
rpcHandler := slf.rpcHandleFinder.FindRpcHandler(handlerName)
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler== nil {
pCall.Err = fmt.Errorf("service method %s.%s not config!", handlerName,methodName)
log.Error("%s",pCall.Err.Error())
@@ -288,13 +286,13 @@ func (slf *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,n
return pCall
}
func (slf *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRpcHandler,noReply bool,handlerName string,methodName string,args interface{},reply interface{},callback reflect.Value) error {
func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRpcHandler,noReply bool,handlerName string,methodName string,args interface{},reply interface{},callback reflect.Value) error {
pCall := MakeCall()
pCall.Seq = client.generateSeq()
pCall.rpcHandler = callerRpcHandler
pCall.callback = &callback
pCall.Reply = reply
rpcHandler := slf.rpcHandleFinder.FindRpcHandler(handlerName)
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler== nil {
err := fmt.Errorf("service method %s.%s not config!", handlerName,methodName)
log.Error("%+v",err)
@@ -330,11 +328,10 @@ func (slf *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRp
if Returns!=nil {
pCall.Reply = Returns
}
pCall.rpcHandler.(*RpcHandler).callResponeCallBack<-pCall
pCall.rpcHandler.(*RpcHandler).callResponseCallBack <-pCall
}
}
err := rpcHandler.PushRequest(req)
if err != nil {
processor.ReleaseRpcRequest(req.RpcRequestData)