diff --git a/sysservice/messagequeueservice/CustomerSubscriber.go b/sysservice/messagequeueservice/CustomerSubscriber.go index 31f18ba..f7a4deb 100644 --- a/sysservice/messagequeueservice/CustomerSubscriber.go +++ b/sysservice/messagequeueservice/CustomerSubscriber.go @@ -156,7 +156,7 @@ func (cs *CustomerSubscriber) SubscribeRun() { func (cs *CustomerSubscriber) subscribe() bool { //先从内存中查找 - topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex, cs.oneBatchQuantity) + topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex+1, cs.oneBatchQuantity) if ret == true { cs.publishToCustomer(topicData) return true diff --git a/sysservice/messagequeueservice/MemoryQueue.go b/sysservice/messagequeueservice/MemoryQueue.go index 676f8c9..e533489 100644 --- a/sysservice/messagequeueservice/MemoryQueue.go +++ b/sysservice/messagequeueservice/MemoryQueue.go @@ -49,13 +49,22 @@ func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32) if findStartPos <= mq.tail { findEndPos = mq.tail + 1 } else { - findEndPos = int32(cap(mq.topicQueue)) + findEndPos = int32(len(mq.topicQueue)) + } + + if findStartPos >= findEndPos { + return nil, false + } + + // 要取的Seq 比内存中最小的数据的Seq还小,那么需要返回错误 + if mq.topicQueue[findStartPos].Seq > startIndex { + return nil, false } //二分查找位置 pos := int32(algorithms.BiSearch(mq.topicQueue[findStartPos:findEndPos], startIndex, 1)) if pos == -1 { - return nil, true + return nil, false } pos += findStartPos @@ -76,22 +85,17 @@ func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bo //队列为空时,应该从数据库查找 if mq.head == mq.tail { return nil, false - } - - /* - //先判断startIndex是否比第一个元素要大 - headTopic := (mq.head + 1) % int32(len(mq.topicQueue)) - //此时需要从持久化数据中取 - if startIndex+1 > mq.topicQueue[headTopic].Seq { - return nil, false + } else if mq.head < mq.tail { + // 队列没有折叠 + return mq.findData(mq.head + 1, startIndex, limit) + } else { + // 折叠先找后面的部分 + datas,ret := mq.findData(mq.head+1, startIndex, limit) + if ret { + return datas, ret } - */ - retData, ret := mq.findData(mq.head+1, startIndex, limit) - if mq.head <= mq.tail || ret == true { - return retData, true + // 后面没找到,从前面开始找 + return mq.findData(0, startIndex, limit) } - - //如果是正常head在后,尾在前,从数组0下标开始找到tail - return mq.findData(0, startIndex, limit) } diff --git a/sysservice/messagequeueservice/MessageQueueService.go b/sysservice/messagequeueservice/MessageQueueService.go index 7f93822..2a694cf 100644 --- a/sysservice/messagequeueservice/MessageQueueService.go +++ b/sysservice/messagequeueservice/MessageQueueService.go @@ -15,7 +15,7 @@ type QueueDataPersist interface { OnExit() OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时 OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调 - PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) //持久化数据,失败则返回false,上层会重复尝试,直到成功,建议在函数中加入次数,超过次数则返回true + PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) //持久化数据,失败则返回false,上层会重复尝试,直到成功,建议在函数中加入次数,超过次数则返回true FindTopicData(topic string, startIndex uint64, limit int64) []TopicData //查找数据,参数bool代表数据库查找是否成功 LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true) GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号 diff --git a/sysservice/messagequeueservice/MongoPersist.go b/sysservice/messagequeueservice/MongoPersist.go index f2f5933..0b33c30 100644 --- a/sysservice/messagequeueservice/MongoPersist.go +++ b/sysservice/messagequeueservice/MongoPersist.go @@ -157,9 +157,9 @@ func (mp *MongoPersist) IsSameDay(timestamp1 int64,timestamp2 int64) bool{ } // PersistTopicData 持久化数据 -func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) { +func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) { if len(topicData) == 0 { - return nil, true + return nil, nil,true } preDate := topicData[0].Seq >> 32 @@ -176,11 +176,11 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re ret := mp.persistTopicData(collectName, topicData[:findPos], retryCount) //如果失败,下次重试 if ret == false { - return nil, false + return nil, nil, false } //如果成功 - return topicData[findPos:len(topicData)], true + return topicData[findPos:len(topicData)], topicData[0:findPos], true } // FindTopicData 查找数据 diff --git a/sysservice/messagequeueservice/Subscriber.go b/sysservice/messagequeueservice/Subscriber.go index d394c23..6dcc0a4 100644 --- a/sysservice/messagequeueservice/Subscriber.go +++ b/sysservice/messagequeueservice/Subscriber.go @@ -27,7 +27,7 @@ func (ss *Subscriber) PushTopicDataToQueue(topic string, topics []TopicData) { } } -func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, bool) { +func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, []TopicData, bool) { return ss.dataPersist.PersistTopicData(topic, topics, retryCount) } diff --git a/sysservice/messagequeueservice/TopicRoom.go b/sysservice/messagequeueservice/TopicRoom.go index 918380d..3975927 100644 --- a/sysservice/messagequeueservice/TopicRoom.go +++ b/sysservice/messagequeueservice/TopicRoom.go @@ -113,25 +113,28 @@ func (tr *TopicRoom) topicRoomRun() { } //如果落地失败,最大重试maxTryPersistNum次数 - var ret bool - for j := 0; j < maxTryPersistNum; { + for retryCount := 0; retryCount < maxTryPersistNum; { //持久化处理 - stagingBuff, ret = tr.PersistTopicData(tr.topic, stagingBuff, j+1) - //如果存档成功,并且有后续批次,则继续存档 - if ret == true && len(stagingBuff) > 0 { - //二次存档不计次数 - continue - } - - //计数增加一次,并且等待100ms,继续重试 - j += 1 - if ret == false { + stagingBuff, savedBuff, ret := tr.PersistTopicData(tr.topic, stagingBuff, retryCount+1) + + if ret == true { + // 1. 把成功存储的数据放入内存中 + if len(savedBuff) > 0 { + tr.PushTopicDataToQueue(tr.topic, savedBuff) + } + + // 2. 如果存档成功,并且有后续批次,则继续存档 + if ret == true && len(stagingBuff) > 0 { + continue + } + + // 3. 成功了,跳出 + break + } else { + //计数增加一次,并且等待100ms,继续重试 + retryCount++ time.Sleep(time.Millisecond * 100) - continue } - - tr.PushTopicDataToQueue(tr.topic, stagingBuff) - break } }