mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-06 16:14:45 +08:00
Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d1935b1bbc | ||
|
|
90d54bf3e2 | ||
|
|
78cc33c84e | ||
|
|
9cf21bf418 | ||
|
|
c6d0bd9a19 | ||
|
|
61bf95e457 | ||
|
|
8b2a551ee5 | ||
|
|
927c2ffa37 | ||
|
|
b23b30aac5 | ||
|
|
03f8ba0316 | ||
|
|
277480a7f0 | ||
|
|
647a654a36 | ||
|
|
de483a88f1 | ||
|
|
bbbb511b5f | ||
|
|
0489ee3ef4 | ||
|
|
692dacda0c | ||
|
|
7f86b1007d |
@@ -8,6 +8,8 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"github.com/duanhf2012/origin/v2/event"
|
||||
"errors"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
var configDir = "./config/"
|
||||
@@ -54,7 +56,6 @@ type Cluster struct {
|
||||
|
||||
discoveryInfo DiscoveryInfo //服务发现配置
|
||||
rpcMode RpcMode
|
||||
//masterDiscoveryNodeList []NodeInfo //配置发现Master结点
|
||||
globalCfg interface{} //全局配置
|
||||
|
||||
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
|
||||
@@ -63,6 +64,7 @@ type Cluster struct {
|
||||
locker sync.RWMutex //结点与服务关系保护锁
|
||||
mapRpc map[string]*NodeRpcInfo //nodeId
|
||||
mapServiceNode map[string]map[string]struct{} //map[serviceName]map[NodeId]
|
||||
mapTemplateServiceNode map[string]map[string]struct{} //map[templateServiceName]map[serviceName]nodeId
|
||||
|
||||
callSet rpc.CallSet
|
||||
rpcNats rpc.RpcNats
|
||||
@@ -70,7 +72,6 @@ type Cluster struct {
|
||||
|
||||
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
|
||||
mapServiceListenRpcEvent map[string]struct{} //ServiceName
|
||||
mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName
|
||||
}
|
||||
|
||||
func GetCluster() *Cluster {
|
||||
@@ -139,6 +140,20 @@ func (cls *Cluster) delServiceNode(serviceName string, nodeId string) {
|
||||
return
|
||||
}
|
||||
|
||||
//处理模板服务
|
||||
splitServiceName := strings.Split(serviceName,":")
|
||||
if len(splitServiceName) == 2 {
|
||||
serviceName = splitServiceName[0]
|
||||
templateServiceName := splitServiceName[1]
|
||||
|
||||
mapService := cls.mapTemplateServiceNode[templateServiceName]
|
||||
delete(mapService,serviceName)
|
||||
|
||||
if len(cls.mapTemplateServiceNode[templateServiceName]) == 0 {
|
||||
delete(cls.mapTemplateServiceNode,templateServiceName)
|
||||
}
|
||||
}
|
||||
|
||||
mapNode := cls.mapServiceNode[serviceName]
|
||||
delete(mapNode, nodeId)
|
||||
if len(mapNode) == 0 {
|
||||
@@ -173,7 +188,20 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
|
||||
continue
|
||||
}
|
||||
mapDuplicate[serviceName] = nil
|
||||
if _, ok := cls.mapServiceNode[serviceName]; ok == false {
|
||||
|
||||
//如果是模板服务,则记录模板关系
|
||||
splitServiceName := strings.Split(serviceName,":")
|
||||
if len(splitServiceName) == 2 {
|
||||
serviceName = splitServiceName[0]
|
||||
templateServiceName := splitServiceName[1]
|
||||
//记录模板
|
||||
if _, ok = cls.mapTemplateServiceNode[templateServiceName]; ok == false {
|
||||
cls.mapTemplateServiceNode[templateServiceName]=map[string]struct{}{}
|
||||
}
|
||||
cls.mapTemplateServiceNode[templateServiceName][serviceName] = struct{}{}
|
||||
}
|
||||
|
||||
if _, ok = cls.mapServiceNode[serviceName]; ok == false {
|
||||
cls.mapServiceNode[serviceName] = make(map[string]struct{}, 1)
|
||||
}
|
||||
cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{}
|
||||
@@ -228,8 +256,6 @@ func (cls *Cluster) Init(localNodeId string, setupServiceFun SetupServiceFun) er
|
||||
}
|
||||
service.RegRpcEventFun = cls.RegRpcEvent
|
||||
service.UnRegRpcEventFun = cls.UnRegRpcEvent
|
||||
service.RegDiscoveryServiceEventFun = cls.RegDiscoveryEvent
|
||||
service.UnRegDiscoveryServiceEventFun = cls.UnReDiscoveryEvent
|
||||
|
||||
err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo)
|
||||
if err != nil {
|
||||
@@ -263,25 +289,29 @@ func (cls *Cluster) GetRpcClient(nodeId string) (*rpc.Client,bool) {
|
||||
return cls.getRpcClient(nodeId)
|
||||
}
|
||||
|
||||
func GetRpcClient(nodeId string, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, int) {
|
||||
func GetNodeIdByTemplateService(templateServiceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
|
||||
return GetCluster().GetNodeIdByTemplateService(templateServiceName, rpcClientList, filterRetire)
|
||||
}
|
||||
|
||||
func GetRpcClient(nodeId string, serviceMethod string,filterRetire bool, clientList []*rpc.Client) (error, []*rpc.Client) {
|
||||
if nodeId != rpc.NodeIdNull {
|
||||
pClient,retire := GetCluster().GetRpcClient(nodeId)
|
||||
if pClient == nil {
|
||||
return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
|
||||
return fmt.Errorf("cannot find nodeid %d!", nodeId), nil
|
||||
}
|
||||
|
||||
//如果需要筛选掉退休结点
|
||||
if filterRetire == true && retire == true {
|
||||
return fmt.Errorf("cannot find nodeid %d!", nodeId), 0
|
||||
return fmt.Errorf("cannot find nodeid %d!", nodeId), nil
|
||||
}
|
||||
|
||||
clientList[0] = pClient
|
||||
return nil, 1
|
||||
clientList = append(clientList,pClient)
|
||||
return nil, clientList
|
||||
}
|
||||
|
||||
findIndex := strings.Index(serviceMethod, ".")
|
||||
if findIndex == -1 {
|
||||
return fmt.Errorf("servicemethod param %s is error!", serviceMethod), 0
|
||||
return fmt.Errorf("servicemethod param %s is error!", serviceMethod), nil
|
||||
}
|
||||
serviceName := serviceMethod[:findIndex]
|
||||
|
||||
@@ -322,23 +352,12 @@ func (cls *Cluster) NotifyAllService(event event.IEvent){
|
||||
}
|
||||
|
||||
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId string, serviceName []string) {
|
||||
cls.rpcEventLocker.Lock()
|
||||
defer cls.rpcEventLocker.Unlock()
|
||||
|
||||
for sName, _ := range cls.mapServiceListenDiscoveryEvent {
|
||||
ser := service.GetService(sName)
|
||||
if ser == nil {
|
||||
log.Error("cannot find service",log.Any("services",serviceName))
|
||||
continue
|
||||
}
|
||||
|
||||
var eventData service.DiscoveryServiceEvent
|
||||
eventData.IsDiscovery = bDiscovery
|
||||
eventData.NodeId = nodeId
|
||||
eventData.ServiceName = serviceName
|
||||
ser.(service.IModule).NotifyEvent(&eventData)
|
||||
}
|
||||
var eventData service.DiscoveryServiceEvent
|
||||
eventData.IsDiscovery = bDiscovery
|
||||
eventData.NodeId = nodeId
|
||||
eventData.ServiceName = serviceName
|
||||
|
||||
cls.NotifyAllService(&eventData)
|
||||
}
|
||||
|
||||
func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
|
||||
@@ -361,25 +380,6 @@ func (cls *Cluster) UnRegRpcEvent(serviceName string) {
|
||||
cls.rpcEventLocker.Unlock()
|
||||
}
|
||||
|
||||
|
||||
func (cls *Cluster) RegDiscoveryEvent(serviceName string) {
|
||||
cls.rpcEventLocker.Lock()
|
||||
if cls.mapServiceListenDiscoveryEvent == nil {
|
||||
cls.mapServiceListenDiscoveryEvent = map[string]struct{}{}
|
||||
}
|
||||
|
||||
cls.mapServiceListenDiscoveryEvent[serviceName] = struct{}{}
|
||||
cls.rpcEventLocker.Unlock()
|
||||
}
|
||||
|
||||
func (cls *Cluster) UnReDiscoveryEvent(serviceName string) {
|
||||
cls.rpcEventLocker.Lock()
|
||||
delete(cls.mapServiceListenDiscoveryEvent, serviceName)
|
||||
cls.rpcEventLocker.Unlock()
|
||||
}
|
||||
|
||||
|
||||
|
||||
func HasService(nodeId string, serviceName string) bool {
|
||||
cluster.locker.RLock()
|
||||
defer cluster.locker.RUnlock()
|
||||
@@ -410,10 +410,50 @@ func GetNodeByServiceName(serviceName string) map[string]struct{} {
|
||||
return mapNodeId
|
||||
}
|
||||
|
||||
// GetNodeByTemplateServiceName 通过模板服务名获取服务名,返回 map[serviceName真实服务名]NodeId
|
||||
func GetNodeByTemplateServiceName(templateServiceName string) map[string]string {
|
||||
cluster.locker.RLock()
|
||||
defer cluster.locker.RUnlock()
|
||||
|
||||
mapServiceName := cluster.mapTemplateServiceNode[templateServiceName]
|
||||
mapNodeId := make(map[string]string,9)
|
||||
for serviceName := range mapServiceName {
|
||||
mapNode, ok := cluster.mapServiceNode[serviceName]
|
||||
if ok == false {
|
||||
return nil
|
||||
}
|
||||
|
||||
for nodeId,_ := range mapNode {
|
||||
mapNodeId[serviceName] = nodeId
|
||||
}
|
||||
}
|
||||
|
||||
return mapNodeId
|
||||
}
|
||||
|
||||
func (cls *Cluster) GetGlobalCfg() interface{} {
|
||||
return cls.globalCfg
|
||||
}
|
||||
|
||||
|
||||
func (cls *Cluster) ParseGlobalCfg(cfg interface{}) error{
|
||||
if cls.globalCfg == nil {
|
||||
return errors.New("no service configuration found")
|
||||
}
|
||||
|
||||
rv := reflect.ValueOf(cls.globalCfg)
|
||||
if rv.Kind() == reflect.Ptr && rv.IsNil() {
|
||||
return errors.New("no service configuration found")
|
||||
}
|
||||
|
||||
bytes,err := json.Marshal(cls.globalCfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return json.Unmarshal(bytes,cfg)
|
||||
}
|
||||
|
||||
func (cls *Cluster) GetNodeInfo(nodeId string) (NodeInfo,bool) {
|
||||
cls.locker.RLock()
|
||||
defer cls.locker.RUnlock()
|
||||
|
||||
@@ -183,6 +183,10 @@ func (ds *OriginDiscoveryMaster) OnNodeDisconnect(nodeId string) {
|
||||
|
||||
func (ds *OriginDiscoveryMaster) RpcCastGo(serviceMethod string, args interface{}) {
|
||||
for nodeId, _ := range ds.mapNodeInfo {
|
||||
if nodeId == cluster.GetLocalNodeInfo().NodeId {
|
||||
continue
|
||||
}
|
||||
|
||||
ds.GoNode(nodeId, serviceMethod, args)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -270,7 +270,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
|
||||
}
|
||||
|
||||
if nodeId != rpc.NodeIdNull && (len(nodeInfoList) != 1) {
|
||||
return discoveryInfo, nil,rpcMode, fmt.Errorf("%d configurations were found for the configuration with node ID %d!", len(nodeInfoList), nodeId)
|
||||
return discoveryInfo, nil,rpcMode, fmt.Errorf("nodeid %s configuration error in NodeList", nodeId)
|
||||
}
|
||||
|
||||
for i, _ := range nodeInfoList {
|
||||
@@ -325,6 +325,10 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
||||
//保存公共配置
|
||||
for _, s := range cls.localNodeInfo.ServiceList {
|
||||
for {
|
||||
splitServiceName := strings.Split(s,":")
|
||||
if len(splitServiceName) == 2 {
|
||||
s = splitServiceName[0]
|
||||
}
|
||||
//取公共服务配置
|
||||
pubCfg, ok := serviceConfig[s]
|
||||
if ok == true {
|
||||
@@ -355,6 +359,11 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
|
||||
|
||||
//组合所有的配置
|
||||
for _, s := range cls.localNodeInfo.ServiceList {
|
||||
splitServiceName := strings.Split(s,":")
|
||||
if len(splitServiceName) == 2 {
|
||||
s = splitServiceName[0]
|
||||
}
|
||||
|
||||
//先从NodeService中找
|
||||
var serviceCfg interface{}
|
||||
var ok bool
|
||||
@@ -382,12 +391,24 @@ func (cls *Cluster) parseLocalCfg() {
|
||||
|
||||
cls.mapRpc[cls.localNodeInfo.NodeId] = &rpcInfo
|
||||
|
||||
for _, sName := range cls.localNodeInfo.ServiceList {
|
||||
if _, ok := cls.mapServiceNode[sName]; ok == false {
|
||||
cls.mapServiceNode[sName] = make(map[string]struct{})
|
||||
for _, serviceName := range cls.localNodeInfo.ServiceList {
|
||||
splitServiceName := strings.Split(serviceName,":")
|
||||
if len(splitServiceName) == 2 {
|
||||
serviceName = splitServiceName[0]
|
||||
templateServiceName := splitServiceName[1]
|
||||
//记录模板
|
||||
if _, ok := cls.mapTemplateServiceNode[templateServiceName]; ok == false {
|
||||
cls.mapTemplateServiceNode[templateServiceName]=map[string]struct{}{}
|
||||
}
|
||||
cls.mapTemplateServiceNode[templateServiceName][serviceName] = struct{}{}
|
||||
}
|
||||
|
||||
cls.mapServiceNode[sName][cls.localNodeInfo.NodeId] = struct{}{}
|
||||
|
||||
if _, ok := cls.mapServiceNode[serviceName]; ok == false {
|
||||
cls.mapServiceNode[serviceName] = make(map[string]struct{})
|
||||
}
|
||||
|
||||
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -403,6 +424,7 @@ func (cls *Cluster) InitCfg(localNodeId string) error {
|
||||
cls.localServiceCfg = map[string]interface{}{}
|
||||
cls.mapRpc = map[string]*NodeRpcInfo{}
|
||||
cls.mapServiceNode = map[string]map[string]struct{}{}
|
||||
cls.mapTemplateServiceNode = map[string]map[string]struct{}{}
|
||||
|
||||
//加载本地结点的NodeList配置
|
||||
discoveryInfo, nodeInfoList,rpcMode, err := cls.readLocalClusterConfig(localNodeId)
|
||||
@@ -436,12 +458,37 @@ func (cls *Cluster) IsConfigService(serviceName string) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
func (cls *Cluster) GetNodeIdByTemplateService(templateServiceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
|
||||
cls.locker.RLock()
|
||||
defer cls.locker.RUnlock()
|
||||
|
||||
func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, int) {
|
||||
mapServiceName := cls.mapTemplateServiceNode[templateServiceName]
|
||||
for serviceName := range mapServiceName {
|
||||
mapNodeId, ok := cls.mapServiceNode[serviceName]
|
||||
if ok == true {
|
||||
for nodeId, _ := range mapNodeId {
|
||||
pClient,retire := GetCluster().getRpcClient(nodeId)
|
||||
if pClient == nil || pClient.IsConnected() == false {
|
||||
continue
|
||||
}
|
||||
|
||||
//如果需要筛选掉退休的,对retire状态的结点略过
|
||||
if filterRetire == true && retire == true {
|
||||
continue
|
||||
}
|
||||
|
||||
rpcClientList = append(rpcClientList,pClient)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, rpcClientList
|
||||
}
|
||||
|
||||
func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
|
||||
cls.locker.RLock()
|
||||
defer cls.locker.RUnlock()
|
||||
mapNodeId, ok := cls.mapServiceNode[serviceName]
|
||||
count := 0
|
||||
if ok == true {
|
||||
for nodeId, _ := range mapNodeId {
|
||||
pClient,retire := GetCluster().getRpcClient(nodeId)
|
||||
@@ -454,15 +501,11 @@ func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.
|
||||
continue
|
||||
}
|
||||
|
||||
rpcClientList[count] = pClient
|
||||
count++
|
||||
if count >= cap(rpcClientList) {
|
||||
break
|
||||
}
|
||||
rpcClientList = append(rpcClientList,pClient)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, count
|
||||
return nil, rpcClientList
|
||||
}
|
||||
|
||||
func (cls *Cluster) GetServiceCfg(serviceName string) interface{} {
|
||||
|
||||
@@ -236,7 +236,6 @@ func (processor *EventProcessor) castEvent(event IEvent){
|
||||
|
||||
eventProcessor,ok := processor.mapListenerEvent[event.GetEventType()]
|
||||
if ok == false || processor == nil{
|
||||
log.Debug("event is not listen",log.Int("event type",int(event.GetEventType())))
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ const (
|
||||
Sys_Event_QueueTaskFinish EventType = -10
|
||||
Sys_Event_Retire EventType = -11
|
||||
Sys_Event_EtcdDiscovery EventType = -12
|
||||
Sys_Event_Gin_Event EventType = -13
|
||||
|
||||
Sys_Event_User_Define EventType = 1
|
||||
)
|
||||
|
||||
69
log/log.go
69
log/log.go
@@ -22,7 +22,10 @@ var LogSize int64
|
||||
var LogChannelCap int
|
||||
var LogPath string
|
||||
var LogLevel slog.Level = LevelTrace
|
||||
|
||||
|
||||
var gLogger, _ = NewTextLogger(LevelDebug, "", "",true,LogChannelCap)
|
||||
var isSetLogger bool
|
||||
var memPool = bytespool.NewMemAreaPool()
|
||||
|
||||
// levels
|
||||
@@ -37,6 +40,21 @@ const (
|
||||
LevelFatal = slog.Level(20)
|
||||
)
|
||||
|
||||
type ILogger interface {
|
||||
Trace(msg string, args ...any)
|
||||
Debug(msg string, args ...any)
|
||||
Info(msg string, args ...any)
|
||||
Warning(msg string, args ...any)
|
||||
Error(msg string, args ...any)
|
||||
Stack(msg string, args ...any)
|
||||
Dump(msg string, args ...any)
|
||||
Fatal(msg string, args ...any)
|
||||
|
||||
DoSPrintf(level slog.Level,a []interface{})
|
||||
FormatHeader(buf *Buffer,level slog.Level,calldepth int)
|
||||
Close()
|
||||
}
|
||||
|
||||
type Logger struct {
|
||||
Slogger *slog.Logger
|
||||
|
||||
@@ -47,7 +65,6 @@ type Logger struct {
|
||||
|
||||
type IoWriter struct {
|
||||
outFile io.Writer // destination for output
|
||||
outConsole io.Writer //os.Stdout
|
||||
writeBytes int64
|
||||
logChannel chan []byte
|
||||
wg sync.WaitGroup
|
||||
@@ -122,8 +139,8 @@ func (iw *IoWriter) Write(p []byte) (n int, err error){
|
||||
func (iw *IoWriter) writeIo(p []byte) (n int, err error){
|
||||
n,err = iw.writeFile(p)
|
||||
|
||||
if iw.outConsole != nil {
|
||||
n,err = iw.outConsole.Write(p)
|
||||
if OpenConsole {
|
||||
n,err = os.Stdout.Write(p)
|
||||
}
|
||||
|
||||
return
|
||||
@@ -217,17 +234,12 @@ func (iw *IoWriter) swichFile() error{
|
||||
iw.fileDay = now.Day()
|
||||
iw.fileCreateTime = now.Unix()
|
||||
atomic.StoreInt64(&iw.writeBytes,0)
|
||||
if OpenConsole == true {
|
||||
iw.outConsole = os.Stdout
|
||||
}
|
||||
}else{
|
||||
iw.outConsole = os.Stdout
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (*Logger,error){
|
||||
func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (ILogger,error){
|
||||
var logger Logger
|
||||
logger.ioWriter.filePath = pathName
|
||||
logger.ioWriter.fileprefix = filePrefix
|
||||
@@ -242,7 +254,7 @@ func NewTextLogger(level slog.Level,pathName string,filePrefix string,addSource
|
||||
return &logger,nil
|
||||
}
|
||||
|
||||
func NewJsonLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (*Logger,error){
|
||||
func NewJsonLogger(level slog.Level,pathName string,filePrefix string,addSource bool,logChannelCap int) (ILogger,error){
|
||||
var logger Logger
|
||||
logger.ioWriter.filePath = pathName
|
||||
logger.ioWriter.fileprefix = filePrefix
|
||||
@@ -296,13 +308,18 @@ func (logger *Logger) Fatal(msg string, args ...any) {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// It's dangerous to call the method on logging
|
||||
func Export(logger *Logger) {
|
||||
if logger != nil {
|
||||
// It's non-thread-safe
|
||||
func SetLogger(logger ILogger) {
|
||||
if logger != nil && isSetLogger == false {
|
||||
gLogger = logger
|
||||
isSetLogger = true
|
||||
}
|
||||
}
|
||||
|
||||
func GetLogger() ILogger{
|
||||
return gLogger
|
||||
}
|
||||
|
||||
func Trace(msg string, args ...any){
|
||||
gLogger.Trace(msg, args...)
|
||||
}
|
||||
@@ -415,7 +432,7 @@ func Group(key string, args ...any) slog.Attr {
|
||||
return slog.Group(key, args...)
|
||||
}
|
||||
|
||||
func (logger *Logger) doSPrintf(level slog.Level,a []interface{}) {
|
||||
func (logger *Logger) DoSPrintf(level slog.Level,a []interface{}) {
|
||||
if logger.Slogger.Enabled(context.Background(),level) == false{
|
||||
return
|
||||
}
|
||||
@@ -425,7 +442,7 @@ func (logger *Logger) doSPrintf(level slog.Level,a []interface{}) {
|
||||
|
||||
logger.sBuff.Reset()
|
||||
|
||||
logger.formatHeader(&logger.sBuff,level,3)
|
||||
logger.FormatHeader(&logger.sBuff,level,3)
|
||||
|
||||
for _,s := range a {
|
||||
logger.sBuff.AppendString(slog.AnyValue(s).String())
|
||||
@@ -435,46 +452,46 @@ func (logger *Logger) doSPrintf(level slog.Level,a []interface{}) {
|
||||
}
|
||||
|
||||
func (logger *Logger) STrace(a ...interface{}) {
|
||||
logger.doSPrintf(LevelTrace,a)
|
||||
logger.DoSPrintf(LevelTrace,a)
|
||||
}
|
||||
|
||||
func (logger *Logger) SDebug(a ...interface{}) {
|
||||
logger.doSPrintf(LevelDebug,a)
|
||||
logger.DoSPrintf(LevelDebug,a)
|
||||
}
|
||||
|
||||
func (logger *Logger) SInfo(a ...interface{}) {
|
||||
logger.doSPrintf(LevelInfo,a)
|
||||
logger.DoSPrintf(LevelInfo,a)
|
||||
}
|
||||
|
||||
func (logger *Logger) SWarning(a ...interface{}) {
|
||||
logger.doSPrintf(LevelWarning,a)
|
||||
logger.DoSPrintf(LevelWarning,a)
|
||||
}
|
||||
|
||||
func (logger *Logger) SError(a ...interface{}) {
|
||||
logger.doSPrintf(LevelError,a)
|
||||
logger.DoSPrintf(LevelError,a)
|
||||
}
|
||||
|
||||
func STrace(a ...interface{}) {
|
||||
gLogger.doSPrintf(LevelTrace,a)
|
||||
gLogger.DoSPrintf(LevelTrace,a)
|
||||
}
|
||||
|
||||
func SDebug(a ...interface{}) {
|
||||
gLogger.doSPrintf(LevelDebug,a)
|
||||
gLogger.DoSPrintf(LevelDebug,a)
|
||||
}
|
||||
|
||||
func SInfo(a ...interface{}) {
|
||||
gLogger.doSPrintf(LevelInfo,a)
|
||||
gLogger.DoSPrintf(LevelInfo,a)
|
||||
}
|
||||
|
||||
func SWarning(a ...interface{}) {
|
||||
gLogger.doSPrintf(LevelWarning,a)
|
||||
gLogger.DoSPrintf(LevelWarning,a)
|
||||
}
|
||||
|
||||
func SError(a ...interface{}) {
|
||||
gLogger.doSPrintf(LevelError,a)
|
||||
gLogger.DoSPrintf(LevelError,a)
|
||||
}
|
||||
|
||||
func (logger *Logger) formatHeader(buf *Buffer,level slog.Level,calldepth int) {
|
||||
func (logger *Logger) FormatHeader(buf *Buffer,level slog.Level,calldepth int) {
|
||||
t := time.Now()
|
||||
var file string
|
||||
var line int
|
||||
|
||||
@@ -10,9 +10,9 @@ type RawMessageInfo struct {
|
||||
msgHandler RawMessageHandler
|
||||
}
|
||||
|
||||
type RawMessageHandler func(clientId uint64,packType uint16,msg []byte)
|
||||
type RawConnectHandler func(clientId uint64)
|
||||
type UnknownRawMessageHandler func(clientId uint64,msg []byte)
|
||||
type RawMessageHandler func(clientId string,packType uint16,msg []byte)
|
||||
type RawConnectHandler func(clientId string)
|
||||
type UnknownRawMessageHandler func(clientId string,msg []byte)
|
||||
|
||||
const RawMsgTypeSize = 2
|
||||
type PBRawProcessor struct {
|
||||
@@ -38,14 +38,14 @@ func (pbRawProcessor *PBRawProcessor) SetByteOrder(littleEndian bool) {
|
||||
}
|
||||
|
||||
// must goroutine safe
|
||||
func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId uint64, msg interface{}) error{
|
||||
func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId string, msg interface{}) error{
|
||||
pPackInfo := msg.(*PBRawPackInfo)
|
||||
pbRawProcessor.msgHandler(clientId,pPackInfo.typ,pPackInfo.rawMsg)
|
||||
return nil
|
||||
}
|
||||
|
||||
// must goroutine safe
|
||||
func (pbRawProcessor *PBRawProcessor ) Unmarshal(clientId uint64,data []byte) (interface{}, error) {
|
||||
func (pbRawProcessor *PBRawProcessor ) Unmarshal(clientId string,data []byte) (interface{}, error) {
|
||||
var msgType uint16
|
||||
if pbRawProcessor.LittleEndian == true {
|
||||
msgType = binary.LittleEndian.Uint16(data[:2])
|
||||
@@ -57,7 +57,7 @@ func (pbRawProcessor *PBRawProcessor ) Unmarshal(clientId uint64,data []byte) (i
|
||||
}
|
||||
|
||||
// must goroutine safe
|
||||
func (pbRawProcessor *PBRawProcessor ) Marshal(clientId uint64,msg interface{}) ([]byte, error){
|
||||
func (pbRawProcessor *PBRawProcessor ) Marshal(clientId string,msg interface{}) ([]byte, error){
|
||||
pMsg := msg.(*PBRawPackInfo)
|
||||
|
||||
buff := make([]byte, 2, len(pMsg.rawMsg)+RawMsgTypeSize)
|
||||
@@ -80,7 +80,7 @@ func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte,pbRaw
|
||||
pbRawPackInfo.rawMsg = msg
|
||||
}
|
||||
|
||||
func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){
|
||||
func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId string,msg interface{}){
|
||||
if pbRawProcessor.unknownMessageHandler == nil {
|
||||
return
|
||||
}
|
||||
@@ -88,11 +88,11 @@ func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId uint64,msg interf
|
||||
}
|
||||
|
||||
// connect event
|
||||
func (pbRawProcessor *PBRawProcessor) ConnectedRoute(clientId uint64){
|
||||
func (pbRawProcessor *PBRawProcessor) ConnectedRoute(clientId string){
|
||||
pbRawProcessor.connectHandler(clientId)
|
||||
}
|
||||
|
||||
func (pbRawProcessor *PBRawProcessor) DisConnectedRoute(clientId uint64){
|
||||
func (pbRawProcessor *PBRawProcessor) DisConnectedRoute(clientId string){
|
||||
pbRawProcessor.disconnectHandler(clientId)
|
||||
}
|
||||
|
||||
|
||||
36
node/node.go
36
node/node.go
@@ -25,6 +25,7 @@ import (
|
||||
var sig chan os.Signal
|
||||
var nodeId string
|
||||
var preSetupService []service.IService //预安装
|
||||
var preSetupTemplateService []func()service.IService
|
||||
var profilerInterval time.Duration
|
||||
var bValid bool
|
||||
var configDir = "./config/"
|
||||
@@ -117,7 +118,7 @@ func setConfigPath(val interface{}) error {
|
||||
}
|
||||
|
||||
func getRunProcessPid(nodeId string) (int, error) {
|
||||
f, err := os.OpenFile(fmt.Sprintf("%s_%d.pid", os.Args[0], nodeId), os.O_RDONLY, 0600)
|
||||
f, err := os.OpenFile(fmt.Sprintf("%s_%s.pid", os.Args[0], nodeId), os.O_RDONLY, 0600)
|
||||
defer f.Close()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -169,6 +170,31 @@ func initNode(id string) {
|
||||
serviceOrder := cluster.GetCluster().GetLocalNodeInfo().ServiceList
|
||||
for _,serviceName:= range serviceOrder{
|
||||
bSetup := false
|
||||
|
||||
//判断是否有配置模板服务
|
||||
splitServiceName := strings.Split(serviceName,":")
|
||||
if len(splitServiceName) == 2 {
|
||||
serviceName = splitServiceName[0]
|
||||
templateServiceName := splitServiceName[1]
|
||||
for _,newSer := range preSetupTemplateService {
|
||||
ser := newSer()
|
||||
ser.OnSetup(ser)
|
||||
if ser.GetName() == templateServiceName {
|
||||
ser.SetName(serviceName)
|
||||
ser.Init(ser,cluster.GetRpcClient,cluster.GetRpcServer,cluster.GetCluster().GetServiceCfg(ser.GetName()))
|
||||
service.Setup(ser)
|
||||
|
||||
bSetup = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if bSetup == false{
|
||||
log.Error("Template service not found",log.String("service name",serviceName),log.String("template service name",templateServiceName))
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
for _, s := range preSetupService {
|
||||
if s.GetName() != serviceName {
|
||||
continue
|
||||
@@ -201,7 +227,7 @@ func initLog() error {
|
||||
fmt.Printf("cannot create log file!\n")
|
||||
return err
|
||||
}
|
||||
log.Export(logger)
|
||||
log.SetLogger(logger)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -367,6 +393,12 @@ func Setup(s ...service.IService) {
|
||||
}
|
||||
}
|
||||
|
||||
func SetupTemplate(fs ...func()service.IService){
|
||||
for _, f := range fs {
|
||||
preSetupTemplateService = append(preSetupTemplateService, f)
|
||||
}
|
||||
}
|
||||
|
||||
func GetService(serviceName string) service.IService {
|
||||
return service.GetService(serviceName)
|
||||
}
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.31.0
|
||||
// protoc v3.11.4
|
||||
// source: test/rpc/messagequeue.proto
|
||||
// protoc v4.24.0
|
||||
// source: rpcproto/messagequeue.proto
|
||||
|
||||
package rpc
|
||||
|
||||
@@ -50,11 +50,11 @@ func (x SubscribeType) String() string {
|
||||
}
|
||||
|
||||
func (SubscribeType) Descriptor() protoreflect.EnumDescriptor {
|
||||
return file_test_rpc_messagequeue_proto_enumTypes[0].Descriptor()
|
||||
return file_rpcproto_messagequeue_proto_enumTypes[0].Descriptor()
|
||||
}
|
||||
|
||||
func (SubscribeType) Type() protoreflect.EnumType {
|
||||
return &file_test_rpc_messagequeue_proto_enumTypes[0]
|
||||
return &file_rpcproto_messagequeue_proto_enumTypes[0]
|
||||
}
|
||||
|
||||
func (x SubscribeType) Number() protoreflect.EnumNumber {
|
||||
@@ -63,7 +63,7 @@ func (x SubscribeType) Number() protoreflect.EnumNumber {
|
||||
|
||||
// Deprecated: Use SubscribeType.Descriptor instead.
|
||||
func (SubscribeType) EnumDescriptor() ([]byte, []int) {
|
||||
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{0}
|
||||
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{0}
|
||||
}
|
||||
|
||||
type SubscribeMethod int32
|
||||
@@ -96,11 +96,11 @@ func (x SubscribeMethod) String() string {
|
||||
}
|
||||
|
||||
func (SubscribeMethod) Descriptor() protoreflect.EnumDescriptor {
|
||||
return file_test_rpc_messagequeue_proto_enumTypes[1].Descriptor()
|
||||
return file_rpcproto_messagequeue_proto_enumTypes[1].Descriptor()
|
||||
}
|
||||
|
||||
func (SubscribeMethod) Type() protoreflect.EnumType {
|
||||
return &file_test_rpc_messagequeue_proto_enumTypes[1]
|
||||
return &file_rpcproto_messagequeue_proto_enumTypes[1]
|
||||
}
|
||||
|
||||
func (x SubscribeMethod) Number() protoreflect.EnumNumber {
|
||||
@@ -109,7 +109,7 @@ func (x SubscribeMethod) Number() protoreflect.EnumNumber {
|
||||
|
||||
// Deprecated: Use SubscribeMethod.Descriptor instead.
|
||||
func (SubscribeMethod) EnumDescriptor() ([]byte, []int) {
|
||||
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{1}
|
||||
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{1}
|
||||
}
|
||||
|
||||
type DBQueuePopReq struct {
|
||||
@@ -127,7 +127,7 @@ type DBQueuePopReq struct {
|
||||
func (x *DBQueuePopReq) Reset() {
|
||||
*x = DBQueuePopReq{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[0]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[0]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -140,7 +140,7 @@ func (x *DBQueuePopReq) String() string {
|
||||
func (*DBQueuePopReq) ProtoMessage() {}
|
||||
|
||||
func (x *DBQueuePopReq) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[0]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[0]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -153,7 +153,7 @@ func (x *DBQueuePopReq) ProtoReflect() protoreflect.Message {
|
||||
|
||||
// Deprecated: Use DBQueuePopReq.ProtoReflect.Descriptor instead.
|
||||
func (*DBQueuePopReq) Descriptor() ([]byte, []int) {
|
||||
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{0}
|
||||
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{0}
|
||||
}
|
||||
|
||||
func (x *DBQueuePopReq) GetCustomerId() string {
|
||||
@@ -203,7 +203,7 @@ type DBQueuePopRes struct {
|
||||
func (x *DBQueuePopRes) Reset() {
|
||||
*x = DBQueuePopRes{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[1]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[1]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -216,7 +216,7 @@ func (x *DBQueuePopRes) String() string {
|
||||
func (*DBQueuePopRes) ProtoMessage() {}
|
||||
|
||||
func (x *DBQueuePopRes) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[1]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[1]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -229,7 +229,7 @@ func (x *DBQueuePopRes) ProtoReflect() protoreflect.Message {
|
||||
|
||||
// Deprecated: Use DBQueuePopRes.ProtoReflect.Descriptor instead.
|
||||
func (*DBQueuePopRes) Descriptor() ([]byte, []int) {
|
||||
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{1}
|
||||
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{1}
|
||||
}
|
||||
|
||||
func (x *DBQueuePopRes) GetQueueName() string {
|
||||
@@ -255,7 +255,7 @@ type DBQueueSubscribeReq struct {
|
||||
SubType SubscribeType `protobuf:"varint,1,opt,name=SubType,proto3,enum=SubscribeType" json:"SubType,omitempty"` //订阅类型
|
||||
Method SubscribeMethod `protobuf:"varint,2,opt,name=Method,proto3,enum=SubscribeMethod" json:"Method,omitempty"` //订阅方法
|
||||
CustomerId string `protobuf:"bytes,3,opt,name=CustomerId,proto3" json:"CustomerId,omitempty"` //消费者Id
|
||||
FromNodeId int32 `protobuf:"varint,4,opt,name=FromNodeId,proto3" json:"FromNodeId,omitempty"`
|
||||
FromNodeId string `protobuf:"bytes,4,opt,name=FromNodeId,proto3" json:"FromNodeId,omitempty"`
|
||||
RpcMethod string `protobuf:"bytes,5,opt,name=RpcMethod,proto3" json:"RpcMethod,omitempty"`
|
||||
TopicName string `protobuf:"bytes,6,opt,name=TopicName,proto3" json:"TopicName,omitempty"` //主题名称
|
||||
StartIndex uint64 `protobuf:"varint,7,opt,name=StartIndex,proto3" json:"StartIndex,omitempty"` //开始位置 ,格式前4位是时间戳秒,后面是序号。如果填0时,服务自动修改成:(4bit 当前时间秒)| (0000 4bit)
|
||||
@@ -265,7 +265,7 @@ type DBQueueSubscribeReq struct {
|
||||
func (x *DBQueueSubscribeReq) Reset() {
|
||||
*x = DBQueueSubscribeReq{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[2]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[2]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -278,7 +278,7 @@ func (x *DBQueueSubscribeReq) String() string {
|
||||
func (*DBQueueSubscribeReq) ProtoMessage() {}
|
||||
|
||||
func (x *DBQueueSubscribeReq) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[2]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[2]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -291,7 +291,7 @@ func (x *DBQueueSubscribeReq) ProtoReflect() protoreflect.Message {
|
||||
|
||||
// Deprecated: Use DBQueueSubscribeReq.ProtoReflect.Descriptor instead.
|
||||
func (*DBQueueSubscribeReq) Descriptor() ([]byte, []int) {
|
||||
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{2}
|
||||
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{2}
|
||||
}
|
||||
|
||||
func (x *DBQueueSubscribeReq) GetSubType() SubscribeType {
|
||||
@@ -315,11 +315,11 @@ func (x *DBQueueSubscribeReq) GetCustomerId() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *DBQueueSubscribeReq) GetFromNodeId() int32 {
|
||||
func (x *DBQueueSubscribeReq) GetFromNodeId() string {
|
||||
if x != nil {
|
||||
return x.FromNodeId
|
||||
}
|
||||
return 0
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *DBQueueSubscribeReq) GetRpcMethod() string {
|
||||
@@ -359,7 +359,7 @@ type DBQueueSubscribeRes struct {
|
||||
func (x *DBQueueSubscribeRes) Reset() {
|
||||
*x = DBQueueSubscribeRes{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[3]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[3]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -372,7 +372,7 @@ func (x *DBQueueSubscribeRes) String() string {
|
||||
func (*DBQueueSubscribeRes) ProtoMessage() {}
|
||||
|
||||
func (x *DBQueueSubscribeRes) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[3]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[3]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -385,7 +385,7 @@ func (x *DBQueueSubscribeRes) ProtoReflect() protoreflect.Message {
|
||||
|
||||
// Deprecated: Use DBQueueSubscribeRes.ProtoReflect.Descriptor instead.
|
||||
func (*DBQueueSubscribeRes) Descriptor() ([]byte, []int) {
|
||||
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{3}
|
||||
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{3}
|
||||
}
|
||||
|
||||
type DBQueuePublishReq struct {
|
||||
@@ -400,7 +400,7 @@ type DBQueuePublishReq struct {
|
||||
func (x *DBQueuePublishReq) Reset() {
|
||||
*x = DBQueuePublishReq{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[4]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[4]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -413,7 +413,7 @@ func (x *DBQueuePublishReq) String() string {
|
||||
func (*DBQueuePublishReq) ProtoMessage() {}
|
||||
|
||||
func (x *DBQueuePublishReq) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[4]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[4]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -426,7 +426,7 @@ func (x *DBQueuePublishReq) ProtoReflect() protoreflect.Message {
|
||||
|
||||
// Deprecated: Use DBQueuePublishReq.ProtoReflect.Descriptor instead.
|
||||
func (*DBQueuePublishReq) Descriptor() ([]byte, []int) {
|
||||
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{4}
|
||||
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{4}
|
||||
}
|
||||
|
||||
func (x *DBQueuePublishReq) GetTopicName() string {
|
||||
@@ -452,7 +452,7 @@ type DBQueuePublishRes struct {
|
||||
func (x *DBQueuePublishRes) Reset() {
|
||||
*x = DBQueuePublishRes{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[5]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[5]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -465,7 +465,7 @@ func (x *DBQueuePublishRes) String() string {
|
||||
func (*DBQueuePublishRes) ProtoMessage() {}
|
||||
|
||||
func (x *DBQueuePublishRes) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_test_rpc_messagequeue_proto_msgTypes[5]
|
||||
mi := &file_rpcproto_messagequeue_proto_msgTypes[5]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -478,13 +478,13 @@ func (x *DBQueuePublishRes) ProtoReflect() protoreflect.Message {
|
||||
|
||||
// Deprecated: Use DBQueuePublishRes.ProtoReflect.Descriptor instead.
|
||||
func (*DBQueuePublishRes) Descriptor() ([]byte, []int) {
|
||||
return file_test_rpc_messagequeue_proto_rawDescGZIP(), []int{5}
|
||||
return file_rpcproto_messagequeue_proto_rawDescGZIP(), []int{5}
|
||||
}
|
||||
|
||||
var File_test_rpc_messagequeue_proto protoreflect.FileDescriptor
|
||||
var File_rpcproto_messagequeue_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_test_rpc_messagequeue_proto_rawDesc = []byte{
|
||||
0x0a, 0x1b, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61,
|
||||
var file_rpcproto_messagequeue_proto_rawDesc = []byte{
|
||||
0x0a, 0x1b, 0x72, 0x70, 0x63, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61,
|
||||
0x67, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa3, 0x01,
|
||||
0x0a, 0x0d, 0x44, 0x42, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x12,
|
||||
0x1e, 0x0a, 0x0a, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20,
|
||||
@@ -510,7 +510,7 @@ var file_test_rpc_messagequeue_proto_rawDesc = []byte{
|
||||
0x6f, 0x64, 0x52, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x43, 0x75,
|
||||
0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a,
|
||||
0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x72,
|
||||
0x6f, 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a,
|
||||
0x6f, 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a,
|
||||
0x46, 0x72, 0x6f, 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x52, 0x70,
|
||||
0x63, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x52,
|
||||
0x70, 0x63, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x6f, 0x70, 0x69,
|
||||
@@ -539,20 +539,20 @@ var file_test_rpc_messagequeue_proto_rawDesc = []byte{
|
||||
}
|
||||
|
||||
var (
|
||||
file_test_rpc_messagequeue_proto_rawDescOnce sync.Once
|
||||
file_test_rpc_messagequeue_proto_rawDescData = file_test_rpc_messagequeue_proto_rawDesc
|
||||
file_rpcproto_messagequeue_proto_rawDescOnce sync.Once
|
||||
file_rpcproto_messagequeue_proto_rawDescData = file_rpcproto_messagequeue_proto_rawDesc
|
||||
)
|
||||
|
||||
func file_test_rpc_messagequeue_proto_rawDescGZIP() []byte {
|
||||
file_test_rpc_messagequeue_proto_rawDescOnce.Do(func() {
|
||||
file_test_rpc_messagequeue_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_rpc_messagequeue_proto_rawDescData)
|
||||
func file_rpcproto_messagequeue_proto_rawDescGZIP() []byte {
|
||||
file_rpcproto_messagequeue_proto_rawDescOnce.Do(func() {
|
||||
file_rpcproto_messagequeue_proto_rawDescData = protoimpl.X.CompressGZIP(file_rpcproto_messagequeue_proto_rawDescData)
|
||||
})
|
||||
return file_test_rpc_messagequeue_proto_rawDescData
|
||||
return file_rpcproto_messagequeue_proto_rawDescData
|
||||
}
|
||||
|
||||
var file_test_rpc_messagequeue_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
|
||||
var file_test_rpc_messagequeue_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
|
||||
var file_test_rpc_messagequeue_proto_goTypes = []interface{}{
|
||||
var file_rpcproto_messagequeue_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
|
||||
var file_rpcproto_messagequeue_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
|
||||
var file_rpcproto_messagequeue_proto_goTypes = []interface{}{
|
||||
(SubscribeType)(0), // 0: SubscribeType
|
||||
(SubscribeMethod)(0), // 1: SubscribeMethod
|
||||
(*DBQueuePopReq)(nil), // 2: DBQueuePopReq
|
||||
@@ -562,7 +562,7 @@ var file_test_rpc_messagequeue_proto_goTypes = []interface{}{
|
||||
(*DBQueuePublishReq)(nil), // 6: DBQueuePublishReq
|
||||
(*DBQueuePublishRes)(nil), // 7: DBQueuePublishRes
|
||||
}
|
||||
var file_test_rpc_messagequeue_proto_depIdxs = []int32{
|
||||
var file_rpcproto_messagequeue_proto_depIdxs = []int32{
|
||||
0, // 0: DBQueueSubscribeReq.SubType:type_name -> SubscribeType
|
||||
1, // 1: DBQueueSubscribeReq.Method:type_name -> SubscribeMethod
|
||||
2, // [2:2] is the sub-list for method output_type
|
||||
@@ -572,13 +572,13 @@ var file_test_rpc_messagequeue_proto_depIdxs = []int32{
|
||||
0, // [0:2] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_test_rpc_messagequeue_proto_init() }
|
||||
func file_test_rpc_messagequeue_proto_init() {
|
||||
if File_test_rpc_messagequeue_proto != nil {
|
||||
func init() { file_rpcproto_messagequeue_proto_init() }
|
||||
func file_rpcproto_messagequeue_proto_init() {
|
||||
if File_rpcproto_messagequeue_proto != nil {
|
||||
return
|
||||
}
|
||||
if !protoimpl.UnsafeEnabled {
|
||||
file_test_rpc_messagequeue_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
|
||||
file_rpcproto_messagequeue_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*DBQueuePopReq); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
@@ -590,7 +590,7 @@ func file_test_rpc_messagequeue_proto_init() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_test_rpc_messagequeue_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
|
||||
file_rpcproto_messagequeue_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*DBQueuePopRes); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
@@ -602,7 +602,7 @@ func file_test_rpc_messagequeue_proto_init() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_test_rpc_messagequeue_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
|
||||
file_rpcproto_messagequeue_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*DBQueueSubscribeReq); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
@@ -614,7 +614,7 @@ func file_test_rpc_messagequeue_proto_init() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_test_rpc_messagequeue_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
|
||||
file_rpcproto_messagequeue_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*DBQueueSubscribeRes); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
@@ -626,7 +626,7 @@ func file_test_rpc_messagequeue_proto_init() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_test_rpc_messagequeue_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
|
||||
file_rpcproto_messagequeue_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*DBQueuePublishReq); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
@@ -638,7 +638,7 @@ func file_test_rpc_messagequeue_proto_init() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_test_rpc_messagequeue_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
|
||||
file_rpcproto_messagequeue_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*DBQueuePublishRes); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
@@ -655,19 +655,19 @@ func file_test_rpc_messagequeue_proto_init() {
|
||||
out := protoimpl.TypeBuilder{
|
||||
File: protoimpl.DescBuilder{
|
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||
RawDescriptor: file_test_rpc_messagequeue_proto_rawDesc,
|
||||
RawDescriptor: file_rpcproto_messagequeue_proto_rawDesc,
|
||||
NumEnums: 2,
|
||||
NumMessages: 6,
|
||||
NumExtensions: 0,
|
||||
NumServices: 0,
|
||||
},
|
||||
GoTypes: file_test_rpc_messagequeue_proto_goTypes,
|
||||
DependencyIndexes: file_test_rpc_messagequeue_proto_depIdxs,
|
||||
EnumInfos: file_test_rpc_messagequeue_proto_enumTypes,
|
||||
MessageInfos: file_test_rpc_messagequeue_proto_msgTypes,
|
||||
GoTypes: file_rpcproto_messagequeue_proto_goTypes,
|
||||
DependencyIndexes: file_rpcproto_messagequeue_proto_depIdxs,
|
||||
EnumInfos: file_rpcproto_messagequeue_proto_enumTypes,
|
||||
MessageInfos: file_rpcproto_messagequeue_proto_msgTypes,
|
||||
}.Build()
|
||||
File_test_rpc_messagequeue_proto = out.File
|
||||
file_test_rpc_messagequeue_proto_rawDesc = nil
|
||||
file_test_rpc_messagequeue_proto_goTypes = nil
|
||||
file_test_rpc_messagequeue_proto_depIdxs = nil
|
||||
File_rpcproto_messagequeue_proto = out.File
|
||||
file_rpcproto_messagequeue_proto_rawDesc = nil
|
||||
file_rpcproto_messagequeue_proto_goTypes = nil
|
||||
file_rpcproto_messagequeue_proto_depIdxs = nil
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ message DBQueueSubscribeReq {
|
||||
SubscribeType SubType = 1; //订阅类型
|
||||
SubscribeMethod Method = 2; //订阅方法
|
||||
string CustomerId = 3; //消费者Id
|
||||
int32 FromNodeId = 4;
|
||||
string FromNodeId = 4;
|
||||
string RpcMethod = 5;
|
||||
string TopicName = 6; //主题名称
|
||||
uint64 StartIndex = 7; //开始位置 ,格式前4位是时间戳秒,后面是序号。如果填0时,服务自动修改成:(4bit 当前时间秒)| (0000 4bit)
|
||||
|
||||
@@ -13,9 +13,9 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const maxClusterNode int = 128
|
||||
const maxClusterNode int = 32
|
||||
|
||||
type FuncRpcClient func(nodeId string, serviceMethod string,filterRetire bool, client []*Client) (error, int)
|
||||
type FuncRpcClient func(nodeId string, serviceMethod string,filterRetire bool, client []*Client) (error, []*Client)
|
||||
type FuncRpcServer func() IServer
|
||||
const NodeIdNull = ""
|
||||
|
||||
@@ -63,7 +63,7 @@ type RpcHandler struct {
|
||||
funcRpcClient FuncRpcClient
|
||||
funcRpcServer FuncRpcServer
|
||||
|
||||
pClientList []*Client
|
||||
//pClientList []*Client
|
||||
}
|
||||
|
||||
//type TriggerRpcConnEvent func(bConnect bool, clientSeq uint32, nodeId string)
|
||||
@@ -135,7 +135,6 @@ func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler, getClientFun F
|
||||
handler.mapFunctions = map[string]RpcMethodInfo{}
|
||||
handler.funcRpcClient = getClientFun
|
||||
handler.funcRpcServer = getServerFun
|
||||
handler.pClientList = make([]*Client, maxClusterNode)
|
||||
handler.RegisterRpc(rpcHandler)
|
||||
}
|
||||
|
||||
@@ -274,7 +273,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
//普通的rpc请求
|
||||
v, ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()]
|
||||
if ok == false {
|
||||
err := "RpcHandler " + handler.rpcHandler.GetName() + "cannot find " + request.RpcRequestData.GetServiceMethod()
|
||||
err := "RpcHandler " + handler.rpcHandler.GetName() + " cannot find " + request.RpcRequestData.GetServiceMethod()
|
||||
log.Error("HandlerRpcRequest cannot find serviceMethod",log.String("RpcHandlerName",handler.rpcHandler.GetName()),log.String("serviceMethod",request.RpcRequestData.GetServiceMethod()))
|
||||
if request.requestHandle != nil {
|
||||
request.requestHandle(nil, RpcError(err))
|
||||
@@ -435,9 +434,9 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId string, serviceMethod string, args interface{}) error {
|
||||
var pClientList [maxClusterNode]*Client
|
||||
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
|
||||
if count == 0 {
|
||||
pClientList :=make([]*Client,0,maxClusterNode)
|
||||
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList)
|
||||
if len(pClientList) == 0 {
|
||||
if err != nil {
|
||||
log.Error("call serviceMethod is failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
|
||||
} else {
|
||||
@@ -446,13 +445,13 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str
|
||||
return err
|
||||
}
|
||||
|
||||
if count > 1 && bCast == false {
|
||||
if len(pClientList) > 1 && bCast == false {
|
||||
log.Error("cannot call serviceMethod more then 1 node",log.String("serviceMethod",serviceMethod))
|
||||
return errors.New("cannot call more then 1 node")
|
||||
}
|
||||
|
||||
//2.rpcClient调用
|
||||
for i := 0; i < count; i++ {
|
||||
for i := 0; i < len(pClientList); i++ {
|
||||
pCall := pClientList[i].Go(pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,true, serviceMethod, args, nil)
|
||||
if pCall.Err != nil {
|
||||
err = pCall.Err
|
||||
@@ -465,16 +464,16 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId str
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId string, serviceMethod string, args interface{}, reply interface{}) error {
|
||||
var pClientList [maxClusterNode]*Client
|
||||
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
|
||||
pClientList :=make([]*Client,0,maxClusterNode)
|
||||
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList)
|
||||
if err != nil {
|
||||
log.Error("Call serviceMethod is failed",log.ErrorAttr("error",err))
|
||||
return err
|
||||
} else if count <= 0 {
|
||||
} else if len(pClientList) <= 0 {
|
||||
err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod)
|
||||
log.Error("cannot find serviceMethod",log.String("serviceMethod",serviceMethod))
|
||||
return err
|
||||
} else if count > 1 {
|
||||
} else if len(pClientList) > 1 {
|
||||
log.Error("Cannot call more then 1 node!",log.String("serviceMethod",serviceMethod))
|
||||
return errors.New("cannot call more then 1 node")
|
||||
}
|
||||
@@ -509,9 +508,9 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId string, ser
|
||||
}
|
||||
|
||||
reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
|
||||
var pClientList [2]*Client
|
||||
err, count := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
|
||||
if count == 0 || err != nil {
|
||||
pClientList :=make([]*Client,0,1)
|
||||
err, pClientList := handler.funcRpcClient(nodeId, serviceMethod,false, pClientList[:])
|
||||
if len(pClientList) == 0 || err != nil {
|
||||
if err == nil {
|
||||
if nodeId != NodeIdNull {
|
||||
err = fmt.Errorf("cannot find %s from nodeId %d",serviceMethod,nodeId)
|
||||
@@ -524,7 +523,7 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId string, ser
|
||||
return emptyCancelRpc,nil
|
||||
}
|
||||
|
||||
if count > 1 {
|
||||
if len(pClientList) > 1 {
|
||||
err := errors.New("cannot call more then 1 node")
|
||||
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
|
||||
log.Error("cannot call more then 1 node",log.String("serviceMethod",serviceMethod))
|
||||
@@ -593,12 +592,13 @@ func (handler *RpcHandler) CastGo(serviceMethod string, args interface{}) error
|
||||
|
||||
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId string, rpcMethodId uint32, serviceName string, rawArgs []byte) error {
|
||||
processor := GetProcessor(uint8(rpcProcessorType))
|
||||
err, count := handler.funcRpcClient(nodeId, serviceName,false, handler.pClientList)
|
||||
if count == 0 || err != nil {
|
||||
pClientList := make([]*Client,0,1)
|
||||
err, pClientList := handler.funcRpcClient(nodeId, serviceName,false, pClientList)
|
||||
if len(pClientList) == 0 || err != nil {
|
||||
log.Error("call serviceMethod is failed",log.ErrorAttr("error",err))
|
||||
return err
|
||||
}
|
||||
if count > 1 {
|
||||
if len(pClientList) > 1 {
|
||||
err := errors.New("cannot call more then 1 node")
|
||||
log.Error("cannot call more then 1 node",log.String("serviceName",serviceName))
|
||||
return err
|
||||
@@ -606,14 +606,14 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId s
|
||||
|
||||
//2.rpcClient调用
|
||||
//如果调用本结点服务
|
||||
for i := 0; i < count; i++ {
|
||||
for i := 0; i < len(pClientList); i++ {
|
||||
//跨node调用
|
||||
pCall := handler.pClientList[i].RawGo(handler.pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
|
||||
pCall := pClientList[i].RawGo(pClientList[i].GetTargetNodeId(),DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
|
||||
if pCall.Err != nil {
|
||||
err = pCall.Err
|
||||
}
|
||||
|
||||
handler.pClientList[i].RemovePending(pCall.Seq)
|
||||
pClientList[i].RemovePending(pCall.Seq)
|
||||
ReleaseCall(pCall)
|
||||
}
|
||||
|
||||
|
||||
@@ -14,11 +14,14 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"github.com/duanhf2012/origin/v2/concurrent"
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
var timerDispatcherLen = 100000
|
||||
var maxServiceEventChannelNum = 2000000
|
||||
|
||||
|
||||
|
||||
type IService interface {
|
||||
concurrent.IConcurrent
|
||||
Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{})
|
||||
@@ -293,6 +296,24 @@ func (s *Service) GetServiceCfg()interface{}{
|
||||
return s.serviceCfg
|
||||
}
|
||||
|
||||
func (s *Service) ParseServiceCfg(cfg interface{}) error{
|
||||
if s.serviceCfg == nil {
|
||||
return errors.New("no service configuration found")
|
||||
}
|
||||
|
||||
rv := reflect.ValueOf(s.serviceCfg)
|
||||
if rv.Kind() == reflect.Ptr && rv.IsNil() {
|
||||
return errors.New("no service configuration found")
|
||||
}
|
||||
|
||||
bytes,err := json.Marshal(s.serviceCfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return json.Unmarshal(bytes,cfg)
|
||||
}
|
||||
|
||||
func (s *Service) GetProfiler() *profiler.Profiler{
|
||||
return s.profiler
|
||||
}
|
||||
@@ -368,12 +389,12 @@ func (s *Service) UnRegNatsConnListener() {
|
||||
func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoveryServiceListener) {
|
||||
s.discoveryServiceLister = discoveryServiceListener
|
||||
s.RegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler(),s.OnDiscoverServiceEvent)
|
||||
RegDiscoveryServiceEventFun(s.GetName())
|
||||
RegRpcEventFun(s.GetName())
|
||||
}
|
||||
|
||||
func (s *Service) UnRegDiscoverListener() {
|
||||
s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler())
|
||||
UnRegDiscoveryServiceEventFun(s.GetName())
|
||||
UnRegRpcEventFun(s.GetName())
|
||||
}
|
||||
|
||||
func (s *Service) PushRpcRequest(rpcRequest *rpc.RpcRequest) error{
|
||||
|
||||
@@ -14,9 +14,6 @@ type RegDiscoveryServiceEventFunType func(serviceName string)
|
||||
var RegRpcEventFun RegRpcEventFunType
|
||||
var UnRegRpcEventFun RegRpcEventFunType
|
||||
|
||||
var RegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
|
||||
var UnRegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
|
||||
|
||||
func init(){
|
||||
mapServiceName = map[string]IService{}
|
||||
setupServiceList = []IService{}
|
||||
@@ -26,7 +23,7 @@ func Init() {
|
||||
for _,s := range setupServiceList {
|
||||
err := s.OnInit()
|
||||
if err != nil {
|
||||
log.SError("Failed to initialize "+s.GetName()+" service:"+err.Error())
|
||||
log.Error("Failed to initialize "+s.GetName()+" service",log.ErrorAttr("err",err))
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
314
sysmodule/ginmodule/GinModule.go
Normal file
314
sysmodule/ginmodule/GinModule.go
Normal file
@@ -0,0 +1,314 @@
|
||||
package ginmodule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/duanhf2012/origin/v2/event"
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
"github.com/duanhf2012/origin/v2/service"
|
||||
"github.com/gin-gonic/gin"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
"io"
|
||||
)
|
||||
|
||||
type IGinProcessor interface {
|
||||
Process(data *gin.Context) (*gin.Context, error)
|
||||
}
|
||||
|
||||
type GinModule struct {
|
||||
service.Module
|
||||
|
||||
*gin.Engine
|
||||
srv *http.Server
|
||||
|
||||
listenAddr string
|
||||
handleTimeout time.Duration
|
||||
processor []IGinProcessor
|
||||
}
|
||||
|
||||
func (gm *GinModule) Init(addr string, handleTimeout time.Duration,engine *gin.Engine) {
|
||||
gm.listenAddr = addr
|
||||
gm.handleTimeout = handleTimeout
|
||||
gm.Engine = engine
|
||||
}
|
||||
|
||||
func (gm *GinModule) SetupDataProcessor(processor ...IGinProcessor) {
|
||||
gm.processor = processor
|
||||
}
|
||||
|
||||
func (gm *GinModule) AppendDataProcessor(processor ...IGinProcessor) {
|
||||
gm.processor = append(gm.processor, processor...)
|
||||
}
|
||||
|
||||
func (gm *GinModule) OnInit() error {
|
||||
if gm.Engine == nil {
|
||||
gm.Engine = gin.Default()
|
||||
}
|
||||
|
||||
gm.srv = &http.Server{
|
||||
Addr: gm.listenAddr,
|
||||
Handler: gm.Engine,
|
||||
}
|
||||
|
||||
gm.Engine.Use(Logger())
|
||||
gm.Engine.Use(gin.Recovery())
|
||||
gm.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_Gin_Event, gm.GetEventHandler(), gm.eventHandler)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gm *GinModule) eventHandler(ev event.IEvent) {
|
||||
ginEvent := ev.(*GinEvent)
|
||||
for _, handler := range ginEvent.handlersChain {
|
||||
handler(&ginEvent.c)
|
||||
}
|
||||
|
||||
//ginEvent.chanWait <- struct{}{}
|
||||
}
|
||||
|
||||
func (gm *GinModule) Start() {
|
||||
gm.srv.Addr = gm.listenAddr
|
||||
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
|
||||
go func() {
|
||||
err := gm.srv.ListenAndServe()
|
||||
if err != nil {
|
||||
log.Error("ListenAndServe error", slog.Any("error", err.Error()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (gm *GinModule) StartTLS(certFile, keyFile string) {
|
||||
log.Info("http start listen", slog.Any("addr", gm.listenAddr))
|
||||
go func() {
|
||||
err := gm.srv.ListenAndServeTLS(certFile, keyFile)
|
||||
if err != nil {
|
||||
log.Fatal("ListenAndServeTLS error", slog.Any("error", err.Error()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (gm *GinModule) Stop(ctx context.Context) {
|
||||
if err := gm.srv.Shutdown(ctx); err != nil {
|
||||
log.SError("Server Shutdown", slog.Any("error", err))
|
||||
}
|
||||
}
|
||||
|
||||
type SafeContext struct {
|
||||
*gin.Context
|
||||
chanWait chan struct{}
|
||||
}
|
||||
|
||||
func (c *SafeContext) JSONAndDone(code int, obj any) {
|
||||
c.Context.JSON(code,obj)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) AsciiJSONAndDone(code int, obj any){
|
||||
c.Context.AsciiJSON(code,obj)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) PureJSONAndDone(code int, obj any){
|
||||
c.Context.PureJSON(code,obj)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) XMLAndDone(code int, obj any){
|
||||
c.Context.XML(code,obj)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) YAMLAndDone(code int, obj any){
|
||||
c.Context.YAML(code,obj)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) TOMLAndDone(code int, obj any){
|
||||
c.Context.TOML(code,obj)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) ProtoBufAndDone(code int, obj any){
|
||||
c.Context.ProtoBuf(code,obj)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) StringAndDone(code int, format string, values ...any){
|
||||
c.Context.String(code,format,values...)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) RedirectAndDone(code int, location string){
|
||||
c.Context.Redirect(code,location)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) DataAndDone(code int, contentType string, data []byte){
|
||||
c.Context.Data(code,contentType,data)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) DataFromReaderAndDone(code int, contentLength int64, contentType string, reader io.Reader, extraHeaders map[string]string){
|
||||
c.DataFromReader(code,contentLength,contentType,reader,extraHeaders)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) HTMLAndDone(code int, name string, obj any){
|
||||
c.Context.HTML(code,name,obj)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) IndentedJSONAndDone(code int, obj any){
|
||||
c.Context.IndentedJSON(code,obj)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) SecureJSONAndDone(code int, obj any){
|
||||
c.Context.SecureJSON(code,obj)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) JSONPAndDone(code int, obj any){
|
||||
c.Context.JSONP(code,obj)
|
||||
c.Done()
|
||||
}
|
||||
|
||||
func (c *SafeContext) Done(){
|
||||
c.chanWait <- struct{}{}
|
||||
}
|
||||
|
||||
type GinEvent struct {
|
||||
handlersChain []SafeHandlerFunc
|
||||
c SafeContext
|
||||
}
|
||||
|
||||
type SafeHandlerFunc func(*SafeContext)
|
||||
|
||||
func (ge *GinEvent) GetEventType() event.EventType {
|
||||
return event.Sys_Event_Gin_Event
|
||||
}
|
||||
|
||||
func (gm *GinModule) handleMethod(httpMethod, relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||
return gm.Engine.Handle(httpMethod, relativePath, func(c *gin.Context) {
|
||||
for _, p := range gm.processor {
|
||||
_, err := p.Process(c)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var ev GinEvent
|
||||
chanWait := make(chan struct{},2)
|
||||
ev.c.chanWait = chanWait
|
||||
ev.handlersChain = handlers
|
||||
ev.c.Context = c
|
||||
gm.NotifyEvent(&ev)
|
||||
|
||||
ctx,cancel := context.WithTimeout(context.Background(), gm.handleTimeout)
|
||||
defer cancel()
|
||||
|
||||
select{
|
||||
case <-ctx.Done():
|
||||
log.Error("GinModule process timeout", slog.Any("path", c.Request.URL.Path))
|
||||
c.AbortWithStatus(http.StatusRequestTimeout)
|
||||
case <-chanWait:
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// GET 回调处理是在gin协程中
|
||||
func (gm *GinModule) GET(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
||||
return gm.Engine.GET(relativePath, handlers...)
|
||||
}
|
||||
|
||||
// POST 回调处理是在gin协程中
|
||||
func (gm *GinModule) POST(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
||||
return gm.Engine.POST(relativePath, handlers...)
|
||||
}
|
||||
|
||||
// DELETE 回调处理是在gin协程中
|
||||
func (gm *GinModule) DELETE(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
||||
return gm.Engine.DELETE(relativePath, handlers...)
|
||||
}
|
||||
|
||||
// PATCH 回调处理是在gin协程中
|
||||
func (gm *GinModule) PATCH(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
||||
return gm.Engine.PATCH(relativePath, handlers...)
|
||||
}
|
||||
|
||||
// Put 回调处理是在gin协程中
|
||||
func (gm *GinModule) Put(relativePath string, handlers ...gin.HandlerFunc) gin.IRoutes {
|
||||
return gm.Engine.PUT(relativePath, handlers...)
|
||||
}
|
||||
|
||||
// SafeGET 回调处理是在service协程中
|
||||
func (gm *GinModule) SafeGET(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||
return gm.handleMethod(http.MethodGet, relativePath, handlers...)
|
||||
}
|
||||
|
||||
// SafePOST 回调处理是在service协程中
|
||||
func (gm *GinModule) SafePOST(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||
return gm.handleMethod(http.MethodPost, relativePath, handlers...)
|
||||
}
|
||||
|
||||
// SafeDELETE 回调处理是在service协程中
|
||||
func (gm *GinModule) SafeDELETE(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||
return gm.handleMethod(http.MethodDelete, relativePath, handlers...)
|
||||
}
|
||||
|
||||
// SafePATCH 回调处理是在service协程中
|
||||
func (gm *GinModule) SafePATCH(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||
return gm.handleMethod(http.MethodPatch, relativePath, handlers...)
|
||||
}
|
||||
|
||||
// SafePut 回调处理是在service协程中
|
||||
func (gm *GinModule) SafePut(relativePath string, handlers ...SafeHandlerFunc) gin.IRoutes {
|
||||
return gm.handleMethod(http.MethodPut, relativePath, handlers...)
|
||||
}
|
||||
|
||||
func GetIPWithProxyHeaders(c *gin.Context) string {
|
||||
// 尝试从 X-Real-IP 头部获取真实 IP
|
||||
ip := c.GetHeader("X-Real-IP")
|
||||
|
||||
// 如果 X-Real-IP 头部不存在,则尝试从 X-Forwarded-For 头部获取
|
||||
if ip == "" {
|
||||
ip = c.GetHeader("X-Forwarded-For")
|
||||
}
|
||||
|
||||
// 如果两者都不存在,则使用默认的 ClientIP 方法获取 IP
|
||||
if ip == "" {
|
||||
ip = c.ClientIP()
|
||||
}
|
||||
|
||||
return ip
|
||||
}
|
||||
|
||||
func GetIPWithValidatedProxyHeaders(c *gin.Context) string {
|
||||
// 获取代理头部
|
||||
proxyHeaders := c.Request.Header.Get("X-Real-IP,X-Forwarded-For")
|
||||
|
||||
// 分割代理头部,取第一个 IP 作为真实 IP
|
||||
ips := strings.Split(proxyHeaders, ",")
|
||||
ip := strings.TrimSpace(ips[0])
|
||||
|
||||
// 如果 IP 格式合法,则使用获取到的 IP,否则使用默认的 ClientIP 方法获取
|
||||
if isValidIP(ip) {
|
||||
return ip
|
||||
} else {
|
||||
ip = c.ClientIP()
|
||||
return ip
|
||||
}
|
||||
}
|
||||
|
||||
// isValidIP 判断 IP 格式是否合法
|
||||
func isValidIP(ip string) bool {
|
||||
// 此处添加自定义的 IP 格式验证逻辑
|
||||
// 例如,使用正则表达式验证 IP 格式
|
||||
// ...
|
||||
if ip == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
79
sysmodule/ginmodule/logger.go
Normal file
79
sysmodule/ginmodule/logger.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package ginmodule
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
"github.com/gin-gonic/gin"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Logger 是一个自定义的日志中间件
|
||||
func Logger() gin.HandlerFunc {
|
||||
|
||||
return func(c *gin.Context) {
|
||||
// 处理请求前记录日志
|
||||
// 开始时间
|
||||
startTime := time.Now()
|
||||
// 调用该请求的剩余处理程序
|
||||
c.Next()
|
||||
// 结束时间
|
||||
endTime := time.Now()
|
||||
// 执行时间
|
||||
latencyTime := endTime.Sub(startTime)
|
||||
|
||||
// 请求IP
|
||||
clientIP := c.ClientIP()
|
||||
// remoteIP := c.RemoteIP()
|
||||
|
||||
// 请求方式
|
||||
reqMethod := c.Request.Method
|
||||
// 请求路由
|
||||
reqUri := c.Request.RequestURI
|
||||
// 请求协议
|
||||
reqProto := c.Request.Proto
|
||||
// 请求来源
|
||||
repReferer := c.Request.Referer()
|
||||
// 请求UA
|
||||
reqUA := c.Request.UserAgent()
|
||||
|
||||
// 请求响应内容长度
|
||||
resLength := c.Writer.Size()
|
||||
if resLength < 0 {
|
||||
resLength = 0
|
||||
}
|
||||
// 响应状态码
|
||||
statusCode := c.Writer.Status()
|
||||
|
||||
log.SDebug(fmt.Sprintf(
|
||||
"%s | %3d | %s %10s | \033[44;37m%-6s\033[0m %s %s | %10v | \"%s\" \"%s\"",
|
||||
colorForStatus(statusCode),
|
||||
statusCode,
|
||||
colorForStatus(0),
|
||||
clientIP,
|
||||
// remoteIP,
|
||||
reqMethod,
|
||||
reqUri,
|
||||
reqProto,
|
||||
latencyTime,
|
||||
reqUA,
|
||||
repReferer,
|
||||
))
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// colorForStatus 根据 HTTP 状态码返回 ANSI 颜色代码
|
||||
func colorForStatus(code int) string {
|
||||
switch {
|
||||
case code >= 200 && code < 300:
|
||||
return "\033[42;1;37m" // green
|
||||
case code >= 300 && code < 400:
|
||||
return "\033[34m" // blue
|
||||
case code >= 400 && code < 500:
|
||||
return "\033[33m" // yellow
|
||||
case code == 0:
|
||||
return "\033[0m" // cancel
|
||||
default:
|
||||
return "\033[31m" // red
|
||||
}
|
||||
}
|
||||
65
sysmodule/kafkamodule/Admin.go
Normal file
65
sysmodule/kafkamodule/Admin.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package kafkamodule
|
||||
|
||||
import "github.com/IBM/sarama"
|
||||
|
||||
type KafkaAdmin struct {
|
||||
sarama.ClusterAdmin
|
||||
|
||||
mapTopic map[string]sarama.TopicDetail
|
||||
}
|
||||
|
||||
func (ka *KafkaAdmin) Setup(kafkaVersion string, addrs []string) error {
|
||||
config := sarama.NewConfig()
|
||||
var err error
|
||||
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ka.ClusterAdmin, err = sarama.NewClusterAdmin(addrs, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ka.mapTopic, err = ka.GetTopics()
|
||||
if err != nil {
|
||||
ka.ClusterAdmin.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ka *KafkaAdmin) RefreshTopic() error {
|
||||
var err error
|
||||
ka.mapTopic, err = ka.GetTopics()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (ka *KafkaAdmin) HasTopic(topic string) bool {
|
||||
_, ok := ka.mapTopic[topic]
|
||||
|
||||
return ok
|
||||
}
|
||||
|
||||
func (ka *KafkaAdmin) GetTopicDetail(topic string) *sarama.TopicDetail {
|
||||
topicDetail, ok := ka.mapTopic[topic]
|
||||
if ok == false {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &topicDetail
|
||||
}
|
||||
|
||||
func (ka *KafkaAdmin) GetTopics() (map[string]sarama.TopicDetail, error) {
|
||||
return ka.ListTopics()
|
||||
}
|
||||
|
||||
// CreateTopic 创建主题
|
||||
// numPartitions分区数
|
||||
// replicationFactor副本数
|
||||
// validateOnly参数执行操作时只进行参数验证而不实际执行操作
|
||||
func (ka *KafkaAdmin) CreateTopic(topic string, numPartitions int32, replicationFactor int16, validateOnly bool) error {
|
||||
return ka.ClusterAdmin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: numPartitions, ReplicationFactor: replicationFactor}, validateOnly)
|
||||
}
|
||||
289
sysmodule/kafkamodule/Consumer.go
Normal file
289
sysmodule/kafkamodule/Consumer.go
Normal file
@@ -0,0 +1,289 @@
|
||||
package kafkamodule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ConsumerGroup struct {
|
||||
sarama.ConsumerGroup
|
||||
waitGroup sync.WaitGroup
|
||||
|
||||
chanExit chan error
|
||||
ready chan bool
|
||||
cancel context.CancelFunc
|
||||
groupId string
|
||||
}
|
||||
|
||||
func NewConsumerConfig(kafkaVersion string, assignor string, offsetsInitial int64) (*sarama.Config, error) {
|
||||
var err error
|
||||
|
||||
config := sarama.NewConfig()
|
||||
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
|
||||
config.Consumer.Offsets.Initial = offsetsInitial
|
||||
config.Consumer.Offsets.AutoCommit.Enable = false
|
||||
|
||||
switch assignor {
|
||||
case "sticky":
|
||||
// 黏性roundRobin,rebalance之后首先保证前面的分配,从后面剥离
|
||||
// topic:T0{P0,P1,P2,P3,P4,P5},消费者:C1,C2
|
||||
// ---------------before rebalance:即roundRobin
|
||||
// C1: T0{P0} T0{P2} T0{P4}
|
||||
// C2: T0{P1} T0{P3} T0{P5}
|
||||
// ----------------after rebalance:增加了一个消费者
|
||||
// C1: T0{P0} T0{P2}
|
||||
// C2: T0{P1} T0{P3}
|
||||
// C3: T0{P4} T0{P5} until每个消费者的分区数误差不超过1
|
||||
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
|
||||
case "roundrobin":
|
||||
// roundRobin --逐个平均分发
|
||||
// topic: T0{P0,P1,P2},T1{P0,P1,P2,P3}两个消费者C1,C2
|
||||
// C1: T0{P0} T0{P2} T1{P1} T1{P3}
|
||||
// C2: T0{P1} T1{P0} T1{P2}
|
||||
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
|
||||
case "range":
|
||||
// 默认值 --一次平均分发
|
||||
// topic: T0{P0,P1,P2,P3},T1{P0,P1,P2,P3},两个消费者C1,C2
|
||||
// T1分区总数6 / 消费者数2 = 3 ,即该会话的分区每个消费者分3个
|
||||
// T2分区总数4 / 消费者数2 = 2 ,即该会话的分区每个消费者分2个
|
||||
// C1: T0{P0, P1, P2} T1{P0, P1}
|
||||
// C2: T0{P3, P4, P5} T1{P2, P3}
|
||||
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
|
||||
default:
|
||||
return nil, fmt.Errorf("Unrecognized consumer group partition assignor: %s", assignor)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
type IMsgReceiver interface {
|
||||
Receiver(msgs []*sarama.ConsumerMessage) bool
|
||||
}
|
||||
|
||||
func (c *ConsumerGroup) Setup(addr []string, topics []string, groupId string, config *sarama.Config, receiverInterval time.Duration, maxReceiverNum int, msgReceiver IMsgReceiver) error {
|
||||
var err error
|
||||
c.ConsumerGroup, err = sarama.NewConsumerGroup(addr, groupId, config)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
c.groupId = groupId
|
||||
c.chanExit = make(chan error, 1)
|
||||
|
||||
var handler ConsumerGroupHandler
|
||||
handler.receiver = msgReceiver
|
||||
handler.maxReceiverNum = maxReceiverNum
|
||||
handler.receiverInterval = receiverInterval
|
||||
handler.chanExit = c.chanExit
|
||||
|
||||
var ctx context.Context
|
||||
ctx, c.cancel = context.WithCancel(context.Background())
|
||||
|
||||
c.waitGroup.Add(1)
|
||||
go func() {
|
||||
defer c.waitGroup.Done()
|
||||
|
||||
for {
|
||||
if err = c.Consume(ctx, topics, &handler); err != nil {
|
||||
// 当setup失败的时候,error会返回到这里
|
||||
log.Error("Error from consumer", log.Any("err", err))
|
||||
return
|
||||
}
|
||||
|
||||
// check if context was cancelled, signaling that the consumer should stop
|
||||
if ctx.Err() != nil {
|
||||
log.Info("consumer stop", log.Any("info", ctx.Err()))
|
||||
}
|
||||
|
||||
c.chanExit <- err
|
||||
}
|
||||
}()
|
||||
|
||||
err = <-c.chanExit
|
||||
//已经准备好了
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *ConsumerGroup) Close() {
|
||||
log.Info("close consumerGroup")
|
||||
//1.cancel掉
|
||||
c.cancel()
|
||||
|
||||
//2.关闭连接
|
||||
err := c.ConsumerGroup.Close()
|
||||
if err != nil {
|
||||
log.Error("close consumerGroup fail", log.Any("err", err.Error()))
|
||||
}
|
||||
|
||||
//3.等待退出
|
||||
c.waitGroup.Wait()
|
||||
}
|
||||
|
||||
type ConsumerGroupHandler struct {
|
||||
receiver IMsgReceiver
|
||||
|
||||
receiverInterval time.Duration
|
||||
maxReceiverNum int
|
||||
|
||||
//mapTopicOffset map[string]map[int32]int //map[topic]map[partitionId]offsetInfo
|
||||
mapTopicData map[string]*MsgData
|
||||
mx sync.Mutex
|
||||
|
||||
chanExit chan error
|
||||
isRebalance bool //是否为再平衡
|
||||
//stopSig *int32
|
||||
}
|
||||
|
||||
type MsgData struct {
|
||||
sync.Mutex
|
||||
msg []*sarama.ConsumerMessage
|
||||
|
||||
mapPartitionOffset map[int32]int64
|
||||
}
|
||||
|
||||
func (ch *ConsumerGroupHandler) Flush(session sarama.ConsumerGroupSession, topic string) {
|
||||
if topic != "" {
|
||||
msgData := ch.GetMsgData(topic)
|
||||
msgData.flush(session, ch.receiver, topic)
|
||||
return
|
||||
}
|
||||
|
||||
for tp, msgData := range ch.mapTopicData {
|
||||
msgData.flush(session, ch.receiver, tp)
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *ConsumerGroupHandler) GetMsgData(topic string) *MsgData {
|
||||
ch.mx.Lock()
|
||||
defer ch.mx.Unlock()
|
||||
|
||||
msgData := ch.mapTopicData[topic]
|
||||
if msgData == nil {
|
||||
msgData = &MsgData{}
|
||||
msgData.msg = make([]*sarama.ConsumerMessage, 0, ch.maxReceiverNum)
|
||||
ch.mapTopicData[topic] = msgData
|
||||
}
|
||||
|
||||
return msgData
|
||||
}
|
||||
|
||||
func (md *MsgData) flush(session sarama.ConsumerGroupSession, receiver IMsgReceiver, topic string) {
|
||||
if len(md.msg) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
//发送给接收者
|
||||
for {
|
||||
ok := receiver.Receiver(md.msg)
|
||||
if ok == true {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for pId, offset := range md.mapPartitionOffset {
|
||||
|
||||
session.MarkOffset(topic, pId, offset+1, "")
|
||||
log.Info(fmt.Sprintf("topic %s,pid %d,offset %d", topic, pId, offset+1))
|
||||
}
|
||||
session.Commit()
|
||||
//log.Info("commit")
|
||||
//time.Sleep(1000 * time.Second)
|
||||
//置空
|
||||
md.msg = md.msg[:0]
|
||||
clear(md.mapPartitionOffset)
|
||||
}
|
||||
|
||||
func (md *MsgData) appendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage, receiver IMsgReceiver, maxReceiverNum int) {
|
||||
md.Lock()
|
||||
defer md.Unlock()
|
||||
|
||||
//收到的offset只会越来越大在
|
||||
if md.mapPartitionOffset == nil {
|
||||
md.mapPartitionOffset = make(map[int32]int64, 10)
|
||||
}
|
||||
|
||||
md.mapPartitionOffset[msg.Partition] = msg.Offset
|
||||
|
||||
md.msg = append(md.msg, msg)
|
||||
if len(md.msg) < maxReceiverNum {
|
||||
return
|
||||
}
|
||||
|
||||
md.flush(session, receiver, msg.Topic)
|
||||
}
|
||||
|
||||
func (ch *ConsumerGroupHandler) AppendMsg(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
|
||||
dataMsg := ch.GetMsgData(msg.Topic)
|
||||
dataMsg.appendMsg(session, msg, ch.receiver, ch.maxReceiverNum)
|
||||
}
|
||||
|
||||
func (ch *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
|
||||
ch.mapTopicData = make(map[string]*MsgData, 128)
|
||||
|
||||
if ch.isRebalance == false {
|
||||
ch.chanExit <- nil
|
||||
}
|
||||
|
||||
ch.isRebalance = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ch *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
|
||||
ch.Flush(session, "")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
||||
|
||||
ticker := time.NewTicker(ch.receiverInterval)
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg := <-claim.Messages():
|
||||
if msg == nil {
|
||||
log.SWarning("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
|
||||
return nil
|
||||
}
|
||||
ch.AppendMsg(session, msg)
|
||||
case <-ticker.C:
|
||||
ch.Flush(session, claim.Topic())
|
||||
case <-session.Context().Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
阿里云参数说明:https://sls.aliyun.com/doc/oscompatibledemo/sarama_go_kafka_consume.html
|
||||
conf.Net.TLS.Enable = true
|
||||
conf.Net.SASL.Enable = true
|
||||
conf.Net.SASL.User = project
|
||||
conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
|
||||
conf.Net.SASL.Mechanism = "PLAIN"
|
||||
|
||||
|
||||
|
||||
conf.Net.TLS.Enable = true
|
||||
conf.Net.SASL.Enable = true
|
||||
conf.Net.SASL.User = project
|
||||
conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
|
||||
conf.Net.SASL.Mechanism = "PLAIN"
|
||||
|
||||
conf.Consumer.Fetch.Min = 1
|
||||
conf.Consumer.Fetch.Default = 1024 * 1024
|
||||
conf.Consumer.Retry.Backoff = 2 * time.Second
|
||||
conf.Consumer.MaxWaitTime = 250 * time.Millisecond
|
||||
conf.Consumer.MaxProcessingTime = 100 * time.Millisecond
|
||||
conf.Consumer.Return.Errors = false
|
||||
conf.Consumer.Offsets.AutoCommit.Enable = true
|
||||
conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
|
||||
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
|
||||
conf.Consumer.Offsets.Retry.Max = 3
|
||||
*/
|
||||
146
sysmodule/kafkamodule/Producer.go
Normal file
146
sysmodule/kafkamodule/Producer.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package kafkamodule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
"github.com/duanhf2012/origin/v2/service"
|
||||
"time"
|
||||
)
|
||||
|
||||
type IProducer interface {
|
||||
}
|
||||
|
||||
type SyncProducer struct {
|
||||
}
|
||||
|
||||
type AsyncProducer struct {
|
||||
}
|
||||
|
||||
type Producer struct {
|
||||
service.Module
|
||||
|
||||
sarama.SyncProducer
|
||||
|
||||
sarama.AsyncProducer
|
||||
}
|
||||
|
||||
// NewProducerConfig 新建producerConfig
|
||||
// kafkaVersion kafka版本
|
||||
// returnErr,returnSucc 是否返回错误与成功
|
||||
// requiredAcks -1 #全量同步确认,强可靠性保证(当所有的 leader 和 follower 都接收成功时)#WaitForAll 1 #leader 确认收到, 默认(仅 leader 反馈)#WaitForLocal 0 #不确认,但是吞吐量大(不 care 结果) #NoResponse
|
||||
// Idempotent(幂等) 确保信息都准确写入一份副本,用于幂等生产者,当这一项设置为true的时候,生产者将保证生产的消息一定是有序且精确一次的
|
||||
// partitioner 生成分区器,用于选择向哪个分区发送信息,默认情况下对消息密钥进行散列
|
||||
func NewProducerConfig(kafkaVersion string, returnErr bool, returnSucc bool, requiredAcks sarama.RequiredAcks, Idempotent bool,
|
||||
partitioner sarama.PartitionerConstructor) (*sarama.Config, error) {
|
||||
config := sarama.NewConfig()
|
||||
var err error
|
||||
config.Version, err = sarama.ParseKafkaVersion(kafkaVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config.Producer.Return.Errors = returnErr
|
||||
config.Producer.Return.Successes = returnSucc
|
||||
config.Producer.RequiredAcks = requiredAcks
|
||||
config.Producer.Partitioner = partitioner
|
||||
config.Producer.Timeout = 10 * time.Second
|
||||
|
||||
config.Producer.Idempotent = Idempotent
|
||||
if Idempotent == true {
|
||||
config.Net.MaxOpenRequests = 1
|
||||
}
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func (p *Producer) SyncSetup(addr []string, config *sarama.Config) error {
|
||||
var err error
|
||||
p.SyncProducer, err = sarama.NewSyncProducer(addr, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Producer) ASyncSetup(addr []string, config *sarama.Config) error {
|
||||
var err error
|
||||
p.AsyncProducer, err = sarama.NewAsyncProducer(addr, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
p.asyncRun()
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Producer) asyncRun() {
|
||||
for {
|
||||
select {
|
||||
case sm := <-p.Successes():
|
||||
if sm.Metadata == nil {
|
||||
break
|
||||
}
|
||||
asyncReturn := sm.Metadata.(*AsyncReturn)
|
||||
asyncReturn.chanReturn <- asyncReturn
|
||||
case em := <-p.Errors():
|
||||
log.Error("async kafkamodule error", log.ErrorAttr("err", em.Err))
|
||||
if em.Msg.Metadata == nil {
|
||||
break
|
||||
}
|
||||
asyncReturn := em.Msg.Metadata.(*AsyncReturn)
|
||||
asyncReturn.Err = em.Err
|
||||
asyncReturn.chanReturn <- asyncReturn
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type AsyncReturn struct {
|
||||
Msg *sarama.ProducerMessage
|
||||
Err error
|
||||
chanReturn chan *AsyncReturn
|
||||
}
|
||||
|
||||
func (ar *AsyncReturn) WaitOk(ctx context.Context) (*sarama.ProducerMessage, error) {
|
||||
asyncReturn := ar.Msg.Metadata.(*AsyncReturn)
|
||||
select {
|
||||
case <-asyncReturn.chanReturn:
|
||||
return asyncReturn.Msg, asyncReturn.Err
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Producer) AsyncSendMessage(msg *sarama.ProducerMessage) *AsyncReturn {
|
||||
asyncReturn := AsyncReturn{Msg: msg, chanReturn: make(chan *AsyncReturn, 1)}
|
||||
msg.Metadata = &asyncReturn
|
||||
p.AsyncProducer.Input() <- msg
|
||||
|
||||
return &asyncReturn
|
||||
}
|
||||
|
||||
func (p *Producer) AsyncPushMessage(msg *sarama.ProducerMessage) {
|
||||
p.AsyncProducer.Input() <- msg
|
||||
}
|
||||
|
||||
func (p *Producer) Close() {
|
||||
if p.SyncProducer != nil {
|
||||
p.SyncProducer.Close()
|
||||
p.SyncProducer = nil
|
||||
}
|
||||
|
||||
if p.AsyncProducer != nil {
|
||||
p.AsyncProducer.Close()
|
||||
p.AsyncProducer = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Producer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
|
||||
return p.SyncProducer.SendMessage(msg)
|
||||
}
|
||||
|
||||
func (p *Producer) SendMessages(msgs []*sarama.ProducerMessage) error {
|
||||
return p.SyncProducer.SendMessages(msgs)
|
||||
}
|
||||
151
sysmodule/kafkamodule/ProducerAndConsumer_test.go
Normal file
151
sysmodule/kafkamodule/ProducerAndConsumer_test.go
Normal file
@@ -0,0 +1,151 @@
|
||||
package kafkamodule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/IBM/sarama"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 对各参数和机制名称的说明:https://blog.csdn.net/u013311345/article/details/129217728
|
||||
type MsgReceiver struct {
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (mr *MsgReceiver) Receiver(msgs []*sarama.ConsumerMessage) bool {
|
||||
for _, m := range msgs {
|
||||
mr.t.Logf("time:%s, topic:%s, partition:%d, offset:%d, key:%s, value:%s", time.Now().Format("2006-01-02 15:04:05.000"), m.Topic, m.Partition, m.Offset, m.Key, string(m.Value))
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
var addr = []string{"192.168.13.24:9092", "192.168.13.24:9093", "192.168.13.24:9094", "192.168.13.24:9095"}
|
||||
var topicName = []string{"test_topic_1", "test_topic_2"}
|
||||
var kafkaVersion = "3.3.1"
|
||||
|
||||
func producer(t *testing.T) {
|
||||
var admin KafkaAdmin
|
||||
err := admin.Setup(kafkaVersion, addr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, tName := range topicName {
|
||||
if admin.HasTopic(tName) == false {
|
||||
err = admin.CreateTopic(tName, 2, 2, false)
|
||||
t.Log(err)
|
||||
}
|
||||
}
|
||||
|
||||
var pd Producer
|
||||
cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = pd.SyncSetup(addr, cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
//msgs := make([]*sarama.ProducerMessage, 0, 20000)
|
||||
for i := 0; i < 20000; i++ {
|
||||
var msg sarama.ProducerMessage
|
||||
msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
|
||||
msg.Topic = topicName[0]
|
||||
msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
|
||||
pd.SendMessage(&msg)
|
||||
//msgs = append(msgs, &msg)
|
||||
}
|
||||
//err = pd.SendMessages(msgs)
|
||||
//t.Log(err)
|
||||
t.Log(time.Now().Sub(now).Milliseconds())
|
||||
pd.Close()
|
||||
}
|
||||
|
||||
func producer_async(t *testing.T) {
|
||||
var admin KafkaAdmin
|
||||
err := admin.Setup(kafkaVersion, addr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, tName := range topicName {
|
||||
if admin.HasTopic(tName) == false {
|
||||
err = admin.CreateTopic(tName, 10, 2, false)
|
||||
t.Log(err)
|
||||
}
|
||||
}
|
||||
|
||||
var pd Producer
|
||||
cfg, err := NewProducerConfig(kafkaVersion, true, true, sarama.WaitForAll, false, sarama.NewHashPartitioner)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = pd.ASyncSetup(addr, cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
msgs := make([]*AsyncReturn, 0, 20000)
|
||||
for i := 0; i < 200000; i++ {
|
||||
var msg sarama.ProducerMessage
|
||||
msg.Key = sarama.StringEncoder(fmt.Sprintf("%d", i))
|
||||
msg.Topic = topicName[0]
|
||||
msg.Value = sarama.StringEncoder(fmt.Sprintf("i'm %d", i))
|
||||
|
||||
r := pd.AsyncSendMessage(&msg)
|
||||
msgs = append(msgs, r)
|
||||
}
|
||||
//err = pd.SendMessages(msgs)
|
||||
//t.Log(err)
|
||||
|
||||
for _, r := range msgs {
|
||||
r.WaitOk(context.Background())
|
||||
//t.Log(m, e)
|
||||
}
|
||||
t.Log(time.Now().Sub(now).Milliseconds())
|
||||
|
||||
time.Sleep(1000 * time.Second)
|
||||
pd.Close()
|
||||
}
|
||||
|
||||
func consumer(t *testing.T) {
|
||||
var admin KafkaAdmin
|
||||
err := admin.Setup(kafkaVersion, addr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, tName := range topicName {
|
||||
if admin.HasTopic(tName) == false {
|
||||
err = admin.CreateTopic(tName, 10, 2, false)
|
||||
t.Log(err)
|
||||
}
|
||||
}
|
||||
|
||||
var cg ConsumerGroup
|
||||
cfg, err := NewConsumerConfig(kafkaVersion, "sticky", sarama.OffsetOldest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = cg.Setup(addr, topicName, "test_groupId", cfg, 50*time.Second, 10, &MsgReceiver{t: t})
|
||||
t.Log(err)
|
||||
time.Sleep(10000 * time.Second)
|
||||
cg.Close()
|
||||
}
|
||||
|
||||
func TestConsumerAndProducer(t *testing.T) {
|
||||
producer_async(t)
|
||||
//go producer(t)
|
||||
//producer(t)
|
||||
//consumer(t)
|
||||
}
|
||||
7
sysmodule/kafkamodule/Sasl.go
Normal file
7
sysmodule/kafkamodule/Sasl.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package kafkamodule
|
||||
|
||||
type Sasl struct {
|
||||
UserName string `json:"UserName"`
|
||||
Passwd string `json:"Passwd"`
|
||||
InstanceId string `json:"InstanceId"`
|
||||
}
|
||||
@@ -1,181 +0,0 @@
|
||||
package mongomodule
|
||||
|
||||
import (
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
"gopkg.in/mgo.v2"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
"sync"
|
||||
"time"
|
||||
"container/heap"
|
||||
_ "gopkg.in/mgo.v2"
|
||||
)
|
||||
|
||||
// session
|
||||
type Session struct {
|
||||
*mgo.Session
|
||||
ref int
|
||||
index int
|
||||
}
|
||||
|
||||
type SessionHeap []*Session
|
||||
|
||||
func (h SessionHeap) Len() int {
|
||||
return len(h)
|
||||
}
|
||||
|
||||
func (h SessionHeap) Less(i, j int) bool {
|
||||
return h[i].ref < h[j].ref
|
||||
}
|
||||
|
||||
func (h SessionHeap) Swap(i, j int) {
|
||||
h[i], h[j] = h[j], h[i]
|
||||
h[i].index = i
|
||||
h[j].index = j
|
||||
}
|
||||
|
||||
func (h *SessionHeap) Push(s interface{}) {
|
||||
s.(*Session).index = len(*h)
|
||||
*h = append(*h, s.(*Session))
|
||||
}
|
||||
|
||||
func (h *SessionHeap) Pop() interface{} {
|
||||
l := len(*h)
|
||||
s := (*h)[l-1]
|
||||
s.index = -1
|
||||
*h = (*h)[:l-1]
|
||||
return s
|
||||
}
|
||||
|
||||
type DialContext struct {
|
||||
sync.Mutex
|
||||
sessions SessionHeap
|
||||
}
|
||||
|
||||
type MongoModule struct {
|
||||
dailContext *DialContext
|
||||
}
|
||||
|
||||
func (slf *MongoModule) Init(url string,sessionNum uint32,dialTimeout time.Duration, timeout time.Duration) error {
|
||||
var err error
|
||||
slf.dailContext, err = dialWithTimeout(url, sessionNum, dialTimeout, timeout)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (slf *MongoModule) Ref() *Session{
|
||||
return slf.dailContext.Ref()
|
||||
}
|
||||
|
||||
func (slf *MongoModule) UnRef(s *Session) {
|
||||
slf.dailContext.UnRef(s)
|
||||
}
|
||||
|
||||
// goroutine safe
|
||||
func dialWithTimeout(url string, sessionNum uint32, dialTimeout time.Duration, timeout time.Duration) (*DialContext, error) {
|
||||
if sessionNum <= 0 {
|
||||
sessionNum = 100
|
||||
log.Release("invalid sessionNum, reset to %v", sessionNum)
|
||||
}
|
||||
|
||||
s, err := mgo.DialWithTimeout(url, dialTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.SetMode(mgo.Strong,true)
|
||||
s.SetSyncTimeout(timeout)
|
||||
s.SetSocketTimeout(timeout)
|
||||
|
||||
c := new(DialContext)
|
||||
|
||||
// sessions
|
||||
c.sessions = make(SessionHeap, sessionNum)
|
||||
c.sessions[0] = &Session{s, 0, 0}
|
||||
for i := 1; i < int(sessionNum); i++ {
|
||||
c.sessions[i] = &Session{s.New(), 0, i}
|
||||
}
|
||||
heap.Init(&c.sessions)
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// goroutine safe
|
||||
func (c *DialContext) Close() {
|
||||
c.Lock()
|
||||
for _, s := range c.sessions {
|
||||
s.Close()
|
||||
}
|
||||
c.Unlock()
|
||||
}
|
||||
|
||||
// goroutine safe
|
||||
func (c *DialContext) Ref() *Session {
|
||||
c.Lock()
|
||||
s := c.sessions[0]
|
||||
if s.ref == 0 {
|
||||
s.Refresh()
|
||||
}
|
||||
s.ref++
|
||||
heap.Fix(&c.sessions, 0)
|
||||
c.Unlock()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// goroutine safe
|
||||
func (c *DialContext) UnRef(s *Session) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
c.Lock()
|
||||
s.ref--
|
||||
heap.Fix(&c.sessions, s.index)
|
||||
c.Unlock()
|
||||
}
|
||||
|
||||
|
||||
// goroutine safe
|
||||
func (s *Session) EnsureCounter(db string, collection string, id string) error {
|
||||
err := s.DB(db).C(collection).Insert(bson.M{
|
||||
"_id": id,
|
||||
"seq": 0,
|
||||
})
|
||||
if mgo.IsDup(err) {
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// goroutine safe
|
||||
func (s *Session) NextSeq(db string, collection string, id string) (int, error) {
|
||||
var res struct {
|
||||
Seq int
|
||||
}
|
||||
_, err := s.DB(db).C(collection).FindId(id).Apply(mgo.Change{
|
||||
Update: bson.M{"$inc": bson.M{"seq": 1}},
|
||||
ReturnNew: true,
|
||||
}, &res)
|
||||
|
||||
return res.Seq, err
|
||||
}
|
||||
|
||||
// goroutine safe
|
||||
func (s *Session) EnsureIndex(db string, collection string, key []string, bBackground bool) error {
|
||||
return s.DB(db).C(collection).EnsureIndex(mgo.Index{
|
||||
Key: key,
|
||||
Unique: false,
|
||||
Sparse: true,
|
||||
Background: bBackground,
|
||||
})
|
||||
}
|
||||
|
||||
// goroutine safe
|
||||
func (s *Session) EnsureUniqueIndex(db string, collection string, key []string, bBackground bool) error {
|
||||
return s.DB(db).C(collection).EnsureIndex(mgo.Index{
|
||||
Key: key,
|
||||
Unique: true,
|
||||
Sparse: true,
|
||||
Background: bBackground,
|
||||
})
|
||||
}
|
||||
@@ -1,95 +0,0 @@
|
||||
package mongomodule
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
_ "gopkg.in/mgo.v2"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
||||
|
||||
type Student struct {
|
||||
ID bson.ObjectId `bson:"_id"`
|
||||
Name string `bson: "name"`
|
||||
Age int `bson: "age"`
|
||||
Sid string `bson: "sid"`
|
||||
Status int `bson: "status"`
|
||||
}
|
||||
|
||||
type StudentName struct {
|
||||
Name string `bson: "name"`
|
||||
}
|
||||
|
||||
|
||||
func Test_Example(t *testing.T) {
|
||||
module:=MongoModule{}
|
||||
module.Init("mongodb://admin:123456@192.168.2.119:27017",100, 5*time.Second,5*time.Second)
|
||||
|
||||
// take session
|
||||
s := module.Take()
|
||||
c := s.DB("test2").C("t_student")
|
||||
|
||||
//2.定义对象
|
||||
insertData := Student{
|
||||
ID:bson.NewObjectId(),
|
||||
Name: "seeta11",
|
||||
Age: 35, //*^_^*
|
||||
Sid: "s20180907",
|
||||
Status: 1,
|
||||
}
|
||||
|
||||
updateData := Student{
|
||||
Name: "seeta11",
|
||||
Age: 18,
|
||||
Sid: "s20180907",
|
||||
Status: 1,
|
||||
}
|
||||
|
||||
|
||||
//3.插入数据
|
||||
err := c.Insert(&insertData)
|
||||
|
||||
//4.查找数据
|
||||
selector := bson.M{"_id":bson.ObjectIdHex("5f25303e999c622d361989b0")}
|
||||
m:=Student{}
|
||||
err = c.Find(selector).One(&m)
|
||||
|
||||
//5.更新数据
|
||||
//selector2 := bson.M{"_id":bson.ObjectIdHex("5f25303e999c622d361989b0")}
|
||||
updateData.ID = bson.ObjectIdHex("5f25303e999c622d361989b0")
|
||||
err = c.UpdateId(bson.ObjectIdHex("5f25303e999c622d361989b0"),&updateData)
|
||||
if err != nil {
|
||||
fmt.Print(err)
|
||||
}
|
||||
|
||||
//6.删除数据
|
||||
err = c.RemoveId(bson.ObjectIdHex("5f252f09999c622d36198951"))
|
||||
if err != nil {
|
||||
fmt.Print(err)
|
||||
}
|
||||
|
||||
//7.序号自增
|
||||
s.EnsureCounter("test2","t_student","5f252f09999c622d36198951")
|
||||
for i := 0; i < 3; i++ {
|
||||
id, err := s.NextSeq("test2", "t_student", "5f252f09999c622d36198951")
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
fmt.Println(id)
|
||||
}
|
||||
|
||||
//8.setoninsert使用
|
||||
info,uErr := c.Upsert(bson.M{"_id":bson.ObjectIdHex("5f252f09999c622d36198951")},bson.M{
|
||||
"$setOnInsert":bson.M{"Name":"setoninsert","Age":55}})
|
||||
|
||||
|
||||
//9.修改部分数字数据
|
||||
selector1 := bson.M{"_id":bson.ObjectIdHex("60473de655f1012e7453b369")}
|
||||
update1 := bson.M{"$set":bson.M{"name":"xxxxx","age":1111}}
|
||||
c.Update(selector1,update1)
|
||||
|
||||
fmt.Println(info,uErr)
|
||||
}
|
||||
@@ -16,7 +16,7 @@ type CustomerSubscriber struct {
|
||||
rpc.IRpcHandler
|
||||
topic string
|
||||
subscriber *Subscriber
|
||||
fromNodeId int
|
||||
fromNodeId string
|
||||
callBackRpcMethod string
|
||||
serviceName string
|
||||
StartIndex uint64
|
||||
@@ -37,7 +37,7 @@ const (
|
||||
MethodLast SubscribeMethod = 1 //Last模式,以该消费者上次记录的位置开始订阅
|
||||
)
|
||||
|
||||
func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
|
||||
func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId string, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
|
||||
cs.subscriber = ss
|
||||
cs.fromNodeId = fromNodeId
|
||||
cs.callBackRpcMethod = callBackRpcMethod
|
||||
@@ -85,7 +85,7 @@ func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandle
|
||||
}
|
||||
|
||||
// 开始订阅
|
||||
func (cs *CustomerSubscriber) Subscribe(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
|
||||
func (cs *CustomerSubscriber) Subscribe(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId string, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
|
||||
err := cs.trySetSubscriberBaseInfo(rpcHandler, ss, topic, subscribeMethod, customerId, fromNodeId, callBackRpcMethod, startIndex, oneBatchQuantity)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -122,5 +122,5 @@ func (ms *MessageQueueService) RPC_Publish(inParam *rpc.DBQueuePublishReq, outPa
|
||||
|
||||
func (ms *MessageQueueService) RPC_Subscribe(req *rpc.DBQueueSubscribeReq, res *rpc.DBQueueSubscribeRes) error {
|
||||
topicRoom := ms.GetTopicRoom(req.TopicName)
|
||||
return topicRoom.TopicSubscribe(ms.GetRpcHandler(), req.SubType, int32(req.Method), int(req.FromNodeId), req.RpcMethod, req.TopicName, req.CustomerId, req.StartIndex, req.OneBatchQuantity)
|
||||
return topicRoom.TopicSubscribe(ms.GetRpcHandler(), req.SubType, int32(req.Method), req.FromNodeId, req.RpcMethod, req.TopicName, req.CustomerId, req.StartIndex, req.OneBatchQuantity)
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCo
|
||||
return ss.dataPersist.PersistTopicData(topic, topics, retryCount)
|
||||
}
|
||||
|
||||
func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, subscribeMethod SubscribeMethod, fromNodeId int, callBackRpcMethod string, topic string, customerId string, StartIndex uint64, oneBatchQuantity int32) error {
|
||||
func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, subscribeMethod SubscribeMethod, fromNodeId string, callBackRpcMethod string, topic string, customerId string, StartIndex uint64, oneBatchQuantity int32) error {
|
||||
//取消订阅时
|
||||
if subScribeType == rpc.SubscribeType_Unsubscribe {
|
||||
ss.UnSubscribe(customerId)
|
||||
|
||||
@@ -263,7 +263,7 @@ func (mp *MongoPersist) JugeTimeoutSave() bool{
|
||||
|
||||
func (mp *MongoPersist) persistCoroutine(){
|
||||
defer mp.waitGroup.Done()
|
||||
for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){
|
||||
for atomic.LoadInt32(&mp.stop)==0 {
|
||||
//间隔时间sleep
|
||||
time.Sleep(time.Second*1)
|
||||
|
||||
|
||||
@@ -129,7 +129,7 @@ func (slf *WSClient) Run() {
|
||||
for{
|
||||
bytes,err := slf.wsConn.ReadMsg()
|
||||
if err != nil {
|
||||
log.Debug("read client id %d is error:%+v",slf.id,err)
|
||||
log.Debug("read client id %s is error:%+v",slf.id,err)
|
||||
break
|
||||
}
|
||||
data,err:=slf.wsService.process.Unmarshal(slf.id,bytes)
|
||||
@@ -153,7 +153,7 @@ func (ws *WSService) SendMsg(clientid string,msg interface{}) error{
|
||||
client,ok := ws.mapClient[clientid]
|
||||
if ok == false{
|
||||
ws.mapClientLocker.Unlock()
|
||||
return fmt.Errorf("client %d is disconnect!",clientid)
|
||||
return fmt.Errorf("client %s is disconnect!",clientid)
|
||||
}
|
||||
|
||||
ws.mapClientLocker.Unlock()
|
||||
|
||||
Reference in New Issue
Block a user