diff --git a/cluster/cluster.go b/cluster/cluster.go index 5bc3a19..bcc0bfd 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -36,6 +36,7 @@ type NodeInfo struct { PublicServiceList []string //对外公开的服务列表 MasterDiscoveryService []MasterDiscoveryService //筛选发现的服务,如果不配置,不进行筛选 status NodeStatus + Retire bool } type NodeRpcInfo struct { @@ -55,8 +56,8 @@ type Cluster struct { locker sync.RWMutex //结点与服务关系保护锁 - mapRpc map[int]NodeRpcInfo //nodeId - mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo + mapRpc map[int]*NodeRpcInfo //nodeId + //mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId] rpcServer rpc.Server @@ -87,10 +88,11 @@ func (cls *Cluster) Stop() { func (cls *Cluster) DiscardNode(nodeId int) { cls.locker.Lock() - nodeInfo, ok := cls.mapIdNode[nodeId] + nodeInfo, ok := cls.mapRpc[nodeId] + bDel := (ok == true) && nodeInfo.nodeInfo.status == Discard cls.locker.Unlock() - if ok == true && nodeInfo.status == Discard { + if bDel { cls.DelNode(nodeId, true) } } @@ -103,39 +105,30 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) { cls.locker.Lock() defer cls.locker.Unlock() - nodeInfo, ok := cls.mapIdNode[nodeId] + rpc, ok := cls.mapRpc[nodeId] if ok == false { return } - rpc, ok := cls.mapRpc[nodeId] - for { - //立即删除 - if immediately || ok == false { - break - } - + if immediately ==false { //正在连接中不主动断开,只断开没有连接中的 if rpc.client.IsConnected() { - nodeInfo.status = Discard - log.Info("Discard node",log.Int("nodeId",nodeInfo.NodeId),log.String("ListenAddr", nodeInfo.ListenAddr)) + rpc.nodeInfo.status = Discard + log.Info("Discard node",log.Int("nodeId",rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr)) return } - - break } - for _, serviceName := range nodeInfo.ServiceList { + for _, serviceName := range rpc.nodeInfo.ServiceList { cls.delServiceNode(serviceName, nodeId) } - delete(cls.mapIdNode, nodeId) delete(cls.mapRpc, nodeId) if ok == true { rpc.client.Close(false) } - log.Info("remove node ",log.Int("NodeId", nodeInfo.NodeId),log.String("ListenAddr", nodeInfo.ListenAddr)) + log.Info("remove node ",log.Int("NodeId", rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr)) } func (cls *Cluster) serviceDiscoveryDelNode(nodeId int, immediately bool) { @@ -168,9 +161,9 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) { defer cls.locker.Unlock() //先清一次的NodeId对应的所有服务清理 - lastNodeInfo, ok := cls.mapIdNode[nodeInfo.NodeId] + lastNodeInfo, ok := cls.mapRpc[nodeInfo.NodeId] if ok == true { - for _, serviceName := range lastNodeInfo.ServiceList { + for _, serviceName := range lastNodeInfo.nodeInfo.ServiceList { cls.delServiceNode(serviceName, nodeInfo.NodeId) } } @@ -189,27 +182,22 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) { } cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{} } - cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo - log.Info("Discovery nodeId",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList)) - //已经存在连接,则不需要进行设置 - if _, rpcInfoOK := cls.mapRpc[nodeInfo.NodeId]; rpcInfoOK == true { + if lastNodeInfo != nil { + log.Info("Discovery nodeId",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire)) + lastNodeInfo.nodeInfo = *nodeInfo return } + //不存在时,则建立连接 rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = *nodeInfo rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent) - cls.mapRpc[nodeInfo.NodeId] = rpcInfo + cls.mapRpc[nodeInfo.NodeId] = &rpcInfo + log.Info("Discovery nodeId and new rpc client",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr)) } -func (cls *Cluster) buildLocalRpc() { - rpcInfo := NodeRpcInfo{} - rpcInfo.nodeInfo = cls.localNodeInfo - rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId) - cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo -} func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error { //1.初始化配置 @@ -219,7 +207,6 @@ func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error } cls.rpcServer.Init(cls) - cls.buildLocalRpc() //2.安装服务发现结点 cls.SetupServiceDiscovery(localNodeId, setupServiceFun) @@ -313,27 +300,33 @@ func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler { return pService.GetRpcHandler() } -func (cls *Cluster) getRpcClient(nodeId int) *rpc.Client { +func (cls *Cluster) getRpcClient(nodeId int) (*rpc.Client,bool) { c, ok := cls.mapRpc[nodeId] if ok == false { - return nil + return nil,false } - return c.client + return c.client,c.nodeInfo.Retire } -func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client { +func (cls *Cluster) GetRpcClient(nodeId int) (*rpc.Client,bool) { cls.locker.RLock() defer cls.locker.RUnlock() return cls.getRpcClient(nodeId) } -func GetRpcClient(nodeId int, serviceMethod string, clientList []*rpc.Client) (error, int) { +func GetRpcClient(nodeId int, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, int) { if nodeId > 0 { - pClient := GetCluster().GetRpcClient(nodeId) + pClient,retire := GetCluster().GetRpcClient(nodeId) if pClient == nil { return fmt.Errorf("cannot find nodeid %d!", nodeId), 0 } + + //如果需要筛选掉退休结点 + if filterRetire == true && retire == true { + return fmt.Errorf("cannot find nodeid %d!", nodeId), 0 + } + clientList[0] = pClient return nil, 1 } @@ -345,7 +338,7 @@ func GetRpcClient(nodeId int, serviceMethod string, clientList []*rpc.Client) (e serviceName := serviceMethod[:findIndex] //1.找到对应的rpcNodeid - return GetCluster().GetNodeIdByService(serviceName, clientList, true) + return GetCluster().GetNodeIdByService(serviceName, clientList, filterRetire) } func GetRpcServer() *rpc.Server { @@ -353,7 +346,7 @@ func GetRpcServer() *rpc.Server { } func (cls *Cluster) IsNodeConnected(nodeId int) bool { - pClient := cls.GetRpcClient(nodeId) + pClient,_ := cls.GetRpcClient(nodeId) return pClient != nil && pClient.IsConnected() } @@ -475,11 +468,3 @@ func (cls *Cluster) GetGlobalCfg() interface{} { return cls.globalCfg } - -func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) { - cls.locker.RLock() - defer cls.locker.RUnlock() - - nodeInfo,ok:= cls.mapIdNode[nodeId] - return nodeInfo,ok -} diff --git a/cluster/dynamicdiscovery.go b/cluster/dynamicdiscovery.go index 3e7495d..84c6cca 100644 --- a/cluster/dynamicdiscovery.go +++ b/cluster/dynamicdiscovery.go @@ -7,6 +7,7 @@ import ( "github.com/duanhf2012/origin/service" "time" "github.com/duanhf2012/origin/util/timer" + "google.golang.org/protobuf/proto" ) const DynamicDiscoveryMasterName = "DiscoveryMaster" @@ -14,6 +15,7 @@ const DynamicDiscoveryClientName = "DiscoveryClient" const RegServiceDiscover = DynamicDiscoveryMasterName + ".RPC_RegServiceDiscover" const SubServiceDiscover = DynamicDiscoveryClientName + ".RPC_SubServiceDiscover" const AddSubServiceDiscover = DynamicDiscoveryMasterName + ".RPC_AddSubServiceDiscover" +const NodeRetireRpcMethod = DynamicDiscoveryMasterName+".RPC_NodeRetire" type DynamicDiscoveryMaster struct { service.Service @@ -30,6 +32,7 @@ type DynamicDiscoveryClient struct { localNodeId int mapDiscovery map[int32]map[int32]struct{} //map[masterNodeId]map[nodeId]struct{} + bRetire bool } var masterService DynamicDiscoveryMaster @@ -49,16 +52,32 @@ func (ds *DynamicDiscoveryMaster) isRegNode(nodeId int32) bool { return ok } -func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo) { - if len(nodeInfo.PublicServiceList) == 0 { +func (ds *DynamicDiscoveryMaster) updateNodeInfo(nInfo *rpc.NodeInfo) { + if _,ok:= ds.mapNodeInfo[nInfo.NodeId];ok == false { return } - _, ok := ds.mapNodeInfo[nodeInfo.NodeId] + nodeInfo := proto.Clone(nInfo).(*rpc.NodeInfo) + for i:=0;i 0 { @@ -293,8 +327,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco //删除不必要的结点 for _, nodeId := range willDelNodeId { - nodeInfo,_ := cluster.GetNodeInfo(int(nodeId)) - cluster.TriggerDiscoveryEvent(false,int(nodeId),nodeInfo.PublicServiceList) + cluster.TriggerDiscoveryEvent(false,int(nodeId),nil) dc.removeMasterNode(req.MasterNodeId, int32(nodeId)) if dc.findNodeId(nodeId) == false { dc.funDelService(int(nodeId), false) @@ -328,6 +361,29 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { dc.regServiceDiscover(nodeId) } +func (dc *DynamicDiscoveryClient) OnRetire(){ + dc.bRetire = true + + masterNodeList := cluster.GetDiscoveryNodeList() + for i:=0;i= cap(rpcClientList) { diff --git a/event/eventtype.go b/event/eventtype.go index a0ff85e..d96cb14 100644 --- a/event/eventtype.go +++ b/event/eventtype.go @@ -7,16 +7,15 @@ const ( ServiceRpcRequestEvent EventType = -1 ServiceRpcResponseEvent EventType = -2 - Sys_Event_Tcp EventType = -3 - Sys_Event_Http_Event EventType = -4 - Sys_Event_WebSocket EventType = -5 - Sys_Event_Node_Event EventType = -6 - Sys_Event_DiscoverService EventType = -7 - Sys_Event_DiscardGoroutine EventType = -8 - Sys_Event_QueueTaskFinish EventType = -9 - - Sys_Event_User_Define EventType = 1 - + Sys_Event_Tcp EventType = -3 + Sys_Event_Http_Event EventType = -4 + Sys_Event_WebSocket EventType = -5 + Sys_Event_Node_Event EventType = -6 + Sys_Event_DiscoverService EventType = -7 + Sys_Event_DiscardGoroutine EventType = -8 + Sys_Event_QueueTaskFinish EventType = -9 + Sys_Event_Retire EventType = -10 + Sys_Event_User_Define EventType = 1 ) diff --git a/node/node.go b/node/node.go index d669a12..f91dc9c 100644 --- a/node/node.go +++ b/node/node.go @@ -28,6 +28,10 @@ var profilerInterval time.Duration var bValid bool var configDir = "./config/" +const( + SingleStop syscall.Signal = 10 + SignalRetire syscall.Signal = 12 +) type BuildOSType = int8 @@ -38,13 +42,14 @@ const( ) func init() { - sig = make(chan os.Signal, 3) - signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.Signal(10)) + sig = make(chan os.Signal, 4) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, SingleStop,SignalRetire) console.RegisterCommandBool("help", false, "<-help> This help.", usage) console.RegisterCommandString("name", "", "<-name nodeName> Node's name.", setName) console.RegisterCommandString("start", "", "<-start nodeid=nodeid> Run originserver.", startNode) console.RegisterCommandString("stop", "", "<-stop nodeid=nodeid> Stop originserver process.", stopNode) + console.RegisterCommandString("retire", "", "<-retire nodeid=nodeid> retire originserver process.", retireNode) console.RegisterCommandString("config", "", "<-config path> Configuration file path.", setConfigPath) console.RegisterCommandString("console", "", "<-console true|false> Turn on or off screen log output.", openConsole) console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|release|warning|error|fatal> Set loglevel.", setLevel) @@ -54,6 +59,11 @@ func init() { console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof) } + +func notifyAllServiceRetire(){ + service.NotifyAllServiceRetire() +} + func usage(val interface{}) error { ret := val.(bool) if ret == false { @@ -201,6 +211,36 @@ func Start() { } } + +func retireNode(args interface{}) error { + //1.解析参数 + param := args.(string) + if param == "" { + return nil + } + + sParam := strings.Split(param, "=") + if len(sParam) != 2 { + return fmt.Errorf("invalid option %s", param) + } + if sParam[0] != "nodeid" { + return fmt.Errorf("invalid option %s", param) + } + nId, err := strconv.Atoi(sParam[1]) + if err != nil { + return fmt.Errorf("invalid option %s", param) + } + + processId, err := getRunProcessPid(nId) + if err != nil { + return err + } + + RetireProcess(processId) + return nil +} + + func stopNode(args interface{}) error { //1.解析参数 param := args.(string) @@ -215,12 +255,12 @@ func stopNode(args interface{}) error { if sParam[0] != "nodeid" { return fmt.Errorf("invalid option %s", param) } - nodeId, err := strconv.Atoi(sParam[1]) + nId, err := strconv.Atoi(sParam[1]) if err != nil { return fmt.Errorf("invalid option %s", param) } - processId, err := getRunProcessPid(nodeId) + processId, err := getRunProcessPid(nId) if err != nil { return err } @@ -268,15 +308,23 @@ func startNode(args interface{}) error { if profilerInterval > 0 { pProfilerTicker = time.NewTicker(profilerInterval) } + for bRun { select { - case <-sig: - log.Info("receipt stop signal.") - bRun = false + case s := <-sig: + signal := s.(syscall.Signal) + if signal == SignalRetire { + log.Info("receipt downline signal.") + notifyAllServiceRetire() + }else { + bRun = false + log.Info("receipt stop signal.") + } case <-pProfilerTicker.C: profiler.Report() } } + cluster.GetCluster().Stop() //7.退出 service.StopAllService() diff --git a/node/node_linux.go b/node/node_linux.go index 8661b04..fe98f9a 100644 --- a/node/node_linux.go +++ b/node/node_linux.go @@ -8,7 +8,7 @@ import ( ) func KillProcess(processId int){ - err := syscall.Kill(processId,syscall.Signal(10)) + err := syscall.Kill(processId,SingleStop) if err != nil { fmt.Printf("kill processid %d is fail:%+v.\n",processId,err) }else{ @@ -19,3 +19,12 @@ func KillProcess(processId int){ func GetBuildOSType() BuildOSType{ return Linux } + +func RetireProcess(processId int){ + err := syscall.Kill(processId,SignalRetire) + if err != nil { + fmt.Printf("offline processid %d is fail:%+v.\n",processId,err) + }else{ + fmt.Printf("offline processid %d is successful.\n",processId) + } +} diff --git a/node/node_mac.go b/node/node_mac.go index fa780c1..88f0fb4 100644 --- a/node/node_mac.go +++ b/node/node_mac.go @@ -8,7 +8,7 @@ import ( ) func KillProcess(processId int){ - err := syscall.Kill(processId,syscall.Signal(10)) + err := syscall.Kill(processId,SingleStop) if err != nil { fmt.Printf("kill processid %d is fail:%+v.\n",processId,err) }else{ @@ -19,3 +19,12 @@ func KillProcess(processId int){ func GetBuildOSType() BuildOSType{ return Mac } + +func RetireProcess(processId int){ + err := syscall.Kill(processId,SignalRetire) + if err != nil { + fmt.Printf("offline processid %d is fail:%+v.\n",processId,err) + }else{ + fmt.Printf("offline processid %d is successful.\n",processId) + } +} diff --git a/node/node_win.go b/node/node_win.go index 1e84112..ac44e1e 100644 --- a/node/node_win.go +++ b/node/node_win.go @@ -2,10 +2,28 @@ package node -func KillProcess(processId int){ +import ( + "os" + "fmt" +) +func KillProcess(processId int){ + procss,err := os.FindProcess(processId) + if err != nil { + fmt.Printf("kill processid %d is fail:%+v.\n",processId,err) + return + } + + err = procss.Kill() + if err != nil { + fmt.Printf("kill processid %d is fail:%+v.\n",processId,err) + } } func GetBuildOSType() BuildOSType{ return Windows } + +func RetireProcess(processId int){ + fmt.Printf("This command does not support Windows") +} diff --git a/rpc/dynamicdiscover.pb.go b/rpc/dynamicdiscover.pb.go index 1d31933..2e3003c 100644 --- a/rpc/dynamicdiscover.pb.go +++ b/rpc/dynamicdiscover.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.31.0 -// protoc v3.11.4 -// source: test/rpc/dynamicdiscover.proto +// protoc v4.24.0 +// source: proto/rpcproto/dynamicdiscover.proto package rpc @@ -30,13 +30,14 @@ type NodeInfo struct { ListenAddr string `protobuf:"bytes,3,opt,name=ListenAddr,proto3" json:"ListenAddr,omitempty"` MaxRpcParamLen uint32 `protobuf:"varint,4,opt,name=MaxRpcParamLen,proto3" json:"MaxRpcParamLen,omitempty"` Private bool `protobuf:"varint,5,opt,name=Private,proto3" json:"Private,omitempty"` - PublicServiceList []string `protobuf:"bytes,6,rep,name=PublicServiceList,proto3" json:"PublicServiceList,omitempty"` + Retire bool `protobuf:"varint,6,opt,name=Retire,proto3" json:"Retire,omitempty"` + PublicServiceList []string `protobuf:"bytes,7,rep,name=PublicServiceList,proto3" json:"PublicServiceList,omitempty"` } func (x *NodeInfo) Reset() { *x = NodeInfo{} if protoimpl.UnsafeEnabled { - mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[0] + mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -49,7 +50,7 @@ func (x *NodeInfo) String() string { func (*NodeInfo) ProtoMessage() {} func (x *NodeInfo) ProtoReflect() protoreflect.Message { - mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[0] + mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -62,7 +63,7 @@ func (x *NodeInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use NodeInfo.ProtoReflect.Descriptor instead. func (*NodeInfo) Descriptor() ([]byte, []int) { - return file_test_rpc_dynamicdiscover_proto_rawDescGZIP(), []int{0} + return file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP(), []int{0} } func (x *NodeInfo) GetNodeId() int32 { @@ -100,6 +101,13 @@ func (x *NodeInfo) GetPrivate() bool { return false } +func (x *NodeInfo) GetRetire() bool { + if x != nil { + return x.Retire + } + return false +} + func (x *NodeInfo) GetPublicServiceList() []string { if x != nil { return x.PublicServiceList @@ -119,7 +127,7 @@ type ServiceDiscoverReq struct { func (x *ServiceDiscoverReq) Reset() { *x = ServiceDiscoverReq{} if protoimpl.UnsafeEnabled { - mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[1] + mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -132,7 +140,7 @@ func (x *ServiceDiscoverReq) String() string { func (*ServiceDiscoverReq) ProtoMessage() {} func (x *ServiceDiscoverReq) ProtoReflect() protoreflect.Message { - mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[1] + mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -145,7 +153,7 @@ func (x *ServiceDiscoverReq) ProtoReflect() protoreflect.Message { // Deprecated: Use ServiceDiscoverReq.ProtoReflect.Descriptor instead. func (*ServiceDiscoverReq) Descriptor() ([]byte, []int) { - return file_test_rpc_dynamicdiscover_proto_rawDescGZIP(), []int{1} + return file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP(), []int{1} } func (x *ServiceDiscoverReq) GetNodeInfo() *NodeInfo { @@ -170,7 +178,7 @@ type SubscribeDiscoverNotify struct { func (x *SubscribeDiscoverNotify) Reset() { *x = SubscribeDiscoverNotify{} if protoimpl.UnsafeEnabled { - mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[2] + mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -183,7 +191,7 @@ func (x *SubscribeDiscoverNotify) String() string { func (*SubscribeDiscoverNotify) ProtoMessage() {} func (x *SubscribeDiscoverNotify) ProtoReflect() protoreflect.Message { - mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[2] + mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -196,7 +204,7 @@ func (x *SubscribeDiscoverNotify) ProtoReflect() protoreflect.Message { // Deprecated: Use SubscribeDiscoverNotify.ProtoReflect.Descriptor instead. func (*SubscribeDiscoverNotify) Descriptor() ([]byte, []int) { - return file_test_rpc_dynamicdiscover_proto_rawDescGZIP(), []int{2} + return file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP(), []int{2} } func (x *SubscribeDiscoverNotify) GetMasterNodeId() int32 { @@ -227,6 +235,54 @@ func (x *SubscribeDiscoverNotify) GetNodeInfo() []*NodeInfo { return nil } +// Client->Master +type NodeRetireReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + NodeInfo *NodeInfo `protobuf:"bytes,1,opt,name=nodeInfo,proto3" json:"nodeInfo,omitempty"` +} + +func (x *NodeRetireReq) Reset() { + *x = NodeRetireReq{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *NodeRetireReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NodeRetireReq) ProtoMessage() {} + +func (x *NodeRetireReq) ProtoReflect() protoreflect.Message { + mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NodeRetireReq.ProtoReflect.Descriptor instead. +func (*NodeRetireReq) Descriptor() ([]byte, []int) { + return file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP(), []int{3} +} + +func (x *NodeRetireReq) GetNodeInfo() *NodeInfo { + if x != nil { + return x.NodeInfo + } + return nil +} + // Master->Client type Empty struct { state protoimpl.MessageState @@ -237,7 +293,7 @@ type Empty struct { func (x *Empty) Reset() { *x = Empty{} if protoimpl.UnsafeEnabled { - mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[3] + mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -250,7 +306,7 @@ func (x *Empty) String() string { func (*Empty) ProtoMessage() {} func (x *Empty) ProtoReflect() protoreflect.Message { - mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[3] + mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -263,82 +319,89 @@ func (x *Empty) ProtoReflect() protoreflect.Message { // Deprecated: Use Empty.ProtoReflect.Descriptor instead. func (*Empty) Descriptor() ([]byte, []int) { - return file_test_rpc_dynamicdiscover_proto_rawDescGZIP(), []int{3} + return file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP(), []int{4} } -var File_test_rpc_dynamicdiscover_proto protoreflect.FileDescriptor +var File_proto_rpcproto_dynamicdiscover_proto protoreflect.FileDescriptor -var file_test_rpc_dynamicdiscover_proto_rawDesc = []byte{ - 0x0a, 0x1e, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x79, 0x6e, 0x61, 0x6d, - 0x69, 0x63, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x12, 0x03, 0x72, 0x70, 0x63, 0x22, 0xce, 0x01, 0x0a, 0x08, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, - 0x66, 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x4e, 0x6f, - 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x4e, 0x6f, - 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, - 0x41, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x4c, 0x69, 0x73, 0x74, - 0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x12, 0x26, 0x0a, 0x0e, 0x4d, 0x61, 0x78, 0x52, 0x70, 0x63, - 0x50, 0x61, 0x72, 0x61, 0x6d, 0x4c, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0e, - 0x4d, 0x61, 0x78, 0x52, 0x70, 0x63, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x4c, 0x65, 0x6e, 0x12, 0x18, - 0x0a, 0x07, 0x50, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x07, 0x50, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x12, 0x2c, 0x0a, 0x11, 0x50, 0x75, 0x62, 0x6c, - 0x69, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x18, 0x06, 0x20, - 0x03, 0x28, 0x09, 0x52, 0x11, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x22, 0x3f, 0x0a, 0x12, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x12, 0x29, 0x0a, 0x08, - 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, - 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, - 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x9e, 0x01, 0x0a, 0x17, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x4e, 0x6f, 0x74, - 0x69, 0x66, 0x79, 0x12, 0x22, 0x0a, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x6f, 0x64, - 0x65, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65, - 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c, - 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x12, - 0x1c, 0x0a, 0x09, 0x44, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x09, 0x44, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x29, 0x0a, - 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, - 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, +var file_proto_rpcproto_dynamicdiscover_proto_rawDesc = []byte{ + 0x0a, 0x24, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x72, 0x70, 0x63, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x64, 0x79, 0x6e, 0x61, 0x6d, 0x69, 0x63, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x72, 0x70, 0x63, 0x22, 0xe6, 0x01, 0x0a, 0x08, + 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, + 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, + 0x12, 0x1a, 0x0a, 0x08, 0x4e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x4e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1e, 0x0a, 0x0a, + 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x12, 0x26, 0x0a, 0x0e, + 0x4d, 0x61, 0x78, 0x52, 0x70, 0x63, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x4c, 0x65, 0x6e, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0e, 0x4d, 0x61, 0x78, 0x52, 0x70, 0x63, 0x50, 0x61, 0x72, 0x61, + 0x6d, 0x4c, 0x65, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x50, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x50, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x12, 0x16, + 0x0a, 0x06, 0x52, 0x65, 0x74, 0x69, 0x72, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, + 0x52, 0x65, 0x74, 0x69, 0x72, 0x65, 0x12, 0x2c, 0x0a, 0x11, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x18, 0x07, 0x20, 0x03, 0x28, + 0x09, 0x52, 0x11, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x4c, 0x69, 0x73, 0x74, 0x22, 0x3f, 0x0a, 0x12, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x44, + 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x12, 0x29, 0x0a, 0x08, 0x6e, 0x6f, + 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x72, + 0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, 0x6f, 0x64, + 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x9e, 0x01, 0x0a, 0x17, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x4e, 0x6f, 0x74, 0x69, 0x66, + 0x79, 0x12, 0x22, 0x0a, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x49, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x4e, + 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x12, 0x1c, 0x0a, + 0x09, 0x44, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, + 0x52, 0x09, 0x44, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x08, 0x6e, + 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, + 0x72, 0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, 0x6f, + 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x3a, 0x0a, 0x0d, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, + 0x74, 0x69, 0x72, 0x65, 0x52, 0x65, 0x71, 0x12, 0x29, 0x0a, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49, + 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, + 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, + 0x66, 0x6f, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x42, 0x07, 0x5a, 0x05, 0x2e, + 0x3b, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( - file_test_rpc_dynamicdiscover_proto_rawDescOnce sync.Once - file_test_rpc_dynamicdiscover_proto_rawDescData = file_test_rpc_dynamicdiscover_proto_rawDesc + file_proto_rpcproto_dynamicdiscover_proto_rawDescOnce sync.Once + file_proto_rpcproto_dynamicdiscover_proto_rawDescData = file_proto_rpcproto_dynamicdiscover_proto_rawDesc ) -func file_test_rpc_dynamicdiscover_proto_rawDescGZIP() []byte { - file_test_rpc_dynamicdiscover_proto_rawDescOnce.Do(func() { - file_test_rpc_dynamicdiscover_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_rpc_dynamicdiscover_proto_rawDescData) +func file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP() []byte { + file_proto_rpcproto_dynamicdiscover_proto_rawDescOnce.Do(func() { + file_proto_rpcproto_dynamicdiscover_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_rpcproto_dynamicdiscover_proto_rawDescData) }) - return file_test_rpc_dynamicdiscover_proto_rawDescData + return file_proto_rpcproto_dynamicdiscover_proto_rawDescData } -var file_test_rpc_dynamicdiscover_proto_msgTypes = make([]protoimpl.MessageInfo, 4) -var file_test_rpc_dynamicdiscover_proto_goTypes = []interface{}{ +var file_proto_rpcproto_dynamicdiscover_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_proto_rpcproto_dynamicdiscover_proto_goTypes = []interface{}{ (*NodeInfo)(nil), // 0: rpc.NodeInfo (*ServiceDiscoverReq)(nil), // 1: rpc.ServiceDiscoverReq (*SubscribeDiscoverNotify)(nil), // 2: rpc.SubscribeDiscoverNotify - (*Empty)(nil), // 3: rpc.Empty + (*NodeRetireReq)(nil), // 3: rpc.NodeRetireReq + (*Empty)(nil), // 4: rpc.Empty } -var file_test_rpc_dynamicdiscover_proto_depIdxs = []int32{ +var file_proto_rpcproto_dynamicdiscover_proto_depIdxs = []int32{ 0, // 0: rpc.ServiceDiscoverReq.nodeInfo:type_name -> rpc.NodeInfo 0, // 1: rpc.SubscribeDiscoverNotify.nodeInfo:type_name -> rpc.NodeInfo - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 0, // 2: rpc.NodeRetireReq.nodeInfo:type_name -> rpc.NodeInfo + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } -func init() { file_test_rpc_dynamicdiscover_proto_init() } -func file_test_rpc_dynamicdiscover_proto_init() { - if File_test_rpc_dynamicdiscover_proto != nil { +func init() { file_proto_rpcproto_dynamicdiscover_proto_init() } +func file_proto_rpcproto_dynamicdiscover_proto_init() { + if File_proto_rpcproto_dynamicdiscover_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_test_rpc_dynamicdiscover_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_proto_rpcproto_dynamicdiscover_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*NodeInfo); i { case 0: return &v.state @@ -350,7 +413,7 @@ func file_test_rpc_dynamicdiscover_proto_init() { return nil } } - file_test_rpc_dynamicdiscover_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_proto_rpcproto_dynamicdiscover_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ServiceDiscoverReq); i { case 0: return &v.state @@ -362,7 +425,7 @@ func file_test_rpc_dynamicdiscover_proto_init() { return nil } } - file_test_rpc_dynamicdiscover_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + file_proto_rpcproto_dynamicdiscover_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubscribeDiscoverNotify); i { case 0: return &v.state @@ -374,7 +437,19 @@ func file_test_rpc_dynamicdiscover_proto_init() { return nil } } - file_test_rpc_dynamicdiscover_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_proto_rpcproto_dynamicdiscover_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NodeRetireReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_rpcproto_dynamicdiscover_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Empty); i { case 0: return &v.state @@ -391,18 +466,18 @@ func file_test_rpc_dynamicdiscover_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_test_rpc_dynamicdiscover_proto_rawDesc, + RawDescriptor: file_proto_rpcproto_dynamicdiscover_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 5, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_test_rpc_dynamicdiscover_proto_goTypes, - DependencyIndexes: file_test_rpc_dynamicdiscover_proto_depIdxs, - MessageInfos: file_test_rpc_dynamicdiscover_proto_msgTypes, + GoTypes: file_proto_rpcproto_dynamicdiscover_proto_goTypes, + DependencyIndexes: file_proto_rpcproto_dynamicdiscover_proto_depIdxs, + MessageInfos: file_proto_rpcproto_dynamicdiscover_proto_msgTypes, }.Build() - File_test_rpc_dynamicdiscover_proto = out.File - file_test_rpc_dynamicdiscover_proto_rawDesc = nil - file_test_rpc_dynamicdiscover_proto_goTypes = nil - file_test_rpc_dynamicdiscover_proto_depIdxs = nil + File_proto_rpcproto_dynamicdiscover_proto = out.File + file_proto_rpcproto_dynamicdiscover_proto_rawDesc = nil + file_proto_rpcproto_dynamicdiscover_proto_goTypes = nil + file_proto_rpcproto_dynamicdiscover_proto_depIdxs = nil } diff --git a/rpc/dynamicdiscover.proto b/rpc/dynamicdiscover.proto index 3538e6b..6f5e1d8 100644 --- a/rpc/dynamicdiscover.proto +++ b/rpc/dynamicdiscover.proto @@ -8,7 +8,8 @@ message NodeInfo{ string ListenAddr = 3; uint32 MaxRpcParamLen = 4; bool Private = 5; - repeated string PublicServiceList = 6; + bool Retire = 6; + repeated string PublicServiceList = 7; } //Client->Master @@ -24,6 +25,12 @@ message SubscribeDiscoverNotify{ repeated NodeInfo nodeInfo = 4; } + +//Client->Master +message NodeRetireReq{ + NodeInfo nodeInfo = 1; +} + //Master->Client message Empty{ } \ No newline at end of file diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go index 7d91453..3070f3f 100644 --- a/rpc/pbprocessor.go +++ b/rpc/pbprocessor.go @@ -74,7 +74,7 @@ func (slf *PBProcessor) IsParse(param interface{}) bool { } func (slf *PBProcessor) GetProcessorType() RpcProcessorType{ - return RpcProcessorGoGoPB + return RpcProcessorPB } func (slf *PBProcessor) Clone(src interface{}) (interface{},error){ diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 459e5e5..8158137 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -14,7 +14,7 @@ import ( const maxClusterNode int = 128 -type FuncRpcClient func(nodeId int, serviceMethod string, client []*Client) (error, int) +type FuncRpcClient func(nodeId int, serviceMethod string,filterRetire bool, client []*Client) (error, int) type FuncRpcServer func() *Server @@ -73,7 +73,7 @@ type INodeListener interface { type IDiscoveryServiceListener interface { OnDiscoveryService(nodeId int, serviceName []string) - OnUnDiscoveryService(nodeId int, serviceName []string) + OnUnDiscoveryService(nodeId int) } type CancelRpc func() @@ -428,7 +428,7 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int, serviceMethod string, args interface{}) error { var pClientList [maxClusterNode]*Client - err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:]) + err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:]) if count == 0 { if err != nil { log.Error("call serviceMethod is failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err)) @@ -458,7 +458,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, reply interface{}) error { var pClientList [maxClusterNode]*Client - err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:]) + err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:]) if err != nil { log.Error("Call serviceMethod is failed",log.ErrorAttr("error",err)) return err @@ -502,7 +502,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId int, servic reply := reflect.New(fVal.Type().In(0).Elem()).Interface() var pClientList [2]*Client - err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:]) + err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:]) if count == 0 || err != nil { if err == nil { if nodeId > 0 { @@ -585,7 +585,7 @@ func (handler *RpcHandler) CastGo(serviceMethod string, args interface{}) error func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs []byte) error { processor := GetProcessor(uint8(rpcProcessorType)) - err, count := handler.funcRpcClient(nodeId, serviceName, handler.pClientList) + err, count := handler.funcRpcClient(nodeId, serviceName,false, handler.pClientList) if count == 0 || err != nil { log.Error("call serviceMethod is failed",log.ErrorAttr("error",err)) return err diff --git a/rpc/server.go b/rpc/server.go index c51d487..935aff6 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -17,7 +17,7 @@ type RpcProcessorType uint8 const ( RpcProcessorJson RpcProcessorType = 0 - RpcProcessorGoGoPB RpcProcessorType = 1 + RpcProcessorPB RpcProcessorType = 1 ) var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &PBProcessor{}} diff --git a/service/service.go b/service/service.go index cd8da8f..c353729 100644 --- a/service/service.go +++ b/service/service.go @@ -28,6 +28,7 @@ type IService interface { OnSetup(iService IService) OnInit() error OnStart() + OnRetire() OnRelease() SetName(serviceName string) @@ -40,6 +41,9 @@ type IService interface { SetEventChannelNum(num int) OpenProfiler() + + SetRetire() //设置服务退休状态 + IsRetire() bool //服务是否退休 } type Service struct { @@ -51,6 +55,7 @@ type Service struct { serviceCfg interface{} goroutineNum int32 startStatus bool + retire int32 eventProcessor event.IEventProcessor profiler *profiler.Profiler //性能分析器 nodeEventLister rpc.INodeListener @@ -97,6 +102,19 @@ func (s *Service) OpenProfiler() { } } +func (s *Service) IsRetire() bool{ + return atomic.LoadInt32(&s.retire) != 0 +} + +func (s *Service) SetRetire(){ + atomic.StoreInt32(&s.retire,1) + + ev := event.NewEvent() + ev.Type = event.Sys_Event_Retire + + s.pushEvent(ev) +} + func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) { s.closeSig = make(chan struct{}) s.dispatcher =timer.NewDispatcher(timerDispatcherLen) @@ -155,6 +173,9 @@ func (s *Service) Run() { concurrent.DoCallback(cb) case ev := <- s.chanEvent: switch ev.GetEventType() { + case event.Sys_Event_Retire: + log.Info("service OnRetire",log.String("servceName",s.GetName())) + s.self.(IService).OnRetire() case event.ServiceRpcRequestEvent: cEvent,ok := ev.(*event.Event) if ok == false { @@ -304,7 +325,7 @@ func (s *Service) OnDiscoverServiceEvent(ev event.IEvent){ if event.IsDiscovery { s.discoveryServiceLister.OnDiscoveryService(event.NodeId,event.ServiceName) }else{ - s.discoveryServiceLister.OnUnDiscoveryService(event.NodeId,event.ServiceName) + s.discoveryServiceLister.OnUnDiscoveryService(event.NodeId) } } @@ -387,3 +408,6 @@ func (s *Service) SetGoRoutineNum(goroutineNum int32) bool { s.goroutineNum = goroutineNum return true } + +func (s *Service) OnRetire(){ +} \ No newline at end of file diff --git a/service/servicemgr.go b/service/servicemgr.go index 4f08f2a..c15b5a1 100644 --- a/service/servicemgr.go +++ b/service/servicemgr.go @@ -60,3 +60,9 @@ func StopAllService(){ setupServiceList[i].Stop() } } + +func NotifyAllServiceRetire(){ + for i := len(setupServiceList) - 1; i >= 0; i-- { + setupServiceList[i].SetRetire() + } +} \ No newline at end of file diff --git a/util/math/math.go b/util/math/math.go index b8ffe84..9d50e11 100644 --- a/util/math/math.go +++ b/util/math/math.go @@ -1,6 +1,8 @@ package math -import "github.com/duanhf2012/origin/log" +import ( + "github.com/duanhf2012/origin/log" +) type NumberType interface { int | int8 | int16 | int32 | int64 | float32 | float64 | uint | uint8 | uint16 | uint32 | uint64 @@ -38,41 +40,90 @@ func Abs[NumType SignedNumberType](Num NumType) NumType { return Num } - -func Add[NumType NumberType](number1 NumType, number2 NumType) NumType { +func AddSafe[NumType NumberType](number1 NumType, number2 NumType) (NumType, bool) { ret := number1 + number2 - if number2> 0 && ret < number1 { - log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2)) - }else if (number2<0 && ret > number1){ - log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2)) + if number2 > 0 && ret < number1 { + log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) + return ret, false + } else if number2 < 0 && ret > number1 { + log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) + return ret, false } + return ret, true +} + +func SubSafe[NumType NumberType](number1 NumType, number2 NumType) (NumType, bool) { + ret := number1 - number2 + if number2 > 0 && ret > number1 { + log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) + return ret, false + } else if number2 < 0 && ret < number1 { + log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) + return ret, false + } + + return ret, true +} + +func MulSafe[NumType NumberType](number1 NumType, number2 NumType) (NumType, bool) { + ret := number1 * number2 + if number1 == 0 || number2 == 0 { + return ret, true + } + + if ret/number2 == number1 { + return ret, true + } + + log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2)) + return ret, true +} + +func Add[NumType NumberType](number1 NumType, number2 NumType) NumType { + ret, _ := AddSafe(number1, number2) return ret } func Sub[NumType NumberType](number1 NumType, number2 NumType) NumType { - ret := number1 - number2 - if number2> 0 && ret > number1 { - log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2)) - }else if (number2<0 && ret < number1){ - log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2)) - } - + ret, _ := SubSafe(number1, number2) return ret } - func Mul[NumType NumberType](number1 NumType, number2 NumType) NumType { - ret := number1 * number2 - if number1 == 0 || number2 == 0 { - return ret - } - - if ret / number2 == number1 { - return ret - } - - log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2)) + ret, _ := MulSafe(number1, number2) return ret } +// 安全的求比例 +func PercentRateSafe[NumType NumberType, OutNumType NumberType](maxValue int64, rate NumType, numbers ...NumType) (OutNumType, bool) { + // 比例不能为负数 + if rate < 0 { + log.Stack("rate must not positive") + return 0, false + } + + if rate == 0 { + // 比例为0 + return 0, true + } + + ret := int64(rate) + for _, number := range numbers { + number64 := int64(number) + result, success := MulSafe(number64, ret) + if !success { + // 基数*比例越界了,int64都越界了,没办法了 + return 0, false + } + + ret = result + } + + ret = ret / 10000 + if ret > maxValue { + return 0, false + } + + return OutNumType(ret), true +}