替换日志库为slog

This commit is contained in:
duanhf2012
2023-08-15 15:46:38 +08:00
parent 4ad8204fde
commit ef8182eec7
30 changed files with 811 additions and 769 deletions

View File

@@ -69,7 +69,7 @@ const Default_ReadWriteDeadline = 15*time.Second
func (server *Server) Start(listenAddr string, maxRpcParamLen uint32,compressBytesLen int) {
splitAddr := strings.Split(listenAddr, ":")
if len(splitAddr) != 2 {
log.SFatal("listen addr is error :", listenAddr)
log.Fatal("listen addr is failed", log.String("listenAddr",listenAddr))
}
server.rpcServer.Addr = ":" + splitAddr[1]
@@ -111,7 +111,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if errM != nil {
log.SError("service method ", serviceMethod, " Marshal error:", errM.Error())
log.Error("mashal RpcResponseData failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",errM))
return
}
@@ -122,7 +122,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
log.SError("service method ", serviceMethod, " CompressBlock error:", cErr.Error())
log.Error("CompressBlock failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",cErr))
return
}
if len(compressBuff) < len(bytes) {
@@ -136,7 +136,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
compressor.CompressBufferCollection(compressBuff)
}
if errM != nil {
log.SError("Rpc ", serviceMethod, " return is error:", errM.Error())
log.Error("WriteMsg error,Rpc return is fail",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",errM))
}
}
@@ -144,7 +144,7 @@ func (agent *RpcAgent) Run() {
for {
data, err := agent.conn.ReadMsg()
if err != nil {
log.SError("remoteAddress:", agent.conn.RemoteAddr().String(), ",read message: ", err.Error())
log.Error("read message is error",log.String("remoteAddress",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err))
//will close tcpconn
break
}
@@ -153,7 +153,7 @@ func (agent *RpcAgent) Run() {
processor := GetProcessor(data[0]&0x7f)
if processor == nil {
agent.conn.ReleaseReadMsg(data)
log.SError("remote rpc ", agent.conn.RemoteAddr().String(), " cannot find processor:", data[0])
log.Warning("cannot find processor",log.String("RemoteAddr",agent.conn.RemoteAddr().String()))
return
}
@@ -166,7 +166,7 @@ func (agent *RpcAgent) Run() {
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
agent.conn.ReleaseReadMsg(data)
log.SError("rpcClient ", agent.conn.RemoteAddr().String(), " ReadMsg head error:", unCompressErr.Error())
log.Error("UncompressBlock failed",log.String("RemoteAddr",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",unCompressErr))
return
}
byteData = compressBuff
@@ -179,7 +179,7 @@ func (agent *RpcAgent) Run() {
}
agent.conn.ReleaseReadMsg(data)
if err != nil {
log.SError("rpc Unmarshal request is error:", err.Error())
log.Error("Unmarshal failed",log.String("RemoteAddr",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err))
if req.RpcRequestData.GetSeq() > 0 {
rpcError := RpcError(err.Error())
if req.RpcRequestData.IsNoReply() == false {
@@ -201,7 +201,7 @@ func (agent *RpcAgent) Run() {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
ReleaseRpcRequest(req)
log.SError("rpc request req.ServiceMethod is error")
log.Error("rpc request req.ServiceMethod is error")
continue
}
@@ -211,8 +211,7 @@ func (agent *RpcAgent) Run() {
if req.RpcRequestData.IsNoReply() == false {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
log.SError("service method ", req.RpcRequestData.GetServiceMethod(), " not config!")
log.Error("serviceMethod not config",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()))
ReleaseRpcRequest(req)
continue
}
@@ -232,7 +231,7 @@ func (agent *RpcAgent) Run() {
} else {
ReleaseRpcRequest(req)
}
log.SError(rErr)
log.Error("call rpc param error",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()),log.ErrorAttr("error",err))
continue
}
@@ -281,7 +280,7 @@ func (server *Server) myselfRpcHandlerGo(client *Client,handlerName string, serv
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error())
log.Error("service method not config",log.String("serviceMethod",serviceMethod))
return err
}
@@ -297,7 +296,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error())
log.Error("service method not config",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
pCall.Seq = 0
pCall.DoError(err)
@@ -314,7 +313,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
iParam,err = processor.Clone(args)
if err != nil {
sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(sErr.Error())
log.Error("deep copy inParam is failed",log.String("handlerName",handlerName),log.String("serviceMethod",serviceMethod))
pCall.Seq = 0
pCall.DoError(sErr)
@@ -329,7 +328,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil {
log.SError(err.Error())
log.Error("unmarshalInParam is failed",log.String("serviceMethod",serviceMethod),log.Uint32("rpcMethodId",rpcMethodId),log.ErrorAttr("error",err))
pCall.Seq = 0
pCall.DoError(err)
ReleaseRpcRequest(req)
@@ -345,12 +344,12 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
byteReturns, err := req.rpcProcessor.Marshal(Returns)
if err != nil {
Err = ConvertError(err)
log.SError("returns data cannot be marshal,callSeq is ", callSeq," error is ",err.Error())
log.Error("returns data cannot be marshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err))
}else{
err = req.rpcProcessor.Unmarshal(byteReturns, reply)
if err != nil {
Err = ConvertError(err)
log.SError("returns data cannot be Unmarshal,callSeq is ", callSeq," error is ",err.Error())
log.Error("returns data cannot be Unmarshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err))
}
}
}
@@ -358,8 +357,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
ReleaseRpcRequest(req)
v := client.RemovePending(callSeq)
if v == nil {
log.SError("rpcClient cannot find seq ",callSeq, " in pending")
log.Error("rpcClient cannot find seq",log.Uint64("seq",callSeq))
return
}
@@ -367,7 +365,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
v.Err = nil
v.DoOK()
} else {
log.SError(Err.Error())
log.Error(Err.Error())
v.DoError(Err)
}
}
@@ -375,7 +373,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
err := rpcHandler.PushRpcRequest(req)
if err != nil {
log.SError(err.Error())
log.Error(err.Error())
pCall.DoError(err)
ReleaseRpcRequest(req)
}
@@ -387,7 +385,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Cl
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error())
log.Error(err.Error())
return emptyCancelRpc,err
}
@@ -395,7 +393,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Cl
iParam,err := processor.Clone(args)
if err != nil {
errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(errM.Error())
log.Error(errM.Error())
return emptyCancelRpc,errM
}