From d2cade0d5f8f3bed6b6b8d9204c601248e7f4002 Mon Sep 17 00:00:00 2001 From: boyce Date: Fri, 25 Jan 2019 10:06:15 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ereadme=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 121 ++++++++++++++++++++++++++++- helper/websocketclient.go | 159 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 279 insertions(+), 1 deletion(-) create mode 100644 helper/websocketclient.go diff --git a/README.md b/README.md index 0dd64af..17d4255 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,122 @@ # Origin 全分布式服务器引擎 -1服务示例: +1.添加服务 +Test.go + +package Test + +import ( + "fmt" + "os" + "time" + + "github.com/duanhf2012/origin/cluster" + "github.com/duanhf2012/origin/service" +) + +type CTest struct { + service.BaseService + tmp int +} + +type TestModule struct { + service.BaseModule +} + +func (ws *TestModule) OnInit() error { + return nil +} + +func (ws *TestModule) OnRun() error { + return nil +} + +func (ws *CTest) OnInit() error { + + //添加模块 + test := &TestModule{} + ws.AddModule(test, true) + + //获取模块 + pModule := ws.GetModule("TestModule") + fmt.Print(pModule) + return nil +} + + +type CTestData struct { + Bbbb int64 + Cccc int + Ddd string +} + +func (ws *CTest) RPC_LogTicker2(args *CTestData, quo *CTestData) error { + + *quo = *args + return nil +} + +func (ws *CTest) OnRun() error { + + ws.tmp = ws.tmp + 1 + time.Sleep(1 * time.Second) + if ws.tmp%10 == 0 { + var test CTestData + test.Bbbb = 1111 + test.Cccc = 111 + test.Ddd = "1111" + err := cluster.Go("collectTickLogService.RPC_LogTicker2", &test) + fmt.Print(err) + } + + return nil +} + +func NewCTest(servicetype int) *CTest { + wss := new(CTest) + wss.Init(wss, servicetype) + return wss +} + +func checkFileIsExist(filename string) bool { + var exist = true + if _, err := os.Stat(filename); os.IsNotExist(err) { + exist = false + } + return exist +} + +func (ws *CTest) OnDestory() error { + return nil +} +2.使用服务 +main.go +package main + +import ( + "Test" + "bytes" + "compress/flate" + "fmt" + "io/ioutil" + + "github.com/duanhf2012/origin/server" + "github.com/duanhf2012/origin/sysmodule" +) + + +func main() { + server := server.NewServer() + if server == nil { + return + } + + test := Test.NewCTest(1000) + server.SetupService(test) + + server.Init() + server.Start() +} + + + diff --git a/helper/websocketclient.go b/helper/websocketclient.go new file mode 100644 index 0000000..29d8291 --- /dev/null +++ b/helper/websocketclient.go @@ -0,0 +1,159 @@ +package sysmodule + +import ( + "fmt" + "log" + "net/http" + "net/url" + + "github.com/gorilla/websocket" + + "time" +) + +type IWebsocketClient interface { + Init(slf IWebsocketClient, strurl string, bproxy bool, timeoutsec time.Duration) error + Start() error + WriteMessage(msg []byte) error + OnDisconnect() error + OnConnected() error + OnReadMessage(msg []byte) error +} + +type WebsocketClient struct { + WsDailer *websocket.Dialer + conn *websocket.Conn + url string + state int //0未连接状态 1连接状态 + bwritemsg chan []byte + slf IWebsocketClient + timeoutsec time.Duration +} + +func (ws *WebsocketClient) Init(slf IWebsocketClient, strurl string, bproxy bool, timeoutsec time.Duration) error { + + ws.timeoutsec = timeoutsec + ws.slf = slf + if bproxy == true { + proxy := func(_ *http.Request) (*url.URL, error) { + return url.Parse("http://127.0.0.1:1080") + } + + if timeoutsec > 0 { + tosec := timeoutsec * time.Second + ws.WsDailer = &websocket.Dialer{Proxy: proxy, HandshakeTimeout: tosec} + } else { + ws.WsDailer = &websocket.Dialer{Proxy: proxy} + } + } else { + if timeoutsec > 0 { + tosec := timeoutsec * time.Second + ws.WsDailer = &websocket.Dialer{HandshakeTimeout: tosec} + } else { + ws.WsDailer = &websocket.Dialer{} + } + } + + ws.url = strurl + ws.bwritemsg = make(chan []byte, 1000) + + return nil +} + +func (ws *WebsocketClient) OnRun() error { + for { + if ws.state == 0 { + time.Sleep(1 * time.Second) + ws.StartConnect() + } else { + ws.conn.SetReadDeadline(time.Now().Add(ws.timeoutsec * time.Second)) + _, message, err := ws.conn.ReadMessage() + + if err != nil { + log.Printf("到服务器的连接断开 %+v\n", err) + ws.conn.Close() + ws.state = 0 + ws.slf.OnDisconnect() + continue + } + + ws.slf.OnReadMessage(message) + } + } + + return nil +} + +func (ws *WebsocketClient) StartConnect() error { + + var err error + ws.conn, _, err = ws.WsDailer.Dial(ws.url, nil) + fmt.Printf("connecting %s, %+v\n", ws.url, err) + if err != nil { + return err + } + + ws.state = 1 + ws.slf.OnConnected() + + return nil +} + +func (ws *WebsocketClient) Start() error { + ws.state = 0 + go ws.OnRun() + go ws.writeMsg() + return nil +} + +//触发 +func (ws *WebsocketClient) writeMsg() error { + timerC := time.NewTicker(time.Second * 5).C + for { + if ws.state == 0 { + time.Sleep(1 * time.Second) + continue + } + select { + case <-timerC: + if ws.state == 1 { + ws.WriteMessage([]byte(`ping`)) + } + case msg := <-ws.bwritemsg: + if ws.state == 1 { + ws.conn.SetWriteDeadline(time.Now().Add(ws.timeoutsec * time.Second)) + err := ws.conn.WriteMessage(websocket.TextMessage, msg) + + if err != nil { + fmt.Print(err) + ws.state = 0 + ws.conn.Close() + ws.slf.OnDisconnect() + } + } + } + } + + return nil +} + +func (ws *WebsocketClient) WriteMessage(msg []byte) error { + ws.bwritemsg <- msg + return nil +} + +func (ws *WebsocketClient) OnDisconnect() error { + + return nil +} + +func (ws *WebsocketClient) OnConnected() error { + + return nil +} + +//触发 +func (ws *WebsocketClient) OnReadMessage(msg []byte) error { + + return nil +}