Compare commits

...

8 Commits

Author SHA1 Message Date
orgin
be3daf19f9 优化服务配置获取函数 2022-06-09 17:44:42 +08:00
orgin
aa91c7bf1b Merge pull request #860 from cgc1983/master
module exit error
2022-06-09 11:52:50 +08:00
cgc1983
7fe73e55fb module exit error 2022-06-09 11:32:25 +08:00
cgc1983
e5ceaa9e76 ignore macosx 2022-06-02 17:51:03 +08:00
orgin
97c55ada71 优化网络连接Id生成规则 2022-06-02 17:07:01 +08:00
orgin
776b234022 优化网络连接Id生成规则&优化WebSocket服务 2022-06-02 16:09:16 +08:00
orgin
a4f425bd69 同步jsonprocessor 2022-05-31 10:57:47 +08:00
orgin
ee54862be2 fixed HTTP status codes write error 2022-05-12 09:45:22 +08:00
10 changed files with 170 additions and 119 deletions

1
.gitignore vendored
View File

@@ -10,3 +10,4 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
.DS_Store

View File

@@ -244,15 +244,6 @@ func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.
return nil, count
}
func (cls *Cluster) getServiceCfg(serviceName string) interface{} {
v, ok := cls.localServiceCfg[serviceName]
if ok == false {
return nil
}
return v
}
func (cls *Cluster) GetServiceCfg(serviceName string) interface{} {
serviceCfg, ok := cls.localServiceCfg[serviceName]
if ok == false {

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/duanhf2012/origin/network"
"reflect"
"github.com/duanhf2012/origin/log"
)
type MessageJsonInfo struct {
@@ -44,18 +45,18 @@ func (jsonProcessor *JsonProcessor) SetByteOrder(littleEndian bool) {
}
// must goroutine safe
func (jsonProcessor *JsonProcessor ) MsgRoute(msg interface{},userdata interface{}) error{
func (jsonProcessor *JsonProcessor ) MsgRoute(clientId uint64,msg interface{}) error{
pPackInfo := msg.(*JsonPackInfo)
v,ok := jsonProcessor.mapMsg[pPackInfo.typ]
if ok == false {
return fmt.Errorf("cannot find msgtype %d is register!",pPackInfo.typ)
}
v.msgHandler(userdata.(uint64),pPackInfo.msg)
v.msgHandler(clientId,pPackInfo.msg)
return nil
}
func (jsonProcessor *JsonProcessor) Unmarshal(data []byte) (interface{}, error) {
func (jsonProcessor *JsonProcessor) Unmarshal(clientId uint64,data []byte) (interface{}, error) {
typeStruct := struct {Type int `json:"typ"`}{}
defer jsonProcessor.ReleaseByteSlice(data)
err := json.Unmarshal(data, &typeStruct)
@@ -78,7 +79,7 @@ func (jsonProcessor *JsonProcessor) Unmarshal(data []byte) (interface{}, error)
return &JsonPackInfo{typ:msgType,msg:msgData},nil
}
func (jsonProcessor *JsonProcessor) Marshal(msg interface{}) ([]byte, error) {
func (jsonProcessor *JsonProcessor) Marshal(clientId uint64,msg interface{}) ([]byte, error) {
rawMsg,err := json.Marshal(msg)
if err != nil {
return nil,err
@@ -103,16 +104,26 @@ func (jsonProcessor *JsonProcessor) MakeRawMsg(msgType uint16,msg []byte) *JsonP
return &JsonPackInfo{typ:msgType,rawMsg:msg}
}
func (jsonProcessor *JsonProcessor) UnknownMsgRoute(msg interface{}, userData interface{}){
jsonProcessor.unknownMessageHandler(userData.(uint64),msg.([]byte))
func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){
if jsonProcessor.unknownMessageHandler==nil {
log.SDebug("Unknown message received from ",clientId)
return
}
jsonProcessor.unknownMessageHandler(clientId,msg.([]byte))
}
func (jsonProcessor *JsonProcessor) ConnectedRoute(userData interface{}){
jsonProcessor.connectHandler(userData.(uint64))
func (jsonProcessor *JsonProcessor) ConnectedRoute(clientId uint64){
if jsonProcessor.connectHandler != nil {
jsonProcessor.connectHandler(clientId)
}
}
func (jsonProcessor *JsonProcessor) DisConnectedRoute(userData interface{}){
jsonProcessor.disconnectHandler(userData.(uint64))
func (jsonProcessor *JsonProcessor) DisConnectedRoute(clientId uint64){
if jsonProcessor.disconnectHandler != nil {
jsonProcessor.disconnectHandler(clientId)
}
}
func (jsonProcessor *JsonProcessor) RegisterUnknownMsg(unknownMessageHandler UnknownMessageJsonHandler){

View File

@@ -21,6 +21,7 @@ type WSClient struct {
cons WebsocketConnSet
wg sync.WaitGroup
closeFlag bool
messageType int
}
func (client *WSClient) Start() {
@@ -62,7 +63,7 @@ func (client *WSClient) init() {
if client.cons != nil {
log.SFatal("client is running")
}
client.messageType = websocket.TextMessage
client.cons = make(WebsocketConnSet)
client.closeFlag = false
client.dialer = websocket.Dialer{
@@ -83,6 +84,9 @@ func (client *WSClient) dial() *websocket.Conn {
}
}
func (client *WSClient) SetMessageType(messageType int){
client.messageType = messageType
}
func (client *WSClient) connect() {
defer client.wg.Done()
@@ -102,7 +106,7 @@ reconnect:
client.cons[conn] = struct{}{}
client.Unlock()
wsConn := newWSConn(conn, client.PendingWriteNum, client.MaxMsgLen)
wsConn := newWSConn(conn, client.PendingWriteNum, client.MaxMsgLen,client.messageType)
agent := client.NewAgent(wsConn)
agent.Run()

View File

@@ -18,7 +18,7 @@ type WSConn struct {
closeFlag bool
}
func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32) *WSConn {
func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32,messageType int) *WSConn {
wsConn := new(WSConn)
wsConn.conn = conn
wsConn.writeChan = make(chan []byte, pendingWriteNum)
@@ -30,7 +30,7 @@ func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32) *WSC
break
}
err := conn.WriteMessage(websocket.BinaryMessage, b)
err := conn.WriteMessage(messageType, b)
if err != nil {
break
}

View File

@@ -21,6 +21,7 @@ type WSServer struct {
NewAgent func(*WSConn) Agent
ln net.Listener
handler *WSHandler
messageType int
}
type WSHandler struct {
@@ -32,6 +33,11 @@ type WSHandler struct {
conns WebsocketConnSet
mutexConns sync.Mutex
wg sync.WaitGroup
messageType int
}
func (handler *WSHandler) SetMessageType(messageType int){
handler.messageType = messageType
}
func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -45,6 +51,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
conn.SetReadLimit(int64(handler.maxMsgLen))
handler.messageType = websocket.TextMessage
handler.wg.Add(1)
defer handler.wg.Done()
@@ -64,7 +71,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
handler.conns[conn] = struct{}{}
handler.mutexConns.Unlock()
wsConn := newWSConn(conn, handler.pendingWriteNum, handler.maxMsgLen)
wsConn := newWSConn(conn, handler.pendingWriteNum, handler.maxMsgLen,handler.messageType)
agent := handler.newAgent(wsConn)
agent.Run()
@@ -76,6 +83,13 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
agent.OnClose()
}
func (server *WSServer) SetMessageType(messageType int){
server.messageType = messageType
if server.handler!= nil {
server.handler.SetMessageType(messageType)
}
}
func (server *WSServer) Start() {
ln, err := net.Listen("tcp", server.Addr)
if err != nil {

View File

@@ -2,31 +2,33 @@ package service
import (
"fmt"
"reflect"
"sync/atomic"
"time"
"github.com/duanhf2012/origin/event"
"github.com/duanhf2012/origin/log"
rpcHandle "github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/util/timer"
"reflect"
"sync/atomic"
"time"
)
const InitModuleId = 1e9
type IModule interface {
SetModuleId(moduleId uint32) bool
GetModuleId() uint32
AddModule(module IModule) (uint32,error)
AddModule(module IModule) (uint32, error)
GetModule(moduleId uint32) IModule
GetAncestor()IModule
GetAncestor() IModule
ReleaseModule(moduleId uint32)
NewModuleId() uint32
GetParent()IModule
GetParent() IModule
OnInit() error
OnRelease()
getBaseModule() IModule
GetService() IService
GetModuleName() string
GetEventProcessor()event.IEventProcessor
GetEventProcessor() event.IEventProcessor
NotifyEvent(ev event.IEvent)
}
@@ -38,25 +40,25 @@ type IModuleTimer interface {
type Module struct {
rpcHandle.IRpcHandler
moduleId uint32 //模块Id
moduleName string //模块名称
parent IModule //父亲
self IModule //自己
child map[uint32]IModule //孩子们
mapActiveTimer map[timer.ITimer]struct{}
moduleId uint32 //模块Id
moduleName string //模块名称
parent IModule //父亲
self IModule //自己
child map[uint32]IModule //孩子们
mapActiveTimer map[timer.ITimer]struct{}
mapActiveIdTimer map[uint64]timer.ITimer
dispatcher *timer.Dispatcher //timer
dispatcher *timer.Dispatcher //timer
//根结点
ancestor IModule //始祖
seedModuleId uint32 //模块id种子
descendants map[uint32]IModule //始祖的后裔们
ancestor IModule //始祖
seedModuleId uint32 //模块id种子
descendants map[uint32]IModule //始祖的后裔们
//事件管道
eventHandler event.IEventHandler
}
func (m *Module) SetModuleId(moduleId uint32) bool{
func (m *Module) SetModuleId(moduleId uint32) bool {
if m.moduleId > 0 {
return false
}
@@ -65,35 +67,35 @@ func (m *Module) SetModuleId(moduleId uint32) bool{
return true
}
func (m *Module) GetModuleId() uint32{
func (m *Module) GetModuleId() uint32 {
return m.moduleId
}
func (m *Module) GetModuleName() string{
func (m *Module) GetModuleName() string {
return m.moduleName
}
func (m *Module) OnInit() error{
return nil
func (m *Module) OnInit() error {
return nil
}
func (m *Module) AddModule(module IModule) (uint32,error){
func (m *Module) AddModule(module IModule) (uint32, error) {
//没有事件处理器不允许加入其他模块
if m.GetEventProcessor() == nil {
return 0,fmt.Errorf("module %+v Event Processor is nil", m.self)
return 0, fmt.Errorf("module %+v Event Processor is nil", m.self)
}
pAddModule := module.getBaseModule().(*Module)
if pAddModule.GetModuleId()==0 {
if pAddModule.GetModuleId() == 0 {
pAddModule.moduleId = m.NewModuleId()
}
if m.child == nil {
m.child = map[uint32]IModule{}
}
_,ok := m.child[module.GetModuleId()]
_, ok := m.child[module.GetModuleId()]
if ok == true {
return 0,fmt.Errorf("exists module id %d",module.GetModuleId())
return 0, fmt.Errorf("exists module id %d", module.GetModuleId())
}
pAddModule.IRpcHandler = m.IRpcHandler
pAddModule.self = module
@@ -105,17 +107,17 @@ func (m *Module) AddModule(module IModule) (uint32,error){
pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor())
err := module.OnInit()
if err != nil {
return 0,err
return 0, err
}
m.child[module.GetModuleId()] = module
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
log.SDebug("Add module ",module.GetModuleName()," completed")
return module.GetModuleId(),nil
log.SDebug("Add module ", module.GetModuleName(), " completed")
return module.GetModuleId(), nil
}
func (m *Module) ReleaseModule(moduleId uint32){
func (m *Module) ReleaseModule(moduleId uint32) {
pModule := m.GetModule(moduleId).getBaseModule().(*Module)
//释放子孙
@@ -123,19 +125,19 @@ func (m *Module) ReleaseModule(moduleId uint32){
m.ReleaseModule(id)
}
pModule.GetEventHandler().Destroy()
pModule.self.OnRelease()
pModule.GetEventHandler().Destroy()
log.SDebug("Release module ", pModule.GetModuleName())
for pTimer := range pModule.mapActiveTimer {
pTimer.Cancel()
}
for _,t := range pModule.mapActiveIdTimer {
for _, t := range pModule.mapActiveIdTimer {
t.Cancel()
}
delete(m.child,moduleId)
delete (m.ancestor.getBaseModule().(*Module).descendants,moduleId)
delete(m.child, moduleId)
delete(m.ancestor.getBaseModule().(*Module).descendants, moduleId)
//清理被删除的Module
pModule.self = nil
@@ -149,16 +151,17 @@ func (m *Module) ReleaseModule(moduleId uint32){
pModule.mapActiveIdTimer = nil
}
func (m *Module) NewModuleId() uint32{
m.ancestor.getBaseModule().(*Module).seedModuleId+=1
func (m *Module) NewModuleId() uint32 {
m.ancestor.getBaseModule().(*Module).seedModuleId += 1
return m.ancestor.getBaseModule().(*Module).seedModuleId
}
var timerSeedId uint32
func (m *Module) GenTimerId() uint64{
for{
newTimerId := (uint64(m.GetModuleId())<<32)|uint64(atomic.AddUint32(&timerSeedId,1))
if _,ok := m.mapActiveIdTimer[newTimerId];ok == true {
func (m *Module) GenTimerId() uint64 {
for {
newTimerId := (uint64(m.GetModuleId()) << 32) | uint64(atomic.AddUint32(&timerSeedId, 1))
if _, ok := m.mapActiveIdTimer[newTimerId]; ok == true {
continue
}
@@ -166,33 +169,32 @@ func (m *Module) GenTimerId() uint64{
}
}
func (m *Module) GetAncestor()IModule{
func (m *Module) GetAncestor() IModule {
return m.ancestor
}
func (m *Module) GetModule(moduleId uint32) IModule{
iModule,ok := m.GetAncestor().getBaseModule().(*Module).descendants[moduleId]
func (m *Module) GetModule(moduleId uint32) IModule {
iModule, ok := m.GetAncestor().getBaseModule().(*Module).descendants[moduleId]
if ok == false {
return nil
}
return iModule
}
func (m *Module) getBaseModule() IModule{
func (m *Module) getBaseModule() IModule {
return m
}
func (m *Module) GetParent()IModule{
func (m *Module) GetParent() IModule {
return m.parent
}
func (m *Module) OnCloseTimer(t timer.ITimer){
delete(m.mapActiveIdTimer,t.GetId())
delete(m.mapActiveTimer,t)
func (m *Module) OnCloseTimer(t timer.ITimer) {
delete(m.mapActiveIdTimer, t.GetId())
delete(m.mapActiveTimer, t)
}
func (m *Module) OnAddTimer(t timer.ITimer){
func (m *Module) OnAddTimer(t timer.ITimer) {
if t != nil {
if m.mapActiveTimer == nil {
m.mapActiveTimer = map[timer.ITimer]struct{}{}
@@ -204,33 +206,33 @@ func (m *Module) OnAddTimer(t timer.ITimer){
func (m *Module) AfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer {
if m.mapActiveTimer == nil {
m.mapActiveTimer =map[timer.ITimer]struct{}{}
m.mapActiveTimer = map[timer.ITimer]struct{}{}
}
return m.dispatcher.AfterFunc(d,nil,cb,m.OnCloseTimer,m.OnAddTimer)
return m.dispatcher.AfterFunc(d, nil, cb, m.OnCloseTimer, m.OnAddTimer)
}
func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron {
if m.mapActiveTimer == nil {
m.mapActiveTimer =map[timer.ITimer]struct{}{}
m.mapActiveTimer = map[timer.ITimer]struct{}{}
}
return m.dispatcher.CronFunc(cronExpr,nil,cb,m.OnCloseTimer,m.OnAddTimer)
return m.dispatcher.CronFunc(cronExpr, nil, cb, m.OnCloseTimer, m.OnAddTimer)
}
func (m *Module) NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker {
if m.mapActiveTimer == nil {
m.mapActiveTimer =map[timer.ITimer]struct{}{}
m.mapActiveTimer = map[timer.ITimer]struct{}{}
}
return m.dispatcher.TickerFunc(d,nil,cb,m.OnCloseTimer,m.OnAddTimer)
return m.dispatcher.TickerFunc(d, nil, cb, m.OnCloseTimer, m.OnAddTimer)
}
func (m *Module) cb(*timer.Timer){
func (m *Module) cb(*timer.Timer) {
}
func (m *Module) SafeAfterFunc(timerId *uint64,d time.Duration, AdditionData interface{},cb func(uint64,interface{})) {
func (m *Module) SafeAfterFunc(timerId *uint64, d time.Duration, AdditionData interface{}, cb func(uint64, interface{})) {
if m.mapActiveIdTimer == nil {
m.mapActiveIdTimer = map[uint64]timer.ITimer{}
}
@@ -240,45 +242,45 @@ func (m *Module) SafeAfterFunc(timerId *uint64,d time.Duration, AdditionData int
}
*timerId = m.GenTimerId()
t := m.dispatcher.AfterFunc(d,cb,nil,m.OnCloseTimer,m.OnAddTimer)
t := m.dispatcher.AfterFunc(d, cb, nil, m.OnCloseTimer, m.OnAddTimer)
t.AdditionData = AdditionData
t.Id = *timerId
m.mapActiveIdTimer[*timerId] = t
}
func (m *Module) SafeCronFunc(cronId *uint64,cronExpr *timer.CronExpr, AdditionData interface{}, cb func(uint64,interface{})) {
func (m *Module) SafeCronFunc(cronId *uint64, cronExpr *timer.CronExpr, AdditionData interface{}, cb func(uint64, interface{})) {
if m.mapActiveIdTimer == nil {
m.mapActiveIdTimer = map[uint64]timer.ITimer{}
}
*cronId = m.GenTimerId()
c := m.dispatcher.CronFunc(cronExpr,cb,nil,m.OnCloseTimer,m.OnAddTimer)
c := m.dispatcher.CronFunc(cronExpr, cb, nil, m.OnCloseTimer, m.OnAddTimer)
c.AdditionData = AdditionData
c.Id = *cronId
m.mapActiveIdTimer[*cronId] = c
}
func (m *Module) SafeNewTicker(tickerId *uint64,d time.Duration, AdditionData interface{}, cb func(uint64,interface{})) {
func (m *Module) SafeNewTicker(tickerId *uint64, d time.Duration, AdditionData interface{}, cb func(uint64, interface{})) {
if m.mapActiveIdTimer == nil {
m.mapActiveIdTimer = map[uint64]timer.ITimer{}
}
*tickerId = m.GenTimerId()
t := m.dispatcher.TickerFunc(d,cb,nil,m.OnCloseTimer,m.OnAddTimer)
t := m.dispatcher.TickerFunc(d, cb, nil, m.OnCloseTimer, m.OnAddTimer)
t.AdditionData = AdditionData
t.Id = *tickerId
m.mapActiveIdTimer[*tickerId] = t
}
func (m *Module) CancelTimerId(timerId *uint64) bool{
func (m *Module) CancelTimerId(timerId *uint64) bool {
if m.mapActiveIdTimer == nil {
log.SError("mapActiveIdTimer is nil")
return false
}
t,ok := m.mapActiveIdTimer[*timerId]
t, ok := m.mapActiveIdTimer[*timerId]
if ok == false {
log.SError("cannot find timer id ",timerId)
log.SError("cannot find timer id ", timerId)
return false
}
@@ -287,23 +289,21 @@ func (m *Module) CancelTimerId(timerId *uint64) bool{
return true
}
func (m *Module) OnRelease(){
func (m *Module) OnRelease() {
}
func (m *Module) GetService() IService {
return m.GetAncestor().(IService)
}
func (m *Module) GetEventProcessor() event.IEventProcessor{
func (m *Module) GetEventProcessor() event.IEventProcessor {
return m.eventHandler.GetEventProcessor()
}
func (m *Module) NotifyEvent(ev event.IEvent){
func (m *Module) NotifyEvent(ev event.IEvent) {
m.eventHandler.NotifyEvent(ev)
}
func (m *Module) GetEventHandler() event.IEventHandler{
func (m *Module) GetEventHandler() event.IEventHandler {
return m.eventHandler
}
}

View File

@@ -175,10 +175,12 @@ func (slf *HttpSession) Write(msg []byte) {
func (slf *HttpSession) WriteJsonDone(statusCode int,msgJson interface{}) error {
msg, err := json.Marshal(msgJson)
if err == nil {
slf.Write(msg)
if err != nil {
return err
}
slf.statusCode = statusCode
slf.Write(msg)
slf.Done()
return err
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/duanhf2012/origin/network/processor"
"github.com/duanhf2012/origin/node"
"github.com/duanhf2012/origin/service"
"sync/atomic"
"sync"
"time"
"runtime"
@@ -42,12 +43,12 @@ const Default_ReadDeadline = 180 //30s
const Default_WriteDeadline = 180 //30s
const (
MaxNodeId = 1<<10 - 1 //Uint10
MaxSeed = 1<<22 - 1 //MaxUint24
MaxNodeId = 1<<14 - 1 //最大值 16383
MaxSeed = 1<<19 - 1 //最大值 524287
MaxTime = 1<<31 - 1 //最大值 2147483647
)
var seed uint32
var seedLocker sync.Mutex
type TcpPack struct {
Type TcpPackType //0表示连接 1表示断开 2表示数据
@@ -66,16 +67,14 @@ func (tcpService *TcpService) genId() uint64 {
panic("nodeId exceeds the maximum!")
}
seedLocker.Lock()
seed = (seed+1)%MaxSeed
seedLocker.Unlock()
nowTime := uint64(time.Now().Second())
return (uint64(node.GetNodeId())<<54)|(nowTime<<22)|uint64(seed)
newSeed := atomic.AddUint32(&seed,1) % MaxSeed
nowTime := uint64(time.Now().Unix())%MaxTime
return (uint64(node.GetNodeId())<<50)|(nowTime<<19)|uint64(newSeed)
}
func GetNodeId(agentId uint64) int {
return int(agentId>>54)
return int(agentId>>50)
}
func (tcpService *TcpService) OnInit() error{

View File

@@ -7,19 +7,27 @@ import (
"github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/network/processor"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/node"
"sync"
"sync/atomic"
"time"
)
type WSService struct {
service.Service
wsServer network.WSServer
mapClientLocker sync.RWMutex
mapClient map[uint64] *WSClient
initClientId uint64
process processor.IProcessor
}
var seed uint32
type WSPackType int8
const(
WPT_Connected WSPackType = 0
@@ -32,6 +40,12 @@ const Default_WS_MaxConnNum = 3000
const Default_WS_PendingWriteNum = 10000
const Default_WS_MaxMsgLen = 65535
const (
MaxNodeId = 1<<14 - 1 //最大值 16383
MaxSeed = 1<<19 - 1 //最大值 524287
MaxTime = 1<<31 - 1 //最大值 2147483647
)
type WSClient struct {
id uint64
wsConn *network.WSConn
@@ -46,6 +60,7 @@ type WSPack struct {
}
func (ws *WSService) OnInit() error{
iConfig := ws.GetServiceCfg()
if iConfig == nil {
return fmt.Errorf("%s service config is error!", ws.GetName())
@@ -80,6 +95,10 @@ func (ws *WSService) OnInit() error{
return nil
}
func (ws *WSService) SetMessageType(messageType int){
ws.wsServer.SetMessageType(messageType)
}
func (ws *WSService) WSEventHandler(ev event.IEvent) {
pack := ev.(*event.Event).Data.(*WSPack)
switch pack.Type {
@@ -88,9 +107,9 @@ func (ws *WSService) WSEventHandler(ev event.IEvent) {
case WPT_DisConnected:
pack.MsgProcessor.DisConnectedRoute(pack.ClientId)
case WPT_UnknownPack:
pack.MsgProcessor.UnknownMsgRoute(pack.Data,pack.ClientId)
pack.MsgProcessor.UnknownMsgRoute(pack.ClientId,pack.Data)
case WPT_Pack:
pack.MsgProcessor.MsgRoute(pack.Data, pack.ClientId)
pack.MsgProcessor.MsgRoute(pack.ClientId,pack.Data)
}
}
@@ -99,20 +118,30 @@ func (ws *WSService) SetProcessor(process processor.IProcessor,handler event.IEv
ws.RegEventReceiverFunc(event.Sys_Event_WebSocket,handler, ws.WSEventHandler)
}
func (ws *WSService) genId() uint64 {
if node.GetNodeId()>MaxNodeId{
panic("nodeId exceeds the maximum!")
}
newSeed := atomic.AddUint32(&seed,1) % MaxSeed
nowTime := uint64(time.Now().Unix())%MaxTime
return (uint64(node.GetNodeId())<<50)|(nowTime<<19)|uint64(newSeed)
}
func (ws *WSService) NewWSClient(conn *network.WSConn) network.Agent {
ws.mapClientLocker.Lock()
defer ws.mapClientLocker.Unlock()
for {
ws.initClientId+=1
_,ok := ws.mapClient[ws.initClientId]
clientId := ws.genId()
_,ok := ws.mapClient[clientId]
if ok == true {
continue
}
pClient := &WSClient{wsConn:conn, id: ws.initClientId}
pClient := &WSClient{wsConn:conn, id: clientId}
pClient.wsService = ws
ws.mapClient[ws.initClientId] = pClient
ws.mapClient[clientId] = pClient
return pClient
}
@@ -131,7 +160,7 @@ func (slf *WSClient) Run() {
log.Debug("read client id %d is error:%+v",slf.id,err)
break
}
data,err:=slf.wsService.process.Unmarshal(bytes)
data,err:=slf.wsService.process.Unmarshal(slf.id,bytes)
if err != nil {
slf.wsService.NotifyEvent(&event.Event{Type:event.Sys_Event_WebSocket,Data:&WSPack{ClientId:slf.id,Type:WPT_UnknownPack,Data:bytes,MsgProcessor:slf.wsService.process}})
continue
@@ -156,7 +185,7 @@ func (ws *WSService) SendMsg(clientid uint64,msg interface{}) error{
}
ws.mapClientLocker.Unlock()
bytes,err := ws.process.Marshal(msg)
bytes,err := ws.process.Marshal(clientid,msg)
if err != nil {
return err
}