新增网络服务

This commit is contained in:
boyce
2020-02-04 16:49:03 +08:00
parent f026c0ca9e
commit 29b04d82c3
5 changed files with 94 additions and 17 deletions

View File

@@ -3,8 +3,9 @@ package network
import (
"bufio"
"encoding/binary"
"github.com/duanhf2012/origin/util"
"fmt"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/util"
"github.com/golang/protobuf/proto"
"io"
"net"
@@ -95,13 +96,13 @@ func (slf *TcpSocketServer) listenServer(){
sc :=&SClient{id:clientId,conn:conn,tcpserver:slf,remoteip:conn.RemoteAddr().String(),starttime:time.Now().UnixNano(),
recvPack:util.NewSyncQueue(),sendPack:util.NewSyncQueue()}
slf.iReciver.OnConnected(sc)
slf.mapClient.Set(clientId,sc)
util.Go(sc.listendata)
//收来自客户端数据
util.Go(sc.onrecv)
//发送数据队列
util.Go(sc.onsend)
slf.mapClient.Set(clientId,sc)
break
}
@@ -111,12 +112,13 @@ func (slf *TcpSocketServer) listenServer(){
func (slf *SClient) listendata(){
defer func() {
slf.tcpserver.iReciver.OnDisconnect(slf)
slf.bClose = true
slf.conn.Close()
slf.Close()
slf.tcpserver.mapClient.Del(slf.id)
slf.tcpserver.iReciver.OnDisconnect(slf)
service.GetLogger().Printf(service.LEVER_DEBUG, "clent id %d return listendata...",slf.id)
}()
slf.tcpserver.iReciver.OnConnected(slf)
//获取一个连接的reader读取流
reader := bufio.NewReader(slf.conn)
@@ -157,7 +159,6 @@ func (slf *SClient) listendata(){
buff = append(buff[fillsize:])
buffDataSize -= fillsize
}
}
}
@@ -213,6 +214,10 @@ func (slf *SClient) Send(pack *MsgBasePack){
func (slf *SClient) SendMsg(packtype uint16,message proto.Message) error{
if slf.bClose == true {
return fmt.Errorf("client id %d is close!",slf.id)
}
var msg MsgBasePack
data,err := proto.Marshal(message)
if err != nil {
@@ -226,20 +231,39 @@ func (slf *SClient) SendMsg(packtype uint16,message proto.Message) error{
}
func (slf *SClient) onsend(){
defer func() {
slf.Close()
service.GetLogger().Printf(service.LEVER_DEBUG, "clent id %d return onsend...",slf.id)
}()
for {
pack := slf.sendPack.Pop()
if pack == nil {
pack,ok := slf.sendPack.TryPop()
if slf.bClose == true {
break
}
if ok == false || pack == nil {
time.Sleep(time.Millisecond*1)
continue
}
pPackData := pack.(*MsgBasePack)
slf.conn.Write(pPackData.Bytes())
}
}
func (slf *SClient) onrecv(){
defer func() {
slf.Close()
service.GetLogger().Printf(service.LEVER_DEBUG, "clent id %d return onrecv...",slf.id)
}()
for {
pack := slf.recvPack.Pop()
if pack == nil {
pack,ok := slf.recvPack.TryPop()
if slf.bClose == true {
break
}
if ok == false || pack == nil {
time.Sleep(time.Millisecond*1)
continue
}
@@ -248,9 +272,14 @@ func (slf *SClient) onrecv(){
}
}
func (slf *SClient) Close(){
if slf.bClose == false {
slf.conn.Close()
slf.bClose = true
slf.recvPack.Close()
slf.sendPack.Close()
}
}