mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-27 01:04:47 +08:00
Compare commits
2 Commits
a6487dd41e
...
v2.1.6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af15615345 | ||
|
|
50dd80b082 |
@@ -40,8 +40,6 @@ type NodeInfo struct {
|
|||||||
DiscoveryService []DiscoveryService //筛选发现的服务,如果不配置,不进行筛选
|
DiscoveryService []DiscoveryService //筛选发现的服务,如果不配置,不进行筛选
|
||||||
status NodeStatus
|
status NodeStatus
|
||||||
Retire bool
|
Retire bool
|
||||||
|
|
||||||
NetworkName string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type NodeRpcInfo struct {
|
type NodeRpcInfo struct {
|
||||||
|
|||||||
@@ -3,24 +3,23 @@ package cluster
|
|||||||
import "github.com/duanhf2012/origin/v2/rpc"
|
import "github.com/duanhf2012/origin/v2/rpc"
|
||||||
|
|
||||||
type ConfigDiscovery struct {
|
type ConfigDiscovery struct {
|
||||||
funDelNode FunDelNode
|
funDelNode FunDelNode
|
||||||
funSetNode FunSetNode
|
funSetNode FunSetNode
|
||||||
localNodeId string
|
localNodeId string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
|
||||||
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode FunDelNode,funSetNode FunSetNode) error{
|
|
||||||
discovery.localNodeId = localNodeId
|
discovery.localNodeId = localNodeId
|
||||||
discovery.funDelNode = funDelNode
|
discovery.funDelNode = funDelNode
|
||||||
discovery.funSetNode = funSetNode
|
discovery.funSetNode = funSetNode
|
||||||
|
|
||||||
//解析本地其他服务配置
|
//解析本地其他服务配置
|
||||||
_,nodeInfoList,_,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
|
_, nodeInfoList, _, err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _,nodeInfo := range nodeInfoList {
|
for _, nodeInfo := range nodeInfoList {
|
||||||
if nodeInfo.NodeId == localNodeId {
|
if nodeInfo.NodeId == localNodeId {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -30,5 +29,3 @@ func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode Fu
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -5,17 +5,17 @@ import (
|
|||||||
"github.com/duanhf2012/origin/v2/service"
|
"github.com/duanhf2012/origin/v2/service"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
|
func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
|
||||||
if cls.discoveryInfo.getDiscoveryType() == OriginType { //origin类型服务发现
|
if cls.discoveryInfo.getDiscoveryType() == OriginType { //origin类型服务发现
|
||||||
return cls.setupOriginDiscovery(localNodeId,setupServiceFun)
|
return cls.setupOriginDiscovery(localNodeId, setupServiceFun)
|
||||||
}else if cls.discoveryInfo.getDiscoveryType() == EtcdType{//etcd类型服务发现
|
} else if cls.discoveryInfo.getDiscoveryType() == EtcdType { //etcd类型服务发现
|
||||||
return cls.setupEtcdDiscovery(localNodeId,setupServiceFun)
|
return cls.setupEtcdDiscovery(localNodeId, setupServiceFun)
|
||||||
}
|
}
|
||||||
|
|
||||||
return cls.setupConfigDiscovery(localNodeId,setupServiceFun)
|
return cls.setupConfigDiscovery(localNodeId, setupServiceFun)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
|
func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
|
||||||
if cls.serviceDiscovery != nil {
|
if cls.serviceDiscovery != nil {
|
||||||
return errors.New("service discovery has been setup")
|
return errors.New("service discovery has been setup")
|
||||||
}
|
}
|
||||||
@@ -27,6 +27,7 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
|
|||||||
}
|
}
|
||||||
|
|
||||||
cls.serviceDiscovery = getOriginDiscovery()
|
cls.serviceDiscovery = getOriginDiscovery()
|
||||||
|
|
||||||
//2.如果为动态服务发现安装本地发现服务
|
//2.如果为动态服务发现安装本地发现服务
|
||||||
if localMaster == true {
|
if localMaster == true {
|
||||||
setupServiceFun(&masterService)
|
setupServiceFun(&masterService)
|
||||||
@@ -36,11 +37,10 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set
|
|||||||
setupServiceFun(&clientService)
|
setupServiceFun(&clientService)
|
||||||
cls.AddDiscoveryService(OriginDiscoveryClientName, true)
|
cls.AddDiscoveryService(OriginDiscoveryClientName, true)
|
||||||
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
|
func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
|
||||||
if cls.serviceDiscovery != nil {
|
if cls.serviceDiscovery != nil {
|
||||||
return errors.New("service discovery has been setup")
|
return errors.New("service discovery has been setup")
|
||||||
}
|
}
|
||||||
@@ -49,11 +49,11 @@ func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun Setup
|
|||||||
cls.serviceDiscovery = getEtcdDiscovery()
|
cls.serviceDiscovery = getEtcdDiscovery()
|
||||||
setupServiceFun(cls.serviceDiscovery.(service.IService))
|
setupServiceFun(cls.serviceDiscovery.(service.IService))
|
||||||
|
|
||||||
cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(),false)
|
cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(), false)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{
|
func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error {
|
||||||
if cls.serviceDiscovery != nil {
|
if cls.serviceDiscovery != nil {
|
||||||
return errors.New("service discovery has been setup")
|
return errors.New("service discovery has been setup")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,11 @@
|
|||||||
package cluster
|
package cluster
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"github.com/duanhf2012/origin/v2/event"
|
"github.com/duanhf2012/origin/v2/event"
|
||||||
"github.com/duanhf2012/origin/v2/log"
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
"github.com/duanhf2012/origin/v2/rpc"
|
"github.com/duanhf2012/origin/v2/rpc"
|
||||||
@@ -9,16 +14,11 @@ import (
|
|||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
"go.etcd.io/etcd/client/v3"
|
"go.etcd.io/etcd/client/v3"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
"time"
|
"os"
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"io/ioutil"
|
"time"
|
||||||
"crypto/x509"
|
|
||||||
"crypto/tls"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const originDir = "/origin"
|
const originDir = "/origin"
|
||||||
@@ -42,7 +42,8 @@ type EtcdDiscoveryService struct {
|
|||||||
mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId]
|
mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId]
|
||||||
}
|
}
|
||||||
|
|
||||||
var etcdDiscovery *EtcdDiscoveryService
|
var etcdDiscovery *EtcdDiscoveryService
|
||||||
|
|
||||||
func getEtcdDiscovery() IServiceDiscovery {
|
func getEtcdDiscovery() IServiceDiscovery {
|
||||||
if etcdDiscovery == nil {
|
if etcdDiscovery == nil {
|
||||||
etcdDiscovery = &EtcdDiscoveryService{}
|
etcdDiscovery = &EtcdDiscoveryService{}
|
||||||
@@ -51,7 +52,6 @@ func getEtcdDiscovery() IServiceDiscovery {
|
|||||||
return etcdDiscovery
|
return etcdDiscovery
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
|
func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error {
|
||||||
ed.localNodeId = localNodeId
|
ed.localNodeId = localNodeId
|
||||||
|
|
||||||
@@ -99,17 +99,17 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
|||||||
|
|
||||||
if etcdDiscoveryCfg.EtcdList[i].Cert != "" {
|
if etcdDiscoveryCfg.EtcdList[i].Cert != "" {
|
||||||
// load cert
|
// load cert
|
||||||
cert, cerr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
|
cert, cErr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey)
|
||||||
if cerr != nil {
|
if cErr != nil {
|
||||||
log.Error("load cert error", log.ErrorField("err", cerr))
|
log.Error("load cert error", log.ErrorField("err", cErr))
|
||||||
return cerr
|
return cErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// load root ca
|
// load root ca
|
||||||
caData, cerr := ioutil.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
|
caData, cErr := os.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca)
|
||||||
if cerr != nil {
|
if cErr != nil {
|
||||||
log.Error("load root ca error", log.ErrorField("err", cerr))
|
log.Error("load root ca error", log.ErrorField("err", cErr))
|
||||||
return cerr
|
return cErr
|
||||||
}
|
}
|
||||||
pool := x509.NewCertPool()
|
pool := x509.NewCertPool()
|
||||||
pool.AppendCertsFromPEM(caData)
|
pool.AppendCertsFromPEM(caData)
|
||||||
@@ -122,13 +122,12 @@ func (ed *EtcdDiscoveryService) OnInit() error {
|
|||||||
client, err = clientv3.New(clientv3.Config{
|
client, err = clientv3.New(clientv3.Config{
|
||||||
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
|
Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints,
|
||||||
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
|
DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond,
|
||||||
Username: etcdDiscoveryCfg.EtcdList[i].UserName,
|
Username: etcdDiscoveryCfg.EtcdList[i].UserName,
|
||||||
Password: etcdDiscoveryCfg.EtcdList[i].Password,
|
Password: etcdDiscoveryCfg.EtcdList[i].Password,
|
||||||
Logger: log.GetLogger().Logger,
|
Logger: log.GetLogger().Logger,
|
||||||
TLS: tlsConfig,
|
TLS: tlsConfig,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("etcd discovery init fail", log.ErrorField("err", err))
|
log.Error("etcd discovery init fail", log.ErrorField("err", err))
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -479,11 +479,7 @@ func setLogPath(args interface{}) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
logPath := strings.TrimSpace(args.(string))
|
logPath := strings.TrimSpace(args.(string))
|
||||||
dir, err := os.Stat(logPath)
|
_, err := os.Stat(logPath)
|
||||||
if err != nil || dir.IsDir() == false {
|
|
||||||
return errors.New("Not found dir " + logPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = os.MkdirAll(logPath, os.ModePerm)
|
err = os.MkdirAll(logPath, os.ModePerm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -192,7 +192,7 @@ func (s *Service) run() {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
if s.profiler != nil {
|
if s.profiler != nil {
|
||||||
analyzer = s.profiler.Push("[Req]" + rpcRequest.RpcRequestData.GetServiceMethod())
|
analyzer = s.profiler.Push("[RpcReq]" + rpcRequest.RpcRequestData.GetServiceMethod()+"."+strconv.Itoa(int(rpcRequest.RpcRequestData.GetRpcMethodId())))
|
||||||
}
|
}
|
||||||
|
|
||||||
s.GetRpcHandler().HandlerRpcRequest(rpcRequest)
|
s.GetRpcHandler().HandlerRpcRequest(rpcRequest)
|
||||||
|
|||||||
Reference in New Issue
Block a user