From 263aeb232cbd3163037a182361adab8a9749243a Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Tue, 21 Jan 2020 10:47:52 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Etcp=20protobuf=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Test/SubNet1_Service.go | 34 ------- Test/logicservice/SubNet1_Service.go | 64 +++++++++++++ Test/main.go | 7 +- Test/msgpb/gen.bat | 1 + Test/msgpb/test.pb.go | 132 +++++++++++++++++++++++++++ Test/msgpb/test.proto | 18 ++++ network/tcpsocketclient.go | 42 +++++++++ network/tcpsocketserver.go | 41 +++++++-- sysservice/tcpsocketpbservice.go | 114 ++++++++++++++++++++++- 9 files changed, 402 insertions(+), 51 deletions(-) delete mode 100644 Test/SubNet1_Service.go create mode 100644 Test/logicservice/SubNet1_Service.go create mode 100644 Test/msgpb/gen.bat create mode 100644 Test/msgpb/test.pb.go create mode 100644 Test/msgpb/test.proto create mode 100644 network/tcpsocketclient.go diff --git a/Test/SubNet1_Service.go b/Test/SubNet1_Service.go deleted file mode 100644 index 11f6a74..0000000 --- a/Test/SubNet1_Service.go +++ /dev/null @@ -1,34 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/duanhf2012/origin/cluster" - "github.com/duanhf2012/origin/originnode" - "github.com/duanhf2012/origin/service" -) - -type SubNet1_Service struct { - service.BaseService -} - -func init() { - originnode.InitService(&SubNet1_Service{}) -} - -//OnInit ... -func (ws *SubNet1_Service) OnInit() error { - - return nil -} - -//OnRun ... -func (ws *SubNet1_Service) OnRun() bool { - var in InputData - var ret int - in.A1 = 10 - in.A2 = 20 - err := cluster.Call("SubNet2_Service1.RPC_Multi", &in, &ret) - fmt.Printf("%+v", err) - return false -} diff --git a/Test/logicservice/SubNet1_Service.go b/Test/logicservice/SubNet1_Service.go new file mode 100644 index 0000000..f54dbe7 --- /dev/null +++ b/Test/logicservice/SubNet1_Service.go @@ -0,0 +1,64 @@ +package logicservice + +import ( + + "github.com/duanhf2012/origin/Test/msgpb" + "github.com/duanhf2012/origin/network" + "github.com/duanhf2012/origin/sysservice" + "github.com/golang/protobuf/proto" + "time" + + "github.com/duanhf2012/origin/originnode" + "github.com/duanhf2012/origin/service" + +) + +type SubNet1_Service struct { + service.BaseService +} + +func init() { + originnode.InitService(&SubNet1_Service{}) +} + +//OnInit ... +func (ws *SubNet1_Service) OnInit() error { + sysservice.DefaultTSPbService().RegConnectEvent(ws.ConnEventHandler) + sysservice.DefaultTSPbService().RegDisconnectEvent(ws.DisconnEventHandler) + sysservice.DefaultTSPbService().RegExceptMessage(ws.ExceptMessage) + sysservice.DefaultTSPbService().RegMessage(110,&msgpb.Test{},ws.MessageHandler) + + return nil +} + + +//OnRun ... +func (ws *SubNet1_Service) OnRun() bool { + + + time.Sleep(time.Second*10) + var cli network.TcpSocketClient + cli.Connect("127.0.0.1:9004") + test := msgpb.Test{} + test.AssistCount = proto.Int32(343) + + + + cli.SendMsg(110,&test) + cli.SendMsg(110,&test) + return false +} + + + func (ws *SubNet1_Service) MessageHandler(pClient *network.SClient,msgtype uint16,msg proto.Message){ + } + +func (ws *SubNet1_Service) ConnEventHandler (pClient *network.SClient){ +} + +func (ws *SubNet1_Service) DisconnEventHandler (pClient *network.SClient){ +} + +func (ws *SubNet1_Service) ExceptMessage(pClient *network.SClient,pPack *network.MsgBasePack,err error){ +} + diff --git a/Test/main.go b/Test/main.go index e362efd..e56e426 100644 --- a/Test/main.go +++ b/Test/main.go @@ -1,11 +1,12 @@ package main import ( + _ "github.com/duanhf2012/origin/Test/logicservice" "github.com/duanhf2012/origin/cluster" + "github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/originnode" "github.com/duanhf2012/origin/sysservice" "github.com/duanhf2012/origin/sysservice/originhttp" - "github.com/duanhf2012/origin/network" ) @@ -21,9 +22,7 @@ func (slf *TcpSocketServerReciver) OnDisconnect(pClient *network.SClient){ } -func (slf *TcpSocketServerReciver) OnRecvMsg(pClient *network.SClient, pPack *network.MsgBasePack){ -} func main() { @@ -38,7 +37,7 @@ func main() { httpserver.SetHttps(ca.CertFile, ca.KeyFile) } - pTcpService := sysservice.NewTcpSocketPbService(":9004",&TcpSocketServerReciver{}) + pTcpService := sysservice.NewTcpSocketPbService(":9004") httpserver.SetPrintRequestTime(true) node.SetupService(httpserver,pTcpService) diff --git a/Test/msgpb/gen.bat b/Test/msgpb/gen.bat new file mode 100644 index 0000000..546eec0 --- /dev/null +++ b/Test/msgpb/gen.bat @@ -0,0 +1 @@ +protoc --go_out=. test.proto \ No newline at end of file diff --git a/Test/msgpb/test.pb.go b/Test/msgpb/test.pb.go new file mode 100644 index 0000000..2fb5f59 --- /dev/null +++ b/Test/msgpb/test.pb.go @@ -0,0 +1,132 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: test.proto + +package msgpb + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +//* +// @brief base_score_info +type Test struct { + WinCount *int32 `protobuf:"varint,1,opt,name=win_count,json=winCount" json:"win_count,omitempty"` + LoseCount *int32 `protobuf:"varint,2,opt,name=lose_count,json=loseCount" json:"lose_count,omitempty"` + ExceptionCount *int32 `protobuf:"varint,3,opt,name=exception_count,json=exceptionCount" json:"exception_count,omitempty"` + KillCount *int32 `protobuf:"varint,4,opt,name=kill_count,json=killCount" json:"kill_count,omitempty"` + DeathCount *int32 `protobuf:"varint,5,opt,name=death_count,json=deathCount" json:"death_count,omitempty"` + AssistCount *int32 `protobuf:"varint,6,opt,name=assist_count,json=assistCount" json:"assist_count,omitempty"` + Rating *int64 `protobuf:"varint,7,opt,name=rating" json:"rating,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Test) Reset() { *m = Test{} } +func (m *Test) String() string { return proto.CompactTextString(m) } +func (*Test) ProtoMessage() {} +func (*Test) Descriptor() ([]byte, []int) { + return fileDescriptor_c161fcfdc0c3ff1e, []int{0} +} + +func (m *Test) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Test.Unmarshal(m, b) +} +func (m *Test) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Test.Marshal(b, m, deterministic) +} +func (m *Test) XXX_Merge(src proto.Message) { + xxx_messageInfo_Test.Merge(m, src) +} +func (m *Test) XXX_Size() int { + return xxx_messageInfo_Test.Size(m) +} +func (m *Test) XXX_DiscardUnknown() { + xxx_messageInfo_Test.DiscardUnknown(m) +} + +var xxx_messageInfo_Test proto.InternalMessageInfo + +func (m *Test) GetWinCount() int32 { + if m != nil && m.WinCount != nil { + return *m.WinCount + } + return 0 +} + +func (m *Test) GetLoseCount() int32 { + if m != nil && m.LoseCount != nil { + return *m.LoseCount + } + return 0 +} + +func (m *Test) GetExceptionCount() int32 { + if m != nil && m.ExceptionCount != nil { + return *m.ExceptionCount + } + return 0 +} + +func (m *Test) GetKillCount() int32 { + if m != nil && m.KillCount != nil { + return *m.KillCount + } + return 0 +} + +func (m *Test) GetDeathCount() int32 { + if m != nil && m.DeathCount != nil { + return *m.DeathCount + } + return 0 +} + +func (m *Test) GetAssistCount() int32 { + if m != nil && m.AssistCount != nil { + return *m.AssistCount + } + return 0 +} + +func (m *Test) GetRating() int64 { + if m != nil && m.Rating != nil { + return *m.Rating + } + return 0 +} + +func init() { + proto.RegisterType((*Test)(nil), "msgpb.test") +} + +func init() { proto.RegisterFile("test.proto", fileDescriptor_c161fcfdc0c3ff1e) } + +var fileDescriptor_c161fcfdc0c3ff1e = []byte{ + // 178 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x3c, 0x8c, 0x5d, 0x0a, 0x82, 0x40, + 0x14, 0x46, 0x99, 0xfc, 0x29, 0xaf, 0x51, 0xe0, 0x43, 0x08, 0x11, 0x59, 0x2f, 0xf9, 0xd4, 0x26, + 0xda, 0x81, 0x1b, 0x08, 0xb3, 0xc1, 0x86, 0x6c, 0x46, 0xbc, 0x37, 0x6c, 0xc5, 0xad, 0x23, 0x66, + 0xee, 0xe4, 0xe3, 0x77, 0xce, 0xe1, 0x03, 0x20, 0x89, 0x74, 0xee, 0x07, 0x43, 0x26, 0x8b, 0x5e, + 0xd8, 0xf6, 0xb7, 0xe3, 0x57, 0x40, 0x68, 0x69, 0xb6, 0x85, 0x64, 0x54, 0xfa, 0xda, 0x98, 0xb7, + 0xa6, 0x5c, 0x14, 0xa2, 0x8c, 0xaa, 0xc5, 0xa8, 0xf4, 0xc5, 0xee, 0x6c, 0x07, 0xd0, 0x19, 0x94, + 0xde, 0xce, 0x9c, 0x4d, 0x2c, 0x61, 0x7d, 0x82, 0xb5, 0xfc, 0x34, 0xb2, 0x27, 0x65, 0xfe, 0x0f, + 0x81, 0x6b, 0x56, 0x13, 0x9e, 0x7e, 0x9e, 0xaa, 0xeb, 0x7c, 0x13, 0xf2, 0x8f, 0x25, 0xac, 0xf7, + 0x90, 0xde, 0x65, 0x4d, 0x0f, 0xef, 0x23, 0xe7, 0xc1, 0x21, 0x0e, 0x0e, 0xb0, 0xac, 0x11, 0x15, + 0x92, 0x2f, 0x62, 0x57, 0xa4, 0xcc, 0x38, 0xd9, 0x40, 0x3c, 0xd4, 0xa4, 0x74, 0x9b, 0xcf, 0x0b, + 0x51, 0x06, 0x95, 0x5f, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x46, 0xa0, 0x89, 0x59, 0xfc, 0x00, + 0x00, 0x00, +} diff --git a/Test/msgpb/test.proto b/Test/msgpb/test.proto new file mode 100644 index 0000000..001411d --- /dev/null +++ b/Test/msgpb/test.proto @@ -0,0 +1,18 @@ +syntax = "proto2"; + +package msgpb; + + +/** + * @brief base_score_info + */ +message test{ + optional int32 win_count = 1; // 玩家胜局局数 + optional int32 lose_count = 2; // 玩家负局局数 + optional int32 exception_count = 3; // 玩家异常局局数 + optional int32 kill_count = 4; // 总人头数 + optional int32 death_count = 5; // 总死亡数 + optional int32 assist_count = 6; // 总总助攻数 + optional int64 rating = 7; // 评价积分 +} + diff --git a/network/tcpsocketclient.go b/network/tcpsocketclient.go new file mode 100644 index 0000000..2b13bb5 --- /dev/null +++ b/network/tcpsocketclient.go @@ -0,0 +1,42 @@ +package network + +import ( + "fmt" + "github.com/golang/protobuf/proto" + "net" +) + +type TcpSocketClient struct { + conn net.Conn +} + +func (slf *TcpSocketClient) Connect(addr string) error{ + tcpAddr,terr := net.ResolveTCPAddr("tcp",addr) + if terr != nil { + return terr + } + + conn,err := net.DialTCP("tcp",nil,tcpAddr) + if err!=nil { + fmt.Println("Client connect error ! " + err.Error()) + return err + } + slf.conn = conn + + // + return nil +} + +func (slf *TcpSocketClient) SendMsg(packtype uint16,message proto.Message) error{ + var msg MsgBasePack + data,err := proto.Marshal(message) + if err != nil { + return err + } + + msg.Make(packtype,data) + + slf.conn.Write(msg.Bytes()) + + return nil +} diff --git a/network/tcpsocketserver.go b/network/tcpsocketserver.go index 45e7731..22235a3 100644 --- a/network/tcpsocketserver.go +++ b/network/tcpsocketserver.go @@ -7,6 +7,7 @@ import ( "github.com/duanhf2012/origin/service" "io" "net" + "unsafe" "os" "time" @@ -23,11 +24,12 @@ type SClient struct { id uint64 conn net.Conn - recvPack util.SyncQueue - sendPack util.SyncQueue + recvPack *util.SyncQueue + sendPack *util.SyncQueue tcpserver *TcpSocketServer remoteip string starttime int64 + bClose bool } type TcpSocketServer struct { @@ -46,6 +48,15 @@ type MsgBasePack struct { StartTime time.Time } +func (slf *MsgBasePack) PackType() uint16 { + return slf.packtype +} + +func (slf *MsgBasePack) Body() []byte{ + return slf.body +} + + func (slf *TcpSocketServer) Register(listenAddr string,iReciver ITcpSocketServerReciver){ slf.listenAddr = listenAddr slf.iReciver = iReciver @@ -80,7 +91,8 @@ func (slf *TcpSocketServer) listenServer(){ continue } - sc :=&SClient{id:clientId,conn:conn,tcpserver:slf,remoteip:conn.RemoteAddr().String(),starttime:time.Now().UnixNano()} + sc :=&SClient{id:clientId,conn:conn,tcpserver:slf,remoteip:conn.RemoteAddr().String(),starttime:time.Now().UnixNano(), + recvPack:util.NewSyncQueue(),sendPack:util.NewSyncQueue()} slf.iReciver.OnConnected(sc) util.Go(sc.listendata) //收来自客户端数据 @@ -98,6 +110,7 @@ func (slf *TcpSocketServer) listenServer(){ func (slf *SClient) listendata(){ defer func() { slf.tcpserver.iReciver.OnDisconnect(slf) + slf.bClose = true slf.conn.Close() slf.tcpserver.mapClient.Del(slf.id) }() @@ -137,12 +150,14 @@ func (slf *SClient) listendata(){ } -func (slf *MsgBasePack) Bytes() (bRet []byte){ +func (slf *MsgBasePack) Bytes() []byte{ + var bRet []byte + bRet = make([]byte,4) binary.BigEndian.PutUint16(bRet,slf.packsize) - binary.BigEndian.PutUint16(bRet,slf.packtype) + binary.BigEndian.PutUint16(bRet[2:],slf.packtype) bRet = append(bRet,slf.body...) - return + return bRet } //返回值:填充多少字节,是否完成 @@ -155,7 +170,7 @@ func (slf *MsgBasePack) FillData(bdata []byte,datasize uint16) (uint16,bool) { } slf.packsize= binary.BigEndian.Uint16(bdata[:2]) - slf.packtype= binary.BigEndian.Uint16(bdata[2:2]) + slf.packtype= binary.BigEndian.Uint16(bdata[2:4]) fillsize += 4 } @@ -172,6 +187,12 @@ func (slf *MsgBasePack) FillData(bdata []byte,datasize uint16) (uint16,bool) { func (slf *MsgBasePack) Clear() { } +func (slf *MsgBasePack) Make(packtype uint16,data []byte) { + slf.packtype = packtype + slf.body = data + slf.packsize = uint16(unsafe.Sizeof(slf.packtype)*2)+uint16(len(data)) +} + func (slf *SClient) Send(pack *MsgBasePack){ slf.sendPack.Push(pack) } @@ -199,5 +220,9 @@ func (slf *SClient) onrecv(){ } } - +func (slf *SClient) Close(){ + if slf.bClose == false { + slf.conn.Close() + } +} diff --git a/sysservice/tcpsocketpbservice.go b/sysservice/tcpsocketpbservice.go index 3a4a6fd..4229b95 100644 --- a/sysservice/tcpsocketpbservice.go +++ b/sysservice/tcpsocketpbservice.go @@ -1,25 +1,38 @@ package sysservice import ( + "errors" "github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/service" + "github.com/golang/protobuf/proto" + "reflect" ) type TcpSocketPbService struct { service.BaseService listenaddr string tcpsocketserver network.TcpSocketServer - reciver network.ITcpSocketServerReciver + mapMsg map[uint16]MessageInfo + + connEvent EventHandler + disconnEvent EventHandler + + exceptMsgHandler ExceptMsgHandler } -func NewTcpSocketPbService(listenaddr string,reciver network.ITcpSocketServerReciver) *TcpSocketPbService { +type MessageHandler func(pClient *network.SClient,msgtype uint16,msg proto.Message) +type EventHandler func(pClient *network.SClient) +type ExceptMsgHandler func(pClient *network.SClient,pPack *network.MsgBasePack,err error) + + + +func NewTcpSocketPbService(listenaddr string) *TcpSocketPbService { ts := new(TcpSocketPbService) ts.listenaddr = listenaddr - ts.reciver = reciver - - ts.tcpsocketserver.Register(listenaddr,reciver) + ts.mapMsg = make(map[uint16]MessageInfo,1) + ts.tcpsocketserver.Register(listenaddr,ts) return ts } @@ -29,6 +42,97 @@ func (slf *TcpSocketPbService) OnInit() error { func (slf *TcpSocketPbService) OnRun() bool { slf.tcpsocketserver.Start() + +/* + slf.RegisterMessage(10,&msgpb.Test{},slf.Test) + var testpack network.MsgBasePack + a := msgpb.Test{} + a.WinCount =proto.Int32(33) + d,err := proto.Marshal(&a) + fmt.Print(err) + + testpack.Make(10,d) + slf.OnRecvMsg(nil,&testpack) + + */ return false } + +type MessageInfo struct { + msgType reflect.Type + msgHandler MessageHandler +} + + +func (slf *TcpSocketPbService) RegMessage(msgtype uint16,msg proto.Message,handle MessageHandler){ + var info MessageInfo + + info.msgType = reflect.TypeOf(msg.(proto.Message)) + info.msgHandler = handle + slf.mapMsg[msgtype] = info +} + +func (slf *TcpSocketPbService) RegConnectEvent(eventHandler EventHandler){ + slf.connEvent = eventHandler +} + +func (slf *TcpSocketPbService) RegDisconnectEvent(eventHandler EventHandler){ + slf.disconnEvent = eventHandler +} + +func (slf *TcpSocketPbService) RegExceptMessage(exceptMsgHandler ExceptMsgHandler){ + slf.exceptMsgHandler = exceptMsgHandler +} + + +func (slf *TcpSocketPbService) OnConnected(pClient *network.SClient){ + if slf.connEvent!=nil { + slf.connEvent(pClient) + } +} + +func (slf *TcpSocketPbService) OnDisconnect(pClient *network.SClient){ + if slf.disconnEvent!=nil { + slf.disconnEvent(pClient) + } +} + +func (slf *TcpSocketPbService) OnExceptMsg (pClient *network.SClient,pPack *network.MsgBasePack,err error){ + if slf.exceptMsgHandler!=nil { + slf.exceptMsgHandler(pClient,pPack,err) + }else{ + pClient.Close() + //记录日志 + service.GetLogger().Printf(service.LEVER_WARN, "OnExceptMsg packtype %d,error %+v",pPack.PackType(),err) + } +} + +func (slf *TcpSocketPbService) OnRecvMsg(pClient *network.SClient, pPack *network.MsgBasePack){ + if info, ok := slf.mapMsg[pPack.PackType()]; ok { + msg := reflect.New(info.msgType.Elem()).Interface() + tmp := msg.(proto.Message) + err := proto.Unmarshal(pPack.Body(), tmp) + if err != nil { + slf.OnExceptMsg(pClient,pPack,err) + return + } + + info.msgHandler(pClient,pPack.PackType(), msg.(proto.Message)) + return + } + + slf.OnExceptMsg(pClient,pPack,errors.New("not found PackType")) + + return +} + +func DefaultTSPbService() *TcpSocketPbService{ + iservice := service.InstanceServiceMgr().FindService("TcpSocketPbService") + if iservice == nil { + return nil + } + + return iservice.(*TcpSocketPbService) +} +