Compare commits


17 Commits

Author  SHA1        Date                        Message
boyce   d1935b1bbc  2024-05-27 18:09:37 +08:00  Improve logging
boyce   90d54bf3e2  2024-05-15 10:06:50 +08:00  1. Add service and Global config interfaces that parse into structs  2. Improve logging
boyce   78cc33c84e  2024-05-11 15:11:24 +08:00  Improve config reading for service templates
boyce   9cf21bf418  2024-05-11 14:42:34 +08:00  1. Add template services  2. Improve the message queue  3. Improve related logging
boyce   c6d0bd9a19  2024-05-08 14:18:39 +08:00  Improve the gin module
boyce   61bf95e457  2024-05-07 18:57:38 +08:00  Improve RankService
boyce   8b2a551ee5  2024-05-06 10:51:51 +08:00  Improve the gin module
boyce   927c2ffa37  2024-05-06 10:36:04 +08:00  Improve the gin module
boyce   b23b30aac5  2024-04-30 19:05:44 +08:00  Improve service discovery
boyce   03f8ba0316  2024-04-30 17:33:34 +08:00  Streamline event notification
boyce   277480a7f0  2024-04-29 09:24:04 +08:00  Merge branch 'v2' of https://github.com/duanhf2012/origin into v2
boyce   647a654a36  2024-04-29 09:23:54 +08:00  Add optimization notes to the documentation
boyce   de483a88f1  2024-04-28 18:02:20 +08:00  Improve server shutdown
boyce   bbbb511b5f  2024-04-28 11:21:59 +08:00  Add kafka module
boyce   0489ee3ef4  2024-04-28 11:13:46 +08:00  Add gin module & improve the pb processor
boyce   692dacda0c  2024-04-28 10:40:56 +08:00  Remove mongomodule and use mongodbmodule instead
boyce   7f86b1007d  2024-04-26 18:13:41 +08:00  Improve support for custom loggers
28 changed files with 1813 additions and 881 deletions

README.md: 809 changed lines (file diff suppressed because it is too large)

@@ -8,6 +8,8 @@ import (
 	"strings"
 	"sync"
 	"github.com/duanhf2012/origin/v2/event"
+	"errors"
+	"reflect"
 )

 var configDir = "./config/"
@@ -54,7 +56,6 @@ type Cluster struct {
 	discoveryInfo DiscoveryInfo //service discovery config
 	rpcMode RpcMode
-	//masterDiscoveryNodeList []NodeInfo //configured discovery Master nodes

 	globalCfg interface{} //global config
 	localServiceCfg map[string]interface{} //map[serviceName]config data*
@@ -63,6 +64,7 @@ type Cluster struct {
 	locker sync.RWMutex //lock protecting node/service relations
 	mapRpc map[string]*NodeRpcInfo //nodeId
 	mapServiceNode map[string]map[string]struct{} //map[serviceName]map[NodeId]
+	mapTemplateServiceNode map[string]map[string]struct{} //map[templateServiceName]map[serviceName]nodeId

 	callSet rpc.CallSet
 	rpcNats rpc.RpcNats
@@ -70,7 +72,6 @@ type Cluster struct {
 	rpcEventLocker sync.RWMutex //lock protecting RPC event listeners
 	mapServiceListenRpcEvent map[string]struct{} //ServiceName
-	mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName
 }

 func GetCluster() *Cluster {
@@ -139,6 +140,20 @@ func (cls *Cluster) delServiceNode(serviceName string, nodeId string) {
 		return
 	}

+	//handle template services
+	splitServiceName := strings.Split(serviceName,":")
+	if len(splitServiceName) == 2 {
+		serviceName = splitServiceName[0]
+		templateServiceName := splitServiceName[1]
+
+		mapService := cls.mapTemplateServiceNode[templateServiceName]
+		delete(mapService,serviceName)
+		if len(cls.mapTemplateServiceNode[templateServiceName]) == 0 {
+			delete(cls.mapTemplateServiceNode,templateServiceName)
+		}
+	}
+
 	mapNode := cls.mapServiceNode[serviceName]
 	delete(mapNode, nodeId)
 	if len(mapNode) == 0 {
@@ -173,7 +188,20 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
 			continue
 		}
 		mapDuplicate[serviceName] = nil
-		if _, ok := cls.mapServiceNode[serviceName]; ok == false {
+
+		//if this is a template service, record the template relation
+		splitServiceName := strings.Split(serviceName,":")
+		if len(splitServiceName) == 2 {
+			serviceName = splitServiceName[0]
+			templateServiceName := splitServiceName[1]
+			//record the template
+			if _, ok = cls.mapTemplateServiceNode[templateServiceName]; ok == false {
+				cls.mapTemplateServiceNode[templateServiceName]=map[string]struct{}{}
+			}
+			cls.mapTemplateServiceNode[templateServiceName][serviceName] = struct{}{}
+		}
+
+		if _, ok = cls.mapServiceNode[serviceName]; ok == false {
 			cls.mapServiceNode[serviceName] = make(map[string]struct{}, 1)
 		}
 		cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{}
@@ -228,8 +256,6 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er
 	}
 	service.RegRpcEventFun = cls.RegRpcEvent
 	service.UnRegRpcEventFun = cls.UnRegRpcEvent
-	service.RegDiscoveryServiceEventFun = cls.RegDiscoveryEvent
-	service.UnRegDiscoveryServiceEventFun = cls.UnReDiscoveryEvent

 	err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo)
 	if err != nil {
@@ -263,25 +289,29 @@ func (cls *Cluster) GetRpcClient(nodeId string) (*rpc.Client,bool) {
 	return cls.getRpcClient(nodeId)
 }

-func GetRpcClient(nodeId string, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, int) {
+func GetNodeIdByTemplateService(templateServiceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
+	return GetCluster().GetNodeIdByTemplateService(templateServiceName, rpcClientList, filterRetire)
+}
+
+func GetRpcClient(nodeId string, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, []*rpc.Client) {
 	if nodeId != rpc.NodeIdNull {
 		pClient,retire := GetCluster().GetRpcClient(nodeId)
 		if pClient == nil {
-			return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
+			return fmt.Errorf("cannot find nodeid %d!", nodeId), nil
 		}

 		//filter out retired nodes if requested
 		if filterRetire == true && retire == true {
-			return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
+			return fmt.Errorf("cannot find nodeid %d!", nodeId), nil
 		}

-		clientList[0] = pClient
-		return nil, 1
+		clientList = append(clientList,pClient)
+		return nil, clientList
 	}

 	findIndex := strings.Index(serviceMethod, ".")
 	if findIndex == -1 {
-		return fmt.Errorf("servicemethod param %s is error!", serviceMethod), 0
+		return fmt.Errorf("servicemethod param %s is error!", serviceMethod), nil
 	}
 	serviceName := serviceMethod[:findIndex]
@@ -322,23 +352,12 @@ func (cls *Cluster) NotifyAllService(event event.IEvent){
 }

 func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId string, serviceName []string) {
-	cls.rpcEventLocker.Lock()
-	defer cls.rpcEventLocker.Unlock()
-
-	for sName, _ := range cls.mapServiceListenDiscoveryEvent {
-		ser := service.GetService(sName)
-		if ser == nil {
-			log.Error("cannot find service",log.Any("services",serviceName))
-			continue
-		}
-
-		var eventData service.DiscoveryServiceEvent
-		eventData.IsDiscovery = bDiscovery
-		eventData.NodeId = nodeId
-		eventData.ServiceName = serviceName
-		ser.(service.IModule).NotifyEvent(&eventData)
-	}
+	var eventData service.DiscoveryServiceEvent
+	eventData.IsDiscovery = bDiscovery
+	eventData.NodeId = nodeId
+	eventData.ServiceName = serviceName
+
+	cls.NotifyAllService(&eventData)
 }

 func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
@@ -361,25 +380,6 @@ func (cls *Cluster) UnRegRpcEvent(serviceName string) {
 	cls.rpcEventLocker.Unlock()
 }

-func (cls *Cluster) RegDiscoveryEvent(serviceName string) {
-	cls.rpcEventLocker.Lock()
-	if cls.mapServiceListenDiscoveryEvent == nil {
-		cls.mapServiceListenDiscoveryEvent = map[string]struct{}{}
-	}
-	cls.mapServiceListenDiscoveryEvent[serviceName] = struct{}{}
-	cls.rpcEventLocker.Unlock()
-}
-
-func (cls *Cluster) UnReDiscoveryEvent(serviceName string) {
-	cls.rpcEventLocker.Lock()
-	delete(cls.mapServiceListenDiscoveryEvent, serviceName)
-	cls.rpcEventLocker.Unlock()
-}
-
 func HasService(nodeId string, serviceName string) bool {
 	cluster.locker.RLock()
 	defer cluster.locker.RUnlock()
@@ -410,10 +410,50 @@ func GetNodeByServiceName(serviceName string) map[string]struct{} {
 	return mapNodeId
 }

+// GetNodeByTemplateServiceName looks up the real service names behind a template service name
+// and returns map[realServiceName]NodeId
+func GetNodeByTemplateServiceName(templateServiceName string) map[string]string {
+	cluster.locker.RLock()
+	defer cluster.locker.RUnlock()
+
+	mapServiceName := cluster.mapTemplateServiceNode[templateServiceName]
+	mapNodeId := make(map[string]string,9)
+	for serviceName := range mapServiceName {
+		mapNode, ok := cluster.mapServiceNode[serviceName]
+		if ok == false {
+			return nil
+		}
+
+		for nodeId,_ := range mapNode {
+			mapNodeId[serviceName] = nodeId
+		}
+	}
+
+	return mapNodeId
+}
+
 func (cls *Cluster) GetGlobalCfg() interface{} {
 	return cls.globalCfg
 }

+func (cls *Cluster) ParseGlobalCfg(cfg interface{}) error{
+	if cls.globalCfg == nil {
+		return errors.New("no service configuration found")
+	}
+
+	rv := reflect.ValueOf(cls.globalCfg)
+	if rv.Kind() == reflect.Ptr && rv.IsNil() {
+		return errors.New("no service configuration found")
+	}
+
+	bytes,err := json.Marshal(cls.globalCfg)
+	if err != nil {
+		return err
+	}
+
+	return json.Unmarshal(bytes,cfg)
+}
+
 func (cls *Cluster) GetNodeInfo(nodeId string) (NodeInfo,bool) {
 	cls.locker.RLock()
 	defer cls.locker.RUnlock()
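The new ParseGlobalCfg above copies the Global section of the cluster config into a caller-supplied struct via a JSON round trip. A minimal usage sketch, assuming the v2 import path used elsewhere in this diff; the GlobalCfg field names are hypothetical and must match whatever your Global config actually contains:

package main

import "github.com/duanhf2012/origin/v2/cluster"

// GlobalCfg is a hypothetical shape for the Global config section.
type GlobalCfg struct {
	LogLevel    string
	OpenConsole bool
}

func main() {
	var cfg GlobalCfg
	// ParseGlobalCfg marshals the loaded Global config to JSON and unmarshals it into cfg.
	if err := cluster.GetCluster().ParseGlobalCfg(&cfg); err != nil {
		panic(err)
	}
	_ = cfg
}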


@@ -183,6 +183,10 @@ func (ds *OriginDiscoveryMaster) OnNodeDisconnect(nodeId string) {
 func (ds *OriginDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{}) {
 	for nodeId, _ := range ds.mapNodeInfo {
+		if nodeId == cluster.GetLocalNodeInfo().NodeId {
+			continue
+		}
+
 		ds.GoNode(nodeId, serviceMethod, args)
 	}
 }


@@ -270,7 +270,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
 	}

 	if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
-		return discoveryInfo, nil,rpcMode, fmt.Errorf("%d configurations were found for the configuration with node ID %d!", len(nodeInfoList), nodeId)
+		return discoveryInfo, nil,rpcMode, fmt.Errorf("nodeid %s configuration error in NodeList", nodeId)
 	}

 	for i, _ := range nodeInfoList {
@@ -325,6 +325,10 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
 	//save the common config
 	for _, s := range cls.localNodeInfo.ServiceList {
 		for {
+			splitServiceName := strings.Split(s,":")
+			if len(splitServiceName) == 2 {
+				s = splitServiceName[0]
+			}
+
 			//get the common service config
 			pubCfg, ok := serviceConfig[s]
 			if ok == true {
@@ -355,6 +359,11 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
 	//combine all the configs
 	for _, s := range cls.localNodeInfo.ServiceList {
+		splitServiceName := strings.Split(s,":")
+		if len(splitServiceName) == 2 {
+			s = splitServiceName[0]
+		}
+
 		//look it up in NodeService first
 		var serviceCfg interface{}
 		var ok bool
@@ -382,12 +391,24 @@ func (cls *Cluster) parseLocalCfg() {
 	cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo

-	for _, sName := range cls.localNodeInfo.ServiceList {
-		if _, ok := cls.mapServiceNode[sName]; ok == false {
-			cls.mapServiceNode[sName] = make(map[string]struct{})
-		}
-		cls.mapServiceNode[sName][cls.localNodeInfo.NodeId] = struct{}{}
+	for _, serviceName := range cls.localNodeInfo.ServiceList {
+		splitServiceName := strings.Split(serviceName,":")
+		if len(splitServiceName) == 2 {
+			serviceName = splitServiceName[0]
+			templateServiceName := splitServiceName[1]
+			//record the template
+			if _, ok := cls.mapTemplateServiceNode[templateServiceName]; ok == false {
+				cls.mapTemplateServiceNode[templateServiceName]=map[string]struct{}{}
+			}
+			cls.mapTemplateServiceNode[templateServiceName][serviceName] = struct{}{}
+		}
+
+		if _, ok := cls.mapServiceNode[serviceName]; ok == false {
+			cls.mapServiceNode[serviceName] = make(map[string]struct{})
+		}
+		cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
 	}
 }
@@ -403,6 +424,7 @@ func (cls *Cluster) InitCfg(localNodeId string) error {
 	cls.localServiceCfg = map[string]interface{}{}
 	cls.mapRpc = map[string]*NodeRpcInfo{}
 	cls.mapServiceNode = map[string]map[string]struct{}{}
+	cls.mapTemplateServiceNode = map[string]map[string]struct{}{}

 	//load the local node's NodeList config
 	discoveryInfo, nodeInfoList,rpcMode, err := cls.readLocalClusterConfig(localNodeId)
@@ -436,12 +458,37 @@ func (cls *Cluster) IsConfigService(serviceName string) bool {
 	return ok
 }

-func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, int) {
+func (cls *Cluster) GetNodeIdByTemplateService(templateServiceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
+	cls.locker.RLock()
+	defer cls.locker.RUnlock()
+
+	mapServiceName := cls.mapTemplateServiceNode[templateServiceName]
+	for serviceName := range mapServiceName {
+		mapNodeId, ok := cls.mapServiceNode[serviceName]
+		if ok == true {
+			for nodeId, _ := range mapNodeId {
+				pClient,retire := GetCluster().getRpcClient(nodeId)
+				if pClient == nil || pClient.IsConnected() == false {
+					continue
+				}
+
+				//when filtering retired nodes, skip nodes in the retire state
+				if filterRetire == true && retire == true {
+					continue
+				}
+
+				rpcClientList = append(rpcClientList,pClient)
+			}
+		}
+	}
+
+	return nil, rpcClientList
+}
+
+func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
 	cls.locker.RLock()
 	defer cls.locker.RUnlock()

 	mapNodeId, ok := cls.mapServiceNode[serviceName]
-	count := 0
 	if ok == true {
 		for nodeId, _ := range mapNodeId {
 			pClient,retire := GetCluster().getRpcClient(nodeId)
@@ -454,15 +501,11 @@ func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.
 				continue
 			}

-			rpcClientList[count] = pClient
-			count++
-			if count >= cap(rpcClientList) {
-				break
-			}
+			rpcClientList = append(rpcClientList,pClient)
 		}
 	}

-	return nil, count
+	return nil, rpcClientList
 }

 func (cls *Cluster) GetServiceCfg(serviceName string) interface{} {
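The hunks above wire the new "RealName:TemplateName" naming convention through config loading: a ServiceList entry is split on ":" into the real service name and the template it was built from. A small sketch of that convention, using the GetNodeByTemplateServiceName helper added earlier in this changeset; the Room1/RoomService names are examples only and the cluster import path is assumed:

package main

import (
	"fmt"
	"strings"

	"github.com/duanhf2012/origin/v2/cluster"
)

func main() {
	// "Room1:RoomService" splits into real service name "Room1" and template name "RoomService".
	parts := strings.Split("Room1:RoomService", ":")
	fmt.Println("real:", parts[0], "template:", parts[1])

	// map[realServiceName]nodeId for every service instantiated from the RoomService template.
	for serviceName, nodeId := range cluster.GetNodeByTemplateServiceName("RoomService") {
		fmt.Println(serviceName, "runs on node", nodeId)
	}
}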


@@ -236,7 +236,6 @@ func (processor *EventProcessor) castEvent(event IEvent){
 	eventProcessor,ok := processor.mapListenerEvent[event.GetEventType()]
 	if ok == false || processor == nil{
-		log.Debug("event is not listen",log.Int("event type",int(event.GetEventType())))
 		return
 	}


@@ -17,6 +17,7 @@ const (
 	Sys_Event_QueueTaskFinish EventType = -10
 	Sys_Event_Retire EventType = -11
 	Sys_Event_EtcdDiscovery EventType = -12
+	Sys_Event_Gin_Event EventType = -13

 	Sys_Event_User_Define EventType = 1
 )


@@ -22,7 +22,10 @@ var LogSize int64
 var LogChannelCap int
 var LogPath string
 var LogLevel slog.Level = LevelTrace
 var gLogger, _ = NewTextLogger(LevelDebug, "", "",true,LogChannelCap)
+var isSetLogger bool
 var memPool = bytespool.NewMemAreaPool()

 // levels
@@ -37,6 +40,21 @@ const (
 	LevelFatal = slog.Level(20)
 )

+type ILogger interface {
+	Trace(msg string, args ...any)
+	Debug(msg string, args ...any)
+	Info(msg string, args ...any)
+	Warning(msg string, args ...any)
+	Error(msg string, args ...any)
+	Stack(msg string, args ...any)
+	Dump(msg string, args ...any)
+	Fatal(msg string, args ...any)
+	DoSPrintf(level slog.Level,a []interface{})
+	FormatHeader(buf *Buffer,level slog.Level,calldepth int)
+	Close()
+}
+
 type Logger struct {
 	Slogger *slog.Logger
@@ -47,7 +65,6 @@ type Logger struct {
 type IoWriter struct {
 	outFile io.Writer // destination for output
-	outConsole io.Writer //os.Stdout
 	writeBytes int64
 	logChannel chan []byte
 	wg sync.WaitGroup
@@ -122,8 +139,8 @@ func (iw *IoWriter) Write(p []byte) (n int, err error){
 func (iw *IoWriter) writeIo(p []byte) (n int, err error){
 	n,err = iw.writeFile(p)
-	if iw.outConsole != nil {
-		n,err = iw.outConsole.Write(p)
+	if OpenConsole {
+		n,err = os.Stdout.Write(p)
 	}

 	return
@@ -217,17 +234,12 @@ func (iw *IoWriter) swichFile() error{
 		iw.fileDay = now.Day()
 		iw.fileCreateTime = now.Unix()
 		atomic.StoreInt64(&iw.writeBytes,0)
-
-		if OpenConsole == true {
-			iw.outConsole = os.Stdout
-		}
-	}else{
-		iw.outConsole = os.Stdout
 	}

 	return nil
 }

-func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (*Logger,error){
+func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (ILogger,error){
 	var logger Logger
 	logger.ioWriter.filePath = pathName
 	logger.ioWriter.fileprefix = filePrefix
@@ -242,7 +254,7 @@ func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource
 	return &logger,nil
 }

-func NewJsonLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (*Logger,error){
+func NewJsonLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (ILogger,error){
 	var logger Logger
 	logger.ioWriter.filePath = pathName
 	logger.ioWriter.fileprefix = filePrefix
@@ -296,13 +308,18 @@ func (logger *Logger) Fatal(msg string, args ...any) {
 	os.Exit(1)
 }

-// It's dangerous to call the method on logging
-func Export(logger *Logger) {
-	if logger != nil {
+// It's non-thread-safe
+func SetLogger(logger ILogger) {
+	if logger != nil && isSetLogger == false {
 		gLogger = logger
+		isSetLogger = true
 	}
 }

+func GetLogger() ILogger{
+	return gLogger
+}
+
 func Trace(msg string, args ...any){
 	gLogger.Trace(msg, args...)
 }
@@ -415,7 +432,7 @@ func Group(key string, args ...any) slog.Attr {
 	return slog.Group(key, args...)
 }

-func (logger *Logger) doSPrintf(level slog.Level,a []interface{}) {
+func (logger *Logger) DoSPrintf(level slog.Level,a []interface{}) {
 	if logger.Slogger.Enabled(context.Background(),level) == false{
 		return
 	}
@@ -425,7 +442,7 @@ func (logger *Logger) doSPrintf(level slog.Level,a []interface{}) {
 	logger.sBuff.Reset()
-	logger.formatHeader(&logger.sBuff,level,3)
+	logger.FormatHeader(&logger.sBuff,level,3)

 	for _,s := range a {
 		logger.sBuff.AppendString(slog.AnyValue(s).String())
@@ -435,46 +452,46 @@ func (logger *Logger) doSPrintf(level slog.Level,a []interface{}) {
 }

 func (logger *Logger) STrace(a ...interface{}) {
-	logger.doSPrintf(LevelTrace,a)
+	logger.DoSPrintf(LevelTrace,a)
 }

 func (logger *Logger) SDebug(a ...interface{}) {
-	logger.doSPrintf(LevelDebug,a)
+	logger.DoSPrintf(LevelDebug,a)
 }

 func (logger *Logger) SInfo(a ...interface{}) {
-	logger.doSPrintf(LevelInfo,a)
+	logger.DoSPrintf(LevelInfo,a)
 }

 func (logger *Logger) SWarning(a ...interface{}) {
-	logger.doSPrintf(LevelWarning,a)
+	logger.DoSPrintf(LevelWarning,a)
 }

 func (logger *Logger) SError(a ...interface{}) {
-	logger.doSPrintf(LevelError,a)
+	logger.DoSPrintf(LevelError,a)
 }

 func STrace(a ...interface{}) {
-	gLogger.doSPrintf(LevelTrace,a)
+	gLogger.DoSPrintf(LevelTrace,a)
 }

 func SDebug(a ...interface{}) {
-	gLogger.doSPrintf(LevelDebug,a)
+	gLogger.DoSPrintf(LevelDebug,a)
 }

 func SInfo(a ...interface{}) {
-	gLogger.doSPrintf(LevelInfo,a)
+	gLogger.DoSPrintf(LevelInfo,a)
 }

 func SWarning(a ...interface{}) {
-	gLogger.doSPrintf(LevelWarning,a)
+	gLogger.DoSPrintf(LevelWarning,a)
 }

 func SError(a ...interface{}) {
-	gLogger.doSPrintf(LevelError,a)
+	gLogger.DoSPrintf(LevelError,a)
 }

-func (logger *Logger) formatHeader(buf *Buffer,level slog.Level,calldepth int) {
+func (logger *Logger) FormatHeader(buf *Buffer,level slog.Level,calldepth int) {
 	t := time.Now()
 	var file string
 	var line int
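The logging changes above replace Export with a SetLogger/GetLogger pair built around the new ILogger interface, and NewTextLogger/NewJsonLogger now return ILogger. A hedged sketch of installing the built-in JSON logger as the global logger (the path, prefix and channel size are example values; any ILogger implementation could be passed instead):

package main

import "github.com/duanhf2012/origin/v2/log"

func main() {
	logger, err := log.NewJsonLogger(log.LevelInfo, "./logs", "mynode", true, 10000)
	if err != nil {
		panic(err)
	}
	// First call wins: the isSetLogger guard ignores later SetLogger calls.
	log.SetLogger(logger)
	log.SInfo("custom logger installed")
}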


@@ -10,9 +10,9 @@ type RawMessageInfo struct {
 	msgHandler RawMessageHandler
 }

-type RawMessageHandler func(clientId uint64,packType uint16,msg []byte)
-type RawConnectHandler func(clientId uint64)
-type UnknownRawMessageHandler func(clientId uint64,msg []byte)
+type RawMessageHandler func(clientId string,packType uint16,msg []byte)
+type RawConnectHandler func(clientId string)
+type UnknownRawMessageHandler func(clientId string,msg []byte)

 const RawMsgTypeSize = 2
 type PBRawProcessor struct {
@@ -38,14 +38,14 @@ func (pbRawProcessor *PBRawProcessor) SetByteOrder(littleEndian bool) {
 }

 // must goroutine safe
-func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId uint64, msg interface{}) error{
+func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId string, msg interface{}) error{
 	pPackInfo := msg.(*PBRawPackInfo)
 	pbRawProcessor.msgHandler(clientId,pPackInfo.typ,pPackInfo.rawMsg)
 	return nil
 }

 // must goroutine safe
-func (pbRawProcessor *PBRawProcessor ) Unmarshal(clientId uint64,data []byte) (interface{}, error) {
+func (pbRawProcessor *PBRawProcessor ) Unmarshal(clientId string,data []byte) (interface{}, error) {
 	var msgType uint16
 	if pbRawProcessor.LittleEndian == true {
 		msgType = binary.LittleEndian.Uint16(data[:2])
@@ -57,7 +57,7 @@ func (pbRawProcessor *PBRawProcessor ) Unmarshal(clientId uint64,data []byte) (i
 }

 // must goroutine safe
-func (pbRawProcessor *PBRawProcessor ) Marshal(clientId uint64,msg interface{}) ([]byte, error){
+func (pbRawProcessor *PBRawProcessor ) Marshal(clientId string,msg interface{}) ([]byte, error){
 	pMsg := msg.(*PBRawPackInfo)

 	buff := make([]byte, 2, len(pMsg.rawMsg)+RawMsgTypeSize)
@@ -80,7 +80,7 @@ func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte,pbRaw
 	pbRawPackInfo.rawMsg = msg
 }

-func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){
+func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId string,msg interface{}){
 	if pbRawProcessor.unknownMessageHandler == nil {
 		return
 	}
@@ -88,11 +88,11 @@ func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId uint64,msg interf
 }

 // connect event
-func (pbRawProcessor *PBRawProcessor) ConnectedRoute(clientId uint64){
+func (pbRawProcessor *PBRawProcessor) ConnectedRoute(clientId string){
 	pbRawProcessor.connectHandler(clientId)
 }

-func (pbRawProcessor *PBRawProcessor) DisConnectedRoute(clientId uint64){
+func (pbRawProcessor *PBRawProcessor) DisConnectedRoute(clientId string){
 	pbRawProcessor.disconnectHandler(clientId)
 }
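With clientId switched from uint64 to string across the PB raw processor, raw-message handlers now take string client ids. A sketch of handler shapes matching the new types follows; how they are registered on PBRawProcessor is outside this hunk and is omitted:

package game

import "fmt"

// Matches the new RawMessageHandler: func(clientId string, packType uint16, msg []byte).
func onRawMessage(clientId string, packType uint16, msg []byte) {
	fmt.Println("raw msg from", clientId, "type", packType, "len", len(msg))
}

// Matches the new RawConnectHandler: func(clientId string).
func onConnect(clientId string)    { fmt.Println("connected:", clientId) }
func onDisconnect(clientId string) { fmt.Println("disconnected:", clientId) }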


@@ -25,6 +25,7 @@ import (
 var sig chan os.Signal
 var nodeId string
 var preSetupService []service.IService //pre-installed services
+var preSetupTemplateService []func()service.IService
 var profilerInterval time.Duration
 var bValid bool
 var configDir = "./config/"
@@ -117,7 +118,7 @@ func setConfigPath(val interface{}) error {
 }

 func getRunProcessPid(nodeId string) (int, error) {
-	f, err := os.OpenFile(fmt.Sprintf("%s_%d.pid", os.Args[0], nodeId), os.O_RDONLY, 0600)
+	f, err := os.OpenFile(fmt.Sprintf("%s_%s.pid", os.Args[0], nodeId), os.O_RDONLY, 0600)
 	defer f.Close()
 	if err != nil {
 		return 0, err
@@ -169,6 +170,31 @@ func initNode(id string) {
 	serviceOrder := cluster.GetCluster().GetLocalNodeInfo().ServiceList
 	for _,serviceName:= range serviceOrder{
 		bSetup := false
+
+		//check whether a template service is configured
+		splitServiceName := strings.Split(serviceName,":")
+		if len(splitServiceName) == 2 {
+			serviceName = splitServiceName[0]
+			templateServiceName := splitServiceName[1]
+
+			for _,newSer := range preSetupTemplateService {
+				ser := newSer()
+				ser.OnSetup(ser)
+				if ser.GetName() == templateServiceName {
+					ser.SetName(serviceName)
+					ser.Init(ser,cluster.GetRpcClient,cluster.GetRpcServer,cluster.GetCluster().GetServiceCfg(ser.GetName()))
+					service.Setup(ser)
+					bSetup = true
+					break
+				}
+			}
+
+			if bSetup == false{
+				log.Error("Template service not found",log.String("service name",serviceName),log.String("template service name",templateServiceName))
+				os.Exit(1)
+			}
+		}
+
 		for _, s := range preSetupService {
 			if s.GetName() != serviceName {
 				continue
@@ -201,7 +227,7 @@ func initLog() error {
 		fmt.Printf("cannot create log file!\n")
 		return err
 	}
-	log.Export(logger)
+	log.SetLogger(logger)

 	return nil
 }
@@ -367,6 +393,12 @@ func Setup(s ...service.IService) {
 	}
 }

+func SetupTemplate(fs ...func()service.IService){
+	for _, f := range fs {
+		preSetupTemplateService = append(preSetupTemplateService, f)
+	}
+}
+
 func GetService(serviceName string) service.IService {
 	return service.GetService(serviceName)
 }
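The new SetupTemplate hook registers a constructor instead of a ready-made service instance; initNode then builds one instance per "RealName:TemplateName" entry in the node's ServiceList and renames it to the real name before setup. A hedged sketch, assuming the usual origin pattern of embedding service.Service to satisfy service.IService; node.Start and the RoomService/Room1 names are not part of this diff and are illustrative only:

package main

import (
	"github.com/duanhf2012/origin/v2/node"
	"github.com/duanhf2012/origin/v2/service"
)

// RoomService is a template service; its type name must match the template name
// used in ServiceList entries such as "Room1:RoomService" and "Room2:RoomService".
type RoomService struct {
	service.Service
}

func (rs *RoomService) OnInit() error { return nil }

func main() {
	// Each matching ServiceList entry gets its own RoomService instance,
	// renamed to the real service name before service.Setup is called.
	node.SetupTemplate(func() service.IService { return &RoomService{} })
	node.Start()
}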


@@ -1,8 +1,8 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
 // protoc-gen-go v1.31.0
-// protoc v3.11.4
-// source: test/rpc/messagequeue.proto
+// protoc v4.24.0
+// source: rpcproto/messagequeue.proto

 package rpc
@@ -50,11 +50,11 @@ func (x SubscribeType) String() string {
} }
func (SubscribeType) Descriptor() protoreflect.EnumDescriptor { func (SubscribeType) Descriptor() protoreflect.EnumDescriptor {
return file_test_rpc_messagequeue_proto_enumTypes[0].Descriptor() return file_rpcproto_messagequeue_proto_enumTypes[0].Descriptor()
} }
func (SubscribeType) Type() protoreflect.EnumType { func (SubscribeType) Type() protoreflect.EnumType {
return &file_test_rpc_messagequeue_proto_enumTypes[0] return &file_rpcproto_messagequeue_proto_enumTypes[0]
} }
func (x SubscribeType) Number() protoreflect.EnumNumber { func (x SubscribeType) Number() protoreflect.EnumNumber {
@@ -63,7 +63,7 @@ func (x SubscribeType) Number() protoreflect.EnumNumber {
// Deprecated: Use SubscribeType.Descriptor instead. // Deprecated: Use SubscribeType.Descriptor instead.
func (SubscribeType) EnumDescriptor() ([]byte, []int) { func (SubscribeType) EnumDescriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{0} return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{0}
} }
type SubscribeMethod int32 type SubscribeMethod int32
@@ -96,11 +96,11 @@ func (x SubscribeMethod) String() string {
} }
func (SubscribeMethod) Descriptor() protoreflect.EnumDescriptor { func (SubscribeMethod) Descriptor() protoreflect.EnumDescriptor {
return file_test_rpc_messagequeue_proto_enumTypes[1].Descriptor() return file_rpcproto_messagequeue_proto_enumTypes[1].Descriptor()
} }
func (SubscribeMethod) Type() protoreflect.EnumType { func (SubscribeMethod) Type() protoreflect.EnumType {
return &file_test_rpc_messagequeue_proto_enumTypes[1] return &file_rpcproto_messagequeue_proto_enumTypes[1]
} }
func (x SubscribeMethod) Number() protoreflect.EnumNumber { func (x SubscribeMethod) Number() protoreflect.EnumNumber {
@@ -109,7 +109,7 @@ func (x SubscribeMethod) Number() protoreflect.EnumNumber {
// Deprecated: Use SubscribeMethod.Descriptor instead. // Deprecated: Use SubscribeMethod.Descriptor instead.
func (SubscribeMethod) EnumDescriptor() ([]byte, []int) { func (SubscribeMethod) EnumDescriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{1} return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{1}
} }
type DBQueuePopReq struct { type DBQueuePopReq struct {
@@ -127,7 +127,7 @@ type DBQueuePopReq struct {
func (x *DBQueuePopReq) Reset() { func (x *DBQueuePopReq) Reset() {
*x = DBQueuePopReq{} *x = DBQueuePopReq{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[0] mi := &file_rpcproto_messagequeue_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -140,7 +140,7 @@ func (x *DBQueuePopReq) String() string {
func (*DBQueuePopReq) ProtoMessage() {} func (*DBQueuePopReq) ProtoMessage() {}
func (x *DBQueuePopReq) ProtoReflect() protoreflect.Message { func (x *DBQueuePopReq) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[0] mi := &file_rpcproto_messagequeue_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -153,7 +153,7 @@ func (x *DBQueuePopReq) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueuePopReq.ProtoReflect.Descriptor instead. // Deprecated: Use DBQueuePopReq.ProtoReflect.Descriptor instead.
func (*DBQueuePopReq) Descriptor() ([]byte, []int) { func (*DBQueuePopReq) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{0} return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{0}
} }
func (x *DBQueuePopReq) GetCustomerId() string { func (x *DBQueuePopReq) GetCustomerId() string {
@@ -203,7 +203,7 @@ type DBQueuePopRes struct {
func (x *DBQueuePopRes) Reset() { func (x *DBQueuePopRes) Reset() {
*x = DBQueuePopRes{} *x = DBQueuePopRes{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[1] mi := &file_rpcproto_messagequeue_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -216,7 +216,7 @@ func (x *DBQueuePopRes) String() string {
func (*DBQueuePopRes) ProtoMessage() {} func (*DBQueuePopRes) ProtoMessage() {}
func (x *DBQueuePopRes) ProtoReflect() protoreflect.Message { func (x *DBQueuePopRes) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[1] mi := &file_rpcproto_messagequeue_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -229,7 +229,7 @@ func (x *DBQueuePopRes) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueuePopRes.ProtoReflect.Descriptor instead. // Deprecated: Use DBQueuePopRes.ProtoReflect.Descriptor instead.
func (*DBQueuePopRes) Descriptor() ([]byte, []int) { func (*DBQueuePopRes) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{1} return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{1}
} }
func (x *DBQueuePopRes) GetQueueName() string { func (x *DBQueuePopRes) GetQueueName() string {
@@ -255,7 +255,7 @@ type DBQueueSubscribeReq struct {
 	SubType SubscribeType `protobuf:"varint,1,opt,name=SubType,proto3,enum=SubscribeType" json:"SubType,omitempty"` //subscription type
 	Method SubscribeMethod `protobuf:"varint,2,opt,name=Method,proto3,enum=SubscribeMethod" json:"Method,omitempty"` //subscription method
 	CustomerId string `protobuf:"bytes,3,opt,name=CustomerId,proto3" json:"CustomerId,omitempty"` //consumer Id
-	FromNodeId int32 `protobuf:"varint,4,opt,name=FromNodeId,proto3" json:"FromNodeId,omitempty"`
+	FromNodeId string `protobuf:"bytes,4,opt,name=FromNodeId,proto3" json:"FromNodeId,omitempty"`
 	RpcMethod string `protobuf:"bytes,5,opt,name=RpcMethod,proto3" json:"RpcMethod,omitempty"`
 	TopicName string `protobuf:"bytes,6,opt,name=TopicName,proto3" json:"TopicName,omitempty"` //topic name
 	StartIndex uint64 `protobuf:"varint,7,opt,name=StartIndex,proto3" json:"StartIndex,omitempty"` //start position; the leading 4 bits are a timestamp in seconds followed by a sequence number. If 0, the service rewrites it to (4-bit current time in seconds) | (0000 4-bit)
@@ -265,7 +265,7 @@ type DBQueueSubscribeReq struct {
func (x *DBQueueSubscribeReq) Reset() { func (x *DBQueueSubscribeReq) Reset() {
*x = DBQueueSubscribeReq{} *x = DBQueueSubscribeReq{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[2] mi := &file_rpcproto_messagequeue_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -278,7 +278,7 @@ func (x *DBQueueSubscribeReq) String() string {
func (*DBQueueSubscribeReq) ProtoMessage() {} func (*DBQueueSubscribeReq) ProtoMessage() {}
func (x *DBQueueSubscribeReq) ProtoReflect() protoreflect.Message { func (x *DBQueueSubscribeReq) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[2] mi := &file_rpcproto_messagequeue_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -291,7 +291,7 @@ func (x *DBQueueSubscribeReq) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueueSubscribeReq.ProtoReflect.Descriptor instead. // Deprecated: Use DBQueueSubscribeReq.ProtoReflect.Descriptor instead.
func (*DBQueueSubscribeReq) Descriptor() ([]byte, []int) { func (*DBQueueSubscribeReq) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{2} return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{2}
} }
func (x *DBQueueSubscribeReq) GetSubType() SubscribeType { func (x *DBQueueSubscribeReq) GetSubType() SubscribeType {
@@ -315,11 +315,11 @@ func (x *DBQueueSubscribeReq) GetCustomerId() string {
return "" return ""
} }
func (x *DBQueueSubscribeReq) GetFromNodeId() int32 { func (x *DBQueueSubscribeReq) GetFromNodeId() string {
if x != nil { if x != nil {
return x.FromNodeId return x.FromNodeId
} }
return 0 return ""
} }
func (x *DBQueueSubscribeReq) GetRpcMethod() string { func (x *DBQueueSubscribeReq) GetRpcMethod() string {
@@ -359,7 +359,7 @@ type DBQueueSubscribeRes struct {
func (x *DBQueueSubscribeRes) Reset() { func (x *DBQueueSubscribeRes) Reset() {
*x = DBQueueSubscribeRes{} *x = DBQueueSubscribeRes{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[3] mi := &file_rpcproto_messagequeue_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -372,7 +372,7 @@ func (x *DBQueueSubscribeRes) String() string {
func (*DBQueueSubscribeRes) ProtoMessage() {} func (*DBQueueSubscribeRes) ProtoMessage() {}
func (x *DBQueueSubscribeRes) ProtoReflect() protoreflect.Message { func (x *DBQueueSubscribeRes) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[3] mi := &file_rpcproto_messagequeue_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -385,7 +385,7 @@ func (x *DBQueueSubscribeRes) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueueSubscribeRes.ProtoReflect.Descriptor instead. // Deprecated: Use DBQueueSubscribeRes.ProtoReflect.Descriptor instead.
func (*DBQueueSubscribeRes) Descriptor() ([]byte, []int) { func (*DBQueueSubscribeRes) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{3} return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{3}
} }
type DBQueuePublishReq struct { type DBQueuePublishReq struct {
@@ -400,7 +400,7 @@ type DBQueuePublishReq struct {
func (x *DBQueuePublishReq) Reset() { func (x *DBQueuePublishReq) Reset() {
*x = DBQueuePublishReq{} *x = DBQueuePublishReq{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[4] mi := &file_rpcproto_messagequeue_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -413,7 +413,7 @@ func (x *DBQueuePublishReq) String() string {
func (*DBQueuePublishReq) ProtoMessage() {} func (*DBQueuePublishReq) ProtoMessage() {}
func (x *DBQueuePublishReq) ProtoReflect() protoreflect.Message { func (x *DBQueuePublishReq) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[4] mi := &file_rpcproto_messagequeue_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -426,7 +426,7 @@ func (x *DBQueuePublishReq) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueuePublishReq.ProtoReflect.Descriptor instead. // Deprecated: Use DBQueuePublishReq.ProtoReflect.Descriptor instead.
func (*DBQueuePublishReq) Descriptor() ([]byte, []int) { func (*DBQueuePublishReq) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{4} return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{4}
} }
func (x *DBQueuePublishReq) GetTopicName() string { func (x *DBQueuePublishReq) GetTopicName() string {
@@ -452,7 +452,7 @@ type DBQueuePublishRes struct {
func (x *DBQueuePublishRes) Reset() { func (x *DBQueuePublishRes) Reset() {
*x = DBQueuePublishRes{} *x = DBQueuePublishRes{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[5] mi := &file_rpcproto_messagequeue_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -465,7 +465,7 @@ func (x *DBQueuePublishRes) String() string {
func (*DBQueuePublishRes) ProtoMessage() {} func (*DBQueuePublishRes) ProtoMessage() {}
func (x *DBQueuePublishRes) ProtoReflect() protoreflect.Message { func (x *DBQueuePublishRes) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[5] mi := &file_rpcproto_messagequeue_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -478,13 +478,13 @@ func (x *DBQueuePublishRes) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueuePublishRes.ProtoReflect.Descriptor instead. // Deprecated: Use DBQueuePublishRes.ProtoReflect.Descriptor instead.
func (*DBQueuePublishRes) Descriptor() ([]byte, []int) { func (*DBQueuePublishRes) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{5} return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{5}
} }
var File_test_rpc_messagequeue_proto protoreflect.FileDescriptor var File_rpcproto_messagequeue_proto protoreflect.FileDescriptor
var file_test_rpc_messagequeue_proto_rawDesc = []byte{ var file_rpcproto_messagequeue_proto_rawDesc = []byte{
0x0a, 0x1b, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x0a, 0x1b, 0x72, 0x70, 0x63, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa3, 0x01, 0x67, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa3, 0x01,
0x0a, 0x0d, 0x44, 0x42, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x12, 0x0a, 0x0d, 0x44, 0x42, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x12,
0x1e, 0x0a, 0x0a, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20, 0x1e, 0x0a, 0x0a, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20,
@@ -510,7 +510,7 @@ var file_test_rpc_messagequeue_proto_rawDesc = []byte{
0x6f, 0x64, 0x52, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x43, 0x75, 0x6f, 0x64, 0x52, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x43, 0x75,
0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a,
0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x72, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x72,
0x6f, 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x6f, 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a,
0x46, 0x72, 0x6f, 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x52, 0x70, 0x46, 0x72, 0x6f, 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x52, 0x70,
0x63, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x52, 0x63, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x52,
0x70, 0x63, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x6f, 0x70, 0x69, 0x70, 0x63, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x6f, 0x70, 0x69,
@@ -539,20 +539,20 @@ var file_test_rpc_messagequeue_proto_rawDesc = []byte{
} }
var ( var (
file_test_rpc_messagequeue_proto_rawDescOnce sync.Once file_rpcproto_messagequeue_proto_rawDescOnce sync.Once
file_test_rpc_messagequeue_proto_rawDescData = file_test_rpc_messagequeue_proto_rawDesc file_rpcproto_messagequeue_proto_rawDescData = file_rpcproto_messagequeue_proto_rawDesc
) )
func file_test_rpc_messagequeue_proto_rawDescGZIP() []byte { func file_rpcproto_messagequeue_proto_rawDescGZIP() []byte {
file_test_rpc_messagequeue_proto_rawDescOnce.Do(func() { file_rpcproto_messagequeue_proto_rawDescOnce.Do(func() {
file_test_rpc_messagequeue_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_rpc_messagequeue_proto_rawDescData) file_rpcproto_messagequeue_proto_rawDescData = protoimpl.X.CompressGZIP(file_rpcproto_messagequeue_proto_rawDescData)
}) })
return file_test_rpc_messagequeue_proto_rawDescData return file_rpcproto_messagequeue_proto_rawDescData
} }
var file_test_rpc_messagequeue_proto_enumTypes = make([]protoimpl.EnumInfo, 2) var file_rpcproto_messagequeue_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_test_rpc_messagequeue_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_rpcproto_messagequeue_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_test_rpc_messagequeue_proto_goTypes = []interface{}{ var file_rpcproto_messagequeue_proto_goTypes = []interface{}{
(SubscribeType)(0), // 0: SubscribeType (SubscribeType)(0), // 0: SubscribeType
(SubscribeMethod)(0), // 1: SubscribeMethod (SubscribeMethod)(0), // 1: SubscribeMethod
(*DBQueuePopReq)(nil), // 2: DBQueuePopReq (*DBQueuePopReq)(nil), // 2: DBQueuePopReq
@@ -562,7 +562,7 @@ var file_test_rpc_messagequeue_proto_goTypes = []interface{}{
(*DBQueuePublishReq)(nil), // 6: DBQueuePublishReq (*DBQueuePublishReq)(nil), // 6: DBQueuePublishReq
(*DBQueuePublishRes)(nil), // 7: DBQueuePublishRes (*DBQueuePublishRes)(nil), // 7: DBQueuePublishRes
} }
var file_test_rpc_messagequeue_proto_depIdxs = []int32{ var file_rpcproto_messagequeue_proto_depIdxs = []int32{
0, // 0: DBQueueSubscribeReq.SubType:type_name -> SubscribeType 0, // 0: DBQueueSubscribeReq.SubType:type_name -> SubscribeType
1, // 1: DBQueueSubscribeReq.Method:type_name -> SubscribeMethod 1, // 1: DBQueueSubscribeReq.Method:type_name -> SubscribeMethod
2, // [2:2] is the sub-list for method output_type 2, // [2:2] is the sub-list for method output_type
@@ -572,13 +572,13 @@ var file_test_rpc_messagequeue_proto_depIdxs = []int32{
0, // [0:2] is the sub-list for field type_name 0, // [0:2] is the sub-list for field type_name
} }
func init() { file_test_rpc_messagequeue_proto_init() } func init() { file_rpcproto_messagequeue_proto_init() }
func file_test_rpc_messagequeue_proto_init() { func file_rpcproto_messagequeue_proto_init() {
if File_test_rpc_messagequeue_proto != nil { if File_rpcproto_messagequeue_proto != nil {
return return
} }
if !protoimpl.UnsafeEnabled { if !protoimpl.UnsafeEnabled {
file_test_rpc_messagequeue_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { file_rpcproto_messagequeue_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueuePopReq); i { switch v := v.(*DBQueuePopReq); i {
case 0: case 0:
return &v.state return &v.state
@@ -590,7 +590,7 @@ func file_test_rpc_messagequeue_proto_init() {
return nil return nil
} }
} }
file_test_rpc_messagequeue_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { file_rpcproto_messagequeue_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueuePopRes); i { switch v := v.(*DBQueuePopRes); i {
case 0: case 0:
return &v.state return &v.state
@@ -602,7 +602,7 @@ func file_test_rpc_messagequeue_proto_init() {
return nil return nil
} }
} }
file_test_rpc_messagequeue_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { file_rpcproto_messagequeue_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueueSubscribeReq); i { switch v := v.(*DBQueueSubscribeReq); i {
case 0: case 0:
return &v.state return &v.state
@@ -614,7 +614,7 @@ func file_test_rpc_messagequeue_proto_init() {
return nil return nil
} }
} }
file_test_rpc_messagequeue_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { file_rpcproto_messagequeue_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueueSubscribeRes); i { switch v := v.(*DBQueueSubscribeRes); i {
case 0: case 0:
return &v.state return &v.state
@@ -626,7 +626,7 @@ func file_test_rpc_messagequeue_proto_init() {
return nil return nil
} }
} }
file_test_rpc_messagequeue_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { file_rpcproto_messagequeue_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueuePublishReq); i { switch v := v.(*DBQueuePublishReq); i {
case 0: case 0:
return &v.state return &v.state
@@ -638,7 +638,7 @@ func file_test_rpc_messagequeue_proto_init() {
return nil return nil
} }
} }
file_test_rpc_messagequeue_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { file_rpcproto_messagequeue_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueuePublishRes); i { switch v := v.(*DBQueuePublishRes); i {
case 0: case 0:
return &v.state return &v.state
@@ -655,19 +655,19 @@ func file_test_rpc_messagequeue_proto_init() {
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{ File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_test_rpc_messagequeue_proto_rawDesc, RawDescriptor: file_rpcproto_messagequeue_proto_rawDesc,
NumEnums: 2, NumEnums: 2,
NumMessages: 6, NumMessages: 6,
NumExtensions: 0, NumExtensions: 0,
NumServices: 0, NumServices: 0,
}, },
GoTypes: file_test_rpc_messagequeue_proto_goTypes, GoTypes: file_rpcproto_messagequeue_proto_goTypes,
DependencyIndexes: file_test_rpc_messagequeue_proto_depIdxs, DependencyIndexes: file_rpcproto_messagequeue_proto_depIdxs,
EnumInfos: file_test_rpc_messagequeue_proto_enumTypes, EnumInfos: file_rpcproto_messagequeue_proto_enumTypes,
MessageInfos: file_test_rpc_messagequeue_proto_msgTypes, MessageInfos: file_rpcproto_messagequeue_proto_msgTypes,
}.Build() }.Build()
File_test_rpc_messagequeue_proto = out.File File_rpcproto_messagequeue_proto = out.File
file_test_rpc_messagequeue_proto_rawDesc = nil file_rpcproto_messagequeue_proto_rawDesc = nil
file_test_rpc_messagequeue_proto_goTypes = nil file_rpcproto_messagequeue_proto_goTypes = nil
file_test_rpc_messagequeue_proto_depIdxs = nil file_rpcproto_messagequeue_proto_depIdxs = nil
} }


@@ -31,7 +31,7 @@ message DBQueueSubscribeReq {
 	SubscribeType SubType = 1; //subscription type
 	SubscribeMethod Method = 2; //subscription method
 	string CustomerId = 3; //consumer Id
-	int32 FromNodeId = 4;
+	string FromNodeId = 4;
 	string RpcMethod = 5;
 	string TopicName = 6; //topic name
 	uint64 StartIndex = 7; //start position; the leading 4 bits are a timestamp in seconds followed by a sequence number. If 0, the service rewrites it to (4-bit current time in seconds) | (0000 4-bit)
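FromNodeId switches from int32 to string here, in line with the string node ids used throughout this changeset. A hedged construction of the updated message; the import path for the generated package rpc is assumed and the field values are examples only:

package example

import "github.com/duanhf2012/origin/v2/rpc" // assumed import path for the generated `package rpc`

func newSubscribeReq() *rpc.DBQueueSubscribeReq {
	return &rpc.DBQueueSubscribeReq{
		CustomerId: "customer-1",         // example consumer id
		FromNodeId: "node-1",             // was int32, now a string node id
		RpcMethod:  "DBService.RPC_Push", // example callback method
		TopicName:  "chat",               // example topic
		StartIndex: 0,                    // 0 lets the service choose the start position
	}
}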


@@ -13,9 +13,9 @@ import (
"time" "time"
) )
const maxClusterNode int = 128 const maxClusterNode int = 32
type FuncRpcClient func(nodeId string, serviceMethod string,filterRetire bool, client []*Client) (error, int) type FuncRpcClient func(nodeId string, serviceMethod string,filterRetire bool, client []*Client) (error, []*Client)
type FuncRpcServer func() IServer type FuncRpcServer func() IServer
const NodeIdNull = "" const NodeIdNull = ""
@@ -63,7 +63,7 @@ type RpcHandler struct {
funcRpcClient FuncRpcClient funcRpcClient FuncRpcClient
funcRpcServer FuncRpcServer funcRpcServer FuncRpcServer
pClientList []*Client //pClientList []*Client
} }
//type TriggerRpcConnEvent func(bConnect bool, clientSeq uint32, nodeId string) //type TriggerRpcConnEvent func(bConnect bool, clientSeq uint32, nodeId string)
@@ -135,7 +135,6 @@ func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler, getClientFun F
handler.mapFunctions = map[string]RpcMethodInfo{} handler.mapFunctions = map[string]RpcMethodInfo{}
handler.funcRpcClient = getClientFun handler.funcRpcClient = getClientFun
handler.funcRpcServer = getServerFun handler.funcRpcServer = getServerFun
handler.pClientList = make([]*Client, maxClusterNode)
handler.RegisterRpc(rpcHandler) handler.RegisterRpc(rpcHandler)
} }
@@ -274,7 +273,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
//普通的rpc请求 //普通的rpc请求
v, ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()] v, ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()]
if ok == false { if ok == false {
err := "RpcHandler " + handler.rpcHandler.GetName() + "cannot find " + request.RpcRequestData.GetServiceMethod() err := "RpcHandler " + handler.rpcHandler.GetName() + " cannot find " + request.RpcRequestData.GetServiceMethod()
log.Error("HandlerRpcRequest cannot find serviceMethod",log.String("RpcHandlerName",handler.rpcHandler.GetName()),log.String("serviceMethod",request.RpcRequestData.GetServiceMethod())) log.Error("HandlerRpcRequest cannot find serviceMethod",log.String("RpcHandlerName",handler.rpcHandler.GetName()),log.String("serviceMethod",request.RpcRequestData.GetServiceMethod()))
if request.requestHandle != nil { if request.requestHandle != nil {
request.requestHandle(nil, RpcError(err)) request.requestHandle(nil, RpcError(err))
@@ -435,9 +434,9 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param
} }
func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId string, serviceMethod string, args interface{}) error { func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId string, serviceMethod string, args interface{}) error {
var pClientList [maxClusterNode]*Client pClientList :=make([]*Client,0,maxClusterNode)
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:]) err, pClientList := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList)
if count == 0 { if len(pClientList) == 0 {
if err != nil { if err != nil {
log.Error("call serviceMethod is failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err)) log.Error("call serviceMethod is failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
} else { } else {
@@ -446,13 +445,13 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str
return err return err
} }
if count > 1 && bCast == false { if len(pClientList) > 1 && bCast == false {
log.Error("cannot call serviceMethod more then 1 node",log.String("serviceMethod",serviceMethod)) log.Error("cannot call serviceMethod more then 1 node",log.String("serviceMethod",serviceMethod))
return errors.New("cannot call more then 1 node") return errors.New("cannot call more then 1 node")
} }
//2.rpcClient调用 //2.rpcClient调用
for i := 0; i < count; i++ { for i := 0; i < len(pClientList); i++ {
pCall := pClientList[i].Go(pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,true, serviceMethod, args, nil) pCall := pClientList[i].Go(pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,true, serviceMethod, args, nil)
if pCall.Err != nil { if pCall.Err != nil {
err = pCall.Err err = pCall.Err
@@ -465,16 +464,16 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str
} }
func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId string, serviceMethod string, args interface{}, reply interface{}) error { func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId string, serviceMethod string, args interface{}, reply interface{}) error {
var pClientList [maxClusterNode]*Client pClientList :=make([]*Client,0,maxClusterNode)
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:]) err, pClientList := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList)
if err != nil { if err != nil {
log.Error("Call serviceMethod is failed",log.ErrorAttr("error",err)) log.Error("Call serviceMethod is failed",log.ErrorAttr("error",err))
return err return err
} else if count <= 0 { } else if len(pClientList) <= 0 {
err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod) err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod)
log.Error("cannot find serviceMethod",log.String("serviceMethod",serviceMethod)) log.Error("cannot find serviceMethod",log.String("serviceMethod",serviceMethod))
return err return err
} else if count > 1 { } else if len(pClientList) > 1 {
log.Error("Cannot call more then 1 node!",log.String("serviceMethod",serviceMethod)) log.Error("Cannot call more then 1 node!",log.String("serviceMethod",serviceMethod))
return errors.New("cannot call more then 1 node") return errors.New("cannot call more then 1 node")
} }
@@ -509,9 +508,9 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId string, ser
} }
reply := reflect.New(fVal.Type().In(0).Elem()).Interface() reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
var pClientList [2]*Client pClientList :=make([]*Client,0,1)
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:]) err, pClientList := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
if count == 0 || err != nil { if len(pClientList) == 0 || err != nil {
if err == nil { if err == nil {
if nodeId != NodeIdNull { if nodeId != NodeIdNull {
err = fmt.Errorf("cannot find %s from nodeId %d",serviceMethod,nodeId) err = fmt.Errorf("cannot find %s from nodeId %d",serviceMethod,nodeId)
@@ -524,7 +523,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId string, ser
return emptyCancelRpc,nil return emptyCancelRpc,nil
} }
if count > 1 { if len(pClientList) > 1 {
err := errors.New("cannot call more then 1 node") err := errors.New("cannot call more then 1 node")
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.Error("cannot call more then 1 node",log.String("serviceMethod",serviceMethod)) log.Error("cannot call more then 1 node",log.String("serviceMethod",serviceMethod))
@@ -593,12 +592,13 @@ func (handler *RpcHandler) CastGo(serviceMethod string, args interface{}) error
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId string, rpcMethodId uint32, serviceName string, rawArgs []byte) error { func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId string, rpcMethodId uint32, serviceName string, rawArgs []byte) error {
processor := GetProcessor(uint8(rpcProcessorType)) processor := GetProcessor(uint8(rpcProcessorType))
err, count := handler.funcRpcClient(nodeId, serviceName,false, handler.pClientList) pClientList := make([]*Client,0,1)
if count == 0 || err != nil { err, pClientList := handler.funcRpcClient(nodeId, serviceName,false, pClientList)
if len(pClientList) == 0 || err != nil {
log.Error("call serviceMethod is failed",log.ErrorAttr("error",err)) log.Error("call serviceMethod is failed",log.ErrorAttr("error",err))
return err return err
} }
if count > 1 { if len(pClientList) > 1 {
err := errors.New("cannot call more then 1 node") err := errors.New("cannot call more then 1 node")
log.Error("cannot call more then 1 node",log.String("serviceName",serviceName)) log.Error("cannot call more then 1 node",log.String("serviceName",serviceName))
return err return err
@@ -606,14 +606,14 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId s
//2.rpcClient call //2.rpcClient call
//if the call targets a service on the local node //if the call targets a service on the local node
for i := 0; i < count; i++ { for i := 0; i < len(pClientList); i++ {
//cross-node call //cross-node call
pCall := handler.pClientList[i].RawGo(handler.pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil) pCall := pClientList[i].RawGo(pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
if pCall.Err != nil { if pCall.Err != nil {
err = pCall.Err err = pCall.Err
} }
handler.pClientList[i].RemovePending(pCall.Seq) pClientList[i].RemovePending(pCall.Seq)
ReleaseCall(pCall) ReleaseCall(pCall)
} }

View File

@@ -14,11 +14,14 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/duanhf2012/origin/v2/concurrent" "github.com/duanhf2012/origin/v2/concurrent"
"encoding/json"
) )
var timerDispatcherLen = 100000 var timerDispatcherLen = 100000
var maxServiceEventChannelNum = 2000000 var maxServiceEventChannelNum = 2000000
type IService interface { type IService interface {
concurrent.IConcurrent concurrent.IConcurrent
Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{})
@@ -293,6 +296,24 @@ func (s *Service) GetServiceCfg()interface{}{
return s.serviceCfg return s.serviceCfg
} }
func (s *Service) ParseServiceCfg(cfg interface{}) error{
if s.serviceCfg == nil {
return errors.New("no service configuration found")
}
rv := reflect.ValueOf(s.serviceCfg)
if rv.Kind() == reflect.Ptr && rv.IsNil() {
return errors.New("no service configuration found")
}
bytes,err := json.Marshal(s.serviceCfg)
if err != nil {
return err
}
return json.Unmarshal(bytes,cfg)
}
func (s *Service) GetProfiler() *profiler.Profiler{ func (s *Service) GetProfiler() *profiler.Profiler{
return s.profiler return s.profiler
} }
@@ -368,12 +389,12 @@ func (s *Service) UnRegNatsConnListener() {
func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoveryServiceListener) { func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoveryServiceListener) {
s.discoveryServiceLister = discoveryServiceListener s.discoveryServiceLister = discoveryServiceListener
s.RegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler(),s.OnDiscoverServiceEvent) s.RegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler(),s.OnDiscoverServiceEvent)
RegDiscoveryServiceEventFun(s.GetName()) RegRpcEventFun(s.GetName())
} }
func (s *Service) UnRegDiscoverListener() { func (s *Service) UnRegDiscoverListener() {
s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler()) s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler())
UnRegDiscoveryServiceEventFun(s.GetName()) UnRegRpcEventFun(s.GetName())
} }
func (s *Service) PushRpcRequest(rpcRequest *rpc.RpcRequest) error{ func (s *Service) PushRpcRequest(rpcRequest *rpc.RpcRequest) error{
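For context on the ParseServiceCfg helper added above, here is a minimal sketch of a service decoding its config block into a struct. The package, service and field names (mygame, TestService, HttpCfg) are made up for illustration; only ParseServiceCfg itself comes from the diff, and the struct fields must match the JSON under this service's entry in the cluster configuration.

```go
package mygame

import "github.com/duanhf2012/origin/v2/service"

// HttpCfg is illustrative; its fields mirror this service's config entry.
type HttpCfg struct {
	ListenAddr string `json:"ListenAddr"`
	TimeoutMs  int    `json:"TimeoutMs"`
}

type TestService struct {
	service.Service
	cfg HttpCfg
}

func (ts *TestService) OnInit() error {
	// ParseServiceCfg marshals the raw service config to JSON and unmarshals
	// it into the supplied struct pointer, so the json tags drive the mapping.
	if err := ts.ParseServiceCfg(&ts.cfg); err != nil {
		return err
	}
	return nil
}
```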

View File

@@ -14,9 +14,6 @@ type RegDiscoveryServiceEventFunType func(serviceName string)
var RegRpcEventFun RegRpcEventFunType var RegRpcEventFun RegRpcEventFunType
var UnRegRpcEventFun RegRpcEventFunType var UnRegRpcEventFun RegRpcEventFunType
var RegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
var UnRegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
func init(){ func init(){
mapServiceName = map[string]IService{} mapServiceName = map[string]IService{}
setupServiceList = []IService{} setupServiceList = []IService{}
@@ -26,7 +23,7 @@ func Init() {
for _,s := range setupServiceList { for _,s := range setupServiceList {
err := s.OnInit() err := s.OnInit()
if err != nil { if err != nil {
log.SError("Failed to initialize "+s.GetName()+" service:"+err.Error()) log.Error("Failed to initialize "+s.GetName()+" service",log.ErrorAttr("err",err))
os.Exit(1) os.Exit(1)
} }
} }

View File

@@ -0,0 +1,314 @@
package ginmodule
import (
"context"
"github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/service"
"github.com/gin-gonic/gin"
"log/slog"
"net/http"
"strings"
"time"
"io"
)
type IGinProcessor interface {
Process(data *gin.Context) (*gin.Context, error)
}
type GinModule struct {
service.Module
*gin.Engine
srv *http.Server
listenAddr string
handleTimeout time.Duration
processor []IGinProcessor
}
func (gm *GinModule) Init(addr string, handleTimeout time.Duration,engine *gin.Engine) {
gm.listenAddr = addr
gm.handleTimeout = handleTimeout
gm.Engine = engine
}
func (gm *GinModule) SetupDataProcessor(processor ...IGinProcessor) {
gm.processor = processor
}
func (gm *GinModule) AppendDataProcessor(processor ...IGinProcessor) {
gm.processor = append(gm.processor, processor...)
}
func (gm *GinModule) OnInit() error {
if gm.Engine == nil {
gm.Engine = gin.Default()
}
gm.srv = &http.Server{
Addr: gm.listenAddr,
Handler: gm.Engine,
}
gm.Engine.Use(Logger())
gm.Engine.Use(gin.Recovery())
gm.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_Gin_Event, gm.GetEventHandler(), gm.eventHandler)
return nil
}
func (gm *GinModule) eventHandler(ev event.IEvent) {
ginEvent := ev.(*GinEvent)
for _, handler := range ginEvent.handlersChain {
handler(&ginEvent.c)
}
//ginEvent.chanWait <- struct{}{}
}
func (gm *GinModule) Start() {
gm.srv.Addr = gm.listenAddr
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
go func() {
err := gm.srv.ListenAndServe()
if err != nil {
log.Error("ListenAndServe error", slog.Any("error", err.Error()))
}
}()
}
func (gm *GinModule) StartTLS(certFile, keyFile string) {
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
go func() {
err := gm.srv.ListenAndServeTLS(certFile, keyFile)
if err != nil {
log.Fatal("ListenAndServeTLS error", slog.Any("error", err.Error()))
}
}()
}
func (gm *GinModule) Stop(ctx context.Context) {
if err := gm.srv.Shutdown(ctx); err != nil {
log.SError("Server Shutdown", slog.Any("error", err))
}
}
type SafeContext struct {
*gin.Context
chanWait chan struct{}
}
func (c *SafeContext) JSONAndDone(code int, obj any) {
c.Context.JSON(code,obj)
c.Done()
}
func (c *SafeContext) AsciiJSONAndDone(code int, obj any){
c.Context.AsciiJSON(code,obj)
c.Done()
}
func (c *SafeContext) PureJSONAndDone(code int, obj any){
c.Context.PureJSON(code,obj)
c.Done()
}
func (c *SafeContext) XMLAndDone(code int, obj any){
c.Context.XML(code,obj)
c.Done()
}
func (c *SafeContext) YAMLAndDone(code int, obj any){
c.Context.YAML(code,obj)
c.Done()
}
func (c *SafeContext) TOMLAndDone(code int, obj any){
c.Context.TOML(code,obj)
c.Done()
}
func (c *SafeContext) ProtoBufAndDone(code int, obj any){
c.Context.ProtoBuf(code,obj)
c.Done()
}
func (c *SafeContext) StringAndDone(code int, format string, values ...any){
c.Context.String(code,format,values...)
c.Done()
}
func (c *SafeContext) RedirectAndDone(code int, location string){
c.Context.Redirect(code,location)
c.Done()
}
func (c *SafeContext) DataAndDone(code int, contentType string, data []byte){
c.Context.Data(code,contentType,data)
c.Done()
}
func (c *SafeContext) DataFromReaderAndDone(code int, contentLength int64, contentType string, reader io.Reader, extraHeaders map[string]string){
c.DataFromReader(code,contentLength,contentType,reader,extraHeaders)
c.Done()
}
func (c *SafeContext) HTMLAndDone(code int, name string, obj any){
c.Context.HTML(code,name,obj)
c.Done()
}
func (c *SafeContext) IndentedJSONAndDone(code int, obj any){
c.Context.IndentedJSON(code,obj)
c.Done()
}
func (c *SafeContext) SecureJSONAndDone(code int, obj any){
c.Context.SecureJSON(code,obj)
c.Done()
}
func (c *SafeContext) JSONPAndDone(code int, obj any){
c.Context.JSONP(code,obj)
c.Done()
}
func (c *SafeContext) Done(){
c.chanWait <- struct{}{}
}
type GinEvent struct {
handlersChain []SafeHandlerFunc
c SafeContext
}
type SafeHandlerFunc func(*SafeContext)
func (ge *GinEvent) GetEventType() event.EventType {
return event.Sys_Event_Gin_Event
}
func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.Engine.Handle(httpMethod, relativePath, func(c *gin.Context) {
for _, p := range gm.processor {
_, err := p.Process(c)
if err != nil {
return
}
}
var ev GinEvent
chanWait := make(chan struct{},2)
ev.c.chanWait = chanWait
ev.handlersChain = handlers
ev.c.Context = c
gm.NotifyEvent(&ev)
ctx,cancel := context.WithTimeout(context.Background(), gm.handleTimeout)
defer cancel()
select{
case <-ctx.Done():
log.Error("GinModule process timeout", slog.Any("path", c.Request.URL.Path))
c.AbortWithStatus(http.StatusRequestTimeout)
case <-chanWait:
}
})
}
// GET: the handler callback runs on the gin goroutine
func (gm *GinModule) GET(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.GET(relativePath, handlers...)
}
// POST: the handler callback runs on the gin goroutine
func (gm *GinModule) POST(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.POST(relativePath, handlers...)
}
// DELETE: the handler callback runs on the gin goroutine
func (gm *GinModule) DELETE(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.DELETE(relativePath, handlers...)
}
// PATCH: the handler callback runs on the gin goroutine
func (gm *GinModule) PATCH(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.PATCH(relativePath, handlers...)
}
// Put: the handler callback runs on the gin goroutine
func (gm *GinModule) Put(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.PUT(relativePath, handlers...)
}
// SafeGET: the handler callback runs on the service goroutine
func (gm *GinModule) SafeGET(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodGet, relativePath, handlers...)
}
// SafePOST: the handler callback runs on the service goroutine
func (gm *GinModule) SafePOST(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodPost, relativePath, handlers...)
}
// SafeDELETE: the handler callback runs on the service goroutine
func (gm *GinModule) SafeDELETE(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodDelete, relativePath, handlers...)
}
// SafePATCH: the handler callback runs on the service goroutine
func (gm *GinModule) SafePATCH(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodPatch, relativePath, handlers...)
}
// SafePut: the handler callback runs on the service goroutine
func (gm *GinModule) SafePut(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodPut, relativePath, handlers...)
}
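As a usage sketch (not part of the diff): a service that embeds GinModule, registers a Safe route so the callback is dispatched back onto the service goroutine, and replies via JSONAndDone. The HttpService name, listen address, timeout, route and the AddModule registration are assumptions following the framework's usual service/module pattern.

```go
package ginmodule

import (
	"net/http"
	"time"

	"github.com/duanhf2012/origin/v2/service"
	"github.com/gin-gonic/gin"
)

// HttpService is illustrative only.
type HttpService struct {
	service.Service
	ginModule GinModule
}

func (hs *HttpService) OnInit() error {
	// listen on :8080; handlers not finished within 3s are aborted with 408
	hs.ginModule.Init(":8080", 3*time.Second, nil)
	hs.AddModule(&hs.ginModule) // assumed: AddModule drives GinModule.OnInit

	// SafeGET: the callback is delivered to the service goroutine via a
	// GinEvent, so it may touch service state without extra locking.
	hs.ginModule.SafeGET("/ping", func(c *SafeContext) {
		c.JSONAndDone(http.StatusOK, gin.H{"msg": "pong"})
	})

	hs.ginModule.Start()
	return nil
}
```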
func GetIPWithProxyHeaders(c *gin.Context) string {
// try to get the real IP from the X-Real-IP header
ip := c.GetHeader("X-Real-IP")
// if the X-Real-IP header is absent, fall back to X-Forwarded-For
if ip == "" {
ip = c.GetHeader("X-Forwarded-For")
}
// if neither is present, use gin's default ClientIP method
if ip == "" {
ip = c.ClientIP()
}
return ip
}
func GetIPWithValidatedProxyHeaders(c *gin.Context) string {
// read the proxy headers
proxyHeaders := c.Request.Header.Get("X-Real-IP,X-Forwarded-For")
// split the proxy headers and take the first IP as the real IP
ips := strings.Split(proxyHeaders, ",")
ip := strings.TrimSpace(ips[0])
// if the IP format is valid, use it; otherwise fall back to the default ClientIP method
if isValidIP(ip) {
return ip
} else {
ip = c.ClientIP()
return ip
}
}
// isValidIP reports whether the IP string is well formed
func isValidIP(ip string) bool {
// add custom IP format validation logic here
// for example, validate the IP format with a regular expression
// ...
if ip == "" {
return false
}
return true
}
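The stub above only rejects empty strings. If stricter checking is wanted, one simple alternative is to lean on the standard library's net.ParseIP; this is a sketch, not part of the diff, and the isValidIPStrict name is made up.

```go
package ginmodule

import "net"

// isValidIPStrict is a stricter variant of isValidIP based on net.ParseIP;
// it accepts both IPv4 and IPv6 textual forms. Illustrative only.
func isValidIPStrict(ip string) bool {
	return net.ParseIP(ip) != nil
}
```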

View File

@@ -0,0 +1,79 @@
package ginmodule
import (
"fmt"
"github.com/duanhf2012/origin/v2/log"
"github.com/gin-gonic/gin"
"time"
)
// Logger is a custom logging middleware
func Logger() gin.HandlerFunc {
return func(c *gin.Context) {
// log after the request is handled
// start time
startTime := time.Now()
// run the remaining handlers for this request
c.Next()
// end time
endTime := time.Now()
// elapsed time
latencyTime := endTime.Sub(startTime)
// request IP
clientIP := c.ClientIP()
// remoteIP := c.RemoteIP()
// request method
reqMethod := c.Request.Method
// request route
reqUri := c.Request.RequestURI
// request protocol
reqProto := c.Request.Proto
// request referer
repReferer := c.Request.Referer()
// request user agent
reqUA := c.Request.UserAgent()
// response content length
resLength := c.Writer.Size()
if resLength < 0 {
resLength = 0
}
// response status code
statusCode := c.Writer.Status()
log.SDebug(fmt.Sprintf(
"%s | %3d | %s %10s | \033[44;37m%-6s\033[0m %s %s | %10v | \"%s\" \"%s\"",
colorForStatus(statusCode),
statusCode,
colorForStatus(0),
clientIP,
// remoteIP,
reqMethod,
reqUri,
reqProto,
latencyTime,
reqUA,
repReferer,
))
}
}
// colorForStatus returns an ANSI color code for the given HTTP status code
func colorForStatus(code int) string {
switch {
case code >= 200 && code < 300:
return "\033[42;1;37m" // green
case code >= 300 && code < 400:
return "\033[34m" // blue
case code >= 400 && code < 500:
return "\033[33m" // yellow
case code == 0:
return "\033[0m" // cancel
default:
return "\033[31m" // red
}
}

View File

@@ -0,0 +1,65 @@
package kafkamodule
import "github.com/IBM/sarama"
type KafkaAdmin struct {
sarama.ClusterAdmin
mapTopic map[string]sarama.TopicDetail
}
func (ka *KafkaAdmin) Setup(kafkaVersion string, addrs []string) error {
config := sarama.NewConfig()
var err error
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
return err
}
ka.ClusterAdmin, err = sarama.NewClusterAdmin(addrs, config)
if err != nil {
return err
}
ka.mapTopic, err = ka.GetTopics()
if err != nil {
ka.ClusterAdmin.Close()
return err
}
return nil
}
func (ka *KafkaAdmin) RefreshTopic() error {
var err error
ka.mapTopic, err = ka.GetTopics()
return err
}
func (ka *KafkaAdmin) HasTopic(topic string) bool {
_, ok := ka.mapTopic[topic]
return ok
}
func (ka *KafkaAdmin) GetTopicDetail(topic string) *sarama.TopicDetail {
topicDetail, ok := ka.mapTopic[topic]
if ok == false {
return nil
}
return &topicDetail
}
func (ka *KafkaAdmin) GetTopics() (map[string]sarama.TopicDetail, error) {
return ka.ListTopics()
}
// CreateTopic creates a topic
// numPartitions: number of partitions
// replicationFactor: number of replicas
// validateOnly: when true, only validate the parameters without actually performing the operation
func (ka *KafkaAdmin) CreateTopic(topic string, numPartitions int32, replicationFactor int16, validateOnly bool) error {
return ka.ClusterAdmin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: numPartitions, ReplicationFactor: replicationFactor}, validateOnly)
}
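A short usage sketch for KafkaAdmin (not from the diff): create a topic only if it does not already exist. The broker list, Kafka version and topic parameters are assumptions.

```go
package kafkamodule

// ensureTopic is illustrative only.
func ensureTopic() error {
	var admin KafkaAdmin
	if err := admin.Setup("3.3.1", []string{"127.0.0.1:9092"}); err != nil {
		return err
	}
	// ClusterAdmin is embedded, so Close comes from sarama
	defer admin.Close()

	if !admin.HasTopic("demo_topic") {
		// 3 partitions, replication factor 2, validateOnly=false
		return admin.CreateTopic("demo_topic", 3, 2, false)
	}
	return nil
}
```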

View File

@@ -0,0 +1,289 @@
package kafkamodule
import (
"context"
"fmt"
"github.com/IBM/sarama"
"github.com/duanhf2012/origin/v2/log"
"sync"
"time"
)
type ConsumerGroup struct {
sarama.ConsumerGroup
waitGroup sync.WaitGroup
chanExit chan error
ready chan bool
cancel context.CancelFunc
groupId string
}
func NewConsumerConfig(kafkaVersion string, assignor string, offsetsInitial int64) (*sarama.Config, error) {
var err error
config := sarama.NewConfig()
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
config.Consumer.Offsets.Initial = offsetsInitial
config.Consumer.Offsets.AutoCommit.Enable = false
switch assignor {
case "sticky":
// sticky roundRobin: after a rebalance, keep the existing assignments first and peel partitions off the tail
// topic: T0{P0,P1,P2,P3,P4,P5}, consumers: C1,C2
// ---------------before rebalance: i.e. roundRobin
// C1: T0{P0} T0{P2} T0{P4}
// C2: T0{P1} T0{P3} T0{P5}
// ----------------after rebalance: one consumer added
// C1: T0{P0} T0{P2}
// C2: T0{P1} T0{P3}
// C3: T0{P4} T0{P5}  until the partition counts per consumer differ by at most 1
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
case "roundrobin":
// roundRobin -- distribute partitions one by one, evenly
// topic: T0{P0,P1,P2}, T1{P0,P1,P2,P3}, two consumers C1,C2
// C1: T0{P0} T0{P2} T1{P1} T1{P3}
// C2: T0{P1} T1{P0} T1{P2}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
case "range":
// the default -- split each topic's partitions evenly in one pass
// topic: T0{P0,P1,P2,P3}, T1{P0,P1,P2,P3}, two consumers C1,C2
// T1 total partitions 6 / 2 consumers = 3, i.e. each consumer gets 3 of that topic's partitions in this session
// T2 total partitions 4 / 2 consumers = 2, i.e. each consumer gets 2 of that topic's partitions in this session
// C1: T0{P0, P1, P2} T1{P0, P1}
// C2: T0{P3, P4, P5} T1{P2, P3}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
default:
return nil, fmt.Errorf("Unrecognized consumer group partition assignor: %s", assignor)
}
if err != nil {
return nil, err
}
return config, nil
}
type IMsgReceiver interface {
Receiver(msgs []*sarama.ConsumerMessage) bool
}
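Note that the flush loop further down keeps calling Receiver with the same batch until it returns true, so an implementation should only report success once the batch has been safely handled. A minimal sketch (the PrintReceiver name is made up, not part of the diff):

```go
package kafkamodule

import (
	"fmt"

	"github.com/IBM/sarama"
)

// PrintReceiver is an illustrative IMsgReceiver; returning false makes the
// flush loop retry delivery of the same batch.
type PrintReceiver struct{}

func (r *PrintReceiver) Receiver(msgs []*sarama.ConsumerMessage) bool {
	for _, m := range msgs {
		fmt.Printf("topic=%s partition=%d offset=%d value=%s\n",
			m.Topic, m.Partition, m.Offset, string(m.Value))
	}
	return true
}
```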
func (c *ConsumerGroup) Setup(addr []string, topics []string, groupId string, config *sarama.Config, receiverInterval time.Duration, maxReceiverNum int, msgReceiver IMsgReceiver) error {
var err error
c.ConsumerGroup, err = sarama.NewConsumerGroup(addr, groupId, config)
if err != nil {
return err
}
c.groupId = groupId
c.chanExit = make(chan error, 1)
var handler ConsumerGroupHandler
handler.receiver = msgReceiver
handler.maxReceiverNum = maxReceiverNum
handler.receiverInterval = receiverInterval
handler.chanExit = c.chanExit
var ctx context.Context
ctx, c.cancel = context.WithCancel(context.Background())
c.waitGroup.Add(1)
go func() {
defer c.waitGroup.Done()
for {
if err = c.Consume(ctx, topics, &handler); err != nil {
// when setup fails, the error is returned here
log.Error("Error from consumer", log.Any("err", err))
return
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
log.Info("consumer stop", log.Any("info", ctx.Err()))
}
c.chanExit <- err
}
}()
err = <-c.chanExit
//the consumer is ready now
return err
}
func (c *ConsumerGroup) Close() {
log.Info("close consumerGroup")
//1. cancel the context
c.cancel()
//2. close the connection
err := c.ConsumerGroup.Close()
if err != nil {
log.Error("close consumerGroup fail", log.Any("err", err.Error()))
}
//3. wait for exit
c.waitGroup.Wait()
}
type ConsumerGroupHandler struct {
receiver IMsgReceiver
receiverInterval time.Duration
maxReceiverNum int
//mapTopicOffset map[string]map[int32]int //map[topic]map[partitionId]offsetInfo
mapTopicData map[string]*MsgData
mx sync.Mutex
chanExit chan error
isRebalance bool //whether this is a rebalance
//stopSig *int32
}
type MsgData struct {
sync.Mutex
msg []*sarama.ConsumerMessage
mapPartitionOffset map[int32]int64
}
func (ch *ConsumerGroupHandler) Flush(session sarama.ConsumerGroupSession, topic string) {
if topic != "" {
msgData := ch.GetMsgData(topic)
msgData.flush(session, ch.receiver, topic)
return
}
for tp, msgData := range ch.mapTopicData {
msgData.flush(session, ch.receiver, tp)
}
}
func (ch *ConsumerGroupHandler) GetMsgData(topic string) *MsgData {
ch.mx.Lock()
defer ch.mx.Unlock()
msgData := ch.mapTopicData[topic]
if msgData == nil {
msgData = &MsgData{}
msgData.msg = make([]*sarama.ConsumerMessage, 0, ch.maxReceiverNum)
ch.mapTopicData[topic] = msgData
}
return msgData
}
func (md *MsgData) flush(session sarama.ConsumerGroupSession, receiver IMsgReceiver, topic string) {
if len(md.msg) == 0 {
return
}
//deliver to the receiver
for {
ok := receiver.Receiver(md.msg)
if ok == true {
break
}
}
for pId, offset := range md.mapPartitionOffset {
session.MarkOffset(topic, pId, offset+1, "")
log.Info(fmt.Sprintf("topic %s,pid %d,offset %d", topic, pId, offset+1))
}
session.Commit()
//log.Info("commit")
//time.Sleep(1000 * time.Second)
//clear the buffered messages
md.msg = md.msg[:0]
clear(md.mapPartitionOffset)
}
func (md *MsgData) appendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage, receiver IMsgReceiver, maxReceiverNum int) {
md.Lock()
defer md.Unlock()
//received offsets only ever increase
if md.mapPartitionOffset == nil {
md.mapPartitionOffset = make(map[int32]int64, 10)
}
md.mapPartitionOffset[msg.Partition] = msg.Offset
md.msg = append(md.msg, msg)
if len(md.msg) < maxReceiverNum {
return
}
md.flush(session, receiver, msg.Topic)
}
func (ch *ConsumerGroupHandler) AppendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
dataMsg := ch.GetMsgData(msg.Topic)
dataMsg.appendMsg(session, msg, ch.receiver, ch.maxReceiverNum)
}
func (ch *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
ch.mapTopicData = make(map[string]*MsgData, 128)
if ch.isRebalance == false {
ch.chanExit <- nil
}
ch.isRebalance = true
return nil
}
func (ch *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
ch.Flush(session, "")
return nil
}
func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ticker := time.NewTicker(ch.receiverInterval)
for {
select {
case msg := <-claim.Messages():
if msg == nil {
log.SWarning("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
return nil
}
ch.AppendMsg(session, msg)
case <-ticker.C:
ch.Flush(session, claim.Topic())
case <-session.Context().Done():
return nil
}
}
}
/*
Alibaba Cloud parameter reference: https://sls.aliyun.com/doc/oscompatibledemo/sarama_go_kafka_consume.html
conf.Net.TLS.Enable = true
conf.Net.SASL.Enable = true
conf.Net.SASL.User = project
conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
conf.Net.SASL.Mechanism = "PLAIN"
conf.Consumer.Fetch.Min = 1
conf.Consumer.Fetch.Default = 1024 * 1024
conf.Consumer.Retry.Backoff = 2 * time.Second
conf.Consumer.MaxWaitTime = 250 * time.Millisecond
conf.Consumer.MaxProcessingTime = 100 * time.Millisecond
conf.Consumer.Return.Errors = false
conf.Consumer.Offsets.AutoCommit.Enable = true
conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
conf.Consumer.Offsets.Retry.Max = 3
*/

View File

@@ -0,0 +1,146 @@
package kafkamodule
import (
"context"
"github.com/IBM/sarama"
"github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/service"
"time"
)
type IProducer interface {
}
type SyncProducer struct {
}
type AsyncProducer struct {
}
type Producer struct {
service.Module
sarama.SyncProducer
sarama.AsyncProducer
}
// NewProducerConfig creates a new producer config
// kafkaVersion: kafka version
// returnErr, returnSucc: whether to return errors and successes
// requiredAcks: -1 full sync ack, strong reliability guarantee (all leaders and followers have received) #WaitForAll; 1 leader ack, the default (only the leader responds) #WaitForLocal; 0 no ack, highest throughput (result ignored) #NoResponse
// Idempotent: ensures each message is written exactly once to a replica; with an idempotent producer (set to true) the produced messages are guaranteed ordered and exactly-once
// partitioner: constructs the partitioner used to choose which partition a message is sent to; by default the message key is hashed
func NewProducerConfig(kafkaVersion string, returnErr bool, returnSucc bool, requiredAcks sarama.RequiredAcks, Idempotent bool,
partitioner sarama.PartitionerConstructor) (*sarama.Config, error) {
config := sarama.NewConfig()
var err error
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
return nil, err
}
config.Producer.Return.Errors = returnErr
config.Producer.Return.Successes = returnSucc
config.Producer.RequiredAcks = requiredAcks
config.Producer.Partitioner = partitioner
config.Producer.Timeout = 10 * time.Second
config.Producer.Idempotent = Idempotent
if Idempotent == true {
config.Net.MaxOpenRequests = 1
}
return config, nil
}
func (p *Producer) SyncSetup(addr []string, config *sarama.Config) error {
var err error
p.SyncProducer, err = sarama.NewSyncProducer(addr, config)
if err != nil {
return err
}
return nil
}
func (p *Producer) ASyncSetup(addr []string, config *sarama.Config) error {
var err error
p.AsyncProducer, err = sarama.NewAsyncProducer(addr, config)
if err != nil {
return err
}
go func() {
p.asyncRun()
}()
return nil
}
func (p *Producer) asyncRun() {
for {
select {
case sm := <-p.Successes():
if sm.Metadata == nil {
break
}
asyncReturn := sm.Metadata.(*AsyncReturn)
asyncReturn.chanReturn <- asyncReturn
case em := <-p.Errors():
log.Error("async kafkamodule error", log.ErrorAttr("err", em.Err))
if em.Msg.Metadata == nil {
break
}
asyncReturn := em.Msg.Metadata.(*AsyncReturn)
asyncReturn.Err = em.Err
asyncReturn.chanReturn <- asyncReturn
}
}
}
type AsyncReturn struct {
Msg *sarama.ProducerMessage
Err error
chanReturn chan *AsyncReturn
}
func (ar *AsyncReturn) WaitOk(ctx context.Context) (*sarama.ProducerMessage, error) {
asyncReturn := ar.Msg.Metadata.(*AsyncReturn)
select {
case <-asyncReturn.chanReturn:
return asyncReturn.Msg, asyncReturn.Err
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (p *Producer) AsyncSendMessage(msg *sarama.ProducerMessage) *AsyncReturn {
asyncReturn := AsyncReturn{Msg: msg, chanReturn: make(chan *AsyncReturn, 1)}
msg.Metadata = &asyncReturn
p.AsyncProducer.Input() <- msg
return &asyncReturn
}
func (p *Producer) AsyncPushMessage(msg *sarama.ProducerMessage) {
p.AsyncProducer.Input() <- msg
}
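A sketch of the async produce-and-wait pattern shown above, bounding the wait with a context timeout. The topic, payload and timeout are made up, and it assumes the producer was set up via ASyncSetup with Return.Successes enabled (as NewProducerConfig does when returnSucc is true).

```go
package kafkamodule

import (
	"context"
	"time"

	"github.com/IBM/sarama"
)

// produceWithTimeout is illustrative only.
func produceWithTimeout(p *Producer) error {
	msg := &sarama.ProducerMessage{
		Topic: "demo_topic",
		Value: sarama.StringEncoder("hello"),
	}
	ret := p.AsyncSendMessage(msg) // queues the message and returns immediately

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// WaitOk blocks until the broker acks the message, an error is reported,
	// or the context expires.
	_, err := ret.WaitOk(ctx)
	return err
}
```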
func (p *Producer) Close() {
if p.SyncProducer != nil {
p.SyncProducer.Close()
p.SyncProducer = nil
}
if p.AsyncProducer != nil {
p.AsyncProducer.Close()
p.AsyncProducer = nil
}
}
func (p *Producer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
return p.SyncProducer.SendMessage(msg)
}
func (p *Producer) SendMessages(msgs []*sarama.ProducerMessage) error {
return p.SyncProducer.SendMessages(msgs)
}

View File

@@ -0,0 +1,151 @@
package kafkamodule
import (
"context"
"fmt"
"github.com/IBM/sarama"
"testing"
"time"
)
// Explanation of the parameters and mechanism names: https://blog.csdn.net/u013311345/article/details/129217728
type MsgReceiver struct {
t *testing.T
}
func (mr *MsgReceiver) Receiver(msgs []*sarama.ConsumerMessage) bool {
for _, m := range msgs {
mr.t.Logf("time:%s, topic:%s, partition:%d, offset:%d, key:%s, value:%s", time.Now().Format("2006-01-02 15:04:05.000"), m.Topic, m.Partition, m.Offset, m.Key, string(m.Value))
}
return true
}
var addr = []string{"192.168.13.24:9092", "192.168.13.24:9093", "192.168.13.24:9094", "192.168.13.24:9095"}
var topicName = []string{"test_topic_1", "test_topic_2"}
var kafkaVersion = "3.3.1"
func producer(t *testing.T) {
var admin KafkaAdmin
err := admin.Setup(kafkaVersion, addr)
if err != nil {
t.Fatal(err)
}
for _, tName := range topicName {
if admin.HasTopic(tName) == false {
err = admin.CreateTopic(tName, 2, 2, false)
t.Log(err)
}
}
var pd Producer
cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
if err != nil {
t.Fatal(err)
}
err = pd.SyncSetup(addr, cfg)
if err != nil {
t.Fatal(err)
}
now := time.Now()
//msgs := make([]*sarama.ProducerMessage, 0, 20000)
for i := 0; i < 20000; i++ {
var msg sarama.ProducerMessage
msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
msg.Topic = topicName[0]
msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
pd.SendMessage(&msg)
//msgs = append(msgs, &msg)
}
//err = pd.SendMessages(msgs)
//t.Log(err)
t.Log(time.Now().Sub(now).Milliseconds())
pd.Close()
}
func producer_async(t *testing.T) {
var admin KafkaAdmin
err := admin.Setup(kafkaVersion, addr)
if err != nil {
t.Fatal(err)
}
for _, tName := range topicName {
if admin.HasTopic(tName) == false {
err = admin.CreateTopic(tName, 10, 2, false)
t.Log(err)
}
}
var pd Producer
cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
if err != nil {
t.Fatal(err)
}
err = pd.ASyncSetup(addr, cfg)
if err != nil {
t.Fatal(err)
}
now := time.Now()
msgs := make([]*AsyncReturn, 0, 20000)
for i := 0; i < 200000; i++ {
var msg sarama.ProducerMessage
msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
msg.Topic = topicName[0]
msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
r := pd.AsyncSendMessage(&msg)
msgs = append(msgs, r)
}
//err = pd.SendMessages(msgs)
//t.Log(err)
for _, r := range msgs {
r.WaitOk(context.Background())
//t.Log(m, e)
}
t.Log(time.Now().Sub(now).Milliseconds())
time.Sleep(1000 * time.Second)
pd.Close()
}
func consumer(t *testing.T) {
var admin KafkaAdmin
err := admin.Setup(kafkaVersion, addr)
if err != nil {
t.Fatal(err)
}
for _, tName := range topicName {
if admin.HasTopic(tName) == false {
err = admin.CreateTopic(tName, 10, 2, false)
t.Log(err)
}
}
var cg ConsumerGroup
cfg, err := NewConsumerConfig(kafkaVersion, "sticky", sarama.OffsetOldest)
if err != nil {
t.Fatal(err)
}
err = cg.Setup(addr, topicName, "test_groupId", cfg, 50*time.Second, 10, &MsgReceiver{t: t})
t.Log(err)
time.Sleep(10000 * time.Second)
cg.Close()
}
func TestConsumerAndProducer(t *testing.T) {
producer_async(t)
//go producer(t)
//producer(t)
//consumer(t)
}

View File

@@ -0,0 +1,7 @@
package kafkamodule
type Sasl struct {
UserName string `json:"UserName"`
Passwd string `json:"Passwd"`
InstanceId string `json:"InstanceId"`
}

View File

@@ -1,181 +0,0 @@
package mongomodule
import (
"github.com/duanhf2012/origin/v2/log"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"sync"
"time"
"container/heap"
_ "gopkg.in/mgo.v2"
)
// session
type Session struct {
*mgo.Session
ref int
index int
}
type SessionHeap []*Session
func (h SessionHeap) Len() int {
return len(h)
}
func (h SessionHeap) Less(i, j int) bool {
return h[i].ref < h[j].ref
}
func (h SessionHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *SessionHeap) Push(s interface{}) {
s.(*Session).index = len(*h)
*h = append(*h, s.(*Session))
}
func (h *SessionHeap) Pop() interface{} {
l := len(*h)
s := (*h)[l-1]
s.index = -1
*h = (*h)[:l-1]
return s
}
type DialContext struct {
sync.Mutex
sessions SessionHeap
}
type MongoModule struct {
dailContext *DialContext
}
func (slf *MongoModule) Init(url string,sessionNum uint32,dialTimeout time.Duration, timeout time.Duration) error {
var err error
slf.dailContext, err = dialWithTimeout(url, sessionNum, dialTimeout, timeout)
return err
}
func (slf *MongoModule) Ref() *Session{
return slf.dailContext.Ref()
}
func (slf *MongoModule) UnRef(s *Session) {
slf.dailContext.UnRef(s)
}
// goroutine safe
func dialWithTimeout(url string, sessionNum uint32, dialTimeout time.Duration, timeout time.Duration) (*DialContext, error) {
if sessionNum <= 0 {
sessionNum = 100
log.Release("invalid sessionNum, reset to %v", sessionNum)
}
s, err := mgo.DialWithTimeout(url, dialTimeout)
if err != nil {
return nil, err
}
s.SetMode(mgo.Strong,true)
s.SetSyncTimeout(timeout)
s.SetSocketTimeout(timeout)
c := new(DialContext)
// sessions
c.sessions = make(SessionHeap, sessionNum)
c.sessions[0] = &Session{s, 0, 0}
for i := 1; i < int(sessionNum); i++ {
c.sessions[i] = &Session{s.New(), 0, i}
}
heap.Init(&c.sessions)
return c, nil
}
// goroutine safe
func (c *DialContext) Close() {
c.Lock()
for _, s := range c.sessions {
s.Close()
}
c.Unlock()
}
// goroutine safe
func (c *DialContext) Ref() *Session {
c.Lock()
s := c.sessions[0]
if s.ref == 0 {
s.Refresh()
}
s.ref++
heap.Fix(&c.sessions, 0)
c.Unlock()
return s
}
// goroutine safe
func (c *DialContext) UnRef(s *Session) {
if s == nil {
return
}
c.Lock()
s.ref--
heap.Fix(&c.sessions, s.index)
c.Unlock()
}
// goroutine safe
func (s *Session) EnsureCounter(db string, collection string, id string) error {
err := s.DB(db).C(collection).Insert(bson.M{
"_id": id,
"seq": 0,
})
if mgo.IsDup(err) {
return nil
} else {
return err
}
}
// goroutine safe
func (s *Session) NextSeq(db string, collection string, id string) (int, error) {
var res struct {
Seq int
}
_, err := s.DB(db).C(collection).FindId(id).Apply(mgo.Change{
Update: bson.M{"$inc": bson.M{"seq": 1}},
ReturnNew: true,
}, &res)
return res.Seq, err
}
// goroutine safe
func (s *Session) EnsureIndex(db string, collection string, key []string, bBackground bool) error {
return s.DB(db).C(collection).EnsureIndex(mgo.Index{
Key: key,
Unique: false,
Sparse: true,
Background: bBackground,
})
}
// goroutine safe
func (s *Session) EnsureUniqueIndex(db string, collection string, key []string, bBackground bool) error {
return s.DB(db).C(collection).EnsureIndex(mgo.Index{
Key: key,
Unique: true,
Sparse: true,
Background: bBackground,
})
}

View File

@@ -1,95 +0,0 @@
package mongomodule
import (
"fmt"
_ "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"testing"
"time"
)
type Student struct {
ID bson.ObjectId `bson:"_id"`
Name string `bson: "name"`
Age int `bson: "age"`
Sid string `bson: "sid"`
Status int `bson: "status"`
}
type StudentName struct {
Name string `bson: "name"`
}
func Test_Example(t *testing.T) {
module:=MongoModule{}
module.Init("mongodb://admin:123456@192.168.2.119:27017",100, 5*time.Second,5*time.Second)
// take session
s := module.Take()
c := s.DB("test2").C("t_student")
//2. define the objects
insertData := Student{
ID:bson.NewObjectId(),
Name: "seeta11",
Age: 35, //*^_^*
Sid: "s20180907",
Status: 1,
}
updateData := Student{
Name: "seeta11",
Age: 18,
Sid: "s20180907",
Status: 1,
}
//3. insert data
err := c.Insert(&insertData)
//4. query data
selector := bson.M{"_id":bson.ObjectIdHex("5f25303e999c622d361989b0")}
m:=Student{}
err = c.Find(selector).One(&m)
//5. update data
//selector2 := bson.M{"_id":bson.ObjectIdHex("5f25303e999c622d361989b0")}
updateData.ID = bson.ObjectIdHex("5f25303e999c622d361989b0")
err = c.UpdateId(bson.ObjectIdHex("5f25303e999c622d361989b0"),&updateData)
if err != nil {
fmt.Print(err)
}
//6. delete data
err = c.RemoveId(bson.ObjectIdHex("5f252f09999c622d36198951"))
if err != nil {
fmt.Print(err)
}
//7. auto-increment sequence
s.EnsureCounter("test2","t_student","5f252f09999c622d36198951")
for i := 0; i < 3; i++ {
id, err := s.NextSeq("test2", "t_student", "5f252f09999c622d36198951")
if err != nil {
fmt.Println(err)
return
}
fmt.Println(id)
}
//8. $setOnInsert usage
info,uErr := c.Upsert(bson.M{"_id":bson.ObjectIdHex("5f252f09999c622d36198951")},bson.M{
"$setOnInsert":bson.M{"Name":"setoninsert","Age":55}})
//9. update part of the numeric fields
selector1 := bson.M{"_id":bson.ObjectIdHex("60473de655f1012e7453b369")}
update1 := bson.M{"$set":bson.M{"name":"xxxxx","age":1111}}
c.Update(selector1,update1)
fmt.Println(info,uErr)
}

View File

@@ -16,7 +16,7 @@ type CustomerSubscriber struct {
rpc.IRpcHandler rpc.IRpcHandler
topic string topic string
subscriber *Subscriber subscriber *Subscriber
fromNodeId int fromNodeId string
callBackRpcMethod string callBackRpcMethod string
serviceName string serviceName string
StartIndex uint64 StartIndex uint64
@@ -37,7 +37,7 @@ const (
MethodLast SubscribeMethod = 1 //Last mode subscribes starting from the position this consumer last recorded MethodLast SubscribeMethod = 1 //Last mode subscribes starting from the position this consumer last recorded
) )
func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error { func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId string, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
cs.subscriber = ss cs.subscriber = ss
cs.fromNodeId = fromNodeId cs.fromNodeId = fromNodeId
cs.callBackRpcMethod = callBackRpcMethod cs.callBackRpcMethod = callBackRpcMethod
@@ -85,7 +85,7 @@ func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandle
} }
// start subscribing // start subscribing
func (cs *CustomerSubscriber) Subscribe(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error { func (cs *CustomerSubscriber) Subscribe(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId string, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
err := cs.trySetSubscriberBaseInfo(rpcHandler, ss, topic, subscribeMethod, customerId, fromNodeId, callBackRpcMethod, startIndex, oneBatchQuantity) err := cs.trySetSubscriberBaseInfo(rpcHandler, ss, topic, subscribeMethod, customerId, fromNodeId, callBackRpcMethod, startIndex, oneBatchQuantity)
if err != nil { if err != nil {
return err return err

View File

@@ -122,5 +122,5 @@ func (ms *MessageQueueService) RPC_Publish(inParam *rpc.DBQueuePublishReq, outPa
func (ms *MessageQueueService) RPC_Subscribe(req *rpc.DBQueueSubscribeReq, res *rpc.DBQueueSubscribeRes) error { func (ms *MessageQueueService) RPC_Subscribe(req *rpc.DBQueueSubscribeReq, res *rpc.DBQueueSubscribeRes) error {
topicRoom := ms.GetTopicRoom(req.TopicName) topicRoom := ms.GetTopicRoom(req.TopicName)
return topicRoom.TopicSubscribe(ms.GetRpcHandler(), req.SubType, int32(req.Method), int(req.FromNodeId), req.RpcMethod, req.TopicName, req.CustomerId, req.StartIndex, req.OneBatchQuantity) return topicRoom.TopicSubscribe(ms.GetRpcHandler(), req.SubType, int32(req.Method), req.FromNodeId, req.RpcMethod, req.TopicName, req.CustomerId, req.StartIndex, req.OneBatchQuantity)
} }

View File

@@ -31,7 +31,7 @@ func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCo
return ss.dataPersist.PersistTopicData(topic, topics, retryCount) return ss.dataPersist.PersistTopicData(topic, topics, retryCount)
} }
func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, subscribeMethod SubscribeMethod, fromNodeId int, callBackRpcMethod string, topic string, customerId string, StartIndex uint64, oneBatchQuantity int32) error { func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, subscribeMethod SubscribeMethod, fromNodeId string, callBackRpcMethod string, topic string, customerId string, StartIndex uint64, oneBatchQuantity int32) error {
//when unsubscribing //when unsubscribing
if subScribeType == rpc.SubscribeType_Unsubscribe { if subScribeType == rpc.SubscribeType_Unsubscribe {
ss.UnSubscribe(customerId) ss.UnSubscribe(customerId)

View File

@@ -263,7 +263,7 @@ func (mp *MongoPersist) JugeTimeoutSave() bool{
func (mp *MongoPersist) persistCoroutine(){ func (mp *MongoPersist) persistCoroutine(){
defer mp.waitGroup.Done() defer mp.waitGroup.Done()
for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){ for atomic.LoadInt32(&mp.stop)==0 {
//sleep for the interval //sleep for the interval
time.Sleep(time.Second*1) time.Sleep(time.Second*1)

View File

@@ -129,7 +129,7 @@ func (slf *WSClient) Run() {
for{ for{
bytes,err := slf.wsConn.ReadMsg() bytes,err := slf.wsConn.ReadMsg()
if err != nil { if err != nil {
log.Debug("read client id %d is error:%+v",slf.id,err) log.Debug("read client id %s is error:%+v",slf.id,err)
break break
} }
data,err:=slf.wsService.process.Unmarshal(slf.id,bytes) data,err:=slf.wsService.process.Unmarshal(slf.id,bytes)
@@ -153,7 +153,7 @@ func (ws *WSService) SendMsg(clientid string,msg interface{}) error{
client,ok := ws.mapClient[clientid] client,ok := ws.mapClient[clientid]
if ok == false{ if ok == false{
ws.mapClientLocker.Unlock() ws.mapClientLocker.Unlock()
return fmt.Errorf("client %d is disconnect!",clientid) return fmt.Errorf("client %s is disconnect!",clientid)
} }
ws.mapClientLocker.Unlock() ws.mapClientLocker.Unlock()