From d28094eefacc77d430f97440e16819afd402cc40 Mon Sep 17 00:00:00 2001 From: orgin Date: Wed, 16 Nov 2022 11:19:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B6=88=E6=81=AF=E9=98=9F?= =?UTF-8?q?=E5=88=97=E5=8C=96=E7=9A=84=E8=B7=A8=E5=A4=A9=E7=9A=84=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BB=B6=E8=BF=9F=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../messagequeueservice/MongoPersist.go | 50 +++++++++++++++++-- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/sysservice/messagequeueservice/MongoPersist.go b/sysservice/messagequeueservice/MongoPersist.go index 25914af..93f0419 100644 --- a/sysservice/messagequeueservice/MongoPersist.go +++ b/sysservice/messagequeueservice/MongoPersist.go @@ -182,8 +182,7 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re // FindTopicData 查找数据 func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64) ([]TopicData, bool) { s := mp.mongo.TakeSession() - ctx, cancel := s.GetDefaultContext() - defer cancel() + condition := bson.D{{Key: "_id", Value: bson.D{{Key: "$gt", Value: startIndex}}}} @@ -192,6 +191,8 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int var findOptions []*options.FindOptions findOptions = append(findOptions, &findOption) + ctx, cancel := s.GetDefaultContext() + defer cancel() collectName := fmt.Sprintf("%s_%s", topic, mp.GetDateByIndex(startIndex)) cursor, err := s.Collection(mp.dbName, collectName).Find(ctx, condition, findOptions...) if err != nil || cursor.Err() != nil { @@ -237,13 +238,51 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int return topicBuff, true } +func (mp *MongoPersist) IsYesterday(startIndex uint64) (bool,string){ + timeStamp := int64(startIndex>>32) + + startTime := time.Unix(timeStamp, 0).AddDate(0,0,1) + nowTm := time.Now() + + return startTime.Year() == nowTm.Year() && startTime.Month() == nowTm.Month()&&startTime.Day() == nowTm.Day(),nowTm.Format("20060102") +} + +func (mp *MongoPersist) getCollectCount(topic string,today string) (int64 ,error){ + s := mp.mongo.TakeSession() + ctx, cancel := s.GetDefaultContext() + defer cancel() + collectName := fmt.Sprintf("%s_%s", topic, today) + count, err := s.Collection(mp.dbName, collectName).EstimatedDocumentCount(ctx) + return count,err +} + // FindTopicData 查找数据 func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64) []TopicData { //某表找不到,一直往前找,找到当前置为止 for days := 1; days <= MaxDays; days++ { + //是否可以跳天 + //在换天时,如果记录在其他协程还没insert完成时,因为没查到直接跳到第二天,导致漏掉数据 + //解决的方法是在换天时,先判断新换的当天有没有记录,有记录时,说明昨天的数据已经插入完成,才可以跳天查询 + IsJumpDays := true + + //如果是昨天,先判断当天有没有表数据 + bYesterday,strToday := mp.IsYesterday(startIndex) + if bYesterday { + count,err := mp.getCollectCount(topic,strToday) + if err != nil { + //失败时,重新开始 + log.SError("getCollectCount ",topic,"_",strToday," is fail:",err.Error()) + return nil + } + //当天没有记录,则不能跳表,有可能当天还有数据 + if count == 0 { + IsJumpDays = false + } + } + //从startIndex开始一直往后查 topicData, isSucc := mp.findTopicData(topic, startIndex, limit) - //有数据或者出错时,返回 + //有数据或者数据库出错时返回,返回后,会进行下一轮的查询遍历 if len(topicData) > 0 || isSucc == false { return topicData } @@ -253,6 +292,11 @@ func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int break } + //不允许跳天,则直接跳出 + if IsJumpDays == false { + break + } + startIndex = mp.GetNextIndex(startIndex, days) }