diff --git a/cluster/cluster.go b/cluster/cluster.go index c4b0384..319f484 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -40,8 +40,6 @@ type NodeInfo struct { DiscoveryService []DiscoveryService //筛选发现的服务,如果不配置,不进行筛选 status NodeStatus Retire bool - - NetworkName string } type NodeRpcInfo struct { diff --git a/cluster/configdiscovery.go b/cluster/configdiscovery.go index 2c28b8b..8c2fab9 100644 --- a/cluster/configdiscovery.go +++ b/cluster/configdiscovery.go @@ -3,24 +3,23 @@ package cluster import "github.com/duanhf2012/origin/v2/rpc" type ConfigDiscovery struct { - funDelNode FunDelNode - funSetNode FunSetNode + funDelNode FunDelNode + funSetNode FunSetNode localNodeId string } - -func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode FunDelNode,funSetNode FunSetNode) error{ +func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error { discovery.localNodeId = localNodeId discovery.funDelNode = funDelNode discovery.funSetNode = funSetNode - + //解析本地其他服务配置 - _,nodeInfoList,_,err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull) + _, nodeInfoList, _, err := GetCluster().readLocalClusterConfig(rpc.NodeIdNull) if err != nil { return err } - for _,nodeInfo := range nodeInfoList { + for _, nodeInfo := range nodeInfoList { if nodeInfo.NodeId == localNodeId { continue } @@ -30,5 +29,3 @@ func (discovery *ConfigDiscovery) InitDiscovery(localNodeId string,funDelNode Fu return nil } - - diff --git a/cluster/discovery.go b/cluster/discovery.go index 36da5a2..fac4910 100644 --- a/cluster/discovery.go +++ b/cluster/discovery.go @@ -5,17 +5,17 @@ import ( "github.com/duanhf2012/origin/v2/service" ) -func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ +func (cls *Cluster) setupDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error { if cls.discoveryInfo.getDiscoveryType() == OriginType { //origin类型服务发现 - return cls.setupOriginDiscovery(localNodeId,setupServiceFun) - }else if cls.discoveryInfo.getDiscoveryType() == EtcdType{//etcd类型服务发现 - return cls.setupEtcdDiscovery(localNodeId,setupServiceFun) + return cls.setupOriginDiscovery(localNodeId, setupServiceFun) + } else if cls.discoveryInfo.getDiscoveryType() == EtcdType { //etcd类型服务发现 + return cls.setupEtcdDiscovery(localNodeId, setupServiceFun) } - return cls.setupConfigDiscovery(localNodeId,setupServiceFun) + return cls.setupConfigDiscovery(localNodeId, setupServiceFun) } -func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ +func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error { if cls.serviceDiscovery != nil { return errors.New("service discovery has been setup") } @@ -27,6 +27,7 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set } cls.serviceDiscovery = getOriginDiscovery() + //2.如果为动态服务发现安装本地发现服务 if localMaster == true { setupServiceFun(&masterService) @@ -36,11 +37,10 @@ func (cls *Cluster) setupOriginDiscovery(localNodeId string, setupServiceFun Set setupServiceFun(&clientService) cls.AddDiscoveryService(OriginDiscoveryClientName, true) - return nil } -func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ +func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error { if cls.serviceDiscovery != nil { return errors.New("service discovery has been setup") } @@ -48,12 +48,12 @@ func (cls *Cluster) setupEtcdDiscovery(localNodeId string, setupServiceFun Setup //setup etcd service cls.serviceDiscovery = getEtcdDiscovery() setupServiceFun(cls.serviceDiscovery.(service.IService)) - - cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(),false) + + cls.AddDiscoveryService(cls.serviceDiscovery.(service.IService).GetName(), false) return nil } -func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error{ +func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun SetupServiceFun) error { if cls.serviceDiscovery != nil { return errors.New("service discovery has been setup") } diff --git a/cluster/etcddiscovery.go b/cluster/etcddiscovery.go index 460331b..1daf259 100644 --- a/cluster/etcddiscovery.go +++ b/cluster/etcddiscovery.go @@ -1,6 +1,11 @@ package cluster import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" "github.com/duanhf2012/origin/v2/event" "github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/rpc" @@ -9,16 +14,11 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/client/v3" "google.golang.org/protobuf/proto" - "time" - "context" - "errors" - "fmt" + "os" "path" "strings" "sync/atomic" - "io/ioutil" - "crypto/x509" - "crypto/tls" + "time" ) const originDir = "/origin" @@ -42,7 +42,8 @@ type EtcdDiscoveryService struct { mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId] } -var etcdDiscovery *EtcdDiscoveryService +var etcdDiscovery *EtcdDiscoveryService + func getEtcdDiscovery() IServiceDiscovery { if etcdDiscovery == nil { etcdDiscovery = &EtcdDiscoveryService{} @@ -51,7 +52,6 @@ func getEtcdDiscovery() IServiceDiscovery { return etcdDiscovery } - func (ed *EtcdDiscoveryService) InitDiscovery(localNodeId string, funDelNode FunDelNode, funSetNode FunSetNode) error { ed.localNodeId = localNodeId @@ -99,17 +99,17 @@ func (ed *EtcdDiscoveryService) OnInit() error { if etcdDiscoveryCfg.EtcdList[i].Cert != "" { // load cert - cert, cerr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey) - if cerr != nil { - log.Error("load cert error", log.ErrorField("err", cerr)) - return cerr + cert, cErr := tls.LoadX509KeyPair(etcdDiscoveryCfg.EtcdList[i].Cert, etcdDiscoveryCfg.EtcdList[i].CertKey) + if cErr != nil { + log.Error("load cert error", log.ErrorField("err", cErr)) + return cErr } // load root ca - caData, cerr := ioutil.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca) - if cerr != nil { - log.Error("load root ca error", log.ErrorField("err", cerr)) - return cerr + caData, cErr := os.ReadFile(etcdDiscoveryCfg.EtcdList[i].Ca) + if cErr != nil { + log.Error("load root ca error", log.ErrorField("err", cErr)) + return cErr } pool := x509.NewCertPool() pool.AppendCertsFromPEM(caData) @@ -122,13 +122,12 @@ func (ed *EtcdDiscoveryService) OnInit() error { client, err = clientv3.New(clientv3.Config{ Endpoints: etcdDiscoveryCfg.EtcdList[i].Endpoints, DialTimeout: etcdDiscoveryCfg.DialTimeoutMillisecond, - Username: etcdDiscoveryCfg.EtcdList[i].UserName, - Password: etcdDiscoveryCfg.EtcdList[i].Password, + Username: etcdDiscoveryCfg.EtcdList[i].UserName, + Password: etcdDiscoveryCfg.EtcdList[i].Password, Logger: log.GetLogger().Logger, - TLS: tlsConfig, + TLS: tlsConfig, }) - if err != nil { log.Error("etcd discovery init fail", log.ErrorField("err", err)) return err diff --git a/service/service.go b/service/service.go index 2d10e6d..e0be23f 100644 --- a/service/service.go +++ b/service/service.go @@ -192,7 +192,7 @@ func (s *Service) run() { break } if s.profiler != nil { - analyzer = s.profiler.Push("[Req]" + rpcRequest.RpcRequestData.GetServiceMethod()) + analyzer = s.profiler.Push("[RpcReq]" + rpcRequest.RpcRequestData.GetServiceMethod()+"."+strconv.Itoa(int(rpcRequest.RpcRequestData.GetRpcMethodId()))) } s.GetRpcHandler().HandlerRpcRequest(rpcRequest)