Mirror of https://github.com/duanhf2012/origin.git (synced 2026-02-12 22:54:43 +08:00)

Compare commits

11 Commits
| SHA1 |
|---|
| 6c44ba180c |
| ecfd42bdec |
| 01d3b3e535 |
| 550d65a354 |
| 15580ffce9 |
| bd467a219b |
| af15615345 |
| 50dd80b082 |
| a6487dd41e |
| d5299294d8 |
| 4d36e525a5 |
@@ -40,8 +40,6 @@ type NodeInfo struct {
 	DiscoveryService []DiscoveryService // services to discover; if not configured, no filtering is applied
 	status           NodeStatus
 	Retire           bool
-
-	NetworkName string
 }
 
 type NodeRpcInfo struct {
@@ -3,24 +3,23 @@ package cluster
 import "github.com/duanhf2012/origin/v2/rpc"
 
 type ConfigDiscovery struct {
 	funDelNode  FunDelNode
 	funSetNode  FunSetNode
 	localNodeId string
 }
 
-
-func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode FunDelNode,funSetNode FunSetNode) error{
+func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
 	discovery.localNodeId = localNodeId
 	discovery.funDelNode = funDelNode
 	discovery.funSetNode = funSetNode
 
 	// parse the local configuration of the other services
-	_,nodeInfoList,_,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
+	_, nodeInfoList, _, err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
 	if err != nil {
 		return err
 	}
 
-	for _,nodeInfo := range nodeInfoList {
+	for _, nodeInfo := range nodeInfoList {
 		if nodeInfo.NodeId == localNodeId {
 			continue
 		}
@@ -30,5 +29,3 @@ func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode Fu
 
 	return nil
 }
-
-
@@ -5,17 +5,17 @@ import (
 	"github.com/duanhf2012/origin/v2/service"
 )
 
-func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
+func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
 	if cls.discoveryInfo.getDiscoveryType() == OriginType { // origin-type service discovery
-		return cls.setupOriginDiscovery(localNodeId,setupServiceFun)
-	}else if cls.discoveryInfo.getDiscoveryType() == EtcdType{ // etcd-type service discovery
-		return cls.setupEtcdDiscovery(localNodeId,setupServiceFun)
+		return cls.setupOriginDiscovery(localNodeId, setupServiceFun)
+	} else if cls.discoveryInfo.getDiscoveryType() == EtcdType { // etcd-type service discovery
+		return cls.setupEtcdDiscovery(localNodeId, setupServiceFun)
 	}
 
-	return cls.setupConfigDiscovery(localNodeId,setupServiceFun)
+	return cls.setupConfigDiscovery(localNodeId, setupServiceFun)
 }
 
-func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
+func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
 	if cls.serviceDiscovery != nil {
 		return errors.New("service discovery has been setup")
 	}
@@ -27,6 +27,7 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
 	}
 
 	cls.serviceDiscovery = getOriginDiscovery()
+
 	// 2. if dynamic service discovery is used, install the local discovery service
 	if localMaster == true {
 		setupServiceFun(&masterService)
@@ -36,11 +37,10 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
 	setupServiceFun(&clientService)
 	cls.AddDiscoveryService(OriginDiscoveryClientName, true)
 
-
 	return nil
 }
 
-func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
+func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
 	if cls.serviceDiscovery != nil {
 		return errors.New("service discovery has been setup")
 	}
@@ -48,12 +48,12 @@ func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun Setup
 	// setup etcd service
 	cls.serviceDiscovery = getEtcdDiscovery()
 	setupServiceFun(cls.serviceDiscovery.(service.IService))
 
-	cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(),false)
+	cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(), false)
 	return nil
 }
 
-func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
+func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
 	if cls.serviceDiscovery != nil {
 		return errors.New("service discovery has been setup")
 	}
@@ -1,6 +1,11 @@
 package cluster
 
 import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"fmt"
 	"github.com/duanhf2012/origin/v2/event"
 	"github.com/duanhf2012/origin/v2/log"
 	"github.com/duanhf2012/origin/v2/rpc"
@@ -9,16 +14,11 @@ import (
 	"go.etcd.io/etcd/api/v3/mvccpb"
 	"go.etcd.io/etcd/client/v3"
 	"google.golang.org/protobuf/proto"
-	"time"
-	"context"
-	"errors"
-	"fmt"
+	"os"
 	"path"
 	"strings"
 	"sync/atomic"
-	"io/ioutil"
-	"crypto/x509"
-	"crypto/tls"
+	"time"
 )
 
 const originDir = "/origin"
@@ -42,7 +42,8 @@ type EtcdDiscoveryService struct {
 	mapDiscoveryNodeId map[string]map[string]struct{} // map[networkName]map[nodeId]
 }
 
 var etcdDiscovery *EtcdDiscoveryService
+
 func getEtcdDiscovery() IServiceDiscovery {
 	if etcdDiscovery == nil {
 		etcdDiscovery = &EtcdDiscoveryService{}
@@ -51,7 +52,6 @@ func getEtcdDiscovery() IServiceDiscovery {
 	return etcdDiscovery
 }
 
-
 func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
 	ed.localNodeId = localNodeId
 
@@ -99,17 +99,17 @@ func (ed *EtcdDiscoveryService) OnInit() error {
 
 		if etcdDiscoveryCfg.EtcdList[i].Cert != "" {
 			// load cert
-			cert, cerr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
-			if cerr != nil {
-				log.Error("load cert error", log.ErrorField("err", cerr))
-				return cerr
+			cert, cErr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
+			if cErr != nil {
+				log.Error("load cert error", log.ErrorField("err", cErr))
+				return cErr
 			}
 
 			// load root ca
-			caData, cerr := ioutil.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
-			if cerr != nil {
-				log.Error("load root ca error", log.ErrorField("err", cerr))
-				return cerr
+			caData, cErr := os.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
+			if cErr != nil {
+				log.Error("load root ca error", log.ErrorField("err", cErr))
+				return cErr
 			}
 			pool := x509.NewCertPool()
 			pool.AppendCertsFromPEM(caData)
@@ -122,13 +122,12 @@ func (ed *EtcdDiscoveryService) OnInit() error {
 		client, err = clientv3.New(clientv3.Config{
 			Endpoints:   etcdDiscoveryCfg.EtcdList[i].Endpoints,
 			DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
 			Username:    etcdDiscoveryCfg.EtcdList[i].UserName,
 			Password:    etcdDiscoveryCfg.EtcdList[i].Password,
 			Logger:      log.GetLogger().Logger,
 			TLS:         tlsConfig,
 		})
 
-
 		if err != nil {
 			log.Error("etcd discovery init fail", log.ErrorField("err", err))
 			return err
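The etcd discovery changes above replace `ioutil.ReadFile` with `os.ReadFile` and assemble a TLS configuration from a client certificate, key, and CA file before creating the etcd client. The following is a minimal standalone sketch of that pattern using only the standard library; the helper name and file paths are illustrative and not part of the repository.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// newTLSConfig mirrors the pattern in the etcd discovery code: load a client
// key pair, read the CA file, and place both into a *tls.Config that could be
// passed to clientv3.Config.TLS. The function name is hypothetical.
func newTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("load cert: %w", err)
	}

	// os.ReadFile replaces the deprecated ioutil.ReadFile used previously.
	caData, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("load root ca: %w", err)
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caData)

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
	}, nil
}

func main() {
	// Illustrative paths; in the real module they come from the etcd discovery config.
	if _, err := newTLSConfig("client.crt", "client.key", "ca.crt"); err != nil {
		fmt.Println("tls config error:", err)
		return
	}
	fmt.Println("tls config built")
}
```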
@@ -7,6 +7,7 @@ import (
 	"github.com/duanhf2012/origin/v2/rpc"
 	jsoniter "github.com/json-iterator/go"
 	"gopkg.in/yaml.v3"
+	"io/fs"
 	"os"
 	"path/filepath"
 	"strings"
@@ -70,12 +71,8 @@ type NodeInfoList struct {
 	NodeList []NodeInfo
 }
 
-func validConfigFile(f os.DirEntry) bool {
-	if f.IsDir() == true || (filepath.Ext(f.Name()) != ".json" && filepath.Ext(f.Name()) != ".yml" && filepath.Ext(f.Name()) != ".yaml") {
-		return false
-	}
-
-	return true
+func validConfigFile(f string) bool {
+	return strings.HasSuffix(f, ".json") || strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml")
 }
 
 func yamlToJson(data []byte, v interface{}) ([]byte, error) {
@@ -277,32 +274,33 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
 	var discoveryInfo DiscoveryInfo
 	var rpcMode RpcMode
 
-	clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
-	fileInfoList, err := os.ReadDir(clusterCfgPath)
-	if err != nil {
-		return discoveryInfo, nil, rpcMode, fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
-	}
-
 	// read every file, but only parse configs in a supported format; files under the directory may be split up freely
-	for _, f := range fileInfoList {
-		if !validConfigFile(f) {
-			continue
+	err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error) error {
+		if info.IsDir() {
+			return nil
 		}
 
-		filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name()
-		fileNodeInfoList, rErr := cls.ReadClusterConfig(filePath)
+		if err != nil {
+			return err
+		}
+
+		if !validConfigFile(info.Name()) {
+			return nil
+		}
+
+		fileNodeInfoList, rErr := cls.ReadClusterConfig(path)
 		if rErr != nil {
-			return discoveryInfo, nil, rpcMode, fmt.Errorf("read file path %s is error:%+v", filePath, rErr)
+			return fmt.Errorf("read file path %s is error:%+v", path, rErr)
 		}
 
 		err = cls.SetRpcMode(&fileNodeInfoList.RpcMode, &rpcMode)
 		if err != nil {
-			return discoveryInfo, nil, rpcMode, err
+			return err
 		}
 
 		err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery)
 		if err != nil {
-			return discoveryInfo, nil, rpcMode, err
+			return err
 		}
 
 		for _, nodeInfo := range fileNodeInfoList.NodeList {
@@ -310,6 +308,12 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
 			nodeInfoList = append(nodeInfoList, nodeInfo)
 		}
 	}
+
+		return nil
+	})
+
+	if err != nil {
+		return discoveryInfo, nil, rpcMode, err
 	}
 
 	if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
@@ -331,32 +335,32 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
 }
 
 func (cls *Cluster) readLocalService(localNodeId string) error {
-	clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
-	fileInfoList, err := os.ReadDir(clusterCfgPath)
-	if err != nil {
-		return fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
-	}
-
 	var globalCfg interface{}
 	publicService := map[string]interface{}{}
 	nodeService := map[string]interface{}{}
 
 	// read every file, but only parse configs in a supported format; files under the directory may be split up freely
-	for _, f := range fileInfoList {
-		if !validConfigFile(f) {
-			continue
+	err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error) error {
+		if info.IsDir() {
+			return nil
 		}
 
-		filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name()
-		currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(filePath)
 		if err != nil {
-			continue
+			return err
+		}
+
+		if !validConfigFile(info.Name()) {
+			return nil
+		}
+
+		currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(path)
+		if err != nil {
+			return err
 		}
 
 		if currGlobalCfg != nil {
 			// the global section must not be configured more than once
 			if globalCfg != nil {
-				return fmt.Errorf("[Global] does not allow repeated configuration in %s", f.Name())
+				return fmt.Errorf("[Global] does not allow repeated configuration in %s", info.Name())
 			}
 			globalCfg = currGlobalCfg
 		}
@@ -372,7 +376,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
 			pubCfg, ok := serviceConfig[s]
 			if ok == true {
 				if _, publicOk := publicService[s]; publicOk == true {
-					return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, f.Name())
+					return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, info.Name())
 				}
 				publicService[s] = pubCfg
 			}
@@ -388,12 +392,17 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
 			}
 
 			if _, nodeOK := nodeService[s]; nodeOK == true {
-				return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, f.Name())
+				return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, info.Name())
 			}
 			nodeService[s] = nodeCfg
 			break
 		}
 	}
+
+		return nil
+	})
+
+	if err != nil {
+		return err
 	}
 
 	// combine all of the configuration
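The cluster-config hunks above drop the flat `os.ReadDir` loop over the cluster config directory in favour of `filepath.Walk` across the whole config tree, keeping only files whose names end in `.json`, `.yml`, or `.yaml`. A small self-contained sketch of that traversal follows; the directory name and the print statement are illustrative only.

```go
package main

import (
	"fmt"
	"io/fs"
	"path/filepath"
	"strings"
)

// validConfigFile mirrors the new suffix-based check from the diff.
func validConfigFile(name string) bool {
	return strings.HasSuffix(name, ".json") || strings.HasSuffix(name, ".yml") || strings.HasSuffix(name, ".yaml")
}

func main() {
	// Walk the directory tree recursively; the old code only listed one flat directory.
	err := filepath.Walk("./config", func(path string, info fs.FileInfo, err error) error {
		if err != nil {
			return err // propagate walk errors instead of silently skipping them
		}
		if info.IsDir() || !validConfigFile(info.Name()) {
			return nil
		}
		fmt.Println("would parse:", path)
		return nil
	})
	if err != nil {
		fmt.Println("walk error:", err)
	}
}
```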
log/log.go (112 lines changed)

@@ -1,31 +1,37 @@
 package log
 
 import (
+	"github.com/duanhf2012/rotatelogs"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 	"gopkg.in/natefinch/lumberjack.v2"
 	"os"
+	"path/filepath"
+	"strings"
 	"time"
 )
 
 var isSetLogger bool
 var gLogger = NewDefaultLogger()
+var LogLevel zapcore.Level
+var MaxSize int
+var LogPath string
+var OpenConsole *bool
+var LogChanLen int
 
 type Logger struct {
 	*zap.Logger
 	stack bool
 
-	OpenConsole   *bool
-	LogPath       string
-	FileName      string
-	Skip          int
-	LogLevel      zapcore.Level
-	Encoder       zapcore.Encoder
-	LogConfig     *lumberjack.Logger
-	SugaredLogger *zap.SugaredLogger
-	CoreList      []zapcore.Core
+	FileName       string
+	Skip           int
+	Encoder        zapcore.Encoder
+	SugaredLogger  *zap.SugaredLogger
+	CoreList       []zapcore.Core
+	WriteSyncerFun []func() zapcore.WriteSyncer
 }
 
+// set the Logger
 func SetLogger(logger *Logger) {
 	if logger != nil {
 		gLogger = logger
@@ -33,6 +39,15 @@ func SetLogger(logger *Logger) {
 	}
 }
 
+// set a zap.Logger directly
+func SetZapLogger(zapLogger *zap.Logger) {
+	if zapLogger != nil {
+		gLogger = &Logger{}
+		gLogger.Logger = zapLogger
+		isSetLogger = true
+	}
+}
+
 func GetLogger() *Logger {
 	return gLogger
 }
@@ -47,8 +62,8 @@ func (logger *Logger) SetSkip(skip int) {
 
 func GetJsonEncoder() zapcore.Encoder {
 	encoderConfig := zap.NewProductionEncoderConfig()
-	encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
 	encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
+	encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
 	encoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
 		enc.AppendString(t.Format("2006-01-02 15:04:05.000"))
 	}
@@ -67,51 +82,100 @@ func GetTxtEncoder() zapcore.Encoder {
 	return zapcore.NewConsoleEncoder(encoderConfig)
 }
 
-func getLogConfig() *lumberjack.Logger {
+func (logger *Logger) getLogConfig() *lumberjack.Logger {
 	return &lumberjack.Logger{
-		Filename:   "",
-		MaxSize:    2048,
+		Filename:   filepath.Join(LogPath, logger.FileName),
+		MaxSize:    MaxSize,
 		MaxBackups: 0,
 		MaxAge:     0,
 		Compress:   false,
+		LocalTime:  true,
 	}
 }
 
 func NewDefaultLogger() *Logger {
 	logger := Logger{}
 	logger.Encoder = GetJsonEncoder()
-	logger.LogConfig = getLogConfig()
-	logger.LogConfig.LocalTime = true
+	core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), zap.InfoLevel)
+	logger.Logger = zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1))
 
-	logger.Init()
 	return &logger
 }
 
-func (logger *Logger) SetLogLevel(level zapcore.Level) {
-	logger.LogLevel = level
+func (logger *Logger) SetSyncers(syncers ...func() zapcore.WriteSyncer) {
+	logger.WriteSyncerFun = syncers
+}
+
+func (logger *Logger) AppendSyncerFun(syncerFun func() zapcore.WriteSyncer) {
+	logger.WriteSyncerFun = append(logger.WriteSyncerFun, syncerFun)
+}
+
+func SetLogLevel(level zapcore.Level) {
+	LogLevel = level
 }
 
 func (logger *Logger) Enabled(zapcore.Level) bool {
 	return logger.stack
 }
 
+func (logger *Logger) NewLumberjackWriter() zapcore.WriteSyncer {
+	return zapcore.AddSync(
+		&lumberjack.Logger{
+			Filename:   filepath.Join(LogPath, logger.FileName),
+			MaxSize:    MaxSize,
+			MaxBackups: 0,
+			MaxAge:     0,
+			Compress:   false,
+			LocalTime:  true,
+		})
+}
+
+func (logger *Logger) NewRotatelogsWriter() zapcore.WriteSyncer {
+	var options []rotatelogs.Option
+
+	if MaxSize > 0 {
+		options = append(options, rotatelogs.WithRotateMaxSize(int64(MaxSize)))
+	}
+	if LogChanLen > 0 {
+		options = append(options, rotatelogs.WithChannelLen(LogChanLen))
+	}
+	options = append(options, rotatelogs.WithRotationTime(time.Hour*24))
+
+	fileName := strings.TrimRight(logger.FileName, filepath.Ext(logger.FileName))
+	rotateLogs, err := rotatelogs.NewRotateLogs(LogPath, "20060102/"+fileName+"_20060102_150405", options...)
+	if err != nil {
+		panic(err)
+	}
+
+	return zapcore.AddSync(rotateLogs)
+}
+
 func (logger *Logger) Init() {
 	if isSetLogger {
 		return
 	}
 
+	var syncerList []zapcore.WriteSyncer
+	if logger.WriteSyncerFun == nil {
+		syncerList = append(syncerList, logger.NewRotatelogsWriter())
+	} else {
+		for _, syncer := range logger.WriteSyncerFun {
+			syncerList = append(syncerList, syncer())
+		}
+	}
+
 	var coreList []zapcore.Core
-	if logger.OpenConsole == nil || *logger.OpenConsole {
-		core := zapcore.NewCore(logger.Encoder, zapcore.AddSync(os.Stdout), logger.LogLevel)
+	if OpenConsole == nil || *OpenConsole {
+		syncerList = append(syncerList, zapcore.AddSync(os.Stdout))
+	}
+
+	for _, writer := range syncerList {
+		core := zapcore.NewCore(logger.Encoder, writer, LogLevel)
 		coreList = append(coreList, core)
 	}
 
 	if logger.CoreList != nil {
 		coreList = append(coreList, logger.CoreList...)
-	}else if logger.LogPath != "" {
-		WriteSyncer := zapcore.AddSync(logger.LogConfig)
-		core := zapcore.NewCore(logger.Encoder, WriteSyncer, logger.LogLevel)
-		coreList = append(coreList, core)
 	}
 
 	core := zapcore.NewTee(coreList...)
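The `log/log.go` rewrite moves the level, path, size, and console switches to package-level variables and has `Init` build one `zapcore.Core` per write syncer (a rotating file writer, optionally stdout) before combining them with `zapcore.NewTee`. The sketch below shows that tee pattern with plain zap and lumberjack; the file name and size are illustrative, and the rotatelogs writer from the diff is left out.

```go
package main

import (
	"os"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"gopkg.in/natefinch/lumberjack.v2"
)

func main() {
	encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())

	// One write syncer per destination: a size-rotated file plus stdout.
	fileSyncer := zapcore.AddSync(&lumberjack.Logger{
		Filename:  "log/node.log", // illustrative; plays the role of LogPath + FileName
		MaxSize:   512,            // MB; plays the role of the package-level MaxSize
		LocalTime: true,
	})
	consoleSyncer := zapcore.AddSync(os.Stdout)

	level := zapcore.InfoLevel // plays the role of the package-level LogLevel

	// Build one core per syncer and tee them together, as the new Init() does.
	core := zapcore.NewTee(
		zapcore.NewCore(encoder, fileSyncer, level),
		zapcore.NewCore(encoder, consoleSyncer, level),
	)

	logger := zap.New(core, zap.AddCaller())
	defer logger.Sync()

	logger.Info("logger ready", zap.String("sink", "file+console"))
}
```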
@@ -73,9 +73,22 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 	handler.conns[conn] = struct{}{}
 	handler.mutexConns.Unlock()
+	c, ok := conn.NetConn().(*net.TCPConn)
+	if !ok {
+		tlsConn, ok := conn.NetConn().(*tls.Conn)
+		if !ok {
+			log.Error("conn error")
+			return
+		}
+		c, ok = tlsConn.NetConn().(*net.TCPConn)
+		if !ok {
+			log.Error("conn error")
+			return
+		}
+	}
 
-	conn.UnderlyingConn().(*net.TCPConn).SetLinger(0)
-	conn.UnderlyingConn().(*net.TCPConn).SetNoDelay(true)
+	c.SetLinger(0)
+	c.SetNoDelay(true)
 	wsConn := newWSConn(conn, r.Header, handler.pendingWriteNum, handler.maxMsgLen, handler.messageType)
 	agent := handler.newAgent(wsConn)
 	agent.Run()
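The WebSocket handler no longer assumes the underlying connection is a `*net.TCPConn`; when the listener is wrapped in TLS it first unwraps the `*tls.Conn` before calling `SetLinger` and `SetNoDelay`. A standalone sketch of that unwrap step follows; the helper name is hypothetical and the listener setup exists only to make the example runnable.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net"
)

// tcpConnOf returns the underlying *net.TCPConn whether the connection is
// plain TCP or wrapped in a *tls.Conn, so socket options can be applied.
func tcpConnOf(conn net.Conn) (*net.TCPConn, bool) {
	if c, ok := conn.(*net.TCPConn); ok {
		return c, true
	}
	if tlsConn, ok := conn.(*tls.Conn); ok {
		c, ok := tlsConn.NetConn().(*net.TCPConn) // NetConn requires Go 1.18+
		return c, ok
	}
	return nil, false
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		fmt.Println("listen error:", err)
		return
	}
	defer ln.Close()

	go net.Dial("tcp", ln.Addr().String()) // dial ourselves so Accept returns

	conn, err := ln.Accept()
	if err != nil {
		fmt.Println("accept error:", err)
		return
	}
	defer conn.Close()

	if c, ok := tcpConnOf(conn); ok {
		c.SetLinger(0)
		c.SetNoDelay(true)
		fmt.Println("tuned TCP connection from", c.RemoteAddr())
	}
}
```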
node/node.go (44 lines changed)

@@ -17,7 +17,6 @@ import (
 	_ "net/http/pprof"
 	"os"
 	"os/signal"
-	"path/filepath"
 	"strconv"
 	"strings"
 	"syscall"
@@ -59,6 +58,7 @@ func init() {
 	console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|info|warn|error|stackerror|fatal> Set loglevel.", setLevel)
 	console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath)
 	console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize)
+	console.RegisterCommandInt("logchanlen", 0, "<-logchanlen len> Set log channel len.", setLogChanLen)
 	console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof)
 }
 
@@ -220,7 +220,7 @@ func initNode(id string) {
 
 func initLog() error {
 	logger := log.GetLogger()
-	if logger.LogPath == "" {
+	if log.LogPath == "" {
 		err := setLogPath("./log")
 		if err != nil {
 			return err
@@ -230,7 +230,6 @@ func initLog() error {
 	localNodeInfo := cluster.GetCluster().GetLocalNodeInfo()
 	fileName := fmt.Sprintf("%s.log", localNodeInfo.NodeId)
 	logger.FileName = fileName
-	logger.LogConfig.Filename = filepath.Join(logger.LogPath, logger.FileName)
 
 	logger.Init()
 	return nil
@@ -440,10 +439,10 @@ func openConsole(args interface{}) error {
 	strOpen := strings.ToLower(strings.TrimSpace(args.(string)))
 	if strOpen == "false" {
 		bOpenConsole := false
-		log.GetLogger().OpenConsole = &bOpenConsole
+		log.OpenConsole = &bOpenConsole
 	} else if strOpen == "true" {
 		bOpenConsole := true
-		log.GetLogger().OpenConsole = &bOpenConsole
+		log.OpenConsole = &bOpenConsole
 	} else {
 		return errors.New("parameter console error")
 	}
@@ -458,17 +457,17 @@ func setLevel(args interface{}) error {
 	strlogLevel := strings.TrimSpace(args.(string))
 	switch strlogLevel {
 	case "debug":
-		log.GetLogger().LogLevel = zapcore.DebugLevel
+		log.LogLevel = zapcore.DebugLevel
 	case "info":
-		log.GetLogger().LogLevel = zapcore.InfoLevel
+		log.LogLevel = zapcore.InfoLevel
 	case "warn":
-		log.GetLogger().LogLevel = zapcore.WarnLevel
+		log.LogLevel = zapcore.WarnLevel
 	case "error":
-		log.GetLogger().LogLevel = zapcore.ErrorLevel
+		log.LogLevel = zapcore.ErrorLevel
 	case "stackerror":
-		log.GetLogger().LogLevel = zapcore.ErrorLevel
+		log.LogLevel = zapcore.ErrorLevel
 	case "fatal":
-		log.GetLogger().LogLevel = zapcore.FatalLevel
+		log.LogLevel = zapcore.FatalLevel
 	default:
 		return errors.New("unknown level: " + strlogLevel)
 	}
@@ -480,11 +479,7 @@ func setLogPath(args interface{}) error {
 		return nil
 	}
 	logPath := strings.TrimSpace(args.(string))
-	dir, err := os.Stat(logPath)
-	if err == nil && dir.IsDir() == false {
-		return errors.New("Not found dir " + logPath)
-	}
-
+	_, err := os.Stat(logPath)
 	if err != nil {
 		err = os.MkdirAll(logPath, os.ModePerm)
 		if err != nil {
@@ -492,7 +487,7 @@ func setLogPath(args interface{}) error {
 		}
 	}
 
-	log.GetLogger().LogPath = logPath
+	log.LogPath = logPath
 	return nil
 }
 
@@ -505,7 +500,20 @@ func setLogSize(args interface{}) error {
 		return nil
 	}
 
-	log.GetLogger().LogConfig.MaxSize = logSize
+	log.MaxSize = logSize
 
 	return nil
 }
+
+func setLogChanLen(args interface{}) error {
+	logChanLen, ok := args.(int)
+	if ok == false {
+		return errors.New("param logsize is error")
+	}
+	if logChanLen == 0 {
+		return nil
+	}
+
+	log.LogChanLen = logChanLen
+	return nil
+}
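`node/node.go` registers the new `logchanlen` option through the framework's own `console.RegisterCommandInt` helper, as the hunk shows. As a rough standalone illustration of the same idea using nothing but the standard `flag` package (names and defaults are taken from the registrations above, but this is not the parsing mechanism the framework itself uses):

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// Counterparts of the options registered in node/node.go's init().
	logLevel := flag.String("loglevel", "debug", "set log level")
	logPath := flag.String("logpath", "", "set log file path")
	logSize := flag.Int("logsize", 0, "set log size (MB)")
	logChanLen := flag.Int("logchanlen", 0, "set log channel length") // the newly added option
	flag.Parse()

	fmt.Println("loglevel:", *logLevel, "logpath:", *logPath, "logsize:", *logSize, "logchanlen:", *logChanLen)
}
```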
@@ -192,7 +192,7 @@ func (s *Service) run() {
 				break
 			}
 			if s.profiler != nil {
-				analyzer = s.profiler.Push("[Req]" + rpcRequest.RpcRequestData.GetServiceMethod())
+				analyzer = s.profiler.Push("[RpcReq]" + rpcRequest.RpcRequestData.GetServiceMethod() + "." + strconv.Itoa(int(rpcRequest.RpcRequestData.GetRpcMethodId())))
 			}
 
 			s.GetRpcHandler().HandlerRpcRequest(rpcRequest)
@@ -248,7 +248,7 @@ func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession
 		select {
 		case msg := <-claim.Messages():
 			if msg == nil {
-				log.SWarning("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
+				log.SWarn("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
 				return nil
 			}
 			ch.AppendMsg(session, msg)
@@ -86,7 +86,7 @@ func (p *Producer) asyncRun() {
 			asyncReturn := sm.Metadata.(*AsyncReturn)
 			asyncReturn.chanReturn <- asyncReturn
 		case em := <-p.Errors():
-			log.Error("async kafkamodule error", log.ErrorAttr("err", em.Err))
+			log.Error("async kafkamodule error", log.ErrorField("err", em.Err))
 			if em.Msg.Metadata == nil {
 				break
 			}
@@ -95,7 +95,7 @@ func (m *MySQLModule) Begin() (*Tx, error) {
 	var txDBModule Tx
 	txDb, err := m.db.Begin()
 	if err != nil {
-		log.Error("Begin error:%s", err.Error())
+		log.Error("Begin error", log.ErrorField("err", err))
 		return &txDBModule, err
 	}
 	txDBModule.slowDuration = m.slowDuration
@@ -155,7 +155,7 @@ func (m *MySQLModule) runPing() {
 	for {
 		select {
 		case <-m.pingCoroutine.pintExit:
-			log.Error("RunPing stopping %s...", fmt.Sprintf("%T", m))
+			log.Error("RunPing stopping", log.String("url", m.url), log.String("dbname", m.dbname))
 			return
 		case <-m.pingCoroutine.tickerPing.C:
 			if m.db != nil {
@@ -221,12 +221,12 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
 	datasetList.blur = true
 
 	if checkArgs(args) != nil {
-		log.Error("CheckArgs is error :%s", strQuery)
+		log.Error("CheckArgs is error", log.String("sql", strQuery))
 		return &datasetList, fmt.Errorf("checkArgs is error")
 	}
 
 	if db == nil {
-		log.Error("cannot connect database:%s", strQuery)
+		log.Error("cannot connect database", log.String("sql", strQuery))
 		return &datasetList, fmt.Errorf("cannot connect database")
 	}
 
@@ -235,10 +235,10 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
 	timeFuncPass := time.Since(TimeFuncStart)
 
 	if checkSlow(slowDuration, timeFuncPass) {
-		log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strQuery, args)
+		log.Error("Query slow", log.Int64("time_ms", timeFuncPass.Milliseconds()), log.String("sql", strQuery), log.Any("args", args))
 	}
 	if err != nil {
-		log.Error("Query:%s(%v)", strQuery, err)
+		log.Error("Query error", log.String("sql", strQuery), log.ErrorField("err", err))
 		if rows != nil {
 			rows.Close()
 		}
@@ -278,8 +278,8 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
 		hasRet := rows.NextResultSet()
 
 		if hasRet == false {
-			if rows.Err() != nil {
-				log.Error("Query:%s(%+v)", strQuery, rows)
+			if rowErr := rows.Err(); rowErr != nil {
+				log.Error("NextResultSet error", log.String("sql", strQuery), log.ErrorField("err", rowErr))
 			}
 			break
 		}
@@ -291,12 +291,12 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
 func exec(slowDuration time.Duration, db dbControl, strSql string, args ...interface{}) (*DBResult, error) {
 	ret := &DBResult{}
 	if db == nil {
-		log.Error("cannot connect database:%s", strSql)
+		log.Error("cannot connect database", log.String("sql", strSql))
 		return ret, fmt.Errorf("cannot connect database")
 	}
 
 	if checkArgs(args) != nil {
-		log.Error("CheckArgs is error :%s", strSql)
+		log.Error("CheckArgs is error", log.String("sql", strSql))
 		return ret, fmt.Errorf("checkArgs is error")
 	}
 
@@ -304,10 +304,10 @@ func exec(slowDuration time.Duration, db dbControl, strSql string, args ...inter
 	res, err := db.Exec(strSql, args...)
 	timeFuncPass := time.Since(TimeFuncStart)
 	if checkSlow(slowDuration, timeFuncPass) {
-		log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strSql, args)
+		log.Error("Exec slow", log.Int64("time_ms", timeFuncPass.Milliseconds()), log.String("sql", strSql), log.Any("args", args))
 	}
 	if err != nil {
-		log.Error("Exec:%s(%v)", strSql, err)
+		log.Error("Exec error", log.String("sql", strSql), log.ErrorField("err", err))
 		return nil, err
 	}
 
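The database-module hunks replace printf-style calls such as `log.Error("Query:%s(%v)", strQuery, err)` with a constant message plus typed fields (`log.String`, `log.Int64`, `log.ErrorField`). The sketch below shows the same structured-logging pattern with plain zap standing in for the repository's wrappers.

```go
package main

import (
	"errors"
	"time"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	strQuery := "SELECT id FROM player WHERE id = ?" // illustrative SQL
	err := errors.New("driver: bad connection")      // illustrative error
	elapsed := 120 * time.Millisecond

	// Old style: the message itself changes on every call, which makes the
	// logs harder to search and index.
	logger.Sugar().Errorf("Query:%s(%v)", strQuery, err)

	// New style: a constant message with typed fields, as in the diff.
	logger.Error("Query error",
		zap.String("sql", strQuery),
		zap.Error(err),
	)
	logger.Error("Query slow",
		zap.Int64("time_ms", elapsed.Milliseconds()),
		zap.String("sql", strQuery),
	)
}
```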
@@ -34,6 +34,8 @@ type WSCfg struct {
 	PendingWriteNum int
 	MaxMsgLen       uint32
 	LittleEndian    bool // whether to use little-endian byte order
+	KeyFile         string
+	CertFile        string
 }
 
 type WSPackType int8
@@ -62,13 +64,18 @@ func (ws *WSModule) OnInit() error {
 	ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
 	ws.WSServer.Addr = ws.wsCfg.ListenAddr
 
-	// 3. set the message parser
+	if ws.wsCfg.KeyFile != "" && ws.wsCfg.CertFile != "" {
+		ws.WSServer.KeyFile = ws.wsCfg.KeyFile
+		ws.WSServer.CertFile = ws.wsCfg.CertFile
+	}
+
+	// set the message parser
 	ws.process.SetByteOrder(ws.wsCfg.LittleEndian)
 
 	ws.mapClient = make(map[string]*WSClient, ws.WSServer.MaxConnNum)
 	ws.WSServer.NewAgent = ws.NewWSClient
 
-	// 4. set up network event handling
+	// set up network event handling
 	ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler)
 
 	return nil
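The WSCfg/WSModule hunks add `KeyFile` and `CertFile`, presumably so the embedded WebSocket server can serve TLS (`wss://`) when both files are configured. The sketch below shows the underlying idea with the standard library only; the address, paths, and handler are illustrative, and the real module hands the files to its own `WSServer` instead.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		// A real server would upgrade this request to a WebSocket connection here.
		fmt.Fprintln(w, "wss endpoint placeholder")
	})

	// With a certificate and key configured, clients connect via wss:// instead of ws://.
	log.Fatal(http.ListenAndServeTLS(":10443", "server.crt", "server.key", mux))
}
```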
@@ -83,6 +83,7 @@ type HttpSession struct {
 	sessionDone chan *HttpSession
 }
 
+// Deprecated: replace it with the GinModule
 type HttpService struct {
 	service.Service
 
@@ -104,11 +104,11 @@ func (cs *CustomerSubscriber) UnSubscribe() {
 func (cs *CustomerSubscriber) LoadLastIndex() {
 	for {
 		if atomic.LoadInt32(&cs.isStop) != 0 {
-			log.Info("topic ", cs.topic, " out of subscription")
+			log.SInfo("topic ", cs.topic, " out of subscription")
 			break
 		}
 
-		log.Info("customer ", cs.customerId, " start load last index ")
+		log.SInfo("customer ", cs.customerId, " start load last index ")
 		lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId)
 		if ret == true {
 			if lastIndex > 0 {
@@ -116,18 +116,18 @@ func (cs *CustomerSubscriber) LoadLastIndex() {
 			} else {
 				// otherwise use the index sent back by the client directly
 			}
-			log.Info("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
+			log.SInfo("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
 			break
 		}
 
-		log.Info("customer ", cs.customerId, " load last index is fail...")
+		log.SInfo("customer ", cs.customerId, " load last index is fail...")
 		time.Sleep(5 * time.Second)
 	}
 }
 
 func (cs *CustomerSubscriber) SubscribeRun() {
 	defer cs.subscriber.queueWait.Done()
-	log.Info("topic ", cs.topic, " start subscription")
+	log.SInfo("topic ", cs.topic, " start subscription")
 
 	// load the previous position
 	if cs.subscribeMethod == MethodLast {
@@ -136,7 +136,7 @@ func (cs *CustomerSubscriber) SubscribeRun() {
 
 	for {
 		if atomic.LoadInt32(&cs.isStop) != 0 {
-			log.Info("topic ", cs.topic, " out of subscription")
+			log.SInfo("topic ", cs.topic, " out of subscription")
 			break
 		}
 
@@ -146,14 +146,14 @@ func (cs *CustomerSubscriber) SubscribeRun() {
 
 		// todo: detect exit
 		if cs.subscribe() == false {
-			log.Info("topic ", cs.topic, " out of subscription")
+			log.SInfo("topic ", cs.topic, " out of subscription")
 			break
 		}
 	}
 
 	// remove the subscription relationship
 	cs.subscriber.removeCustomer(cs.customerId, cs)
-	log.Info("topic ", cs.topic, " unsubscription")
+	log.SInfo("topic ", cs.topic, " unsubscription")
 }
 
 func (cs *CustomerSubscriber) subscribe() bool {
@@ -63,7 +63,7 @@ func (ms *MessageQueueService) ReadCfg() error {
 	maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"]
 	if ok == false {
 		ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum
-		log.Info("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
+		log.SInfo("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
 	} else {
 		ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64))
 	}
@@ -71,7 +71,7 @@ func (ms *MessageQueueService) ReadCfg() error {
 	memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"]
 	if ok == false {
 		ms.memoryQueueLen = DefaultMemoryQueueLen
-		log.Info("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
+		log.SInfo("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
 	} else {
 		ms.memoryQueueLen = int32(memoryQueueLen.(float64))
 	}
@@ -237,7 +237,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
 	defer cancelAll()
 	err = cursor.All(ctxAll, &res)
 	if err != nil {
-		log.Error("find collect name ", topic, " is error", log.ErrorAttr("err", err))
+		log.Error("find collect name error", log.String("topic", topic), log.ErrorField("err", err))
 		return nil, false
 	}
 
@@ -246,7 +246,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
 		rawData, errM := bson.Marshal(res[i])
 		if errM != nil {
 			if errM != nil {
-				log.Error("collect name ", topic, " Marshal is error", log.ErrorAttr("err", err))
+				log.Error("Marshal error", log.String("topic", topic), log.ErrorField("err", err))
 				return nil, false
 			}
 			continue
@@ -391,7 +391,7 @@ func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64 {
 		if e.Key == "_id" {
 			errC, seq := convertToNumber[uint64](e.Value)
 			if errC != nil {
-				log.Error("value is error:%s,%+v, ", errC.Error(), e.Value)
+				log.Error("value is error", log.ErrorField("err", errC), log.Any("val", e.Value))
 			}
 
 			return seq
@@ -56,9 +56,9 @@ func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType r
 	}
 
 	if ok == true {
-		log.Info("repeat subscription for customer ", customerId)
+		log.SInfo("repeat subscription for customer ", customerId)
 	} else {
-		log.Info("subscription for customer ", customerId)
+		log.SInfo("subscription for customer ", customerId)
 	}
 
 }
@@ -72,7 +72,7 @@ func (ss *Subscriber) UnSubscribe(customerId string) {
 
 	customerSubscriber, ok := ss.mapCustomer[customerId]
 	if ok == false {
-		log.SWarning("failed to unsubscribe customer " + customerId)
+		log.SWarn("failed to unsubscribe customer ", customerId)
 		return
 	}
 
@@ -93,7 +93,7 @@ func (tr *TopicRoom) Stop() {
 func (tr *TopicRoom) topicRoomRun() {
 	defer tr.queueWait.Done()
 
-	log.Info("topic room ", tr.topic, " is running..")
+	log.SInfo("topic room ", tr.topic, " is running..")
 	for {
 		if atomic.LoadInt32(&tr.isStop) != 0 {
 			break
@@ -145,5 +145,5 @@ func (tr *TopicRoom) topicRoomRun() {
 	}
 	tr.customerLocker.Unlock()
 
-	log.Info("topic room ", tr.topic, " is stop")
+	log.SInfo("topic room ", tr.topic, " is stop")
 }
@@ -142,13 +142,13 @@ func (mp *MongoPersist) OnSetupRank(manual bool, rankSkip *RankSkip) error {
 		return nil
 	}
 
-	log.Info("start load rank ", rankSkip.GetRankName(), " from mongodb.")
+	log.SInfo("start load rank ", rankSkip.GetRankName(), " from mongodb.")
 	err := mp.loadFromDB(rankSkip.GetRankID(), rankSkip.GetRankName())
 	if err != nil {
 		log.SError("load from db is fail :%s", err.Error())
 		return err
 	}
-	log.Info("finish load rank ", rankSkip.GetRankName(), " from mongodb.")
+	log.SInfo("finish load rank ", rankSkip.GetRankName(), " from mongodb.")
 	return nil
 }
 
@@ -296,7 +296,7 @@ func (mp *MongoPersist) saveToDB() {
 			buf := make([]byte, 4096)
 			l := runtime.Stack(buf, false)
 			errString := fmt.Sprint(r)
-			log.Dump(string(buf[:l]), log.String("error", errString))
+			log.StackError(string(buf[:l]), log.String("error", errString))
 		}
 	}()
 
@@ -14,6 +14,7 @@ import (
 	"time"
 )
 
+// Deprecated: replace it with the TcpModule
 type TcpService struct {
 	tcpServer network.TCPServer
 	service.Service
@@ -12,6 +12,7 @@ import (
 	"sync"
 )
 
+// Deprecated: replace it with the WSModule
 type WSService struct {
 	service.Service
 	wsServer network.WSServer