From bba5eb2929476f7db993d40f42c81754248c4061 Mon Sep 17 00:00:00 2001 From: boyce Date: Thu, 25 Apr 2024 11:20:06 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96origin=E9=BB=98=E8=AE=A4?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=E7=9A=84=E6=9C=8D=E5=8A=A1=E5=8F=91=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 24 +--- cluster/nodettl.go | 71 +++++++++++ cluster/origindiscovery.go | 226 ++++++++++++++++++++------------- cluster/parsecfg.go | 14 +- event/eventtype.go | 13 +- rpc/client.go | 4 +- rpc/natsclient.go | 5 +- rpc/natsserver.go | 12 +- rpc/origindiscover.pb.go | 253 ++++++++++++++++++++++++++++++++----- rpc/origindiscover.proto | 18 ++- rpc/rclient.go | 20 ++- rpc/rpcevent.go | 23 ++++ rpc/rpchandler.go | 12 +- rpc/rpcnats.go | 7 +- service/service.go | 52 +++++--- 15 files changed, 573 insertions(+), 181 deletions(-) create mode 100644 cluster/nodettl.go create mode 100644 rpc/rpcevent.go diff --git a/cluster/cluster.go b/cluster/cluster.go index cc2a8a3..d56a16e 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -7,6 +7,7 @@ import ( "github.com/duanhf2012/origin/v2/service" "strings" "sync" + "github.com/duanhf2012/origin/v2/event" ) var configDir = "./config/" @@ -196,9 +197,9 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) { rpcInfo.nodeInfo = *nodeInfo if cls.IsNatsMode() { - rpcInfo.client = cls.rpcNats.NewNatsClient(nodeInfo.NodeId, cls.GetLocalNodeInfo().NodeId,&cls.callSet) + rpcInfo.client = cls.rpcNats.NewNatsClient(nodeInfo.NodeId, cls.GetLocalNodeInfo().NodeId,&cls.callSet,cls.NotifyAllService) }else{ - rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent,&cls.callSet) + rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,&cls.callSet,cls.NotifyAllService) } cls.mapRpc[nodeInfo.NodeId] = &rpcInfo log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr)) @@ -214,7 +215,7 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er cls.callSet.Init() if cls.IsNatsMode() { - cls.rpcNats.Init(cls.rpcMode.Nats.NatsUrl,cls.rpcMode.Nats.NoRandomize,cls.GetLocalNodeInfo().NodeId,cls.localNodeInfo.CompressBytesLen,cls) + cls.rpcNats.Init(cls.rpcMode.Nats.NatsUrl,cls.rpcMode.Nats.NoRandomize,cls.GetLocalNodeInfo().NodeId,cls.localNodeInfo.CompressBytesLen,cls,cluster.NotifyAllService) cls.rpcServer = &cls.rpcNats }else{ s := &rpc.Server{} @@ -308,18 +309,10 @@ func (cls *Cluster) IsNodeRetire(nodeId string) bool { return retire } - -func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId string) { - cls.locker.Lock() - nodeInfo, ok := cls.mapRpc[nodeId] - if ok == false || nodeInfo.client == nil || nodeInfo.client.GetClientId() != clientId { - cls.locker.Unlock() - return - } - cls.locker.Unlock() - +func (cls *Cluster) NotifyAllService(event event.IEvent){ cls.rpcEventLocker.Lock() defer cls.rpcEventLocker.Unlock() + for serviceName, _ := range cls.mapServiceListenRpcEvent { ser := service.GetService(serviceName) if ser == nil { @@ -327,10 +320,7 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId strin continue } - var eventData service.RpcConnEvent - eventData.IsConnect = bConnect - eventData.NodeId = nodeId - ser.(service.IModule).NotifyEvent(&eventData) + ser.(service.IModule).NotifyEvent(event) } } diff --git a/cluster/nodettl.go b/cluster/nodettl.go new file mode 100644 index 0000000..e22cb22 --- /dev/null +++ b/cluster/nodettl.go @@ -0,0 +1,71 @@ +package cluster + +import ( + "time" + "container/list" +) + +type nodeTTL struct { + nodeId string + refreshTime time.Time +} + +type nodeSetTTL struct { + l *list.List + mapElement map[string]*list.Element + ttl time.Duration +} + +func (ns *nodeSetTTL) init(ttl time.Duration) { + ns.ttl = ttl + ns.mapElement = make(map[string]*list.Element,32) + ns.l = list.New() +} + +func (ns *nodeSetTTL) removeNode(nodeId string) { + ele,ok:=ns.mapElement[nodeId] + if ok == false { + return + } + + ns.l.Remove(ele) + delete(ns.mapElement,nodeId) +} + +func (ns *nodeSetTTL) addAndRefreshNode(nodeId string){ + ele,ok:=ns.mapElement[nodeId] + if ok == false { + ele = ns.l.PushBack(nodeId) + ele.Value = &nodeTTL{nodeId,time.Now()} + ns.mapElement[nodeId] = ele + return + } + + ele.Value.(*nodeTTL).refreshTime = time.Now() + ns.l.MoveToBack(ele) +} + +func (ns *nodeSetTTL) checkTTL(cb func(nodeIdList []string)){ + nodeIdList := []string{} + for{ + f := ns.l.Front() + if f == nil { + break + } + + nt := f.Value.(*nodeTTL) + if time.Now().Sub(nt.refreshTime) > ns.ttl { + nodeIdList = append(nodeIdList,nt.nodeId) + }else{ + break + } + + //删除结点 + ns.l.Remove(f) + delete(ns.mapElement,nt.nodeId) + } + + if len(nodeIdList) >0 { + cb(nodeIdList) + } +} diff --git a/cluster/origindiscovery.go b/cluster/origindiscovery.go index 34fb9c3..06b6625 100644 --- a/cluster/origindiscovery.go +++ b/cluster/origindiscovery.go @@ -16,77 +16,16 @@ const RegServiceDiscover = OriginDiscoveryMasterName + ".RPC_RegServiceDiscover" const SubServiceDiscover = OriginDiscoveryClientName + ".RPC_SubServiceDiscover" const AddSubServiceDiscover = OriginDiscoveryMasterName + ".RPC_AddSubServiceDiscover" const NodeRetireRpcMethod = OriginDiscoveryMasterName+".RPC_NodeRetire" -// -//type nodeTTL struct { -// nodeId string -// refreshTime time.Time -//} -// -//type nodeSetTTL struct { -// l *list.List -// mapElement map[string]*list.Element -// ttl time.Duration -//} -// -//func (ns *nodeSetTTL) init(ttl time.Duration) { -// ns.ttl = ttl -// ns.mapElement = make(map[string]*list.Element,32) -// ns.l = list.New() -//} -// -//func (ns *nodeSetTTL) removeNode(nodeId string) { -// ele,ok:=ns.mapElement[nodeId] -// if ok == false { -// return -// } -// -// ns.l.Remove(ele) -// delete(ns.mapElement,nodeId) -//} -// -//func (ns *nodeSetTTL) addAndRefreshNode(nodeId string){ -// ele,ok:=ns.mapElement[nodeId] -// if ok == false { -// ele = ns.l.PushBack(nodeId) -// ele.Value = &nodeTTL{nodeId,time.Now()} -// ns.mapElement[nodeId] = ele -// return -// } -// -// ele.Value.(*nodeTTL).refreshTime = time.Now() -// ns.l.MoveToBack(ele) -//} -// -//func (ns *nodeSetTTL) checkTTL(cb func(nodeIdList []string)){ -// nodeIdList := []string{} -// for{ -// f := ns.l.Front() -// if f == nil { -// break -// } -// -// nt := f.Value.(*nodeTTL) -// if time.Now().Sub(nt.refreshTime) > ns.ttl { -// nodeIdList = append(nodeIdList,nt.nodeId) -// }else{ -// break -// } -// -// //删除结点 -// ns.l.Remove(f) -// delete(ns.mapElement,nt.nodeId) -// } -// -// if len(nodeIdList) >0 { -// cb(nodeIdList) -// } -//} +const RpcPingMethod = OriginDiscoveryMasterName+".RPC_Ping" +const UnRegServiceDiscover = OriginDiscoveryMasterName+".RPC_UnRegServiceDiscover" type OriginDiscoveryMaster struct { service.Service mapNodeInfo map[string]struct{} nodeInfo []*rpc.NodeInfo + + nsTTL nodeSetTTL } type OriginDiscoveryClient struct { @@ -164,11 +103,35 @@ func (ds *OriginDiscoveryMaster) removeNodeInfo(nodeId string) { func (ds *OriginDiscoveryMaster) OnInit() error { ds.mapNodeInfo = make(map[string]struct{}, 20) - ds.RegRpcListener(ds) + ds.RegNodeConnListener(ds) + ds.RegNatsConnListener(ds) + + ds.nsTTL.init(time.Duration(cluster.GetOriginDiscovery().TTLSecond)*time.Second) return nil } +func (ds *OriginDiscoveryMaster) checkTTL(){ + if cluster.IsNatsMode() == false { + return + } + + interval := time.Duration(cluster.GetOriginDiscovery().TTLSecond)*time.Second + interval = interval /3 /2 + if interval < time.Second { + interval = time.Second + } + + ds.NewTicker(interval,func(t *timer.Ticker){ + ds.nsTTL.checkTTL(func(nodeIdList []string) { + for _,nodeId := range nodeIdList { + log.Debug("TTL expiry",log.String("nodeId",nodeId)) + ds.OnNodeDisconnect(nodeId) + } + }) + }) +} + func (ds *OriginDiscoveryMaster) OnStart() { var nodeInfo rpc.NodeInfo localNodeInfo := cluster.GetLocalNodeInfo() @@ -179,21 +142,23 @@ func (ds *OriginDiscoveryMaster) OnStart() { nodeInfo.Private = localNodeInfo.Private nodeInfo.Retire = localNodeInfo.Retire ds.addNodeInfo(&nodeInfo) + + ds.checkTTL() +} + +func (dc *OriginDiscoveryMaster) OnNatsConnected(){ + //向所有的节点同步服务发现信息 + var notifyDiscover rpc.SubscribeDiscoverNotify + notifyDiscover.IsFull = true + notifyDiscover.NodeInfo = dc.nodeInfo + notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId + dc.RpcCastGo(SubServiceDiscover, ¬ifyDiscover) +} + +func (dc *OriginDiscoveryMaster) OnNatsDisconnect(){ } func (ds *OriginDiscoveryMaster) OnNodeConnected(nodeId string) { - //没注册过结点不通知 - if ds.isRegNode(nodeId) == false { - return - } - - //向它发布所有服务列表信息 - var notifyDiscover rpc.SubscribeDiscoverNotify - notifyDiscover.IsFull = true - notifyDiscover.NodeInfo = ds.nodeInfo - notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId - - ds.GoNode(nodeId, SubServiceDiscover, ¬ifyDiscover) } func (ds *OriginDiscoveryMaster) OnNodeDisconnect(nodeId string) { @@ -203,9 +168,11 @@ func (ds *OriginDiscoveryMaster) OnNodeDisconnect(nodeId string) { ds.removeNodeInfo(nodeId) + //主动删除已经存在的结点,确保先断开,再连接 var notifyDiscover rpc.SubscribeDiscoverNotify notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId notifyDiscover.DelNodeId = nodeId + //删除结点 cluster.DelNode(nodeId, true) @@ -219,6 +186,18 @@ func (ds *OriginDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{ } } +func (ds *OriginDiscoveryMaster) RPC_Ping(req *rpc.Ping, res *rpc.Pong) error { + if ds.isRegNode(req.NodeId) == false{ + res.Ok = false + return nil + } + + return nil + res.Ok = true + ds.nsTTL.addAndRefreshNode(req.NodeId) + return nil +} + func (ds *OriginDiscoveryMaster) RPC_NodeRetire(req *rpc.NodeRetireReq, res *rpc.Empty) error { log.Info("node is retire",log.String("nodeId",req.NodeInfo.NodeId),log.Bool("retire",req.NodeInfo.Retire)) @@ -233,7 +212,7 @@ func (ds *OriginDiscoveryMaster) RPC_NodeRetire(req *rpc.NodeRetireReq, res *rpc } // 收到注册过来的结点 -func (ds *OriginDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.Empty) error { +func (ds *OriginDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.RegServiceDiscoverReq, res *rpc.SubscribeDiscoverNotify) error { if req.NodeInfo == nil { err := errors.New("RPC_RegServiceDiscover req is error.") log.Error(err.Error()) @@ -241,6 +220,10 @@ func (ds *OriginDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscover return err } + if req.NodeInfo.NodeId != cluster.GetLocalNodeInfo().NodeId { + ds.nsTTL.addAndRefreshNode(req.NodeInfo.NodeId) + } + //广播给其他所有结点 var notifyDiscover rpc.SubscribeDiscoverNotify notifyDiscover.MasterNodeId = cluster.GetLocalNodeInfo().NodeId @@ -266,11 +249,17 @@ func (ds *OriginDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscover //加入到本地Cluster模块中,将连接该结点 cluster.serviceDiscoverySetNodeInfo(&nodeInfo) + + res.IsFull = true + res.NodeInfo = ds.nodeInfo + res.MasterNodeId = cluster.GetLocalNodeInfo().NodeId return nil } func (dc *OriginDiscoveryClient) OnInit() error { - dc.RegRpcListener(dc) + dc.RegNodeConnListener(dc) + dc.RegNatsConnListener(dc) + dc.mapDiscovery = map[string]map[string]struct{}{} dc.mapMasterNetwork = map[string]string{} @@ -305,9 +294,42 @@ func (dc *OriginDiscoveryClient) findNodeId(nodeId string) bool { return false } +func (dc *OriginDiscoveryClient) ping(){ + if cluster.IsNatsMode() == false { + return + } + + interval := time.Duration(cluster.GetOriginDiscovery().TTLSecond)*time.Second + interval = interval /3 + if interval < time.Second { + interval = time.Second + } + + dc.NewTicker(interval,func(t *timer.Ticker){ + var ping rpc.Ping + ping.NodeId = cluster.GetLocalNodeInfo().NodeId + masterNodes := GetCluster().GetOriginDiscovery().MasterNodeList + for i:=0;iMaster -type ServiceDiscoverReq struct { +type RegServiceDiscoverReq struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -116,8 +116,8 @@ type ServiceDiscoverReq struct { NodeInfo *NodeInfo `protobuf:"bytes,1,opt,name=nodeInfo,proto3" json:"nodeInfo,omitempty"` } -func (x *ServiceDiscoverReq) Reset() { - *x = ServiceDiscoverReq{} +func (x *RegServiceDiscoverReq) Reset() { + *x = RegServiceDiscoverReq{} if protoimpl.UnsafeEnabled { mi := &file_rpcproto_origindiscover_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -125,13 +125,13 @@ func (x *ServiceDiscoverReq) Reset() { } } -func (x *ServiceDiscoverReq) String() string { +func (x *RegServiceDiscoverReq) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ServiceDiscoverReq) ProtoMessage() {} +func (*RegServiceDiscoverReq) ProtoMessage() {} -func (x *ServiceDiscoverReq) ProtoReflect() protoreflect.Message { +func (x *RegServiceDiscoverReq) ProtoReflect() protoreflect.Message { mi := &file_rpcproto_origindiscover_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -143,12 +143,12 @@ func (x *ServiceDiscoverReq) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ServiceDiscoverReq.ProtoReflect.Descriptor instead. -func (*ServiceDiscoverReq) Descriptor() ([]byte, []int) { +// Deprecated: Use RegServiceDiscoverReq.ProtoReflect.Descriptor instead. +func (*RegServiceDiscoverReq) Descriptor() ([]byte, []int) { return file_rpcproto_origindiscover_proto_rawDescGZIP(), []int{1} } -func (x *ServiceDiscoverReq) GetNodeInfo() *NodeInfo { +func (x *RegServiceDiscoverReq) GetNodeInfo() *NodeInfo { if x != nil { return x.NodeInfo } @@ -314,6 +314,149 @@ func (*Empty) Descriptor() ([]byte, []int) { return file_rpcproto_origindiscover_proto_rawDescGZIP(), []int{4} } +// Client->Master +type Ping struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + NodeId string `protobuf:"bytes,1,opt,name=NodeId,proto3" json:"NodeId,omitempty"` +} + +func (x *Ping) Reset() { + *x = Ping{} + if protoimpl.UnsafeEnabled { + mi := &file_rpcproto_origindiscover_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Ping) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Ping) ProtoMessage() {} + +func (x *Ping) ProtoReflect() protoreflect.Message { + mi := &file_rpcproto_origindiscover_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Ping.ProtoReflect.Descriptor instead. +func (*Ping) Descriptor() ([]byte, []int) { + return file_rpcproto_origindiscover_proto_rawDescGZIP(), []int{5} +} + +func (x *Ping) GetNodeId() string { + if x != nil { + return x.NodeId + } + return "" +} + +// Master->Client +type Pong struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Ok bool `protobuf:"varint,1,opt,name=ok,proto3" json:"ok,omitempty"` +} + +func (x *Pong) Reset() { + *x = Pong{} + if protoimpl.UnsafeEnabled { + mi := &file_rpcproto_origindiscover_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Pong) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Pong) ProtoMessage() {} + +func (x *Pong) ProtoReflect() protoreflect.Message { + mi := &file_rpcproto_origindiscover_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Pong.ProtoReflect.Descriptor instead. +func (*Pong) Descriptor() ([]byte, []int) { + return file_rpcproto_origindiscover_proto_rawDescGZIP(), []int{6} +} + +func (x *Pong) GetOk() bool { + if x != nil { + return x.Ok + } + return false +} + +type UnRegServiceDiscoverReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + NodeId string `protobuf:"bytes,1,opt,name=NodeId,proto3" json:"NodeId,omitempty"` +} + +func (x *UnRegServiceDiscoverReq) Reset() { + *x = UnRegServiceDiscoverReq{} + if protoimpl.UnsafeEnabled { + mi := &file_rpcproto_origindiscover_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UnRegServiceDiscoverReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UnRegServiceDiscoverReq) ProtoMessage() {} + +func (x *UnRegServiceDiscoverReq) ProtoReflect() protoreflect.Message { + mi := &file_rpcproto_origindiscover_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UnRegServiceDiscoverReq.ProtoReflect.Descriptor instead. +func (*UnRegServiceDiscoverReq) Descriptor() ([]byte, []int) { + return file_rpcproto_origindiscover_proto_rawDescGZIP(), []int{7} +} + +func (x *UnRegServiceDiscoverReq) GetNodeId() string { + if x != nil { + return x.NodeId + } + return "" +} + var File_rpcproto_origindiscover_proto protoreflect.FileDescriptor var file_rpcproto_origindiscover_proto_rawDesc = []byte{ @@ -332,26 +475,33 @@ var file_rpcproto_origindiscover_proto_rawDesc = []byte{ 0x69, 0x72, 0x65, 0x12, 0x2c, 0x0a, 0x11, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x18, 0x06, 0x20, 0x03, 0x28, 0x09, 0x52, 0x11, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x69, 0x73, - 0x74, 0x22, 0x3f, 0x0a, 0x12, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x44, 0x69, 0x73, 0x63, - 0x6f, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x12, 0x29, 0x0a, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49, + 0x74, 0x22, 0x42, 0x0a, 0x15, 0x52, 0x65, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x44, + 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x12, 0x29, 0x0a, 0x08, 0x6e, 0x6f, + 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x72, + 0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, 0x6f, 0x64, + 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x9e, 0x01, 0x0a, 0x17, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x4e, 0x6f, 0x74, 0x69, 0x66, + 0x79, 0x12, 0x22, 0x0a, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x49, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x4e, + 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x12, 0x1c, 0x0a, + 0x09, 0x44, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x44, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x08, 0x6e, + 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, + 0x72, 0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, 0x6f, + 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x3a, 0x0a, 0x0d, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, + 0x74, 0x69, 0x72, 0x65, 0x52, 0x65, 0x71, 0x12, 0x29, 0x0a, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, - 0x66, 0x6f, 0x22, 0x9e, 0x01, 0x0a, 0x17, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, - 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x12, 0x22, - 0x0a, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, - 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x08, 0x52, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x12, 0x1c, 0x0a, 0x09, 0x44, 0x65, - 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x44, - 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x08, 0x6e, 0x6f, 0x64, 0x65, - 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x72, 0x70, 0x63, - 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49, - 0x6e, 0x66, 0x6f, 0x22, 0x3a, 0x0a, 0x0d, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x74, 0x69, 0x72, - 0x65, 0x52, 0x65, 0x71, 0x12, 0x29, 0x0a, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, - 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, - 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x72, 0x70, - 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x66, 0x6f, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x1e, 0x0a, 0x04, 0x50, + 0x69, 0x6e, 0x67, 0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x22, 0x16, 0x0a, 0x04, 0x50, + 0x6f, 0x6e, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x02, 0x6f, 0x6b, 0x22, 0x31, 0x0a, 0x17, 0x55, 0x6e, 0x52, 0x65, 0x67, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x12, 0x16, + 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x72, 0x70, 0x63, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -366,16 +516,19 @@ func file_rpcproto_origindiscover_proto_rawDescGZIP() []byte { return file_rpcproto_origindiscover_proto_rawDescData } -var file_rpcproto_origindiscover_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_rpcproto_origindiscover_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_rpcproto_origindiscover_proto_goTypes = []interface{}{ (*NodeInfo)(nil), // 0: rpc.NodeInfo - (*ServiceDiscoverReq)(nil), // 1: rpc.ServiceDiscoverReq + (*RegServiceDiscoverReq)(nil), // 1: rpc.RegServiceDiscoverReq (*SubscribeDiscoverNotify)(nil), // 2: rpc.SubscribeDiscoverNotify (*NodeRetireReq)(nil), // 3: rpc.NodeRetireReq (*Empty)(nil), // 4: rpc.Empty + (*Ping)(nil), // 5: rpc.Ping + (*Pong)(nil), // 6: rpc.Pong + (*UnRegServiceDiscoverReq)(nil), // 7: rpc.UnRegServiceDiscoverReq } var file_rpcproto_origindiscover_proto_depIdxs = []int32{ - 0, // 0: rpc.ServiceDiscoverReq.nodeInfo:type_name -> rpc.NodeInfo + 0, // 0: rpc.RegServiceDiscoverReq.nodeInfo:type_name -> rpc.NodeInfo 0, // 1: rpc.SubscribeDiscoverNotify.nodeInfo:type_name -> rpc.NodeInfo 0, // 2: rpc.NodeRetireReq.nodeInfo:type_name -> rpc.NodeInfo 3, // [3:3] is the sub-list for method output_type @@ -404,7 +557,7 @@ func file_rpcproto_origindiscover_proto_init() { } } file_rpcproto_origindiscover_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ServiceDiscoverReq); i { + switch v := v.(*RegServiceDiscoverReq); i { case 0: return &v.state case 1: @@ -451,6 +604,42 @@ func file_rpcproto_origindiscover_proto_init() { return nil } } + file_rpcproto_origindiscover_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Ping); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_rpcproto_origindiscover_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Pong); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_rpcproto_origindiscover_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UnRegServiceDiscoverReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -458,7 +647,7 @@ func file_rpcproto_origindiscover_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_rpcproto_origindiscover_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 8, NumExtensions: 0, NumServices: 0, }, diff --git a/rpc/origindiscover.proto b/rpc/origindiscover.proto index 67f4b9a..dae53b5 100644 --- a/rpc/origindiscover.proto +++ b/rpc/origindiscover.proto @@ -12,7 +12,7 @@ message NodeInfo{ } //Client->Master -message ServiceDiscoverReq{ +message RegServiceDiscoverReq{ NodeInfo nodeInfo = 1; } @@ -32,4 +32,18 @@ message NodeRetireReq{ //Master->Client message Empty{ -} \ No newline at end of file +} + +//Client->Master +message Ping{ + string NodeId = 1; +} + +//Master->Client +message Pong{ + bool ok = 1; +} + +message UnRegServiceDiscoverReq{ + string NodeId = 1; +} diff --git a/rpc/rclient.go b/rpc/rclient.go index cdb3429..0f459a5 100644 --- a/rpc/rclient.go +++ b/rpc/rclient.go @@ -16,7 +16,8 @@ type RClient struct { selfClient *Client network.TCPClient conn *network.TCPConn - TriggerRpcConnEvent + + notifyEventFun NotifyEventFun } func (rc *RClient) IsConnected() bool { @@ -80,7 +81,11 @@ func (rc *RClient) Run() { } }() - rc.TriggerRpcConnEvent(true, rc.selfClient.GetClientId(), rc.selfClient.GetTargetNodeId()) + var eventData RpcConnEvent + eventData.IsConnect = true + eventData.NodeId = rc.selfClient.GetTargetNodeId() + rc.notifyEventFun(&eventData) + for { bytes, err := rc.conn.ReadMsg() if err != nil { @@ -97,15 +102,16 @@ func (rc *RClient) Run() { } func (rc *RClient) OnClose() { - rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetTargetNodeId()) + var connEvent RpcConnEvent + connEvent.IsConnect = false + connEvent.NodeId = rc.selfClient.GetTargetNodeId() + rc.notifyEventFun(&connEvent) } -func NewRClient(targetNodeId string, addr string, maxRpcParamLen uint32,compressBytesLen int,triggerRpcConnEvent TriggerRpcConnEvent,callSet *CallSet) *Client{ +func NewRClient(targetNodeId string, addr string, maxRpcParamLen uint32,compressBytesLen int,callSet *CallSet,notifyEventFun NotifyEventFun) *Client{ client := &Client{} client.clientId = atomic.AddUint32(&clientSeq, 1) client.targetNodeId = targetNodeId - //client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount - //client.callRpcTimeout = DefaultRpcTimeout client.compressBytesLen = compressBytesLen c:= &RClient{} @@ -114,7 +120,7 @@ func NewRClient(targetNodeId string, addr string, maxRpcParamLen uint32,compress c.ConnectInterval = DefaultConnectInterval c.PendingWriteNum = DefaultMaxPendingWriteNum c.AutoReconnect = true - c.TriggerRpcConnEvent = triggerRpcConnEvent + c.notifyEventFun = notifyEventFun c.ConnNum = DefaultRpcConnNum c.LenMsgLen = DefaultRpcLenMsgLen c.MinMsgLen = DefaultRpcMinMsgLen diff --git a/rpc/rpcevent.go b/rpc/rpcevent.go new file mode 100644 index 0000000..b416509 --- /dev/null +++ b/rpc/rpcevent.go @@ -0,0 +1,23 @@ +package rpc + +import "github.com/duanhf2012/origin/v2/event" + +type NotifyEventFun func (event event.IEvent) + +// RpcConnEvent Node结点连接事件 +type RpcConnEvent struct{ + IsConnect bool + NodeId string +} + +func (rc *RpcConnEvent) GetEventType() event.EventType{ + return event.Sys_Event_Node_Conn_Event +} + +type NatsConnEvent struct { + IsConnect bool +} + +func (nc *NatsConnEvent) GetEventType() event.EventType{ + return event.Sys_Event_Nats_Conn_Event +} diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 64c5c25..ccb1a97 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -9,6 +9,7 @@ import ( "strings" "unicode" "unicode/utf8" + "github.com/duanhf2012/origin/v2/event" "time" ) @@ -65,12 +66,19 @@ type RpcHandler struct { pClientList []*Client } -type TriggerRpcConnEvent func(bConnect bool, clientSeq uint32, nodeId string) -type INodeListener interface { +//type TriggerRpcConnEvent func(bConnect bool, clientSeq uint32, nodeId string) +type NotifyEventToAllService func(event event.IEvent) + +type INodeConnListener interface { OnNodeConnected(nodeId string) OnNodeDisconnect(nodeId string) } +type INatsConnListener interface { + OnNatsConnected() + OnNatsDisconnect() +} + type IDiscoveryServiceListener interface { OnDiscoveryService(nodeId string, serviceName []string) OnUnDiscoveryService(nodeId string) diff --git a/rpc/rpcnats.go b/rpc/rpcnats.go index e820545..e4b9e03 100644 --- a/rpc/rpcnats.go +++ b/rpc/rpcnats.go @@ -16,13 +16,13 @@ func (rn *RpcNats) Start() error{ return rn.NatsClient.Start(rn.NatsServer.natsConn) } -func (rn *RpcNats) Init(natsUrl string, noRandomize bool, nodeId string,compressBytesLen int,rpcHandleFinder RpcHandleFinder){ +func (rn *RpcNats) Init(natsUrl string, noRandomize bool, nodeId string,compressBytesLen int,rpcHandleFinder RpcHandleFinder,notifyEventFun NotifyEventFun){ rn.NatsClient.localNodeId = nodeId - rn.NatsServer.initServer(natsUrl,noRandomize, nodeId,compressBytesLen,rpcHandleFinder) + rn.NatsServer.initServer(natsUrl,noRandomize, nodeId,compressBytesLen,rpcHandleFinder,notifyEventFun) rn.NatsServer.iServer = rn } -func (rn *RpcNats) NewNatsClient(targetNodeId string,localNodeId string,callSet *CallSet) *Client{ +func (rn *RpcNats) NewNatsClient(targetNodeId string,localNodeId string,callSet *CallSet,notifyEventFun NotifyEventFun) *Client{ var client Client client.clientId = atomic.AddUint32(&clientSeq, 1) @@ -33,6 +33,7 @@ func (rn *RpcNats) NewNatsClient(targetNodeId string,localNodeId string,callSet natsClient := &rn.NatsClient natsClient.localNodeId = localNodeId natsClient.client = &client + natsClient.notifyEventFun = notifyEventFun client.IRealClient = natsClient client.CallSet = callSet diff --git a/service/service.go b/service/service.go index 2e76631..94dab30 100644 --- a/service/service.go +++ b/service/service.go @@ -58,17 +58,14 @@ type Service struct { retire int32 eventProcessor event.IEventProcessor profiler *profiler.Profiler //性能分析器 - nodeEventLister rpc.INodeListener + nodeConnLister rpc.INodeConnListener + natsConnListener rpc.INatsConnListener discoveryServiceLister rpc.IDiscoveryServiceListener chanEvent chan event.IEvent closeSig chan struct{} } -// RpcConnEvent Node结点连接事件 -type RpcConnEvent struct{ - IsConnect bool - NodeId string -} + // DiscoveryServiceEvent 发现服务结点 type DiscoveryServiceEvent struct{ @@ -96,9 +93,6 @@ func (rpcEventData *DiscoveryServiceEvent) GetEventType() event.EventType{ return event.Sys_Event_DiscoverService } -func (rpcEventData *RpcConnEvent) GetEventType() event.EventType{ - return event.Sys_Event_Node_Event -} func (s *Service) OnSetup(iService IService){ if iService.GetName() == "" { @@ -322,12 +316,21 @@ func (s *Service) RegRawRpc(rpcMethodId uint32,rawRpcCB rpc.RawRpcCallBack){ func (s *Service) OnStart(){ } -func (s *Service) OnNodeEvent(ev event.IEvent){ - event := ev.(*RpcConnEvent) +func (s *Service) OnNodeConnEvent(ev event.IEvent){ + event := ev.(*rpc.RpcConnEvent) if event.IsConnect { - s.nodeEventLister.OnNodeConnected(event.NodeId) + s.nodeConnLister.OnNodeConnected(event.NodeId) }else{ - s.nodeEventLister.OnNodeDisconnect(event.NodeId) + s.nodeConnLister.OnNodeDisconnect(event.NodeId) + } +} + +func (s *Service) OnNatsConnEvent(ev event.IEvent){ + event := ev.(*rpc.NatsConnEvent) + if event.IsConnect { + s.natsConnListener.OnNatsConnected() + }else{ + s.natsConnListener.OnNatsDisconnect() } } @@ -340,14 +343,25 @@ func (s *Service) OnDiscoverServiceEvent(ev event.IEvent){ } } -func (s *Service) RegRpcListener(rpcEventLister rpc.INodeListener) { - s.nodeEventLister = rpcEventLister - s.RegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler(),s.OnNodeEvent) +func (s *Service) RegNodeConnListener(nodeConnListener rpc.INodeConnListener) { + s.nodeConnLister = nodeConnListener + s.RegEventReceiverFunc(event.Sys_Event_Node_Conn_Event,s.GetEventHandler(),s.OnNodeConnEvent) RegRpcEventFun(s.GetName()) } -func (s *Service) UnRegRpcListener(rpcLister rpc.INodeListener) { - s.UnRegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler()) +func (s *Service) UnRegNodeConnListener() { + s.UnRegEventReceiverFunc(event.Sys_Event_Node_Conn_Event,s.GetEventHandler()) + UnRegRpcEventFun(s.GetName()) +} + +func (s *Service) RegNatsConnListener(natsConnListener rpc.INatsConnListener) { + s.natsConnListener = natsConnListener + s.RegEventReceiverFunc(event.Sys_Event_Nats_Conn_Event,s.GetEventHandler(),s.OnNatsConnEvent) + RegRpcEventFun(s.GetName()) +} + +func (s *Service) UnRegNatsConnListener() { + s.UnRegEventReceiverFunc(event.Sys_Event_Nats_Conn_Event,s.GetEventHandler()) UnRegRpcEventFun(s.GetName()) } @@ -357,7 +371,7 @@ func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoverySer RegDiscoveryServiceEventFun(s.GetName()) } -func (s *Service) UnRegDiscoverListener(rpcLister rpc.INodeListener) { +func (s *Service) UnRegDiscoverListener() { s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler()) UnRegDiscoveryServiceEventFun(s.GetName()) }