mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-13 07:04:44 +08:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d36e525a5 | ||
|
|
3a4350769c | ||
|
|
d4966ea129 |
@@ -13,10 +13,12 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"go.uber.org/zap"
|
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"io/ioutil"
|
||||||
|
"crypto/x509"
|
||||||
|
"crypto/tls"
|
||||||
)
|
)
|
||||||
|
|
||||||
const originDir = "/origin"
|
const originDir = "/origin"
|
||||||
@@ -40,11 +42,16 @@ type EtcdDiscoveryService struct {
|
|||||||
mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId]
|
mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var etcdDiscovery *EtcdDiscoveryService
|
||||||
func getEtcdDiscovery() IServiceDiscovery {
|
func getEtcdDiscovery() IServiceDiscovery {
|
||||||
etcdDiscovery := &EtcdDiscoveryService{}
|
if etcdDiscovery == nil {
|
||||||
|
etcdDiscovery = &EtcdDiscoveryService{}
|
||||||
|
}
|
||||||
|
|
||||||
return etcdDiscovery
|
return etcdDiscovery
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
|
func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
|
||||||
ed.localNodeId = localNodeId
|
ed.localNodeId = localNodeId
|
||||||
|
|
||||||
@@ -87,15 +94,44 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
|
for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
|
||||||
client, cerr := clientv3.New(clientv3.Config{
|
var client *clientv3.Client
|
||||||
|
var tlsConfig *tls.Config
|
||||||
|
|
||||||
|
if etcdDiscoveryCfg.EtcdList[i].Cert != "" {
|
||||||
|
// load cert
|
||||||
|
cert, cerr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
|
||||||
|
if cerr != nil {
|
||||||
|
log.Error("load cert error", log.ErrorField("err", cerr))
|
||||||
|
return cerr
|
||||||
|
}
|
||||||
|
|
||||||
|
// load root ca
|
||||||
|
caData, cerr := ioutil.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
|
||||||
|
if cerr != nil {
|
||||||
|
log.Error("load root ca error", log.ErrorField("err", cerr))
|
||||||
|
return cerr
|
||||||
|
}
|
||||||
|
pool := x509.NewCertPool()
|
||||||
|
pool.AppendCertsFromPEM(caData)
|
||||||
|
tlsConfig = &tls.Config{
|
||||||
|
Certificates: []tls.Certificate{cert},
|
||||||
|
RootCAs: pool,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err = clientv3.New(clientv3.Config{
|
||||||
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
|
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
|
||||||
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
|
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
|
||||||
Logger: zap.NewNop(),
|
Username: etcdDiscoveryCfg.EtcdList[i].UserName,
|
||||||
|
Password: etcdDiscoveryCfg.EtcdList[i].Password,
|
||||||
|
Logger: log.GetLogger().Logger,
|
||||||
|
TLS: tlsConfig,
|
||||||
})
|
})
|
||||||
|
|
||||||
if cerr != nil {
|
|
||||||
log.Error("etcd discovery init fail", log.ErrorField("err", cerr))
|
if err != nil {
|
||||||
return cerr
|
log.Error("etcd discovery init fail", log.ErrorField("err", err))
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
|
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"github.com/duanhf2012/origin/v2/rpc"
|
"github.com/duanhf2012/origin/v2/rpc"
|
||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
|
"io/fs"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -15,9 +16,15 @@ import (
|
|||||||
|
|
||||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||||
|
|
||||||
|
|
||||||
type EtcdList struct {
|
type EtcdList struct {
|
||||||
NetworkName []string
|
NetworkName []string
|
||||||
Endpoints []string
|
Endpoints []string
|
||||||
|
UserName string
|
||||||
|
Password string
|
||||||
|
Cert string
|
||||||
|
CertKey string
|
||||||
|
Ca string
|
||||||
}
|
}
|
||||||
|
|
||||||
type EtcdDiscovery struct {
|
type EtcdDiscovery struct {
|
||||||
@@ -64,12 +71,8 @@ type NodeInfoList struct {
|
|||||||
NodeList []NodeInfo
|
NodeList []NodeInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func validConfigFile(f os.DirEntry) bool {
|
func validConfigFile(f string) bool {
|
||||||
if f.IsDir() == true || (filepath.Ext(f.Name()) != ".json" && filepath.Ext(f.Name()) != ".yml" && filepath.Ext(f.Name()) != ".yaml") {
|
return strings.HasSuffix(f, ".json")|| strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml")
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func yamlToJson(data []byte, v interface{}) ([]byte, error) {
|
func yamlToJson(data []byte, v interface{}) ([]byte, error) {
|
||||||
@@ -271,32 +274,33 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
|
|||||||
var discoveryInfo DiscoveryInfo
|
var discoveryInfo DiscoveryInfo
|
||||||
var rpcMode RpcMode
|
var rpcMode RpcMode
|
||||||
|
|
||||||
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
|
|
||||||
fileInfoList, err := os.ReadDir(clusterCfgPath)
|
|
||||||
if err != nil {
|
|
||||||
return discoveryInfo, nil, rpcMode, fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
||||||
for _, f := range fileInfoList {
|
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error {
|
||||||
if !validConfigFile(f) {
|
if info.IsDir() {
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name()
|
if err != nil {
|
||||||
fileNodeInfoList, rErr := cls.ReadClusterConfig(filePath)
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !validConfigFile(info.Name()) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fileNodeInfoList, rErr := cls.ReadClusterConfig(path)
|
||||||
if rErr != nil {
|
if rErr != nil {
|
||||||
return discoveryInfo, nil, rpcMode, fmt.Errorf("read file path %s is error:%+v", filePath, rErr)
|
return fmt.Errorf("read file path %s is error:%+v", path, rErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = cls.SetRpcMode(&fileNodeInfoList.RpcMode, &rpcMode)
|
err = cls.SetRpcMode(&fileNodeInfoList.RpcMode, &rpcMode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return discoveryInfo, nil, rpcMode, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery)
|
err = discoveryInfo.setDiscovery(&fileNodeInfoList.Discovery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return discoveryInfo, nil, rpcMode, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, nodeInfo := range fileNodeInfoList.NodeList {
|
for _, nodeInfo := range fileNodeInfoList.NodeList {
|
||||||
@@ -304,6 +308,12 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
|
|||||||
nodeInfoList = append(nodeInfoList, nodeInfo)
|
nodeInfoList = append(nodeInfoList, nodeInfo)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return discoveryInfo, nil, rpcMode, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
|
if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
|
||||||
@@ -325,32 +335,32 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cls *Cluster) readLocalService(localNodeId string) error {
|
func (cls *Cluster) readLocalService(localNodeId string) error {
|
||||||
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
|
|
||||||
fileInfoList, err := os.ReadDir(clusterCfgPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("read dir %s is fail :%+v", clusterCfgPath, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var globalCfg interface{}
|
var globalCfg interface{}
|
||||||
publicService := map[string]interface{}{}
|
publicService := map[string]interface{}{}
|
||||||
nodeService := map[string]interface{}{}
|
nodeService := map[string]interface{}{}
|
||||||
|
|
||||||
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
|
||||||
for _, f := range fileInfoList {
|
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error{
|
||||||
if !validConfigFile(f) {
|
if info.IsDir() {
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name()
|
|
||||||
currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(filePath)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !validConfigFile(info.Name()) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
currGlobalCfg, serviceConfig, mapNodeService, err := cls.readServiceConfig(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if currGlobalCfg != nil {
|
if currGlobalCfg != nil {
|
||||||
//不允许重复的配置global配置
|
//不允许重复的配置global配置
|
||||||
if globalCfg != nil {
|
if globalCfg != nil {
|
||||||
return fmt.Errorf("[Global] does not allow repeated configuration in %s", f.Name())
|
return fmt.Errorf("[Global] does not allow repeated configuration in %s", info.Name())
|
||||||
}
|
}
|
||||||
globalCfg = currGlobalCfg
|
globalCfg = currGlobalCfg
|
||||||
}
|
}
|
||||||
@@ -366,7 +376,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
|||||||
pubCfg, ok := serviceConfig[s]
|
pubCfg, ok := serviceConfig[s]
|
||||||
if ok == true {
|
if ok == true {
|
||||||
if _, publicOk := publicService[s]; publicOk == true {
|
if _, publicOk := publicService[s]; publicOk == true {
|
||||||
return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, f.Name())
|
return fmt.Errorf("public service [%s] does not allow repeated configuration in %s", s, info.Name())
|
||||||
}
|
}
|
||||||
publicService[s] = pubCfg
|
publicService[s] = pubCfg
|
||||||
}
|
}
|
||||||
@@ -382,12 +392,17 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if _, nodeOK := nodeService[s]; nodeOK == true {
|
if _, nodeOK := nodeService[s]; nodeOK == true {
|
||||||
return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, f.Name())
|
return fmt.Errorf("NodeService NodeId[%s] Service[%s] does not allow repeated configuration in %s", cls.localNodeInfo.NodeId, s, info.Name())
|
||||||
}
|
}
|
||||||
nodeService[s] = nodeCfg
|
nodeService[s] = nodeCfg
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//组合所有的配置
|
//组合所有的配置
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ import (
|
|||||||
type WSModule struct {
|
type WSModule struct {
|
||||||
service.Module
|
service.Module
|
||||||
|
|
||||||
wsServer network.WSServer
|
WSServer network.WSServer
|
||||||
|
|
||||||
mapClientLocker sync.RWMutex
|
mapClientLocker sync.RWMutex
|
||||||
mapClient map[string]*WSClient
|
mapClient map[string]*WSClient
|
||||||
@@ -57,16 +57,16 @@ func (ws *WSModule) OnInit() error {
|
|||||||
return fmt.Errorf("please call the Init function correctly")
|
return fmt.Errorf("please call the Init function correctly")
|
||||||
}
|
}
|
||||||
|
|
||||||
ws.wsServer.MaxConnNum = ws.wsCfg.MaxConnNum
|
ws.WSServer.MaxConnNum = ws.wsCfg.MaxConnNum
|
||||||
ws.wsServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
|
ws.WSServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
|
||||||
ws.wsServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
|
ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
|
||||||
ws.wsServer.Addr = ws.wsCfg.ListenAddr
|
ws.WSServer.Addr = ws.wsCfg.ListenAddr
|
||||||
|
|
||||||
//3.设置解析处理器
|
//3.设置解析处理器
|
||||||
ws.process.SetByteOrder(ws.wsCfg.LittleEndian)
|
ws.process.SetByteOrder(ws.wsCfg.LittleEndian)
|
||||||
|
|
||||||
ws.mapClient = make(map[string]*WSClient, ws.wsServer.MaxConnNum)
|
ws.mapClient = make(map[string]*WSClient, ws.WSServer.MaxConnNum)
|
||||||
ws.wsServer.NewAgent = ws.NewWSClient
|
ws.WSServer.NewAgent = ws.NewWSClient
|
||||||
|
|
||||||
//4.设置网络事件处理
|
//4.设置网络事件处理
|
||||||
ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler)
|
ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler)
|
||||||
@@ -80,7 +80,7 @@ func (ws *WSModule) Init(wsCfg *WSCfg, process processor.IRawProcessor) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WSModule) Start() error {
|
func (ws *WSModule) Start() error {
|
||||||
return ws.wsServer.Start()
|
return ws.WSServer.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WSModule) wsEventHandler(ev event.IEvent) {
|
func (ws *WSModule) wsEventHandler(ev event.IEvent) {
|
||||||
@@ -197,3 +197,7 @@ func (ws *WSModule) SendRawMsg(clientId string, msg []byte) error {
|
|||||||
ws.mapClientLocker.Unlock()
|
ws.mapClientLocker.Unlock()
|
||||||
return client.wsConn.WriteMsg(msg)
|
return client.wsConn.WriteMsg(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ws *WSModule) SetMessageType(messageType int) {
|
||||||
|
ws.WSServer.SetMessageType(messageType)
|
||||||
|
}
|
||||||
|
|||||||
@@ -32,10 +32,10 @@ func Abs[NumType typ.Signed | typ.Float](Num NumType) NumType {
|
|||||||
func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
|
func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
|
||||||
ret := number1 + number2
|
ret := number1 + number2
|
||||||
if number2 > 0 && ret < number1 {
|
if number2 > 0 && ret < number1 {
|
||||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||||
return ret, false
|
return ret, false
|
||||||
} else if number2 < 0 && ret > number1 {
|
} else if number2 < 0 && ret > number1 {
|
||||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||||
return ret, false
|
return ret, false
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -45,10 +45,10 @@ func AddSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
|
|||||||
func SubSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
|
func SubSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, bool) {
|
||||||
ret := number1 - number2
|
ret := number1 - number2
|
||||||
if number2 > 0 && ret > number1 {
|
if number2 > 0 && ret > number1 {
|
||||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||||
return ret, false
|
return ret, false
|
||||||
} else if number2 < 0 && ret < number1 {
|
} else if number2 < 0 && ret < number1 {
|
||||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||||
return ret, false
|
return ret, false
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -65,7 +65,7 @@ func MulSafe[NumType typ.Number](number1 NumType, number2 NumType) (NumType, boo
|
|||||||
return ret, true
|
return ret, true
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
log.SStackError("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
|
||||||
return ret, true
|
return ret, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user