From c3cb6ff977517f07fd0a2a6fece3cee65ea83f9d Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Mon, 20 Jan 2020 13:26:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Etcpsocket=E5=8D=8F=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Test/config/cluster.json | 3 +- Test/main.go | 23 +++- network/tcpsocketserver.go | 203 +++++++++++++++++++++++++++++++ sysservice/tcpsocketpbservice.go | 34 ++++++ 4 files changed, 261 insertions(+), 2 deletions(-) create mode 100644 network/tcpsocketserver.go create mode 100644 sysservice/tcpsocketpbservice.go diff --git a/Test/config/cluster.json b/Test/config/cluster.json index 9ccedf6..4dc1a88 100644 --- a/Test/config/cluster.json +++ b/Test/config/cluster.json @@ -10,7 +10,8 @@ "ServiceList": [ "HttpServerService", "SubNet1_Service", - "SubNet1_Service1" + "SubNet1_Service1", + "TcpSocketPbService" ], "ClusterNode":["SubNet2.N_Node1","N_Node2"] }, diff --git a/Test/main.go b/Test/main.go index 13eb261..e362efd 100644 --- a/Test/main.go +++ b/Test/main.go @@ -3,9 +3,28 @@ package main import ( "github.com/duanhf2012/origin/cluster" "github.com/duanhf2012/origin/originnode" + "github.com/duanhf2012/origin/sysservice" "github.com/duanhf2012/origin/sysservice/originhttp" + "github.com/duanhf2012/origin/network" ) + +type TcpSocketServerReciver struct { + +} + +func (slf *TcpSocketServerReciver) OnConnected(pClient *network.SClient){ + +} + +func (slf *TcpSocketServerReciver) OnDisconnect(pClient *network.SClient){ + +} + +func (slf *TcpSocketServerReciver) OnRecvMsg(pClient *network.SClient, pPack *network.MsgBasePack){ + +} + func main() { node := originnode.NewOriginNode() @@ -18,9 +37,11 @@ func main() { for _, ca := range nodeCfg.CAFile { httpserver.SetHttps(ca.CertFile, ca.KeyFile) } + + pTcpService := sysservice.NewTcpSocketPbService(":9004",&TcpSocketServerReciver{}) httpserver.SetPrintRequestTime(true) - node.SetupService(httpserver) + node.SetupService(httpserver,pTcpService) node.Init() node.Start() } diff --git a/network/tcpsocketserver.go b/network/tcpsocketserver.go new file mode 100644 index 0000000..45e7731 --- /dev/null +++ b/network/tcpsocketserver.go @@ -0,0 +1,203 @@ +package network + +import ( + "bufio" + "encoding/binary" + "github.com/duanhf2012/origin/util" + "github.com/duanhf2012/origin/service" + "io" + "net" + + "os" + "time" +) + +type ITcpSocketServerReciver interface { + OnConnected(pClient *SClient) + OnDisconnect(pClient *SClient) + OnRecvMsg(pClient *SClient, pPack *MsgBasePack) +} + + +type SClient struct { + id uint64 + conn net.Conn + + recvPack util.SyncQueue + sendPack util.SyncQueue + tcpserver *TcpSocketServer + remoteip string + starttime int64 +} + +type TcpSocketServer struct { + listenAddr string //ip:port + mapClient util.Map + + MaxRecvPackSize uint16 + MaxSendPackSize uint16 + iReciver ITcpSocketServerReciver +} + +type MsgBasePack struct { + packsize uint16 + packtype uint16 + body []byte + StartTime time.Time +} + +func (slf *TcpSocketServer) Register(listenAddr string,iReciver ITcpSocketServerReciver){ + slf.listenAddr = listenAddr + slf.iReciver = iReciver +} + + +func (slf *TcpSocketServer) Start(){ + slf.MaxRecvPackSize = 2048 + slf.MaxSendPackSize = 40960 + + util.Go(slf.listenServer) +} + +func (slf *TcpSocketServer) listenServer(){ + listener, err := net.Listen("tcp", slf.listenAddr) + if err != nil { + service.GetLogger().Printf(service.LEVER_FATAL, "TcpSocketServer Listen error %+v",err) + os.Exit(1) + } + + var clientId uint64 + for { + conn, aerr := listener.Accept() + if aerr != nil { + service.GetLogger().Printf(service.LEVER_FATAL, "TcpSocketServer accept error %+v",aerr) + continue + } + + for { + clientId += 1 + if slf.mapClient.Get(clientId)!= nil { + continue + } + + sc :=&SClient{id:clientId,conn:conn,tcpserver:slf,remoteip:conn.RemoteAddr().String(),starttime:time.Now().UnixNano()} + slf.iReciver.OnConnected(sc) + util.Go(sc.listendata) + //收来自客户端数据 + util.Go(sc.onrecv) + //发送数据队列 + util.Go(sc.onsend) + slf.mapClient.Set(clientId,sc) + + break + } + } +} + + +func (slf *SClient) listendata(){ + defer func() { + slf.tcpserver.iReciver.OnDisconnect(slf) + slf.conn.Close() + slf.tcpserver.mapClient.Del(slf.id) + }() + + //获取一个连接的reader读取流 + reader := bufio.NewReader(slf.conn) + + //临时接受数据的buff + var buff []byte //tmprecvbuf + var tmpbuff []byte + var buffDataSize uint16 + tmpbuff = make([]byte,2048) + + //解析包数据 + var pack MsgBasePack + for { + n,err := reader.Read(tmpbuff) + if err != nil || err == io.EOF { + service.GetLogger().Printf(service.LEVER_INFO, "clent id %d is disconnect %+v",slf.id,err) + return + } + buff = append(buff,tmpbuff[:n]...) + buffDataSize += uint16(n) + if buffDataSize> slf.tcpserver.MaxRecvPackSize { + service.GetLogger().Print(service.LEVER_WARN,"recv client id %d data size %d is over %d",slf.id,buffDataSize,slf.tcpserver.MaxRecvPackSize) + return + } + + fillsize,bfillRet := pack.FillData(buff,buffDataSize) + if bfillRet == true { + slf.recvPack.Push(pack) + pack = MsgBasePack{} + } + buff = append(buff[fillsize:]) + buffDataSize -= fillsize + } +} + + +func (slf *MsgBasePack) Bytes() (bRet []byte){ + binary.BigEndian.PutUint16(bRet,slf.packsize) + binary.BigEndian.PutUint16(bRet,slf.packtype) + bRet = append(bRet,slf.body...) + + return +} + +//返回值:填充多少字节,是否完成 +func (slf *MsgBasePack) FillData(bdata []byte,datasize uint16) (uint16,bool) { + var fillsize uint16 + //解包头 + if slf.packsize ==0 { + if datasize < 4 { + return 0,false + } + + slf.packsize= binary.BigEndian.Uint16(bdata[:2]) + slf.packtype= binary.BigEndian.Uint16(bdata[2:2]) + fillsize += 4 + } + + //解包体 + if slf.packsize>0 && slf.packsize>=datasize { + slf.body = append(slf.body, bdata[fillsize:slf.packsize]...) + fillsize += (datasize - fillsize) + return fillsize,true + } + + return fillsize,false +} + +func (slf *MsgBasePack) Clear() { +} + +func (slf *SClient) Send(pack *MsgBasePack){ + slf.sendPack.Push(pack) +} + +func (slf *SClient) onsend(){ + for { + pack := slf.sendPack.Pop() + if pack == nil { + continue + } + pPackData := pack.(*MsgBasePack) + slf.conn.Write(pPackData.Bytes()) + } +} + +func (slf *SClient) onrecv(){ + for { + pack := slf.recvPack.Pop() + if pack == nil { + continue + } + + pMsg := pack.(MsgBasePack) + slf.tcpserver.iReciver.OnRecvMsg(slf,&pMsg) + } +} + + + diff --git a/sysservice/tcpsocketpbservice.go b/sysservice/tcpsocketpbservice.go new file mode 100644 index 0000000..3a4a6fd --- /dev/null +++ b/sysservice/tcpsocketpbservice.go @@ -0,0 +1,34 @@ +package sysservice + +import ( + "github.com/duanhf2012/origin/network" + "github.com/duanhf2012/origin/service" +) + +type TcpSocketPbService struct { + service.BaseService + listenaddr string + tcpsocketserver network.TcpSocketServer + reciver network.ITcpSocketServerReciver +} + + +func NewTcpSocketPbService(listenaddr string,reciver network.ITcpSocketServerReciver) *TcpSocketPbService { + ts := new(TcpSocketPbService) + + ts.listenaddr = listenaddr + ts.reciver = reciver + + ts.tcpsocketserver.Register(listenaddr,reciver) + return ts +} + +func (slf *TcpSocketPbService) OnInit() error { + return nil +} + +func (slf *TcpSocketPbService) OnRun() bool { + slf.tcpsocketserver.Start() + return false +} +