优化代码

This commit is contained in:
boyce
2020-10-29 20:45:33 +08:00
parent be792337fb
commit 3025eaebd5
7 changed files with 189 additions and 206 deletions

View File

@@ -9,9 +9,7 @@ import (
"sync"
)
var configdir = "./config/"
var configDir = "./config/"
type NodeInfo struct {
NodeId int
@@ -21,83 +19,77 @@ type NodeInfo struct {
}
type NodeRpcInfo struct {
nodeinfo NodeInfo
nodeInfo NodeInfo
client *rpc.Client
}
var cluster Cluster
type Cluster struct {
localNodeInfo NodeInfo //×
localServiceCfg map[string]interface{} //map[servicename]配置数据*
mapRpc map[int] NodeRpcInfo//nodeid
rpcServer rpc.Server
serviceDiscovery IServiceDiscovery //服务发现接口
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string][]int //map[serviceName]NodeInfo
localNodeInfo NodeInfo
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
mapRpc map[int] NodeRpcInfo //nodeId
serviceDiscovery IServiceDiscovery //服务发现接口
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string][]int //map[serviceName]NodeInfo
locker sync.RWMutex
rpcServer rpc.Server
}
func SetConfigDir(cfgdir string){
configdir = cfgdir
func SetConfigDir(cfgDir string){
configDir = cfgDir
}
func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) {
cluster.serviceDiscovery = serviceDiscovery
}
func (slf *Cluster) serviceDiscoveryDelNode (nodeId int){
slf.locker.Lock()
defer slf.locker.Unlock()
func (cls *Cluster) serviceDiscoveryDelNode (nodeId int){
cls.locker.Lock()
defer cls.locker.Unlock()
slf.delNode(nodeId)
cls.delNode(nodeId)
}
func (slf *Cluster) delNode(nodeId int){
func (cls *Cluster) delNode(nodeId int){
//删除rpc连接关系
rpc,ok := slf.mapRpc[nodeId]
rpc,ok := cls.mapRpc[nodeId]
if ok == true {
delete(slf.mapRpc,nodeId)
delete(cls.mapRpc,nodeId)
rpc.client.Close(false)
}
nodeInfo,ok := slf.mapIdNode[nodeId]
nodeInfo,ok := cls.mapIdNode[nodeId]
if ok == false {
return
}
for _,serviceName := range nodeInfo.ServiceList{
slf.delServiceNode(serviceName,nodeId)
cls.delServiceNode(serviceName,nodeId)
}
delete(slf.mapIdNode,nodeId)
delete(cls.mapIdNode,nodeId)
}
func (slf *Cluster) delServiceNode(serviceName string,nodeId int){
nodeList := slf.mapServiceNode[serviceName]
func (cls *Cluster) delServiceNode(serviceName string,nodeId int){
nodeList := cls.mapServiceNode[serviceName]
for idx,nId := range nodeList {
if nId == nodeId {
slf.mapServiceNode[serviceName] = append(nodeList[idx:],nodeList[idx+1:]...)
cls.mapServiceNode[serviceName] = append(nodeList[idx:],nodeList[idx+1:]...)
return
}
}
}
func (slf *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){
if nodeInfo.NodeId == slf.localNodeInfo.NodeId {
func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){
if nodeInfo.NodeId == cls.localNodeInfo.NodeId {
return
}
slf.locker.Lock()
defer slf.locker.Unlock()
cls.locker.Lock()
defer cls.locker.Unlock()
//先清理删除
slf.delNode(nodeInfo.NodeId)
cls.delNode(nodeInfo.NodeId)
//再重新组装
mapDuplicate := map[string]interface{}{} //预防重复数据
@@ -108,58 +100,58 @@ func (slf *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){
continue
}
slf.mapServiceNode[serviceName] = append(slf.mapServiceNode[serviceName],nodeInfo.NodeId)
cls.mapServiceNode[serviceName] = append(cls.mapServiceNode[serviceName],nodeInfo.NodeId)
}
slf.mapIdNode[nodeInfo.NodeId] = *nodeInfo
cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeinfo = *nodeInfo
rpcInfo.nodeInfo = *nodeInfo
rpcInfo.client = &rpc.Client{}
rpcInfo.client.Connect(nodeInfo.ListenAddr)
slf.mapRpc[nodeInfo.NodeId] = rpcInfo
cls.mapRpc[nodeInfo.NodeId] = rpcInfo
}
func (slf *Cluster) buildLocalRpc(){
func (cls *Cluster) buildLocalRpc(){
rpcInfo := NodeRpcInfo{}
rpcInfo.nodeinfo = slf.localNodeInfo
rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = &rpc.Client{}
rpcInfo.client.Connect("")
slf.mapRpc[slf.localNodeInfo.NodeId] = rpcInfo
cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo
}
func (slf *Cluster) Init(localNodeId int) error{
slf.locker.Lock()
func (cls *Cluster) Init(localNodeId int) error{
cls.locker.Lock()
//1.处理服务发现接口
if slf.serviceDiscovery == nil {
slf.serviceDiscovery = &ConfigDiscovery{}
if cls.serviceDiscovery == nil {
cls.serviceDiscovery = &ConfigDiscovery{}
}
//2.初始化配置
err := slf.InitCfg(localNodeId)
err := cls.InitCfg(localNodeId)
if err != nil {
slf.locker.Unlock()
cls.locker.Unlock()
return err
}
slf.rpcServer.Init(slf)
slf.buildLocalRpc()
cls.rpcServer.Init(cls)
cls.buildLocalRpc()
slf.serviceDiscovery.RegFunDelNode(slf.serviceDiscoveryDelNode)
slf.serviceDiscovery.RegFunSetNode(slf.serviceDiscoverySetNodeInfo)
slf.locker.Unlock()
cls.serviceDiscovery.RegFunDelNode(cls.serviceDiscoveryDelNode)
cls.serviceDiscovery.RegFunSetNode(cls.serviceDiscoverySetNodeInfo)
cls.locker.Unlock()
err = slf.serviceDiscovery.Init(localNodeId)
err = cls.serviceDiscovery.Init(localNodeId)
if err != nil {
return err
}
return nil
}
func (slf *Cluster) FindRpcHandler(servicename string) rpc.IRpcHandler {
pService := service.GetService(servicename)
func (cls *Cluster) FindRpcHandler(serviceName string) rpc.IRpcHandler {
pService := service.GetService(serviceName)
if pService == nil {
return nil
}
@@ -167,22 +159,22 @@ func (slf *Cluster) FindRpcHandler(servicename string) rpc.IRpcHandler {
return pService.GetRpcHandler()
}
func (slf *Cluster) Start() {
slf.rpcServer.Start(slf.localNodeInfo.ListenAddr)
func (cls *Cluster) Start() {
cls.rpcServer.Start(cls.localNodeInfo.ListenAddr)
}
func (slf *Cluster) Stop() {
slf.serviceDiscovery.OnNodeStop()
func (cls *Cluster) Stop() {
cls.serviceDiscovery.OnNodeStop()
}
func GetCluster() *Cluster{
return &cluster
}
func (slf *Cluster) GetRpcClient(nodeid int) *rpc.Client {
slf.locker.RLock()
defer slf.locker.RUnlock()
c,ok := slf.mapRpc[nodeid]
func (cls *Cluster) GetRpcClient(nodeId int) *rpc.Client {
cls.locker.RLock()
defer cls.locker.RUnlock()
c,ok := cls.mapRpc[nodeId]
if ok == false {
return nil
}
@@ -205,7 +197,7 @@ func GetRpcClient(nodeId int,serviceMethod string,clientList *[]*rpc.Client) err
return fmt.Errorf("servicemethod param %s is error!",serviceMethod)
}
//1.找到对应的rpcnodeid
//1.找到对应的rpcNodeid
GetCluster().GetNodeIdByService(serviceAndMethod[0],clientList)
return nil
}
@@ -214,7 +206,7 @@ func GetRpcServer() *rpc.Server{
return &cluster.rpcServer
}
func (slf *Cluster) IsNodeConnected (nodeId int) bool {
pClient := slf.GetRpcClient(nodeId)
func (cls *Cluster) IsNodeConnected (nodeId int) bool {
pClient := cls.GetRpcClient(nodeId)
return pClient!=nil && pClient.IsConnected()
}

View File

@@ -7,8 +7,8 @@ type ConfigDiscovery struct {
localNodeId int
}
func (slf *ConfigDiscovery) Init(localNodeId int) error{
slf.localNodeId = localNodeId
func (discovery *ConfigDiscovery) Init(localNodeId int) error{
discovery.localNodeId = localNodeId
//解析本地其他服务配置
nodeInfoList,err := GetCluster().readLocalClusterConfig(0)
@@ -20,21 +20,19 @@ func (slf *ConfigDiscovery) Init(localNodeId int) error{
if nodeInfo.NodeId == localNodeId {
continue
}
slf.funSetService(&nodeInfo)
discovery.funSetService(&nodeInfo)
}
return nil
}
func (slf *ConfigDiscovery) OnNodeStop(){
func (discovery *ConfigDiscovery) OnNodeStop(){
}
func (slf *ConfigDiscovery) RegFunDelNode(funDelNode FunDelNode){
slf.funDelService = funDelNode
func (discovery *ConfigDiscovery) RegFunDelNode(funDelNode FunDelNode){
discovery.funDelService = funDelNode
}
func (slf *ConfigDiscovery) RegFunSetNode(funSetNodeInfo FunSetNodeInfo){
slf.funSetService = funSetNodeInfo
func (discovery *ConfigDiscovery) RegFunSetNode(funSetNodeInfo FunSetNodeInfo){
discovery.funSetService = funSetNodeInfo
}

View File

@@ -14,8 +14,7 @@ type NodeInfoList struct {
NodeList []NodeInfo
}
func (slf *Cluster) ReadClusterConfig(filepath string) (*NodeInfoList,error) {
func (cls *Cluster) ReadClusterConfig(filepath string) (*NodeInfoList,error) {
c := &NodeInfoList{}
d, err := ioutil.ReadFile(filepath)
if err != nil {
@@ -29,9 +28,7 @@ func (slf *Cluster) ReadClusterConfig(filepath string) (*NodeInfoList,error) {
return c,nil
}
func (slf *Cluster) readServiceConfig(filepath string) (map[string]interface{},map[int]map[string]interface{},error) {
func (cls *Cluster) readServiceConfig(filepath string) (map[string]interface{},map[int]map[string]interface{},error) {
c := map[string]interface{}{}
//读取配置
d, err := ioutil.ReadFile(filepath)
@@ -65,10 +62,9 @@ func (slf *Cluster) readServiceConfig(filepath string) (map[string]interface{},
return serviceConfig,mapNodeService,nil
}
func (slf *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) {
func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) {
var nodeInfoList [] NodeInfo
clusterCfgPath :=strings.TrimRight(configdir,"/") +"/cluster"
clusterCfgPath :=strings.TrimRight(configDir,"/") +"/cluster"
fileInfoList,err := ioutil.ReadDir(clusterCfgPath)
if err != nil {
return nil,fmt.Errorf("Read dir %s is fail :%+v",clusterCfgPath,err)
@@ -78,7 +74,7 @@ func (slf *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) {
for _,f := range fileInfoList{
if f.IsDir() == false {
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath,"/"),"\\")+"/"+f.Name()
localNodeInfoList,err := slf.ReadClusterConfig(filePath)
localNodeInfoList,err := cls.ReadClusterConfig(filePath)
if err != nil {
return nil,fmt.Errorf("read file path %s is error:%+v" ,filePath,err)
}
@@ -86,7 +82,6 @@ func (slf *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) {
for _,nodeInfo := range localNodeInfoList.NodeList {
if nodeInfo.NodeId == nodeId || nodeId == 0 {
nodeInfoList = append(nodeInfoList,nodeInfo)
//slf.localNodeInfo = nodeInfo
}
}
}
@@ -99,8 +94,8 @@ func (slf *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo,error) {
return nodeInfoList,nil
}
func (slf *Cluster) readLocalService(localNodeId int) error {
clusterCfgPath :=strings.TrimRight(configdir,"/") +"/cluster"
func (cls *Cluster) readLocalService(localNodeId int) error {
clusterCfgPath :=strings.TrimRight(configDir,"/") +"/cluster"
fileInfoList,err := ioutil.ReadDir(clusterCfgPath)
if err != nil {
return fmt.Errorf("Read dir %s is fail :%+v",clusterCfgPath,err)
@@ -110,17 +105,17 @@ func (slf *Cluster) readLocalService(localNodeId int) error {
for _,f := range fileInfoList {
if f.IsDir() == false {
filePath := strings.TrimRight(strings.TrimRight(clusterCfgPath, "/"), "\\") + "/" + f.Name()
serviceConfig,mapNodeService,err := slf.readServiceConfig(filePath)
serviceConfig,mapNodeService,err := cls.readServiceConfig(filePath)
if err != nil {
continue
}
for _,s := range slf.localNodeInfo.ServiceList{
for _,s := range cls.localNodeInfo.ServiceList{
for{
//取公共服务配置
pubCfg,ok := serviceConfig[s]
if ok == true {
slf.localServiceCfg[s] = pubCfg
cls.localServiceCfg[s] = pubCfg
}
//如果结点也配置了该服务,则覆盖之
@@ -133,7 +128,7 @@ func (slf *Cluster) readLocalService(localNodeId int) error {
break
}
slf.localServiceCfg[s] = sCfg
cls.localServiceCfg[s] = sCfg
break
}
}
@@ -143,49 +138,48 @@ func (slf *Cluster) readLocalService(localNodeId int) error {
return nil
}
func (slf *Cluster) parseLocalCfg(){
slf.mapIdNode[slf.localNodeInfo.NodeId] = slf.localNodeInfo
func (cls *Cluster) parseLocalCfg(){
cls.mapIdNode[cls.localNodeInfo.NodeId] = cls.localNodeInfo
for _,sName := range slf.localNodeInfo.ServiceList{
slf.mapServiceNode[sName] = append(slf.mapServiceNode[sName],slf.localNodeInfo.NodeId)
for _,sName := range cls.localNodeInfo.ServiceList{
cls.mapServiceNode[sName] = append(cls.mapServiceNode[sName], cls.localNodeInfo.NodeId)
}
}
func (slf *Cluster) InitCfg(localNodeId int) error{
slf.localServiceCfg = map[string]interface{}{}
slf.mapRpc = map[int] NodeRpcInfo{}
slf.mapIdNode = map[int]NodeInfo{}
slf.mapServiceNode = map[string][]int{}
func (cls *Cluster) InitCfg(localNodeId int) error{
cls.localServiceCfg = map[string]interface{}{}
cls.mapRpc = map[int] NodeRpcInfo{}
cls.mapIdNode = map[int]NodeInfo{}
cls.mapServiceNode = map[string][]int{}
//加载本地结点的NodeList配置
nodeInfoList,err := slf.readLocalClusterConfig(localNodeId)
nodeInfoList,err := cls.readLocalClusterConfig(localNodeId)
if err != nil {
return err
}
slf.localNodeInfo = nodeInfoList[0]
cls.localNodeInfo = nodeInfoList[0]
//读取本地服务配置
err = slf.readLocalService(localNodeId)
err = cls.readLocalService(localNodeId)
if err != nil {
return err
}
//本地配置服务加到全局map信息中
slf.parseLocalCfg()
cls.parseLocalCfg()
return nil
}
func (slf *Cluster) IsConfigService(serviceName string) bool {
slf.locker.RLock()
defer slf.locker.RUnlock()
nodeList,ok := slf.mapServiceNode[serviceName]
func (cls *Cluster) IsConfigService(serviceName string) bool {
cls.locker.RLock()
defer cls.locker.RUnlock()
nodeList,ok := cls.mapServiceNode[serviceName]
if ok == false {
return false
}
for _,nodeId := range nodeList{
if slf.localNodeInfo.NodeId == nodeId {
if cls.localNodeInfo.NodeId == nodeId {
return true
}
}
@@ -193,10 +187,10 @@ func (slf *Cluster) IsConfigService(serviceName string) bool {
return false
}
func (slf *Cluster) GetNodeIdByService(servicename string,rpcClientList *[]*rpc.Client) {
slf.locker.RLock()
defer slf.locker.RUnlock()
nodeIdList,ok := slf.mapServiceNode[servicename]
func (cls *Cluster) GetNodeIdByService(serviceName string,rpcClientList *[]*rpc.Client) {
cls.locker.RLock()
defer cls.locker.RUnlock()
nodeIdList,ok := cls.mapServiceNode[serviceName]
if ok == true {
for _,nodeId := range nodeIdList {
pClient := GetCluster().GetRpcClient(nodeId)
@@ -209,9 +203,8 @@ func (slf *Cluster) GetNodeIdByService(servicename string,rpcClientList *[]*rpc.
}
}
func (slf *Cluster) getServiceCfg(servicename string) interface{}{
v,ok := slf.localServiceCfg[servicename]
func (cls *Cluster) getServiceCfg(serviceName string) interface{}{
v,ok := cls.localServiceCfg[serviceName]
if ok == false {
return nil
}
@@ -219,8 +212,8 @@ func (slf *Cluster) getServiceCfg(servicename string) interface{}{
return v
}
func (slf *Cluster) GetServiceCfg(serviceName string) interface{}{
serviceCfg,ok := slf.localServiceCfg[serviceName]
func (cls *Cluster) GetServiceCfg(serviceName string) interface{}{
serviceCfg,ok := cls.localServiceCfg[serviceName]
if ok == false {
return nil
}

View File

@@ -8,7 +8,6 @@ type FunSetNodeInfo func(nodeInfo *NodeInfo)
type IServiceDiscovery interface {
Init(localNodeId int) error
OnNodeStop()
RegFunDelNode(funDelNode FunDelNode)
RegFunSetNode(funSetNodeInfo FunSetNodeInfo)
}