From 85a66e4d3de36c8d22ad156a007b1f268ee14a94 Mon Sep 17 00:00:00 2001 From: boyce <6549168@qq.com> Date: Mon, 3 Jun 2019 16:38:34 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=96=B0=E5=A2=9Ewebsocket=20agent=20module?= =?UTF-8?q?=E5=AF=B9=E8=B1=A1=202.=E5=AF=B9module=E6=B7=BB=E5=8A=A0SetUnOn?= =?UTF-8?q?Run=E6=8E=A5=E5=8F=A3=EF=BC=8C=E8=AE=BE=E7=BD=AE=E8=AF=A5?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E6=97=B6=E4=B8=8D=E4=BC=9A=E5=BC=80=E5=90=AF?= =?UTF-8?q?=E5=8D=8F=E7=A8=8B=E6=89=A7=E8=A1=8COnRun?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/wsagentserver.go | 323 +++++++++++++++++++++++++++++++++++ service/Module.go | 25 ++- sysservice/wsagentservice.go | 41 +++++ 3 files changed, 380 insertions(+), 9 deletions(-) create mode 100644 network/wsagentserver.go create mode 100644 sysservice/wsagentservice.go diff --git a/network/wsagentserver.go b/network/wsagentserver.go new file mode 100644 index 0000000..6734878 --- /dev/null +++ b/network/wsagentserver.go @@ -0,0 +1,323 @@ +package network + +import ( + "crypto/tls" + "errors" + "fmt" + "net/http" + "os" + "reflect" + "runtime/debug" + "sync" + "time" + + "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/sysmodule" + "github.com/gotoxu/cors" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" +) + +type IWSAgentServer interface { + SendMsg(agentid uint32, messageType int, msg []byte) bool + CreateAgent(urlPattern string, conn *websocket.Conn) IAgent + Disconnect(agentid uint32) + ReleaseAgent(iagent IAgent) +} + +type IAgent interface { + initAgent(conn *websocket.Conn, agentid uint32, iagent IAgent, WSAgentServer IWSAgentServer) + startReadMsg() + startSendMsg() + OnConnected() + OnDisconnect(err error) + OnRecvMsg(msgtype int, data []byte) + //OnHandleHttp(w http.ResponseWriter, r *http.Request) + GetAgentId() uint32 + getConn() *websocket.Conn + getWriteMsgChan() chan WSAgentMessage +} + +type BaseAgent struct { + service.BaseModule + WsServer IWSAgentServer + agent IAgent + agentid uint32 + conn *websocket.Conn + bwritemsg chan WSAgentMessage + iagent IAgent +} + +type WSAgentMessage struct { + msgtype int + bwritemsg []byte +} + +type WSAgentServer struct { + service.BaseModule + wsUri string + maxAgentid uint32 //记录当前最新agentid + //mapAgent map[uint32]IAgent + locker sync.Mutex + + port uint16 + + httpserver *http.Server + regAgent map[string]reflect.Type + + caList []CA + iswss bool +} + +const ( + MAX_AGENT_MSG_COUNT = 20480 +) + +func (slf *WSAgentServer) Init(port uint16) { + slf.port = port +} + +func (slf *WSAgentServer) CreateAgent(urlPattern string, conn *websocket.Conn) IAgent { + slf.locker.Lock() + iAgent, ok := slf.regAgent[urlPattern] + if ok == false { + slf.locker.Unlock() + service.GetLogger().Printf(sysmodule.LEVER_WARN, "Cannot find %s pattern!", urlPattern) + return nil + } + + v := reflect.New(iAgent).Elem().Addr().Interface() + if v == nil { + slf.locker.Unlock() + service.GetLogger().Printf(sysmodule.LEVER_WARN, "new %s pattern agent type is error!", urlPattern) + return nil + } + + pModule := v.(service.IModule) + iagent := v.(IAgent) + slf.maxAgentid++ + agentid := slf.maxAgentid + iagent.initAgent(conn, agentid, iagent, slf) + slf.AddModule(pModule) + + slf.locker.Unlock() + + service.GetLogger().Printf(sysmodule.LEVER_INFO, "Agent id %d is connected.", iagent.GetAgentId()) + return iagent +} + +func (slf *WSAgentServer) ReleaseAgent(iagent IAgent) { + iagent.getConn().Close() + slf.locker.Lock() + slf.ReleaseModule(iagent.GetAgentId()) + //delete(slf.mapAgent, iagent.GetAgentId()) + slf.locker.Unlock() + //关闭写管道 + close(iagent.getWriteMsgChan()) + service.GetLogger().Printf(sysmodule.LEVER_INFO, "Agent id %d is disconnected.", iagent.GetAgentId()) +} + +func (slf *WSAgentServer) SetupAgent(pattern string, agent IAgent, bEnableCompression bool) { + if slf.regAgent == nil { + slf.regAgent = make(map[string]reflect.Type) + } + + slf.regAgent[pattern] = reflect.TypeOf(agent).Elem() //reflect.TypeOf(agent).Elem() +} + +func (slf *WSAgentServer) startListen() { + listenPort := fmt.Sprintf(":%d", slf.port) + + var tlscatList []tls.Certificate + var tlsConfig *tls.Config + for _, cadata := range slf.caList { + cer, err := tls.LoadX509KeyPair(cadata.certfile, cadata.keyfile) + if err != nil { + service.GetLogger().Printf(sysmodule.LEVER_FATAL, "load CA %s-%s file is error :%s", cadata.certfile, cadata.keyfile, err.Error()) + os.Exit(1) + return + } + tlscatList = append(tlscatList, cer) + } + + if len(tlscatList) > 0 { + tlsConfig = &tls.Config{Certificates: tlscatList} + } + + slf.httpserver = &http.Server{ + Addr: listenPort, + Handler: slf.initRouterHandler(), + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + MaxHeaderBytes: 1 << 20, + TLSConfig: tlsConfig, + } + + var err error + if slf.iswss == true { + err = slf.httpserver.ListenAndServeTLS("", "") + } else { + err = slf.httpserver.ListenAndServe() + } + + if err != nil { + service.GetLogger().Printf(sysmodule.LEVER_FATAL, "http.ListenAndServe(%d, nil) error:%v\n", slf.port, err) + os.Exit(1) + } +} + +func (slf *BaseAgent) startSendMsg() { + for { + msgbuf, ok := <-slf.bwritemsg + if ok == false { + break + } + slf.conn.SetWriteDeadline(time.Now().Add(15 * time.Second)) + err := slf.conn.WriteMessage(msgbuf.msgtype, msgbuf.bwritemsg) + if err != nil { + service.GetLogger().Printf(sysmodule.LEVER_INFO, "write agent id %d is error :%v\n", slf.GetAgentId(), err) + break + } + } +} + +func (slf *WSAgentServer) Start() { + go slf.startListen() +} + +func (slf *WSAgentServer) GetAgentById(agentid uint32) IAgent { + pModule := slf.GetModuleById(agentid) + if pModule == nil { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "GetAgentById :%d is fail.\n", agentid) + return nil + } + + return pModule.(IAgent) +} + +func (slf *WSAgentServer) SendMsg(agentid uint32, messageType int, msg []byte) bool { + slf.locker.Lock() + defer slf.locker.Unlock() + + iagent := slf.GetAgentById(agentid) + if iagent == nil { + return false + } + + if len(iagent.getWriteMsgChan()) >= MAX_AGENT_MSG_COUNT { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "message chan is full :%d\n", len(iagent.getWriteMsgChan())) + return false + } + + iagent.getWriteMsgChan() <- WSAgentMessage{messageType, msg} + + return true +} + +func (slf *WSAgentServer) Disconnect(agentid uint32) { + slf.locker.Lock() + defer slf.locker.Unlock() + iagent := slf.GetAgentById(agentid) + if iagent == nil { + return + } + + iagent.getConn().Close() +} + +func (slf *WSAgentServer) Stop() { +} + +func (slf *BaseAgent) startReadMsg() { + defer func() { + if r := recover(); r != nil { + var coreInfo string + coreInfo = string(debug.Stack()) + coreInfo += "\n" + fmt.Sprintf("Core information is %v\n", r) + service.GetLogger().Printf(service.LEVER_FATAL, coreInfo) + slf.agent.OnDisconnect(errors.New("Core dump")) + slf.WsServer.ReleaseAgent(slf.agent) + } + }() + + slf.agent.OnConnected() + for { + slf.conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + msgtype, message, err := slf.conn.ReadMessage() + if err != nil { + slf.agent.OnDisconnect(err) + slf.WsServer.ReleaseAgent(slf.agent) + return + } + + slf.agent.OnRecvMsg(msgtype, message) + } +} + +func (slf *WSAgentServer) initRouterHandler() http.Handler { + r := mux.NewRouter() + + for pattern, _ := range slf.regAgent { + r.HandleFunc(pattern, slf.OnHandleHttp) + } + + cors := cors.AllowAll() + return cors.Handler(r) +} + +func (slf *WSAgentServer) SetWSS(certfile string, keyfile string) bool { + if certfile == "" || keyfile == "" { + return false + } + slf.caList = append(slf.caList, CA{certfile, keyfile}) + slf.iswss = true + return true +} + +func (slf *BaseAgent) GetAgentId() uint32 { + return slf.agentid +} + +func (slf *BaseAgent) initAgent(conn *websocket.Conn, agentid uint32, iagent IAgent, WSAgentServer IWSAgentServer) { + slf.agent = iagent + slf.WsServer = WSAgentServer + slf.bwritemsg = make(chan WSAgentMessage, MAX_AGENT_MSG_COUNT) + slf.agentid = agentid + slf.conn = conn +} + +func (slf *BaseAgent) OnConnected() { +} + +func (slf *BaseAgent) OnDisconnect(err error) { +} + +func (slf *BaseAgent) OnRecvMsg(msgtype int, data []byte) { +} + +func (slf *BaseAgent) getConn() *websocket.Conn { + return slf.conn +} + +func (slf *BaseAgent) getWriteMsgChan() chan WSAgentMessage { + return slf.bwritemsg +} + +func (slf *BaseAgent) SendMsg(agentid uint32, messageType int, msg []byte) bool { + return slf.WsServer.SendMsg(agentid, messageType, msg) +} + +func (slf *WSAgentServer) OnHandleHttp(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Upgrade(w, r, w.Header(), 1024, 1024) + if err != nil { + http.Error(w, "Could not open websocket connection!", http.StatusBadRequest) + return + } + + agent := slf.CreateAgent(r.URL.Path, conn) + fmt.Print(agent.GetAgentId()) + slf.AddModule(agent.(service.IModule)) + go agent.startSendMsg() + go agent.startReadMsg() +} diff --git a/service/Module.go b/service/Module.go index cee5404..44dc33f 100644 --- a/service/Module.go +++ b/service/Module.go @@ -12,8 +12,7 @@ import ( const ( //ModuleNone ... - MAX_ALLOW_SET_MODULE_ID = iota + 100000000 - INIT_AUTO_INCREMENT + INIT_AUTO_INCREMENT = 0 ) type IModule interface { @@ -38,6 +37,8 @@ type IModule interface { InitModule(exit chan bool, pwaitGroup *sync.WaitGroup) error //手动初始化Module getBaseModule() *BaseModule //获取BaseModule指针 IsInit() bool + SetUnOnRun() + getUnOnRun() bool } type BaseModule struct { @@ -57,6 +58,7 @@ type BaseModule struct { bInit bool recoverCount int8 + bUnOnRun bool } func (slf *BaseModule) GetRoot() IModule { @@ -168,6 +170,14 @@ func (slf *BaseModule) GetSelf() IModule { return slf.selfModule } +func (slf *BaseModule) SetUnOnRun() { + slf.bUnOnRun = true +} + +func (slf *BaseModule) getUnOnRun() bool { + return slf.bUnOnRun +} + func (slf *BaseModule) AddModule(module IModule) uint32 { //消亡状态不允许加入模块 if atomic.LoadInt32(&slf.corouterstatus) != 0 { @@ -175,12 +185,6 @@ func (slf *BaseModule) AddModule(module IModule) uint32 { return 0 } - //用户设置的id不允许大于MAX_ALLOW_SET_MODULE_ID - if module.GetModuleId() > MAX_ALLOW_SET_MODULE_ID { - GetLogger().Printf(LEVER_ERROR, "Module Id %d is error %T", module.GetModuleId(), module.GetSelf()) - return 0 - } - pModule := slf.GetModuleById(module.GetModuleId()) if pModule != nil { GetLogger().Printf(LEVER_ERROR, "%T Cannot AddModule %T,moduleid %d is repeat!", slf.GetSelf(), module.GetSelf(), module.GetModuleId()) @@ -230,7 +234,10 @@ func (slf *BaseModule) AddModule(module IModule) uint32 { module.getBaseModule().OnInit() GetLogger().Printf(LEVER_INFO, "End Init module %T.", module) - go module.RunModule(module) + if module.getUnOnRun() == false { + go module.RunModule(module) + } + return module.GetModuleId() } diff --git a/sysservice/wsagentservice.go b/sysservice/wsagentservice.go new file mode 100644 index 0000000..08666f6 --- /dev/null +++ b/sysservice/wsagentservice.go @@ -0,0 +1,41 @@ +package sysservice + +import ( + "github.com/duanhf2012/origin/network" + "github.com/duanhf2012/origin/service" +) + +type WSAgentService struct { + service.BaseService + agentserver network.WSAgentServer + pattern string + port uint16 + bEnableCompression bool +} + +func (ws *WSAgentService) OnInit() error { + ws.AddModule(&ws.agentserver) + ws.agentserver.Init(ws.port) + return nil +} + +func (ws *WSAgentService) OnRun() bool { + ws.agentserver.Start() + + return false +} + +func NewWSAgentService(port uint16) *WSAgentService { + wss := new(WSAgentService) + + wss.port = port + return wss +} + +func (ws *WSAgentService) OnDestory() error { + return nil +} + +func (ws *WSAgentService) SetupAgent(pattern string, agent network.IAgent, bEnableCompression bool) { + ws.agentserver.SetupAgent(pattern, agent, bEnableCompression) +}