1.支持动态服务发现功能

2.Service中支持对RPC结点连接或断开事件监听
This commit is contained in:
boyce
2021-04-29 17:18:13 +08:00
parent 63c2ac4c98
commit a60ad1cccf
16 changed files with 1797 additions and 158 deletions

View File

@@ -11,13 +11,16 @@ import (
var configDir = "./config/"
type SetupServiceFun func(s ...service.IService)
type NodeInfo struct {
NodeId int
NodeName string
Private bool
ListenAddr string
ServiceList []string
ServiceList []string //所有的服务列表
PublicServiceList []string //对外公开的服务列表
DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选
}
type NodeRpcInfo struct {
@@ -27,16 +30,24 @@ type NodeRpcInfo struct {
var cluster Cluster
type Cluster struct {
localNodeInfo NodeInfo
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
mapRpc map[int] NodeRpcInfo //nodeId
serviceDiscovery IServiceDiscovery //服务发现接口
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string][]int //map[serviceName]NodeInfo
locker sync.RWMutex
rpcServer rpc.Server
localNodeInfo NodeInfo //本结点配置信息
discoveryNodeList []NodeInfo //配置发现Master结点
rpcListerList []rpc.IRpcListener
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
mapRpc map[int] NodeRpcInfo //nodeId
serviceDiscovery IServiceDiscovery //服务发现接口
locker sync.RWMutex //结点与服务关系保护锁
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId]
rpcServer rpc.Server
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
mapServiceListenRpcEvent map[string]struct{} //ServiceName
}
func GetCluster() *Cluster{
return &cluster
}
func SetConfigDir(cfgDir string){
@@ -47,23 +58,19 @@ func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) {
cluster.serviceDiscovery = serviceDiscovery
}
func (cls *Cluster) serviceDiscoveryDelNode (nodeId int){
cls.locker.Lock()
defer cls.locker.Unlock()
cls.delNode(nodeId)
func (cls *Cluster) Start() {
cls.rpcServer.Start(cls.localNodeInfo.ListenAddr)
}
func (cls *Cluster) delNode(nodeId int){
//删除rpc连接关系
rpc,ok := cls.mapRpc[nodeId]
if ok == true {
delete(cls.mapRpc,nodeId)
rpc.client.Close(false)
}
func (cls *Cluster) Stop() {
cls.serviceDiscovery.OnNodeStop()
}
func (cls *Cluster) DelNode(nodeId int){
cls.locker.Lock()
nodeInfo,ok := cls.mapIdNode[nodeId]
if ok == false {
cls.locker.Unlock()
return
}
@@ -71,43 +78,73 @@ func (cls *Cluster) delNode(nodeId int){
cls.delServiceNode(serviceName,nodeId)
}
rpc,ok := cls.mapRpc[nodeId]
delete(cls.mapIdNode,nodeId)
delete(cls.mapRpc,nodeId)
cls.locker.Unlock()
if ok == true {
rpc.client.Close(false)
}
}
func (cls *Cluster) serviceDiscoveryDelNode (nodeId int){
if nodeId == 0 {
return
}
cls.DelNode(nodeId)
}
func (cls *Cluster) delServiceNode(serviceName string,nodeId int){
nodeList := cls.mapServiceNode[serviceName]
for idx,nId := range nodeList {
if nId == nodeId {
cls.mapServiceNode[serviceName] = append(nodeList[:idx],nodeList[idx+1:]...)
return
}
if nodeId == cls.localNodeInfo.NodeId{
return
}
mapNode := cls.mapServiceNode[serviceName]
delete(mapNode,nodeId)
if len(mapNode)==0 {
delete(cls.mapServiceNode,serviceName)
}
}
func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){
if nodeInfo.NodeId == cls.localNodeInfo.NodeId || len(nodeInfo.ServiceList)==0 || nodeInfo.Private == true {
//本地结点不加入
if nodeInfo.NodeId == cls.localNodeInfo.NodeId {
return
}
cls.locker.Lock()
defer cls.locker.Unlock()
//先清理删除
cls.delNode(nodeInfo.NodeId)
//先清一次的NodeId对应的所有服务清
lastNodeInfo,ok := cls.mapIdNode[nodeInfo.NodeId]
if ok == true{
for _,serviceName := range lastNodeInfo.ServiceList{
cls.delServiceNode(serviceName,nodeInfo.NodeId)
}
}
//再重新组装
mapDuplicate := map[string]interface{}{} //预防重复数据
for _,serviceName := range nodeInfo.ServiceList {
for _,serviceName := range nodeInfo.PublicServiceList {
if _,ok := mapDuplicate[serviceName];ok == true {
//存在重复
log.Error("Bad duplicate Service Cfg.")
continue
}
mapDuplicate[serviceName] = nil
cls.mapServiceNode[serviceName] = append(cls.mapServiceNode[serviceName],nodeInfo.NodeId)
if _,ok:=cls.mapServiceNode[serviceName];ok==false {
cls.mapServiceNode[serviceName] = make(map[int]struct{},1)
}
cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{}
}
cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo
//已经存在连接,则不需要进行设置
if _,rpcInfoOK := cls.mapRpc[nodeInfo.NodeId];rpcInfoOK == true {
return
}
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = *nodeInfo
rpcInfo.client = &rpc.Client{}
@@ -125,29 +162,21 @@ func (cls *Cluster) buildLocalRpc(){
cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo
}
func (cls *Cluster) Init(localNodeId int) error{
cls.locker.Lock()
//1.处理服务发现接口
if cls.serviceDiscovery == nil {
cls.serviceDiscovery = &ConfigDiscovery{}
}
//2.初始化配置
func (cls *Cluster) Init(localNodeId int,setupServiceFun SetupServiceFun) error{
//1.初始化配置
err := cls.InitCfg(localNodeId)
if err != nil {
cls.locker.Unlock()
return err
}
cls.rpcServer.Init(cls)
cls.buildLocalRpc()
cls.serviceDiscovery.RegFunDelNode(cls.serviceDiscoveryDelNode)
cls.serviceDiscovery.RegFunSetNode(cls.serviceDiscoverySetNodeInfo)
cls.locker.Unlock()
//2.安装服务发现结点
cls.SetupServiceDiscovery(localNodeId,setupServiceFun)
service.RegRpcEventFun = cls.RegRpcEvent
err = cls.serviceDiscovery.Init(localNodeId)
err = cls.serviceDiscovery.InitDiscovery(localNodeId,cls.serviceDiscoveryDelNode,cls.serviceDiscoverySetNodeInfo)
if err != nil {
return err
}
@@ -155,6 +184,56 @@ func (cls *Cluster) Init(localNodeId int) error{
return nil
}
func (cls *Cluster) checkDynamicDiscovery(localNodeId int) (bool,bool){
var localMaster bool //本结点是否为Master结点
var hasMaster bool //是否配置Master服务
//遍历所有结点
for _,nodeInfo := range cls.discoveryNodeList{
if nodeInfo.NodeId == localNodeId {
localMaster = true
}
hasMaster = true
}
//返回查询结果
return localMaster,hasMaster
}
func (cls *Cluster) appendService(serviceName string){
cls.localNodeInfo.ServiceList = append(cls.localNodeInfo.ServiceList,serviceName)
if _,ok:=cls.mapServiceNode[serviceName];ok==false {
cls.mapServiceNode[serviceName] = map[int]struct{}{}
}
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId]= struct{}{}
}
func (cls *Cluster) GetDiscoveryNodeList() []NodeInfo{
return cls.discoveryNodeList
}
func (cls *Cluster) SetupServiceDiscovery(localNodeId int,setupServiceFun SetupServiceFun) {
if cls.serviceDiscovery!=nil {
return
}
//1.如果没有配置DiscoveryNode配置则使用默认配置文件发现服务
localMaster,hasMaster := cls.checkDynamicDiscovery(localNodeId)
if hasMaster == false {
cls.serviceDiscovery = &ConfigDiscovery{}
return
}
setupServiceFun(&masterService,&clientService)
//2.如果为动态服务发现安装本地发现服务
cls.serviceDiscovery = getDynamicDiscovery()
if localMaster == true {
cls.appendService(DynamicDiscoveryMasterName)
}
cls.appendService(DynamicDiscoveryClientName)
}
func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler {
pService := service.GetService(serviceName)
if pService == nil {
@@ -164,21 +243,7 @@ func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler {
return pService.GetRpcHandler()
}
func (cls *Cluster) Start() {
cls.rpcServer.Start(cls.localNodeInfo.ListenAddr)
}
func (cls *Cluster) Stop() {
cls.serviceDiscovery.OnNodeStop()
}
func GetCluster() *Cluster{
return &cluster
}
func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client {
cls.locker.RLock()
defer cls.locker.RUnlock()
func (cls *Cluster) getRpcClient(nodeId int) *rpc.Client {
c,ok := cls.mapRpc[nodeId]
if ok == false {
return nil
@@ -187,6 +252,12 @@ func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client {
return c.client
}
func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client {
cls.locker.RLock()
defer cls.locker.RUnlock()
return cls.getRpcClient(nodeId)
}
func GetRpcClient(nodeId int,serviceMethod string,clientList []*rpc.Client) (error,int) {
if nodeId>0 {
pClient := GetCluster().GetRpcClient(nodeId)
@@ -217,21 +288,55 @@ func (cls *Cluster) IsNodeConnected (nodeId int) bool {
return pClient!=nil && pClient.IsConnected()
}
func (cls *Cluster) RegisterRpcListener (rpcLister rpc.IRpcListener) {
cls.rpcListerList = append(cls.rpcListerList,rpcLister)
}
func (cls *Cluster) triggerRpcEvent (bConnect bool,nodeId int) {
for _,lister := range cls.rpcListerList {
if bConnect {
lister.OnRpcConnected(nodeId)
}else{
lister.OnRpcDisconnect(nodeId)
}
func (cls *Cluster) triggerRpcEvent (bConnect bool,clientSeq uint32,nodeId int) {
cls.locker.Lock()
nodeInfo,ok := cls.mapRpc[nodeId]
if ok == false || nodeInfo.client==nil || nodeInfo.client.GetClientSeq()!=clientSeq {
cls.locker.Unlock()
return
}
cls.locker.Unlock()
cls.rpcEventLocker.Lock()
for serviceName,_:= range cls.mapServiceListenRpcEvent{
ser := service.GetService(serviceName)
if ser == nil {
log.Error("cannot find service name %s",serviceName)
continue
}
var eventData service.RpcEventData
eventData.IsConnect = bConnect
eventData.NodeId = nodeId
ser.(service.IModule).NotifyEvent(&eventData)
}
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
return &cls.localNodeInfo
}
func (cls *Cluster) RegRpcEvent(serviceName string){
cls.rpcEventLocker.Lock()
if cls.mapServiceListenRpcEvent == nil {
cls.mapServiceListenRpcEvent = map[string]struct{}{}
}
cls.mapServiceListenRpcEvent[serviceName] = struct{}{}
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) UnRegRpcEvent(serviceName string){
cls.rpcEventLocker.Lock()
delete(cls.mapServiceListenRpcEvent,serviceName)
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)){
cls.locker.Lock()
for nodeId,_:= range cls.mapIdNode {
fetchFun(nodeId)
}
cls.locker.Unlock()
}

View File

@@ -1,29 +1,19 @@
package cluster
import "strings"
type ConfigDiscovery struct {
funDelService FunDelNode
funSetService FunSetNodeInfo
localNodeId int
}
func (discovery *ConfigDiscovery) privateService(nodeInfo *NodeInfo){
var serviceList []string
for _,s := range nodeInfo.ServiceList {
if strings.HasPrefix(s,"_") {
continue
}
serviceList = append(serviceList,s)
}
nodeInfo.ServiceList = serviceList
}
func (discovery *ConfigDiscovery) Init(localNodeId int) error{
func (discovery *ConfigDiscovery) InitDiscovery(localNodeId int,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error{
discovery.localNodeId = localNodeId
discovery.funDelService = funDelNode
discovery.funSetService = funSetNodeInfo
//解析本地其他服务配置
nodeInfoList,err := GetCluster().readLocalClusterConfig(0)
_,nodeInfoList,err := GetCluster().readLocalClusterConfig(0)
if err != nil {
return err
}
@@ -32,8 +22,7 @@ func (discovery *ConfigDiscovery) Init(localNodeId int) error{
if nodeInfo.NodeId == localNodeId {
continue
}
//去除私有服务
discovery.privateService(&nodeInfo)
discovery.funSetService(&nodeInfo)
}
@@ -43,10 +32,3 @@ func (discovery *ConfigDiscovery) Init(localNodeId int) error{
func (discovery *ConfigDiscovery) OnNodeStop(){
}
func (discovery *ConfigDiscovery) RegFunDelNode(funDelNode FunDelNode){
discovery.funDelService = funDelNode
}
func (discovery *ConfigDiscovery) RegFunSetNode(funSetNodeInfo FunSetNodeInfo){
discovery.funSetService = funSetNodeInfo
}

269
cluster/dynamicdiscovery.go Normal file
View File

@@ -0,0 +1,269 @@
package cluster
import (
"fmt"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/service"
"time"
)
const maxTryCount = 30 //最大重试次数
const perTrySecond = 2*time.Second //每次重试间隔2秒
const DynamicDiscoveryMasterName = "DiscoveryMaster"
const DynamicDiscoveryClientName = "DiscoveryClient"
const DynamicDiscoveryMasterNameRpcMethod = DynamicDiscoveryMasterName+".RPC_RegServiceDiscover"
const DynamicDiscoveryClientNameRpcMethod = DynamicDiscoveryClientName+".RPC_SubServiceDiscover"
type DynamicDiscoveryMaster struct {
service.Service
mapNodeInfo map[int32] *rpc.NodeInfo
nodeInfo []*rpc.NodeInfo
}
type DynamicDiscoveryClient struct {
service.Service
funDelService FunDelNode
funSetService FunSetNodeInfo
localNodeId int
}
var masterService DynamicDiscoveryMaster
var clientService DynamicDiscoveryClient
func getDynamicDiscovery() IServiceDiscovery{
return &clientService
}
func init(){
masterService.SetName(DynamicDiscoveryMasterName)
clientService.SetName(DynamicDiscoveryClientName)
}
func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo){
_,ok := ds.mapNodeInfo[nodeInfo.NodeId]
if ok == true {
return
}
ds.nodeInfo = append(ds.nodeInfo,nodeInfo)
}
func (ds *DynamicDiscoveryMaster) OnInit() error{
ds.mapNodeInfo = make(map[int32] *rpc.NodeInfo,20)
ds.RegisterRpcListener(ds)
return nil
}
func (ds *DynamicDiscoveryMaster) OnStart(){
var nodeInfo rpc.NodeInfo
localNodeInfo := cluster.GetLocalNodeInfo()
if localNodeInfo.Private == true {
return
}
nodeInfo.NodeId = int32(localNodeInfo.NodeId)
nodeInfo.NodeName = localNodeInfo.NodeName
nodeInfo.ListenAddr = localNodeInfo.ListenAddr
nodeInfo.PublicServiceList = localNodeInfo.PublicServiceList
ds.addNodeInfo(&nodeInfo)
}
func (ds *DynamicDiscoveryMaster) OnRpcConnected(nodeId int){
//向它发布所有服务列表信息
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.IsFull = true
notifyDiscover.NodeInfo = ds.nodeInfo
ds.GoNode(nodeId,DynamicDiscoveryClientNameRpcMethod,&notifyDiscover)
}
func (ds *DynamicDiscoveryMaster) OnRpcDisconnect(nodeId int){
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.DelNodeId = int32(nodeId)
//删除结点
cluster.DelNode(nodeId)
ds.CastGo(DynamicDiscoveryClientNameRpcMethod,&notifyDiscover)
}
// 收到注册过来的结点
func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.ServiceDiscoverRes) error{
if req.NodeInfo == nil {
err := fmt.Errorf("RPC_RegServiceDiscover req is error.")
log.Error(err.Error())
return err
}
//广播给其他所有结点
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.NodeInfo = append(notifyDiscover.NodeInfo,req.NodeInfo)
ds.CastGo(DynamicDiscoveryClientNameRpcMethod,&notifyDiscover)
//存入本地
ds.addNodeInfo(req.NodeInfo)
//初始化结点信息
var nodeInfo NodeInfo
nodeInfo.NodeId = int(req.NodeInfo.NodeId)
nodeInfo.NodeName = req.NodeInfo.NodeName
nodeInfo.Private = req.NodeInfo.Private
nodeInfo.ServiceList = req.NodeInfo.PublicServiceList
nodeInfo.PublicServiceList = req.NodeInfo.PublicServiceList
nodeInfo.ListenAddr = req.NodeInfo.ListenAddr
//主动删除已经存在的结点,确保先断开,再连接
cluster.serviceDiscoveryDelNode(nodeInfo.NodeId)
//加入到本地Cluster模块中将连接该结点
//如果本结点不为master结点而且没有可使用的服务不加入
cluster.serviceDiscoverySetNodeInfo(&nodeInfo)
res.NodeInfo = ds.nodeInfo
return nil
}
func (dc *DynamicDiscoveryClient) OnInit() error{
dc.RegisterRpcListener(dc)
return nil
}
func (dc *DynamicDiscoveryClient) OnStart(){
//2.添加并连接发现主结点
localNodeInfo := cluster.GetLocalNodeInfo()
localNodeInfo.PublicServiceList = append(localNodeInfo.PublicServiceList,DynamicDiscoveryClientName)
dc.addDiscoveryMaster()
}
func (dc *DynamicDiscoveryClient) addDiscoveryMaster(){
discoveryNodeList := cluster.GetDiscoveryNodeList()
for i:=0;i<len(discoveryNodeList);i++ {
if discoveryNodeList[i].NodeId == cluster.GetLocalNodeInfo().NodeId {
continue
}
dc.funSetService(&discoveryNodeList[i])
//cluster.serviceDiscoverySetNodeInfo(&discoveryNodeList[i])
}
}
//订阅发现的服务通知
func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error{
//如果为完整同步,则找出差异的结点
var willDelNodeId []int
if req.IsFull {
mapNodeId := make(map[int32]struct{},len(req.NodeInfo))
for _,nodeInfo:= range req.NodeInfo{
mapNodeId[nodeInfo.NodeId] = struct{}{}
}
cluster.FetchAllNodeId(func(nodeId int){
if nodeId != dc.localNodeId {
if _, ok := mapNodeId[int32(nodeId)]; ok == false {
willDelNodeId = append(willDelNodeId, nodeId)
}
}
})
}
//忽略本地结点
if req.DelNodeId != int32(dc.localNodeId) && req.DelNodeId>0 {
willDelNodeId = append(willDelNodeId, int(req.DelNodeId))
}
//删除不必要的结点
for _,nodeId := range willDelNodeId {
dc.funDelService(nodeId)
}
//发现新结点
for _, nodeInfo := range req.NodeInfo {
dc.setNodeInfo(nodeInfo)
}
return nil
}
func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool{
for i:=0;i< len(cluster.discoveryNodeList);i++{
if cluster.discoveryNodeList[i].NodeId == nodeId {
return true
}
}
return false
}
func (dc *DynamicDiscoveryClient) OnRpcConnected(nodeId int) {
if dc.isDiscoverNode(nodeId) == false {
return
}
var req rpc.ServiceDiscoverReq
req.NodeInfo = &rpc.NodeInfo{}
req.NodeInfo.NodeId = int32(cluster.localNodeInfo.NodeId)
req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName
req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
//如果是连接发现主服成功,则同步服务信息
err := dc.AsyncCallNode(nodeId, DynamicDiscoveryMasterNameRpcMethod, &req, func(res *rpc.ServiceDiscoverRes, err error) {
if err != nil {
log.Error("call %s is fail :%s", DynamicDiscoveryMasterNameRpcMethod, err.Error())
return
}
})
if err != nil {
log.Error("call %s is fail :%s", DynamicDiscoveryMasterNameRpcMethod, err.Error())
}
}
func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo){
if nodeInfo==nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId{
return
}
//筛选关注的服务
localNodeInfo := cluster.GetLocalNodeInfo()
if len(localNodeInfo.DiscoveryService) >0 {
var discoverServiceSlice = make([]string,0,24)
for _,pubService := range nodeInfo.PublicServiceList {
for _, discoverService := range localNodeInfo.DiscoveryService {
if pubService == discoverService {
discoverServiceSlice = append(discoverServiceSlice,pubService)
}
}
}
nodeInfo.PublicServiceList = discoverServiceSlice
}
if len(nodeInfo.PublicServiceList)==0{
return
}
var nInfo NodeInfo
nInfo.ServiceList = nodeInfo.PublicServiceList
nInfo.PublicServiceList = nodeInfo.PublicServiceList
nInfo.NodeId = int(nodeInfo.NodeId)
nInfo.NodeName = nodeInfo.NodeName
nInfo.ListenAddr = nodeInfo.ListenAddr
dc.funSetService(&nInfo)
}
func (dc *DynamicDiscoveryClient) OnRpcDisconnect(nodeId int){
}
func (dc *DynamicDiscoveryClient) InitDiscovery(localNodeId int,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error{
dc.localNodeId = localNodeId
dc.funDelService = funDelNode
dc.funSetService = funSetNodeInfo
return nil
}
func (dc *DynamicDiscoveryClient) OnNodeStop(){
}

View File

@@ -11,6 +11,7 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type NodeInfoList struct {
DiscoveryNode []NodeInfo //用于服务发现Node
NodeList []NodeInfo
}
@@ -62,12 +63,13 @@ func (cls *Cluster) readServiceConfig(filepath string) (map[string]interface{},
return serviceConfig,mapNodeService,nil
}
func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) {
var nodeInfoList [] NodeInfo
func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,[]NodeInfo,error) {
var nodeInfoList []NodeInfo
var discoverNodeList []NodeInfo
clusterCfgPath :=strings.TrimRight(configDir,"/") +"/cluster"
fileInfoList,err := ioutil.ReadDir(clusterCfgPath)
if err != nil {
return nil,fmt.Errorf("Read dir %s is fail :%+v",clusterCfgPath,err)
return nil,nil,fmt.Errorf("Read dir %s is fail :%+v",clusterCfgPath,err)
}
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
@@ -76,9 +78,9 @@ func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) {
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath,"/"),"\\")+"/"+f.Name()
localNodeInfoList,err := cls.ReadClusterConfig(filePath)
if err != nil {
return nil,fmt.Errorf("read file path %s is error:%+v" ,filePath,err)
return nil,nil,fmt.Errorf("read file path %s is error:%+v" ,filePath,err)
}
discoverNodeList = append(discoverNodeList,localNodeInfoList.DiscoveryNode...)
for _,nodeInfo := range localNodeInfoList.NodeList {
if nodeInfo.NodeId == nodeId || nodeId == 0 {
nodeInfoList = append(nodeInfoList,nodeInfo)
@@ -88,10 +90,22 @@ func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) {
}
if nodeId != 0 && (len(nodeInfoList)!=1){
return nil,fmt.Errorf("%d configurations were found for the configuration with node ID %d!",len(nodeInfoList),nodeId)
return nil,nil,fmt.Errorf("%d configurations were found for the configuration with node ID %d!",len(nodeInfoList),nodeId)
}
return nodeInfoList,nil
for i,_ := range nodeInfoList{
for j,s := range nodeInfoList[i].ServiceList{
//私有结点不加入到Public服务列表中
if strings.HasPrefix(s,"_") == false && nodeInfoList[i].Private==false {
nodeInfoList[i].PublicServiceList = append(nodeInfoList[i].PublicServiceList,strings.TrimLeft(s,"_"))
}else{
nodeInfoList[i].ServiceList[j] = strings.TrimLeft(s,"_")
}
}
}
return discoverNodeList,nodeInfoList,nil
}
func (cls *Cluster) readLocalService(localNodeId int) error {
@@ -142,29 +156,44 @@ func (cls *Cluster) parseLocalCfg(){
cls.mapIdNode[cls.localNodeInfo.NodeId] = cls.localNodeInfo
for _,sName := range cls.localNodeInfo.ServiceList{
cls.mapServiceNode[sName] = append(cls.mapServiceNode[sName], cls.localNodeInfo.NodeId)
if _,ok:=cls.mapServiceNode[sName];ok==false{
cls.mapServiceNode[sName] = make(map[int]struct{})
}
cls.mapServiceNode[sName][cls.localNodeInfo.NodeId]= struct{}{}
}
}
func (cls *Cluster) localPrivateService(localNodeInfo *NodeInfo){
for i:=0;i<len(localNodeInfo.ServiceList);i++{
localNodeInfo.ServiceList[i] = strings.TrimLeft(localNodeInfo.ServiceList[i],"_")
func (cls *Cluster) checkDiscoveryNodeList(discoverMasterNode []NodeInfo) bool{
for i:=0;i<len(discoverMasterNode)-1;i++{
for j:=i+1;i<len(discoverMasterNode);j++{
if discoverMasterNode[i].NodeId == discoverMasterNode[j].NodeId ||
discoverMasterNode[i].ListenAddr == discoverMasterNode[j].ListenAddr {
return false
}
}
}
return true
}
func (cls *Cluster) InitCfg(localNodeId int) error{
cls.localServiceCfg = map[string]interface{}{}
cls.mapRpc = map[int] NodeRpcInfo{}
cls.mapIdNode = map[int]NodeInfo{}
cls.mapServiceNode = map[string][]int{}
cls.mapServiceNode = map[string]map[int]struct{}{}
//加载本地结点的NodeList配置
nodeInfoList,err := cls.readLocalClusterConfig(localNodeId)
discoveryNode,nodeInfoList,err := cls.readLocalClusterConfig(localNodeId)
if err != nil {
return err
}
cls.localNodeInfo = nodeInfoList[0]
cls.localPrivateService(&cls.localNodeInfo)
if cls.checkDiscoveryNodeList(discoveryNode) ==false {
return fmt.Errorf("DiscoveryNode config is error!")
}
cls.discoveryNodeList = discoveryNode
//读取本地服务配置
err = cls.readLocalService(localNodeId)
@@ -180,28 +209,23 @@ func (cls *Cluster) InitCfg(localNodeId int) error{
func (cls *Cluster) IsConfigService(serviceName string) bool {
cls.locker.RLock()
defer cls.locker.RUnlock()
nodeList,ok := cls.mapServiceNode[serviceName]
mapNode,ok := cls.mapServiceNode[serviceName]
if ok == false {
return false
}
for _,nodeId := range nodeList{
if cls.localNodeInfo.NodeId == nodeId {
return true
}
}
return false
_,ok = mapNode[cls.localNodeInfo.NodeId]
return ok
}
func (cls *Cluster) GetNodeIdByService(serviceName string,rpcClientList []*rpc.Client,bAll bool) (error,int) {
cls.locker.RLock()
defer cls.locker.RUnlock()
nodeIdList,ok := cls.mapServiceNode[serviceName]
mapNodeId,ok := cls.mapServiceNode[serviceName]
count := 0
if ok == true {
for _,nodeId := range nodeIdList {
pClient := GetCluster().GetRpcClient(nodeId)
for nodeId,_ := range mapNodeId {
pClient := GetCluster().getRpcClient(nodeId)
if pClient==nil || (bAll == false && pClient.IsConnected()==false) {
continue
}

View File

@@ -6,9 +6,7 @@ type FunDelNode func (nodeId int)
type FunSetNodeInfo func(nodeInfo *NodeInfo)
type IServiceDiscovery interface {
Init(localNodeId int) error
InitDiscovery(localNodeId int,funDelNode FunDelNode,funSetNodeInfo FunSetNodeInfo) error
OnNodeStop()
RegFunDelNode(funDelNode FunDelNode)
RegFunSetNode(funSetNodeInfo FunSetNodeInfo)
}

View File

@@ -7,6 +7,7 @@ const (
Sys_Event_Tcp EventType = 1
Sys_Event_Http_Event EventType = 2
Sys_Event_WebSocket EventType = 3
Sys_Event_Rpc_Event EventType = 4
Sys_Event_User_Define EventType = 1000
)

View File

@@ -79,7 +79,7 @@ func (client *TCPClient) dial() net.Conn {
return conn
}
log.Release("connect to %v error: %v", client.Addr, err)
log.Warning("connect to %v error: %v", client.Addr, err)
time.Sleep(client.ConnectInterval)
continue
}

View File

@@ -129,7 +129,7 @@ func GetNodeId() int {
func initNode(id int){
//1.初始化集群
nodeId = id
err := cluster.GetCluster().Init(GetNodeId())
err := cluster.GetCluster().Init(GetNodeId(),Setup)
if err != nil {
log.Fatal("read system config is error %+v",err)
}

View File

@@ -15,6 +15,7 @@ import (
)
type Client struct {
clientSeq uint32
id int
bSelfNode bool
network.TCPClient
@@ -29,6 +30,8 @@ type Client struct {
TriggerRpcEvent
}
var clientSeq uint32
func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
client.conn = conn
client.ResetPending()
@@ -37,6 +40,7 @@ func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
}
func (client *Client) Connect(id int,addr string) error {
client.clientSeq = atomic.AddUint32(&clientSeq,1)
client.id = id
client.Addr = addr
client.maxCheckCallRpcCount = 1000
@@ -82,9 +86,6 @@ func (client *Client) makeCallFail(call *Call){
}else{
call.done <- call
}
}
func (client *Client) checkRpcCallTimeout(){
@@ -263,7 +264,7 @@ func (client *Client) Run(){
}
}()
client.TriggerRpcEvent(true,client.GetId())
client.TriggerRpcEvent(true,client.GetClientSeq(),client.GetId())
for {
bytes,err := client.conn.ReadMsg()
if err != nil {
@@ -319,7 +320,7 @@ func (client *Client) Run(){
}
func (client *Client) OnClose(){
client.TriggerRpcEvent(false,client.GetId())
client.TriggerRpcEvent(false,client.GetClientSeq(),client.GetId())
}
func (client *Client) IsConnected() bool {
@@ -329,3 +330,11 @@ func (client *Client) IsConnected() bool {
func (client *Client) GetId() int{
return client.id
}
func (client *Client) Close(waitDone bool){
client.TCPClient.Close(waitDone)
}
func (client *Client) GetClientSeq() uint32 {
return client.clientSeq
}

1181
rpc/dynamicdiscover.pb.go Normal file

File diff suppressed because it is too large Load Diff

29
rpc/dynamicdiscover.proto Normal file
View File

@@ -0,0 +1,29 @@
syntax = "proto3";
package rpc;
option go_package = "./rpc";
message NodeInfo{
int32 NodeId = 1;
string NodeName = 2;
string ListenAddr = 3;
bool Private = 4;
repeated string PublicServiceList = 5;
}
//Client->Master
message ServiceDiscoverReq{
NodeInfo nodeInfo = 1;
}
//Master->Client
message SubscribeDiscoverNotify{
bool IsFull = 1;
int32 DelNodeId = 2;
repeated NodeInfo nodeInfo = 3;
}
//Master->Client
message ServiceDiscoverRes{
repeated NodeInfo nodeInfo = 1;
}

View File

@@ -1,5 +1,5 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: bin/gogopb/gogorpc.proto
// source: rpc/gogorpc.proto
package rpc
@@ -37,7 +37,7 @@ func (m *GoGoPBRpcRequestData) Reset() { *m = GoGoPBRpcRequestData{} }
func (m *GoGoPBRpcRequestData) String() string { return proto.CompactTextString(m) }
func (*GoGoPBRpcRequestData) ProtoMessage() {}
func (*GoGoPBRpcRequestData) Descriptor() ([]byte, []int) {
return fileDescriptor_b3b900b0f45d7fb5, []int{0}
return fileDescriptor_38afb24c36168563, []int{0}
}
func (m *GoGoPBRpcRequestData) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -114,7 +114,7 @@ func (m *GoGoPBRpcResponseData) Reset() { *m = GoGoPBRpcResponseData{} }
func (m *GoGoPBRpcResponseData) String() string { return proto.CompactTextString(m) }
func (*GoGoPBRpcResponseData) ProtoMessage() {}
func (*GoGoPBRpcResponseData) Descriptor() ([]byte, []int) {
return fileDescriptor_b3b900b0f45d7fb5, []int{1}
return fileDescriptor_38afb24c36168563, []int{1}
}
func (m *GoGoPBRpcResponseData) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -169,26 +169,25 @@ func init() {
proto.RegisterType((*GoGoPBRpcResponseData)(nil), "rpc.GoGoPBRpcResponseData")
}
func init() { proto.RegisterFile("bin/gogopb/gogorpc.proto", fileDescriptor_b3b900b0f45d7fb5) }
func init() { proto.RegisterFile("rpc/gogorpc.proto", fileDescriptor_38afb24c36168563) }
var fileDescriptor_b3b900b0f45d7fb5 = []byte{
// 241 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x48, 0xca, 0xcc, 0xd3,
0x4f, 0xcf, 0x4f, 0xcf, 0x2f, 0x48, 0x02, 0x53, 0x45, 0x05, 0xc9, 0x7a, 0x05, 0x45, 0xf9, 0x25,
0xf9, 0x42, 0xcc, 0x45, 0x05, 0xc9, 0x4a, 0x4b, 0x18, 0xb9, 0x44, 0xdc, 0xf3, 0xdd, 0xf3, 0x03,
0x9c, 0x82, 0x0a, 0x92, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x5c, 0x12, 0x4b, 0x12, 0x85,
0x04, 0xb8, 0x98, 0x83, 0x53, 0x0b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x58, 0x82, 0x40, 0x4c, 0x21,
0x05, 0x2e, 0xee, 0xa0, 0x82, 0x64, 0xdf, 0xd4, 0x92, 0x8c, 0xfc, 0x14, 0xcf, 0x14, 0x09, 0x26,
0x05, 0x46, 0x0d, 0xde, 0x20, 0x64, 0x21, 0x21, 0x15, 0x2e, 0xde, 0xe0, 0xd4, 0xa2, 0xb2, 0xcc,
0xe4, 0x54, 0x88, 0x90, 0x04, 0xb3, 0x02, 0xa3, 0x06, 0x67, 0x10, 0xaa, 0xa0, 0x90, 0x04, 0x17,
0xbb, 0x5f, 0x7e, 0x50, 0x6a, 0x41, 0x4e, 0xa5, 0x04, 0x8b, 0x02, 0xa3, 0x06, 0x47, 0x10, 0x8c,
0x0b, 0x92, 0xf1, 0xcc, 0x0b, 0x48, 0x2c, 0x4a, 0xcc, 0x95, 0x60, 0x55, 0x60, 0xd4, 0xe0, 0x09,
0x82, 0x71, 0x95, 0x42, 0xb9, 0x44, 0x91, 0x5c, 0x59, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0xc3,
0x99, 0x22, 0x5c, 0xac, 0xae, 0x45, 0x45, 0xf9, 0x45, 0x60, 0x07, 0x72, 0x06, 0x41, 0x38, 0x20,
0x51, 0x88, 0x95, 0xcc, 0x60, 0x83, 0x21, 0x1c, 0x27, 0xe1, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c,
0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0x31, 0x8a, 0x55, 0x4f, 0xbf, 0xa8, 0x20, 0x39, 0x89, 0x0d,
0x1c, 0x3c, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe8, 0x18, 0x52, 0x1a, 0x3a, 0x01, 0x00,
0x00,
var fileDescriptor_38afb24c36168563 = []byte{
// 237 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2c, 0x2a, 0x48, 0xd6,
0x4f, 0xcf, 0x4f, 0xcf, 0x2f, 0x2a, 0x48, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e,
0x2a, 0x48, 0x56, 0x5a, 0xc2, 0xc8, 0x25, 0xe2, 0x9e, 0xef, 0x9e, 0x1f, 0xe0, 0x14, 0x54, 0x90,
0x1c, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2, 0x92, 0x58, 0x92, 0x28, 0x24, 0xc0, 0xc5, 0x1c,
0x9c, 0x5a, 0x28, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x12, 0x04, 0x62, 0x0a, 0x29, 0x70, 0x71, 0x07,
0x15, 0x24, 0xfb, 0xa6, 0x96, 0x64, 0xe4, 0xa7, 0x78, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xf0,
0x06, 0x21, 0x0b, 0x09, 0xa9, 0x70, 0xf1, 0x06, 0xa7, 0x16, 0x95, 0x65, 0x26, 0xa7, 0x42, 0x84,
0x24, 0x98, 0x15, 0x18, 0x35, 0x38, 0x83, 0x50, 0x05, 0x85, 0x24, 0xb8, 0xd8, 0xfd, 0xf2, 0x83,
0x52, 0x0b, 0x72, 0x2a, 0x25, 0x58, 0x14, 0x18, 0x35, 0x38, 0x82, 0x60, 0x5c, 0x90, 0x8c, 0x67,
0x5e, 0x40, 0x62, 0x51, 0x62, 0xae, 0x04, 0xab, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0x14,
0xca, 0x25, 0x8a, 0xe4, 0xca, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x1c, 0xce, 0x14, 0xe1, 0x62,
0x75, 0x2d, 0x2a, 0xca, 0x2f, 0x02, 0x3b, 0x90, 0x33, 0x08, 0xc2, 0x01, 0x89, 0x42, 0xac, 0x64,
0x06, 0x1b, 0x0c, 0xe1, 0x38, 0x09, 0x9f, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83,
0x47, 0x72, 0x8c, 0x51, 0xac, 0x7a, 0xfa, 0x45, 0x05, 0xc9, 0x49, 0x6c, 0xe0, 0xe0, 0x31, 0x06,
0x04, 0x00, 0x00, 0xff, 0xff, 0xc7, 0xfc, 0x50, 0x87, 0x33, 0x01, 0x00, 0x00,
}
func (m *GoGoPBRpcRequestData) Marshal() (dAtA []byte, err error) {

View File

@@ -63,7 +63,7 @@ type RpcHandler struct {
callResponseCallBack chan *Call //异步返回的回调
}
type TriggerRpcEvent func(bConnect bool,nodeId int)
type TriggerRpcEvent func(bConnect bool,clientSeq uint32,nodeId int)
type IRpcListener interface {
OnRpcConnected(nodeId int)
OnRpcDisconnect(nodeId int)
@@ -467,6 +467,9 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int
var pClientList [maxClusterNode]*Client
err,count := handler.funcRpcClient(nodeid,serviceMethod,pClientList[:])
if count==0||err != nil {
if err == nil {
err = fmt.Errorf("cannot find rpcclient from nodeid %d serviceMethod %s",nodeid,serviceMethod)
}
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
log.Error("Call serviceMethod is error:%+v!",err)
return nil

View File

@@ -26,7 +26,7 @@ type IModule interface {
GetService() IService
GetModuleName() string
GetEventProcessor()event.IEventProcessor
NotifyEvent(ev *event.Event)
NotifyEvent(ev event.IEvent)
}
type IModuleTimer interface {
@@ -213,7 +213,7 @@ func (m *Module) GetEventProcessor() event.IEventProcessor{
return m.eventHandler.GetEventProcessor()
}
func (m *Module) NotifyEvent(ev *event.Event){
func (m *Module) NotifyEvent(ev event.IEvent){
m.eventHandler.NotifyEvent(ev)
}

View File

@@ -23,6 +23,7 @@ type IService interface {
GetName() string
OnSetup(iService IService)
OnInit() error
OnStart()
OnRelease()
Wait()
Start()
@@ -42,6 +43,16 @@ type Service struct {
startStatus bool
eventProcessor event.IEventProcessor
profiler *profiler.Profiler //性能分析器
rpcEventLister rpc.IRpcListener
}
type RpcEventData struct{
IsConnect bool
NodeId int
}
func (rpcEventData *RpcEventData) GetEventType() event.EventType{
return event.Sys_Event_Rpc_Event
}
func (s *Service) OnSetup(iService IService){
@@ -99,6 +110,7 @@ func (s *Service) Run() {
log.Debug("Start running Service %s.", s.GetName())
defer s.wg.Done()
var bStop = false
s.self.(IService).OnStart()
for{
rpcRequestChan := s.GetRpcRequestChan()
rpcResponseCallBack := s.GetRpcResponseChan()
@@ -212,3 +224,27 @@ func (s *Service) IsSingleCoroutine() bool {
func (s *Service) RegRawRpc(rpcMethodId uint32,rawRpcCB rpc.RawRpcCallBack){
s.rpcHandler.RegRawRpc(rpcMethodId,rawRpcCB)
}
func (s *Service) OnStart(){
}
func (s *Service) OnRpcEvent(ev event.IEvent){
event := ev.(*RpcEventData)
if event.IsConnect {
s.rpcEventLister.OnRpcConnected(event.NodeId)
}else{
s.rpcEventLister.OnRpcDisconnect(event.NodeId)
}
}
func (s *Service) RegisterRpcListener (rpcEventLister rpc.IRpcListener) {
s.rpcEventLister = rpcEventLister
s.RegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler(),s.OnRpcEvent)
RegRpcEventFun(s.GetName())
}
func (s *Service) UnRegisterRpcListener (rpcLister rpc.IRpcListener) {
s.UnRegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler())
RegRpcEventFun(s.GetName())
}

View File

@@ -4,6 +4,9 @@ package service
var mapServiceName map[string]IService
var setupServiceList []IService
type RegRpcEventFunType func(serviceName string)
var RegRpcEventFun RegRpcEventFunType
func init(){
mapServiceName = map[string]IService{}
setupServiceList = []IService{}