diff --git a/Test/config/cluster.json b/Test/config/cluster.json index cce0306..329fab7 100644 --- a/Test/config/cluster.json +++ b/Test/config/cluster.json @@ -10,7 +10,8 @@ "ServiceList": [ "HttpServerService", "SubNet1_Service", - "TcpSocketPbService" + "TcpSocketPbService", + "ls" ], "ClusterNode":["SubNet2.N_Node1","N_Node2"] }, diff --git a/Test/logicservice/SubNet1_Service.go b/Test/logicservice/SubNet1_Service.go index 465395b..ba7e451 100644 --- a/Test/logicservice/SubNet1_Service.go +++ b/Test/logicservice/SubNet1_Service.go @@ -22,10 +22,17 @@ func init() { //OnInit ... func (ws *SubNet1_Service) OnInit() error { - sysservice.DefaultTSPbService().RegConnectEvent(ws.ConnEventHandler) - sysservice.DefaultTSPbService().RegDisconnectEvent(ws.DisconnEventHandler) - sysservice.DefaultTSPbService().RegExceptMessage(ws.ExceptMessage) - sysservice.DefaultTSPbService().RegMessage(110, &msgpb.Test{}, ws.MessageHandler) + sysservice.GetTcpSocketPbService("ls").RegConnectEvent(ws.ConnEventHandler) + sysservice.GetTcpSocketPbService("ls").RegDisconnectEvent(ws.DisconnEventHandler) + sysservice.GetTcpSocketPbService("ls").RegExceptMessage(ws.ExceptMessage) + sysservice.GetTcpSocketPbService("ls").RegMessage(110, &msgpb.Test{}, ws.MessageHandler) + + /* + sysservice.GetTcpSocketPbService("lc").RegConnectEvent(ws.ConnEventHandler2) + sysservice.GetTcpSocketPbService("lc").RegDisconnectEvent(ws.DisconnEventHandler2) + sysservice.GetTcpSocketPbService("lc").RegExceptMessage(ws.ExceptMessage2) + sysservice.GetTcpSocketPbService("lc").RegMessage(110, &msgpb.Test{}, ws.MessageHandler2) +*/ return nil } @@ -33,6 +40,7 @@ func (ws *SubNet1_Service) OnInit() error { //OnRun ... func (ws *SubNet1_Service) OnRun() bool { + time.Sleep(time.Second * 10) var cli network.TcpSocketClient cli.Connect("127.0.0.1:9004") @@ -47,10 +55,15 @@ func (ws *SubNet1_Service) OnRun() bool { func (ws *SubNet1_Service) MessageHandler(pClient *network.SClient, msgtype uint16, msg proto.Message) { fmt.Print("recv:",pClient.GetId(), ":", msg,"\n") pClient.SendMsg(msgtype,msg) + + var a map[int]int + a[33] = 3 + fmt.Print(a[44]) } func (ws *SubNet1_Service) ConnEventHandler(pClient *network.SClient) { fmt.Print("connected..",pClient.GetId(),"\n") + } func (ws *SubNet1_Service) DisconnEventHandler(pClient *network.SClient) { @@ -60,3 +73,24 @@ func (ws *SubNet1_Service) DisconnEventHandler(pClient *network.SClient) { func (ws *SubNet1_Service) ExceptMessage(pClient *network.SClient, pPack *network.MsgBasePack, err error) { fmt.Print("except..",pClient.GetId(),",",pPack,"\n") } + + + +/////////////////////////// + +func (ws *SubNet1_Service) MessageHandler2(pClient *network.SClient, msgtype uint16, msg proto.Message) { + fmt.Print("recv:",pClient.GetId(), ":", msg,"\n") + pClient.SendMsg(msgtype,msg) +} + +func (ws *SubNet1_Service) ConnEventHandler2(pClient *network.SClient) { + fmt.Print("connected..",pClient.GetId(),"\n") +} + +func (ws *SubNet1_Service) DisconnEventHandler2(pClient *network.SClient) { + fmt.Print("disconnected..",pClient.GetId(),"\n") +} + +func (ws *SubNet1_Service) ExceptMessage2(pClient *network.SClient, pPack *network.MsgBasePack, err error) { + fmt.Print("except..",pClient.GetId(),",",pPack,"\n") +} \ No newline at end of file diff --git a/Test/main.go b/Test/main.go index e56e426..2980b47 100644 --- a/Test/main.go +++ b/Test/main.go @@ -38,9 +38,14 @@ func main() { } pTcpService := sysservice.NewTcpSocketPbService(":9004") + pTcpService.SetServiceName("ls") + + pTcpService2 := sysservice.NewTcpSocketPbService(":9005") + pTcpService2.SetServiceName("lc") + httpserver.SetPrintRequestTime(true) - node.SetupService(httpserver,pTcpService) + node.SetupService(httpserver,pTcpService,pTcpService2) node.Init() node.Start() } diff --git a/network/tcpsocketserver.go b/network/tcpsocketserver.go index f1357a0..d7f707b 100644 --- a/network/tcpsocketserver.go +++ b/network/tcpsocketserver.go @@ -3,8 +3,9 @@ package network import ( "bufio" "encoding/binary" - "github.com/duanhf2012/origin/util" + "fmt" "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/util" "github.com/golang/protobuf/proto" "io" "net" @@ -95,13 +96,13 @@ func (slf *TcpSocketServer) listenServer(){ sc :=&SClient{id:clientId,conn:conn,tcpserver:slf,remoteip:conn.RemoteAddr().String(),starttime:time.Now().UnixNano(), recvPack:util.NewSyncQueue(),sendPack:util.NewSyncQueue()} - slf.iReciver.OnConnected(sc) + + slf.mapClient.Set(clientId,sc) util.Go(sc.listendata) //收来自客户端数据 util.Go(sc.onrecv) //发送数据队列 util.Go(sc.onsend) - slf.mapClient.Set(clientId,sc) break } @@ -111,12 +112,13 @@ func (slf *TcpSocketServer) listenServer(){ func (slf *SClient) listendata(){ defer func() { - slf.tcpserver.iReciver.OnDisconnect(slf) - slf.bClose = true - slf.conn.Close() + slf.Close() slf.tcpserver.mapClient.Del(slf.id) + slf.tcpserver.iReciver.OnDisconnect(slf) + service.GetLogger().Printf(service.LEVER_DEBUG, "clent id %d return listendata...",slf.id) }() + slf.tcpserver.iReciver.OnConnected(slf) //获取一个连接的reader读取流 reader := bufio.NewReader(slf.conn) @@ -157,7 +159,6 @@ func (slf *SClient) listendata(){ buff = append(buff[fillsize:]) buffDataSize -= fillsize } - } } @@ -213,6 +214,10 @@ func (slf *SClient) Send(pack *MsgBasePack){ func (slf *SClient) SendMsg(packtype uint16,message proto.Message) error{ + if slf.bClose == true { + return fmt.Errorf("client id %d is close!",slf.id) + } + var msg MsgBasePack data,err := proto.Marshal(message) if err != nil { @@ -226,20 +231,39 @@ func (slf *SClient) SendMsg(packtype uint16,message proto.Message) error{ } func (slf *SClient) onsend(){ + defer func() { + slf.Close() + service.GetLogger().Printf(service.LEVER_DEBUG, "clent id %d return onsend...",slf.id) + }() + for { - pack := slf.sendPack.Pop() - if pack == nil { + pack,ok := slf.sendPack.TryPop() + if slf.bClose == true { + break + } + if ok == false || pack == nil { + time.Sleep(time.Millisecond*1) continue } + pPackData := pack.(*MsgBasePack) slf.conn.Write(pPackData.Bytes()) } } func (slf *SClient) onrecv(){ + defer func() { + slf.Close() + service.GetLogger().Printf(service.LEVER_DEBUG, "clent id %d return onrecv...",slf.id) + }() + for { - pack := slf.recvPack.Pop() - if pack == nil { + pack,ok := slf.recvPack.TryPop() + if slf.bClose == true { + break + } + if ok == false || pack == nil { + time.Sleep(time.Millisecond*1) continue } @@ -248,9 +272,14 @@ func (slf *SClient) onrecv(){ } } + func (slf *SClient) Close(){ if slf.bClose == false { slf.conn.Close() + slf.bClose = true + + slf.recvPack.Close() + slf.sendPack.Close() } } diff --git a/sysservice/tcpsocketpbservice.go b/sysservice/tcpsocketpbservice.go index 75dadbe..f926384 100644 --- a/sysservice/tcpsocketpbservice.go +++ b/sysservice/tcpsocketpbservice.go @@ -142,3 +142,11 @@ func DefaultTSPbService() *TcpSocketPbService{ return iservice.(*TcpSocketPbService) } +func GetTcpSocketPbService(serviceName string) *TcpSocketPbService{ + iservice := service.InstanceServiceMgr().FindService(serviceName) + if iservice == nil { + return nil + } + + return iservice.(*TcpSocketPbService) +} \ No newline at end of file