Compare commits

...

20 Commits

Author SHA1 Message Date
boyce
4d36e525a5 优化配置读取,去消默认cluster目录 2025-01-16 13:45:06 +08:00
duanhf2012
3a4350769c 新增etcd认证配置 2025-01-08 18:11:20 +08:00
duanhf2012
d4966ea129 优化ws模块 2024-12-17 14:46:00 +08:00
duanhf2012
3b10eeb792 优化日志 2024-12-16 18:00:26 +08:00
duanhf2012
e3275e9f2a 优化模块释放顺序 2024-12-11 18:31:37 +08:00
duanhf2012
16745b34f0 优化日志 2024-12-11 17:49:59 +08:00
duanhf2012
f34dc7d53f 优化日志自定义Writer 2024-12-11 17:24:06 +08:00
duanhf2012
0a09dc2fee 优化日志 2024-12-11 17:14:29 +08:00
duanhf2012
f01a93c446 优化日志 2024-12-11 17:03:21 +08:00
duanhf2012
4d2ab4ee4f 优化代码 2024-12-11 16:44:09 +08:00
duanhf2012
ffcc5a3489 1.优化服务配置检查
2.废弃SetGoRoutineNum接口
3.释放Module优化
2024-12-06 16:05:25 +08:00
duanhf2012
cf6ca0483b Merge branch 'v2' of https://github.com/duanhf2012/origin into v2 2024-12-05 10:19:24 +08:00
duanhf2012
97a21e6f71 新增Skip接口 2024-12-05 10:19:15 +08:00
duanhf2012
f60a55d03a 优化异步RPC,去掉error返回值 2024-12-04 18:33:53 +08:00
duanhf2012
2c32d6eec9 RPC与日志优化 2024-12-03 17:21:21 +08:00
duanhf2012
da45f97fa8 优化日志 2024-11-29 15:55:35 +08:00
duanhf2012
d29abc0813 新增配置文件环境变量支持 2024-11-29 14:39:04 +08:00
duanhf2012
c9507f9ee9 替换slog日志为zap 2024-11-29 13:47:51 +08:00
duanhf2012
61de4bba3a 替换slog日志为zap 2024-11-29 13:47:27 +08:00
duanhf2012
000853b479 网络库优化 2024-11-25 17:44:08 +08:00
42 changed files with 637 additions and 1069 deletions

View File

@@ -250,7 +250,7 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er
//2.安装服务发现结点 //2.安装服务发现结点
err = cls.setupDiscovery(localNodeId, setupServiceFun) err = cls.setupDiscovery(localNodeId, setupServiceFun)
if err != nil { if err != nil {
log.Error("setupDiscovery fail", log.ErrorAttr("err", err)) log.Error("setupDiscovery fail", log.ErrorField("err", err))
return err return err
} }
service.RegRpcEventFun = cls.RegRpcEvent service.RegRpcEventFun = cls.RegRpcEvent

View File

@@ -10,15 +10,15 @@ import (
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"time" "time"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"go.uber.org/zap"
"path" "path"
"runtime"
"strings" "strings"
"sync/atomic" "sync/atomic"
"io/ioutil"
"crypto/x509"
"crypto/tls"
) )
const originDir = "/origin" const originDir = "/origin"
@@ -42,11 +42,16 @@ type EtcdDiscoveryService struct {
mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId] mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId]
} }
var etcdDiscovery *EtcdDiscoveryService
func getEtcdDiscovery() IServiceDiscovery { func getEtcdDiscovery() IServiceDiscovery {
etcdDiscovery := &EtcdDiscoveryService{} if etcdDiscovery == nil {
etcdDiscovery = &EtcdDiscoveryService{}
}
return etcdDiscovery return etcdDiscovery
} }
func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error { func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
ed.localNodeId = localNodeId ed.localNodeId = localNodeId
@@ -89,21 +94,50 @@ func (ed *EtcdDiscoveryService) OnInit() error {
} }
for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ { for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
client, cerr := clientv3.New(clientv3.Config{ var client *clientv3.Client
var tlsConfig *tls.Config
if etcdDiscoveryCfg.EtcdList[i].Cert != "" {
// load cert
cert, cerr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
if cerr != nil {
log.Error("load cert error", log.ErrorField("err", cerr))
return cerr
}
// load root ca
caData, cerr := ioutil.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
if cerr != nil {
log.Error("load root ca error", log.ErrorField("err", cerr))
return cerr
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caData)
tlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: pool,
}
}
client, err = clientv3.New(clientv3.Config{
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints, Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond, DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
Logger: zap.NewNop(), Username: etcdDiscoveryCfg.EtcdList[i].UserName,
Password: etcdDiscoveryCfg.EtcdList[i].Password,
Logger: log.GetLogger().Logger,
TLS: tlsConfig,
}) })
if cerr != nil {
log.Error("etcd discovery init fail", log.ErrorAttr("err", cerr)) if err != nil {
return cerr log.Error("etcd discovery init fail", log.ErrorField("err", err))
return err
} }
ctx, _ := context.WithTimeout(context.Background(), time.Second*3) ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
_, err = client.Leases(ctx) _, err = client.Leases(ctx)
if err != nil { if err != nil {
log.Error("etcd discovery init fail", log.Any("endpoint", etcdDiscoveryCfg.EtcdList[i].Endpoints), log.ErrorAttr("err", err)) log.Error("etcd discovery init fail", log.Any("endpoint", etcdDiscoveryCfg.EtcdList[i].Endpoints), log.ErrorField("err", err))
return err return err
} }
@@ -128,7 +162,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
var resp *clientv3.LeaseGrantResponse var resp *clientv3.LeaseGrantResponse
resp, err = client.Grant(context.Background(), cluster.GetEtcdDiscovery().TTLSecond) resp, err = client.Grant(context.Background(), cluster.GetEtcdDiscovery().TTLSecond)
if err != nil { if err != nil {
log.Error("etcd registerService fail", log.ErrorAttr("err", err)) log.Error("etcd registerService fail", log.ErrorField("err", err))
ed.tryRegisterService(client, etcdClient) ed.tryRegisterService(client, etcdClient)
return return
} }
@@ -138,7 +172,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
// 注册服务节点到 etcd // 注册服务节点到 etcd
_, err = client.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID)) _, err = client.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID))
if err != nil { if err != nil {
log.Error("etcd Put fail", log.ErrorAttr("err", err)) log.Error("etcd Put fail", log.ErrorField("err", err))
ed.tryRegisterService(client, etcdClient) ed.tryRegisterService(client, etcdClient)
return return
} }
@@ -146,7 +180,7 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID) etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID)
if err != nil { if err != nil {
log.Error("etcd KeepAlive fail", log.ErrorAttr("err", err)) log.Error("etcd KeepAlive fail", log.ErrorField("err", err))
ed.tryRegisterService(client, etcdClient) ed.tryRegisterService(client, etcdClient)
return return
} }
@@ -200,7 +234,7 @@ func (ed *EtcdDiscoveryService) retire() error {
// 注册服务节点到 etcd // 注册服务节点到 etcd
_, err := c.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID)) _, err := c.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID))
if err != nil { if err != nil {
log.Error("etcd Put fail", log.ErrorAttr("err", err)) log.Error("etcd Put fail", log.ErrorField("err", err))
return err return err
} }
} }
@@ -285,12 +319,12 @@ func (ed *EtcdDiscoveryService) setNodeInfo(networkName string, nodeInfo *rpc.No
func (ed *EtcdDiscoveryService) close() { func (ed *EtcdDiscoveryService) close() {
for c, ec := range ed.mapClient { for c, ec := range ed.mapClient {
if _, err := c.Revoke(context.Background(), ec.leaseID); err != nil { if _, err := c.Revoke(context.Background(), ec.leaseID); err != nil {
log.Error("etcd Revoke fail", log.ErrorAttr("err", err)) log.Error("etcd Revoke fail", log.ErrorField("err", err))
} }
c.Watcher.Close() c.Watcher.Close()
err := c.Close() err := c.Close()
if err != nil { if err != nil {
log.Error("etcd Close fail", log.ErrorAttr("err", err)) log.Error("etcd Close fail", log.ErrorField("err", err))
} }
} }
} }
@@ -299,7 +333,7 @@ func (ed *EtcdDiscoveryService) getServices(client *clientv3.Client, etcdClient
// 根据前缀获取现有的key // 根据前缀获取现有的key
resp, err := client.Get(context.Background(), watchKey, clientv3.WithPrefix()) resp, err := client.Get(context.Background(), watchKey, clientv3.WithPrefix())
if err != nil { if err != nil {
log.Error("etcd Get fail", log.ErrorAttr("err", err)) log.Error("etcd Get fail", log.ErrorField("err", err))
ed.tryWatch(client, etcdClient) ed.tryWatch(client, etcdClient)
return false return false
} }
@@ -322,11 +356,7 @@ func (ed *EtcdDiscoveryService) watchByClient(client *clientv3.Client, etcdClien
func (ed *EtcdDiscoveryService) watcher(client *clientv3.Client, etcdClient *etcdClientInfo, watchKey string) { func (ed *EtcdDiscoveryService) watcher(client *clientv3.Client, etcdClient *etcdClientInfo, watchKey string) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
ed.tryWatch(client, etcdClient) ed.tryWatch(client, etcdClient)
} }
}() }()
@@ -355,7 +385,7 @@ func (ed *EtcdDiscoveryService) setNode(netWorkName string, byteNode []byte) str
var nodeInfo rpc.NodeInfo var nodeInfo rpc.NodeInfo
err := proto.Unmarshal(byteNode, &nodeInfo) err := proto.Unmarshal(byteNode, &nodeInfo)
if err != nil { if err != nil {
log.Error("Unmarshal fail", log.String("netWorkName", netWorkName), log.ErrorAttr("err", err)) log.Error("Unmarshal fail", log.String("netWorkName", netWorkName), log.ErrorField("err", err))
return "" return ""
} }
@@ -494,7 +524,7 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc
ctx, _ := context.WithTimeout(context.Background(), time.Second*3) ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
lg, err = client.Grant(ctx, etcdServiceRecord.TTLSecond) lg, err = client.Grant(ctx, etcdServiceRecord.TTLSecond)
if err != nil { if err != nil {
log.Error("etcd record fail,cannot grant lease", log.ErrorAttr("err", err)) log.Error("etcd record fail,cannot grant lease", log.ErrorField("err", err))
return errors.New("cannot grant lease") return errors.New("cannot grant lease")
} }
} }
@@ -503,14 +533,14 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc
ctx, _ := context.WithTimeout(context.Background(), time.Second*3) ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
_, err = client.Put(ctx, path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo, clientv3.WithLease(lg.ID)) _, err = client.Put(ctx, path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo, clientv3.WithLease(lg.ID))
if err != nil { if err != nil {
log.Error("etcd record fail,cannot put record", log.ErrorAttr("err", err)) log.Error("etcd record fail,cannot put record", log.ErrorField("err", err))
} }
return errors.New("cannot put record") return errors.New("cannot put record")
} }
_, err = client.Put(context.Background(), path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo) _, err = client.Put(context.Background(), path.Join(originDir, etcdServiceRecord.RecordKey), etcdServiceRecord.RecordInfo)
if err != nil { if err != nil {
log.Error("etcd record fail,cannot put record", log.ErrorAttr("err", err)) log.Error("etcd record fail,cannot put record", log.ErrorField("err", err))
return errors.New("cannot put record") return errors.New("cannot put record")
} }

View File

@@ -471,7 +471,7 @@ func (dc *OriginDiscoveryClient) OnRelease() {
err := dc.CallNodeWithTimeout(3*time.Second, masterNodeList.MasterNodeList[i].NodeId, UnRegServiceDiscover, &nodeRetireReq, &rpc.Empty{}) err := dc.CallNodeWithTimeout(3*time.Second, masterNodeList.MasterNodeList[i].NodeId, UnRegServiceDiscover, &nodeRetireReq, &rpc.Empty{})
if err != nil { if err != nil {
log.Error("call "+UnRegServiceDiscover+" is fail", log.ErrorAttr("err", err)) log.Error("call "+UnRegServiceDiscover+" is fail", log.ErrorField("err", err))
} }
} }
} }
@@ -493,7 +493,7 @@ func (dc *OriginDiscoveryClient) OnRetire() {
err := dc.GoNode(masterNodeList.MasterNodeList[i].NodeId, NodeRetireRpcMethod, &nodeRetireReq) err := dc.GoNode(masterNodeList.MasterNodeList[i].NodeId, NodeRetireRpcMethod, &nodeRetireReq)
if err != nil { if err != nil {
log.Error("call "+NodeRetireRpcMethod+" is fail", log.ErrorAttr("err", err)) log.Error("call "+NodeRetireRpcMethod+" is fail", log.ErrorField("err", err))
} }
} }
} }

View File

@@ -7,6 +7,7 @@ import (
"github.com/duanhf2012/origin/v2/rpc" "github.com/duanhf2012/origin/v2/rpc"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"io/fs"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@@ -15,9 +16,15 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary var json = jsoniter.ConfigCompatibleWithStandardLibrary
type EtcdList struct { type EtcdList struct {
NetworkName []string NetworkName []string
Endpoints []string Endpoints []string
UserName string
Password string
Cert string
CertKey string
Ca string
} }
type EtcdDiscovery struct { type EtcdDiscovery struct {
@@ -64,12 +71,8 @@ type NodeInfoList struct {
NodeList []NodeInfo NodeList []NodeInfo
} }
func validConfigFile(f os.DirEntry) bool { func validConfigFile(f string) bool {
if f.IsDir() == true || (filepath.Ext(f.Name()) != ".json" && filepath.Ext(f.Name()) != ".yml" && filepath.Ext(f.Name()) != ".yaml") { return strings.HasSuffix(f, ".json")|| strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml")
return false
}
return true
} }
func yamlToJson(data []byte, v interface{}) ([]byte, error) { func yamlToJson(data []byte, v interface{}) ([]byte, error) {
@@ -88,15 +91,16 @@ func yamlToJson(data []byte, v interface{}) ([]byte, error) {
} }
func unmarshalConfig(data []byte, v interface{}) error { func unmarshalConfig(data []byte, v interface{}) error {
if !json.Valid(data) { envData := []byte(os.ExpandEnv(string(data)))
if !json.Valid(envData) {
var err error var err error
data, err = yamlToJson(data, v) envData, err = yamlToJson(envData, v)
if err != nil { if err != nil {
return err return err
} }
} }
return json.Unmarshal(data, v) return json.Unmarshal(envData, v)
} }
func (d *DiscoveryInfo) getDiscoveryType() DiscoveryType { func (d *DiscoveryInfo) getDiscoveryType() DiscoveryType {
@@ -270,32 +274,33 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
var discoveryInfo DiscoveryInfo var discoveryInfo DiscoveryInfo
var rpcMode RpcMode var rpcMode RpcMode
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
fileInfoList, err := os.ReadDir(clusterCfgPath)
if err != nil {
return discoveryInfo, nil, rpcMode, fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
}
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
for _, f := range fileInfoList { err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error {
if !validConfigFile(f) { if info.IsDir() {
continue return nil
} }
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name() if err != nil {
fileNodeInfoList, rErr := cls.ReadClusterConfig(filePath) return err
}
if !validConfigFile(info.Name()) {
return nil
}
fileNodeInfoList, rErr := cls.ReadClusterConfig(path)
if rErr != nil { if rErr != nil {
return discoveryInfo, nil, rpcMode, fmt.Errorf("read file path %s is error:%+v", filePath, rErr) return fmt.Errorf("read file path %s is error:%+v", path, rErr)
} }
err = cls.SetRpcMode(&fileNodeInfoList.RpcMode, &rpcMode) err = cls.SetRpcMode(&fileNodeInfoList.RpcMode, &rpcMode)
if err != nil { if err != nil {
return discoveryInfo, nil, rpcMode, err return err
} }
err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery) err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery)
if err != nil { if err != nil {
return discoveryInfo, nil, rpcMode, err return err
} }
for _, nodeInfo := range fileNodeInfoList.NodeList { for _, nodeInfo := range fileNodeInfoList.NodeList {
@@ -303,6 +308,12 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
nodeInfoList = append(nodeInfoList, nodeInfo) nodeInfoList = append(nodeInfoList, nodeInfo)
} }
} }
return nil
})
if err != nil {
return discoveryInfo, nil, rpcMode, err
} }
if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) { if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
@@ -324,32 +335,32 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
} }
func (cls *Cluster) readLocalService(localNodeId string) error { func (cls *Cluster) readLocalService(localNodeId string) error {
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
fileInfoList, err := os.ReadDir(clusterCfgPath)
if err != nil {
return fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
}
var globalCfg interface{} var globalCfg interface{}
publicService := map[string]interface{}{} publicService := map[string]interface{}{}
nodeService := map[string]interface{}{} nodeService := map[string]interface{}{}
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
for _, f := range fileInfoList { err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error{
if !validConfigFile(f) { if info.IsDir() {
continue return nil
} }
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name()
currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(filePath)
if err != nil { if err != nil {
continue return err
}
if !validConfigFile(info.Name()) {
return nil
}
currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(path)
if err != nil {
return err
} }
if currGlobalCfg != nil { if currGlobalCfg != nil {
//不允许重复的配置global配置 //不允许重复的配置global配置
if globalCfg != nil { if globalCfg != nil {
return fmt.Errorf("[Global] does not allow repeated configuration in %s", f.Name()) return fmt.Errorf("[Global] does not allow repeated configuration in %s", info.Name())
} }
globalCfg = currGlobalCfg globalCfg = currGlobalCfg
} }
@@ -365,7 +376,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
pubCfg, ok := serviceConfig[s] pubCfg, ok := serviceConfig[s]
if ok == true { if ok == true {
if _, publicOk := publicService[s]; publicOk == true { if _, publicOk := publicService[s]; publicOk == true {
return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, f.Name()) return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, info.Name())
} }
publicService[s] = pubCfg publicService[s] = pubCfg
} }
@@ -381,12 +392,17 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
} }
if _, nodeOK := nodeService[s]; nodeOK == true { if _, nodeOK := nodeService[s]; nodeOK == true {
return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, f.Name()) return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, info.Name())
} }
nodeService[s] = nodeCfg nodeService[s] = nodeCfg
break break
} }
} }
return nil
})
if err != nil {
return err
} }
//组合所有的配置 //组合所有的配置
@@ -416,13 +432,12 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
return nil return nil
} }
func (cls *Cluster) parseLocalCfg() { func (cls *Cluster) parseLocalCfg() error{
rpcInfo := NodeRpcInfo{} rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = cls.localNodeInfo rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet) rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet)
cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo
for _, serviceName := range cls.localNodeInfo.ServiceList { for _, serviceName := range cls.localNodeInfo.ServiceList {
splitServiceName := strings.Split(serviceName, ":") splitServiceName := strings.Split(serviceName, ":")
if len(splitServiceName) == 2 { if len(splitServiceName) == 2 {
@@ -439,8 +454,13 @@ func (cls *Cluster) parseLocalCfg() {
cls.mapServiceNode[serviceName] = make(map[string]struct{}) cls.mapServiceNode[serviceName] = make(map[string]struct{})
} }
if _,ok:=cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId];ok {
return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId)
}
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{} cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
} }
return nil
} }
func (cls *Cluster) IsNatsMode() bool { func (cls *Cluster) IsNatsMode() bool {
@@ -473,8 +493,7 @@ func (cls *Cluster) InitCfg(localNodeId string) error {
} }
//本地配置服务加到全局map信息中 //本地配置服务加到全局map信息中
cls.parseLocalCfg() return cls.parseLocalCfg()
return nil
} }
func (cls *Cluster) IsConfigService(serviceName string) bool { func (cls *Cluster) IsConfigService(serviceName string) bool {

View File

@@ -58,7 +58,7 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err e
} }
if fn == nil && cb == nil { if fn == nil && cb == nil {
log.Stack("fn and cb is nil") log.StackError("fn and cb is nil")
return return
} }

View File

@@ -6,7 +6,7 @@ import (
"time" "time"
"fmt" "fmt"
"runtime"
"context" "context"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
@@ -192,10 +192,7 @@ breakFor:
func (d *dispatch) DoCallback(cb func(err error)) { func (d *dispatch) DoCallback(cb func(err error)) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()

View File

@@ -5,7 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"runtime"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
) )
@@ -51,15 +51,13 @@ func (w *worker) run(waitGroup *sync.WaitGroup, t task) {
func (w *worker) exec(t *task) { func (w *worker) exec(t *task) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
cb := t.cb cb := t.cb
t.cb = func(err error) { t.cb = func(err error) {
cb(errors.New(errString)) cb(errors.New(errString))
} }
log.Dump(string(buf[:l]), log.String("error", errString)) log.StackError(errString)
w.endCallFun(true, t) w.endCallFun(true, t)
} }
}() }()

View File

@@ -3,7 +3,6 @@ package event
import ( import (
"fmt" "fmt"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"runtime"
"sync" "sync"
) )
@@ -215,10 +214,7 @@ func (handler *EventHandler) Destroy() {
func (processor *EventProcessor) EventHandler(ev IEvent) { func (processor *EventProcessor) EventHandler(ev IEvent) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()

1
go.mod
View File

@@ -21,6 +21,7 @@ require (
go.mongodb.org/mongo-driver v1.9.1 go.mongodb.org/mongo-driver v1.9.1
go.uber.org/zap v1.27.0 go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.34.1 google.golang.org/protobuf v1.34.1
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )

2
go.sum
View File

@@ -330,6 +330,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

View File

@@ -1,93 +0,0 @@
package log
import (
"strconv"
)
const _size = 9216
type Buffer struct {
bs []byte
//mu sync.Mutex // ensures atomic writes; protects the following fields
}
func (buff *Buffer) Init() {
buff.bs = make([]byte, _size)
}
// AppendByte writes a single byte to the Buffer.
func (buff *Buffer) AppendByte(v byte) {
buff.bs = append(buff.bs, v)
}
func (buff *Buffer) AppendBytes(v []byte) {
buff.bs = append(buff.bs, v...)
}
// AppendString writes a string to the Buffer.
func (buff *Buffer) AppendString(s string) {
buff.bs = append(buff.bs, s...)
}
// AppendInt appends an integer to the underlying buffer (assuming base 10).
func (buff *Buffer) AppendInt(i int64) {
buff.bs = strconv.AppendInt(buff.bs, i, 10)
}
// AppendUint appends an unsigned integer to the underlying buffer (assuming
// base 10).
func (buff *Buffer) AppendUint(i uint64) {
buff.bs = strconv.AppendUint(buff.bs, i, 10)
}
// AppendBool appends a bool to the underlying buffer.
func (buff *Buffer) AppendBool(v bool) {
buff.bs = strconv.AppendBool(buff.bs, v)
}
// AppendFloat appends a float to the underlying buffer. It doesn't quote NaN
// or +/- Inf.
func (buff *Buffer) AppendFloat(f float64, bitSize int) {
buff.bs = strconv.AppendFloat(buff.bs, f, 'f', -1, bitSize)
}
// Len returns the length of the underlying byte slice.
func (buff *Buffer) Len() int {
return len(buff.bs)
}
// Cap returns the capacity of the underlying byte slice.
func (buff *Buffer) Cap() int {
return cap(buff.bs)
}
// Bytes returns a mutable reference to the underlying byte slice.
func (buff *Buffer) Bytes() []byte {
return buff.bs
}
// String returns a string copy of the underlying byte slice.
func (buff *Buffer) String() string {
return string(buff.bs)
}
// Reset resets the underlying byte slice. Subsequent writes re-use the slice's
// backing array.
func (buff *Buffer) Reset() {
buff.bs = buff.bs[:0]
}
// Write implements io.Writer.
func (buff *Buffer) Write(bs []byte) (int, error) {
buff.bs = append(buff.bs, bs...)
return len(bs), nil
}
// TrimNewline trims any final "\n" byte from the end of the buffer.
func (buff *Buffer) TrimNewline() {
if i := len(buff.bs) - 1; i >= 0 {
if buff.bs[i] == '\n' {
buff.bs = buff.bs[:i]
}
}
}

View File

@@ -1,161 +0,0 @@
package log
import (
"context"
"io"
"log/slog"
"path/filepath"
"runtime"
"runtime/debug"
"sync"
)
const defaultSkip = 7
type IOriginHandler interface {
slog.Handler
Lock()
UnLock()
SetSkip(skip int)
GetSkip() int
}
type BaseHandler struct {
addSource bool
w io.Writer
locker sync.Mutex
skip int
}
type OriginTextHandler struct {
BaseHandler
*slog.TextHandler
}
type OriginJsonHandler struct {
BaseHandler
*slog.JSONHandler
}
func (bh *BaseHandler) SetSkip(skip int) {
bh.skip = skip
}
func (bh *BaseHandler) GetSkip() int {
return bh.skip
}
func getStrLevel(level slog.Level) string {
switch level {
case LevelTrace:
return "Trace"
case LevelDebug:
return "Debug"
case LevelInfo:
return "Info"
case LevelWarning:
return "Warning"
case LevelError:
return "Error"
case LevelStack:
return "Stack"
case LevelDump:
return "Dump"
case LevelFatal:
return "Fatal"
}
return ""
}
func defaultReplaceAttr(groups []string, a slog.Attr) slog.Attr {
if a.Key == slog.LevelKey {
level := a.Value.Any().(slog.Level)
a.Value = slog.StringValue(getStrLevel(level))
} else if a.Key == slog.TimeKey && len(groups) == 0 {
a.Value = slog.StringValue(a.Value.Time().Format("2006/01/02 15:04:05"))
} else if a.Key == slog.SourceKey {
source := a.Value.Any().(*slog.Source)
source.File = filepath.Base(source.File)
}
return a
}
func NewOriginTextHandler(level slog.Level, w io.Writer, addSource bool, replaceAttr func([]string, slog.Attr) slog.Attr) slog.Handler {
var textHandler OriginTextHandler
textHandler.addSource = addSource
textHandler.w = w
textHandler.TextHandler = slog.NewTextHandler(w, &slog.HandlerOptions{
AddSource: addSource,
Level: level,
ReplaceAttr: replaceAttr,
})
textHandler.skip = defaultSkip
return &textHandler
}
func (oh *OriginTextHandler) Handle(context context.Context, record slog.Record) error {
oh.Fill(context, &record)
oh.locker.Lock()
defer oh.locker.Unlock()
if record.Level == LevelStack || record.Level == LevelFatal {
err := oh.TextHandler.Handle(context, record)
oh.logStack(&record)
return err
} else if record.Level == LevelDump {
strDump := record.Message
record.Message = "dump info"
err := oh.TextHandler.Handle(context, record)
oh.w.Write([]byte(strDump))
return err
}
return oh.TextHandler.Handle(context, record)
}
func (bh *BaseHandler) logStack(record *slog.Record) {
bh.w.Write(debug.Stack())
}
func (bh *BaseHandler) Lock() {
bh.locker.Lock()
}
func (bh *BaseHandler) UnLock() {
bh.locker.Unlock()
}
func NewOriginJsonHandler(level slog.Level, w io.Writer, addSource bool, replaceAttr func([]string, slog.Attr) slog.Attr) slog.Handler {
var jsonHandler OriginJsonHandler
jsonHandler.addSource = addSource
jsonHandler.w = w
jsonHandler.JSONHandler = slog.NewJSONHandler(w, &slog.HandlerOptions{
AddSource: addSource,
Level: level,
ReplaceAttr: replaceAttr,
})
jsonHandler.skip = defaultSkip
return &jsonHandler
}
func (oh *OriginJsonHandler) Handle(context context.Context, record slog.Record) error {
oh.Fill(context, &record)
if record.Level == LevelStack || record.Level == LevelFatal || record.Level == LevelDump {
record.Add("stack", debug.Stack())
}
oh.locker.Lock()
defer oh.locker.Unlock()
return oh.JSONHandler.Handle(context, record)
}
func (bh *BaseHandler) Fill(_ context.Context, record *slog.Record) {
if bh.addSource {
var pcs [1]uintptr
runtime.Callers(bh.skip, pcs[:])
record.PC = pcs[0]
}
}

View File

@@ -1,520 +1,337 @@
package log package log
import ( import (
"context" "go.uber.org/zap"
"fmt" "go.uber.org/zap/zapcore"
"github.com/duanhf2012/origin/v2/util/bytespool" "gopkg.in/natefinch/lumberjack.v2"
"io"
"log/slog"
"os" "os"
"path"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time" "time"
) )
var OpenConsole bool
var LogSize int64
var LogChannelCap int
var LogPath string
var LogLevel = LevelTrace
var gLogger, _ = NewTextLogger(LevelDebug, "", "", true, LogChannelCap)
var isSetLogger bool var isSetLogger bool
var memPool = bytespool.NewMemAreaPool() var gLogger = NewDefaultLogger()
// levels
const (
LevelTrace = slog.Level(-8)
LevelDebug = slog.LevelDebug
LevelInfo = slog.LevelInfo
LevelWarning = slog.LevelWarn
LevelError = slog.LevelError
LevelStack = slog.Level(12)
LevelDump = slog.Level(16)
LevelFatal = slog.Level(20)
)
type ILogger interface {
Trace(msg string, args ...any)
Debug(msg string, args ...any)
Info(msg string, args ...any)
Warning(msg string, args ...any)
Error(msg string, args ...any)
Stack(msg string, args ...any)
Dump(msg string, args ...any)
Fatal(msg string, args ...any)
DoSPrintf(level slog.Level, a []interface{})
FormatHeader(buf *Buffer, level slog.Level, callDepth int)
Close()
}
type Logger struct { type Logger struct {
SLogger *slog.Logger *zap.Logger
stack bool
ioWriter IoWriter OpenConsole *bool
LogPath string
sBuff Buffer FileName string
Skip int
LogLevel zapcore.Level
Encoder zapcore.Encoder
LogConfig *lumberjack.Logger
SugaredLogger *zap.SugaredLogger
CoreList []zapcore.Core
} }
type IoWriter struct { func SetLogger(logger *Logger) {
outFile io.Writer // destination for output if logger != nil {
writeBytes int64
logChannel chan []byte
wg sync.WaitGroup
closeSig chan struct{}
lockWrite sync.Mutex
filePath string
filePrefix string
fileDay int
fileCreateTime int64 //second
}
func (iw *IoWriter) Close() error {
iw.lockWrite.Lock()
defer iw.lockWrite.Unlock()
iw.close()
return nil
}
func (iw *IoWriter) close() error {
if iw.closeSig != nil {
close(iw.closeSig)
iw.closeSig = nil
}
iw.wg.Wait()
if iw.outFile != nil {
err := iw.outFile.(io.Closer).Close()
iw.outFile = nil
return err
}
return nil
}
func (iw *IoWriter) writeFile(p []byte) (n int, err error) {
//switch log file
iw.switchFile()
if iw.outFile != nil {
n, err = iw.outFile.Write(p)
if n > 0 {
atomic.AddInt64(&iw.writeBytes, int64(n))
}
}
return 0, nil
}
func (iw *IoWriter) Write(p []byte) (n int, err error) {
iw.lockWrite.Lock()
defer iw.lockWrite.Unlock()
if iw.logChannel == nil {
return iw.writeIo(p)
}
copyBuff := memPool.MakeBytes(len(p))
if copyBuff == nil {
return 0, fmt.Errorf("MakeByteSlice failed")
}
copy(copyBuff, p)
iw.logChannel <- copyBuff
return
}
func (iw *IoWriter) writeIo(p []byte) (n int, err error) {
n, err = iw.writeFile(p)
if OpenConsole {
n, err = os.Stdout.Write(p)
}
return
}
func (iw *IoWriter) setLogChannel(logChannelNum int) (err error) {
iw.lockWrite.Lock()
defer iw.lockWrite.Unlock()
iw.close()
if logChannelNum == 0 {
return nil
}
//copy iw.logChannel
var logInfo []byte
logChannel := make(chan []byte, logChannelNum)
for i := 0; i < logChannelNum && i < len(iw.logChannel); i++ {
logInfo = <-iw.logChannel
logChannel <- logInfo
}
iw.logChannel = logChannel
iw.closeSig = make(chan struct{})
iw.wg.Add(1)
go iw.run()
return nil
}
func (iw *IoWriter) run() {
defer iw.wg.Done()
Loop:
for {
select {
case <-iw.closeSig:
break Loop
case logs := <-iw.logChannel:
iw.writeIo(logs)
memPool.ReleaseBytes(logs)
}
}
for len(iw.logChannel) > 0 {
logs := <-iw.logChannel
iw.writeIo(logs)
memPool.ReleaseBytes(logs)
}
}
func (iw *IoWriter) isFull() bool {
if LogSize == 0 {
return false
}
return atomic.LoadInt64(&iw.writeBytes) >= LogSize
}
func (logger *Logger) setLogChannel(logChannel int) (err error) {
return logger.ioWriter.setLogChannel(logChannel)
}
func (iw *IoWriter) switchFile() error {
now := time.Now()
if iw.fileCreateTime == now.Unix() {
return nil
}
if iw.fileDay == now.Day() && iw.isFull() == false {
return nil
}
if iw.filePath != "" {
var err error
fileName := fmt.Sprintf("%s%d%02d%02d_%02d_%02d_%02d.log",
iw.filePrefix,
now.Year(),
now.Month(),
now.Day(),
now.Hour(),
now.Minute(),
now.Second())
filePath := path.Join(iw.filePath, fileName)
iw.outFile, err = os.Create(filePath)
if err != nil {
return err
}
iw.fileDay = now.Day()
iw.fileCreateTime = now.Unix()
atomic.StoreInt64(&iw.writeBytes, 0)
}
return nil
}
func GetDefaultHandler() IOriginHandler {
return gLogger.(*Logger).SLogger.Handler().(IOriginHandler)
}
func NewTextLogger(level slog.Level, pathName string, filePrefix string, addSource bool, logChannelCap int) (ILogger, error) {
var logger Logger
logger.ioWriter.filePath = pathName
logger.ioWriter.filePrefix = filePrefix
logger.SLogger = slog.New(NewOriginTextHandler(level, &logger.ioWriter, addSource, defaultReplaceAttr))
logger.setLogChannel(logChannelCap)
err := logger.ioWriter.switchFile()
if err != nil {
return nil, err
}
return &logger, nil
}
func NewJsonLogger(level slog.Level, pathName string, filePrefix string, addSource bool, logChannelCap int) (ILogger, error) {
var logger Logger
logger.ioWriter.filePath = pathName
logger.ioWriter.filePrefix = filePrefix
logger.SLogger = slog.New(NewOriginJsonHandler(level, &logger.ioWriter, true, defaultReplaceAttr))
logger.setLogChannel(logChannelCap)
err := logger.ioWriter.switchFile()
if err != nil {
return nil, err
}
return &logger, nil
}
// Close It's dangerous to call the method on logging
func (logger *Logger) Close() {
logger.ioWriter.Close()
}
func (logger *Logger) Trace(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelTrace, msg, args...)
}
func (logger *Logger) Debug(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelDebug, msg, args...)
}
func (logger *Logger) Info(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelInfo, msg, args...)
}
func (logger *Logger) Warning(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelWarning, msg, args...)
}
func (logger *Logger) Error(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelError, msg, args...)
}
func (logger *Logger) Stack(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelStack, msg, args...)
}
func (logger *Logger) Dump(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelDump, msg, args...)
}
func (logger *Logger) Fatal(msg string, args ...any) {
logger.SLogger.Log(context.Background(), LevelFatal, msg, args...)
os.Exit(1)
}
// SetLogger It's non-thread-safe
func SetLogger(logger ILogger) {
if logger != nil && isSetLogger == false {
gLogger = logger gLogger = logger
isSetLogger = true isSetLogger = true
} }
} }
func GetLogger() ILogger { func GetLogger() *Logger {
return gLogger return gLogger
} }
func Trace(msg string, args ...any) { func (logger *Logger) SetEncoder(encoder zapcore.Encoder) {
gLogger.Trace(msg, args...) logger.Encoder = encoder
} }
func Debug(msg string, args ...any) { func (logger *Logger) SetSkip(skip int) {
gLogger.Debug(msg, args...) logger.Skip = skip
} }
func Info(msg string, args ...any) { func GetJsonEncoder() zapcore.Encoder {
gLogger.Info(msg, args...) encoderConfig := zap.NewProductionEncoderConfig()
} encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
func Warning(msg string, args ...any) { encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
gLogger.Warning(msg, args...) enc.AppendString(t.Format("2006-01-02 15:04:05.000"))
}
func Error(msg string, args ...any) {
gLogger.Error(msg, args...)
}
func Stack(msg string, args ...any) {
gLogger.Stack(msg, args...)
}
func Dump(dump string, args ...any) {
gLogger.Dump(dump, args...)
}
func Fatal(msg string, args ...any) {
gLogger.Fatal(msg, args...)
}
func Close() {
gLogger.Close()
}
func ErrorAttr(key string, value error) slog.Attr {
if value == nil {
return slog.Attr{Key: key, Value: slog.StringValue("nil")}
} }
return slog.Attr{Key: key, Value: slog.StringValue(value.Error())} return zapcore.NewJSONEncoder(encoderConfig)
} }
func String(key, value string) slog.Attr { func GetTxtEncoder() zapcore.Encoder {
return slog.Attr{Key: key, Value: slog.StringValue(value)} encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(t.Format("2006-01-02 15:04:05.000"))
}
return zapcore.NewConsoleEncoder(encoderConfig)
} }
func Int(key string, value int) slog.Attr { func getLogConfig() *lumberjack.Logger {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))} return &lumberjack.Logger{
Filename: "",
MaxSize: 2048,
MaxBackups: 0,
MaxAge: 0,
Compress: false,
}
} }
func Int64(key string, value int64) slog.Attr { func NewDefaultLogger() *Logger {
return slog.Attr{Key: key, Value: slog.Int64Value(value)} logger := Logger{}
logger.Encoder = GetJsonEncoder()
logger.LogConfig = getLogConfig()
logger.LogConfig.LocalTime = true
logger.Init()
return &logger
} }
func Int32(key string, value int32) slog.Attr { func (logger *Logger) SetLogLevel(level zapcore.Level) {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))} logger.LogLevel = level
} }
func Int16(key string, value int16) slog.Attr { func (logger *Logger) Enabled(zapcore.Level) bool {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))} return logger.stack
} }
func Int8(key string, value int8) slog.Attr { func (logger *Logger) Init() {
return slog.Attr{Key: key, Value: slog.Int64Value(int64(value))} if isSetLogger {
}
func Uint(key string, value uint) slog.Attr {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
}
func Uint64(key string, v uint64) slog.Attr {
return slog.Attr{Key: key, Value: slog.Uint64Value(v)}
}
func Uint32(key string, value uint32) slog.Attr {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
}
func Uint16(key string, value uint16) slog.Attr {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
}
func Uint8(key string, value uint8) slog.Attr {
return slog.Attr{Key: key, Value: slog.Uint64Value(uint64(value))}
}
func Float64(key string, v float64) slog.Attr {
return slog.Attr{Key: key, Value: slog.Float64Value(v)}
}
func Bool(key string, v bool) slog.Attr {
return slog.Attr{Key: key, Value: slog.BoolValue(v)}
}
func Time(key string, v time.Time) slog.Attr {
return slog.Attr{Key: key, Value: slog.TimeValue(v)}
}
func Duration(key string, v time.Duration) slog.Attr {
return slog.Attr{Key: key, Value: slog.DurationValue(v)}
}
func Any(key string, value any) slog.Attr {
return slog.Attr{Key: key, Value: slog.AnyValue(value)}
}
func Group(key string, args ...any) slog.Attr {
return slog.Group(key, args...)
}
func (logger *Logger) DoSPrintf(level slog.Level, a []interface{}) {
if logger.SLogger.Enabled(context.Background(), level) == false {
return return
} }
logger.SLogger.Handler().(IOriginHandler).Lock() var coreList []zapcore.Core
defer logger.SLogger.Handler().(IOriginHandler).UnLock() if logger.OpenConsole == nil || *logger.OpenConsole {
core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), logger.LogLevel)
logger.sBuff.Reset() coreList = append(coreList, core)
logger.FormatHeader(&logger.sBuff, level, 3)
for _, s := range a {
logger.sBuff.AppendString(slog.AnyValue(s).String())
} }
logger.sBuff.AppendString("\"\n")
logger.ioWriter.Write(logger.sBuff.Bytes())
}
func (logger *Logger) STrace(a ...interface{}) { if logger.CoreList != nil {
logger.DoSPrintf(LevelTrace, a) coreList = append(coreList, logger.CoreList...)
} }else if logger.LogPath != "" {
WriteSyncer := zapcore.AddSync(logger.LogConfig)
func (logger *Logger) SDebug(a ...interface{}) { core := zapcore.NewCore(logger.Encoder, WriteSyncer, logger.LogLevel)
logger.DoSPrintf(LevelDebug, a) coreList = append(coreList, core)
}
func (logger *Logger) SInfo(a ...interface{}) {
logger.DoSPrintf(LevelInfo, a)
}
func (logger *Logger) SWarning(a ...interface{}) {
logger.DoSPrintf(LevelWarning, a)
}
func (logger *Logger) SError(a ...interface{}) {
logger.DoSPrintf(LevelError, a)
}
func STrace(a ...interface{}) {
gLogger.DoSPrintf(LevelTrace, a)
}
func SDebug(a ...interface{}) {
gLogger.DoSPrintf(LevelDebug, a)
}
func SInfo(a ...interface{}) {
gLogger.DoSPrintf(LevelInfo, a)
}
func SWarning(a ...interface{}) {
gLogger.DoSPrintf(LevelWarning, a)
}
func SError(a ...interface{}) {
gLogger.DoSPrintf(LevelError, a)
}
func (logger *Logger) FormatHeader(buf *Buffer, level slog.Level, callDepth int) {
t := time.Now()
var file string
var line int
// Release lock while getting caller info - it's expensive.
var ok bool
_, file, line, ok = runtime.Caller(callDepth)
if !ok {
file = "???"
line = 0
} }
file = filepath.Base(file)
buf.AppendString("time=\"") core := zapcore.NewTee(coreList...)
buf.AppendString(t.Format("2006/01/02 15:04:05")) logger.Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(logger), zap.AddCallerSkip(1+logger.Skip))
buf.AppendString("\"") logger.SugaredLogger = logger.Logger.Sugar()
logger.sBuff.AppendString(" level=") }
logger.sBuff.AppendString(getStrLevel(level))
logger.sBuff.AppendString(" source=") func (logger *Logger) Debug(msg string, fields ...zap.Field) {
logger.Logger.Debug(msg, fields...)
buf.AppendString(file) }
buf.AppendByte(':')
buf.AppendInt(int64(line)) func (logger *Logger) Info(msg string, fields ...zap.Field) {
buf.AppendString(" msg=\"") logger.Logger.Info(msg, fields...)
}
func (logger *Logger) Warn(msg string, fields ...zap.Field) {
logger.Logger.Warn(msg, fields...)
}
func (logger *Logger) Error(msg string, fields ...zap.Field) {
logger.Logger.Error(msg, fields...)
}
func (logger *Logger) StackError(msg string, args ...zap.Field) {
logger.stack = true
logger.Logger.Log(zapcore.ErrorLevel, msg, args...)
logger.stack = false
}
func (logger *Logger) Fatal(msg string, fields ...zap.Field) {
gLogger.stack = true
logger.Logger.Fatal(msg, fields...)
gLogger.stack = false
}
func Debug(msg string, fields ...zap.Field) {
gLogger.Logger.Debug(msg, fields...)
}
func Info(msg string, fields ...zap.Field) {
gLogger.Logger.Info(msg, fields...)
}
func Warn(msg string, fields ...zap.Field) {
gLogger.Logger.Warn(msg, fields...)
}
func Error(msg string, fields ...zap.Field) {
gLogger.Logger.Error(msg, fields...)
}
func StackError(msg string, fields ...zap.Field) {
gLogger.stack = true
gLogger.Logger.Error(msg, fields...)
gLogger.stack = false
}
func Fatal(msg string, fields ...zap.Field) {
gLogger.stack = true
gLogger.Logger.Fatal(msg, fields...)
gLogger.stack = false
}
func Debugf(template string, args ...any) {
gLogger.SugaredLogger.Debugf(template, args...)
}
func Infof(template string, args ...any) {
gLogger.SugaredLogger.Infof(template, args...)
}
func Warnf(template string, args ...any) {
gLogger.SugaredLogger.Warnf(template, args...)
}
func Errorf(template string, args ...any) {
gLogger.SugaredLogger.Errorf(template, args...)
}
func StackErrorf(template string, args ...any) {
gLogger.stack = true
gLogger.SugaredLogger.Errorf(template, args...)
gLogger.stack = false
}
func Fatalf(template string, args ...any) {
gLogger.SugaredLogger.Fatalf(template, args...)
}
func (logger *Logger) SDebug(args ...interface{}) {
logger.SugaredLogger.Debugln(args...)
}
func (logger *Logger) SInfo(args ...interface{}) {
logger.SugaredLogger.Infoln(args...)
}
func (logger *Logger) SWarn(args ...interface{}) {
logger.SugaredLogger.Warnln(args...)
}
func (logger *Logger) SError(args ...interface{}) {
logger.SugaredLogger.Errorln(args...)
}
func (logger *Logger) SStackError(args ...interface{}) {
gLogger.stack = true
logger.SugaredLogger.Errorln(args...)
gLogger.stack = false
}
func (logger *Logger) SFatal(args ...interface{}) {
gLogger.stack = true
logger.SugaredLogger.Fatalln(args...)
gLogger.stack = false
}
func SDebug(args ...interface{}) {
gLogger.SugaredLogger.Debugln(args...)
}
func SInfo(args ...interface{}) {
gLogger.SugaredLogger.Infoln(args...)
}
func SWarn(args ...interface{}) {
gLogger.SugaredLogger.Warnln(args...)
}
func SError(args ...interface{}) {
gLogger.SugaredLogger.Errorln(args...)
}
func SStackError(args ...interface{}) {
gLogger.stack = true
gLogger.SugaredLogger.Errorln(args...)
gLogger.stack = false
}
func SFatal(args ...interface{}) {
gLogger.stack = true
gLogger.SugaredLogger.Fatalln(args...)
gLogger.stack = false
}
func ErrorField(key string, value error) zap.Field {
if value == nil {
return zap.String(key, "nil")
}
return zap.String(key, value.Error())
}
func String(key, value string) zap.Field {
return zap.String(key, value)
}
func Int(key string, value int) zap.Field {
return zap.Int(key, value)
}
func Int64(key string, value int64) zap.Field {
return zap.Int64(key, value)
}
func Int32(key string, value int32) zap.Field {
return zap.Int32(key, value)
}
func Int16(key string, value int16) zap.Field {
return zap.Int16(key, value)
}
func Int8(key string, value int8) zap.Field {
return zap.Int8(key, value)
}
func Uint(key string, value uint) zap.Field {
return zap.Uint(key, value)
}
func Uint64(key string, v uint64) zap.Field {
return zap.Uint64(key, v)
}
func Uint32(key string, value uint32) zap.Field {
return zap.Uint32(key, value)
}
func Uint16(key string, value uint16) zap.Field {
return zap.Uint16(key, value)
}
func Uint8(key string, value uint8) zap.Field {
return zap.Uint8(key, value)
}
func Float64(key string, v float64) zap.Field {
return zap.Float64(key, v)
}
func Bool(key string, v bool) zap.Field {
return zap.Bool(key, v)
}
func Bools(key string, v []bool) zap.Field {
return zap.Bools(key, v)
}
func Time(key string, v time.Time) zap.Field {
return zap.Time(key, v)
}
func Duration(key string, v time.Duration) zap.Field {
return zap.Duration(key, v)
}
func Durations(key string, v []time.Duration) zap.Field {
return zap.Durations(key, v)
}
func Any(key string, value any) zap.Field {
return zap.Any(key, value)
} }

View File

@@ -111,15 +111,15 @@ func (netConn *NetConn) doWrite(b []byte) error {
} }
// b must not be modified by the others goroutines // b must not be modified by the others goroutines
func (netConn *NetConn) Write(b []byte) error { func (netConn *NetConn) Write(b []byte) (int,error) {
netConn.Lock() netConn.Lock()
defer netConn.Unlock() defer netConn.Unlock()
if atomic.LoadInt32(&netConn.closeFlag) == 1 || b == nil { if atomic.LoadInt32(&netConn.closeFlag) == 1 || b == nil {
netConn.ReleaseReadMsg(b) netConn.ReleaseReadMsg(b)
return errors.New("conn is close") return 0,errors.New("conn is close")
} }
return netConn.doWrite(b) return len(b),netConn.doWrite(b)
} }
func (netConn *NetConn) Read(b []byte) (int, error) { func (netConn *NetConn) Read(b []byte) (int, error) {
@@ -150,7 +150,7 @@ func (netConn *NetConn) WriteMsg(args ...[]byte) error {
if atomic.LoadInt32(&netConn.closeFlag) == 1 { if atomic.LoadInt32(&netConn.closeFlag) == 1 {
return errors.New("conn is close") return errors.New("conn is close")
} }
return netConn.msgParser.Write(netConn.conn, args...) return netConn.msgParser.Write(netConn, args...)
} }
func (netConn *NetConn) WriteRawMsg(args []byte) error { func (netConn *NetConn) WriteRawMsg(args []byte) error {
@@ -158,7 +158,8 @@ func (netConn *NetConn) WriteRawMsg(args []byte) error {
return errors.New("conn is close") return errors.New("conn is close")
} }
return netConn.Write(args) _,err:= netConn.Write(args)
return err
} }
func (netConn *NetConn) IsConnected() bool { func (netConn *NetConn) IsConnected() bool {

View File

@@ -106,7 +106,7 @@ func (client *KCPClient) dial() net.Conn {
return conn return conn
} }
log.Warning("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr)) log.Warn("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
time.Sleep(client.ConnectInterval) time.Sleep(client.ConnectInterval)
continue continue
} }

View File

@@ -203,7 +203,7 @@ func (kp *KCPServer) initSession(session *kcp.UDPSession) {
func (kp *KCPServer) run(listener *kcp.Listener) bool { func (kp *KCPServer) run(listener *kcp.Listener) bool {
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
log.Error("accept error", log.String("ListenAddr", kp.kcpCfg.ListenAddr), log.ErrorAttr("err", err)) log.Error("accept error", log.String("ListenAddr", kp.kcpCfg.ListenAddr), log.ErrorField("err", err))
return false return false
} }
@@ -211,7 +211,7 @@ func (kp *KCPServer) run(listener *kcp.Listener) bool {
if len(kp.conns) >= kp.kcpCfg.MaxConnNum { if len(kp.conns) >= kp.kcpCfg.MaxConnNum {
kp.mutexConns.Unlock() kp.mutexConns.Unlock()
conn.Close() conn.Close()
log.Warning("too many connections") log.Warn("too many connections")
return true return true
} }
kp.conns[conn] = struct{}{} kp.conns[conn] = struct{}{}

View File

@@ -102,7 +102,7 @@ func (client *TCPClient) dial() net.Conn {
return conn return conn
} }
log.Warning("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr)) log.Warn("connect error ", log.String("error", err.Error()), log.String("Addr", client.Addr))
time.Sleep(client.ConnectInterval) time.Sleep(client.ConnectInterval)
continue continue
} }

View File

@@ -146,7 +146,7 @@ func (server *TCPServer) run() {
if len(server.conns) >= server.MaxConnNum { if len(server.conns) >= server.MaxConnNum {
server.mutexConns.Unlock() server.mutexConns.Unlock()
conn.Close() conn.Close()
log.Warning("too many connections") log.Warn("too many connections")
continue continue
} }

View File

@@ -68,7 +68,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if len(handler.conns) >= handler.maxConnNum { if len(handler.conns) >= handler.maxConnNum {
handler.mutexConns.Unlock() handler.mutexConns.Unlock()
conn.Close() conn.Close()
log.Warning("too many connections") log.Warn("too many connections")
return return
} }
handler.conns[conn] = struct{}{} handler.conns[conn] = struct{}{}

View File

@@ -11,11 +11,13 @@ import (
"github.com/duanhf2012/origin/v2/util/buildtime" "github.com/duanhf2012/origin/v2/util/buildtime"
"github.com/duanhf2012/origin/v2/util/sysprocess" "github.com/duanhf2012/origin/v2/util/sysprocess"
"github.com/duanhf2012/origin/v2/util/timer" "github.com/duanhf2012/origin/v2/util/timer"
"go.uber.org/zap/zapcore"
"io" "io"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
@@ -54,10 +56,9 @@ func init() {
console.RegisterCommandString("retire", "", "<-retire nodeid=nodeid> retire originserver process.", retireNode) console.RegisterCommandString("retire", "", "<-retire nodeid=nodeid> retire originserver process.", retireNode)
console.RegisterCommandString("config", "", "<-config path> Configuration file path.", setConfigPath) console.RegisterCommandString("config", "", "<-config path> Configuration file path.", setConfigPath)
console.RegisterCommandString("console", "", "<-console true|false> Turn on or off screen log output.", openConsole) console.RegisterCommandString("console", "", "<-console true|false> Turn on or off screen log output.", openConsole)
console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|release|warning|error|fatal> Set loglevel.", setLevel) console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|info|warn|error|stackerror|fatal> Set loglevel.", setLevel)
console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath) console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath)
console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize) console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize)
console.RegisterCommandInt("logchannelcap", -1, "<-logchannelcap num> Set log channel cap.", setLogChannelCapNum)
console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof) console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof)
} }
@@ -156,12 +157,14 @@ func initNode(id string) {
nodeId = id nodeId = id
err := cluster.GetCluster().Init(GetNodeId(), Setup) err := cluster.GetCluster().Init(GetNodeId(), Setup)
if err != nil { if err != nil {
log.Error("Init cluster fail", log.ErrorAttr("error", err)) log.Error("Init cluster fail", log.ErrorField("error", err))
os.Exit(1) os.Exit(1)
} }
err = initLog() err = initLog()
if err != nil { if err != nil {
log.Error("Init log fail", log.ErrorField("error", err))
os.Exit(1)
return return
} }
@@ -211,22 +214,25 @@ func initNode(id string) {
} }
//3.service初始化 //3.service初始化
log.Info("Start running server.")
service.Init() service.Init()
} }
func initLog() error { func initLog() error {
if log.LogPath == "" { logger := log.GetLogger()
setLogPath("./log") if logger.LogPath == "" {
err := setLogPath("./log")
if err != nil {
return err
}
} }
localNodeInfo := cluster.GetCluster().GetLocalNodeInfo() localNodeInfo := cluster.GetCluster().GetLocalNodeInfo()
filePre := fmt.Sprintf("%s_", localNodeInfo.NodeId) fileName := fmt.Sprintf("%s.log", localNodeInfo.NodeId)
logger, err := log.NewTextLogger(log.LogLevel, log.LogPath, filePre, true, log.LogChannelCap) logger.FileName = fileName
if err != nil { logger.LogConfig.Filename = filepath.Join(logger.LogPath, logger.FileName)
fmt.Printf("cannot create log file!\n")
return err logger.Init()
}
log.SetLogger(logger)
return nil return nil
} }
@@ -323,34 +329,37 @@ func startNode(args interface{}) error {
myName, mErr := sysprocess.GetMyProcessName() myName, mErr := sysprocess.GetMyProcessName()
//当前进程名获取失败,不应该发生 //当前进程名获取失败,不应该发生
if mErr != nil { if mErr != nil {
log.Info("get my process's name is error", log.ErrorAttr("err", mErr)) log.Error("get my process's name is error", log.ErrorField("err", mErr))
os.Exit(-1) os.Exit(-1)
} }
//进程id存在而且进程名也相同被认为是当前进程重复运行 //进程id存在而且进程名也相同被认为是当前进程重复运行
if cErr == nil && name == myName { if cErr == nil && name == myName {
log.Info("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId)) log.Error("repeat runs are not allowed", log.String("nodeId", strNodeId), log.Int("processId", processId))
os.Exit(-1) os.Exit(-1)
} }
break break
} }
//2.记录进程id号 //2.记录进程id号
log.Info("Start running server.")
writeProcessPid(strNodeId) writeProcessPid(strNodeId)
timer.StartTimer(10*time.Millisecond, 1000000) timer.StartTimer(10*time.Millisecond, 1000000)
//3.初始化node //3.初始化node
defer log.GetLogger().Logger.Sync()
initNode(strNodeId) initNode(strNodeId)
//4.运行service //4.运行service
service.Start() service.Start()
//5.运行集群 //5.运行集群
cluster.GetCluster().Start() err := cluster.GetCluster().Start()
if err != nil {
log.Error(err.Error())
os.Exit(-1)
}
//6.监听程序退出信号&性能报告 //6.监听程序退出信号&性能报告
var pProfilerTicker *time.Ticker = &time.Ticker{} var pProfilerTicker *time.Ticker = &time.Ticker{}
if profilerInterval > 0 { if profilerInterval > 0 {
pProfilerTicker = time.NewTicker(profilerInterval) pProfilerTicker = time.NewTicker(profilerInterval)
@@ -378,7 +387,7 @@ func startNode(args interface{}) error {
cluster.GetCluster().Stop() cluster.GetCluster().Stop()
log.Info("Server is stop.") log.Info("Server is stop.")
log.Close()
return nil return nil
} }
@@ -400,8 +409,8 @@ func SetupTemplateFunc(fs ...func() service.IService) {
} }
} }
func SetupTemplate[T any,P templateServicePoint[T]]() { func SetupTemplate[T any, P templateServicePoint[T]]() {
SetupTemplateFunc(func() service.IService{ SetupTemplateFunc(func() service.IService {
var t T var t T
return P(&t) return P(&t)
}) })
@@ -430,9 +439,11 @@ func openConsole(args interface{}) error {
} }
strOpen := strings.ToLower(strings.TrimSpace(args.(string))) strOpen := strings.ToLower(strings.TrimSpace(args.(string)))
if strOpen == "false" { if strOpen == "false" {
log.OpenConsole = false bOpenConsole := false
log.GetLogger().OpenConsole = &bOpenConsole
} else if strOpen == "true" { } else if strOpen == "true" {
log.OpenConsole = true bOpenConsole := true
log.GetLogger().OpenConsole = &bOpenConsole
} else { } else {
return errors.New("parameter console error") return errors.New("parameter console error")
} }
@@ -446,20 +457,18 @@ func setLevel(args interface{}) error {
strlogLevel := strings.TrimSpace(args.(string)) strlogLevel := strings.TrimSpace(args.(string))
switch strlogLevel { switch strlogLevel {
case "trace":
log.LogLevel = log.LevelTrace
case "debug": case "debug":
log.LogLevel = log.LevelDebug log.GetLogger().LogLevel = zapcore.DebugLevel
case "info": case "info":
log.LogLevel = log.LevelInfo log.GetLogger().LogLevel = zapcore.InfoLevel
case "warning": case "warn":
log.LogLevel = log.LevelWarning log.GetLogger().LogLevel = zapcore.WarnLevel
case "error": case "error":
log.LogLevel = log.LevelError log.GetLogger().LogLevel = zapcore.ErrorLevel
case "stack": case "stackerror":
log.LogLevel = log.LevelStack log.GetLogger().LogLevel = zapcore.ErrorLevel
case "fatal": case "fatal":
log.LogLevel = log.LevelFatal log.GetLogger().LogLevel = zapcore.FatalLevel
default: default:
return errors.New("unknown level: " + strlogLevel) return errors.New("unknown level: " + strlogLevel)
} }
@@ -470,52 +479,33 @@ func setLogPath(args interface{}) error {
if args == "" { if args == "" {
return nil return nil
} }
logPath := strings.TrimSpace(args.(string))
log.LogPath = strings.TrimSpace(args.(string)) dir, err := os.Stat(logPath)
dir, err := os.Stat(log.LogPath) //这个文件夹不存在
if err == nil && dir.IsDir() == false { if err == nil && dir.IsDir() == false {
return errors.New("Not found dir " + log.LogPath) return errors.New("Not found dir " + logPath)
} }
if err != nil { if err != nil {
err = os.Mkdir(log.LogPath, os.ModePerm) err = os.MkdirAll(logPath, os.ModePerm)
if err != nil { if err != nil {
return errors.New("Cannot create dir " + log.LogPath) return errors.New("Cannot create dir " + logPath)
} }
} }
log.GetLogger().LogPath = logPath
return nil return nil
} }
func setLogSize(args interface{}) error { func setLogSize(args interface{}) error {
if args == "" {
return nil
}
logSize, ok := args.(int) logSize, ok := args.(int)
if ok == false { if ok == false {
return errors.New("param logsize is error") return errors.New("param logsize is error")
} }
if logSize == 0 {
log.LogSize = int64(logSize) * 1024 * 1024
return nil
}
func setLogChannelCapNum(args interface{}) error {
if args == "" {
return nil return nil
} }
logChannelCap, ok := args.(int) log.GetLogger().LogConfig.MaxSize = logSize
if ok == false {
return errors.New("param logsize is error")
}
if logChannelCap == -1 {
return nil
}
log.LogChannelCap = logChannelCap
return nil return nil
} }

View File

@@ -75,7 +75,7 @@ func (cs *CallSet) AddPending(call *Call) {
if call.Seq == 0 { if call.Seq == 0 {
cs.pendingLock.Unlock() cs.pendingLock.Unlock()
log.Stack("call is error.") log.StackError("call is error.")
return return
} }

View File

@@ -32,7 +32,7 @@ type IRealClient interface {
SetConn(conn *network.NetConn) SetConn(conn *network.NetConn)
Close(waitDone bool) Close(waitDone bool)
AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) AsyncCall(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error)
Go(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call Go(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
RawGo(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call RawGo(NodeId string, timeout time.Duration, rpcHandler IRpcHandler, processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
IsConnected() bool IsConnected() bool
@@ -104,7 +104,7 @@ func (client *Client) processRpcResponse(responseData []byte) error {
//rc.conn.ReleaseReadMsg(bytes) //rc.conn.ReleaseReadMsg(bytes)
if err != nil { if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData) processor.ReleaseRpcResponse(response.RpcResponseData)
log.Error("rpcClient Unmarshal head error", log.ErrorAttr("error", err)) log.Error("rpcClient Unmarshal head error", log.ErrorField("error", err))
return nil return nil
} }
@@ -116,7 +116,7 @@ func (client *Client) processRpcResponse(responseData []byte) error {
if len(response.RpcResponseData.GetReply()) > 0 { if len(response.RpcResponseData.GetReply()) > 0 {
err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply) err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply)
if err != nil { if err != nil {
log.Error("rpcClient Unmarshal body failed", log.ErrorAttr("error", err)) log.Error("rpcClient Unmarshal body failed", log.ErrorField("error", err))
v.Err = err v.Err = err
} }
} }
@@ -203,7 +203,7 @@ func (client *Client) rawGo(nodeId string, w IWriter, timeout time.Duration, rpc
} }
if err != nil { if err != nil {
client.RemovePending(call.Seq) client.RemovePending(call.Seq)
log.Error("WriteMsg is fail", log.ErrorAttr("error", err)) log.Error("WriteMsg is fail", log.ErrorField("error", err))
call.Seq = 0 call.Seq = 0
call.DoError(err) call.DoError(err)
} }
@@ -211,7 +211,7 @@ func (client *Client) rawGo(nodeId string, w IWriter, timeout time.Duration, rpc
return call return call
} }
func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) { func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
processorType, processor := GetProcessorType(args) processorType, processor := GetProcessorType(args)
InParam, herr := processor.Marshal(args) InParam, herr := processor.Marshal(args)
if herr != nil { if herr != nil {
@@ -264,10 +264,7 @@ func (client *Client) asyncCall(nodeId string, w IWriter, timeout time.Duration,
return emptyCancelRpc, err return emptyCancelRpc, err
} }
if cancelable {
rpcCancel := RpcCancel{CallSeq: seq, Cli: client}
return rpcCancel.CancelRpc, nil
}
return emptyCancelRpc, nil rpcCancel := RpcCancel{CallSeq: seq, Cli: client}
return rpcCancel.CancelRpc, nil
} }

View File

@@ -90,7 +90,7 @@ func (lc *LClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa
return pLocalRpcServer.selfNodeRpcHandlerGo(timeout, processor, lc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs) return pLocalRpcServer.selfNodeRpcHandlerGo(timeout, processor, lc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
} }
func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}, cancelable bool) (CancelRpc, error) { func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}) (CancelRpc, error) {
pLocalRpcServer := rpcHandler.GetRpcServer()() pLocalRpcServer := rpcHandler.GetRpcServer()()
//判断是否是同一服务 //判断是否是同一服务
@@ -109,7 +109,7 @@ func (lc *LClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IR
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback, cancelable) cancelRpc, err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout, lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback)
if err != nil { if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
} }
@@ -121,9 +121,6 @@ func NewLClient(localNodeId string, callSet *CallSet) *Client {
client := &Client{} client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1) client.clientId = atomic.AddUint32(&clientSeq, 1)
client.targetNodeId = localNodeId client.targetNodeId = localNodeId
//client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
//client.callRpcTimeout = DefaultRpcTimeout
lClient := &LClient{} lClient := &LClient{}
lClient.selfClient = client lClient.selfClient = client
client.IRealClient = lClient client.IRealClient = lClient

View File

@@ -42,7 +42,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil { if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!") err := errors.New("service method " + serviceMethod + " not config!")
log.Error("service method not config", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err)) log.Error("service method not config", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
pCall.Seq = 0 pCall.Seq = 0
pCall.DoError(err) pCall.DoError(err)
@@ -74,7 +74,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
var err error var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs) req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil { if err != nil {
log.Error("unmarshalInParam is failed", log.String("serviceMethod", serviceMethod), log.Uint32("rpcMethodId", rpcMethodId), log.ErrorAttr("error", err)) log.Error("unmarshalInParam is failed", log.String("serviceMethod", serviceMethod), log.Uint32("rpcMethodId", rpcMethodId), log.ErrorField("error", err))
pCall.Seq = 0 pCall.Seq = 0
pCall.DoError(err) pCall.DoError(err)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
@@ -90,12 +90,12 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
byteReturns, err := req.rpcProcessor.Marshal(Returns) byteReturns, err := req.rpcProcessor.Marshal(Returns)
if err != nil { if err != nil {
Err = ConvertError(err) Err = ConvertError(err)
log.Error("returns data cannot be marshal", log.Uint64("seq", callSeq), log.ErrorAttr("error", err)) log.Error("returns data cannot be marshal", log.Uint64("seq", callSeq), log.ErrorField("error", err))
} else { } else {
err = req.rpcProcessor.Unmarshal(byteReturns, reply) err = req.rpcProcessor.Unmarshal(byteReturns, reply)
if err != nil { if err != nil {
Err = ConvertError(err) Err = ConvertError(err)
log.Error("returns data cannot be Unmarshal", log.Uint64("seq", callSeq), log.ErrorAttr("error", err)) log.Error("returns data cannot be Unmarshal", log.Uint64("seq", callSeq), log.ErrorField("error", err))
} }
} }
} }
@@ -127,7 +127,7 @@ func (server *BaseServer) selfNodeRpcHandlerGo(timeout time.Duration, processor
return pCall return pCall
} }
func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error) { func (server *BaseServer) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error) {
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil { if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!") err := errors.New("service method " + serviceMethod + " not config!")
@@ -266,7 +266,7 @@ func (server *BaseServer) processRpcRequest(data []byte, connTag string, wrRespo
req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam()) req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam())
if err != nil { if err != nil {
rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error() rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error()
log.Error("call rpc param error", log.String("serviceMethod", req.RpcRequestData.GetServiceMethod()), log.ErrorAttr("error", err)) log.Error("call rpc param error", log.String("serviceMethod", req.RpcRequestData.GetServiceMethod()), log.ErrorField("error", err))
if req.requestHandle != nil { if req.requestHandle != nil {
req.requestHandle(nil, RpcError(rErr)) req.requestHandle(nil, RpcError(rErr))
} else { } else {

View File

@@ -50,7 +50,7 @@ func (nc *NatsClient) Go(nodeId string, timeout time.Duration, rpcHandler IRpcHa
_, processor := GetProcessorType(args) _, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args) InParam, err := processor.Marshal(args)
if err != nil { if err != nil {
log.Error("Marshal is fail", log.ErrorAttr("error", err)) log.Error("Marshal is fail", log.ErrorField("error", err))
call := MakeCall() call := MakeCall()
call.DoError(err) call.DoError(err)
return call return call
@@ -63,8 +63,8 @@ func (nc *NatsClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRp
return nc.client.rawGo(nodeId, nc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply) return nc.client.rawGo(nodeId, nc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
} }
func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) { func (nc *NatsClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable) cancelRpc, err := nc.client.asyncCall(nodeId, nc, timeout, rpcHandler, serviceMethod, callback, args, replyParam)
if err != nil { if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)}) callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
} }

View File

@@ -48,7 +48,7 @@ func (ns *NatsServer) Start() error {
ns.natsConn, err = nats.Connect(ns.natsUrl, options...) ns.natsConn, err = nats.Connect(ns.natsUrl, options...)
if err != nil { if err != nil {
log.Error("Connect to nats fail", log.String("natsUrl", ns.natsUrl), log.ErrorAttr("err", err)) log.Error("Connect to nats fail", log.String("natsUrl", ns.natsUrl), log.ErrorField("err", err))
return err return err
} }
@@ -77,7 +77,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData) defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if err != nil { if err != nil {
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err)) log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
return return
} }
@@ -86,7 +86,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
if ns.compressBytesLen > 0 && len(bytes) >= ns.compressBytesLen { if ns.compressBytesLen > 0 && len(bytes) >= ns.compressBytesLen {
compressBuff, err = compressor.CompressBlock(bytes) compressBuff, err = compressor.CompressBlock(bytes)
if err != nil { if err != nil {
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err)) log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
return return
} }
if len(compressBuff) < len(bytes) { if len(compressBuff) < len(bytes) {
@@ -106,7 +106,7 @@ func (ns *NatsServer) WriteResponse(processor IRpcProcessor, nodeId string, serv
} }
if err != nil { if err != nil {
log.Error("WriteMsg error,Rpc return is fail", log.String("nodeId", nodeId), log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err)) log.Error("WriteMsg error,Rpc return is fail", log.String("nodeId", nodeId), log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
} }
} }

View File

@@ -6,7 +6,7 @@ import (
"github.com/duanhf2012/origin/v2/network" "github.com/duanhf2012/origin/v2/network"
"math" "math"
"reflect" "reflect"
"runtime"
"sync/atomic" "sync/atomic"
"time" "time"
) )
@@ -49,7 +49,7 @@ func (rc *RClient) Go(nodeId string, timeout time.Duration, rpcHandler IRpcHandl
_, processor := GetProcessorType(args) _, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args) InParam, err := processor.Marshal(args)
if err != nil { if err != nil {
log.Error("Marshal is fail", log.ErrorAttr("error", err)) log.Error("Marshal is fail", log.ErrorField("error", err))
call := MakeCall() call := MakeCall()
call.DoError(err) call.DoError(err)
return call return call
@@ -62,8 +62,8 @@ func (rc *RClient) RawGo(nodeId string, timeout time.Duration, rpcHandler IRpcHa
return rc.selfClient.rawGo(nodeId, rc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply) return rc.selfClient.rawGo(nodeId, rc, timeout, rpcHandler, processor, noReply, rpcMethodId, serviceMethod, rawArgs, reply)
} }
func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}, cancelable bool) (CancelRpc, error) { func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) (CancelRpc, error) {
cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam, cancelable) cancelRpc, err := rc.selfClient.asyncCall(nodeId, rc, timeout, rpcHandler, serviceMethod, callback, args, replyParam)
if err != nil { if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)}) callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
} }
@@ -74,10 +74,7 @@ func (rc *RClient) AsyncCall(nodeId string, timeout time.Duration, rpcHandler IR
func (rc *RClient) Run() { func (rc *RClient) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -89,7 +86,7 @@ func (rc *RClient) Run() {
for { for {
bytes, err := rc.conn.ReadMsg() bytes, err := rc.conn.ReadMsg()
if err != nil { if err != nil {
log.Error("RClient read msg is failed", log.ErrorAttr("error", err)) log.Error("RClient read msg is failed", log.ErrorField("error", err))
return return
} }

View File

@@ -6,7 +6,7 @@ import (
"github.com/duanhf2012/origin/v2/event" "github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"reflect" "reflect"
"runtime"
"strings" "strings"
"time" "time"
"unicode" "unicode"
@@ -164,13 +164,6 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
//取出输入参数类型 //取出输入参数类型
var rpcMethodInfo RpcMethodInfo var rpcMethodInfo RpcMethodInfo
typ := method.Type typ := method.Type
if typ.NumOut() != 1 {
return fmt.Errorf("%s The number of returned arguments must be 1", method.Name)
}
if typ.Out(0).String() != "error" {
return fmt.Errorf("%s The return parameter must be of type error", method.Name)
}
if typ.NumIn() < 2 || typ.NumIn() > 4 { if typ.NumIn() < 2 || typ.NumIn() > 4 {
return fmt.Errorf("%s Unsupported parameter format", method.Name) return fmt.Errorf("%s Unsupported parameter format", method.Name)
@@ -183,6 +176,18 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
rpcMethodInfo.hasResponder = true rpcMethodInfo.hasResponder = true
} }
if rpcMethodInfo.hasResponder && typ.NumOut() > 0 {
return fmt.Errorf("%s should not have return parameters", method.Name)
}
if !rpcMethodInfo.hasResponder && typ.NumOut() != 1 {
return fmt.Errorf("%s The number of returned arguments must be 1", method.Name)
}
if !rpcMethodInfo.hasResponder && typ.Out(0).String() != "error" {
return fmt.Errorf("%s The return parameter must be of type error", method.Name)
}
for i := parIdx; i < typ.NumIn(); i++ { for i := parIdx; i < typ.NumIn(); i++ {
if handler.isExportedOrBuiltinType(typ.In(i)) == false { if handler.isExportedOrBuiltinType(typ.In(i)) == false {
return fmt.Errorf("%s Unsupported parameter types", method.Name) return fmt.Errorf("%s Unsupported parameter types", method.Name)
@@ -220,10 +225,7 @@ func (handler *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error {
func (handler *RpcHandler) HandlerRpcResponseCB(call *Call) { func (handler *RpcHandler) HandlerRpcResponseCB(call *Call) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -242,10 +244,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
rpcErr := RpcError("call error : core dumps") rpcErr := RpcError("call error : core dumps")
if request.requestHandle != nil { if request.requestHandle != nil {
request.requestHandle(nil, rpcErr) request.requestHandle(nil, rpcErr)
@@ -313,9 +312,11 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
requestHandle := request.requestHandle requestHandle := request.requestHandle
returnValues := v.method.Func.Call(paramList) returnValues := v.method.Func.Call(paramList)
errInter := returnValues[0].Interface() if len(returnValues) > 0 {
if errInter != nil { errInter := returnValues[0].Interface()
err = errInter.(error) if errInter != nil {
err = errInter.(error)
}
} }
if v.hasResponder == false && requestHandle != nil { if v.hasResponder == false && requestHandle != nil {
@@ -439,7 +440,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList) err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList)
if len(pClientList) == 0 { if len(pClientList) == 0 {
if err != nil { if err != nil {
log.Error("call serviceMethod is failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", err)) log.Error("call serviceMethod is failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", err))
} else { } else {
log.Error("cannot find serviceMethod", log.String("serviceMethod", serviceMethod)) log.Error("cannot find serviceMethod", log.String("serviceMethod", serviceMethod))
} }
@@ -468,7 +469,7 @@ func (handler *RpcHandler) callRpc(timeout time.Duration, nodeId string, service
pClientList := make([]*Client, 0, maxClusterNode) pClientList := make([]*Client, 0, maxClusterNode)
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList) err, pClientList := handler.funcRpcClient(nodeId, serviceMethod, false, pClientList)
if err != nil { if err != nil {
log.Error("Call serviceMethod is failed", log.ErrorAttr("error", err)) log.Error("Call serviceMethod is failed", log.ErrorField("error", err))
return err return err
} else if len(pClientList) <= 0 { } else if len(pClientList) <= 0 {
err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod) err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod)
@@ -532,8 +533,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration, nodeId string, se
} }
//2.rpcClient调用 //2.rpcClient调用
//如果调用本结点服务 return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply, )
return pClientList[0].AsyncCall(pClientList[0].GetTargetNodeId(), timeout, handler.rpcHandler, serviceMethod, fVal, args, reply, false)
} }
func (handler *RpcHandler) GetName() string { func (handler *RpcHandler) GetName() string {
@@ -592,7 +592,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId s
pClientList := make([]*Client, 0, 1) pClientList := make([]*Client, 0, 1)
err, pClientList := handler.funcRpcClient(nodeId, serviceName, false, pClientList) err, pClientList := handler.funcRpcClient(nodeId, serviceName, false, pClientList)
if len(pClientList) == 0 || err != nil { if len(pClientList) == 0 || err != nil {
log.Error("call serviceMethod is failed", log.ErrorAttr("error", err)) log.Error("call serviceMethod is failed", log.ErrorField("error", err))
return err return err
} }
if len(pClientList) > 1 { if len(pClientList) > 1 {

View File

@@ -27,9 +27,6 @@ func (rn *RpcNats) NewNatsClient(targetNodeId string,localNodeId string,callSet
client.clientId = atomic.AddUint32(&clientSeq, 1) client.clientId = atomic.AddUint32(&clientSeq, 1)
client.targetNodeId = targetNodeId client.targetNodeId = targetNodeId
//client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
//client.callRpcTimeout = DefaultRpcTimeout
natsClient := &rn.NatsClient natsClient := &rn.NatsClient
natsClient.localNodeId = localNodeId natsClient.localNodeId = localNodeId
natsClient.client = &client natsClient.client = &client

View File

@@ -7,7 +7,7 @@ import (
"math" "math"
"net" "net"
"reflect" "reflect"
"runtime"
"strings" "strings"
"time" "time"
) )
@@ -31,7 +31,7 @@ type IServer interface {
selfNodeRpcHandlerGo(timeout time.Duration, processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call selfNodeRpcHandlerGo(timeout time.Duration, processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call
myselfRpcHandlerGo(client *Client, handlerName string, serviceMethod string, args interface{}, callBack reflect.Value, reply interface{}) error myselfRpcHandlerGo(client *Client, handlerName string, serviceMethod string, args interface{}, callBack reflect.Value, reply interface{}) error
selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value, cancelable bool) (CancelRpc, error) selfNodeRpcHandlerAsyncGo(timeout time.Duration, client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) (CancelRpc, error)
} }
type writeResponse func(processor IRpcProcessor, connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError) type writeResponse func(processor IRpcProcessor, connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError)
@@ -130,7 +130,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData) defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if errM != nil { if errM != nil {
log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", errM)) log.Error("marshal RpcResponseData failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", errM))
return return
} }
@@ -141,7 +141,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
compressBuff, cErr = compressor.CompressBlock(bytes) compressBuff, cErr = compressor.CompressBlock(bytes)
if cErr != nil { if cErr != nil {
log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", cErr)) log.Error("CompressBlock failed", log.String("serviceMethod", serviceMethod), log.ErrorField("error", cErr))
return return
} }
if len(compressBuff) < len(bytes) { if len(compressBuff) < len(bytes) {
@@ -155,17 +155,14 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, se
compressor.CompressBufferCollection(compressBuff) compressor.CompressBufferCollection(compressBuff)
} }
if errM != nil { if errM != nil {
log.Error("WriteMsg error,Rpc return is fail", log.String("serviceMethod", serviceMethod), log.ErrorAttr("error", errM)) log.Error("WriteMsg error,Rpc return is fail", log.String("serviceMethod", serviceMethod), log.ErrorField("error", errM))
} }
} }
func (agent *RpcAgent) Run() { func (agent *RpcAgent) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -173,7 +170,7 @@ func (agent *RpcAgent) Run() {
data, err := agent.conn.ReadMsg() data, err := agent.conn.ReadMsg()
if err != nil { if err != nil {
//will close conn //will close conn
log.Error("read message is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorAttr("error", err)) log.Error("read message is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorField("error", err))
break break
} }
@@ -181,7 +178,7 @@ func (agent *RpcAgent) Run() {
if err != nil { if err != nil {
//will close conn //will close conn
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
log.Error("processRpcRequest is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorAttr("error", err)) log.Error("processRpcRequest is error", log.String("remoteAddress", agent.conn.RemoteAddr().String()), log.ErrorField("error", err))
break break
} }

View File

@@ -11,6 +11,7 @@ import (
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
rpcHandle "github.com/duanhf2012/origin/v2/rpc" rpcHandle "github.com/duanhf2012/origin/v2/rpc"
"github.com/duanhf2012/origin/v2/util/timer" "github.com/duanhf2012/origin/v2/util/timer"
"slices"
) )
const InitModuleId = 1e9 const InitModuleId = 1e9
@@ -46,7 +47,7 @@ type Module struct {
moduleName string //模块名称 moduleName string //模块名称
parent IModule //父亲 parent IModule //父亲
self IModule //自己 self IModule //自己
child map[uint32]IModule //孩子们 child []IModule //孩子们
mapActiveTimer map[timer.ITimer]struct{} mapActiveTimer map[timer.ITimer]struct{}
mapActiveIdTimer map[uint64]timer.ITimer mapActiveIdTimer map[uint64]timer.ITimer
dispatcher *timer.Dispatcher //timer dispatcher *timer.Dispatcher //timer
@@ -93,10 +94,7 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
pAddModule.moduleId = m.NewModuleId() pAddModule.moduleId = m.NewModuleId()
} }
if m.child == nil { _,ok := m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()]
m.child = map[uint32]IModule{}
}
_, ok := m.child[module.GetModuleId()]
if ok == true { if ok == true {
return 0, fmt.Errorf("exists module id %d", module.GetModuleId()) return 0, fmt.Errorf("exists module id %d", module.GetModuleId())
} }
@@ -109,29 +107,33 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
pAddModule.eventHandler = event.NewEventHandler() pAddModule.eventHandler = event.NewEventHandler()
pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor()) pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor())
pAddModule.IConcurrent = m.IConcurrent pAddModule.IConcurrent = m.IConcurrent
m.child = append(m.child,module)
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
err := module.OnInit() err := module.OnInit()
if err != nil { if err != nil {
delete(m.ancestor.getBaseModule().(*Module).descendants, module.GetModuleId())
m.child = m.child[:len(m.child)-1]
log.Error("module OnInit error",log.String("ModuleName",module.GetModuleName()),log.ErrorField("err",err))
return 0, err return 0, err
} }
m.child[module.GetModuleId()] = module
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
log.Debug("Add module " + module.GetModuleName() + " completed") log.Debug("Add module " + module.GetModuleName() + " completed")
return module.GetModuleId(), nil return module.GetModuleId(), nil
} }
func (m *Module) ReleaseModule(moduleId uint32) { func (m *Module) ReleaseModule(moduleId uint32) {
pModule := m.GetModule(moduleId).getBaseModule().(*Module) pModule := m.GetModule(moduleId).getBaseModule().(*Module)
pModule.self.OnRelease()
log.Debug("Release module " + pModule.GetModuleName())
//释放子孙 for i:=len(pModule.child)-1; i>=0; i-- {
for id := range pModule.child { m.ReleaseModule(pModule.child[i].GetModuleId())
m.ReleaseModule(id)
} }
pModule.self.OnRelease()
pModule.GetEventHandler().Destroy() pModule.GetEventHandler().Destroy()
log.Debug("Release module " + pModule.GetModuleName())
for pTimer := range pModule.mapActiveTimer { for pTimer := range pModule.mapActiveTimer {
pTimer.Cancel() pTimer.Cancel()
} }
@@ -140,7 +142,10 @@ func (m *Module) ReleaseModule(moduleId uint32) {
t.Cancel() t.Cancel()
} }
delete(m.child, moduleId) m.child = slices.DeleteFunc(m.child, func(module IModule) bool {
return module.GetModuleId() == moduleId
})
delete(m.ancestor.getBaseModule().(*Module).descendants, moduleId) delete(m.ancestor.getBaseModule().(*Module).descendants, moduleId)
//清理被删除的Module //清理被删除的Module
@@ -281,7 +286,7 @@ func (m *Module) SafeNewTicker(tickerId *uint64, d time.Duration, AdditionData i
func (m *Module) CancelTimerId(timerId *uint64) bool { func (m *Module) CancelTimerId(timerId *uint64) bool {
if timerId == nil || *timerId == 0 { if timerId == nil || *timerId == 0 {
log.Warning("timerId is invalid") log.Warn("timerId is invalid")
return false return false
} }
@@ -292,7 +297,7 @@ func (m *Module) CancelTimerId(timerId *uint64) bool {
t, ok := m.mapActiveIdTimer[*timerId] t, ok := m.mapActiveIdTimer[*timerId]
if ok == false { if ok == false {
log.Stack("cannot find timer id ", log.Uint64("timerId", *timerId)) log.StackError("cannot find timer id ", log.Uint64("timerId", *timerId))
return false return false
} }

View File

@@ -11,7 +11,6 @@ import (
"github.com/duanhf2012/origin/v2/rpc" "github.com/duanhf2012/origin/v2/rpc"
"github.com/duanhf2012/origin/v2/util/timer" "github.com/duanhf2012/origin/v2/util/timer"
"reflect" "reflect"
"runtime"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -261,17 +260,16 @@ func (s *Service) SetName(serviceName string) {
func (s *Service) Release() { func (s *Service) Release() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
if atomic.AddInt32(&s.isRelease, -1) == -1 { if atomic.AddInt32(&s.isRelease, -1) == -1 {
s.self.OnRelease() s.self.OnRelease()
for i:=len(s.child)-1; i>=0; i-- {
s.ReleaseModule(s.child[i].GetModuleId())
}
} }
} }
func (s *Service) OnRelease() { func (s *Service) OnRelease() {
@@ -436,6 +434,7 @@ func (s *Service) SetEventChannelNum(num int) {
} }
} }
// Deprecated: replace it with the OpenConcurrent function
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool { func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程 //已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程
if s.startStatus == true || s.profiler != nil { if s.startStatus == true || s.profiler != nil {

View File

@@ -23,7 +23,7 @@ func Init() {
for _,s := range setupServiceList { for _,s := range setupServiceList {
err := s.OnInit() err := s.OnInit()
if err != nil { if err != nil {
log.Error("Failed to initialize "+s.GetName()+" service",log.ErrorAttr("err",err)) log.Error("Failed to initialize "+s.GetName()+" service",log.ErrorField("err",err))
os.Exit(1) os.Exit(1)
} }
} }

View File

@@ -7,7 +7,6 @@ import (
"github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/service"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"io" "io"
"log/slog"
"net/http" "net/http"
"strings" "strings"
"time" "time"
@@ -69,28 +68,28 @@ func (gm *GinModule) eventHandler(ev event.IEvent) {
func (gm *GinModule) Start() { func (gm *GinModule) Start() {
gm.srv.Addr = gm.listenAddr gm.srv.Addr = gm.listenAddr
log.Info("http start listen", slog.Any("addr", gm.listenAddr)) log.Info("http start listen", log.Any("addr", gm.listenAddr))
go func() { go func() {
err := gm.srv.ListenAndServe() err := gm.srv.ListenAndServe()
if err != nil { if err != nil {
log.Error("ListenAndServe error", slog.Any("error", err.Error())) log.Error("ListenAndServe error", log.Any("error", err.Error()))
} }
}() }()
} }
func (gm *GinModule) StartTLS(certFile, keyFile string) { func (gm *GinModule) StartTLS(certFile, keyFile string) {
log.Info("http start listen", slog.Any("addr", gm.listenAddr)) log.Info("http start listen", log.Any("addr", gm.listenAddr))
go func() { go func() {
err := gm.srv.ListenAndServeTLS(certFile, keyFile) err := gm.srv.ListenAndServeTLS(certFile, keyFile)
if err != nil { if err != nil {
log.Fatal("ListenAndServeTLS error", slog.Any("error", err.Error())) log.Fatal("ListenAndServeTLS error", log.Any("error", err.Error()))
} }
}() }()
} }
func (gm *GinModule) Stop(ctx context.Context) { func (gm *GinModule) Stop(ctx context.Context) {
if err := gm.srv.Shutdown(ctx); err != nil { if err := gm.srv.Shutdown(ctx); err != nil {
log.Error("Server Shutdown", slog.Any("error", err)) log.Error("Server Shutdown", log.Any("error", err))
} }
} }
@@ -210,7 +209,7 @@ func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...S
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Error("GinModule process timeout", slog.Any("path", c.Request.URL.Path)) log.Error("GinModule process timeout", log.Any("path", c.Request.URL.Path))
c.AbortWithStatus(http.StatusRequestTimeout) c.AbortWithStatus(http.StatusRequestTimeout)
case <-chanWait: case <-chanWait:
} }

View File

@@ -9,7 +9,6 @@ import (
"github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/service"
"github.com/xtaci/kcp-go/v5" "github.com/xtaci/kcp-go/v5"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"runtime"
"sync" "sync"
) )
@@ -167,10 +166,7 @@ func (km *KcpModule) NewAgent(conn network.Conn) network.Agent {
func (c *Client) Run() { func (c *Client) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -179,7 +175,7 @@ func (c *Client) Run() {
c.kcpConn.SetReadDeadline(*c.kcpModule.kcpCfg.ReadDeadlineMill) c.kcpConn.SetReadDeadline(*c.kcpModule.kcpCfg.ReadDeadlineMill)
msgBuff, err := c.kcpConn.ReadMsg() msgBuff, err := c.kcpConn.ReadMsg()
if err != nil { if err != nil {
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", c.id)) log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", c.id))
break break
} }

View File

@@ -8,7 +8,6 @@ import (
"github.com/duanhf2012/origin/v2/network/processor" "github.com/duanhf2012/origin/v2/network/processor"
"github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/service"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"runtime"
"sync" "sync"
"time" "time"
) )
@@ -125,10 +124,7 @@ func (slf *Client) GetId() string {
func (slf *Client) Run() { func (slf *Client) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -137,7 +133,7 @@ func (slf *Client) Run() {
slf.tcpConn.SetReadDeadline(slf.tcpModule.tcpServer.ReadDeadline) slf.tcpConn.SetReadDeadline(slf.tcpModule.tcpServer.ReadDeadline)
bytes, err := slf.tcpConn.ReadMsg() bytes, err := slf.tcpConn.ReadMsg()
if err != nil { if err != nil {
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", slf.id)) log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", slf.id))
break break
} }
data, err := slf.tcpModule.process.Unmarshal(slf.id, bytes) data, err := slf.tcpModule.process.Unmarshal(slf.id, bytes)
@@ -185,7 +181,7 @@ func (tm *TcpModule) Close(clientId string) {
client.tcpConn.Close() client.tcpConn.Close()
} }
log.SWarning("close client:", clientId) log.SWarn("close client:", clientId)
return return
} }

View File

@@ -14,7 +14,7 @@ import (
type WSModule struct { type WSModule struct {
service.Module service.Module
wsServer network.WSServer WSServer network.WSServer
mapClientLocker sync.RWMutex mapClientLocker sync.RWMutex
mapClient map[string]*WSClient mapClient map[string]*WSClient
@@ -57,16 +57,16 @@ func (ws *WSModule) OnInit() error {
return fmt.Errorf("please call the Init function correctly") return fmt.Errorf("please call the Init function correctly")
} }
ws.wsServer.MaxConnNum = ws.wsCfg.MaxConnNum ws.WSServer.MaxConnNum = ws.wsCfg.MaxConnNum
ws.wsServer.PendingWriteNum = ws.wsCfg.PendingWriteNum ws.WSServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
ws.wsServer.MaxMsgLen = ws.wsCfg.MaxMsgLen ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
ws.wsServer.Addr = ws.wsCfg.ListenAddr ws.WSServer.Addr = ws.wsCfg.ListenAddr
//3.设置解析处理器 //3.设置解析处理器
ws.process.SetByteOrder(ws.wsCfg.LittleEndian) ws.process.SetByteOrder(ws.wsCfg.LittleEndian)
ws.mapClient = make(map[string]*WSClient, ws.wsServer.MaxConnNum) ws.mapClient = make(map[string]*WSClient, ws.WSServer.MaxConnNum)
ws.wsServer.NewAgent = ws.NewWSClient ws.WSServer.NewAgent = ws.NewWSClient
//4.设置网络事件处理 //4.设置网络事件处理
ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler) ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler)
@@ -80,7 +80,7 @@ func (ws *WSModule) Init(wsCfg *WSCfg, process processor.IRawProcessor) {
} }
func (ws *WSModule) Start() error { func (ws *WSModule) Start() error {
return ws.wsServer.Start() return ws.WSServer.Start()
} }
func (ws *WSModule) wsEventHandler(ev event.IEvent) { func (ws *WSModule) wsEventHandler(ev event.IEvent) {
@@ -120,7 +120,7 @@ func (wc *WSClient) Run() {
for { for {
bytes, err := wc.wsConn.ReadMsg() bytes, err := wc.wsConn.ReadMsg()
if err != nil { if err != nil {
log.Debug("read client is error", log.String("clientId", wc.id), log.ErrorAttr("err", err)) log.Debug("read client is error", log.String("clientId", wc.id), log.ErrorField("err", err))
break break
} }
data, err := wc.wsModule.process.Unmarshal(wc.id, bytes) data, err := wc.wsModule.process.Unmarshal(wc.id, bytes)
@@ -197,3 +197,7 @@ func (ws *WSModule) SendRawMsg(clientId string, msg []byte) error {
ws.mapClientLocker.Unlock() ws.mapClientLocker.Unlock()
return client.wsConn.WriteMsg(msg) return client.wsConn.WriteMsg(msg)
} }
func (ws *WSModule) SetMessageType(messageType int) {
ws.WSServer.SetMessageType(messageType)
}

View File

@@ -66,7 +66,7 @@ func (m *RedisModule) Init(redisCfg *ConfigRedis) {
} }
c, err := redis.Dial("tcp", redisServer, opt...) c, err := redis.Dial("tcp", redisServer, opt...)
if err != nil { if err != nil {
log.Error("Connect redis fail reason:%v", err) log.Error("Connect redis fail", log.ErrorField("err",err))
return nil, err return nil, err
} }
@@ -79,7 +79,7 @@ func (m *RedisModule) Init(redisCfg *ConfigRedis) {
} }
_, err := c.Do("PING") _, err := c.Do("PING")
if err != nil { if err != nil {
log.Error("Do PING fail reason:%v", err) log.Error("Do PING fail reason", log.ErrorField("err",err))
return err return err
} }
return err return err
@@ -101,7 +101,7 @@ func (m *RedisModule) getConn() (redis.Conn, error) {
if conn.Err() != nil { if conn.Err() != nil {
err := conn.Err() err := conn.Err()
if err != nil { if err != nil {
log.Error("get Conn have error,reason:%v", err) log.Error("get Conn have error", log.ErrorField("err",err))
} }
conn.Close() conn.Close()
return nil, err return nil, err
@@ -118,7 +118,7 @@ func (m *RedisModule) TestPingRedis() error {
err = m.redisPool.TestOnBorrow(conn, time.Now()) err = m.redisPool.TestOnBorrow(conn, time.Now())
if err != nil { if err != nil {
log.Error("TestOnBorrow fail,reason:%v", err) log.Error("TestOnBorrow fail", log.ErrorField("err",err))
return err return err
} }
@@ -171,7 +171,7 @@ func (m *RedisModule) setStringByExpire(key, value, expire interface{}) error {
} }
if retErr != nil { if retErr != nil {
log.Error("setStringByExpire fail,reason:%v", retErr) log.Error("setStringByExpire fail", log.ErrorField("err",retErr))
return retErr return retErr
} }
@@ -254,7 +254,7 @@ func (m *RedisModule) setMuchStringByExpire(mapInfo map[interface{}]interface{},
} }
if serr != nil { if serr != nil {
log.Error("setMuchStringByExpire fail,reason:%v", serr) log.Error("setMuchStringByExpire fail",log.ErrorField("err",serr))
conn.Do("DISCARD") conn.Do("DISCARD")
return serr return serr
} else { } else {
@@ -262,7 +262,7 @@ func (m *RedisModule) setMuchStringByExpire(mapInfo map[interface{}]interface{},
} }
if err != nil { if err != nil {
log.Error("setMuchStringByExpire fail,reason:%v", err) log.Error("setMuchStringByExpire fail", log.ErrorField("err",err))
} }
return err return err
@@ -277,7 +277,7 @@ func (m *RedisModule) GetString(key interface{}) (string, error) {
ret, err := conn.Do("GET", key) ret, err := conn.Do("GET", key)
if err != nil { if err != nil {
log.Error("GetString fail,reason:%v", err) log.Error("GetString fail", log.ErrorField("err",err))
return "", err return "", err
} }
@@ -298,7 +298,7 @@ func (m *RedisModule) GetStringJSON(key string, st interface{}) error {
ret, err := conn.Do("GET", key) ret, err := conn.Do("GET", key)
if err != nil { if err != nil {
log.Error("GetStringJSON fail,reason:%v", err) log.Error("GetStringJSON fail", log.ErrorField("err",err))
return err return err
} }
@@ -315,7 +315,7 @@ func (m *RedisModule) GetStringJSON(key string, st interface{}) error {
} }
if err = json.Unmarshal(str, st); err != nil { if err = json.Unmarshal(str, st); err != nil {
log.Error("GetStringJSON fail json.Unmarshal is error:%s,%s,reason:%v", key, string(str), err) log.Errorf("GetStringJSON fail json.Unmarshal is error:%s,%s,reason:%v", key, string(str), err)
return err return err
} }
@@ -336,13 +336,13 @@ func (m *RedisModule) GetStringMap(keys []string) (retMap map[string]string, err
// 开始Send数据 // 开始Send数据
err = conn.Send("MULTI") err = conn.Send("MULTI")
if err != nil { if err != nil {
log.Error("GetMuchString fail %v", err) log.Errorf("GetMuchString fail %v", err)
return nil, err return nil, err
} }
for _, val := range keys { for _, val := range keys {
err = conn.Send("GET", val) err = conn.Send("GET", val)
if err != nil { if err != nil {
log.Error("GetMuchString fail,reason:%v", err) log.Errorf("GetMuchString fail,reason:%v", err)
conn.Do("DISCARD") conn.Do("DISCARD")
return nil, err return nil, err
} }
@@ -351,7 +351,7 @@ func (m *RedisModule) GetStringMap(keys []string) (retMap map[string]string, err
// 执行命令 // 执行命令
ret, err := conn.Do("EXEC") ret, err := conn.Do("EXEC")
if err != nil { if err != nil {
log.Error("GetMuchString fail %v", err) log.Errorf("GetMuchString fail %v", err)
return return
} }
@@ -383,7 +383,7 @@ func (m *RedisModule) ExistsKey(key interface{}) (bool, error) {
ret, err := conn.Do("EXISTS", key) ret, err := conn.Do("EXISTS", key)
if err != nil { if err != nil {
log.Error("ExistsKey fail, reason:%v", err) log.Errorf("ExistsKey fail, reason:%v", err)
return false, err return false, err
} }
retValue, ok := ret.(int64) retValue, ok := ret.(int64)
@@ -404,7 +404,7 @@ func (m *RedisModule) DelString(key interface{}) error {
ret, err := conn.Do("DEL", key) ret, err := conn.Do("DEL", key)
if err != nil { if err != nil {
log.Error("DelString fail, reason:%v", err) log.Errorf("DelString fail, reason:%v", err)
return err return err
} }
@@ -439,7 +439,7 @@ func (m *RedisModule) DelStringKeyList(keys []interface{}) (map[interface{}]bool
for _, val := range keys { for _, val := range keys {
err = conn.Send("DEL", val) err = conn.Send("DEL", val)
if err != nil { if err != nil {
log.Error("DelMuchString fail,reason:%v", err) log.Errorf("DelMuchString fail,reason:%v", err)
conn.Do("DISCARD") conn.Do("DISCARD")
return nil, err return nil, err
} }
@@ -448,7 +448,7 @@ func (m *RedisModule) DelStringKeyList(keys []interface{}) (map[interface{}]bool
ret, err := conn.Do("EXEC") ret, err := conn.Do("EXEC")
if err != nil { if err != nil {
log.Error("DelMuchString fail,reason:%v", err) log.Errorf("DelMuchString fail,reason:%v", err)
return nil, err return nil, err
} }
@@ -484,7 +484,7 @@ func (m *RedisModule) SetHash(redisKey, hashKey, value interface{}) error {
_, retErr := conn.Do("HSET", redisKey, hashKey, value) _, retErr := conn.Do("HSET", redisKey, hashKey, value)
if retErr != nil { if retErr != nil {
log.Error("SetHash fail,reason:%v", retErr) log.Errorf("SetHash fail,reason:%v", retErr)
} }
return retErr return retErr
@@ -502,7 +502,7 @@ func (m *RedisModule) GetAllHashJSON(redisKey string) (map[string]string, error)
value, err := conn.Do("HGETALL", redisKey) value, err := conn.Do("HGETALL", redisKey)
if err != nil { if err != nil {
log.Error("GetAllHashJSON fail,reason:%v", err) log.Errorf("GetAllHashJSON fail,reason:%v", err)
return nil, err return nil, err
} }
@@ -522,7 +522,7 @@ func (m *RedisModule) GetHash(redisKey interface{}, fieldKey interface{}) (strin
value, err := conn.Do("HGET", redisKey, fieldKey) value, err := conn.Do("HGET", redisKey, fieldKey)
if err != nil { if err != nil {
log.Error("GetHashValueByKey fail,reason:%v", err) log.Errorf("GetHashValueByKey fail,reason:%v", err)
return "", err return "", err
} }
if value == nil { if value == nil {
@@ -545,7 +545,7 @@ func (m *RedisModule) GetMuchHash(args ...interface{}) ([]string, error) {
value, err := conn.Do("HMGET", args...) value, err := conn.Do("HMGET", args...)
if err != nil { if err != nil {
log.Error("GetHashValueByKey fail,reason:%v", err) log.Errorf("GetHashValueByKey fail,reason:%v", err)
return nil, err return nil, err
} }
if value == nil { if value == nil {
@@ -582,7 +582,7 @@ func (m *RedisModule) ScanMatchKeys(cursorValue int, redisKey string, count int)
value, err := conn.Do("SCAN", cursorValue, "match", redisKey, "count", count) value, err := conn.Do("SCAN", cursorValue, "match", redisKey, "count", count)
if err != nil { if err != nil {
log.Error("GetHashValueByKey fail,reason:%v", err) log.Errorf("GetHashValueByKey fail,reason:%v", err)
return nextCursorValue, nil, err return nextCursorValue, nil, err
} }
if value == nil { if value == nil {
@@ -618,7 +618,7 @@ func (m *RedisModule) SetHashMapJSON(redisKey string, mapFieldValue map[interfac
if err == nil { if err == nil {
_, err = conn.Do("HSET", redisKey, symbol, temp) _, err = conn.Do("HSET", redisKey, symbol, temp)
if err != nil { if err != nil {
log.Error("SetMuchHashJSON fail,reason:%v", err) log.Errorf("SetMuchHashJSON fail,reason:%v", err)
conn.Send("DISCARD") conn.Send("DISCARD")
return err return err
} }
@@ -627,7 +627,7 @@ func (m *RedisModule) SetHashMapJSON(redisKey string, mapFieldValue map[interfac
// 执行命令 // 执行命令
_, err = conn.Do("EXEC") _, err = conn.Do("EXEC")
if err != nil { if err != nil {
log.Error("SetMuchHashJSON fail,reason:%v", err) log.Errorf("SetMuchHashJSON fail,reason:%v", err)
conn.Send("DISCARD") conn.Send("DISCARD")
} }
return err return err
@@ -642,7 +642,7 @@ func (m *RedisModule) DelHash(args ...interface{}) error {
_, retErr := conn.Do("HDEL", args...) _, retErr := conn.Do("HDEL", args...)
if retErr != nil { if retErr != nil {
log.Error("DelMuchHash fail,reason:%v", retErr) log.Errorf("DelMuchHash fail,reason:%v", retErr)
} }
return retErr return retErr
} }
@@ -678,7 +678,7 @@ func (m *RedisModule) setListPush(setType string, args ...interface{}) error {
_, retErr := conn.Do(setType, args...) _, retErr := conn.Do(setType, args...)
if retErr != nil { if retErr != nil {
log.Error("setList fail,reason:%v", retErr) log.Errorf("setList fail,reason:%v", retErr)
} }
return retErr return retErr
} }
@@ -705,7 +705,7 @@ func (m *RedisModule) LRangeList(key string, start, end int) ([]string, error) {
reply, err := conn.Do("lrange", key, start, end) reply, err := conn.Do("lrange", key, start, end)
if err != nil { if err != nil {
log.Error("SetListJSONRpush fail,reason:%v", err) log.Errorf("SetListJSONRpush fail,reason:%v", err)
return nil, err return nil, err
} }
@@ -722,7 +722,7 @@ func (m *RedisModule) GetListLen(key string) (int, error) {
reply, err := conn.Do("LLEN", key) reply, err := conn.Do("LLEN", key)
if err != nil { if err != nil {
log.Error("GetListLen fail,reason:%v", err) log.Errorf("GetListLen fail,reason:%v", err)
return -1, err return -1, err
} }
return redis.Int(reply, err) return redis.Int(reply, err)
@@ -748,7 +748,7 @@ func (m *RedisModule) LTrimList(key string, start, end int) error {
_, err = conn.Do("LTRIM", key, start, end) _, err = conn.Do("LTRIM", key, start, end)
if err != nil { if err != nil {
log.Error("LtrimListValue fail,reason:%v", err) log.Errorf("LtrimListValue fail,reason:%v", err)
return err return err
} }
return nil return nil
@@ -849,7 +849,7 @@ func (m *RedisModule) ZADDInsertJson(key string, score float64, value interface{
} }
_, err = conn.Do("ZADD", key, score, JsonValue) _, err = conn.Do("ZADD", key, score, JsonValue)
if err != nil { if err != nil {
log.Error("ZADDInsertJson fail,reason:%v", err) log.Errorf("ZADDInsertJson fail,reason:%v", err)
return err return err
} }
return nil return nil
@@ -865,7 +865,7 @@ func (m *RedisModule) ZADDInsert(key string, score float64, Data interface{}) er
_, err = conn.Do("ZADD", key, score, Data) _, err = conn.Do("ZADD", key, score, Data)
if err != nil { if err != nil {
log.Error("ZADDInsert fail,reason:%v", err) log.Errorf("ZADDInsert fail,reason:%v", err)
return err return err
} }
return nil return nil
@@ -1088,7 +1088,7 @@ func (m *RedisModule) HincrbyHashInt(redisKey, hashKey string, value int) error
_, retErr := conn.Do("HINCRBY", redisKey, hashKey, value) _, retErr := conn.Do("HINCRBY", redisKey, hashKey, value)
if retErr != nil { if retErr != nil {
log.Error("HincrbyHashInt fail,reason:%v", retErr) log.Errorf("HincrbyHashInt fail,reason:%v", retErr)
} }
return retErr return retErr
@@ -1103,7 +1103,7 @@ func (m *RedisModule) EXPlREInsert(key string, TTl int) error {
_, err = conn.Do("expire", key, TTl) _, err = conn.Do("expire", key, TTl)
if err != nil { if err != nil {
log.Error("expire fail,reason:%v", err) log.Errorf("expire fail,reason:%v", err)
return err return err
} }
return nil return nil
@@ -1129,7 +1129,7 @@ func (m *RedisModule) Keys(key string) ([]string, error) {
ret, err := conn.Do("KEYS", key) ret, err := conn.Do("KEYS", key)
if err != nil { if err != nil {
log.Error("KEYS fail, reason:%v", err) log.Errorf("KEYS fail, reason:%v", err)
return nil, err return nil, err
} }
retList, ok := ret.([]interface{}) retList, ok := ret.([]interface{})

View File

@@ -9,7 +9,6 @@ import (
"github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/service"
"github.com/duanhf2012/origin/v2/util/bytespool" "github.com/duanhf2012/origin/v2/util/bytespool"
"github.com/google/uuid" "github.com/google/uuid"
"runtime"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -140,10 +139,7 @@ func (slf *Client) GetId() string {
func (slf *Client) Run() { func (slf *Client) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -156,7 +152,7 @@ func (slf *Client) Run() {
slf.tcpConn.SetReadDeadline(slf.tcpService.tcpServer.ReadDeadline) slf.tcpConn.SetReadDeadline(slf.tcpService.tcpServer.ReadDeadline)
bytes, err := slf.tcpConn.ReadMsg() bytes, err := slf.tcpConn.ReadMsg()
if err != nil { if err != nil {
log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", slf.id)) log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", slf.id))
break break
} }
data, err := slf.tcpService.process.Unmarshal(slf.id, bytes) data, err := slf.tcpService.process.Unmarshal(slf.id, bytes)

View File

@@ -129,7 +129,7 @@ func (slf *WSClient) Run() {
for { for {
bytes, err := slf.wsConn.ReadMsg() bytes, err := slf.wsConn.ReadMsg()
if err != nil { if err != nil {
log.Debug("read client id %s is error:%+v", slf.id, err) log.Debugf("read client id %s is error:%+v", slf.id, err)
break break
} }
data, err := slf.wsService.process.Unmarshal(slf.id, bytes) data, err := slf.wsService.process.Unmarshal(slf.id, bytes)

View File

@@ -32,10 +32,10 @@ func Abs[NumType typ.Signed | typ.Float](Num NumType) NumType {
func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) { func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
ret := number1 + number2 ret := number1 + number2
if number2 > 0 && ret < number1 { if number2 > 0 && ret < number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false return ret, false
} else if number2 < 0 && ret > number1 { } else if number2 < 0 && ret > number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false return ret, false
} }
@@ -45,10 +45,10 @@ func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
func SubSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) { func SubSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
ret := number1 - number2 ret := number1 - number2
if number2 > 0 && ret > number1 { if number2 > 0 && ret > number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false return ret, false
} else if number2 < 0 && ret < number1 { } else if number2 < 0 && ret < number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false return ret, false
} }
@@ -65,7 +65,7 @@ func MulSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
return ret, true return ret, true
} }
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, true return ret, true
} }

View File

@@ -124,10 +124,7 @@ func (t *Timer) IsOpen() bool {
func (t *Timer) Do() { func (t *Timer) Do() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -210,10 +207,7 @@ func (c *Cron) Reset() {
func (c *Cron) Do() { func (c *Cron) Do() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()
@@ -266,10 +260,7 @@ func (c *Cron) UnRef() {
func (c *Ticker) Do() { func (c *Ticker) Do() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) log.StackError(fmt.Sprint(r))
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
} }
}() }()