diff --git a/cluster/cluster.go b/cluster/cluster.go index 77fce3e..a382925 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -69,8 +69,8 @@ func (slf *Cluster) Init(currentNodeId int) error{ rpcinfo.nodeinfo = nodeinfo rpcinfo.client = &rpc.Client{} if nodeinfo.NodeId == currentNodeId { - //rpcinfo.client.Connect("localhost") - rpcinfo.client.Connect(nodeinfo.ListenAddr) + rpcinfo.client.Connect("") + //rpcinfo.client.Connect(nodeinfo.ListenAddr) }else{ rpcinfo.client.Connect(nodeinfo.ListenAddr) } diff --git a/rpc/client.go b/rpc/client.go index 457e2a6..98c6af0 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -8,14 +8,13 @@ import ( "math" "reflect" "runtime" - "strings" "sync" "sync/atomic" "time" ) type Client struct { - blocalhost bool + bSelfNode bool network.TCPClient conn *network.TCPConn @@ -23,6 +22,8 @@ type Client struct { startSeq uint64 pending map[uint64]*list.Element pendingTimer *list.List + callRpcTimerout time.Duration + maxCheckCallRpcCount int } func (slf *Client) NewClientAgent(conn *network.TCPConn) network.Agent { @@ -34,10 +35,8 @@ func (slf *Client) NewClientAgent(conn *network.TCPConn) network.Agent { func (slf *Client) Connect(addr string) error { slf.Addr = addr - if strings.Index(addr,"localhost") == 0 { - slf.blocalhost = true - return nil - } + slf.maxCheckCallRpcCount = 100 + slf.callRpcTimerout = 10*time.Second slf.ConnNum = 1 slf.ConnectInterval = time.Second*2 slf.PendingWriteNum = 10000 @@ -48,10 +47,59 @@ func (slf *Client) Connect(addr string) error { slf.NewAgent = slf.NewClientAgent slf.LittleEndian = LittleEndian slf.ResetPending() + go slf.startCheckRpcCallTimer() + if addr == "" { + slf.bSelfNode = true + return nil + } + slf.Start() return nil } +func (slf *Client) startCheckRpcCallTimer(){ + tick :=time.NewTicker( 3 * time.Second) + + for{ + select { + case <- tick.C: + slf.checkRpcCallTimerout() + } + } + tick.Stop() +} + +func (slf *Client) makeCallFail(call *Call){ + if call.callback!=nil && call.callback.IsValid() { + call.rpcHandler.(*RpcHandler).callResponeCallBack<-call + }else{ + call.done <- call + } + slf.removePending(call.Seq) +} + +func (slf *Client) checkRpcCallTimerout(){ + tnow := time.Now() + + for i:=0;i slf.callRpcTimerout { + pCall.Err = fmt.Errorf("RPC call takes more than %d seconds!",slf.callRpcTimerout/time.Second) + slf.makeCallFail(pCall) + slf.pendingLock.Unlock() + continue + } + slf.pendingLock.Unlock() + } + +} + func (slf *Client) ResetPending(){ slf.pendingLock.Lock() if slf.pending != nil { @@ -68,6 +116,7 @@ func (slf *Client) ResetPending(){ func (slf *Client) AddPending(call *Call){ slf.pendingLock.Lock() + call.calltime = time.Now() elemTimer := slf.pendingTimer.PushBack(call) slf.pending[call.Seq] = elemTimer//如果下面发送失败,将会一一直存在这里 slf.pendingLock.Unlock() @@ -75,18 +124,20 @@ func (slf *Client) AddPending(call *Call){ func (slf *Client) RemovePending(seq uint64){ slf.pendingLock.Lock() + slf.removePending(seq) + slf.pendingLock.Unlock() +} +func (slf *Client) removePending(seq uint64){ v,ok := slf.pending[seq] if ok == false{ - slf.pendingLock.Unlock() return } slf.pendingTimer.Remove(v) delete(slf.pending,seq) - - slf.pendingLock.Unlock() } + func (slf *Client) FindPending(seq uint64) *Call{ slf.pendingLock.Lock() v,ok := slf.pending[seq] diff --git a/rpc/rpc.go b/rpc/rpc.go index 6b04461..534e9e5 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -3,6 +3,7 @@ package rpc import ( "reflect" "sync" + "time" ) type RpcRequest struct { @@ -45,6 +46,7 @@ type Call struct { connid int callback *reflect.Value rpcHandler IRpcHandler + calltime time.Time } func (slf *Call) Clear(){ diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 9cceef6..554a3eb 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -237,12 +237,13 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) { var oParam reflect.Value paramList = append(paramList,reflect.ValueOf(slf.GetRpcHandler())) //接受者 - oParam = reflect.New(v.oParam.Type().Elem()) - - paramList = append(paramList,reflect.ValueOf(iparam)) + if request.localReply!=nil { + oParam = reflect.ValueOf(request.localReply) //输出参数 + }else{ + oParam = reflect.New(v.oParam.Type().Elem()) + } paramList = append(paramList,oParam) //输出参数 - returnValues := v.method.Func.Call(paramList) errInter := returnValues[0].Interface() if errInter != nil { @@ -293,7 +294,7 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int //2.rpcclient调用 //如果调用本结点服务 for _,pClient := range pClientList { - if pClient.blocalhost == true { + if pClient.bSelfNode == true { pLocalRpcServer:=slf.funcRpcServer() //判断是否是同一服务 sMethod := strings.Split(serviceMethod,".") @@ -311,11 +312,11 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) } //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.rpcHandlerGo(true,sMethod[0],sMethod[1],args,nil) - defer ReleaseCall(pCall) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,true,sMethod[0],sMethod[1],args,nil) if pCall.Err!=nil { err = pCall.Err } + ReleaseCall(pCall) continue } @@ -345,7 +346,7 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{}, //2.rpcclient调用 //如果调用本结点服务 pClient := pClientList[0] - if pClient.blocalhost == true { + if pClient.bSelfNode == true { pLocalRpcServer:=slf.funcRpcServer() //判断是否是同一服务 sMethod := strings.Split(serviceMethod,".") @@ -360,8 +361,9 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{}, return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,reply) } //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.rpcHandlerGo(false,sMethod[0],sMethod[1],args,reply) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,reply) err = pCall.Done().Err + pClient.RemovePending(pCall.Seq) ReleaseCall(pCall) return err } @@ -416,7 +418,7 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa //2.rpcclient调用 //如果调用本结点服务 pClient := pClientList[0] - if pClient.blocalhost == true { + if pClient.bSelfNode == true { pLocalRpcServer:=slf.funcRpcServer() //判断是否是同一服务 sMethod := strings.Split(serviceMethod,".") @@ -439,17 +441,18 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa //其他的rpcHandler的处理器 if callback!=nil { - err = pLocalRpcServer.rpcHandlerAsyncGo(slf,false,sMethod[0],sMethod[1],args,reply,fVal) + err = pLocalRpcServer.selfNodeRpcHandlerAsyncGo(pClient,slf,false,sMethod[0],sMethod[1],args,reply,fVal) if err != nil { fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) } return nil } - pCall := pLocalRpcServer.rpcHandlerGo(false,sMethod[0],sMethod[1],args,reply) - defer ReleaseCall(pCall) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,reply) + err = pCall.Done().Err + pClient.RemovePending(pCall.Seq) + ReleaseCall(pCall) - pResult := pCall.Done() - return pResult.Err + return err } //跨node调用 diff --git a/rpc/server.go b/rpc/server.go index 08cccfc..e0c7d4f 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -201,9 +201,10 @@ func (slf *Server) myselfRpcHandlerGo(handlerName string,methodName string, args } -func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName string, args interface{},reply interface{}) *Call { - pCall := MakeCall()//&Call{} - //pCall.done = make( chan *Call,1) +func (slf *Server) selfNodeRpcHandlerGo(client *Client,noReply bool,handlerName string,methodName string, args interface{},reply interface{}) *Call { + pCall := MakeCall() + pCall.Seq = client.generateSeq() + rpcHandler := slf.rpcHandleFinder.FindRpcHandler(handlerName) if rpcHandler== nil { pCall.Err = fmt.Errorf("service method %s.%s not config!", handlerName,methodName) @@ -217,7 +218,15 @@ func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName strin req.localReply = reply req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil) if noReply == false { + client.AddPending(pCall) req.requestHandle = func(Returns interface{},Err *RpcError){ + v := client.FindPending(pCall.Seq) + if v == nil { + log.Error("rpcClient cannot find seq %d in pending",pCall.Seq) + ReleaseCall(pCall) + return + } + if Err!=nil { pCall.Err = Err }else{ @@ -239,8 +248,9 @@ func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName strin return pCall } -func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,handlerName string,methodName string,args interface{},reply interface{},callback reflect.Value) error { +func (slf *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRpcHandler,noReply bool,handlerName string,methodName string,args interface{},reply interface{},callback reflect.Value) error { pCall := MakeCall() + pCall.Seq = client.generateSeq() pCall.rpcHandler = callerRpcHandler pCall.callback = &callback pCall.Reply = reply @@ -256,7 +266,18 @@ func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,h req.localReply = reply req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil) if noReply == false { + client.AddPending(pCall) req.requestHandle = func(Returns interface{},Err *RpcError){ + //processor.ReleaseRpcRequest(req.RpcRequestData) + //ReleaseRpcRequest(req) + v := client.FindPending(pCall.Seq) + if v == nil { + log.Error("rpcClient cannot find seq %d in pending",pCall.Seq) + + ReleaseCall(pCall) + return + } + if Err == nil { pCall.Err = nil }else{ diff --git a/service/service.go b/service/service.go index d0fe596..170ab59 100644 --- a/service/service.go +++ b/service/service.go @@ -37,7 +37,6 @@ type Service struct { Module rpc.RpcHandler //rpc name string //service name - closeSig chan bool wg sync.WaitGroup serviceCfg interface{} gorouterNum int32