From e36693eeff279daf4ed7e47935c68dd6a851511b Mon Sep 17 00:00:00 2001 From: boyce Date: Fri, 19 Apr 2024 16:39:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96etcd=E5=8F=91=E7=8E=B0?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=EF=BC=8C=E6=94=AF=E6=8C=81=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E9=99=84=E5=8A=A0=E4=BF=A1=E6=81=AFrpc?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/discovery.go | 2 +- cluster/etcddiscovery.go | 97 ++++++++++- cluster/origindiscovery.go | 105 ++++++++++-- cluster/parsecfg.go | 15 +- ...micdiscover.pb.go => origindiscover.pb.go} | 154 +++++++++--------- ...micdiscover.proto => origindiscover.proto} | 0 service/service.go | 13 +- 7 files changed, 278 insertions(+), 108 deletions(-) rename rpc/{dynamicdiscover.pb.go => origindiscover.pb.go} (61%) rename rpc/{dynamicdiscover.proto => origindiscover.proto} (100%) diff --git a/cluster/discovery.go b/cluster/discovery.go index e92b596..36da5a2 100644 --- a/cluster/discovery.go +++ b/cluster/discovery.go @@ -62,7 +62,7 @@ func (cls *Cluster) setupConfigDiscovery(localNodeId string, setupServiceFun Set return nil } -func (cls *Cluster) GetOriginDiscoveryNodeList() []NodeInfo { +func (cls *Cluster) GetOriginDiscovery() *OriginDiscovery { return cls.discoveryInfo.Origin } diff --git a/cluster/etcddiscovery.go b/cluster/etcddiscovery.go index 4286ddb..c89e7da 100644 --- a/cluster/etcddiscovery.go +++ b/cluster/etcddiscovery.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "path" "runtime" "strings" "sync/atomic" @@ -38,7 +39,7 @@ type EtcdDiscoveryService struct { byteLocalNodeInfo string mapClient map[*clientv3.Client]*etcdClientInfo isClose int32 - + bRetire bool mapDiscoveryNodeId map[string]map[string]struct{} //map[networkName]map[nodeId] } @@ -100,7 +101,8 @@ func (ed *EtcdDiscoveryService) OnInit() error { } ctx,_:=context.WithTimeout(context.Background(),time.Second*3) - _,err = client.Put(ctx,testKey,"") + _,err = client.Leases(ctx) + //_,err = client.Put(ctx,testKey,"") if err != nil { log.Error("etcd discovery init fail",log.Any("endpoint",etcdDiscoveryCfg.EtcdList[i].Endpoints),log.ErrorAttr("err",err)) return err @@ -186,9 +188,37 @@ func (ed *EtcdDiscoveryService) tryWatch(client *clientv3.Client,etcdClient *etc }) } +func (ed *EtcdDiscoveryService) tryLaterRetire() { + ed.AfterFunc(time.Second, func(*timer.Timer) { + if ed.retire() != nil { + ed.tryLaterRetire() + } + }) +} + +func (ed *EtcdDiscoveryService) retire() error{ + //从etcd中更新 + for c,ec := range ed.mapClient { + for _, watchKey := range ec.watchKeys { + // 注册服务节点到 etcd + _, err := c.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID)) + if err != nil { + log.Error("etcd Put fail", log.ErrorAttr("err", err)) + return err + } + } + } + + return nil +} + func (ed *EtcdDiscoveryService) OnRetire(){ - atomic.StoreInt32(&ed.isClose,1) - ed.close() + ed.bRetire = true + ed.marshalNodeInfo() + + if ed.retire()!= nil { + ed.tryLaterRetire() + } } func (ed *EtcdDiscoveryService) OnRelease(){ @@ -212,7 +242,7 @@ func (ed *EtcdDiscoveryService) marshalNodeInfo() error{ var nodeInfo rpc.NodeInfo nodeInfo.NodeId = nInfo.NodeId nodeInfo.ListenAddr = nInfo.ListenAddr - nodeInfo.Retire = nInfo.Retire + nodeInfo.Retire = ed.bRetire nodeInfo.PublicServiceList = nInfo.PublicServiceList nodeInfo.MaxRpcParamLen = nInfo.MaxRpcParamLen @@ -344,7 +374,7 @@ func (ed *EtcdDiscoveryService) delNode(fullKey string) string{ return "" } - ed.funDelNode(nodeId,true) + ed.funDelNode(nodeId,false) return nodeId } @@ -415,7 +445,7 @@ func (ed *EtcdDiscoveryService) OnEventGets(watchKey string,Kvs []*mvccpb.KeyVal mapLastNodeId := ed.mapDiscoveryNodeId[watchKey] // 根据watchKey获取对应的节点ID集合 for nodeId := range mapLastNodeId { // 遍历所有节点ID if _,ok := mapNode[nodeId];ok == false && nodeId != ed.localNodeId { // 检查节点是否不存在于mapNode且不是本地节点 - ed.funDelNode(nodeId,true) // 调用函数删除该节点 + ed.funDelNode(nodeId,false) // 调用函数删除该节点 delete(ed.mapDiscoveryNodeId[watchKey],nodeId) log.Debug(">>etcd OnEventGets Delete",log.String("watchKey",watchKey),log.String("nodeId",nodeId)) } @@ -441,3 +471,56 @@ func (ed *EtcdDiscoveryService) addNodeId(watchKey string,nodeId string) { ed.mapDiscoveryNodeId[watchKey][nodeId] = struct{}{} } + +func (ed *EtcdDiscoveryService) OnNodeDisconnect(nodeId string) { + //将Discard结点清理 + cluster.DiscardNode(nodeId) +} + +func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.EtcdServiceRecordEvent,empty *service.Empty) error{ + var client *clientv3.Client + + //写入到etcd中 + for c, info := range ed.mapClient{ + for _,watchKey := range info.watchKeys { + if ed.getNetworkNameByWatchKey(watchKey) == etcdServiceRecord.NetworkName { + client = c + break + } + } + } + + if client == nil { + log.Error("etcd record fail,cannot find network name",log.String("networkName",etcdServiceRecord.NetworkName)) + return errors.New("annot find network name") + } + + var lg *clientv3.LeaseGrantResponse + var err error + + if etcdServiceRecord.TTLSecond > 0 { + ctx,_:=context.WithTimeout(context.Background(),time.Second*3) + lg, err = client.Grant(ctx, etcdServiceRecord.TTLSecond) + if err != nil { + log.Error("etcd record fail,cannot grant lease",log.ErrorAttr("err",err)) + return errors.New("cannot grant lease") + } + } + + if lg != nil { + ctx,_:=context.WithTimeout(context.Background(),time.Second*3) + _, err = client.Put(ctx, path.Join(originDir,etcdServiceRecord.RecordKey),etcdServiceRecord.RecordInfo, clientv3.WithLease(lg.ID)) + if err != nil { + log.Error("etcd record fail,cannot put record",log.ErrorAttr("err",err)) + } + return errors.New("cannot put record") + } + + _,err = client.Put(context.Background(), path.Join(originDir,etcdServiceRecord.RecordKey),etcdServiceRecord.RecordInfo) + if err != nil { + log.Error("etcd record fail,cannot put record",log.ErrorAttr("err",err)) + return errors.New("cannot put record") + } + + return nil +} diff --git a/cluster/origindiscovery.go b/cluster/origindiscovery.go index 54e2882..da0c80a 100644 --- a/cluster/origindiscovery.go +++ b/cluster/origindiscovery.go @@ -5,9 +5,9 @@ import ( "github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/rpc" "github.com/duanhf2012/origin/v2/service" - "time" "github.com/duanhf2012/origin/v2/util/timer" "google.golang.org/protobuf/proto" + "time" ) const OriginDiscoveryMasterName = "DiscoveryMaster" @@ -16,6 +16,71 @@ const RegServiceDiscover = OriginDiscoveryMasterName + ".RPC_RegServiceDiscover" const SubServiceDiscover = OriginDiscoveryClientName + ".RPC_SubServiceDiscover" const AddSubServiceDiscover = OriginDiscoveryMasterName + ".RPC_AddSubServiceDiscover" const NodeRetireRpcMethod = OriginDiscoveryMasterName+".RPC_NodeRetire" +// +//type nodeTTL struct { +// nodeId string +// refreshTime time.Time +//} +// +//type nodeSetTTL struct { +// l *list.List +// mapElement map[string]*list.Element +// ttl time.Duration +//} +// +//func (ns *nodeSetTTL) init(ttl time.Duration) { +// ns.ttl = ttl +// ns.mapElement = make(map[string]*list.Element,32) +// ns.l = list.New() +//} +// +//func (ns *nodeSetTTL) removeNode(nodeId string) { +// ele,ok:=ns.mapElement[nodeId] +// if ok == false { +// return +// } +// +// ns.l.Remove(ele) +// delete(ns.mapElement,nodeId) +//} +// +//func (ns *nodeSetTTL) addAndRefreshNode(nodeId string){ +// ele,ok:=ns.mapElement[nodeId] +// if ok == false { +// ele = ns.l.PushBack(nodeId) +// ele.Value = &nodeTTL{nodeId,time.Now()} +// ns.mapElement[nodeId] = ele +// return +// } +// +// ele.Value.(*nodeTTL).refreshTime = time.Now() +// ns.l.MoveToBack(ele) +//} +// +//func (ns *nodeSetTTL) checkTTL(cb func(nodeIdList []string)){ +// nodeIdList := []string{} +// for{ +// f := ns.l.Front() +// if f == nil { +// break +// } +// +// nt := f.Value.(*nodeTTL) +// if time.Now().Sub(nt.refreshTime) > ns.ttl { +// nodeIdList = append(nodeIdList,nt.nodeId) +// }else{ +// break +// } +// +// //删除结点 +// ns.l.Remove(f) +// delete(ns.mapElement,nt.nodeId) +// } +// +// if len(nodeIdList) >0 { +// cb(nodeIdList) +// } +//} type OriginDiscoveryMaster struct { service.Service @@ -48,7 +113,6 @@ func init() { clientService.SetName(OriginDiscoveryClientName) } - func (ds *OriginDiscoveryMaster) isRegNode(nodeId string) bool { _, ok := ds.mapNodeInfo[nodeId] return ok @@ -247,12 +311,14 @@ func (dc *OriginDiscoveryClient) OnStart() { } func (dc *OriginDiscoveryClient) addDiscoveryMaster() { - discoveryNodeList := cluster.GetOriginDiscoveryNodeList() - for i := 0; i < len(discoveryNodeList); i++ { - if discoveryNodeList[i].NodeId == cluster.GetLocalNodeInfo().NodeId { + discoveryNodeList := cluster.GetOriginDiscovery() + + for i := 0; i < len(discoveryNodeList.MasterNodeList); i++ { + if discoveryNodeList.MasterNodeList[i].NodeId == cluster.GetLocalNodeInfo().NodeId { continue } - dc.funSetNode(&discoveryNodeList[i]) + dc.funSetNode(&discoveryNodeList.MasterNodeList[i]) + } } @@ -355,8 +421,8 @@ func (dc *OriginDiscoveryClient) OnNodeConnected(nodeId string) { func (dc *OriginDiscoveryClient) OnRetire(){ dc.bRetire = true - masterNodeList := cluster.GetOriginDiscoveryNodeList() - for i:=0;i rpc.NodeInfo 0, // 1: rpc.SubscribeDiscoverNotify.nodeInfo:type_name -> rpc.NodeInfo 0, // 2: rpc.NodeRetireReq.nodeInfo:type_name -> rpc.NodeInfo @@ -385,13 +385,13 @@ var file_rpcproto_dynamicdiscover_proto_depIdxs = []int32{ 0, // [0:3] is the sub-list for field type_name } -func init() { file_rpcproto_dynamicdiscover_proto_init() } -func file_rpcproto_dynamicdiscover_proto_init() { - if File_rpcproto_dynamicdiscover_proto != nil { +func init() { file_rpcproto_origindiscover_proto_init() } +func file_rpcproto_origindiscover_proto_init() { + if File_rpcproto_origindiscover_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_rpcproto_dynamicdiscover_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_rpcproto_origindiscover_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*NodeInfo); i { case 0: return &v.state @@ -403,7 +403,7 @@ func file_rpcproto_dynamicdiscover_proto_init() { return nil } } - file_rpcproto_dynamicdiscover_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_rpcproto_origindiscover_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ServiceDiscoverReq); i { case 0: return &v.state @@ -415,7 +415,7 @@ func file_rpcproto_dynamicdiscover_proto_init() { return nil } } - file_rpcproto_dynamicdiscover_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + file_rpcproto_origindiscover_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscribeDiscoverNotify); i { case 0: return &v.state @@ -427,7 +427,7 @@ func file_rpcproto_dynamicdiscover_proto_init() { return nil } } - file_rpcproto_dynamicdiscover_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_rpcproto_origindiscover_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*NodeRetireReq); i { case 0: return &v.state @@ -439,7 +439,7 @@ func file_rpcproto_dynamicdiscover_proto_init() { return nil } } - file_rpcproto_dynamicdiscover_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + file_rpcproto_origindiscover_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Empty); i { case 0: return &v.state @@ -456,18 +456,18 @@ func file_rpcproto_dynamicdiscover_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_rpcproto_dynamicdiscover_proto_rawDesc, + RawDescriptor: file_rpcproto_origindiscover_proto_rawDesc, NumEnums: 0, NumMessages: 5, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_rpcproto_dynamicdiscover_proto_goTypes, - DependencyIndexes: file_rpcproto_dynamicdiscover_proto_depIdxs, - MessageInfos: file_rpcproto_dynamicdiscover_proto_msgTypes, + GoTypes: file_rpcproto_origindiscover_proto_goTypes, + DependencyIndexes: file_rpcproto_origindiscover_proto_depIdxs, + MessageInfos: file_rpcproto_origindiscover_proto_msgTypes, }.Build() - File_rpcproto_dynamicdiscover_proto = out.File - file_rpcproto_dynamicdiscover_proto_rawDesc = nil - file_rpcproto_dynamicdiscover_proto_goTypes = nil - file_rpcproto_dynamicdiscover_proto_depIdxs = nil + File_rpcproto_origindiscover_proto = out.File + file_rpcproto_origindiscover_proto_rawDesc = nil + file_rpcproto_origindiscover_proto_goTypes = nil + file_rpcproto_origindiscover_proto_depIdxs = nil } diff --git a/rpc/dynamicdiscover.proto b/rpc/origindiscover.proto similarity index 100% rename from rpc/dynamicdiscover.proto rename to rpc/origindiscover.proto diff --git a/service/service.go b/service/service.go index 08fb8af..2e76631 100644 --- a/service/service.go +++ b/service/service.go @@ -77,6 +77,17 @@ type DiscoveryServiceEvent struct{ NodeId string } +type EtcdServiceRecordEvent struct { + NetworkName string + TTLSecond int64 + RecordKey string + RecordInfo string +} + +type Empty struct { + +} + func SetMaxServiceChannel(maxEventChannel int){ maxServiceEventChannelNum = maxEventChannel } @@ -410,4 +421,4 @@ func (s *Service) SetGoRoutineNum(goroutineNum int32) bool { } func (s *Service) OnRetire(){ -} \ No newline at end of file +}