mirror of
https://github.com/duanhf2012/origin.git
synced 2026-03-06 14:17:31 +08:00
优化消息队列化的跨天的数据延迟问题
This commit is contained in:
@@ -182,8 +182,7 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re
|
|||||||
// FindTopicData 查找数据
|
// FindTopicData 查找数据
|
||||||
func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64) ([]TopicData, bool) {
|
func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64) ([]TopicData, bool) {
|
||||||
s := mp.mongo.TakeSession()
|
s := mp.mongo.TakeSession()
|
||||||
ctx, cancel := s.GetDefaultContext()
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
condition := bson.D{{Key: "_id", Value: bson.D{{Key: "$gt", Value: startIndex}}}}
|
condition := bson.D{{Key: "_id", Value: bson.D{{Key: "$gt", Value: startIndex}}}}
|
||||||
|
|
||||||
@@ -192,6 +191,8 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
|
|||||||
var findOptions []*options.FindOptions
|
var findOptions []*options.FindOptions
|
||||||
findOptions = append(findOptions, &findOption)
|
findOptions = append(findOptions, &findOption)
|
||||||
|
|
||||||
|
ctx, cancel := s.GetDefaultContext()
|
||||||
|
defer cancel()
|
||||||
collectName := fmt.Sprintf("%s_%s", topic, mp.GetDateByIndex(startIndex))
|
collectName := fmt.Sprintf("%s_%s", topic, mp.GetDateByIndex(startIndex))
|
||||||
cursor, err := s.Collection(mp.dbName, collectName).Find(ctx, condition, findOptions...)
|
cursor, err := s.Collection(mp.dbName, collectName).Find(ctx, condition, findOptions...)
|
||||||
if err != nil || cursor.Err() != nil {
|
if err != nil || cursor.Err() != nil {
|
||||||
@@ -237,13 +238,51 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
|
|||||||
return topicBuff, true
|
return topicBuff, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mp *MongoPersist) IsYesterday(startIndex uint64) (bool,string){
|
||||||
|
timeStamp := int64(startIndex>>32)
|
||||||
|
|
||||||
|
startTime := time.Unix(timeStamp, 0).AddDate(0,0,1)
|
||||||
|
nowTm := time.Now()
|
||||||
|
|
||||||
|
return startTime.Year() == nowTm.Year() && startTime.Month() == nowTm.Month()&&startTime.Day() == nowTm.Day(),nowTm.Format("20060102")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MongoPersist) getCollectCount(topic string,today string) (int64 ,error){
|
||||||
|
s := mp.mongo.TakeSession()
|
||||||
|
ctx, cancel := s.GetDefaultContext()
|
||||||
|
defer cancel()
|
||||||
|
collectName := fmt.Sprintf("%s_%s", topic, today)
|
||||||
|
count, err := s.Collection(mp.dbName, collectName).EstimatedDocumentCount(ctx)
|
||||||
|
return count,err
|
||||||
|
}
|
||||||
|
|
||||||
// FindTopicData 查找数据
|
// FindTopicData 查找数据
|
||||||
func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64) []TopicData {
|
func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64) []TopicData {
|
||||||
//某表找不到,一直往前找,找到当前置为止
|
//某表找不到,一直往前找,找到当前置为止
|
||||||
for days := 1; days <= MaxDays; days++ {
|
for days := 1; days <= MaxDays; days++ {
|
||||||
|
//是否可以跳天
|
||||||
|
//在换天时,如果记录在其他协程还没insert完成时,因为没查到直接跳到第二天,导致漏掉数据
|
||||||
|
//解决的方法是在换天时,先判断新换的当天有没有记录,有记录时,说明昨天的数据已经插入完成,才可以跳天查询
|
||||||
|
IsJumpDays := true
|
||||||
|
|
||||||
|
//如果是昨天,先判断当天有没有表数据
|
||||||
|
bYesterday,strToday := mp.IsYesterday(startIndex)
|
||||||
|
if bYesterday {
|
||||||
|
count,err := mp.getCollectCount(topic,strToday)
|
||||||
|
if err != nil {
|
||||||
|
//失败时,重新开始
|
||||||
|
log.SError("getCollectCount ",topic,"_",strToday," is fail:",err.Error())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
//当天没有记录,则不能跳表,有可能当天还有数据
|
||||||
|
if count == 0 {
|
||||||
|
IsJumpDays = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//从startIndex开始一直往后查
|
//从startIndex开始一直往后查
|
||||||
topicData, isSucc := mp.findTopicData(topic, startIndex, limit)
|
topicData, isSucc := mp.findTopicData(topic, startIndex, limit)
|
||||||
//有数据或者出错时,返回
|
//有数据或者数据库出错时返回,返回后,会进行下一轮的查询遍历
|
||||||
if len(topicData) > 0 || isSucc == false {
|
if len(topicData) > 0 || isSucc == false {
|
||||||
return topicData
|
return topicData
|
||||||
}
|
}
|
||||||
@@ -253,6 +292,11 @@ func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//不允许跳天,则直接跳出
|
||||||
|
if IsJumpDays == false {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
startIndex = mp.GetNextIndex(startIndex, days)
|
startIndex = mp.GetNextIndex(startIndex, days)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user