diff --git a/cluster/cluster.go b/cluster/cluster.go index d5c28f6..7a91bae 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -175,6 +175,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ } cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo + log.Debug("Discovery nodeId: %d discovery service:%+v",nodeInfo.NodeId,nodeInfo.PublicServiceList) //已经存在连接,则不需要进行设置 if _,rpcInfoOK := cls.mapRpc[nodeInfo.NodeId];rpcInfoOK == true { return @@ -186,9 +187,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ rpcInfo.client.Connect(nodeInfo.NodeId,nodeInfo.ListenAddr) cls.mapRpc[nodeInfo.NodeId] = rpcInfo - //debug - fmt.Printf("xxxxxxxxxxxx") - fmt.Println(nodeInfo) + } func (cls *Cluster) buildLocalRpc(){ diff --git a/event/event.go b/event/event.go index b311e93..2a87fc0 100644 --- a/event/event.go +++ b/event/event.go @@ -233,7 +233,7 @@ func (processor *EventProcessor) EventHandler(ev IEvent) { func (processor *EventProcessor) pushEvent(event IEvent){ if len(processor.eventChannel)>=cap(processor.eventChannel){ - log.Error("event process channel is full.") + log.Error("event process channel is full,data:%+v!",event) return } diff --git a/node/node.go b/node/node.go index 35374a9..f585127 100644 --- a/node/node.go +++ b/node/node.go @@ -232,12 +232,12 @@ func startNode(args interface{}) error{ //2.初始化node initNode(nodeId) - //3.运行集群 - cluster.GetCluster().Start() - - //4.运行service + //3.运行service service.Start() + //4.运行集群 + cluster.GetCluster().Start() + //5.记录进程id号 writeProcessPid(nodeId) diff --git a/rpc/server.go b/rpc/server.go index 133fe67..cb0904b 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -112,7 +112,7 @@ func (agent *RpcAgent) Run() { for { data,err := agent.conn.ReadMsg() if err != nil { - log.Error("read message: %v", err) + log.Error("read message: %v,remoteAddress:%s", err,agent.conn.RemoteAddr().String()) //will close tcpconn break } diff --git a/service/service.go b/service/service.go index 7b3eabc..6ccab8d 100644 --- a/service/service.go +++ b/service/service.go @@ -98,6 +98,8 @@ func (s *Service) SetGoRoutineNum(goroutineNum int32) bool { func (s *Service) Start() { s.startStatus = true + s.eventProcessor.SetEventChannel(0) + for i:=int32(0);i< s.goroutineNum;i++{ s.wg.Add(1) go func(){