Add a message queue with customizable persistence

This commit is contained in:
orgin
2022-11-15 17:09:17 +08:00
parent 80c73b0bdb
commit 68dfbc46f0
11 changed files with 3046 additions and 3 deletions

View File

@@ -82,9 +82,6 @@ func (client *Client) Connect(id int, addr string, maxRpcParamLen uint32) error
func (client *Client) startCheckRpcCallTimer() {
for {
time.Sleep(5 * time.Second)
if client.GetCloseFlag() == true {
break
}
client.checkRpcCallTimeout()
}
}
@@ -348,6 +345,19 @@ func (client *Client) GetId() int {
func (client *Client) Close(waitDone bool) {
client.TCPClient.Close(waitDone)
client.pendingLock.Lock()
for {
pElem := client.pendingTimer.Front()
if pElem == nil {
break
}
pCall := pElem.Value.(*Call)
pCall.Err = errors.New("node is disconnected")
client.makeCallFail(pCall)
}
client.pendingLock.Unlock()
}
func (client *Client) GetClientSeq() uint32 {

1777
rpc/messagequeue.pb.go Normal file

File diff suppressed because it is too large

51
rpc/messagequeue.proto Normal file
View File

@@ -0,0 +1,51 @@
syntax = "proto3";
option go_package = ".;rpc";
message DBQueuePopReq {
string CustomerId = 1;
string QueueName = 2;
int32 PopStartPos = 3;
int32 PopNum = 4;
bytes pushData = 5;
}
message DBQueuePopRes {
string QueueName = 1;
repeated bytes pushData = 2;
}
enum SubscribeType {
Subscribe = 0;
Unsubscribe = 1;
}
enum SubscribeMethod {
Method_Custom = 0;// Custom mode: start fetching/subscribing from the StartIndex supplied by the consumer
Method_Last = 1;// Last mode: resume from the position last recorded for this consumer
}
// subscribe / unsubscribe request
message DBQueueSubscribeReq {
SubscribeType SubType = 1; // subscribe or unsubscribe
SubscribeMethod Method = 2; // subscription method
string CustomerId = 3; // consumer id
int32 FromNodeId = 4;
string RpcMethod = 5;
string TopicName = 6; // topic name
uint64 StartIndex = 7; // start position: the upper 32 bits are a Unix timestamp in seconds, the lower 32 bits a sequence number. If 0 is passed, the service replaces it with (midnight of the current day in Unix seconds) << 32
int32 OneBatchQuantity = 8; // number of messages sent per batch; defaults to 1000 when unset
}
message DBQueueSubscribeRes {
}
message DBQueuePublishReq {
string TopicName = 1; // topic name
repeated bytes pushData = 2;
}
message DBQueuePublishRes {
}
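The StartIndex comment above describes the packed index layout used throughout this commit (see TopicRoom.NextSeq and MongoPersist.GetDateByIndex): the upper 32 bits carry a Unix timestamp in seconds, the lower 32 bits a per-second sequence starting at 1. The following standalone Go sketch is not part of the commit; the helper names are invented purely to illustrate the encoding.

package main

import (
    "fmt"
    "time"
)

// encodeIndex packs a Unix-second timestamp and a per-second sequence number,
// the same layout TopicRoom.NextSeq produces.
func encodeIndex(sec int64, seq uint32) uint64 {
    return uint64(sec)<<32 | uint64(seq)
}

// decodeIndex splits an index back into its timestamp and sequence parts.
func decodeIndex(index uint64) (sec int64, seq uint32) {
    return int64(index >> 32), uint32(index)
}

func main() {
    idx := encodeIndex(time.Now().Unix(), 1)
    sec, seq := decodeIndex(idx)
    fmt.Println(idx, sec, seq)
}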

View File

@@ -0,0 +1,229 @@
package messagequeueservice
import (
"errors"
"fmt"
"github.com/duanhf2012/origin/cluster"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/util/coroutine"
"strings"
"sync/atomic"
"time"
)
type CustomerSubscriber struct {
rpc.IRpcHandler
topic string
subscriber *Subscriber
fromNodeId int
callBackRpcMethod string
serviceName string
StartIndex uint64
oneBatchQuantity int32
subscribeMethod SubscribeMethod
customerId string
isStop int32 // stop flag
}
const DefaultOneBatchQuantity = 1000
type SubscribeMethod = int32
const (
MethodCustom SubscribeMethod = 0 // Custom mode: start from the StartIndex supplied by the consumer
MethodLast SubscribeMethod = 1 // Last mode: resume from the position last recorded for this consumer
)
func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
cs.subscriber = ss
cs.fromNodeId = fromNodeId
cs.callBackRpcMethod = callBackRpcMethod
//cs.StartIndex = startIndex
cs.subscribeMethod = subscribeMethod
cs.customerId = customerId
cs.StartIndex = startIndex
cs.topic = topic
cs.IRpcHandler = rpcHandler
if oneBatchQuantity == 0 {
cs.oneBatchQuantity = DefaultOneBatchQuantity
} else {
cs.oneBatchQuantity = oneBatchQuantity
}
strRpcMethod := strings.Split(callBackRpcMethod, ".")
if len(strRpcMethod) != 2 {
err := errors.New("RpcMethod " + callBackRpcMethod + " is invalid")
log.SError(err.Error())
return err
}
cs.serviceName = strRpcMethod[0]
if cluster.HasService(fromNodeId, cs.serviceName) == false {
err := fmt.Errorf("nodeId %d cannot found %s", fromNodeId, cs.serviceName)
log.SError(err.Error())
return err
}
if cluster.GetCluster().IsNodeConnected(fromNodeId) == false {
err := fmt.Errorf("nodeId %d is disconnect", fromNodeId)
log.SError(err.Error())
return err
}
if startIndex == 0 {
now := time.Now()
zeroTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
//fmt.Println(zeroTime.Unix())
cs.StartIndex = uint64(zeroTime.Unix() << 32)
}
return nil
}
// Subscribe starts the subscription
func (cs *CustomerSubscriber) Subscribe(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error {
err := cs.trySetSubscriberBaseInfo(rpcHandler, ss, topic, subscribeMethod, customerId, fromNodeId, callBackRpcMethod, startIndex, oneBatchQuantity)
if err != nil {
return err
}
cs.subscriber.queueWait.Add(1)
coroutine.GoRecover(cs.SubscribeRun, -1)
return nil
}
// UnSubscribe cancels the subscription
func (cs *CustomerSubscriber) UnSubscribe() {
atomic.StoreInt32(&cs.isStop, 1)
}
func (cs *CustomerSubscriber) LoadLastIndex() {
for {
if atomic.LoadInt32(&cs.isStop) != 0 {
log.SRelease("topic ", cs.topic, " out of subscription")
break
}
log.SRelease("customer ", cs.customerId, " start load last index ")
lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId)
if ret == true {
if lastIndex > 0 {
cs.StartIndex = lastIndex
} else {
// otherwise keep the StartIndex provided by the client
}
log.SRelease("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
break
}
log.SRelease("customer ", cs.customerId, " load last index is fail...")
time.Sleep(5 * time.Second)
}
}
func (cs *CustomerSubscriber) SubscribeRun() {
defer cs.subscriber.queueWait.Done()
log.SRelease("topic ", cs.topic, " start subscription")
// load the previously recorded position
if cs.subscribeMethod == MethodLast {
cs.LoadLastIndex()
}
for {
if atomic.LoadInt32(&cs.isStop) != 0 {
log.SRelease("topic ", cs.topic, " out of subscription")
break
}
if cs.checkCustomerIsValid() == false {
break
}
// TODO: check for exit
if cs.subscribe() == false {
log.SRelease("topic ", cs.topic, " subscription stopped")
break
}
}
// remove the subscription relationship
cs.subscriber.removeCustomer(cs.customerId, cs)
log.SRelease("topic ", cs.topic, " unsubscribed")
}
func (cs *CustomerSubscriber) subscribe() bool {
// look in the in-memory queue first
topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex, cs.oneBatchQuantity)
if ret == true {
cs.publishToCustomer(topicData)
return true
}
// fall back to persisted data
topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity))
return cs.publishToCustomer(topicData)
}
func (cs *CustomerSubscriber) checkCustomerIsValid() bool {
// 1. check whether the node is still connected; if not, unsubscribe
if cluster.GetCluster().IsNodeConnected(cs.fromNodeId) == false {
return false
}
// 2. verify the node still has the callback service; if not, exit
if cluster.HasService(cs.fromNodeId, cs.serviceName) == false {
return false
}
return true
}
func (cs *CustomerSubscriber) publishToCustomer(topicData []TopicData) bool {
if cs.checkCustomerIsValid() == false {
return false
}
if len(topicData) == 0 {
// no data, back off for 100ms
time.Sleep(time.Millisecond * 100)
return true
}
// 3. retry sending on failure
var dbQueuePublishReq rpc.DBQueuePublishReq
var dbQueuePushRes rpc.DBQueuePublishRes
dbQueuePublishReq.TopicName = cs.topic
cs.subscriber.dataPersist.OnPushTopicDataToCustomer(cs.topic, topicData)
for i := 0; i < len(topicData); i++ {
dbQueuePublishReq.PushData = append(dbQueuePublishReq.PushData, topicData[i].RawData)
}
for {
if atomic.LoadInt32(&cs.isStop) != 0 {
break
}
if cs.checkCustomerIsValid() == false {
return false
}
// push the data
err := cs.CallNode(cs.fromNodeId, cs.callBackRpcMethod, &dbQueuePublishReq, &dbQueuePushRes)
if err != nil {
time.Sleep(time.Second * 1)
continue
}
// persist the progress
endIndex := cs.subscriber.dataPersist.GetIndex(&topicData[len(topicData)-1])
cs.StartIndex = endIndex
cs.subscriber.dataPersist.PersistIndex(cs.topic, cs.customerId, endIndex)
return true
}
return true
}

View File

@@ -0,0 +1,97 @@
package messagequeueservice
import (
"github.com/duanhf2012/origin/util/algorithms"
"sync"
)
type MemoryQueue struct {
subscriber *Subscriber
topicQueue []TopicData
head int32
tail int32
locker sync.RWMutex
}
func (mq *MemoryQueue) Init(cap int32) {
mq.topicQueue = make([]TopicData, cap+1)
}
// Push appends data at the tail of the queue
func (mq *MemoryQueue) Push(topicData *TopicData) bool {
mq.locker.Lock()
defer mq.locker.Unlock()
nextPos := (mq.tail + 1) % int32(len(mq.topicQueue))
// if the queue is full
if nextPos == mq.head {
// drop the element at the head
mq.head++
mq.head = mq.head % int32(len(mq.topicQueue))
}
mq.tail = nextPos
mq.topicQueue[mq.tail] = *topicData
return true
}
func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32) ([]TopicData, bool) {
// empty queue, no data
if mq.head == mq.tail {
return nil, true
}
var findStartPos int32
var findEndPos int32
findStartPos = startPos //(mq.head + 1) % cap(mq.topicQueue)
if findStartPos <= mq.tail {
findEndPos = mq.tail + 1
} else {
findEndPos = int32(cap(mq.topicQueue))
}
// binary-search for the position
pos := int32(algorithms.BiSearch(mq.topicQueue[findStartPos:findEndPos], startIndex, 1))
if pos == -1 {
return nil, true
}
pos += findStartPos
// compute the end position
endPos := limit + pos
if endPos > findEndPos {
endPos = findEndPos
}
return mq.topicQueue[pos:endPos], true
}
// FindData returns the data found in memory ([]TopicData; nil means none). The bool is false when the lookup cannot be served from memory and the caller should fall back to persisted storage
func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bool) {
mq.locker.RLock()
defer mq.locker.RUnlock()
// the queue is empty, so the caller should query the database
if mq.head == mq.tail {
return nil, false
}
/*
// first check whether startIndex is larger than the first element
headTopic := (mq.head + 1) % int32(len(mq.topicQueue))
// in that case the data must come from persistent storage
if startIndex+1 > mq.topicQueue[headTopic].Seq {
return nil, false
}
*/
retData, ret := mq.findData(mq.head+1, startIndex, limit)
if mq.head <= mq.tail || ret == true {
return retData, true
}
// the ring has wrapped (head after tail): continue searching from index 0 up to tail
return mq.findData(0, startIndex, limit)
}

View File

@@ -0,0 +1,36 @@
package messagequeueservice
import (
"fmt"
"testing"
)
type In int
func (i In) GetValue() int {
return int(i)
}
func Test_BiSearch(t *testing.T) {
var memQueue MemoryQueue
memQueue.Init(5)
for i := 1; i <= 8; i++ {
memQueue.Push(&TopicData{Seq: uint64(i)})
}
startindex := uint64(0)
for {
retData, ret := memQueue.FindData(startindex+1, 10)
fmt.Println(retData, ret)
for _, d := range retData {
if d.Seq > startindex {
startindex = d.Seq
}
}
if ret == false {
break
}
}
}

View File

@@ -0,0 +1,126 @@
package messagequeueservice
import (
"errors"
"fmt"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/rpc"
"sync"
)
type QueueDataPersist interface {
service.IModule
OnExit()
OnReceiveTopicData(topic string, topicData []TopicData) // called when pushed data arrives
OnPushTopicDataToCustomer(topic string, topicData []TopicData) // callback invoked when data is pushed to a customer
PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) // persists data; return false on failure and the caller keeps retrying. Implementations should count attempts and return true once retryCount exceeds their limit
FindTopicData(topic string, startIndex uint64, limit int64) []TopicData // looks up persisted data starting from startIndex; returns nil when nothing is found
LoadCustomerIndex(topic string, customerId string) (uint64, bool) // false means the load failed (usually a read error) and will be retried; returns (0, true) when no record exists
GetIndex(topicData *TopicData) uint64 // derives the progress index from a topic data item
PersistIndex(topic string, customerId string, index uint64) // persists the progress index
}
type MessageQueueService struct {
service.Service
sync.Mutex
mapTopicRoom map[string]*TopicRoom
queueWait sync.WaitGroup
dataPersist QueueDataPersist
memoryQueueLen int32
maxProcessTopicBacklogNum int32 // maximum backlog of pending data: items are written to a channel and drained by a goroutine for persistence; defaults to 100000 when unset
}
func (ms *MessageQueueService) OnInit() error {
ms.mapTopicRoom = map[string]*TopicRoom{}
errC := ms.ReadCfg()
if errC != nil {
return errC
}
if ms.dataPersist == nil {
return errors.New("not setup QueueDataPersist.")
}
_, err := ms.AddModule(ms.dataPersist)
if err != nil {
return err
}
return nil
}
func (ms *MessageQueueService) ReadCfg() error {
mapDBServiceCfg, ok := ms.GetService().GetServiceCfg().(map[string]interface{})
if ok == false {
return fmt.Errorf("MessageQueueService config is error")
}
maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"]
if ok == false {
ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum
log.SRelease("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
} else {
ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64))
}
memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"]
if ok == false {
ms.memoryQueueLen = DefaultMemoryQueueLen
log.SRelease("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
} else {
ms.memoryQueueLen = int32(memoryQueueLen.(float64))
}
return nil
}
func (ms *MessageQueueService) Setup(dataPersist QueueDataPersist) {
ms.dataPersist = dataPersist
}
func (ms *MessageQueueService) OnRelease() {
// stop all topic rooms
ms.Lock()
for _, room := range ms.mapTopicRoom {
room.Stop()
}
ms.Unlock()
// make sure all goroutines have exited before releasing
ms.queueWait.Wait()
// notify the persistence object
ms.dataPersist.OnExit()
}
func (ms *MessageQueueService) GetTopicRoom(topic string) *TopicRoom {
ms.Lock()
defer ms.Unlock()
topicRoom := ms.mapTopicRoom[topic]
if topicRoom != nil {
return topicRoom
}
topicRoom = &TopicRoom{}
topicRoom.Init(ms.maxProcessTopicBacklogNum, ms.memoryQueueLen, topic, &ms.queueWait, ms.dataPersist)
ms.mapTopicRoom[topic] = topicRoom
return topicRoom
}
func (ms *MessageQueueService) RPC_Publish(inParam *rpc.DBQueuePublishReq, outParam *rpc.DBQueuePublishRes) error {
topicRoom := ms.GetTopicRoom(inParam.TopicName)
return topicRoom.Publish(inParam.PushData)
}
func (ms *MessageQueueService) RPC_Subscribe(req *rpc.DBQueueSubscribeReq, res *rpc.DBQueueSubscribeRes) error {
topicRoom := ms.GetTopicRoom(req.TopicName)
return topicRoom.TopicSubscribe(ms.GetRpcHandler(), req.SubType, int32(req.Method), int(req.FromNodeId), req.RpcMethod, req.TopicName, req.CustomerId, req.StartIndex, req.OneBatchQuantity)
}
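MessageQueueService refuses to start unless a QueueDataPersist implementation has been injected through Setup before OnInit runs. Below is a minimal wiring sketch, not part of the commit, assuming the MongoPersist module added later in this commit is used as the backend; how the service is then registered with an origin node is framework-specific and omitted here.

package messagequeueservice

// NewMessageQueueService constructs the queue service and attaches a
// persistence backend before the framework calls OnInit.
func NewMessageQueueService() *MessageQueueService {
    mqService := &MessageQueueService{}
    // MongoPersist implements QueueDataPersist; without this call OnInit returns an error.
    mqService.Setup(&MongoPersist{})
    return mqService
}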

View File

@@ -0,0 +1,358 @@
package messagequeueservice
import (
"fmt"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysmodule/mongodbmodule"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"sunserver/common/util"
"time"
)
const MaxDays = 180
type MongoPersist struct {
service.Module
mongo mongodbmodule.MongoModule
url string // connection url
dbName string // database name
retryCount int // number of retries when persisting to the database
topic []TopicData // temporary buffer
}
const CustomerCollectName = "SysCustomer"
func (mp *MongoPersist) OnInit() error {
if errC := mp.ReadCfg(); errC != nil {
return errC
}
err := mp.mongo.Init(mp.url, time.Second*15)
if err != nil {
return err
}
err = mp.mongo.Start()
if err != nil {
log.SError("start dbService[", mp.dbName, "], url[", mp.url, "] init error:", err.Error())
return err
}
// create indexes
var IndexKey [][]string
var keys []string
keys = append(keys, "Customer", "Topic")
IndexKey = append(IndexKey, keys)
s := mp.mongo.TakeSession()
if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true); err != nil {
log.SError("EnsureUniqueIndex is fail ", err.Error())
return err
}
return nil
}
func (mp *MongoPersist) ReadCfg() error {
mapDBServiceCfg, ok := mp.GetService().GetServiceCfg().(map[string]interface{})
if ok == false {
return fmt.Errorf("MessageQueueService config is error")
}
// parse connection settings
url, ok := mapDBServiceCfg["Url"]
if ok == false {
return fmt.Errorf("MongoPersist config is missing Url")
}
mp.url = url.(string)
dbName, ok := mapDBServiceCfg["DBName"]
if ok == false {
return fmt.Errorf("MessageQueueService config is error")
}
mp.dbName = dbName.(string)
retryCount, ok := mapDBServiceCfg["RetryCount"]
if ok == false {
return fmt.Errorf("MongoPersist config is missing RetryCount")
}
mp.retryCount = int(retryCount.(float64))
return nil
}
func (mp *MongoPersist) getTopicBuff(limit int) []TopicData {
if cap(mp.topic) < limit {
mp.topic = make([]TopicData, limit)
}
return mp.topic[:0]
}
func (mp *MongoPersist) OnExit() {
}
// OnReceiveTopicData is called when pushed data arrives
func (mp *MongoPersist) OnReceiveTopicData(topic string, topicData []TopicData) {
// 1. insert an _id field (the sequence number) into each received document
for i := 0; i < len(topicData); i++ {
var document bson.D
err := bson.Unmarshal(topicData[i].RawData, &document)
if err != nil {
topicData[i].RawData = nil
log.SError(topic, " data Unmarshal is fail ", err.Error())
continue
}
document = append(document, bson.E{Key: "_id", Value: topicData[i].Seq})
byteRet, err := bson.Marshal(document)
if err != nil {
topicData[i].RawData = nil
log.SError(topic, " data Marshal is fail ", err.Error())
continue
}
topicData[i].ExtendParam = document
topicData[i].RawData = byteRet
}
}
// OnPushTopicDataToCustomer is invoked when data is pushed to a customer
func (mp *MongoPersist) OnPushTopicDataToCustomer(topic string, topicData []TopicData) {
}
// persistTopicData writes one batch into the given collection
func (mp *MongoPersist) persistTopicData(collectionName string, topicData []TopicData, retryCount int) bool {
s := mp.mongo.TakeSession()
ctx, cancel := s.GetDefaultContext()
defer cancel()
var documents []interface{}
for _, tData := range topicData {
if tData.ExtendParam == nil {
continue
}
documents = append(documents, tData.ExtendParam)
}
_, err := s.Collection(mp.dbName, collectionName).InsertMany(ctx, documents)
if err != nil {
log.SError("PersistTopicData InsertMany fail,collect name is ", collectionName)
// give up once the retry limit is reached
return retryCount >= mp.retryCount
}
//log.SRelease("+++++++++====", time.Now().UnixNano())
return true
}
// PersistTopicData persists data; each call writes at most one day's worth
func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) {
if len(topicData) == 0 {
return nil, true
}
preDate := topicData[0].Seq >> 32
var findPos int
for findPos = 1; findPos < len(topicData); findPos++ {
newDate := topicData[findPos].Seq >> 32
// the day rolled over
if preDate != newDate {
break
}
}
collectName := fmt.Sprintf("%s_%s", topic, mp.GetDateByIndex(topicData[0].Seq))
ret := mp.persistTopicData(collectName, topicData[:findPos], retryCount)
// on failure, let the caller retry next time
if ret == false {
return nil, false
}
// on success, return the remaining batch for the next call
return topicData[findPos:], true
}
// findTopicData queries a single daily collection
func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64) ([]TopicData, bool) {
s := mp.mongo.TakeSession()
ctx, cancel := s.GetDefaultContext()
defer cancel()
condition := bson.D{{Key: "_id", Value: bson.D{{Key: "$gt", Value: startIndex}}}}
var findOption options.FindOptions
findOption.SetLimit(limit)
var findOptions []*options.FindOptions
findOptions = append(findOptions, &findOption)
collectName := fmt.Sprintf("%s_%s", topic, mp.GetDateByIndex(startIndex))
cursor, err := s.Collection(mp.dbName, collectName).Find(ctx, condition, findOptions...)
if err != nil || cursor.Err() != nil {
if err == nil {
err = cursor.Err()
}
log.SError("find collect name ", topic, " is error:", err.Error())
return nil, false
}
var res []interface{}
ctxAll, cancelAll := s.GetDefaultContext()
defer cancelAll()
err = cursor.All(ctxAll, &res)
if err != nil {
log.SError("find collect name ", topic, " is error:", err.Error())
return nil, false
}
// marshal the results for return
topicBuff := mp.getTopicBuff(int(limit))
for i := 0; i < len(res); i++ {
rawData, errM := bson.Marshal(res[i])
if errM != nil {
log.SError("collect name ", topic, " Marshal is error:", errM.Error())
return nil, false
}
topicBuff = append(topicBuff, TopicData{RawData: rawData})
}
return topicBuff, true
}
// FindTopicData looks up persisted data starting from startIndex
func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64) []TopicData {
// if a daily collection has nothing, keep moving forward day by day up to the current date
for days := 1; days <= MaxDays; days++ {
// query forward from startIndex
topicData, isSucc := mp.findTopicData(topic, startIndex, limit)
// return when data is found or the lookup fails
if len(topicData) > 0 || isSucc == false {
return topicData
}
// nothing found: stop once the current date is reached
if mp.GetDateByIndex(startIndex) >= mp.GetNowTime() {
break
}
startIndex = mp.GetNextIndex(startIndex, days)
}
return nil
}
func (mp *MongoPersist) GetNowTime() string {
now := time.Now()
zeroTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
return zeroTime.Format("20060102")
}
func (mp *MongoPersist) GetDateByIndex(startIndex uint64) string {
startTm := int64(startIndex >> 32)
return time.Unix(startTm, 0).Format("20060102")
}
func (mp *MongoPersist) GetNextIndex(startIndex uint64, addDay int) uint64 {
startTime := time.Unix(int64(startIndex>>32), 0)
dateTime := time.Date(startTime.Year(), startTime.Month(), startTime.Day(), 0, 0, 0, 0, startTime.Location())
newDateTime := dateTime.AddDate(0, 0, addDay)
nextIndex := uint64(newDateTime.Unix()) << 32
return nextIndex
}
// LoadCustomerIndex returns false when the load failed (usually a read error) and should be retried; returns (0, true) when no record exists
func (mp *MongoPersist) LoadCustomerIndex(topic string, customerId string) (uint64, bool) {
s := mp.mongo.TakeSession()
ctx, cancel := s.GetDefaultContext()
defer cancel()
condition := bson.D{{Key: "Customer", Value: customerId}, {Key: "Topic", Value: topic}}
cursor, err := s.Collection(mp.dbName, CustomerCollectName).Find(ctx, condition)
if err != nil {
log.SError("Load topic ", topic, " customer ", customerId, " is fail:", err.Error())
return 0, false
}
type findRes struct {
Index uint64 `bson:"Index,omitempty"`
}
var res []findRes
ctxAll, cancelAll := s.GetDefaultContext()
defer cancelAll()
err = cursor.All(ctxAll, &res)
if err != nil {
log.SError("Load topic ", topic, " customer ", customerId, " is fail:", err.Error())
return 0, false
}
if len(res) == 0 {
return 0, true
}
return res[0].Index, true
}
// GetIndex derives the progress index from a topic data item
func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64 {
if topicData.Seq > 0 {
return topicData.Seq
}
var document bson.D
err := bson.Unmarshal(topicData.RawData, &document)
if err != nil {
log.SError("GetIndex is fail ", err.Error())
return 0
}
for _, e := range document {
if e.Key == "_id" {
errC, seq := util.ConvertToNumber[uint64](e.Value)
if errC != nil {
log.Error("value is error:%s,%+v, ", errC.Error(), e.Value)
}
return seq
}
}
return topicData.Seq
}
// PersistIndex persists the progress index
func (mp *MongoPersist) PersistIndex(topic string, customerId string, index uint64) {
s := mp.mongo.TakeSession()
condition := bson.D{{Key: "Customer", Value: customerId}, {Key: "Topic", Value: topic}}
upsert := bson.M{"Customer": customerId, "Topic": topic, "Index": index}
update := bson.M{"$set": upsert}
var updateOpts []*options.UpdateOptions
updateOpts = append(updateOpts, options.Update().SetUpsert(true))
ctx, cancel := s.GetDefaultContext()
defer cancel()
_, err := s.Collection(mp.dbName, CustomerCollectName).UpdateOne(ctx, condition, update, updateOpts...)
if err != nil {
log.SError("PersistIndex fail :", err.Error())
}
}
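MongoPersist shards each topic across daily collections named topic_YYYYMMDD, with the date derived from the timestamp half of the index (GetDateByIndex), and FindTopicData walks forward day by day up to MaxDays. A small standalone sketch, not part of the commit, of how an index maps to a collection name:

package main

import (
    "fmt"
    "time"
)

// collectionNameFor mirrors the fmt.Sprintf("%s_%s", topic, GetDateByIndex(index))
// calls in MongoPersist: the day comes from the upper 32 bits of the index.
func collectionNameFor(topic string, index uint64) string {
    day := time.Unix(int64(index>>32), 0).Format("20060102")
    return fmt.Sprintf("%s_%s", topic, day)
}

func main() {
    sec := time.Date(2022, 11, 15, 10, 0, 0, 0, time.Local).Unix()
    fmt.Println(collectionNameFor("TestTopic", uint64(sec)<<32|1)) // TestTopic_20221115
}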

View File

@@ -0,0 +1,122 @@
package messagequeueservice
import (
"fmt"
"go.mongodb.org/mongo-driver/bson"
"testing"
"time"
)
var seq uint64
var lastTime int64
func NextSeq(addDays int) uint64 {
now := time.Now().AddDate(0, 0, addDays)
nowSec := now.Unix()
if nowSec != lastTime {
seq = 0
lastTime = nowSec
}
// must start from 1 so that seq > 0 when querying
seq += 1
return uint64(nowSec)<<32 | uint64(seq)
}
func Test_MongoPersist(t *testing.T) {
// 1. initialize
var mongoPersist MongoPersist
mongoPersist.url = "mongodb://admin:123456@192.168.2.15:27017/?minPoolSize=5&maxPoolSize=35&maxIdleTimeMS=30000"
mongoPersist.dbName = "MongoPersistTest"
mongoPersist.retryCount = 10
mongoPersist.OnInit()
// 2. load the index
index, ret := mongoPersist.LoadCustomerIndex("TestTopic", "TestCustomer")
fmt.Println(index, ret)
now := time.Now()
zeroTime := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location())
//fmt.Println(zeroTime.Unix())
startIndex := uint64(zeroTime.Unix()<<32) | 1
// persist the index
mongoPersist.PersistIndex("TestTopic", "TestCustomer", startIndex)
// load the index again
index, ret = mongoPersist.LoadCustomerIndex("TestTopic", "TestCustomer")
type RowTest struct {
Name string `bson:"Name,omitempty"`
MapTest map[int]int `bson:"MapTest,omitempty"`
Message string `bson:"Message,omitempty"`
}
type RowTest2 struct {
Id uint64 `bson:"_id,omitempty"`
Name string `bson:"Name,omitempty"`
MapTest map[int]int `bson:"MapTest,omitempty"`
Message string `bson:"Message,omitempty"`
}
// write the test data
var findStartIndex uint64
var topicData []TopicData
for i := 1; i <= 1000; i++ {
var rowTest RowTest
rowTest.Name = fmt.Sprintf("Name_%d", i)
rowTest.MapTest = make(map[int]int, 1)
rowTest.MapTest[i] = i*1000 + i
rowTest.Message = fmt.Sprintf("xxxxxxxxxxxxxxxxxx%d", i)
byteRet, _ := bson.Marshal(rowTest)
var dataSeq uint64
if i <= 500 {
dataSeq = NextSeq(-1)
} else {
dataSeq = NextSeq(0)
}
topicData = append(topicData, TopicData{RawData: byteRet, Seq: dataSeq})
if i == 1 {
findStartIndex = topicData[0].Seq
}
}
mongoPersist.OnReceiveTopicData("TestTopic", topicData)
for {
if len(topicData) == 0 {
break
}
topicData, ret = mongoPersist.PersistTopicData("TestTopic", topicData, 1)
fmt.Println(ret)
}
//
for {
retTopicData := mongoPersist.FindTopicData("TestTopic", findStartIndex, 300)
for i, data := range retTopicData {
var rowTest RowTest2
bson.Unmarshal(data.RawData, &rowTest)
t.Log(rowTest.Name)
if i == len(retTopicData)-1 {
findStartIndex = mongoPersist.GetIndex(&data)
}
}
t.Log("..................")
if len(retTopicData) == 0 {
break
}
}
//t.Log(mongoPersist.GetIndex(&retTopicData[0]))
//t.Log(mongoPersist.GetIndex(&retTopicData[len(retTopicData)-1]))
}

View File

@@ -0,0 +1,91 @@
package messagequeueservice
import (
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/rpc"
"sync"
)
// Subscriber manages the customer subscriptions of a topic
type Subscriber struct {
customerLocker sync.RWMutex
mapCustomer map[string]*CustomerSubscriber
queue MemoryQueue
dataPersist QueueDataPersist // queue data persistence handler
queueWait *sync.WaitGroup
}
func (ss *Subscriber) Init(memoryQueueCap int32) {
ss.queue.Init(memoryQueueCap)
ss.mapCustomer = make(map[string]*CustomerSubscriber, 5)
}
func (ss *Subscriber) PushTopicDataToQueue(topic string, topics []TopicData) {
for i := 0; i < len(topics); i++ {
ss.queue.Push(&topics[i])
}
}
func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, bool) {
return ss.dataPersist.PersistTopicData(topic, topics, retryCount)
}
func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, subscribeMethod SubscribeMethod, fromNodeId int, callBackRpcMethod string, topic string, customerId string, StartIndex uint64, oneBatchQuantity int32) error {
// unsubscribe request
if subScribeType == rpc.SubscribeType_Unsubscribe {
ss.UnSubscribe(customerId)
return nil
} else {
ss.customerLocker.Lock()
customerSubscriber, ok := ss.mapCustomer[customerId]
if ok == true {
// already subscribed: cancel the existing subscription first
customerSubscriber.UnSubscribe()
delete(ss.mapCustomer, customerId)
}
// create the new subscription
customerSubscriber = &CustomerSubscriber{}
ss.mapCustomer[customerId] = customerSubscriber
ss.customerLocker.Unlock()
err := customerSubscriber.Subscribe(rpcHandler, ss, topic, subscribeMethod, customerId, fromNodeId, callBackRpcMethod, StartIndex, oneBatchQuantity)
if err != nil {
return err
}
if ok == true {
log.SRelease("repeat subscription for customer ", customerId)
} else {
log.SRelease("subscription for customer ", customerId)
}
}
return nil
}
func (ss *Subscriber) UnSubscribe(customerId string) {
ss.customerLocker.RLock()
defer ss.customerLocker.RUnlock()
customerSubscriber, ok := ss.mapCustomer[customerId]
if ok == false {
log.SWarning("failed to unsubscribe customer " + customerId)
return
}
customerSubscriber.UnSubscribe()
}
func (ss *Subscriber) removeCustomer(customerId string, cs *CustomerSubscriber) {
ss.customerLocker.Lock()
// only delete the current relationship; the customer may already have been replaced by a newer subscription
customer := ss.mapCustomer[customerId]
if customer == cs {
delete(ss.mapCustomer, customerId)
}
ss.customerLocker.Unlock()
}

View File

@@ -0,0 +1,146 @@
package messagequeueservice
import (
"errors"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/util/coroutine"
"sync"
"sync/atomic"
"time"
)
type TopicData struct {
Seq uint64 // sequence number
RawData []byte // raw data
ExtendParam interface{} // extension parameter
}
func (t TopicData) GetValue() uint64 {
return t.Seq
}
var topicFullError = errors.New("topic room is full")
const DefaultOnceProcessTopicDataNum = 1024 // number of topic items processed per pass, sized for batch persistence
const DefaultMaxTopicBacklogNum = 100000 // maximum capacity of the processing channel
const DefaultMemoryQueueLen = 50000 // maximum length of the in-memory queue
const maxTryPersistNum = 3000 // maximum persistence retries, roughly > 5 minutes
type TopicRoom struct {
topic string // topic name
channelTopic chan TopicData // pending data pushed to this topic
Subscriber // subscriber
// sequence generation
seq uint32
lastTime int64
//onceProcessTopicDataNum int //一次处理的订阅数据最大量方便订阅器Subscriber和QueueDataProcessor批量处理
StagingBuff []TopicData
isStop int32
}
// maxTopicBacklogNum: maximum backlog per topic
func (tr *TopicRoom) Init(maxTopicBacklogNum int32, memoryQueueLen int32, topic string, queueWait *sync.WaitGroup, dataPersist QueueDataPersist) {
if maxTopicBacklogNum == 0 {
maxTopicBacklogNum = DefaultMaxTopicBacklogNum
}
tr.channelTopic = make(chan TopicData, maxTopicBacklogNum)
tr.topic = topic
tr.dataPersist = dataPersist
tr.queueWait = queueWait
tr.StagingBuff = make([]TopicData, DefaultOnceProcessTopicDataNum)
tr.queueWait.Add(1)
tr.Subscriber.Init(memoryQueueLen)
coroutine.GoRecover(tr.topicRoomRun, -1)
}
func (tr *TopicRoom) Publish(data [][]byte) error {
if len(tr.channelTopic)+len(data) > cap(tr.channelTopic) {
return topicFullError
}
// generate ordered sequence numbers
for _, rawData := range data {
tr.channelTopic <- TopicData{RawData: rawData, Seq: tr.NextSeq()}
}
return nil
}
func (tr *TopicRoom) NextSeq() uint64 {
now := time.Now()
nowSec := now.Unix()
if nowSec != tr.lastTime {
tr.seq = 0
tr.lastTime = nowSec
}
// must start from 1 so that seq > 0 when querying
tr.seq += 1
return uint64(nowSec)<<32 | uint64(tr.seq)
}
func (tr *TopicRoom) Stop() {
atomic.StoreInt32(&tr.isStop, 1)
}
func (tr *TopicRoom) topicRoomRun() {
defer tr.queueWait.Done()
log.SRelease("topic room ", tr.topic, " is running..")
for {
if atomic.LoadInt32(&tr.isStop) != 0 {
break
}
stagingBuff := tr.StagingBuff[:0]
for i := 0; i < len(tr.channelTopic) && i < DefaultOnceProcessTopicDataNum; i++ {
topicData := <-tr.channelTopic
stagingBuff = append(stagingBuff, topicData)
}
tr.Subscriber.dataPersist.OnReceiveTopicData(tr.topic, stagingBuff)
// persist, then place into memory
if len(stagingBuff) == 0 {
time.Sleep(time.Millisecond * 50)
continue
}
// if persistence fails, retry up to maxTryPersistNum times
var ret bool
for j := 0; j < maxTryPersistNum; {
// persist
stagingBuff, ret = tr.PersistTopicData(tr.topic, stagingBuff, j+1)
// if the batch was persisted and more remains, keep persisting
if ret == true && len(stagingBuff) > 0 {
// follow-up batches do not count against the retry limit
continue
}
// count one attempt; on failure wait 100ms before retrying
j += 1
if ret == false {
time.Sleep(time.Millisecond * 100)
continue
}
tr.PushTopicDataToQueue(tr.topic, stagingBuff)
break
}
}
// cancel all subscriptions
tr.customerLocker.Lock()
for _, customer := range tr.mapCustomer {
customer.UnSubscribe()
}
tr.customerLocker.Unlock()
log.SRelease("topic room ", tr.topic, " is stop")
}
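For completeness, a consumer-side sketch follows. It is not part of the commit: the service name, topic, customer id and the exact "Service.Method" call paths are assumptions; only the message types, the CallNode signature and the ServiceName.MethodName callback format come from the code above.

package consumer

import (
    "github.com/duanhf2012/origin/rpc"
    "github.com/duanhf2012/origin/service"
)

// TestCustomerService is a hypothetical consumer of the queue service.
type TestCustomerService struct {
    service.Service
}

// RPC_OnTopicData is the callback named in DBQueueSubscribeReq.RpcMethod; the
// queue service delivers each batch as a DBQueuePublishReq (see publishToCustomer).
func (s *TestCustomerService) RPC_OnTopicData(req *rpc.DBQueuePublishReq, res *rpc.DBQueuePublishRes) error {
    // req.PushData holds one raw document per message.
    return nil
}

// subscribe asks the queue service on queueNodeId to start pushing TestTopic data.
// The method path strings follow the pattern used elsewhere in this commit and
// may need adjusting to the actual node/service configuration.
func (s *TestCustomerService) subscribe(queueNodeId int, localNodeId int32) error {
    req := &rpc.DBQueueSubscribeReq{
        SubType:          rpc.SubscribeType_Subscribe,
        Method:           rpc.SubscribeMethod_Method_Last, // resume from the last persisted index
        CustomerId:       "TestCustomer",
        FromNodeId:       localNodeId,
        RpcMethod:        "TestCustomerService.RPC_OnTopicData",
        TopicName:        "TestTopic",
        OneBatchQuantity: 1000,
    }
    var res rpc.DBQueueSubscribeRes
    return s.GetRpcHandler().CallNode(queueNodeId, "MessageQueueService.RPC_Subscribe", req, &res)
}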