mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-09 02:54:45 +08:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c803b9b9ad | ||
|
|
3f52ea8331 | ||
|
|
2d1bee4dea |
@@ -20,6 +20,11 @@ const (
|
||||
Discard NodeStatus = 1 //丢弃
|
||||
)
|
||||
|
||||
type MasterDiscoveryService struct {
|
||||
MasterNodeId int32 //要筛选的主结点Id,如果不配置或者配置成0,表示针对所有的主结点
|
||||
DiscoveryService []string //只发现的服务列表
|
||||
}
|
||||
|
||||
type NodeInfo struct {
|
||||
NodeId int
|
||||
NodeName string
|
||||
@@ -29,8 +34,7 @@ type NodeInfo struct {
|
||||
CompressBytesLen int //超过字节进行压缩的长度
|
||||
ServiceList []string //所有的有序服务列表
|
||||
PublicServiceList []string //对外公开的服务列表
|
||||
DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选
|
||||
NeighborService []string
|
||||
MasterDiscoveryService []MasterDiscoveryService //筛选发现的服务,如果不配置,不进行筛选
|
||||
status NodeStatus
|
||||
}
|
||||
|
||||
|
||||
@@ -248,15 +248,6 @@ func (dc *DynamicDiscoveryClient) fullCompareDiffNode(masterNodeId int32, mapNod
|
||||
|
||||
//订阅发现的服务通知
|
||||
func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error {
|
||||
//整理当前master结点需要筛选的NeighborService
|
||||
masterDiscoveryNodeInfo := cluster.GetMasterDiscoveryNodeInfo(int(req.MasterNodeId))
|
||||
mapMasterDiscoveryService := map[string]struct{}{}
|
||||
if masterDiscoveryNodeInfo != nil {
|
||||
for i := 0; i < len(masterDiscoveryNodeInfo.NeighborService); i++ {
|
||||
mapMasterDiscoveryService[masterDiscoveryNodeInfo.NeighborService[i]] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
mapNodeInfo := map[int32]*rpc.NodeInfo{}
|
||||
for _, nodeInfo := range req.NodeInfo {
|
||||
//不对本地结点或者不存在任何公开服务的结点
|
||||
@@ -271,13 +262,6 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
|
||||
|
||||
//遍历所有的公开服务,并筛选之
|
||||
for _, serviceName := range nodeInfo.PublicServiceList {
|
||||
//只有存在配置时才做筛选
|
||||
if len(mapMasterDiscoveryService) > 0 {
|
||||
if _, ok := mapMasterDiscoveryService[serviceName]; ok == false {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
nInfo := mapNodeInfo[nodeInfo.NodeId]
|
||||
if nInfo == nil {
|
||||
nInfo = &rpc.NodeInfo{}
|
||||
@@ -319,10 +303,8 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
|
||||
|
||||
//设置新结点
|
||||
for _, nodeInfo := range mapNodeInfo {
|
||||
dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId)
|
||||
dc.setNodeInfo(nodeInfo)
|
||||
|
||||
if len(nodeInfo.PublicServiceList) == 0 {
|
||||
bSet := dc.setNodeInfo(req.MasterNodeId,nodeInfo)
|
||||
if bSet == false {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -358,13 +340,8 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
|
||||
req.NodeInfo.NodeName = cluster.localNodeInfo.NodeName
|
||||
req.NodeInfo.ListenAddr = cluster.localNodeInfo.ListenAddr
|
||||
req.NodeInfo.MaxRpcParamLen = cluster.localNodeInfo.MaxRpcParamLen
|
||||
req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
|
||||
|
||||
//MasterDiscoveryNode配置中没有配置NeighborService,则同步当前结点所有服务
|
||||
if len(nodeInfo.NeighborService) == 0 {
|
||||
req.NodeInfo.PublicServiceList = cluster.localNodeInfo.PublicServiceList
|
||||
} else {
|
||||
req.NodeInfo.PublicServiceList = append(req.NodeInfo.PublicServiceList, DynamicDiscoveryClientName)
|
||||
}
|
||||
|
||||
//向Master服务同步本Node服务信息
|
||||
err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) {
|
||||
@@ -382,37 +359,53 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
|
||||
}
|
||||
}
|
||||
|
||||
func (dc *DynamicDiscoveryClient) setNodeInfo(nodeInfo *rpc.NodeInfo) {
|
||||
if nodeInfo == nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId {
|
||||
return
|
||||
}
|
||||
func (dc *DynamicDiscoveryClient) canDiscoveryService(fromMasterNodeId int32,serviceName string) bool{
|
||||
canDiscovery := true
|
||||
|
||||
//筛选关注的服务
|
||||
localNodeInfo := cluster.GetLocalNodeInfo()
|
||||
if len(localNodeInfo.DiscoveryService) > 0 {
|
||||
var discoverServiceSlice = make([]string, 0, 24)
|
||||
for _, pubService := range nodeInfo.PublicServiceList {
|
||||
for _, discoverService := range localNodeInfo.DiscoveryService {
|
||||
if pubService == discoverService {
|
||||
discoverServiceSlice = append(discoverServiceSlice, pubService)
|
||||
for i:=0;i<len(cluster.GetLocalNodeInfo().MasterDiscoveryService);i++{
|
||||
masterNodeId := cluster.GetLocalNodeInfo().MasterDiscoveryService[i].MasterNodeId
|
||||
|
||||
if masterNodeId == fromMasterNodeId || masterNodeId == 0 {
|
||||
canDiscovery = false
|
||||
|
||||
for _,discoveryService := range cluster.GetLocalNodeInfo().MasterDiscoveryService[i].DiscoveryService {
|
||||
if discoveryService == serviceName {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
nodeInfo.PublicServiceList = discoverServiceSlice
|
||||
}
|
||||
|
||||
if len(nodeInfo.PublicServiceList) == 0 {
|
||||
return
|
||||
return canDiscovery
|
||||
}
|
||||
|
||||
func (dc *DynamicDiscoveryClient) setNodeInfo(masterNodeId int32,nodeInfo *rpc.NodeInfo) bool{
|
||||
if nodeInfo == nil || nodeInfo.Private == true || int(nodeInfo.NodeId) == dc.localNodeId {
|
||||
return false
|
||||
}
|
||||
|
||||
//筛选关注的服务
|
||||
var discoverServiceSlice = make([]string, 0, 24)
|
||||
for _, pubService := range nodeInfo.PublicServiceList {
|
||||
if dc.canDiscoveryService(masterNodeId,pubService) == true {
|
||||
discoverServiceSlice = append(discoverServiceSlice,pubService)
|
||||
}
|
||||
}
|
||||
|
||||
if len(discoverServiceSlice) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
var nInfo NodeInfo
|
||||
nInfo.ServiceList = nodeInfo.PublicServiceList
|
||||
nInfo.PublicServiceList = nodeInfo.PublicServiceList
|
||||
nInfo.ServiceList = discoverServiceSlice
|
||||
nInfo.PublicServiceList = discoverServiceSlice
|
||||
nInfo.NodeId = int(nodeInfo.NodeId)
|
||||
nInfo.NodeName = nodeInfo.NodeName
|
||||
nInfo.ListenAddr = nodeInfo.ListenAddr
|
||||
nInfo.MaxRpcParamLen = nodeInfo.MaxRpcParamLen
|
||||
dc.funSetService(&nInfo)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (dc *DynamicDiscoveryClient) OnNodeDisconnect(nodeId int) {
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package log
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
@@ -35,21 +35,21 @@ type OriginJsonHandler struct {
|
||||
func getStrLevel(level slog.Level) string{
|
||||
switch level {
|
||||
case LevelTrace:
|
||||
return "TRACE"
|
||||
return "Trace"
|
||||
case LevelDebug:
|
||||
return "DEBUG"
|
||||
return "Debug"
|
||||
case LevelInfo:
|
||||
return "INFO"
|
||||
return "Info"
|
||||
case LevelWarning:
|
||||
return "WARNING"
|
||||
return "Warning"
|
||||
case LevelError:
|
||||
return "ERROR"
|
||||
return "Error"
|
||||
case LevelStack:
|
||||
return "STACK"
|
||||
return "Stack"
|
||||
case LevelDump:
|
||||
return "DUMP"
|
||||
return "Dump"
|
||||
case LevelFatal:
|
||||
return "FATAL"
|
||||
return "Fatal"
|
||||
}
|
||||
|
||||
return ""
|
||||
@@ -97,7 +97,8 @@ func (oh *OriginTextHandler) Handle(context context.Context, record slog.Record)
|
||||
oh.w.Write([]byte(strDump))
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
|
||||
return oh.TextHandler.Handle(context, record)
|
||||
}
|
||||
|
||||
|
||||
17
log/log.go
17
log/log.go
@@ -489,23 +489,8 @@ func (logger *Logger) formatHeader(buf *Buffer,level slog.Level,calldepth int) {
|
||||
file = filepath.Base(file)
|
||||
|
||||
buf.AppendString("time=\"")
|
||||
year, month, day := t.Date()
|
||||
buf.AppendInt(int64(year))
|
||||
buf.AppendByte('/')
|
||||
buf.AppendInt(int64(month))
|
||||
buf.AppendByte('/')
|
||||
buf.AppendInt(int64(day))
|
||||
buf.AppendByte(' ')
|
||||
|
||||
hour, min, sec := t.Clock()
|
||||
buf.AppendInt(int64(hour))
|
||||
buf.AppendByte(':')
|
||||
buf.AppendInt(int64(min))
|
||||
buf.AppendByte(':')
|
||||
|
||||
buf.AppendInt(int64(sec))
|
||||
buf.AppendString(t.Format("2006/01/02 15:04:05"))
|
||||
buf.AppendString("\"")
|
||||
|
||||
logger.sBuff.AppendString(" level=")
|
||||
logger.sBuff.AppendString(getStrLevel(level))
|
||||
logger.sBuff.AppendString(" source=")
|
||||
|
||||
@@ -53,18 +53,9 @@ type Client struct {
|
||||
}
|
||||
|
||||
func (tcpService *TcpService) genId() uint64 {
|
||||
if node.GetNodeId()>MaxNodeId{
|
||||
panic("nodeId exceeds the maximum!")
|
||||
}
|
||||
|
||||
newSeed := atomic.AddUint32(&seed,1) % MaxSeed
|
||||
nowTime := uint64(time.Now().Unix())%MaxTime
|
||||
return (uint64(node.GetNodeId())<<50)|(nowTime<<19)|uint64(newSeed)
|
||||
}
|
||||
|
||||
|
||||
func GetNodeId(agentId uint64) int {
|
||||
return int(agentId>>50)
|
||||
return (uint64(node.GetNodeId()%MaxNodeId)<<50)|(nowTime<<19)|uint64(newSeed)
|
||||
}
|
||||
|
||||
func (tcpService *TcpService) OnInit() error{
|
||||
|
||||
Reference in New Issue
Block a user