Compare commits

...

17 Commits

Author SHA1 Message Date
boyce
d1935b1bbc 优化日志 2024-05-27 18:09:37 +08:00
boyce
90d54bf3e2 1.新增服务和Global配置接口,支持通过结构体解析
2.优化日志
2024-05-15 10:06:50 +08:00
boyce
78cc33c84e 优化服务模板的配置读取 2024-05-11 15:11:24 +08:00
boyce
9cf21bf418 1.新增模板服务
2.优化消息队列
3.优化相关日志
2024-05-11 14:42:34 +08:00
boyce
c6d0bd9a19 优化gin模块 2024-05-08 14:18:39 +08:00
boyce
61bf95e457 优化RankService 2024-05-07 18:57:38 +08:00
boyce
8b2a551ee5 优化gin模块 2024-05-06 10:51:51 +08:00
boyce
927c2ffa37 优化gin模块 2024-05-06 10:36:04 +08:00
boyce
b23b30aac5 优化服务发现 2024-04-30 19:05:44 +08:00
boyce
03f8ba0316 精简事件通知 2024-04-30 17:33:34 +08:00
boyce
277480a7f0 Merge branch 'v2' of https://github.com/duanhf2012/origin into v2 2024-04-29 09:24:04 +08:00
boyce
647a654a36 补充优化说明文档 2024-04-29 09:23:54 +08:00
boyce
de483a88f1 优化停服 2024-04-28 18:02:20 +08:00
boyce
bbbb511b5f 新增kafka模块 2024-04-28 11:21:59 +08:00
boyce
0489ee3ef4 新增gin模块&优化pb处理器 2024-04-28 11:13:46 +08:00
boyce
692dacda0c 删除mongomodule,使用mongodbmodule替代 2024-04-28 10:40:56 +08:00
boyce
7f86b1007d 优化支持自定义logger 2024-04-26 18:13:41 +08:00
28 changed files with 1813 additions and 881 deletions

819
README.md

File diff suppressed because it is too large Load Diff

View File

@@ -8,6 +8,8 @@ import (
"strings"
"sync"
"github.com/duanhf2012/origin/v2/event"
"errors"
"reflect"
)
var configDir = "./config/"
@@ -54,7 +56,6 @@ type Cluster struct {
discoveryInfo DiscoveryInfo //服务发现配置
rpcMode RpcMode
//masterDiscoveryNodeList []NodeInfo //配置发现Master结点
globalCfg interface{} //全局配置
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
@@ -63,6 +64,7 @@ type Cluster struct {
locker sync.RWMutex //结点与服务关系保护锁
mapRpc map[string]*NodeRpcInfo //nodeId
mapServiceNode map[string]map[string]struct{} //map[serviceName]map[NodeId]
mapTemplateServiceNode map[string]map[string]struct{} //map[templateServiceName]map[serviceName]nodeId
callSet rpc.CallSet
rpcNats rpc.RpcNats
@@ -70,7 +72,6 @@ type Cluster struct {
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
mapServiceListenRpcEvent map[string]struct{} //ServiceName
mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName
}
func GetCluster() *Cluster {
@@ -139,6 +140,20 @@ func (cls *Cluster) delServiceNode(serviceName string, nodeId string) {
return
}
//处理模板服务
splitServiceName := strings.Split(serviceName,":")
if len(splitServiceName) == 2 {
serviceName = splitServiceName[0]
templateServiceName := splitServiceName[1]
mapService := cls.mapTemplateServiceNode[templateServiceName]
delete(mapService,serviceName)
if len(cls.mapTemplateServiceNode[templateServiceName]) == 0 {
delete(cls.mapTemplateServiceNode,templateServiceName)
}
}
mapNode := cls.mapServiceNode[serviceName]
delete(mapNode, nodeId)
if len(mapNode) == 0 {
@@ -173,7 +188,20 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
continue
}
mapDuplicate[serviceName] = nil
if _, ok := cls.mapServiceNode[serviceName]; ok == false {
//如果是模板服务,则记录模板关系
splitServiceName := strings.Split(serviceName,":")
if len(splitServiceName) == 2 {
serviceName = splitServiceName[0]
templateServiceName := splitServiceName[1]
//记录模板
if _, ok = cls.mapTemplateServiceNode[templateServiceName]; ok == false {
cls.mapTemplateServiceNode[templateServiceName]=map[string]struct{}{}
}
cls.mapTemplateServiceNode[templateServiceName][serviceName] = struct{}{}
}
if _, ok = cls.mapServiceNode[serviceName]; ok == false {
cls.mapServiceNode[serviceName] = make(map[string]struct{}, 1)
}
cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{}
@@ -228,8 +256,6 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er
}
service.RegRpcEventFun = cls.RegRpcEvent
service.UnRegRpcEventFun = cls.UnRegRpcEvent
service.RegDiscoveryServiceEventFun = cls.RegDiscoveryEvent
service.UnRegDiscoveryServiceEventFun = cls.UnReDiscoveryEvent
err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo)
if err != nil {
@@ -263,25 +289,29 @@ func (cls *Cluster) GetRpcClient(nodeId string) (*rpc.Client,bool) {
return cls.getRpcClient(nodeId)
}
func GetRpcClient(nodeId string, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, int) {
func GetNodeIdByTemplateService(templateServiceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
return GetCluster().GetNodeIdByTemplateService(templateServiceName, rpcClientList, filterRetire)
}
func GetRpcClient(nodeId string, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, []*rpc.Client) {
if nodeId != rpc.NodeIdNull {
pClient,retire := GetCluster().GetRpcClient(nodeId)
if pClient == nil {
return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
return fmt.Errorf("cannot find nodeid %d!", nodeId), nil
}
//如果需要筛选掉退休结点
if filterRetire == true && retire == true {
return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
return fmt.Errorf("cannot find nodeid %d!", nodeId), nil
}
clientList[0] = pClient
return nil, 1
clientList = append(clientList,pClient)
return nil, clientList
}
findIndex := strings.Index(serviceMethod, ".")
if findIndex == -1 {
return fmt.Errorf("servicemethod param %s is error!", serviceMethod), 0
return fmt.Errorf("servicemethod param %s is error!", serviceMethod), nil
}
serviceName := serviceMethod[:findIndex]
@@ -322,23 +352,12 @@ func (cls *Cluster) NotifyAllService(event event.IEvent){
}
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId string, serviceName []string) {
cls.rpcEventLocker.Lock()
defer cls.rpcEventLocker.Unlock()
for sName, _ := range cls.mapServiceListenDiscoveryEvent {
ser := service.GetService(sName)
if ser == nil {
log.Error("cannot find service",log.Any("services",serviceName))
continue
}
var eventData service.DiscoveryServiceEvent
eventData.IsDiscovery = bDiscovery
eventData.NodeId = nodeId
eventData.ServiceName = serviceName
ser.(service.IModule).NotifyEvent(&eventData)
}
var eventData service.DiscoveryServiceEvent
eventData.IsDiscovery = bDiscovery
eventData.NodeId = nodeId
eventData.ServiceName = serviceName
cls.NotifyAllService(&eventData)
}
func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
@@ -361,25 +380,6 @@ func (cls *Cluster) UnRegRpcEvent(serviceName string) {
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) RegDiscoveryEvent(serviceName string) {
cls.rpcEventLocker.Lock()
if cls.mapServiceListenDiscoveryEvent == nil {
cls.mapServiceListenDiscoveryEvent = map[string]struct{}{}
}
cls.mapServiceListenDiscoveryEvent[serviceName] = struct{}{}
cls.rpcEventLocker.Unlock()
}
func (cls *Cluster) UnReDiscoveryEvent(serviceName string) {
cls.rpcEventLocker.Lock()
delete(cls.mapServiceListenDiscoveryEvent, serviceName)
cls.rpcEventLocker.Unlock()
}
func HasService(nodeId string, serviceName string) bool {
cluster.locker.RLock()
defer cluster.locker.RUnlock()
@@ -410,10 +410,50 @@ func GetNodeByServiceName(serviceName string) map[string]struct{} {
return mapNodeId
}
// GetNodeByTemplateServiceName 通过模板服务名获取服务名,返回 map[serviceName真实服务名]NodeId
func GetNodeByTemplateServiceName(templateServiceName string) map[string]string {
cluster.locker.RLock()
defer cluster.locker.RUnlock()
mapServiceName := cluster.mapTemplateServiceNode[templateServiceName]
mapNodeId := make(map[string]string,9)
for serviceName := range mapServiceName {
mapNode, ok := cluster.mapServiceNode[serviceName]
if ok == false {
return nil
}
for nodeId,_ := range mapNode {
mapNodeId[serviceName] = nodeId
}
}
return mapNodeId
}
func (cls *Cluster) GetGlobalCfg() interface{} {
return cls.globalCfg
}
func (cls *Cluster) ParseGlobalCfg(cfg interface{}) error{
if cls.globalCfg == nil {
return errors.New("no service configuration found")
}
rv := reflect.ValueOf(cls.globalCfg)
if rv.Kind() == reflect.Ptr && rv.IsNil() {
return errors.New("no service configuration found")
}
bytes,err := json.Marshal(cls.globalCfg)
if err != nil {
return err
}
return json.Unmarshal(bytes,cfg)
}
func (cls *Cluster) GetNodeInfo(nodeId string) (NodeInfo,bool) {
cls.locker.RLock()
defer cls.locker.RUnlock()

View File

@@ -183,6 +183,10 @@ func (ds *OriginDiscoveryMaster) OnNodeDisconnect(nodeId string) {
func (ds *OriginDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{}) {
for nodeId, _ := range ds.mapNodeInfo {
if nodeId == cluster.GetLocalNodeInfo().NodeId {
continue
}
ds.GoNode(nodeId, serviceMethod, args)
}
}

View File

@@ -270,7 +270,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
}
if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
return discoveryInfo, nil,rpcMode, fmt.Errorf("%d configurations were found for the configuration with node ID %d!", len(nodeInfoList), nodeId)
return discoveryInfo, nil,rpcMode, fmt.Errorf("nodeid %s configuration error in NodeList", nodeId)
}
for i, _ := range nodeInfoList {
@@ -325,6 +325,10 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
//保存公共配置
for _, s := range cls.localNodeInfo.ServiceList {
for {
splitServiceName := strings.Split(s,":")
if len(splitServiceName) == 2 {
s = splitServiceName[0]
}
//取公共服务配置
pubCfg, ok := serviceConfig[s]
if ok == true {
@@ -355,6 +359,11 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
//组合所有的配置
for _, s := range cls.localNodeInfo.ServiceList {
splitServiceName := strings.Split(s,":")
if len(splitServiceName) == 2 {
s = splitServiceName[0]
}
//先从NodeService中找
var serviceCfg interface{}
var ok bool
@@ -382,12 +391,24 @@ func (cls *Cluster) parseLocalCfg() {
cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo
for _, sName := range cls.localNodeInfo.ServiceList {
if _, ok := cls.mapServiceNode[sName]; ok == false {
cls.mapServiceNode[sName] = make(map[string]struct{})
for _, serviceName := range cls.localNodeInfo.ServiceList {
splitServiceName := strings.Split(serviceName,":")
if len(splitServiceName) == 2 {
serviceName = splitServiceName[0]
templateServiceName := splitServiceName[1]
//记录模板
if _, ok := cls.mapTemplateServiceNode[templateServiceName]; ok == false {
cls.mapTemplateServiceNode[templateServiceName]=map[string]struct{}{}
}
cls.mapTemplateServiceNode[templateServiceName][serviceName] = struct{}{}
}
cls.mapServiceNode[sName][cls.localNodeInfo.NodeId] = struct{}{}
if _, ok := cls.mapServiceNode[serviceName]; ok == false {
cls.mapServiceNode[serviceName] = make(map[string]struct{})
}
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
}
}
@@ -403,6 +424,7 @@ func (cls *Cluster) InitCfg(localNodeId string) error {
cls.localServiceCfg = map[string]interface{}{}
cls.mapRpc = map[string]*NodeRpcInfo{}
cls.mapServiceNode = map[string]map[string]struct{}{}
cls.mapTemplateServiceNode = map[string]map[string]struct{}{}
//加载本地结点的NodeList配置
discoveryInfo, nodeInfoList,rpcMode, err := cls.readLocalClusterConfig(localNodeId)
@@ -436,12 +458,37 @@ func (cls *Cluster) IsConfigService(serviceName string) bool {
return ok
}
func (cls *Cluster) GetNodeIdByTemplateService(templateServiceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
cls.locker.RLock()
defer cls.locker.RUnlock()
func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, int) {
mapServiceName := cls.mapTemplateServiceNode[templateServiceName]
for serviceName := range mapServiceName {
mapNodeId, ok := cls.mapServiceNode[serviceName]
if ok == true {
for nodeId, _ := range mapNodeId {
pClient,retire := GetCluster().getRpcClient(nodeId)
if pClient == nil || pClient.IsConnected() == false {
continue
}
//如果需要筛选掉退休的对retire状态的结点略过
if filterRetire == true && retire == true {
continue
}
rpcClientList = append(rpcClientList,pClient)
}
}
}
return nil, rpcClientList
}
func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
cls.locker.RLock()
defer cls.locker.RUnlock()
mapNodeId, ok := cls.mapServiceNode[serviceName]
count := 0
if ok == true {
for nodeId, _ := range mapNodeId {
pClient,retire := GetCluster().getRpcClient(nodeId)
@@ -454,15 +501,11 @@ func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.
continue
}
rpcClientList[count] = pClient
count++
if count >= cap(rpcClientList) {
break
}
rpcClientList = append(rpcClientList,pClient)
}
}
return nil, count
return nil, rpcClientList
}
func (cls *Cluster) GetServiceCfg(serviceName string) interface{} {

View File

@@ -236,7 +236,6 @@ func (processor *EventProcessor) castEvent(event IEvent){
eventProcessor,ok := processor.mapListenerEvent[event.GetEventType()]
if ok == false || processor == nil{
log.Debug("event is not listen",log.Int("event type",int(event.GetEventType())))
return
}

View File

@@ -17,6 +17,7 @@ const (
Sys_Event_QueueTaskFinish EventType = -10
Sys_Event_Retire EventType = -11
Sys_Event_EtcdDiscovery EventType = -12
Sys_Event_Gin_Event EventType = -13
Sys_Event_User_Define EventType = 1
)

View File

@@ -22,7 +22,10 @@ var LogSize int64
var LogChannelCap int
var LogPath string
var LogLevel slog.Level = LevelTrace
var gLogger, _ = NewTextLogger(LevelDebug, "", "",true,LogChannelCap)
var isSetLogger bool
var memPool = bytespool.NewMemAreaPool()
// levels
@@ -37,6 +40,21 @@ const (
LevelFatal = slog.Level(20)
)
type ILogger interface {
Trace(msg string, args ...any)
Debug(msg string, args ...any)
Info(msg string, args ...any)
Warning(msg string, args ...any)
Error(msg string, args ...any)
Stack(msg string, args ...any)
Dump(msg string, args ...any)
Fatal(msg string, args ...any)
DoSPrintf(level slog.Level,a []interface{})
FormatHeader(buf *Buffer,level slog.Level,calldepth int)
Close()
}
type Logger struct {
Slogger *slog.Logger
@@ -47,7 +65,6 @@ type Logger struct {
type IoWriter struct {
outFile io.Writer // destination for output
outConsole io.Writer //os.Stdout
writeBytes int64
logChannel chan []byte
wg sync.WaitGroup
@@ -122,8 +139,8 @@ func (iw *IoWriter) Write(p []byte) (n int, err error){
func (iw *IoWriter) writeIo(p []byte) (n int, err error){
n,err = iw.writeFile(p)
if iw.outConsole != nil {
n,err = iw.outConsole.Write(p)
if OpenConsole {
n,err = os.Stdout.Write(p)
}
return
@@ -217,17 +234,12 @@ func (iw *IoWriter) swichFile() error{
iw.fileDay = now.Day()
iw.fileCreateTime = now.Unix()
atomic.StoreInt64(&iw.writeBytes,0)
if OpenConsole == true {
iw.outConsole = os.Stdout
}
}else{
iw.outConsole = os.Stdout
}
return nil
}
func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (*Logger,error){
func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (ILogger,error){
var logger Logger
logger.ioWriter.filePath = pathName
logger.ioWriter.fileprefix = filePrefix
@@ -242,7 +254,7 @@ func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource
return &logger,nil
}
func NewJsonLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (*Logger,error){
func NewJsonLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (ILogger,error){
var logger Logger
logger.ioWriter.filePath = pathName
logger.ioWriter.fileprefix = filePrefix
@@ -296,13 +308,18 @@ func (logger *Logger) Fatal(msg string, args ...any) {
os.Exit(1)
}
// It's dangerous to call the method on logging
func Export(logger *Logger) {
if logger != nil {
// It's non-thread-safe
func SetLogger(logger ILogger) {
if logger != nil && isSetLogger == false {
gLogger = logger
isSetLogger = true
}
}
func GetLogger() ILogger{
return gLogger
}
func Trace(msg string, args ...any){
gLogger.Trace(msg, args...)
}
@@ -415,7 +432,7 @@ func Group(key string, args ...any) slog.Attr {
return slog.Group(key, args...)
}
func (logger *Logger) doSPrintf(level slog.Level,a []interface{}) {
func (logger *Logger) DoSPrintf(level slog.Level,a []interface{}) {
if logger.Slogger.Enabled(context.Background(),level) == false{
return
}
@@ -425,7 +442,7 @@ func (logger *Logger) doSPrintf(level slog.Level,a []interface{}) {
logger.sBuff.Reset()
logger.formatHeader(&logger.sBuff,level,3)
logger.FormatHeader(&logger.sBuff,level,3)
for _,s := range a {
logger.sBuff.AppendString(slog.AnyValue(s).String())
@@ -435,46 +452,46 @@ func (logger *Logger) doSPrintf(level slog.Level,a []interface{}) {
}
func (logger *Logger) STrace(a ...interface{}) {
logger.doSPrintf(LevelTrace,a)
logger.DoSPrintf(LevelTrace,a)
}
func (logger *Logger) SDebug(a ...interface{}) {
logger.doSPrintf(LevelDebug,a)
logger.DoSPrintf(LevelDebug,a)
}
func (logger *Logger) SInfo(a ...interface{}) {
logger.doSPrintf(LevelInfo,a)
logger.DoSPrintf(LevelInfo,a)
}
func (logger *Logger) SWarning(a ...interface{}) {
logger.doSPrintf(LevelWarning,a)
logger.DoSPrintf(LevelWarning,a)
}
func (logger *Logger) SError(a ...interface{}) {
logger.doSPrintf(LevelError,a)
logger.DoSPrintf(LevelError,a)
}
func STrace(a ...interface{}) {
gLogger.doSPrintf(LevelTrace,a)
gLogger.DoSPrintf(LevelTrace,a)
}
func SDebug(a ...interface{}) {
gLogger.doSPrintf(LevelDebug,a)
gLogger.DoSPrintf(LevelDebug,a)
}
func SInfo(a ...interface{}) {
gLogger.doSPrintf(LevelInfo,a)
gLogger.DoSPrintf(LevelInfo,a)
}
func SWarning(a ...interface{}) {
gLogger.doSPrintf(LevelWarning,a)
gLogger.DoSPrintf(LevelWarning,a)
}
func SError(a ...interface{}) {
gLogger.doSPrintf(LevelError,a)
gLogger.DoSPrintf(LevelError,a)
}
func (logger *Logger) formatHeader(buf *Buffer,level slog.Level,calldepth int) {
func (logger *Logger) FormatHeader(buf *Buffer,level slog.Level,calldepth int) {
t := time.Now()
var file string
var line int

View File

@@ -10,9 +10,9 @@ type RawMessageInfo struct {
msgHandler RawMessageHandler
}
type RawMessageHandler func(clientId uint64,packType uint16,msg []byte)
type RawConnectHandler func(clientId uint64)
type UnknownRawMessageHandler func(clientId uint64,msg []byte)
type RawMessageHandler func(clientId string,packType uint16,msg []byte)
type RawConnectHandler func(clientId string)
type UnknownRawMessageHandler func(clientId string,msg []byte)
const RawMsgTypeSize = 2
type PBRawProcessor struct {
@@ -38,14 +38,14 @@ func (pbRawProcessor *PBRawProcessor) SetByteOrder(littleEndian bool) {
}
// must goroutine safe
func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId uint64, msg interface{}) error{
func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId string, msg interface{}) error{
pPackInfo := msg.(*PBRawPackInfo)
pbRawProcessor.msgHandler(clientId,pPackInfo.typ,pPackInfo.rawMsg)
return nil
}
// must goroutine safe
func (pbRawProcessor *PBRawProcessor ) Unmarshal(clientId uint64,data []byte) (interface{}, error) {
func (pbRawProcessor *PBRawProcessor ) Unmarshal(clientId string,data []byte) (interface{}, error) {
var msgType uint16
if pbRawProcessor.LittleEndian == true {
msgType = binary.LittleEndian.Uint16(data[:2])
@@ -57,7 +57,7 @@ func (pbRawProcessor *PBRawProcessor ) Unmarshal(clientId uint64,data []byte) (i
}
// must goroutine safe
func (pbRawProcessor *PBRawProcessor ) Marshal(clientId uint64,msg interface{}) ([]byte, error){
func (pbRawProcessor *PBRawProcessor ) Marshal(clientId string,msg interface{}) ([]byte, error){
pMsg := msg.(*PBRawPackInfo)
buff := make([]byte, 2, len(pMsg.rawMsg)+RawMsgTypeSize)
@@ -80,7 +80,7 @@ func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte,pbRaw
pbRawPackInfo.rawMsg = msg
}
func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){
func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId string,msg interface{}){
if pbRawProcessor.unknownMessageHandler == nil {
return
}
@@ -88,11 +88,11 @@ func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId uint64,msg interf
}
// connect event
func (pbRawProcessor *PBRawProcessor) ConnectedRoute(clientId uint64){
func (pbRawProcessor *PBRawProcessor) ConnectedRoute(clientId string){
pbRawProcessor.connectHandler(clientId)
}
func (pbRawProcessor *PBRawProcessor) DisConnectedRoute(clientId uint64){
func (pbRawProcessor *PBRawProcessor) DisConnectedRoute(clientId string){
pbRawProcessor.disconnectHandler(clientId)
}

View File

@@ -25,6 +25,7 @@ import (
var sig chan os.Signal
var nodeId string
var preSetupService []service.IService //预安装
var preSetupTemplateService []func()service.IService
var profilerInterval time.Duration
var bValid bool
var configDir = "./config/"
@@ -117,7 +118,7 @@ func setConfigPath(val interface{}) error {
}
func getRunProcessPid(nodeId string) (int, error) {
f, err := os.OpenFile(fmt.Sprintf("%s_%d.pid", os.Args[0], nodeId), os.O_RDONLY, 0600)
f, err := os.OpenFile(fmt.Sprintf("%s_%s.pid", os.Args[0], nodeId), os.O_RDONLY, 0600)
defer f.Close()
if err != nil {
return 0, err
@@ -169,6 +170,31 @@ func initNode(id string) {
serviceOrder := cluster.GetCluster().GetLocalNodeInfo().ServiceList
for _,serviceName:= range serviceOrder{
bSetup := false
//判断是否有配置模板服务
splitServiceName := strings.Split(serviceName,":")
if len(splitServiceName) == 2 {
serviceName = splitServiceName[0]
templateServiceName := splitServiceName[1]
for _,newSer := range preSetupTemplateService {
ser := newSer()
ser.OnSetup(ser)
if ser.GetName() == templateServiceName {
ser.SetName(serviceName)
ser.Init(ser,cluster.GetRpcClient,cluster.GetRpcServer,cluster.GetCluster().GetServiceCfg(ser.GetName()))
service.Setup(ser)
bSetup = true
break
}
}
if bSetup == false{
log.Error("Template service not found",log.String("service name",serviceName),log.String("template service name",templateServiceName))
os.Exit(1)
}
}
for _, s := range preSetupService {
if s.GetName() != serviceName {
continue
@@ -201,7 +227,7 @@ func initLog() error {
fmt.Printf("cannot create log file!\n")
return err
}
log.Export(logger)
log.SetLogger(logger)
return nil
}
@@ -367,6 +393,12 @@ func Setup(s ...service.IService) {
}
}
func SetupTemplate(fs ...func()service.IService){
for _, f := range fs {
preSetupTemplateService = append(preSetupTemplateService, f)
}
}
func GetService(serviceName string) service.IService {
return service.GetService(serviceName)
}

View File

@@ -1,8 +1,8 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.31.0
// protoc v3.11.4
// source: test/rpc/messagequeue.proto
// protoc v4.24.0
// source: rpcproto/messagequeue.proto
package rpc
@@ -50,11 +50,11 @@ func (x SubscribeType) String() string {
}
func (SubscribeType) Descriptor() protoreflect.EnumDescriptor {
return file_test_rpc_messagequeue_proto_enumTypes[0].Descriptor()
return file_rpcproto_messagequeue_proto_enumTypes[0].Descriptor()
}
func (SubscribeType) Type() protoreflect.EnumType {
return &file_test_rpc_messagequeue_proto_enumTypes[0]
return &file_rpcproto_messagequeue_proto_enumTypes[0]
}
func (x SubscribeType) Number() protoreflect.EnumNumber {
@@ -63,7 +63,7 @@ func (x SubscribeType) Number() protoreflect.EnumNumber {
// Deprecated: Use SubscribeType.Descriptor instead.
func (SubscribeType) EnumDescriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{0}
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{0}
}
type SubscribeMethod int32
@@ -96,11 +96,11 @@ func (x SubscribeMethod) String() string {
}
func (SubscribeMethod) Descriptor() protoreflect.EnumDescriptor {
return file_test_rpc_messagequeue_proto_enumTypes[1].Descriptor()
return file_rpcproto_messagequeue_proto_enumTypes[1].Descriptor()
}
func (SubscribeMethod) Type() protoreflect.EnumType {
return &file_test_rpc_messagequeue_proto_enumTypes[1]
return &file_rpcproto_messagequeue_proto_enumTypes[1]
}
func (x SubscribeMethod) Number() protoreflect.EnumNumber {
@@ -109,7 +109,7 @@ func (x SubscribeMethod) Number() protoreflect.EnumNumber {
// Deprecated: Use SubscribeMethod.Descriptor instead.
func (SubscribeMethod) EnumDescriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{1}
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{1}
}
type DBQueuePopReq struct {
@@ -127,7 +127,7 @@ type DBQueuePopReq struct {
func (x *DBQueuePopReq) Reset() {
*x = DBQueuePopReq{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[0]
mi := &file_rpcproto_messagequeue_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -140,7 +140,7 @@ func (x *DBQueuePopReq) String() string {
func (*DBQueuePopReq) ProtoMessage() {}
func (x *DBQueuePopReq) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[0]
mi := &file_rpcproto_messagequeue_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -153,7 +153,7 @@ func (x *DBQueuePopReq) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueuePopReq.ProtoReflect.Descriptor instead.
func (*DBQueuePopReq) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{0}
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{0}
}
func (x *DBQueuePopReq) GetCustomerId() string {
@@ -203,7 +203,7 @@ type DBQueuePopRes struct {
func (x *DBQueuePopRes) Reset() {
*x = DBQueuePopRes{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[1]
mi := &file_rpcproto_messagequeue_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -216,7 +216,7 @@ func (x *DBQueuePopRes) String() string {
func (*DBQueuePopRes) ProtoMessage() {}
func (x *DBQueuePopRes) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[1]
mi := &file_rpcproto_messagequeue_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -229,7 +229,7 @@ func (x *DBQueuePopRes) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueuePopRes.ProtoReflect.Descriptor instead.
func (*DBQueuePopRes) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{1}
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{1}
}
func (x *DBQueuePopRes) GetQueueName() string {
@@ -255,7 +255,7 @@ type DBQueueSubscribeReq struct {
SubType SubscribeType `protobuf:"varint,1,opt,name=SubType,proto3,enum=SubscribeType" json:"SubType,omitempty"` //订阅类型
Method SubscribeMethod `protobuf:"varint,2,opt,name=Method,proto3,enum=SubscribeMethod" json:"Method,omitempty"` //订阅方法
CustomerId string `protobuf:"bytes,3,opt,name=CustomerId,proto3" json:"CustomerId,omitempty"` //消费者Id
FromNodeId int32 `protobuf:"varint,4,opt,name=FromNodeId,proto3" json:"FromNodeId,omitempty"`
FromNodeId string `protobuf:"bytes,4,opt,name=FromNodeId,proto3" json:"FromNodeId,omitempty"`
RpcMethod string `protobuf:"bytes,5,opt,name=RpcMethod,proto3" json:"RpcMethod,omitempty"`
TopicName string `protobuf:"bytes,6,opt,name=TopicName,proto3" json:"TopicName,omitempty"` //主题名称
StartIndex uint64 `protobuf:"varint,7,opt,name=StartIndex,proto3" json:"StartIndex,omitempty"` //开始位置 ,格式前4位是时间戳秒后面是序号。如果填0时服务自动修改成(4bit 当前时间秒)| (0000 4bit)
@@ -265,7 +265,7 @@ type DBQueueSubscribeReq struct {
func (x *DBQueueSubscribeReq) Reset() {
*x = DBQueueSubscribeReq{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[2]
mi := &file_rpcproto_messagequeue_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -278,7 +278,7 @@ func (x *DBQueueSubscribeReq) String() string {
func (*DBQueueSubscribeReq) ProtoMessage() {}
func (x *DBQueueSubscribeReq) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[2]
mi := &file_rpcproto_messagequeue_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -291,7 +291,7 @@ func (x *DBQueueSubscribeReq) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueueSubscribeReq.ProtoReflect.Descriptor instead.
func (*DBQueueSubscribeReq) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{2}
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{2}
}
func (x *DBQueueSubscribeReq) GetSubType() SubscribeType {
@@ -315,11 +315,11 @@ func (x *DBQueueSubscribeReq) GetCustomerId() string {
return ""
}
func (x *DBQueueSubscribeReq) GetFromNodeId() int32 {
func (x *DBQueueSubscribeReq) GetFromNodeId() string {
if x != nil {
return x.FromNodeId
}
return 0
return ""
}
func (x *DBQueueSubscribeReq) GetRpcMethod() string {
@@ -359,7 +359,7 @@ type DBQueueSubscribeRes struct {
func (x *DBQueueSubscribeRes) Reset() {
*x = DBQueueSubscribeRes{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[3]
mi := &file_rpcproto_messagequeue_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -372,7 +372,7 @@ func (x *DBQueueSubscribeRes) String() string {
func (*DBQueueSubscribeRes) ProtoMessage() {}
func (x *DBQueueSubscribeRes) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[3]
mi := &file_rpcproto_messagequeue_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -385,7 +385,7 @@ func (x *DBQueueSubscribeRes) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueueSubscribeRes.ProtoReflect.Descriptor instead.
func (*DBQueueSubscribeRes) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{3}
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{3}
}
type DBQueuePublishReq struct {
@@ -400,7 +400,7 @@ type DBQueuePublishReq struct {
func (x *DBQueuePublishReq) Reset() {
*x = DBQueuePublishReq{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[4]
mi := &file_rpcproto_messagequeue_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -413,7 +413,7 @@ func (x *DBQueuePublishReq) String() string {
func (*DBQueuePublishReq) ProtoMessage() {}
func (x *DBQueuePublishReq) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[4]
mi := &file_rpcproto_messagequeue_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -426,7 +426,7 @@ func (x *DBQueuePublishReq) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueuePublishReq.ProtoReflect.Descriptor instead.
func (*DBQueuePublishReq) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{4}
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{4}
}
func (x *DBQueuePublishReq) GetTopicName() string {
@@ -452,7 +452,7 @@ type DBQueuePublishRes struct {
func (x *DBQueuePublishRes) Reset() {
*x = DBQueuePublishRes{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_messagequeue_proto_msgTypes[5]
mi := &file_rpcproto_messagequeue_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -465,7 +465,7 @@ func (x *DBQueuePublishRes) String() string {
func (*DBQueuePublishRes) ProtoMessage() {}
func (x *DBQueuePublishRes) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_messagequeue_proto_msgTypes[5]
mi := &file_rpcproto_messagequeue_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -478,13 +478,13 @@ func (x *DBQueuePublishRes) ProtoReflect() protoreflect.Message {
// Deprecated: Use DBQueuePublishRes.ProtoReflect.Descriptor instead.
func (*DBQueuePublishRes) Descriptor() ([]byte, []int) {
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{5}
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{5}
}
var File_test_rpc_messagequeue_proto protoreflect.FileDescriptor
var File_rpcproto_messagequeue_proto protoreflect.FileDescriptor
var file_test_rpc_messagequeue_proto_rawDesc = []byte{
0x0a, 0x1b, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61,
var file_rpcproto_messagequeue_proto_rawDesc = []byte{
0x0a, 0x1b, 0x72, 0x70, 0x63, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa3, 0x01,
0x0a, 0x0d, 0x44, 0x42, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x12,
0x1e, 0x0a, 0x0a, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20,
@@ -510,7 +510,7 @@ var file_test_rpc_messagequeue_proto_rawDesc = []byte{
0x6f, 0x64, 0x52, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x43, 0x75,
0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a,
0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x72,
0x6f, 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a,
0x6f, 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a,
0x46, 0x72, 0x6f, 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x52, 0x70,
0x63, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x52,
0x70, 0x63, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x6f, 0x70, 0x69,
@@ -539,20 +539,20 @@ var file_test_rpc_messagequeue_proto_rawDesc = []byte{
}
var (
file_test_rpc_messagequeue_proto_rawDescOnce sync.Once
file_test_rpc_messagequeue_proto_rawDescData = file_test_rpc_messagequeue_proto_rawDesc
file_rpcproto_messagequeue_proto_rawDescOnce sync.Once
file_rpcproto_messagequeue_proto_rawDescData = file_rpcproto_messagequeue_proto_rawDesc
)
func file_test_rpc_messagequeue_proto_rawDescGZIP() []byte {
file_test_rpc_messagequeue_proto_rawDescOnce.Do(func() {
file_test_rpc_messagequeue_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_rpc_messagequeue_proto_rawDescData)
func file_rpcproto_messagequeue_proto_rawDescGZIP() []byte {
file_rpcproto_messagequeue_proto_rawDescOnce.Do(func() {
file_rpcproto_messagequeue_proto_rawDescData = protoimpl.X.CompressGZIP(file_rpcproto_messagequeue_proto_rawDescData)
})
return file_test_rpc_messagequeue_proto_rawDescData
return file_rpcproto_messagequeue_proto_rawDescData
}
var file_test_rpc_messagequeue_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_test_rpc_messagequeue_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_test_rpc_messagequeue_proto_goTypes = []interface{}{
var file_rpcproto_messagequeue_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_rpcproto_messagequeue_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_rpcproto_messagequeue_proto_goTypes = []interface{}{
(SubscribeType)(0), // 0: SubscribeType
(SubscribeMethod)(0), // 1: SubscribeMethod
(*DBQueuePopReq)(nil), // 2: DBQueuePopReq
@@ -562,7 +562,7 @@ var file_test_rpc_messagequeue_proto_goTypes = []interface{}{
(*DBQueuePublishReq)(nil), // 6: DBQueuePublishReq
(*DBQueuePublishRes)(nil), // 7: DBQueuePublishRes
}
var file_test_rpc_messagequeue_proto_depIdxs = []int32{
var file_rpcproto_messagequeue_proto_depIdxs = []int32{
0, // 0: DBQueueSubscribeReq.SubType:type_name -> SubscribeType
1, // 1: DBQueueSubscribeReq.Method:type_name -> SubscribeMethod
2, // [2:2] is the sub-list for method output_type
@@ -572,13 +572,13 @@ var file_test_rpc_messagequeue_proto_depIdxs = []int32{
0, // [0:2] is the sub-list for field type_name
}
func init() { file_test_rpc_messagequeue_proto_init() }
func file_test_rpc_messagequeue_proto_init() {
if File_test_rpc_messagequeue_proto != nil {
func init() { file_rpcproto_messagequeue_proto_init() }
func file_rpcproto_messagequeue_proto_init() {
if File_rpcproto_messagequeue_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_test_rpc_messagequeue_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
file_rpcproto_messagequeue_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueuePopReq); i {
case 0:
return &v.state
@@ -590,7 +590,7 @@ func file_test_rpc_messagequeue_proto_init() {
return nil
}
}
file_test_rpc_messagequeue_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
file_rpcproto_messagequeue_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueuePopRes); i {
case 0:
return &v.state
@@ -602,7 +602,7 @@ func file_test_rpc_messagequeue_proto_init() {
return nil
}
}
file_test_rpc_messagequeue_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
file_rpcproto_messagequeue_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueueSubscribeReq); i {
case 0:
return &v.state
@@ -614,7 +614,7 @@ func file_test_rpc_messagequeue_proto_init() {
return nil
}
}
file_test_rpc_messagequeue_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
file_rpcproto_messagequeue_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueueSubscribeRes); i {
case 0:
return &v.state
@@ -626,7 +626,7 @@ func file_test_rpc_messagequeue_proto_init() {
return nil
}
}
file_test_rpc_messagequeue_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
file_rpcproto_messagequeue_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueuePublishReq); i {
case 0:
return &v.state
@@ -638,7 +638,7 @@ func file_test_rpc_messagequeue_proto_init() {
return nil
}
}
file_test_rpc_messagequeue_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
file_rpcproto_messagequeue_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*DBQueuePublishRes); i {
case 0:
return &v.state
@@ -655,19 +655,19 @@ func file_test_rpc_messagequeue_proto_init() {
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_test_rpc_messagequeue_proto_rawDesc,
RawDescriptor: file_rpcproto_messagequeue_proto_rawDesc,
NumEnums: 2,
NumMessages: 6,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_test_rpc_messagequeue_proto_goTypes,
DependencyIndexes: file_test_rpc_messagequeue_proto_depIdxs,
EnumInfos: file_test_rpc_messagequeue_proto_enumTypes,
MessageInfos: file_test_rpc_messagequeue_proto_msgTypes,
GoTypes: file_rpcproto_messagequeue_proto_goTypes,
DependencyIndexes: file_rpcproto_messagequeue_proto_depIdxs,
EnumInfos: file_rpcproto_messagequeue_proto_enumTypes,
MessageInfos: file_rpcproto_messagequeue_proto_msgTypes,
}.Build()
File_test_rpc_messagequeue_proto = out.File
file_test_rpc_messagequeue_proto_rawDesc = nil
file_test_rpc_messagequeue_proto_goTypes = nil
file_test_rpc_messagequeue_proto_depIdxs = nil
File_rpcproto_messagequeue_proto = out.File
file_rpcproto_messagequeue_proto_rawDesc = nil
file_rpcproto_messagequeue_proto_goTypes = nil
file_rpcproto_messagequeue_proto_depIdxs = nil
}

View File

@@ -31,7 +31,7 @@ message DBQueueSubscribeReq {
SubscribeType SubType = 1; //订阅类型
SubscribeMethod Method = 2; //订阅方法
string CustomerId = 3; //消费者Id
int32 FromNodeId = 4;
string FromNodeId = 4;
string RpcMethod = 5;
string TopicName = 6; //主题名称
uint64 StartIndex = 7; //开始位置 ,格式前4位是时间戳秒后面是序号。如果填0时服务自动修改成(4bit 当前时间秒)| (0000 4bit)

View File

@@ -13,9 +13,9 @@ import (
"time"
)
const maxClusterNode int = 128
const maxClusterNode int = 32
type FuncRpcClient func(nodeId string, serviceMethod string,filterRetire bool, client []*Client) (error, int)
type FuncRpcClient func(nodeId string, serviceMethod string,filterRetire bool, client []*Client) (error, []*Client)
type FuncRpcServer func() IServer
const NodeIdNull = ""
@@ -63,7 +63,7 @@ type RpcHandler struct {
funcRpcClient FuncRpcClient
funcRpcServer FuncRpcServer
pClientList []*Client
//pClientList []*Client
}
//type TriggerRpcConnEvent func(bConnect bool, clientSeq uint32, nodeId string)
@@ -135,7 +135,6 @@ func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler, getClientFun F
handler.mapFunctions = map[string]RpcMethodInfo{}
handler.funcRpcClient = getClientFun
handler.funcRpcServer = getServerFun
handler.pClientList = make([]*Client, maxClusterNode)
handler.RegisterRpc(rpcHandler)
}
@@ -274,7 +273,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
//普通的rpc请求
v, ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()]
if ok == false {
err := "RpcHandler " + handler.rpcHandler.GetName() + "cannot find " + request.RpcRequestData.GetServiceMethod()
err := "RpcHandler " + handler.rpcHandler.GetName() + " cannot find " + request.RpcRequestData.GetServiceMethod()
log.Error("HandlerRpcRequest cannot find serviceMethod",log.String("RpcHandlerName",handler.rpcHandler.GetName()),log.String("serviceMethod",request.RpcRequestData.GetServiceMethod()))
if request.requestHandle != nil {
request.requestHandle(nil, RpcError(err))
@@ -435,9 +434,9 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param
}
func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId string, serviceMethod string, args interface{}) error {
var pClientList [maxClusterNode]*Client
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
if count == 0 {
pClientList :=make([]*Client,0,maxClusterNode)
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList)
if len(pClientList) == 0 {
if err != nil {
log.Error("call serviceMethod is failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
} else {
@@ -446,13 +445,13 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str
return err
}
if count > 1 && bCast == false {
if len(pClientList) > 1 && bCast == false {
log.Error("cannot call serviceMethod more then 1 node",log.String("serviceMethod",serviceMethod))
return errors.New("cannot call more then 1 node")
}
//2.rpcClient调用
for i := 0; i < count; i++ {
for i := 0; i < len(pClientList); i++ {
pCall := pClientList[i].Go(pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,true, serviceMethod, args, nil)
if pCall.Err != nil {
err = pCall.Err
@@ -465,16 +464,16 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str
}
func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId string, serviceMethod string, args interface{}, reply interface{}) error {
var pClientList [maxClusterNode]*Client
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
pClientList :=make([]*Client,0,maxClusterNode)
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList)
if err != nil {
log.Error("Call serviceMethod is failed",log.ErrorAttr("error",err))
return err
} else if count <= 0 {
} else if len(pClientList) <= 0 {
err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod)
log.Error("cannot find serviceMethod",log.String("serviceMethod",serviceMethod))
return err
} else if count > 1 {
} else if len(pClientList) > 1 {
log.Error("Cannot call more then 1 node!",log.String("serviceMethod",serviceMethod))
return errors.New("cannot call more then 1 node")
}
@@ -509,9 +508,9 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId string, ser
}
reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
var pClientList [2]*Client
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
if count == 0 || err != nil {
pClientList :=make([]*Client,0,1)
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
if len(pClientList) == 0 || err != nil {
if err == nil {
if nodeId != NodeIdNull {
err = fmt.Errorf("cannot find %s from nodeId %d",serviceMethod,nodeId)
@@ -524,7 +523,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId string, ser
return emptyCancelRpc,nil
}
if count > 1 {
if len(pClientList) > 1 {
err := errors.New("cannot call more then 1 node")
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.Error("cannot call more then 1 node",log.String("serviceMethod",serviceMethod))
@@ -593,12 +592,13 @@ func (handler *RpcHandler) CastGo(serviceMethod string, args interface{}) error
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId string, rpcMethodId uint32, serviceName string, rawArgs []byte) error {
processor := GetProcessor(uint8(rpcProcessorType))
err, count := handler.funcRpcClient(nodeId, serviceName,false, handler.pClientList)
if count == 0 || err != nil {
pClientList := make([]*Client,0,1)
err, pClientList := handler.funcRpcClient(nodeId, serviceName,false, pClientList)
if len(pClientList) == 0 || err != nil {
log.Error("call serviceMethod is failed",log.ErrorAttr("error",err))
return err
}
if count > 1 {
if len(pClientList) > 1 {
err := errors.New("cannot call more then 1 node")
log.Error("cannot call more then 1 node",log.String("serviceName",serviceName))
return err
@@ -606,14 +606,14 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId s
//2.rpcClient调用
//如果调用本结点服务
for i := 0; i < count; i++ {
for i := 0; i < len(pClientList); i++ {
//跨node调用
pCall := handler.pClientList[i].RawGo(handler.pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
pCall := pClientList[i].RawGo(pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
if pCall.Err != nil {
err = pCall.Err
}
handler.pClientList[i].RemovePending(pCall.Seq)
pClientList[i].RemovePending(pCall.Seq)
ReleaseCall(pCall)
}

View File

@@ -14,11 +14,14 @@ import (
"sync"
"sync/atomic"
"github.com/duanhf2012/origin/v2/concurrent"
"encoding/json"
)
var timerDispatcherLen = 100000
var maxServiceEventChannelNum = 2000000
type IService interface {
concurrent.IConcurrent
Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{})
@@ -293,6 +296,24 @@ func (s *Service) GetServiceCfg()interface{}{
return s.serviceCfg
}
func (s *Service) ParseServiceCfg(cfg interface{}) error{
if s.serviceCfg == nil {
return errors.New("no service configuration found")
}
rv := reflect.ValueOf(s.serviceCfg)
if rv.Kind() == reflect.Ptr && rv.IsNil() {
return errors.New("no service configuration found")
}
bytes,err := json.Marshal(s.serviceCfg)
if err != nil {
return err
}
return json.Unmarshal(bytes,cfg)
}
func (s *Service) GetProfiler() *profiler.Profiler{
return s.profiler
}
@@ -368,12 +389,12 @@ func (s *Service) UnRegNatsConnListener() {
func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoveryServiceListener) {
s.discoveryServiceLister = discoveryServiceListener
s.RegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler(),s.OnDiscoverServiceEvent)
RegDiscoveryServiceEventFun(s.GetName())
RegRpcEventFun(s.GetName())
}
func (s *Service) UnRegDiscoverListener() {
s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler())
UnRegDiscoveryServiceEventFun(s.GetName())
UnRegRpcEventFun(s.GetName())
}
func (s *Service) PushRpcRequest(rpcRequest *rpc.RpcRequest) error{

View File

@@ -14,9 +14,6 @@ type RegDiscoveryServiceEventFunType func(serviceName string)
var RegRpcEventFun RegRpcEventFunType
var UnRegRpcEventFun RegRpcEventFunType
var RegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
var UnRegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
func init(){
mapServiceName = map[string]IService{}
setupServiceList = []IService{}
@@ -26,7 +23,7 @@ func Init() {
for _,s := range setupServiceList {
err := s.OnInit()
if err != nil {
log.SError("Failed to initialize "+s.GetName()+" service:"+err.Error())
log.Error("Failed to initialize "+s.GetName()+" service",log.ErrorAttr("err",err))
os.Exit(1)
}
}

View File

@@ -0,0 +1,314 @@
package ginmodule
import (
"context"
"github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/service"
"github.com/gin-gonic/gin"
"log/slog"
"net/http"
"strings"
"time"
"io"
)
type IGinProcessor interface {
Process(data *gin.Context) (*gin.Context, error)
}
type GinModule struct {
service.Module
*gin.Engine
srv *http.Server
listenAddr string
handleTimeout time.Duration
processor []IGinProcessor
}
func (gm *GinModule) Init(addr string, handleTimeout time.Duration,engine *gin.Engine) {
gm.listenAddr = addr
gm.handleTimeout = handleTimeout
gm.Engine = engine
}
func (gm *GinModule) SetupDataProcessor(processor ...IGinProcessor) {
gm.processor = processor
}
func (gm *GinModule) AppendDataProcessor(processor ...IGinProcessor) {
gm.processor = append(gm.processor, processor...)
}
func (gm *GinModule) OnInit() error {
if gm.Engine == nil {
gm.Engine = gin.Default()
}
gm.srv = &http.Server{
Addr: gm.listenAddr,
Handler: gm.Engine,
}
gm.Engine.Use(Logger())
gm.Engine.Use(gin.Recovery())
gm.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_Gin_Event, gm.GetEventHandler(), gm.eventHandler)
return nil
}
func (gm *GinModule) eventHandler(ev event.IEvent) {
ginEvent := ev.(*GinEvent)
for _, handler := range ginEvent.handlersChain {
handler(&ginEvent.c)
}
//ginEvent.chanWait <- struct{}{}
}
func (gm *GinModule) Start() {
gm.srv.Addr = gm.listenAddr
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
go func() {
err := gm.srv.ListenAndServe()
if err != nil {
log.Error("ListenAndServe error", slog.Any("error", err.Error()))
}
}()
}
func (gm *GinModule) StartTLS(certFile, keyFile string) {
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
go func() {
err := gm.srv.ListenAndServeTLS(certFile, keyFile)
if err != nil {
log.Fatal("ListenAndServeTLS error", slog.Any("error", err.Error()))
}
}()
}
func (gm *GinModule) Stop(ctx context.Context) {
if err := gm.srv.Shutdown(ctx); err != nil {
log.SError("Server Shutdown", slog.Any("error", err))
}
}
type SafeContext struct {
*gin.Context
chanWait chan struct{}
}
func (c *SafeContext) JSONAndDone(code int, obj any) {
c.Context.JSON(code,obj)
c.Done()
}
func (c *SafeContext) AsciiJSONAndDone(code int, obj any){
c.Context.AsciiJSON(code,obj)
c.Done()
}
func (c *SafeContext) PureJSONAndDone(code int, obj any){
c.Context.PureJSON(code,obj)
c.Done()
}
func (c *SafeContext) XMLAndDone(code int, obj any){
c.Context.XML(code,obj)
c.Done()
}
func (c *SafeContext) YAMLAndDone(code int, obj any){
c.Context.YAML(code,obj)
c.Done()
}
func (c *SafeContext) TOMLAndDone(code int, obj any){
c.Context.TOML(code,obj)
c.Done()
}
func (c *SafeContext) ProtoBufAndDone(code int, obj any){
c.Context.ProtoBuf(code,obj)
c.Done()
}
func (c *SafeContext) StringAndDone(code int, format string, values ...any){
c.Context.String(code,format,values...)
c.Done()
}
func (c *SafeContext) RedirectAndDone(code int, location string){
c.Context.Redirect(code,location)
c.Done()
}
func (c *SafeContext) DataAndDone(code int, contentType string, data []byte){
c.Context.Data(code,contentType,data)
c.Done()
}
func (c *SafeContext) DataFromReaderAndDone(code int, contentLength int64, contentType string, reader io.Reader, extraHeaders map[string]string){
c.DataFromReader(code,contentLength,contentType,reader,extraHeaders)
c.Done()
}
func (c *SafeContext) HTMLAndDone(code int, name string, obj any){
c.Context.HTML(code,name,obj)
c.Done()
}
func (c *SafeContext) IndentedJSONAndDone(code int, obj any){
c.Context.IndentedJSON(code,obj)
c.Done()
}
func (c *SafeContext) SecureJSONAndDone(code int, obj any){
c.Context.SecureJSON(code,obj)
c.Done()
}
func (c *SafeContext) JSONPAndDone(code int, obj any){
c.Context.JSONP(code,obj)
c.Done()
}
func (c *SafeContext) Done(){
c.chanWait <- struct{}{}
}
type GinEvent struct {
handlersChain []SafeHandlerFunc
c SafeContext
}
type SafeHandlerFunc func(*SafeContext)
func (ge *GinEvent) GetEventType() event.EventType {
return event.Sys_Event_Gin_Event
}
func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.Engine.Handle(httpMethod, relativePath, func(c *gin.Context) {
for _, p := range gm.processor {
_, err := p.Process(c)
if err != nil {
return
}
}
var ev GinEvent
chanWait := make(chan struct{},2)
ev.c.chanWait = chanWait
ev.handlersChain = handlers
ev.c.Context = c
gm.NotifyEvent(&ev)
ctx,cancel := context.WithTimeout(context.Background(), gm.handleTimeout)
defer cancel()
select{
case <-ctx.Done():
log.Error("GinModule process timeout", slog.Any("path", c.Request.URL.Path))
c.AbortWithStatus(http.StatusRequestTimeout)
case <-chanWait:
}
})
}
// GET 回调处理是在gin协程中
func (gm *GinModule) GET(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.GET(relativePath, handlers...)
}
// POST 回调处理是在gin协程中
func (gm *GinModule) POST(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.POST(relativePath, handlers...)
}
// DELETE 回调处理是在gin协程中
func (gm *GinModule) DELETE(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.DELETE(relativePath, handlers...)
}
// PATCH 回调处理是在gin协程中
func (gm *GinModule) PATCH(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.PATCH(relativePath, handlers...)
}
// Put 回调处理是在gin协程中
func (gm *GinModule) Put(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
return gm.Engine.PUT(relativePath, handlers...)
}
// SafeGET 回调处理是在service协程中
func (gm *GinModule) SafeGET(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodGet, relativePath, handlers...)
}
// SafePOST 回调处理是在service协程中
func (gm *GinModule) SafePOST(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodPost, relativePath, handlers...)
}
// SafeDELETE 回调处理是在service协程中
func (gm *GinModule) SafeDELETE(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodDelete, relativePath, handlers...)
}
// SafePATCH 回调处理是在service协程中
func (gm *GinModule) SafePATCH(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodPatch, relativePath, handlers...)
}
// SafePut 回调处理是在service协程中
func (gm *GinModule) SafePut(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
return gm.handleMethod(http.MethodPut, relativePath, handlers...)
}
func GetIPWithProxyHeaders(c *gin.Context) string {
// 尝试从 X-Real-IP 头部获取真实 IP
ip := c.GetHeader("X-Real-IP")
// 如果 X-Real-IP 头部不存在,则尝试从 X-Forwarded-For 头部获取
if ip == "" {
ip = c.GetHeader("X-Forwarded-For")
}
// 如果两者都不存在,则使用默认的 ClientIP 方法获取 IP
if ip == "" {
ip = c.ClientIP()
}
return ip
}
func GetIPWithValidatedProxyHeaders(c *gin.Context) string {
// 获取代理头部
proxyHeaders := c.Request.Header.Get("X-Real-IP,X-Forwarded-For")
// 分割代理头部,取第一个 IP 作为真实 IP
ips := strings.Split(proxyHeaders, ",")
ip := strings.TrimSpace(ips[0])
// 如果 IP 格式合法,则使用获取到的 IP否则使用默认的 ClientIP 方法获取
if isValidIP(ip) {
return ip
} else {
ip = c.ClientIP()
return ip
}
}
// isValidIP 判断 IP 格式是否合法
func isValidIP(ip string) bool {
// 此处添加自定义的 IP 格式验证逻辑
// 例如,使用正则表达式验证 IP 格式
// ...
if ip == "" {
return false
}
return true
}

View File

@@ -0,0 +1,79 @@
package ginmodule
import (
"fmt"
"github.com/duanhf2012/origin/v2/log"
"github.com/gin-gonic/gin"
"time"
)
// Logger 是一个自定义的日志中间件
func Logger() gin.HandlerFunc {
return func(c *gin.Context) {
// 处理请求前记录日志
// 开始时间
startTime := time.Now()
// 调用该请求的剩余处理程序
c.Next()
// 结束时间
endTime := time.Now()
// 执行时间
latencyTime := endTime.Sub(startTime)
// 请求IP
clientIP := c.ClientIP()
// remoteIP := c.RemoteIP()
// 请求方式
reqMethod := c.Request.Method
// 请求路由
reqUri := c.Request.RequestURI
// 请求协议
reqProto := c.Request.Proto
// 请求来源
repReferer := c.Request.Referer()
// 请求UA
reqUA := c.Request.UserAgent()
// 请求响应内容长度
resLength := c.Writer.Size()
if resLength < 0 {
resLength = 0
}
// 响应状态码
statusCode := c.Writer.Status()
log.SDebug(fmt.Sprintf(
"%s | %3d | %s %10s | \033[44;37m%-6s\033[0m %s %s | %10v | \"%s\" \"%s\"",
colorForStatus(statusCode),
statusCode,
colorForStatus(0),
clientIP,
// remoteIP,
reqMethod,
reqUri,
reqProto,
latencyTime,
reqUA,
repReferer,
))
}
}
// colorForStatus 根据 HTTP 状态码返回 ANSI 颜色代码
func colorForStatus(code int) string {
switch {
case code >= 200 && code < 300:
return "\033[42;1;37m" // green
case code >= 300 && code < 400:
return "\033[34m" // blue
case code >= 400 && code < 500:
return "\033[33m" // yellow
case code == 0:
return "\033[0m" // cancel
default:
return "\033[31m" // red
}
}

View File

@@ -0,0 +1,65 @@
package kafkamodule
import "github.com/IBM/sarama"
type KafkaAdmin struct {
sarama.ClusterAdmin
mapTopic map[string]sarama.TopicDetail
}
func (ka *KafkaAdmin) Setup(kafkaVersion string, addrs []string) error {
config := sarama.NewConfig()
var err error
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
return err
}
ka.ClusterAdmin, err = sarama.NewClusterAdmin(addrs, config)
if err != nil {
return err
}
ka.mapTopic, err = ka.GetTopics()
if err != nil {
ka.ClusterAdmin.Close()
return err
}
return nil
}
func (ka *KafkaAdmin) RefreshTopic() error {
var err error
ka.mapTopic, err = ka.GetTopics()
return err
}
func (ka *KafkaAdmin) HasTopic(topic string) bool {
_, ok := ka.mapTopic[topic]
return ok
}
func (ka *KafkaAdmin) GetTopicDetail(topic string) *sarama.TopicDetail {
topicDetail, ok := ka.mapTopic[topic]
if ok == false {
return nil
}
return &topicDetail
}
func (ka *KafkaAdmin) GetTopics() (map[string]sarama.TopicDetail, error) {
return ka.ListTopics()
}
// CreateTopic 创建主题
// numPartitions分区数
// replicationFactor副本数
// validateOnly参数执行操作时只进行参数验证而不实际执行操作
func (ka *KafkaAdmin) CreateTopic(topic string, numPartitions int32, replicationFactor int16, validateOnly bool) error {
return ka.ClusterAdmin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: numPartitions, ReplicationFactor: replicationFactor}, validateOnly)
}

View File

@@ -0,0 +1,289 @@
package kafkamodule
import (
"context"
"fmt"
"github.com/IBM/sarama"
"github.com/duanhf2012/origin/v2/log"
"sync"
"time"
)
type ConsumerGroup struct {
sarama.ConsumerGroup
waitGroup sync.WaitGroup
chanExit chan error
ready chan bool
cancel context.CancelFunc
groupId string
}
func NewConsumerConfig(kafkaVersion string, assignor string, offsetsInitial int64) (*sarama.Config, error) {
var err error
config := sarama.NewConfig()
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
config.Consumer.Offsets.Initial = offsetsInitial
config.Consumer.Offsets.AutoCommit.Enable = false
switch assignor {
case "sticky":
// 黏性roundRobin,rebalance之后首先保证前面的分配,从后面剥离
// topic:T0{P0,P1,P2,P3,P4,P5},消费者:C1,C2
// ---------------before rebalance:即roundRobin
// C1: T0{P0} T0{P2} T0{P4}
// C2: T0{P1} T0{P3} T0{P5}
// ----------------after rebalance:增加了一个消费者
// C1: T0{P0} T0{P2}
// C2: T0{P1} T0{P3}
// C3: T0{P4} T0{P5} until每个消费者的分区数误差不超过1
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
case "roundrobin":
// roundRobin --逐个平均分发
// topic: T0{P0,P1,P2},T1{P0,P1,P2,P3}两个消费者C1,C2
// C1: T0{P0} T0{P2} T1{P1} T1{P3}
// C2: T0{P1} T1{P0} T1{P2}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
case "range":
// 默认值 --一次平均分发
// topic: T0{P0,P1,P2,P3},T1{P0,P1,P2,P3},两个消费者C1,C2
// T1分区总数6 / 消费者数2 = 3 ,即该会话的分区每个消费者分3个
// T2分区总数4 / 消费者数2 = 2 ,即该会话的分区每个消费者分2个
// C1: T0{P0, P1, P2} T1{P0, P1}
// C2: T0{P3, P4, P5} T1{P2, P3}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
default:
return nil, fmt.Errorf("Unrecognized consumer group partition assignor: %s", assignor)
}
if err != nil {
return nil, err
}
return config, nil
}
type IMsgReceiver interface {
Receiver(msgs []*sarama.ConsumerMessage) bool
}
func (c *ConsumerGroup) Setup(addr []string, topics []string, groupId string, config *sarama.Config, receiverInterval time.Duration, maxReceiverNum int, msgReceiver IMsgReceiver) error {
var err error
c.ConsumerGroup, err = sarama.NewConsumerGroup(addr, groupId, config)
if err != nil {
return nil
}
c.groupId = groupId
c.chanExit = make(chan error, 1)
var handler ConsumerGroupHandler
handler.receiver = msgReceiver
handler.maxReceiverNum = maxReceiverNum
handler.receiverInterval = receiverInterval
handler.chanExit = c.chanExit
var ctx context.Context
ctx, c.cancel = context.WithCancel(context.Background())
c.waitGroup.Add(1)
go func() {
defer c.waitGroup.Done()
for {
if err = c.Consume(ctx, topics, &handler); err != nil {
// 当setup失败的时候error会返回到这里
log.Error("Error from consumer", log.Any("err", err))
return
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
log.Info("consumer stop", log.Any("info", ctx.Err()))
}
c.chanExit <- err
}
}()
err = <-c.chanExit
//已经准备好了
return err
}
func (c *ConsumerGroup) Close() {
log.Info("close consumerGroup")
//1.cancel掉
c.cancel()
//2.关闭连接
err := c.ConsumerGroup.Close()
if err != nil {
log.Error("close consumerGroup fail", log.Any("err", err.Error()))
}
//3.等待退出
c.waitGroup.Wait()
}
type ConsumerGroupHandler struct {
receiver IMsgReceiver
receiverInterval time.Duration
maxReceiverNum int
//mapTopicOffset map[string]map[int32]int //map[topic]map[partitionId]offsetInfo
mapTopicData map[string]*MsgData
mx sync.Mutex
chanExit chan error
isRebalance bool //是否为再平衡
//stopSig *int32
}
type MsgData struct {
sync.Mutex
msg []*sarama.ConsumerMessage
mapPartitionOffset map[int32]int64
}
func (ch *ConsumerGroupHandler) Flush(session sarama.ConsumerGroupSession, topic string) {
if topic != "" {
msgData := ch.GetMsgData(topic)
msgData.flush(session, ch.receiver, topic)
return
}
for tp, msgData := range ch.mapTopicData {
msgData.flush(session, ch.receiver, tp)
}
}
func (ch *ConsumerGroupHandler) GetMsgData(topic string) *MsgData {
ch.mx.Lock()
defer ch.mx.Unlock()
msgData := ch.mapTopicData[topic]
if msgData == nil {
msgData = &MsgData{}
msgData.msg = make([]*sarama.ConsumerMessage, 0, ch.maxReceiverNum)
ch.mapTopicData[topic] = msgData
}
return msgData
}
func (md *MsgData) flush(session sarama.ConsumerGroupSession, receiver IMsgReceiver, topic string) {
if len(md.msg) == 0 {
return
}
//发送给接收者
for {
ok := receiver.Receiver(md.msg)
if ok == true {
break
}
}
for pId, offset := range md.mapPartitionOffset {
session.MarkOffset(topic, pId, offset+1, "")
log.Info(fmt.Sprintf("topic %s,pid %d,offset %d", topic, pId, offset+1))
}
session.Commit()
//log.Info("commit")
//time.Sleep(1000 * time.Second)
//置空
md.msg = md.msg[:0]
clear(md.mapPartitionOffset)
}
func (md *MsgData) appendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage, receiver IMsgReceiver, maxReceiverNum int) {
md.Lock()
defer md.Unlock()
//收到的offset只会越来越大在
if md.mapPartitionOffset == nil {
md.mapPartitionOffset = make(map[int32]int64, 10)
}
md.mapPartitionOffset[msg.Partition] = msg.Offset
md.msg = append(md.msg, msg)
if len(md.msg) < maxReceiverNum {
return
}
md.flush(session, receiver, msg.Topic)
}
func (ch *ConsumerGroupHandler) AppendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
dataMsg := ch.GetMsgData(msg.Topic)
dataMsg.appendMsg(session, msg, ch.receiver, ch.maxReceiverNum)
}
func (ch *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
ch.mapTopicData = make(map[string]*MsgData, 128)
if ch.isRebalance == false {
ch.chanExit <- nil
}
ch.isRebalance = true
return nil
}
func (ch *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
ch.Flush(session, "")
return nil
}
func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ticker := time.NewTicker(ch.receiverInterval)
for {
select {
case msg := <-claim.Messages():
if msg == nil {
log.SWarning("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
return nil
}
ch.AppendMsg(session, msg)
case <-ticker.C:
ch.Flush(session, claim.Topic())
case <-session.Context().Done():
return nil
}
}
}
/*
阿里云参数说明https://sls.aliyun.com/doc/oscompatibledemo/sarama_go_kafka_consume.html
conf.Net.TLS.Enable = true
conf.Net.SASL.Enable = true
conf.Net.SASL.User = project
conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
conf.Net.SASL.Mechanism = "PLAIN"
conf.Net.TLS.Enable = true
conf.Net.SASL.Enable = true
conf.Net.SASL.User = project
conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
conf.Net.SASL.Mechanism = "PLAIN"
conf.Consumer.Fetch.Min = 1
conf.Consumer.Fetch.Default = 1024 * 1024
conf.Consumer.Retry.Backoff = 2 * time.Second
conf.Consumer.MaxWaitTime = 250 * time.Millisecond
conf.Consumer.MaxProcessingTime = 100 * time.Millisecond
conf.Consumer.Return.Errors = false
conf.Consumer.Offsets.AutoCommit.Enable = true
conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
conf.Consumer.Offsets.Retry.Max = 3
*/

View File

@@ -0,0 +1,146 @@
package kafkamodule
import (
"context"
"github.com/IBM/sarama"
"github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/service"
"time"
)
type IProducer interface {
}
type SyncProducer struct {
}
type AsyncProducer struct {
}
type Producer struct {
service.Module
sarama.SyncProducer
sarama.AsyncProducer
}
// NewProducerConfig 新建producerConfig
// kafkaVersion kafka版本
// returnErrreturnSucc 是否返回错误与成功
// requiredAcks -1 #全量同步确认,强可靠性保证(当所有的 leader 和 follower 都接收成功时)#WaitForAll 1 #leader 确认收到, 默认(仅 leader 反馈)#WaitForLocal 0 #不确认,但是吞吐量大(不 care 结果) #NoResponse
// Idempotent幂等 确保信息都准确写入一份副本,用于幂等生产者当这一项设置为true的时候生产者将保证生产的消息一定是有序且精确一次的
// partitioner 生成分区器,用于选择向哪个分区发送信息,默认情况下对消息密钥进行散列
func NewProducerConfig(kafkaVersion string, returnErr bool, returnSucc bool, requiredAcks sarama.RequiredAcks, Idempotent bool,
partitioner sarama.PartitionerConstructor) (*sarama.Config, error) {
config := sarama.NewConfig()
var err error
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
return nil, err
}
config.Producer.Return.Errors = returnErr
config.Producer.Return.Successes = returnSucc
config.Producer.RequiredAcks = requiredAcks
config.Producer.Partitioner = partitioner
config.Producer.Timeout = 10 * time.Second
config.Producer.Idempotent = Idempotent
if Idempotent == true {
config.Net.MaxOpenRequests = 1
}
return config, nil
}
func (p *Producer) SyncSetup(addr []string, config *sarama.Config) error {
var err error
p.SyncProducer, err = sarama.NewSyncProducer(addr, config)
if err != nil {
return err
}
return nil
}
func (p *Producer) ASyncSetup(addr []string, config *sarama.Config) error {
var err error
p.AsyncProducer, err = sarama.NewAsyncProducer(addr, config)
if err != nil {
return err
}
go func() {
p.asyncRun()
}()
return nil
}
func (p *Producer) asyncRun() {
for {
select {
case sm := <-p.Successes():
if sm.Metadata == nil {
break
}
asyncReturn := sm.Metadata.(*AsyncReturn)
asyncReturn.chanReturn <- asyncReturn
case em := <-p.Errors():
log.Error("async kafkamodule error", log.ErrorAttr("err", em.Err))
if em.Msg.Metadata == nil {
break
}
asyncReturn := em.Msg.Metadata.(*AsyncReturn)
asyncReturn.Err = em.Err
asyncReturn.chanReturn <- asyncReturn
}
}
}
type AsyncReturn struct {
Msg *sarama.ProducerMessage
Err error
chanReturn chan *AsyncReturn
}
func (ar *AsyncReturn) WaitOk(ctx context.Context) (*sarama.ProducerMessage, error) {
asyncReturn := ar.Msg.Metadata.(*AsyncReturn)
select {
case <-asyncReturn.chanReturn:
return asyncReturn.Msg, asyncReturn.Err
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (p *Producer) AsyncSendMessage(msg *sarama.ProducerMessage) *AsyncReturn {
asyncReturn := AsyncReturn{Msg: msg, chanReturn: make(chan *AsyncReturn, 1)}
msg.Metadata = &asyncReturn
p.AsyncProducer.Input() <- msg
return &asyncReturn
}
func (p *Producer) AsyncPushMessage(msg *sarama.ProducerMessage) {
p.AsyncProducer.Input() <- msg
}
func (p *Producer) Close() {
if p.SyncProducer != nil {
p.SyncProducer.Close()
p.SyncProducer = nil
}
if p.AsyncProducer != nil {
p.AsyncProducer.Close()
p.AsyncProducer = nil
}
}
func (p *Producer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
return p.SyncProducer.SendMessage(msg)
}
func (p *Producer) SendMessages(msgs []*sarama.ProducerMessage) error {
return p.SyncProducer.SendMessages(msgs)
}

View File

@@ -0,0 +1,151 @@
package kafkamodule
import (
"context"
"fmt"
"github.com/IBM/sarama"
"testing"
"time"
)
// 对各参数和机制名称的说明https://blog.csdn.net/u013311345/article/details/129217728
type MsgReceiver struct {
t *testing.T
}
func (mr *MsgReceiver) Receiver(msgs []*sarama.ConsumerMessage) bool {
for _, m := range msgs {
mr.t.Logf("time:%s, topic:%s, partition:%d, offset:%d, key:%s, value:%s", time.Now().Format("2006-01-02 15:04:05.000"), m.Topic, m.Partition, m.Offset, m.Key, string(m.Value))
}
return true
}
var addr = []string{"192.168.13.24:9092", "192.168.13.24:9093", "192.168.13.24:9094", "192.168.13.24:9095"}
var topicName = []string{"test_topic_1", "test_topic_2"}
var kafkaVersion = "3.3.1"
func producer(t *testing.T) {
var admin KafkaAdmin
err := admin.Setup(kafkaVersion, addr)
if err != nil {
t.Fatal(err)
}
for _, tName := range topicName {
if admin.HasTopic(tName) == false {
err = admin.CreateTopic(tName, 2, 2, false)
t.Log(err)
}
}
var pd Producer
cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
if err != nil {
t.Fatal(err)
}
err = pd.SyncSetup(addr, cfg)
if err != nil {
t.Fatal(err)
}
now := time.Now()
//msgs := make([]*sarama.ProducerMessage, 0, 20000)
for i := 0; i < 20000; i++ {
var msg sarama.ProducerMessage
msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
msg.Topic = topicName[0]
msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
pd.SendMessage(&msg)
//msgs = append(msgs, &msg)
}
//err = pd.SendMessages(msgs)
//t.Log(err)
t.Log(time.Now().Sub(now).Milliseconds())
pd.Close()
}
func producer_async(t *testing.T) {
var admin KafkaAdmin
err := admin.Setup(kafkaVersion, addr)
if err != nil {
t.Fatal(err)
}
for _, tName := range topicName {
if admin.HasTopic(tName) == false {
err = admin.CreateTopic(tName, 10, 2, false)
t.Log(err)
}
}
var pd Producer
cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
if err != nil {
t.Fatal(err)
}
err = pd.ASyncSetup(addr, cfg)
if err != nil {
t.Fatal(err)
}
now := time.Now()
msgs := make([]*AsyncReturn, 0, 20000)
for i := 0; i < 200000; i++ {
var msg sarama.ProducerMessage
msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
msg.Topic = topicName[0]
msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
r := pd.AsyncSendMessage(&msg)
msgs = append(msgs, r)
}
//err = pd.SendMessages(msgs)
//t.Log(err)
for _, r := range msgs {
r.WaitOk(context.Background())
//t.Log(m, e)
}
t.Log(time.Now().Sub(now).Milliseconds())
time.Sleep(1000 * time.Second)
pd.Close()
}
func consumer(t *testing.T) {
var admin KafkaAdmin
err := admin.Setup(kafkaVersion, addr)
if err != nil {
t.Fatal(err)
}
for _, tName := range topicName {
if admin.HasTopic(tName) == false {
err = admin.CreateTopic(tName, 10, 2, false)
t.Log(err)
}
}
var cg ConsumerGroup
cfg, err := NewConsumerConfig(kafkaVersion, "sticky", sarama.OffsetOldest)
if err != nil {
t.Fatal(err)
}
err = cg.Setup(addr, topicName, "test_groupId", cfg, 50*time.Second, 10, &MsgReceiver{t: t})
t.Log(err)
time.Sleep(10000 * time.Second)
cg.Close()
}
func TestConsumerAndProducer(t *testing.T) {
producer_async(t)
//go producer(t)
//producer(t)
//consumer(t)
}

View File

@@ -0,0 +1,7 @@
package kafkamodule
type Sasl struct {
UserName string `json:"UserName"`
Passwd string `json:"Passwd"`
InstanceId string `json:"InstanceId"`
}

View File

@@ -1,181 +0,0 @@
package mongomodule
import (
"github.com/duanhf2012/origin/v2/log"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"sync"
"time"
"container/heap"
_ "gopkg.in/mgo.v2"
)
// session
type Session struct {
*mgo.Session
ref int
index int
}
type SessionHeap []*Session
func (h SessionHeap) Len() int {
return len(h)
}
func (h SessionHeap) Less(i, j int) bool {
return h[i].ref < h[j].ref
}
func (h SessionHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *SessionHeap) Push(s interface{}) {
s.(*Session).index = len(*h)
*h = append(*h, s.(*Session))
}
func (h *SessionHeap) Pop() interface{} {
l := len(*h)
s := (*h)[l-1]
s.index = -1
*h = (*h)[:l-1]
return s
}
type DialContext struct {
sync.Mutex
sessions SessionHeap
}
type MongoModule struct {
dailContext *DialContext
}
func (slf *MongoModule) Init(url string,sessionNum uint32,dialTimeout time.Duration, timeout time.Duration) error {
var err error
slf.dailContext, err = dialWithTimeout(url, sessionNum, dialTimeout, timeout)
return err
}
func (slf *MongoModule) Ref() *Session{
return slf.dailContext.Ref()
}
func (slf *MongoModule) UnRef(s *Session) {
slf.dailContext.UnRef(s)
}
// goroutine safe
func dialWithTimeout(url string, sessionNum uint32, dialTimeout time.Duration, timeout time.Duration) (*DialContext, error) {
if sessionNum <= 0 {
sessionNum = 100
log.Release("invalid sessionNum, reset to %v", sessionNum)
}
s, err := mgo.DialWithTimeout(url, dialTimeout)
if err != nil {
return nil, err
}
s.SetMode(mgo.Strong,true)
s.SetSyncTimeout(timeout)
s.SetSocketTimeout(timeout)
c := new(DialContext)
// sessions
c.sessions = make(SessionHeap, sessionNum)
c.sessions[0] = &Session{s, 0, 0}
for i := 1; i < int(sessionNum); i++ {
c.sessions[i] = &Session{s.New(), 0, i}
}
heap.Init(&c.sessions)
return c, nil
}
// goroutine safe
func (c *DialContext) Close() {
c.Lock()
for _, s := range c.sessions {
s.Close()
}
c.Unlock()
}
// goroutine safe
func (c *DialContext) Ref() *Session {
c.Lock()
s := c.sessions[0]
if s.ref == 0 {
s.Refresh()
}
s.ref++
heap.Fix(&c.sessions, 0)
c.Unlock()
return s
}
// goroutine safe
func (c *DialContext) UnRef(s *Session) {
if s == nil {
return
}
c.Lock()
s.ref--
heap.Fix(&c.sessions, s.index)
c.Unlock()
}
// goroutine safe
func (s *Session) EnsureCounter(db string, collection string, id string) error {
err := s.DB(db).C(collection).Insert(bson.M{
"_id": id,
"seq": 0,
})
if mgo.IsDup(err) {
return nil
} else {
return err
}
}
// goroutine safe
func (s *Session) NextSeq(db string, collection string, id string) (int, error) {
var res struct {
Seq int
}
_, err := s.DB(db).C(collection).FindId(id).Apply(mgo.Change{
Update: bson.M{"$inc": bson.M{"seq": 1}},
ReturnNew: true,
}, &res)
return res.Seq, err
}
// goroutine safe
func (s *Session) EnsureIndex(db string, collection string, key []string, bBackground bool) error {
return s.DB(db).C(collection).EnsureIndex(mgo.Index{
Key: key,
Unique: false,
Sparse: true,
Background: bBackground,
})
}
// goroutine safe
func (s *Session) EnsureUniqueIndex(db string, collection string, key []string, bBackground bool) error {
return s.DB(db).C(collection).EnsureIndex(mgo.Index{
Key: key,
Unique: true,
Sparse: true,
Background: bBackground,
})
}

View File

@@ -1,95 +0,0 @@
package mongomodule
import (
"fmt"
_ "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"testing"
"time"
)
type Student struct {
ID bson.ObjectId `bson:"_id"`
Name string `bson: "name"`
Age int `bson: "age"`
Sid string `bson: "sid"`
Status int `bson: "status"`
}
type StudentName struct {
Name string `bson: "name"`
}
func Test_Example(t *testing.T) {
module:=MongoModule{}
module.Init("mongodb://admin:123456@192.168.2.119:27017",100, 5*time.Second,5*time.Second)
// take session
s := module.Take()
c := s.DB("test2").C("t_student")
//2.定义对象
insertData := Student{
ID:bson.NewObjectId(),
Name: "seeta11",
Age: 35, //*^_^*
Sid: "s20180907",
Status: 1,
}
updateData := Student{
Name: "seeta11",
Age: 18,
Sid: "s20180907",
Status: 1,
}
//3.插入数据
err := c.Insert(&insertData)
//4.查找数据
selector := bson.M{"_id":bson.ObjectIdHex("5f25303e999c622d361989b0")}
m:=Student{}
err = c.Find(selector).One(&m)
//5.更新数据
//selector2 := bson.M{"_id":bson.ObjectIdHex("5f25303e999c622d361989b0")}
updateData.ID = bson.ObjectIdHex("5f25303e999c622d361989b0")
err = c.UpdateId(bson.ObjectIdHex("5f25303e999c622d361989b0"),&updateData)
if err != nil {
fmt.Print(err)
}
//6.删除数据
err = c.RemoveId(bson.ObjectIdHex("5f252f09999c622d36198951"))
if err != nil {
fmt.Print(err)
}
//7.序号自增
s.EnsureCounter("test2","t_student","5f252f09999c622d36198951")
for i := 0; i < 3; i++ {
id, err := s.NextSeq("test2", "t_student", "5f252f09999c622d36198951")
if err != nil {
fmt.Println(err)
return
}
fmt.Println(id)
}
//8.setoninsert使用
info,uErr := c.Upsert(bson.M{"_id":bson.ObjectIdHex("5f252f09999c622d36198951")},bson.M{
"$setOnInsert":bson.M{"Name":"setoninsert","Age":55}})
//9.修改部分数字数据
selector1 := bson.M{"_id":bson.ObjectIdHex("60473de655f1012e7453b369")}
update1 := bson.M{"$set":bson.M{"name":"xxxxx","age":1111}}
c.Update(selector1,update1)
fmt.Println(info,uErr)
}

View File

@@ -16,7 +16,7 @@ type CustomerSubscriber struct {
rpc.IRpcHandler
topic string
subscriber *Subscriber
fromNodeId int
fromNodeId string
callBackRpcMethod string
serviceName string
StartIndex uint64
@@ -37,7 +37,7 @@ const (
MethodLast SubscribeMethod = 1 //Last模式以该消费者上次记录的位置开始订阅
)
func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId string, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
cs.subscriber = ss
cs.fromNodeId = fromNodeId
cs.callBackRpcMethod = callBackRpcMethod
@@ -85,7 +85,7 @@ func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandle
}
// 开始订阅
func (cs *CustomerSubscriber) Subscribe(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
func (cs *CustomerSubscriber) Subscribe(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId string, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
err := cs.trySetSubscriberBaseInfo(rpcHandler, ss, topic, subscribeMethod, customerId, fromNodeId, callBackRpcMethod, startIndex, oneBatchQuantity)
if err != nil {
return err

View File

@@ -122,5 +122,5 @@ func (ms *MessageQueueService) RPC_Publish(inParam *rpc.DBQueuePublishReq, outPa
func (ms *MessageQueueService) RPC_Subscribe(req *rpc.DBQueueSubscribeReq, res *rpc.DBQueueSubscribeRes) error {
topicRoom := ms.GetTopicRoom(req.TopicName)
return topicRoom.TopicSubscribe(ms.GetRpcHandler(), req.SubType, int32(req.Method), int(req.FromNodeId), req.RpcMethod, req.TopicName, req.CustomerId, req.StartIndex, req.OneBatchQuantity)
return topicRoom.TopicSubscribe(ms.GetRpcHandler(), req.SubType, int32(req.Method), req.FromNodeId, req.RpcMethod, req.TopicName, req.CustomerId, req.StartIndex, req.OneBatchQuantity)
}

View File

@@ -31,7 +31,7 @@ func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCo
return ss.dataPersist.PersistTopicData(topic, topics, retryCount)
}
func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, subscribeMethod SubscribeMethod, fromNodeId int, callBackRpcMethod string, topic string, customerId string, StartIndex uint64, oneBatchQuantity int32) error {
func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, subscribeMethod SubscribeMethod, fromNodeId string, callBackRpcMethod string, topic string, customerId string, StartIndex uint64, oneBatchQuantity int32) error {
//取消订阅时
if subScribeType == rpc.SubscribeType_Unsubscribe {
ss.UnSubscribe(customerId)

View File

@@ -263,7 +263,7 @@ func (mp *MongoPersist) JugeTimeoutSave() bool{
func (mp *MongoPersist) persistCoroutine(){
defer mp.waitGroup.Done()
for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){
for atomic.LoadInt32(&mp.stop)==0 {
//间隔时间sleep
time.Sleep(time.Second*1)

View File

@@ -129,7 +129,7 @@ func (slf *WSClient) Run() {
for{
bytes,err := slf.wsConn.ReadMsg()
if err != nil {
log.Debug("read client id %d is error:%+v",slf.id,err)
log.Debug("read client id %s is error:%+v",slf.id,err)
break
}
data,err:=slf.wsService.process.Unmarshal(slf.id,bytes)
@@ -153,7 +153,7 @@ func (ws *WSService) SendMsg(clientid string,msg interface{}) error{
client,ok := ws.mapClient[clientid]
if ok == false{
ws.mapClientLocker.Unlock()
return fmt.Errorf("client %d is disconnect!",clientid)
return fmt.Errorf("client %s is disconnect!",clientid)
}
ws.mapClientLocker.Unlock()