From f78d0d58be9d77b791cd79c8edf732296ec08d86 Mon Sep 17 00:00:00 2001 From: duanhf2012 <6549168@qq.com> Date: Thu, 4 May 2023 17:35:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96rpc=E4=B8=8Erankservice?= =?UTF-8?q?=E6=8C=81=E4=B9=85=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/lclient.go | 8 ++++--- rpc/rclient.go | 15 +++++++++---- rpc/rpc.go | 9 ++++++++ rpc/server.go | 27 +++++++++++++----------- sysservice/rankservice/MongodbPersist.go | 14 ++++++++++-- 5 files changed, 52 insertions(+), 21 deletions(-) diff --git a/rpc/lclient.go b/rpc/lclient.go index 56c7831..f4c3728 100644 --- a/rpc/lclient.go +++ b/rpc/lclient.go @@ -44,7 +44,8 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string, sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!") log.SError(sErr.Error()) call := MakeCall() - call.Err = sErr + call.DoError(sErr) + return call } @@ -53,12 +54,13 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string, //调用自己rpcHandler处理器 err := pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args, requestHandlerNull,reply) call := MakeCall() + if err != nil { - call.Err = err + call.DoError(err) return call } - call.done<-call + call.DoOK() return call } diff --git a/rpc/rclient.go b/rpc/rclient.go index 446f8d8..6296724 100644 --- a/rpc/rclient.go +++ b/rpc/rclient.go @@ -44,8 +44,9 @@ func (rc *RClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string, _, processor := GetProcessorType(args) InParam, err := processor.Marshal(args) if err != nil { + log.SError(err.Error()) call := MakeCall() - call.Err = err + call.DoError(err) return call } @@ -65,14 +66,17 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply if err != nil { call.Seq = 0 - call.Err = err + log.SError(err.Error()) + call.DoError(err) return call } conn := rc.GetConn() if conn == nil || conn.IsConnected()==false { call.Seq = 0 - call.Err = errors.New(serviceMethod + " was called failed,rpc client is disconnect") + sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect") + log.SError(sErr.Error()) + call.DoError(sErr) return call } @@ -83,8 +87,11 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes) if err != nil { rc.selfClient.RemovePending(call.Seq) + + log.SError(err.Error()) + call.Seq = 0 - call.Err = err + call.DoError(err) } return call diff --git a/rpc/rpc.go b/rpc/rpc.go index e16753e..f21d940 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -102,6 +102,15 @@ func (rpcResponse *RpcResponse) Clear() *RpcResponse{ return rpcResponse } +func (call *Call) DoError(err error){ + call.Err = err + call.done <- call +} + +func (call *Call) DoOK(){ + call.done <- call +} + func (call *Call) Clear() *Call{ call.Seq = 0 call.ServiceMethod = "" diff --git a/rpc/server.go b/rpc/server.go index 74711e8..9b67803 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -256,10 +256,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) if rpcHandler == nil { + err := errors.New("service method " + serviceMethod + " not config!") + log.SError(err.Error()) pCall.Seq = 0 - pCall.Err = errors.New("service method " + serviceMethod + " not config!") - pCall.done <- pCall - log.SError(pCall.Err.Error()) + pCall.DoError(err) return pCall } @@ -273,10 +273,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie var err error iParam,err = processor.Clone(args) if err != nil { + sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) + log.SError(sErr.Error()) pCall.Seq = 0 - pCall.Err = errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) - pCall.done <- pCall - log.SError(pCall.Err.Error()) + pCall.DoError(sErr) return pCall } @@ -289,9 +289,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie var err error req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs) if err != nil { + log.SError(err.Error()) + pCall.Seq = 0 + pCall.DoError(err) ReleaseRpcRequest(req) - pCall.Err = err - pCall.done <- pCall return pCall } } @@ -321,20 +322,22 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie return } + if len(Err) == 0 { v.Err = nil + v.DoOK() } else { - v.Err = Err + log.SError(Err.Error()) + v.DoError(Err) } - v.done <- v } } err := rpcHandler.PushRpcRequest(req) if err != nil { + log.SError(err.Error()) + pCall.DoError(err) ReleaseRpcRequest(req) - pCall.Err = err - pCall.done <- pCall } return pCall diff --git a/sysservice/rankservice/MongodbPersist.go b/sysservice/rankservice/MongodbPersist.go index d05745f..bb62e21 100644 --- a/sysservice/rankservice/MongodbPersist.go +++ b/sysservice/rankservice/MongodbPersist.go @@ -6,9 +6,9 @@ import ( "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/sysmodule/mongodbmodule" - "github.com/duanhf2012/origin/util/coroutine" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" + "runtime" "sync" "sync/atomic" "time" @@ -71,7 +71,8 @@ func (mp *MongoPersist) OnInit() error { } //开启协程 - coroutine.GoRecover(mp.persistCoroutine,-1) + go mp.persistCoroutine() + return nil } @@ -291,6 +292,15 @@ func (mp *MongoPersist) hasPersistData() bool{ } func (mp *MongoPersist) saveToDB(){ + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + l := runtime.Stack(buf, false) + errString := fmt.Sprint(r) + log.SError(" Core dump info[", errString, "]\n", string(buf[:l])) + } + }() + //1.copy数据 mp.Lock() mapRemoveRankData := mp.mapRemoveRankData