新增rpc调用数据传输数据最大长度参数配置

This commit is contained in:
orgin
2022-03-03 14:50:10 +08:00
parent b91f09b9ff
commit 4f42b768e8
7 changed files with 370 additions and 319 deletions

View File

@@ -14,51 +14,54 @@ var configDir = "./config/"
type SetupServiceFun func(s ...service.IService)
type NodeStatus int
const(
Normal NodeStatus = 0 //正常
Discard NodeStatus = 1 //丢弃
const (
Normal NodeStatus = 0 //正常
Discard NodeStatus = 1 //丢弃
)
type NodeInfo struct {
NodeId int
NodeName string
Private bool
ListenAddr string
ServiceList []string //所有的服务列表
NodeId int
NodeName string
Private bool
ListenAddr string
MaxRpcParamLen uint32 //最大Rpc参数长度
ServiceList []string //所有的服务列表
PublicServiceList []string //对外公开的服务列表
DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选
NeighborService []string
status NodeStatus
DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选
NeighborService []string
status NodeStatus
}
type NodeRpcInfo struct {
nodeInfo NodeInfo
client *rpc.Client
client *rpc.Client
}
var cluster Cluster
type Cluster struct {
localNodeInfo NodeInfo //本结点配置信息
masterDiscoveryNodeList []NodeInfo //配置发现Master结点
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
mapRpc map[int] NodeRpcInfo //nodeId
serviceDiscovery IServiceDiscovery //服务发现接口
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
mapRpc map[int]NodeRpcInfo //nodeId
serviceDiscovery IServiceDiscovery //服务发现接口
locker sync.RWMutex //结点与服务关系保护锁
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId]
locker sync.RWMutex //结点与服务关系保护锁
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId]
rpcServer rpc.Server
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
rpcServer rpc.Server
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
mapServiceListenRpcEvent map[string]struct{} //ServiceName
}
func GetCluster() *Cluster{
func GetCluster() *Cluster {
return &cluster
}
func SetConfigDir(cfgDir string){
func SetConfigDir(cfgDir string) {
configDir = cfgDir
}
@@ -67,37 +70,37 @@ func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) {
}
func (cls *Cluster) Start() {
cls.rpcServer.Start(cls.localNodeInfo.ListenAddr)
cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen)
}
func (cls *Cluster) Stop() {
cls.serviceDiscovery.OnNodeStop()
}
func (cls *Cluster) DiscardNode(nodeId int){
func (cls *Cluster) DiscardNode(nodeId int) {
cls.locker.Lock()
nodeInfo,ok := cls.mapIdNode[nodeId]
nodeInfo, ok := cls.mapIdNode[nodeId]
cls.locker.Unlock()
if ok==true && nodeInfo.status == Discard {
cls.DelNode(nodeId,true)
if ok == true && nodeInfo.status == Discard {
cls.DelNode(nodeId, true)
}
}
func (cls *Cluster) DelNode(nodeId int,immediately bool){
func (cls *Cluster) DelNode(nodeId int, immediately bool) {
//MasterDiscover结点与本地结点不删除
if cls.GetMasterDiscoveryNodeInfo(nodeId)!=nil || nodeId == cls.localNodeInfo.NodeId {
if cls.GetMasterDiscoveryNodeInfo(nodeId) != nil || nodeId == cls.localNodeInfo.NodeId {
return
}
cls.locker.Lock()
nodeInfo,ok := cls.mapIdNode[nodeId]
nodeInfo, ok := cls.mapIdNode[nodeId]
if ok == false {
cls.locker.Unlock()
return
}
rpc,ok := cls.mapRpc[nodeId]
for{
rpc, ok := cls.mapRpc[nodeId]
for {
//立即删除
if immediately || ok == false {
break
@@ -109,49 +112,48 @@ func (cls *Cluster) DelNode(nodeId int,immediately bool){
nodeInfo.status = Discard
rpc.client.Unlock()
cls.locker.Unlock()
log.SRelease("Discard node ",nodeInfo.NodeId," ",nodeInfo.ListenAddr)
log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr)
return
}
rpc.client.Unlock()
break
}
for _,serviceName := range nodeInfo.ServiceList{
cls.delServiceNode(serviceName,nodeId)
for _, serviceName := range nodeInfo.ServiceList {
cls.delServiceNode(serviceName, nodeId)
}
delete(cls.mapIdNode,nodeId)
delete(cls.mapRpc,nodeId)
delete(cls.mapIdNode, nodeId)
delete(cls.mapRpc, nodeId)
cls.locker.Unlock()
if ok == true {
rpc.client.Close(false)
}
log.SRelease("remove node ",nodeInfo.NodeId," ",nodeInfo.ListenAddr)
log.SRelease("remove node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr)
}
func (cls *Cluster) serviceDiscoveryDelNode (nodeId int,immediately bool){
func (cls *Cluster) serviceDiscoveryDelNode(nodeId int, immediately bool) {
if nodeId == 0 {
return
}
cls.DelNode(nodeId,immediately)
cls.DelNode(nodeId, immediately)
}
func (cls *Cluster) delServiceNode(serviceName string,nodeId int){
if nodeId == cls.localNodeInfo.NodeId{
func (cls *Cluster) delServiceNode(serviceName string, nodeId int) {
if nodeId == cls.localNodeInfo.NodeId {
return
}
mapNode := cls.mapServiceNode[serviceName]
delete(mapNode,nodeId)
if len(mapNode)==0 {
delete(cls.mapServiceNode,serviceName)
delete(mapNode, nodeId)
if len(mapNode) == 0 {
delete(cls.mapServiceNode, serviceName)
}
}
func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){
func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
//本地结点不加入
if nodeInfo.NodeId == cls.localNodeInfo.NodeId {
return
@@ -161,55 +163,54 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){
defer cls.locker.Unlock()
//先清一次的NodeId对应的所有服务清理
lastNodeInfo,ok := cls.mapIdNode[nodeInfo.NodeId]
if ok == true{
for _,serviceName := range lastNodeInfo.ServiceList{
cls.delServiceNode(serviceName,nodeInfo.NodeId)
lastNodeInfo, ok := cls.mapIdNode[nodeInfo.NodeId]
if ok == true {
for _, serviceName := range lastNodeInfo.ServiceList {
cls.delServiceNode(serviceName, nodeInfo.NodeId)
}
}
//再重新组装
mapDuplicate := map[string]interface{}{} //预防重复数据
for _,serviceName := range nodeInfo.PublicServiceList {
if _,ok := mapDuplicate[serviceName];ok == true {
for _, serviceName := range nodeInfo.PublicServiceList {
if _, ok := mapDuplicate[serviceName]; ok == true {
//存在重复
log.SError("Bad duplicate Service Cfg.")
continue
}
mapDuplicate[serviceName] = nil
if _,ok:=cls.mapServiceNode[serviceName];ok==false {
cls.mapServiceNode[serviceName] = make(map[int]struct{},1)
if _, ok := cls.mapServiceNode[serviceName]; ok == false {
cls.mapServiceNode[serviceName] = make(map[int]struct{}, 1)
}
cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{}
}
cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo
log.SRelease("Discovery nodeId: ",nodeInfo.NodeId," services:",nodeInfo.PublicServiceList)
log.SRelease("Discovery nodeId: ", nodeInfo.NodeId, " services:", nodeInfo.PublicServiceList)
//已经存在连接,则不需要进行设置
if _,rpcInfoOK := cls.mapRpc[nodeInfo.NodeId];rpcInfoOK == true {
if _, rpcInfoOK := cls.mapRpc[nodeInfo.NodeId]; rpcInfoOK == true {
return
}
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = *nodeInfo
rpcInfo.client = &rpc.Client{}
rpcInfo.client.TriggerRpcEvent = cls.triggerRpcEvent
rpcInfo.client.Connect(nodeInfo.NodeId,nodeInfo.ListenAddr)
rpcInfo.client.Connect(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen)
cls.mapRpc[nodeInfo.NodeId] = rpcInfo
}
func (cls *Cluster) buildLocalRpc(){
func (cls *Cluster) buildLocalRpc() {
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = &rpc.Client{}
rpcInfo.client.Connect(rpcInfo.nodeInfo.NodeId,"")
rpcInfo.client.Connect(rpcInfo.nodeInfo.NodeId, "", 0)
cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo
}
func (cls *Cluster) Init(localNodeId int,setupServiceFun SetupServiceFun) error{
func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error {
//1.初始化配置
err := cls.InitCfg(localNodeId)
if err != nil {
@@ -220,10 +221,10 @@ func (cls *Cluster) Init(localNodeId int,setupServiceFun SetupServiceFun) error{
cls.buildLocalRpc()
//2.安装服务发现结点
cls.SetupServiceDiscovery(localNodeId,setupServiceFun)
cls.SetupServiceDiscovery(localNodeId, setupServiceFun)
service.RegRpcEventFun = cls.RegRpcEvent
err = cls.serviceDiscovery.InitDiscovery(localNodeId,cls.serviceDiscoveryDelNode,cls.serviceDiscoverySetNodeInfo)
err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo)
if err != nil {
return err
}
@@ -231,12 +232,12 @@ func (cls *Cluster) Init(localNodeId int,setupServiceFun SetupServiceFun) error{
return nil
}
func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool,bool){
var localMaster bool //本结点是否为Master结点
var hasMaster bool //是否配置Master服务
func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool, bool) {
var localMaster bool //本结点是否为Master结点
var hasMaster bool //是否配置Master服务
//遍历所有结点
for _,nodeInfo := range cls.masterDiscoveryNodeList{
for _, nodeInfo := range cls.masterDiscoveryNodeList {
if nodeInfo.NodeId == localNodeId {
localMaster = true
}
@@ -244,27 +245,27 @@ func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool,bool){
}
//返回查询结果
return localMaster,hasMaster
return localMaster, hasMaster
}
func (cls *Cluster) appendService(serviceName string,bPublicService bool){
cls.localNodeInfo.ServiceList = append(cls.localNodeInfo.ServiceList,serviceName)
func (cls *Cluster) appendService(serviceName string, bPublicService bool) {
cls.localNodeInfo.ServiceList = append(cls.localNodeInfo.ServiceList, serviceName)
if bPublicService {
cls.localNodeInfo.PublicServiceList = append(cls.localNodeInfo.PublicServiceList,serviceName)
cls.localNodeInfo.PublicServiceList = append(cls.localNodeInfo.PublicServiceList, serviceName)
}
if _,ok:=cls.mapServiceNode[serviceName];ok==false {
if _, ok := cls.mapServiceNode[serviceName]; ok == false {
cls.mapServiceNode[serviceName] = map[int]struct{}{}
}
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId]= struct{}{}
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
}
func (cls *Cluster) GetDiscoveryNodeList() []NodeInfo{
func (cls *Cluster) GetDiscoveryNodeList() []NodeInfo {
return cls.masterDiscoveryNodeList
}
func (cls *Cluster) GetMasterDiscoveryNodeInfo(nodeId int) *NodeInfo{
for i:=0;i<len(cls.masterDiscoveryNodeList);i++{
func (cls *Cluster) GetMasterDiscoveryNodeInfo(nodeId int) *NodeInfo {
for i := 0; i < len(cls.masterDiscoveryNodeList); i++ {
if cls.masterDiscoveryNodeList[i].NodeId == nodeId {
return &cls.masterDiscoveryNodeList[i]
}
@@ -273,29 +274,29 @@ func (cls *Cluster) GetMasterDiscoveryNodeInfo(nodeId int) *NodeInfo{
return nil
}
func (cls *Cluster) IsMasterDiscoveryNode() bool{
return cls.GetMasterDiscoveryNodeInfo(cls.GetLocalNodeInfo().NodeId)!=nil
func (cls *Cluster) IsMasterDiscoveryNode() bool {
return cls.GetMasterDiscoveryNodeInfo(cls.GetLocalNodeInfo().NodeId) != nil
}
func (cls *Cluster) SetupServiceDiscovery(localNodeId int,setupServiceFun SetupServiceFun) {
if cls.serviceDiscovery!=nil {
func (cls *Cluster) SetupServiceDiscovery(localNodeId int, setupServiceFun SetupServiceFun) {
if cls.serviceDiscovery != nil {
return
}
//1.如果没有配置DiscoveryNode配置则使用默认配置文件发现服务
localMaster,hasMaster := cls.checkDynamicDiscovery(localNodeId)
localMaster, hasMaster := cls.checkDynamicDiscovery(localNodeId)
if hasMaster == false {
cls.serviceDiscovery = &ConfigDiscovery{}
return
}
setupServiceFun(&masterService,&clientService)
setupServiceFun(&masterService, &clientService)
//2.如果为动态服务发现安装本地发现服务
cls.serviceDiscovery = getDynamicDiscovery()
if localMaster == true {
cls.appendService(DynamicDiscoveryMasterName,false)
cls.appendService(DynamicDiscoveryMasterName, false)
}
cls.appendService(DynamicDiscoveryClientName,true)
cls.appendService(DynamicDiscoveryClientName, true)
}
@@ -309,7 +310,7 @@ func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler {
}
func (cls *Cluster) getRpcClient(nodeId int) *rpc.Client {
c,ok := cls.mapRpc[nodeId]
c, ok := cls.mapRpc[nodeId]
if ok == false {
return nil
}
@@ -323,50 +324,49 @@ func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client {
return cls.getRpcClient(nodeId)
}
func GetRpcClient(nodeId int,serviceMethod string,clientList []*rpc.Client) (error,int) {
if nodeId>0 {
func GetRpcClient(nodeId int, serviceMethod string, clientList []*rpc.Client) (error, int) {
if nodeId > 0 {
pClient := GetCluster().GetRpcClient(nodeId)
if pClient==nil {
return fmt.Errorf("cannot find nodeid %d!",nodeId),0
if pClient == nil {
return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
}
clientList[0] = pClient
return nil,1
return nil, 1
}
findIndex := strings.Index(serviceMethod,".")
if findIndex==-1 {
return fmt.Errorf("servicemethod param %s is error!",serviceMethod),0
findIndex := strings.Index(serviceMethod, ".")
if findIndex == -1 {
return fmt.Errorf("servicemethod param %s is error!", serviceMethod), 0
}
serviceName := serviceMethod[:findIndex]
//1.找到对应的rpcNodeid
return GetCluster().GetNodeIdByService(serviceName,clientList,true)
return GetCluster().GetNodeIdByService(serviceName, clientList, true)
}
func GetRpcServer() *rpc.Server{
func GetRpcServer() *rpc.Server {
return &cluster.rpcServer
}
func (cls *Cluster) IsNodeConnected (nodeId int) bool {
func (cls *Cluster) IsNodeConnected(nodeId int) bool {
pClient := cls.GetRpcClient(nodeId)
return pClient!=nil && pClient.IsConnected()
return pClient != nil && pClient.IsConnected()
}
func (cls *Cluster) triggerRpcEvent (bConnect bool,clientSeq uint32,nodeId int) {
func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int) {
cls.locker.Lock()
nodeInfo,ok := cls.mapRpc[nodeId]
if ok == false || nodeInfo.client==nil || nodeInfo.client.GetClientSeq()!=clientSeq {
nodeInfo, ok := cls.mapRpc[nodeId]
if ok == false || nodeInfo.client == nil || nodeInfo.client.GetClientSeq() != clientSeq {
cls.locker.Unlock()
return
}
cls.locker.Unlock()
cls.rpcEventLocker.Lock()
for serviceName,_:= range cls.mapServiceListenRpcEvent{
for serviceName, _ := range cls.mapServiceListenRpcEvent {
ser := service.GetService(serviceName)
if ser == nil {
log.SError("cannot find service name ",serviceName)
log.SError("cannot find service name ", serviceName)
continue
}
@@ -382,7 +382,7 @@ func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
return &cls.localNodeInfo
}
func (cls *Cluster) RegRpcEvent(serviceName string){
func (cls *Cluster) RegRpcEvent(serviceName string) {
cls.rpcEventLocker.Lock()
if cls.mapServiceListenRpcEvent == nil {
cls.mapServiceListenRpcEvent = map[string]struct{}{}
@@ -392,27 +392,27 @@ func (cls *Cluster) RegRpcEvent(serviceName string){
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) UnRegRpcEvent(serviceName string){
func (cls *Cluster) UnRegRpcEvent(serviceName string) {
cls.rpcEventLocker.Lock()
delete(cls.mapServiceListenRpcEvent,serviceName)
delete(cls.mapServiceListenRpcEvent, serviceName)
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)){
func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)) {
cls.locker.Lock()
for nodeId,_:= range cls.mapIdNode {
for nodeId, _ := range cls.mapIdNode {
fetchFun(nodeId)
}
cls.locker.Unlock()
}
func HasService(nodeId int,serviceName string) bool{
func HasService(nodeId int, serviceName string) bool {
cluster.locker.RLock()
defer cluster.locker.RUnlock()
mapNode,_ := cluster.mapServiceNode[serviceName]
if mapNode!=nil {
_,ok := mapNode[nodeId]
mapNode, _ := cluster.mapServiceNode[serviceName]
if mapNode != nil {
_, ok := mapNode[nodeId]
return ok
}

View File

@@ -78,7 +78,8 @@ func (ds *DynamicDiscoveryMaster) OnStart() {
nodeInfo.NodeName = localNodeInfo.NodeName
nodeInfo.ListenAddr = localNodeInfo.ListenAddr
nodeInfo.PublicServiceList = localNodeInfo.PublicServiceList
nodeInfo.MaxRpcParamLen = localNodeInfo.MaxRpcParamLen
ds.addNodeInfo(&nodeInfo)
}
@@ -144,7 +145,7 @@ func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscove
nodeInfo.ServiceList = req.NodeInfo.PublicServiceList
nodeInfo.PublicServiceList = req.NodeInfo.PublicServiceList
nodeInfo.ListenAddr = req.NodeInfo.ListenAddr
nodeInfo.MaxRpcParamLen = req.NodeInfo.MaxRpcParamLen
//主动删除已经存在的结点,确保先断开,再连接
cluster.serviceDiscoveryDelNode(nodeInfo.NodeId, true)
@@ -264,6 +265,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
nInfo.NodeId = nodeInfo.NodeId
nInfo.NodeName = nodeInfo.NodeName
nInfo.ListenAddr = nodeInfo.ListenAddr
nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
mapNodeInfo[nodeInfo.NodeId] = nInfo
}
@@ -324,6 +326,7 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
req.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId)
req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName
req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
req.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen
//MasterDiscoveryNode配置中没有配置NeighborService则同步当前结点所有服务
if len(nodeInfo.NeighborService) == 0 {
@@ -335,12 +338,12 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
//向Master服务同步本Node服务信息
err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) {
if err != nil {
log.SError("call ",RegServiceDiscover," is fail :", err.Error())
log.SError("call ", RegServiceDiscover, " is fail :", err.Error())
return
}
})
if err != nil {
log.SError("call ",RegServiceDiscover," is fail :", err.Error())
log.SError("call ", RegServiceDiscover, " is fail :", err.Error())
}
}
@@ -373,6 +376,7 @@ func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo) {
nInfo.NodeId = int(nodeInfo.NodeId)
nInfo.NodeName = nodeInfo.NodeName
nInfo.ListenAddr = nodeInfo.ListenAddr
nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
dc.funSetService(&nInfo)
}