新增TcpGateWay服务,支持通过配置进行路由转发

This commit is contained in:
boyce
2020-10-07 16:14:19 +08:00
parent 40ff2f7932
commit 0c55961c44
17 changed files with 1203 additions and 58 deletions

View File

@@ -19,20 +19,17 @@ type Event struct {
}
type IEventHandler interface {
Init(processor IEventProcessor)
GetEventProcessor() IEventProcessor //获得事件
NotifyEvent(*Event)
Desctory()
//注册了事件
addRegInfo(eventType EventType,eventProcessor IEventProcessor)
removeRegInfo(eventType EventType,eventProcessor IEventProcessor)
}
type IEventProcessor interface {
EventHandler(ev *Event)
//同一个IEventHandler只能接受一个EventType类型回调
RegEventReciverFunc(eventType EventType,reciver IEventHandler,callback EventCallBack)
UnRegEventReciverFun(eventType EventType,reciver IEventHandler)
@@ -44,6 +41,7 @@ type IEventProcessor interface {
addListen(eventType EventType,reciver IEventHandler)
removeBindEvent(eventType EventType,reciver IEventHandler)
removeListen(eventType EventType,reciver IEventHandler)
GetEventChan() chan *Event
}
type EventHandler struct {
@@ -64,6 +62,21 @@ type EventProcessor struct {
mapBindHandlerEvent map[EventType]map[IEventHandler]EventCallBack//收到事件处理
}
func NewEventHandler() IEventHandler{
eh := EventHandler{}
eh.mapRegEvent = map[EventType]map[IEventProcessor]interface{}{}
return &eh
}
func NewEventProcessor() IEventProcessor{
ep := EventProcessor{}
ep.mapListenerEvent = map[EventType]map[IEventProcessor]int{}
ep.mapBindHandlerEvent = map[EventType]map[IEventHandler]EventCallBack{}
return &ep
}
func (slf *EventHandler) addRegInfo(eventType EventType,eventProcessor IEventProcessor){
slf.locker.Lock()
defer slf.locker.Unlock()
@@ -93,6 +106,7 @@ func (slf *EventHandler) NotifyEvent(ev *Event){
func (slf *EventHandler) Init(processor IEventProcessor){
slf.eventProcessor = processor
slf.mapRegEvent =map[EventType]map[IEventProcessor]interface{}{}
}
@@ -112,12 +126,8 @@ func (slf *EventProcessor) SetEventChannel(channelNum int) bool{
}
func (slf *EventProcessor) addBindEvent(eventType EventType,reciver IEventHandler,callback EventCallBack){
//mapBindHandlerEvent map[EventType]map[IEventHandler]EventCallBack//收到事件处理
slf.locker.Lock()
defer slf.locker.Unlock()
if slf.mapBindHandlerEvent == nil {
slf.mapBindHandlerEvent = map[EventType]map[IEventHandler]EventCallBack{}
}
if _,ok := slf.mapBindHandlerEvent[eventType]; ok == false {
slf.mapBindHandlerEvent[eventType] = map[IEventHandler]EventCallBack{}
@@ -130,11 +140,6 @@ func (slf *EventProcessor) addListen(eventType EventType,reciver IEventHandler){
slf.locker.Lock()
defer slf.locker.Unlock()
//mapListenerEvent map[EventType]map[IEventProcessor]int
if slf.mapListenerEvent == nil {
slf.mapListenerEvent = map[EventType]map[IEventProcessor]int{}
}
if _,ok :=slf.mapListenerEvent[eventType];ok == false{
slf.mapListenerEvent[eventType] = map[IEventProcessor]int{}
}

View File

@@ -1,17 +0,0 @@
package network
type Processor interface {
// must goroutine safe
MsgRoute(msg interface{}, userData interface{}) error
//must goroutine safe
UnknownMsgRoute(msg interface{}, userData interface{})
// connect event
ConnectedRoute(userData interface{})
DisConnectedRoute(userData interface{})
// must goroutine safe
Unmarshal(data []byte) (interface{}, error)
// must goroutine safe
Marshal(msg interface{}) ([]byte, error)
}

View File

@@ -0,0 +1,119 @@
package processor
import (
"encoding/binary"
"reflect"
)
type RawMessageInfo struct {
msgType reflect.Type
msgHandler RawMessageHandler
}
type RawMessageHandler func(clientId uint64,packType uint16,msg []byte)
type RawConnectHandler func(clientId uint64)
type UnknownRawMessageHandler func(clientId uint64,msg []byte)
const RawMsgTypeSize = 2
type PBRawProcessor struct {
msgHandler RawMessageHandler
LittleEndian bool
unknownMessageHandler UnknownRawMessageHandler
connectHandler RawConnectHandler
disconnectHandler RawConnectHandler
}
func NewPBRawProcessor() *PBRawProcessor {
processor := &PBRawProcessor{}
return processor
}
func (p *PBRawProcessor) SetByteOrder(littleEndian bool) {
p.LittleEndian = littleEndian
}
type PBRawPackInfo struct {
typ uint16
rawMsg []byte
}
func (slf *PBRawPackInfo) GetPackType() uint16 {
return slf.typ
}
func (slf *PBRawPackInfo) GetMsg() []byte {
return slf.rawMsg
}
// must goroutine safe
func (slf *PBRawProcessor ) MsgRoute(msg interface{},userdata interface{}) error{
pPackInfo := msg.(*PBRawPackInfo)
slf.msgHandler(userdata.(uint64),pPackInfo.typ,pPackInfo.rawMsg)
return nil
}
// must goroutine safe
func (slf *PBRawProcessor ) Unmarshal(data []byte) (interface{}, error) {
var msgType uint16
if slf.LittleEndian == true {
msgType = binary.LittleEndian.Uint16(data[:2])
}else{
msgType = binary.BigEndian.Uint16(data[:2])
}
return &PBRawPackInfo{typ:msgType,rawMsg:data[2:]},nil
}
// must goroutine safe
func (slf *PBRawProcessor ) Marshal(msg interface{}) ([]byte, error){
pMsg := msg.(*PBRawPackInfo)
buff := make([]byte, 2, len(pMsg.rawMsg)+RawMsgTypeSize)
if slf.LittleEndian == true {
binary.LittleEndian.PutUint16(buff[:2],pMsg.typ)
}else{
binary.BigEndian.PutUint16(buff[:2],pMsg.typ)
}
buff = append(buff,pMsg.rawMsg...)
return buff,nil
}
func (slf *PBRawProcessor) SetRawMsgHandler(handle RawMessageHandler) {
slf.msgHandler = handle
}
func (slf *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte) *PBRawPackInfo {
return &PBRawPackInfo{typ:msgType,rawMsg:msg}
}
func (slf *PBRawProcessor) UnknownMsgRoute(msg interface{}, userData interface{}){
if slf.unknownMessageHandler == nil {
return
}
slf.unknownMessageHandler(userData.(uint64),msg.([]byte))
}
// connect event
func (slf *PBRawProcessor) ConnectedRoute(userData interface{}){
slf.connectHandler(userData.(uint64))
}
func (slf *PBRawProcessor) DisConnectedRoute(userData interface{}){
slf.disconnectHandler(userData.(uint64))
}
func (slf *PBRawProcessor) SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler){
slf.unknownMessageHandler = unknownMessageHandler
}
func (slf *PBRawProcessor) SetConnectedHandler(connectHandler RawConnectHandler){
slf.connectHandler = connectHandler
}
func (slf *PBRawProcessor) SetDisConnectedHandler(disconnectHandler RawConnectHandler){
slf.disconnectHandler = disconnectHandler
}

View File

@@ -0,0 +1,34 @@
package processor
type IProcessor interface {
// must goroutine safe
MsgRoute(msg interface{}, userData interface{}) error
//must goroutine safe
UnknownMsgRoute(msg interface{}, userData interface{})
// connect event
ConnectedRoute(userData interface{})
DisConnectedRoute(userData interface{})
// must goroutine safe
Unmarshal(data []byte) (interface{}, error)
// must goroutine safe
Marshal(msg interface{}) ([]byte, error)
}
type IRawProcessor interface {
SetByteOrder(littleEndian bool)
MsgRoute(msg interface{},userdata interface{}) error
Unmarshal(data []byte) (interface{}, error)
Marshal(msg interface{}) ([]byte, error)
SetRawMsgHandler(handle RawMessageHandler)
MakeRawMsg(msgType uint16,msg []byte) *PBRawPackInfo
UnknownMsgRoute(msg interface{}, userData interface{})
ConnectedRoute(userData interface{})
DisConnectedRoute(userData interface{})
SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler)
SetConnectedHandler(connectHandler RawConnectHandler)
SetDisConnectedHandler(disconnectHandler RawConnectHandler)
}

View File

@@ -134,7 +134,7 @@ func (slf *RpcHandler) suitableMethods(method reflect.Method) error {
return fmt.Errorf("%s The return parameter must be of type error!",method.Name)
}
if typ.NumIn() <3 || typ.NumIn() > 4 {
if typ.NumIn() <2 || typ.NumIn() > 4 {
return fmt.Errorf("%s Unsupported parameter format!",method.Name)
}

View File

@@ -52,7 +52,8 @@ type Module struct {
//事件管道
moduleName string
eventHandler event.EventHandler
eventHandler event.IEventHandler
//eventHandler event.EventHandler
}
@@ -74,6 +75,7 @@ func (slf *Module) GetModuleName() string{
}
func (slf *Module) OnInit() error{
// slf.eventHandler = event.NewEventHandler()
return nil
}
@@ -100,6 +102,7 @@ func (slf *Module) AddModule(module IModule) (int64,error){
pAddModule.dispatcher = slf.GetAncestor().getBaseModule().(*Module).dispatcher
pAddModule.ancestor = slf.ancestor
pAddModule.moduleName = reflect.Indirect(reflect.ValueOf(module)).Type().Name()
pAddModule.eventHandler = event.NewEventHandler()
pAddModule.eventHandler.Init(slf.eventHandler.GetEventProcessor())
err := module.OnInit()
if err != nil {
@@ -216,5 +219,5 @@ func (slf *Module) NotifyEvent(ev *event.Event){
}
func (slf *Module) GetEventHandler() event.IEventHandler{
return &slf.eventHandler
return slf.eventHandler
}

View File

@@ -41,7 +41,9 @@ type Service struct {
serviceCfg interface{}
gorouterNum int32
startStatus bool
eventProcessor event.EventProcessor //事件接收者
eventProcessor event.IEventProcessor
//eventProcessor event.EventProcessor //事件接收者
profiler *profiler.Profiler //性能分析器
}
@@ -69,7 +71,9 @@ func (slf *Service) Init(iservice IService,getClientFun rpc.FuncRpcClient,getSer
slf.descendants = map[int64]IModule{}
slf.serviceCfg = serviceCfg
slf.gorouterNum = 1
slf.eventHandler.Init(&slf.eventProcessor)
slf.eventProcessor = event.NewEventProcessor()
slf.eventHandler = event.NewEventHandler()
slf.eventHandler.Init(slf.eventProcessor)
}
func (slf *Service) SetGoRouterNum(gorouterNum int32) bool {

View File

@@ -0,0 +1,77 @@
package tcpgateway
import (
"fmt"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysservice/tcpservice"
"github.com/golang/protobuf/proto"
)
type GateProxyModule struct{
service.Module
defaultGateRpc string
}
func NewGateProxyModule() *GateProxyModule{
return &GateProxyModule{defaultGateRpc:"TcpGateService.RPC_Dispatch"}
}
func (slf *GateProxyModule) Send(clientId interface{},msgType uint16,msg proto.Message) error {
//对agentId进行分组
mapNodeClientId := map[int][]uint64{}
switch clientId.(type) {
case uint64:
id := clientId.(uint64)
nodeId := tcpservice.GetNodeId(id)
mapNodeClientId[nodeId] = append(mapNodeClientId[nodeId],id)
case []uint64:
idList := clientId.([]uint64)
for _,id := range idList{
nodeId := tcpservice.GetNodeId(id)
mapNodeClientId[nodeId] = append(mapNodeClientId[nodeId],id)
}
}
bData,err := proto.Marshal(msg)
if err!=nil {
return err
}
var replyMsg ReplyMessage
replyMsg.MsgType = proto.Uint32(uint32(msgType))
replyMsg.Msg = bData
for nodeId,clientIdList := range mapNodeClientId {
if nodeId <0 || nodeId>tcpservice.MaxNodeId {
fmt.Errorf("nodeid is error %d",nodeId)
continue
}
replyMsg.ClientList = clientIdList
slf.GetService().GetRpcHandler().GoNode(nodeId,slf.defaultGateRpc,&replyMsg)
}
return nil
}
func (slf *GateProxyModule) SetDefaultGateRpcMethodName(rpcMethodName string){
slf.defaultGateRpc = rpcMethodName
}
func (slf *GateProxyModule) send(clientId uint64,msgType uint16,msg []byte) error {
nodeId := tcpservice.GetNodeId(clientId)
if nodeId <0 || nodeId>tcpservice.MaxNodeId {
return fmt.Errorf("nodeid is error %d",nodeId)
}
var replyMsg ReplyMessage
replyMsg.MsgType = proto.Uint32(uint32(msgType))
replyMsg.Msg = msg
replyMsg.ClientList = append(replyMsg.ClientList ,clientId)
return slf.GetService().GetRpcHandler().GoNode(nodeId,slf.defaultGateRpc,&replyMsg)
}

View File

@@ -0,0 +1,5 @@
package tcpgateway
type ILoadBalance interface {
SelectNode(serviceName string) int //选择一个结点,通过服务名称
}

View File

@@ -0,0 +1,11 @@
package tcpgateway
type IRouter interface {
RouterMessage(clientId uint64,msgType uint16,msg []byte) //消息转发
RouterEvent(clientId uint64,eventType string) bool//消息转发
Load() //加载路由规则
OnDisconnected(clientId uint64)
OnConnected(clientId uint64)
//ReplyMessage(clientId uint64,msg []byte)
}

View File

@@ -0,0 +1,8 @@
package tcpgateway
type LoadBalance struct {
}
func (slf *LoadBalance) SelectNode(serviceName string) int {
return 1
}

View File

@@ -0,0 +1,242 @@
package tcpgateway
import (
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/node"
"github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/sysservice/tcpservice"
"github.com/golang/protobuf/proto"
"strings"
)
type RouterCfg struct {
}
type Router struct {
loadBalance ILoadBalance
rpcHandler rpc.IRpcHandler
mapMsgRouterInfo map[uint16]*MsgRouterInfo
mapEventRouterInfo map[string]*EventRouterInfo
tcpService *tcpservice.TcpService
mapClientRouterCache map[uint64]map[string]int //map[clientid]nodeid
}
type MsgRouterInfo struct {
Rpc string
ServiceName string
LoadBalanceType string
}
type EventRouterInfo struct {
Rpc string
ServiceName string
LoadBalanceType string
}
func NewRouter(loadBalance ILoadBalance,rpcHandler rpc.IRpcHandler,cfg interface{}) IRouter {
router := &Router{}
router.loadBalance = loadBalance
router.rpcHandler = rpcHandler
router.tcpService = node.GetService("TcpService").(*tcpservice.TcpService)
router.loadCfg(cfg)
router.mapClientRouterCache = map[uint64]map[string]int{}
return router
}
func (slf *Router) loadCfg(cfg interface{}){
slf.mapMsgRouterInfo = map[uint16]*MsgRouterInfo{}
slf.mapEventRouterInfo = map[string]*EventRouterInfo{}
mapRouter,ok := cfg.(map[string]interface{})
if ok == false{
//error....
return
}
//parse MsgRouter
routerInfo,ok := mapRouter["MsgRouter"]
if ok == false{
//error...
return
}
//ar routerList []RouterItem
routerList,ok := routerInfo.([]interface{})
if ok == false{
//error...
return
}
for _,v := range routerList{
mapItem := v.(map[string]interface{})
var iMsgId interface{}
var iRpc interface{}
var iLoadBalanceType interface{}
if iMsgId,ok = mapItem["MsgId"];ok == false {
//error ...
continue
}
if iRpc,ok = mapItem["Rpc"];ok == false {
//error ...
continue
}
if iLoadBalanceType,ok = mapItem["LoadBalanceType"];ok == false {
//error ...
continue
}
msgId,ok := iMsgId.(float64)
if ok == false {
//error ...
continue
}
strService := strings.Split(iRpc.(string),".")
if len(strService)!=2 {
//error ...
continue
}
slf.mapMsgRouterInfo[uint16(msgId)] = &MsgRouterInfo{ServiceName:strService[0],Rpc: iRpc.(string),LoadBalanceType: iLoadBalanceType.(string)}
}
//parse EventRouter
eventRouterInfo,ok := mapRouter["EventRouter"]
if ok == false{
//error...
return
}
//ar routerList []RouterItem
eRouterList,ok := eventRouterInfo.([]interface{})
if ok == false{
//error...
return
}
for _,v := range eRouterList{
mapItem := v.(map[string]interface{})
var eventType interface{}
var iRpc interface{}
var iLoadBalanceType interface{}
if eventType,ok = mapItem["EventType"];ok == false {
//error ...
continue
}
if iRpc,ok = mapItem["Rpc"];ok == false {
//error ...
continue
}
if iLoadBalanceType,ok = mapItem["LoadBalanceType"];ok == false {
//error ...
continue
}
strEventType,ok := eventType.(string)
if ok == false {
//error ...
continue
}
strService := strings.Split(iRpc.(string),".")
if len(strService)!=2 {
//error ...
continue
}
slf.mapEventRouterInfo[strEventType] = &EventRouterInfo{ServiceName:strService[0],Rpc: iRpc.(string),LoadBalanceType: iLoadBalanceType.(string)}
}
}
func (slf *Router) GetMsgRouterService(msgType uint16) *MsgRouterInfo{
info,ok := slf.mapMsgRouterInfo[msgType]
if ok == false {
return nil
}
return info
}
func (slf *Router) GetEventRouterService(eventType string) *EventRouterInfo{
info,ok := slf.mapEventRouterInfo[eventType]
if ok == false {
return nil
}
return info
}
func (slf *Router) GetRouterId(clientId uint64,serviceName *string) int {
mapServiceRouter,ok := slf.mapClientRouterCache[clientId]
if ok == false{
return 0
}
routerId,ok := mapServiceRouter[*serviceName]
if ok == false {
return 0
}
return routerId
}
func (slf *Router) SetRouterId(clientId uint64,serviceName *string,routerId int){
slf.mapClientRouterCache[clientId][*serviceName] = routerId
}
func (slf *Router) RouterMessage(clientId uint64,msgType uint16,msg []byte) {
routerInfo:= slf.GetMsgRouterService(msgType)
if routerInfo==nil {
log.Error("The message type is %d with no configured route!",msgType)
return
}
routerId := slf.GetRouterId(clientId,&routerInfo.ServiceName)
if routerId ==0 {
routerId = slf.loadBalance.SelectNode(routerInfo.ServiceName)
slf.SetRouterId(clientId,&routerInfo.ServiceName,routerId)
}
if routerId>0 {
slf.rpcHandler.RawGoNode(routerId,routerInfo.Rpc,msg,proto.Uint64(clientId))
}
}
func (slf *Router) Load(){
}
func (slf *Router) RouterEvent(clientId uint64,eventType string) bool{
routerInfo:= slf.GetEventRouterService(eventType)
if routerInfo==nil {
log.Error("The event type is %s with no register!",eventType)
return false
}
routerId := slf.GetRouterId(clientId,&routerInfo.ServiceName)
if routerId ==0 {
routerId = slf.loadBalance.SelectNode(routerInfo.ServiceName)
slf.SetRouterId(clientId,&routerInfo.ServiceName,routerId)
}
if routerId>0 {
slf.rpcHandler.RawGoNode(routerId,routerInfo.Rpc,[]byte{},proto.Uint64(clientId))
return true
}
return false
}
func (slf *Router) OnDisconnected(clientId uint64){
delete(slf.mapClientRouterCache,clientId)
//通知事件
slf.RouterEvent(clientId,"DisConnect")
}
func (slf *Router) OnConnected(clientId uint64){
slf.mapClientRouterCache[clientId] = map[string]int{}
slf.RouterEvent(clientId,"Connect")
}

View File

@@ -0,0 +1,81 @@
package tcpgateway
import (
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/network/processor"
"github.com/duanhf2012/origin/node"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysservice/tcpservice"
)
func init(){
node.Setup(&tcpservice.TcpService{})
node.Setup(&TcpGateService{})
}
type MsgTypeRouterInfo struct {
router IRouter
serviceName string
}
type TcpGateService struct {
service.Service
processor processor.IRawProcessor
tcpService *tcpservice.TcpService
loadBalance ILoadBalance
router IRouter
}
func (slf *TcpGateService) OnInit() error {
slf.OnLoad()
//获取安装好了的TcpService对象
slf.tcpService = node.GetService("TcpService").(*tcpservice.TcpService)
//新建内置的protobuf处理器您也可以自定义路由器比如json后续会补充
slf.processor = processor.NewPBRawProcessor()
//注册监听客户连接断开事件
slf.processor.SetDisConnectedHandler(slf.router.OnDisconnected)
//注册监听客户连接事件
slf.processor.SetConnectedHandler(slf.router.OnConnected)
//注册监听消息类型MsgType_MsgReq并注册回调
slf.processor.SetRawMsgHandler(slf.router.RouterMessage)
//将protobuf消息处理器设置到TcpService服务中
slf.tcpService.SetProcessor(slf.processor,slf.GetEventHandler())
return nil
}
func (slf *TcpGateService) OnLoad() {
slf.loadBalance = &LoadBalance{}
slf.router = NewRouter(slf.loadBalance,slf,slf.GetServiceCfg())
//加载路由
slf.router.Load()
}
func (slf *TcpGateService) SetupLoadBalance(loadBalance ILoadBalance){
slf.loadBalance = loadBalance
}
func (slf *TcpGateService) SetupRouter(router IRouter){
slf.router = router
}
func (slf *TcpGateService) RPC_Dispatch(replyMsg *ReplyMessage) error {
for _,id := range replyMsg.ClientList {
err := slf.tcpService.SendRawMsg(id,replyMsg.Msg)
if err != nil {
log.Debug("SendRawMsg fail:%+v!",err)
}
}
return nil
}

View File

@@ -0,0 +1,473 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: base/base.proto
package tcpgateway
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Int struct {
Value *int32 `protobuf:"varint,1,opt,name=value" json:"value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Int) Reset() { *m = Int{} }
func (m *Int) String() string { return proto.CompactTextString(m) }
func (*Int) ProtoMessage() {}
func (*Int) Descriptor() ([]byte, []int) {
return fileDescriptor_d66ec2e140567106, []int{0}
}
func (m *Int) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Int.Unmarshal(m, b)
}
func (m *Int) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Int.Marshal(b, m, deterministic)
}
func (m *Int) XXX_Merge(src proto.Message) {
xxx_messageInfo_Int.Merge(m, src)
}
func (m *Int) XXX_Size() int {
return xxx_messageInfo_Int.Size(m)
}
func (m *Int) XXX_DiscardUnknown() {
xxx_messageInfo_Int.DiscardUnknown(m)
}
var xxx_messageInfo_Int proto.InternalMessageInfo
func (m *Int) GetValue() int32 {
if m != nil && m.Value != nil {
return *m.Value
}
return 0
}
type Int64 struct {
Value *int64 `protobuf:"varint,1,opt,name=value" json:"value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Int64) Reset() { *m = Int64{} }
func (m *Int64) String() string { return proto.CompactTextString(m) }
func (*Int64) ProtoMessage() {}
func (*Int64) Descriptor() ([]byte, []int) {
return fileDescriptor_d66ec2e140567106, []int{1}
}
func (m *Int64) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Int64.Unmarshal(m, b)
}
func (m *Int64) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Int64.Marshal(b, m, deterministic)
}
func (m *Int64) XXX_Merge(src proto.Message) {
xxx_messageInfo_Int64.Merge(m, src)
}
func (m *Int64) XXX_Size() int {
return xxx_messageInfo_Int64.Size(m)
}
func (m *Int64) XXX_DiscardUnknown() {
xxx_messageInfo_Int64.DiscardUnknown(m)
}
var xxx_messageInfo_Int64 proto.InternalMessageInfo
func (m *Int64) GetValue() int64 {
if m != nil && m.Value != nil {
return *m.Value
}
return 0
}
type Bool struct {
Value *bool `protobuf:"varint,1,opt,name=value" json:"value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Bool) Reset() { *m = Bool{} }
func (m *Bool) String() string { return proto.CompactTextString(m) }
func (*Bool) ProtoMessage() {}
func (*Bool) Descriptor() ([]byte, []int) {
return fileDescriptor_d66ec2e140567106, []int{2}
}
func (m *Bool) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Bool.Unmarshal(m, b)
}
func (m *Bool) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Bool.Marshal(b, m, deterministic)
}
func (m *Bool) XXX_Merge(src proto.Message) {
xxx_messageInfo_Bool.Merge(m, src)
}
func (m *Bool) XXX_Size() int {
return xxx_messageInfo_Bool.Size(m)
}
func (m *Bool) XXX_DiscardUnknown() {
xxx_messageInfo_Bool.DiscardUnknown(m)
}
var xxx_messageInfo_Bool proto.InternalMessageInfo
func (m *Bool) GetValue() bool {
if m != nil && m.Value != nil {
return *m.Value
}
return false
}
type String struct {
Value *string `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *String) Reset() { *m = String{} }
func (m *String) String() string { return proto.CompactTextString(m) }
func (*String) ProtoMessage() {}
func (*String) Descriptor() ([]byte, []int) {
return fileDescriptor_d66ec2e140567106, []int{3}
}
func (m *String) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_String.Unmarshal(m, b)
}
func (m *String) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_String.Marshal(b, m, deterministic)
}
func (m *String) XXX_Merge(src proto.Message) {
xxx_messageInfo_String.Merge(m, src)
}
func (m *String) XXX_Size() int {
return xxx_messageInfo_String.Size(m)
}
func (m *String) XXX_DiscardUnknown() {
xxx_messageInfo_String.DiscardUnknown(m)
}
var xxx_messageInfo_String proto.InternalMessageInfo
func (m *String) GetValue() string {
if m != nil && m.Value != nil {
return *m.Value
}
return ""
}
type Bytes struct {
Value []byte `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Bytes) Reset() { *m = Bytes{} }
func (m *Bytes) String() string { return proto.CompactTextString(m) }
func (*Bytes) ProtoMessage() {}
func (*Bytes) Descriptor() ([]byte, []int) {
return fileDescriptor_d66ec2e140567106, []int{4}
}
func (m *Bytes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Bytes.Unmarshal(m, b)
}
func (m *Bytes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Bytes.Marshal(b, m, deterministic)
}
func (m *Bytes) XXX_Merge(src proto.Message) {
xxx_messageInfo_Bytes.Merge(m, src)
}
func (m *Bytes) XXX_Size() int {
return xxx_messageInfo_Bytes.Size(m)
}
func (m *Bytes) XXX_DiscardUnknown() {
xxx_messageInfo_Bytes.DiscardUnknown(m)
}
var xxx_messageInfo_Bytes proto.InternalMessageInfo
func (m *Bytes) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
type MsgHead struct {
Cid *int32 `protobuf:"varint,1,opt,name=cid" json:"cid,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MsgHead) Reset() { *m = MsgHead{} }
func (m *MsgHead) String() string { return proto.CompactTextString(m) }
func (*MsgHead) ProtoMessage() {}
func (*MsgHead) Descriptor() ([]byte, []int) {
return fileDescriptor_d66ec2e140567106, []int{5}
}
func (m *MsgHead) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_MsgHead.Unmarshal(m, b)
}
func (m *MsgHead) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_MsgHead.Marshal(b, m, deterministic)
}
func (m *MsgHead) XXX_Merge(src proto.Message) {
xxx_messageInfo_MsgHead.Merge(m, src)
}
func (m *MsgHead) XXX_Size() int {
return xxx_messageInfo_MsgHead.Size(m)
}
func (m *MsgHead) XXX_DiscardUnknown() {
xxx_messageInfo_MsgHead.DiscardUnknown(m)
}
var xxx_messageInfo_MsgHead proto.InternalMessageInfo
func (m *MsgHead) GetCid() int32 {
if m != nil && m.Cid != nil {
return *m.Cid
}
return 0
}
type Msg struct {
Head *MsgHead `protobuf:"bytes,1,opt,name=head" json:"head,omitempty"`
Ret *int32 `protobuf:"varint,2,opt,name=ret" json:"ret,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
proto.XXX_InternalExtensions `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Msg) Reset() { *m = Msg{} }
func (m *Msg) String() string { return proto.CompactTextString(m) }
func (*Msg) ProtoMessage() {}
func (*Msg) Descriptor() ([]byte, []int) {
return fileDescriptor_d66ec2e140567106, []int{6}
}
var extRange_Msg = []proto.ExtensionRange{
{Start: 100, End: 10000},
}
func (*Msg) ExtensionRangeArray() []proto.ExtensionRange {
return extRange_Msg
}
func (m *Msg) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Msg.Unmarshal(m, b)
}
func (m *Msg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Msg.Marshal(b, m, deterministic)
}
func (m *Msg) XXX_Merge(src proto.Message) {
xxx_messageInfo_Msg.Merge(m, src)
}
func (m *Msg) XXX_Size() int {
return xxx_messageInfo_Msg.Size(m)
}
func (m *Msg) XXX_DiscardUnknown() {
xxx_messageInfo_Msg.DiscardUnknown(m)
}
var xxx_messageInfo_Msg proto.InternalMessageInfo
func (m *Msg) GetHead() *MsgHead {
if m != nil {
return m.Head
}
return nil
}
func (m *Msg) GetRet() int32 {
if m != nil && m.Ret != nil {
return *m.Ret
}
return 0
}
type ClientList struct {
ClientList []uint64 `protobuf:"varint,1,rep,name=clientList" json:"clientList,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ClientList) Reset() { *m = ClientList{} }
func (m *ClientList) String() string { return proto.CompactTextString(m) }
func (*ClientList) ProtoMessage() {}
func (*ClientList) Descriptor() ([]byte, []int) {
return fileDescriptor_d66ec2e140567106, []int{7}
}
func (m *ClientList) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ClientList.Unmarshal(m, b)
}
func (m *ClientList) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ClientList.Marshal(b, m, deterministic)
}
func (m *ClientList) XXX_Merge(src proto.Message) {
xxx_messageInfo_ClientList.Merge(m, src)
}
func (m *ClientList) XXX_Size() int {
return xxx_messageInfo_ClientList.Size(m)
}
func (m *ClientList) XXX_DiscardUnknown() {
xxx_messageInfo_ClientList.DiscardUnknown(m)
}
var xxx_messageInfo_ClientList proto.InternalMessageInfo
func (m *ClientList) GetClientList() []uint64 {
if m != nil {
return m.ClientList
}
return nil
}
type ReplyMessage struct {
ClientList []uint64 `protobuf:"varint,1,rep,name=clientList" json:"clientList,omitempty"`
MsgType *uint32 `protobuf:"varint,2,opt,name=msgType" json:"msgType,omitempty"`
Msg []byte `protobuf:"bytes,3,opt,name=msg" json:"msg,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReplyMessage) Reset() { *m = ReplyMessage{} }
func (m *ReplyMessage) String() string { return proto.CompactTextString(m) }
func (*ReplyMessage) ProtoMessage() {}
func (*ReplyMessage) Descriptor() ([]byte, []int) {
return fileDescriptor_d66ec2e140567106, []int{8}
}
func (m *ReplyMessage) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReplyMessage.Unmarshal(m, b)
}
func (m *ReplyMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ReplyMessage.Marshal(b, m, deterministic)
}
func (m *ReplyMessage) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReplyMessage.Merge(m, src)
}
func (m *ReplyMessage) XXX_Size() int {
return xxx_messageInfo_ReplyMessage.Size(m)
}
func (m *ReplyMessage) XXX_DiscardUnknown() {
xxx_messageInfo_ReplyMessage.DiscardUnknown(m)
}
var xxx_messageInfo_ReplyMessage proto.InternalMessageInfo
func (m *ReplyMessage) GetClientList() []uint64 {
if m != nil {
return m.ClientList
}
return nil
}
func (m *ReplyMessage) GetMsgType() uint32 {
if m != nil && m.MsgType != nil {
return *m.MsgType
}
return 0
}
func (m *ReplyMessage) GetMsg() []byte {
if m != nil {
return m.Msg
}
return nil
}
type PlaceHolders struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PlaceHolders) Reset() { *m = PlaceHolders{} }
func (m *PlaceHolders) String() string { return proto.CompactTextString(m) }
func (*PlaceHolders) ProtoMessage() {}
func (*PlaceHolders) Descriptor() ([]byte, []int) {
return fileDescriptor_d66ec2e140567106, []int{9}
}
func (m *PlaceHolders) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PlaceHolders.Unmarshal(m, b)
}
func (m *PlaceHolders) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_PlaceHolders.Marshal(b, m, deterministic)
}
func (m *PlaceHolders) XXX_Merge(src proto.Message) {
xxx_messageInfo_PlaceHolders.Merge(m, src)
}
func (m *PlaceHolders) XXX_Size() int {
return xxx_messageInfo_PlaceHolders.Size(m)
}
func (m *PlaceHolders) XXX_DiscardUnknown() {
xxx_messageInfo_PlaceHolders.DiscardUnknown(m)
}
var xxx_messageInfo_PlaceHolders proto.InternalMessageInfo
func init() {
proto.RegisterType((*Int)(nil), "tcpgateway.Int")
proto.RegisterType((*Int64)(nil), "tcpgateway.Int64")
proto.RegisterType((*Bool)(nil), "tcpgateway.Bool")
proto.RegisterType((*String)(nil), "tcpgateway.String")
proto.RegisterType((*Bytes)(nil), "tcpgateway.Bytes")
proto.RegisterType((*MsgHead)(nil), "tcpgateway.MsgHead")
proto.RegisterType((*Msg)(nil), "tcpgateway.Msg")
proto.RegisterType((*ClientList)(nil), "tcpgateway.ClientList")
proto.RegisterType((*ReplyMessage)(nil), "tcpgateway.ReplyMessage")
proto.RegisterType((*PlaceHolders)(nil), "tcpgateway.PlaceHolders")
}
func init() { proto.RegisterFile("base/base.proto", fileDescriptor_d66ec2e140567106) }
var fileDescriptor_d66ec2e140567106 = []byte{
// 276 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x8f, 0x41, 0x4b, 0xc3, 0x40,
0x10, 0x85, 0xa9, 0x9b, 0xd8, 0x3a, 0x46, 0x2d, 0xab, 0x87, 0x40, 0xb5, 0x94, 0xbd, 0x58, 0x44,
0x22, 0x88, 0xf8, 0x03, 0xea, 0xc1, 0x16, 0x8c, 0x48, 0xf4, 0xe4, 0x6d, 0x4d, 0x86, 0x35, 0xb0,
0x4d, 0x42, 0x66, 0x54, 0xf2, 0x33, 0xfc, 0xc7, 0x92, 0xa4, 0xd1, 0xe6, 0xe4, 0x65, 0x99, 0xb7,
0xdf, 0xec, 0xdb, 0xf7, 0xe0, 0xe8, 0x4d, 0x13, 0x5e, 0xd5, 0x47, 0x50, 0x94, 0x39, 0xe7, 0x12,
0x38, 0x2e, 0x8c, 0x66, 0xfc, 0xd2, 0x95, 0x9a, 0x80, 0x58, 0x65, 0x2c, 0x4f, 0xc0, 0xfd, 0xd4,
0xf6, 0x03, 0xfd, 0xc1, 0x6c, 0x30, 0x77, 0xa3, 0x56, 0xa8, 0x33, 0x70, 0x57, 0x19, 0xdf, 0xde,
0xf4, 0xb1, 0xe8, 0xf0, 0x29, 0x38, 0x8b, 0x3c, 0xb7, 0x7d, 0x3a, 0xea, 0xe8, 0x14, 0x76, 0x9f,
0xb9, 0x4c, 0x33, 0xd3, 0xe7, 0x7b, 0x5b, 0xe6, 0x8b, 0x8a, 0x91, 0xfa, 0xd8, 0xeb, 0xf0, 0x04,
0x86, 0x21, 0x99, 0x25, 0xea, 0x44, 0x8e, 0x41, 0xc4, 0x69, 0xb2, 0x89, 0x56, 0x8f, 0xea, 0x1e,
0x44, 0x48, 0x46, 0x9e, 0x83, 0xf3, 0x8e, 0xba, 0x25, 0xfb, 0xd7, 0xc7, 0xc1, 0x5f, 0xaf, 0x60,
0xf3, 0x36, 0x6a, 0x16, 0x6a, 0x87, 0x12, 0xd9, 0xdf, 0x69, 0x1d, 0x4a, 0xe4, 0x0b, 0x77, 0x94,
0x8c, 0xbf, 0x1f, 0xd5, 0x25, 0xc0, 0x9d, 0x4d, 0x31, 0xe3, 0x87, 0x94, 0x58, 0x4e, 0x01, 0xe2,
0x5f, 0xe5, 0x0f, 0x66, 0x62, 0xee, 0x44, 0x5b, 0x37, 0xea, 0x15, 0xbc, 0x08, 0x0b, 0x5b, 0x85,
0x48, 0xa4, 0x0d, 0xfe, 0xb7, 0x2f, 0x7d, 0x18, 0xae, 0xc9, 0xbc, 0x54, 0x05, 0x36, 0x5f, 0x1f,
0x44, 0x9d, 0xac, 0x03, 0xad, 0xc9, 0xf8, 0xa2, 0x69, 0x5c, 0x8f, 0xea, 0x10, 0xbc, 0x27, 0xab,
0x63, 0x5c, 0xe6, 0x36, 0xc1, 0x92, 0x7e, 0x02, 0x00, 0x00, 0xff, 0xff, 0xc7, 0xb1, 0xeb, 0xcb,
0xb6, 0x01, 0x00, 0x00,
}

View File

@@ -0,0 +1,55 @@
syntax = "proto2";
package tcpgateway;
message Int{
optional int32 value = 1;
}
message Int64{
optional int64 value = 1;
}
message Bool{
optional bool value = 1;
}
message String{
optional string value = 1;
}
message Bytes{
optional bytes value = 1;
}
message MsgHead
{
optional int32 cid = 1;
}
message Msg
{
optional MsgHead head = 1;
optional int32 ret = 2;
extensions 100 to 10000;
}
message ClientList
{
repeated uint64 clientList = 1;
}
message ReplyMessage
{
repeated uint64 clientList = 1;
optional uint32 msgType = 2;
optional bytes msg = 3;
}
message PlaceHolders
{
}

View File

@@ -5,8 +5,11 @@ import (
"github.com/duanhf2012/origin/event"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/network/processor"
"github.com/duanhf2012/origin/node"
"github.com/duanhf2012/origin/service"
"sync"
"time"
)
type TcpService struct {
@@ -15,8 +18,8 @@ type TcpService struct {
mapClientLocker sync.RWMutex
mapClient map[uint64] *Client
initClientId uint64
process network.Processor
//initClientId uint64
process processor.IProcessor
}
type TcpPackType int8
@@ -28,10 +31,10 @@ const(
)
type TcpPack struct {
Type TcpPackType //0表示连接 1表示断开 2表示数据
MsgProcessor network.Processor
ClientId uint64
Data interface{}
Type TcpPackType //0表示连接 1表示断开 2表示数据
MsgProcessor processor.IProcessor
ClientId uint64
Data interface{}
}
const Default_MaxConnNum = 3000
@@ -40,6 +43,31 @@ const Default_LittleEndian = false
const Default_MinMsgLen = 2
const Default_MaxMsgLen = 65535
const (
MaxNodeId = 1<<10 - 1 //Uint10
MaxSeed = 1<<22 - 1 //MaxUint24
)
var seed uint32
var seedLocker sync.Mutex
func (slf *TcpService) genId() uint64 {
if node.GetNodeId()>MaxNodeId{
panic("nodeId exceeds the maximum!")
}
seedLocker.Lock()
seed = (seed+1)%MaxSeed
seedLocker.Unlock()
nowTime := uint64(time.Now().Second())
return (uint64(node.GetNodeId())<<54)|(nowTime<<22)|uint64(seed)
}
func GetNodeId(agentId uint64) int {
return int(agentId>>54)
}
func (slf *TcpService) OnInit() error{
iConfig := slf.GetServiceCfg()
if iConfig == nil {
@@ -96,7 +124,7 @@ func (slf *TcpService) TcpEventHandler(ev *event.Event) {
}
}
func (slf *TcpService) SetProcessor(process network.Processor,handler event.IEventHandler){
func (slf *TcpService) SetProcessor(process processor.IProcessor,handler event.IEventHandler){
slf.process = process
slf.RegEventReciverFunc(event.Sys_Event_Tcp,handler,slf.TcpEventHandler)
}
@@ -106,15 +134,16 @@ func (slf *TcpService) NewClient(conn *network.TCPConn) network.Agent {
defer slf.mapClientLocker.Unlock()
for {
slf.initClientId+=1
_,ok := slf.mapClient[slf.initClientId]
clientId := slf.genId()
_,ok := slf.mapClient[clientId]
if ok == true {
continue
}
pClient := &Client{tcpConn:conn, id:slf.initClientId}
pClient := &Client{tcpConn:conn, id:clientId}
pClient.tcpService = slf
slf.mapClient[slf.initClientId] = pClient
slf.mapClient[clientId] = pClient
return pClient
}
@@ -134,6 +163,9 @@ func (slf *Client) GetId() uint64 {
func (slf *Client) Run() {
slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_Connected,MsgProcessor:slf.tcpService.process}})
for{
if slf.tcpConn == nil {
break
}
bytes,err := slf.tcpConn.ReadMsg()
if err != nil {
log.Debug("read client id %d is error:%+v",slf.id,err)
@@ -155,12 +187,12 @@ func (slf *Client) OnClose(){
delete (slf.tcpService.mapClient,slf.GetId())
}
func (slf *TcpService) SendMsg(clientid uint64,msg interface{}) error{
func (slf *TcpService) SendMsg(clientId uint64,msg interface{}) error{
slf.mapClientLocker.Lock()
client,ok := slf.mapClient[clientid]
client,ok := slf.mapClient[clientId]
if ok == false{
slf.mapClientLocker.Unlock()
return fmt.Errorf("client %d is disconnect!",clientid)
return fmt.Errorf("client %d is disconnect!",clientId)
}
slf.mapClientLocker.Unlock()
@@ -171,11 +203,11 @@ func (slf *TcpService) SendMsg(clientid uint64,msg interface{}) error{
return client.tcpConn.WriteMsg(bytes)
}
func (slf *TcpService) Close(clientid uint64) {
func (slf *TcpService) Close(clientId uint64) {
slf.mapClientLocker.Lock()
defer slf.mapClientLocker.Unlock()
client,ok := slf.mapClient[clientid]
client,ok := slf.mapClient[clientId]
if ok == false{
return
}
@@ -197,3 +229,15 @@ func (slf *TcpService) GetClientIp(clientid uint64) string{
return pClient.tcpConn.GetRemoteIp()
}
func (slf *TcpService) SendRawMsg(clientId uint64,msg []byte) error{
slf.mapClientLocker.Lock()
client,ok := slf.mapClient[clientId]
if ok == false{
slf.mapClientLocker.Unlock()
return fmt.Errorf("client %d is disconnect!",clientId)
}
slf.mapClientLocker.Unlock()
return client.tcpConn.WriteMsg(msg)
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/duanhf2012/origin/event"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/network/processor"
"github.com/duanhf2012/origin/service"
"sync"
)
@@ -16,7 +17,7 @@ type WSService struct {
mapClientLocker sync.RWMutex
mapClient map[uint64] *WSClient
initClientId uint64
process network.Processor
process processor.Processor
}
type WSPackType int8
@@ -28,10 +29,10 @@ const(
)
type WSPack struct {
Type WSPackType //0表示连接 1表示断开 2表示数据
MsgProcessor network.Processor
ClientId uint64
Data interface{}
Type WSPackType //0表示连接 1表示断开 2表示数据
MsgProcessor processor.Processor
ClientId uint64
Data interface{}
}
@@ -89,7 +90,7 @@ func (slf *WSService) WSEventHandler(ev *event.Event) {
}
}
func (slf *WSService) SetProcessor(process network.Processor,handler event.IEventHandler){
func (slf *WSService) SetProcessor(process processor.Processor,handler event.IEventHandler){
slf.process = process
slf.RegEventReciverFunc(event.Sys_Event_WebSocket,handler,slf.WSEventHandler)
}