package messagequeueservice

import (
	"errors"
	"github.com/duanhf2012/origin/v2/log"
	"github.com/duanhf2012/origin/v2/util/coroutine"
	"sync"
	"sync/atomic"
	"time"
)

type TopicData struct {
	Seq     uint64 // sequence number
	RawData []byte // raw payload

	ExtendParam interface{} // extension parameter
}

func (t TopicData) GetValue() uint64 {
	return t.Seq
}

var topicFullError = errors.New("topic room is full")

const DefaultOnceProcessTopicDataNum = 1024 // number of topic messages handled per pass, sized with batch persistence in mind
const DefaultMaxTopicBacklogNum = 100000    // maximum number of pending items in the topic channel
const DefaultMemoryQueueLen = 50000         // maximum length of the in-memory queue
const maxTryPersistNum = 3000               // maximum persistence retries, roughly 5 minutes or more in total
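
// Note: topicRoomRun sleeps 100ms after every failed persistence attempt, so
// maxTryPersistNum bounds the retry phase of one batch at roughly
// 3000 * 100ms = 300s, i.e. about 5 minutes plus whatever time the persistence
// calls themselves take.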

type TopicRoom struct {
	topic        string         // topic name
	channelTopic chan TopicData // data pushed to this topic, waiting to be processed

	Subscriber // subscriber

	// sequence number generation
	seq      uint32
	lastTime int64

	//onceProcessTopicDataNum int // maximum amount of subscription data handled per pass, so Subscriber and QueueDataProcessor can batch their work
	StagingBuff []TopicData

	isStop int32
}

// Init maxTopicBacklogNum: maximum backlog allowed for the topic (0 falls back to DefaultMaxTopicBacklogNum)
func (tr *TopicRoom) Init(maxTopicBacklogNum int32, memoryQueueLen int32, topic string, queueWait *sync.WaitGroup, dataPersist QueueDataPersist) {
	if maxTopicBacklogNum == 0 {
		maxTopicBacklogNum = DefaultMaxTopicBacklogNum
	}

	tr.channelTopic = make(chan TopicData, maxTopicBacklogNum)
	tr.topic = topic
	tr.dataPersist = dataPersist
	tr.queueWait = queueWait
	tr.StagingBuff = make([]TopicData, DefaultOnceProcessTopicDataNum)
	tr.queueWait.Add(1)
	tr.Subscriber.Init(memoryQueueLen)
	coroutine.GoRecover(tr.topicRoomRun, -1)
}
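
// A minimal usage sketch (illustration only: the topic name, the queueWait group and
// the persist value are hypothetical; persist must be some concrete QueueDataPersist
// implementation supplied by the caller):
//
//	var queueWait sync.WaitGroup
//	var room TopicRoom
//	room.Init(DefaultMaxTopicBacklogNum, DefaultMemoryQueueLen, "orders", &queueWait, persist)
//	_ = room.Publish([][]byte{[]byte(`{"id":1}`)})
//	room.Stop()
//	queueWait.Wait()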

func (tr *TopicRoom) Publish(data [][]byte) error {
	if len(tr.channelTopic)+len(data) > cap(tr.channelTopic) {
		return topicFullError
	}

	// generate ordered sequence numbers
	for _, rawData := range data {
		tr.channelTopic <- TopicData{RawData: rawData, Seq: tr.NextSeq()}
	}

	return nil
}
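
// Note: the backlog check in Publish compares a snapshot of len(channelTopic) with the
// channel capacity. With several goroutines publishing to the same room, the check can
// pass for all of them and the later sends simply block until topicRoomRun drains the
// channel rather than returning topicFullError; seq and lastTime are also plain fields,
// so the sequence generation appears to assume a single publishing goroutine per room.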

// NextSeq builds a 64-bit sequence number: the Unix-second timestamp in the high
// 32 bits and a per-second counter in the low 32 bits.
func (tr *TopicRoom) NextSeq() uint64 {
	now := time.Now()

	nowSec := now.Unix()
	if nowSec != tr.lastTime {
		tr.seq = 0
		tr.lastTime = nowSec
	}
	// must start from 1: queries rely on seq > 0
	tr.seq += 1

	return uint64(nowSec)<<32 | uint64(tr.seq)
}
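
// For example, the third message published during Unix second 1700000000 gets
//
//	uint64(1700000000)<<32 | 3 // = 7301444403200000003
//
// and the timestamp and counter can be recovered with value>>32 and value&0xFFFFFFFF.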

func (tr *TopicRoom) Stop() {
	atomic.StoreInt32(&tr.isStop, 1)
}
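
// Note: Stop only raises the flag. topicRoomRun notices it at the top of its next
// iteration, cancels every subscription and then releases queueWait, so callers that
// need the room to be fully shut down should wait on the shared WaitGroup afterwards.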

func (tr *TopicRoom) topicRoomRun() {
	defer tr.queueWait.Done()

	log.SInfo("topic room ", tr.topic, " is running..")
	for {
		if atomic.LoadInt32(&tr.isStop) != 0 {
			break
		}
		stagingBuff := tr.StagingBuff[:0]

		for i := 0; i < len(tr.channelTopic) && i < DefaultOnceProcessTopicDataNum; i++ {
			topicData := <-tr.channelTopic

			stagingBuff = append(stagingBuff, topicData)
		}
		tr.Subscriber.dataPersist.OnReceiveTopicData(tr.topic, stagingBuff)
		// persist the batch and hand it over to the in-memory queue
		if len(stagingBuff) == 0 {
			time.Sleep(time.Millisecond * 50)
			continue
		}

		// if persistence fails, retry at most maxTryPersistNum times
		for retryCount := 0; retryCount < maxTryPersistNum; {
			// persist the batch; PersistTopicData returns the data still to be saved,
			// the part that was saved, and whether the call succeeded
			remainBuff, savedBuff, ret := tr.PersistTopicData(tr.topic, stagingBuff, retryCount+1)

			if ret {
				// 1. push the successfully stored data into the in-memory queue
				if len(savedBuff) > 0 {
					tr.PushTopicDataToQueue(tr.topic, savedBuff)
				}

				// 2. the batch was saved; if data remains, keep persisting it
				if len(remainBuff) > 0 {
					stagingBuff = remainBuff
					continue
				}

				// 3. everything saved, leave the retry loop
				break
			}

			// count one failed attempt, wait 100ms and retry the same batch
			retryCount++
			time.Sleep(time.Millisecond * 100)
		}
	}

	// cancel all subscriptions
	tr.customerLocker.Lock()
	for _, customer := range tr.mapCustomer {
		customer.UnSubscribe()
	}
	tr.customerLocker.Unlock()

	log.SInfo("topic room ", tr.topic, " is stopped")
}