diff --git a/cluster/cluster.go b/cluster/cluster.go index 6c5da83..cb5feb1 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -108,7 +108,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo (nodeInfo *NodeInfo){ rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = *nodeInfo rpcInfo.client = &rpc.Client{} - rpcInfo.client.Connect(nodeInfo.ListenAddr) + rpcInfo.client.Connect(nodeInfo.NodeId,nodeInfo.ListenAddr) cls.mapRpc[nodeInfo.NodeId] = rpcInfo } @@ -116,7 +116,7 @@ func (cls *Cluster) buildLocalRpc(){ rpcInfo := NodeRpcInfo{} rpcInfo.nodeInfo = cls.localNodeInfo rpcInfo.client = &rpc.Client{} - rpcInfo.client.Connect("") + rpcInfo.client.Connect(rpcInfo.nodeInfo.NodeId,"") cls.mapRpc[cls.localNodeInfo.NodeId] = rpcInfo } diff --git a/rpc/client.go b/rpc/client.go index 361d5aa..577b391 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -15,6 +15,7 @@ import ( ) type Client struct { + id int bSelfNode bool network.TCPClient conn *network.TCPConn @@ -34,7 +35,8 @@ func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent { return client } -func (client *Client) Connect(addr string) error { +func (client *Client) Connect(id int,addr string) error { + client.id = id client.Addr = addr client.maxCheckCallRpcCount = 1000 client.callRpcTimeout = 15*time.Second @@ -316,4 +318,8 @@ func (client *Client) OnClose(){ func (client *Client) IsConnected() bool { return client.conn!=nil && client.conn.IsConnected()==true -} \ No newline at end of file +} + +func (client *Client) GetId() int{ + return client.id +} diff --git a/sysservice/tcpgateway/ILoadBalance.go b/sysservice/tcpgateway/ILoadBalance.go index 1bea6be..b3479c9 100644 --- a/sysservice/tcpgateway/ILoadBalance.go +++ b/sysservice/tcpgateway/ILoadBalance.go @@ -1,5 +1,5 @@ package tcpgateway type ILoadBalance interface { - SelectNode(serviceName string) int //选择一个结点,通过服务名称 + SelectNode(serviceName string,clientId uint64,eventType string,msgType uint16,msg []byte) int //选择一个结点,通过服务名称 } diff --git a/sysservice/tcpgateway/LoadBalance.go b/sysservice/tcpgateway/LoadBalance.go index 983a45a..a8ca2b2 100644 --- a/sysservice/tcpgateway/LoadBalance.go +++ b/sysservice/tcpgateway/LoadBalance.go @@ -3,6 +3,6 @@ package tcpgateway type LoadBalance struct { } -func (balance *LoadBalance) SelectNode(serviceName string) int { +func (balance *LoadBalance) SelectNode(serviceName string,clientId uint64,eventType string,msgType uint16,msg []byte) int { return 1 } diff --git a/sysservice/tcpgateway/Router.go b/sysservice/tcpgateway/Router.go index 24a2a21..2b8ccef 100644 --- a/sysservice/tcpgateway/Router.go +++ b/sysservice/tcpgateway/Router.go @@ -211,6 +211,7 @@ func (args RawInputArgs) DoGc() { } network.ReleaseByteSlice(args.rawData) } +var msgEventType string func (r *Router) RouterMessage(cliId uint64,msgType uint16,msg []byte) { routerInfo:= r.GetMsgRouterService(msgType) @@ -221,7 +222,7 @@ func (r *Router) RouterMessage(cliId uint64,msgType uint16,msg []byte) { routerId := r.GetRouterId(cliId,&routerInfo.ServiceName) if routerId ==0 { - routerId = r.loadBalance.SelectNode(routerInfo.ServiceName) + routerId = r.loadBalance.SelectNode(routerInfo.ServiceName,cliId,msgEventType,msgType,msg) r.SetRouterId(cliId,routerInfo.ServiceName,routerId) } @@ -242,7 +243,7 @@ func (r *Router) RouterEvent(clientId uint64,eventType string) bool{ routerId := r.GetRouterId(clientId,&routerInfo.ServiceName) if routerId ==0 { - routerId = r.loadBalance.SelectNode(routerInfo.ServiceName) + routerId = r.loadBalance.SelectNode(routerInfo.ServiceName,clientId,eventType,0,nil) r.SetRouterId(clientId,routerInfo.ServiceName,routerId) } diff --git a/sysservice/tcpgateway/TcpGateService.go b/sysservice/tcpgateway/TcpGateService.go index 6ba864a..551148c 100644 --- a/sysservice/tcpgateway/TcpGateService.go +++ b/sysservice/tcpgateway/TcpGateService.go @@ -8,11 +8,6 @@ import ( "github.com/duanhf2012/origin/sysservice/tcpservice" ) -func init(){ - node.Setup(&tcpservice.TcpService{}) - node.Setup(&TcpGateService{}) -} - type MsgTypeRouterInfo struct { router IRouter serviceName string