Compare commits

...

23 Commits

Author SHA1 Message Date
boyce
6c44ba180c 优化kafkamoudule日志 2025-05-21 21:38:36 +08:00
boyce
ecfd42bdec 优化消息队列日志 2025-05-21 21:34:32 +08:00
boyce
01d3b3e535 优化RankService日志 2025-05-21 21:22:24 +08:00
boyce
550d65a354 优化mysql模块日志 2025-05-21 21:19:03 +08:00
boyce
15580ffce9 新增wss证书配置支持 2025-04-21 21:23:34 +08:00
boyce
bd467a219b 废弃掉HttpService、TcpService、WSService 2025-03-28 10:33:51 +08:00
boyce
af15615345 优化日志生成路径 2025-03-14 18:03:01 +08:00
boyce
50dd80b082 整理代码 2025-03-10 11:35:19 +08:00
boyce
a6487dd41e 将默认日志改为rotatelogs 2025-01-25 00:14:18 +08:00
boyce
d5299294d8 优化日志,新增rotatelogs库支持 2025-01-25 00:04:31 +08:00
boyce
4d36e525a5 优化配置读取,去消默认cluster目录 2025-01-16 13:45:06 +08:00
duanhf2012
3a4350769c 新增etcd认证配置 2025-01-08 18:11:20 +08:00
duanhf2012
d4966ea129 优化ws模块 2024-12-17 14:46:00 +08:00
duanhf2012
3b10eeb792 优化日志 2024-12-16 18:00:26 +08:00
duanhf2012
e3275e9f2a 优化模块释放顺序 2024-12-11 18:31:37 +08:00
duanhf2012
16745b34f0 优化日志 2024-12-11 17:49:59 +08:00
duanhf2012
f34dc7d53f 优化日志自定义Writer 2024-12-11 17:24:06 +08:00
duanhf2012
0a09dc2fee 优化日志 2024-12-11 17:14:29 +08:00
duanhf2012
f01a93c446 优化日志 2024-12-11 17:03:21 +08:00
duanhf2012
4d2ab4ee4f 优化代码 2024-12-11 16:44:09 +08:00
duanhf2012
ffcc5a3489 1.优化服务配置检查
2.废弃SetGoRoutineNum接口
3.释放Module优化
2024-12-06 16:05:25 +08:00
duanhf2012
cf6ca0483b Merge branch 'v2' of https://github.com/duanhf2012/origin into v2 2024-12-05 10:19:24 +08:00
duanhf2012
97a21e6f71 新增Skip接口 2024-12-05 10:19:15 +08:00
24 changed files with 371 additions and 206 deletions

View File

@@ -40,8 +40,6 @@ type NodeInfo struct {
DiscoveryService []DiscoveryService //筛选发现的服务,如果不配置,不进行筛选 DiscoveryService []DiscoveryService //筛选发现的服务,如果不配置,不进行筛选
status NodeStatus status NodeStatus
Retire bool Retire bool
NetworkName string
} }
type NodeRpcInfo struct { type NodeRpcInfo struct {

View File

@@ -3,24 +3,23 @@ package cluster
import "github.com/duanhf2012/origin/v2/rpc" import "github.com/duanhf2012/origin/v2/rpc"
type ConfigDiscovery struct { type ConfigDiscovery struct {
funDelNode FunDelNode funDelNode FunDelNode
funSetNode FunSetNode funSetNode FunSetNode
localNodeId string localNodeId string
} }
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode FunDelNode,funSetNode FunSetNode) error{
discovery.localNodeId = localNodeId discovery.localNodeId = localNodeId
discovery.funDelNode = funDelNode discovery.funDelNode = funDelNode
discovery.funSetNode = funSetNode discovery.funSetNode = funSetNode
//解析本地其他服务配置 //解析本地其他服务配置
_,nodeInfoList,_,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull) _, nodeInfoList, _, err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
if err != nil { if err != nil {
return err return err
} }
for _,nodeInfo := range nodeInfoList { for _, nodeInfo := range nodeInfoList {
if nodeInfo.NodeId == localNodeId { if nodeInfo.NodeId == localNodeId {
continue continue
} }
@@ -30,5 +29,3 @@ func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode Fu
return nil return nil
} }

View File

@@ -5,17 +5,17 @@ import (
"github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/service"
) )
func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.discoveryInfo.getDiscoveryType() == OriginType { //origin类型服务发现 if cls.discoveryInfo.getDiscoveryType() == OriginType { //origin类型服务发现
return cls.setupOriginDiscovery(localNodeId,setupServiceFun) return cls.setupOriginDiscovery(localNodeId, setupServiceFun)
}else if cls.discoveryInfo.getDiscoveryType() == EtcdType{//etcd类型服务发现 } else if cls.discoveryInfo.getDiscoveryType() == EtcdType { //etcd类型服务发现
return cls.setupEtcdDiscovery(localNodeId,setupServiceFun) return cls.setupEtcdDiscovery(localNodeId, setupServiceFun)
} }
return cls.setupConfigDiscovery(localNodeId,setupServiceFun) return cls.setupConfigDiscovery(localNodeId, setupServiceFun)
} }
func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil { if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup") return errors.New("service discovery has been setup")
} }
@@ -27,6 +27,7 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
} }
cls.serviceDiscovery = getOriginDiscovery() cls.serviceDiscovery = getOriginDiscovery()
//2.如果为动态服务发现安装本地发现服务 //2.如果为动态服务发现安装本地发现服务
if localMaster == true { if localMaster == true {
setupServiceFun(&masterService) setupServiceFun(&masterService)
@@ -36,11 +37,10 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
setupServiceFun(&clientService) setupServiceFun(&clientService)
cls.AddDiscoveryService(OriginDiscoveryClientName, true) cls.AddDiscoveryService(OriginDiscoveryClientName, true)
return nil return nil
} }
func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil { if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup") return errors.New("service discovery has been setup")
} }
@@ -48,12 +48,12 @@ func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun Setup
//setup etcd service //setup etcd service
cls.serviceDiscovery = getEtcdDiscovery() cls.serviceDiscovery = getEtcdDiscovery()
setupServiceFun(cls.serviceDiscovery.(service.IService)) setupServiceFun(cls.serviceDiscovery.(service.IService))
cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(),false) cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(), false)
return nil return nil
} }
func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil { if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup") return errors.New("service discovery has been setup")
} }

View File

@@ -1,6 +1,11 @@
package cluster package cluster
import ( import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"github.com/duanhf2012/origin/v2/event" "github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/rpc" "github.com/duanhf2012/origin/v2/rpc"
@@ -9,14 +14,11 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"time" "os"
"context"
"errors"
"fmt"
"go.uber.org/zap"
"path" "path"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time"
) )
const originDir = "/origin" const originDir = "/origin"
@@ -40,8 +42,13 @@ type EtcdDiscoveryService struct {
mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId] mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId]
} }
var etcdDiscovery *EtcdDiscoveryService
func getEtcdDiscovery() IServiceDiscovery { func getEtcdDiscovery() IServiceDiscovery {
etcdDiscovery := &EtcdDiscoveryService{} if etcdDiscovery == nil {
etcdDiscovery = &EtcdDiscoveryService{}
}
return etcdDiscovery return etcdDiscovery
} }
@@ -87,15 +94,43 @@ func (ed *EtcdDiscoveryService) OnInit() error {
} }
for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ { for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
client, cerr := clientv3.New(clientv3.Config{ var client *clientv3.Client
var tlsConfig *tls.Config
if etcdDiscoveryCfg.EtcdList[i].Cert != "" {
// load cert
cert, cErr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
if cErr != nil {
log.Error("load cert error", log.ErrorField("err", cErr))
return cErr
}
// load root ca
caData, cErr := os.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
if cErr != nil {
log.Error("load root ca error", log.ErrorField("err", cErr))
return cErr
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caData)
tlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: pool,
}
}
client, err = clientv3.New(clientv3.Config{
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints, Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond, DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
Logger: zap.NewNop(), Username: etcdDiscoveryCfg.EtcdList[i].UserName,
Password: etcdDiscoveryCfg.EtcdList[i].Password,
Logger: log.GetLogger().Logger,
TLS: tlsConfig,
}) })
if cerr != nil { if err != nil {
log.Error("etcd discovery init fail", log.ErrorField("err", cerr)) log.Error("etcd discovery init fail", log.ErrorField("err", err))
return cerr return err
} }
ctx, _ := context.WithTimeout(context.Background(), time.Second*3) ctx, _ := context.WithTimeout(context.Background(), time.Second*3)

View File

@@ -7,6 +7,7 @@ import (
"github.com/duanhf2012/origin/v2/rpc" "github.com/duanhf2012/origin/v2/rpc"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"io/fs"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@@ -15,9 +16,15 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary var json = jsoniter.ConfigCompatibleWithStandardLibrary
type EtcdList struct { type EtcdList struct {
NetworkName []string NetworkName []string
Endpoints []string Endpoints []string
UserName string
Password string
Cert string
CertKey string
Ca string
} }
type EtcdDiscovery struct { type EtcdDiscovery struct {
@@ -64,12 +71,8 @@ type NodeInfoList struct {
NodeList []NodeInfo NodeList []NodeInfo
} }
func validConfigFile(f os.DirEntry) bool { func validConfigFile(f string) bool {
if f.IsDir() == true || (filepath.Ext(f.Name()) != ".json" && filepath.Ext(f.Name()) != ".yml" && filepath.Ext(f.Name()) != ".yaml") { return strings.HasSuffix(f, ".json")|| strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml")
return false
}
return true
} }
func yamlToJson(data []byte, v interface{}) ([]byte, error) { func yamlToJson(data []byte, v interface{}) ([]byte, error) {
@@ -271,32 +274,33 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
var discoveryInfo DiscoveryInfo var discoveryInfo DiscoveryInfo
var rpcMode RpcMode var rpcMode RpcMode
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
fileInfoList, err := os.ReadDir(clusterCfgPath)
if err != nil {
return discoveryInfo, nil, rpcMode, fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
}
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
for _, f := range fileInfoList { err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error {
if !validConfigFile(f) { if info.IsDir() {
continue return nil
} }
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name() if err != nil {
fileNodeInfoList, rErr := cls.ReadClusterConfig(filePath) return err
}
if !validConfigFile(info.Name()) {
return nil
}
fileNodeInfoList, rErr := cls.ReadClusterConfig(path)
if rErr != nil { if rErr != nil {
return discoveryInfo, nil, rpcMode, fmt.Errorf("read file path %s is error:%+v", filePath, rErr) return fmt.Errorf("read file path %s is error:%+v", path, rErr)
} }
err = cls.SetRpcMode(&fileNodeInfoList.RpcMode, &rpcMode) err = cls.SetRpcMode(&fileNodeInfoList.RpcMode, &rpcMode)
if err != nil { if err != nil {
return discoveryInfo, nil, rpcMode, err return err
} }
err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery) err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery)
if err != nil { if err != nil {
return discoveryInfo, nil, rpcMode, err return err
} }
for _, nodeInfo := range fileNodeInfoList.NodeList { for _, nodeInfo := range fileNodeInfoList.NodeList {
@@ -304,6 +308,12 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
nodeInfoList = append(nodeInfoList, nodeInfo) nodeInfoList = append(nodeInfoList, nodeInfo)
} }
} }
return nil
})
if err != nil {
return discoveryInfo, nil, rpcMode, err
} }
if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) { if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
@@ -325,32 +335,32 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
} }
func (cls *Cluster) readLocalService(localNodeId string) error { func (cls *Cluster) readLocalService(localNodeId string) error {
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
fileInfoList, err := os.ReadDir(clusterCfgPath)
if err != nil {
return fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
}
var globalCfg interface{} var globalCfg interface{}
publicService := map[string]interface{}{} publicService := map[string]interface{}{}
nodeService := map[string]interface{}{} nodeService := map[string]interface{}{}
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
for _, f := range fileInfoList { err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error{
if !validConfigFile(f) { if info.IsDir() {
continue return nil
} }
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name()
currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(filePath)
if err != nil { if err != nil {
continue return err
}
if !validConfigFile(info.Name()) {
return nil
}
currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(path)
if err != nil {
return err
} }
if currGlobalCfg != nil { if currGlobalCfg != nil {
//不允许重复的配置global配置 //不允许重复的配置global配置
if globalCfg != nil { if globalCfg != nil {
return fmt.Errorf("[Global] does not allow repeated configuration in %s", f.Name()) return fmt.Errorf("[Global] does not allow repeated configuration in %s", info.Name())
} }
globalCfg = currGlobalCfg globalCfg = currGlobalCfg
} }
@@ -366,7 +376,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
pubCfg, ok := serviceConfig[s] pubCfg, ok := serviceConfig[s]
if ok == true { if ok == true {
if _, publicOk := publicService[s]; publicOk == true { if _, publicOk := publicService[s]; publicOk == true {
return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, f.Name()) return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, info.Name())
} }
publicService[s] = pubCfg publicService[s] = pubCfg
} }
@@ -382,12 +392,17 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
} }
if _, nodeOK := nodeService[s]; nodeOK == true { if _, nodeOK := nodeService[s]; nodeOK == true {
return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, f.Name()) return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, info.Name())
} }
nodeService[s] = nodeCfg nodeService[s] = nodeCfg
break break
} }
} }
return nil
})
if err != nil {
return err
} }
//组合所有的配置 //组合所有的配置
@@ -417,13 +432,12 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
return nil return nil
} }
func (cls *Cluster) parseLocalCfg() { func (cls *Cluster) parseLocalCfg() error{
rpcInfo := NodeRpcInfo{} rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = cls.localNodeInfo rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet) rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet)
cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo
for _, serviceName := range cls.localNodeInfo.ServiceList { for _, serviceName := range cls.localNodeInfo.ServiceList {
splitServiceName := strings.Split(serviceName, ":") splitServiceName := strings.Split(serviceName, ":")
if len(splitServiceName) == 2 { if len(splitServiceName) == 2 {
@@ -440,8 +454,13 @@ func (cls *Cluster) parseLocalCfg() {
cls.mapServiceNode[serviceName] = make(map[string]struct{}) cls.mapServiceNode[serviceName] = make(map[string]struct{})
} }
if _,ok:=cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId];ok {
return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId)
}
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{} cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
} }
return nil
} }
func (cls *Cluster) IsNatsMode() bool { func (cls *Cluster) IsNatsMode() bool {
@@ -474,8 +493,7 @@ func (cls *Cluster) InitCfg(localNodeId string) error {
} }
//本地配置服务加到全局map信息中 //本地配置服务加到全局map信息中
cls.parseLocalCfg() return cls.parseLocalCfg()
return nil
} }
func (cls *Cluster) IsConfigService(serviceName string) bool { func (cls *Cluster) IsConfigService(serviceName string) bool {

View File

@@ -1,36 +1,53 @@
package log package log
import ( import (
"github.com/duanhf2012/rotatelogs"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2" "gopkg.in/natefinch/lumberjack.v2"
"os" "os"
"path/filepath"
"strings"
"time" "time"
) )
var isSetLogger bool var isSetLogger bool
var gLogger = NewDefaultLogger() var gLogger = NewDefaultLogger()
var LogLevel zapcore.Level
var MaxSize int
var LogPath string
var OpenConsole *bool
var LogChanLen int
type Logger struct { type Logger struct {
*zap.Logger *zap.Logger
stack bool stack bool
OpenConsole *bool FileName string
LogPath string Skip int
FileName string Encoder zapcore.Encoder
LogLevel zapcore.Level SugaredLogger *zap.SugaredLogger
Encoder zapcore.Encoder CoreList []zapcore.Core
LogConfig *lumberjack.Logger WriteSyncerFun []func() zapcore.WriteSyncer
sugaredLogger *zap.SugaredLogger
} }
// 设置Logger
func SetLogger(logger *Logger) { func SetLogger(logger *Logger) {
if logger != nil && isSetLogger == false { if logger != nil {
gLogger = logger gLogger = logger
isSetLogger = true isSetLogger = true
} }
} }
// 设置ZapLogger
func SetZapLogger(zapLogger *zap.Logger) {
if zapLogger != nil {
gLogger = &Logger{}
gLogger.Logger = zapLogger
isSetLogger = true
}
}
func GetLogger() *Logger { func GetLogger() *Logger {
return gLogger return gLogger
} }
@@ -39,10 +56,14 @@ func (logger *Logger) SetEncoder(encoder zapcore.Encoder) {
logger.Encoder = encoder logger.Encoder = encoder
} }
func (logger *Logger) SetSkip(skip int) {
logger.Skip = skip
}
func GetJsonEncoder() zapcore.Encoder { func GetJsonEncoder() zapcore.Encoder {
encoderConfig := zap.NewProductionEncoderConfig() encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) { encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(t.Format("2006-01-02 15:04:05.000")) enc.AppendString(t.Format("2006-01-02 15:04:05.000"))
} }
@@ -61,51 +82,105 @@ func GetTxtEncoder() zapcore.Encoder {
return zapcore.NewConsoleEncoder(encoderConfig) return zapcore.NewConsoleEncoder(encoderConfig)
} }
func getLogConfig() *lumberjack.Logger { func (logger *Logger) getLogConfig() *lumberjack.Logger {
return &lumberjack.Logger{ return &lumberjack.Logger{
Filename: "", Filename: filepath.Join(LogPath, logger.FileName),
MaxSize: 2048, MaxSize: MaxSize,
MaxBackups: 0, MaxBackups: 0,
MaxAge: 0, MaxAge: 0,
Compress: false, Compress: false,
LocalTime: true,
} }
} }
func NewDefaultLogger() *Logger { func NewDefaultLogger() *Logger {
logger := Logger{} logger := Logger{}
logger.Encoder = GetJsonEncoder() logger.Encoder = GetJsonEncoder()
logger.LogConfig = getLogConfig() core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), zap.InfoLevel)
logger.LogConfig.LocalTime = true logger.Logger = zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1))
logger.Init()
return &logger return &logger
} }
func (logger *Logger) SetLogLevel(level zapcore.Level) { func (logger *Logger) SetSyncers(syncers ...func() zapcore.WriteSyncer) {
logger.LogLevel = level logger.WriteSyncerFun = syncers
}
func (logger *Logger) AppendSyncerFun(syncerFun func() zapcore.WriteSyncer) {
logger.WriteSyncerFun = append(logger.WriteSyncerFun, syncerFun)
}
func SetLogLevel(level zapcore.Level) {
LogLevel = level
} }
func (logger *Logger) Enabled(zapcore.Level) bool { func (logger *Logger) Enabled(zapcore.Level) bool {
return logger.stack return logger.stack
} }
func (logger *Logger) Init() { func (logger *Logger) NewLumberjackWriter() zapcore.WriteSyncer {
var coreList []zapcore.Core return zapcore.AddSync(
&lumberjack.Logger{
Filename: filepath.Join(LogPath, logger.FileName),
MaxSize: MaxSize,
MaxBackups: 0,
MaxAge: 0,
Compress: false,
LocalTime: true,
})
}
if logger.OpenConsole == nil || *logger.OpenConsole { func (logger *Logger) NewRotatelogsWriter() zapcore.WriteSyncer {
core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), logger.LogLevel) var options []rotatelogs.Option
if MaxSize > 0 {
options = append(options, rotatelogs.WithRotateMaxSize(int64(MaxSize)))
}
if LogChanLen > 0 {
options = append(options, rotatelogs.WithChannelLen(LogChanLen))
}
options = append(options, rotatelogs.WithRotationTime(time.Hour*24))
fileName := strings.TrimRight(logger.FileName, filepath.Ext(logger.FileName))
rotateLogs, err := rotatelogs.NewRotateLogs(LogPath, "20060102/"+fileName+"_20060102_150405", options...)
if err != nil {
panic(err)
}
return zapcore.AddSync(rotateLogs)
}
func (logger *Logger) Init() {
if isSetLogger {
return
}
var syncerList []zapcore.WriteSyncer
if logger.WriteSyncerFun == nil {
syncerList = append(syncerList, logger.NewRotatelogsWriter())
} else {
for _, syncer := range logger.WriteSyncerFun {
syncerList = append(syncerList, syncer())
}
}
var coreList []zapcore.Core
if OpenConsole == nil || *OpenConsole {
syncerList = append(syncerList, zapcore.AddSync(os.Stdout))
}
for _, writer := range syncerList {
core := zapcore.NewCore(logger.Encoder, writer, LogLevel)
coreList = append(coreList, core) coreList = append(coreList, core)
} }
if logger.LogPath != "" { if logger.CoreList != nil {
writeSyncer := zapcore.AddSync(logger.LogConfig) coreList = append(coreList, logger.CoreList...)
core := zapcore.NewCore(logger.Encoder, writeSyncer, logger.LogLevel)
coreList = append(coreList, core)
} }
core := zapcore.NewTee(coreList...) core := zapcore.NewTee(coreList...)
logger.Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(logger), zap.AddCallerSkip(1)) logger.Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(logger), zap.AddCallerSkip(1+logger.Skip))
logger.sugaredLogger = logger.Logger.Sugar() logger.SugaredLogger = logger.Logger.Sugar()
} }
func (logger *Logger) Debug(msg string, fields ...zap.Field) { func (logger *Logger) Debug(msg string, fields ...zap.Field) {
@@ -165,84 +240,84 @@ func Fatal(msg string, fields ...zap.Field) {
} }
func Debugf(template string, args ...any) { func Debugf(template string, args ...any) {
gLogger.sugaredLogger.Debugf(template, args...) gLogger.SugaredLogger.Debugf(template, args...)
} }
func Infof(template string, args ...any) { func Infof(template string, args ...any) {
gLogger.sugaredLogger.Infof(template, args...) gLogger.SugaredLogger.Infof(template, args...)
} }
func Warnf(template string, args ...any) { func Warnf(template string, args ...any) {
gLogger.sugaredLogger.Warnf(template, args...) gLogger.SugaredLogger.Warnf(template, args...)
} }
func Errorf(template string, args ...any) { func Errorf(template string, args ...any) {
gLogger.sugaredLogger.Errorf(template, args...) gLogger.SugaredLogger.Errorf(template, args...)
} }
func StackErrorf(template string, args ...any) { func StackErrorf(template string, args ...any) {
gLogger.stack = true gLogger.stack = true
gLogger.sugaredLogger.Errorf(template, args...) gLogger.SugaredLogger.Errorf(template, args...)
gLogger.stack = false gLogger.stack = false
} }
func Fatalf(template string, args ...any) { func Fatalf(template string, args ...any) {
gLogger.sugaredLogger.Fatalf(template, args...) gLogger.SugaredLogger.Fatalf(template, args...)
} }
func (logger *Logger) SDebug(args ...interface{}) { func (logger *Logger) SDebug(args ...interface{}) {
logger.sugaredLogger.Debugln(args...) logger.SugaredLogger.Debugln(args...)
} }
func (logger *Logger) SInfo(args ...interface{}) { func (logger *Logger) SInfo(args ...interface{}) {
logger.sugaredLogger.Infoln(args...) logger.SugaredLogger.Infoln(args...)
} }
func (logger *Logger) SWarn(args ...interface{}) { func (logger *Logger) SWarn(args ...interface{}) {
logger.sugaredLogger.Warnln(args...) logger.SugaredLogger.Warnln(args...)
} }
func (logger *Logger) SError(args ...interface{}) { func (logger *Logger) SError(args ...interface{}) {
logger.sugaredLogger.Errorln(args...) logger.SugaredLogger.Errorln(args...)
} }
func (logger *Logger) SStackError(args ...interface{}) { func (logger *Logger) SStackError(args ...interface{}) {
gLogger.stack = true gLogger.stack = true
logger.sugaredLogger.Errorln(args...) logger.SugaredLogger.Errorln(args...)
gLogger.stack = false gLogger.stack = false
} }
func (logger *Logger) SFatal(args ...interface{}) { func (logger *Logger) SFatal(args ...interface{}) {
gLogger.stack = true gLogger.stack = true
logger.sugaredLogger.Fatalln(args...) logger.SugaredLogger.Fatalln(args...)
gLogger.stack = false gLogger.stack = false
} }
func SDebug(args ...interface{}) { func SDebug(args ...interface{}) {
gLogger.sugaredLogger.Debugln(args...) gLogger.SugaredLogger.Debugln(args...)
} }
func SInfo(args ...interface{}) { func SInfo(args ...interface{}) {
gLogger.sugaredLogger.Infoln(args...) gLogger.SugaredLogger.Infoln(args...)
} }
func SWarn(args ...interface{}) { func SWarn(args ...interface{}) {
gLogger.sugaredLogger.Warnln(args...) gLogger.SugaredLogger.Warnln(args...)
} }
func SError(args ...interface{}) { func SError(args ...interface{}) {
gLogger.sugaredLogger.Errorln(args...) gLogger.SugaredLogger.Errorln(args...)
} }
func SStackError(args ...interface{}) { func SStackError(args ...interface{}) {
gLogger.stack = true gLogger.stack = true
gLogger.sugaredLogger.Errorln(args...) gLogger.SugaredLogger.Errorln(args...)
gLogger.stack = false gLogger.stack = false
} }
func SFatal(args ...interface{}) { func SFatal(args ...interface{}) {
gLogger.stack = true gLogger.stack = true
gLogger.sugaredLogger.Fatalln(args...) gLogger.SugaredLogger.Fatalln(args...)
gLogger.stack = false gLogger.stack = false
} }

View File

@@ -73,9 +73,22 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
handler.conns[conn] = struct{}{} handler.conns[conn] = struct{}{}
handler.mutexConns.Unlock() handler.mutexConns.Unlock()
c,ok:=conn.NetConn().(*net.TCPConn)
if !ok {
tlsConn,ok := conn.NetConn().(*tls.Conn)
if !ok {
log.Error("conn error")
return
}
c,ok = tlsConn.NetConn().(*net.TCPConn)
if !ok {
log.Error("conn error")
return
}
}
conn.UnderlyingConn().(*net.TCPConn).SetLinger(0) c.SetLinger(0)
conn.UnderlyingConn().(*net.TCPConn).SetNoDelay(true) c.SetNoDelay(true)
wsConn := newWSConn(conn, r.Header, handler.pendingWriteNum, handler.maxMsgLen, handler.messageType) wsConn := newWSConn(conn, r.Header, handler.pendingWriteNum, handler.maxMsgLen, handler.messageType)
agent := handler.newAgent(wsConn) agent := handler.newAgent(wsConn)
agent.Run() agent.Run()

View File

@@ -17,7 +17,6 @@ import (
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
@@ -59,6 +58,7 @@ func init() {
console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|info|warn|error|stackerror|fatal> Set loglevel.", setLevel) console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|info|warn|error|stackerror|fatal> Set loglevel.", setLevel)
console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath) console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath)
console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize) console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize)
console.RegisterCommandInt("logchanlen", 0, "<-logchanlen len> Set log channel len.", setLogChanLen)
console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof) console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof)
} }
@@ -220,7 +220,7 @@ func initNode(id string) {
func initLog() error { func initLog() error {
logger := log.GetLogger() logger := log.GetLogger()
if logger.LogPath == "" { if log.LogPath == "" {
err := setLogPath("./log") err := setLogPath("./log")
if err != nil { if err != nil {
return err return err
@@ -230,8 +230,6 @@ func initLog() error {
localNodeInfo := cluster.GetCluster().GetLocalNodeInfo() localNodeInfo := cluster.GetCluster().GetLocalNodeInfo()
fileName := fmt.Sprintf("%s.log", localNodeInfo.NodeId) fileName := fmt.Sprintf("%s.log", localNodeInfo.NodeId)
logger.FileName = fileName logger.FileName = fileName
filepath.Join()
logger.LogConfig.Filename = filepath.Join(logger.LogPath, logger.FileName)
logger.Init() logger.Init()
return nil return nil
@@ -441,10 +439,10 @@ func openConsole(args interface{}) error {
strOpen := strings.ToLower(strings.TrimSpace(args.(string))) strOpen := strings.ToLower(strings.TrimSpace(args.(string)))
if strOpen == "false" { if strOpen == "false" {
bOpenConsole := false bOpenConsole := false
log.GetLogger().OpenConsole = &bOpenConsole log.OpenConsole = &bOpenConsole
} else if strOpen == "true" { } else if strOpen == "true" {
bOpenConsole := true bOpenConsole := true
log.GetLogger().OpenConsole = &bOpenConsole log.OpenConsole = &bOpenConsole
} else { } else {
return errors.New("parameter console error") return errors.New("parameter console error")
} }
@@ -459,17 +457,17 @@ func setLevel(args interface{}) error {
strlogLevel := strings.TrimSpace(args.(string)) strlogLevel := strings.TrimSpace(args.(string))
switch strlogLevel { switch strlogLevel {
case "debug": case "debug":
log.GetLogger().LogLevel = zapcore.DebugLevel log.LogLevel = zapcore.DebugLevel
case "info": case "info":
log.GetLogger().LogLevel = zapcore.InfoLevel log.LogLevel = zapcore.InfoLevel
case "warn": case "warn":
log.GetLogger().LogLevel = zapcore.WarnLevel log.LogLevel = zapcore.WarnLevel
case "error": case "error":
log.GetLogger().LogLevel = zapcore.ErrorLevel log.LogLevel = zapcore.ErrorLevel
case "stackerror": case "stackerror":
log.GetLogger().LogLevel = zapcore.ErrorLevel log.LogLevel = zapcore.ErrorLevel
case "fatal": case "fatal":
log.GetLogger().LogLevel = zapcore.FatalLevel log.LogLevel = zapcore.FatalLevel
default: default:
return errors.New("unknown level: " + strlogLevel) return errors.New("unknown level: " + strlogLevel)
} }
@@ -481,19 +479,15 @@ func setLogPath(args interface{}) error {
return nil return nil
} }
logPath := strings.TrimSpace(args.(string)) logPath := strings.TrimSpace(args.(string))
dir, err := os.Stat(logPath) _, err := os.Stat(logPath)
if err == nil && dir.IsDir() == false {
return errors.New("Not found dir " + logPath)
}
if err != nil { if err != nil {
err = os.Mkdir(log.GetLogger().LogPath, os.ModePerm) err = os.MkdirAll(logPath, os.ModePerm)
if err != nil { if err != nil {
return errors.New("Cannot create dir " + log.GetLogger().LogPath) return errors.New("Cannot create dir " + logPath)
} }
} }
log.GetLogger().LogPath = logPath log.LogPath = logPath
return nil return nil
} }
@@ -506,7 +500,20 @@ func setLogSize(args interface{}) error {
return nil return nil
} }
log.GetLogger().LogConfig.MaxSize = logSize log.MaxSize = logSize
return nil return nil
} }
func setLogChanLen(args interface{}) error {
logChanLen, ok := args.(int)
if ok == false {
return errors.New("param logsize is error")
}
if logChanLen == 0 {
return nil
}
log.LogChanLen = logChanLen
return nil
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
rpcHandle "github.com/duanhf2012/origin/v2/rpc" rpcHandle "github.com/duanhf2012/origin/v2/rpc"
"github.com/duanhf2012/origin/v2/util/timer" "github.com/duanhf2012/origin/v2/util/timer"
"slices"
) )
const InitModuleId = 1e9 const InitModuleId = 1e9
@@ -46,7 +47,7 @@ type Module struct {
moduleName string //模块名称 moduleName string //模块名称
parent IModule //父亲 parent IModule //父亲
self IModule //自己 self IModule //自己
child map[uint32]IModule //孩子们 child []IModule //孩子们
mapActiveTimer map[timer.ITimer]struct{} mapActiveTimer map[timer.ITimer]struct{}
mapActiveIdTimer map[uint64]timer.ITimer mapActiveIdTimer map[uint64]timer.ITimer
dispatcher *timer.Dispatcher //timer dispatcher *timer.Dispatcher //timer
@@ -93,10 +94,7 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
pAddModule.moduleId = m.NewModuleId() pAddModule.moduleId = m.NewModuleId()
} }
if m.child == nil { _,ok := m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()]
m.child = map[uint32]IModule{}
}
_, ok := m.child[module.GetModuleId()]
if ok == true { if ok == true {
return 0, fmt.Errorf("exists module id %d", module.GetModuleId()) return 0, fmt.Errorf("exists module id %d", module.GetModuleId())
} }
@@ -109,29 +107,33 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
pAddModule.eventHandler = event.NewEventHandler() pAddModule.eventHandler = event.NewEventHandler()
pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor()) pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor())
pAddModule.IConcurrent = m.IConcurrent pAddModule.IConcurrent = m.IConcurrent
m.child = append(m.child,module)
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
err := module.OnInit() err := module.OnInit()
if err != nil { if err != nil {
delete(m.ancestor.getBaseModule().(*Module).descendants, module.GetModuleId())
m.child = m.child[:len(m.child)-1]
log.Error("module OnInit error",log.String("ModuleName",module.GetModuleName()),log.ErrorField("err",err))
return 0, err return 0, err
} }
m.child[module.GetModuleId()] = module
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
log.Debug("Add module " + module.GetModuleName() + " completed") log.Debug("Add module " + module.GetModuleName() + " completed")
return module.GetModuleId(), nil return module.GetModuleId(), nil
} }
func (m *Module) ReleaseModule(moduleId uint32) { func (m *Module) ReleaseModule(moduleId uint32) {
pModule := m.GetModule(moduleId).getBaseModule().(*Module) pModule := m.GetModule(moduleId).getBaseModule().(*Module)
pModule.self.OnRelease()
log.Debug("Release module " + pModule.GetModuleName())
//释放子孙 for i:=len(pModule.child)-1; i>=0; i-- {
for id := range pModule.child { m.ReleaseModule(pModule.child[i].GetModuleId())
m.ReleaseModule(id)
} }
pModule.self.OnRelease()
pModule.GetEventHandler().Destroy() pModule.GetEventHandler().Destroy()
log.Debug("Release module " + pModule.GetModuleName())
for pTimer := range pModule.mapActiveTimer { for pTimer := range pModule.mapActiveTimer {
pTimer.Cancel() pTimer.Cancel()
} }
@@ -140,7 +142,10 @@ func (m *Module) ReleaseModule(moduleId uint32) {
t.Cancel() t.Cancel()
} }
delete(m.child, moduleId) m.child = slices.DeleteFunc(m.child, func(module IModule) bool {
return module.GetModuleId() == moduleId
})
delete(m.ancestor.getBaseModule().(*Module).descendants, moduleId) delete(m.ancestor.getBaseModule().(*Module).descendants, moduleId)
//清理被删除的Module //清理被删除的Module

View File

@@ -192,7 +192,7 @@ func (s *Service) run() {
break break
} }
if s.profiler != nil { if s.profiler != nil {
analyzer = s.profiler.Push("[Req]" + rpcRequest.RpcRequestData.GetServiceMethod()) analyzer = s.profiler.Push("[RpcReq]" + rpcRequest.RpcRequestData.GetServiceMethod()+"."+strconv.Itoa(int(rpcRequest.RpcRequestData.GetRpcMethodId())))
} }
s.GetRpcHandler().HandlerRpcRequest(rpcRequest) s.GetRpcHandler().HandlerRpcRequest(rpcRequest)
@@ -266,8 +266,10 @@ func (s *Service) Release() {
if atomic.AddInt32(&s.isRelease, -1) == -1 { if atomic.AddInt32(&s.isRelease, -1) == -1 {
s.self.OnRelease() s.self.OnRelease()
for i:=len(s.child)-1; i>=0; i-- {
s.ReleaseModule(s.child[i].GetModuleId())
}
} }
} }
func (s *Service) OnRelease() { func (s *Service) OnRelease() {
@@ -432,6 +434,7 @@ func (s *Service) SetEventChannelNum(num int) {
} }
} }
// Deprecated: replace it with the OpenConcurrent function
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool { func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程 //已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程
if s.startStatus == true || s.profiler != nil { if s.startStatus == true || s.profiler != nil {

View File

@@ -248,7 +248,7 @@ func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession
select { select {
case msg := <-claim.Messages(): case msg := <-claim.Messages():
if msg == nil { if msg == nil {
log.SWarning("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition())) log.SWarn("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
return nil return nil
} }
ch.AppendMsg(session, msg) ch.AppendMsg(session, msg)

View File

@@ -86,7 +86,7 @@ func (p *Producer) asyncRun() {
asyncReturn := sm.Metadata.(*AsyncReturn) asyncReturn := sm.Metadata.(*AsyncReturn)
asyncReturn.chanReturn <- asyncReturn asyncReturn.chanReturn <- asyncReturn
case em := <-p.Errors(): case em := <-p.Errors():
log.Error("async kafkamodule error", log.ErrorAttr("err", em.Err)) log.Error("async kafkamodule error", log.ErrorField("err", em.Err))
if em.Msg.Metadata == nil { if em.Msg.Metadata == nil {
break break
} }

View File

@@ -95,7 +95,7 @@ func (m *MySQLModule) Begin() (*Tx, error) {
var txDBModule Tx var txDBModule Tx
txDb, err := m.db.Begin() txDb, err := m.db.Begin()
if err != nil { if err != nil {
log.Error("Begin error:%s", err.Error()) log.Error("Begin error", log.ErrorField("err",err))
return &txDBModule, err return &txDBModule, err
} }
txDBModule.slowDuration = m.slowDuration txDBModule.slowDuration = m.slowDuration
@@ -155,7 +155,7 @@ func (m *MySQLModule) runPing() {
for { for {
select { select {
case <-m.pingCoroutine.pintExit: case <-m.pingCoroutine.pintExit:
log.Error("RunPing stopping %s...", fmt.Sprintf("%T", m)) log.Error("RunPing stopping",log.String("url", m.url),log.String("dbname", m.dbname))
return return
case <-m.pingCoroutine.tickerPing.C: case <-m.pingCoroutine.tickerPing.C:
if m.db != nil { if m.db != nil {
@@ -221,12 +221,12 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
datasetList.blur = true datasetList.blur = true
if checkArgs(args) != nil { if checkArgs(args) != nil {
log.Error("CheckArgs is error :%s", strQuery) log.Error("CheckArgs is error",log.String("sql",strQuery))
return &datasetList, fmt.Errorf("checkArgs is error") return &datasetList, fmt.Errorf("checkArgs is error")
} }
if db == nil { if db == nil {
log.Error("cannot connect database:%s", strQuery) log.Error("cannot connect database",log.String("sql", strQuery))
return &datasetList, fmt.Errorf("cannot connect database") return &datasetList, fmt.Errorf("cannot connect database")
} }
@@ -235,10 +235,10 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
timeFuncPass := time.Since(TimeFuncStart) timeFuncPass := time.Since(TimeFuncStart)
if checkSlow(slowDuration, timeFuncPass) { if checkSlow(slowDuration, timeFuncPass) {
log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strQuery, args) log.Error("Query slow",log.Int64("time_ms",timeFuncPass.Milliseconds()),log.String("sql", strQuery), log.Any("args",args))
} }
if err != nil { if err != nil {
log.Error("Query:%s(%v)", strQuery, err) log.Error("Query error", log.String("sql",strQuery),log.ErrorField("err",err))
if rows != nil { if rows != nil {
rows.Close() rows.Close()
} }
@@ -278,8 +278,8 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
hasRet := rows.NextResultSet() hasRet := rows.NextResultSet()
if hasRet == false { if hasRet == false {
if rows.Err() != nil { if rowErr :=rows.Err();rowErr != nil {
log.Error("Query:%s(%+v)", strQuery, rows) log.Error("NextResultSet error", log.String("sql",strQuery), log.ErrorField("err",rowErr))
} }
break break
} }
@@ -291,12 +291,12 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
func exec(slowDuration time.Duration, db dbControl, strSql string, args ...interface{}) (*DBResult, error) { func exec(slowDuration time.Duration, db dbControl, strSql string, args ...interface{}) (*DBResult, error) {
ret := &DBResult{} ret := &DBResult{}
if db == nil { if db == nil {
log.Error("cannot connect database:%s", strSql) log.Error("cannot connect database", log.String("sql",strSql))
return ret, fmt.Errorf("cannot connect database") return ret, fmt.Errorf("cannot connect database")
} }
if checkArgs(args) != nil { if checkArgs(args) != nil {
log.Error("CheckArgs is error :%s", strSql) log.Error("CheckArgs is error", log.String("sql",strSql))
return ret, fmt.Errorf("checkArgs is error") return ret, fmt.Errorf("checkArgs is error")
} }
@@ -304,10 +304,10 @@ func exec(slowDuration time.Duration, db dbControl, strSql string, args ...inter
res, err := db.Exec(strSql, args...) res, err := db.Exec(strSql, args...)
timeFuncPass := time.Since(TimeFuncStart) timeFuncPass := time.Since(TimeFuncStart)
if checkSlow(slowDuration, timeFuncPass) { if checkSlow(slowDuration, timeFuncPass) {
log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strSql, args) log.Error("Exec slow",log.Int64("time_ms",timeFuncPass.Milliseconds()),log.String("sql",strSql),log.Any("args",args) )
} }
if err != nil { if err != nil {
log.Error("Exec:%s(%v)", strSql, err) log.Error("Exec error",log.String("sql",strSql),log.ErrorField("err", err))
return nil, err return nil, err
} }

View File

@@ -14,7 +14,7 @@ import (
type WSModule struct { type WSModule struct {
service.Module service.Module
wsServer network.WSServer WSServer network.WSServer
mapClientLocker sync.RWMutex mapClientLocker sync.RWMutex
mapClient map[string]*WSClient mapClient map[string]*WSClient
@@ -34,6 +34,8 @@ type WSCfg struct {
PendingWriteNum int PendingWriteNum int
MaxMsgLen uint32 MaxMsgLen uint32
LittleEndian bool //是否小端序 LittleEndian bool //是否小端序
KeyFile string
CertFile string
} }
type WSPackType int8 type WSPackType int8
@@ -57,18 +59,23 @@ func (ws *WSModule) OnInit() error {
return fmt.Errorf("please call the Init function correctly") return fmt.Errorf("please call the Init function correctly")
} }
ws.wsServer.MaxConnNum = ws.wsCfg.MaxConnNum ws.WSServer.MaxConnNum = ws.wsCfg.MaxConnNum
ws.wsServer.PendingWriteNum = ws.wsCfg.PendingWriteNum ws.WSServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
ws.wsServer.MaxMsgLen = ws.wsCfg.MaxMsgLen ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
ws.wsServer.Addr = ws.wsCfg.ListenAddr ws.WSServer.Addr = ws.wsCfg.ListenAddr
//3.设置解析处理器 if ws.wsCfg.KeyFile != "" && ws.wsCfg.CertFile != "" {
ws.WSServer.KeyFile = ws.wsCfg.KeyFile
ws.WSServer.CertFile = ws.wsCfg.CertFile
}
// 设置解析处理器
ws.process.SetByteOrder(ws.wsCfg.LittleEndian) ws.process.SetByteOrder(ws.wsCfg.LittleEndian)
ws.mapClient = make(map[string]*WSClient, ws.wsServer.MaxConnNum) ws.mapClient = make(map[string]*WSClient, ws.WSServer.MaxConnNum)
ws.wsServer.NewAgent = ws.NewWSClient ws.WSServer.NewAgent = ws.NewWSClient
//4.设置网络事件处理 // 设置网络事件处理
ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler) ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler)
return nil return nil
@@ -80,7 +87,7 @@ func (ws *WSModule) Init(wsCfg *WSCfg, process processor.IRawProcessor) {
} }
func (ws *WSModule) Start() error { func (ws *WSModule) Start() error {
return ws.wsServer.Start() return ws.WSServer.Start()
} }
func (ws *WSModule) wsEventHandler(ev event.IEvent) { func (ws *WSModule) wsEventHandler(ev event.IEvent) {
@@ -197,3 +204,7 @@ func (ws *WSModule) SendRawMsg(clientId string, msg []byte) error {
ws.mapClientLocker.Unlock() ws.mapClientLocker.Unlock()
return client.wsConn.WriteMsg(msg) return client.wsConn.WriteMsg(msg)
} }
func (ws *WSModule) SetMessageType(messageType int) {
ws.WSServer.SetMessageType(messageType)
}

View File

@@ -83,6 +83,7 @@ type HttpSession struct {
sessionDone chan *HttpSession sessionDone chan *HttpSession
} }
// Deprecated: replace it with the GinModule
type HttpService struct { type HttpService struct {
service.Service service.Service

View File

@@ -104,11 +104,11 @@ func (cs *CustomerSubscriber) UnSubscribe() {
func (cs *CustomerSubscriber) LoadLastIndex() { func (cs *CustomerSubscriber) LoadLastIndex() {
for { for {
if atomic.LoadInt32(&cs.isStop) != 0 { if atomic.LoadInt32(&cs.isStop) != 0 {
log.Info("topic ", cs.topic, " out of subscription") log.SInfo("topic ", cs.topic, " out of subscription")
break break
} }
log.Info("customer ", cs.customerId, " start load last index ") log.SInfo("customer ", cs.customerId, " start load last index ")
lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId) lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId)
if ret == true { if ret == true {
if lastIndex > 0 { if lastIndex > 0 {
@@ -116,18 +116,18 @@ func (cs *CustomerSubscriber) LoadLastIndex() {
} else { } else {
//否则直接使用客户端发回来的 //否则直接使用客户端发回来的
} }
log.Info("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex) log.SInfo("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
break break
} }
log.Info("customer ", cs.customerId, " load last index is fail...") log.SInfo("customer ", cs.customerId, " load last index is fail...")
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }
} }
func (cs *CustomerSubscriber) SubscribeRun() { func (cs *CustomerSubscriber) SubscribeRun() {
defer cs.subscriber.queueWait.Done() defer cs.subscriber.queueWait.Done()
log.Info("topic ", cs.topic, " start subscription") log.SInfo("topic ", cs.topic, " start subscription")
//加载之前的位置 //加载之前的位置
if cs.subscribeMethod == MethodLast { if cs.subscribeMethod == MethodLast {
@@ -136,7 +136,7 @@ func (cs *CustomerSubscriber) SubscribeRun() {
for { for {
if atomic.LoadInt32(&cs.isStop) != 0 { if atomic.LoadInt32(&cs.isStop) != 0 {
log.Info("topic ", cs.topic, " out of subscription") log.SInfo("topic ", cs.topic, " out of subscription")
break break
} }
@@ -146,14 +146,14 @@ func (cs *CustomerSubscriber) SubscribeRun() {
//todo 检测退出 //todo 检测退出
if cs.subscribe() == false { if cs.subscribe() == false {
log.Info("topic ", cs.topic, " out of subscription") log.SInfo("topic ", cs.topic, " out of subscription")
break break
} }
} }
//删除订阅关系 //删除订阅关系
cs.subscriber.removeCustomer(cs.customerId, cs) cs.subscriber.removeCustomer(cs.customerId, cs)
log.Info("topic ", cs.topic, " unsubscription") log.SInfo("topic ", cs.topic, " unsubscription")
} }
func (cs *CustomerSubscriber) subscribe() bool { func (cs *CustomerSubscriber) subscribe() bool {

View File

@@ -63,7 +63,7 @@ func (ms *MessageQueueService) ReadCfg() error {
maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"] maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"]
if ok == false { if ok == false {
ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum
log.Info("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum) log.SInfo("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
} else { } else {
ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64)) ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64))
} }
@@ -71,7 +71,7 @@ func (ms *MessageQueueService) ReadCfg() error {
memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"] memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"]
if ok == false { if ok == false {
ms.memoryQueueLen = DefaultMemoryQueueLen ms.memoryQueueLen = DefaultMemoryQueueLen
log.Info("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen) log.SInfo("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
} else { } else {
ms.memoryQueueLen = int32(memoryQueueLen.(float64)) ms.memoryQueueLen = int32(memoryQueueLen.(float64))
} }

View File

@@ -237,7 +237,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
defer cancelAll() defer cancelAll()
err = cursor.All(ctxAll, &res) err = cursor.All(ctxAll, &res)
if err != nil { if err != nil {
log.Error("find collect name ", topic, " is error", log.ErrorAttr("err", err)) log.Error("find collect name error",log.String("topic",topic) ,log.ErrorField("err",err))
return nil, false return nil, false
} }
@@ -246,7 +246,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
rawData, errM := bson.Marshal(res[i]) rawData, errM := bson.Marshal(res[i])
if errM != nil { if errM != nil {
if errM != nil { if errM != nil {
log.Error("collect name ", topic, " Marshal is error", log.ErrorAttr("err", err)) log.Error("Marshal error",log.String("topic",topic) , log.ErrorField("err", err))
return nil, false return nil, false
} }
continue continue
@@ -391,7 +391,7 @@ func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64 {
if e.Key == "_id" { if e.Key == "_id" {
errC, seq := convertToNumber[uint64](e.Value) errC, seq := convertToNumber[uint64](e.Value)
if errC != nil { if errC != nil {
log.Error("value is error:%s,%+v, ", errC.Error(), e.Value) log.Error("value is error", log.ErrorField("err",errC), log.Any("val",e.Value))
} }
return seq return seq

View File

@@ -56,9 +56,9 @@ func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType r
} }
if ok == true { if ok == true {
log.Info("repeat subscription for customer ", customerId) log.SInfo("repeat subscription for customer ", customerId)
} else { } else {
log.Info("subscription for customer ", customerId) log.SInfo("subscription for customer ", customerId)
} }
} }
@@ -72,7 +72,7 @@ func (ss *Subscriber) UnSubscribe(customerId string) {
customerSubscriber, ok := ss.mapCustomer[customerId] customerSubscriber, ok := ss.mapCustomer[customerId]
if ok == false { if ok == false {
log.SWarning("failed to unsubscribe customer " + customerId) log.SWarn("failed to unsubscribe customer ", customerId)
return return
} }

View File

@@ -93,7 +93,7 @@ func (tr *TopicRoom) Stop() {
func (tr *TopicRoom) topicRoomRun() { func (tr *TopicRoom) topicRoomRun() {
defer tr.queueWait.Done() defer tr.queueWait.Done()
log.Info("topic room ", tr.topic, " is running..") log.SInfo("topic room ", tr.topic, " is running..")
for { for {
if atomic.LoadInt32(&tr.isStop) != 0 { if atomic.LoadInt32(&tr.isStop) != 0 {
break break
@@ -145,5 +145,5 @@ func (tr *TopicRoom) topicRoomRun() {
} }
tr.customerLocker.Unlock() tr.customerLocker.Unlock()
log.Info("topic room ", tr.topic, " is stop") log.SInfo("topic room ", tr.topic, " is stop")
} }

View File

@@ -142,13 +142,13 @@ func (mp *MongoPersist) OnSetupRank(manual bool, rankSkip *RankSkip) error {
return nil return nil
} }
log.Info("start load rank ", rankSkip.GetRankName(), " from mongodb.") log.SInfo("start load rank ", rankSkip.GetRankName(), " from mongodb.")
err := mp.loadFromDB(rankSkip.GetRankID(), rankSkip.GetRankName()) err := mp.loadFromDB(rankSkip.GetRankID(), rankSkip.GetRankName())
if err != nil { if err != nil {
log.SError("load from db is fail :%s", err.Error()) log.SError("load from db is fail :%s", err.Error())
return err return err
} }
log.Info("finish load rank ", rankSkip.GetRankName(), " from mongodb.") log.SInfo("finish load rank ", rankSkip.GetRankName(), " from mongodb.")
return nil return nil
} }
@@ -296,7 +296,7 @@ func (mp *MongoPersist) saveToDB() {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString)) log.StackError(string(buf[:l]), log.String("error", errString))
} }
}() }()

View File

@@ -14,6 +14,7 @@ import (
"time" "time"
) )
// Deprecated: replace it with the TcpModule
type TcpService struct { type TcpService struct {
tcpServer network.TCPServer tcpServer network.TCPServer
service.Service service.Service

View File

@@ -12,6 +12,7 @@ import (
"sync" "sync"
) )
// Deprecated: replace it with the WSModule
type WSService struct { type WSService struct {
service.Service service.Service
wsServer network.WSServer wsServer network.WSServer

View File

@@ -32,10 +32,10 @@ func Abs[NumType typ.Signed | typ.Float](Num NumType) NumType {
func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) { func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
ret := number1 + number2 ret := number1 + number2
if number2 > 0 && ret < number1 { if number2 > 0 && ret < number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false return ret, false
} else if number2 < 0 && ret > number1 { } else if number2 < 0 && ret > number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false return ret, false
} }
@@ -45,10 +45,10 @@ func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
func SubSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) { func SubSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
ret := number1 - number2 ret := number1 - number2
if number2 > 0 && ret > number1 { if number2 > 0 && ret > number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false return ret, false
} else if number2 < 0 && ret < number1 { } else if number2 < 0 && ret < number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false return ret, false
} }
@@ -65,7 +65,7 @@ func MulSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
return ret, true return ret, true
} }
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, true return ret, true
} }