Compare commits

...

24 Commits

Author SHA1 Message Date
boyce  af15615345  Optimize log file path generation  2025-03-14 18:03:01 +08:00
boyce  50dd80b082  Tidy up the code  2025-03-10 11:35:19 +08:00
boyce  a6487dd41e  Switch the default logging to rotatelogs  2025-01-25 00:14:18 +08:00
boyce  d5299294d8  Improve logging; add rotatelogs library support  2025-01-25 00:04:31 +08:00
boyce  4d36e525a5  Improve config loading; drop the default cluster directory  2025-01-16 13:45:06 +08:00
duanhf2012  3a4350769c  Add etcd authentication configuration  2025-01-08 18:11:20 +08:00
duanhf2012  d4966ea129  Improve the ws module  2024-12-17 14:46:00 +08:00
duanhf2012  3b10eeb792  Improve logging  2024-12-16 18:00:26 +08:00
duanhf2012  e3275e9f2a  Improve module release order  2024-12-11 18:31:37 +08:00
duanhf2012  16745b34f0  Improve logging  2024-12-11 17:49:59 +08:00
duanhf2012  f34dc7d53f  Improve custom log Writer support  2024-12-11 17:24:06 +08:00
duanhf2012  0a09dc2fee  Improve logging  2024-12-11 17:14:29 +08:00
duanhf2012  f01a93c446  Improve logging  2024-12-11 17:03:21 +08:00
duanhf2012  4d2ab4ee4f  Improve code  2024-12-11 16:44:09 +08:00
duanhf2012  ffcc5a3489  1. Improve service configuration checks; 2. Deprecate the SetGoRoutineNum interface; 3. Improve Module release  2024-12-06 16:05:25 +08:00
duanhf2012  cf6ca0483b  Merge branch 'v2' of https://github.com/duanhf2012/origin into v2  2024-12-05 10:19:24 +08:00
duanhf2012  97a21e6f71  Add Skip interface  2024-12-05 10:19:15 +08:00
duanhf2012  f60a55d03a  Improve async RPC; remove the error return value  2024-12-04 18:33:53 +08:00
duanhf2012  2c32d6eec9  RPC and logging improvements  2024-12-03 17:21:21 +08:00
duanhf2012  da45f97fa8  Improve logging  2024-11-29 15:55:35 +08:00
duanhf2012  d29abc0813  Add environment variable support for config files  2024-11-29 14:39:04 +08:00
duanhf2012  c9507f9ee9  Replace slog logging with zap  2024-11-29 13:47:51 +08:00
duanhf2012  61de4bba3a  Replace slog logging with zap  2024-11-29 13:47:27 +08:00
duanhf2012  000853b479  Network library improvements  2024-11-25 17:44:08 +08:00
44 changed files with 715 additions and 1081 deletions

View File

@@ -40,8 +40,6 @@ type NodeInfo struct {
DiscoveryService []DiscoveryService //services to filter in discovery; if not configured, no filtering is applied
status NodeStatus
Retire bool
NetworkName string
}
type NodeRpcInfo struct {
@@ -250,7 +248,7 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er
//2. Set up the service discovery node
err = cls.setupDiscovery(localNodeId, setupServiceFun)
if err != nil {
log.Error("setupDiscovery fail", log.ErrorAttr("err", err))
log.Error("setupDiscovery fail", log.ErrorField("err", err))
return err
}
service.RegRpcEventFun = cls.RegRpcEvent

View File

@@ -3,24 +3,23 @@ package cluster
import "github.com/duanhf2012/origin/v2/rpc"
type ConfigDiscovery struct {
funDelNode FunDelNode
funSetNode FunSetNode
funDelNode FunDelNode
funSetNode FunSetNode
localNodeId string
}
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode FunDelNode,funSetNode FunSetNode) error{
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
discovery.localNodeId = localNodeId
discovery.funDelNode = funDelNode
discovery.funSetNode = funSetNode
//parse the other locally configured services
_,nodeInfoList,_,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
_, nodeInfoList, _, err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
if err != nil {
return err
}
for _,nodeInfo := range nodeInfoList {
for _, nodeInfo := range nodeInfoList {
if nodeInfo.NodeId == localNodeId {
continue
}
@@ -30,5 +29,3 @@ func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode Fu
return nil
}

View File

@@ -5,17 +5,17 @@ import (
"github.com/duanhf2012/origin/v2/service"
)
func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.discoveryInfo.getDiscoveryType() == OriginType { //origin-type service discovery
return cls.setupOriginDiscovery(localNodeId,setupServiceFun)
}else if cls.discoveryInfo.getDiscoveryType() == EtcdType{//etcd-type service discovery
return cls.setupEtcdDiscovery(localNodeId,setupServiceFun)
return cls.setupOriginDiscovery(localNodeId, setupServiceFun)
} else if cls.discoveryInfo.getDiscoveryType() == EtcdType { //etcd-type service discovery
return cls.setupEtcdDiscovery(localNodeId, setupServiceFun)
}
return cls.setupConfigDiscovery(localNodeId,setupServiceFun)
return cls.setupConfigDiscovery(localNodeId, setupServiceFun)
}
func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup")
}
@@ -27,6 +27,7 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
}
cls.serviceDiscovery = getOriginDiscovery()
//2. For dynamic service discovery, install the local discovery services
if localMaster == true {
setupServiceFun(&masterService)
@@ -36,11 +37,10 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
setupServiceFun(&clientService)
cls.AddDiscoveryService(OriginDiscoveryClientName, true)
return nil
}
func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup")
}
@@ -48,12 +48,12 @@ func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun Setup
//setup etcd service
cls.serviceDiscovery = getEtcdDiscovery()
setupServiceFun(cls.serviceDiscovery.(service.IService))
cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(),false)
cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(), false)
return nil
}
func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup")
}

View File

@@ -1,6 +1,11 @@
package cluster
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/rpc"
@@ -9,16 +14,11 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto"
"time"
"context"
"errors"
"fmt"
"go.uber.org/zap"
"os"
"path"
"runtime"
"strings"
"sync/atomic"
"time"
)
const originDir = "/origin"
@@ -42,8 +42,13 @@ type EtcdDiscoveryService struct {
mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId]
}
var etcdDiscovery *EtcdDiscoveryService
func getEtcdDiscovery() IServiceDiscovery {
etcdDiscovery := &EtcdDiscoveryService{}
if etcdDiscovery == nil {
etcdDiscovery = &EtcdDiscoveryService{}
}
return etcdDiscovery
}
@@ -89,21 +94,49 @@ func (ed *EtcdDiscoveryService) OnInit() error {
}
for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
client, cerr := clientv3.New(clientv3.Config{
var client *clientv3.Client
var tlsConfig *tls.Config
if etcdDiscoveryCfg.EtcdList[i].Cert != "" {
// load cert
cert, cErr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
if cErr != nil {
log.Error("load cert error", log.ErrorField("err", cErr))
return cErr
}
// load root ca
caData, cErr := os.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
if cErr != nil {
log.Error("load root ca error", log.ErrorField("err", cErr))
return cErr
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caData)
tlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: pool,
}
}
client, err = clientv3.New(clientv3.Config{
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
Logger: zap.NewNop(),
Username: etcdDiscoveryCfg.EtcdList[i].UserName,
Password: etcdDiscoveryCfg.EtcdList[i].Password,
Logger: log.GetLogger().Logger,
TLS: tlsConfig,
})
if cerr != nil {
log.Error("etcd discovery init fail", log.ErrorAttr("err", cerr))
return cerr
if err != nil {
log.Error("etcd discovery init fail", log.ErrorField("err", err))
return err
}
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
_, err = client.Leases(ctx)
if err != nil {
log.Error("etcd discovery init fail", log.Any("endpoint", etcdDiscoveryCfg.EtcdList[i].Endpoints), log.ErrorAttr("err", err))
log.Error("etcd discovery init fail", log.Any("endpoint", etcdDiscoveryCfg.EtcdList[i].Endpoints), log.ErrorField("err", err))
return err
}
@@ -128,7 +161,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
var resp *clientv3.LeaseGrantResponse
resp, err = client.Grant(context.Background(), cluster.GetEtcdDiscovery().TTLSecond)
if err != nil {
log.Error("etcd registerService fail", log.ErrorAttr("err", err))
log.Error("etcd registerService fail", log.ErrorField("err", err))
ed.tryRegisterService(client, etcdClient)
return
}
@@ -138,7 +171,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
// register the service node with etcd
_, err = client.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID))
if err != nil {
log.Error("etcd Put fail", log.ErrorAttr("err", err))
log.Error("etcd Put fail", log.ErrorField("err", err))
ed.tryRegisterService(client, etcdClient)
return
}
@@ -146,7 +179,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID)
if err != nil {
log.Error("etcd KeepAlive fail", log.ErrorAttr("err", err))
log.Error("etcd KeepAlive fail", log.ErrorField("err", err))
ed.tryRegisterService(client, etcdClient)
return
}
@@ -200,7 +233,7 @@ func (ed *EtcdDiscoveryService) retire() error {
// register the service node with etcd
_, err := c.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID))
if err != nil {
log.Error("etcd Put fail", log.ErrorAttr("err", err))
log.Error("etcd Put fail", log.ErrorField("err", err))
return err
}
}
@@ -285,12 +318,12 @@ func (ed *EtcdDiscoveryService) setNodeInfo(networkName string, nodeInfo *rpc.No
func (ed *EtcdDiscoveryService) close() {
for c, ec := range ed.mapClient {
if _, err := c.Revoke(context.Background(), ec.leaseID); err != nil {
log.Error("etcd Revoke fail", log.ErrorAttr("err", err))
log.Error("etcd Revoke fail", log.ErrorField("err", err))
}
c.Watcher.Close()
err := c.Close()
if err != nil {
log.Error("etcd Close fail", log.ErrorAttr("err", err))
log.Error("etcd Close fail", log.ErrorField("err", err))
}
}
}
@@ -299,7 +332,7 @@ func (ed *EtcdDiscoveryService) getServices(client *clientv3.Client, etcdClient
// fetch existing keys by prefix
resp, err := client.Get(context.Background(), watchKey, clientv3.WithPrefix())
if err != nil {
log.Error("etcd Get fail", log.ErrorAttr("err", err))
log.Error("etcd Get fail", log.ErrorField("err", err))
ed.tryWatch(client, etcdClient)
return false
}
@@ -322,11 +355,7 @@ func (ed *EtcdDiscoveryService) watchByClient(client *clientv3.Client, etcdClien
func (ed *EtcdDiscoveryService) watcher(client *clientv3.Client, etcdClient *etcdClientInfo, watchKey string) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
ed.tryWatch(client, etcdClient)
}
}()
@@ -355,7 +384,7 @@ func (ed *EtcdDiscoveryService) setNode(netWorkName string, byteNode []byte) str
var nodeInfo rpc.NodeInfo
err := proto.Unmarshal(byteNode, &nodeInfo)
if err != nil {
log.Error("Unmarshal fail", log.String("netWorkName", netWorkName), log.ErrorAttr("err", err))
log.Error("Unmarshal fail", log.String("netWorkName", netWorkName), log.ErrorField("err", err))
return ""
}
@@ -494,7 +523,7 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
lg, err = client.Grant(ctx, etcdServiceRecord.TTLSecond)
if err != nil {
log.Error("etcd record fail,cannot grant lease", log.ErrorAttr("err", err))
log.Error("etcd record fail,cannot grant lease", log.ErrorField("err", err))
return errors.New("cannot grant lease")
}
}
@@ -503,14 +532,14 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
_, err = client.Put(ctx, path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo, clientv3.WithLease(lg.ID))
if err != nil {
log.Error("etcd record fail,cannot put record", log.ErrorAttr("err", err))
log.Error("etcd record fail,cannot put record", log.ErrorField("err", err))
}
return errors.New("cannot put record")
}
_, err = client.Put(context.Background(), path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo)
if err != nil {
log.Error("etcd record fail,cannot put record", log.ErrorAttr("err", err))
log.Error("etcd record fail,cannot put record", log.ErrorField("err", err))
return errors.New("cannot put record")
}
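As a usage reference, here is a minimal sketch of how the new etcd authentication fields introduced in this change (UserName, Password, Cert, CertKey, Ca) map onto an etcd v3 client; the endpoints, file paths, and credentials are illustrative placeholders, not values from the repository:

package main

import (
	"crypto/tls"
	"crypto/x509"
	"os"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func newEtcdClient() (*clientv3.Client, error) {
	// Client certificate and key (hypothetical paths).
	cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
	if err != nil {
		return nil, err
	}
	// Root CA used to verify the etcd server.
	caData, err := os.ReadFile("ca.crt")
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caData)

	// Username/Password and TLS mirror the fields added to the etcd config.
	return clientv3.New(clientv3.Config{
		Endpoints:   []string{"https://127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
		Username:    "origin",
		Password:    "secret",
		TLS:         &tls.Config{Certificates: []tls.Certificate{cert}, RootCAs: pool},
	})
}

func main() {
	cli, err := newEtcdClient()
	if err != nil {
		panic(err)
	}
	defer cli.Close()
}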

View File

@@ -471,7 +471,7 @@ func (dc *OriginDiscoveryClient) OnRelease() {
err := dc.CallNodeWithTimeout(3*time.Second, masterNodeList.MasterNodeList[i].NodeId, UnRegServiceDiscover, &nodeRetireReq, &rpc.Empty{})
if err != nil {
log.Error("call "+UnRegServiceDiscover+" is fail", log.ErrorAttr("err", err))
log.Error("call "+UnRegServiceDiscover+" is fail", log.ErrorField("err", err))
}
}
}
@@ -493,7 +493,7 @@ func (dc *OriginDiscoveryClient) OnRetire() {
err := dc.GoNode(masterNodeList.MasterNodeList[i].NodeId, NodeRetireRpcMethod, &nodeRetireReq)
if err != nil {
log.Error("call "+NodeRetireRpcMethod+" is fail", log.ErrorAttr("err", err))
log.Error("call "+NodeRetireRpcMethod+" is fail", log.ErrorField("err", err))
}
}
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/duanhf2012/origin/v2/rpc"
jsoniter "github.com/json-iterator/go"
"gopkg.in/yaml.v3"
"io/fs"
"os"
"path/filepath"
"strings"
@@ -15,9 +16,15 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type EtcdList struct {
NetworkName []string
Endpoints []string
UserName string
Password string
Cert string
CertKey string
Ca string
}
type EtcdDiscovery struct {
@@ -64,12 +71,8 @@ type NodeInfoList struct {
NodeList []NodeInfo
}
func validConfigFile(f os.DirEntry) bool {
if f.IsDir() == true || (filepath.Ext(f.Name()) != ".json" && filepath.Ext(f.Name()) != ".yml" && filepath.Ext(f.Name()) != ".yaml") {
return false
}
return true
func validConfigFile(f string) bool {
return strings.HasSuffix(f, ".json")|| strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml")
}
func yamlToJson(data []byte, v interface{}) ([]byte, error) {
@@ -88,15 +91,16 @@ func yamlToJson(data []byte, v interface{}) ([]byte, error) {
}
func unmarshalConfig(data []byte, v interface{}) error {
if !json.Valid(data) {
envData := []byte(os.ExpandEnv(string(data)))
if !json.Valid(envData) {
var err error
data, err = yamlToJson(data, v)
envData, err = yamlToJson(envData, v)
if err != nil {
return err
}
}
return json.Unmarshal(data, v)
return json.Unmarshal(envData, v)
}
func (d *DiscoveryInfo) getDiscoveryType() DiscoveryType {
@@ -270,32 +274,33 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
var discoveryInfo DiscoveryInfo
var rpcMode RpcMode
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
fileInfoList, err := os.ReadDir(clusterCfgPath)
if err != nil {
return discoveryInfo, nil, rpcMode, fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
}
//read every file in the directory, but only load configs in a supported format; configs may be split across files freely
for _, f := range fileInfoList {
if !validConfigFile(f) {
continue
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error {
if info.IsDir() {
return nil
}
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name()
fileNodeInfoList, rErr := cls.ReadClusterConfig(filePath)
if err != nil {
return err
}
if !validConfigFile(info.Name()) {
return nil
}
fileNodeInfoList, rErr := cls.ReadClusterConfig(path)
if rErr != nil {
return discoveryInfo, nil, rpcMode, fmt.Errorf("read file path %s is error:%+v", filePath, rErr)
return fmt.Errorf("read file path %s is error:%+v", path, rErr)
}
err = cls.SetRpcMode(&fileNodeInfoList.RpcMode, &rpcMode)
if err != nil {
return discoveryInfo, nil, rpcMode, err
return err
}
err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery)
if err != nil {
return discoveryInfo, nil, rpcMode, err
return err
}
for _, nodeInfo := range fileNodeInfoList.NodeList {
@@ -303,6 +308,12 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
nodeInfoList = append(nodeInfoList, nodeInfo)
}
}
return nil
})
if err != nil {
return discoveryInfo, nil, rpcMode, err
}
if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
@@ -324,32 +335,32 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
}
func (cls *Cluster) readLocalService(localNodeId string) error {
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
fileInfoList, err := os.ReadDir(clusterCfgPath)
if err != nil {
return fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
}
var globalCfg interface{}
publicService := map[string]interface{}{}
nodeService := map[string]interface{}{}
//read every file in the directory, but only load configs in a supported format; configs may be split across files freely
for _, f := range fileInfoList {
if !validConfigFile(f) {
continue
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error{
if info.IsDir() {
return nil
}
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name()
currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(filePath)
if err != nil {
continue
return err
}
if !validConfigFile(info.Name()) {
return nil
}
currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(path)
if err != nil {
return err
}
if currGlobalCfg != nil {
//duplicate [Global] configuration is not allowed
if globalCfg != nil {
return fmt.Errorf("[Global] does not allow repeated configuration in %s", f.Name())
return fmt.Errorf("[Global] does not allow repeated configuration in %s", info.Name())
}
globalCfg = currGlobalCfg
}
@@ -365,7 +376,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
pubCfg, ok := serviceConfig[s]
if ok == true {
if _, publicOk := publicService[s]; publicOk == true {
return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, f.Name())
return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, info.Name())
}
publicService[s] = pubCfg
}
@@ -381,12 +392,17 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
}
if _, nodeOK := nodeService[s]; nodeOK == true {
return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, f.Name())
return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, info.Name())
}
nodeService[s] = nodeCfg
break
}
}
return nil
})
if err != nil {
return err
}
//combine all configurations
@@ -416,13 +432,12 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
return nil
}
func (cls *Cluster) parseLocalCfg() {
func (cls *Cluster) parseLocalCfg() error{
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet)
cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo
for _, serviceName := range cls.localNodeInfo.ServiceList {
splitServiceName := strings.Split(serviceName, ":")
if len(splitServiceName) == 2 {
@@ -439,8 +454,13 @@ func (cls *Cluster) parseLocalCfg() {
cls.mapServiceNode[serviceName] = make(map[string]struct{})
}
if _,ok:=cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId];ok {
return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId)
}
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
}
return nil
}
func (cls *Cluster) IsNatsMode() bool {
@@ -473,8 +493,7 @@ func (cls *Cluster) InitCfg(localNodeId string) error {
}
//add locally configured services to the global map
cls.parseLocalCfg()
return nil
return cls.parseLocalCfg()
}
func (cls *Cluster) IsConfigService(serviceName string) bool {
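A minimal sketch of the environment-variable expansion that unmarshalConfig now applies: placeholders such as ${ETCD_PASSWORD} in a cluster config are resolved with os.ExpandEnv before the data is validated and unmarshalled (the keys and values are illustrative, not the full origin schema):

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

func main() {
	// The variable would normally come from the deployment environment.
	os.Setenv("ETCD_PASSWORD", "secret")

	// Raw config as it might appear on disk, with a placeholder value.
	raw := []byte(`{"UserName": "origin", "Password": "${ETCD_PASSWORD}"}`)

	// Expansion happens before json.Valid / json.Unmarshal, as in the diff.
	expanded := []byte(os.ExpandEnv(string(raw)))

	var cfg struct {
		UserName string
		Password string
	}
	if err := json.Unmarshal(expanded, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.Password) // prints "secret"
}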

View File

@@ -58,7 +58,7 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err e
}
if fn == nil && cb == nil {
log.Stack("fn and cb is nil")
log.StackError("fn and cb is nil")
return
}

View File

@@ -6,7 +6,7 @@ import (
"time"
"fmt"
"runtime"
"context"
"github.com/duanhf2012/origin/v2/log"
@@ -192,10 +192,7 @@ breakFor:
func (d *dispatch) DoCallback(cb func(err error)) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()

View File

@@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"runtime"
"github.com/duanhf2012/origin/v2/log"
)
@@ -51,15 +51,13 @@ func (w *worker) run(waitGroup *sync.WaitGroup, t task) {
func (w *worker) exec(t *task) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
cb := t.cb
t.cb = func(err error) {
cb(errors.New(errString))
}
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(errString)
w.endCallFun(true, t)
}
}()

View File

@@ -3,7 +3,6 @@ package event
import (
"fmt"
"github.com/duanhf2012/origin/v2/log"
"runtime"
"sync"
)
@@ -215,10 +214,7 @@ func (handler *EventHandler) Destroy() {
func (processor *EventProcessor) EventHandler(ev IEvent) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()
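The concurrent and event hunks above all collapse the hand-rolled runtime.Stack capture into a single log.StackError call; a minimal sketch of the resulting recover pattern (safeCall is a hypothetical wrapper, not a function from the repository):

package example

import (
	"fmt"

	"github.com/duanhf2012/origin/v2/log"
)

// safeCall runs fn and logs any panic together with a stack trace.
func safeCall(fn func()) {
	defer func() {
		if r := recover(); r != nil {
			// log.StackError records the message and appends the stack,
			// replacing the manual runtime.Stack buffer used previously.
			log.StackError(fmt.Sprint(r))
		}
	}()
	fn()
}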

1
go.mod
View File

@@ -21,6 +21,7 @@ require (
go.mongodb.org/mongo-driver v1.9.1
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.34.1
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v3 v3.0.1
)

2
go.sum
View File

@@ -330,6 +330,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

View File

@@ -1,93 +0,0 @@
package log
import (
"strconv"
)
const _size = 9216
type Buffer struct {
bs []byte
//mu sync.Mutex // ensures atomic writes; protects the following fields
}
func (buff *Buffer) Init() {
buff.bs = make([]byte, _size)
}
// AppendByte writes a single byte to the Buffer.
func (buff *Buffer) AppendByte(v byte) {
buff.bs = append(buff.bs, v)
}
func (buff *Buffer) AppendBytes(v []byte) {
buff.bs = append(buff.bs, v...)
}
// AppendString writes a string to the Buffer.
func (buff *Buffer) AppendString(s string) {
buff.bs = append(buff.bs, s...)
}
// AppendInt appends an integer to the underlying buffer (assuming base 10).
func (buff *Buffer) AppendInt(i int64) {
buff.bs = strconv.AppendInt(buff.bs, i, 10)
}
// AppendUint appends an unsigned integer to the underlying buffer (assuming
// base 10).
func (buff *Buffer) AppendUint(i uint64) {
buff.bs = strconv.AppendUint(buff.bs, i, 10)
}
// AppendBool appends a bool to the underlying buffer.
func (buff *Buffer) AppendBool(v bool) {
buff.bs = strconv.AppendBool(buff.bs, v)
}
// AppendFloat appends a float to the underlying buffer. It doesn't quote NaN
// or +/- Inf.
func (buff *Buffer) AppendFloat(f float64, bitSize int) {
buff.bs = strconv.AppendFloat(buff.bs, f, 'f', -1, bitSize)
}
// Len returns the length of the underlying byte slice.
func (buff *Buffer) Len() int {
return len(buff.bs)
}
// Cap returns the capacity of the underlying byte slice.
func (buff *Buffer) Cap() int {
return cap(buff.bs)
}
// Bytes returns a mutable reference to the underlying byte slice.
func (buff *Buffer) Bytes() []byte {
return buff.bs
}
// String returns a string copy of the underlying byte slice.
func (buff *Buffer) String() string {
return string(buff.bs)
}
// Reset resets the underlying byte slice. Subsequent writes re-use the slice's
// backing array.
func (buff *Buffer) Reset() {
buff.bs = buff.bs[:0]
}
// Write implements io.Writer.
func (buff *Buffer) Write(bs []byte) (int, error) {
buff.bs = append(buff.bs, bs...)
return len(bs), nil
}
// TrimNewline trims any final "\n" byte from the end of the buffer.
func (buff *Buffer) TrimNewline() {
if i := len(buff.bs) - 1; i >= 0 {
if buff.bs[i] == '\n' {
buff.bs = buff.bs[:i]
}
}
}

View File

@@ -1,161 +0,0 @@
package log
import (
"context"
"io"
"log/slog"
"path/filepath"
"runtime"
"runtime/debug"
"sync"
)
const defaultSkip = 7
type IOriginHandler interface {
slog.Handler
Lock()
UnLock()
SetSkip(skip int)
GetSkip() int
}
type BaseHandler struct {
addSource bool
w io.Writer
locker sync.Mutex
skip int
}
type OriginTextHandler struct {
BaseHandler
*slog.TextHandler
}
type OriginJsonHandler struct {
BaseHandler
*slog.JSONHandler
}
func (bh *BaseHandler) SetSkip(skip int) {
bh.skip = skip
}
func (bh *BaseHandler) GetSkip() int {
return bh.skip
}
func getStrLevel(level slog.Level) string {
switch level {
case LevelTrace:
return "Trace"
case LevelDebug:
return "Debug"
case LevelInfo:
return "Info"
case LevelWarning:
return "Warning"
case LevelError:
return "Error"
case LevelStack:
return "Stack"
case LevelDump:
return "Dump"
case LevelFatal:
return "Fatal"
}
return ""
}
func defaultReplaceAttr(groups []string, a slog.Attr) slog.Attr {
if a.Key == slog.LevelKey {
level := a.Value.Any().(slog.Level)
a.Value = slog.StringValue(getStrLevel(level))
} else if a.Key == slog.TimeKey && len(groups) == 0 {
a.Value = slog.StringValue(a.Value.Time().Format("2006/01/02 15:04:05"))
} else if a.Key == slog.SourceKey {
source := a.Value.Any().(*slog.Source)
source.File = filepath.Base(source.File)
}
return a
}
func NewOriginTextHandler(level slog.Level, w io.Writer, addSource bool, replaceAttr func([]string, slog.Attr) slog.Attr) slog.Handler {
var textHandler OriginTextHandler
textHandler.addSource = addSource
textHandler.w = w
textHandler.TextHandler = slog.NewTextHandler(w, &slog.HandlerOptions{
AddSource: addSource,
Level: level,
ReplaceAttr: replaceAttr,
})
textHandler.skip = defaultSkip
return &textHandler
}
func (oh *OriginTextHandler) Handle(context context.Context, record slog.Record) error {
oh.Fill(context, &record)
oh.locker.Lock()
defer oh.locker.Unlock()
if record.Level == LevelStack || record.Level == LevelFatal {
err := oh.TextHandler.Handle(context, record)
oh.logStack(&record)
return err
} else if record.Level == LevelDump {
strDump := record.Message
record.Message = "dump info"
err := oh.TextHandler.Handle(context, record)
oh.w.Write([]byte(strDump))
return err
}
return oh.TextHandler.Handle(context, record)
}
func (bh *BaseHandler) logStack(record *slog.Record) {
bh.w.Write(debug.Stack())
}
func (bh *BaseHandler) Lock() {
bh.locker.Lock()
}
func (bh *BaseHandler) UnLock() {
bh.locker.Unlock()
}
func NewOriginJsonHandler(level slog.Level, w io.Writer, addSource bool, replaceAttr func([]string, slog.Attr) slog.Attr) slog.Handler {
var jsonHandler OriginJsonHandler
jsonHandler.addSource = addSource
jsonHandler.w = w
jsonHandler.JSONHandler = slog.NewJSONHandler(w, &slog.HandlerOptions{
AddSource: addSource,
Level: level,
ReplaceAttr: replaceAttr,
})
jsonHandler.skip = defaultSkip
return &jsonHandler
}
func (oh *OriginJsonHandler) Handle(context context.Context, record slog.Record) error {
oh.Fill(context, &record)
if record.Level == LevelStack || record.Level == LevelFatal || record.Level == LevelDump {
record.Add("stack", debug.Stack())
}
oh.locker.Lock()
defer oh.locker.Unlock()
return oh.JSONHandler.Handle(context, record)
}
func (bh *BaseHandler) Fill(_ context.Context, record *slog.Record) {
if bh.addSource {
var pcs [1]uintptr
runtime.Callers(bh.skip, pcs[:])
record.PC = pcs[0]
}
}

View File

@@ -1,520 +1,401 @@
package log
import (
"context"
"fmt"
"github.com/duanhf2012/origin/v2/util/bytespool"
"io"
"log/slog"
"github.com/duanhf2012/rotatelogs"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
"os"
"path"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"strings"
"time"
)
var OpenConsole bool
var LogSize int64
var LogChannelCap int
var LogPath string
var LogLevel = LevelTrace
var gLogger, _ = NewTextLogger(LevelDebug, "", "", true, LogChannelCap)
var isSetLogger bool
var memPool = bytespool.NewMemAreaPool()
// levels
const (
LevelTrace = slog.Level(-8)
LevelDebug = slog.LevelDebug
LevelInfo = slog.LevelInfo
LevelWarning = slog.LevelWarn
LevelError = slog.LevelError
LevelStack = slog.Level(12)
LevelDump = slog.Level(16)
LevelFatal = slog.Level(20)
)
type ILogger interface {
Trace(msg string, args ...any)
Debug(msg string, args ...any)
Info(msg string, args ...any)
Warning(msg string, args ...any)
Error(msg string, args ...any)
Stack(msg string, args ...any)
Dump(msg string, args ...any)
Fatal(msg string, args ...any)
DoSPrintf(level slog.Level, a []interface{})
FormatHeader(buf *Buffer, level slog.Level, callDepth int)
Close()
}
var gLogger = NewDefaultLogger()
var LogLevel zapcore.Level
var MaxSize int
var LogPath string
var OpenConsole *bool
var LogChanLen int
type Logger struct {
SLogger *slog.Logger
*zap.Logger
stack bool
ioWriter IoWriter
sBuff Buffer
FileName string
Skip int
Encoder zapcore.Encoder
SugaredLogger *zap.SugaredLogger
CoreList []zapcore.Core
WriteSyncerFun []func() zapcore.WriteSyncer
}
type IoWriter struct {
outFile io.Writer // destination for output
writeBytes int64
logChannel chan []byte
wg sync.WaitGroup
closeSig chan struct{}
lockWrite sync.Mutex
filePath string
filePrefix string
fileDay int
fileCreateTime int64 //second
}
func (iw *IoWriter) Close() error {
iw.lockWrite.Lock()
defer iw.lockWrite.Unlock()
iw.close()
return nil
}
func (iw *IoWriter) close() error {
if iw.closeSig != nil {
close(iw.closeSig)
iw.closeSig = nil
}
iw.wg.Wait()
if iw.outFile != nil {
err := iw.outFile.(io.Closer).Close()
iw.outFile = nil
return err
}
return nil
}
func (iw *IoWriter) writeFile(p []byte) (n int, err error) {
//switch log file
iw.switchFile()
if iw.outFile != nil {
n, err = iw.outFile.Write(p)
if n > 0 {
atomic.AddInt64(&iw.writeBytes, int64(n))
}
}
return 0, nil
}
func (iw *IoWriter) Write(p []byte) (n int, err error) {
iw.lockWrite.Lock()
defer iw.lockWrite.Unlock()
if iw.logChannel == nil {
return iw.writeIo(p)
}
copyBuff := memPool.MakeBytes(len(p))
if copyBuff == nil {
return 0, fmt.Errorf("MakeByteSlice failed")
}
copy(copyBuff, p)
iw.logChannel <- copyBuff
return
}
func (iw *IoWriter) writeIo(p []byte) (n int, err error) {
n, err = iw.writeFile(p)
if OpenConsole {
n, err = os.Stdout.Write(p)
}
return
}
func (iw *IoWriter) setLogChannel(logChannelNum int) (err error) {
iw.lockWrite.Lock()
defer iw.lockWrite.Unlock()
iw.close()
if logChannelNum == 0 {
return nil
}
//copy iw.logChannel
var logInfo []byte
logChannel := make(chan []byte, logChannelNum)
for i := 0; i < logChannelNum && i < len(iw.logChannel); i++ {
logInfo = <-iw.logChannel
logChannel <- logInfo
}
iw.logChannel = logChannel
iw.closeSig = make(chan struct{})
iw.wg.Add(1)
go iw.run()
return nil
}
func (iw *IoWriter) run() {
defer iw.wg.Done()
Loop:
for {
select {
case <-iw.closeSig:
break Loop
case logs := <-iw.logChannel:
iw.writeIo(logs)
memPool.ReleaseBytes(logs)
}
}
for len(iw.logChannel) > 0 {
logs := <-iw.logChannel
iw.writeIo(logs)
memPool.ReleaseBytes(logs)
}
}
func (iw *IoWriter) isFull() bool {
if LogSize == 0 {
return false
}
return atomic.LoadInt64(&iw.writeBytes) >= LogSize
}
func (logger *Logger) setLogChannel(logChannel int) (err error) {
return logger.ioWriter.setLogChannel(logChannel)
}
func (iw *IoWriter) switchFile() error {
now := time.Now()
if iw.fileCreateTime == now.Unix() {
return nil
}
if iw.fileDay == now.Day() && iw.isFull() == false {
return nil
}
if iw.filePath != "" {
var err error
fileName := fmt.Sprintf("%s%d%02d%02d_%02d_%02d_%02d.log",
iw.filePrefix,
now.Year(),
now.Month(),
now.Day(),
now.Hour(),
now.Minute(),
now.Second())
filePath := path.Join(iw.filePath, fileName)
iw.outFile, err = os.Create(filePath)
if err != nil {
return err
}
iw.fileDay = now.Day()
iw.fileCreateTime = now.Unix()
atomic.StoreInt64(&iw.writeBytes, 0)
}
return nil
}
func GetDefaultHandler() IOriginHandler {
return gLogger.(*Logger).SLogger.Handler().(IOriginHandler)
}
func NewTextLogger(level slog.Level, pathName string, filePrefix string, addSource bool, logChannelCap int) (ILogger, error) {
var logger Logger
logger.ioWriter.filePath = pathName
logger.ioWriter.filePrefix = filePrefix
logger.SLogger = slog.New(NewOriginTextHandler(level, &logger.ioWriter, addSource, defaultReplaceAttr))
logger.setLogChannel(logChannelCap)
err := logger.ioWriter.switchFile()
if err != nil {
return nil, err
}
return &logger, nil
}
func NewJsonLogger(level slog.Level, pathName string, filePrefix string, addSource bool, logChannelCap int) (ILogger, error) {
var logger Logger
logger.ioWriter.filePath = pathName
logger.ioWriter.filePrefix = filePrefix
logger.SLogger = slog.New(NewOriginJsonHandler(level, &logger.ioWriter, true, defaultReplaceAttr))
logger.setLogChannel(logChannelCap)
err := logger.ioWriter.switchFile()
if err != nil {
return nil, err
}
return &logger, nil
}
// Close It's dangerous to call the method on logging
func (logger *Logger) Close() {
logger.ioWriter.Close()
}
func (logger *Logger) Trace(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelTrace, msg, args...)
}
func (logger *Logger) Debug(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelDebug, msg, args...)
}
func (logger *Logger) Info(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelInfo, msg, args...)
}
func (logger *Logger) Warning(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelWarning, msg, args...)
}
func (logger *Logger) Error(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelError, msg, args...)
}
func (logger *Logger) Stack(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelStack, msg, args...)
}
func (logger *Logger) Dump(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelDump, msg, args...)
}
func (logger *Logger) Fatal(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelFatal, msg, args...)
os.Exit(1)
}
// SetLogger It's non-thread-safe
func SetLogger(logger ILogger) {
if logger != nil && isSetLogger == false {
// Set the Logger
func SetLogger(logger *Logger) {
if logger != nil {
gLogger = logger
isSetLogger = true
}
}
func GetLogger() ILogger {
// Set a zap Logger directly
func SetZapLogger(zapLogger *zap.Logger) {
if zapLogger != nil {
gLogger = &Logger{}
gLogger.Logger = zapLogger
isSetLogger = true
}
}
func GetLogger() *Logger {
return gLogger
}
func Trace(msg string, args ...any) {
gLogger.Trace(msg, args...)
func (logger *Logger) SetEncoder(encoder zapcore.Encoder) {
logger.Encoder = encoder
}
func Debug(msg string, args ...any) {
gLogger.Debug(msg, args...)
func (logger *Logger) SetSkip(skip int) {
logger.Skip = skip
}
func Info(msg string, args ...any) {
gLogger.Info(msg, args...)
}
func Warning(msg string, args ...any) {
gLogger.Warning(msg, args...)
}
func Error(msg string, args ...any) {
gLogger.Error(msg, args...)
}
func Stack(msg string, args ...any) {
gLogger.Stack(msg, args...)
}
func Dump(dump string, args ...any) {
gLogger.Dump(dump, args...)
}
func Fatal(msg string, args ...any) {
gLogger.Fatal(msg, args...)
}
func Close() {
gLogger.Close()
}
func ErrorAttr(key string, value error) slog.Attr {
if value == nil {
return slog.Attr{Key: key, Value: slog.StringValue("nil")}
func GetJsonEncoder() zapcore.Encoder {
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(t.Format("2006-01-02 15:04:05.000"))
}
return slog.Attr{Key: key, Value: slog.StringValue(value.Error())}
return zapcore.NewJSONEncoder(encoderConfig)
}
func String(key, value string) slog.Attr {
return slog.Attr{Key: key, Value: slog.StringValue(value)}
func GetTxtEncoder() zapcore.Encoder {
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(t.Format("2006-01-02 15:04:05.000"))
}
return zapcore.NewConsoleEncoder(encoderConfig)
}
func Int(key string, value int) slog.Attr {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))}
func (logger *Logger) getLogConfig() *lumberjack.Logger {
return &lumberjack.Logger{
Filename: filepath.Join(LogPath, logger.FileName),
MaxSize: MaxSize,
MaxBackups: 0,
MaxAge: 0,
Compress: false,
LocalTime: true,
}
}
func Int64(key string, value int64) slog.Attr {
return slog.Attr{Key: key, Value: slog.Int64Value(value)}
func NewDefaultLogger() *Logger {
logger := Logger{}
logger.Encoder = GetJsonEncoder()
core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), zap.InfoLevel)
logger.Logger = zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1))
return &logger
}
func Int32(key string, value int32) slog.Attr {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))}
func (logger *Logger) SetSyncers(syncers ...func() zapcore.WriteSyncer) {
logger.WriteSyncerFun = syncers
}
func Int16(key string, value int16) slog.Attr {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))}
func (logger *Logger) AppendSyncerFun(syncerFun func() zapcore.WriteSyncer) {
logger.WriteSyncerFun = append(logger.WriteSyncerFun, syncerFun)
}
func Int8(key string, value int8) slog.Attr {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))}
func SetLogLevel(level zapcore.Level) {
LogLevel = level
}
func Uint(key string, value uint) slog.Attr {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
func (logger *Logger) Enabled(zapcore.Level) bool {
return logger.stack
}
func Uint64(key string, v uint64) slog.Attr {
return slog.Attr{Key: key, Value: slog.Uint64Value(v)}
func (logger *Logger) NewLumberjackWriter() zapcore.WriteSyncer {
return zapcore.AddSync(
&lumberjack.Logger{
Filename: filepath.Join(LogPath, logger.FileName),
MaxSize: MaxSize,
MaxBackups: 0,
MaxAge: 0,
Compress: false,
LocalTime: true,
})
}
func Uint32(key string, value uint32) slog.Attr {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
func (logger *Logger) NewRotatelogsWriter() zapcore.WriteSyncer {
var options []rotatelogs.Option
if MaxSize > 0 {
options = append(options, rotatelogs.WithRotateMaxSize(int64(MaxSize)))
}
if LogChanLen > 0 {
options = append(options, rotatelogs.WithChannelLen(LogChanLen))
}
options = append(options, rotatelogs.WithRotationTime(time.Hour*24))
fileName := strings.TrimRight(logger.FileName, filepath.Ext(logger.FileName))
rotateLogs, err := rotatelogs.NewRotateLogs(LogPath, "20060102/"+fileName+"_20060102_150405", options...)
if err != nil {
panic(err)
}
return zapcore.AddSync(rotateLogs)
}
func Uint16(key string, value uint16) slog.Attr {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
}
func Uint8(key string, value uint8) slog.Attr {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
}
func Float64(key string, v float64) slog.Attr {
return slog.Attr{Key: key, Value: slog.Float64Value(v)}
}
func Bool(key string, v bool) slog.Attr {
return slog.Attr{Key: key, Value: slog.BoolValue(v)}
}
func Time(key string, v time.Time) slog.Attr {
return slog.Attr{Key: key, Value: slog.TimeValue(v)}
}
func Duration(key string, v time.Duration) slog.Attr {
return slog.Attr{Key: key, Value: slog.DurationValue(v)}
}
func Any(key string, value any) slog.Attr {
return slog.Attr{Key: key, Value: slog.AnyValue(value)}
}
func Group(key string, args ...any) slog.Attr {
return slog.Group(key, args...)
}
func (logger *Logger) DoSPrintf(level slog.Level, a []interface{}) {
if logger.SLogger.Enabled(context.Background(), level) == false {
func (logger *Logger) Init() {
if isSetLogger {
return
}
logger.SLogger.Handler().(IOriginHandler).Lock()
defer logger.SLogger.Handler().(IOriginHandler).UnLock()
logger.sBuff.Reset()
logger.FormatHeader(&logger.sBuff, level, 3)
for _, s := range a {
logger.sBuff.AppendString(slog.AnyValue(s).String())
var syncerList []zapcore.WriteSyncer
if logger.WriteSyncerFun == nil {
syncerList = append(syncerList, logger.NewRotatelogsWriter())
} else {
for _, syncer := range logger.WriteSyncerFun {
syncerList = append(syncerList, syncer())
}
}
logger.sBuff.AppendString("\"\n")
logger.ioWriter.Write(logger.sBuff.Bytes())
}
func (logger *Logger) STrace(a ...interface{}) {
logger.DoSPrintf(LevelTrace, a)
}
func (logger *Logger) SDebug(a ...interface{}) {
logger.DoSPrintf(LevelDebug, a)
}
func (logger *Logger) SInfo(a ...interface{}) {
logger.DoSPrintf(LevelInfo, a)
}
func (logger *Logger) SWarning(a ...interface{}) {
logger.DoSPrintf(LevelWarning, a)
}
func (logger *Logger) SError(a ...interface{}) {
logger.DoSPrintf(LevelError, a)
}
func STrace(a ...interface{}) {
gLogger.DoSPrintf(LevelTrace, a)
}
func SDebug(a ...interface{}) {
gLogger.DoSPrintf(LevelDebug, a)
}
func SInfo(a ...interface{}) {
gLogger.DoSPrintf(LevelInfo, a)
}
func SWarning(a ...interface{}) {
gLogger.DoSPrintf(LevelWarning, a)
}
func SError(a ...interface{}) {
gLogger.DoSPrintf(LevelError, a)
}
func (logger *Logger) FormatHeader(buf *Buffer, level slog.Level, callDepth int) {
t := time.Now()
var file string
var line int
// Release lock while getting caller info - it's expensive.
var ok bool
_, file, line, ok = runtime.Caller(callDepth)
if !ok {
file = "???"
line = 0
var coreList []zapcore.Core
if OpenConsole == nil || *OpenConsole {
syncerList = append(syncerList, zapcore.AddSync(os.Stdout))
}
file = filepath.Base(file)
buf.AppendString("time=\"")
buf.AppendString(t.Format("2006/01/02 15:04:05"))
buf.AppendString("\"")
logger.sBuff.AppendString(" level=")
logger.sBuff.AppendString(getStrLevel(level))
logger.sBuff.AppendString(" source=")
for _, writer := range syncerList {
core := zapcore.NewCore(logger.Encoder, writer, LogLevel)
coreList = append(coreList, core)
}
buf.AppendString(file)
buf.AppendByte(':')
buf.AppendInt(int64(line))
buf.AppendString(" msg=\"")
if logger.CoreList != nil {
coreList = append(coreList, logger.CoreList...)
}
core := zapcore.NewTee(coreList...)
logger.Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(logger), zap.AddCallerSkip(1+logger.Skip))
logger.SugaredLogger = logger.Logger.Sugar()
}
func (logger *Logger) Debug(msg string, fields ...zap.Field) {
logger.Logger.Debug(msg, fields...)
}
func (logger *Logger) Info(msg string, fields ...zap.Field) {
logger.Logger.Info(msg, fields...)
}
func (logger *Logger) Warn(msg string, fields ...zap.Field) {
logger.Logger.Warn(msg, fields...)
}
func (logger *Logger) Error(msg string, fields ...zap.Field) {
logger.Logger.Error(msg, fields...)
}
func (logger *Logger) StackError(msg string, args ...zap.Field) {
logger.stack = true
logger.Logger.Log(zapcore.ErrorLevel, msg, args...)
logger.stack = false
}
func (logger *Logger) Fatal(msg string, fields ...zap.Field) {
gLogger.stack = true
logger.Logger.Fatal(msg, fields...)
gLogger.stack = false
}
func Debug(msg string, fields ...zap.Field) {
gLogger.Logger.Debug(msg, fields...)
}
func Info(msg string, fields ...zap.Field) {
gLogger.Logger.Info(msg, fields...)
}
func Warn(msg string, fields ...zap.Field) {
gLogger.Logger.Warn(msg, fields...)
}
func Error(msg string, fields ...zap.Field) {
gLogger.Logger.Error(msg, fields...)
}
func StackError(msg string, fields ...zap.Field) {
gLogger.stack = true
gLogger.Logger.Error(msg, fields...)
gLogger.stack = false
}
func Fatal(msg string, fields ...zap.Field) {
gLogger.stack = true
gLogger.Logger.Fatal(msg, fields...)
gLogger.stack = false
}
func Debugf(template string, args ...any) {
gLogger.SugaredLogger.Debugf(template, args...)
}
func Infof(template string, args ...any) {
gLogger.SugaredLogger.Infof(template, args...)
}
func Warnf(template string, args ...any) {
gLogger.SugaredLogger.Warnf(template, args...)
}
func Errorf(template string, args ...any) {
gLogger.SugaredLogger.Errorf(template, args...)
}
func StackErrorf(template string, args ...any) {
gLogger.stack = true
gLogger.SugaredLogger.Errorf(template, args...)
gLogger.stack = false
}
func Fatalf(template string, args ...any) {
gLogger.SugaredLogger.Fatalf(template, args...)
}
func (logger *Logger) SDebug(args ...interface{}) {
logger.SugaredLogger.Debugln(args...)
}
func (logger *Logger) SInfo(args ...interface{}) {
logger.SugaredLogger.Infoln(args...)
}
func (logger *Logger) SWarn(args ...interface{}) {
logger.SugaredLogger.Warnln(args...)
}
func (logger *Logger) SError(args ...interface{}) {
logger.SugaredLogger.Errorln(args...)
}
func (logger *Logger) SStackError(args ...interface{}) {
gLogger.stack = true
logger.SugaredLogger.Errorln(args...)
gLogger.stack = false
}
func (logger *Logger) SFatal(args ...interface{}) {
gLogger.stack = true
logger.SugaredLogger.Fatalln(args...)
gLogger.stack = false
}
func SDebug(args ...interface{}) {
gLogger.SugaredLogger.Debugln(args...)
}
func SInfo(args ...interface{}) {
gLogger.SugaredLogger.Infoln(args...)
}
func SWarn(args ...interface{}) {
gLogger.SugaredLogger.Warnln(args...)
}
func SError(args ...interface{}) {
gLogger.SugaredLogger.Errorln(args...)
}
func SStackError(args ...interface{}) {
gLogger.stack = true
gLogger.SugaredLogger.Errorln(args...)
gLogger.stack = false
}
func SFatal(args ...interface{}) {
gLogger.stack = true
gLogger.SugaredLogger.Fatalln(args...)
gLogger.stack = false
}
func ErrorField(key string, value error) zap.Field {
if value == nil {
return zap.String(key, "nil")
}
return zap.String(key, value.Error())
}
func String(key, value string) zap.Field {
return zap.String(key, value)
}
func Int(key string, value int) zap.Field {
return zap.Int(key, value)
}
func Int64(key string, value int64) zap.Field {
return zap.Int64(key, value)
}
func Int32(key string, value int32) zap.Field {
return zap.Int32(key, value)
}
func Int16(key string, value int16) zap.Field {
return zap.Int16(key, value)
}
func Int8(key string, value int8) zap.Field {
return zap.Int8(key, value)
}
func Uint(key string, value uint) zap.Field {
return zap.Uint(key, value)
}
func Uint64(key string, v uint64) zap.Field {
return zap.Uint64(key, v)
}
func Uint32(key string, value uint32) zap.Field {
return zap.Uint32(key, value)
}
func Uint16(key string, value uint16) zap.Field {
return zap.Uint16(key, value)
}
func Uint8(key string, value uint8) zap.Field {
return zap.Uint8(key, value)
}
func Float64(key string, v float64) zap.Field {
return zap.Float64(key, v)
}
func Bool(key string, v bool) zap.Field {
return zap.Bool(key, v)
}
func Bools(key string, v []bool) zap.Field {
return zap.Bools(key, v)
}
func Time(key string, v time.Time) zap.Field {
return zap.Time(key, v)
}
func Duration(key string, v time.Duration) zap.Field {
return zap.Duration(key, v)
}
func Durations(key string, v []time.Duration) zap.Field {
return zap.Durations(key, v)
}
func Any(key string, value any) zap.Field {
return zap.Any(key, value)
}
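Putting the reworked package together, a node-side setup might look roughly like the sketch below; it assumes the defaults shown in this file, and the file name, size, and level are illustrative:

package example

import (
	"errors"

	"github.com/duanhf2012/origin/v2/log"
	"go.uber.org/zap/zapcore"
)

func setupLog() {
	// Global knobs consumed by Logger.Init.
	log.LogPath = "./log"
	log.MaxSize = 256 // MB per rotated file
	log.SetLogLevel(zapcore.InfoLevel)

	// Configure and build the default logger (rotatelogs writer plus
	// console output unless OpenConsole is explicitly set to false).
	logger := log.GetLogger()
	logger.FileName = "mynode.log"
	logger.Init()

	// Structured, formatted, and stack-trace logging.
	log.Info("server started", log.String("nodeId", "Node_1"))
	log.Error("demo failure", log.ErrorField("err", errors.New("boom")))
	log.Errorf("formatted failure: %d", 42)
}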

View File

@@ -111,15 +111,15 @@ func (netConn *NetConn) doWrite(b []byte) error {
}
// b must not be modified by the others goroutines
func (netConn *NetConn) Write(b []byte) error {
func (netConn *NetConn) Write(b []byte) (int,error) {
netConn.Lock()
defer netConn.Unlock()
if atomic.LoadInt32(&netConn.closeFlag) == 1 || b == nil {
netConn.ReleaseReadMsg(b)
return errors.New("conn is close")
return 0,errors.New("conn is close")
}
return netConn.doWrite(b)
return len(b),netConn.doWrite(b)
}
func (netConn *NetConn) Read(b []byte) (int, error) {
@@ -150,7 +150,7 @@ func (netConn *NetConn) WriteMsg(args ...[]byte) error {
if atomic.LoadInt32(&netConn.closeFlag) == 1 {
return errors.New("conn is close")
}
return netConn.msgParser.Write(netConn.conn, args...)
return netConn.msgParser.Write(netConn, args...)
}
func (netConn *NetConn) WriteRawMsg(args []byte) error {
@@ -158,7 +158,8 @@ func (netConn *NetConn) WriteRawMsg(args []byte) error {
return errors.New("conn is close")
}
return netConn.Write(args)
_,err:= netConn.Write(args)
return err
}
func (netConn *NetConn) IsConnected() bool {
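The new (int, error) return makes NetConn satisfy io.Writer, which is why WriteMsg can now hand netConn itself (rather than netConn.conn) to the message parser; a minimal sketch of the idea (writeFrame is a hypothetical helper, not part of the repository):

package example

import "io"

// writeFrame depends only on io.Writer, which the updated
// NetConn.Write(b []byte) (int, error) signature now satisfies.
func writeFrame(w io.Writer, frame []byte) error {
	_, err := w.Write(frame)
	return err
}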

View File

@@ -106,7 +106,7 @@ func (client *KCPClient) dial() net.Conn {
return conn
}
log.Warning("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
log.Warn("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
time.Sleep(client.ConnectInterval)
continue
}

View File

@@ -203,7 +203,7 @@ func (kp *KCPServer) initSession(session *kcp.UDPSession) {
func (kp *KCPServer) run(listener *kcp.Listener) bool {
conn, err := listener.Accept()
if err != nil {
log.Error("accept error", log.String("ListenAddr", kp.kcpCfg.ListenAddr), log.ErrorAttr("err", err))
log.Error("accept error", log.String("ListenAddr", kp.kcpCfg.ListenAddr), log.ErrorField("err", err))
return false
}
@@ -211,7 +211,7 @@ func (kp *KCPServer) run(listener *kcp.Listener) bool {
if len(kp.conns) >= kp.kcpCfg.MaxConnNum {
kp.mutexConns.Unlock()
conn.Close()
log.Warning("too many connections")
log.Warn("too many connections")
return true
}
kp.conns[conn] = struct{}{}

View File

@@ -102,7 +102,7 @@ func (client *TCPClient) dial() net.Conn {
return conn
}
log.Warning("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
log.Warn("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
time.Sleep(client.ConnectInterval)
continue
}

View File

@@ -146,7 +146,7 @@ func (server *TCPServer) run() {
if len(server.conns) >= server.MaxConnNum {
server.mutexConns.Unlock()
conn.Close()
log.Warning("too many connections")
log.Warn("too many connections")
continue
}

View File

@@ -68,7 +68,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if len(handler.conns) >= handler.maxConnNum {
handler.mutexConns.Unlock()
conn.Close()
log.Warning("too many connections")
log.Warn("too many connections")
return
}
handler.conns[conn] = struct{}{}

View File

@@ -11,6 +11,7 @@ import (
"github.com/duanhf2012/origin/v2/util/buildtime"
"github.com/duanhf2012/origin/v2/util/sysprocess"
"github.com/duanhf2012/origin/v2/util/timer"
"go.uber.org/zap/zapcore"
"io"
"net/http"
_ "net/http/pprof"
@@ -54,10 +55,10 @@ func init() {
console.RegisterCommandString("retire", "", "<-retire nodeid=nodeid> retire originserver process.", retireNode)
console.RegisterCommandString("config", "", "<-config path> Configuration file path.", setConfigPath)
console.RegisterCommandString("console", "", "<-console true|false> Turn on or off screen log output.", openConsole)
console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|release|warning|error|fatal> Set loglevel.", setLevel)
console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|info|warn|error|stackerror|fatal> Set loglevel.", setLevel)
console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath)
console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize)
console.RegisterCommandInt("logchannelcap", -1, "<-logchannelcap num> Set log channel cap.", setLogChannelCapNum)
console.RegisterCommandInt("logchanlen", 0, "<-logchanlen len> Set log channel len.", setLogChanLen)
console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof)
}
@@ -156,12 +157,14 @@ func initNode(id string) {
nodeId = id
err := cluster.GetCluster().Init(GetNodeId(), Setup)
if err != nil {
log.Error("Init cluster fail", log.ErrorAttr("error", err))
log.Error("Init cluster fail", log.ErrorField("error", err))
os.Exit(1)
}
err = initLog()
if err != nil {
log.Error("Init log fail", log.ErrorField("error", err))
os.Exit(1)
return
}
@@ -211,22 +214,24 @@ func initNode(id string) {
}
//3. initialize services
log.Info("Start running server.")
service.Init()
}
func initLog() error {
logger := log.GetLogger()
if log.LogPath == "" {
setLogPath("./log")
err := setLogPath("./log")
if err != nil {
return err
}
}
localNodeInfo := cluster.GetCluster().GetLocalNodeInfo()
filePre := fmt.Sprintf("%s_", localNodeInfo.NodeId)
logger, err := log.NewTextLogger(log.LogLevel, log.LogPath, filePre, true, log.LogChannelCap)
if err != nil {
fmt.Printf("cannot create log file!\n")
return err
}
log.SetLogger(logger)
fileName := fmt.Sprintf("%s.log", localNodeInfo.NodeId)
logger.FileName = fileName
logger.Init()
return nil
}
@@ -323,34 +328,37 @@ func startNode(args interface{}) error {
myName, mErr := sysprocess.GetMyProcessName()
//failed to get the current process name; this should not happen
if mErr != nil {
log.Info("get my process's name is error", log.ErrorAttr("err", mErr))
log.Error("get my process's name is error", log.ErrorField("err", mErr))
os.Exit(-1)
}
//an existing process id with a matching process name is treated as a duplicate run of this node
if cErr == nil && name == myName {
log.Info("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId))
log.Error("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId))
os.Exit(-1)
}
break
}
//2. record the process id
log.Info("Start running server.")
writeProcessPid(strNodeId)
timer.StartTimer(10*time.Millisecond, 1000000)
//3. initialize the node
defer log.GetLogger().Logger.Sync()
initNode(strNodeId)
//4. run services
service.Start()
//5. run the cluster
cluster.GetCluster().Start()
err := cluster.GetCluster().Start()
if err != nil {
log.Error(err.Error())
os.Exit(-1)
}
//6. watch for exit signals & performance reports
var pProfilerTicker *time.Ticker = &time.Ticker{}
if profilerInterval > 0 {
pProfilerTicker = time.NewTicker(profilerInterval)
@@ -378,7 +386,7 @@ func startNode(args interface{}) error {
cluster.GetCluster().Stop()
log.Info("Server is stop.")
log.Close()
return nil
}
@@ -400,8 +408,8 @@ func SetupTemplateFunc(fs ...func() service.IService) {
}
}
func SetupTemplate[T any,P templateServicePoint[T]]() {
SetupTemplateFunc(func() service.IService{
func SetupTemplate[T any, P templateServicePoint[T]]() {
SetupTemplateFunc(func() service.IService {
var t T
return P(&t)
})
@@ -430,9 +438,11 @@ func openConsole(args interface{}) error {
}
strOpen := strings.ToLower(strings.TrimSpace(args.(string)))
if strOpen == "false" {
log.OpenConsole = false
bOpenConsole := false
log.OpenConsole = &bOpenConsole
} else if strOpen == "true" {
log.OpenConsole = true
bOpenConsole := true
log.OpenConsole = &bOpenConsole
} else {
return errors.New("parameter console error")
}
@@ -446,20 +456,18 @@ func setLevel(args interface{}) error {
strlogLevel := strings.TrimSpace(args.(string))
switch strlogLevel {
case "trace":
log.LogLevel = log.LevelTrace
case "debug":
log.LogLevel = log.LevelDebug
log.LogLevel = zapcore.DebugLevel
case "info":
log.LogLevel = log.LevelInfo
case "warning":
log.LogLevel = log.LevelWarning
log.LogLevel = zapcore.InfoLevel
case "warn":
log.LogLevel = zapcore.WarnLevel
case "error":
log.LogLevel = log.LevelError
case "stack":
log.LogLevel = log.LevelStack
log.LogLevel = zapcore.ErrorLevel
case "stackerror":
log.LogLevel = zapcore.ErrorLevel
case "fatal":
log.LogLevel = log.LevelFatal
log.LogLevel = zapcore.FatalLevel
default:
return errors.New("unknown level: " + strlogLevel)
}
@@ -470,52 +478,42 @@ func setLogPath(args interface{}) error {
if args == "" {
return nil
}
log.LogPath = strings.TrimSpace(args.(string))
dir, err := os.Stat(log.LogPath) //the directory does not exist
if err == nil && dir.IsDir() == false {
return errors.New("Not found dir " + log.LogPath)
}
logPath := strings.TrimSpace(args.(string))
_, err := os.Stat(logPath)
if err != nil {
err = os.Mkdir(log.LogPath, os.ModePerm)
err = os.MkdirAll(logPath, os.ModePerm)
if err != nil {
return errors.New("Cannot create dir " + log.LogPath)
return errors.New("Cannot create dir " + logPath)
}
}
log.LogPath = logPath
return nil
}
func setLogSize(args interface{}) error {
if args == "" {
return nil
}
logSize, ok := args.(int)
if ok == false {
return errors.New("param logsize is error")
}
if logSize == 0 {
return nil
}
log.LogSize = int64(logSize) * 1024 * 1024
log.MaxSize = logSize
return nil
}
func setLogChannelCapNum(args interface{}) error {
if args == "" {
return nil
}
logChannelCap, ok := args.(int)
func setLogChanLen(args interface{}) error {
logChanLen, ok := args.(int)
if ok == false {
return errors.New("param logsize is error")
}
if logChannelCap == -1 {
if logChanLen == 0 {
return nil
}
log.LogChannelCap = logChannelCap
log.LogChanLen = logChanLen
return nil
}
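Taken together, a minimal sketch of the logging knobs these node arguments now map to; the values are illustrative, while the field names are taken from the assignments above:

package main

import (
    "github.com/duanhf2012/origin/v2/log"
    "go.uber.org/zap/zapcore"
)

func main() {
    openConsole := true
    log.OpenConsole = &openConsole   // now a *bool instead of a bool
    log.LogLevel = zapcore.InfoLevel // zapcore levels replace the old log.Level* constants
    log.LogPath = "./log"            // log directory; setLogPath creates it with os.MkdirAll
    log.MaxSize = 512                // the setlogsize argument is now assigned here directly
    log.LogChanLen = 100000          // replaces log.LogChannelCap
}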

View File

@@ -75,7 +75,7 @@ func (cs *CallSet) AddPending(call *Call) {
if call.Seq == 0 {
cs.pendingLock.Unlock()
log.Stack("call is error.")
log.StackError("call is error.")
return
}

View File

@@ -32,7 +32,7 @@ type IRealClient interface {
SetConn(conn *network.NetConn)
Close(waitDone bool)
AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error)
AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error)
Go(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
RawGo(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
IsConnected() bool
@@ -104,7 +104,7 @@ func (client *Client) processRpcResponse(responseData []byte) error {
//rc.conn.ReleaseReadMsg(bytes)
if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData)
log.Error("rpcClient Unmarshal head error", log.ErrorAttr("error", err))
log.Error("rpcClient Unmarshal head error", log.ErrorField("error", err))
return nil
}
@@ -116,7 +116,7 @@ func (client *Client) processRpcResponse(responseData []byte) error {
if len(response.RpcResponseData.GetReply()) > 0 {
err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply)
if err != nil {
log.Error("rpcClient Unmarshal body failed", log.ErrorAttr("error", err))
log.Error("rpcClient Unmarshal body failed", log.ErrorField("error", err))
v.Err = err
}
}
@@ -203,7 +203,7 @@ func (client *Client) rawGo(nodeId string, w IWriter, timeout time.Duration, rpc
}
if err != nil {
client.RemovePending(call.Seq)
log.Error("WriteMsg is fail", log.ErrorAttr("error", err))
log.Error("WriteMsg is fail", log.ErrorField("error", err))
call.Seq = 0
call.DoError(err)
}
@@ -211,7 +211,7 @@ func (client *Client) rawGo(nodeId string, w IWriter, timeout time.Duration, rpc
return call
}
func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
processorType, processor := GetProcessorType(args)
InParam, herr := processor.Marshal(args)
if herr != nil {
@@ -264,10 +264,7 @@ func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration,
return emptyCancelRpc, err
}
if cancelable {
rpcCancel := RpcCancel{CallSeq: seq, Cli: client}
return rpcCancel.CancelRpc, nil
}
return emptyCancelRpc, nil
rpcCancel := RpcCancel{CallSeq: seq, Cli: client}
return rpcCancel.CancelRpc, nil
}
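A short caller-side sketch of the resulting contract, where asyncCall always hands back a usable CancelRpc; every name not present in the interface above (the request/response types, callback, and service method) is illustrative:

package rpcexample

import (
    "reflect"
    "time"

    "github.com/duanhf2012/origin/v2/log"
    "github.com/duanhf2012/origin/v2/rpc"
)

type echoReq struct{ Msg string }
type echoRes struct{ Msg string }

func onEcho(res *echoRes, err error) {}

// callEcho issues an async RPC and keeps the cancel function it now always receives.
func callEcho(client rpc.IRealClient, handler rpc.IRpcHandler, nodeId string) rpc.CancelRpc {
    req, res := &echoReq{Msg: "ping"}, &echoRes{}
    cancel, err := client.AsyncCall(nodeId, 5*time.Second, handler,
        "TestService.RPC_Echo", reflect.ValueOf(onEcho), req, res)
    if err != nil {
        log.Error("async call failed", log.ErrorField("error", err))
        return nil
    }
    return cancel // can be invoked later to drop the pending call
}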

View File

@@ -90,7 +90,7 @@ func (lc *LClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa
return pLocalRpcServer.selfNodeRpcHandlerGo(timeout, processor, lc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
}
func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}, cancelable bool) (CancelRpc, error) {
func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}) (CancelRpc, error) {
pLocalRpcServer := rpcHandler.GetRpcServer()()
//Check whether it is the same service
@@ -109,7 +109,7 @@ func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IR
}
//Handlers belonging to other rpcHandlers
cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback, cancelable)
cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback)
if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
}
@@ -121,9 +121,6 @@ func NewLClient(localNodeId string, callSet *CallSet) *Client {
client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1)
client.targetNodeId = localNodeId
//client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
//client.callRpcTimeout = DefaultRpcTimeout
lClient := &LClient{}
lClient.selfClient = client
client.IRealClient = lClient

View File

@@ -42,7 +42,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.Error("service method not config", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err))
log.Error("service method not config", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
pCall.Seq = 0
pCall.DoError(err)
@@ -74,7 +74,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil {
log.Error("unmarshalInParam is failed", log.String("serviceMethod", serviceMethod), log.Uint32("rpcMethodId", rpcMethodId), log.ErrorAttr("error", err))
log.Error("unmarshalInParam is failed", log.String("serviceMethod", serviceMethod), log.Uint32("rpcMethodId", rpcMethodId), log.ErrorField("error", err))
pCall.Seq = 0
pCall.DoError(err)
ReleaseRpcRequest(req)
@@ -90,12 +90,12 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
byteReturns, err := req.rpcProcessor.Marshal(Returns)
if err != nil {
Err = ConvertError(err)
log.Error("returns data cannot be marshal", log.Uint64("seq", callSeq), log.ErrorAttr("error", err))
log.Error("returns data cannot be marshal", log.Uint64("seq", callSeq), log.ErrorField("error", err))
} else {
err = req.rpcProcessor.Unmarshal(byteReturns, reply)
if err != nil {
Err = ConvertError(err)
log.Error("returns data cannot be Unmarshal", log.Uint64("seq", callSeq), log.ErrorAttr("error", err))
log.Error("returns data cannot be Unmarshal", log.Uint64("seq", callSeq), log.ErrorField("error", err))
}
}
}
@@ -127,7 +127,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
return pCall
}
func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error) {
func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error) {
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
@@ -266,7 +266,7 @@ func (server *BaseServer) processRpcRequest(data []byte, connTag string, wrRespo
req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam())
if err != nil {
rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error()
log.Error("call rpc param error", log.String("serviceMethod", req.RpcRequestData.GetServiceMethod()), log.ErrorAttr("error", err))
log.Error("call rpc param error", log.String("serviceMethod", req.RpcRequestData.GetServiceMethod()), log.ErrorField("error", err))
if req.requestHandle != nil {
req.requestHandle(nil, RpcError(rErr))
} else {

View File

@@ -50,7 +50,7 @@ func (nc *NatsClient) Go(nodeId string, timeout time.Duration, rpcHandler IRpcHa
_, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args)
if err != nil {
log.Error("Marshal is fail", log.ErrorAttr("error", err))
log.Error("Marshal is fail", log.ErrorField("error", err))
call := MakeCall()
call.DoError(err)
return call
@@ -63,8 +63,8 @@ func (nc *NatsClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRp
return nc.client.rawGo(nodeId, nc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
}
func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable)
func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam)
if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
}

View File

@@ -48,7 +48,7 @@ func (ns *NatsServer) Start() error {
ns.natsConn, err = nats.Connect(ns.natsUrl, options...)
if err != nil {
log.Error("Connect to nats fail", log.String("natsUrl", ns.natsUrl), log.ErrorAttr("err", err))
log.Error("Connect to nats fail", log.String("natsUrl", ns.natsUrl), log.ErrorField("err", err))
return err
}
@@ -77,7 +77,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if err != nil {
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err))
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
return
}
@@ -86,7 +86,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
if ns.compressBytesLen > 0 && len(bytes) >= ns.compressBytesLen {
compressBuff, err = compressor.CompressBlock(bytes)
if err != nil {
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err))
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
return
}
if len(compressBuff) < len(bytes) {
@@ -106,7 +106,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
}
if err != nil {
log.Error("WriteMsg error,Rpc return is fail", log.String("nodeId", nodeId), log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err))
log.Error("WriteMsg error,Rpc return is fail", log.String("nodeId", nodeId), log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
}
}

View File

@@ -6,7 +6,7 @@ import (
"github.com/duanhf2012/origin/v2/network"
"math"
"reflect"
"runtime"
"sync/atomic"
"time"
)
@@ -49,7 +49,7 @@ func (rc *RClient) Go(nodeId string, timeout time.Duration, rpcHandler IRpcHandl
_, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args)
if err != nil {
log.Error("Marshal is fail", log.ErrorAttr("error", err))
log.Error("Marshal is fail", log.ErrorField("error", err))
call := MakeCall()
call.DoError(err)
return call
@@ -62,8 +62,8 @@ func (rc *RClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa
return rc.selfClient.rawGo(nodeId, rc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
}
func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) {
cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable)
func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam)
if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
}
@@ -74,10 +74,7 @@ func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IR
func (rc *RClient) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()
@@ -89,7 +86,7 @@ func (rc *RClient) Run() {
for {
bytes, err := rc.conn.ReadMsg()
if err != nil {
log.Error("RClient read msg is failed", log.ErrorAttr("error", err))
log.Error("RClient read msg is failed", log.ErrorField("error", err))
return
}

View File

@@ -6,7 +6,7 @@ import (
"github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log"
"reflect"
"runtime"
"strings"
"time"
"unicode"
@@ -164,13 +164,6 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
//Extract the input parameter types
var rpcMethodInfo RpcMethodInfo
typ := method.Type
if typ.NumOut() != 1 {
return fmt.Errorf("%s The number of returned arguments must be 1", method.Name)
}
if typ.Out(0).String() != "error" {
return fmt.Errorf("%s The return parameter must be of type error", method.Name)
}
if typ.NumIn() < 2 || typ.NumIn() > 4 {
return fmt.Errorf("%s Unsupported parameter format", method.Name)
@@ -183,6 +176,18 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
rpcMethodInfo.hasResponder = true
}
if rpcMethodInfo.hasResponder && typ.NumOut() > 0 {
return fmt.Errorf("%s should not have return parameters", method.Name)
}
if !rpcMethodInfo.hasResponder && typ.NumOut() != 1 {
return fmt.Errorf("%s The number of returned arguments must be 1", method.Name)
}
if !rpcMethodInfo.hasResponder && typ.Out(0).String() != "error" {
return fmt.Errorf("%s The return parameter must be of type error", method.Name)
}
for i := parIdx; i < typ.NumIn(); i++ {
if handler.isExportedOrBuiltinType(typ.In(i)) == false {
return fmt.Errorf("%s Unsupported parameter types", method.Name)
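In other words, a plain RPC method still has to look like the sketch below (one to three input parameters and exactly one return value of type error), while a method that takes a responder must have no return values at all; the responder's parameter type is not shown in this diff, so it is left out of the sketch:

package rpcexample

type MyService struct{} // illustrative receiver; a real service embeds service.Service

type EchoReq struct{ Msg string }
type EchoRes struct{ Msg string }

// RPC_Echo shows the non-responder shape accepted by suitableMethods.
func (s *MyService) RPC_Echo(req *EchoReq, res *EchoRes) error {
    res.Msg = req.Msg
    return nil
}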
@@ -220,10 +225,7 @@ func (handler *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error {
func (handler *RpcHandler) HandlerRpcResponseCB(call *Call) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()
@@ -242,10 +244,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
rpcErr := RpcError("call error : core dumps")
if request.requestHandle != nil {
request.requestHandle(nil, rpcErr)
@@ -313,9 +312,11 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
requestHandle := request.requestHandle
returnValues := v.method.Func.Call(paramList)
errInter := returnValues[0].Interface()
if errInter != nil {
err = errInter.(error)
if len(returnValues) > 0 {
errInter := returnValues[0].Interface()
if errInter != nil {
err = errInter.(error)
}
}
if v.hasResponder == false && requestHandle != nil {
@@ -439,7 +440,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList)
if len(pClientList) == 0 {
if err != nil {
log.Error("call serviceMethod is failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err))
log.Error("call serviceMethod is failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
} else {
log.Error("cannot find serviceMethod", log.String("serviceMethod", serviceMethod))
}
@@ -468,7 +469,7 @@ func (handler *RpcHandler) callRpc(timeout time.Duration, nodeId string, service
pClientList := make([]*Client, 0, maxClusterNode)
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList)
if err != nil {
log.Error("Call serviceMethod is failed", log.ErrorAttr("error", err))
log.Error("Call serviceMethod is failed", log.ErrorField("error", err))
return err
} else if len(pClientList) <= 0 {
err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod)
@@ -532,8 +533,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration, nodeId string, se
}
//2. Invoke via rpcClient
//If calling a service on the local node
return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply, false)
return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply)
}
func (handler *RpcHandler) GetName() string {
@@ -592,7 +592,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId s
pClientList := make([]*Client, 0, 1)
err, pClientList := handler.funcRpcClient(nodeId, serviceName, false, pClientList)
if len(pClientList) == 0 || err != nil {
log.Error("call serviceMethod is failed", log.ErrorAttr("error", err))
log.Error("call serviceMethod is failed", log.ErrorField("error", err))
return err
}
if len(pClientList) > 1 {

View File

@@ -27,9 +27,6 @@ func (rn *RpcNats) NewNatsClient(targetNodeId string,localNodeId string,callSet
client.clientId = atomic.AddUint32(&clientSeq, 1)
client.targetNodeId = targetNodeId
//client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
//client.callRpcTimeout = DefaultRpcTimeout
natsClient := &rn.NatsClient
natsClient.localNodeId = localNodeId
natsClient.client = &client

View File

@@ -7,7 +7,7 @@ import (
"math"
"net"
"reflect"
"runtime"
"strings"
"time"
)
@@ -31,7 +31,7 @@ type IServer interface {
selfNodeRpcHandlerGo(timeout time.Duration, processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call
myselfRpcHandlerGo(client *Client, handlerName string, serviceMethod string, args interface{}, callBack reflect.Value, reply interface{}) error
selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error)
selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error)
}
type writeResponse func(processor IRpcProcessor, connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError)
@@ -130,7 +130,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if errM != nil {
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", errM))
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", errM))
return
}
@@ -141,7 +141,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
compressBuff, cErr = compressor.CompressBlock(bytes)
if cErr != nil {
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", cErr))
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", cErr))
return
}
if len(compressBuff) < len(bytes) {
@@ -155,17 +155,14 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
compressor.CompressBufferCollection(compressBuff)
}
if errM != nil {
log.Error("WriteMsg error,Rpc return is fail", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", errM))
log.Error("WriteMsg error,Rpc return is fail", log.String("serviceMethod", serviceMethod), log.ErrorField("error", errM))
}
}
func (agent *RpcAgent) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()
@@ -173,7 +170,7 @@ func (agent *RpcAgent) Run() {
data, err := agent.conn.ReadMsg()
if err != nil {
//will close conn
log.Error("read message is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorAttr("error", err))
log.Error("read message is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorField("error", err))
break
}
@@ -181,7 +178,7 @@ func (agent *RpcAgent) Run() {
if err != nil {
//will close conn
agent.conn.ReleaseReadMsg(data)
log.Error("processRpcRequest is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorAttr("error", err))
log.Error("processRpcRequest is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorField("error", err))
break
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/duanhf2012/origin/v2/log"
rpcHandle "github.com/duanhf2012/origin/v2/rpc"
"github.com/duanhf2012/origin/v2/util/timer"
"slices"
)
const InitModuleId = 1e9
@@ -46,7 +47,7 @@ type Module struct {
moduleName string //module name
parent IModule //parent
self IModule //self
child map[uint32]IModule //children
child []IModule //children
mapActiveTimer map[timer.ITimer]struct{}
mapActiveIdTimer map[uint64]timer.ITimer
dispatcher *timer.Dispatcher //timer
@@ -93,10 +94,7 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
pAddModule.moduleId = m.NewModuleId()
}
if m.child == nil {
m.child = map[uint32]IModule{}
}
_, ok := m.child[module.GetModuleId()]
_,ok := m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()]
if ok == true {
return 0, fmt.Errorf("exists module id %d", module.GetModuleId())
}
@@ -109,29 +107,33 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
pAddModule.eventHandler = event.NewEventHandler()
pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor())
pAddModule.IConcurrent = m.IConcurrent
m.child = append(m.child,module)
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
err := module.OnInit()
if err != nil {
delete(m.ancestor.getBaseModule().(*Module).descendants, module.GetModuleId())
m.child = m.child[:len(m.child)-1]
log.Error("module OnInit error",log.String("ModuleName",module.GetModuleName()),log.ErrorField("err",err))
return 0, err
}
m.child[module.GetModuleId()] = module
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
log.Debug("Add module " + module.GetModuleName() + " completed")
return module.GetModuleId(), nil
}
func (m *Module) ReleaseModule(moduleId uint32) {
pModule := m.GetModule(moduleId).getBaseModule().(*Module)
pModule.self.OnRelease()
log.Debug("Release module " + pModule.GetModuleName())
//Release descendants
for id := range pModule.child {
m.ReleaseModule(id)
for i:=len(pModule.child)-1; i>=0; i-- {
m.ReleaseModule(pModule.child[i].GetModuleId())
}
pModule.self.OnRelease()
pModule.GetEventHandler().Destroy()
log.Debug("Release module " + pModule.GetModuleName())
for pTimer := range pModule.mapActiveTimer {
pTimer.Cancel()
}
@@ -140,7 +142,10 @@ func (m *Module) ReleaseModule(moduleId uint32) {
t.Cancel()
}
delete(m.child, moduleId)
m.child = slices.DeleteFunc(m.child, func(module IModule) bool {
return module.GetModuleId() == moduleId
})
delete(m.ancestor.getBaseModule().(*Module).descendants, moduleId)
//Clean up the removed Module
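As a usage sketch of the slice-backed child list (the service and child module types here are illustrative; AddModule/ReleaseModule signatures are the ones shown above), children are now torn down deepest-first and in reverse insertion order:

package moduleexample

import (
    "github.com/duanhf2012/origin/v2/log"
    "github.com/duanhf2012/origin/v2/service"
)

type childModule struct {
    service.Module
    name string
}

func (c *childModule) OnInit() error { log.Info("init " + c.name); return nil }
func (c *childModule) OnRelease()    { log.Info("release " + c.name) }

type MyService struct {
    service.Service
}

func (s *MyService) OnInit() error {
    idA, err := s.AddModule(&childModule{name: "A"})
    if err != nil {
        return err
    }
    if _, err = s.AddModule(&childModule{name: "B"}); err != nil {
        return err
    }
    // Releasing A first releases any children A itself added (in reverse order), then A.
    s.ReleaseModule(idA)
    return nil
}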
@@ -281,7 +286,7 @@ func (m *Module) SafeNewTicker(tickerId *uint64, d time.Duration, AdditionData i
func (m *Module) CancelTimerId(timerId *uint64) bool {
if timerId == nil || *timerId == 0 {
log.Warning("timerId is invalid")
log.Warn("timerId is invalid")
return false
}
@@ -292,7 +297,7 @@ func (m *Module) CancelTimerId(timerId *uint64) bool {
t, ok := m.mapActiveIdTimer[*timerId]
if ok == false {
log.Stack("cannot find timer id ", log.Uint64("timerId", *timerId))
log.StackError("cannot find timer id ", log.Uint64("timerId", *timerId))
return false
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/duanhf2012/origin/v2/rpc"
"github.com/duanhf2012/origin/v2/util/timer"
"reflect"
"runtime"
"strconv"
"sync"
"sync/atomic"
@@ -193,7 +192,7 @@ func (s *Service) run() {
break
}
if s.profiler != nil {
analyzer = s.profiler.Push("[Req]" + rpcRequest.RpcRequestData.GetServiceMethod())
analyzer = s.profiler.Push("[RpcReq]" + rpcRequest.RpcRequestData.GetServiceMethod()+"."+strconv.Itoa(int(rpcRequest.RpcRequestData.GetRpcMethodId())))
}
s.GetRpcHandler().HandlerRpcRequest(rpcRequest)
@@ -261,17 +260,16 @@ func (s *Service) SetName(serviceName string) {
func (s *Service) Release() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()
if atomic.AddInt32(&s.isRelease, -1) == -1 {
s.self.OnRelease()
for i:=len(s.child)-1; i>=0; i-- {
s.ReleaseModule(s.child[i].GetModuleId())
}
}
}
func (s *Service) OnRelease() {
@@ -436,6 +434,7 @@ func (s *Service) SetEventChannelNum(num int) {
}
}
// Deprecated: replace it with the OpenConcurrent function
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
//Once started, the goroutine count may not be changed; multithreading is not allowed while the profiler is enabled
if s.startStatus == true || s.profiler != nil {

View File

@@ -23,7 +23,7 @@ func Init() {
for _,s := range setupServiceList {
err := s.OnInit()
if err != nil {
log.Error("Failed to initialize "+s.GetName()+" service",log.ErrorAttr("err",err))
log.Error("Failed to initialize "+s.GetName()+" service",log.ErrorField("err",err))
os.Exit(1)
}
}

View File

@@ -7,7 +7,6 @@ import (
"github.com/duanhf2012/origin/v2/service"
"github.com/gin-gonic/gin"
"io"
"log/slog"
"net/http"
"strings"
"time"
@@ -69,28 +68,28 @@ func (gm *GinModule) eventHandler(ev event.IEvent) {
func (gm *GinModule) Start() {
gm.srv.Addr = gm.listenAddr
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
log.Info("http start listen", log.Any("addr", gm.listenAddr))
go func() {
err := gm.srv.ListenAndServe()
if err != nil {
log.Error("ListenAndServe error", slog.Any("error", err.Error()))
log.Error("ListenAndServe error", log.Any("error", err.Error()))
}
}()
}
func (gm *GinModule) StartTLS(certFile, keyFile string) {
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
log.Info("http start listen", log.Any("addr", gm.listenAddr))
go func() {
err := gm.srv.ListenAndServeTLS(certFile, keyFile)
if err != nil {
log.Fatal("ListenAndServeTLS error", slog.Any("error", err.Error()))
log.Fatal("ListenAndServeTLS error", log.Any("error", err.Error()))
}
}()
}
func (gm *GinModule) Stop(ctx context.Context) {
if err := gm.srv.Shutdown(ctx); err != nil {
log.Error("Server Shutdown", slog.Any("error", err))
log.Error("Server Shutdown", log.Any("error", err))
}
}
@@ -210,7 +209,7 @@ func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...S
select {
case <-ctx.Done():
log.Error("GinModule process timeout", slog.Any("path", c.Request.URL.Path))
log.Error("GinModule process timeout", log.Any("path", c.Request.URL.Path))
c.AbortWithStatus(http.StatusRequestTimeout)
case <-chanWait:
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/duanhf2012/origin/v2/service"
"github.com/xtaci/kcp-go/v5"
"go.mongodb.org/mongo-driver/bson/primitive"
"runtime"
"sync"
)
@@ -167,10 +166,7 @@ func (km *KcpModule) NewAgent(conn network.Conn) network.Agent {
func (c *Client) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()
@@ -179,7 +175,7 @@ func (c *Client) Run() {
c.kcpConn.SetReadDeadline(*c.kcpModule.kcpCfg.ReadDeadlineMill)
msgBuff, err := c.kcpConn.ReadMsg()
if err != nil {
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", c.id))
log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", c.id))
break
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/duanhf2012/origin/v2/network/processor"
"github.com/duanhf2012/origin/v2/service"
"go.mongodb.org/mongo-driver/bson/primitive"
"runtime"
"sync"
"time"
)
@@ -125,10 +124,7 @@ func (slf *Client) GetId() string {
func (slf *Client) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()
@@ -137,7 +133,7 @@ func (slf *Client) Run() {
slf.tcpConn.SetReadDeadline(slf.tcpModule.tcpServer.ReadDeadline)
bytes, err := slf.tcpConn.ReadMsg()
if err != nil {
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", slf.id))
log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", slf.id))
break
}
data, err := slf.tcpModule.process.Unmarshal(slf.id, bytes)
@@ -185,7 +181,7 @@ func (tm *TcpModule) Close(clientId string) {
client.tcpConn.Close()
}
log.SWarning("close client:", clientId)
log.SWarn("close client:", clientId)
return
}

View File

@@ -14,7 +14,7 @@ import (
type WSModule struct {
service.Module
wsServer network.WSServer
WSServer network.WSServer
mapClientLocker sync.RWMutex
mapClient map[string]*WSClient
@@ -57,16 +57,16 @@ func (ws *WSModule) OnInit() error {
return fmt.Errorf("please call the Init function correctly")
}
ws.wsServer.MaxConnNum = ws.wsCfg.MaxConnNum
ws.wsServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
ws.wsServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
ws.wsServer.Addr = ws.wsCfg.ListenAddr
ws.WSServer.MaxConnNum = ws.wsCfg.MaxConnNum
ws.WSServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
ws.WSServer.Addr = ws.wsCfg.ListenAddr
//3. Set the message parsing processor
ws.process.SetByteOrder(ws.wsCfg.LittleEndian)
ws.mapClient = make(map[string]*WSClient, ws.wsServer.MaxConnNum)
ws.wsServer.NewAgent = ws.NewWSClient
ws.mapClient = make(map[string]*WSClient, ws.WSServer.MaxConnNum)
ws.WSServer.NewAgent = ws.NewWSClient
//4. Set up network event handling
ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler)
@@ -80,7 +80,7 @@ func (ws *WSModule) Init(wsCfg *WSCfg, process processor.IRawProcessor) {
}
func (ws *WSModule) Start() error {
return ws.wsServer.Start()
return ws.WSServer.Start()
}
func (ws *WSModule) wsEventHandler(ev event.IEvent) {
@@ -120,7 +120,7 @@ func (wc *WSClient) Run() {
for {
bytes, err := wc.wsConn.ReadMsg()
if err != nil {
log.Debug("read client is error", log.String("clientId", wc.id), log.ErrorAttr("err", err))
log.Debug("read client is error", log.String("clientId", wc.id), log.ErrorField("err", err))
break
}
data, err := wc.wsModule.process.Unmarshal(wc.id, bytes)
@@ -197,3 +197,7 @@ func (ws *WSModule) SendRawMsg(clientId string, msg []byte) error {
ws.mapClientLocker.Unlock()
return client.wsConn.WriteMsg(msg)
}
func (ws *WSModule) SetMessageType(messageType int) {
ws.WSServer.SetMessageType(messageType)
}
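A small usage sketch of the new setter, written as if it lived alongside WSModule in the same package; the helper name is illustrative, and 2 is the RFC 6455 binary opcode (websocket.BinaryMessage in gorilla/websocket):

// useBinaryFrames switches the module's websocket frames to binary after Init.
func useBinaryFrames(ws *WSModule) {
    ws.SetMessageType(2)
}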

View File

@@ -66,7 +66,7 @@ func (m *RedisModule) Init(redisCfg *ConfigRedis) {
}
c, err := redis.Dial("tcp", redisServer, opt...)
if err != nil {
log.Error("Connect redis fail reason:%v", err)
log.Error("Connect redis fail", log.ErrorField("err",err))
return nil, err
}
@@ -79,7 +79,7 @@ func (m *RedisModule) Init(redisCfg *ConfigRedis) {
}
_, err := c.Do("PING")
if err != nil {
log.Error("Do PING fail reason:%v", err)
log.Error("Do PING fail reason", log.ErrorField("err",err))
return err
}
return err
@@ -101,7 +101,7 @@ func (m *RedisModule) getConn() (redis.Conn, error) {
if conn.Err() != nil {
err := conn.Err()
if err != nil {
log.Error("get Conn have error,reason:%v", err)
log.Error("get Conn have error", log.ErrorField("err",err))
}
conn.Close()
return nil, err
@@ -118,7 +118,7 @@ func (m *RedisModule) TestPingRedis() error {
err = m.redisPool.TestOnBorrow(conn, time.Now())
if err != nil {
log.Error("TestOnBorrow fail,reason:%v", err)
log.Error("TestOnBorrow fail", log.ErrorField("err",err))
return err
}
@@ -171,7 +171,7 @@ func (m *RedisModule) setStringByExpire(key, value, expire interface{}) error {
}
if retErr != nil {
log.Error("setStringByExpire fail,reason:%v", retErr)
log.Error("setStringByExpire fail", log.ErrorField("err",retErr))
return retErr
}
@@ -254,7 +254,7 @@ func (m *RedisModule) setMuchStringByExpire(mapInfo map[interface{}]interface{},
}
if serr != nil {
log.Error("setMuchStringByExpire fail,reason:%v", serr)
log.Error("setMuchStringByExpire fail",log.ErrorField("err",serr))
conn.Do("DISCARD")
return serr
} else {
@@ -262,7 +262,7 @@ func (m *RedisModule) setMuchStringByExpire(mapInfo map[interface{}]interface{},
}
if err != nil {
log.Error("setMuchStringByExpire fail,reason:%v", err)
log.Error("setMuchStringByExpire fail", log.ErrorField("err",err))
}
return err
@@ -277,7 +277,7 @@ func (m *RedisModule) GetString(key interface{}) (string, error) {
ret, err := conn.Do("GET", key)
if err != nil {
log.Error("GetString fail,reason:%v", err)
log.Error("GetString fail", log.ErrorField("err",err))
return "", err
}
@@ -298,7 +298,7 @@ func (m *RedisModule) GetStringJSON(key string, st interface{}) error {
ret, err := conn.Do("GET", key)
if err != nil {
log.Error("GetStringJSON fail,reason:%v", err)
log.Error("GetStringJSON fail", log.ErrorField("err",err))
return err
}
@@ -315,7 +315,7 @@ func (m *RedisModule) GetStringJSON(key string, st interface{}) error {
}
if err = json.Unmarshal(str, st); err != nil {
log.Error("GetStringJSON fail json.Unmarshal is error:%s,%s,reason:%v", key, string(str), err)
log.Errorf("GetStringJSON fail json.Unmarshal is error:%s,%s,reason:%v", key, string(str), err)
return err
}
@@ -336,13 +336,13 @@ func (m *RedisModule) GetStringMap(keys []string) (retMap map[string]string, err
// Start sending the commands
err = conn.Send("MULTI")
if err != nil {
log.Error("GetMuchString fail %v", err)
log.Errorf("GetMuchString fail %v", err)
return nil, err
}
for _, val := range keys {
err = conn.Send("GET", val)
if err != nil {
log.Error("GetMuchString fail,reason:%v", err)
log.Errorf("GetMuchString fail,reason:%v", err)
conn.Do("DISCARD")
return nil, err
}
@@ -351,7 +351,7 @@ func (m *RedisModule) GetStringMap(keys []string) (retMap map[string]string, err
// Execute the queued commands
ret, err := conn.Do("EXEC")
if err != nil {
log.Error("GetMuchString fail %v", err)
log.Errorf("GetMuchString fail %v", err)
return
}
@@ -383,7 +383,7 @@ func (m *RedisModule) ExistsKey(key interface{}) (bool, error) {
ret, err := conn.Do("EXISTS", key)
if err != nil {
log.Error("ExistsKey fail, reason:%v", err)
log.Errorf("ExistsKey fail, reason:%v", err)
return false, err
}
retValue, ok := ret.(int64)
@@ -404,7 +404,7 @@ func (m *RedisModule) DelString(key interface{}) error {
ret, err := conn.Do("DEL", key)
if err != nil {
log.Error("DelString fail, reason:%v", err)
log.Errorf("DelString fail, reason:%v", err)
return err
}
@@ -439,7 +439,7 @@ func (m *RedisModule) DelStringKeyList(keys []interface{}) (map[interface{}]bool
for _, val := range keys {
err = conn.Send("DEL", val)
if err != nil {
log.Error("DelMuchString fail,reason:%v", err)
log.Errorf("DelMuchString fail,reason:%v", err)
conn.Do("DISCARD")
return nil, err
}
@@ -448,7 +448,7 @@ func (m *RedisModule) DelStringKeyList(keys []interface{}) (map[interface{}]bool
ret, err := conn.Do("EXEC")
if err != nil {
log.Error("DelMuchString fail,reason:%v", err)
log.Errorf("DelMuchString fail,reason:%v", err)
return nil, err
}
@@ -484,7 +484,7 @@ func (m *RedisModule) SetHash(redisKey, hashKey, value interface{}) error {
_, retErr := conn.Do("HSET", redisKey, hashKey, value)
if retErr != nil {
log.Error("SetHash fail,reason:%v", retErr)
log.Errorf("SetHash fail,reason:%v", retErr)
}
return retErr
@@ -502,7 +502,7 @@ func (m *RedisModule) GetAllHashJSON(redisKey string) (map[string]string, error)
value, err := conn.Do("HGETALL", redisKey)
if err != nil {
log.Error("GetAllHashJSON fail,reason:%v", err)
log.Errorf("GetAllHashJSON fail,reason:%v", err)
return nil, err
}
@@ -522,7 +522,7 @@ func (m *RedisModule) GetHash(redisKey interface{}, fieldKey interface{}) (strin
value, err := conn.Do("HGET", redisKey, fieldKey)
if err != nil {
log.Error("GetHashValueByKey fail,reason:%v", err)
log.Errorf("GetHashValueByKey fail,reason:%v", err)
return "", err
}
if value == nil {
@@ -545,7 +545,7 @@ func (m *RedisModule) GetMuchHash(args ...interface{}) ([]string, error) {
value, err := conn.Do("HMGET", args...)
if err != nil {
log.Error("GetHashValueByKey fail,reason:%v", err)
log.Errorf("GetHashValueByKey fail,reason:%v", err)
return nil, err
}
if value == nil {
@@ -582,7 +582,7 @@ func (m *RedisModule) ScanMatchKeys(cursorValue int, redisKey string, count int)
value, err := conn.Do("SCAN", cursorValue, "match", redisKey, "count", count)
if err != nil {
log.Error("GetHashValueByKey fail,reason:%v", err)
log.Errorf("GetHashValueByKey fail,reason:%v", err)
return nextCursorValue, nil, err
}
if value == nil {
@@ -618,7 +618,7 @@ func (m *RedisModule) SetHashMapJSON(redisKey string, mapFieldValue map[interfac
if err == nil {
_, err = conn.Do("HSET", redisKey, symbol, temp)
if err != nil {
log.Error("SetMuchHashJSON fail,reason:%v", err)
log.Errorf("SetMuchHashJSON fail,reason:%v", err)
conn.Send("DISCARD")
return err
}
@@ -627,7 +627,7 @@ func (m *RedisModule) SetHashMapJSON(redisKey string, mapFieldValue map[interfac
// Execute the queued commands
_, err = conn.Do("EXEC")
if err != nil {
log.Error("SetMuchHashJSON fail,reason:%v", err)
log.Errorf("SetMuchHashJSON fail,reason:%v", err)
conn.Send("DISCARD")
}
return err
@@ -642,7 +642,7 @@ func (m *RedisModule) DelHash(args ...interface{}) error {
_, retErr := conn.Do("HDEL", args...)
if retErr != nil {
log.Error("DelMuchHash fail,reason:%v", retErr)
log.Errorf("DelMuchHash fail,reason:%v", retErr)
}
return retErr
}
@@ -678,7 +678,7 @@ func (m *RedisModule) setListPush(setType string, args ...interface{}) error {
_, retErr := conn.Do(setType, args...)
if retErr != nil {
log.Error("setList fail,reason:%v", retErr)
log.Errorf("setList fail,reason:%v", retErr)
}
return retErr
}
@@ -705,7 +705,7 @@ func (m *RedisModule) LRangeList(key string, start, end int) ([]string, error) {
reply, err := conn.Do("lrange", key, start, end)
if err != nil {
log.Error("SetListJSONRpush fail,reason:%v", err)
log.Errorf("SetListJSONRpush fail,reason:%v", err)
return nil, err
}
@@ -722,7 +722,7 @@ func (m *RedisModule) GetListLen(key string) (int, error) {
reply, err := conn.Do("LLEN", key)
if err != nil {
log.Error("GetListLen fail,reason:%v", err)
log.Errorf("GetListLen fail,reason:%v", err)
return -1, err
}
return redis.Int(reply, err)
@@ -748,7 +748,7 @@ func (m *RedisModule) LTrimList(key string, start, end int) error {
_, err = conn.Do("LTRIM", key, start, end)
if err != nil {
log.Error("LtrimListValue fail,reason:%v", err)
log.Errorf("LtrimListValue fail,reason:%v", err)
return err
}
return nil
@@ -849,7 +849,7 @@ func (m *RedisModule) ZADDInsertJson(key string, score float64, value interface{
}
_, err = conn.Do("ZADD", key, score, JsonValue)
if err != nil {
log.Error("ZADDInsertJson fail,reason:%v", err)
log.Errorf("ZADDInsertJson fail,reason:%v", err)
return err
}
return nil
@@ -865,7 +865,7 @@ func (m *RedisModule) ZADDInsert(key string, score float64, Data interface{}) er
_, err = conn.Do("ZADD", key, score, Data)
if err != nil {
log.Error("ZADDInsert fail,reason:%v", err)
log.Errorf("ZADDInsert fail,reason:%v", err)
return err
}
return nil
@@ -1088,7 +1088,7 @@ func (m *RedisModule) HincrbyHashInt(redisKey, hashKey string, value int) error
_, retErr := conn.Do("HINCRBY", redisKey, hashKey, value)
if retErr != nil {
log.Error("HincrbyHashInt fail,reason:%v", retErr)
log.Errorf("HincrbyHashInt fail,reason:%v", retErr)
}
return retErr
@@ -1103,7 +1103,7 @@ func (m *RedisModule) EXPlREInsert(key string, TTl int) error {
_, err = conn.Do("expire", key, TTl)
if err != nil {
log.Error("expire fail,reason:%v", err)
log.Errorf("expire fail,reason:%v", err)
return err
}
return nil
@@ -1129,7 +1129,7 @@ func (m *RedisModule) Keys(key string) ([]string, error) {
ret, err := conn.Do("KEYS", key)
if err != nil {
log.Error("KEYS fail, reason:%v", err)
log.Errorf("KEYS fail, reason:%v", err)
return nil, err
}
retList, ok := ret.([]interface{})
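For reference, the two logging call styles this file now mixes, side by side; the message text and error value are illustrative:

package logexample

import (
    "errors"

    "github.com/duanhf2012/origin/v2/log"
)

func logBothStyles() {
    err := errors.New("connection refused")
    log.Error("Connect redis fail", log.ErrorField("err", err)) // structured field form
    log.Errorf("GetString fail,reason:%v", err)                 // printf-style form
}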

View File

@@ -9,7 +9,6 @@ import (
"github.com/duanhf2012/origin/v2/service"
"github.com/duanhf2012/origin/v2/util/bytespool"
"github.com/google/uuid"
"runtime"
"strings"
"sync"
"time"
@@ -140,10 +139,7 @@ func (slf *Client) GetId() string {
func (slf *Client) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()
@@ -156,7 +152,7 @@ func (slf *Client) Run() {
slf.tcpConn.SetReadDeadline(slf.tcpService.tcpServer.ReadDeadline)
bytes, err := slf.tcpConn.ReadMsg()
if err != nil {
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", slf.id))
log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", slf.id))
break
}
data, err := slf.tcpService.process.Unmarshal(slf.id, bytes)

View File

@@ -129,7 +129,7 @@ func (slf *WSClient) Run() {
for {
bytes, err := slf.wsConn.ReadMsg()
if err != nil {
log.Debug("read client id %s is error:%+v", slf.id, err)
log.Debugf("read client id %s is error:%+v", slf.id, err)
break
}
data, err := slf.wsService.process.Unmarshal(slf.id, bytes)

View File

@@ -32,10 +32,10 @@ func Abs[NumType typ.Signed | typ.Float](Num NumType) NumType {
func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
ret := number1 + number2
if number2 > 0 && ret < number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false
} else if number2 < 0 && ret > number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false
}
@@ -45,10 +45,10 @@ func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
func SubSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
ret := number1 - number2
if number2 > 0 && ret > number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false
} else if number2 < 0 && ret < number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false
}
@@ -65,7 +65,7 @@ func MulSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
return ret, true
}
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, true
}
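A tiny worked example against the overflow-checked helpers above, written as if in the same package; int8 is used so the overflow is easy to see (120 + 10 = 130 does not fit, so the result wraps and ok is false):

func addExample() {
    sum, ok := AddSafe[int8](120, 10)
    if !ok {
        // the helper has already logged "Calculation overflow" with a stack trace
        _ = sum // wrapped result; the caller decides how to handle it
    }
}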

View File

@@ -124,10 +124,7 @@ func (t *Timer) IsOpen() bool {
func (t *Timer) Do() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()
@@ -210,10 +207,7 @@ func (c *Cron) Reset() {
func (c *Cron) Do() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()
@@ -266,10 +260,7 @@ func (c *Cron) UnRef() {
func (c *Ticker) Do() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(fmt.Sprint(r))
}
}()