优化rpc与rankservice持久化

This commit is contained in:
duanhf2012
2023-05-04 17:35:40 +08:00
parent 5675681ab1
commit f78d0d58be
5 changed files with 52 additions and 21 deletions

View File

@@ -44,7 +44,8 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!") sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!")
log.SError(sErr.Error()) log.SError(sErr.Error())
call := MakeCall() call := MakeCall()
call.Err = sErr call.DoError(sErr)
return call return call
} }
@@ -53,12 +54,13 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
//调用自己rpcHandler处理器 //调用自己rpcHandler处理器
err := pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args, requestHandlerNull,reply) err := pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args, requestHandlerNull,reply)
call := MakeCall() call := MakeCall()
if err != nil { if err != nil {
call.Err = err call.DoError(err)
return call return call
} }
call.done<-call call.DoOK()
return call return call
} }

View File

@@ -44,8 +44,9 @@ func (rc *RClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
_, processor := GetProcessorType(args) _, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args) InParam, err := processor.Marshal(args)
if err != nil { if err != nil {
log.SError(err.Error())
call := MakeCall() call := MakeCall()
call.Err = err call.DoError(err)
return call return call
} }
@@ -65,14 +66,17 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
if err != nil { if err != nil {
call.Seq = 0 call.Seq = 0
call.Err = err log.SError(err.Error())
call.DoError(err)
return call return call
} }
conn := rc.GetConn() conn := rc.GetConn()
if conn == nil || conn.IsConnected()==false { if conn == nil || conn.IsConnected()==false {
call.Seq = 0 call.Seq = 0
call.Err = errors.New(serviceMethod + " was called failed,rpc client is disconnect") sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect")
log.SError(sErr.Error())
call.DoError(sErr)
return call return call
} }
@@ -83,8 +87,11 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes) err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes)
if err != nil { if err != nil {
rc.selfClient.RemovePending(call.Seq) rc.selfClient.RemovePending(call.Seq)
log.SError(err.Error())
call.Seq = 0 call.Seq = 0
call.Err = err call.DoError(err)
} }
return call return call

View File

@@ -102,6 +102,15 @@ func (rpcResponse *RpcResponse) Clear() *RpcResponse{
return rpcResponse return rpcResponse
} }
func (call *Call) DoError(err error){
call.Err = err
call.done <- call
}
func (call *Call) DoOK(){
call.done <- call
}
func (call *Call) Clear() *Call{ func (call *Call) Clear() *Call{
call.Seq = 0 call.Seq = 0
call.ServiceMethod = "" call.ServiceMethod = ""

View File

@@ -256,10 +256,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil { if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error())
pCall.Seq = 0 pCall.Seq = 0
pCall.Err = errors.New("service method " + serviceMethod + " not config!") pCall.DoError(err)
pCall.done <- pCall
log.SError(pCall.Err.Error())
return pCall return pCall
} }
@@ -273,10 +273,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
var err error var err error
iParam,err = processor.Clone(args) iParam,err = processor.Clone(args)
if err != nil { if err != nil {
sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(sErr.Error())
pCall.Seq = 0 pCall.Seq = 0
pCall.Err = errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) pCall.DoError(sErr)
pCall.done <- pCall
log.SError(pCall.Err.Error())
return pCall return pCall
} }
@@ -289,9 +289,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
var err error var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs) req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil { if err != nil {
log.SError(err.Error())
pCall.Seq = 0
pCall.DoError(err)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
pCall.Err = err
pCall.done <- pCall
return pCall return pCall
} }
} }
@@ -321,20 +322,22 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
return return
} }
if len(Err) == 0 { if len(Err) == 0 {
v.Err = nil v.Err = nil
v.DoOK()
} else { } else {
v.Err = Err log.SError(Err.Error())
v.DoError(Err)
} }
v.done <- v
} }
} }
err := rpcHandler.PushRpcRequest(req) err := rpcHandler.PushRpcRequest(req)
if err != nil { if err != nil {
log.SError(err.Error())
pCall.DoError(err)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
pCall.Err = err
pCall.done <- pCall
} }
return pCall return pCall

View File

@@ -6,9 +6,9 @@ import (
"github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysmodule/mongodbmodule" "github.com/duanhf2012/origin/sysmodule/mongodbmodule"
"github.com/duanhf2012/origin/util/coroutine"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
"runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -71,7 +71,8 @@ func (mp *MongoPersist) OnInit() error {
} }
//开启协程 //开启协程
coroutine.GoRecover(mp.persistCoroutine,-1) go mp.persistCoroutine()
return nil return nil
} }
@@ -291,6 +292,15 @@ func (mp *MongoPersist) hasPersistData() bool{
} }
func (mp *MongoPersist) saveToDB(){ func (mp *MongoPersist) saveToDB(){
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError(" Core dump info[", errString, "]\n", string(buf[:l]))
}
}()
//1.copy数据 //1.copy数据
mp.Lock() mp.Lock()
mapRemoveRankData := mp.mapRemoveRankData mapRemoveRankData := mp.mapRemoveRankData