新增tcp protobuf接口

This commit is contained in:
duanhf2012
2020-01-21 10:47:52 +08:00
parent c3cb6ff977
commit 263aeb232c
9 changed files with 402 additions and 51 deletions

View File

@@ -1,34 +0,0 @@
package main
import (
"fmt"
"github.com/duanhf2012/origin/cluster"
"github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/service"
)
type SubNet1_Service struct {
service.BaseService
}
func init() {
originnode.InitService(&SubNet1_Service{})
}
//OnInit ...
func (ws *SubNet1_Service) OnInit() error {
return nil
}
//OnRun ...
func (ws *SubNet1_Service) OnRun() bool {
var in InputData
var ret int
in.A1 = 10
in.A2 = 20
err := cluster.Call("SubNet2_Service1.RPC_Multi", &in, &ret)
fmt.Printf("%+v", err)
return false
}

View File

@@ -0,0 +1,64 @@
package logicservice
import (
"github.com/duanhf2012/origin/Test/msgpb"
"github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/sysservice"
"github.com/golang/protobuf/proto"
"time"
"github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/service"
)
type SubNet1_Service struct {
service.BaseService
}
func init() {
originnode.InitService(&SubNet1_Service{})
}
//OnInit ...
func (ws *SubNet1_Service) OnInit() error {
sysservice.DefaultTSPbService().RegConnectEvent(ws.ConnEventHandler)
sysservice.DefaultTSPbService().RegDisconnectEvent(ws.DisconnEventHandler)
sysservice.DefaultTSPbService().RegExceptMessage(ws.ExceptMessage)
sysservice.DefaultTSPbService().RegMessage(110,&msgpb.Test{},ws.MessageHandler)
return nil
}
//OnRun ...
func (ws *SubNet1_Service) OnRun() bool {
time.Sleep(time.Second*10)
var cli network.TcpSocketClient
cli.Connect("127.0.0.1:9004")
test := msgpb.Test{}
test.AssistCount = proto.Int32(343)
cli.SendMsg(110,&test)
cli.SendMsg(110,&test)
return false
}
func (ws *SubNet1_Service) MessageHandler(pClient *network.SClient,msgtype uint16,msg proto.Message){
}
func (ws *SubNet1_Service) ConnEventHandler (pClient *network.SClient){
}
func (ws *SubNet1_Service) DisconnEventHandler (pClient *network.SClient){
}
func (ws *SubNet1_Service) ExceptMessage(pClient *network.SClient,pPack *network.MsgBasePack,err error){
}

View File

@@ -1,11 +1,12 @@
package main package main
import ( import (
_ "github.com/duanhf2012/origin/Test/logicservice"
"github.com/duanhf2012/origin/cluster" "github.com/duanhf2012/origin/cluster"
"github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/originnode" "github.com/duanhf2012/origin/originnode"
"github.com/duanhf2012/origin/sysservice" "github.com/duanhf2012/origin/sysservice"
"github.com/duanhf2012/origin/sysservice/originhttp" "github.com/duanhf2012/origin/sysservice/originhttp"
"github.com/duanhf2012/origin/network"
) )
@@ -21,9 +22,7 @@ func (slf *TcpSocketServerReciver) OnDisconnect(pClient *network.SClient){
} }
func (slf *TcpSocketServerReciver) OnRecvMsg(pClient *network.SClient, pPack *network.MsgBasePack){
}
func main() { func main() {
@@ -38,7 +37,7 @@ func main() {
httpserver.SetHttps(ca.CertFile, ca.KeyFile) httpserver.SetHttps(ca.CertFile, ca.KeyFile)
} }
pTcpService := sysservice.NewTcpSocketPbService(":9004",&TcpSocketServerReciver{}) pTcpService := sysservice.NewTcpSocketPbService(":9004")
httpserver.SetPrintRequestTime(true) httpserver.SetPrintRequestTime(true)
node.SetupService(httpserver,pTcpService) node.SetupService(httpserver,pTcpService)

1
Test/msgpb/gen.bat Normal file
View File

@@ -0,0 +1 @@
protoc --go_out=. test.proto

132
Test/msgpb/test.pb.go Normal file
View File

@@ -0,0 +1,132 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: test.proto
package msgpb
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
//*
// @brief base_score_info
type Test struct {
WinCount *int32 `protobuf:"varint,1,opt,name=win_count,json=winCount" json:"win_count,omitempty"`
LoseCount *int32 `protobuf:"varint,2,opt,name=lose_count,json=loseCount" json:"lose_count,omitempty"`
ExceptionCount *int32 `protobuf:"varint,3,opt,name=exception_count,json=exceptionCount" json:"exception_count,omitempty"`
KillCount *int32 `protobuf:"varint,4,opt,name=kill_count,json=killCount" json:"kill_count,omitempty"`
DeathCount *int32 `protobuf:"varint,5,opt,name=death_count,json=deathCount" json:"death_count,omitempty"`
AssistCount *int32 `protobuf:"varint,6,opt,name=assist_count,json=assistCount" json:"assist_count,omitempty"`
Rating *int64 `protobuf:"varint,7,opt,name=rating" json:"rating,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Test) Reset() { *m = Test{} }
func (m *Test) String() string { return proto.CompactTextString(m) }
func (*Test) ProtoMessage() {}
func (*Test) Descriptor() ([]byte, []int) {
return fileDescriptor_c161fcfdc0c3ff1e, []int{0}
}
func (m *Test) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Test.Unmarshal(m, b)
}
func (m *Test) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Test.Marshal(b, m, deterministic)
}
func (m *Test) XXX_Merge(src proto.Message) {
xxx_messageInfo_Test.Merge(m, src)
}
func (m *Test) XXX_Size() int {
return xxx_messageInfo_Test.Size(m)
}
func (m *Test) XXX_DiscardUnknown() {
xxx_messageInfo_Test.DiscardUnknown(m)
}
var xxx_messageInfo_Test proto.InternalMessageInfo
func (m *Test) GetWinCount() int32 {
if m != nil && m.WinCount != nil {
return *m.WinCount
}
return 0
}
func (m *Test) GetLoseCount() int32 {
if m != nil && m.LoseCount != nil {
return *m.LoseCount
}
return 0
}
func (m *Test) GetExceptionCount() int32 {
if m != nil && m.ExceptionCount != nil {
return *m.ExceptionCount
}
return 0
}
func (m *Test) GetKillCount() int32 {
if m != nil && m.KillCount != nil {
return *m.KillCount
}
return 0
}
func (m *Test) GetDeathCount() int32 {
if m != nil && m.DeathCount != nil {
return *m.DeathCount
}
return 0
}
func (m *Test) GetAssistCount() int32 {
if m != nil && m.AssistCount != nil {
return *m.AssistCount
}
return 0
}
func (m *Test) GetRating() int64 {
if m != nil && m.Rating != nil {
return *m.Rating
}
return 0
}
func init() {
proto.RegisterType((*Test)(nil), "msgpb.test")
}
func init() { proto.RegisterFile("test.proto", fileDescriptor_c161fcfdc0c3ff1e) }
var fileDescriptor_c161fcfdc0c3ff1e = []byte{
// 178 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x3c, 0x8c, 0x5d, 0x0a, 0x82, 0x40,
0x14, 0x46, 0x99, 0xfc, 0x29, 0xaf, 0x51, 0xe0, 0x43, 0x08, 0x11, 0x59, 0x2f, 0xf9, 0xd4, 0x26,
0xda, 0x81, 0x1b, 0x08, 0xb3, 0xc1, 0x86, 0x6c, 0x46, 0xbc, 0x37, 0x6c, 0xc5, 0xad, 0x23, 0x66,
0xee, 0xe4, 0xe3, 0x77, 0xce, 0xe1, 0x03, 0x20, 0x89, 0x74, 0xee, 0x07, 0x43, 0x26, 0x8b, 0x5e,
0xd8, 0xf6, 0xb7, 0xe3, 0x57, 0x40, 0x68, 0x69, 0xb6, 0x85, 0x64, 0x54, 0xfa, 0xda, 0x98, 0xb7,
0xa6, 0x5c, 0x14, 0xa2, 0x8c, 0xaa, 0xc5, 0xa8, 0xf4, 0xc5, 0xee, 0x6c, 0x07, 0xd0, 0x19, 0x94,
0xde, 0xce, 0x9c, 0x4d, 0x2c, 0x61, 0x7d, 0x82, 0xb5, 0xfc, 0x34, 0xb2, 0x27, 0x65, 0xfe, 0x0f,
0x81, 0x6b, 0x56, 0x13, 0x9e, 0x7e, 0x9e, 0xaa, 0xeb, 0x7c, 0x13, 0xf2, 0x8f, 0x25, 0xac, 0xf7,
0x90, 0xde, 0x65, 0x4d, 0x0f, 0xef, 0x23, 0xe7, 0xc1, 0x21, 0x0e, 0x0e, 0xb0, 0xac, 0x11, 0x15,
0x92, 0x2f, 0x62, 0x57, 0xa4, 0xcc, 0x38, 0xd9, 0x40, 0x3c, 0xd4, 0xa4, 0x74, 0x9b, 0xcf, 0x0b,
0x51, 0x06, 0x95, 0x5f, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x46, 0xa0, 0x89, 0x59, 0xfc, 0x00,
0x00, 0x00,
}

18
Test/msgpb/test.proto Normal file
View File

@@ -0,0 +1,18 @@
syntax = "proto2";
package msgpb;
/**
* @brief base_score_info
*/
message test{
optional int32 win_count = 1; // 玩家胜局局数
optional int32 lose_count = 2; // 玩家负局局数
optional int32 exception_count = 3; // 玩家异常局局数
optional int32 kill_count = 4; // 总人头数
optional int32 death_count = 5; // 总死亡数
optional int32 assist_count = 6; // 总总助攻数
optional int64 rating = 7; // 评价积分
}

View File

@@ -0,0 +1,42 @@
package network
import (
"fmt"
"github.com/golang/protobuf/proto"
"net"
)
type TcpSocketClient struct {
conn net.Conn
}
func (slf *TcpSocketClient) Connect(addr string) error{
tcpAddr,terr := net.ResolveTCPAddr("tcp",addr)
if terr != nil {
return terr
}
conn,err := net.DialTCP("tcp",nil,tcpAddr)
if err!=nil {
fmt.Println("Client connect error ! " + err.Error())
return err
}
slf.conn = conn
//
return nil
}
func (slf *TcpSocketClient) SendMsg(packtype uint16,message proto.Message) error{
var msg MsgBasePack
data,err := proto.Marshal(message)
if err != nil {
return err
}
msg.Make(packtype,data)
slf.conn.Write(msg.Bytes())
return nil
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"io" "io"
"net" "net"
"unsafe"
"os" "os"
"time" "time"
@@ -23,11 +24,12 @@ type SClient struct {
id uint64 id uint64
conn net.Conn conn net.Conn
recvPack util.SyncQueue recvPack *util.SyncQueue
sendPack util.SyncQueue sendPack *util.SyncQueue
tcpserver *TcpSocketServer tcpserver *TcpSocketServer
remoteip string remoteip string
starttime int64 starttime int64
bClose bool
} }
type TcpSocketServer struct { type TcpSocketServer struct {
@@ -46,6 +48,15 @@ type MsgBasePack struct {
StartTime time.Time StartTime time.Time
} }
func (slf *MsgBasePack) PackType() uint16 {
return slf.packtype
}
func (slf *MsgBasePack) Body() []byte{
return slf.body
}
func (slf *TcpSocketServer) Register(listenAddr string,iReciver ITcpSocketServerReciver){ func (slf *TcpSocketServer) Register(listenAddr string,iReciver ITcpSocketServerReciver){
slf.listenAddr = listenAddr slf.listenAddr = listenAddr
slf.iReciver = iReciver slf.iReciver = iReciver
@@ -80,7 +91,8 @@ func (slf *TcpSocketServer) listenServer(){
continue continue
} }
sc :=&SClient{id:clientId,conn:conn,tcpserver:slf,remoteip:conn.RemoteAddr().String(),starttime:time.Now().UnixNano()} sc :=&SClient{id:clientId,conn:conn,tcpserver:slf,remoteip:conn.RemoteAddr().String(),starttime:time.Now().UnixNano(),
recvPack:util.NewSyncQueue(),sendPack:util.NewSyncQueue()}
slf.iReciver.OnConnected(sc) slf.iReciver.OnConnected(sc)
util.Go(sc.listendata) util.Go(sc.listendata)
//收来自客户端数据 //收来自客户端数据
@@ -98,6 +110,7 @@ func (slf *TcpSocketServer) listenServer(){
func (slf *SClient) listendata(){ func (slf *SClient) listendata(){
defer func() { defer func() {
slf.tcpserver.iReciver.OnDisconnect(slf) slf.tcpserver.iReciver.OnDisconnect(slf)
slf.bClose = true
slf.conn.Close() slf.conn.Close()
slf.tcpserver.mapClient.Del(slf.id) slf.tcpserver.mapClient.Del(slf.id)
}() }()
@@ -137,12 +150,14 @@ func (slf *SClient) listendata(){
} }
func (slf *MsgBasePack) Bytes() (bRet []byte){ func (slf *MsgBasePack) Bytes() []byte{
var bRet []byte
bRet = make([]byte,4)
binary.BigEndian.PutUint16(bRet,slf.packsize) binary.BigEndian.PutUint16(bRet,slf.packsize)
binary.BigEndian.PutUint16(bRet,slf.packtype) binary.BigEndian.PutUint16(bRet[2:],slf.packtype)
bRet = append(bRet,slf.body...) bRet = append(bRet,slf.body...)
return return bRet
} }
//返回值:填充多少字节,是否完成 //返回值:填充多少字节,是否完成
@@ -155,7 +170,7 @@ func (slf *MsgBasePack) FillData(bdata []byte,datasize uint16) (uint16,bool) {
} }
slf.packsize= binary.BigEndian.Uint16(bdata[:2]) slf.packsize= binary.BigEndian.Uint16(bdata[:2])
slf.packtype= binary.BigEndian.Uint16(bdata[2:2]) slf.packtype= binary.BigEndian.Uint16(bdata[2:4])
fillsize += 4 fillsize += 4
} }
@@ -172,6 +187,12 @@ func (slf *MsgBasePack) FillData(bdata []byte,datasize uint16) (uint16,bool) {
func (slf *MsgBasePack) Clear() { func (slf *MsgBasePack) Clear() {
} }
func (slf *MsgBasePack) Make(packtype uint16,data []byte) {
slf.packtype = packtype
slf.body = data
slf.packsize = uint16(unsafe.Sizeof(slf.packtype)*2)+uint16(len(data))
}
func (slf *SClient) Send(pack *MsgBasePack){ func (slf *SClient) Send(pack *MsgBasePack){
slf.sendPack.Push(pack) slf.sendPack.Push(pack)
} }
@@ -199,5 +220,9 @@ func (slf *SClient) onrecv(){
} }
} }
func (slf *SClient) Close(){
if slf.bClose == false {
slf.conn.Close()
}
}

View File

@@ -1,25 +1,38 @@
package sysservice package sysservice
import ( import (
"errors"
"github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"github.com/golang/protobuf/proto"
"reflect"
) )
type TcpSocketPbService struct { type TcpSocketPbService struct {
service.BaseService service.BaseService
listenaddr string listenaddr string
tcpsocketserver network.TcpSocketServer tcpsocketserver network.TcpSocketServer
reciver network.ITcpSocketServerReciver mapMsg map[uint16]MessageInfo
connEvent EventHandler
disconnEvent EventHandler
exceptMsgHandler ExceptMsgHandler
} }
func NewTcpSocketPbService(listenaddr string,reciver network.ITcpSocketServerReciver) *TcpSocketPbService { type MessageHandler func(pClient *network.SClient,msgtype uint16,msg proto.Message)
type EventHandler func(pClient *network.SClient)
type ExceptMsgHandler func(pClient *network.SClient,pPack *network.MsgBasePack,err error)
func NewTcpSocketPbService(listenaddr string) *TcpSocketPbService {
ts := new(TcpSocketPbService) ts := new(TcpSocketPbService)
ts.listenaddr = listenaddr ts.listenaddr = listenaddr
ts.reciver = reciver ts.mapMsg = make(map[uint16]MessageInfo,1)
ts.tcpsocketserver.Register(listenaddr,ts)
ts.tcpsocketserver.Register(listenaddr,reciver)
return ts return ts
} }
@@ -29,6 +42,97 @@ func (slf *TcpSocketPbService) OnInit() error {
func (slf *TcpSocketPbService) OnRun() bool { func (slf *TcpSocketPbService) OnRun() bool {
slf.tcpsocketserver.Start() slf.tcpsocketserver.Start()
/*
slf.RegisterMessage(10,&msgpb.Test{},slf.Test)
var testpack network.MsgBasePack
a := msgpb.Test{}
a.WinCount =proto.Int32(33)
d,err := proto.Marshal(&a)
fmt.Print(err)
testpack.Make(10,d)
slf.OnRecvMsg(nil,&testpack)
*/
return false return false
} }
type MessageInfo struct {
msgType reflect.Type
msgHandler MessageHandler
}
func (slf *TcpSocketPbService) RegMessage(msgtype uint16,msg proto.Message,handle MessageHandler){
var info MessageInfo
info.msgType = reflect.TypeOf(msg.(proto.Message))
info.msgHandler = handle
slf.mapMsg[msgtype] = info
}
func (slf *TcpSocketPbService) RegConnectEvent(eventHandler EventHandler){
slf.connEvent = eventHandler
}
func (slf *TcpSocketPbService) RegDisconnectEvent(eventHandler EventHandler){
slf.disconnEvent = eventHandler
}
func (slf *TcpSocketPbService) RegExceptMessage(exceptMsgHandler ExceptMsgHandler){
slf.exceptMsgHandler = exceptMsgHandler
}
func (slf *TcpSocketPbService) OnConnected(pClient *network.SClient){
if slf.connEvent!=nil {
slf.connEvent(pClient)
}
}
func (slf *TcpSocketPbService) OnDisconnect(pClient *network.SClient){
if slf.disconnEvent!=nil {
slf.disconnEvent(pClient)
}
}
func (slf *TcpSocketPbService) OnExceptMsg (pClient *network.SClient,pPack *network.MsgBasePack,err error){
if slf.exceptMsgHandler!=nil {
slf.exceptMsgHandler(pClient,pPack,err)
}else{
pClient.Close()
//记录日志
service.GetLogger().Printf(service.LEVER_WARN, "OnExceptMsg packtype %d,error %+v",pPack.PackType(),err)
}
}
func (slf *TcpSocketPbService) OnRecvMsg(pClient *network.SClient, pPack *network.MsgBasePack){
if info, ok := slf.mapMsg[pPack.PackType()]; ok {
msg := reflect.New(info.msgType.Elem()).Interface()
tmp := msg.(proto.Message)
err := proto.Unmarshal(pPack.Body(), tmp)
if err != nil {
slf.OnExceptMsg(pClient,pPack,err)
return
}
info.msgHandler(pClient,pPack.PackType(), msg.(proto.Message))
return
}
slf.OnExceptMsg(pClient,pPack,errors.New("not found PackType"))
return
}
func DefaultTSPbService() *TcpSocketPbService{
iservice := service.InstanceServiceMgr().FindService("TcpSocketPbService")
if iservice == nil {
return nil
}
return iservice.(*TcpSocketPbService)
}