Compare commits

...

6 Commits

Author SHA1 Message Date
duanhf2012
39b862e3d9 新增IsNodeRetire函数,用于判断NodeId是否为Retire状态 2023-10-09 16:57:05 +08:00
duanhf2012
8c9b796fce 优化日志输出 2023-10-09 16:50:09 +08:00
duanhf2012
c0971a46a7 新增retire命令参数用于将置于服务退休状态,使得服务软关闭 2023-10-09 15:27:01 +08:00
duanhf2012
ba019ac466 优化服务发现README文档 2023-09-24 13:53:18 +08:00
duanhf2012
c803b9b9ad 优化tcpservice 2023-09-22 16:29:31 +08:00
duanhf2012
3f52ea8331 优化筛选服务发现---支持对指定主master结点进行筛选 2023-09-22 15:43:41 +08:00
18 changed files with 566 additions and 268 deletions

View File

@@ -19,7 +19,7 @@ Hello world!
```go
go get -v -u github.com/duanhf2012/origin
```
[README.md](README.md)
于是下载到GOPATH环境目录中,在src中加入main.go,内容如下:
```go
@@ -858,8 +858,7 @@ origin引擎默认使用读取所有结点配置的进行确认结点有哪些Se
"MasterDiscoveryNode": [{
"NodeId": 2,
"ListenAddr": "127.0.0.1:10001",
"MaxRpcParamLen": 409600,
"NeighborService":["HttpGateService"]
"MaxRpcParamLen": 409600
},
{
"NodeId": 1,
@@ -876,17 +875,18 @@ origin引擎默认使用读取所有结点配置的进行确认结点有哪些Se
"Private": false,
"remark": "//以_打头的表示只在本机进程不对整个子网开发",
"ServiceList": ["_TestService1", "TestService9", "TestService10"],
"DiscoveryService": ["TestService8"]
"MasterDiscoveryService": [
{
"MasterNodeId": 2,
"DiscoveryService": ["TestService8"]
}
]
}]
}
```
MasterDiscoveryNode: 配置了结点Id为1的服务发现Master他的监听地址ListenAddr为127.0.0.1:8801结点为2的也是一个服务发现Master。NodeId为1的结点会从结点为1和2的网络中发现服务。
新上有两新不同的字段分别为MasterDiscoveryNode与DiscoveryService。其中:
MasterDiscoveryNode中配置了结点Id为1的服务发现Master他的监听地址ListenAddr为127.0.0.1:8801结点为2的也是一个服务发现Master不同在于多了"NeighborService":["HttpGateService"]配置。如果"NeighborService"有配置具体的服务时则表示该结点是一个邻居Master结点。当前运行的Node结点会从该Master结点上筛选HttpGateService的服务并且当前运行的Node结点不会向上同步本地所有公开的服务和邻居结点关系是单向的。
NeighborService可以用在当有多个以Master中心结点的网络发现跨网络的服务场景。
DiscoveryService表示将筛选origin网络中的TestService8服务注意如果DiscoveryService不配置则筛选功能不生效。
MasterDiscoveryService: 表示将筛选origin网络中MasterNodeId为2中的TestService8服务注意如果MasterDiscoveryService不配置则筛选功能不生效。MasterNodeId也可以填为0表示NodeId为1的结点在所有网络中只发现TestService8的服务。
第八章HttpService使用
-----------------------

View File

@@ -20,6 +20,11 @@ const (
Discard NodeStatus = 1 //丢弃
)
type MasterDiscoveryService struct {
MasterNodeId int32 //要筛选的主结点Id如果不配置或者配置成0表示针对所有的主结点
DiscoveryService []string //只发现的服务列表
}
type NodeInfo struct {
NodeId int
NodeName string
@@ -29,9 +34,9 @@ type NodeInfo struct {
CompressBytesLen int //超过字节进行压缩的长度
ServiceList []string //所有的有序服务列表
PublicServiceList []string //对外公开的服务列表
DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选
NeighborService []string
MasterDiscoveryService []MasterDiscoveryService //筛选发现的服务,如果不配置,不进行筛选
status NodeStatus
Retire bool
}
type NodeRpcInfo struct {
@@ -51,8 +56,8 @@ type Cluster struct {
locker sync.RWMutex //结点与服务关系保护锁
mapRpc map[int]NodeRpcInfo //nodeId
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapRpc map[int]*NodeRpcInfo //nodeId
//mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId]
rpcServer rpc.Server
@@ -83,10 +88,11 @@ func (cls *Cluster) Stop() {
func (cls *Cluster) DiscardNode(nodeId int) {
cls.locker.Lock()
nodeInfo, ok := cls.mapIdNode[nodeId]
nodeInfo, ok := cls.mapRpc[nodeId]
bDel := (ok == true) && nodeInfo.nodeInfo.status == Discard
cls.locker.Unlock()
if ok == true && nodeInfo.status == Discard {
if bDel {
cls.DelNode(nodeId, true)
}
}
@@ -99,39 +105,30 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
cls.locker.Lock()
defer cls.locker.Unlock()
nodeInfo, ok := cls.mapIdNode[nodeId]
rpc, ok := cls.mapRpc[nodeId]
if ok == false {
return
}
rpc, ok := cls.mapRpc[nodeId]
for {
//立即删除
if immediately || ok == false {
break
}
if immediately ==false {
//正在连接中不主动断开,只断开没有连接中的
if rpc.client.IsConnected() {
nodeInfo.status = Discard
log.Info("Discard node",log.Int("nodeId",nodeInfo.NodeId),log.String("ListenAddr", nodeInfo.ListenAddr))
rpc.nodeInfo.status = Discard
log.Info("Discard node",log.Int("nodeId",rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr))
return
}
break
}
for _, serviceName := range nodeInfo.ServiceList {
for _, serviceName := range rpc.nodeInfo.ServiceList {
cls.delServiceNode(serviceName, nodeId)
}
delete(cls.mapIdNode, nodeId)
delete(cls.mapRpc, nodeId)
if ok == true {
rpc.client.Close(false)
}
log.Info("remove node ",log.Int("NodeId", nodeInfo.NodeId),log.String("ListenAddr", nodeInfo.ListenAddr))
log.Info("remove node ",log.Int("NodeId", rpc.nodeInfo.NodeId),log.String("ListenAddr", rpc.nodeInfo.ListenAddr))
}
func (cls *Cluster) serviceDiscoveryDelNode(nodeId int, immediately bool) {
@@ -164,9 +161,9 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
defer cls.locker.Unlock()
//先清一次的NodeId对应的所有服务清理
lastNodeInfo, ok := cls.mapIdNode[nodeInfo.NodeId]
lastNodeInfo, ok := cls.mapRpc[nodeInfo.NodeId]
if ok == true {
for _, serviceName := range lastNodeInfo.ServiceList {
for _, serviceName := range lastNodeInfo.nodeInfo.ServiceList {
cls.delServiceNode(serviceName, nodeInfo.NodeId)
}
}
@@ -185,27 +182,22 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
}
cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{}
}
cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo
log.Info("Discovery nodeId",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList))
//已经存在连接,则不需要进行设置
if _, rpcInfoOK := cls.mapRpc[nodeInfo.NodeId]; rpcInfoOK == true {
if lastNodeInfo != nil {
log.Info("Discovery nodeId",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire))
lastNodeInfo.nodeInfo = *nodeInfo
return
}
//不存在时,则建立连接
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = *nodeInfo
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent)
cls.mapRpc[nodeInfo.NodeId] = rpcInfo
cls.mapRpc[nodeInfo.NodeId] = &rpcInfo
log.Info("Discovery nodeId and new rpc client",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr))
}
func (cls *Cluster) buildLocalRpc() {
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId)
cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo
}
func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error {
//1.初始化配置
@@ -215,7 +207,6 @@ func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error
}
cls.rpcServer.Init(cls)
cls.buildLocalRpc()
//2.安装服务发现结点
cls.SetupServiceDiscovery(localNodeId, setupServiceFun)
@@ -309,27 +300,33 @@ func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler {
return pService.GetRpcHandler()
}
func (cls *Cluster) getRpcClient(nodeId int) *rpc.Client {
func (cls *Cluster) getRpcClient(nodeId int) (*rpc.Client,bool) {
c, ok := cls.mapRpc[nodeId]
if ok == false {
return nil
return nil,false
}
return c.client
return c.client,c.nodeInfo.Retire
}
func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client {
func (cls *Cluster) GetRpcClient(nodeId int) (*rpc.Client,bool) {
cls.locker.RLock()
defer cls.locker.RUnlock()
return cls.getRpcClient(nodeId)
}
func GetRpcClient(nodeId int, serviceMethod string, clientList []*rpc.Client) (error, int) {
func GetRpcClient(nodeId int, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, int) {
if nodeId > 0 {
pClient := GetCluster().GetRpcClient(nodeId)
pClient,retire := GetCluster().GetRpcClient(nodeId)
if pClient == nil {
return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
}
//如果需要筛选掉退休结点
if filterRetire == true && retire == true {
return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
}
clientList[0] = pClient
return nil, 1
}
@@ -341,7 +338,7 @@ func GetRpcClient(nodeId int, serviceMethod string, clientList []*rpc.Client) (e
serviceName := serviceMethod[:findIndex]
//1.找到对应的rpcNodeid
return GetCluster().GetNodeIdByService(serviceName, clientList, true)
return GetCluster().GetNodeIdByService(serviceName, clientList, filterRetire)
}
func GetRpcServer() *rpc.Server {
@@ -349,10 +346,19 @@ func GetRpcServer() *rpc.Server {
}
func (cls *Cluster) IsNodeConnected(nodeId int) bool {
pClient := cls.GetRpcClient(nodeId)
pClient,_ := cls.GetRpcClient(nodeId)
return pClient != nil && pClient.IsConnected()
}
func (cls *Cluster) IsNodeRetire(nodeId int) bool {
cls.locker.RLock()
defer cls.locker.RUnlock()
_,retire :=cls.getRpcClient(nodeId)
return retire
}
func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId int) {
cls.locker.Lock()
nodeInfo, ok := cls.mapRpc[nodeId]
@@ -471,11 +477,3 @@ func (cls *Cluster) GetGlobalCfg() interface{} {
return cls.globalCfg
}
func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) {
cls.locker.RLock()
defer cls.locker.RUnlock()
nodeInfo,ok:= cls.mapIdNode[nodeId]
return nodeInfo,ok
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/duanhf2012/origin/service"
"time"
"github.com/duanhf2012/origin/util/timer"
"google.golang.org/protobuf/proto"
)
const DynamicDiscoveryMasterName = "DiscoveryMaster"
@@ -14,6 +15,7 @@ const DynamicDiscoveryClientName = "DiscoveryClient"
const RegServiceDiscover = DynamicDiscoveryMasterName + ".RPC_RegServiceDiscover"
const SubServiceDiscover = DynamicDiscoveryClientName + ".RPC_SubServiceDiscover"
const AddSubServiceDiscover = DynamicDiscoveryMasterName + ".RPC_AddSubServiceDiscover"
const NodeRetireRpcMethod = DynamicDiscoveryMasterName+".RPC_NodeRetire"
type DynamicDiscoveryMaster struct {
service.Service
@@ -30,6 +32,7 @@ type DynamicDiscoveryClient struct {
localNodeId int
mapDiscovery map[int32]map[int32]struct{} //map[masterNodeId]map[nodeId]struct{}
bRetire bool
}
var masterService DynamicDiscoveryMaster
@@ -49,16 +52,32 @@ func (ds *DynamicDiscoveryMaster) isRegNode(nodeId int32) bool {
return ok
}
func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo) {
if len(nodeInfo.PublicServiceList) == 0 {
func (ds *DynamicDiscoveryMaster) updateNodeInfo(nInfo *rpc.NodeInfo) {
if _,ok:= ds.mapNodeInfo[nInfo.NodeId];ok == false {
return
}
_, ok := ds.mapNodeInfo[nodeInfo.NodeId]
nodeInfo := proto.Clone(nInfo).(*rpc.NodeInfo)
for i:=0;i<len(ds.nodeInfo);i++ {
if ds.nodeInfo[i].NodeId == nodeInfo.NodeId {
ds.nodeInfo[i] = nodeInfo
break
}
}
}
func (ds *DynamicDiscoveryMaster) addNodeInfo(nInfo *rpc.NodeInfo) {
if len(nInfo.PublicServiceList) == 0 {
return
}
_, ok := ds.mapNodeInfo[nInfo.NodeId]
if ok == true {
return
}
ds.mapNodeInfo[nodeInfo.NodeId] = struct{}{}
ds.mapNodeInfo[nInfo.NodeId] = struct{}{}
nodeInfo := proto.Clone(nInfo).(*rpc.NodeInfo)
ds.nodeInfo = append(ds.nodeInfo, nodeInfo)
}
@@ -87,16 +106,14 @@ func (ds *DynamicDiscoveryMaster) OnInit() error {
func (ds *DynamicDiscoveryMaster) OnStart() {
var nodeInfo rpc.NodeInfo
localNodeInfo := cluster.GetLocalNodeInfo()
if localNodeInfo.Private == true {
return
}
nodeInfo.NodeId = int32(localNodeInfo.NodeId)
nodeInfo.NodeName = localNodeInfo.NodeName
nodeInfo.ListenAddr = localNodeInfo.ListenAddr
nodeInfo.PublicServiceList = localNodeInfo.PublicServiceList
nodeInfo.MaxRpcParamLen = localNodeInfo.MaxRpcParamLen
nodeInfo.Private = localNodeInfo.Private
nodeInfo.Retire = localNodeInfo.Retire
ds.addNodeInfo(&nodeInfo)
}
@@ -138,6 +155,19 @@ func (ds *DynamicDiscoveryMaster) RpcCastGo(serviceMethod string, args interface
}
}
func (ds *DynamicDiscoveryMaster) RPC_NodeRetire(req *rpc.NodeRetireReq, res *rpc.Empty) error {
log.Info("node is retire",log.Int32("nodeId",req.NodeInfo.NodeId),log.Bool("retire",req.NodeInfo.Retire))
ds.updateNodeInfo(req.NodeInfo)
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId)
notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo, req.NodeInfo)
ds.RpcCastGo(SubServiceDiscover, &notifyDiscover)
return nil
}
// 收到注册过来的结点
func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.Empty) error {
if req.NodeInfo == nil {
@@ -165,6 +195,8 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove
nodeInfo.PublicServiceList = req.NodeInfo.PublicServiceList
nodeInfo.ListenAddr = req.NodeInfo.ListenAddr
nodeInfo.MaxRpcParamLen = req.NodeInfo.MaxRpcParamLen
nodeInfo.Retire = req.NodeInfo.Retire
//主动删除已经存在的结点,确保先断开,再连接
cluster.serviceDiscoveryDelNode(nodeInfo.NodeId, true)
@@ -248,15 +280,6 @@ func (dc *DynamicDiscoveryClient) fullCompareDiffNode(masterNodeId int32, mapNod
//订阅发现的服务通知
func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error {
//整理当前master结点需要筛选的NeighborService
masterDiscoveryNodeInfo := cluster.GetMasterDiscoveryNodeInfo(int(req.MasterNodeId))
mapMasterDiscoveryService := map[string]struct{}{}
if masterDiscoveryNodeInfo != nil {
for i := 0; i < len(masterDiscoveryNodeInfo.NeighborService); i++ {
mapMasterDiscoveryService[masterDiscoveryNodeInfo.NeighborService[i]] = struct{}{}
}
}
mapNodeInfo := map[int32]*rpc.NodeInfo{}
for _, nodeInfo := range req.NodeInfo {
//不对本地结点或者不存在任何公开服务的结点
@@ -271,13 +294,6 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
//遍历所有的公开服务,并筛选之
for _, serviceName := range nodeInfo.PublicServiceList {
//只有存在配置时才做筛选
if len(mapMasterDiscoveryService) > 0 {
if _, ok := mapMasterDiscoveryService[serviceName]; ok == false {
continue
}
}
nInfo := mapNodeInfo[nodeInfo.NodeId]
if nInfo == nil {
nInfo = &rpc.NodeInfo{}
@@ -285,6 +301,9 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
nInfo.NodeName = nodeInfo.NodeName
nInfo.ListenAddr = nodeInfo.ListenAddr
nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
nInfo.Retire = nodeInfo.Retire
nInfo.Private = nodeInfo.Private
mapNodeInfo[nodeInfo.NodeId] = nInfo
}
@@ -294,7 +313,6 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
//如果为完整同步,则找出差异的结点
var willDelNodeId []int32
//如果不是邻居结点,则做筛选
if req.IsFull == true {
diffNode := dc.fullCompareDiffNode(req.MasterNodeId, mapNodeInfo)
if len(diffNode) > 0 {
@@ -309,8 +327,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
//删除不必要的结点
for _, nodeId := range willDelNodeId {
nodeInfo,_ := cluster.GetNodeInfo(int(nodeId))
cluster.TriggerDiscoveryEvent(false,int(nodeId),nodeInfo.PublicServiceList)
cluster.TriggerDiscoveryEvent(false,int(nodeId),nil)
dc.removeMasterNode(req.MasterNodeId, int32(nodeId))
if dc.findNodeId(nodeId) == false {
dc.funDelService(int(nodeId), false)
@@ -319,10 +336,8 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
//设置新结点
for _, nodeInfo := range mapNodeInfo {
dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId)
dc.setNodeInfo(nodeInfo)
if len(nodeInfo.PublicServiceList) == 0 {
bSet := dc.setNodeInfo(req.MasterNodeId,nodeInfo)
if bSet == false {
continue
}
@@ -346,6 +361,29 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
dc.regServiceDiscover(nodeId)
}
func (dc *DynamicDiscoveryClient) OnRetire(){
dc.bRetire = true
masterNodeList := cluster.GetDiscoveryNodeList()
for i:=0;i<len(masterNodeList);i++{
var nodeRetireReq rpc.NodeRetireReq
nodeRetireReq.NodeInfo = &rpc.NodeInfo{}
nodeRetireReq.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId)
nodeRetireReq.NodeInfo.NodeName = cluster.localNodeInfo.NodeName
nodeRetireReq.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
nodeRetireReq.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen
nodeRetireReq.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
nodeRetireReq.NodeInfo.Retire = dc.bRetire
nodeRetireReq.NodeInfo.Private = cluster.localNodeInfo.Private
err := dc.GoNode(int(masterNodeList[i].NodeId),NodeRetireRpcMethod,&nodeRetireReq)
if err!= nil {
log.Error("call "+NodeRetireRpcMethod+" is fail",log.ErrorAttr("err",err))
}
}
}
func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId)
if nodeInfo == nil {
@@ -358,13 +396,9 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName
req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
req.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen
//MasterDiscoveryNode配置中没有配置NeighborService则同步当前结点所有服务
if len(nodeInfo.NeighborService) == 0 {
req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
} else {
req.NodeInfo.PublicServiceList = append(req.NodeInfo.PublicServiceList, DynamicDiscoveryClientName)
}
req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
req.NodeInfo.Retire = dc.bRetire
req.NodeInfo.Private = cluster.localNodeInfo.Private
//向Master服务同步本Node服务信息
err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) {
@@ -382,37 +416,56 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
}
}
func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo) {
if nodeInfo == nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId {
return
}
func (dc *DynamicDiscoveryClient) canDiscoveryService(fromMasterNodeId int32,serviceName string) bool{
canDiscovery := true
//筛选关注的服务
localNodeInfo := cluster.GetLocalNodeInfo()
if len(localNodeInfo.DiscoveryService) > 0 {
var discoverServiceSlice = make([]string, 0, 24)
for _, pubService := range nodeInfo.PublicServiceList {
for _, discoverService := range localNodeInfo.DiscoveryService {
if pubService == discoverService {
discoverServiceSlice = append(discoverServiceSlice, pubService)
for i:=0;i<len(cluster.GetLocalNodeInfo().MasterDiscoveryService);i++{
masterNodeId := cluster.GetLocalNodeInfo().MasterDiscoveryService[i].MasterNodeId
if masterNodeId == fromMasterNodeId || masterNodeId == 0 {
canDiscovery = false
for _,discoveryService := range cluster.GetLocalNodeInfo().MasterDiscoveryService[i].DiscoveryService {
if discoveryService == serviceName {
return true
}
}
}
nodeInfo.PublicServiceList = discoverServiceSlice
}
if len(nodeInfo.PublicServiceList) == 0 {
return
return canDiscovery
}
func (dc *DynamicDiscoveryClient) setNodeInfo(masterNodeId int32,nodeInfo *rpc.NodeInfo) bool{
if nodeInfo == nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId {
return false
}
//筛选关注的服务
var discoverServiceSlice = make([]string, 0, 24)
for _, pubService := range nodeInfo.PublicServiceList {
if dc.canDiscoveryService(masterNodeId,pubService) == true {
discoverServiceSlice = append(discoverServiceSlice,pubService)
}
}
if len(discoverServiceSlice) == 0 {
return false
}
var nInfo NodeInfo
nInfo.ServiceList = nodeInfo.PublicServiceList
nInfo.PublicServiceList = nodeInfo.PublicServiceList
nInfo.ServiceList = discoverServiceSlice
nInfo.PublicServiceList = discoverServiceSlice
nInfo.NodeId = int(nodeInfo.NodeId)
nInfo.NodeName = nodeInfo.NodeName
nInfo.ListenAddr = nodeInfo.ListenAddr
nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
nInfo.Retire = nodeInfo.Retire
nInfo.Private = nodeInfo.Private
dc.funSetService(&nInfo)
return true
}
func (dc *DynamicDiscoveryClient) OnNodeDisconnect(nodeId int) {

View File

@@ -199,7 +199,11 @@ func (cls *Cluster) readLocalService(localNodeId int) error {
}
func (cls *Cluster) parseLocalCfg() {
cls.mapIdNode[cls.localNodeInfo.NodeId] = cls.localNodeInfo
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId)
cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo
for _, sName := range cls.localNodeInfo.ServiceList {
if _, ok := cls.mapServiceNode[sName]; ok == false {
@@ -225,8 +229,7 @@ func (cls *Cluster) checkDiscoveryNodeList(discoverMasterNode []NodeInfo) bool {
func (cls *Cluster) InitCfg(localNodeId int) error {
cls.localServiceCfg = map[string]interface{}{}
cls.mapRpc = map[int]NodeRpcInfo{}
cls.mapIdNode = map[int]NodeInfo{}
cls.mapRpc = map[int]*NodeRpcInfo{}
cls.mapServiceNode = map[string]map[int]struct{}{}
//加载本地结点的NodeList配置
@@ -263,17 +266,24 @@ func (cls *Cluster) IsConfigService(serviceName string) bool {
return ok
}
func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.Client, bAll bool) (error, int) {
func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, int) {
cls.locker.RLock()
defer cls.locker.RUnlock()
mapNodeId, ok := cls.mapServiceNode[serviceName]
count := 0
if ok == true {
for nodeId, _ := range mapNodeId {
pClient := GetCluster().getRpcClient(nodeId)
if pClient == nil || (bAll == false && pClient.IsConnected() == false) {
pClient,retire := GetCluster().getRpcClient(nodeId)
if pClient == nil || pClient.IsConnected() == false {
continue
}
//如果需要筛选掉退休的对retire状态的结点略过
if filterRetire == true && retire == true {
continue
}
rpcClientList[count] = pClient
count++
if count >= cap(rpcClientList) {

View File

@@ -7,16 +7,15 @@ const (
ServiceRpcRequestEvent EventType = -1
ServiceRpcResponseEvent EventType = -2
Sys_Event_Tcp EventType = -3
Sys_Event_Http_Event EventType = -4
Sys_Event_WebSocket EventType = -5
Sys_Event_Node_Event EventType = -6
Sys_Event_DiscoverService EventType = -7
Sys_Event_DiscardGoroutine EventType = -8
Sys_Event_QueueTaskFinish EventType = -9
Sys_Event_User_Define EventType = 1
Sys_Event_Tcp EventType = -3
Sys_Event_Http_Event EventType = -4
Sys_Event_WebSocket EventType = -5
Sys_Event_Node_Event EventType = -6
Sys_Event_DiscoverService EventType = -7
Sys_Event_DiscardGoroutine EventType = -8
Sys_Event_QueueTaskFinish EventType = -9
Sys_Event_Retire EventType = -10
Sys_Event_User_Define EventType = 1
)

View File

@@ -28,6 +28,10 @@ var profilerInterval time.Duration
var bValid bool
var configDir = "./config/"
const(
SingleStop syscall.Signal = 10
SignalRetire syscall.Signal = 12
)
type BuildOSType = int8
@@ -38,13 +42,14 @@ const(
)
func init() {
sig = make(chan os.Signal, 3)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.Signal(10))
sig = make(chan os.Signal, 4)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, SingleStop,SignalRetire)
console.RegisterCommandBool("help", false, "<-help> This help.", usage)
console.RegisterCommandString("name", "", "<-name nodeName> Node's name.", setName)
console.RegisterCommandString("start", "", "<-start nodeid=nodeid> Run originserver.", startNode)
console.RegisterCommandString("stop", "", "<-stop nodeid=nodeid> Stop originserver process.", stopNode)
console.RegisterCommandString("retire", "", "<-retire nodeid=nodeid> retire originserver process.", retireNode)
console.RegisterCommandString("config", "", "<-config path> Configuration file path.", setConfigPath)
console.RegisterCommandString("console", "", "<-console true|false> Turn on or off screen log output.", openConsole)
console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|release|warning|error|fatal> Set loglevel.", setLevel)
@@ -54,6 +59,11 @@ func init() {
console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof)
}
func notifyAllServiceRetire(){
service.NotifyAllServiceRetire()
}
func usage(val interface{}) error {
ret := val.(bool)
if ret == false {
@@ -201,6 +211,36 @@ func Start() {
}
}
func retireNode(args interface{}) error {
//1.解析参数
param := args.(string)
if param == "" {
return nil
}
sParam := strings.Split(param, "=")
if len(sParam) != 2 {
return fmt.Errorf("invalid option %s", param)
}
if sParam[0] != "nodeid" {
return fmt.Errorf("invalid option %s", param)
}
nId, err := strconv.Atoi(sParam[1])
if err != nil {
return fmt.Errorf("invalid option %s", param)
}
processId, err := getRunProcessPid(nId)
if err != nil {
return err
}
RetireProcess(processId)
return nil
}
func stopNode(args interface{}) error {
//1.解析参数
param := args.(string)
@@ -215,12 +255,12 @@ func stopNode(args interface{}) error {
if sParam[0] != "nodeid" {
return fmt.Errorf("invalid option %s", param)
}
nodeId, err := strconv.Atoi(sParam[1])
nId, err := strconv.Atoi(sParam[1])
if err != nil {
return fmt.Errorf("invalid option %s", param)
}
processId, err := getRunProcessPid(nodeId)
processId, err := getRunProcessPid(nId)
if err != nil {
return err
}
@@ -268,15 +308,23 @@ func startNode(args interface{}) error {
if profilerInterval > 0 {
pProfilerTicker = time.NewTicker(profilerInterval)
}
for bRun {
select {
case <-sig:
log.Info("receipt stop signal.")
bRun = false
case s := <-sig:
signal := s.(syscall.Signal)
if signal == SignalRetire {
log.Info("receipt retire signal.")
notifyAllServiceRetire()
}else {
bRun = false
log.Info("receipt stop signal.")
}
case <-pProfilerTicker.C:
profiler.Report()
}
}
cluster.GetCluster().Stop()
//7.退出
service.StopAllService()

View File

@@ -8,7 +8,7 @@ import (
)
func KillProcess(processId int){
err := syscall.Kill(processId,syscall.Signal(10))
err := syscall.Kill(processId,SingleStop)
if err != nil {
fmt.Printf("kill processid %d is fail:%+v.\n",processId,err)
}else{
@@ -19,3 +19,12 @@ func KillProcess(processId int){
func GetBuildOSType() BuildOSType{
return Linux
}
func RetireProcess(processId int){
err := syscall.Kill(processId,SignalRetire)
if err != nil {
fmt.Printf("retire processid %d is fail:%+v.\n",processId,err)
}else{
fmt.Printf("retire processid %d is successful.\n",processId)
}
}

View File

@@ -8,7 +8,7 @@ import (
)
func KillProcess(processId int){
err := syscall.Kill(processId,syscall.Signal(10))
err := syscall.Kill(processId,SingleStop)
if err != nil {
fmt.Printf("kill processid %d is fail:%+v.\n",processId,err)
}else{
@@ -19,3 +19,12 @@ func KillProcess(processId int){
func GetBuildOSType() BuildOSType{
return Mac
}
func RetireProcess(processId int){
err := syscall.Kill(processId,SignalRetire)
if err != nil {
fmt.Printf("retire processid %d is fail:%+v.\n",processId,err)
}else{
fmt.Printf("retire processid %d is successful.\n",processId)
}
}

View File

@@ -2,10 +2,28 @@
package node
func KillProcess(processId int){
import (
"os"
"fmt"
)
func KillProcess(processId int){
procss,err := os.FindProcess(processId)
if err != nil {
fmt.Printf("kill processid %d is fail:%+v.\n",processId,err)
return
}
err = procss.Kill()
if err != nil {
fmt.Printf("kill processid %d is fail:%+v.\n",processId,err)
}
}
func GetBuildOSType() BuildOSType{
return Windows
}
func RetireProcess(processId int){
fmt.Printf("This command does not support Windows")
}

View File

@@ -1,8 +1,8 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.31.0
// protoc v3.11.4
// source: test/rpc/dynamicdiscover.proto
// protoc v4.24.0
// source: proto/rpcproto/dynamicdiscover.proto
package rpc
@@ -30,13 +30,14 @@ type NodeInfo struct {
ListenAddr string `protobuf:"bytes,3,opt,name=ListenAddr,proto3" json:"ListenAddr,omitempty"`
MaxRpcParamLen uint32 `protobuf:"varint,4,opt,name=MaxRpcParamLen,proto3" json:"MaxRpcParamLen,omitempty"`
Private bool `protobuf:"varint,5,opt,name=Private,proto3" json:"Private,omitempty"`
PublicServiceList []string `protobuf:"bytes,6,rep,name=PublicServiceList,proto3" json:"PublicServiceList,omitempty"`
Retire bool `protobuf:"varint,6,opt,name=Retire,proto3" json:"Retire,omitempty"`
PublicServiceList []string `protobuf:"bytes,7,rep,name=PublicServiceList,proto3" json:"PublicServiceList,omitempty"`
}
func (x *NodeInfo) Reset() {
*x = NodeInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[0]
mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -49,7 +50,7 @@ func (x *NodeInfo) String() string {
func (*NodeInfo) ProtoMessage() {}
func (x *NodeInfo) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[0]
mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -62,7 +63,7 @@ func (x *NodeInfo) ProtoReflect() protoreflect.Message {
// Deprecated: Use NodeInfo.ProtoReflect.Descriptor instead.
func (*NodeInfo) Descriptor() ([]byte, []int) {
return file_test_rpc_dynamicdiscover_proto_rawDescGZIP(), []int{0}
return file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP(), []int{0}
}
func (x *NodeInfo) GetNodeId() int32 {
@@ -100,6 +101,13 @@ func (x *NodeInfo) GetPrivate() bool {
return false
}
func (x *NodeInfo) GetRetire() bool {
if x != nil {
return x.Retire
}
return false
}
func (x *NodeInfo) GetPublicServiceList() []string {
if x != nil {
return x.PublicServiceList
@@ -119,7 +127,7 @@ type ServiceDiscoverReq struct {
func (x *ServiceDiscoverReq) Reset() {
*x = ServiceDiscoverReq{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[1]
mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -132,7 +140,7 @@ func (x *ServiceDiscoverReq) String() string {
func (*ServiceDiscoverReq) ProtoMessage() {}
func (x *ServiceDiscoverReq) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[1]
mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -145,7 +153,7 @@ func (x *ServiceDiscoverReq) ProtoReflect() protoreflect.Message {
// Deprecated: Use ServiceDiscoverReq.ProtoReflect.Descriptor instead.
func (*ServiceDiscoverReq) Descriptor() ([]byte, []int) {
return file_test_rpc_dynamicdiscover_proto_rawDescGZIP(), []int{1}
return file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP(), []int{1}
}
func (x *ServiceDiscoverReq) GetNodeInfo() *NodeInfo {
@@ -170,7 +178,7 @@ type SubscribeDiscoverNotify struct {
func (x *SubscribeDiscoverNotify) Reset() {
*x = SubscribeDiscoverNotify{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[2]
mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -183,7 +191,7 @@ func (x *SubscribeDiscoverNotify) String() string {
func (*SubscribeDiscoverNotify) ProtoMessage() {}
func (x *SubscribeDiscoverNotify) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[2]
mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -196,7 +204,7 @@ func (x *SubscribeDiscoverNotify) ProtoReflect() protoreflect.Message {
// Deprecated: Use SubscribeDiscoverNotify.ProtoReflect.Descriptor instead.
func (*SubscribeDiscoverNotify) Descriptor() ([]byte, []int) {
return file_test_rpc_dynamicdiscover_proto_rawDescGZIP(), []int{2}
return file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP(), []int{2}
}
func (x *SubscribeDiscoverNotify) GetMasterNodeId() int32 {
@@ -227,6 +235,54 @@ func (x *SubscribeDiscoverNotify) GetNodeInfo() []*NodeInfo {
return nil
}
// Client->Master
type NodeRetireReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
NodeInfo *NodeInfo `protobuf:"bytes,1,opt,name=nodeInfo,proto3" json:"nodeInfo,omitempty"`
}
func (x *NodeRetireReq) Reset() {
*x = NodeRetireReq{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *NodeRetireReq) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*NodeRetireReq) ProtoMessage() {}
func (x *NodeRetireReq) ProtoReflect() protoreflect.Message {
mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use NodeRetireReq.ProtoReflect.Descriptor instead.
func (*NodeRetireReq) Descriptor() ([]byte, []int) {
return file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP(), []int{3}
}
func (x *NodeRetireReq) GetNodeInfo() *NodeInfo {
if x != nil {
return x.NodeInfo
}
return nil
}
// Master->Client
type Empty struct {
state protoimpl.MessageState
@@ -237,7 +293,7 @@ type Empty struct {
func (x *Empty) Reset() {
*x = Empty{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[3]
mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -250,7 +306,7 @@ func (x *Empty) String() string {
func (*Empty) ProtoMessage() {}
func (x *Empty) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_dynamicdiscover_proto_msgTypes[3]
mi := &file_proto_rpcproto_dynamicdiscover_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -263,82 +319,89 @@ func (x *Empty) ProtoReflect() protoreflect.Message {
// Deprecated: Use Empty.ProtoReflect.Descriptor instead.
func (*Empty) Descriptor() ([]byte, []int) {
return file_test_rpc_dynamicdiscover_proto_rawDescGZIP(), []int{3}
return file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP(), []int{4}
}
var File_test_rpc_dynamicdiscover_proto protoreflect.FileDescriptor
var File_proto_rpcproto_dynamicdiscover_proto protoreflect.FileDescriptor
var file_test_rpc_dynamicdiscover_proto_rawDesc = []byte{
0x0a, 0x1e, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x79, 0x6e, 0x61, 0x6d,
0x69, 0x63, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x12, 0x03, 0x72, 0x70, 0x63, 0x22, 0xce, 0x01, 0x0a, 0x08, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e,
0x66, 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01,
0x28, 0x05, 0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x4e, 0x6f,
0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x4e, 0x6f,
0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e,
0x41, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x4c, 0x69, 0x73, 0x74,
0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x12, 0x26, 0x0a, 0x0e, 0x4d, 0x61, 0x78, 0x52, 0x70, 0x63,
0x50, 0x61, 0x72, 0x61, 0x6d, 0x4c, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0e,
0x4d, 0x61, 0x78, 0x52, 0x70, 0x63, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x4c, 0x65, 0x6e, 0x12, 0x18,
0x0a, 0x07, 0x50, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52,
0x07, 0x50, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x12, 0x2c, 0x0a, 0x11, 0x50, 0x75, 0x62, 0x6c,
0x69, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x18, 0x06, 0x20,
0x03, 0x28, 0x09, 0x52, 0x11, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69,
0x63, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x22, 0x3f, 0x0a, 0x12, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63,
0x65, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x12, 0x29, 0x0a, 0x08,
0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d,
0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e,
0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x9e, 0x01, 0x0a, 0x17, 0x53, 0x75, 0x62, 0x73,
0x63, 0x72, 0x69, 0x62, 0x65, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x4e, 0x6f, 0x74,
0x69, 0x66, 0x79, 0x12, 0x22, 0x0a, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x6f, 0x64,
0x65, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65,
0x72, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c,
0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x12,
0x1c, 0x0a, 0x09, 0x44, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01,
0x28, 0x05, 0x52, 0x09, 0x44, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x29, 0x0a,
0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32,
0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08,
0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
var file_proto_rpcproto_dynamicdiscover_proto_rawDesc = []byte{
0x0a, 0x24, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x72, 0x70, 0x63, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x2f, 0x64, 0x79, 0x6e, 0x61, 0x6d, 0x69, 0x63, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x72, 0x70, 0x63, 0x22, 0xe6, 0x01, 0x0a, 0x08,
0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65,
0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64,
0x12, 0x1a, 0x0a, 0x08, 0x4e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01,
0x28, 0x09, 0x52, 0x08, 0x4e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1e, 0x0a, 0x0a,
0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x12, 0x26, 0x0a, 0x0e,
0x4d, 0x61, 0x78, 0x52, 0x70, 0x63, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x4c, 0x65, 0x6e, 0x18, 0x04,
0x20, 0x01, 0x28, 0x0d, 0x52, 0x0e, 0x4d, 0x61, 0x78, 0x52, 0x70, 0x63, 0x50, 0x61, 0x72, 0x61,
0x6d, 0x4c, 0x65, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x50, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x18,
0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x50, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x12, 0x16,
0x0a, 0x06, 0x52, 0x65, 0x74, 0x69, 0x72, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06,
0x52, 0x65, 0x74, 0x69, 0x72, 0x65, 0x12, 0x2c, 0x0a, 0x11, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63,
0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x18, 0x07, 0x20, 0x03, 0x28,
0x09, 0x52, 0x11, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x4c, 0x69, 0x73, 0x74, 0x22, 0x3f, 0x0a, 0x12, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x44,
0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x12, 0x29, 0x0a, 0x08, 0x6e, 0x6f,
0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x72,
0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, 0x6f, 0x64,
0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x9e, 0x01, 0x0a, 0x17, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
0x69, 0x62, 0x65, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x4e, 0x6f, 0x74, 0x69, 0x66,
0x79, 0x12, 0x22, 0x0a, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x49,
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x4e,
0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x18,
0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x49, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x12, 0x1c, 0x0a,
0x09, 0x44, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05,
0x52, 0x09, 0x44, 0x65, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x08, 0x6e,
0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e,
0x72, 0x70, 0x63, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, 0x6f,
0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x3a, 0x0a, 0x0d, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65,
0x74, 0x69, 0x72, 0x65, 0x52, 0x65, 0x71, 0x12, 0x29, 0x0a, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49,
0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e,
0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e,
0x66, 0x6f, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x42, 0x07, 0x5a, 0x05, 0x2e,
0x3b, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_test_rpc_dynamicdiscover_proto_rawDescOnce sync.Once
file_test_rpc_dynamicdiscover_proto_rawDescData = file_test_rpc_dynamicdiscover_proto_rawDesc
file_proto_rpcproto_dynamicdiscover_proto_rawDescOnce sync.Once
file_proto_rpcproto_dynamicdiscover_proto_rawDescData = file_proto_rpcproto_dynamicdiscover_proto_rawDesc
)
func file_test_rpc_dynamicdiscover_proto_rawDescGZIP() []byte {
file_test_rpc_dynamicdiscover_proto_rawDescOnce.Do(func() {
file_test_rpc_dynamicdiscover_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_rpc_dynamicdiscover_proto_rawDescData)
func file_proto_rpcproto_dynamicdiscover_proto_rawDescGZIP() []byte {
file_proto_rpcproto_dynamicdiscover_proto_rawDescOnce.Do(func() {
file_proto_rpcproto_dynamicdiscover_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_rpcproto_dynamicdiscover_proto_rawDescData)
})
return file_test_rpc_dynamicdiscover_proto_rawDescData
return file_proto_rpcproto_dynamicdiscover_proto_rawDescData
}
var file_test_rpc_dynamicdiscover_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_test_rpc_dynamicdiscover_proto_goTypes = []interface{}{
var file_proto_rpcproto_dynamicdiscover_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_proto_rpcproto_dynamicdiscover_proto_goTypes = []interface{}{
(*NodeInfo)(nil), // 0: rpc.NodeInfo
(*ServiceDiscoverReq)(nil), // 1: rpc.ServiceDiscoverReq
(*SubscribeDiscoverNotify)(nil), // 2: rpc.SubscribeDiscoverNotify
(*Empty)(nil), // 3: rpc.Empty
(*NodeRetireReq)(nil), // 3: rpc.NodeRetireReq
(*Empty)(nil), // 4: rpc.Empty
}
var file_test_rpc_dynamicdiscover_proto_depIdxs = []int32{
var file_proto_rpcproto_dynamicdiscover_proto_depIdxs = []int32{
0, // 0: rpc.ServiceDiscoverReq.nodeInfo:type_name -> rpc.NodeInfo
0, // 1: rpc.SubscribeDiscoverNotify.nodeInfo:type_name -> rpc.NodeInfo
2, // [2:2] is the sub-list for method output_type
2, // [2:2] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
0, // 2: rpc.NodeRetireReq.nodeInfo:type_name -> rpc.NodeInfo
3, // [3:3] is the sub-list for method output_type
3, // [3:3] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
}
func init() { file_test_rpc_dynamicdiscover_proto_init() }
func file_test_rpc_dynamicdiscover_proto_init() {
if File_test_rpc_dynamicdiscover_proto != nil {
func init() { file_proto_rpcproto_dynamicdiscover_proto_init() }
func file_proto_rpcproto_dynamicdiscover_proto_init() {
if File_proto_rpcproto_dynamicdiscover_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_test_rpc_dynamicdiscover_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
file_proto_rpcproto_dynamicdiscover_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*NodeInfo); i {
case 0:
return &v.state
@@ -350,7 +413,7 @@ func file_test_rpc_dynamicdiscover_proto_init() {
return nil
}
}
file_test_rpc_dynamicdiscover_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
file_proto_rpcproto_dynamicdiscover_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ServiceDiscoverReq); i {
case 0:
return &v.state
@@ -362,7 +425,7 @@ func file_test_rpc_dynamicdiscover_proto_init() {
return nil
}
}
file_test_rpc_dynamicdiscover_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
file_proto_rpcproto_dynamicdiscover_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeDiscoverNotify); i {
case 0:
return &v.state
@@ -374,7 +437,19 @@ func file_test_rpc_dynamicdiscover_proto_init() {
return nil
}
}
file_test_rpc_dynamicdiscover_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
file_proto_rpcproto_dynamicdiscover_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*NodeRetireReq); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_rpcproto_dynamicdiscover_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Empty); i {
case 0:
return &v.state
@@ -391,18 +466,18 @@ func file_test_rpc_dynamicdiscover_proto_init() {
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_test_rpc_dynamicdiscover_proto_rawDesc,
RawDescriptor: file_proto_rpcproto_dynamicdiscover_proto_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumMessages: 5,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_test_rpc_dynamicdiscover_proto_goTypes,
DependencyIndexes: file_test_rpc_dynamicdiscover_proto_depIdxs,
MessageInfos: file_test_rpc_dynamicdiscover_proto_msgTypes,
GoTypes: file_proto_rpcproto_dynamicdiscover_proto_goTypes,
DependencyIndexes: file_proto_rpcproto_dynamicdiscover_proto_depIdxs,
MessageInfos: file_proto_rpcproto_dynamicdiscover_proto_msgTypes,
}.Build()
File_test_rpc_dynamicdiscover_proto = out.File
file_test_rpc_dynamicdiscover_proto_rawDesc = nil
file_test_rpc_dynamicdiscover_proto_goTypes = nil
file_test_rpc_dynamicdiscover_proto_depIdxs = nil
File_proto_rpcproto_dynamicdiscover_proto = out.File
file_proto_rpcproto_dynamicdiscover_proto_rawDesc = nil
file_proto_rpcproto_dynamicdiscover_proto_goTypes = nil
file_proto_rpcproto_dynamicdiscover_proto_depIdxs = nil
}

View File

@@ -8,7 +8,8 @@ message NodeInfo{
string ListenAddr = 3;
uint32 MaxRpcParamLen = 4;
bool Private = 5;
repeated string PublicServiceList = 6;
bool Retire = 6;
repeated string PublicServiceList = 7;
}
//Client->Master
@@ -24,6 +25,12 @@ message SubscribeDiscoverNotify{
repeated NodeInfo nodeInfo = 4;
}
//Client->Master
message NodeRetireReq{
NodeInfo nodeInfo = 1;
}
//Master->Client
message Empty{
}

View File

@@ -74,7 +74,7 @@ func (slf *PBProcessor) IsParse(param interface{}) bool {
}
func (slf *PBProcessor) GetProcessorType() RpcProcessorType{
return RpcProcessorGoGoPB
return RpcProcessorPB
}
func (slf *PBProcessor) Clone(src interface{}) (interface{},error){

View File

@@ -14,7 +14,7 @@ import (
const maxClusterNode int = 128
type FuncRpcClient func(nodeId int, serviceMethod string, client []*Client) (error, int)
type FuncRpcClient func(nodeId int, serviceMethod string,filterRetire bool, client []*Client) (error, int)
type FuncRpcServer func() *Server
@@ -73,7 +73,7 @@ type INodeListener interface {
type IDiscoveryServiceListener interface {
OnDiscoveryService(nodeId int, serviceName []string)
OnUnDiscoveryService(nodeId int, serviceName []string)
OnUnDiscoveryService(nodeId int)
}
type CancelRpc func()
@@ -428,7 +428,7 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param
func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int, serviceMethod string, args interface{}) error {
var pClientList [maxClusterNode]*Client
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
if count == 0 {
if err != nil {
log.Error("call serviceMethod is failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
@@ -458,7 +458,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int
func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, reply interface{}) error {
var pClientList [maxClusterNode]*Client
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
if err != nil {
log.Error("Call serviceMethod is failed",log.ErrorAttr("error",err))
return err
@@ -502,7 +502,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId int, servic
reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
var pClientList [2]*Client
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
if count == 0 || err != nil {
if err == nil {
if nodeId > 0 {
@@ -585,7 +585,7 @@ func (handler *RpcHandler) CastGo(serviceMethod string, args interface{}) error
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs []byte) error {
processor := GetProcessor(uint8(rpcProcessorType))
err, count := handler.funcRpcClient(nodeId, serviceName, handler.pClientList)
err, count := handler.funcRpcClient(nodeId, serviceName,false, handler.pClientList)
if count == 0 || err != nil {
log.Error("call serviceMethod is failed",log.ErrorAttr("error",err))
return err

View File

@@ -17,7 +17,7 @@ type RpcProcessorType uint8
const (
RpcProcessorJson RpcProcessorType = 0
RpcProcessorGoGoPB RpcProcessorType = 1
RpcProcessorPB RpcProcessorType = 1
)
var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &PBProcessor{}}

View File

@@ -28,6 +28,7 @@ type IService interface {
OnSetup(iService IService)
OnInit() error
OnStart()
OnRetire()
OnRelease()
SetName(serviceName string)
@@ -40,6 +41,9 @@ type IService interface {
SetEventChannelNum(num int)
OpenProfiler()
SetRetire() //设置服务退休状态
IsRetire() bool //服务是否退休
}
type Service struct {
@@ -51,6 +55,7 @@ type Service struct {
serviceCfg interface{}
goroutineNum int32
startStatus bool
retire int32
eventProcessor event.IEventProcessor
profiler *profiler.Profiler //性能分析器
nodeEventLister rpc.INodeListener
@@ -97,6 +102,19 @@ func (s *Service) OpenProfiler() {
}
}
func (s *Service) IsRetire() bool{
return atomic.LoadInt32(&s.retire) != 0
}
func (s *Service) SetRetire(){
atomic.StoreInt32(&s.retire,1)
ev := event.NewEvent()
ev.Type = event.Sys_Event_Retire
s.pushEvent(ev)
}
func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) {
s.closeSig = make(chan struct{})
s.dispatcher =timer.NewDispatcher(timerDispatcherLen)
@@ -155,6 +173,9 @@ func (s *Service) Run() {
concurrent.DoCallback(cb)
case ev := <- s.chanEvent:
switch ev.GetEventType() {
case event.Sys_Event_Retire:
log.Info("service OnRetire",log.String("servceName",s.GetName()))
s.self.(IService).OnRetire()
case event.ServiceRpcRequestEvent:
cEvent,ok := ev.(*event.Event)
if ok == false {
@@ -304,7 +325,7 @@ func (s *Service) OnDiscoverServiceEvent(ev event.IEvent){
if event.IsDiscovery {
s.discoveryServiceLister.OnDiscoveryService(event.NodeId,event.ServiceName)
}else{
s.discoveryServiceLister.OnUnDiscoveryService(event.NodeId,event.ServiceName)
s.discoveryServiceLister.OnUnDiscoveryService(event.NodeId)
}
}
@@ -387,3 +408,6 @@ func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
s.goroutineNum = goroutineNum
return true
}
func (s *Service) OnRetire(){
}

View File

@@ -60,3 +60,9 @@ func StopAllService(){
setupServiceList[i].Stop()
}
}
func NotifyAllServiceRetire(){
for i := len(setupServiceList) - 1; i >= 0; i-- {
setupServiceList[i].SetRetire()
}
}

View File

@@ -53,18 +53,9 @@ type Client struct {
}
func (tcpService *TcpService) genId() uint64 {
if node.GetNodeId()>MaxNodeId{
panic("nodeId exceeds the maximum!")
}
newSeed := atomic.AddUint32(&seed,1) % MaxSeed
nowTime := uint64(time.Now().Unix())%MaxTime
return (uint64(node.GetNodeId())<<50)|(nowTime<<19)|uint64(newSeed)
}
func GetNodeId(agentId uint64) int {
return int(agentId>>50)
return (uint64(node.GetNodeId()%MaxNodeId)<<50)|(nowTime<<19)|uint64(newSeed)
}
func (tcpService *TcpService) OnInit() error{

View File

@@ -1,6 +1,8 @@
package math
import "github.com/duanhf2012/origin/log"
import (
"github.com/duanhf2012/origin/log"
)
type NumberType interface {
int | int8 | int16 | int32 | int64 | float32 | float64 | uint | uint8 | uint16 | uint32 | uint64
@@ -38,41 +40,90 @@ func Abs[NumType SignedNumberType](Num NumType) NumType {
return Num
}
func Add[NumType NumberType](number1 NumType, number2 NumType) NumType {
func AddSafe[NumType NumberType](number1 NumType, number2 NumType) (NumType, bool) {
ret := number1 + number2
if number2> 0 && ret < number1 {
log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
}else if (number2<0 && ret > number1){
log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
if number2 > 0 && ret < number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false
} else if number2 < 0 && ret > number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false
}
return ret, true
}
func SubSafe[NumType NumberType](number1 NumType, number2 NumType) (NumType, bool) {
ret := number1 - number2
if number2 > 0 && ret > number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false
} else if number2 < 0 && ret < number1 {
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, false
}
return ret, true
}
func MulSafe[NumType NumberType](number1 NumType, number2 NumType) (NumType, bool) {
ret := number1 * number2
if number1 == 0 || number2 == 0 {
return ret, true
}
if ret/number2 == number1 {
return ret, true
}
log.Stack("Calculation overflow", log.Any("number1", number1), log.Any("number2", number2))
return ret, true
}
func Add[NumType NumberType](number1 NumType, number2 NumType) NumType {
ret, _ := AddSafe(number1, number2)
return ret
}
func Sub[NumType NumberType](number1 NumType, number2 NumType) NumType {
ret := number1 - number2
if number2> 0 && ret > number1 {
log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
}else if (number2<0 && ret < number1){
log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
}
ret, _ := SubSafe(number1, number2)
return ret
}
func Mul[NumType NumberType](number1 NumType, number2 NumType) NumType {
ret := number1 * number2
if number1 == 0 || number2 == 0 {
return ret
}
if ret / number2 == number1 {
return ret
}
log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
ret, _ := MulSafe(number1, number2)
return ret
}
// 安全的求比例
func PercentRateSafe[NumType NumberType, OutNumType NumberType](maxValue int64, rate NumType, numbers ...NumType) (OutNumType, bool) {
// 比例不能为负数
if rate < 0 {
log.Stack("rate must not positive")
return 0, false
}
if rate == 0 {
// 比例为0
return 0, true
}
ret := int64(rate)
for _, number := range numbers {
number64 := int64(number)
result, success := MulSafe(number64, ret)
if !success {
// 基数*比例越界了int64都越界了没办法了
return 0, false
}
ret = result
}
ret = ret / 10000
if ret > maxValue {
return 0, false
}
return OutNumType(ret), true
}