From 0c55961c44a8dda17d69bd83f914474d0ceed67f Mon Sep 17 00:00:00 2001 From: boyce Date: Wed, 7 Oct 2020 16:14:19 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9ETcpGateWay=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=EF=BC=8C=E6=94=AF=E6=8C=81=E9=80=9A=E8=BF=87=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E8=B7=AF=E7=94=B1=E8=BD=AC=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- event/event.go | 33 +- network/processor.go | 17 - network/processor/pbrawprocessor.go | 119 ++++++ network/processor/processor.go | 34 ++ rpc/rpchandler.go | 2 +- service/module.go | 7 +- service/service.go | 8 +- sysservice/tcpgateway/GateProxyModule.go | 77 ++++ sysservice/tcpgateway/ILoadBalance.go | 5 + sysservice/tcpgateway/IRouter.go | 11 + sysservice/tcpgateway/LoadBalance.go | 8 + sysservice/tcpgateway/Router.go | 242 ++++++++++++ sysservice/tcpgateway/TcpGateService.go | 81 ++++ sysservice/tcpgateway/base.pb.go | 473 +++++++++++++++++++++++ sysservice/tcpgateway/base.proto | 55 +++ sysservice/tcpservice/tcpservice.go | 76 +++- sysservice/wsservice/wsservice.go | 13 +- 17 files changed, 1203 insertions(+), 58 deletions(-) delete mode 100644 network/processor.go create mode 100644 network/processor/pbrawprocessor.go create mode 100644 network/processor/processor.go create mode 100644 sysservice/tcpgateway/GateProxyModule.go create mode 100644 sysservice/tcpgateway/ILoadBalance.go create mode 100644 sysservice/tcpgateway/IRouter.go create mode 100644 sysservice/tcpgateway/LoadBalance.go create mode 100644 sysservice/tcpgateway/Router.go create mode 100644 sysservice/tcpgateway/TcpGateService.go create mode 100644 sysservice/tcpgateway/base.pb.go create mode 100644 sysservice/tcpgateway/base.proto diff --git a/event/event.go b/event/event.go index 963489f..c3133bc 100644 --- a/event/event.go +++ b/event/event.go @@ -19,20 +19,17 @@ type Event struct { } type IEventHandler interface { - + Init(processor IEventProcessor) GetEventProcessor() IEventProcessor //获得事件 NotifyEvent(*Event) - Desctory() - //注册了事件 addRegInfo(eventType EventType,eventProcessor IEventProcessor) removeRegInfo(eventType EventType,eventProcessor IEventProcessor) - - } type IEventProcessor interface { + EventHandler(ev *Event) //同一个IEventHandler,只能接受一个EventType类型回调 RegEventReciverFunc(eventType EventType,reciver IEventHandler,callback EventCallBack) UnRegEventReciverFun(eventType EventType,reciver IEventHandler) @@ -44,6 +41,7 @@ type IEventProcessor interface { addListen(eventType EventType,reciver IEventHandler) removeBindEvent(eventType EventType,reciver IEventHandler) removeListen(eventType EventType,reciver IEventHandler) + GetEventChan() chan *Event } type EventHandler struct { @@ -64,6 +62,21 @@ type EventProcessor struct { mapBindHandlerEvent map[EventType]map[IEventHandler]EventCallBack//收到事件处理 } +func NewEventHandler() IEventHandler{ + eh := EventHandler{} + eh.mapRegEvent = map[EventType]map[IEventProcessor]interface{}{} + + return &eh +} + +func NewEventProcessor() IEventProcessor{ + ep := EventProcessor{} + ep.mapListenerEvent = map[EventType]map[IEventProcessor]int{} + ep.mapBindHandlerEvent = map[EventType]map[IEventHandler]EventCallBack{} + + return &ep +} + func (slf *EventHandler) addRegInfo(eventType EventType,eventProcessor IEventProcessor){ slf.locker.Lock() defer slf.locker.Unlock() @@ -93,6 +106,7 @@ func (slf *EventHandler) NotifyEvent(ev *Event){ func (slf *EventHandler) Init(processor IEventProcessor){ slf.eventProcessor = processor + slf.mapRegEvent =map[EventType]map[IEventProcessor]interface{}{} } @@ -112,12 +126,8 @@ func (slf *EventProcessor) SetEventChannel(channelNum int) bool{ } func (slf *EventProcessor) addBindEvent(eventType EventType,reciver IEventHandler,callback EventCallBack){ - //mapBindHandlerEvent map[EventType]map[IEventHandler]EventCallBack//收到事件处理 slf.locker.Lock() defer slf.locker.Unlock() - if slf.mapBindHandlerEvent == nil { - slf.mapBindHandlerEvent = map[EventType]map[IEventHandler]EventCallBack{} - } if _,ok := slf.mapBindHandlerEvent[eventType]; ok == false { slf.mapBindHandlerEvent[eventType] = map[IEventHandler]EventCallBack{} @@ -130,11 +140,6 @@ func (slf *EventProcessor) addListen(eventType EventType,reciver IEventHandler){ slf.locker.Lock() defer slf.locker.Unlock() - //mapListenerEvent map[EventType]map[IEventProcessor]int - if slf.mapListenerEvent == nil { - slf.mapListenerEvent = map[EventType]map[IEventProcessor]int{} - } - if _,ok :=slf.mapListenerEvent[eventType];ok == false{ slf.mapListenerEvent[eventType] = map[IEventProcessor]int{} } diff --git a/network/processor.go b/network/processor.go deleted file mode 100644 index 48d1828..0000000 --- a/network/processor.go +++ /dev/null @@ -1,17 +0,0 @@ -package network - - -type Processor interface { - // must goroutine safe - MsgRoute(msg interface{}, userData interface{}) error - //must goroutine safe - UnknownMsgRoute(msg interface{}, userData interface{}) - // connect event - ConnectedRoute(userData interface{}) - DisConnectedRoute(userData interface{}) - - // must goroutine safe - Unmarshal(data []byte) (interface{}, error) - // must goroutine safe - Marshal(msg interface{}) ([]byte, error) -} diff --git a/network/processor/pbrawprocessor.go b/network/processor/pbrawprocessor.go new file mode 100644 index 0000000..51c4c58 --- /dev/null +++ b/network/processor/pbrawprocessor.go @@ -0,0 +1,119 @@ +package processor + +import ( + "encoding/binary" + "reflect" +) + +type RawMessageInfo struct { + msgType reflect.Type + msgHandler RawMessageHandler +} + +type RawMessageHandler func(clientId uint64,packType uint16,msg []byte) +type RawConnectHandler func(clientId uint64) +type UnknownRawMessageHandler func(clientId uint64,msg []byte) + +const RawMsgTypeSize = 2 +type PBRawProcessor struct { + msgHandler RawMessageHandler + LittleEndian bool + + unknownMessageHandler UnknownRawMessageHandler + connectHandler RawConnectHandler + disconnectHandler RawConnectHandler +} + +func NewPBRawProcessor() *PBRawProcessor { + processor := &PBRawProcessor{} + return processor +} + +func (p *PBRawProcessor) SetByteOrder(littleEndian bool) { + p.LittleEndian = littleEndian +} + +type PBRawPackInfo struct { + typ uint16 + rawMsg []byte +} + +func (slf *PBRawPackInfo) GetPackType() uint16 { + return slf.typ +} + +func (slf *PBRawPackInfo) GetMsg() []byte { + return slf.rawMsg +} + +// must goroutine safe +func (slf *PBRawProcessor ) MsgRoute(msg interface{},userdata interface{}) error{ + pPackInfo := msg.(*PBRawPackInfo) + slf.msgHandler(userdata.(uint64),pPackInfo.typ,pPackInfo.rawMsg) + return nil +} + +// must goroutine safe +func (slf *PBRawProcessor ) Unmarshal(data []byte) (interface{}, error) { + var msgType uint16 + if slf.LittleEndian == true { + msgType = binary.LittleEndian.Uint16(data[:2]) + }else{ + msgType = binary.BigEndian.Uint16(data[:2]) + } + + return &PBRawPackInfo{typ:msgType,rawMsg:data[2:]},nil +} + +// must goroutine safe +func (slf *PBRawProcessor ) Marshal(msg interface{}) ([]byte, error){ + pMsg := msg.(*PBRawPackInfo) + + buff := make([]byte, 2, len(pMsg.rawMsg)+RawMsgTypeSize) + if slf.LittleEndian == true { + binary.LittleEndian.PutUint16(buff[:2],pMsg.typ) + }else{ + binary.BigEndian.PutUint16(buff[:2],pMsg.typ) + } + + buff = append(buff,pMsg.rawMsg...) + return buff,nil +} + +func (slf *PBRawProcessor) SetRawMsgHandler(handle RawMessageHandler) { + slf.msgHandler = handle +} + + +func (slf *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte) *PBRawPackInfo { + return &PBRawPackInfo{typ:msgType,rawMsg:msg} +} + +func (slf *PBRawProcessor) UnknownMsgRoute(msg interface{}, userData interface{}){ + if slf.unknownMessageHandler == nil { + return + } + slf.unknownMessageHandler(userData.(uint64),msg.([]byte)) +} + +// connect event +func (slf *PBRawProcessor) ConnectedRoute(userData interface{}){ + slf.connectHandler(userData.(uint64)) +} + +func (slf *PBRawProcessor) DisConnectedRoute(userData interface{}){ + slf.disconnectHandler(userData.(uint64)) +} + +func (slf *PBRawProcessor) SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler){ + slf.unknownMessageHandler = unknownMessageHandler +} + +func (slf *PBRawProcessor) SetConnectedHandler(connectHandler RawConnectHandler){ + slf.connectHandler = connectHandler +} + +func (slf *PBRawProcessor) SetDisConnectedHandler(disconnectHandler RawConnectHandler){ + slf.disconnectHandler = disconnectHandler +} + diff --git a/network/processor/processor.go b/network/processor/processor.go new file mode 100644 index 0000000..f28877b --- /dev/null +++ b/network/processor/processor.go @@ -0,0 +1,34 @@ +package processor + + +type IProcessor interface { + // must goroutine safe + MsgRoute(msg interface{}, userData interface{}) error + //must goroutine safe + UnknownMsgRoute(msg interface{}, userData interface{}) + // connect event + ConnectedRoute(userData interface{}) + DisConnectedRoute(userData interface{}) + + // must goroutine safe + Unmarshal(data []byte) (interface{}, error) + // must goroutine safe + Marshal(msg interface{}) ([]byte, error) +} + +type IRawProcessor interface { + SetByteOrder(littleEndian bool) + MsgRoute(msg interface{},userdata interface{}) error + Unmarshal(data []byte) (interface{}, error) + Marshal(msg interface{}) ([]byte, error) + + SetRawMsgHandler(handle RawMessageHandler) + MakeRawMsg(msgType uint16,msg []byte) *PBRawPackInfo + UnknownMsgRoute(msg interface{}, userData interface{}) + ConnectedRoute(userData interface{}) + DisConnectedRoute(userData interface{}) + + SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler) + SetConnectedHandler(connectHandler RawConnectHandler) + SetDisConnectedHandler(disconnectHandler RawConnectHandler) +} diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 99d4801..dc29c7e 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -134,7 +134,7 @@ func (slf *RpcHandler) suitableMethods(method reflect.Method) error { return fmt.Errorf("%s The return parameter must be of type error!",method.Name) } - if typ.NumIn() <3 || typ.NumIn() > 4 { + if typ.NumIn() <2 || typ.NumIn() > 4 { return fmt.Errorf("%s Unsupported parameter format!",method.Name) } diff --git a/service/module.go b/service/module.go index 181aa86..51dd486 100644 --- a/service/module.go +++ b/service/module.go @@ -52,7 +52,8 @@ type Module struct { //事件管道 moduleName string - eventHandler event.EventHandler + eventHandler event.IEventHandler + //eventHandler event.EventHandler } @@ -74,6 +75,7 @@ func (slf *Module) GetModuleName() string{ } func (slf *Module) OnInit() error{ +// slf.eventHandler = event.NewEventHandler() return nil } @@ -100,6 +102,7 @@ func (slf *Module) AddModule(module IModule) (int64,error){ pAddModule.dispatcher = slf.GetAncestor().getBaseModule().(*Module).dispatcher pAddModule.ancestor = slf.ancestor pAddModule.moduleName = reflect.Indirect(reflect.ValueOf(module)).Type().Name() + pAddModule.eventHandler = event.NewEventHandler() pAddModule.eventHandler.Init(slf.eventHandler.GetEventProcessor()) err := module.OnInit() if err != nil { @@ -216,5 +219,5 @@ func (slf *Module) NotifyEvent(ev *event.Event){ } func (slf *Module) GetEventHandler() event.IEventHandler{ - return &slf.eventHandler + return slf.eventHandler } \ No newline at end of file diff --git a/service/service.go b/service/service.go index 2c33c5a..b339fb8 100644 --- a/service/service.go +++ b/service/service.go @@ -41,7 +41,9 @@ type Service struct { serviceCfg interface{} gorouterNum int32 startStatus bool - eventProcessor event.EventProcessor //事件接收者 + eventProcessor event.IEventProcessor + + //eventProcessor event.EventProcessor //事件接收者 profiler *profiler.Profiler //性能分析器 } @@ -69,7 +71,9 @@ func (slf *Service) Init(iservice IService,getClientFun rpc.FuncRpcClient,getSer slf.descendants = map[int64]IModule{} slf.serviceCfg = serviceCfg slf.gorouterNum = 1 - slf.eventHandler.Init(&slf.eventProcessor) + slf.eventProcessor = event.NewEventProcessor() + slf.eventHandler = event.NewEventHandler() + slf.eventHandler.Init(slf.eventProcessor) } func (slf *Service) SetGoRouterNum(gorouterNum int32) bool { diff --git a/sysservice/tcpgateway/GateProxyModule.go b/sysservice/tcpgateway/GateProxyModule.go new file mode 100644 index 0000000..2ecc49d --- /dev/null +++ b/sysservice/tcpgateway/GateProxyModule.go @@ -0,0 +1,77 @@ +package tcpgateway + +import ( + "fmt" + "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/sysservice/tcpservice" + "github.com/golang/protobuf/proto" +) + +type GateProxyModule struct{ + service.Module + defaultGateRpc string +} + +func NewGateProxyModule() *GateProxyModule{ + return &GateProxyModule{defaultGateRpc:"TcpGateService.RPC_Dispatch"} +} + +func (slf *GateProxyModule) Send(clientId interface{},msgType uint16,msg proto.Message) error { + //对agentId进行分组 + mapNodeClientId := map[int][]uint64{} + switch clientId.(type) { + case uint64: + id := clientId.(uint64) + nodeId := tcpservice.GetNodeId(id) + mapNodeClientId[nodeId] = append(mapNodeClientId[nodeId],id) + + case []uint64: + idList := clientId.([]uint64) + for _,id := range idList{ + nodeId := tcpservice.GetNodeId(id) + mapNodeClientId[nodeId] = append(mapNodeClientId[nodeId],id) + } + } + + bData,err := proto.Marshal(msg) + if err!=nil { + return err + } + var replyMsg ReplyMessage + replyMsg.MsgType = proto.Uint32(uint32(msgType)) + replyMsg.Msg = bData + + + for nodeId,clientIdList := range mapNodeClientId { + if nodeId <0 || nodeId>tcpservice.MaxNodeId { + fmt.Errorf("nodeid is error %d",nodeId) + continue + } + + replyMsg.ClientList = clientIdList + slf.GetService().GetRpcHandler().GoNode(nodeId,slf.defaultGateRpc,&replyMsg) + } + + return nil +} + + + +func (slf *GateProxyModule) SetDefaultGateRpcMethodName(rpcMethodName string){ + slf.defaultGateRpc = rpcMethodName +} + + +func (slf *GateProxyModule) send(clientId uint64,msgType uint16,msg []byte) error { + nodeId := tcpservice.GetNodeId(clientId) + if nodeId <0 || nodeId>tcpservice.MaxNodeId { + return fmt.Errorf("nodeid is error %d",nodeId) + } + + var replyMsg ReplyMessage + replyMsg.MsgType = proto.Uint32(uint32(msgType)) + replyMsg.Msg = msg + replyMsg.ClientList = append(replyMsg.ClientList ,clientId) + + return slf.GetService().GetRpcHandler().GoNode(nodeId,slf.defaultGateRpc,&replyMsg) +} diff --git a/sysservice/tcpgateway/ILoadBalance.go b/sysservice/tcpgateway/ILoadBalance.go new file mode 100644 index 0000000..1bea6be --- /dev/null +++ b/sysservice/tcpgateway/ILoadBalance.go @@ -0,0 +1,5 @@ +package tcpgateway + +type ILoadBalance interface { + SelectNode(serviceName string) int //选择一个结点,通过服务名称 +} diff --git a/sysservice/tcpgateway/IRouter.go b/sysservice/tcpgateway/IRouter.go new file mode 100644 index 0000000..89b5393 --- /dev/null +++ b/sysservice/tcpgateway/IRouter.go @@ -0,0 +1,11 @@ +package tcpgateway + +type IRouter interface { + RouterMessage(clientId uint64,msgType uint16,msg []byte) //消息转发 + RouterEvent(clientId uint64,eventType string) bool//消息转发 + Load() //加载路由规则 + + OnDisconnected(clientId uint64) + OnConnected(clientId uint64) + //ReplyMessage(clientId uint64,msg []byte) +} diff --git a/sysservice/tcpgateway/LoadBalance.go b/sysservice/tcpgateway/LoadBalance.go new file mode 100644 index 0000000..48b8622 --- /dev/null +++ b/sysservice/tcpgateway/LoadBalance.go @@ -0,0 +1,8 @@ +package tcpgateway + +type LoadBalance struct { +} + +func (slf *LoadBalance) SelectNode(serviceName string) int { + return 1 +} diff --git a/sysservice/tcpgateway/Router.go b/sysservice/tcpgateway/Router.go new file mode 100644 index 0000000..12717de --- /dev/null +++ b/sysservice/tcpgateway/Router.go @@ -0,0 +1,242 @@ +package tcpgateway + +import ( + "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/node" + "github.com/duanhf2012/origin/rpc" + "github.com/duanhf2012/origin/sysservice/tcpservice" + "github.com/golang/protobuf/proto" + "strings" +) + +type RouterCfg struct { +} + +type Router struct { + loadBalance ILoadBalance + rpcHandler rpc.IRpcHandler + + mapMsgRouterInfo map[uint16]*MsgRouterInfo + mapEventRouterInfo map[string]*EventRouterInfo + + tcpService *tcpservice.TcpService + + mapClientRouterCache map[uint64]map[string]int //map[clientid]nodeid +} + +type MsgRouterInfo struct { + Rpc string + ServiceName string + LoadBalanceType string +} + +type EventRouterInfo struct { + Rpc string + ServiceName string + LoadBalanceType string +} + +func NewRouter(loadBalance ILoadBalance,rpcHandler rpc.IRpcHandler,cfg interface{}) IRouter { + router := &Router{} + router.loadBalance = loadBalance + router.rpcHandler = rpcHandler + router.tcpService = node.GetService("TcpService").(*tcpservice.TcpService) + router.loadCfg(cfg) + router.mapClientRouterCache = map[uint64]map[string]int{} + return router +} + +func (slf *Router) loadCfg(cfg interface{}){ + slf.mapMsgRouterInfo = map[uint16]*MsgRouterInfo{} + slf.mapEventRouterInfo = map[string]*EventRouterInfo{} + + mapRouter,ok := cfg.(map[string]interface{}) + if ok == false{ + //error.... + return + } + + //parse MsgRouter + routerInfo,ok := mapRouter["MsgRouter"] + if ok == false{ + //error... + return + } + + //ar routerList []RouterItem + routerList,ok := routerInfo.([]interface{}) + if ok == false{ + //error... + return + } + + for _,v := range routerList{ + mapItem := v.(map[string]interface{}) + var iMsgId interface{} + var iRpc interface{} + var iLoadBalanceType interface{} + + if iMsgId,ok = mapItem["MsgId"];ok == false { + //error ... + continue + } + if iRpc,ok = mapItem["Rpc"];ok == false { + //error ... + continue + } + if iLoadBalanceType,ok = mapItem["LoadBalanceType"];ok == false { + //error ... + continue + } + msgId,ok := iMsgId.(float64) + if ok == false { + //error ... + continue + } + + strService := strings.Split(iRpc.(string),".") + if len(strService)!=2 { + //error ... + continue + } + + slf.mapMsgRouterInfo[uint16(msgId)] = &MsgRouterInfo{ServiceName:strService[0],Rpc: iRpc.(string),LoadBalanceType: iLoadBalanceType.(string)} + } + + //parse EventRouter + eventRouterInfo,ok := mapRouter["EventRouter"] + if ok == false{ + //error... + return + } + + //ar routerList []RouterItem + eRouterList,ok := eventRouterInfo.([]interface{}) + if ok == false{ + //error... + return + } + + for _,v := range eRouterList{ + mapItem := v.(map[string]interface{}) + var eventType interface{} + var iRpc interface{} + var iLoadBalanceType interface{} + + if eventType,ok = mapItem["EventType"];ok == false { + //error ... + continue + } + if iRpc,ok = mapItem["Rpc"];ok == false { + //error ... + continue + } + if iLoadBalanceType,ok = mapItem["LoadBalanceType"];ok == false { + //error ... + continue + } + strEventType,ok := eventType.(string) + if ok == false { + //error ... + continue + } + + strService := strings.Split(iRpc.(string),".") + if len(strService)!=2 { + //error ... + continue + } + + slf.mapEventRouterInfo[strEventType] = &EventRouterInfo{ServiceName:strService[0],Rpc: iRpc.(string),LoadBalanceType: iLoadBalanceType.(string)} + } +} + +func (slf *Router) GetMsgRouterService(msgType uint16) *MsgRouterInfo{ + info,ok := slf.mapMsgRouterInfo[msgType] + if ok == false { + return nil + } + + return info +} + +func (slf *Router) GetEventRouterService(eventType string) *EventRouterInfo{ + info,ok := slf.mapEventRouterInfo[eventType] + if ok == false { + return nil + } + + return info +} + +func (slf *Router) GetRouterId(clientId uint64,serviceName *string) int { + mapServiceRouter,ok := slf.mapClientRouterCache[clientId] + if ok == false{ + return 0 + } + + routerId,ok := mapServiceRouter[*serviceName] + if ok == false { + return 0 + } + + return routerId +} + +func (slf *Router) SetRouterId(clientId uint64,serviceName *string,routerId int){ + slf.mapClientRouterCache[clientId][*serviceName] = routerId +} + +func (slf *Router) RouterMessage(clientId uint64,msgType uint16,msg []byte) { + routerInfo:= slf.GetMsgRouterService(msgType) + if routerInfo==nil { + log.Error("The message type is %d with no configured route!",msgType) + return + } + + routerId := slf.GetRouterId(clientId,&routerInfo.ServiceName) + if routerId ==0 { + routerId = slf.loadBalance.SelectNode(routerInfo.ServiceName) + slf.SetRouterId(clientId,&routerInfo.ServiceName,routerId) + } + + if routerId>0 { + slf.rpcHandler.RawGoNode(routerId,routerInfo.Rpc,msg,proto.Uint64(clientId)) + } +} + +func (slf *Router) Load(){ +} + +func (slf *Router) RouterEvent(clientId uint64,eventType string) bool{ + routerInfo:= slf.GetEventRouterService(eventType) + if routerInfo==nil { + log.Error("The event type is %s with no register!",eventType) + return false + } + + routerId := slf.GetRouterId(clientId,&routerInfo.ServiceName) + if routerId ==0 { + routerId = slf.loadBalance.SelectNode(routerInfo.ServiceName) + slf.SetRouterId(clientId,&routerInfo.ServiceName,routerId) + } + + if routerId>0 { + slf.rpcHandler.RawGoNode(routerId,routerInfo.Rpc,[]byte{},proto.Uint64(clientId)) + return true + } + + return false +} + + +func (slf *Router) OnDisconnected(clientId uint64){ + delete(slf.mapClientRouterCache,clientId) + //通知事件 + slf.RouterEvent(clientId,"DisConnect") +} + +func (slf *Router) OnConnected(clientId uint64){ + slf.mapClientRouterCache[clientId] = map[string]int{} + slf.RouterEvent(clientId,"Connect") +} diff --git a/sysservice/tcpgateway/TcpGateService.go b/sysservice/tcpgateway/TcpGateService.go new file mode 100644 index 0000000..2031b09 --- /dev/null +++ b/sysservice/tcpgateway/TcpGateService.go @@ -0,0 +1,81 @@ +package tcpgateway + +import ( + "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/network/processor" + "github.com/duanhf2012/origin/node" + "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/sysservice/tcpservice" +) + +func init(){ + node.Setup(&tcpservice.TcpService{}) + node.Setup(&TcpGateService{}) +} + +type MsgTypeRouterInfo struct { + router IRouter + serviceName string +} + +type TcpGateService struct { + service.Service + + processor processor.IRawProcessor + tcpService *tcpservice.TcpService + + loadBalance ILoadBalance + router IRouter +} + + +func (slf *TcpGateService) OnInit() error { + slf.OnLoad() + + //获取安装好了的TcpService对象 + slf.tcpService = node.GetService("TcpService").(*tcpservice.TcpService) + + //新建内置的protobuf处理器,您也可以自定义路由器,比如json,后续会补充 + slf.processor = processor.NewPBRawProcessor() + + //注册监听客户连接断开事件 + slf.processor.SetDisConnectedHandler(slf.router.OnDisconnected) + //注册监听客户连接事件 + slf.processor.SetConnectedHandler(slf.router.OnConnected) + + //注册监听消息类型MsgType_MsgReq,并注册回调 + slf.processor.SetRawMsgHandler(slf.router.RouterMessage) + //将protobuf消息处理器设置到TcpService服务中 + slf.tcpService.SetProcessor(slf.processor,slf.GetEventHandler()) + + return nil +} + +func (slf *TcpGateService) OnLoad() { + slf.loadBalance = &LoadBalance{} + slf.router = NewRouter(slf.loadBalance,slf,slf.GetServiceCfg()) + + //加载路由 + slf.router.Load() +} + +func (slf *TcpGateService) SetupLoadBalance(loadBalance ILoadBalance){ + slf.loadBalance = loadBalance +} + +func (slf *TcpGateService) SetupRouter(router IRouter){ + slf.router = router +} + +func (slf *TcpGateService) RPC_Dispatch(replyMsg *ReplyMessage) error { + for _,id := range replyMsg.ClientList { + err := slf.tcpService.SendRawMsg(id,replyMsg.Msg) + if err != nil { + log.Debug("SendRawMsg fail:%+v!",err) + } + } + + return nil +} + + diff --git a/sysservice/tcpgateway/base.pb.go b/sysservice/tcpgateway/base.pb.go new file mode 100644 index 0000000..bbcccfd --- /dev/null +++ b/sysservice/tcpgateway/base.pb.go @@ -0,0 +1,473 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: base/base.proto + +package tcpgateway + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Int struct { + Value *int32 `protobuf:"varint,1,opt,name=value" json:"value,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Int) Reset() { *m = Int{} } +func (m *Int) String() string { return proto.CompactTextString(m) } +func (*Int) ProtoMessage() {} +func (*Int) Descriptor() ([]byte, []int) { + return fileDescriptor_d66ec2e140567106, []int{0} +} + +func (m *Int) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Int.Unmarshal(m, b) +} +func (m *Int) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Int.Marshal(b, m, deterministic) +} +func (m *Int) XXX_Merge(src proto.Message) { + xxx_messageInfo_Int.Merge(m, src) +} +func (m *Int) XXX_Size() int { + return xxx_messageInfo_Int.Size(m) +} +func (m *Int) XXX_DiscardUnknown() { + xxx_messageInfo_Int.DiscardUnknown(m) +} + +var xxx_messageInfo_Int proto.InternalMessageInfo + +func (m *Int) GetValue() int32 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} + +type Int64 struct { + Value *int64 `protobuf:"varint,1,opt,name=value" json:"value,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Int64) Reset() { *m = Int64{} } +func (m *Int64) String() string { return proto.CompactTextString(m) } +func (*Int64) ProtoMessage() {} +func (*Int64) Descriptor() ([]byte, []int) { + return fileDescriptor_d66ec2e140567106, []int{1} +} + +func (m *Int64) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Int64.Unmarshal(m, b) +} +func (m *Int64) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Int64.Marshal(b, m, deterministic) +} +func (m *Int64) XXX_Merge(src proto.Message) { + xxx_messageInfo_Int64.Merge(m, src) +} +func (m *Int64) XXX_Size() int { + return xxx_messageInfo_Int64.Size(m) +} +func (m *Int64) XXX_DiscardUnknown() { + xxx_messageInfo_Int64.DiscardUnknown(m) +} + +var xxx_messageInfo_Int64 proto.InternalMessageInfo + +func (m *Int64) GetValue() int64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} + +type Bool struct { + Value *bool `protobuf:"varint,1,opt,name=value" json:"value,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Bool) Reset() { *m = Bool{} } +func (m *Bool) String() string { return proto.CompactTextString(m) } +func (*Bool) ProtoMessage() {} +func (*Bool) Descriptor() ([]byte, []int) { + return fileDescriptor_d66ec2e140567106, []int{2} +} + +func (m *Bool) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Bool.Unmarshal(m, b) +} +func (m *Bool) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Bool.Marshal(b, m, deterministic) +} +func (m *Bool) XXX_Merge(src proto.Message) { + xxx_messageInfo_Bool.Merge(m, src) +} +func (m *Bool) XXX_Size() int { + return xxx_messageInfo_Bool.Size(m) +} +func (m *Bool) XXX_DiscardUnknown() { + xxx_messageInfo_Bool.DiscardUnknown(m) +} + +var xxx_messageInfo_Bool proto.InternalMessageInfo + +func (m *Bool) GetValue() bool { + if m != nil && m.Value != nil { + return *m.Value + } + return false +} + +type String struct { + Value *string `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *String) Reset() { *m = String{} } +func (m *String) String() string { return proto.CompactTextString(m) } +func (*String) ProtoMessage() {} +func (*String) Descriptor() ([]byte, []int) { + return fileDescriptor_d66ec2e140567106, []int{3} +} + +func (m *String) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_String.Unmarshal(m, b) +} +func (m *String) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_String.Marshal(b, m, deterministic) +} +func (m *String) XXX_Merge(src proto.Message) { + xxx_messageInfo_String.Merge(m, src) +} +func (m *String) XXX_Size() int { + return xxx_messageInfo_String.Size(m) +} +func (m *String) XXX_DiscardUnknown() { + xxx_messageInfo_String.DiscardUnknown(m) +} + +var xxx_messageInfo_String proto.InternalMessageInfo + +func (m *String) GetValue() string { + if m != nil && m.Value != nil { + return *m.Value + } + return "" +} + +type Bytes struct { + Value []byte `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Bytes) Reset() { *m = Bytes{} } +func (m *Bytes) String() string { return proto.CompactTextString(m) } +func (*Bytes) ProtoMessage() {} +func (*Bytes) Descriptor() ([]byte, []int) { + return fileDescriptor_d66ec2e140567106, []int{4} +} + +func (m *Bytes) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Bytes.Unmarshal(m, b) +} +func (m *Bytes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Bytes.Marshal(b, m, deterministic) +} +func (m *Bytes) XXX_Merge(src proto.Message) { + xxx_messageInfo_Bytes.Merge(m, src) +} +func (m *Bytes) XXX_Size() int { + return xxx_messageInfo_Bytes.Size(m) +} +func (m *Bytes) XXX_DiscardUnknown() { + xxx_messageInfo_Bytes.DiscardUnknown(m) +} + +var xxx_messageInfo_Bytes proto.InternalMessageInfo + +func (m *Bytes) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +type MsgHead struct { + Cid *int32 `protobuf:"varint,1,opt,name=cid" json:"cid,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MsgHead) Reset() { *m = MsgHead{} } +func (m *MsgHead) String() string { return proto.CompactTextString(m) } +func (*MsgHead) ProtoMessage() {} +func (*MsgHead) Descriptor() ([]byte, []int) { + return fileDescriptor_d66ec2e140567106, []int{5} +} + +func (m *MsgHead) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_MsgHead.Unmarshal(m, b) +} +func (m *MsgHead) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_MsgHead.Marshal(b, m, deterministic) +} +func (m *MsgHead) XXX_Merge(src proto.Message) { + xxx_messageInfo_MsgHead.Merge(m, src) +} +func (m *MsgHead) XXX_Size() int { + return xxx_messageInfo_MsgHead.Size(m) +} +func (m *MsgHead) XXX_DiscardUnknown() { + xxx_messageInfo_MsgHead.DiscardUnknown(m) +} + +var xxx_messageInfo_MsgHead proto.InternalMessageInfo + +func (m *MsgHead) GetCid() int32 { + if m != nil && m.Cid != nil { + return *m.Cid + } + return 0 +} + +type Msg struct { + Head *MsgHead `protobuf:"bytes,1,opt,name=head" json:"head,omitempty"` + Ret *int32 `protobuf:"varint,2,opt,name=ret" json:"ret,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + proto.XXX_InternalExtensions `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Msg) Reset() { *m = Msg{} } +func (m *Msg) String() string { return proto.CompactTextString(m) } +func (*Msg) ProtoMessage() {} +func (*Msg) Descriptor() ([]byte, []int) { + return fileDescriptor_d66ec2e140567106, []int{6} +} + +var extRange_Msg = []proto.ExtensionRange{ + {Start: 100, End: 10000}, +} + +func (*Msg) ExtensionRangeArray() []proto.ExtensionRange { + return extRange_Msg +} + +func (m *Msg) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Msg.Unmarshal(m, b) +} +func (m *Msg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Msg.Marshal(b, m, deterministic) +} +func (m *Msg) XXX_Merge(src proto.Message) { + xxx_messageInfo_Msg.Merge(m, src) +} +func (m *Msg) XXX_Size() int { + return xxx_messageInfo_Msg.Size(m) +} +func (m *Msg) XXX_DiscardUnknown() { + xxx_messageInfo_Msg.DiscardUnknown(m) +} + +var xxx_messageInfo_Msg proto.InternalMessageInfo + +func (m *Msg) GetHead() *MsgHead { + if m != nil { + return m.Head + } + return nil +} + +func (m *Msg) GetRet() int32 { + if m != nil && m.Ret != nil { + return *m.Ret + } + return 0 +} + +type ClientList struct { + ClientList []uint64 `protobuf:"varint,1,rep,name=clientList" json:"clientList,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ClientList) Reset() { *m = ClientList{} } +func (m *ClientList) String() string { return proto.CompactTextString(m) } +func (*ClientList) ProtoMessage() {} +func (*ClientList) Descriptor() ([]byte, []int) { + return fileDescriptor_d66ec2e140567106, []int{7} +} + +func (m *ClientList) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ClientList.Unmarshal(m, b) +} +func (m *ClientList) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ClientList.Marshal(b, m, deterministic) +} +func (m *ClientList) XXX_Merge(src proto.Message) { + xxx_messageInfo_ClientList.Merge(m, src) +} +func (m *ClientList) XXX_Size() int { + return xxx_messageInfo_ClientList.Size(m) +} +func (m *ClientList) XXX_DiscardUnknown() { + xxx_messageInfo_ClientList.DiscardUnknown(m) +} + +var xxx_messageInfo_ClientList proto.InternalMessageInfo + +func (m *ClientList) GetClientList() []uint64 { + if m != nil { + return m.ClientList + } + return nil +} + +type ReplyMessage struct { + ClientList []uint64 `protobuf:"varint,1,rep,name=clientList" json:"clientList,omitempty"` + MsgType *uint32 `protobuf:"varint,2,opt,name=msgType" json:"msgType,omitempty"` + Msg []byte `protobuf:"bytes,3,opt,name=msg" json:"msg,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReplyMessage) Reset() { *m = ReplyMessage{} } +func (m *ReplyMessage) String() string { return proto.CompactTextString(m) } +func (*ReplyMessage) ProtoMessage() {} +func (*ReplyMessage) Descriptor() ([]byte, []int) { + return fileDescriptor_d66ec2e140567106, []int{8} +} + +func (m *ReplyMessage) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReplyMessage.Unmarshal(m, b) +} +func (m *ReplyMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReplyMessage.Marshal(b, m, deterministic) +} +func (m *ReplyMessage) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReplyMessage.Merge(m, src) +} +func (m *ReplyMessage) XXX_Size() int { + return xxx_messageInfo_ReplyMessage.Size(m) +} +func (m *ReplyMessage) XXX_DiscardUnknown() { + xxx_messageInfo_ReplyMessage.DiscardUnknown(m) +} + +var xxx_messageInfo_ReplyMessage proto.InternalMessageInfo + +func (m *ReplyMessage) GetClientList() []uint64 { + if m != nil { + return m.ClientList + } + return nil +} + +func (m *ReplyMessage) GetMsgType() uint32 { + if m != nil && m.MsgType != nil { + return *m.MsgType + } + return 0 +} + +func (m *ReplyMessage) GetMsg() []byte { + if m != nil { + return m.Msg + } + return nil +} + +type PlaceHolders struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PlaceHolders) Reset() { *m = PlaceHolders{} } +func (m *PlaceHolders) String() string { return proto.CompactTextString(m) } +func (*PlaceHolders) ProtoMessage() {} +func (*PlaceHolders) Descriptor() ([]byte, []int) { + return fileDescriptor_d66ec2e140567106, []int{9} +} + +func (m *PlaceHolders) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_PlaceHolders.Unmarshal(m, b) +} +func (m *PlaceHolders) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_PlaceHolders.Marshal(b, m, deterministic) +} +func (m *PlaceHolders) XXX_Merge(src proto.Message) { + xxx_messageInfo_PlaceHolders.Merge(m, src) +} +func (m *PlaceHolders) XXX_Size() int { + return xxx_messageInfo_PlaceHolders.Size(m) +} +func (m *PlaceHolders) XXX_DiscardUnknown() { + xxx_messageInfo_PlaceHolders.DiscardUnknown(m) +} + +var xxx_messageInfo_PlaceHolders proto.InternalMessageInfo + +func init() { + proto.RegisterType((*Int)(nil), "tcpgateway.Int") + proto.RegisterType((*Int64)(nil), "tcpgateway.Int64") + proto.RegisterType((*Bool)(nil), "tcpgateway.Bool") + proto.RegisterType((*String)(nil), "tcpgateway.String") + proto.RegisterType((*Bytes)(nil), "tcpgateway.Bytes") + proto.RegisterType((*MsgHead)(nil), "tcpgateway.MsgHead") + proto.RegisterType((*Msg)(nil), "tcpgateway.Msg") + proto.RegisterType((*ClientList)(nil), "tcpgateway.ClientList") + proto.RegisterType((*ReplyMessage)(nil), "tcpgateway.ReplyMessage") + proto.RegisterType((*PlaceHolders)(nil), "tcpgateway.PlaceHolders") +} + +func init() { proto.RegisterFile("base/base.proto", fileDescriptor_d66ec2e140567106) } + +var fileDescriptor_d66ec2e140567106 = []byte{ + // 276 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x8f, 0x41, 0x4b, 0xc3, 0x40, + 0x10, 0x85, 0xa9, 0x9b, 0xd8, 0x3a, 0x46, 0x2d, 0xab, 0x87, 0x40, 0xb5, 0x94, 0xbd, 0x58, 0x44, + 0x22, 0x88, 0xf8, 0x03, 0xea, 0xc1, 0x16, 0x8c, 0x48, 0xf4, 0xe4, 0x6d, 0x4d, 0x86, 0x35, 0xb0, + 0x4d, 0x42, 0x66, 0x54, 0xf2, 0x33, 0xfc, 0xc7, 0x92, 0xa4, 0xd1, 0xe6, 0xe4, 0x65, 0x99, 0xb7, + 0xdf, 0xec, 0xdb, 0xf7, 0xe0, 0xe8, 0x4d, 0x13, 0x5e, 0xd5, 0x47, 0x50, 0x94, 0x39, 0xe7, 0x12, + 0x38, 0x2e, 0x8c, 0x66, 0xfc, 0xd2, 0x95, 0x9a, 0x80, 0x58, 0x65, 0x2c, 0x4f, 0xc0, 0xfd, 0xd4, + 0xf6, 0x03, 0xfd, 0xc1, 0x6c, 0x30, 0x77, 0xa3, 0x56, 0xa8, 0x33, 0x70, 0x57, 0x19, 0xdf, 0xde, + 0xf4, 0xb1, 0xe8, 0xf0, 0x29, 0x38, 0x8b, 0x3c, 0xb7, 0x7d, 0x3a, 0xea, 0xe8, 0x14, 0x76, 0x9f, + 0xb9, 0x4c, 0x33, 0xd3, 0xe7, 0x7b, 0x5b, 0xe6, 0x8b, 0x8a, 0x91, 0xfa, 0xd8, 0xeb, 0xf0, 0x04, + 0x86, 0x21, 0x99, 0x25, 0xea, 0x44, 0x8e, 0x41, 0xc4, 0x69, 0xb2, 0x89, 0x56, 0x8f, 0xea, 0x1e, + 0x44, 0x48, 0x46, 0x9e, 0x83, 0xf3, 0x8e, 0xba, 0x25, 0xfb, 0xd7, 0xc7, 0xc1, 0x5f, 0xaf, 0x60, + 0xf3, 0x36, 0x6a, 0x16, 0x6a, 0x87, 0x12, 0xd9, 0xdf, 0x69, 0x1d, 0x4a, 0xe4, 0x0b, 0x77, 0x94, + 0x8c, 0xbf, 0x1f, 0xd5, 0x25, 0xc0, 0x9d, 0x4d, 0x31, 0xe3, 0x87, 0x94, 0x58, 0x4e, 0x01, 0xe2, + 0x5f, 0xe5, 0x0f, 0x66, 0x62, 0xee, 0x44, 0x5b, 0x37, 0xea, 0x15, 0xbc, 0x08, 0x0b, 0x5b, 0x85, + 0x48, 0xa4, 0x0d, 0xfe, 0xb7, 0x2f, 0x7d, 0x18, 0xae, 0xc9, 0xbc, 0x54, 0x05, 0x36, 0x5f, 0x1f, + 0x44, 0x9d, 0xac, 0x03, 0xad, 0xc9, 0xf8, 0xa2, 0x69, 0x5c, 0x8f, 0xea, 0x10, 0xbc, 0x27, 0xab, + 0x63, 0x5c, 0xe6, 0x36, 0xc1, 0x92, 0x7e, 0x02, 0x00, 0x00, 0xff, 0xff, 0xc7, 0xb1, 0xeb, 0xcb, + 0xb6, 0x01, 0x00, 0x00, +} diff --git a/sysservice/tcpgateway/base.proto b/sysservice/tcpgateway/base.proto new file mode 100644 index 0000000..88a5cad --- /dev/null +++ b/sysservice/tcpgateway/base.proto @@ -0,0 +1,55 @@ +syntax = "proto2"; + +package tcpgateway; + + +message Int{ + optional int32 value = 1; +} + + +message Int64{ + optional int64 value = 1; +} + + +message Bool{ + optional bool value = 1; +} + +message String{ + optional string value = 1; +} + +message Bytes{ + optional bytes value = 1; +} + + +message MsgHead +{ + optional int32 cid = 1; +} + +message Msg +{ + optional MsgHead head = 1; + optional int32 ret = 2; + extensions 100 to 10000; +} + +message ClientList +{ + repeated uint64 clientList = 1; +} + +message ReplyMessage +{ + repeated uint64 clientList = 1; + optional uint32 msgType = 2; + optional bytes msg = 3; +} + +message PlaceHolders +{ +} \ No newline at end of file diff --git a/sysservice/tcpservice/tcpservice.go b/sysservice/tcpservice/tcpservice.go index e5ba281..f5ebfe9 100644 --- a/sysservice/tcpservice/tcpservice.go +++ b/sysservice/tcpservice/tcpservice.go @@ -5,8 +5,11 @@ import ( "github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/network" + "github.com/duanhf2012/origin/network/processor" + "github.com/duanhf2012/origin/node" "github.com/duanhf2012/origin/service" "sync" + "time" ) type TcpService struct { @@ -15,8 +18,8 @@ type TcpService struct { mapClientLocker sync.RWMutex mapClient map[uint64] *Client - initClientId uint64 - process network.Processor + //initClientId uint64 + process processor.IProcessor } type TcpPackType int8 @@ -28,10 +31,10 @@ const( ) type TcpPack struct { - Type TcpPackType //0表示连接 1表示断开 2表示数据 - MsgProcessor network.Processor - ClientId uint64 - Data interface{} + Type TcpPackType //0表示连接 1表示断开 2表示数据 + MsgProcessor processor.IProcessor + ClientId uint64 + Data interface{} } const Default_MaxConnNum = 3000 @@ -40,6 +43,31 @@ const Default_LittleEndian = false const Default_MinMsgLen = 2 const Default_MaxMsgLen = 65535 +const ( + MaxNodeId = 1<<10 - 1 //Uint10 + MaxSeed = 1<<22 - 1 //MaxUint24 +) +var seed uint32 +var seedLocker sync.Mutex + +func (slf *TcpService) genId() uint64 { + if node.GetNodeId()>MaxNodeId{ + panic("nodeId exceeds the maximum!") + } + + seedLocker.Lock() + seed = (seed+1)%MaxSeed + seedLocker.Unlock() + + nowTime := uint64(time.Now().Second()) + + return (uint64(node.GetNodeId())<<54)|(nowTime<<22)|uint64(seed) +} + +func GetNodeId(agentId uint64) int { + return int(agentId>>54) +} + func (slf *TcpService) OnInit() error{ iConfig := slf.GetServiceCfg() if iConfig == nil { @@ -96,7 +124,7 @@ func (slf *TcpService) TcpEventHandler(ev *event.Event) { } } -func (slf *TcpService) SetProcessor(process network.Processor,handler event.IEventHandler){ +func (slf *TcpService) SetProcessor(process processor.IProcessor,handler event.IEventHandler){ slf.process = process slf.RegEventReciverFunc(event.Sys_Event_Tcp,handler,slf.TcpEventHandler) } @@ -106,15 +134,16 @@ func (slf *TcpService) NewClient(conn *network.TCPConn) network.Agent { defer slf.mapClientLocker.Unlock() for { - slf.initClientId+=1 - _,ok := slf.mapClient[slf.initClientId] + clientId := slf.genId() + _,ok := slf.mapClient[clientId] if ok == true { continue } - pClient := &Client{tcpConn:conn, id:slf.initClientId} + pClient := &Client{tcpConn:conn, id:clientId} pClient.tcpService = slf - slf.mapClient[slf.initClientId] = pClient + slf.mapClient[clientId] = pClient + return pClient } @@ -134,6 +163,9 @@ func (slf *Client) GetId() uint64 { func (slf *Client) Run() { slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_Connected,MsgProcessor:slf.tcpService.process}}) for{ + if slf.tcpConn == nil { + break + } bytes,err := slf.tcpConn.ReadMsg() if err != nil { log.Debug("read client id %d is error:%+v",slf.id,err) @@ -155,12 +187,12 @@ func (slf *Client) OnClose(){ delete (slf.tcpService.mapClient,slf.GetId()) } -func (slf *TcpService) SendMsg(clientid uint64,msg interface{}) error{ +func (slf *TcpService) SendMsg(clientId uint64,msg interface{}) error{ slf.mapClientLocker.Lock() - client,ok := slf.mapClient[clientid] + client,ok := slf.mapClient[clientId] if ok == false{ slf.mapClientLocker.Unlock() - return fmt.Errorf("client %d is disconnect!",clientid) + return fmt.Errorf("client %d is disconnect!",clientId) } slf.mapClientLocker.Unlock() @@ -171,11 +203,11 @@ func (slf *TcpService) SendMsg(clientid uint64,msg interface{}) error{ return client.tcpConn.WriteMsg(bytes) } -func (slf *TcpService) Close(clientid uint64) { +func (slf *TcpService) Close(clientId uint64) { slf.mapClientLocker.Lock() defer slf.mapClientLocker.Unlock() - client,ok := slf.mapClient[clientid] + client,ok := slf.mapClient[clientId] if ok == false{ return } @@ -197,3 +229,15 @@ func (slf *TcpService) GetClientIp(clientid uint64) string{ return pClient.tcpConn.GetRemoteIp() } + + +func (slf *TcpService) SendRawMsg(clientId uint64,msg []byte) error{ + slf.mapClientLocker.Lock() + client,ok := slf.mapClient[clientId] + if ok == false{ + slf.mapClientLocker.Unlock() + return fmt.Errorf("client %d is disconnect!",clientId) + } + slf.mapClientLocker.Unlock() + return client.tcpConn.WriteMsg(msg) +} diff --git a/sysservice/wsservice/wsservice.go b/sysservice/wsservice/wsservice.go index 07c3f20..61c7631 100644 --- a/sysservice/wsservice/wsservice.go +++ b/sysservice/wsservice/wsservice.go @@ -5,6 +5,7 @@ import ( "github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/network" + "github.com/duanhf2012/origin/network/processor" "github.com/duanhf2012/origin/service" "sync" ) @@ -16,7 +17,7 @@ type WSService struct { mapClientLocker sync.RWMutex mapClient map[uint64] *WSClient initClientId uint64 - process network.Processor + process processor.Processor } type WSPackType int8 @@ -28,10 +29,10 @@ const( ) type WSPack struct { - Type WSPackType //0表示连接 1表示断开 2表示数据 - MsgProcessor network.Processor - ClientId uint64 - Data interface{} + Type WSPackType //0表示连接 1表示断开 2表示数据 + MsgProcessor processor.Processor + ClientId uint64 + Data interface{} } @@ -89,7 +90,7 @@ func (slf *WSService) WSEventHandler(ev *event.Event) { } } -func (slf *WSService) SetProcessor(process network.Processor,handler event.IEventHandler){ +func (slf *WSService) SetProcessor(process processor.Processor,handler event.IEventHandler){ slf.process = process slf.RegEventReciverFunc(event.Sys_Event_WebSocket,handler,slf.WSEventHandler) }