mirror of
https://github.com/duanhf2012/origin.git
synced 2026-03-16 22:47:33 +08:00
优化日志
This commit is contained in:
@@ -202,7 +202,11 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
|
|||||||
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,&cls.callSet,cls.NotifyAllService)
|
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,&cls.callSet,cls.NotifyAllService)
|
||||||
}
|
}
|
||||||
cls.mapRpc[nodeInfo.NodeId] = &rpcInfo
|
cls.mapRpc[nodeInfo.NodeId] = &rpcInfo
|
||||||
log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr))
|
if cls.IsNatsMode() == true {
|
||||||
|
log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire))
|
||||||
|
}else{
|
||||||
|
log.Info("Discovery nodeId and new rpc client",log.String("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList),log.Bool("Retire",nodeInfo.Retire),log.String("nodeListenAddr",nodeInfo.ListenAddr))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -36,7 +36,6 @@ type OriginDiscoveryClient struct {
|
|||||||
localNodeId string
|
localNodeId string
|
||||||
|
|
||||||
mapDiscovery map[string]map[string][]string //map[masterNodeId]map[nodeId]struct{}
|
mapDiscovery map[string]map[string][]string //map[masterNodeId]map[nodeId]struct{}
|
||||||
//mapMasterNetwork map[string]string
|
|
||||||
bRetire bool
|
bRetire bool
|
||||||
isRegisterOk bool
|
isRegisterOk bool
|
||||||
}
|
}
|
||||||
@@ -127,7 +126,7 @@ func (ds *OriginDiscoveryMaster) checkTTL(){
|
|||||||
ds.NewTicker(interval,func(t *timer.Ticker){
|
ds.NewTicker(interval,func(t *timer.Ticker){
|
||||||
ds.nsTTL.checkTTL(func(nodeIdList []string) {
|
ds.nsTTL.checkTTL(func(nodeIdList []string) {
|
||||||
for _,nodeId := range nodeIdList {
|
for _,nodeId := range nodeIdList {
|
||||||
log.Debug("TTL expiry",log.String("nodeId",nodeId))
|
log.Info("TTL expiry",log.String("nodeId",nodeId))
|
||||||
ds.OnNodeDisconnect(nodeId)
|
ds.OnNodeDisconnect(nodeId)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -189,13 +188,11 @@ func (ds *OriginDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ds *OriginDiscoveryMaster) RPC_Ping(req *rpc.Ping, res *rpc.Pong) error {
|
func (ds *OriginDiscoveryMaster) RPC_Ping(req *rpc.Ping, res *rpc.Pong) error {
|
||||||
log.Debug("ping",log.String("nodeId",req.NodeId))
|
|
||||||
if ds.isRegNode(req.NodeId) == false{
|
if ds.isRegNode(req.NodeId) == false{
|
||||||
res.Ok = false
|
res.Ok = false
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//return nil
|
|
||||||
res.Ok = true
|
res.Ok = true
|
||||||
ds.nsTTL.addAndRefreshNode(req.NodeId)
|
ds.nsTTL.addAndRefreshNode(req.NodeId)
|
||||||
return nil
|
return nil
|
||||||
@@ -386,7 +383,6 @@ func (dc *OriginDiscoveryClient) fullCompareDiffNode(masterNodeId string, mapNod
|
|||||||
|
|
||||||
//订阅发现的服务通知
|
//订阅发现的服务通知
|
||||||
func (dc *OriginDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error {
|
func (dc *OriginDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDiscoverNotify) error {
|
||||||
log.Debug("RPC_SubServiceDiscover",log.String("masterNodeId",req.MasterNodeId),log.String("delNodeId",req.GetDelNodeId()))
|
|
||||||
mapNodeInfo := map[string]*rpc.NodeInfo{}
|
mapNodeInfo := map[string]*rpc.NodeInfo{}
|
||||||
for _, nodeInfo := range req.NodeInfo {
|
for _, nodeInfo := range req.NodeInfo {
|
||||||
//不对本地结点或者不存在任何公开服务的结点
|
//不对本地结点或者不存在任何公开服务的结点
|
||||||
@@ -572,7 +568,6 @@ func (dc *OriginDiscoveryClient) setNodeInfo(masterNodeId string,nodeInfo *rpc.N
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (dc *OriginDiscoveryClient) OnNodeDisconnect(nodeId string) {
|
func (dc *OriginDiscoveryClient) OnNodeDisconnect(nodeId string) {
|
||||||
log.Debug("OnNodeDisconnect",log.String("nodeId",nodeId))
|
|
||||||
//将Discard结点清理
|
//将Discard结点清理
|
||||||
cluster.DiscardNode(nodeId)
|
cluster.DiscardNode(nodeId)
|
||||||
}
|
}
|
||||||
@@ -616,7 +611,6 @@ func (cls *Cluster) AddDiscoveryService(serviceName string, bPublicService bool)
|
|||||||
|
|
||||||
|
|
||||||
func (cls *Cluster) IsOriginMasterDiscoveryNode(nodeId string) bool {
|
func (cls *Cluster) IsOriginMasterDiscoveryNode(nodeId string) bool {
|
||||||
//return cls.GetMasterDiscoveryNodeInfo(cls.GetLocalNodeInfo().NodeId) != nil
|
|
||||||
return cls.getOriginMasterDiscoveryNodeInfo(nodeId) != nil
|
return cls.getOriginMasterDiscoveryNodeInfo(nodeId) != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -635,7 +629,6 @@ func (cls *Cluster) getOriginMasterDiscoveryNodeInfo(nodeId string) *NodeInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (dc *OriginDiscoveryClient) OnNatsConnected(){
|
func (dc *OriginDiscoveryClient) OnNatsConnected(){
|
||||||
log.Debug("OnNatsConnected")
|
|
||||||
masterNodes := GetCluster().GetOriginDiscovery().MasterNodeList
|
masterNodes := GetCluster().GetOriginDiscovery().MasterNodeList
|
||||||
for i:=0;i<len(masterNodes);i++ {
|
for i:=0;i<len(masterNodes);i++ {
|
||||||
dc.regServiceDiscover(masterNodes[i].NodeId)
|
dc.regServiceDiscover(masterNodes[i].NodeId)
|
||||||
@@ -643,5 +636,4 @@ func (dc *OriginDiscoveryClient) OnNatsConnected(){
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (dc *OriginDiscoveryClient) OnNatsDisconnect(){
|
func (dc *OriginDiscoveryClient) OnNatsDisconnect(){
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -18,31 +18,29 @@ type NatsServer struct {
|
|||||||
notifyEventFun NotifyEventFun
|
notifyEventFun NotifyEventFun
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const reconnectWait = 3*time.Second
|
||||||
func (ns *NatsServer) Start() error{
|
func (ns *NatsServer) Start() error{
|
||||||
var err error
|
var err error
|
||||||
var options []nats.Option
|
var options []nats.Option
|
||||||
|
|
||||||
options = append(options,nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
options = append(options,nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
||||||
log.Error("nats is disconnect",log.String("connUrl",nc.ConnectedUrl()))
|
log.Error("nats is disconnected",log.String("connUrl",nc.ConnectedUrl()))
|
||||||
ns.notifyEventFun(&NatsConnEvent{IsConnect:false})
|
ns.notifyEventFun(&NatsConnEvent{IsConnect:false})
|
||||||
// handle disconnect error event
|
|
||||||
}))
|
}))
|
||||||
|
|
||||||
options = append(options,nats.ConnectHandler(func(nc *nats.Conn) {
|
options = append(options,nats.ConnectHandler(func(nc *nats.Conn) {
|
||||||
|
log.Info("nats is connected",log.String("connUrl",nc.ConnectedUrl()))
|
||||||
ns.notifyEventFun(&NatsConnEvent{IsConnect:true})
|
ns.notifyEventFun(&NatsConnEvent{IsConnect:true})
|
||||||
//log.Error("nats is connect",log.String("connUrl",nc.ConnectedUrl()))
|
//log.Error("nats is connect",log.String("connUrl",nc.ConnectedUrl()))
|
||||||
// handle disconnect error event
|
|
||||||
}))
|
}))
|
||||||
|
|
||||||
options = append(options,nats.ReconnectHandler(func(nc *nats.Conn) {
|
options = append(options,nats.ReconnectHandler(func(nc *nats.Conn) {
|
||||||
ns.notifyEventFun(&NatsConnEvent{IsConnect:true})
|
ns.notifyEventFun(&NatsConnEvent{IsConnect:true})
|
||||||
log.Error("nats is reconnection",log.String("connUrl",nc.ConnectedUrl()))
|
log.Info("nats is reconnected",log.String("connUrl",nc.ConnectedUrl()))
|
||||||
// handle reconnect event
|
|
||||||
}))
|
}))
|
||||||
|
|
||||||
options = append(options,nats.MaxReconnects(-1))
|
options = append(options,nats.MaxReconnects(-1))
|
||||||
|
options = append(options,nats.ReconnectWait(reconnectWait))
|
||||||
options = append(options,nats.ReconnectWait(time.Second*3))
|
|
||||||
|
|
||||||
if ns.NoRandomize {
|
if ns.NoRandomize {
|
||||||
options = append(options,nats.DontRandomize())
|
options = append(options,nats.DontRandomize())
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ type INatsConnListener interface {
|
|||||||
|
|
||||||
type IDiscoveryServiceListener interface {
|
type IDiscoveryServiceListener interface {
|
||||||
OnDiscoveryService(nodeId string, serviceName []string)
|
OnDiscoveryService(nodeId string, serviceName []string)
|
||||||
OnUnDiscoveryService(nodeId string)
|
OnUnDiscoveryService(nodeId string, serviceName []string)
|
||||||
}
|
}
|
||||||
|
|
||||||
type CancelRpc func()
|
type CancelRpc func()
|
||||||
|
|||||||
@@ -339,7 +339,7 @@ func (s *Service) OnDiscoverServiceEvent(ev event.IEvent){
|
|||||||
if event.IsDiscovery {
|
if event.IsDiscovery {
|
||||||
s.discoveryServiceLister.OnDiscoveryService(event.NodeId,event.ServiceName)
|
s.discoveryServiceLister.OnDiscoveryService(event.NodeId,event.ServiceName)
|
||||||
}else{
|
}else{
|
||||||
s.discoveryServiceLister.OnUnDiscoveryService(event.NodeId)
|
s.discoveryServiceLister.OnUnDiscoveryService(event.NodeId,event.ServiceName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user