This commit is contained in:
duanhf2012
2024-12-05 10:19:24 +08:00
10 changed files with 49 additions and 49 deletions

View File

@@ -169,30 +169,30 @@ func Fatal(msg string, fields ...zap.Field) {
gLogger.stack = false
}
func Debugf(msg string, args ...any) {
gLogger.sugaredLogger.Debugf(msg, args...)
func Debugf(template string, args ...any) {
gLogger.sugaredLogger.Debugf(template, args...)
}
func Infof(msg string, args ...any) {
gLogger.sugaredLogger.Infof(msg, args...)
func Infof(template string, args ...any) {
gLogger.sugaredLogger.Infof(template, args...)
}
func Warnf(msg string, args ...any) {
gLogger.sugaredLogger.Warnf(msg, args...)
func Warnf(template string, args ...any) {
gLogger.sugaredLogger.Warnf(template, args...)
}
func Errorf(msg string, args ...any) {
gLogger.sugaredLogger.Errorf(msg, args...)
func Errorf(template string, args ...any) {
gLogger.sugaredLogger.Errorf(template, args...)
}
func StackErrorf(msg string, args ...any) {
func StackErrorf(template string, args ...any) {
gLogger.stack = true
gLogger.sugaredLogger.Errorf(msg, args...)
gLogger.sugaredLogger.Errorf(template, args...)
gLogger.stack = false
}
func Fatalf(msg string, args ...any) {
gLogger.sugaredLogger.Fatalf(msg, args...)
func Fatalf(template string, args ...any) {
gLogger.sugaredLogger.Fatalf(template, args...)
}
func (logger *Logger) SDebug(args ...interface{}) {

View File

@@ -330,13 +330,13 @@ func startNode(args interface{}) error {
myName, mErr := sysprocess.GetMyProcessName()
//当前进程名获取失败,不应该发生
if mErr != nil {
log.Info("get my process's name is error", log.ErrorField("err", mErr))
log.Error("get my process's name is error", log.ErrorField("err", mErr))
os.Exit(-1)
}
//进程id存在而且进程名也相同被认为是当前进程重复运行
if cErr == nil && name == myName {
log.Info("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId))
log.Error("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId))
os.Exit(-1)
}
break
@@ -354,10 +354,13 @@ func startNode(args interface{}) error {
service.Start()
//5.运行集群
cluster.GetCluster().Start()
err := cluster.GetCluster().Start()
if err != nil {
log.Error(err.Error())
os.Exit(-1)
}
//6.监听程序退出信号&性能报告
var pProfilerTicker *time.Ticker = &time.Ticker{}
if profilerInterval > 0 {
pProfilerTicker = time.NewTicker(profilerInterval)

View File

@@ -32,7 +32,7 @@ type IRealClient interface {
SetConn(conn *network.NetConn)
Close(waitDone bool)
AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error)
AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error)
Go(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
RawGo(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
IsConnected() bool
@@ -211,7 +211,7 @@ func (client *Client) rawGo(nodeId string, w IWriter, timeout time.Duration, rpc
return call
}
func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
processorType, processor := GetProcessorType(args)
InParam, herr := processor.Marshal(args)
if herr != nil {
@@ -264,10 +264,7 @@ func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration,
return emptyCancelRpc, err
}
if cancelable {
rpcCancel := RpcCancel{CallSeq: seq, Cli: client}
return rpcCancel.CancelRpc, nil
}
return emptyCancelRpc, nil
rpcCancel := RpcCancel{CallSeq: seq, Cli: client}
return rpcCancel.CancelRpc, nil
}

View File

@@ -90,7 +90,7 @@ func (lc *LClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa
return pLocalRpcServer.selfNodeRpcHandlerGo(timeout, processor, lc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
}
func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}, cancelable bool) (CancelRpc, error) {
func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}) (CancelRpc, error) {
pLocalRpcServer := rpcHandler.GetRpcServer()()
//判断是否是同一服务
@@ -109,7 +109,7 @@ func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IR
}
//其他的rpcHandler的处理器
cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback, cancelable)
cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback)
if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
}
@@ -121,9 +121,6 @@ func NewLClient(localNodeId string, callSet *CallSet) *Client {
client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1)
client.targetNodeId = localNodeId
//client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
//client.callRpcTimeout = DefaultRpcTimeout
lClient := &LClient{}
lClient.selfClient = client
client.IRealClient = lClient

View File

@@ -127,7 +127,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
return pCall
}
func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error) {
func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error) {
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")

View File

@@ -63,8 +63,8 @@ func (nc *NatsClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRp
return nc.client.rawGo(nodeId, nc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
}
func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable)
func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam)
if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
}

View File

@@ -62,8 +62,8 @@ func (rc *RClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa
return rc.selfClient.rawGo(nodeId, rc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
}
func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable)
func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam)
if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
}

View File

@@ -164,13 +164,6 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
//取出输入参数类型
var rpcMethodInfo RpcMethodInfo
typ := method.Type
if typ.NumOut() != 1 {
return fmt.Errorf("%s The number of returned arguments must be 1", method.Name)
}
if typ.Out(0).String() != "error" {
return fmt.Errorf("%s The return parameter must be of type error", method.Name)
}
if typ.NumIn() < 2 || typ.NumIn() > 4 {
return fmt.Errorf("%s Unsupported parameter format", method.Name)
@@ -183,6 +176,18 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
rpcMethodInfo.hasResponder = true
}
if rpcMethodInfo.hasResponder && typ.NumOut() > 0 {
return fmt.Errorf("%s should not have return parameters", method.Name)
}
if !rpcMethodInfo.hasResponder && typ.NumOut() != 1 {
return fmt.Errorf("%s The number of returned arguments must be 1", method.Name)
}
if !rpcMethodInfo.hasResponder && typ.Out(0).String() != "error" {
return fmt.Errorf("%s The return parameter must be of type error", method.Name)
}
for i := parIdx; i < typ.NumIn(); i++ {
if handler.isExportedOrBuiltinType(typ.In(i)) == false {
return fmt.Errorf("%s Unsupported parameter types", method.Name)
@@ -307,9 +312,11 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
requestHandle := request.requestHandle
returnValues := v.method.Func.Call(paramList)
errInter := returnValues[0].Interface()
if errInter != nil {
err = errInter.(error)
if len(returnValues) > 0 {
errInter := returnValues[0].Interface()
if errInter != nil {
err = errInter.(error)
}
}
if v.hasResponder == false && requestHandle != nil {
@@ -526,8 +533,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration, nodeId string, se
}
//2.rpcClient调用
//如果调用本结点服务
return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply, false)
return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply, )
}
func (handler *RpcHandler) GetName() string {

View File

@@ -27,9 +27,6 @@ func (rn *RpcNats) NewNatsClient(targetNodeId string,localNodeId string,callSet
client.clientId = atomic.AddUint32(&clientSeq, 1)
client.targetNodeId = targetNodeId
//client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
//client.callRpcTimeout = DefaultRpcTimeout
natsClient := &rn.NatsClient
natsClient.localNodeId = localNodeId
natsClient.client = &client

View File

@@ -31,7 +31,7 @@ type IServer interface {
selfNodeRpcHandlerGo(timeout time.Duration, processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call
myselfRpcHandlerGo(client *Client, handlerName string, serviceMethod string, args interface{}, callBack reflect.Value, reply interface{}) error
selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error)
selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error)
}
type writeResponse func(processor IRpcProcessor, connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError)