mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-11 13:04:41 +08:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c6d0bd9a19 | ||
|
|
61bf95e457 | ||
|
|
8b2a551ee5 | ||
|
|
927c2ffa37 | ||
|
|
b23b30aac5 | ||
|
|
03f8ba0316 | ||
|
|
277480a7f0 | ||
|
|
647a654a36 | ||
|
|
de483a88f1 | ||
|
|
bbbb511b5f |
@@ -54,7 +54,6 @@ type Cluster struct {
|
|||||||
|
|
||||||
discoveryInfo DiscoveryInfo //服务发现配置
|
discoveryInfo DiscoveryInfo //服务发现配置
|
||||||
rpcMode RpcMode
|
rpcMode RpcMode
|
||||||
//masterDiscoveryNodeList []NodeInfo //配置发现Master结点
|
|
||||||
globalCfg interface{} //全局配置
|
globalCfg interface{} //全局配置
|
||||||
|
|
||||||
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
|
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
|
||||||
@@ -70,7 +69,6 @@ type Cluster struct {
|
|||||||
|
|
||||||
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
|
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
|
||||||
mapServiceListenRpcEvent map[string]struct{} //ServiceName
|
mapServiceListenRpcEvent map[string]struct{} //ServiceName
|
||||||
mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetCluster() *Cluster {
|
func GetCluster() *Cluster {
|
||||||
@@ -228,8 +226,6 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er
|
|||||||
}
|
}
|
||||||
service.RegRpcEventFun = cls.RegRpcEvent
|
service.RegRpcEventFun = cls.RegRpcEvent
|
||||||
service.UnRegRpcEventFun = cls.UnRegRpcEvent
|
service.UnRegRpcEventFun = cls.UnRegRpcEvent
|
||||||
service.RegDiscoveryServiceEventFun = cls.RegDiscoveryEvent
|
|
||||||
service.UnRegDiscoveryServiceEventFun = cls.UnReDiscoveryEvent
|
|
||||||
|
|
||||||
err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo)
|
err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -322,23 +318,12 @@ func (cls *Cluster) NotifyAllService(event event.IEvent){
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId string, serviceName []string) {
|
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId string, serviceName []string) {
|
||||||
cls.rpcEventLocker.Lock()
|
var eventData service.DiscoveryServiceEvent
|
||||||
defer cls.rpcEventLocker.Unlock()
|
eventData.IsDiscovery = bDiscovery
|
||||||
|
eventData.NodeId = nodeId
|
||||||
for sName, _ := range cls.mapServiceListenDiscoveryEvent {
|
eventData.ServiceName = serviceName
|
||||||
ser := service.GetService(sName)
|
|
||||||
if ser == nil {
|
|
||||||
log.Error("cannot find service",log.Any("services",serviceName))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var eventData service.DiscoveryServiceEvent
|
|
||||||
eventData.IsDiscovery = bDiscovery
|
|
||||||
eventData.NodeId = nodeId
|
|
||||||
eventData.ServiceName = serviceName
|
|
||||||
ser.(service.IModule).NotifyEvent(&eventData)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
cls.NotifyAllService(&eventData)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
|
func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
|
||||||
@@ -361,25 +346,6 @@ func (cls *Cluster) UnRegRpcEvent(serviceName string) {
|
|||||||
cls.rpcEventLocker.Unlock()
|
cls.rpcEventLocker.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (cls *Cluster) RegDiscoveryEvent(serviceName string) {
|
|
||||||
cls.rpcEventLocker.Lock()
|
|
||||||
if cls.mapServiceListenDiscoveryEvent == nil {
|
|
||||||
cls.mapServiceListenDiscoveryEvent = map[string]struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
cls.mapServiceListenDiscoveryEvent[serviceName] = struct{}{}
|
|
||||||
cls.rpcEventLocker.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cls *Cluster) UnReDiscoveryEvent(serviceName string) {
|
|
||||||
cls.rpcEventLocker.Lock()
|
|
||||||
delete(cls.mapServiceListenDiscoveryEvent, serviceName)
|
|
||||||
cls.rpcEventLocker.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
func HasService(nodeId string, serviceName string) bool {
|
func HasService(nodeId string, serviceName string) bool {
|
||||||
cluster.locker.RLock()
|
cluster.locker.RLock()
|
||||||
defer cluster.locker.RUnlock()
|
defer cluster.locker.RUnlock()
|
||||||
|
|||||||
@@ -183,6 +183,10 @@ func (ds *OriginDiscoveryMaster) OnNodeDisconnect(nodeId string) {
|
|||||||
|
|
||||||
func (ds *OriginDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{}) {
|
func (ds *OriginDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{}) {
|
||||||
for nodeId, _ := range ds.mapNodeInfo {
|
for nodeId, _ := range ds.mapNodeInfo {
|
||||||
|
if nodeId == cluster.GetLocalNodeInfo().NodeId {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
ds.GoNode(nodeId, serviceMethod, args)
|
ds.GoNode(nodeId, serviceMethod, args)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -236,7 +236,6 @@ func (processor *EventProcessor) castEvent(event IEvent){
|
|||||||
|
|
||||||
eventProcessor,ok := processor.mapListenerEvent[event.GetEventType()]
|
eventProcessor,ok := processor.mapListenerEvent[event.GetEventType()]
|
||||||
if ok == false || processor == nil{
|
if ok == false || processor == nil{
|
||||||
log.Debug("event is not listen",log.Int("event type",int(event.GetEventType())))
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ const (
|
|||||||
Sys_Event_QueueTaskFinish EventType = -10
|
Sys_Event_QueueTaskFinish EventType = -10
|
||||||
Sys_Event_Retire EventType = -11
|
Sys_Event_Retire EventType = -11
|
||||||
Sys_Event_EtcdDiscovery EventType = -12
|
Sys_Event_EtcdDiscovery EventType = -12
|
||||||
|
Sys_Event_Gin_Event EventType = -13
|
||||||
|
|
||||||
Sys_Event_User_Define EventType = 1
|
Sys_Event_User_Define EventType = 1
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -117,7 +117,7 @@ func setConfigPath(val interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getRunProcessPid(nodeId string) (int, error) {
|
func getRunProcessPid(nodeId string) (int, error) {
|
||||||
f, err := os.OpenFile(fmt.Sprintf("%s_%d.pid", os.Args[0], nodeId), os.O_RDONLY, 0600)
|
f, err := os.OpenFile(fmt.Sprintf("%s_%s.pid", os.Args[0], nodeId), os.O_RDONLY, 0600)
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
|||||||
@@ -368,12 +368,12 @@ func (s *Service) UnRegNatsConnListener() {
|
|||||||
func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoveryServiceListener) {
|
func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoveryServiceListener) {
|
||||||
s.discoveryServiceLister = discoveryServiceListener
|
s.discoveryServiceLister = discoveryServiceListener
|
||||||
s.RegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler(),s.OnDiscoverServiceEvent)
|
s.RegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler(),s.OnDiscoverServiceEvent)
|
||||||
RegDiscoveryServiceEventFun(s.GetName())
|
RegRpcEventFun(s.GetName())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) UnRegDiscoverListener() {
|
func (s *Service) UnRegDiscoverListener() {
|
||||||
s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler())
|
s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler())
|
||||||
UnRegDiscoveryServiceEventFun(s.GetName())
|
UnRegRpcEventFun(s.GetName())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) PushRpcRequest(rpcRequest *rpc.RpcRequest) error{
|
func (s *Service) PushRpcRequest(rpcRequest *rpc.RpcRequest) error{
|
||||||
|
|||||||
@@ -14,9 +14,6 @@ type RegDiscoveryServiceEventFunType func(serviceName string)
|
|||||||
var RegRpcEventFun RegRpcEventFunType
|
var RegRpcEventFun RegRpcEventFunType
|
||||||
var UnRegRpcEventFun RegRpcEventFunType
|
var UnRegRpcEventFun RegRpcEventFunType
|
||||||
|
|
||||||
var RegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
|
|
||||||
var UnRegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
|
|
||||||
|
|
||||||
func init(){
|
func init(){
|
||||||
mapServiceName = map[string]IService{}
|
mapServiceName = map[string]IService{}
|
||||||
setupServiceList = []IService{}
|
setupServiceList = []IService{}
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package ginmodule
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"datacenter/common/processor"
|
|
||||||
"github.com/duanhf2012/origin/v2/event"
|
"github.com/duanhf2012/origin/v2/event"
|
||||||
"github.com/duanhf2012/origin/v2/log"
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
"github.com/duanhf2012/origin/v2/service"
|
"github.com/duanhf2012/origin/v2/service"
|
||||||
@@ -10,34 +9,36 @@ import (
|
|||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type IGinProcessor interface {
|
||||||
|
Process(data *gin.Context) (*gin.Context, error)
|
||||||
|
}
|
||||||
|
|
||||||
type GinModule struct {
|
type GinModule struct {
|
||||||
service.Module
|
service.Module
|
||||||
|
|
||||||
*GinConf
|
|
||||||
*gin.Engine
|
*gin.Engine
|
||||||
srv *http.Server
|
srv *http.Server
|
||||||
|
|
||||||
processor []processor.IGinProcessor
|
listenAddr string
|
||||||
|
handleTimeout time.Duration
|
||||||
|
processor []IGinProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
type GinConf struct {
|
func (gm *GinModule) Init(addr string, handleTimeout time.Duration,engine *gin.Engine) {
|
||||||
Addr string
|
gm.listenAddr = addr
|
||||||
}
|
gm.handleTimeout = handleTimeout
|
||||||
|
|
||||||
const Sys_Event_Gin_Event event.EventType = -11
|
|
||||||
|
|
||||||
func (gm *GinModule) Init(conf *GinConf, engine *gin.Engine) {
|
|
||||||
gm.GinConf = conf
|
|
||||||
gm.Engine = engine
|
gm.Engine = engine
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gm *GinModule) SetupDataProcessor(processor ...processor.IGinProcessor) {
|
func (gm *GinModule) SetupDataProcessor(processor ...IGinProcessor) {
|
||||||
gm.processor = processor
|
gm.processor = processor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gm *GinModule) AppendDataProcessor(processor ...processor.IGinProcessor) {
|
func (gm *GinModule) AppendDataProcessor(processor ...IGinProcessor) {
|
||||||
gm.processor = append(gm.processor, processor...)
|
gm.processor = append(gm.processor, processor...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -47,27 +48,28 @@ func (gm *GinModule) OnInit() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
gm.srv = &http.Server{
|
gm.srv = &http.Server{
|
||||||
Addr: gm.Addr,
|
Addr: gm.listenAddr,
|
||||||
Handler: gm.Engine,
|
Handler: gm.Engine,
|
||||||
}
|
}
|
||||||
|
|
||||||
gm.Engine.Use(Logger())
|
gm.Engine.Use(Logger())
|
||||||
gm.Engine.Use(gin.Recovery())
|
gm.Engine.Use(gin.Recovery())
|
||||||
gm.GetEventProcessor().RegEventReceiverFunc(Sys_Event_Gin_Event, gm.GetEventHandler(), gm.eventHandler)
|
gm.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_Gin_Event, gm.GetEventHandler(), gm.eventHandler)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gm *GinModule) eventHandler(ev event.IEvent) {
|
func (gm *GinModule) eventHandler(ev event.IEvent) {
|
||||||
ginEvent := ev.(*GinEvent)
|
ginEvent := ev.(*GinEvent)
|
||||||
for _, handler := range ginEvent.handlersChain {
|
for _, handler := range ginEvent.handlersChain {
|
||||||
handler(ginEvent.c)
|
handler(&ginEvent.c)
|
||||||
}
|
}
|
||||||
|
|
||||||
ginEvent.chanWait <- struct{}{}
|
//ginEvent.chanWait <- struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gm *GinModule) Start() {
|
func (gm *GinModule) Start() {
|
||||||
log.Info("http start listen", slog.Any("addr", gm.Addr))
|
gm.srv.Addr = gm.listenAddr
|
||||||
|
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
|
||||||
go func() {
|
go func() {
|
||||||
err := gm.srv.ListenAndServe()
|
err := gm.srv.ListenAndServe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -77,7 +79,7 @@ func (gm *GinModule) Start() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (gm *GinModule) StartTLS(certFile, keyFile string) {
|
func (gm *GinModule) StartTLS(certFile, keyFile string) {
|
||||||
log.Info("http start listen", slog.Any("addr", gm.Addr))
|
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
|
||||||
go func() {
|
go func() {
|
||||||
err := gm.srv.ListenAndServeTLS(certFile, keyFile)
|
err := gm.srv.ListenAndServeTLS(certFile, keyFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -92,17 +94,102 @@ func (gm *GinModule) Stop(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type GinEvent struct {
|
type SafeContext struct {
|
||||||
handlersChain gin.HandlersChain
|
*gin.Context
|
||||||
chanWait chan struct{}
|
chanWait chan struct{}
|
||||||
c *gin.Context
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) JSONAndDone(code int, obj any) {
|
||||||
|
c.Context.JSON(code,obj)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) AsciiJSONAndDone(code int, obj any){
|
||||||
|
c.Context.AsciiJSON(code,obj)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) PureJSONAndDone(code int, obj any){
|
||||||
|
c.Context.PureJSON(code,obj)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) XMLAndDone(code int, obj any){
|
||||||
|
c.Context.XML(code,obj)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) YAMLAndDone(code int, obj any){
|
||||||
|
c.Context.YAML(code,obj)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) TOMLAndDone(code int, obj any){
|
||||||
|
c.Context.TOML(code,obj)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) ProtoBufAndDone(code int, obj any){
|
||||||
|
c.Context.ProtoBuf(code,obj)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) StringAndDone(code int, format string, values ...any){
|
||||||
|
c.Context.String(code,format,values...)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) RedirectAndDone(code int, location string){
|
||||||
|
c.Context.Redirect(code,location)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) DataAndDone(code int, contentType string, data []byte){
|
||||||
|
c.Context.Data(code,contentType,data)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) DataFromReaderAndDone(code int, contentLength int64, contentType string, reader io.Reader, extraHeaders map[string]string){
|
||||||
|
c.DataFromReader(code,contentLength,contentType,reader,extraHeaders)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) HTMLAndDone(code int, name string, obj any){
|
||||||
|
c.Context.HTML(code,name,obj)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) IndentedJSONAndDone(code int, obj any){
|
||||||
|
c.Context.IndentedJSON(code,obj)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) SecureJSONAndDone(code int, obj any){
|
||||||
|
c.Context.SecureJSON(code,obj)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) JSONPAndDone(code int, obj any){
|
||||||
|
c.Context.JSONP(code,obj)
|
||||||
|
c.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SafeContext) Done(){
|
||||||
|
c.chanWait <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type GinEvent struct {
|
||||||
|
handlersChain []SafeHandlerFunc
|
||||||
|
c SafeContext
|
||||||
|
}
|
||||||
|
|
||||||
|
type SafeHandlerFunc func(*SafeContext)
|
||||||
|
|
||||||
func (ge *GinEvent) GetEventType() event.EventType {
|
func (ge *GinEvent) GetEventType() event.EventType {
|
||||||
return Sys_Event_Gin_Event
|
return event.Sys_Event_Gin_Event
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||||
return gm.Engine.Handle(httpMethod, relativePath, func(c *gin.Context) {
|
return gm.Engine.Handle(httpMethod, relativePath, func(c *gin.Context) {
|
||||||
for _, p := range gm.processor {
|
for _, p := range gm.processor {
|
||||||
_, err := p.Process(c)
|
_, err := p.Process(c)
|
||||||
@@ -112,33 +199,71 @@ func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...g
|
|||||||
}
|
}
|
||||||
|
|
||||||
var ev GinEvent
|
var ev GinEvent
|
||||||
chanWait := make(chan struct{})
|
chanWait := make(chan struct{},2)
|
||||||
ev.chanWait = chanWait
|
ev.c.chanWait = chanWait
|
||||||
ev.handlersChain = handlers
|
ev.handlersChain = handlers
|
||||||
ev.c = c
|
ev.c.Context = c
|
||||||
gm.NotifyEvent(&ev)
|
gm.NotifyEvent(&ev)
|
||||||
|
|
||||||
<-chanWait
|
ctx,cancel := context.WithTimeout(context.Background(), gm.handleTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
select{
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Error("GinModule process timeout", slog.Any("path", c.Request.URL.Path))
|
||||||
|
c.AbortWithStatus(http.StatusRequestTimeout)
|
||||||
|
case <-chanWait:
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gm *GinModule) SafeGET(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
// GET 回调处理是在gin协程中
|
||||||
|
func (gm *GinModule) GET(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
||||||
|
return gm.Engine.GET(relativePath, handlers...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// POST 回调处理是在gin协程中
|
||||||
|
func (gm *GinModule) POST(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
||||||
|
return gm.Engine.POST(relativePath, handlers...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DELETE 回调处理是在gin协程中
|
||||||
|
func (gm *GinModule) DELETE(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
||||||
|
return gm.Engine.DELETE(relativePath, handlers...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PATCH 回调处理是在gin协程中
|
||||||
|
func (gm *GinModule) PATCH(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
||||||
|
return gm.Engine.PATCH(relativePath, handlers...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put 回调处理是在gin协程中
|
||||||
|
func (gm *GinModule) Put(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
||||||
|
return gm.Engine.PUT(relativePath, handlers...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SafeGET 回调处理是在service协程中
|
||||||
|
func (gm *GinModule) SafeGET(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||||
return gm.handleMethod(http.MethodGet, relativePath, handlers...)
|
return gm.handleMethod(http.MethodGet, relativePath, handlers...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gm *GinModule) SafePOST(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
// SafePOST 回调处理是在service协程中
|
||||||
|
func (gm *GinModule) SafePOST(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||||
return gm.handleMethod(http.MethodPost, relativePath, handlers...)
|
return gm.handleMethod(http.MethodPost, relativePath, handlers...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gm *GinModule) SafeDELETE(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
// SafeDELETE 回调处理是在service协程中
|
||||||
|
func (gm *GinModule) SafeDELETE(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||||
return gm.handleMethod(http.MethodDelete, relativePath, handlers...)
|
return gm.handleMethod(http.MethodDelete, relativePath, handlers...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gm *GinModule) SafePATCH(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
// SafePATCH 回调处理是在service协程中
|
||||||
|
func (gm *GinModule) SafePATCH(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||||
return gm.handleMethod(http.MethodPatch, relativePath, handlers...)
|
return gm.handleMethod(http.MethodPatch, relativePath, handlers...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gm *GinModule) SafePut(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
// SafePut 回调处理是在service协程中
|
||||||
|
func (gm *GinModule) SafePut(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||||
return gm.handleMethod(http.MethodPut, relativePath, handlers...)
|
return gm.handleMethod(http.MethodPut, relativePath, handlers...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package ginmodule
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/duanhf2012/origin/log"
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|||||||
65
sysmodule/kafkamodule/Admin.go
Normal file
65
sysmodule/kafkamodule/Admin.go
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
package kafkamodule
|
||||||
|
|
||||||
|
import "github.com/IBM/sarama"
|
||||||
|
|
||||||
|
type KafkaAdmin struct {
|
||||||
|
sarama.ClusterAdmin
|
||||||
|
|
||||||
|
mapTopic map[string]sarama.TopicDetail
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ka *KafkaAdmin) Setup(kafkaVersion string, addrs []string) error {
|
||||||
|
config := sarama.NewConfig()
|
||||||
|
var err error
|
||||||
|
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ka.ClusterAdmin, err = sarama.NewClusterAdmin(addrs, config)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ka.mapTopic, err = ka.GetTopics()
|
||||||
|
if err != nil {
|
||||||
|
ka.ClusterAdmin.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ka *KafkaAdmin) RefreshTopic() error {
|
||||||
|
var err error
|
||||||
|
ka.mapTopic, err = ka.GetTopics()
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ka *KafkaAdmin) HasTopic(topic string) bool {
|
||||||
|
_, ok := ka.mapTopic[topic]
|
||||||
|
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ka *KafkaAdmin) GetTopicDetail(topic string) *sarama.TopicDetail {
|
||||||
|
topicDetail, ok := ka.mapTopic[topic]
|
||||||
|
if ok == false {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &topicDetail
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ka *KafkaAdmin) GetTopics() (map[string]sarama.TopicDetail, error) {
|
||||||
|
return ka.ListTopics()
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateTopic 创建主题
|
||||||
|
// numPartitions分区数
|
||||||
|
// replicationFactor副本数
|
||||||
|
// validateOnly参数执行操作时只进行参数验证而不实际执行操作
|
||||||
|
func (ka *KafkaAdmin) CreateTopic(topic string, numPartitions int32, replicationFactor int16, validateOnly bool) error {
|
||||||
|
return ka.ClusterAdmin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: numPartitions, ReplicationFactor: replicationFactor}, validateOnly)
|
||||||
|
}
|
||||||
289
sysmodule/kafkamodule/Consumer.go
Normal file
289
sysmodule/kafkamodule/Consumer.go
Normal file
@@ -0,0 +1,289 @@
|
|||||||
|
package kafkamodule
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ConsumerGroup struct {
|
||||||
|
sarama.ConsumerGroup
|
||||||
|
waitGroup sync.WaitGroup
|
||||||
|
|
||||||
|
chanExit chan error
|
||||||
|
ready chan bool
|
||||||
|
cancel context.CancelFunc
|
||||||
|
groupId string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConsumerConfig(kafkaVersion string, assignor string, offsetsInitial int64) (*sarama.Config, error) {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
config := sarama.NewConfig()
|
||||||
|
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
|
||||||
|
config.Consumer.Offsets.Initial = offsetsInitial
|
||||||
|
config.Consumer.Offsets.AutoCommit.Enable = false
|
||||||
|
|
||||||
|
switch assignor {
|
||||||
|
case "sticky":
|
||||||
|
// 黏性roundRobin,rebalance之后首先保证前面的分配,从后面剥离
|
||||||
|
// topic:T0{P0,P1,P2,P3,P4,P5},消费者:C1,C2
|
||||||
|
// ---------------before rebalance:即roundRobin
|
||||||
|
// C1: T0{P0} T0{P2} T0{P4}
|
||||||
|
// C2: T0{P1} T0{P3} T0{P5}
|
||||||
|
// ----------------after rebalance:增加了一个消费者
|
||||||
|
// C1: T0{P0} T0{P2}
|
||||||
|
// C2: T0{P1} T0{P3}
|
||||||
|
// C3: T0{P4} T0{P5} until每个消费者的分区数误差不超过1
|
||||||
|
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
|
||||||
|
case "roundrobin":
|
||||||
|
// roundRobin --逐个平均分发
|
||||||
|
// topic: T0{P0,P1,P2},T1{P0,P1,P2,P3}两个消费者C1,C2
|
||||||
|
// C1: T0{P0} T0{P2} T1{P1} T1{P3}
|
||||||
|
// C2: T0{P1} T1{P0} T1{P2}
|
||||||
|
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
|
||||||
|
case "range":
|
||||||
|
// 默认值 --一次平均分发
|
||||||
|
// topic: T0{P0,P1,P2,P3},T1{P0,P1,P2,P3},两个消费者C1,C2
|
||||||
|
// T1分区总数6 / 消费者数2 = 3 ,即该会话的分区每个消费者分3个
|
||||||
|
// T2分区总数4 / 消费者数2 = 2 ,即该会话的分区每个消费者分2个
|
||||||
|
// C1: T0{P0, P1, P2} T1{P0, P1}
|
||||||
|
// C2: T0{P3, P4, P5} T1{P2, P3}
|
||||||
|
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("Unrecognized consumer group partition assignor: %s", assignor)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return config, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type IMsgReceiver interface {
|
||||||
|
Receiver(msgs []*sarama.ConsumerMessage) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConsumerGroup) Setup(addr []string, topics []string, groupId string, config *sarama.Config, receiverInterval time.Duration, maxReceiverNum int, msgReceiver IMsgReceiver) error {
|
||||||
|
var err error
|
||||||
|
c.ConsumerGroup, err = sarama.NewConsumerGroup(addr, groupId, config)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
c.groupId = groupId
|
||||||
|
c.chanExit = make(chan error, 1)
|
||||||
|
|
||||||
|
var handler ConsumerGroupHandler
|
||||||
|
handler.receiver = msgReceiver
|
||||||
|
handler.maxReceiverNum = maxReceiverNum
|
||||||
|
handler.receiverInterval = receiverInterval
|
||||||
|
handler.chanExit = c.chanExit
|
||||||
|
|
||||||
|
var ctx context.Context
|
||||||
|
ctx, c.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
c.waitGroup.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer c.waitGroup.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
if err = c.Consume(ctx, topics, &handler); err != nil {
|
||||||
|
// 当setup失败的时候,error会返回到这里
|
||||||
|
log.Error("Error from consumer", log.Any("err", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if context was cancelled, signaling that the consumer should stop
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
log.Info("consumer stop", log.Any("info", ctx.Err()))
|
||||||
|
}
|
||||||
|
|
||||||
|
c.chanExit <- err
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
err = <-c.chanExit
|
||||||
|
//已经准备好了
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConsumerGroup) Close() {
|
||||||
|
log.Info("close consumerGroup")
|
||||||
|
//1.cancel掉
|
||||||
|
c.cancel()
|
||||||
|
|
||||||
|
//2.关闭连接
|
||||||
|
err := c.ConsumerGroup.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Error("close consumerGroup fail", log.Any("err", err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
|
//3.等待退出
|
||||||
|
c.waitGroup.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConsumerGroupHandler struct {
|
||||||
|
receiver IMsgReceiver
|
||||||
|
|
||||||
|
receiverInterval time.Duration
|
||||||
|
maxReceiverNum int
|
||||||
|
|
||||||
|
//mapTopicOffset map[string]map[int32]int //map[topic]map[partitionId]offsetInfo
|
||||||
|
mapTopicData map[string]*MsgData
|
||||||
|
mx sync.Mutex
|
||||||
|
|
||||||
|
chanExit chan error
|
||||||
|
isRebalance bool //是否为再平衡
|
||||||
|
//stopSig *int32
|
||||||
|
}
|
||||||
|
|
||||||
|
type MsgData struct {
|
||||||
|
sync.Mutex
|
||||||
|
msg []*sarama.ConsumerMessage
|
||||||
|
|
||||||
|
mapPartitionOffset map[int32]int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ch *ConsumerGroupHandler) Flush(session sarama.ConsumerGroupSession, topic string) {
|
||||||
|
if topic != "" {
|
||||||
|
msgData := ch.GetMsgData(topic)
|
||||||
|
msgData.flush(session, ch.receiver, topic)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for tp, msgData := range ch.mapTopicData {
|
||||||
|
msgData.flush(session, ch.receiver, tp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ch *ConsumerGroupHandler) GetMsgData(topic string) *MsgData {
|
||||||
|
ch.mx.Lock()
|
||||||
|
defer ch.mx.Unlock()
|
||||||
|
|
||||||
|
msgData := ch.mapTopicData[topic]
|
||||||
|
if msgData == nil {
|
||||||
|
msgData = &MsgData{}
|
||||||
|
msgData.msg = make([]*sarama.ConsumerMessage, 0, ch.maxReceiverNum)
|
||||||
|
ch.mapTopicData[topic] = msgData
|
||||||
|
}
|
||||||
|
|
||||||
|
return msgData
|
||||||
|
}
|
||||||
|
|
||||||
|
func (md *MsgData) flush(session sarama.ConsumerGroupSession, receiver IMsgReceiver, topic string) {
|
||||||
|
if len(md.msg) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//发送给接收者
|
||||||
|
for {
|
||||||
|
ok := receiver.Receiver(md.msg)
|
||||||
|
if ok == true {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for pId, offset := range md.mapPartitionOffset {
|
||||||
|
|
||||||
|
session.MarkOffset(topic, pId, offset+1, "")
|
||||||
|
log.Info(fmt.Sprintf("topic %s,pid %d,offset %d", topic, pId, offset+1))
|
||||||
|
}
|
||||||
|
session.Commit()
|
||||||
|
//log.Info("commit")
|
||||||
|
//time.Sleep(1000 * time.Second)
|
||||||
|
//置空
|
||||||
|
md.msg = md.msg[:0]
|
||||||
|
clear(md.mapPartitionOffset)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (md *MsgData) appendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage, receiver IMsgReceiver, maxReceiverNum int) {
|
||||||
|
md.Lock()
|
||||||
|
defer md.Unlock()
|
||||||
|
|
||||||
|
//收到的offset只会越来越大在
|
||||||
|
if md.mapPartitionOffset == nil {
|
||||||
|
md.mapPartitionOffset = make(map[int32]int64, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
md.mapPartitionOffset[msg.Partition] = msg.Offset
|
||||||
|
|
||||||
|
md.msg = append(md.msg, msg)
|
||||||
|
if len(md.msg) < maxReceiverNum {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
md.flush(session, receiver, msg.Topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ch *ConsumerGroupHandler) AppendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
|
||||||
|
dataMsg := ch.GetMsgData(msg.Topic)
|
||||||
|
dataMsg.appendMsg(session, msg, ch.receiver, ch.maxReceiverNum)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ch *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
|
||||||
|
ch.mapTopicData = make(map[string]*MsgData, 128)
|
||||||
|
|
||||||
|
if ch.isRebalance == false {
|
||||||
|
ch.chanExit <- nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ch.isRebalance = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ch *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
|
||||||
|
ch.Flush(session, "")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
||||||
|
|
||||||
|
ticker := time.NewTicker(ch.receiverInterval)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg := <-claim.Messages():
|
||||||
|
if msg == nil {
|
||||||
|
log.SWarning("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ch.AppendMsg(session, msg)
|
||||||
|
case <-ticker.C:
|
||||||
|
ch.Flush(session, claim.Topic())
|
||||||
|
case <-session.Context().Done():
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
阿里云参数说明:https://sls.aliyun.com/doc/oscompatibledemo/sarama_go_kafka_consume.html
|
||||||
|
conf.Net.TLS.Enable = true
|
||||||
|
conf.Net.SASL.Enable = true
|
||||||
|
conf.Net.SASL.User = project
|
||||||
|
conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
|
||||||
|
conf.Net.SASL.Mechanism = "PLAIN"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
conf.Net.TLS.Enable = true
|
||||||
|
conf.Net.SASL.Enable = true
|
||||||
|
conf.Net.SASL.User = project
|
||||||
|
conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
|
||||||
|
conf.Net.SASL.Mechanism = "PLAIN"
|
||||||
|
|
||||||
|
conf.Consumer.Fetch.Min = 1
|
||||||
|
conf.Consumer.Fetch.Default = 1024 * 1024
|
||||||
|
conf.Consumer.Retry.Backoff = 2 * time.Second
|
||||||
|
conf.Consumer.MaxWaitTime = 250 * time.Millisecond
|
||||||
|
conf.Consumer.MaxProcessingTime = 100 * time.Millisecond
|
||||||
|
conf.Consumer.Return.Errors = false
|
||||||
|
conf.Consumer.Offsets.AutoCommit.Enable = true
|
||||||
|
conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
|
||||||
|
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
|
||||||
|
conf.Consumer.Offsets.Retry.Max = 3
|
||||||
|
*/
|
||||||
146
sysmodule/kafkamodule/Producer.go
Normal file
146
sysmodule/kafkamodule/Producer.go
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
package kafkamodule
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/duanhf2012/origin/v2/log"
|
||||||
|
"github.com/duanhf2012/origin/v2/service"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type IProducer interface {
|
||||||
|
}
|
||||||
|
|
||||||
|
type SyncProducer struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
type AsyncProducer struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
type Producer struct {
|
||||||
|
service.Module
|
||||||
|
|
||||||
|
sarama.SyncProducer
|
||||||
|
|
||||||
|
sarama.AsyncProducer
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewProducerConfig 新建producerConfig
|
||||||
|
// kafkaVersion kafka版本
|
||||||
|
// returnErr,returnSucc 是否返回错误与成功
|
||||||
|
// requiredAcks -1 #全量同步确认,强可靠性保证(当所有的 leader 和 follower 都接收成功时)#WaitForAll 1 #leader 确认收到, 默认(仅 leader 反馈)#WaitForLocal 0 #不确认,但是吞吐量大(不 care 结果) #NoResponse
|
||||||
|
// Idempotent(幂等) 确保信息都准确写入一份副本,用于幂等生产者,当这一项设置为true的时候,生产者将保证生产的消息一定是有序且精确一次的
|
||||||
|
// partitioner 生成分区器,用于选择向哪个分区发送信息,默认情况下对消息密钥进行散列
|
||||||
|
func NewProducerConfig(kafkaVersion string, returnErr bool, returnSucc bool, requiredAcks sarama.RequiredAcks, Idempotent bool,
|
||||||
|
partitioner sarama.PartitionerConstructor) (*sarama.Config, error) {
|
||||||
|
config := sarama.NewConfig()
|
||||||
|
var err error
|
||||||
|
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
config.Producer.Return.Errors = returnErr
|
||||||
|
config.Producer.Return.Successes = returnSucc
|
||||||
|
config.Producer.RequiredAcks = requiredAcks
|
||||||
|
config.Producer.Partitioner = partitioner
|
||||||
|
config.Producer.Timeout = 10 * time.Second
|
||||||
|
|
||||||
|
config.Producer.Idempotent = Idempotent
|
||||||
|
if Idempotent == true {
|
||||||
|
config.Net.MaxOpenRequests = 1
|
||||||
|
}
|
||||||
|
return config, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Producer) SyncSetup(addr []string, config *sarama.Config) error {
|
||||||
|
var err error
|
||||||
|
p.SyncProducer, err = sarama.NewSyncProducer(addr, config)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Producer) ASyncSetup(addr []string, config *sarama.Config) error {
|
||||||
|
var err error
|
||||||
|
p.AsyncProducer, err = sarama.NewAsyncProducer(addr, config)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
p.asyncRun()
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Producer) asyncRun() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case sm := <-p.Successes():
|
||||||
|
if sm.Metadata == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
asyncReturn := sm.Metadata.(*AsyncReturn)
|
||||||
|
asyncReturn.chanReturn <- asyncReturn
|
||||||
|
case em := <-p.Errors():
|
||||||
|
log.Error("async kafkamodule error", log.ErrorAttr("err", em.Err))
|
||||||
|
if em.Msg.Metadata == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
asyncReturn := em.Msg.Metadata.(*AsyncReturn)
|
||||||
|
asyncReturn.Err = em.Err
|
||||||
|
asyncReturn.chanReturn <- asyncReturn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type AsyncReturn struct {
|
||||||
|
Msg *sarama.ProducerMessage
|
||||||
|
Err error
|
||||||
|
chanReturn chan *AsyncReturn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ar *AsyncReturn) WaitOk(ctx context.Context) (*sarama.ProducerMessage, error) {
|
||||||
|
asyncReturn := ar.Msg.Metadata.(*AsyncReturn)
|
||||||
|
select {
|
||||||
|
case <-asyncReturn.chanReturn:
|
||||||
|
return asyncReturn.Msg, asyncReturn.Err
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Producer) AsyncSendMessage(msg *sarama.ProducerMessage) *AsyncReturn {
|
||||||
|
asyncReturn := AsyncReturn{Msg: msg, chanReturn: make(chan *AsyncReturn, 1)}
|
||||||
|
msg.Metadata = &asyncReturn
|
||||||
|
p.AsyncProducer.Input() <- msg
|
||||||
|
|
||||||
|
return &asyncReturn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Producer) AsyncPushMessage(msg *sarama.ProducerMessage) {
|
||||||
|
p.AsyncProducer.Input() <- msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Producer) Close() {
|
||||||
|
if p.SyncProducer != nil {
|
||||||
|
p.SyncProducer.Close()
|
||||||
|
p.SyncProducer = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.AsyncProducer != nil {
|
||||||
|
p.AsyncProducer.Close()
|
||||||
|
p.AsyncProducer = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Producer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
|
||||||
|
return p.SyncProducer.SendMessage(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Producer) SendMessages(msgs []*sarama.ProducerMessage) error {
|
||||||
|
return p.SyncProducer.SendMessages(msgs)
|
||||||
|
}
|
||||||
151
sysmodule/kafkamodule/ProducerAndConsumer_test.go
Normal file
151
sysmodule/kafkamodule/ProducerAndConsumer_test.go
Normal file
@@ -0,0 +1,151 @@
|
|||||||
|
package kafkamodule
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// 对各参数和机制名称的说明:https://blog.csdn.net/u013311345/article/details/129217728
|
||||||
|
type MsgReceiver struct {
|
||||||
|
t *testing.T
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mr *MsgReceiver) Receiver(msgs []*sarama.ConsumerMessage) bool {
|
||||||
|
for _, m := range msgs {
|
||||||
|
mr.t.Logf("time:%s, topic:%s, partition:%d, offset:%d, key:%s, value:%s", time.Now().Format("2006-01-02 15:04:05.000"), m.Topic, m.Partition, m.Offset, m.Key, string(m.Value))
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
var addr = []string{"192.168.13.24:9092", "192.168.13.24:9093", "192.168.13.24:9094", "192.168.13.24:9095"}
|
||||||
|
var topicName = []string{"test_topic_1", "test_topic_2"}
|
||||||
|
var kafkaVersion = "3.3.1"
|
||||||
|
|
||||||
|
func producer(t *testing.T) {
|
||||||
|
var admin KafkaAdmin
|
||||||
|
err := admin.Setup(kafkaVersion, addr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tName := range topicName {
|
||||||
|
if admin.HasTopic(tName) == false {
|
||||||
|
err = admin.CreateTopic(tName, 2, 2, false)
|
||||||
|
t.Log(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var pd Producer
|
||||||
|
cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = pd.SyncSetup(addr, cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
//msgs := make([]*sarama.ProducerMessage, 0, 20000)
|
||||||
|
for i := 0; i < 20000; i++ {
|
||||||
|
var msg sarama.ProducerMessage
|
||||||
|
msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
|
||||||
|
msg.Topic = topicName[0]
|
||||||
|
msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
|
||||||
|
pd.SendMessage(&msg)
|
||||||
|
//msgs = append(msgs, &msg)
|
||||||
|
}
|
||||||
|
//err = pd.SendMessages(msgs)
|
||||||
|
//t.Log(err)
|
||||||
|
t.Log(time.Now().Sub(now).Milliseconds())
|
||||||
|
pd.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func producer_async(t *testing.T) {
|
||||||
|
var admin KafkaAdmin
|
||||||
|
err := admin.Setup(kafkaVersion, addr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tName := range topicName {
|
||||||
|
if admin.HasTopic(tName) == false {
|
||||||
|
err = admin.CreateTopic(tName, 10, 2, false)
|
||||||
|
t.Log(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var pd Producer
|
||||||
|
cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = pd.ASyncSetup(addr, cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
msgs := make([]*AsyncReturn, 0, 20000)
|
||||||
|
for i := 0; i < 200000; i++ {
|
||||||
|
var msg sarama.ProducerMessage
|
||||||
|
msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
|
||||||
|
msg.Topic = topicName[0]
|
||||||
|
msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
|
||||||
|
|
||||||
|
r := pd.AsyncSendMessage(&msg)
|
||||||
|
msgs = append(msgs, r)
|
||||||
|
}
|
||||||
|
//err = pd.SendMessages(msgs)
|
||||||
|
//t.Log(err)
|
||||||
|
|
||||||
|
for _, r := range msgs {
|
||||||
|
r.WaitOk(context.Background())
|
||||||
|
//t.Log(m, e)
|
||||||
|
}
|
||||||
|
t.Log(time.Now().Sub(now).Milliseconds())
|
||||||
|
|
||||||
|
time.Sleep(1000 * time.Second)
|
||||||
|
pd.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func consumer(t *testing.T) {
|
||||||
|
var admin KafkaAdmin
|
||||||
|
err := admin.Setup(kafkaVersion, addr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tName := range topicName {
|
||||||
|
if admin.HasTopic(tName) == false {
|
||||||
|
err = admin.CreateTopic(tName, 10, 2, false)
|
||||||
|
t.Log(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var cg ConsumerGroup
|
||||||
|
cfg, err := NewConsumerConfig(kafkaVersion, "sticky", sarama.OffsetOldest)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = cg.Setup(addr, topicName, "test_groupId", cfg, 50*time.Second, 10, &MsgReceiver{t: t})
|
||||||
|
t.Log(err)
|
||||||
|
time.Sleep(10000 * time.Second)
|
||||||
|
cg.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConsumerAndProducer(t *testing.T) {
|
||||||
|
producer_async(t)
|
||||||
|
//go producer(t)
|
||||||
|
//producer(t)
|
||||||
|
//consumer(t)
|
||||||
|
}
|
||||||
7
sysmodule/kafkamodule/Sasl.go
Normal file
7
sysmodule/kafkamodule/Sasl.go
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
package kafkamodule
|
||||||
|
|
||||||
|
type Sasl struct {
|
||||||
|
UserName string `json:"UserName"`
|
||||||
|
Passwd string `json:"Passwd"`
|
||||||
|
InstanceId string `json:"InstanceId"`
|
||||||
|
}
|
||||||
@@ -263,7 +263,7 @@ func (mp *MongoPersist) JugeTimeoutSave() bool{
|
|||||||
|
|
||||||
func (mp *MongoPersist) persistCoroutine(){
|
func (mp *MongoPersist) persistCoroutine(){
|
||||||
defer mp.waitGroup.Done()
|
defer mp.waitGroup.Done()
|
||||||
for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){
|
for atomic.LoadInt32(&mp.stop)==0 {
|
||||||
//间隔时间sleep
|
//间隔时间sleep
|
||||||
time.Sleep(time.Second*1)
|
time.Sleep(time.Second*1)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user