mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-05 15:34:49 +08:00
Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3a4350769c | ||
|
|
d4966ea129 | ||
|
|
3b10eeb792 | ||
|
|
e3275e9f2a | ||
|
|
16745b34f0 | ||
|
|
f34dc7d53f | ||
|
|
0a09dc2fee | ||
|
|
f01a93c446 | ||
|
|
4d2ab4ee4f | ||
|
|
ffcc5a3489 | ||
|
|
cf6ca0483b | ||
|
|
97a21e6f71 | ||
|
|
f60a55d03a | ||
|
|
2c32d6eec9 | ||
|
|
da45f97fa8 | ||
|
|
d29abc0813 | ||
|
|
c9507f9ee9 | ||
|
|
61de4bba3a | ||
|
|
000853b479 | ||
|
|
387e83d65c | ||
|
|
07a102c6ea |
@@ -250,7 +250,7 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er
|
||||
//2.安装服务发现结点
|
||||
err = cls.setupDiscovery(localNodeId, setupServiceFun)
|
||||
if err != nil {
|
||||
log.Error("setupDiscovery fail", log.ErrorAttr("err", err))
|
||||
log.Error("setupDiscovery fail", log.ErrorField("err", err))
|
||||
return err
|
||||
}
|
||||
service.RegRpcEventFun = cls.RegRpcEvent
|
||||
|
||||
@@ -10,15 +10,15 @@ import (
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"time"
|
||||
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"path"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"io/ioutil"
|
||||
"crypto/x509"
|
||||
"crypto/tls"
|
||||
)
|
||||
|
||||
const originDir = "/origin"
|
||||
@@ -42,11 +42,16 @@ type EtcdDiscoveryService struct {
|
||||
mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId]
|
||||
}
|
||||
|
||||
var etcdDiscovery *EtcdDiscoveryService
|
||||
func getEtcdDiscovery() IServiceDiscovery {
|
||||
etcdDiscovery := &EtcdDiscoveryService{}
|
||||
if etcdDiscovery == nil {
|
||||
etcdDiscovery = &EtcdDiscoveryService{}
|
||||
}
|
||||
|
||||
return etcdDiscovery
|
||||
}
|
||||
|
||||
|
||||
func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
|
||||
ed.localNodeId = localNodeId
|
||||
|
||||
@@ -89,21 +94,50 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
||||
}
|
||||
|
||||
for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
|
||||
client, cerr := clientv3.New(clientv3.Config{
|
||||
var client *clientv3.Client
|
||||
var tlsConfig *tls.Config
|
||||
|
||||
if etcdDiscoveryCfg.EtcdList[i].Cert != "" {
|
||||
// load cert
|
||||
cert, cerr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
|
||||
if cerr != nil {
|
||||
log.Error("load cert error", log.ErrorField("err", cerr))
|
||||
return cerr
|
||||
}
|
||||
|
||||
// load root ca
|
||||
caData, cerr := ioutil.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
|
||||
if cerr != nil {
|
||||
log.Error("load root ca error", log.ErrorField("err", cerr))
|
||||
return cerr
|
||||
}
|
||||
pool := x509.NewCertPool()
|
||||
pool.AppendCertsFromPEM(caData)
|
||||
tlsConfig = &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
RootCAs: pool,
|
||||
}
|
||||
}
|
||||
|
||||
client, err = clientv3.New(clientv3.Config{
|
||||
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
|
||||
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
|
||||
Logger: zap.NewNop(),
|
||||
Username: etcdDiscoveryCfg.EtcdList[i].UserName,
|
||||
Password: etcdDiscoveryCfg.EtcdList[i].Password,
|
||||
Logger: log.GetLogger().Logger,
|
||||
TLS: tlsConfig,
|
||||
})
|
||||
|
||||
if cerr != nil {
|
||||
log.Error("etcd discovery init fail", log.ErrorAttr("err", cerr))
|
||||
return cerr
|
||||
|
||||
if err != nil {
|
||||
log.Error("etcd discovery init fail", log.ErrorField("err", err))
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
|
||||
_, err = client.Leases(ctx)
|
||||
if err != nil {
|
||||
log.Error("etcd discovery init fail", log.Any("endpoint", etcdDiscoveryCfg.EtcdList[i].Endpoints), log.ErrorAttr("err", err))
|
||||
log.Error("etcd discovery init fail", log.Any("endpoint", etcdDiscoveryCfg.EtcdList[i].Endpoints), log.ErrorField("err", err))
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -128,7 +162,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
|
||||
var resp *clientv3.LeaseGrantResponse
|
||||
resp, err = client.Grant(context.Background(), cluster.GetEtcdDiscovery().TTLSecond)
|
||||
if err != nil {
|
||||
log.Error("etcd registerService fail", log.ErrorAttr("err", err))
|
||||
log.Error("etcd registerService fail", log.ErrorField("err", err))
|
||||
ed.tryRegisterService(client, etcdClient)
|
||||
return
|
||||
}
|
||||
@@ -138,7 +172,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
|
||||
// 注册服务节点到 etcd
|
||||
_, err = client.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID))
|
||||
if err != nil {
|
||||
log.Error("etcd Put fail", log.ErrorAttr("err", err))
|
||||
log.Error("etcd Put fail", log.ErrorField("err", err))
|
||||
ed.tryRegisterService(client, etcdClient)
|
||||
return
|
||||
}
|
||||
@@ -146,7 +180,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
|
||||
|
||||
etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID)
|
||||
if err != nil {
|
||||
log.Error("etcd KeepAlive fail", log.ErrorAttr("err", err))
|
||||
log.Error("etcd KeepAlive fail", log.ErrorField("err", err))
|
||||
ed.tryRegisterService(client, etcdClient)
|
||||
return
|
||||
}
|
||||
@@ -200,7 +234,7 @@ func (ed *EtcdDiscoveryService) retire() error {
|
||||
// 注册服务节点到 etcd
|
||||
_, err := c.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID))
|
||||
if err != nil {
|
||||
log.Error("etcd Put fail", log.ErrorAttr("err", err))
|
||||
log.Error("etcd Put fail", log.ErrorField("err", err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -285,12 +319,12 @@ func (ed *EtcdDiscoveryService) setNodeInfo(networkName string, nodeInfo *rpc.No
|
||||
func (ed *EtcdDiscoveryService) close() {
|
||||
for c, ec := range ed.mapClient {
|
||||
if _, err := c.Revoke(context.Background(), ec.leaseID); err != nil {
|
||||
log.Error("etcd Revoke fail", log.ErrorAttr("err", err))
|
||||
log.Error("etcd Revoke fail", log.ErrorField("err", err))
|
||||
}
|
||||
c.Watcher.Close()
|
||||
err := c.Close()
|
||||
if err != nil {
|
||||
log.Error("etcd Close fail", log.ErrorAttr("err", err))
|
||||
log.Error("etcd Close fail", log.ErrorField("err", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -299,7 +333,7 @@ func (ed *EtcdDiscoveryService) getServices(client *clientv3.Client, etcdClient
|
||||
// 根据前缀获取现有的key
|
||||
resp, err := client.Get(context.Background(), watchKey, clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
log.Error("etcd Get fail", log.ErrorAttr("err", err))
|
||||
log.Error("etcd Get fail", log.ErrorField("err", err))
|
||||
ed.tryWatch(client, etcdClient)
|
||||
return false
|
||||
}
|
||||
@@ -322,11 +356,7 @@ func (ed *EtcdDiscoveryService) watchByClient(client *clientv3.Client, etcdClien
|
||||
func (ed *EtcdDiscoveryService) watcher(client *clientv3.Client, etcdClient *etcdClientInfo, watchKey string) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
|
||||
log.StackError(fmt.Sprint(r))
|
||||
ed.tryWatch(client, etcdClient)
|
||||
}
|
||||
}()
|
||||
@@ -355,7 +385,7 @@ func (ed *EtcdDiscoveryService) setNode(netWorkName string, byteNode []byte) str
|
||||
var nodeInfo rpc.NodeInfo
|
||||
err := proto.Unmarshal(byteNode, &nodeInfo)
|
||||
if err != nil {
|
||||
log.Error("Unmarshal fail", log.String("netWorkName", netWorkName), log.ErrorAttr("err", err))
|
||||
log.Error("Unmarshal fail", log.String("netWorkName", netWorkName), log.ErrorField("err", err))
|
||||
return ""
|
||||
}
|
||||
|
||||
@@ -494,7 +524,7 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
|
||||
lg, err = client.Grant(ctx, etcdServiceRecord.TTLSecond)
|
||||
if err != nil {
|
||||
log.Error("etcd record fail,cannot grant lease", log.ErrorAttr("err", err))
|
||||
log.Error("etcd record fail,cannot grant lease", log.ErrorField("err", err))
|
||||
return errors.New("cannot grant lease")
|
||||
}
|
||||
}
|
||||
@@ -503,14 +533,14 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
|
||||
_, err = client.Put(ctx, path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo, clientv3.WithLease(lg.ID))
|
||||
if err != nil {
|
||||
log.Error("etcd record fail,cannot put record", log.ErrorAttr("err", err))
|
||||
log.Error("etcd record fail,cannot put record", log.ErrorField("err", err))
|
||||
}
|
||||
return errors.New("cannot put record")
|
||||
}
|
||||
|
||||
_, err = client.Put(context.Background(), path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo)
|
||||
if err != nil {
|
||||
log.Error("etcd record fail,cannot put record", log.ErrorAttr("err", err))
|
||||
log.Error("etcd record fail,cannot put record", log.ErrorField("err", err))
|
||||
return errors.New("cannot put record")
|
||||
}
|
||||
|
||||
|
||||
@@ -471,7 +471,7 @@ func (dc *OriginDiscoveryClient) OnRelease() {
|
||||
|
||||
err := dc.CallNodeWithTimeout(3*time.Second, masterNodeList.MasterNodeList[i].NodeId, UnRegServiceDiscover, &nodeRetireReq, &rpc.Empty{})
|
||||
if err != nil {
|
||||
log.Error("call "+UnRegServiceDiscover+" is fail", log.ErrorAttr("err", err))
|
||||
log.Error("call "+UnRegServiceDiscover+" is fail", log.ErrorField("err", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -493,7 +493,7 @@ func (dc *OriginDiscoveryClient) OnRetire() {
|
||||
|
||||
err := dc.GoNode(masterNodeList.MasterNodeList[i].NodeId, NodeRetireRpcMethod, &nodeRetireReq)
|
||||
if err != nil {
|
||||
log.Error("call "+NodeRetireRpcMethod+" is fail", log.ErrorAttr("err", err))
|
||||
log.Error("call "+NodeRetireRpcMethod+" is fail", log.ErrorField("err", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,9 +15,15 @@ import (
|
||||
|
||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
|
||||
|
||||
type EtcdList struct {
|
||||
NetworkName []string
|
||||
Endpoints []string
|
||||
UserName string
|
||||
Password string
|
||||
Cert string
|
||||
CertKey string
|
||||
Ca string
|
||||
}
|
||||
|
||||
type EtcdDiscovery struct {
|
||||
@@ -88,15 +94,16 @@ func yamlToJson(data []byte, v interface{}) ([]byte, error) {
|
||||
}
|
||||
|
||||
func unmarshalConfig(data []byte, v interface{}) error {
|
||||
if !json.Valid(data) {
|
||||
envData := []byte(os.ExpandEnv(string(data)))
|
||||
if !json.Valid(envData) {
|
||||
var err error
|
||||
data, err = yamlToJson(data, v)
|
||||
envData, err = yamlToJson(envData, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return json.Unmarshal(data, v)
|
||||
return json.Unmarshal(envData, v)
|
||||
}
|
||||
|
||||
func (d *DiscoveryInfo) getDiscoveryType() DiscoveryType {
|
||||
@@ -416,13 +423,12 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cls *Cluster) parseLocalCfg() {
|
||||
func (cls *Cluster) parseLocalCfg() error{
|
||||
rpcInfo := NodeRpcInfo{}
|
||||
rpcInfo.nodeInfo = cls.localNodeInfo
|
||||
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet)
|
||||
|
||||
cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo
|
||||
|
||||
for _, serviceName := range cls.localNodeInfo.ServiceList {
|
||||
splitServiceName := strings.Split(serviceName, ":")
|
||||
if len(splitServiceName) == 2 {
|
||||
@@ -439,8 +445,13 @@ func (cls *Cluster) parseLocalCfg() {
|
||||
cls.mapServiceNode[serviceName] = make(map[string]struct{})
|
||||
}
|
||||
|
||||
if _,ok:=cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId];ok {
|
||||
return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId)
|
||||
}
|
||||
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cls *Cluster) IsNatsMode() bool {
|
||||
@@ -473,8 +484,7 @@ func (cls *Cluster) InitCfg(localNodeId string) error {
|
||||
}
|
||||
|
||||
//本地配置服务加到全局map信息中
|
||||
cls.parseLocalCfg()
|
||||
return nil
|
||||
return cls.parseLocalCfg()
|
||||
}
|
||||
|
||||
func (cls *Cluster) IsConfigService(serviceName string) bool {
|
||||
|
||||
@@ -58,7 +58,7 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err e
|
||||
}
|
||||
|
||||
if fn == nil && cb == nil {
|
||||
log.Stack("fn and cb is nil")
|
||||
log.StackError("fn and cb is nil")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"fmt"
|
||||
"runtime"
|
||||
|
||||
|
||||
"context"
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
@@ -192,10 +192,7 @@ breakFor:
|
||||
func (d *dispatch) DoCallback(cb func(err error)) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
|
||||
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
)
|
||||
@@ -51,15 +51,13 @@ func (w *worker) run(waitGroup *sync.WaitGroup, t task) {
|
||||
func (w *worker) exec(t *task) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
|
||||
cb := t.cb
|
||||
t.cb = func(err error) {
|
||||
cb(errors.New(errString))
|
||||
}
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(errString)
|
||||
w.endCallFun(true, t)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -3,7 +3,6 @@ package event
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
"runtime"
|
||||
"sync"
|
||||
)
|
||||
|
||||
@@ -215,10 +214,7 @@ func (handler *EventHandler) Destroy() {
|
||||
func (processor *EventProcessor) EventHandler(ev IEvent) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
1
go.mod
1
go.mod
@@ -21,6 +21,7 @@ require (
|
||||
go.mongodb.org/mongo-driver v1.9.1
|
||||
go.uber.org/zap v1.27.0
|
||||
google.golang.org/protobuf v1.34.1
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
|
||||
2
go.sum
2
go.sum
@@ -330,6 +330,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
|
||||
@@ -1,93 +0,0 @@
|
||||
package log
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const _size = 9216
|
||||
|
||||
type Buffer struct {
|
||||
bs []byte
|
||||
//mu sync.Mutex // ensures atomic writes; protects the following fields
|
||||
}
|
||||
|
||||
func (buff *Buffer) Init() {
|
||||
buff.bs = make([]byte, _size)
|
||||
}
|
||||
|
||||
// AppendByte writes a single byte to the Buffer.
|
||||
func (buff *Buffer) AppendByte(v byte) {
|
||||
buff.bs = append(buff.bs, v)
|
||||
}
|
||||
|
||||
func (buff *Buffer) AppendBytes(v []byte) {
|
||||
buff.bs = append(buff.bs, v...)
|
||||
}
|
||||
|
||||
// AppendString writes a string to the Buffer.
|
||||
func (buff *Buffer) AppendString(s string) {
|
||||
buff.bs = append(buff.bs, s...)
|
||||
}
|
||||
|
||||
// AppendInt appends an integer to the underlying buffer (assuming base 10).
|
||||
func (buff *Buffer) AppendInt(i int64) {
|
||||
buff.bs = strconv.AppendInt(buff.bs, i, 10)
|
||||
}
|
||||
|
||||
// AppendUint appends an unsigned integer to the underlying buffer (assuming
|
||||
// base 10).
|
||||
func (buff *Buffer) AppendUint(i uint64) {
|
||||
buff.bs = strconv.AppendUint(buff.bs, i, 10)
|
||||
}
|
||||
|
||||
// AppendBool appends a bool to the underlying buffer.
|
||||
func (buff *Buffer) AppendBool(v bool) {
|
||||
buff.bs = strconv.AppendBool(buff.bs, v)
|
||||
}
|
||||
|
||||
// AppendFloat appends a float to the underlying buffer. It doesn't quote NaN
|
||||
// or +/- Inf.
|
||||
func (buff *Buffer) AppendFloat(f float64, bitSize int) {
|
||||
buff.bs = strconv.AppendFloat(buff.bs, f, 'f', -1, bitSize)
|
||||
}
|
||||
|
||||
// Len returns the length of the underlying byte slice.
|
||||
func (buff *Buffer) Len() int {
|
||||
return len(buff.bs)
|
||||
}
|
||||
|
||||
// Cap returns the capacity of the underlying byte slice.
|
||||
func (buff *Buffer) Cap() int {
|
||||
return cap(buff.bs)
|
||||
}
|
||||
|
||||
// Bytes returns a mutable reference to the underlying byte slice.
|
||||
func (buff *Buffer) Bytes() []byte {
|
||||
return buff.bs
|
||||
}
|
||||
|
||||
// String returns a string copy of the underlying byte slice.
|
||||
func (buff *Buffer) String() string {
|
||||
return string(buff.bs)
|
||||
}
|
||||
|
||||
// Reset resets the underlying byte slice. Subsequent writes re-use the slice's
|
||||
// backing array.
|
||||
func (buff *Buffer) Reset() {
|
||||
buff.bs = buff.bs[:0]
|
||||
}
|
||||
|
||||
// Write implements io.Writer.
|
||||
func (buff *Buffer) Write(bs []byte) (int, error) {
|
||||
buff.bs = append(buff.bs, bs...)
|
||||
return len(bs), nil
|
||||
}
|
||||
|
||||
// TrimNewline trims any final "\n" byte from the end of the buffer.
|
||||
func (buff *Buffer) TrimNewline() {
|
||||
if i := len(buff.bs) - 1; i >= 0 {
|
||||
if buff.bs[i] == '\n' {
|
||||
buff.bs = buff.bs[:i]
|
||||
}
|
||||
}
|
||||
}
|
||||
161
log/handler.go
161
log/handler.go
@@ -1,161 +0,0 @@
|
||||
package log
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const defaultSkip = 7
|
||||
|
||||
type IOriginHandler interface {
|
||||
slog.Handler
|
||||
Lock()
|
||||
UnLock()
|
||||
SetSkip(skip int)
|
||||
GetSkip() int
|
||||
}
|
||||
|
||||
type BaseHandler struct {
|
||||
addSource bool
|
||||
w io.Writer
|
||||
locker sync.Mutex
|
||||
skip int
|
||||
}
|
||||
|
||||
type OriginTextHandler struct {
|
||||
BaseHandler
|
||||
*slog.TextHandler
|
||||
}
|
||||
|
||||
type OriginJsonHandler struct {
|
||||
BaseHandler
|
||||
*slog.JSONHandler
|
||||
}
|
||||
|
||||
func (bh *BaseHandler) SetSkip(skip int) {
|
||||
bh.skip = skip
|
||||
}
|
||||
|
||||
func (bh *BaseHandler) GetSkip() int {
|
||||
return bh.skip
|
||||
}
|
||||
|
||||
func getStrLevel(level slog.Level) string {
|
||||
switch level {
|
||||
case LevelTrace:
|
||||
return "Trace"
|
||||
case LevelDebug:
|
||||
return "Debug"
|
||||
case LevelInfo:
|
||||
return "Info"
|
||||
case LevelWarning:
|
||||
return "Warning"
|
||||
case LevelError:
|
||||
return "Error"
|
||||
case LevelStack:
|
||||
return "Stack"
|
||||
case LevelDump:
|
||||
return "Dump"
|
||||
case LevelFatal:
|
||||
return "Fatal"
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func defaultReplaceAttr(groups []string, a slog.Attr) slog.Attr {
|
||||
if a.Key == slog.LevelKey {
|
||||
level := a.Value.Any().(slog.Level)
|
||||
a.Value = slog.StringValue(getStrLevel(level))
|
||||
} else if a.Key == slog.TimeKey && len(groups) == 0 {
|
||||
a.Value = slog.StringValue(a.Value.Time().Format("2006/01/02 15:04:05"))
|
||||
} else if a.Key == slog.SourceKey {
|
||||
source := a.Value.Any().(*slog.Source)
|
||||
source.File = filepath.Base(source.File)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
func NewOriginTextHandler(level slog.Level, w io.Writer, addSource bool, replaceAttr func([]string, slog.Attr) slog.Attr) slog.Handler {
|
||||
var textHandler OriginTextHandler
|
||||
textHandler.addSource = addSource
|
||||
textHandler.w = w
|
||||
textHandler.TextHandler = slog.NewTextHandler(w, &slog.HandlerOptions{
|
||||
AddSource: addSource,
|
||||
Level: level,
|
||||
ReplaceAttr: replaceAttr,
|
||||
})
|
||||
|
||||
textHandler.skip = defaultSkip
|
||||
return &textHandler
|
||||
}
|
||||
|
||||
func (oh *OriginTextHandler) Handle(context context.Context, record slog.Record) error {
|
||||
oh.Fill(context, &record)
|
||||
oh.locker.Lock()
|
||||
defer oh.locker.Unlock()
|
||||
|
||||
if record.Level == LevelStack || record.Level == LevelFatal {
|
||||
err := oh.TextHandler.Handle(context, record)
|
||||
oh.logStack(&record)
|
||||
return err
|
||||
} else if record.Level == LevelDump {
|
||||
strDump := record.Message
|
||||
record.Message = "dump info"
|
||||
err := oh.TextHandler.Handle(context, record)
|
||||
oh.w.Write([]byte(strDump))
|
||||
return err
|
||||
}
|
||||
|
||||
return oh.TextHandler.Handle(context, record)
|
||||
}
|
||||
|
||||
func (bh *BaseHandler) logStack(record *slog.Record) {
|
||||
bh.w.Write(debug.Stack())
|
||||
}
|
||||
|
||||
func (bh *BaseHandler) Lock() {
|
||||
bh.locker.Lock()
|
||||
}
|
||||
|
||||
func (bh *BaseHandler) UnLock() {
|
||||
bh.locker.Unlock()
|
||||
}
|
||||
|
||||
func NewOriginJsonHandler(level slog.Level, w io.Writer, addSource bool, replaceAttr func([]string, slog.Attr) slog.Attr) slog.Handler {
|
||||
var jsonHandler OriginJsonHandler
|
||||
jsonHandler.addSource = addSource
|
||||
jsonHandler.w = w
|
||||
jsonHandler.JSONHandler = slog.NewJSONHandler(w, &slog.HandlerOptions{
|
||||
AddSource: addSource,
|
||||
Level: level,
|
||||
ReplaceAttr: replaceAttr,
|
||||
})
|
||||
|
||||
jsonHandler.skip = defaultSkip
|
||||
return &jsonHandler
|
||||
}
|
||||
|
||||
func (oh *OriginJsonHandler) Handle(context context.Context, record slog.Record) error {
|
||||
oh.Fill(context, &record)
|
||||
if record.Level == LevelStack || record.Level == LevelFatal || record.Level == LevelDump {
|
||||
record.Add("stack", debug.Stack())
|
||||
}
|
||||
|
||||
oh.locker.Lock()
|
||||
defer oh.locker.Unlock()
|
||||
return oh.JSONHandler.Handle(context, record)
|
||||
}
|
||||
|
||||
func (bh *BaseHandler) Fill(_ context.Context, record *slog.Record) {
|
||||
if bh.addSource {
|
||||
var pcs [1]uintptr
|
||||
runtime.Callers(bh.skip, pcs[:])
|
||||
record.PC = pcs[0]
|
||||
}
|
||||
}
|
||||
763
log/log.go
763
log/log.go
@@ -1,520 +1,337 @@
|
||||
package log
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/duanhf2012/origin/v2/util/bytespool"
|
||||
"io"
|
||||
"log/slog"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"gopkg.in/natefinch/lumberjack.v2"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
var OpenConsole bool
|
||||
var LogSize int64
|
||||
var LogChannelCap int
|
||||
var LogPath string
|
||||
var LogLevel = LevelTrace
|
||||
|
||||
var gLogger, _ = NewTextLogger(LevelDebug, "", "", true, LogChannelCap)
|
||||
var isSetLogger bool
|
||||
var memPool = bytespool.NewMemAreaPool()
|
||||
|
||||
// levels
|
||||
const (
|
||||
LevelTrace = slog.Level(-8)
|
||||
LevelDebug = slog.LevelDebug
|
||||
LevelInfo = slog.LevelInfo
|
||||
LevelWarning = slog.LevelWarn
|
||||
LevelError = slog.LevelError
|
||||
LevelStack = slog.Level(12)
|
||||
LevelDump = slog.Level(16)
|
||||
LevelFatal = slog.Level(20)
|
||||
)
|
||||
|
||||
type ILogger interface {
|
||||
Trace(msg string, args ...any)
|
||||
Debug(msg string, args ...any)
|
||||
Info(msg string, args ...any)
|
||||
Warning(msg string, args ...any)
|
||||
Error(msg string, args ...any)
|
||||
Stack(msg string, args ...any)
|
||||
Dump(msg string, args ...any)
|
||||
Fatal(msg string, args ...any)
|
||||
|
||||
DoSPrintf(level slog.Level, a []interface{})
|
||||
FormatHeader(buf *Buffer, level slog.Level, callDepth int)
|
||||
Close()
|
||||
}
|
||||
var gLogger = NewDefaultLogger()
|
||||
|
||||
type Logger struct {
|
||||
SLogger *slog.Logger
|
||||
*zap.Logger
|
||||
stack bool
|
||||
|
||||
ioWriter IoWriter
|
||||
|
||||
sBuff Buffer
|
||||
OpenConsole *bool
|
||||
LogPath string
|
||||
FileName string
|
||||
Skip int
|
||||
LogLevel zapcore.Level
|
||||
Encoder zapcore.Encoder
|
||||
LogConfig *lumberjack.Logger
|
||||
SugaredLogger *zap.SugaredLogger
|
||||
CoreList []zapcore.Core
|
||||
}
|
||||
|
||||
type IoWriter struct {
|
||||
outFile io.Writer // destination for output
|
||||
writeBytes int64
|
||||
logChannel chan []byte
|
||||
wg sync.WaitGroup
|
||||
closeSig chan struct{}
|
||||
|
||||
lockWrite sync.Mutex
|
||||
|
||||
filePath string
|
||||
filePrefix string
|
||||
fileDay int
|
||||
fileCreateTime int64 //second
|
||||
}
|
||||
|
||||
func (iw *IoWriter) Close() error {
|
||||
iw.lockWrite.Lock()
|
||||
defer iw.lockWrite.Unlock()
|
||||
|
||||
iw.close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (iw *IoWriter) close() error {
|
||||
if iw.closeSig != nil {
|
||||
close(iw.closeSig)
|
||||
iw.closeSig = nil
|
||||
}
|
||||
iw.wg.Wait()
|
||||
|
||||
if iw.outFile != nil {
|
||||
err := iw.outFile.(io.Closer).Close()
|
||||
iw.outFile = nil
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (iw *IoWriter) writeFile(p []byte) (n int, err error) {
|
||||
//switch log file
|
||||
iw.switchFile()
|
||||
|
||||
if iw.outFile != nil {
|
||||
n, err = iw.outFile.Write(p)
|
||||
if n > 0 {
|
||||
atomic.AddInt64(&iw.writeBytes, int64(n))
|
||||
}
|
||||
}
|
||||
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (iw *IoWriter) Write(p []byte) (n int, err error) {
|
||||
iw.lockWrite.Lock()
|
||||
defer iw.lockWrite.Unlock()
|
||||
|
||||
if iw.logChannel == nil {
|
||||
return iw.writeIo(p)
|
||||
}
|
||||
|
||||
copyBuff := memPool.MakeBytes(len(p))
|
||||
if copyBuff == nil {
|
||||
return 0, fmt.Errorf("MakeByteSlice failed")
|
||||
}
|
||||
copy(copyBuff, p)
|
||||
|
||||
iw.logChannel <- copyBuff
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (iw *IoWriter) writeIo(p []byte) (n int, err error) {
|
||||
n, err = iw.writeFile(p)
|
||||
|
||||
if OpenConsole {
|
||||
n, err = os.Stdout.Write(p)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (iw *IoWriter) setLogChannel(logChannelNum int) (err error) {
|
||||
iw.lockWrite.Lock()
|
||||
defer iw.lockWrite.Unlock()
|
||||
iw.close()
|
||||
|
||||
if logChannelNum == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
//copy iw.logChannel
|
||||
var logInfo []byte
|
||||
logChannel := make(chan []byte, logChannelNum)
|
||||
for i := 0; i < logChannelNum && i < len(iw.logChannel); i++ {
|
||||
logInfo = <-iw.logChannel
|
||||
logChannel <- logInfo
|
||||
}
|
||||
iw.logChannel = logChannel
|
||||
|
||||
iw.closeSig = make(chan struct{})
|
||||
iw.wg.Add(1)
|
||||
go iw.run()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (iw *IoWriter) run() {
|
||||
defer iw.wg.Done()
|
||||
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case <-iw.closeSig:
|
||||
break Loop
|
||||
case logs := <-iw.logChannel:
|
||||
iw.writeIo(logs)
|
||||
memPool.ReleaseBytes(logs)
|
||||
}
|
||||
}
|
||||
|
||||
for len(iw.logChannel) > 0 {
|
||||
logs := <-iw.logChannel
|
||||
iw.writeIo(logs)
|
||||
memPool.ReleaseBytes(logs)
|
||||
}
|
||||
}
|
||||
|
||||
func (iw *IoWriter) isFull() bool {
|
||||
if LogSize == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
return atomic.LoadInt64(&iw.writeBytes) >= LogSize
|
||||
}
|
||||
|
||||
func (logger *Logger) setLogChannel(logChannel int) (err error) {
|
||||
return logger.ioWriter.setLogChannel(logChannel)
|
||||
}
|
||||
|
||||
func (iw *IoWriter) switchFile() error {
|
||||
now := time.Now()
|
||||
if iw.fileCreateTime == now.Unix() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if iw.fileDay == now.Day() && iw.isFull() == false {
|
||||
return nil
|
||||
}
|
||||
|
||||
if iw.filePath != "" {
|
||||
var err error
|
||||
fileName := fmt.Sprintf("%s%d%02d%02d_%02d_%02d_%02d.log",
|
||||
iw.filePrefix,
|
||||
now.Year(),
|
||||
now.Month(),
|
||||
now.Day(),
|
||||
now.Hour(),
|
||||
now.Minute(),
|
||||
now.Second())
|
||||
|
||||
filePath := path.Join(iw.filePath, fileName)
|
||||
|
||||
iw.outFile, err = os.Create(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
iw.fileDay = now.Day()
|
||||
iw.fileCreateTime = now.Unix()
|
||||
atomic.StoreInt64(&iw.writeBytes, 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetDefaultHandler() IOriginHandler {
|
||||
return gLogger.(*Logger).SLogger.Handler().(IOriginHandler)
|
||||
}
|
||||
|
||||
func NewTextLogger(level slog.Level, pathName string, filePrefix string, addSource bool, logChannelCap int) (ILogger, error) {
|
||||
var logger Logger
|
||||
logger.ioWriter.filePath = pathName
|
||||
logger.ioWriter.filePrefix = filePrefix
|
||||
|
||||
logger.SLogger = slog.New(NewOriginTextHandler(level, &logger.ioWriter, addSource, defaultReplaceAttr))
|
||||
logger.setLogChannel(logChannelCap)
|
||||
err := logger.ioWriter.switchFile()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &logger, nil
|
||||
}
|
||||
|
||||
func NewJsonLogger(level slog.Level, pathName string, filePrefix string, addSource bool, logChannelCap int) (ILogger, error) {
|
||||
var logger Logger
|
||||
logger.ioWriter.filePath = pathName
|
||||
logger.ioWriter.filePrefix = filePrefix
|
||||
|
||||
logger.SLogger = slog.New(NewOriginJsonHandler(level, &logger.ioWriter, true, defaultReplaceAttr))
|
||||
logger.setLogChannel(logChannelCap)
|
||||
err := logger.ioWriter.switchFile()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &logger, nil
|
||||
}
|
||||
|
||||
// Close It's dangerous to call the method on logging
|
||||
func (logger *Logger) Close() {
|
||||
logger.ioWriter.Close()
|
||||
}
|
||||
|
||||
func (logger *Logger) Trace(msg string, args ...any) {
|
||||
logger.SLogger.Log(context.Background(), LevelTrace, msg, args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) Debug(msg string, args ...any) {
|
||||
|
||||
logger.SLogger.Log(context.Background(), LevelDebug, msg, args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) Info(msg string, args ...any) {
|
||||
logger.SLogger.Log(context.Background(), LevelInfo, msg, args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) Warning(msg string, args ...any) {
|
||||
logger.SLogger.Log(context.Background(), LevelWarning, msg, args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) Error(msg string, args ...any) {
|
||||
logger.SLogger.Log(context.Background(), LevelError, msg, args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) Stack(msg string, args ...any) {
|
||||
logger.SLogger.Log(context.Background(), LevelStack, msg, args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) Dump(msg string, args ...any) {
|
||||
logger.SLogger.Log(context.Background(), LevelDump, msg, args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) Fatal(msg string, args ...any) {
|
||||
logger.SLogger.Log(context.Background(), LevelFatal, msg, args...)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// SetLogger It's non-thread-safe
|
||||
func SetLogger(logger ILogger) {
|
||||
if logger != nil && isSetLogger == false {
|
||||
func SetLogger(logger *Logger) {
|
||||
if logger != nil {
|
||||
gLogger = logger
|
||||
isSetLogger = true
|
||||
}
|
||||
}
|
||||
|
||||
func GetLogger() ILogger {
|
||||
func GetLogger() *Logger {
|
||||
return gLogger
|
||||
}
|
||||
|
||||
func Trace(msg string, args ...any) {
|
||||
gLogger.Trace(msg, args...)
|
||||
func (logger *Logger) SetEncoder(encoder zapcore.Encoder) {
|
||||
logger.Encoder = encoder
|
||||
}
|
||||
|
||||
func Debug(msg string, args ...any) {
|
||||
gLogger.Debug(msg, args...)
|
||||
func (logger *Logger) SetSkip(skip int) {
|
||||
logger.Skip = skip
|
||||
}
|
||||
|
||||
func Info(msg string, args ...any) {
|
||||
gLogger.Info(msg, args...)
|
||||
}
|
||||
|
||||
func Warning(msg string, args ...any) {
|
||||
gLogger.Warning(msg, args...)
|
||||
}
|
||||
|
||||
func Error(msg string, args ...any) {
|
||||
gLogger.Error(msg, args...)
|
||||
}
|
||||
|
||||
func Stack(msg string, args ...any) {
|
||||
gLogger.Stack(msg, args...)
|
||||
}
|
||||
|
||||
func Dump(dump string, args ...any) {
|
||||
gLogger.Dump(dump, args...)
|
||||
}
|
||||
|
||||
func Fatal(msg string, args ...any) {
|
||||
gLogger.Fatal(msg, args...)
|
||||
}
|
||||
|
||||
func Close() {
|
||||
gLogger.Close()
|
||||
}
|
||||
|
||||
func ErrorAttr(key string, value error) slog.Attr {
|
||||
if value == nil {
|
||||
return slog.Attr{Key: key, Value: slog.StringValue("nil")}
|
||||
func GetJsonEncoder() zapcore.Encoder {
|
||||
encoderConfig := zap.NewProductionEncoderConfig()
|
||||
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
|
||||
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
|
||||
encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
|
||||
enc.AppendString(t.Format("2006-01-02 15:04:05.000"))
|
||||
}
|
||||
|
||||
return slog.Attr{Key: key, Value: slog.StringValue(value.Error())}
|
||||
return zapcore.NewJSONEncoder(encoderConfig)
|
||||
}
|
||||
|
||||
func String(key, value string) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.StringValue(value)}
|
||||
func GetTxtEncoder() zapcore.Encoder {
|
||||
encoderConfig := zap.NewProductionEncoderConfig()
|
||||
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
|
||||
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
|
||||
encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
|
||||
enc.AppendString(t.Format("2006-01-02 15:04:05.000"))
|
||||
}
|
||||
|
||||
return zapcore.NewConsoleEncoder(encoderConfig)
|
||||
}
|
||||
|
||||
func Int(key string, value int) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))}
|
||||
func getLogConfig() *lumberjack.Logger {
|
||||
return &lumberjack.Logger{
|
||||
Filename: "",
|
||||
MaxSize: 2048,
|
||||
MaxBackups: 0,
|
||||
MaxAge: 0,
|
||||
Compress: false,
|
||||
}
|
||||
}
|
||||
|
||||
func Int64(key string, value int64) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.Int64Value(value)}
|
||||
func NewDefaultLogger() *Logger {
|
||||
logger := Logger{}
|
||||
logger.Encoder = GetJsonEncoder()
|
||||
logger.LogConfig = getLogConfig()
|
||||
logger.LogConfig.LocalTime = true
|
||||
|
||||
logger.Init()
|
||||
return &logger
|
||||
}
|
||||
|
||||
func Int32(key string, value int32) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))}
|
||||
func (logger *Logger) SetLogLevel(level zapcore.Level) {
|
||||
logger.LogLevel = level
|
||||
}
|
||||
|
||||
func Int16(key string, value int16) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))}
|
||||
func (logger *Logger) Enabled(zapcore.Level) bool {
|
||||
return logger.stack
|
||||
}
|
||||
|
||||
func Int8(key string, value int8) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))}
|
||||
}
|
||||
|
||||
func Uint(key string, value uint) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
|
||||
}
|
||||
|
||||
func Uint64(key string, v uint64) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.Uint64Value(v)}
|
||||
}
|
||||
|
||||
func Uint32(key string, value uint32) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
|
||||
}
|
||||
|
||||
func Uint16(key string, value uint16) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
|
||||
}
|
||||
|
||||
func Uint8(key string, value uint8) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
|
||||
}
|
||||
|
||||
func Float64(key string, v float64) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.Float64Value(v)}
|
||||
}
|
||||
|
||||
func Bool(key string, v bool) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.BoolValue(v)}
|
||||
}
|
||||
|
||||
func Time(key string, v time.Time) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.TimeValue(v)}
|
||||
}
|
||||
|
||||
func Duration(key string, v time.Duration) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.DurationValue(v)}
|
||||
}
|
||||
|
||||
func Any(key string, value any) slog.Attr {
|
||||
return slog.Attr{Key: key, Value: slog.AnyValue(value)}
|
||||
}
|
||||
|
||||
func Group(key string, args ...any) slog.Attr {
|
||||
return slog.Group(key, args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) DoSPrintf(level slog.Level, a []interface{}) {
|
||||
if logger.SLogger.Enabled(context.Background(), level) == false {
|
||||
func (logger *Logger) Init() {
|
||||
if isSetLogger {
|
||||
return
|
||||
}
|
||||
|
||||
logger.SLogger.Handler().(IOriginHandler).Lock()
|
||||
defer logger.SLogger.Handler().(IOriginHandler).UnLock()
|
||||
|
||||
logger.sBuff.Reset()
|
||||
|
||||
logger.FormatHeader(&logger.sBuff, level, 3)
|
||||
|
||||
for _, s := range a {
|
||||
logger.sBuff.AppendString(slog.AnyValue(s).String())
|
||||
var coreList []zapcore.Core
|
||||
if logger.OpenConsole == nil || *logger.OpenConsole {
|
||||
core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), logger.LogLevel)
|
||||
coreList = append(coreList, core)
|
||||
}
|
||||
logger.sBuff.AppendString("\"\n")
|
||||
logger.ioWriter.Write(logger.sBuff.Bytes())
|
||||
}
|
||||
|
||||
func (logger *Logger) STrace(a ...interface{}) {
|
||||
logger.DoSPrintf(LevelTrace, a)
|
||||
}
|
||||
|
||||
func (logger *Logger) SDebug(a ...interface{}) {
|
||||
logger.DoSPrintf(LevelDebug, a)
|
||||
}
|
||||
|
||||
func (logger *Logger) SInfo(a ...interface{}) {
|
||||
logger.DoSPrintf(LevelInfo, a)
|
||||
}
|
||||
|
||||
func (logger *Logger) SWarning(a ...interface{}) {
|
||||
logger.DoSPrintf(LevelWarning, a)
|
||||
}
|
||||
|
||||
func (logger *Logger) SError(a ...interface{}) {
|
||||
logger.DoSPrintf(LevelError, a)
|
||||
}
|
||||
|
||||
func STrace(a ...interface{}) {
|
||||
gLogger.DoSPrintf(LevelTrace, a)
|
||||
}
|
||||
|
||||
func SDebug(a ...interface{}) {
|
||||
gLogger.DoSPrintf(LevelDebug, a)
|
||||
}
|
||||
|
||||
func SInfo(a ...interface{}) {
|
||||
gLogger.DoSPrintf(LevelInfo, a)
|
||||
}
|
||||
|
||||
func SWarning(a ...interface{}) {
|
||||
gLogger.DoSPrintf(LevelWarning, a)
|
||||
}
|
||||
|
||||
func SError(a ...interface{}) {
|
||||
gLogger.DoSPrintf(LevelError, a)
|
||||
}
|
||||
|
||||
func (logger *Logger) FormatHeader(buf *Buffer, level slog.Level, callDepth int) {
|
||||
t := time.Now()
|
||||
var file string
|
||||
var line int
|
||||
|
||||
// Release lock while getting caller info - it's expensive.
|
||||
var ok bool
|
||||
_, file, line, ok = runtime.Caller(callDepth)
|
||||
if !ok {
|
||||
file = "???"
|
||||
line = 0
|
||||
if logger.CoreList != nil {
|
||||
coreList = append(coreList, logger.CoreList...)
|
||||
}else if logger.LogPath != "" {
|
||||
WriteSyncer := zapcore.AddSync(logger.LogConfig)
|
||||
core := zapcore.NewCore(logger.Encoder, WriteSyncer, logger.LogLevel)
|
||||
coreList = append(coreList, core)
|
||||
}
|
||||
file = filepath.Base(file)
|
||||
|
||||
buf.AppendString("time=\"")
|
||||
buf.AppendString(t.Format("2006/01/02 15:04:05"))
|
||||
buf.AppendString("\"")
|
||||
logger.sBuff.AppendString(" level=")
|
||||
logger.sBuff.AppendString(getStrLevel(level))
|
||||
logger.sBuff.AppendString(" source=")
|
||||
|
||||
buf.AppendString(file)
|
||||
buf.AppendByte(':')
|
||||
buf.AppendInt(int64(line))
|
||||
buf.AppendString(" msg=\"")
|
||||
core := zapcore.NewTee(coreList...)
|
||||
logger.Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(logger), zap.AddCallerSkip(1+logger.Skip))
|
||||
logger.SugaredLogger = logger.Logger.Sugar()
|
||||
}
|
||||
|
||||
func (logger *Logger) Debug(msg string, fields ...zap.Field) {
|
||||
logger.Logger.Debug(msg, fields...)
|
||||
}
|
||||
|
||||
func (logger *Logger) Info(msg string, fields ...zap.Field) {
|
||||
logger.Logger.Info(msg, fields...)
|
||||
}
|
||||
|
||||
func (logger *Logger) Warn(msg string, fields ...zap.Field) {
|
||||
logger.Logger.Warn(msg, fields...)
|
||||
}
|
||||
|
||||
func (logger *Logger) Error(msg string, fields ...zap.Field) {
|
||||
logger.Logger.Error(msg, fields...)
|
||||
}
|
||||
|
||||
func (logger *Logger) StackError(msg string, args ...zap.Field) {
|
||||
logger.stack = true
|
||||
logger.Logger.Log(zapcore.ErrorLevel, msg, args...)
|
||||
logger.stack = false
|
||||
}
|
||||
|
||||
func (logger *Logger) Fatal(msg string, fields ...zap.Field) {
|
||||
gLogger.stack = true
|
||||
logger.Logger.Fatal(msg, fields...)
|
||||
gLogger.stack = false
|
||||
}
|
||||
|
||||
func Debug(msg string, fields ...zap.Field) {
|
||||
gLogger.Logger.Debug(msg, fields...)
|
||||
}
|
||||
|
||||
func Info(msg string, fields ...zap.Field) {
|
||||
gLogger.Logger.Info(msg, fields...)
|
||||
}
|
||||
|
||||
func Warn(msg string, fields ...zap.Field) {
|
||||
gLogger.Logger.Warn(msg, fields...)
|
||||
}
|
||||
|
||||
func Error(msg string, fields ...zap.Field) {
|
||||
gLogger.Logger.Error(msg, fields...)
|
||||
}
|
||||
|
||||
func StackError(msg string, fields ...zap.Field) {
|
||||
gLogger.stack = true
|
||||
gLogger.Logger.Error(msg, fields...)
|
||||
gLogger.stack = false
|
||||
}
|
||||
|
||||
func Fatal(msg string, fields ...zap.Field) {
|
||||
gLogger.stack = true
|
||||
gLogger.Logger.Fatal(msg, fields...)
|
||||
gLogger.stack = false
|
||||
}
|
||||
|
||||
func Debugf(template string, args ...any) {
|
||||
gLogger.SugaredLogger.Debugf(template, args...)
|
||||
}
|
||||
|
||||
func Infof(template string, args ...any) {
|
||||
gLogger.SugaredLogger.Infof(template, args...)
|
||||
}
|
||||
|
||||
func Warnf(template string, args ...any) {
|
||||
gLogger.SugaredLogger.Warnf(template, args...)
|
||||
}
|
||||
|
||||
func Errorf(template string, args ...any) {
|
||||
gLogger.SugaredLogger.Errorf(template, args...)
|
||||
}
|
||||
|
||||
func StackErrorf(template string, args ...any) {
|
||||
gLogger.stack = true
|
||||
gLogger.SugaredLogger.Errorf(template, args...)
|
||||
gLogger.stack = false
|
||||
}
|
||||
|
||||
func Fatalf(template string, args ...any) {
|
||||
gLogger.SugaredLogger.Fatalf(template, args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) SDebug(args ...interface{}) {
|
||||
logger.SugaredLogger.Debugln(args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) SInfo(args ...interface{}) {
|
||||
logger.SugaredLogger.Infoln(args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) SWarn(args ...interface{}) {
|
||||
logger.SugaredLogger.Warnln(args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) SError(args ...interface{}) {
|
||||
logger.SugaredLogger.Errorln(args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) SStackError(args ...interface{}) {
|
||||
gLogger.stack = true
|
||||
logger.SugaredLogger.Errorln(args...)
|
||||
gLogger.stack = false
|
||||
}
|
||||
|
||||
func (logger *Logger) SFatal(args ...interface{}) {
|
||||
gLogger.stack = true
|
||||
logger.SugaredLogger.Fatalln(args...)
|
||||
gLogger.stack = false
|
||||
}
|
||||
|
||||
func SDebug(args ...interface{}) {
|
||||
gLogger.SugaredLogger.Debugln(args...)
|
||||
}
|
||||
|
||||
func SInfo(args ...interface{}) {
|
||||
gLogger.SugaredLogger.Infoln(args...)
|
||||
}
|
||||
|
||||
func SWarn(args ...interface{}) {
|
||||
gLogger.SugaredLogger.Warnln(args...)
|
||||
}
|
||||
|
||||
func SError(args ...interface{}) {
|
||||
gLogger.SugaredLogger.Errorln(args...)
|
||||
}
|
||||
|
||||
func SStackError(args ...interface{}) {
|
||||
gLogger.stack = true
|
||||
gLogger.SugaredLogger.Errorln(args...)
|
||||
gLogger.stack = false
|
||||
}
|
||||
|
||||
func SFatal(args ...interface{}) {
|
||||
gLogger.stack = true
|
||||
gLogger.SugaredLogger.Fatalln(args...)
|
||||
gLogger.stack = false
|
||||
}
|
||||
|
||||
func ErrorField(key string, value error) zap.Field {
|
||||
if value == nil {
|
||||
return zap.String(key, "nil")
|
||||
}
|
||||
return zap.String(key, value.Error())
|
||||
}
|
||||
|
||||
func String(key, value string) zap.Field {
|
||||
return zap.String(key, value)
|
||||
}
|
||||
|
||||
func Int(key string, value int) zap.Field {
|
||||
return zap.Int(key, value)
|
||||
}
|
||||
|
||||
func Int64(key string, value int64) zap.Field {
|
||||
return zap.Int64(key, value)
|
||||
}
|
||||
|
||||
func Int32(key string, value int32) zap.Field {
|
||||
return zap.Int32(key, value)
|
||||
}
|
||||
|
||||
func Int16(key string, value int16) zap.Field {
|
||||
return zap.Int16(key, value)
|
||||
}
|
||||
|
||||
func Int8(key string, value int8) zap.Field {
|
||||
return zap.Int8(key, value)
|
||||
}
|
||||
|
||||
func Uint(key string, value uint) zap.Field {
|
||||
return zap.Uint(key, value)
|
||||
}
|
||||
|
||||
func Uint64(key string, v uint64) zap.Field {
|
||||
return zap.Uint64(key, v)
|
||||
}
|
||||
|
||||
func Uint32(key string, value uint32) zap.Field {
|
||||
return zap.Uint32(key, value)
|
||||
}
|
||||
|
||||
func Uint16(key string, value uint16) zap.Field {
|
||||
return zap.Uint16(key, value)
|
||||
}
|
||||
|
||||
func Uint8(key string, value uint8) zap.Field {
|
||||
return zap.Uint8(key, value)
|
||||
}
|
||||
|
||||
func Float64(key string, v float64) zap.Field {
|
||||
return zap.Float64(key, v)
|
||||
}
|
||||
|
||||
func Bool(key string, v bool) zap.Field {
|
||||
return zap.Bool(key, v)
|
||||
}
|
||||
|
||||
func Bools(key string, v []bool) zap.Field {
|
||||
return zap.Bools(key, v)
|
||||
}
|
||||
|
||||
func Time(key string, v time.Time) zap.Field {
|
||||
return zap.Time(key, v)
|
||||
}
|
||||
|
||||
func Duration(key string, v time.Duration) zap.Field {
|
||||
return zap.Duration(key, v)
|
||||
}
|
||||
|
||||
func Durations(key string, v []time.Duration) zap.Field {
|
||||
return zap.Durations(key, v)
|
||||
}
|
||||
|
||||
func Any(key string, value any) zap.Field {
|
||||
return zap.Any(key, value)
|
||||
}
|
||||
|
||||
@@ -111,15 +111,15 @@ func (netConn *NetConn) doWrite(b []byte) error {
|
||||
}
|
||||
|
||||
// b must not be modified by the others goroutines
|
||||
func (netConn *NetConn) Write(b []byte) error {
|
||||
func (netConn *NetConn) Write(b []byte) (int,error) {
|
||||
netConn.Lock()
|
||||
defer netConn.Unlock()
|
||||
if atomic.LoadInt32(&netConn.closeFlag) == 1 || b == nil {
|
||||
netConn.ReleaseReadMsg(b)
|
||||
return errors.New("conn is close")
|
||||
return 0,errors.New("conn is close")
|
||||
}
|
||||
|
||||
return netConn.doWrite(b)
|
||||
return len(b),netConn.doWrite(b)
|
||||
}
|
||||
|
||||
func (netConn *NetConn) Read(b []byte) (int, error) {
|
||||
@@ -150,7 +150,7 @@ func (netConn *NetConn) WriteMsg(args ...[]byte) error {
|
||||
if atomic.LoadInt32(&netConn.closeFlag) == 1 {
|
||||
return errors.New("conn is close")
|
||||
}
|
||||
return netConn.msgParser.Write(netConn.conn, args...)
|
||||
return netConn.msgParser.Write(netConn, args...)
|
||||
}
|
||||
|
||||
func (netConn *NetConn) WriteRawMsg(args []byte) error {
|
||||
@@ -158,7 +158,8 @@ func (netConn *NetConn) WriteRawMsg(args []byte) error {
|
||||
return errors.New("conn is close")
|
||||
}
|
||||
|
||||
return netConn.Write(args)
|
||||
_,err:= netConn.Write(args)
|
||||
return err
|
||||
}
|
||||
|
||||
func (netConn *NetConn) IsConnected() bool {
|
||||
|
||||
@@ -106,7 +106,7 @@ func (client *KCPClient) dial() net.Conn {
|
||||
return conn
|
||||
}
|
||||
|
||||
log.Warning("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
|
||||
log.Warn("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
|
||||
time.Sleep(client.ConnectInterval)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -203,7 +203,7 @@ func (kp *KCPServer) initSession(session *kcp.UDPSession) {
|
||||
func (kp *KCPServer) run(listener *kcp.Listener) bool {
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
log.Error("accept error", log.String("ListenAddr", kp.kcpCfg.ListenAddr), log.ErrorAttr("err", err))
|
||||
log.Error("accept error", log.String("ListenAddr", kp.kcpCfg.ListenAddr), log.ErrorField("err", err))
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -211,7 +211,7 @@ func (kp *KCPServer) run(listener *kcp.Listener) bool {
|
||||
if len(kp.conns) >= kp.kcpCfg.MaxConnNum {
|
||||
kp.mutexConns.Unlock()
|
||||
conn.Close()
|
||||
log.Warning("too many connections")
|
||||
log.Warn("too many connections")
|
||||
return true
|
||||
}
|
||||
kp.conns[conn] = struct{}{}
|
||||
|
||||
@@ -102,7 +102,7 @@ func (client *TCPClient) dial() net.Conn {
|
||||
return conn
|
||||
}
|
||||
|
||||
log.Warning("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
|
||||
log.Warn("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
|
||||
time.Sleep(client.ConnectInterval)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -146,7 +146,7 @@ func (server *TCPServer) run() {
|
||||
if len(server.conns) >= server.MaxConnNum {
|
||||
server.mutexConns.Unlock()
|
||||
conn.Close()
|
||||
log.Warning("too many connections")
|
||||
log.Warn("too many connections")
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -68,7 +68,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if len(handler.conns) >= handler.maxConnNum {
|
||||
handler.mutexConns.Unlock()
|
||||
conn.Close()
|
||||
log.Warning("too many connections")
|
||||
log.Warn("too many connections")
|
||||
return
|
||||
}
|
||||
handler.conns[conn] = struct{}{}
|
||||
|
||||
108
node/node.go
108
node/node.go
@@ -11,11 +11,13 @@ import (
|
||||
"github.com/duanhf2012/origin/v2/util/buildtime"
|
||||
"github.com/duanhf2012/origin/v2/util/sysprocess"
|
||||
"github.com/duanhf2012/origin/v2/util/timer"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"io"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
@@ -54,10 +56,9 @@ func init() {
|
||||
console.RegisterCommandString("retire", "", "<-retire nodeid=nodeid> retire originserver process.", retireNode)
|
||||
console.RegisterCommandString("config", "", "<-config path> Configuration file path.", setConfigPath)
|
||||
console.RegisterCommandString("console", "", "<-console true|false> Turn on or off screen log output.", openConsole)
|
||||
console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|release|warning|error|fatal> Set loglevel.", setLevel)
|
||||
console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|info|warn|error|stackerror|fatal> Set loglevel.", setLevel)
|
||||
console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath)
|
||||
console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize)
|
||||
console.RegisterCommandInt("logchannelcap", -1, "<-logchannelcap num> Set log channel cap.", setLogChannelCapNum)
|
||||
console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof)
|
||||
}
|
||||
|
||||
@@ -156,12 +157,14 @@ func initNode(id string) {
|
||||
nodeId = id
|
||||
err := cluster.GetCluster().Init(GetNodeId(), Setup)
|
||||
if err != nil {
|
||||
log.Error("Init cluster fail", log.ErrorAttr("error", err))
|
||||
log.Error("Init cluster fail", log.ErrorField("error", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
err = initLog()
|
||||
if err != nil {
|
||||
log.Error("Init log fail", log.ErrorField("error", err))
|
||||
os.Exit(1)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -211,22 +214,25 @@ func initNode(id string) {
|
||||
}
|
||||
|
||||
//3.service初始化
|
||||
log.Info("Start running server.")
|
||||
service.Init()
|
||||
}
|
||||
|
||||
func initLog() error {
|
||||
if log.LogPath == "" {
|
||||
setLogPath("./log")
|
||||
logger := log.GetLogger()
|
||||
if logger.LogPath == "" {
|
||||
err := setLogPath("./log")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
localNodeInfo := cluster.GetCluster().GetLocalNodeInfo()
|
||||
filePre := fmt.Sprintf("%s_", localNodeInfo.NodeId)
|
||||
logger, err := log.NewTextLogger(log.LogLevel, log.LogPath, filePre, true, log.LogChannelCap)
|
||||
if err != nil {
|
||||
fmt.Printf("cannot create log file!\n")
|
||||
return err
|
||||
}
|
||||
log.SetLogger(logger)
|
||||
fileName := fmt.Sprintf("%s.log", localNodeInfo.NodeId)
|
||||
logger.FileName = fileName
|
||||
logger.LogConfig.Filename = filepath.Join(logger.LogPath, logger.FileName)
|
||||
|
||||
logger.Init()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -323,34 +329,37 @@ func startNode(args interface{}) error {
|
||||
myName, mErr := sysprocess.GetMyProcessName()
|
||||
//当前进程名获取失败,不应该发生
|
||||
if mErr != nil {
|
||||
log.Info("get my process's name is error", log.ErrorAttr("err", mErr))
|
||||
log.Error("get my process's name is error", log.ErrorField("err", mErr))
|
||||
os.Exit(-1)
|
||||
}
|
||||
|
||||
//进程id存在,而且进程名也相同,被认为是当前进程重复运行
|
||||
if cErr == nil && name == myName {
|
||||
log.Info("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId))
|
||||
log.Error("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId))
|
||||
os.Exit(-1)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
//2.记录进程id号
|
||||
log.Info("Start running server.")
|
||||
writeProcessPid(strNodeId)
|
||||
timer.StartTimer(10*time.Millisecond, 1000000)
|
||||
|
||||
//3.初始化node
|
||||
defer log.GetLogger().Logger.Sync()
|
||||
initNode(strNodeId)
|
||||
|
||||
//4.运行service
|
||||
service.Start()
|
||||
|
||||
//5.运行集群
|
||||
cluster.GetCluster().Start()
|
||||
err := cluster.GetCluster().Start()
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
os.Exit(-1)
|
||||
}
|
||||
|
||||
//6.监听程序退出信号&性能报告
|
||||
|
||||
var pProfilerTicker *time.Ticker = &time.Ticker{}
|
||||
if profilerInterval > 0 {
|
||||
pProfilerTicker = time.NewTicker(profilerInterval)
|
||||
@@ -378,7 +387,7 @@ func startNode(args interface{}) error {
|
||||
cluster.GetCluster().Stop()
|
||||
|
||||
log.Info("Server is stop.")
|
||||
log.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -400,8 +409,8 @@ func SetupTemplateFunc(fs ...func() service.IService) {
|
||||
}
|
||||
}
|
||||
|
||||
func SetupTemplate[T any,P templateServicePoint[T]]() {
|
||||
SetupTemplateFunc(func() service.IService{
|
||||
func SetupTemplate[T any, P templateServicePoint[T]]() {
|
||||
SetupTemplateFunc(func() service.IService {
|
||||
var t T
|
||||
return P(&t)
|
||||
})
|
||||
@@ -430,9 +439,11 @@ func openConsole(args interface{}) error {
|
||||
}
|
||||
strOpen := strings.ToLower(strings.TrimSpace(args.(string)))
|
||||
if strOpen == "false" {
|
||||
log.OpenConsole = false
|
||||
bOpenConsole := false
|
||||
log.GetLogger().OpenConsole = &bOpenConsole
|
||||
} else if strOpen == "true" {
|
||||
log.OpenConsole = true
|
||||
bOpenConsole := true
|
||||
log.GetLogger().OpenConsole = &bOpenConsole
|
||||
} else {
|
||||
return errors.New("parameter console error")
|
||||
}
|
||||
@@ -446,20 +457,18 @@ func setLevel(args interface{}) error {
|
||||
|
||||
strlogLevel := strings.TrimSpace(args.(string))
|
||||
switch strlogLevel {
|
||||
case "trace":
|
||||
log.LogLevel = log.LevelTrace
|
||||
case "debug":
|
||||
log.LogLevel = log.LevelDebug
|
||||
log.GetLogger().LogLevel = zapcore.DebugLevel
|
||||
case "info":
|
||||
log.LogLevel = log.LevelInfo
|
||||
case "warning":
|
||||
log.LogLevel = log.LevelWarning
|
||||
log.GetLogger().LogLevel = zapcore.InfoLevel
|
||||
case "warn":
|
||||
log.GetLogger().LogLevel = zapcore.WarnLevel
|
||||
case "error":
|
||||
log.LogLevel = log.LevelError
|
||||
case "stack":
|
||||
log.LogLevel = log.LevelStack
|
||||
log.GetLogger().LogLevel = zapcore.ErrorLevel
|
||||
case "stackerror":
|
||||
log.GetLogger().LogLevel = zapcore.ErrorLevel
|
||||
case "fatal":
|
||||
log.LogLevel = log.LevelFatal
|
||||
log.GetLogger().LogLevel = zapcore.FatalLevel
|
||||
default:
|
||||
return errors.New("unknown level: " + strlogLevel)
|
||||
}
|
||||
@@ -470,52 +479,33 @@ func setLogPath(args interface{}) error {
|
||||
if args == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.LogPath = strings.TrimSpace(args.(string))
|
||||
dir, err := os.Stat(log.LogPath) //这个文件夹不存在
|
||||
logPath := strings.TrimSpace(args.(string))
|
||||
dir, err := os.Stat(logPath)
|
||||
if err == nil && dir.IsDir() == false {
|
||||
return errors.New("Not found dir " + log.LogPath)
|
||||
return errors.New("Not found dir " + logPath)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
err = os.Mkdir(log.LogPath, os.ModePerm)
|
||||
err = os.MkdirAll(logPath, os.ModePerm)
|
||||
if err != nil {
|
||||
return errors.New("Cannot create dir " + log.LogPath)
|
||||
return errors.New("Cannot create dir " + logPath)
|
||||
}
|
||||
}
|
||||
|
||||
log.GetLogger().LogPath = logPath
|
||||
return nil
|
||||
}
|
||||
|
||||
func setLogSize(args interface{}) error {
|
||||
if args == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
logSize, ok := args.(int)
|
||||
if ok == false {
|
||||
return errors.New("param logsize is error")
|
||||
}
|
||||
|
||||
log.LogSize = int64(logSize) * 1024 * 1024
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func setLogChannelCapNum(args interface{}) error {
|
||||
if args == "" {
|
||||
if logSize == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
logChannelCap, ok := args.(int)
|
||||
if ok == false {
|
||||
return errors.New("param logsize is error")
|
||||
}
|
||||
log.GetLogger().LogConfig.MaxSize = logSize
|
||||
|
||||
if logChannelCap == -1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.LogChannelCap = logChannelCap
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -75,7 +75,7 @@ func (cs *CallSet) AddPending(call *Call) {
|
||||
|
||||
if call.Seq == 0 {
|
||||
cs.pendingLock.Unlock()
|
||||
log.Stack("call is error.")
|
||||
log.StackError("call is error.")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ type IRealClient interface {
|
||||
SetConn(conn *network.NetConn)
|
||||
Close(waitDone bool)
|
||||
|
||||
AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error)
|
||||
AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error)
|
||||
Go(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
|
||||
RawGo(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
|
||||
IsConnected() bool
|
||||
@@ -104,7 +104,7 @@ func (client *Client) processRpcResponse(responseData []byte) error {
|
||||
//rc.conn.ReleaseReadMsg(bytes)
|
||||
if err != nil {
|
||||
processor.ReleaseRpcResponse(response.RpcResponseData)
|
||||
log.Error("rpcClient Unmarshal head error", log.ErrorAttr("error", err))
|
||||
log.Error("rpcClient Unmarshal head error", log.ErrorField("error", err))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ func (client *Client) processRpcResponse(responseData []byte) error {
|
||||
if len(response.RpcResponseData.GetReply()) > 0 {
|
||||
err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply)
|
||||
if err != nil {
|
||||
log.Error("rpcClient Unmarshal body failed", log.ErrorAttr("error", err))
|
||||
log.Error("rpcClient Unmarshal body failed", log.ErrorField("error", err))
|
||||
v.Err = err
|
||||
}
|
||||
}
|
||||
@@ -203,7 +203,7 @@ func (client *Client) rawGo(nodeId string, w IWriter, timeout time.Duration, rpc
|
||||
}
|
||||
if err != nil {
|
||||
client.RemovePending(call.Seq)
|
||||
log.Error("WriteMsg is fail", log.ErrorAttr("error", err))
|
||||
log.Error("WriteMsg is fail", log.ErrorField("error", err))
|
||||
call.Seq = 0
|
||||
call.DoError(err)
|
||||
}
|
||||
@@ -211,7 +211,7 @@ func (client *Client) rawGo(nodeId string, w IWriter, timeout time.Duration, rpc
|
||||
return call
|
||||
}
|
||||
|
||||
func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
|
||||
func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
|
||||
processorType, processor := GetProcessorType(args)
|
||||
InParam, herr := processor.Marshal(args)
|
||||
if herr != nil {
|
||||
@@ -264,10 +264,7 @@ func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration,
|
||||
return emptyCancelRpc, err
|
||||
}
|
||||
|
||||
if cancelable {
|
||||
rpcCancel := RpcCancel{CallSeq: seq, Cli: client}
|
||||
return rpcCancel.CancelRpc, nil
|
||||
}
|
||||
|
||||
return emptyCancelRpc, nil
|
||||
rpcCancel := RpcCancel{CallSeq: seq, Cli: client}
|
||||
return rpcCancel.CancelRpc, nil
|
||||
}
|
||||
|
||||
@@ -90,7 +90,7 @@ func (lc *LClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa
|
||||
return pLocalRpcServer.selfNodeRpcHandlerGo(timeout, processor, lc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
|
||||
}
|
||||
|
||||
func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}, cancelable bool) (CancelRpc, error) {
|
||||
func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}) (CancelRpc, error) {
|
||||
pLocalRpcServer := rpcHandler.GetRpcServer()()
|
||||
|
||||
//判断是否是同一服务
|
||||
@@ -109,7 +109,7 @@ func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IR
|
||||
}
|
||||
|
||||
//其他的rpcHandler的处理器
|
||||
cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback, cancelable)
|
||||
cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback)
|
||||
if err != nil {
|
||||
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
|
||||
}
|
||||
@@ -121,9 +121,6 @@ func NewLClient(localNodeId string, callSet *CallSet) *Client {
|
||||
client := &Client{}
|
||||
client.clientId = atomic.AddUint32(&clientSeq, 1)
|
||||
client.targetNodeId = localNodeId
|
||||
//client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
|
||||
//client.callRpcTimeout = DefaultRpcTimeout
|
||||
|
||||
lClient := &LClient{}
|
||||
lClient.selfClient = client
|
||||
client.IRealClient = lClient
|
||||
|
||||
@@ -42,7 +42,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
|
||||
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
|
||||
if rpcHandler == nil {
|
||||
err := errors.New("service method " + serviceMethod + " not config!")
|
||||
log.Error("service method not config", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err))
|
||||
log.Error("service method not config", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
|
||||
pCall.Seq = 0
|
||||
pCall.DoError(err)
|
||||
|
||||
@@ -74,7 +74,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
|
||||
var err error
|
||||
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
|
||||
if err != nil {
|
||||
log.Error("unmarshalInParam is failed", log.String("serviceMethod", serviceMethod), log.Uint32("rpcMethodId", rpcMethodId), log.ErrorAttr("error", err))
|
||||
log.Error("unmarshalInParam is failed", log.String("serviceMethod", serviceMethod), log.Uint32("rpcMethodId", rpcMethodId), log.ErrorField("error", err))
|
||||
pCall.Seq = 0
|
||||
pCall.DoError(err)
|
||||
ReleaseRpcRequest(req)
|
||||
@@ -90,12 +90,12 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
|
||||
byteReturns, err := req.rpcProcessor.Marshal(Returns)
|
||||
if err != nil {
|
||||
Err = ConvertError(err)
|
||||
log.Error("returns data cannot be marshal", log.Uint64("seq", callSeq), log.ErrorAttr("error", err))
|
||||
log.Error("returns data cannot be marshal", log.Uint64("seq", callSeq), log.ErrorField("error", err))
|
||||
} else {
|
||||
err = req.rpcProcessor.Unmarshal(byteReturns, reply)
|
||||
if err != nil {
|
||||
Err = ConvertError(err)
|
||||
log.Error("returns data cannot be Unmarshal", log.Uint64("seq", callSeq), log.ErrorAttr("error", err))
|
||||
log.Error("returns data cannot be Unmarshal", log.Uint64("seq", callSeq), log.ErrorField("error", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -127,7 +127,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
|
||||
return pCall
|
||||
}
|
||||
|
||||
func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error) {
|
||||
func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error) {
|
||||
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
|
||||
if rpcHandler == nil {
|
||||
err := errors.New("service method " + serviceMethod + " not config!")
|
||||
@@ -266,7 +266,7 @@ func (server *BaseServer) processRpcRequest(data []byte, connTag string, wrRespo
|
||||
req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam())
|
||||
if err != nil {
|
||||
rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error()
|
||||
log.Error("call rpc param error", log.String("serviceMethod", req.RpcRequestData.GetServiceMethod()), log.ErrorAttr("error", err))
|
||||
log.Error("call rpc param error", log.String("serviceMethod", req.RpcRequestData.GetServiceMethod()), log.ErrorField("error", err))
|
||||
if req.requestHandle != nil {
|
||||
req.requestHandle(nil, RpcError(rErr))
|
||||
} else {
|
||||
|
||||
@@ -50,7 +50,7 @@ func (nc *NatsClient) Go(nodeId string, timeout time.Duration, rpcHandler IRpcHa
|
||||
_, processor := GetProcessorType(args)
|
||||
InParam, err := processor.Marshal(args)
|
||||
if err != nil {
|
||||
log.Error("Marshal is fail", log.ErrorAttr("error", err))
|
||||
log.Error("Marshal is fail", log.ErrorField("error", err))
|
||||
call := MakeCall()
|
||||
call.DoError(err)
|
||||
return call
|
||||
@@ -63,8 +63,8 @@ func (nc *NatsClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRp
|
||||
return nc.client.rawGo(nodeId, nc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
|
||||
}
|
||||
|
||||
func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
|
||||
cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable)
|
||||
func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
|
||||
cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam)
|
||||
if err != nil {
|
||||
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ func (ns *NatsServer) Start() error {
|
||||
|
||||
ns.natsConn, err = nats.Connect(ns.natsUrl, options...)
|
||||
if err != nil {
|
||||
log.Error("Connect to nats fail", log.String("natsUrl", ns.natsUrl), log.ErrorAttr("err", err))
|
||||
log.Error("Connect to nats fail", log.String("natsUrl", ns.natsUrl), log.ErrorField("err", err))
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
|
||||
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
|
||||
|
||||
if err != nil {
|
||||
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err))
|
||||
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -86,7 +86,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
|
||||
if ns.compressBytesLen > 0 && len(bytes) >= ns.compressBytesLen {
|
||||
compressBuff, err = compressor.CompressBlock(bytes)
|
||||
if err != nil {
|
||||
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err))
|
||||
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
|
||||
return
|
||||
}
|
||||
if len(compressBuff) < len(bytes) {
|
||||
@@ -106,7 +106,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Error("WriteMsg error,Rpc return is fail", log.String("nodeId", nodeId), log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err))
|
||||
log.Error("WriteMsg error,Rpc return is fail", log.String("nodeId", nodeId), log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"github.com/duanhf2012/origin/v2/network"
|
||||
"math"
|
||||
"reflect"
|
||||
"runtime"
|
||||
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
@@ -49,7 +49,7 @@ func (rc *RClient) Go(nodeId string, timeout time.Duration, rpcHandler IRpcHandl
|
||||
_, processor := GetProcessorType(args)
|
||||
InParam, err := processor.Marshal(args)
|
||||
if err != nil {
|
||||
log.Error("Marshal is fail", log.ErrorAttr("error", err))
|
||||
log.Error("Marshal is fail", log.ErrorField("error", err))
|
||||
call := MakeCall()
|
||||
call.DoError(err)
|
||||
return call
|
||||
@@ -62,8 +62,8 @@ func (rc *RClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa
|
||||
return rc.selfClient.rawGo(nodeId, rc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
|
||||
}
|
||||
|
||||
func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
|
||||
cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable)
|
||||
func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
|
||||
cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam)
|
||||
if err != nil {
|
||||
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
|
||||
}
|
||||
@@ -74,10 +74,7 @@ func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IR
|
||||
func (rc *RClient) Run() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -89,7 +86,7 @@ func (rc *RClient) Run() {
|
||||
for {
|
||||
bytes, err := rc.conn.ReadMsg()
|
||||
if err != nil {
|
||||
log.Error("RClient read msg is failed", log.ErrorAttr("error", err))
|
||||
log.Error("RClient read msg is failed", log.ErrorField("error", err))
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"github.com/duanhf2012/origin/v2/event"
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
"reflect"
|
||||
"runtime"
|
||||
|
||||
"strings"
|
||||
"time"
|
||||
"unicode"
|
||||
@@ -164,13 +164,6 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
|
||||
//取出输入参数类型
|
||||
var rpcMethodInfo RpcMethodInfo
|
||||
typ := method.Type
|
||||
if typ.NumOut() != 1 {
|
||||
return fmt.Errorf("%s The number of returned arguments must be 1", method.Name)
|
||||
}
|
||||
|
||||
if typ.Out(0).String() != "error" {
|
||||
return fmt.Errorf("%s The return parameter must be of type error", method.Name)
|
||||
}
|
||||
|
||||
if typ.NumIn() < 2 || typ.NumIn() > 4 {
|
||||
return fmt.Errorf("%s Unsupported parameter format", method.Name)
|
||||
@@ -183,6 +176,18 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
|
||||
rpcMethodInfo.hasResponder = true
|
||||
}
|
||||
|
||||
if rpcMethodInfo.hasResponder && typ.NumOut() > 0 {
|
||||
return fmt.Errorf("%s should not have return parameters", method.Name)
|
||||
}
|
||||
|
||||
if !rpcMethodInfo.hasResponder && typ.NumOut() != 1 {
|
||||
return fmt.Errorf("%s The number of returned arguments must be 1", method.Name)
|
||||
}
|
||||
|
||||
if !rpcMethodInfo.hasResponder && typ.Out(0).String() != "error" {
|
||||
return fmt.Errorf("%s The return parameter must be of type error", method.Name)
|
||||
}
|
||||
|
||||
for i := parIdx; i < typ.NumIn(); i++ {
|
||||
if handler.isExportedOrBuiltinType(typ.In(i)) == false {
|
||||
return fmt.Errorf("%s Unsupported parameter types", method.Name)
|
||||
@@ -220,10 +225,7 @@ func (handler *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error {
|
||||
func (handler *RpcHandler) HandlerRpcResponseCB(call *Call) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -242,10 +244,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
rpcErr := RpcError("call error : core dumps")
|
||||
if request.requestHandle != nil {
|
||||
request.requestHandle(nil, rpcErr)
|
||||
@@ -313,9 +312,11 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
|
||||
requestHandle := request.requestHandle
|
||||
returnValues := v.method.Func.Call(paramList)
|
||||
errInter := returnValues[0].Interface()
|
||||
if errInter != nil {
|
||||
err = errInter.(error)
|
||||
if len(returnValues) > 0 {
|
||||
errInter := returnValues[0].Interface()
|
||||
if errInter != nil {
|
||||
err = errInter.(error)
|
||||
}
|
||||
}
|
||||
|
||||
if v.hasResponder == false && requestHandle != nil {
|
||||
@@ -439,7 +440,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str
|
||||
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList)
|
||||
if len(pClientList) == 0 {
|
||||
if err != nil {
|
||||
log.Error("call serviceMethod is failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err))
|
||||
log.Error("call serviceMethod is failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
|
||||
} else {
|
||||
log.Error("cannot find serviceMethod", log.String("serviceMethod", serviceMethod))
|
||||
}
|
||||
@@ -468,7 +469,7 @@ func (handler *RpcHandler) callRpc(timeout time.Duration, nodeId string, service
|
||||
pClientList := make([]*Client, 0, maxClusterNode)
|
||||
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList)
|
||||
if err != nil {
|
||||
log.Error("Call serviceMethod is failed", log.ErrorAttr("error", err))
|
||||
log.Error("Call serviceMethod is failed", log.ErrorField("error", err))
|
||||
return err
|
||||
} else if len(pClientList) <= 0 {
|
||||
err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod)
|
||||
@@ -532,8 +533,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration, nodeId string, se
|
||||
}
|
||||
|
||||
//2.rpcClient调用
|
||||
//如果调用本结点服务
|
||||
return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply, false)
|
||||
return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply, )
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) GetName() string {
|
||||
@@ -592,7 +592,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId s
|
||||
pClientList := make([]*Client, 0, 1)
|
||||
err, pClientList := handler.funcRpcClient(nodeId, serviceName, false, pClientList)
|
||||
if len(pClientList) == 0 || err != nil {
|
||||
log.Error("call serviceMethod is failed", log.ErrorAttr("error", err))
|
||||
log.Error("call serviceMethod is failed", log.ErrorField("error", err))
|
||||
return err
|
||||
}
|
||||
if len(pClientList) > 1 {
|
||||
|
||||
@@ -27,9 +27,6 @@ func (rn *RpcNats) NewNatsClient(targetNodeId string,localNodeId string,callSet
|
||||
|
||||
client.clientId = atomic.AddUint32(&clientSeq, 1)
|
||||
client.targetNodeId = targetNodeId
|
||||
//client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
|
||||
//client.callRpcTimeout = DefaultRpcTimeout
|
||||
|
||||
natsClient := &rn.NatsClient
|
||||
natsClient.localNodeId = localNodeId
|
||||
natsClient.client = &client
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"math"
|
||||
"net"
|
||||
"reflect"
|
||||
"runtime"
|
||||
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
@@ -31,7 +31,7 @@ type IServer interface {
|
||||
|
||||
selfNodeRpcHandlerGo(timeout time.Duration, processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call
|
||||
myselfRpcHandlerGo(client *Client, handlerName string, serviceMethod string, args interface{}, callBack reflect.Value, reply interface{}) error
|
||||
selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error)
|
||||
selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error)
|
||||
}
|
||||
|
||||
type writeResponse func(processor IRpcProcessor, connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError)
|
||||
@@ -130,7 +130,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
|
||||
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
|
||||
|
||||
if errM != nil {
|
||||
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", errM))
|
||||
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", errM))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -141,7 +141,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
|
||||
|
||||
compressBuff, cErr = compressor.CompressBlock(bytes)
|
||||
if cErr != nil {
|
||||
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", cErr))
|
||||
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", cErr))
|
||||
return
|
||||
}
|
||||
if len(compressBuff) < len(bytes) {
|
||||
@@ -155,17 +155,14 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
|
||||
compressor.CompressBufferCollection(compressBuff)
|
||||
}
|
||||
if errM != nil {
|
||||
log.Error("WriteMsg error,Rpc return is fail", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", errM))
|
||||
log.Error("WriteMsg error,Rpc return is fail", log.String("serviceMethod", serviceMethod), log.ErrorField("error", errM))
|
||||
}
|
||||
}
|
||||
|
||||
func (agent *RpcAgent) Run() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -173,7 +170,7 @@ func (agent *RpcAgent) Run() {
|
||||
data, err := agent.conn.ReadMsg()
|
||||
if err != nil {
|
||||
//will close conn
|
||||
log.Error("read message is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorAttr("error", err))
|
||||
log.Error("read message is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorField("error", err))
|
||||
break
|
||||
}
|
||||
|
||||
@@ -181,7 +178,7 @@ func (agent *RpcAgent) Run() {
|
||||
if err != nil {
|
||||
//will close conn
|
||||
agent.conn.ReleaseReadMsg(data)
|
||||
log.Error("processRpcRequest is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorAttr("error", err))
|
||||
log.Error("processRpcRequest is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorField("error", err))
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
rpcHandle "github.com/duanhf2012/origin/v2/rpc"
|
||||
"github.com/duanhf2012/origin/v2/util/timer"
|
||||
"slices"
|
||||
)
|
||||
|
||||
const InitModuleId = 1e9
|
||||
@@ -35,9 +36,9 @@ type IModule interface {
|
||||
}
|
||||
|
||||
type IModuleTimer interface {
|
||||
AfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer
|
||||
CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron
|
||||
NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker
|
||||
SafeAfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer
|
||||
SafeCronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron
|
||||
SafeNewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker
|
||||
}
|
||||
|
||||
type Module struct {
|
||||
@@ -46,7 +47,7 @@ type Module struct {
|
||||
moduleName string //模块名称
|
||||
parent IModule //父亲
|
||||
self IModule //自己
|
||||
child map[uint32]IModule //孩子们
|
||||
child []IModule //孩子们
|
||||
mapActiveTimer map[timer.ITimer]struct{}
|
||||
mapActiveIdTimer map[uint64]timer.ITimer
|
||||
dispatcher *timer.Dispatcher //timer
|
||||
@@ -93,10 +94,7 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
|
||||
pAddModule.moduleId = m.NewModuleId()
|
||||
}
|
||||
|
||||
if m.child == nil {
|
||||
m.child = map[uint32]IModule{}
|
||||
}
|
||||
_, ok := m.child[module.GetModuleId()]
|
||||
_,ok := m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()]
|
||||
if ok == true {
|
||||
return 0, fmt.Errorf("exists module id %d", module.GetModuleId())
|
||||
}
|
||||
@@ -109,29 +107,33 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
|
||||
pAddModule.eventHandler = event.NewEventHandler()
|
||||
pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor())
|
||||
pAddModule.IConcurrent = m.IConcurrent
|
||||
|
||||
m.child = append(m.child,module)
|
||||
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
|
||||
|
||||
err := module.OnInit()
|
||||
if err != nil {
|
||||
delete(m.ancestor.getBaseModule().(*Module).descendants, module.GetModuleId())
|
||||
m.child = m.child[:len(m.child)-1]
|
||||
log.Error("module OnInit error",log.String("ModuleName",module.GetModuleName()),log.ErrorField("err",err))
|
||||
return 0, err
|
||||
}
|
||||
|
||||
m.child[module.GetModuleId()] = module
|
||||
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
|
||||
|
||||
log.Debug("Add module " + module.GetModuleName() + " completed")
|
||||
return module.GetModuleId(), nil
|
||||
}
|
||||
|
||||
func (m *Module) ReleaseModule(moduleId uint32) {
|
||||
pModule := m.GetModule(moduleId).getBaseModule().(*Module)
|
||||
pModule.self.OnRelease()
|
||||
log.Debug("Release module " + pModule.GetModuleName())
|
||||
|
||||
//释放子孙
|
||||
for id := range pModule.child {
|
||||
m.ReleaseModule(id)
|
||||
for i:=len(pModule.child)-1; i>=0; i-- {
|
||||
m.ReleaseModule(pModule.child[i].GetModuleId())
|
||||
}
|
||||
|
||||
pModule.self.OnRelease()
|
||||
pModule.GetEventHandler().Destroy()
|
||||
log.Debug("Release module " + pModule.GetModuleName())
|
||||
|
||||
for pTimer := range pModule.mapActiveTimer {
|
||||
pTimer.Cancel()
|
||||
}
|
||||
@@ -140,7 +142,10 @@ func (m *Module) ReleaseModule(moduleId uint32) {
|
||||
t.Cancel()
|
||||
}
|
||||
|
||||
delete(m.child, moduleId)
|
||||
m.child = slices.DeleteFunc(m.child, func(module IModule) bool {
|
||||
return module.GetModuleId() == moduleId
|
||||
})
|
||||
|
||||
delete(m.ancestor.getBaseModule().(*Module).descendants, moduleId)
|
||||
|
||||
//清理被删除的Module
|
||||
@@ -208,6 +213,7 @@ func (m *Module) OnAddTimer(t timer.ITimer) {
|
||||
}
|
||||
}
|
||||
|
||||
// Deprecated: this function simply calls SafeAfterFunc
|
||||
func (m *Module) AfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer {
|
||||
if m.mapActiveTimer == nil {
|
||||
m.mapActiveTimer = map[timer.ITimer]struct{}{}
|
||||
@@ -216,6 +222,7 @@ func (m *Module) AfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer
|
||||
return m.dispatcher.AfterFunc(d, nil, cb, m.OnCloseTimer, m.OnAddTimer)
|
||||
}
|
||||
|
||||
// Deprecated: this function simply calls SafeCronFunc
|
||||
func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron {
|
||||
if m.mapActiveTimer == nil {
|
||||
m.mapActiveTimer = map[timer.ITimer]struct{}{}
|
||||
@@ -224,6 +231,7 @@ func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer
|
||||
return m.dispatcher.CronFunc(cronExpr, nil, cb, m.OnCloseTimer, m.OnAddTimer)
|
||||
}
|
||||
|
||||
// Deprecated: this function simply calls SafeNewTicker
|
||||
func (m *Module) NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker {
|
||||
if m.mapActiveTimer == nil {
|
||||
m.mapActiveTimer = map[timer.ITimer]struct{}{}
|
||||
@@ -278,7 +286,7 @@ func (m *Module) SafeNewTicker(tickerId *uint64, d time.Duration, AdditionData i
|
||||
|
||||
func (m *Module) CancelTimerId(timerId *uint64) bool {
|
||||
if timerId == nil || *timerId == 0 {
|
||||
log.Warning("timerId is invalid")
|
||||
log.Warn("timerId is invalid")
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -289,7 +297,7 @@ func (m *Module) CancelTimerId(timerId *uint64) bool {
|
||||
|
||||
t, ok := m.mapActiveIdTimer[*timerId]
|
||||
if ok == false {
|
||||
log.Stack("cannot find timer id ", log.Uint64("timerId", *timerId))
|
||||
log.StackError("cannot find timer id ", log.Uint64("timerId", *timerId))
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"github.com/duanhf2012/origin/v2/rpc"
|
||||
"github.com/duanhf2012/origin/v2/util/timer"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -261,17 +260,16 @@ func (s *Service) SetName(serviceName string) {
|
||||
func (s *Service) Release() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
if atomic.AddInt32(&s.isRelease, -1) == -1 {
|
||||
s.self.OnRelease()
|
||||
for i:=len(s.child)-1; i>=0; i-- {
|
||||
s.ReleaseModule(s.child[i].GetModuleId())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (s *Service) OnRelease() {
|
||||
@@ -436,6 +434,7 @@ func (s *Service) SetEventChannelNum(num int) {
|
||||
}
|
||||
}
|
||||
|
||||
// Deprecated: replace it with the OpenConcurrent function
|
||||
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
|
||||
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程
|
||||
if s.startStatus == true || s.profiler != nil {
|
||||
|
||||
@@ -23,7 +23,7 @@ func Init() {
|
||||
for _,s := range setupServiceList {
|
||||
err := s.OnInit()
|
||||
if err != nil {
|
||||
log.Error("Failed to initialize "+s.GetName()+" service",log.ErrorAttr("err",err))
|
||||
log.Error("Failed to initialize "+s.GetName()+" service",log.ErrorField("err",err))
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"github.com/duanhf2012/origin/v2/service"
|
||||
"github.com/gin-gonic/gin"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -69,28 +68,28 @@ func (gm *GinModule) eventHandler(ev event.IEvent) {
|
||||
|
||||
func (gm *GinModule) Start() {
|
||||
gm.srv.Addr = gm.listenAddr
|
||||
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
|
||||
log.Info("http start listen", log.Any("addr", gm.listenAddr))
|
||||
go func() {
|
||||
err := gm.srv.ListenAndServe()
|
||||
if err != nil {
|
||||
log.Error("ListenAndServe error", slog.Any("error", err.Error()))
|
||||
log.Error("ListenAndServe error", log.Any("error", err.Error()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (gm *GinModule) StartTLS(certFile, keyFile string) {
|
||||
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
|
||||
log.Info("http start listen", log.Any("addr", gm.listenAddr))
|
||||
go func() {
|
||||
err := gm.srv.ListenAndServeTLS(certFile, keyFile)
|
||||
if err != nil {
|
||||
log.Fatal("ListenAndServeTLS error", slog.Any("error", err.Error()))
|
||||
log.Fatal("ListenAndServeTLS error", log.Any("error", err.Error()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (gm *GinModule) Stop(ctx context.Context) {
|
||||
if err := gm.srv.Shutdown(ctx); err != nil {
|
||||
log.Error("Server Shutdown", slog.Any("error", err))
|
||||
log.Error("Server Shutdown", log.Any("error", err))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,7 +209,7 @@ func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...S
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Error("GinModule process timeout", slog.Any("path", c.Request.URL.Path))
|
||||
log.Error("GinModule process timeout", log.Any("path", c.Request.URL.Path))
|
||||
c.AbortWithStatus(http.StatusRequestTimeout)
|
||||
case <-chanWait:
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"github.com/duanhf2012/origin/v2/service"
|
||||
"github.com/xtaci/kcp-go/v5"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"runtime"
|
||||
"sync"
|
||||
)
|
||||
|
||||
@@ -167,10 +166,7 @@ func (km *KcpModule) NewAgent(conn network.Conn) network.Agent {
|
||||
func (c *Client) Run() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -179,7 +175,7 @@ func (c *Client) Run() {
|
||||
c.kcpConn.SetReadDeadline(*c.kcpModule.kcpCfg.ReadDeadlineMill)
|
||||
msgBuff, err := c.kcpConn.ReadMsg()
|
||||
if err != nil {
|
||||
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", c.id))
|
||||
log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", c.id))
|
||||
break
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"github.com/duanhf2012/origin/v2/network/processor"
|
||||
"github.com/duanhf2012/origin/v2/service"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
@@ -125,10 +124,7 @@ func (slf *Client) GetId() string {
|
||||
func (slf *Client) Run() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -137,7 +133,7 @@ func (slf *Client) Run() {
|
||||
slf.tcpConn.SetReadDeadline(slf.tcpModule.tcpServer.ReadDeadline)
|
||||
bytes, err := slf.tcpConn.ReadMsg()
|
||||
if err != nil {
|
||||
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", slf.id))
|
||||
log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", slf.id))
|
||||
break
|
||||
}
|
||||
data, err := slf.tcpModule.process.Unmarshal(slf.id, bytes)
|
||||
@@ -185,7 +181,7 @@ func (tm *TcpModule) Close(clientId string) {
|
||||
client.tcpConn.Close()
|
||||
}
|
||||
|
||||
log.SWarning("close client:", clientId)
|
||||
log.SWarn("close client:", clientId)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
type WSModule struct {
|
||||
service.Module
|
||||
|
||||
wsServer network.WSServer
|
||||
WSServer network.WSServer
|
||||
|
||||
mapClientLocker sync.RWMutex
|
||||
mapClient map[string]*WSClient
|
||||
@@ -57,16 +57,16 @@ func (ws *WSModule) OnInit() error {
|
||||
return fmt.Errorf("please call the Init function correctly")
|
||||
}
|
||||
|
||||
ws.wsServer.MaxConnNum = ws.wsCfg.MaxConnNum
|
||||
ws.wsServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
|
||||
ws.wsServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
|
||||
ws.wsServer.Addr = ws.wsCfg.ListenAddr
|
||||
ws.WSServer.MaxConnNum = ws.wsCfg.MaxConnNum
|
||||
ws.WSServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
|
||||
ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
|
||||
ws.WSServer.Addr = ws.wsCfg.ListenAddr
|
||||
|
||||
//3.设置解析处理器
|
||||
ws.process.SetByteOrder(ws.wsCfg.LittleEndian)
|
||||
|
||||
ws.mapClient = make(map[string]*WSClient, ws.wsServer.MaxConnNum)
|
||||
ws.wsServer.NewAgent = ws.NewWSClient
|
||||
ws.mapClient = make(map[string]*WSClient, ws.WSServer.MaxConnNum)
|
||||
ws.WSServer.NewAgent = ws.NewWSClient
|
||||
|
||||
//4.设置网络事件处理
|
||||
ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler)
|
||||
@@ -80,7 +80,7 @@ func (ws *WSModule) Init(wsCfg *WSCfg, process processor.IRawProcessor) {
|
||||
}
|
||||
|
||||
func (ws *WSModule) Start() error {
|
||||
return ws.wsServer.Start()
|
||||
return ws.WSServer.Start()
|
||||
}
|
||||
|
||||
func (ws *WSModule) wsEventHandler(ev event.IEvent) {
|
||||
@@ -120,7 +120,7 @@ func (wc *WSClient) Run() {
|
||||
for {
|
||||
bytes, err := wc.wsConn.ReadMsg()
|
||||
if err != nil {
|
||||
log.Debug("read client is error", log.String("clientId", wc.id), log.ErrorAttr("err", err))
|
||||
log.Debug("read client is error", log.String("clientId", wc.id), log.ErrorField("err", err))
|
||||
break
|
||||
}
|
||||
data, err := wc.wsModule.process.Unmarshal(wc.id, bytes)
|
||||
@@ -197,3 +197,7 @@ func (ws *WSModule) SendRawMsg(clientId string, msg []byte) error {
|
||||
ws.mapClientLocker.Unlock()
|
||||
return client.wsConn.WriteMsg(msg)
|
||||
}
|
||||
|
||||
func (ws *WSModule) SetMessageType(messageType int) {
|
||||
ws.WSServer.SetMessageType(messageType)
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ func (m *RedisModule) Init(redisCfg *ConfigRedis) {
|
||||
}
|
||||
c, err := redis.Dial("tcp", redisServer, opt...)
|
||||
if err != nil {
|
||||
log.Error("Connect redis fail reason:%v", err)
|
||||
log.Error("Connect redis fail", log.ErrorField("err",err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ func (m *RedisModule) Init(redisCfg *ConfigRedis) {
|
||||
}
|
||||
_, err := c.Do("PING")
|
||||
if err != nil {
|
||||
log.Error("Do PING fail reason:%v", err)
|
||||
log.Error("Do PING fail reason", log.ErrorField("err",err))
|
||||
return err
|
||||
}
|
||||
return err
|
||||
@@ -101,7 +101,7 @@ func (m *RedisModule) getConn() (redis.Conn, error) {
|
||||
if conn.Err() != nil {
|
||||
err := conn.Err()
|
||||
if err != nil {
|
||||
log.Error("get Conn have error,reason:%v", err)
|
||||
log.Error("get Conn have error", log.ErrorField("err",err))
|
||||
}
|
||||
conn.Close()
|
||||
return nil, err
|
||||
@@ -118,7 +118,7 @@ func (m *RedisModule) TestPingRedis() error {
|
||||
|
||||
err = m.redisPool.TestOnBorrow(conn, time.Now())
|
||||
if err != nil {
|
||||
log.Error("TestOnBorrow fail,reason:%v", err)
|
||||
log.Error("TestOnBorrow fail", log.ErrorField("err",err))
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -171,7 +171,7 @@ func (m *RedisModule) setStringByExpire(key, value, expire interface{}) error {
|
||||
}
|
||||
|
||||
if retErr != nil {
|
||||
log.Error("setStringByExpire fail,reason:%v", retErr)
|
||||
log.Error("setStringByExpire fail", log.ErrorField("err",retErr))
|
||||
return retErr
|
||||
}
|
||||
|
||||
@@ -254,7 +254,7 @@ func (m *RedisModule) setMuchStringByExpire(mapInfo map[interface{}]interface{},
|
||||
}
|
||||
|
||||
if serr != nil {
|
||||
log.Error("setMuchStringByExpire fail,reason:%v", serr)
|
||||
log.Error("setMuchStringByExpire fail",log.ErrorField("err",serr))
|
||||
conn.Do("DISCARD")
|
||||
return serr
|
||||
} else {
|
||||
@@ -262,7 +262,7 @@ func (m *RedisModule) setMuchStringByExpire(mapInfo map[interface{}]interface{},
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Error("setMuchStringByExpire fail,reason:%v", err)
|
||||
log.Error("setMuchStringByExpire fail", log.ErrorField("err",err))
|
||||
}
|
||||
|
||||
return err
|
||||
@@ -277,7 +277,7 @@ func (m *RedisModule) GetString(key interface{}) (string, error) {
|
||||
|
||||
ret, err := conn.Do("GET", key)
|
||||
if err != nil {
|
||||
log.Error("GetString fail,reason:%v", err)
|
||||
log.Error("GetString fail", log.ErrorField("err",err))
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -298,7 +298,7 @@ func (m *RedisModule) GetStringJSON(key string, st interface{}) error {
|
||||
|
||||
ret, err := conn.Do("GET", key)
|
||||
if err != nil {
|
||||
log.Error("GetStringJSON fail,reason:%v", err)
|
||||
log.Error("GetStringJSON fail", log.ErrorField("err",err))
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -315,7 +315,7 @@ func (m *RedisModule) GetStringJSON(key string, st interface{}) error {
|
||||
}
|
||||
|
||||
if err = json.Unmarshal(str, st); err != nil {
|
||||
log.Error("GetStringJSON fail json.Unmarshal is error:%s,%s,reason:%v", key, string(str), err)
|
||||
log.Errorf("GetStringJSON fail json.Unmarshal is error:%s,%s,reason:%v", key, string(str), err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -336,13 +336,13 @@ func (m *RedisModule) GetStringMap(keys []string) (retMap map[string]string, err
|
||||
// 开始Send数据
|
||||
err = conn.Send("MULTI")
|
||||
if err != nil {
|
||||
log.Error("GetMuchString fail %v", err)
|
||||
log.Errorf("GetMuchString fail %v", err)
|
||||
return nil, err
|
||||
}
|
||||
for _, val := range keys {
|
||||
err = conn.Send("GET", val)
|
||||
if err != nil {
|
||||
log.Error("GetMuchString fail,reason:%v", err)
|
||||
log.Errorf("GetMuchString fail,reason:%v", err)
|
||||
conn.Do("DISCARD")
|
||||
return nil, err
|
||||
}
|
||||
@@ -351,7 +351,7 @@ func (m *RedisModule) GetStringMap(keys []string) (retMap map[string]string, err
|
||||
// 执行命令
|
||||
ret, err := conn.Do("EXEC")
|
||||
if err != nil {
|
||||
log.Error("GetMuchString fail %v", err)
|
||||
log.Errorf("GetMuchString fail %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -383,7 +383,7 @@ func (m *RedisModule) ExistsKey(key interface{}) (bool, error) {
|
||||
|
||||
ret, err := conn.Do("EXISTS", key)
|
||||
if err != nil {
|
||||
log.Error("ExistsKey fail, reason:%v", err)
|
||||
log.Errorf("ExistsKey fail, reason:%v", err)
|
||||
return false, err
|
||||
}
|
||||
retValue, ok := ret.(int64)
|
||||
@@ -404,7 +404,7 @@ func (m *RedisModule) DelString(key interface{}) error {
|
||||
|
||||
ret, err := conn.Do("DEL", key)
|
||||
if err != nil {
|
||||
log.Error("DelString fail, reason:%v", err)
|
||||
log.Errorf("DelString fail, reason:%v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -439,7 +439,7 @@ func (m *RedisModule) DelStringKeyList(keys []interface{}) (map[interface{}]bool
|
||||
for _, val := range keys {
|
||||
err = conn.Send("DEL", val)
|
||||
if err != nil {
|
||||
log.Error("DelMuchString fail,reason:%v", err)
|
||||
log.Errorf("DelMuchString fail,reason:%v", err)
|
||||
conn.Do("DISCARD")
|
||||
return nil, err
|
||||
}
|
||||
@@ -448,7 +448,7 @@ func (m *RedisModule) DelStringKeyList(keys []interface{}) (map[interface{}]bool
|
||||
ret, err := conn.Do("EXEC")
|
||||
|
||||
if err != nil {
|
||||
log.Error("DelMuchString fail,reason:%v", err)
|
||||
log.Errorf("DelMuchString fail,reason:%v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -484,7 +484,7 @@ func (m *RedisModule) SetHash(redisKey, hashKey, value interface{}) error {
|
||||
|
||||
_, retErr := conn.Do("HSET", redisKey, hashKey, value)
|
||||
if retErr != nil {
|
||||
log.Error("SetHash fail,reason:%v", retErr)
|
||||
log.Errorf("SetHash fail,reason:%v", retErr)
|
||||
}
|
||||
|
||||
return retErr
|
||||
@@ -502,7 +502,7 @@ func (m *RedisModule) GetAllHashJSON(redisKey string) (map[string]string, error)
|
||||
|
||||
value, err := conn.Do("HGETALL", redisKey)
|
||||
if err != nil {
|
||||
log.Error("GetAllHashJSON fail,reason:%v", err)
|
||||
log.Errorf("GetAllHashJSON fail,reason:%v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -522,7 +522,7 @@ func (m *RedisModule) GetHash(redisKey interface{}, fieldKey interface{}) (strin
|
||||
|
||||
value, err := conn.Do("HGET", redisKey, fieldKey)
|
||||
if err != nil {
|
||||
log.Error("GetHashValueByKey fail,reason:%v", err)
|
||||
log.Errorf("GetHashValueByKey fail,reason:%v", err)
|
||||
return "", err
|
||||
}
|
||||
if value == nil {
|
||||
@@ -545,7 +545,7 @@ func (m *RedisModule) GetMuchHash(args ...interface{}) ([]string, error) {
|
||||
|
||||
value, err := conn.Do("HMGET", args...)
|
||||
if err != nil {
|
||||
log.Error("GetHashValueByKey fail,reason:%v", err)
|
||||
log.Errorf("GetHashValueByKey fail,reason:%v", err)
|
||||
return nil, err
|
||||
}
|
||||
if value == nil {
|
||||
@@ -582,7 +582,7 @@ func (m *RedisModule) ScanMatchKeys(cursorValue int, redisKey string, count int)
|
||||
|
||||
value, err := conn.Do("SCAN", cursorValue, "match", redisKey, "count", count)
|
||||
if err != nil {
|
||||
log.Error("GetHashValueByKey fail,reason:%v", err)
|
||||
log.Errorf("GetHashValueByKey fail,reason:%v", err)
|
||||
return nextCursorValue, nil, err
|
||||
}
|
||||
if value == nil {
|
||||
@@ -618,7 +618,7 @@ func (m *RedisModule) SetHashMapJSON(redisKey string, mapFieldValue map[interfac
|
||||
if err == nil {
|
||||
_, err = conn.Do("HSET", redisKey, symbol, temp)
|
||||
if err != nil {
|
||||
log.Error("SetMuchHashJSON fail,reason:%v", err)
|
||||
log.Errorf("SetMuchHashJSON fail,reason:%v", err)
|
||||
conn.Send("DISCARD")
|
||||
return err
|
||||
}
|
||||
@@ -627,7 +627,7 @@ func (m *RedisModule) SetHashMapJSON(redisKey string, mapFieldValue map[interfac
|
||||
// 执行命令
|
||||
_, err = conn.Do("EXEC")
|
||||
if err != nil {
|
||||
log.Error("SetMuchHashJSON fail,reason:%v", err)
|
||||
log.Errorf("SetMuchHashJSON fail,reason:%v", err)
|
||||
conn.Send("DISCARD")
|
||||
}
|
||||
return err
|
||||
@@ -642,7 +642,7 @@ func (m *RedisModule) DelHash(args ...interface{}) error {
|
||||
|
||||
_, retErr := conn.Do("HDEL", args...)
|
||||
if retErr != nil {
|
||||
log.Error("DelMuchHash fail,reason:%v", retErr)
|
||||
log.Errorf("DelMuchHash fail,reason:%v", retErr)
|
||||
}
|
||||
return retErr
|
||||
}
|
||||
@@ -678,7 +678,7 @@ func (m *RedisModule) setListPush(setType string, args ...interface{}) error {
|
||||
|
||||
_, retErr := conn.Do(setType, args...)
|
||||
if retErr != nil {
|
||||
log.Error("setList fail,reason:%v", retErr)
|
||||
log.Errorf("setList fail,reason:%v", retErr)
|
||||
}
|
||||
return retErr
|
||||
}
|
||||
@@ -705,7 +705,7 @@ func (m *RedisModule) LRangeList(key string, start, end int) ([]string, error) {
|
||||
|
||||
reply, err := conn.Do("lrange", key, start, end)
|
||||
if err != nil {
|
||||
log.Error("SetListJSONRpush fail,reason:%v", err)
|
||||
log.Errorf("SetListJSONRpush fail,reason:%v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -722,7 +722,7 @@ func (m *RedisModule) GetListLen(key string) (int, error) {
|
||||
|
||||
reply, err := conn.Do("LLEN", key)
|
||||
if err != nil {
|
||||
log.Error("GetListLen fail,reason:%v", err)
|
||||
log.Errorf("GetListLen fail,reason:%v", err)
|
||||
return -1, err
|
||||
}
|
||||
return redis.Int(reply, err)
|
||||
@@ -748,7 +748,7 @@ func (m *RedisModule) LTrimList(key string, start, end int) error {
|
||||
|
||||
_, err = conn.Do("LTRIM", key, start, end)
|
||||
if err != nil {
|
||||
log.Error("LtrimListValue fail,reason:%v", err)
|
||||
log.Errorf("LtrimListValue fail,reason:%v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -849,7 +849,7 @@ func (m *RedisModule) ZADDInsertJson(key string, score float64, value interface{
|
||||
}
|
||||
_, err = conn.Do("ZADD", key, score, JsonValue)
|
||||
if err != nil {
|
||||
log.Error("ZADDInsertJson fail,reason:%v", err)
|
||||
log.Errorf("ZADDInsertJson fail,reason:%v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -865,7 +865,7 @@ func (m *RedisModule) ZADDInsert(key string, score float64, Data interface{}) er
|
||||
|
||||
_, err = conn.Do("ZADD", key, score, Data)
|
||||
if err != nil {
|
||||
log.Error("ZADDInsert fail,reason:%v", err)
|
||||
log.Errorf("ZADDInsert fail,reason:%v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -1088,7 +1088,7 @@ func (m *RedisModule) HincrbyHashInt(redisKey, hashKey string, value int) error
|
||||
|
||||
_, retErr := conn.Do("HINCRBY", redisKey, hashKey, value)
|
||||
if retErr != nil {
|
||||
log.Error("HincrbyHashInt fail,reason:%v", retErr)
|
||||
log.Errorf("HincrbyHashInt fail,reason:%v", retErr)
|
||||
}
|
||||
|
||||
return retErr
|
||||
@@ -1103,7 +1103,7 @@ func (m *RedisModule) EXPlREInsert(key string, TTl int) error {
|
||||
|
||||
_, err = conn.Do("expire", key, TTl)
|
||||
if err != nil {
|
||||
log.Error("expire fail,reason:%v", err)
|
||||
log.Errorf("expire fail,reason:%v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -1129,7 +1129,7 @@ func (m *RedisModule) Keys(key string) ([]string, error) {
|
||||
|
||||
ret, err := conn.Do("KEYS", key)
|
||||
if err != nil {
|
||||
log.Error("KEYS fail, reason:%v", err)
|
||||
log.Errorf("KEYS fail, reason:%v", err)
|
||||
return nil, err
|
||||
}
|
||||
retList, ok := ret.([]interface{})
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"github.com/duanhf2012/origin/v2/service"
|
||||
"github.com/duanhf2012/origin/v2/util/bytespool"
|
||||
"github.com/google/uuid"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -140,10 +139,7 @@ func (slf *Client) GetId() string {
|
||||
func (slf *Client) Run() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -156,7 +152,7 @@ func (slf *Client) Run() {
|
||||
slf.tcpConn.SetReadDeadline(slf.tcpService.tcpServer.ReadDeadline)
|
||||
bytes, err := slf.tcpConn.ReadMsg()
|
||||
if err != nil {
|
||||
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", slf.id))
|
||||
log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", slf.id))
|
||||
break
|
||||
}
|
||||
data, err := slf.tcpService.process.Unmarshal(slf.id, bytes)
|
||||
|
||||
@@ -129,7 +129,7 @@ func (slf *WSClient) Run() {
|
||||
for {
|
||||
bytes, err := slf.wsConn.ReadMsg()
|
||||
if err != nil {
|
||||
log.Debug("read client id %s is error:%+v", slf.id, err)
|
||||
log.Debugf("read client id %s is error:%+v", slf.id, err)
|
||||
break
|
||||
}
|
||||
data, err := slf.wsService.process.Unmarshal(slf.id, bytes)
|
||||
|
||||
173
util/pattern/pubsub/pubsub.go
Normal file
173
util/pattern/pubsub/pubsub.go
Normal file
@@ -0,0 +1,173 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type TopicType int
|
||||
type Key uint64
|
||||
|
||||
type IBaseSubscriber interface {
|
||||
OnSubscribe(key Key)
|
||||
GetKey() Key
|
||||
}
|
||||
|
||||
type ISubscriber interface {
|
||||
IBaseSubscriber
|
||||
OnEvent(ctx ...any)
|
||||
}
|
||||
|
||||
type IPublisher interface {
|
||||
Publish(topic TopicType, ctx ...any)
|
||||
Subscribe(topic TopicType, sub ISubscriber)
|
||||
UnSubscribe(topic TopicType)
|
||||
UnSubscribeKey(key Key)
|
||||
}
|
||||
|
||||
var keyID uint64
|
||||
|
||||
func genKeyID() Key {
|
||||
return Key(atomic.AddUint64(&keyID, 1))
|
||||
}
|
||||
|
||||
type KeyData struct {
|
||||
subscriber ISubscriber
|
||||
topicType TopicType
|
||||
keyElement *list.Element
|
||||
}
|
||||
|
||||
type SubscriberSet map[Key]KeyData
|
||||
type TopicSet map[TopicType]*list.List
|
||||
|
||||
type Publisher struct {
|
||||
subscriberSet SubscriberSet
|
||||
topicSet TopicSet
|
||||
}
|
||||
|
||||
func (set *SubscriberSet) init() {
|
||||
*set = make(SubscriberSet, 64)
|
||||
}
|
||||
|
||||
func (set *SubscriberSet) add(keyElement *list.Element, topicType TopicType, subscriber ISubscriber) {
|
||||
(*set)[keyElement.Value.(Key)] = KeyData{subscriber: subscriber, topicType: topicType, keyElement: keyElement}
|
||||
}
|
||||
|
||||
func (set *SubscriberSet) del(key Key) {
|
||||
delete(*set, key)
|
||||
}
|
||||
|
||||
func (set *SubscriberSet) get(key Key) (KeyData, bool) {
|
||||
keyData, ok := (*set)[key]
|
||||
if !ok {
|
||||
return keyData, false
|
||||
}
|
||||
|
||||
return keyData, true
|
||||
}
|
||||
|
||||
func (set *TopicSet) init() {
|
||||
*set = make(TopicSet, 64)
|
||||
}
|
||||
|
||||
func (set *TopicSet) add(topic TopicType, key Key) *list.Element {
|
||||
keyList := (*set)[topic]
|
||||
if keyList == nil {
|
||||
keyList = list.New()
|
||||
(*set)[topic] = keyList
|
||||
}
|
||||
|
||||
return keyList.PushBack(key)
|
||||
}
|
||||
|
||||
func (set *TopicSet) del(topic TopicType, keyElement *list.Element) {
|
||||
keyList := (*set)[topic]
|
||||
if keyList == nil {
|
||||
return
|
||||
}
|
||||
|
||||
keyList.Remove(keyElement)
|
||||
}
|
||||
|
||||
func (set *TopicSet) foreach(topic TopicType, cb func(key Key)) {
|
||||
keyList := (*set)[topic]
|
||||
if keyList == nil {
|
||||
return
|
||||
}
|
||||
for e := keyList.Front(); e != nil; e = e.Next() {
|
||||
cb(e.Value.(Key))
|
||||
}
|
||||
}
|
||||
|
||||
type BaseSubscriber struct {
|
||||
key Key
|
||||
}
|
||||
|
||||
func (bs *BaseSubscriber) OnSubscribe(key Key) {
|
||||
bs.key = key
|
||||
}
|
||||
|
||||
func (bs *BaseSubscriber) GetKey() Key {
|
||||
return bs.key
|
||||
}
|
||||
|
||||
func (pub *Publisher) lazyInit() {
|
||||
if pub.subscriberSet == nil {
|
||||
pub.subscriberSet.init()
|
||||
}
|
||||
if pub.topicSet == nil {
|
||||
pub.topicSet.init()
|
||||
}
|
||||
}
|
||||
|
||||
func (pub *Publisher) add(topic TopicType, sub ISubscriber) Key {
|
||||
key := genKeyID()
|
||||
ele := pub.topicSet.add(topic, key)
|
||||
pub.subscriberSet.add(ele, topic, sub)
|
||||
|
||||
return key
|
||||
}
|
||||
|
||||
func (pub *Publisher) Publish(topic TopicType, ctx ...any) {
|
||||
pub.lazyInit()
|
||||
pub.topicSet.foreach(topic, func(key Key) {
|
||||
keyData, ok := pub.subscriberSet.get(key)
|
||||
if ok == false {
|
||||
return
|
||||
}
|
||||
keyData.subscriber.OnEvent(ctx...)
|
||||
})
|
||||
}
|
||||
|
||||
func (pub *Publisher) Subscribe(topic TopicType, sub ISubscriber) bool {
|
||||
if topic == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
pub.lazyInit()
|
||||
sub.OnSubscribe(pub.add(topic, sub))
|
||||
return true
|
||||
}
|
||||
|
||||
func (pub *Publisher) UnSubscribe(topic TopicType) {
|
||||
keyList := pub.topicSet[topic]
|
||||
if keyList == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for e := keyList.Front(); e != nil; e = e.Next() {
|
||||
pub.subscriberSet.del(e.Value.(Key))
|
||||
}
|
||||
|
||||
delete(pub.topicSet, topic)
|
||||
}
|
||||
|
||||
func (pub *Publisher) UnSubscribeKey(key Key) {
|
||||
keyData, ok := pub.subscriberSet.get(key)
|
||||
if ok == false {
|
||||
return
|
||||
}
|
||||
|
||||
pub.topicSet.del(keyData.topicType, keyData.keyElement)
|
||||
pub.subscriberSet.del(key)
|
||||
}
|
||||
54
util/pattern/pubsub/pubsub_test.go
Normal file
54
util/pattern/pubsub/pubsub_test.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
const (
|
||||
Invalid TopicType = iota
|
||||
Topic1
|
||||
Topic2
|
||||
)
|
||||
|
||||
var test *testing.T
|
||||
|
||||
type Subscriber1 struct {
|
||||
BaseSubscriber
|
||||
}
|
||||
|
||||
type Subscriber2 struct {
|
||||
BaseSubscriber
|
||||
}
|
||||
|
||||
func (sub *Subscriber1) OnEvent(ctx ...any) {
|
||||
test.Log("Subscriber1 OnEvent", " key ", sub.GetKey(), ctx)
|
||||
}
|
||||
|
||||
func (sub *Subscriber2) OnEvent(ctx ...any) {
|
||||
test.Log("Subscriber2 OnEvent", " key ", sub.GetKey(), ctx)
|
||||
}
|
||||
|
||||
func TestPubSub(t *testing.T) {
|
||||
test = t
|
||||
var publisher Publisher
|
||||
|
||||
// 创建3个订阅者
|
||||
var subscriber []ISubscriber
|
||||
subscriber = append(subscriber, &Subscriber1{}, &Subscriber1{}, &Subscriber2{})
|
||||
|
||||
// 分别注册进Publisher中
|
||||
publisher.Subscribe(Topic1, subscriber[0])
|
||||
publisher.Subscribe(Topic1, subscriber[1])
|
||||
publisher.Subscribe(Topic2, subscriber[2])
|
||||
|
||||
// 发布订阅,两个Subscriber1都会调用OnEvent
|
||||
publisher.Publish(Topic1, 1, 2, 3)
|
||||
|
||||
// 删除订阅,Publish后,只有Subscriber1的key2收到
|
||||
publisher.UnSubscribeKey(subscriber[0].GetKey())
|
||||
publisher.Publish(Topic1, 1, 2, 3)
|
||||
|
||||
// 删除Topic2,Publish将收不到
|
||||
publisher.UnSubscribe(Topic2)
|
||||
publisher.Publish(Topic2, 1)
|
||||
}
|
||||
@@ -32,10 +32,10 @@ func Abs[NumType typ.Signed | typ.Float](Num NumType) NumType {
|
||||
func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
|
||||
ret := number1 + number2
|
||||
if number2 > 0 && ret < number1 {
|
||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
return ret, false
|
||||
} else if number2 < 0 && ret > number1 {
|
||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
return ret, false
|
||||
}
|
||||
|
||||
@@ -45,10 +45,10 @@ func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
|
||||
func SubSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
|
||||
ret := number1 - number2
|
||||
if number2 > 0 && ret > number1 {
|
||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
return ret, false
|
||||
} else if number2 < 0 && ret < number1 {
|
||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
return ret, false
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ func MulSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
|
||||
return ret, true
|
||||
}
|
||||
|
||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
return ret, true
|
||||
}
|
||||
|
||||
|
||||
@@ -124,10 +124,7 @@ func (t *Timer) IsOpen() bool {
|
||||
func (t *Timer) Do() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -210,10 +207,7 @@ func (c *Cron) Reset() {
|
||||
func (c *Cron) Do() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -266,10 +260,7 @@ func (c *Cron) UnRef() {
|
||||
func (c *Ticker) Do() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
log.StackError(fmt.Sprint(r))
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user