Compare commits

..

10 Commits

Author SHA1 Message Date
boyce
6c44ba180c 优化kafkamoudule日志 2025-05-21 21:38:36 +08:00
boyce
ecfd42bdec 优化消息队列日志 2025-05-21 21:34:32 +08:00
boyce
01d3b3e535 优化RankService日志 2025-05-21 21:22:24 +08:00
boyce
550d65a354 优化mysql模块日志 2025-05-21 21:19:03 +08:00
boyce
15580ffce9 新增wss证书配置支持 2025-04-21 21:23:34 +08:00
boyce
bd467a219b 废弃掉HttpService、TcpService、WSService 2025-03-28 10:33:51 +08:00
boyce
af15615345 优化日志生成路径 2025-03-14 18:03:01 +08:00
boyce
50dd80b082 整理代码 2025-03-10 11:35:19 +08:00
boyce
a6487dd41e 将默认日志改为rotatelogs 2025-01-25 00:14:18 +08:00
boyce
d5299294d8 优化日志,新增rotatelogs库支持 2025-01-25 00:04:31 +08:00
21 changed files with 214 additions and 125 deletions

View File

@@ -40,8 +40,6 @@ type NodeInfo struct {
DiscoveryService []DiscoveryService //筛选发现的服务,如果不配置,不进行筛选
status NodeStatus
Retire bool
NetworkName string
}
type NodeRpcInfo struct {

View File

@@ -3,24 +3,23 @@ package cluster
import "github.com/duanhf2012/origin/v2/rpc"
type ConfigDiscovery struct {
funDelNode FunDelNode
funSetNode FunSetNode
funDelNode FunDelNode
funSetNode FunSetNode
localNodeId string
}
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode FunDelNode,funSetNode FunSetNode) error{
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
discovery.localNodeId = localNodeId
discovery.funDelNode = funDelNode
discovery.funSetNode = funSetNode
//解析本地其他服务配置
_,nodeInfoList,_,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
_, nodeInfoList, _, err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
if err != nil {
return err
}
for _,nodeInfo := range nodeInfoList {
for _, nodeInfo := range nodeInfoList {
if nodeInfo.NodeId == localNodeId {
continue
}
@@ -30,5 +29,3 @@ func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode Fu
return nil
}

View File

@@ -5,17 +5,17 @@ import (
"github.com/duanhf2012/origin/v2/service"
)
func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.discoveryInfo.getDiscoveryType() == OriginType { //origin类型服务发现
return cls.setupOriginDiscovery(localNodeId,setupServiceFun)
}else if cls.discoveryInfo.getDiscoveryType() == EtcdType{//etcd类型服务发现
return cls.setupEtcdDiscovery(localNodeId,setupServiceFun)
return cls.setupOriginDiscovery(localNodeId, setupServiceFun)
} else if cls.discoveryInfo.getDiscoveryType() == EtcdType { //etcd类型服务发现
return cls.setupEtcdDiscovery(localNodeId, setupServiceFun)
}
return cls.setupConfigDiscovery(localNodeId,setupServiceFun)
return cls.setupConfigDiscovery(localNodeId, setupServiceFun)
}
func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup")
}
@@ -27,6 +27,7 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
}
cls.serviceDiscovery = getOriginDiscovery()
//2.如果为动态服务发现安装本地发现服务
if localMaster == true {
setupServiceFun(&masterService)
@@ -36,11 +37,10 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
setupServiceFun(&clientService)
cls.AddDiscoveryService(OriginDiscoveryClientName, true)
return nil
}
func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup")
}
@@ -48,12 +48,12 @@ func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun Setup
//setup etcd service
cls.serviceDiscovery = getEtcdDiscovery()
setupServiceFun(cls.serviceDiscovery.(service.IService))
cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(),false)
cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(), false)
return nil
}
func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup")
}

View File

@@ -1,6 +1,11 @@
package cluster
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/rpc"
@@ -9,16 +14,11 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto"
"time"
"context"
"errors"
"fmt"
"os"
"path"
"strings"
"sync/atomic"
"io/ioutil"
"crypto/x509"
"crypto/tls"
"time"
)
const originDir = "/origin"
@@ -42,7 +42,8 @@ type EtcdDiscoveryService struct {
mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId]
}
var etcdDiscovery *EtcdDiscoveryService
var etcdDiscovery *EtcdDiscoveryService
func getEtcdDiscovery() IServiceDiscovery {
if etcdDiscovery == nil {
etcdDiscovery = &EtcdDiscoveryService{}
@@ -51,7 +52,6 @@ func getEtcdDiscovery() IServiceDiscovery {
return etcdDiscovery
}
func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
ed.localNodeId = localNodeId
@@ -99,17 +99,17 @@ func (ed *EtcdDiscoveryService) OnInit() error {
if etcdDiscoveryCfg.EtcdList[i].Cert != "" {
// load cert
cert, cerr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
if cerr != nil {
log.Error("load cert error", log.ErrorField("err", cerr))
return cerr
cert, cErr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
if cErr != nil {
log.Error("load cert error", log.ErrorField("err", cErr))
return cErr
}
// load root ca
caData, cerr := ioutil.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
if cerr != nil {
log.Error("load root ca error", log.ErrorField("err", cerr))
return cerr
caData, cErr := os.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
if cErr != nil {
log.Error("load root ca error", log.ErrorField("err", cErr))
return cErr
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caData)
@@ -122,13 +122,12 @@ func (ed *EtcdDiscoveryService) OnInit() error {
client, err = clientv3.New(clientv3.Config{
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
Username: etcdDiscoveryCfg.EtcdList[i].UserName,
Password: etcdDiscoveryCfg.EtcdList[i].Password,
Username: etcdDiscoveryCfg.EtcdList[i].UserName,
Password: etcdDiscoveryCfg.EtcdList[i].Password,
Logger: log.GetLogger().Logger,
TLS: tlsConfig,
TLS: tlsConfig,
})
if err != nil {
log.Error("etcd discovery init fail", log.ErrorField("err", err))
return err

View File

@@ -1,31 +1,37 @@
package log
import (
"github.com/duanhf2012/rotatelogs"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
"os"
"path/filepath"
"strings"
"time"
)
var isSetLogger bool
var gLogger = NewDefaultLogger()
var LogLevel zapcore.Level
var MaxSize int
var LogPath string
var OpenConsole *bool
var LogChanLen int
type Logger struct {
*zap.Logger
stack bool
OpenConsole *bool
LogPath string
FileName string
Skip int
LogLevel zapcore.Level
Encoder zapcore.Encoder
LogConfig *lumberjack.Logger
SugaredLogger *zap.SugaredLogger
CoreList []zapcore.Core
FileName string
Skip int
Encoder zapcore.Encoder
SugaredLogger *zap.SugaredLogger
CoreList []zapcore.Core
WriteSyncerFun []func() zapcore.WriteSyncer
}
// 设置Logger
func SetLogger(logger *Logger) {
if logger != nil {
gLogger = logger
@@ -33,6 +39,15 @@ func SetLogger(logger *Logger) {
}
}
// 设置ZapLogger
func SetZapLogger(zapLogger *zap.Logger) {
if zapLogger != nil {
gLogger = &Logger{}
gLogger.Logger = zapLogger
isSetLogger = true
}
}
func GetLogger() *Logger {
return gLogger
}
@@ -47,8 +62,8 @@ func (logger *Logger) SetSkip(skip int) {
func GetJsonEncoder() zapcore.Encoder {
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(t.Format("2006-01-02 15:04:05.000"))
}
@@ -67,51 +82,100 @@ func GetTxtEncoder() zapcore.Encoder {
return zapcore.NewConsoleEncoder(encoderConfig)
}
func getLogConfig() *lumberjack.Logger {
func (logger *Logger) getLogConfig() *lumberjack.Logger {
return &lumberjack.Logger{
Filename: "",
MaxSize: 2048,
Filename: filepath.Join(LogPath, logger.FileName),
MaxSize: MaxSize,
MaxBackups: 0,
MaxAge: 0,
Compress: false,
LocalTime: true,
}
}
func NewDefaultLogger() *Logger {
logger := Logger{}
logger.Encoder = GetJsonEncoder()
logger.LogConfig = getLogConfig()
logger.LogConfig.LocalTime = true
core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), zap.InfoLevel)
logger.Logger = zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1))
logger.Init()
return &logger
}
func (logger *Logger) SetLogLevel(level zapcore.Level) {
logger.LogLevel = level
func (logger *Logger) SetSyncers(syncers ...func() zapcore.WriteSyncer) {
logger.WriteSyncerFun = syncers
}
func (logger *Logger) AppendSyncerFun(syncerFun func() zapcore.WriteSyncer) {
logger.WriteSyncerFun = append(logger.WriteSyncerFun, syncerFun)
}
func SetLogLevel(level zapcore.Level) {
LogLevel = level
}
func (logger *Logger) Enabled(zapcore.Level) bool {
return logger.stack
}
func (logger *Logger) NewLumberjackWriter() zapcore.WriteSyncer {
return zapcore.AddSync(
&lumberjack.Logger{
Filename: filepath.Join(LogPath, logger.FileName),
MaxSize: MaxSize,
MaxBackups: 0,
MaxAge: 0,
Compress: false,
LocalTime: true,
})
}
func (logger *Logger) NewRotatelogsWriter() zapcore.WriteSyncer {
var options []rotatelogs.Option
if MaxSize > 0 {
options = append(options, rotatelogs.WithRotateMaxSize(int64(MaxSize)))
}
if LogChanLen > 0 {
options = append(options, rotatelogs.WithChannelLen(LogChanLen))
}
options = append(options, rotatelogs.WithRotationTime(time.Hour*24))
fileName := strings.TrimRight(logger.FileName, filepath.Ext(logger.FileName))
rotateLogs, err := rotatelogs.NewRotateLogs(LogPath, "20060102/"+fileName+"_20060102_150405", options...)
if err != nil {
panic(err)
}
return zapcore.AddSync(rotateLogs)
}
func (logger *Logger) Init() {
if isSetLogger {
return
}
var syncerList []zapcore.WriteSyncer
if logger.WriteSyncerFun == nil {
syncerList = append(syncerList, logger.NewRotatelogsWriter())
} else {
for _, syncer := range logger.WriteSyncerFun {
syncerList = append(syncerList, syncer())
}
}
var coreList []zapcore.Core
if logger.OpenConsole == nil || *logger.OpenConsole {
core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), logger.LogLevel)
if OpenConsole == nil || *OpenConsole {
syncerList = append(syncerList, zapcore.AddSync(os.Stdout))
}
for _, writer := range syncerList {
core := zapcore.NewCore(logger.Encoder, writer, LogLevel)
coreList = append(coreList, core)
}
if logger.CoreList != nil {
coreList = append(coreList, logger.CoreList...)
}else if logger.LogPath != "" {
WriteSyncer := zapcore.AddSync(logger.LogConfig)
core := zapcore.NewCore(logger.Encoder, WriteSyncer, logger.LogLevel)
coreList = append(coreList, core)
}
core := zapcore.NewTee(coreList...)

View File

@@ -73,9 +73,22 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
handler.conns[conn] = struct{}{}
handler.mutexConns.Unlock()
c,ok:=conn.NetConn().(*net.TCPConn)
if !ok {
tlsConn,ok := conn.NetConn().(*tls.Conn)
if !ok {
log.Error("conn error")
return
}
c,ok = tlsConn.NetConn().(*net.TCPConn)
if !ok {
log.Error("conn error")
return
}
}
conn.UnderlyingConn().(*net.TCPConn).SetLinger(0)
conn.UnderlyingConn().(*net.TCPConn).SetNoDelay(true)
c.SetLinger(0)
c.SetNoDelay(true)
wsConn := newWSConn(conn, r.Header, handler.pendingWriteNum, handler.maxMsgLen, handler.messageType)
agent := handler.newAgent(wsConn)
agent.Run()

View File

@@ -17,7 +17,6 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
@@ -59,6 +58,7 @@ func init() {
console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|info|warn|error|stackerror|fatal> Set loglevel.", setLevel)
console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath)
console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize)
console.RegisterCommandInt("logchanlen", 0, "<-logchanlen len> Set log channel len.", setLogChanLen)
console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof)
}
@@ -220,7 +220,7 @@ func initNode(id string) {
func initLog() error {
logger := log.GetLogger()
if logger.LogPath == "" {
if log.LogPath == "" {
err := setLogPath("./log")
if err != nil {
return err
@@ -230,7 +230,6 @@ func initLog() error {
localNodeInfo := cluster.GetCluster().GetLocalNodeInfo()
fileName := fmt.Sprintf("%s.log", localNodeInfo.NodeId)
logger.FileName = fileName
logger.LogConfig.Filename = filepath.Join(logger.LogPath, logger.FileName)
logger.Init()
return nil
@@ -440,10 +439,10 @@ func openConsole(args interface{}) error {
strOpen := strings.ToLower(strings.TrimSpace(args.(string)))
if strOpen == "false" {
bOpenConsole := false
log.GetLogger().OpenConsole = &bOpenConsole
log.OpenConsole = &bOpenConsole
} else if strOpen == "true" {
bOpenConsole := true
log.GetLogger().OpenConsole = &bOpenConsole
log.OpenConsole = &bOpenConsole
} else {
return errors.New("parameter console error")
}
@@ -458,17 +457,17 @@ func setLevel(args interface{}) error {
strlogLevel := strings.TrimSpace(args.(string))
switch strlogLevel {
case "debug":
log.GetLogger().LogLevel = zapcore.DebugLevel
log.LogLevel = zapcore.DebugLevel
case "info":
log.GetLogger().LogLevel = zapcore.InfoLevel
log.LogLevel = zapcore.InfoLevel
case "warn":
log.GetLogger().LogLevel = zapcore.WarnLevel
log.LogLevel = zapcore.WarnLevel
case "error":
log.GetLogger().LogLevel = zapcore.ErrorLevel
log.LogLevel = zapcore.ErrorLevel
case "stackerror":
log.GetLogger().LogLevel = zapcore.ErrorLevel
log.LogLevel = zapcore.ErrorLevel
case "fatal":
log.GetLogger().LogLevel = zapcore.FatalLevel
log.LogLevel = zapcore.FatalLevel
default:
return errors.New("unknown level: " + strlogLevel)
}
@@ -480,11 +479,7 @@ func setLogPath(args interface{}) error {
return nil
}
logPath := strings.TrimSpace(args.(string))
dir, err := os.Stat(logPath)
if err == nil && dir.IsDir() == false {
return errors.New("Not found dir " + logPath)
}
_, err := os.Stat(logPath)
if err != nil {
err = os.MkdirAll(logPath, os.ModePerm)
if err != nil {
@@ -492,7 +487,7 @@ func setLogPath(args interface{}) error {
}
}
log.GetLogger().LogPath = logPath
log.LogPath = logPath
return nil
}
@@ -505,7 +500,20 @@ func setLogSize(args interface{}) error {
return nil
}
log.GetLogger().LogConfig.MaxSize = logSize
log.MaxSize = logSize
return nil
}
func setLogChanLen(args interface{}) error {
logChanLen, ok := args.(int)
if ok == false {
return errors.New("param logsize is error")
}
if logChanLen == 0 {
return nil
}
log.LogChanLen = logChanLen
return nil
}

View File

@@ -192,7 +192,7 @@ func (s *Service) run() {
break
}
if s.profiler != nil {
analyzer = s.profiler.Push("[Req]" + rpcRequest.RpcRequestData.GetServiceMethod())
analyzer = s.profiler.Push("[RpcReq]" + rpcRequest.RpcRequestData.GetServiceMethod()+"."+strconv.Itoa(int(rpcRequest.RpcRequestData.GetRpcMethodId())))
}
s.GetRpcHandler().HandlerRpcRequest(rpcRequest)

View File

@@ -248,7 +248,7 @@ func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession
select {
case msg := <-claim.Messages():
if msg == nil {
log.SWarning("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
log.SWarn("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
return nil
}
ch.AppendMsg(session, msg)

View File

@@ -86,7 +86,7 @@ func (p *Producer) asyncRun() {
asyncReturn := sm.Metadata.(*AsyncReturn)
asyncReturn.chanReturn <- asyncReturn
case em := <-p.Errors():
log.Error("async kafkamodule error", log.ErrorAttr("err", em.Err))
log.Error("async kafkamodule error", log.ErrorField("err", em.Err))
if em.Msg.Metadata == nil {
break
}

View File

@@ -95,7 +95,7 @@ func (m *MySQLModule) Begin() (*Tx, error) {
var txDBModule Tx
txDb, err := m.db.Begin()
if err != nil {
log.Error("Begin error:%s", err.Error())
log.Error("Begin error", log.ErrorField("err",err))
return &txDBModule, err
}
txDBModule.slowDuration = m.slowDuration
@@ -155,7 +155,7 @@ func (m *MySQLModule) runPing() {
for {
select {
case <-m.pingCoroutine.pintExit:
log.Error("RunPing stopping %s...", fmt.Sprintf("%T", m))
log.Error("RunPing stopping",log.String("url", m.url),log.String("dbname", m.dbname))
return
case <-m.pingCoroutine.tickerPing.C:
if m.db != nil {
@@ -221,12 +221,12 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
datasetList.blur = true
if checkArgs(args) != nil {
log.Error("CheckArgs is error :%s", strQuery)
log.Error("CheckArgs is error",log.String("sql",strQuery))
return &datasetList, fmt.Errorf("checkArgs is error")
}
if db == nil {
log.Error("cannot connect database:%s", strQuery)
log.Error("cannot connect database",log.String("sql", strQuery))
return &datasetList, fmt.Errorf("cannot connect database")
}
@@ -235,10 +235,10 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
timeFuncPass := time.Since(TimeFuncStart)
if checkSlow(slowDuration, timeFuncPass) {
log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strQuery, args)
log.Error("Query slow",log.Int64("time_ms",timeFuncPass.Milliseconds()),log.String("sql", strQuery), log.Any("args",args))
}
if err != nil {
log.Error("Query:%s(%v)", strQuery, err)
log.Error("Query error", log.String("sql",strQuery),log.ErrorField("err",err))
if rows != nil {
rows.Close()
}
@@ -278,8 +278,8 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
hasRet := rows.NextResultSet()
if hasRet == false {
if rows.Err() != nil {
log.Error("Query:%s(%+v)", strQuery, rows)
if rowErr :=rows.Err();rowErr != nil {
log.Error("NextResultSet error", log.String("sql",strQuery), log.ErrorField("err",rowErr))
}
break
}
@@ -291,12 +291,12 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
func exec(slowDuration time.Duration, db dbControl, strSql string, args ...interface{}) (*DBResult, error) {
ret := &DBResult{}
if db == nil {
log.Error("cannot connect database:%s", strSql)
log.Error("cannot connect database", log.String("sql",strSql))
return ret, fmt.Errorf("cannot connect database")
}
if checkArgs(args) != nil {
log.Error("CheckArgs is error :%s", strSql)
log.Error("CheckArgs is error", log.String("sql",strSql))
return ret, fmt.Errorf("checkArgs is error")
}
@@ -304,10 +304,10 @@ func exec(slowDuration time.Duration, db dbControl, strSql string, args ...inter
res, err := db.Exec(strSql, args...)
timeFuncPass := time.Since(TimeFuncStart)
if checkSlow(slowDuration, timeFuncPass) {
log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strSql, args)
log.Error("Exec slow",log.Int64("time_ms",timeFuncPass.Milliseconds()),log.String("sql",strSql),log.Any("args",args) )
}
if err != nil {
log.Error("Exec:%s(%v)", strSql, err)
log.Error("Exec error",log.String("sql",strSql),log.ErrorField("err", err))
return nil, err
}

View File

@@ -34,6 +34,8 @@ type WSCfg struct {
PendingWriteNum int
MaxMsgLen uint32
LittleEndian bool //是否小端序
KeyFile string
CertFile string
}
type WSPackType int8
@@ -62,13 +64,18 @@ func (ws *WSModule) OnInit() error {
ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
ws.WSServer.Addr = ws.wsCfg.ListenAddr
//3.设置解析处理器
if ws.wsCfg.KeyFile != "" && ws.wsCfg.CertFile != "" {
ws.WSServer.KeyFile = ws.wsCfg.KeyFile
ws.WSServer.CertFile = ws.wsCfg.CertFile
}
// 设置解析处理器
ws.process.SetByteOrder(ws.wsCfg.LittleEndian)
ws.mapClient = make(map[string]*WSClient, ws.WSServer.MaxConnNum)
ws.WSServer.NewAgent = ws.NewWSClient
//4.设置网络事件处理
// 设置网络事件处理
ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler)
return nil

View File

@@ -83,6 +83,7 @@ type HttpSession struct {
sessionDone chan *HttpSession
}
// Deprecated: replace it with the GinModule
type HttpService struct {
service.Service

View File

@@ -104,11 +104,11 @@ func (cs *CustomerSubscriber) UnSubscribe() {
func (cs *CustomerSubscriber) LoadLastIndex() {
for {
if atomic.LoadInt32(&cs.isStop) != 0 {
log.Info("topic ", cs.topic, " out of subscription")
log.SInfo("topic ", cs.topic, " out of subscription")
break
}
log.Info("customer ", cs.customerId, " start load last index ")
log.SInfo("customer ", cs.customerId, " start load last index ")
lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId)
if ret == true {
if lastIndex > 0 {
@@ -116,18 +116,18 @@ func (cs *CustomerSubscriber) LoadLastIndex() {
} else {
//否则直接使用客户端发回来的
}
log.Info("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
log.SInfo("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
break
}
log.Info("customer ", cs.customerId, " load last index is fail...")
log.SInfo("customer ", cs.customerId, " load last index is fail...")
time.Sleep(5 * time.Second)
}
}
func (cs *CustomerSubscriber) SubscribeRun() {
defer cs.subscriber.queueWait.Done()
log.Info("topic ", cs.topic, " start subscription")
log.SInfo("topic ", cs.topic, " start subscription")
//加载之前的位置
if cs.subscribeMethod == MethodLast {
@@ -136,7 +136,7 @@ func (cs *CustomerSubscriber) SubscribeRun() {
for {
if atomic.LoadInt32(&cs.isStop) != 0 {
log.Info("topic ", cs.topic, " out of subscription")
log.SInfo("topic ", cs.topic, " out of subscription")
break
}
@@ -146,14 +146,14 @@ func (cs *CustomerSubscriber) SubscribeRun() {
//todo 检测退出
if cs.subscribe() == false {
log.Info("topic ", cs.topic, " out of subscription")
log.SInfo("topic ", cs.topic, " out of subscription")
break
}
}
//删除订阅关系
cs.subscriber.removeCustomer(cs.customerId, cs)
log.Info("topic ", cs.topic, " unsubscription")
log.SInfo("topic ", cs.topic, " unsubscription")
}
func (cs *CustomerSubscriber) subscribe() bool {

View File

@@ -63,7 +63,7 @@ func (ms *MessageQueueService) ReadCfg() error {
maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"]
if ok == false {
ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum
log.Info("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
log.SInfo("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
} else {
ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64))
}
@@ -71,7 +71,7 @@ func (ms *MessageQueueService) ReadCfg() error {
memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"]
if ok == false {
ms.memoryQueueLen = DefaultMemoryQueueLen
log.Info("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
log.SInfo("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
} else {
ms.memoryQueueLen = int32(memoryQueueLen.(float64))
}

View File

@@ -237,7 +237,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
defer cancelAll()
err = cursor.All(ctxAll, &res)
if err != nil {
log.Error("find collect name ", topic, " is error", log.ErrorAttr("err", err))
log.Error("find collect name error",log.String("topic",topic) ,log.ErrorField("err",err))
return nil, false
}
@@ -246,7 +246,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
rawData, errM := bson.Marshal(res[i])
if errM != nil {
if errM != nil {
log.Error("collect name ", topic, " Marshal is error", log.ErrorAttr("err", err))
log.Error("Marshal error",log.String("topic",topic) , log.ErrorField("err", err))
return nil, false
}
continue
@@ -391,7 +391,7 @@ func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64 {
if e.Key == "_id" {
errC, seq := convertToNumber[uint64](e.Value)
if errC != nil {
log.Error("value is error:%s,%+v, ", errC.Error(), e.Value)
log.Error("value is error", log.ErrorField("err",errC), log.Any("val",e.Value))
}
return seq

View File

@@ -56,9 +56,9 @@ func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType r
}
if ok == true {
log.Info("repeat subscription for customer ", customerId)
log.SInfo("repeat subscription for customer ", customerId)
} else {
log.Info("subscription for customer ", customerId)
log.SInfo("subscription for customer ", customerId)
}
}
@@ -72,7 +72,7 @@ func (ss *Subscriber) UnSubscribe(customerId string) {
customerSubscriber, ok := ss.mapCustomer[customerId]
if ok == false {
log.SWarning("failed to unsubscribe customer " + customerId)
log.SWarn("failed to unsubscribe customer ", customerId)
return
}

View File

@@ -93,7 +93,7 @@ func (tr *TopicRoom) Stop() {
func (tr *TopicRoom) topicRoomRun() {
defer tr.queueWait.Done()
log.Info("topic room ", tr.topic, " is running..")
log.SInfo("topic room ", tr.topic, " is running..")
for {
if atomic.LoadInt32(&tr.isStop) != 0 {
break
@@ -145,5 +145,5 @@ func (tr *TopicRoom) topicRoomRun() {
}
tr.customerLocker.Unlock()
log.Info("topic room ", tr.topic, " is stop")
log.SInfo("topic room ", tr.topic, " is stop")
}

View File

@@ -142,13 +142,13 @@ func (mp *MongoPersist) OnSetupRank(manual bool, rankSkip *RankSkip) error {
return nil
}
log.Info("start load rank ", rankSkip.GetRankName(), " from mongodb.")
log.SInfo("start load rank ", rankSkip.GetRankName(), " from mongodb.")
err := mp.loadFromDB(rankSkip.GetRankID(), rankSkip.GetRankName())
if err != nil {
log.SError("load from db is fail :%s", err.Error())
return err
}
log.Info("finish load rank ", rankSkip.GetRankName(), " from mongodb.")
log.SInfo("finish load rank ", rankSkip.GetRankName(), " from mongodb.")
return nil
}
@@ -296,7 +296,7 @@ func (mp *MongoPersist) saveToDB() {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(string(buf[:l]), log.String("error", errString))
}
}()

View File

@@ -14,6 +14,7 @@ import (
"time"
)
// Deprecated: replace it with the TcpModule
type TcpService struct {
tcpServer network.TCPServer
service.Service

View File

@@ -12,6 +12,7 @@ import (
"sync"
)
// Deprecated: replace it with the WSModule
type WSService struct {
service.Service
wsServer network.WSServer