Compare commits

...

5 Commits

Author SHA1 Message Date
boyce
c6d0bd9a19 优化gin模块 2024-05-08 14:18:39 +08:00
boyce
61bf95e457 优化RankService 2024-05-07 18:57:38 +08:00
boyce
8b2a551ee5 优化gin模块 2024-05-06 10:51:51 +08:00
boyce
927c2ffa37 优化gin模块 2024-05-06 10:36:04 +08:00
boyce
b23b30aac5 优化服务发现 2024-04-30 19:05:44 +08:00
5 changed files with 165 additions and 35 deletions

View File

@@ -183,6 +183,10 @@ func (ds *OriginDiscoveryMaster) OnNodeDisconnect(nodeId string) {
func (ds *OriginDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{}) { func (ds *OriginDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{}) {
for nodeId, _ := range ds.mapNodeInfo { for nodeId, _ := range ds.mapNodeInfo {
if nodeId == cluster.GetLocalNodeInfo().NodeId {
continue
}
ds.GoNode(nodeId, serviceMethod, args) ds.GoNode(nodeId, serviceMethod, args)
} }
} }

View File

@@ -17,6 +17,7 @@ const (
Sys_Event_QueueTaskFinish EventType = -10 Sys_Event_QueueTaskFinish EventType = -10
Sys_Event_Retire EventType = -11 Sys_Event_Retire EventType = -11
Sys_Event_EtcdDiscovery EventType = -12 Sys_Event_EtcdDiscovery EventType = -12
Sys_Event_Gin_Event EventType = -13
Sys_Event_User_Define EventType = 1 Sys_Event_User_Define EventType = 1
) )

View File

@@ -2,7 +2,6 @@ package ginmodule
import ( import (
"context" "context"
"datacenter/common/processor"
"github.com/duanhf2012/origin/v2/event" "github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/service" "github.com/duanhf2012/origin/v2/service"
@@ -10,34 +9,36 @@ import (
"log/slog" "log/slog"
"net/http" "net/http"
"strings" "strings"
"time"
"io"
) )
type IGinProcessor interface {
Process(data *gin.Context) (*gin.Context, error)
}
type GinModule struct { type GinModule struct {
service.Module service.Module
*GinConf
*gin.Engine *gin.Engine
srv *http.Server srv *http.Server
processor []processor.IGinProcessor listenAddr string
handleTimeout time.Duration
processor []IGinProcessor
} }
type GinConf struct { func (gm *GinModule) Init(addr string, handleTimeout time.Duration,engine *gin.Engine) {
Addr string gm.listenAddr = addr
} gm.handleTimeout = handleTimeout
const Sys_Event_Gin_Event event.EventType = -11
func (gm *GinModule) Init(conf *GinConf, engine *gin.Engine) {
gm.GinConf = conf
gm.Engine = engine gm.Engine = engine
} }
func (gm *GinModule) SetupDataProcessor(processor ...processor.IGinProcessor) { func (gm *GinModule) SetupDataProcessor(processor ...IGinProcessor) {
gm.processor = processor gm.processor = processor
} }
func (gm *GinModule) AppendDataProcessor(processor ...processor.IGinProcessor) { func (gm *GinModule) AppendDataProcessor(processor ...IGinProcessor) {
gm.processor = append(gm.processor, processor...) gm.processor = append(gm.processor, processor...)
} }
@@ -47,27 +48,28 @@ func (gm *GinModule) OnInit() error {
} }
gm.srv = &http.Server{ gm.srv = &http.Server{
Addr: gm.Addr, Addr: gm.listenAddr,
Handler: gm.Engine, Handler: gm.Engine,
} }
gm.Engine.Use(Logger()) gm.Engine.Use(Logger())
gm.Engine.Use(gin.Recovery()) gm.Engine.Use(gin.Recovery())
gm.GetEventProcessor().RegEventReceiverFunc(Sys_Event_Gin_Event, gm.GetEventHandler(), gm.eventHandler) gm.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_Gin_Event, gm.GetEventHandler(), gm.eventHandler)
return nil return nil
} }
func (gm *GinModule) eventHandler(ev event.IEvent) { func (gm *GinModule) eventHandler(ev event.IEvent) {
ginEvent := ev.(*GinEvent) ginEvent := ev.(*GinEvent)
for _, handler := range ginEvent.handlersChain { for _, handler := range ginEvent.handlersChain {
handler(ginEvent.c) handler(&ginEvent.c)
} }
ginEvent.chanWait <- struct{}{} //ginEvent.chanWait <- struct{}{}
} }
func (gm *GinModule) Start() { func (gm *GinModule) Start() {
log.Info("http start listen", slog.Any("addr", gm.Addr)) gm.srv.Addr = gm.listenAddr
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
go func() { go func() {
err := gm.srv.ListenAndServe() err := gm.srv.ListenAndServe()
if err != nil { if err != nil {
@@ -77,7 +79,7 @@ func (gm *GinModule) Start() {
} }
func (gm *GinModule) StartTLS(certFile, keyFile string) { func (gm *GinModule) StartTLS(certFile, keyFile string) {
log.Info("http start listen", slog.Any("addr", gm.Addr)) log.Info("http start listen", slog.Any("addr", gm.listenAddr))
go func() { go func() {
err := gm.srv.ListenAndServeTLS(certFile, keyFile) err := gm.srv.ListenAndServeTLS(certFile, keyFile)
if err != nil { if err != nil {
@@ -92,17 +94,102 @@ func (gm *GinModule) Stop(ctx context.Context) {
} }
} }
type GinEvent struct { type SafeContext struct {
handlersChain gin.HandlersChain *gin.Context
chanWait chan struct{} chanWait chan struct{}
c *gin.Context
} }
func (c *SafeContext) JSONAndDone(code int, obj any) {
c.Context.JSON(code,obj)
c.Done()
}
func (c *SafeContext) AsciiJSONAndDone(code int, obj any){
c.Context.AsciiJSON(code,obj)
c.Done()
}
func (c *SafeContext) PureJSONAndDone(code int, obj any){
c.Context.PureJSON(code,obj)
c.Done()
}
func (c *SafeContext) XMLAndDone(code int, obj any){
c.Context.XML(code,obj)
c.Done()
}
func (c *SafeContext) YAMLAndDone(code int, obj any){
c.Context.YAML(code,obj)
c.Done()
}
func (c *SafeContext) TOMLAndDone(code int, obj any){
c.Context.TOML(code,obj)
c.Done()
}
func (c *SafeContext) ProtoBufAndDone(code int, obj any){
c.Context.ProtoBuf(code,obj)
c.Done()
}
func (c *SafeContext) StringAndDone(code int, format string, values ...any){
c.Context.String(code,format,values...)
c.Done()
}
func (c *SafeContext) RedirectAndDone(code int, location string){
c.Context.Redirect(code,location)
c.Done()
}
func (c *SafeContext) DataAndDone(code int, contentType string, data []byte){
c.Context.Data(code,contentType,data)
c.Done()
}
func (c *SafeContext) DataFromReaderAndDone(code int, contentLength int64, contentType string, reader io.Reader, extraHeaders map[string]string){
c.DataFromReader(code,contentLength,contentType,reader,extraHeaders)
c.Done()
}
func (c *SafeContext) HTMLAndDone(code int, name string, obj any){
c.Context.HTML(code,name,obj)
c.Done()
}
func (c *SafeContext) IndentedJSONAndDone(code int, obj any){
c.Context.IndentedJSON(code,obj)
c.Done()
}
func (c *SafeContext) SecureJSONAndDone(code int, obj any){
c.Context.SecureJSON(code,obj)
c.Done()
}
func (c *SafeContext) JSONPAndDone(code int, obj any){
c.Context.JSONP(code,obj)
c.Done()
}
func (c *SafeContext) Done(){
c.chanWait <- struct{}{}
}
type GinEvent struct {
handlersChain []SafeHandlerFunc
c SafeContext
}
type SafeHandlerFunc func(*SafeContext)
func (ge *GinEvent) GetEventType() event.EventType { func (ge *GinEvent) GetEventType() event.EventType {
return Sys_Event_Gin_Event return event.Sys_Event_Gin_Event
} }
func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes { func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.Engine.Handle(httpMethod, relativePath, func(c *gin.Context) { return gm.Engine.Handle(httpMethod, relativePath, func(c *gin.Context) {
for _, p := range gm.processor { for _, p := range gm.processor {
_, err := p.Process(c) _, err := p.Process(c)
@@ -112,33 +199,71 @@ func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...g
} }
var ev GinEvent var ev GinEvent
chanWait := make(chan struct{}) chanWait := make(chan struct{},2)
ev.chanWait = chanWait ev.c.chanWait = chanWait
ev.handlersChain = handlers ev.handlersChain = handlers
ev.c = c ev.c.Context = c
gm.NotifyEvent(&ev) gm.NotifyEvent(&ev)
<-chanWait ctx,cancel := context.WithTimeout(context.Background(), gm.handleTimeout)
defer cancel()
select{
case <-ctx.Done():
log.Error("GinModule process timeout", slog.Any("path", c.Request.URL.Path))
c.AbortWithStatus(http.StatusRequestTimeout)
case <-chanWait:
}
}) })
} }
func (gm *GinModule) SafeGET(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes { // GET 回调处理是在gin协程中
func (gm *GinModule) GET(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.GET(relativePath, handlers...)
}
// POST 回调处理是在gin协程中
func (gm *GinModule) POST(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.POST(relativePath, handlers...)
}
// DELETE 回调处理是在gin协程中
func (gm *GinModule) DELETE(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.DELETE(relativePath, handlers...)
}
// PATCH 回调处理是在gin协程中
func (gm *GinModule) PATCH(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.PATCH(relativePath, handlers...)
}
// Put 回调处理是在gin协程中
func (gm *GinModule) Put(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.PUT(relativePath, handlers...)
}
// SafeGET 回调处理是在service协程中
func (gm *GinModule) SafeGET(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodGet, relativePath, handlers...) return gm.handleMethod(http.MethodGet, relativePath, handlers...)
} }
func (gm *GinModule) SafePOST(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes { // SafePOST 回调处理是在service协程中
func (gm *GinModule) SafePOST(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodPost, relativePath, handlers...) return gm.handleMethod(http.MethodPost, relativePath, handlers...)
} }
func (gm *GinModule) SafeDELETE(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes { // SafeDELETE 回调处理是在service协程中
func (gm *GinModule) SafeDELETE(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodDelete, relativePath, handlers...) return gm.handleMethod(http.MethodDelete, relativePath, handlers...)
} }
func (gm *GinModule) SafePATCH(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes { // SafePATCH 回调处理是在service协程中
func (gm *GinModule) SafePATCH(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodPatch, relativePath, handlers...) return gm.handleMethod(http.MethodPatch, relativePath, handlers...)
} }
func (gm *GinModule) SafePut(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes { // SafePut 回调处理是在service协程中
func (gm *GinModule) SafePut(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodPut, relativePath, handlers...) return gm.handleMethod(http.MethodPut, relativePath, handlers...)
} }

View File

@@ -2,7 +2,7 @@ package ginmodule
import ( import (
"fmt" "fmt"
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/v2/log"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"time" "time"
) )

View File

@@ -263,7 +263,7 @@ func (mp *MongoPersist) JugeTimeoutSave() bool{
func (mp *MongoPersist) persistCoroutine(){ func (mp *MongoPersist) persistCoroutine(){
defer mp.waitGroup.Done() defer mp.waitGroup.Done()
for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){ for atomic.LoadInt32(&mp.stop)==0 {
//间隔时间sleep //间隔时间sleep
time.Sleep(time.Second*1) time.Sleep(time.Second*1)