diff --git a/log/buffer.go b/log/buffer.go index 52a05ae..a4dfc36 100644 --- a/log/buffer.go +++ b/log/buffer.go @@ -2,27 +2,20 @@ package log // import "go.uber.org/zap/buffer" import ( "strconv" - "sync" ) const _size = 9216 type Buffer struct { bs []byte - mu sync.Mutex // ensures atomic writes; protects the following fields + //mu sync.Mutex // ensures atomic writes; protects the following fields } func (buff *Buffer) Init(){ buff.bs = make([]byte,_size) } -func (buff *Buffer) Locker() { - buff.mu.Lock() -} -func (buff *Buffer) UnLocker() { - buff.mu.Unlock() -} // AppendByte writes a single byte to the Buffer. func (b *Buffer) AppendByte(v byte) { diff --git a/log/handler.go b/log/handler.go new file mode 100644 index 0000000..889db07 --- /dev/null +++ b/log/handler.go @@ -0,0 +1,146 @@ +package log + +import ( + "log/slog" + "io" + "path/filepath" + "context" + "runtime" + "runtime/debug" + "sync" +) + +type IOriginHandler interface { + slog.Handler + Lock() + UnLock() +} + +type BaseHandler struct { + addSource bool + w io.Writer + locker sync.Mutex +} + +type OriginTextHandler struct { + BaseHandler + *slog.TextHandler +} + +type OriginJsonHandler struct { + BaseHandler + *slog.JSONHandler +} + +func getStrLevel(level slog.Level) string{ + switch level { + case LevelTrace: + return "TRACE" + case LevelDebug: + return "DEBUG" + case LevelInfo: + return "INFO" + case LevelWarning: + return "WARNING" + case LevelError: + return "ERROR" + case LevelStack: + return "STACK" + case LevelDump: + return "DUMP" + case LevelFatal: + return "FATAL" + } + + return "" +} + +func defaultReplaceAttr(groups []string, a slog.Attr) slog.Attr { + if a.Key == slog.LevelKey { + level := a.Value.Any().(slog.Level) + a.Value = slog.StringValue(getStrLevel(level)) + }else if a.Key == slog.TimeKey && len(groups) == 0 { + a.Value = slog.StringValue(a.Value.Time().Format("2006/01/02 15:04:05")) + }else if a.Key == slog.SourceKey { + source := a.Value.Any().(*slog.Source) + source.File = filepath.Base(source.File) + } + return a +} + +func NewOriginTextHandler(level slog.Level,w io.Writer,addSource bool,replaceAttr func([]string,slog.Attr) slog.Attr) slog.Handler{ + var textHandler OriginTextHandler + textHandler.addSource = addSource + textHandler.w = w + textHandler.TextHandler = slog.NewTextHandler(w,&slog.HandlerOptions{ + AddSource: addSource, + Level: level, + ReplaceAttr: replaceAttr, + }) + + return &textHandler +} + +func (oh *OriginTextHandler) Handle(context context.Context, record slog.Record) error{ + oh.Fill(context,&record) + oh.locker.Lock() + defer oh.locker.Unlock() + + if record.Level == LevelStack || record.Level == LevelFatal{ + err := oh.TextHandler.Handle(context, record) + oh.logStack(&record) + return err + }else if record.Level == LevelDump { + strDump := record.Message + record.Message = "dump info" + err := oh.TextHandler.Handle(context, record) + oh.w.Write([]byte(strDump)) + return err + } + + return oh.TextHandler.Handle(context, record) +} + +func (b *BaseHandler) logStack(record *slog.Record){ + b.w.Write(debug.Stack()) +} + +func (b *BaseHandler) Lock(){ + b.locker.Lock() +} + +func (b *BaseHandler) UnLock(){ + b.locker.Unlock() +} + +func NewOriginJsonHandler(level slog.Level,w io.Writer,addSource bool,replaceAttr func([]string,slog.Attr) slog.Attr) slog.Handler{ + var jsonHandler OriginJsonHandler + jsonHandler.addSource = addSource + jsonHandler.w = w + jsonHandler.JSONHandler = slog.NewJSONHandler(w,&slog.HandlerOptions{ + AddSource: addSource, + Level: level, + ReplaceAttr: replaceAttr, + }) + + return &jsonHandler +} + +func (oh *OriginJsonHandler) Handle(context context.Context, record slog.Record) error{ + oh.Fill(context,&record) + if record.Level == LevelStack || record.Level == LevelFatal || record.Level == LevelDump{ + record.Add("stack",debug.Stack()) + } + + oh.locker.Lock() + defer oh.locker.Unlock() + return oh.JSONHandler.Handle(context, record) +} + +func (b *BaseHandler) Fill(context context.Context, record *slog.Record) { + if b.addSource { + var pcs [1]uintptr + runtime.Callers(7, pcs[:]) + record.PC = pcs[0] + } +} diff --git a/log/log.go b/log/log.go index c63246a..0596d09 100644 --- a/log/log.go +++ b/log/log.go @@ -1,21 +1,29 @@ package log import ( + "context" "fmt" + "github.com/duanhf2012/origin/util/bytespool" jsoniter "github.com/json-iterator/go" "io" + "log/slog" "os" "path" - "time" - "log/slog" - "context" + "path/filepath" + "runtime" + "sync" "sync/atomic" + "time" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary var OpenConsole bool var LogSize int64 -var gLogger, _ = NewTextLogger(LevelDebug, "", "",true) +var LogChannelCap int +var LogPath string +var LogLevel slog.Level = LevelTrace +var gLogger, _ = NewTextLogger(LevelDebug, "", "",true,LogChannelCap) +var memPool = bytespool.NewMemAreaPool() // levels const ( @@ -44,9 +52,28 @@ type IoWriter struct { outFile io.Writer // destination for output outConsole io.Writer //os.Stdout writeBytes int64 + logChannel chan []byte + wg sync.WaitGroup + closeSig chan struct{} + + lockWrite sync.Mutex } func (iw *IoWriter) Close() error { + iw.lockWrite.Lock() + defer iw.lockWrite.Unlock() + + iw.close() + + return nil +} + +func (iw *IoWriter) close() error { + if iw.closeSig != nil { + close(iw.closeSig) + } + iw.wg.Wait() + if iw.outFile!= nil { return iw.outFile.(io.Closer).Close() } @@ -66,15 +93,81 @@ func (iw *IoWriter) writeFile(p []byte) (n int, err error){ } func (iw *IoWriter) Write(p []byte) (n int, err error){ + iw.lockWrite.Lock() + defer iw.lockWrite.Unlock() + + if iw.logChannel == nil { + return iw.writeIo(p) + } + + copyBuff := memPool.MakeBytes(len(p)) + if copyBuff == nil { + return 0,fmt.Errorf("MakeByteSlice failed") + } + copy(copyBuff,p) + + iw.logChannel <- copyBuff + + return +} + +func (iw *IoWriter) writeIo(p []byte) (n int, err error){ n,err = iw.writeFile(p) if iw.outConsole != nil { - return iw.outConsole.Write(p) + n,err = iw.outConsole.Write(p) } + memPool.ReleaseBytes(p) return } +func (iw *IoWriter) setLogChannel(logChannelNum int) (err error){ + iw.lockWrite.Lock() + defer iw.lockWrite.Unlock() + iw.close() + + if logChannelNum == 0 { + iw.logChannel = nil + iw.closeSig = nil + return nil + } + + //copy iw.logChannel + var logInfo []byte + logChannel := make(chan []byte,logChannelNum) + for i := 0; i < logChannelNum&&i 0 { + logs := <-iw.logChannel + iw.writeIo(logs) + } +} + func (logger *Logger) isFull() bool { if LogSize == 0 { return false @@ -83,6 +176,9 @@ func (logger *Logger) isFull() bool { return atomic.LoadInt64(&logger.ioWriter.writeBytes) >= LogSize } +func (logger *Logger) setLogChannel(logChannel int) (err error){ + return logger.ioWriter.setLogChannel(logChannel) +} func (logger *Logger) setIo() error{ now := time.Now() @@ -130,7 +226,7 @@ func (logger *Logger) setIo() error{ return nil } -func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource bool) (*Logger,error){ +func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (*Logger,error){ var logger Logger logger.filePath = pathName logger.fileprefix = filePrefix @@ -140,11 +236,11 @@ func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource return nil,err } logger.Slogger = slog.New(NewOriginTextHandler(level,&logger.ioWriter,addSource,defaultReplaceAttr)) - + logger.setLogChannel(logChannelCap) return &logger,nil } -func NewJsonLogger(level slog.Level,pathName string,filePrefix string,addSource bool) (*Logger,error){ +func NewJsonLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (*Logger,error){ var logger Logger logger.filePath = pathName logger.fileprefix = filePrefix @@ -154,7 +250,8 @@ func NewJsonLogger(level slog.Level,pathName string,filePrefix string,addSource return nil,err } logger.Slogger = slog.New(NewOriginJsonHandler(level,&logger.ioWriter,true,defaultReplaceAttr)) - + logger.setLogChannel(logChannelCap) + return &logger,nil } @@ -169,7 +266,7 @@ func (logger *Logger) Trace(msg string, args ...any) { } func (logger *Logger) Debug(msg string, args ...any) { - logger.setIo() + logger.Slogger.Log(context.Background(),LevelDebug,msg,args...) } @@ -319,386 +416,113 @@ func Group(key string, args ...any) slog.Attr { return slog.Group(key, args...) } -func (logger *Logger) doSPrintf(a []interface{}) string{ - logger.sBuff.Reset() - - for _,s := range a { - switch s.(type) { - case []string: - strSlice := s.([]string) - logger.sBuff.AppendByte('[') - for _,str := range strSlice { - logger.sBuff.AppendString(str) - logger.sBuff.AppendByte(',') - } - - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - - case int: - logger.sBuff.AppendInt(int64(s.(int))) - case []int: - intSlice := s.([]int) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendInt(int64(v)) - logger.sBuff.AppendByte(',') - } - - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - case int8: - logger.sBuff.AppendInt(int64(s.(int8))) - case []int8: - intSlice := s.([]int8) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendInt(int64(v)) - logger.sBuff.AppendByte(',') - } - - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - case int16: - logger.sBuff.AppendInt(int64(s.(int16))) - case []int16: - intSlice := s.([]int16) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendInt(int64(v)) - logger.sBuff.AppendByte(',') - } - - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - case int32: - logger.sBuff.AppendInt(int64(s.(int32))) - case []int32: - intSlice := s.([]int32) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendInt(int64(v)) - logger.sBuff.AppendByte(',') - } - - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - case int64: - logger.sBuff.AppendInt(s.(int64)) - case []int64: - intSlice := s.([]int64) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendInt(v) - logger.sBuff.AppendByte(',') - } - - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - case uint: - logger.sBuff.AppendUint(uint64(s.(uint))) - - case []uint: - intSlice := s.([]uint) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendUint(uint64(v)) - logger.sBuff.AppendByte(',') - } - - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - - case uint8: - logger.sBuff.AppendUint(uint64(s.(uint8))) - case []uint8: - intSlice := s.([]uint8) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendUint(uint64(v)) - logger.sBuff.AppendByte(',') - } - - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - - case uint16: - logger.sBuff.AppendUint(uint64(s.(uint16))) - case []uint16: - intSlice := s.([]uint16) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendUint(uint64(v)) - logger.sBuff.AppendByte(',') - } - - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - case uint32: - logger.sBuff.AppendUint(uint64(s.(uint32))) - case []uint32: - intSlice := s.([]uint32) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendUint(uint64(v)) - logger.sBuff.AppendByte(',') - } - - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - case uint64: - logger.sBuff.AppendUint(s.(uint64)) - case []uint64: - intSlice := s.([]uint64) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendUint(v) - logger.sBuff.AppendByte(',') - } - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - case float32: - logger.sBuff.AppendFloat(float64(s.(float32)),32) - case []float32: - intSlice := s.([]float32) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendFloat(float64(v),32) - logger.sBuff.AppendByte(',') - } - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - case float64: - logger.sBuff.AppendFloat(s.(float64),64) - case []float64: - intSlice := s.([]float64) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendFloat(v,64) - logger.sBuff.AppendByte(',') - } - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - case bool: - logger.sBuff.AppendBool(s.(bool)) - case []bool: - intSlice := s.([]bool) - logger.sBuff.AppendByte('[') - for _,v := range intSlice { - logger.sBuff.AppendBool(v) - logger.sBuff.AppendByte(',') - } - lastIdx := logger.sBuff.Len()-1 - if logger.sBuff.Bytes()[lastIdx] == ',' { - logger.sBuff.Bytes()[lastIdx] = ']' - }else{ - logger.sBuff.AppendByte(']') - } - case string: - logger.sBuff.AppendString(s.(string)) - case *int: - val := s.(*int) - if val != nil { - logger.sBuff.AppendInt(int64(*val)) - }else{ - logger.sBuff.AppendString("nil<*int>") - } - case *int8: - val := s.(*int8) - if val != nil { - logger.sBuff.AppendInt(int64(*val)) - }else{ - logger.sBuff.AppendString("nil<*int8>") - } - case *int16: - val := s.(*int16) - if val != nil { - logger.sBuff.AppendInt(int64(*val)) - }else{ - logger.sBuff.AppendString("nil<*int16>") - } - case *int32: - val := s.(*int32) - if val != nil { - logger.sBuff.AppendInt(int64(*val)) - }else{ - logger.sBuff.AppendString("nil<*int32>") - } - case *int64: - val := s.(*int64) - if val != nil { - logger.sBuff.AppendInt(int64(*val)) - }else{ - logger.sBuff.AppendString("nil<*int64>") - } - case *uint: - val := s.(*uint) - if val != nil { - logger.sBuff.AppendUint(uint64(*val)) - }else{ - logger.sBuff.AppendString("nil<*uint>") - } - case *uint8: - val := s.(*uint8) - if val != nil { - logger.sBuff.AppendUint(uint64(*val)) - }else{ - logger.sBuff.AppendString("nil<*uint8>") - } - case *uint16: - val := s.(*uint16) - if val != nil { - logger.sBuff.AppendUint(uint64(*val)) - }else{ - logger.sBuff.AppendString("nil<*uint16>") - } - case *uint32: - val := s.(*uint32) - if val != nil { - logger.sBuff.AppendUint(uint64(*val)) - }else{ - logger.sBuff.AppendString("nil<*uint32>") - } - case *uint64: - val := s.(*uint64) - if val != nil { - logger.sBuff.AppendUint(uint64(*val)) - }else{ - logger.sBuff.AppendString("nil<*uint64>") - } - case *float32: - val := s.(*float32) - if val != nil { - logger.sBuff.AppendFloat(float64(*val),32) - }else{ - logger.sBuff.AppendString("nil<*float32>") - } - case *float64: - val := s.(*float32) - if val != nil { - logger.sBuff.AppendFloat(float64(*val),64) - }else{ - logger.sBuff.AppendString("nil<*float64>") - } - case *bool: - val := s.(*bool) - if val != nil { - logger.sBuff.AppendBool(*val) - }else{ - logger.sBuff.AppendString("nil<*bool>") - } - case *string: - val := s.(*string) - if val != nil { - logger.sBuff.AppendString(*val) - }else{ - logger.sBuff.AppendString("nil<*string>") - } - //case []byte: - // logger.buf.AppendBytes(s.([]byte)) - default: - //b,err := json.MarshalToString(s) - //if err != nil { - logger.sBuff.AppendString("") - //}else{ - //logger.buf.AppendBytes(b) - //} - } +func (logger *Logger) doSPrintf(level slog.Level,a []interface{}) { + if logger.Slogger.Enabled(context.Background(),level) == false{ + return } - return logger.sBuff.String() + gLogger.Slogger.Handler().(IOriginHandler).Lock() + defer gLogger.Slogger.Handler().(IOriginHandler).UnLock() + + logger.sBuff.Reset() + + logger.formatHeader(&logger.sBuff,level,3) + + for _,s := range a { + logger.sBuff.AppendString(slog.AnyValue(s).String()) + } + logger.sBuff.AppendString("\"\n") + gLogger.ioWriter.Write([]byte(logger.sBuff.Bytes())) +} + + func (logger *Logger) STrace(a ...interface{}) { + logger.setIo() + logger.doSPrintf(LevelTrace,a) +} + +func (logger *Logger) SDebug(a ...interface{}) { + logger.setIo() + logger.doSPrintf(LevelDebug,a) +} + +func (logger *Logger) SInfo(a ...interface{}) { + logger.setIo() + logger.doSPrintf(LevelInfo,a) +} + +func (logger *Logger) SWarning(a ...interface{}) { + logger.setIo() + logger.doSPrintf(LevelWarning,a) +} + +func (logger *Logger) SError(a ...interface{}) { + logger.setIo() + logger.doSPrintf(LevelError,a) +} + +func STrace(a ...interface{}) { + gLogger.setIo() + gLogger.doSPrintf(LevelTrace,a) } func SDebug(a ...interface{}) { - gLogger.sBuff.Locker() - defer gLogger.sBuff.UnLocker() - - gLogger.Debug(gLogger.doSPrintf(a)) + gLogger.setIo() + gLogger.doSPrintf(LevelDebug,a) } func SInfo(a ...interface{}) { - gLogger.sBuff.Locker() - defer gLogger.sBuff.UnLocker() - - gLogger.Info(gLogger.doSPrintf(a)) + gLogger.setIo() + gLogger.doSPrintf(LevelInfo,a) } func SWarning(a ...interface{}) { - gLogger.sBuff.Locker() - defer gLogger.sBuff.UnLocker() - - gLogger.Warning(gLogger.doSPrintf(a)) + gLogger.setIo() + gLogger.doSPrintf(LevelWarning,a) } func SError(a ...interface{}) { - gLogger.sBuff.Locker() - defer gLogger.sBuff.UnLocker() - - gLogger.Error(gLogger.doSPrintf(a)) + gLogger.setIo() + gLogger.doSPrintf(LevelError,a) } -func SStack(a ...interface{}) { - gLogger.sBuff.Locker() - defer gLogger.sBuff.UnLocker() +func (logger *Logger) formatHeader(buf *Buffer,level slog.Level,calldepth int) { + t := time.Now() + var file string + var line int - gLogger.Stack(gLogger.doSPrintf(a)) -} + // Release lock while getting caller info - it's expensive. + var ok bool + _, file, line, ok = runtime.Caller(calldepth) + if !ok { + file = "???" + line = 0 + } + file = filepath.Base(file) -func SFatal(a ...interface{}) { - gLogger.sBuff.Locker() - defer gLogger.sBuff.UnLocker() + buf.AppendString("time=\"") + year, month, day := t.Date() + buf.AppendInt(int64(year)) + buf.AppendByte('/') + buf.AppendInt(int64(month)) + buf.AppendByte('/') + buf.AppendInt(int64(day)) + buf.AppendByte(' ') - gLogger.Fatal(gLogger.doSPrintf(a)) -} + hour, min, sec := t.Clock() + buf.AppendInt(int64(hour)) + buf.AppendByte(':') + buf.AppendInt(int64(min)) + buf.AppendByte(':') + + buf.AppendInt(int64(sec)) + buf.AppendString("\"") + + logger.sBuff.AppendString(" level=") + logger.sBuff.AppendString(getStrLevel(level)) + logger.sBuff.AppendString(" source=") + + buf.AppendString(file) + buf.AppendByte(':') + buf.AppendInt(int64(line)) + buf.AppendString(" msg=\"") +} \ No newline at end of file diff --git a/network/processor/jsonprocessor.go b/network/processor/jsonprocessor.go index adfbcdd..9ddc669 100644 --- a/network/processor/jsonprocessor.go +++ b/network/processor/jsonprocessor.go @@ -3,9 +3,9 @@ package processor import ( "encoding/json" "fmt" - "github.com/duanhf2012/origin/network" - "reflect" "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/util/bytespool" + "reflect" ) type MessageJsonInfo struct { @@ -24,7 +24,7 @@ type JsonProcessor struct { unknownMessageHandler UnknownMessageJsonHandler connectHandler ConnectJsonHandler disconnectHandler ConnectJsonHandler - network.INetMempool + bytespool.IBytesMempool } type JsonPackInfo struct { @@ -35,7 +35,7 @@ type JsonPackInfo struct { func NewJsonProcessor() *JsonProcessor { processor := &JsonProcessor{mapMsg:map[uint16]MessageJsonInfo{}} - processor.INetMempool = network.NewMemAreaPool() + processor.IBytesMempool = bytespool.NewMemAreaPool() return processor } @@ -58,7 +58,7 @@ func (jsonProcessor *JsonProcessor ) MsgRoute(clientId uint64,msg interface{}) e func (jsonProcessor *JsonProcessor) Unmarshal(clientId uint64,data []byte) (interface{}, error) { typeStruct := struct {Type int `json:"typ"`}{} - defer jsonProcessor.ReleaseByteSlice(data) + defer jsonProcessor.ReleaseBytes(data) err := json.Unmarshal(data, &typeStruct) if err != nil { return nil, err diff --git a/network/processor/pbprocessor.go b/network/processor/pbprocessor.go index 6e9c44d..87a1031 100644 --- a/network/processor/pbprocessor.go +++ b/network/processor/pbprocessor.go @@ -3,7 +3,7 @@ package processor import ( "encoding/binary" "fmt" - "github.com/duanhf2012/origin/network" + "github.com/duanhf2012/origin/util/bytespool" "github.com/gogo/protobuf/proto" "reflect" ) @@ -26,7 +26,7 @@ type PBProcessor struct { unknownMessageHandler UnknownMessageHandler connectHandler ConnectHandler disconnectHandler ConnectHandler - network.INetMempool + bytespool.IBytesMempool } type PBPackInfo struct { @@ -37,7 +37,7 @@ type PBPackInfo struct { func NewPBProcessor() *PBProcessor { processor := &PBProcessor{mapMsg: map[uint16]MessageInfo{}} - processor.INetMempool = network.NewMemAreaPool() + processor.IBytesMempool = bytespool.NewMemAreaPool() return processor } @@ -67,7 +67,7 @@ func (pbProcessor *PBProcessor) MsgRoute(clientId uint64, msg interface{}) error // must goroutine safe func (pbProcessor *PBProcessor) Unmarshal(clientId uint64, data []byte) (interface{}, error) { - defer pbProcessor.ReleaseByteSlice(data) + defer pbProcessor.ReleaseBytes(data) return pbProcessor.UnmarshalWithOutRelease(clientId, data) } diff --git a/network/tcp_conn.go b/network/tcp_conn.go index 094e472..31acc23 100644 --- a/network/tcp_conn.go +++ b/network/tcp_conn.go @@ -41,7 +41,7 @@ func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser,writeDe conn.SetWriteDeadline(time.Now().Add(writeDeadline)) _, err := conn.Write(b) - tcpConn.msgParser.ReleaseByteSlice(b) + tcpConn.msgParser.ReleaseBytes(b) if err != nil { break @@ -130,7 +130,7 @@ func (tcpConn *TCPConn) ReadMsg() ([]byte, error) { } func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){ - tcpConn.msgParser.ReleaseByteSlice(byteBuff) + tcpConn.msgParser.ReleaseBytes(byteBuff) } func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error { diff --git a/network/tcp_msg.go b/network/tcp_msg.go index 868417c..4c24cee 100644 --- a/network/tcp_msg.go +++ b/network/tcp_msg.go @@ -3,6 +3,7 @@ package network import ( "encoding/binary" "errors" + "github.com/duanhf2012/origin/util/bytespool" "io" "math" ) @@ -16,7 +17,7 @@ type MsgParser struct { MaxMsgLen uint32 LittleEndian bool - INetMempool + bytespool.IBytesMempool } @@ -34,7 +35,7 @@ func (p *MsgParser) getMaxMsgLen(lenMsgLen int) uint32 { } func (p *MsgParser) init(){ - p.INetMempool = NewMemAreaPool() + p.IBytesMempool = bytespool.NewMemAreaPool() } // goroutine safe @@ -74,9 +75,9 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) { } // data - msgData := p.MakeByteSlice(int(msgLen)) + msgData := p.MakeBytes(int(msgLen)) if _, err := io.ReadFull(conn, msgData[:msgLen]); err != nil { - p.ReleaseByteSlice(msgData) + p.ReleaseBytes(msgData) return nil, err } @@ -99,7 +100,7 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error { } //msg := make([]byte, uint32(p.lenMsgLen)+msgLen) - msg := p.MakeByteSlice(p.LenMsgLen+int(msgLen)) + msg := p.MakeBytes(p.LenMsgLen+int(msgLen)) // write len switch p.LenMsgLen { case 1: diff --git a/network/tcp_server.go b/network/tcp_server.go index 02c7d4f..887989f 100644 --- a/network/tcp_server.go +++ b/network/tcp_server.go @@ -2,6 +2,7 @@ package network import ( "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/util/bytespool" "net" "sync" "time" @@ -96,12 +97,12 @@ func (server *TCPServer) init() { server.MsgParser.init() } -func (server *TCPServer) SetNetMempool(mempool INetMempool){ - server.INetMempool = mempool +func (server *TCPServer) SetNetMempool(mempool bytespool.IBytesMempool){ + server.IBytesMempool = mempool } -func (server *TCPServer) GetNetMempool() INetMempool{ - return server.INetMempool +func (server *TCPServer) GetNetMempool() bytespool.IBytesMempool { + return server.IBytesMempool } func (server *TCPServer) run() { diff --git a/node/node.go b/node/node.go index 21b7798..d669a12 100644 --- a/node/node.go +++ b/node/node.go @@ -19,7 +19,6 @@ import ( "strings" "syscall" "time" - "log/slog" ) var sig chan os.Signal @@ -28,8 +27,8 @@ var preSetupService []service.IService //预安装 var profilerInterval time.Duration var bValid bool var configDir = "./config/" -var logLevel slog.Level = log.LevelTrace -var logPath string + + type BuildOSType = int8 const( @@ -51,6 +50,7 @@ func init() { console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|release|warning|error|fatal> Set loglevel.", setLevel) console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath) console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize) + console.RegisterCommandInt("logchannelcap", 0, "<-logchannelcap num> Set log channel cap.", setLogChannelCapNum) console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof) } @@ -178,13 +178,13 @@ func initNode(id int) { } func initLog() error { - if logPath == "" { + if log.LogPath == "" { setLogPath("./log") } localnodeinfo := cluster.GetCluster().GetLocalNodeInfo() filepre := fmt.Sprintf("%s_%d_", localnodeinfo.NodeName, localnodeinfo.NodeId) - logger, err := log.NewTextLogger(logLevel,logPath,filepre,true) + logger, err := log.NewTextLogger(log.LogLevel,log.LogPath,filepre,true,log.LogChannelCap) if err != nil { fmt.Printf("cannot create log file!\n") return err @@ -282,6 +282,7 @@ func startNode(args interface{}) error { service.StopAllService() log.Info("Server is stop.") + log.Close() return nil } @@ -305,7 +306,6 @@ func GetConfigDir() string { return configDir } - func OpenProfilerReport(interval time.Duration) { profilerInterval = interval } @@ -333,19 +333,19 @@ func setLevel(args interface{}) error { strlogLevel := strings.TrimSpace(args.(string)) switch strlogLevel { case "trace": - logLevel = log.LevelTrace + log.LogLevel = log.LevelTrace case "debug": - logLevel = log.LevelDebug + log.LogLevel = log.LevelDebug case "info": - logLevel = log.LevelInfo + log.LogLevel = log.LevelInfo case "warning": - logLevel = log.LevelWarning + log.LogLevel = log.LevelWarning case "error": - logLevel = log.LevelError + log.LogLevel = log.LevelError case "stack": - logLevel = log.LevelStack + log.LogLevel = log.LevelStack case "fatal": - logLevel = log.LevelFatal + log.LogLevel = log.LevelFatal default: return errors.New("unknown level: " + strlogLevel) } @@ -356,16 +356,17 @@ func setLogPath(args interface{}) error { if args == "" { return nil } - logPath = strings.TrimSpace(args.(string)) - dir, err := os.Stat(logPath) //这个文件夹不存在 + + log.LogPath = strings.TrimSpace(args.(string)) + dir, err := os.Stat(log.LogPath) //这个文件夹不存在 if err == nil && dir.IsDir() == false { - return errors.New("Not found dir " + logPath) + return errors.New("Not found dir " + log.LogPath) } if err != nil { - err = os.Mkdir(logPath, os.ModePerm) + err = os.Mkdir(log.LogPath, os.ModePerm) if err != nil { - return errors.New("Cannot create dir " + logPath) + return errors.New("Cannot create dir " + log.LogPath) } } @@ -376,6 +377,7 @@ func setLogSize(args interface{}) error { if args == "" { return nil } + logSize,ok := args.(int) if ok == false{ return errors.New("param logsize is error") @@ -385,3 +387,17 @@ func setLogSize(args interface{}) error { return nil } + +func setLogChannelCapNum(args interface{}) error { + if args == "" { + return nil + } + + logChannelCap,ok := args.(int) + if ok == false{ + return errors.New("param logsize is error") + } + + log.LogChannelCap = logChannelCap + return nil +} diff --git a/profiler/profiler.go b/profiler/profiler.go index 6ad99e2..8c8beed 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -167,7 +167,7 @@ func DefaultReportFunction(name string,callNum int,costTime time.Duration,record elem = elem.Next() } - log.Info("report",strReport) + log.SInfo("report",strReport) } func Report() { diff --git a/rpc/compressor.go b/rpc/compressor.go index 3f28c6e..a5940a7 100644 --- a/rpc/compressor.go +++ b/rpc/compressor.go @@ -1,14 +1,14 @@ package rpc import ( - "runtime" "errors" - "github.com/pierrec/lz4/v4" "fmt" - "github.com/duanhf2012/origin/network" + "github.com/duanhf2012/origin/util/bytespool" + "github.com/pierrec/lz4/v4" + "runtime" ) -var memPool network.INetMempool = network.NewMemAreaPool() +var memPool bytespool.IBytesMempool = bytespool.NewMemAreaPool() type ICompressor interface { CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存,传入nil时内部申请 @@ -42,10 +42,10 @@ func (lc *Lz4Compressor) CompressBlock(src []byte) (dest []byte, err error) { var c lz4.Compressor var cnt int - dest = memPool.MakeByteSlice(lz4.CompressBlockBound(len(src))+1) + dest = memPool.MakeBytes(lz4.CompressBlockBound(len(src))+1) cnt, err = c.CompressBlock(src, dest[1:]) if err != nil { - memPool.ReleaseByteSlice(dest) + memPool.ReleaseBytes(dest) return nil,err } @@ -55,7 +55,7 @@ func (lc *Lz4Compressor) CompressBlock(src []byte) (dest []byte, err error) { } if ratio > 255 { - memPool.ReleaseByteSlice(dest) + memPool.ReleaseBytes(dest) return nil,fmt.Errorf("Impermissible errors") } @@ -79,10 +79,10 @@ func (lc *Lz4Compressor) UncompressBlock(src []byte) (dest []byte, err error) { return nil,fmt.Errorf("Impermissible errors") } - dest = memPool.MakeByteSlice(len(src)*int(radio)) + dest = memPool.MakeBytes(len(src)*int(radio)) cnt, err := lz4.UncompressBlock(src[1:], dest) if err != nil { - memPool.ReleaseByteSlice(dest) + memPool.ReleaseBytes(dest) return nil,err } @@ -94,9 +94,9 @@ func (lc *Lz4Compressor) compressBlockBound(n int) int{ } func (lc *Lz4Compressor) CompressBufferCollection(buffer []byte){ - memPool.ReleaseByteSlice(buffer) + memPool.ReleaseBytes(buffer) } func (lc *Lz4Compressor) UnCompressBufferCollection(buffer []byte) { - memPool.ReleaseByteSlice(buffer) + memPool.ReleaseBytes(buffer) } diff --git a/sysservice/messagequeueservice/CustomerSubscriber.go b/sysservice/messagequeueservice/CustomerSubscriber.go index 4308e04..87a7b6c 100644 --- a/sysservice/messagequeueservice/CustomerSubscriber.go +++ b/sysservice/messagequeueservice/CustomerSubscriber.go @@ -104,11 +104,11 @@ func (cs *CustomerSubscriber) UnSubscribe() { func (cs *CustomerSubscriber) LoadLastIndex() { for { if atomic.LoadInt32(&cs.isStop) != 0 { - log.SRelease("topic ", cs.topic, " out of subscription") + log.Info("topic ", cs.topic, " out of subscription") break } - log.SRelease("customer ", cs.customerId, " start load last index ") + log.Info("customer ", cs.customerId, " start load last index ") lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId) if ret == true { if lastIndex > 0 { @@ -116,18 +116,18 @@ func (cs *CustomerSubscriber) LoadLastIndex() { } else { //否则直接使用客户端发回来的 } - log.SRelease("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex) + log.Info("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex) break } - log.SRelease("customer ", cs.customerId, " load last index is fail...") + log.Info("customer ", cs.customerId, " load last index is fail...") time.Sleep(5 * time.Second) } } func (cs *CustomerSubscriber) SubscribeRun() { defer cs.subscriber.queueWait.Done() - log.SRelease("topic ", cs.topic, " start subscription") + log.Info("topic ", cs.topic, " start subscription") //加载之前的位置 if cs.subscribeMethod == MethodLast { @@ -136,7 +136,7 @@ func (cs *CustomerSubscriber) SubscribeRun() { for { if atomic.LoadInt32(&cs.isStop) != 0 { - log.SRelease("topic ", cs.topic, " out of subscription") + log.Info("topic ", cs.topic, " out of subscription") break } @@ -146,14 +146,14 @@ func (cs *CustomerSubscriber) SubscribeRun() { //todo 检测退出 if cs.subscribe() == false { - log.SRelease("topic ", cs.topic, " out of subscription") + log.Info("topic ", cs.topic, " out of subscription") break } } //删除订阅关系 cs.subscriber.removeCustomer(cs.customerId, cs) - log.SRelease("topic ", cs.topic, " unsubscription") + log.Info("topic ", cs.topic, " unsubscription") } func (cs *CustomerSubscriber) subscribe() bool { diff --git a/sysservice/messagequeueservice/MessageQueueService.go b/sysservice/messagequeueservice/MessageQueueService.go index 77e6032..75ab955 100644 --- a/sysservice/messagequeueservice/MessageQueueService.go +++ b/sysservice/messagequeueservice/MessageQueueService.go @@ -63,7 +63,7 @@ func (ms *MessageQueueService) ReadCfg() error { maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"] if ok == false { ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum - log.SRelease("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum) + log.Info("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum) } else { ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64)) } @@ -71,7 +71,7 @@ func (ms *MessageQueueService) ReadCfg() error { memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"] if ok == false { ms.memoryQueueLen = DefaultMemoryQueueLen - log.SRelease("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen) + log.Info("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen) } else { ms.memoryQueueLen = int32(memoryQueueLen.(float64)) } diff --git a/sysservice/messagequeueservice/Subscriber.go b/sysservice/messagequeueservice/Subscriber.go index 6dcc0a4..edae532 100644 --- a/sysservice/messagequeueservice/Subscriber.go +++ b/sysservice/messagequeueservice/Subscriber.go @@ -56,9 +56,9 @@ func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType r } if ok == true { - log.SRelease("repeat subscription for customer ", customerId) + log.Info("repeat subscription for customer ", customerId) } else { - log.SRelease("subscription for customer ", customerId) + log.Info("subscription for customer ", customerId) } } diff --git a/sysservice/messagequeueservice/TopicRoom.go b/sysservice/messagequeueservice/TopicRoom.go index 3975927..778acdf 100644 --- a/sysservice/messagequeueservice/TopicRoom.go +++ b/sysservice/messagequeueservice/TopicRoom.go @@ -93,7 +93,7 @@ func (tr *TopicRoom) Stop() { func (tr *TopicRoom) topicRoomRun() { defer tr.queueWait.Done() - log.SRelease("topic room ", tr.topic, " is running..") + log.Info("topic room ", tr.topic, " is running..") for { if atomic.LoadInt32(&tr.isStop) != 0 { break @@ -145,5 +145,5 @@ func (tr *TopicRoom) topicRoomRun() { } tr.customerLocker.Unlock() - log.SRelease("topic room ", tr.topic, " is stop") + log.Info("topic room ", tr.topic, " is stop") } diff --git a/sysservice/rankservice/MongodbPersist.go b/sysservice/rankservice/MongodbPersist.go index d07d03b..2fc7499 100644 --- a/sysservice/rankservice/MongodbPersist.go +++ b/sysservice/rankservice/MongodbPersist.go @@ -142,13 +142,13 @@ func (mp *MongoPersist) OnSetupRank(manual bool,rankSkip *RankSkip) error{ return nil } - log.SRelease("start load rank ",rankSkip.GetRankName()," from mongodb.") + log.Info("start load rank ",rankSkip.GetRankName()," from mongodb.") err := mp.loadFromDB(rankSkip.GetRankID(),rankSkip.GetRankName()) if err != nil { log.SError("load from db is fail :%s",err.Error()) return err } - log.SRelease("finish load rank ",rankSkip.GetRankName()," from mongodb.") + log.Info("finish load rank ",rankSkip.GetRankName()," from mongodb.") return nil } diff --git a/sysservice/tcpservice/tcpservice.go b/sysservice/tcpservice/tcpservice.go index 09945c4..81af4db 100644 --- a/sysservice/tcpservice/tcpservice.go +++ b/sysservice/tcpservice/tcpservice.go @@ -8,10 +8,11 @@ import ( "github.com/duanhf2012/origin/network/processor" "github.com/duanhf2012/origin/node" "github.com/duanhf2012/origin/service" - "sync/atomic" - "sync" - "time" + "github.com/duanhf2012/origin/util/bytespool" "runtime" + "sync" + "sync/atomic" + "time" ) type TcpService struct { @@ -277,14 +278,14 @@ func (tcpService *TcpService) GetConnNum() int { return connNum } -func (server *TcpService) SetNetMempool(mempool network.INetMempool){ +func (server *TcpService) SetNetMempool(mempool bytespool.IBytesMempool){ server.tcpServer.SetNetMempool(mempool) } -func (server *TcpService) GetNetMempool() network.INetMempool{ +func (server *TcpService) GetNetMempool() bytespool.IBytesMempool { return server.tcpServer.GetNetMempool() } func (server *TcpService) ReleaseNetMem(byteBuff []byte) { - server.tcpServer.GetNetMempool().ReleaseByteSlice(byteBuff) + server.tcpServer.GetNetMempool().ReleaseBytes(byteBuff) } diff --git a/network/slicepool.go b/util/bytespool/bytespool.go similarity index 89% rename from network/slicepool.go rename to util/bytespool/bytespool.go index 873f4df..42a8020 100644 --- a/network/slicepool.go +++ b/util/bytespool/bytespool.go @@ -1,12 +1,12 @@ -package network +package bytespool import ( "sync" ) -type INetMempool interface { - MakeByteSlice(size int) []byte - ReleaseByteSlice(byteBuff []byte) bool +type IBytesMempool interface { + MakeBytes(size int) []byte + ReleaseBytes(byteBuff []byte) bool } type memAreaPool struct { @@ -68,7 +68,7 @@ func (areaPool *memAreaPool) releaseByteSlice(byteBuff []byte) bool { return true } -func (areaPool *memAreaPool) MakeByteSlice(size int) []byte { +func (areaPool *memAreaPool) MakeBytes(size int) []byte { for i := 0; i < len(memAreaPoolList); i++ { if size <= memAreaPoolList[i].maxAreaValue { return memAreaPoolList[i].makeByteSlice(size) @@ -78,7 +78,7 @@ func (areaPool *memAreaPool) MakeByteSlice(size int) []byte { return make([]byte, size) } -func (areaPool *memAreaPool) ReleaseByteSlice(byteBuff []byte) bool { +func (areaPool *memAreaPool) ReleaseBytes(byteBuff []byte) bool { for i := 0; i < len(memAreaPoolList); i++ { if cap(byteBuff) <= memAreaPoolList[i].maxAreaValue { return memAreaPoolList[i].releaseByteSlice(byteBuff) diff --git a/util/math/math.go b/util/math/math.go index f015c74..b8ffe84 100644 --- a/util/math/math.go +++ b/util/math/math.go @@ -42,9 +42,9 @@ func Abs[NumType SignedNumberType](Num NumType) NumType { func Add[NumType NumberType](number1 NumType, number2 NumType) NumType { ret := number1 + number2 if number2> 0 && ret < number1 { - log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2) + log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2)) }else if (number2<0 && ret > number1){ - log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2) + log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2)) } return ret @@ -53,9 +53,9 @@ func Add[NumType NumberType](number1 NumType, number2 NumType) NumType { func Sub[NumType NumberType](number1 NumType, number2 NumType) NumType { ret := number1 - number2 if number2> 0 && ret > number1 { - log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2) + log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2)) }else if (number2<0 && ret < number1){ - log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2) + log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2)) } return ret @@ -71,8 +71,8 @@ func Mul[NumType NumberType](number1 NumType, number2 NumType) NumType { if ret / number2 == number1 { return ret } - - log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2) + + log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2)) return ret }