mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-11 13:04:41 +08:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d36e525a5 | ||
|
|
3a4350769c | ||
|
|
d4966ea129 |
@@ -13,10 +13,12 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"path"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"io/ioutil"
|
||||
"crypto/x509"
|
||||
"crypto/tls"
|
||||
)
|
||||
|
||||
const originDir = "/origin"
|
||||
@@ -40,11 +42,16 @@ type EtcdDiscoveryService struct {
|
||||
mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId]
|
||||
}
|
||||
|
||||
var etcdDiscovery *EtcdDiscoveryService
|
||||
func getEtcdDiscovery() IServiceDiscovery {
|
||||
etcdDiscovery := &EtcdDiscoveryService{}
|
||||
if etcdDiscovery == nil {
|
||||
etcdDiscovery = &EtcdDiscoveryService{}
|
||||
}
|
||||
|
||||
return etcdDiscovery
|
||||
}
|
||||
|
||||
|
||||
func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
|
||||
ed.localNodeId = localNodeId
|
||||
|
||||
@@ -87,15 +94,44 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
||||
}
|
||||
|
||||
for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
|
||||
client, cerr := clientv3.New(clientv3.Config{
|
||||
var client *clientv3.Client
|
||||
var tlsConfig *tls.Config
|
||||
|
||||
if etcdDiscoveryCfg.EtcdList[i].Cert != "" {
|
||||
// load cert
|
||||
cert, cerr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
|
||||
if cerr != nil {
|
||||
log.Error("load cert error", log.ErrorField("err", cerr))
|
||||
return cerr
|
||||
}
|
||||
|
||||
// load root ca
|
||||
caData, cerr := ioutil.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
|
||||
if cerr != nil {
|
||||
log.Error("load root ca error", log.ErrorField("err", cerr))
|
||||
return cerr
|
||||
}
|
||||
pool := x509.NewCertPool()
|
||||
pool.AppendCertsFromPEM(caData)
|
||||
tlsConfig = &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
RootCAs: pool,
|
||||
}
|
||||
}
|
||||
|
||||
client, err = clientv3.New(clientv3.Config{
|
||||
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
|
||||
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
|
||||
Logger: zap.NewNop(),
|
||||
Username: etcdDiscoveryCfg.EtcdList[i].UserName,
|
||||
Password: etcdDiscoveryCfg.EtcdList[i].Password,
|
||||
Logger: log.GetLogger().Logger,
|
||||
TLS: tlsConfig,
|
||||
})
|
||||
|
||||
if cerr != nil {
|
||||
log.Error("etcd discovery init fail", log.ErrorField("err", cerr))
|
||||
return cerr
|
||||
|
||||
if err != nil {
|
||||
log.Error("etcd discovery init fail", log.ErrorField("err", err))
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/duanhf2012/origin/v2/rpc"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"gopkg.in/yaml.v3"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@@ -15,9 +16,15 @@ import (
|
||||
|
||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
|
||||
|
||||
type EtcdList struct {
|
||||
NetworkName []string
|
||||
Endpoints []string
|
||||
UserName string
|
||||
Password string
|
||||
Cert string
|
||||
CertKey string
|
||||
Ca string
|
||||
}
|
||||
|
||||
type EtcdDiscovery struct {
|
||||
@@ -64,12 +71,8 @@ type NodeInfoList struct {
|
||||
NodeList []NodeInfo
|
||||
}
|
||||
|
||||
func validConfigFile(f os.DirEntry) bool {
|
||||
if f.IsDir() == true || (filepath.Ext(f.Name()) != ".json" && filepath.Ext(f.Name()) != ".yml" && filepath.Ext(f.Name()) != ".yaml") {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
func validConfigFile(f string) bool {
|
||||
return strings.HasSuffix(f, ".json")|| strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml")
|
||||
}
|
||||
|
||||
func yamlToJson(data []byte, v interface{}) ([]byte, error) {
|
||||
@@ -271,32 +274,33 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
|
||||
var discoveryInfo DiscoveryInfo
|
||||
var rpcMode RpcMode
|
||||
|
||||
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
|
||||
fileInfoList, err := os.ReadDir(clusterCfgPath)
|
||||
if err != nil {
|
||||
return discoveryInfo, nil, rpcMode, fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
|
||||
}
|
||||
|
||||
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
||||
for _, f := range fileInfoList {
|
||||
if !validConfigFile(f) {
|
||||
continue
|
||||
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error {
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name()
|
||||
fileNodeInfoList, rErr := cls.ReadClusterConfig(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !validConfigFile(info.Name()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
fileNodeInfoList, rErr := cls.ReadClusterConfig(path)
|
||||
if rErr != nil {
|
||||
return discoveryInfo, nil, rpcMode, fmt.Errorf("read file path %s is error:%+v", filePath, rErr)
|
||||
return fmt.Errorf("read file path %s is error:%+v", path, rErr)
|
||||
}
|
||||
|
||||
err = cls.SetRpcMode(&fileNodeInfoList.RpcMode, &rpcMode)
|
||||
if err != nil {
|
||||
return discoveryInfo, nil, rpcMode, err
|
||||
return err
|
||||
}
|
||||
|
||||
err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery)
|
||||
if err != nil {
|
||||
return discoveryInfo, nil, rpcMode, err
|
||||
return err
|
||||
}
|
||||
|
||||
for _, nodeInfo := range fileNodeInfoList.NodeList {
|
||||
@@ -304,6 +308,12 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
|
||||
nodeInfoList = append(nodeInfoList, nodeInfo)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return discoveryInfo, nil, rpcMode, err
|
||||
}
|
||||
|
||||
if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
|
||||
@@ -325,32 +335,32 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
|
||||
}
|
||||
|
||||
func (cls *Cluster) readLocalService(localNodeId string) error {
|
||||
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
|
||||
fileInfoList, err := os.ReadDir(clusterCfgPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
|
||||
}
|
||||
|
||||
var globalCfg interface{}
|
||||
publicService := map[string]interface{}{}
|
||||
nodeService := map[string]interface{}{}
|
||||
|
||||
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
||||
for _, f := range fileInfoList {
|
||||
if !validConfigFile(f) {
|
||||
continue
|
||||
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error{
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name()
|
||||
currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(filePath)
|
||||
if err != nil {
|
||||
continue
|
||||
return err
|
||||
}
|
||||
|
||||
if !validConfigFile(info.Name()) {
|
||||
return nil
|
||||
}
|
||||
currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if currGlobalCfg != nil {
|
||||
//不允许重复的配置global配置
|
||||
if globalCfg != nil {
|
||||
return fmt.Errorf("[Global] does not allow repeated configuration in %s", f.Name())
|
||||
return fmt.Errorf("[Global] does not allow repeated configuration in %s", info.Name())
|
||||
}
|
||||
globalCfg = currGlobalCfg
|
||||
}
|
||||
@@ -366,7 +376,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
||||
pubCfg, ok := serviceConfig[s]
|
||||
if ok == true {
|
||||
if _, publicOk := publicService[s]; publicOk == true {
|
||||
return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, f.Name())
|
||||
return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, info.Name())
|
||||
}
|
||||
publicService[s] = pubCfg
|
||||
}
|
||||
@@ -382,12 +392,17 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
||||
}
|
||||
|
||||
if _, nodeOK := nodeService[s]; nodeOK == true {
|
||||
return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, f.Name())
|
||||
return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, info.Name())
|
||||
}
|
||||
nodeService[s] = nodeCfg
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//组合所有的配置
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
type WSModule struct {
|
||||
service.Module
|
||||
|
||||
wsServer network.WSServer
|
||||
WSServer network.WSServer
|
||||
|
||||
mapClientLocker sync.RWMutex
|
||||
mapClient map[string]*WSClient
|
||||
@@ -57,16 +57,16 @@ func (ws *WSModule) OnInit() error {
|
||||
return fmt.Errorf("please call the Init function correctly")
|
||||
}
|
||||
|
||||
ws.wsServer.MaxConnNum = ws.wsCfg.MaxConnNum
|
||||
ws.wsServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
|
||||
ws.wsServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
|
||||
ws.wsServer.Addr = ws.wsCfg.ListenAddr
|
||||
ws.WSServer.MaxConnNum = ws.wsCfg.MaxConnNum
|
||||
ws.WSServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
|
||||
ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
|
||||
ws.WSServer.Addr = ws.wsCfg.ListenAddr
|
||||
|
||||
//3.设置解析处理器
|
||||
ws.process.SetByteOrder(ws.wsCfg.LittleEndian)
|
||||
|
||||
ws.mapClient = make(map[string]*WSClient, ws.wsServer.MaxConnNum)
|
||||
ws.wsServer.NewAgent = ws.NewWSClient
|
||||
ws.mapClient = make(map[string]*WSClient, ws.WSServer.MaxConnNum)
|
||||
ws.WSServer.NewAgent = ws.NewWSClient
|
||||
|
||||
//4.设置网络事件处理
|
||||
ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler)
|
||||
@@ -80,7 +80,7 @@ func (ws *WSModule) Init(wsCfg *WSCfg, process processor.IRawProcessor) {
|
||||
}
|
||||
|
||||
func (ws *WSModule) Start() error {
|
||||
return ws.wsServer.Start()
|
||||
return ws.WSServer.Start()
|
||||
}
|
||||
|
||||
func (ws *WSModule) wsEventHandler(ev event.IEvent) {
|
||||
@@ -197,3 +197,7 @@ func (ws *WSModule) SendRawMsg(clientId string, msg []byte) error {
|
||||
ws.mapClientLocker.Unlock()
|
||||
return client.wsConn.WriteMsg(msg)
|
||||
}
|
||||
|
||||
func (ws *WSModule) SetMessageType(messageType int) {
|
||||
ws.WSServer.SetMessageType(messageType)
|
||||
}
|
||||
|
||||
@@ -32,10 +32,10 @@ func Abs[NumType typ.Signed | typ.Float](Num NumType) NumType {
|
||||
func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
|
||||
ret := number1 + number2
|
||||
if number2 > 0 && ret < number1 {
|
||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
return ret, false
|
||||
} else if number2 < 0 && ret > number1 {
|
||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
return ret, false
|
||||
}
|
||||
|
||||
@@ -45,10 +45,10 @@ func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
|
||||
func SubSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
|
||||
ret := number1 - number2
|
||||
if number2 > 0 && ret > number1 {
|
||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
return ret, false
|
||||
} else if number2 < 0 && ret < number1 {
|
||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
return ret, false
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ func MulSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
|
||||
return ret, true
|
||||
}
|
||||
|
||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||
return ret, true
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user