From a0b02c7ec258d1a9efae503f9499e0bc90693929 Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Wed, 8 Apr 2020 14:43:30 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=88=A4=E6=96=AD?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E7=8A=B6=E6=80=81=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 5 +++++ network/tcp_conn.go | 4 ++++ rpc/client.go | 4 ++++ 3 files changed, 13 insertions(+) diff --git a/cluster/cluster.go b/cluster/cluster.go index aa9470d..84b30eb 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -145,3 +145,8 @@ func GetRpcClient(nodeId int,serviceMethod string) ([]*rpc.Client,error) { func GetRpcServer() *rpc.Server{ return &cluster.rpcServer } + +func (slf *Cluster) IsNodeConnected (nodeId int) bool { + pClient := slf.GetRpcClient(nodeId) + return pClient!=nil && pClient.IsConnected() +} diff --git a/network/tcp_conn.go b/network/tcp_conn.go index 58b36cc..26c2318 100644 --- a/network/tcp_conn.go +++ b/network/tcp_conn.go @@ -111,3 +111,7 @@ func (tcpConn *TCPConn) ReadMsg() ([]byte, error) { func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error { return tcpConn.msgParser.Write(tcpConn, args...) } + +func (tcpConn *TCPConn) IsConnected() bool { + return tcpConn.closeFlag == false +} \ No newline at end of file diff --git a/rpc/client.go b/rpc/client.go index c308f4e..fa9dc6f 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -219,3 +219,7 @@ func (slf *Client) Run(){ func (slf *Client) OnClose(){ } + +func (slf *Client) IsConnected() bool { + return slf.conn!=nil && slf.conn.IsConnected()==true +} \ No newline at end of file From 4445b583fbfb3b822038b9d6b1e7c453e2c4aa3c Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Wed, 8 Apr 2020 20:54:59 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E5=A4=84=E7=90=86=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- event/event.go | 27 ++++++++++++++++++++------- service/service.go | 3 +-- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/event/event.go b/event/event.go index 26f699f..f5b20ac 100644 --- a/event/event.go +++ b/event/event.go @@ -10,7 +10,7 @@ import ( const Default_EventChannelLen = 10000 //事件接受器 -type EventReciver func(event *Event) error +type EventReciverFunc func(event *Event) type Event struct { Type EventType @@ -19,10 +19,12 @@ type Event struct { type IEventProcessor interface { NotifyEvent(*Event) - OnEventHandler(event *Event) error + SetEventReciver(eventProcessor IEventProcessor) GetEventReciver() IEventProcessor SetEventChanNum(num int32) bool + RegEventReciverFunc(eventType EventType,reciverFunc EventReciverFunc) + UnRegEventReciverFun(eventType EventType) } type EventProcessor struct { @@ -32,6 +34,15 @@ type EventProcessor struct { eventChanNumLocker sync.RWMutex eventChanNum int32 + mapEventReciverFunc map[EventType]EventReciverFunc +} + +func (slf *EventProcessor) RegEventReciverFunc(eventType EventType,reciverFunc EventReciverFunc){ + slf.mapEventReciverFunc[eventType] = reciverFunc +} + +func (slf *EventProcessor) UnRegEventReciverFun(eventType EventType){ + delete(slf.mapEventReciverFunc,eventType) } func (slf *EventProcessor) NotifyEvent(pEvent *Event) { @@ -41,9 +52,7 @@ func (slf *EventProcessor) NotifyEvent(pEvent *Event) { slf.EventChan <-pEvent } -func (slf *EventProcessor) OnEventHandler(event *Event) error{ - return nil -} + func (slf *EventProcessor) GetEventChan() chan *Event{ slf.eventChanNumLocker.Lock() @@ -85,7 +94,7 @@ type IHttpEventData interface { Handle() } -func (slf *EventProcessor) EventHandler(processor IEventProcessor,ev *Event) { +func (slf *EventProcessor) EventHandler(ev *Event) { defer func() { if r := recover(); r != nil { buf := make([]byte, 4096) @@ -99,7 +108,11 @@ func (slf *EventProcessor) EventHandler(processor IEventProcessor,ev *Event) { return } - processor.OnEventHandler(ev) + if fun,ok := slf.mapEventReciverFunc[ev.Type];ok == false{ + return + }else{ + fun(ev) + } } func (slf *EventProcessor) innerEventHandler(ev *Event) bool { diff --git a/service/service.go b/service/service.go index db8ab8c..31ed0f6 100644 --- a/service/service.go +++ b/service/service.go @@ -2,7 +2,6 @@ package service import ( "fmt" - "github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/profiler" "github.com/duanhf2012/origin/rpc" @@ -130,7 +129,7 @@ func (slf *Service) Run() { if slf.profiler!=nil { analyzer = slf.profiler.Push(fmt.Sprintf("Event_%d", int(ev.Type))) } - slf.EventHandler(slf.this.(event.IEventProcessor),ev) + slf.EventHandler(ev) if analyzer!=nil { analyzer.Pop() analyzer = nil From eecb20ab6c40f0486248944e5039a460c12abaff Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Wed, 8 Apr 2020 21:04:44 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96=E5=AF=B9?= =?UTF-8?q?=E8=B1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- event/event.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/event/event.go b/event/event.go index f5b20ac..c0a01b9 100644 --- a/event/event.go +++ b/event/event.go @@ -38,6 +38,9 @@ type EventProcessor struct { } func (slf *EventProcessor) RegEventReciverFunc(eventType EventType,reciverFunc EventReciverFunc){ + if slf.mapEventReciverFunc == nil { + slf.mapEventReciverFunc = map[EventType]EventReciverFunc{} + } slf.mapEventReciverFunc[eventType] = reciverFunc } From c7c1558cc3036ff4db429fe2025c3690c307f95a Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Thu, 9 Apr 2020 10:54:35 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E8=8E=B7=E5=8F=96http=E5=A4=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sysservice/httpservice.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sysservice/httpservice.go b/sysservice/httpservice.go index 6c7c1bb..5a43629 100644 --- a/sysservice/httpservice.go +++ b/sysservice/httpservice.go @@ -121,6 +121,7 @@ func NewHttpHttpRouter(eventReciver event.IEventProcessor) IHttpRouter { func (slf *HttpSession) Query(key string) (string, bool) { + if slf.mapParam == nil { slf.mapParam = make(map[string]string) @@ -158,6 +159,15 @@ func (slf *HttpSession) AddHeader(key, value string) { slf.w.Header().Add(key,value) } +func (slf *HttpSession) GetHeader(key string) string{ + return slf.r.Header.Get(key) +} + +func (slf *HttpSession) DelHeader(key string) { + slf.r.Header.Del(key) +} + + func (slf *HttpSession) WriteStatusCode(statusCode int){ slf.statusCode = statusCode } From ff8ec204f6f53327e3b26dfe2a68a29e033f757e Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Sat, 11 Apr 2020 10:03:03 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E4=BF=AE=E6=94=B9http=E8=BF=94=E5=9B=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- example/GateService/GateService.go | 13 ++++++++++++- sysservice/httpservice.go | 16 +++++++++------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/example/GateService/GateService.go b/example/GateService/GateService.go index 2963a66..b8c4f83 100644 --- a/example/GateService/GateService.go +++ b/example/GateService/GateService.go @@ -9,6 +9,8 @@ import ( "github.com/duanhf2012/origin/node" "github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/sysservice" + "github.com/duanhf2012/origin/util/timer" + "net/http" ) type GateService struct { @@ -29,9 +31,17 @@ func (slf *GateService) OnInit() error{ slf.httpRouter.GET("/get/query", slf.HttpTest) slf.httpRouter.POST("/post/query", slf.HttpTestPost) slf.httpRouter.SetServeFile(sysservice.METHOD_GET,"/img/head/","d:/img") + + pCronExpr,_ := timer.NewCronExpr("0 * * * * *") + slf.CronFunc(pCronExpr,slf.Test) + return nil } +func (slf *GateService) Test(){ + fmt.Print("xxxxx\n") +} + func (slf *GateService) HttpTest(session *sysservice.HttpSession) { session.SetHeader("a","b") session.Write([]byte("this is a test")) @@ -57,10 +67,11 @@ func (slf *GateService) HttpTestPost(session *sysservice.HttpSession) { testa.AA = 100 testa.BB = "this is a test" - session.WriteJson("asdasda") + session.WriteJsonDone(http.StatusOK,"asdasda") } func (slf *GateService) OnEventHandler(ev *event.Event) error{ + if ev.Type == event.Sys_Event_Tcp_RecvPack { pPack := ev.Data.(*sysservice.TcpPack) slf.processor.Route(ev.Data,pPack.ClientId) diff --git a/sysservice/httpservice.go b/sysservice/httpservice.go index 5a43629..e804cac 100644 --- a/sysservice/httpservice.go +++ b/sysservice/httpservice.go @@ -167,7 +167,6 @@ func (slf *HttpSession) DelHeader(key string) { slf.r.Header.Del(key) } - func (slf *HttpSession) WriteStatusCode(statusCode int){ slf.statusCode = statusCode } @@ -176,15 +175,18 @@ func (slf *HttpSession) Write(msg []byte) { slf.msg = msg } -func (slf *HttpSession) WriteJson(msgJson interface{}) error { +func (slf *HttpSession) WriteJsonDone(statusCode int,msgJson interface{}) error { msg, err := json.Marshal(msgJson) if err == nil { slf.Write(msg) } + slf.Done() return err } + + func (slf *HttpSession) flush() { slf.w.WriteHeader(slf.statusCode) if slf.msg!=nil { @@ -192,7 +194,7 @@ func (slf *HttpSession) flush() { } } -func (slf *HttpSession) done(){ +func (slf *HttpSession) Done(){ slf.sessionDone <- slf } @@ -259,7 +261,7 @@ func (slf *HttpRouter) Router(session *HttpSession){ if slf.httpFiltrateList!=nil { for _,fun := range slf.httpFiltrateList{ if fun(session) == false { - session.done() + //session.done() return } } @@ -278,7 +280,7 @@ func (slf *HttpRouter) Router(session *HttpSession){ } v.httpHandle(session) - session.done() + //session.done() return } @@ -286,13 +288,13 @@ func (slf *HttpRouter) Router(session *HttpSession){ idx := strings.Index(urlPath, k) if idx != -1 { session.fileData = v - session.done() + session.Done() return } } session.WriteStatusCode(http.StatusNotFound) - session.done() + session.Done() } func (slf *HttpService) SetHttpRouter(httpRouter IHttpRouter) {