优化消息队列服务

This commit is contained in:
orgin
2022-11-26 16:15:41 +08:00
parent 11b78f84c4
commit 1fcd870f1d
6 changed files with 47 additions and 40 deletions

View File

@@ -156,7 +156,7 @@ func (cs *CustomerSubscriber) SubscribeRun() {
func (cs *CustomerSubscriber) subscribe() bool {
//先从内存中查找
topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex, cs.oneBatchQuantity)
topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex+1, cs.oneBatchQuantity)
if ret == true {
cs.publishToCustomer(topicData)
return true

View File

@@ -49,13 +49,22 @@ func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32)
if findStartPos <= mq.tail {
findEndPos = mq.tail + 1
} else {
findEndPos = int32(cap(mq.topicQueue))
findEndPos = int32(len(mq.topicQueue))
}
if findStartPos >= findEndPos {
return nil, false
}
// 要取的Seq 比内存中最小的数据的Seq还小那么需要返回错误
if mq.topicQueue[findStartPos].Seq > startIndex {
return nil, false
}
//二分查找位置
pos := int32(algorithms.BiSearch(mq.topicQueue[findStartPos:findEndPos], startIndex, 1))
if pos == -1 {
return nil, true
return nil, false
}
pos += findStartPos
@@ -76,22 +85,17 @@ func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bo
//队列为空时,应该从数据库查找
if mq.head == mq.tail {
return nil, false
}
/*
//先判断startIndex是否比第一个元素要大
headTopic := (mq.head + 1) % int32(len(mq.topicQueue))
//此时需要从持久化数据中取
if startIndex+1 > mq.topicQueue[headTopic].Seq {
return nil, false
} else if mq.head < mq.tail {
// 队列没有折叠
return mq.findData(mq.head + 1, startIndex, limit)
} else {
// 折叠先找后面的部分
datas,ret := mq.findData(mq.head+1, startIndex, limit)
if ret {
return datas, ret
}
*/
retData, ret := mq.findData(mq.head+1, startIndex, limit)
if mq.head <= mq.tail || ret == true {
return retData, true
// 后面没找到,从前面开始找
return mq.findData(0, startIndex, limit)
}
//如果是正常head在后尾在前从数组0下标开始找到tail
return mq.findData(0, startIndex, limit)
}

View File

@@ -15,7 +15,7 @@ type QueueDataPersist interface {
OnExit()
OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时
OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调
PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) //持久化数据,失败则返回false上层会重复尝试直到成功建议在函数中加入次数超过次数则返回true
PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) //持久化数据,失败则返回false上层会重复尝试直到成功建议在函数中加入次数超过次数则返回true
FindTopicData(topic string, startIndex uint64, limit int64) []TopicData //查找数据,参数bool代表数据库查找是否成功
LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败一般是读取错误会进行重试。如果不存在时返回(0,true)
GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号

View File

@@ -157,9 +157,9 @@ func (mp *MongoPersist) IsSameDay(timestamp1 int64,timestamp2 int64) bool{
}
// PersistTopicData 持久化数据
func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) {
func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) {
if len(topicData) == 0 {
return nil, true
return nil, nil,true
}
preDate := topicData[0].Seq >> 32
@@ -176,11 +176,11 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re
ret := mp.persistTopicData(collectName, topicData[:findPos], retryCount)
//如果失败,下次重试
if ret == false {
return nil, false
return nil, nil, false
}
//如果成功
return topicData[findPos:len(topicData)], true
return topicData[findPos:len(topicData)], topicData[0:findPos], true
}
// FindTopicData 查找数据

View File

@@ -27,7 +27,7 @@ func (ss *Subscriber) PushTopicDataToQueue(topic string, topics []TopicData) {
}
}
func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, bool) {
func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, []TopicData, bool) {
return ss.dataPersist.PersistTopicData(topic, topics, retryCount)
}

View File

@@ -113,25 +113,28 @@ func (tr *TopicRoom) topicRoomRun() {
}
//如果落地失败最大重试maxTryPersistNum次数
var ret bool
for j := 0; j < maxTryPersistNum; {
for retryCount := 0; retryCount < maxTryPersistNum; {
//持久化处理
stagingBuff, ret = tr.PersistTopicData(tr.topic, stagingBuff, j+1)
//如果存档成功,并且有后续批次,则继续存档
if ret == true && len(stagingBuff) > 0 {
//二次存档不计次数
continue
}
//计数增加一次并且等待100ms继续重试
j += 1
if ret == false {
stagingBuff, savedBuff, ret := tr.PersistTopicData(tr.topic, stagingBuff, retryCount+1)
if ret == true {
// 1. 把成功存储的数据放入内存中
if len(savedBuff) > 0 {
tr.PushTopicDataToQueue(tr.topic, savedBuff)
}
// 2. 如果存档成功,并且有后续批次,则继续存档
if ret == true && len(stagingBuff) > 0 {
continue
}
// 3. 成功了,跳出
break
} else {
//计数增加一次并且等待100ms继续重试
retryCount++
time.Sleep(time.Millisecond * 100)
continue
}
tr.PushTopicDataToQueue(tr.topic, stagingBuff)
break
}
}