Compare commits

...

7 Commits

Author SHA1 Message Date
duanhf2012
da45f97fa8 优化日志 2024-11-29 15:55:35 +08:00
duanhf2012
d29abc0813 新增配置文件环境变量支持 2024-11-29 14:39:04 +08:00
duanhf2012
c9507f9ee9 替换slog日志为zap 2024-11-29 13:47:51 +08:00
duanhf2012
61de4bba3a 替换slog日志为zap 2024-11-29 13:47:27 +08:00
duanhf2012
000853b479 网络库优化 2024-11-25 17:44:08 +08:00
duanhf2012
387e83d65c Deprecated一些定时器接口 2024-10-12 17:49:59 +08:00
duanhf2012
07a102c6ea 新增发布订阅模式 2024-10-11 09:00:12 +08:00
41 changed files with 653 additions and 931 deletions

View File

@@ -250,7 +250,7 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er
//2.安装服务发现结点 //2.安装服务发现结点
err = cls.setupDiscovery(localNodeId, setupServiceFun) err = cls.setupDiscovery(localNodeId, setupServiceFun)
if err != nil { if err != nil {
log.Error("setupDiscovery fail", log.ErrorAttr("err", err)) log.Error("setupDiscovery fail", log.ErrorField("err", err))
return err return err
} }
service.RegRpcEventFun = cls.RegRpcEvent service.RegRpcEventFun = cls.RegRpcEvent

View File

@@ -10,13 +10,11 @@ import (
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"time" "time"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"go.uber.org/zap" "go.uber.org/zap"
"path" "path"
"runtime"
"strings" "strings"
"sync/atomic" "sync/atomic"
) )
@@ -96,14 +94,14 @@ func (ed *EtcdDiscoveryService) OnInit() error {
}) })
if cerr != nil { if cerr != nil {
log.Error("etcd discovery init fail", log.ErrorAttr("err", cerr)) log.Error("etcd discovery init fail", log.ErrorField("err", cerr))
return cerr return cerr
} }
ctx, _ := context.WithTimeout(context.Background(), time.Second*3) ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
_, err = client.Leases(ctx) _, err = client.Leases(ctx)
if err != nil { if err != nil {
log.Error("etcd discovery init fail", log.Any("endpoint", etcdDiscoveryCfg.EtcdList[i].Endpoints), log.ErrorAttr("err", err)) log.Error("etcd discovery init fail", log.Any("endpoint", etcdDiscoveryCfg.EtcdList[i].Endpoints), log.ErrorField("err", err))
return err return err
} }
@@ -128,7 +126,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
var resp *clientv3.LeaseGrantResponse var resp *clientv3.LeaseGrantResponse
resp, err = client.Grant(context.Background(), cluster.GetEtcdDiscovery().TTLSecond) resp, err = client.Grant(context.Background(), cluster.GetEtcdDiscovery().TTLSecond)
if err != nil { if err != nil {
log.Error("etcd registerService fail", log.ErrorAttr("err", err)) log.Error("etcd registerService fail", log.ErrorField("err", err))
ed.tryRegisterService(client, etcdClient) ed.tryRegisterService(client, etcdClient)
return return
} }
@@ -138,7 +136,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
// 注册服务节点到 etcd // 注册服务节点到 etcd
_, err = client.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID)) _, err = client.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID))
if err != nil { if err != nil {
log.Error("etcd Put fail", log.ErrorAttr("err", err)) log.Error("etcd Put fail", log.ErrorField("err", err))
ed.tryRegisterService(client, etcdClient) ed.tryRegisterService(client, etcdClient)
return return
} }
@@ -146,7 +144,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID) etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID)
if err != nil { if err != nil {
log.Error("etcd KeepAlive fail", log.ErrorAttr("err", err)) log.Error("etcd KeepAlive fail", log.ErrorField("err", err))
ed.tryRegisterService(client, etcdClient) ed.tryRegisterService(client, etcdClient)
return return
} }
@@ -200,7 +198,7 @@ func (ed *EtcdDiscoveryService) retire() error {
// 注册服务节点到 etcd // 注册服务节点到 etcd
_, err := c.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID)) _, err := c.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID))
if err != nil { if err != nil {
log.Error("etcd Put fail", log.ErrorAttr("err", err)) log.Error("etcd Put fail", log.ErrorField("err", err))
return err return err
} }
} }
@@ -285,12 +283,12 @@ func (ed *EtcdDiscoveryService) setNodeInfo(networkName string, nodeInfo *rpc.No
func (ed *EtcdDiscoveryService) close() { func (ed *EtcdDiscoveryService) close() {
for c, ec := range ed.mapClient { for c, ec := range ed.mapClient {
if _, err := c.Revoke(context.Background(), ec.leaseID); err != nil { if _, err := c.Revoke(context.Background(), ec.leaseID); err != nil {
log.Error("etcd Revoke fail", log.ErrorAttr("err", err)) log.Error("etcd Revoke fail", log.ErrorField("err", err))
} }
c.Watcher.Close() c.Watcher.Close()
err := c.Close() err := c.Close()
if err != nil { if err != nil {
log.Error("etcd Close fail", log.ErrorAttr("err", err)) log.Error("etcd Close fail", log.ErrorField("err", err))
} }
} }
} }
@@ -299,7 +297,7 @@ func (ed *EtcdDiscoveryService) getServices(client *clientv3.Client, etcdClient
// 根据前缀获取现有的key // 根据前缀获取现有的key
resp, err := client.Get(context.Background(), watchKey, clientv3.WithPrefix()) resp, err := client.Get(context.Background(), watchKey, clientv3.WithPrefix())
if err != nil { if err != nil {
log.Error("etcd Get fail", log.ErrorAttr("err", err)) log.Error("etcd Get fail", log.ErrorField("err", err))
ed.tryWatch(client, etcdClient) ed.tryWatch(client, etcdClient)
return false return false
} }
@@ -322,11 +320,7 @@ func (ed *EtcdDiscoveryService) watchByClient(client *clientv3.Client, etcdClien
func (ed *EtcdDiscoveryService) watcher(client *clientv3.Client, etcdClient *etcdClientInfo, watchKey string) { func (ed *EtcdDiscoveryService) watcher(client *clientv3.Client, etcdClient *etcdClientInfo, watchKey string) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
ed.tryWatch(client, etcdClient) ed.tryWatch(client, etcdClient)
} }
}() }()
@@ -355,7 +349,7 @@ func (ed *EtcdDiscoveryService) setNode(netWorkName string, byteNode []byte) str
var nodeInfo rpc.NodeInfo var nodeInfo rpc.NodeInfo
err := proto.Unmarshal(byteNode, &nodeInfo) err := proto.Unmarshal(byteNode, &nodeInfo)
if err != nil { if err != nil {
log.Error("Unmarshal fail", log.String("netWorkName", netWorkName), log.ErrorAttr("err", err)) log.Error("Unmarshal fail", log.String("netWorkName", netWorkName), log.ErrorField("err", err))
return "" return ""
} }
@@ -494,7 +488,7 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc
ctx, _ := context.WithTimeout(context.Background(), time.Second*3) ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
lg, err = client.Grant(ctx, etcdServiceRecord.TTLSecond) lg, err = client.Grant(ctx, etcdServiceRecord.TTLSecond)
if err != nil { if err != nil {
log.Error("etcd record fail,cannot grant lease", log.ErrorAttr("err", err)) log.Error("etcd record fail,cannot grant lease", log.ErrorField("err", err))
return errors.New("cannot grant lease") return errors.New("cannot grant lease")
} }
} }
@@ -503,14 +497,14 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc
ctx, _ := context.WithTimeout(context.Background(), time.Second*3) ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
_, err = client.Put(ctx, path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo, clientv3.WithLease(lg.ID)) _, err = client.Put(ctx, path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo, clientv3.WithLease(lg.ID))
if err != nil { if err != nil {
log.Error("etcd record fail,cannot put record", log.ErrorAttr("err", err)) log.Error("etcd record fail,cannot put record", log.ErrorField("err", err))
} }
return errors.New("cannot put record") return errors.New("cannot put record")
} }
_, err = client.Put(context.Background(), path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo) _, err = client.Put(context.Background(), path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo)
if err != nil { if err != nil {
log.Error("etcd record fail,cannot put record", log.ErrorAttr("err", err)) log.Error("etcd record fail,cannot put record", log.ErrorField("err", err))
return errors.New("cannot put record") return errors.New("cannot put record")
} }

View File

@@ -471,7 +471,7 @@ func (dc *OriginDiscoveryClient) OnRelease() {
err := dc.CallNodeWithTimeout(3*time.Second, masterNodeList.MasterNodeList[i].NodeId, UnRegServiceDiscover, &nodeRetireReq, &rpc.Empty{}) err := dc.CallNodeWithTimeout(3*time.Second, masterNodeList.MasterNodeList[i].NodeId, UnRegServiceDiscover, &nodeRetireReq, &rpc.Empty{})
if err != nil { if err != nil {
log.Error("call "+UnRegServiceDiscover+" is fail", log.ErrorAttr("err", err)) log.Error("call "+UnRegServiceDiscover+" is fail", log.ErrorField("err", err))
} }
} }
} }
@@ -493,7 +493,7 @@ func (dc *OriginDiscoveryClient) OnRetire() {
err := dc.GoNode(masterNodeList.MasterNodeList[i].NodeId, NodeRetireRpcMethod, &nodeRetireReq) err := dc.GoNode(masterNodeList.MasterNodeList[i].NodeId, NodeRetireRpcMethod, &nodeRetireReq)
if err != nil { if err != nil {
log.Error("call "+NodeRetireRpcMethod+" is fail", log.ErrorAttr("err", err)) log.Error("call "+NodeRetireRpcMethod+" is fail", log.ErrorField("err", err))
} }
} }
} }

View File

@@ -88,15 +88,16 @@ func yamlToJson(data []byte, v interface{}) ([]byte, error) {
} }
func unmarshalConfig(data []byte, v interface{}) error { func unmarshalConfig(data []byte, v interface{}) error {
if !json.Valid(data) { envData := []byte(os.ExpandEnv(string(data)))
if !json.Valid(envData) {
var err error var err error
data, err = yamlToJson(data, v) envData, err = yamlToJson(envData, v)
if err != nil { if err != nil {
return err return err
} }
} }
return json.Unmarshal(data, v) return json.Unmarshal(envData, v)
} }
func (d *DiscoveryInfo) getDiscoveryType() DiscoveryType { func (d *DiscoveryInfo) getDiscoveryType() DiscoveryType {

View File

@@ -58,7 +58,7 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err e
} }
if fn == nil && cb == nil { if fn == nil && cb == nil {
log.Stack("fn and cb is nil") log.StackError("fn and cb is nil")
return return
} }

View File

@@ -6,7 +6,7 @@ import (
"time" "time"
"fmt" "fmt"
"runtime"
"context" "context"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
@@ -192,10 +192,7 @@ breakFor:
func (d *dispatch) DoCallback(cb func(err error)) { func (d *dispatch) DoCallback(cb func(err error)) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()

View File

@@ -5,7 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"runtime"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
) )
@@ -51,15 +51,13 @@ func (w *worker) run(waitGroup *sync.WaitGroup, t task) {
func (w *worker) exec(t *task) { func (w *worker) exec(t *task) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
cb := t.cb cb := t.cb
t.cb = func(err error) { t.cb = func(err error) {
cb(errors.New(errString)) cb(errors.New(errString))
} }
log.Dump(string(buf[:l]), log.String("error", errString)) log.StackError(errString)
w.endCallFun(true, t) w.endCallFun(true, t)
} }
}() }()

View File

@@ -3,7 +3,6 @@ package event
import ( import (
"fmt" "fmt"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"runtime"
"sync" "sync"
) )
@@ -215,10 +214,7 @@ func (handler *EventHandler) Destroy() {
func (processor *EventProcessor) EventHandler(ev IEvent) { func (processor *EventProcessor) EventHandler(ev IEvent) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()

1
go.mod
View File

@@ -21,6 +21,7 @@ require (
go.mongodb.org/mongo-driver v1.9.1 go.mongodb.org/mongo-driver v1.9.1
go.uber.org/zap v1.27.0 go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.34.1 google.golang.org/protobuf v1.34.1
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )

2
go.sum
View File

@@ -330,6 +330,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

View File

@@ -1,93 +0,0 @@
package log
import (
"strconv"
)
const _size = 9216
type Buffer struct {
bs []byte
//mu sync.Mutex // ensures atomic writes; protects the following fields
}
func (buff *Buffer) Init() {
buff.bs = make([]byte, _size)
}
// AppendByte writes a single byte to the Buffer.
func (buff *Buffer) AppendByte(v byte) {
buff.bs = append(buff.bs, v)
}
func (buff *Buffer) AppendBytes(v []byte) {
buff.bs = append(buff.bs, v...)
}
// AppendString writes a string to the Buffer.
func (buff *Buffer) AppendString(s string) {
buff.bs = append(buff.bs, s...)
}
// AppendInt appends an integer to the underlying buffer (assuming base 10).
func (buff *Buffer) AppendInt(i int64) {
buff.bs = strconv.AppendInt(buff.bs, i, 10)
}
// AppendUint appends an unsigned integer to the underlying buffer (assuming
// base 10).
func (buff *Buffer) AppendUint(i uint64) {
buff.bs = strconv.AppendUint(buff.bs, i, 10)
}
// AppendBool appends a bool to the underlying buffer.
func (buff *Buffer) AppendBool(v bool) {
buff.bs = strconv.AppendBool(buff.bs, v)
}
// AppendFloat appends a float to the underlying buffer. It doesn't quote NaN
// or +/- Inf.
func (buff *Buffer) AppendFloat(f float64, bitSize int) {
buff.bs = strconv.AppendFloat(buff.bs, f, 'f', -1, bitSize)
}
// Len returns the length of the underlying byte slice.
func (buff *Buffer) Len() int {
return len(buff.bs)
}
// Cap returns the capacity of the underlying byte slice.
func (buff *Buffer) Cap() int {
return cap(buff.bs)
}
// Bytes returns a mutable reference to the underlying byte slice.
func (buff *Buffer) Bytes() []byte {
return buff.bs
}
// String returns a string copy of the underlying byte slice.
func (buff *Buffer) String() string {
return string(buff.bs)
}
// Reset resets the underlying byte slice. Subsequent writes re-use the slice's
// backing array.
func (buff *Buffer) Reset() {
buff.bs = buff.bs[:0]
}
// Write implements io.Writer.
func (buff *Buffer) Write(bs []byte) (int, error) {
buff.bs = append(buff.bs, bs...)
return len(bs), nil
}
// TrimNewline trims any final "\n" byte from the end of the buffer.
func (buff *Buffer) TrimNewline() {
if i := len(buff.bs) - 1; i >= 0 {
if buff.bs[i] == '\n' {
buff.bs = buff.bs[:i]
}
}
}

View File

@@ -1,161 +0,0 @@
package log
import (
"context"
"io"
"log/slog"
"path/filepath"
"runtime"
"runtime/debug"
"sync"
)
const defaultSkip = 7
type IOriginHandler interface {
slog.Handler
Lock()
UnLock()
SetSkip(skip int)
GetSkip() int
}
type BaseHandler struct {
addSource bool
w io.Writer
locker sync.Mutex
skip int
}
type OriginTextHandler struct {
BaseHandler
*slog.TextHandler
}
type OriginJsonHandler struct {
BaseHandler
*slog.JSONHandler
}
func (bh *BaseHandler) SetSkip(skip int) {
bh.skip = skip
}
func (bh *BaseHandler) GetSkip() int {
return bh.skip
}
func getStrLevel(level slog.Level) string {
switch level {
case LevelTrace:
return "Trace"
case LevelDebug:
return "Debug"
case LevelInfo:
return "Info"
case LevelWarning:
return "Warning"
case LevelError:
return "Error"
case LevelStack:
return "Stack"
case LevelDump:
return "Dump"
case LevelFatal:
return "Fatal"
}
return ""
}
func defaultReplaceAttr(groups []string, a slog.Attr) slog.Attr {
if a.Key == slog.LevelKey {
level := a.Value.Any().(slog.Level)
a.Value = slog.StringValue(getStrLevel(level))
} else if a.Key == slog.TimeKey && len(groups) == 0 {
a.Value = slog.StringValue(a.Value.Time().Format("2006/01/02 15:04:05"))
} else if a.Key == slog.SourceKey {
source := a.Value.Any().(*slog.Source)
source.File = filepath.Base(source.File)
}
return a
}
func NewOriginTextHandler(level slog.Level, w io.Writer, addSource bool, replaceAttr func([]string, slog.Attr) slog.Attr) slog.Handler {
var textHandler OriginTextHandler
textHandler.addSource = addSource
textHandler.w = w
textHandler.TextHandler = slog.NewTextHandler(w, &slog.HandlerOptions{
AddSource: addSource,
Level: level,
ReplaceAttr: replaceAttr,
})
textHandler.skip = defaultSkip
return &textHandler
}
func (oh *OriginTextHandler) Handle(context context.Context, record slog.Record) error {
oh.Fill(context, &record)
oh.locker.Lock()
defer oh.locker.Unlock()
if record.Level == LevelStack || record.Level == LevelFatal {
err := oh.TextHandler.Handle(context, record)
oh.logStack(&record)
return err
} else if record.Level == LevelDump {
strDump := record.Message
record.Message = "dump info"
err := oh.TextHandler.Handle(context, record)
oh.w.Write([]byte(strDump))
return err
}
return oh.TextHandler.Handle(context, record)
}
func (bh *BaseHandler) logStack(record *slog.Record) {
bh.w.Write(debug.Stack())
}
func (bh *BaseHandler) Lock() {
bh.locker.Lock()
}
func (bh *BaseHandler) UnLock() {
bh.locker.Unlock()
}
func NewOriginJsonHandler(level slog.Level, w io.Writer, addSource bool, replaceAttr func([]string, slog.Attr) slog.Attr) slog.Handler {
var jsonHandler OriginJsonHandler
jsonHandler.addSource = addSource
jsonHandler.w = w
jsonHandler.JSONHandler = slog.NewJSONHandler(w, &slog.HandlerOptions{
AddSource: addSource,
Level: level,
ReplaceAttr: replaceAttr,
})
jsonHandler.skip = defaultSkip
return &jsonHandler
}
func (oh *OriginJsonHandler) Handle(context context.Context, record slog.Record) error {
oh.Fill(context, &record)
if record.Level == LevelStack || record.Level == LevelFatal || record.Level == LevelDump {
record.Add("stack", debug.Stack())
}
oh.locker.Lock()
defer oh.locker.Unlock()
return oh.JSONHandler.Handle(context, record)
}
func (bh *BaseHandler) Fill(_ context.Context, record *slog.Record) {
if bh.addSource {
var pcs [1]uintptr
runtime.Callers(bh.skip, pcs[:])
record.PC = pcs[0]
}
}

View File

@@ -1,520 +1,326 @@
package log package log
import ( import (
"context" "go.uber.org/zap"
"fmt" "go.uber.org/zap/zapcore"
"github.com/duanhf2012/origin/v2/util/bytespool" "gopkg.in/natefinch/lumberjack.v2"
"io"
"log/slog"
"os" "os"
"path"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time" "time"
) )
var OpenConsole bool
var LogSize int64
var LogChannelCap int
var LogPath string
var LogLevel = LevelTrace
var gLogger, _ = NewTextLogger(LevelDebug, "", "", true, LogChannelCap)
var isSetLogger bool var isSetLogger bool
var memPool = bytespool.NewMemAreaPool() var gLogger = NewDefaultLogger()
// levels
const (
LevelTrace = slog.Level(-8)
LevelDebug = slog.LevelDebug
LevelInfo = slog.LevelInfo
LevelWarning = slog.LevelWarn
LevelError = slog.LevelError
LevelStack = slog.Level(12)
LevelDump = slog.Level(16)
LevelFatal = slog.Level(20)
)
type ILogger interface {
Trace(msg string, args ...any)
Debug(msg string, args ...any)
Info(msg string, args ...any)
Warning(msg string, args ...any)
Error(msg string, args ...any)
Stack(msg string, args ...any)
Dump(msg string, args ...any)
Fatal(msg string, args ...any)
DoSPrintf(level slog.Level, a []interface{})
FormatHeader(buf *Buffer, level slog.Level, callDepth int)
Close()
}
type Logger struct { type Logger struct {
SLogger *slog.Logger *zap.Logger
stack bool
ioWriter IoWriter OpenConsole *bool
LogPath string
sBuff Buffer FileName string
LogLevel zapcore.Level
Encoder zapcore.Encoder
LogConfig *lumberjack.Logger
sugaredLogger *zap.SugaredLogger
} }
type IoWriter struct { func SetLogger(logger *Logger) {
outFile io.Writer // destination for output
writeBytes int64
logChannel chan []byte
wg sync.WaitGroup
closeSig chan struct{}
lockWrite sync.Mutex
filePath string
filePrefix string
fileDay int
fileCreateTime int64 //second
}
func (iw *IoWriter) Close() error {
iw.lockWrite.Lock()
defer iw.lockWrite.Unlock()
iw.close()
return nil
}
func (iw *IoWriter) close() error {
if iw.closeSig != nil {
close(iw.closeSig)
iw.closeSig = nil
}
iw.wg.Wait()
if iw.outFile != nil {
err := iw.outFile.(io.Closer).Close()
iw.outFile = nil
return err
}
return nil
}
func (iw *IoWriter) writeFile(p []byte) (n int, err error) {
//switch log file
iw.switchFile()
if iw.outFile != nil {
n, err = iw.outFile.Write(p)
if n > 0 {
atomic.AddInt64(&iw.writeBytes, int64(n))
}
}
return 0, nil
}
func (iw *IoWriter) Write(p []byte) (n int, err error) {
iw.lockWrite.Lock()
defer iw.lockWrite.Unlock()
if iw.logChannel == nil {
return iw.writeIo(p)
}
copyBuff := memPool.MakeBytes(len(p))
if copyBuff == nil {
return 0, fmt.Errorf("MakeByteSlice failed")
}
copy(copyBuff, p)
iw.logChannel <- copyBuff
return
}
func (iw *IoWriter) writeIo(p []byte) (n int, err error) {
n, err = iw.writeFile(p)
if OpenConsole {
n, err = os.Stdout.Write(p)
}
return
}
func (iw *IoWriter) setLogChannel(logChannelNum int) (err error) {
iw.lockWrite.Lock()
defer iw.lockWrite.Unlock()
iw.close()
if logChannelNum == 0 {
return nil
}
//copy iw.logChannel
var logInfo []byte
logChannel := make(chan []byte, logChannelNum)
for i := 0; i < logChannelNum && i < len(iw.logChannel); i++ {
logInfo = <-iw.logChannel
logChannel <- logInfo
}
iw.logChannel = logChannel
iw.closeSig = make(chan struct{})
iw.wg.Add(1)
go iw.run()
return nil
}
func (iw *IoWriter) run() {
defer iw.wg.Done()
Loop:
for {
select {
case <-iw.closeSig:
break Loop
case logs := <-iw.logChannel:
iw.writeIo(logs)
memPool.ReleaseBytes(logs)
}
}
for len(iw.logChannel) > 0 {
logs := <-iw.logChannel
iw.writeIo(logs)
memPool.ReleaseBytes(logs)
}
}
func (iw *IoWriter) isFull() bool {
if LogSize == 0 {
return false
}
return atomic.LoadInt64(&iw.writeBytes) >= LogSize
}
func (logger *Logger) setLogChannel(logChannel int) (err error) {
return logger.ioWriter.setLogChannel(logChannel)
}
func (iw *IoWriter) switchFile() error {
now := time.Now()
if iw.fileCreateTime == now.Unix() {
return nil
}
if iw.fileDay == now.Day() && iw.isFull() == false {
return nil
}
if iw.filePath != "" {
var err error
fileName := fmt.Sprintf("%s%d%02d%02d_%02d_%02d_%02d.log",
iw.filePrefix,
now.Year(),
now.Month(),
now.Day(),
now.Hour(),
now.Minute(),
now.Second())
filePath := path.Join(iw.filePath, fileName)
iw.outFile, err = os.Create(filePath)
if err != nil {
return err
}
iw.fileDay = now.Day()
iw.fileCreateTime = now.Unix()
atomic.StoreInt64(&iw.writeBytes, 0)
}
return nil
}
func GetDefaultHandler() IOriginHandler {
return gLogger.(*Logger).SLogger.Handler().(IOriginHandler)
}
func NewTextLogger(level slog.Level, pathName string, filePrefix string, addSource bool, logChannelCap int) (ILogger, error) {
var logger Logger
logger.ioWriter.filePath = pathName
logger.ioWriter.filePrefix = filePrefix
logger.SLogger = slog.New(NewOriginTextHandler(level, &logger.ioWriter, addSource, defaultReplaceAttr))
logger.setLogChannel(logChannelCap)
err := logger.ioWriter.switchFile()
if err != nil {
return nil, err
}
return &logger, nil
}
func NewJsonLogger(level slog.Level, pathName string, filePrefix string, addSource bool, logChannelCap int) (ILogger, error) {
var logger Logger
logger.ioWriter.filePath = pathName
logger.ioWriter.filePrefix = filePrefix
logger.SLogger = slog.New(NewOriginJsonHandler(level, &logger.ioWriter, true, defaultReplaceAttr))
logger.setLogChannel(logChannelCap)
err := logger.ioWriter.switchFile()
if err != nil {
return nil, err
}
return &logger, nil
}
// Close It's dangerous to call the method on logging
func (logger *Logger) Close() {
logger.ioWriter.Close()
}
func (logger *Logger) Trace(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelTrace, msg, args...)
}
func (logger *Logger) Debug(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelDebug, msg, args...)
}
func (logger *Logger) Info(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelInfo, msg, args...)
}
func (logger *Logger) Warning(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelWarning, msg, args...)
}
func (logger *Logger) Error(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelError, msg, args...)
}
func (logger *Logger) Stack(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelStack, msg, args...)
}
func (logger *Logger) Dump(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelDump, msg, args...)
}
func (logger *Logger) Fatal(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelFatal, msg, args...)
os.Exit(1)
}
// SetLogger It's non-thread-safe
func SetLogger(logger ILogger) {
if logger != nil && isSetLogger == false { if logger != nil && isSetLogger == false {
gLogger = logger gLogger = logger
isSetLogger = true isSetLogger = true
} }
} }
func GetLogger() ILogger { func GetLogger() *Logger {
return gLogger return gLogger
} }
func Trace(msg string, args ...any) { func (logger *Logger) SetEncoder(encoder zapcore.Encoder) {
gLogger.Trace(msg, args...) logger.Encoder = encoder
} }
func Debug(msg string, args ...any) { func GetJsonEncoder() zapcore.Encoder {
gLogger.Debug(msg, args...) encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(t.Format("2006-01-02 15:04:05.000"))
}
return zapcore.NewJSONEncoder(encoderConfig)
} }
func Info(msg string, args ...any) { func GetTxtEncoder() zapcore.Encoder {
gLogger.Info(msg, args...) encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(t.Format("2006-01-02 15:04:05.000"))
}
return zapcore.NewConsoleEncoder(encoderConfig)
} }
func Warning(msg string, args ...any) { func getLogConfig() *lumberjack.Logger {
gLogger.Warning(msg, args...) return &lumberjack.Logger{
Filename: "",
MaxSize: 2048,
MaxBackups: 0,
MaxAge: 0,
Compress: false,
}
} }
func Error(msg string, args ...any) { func NewDefaultLogger() *Logger {
gLogger.Error(msg, args...) logger := Logger{}
logger.Encoder = GetJsonEncoder()
logger.LogConfig = getLogConfig()
logger.LogConfig.LocalTime = true
logger.Init()
return &logger
} }
func Stack(msg string, args ...any) { func (logger *Logger) SetLogLevel(level zapcore.Level) {
gLogger.Stack(msg, args...) logger.LogLevel = level
} }
func Dump(dump string, args ...any) { func (logger *Logger) Enabled(zapcore.Level) bool {
gLogger.Dump(dump, args...) return logger.stack
} }
func Fatal(msg string, args ...any) { func (logger *Logger) Init() {
gLogger.Fatal(msg, args...) var coreList []zapcore.Core
if logger.OpenConsole == nil || *logger.OpenConsole {
core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), logger.LogLevel)
coreList = append(coreList, core)
}
if logger.LogPath != "" {
writeSyncer := zapcore.AddSync(logger.LogConfig)
core := zapcore.NewCore(logger.Encoder, writeSyncer, logger.LogLevel)
coreList = append(coreList, core)
}
core := zapcore.NewTee(coreList...)
logger.Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(logger), zap.AddCallerSkip(1))
logger.sugaredLogger = logger.Logger.Sugar()
} }
func Close() { func (logger *Logger) Debug(msg string, fields ...zap.Field) {
gLogger.Close() logger.Logger.Debug(msg, fields...)
} }
func ErrorAttr(key string, value error) slog.Attr { func (logger *Logger) Info(msg string, fields ...zap.Field) {
logger.Logger.Info(msg, fields...)
}
func (logger *Logger) Warn(msg string, fields ...zap.Field) {
logger.Logger.Warn(msg, fields...)
}
func (logger *Logger) Error(msg string, fields ...zap.Field) {
logger.Logger.Error(msg, fields...)
}
func (logger *Logger) StackError(msg string, args ...zap.Field) {
logger.stack = true
logger.Logger.Log(zapcore.ErrorLevel, msg, args...)
logger.stack = false
}
func (logger *Logger) Fatal(msg string, fields ...zap.Field) {
gLogger.stack = true
logger.Logger.Fatal(msg, fields...)
gLogger.stack = false
}
func Debug(msg string, fields ...zap.Field) {
gLogger.Logger.Debug(msg, fields...)
}
func Info(msg string, fields ...zap.Field) {
gLogger.Logger.Info(msg, fields...)
}
func Warn(msg string, fields ...zap.Field) {
gLogger.Logger.Warn(msg, fields...)
}
func Error(msg string, fields ...zap.Field) {
gLogger.Logger.Error(msg, fields...)
}
func StackError(msg string, fields ...zap.Field) {
gLogger.stack = true
gLogger.Logger.Error(msg, fields...)
gLogger.stack = false
}
func Fatal(msg string, fields ...zap.Field) {
gLogger.stack = true
gLogger.Logger.Fatal(msg, fields...)
gLogger.stack = false
}
func Debugf(msg string, args ...any) {
gLogger.sugaredLogger.Debugf(msg, args...)
}
func Infof(msg string, args ...any) {
gLogger.sugaredLogger.Infof(msg, args...)
}
func Warnf(msg string, args ...any) {
gLogger.sugaredLogger.Warnf(msg, args...)
}
func Errorf(msg string, args ...any) {
gLogger.sugaredLogger.Errorf(msg, args...)
}
func StackErrorf(msg string, args ...any) {
gLogger.stack = true
gLogger.sugaredLogger.Errorf(msg, args...)
gLogger.stack = false
}
func Fatalf(msg string, args ...any) {
gLogger.sugaredLogger.Fatalf(msg, args...)
}
func (logger *Logger) SDebug(args ...interface{}) {
logger.sugaredLogger.Debugln(args...)
}
func (logger *Logger) SInfo(args ...interface{}) {
logger.sugaredLogger.Infoln(args...)
}
func (logger *Logger) SWarn(args ...interface{}) {
logger.sugaredLogger.Warnln(args...)
}
func (logger *Logger) SError(args ...interface{}) {
logger.sugaredLogger.Errorln(args...)
}
func (logger *Logger) SStackError(args ...interface{}) {
gLogger.stack = true
logger.sugaredLogger.Errorln(args...)
gLogger.stack = false
}
func (logger *Logger) SFatal(args ...interface{}) {
gLogger.stack = true
logger.sugaredLogger.Fatalln(args...)
gLogger.stack = false
}
func SDebug(args ...interface{}) {
gLogger.sugaredLogger.Debugln(args...)
}
func SInfo(args ...interface{}) {
gLogger.sugaredLogger.Infoln(args...)
}
func SWarn(args ...interface{}) {
gLogger.sugaredLogger.Warnln(args...)
}
func SError(args ...interface{}) {
gLogger.sugaredLogger.Errorln(args...)
}
func SStackError(args ...interface{}) {
gLogger.stack = true
gLogger.sugaredLogger.Errorln(args...)
gLogger.stack = false
}
func SFatal(args ...interface{}) {
gLogger.stack = true
gLogger.sugaredLogger.Fatalln(args...)
gLogger.stack = false
}
func ErrorField(key string, value error) zap.Field {
if value == nil { if value == nil {
return slog.Attr{Key: key, Value: slog.StringValue("nil")} return zap.String(key, "nil")
} }
return zap.String(key, value.Error())
return slog.Attr{Key: key, Value: slog.StringValue(value.Error())}
} }
func String(key, value string) slog.Attr { func String(key, value string) zap.Field {
return slog.Attr{Key: key, Value: slog.StringValue(value)} return zap.String(key, value)
} }
func Int(key string, value int) slog.Attr { func Int(key string, value int) zap.Field {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))} return zap.Int(key, value)
} }
func Int64(key string, value int64) slog.Attr { func Int64(key string, value int64) zap.Field {
return slog.Attr{Key: key, Value: slog.Int64Value(value)} return zap.Int64(key, value)
} }
func Int32(key string, value int32) slog.Attr { func Int32(key string, value int32) zap.Field {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))} return zap.Int32(key, value)
} }
func Int16(key string, value int16) slog.Attr { func Int16(key string, value int16) zap.Field {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))} return zap.Int16(key, value)
} }
func Int8(key string, value int8) slog.Attr { func Int8(key string, value int8) zap.Field {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))} return zap.Int8(key, value)
} }
func Uint(key string, value uint) slog.Attr { func Uint(key string, value uint) zap.Field {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))} return zap.Uint(key, value)
} }
func Uint64(key string, v uint64) slog.Attr { func Uint64(key string, v uint64) zap.Field {
return slog.Attr{Key: key, Value: slog.Uint64Value(v)} return zap.Uint64(key, v)
} }
func Uint32(key string, value uint32) slog.Attr { func Uint32(key string, value uint32) zap.Field {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))} return zap.Uint32(key, value)
} }
func Uint16(key string, value uint16) slog.Attr { func Uint16(key string, value uint16) zap.Field {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))} return zap.Uint16(key, value)
} }
func Uint8(key string, value uint8) slog.Attr { func Uint8(key string, value uint8) zap.Field {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))} return zap.Uint8(key, value)
} }
func Float64(key string, v float64) slog.Attr { func Float64(key string, v float64) zap.Field {
return slog.Attr{Key: key, Value: slog.Float64Value(v)} return zap.Float64(key, v)
} }
func Bool(key string, v bool) slog.Attr { func Bool(key string, v bool) zap.Field {
return slog.Attr{Key: key, Value: slog.BoolValue(v)} return zap.Bool(key, v)
} }
func Time(key string, v time.Time) slog.Attr { func Bools(key string, v []bool) zap.Field {
return slog.Attr{Key: key, Value: slog.TimeValue(v)} return zap.Bools(key, v)
} }
func Duration(key string, v time.Duration) slog.Attr { func Time(key string, v time.Time) zap.Field {
return slog.Attr{Key: key, Value: slog.DurationValue(v)} return zap.Time(key, v)
} }
func Any(key string, value any) slog.Attr { func Duration(key string, v time.Duration) zap.Field {
return slog.Attr{Key: key, Value: slog.AnyValue(value)} return zap.Duration(key, v)
} }
func Group(key string, args ...any) slog.Attr { func Durations(key string, v []time.Duration) zap.Field {
return slog.Group(key, args...) return zap.Durations(key, v)
} }
func (logger *Logger) DoSPrintf(level slog.Level, a []interface{}) { func Any(key string, value any) zap.Field {
if logger.SLogger.Enabled(context.Background(), level) == false { return zap.Any(key, value)
return
}
logger.SLogger.Handler().(IOriginHandler).Lock()
defer logger.SLogger.Handler().(IOriginHandler).UnLock()
logger.sBuff.Reset()
logger.FormatHeader(&logger.sBuff, level, 3)
for _, s := range a {
logger.sBuff.AppendString(slog.AnyValue(s).String())
}
logger.sBuff.AppendString("\"\n")
logger.ioWriter.Write(logger.sBuff.Bytes())
}
func (logger *Logger) STrace(a ...interface{}) {
logger.DoSPrintf(LevelTrace, a)
}
func (logger *Logger) SDebug(a ...interface{}) {
logger.DoSPrintf(LevelDebug, a)
}
func (logger *Logger) SInfo(a ...interface{}) {
logger.DoSPrintf(LevelInfo, a)
}
func (logger *Logger) SWarning(a ...interface{}) {
logger.DoSPrintf(LevelWarning, a)
}
func (logger *Logger) SError(a ...interface{}) {
logger.DoSPrintf(LevelError, a)
}
func STrace(a ...interface{}) {
gLogger.DoSPrintf(LevelTrace, a)
}
func SDebug(a ...interface{}) {
gLogger.DoSPrintf(LevelDebug, a)
}
func SInfo(a ...interface{}) {
gLogger.DoSPrintf(LevelInfo, a)
}
func SWarning(a ...interface{}) {
gLogger.DoSPrintf(LevelWarning, a)
}
func SError(a ...interface{}) {
gLogger.DoSPrintf(LevelError, a)
}
func (logger *Logger) FormatHeader(buf *Buffer, level slog.Level, callDepth int) {
t := time.Now()
var file string
var line int
// Release lock while getting caller info - it's expensive.
var ok bool
_, file, line, ok = runtime.Caller(callDepth)
if !ok {
file = "???"
line = 0
}
file = filepath.Base(file)
buf.AppendString("time=\"")
buf.AppendString(t.Format("2006/01/02 15:04:05"))
buf.AppendString("\"")
logger.sBuff.AppendString(" level=")
logger.sBuff.AppendString(getStrLevel(level))
logger.sBuff.AppendString(" source=")
buf.AppendString(file)
buf.AppendByte(':')
buf.AppendInt(int64(line))
buf.AppendString(" msg=\"")
} }

View File

@@ -111,15 +111,15 @@ func (netConn *NetConn) doWrite(b []byte) error {
} }
// b must not be modified by the others goroutines // b must not be modified by the others goroutines
func (netConn *NetConn) Write(b []byte) error { func (netConn *NetConn) Write(b []byte) (int,error) {
netConn.Lock() netConn.Lock()
defer netConn.Unlock() defer netConn.Unlock()
if atomic.LoadInt32(&netConn.closeFlag) == 1 || b == nil { if atomic.LoadInt32(&netConn.closeFlag) == 1 || b == nil {
netConn.ReleaseReadMsg(b) netConn.ReleaseReadMsg(b)
return errors.New("conn is close") return 0,errors.New("conn is close")
} }
return netConn.doWrite(b) return len(b),netConn.doWrite(b)
} }
func (netConn *NetConn) Read(b []byte) (int, error) { func (netConn *NetConn) Read(b []byte) (int, error) {
@@ -150,7 +150,7 @@ func (netConn *NetConn) WriteMsg(args ...[]byte) error {
if atomic.LoadInt32(&netConn.closeFlag) == 1 { if atomic.LoadInt32(&netConn.closeFlag) == 1 {
return errors.New("conn is close") return errors.New("conn is close")
} }
return netConn.msgParser.Write(netConn.conn, args...) return netConn.msgParser.Write(netConn, args...)
} }
func (netConn *NetConn) WriteRawMsg(args []byte) error { func (netConn *NetConn) WriteRawMsg(args []byte) error {
@@ -158,7 +158,8 @@ func (netConn *NetConn) WriteRawMsg(args []byte) error {
return errors.New("conn is close") return errors.New("conn is close")
} }
return netConn.Write(args) _,err:= netConn.Write(args)
return err
} }
func (netConn *NetConn) IsConnected() bool { func (netConn *NetConn) IsConnected() bool {

View File

@@ -106,7 +106,7 @@ func (client *KCPClient) dial() net.Conn {
return conn return conn
} }
log.Warning("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr)) log.Warn("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
time.Sleep(client.ConnectInterval) time.Sleep(client.ConnectInterval)
continue continue
} }

View File

@@ -203,7 +203,7 @@ func (kp *KCPServer) initSession(session *kcp.UDPSession) {
func (kp *KCPServer) run(listener *kcp.Listener) bool { func (kp *KCPServer) run(listener *kcp.Listener) bool {
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
log.Error("accept error", log.String("ListenAddr", kp.kcpCfg.ListenAddr), log.ErrorAttr("err", err)) log.Error("accept error", log.String("ListenAddr", kp.kcpCfg.ListenAddr), log.ErrorField("err", err))
return false return false
} }
@@ -211,7 +211,7 @@ func (kp *KCPServer) run(listener *kcp.Listener) bool {
if len(kp.conns) >= kp.kcpCfg.MaxConnNum { if len(kp.conns) >= kp.kcpCfg.MaxConnNum {
kp.mutexConns.Unlock() kp.mutexConns.Unlock()
conn.Close() conn.Close()
log.Warning("too many connections") log.Warn("too many connections")
return true return true
} }
kp.conns[conn] = struct{}{} kp.conns[conn] = struct{}{}

View File

@@ -102,7 +102,7 @@ func (client *TCPClient) dial() net.Conn {
return conn return conn
} }
log.Warning("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr)) log.Warn("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
time.Sleep(client.ConnectInterval) time.Sleep(client.ConnectInterval)
continue continue
} }

View File

@@ -146,7 +146,7 @@ func (server *TCPServer) run() {
if len(server.conns) >= server.MaxConnNum { if len(server.conns) >= server.MaxConnNum {
server.mutexConns.Unlock() server.mutexConns.Unlock()
conn.Close() conn.Close()
log.Warning("too many connections") log.Warn("too many connections")
continue continue
} }

View File

@@ -68,7 +68,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if len(handler.conns) >= handler.maxConnNum { if len(handler.conns) >= handler.maxConnNum {
handler.mutexConns.Unlock() handler.mutexConns.Unlock()
conn.Close() conn.Close()
log.Warning("too many connections") log.Warn("too many connections")
return return
} }
handler.conns[conn] = struct{}{} handler.conns[conn] = struct{}{}

View File

@@ -11,11 +11,13 @@ import (
"github.com/duanhf2012/origin/v2/util/buildtime" "github.com/duanhf2012/origin/v2/util/buildtime"
"github.com/duanhf2012/origin/v2/util/sysprocess" "github.com/duanhf2012/origin/v2/util/sysprocess"
"github.com/duanhf2012/origin/v2/util/timer" "github.com/duanhf2012/origin/v2/util/timer"
"go.uber.org/zap/zapcore"
"io" "io"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
@@ -54,10 +56,9 @@ func init() {
console.RegisterCommandString("retire", "", "<-retire nodeid=nodeid> retire originserver process.", retireNode) console.RegisterCommandString("retire", "", "<-retire nodeid=nodeid> retire originserver process.", retireNode)
console.RegisterCommandString("config", "", "<-config path> Configuration file path.", setConfigPath) console.RegisterCommandString("config", "", "<-config path> Configuration file path.", setConfigPath)
console.RegisterCommandString("console", "", "<-console true|false> Turn on or off screen log output.", openConsole) console.RegisterCommandString("console", "", "<-console true|false> Turn on or off screen log output.", openConsole)
console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|release|warning|error|fatal> Set loglevel.", setLevel) console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|info|warn|error|stackerror|fatal> Set loglevel.", setLevel)
console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath) console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath)
console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize) console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize)
console.RegisterCommandInt("logchannelcap", -1, "<-logchannelcap num> Set log channel cap.", setLogChannelCapNum)
console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof) console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof)
} }
@@ -156,12 +157,14 @@ func initNode(id string) {
nodeId = id nodeId = id
err := cluster.GetCluster().Init(GetNodeId(), Setup) err := cluster.GetCluster().Init(GetNodeId(), Setup)
if err != nil { if err != nil {
log.Error("Init cluster fail", log.ErrorAttr("error", err)) log.Error("Init cluster fail", log.ErrorField("error", err))
os.Exit(1) os.Exit(1)
} }
err = initLog() err = initLog()
if err != nil { if err != nil {
log.Error("Init log fail", log.ErrorField("error", err))
os.Exit(1)
return return
} }
@@ -211,22 +214,26 @@ func initNode(id string) {
} }
//3.service初始化 //3.service初始化
log.Info("Start running server.")
service.Init() service.Init()
} }
func initLog() error { func initLog() error {
if log.LogPath == "" { logger := log.GetLogger()
setLogPath("./log") if logger.LogPath == "" {
err := setLogPath("./log")
if err != nil {
return err
}
} }
localNodeInfo := cluster.GetCluster().GetLocalNodeInfo() localNodeInfo := cluster.GetCluster().GetLocalNodeInfo()
filePre := fmt.Sprintf("%s_", localNodeInfo.NodeId) fileName := fmt.Sprintf("%s.log", localNodeInfo.NodeId)
logger, err := log.NewTextLogger(log.LogLevel, log.LogPath, filePre, true, log.LogChannelCap) logger.FileName = fileName
if err != nil { filepath.Join()
fmt.Printf("cannot create log file!\n") logger.LogConfig.Filename = filepath.Join(logger.LogPath, logger.FileName)
return err
} logger.Init()
log.SetLogger(logger)
return nil return nil
} }
@@ -323,7 +330,7 @@ func startNode(args interface{}) error {
myName, mErr := sysprocess.GetMyProcessName() myName, mErr := sysprocess.GetMyProcessName()
//当前进程名获取失败,不应该发生 //当前进程名获取失败,不应该发生
if mErr != nil { if mErr != nil {
log.Info("get my process's name is error", log.ErrorAttr("err", mErr)) log.Info("get my process's name is error", log.ErrorField("err", mErr))
os.Exit(-1) os.Exit(-1)
} }
@@ -336,11 +343,11 @@ func startNode(args interface{}) error {
} }
//2.记录进程id号 //2.记录进程id号
log.Info("Start running server.")
writeProcessPid(strNodeId) writeProcessPid(strNodeId)
timer.StartTimer(10*time.Millisecond, 1000000) timer.StartTimer(10*time.Millisecond, 1000000)
//3.初始化node //3.初始化node
defer log.GetLogger().Logger.Sync()
initNode(strNodeId) initNode(strNodeId)
//4.运行service //4.运行service
@@ -378,7 +385,7 @@ func startNode(args interface{}) error {
cluster.GetCluster().Stop() cluster.GetCluster().Stop()
log.Info("Server is stop.") log.Info("Server is stop.")
log.Close()
return nil return nil
} }
@@ -400,8 +407,8 @@ func SetupTemplateFunc(fs ...func() service.IService) {
} }
} }
func SetupTemplate[T any,P templateServicePoint[T]]() { func SetupTemplate[T any, P templateServicePoint[T]]() {
SetupTemplateFunc(func() service.IService{ SetupTemplateFunc(func() service.IService {
var t T var t T
return P(&t) return P(&t)
}) })
@@ -430,9 +437,11 @@ func openConsole(args interface{}) error {
} }
strOpen := strings.ToLower(strings.TrimSpace(args.(string))) strOpen := strings.ToLower(strings.TrimSpace(args.(string)))
if strOpen == "false" { if strOpen == "false" {
log.OpenConsole = false bOpenConsole := false
log.GetLogger().OpenConsole = &bOpenConsole
} else if strOpen == "true" { } else if strOpen == "true" {
log.OpenConsole = true bOpenConsole := true
log.GetLogger().OpenConsole = &bOpenConsole
} else { } else {
return errors.New("parameter console error") return errors.New("parameter console error")
} }
@@ -446,20 +455,18 @@ func setLevel(args interface{}) error {
strlogLevel := strings.TrimSpace(args.(string)) strlogLevel := strings.TrimSpace(args.(string))
switch strlogLevel { switch strlogLevel {
case "trace":
log.LogLevel = log.LevelTrace
case "debug": case "debug":
log.LogLevel = log.LevelDebug log.GetLogger().LogLevel = zapcore.DebugLevel
case "info": case "info":
log.LogLevel = log.LevelInfo log.GetLogger().LogLevel = zapcore.InfoLevel
case "warning": case "warn":
log.LogLevel = log.LevelWarning log.GetLogger().LogLevel = zapcore.WarnLevel
case "error": case "error":
log.LogLevel = log.LevelError log.GetLogger().LogLevel = zapcore.ErrorLevel
case "stack": case "stackerror":
log.LogLevel = log.LevelStack log.GetLogger().LogLevel = zapcore.ErrorLevel
case "fatal": case "fatal":
log.LogLevel = log.LevelFatal log.GetLogger().LogLevel = zapcore.FatalLevel
default: default:
return errors.New("unknown level: " + strlogLevel) return errors.New("unknown level: " + strlogLevel)
} }
@@ -470,52 +477,33 @@ func setLogPath(args interface{}) error {
if args == "" { if args == "" {
return nil return nil
} }
logPath := strings.TrimSpace(args.(string))
log.LogPath = strings.TrimSpace(args.(string)) dir, err := os.Stat(logPath)
dir, err := os.Stat(log.LogPath) //这个文件夹不存在
if err == nil && dir.IsDir() == false { if err == nil && dir.IsDir() == false {
return errors.New("Not found dir " + log.LogPath) return errors.New("Not found dir " + logPath)
} }
if err != nil { if err != nil {
err = os.Mkdir(log.LogPath, os.ModePerm) err = os.Mkdir(log.GetLogger().LogPath, os.ModePerm)
if err != nil { if err != nil {
return errors.New("Cannot create dir " + log.LogPath) return errors.New("Cannot create dir " + log.GetLogger().LogPath)
} }
} }
log.GetLogger().LogPath = logPath
return nil return nil
} }
func setLogSize(args interface{}) error { func setLogSize(args interface{}) error {
if args == "" {
return nil
}
logSize, ok := args.(int) logSize, ok := args.(int)
if ok == false { if ok == false {
return errors.New("param logsize is error") return errors.New("param logsize is error")
} }
if logSize == 0 {
log.LogSize = int64(logSize) * 1024 * 1024
return nil
}
func setLogChannelCapNum(args interface{}) error {
if args == "" {
return nil return nil
} }
logChannelCap, ok := args.(int) log.GetLogger().LogConfig.MaxSize = logSize
if ok == false {
return errors.New("param logsize is error")
}
if logChannelCap == -1 {
return nil
}
log.LogChannelCap = logChannelCap
return nil return nil
} }

View File

@@ -75,7 +75,7 @@ func (cs *CallSet) AddPending(call *Call) {
if call.Seq == 0 { if call.Seq == 0 {
cs.pendingLock.Unlock() cs.pendingLock.Unlock()
log.Stack("call is error.") log.StackError("call is error.")
return return
} }

View File

@@ -104,7 +104,7 @@ func (client *Client) processRpcResponse(responseData []byte) error {
//rc.conn.ReleaseReadMsg(bytes) //rc.conn.ReleaseReadMsg(bytes)
if err != nil { if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData) processor.ReleaseRpcResponse(response.RpcResponseData)
log.Error("rpcClient Unmarshal head error", log.ErrorAttr("error", err)) log.Error("rpcClient Unmarshal head error", log.ErrorField("error", err))
return nil return nil
} }
@@ -116,7 +116,7 @@ func (client *Client) processRpcResponse(responseData []byte) error {
if len(response.RpcResponseData.GetReply()) > 0 { if len(response.RpcResponseData.GetReply()) > 0 {
err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply) err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply)
if err != nil { if err != nil {
log.Error("rpcClient Unmarshal body failed", log.ErrorAttr("error", err)) log.Error("rpcClient Unmarshal body failed", log.ErrorField("error", err))
v.Err = err v.Err = err
} }
} }
@@ -203,7 +203,7 @@ func (client *Client) rawGo(nodeId string, w IWriter, timeout time.Duration, rpc
} }
if err != nil { if err != nil {
client.RemovePending(call.Seq) client.RemovePending(call.Seq)
log.Error("WriteMsg is fail", log.ErrorAttr("error", err)) log.Error("WriteMsg is fail", log.ErrorField("error", err))
call.Seq = 0 call.Seq = 0
call.DoError(err) call.DoError(err)
} }

View File

@@ -42,7 +42,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil { if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!") err := errors.New("service method " + serviceMethod + " not config!")
log.Error("service method not config", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err)) log.Error("service method not config", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
pCall.Seq = 0 pCall.Seq = 0
pCall.DoError(err) pCall.DoError(err)
@@ -74,7 +74,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
var err error var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs) req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil { if err != nil {
log.Error("unmarshalInParam is failed", log.String("serviceMethod", serviceMethod), log.Uint32("rpcMethodId", rpcMethodId), log.ErrorAttr("error", err)) log.Error("unmarshalInParam is failed", log.String("serviceMethod", serviceMethod), log.Uint32("rpcMethodId", rpcMethodId), log.ErrorField("error", err))
pCall.Seq = 0 pCall.Seq = 0
pCall.DoError(err) pCall.DoError(err)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
@@ -90,12 +90,12 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
byteReturns, err := req.rpcProcessor.Marshal(Returns) byteReturns, err := req.rpcProcessor.Marshal(Returns)
if err != nil { if err != nil {
Err = ConvertError(err) Err = ConvertError(err)
log.Error("returns data cannot be marshal", log.Uint64("seq", callSeq), log.ErrorAttr("error", err)) log.Error("returns data cannot be marshal", log.Uint64("seq", callSeq), log.ErrorField("error", err))
} else { } else {
err = req.rpcProcessor.Unmarshal(byteReturns, reply) err = req.rpcProcessor.Unmarshal(byteReturns, reply)
if err != nil { if err != nil {
Err = ConvertError(err) Err = ConvertError(err)
log.Error("returns data cannot be Unmarshal", log.Uint64("seq", callSeq), log.ErrorAttr("error", err)) log.Error("returns data cannot be Unmarshal", log.Uint64("seq", callSeq), log.ErrorField("error", err))
} }
} }
} }
@@ -266,7 +266,7 @@ func (server *BaseServer) processRpcRequest(data []byte, connTag string, wrRespo
req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam()) req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam())
if err != nil { if err != nil {
rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error() rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error()
log.Error("call rpc param error", log.String("serviceMethod", req.RpcRequestData.GetServiceMethod()), log.ErrorAttr("error", err)) log.Error("call rpc param error", log.String("serviceMethod", req.RpcRequestData.GetServiceMethod()), log.ErrorField("error", err))
if req.requestHandle != nil { if req.requestHandle != nil {
req.requestHandle(nil, RpcError(rErr)) req.requestHandle(nil, RpcError(rErr))
} else { } else {

View File

@@ -50,7 +50,7 @@ func (nc *NatsClient) Go(nodeId string, timeout time.Duration, rpcHandler IRpcHa
_, processor := GetProcessorType(args) _, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args) InParam, err := processor.Marshal(args)
if err != nil { if err != nil {
log.Error("Marshal is fail", log.ErrorAttr("error", err)) log.Error("Marshal is fail", log.ErrorField("error", err))
call := MakeCall() call := MakeCall()
call.DoError(err) call.DoError(err)
return call return call

View File

@@ -48,7 +48,7 @@ func (ns *NatsServer) Start() error {
ns.natsConn, err = nats.Connect(ns.natsUrl, options...) ns.natsConn, err = nats.Connect(ns.natsUrl, options...)
if err != nil { if err != nil {
log.Error("Connect to nats fail", log.String("natsUrl", ns.natsUrl), log.ErrorAttr("err", err)) log.Error("Connect to nats fail", log.String("natsUrl", ns.natsUrl), log.ErrorField("err", err))
return err return err
} }
@@ -77,7 +77,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData) defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if err != nil { if err != nil {
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err)) log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
return return
} }
@@ -86,7 +86,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
if ns.compressBytesLen > 0 && len(bytes) >= ns.compressBytesLen { if ns.compressBytesLen > 0 && len(bytes) >= ns.compressBytesLen {
compressBuff, err = compressor.CompressBlock(bytes) compressBuff, err = compressor.CompressBlock(bytes)
if err != nil { if err != nil {
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err)) log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
return return
} }
if len(compressBuff) < len(bytes) { if len(compressBuff) < len(bytes) {
@@ -106,7 +106,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
} }
if err != nil { if err != nil {
log.Error("WriteMsg error,Rpc return is fail", log.String("nodeId", nodeId), log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err)) log.Error("WriteMsg error,Rpc return is fail", log.String("nodeId", nodeId), log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
} }
} }

View File

@@ -6,7 +6,7 @@ import (
"github.com/duanhf2012/origin/v2/network" "github.com/duanhf2012/origin/v2/network"
"math" "math"
"reflect" "reflect"
"runtime"
"sync/atomic" "sync/atomic"
"time" "time"
) )
@@ -49,7 +49,7 @@ func (rc *RClient) Go(nodeId string, timeout time.Duration, rpcHandler IRpcHandl
_, processor := GetProcessorType(args) _, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args) InParam, err := processor.Marshal(args)
if err != nil { if err != nil {
log.Error("Marshal is fail", log.ErrorAttr("error", err)) log.Error("Marshal is fail", log.ErrorField("error", err))
call := MakeCall() call := MakeCall()
call.DoError(err) call.DoError(err)
return call return call
@@ -74,10 +74,7 @@ func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IR
func (rc *RClient) Run() { func (rc *RClient) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -89,7 +86,7 @@ func (rc *RClient) Run() {
for { for {
bytes, err := rc.conn.ReadMsg() bytes, err := rc.conn.ReadMsg()
if err != nil { if err != nil {
log.Error("RClient read msg is failed", log.ErrorAttr("error", err)) log.Error("RClient read msg is failed", log.ErrorField("error", err))
return return
} }

View File

@@ -6,7 +6,7 @@ import (
"github.com/duanhf2012/origin/v2/event" "github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"reflect" "reflect"
"runtime"
"strings" "strings"
"time" "time"
"unicode" "unicode"
@@ -220,10 +220,7 @@ func (handler *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error {
func (handler *RpcHandler) HandlerRpcResponseCB(call *Call) { func (handler *RpcHandler) HandlerRpcResponseCB(call *Call) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -242,10 +239,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
rpcErr := RpcError("call error : core dumps") rpcErr := RpcError("call error : core dumps")
if request.requestHandle != nil { if request.requestHandle != nil {
request.requestHandle(nil, rpcErr) request.requestHandle(nil, rpcErr)
@@ -439,7 +433,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList) err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList)
if len(pClientList) == 0 { if len(pClientList) == 0 {
if err != nil { if err != nil {
log.Error("call serviceMethod is failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err)) log.Error("call serviceMethod is failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
} else { } else {
log.Error("cannot find serviceMethod", log.String("serviceMethod", serviceMethod)) log.Error("cannot find serviceMethod", log.String("serviceMethod", serviceMethod))
} }
@@ -468,7 +462,7 @@ func (handler *RpcHandler) callRpc(timeout time.Duration, nodeId string, service
pClientList := make([]*Client, 0, maxClusterNode) pClientList := make([]*Client, 0, maxClusterNode)
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList) err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList)
if err != nil { if err != nil {
log.Error("Call serviceMethod is failed", log.ErrorAttr("error", err)) log.Error("Call serviceMethod is failed", log.ErrorField("error", err))
return err return err
} else if len(pClientList) <= 0 { } else if len(pClientList) <= 0 {
err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod) err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod)
@@ -592,7 +586,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId s
pClientList := make([]*Client, 0, 1) pClientList := make([]*Client, 0, 1)
err, pClientList := handler.funcRpcClient(nodeId, serviceName, false, pClientList) err, pClientList := handler.funcRpcClient(nodeId, serviceName, false, pClientList)
if len(pClientList) == 0 || err != nil { if len(pClientList) == 0 || err != nil {
log.Error("call serviceMethod is failed", log.ErrorAttr("error", err)) log.Error("call serviceMethod is failed", log.ErrorField("error", err))
return err return err
} }
if len(pClientList) > 1 { if len(pClientList) > 1 {

View File

@@ -7,7 +7,7 @@ import (
"math" "math"
"net" "net"
"reflect" "reflect"
"runtime"
"strings" "strings"
"time" "time"
) )
@@ -130,7 +130,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData) defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if errM != nil { if errM != nil {
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", errM)) log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", errM))
return return
} }
@@ -141,7 +141,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
compressBuff, cErr = compressor.CompressBlock(bytes) compressBuff, cErr = compressor.CompressBlock(bytes)
if cErr != nil { if cErr != nil {
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", cErr)) log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", cErr))
return return
} }
if len(compressBuff) < len(bytes) { if len(compressBuff) < len(bytes) {
@@ -155,17 +155,14 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
compressor.CompressBufferCollection(compressBuff) compressor.CompressBufferCollection(compressBuff)
} }
if errM != nil { if errM != nil {
log.Error("WriteMsg error,Rpc return is fail", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", errM)) log.Error("WriteMsg error,Rpc return is fail", log.String("serviceMethod", serviceMethod), log.ErrorField("error", errM))
} }
} }
func (agent *RpcAgent) Run() { func (agent *RpcAgent) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -173,7 +170,7 @@ func (agent *RpcAgent) Run() {
data, err := agent.conn.ReadMsg() data, err := agent.conn.ReadMsg()
if err != nil { if err != nil {
//will close conn //will close conn
log.Error("read message is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorAttr("error", err)) log.Error("read message is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorField("error", err))
break break
} }
@@ -181,7 +178,7 @@ func (agent *RpcAgent) Run() {
if err != nil { if err != nil {
//will close conn //will close conn
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
log.Error("processRpcRequest is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorAttr("error", err)) log.Error("processRpcRequest is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorField("error", err))
break break
} }

View File

@@ -35,9 +35,9 @@ type IModule interface {
} }
type IModuleTimer interface { type IModuleTimer interface {
AfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer SafeAfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer
CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron SafeCronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron
NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker SafeNewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker
} }
type Module struct { type Module struct {
@@ -208,6 +208,7 @@ func (m *Module) OnAddTimer(t timer.ITimer) {
} }
} }
// Deprecated: this function simply calls SafeAfterFunc
func (m *Module) AfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer { func (m *Module) AfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer {
if m.mapActiveTimer == nil { if m.mapActiveTimer == nil {
m.mapActiveTimer = map[timer.ITimer]struct{}{} m.mapActiveTimer = map[timer.ITimer]struct{}{}
@@ -216,6 +217,7 @@ func (m *Module) AfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer
return m.dispatcher.AfterFunc(d, nil, cb, m.OnCloseTimer, m.OnAddTimer) return m.dispatcher.AfterFunc(d, nil, cb, m.OnCloseTimer, m.OnAddTimer)
} }
// Deprecated: this function simply calls SafeCronFunc
func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron { func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron {
if m.mapActiveTimer == nil { if m.mapActiveTimer == nil {
m.mapActiveTimer = map[timer.ITimer]struct{}{} m.mapActiveTimer = map[timer.ITimer]struct{}{}
@@ -224,6 +226,7 @@ func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer
return m.dispatcher.CronFunc(cronExpr, nil, cb, m.OnCloseTimer, m.OnAddTimer) return m.dispatcher.CronFunc(cronExpr, nil, cb, m.OnCloseTimer, m.OnAddTimer)
} }
// Deprecated: this function simply calls SafeNewTicker
func (m *Module) NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker { func (m *Module) NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker {
if m.mapActiveTimer == nil { if m.mapActiveTimer == nil {
m.mapActiveTimer = map[timer.ITimer]struct{}{} m.mapActiveTimer = map[timer.ITimer]struct{}{}
@@ -278,7 +281,7 @@ func (m *Module) SafeNewTicker(tickerId *uint64, d time.Duration, AdditionData i
func (m *Module) CancelTimerId(timerId *uint64) bool { func (m *Module) CancelTimerId(timerId *uint64) bool {
if timerId == nil || *timerId == 0 { if timerId == nil || *timerId == 0 {
log.Warning("timerId is invalid") log.Warn("timerId is invalid")
return false return false
} }
@@ -289,7 +292,7 @@ func (m *Module) CancelTimerId(timerId *uint64) bool {
t, ok := m.mapActiveIdTimer[*timerId] t, ok := m.mapActiveIdTimer[*timerId]
if ok == false { if ok == false {
log.Stack("cannot find timer id ", log.Uint64("timerId", *timerId)) log.StackError("cannot find timer id ", log.Uint64("timerId", *timerId))
return false return false
} }

View File

@@ -11,7 +11,6 @@ import (
"github.com/duanhf2012/origin/v2/rpc" "github.com/duanhf2012/origin/v2/rpc"
"github.com/duanhf2012/origin/v2/util/timer" "github.com/duanhf2012/origin/v2/util/timer"
"reflect" "reflect"
"runtime"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -261,10 +260,7 @@ func (s *Service) SetName(serviceName string) {
func (s *Service) Release() { func (s *Service) Release() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()

View File

@@ -23,7 +23,7 @@ func Init() {
for _,s := range setupServiceList { for _,s := range setupServiceList {
err := s.OnInit() err := s.OnInit()
if err != nil { if err != nil {
log.Error("Failed to initialize "+s.GetName()+" service",log.ErrorAttr("err",err)) log.Error("Failed to initialize "+s.GetName()+" service",log.ErrorField("err",err))
os.Exit(1) os.Exit(1)
} }
} }

View File

@@ -7,7 +7,6 @@ import (
"github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/service"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"io" "io"
"log/slog"
"net/http" "net/http"
"strings" "strings"
"time" "time"
@@ -69,28 +68,28 @@ func (gm *GinModule) eventHandler(ev event.IEvent) {
func (gm *GinModule) Start() { func (gm *GinModule) Start() {
gm.srv.Addr = gm.listenAddr gm.srv.Addr = gm.listenAddr
log.Info("http start listen", slog.Any("addr", gm.listenAddr)) log.Info("http start listen", log.Any("addr", gm.listenAddr))
go func() { go func() {
err := gm.srv.ListenAndServe() err := gm.srv.ListenAndServe()
if err != nil { if err != nil {
log.Error("ListenAndServe error", slog.Any("error", err.Error())) log.Error("ListenAndServe error", log.Any("error", err.Error()))
} }
}() }()
} }
func (gm *GinModule) StartTLS(certFile, keyFile string) { func (gm *GinModule) StartTLS(certFile, keyFile string) {
log.Info("http start listen", slog.Any("addr", gm.listenAddr)) log.Info("http start listen", log.Any("addr", gm.listenAddr))
go func() { go func() {
err := gm.srv.ListenAndServeTLS(certFile, keyFile) err := gm.srv.ListenAndServeTLS(certFile, keyFile)
if err != nil { if err != nil {
log.Fatal("ListenAndServeTLS error", slog.Any("error", err.Error())) log.Fatal("ListenAndServeTLS error", log.Any("error", err.Error()))
} }
}() }()
} }
func (gm *GinModule) Stop(ctx context.Context) { func (gm *GinModule) Stop(ctx context.Context) {
if err := gm.srv.Shutdown(ctx); err != nil { if err := gm.srv.Shutdown(ctx); err != nil {
log.Error("Server Shutdown", slog.Any("error", err)) log.Error("Server Shutdown", log.Any("error", err))
} }
} }
@@ -210,7 +209,7 @@ func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...S
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Error("GinModule process timeout", slog.Any("path", c.Request.URL.Path)) log.Error("GinModule process timeout", log.Any("path", c.Request.URL.Path))
c.AbortWithStatus(http.StatusRequestTimeout) c.AbortWithStatus(http.StatusRequestTimeout)
case <-chanWait: case <-chanWait:
} }

View File

@@ -9,7 +9,6 @@ import (
"github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/service"
"github.com/xtaci/kcp-go/v5" "github.com/xtaci/kcp-go/v5"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"runtime"
"sync" "sync"
) )
@@ -167,10 +166,7 @@ func (km *KcpModule) NewAgent(conn network.Conn) network.Agent {
func (c *Client) Run() { func (c *Client) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -179,7 +175,7 @@ func (c *Client) Run() {
c.kcpConn.SetReadDeadline(*c.kcpModule.kcpCfg.ReadDeadlineMill) c.kcpConn.SetReadDeadline(*c.kcpModule.kcpCfg.ReadDeadlineMill)
msgBuff, err := c.kcpConn.ReadMsg() msgBuff, err := c.kcpConn.ReadMsg()
if err != nil { if err != nil {
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", c.id)) log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", c.id))
break break
} }

View File

@@ -8,7 +8,6 @@ import (
"github.com/duanhf2012/origin/v2/network/processor" "github.com/duanhf2012/origin/v2/network/processor"
"github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/service"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"runtime"
"sync" "sync"
"time" "time"
) )
@@ -125,10 +124,7 @@ func (slf *Client) GetId() string {
func (slf *Client) Run() { func (slf *Client) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -137,7 +133,7 @@ func (slf *Client) Run() {
slf.tcpConn.SetReadDeadline(slf.tcpModule.tcpServer.ReadDeadline) slf.tcpConn.SetReadDeadline(slf.tcpModule.tcpServer.ReadDeadline)
bytes, err := slf.tcpConn.ReadMsg() bytes, err := slf.tcpConn.ReadMsg()
if err != nil { if err != nil {
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", slf.id)) log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", slf.id))
break break
} }
data, err := slf.tcpModule.process.Unmarshal(slf.id, bytes) data, err := slf.tcpModule.process.Unmarshal(slf.id, bytes)
@@ -185,7 +181,7 @@ func (tm *TcpModule) Close(clientId string) {
client.tcpConn.Close() client.tcpConn.Close()
} }
log.SWarning("close client:", clientId) log.SWarn("close client:", clientId)
return return
} }

View File

@@ -120,7 +120,7 @@ func (wc *WSClient) Run() {
for { for {
bytes, err := wc.wsConn.ReadMsg() bytes, err := wc.wsConn.ReadMsg()
if err != nil { if err != nil {
log.Debug("read client is error", log.String("clientId", wc.id), log.ErrorAttr("err", err)) log.Debug("read client is error", log.String("clientId", wc.id), log.ErrorField("err", err))
break break
} }
data, err := wc.wsModule.process.Unmarshal(wc.id, bytes) data, err := wc.wsModule.process.Unmarshal(wc.id, bytes)

View File

@@ -66,7 +66,7 @@ func (m *RedisModule) Init(redisCfg *ConfigRedis) {
} }
c, err := redis.Dial("tcp", redisServer, opt...) c, err := redis.Dial("tcp", redisServer, opt...)
if err != nil { if err != nil {
log.Error("Connect redis fail reason:%v", err) log.Error("Connect redis fail", log.ErrorField("err",err))
return nil, err return nil, err
} }
@@ -79,7 +79,7 @@ func (m *RedisModule) Init(redisCfg *ConfigRedis) {
} }
_, err := c.Do("PING") _, err := c.Do("PING")
if err != nil { if err != nil {
log.Error("Do PING fail reason:%v", err) log.Error("Do PING fail reason", log.ErrorField("err",err))
return err return err
} }
return err return err
@@ -101,7 +101,7 @@ func (m *RedisModule) getConn() (redis.Conn, error) {
if conn.Err() != nil { if conn.Err() != nil {
err := conn.Err() err := conn.Err()
if err != nil { if err != nil {
log.Error("get Conn have error,reason:%v", err) log.Error("get Conn have error", log.ErrorField("err",err))
} }
conn.Close() conn.Close()
return nil, err return nil, err
@@ -118,7 +118,7 @@ func (m *RedisModule) TestPingRedis() error {
err = m.redisPool.TestOnBorrow(conn, time.Now()) err = m.redisPool.TestOnBorrow(conn, time.Now())
if err != nil { if err != nil {
log.Error("TestOnBorrow fail,reason:%v", err) log.Error("TestOnBorrow fail", log.ErrorField("err",err))
return err return err
} }
@@ -171,7 +171,7 @@ func (m *RedisModule) setStringByExpire(key, value, expire interface{}) error {
} }
if retErr != nil { if retErr != nil {
log.Error("setStringByExpire fail,reason:%v", retErr) log.Error("setStringByExpire fail", log.ErrorField("err",retErr))
return retErr return retErr
} }
@@ -254,7 +254,7 @@ func (m *RedisModule) setMuchStringByExpire(mapInfo map[interface{}]interface{},
} }
if serr != nil { if serr != nil {
log.Error("setMuchStringByExpire fail,reason:%v", serr) log.Error("setMuchStringByExpire fail",log.ErrorField("err",serr))
conn.Do("DISCARD") conn.Do("DISCARD")
return serr return serr
} else { } else {
@@ -262,7 +262,7 @@ func (m *RedisModule) setMuchStringByExpire(mapInfo map[interface{}]interface{},
} }
if err != nil { if err != nil {
log.Error("setMuchStringByExpire fail,reason:%v", err) log.Error("setMuchStringByExpire fail", log.ErrorField("err",err))
} }
return err return err
@@ -277,7 +277,7 @@ func (m *RedisModule) GetString(key interface{}) (string, error) {
ret, err := conn.Do("GET", key) ret, err := conn.Do("GET", key)
if err != nil { if err != nil {
log.Error("GetString fail,reason:%v", err) log.Error("GetString fail", log.ErrorField("err",err))
return "", err return "", err
} }
@@ -298,7 +298,7 @@ func (m *RedisModule) GetStringJSON(key string, st interface{}) error {
ret, err := conn.Do("GET", key) ret, err := conn.Do("GET", key)
if err != nil { if err != nil {
log.Error("GetStringJSON fail,reason:%v", err) log.Error("GetStringJSON fail", log.ErrorField("err",err))
return err return err
} }
@@ -315,7 +315,7 @@ func (m *RedisModule) GetStringJSON(key string, st interface{}) error {
} }
if err = json.Unmarshal(str, st); err != nil { if err = json.Unmarshal(str, st); err != nil {
log.Error("GetStringJSON fail json.Unmarshal is error:%s,%s,reason:%v", key, string(str), err) log.Errorf("GetStringJSON fail json.Unmarshal is error:%s,%s,reason:%v", key, string(str), err)
return err return err
} }
@@ -336,13 +336,13 @@ func (m *RedisModule) GetStringMap(keys []string) (retMap map[string]string, err
// 开始Send数据 // 开始Send数据
err = conn.Send("MULTI") err = conn.Send("MULTI")
if err != nil { if err != nil {
log.Error("GetMuchString fail %v", err) log.Errorf("GetMuchString fail %v", err)
return nil, err return nil, err
} }
for _, val := range keys { for _, val := range keys {
err = conn.Send("GET", val) err = conn.Send("GET", val)
if err != nil { if err != nil {
log.Error("GetMuchString fail,reason:%v", err) log.Errorf("GetMuchString fail,reason:%v", err)
conn.Do("DISCARD") conn.Do("DISCARD")
return nil, err return nil, err
} }
@@ -351,7 +351,7 @@ func (m *RedisModule) GetStringMap(keys []string) (retMap map[string]string, err
// 执行命令 // 执行命令
ret, err := conn.Do("EXEC") ret, err := conn.Do("EXEC")
if err != nil { if err != nil {
log.Error("GetMuchString fail %v", err) log.Errorf("GetMuchString fail %v", err)
return return
} }
@@ -383,7 +383,7 @@ func (m *RedisModule) ExistsKey(key interface{}) (bool, error) {
ret, err := conn.Do("EXISTS", key) ret, err := conn.Do("EXISTS", key)
if err != nil { if err != nil {
log.Error("ExistsKey fail, reason:%v", err) log.Errorf("ExistsKey fail, reason:%v", err)
return false, err return false, err
} }
retValue, ok := ret.(int64) retValue, ok := ret.(int64)
@@ -404,7 +404,7 @@ func (m *RedisModule) DelString(key interface{}) error {
ret, err := conn.Do("DEL", key) ret, err := conn.Do("DEL", key)
if err != nil { if err != nil {
log.Error("DelString fail, reason:%v", err) log.Errorf("DelString fail, reason:%v", err)
return err return err
} }
@@ -439,7 +439,7 @@ func (m *RedisModule) DelStringKeyList(keys []interface{}) (map[interface{}]bool
for _, val := range keys { for _, val := range keys {
err = conn.Send("DEL", val) err = conn.Send("DEL", val)
if err != nil { if err != nil {
log.Error("DelMuchString fail,reason:%v", err) log.Errorf("DelMuchString fail,reason:%v", err)
conn.Do("DISCARD") conn.Do("DISCARD")
return nil, err return nil, err
} }
@@ -448,7 +448,7 @@ func (m *RedisModule) DelStringKeyList(keys []interface{}) (map[interface{}]bool
ret, err := conn.Do("EXEC") ret, err := conn.Do("EXEC")
if err != nil { if err != nil {
log.Error("DelMuchString fail,reason:%v", err) log.Errorf("DelMuchString fail,reason:%v", err)
return nil, err return nil, err
} }
@@ -484,7 +484,7 @@ func (m *RedisModule) SetHash(redisKey, hashKey, value interface{}) error {
_, retErr := conn.Do("HSET", redisKey, hashKey, value) _, retErr := conn.Do("HSET", redisKey, hashKey, value)
if retErr != nil { if retErr != nil {
log.Error("SetHash fail,reason:%v", retErr) log.Errorf("SetHash fail,reason:%v", retErr)
} }
return retErr return retErr
@@ -502,7 +502,7 @@ func (m *RedisModule) GetAllHashJSON(redisKey string) (map[string]string, error)
value, err := conn.Do("HGETALL", redisKey) value, err := conn.Do("HGETALL", redisKey)
if err != nil { if err != nil {
log.Error("GetAllHashJSON fail,reason:%v", err) log.Errorf("GetAllHashJSON fail,reason:%v", err)
return nil, err return nil, err
} }
@@ -522,7 +522,7 @@ func (m *RedisModule) GetHash(redisKey interface{}, fieldKey interface{}) (strin
value, err := conn.Do("HGET", redisKey, fieldKey) value, err := conn.Do("HGET", redisKey, fieldKey)
if err != nil { if err != nil {
log.Error("GetHashValueByKey fail,reason:%v", err) log.Errorf("GetHashValueByKey fail,reason:%v", err)
return "", err return "", err
} }
if value == nil { if value == nil {
@@ -545,7 +545,7 @@ func (m *RedisModule) GetMuchHash(args ...interface{}) ([]string, error) {
value, err := conn.Do("HMGET", args...) value, err := conn.Do("HMGET", args...)
if err != nil { if err != nil {
log.Error("GetHashValueByKey fail,reason:%v", err) log.Errorf("GetHashValueByKey fail,reason:%v", err)
return nil, err return nil, err
} }
if value == nil { if value == nil {
@@ -582,7 +582,7 @@ func (m *RedisModule) ScanMatchKeys(cursorValue int, redisKey string, count int)
value, err := conn.Do("SCAN", cursorValue, "match", redisKey, "count", count) value, err := conn.Do("SCAN", cursorValue, "match", redisKey, "count", count)
if err != nil { if err != nil {
log.Error("GetHashValueByKey fail,reason:%v", err) log.Errorf("GetHashValueByKey fail,reason:%v", err)
return nextCursorValue, nil, err return nextCursorValue, nil, err
} }
if value == nil { if value == nil {
@@ -618,7 +618,7 @@ func (m *RedisModule) SetHashMapJSON(redisKey string, mapFieldValue map[interfac
if err == nil { if err == nil {
_, err = conn.Do("HSET", redisKey, symbol, temp) _, err = conn.Do("HSET", redisKey, symbol, temp)
if err != nil { if err != nil {
log.Error("SetMuchHashJSON fail,reason:%v", err) log.Errorf("SetMuchHashJSON fail,reason:%v", err)
conn.Send("DISCARD") conn.Send("DISCARD")
return err return err
} }
@@ -627,7 +627,7 @@ func (m *RedisModule) SetHashMapJSON(redisKey string, mapFieldValue map[interfac
// 执行命令 // 执行命令
_, err = conn.Do("EXEC") _, err = conn.Do("EXEC")
if err != nil { if err != nil {
log.Error("SetMuchHashJSON fail,reason:%v", err) log.Errorf("SetMuchHashJSON fail,reason:%v", err)
conn.Send("DISCARD") conn.Send("DISCARD")
} }
return err return err
@@ -642,7 +642,7 @@ func (m *RedisModule) DelHash(args ...interface{}) error {
_, retErr := conn.Do("HDEL", args...) _, retErr := conn.Do("HDEL", args...)
if retErr != nil { if retErr != nil {
log.Error("DelMuchHash fail,reason:%v", retErr) log.Errorf("DelMuchHash fail,reason:%v", retErr)
} }
return retErr return retErr
} }
@@ -678,7 +678,7 @@ func (m *RedisModule) setListPush(setType string, args ...interface{}) error {
_, retErr := conn.Do(setType, args...) _, retErr := conn.Do(setType, args...)
if retErr != nil { if retErr != nil {
log.Error("setList fail,reason:%v", retErr) log.Errorf("setList fail,reason:%v", retErr)
} }
return retErr return retErr
} }
@@ -705,7 +705,7 @@ func (m *RedisModule) LRangeList(key string, start, end int) ([]string, error) {
reply, err := conn.Do("lrange", key, start, end) reply, err := conn.Do("lrange", key, start, end)
if err != nil { if err != nil {
log.Error("SetListJSONRpush fail,reason:%v", err) log.Errorf("SetListJSONRpush fail,reason:%v", err)
return nil, err return nil, err
} }
@@ -722,7 +722,7 @@ func (m *RedisModule) GetListLen(key string) (int, error) {
reply, err := conn.Do("LLEN", key) reply, err := conn.Do("LLEN", key)
if err != nil { if err != nil {
log.Error("GetListLen fail,reason:%v", err) log.Errorf("GetListLen fail,reason:%v", err)
return -1, err return -1, err
} }
return redis.Int(reply, err) return redis.Int(reply, err)
@@ -748,7 +748,7 @@ func (m *RedisModule) LTrimList(key string, start, end int) error {
_, err = conn.Do("LTRIM", key, start, end) _, err = conn.Do("LTRIM", key, start, end)
if err != nil { if err != nil {
log.Error("LtrimListValue fail,reason:%v", err) log.Errorf("LtrimListValue fail,reason:%v", err)
return err return err
} }
return nil return nil
@@ -849,7 +849,7 @@ func (m *RedisModule) ZADDInsertJson(key string, score float64, value interface{
} }
_, err = conn.Do("ZADD", key, score, JsonValue) _, err = conn.Do("ZADD", key, score, JsonValue)
if err != nil { if err != nil {
log.Error("ZADDInsertJson fail,reason:%v", err) log.Errorf("ZADDInsertJson fail,reason:%v", err)
return err return err
} }
return nil return nil
@@ -865,7 +865,7 @@ func (m *RedisModule) ZADDInsert(key string, score float64, Data interface{}) er
_, err = conn.Do("ZADD", key, score, Data) _, err = conn.Do("ZADD", key, score, Data)
if err != nil { if err != nil {
log.Error("ZADDInsert fail,reason:%v", err) log.Errorf("ZADDInsert fail,reason:%v", err)
return err return err
} }
return nil return nil
@@ -1088,7 +1088,7 @@ func (m *RedisModule) HincrbyHashInt(redisKey, hashKey string, value int) error
_, retErr := conn.Do("HINCRBY", redisKey, hashKey, value) _, retErr := conn.Do("HINCRBY", redisKey, hashKey, value)
if retErr != nil { if retErr != nil {
log.Error("HincrbyHashInt fail,reason:%v", retErr) log.Errorf("HincrbyHashInt fail,reason:%v", retErr)
} }
return retErr return retErr
@@ -1103,7 +1103,7 @@ func (m *RedisModule) EXPlREInsert(key string, TTl int) error {
_, err = conn.Do("expire", key, TTl) _, err = conn.Do("expire", key, TTl)
if err != nil { if err != nil {
log.Error("expire fail,reason:%v", err) log.Errorf("expire fail,reason:%v", err)
return err return err
} }
return nil return nil
@@ -1129,7 +1129,7 @@ func (m *RedisModule) Keys(key string) ([]string, error) {
ret, err := conn.Do("KEYS", key) ret, err := conn.Do("KEYS", key)
if err != nil { if err != nil {
log.Error("KEYS fail, reason:%v", err) log.Errorf("KEYS fail, reason:%v", err)
return nil, err return nil, err
} }
retList, ok := ret.([]interface{}) retList, ok := ret.([]interface{})

View File

@@ -9,7 +9,6 @@ import (
"github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/service"
"github.com/duanhf2012/origin/v2/util/bytespool" "github.com/duanhf2012/origin/v2/util/bytespool"
"github.com/google/uuid" "github.com/google/uuid"
"runtime"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -140,10 +139,7 @@ func (slf *Client) GetId() string {
func (slf *Client) Run() { func (slf *Client) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -156,7 +152,7 @@ func (slf *Client) Run() {
slf.tcpConn.SetReadDeadline(slf.tcpService.tcpServer.ReadDeadline) slf.tcpConn.SetReadDeadline(slf.tcpService.tcpServer.ReadDeadline)
bytes, err := slf.tcpConn.ReadMsg() bytes, err := slf.tcpConn.ReadMsg()
if err != nil { if err != nil {
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", slf.id)) log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", slf.id))
break break
} }
data, err := slf.tcpService.process.Unmarshal(slf.id, bytes) data, err := slf.tcpService.process.Unmarshal(slf.id, bytes)

View File

@@ -129,7 +129,7 @@ func (slf *WSClient) Run() {
for { for {
bytes, err := slf.wsConn.ReadMsg() bytes, err := slf.wsConn.ReadMsg()
if err != nil { if err != nil {
log.Debug("read client id %s is error:%+v", slf.id, err) log.Debugf("read client id %s is error:%+v", slf.id, err)
break break
} }
data, err := slf.wsService.process.Unmarshal(slf.id, bytes) data, err := slf.wsService.process.Unmarshal(slf.id, bytes)

View File

@@ -0,0 +1,173 @@
package pubsub
import (
"container/list"
"sync/atomic"
)
type TopicType int
type Key uint64
type IBaseSubscriber interface {
OnSubscribe(key Key)
GetKey() Key
}
type ISubscriber interface {
IBaseSubscriber
OnEvent(ctx ...any)
}
type IPublisher interface {
Publish(topic TopicType, ctx ...any)
Subscribe(topic TopicType, sub ISubscriber)
UnSubscribe(topic TopicType)
UnSubscribeKey(key Key)
}
var keyID uint64
func genKeyID() Key {
return Key(atomic.AddUint64(&keyID, 1))
}
type KeyData struct {
subscriber ISubscriber
topicType TopicType
keyElement *list.Element
}
type SubscriberSet map[Key]KeyData
type TopicSet map[TopicType]*list.List
type Publisher struct {
subscriberSet SubscriberSet
topicSet TopicSet
}
func (set *SubscriberSet) init() {
*set = make(SubscriberSet, 64)
}
func (set *SubscriberSet) add(keyElement *list.Element, topicType TopicType, subscriber ISubscriber) {
(*set)[keyElement.Value.(Key)] = KeyData{subscriber: subscriber, topicType: topicType, keyElement: keyElement}
}
func (set *SubscriberSet) del(key Key) {
delete(*set, key)
}
func (set *SubscriberSet) get(key Key) (KeyData, bool) {
keyData, ok := (*set)[key]
if !ok {
return keyData, false
}
return keyData, true
}
func (set *TopicSet) init() {
*set = make(TopicSet, 64)
}
func (set *TopicSet) add(topic TopicType, key Key) *list.Element {
keyList := (*set)[topic]
if keyList == nil {
keyList = list.New()
(*set)[topic] = keyList
}
return keyList.PushBack(key)
}
func (set *TopicSet) del(topic TopicType, keyElement *list.Element) {
keyList := (*set)[topic]
if keyList == nil {
return
}
keyList.Remove(keyElement)
}
func (set *TopicSet) foreach(topic TopicType, cb func(key Key)) {
keyList := (*set)[topic]
if keyList == nil {
return
}
for e := keyList.Front(); e != nil; e = e.Next() {
cb(e.Value.(Key))
}
}
type BaseSubscriber struct {
key Key
}
func (bs *BaseSubscriber) OnSubscribe(key Key) {
bs.key = key
}
func (bs *BaseSubscriber) GetKey() Key {
return bs.key
}
func (pub *Publisher) lazyInit() {
if pub.subscriberSet == nil {
pub.subscriberSet.init()
}
if pub.topicSet == nil {
pub.topicSet.init()
}
}
func (pub *Publisher) add(topic TopicType, sub ISubscriber) Key {
key := genKeyID()
ele := pub.topicSet.add(topic, key)
pub.subscriberSet.add(ele, topic, sub)
return key
}
func (pub *Publisher) Publish(topic TopicType, ctx ...any) {
pub.lazyInit()
pub.topicSet.foreach(topic, func(key Key) {
keyData, ok := pub.subscriberSet.get(key)
if ok == false {
return
}
keyData.subscriber.OnEvent(ctx...)
})
}
func (pub *Publisher) Subscribe(topic TopicType, sub ISubscriber) bool {
if topic == 0 {
return false
}
pub.lazyInit()
sub.OnSubscribe(pub.add(topic, sub))
return true
}
func (pub *Publisher) UnSubscribe(topic TopicType) {
keyList := pub.topicSet[topic]
if keyList == nil {
return
}
for e := keyList.Front(); e != nil; e = e.Next() {
pub.subscriberSet.del(e.Value.(Key))
}
delete(pub.topicSet, topic)
}
func (pub *Publisher) UnSubscribeKey(key Key) {
keyData, ok := pub.subscriberSet.get(key)
if ok == false {
return
}
pub.topicSet.del(keyData.topicType, keyData.keyElement)
pub.subscriberSet.del(key)
}

View File

@@ -0,0 +1,54 @@
package pubsub
import (
"testing"
)
const (
Invalid TopicType = iota
Topic1
Topic2
)
var test *testing.T
type Subscriber1 struct {
BaseSubscriber
}
type Subscriber2 struct {
BaseSubscriber
}
func (sub *Subscriber1) OnEvent(ctx ...any) {
test.Log("Subscriber1 OnEvent", " key ", sub.GetKey(), ctx)
}
func (sub *Subscriber2) OnEvent(ctx ...any) {
test.Log("Subscriber2 OnEvent", " key ", sub.GetKey(), ctx)
}
func TestPubSub(t *testing.T) {
test = t
var publisher Publisher
// 创建3个订阅者
var subscriber []ISubscriber
subscriber = append(subscriber, &Subscriber1{}, &Subscriber1{}, &Subscriber2{})
// 分别注册进Publisher中
publisher.Subscribe(Topic1, subscriber[0])
publisher.Subscribe(Topic1, subscriber[1])
publisher.Subscribe(Topic2, subscriber[2])
// 发布订阅,两个Subscriber1都会调用OnEvent
publisher.Publish(Topic1, 1, 2, 3)
// 删除订阅Publish后只有Subscriber1的key2收到
publisher.UnSubscribeKey(subscriber[0].GetKey())
publisher.Publish(Topic1, 1, 2, 3)
// 删除Topic2Publish将收不到
publisher.UnSubscribe(Topic2)
publisher.Publish(Topic2, 1)
}

View File

@@ -124,10 +124,7 @@ func (t *Timer) IsOpen() bool {
func (t *Timer) Do() { func (t *Timer) Do() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -210,10 +207,7 @@ func (c *Cron) Reset() {
func (c *Cron) Do() { func (c *Cron) Do() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -266,10 +260,7 @@ func (c *Cron) UnRef() {
func (c *Ticker) Do() { func (c *Ticker) Do() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()