From 3833884777792e7e821bea049e600aa3deeeaaf3 Mon Sep 17 00:00:00 2001 From: origin Date: Thu, 29 Dec 2022 14:54:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B6=88=E6=81=AF=E9=98=9F?= =?UTF-8?q?=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../messagequeueservice/CustomerSubscriber.go | 4 ++-- .../messagequeueservice/MessageQueueService.go | 2 +- sysservice/messagequeueservice/MongoPersist.go | 17 +++-------------- 3 files changed, 6 insertions(+), 17 deletions(-) diff --git a/sysservice/messagequeueservice/CustomerSubscriber.go b/sysservice/messagequeueservice/CustomerSubscriber.go index 2ffbffc..4a45aa1 100644 --- a/sysservice/messagequeueservice/CustomerSubscriber.go +++ b/sysservice/messagequeueservice/CustomerSubscriber.go @@ -163,9 +163,9 @@ func (cs *CustomerSubscriber) subscribe() bool { cs.publishToCustomer(topicData) return true } - + //从持久化数据中来找 - topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity)) + topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity),cs.topicCache[:0]) return cs.publishToCustomer(topicData) } diff --git a/sysservice/messagequeueservice/MessageQueueService.go b/sysservice/messagequeueservice/MessageQueueService.go index 2a694cf..77e6032 100644 --- a/sysservice/messagequeueservice/MessageQueueService.go +++ b/sysservice/messagequeueservice/MessageQueueService.go @@ -16,7 +16,7 @@ type QueueDataPersist interface { OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时 OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调 PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) //持久化数据,失败则返回false,上层会重复尝试,直到成功,建议在函数中加入次数,超过次数则返回true - FindTopicData(topic string, startIndex uint64, limit int64) []TopicData //查找数据,参数bool代表数据库查找是否成功 + FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData //查找数据,参数bool代表数据库查找是否成功 LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true) GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号 PersistIndex(topic string, customerId string, index uint64) //持久化进度索引号 diff --git a/sysservice/messagequeueservice/MongoPersist.go b/sysservice/messagequeueservice/MongoPersist.go index 062d578..c488f11 100644 --- a/sysservice/messagequeueservice/MongoPersist.go +++ b/sysservice/messagequeueservice/MongoPersist.go @@ -20,8 +20,6 @@ type MongoPersist struct { url string //连接url dbName string //数据库名称 retryCount int //落地数据库重试次数 - - topic []TopicData //用于临时缓存 } const CustomerCollectName = "SysCustomer" @@ -85,14 +83,6 @@ func (mp *MongoPersist) ReadCfg() error { return nil } -func (mp *MongoPersist) getTopicBuff(limit int) []TopicData { - if cap(mp.topic) < limit { - mp.topic = make([]TopicData, limit) - } - - return mp.topic[:0] -} - func (mp *MongoPersist) OnExit() { } @@ -184,7 +174,7 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re } // FindTopicData 查找数据 -func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64) ([]TopicData, bool) { +func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) ([]TopicData, bool) { s := mp.mongo.TakeSession() @@ -226,7 +216,6 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int } //序列化返回 - topicBuff := mp.getTopicBuff(int(limit)) for i := 0; i < len(res); i++ { rawData, errM := bson.Marshal(res[i]) if errM != nil { @@ -261,7 +250,7 @@ func (mp *MongoPersist) getCollectCount(topic string,today string) (int64 ,error } // FindTopicData 查找数据 -func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64) []TopicData { +func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) []TopicData { //某表找不到,一直往前找,找到当前置为止 for days := 1; days <= MaxDays; days++ { //是否可以跳天 @@ -285,7 +274,7 @@ func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int } //从startIndex开始一直往后查 - topicData, isSucc := mp.findTopicData(topic, startIndex, limit) + topicData, isSucc := mp.findTopicData(topic, startIndex, limit,topicBuff) //有数据或者数据库出错时返回,返回后,会进行下一轮的查询遍历 if len(topicData) > 0 || isSucc == false { return topicData