mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-13 23:24:45 +08:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3b10eeb792 | ||
|
|
e3275e9f2a | ||
|
|
16745b34f0 | ||
|
|
f34dc7d53f | ||
|
|
0a09dc2fee | ||
|
|
f01a93c446 | ||
|
|
4d2ab4ee4f | ||
|
|
ffcc5a3489 | ||
|
|
cf6ca0483b | ||
|
|
97a21e6f71 | ||
|
|
f60a55d03a | ||
|
|
2c32d6eec9 |
@@ -417,13 +417,12 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cls *Cluster) parseLocalCfg() {
|
func (cls *Cluster) parseLocalCfg() error{
|
||||||
rpcInfo := NodeRpcInfo{}
|
rpcInfo := NodeRpcInfo{}
|
||||||
rpcInfo.nodeInfo = cls.localNodeInfo
|
rpcInfo.nodeInfo = cls.localNodeInfo
|
||||||
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet)
|
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet)
|
||||||
|
|
||||||
cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo
|
cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo
|
||||||
|
|
||||||
for _, serviceName := range cls.localNodeInfo.ServiceList {
|
for _, serviceName := range cls.localNodeInfo.ServiceList {
|
||||||
splitServiceName := strings.Split(serviceName, ":")
|
splitServiceName := strings.Split(serviceName, ":")
|
||||||
if len(splitServiceName) == 2 {
|
if len(splitServiceName) == 2 {
|
||||||
@@ -440,8 +439,13 @@ func (cls *Cluster) parseLocalCfg() {
|
|||||||
cls.mapServiceNode[serviceName] = make(map[string]struct{})
|
cls.mapServiceNode[serviceName] = make(map[string]struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _,ok:=cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId];ok {
|
||||||
|
return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId)
|
||||||
|
}
|
||||||
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
|
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cls *Cluster) IsNatsMode() bool {
|
func (cls *Cluster) IsNatsMode() bool {
|
||||||
@@ -474,8 +478,7 @@ func (cls *Cluster) InitCfg(localNodeId string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//本地配置服务加到全局map信息中
|
//本地配置服务加到全局map信息中
|
||||||
cls.parseLocalCfg()
|
return cls.parseLocalCfg()
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cls *Cluster) IsConfigService(serviceName string) bool {
|
func (cls *Cluster) IsConfigService(serviceName string) bool {
|
||||||
|
|||||||
75
log/log.go
75
log/log.go
@@ -18,14 +18,16 @@ type Logger struct {
|
|||||||
OpenConsole *bool
|
OpenConsole *bool
|
||||||
LogPath string
|
LogPath string
|
||||||
FileName string
|
FileName string
|
||||||
|
Skip int
|
||||||
LogLevel zapcore.Level
|
LogLevel zapcore.Level
|
||||||
Encoder zapcore.Encoder
|
Encoder zapcore.Encoder
|
||||||
LogConfig *lumberjack.Logger
|
LogConfig *lumberjack.Logger
|
||||||
sugaredLogger *zap.SugaredLogger
|
SugaredLogger *zap.SugaredLogger
|
||||||
|
CoreList []zapcore.Core
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetLogger(logger *Logger) {
|
func SetLogger(logger *Logger) {
|
||||||
if logger != nil && isSetLogger == false {
|
if logger != nil {
|
||||||
gLogger = logger
|
gLogger = logger
|
||||||
isSetLogger = true
|
isSetLogger = true
|
||||||
}
|
}
|
||||||
@@ -39,6 +41,10 @@ func (logger *Logger) SetEncoder(encoder zapcore.Encoder) {
|
|||||||
logger.Encoder = encoder
|
logger.Encoder = encoder
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (logger *Logger) SetSkip(skip int) {
|
||||||
|
logger.Skip = skip
|
||||||
|
}
|
||||||
|
|
||||||
func GetJsonEncoder() zapcore.Encoder {
|
func GetJsonEncoder() zapcore.Encoder {
|
||||||
encoderConfig := zap.NewProductionEncoderConfig()
|
encoderConfig := zap.NewProductionEncoderConfig()
|
||||||
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
|
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
|
||||||
@@ -90,22 +96,27 @@ func (logger *Logger) Enabled(zapcore.Level) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) Init() {
|
func (logger *Logger) Init() {
|
||||||
var coreList []zapcore.Core
|
if isSetLogger {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var coreList []zapcore.Core
|
||||||
if logger.OpenConsole == nil || *logger.OpenConsole {
|
if logger.OpenConsole == nil || *logger.OpenConsole {
|
||||||
core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), logger.LogLevel)
|
core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), logger.LogLevel)
|
||||||
coreList = append(coreList, core)
|
coreList = append(coreList, core)
|
||||||
}
|
}
|
||||||
|
|
||||||
if logger.LogPath != "" {
|
if logger.CoreList != nil {
|
||||||
writeSyncer := zapcore.AddSync(logger.LogConfig)
|
coreList = append(coreList, logger.CoreList...)
|
||||||
core := zapcore.NewCore(logger.Encoder, writeSyncer, logger.LogLevel)
|
}else if logger.LogPath != "" {
|
||||||
|
WriteSyncer := zapcore.AddSync(logger.LogConfig)
|
||||||
|
core := zapcore.NewCore(logger.Encoder, WriteSyncer, logger.LogLevel)
|
||||||
coreList = append(coreList, core)
|
coreList = append(coreList, core)
|
||||||
}
|
}
|
||||||
|
|
||||||
core := zapcore.NewTee(coreList...)
|
core := zapcore.NewTee(coreList...)
|
||||||
logger.Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(logger), zap.AddCallerSkip(1))
|
logger.Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(logger), zap.AddCallerSkip(1+logger.Skip))
|
||||||
logger.sugaredLogger = logger.Logger.Sugar()
|
logger.SugaredLogger = logger.Logger.Sugar()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) Debug(msg string, fields ...zap.Field) {
|
func (logger *Logger) Debug(msg string, fields ...zap.Field) {
|
||||||
@@ -164,85 +175,85 @@ func Fatal(msg string, fields ...zap.Field) {
|
|||||||
gLogger.stack = false
|
gLogger.stack = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func Debugf(msg string, args ...any) {
|
func Debugf(template string, args ...any) {
|
||||||
gLogger.sugaredLogger.Debugf(msg, args...)
|
gLogger.SugaredLogger.Debugf(template, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Infof(msg string, args ...any) {
|
func Infof(template string, args ...any) {
|
||||||
gLogger.sugaredLogger.Infof(msg, args...)
|
gLogger.SugaredLogger.Infof(template, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Warnf(msg string, args ...any) {
|
func Warnf(template string, args ...any) {
|
||||||
gLogger.sugaredLogger.Warnf(msg, args...)
|
gLogger.SugaredLogger.Warnf(template, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Errorf(msg string, args ...any) {
|
func Errorf(template string, args ...any) {
|
||||||
gLogger.sugaredLogger.Errorf(msg, args...)
|
gLogger.SugaredLogger.Errorf(template, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func StackErrorf(msg string, args ...any) {
|
func StackErrorf(template string, args ...any) {
|
||||||
gLogger.stack = true
|
gLogger.stack = true
|
||||||
gLogger.sugaredLogger.Errorf(msg, args...)
|
gLogger.SugaredLogger.Errorf(template, args...)
|
||||||
gLogger.stack = false
|
gLogger.stack = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func Fatalf(msg string, args ...any) {
|
func Fatalf(template string, args ...any) {
|
||||||
gLogger.sugaredLogger.Fatalf(msg, args...)
|
gLogger.SugaredLogger.Fatalf(template, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) SDebug(args ...interface{}) {
|
func (logger *Logger) SDebug(args ...interface{}) {
|
||||||
logger.sugaredLogger.Debugln(args...)
|
logger.SugaredLogger.Debugln(args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) SInfo(args ...interface{}) {
|
func (logger *Logger) SInfo(args ...interface{}) {
|
||||||
logger.sugaredLogger.Infoln(args...)
|
logger.SugaredLogger.Infoln(args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) SWarn(args ...interface{}) {
|
func (logger *Logger) SWarn(args ...interface{}) {
|
||||||
logger.sugaredLogger.Warnln(args...)
|
logger.SugaredLogger.Warnln(args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) SError(args ...interface{}) {
|
func (logger *Logger) SError(args ...interface{}) {
|
||||||
logger.sugaredLogger.Errorln(args...)
|
logger.SugaredLogger.Errorln(args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) SStackError(args ...interface{}) {
|
func (logger *Logger) SStackError(args ...interface{}) {
|
||||||
gLogger.stack = true
|
gLogger.stack = true
|
||||||
logger.sugaredLogger.Errorln(args...)
|
logger.SugaredLogger.Errorln(args...)
|
||||||
gLogger.stack = false
|
gLogger.stack = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) SFatal(args ...interface{}) {
|
func (logger *Logger) SFatal(args ...interface{}) {
|
||||||
gLogger.stack = true
|
gLogger.stack = true
|
||||||
logger.sugaredLogger.Fatalln(args...)
|
logger.SugaredLogger.Fatalln(args...)
|
||||||
gLogger.stack = false
|
gLogger.stack = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func SDebug(args ...interface{}) {
|
func SDebug(args ...interface{}) {
|
||||||
gLogger.sugaredLogger.Debugln(args...)
|
gLogger.SugaredLogger.Debugln(args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func SInfo(args ...interface{}) {
|
func SInfo(args ...interface{}) {
|
||||||
gLogger.sugaredLogger.Infoln(args...)
|
gLogger.SugaredLogger.Infoln(args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func SWarn(args ...interface{}) {
|
func SWarn(args ...interface{}) {
|
||||||
gLogger.sugaredLogger.Warnln(args...)
|
gLogger.SugaredLogger.Warnln(args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func SError(args ...interface{}) {
|
func SError(args ...interface{}) {
|
||||||
gLogger.sugaredLogger.Errorln(args...)
|
gLogger.SugaredLogger.Errorln(args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func SStackError(args ...interface{}) {
|
func SStackError(args ...interface{}) {
|
||||||
gLogger.stack = true
|
gLogger.stack = true
|
||||||
gLogger.sugaredLogger.Errorln(args...)
|
gLogger.SugaredLogger.Errorln(args...)
|
||||||
gLogger.stack = false
|
gLogger.stack = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func SFatal(args ...interface{}) {
|
func SFatal(args ...interface{}) {
|
||||||
gLogger.stack = true
|
gLogger.stack = true
|
||||||
gLogger.sugaredLogger.Fatalln(args...)
|
gLogger.SugaredLogger.Fatalln(args...)
|
||||||
gLogger.stack = false
|
gLogger.stack = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
16
node/node.go
16
node/node.go
@@ -230,7 +230,6 @@ func initLog() error {
|
|||||||
localNodeInfo := cluster.GetCluster().GetLocalNodeInfo()
|
localNodeInfo := cluster.GetCluster().GetLocalNodeInfo()
|
||||||
fileName := fmt.Sprintf("%s.log", localNodeInfo.NodeId)
|
fileName := fmt.Sprintf("%s.log", localNodeInfo.NodeId)
|
||||||
logger.FileName = fileName
|
logger.FileName = fileName
|
||||||
filepath.Join()
|
|
||||||
logger.LogConfig.Filename = filepath.Join(logger.LogPath, logger.FileName)
|
logger.LogConfig.Filename = filepath.Join(logger.LogPath, logger.FileName)
|
||||||
|
|
||||||
logger.Init()
|
logger.Init()
|
||||||
@@ -330,13 +329,13 @@ func startNode(args interface{}) error {
|
|||||||
myName, mErr := sysprocess.GetMyProcessName()
|
myName, mErr := sysprocess.GetMyProcessName()
|
||||||
//当前进程名获取失败,不应该发生
|
//当前进程名获取失败,不应该发生
|
||||||
if mErr != nil {
|
if mErr != nil {
|
||||||
log.Info("get my process's name is error", log.ErrorField("err", mErr))
|
log.Error("get my process's name is error", log.ErrorField("err", mErr))
|
||||||
os.Exit(-1)
|
os.Exit(-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
//进程id存在,而且进程名也相同,被认为是当前进程重复运行
|
//进程id存在,而且进程名也相同,被认为是当前进程重复运行
|
||||||
if cErr == nil && name == myName {
|
if cErr == nil && name == myName {
|
||||||
log.Info("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId))
|
log.Error("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId))
|
||||||
os.Exit(-1)
|
os.Exit(-1)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
@@ -354,10 +353,13 @@ func startNode(args interface{}) error {
|
|||||||
service.Start()
|
service.Start()
|
||||||
|
|
||||||
//5.运行集群
|
//5.运行集群
|
||||||
cluster.GetCluster().Start()
|
err := cluster.GetCluster().Start()
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err.Error())
|
||||||
|
os.Exit(-1)
|
||||||
|
}
|
||||||
|
|
||||||
//6.监听程序退出信号&性能报告
|
//6.监听程序退出信号&性能报告
|
||||||
|
|
||||||
var pProfilerTicker *time.Ticker = &time.Ticker{}
|
var pProfilerTicker *time.Ticker = &time.Ticker{}
|
||||||
if profilerInterval > 0 {
|
if profilerInterval > 0 {
|
||||||
pProfilerTicker = time.NewTicker(profilerInterval)
|
pProfilerTicker = time.NewTicker(profilerInterval)
|
||||||
@@ -484,9 +486,9 @@ func setLogPath(args interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = os.Mkdir(log.GetLogger().LogPath, os.ModePerm)
|
err = os.MkdirAll(logPath, os.ModePerm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.New("Cannot create dir " + log.GetLogger().LogPath)
|
return errors.New("Cannot create dir " + logPath)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ type IRealClient interface {
|
|||||||
SetConn(conn *network.NetConn)
|
SetConn(conn *network.NetConn)
|
||||||
Close(waitDone bool)
|
Close(waitDone bool)
|
||||||
|
|
||||||
AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error)
|
AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error)
|
||||||
Go(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
|
Go(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
|
||||||
RawGo(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
|
RawGo(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
|
||||||
IsConnected() bool
|
IsConnected() bool
|
||||||
@@ -211,7 +211,7 @@ func (client *Client) rawGo(nodeId string, w IWriter, timeout time.Duration, rpc
|
|||||||
return call
|
return call
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
|
func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
|
||||||
processorType, processor := GetProcessorType(args)
|
processorType, processor := GetProcessorType(args)
|
||||||
InParam, herr := processor.Marshal(args)
|
InParam, herr := processor.Marshal(args)
|
||||||
if herr != nil {
|
if herr != nil {
|
||||||
@@ -264,10 +264,7 @@ func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration,
|
|||||||
return emptyCancelRpc, err
|
return emptyCancelRpc, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if cancelable {
|
|
||||||
rpcCancel := RpcCancel{CallSeq: seq, Cli: client}
|
|
||||||
return rpcCancel.CancelRpc, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return emptyCancelRpc, nil
|
rpcCancel := RpcCancel{CallSeq: seq, Cli: client}
|
||||||
|
return rpcCancel.CancelRpc, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -90,7 +90,7 @@ func (lc *LClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa
|
|||||||
return pLocalRpcServer.selfNodeRpcHandlerGo(timeout, processor, lc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
|
return pLocalRpcServer.selfNodeRpcHandlerGo(timeout, processor, lc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}, cancelable bool) (CancelRpc, error) {
|
func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}) (CancelRpc, error) {
|
||||||
pLocalRpcServer := rpcHandler.GetRpcServer()()
|
pLocalRpcServer := rpcHandler.GetRpcServer()()
|
||||||
|
|
||||||
//判断是否是同一服务
|
//判断是否是同一服务
|
||||||
@@ -109,7 +109,7 @@ func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IR
|
|||||||
}
|
}
|
||||||
|
|
||||||
//其他的rpcHandler的处理器
|
//其他的rpcHandler的处理器
|
||||||
cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback, cancelable)
|
cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
|
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
|
||||||
}
|
}
|
||||||
@@ -121,9 +121,6 @@ func NewLClient(localNodeId string, callSet *CallSet) *Client {
|
|||||||
client := &Client{}
|
client := &Client{}
|
||||||
client.clientId = atomic.AddUint32(&clientSeq, 1)
|
client.clientId = atomic.AddUint32(&clientSeq, 1)
|
||||||
client.targetNodeId = localNodeId
|
client.targetNodeId = localNodeId
|
||||||
//client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
|
|
||||||
//client.callRpcTimeout = DefaultRpcTimeout
|
|
||||||
|
|
||||||
lClient := &LClient{}
|
lClient := &LClient{}
|
||||||
lClient.selfClient = client
|
lClient.selfClient = client
|
||||||
client.IRealClient = lClient
|
client.IRealClient = lClient
|
||||||
|
|||||||
@@ -127,7 +127,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
|
|||||||
return pCall
|
return pCall
|
||||||
}
|
}
|
||||||
|
|
||||||
func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error) {
|
func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error) {
|
||||||
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
|
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
|
||||||
if rpcHandler == nil {
|
if rpcHandler == nil {
|
||||||
err := errors.New("service method " + serviceMethod + " not config!")
|
err := errors.New("service method " + serviceMethod + " not config!")
|
||||||
|
|||||||
@@ -63,8 +63,8 @@ func (nc *NatsClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRp
|
|||||||
return nc.client.rawGo(nodeId, nc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
|
return nc.client.rawGo(nodeId, nc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
|
func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
|
||||||
cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable)
|
cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
|
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -62,8 +62,8 @@ func (rc *RClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa
|
|||||||
return rc.selfClient.rawGo(nodeId, rc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
|
return rc.selfClient.rawGo(nodeId, rc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
|
func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
|
||||||
cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable)
|
cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
|
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -164,13 +164,6 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
|
|||||||
//取出输入参数类型
|
//取出输入参数类型
|
||||||
var rpcMethodInfo RpcMethodInfo
|
var rpcMethodInfo RpcMethodInfo
|
||||||
typ := method.Type
|
typ := method.Type
|
||||||
if typ.NumOut() != 1 {
|
|
||||||
return fmt.Errorf("%s The number of returned arguments must be 1", method.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
if typ.Out(0).String() != "error" {
|
|
||||||
return fmt.Errorf("%s The return parameter must be of type error", method.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
if typ.NumIn() < 2 || typ.NumIn() > 4 {
|
if typ.NumIn() < 2 || typ.NumIn() > 4 {
|
||||||
return fmt.Errorf("%s Unsupported parameter format", method.Name)
|
return fmt.Errorf("%s Unsupported parameter format", method.Name)
|
||||||
@@ -183,6 +176,18 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
|
|||||||
rpcMethodInfo.hasResponder = true
|
rpcMethodInfo.hasResponder = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if rpcMethodInfo.hasResponder && typ.NumOut() > 0 {
|
||||||
|
return fmt.Errorf("%s should not have return parameters", method.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !rpcMethodInfo.hasResponder && typ.NumOut() != 1 {
|
||||||
|
return fmt.Errorf("%s The number of returned arguments must be 1", method.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !rpcMethodInfo.hasResponder && typ.Out(0).String() != "error" {
|
||||||
|
return fmt.Errorf("%s The return parameter must be of type error", method.Name)
|
||||||
|
}
|
||||||
|
|
||||||
for i := parIdx; i < typ.NumIn(); i++ {
|
for i := parIdx; i < typ.NumIn(); i++ {
|
||||||
if handler.isExportedOrBuiltinType(typ.In(i)) == false {
|
if handler.isExportedOrBuiltinType(typ.In(i)) == false {
|
||||||
return fmt.Errorf("%s Unsupported parameter types", method.Name)
|
return fmt.Errorf("%s Unsupported parameter types", method.Name)
|
||||||
@@ -307,9 +312,11 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
|||||||
|
|
||||||
requestHandle := request.requestHandle
|
requestHandle := request.requestHandle
|
||||||
returnValues := v.method.Func.Call(paramList)
|
returnValues := v.method.Func.Call(paramList)
|
||||||
errInter := returnValues[0].Interface()
|
if len(returnValues) > 0 {
|
||||||
if errInter != nil {
|
errInter := returnValues[0].Interface()
|
||||||
err = errInter.(error)
|
if errInter != nil {
|
||||||
|
err = errInter.(error)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if v.hasResponder == false && requestHandle != nil {
|
if v.hasResponder == false && requestHandle != nil {
|
||||||
@@ -526,8 +533,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration, nodeId string, se
|
|||||||
}
|
}
|
||||||
|
|
||||||
//2.rpcClient调用
|
//2.rpcClient调用
|
||||||
//如果调用本结点服务
|
return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply, )
|
||||||
return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply, false)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (handler *RpcHandler) GetName() string {
|
func (handler *RpcHandler) GetName() string {
|
||||||
|
|||||||
@@ -27,9 +27,6 @@ func (rn *RpcNats) NewNatsClient(targetNodeId string,localNodeId string,callSet
|
|||||||
|
|
||||||
client.clientId = atomic.AddUint32(&clientSeq, 1)
|
client.clientId = atomic.AddUint32(&clientSeq, 1)
|
||||||
client.targetNodeId = targetNodeId
|
client.targetNodeId = targetNodeId
|
||||||
//client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
|
|
||||||
//client.callRpcTimeout = DefaultRpcTimeout
|
|
||||||
|
|
||||||
natsClient := &rn.NatsClient
|
natsClient := &rn.NatsClient
|
||||||
natsClient.localNodeId = localNodeId
|
natsClient.localNodeId = localNodeId
|
||||||
natsClient.client = &client
|
natsClient.client = &client
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ type IServer interface {
|
|||||||
|
|
||||||
selfNodeRpcHandlerGo(timeout time.Duration, processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call
|
selfNodeRpcHandlerGo(timeout time.Duration, processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call
|
||||||
myselfRpcHandlerGo(client *Client, handlerName string, serviceMethod string, args interface{}, callBack reflect.Value, reply interface{}) error
|
myselfRpcHandlerGo(client *Client, handlerName string, serviceMethod string, args interface{}, callBack reflect.Value, reply interface{}) error
|
||||||
selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error)
|
selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type writeResponse func(processor IRpcProcessor, connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError)
|
type writeResponse func(processor IRpcProcessor, connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError)
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/duanhf2012/origin/v2/log"
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
rpcHandle "github.com/duanhf2012/origin/v2/rpc"
|
rpcHandle "github.com/duanhf2012/origin/v2/rpc"
|
||||||
"github.com/duanhf2012/origin/v2/util/timer"
|
"github.com/duanhf2012/origin/v2/util/timer"
|
||||||
|
"slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
const InitModuleId = 1e9
|
const InitModuleId = 1e9
|
||||||
@@ -46,7 +47,7 @@ type Module struct {
|
|||||||
moduleName string //模块名称
|
moduleName string //模块名称
|
||||||
parent IModule //父亲
|
parent IModule //父亲
|
||||||
self IModule //自己
|
self IModule //自己
|
||||||
child map[uint32]IModule //孩子们
|
child []IModule //孩子们
|
||||||
mapActiveTimer map[timer.ITimer]struct{}
|
mapActiveTimer map[timer.ITimer]struct{}
|
||||||
mapActiveIdTimer map[uint64]timer.ITimer
|
mapActiveIdTimer map[uint64]timer.ITimer
|
||||||
dispatcher *timer.Dispatcher //timer
|
dispatcher *timer.Dispatcher //timer
|
||||||
@@ -93,10 +94,7 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
|
|||||||
pAddModule.moduleId = m.NewModuleId()
|
pAddModule.moduleId = m.NewModuleId()
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.child == nil {
|
_,ok := m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()]
|
||||||
m.child = map[uint32]IModule{}
|
|
||||||
}
|
|
||||||
_, ok := m.child[module.GetModuleId()]
|
|
||||||
if ok == true {
|
if ok == true {
|
||||||
return 0, fmt.Errorf("exists module id %d", module.GetModuleId())
|
return 0, fmt.Errorf("exists module id %d", module.GetModuleId())
|
||||||
}
|
}
|
||||||
@@ -109,29 +107,33 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
|
|||||||
pAddModule.eventHandler = event.NewEventHandler()
|
pAddModule.eventHandler = event.NewEventHandler()
|
||||||
pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor())
|
pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor())
|
||||||
pAddModule.IConcurrent = m.IConcurrent
|
pAddModule.IConcurrent = m.IConcurrent
|
||||||
|
|
||||||
|
m.child = append(m.child,module)
|
||||||
|
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
|
||||||
|
|
||||||
err := module.OnInit()
|
err := module.OnInit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
delete(m.ancestor.getBaseModule().(*Module).descendants, module.GetModuleId())
|
||||||
|
m.child = m.child[:len(m.child)-1]
|
||||||
|
log.Error("module OnInit error",log.String("ModuleName",module.GetModuleName()),log.ErrorField("err",err))
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
m.child[module.GetModuleId()] = module
|
|
||||||
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
|
|
||||||
|
|
||||||
log.Debug("Add module " + module.GetModuleName() + " completed")
|
log.Debug("Add module " + module.GetModuleName() + " completed")
|
||||||
return module.GetModuleId(), nil
|
return module.GetModuleId(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Module) ReleaseModule(moduleId uint32) {
|
func (m *Module) ReleaseModule(moduleId uint32) {
|
||||||
pModule := m.GetModule(moduleId).getBaseModule().(*Module)
|
pModule := m.GetModule(moduleId).getBaseModule().(*Module)
|
||||||
|
pModule.self.OnRelease()
|
||||||
|
log.Debug("Release module " + pModule.GetModuleName())
|
||||||
|
|
||||||
//释放子孙
|
for i:=len(pModule.child)-1; i>=0; i-- {
|
||||||
for id := range pModule.child {
|
m.ReleaseModule(pModule.child[i].GetModuleId())
|
||||||
m.ReleaseModule(id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pModule.self.OnRelease()
|
|
||||||
pModule.GetEventHandler().Destroy()
|
pModule.GetEventHandler().Destroy()
|
||||||
log.Debug("Release module " + pModule.GetModuleName())
|
|
||||||
for pTimer := range pModule.mapActiveTimer {
|
for pTimer := range pModule.mapActiveTimer {
|
||||||
pTimer.Cancel()
|
pTimer.Cancel()
|
||||||
}
|
}
|
||||||
@@ -140,7 +142,10 @@ func (m *Module) ReleaseModule(moduleId uint32) {
|
|||||||
t.Cancel()
|
t.Cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(m.child, moduleId)
|
m.child = slices.DeleteFunc(m.child, func(module IModule) bool {
|
||||||
|
return module.GetModuleId() == moduleId
|
||||||
|
})
|
||||||
|
|
||||||
delete(m.ancestor.getBaseModule().(*Module).descendants, moduleId)
|
delete(m.ancestor.getBaseModule().(*Module).descendants, moduleId)
|
||||||
|
|
||||||
//清理被删除的Module
|
//清理被删除的Module
|
||||||
|
|||||||
@@ -266,8 +266,10 @@ func (s *Service) Release() {
|
|||||||
|
|
||||||
if atomic.AddInt32(&s.isRelease, -1) == -1 {
|
if atomic.AddInt32(&s.isRelease, -1) == -1 {
|
||||||
s.self.OnRelease()
|
s.self.OnRelease()
|
||||||
|
for i:=len(s.child)-1; i>=0; i-- {
|
||||||
|
s.ReleaseModule(s.child[i].GetModuleId())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) OnRelease() {
|
func (s *Service) OnRelease() {
|
||||||
@@ -432,6 +434,7 @@ func (s *Service) SetEventChannelNum(num int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deprecated: replace it with the OpenConcurrent function
|
||||||
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
|
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
|
||||||
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程
|
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程
|
||||||
if s.startStatus == true || s.profiler != nil {
|
if s.startStatus == true || s.profiler != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user