Compare commits

..

4 Commits

Author SHA1 Message Date
boyce
15580ffce9 新增wss证书配置支持 2025-04-21 21:23:34 +08:00
boyce
bd467a219b 废弃掉HttpService、TcpService、WSService 2025-03-28 10:33:51 +08:00
boyce
af15615345 优化日志生成路径 2025-03-14 18:03:01 +08:00
boyce
50dd80b082 整理代码 2025-03-10 11:35:19 +08:00
11 changed files with 66 additions and 53 deletions

View File

@@ -40,8 +40,6 @@ type NodeInfo struct {
DiscoveryService []DiscoveryService //筛选发现的服务,如果不配置,不进行筛选 DiscoveryService []DiscoveryService //筛选发现的服务,如果不配置,不进行筛选
status NodeStatus status NodeStatus
Retire bool Retire bool
NetworkName string
} }
type NodeRpcInfo struct { type NodeRpcInfo struct {

View File

@@ -3,24 +3,23 @@ package cluster
import "github.com/duanhf2012/origin/v2/rpc" import "github.com/duanhf2012/origin/v2/rpc"
type ConfigDiscovery struct { type ConfigDiscovery struct {
funDelNode FunDelNode funDelNode FunDelNode
funSetNode FunSetNode funSetNode FunSetNode
localNodeId string localNodeId string
} }
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode FunDelNode,funSetNode FunSetNode) error{
discovery.localNodeId = localNodeId discovery.localNodeId = localNodeId
discovery.funDelNode = funDelNode discovery.funDelNode = funDelNode
discovery.funSetNode = funSetNode discovery.funSetNode = funSetNode
//解析本地其他服务配置 //解析本地其他服务配置
_,nodeInfoList,_,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull) _, nodeInfoList, _, err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
if err != nil { if err != nil {
return err return err
} }
for _,nodeInfo := range nodeInfoList { for _, nodeInfo := range nodeInfoList {
if nodeInfo.NodeId == localNodeId { if nodeInfo.NodeId == localNodeId {
continue continue
} }
@@ -30,5 +29,3 @@ func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode Fu
return nil return nil
} }

View File

@@ -5,17 +5,17 @@ import (
"github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/service"
) )
func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.discoveryInfo.getDiscoveryType() == OriginType { //origin类型服务发现 if cls.discoveryInfo.getDiscoveryType() == OriginType { //origin类型服务发现
return cls.setupOriginDiscovery(localNodeId,setupServiceFun) return cls.setupOriginDiscovery(localNodeId, setupServiceFun)
}else if cls.discoveryInfo.getDiscoveryType() == EtcdType{//etcd类型服务发现 } else if cls.discoveryInfo.getDiscoveryType() == EtcdType { //etcd类型服务发现
return cls.setupEtcdDiscovery(localNodeId,setupServiceFun) return cls.setupEtcdDiscovery(localNodeId, setupServiceFun)
} }
return cls.setupConfigDiscovery(localNodeId,setupServiceFun) return cls.setupConfigDiscovery(localNodeId, setupServiceFun)
} }
func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil { if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup") return errors.New("service discovery has been setup")
} }
@@ -27,6 +27,7 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
} }
cls.serviceDiscovery = getOriginDiscovery() cls.serviceDiscovery = getOriginDiscovery()
//2.如果为动态服务发现安装本地发现服务 //2.如果为动态服务发现安装本地发现服务
if localMaster == true { if localMaster == true {
setupServiceFun(&masterService) setupServiceFun(&masterService)
@@ -36,11 +37,10 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
setupServiceFun(&clientService) setupServiceFun(&clientService)
cls.AddDiscoveryService(OriginDiscoveryClientName, true) cls.AddDiscoveryService(OriginDiscoveryClientName, true)
return nil return nil
} }
func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil { if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup") return errors.New("service discovery has been setup")
} }
@@ -49,11 +49,11 @@ func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun Setup
cls.serviceDiscovery = getEtcdDiscovery() cls.serviceDiscovery = getEtcdDiscovery()
setupServiceFun(cls.serviceDiscovery.(service.IService)) setupServiceFun(cls.serviceDiscovery.(service.IService))
cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(),false) cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(), false)
return nil return nil
} }
func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
if cls.serviceDiscovery != nil { if cls.serviceDiscovery != nil {
return errors.New("service discovery has been setup") return errors.New("service discovery has been setup")
} }

View File

@@ -1,6 +1,11 @@
package cluster package cluster
import ( import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"github.com/duanhf2012/origin/v2/event" "github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/rpc" "github.com/duanhf2012/origin/v2/rpc"
@@ -9,16 +14,11 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"time" "os"
"context"
"errors"
"fmt"
"path" "path"
"strings" "strings"
"sync/atomic" "sync/atomic"
"io/ioutil" "time"
"crypto/x509"
"crypto/tls"
) )
const originDir = "/origin" const originDir = "/origin"
@@ -42,7 +42,8 @@ type EtcdDiscoveryService struct {
mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId] mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId]
} }
var etcdDiscovery *EtcdDiscoveryService var etcdDiscovery *EtcdDiscoveryService
func getEtcdDiscovery() IServiceDiscovery { func getEtcdDiscovery() IServiceDiscovery {
if etcdDiscovery == nil { if etcdDiscovery == nil {
etcdDiscovery = &EtcdDiscoveryService{} etcdDiscovery = &EtcdDiscoveryService{}
@@ -51,7 +52,6 @@ func getEtcdDiscovery() IServiceDiscovery {
return etcdDiscovery return etcdDiscovery
} }
func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error { func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
ed.localNodeId = localNodeId ed.localNodeId = localNodeId
@@ -99,17 +99,17 @@ func (ed *EtcdDiscoveryService) OnInit() error {
if etcdDiscoveryCfg.EtcdList[i].Cert != "" { if etcdDiscoveryCfg.EtcdList[i].Cert != "" {
// load cert // load cert
cert, cerr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey) cert, cErr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
if cerr != nil { if cErr != nil {
log.Error("load cert error", log.ErrorField("err", cerr)) log.Error("load cert error", log.ErrorField("err", cErr))
return cerr return cErr
} }
// load root ca // load root ca
caData, cerr := ioutil.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca) caData, cErr := os.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
if cerr != nil { if cErr != nil {
log.Error("load root ca error", log.ErrorField("err", cerr)) log.Error("load root ca error", log.ErrorField("err", cErr))
return cerr return cErr
} }
pool := x509.NewCertPool() pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caData) pool.AppendCertsFromPEM(caData)
@@ -122,13 +122,12 @@ func (ed *EtcdDiscoveryService) OnInit() error {
client, err = clientv3.New(clientv3.Config{ client, err = clientv3.New(clientv3.Config{
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints, Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond, DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
Username: etcdDiscoveryCfg.EtcdList[i].UserName, Username: etcdDiscoveryCfg.EtcdList[i].UserName,
Password: etcdDiscoveryCfg.EtcdList[i].Password, Password: etcdDiscoveryCfg.EtcdList[i].Password,
Logger: log.GetLogger().Logger, Logger: log.GetLogger().Logger,
TLS: tlsConfig, TLS: tlsConfig,
}) })
if err != nil { if err != nil {
log.Error("etcd discovery init fail", log.ErrorField("err", err)) log.Error("etcd discovery init fail", log.ErrorField("err", err))
return err return err

View File

@@ -73,9 +73,22 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
handler.conns[conn] = struct{}{} handler.conns[conn] = struct{}{}
handler.mutexConns.Unlock() handler.mutexConns.Unlock()
c,ok:=conn.NetConn().(*net.TCPConn)
if !ok {
tlsConn,ok := conn.NetConn().(*tls.Conn)
if !ok {
log.Error("conn error")
return
}
c,ok = tlsConn.NetConn().(*net.TCPConn)
if !ok {
log.Error("conn error")
return
}
}
conn.UnderlyingConn().(*net.TCPConn).SetLinger(0) c.SetLinger(0)
conn.UnderlyingConn().(*net.TCPConn).SetNoDelay(true) c.SetNoDelay(true)
wsConn := newWSConn(conn, r.Header, handler.pendingWriteNum, handler.maxMsgLen, handler.messageType) wsConn := newWSConn(conn, r.Header, handler.pendingWriteNum, handler.maxMsgLen, handler.messageType)
agent := handler.newAgent(wsConn) agent := handler.newAgent(wsConn)
agent.Run() agent.Run()

View File

@@ -479,11 +479,7 @@ func setLogPath(args interface{}) error {
return nil return nil
} }
logPath := strings.TrimSpace(args.(string)) logPath := strings.TrimSpace(args.(string))
dir, err := os.Stat(logPath) _, err := os.Stat(logPath)
if err != nil || dir.IsDir() == false {
return errors.New("Not found dir " + logPath)
}
if err != nil { if err != nil {
err = os.MkdirAll(logPath, os.ModePerm) err = os.MkdirAll(logPath, os.ModePerm)
if err != nil { if err != nil {

View File

@@ -192,7 +192,7 @@ func (s *Service) run() {
break break
} }
if s.profiler != nil { if s.profiler != nil {
analyzer = s.profiler.Push("[Req]" + rpcRequest.RpcRequestData.GetServiceMethod()) analyzer = s.profiler.Push("[RpcReq]" + rpcRequest.RpcRequestData.GetServiceMethod()+"."+strconv.Itoa(int(rpcRequest.RpcRequestData.GetRpcMethodId())))
} }
s.GetRpcHandler().HandlerRpcRequest(rpcRequest) s.GetRpcHandler().HandlerRpcRequest(rpcRequest)

View File

@@ -34,6 +34,8 @@ type WSCfg struct {
PendingWriteNum int PendingWriteNum int
MaxMsgLen uint32 MaxMsgLen uint32
LittleEndian bool //是否小端序 LittleEndian bool //是否小端序
KeyFile string
CertFile string
} }
type WSPackType int8 type WSPackType int8
@@ -62,13 +64,18 @@ func (ws *WSModule) OnInit() error {
ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
ws.WSServer.Addr = ws.wsCfg.ListenAddr ws.WSServer.Addr = ws.wsCfg.ListenAddr
//3.设置解析处理器 if ws.wsCfg.KeyFile != "" && ws.wsCfg.CertFile != "" {
ws.WSServer.KeyFile = ws.wsCfg.KeyFile
ws.WSServer.CertFile = ws.wsCfg.CertFile
}
// 设置解析处理器
ws.process.SetByteOrder(ws.wsCfg.LittleEndian) ws.process.SetByteOrder(ws.wsCfg.LittleEndian)
ws.mapClient = make(map[string]*WSClient, ws.WSServer.MaxConnNum) ws.mapClient = make(map[string]*WSClient, ws.WSServer.MaxConnNum)
ws.WSServer.NewAgent = ws.NewWSClient ws.WSServer.NewAgent = ws.NewWSClient
//4.设置网络事件处理 // 设置网络事件处理
ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler) ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler)
return nil return nil

View File

@@ -83,6 +83,7 @@ type HttpSession struct {
sessionDone chan *HttpSession sessionDone chan *HttpSession
} }
// Deprecated: replace it with the GinModule
type HttpService struct { type HttpService struct {
service.Service service.Service

View File

@@ -14,6 +14,7 @@ import (
"time" "time"
) )
// Deprecated: replace it with the TcpModule
type TcpService struct { type TcpService struct {
tcpServer network.TCPServer tcpServer network.TCPServer
service.Service service.Service

View File

@@ -12,6 +12,7 @@ import (
"sync" "sync"
) )
// Deprecated: replace it with the WSModule
type WSService struct { type WSService struct {
service.Service service.Service
wsServer network.WSServer wsServer network.WSServer