mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-14 15:54:42 +08:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8c8d681093 | ||
|
|
b8150cfc51 | ||
|
|
3833884777 | ||
|
|
66770f07a5 | ||
|
|
76c8541b34 | ||
|
|
b1fee9bc57 | ||
|
|
284d43dc71 | ||
|
|
fd43863b73 | ||
|
|
1fcd870f1d | ||
|
|
11b78f84c4 | ||
|
|
8c6ee24b16 | ||
|
|
ca23925796 |
@@ -265,11 +265,35 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
|
|||||||
return pCall
|
return pCall
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var iParam interface{}
|
||||||
|
|
||||||
|
|
||||||
if processor == nil {
|
if processor == nil {
|
||||||
_, processor = GetProcessorType(args)
|
_, processor = GetProcessorType(args)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if args != nil {
|
||||||
|
inParamValue := reflect.New(reflect.ValueOf(args).Type().Elem())
|
||||||
|
//args
|
||||||
|
//复制输入参数
|
||||||
|
iParam = inParamValue.Interface()
|
||||||
|
bytes,err := processor.Marshal(args)
|
||||||
|
if err == nil {
|
||||||
|
err = processor.Unmarshal(bytes,iParam)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
pCall.Seq = 0
|
||||||
|
pCall.Err = errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
|
||||||
|
pCall.done <- pCall
|
||||||
|
log.SError(pCall.Err.Error())
|
||||||
|
|
||||||
|
return pCall
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
req := MakeRpcRequest(processor, 0, rpcMethodId, serviceMethod, noReply, nil)
|
req := MakeRpcRequest(processor, 0, rpcMethodId, serviceMethod, noReply, nil)
|
||||||
req.inParam = args
|
req.inParam = iParam
|
||||||
req.localReply = reply
|
req.localReply = reply
|
||||||
if rawArgs != nil {
|
if rawArgs != nil {
|
||||||
var err error
|
var err error
|
||||||
@@ -335,8 +359,23 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
|
|||||||
}
|
}
|
||||||
|
|
||||||
_, processor := GetProcessorType(args)
|
_, processor := GetProcessorType(args)
|
||||||
|
inParamValue := reflect.New(reflect.ValueOf(args).Type().Elem())
|
||||||
|
//args
|
||||||
|
//复制输入参数
|
||||||
|
iParam := inParamValue.Interface()
|
||||||
|
bytes,err := processor.Marshal(args)
|
||||||
|
if err == nil {
|
||||||
|
err = processor.Unmarshal(bytes,iParam)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
|
||||||
|
log.SError(errM.Error())
|
||||||
|
return errM
|
||||||
|
}
|
||||||
|
|
||||||
req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil)
|
req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil)
|
||||||
req.inParam = args
|
req.inParam = iParam
|
||||||
req.localReply = reply
|
req.localReply = reply
|
||||||
|
|
||||||
if noReply == false {
|
if noReply == false {
|
||||||
@@ -370,7 +409,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := rpcHandler.PushRpcRequest(req)
|
err = rpcHandler.PushRpcRequest(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ReleaseRpcRequest(req)
|
ReleaseRpcRequest(req)
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -68,34 +68,39 @@ func (s *Session) NextSeq(db string, collection string, id interface{}) (int, er
|
|||||||
|
|
||||||
after := options.After
|
after := options.After
|
||||||
updateOpts := options.FindOneAndUpdateOptions{ReturnDocument: &after}
|
updateOpts := options.FindOneAndUpdateOptions{ReturnDocument: &after}
|
||||||
err := s.Client.Database(db).Collection(collection).FindOneAndUpdate(ctxTimeout, bson.M{"_id": id}, bson.M{"$inc": bson.M{"Seq": 1}},&updateOpts).Decode(&res)
|
err := s.Client.Database(db).Collection(collection).FindOneAndUpdate(ctxTimeout, bson.M{"_id": id}, bson.M{"$inc": bson.M{"Seq": 1}}, &updateOpts).Decode(&res)
|
||||||
return res.Seq, err
|
return res.Seq, err
|
||||||
}
|
}
|
||||||
|
|
||||||
//indexKeys[索引][每个索引key字段]
|
// indexKeys[索引][每个索引key字段]
|
||||||
func (s *Session) EnsureIndex(db string, collection string, indexKeys [][]string, bBackground bool,sparse bool) error {
|
func (s *Session) EnsureIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error {
|
||||||
return s.ensureIndex(db, collection, indexKeys, bBackground, false,sparse)
|
return s.ensureIndex(db, collection, indexKeys, bBackground, false, sparse, asc)
|
||||||
}
|
}
|
||||||
|
|
||||||
//indexKeys[索引][每个索引key字段]
|
// indexKeys[索引][每个索引key字段]
|
||||||
func (s *Session) EnsureUniqueIndex(db string, collection string, indexKeys [][]string, bBackground bool,sparse bool) error {
|
func (s *Session) EnsureUniqueIndex(db string, collection string, indexKeys [][]string, bBackground bool, sparse bool, asc bool) error {
|
||||||
return s.ensureIndex(db, collection, indexKeys, bBackground, true,sparse)
|
return s.ensureIndex(db, collection, indexKeys, bBackground, true, sparse, asc)
|
||||||
}
|
}
|
||||||
|
|
||||||
//keys[索引][每个索引key字段]
|
// keys[索引][每个索引key字段]
|
||||||
func (s *Session) ensureIndex(db string, collection string, indexKeys [][]string, bBackground bool, unique bool,sparse bool) error {
|
func (s *Session) ensureIndex(db string, collection string, indexKeys [][]string, bBackground bool, unique bool, sparse bool, asc bool) error {
|
||||||
var indexes []mongo.IndexModel
|
var indexes []mongo.IndexModel
|
||||||
for _, keys := range indexKeys {
|
for _, keys := range indexKeys {
|
||||||
keysDoc := bsonx.Doc{}
|
keysDoc := bsonx.Doc{}
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
keysDoc = keysDoc.Append(key, bsonx.Int32(1))
|
if asc {
|
||||||
|
keysDoc = keysDoc.Append(key, bsonx.Int32(1))
|
||||||
|
} else {
|
||||||
|
keysDoc = keysDoc.Append(key, bsonx.Int32(-1))
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
options:= options.Index().SetUnique(unique).SetBackground(bBackground)
|
options := options.Index().SetUnique(unique).SetBackground(bBackground)
|
||||||
if sparse == true {
|
if sparse == true {
|
||||||
options.SetSparse(true)
|
options.SetSparse(true)
|
||||||
}
|
}
|
||||||
indexes = append(indexes, mongo.IndexModel{Keys: keysDoc, Options:options })
|
indexes = append(indexes, mongo.IndexModel{Keys: keysDoc, Options: options})
|
||||||
}
|
}
|
||||||
|
|
||||||
ctxTimeout, cancel := context.WithTimeout(context.Background(), s.maxOperatorTimeOut)
|
ctxTimeout, cancel := context.WithTimeout(context.Background(), s.maxOperatorTimeOut)
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ type CustomerSubscriber struct {
|
|||||||
customerId string
|
customerId string
|
||||||
|
|
||||||
isStop int32 //退出标记
|
isStop int32 //退出标记
|
||||||
|
topicCache []TopicData // 从消息队列中取出来的消息的缓存
|
||||||
}
|
}
|
||||||
|
|
||||||
const DefaultOneBatchQuantity = 1000
|
const DefaultOneBatchQuantity = 1000
|
||||||
@@ -79,6 +80,7 @@ func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandle
|
|||||||
cs.StartIndex = uint64(zeroTime.Unix() << 32)
|
cs.StartIndex = uint64(zeroTime.Unix() << 32)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cs.topicCache = make([]TopicData, oneBatchQuantity)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -156,14 +158,14 @@ func (cs *CustomerSubscriber) SubscribeRun() {
|
|||||||
|
|
||||||
func (cs *CustomerSubscriber) subscribe() bool {
|
func (cs *CustomerSubscriber) subscribe() bool {
|
||||||
//先从内存中查找
|
//先从内存中查找
|
||||||
topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex, cs.oneBatchQuantity)
|
topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex+1, cs.oneBatchQuantity, cs.topicCache[:0])
|
||||||
if ret == true {
|
if ret == true {
|
||||||
cs.publishToCustomer(topicData)
|
cs.publishToCustomer(topicData)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
//从持久化数据中来找
|
//从持久化数据中来找
|
||||||
topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity))
|
topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity),cs.topicCache[:0])
|
||||||
return cs.publishToCustomer(topicData)
|
return cs.publishToCustomer(topicData)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -188,7 +190,7 @@ func (cs *CustomerSubscriber) publishToCustomer(topicData []TopicData) bool {
|
|||||||
|
|
||||||
if len(topicData) == 0 {
|
if len(topicData) == 0 {
|
||||||
//没有任何数据待一秒吧
|
//没有任何数据待一秒吧
|
||||||
time.Sleep(time.Millisecond * 100)
|
time.Sleep(time.Second * 1)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -49,13 +49,22 @@ func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32)
|
|||||||
if findStartPos <= mq.tail {
|
if findStartPos <= mq.tail {
|
||||||
findEndPos = mq.tail + 1
|
findEndPos = mq.tail + 1
|
||||||
} else {
|
} else {
|
||||||
findEndPos = int32(cap(mq.topicQueue))
|
findEndPos = int32(len(mq.topicQueue))
|
||||||
|
}
|
||||||
|
|
||||||
|
if findStartPos >= findEndPos {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 要取的Seq 比内存中最小的数据的Seq还小,那么需要返回错误
|
||||||
|
if mq.topicQueue[findStartPos].Seq > startIndex {
|
||||||
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
//二分查找位置
|
//二分查找位置
|
||||||
pos := int32(algorithms.BiSearch(mq.topicQueue[findStartPos:findEndPos], startIndex, 1))
|
pos := int32(algorithms.BiSearch(mq.topicQueue[findStartPos:findEndPos], startIndex, 1))
|
||||||
if pos == -1 {
|
if pos == -1 {
|
||||||
return nil, true
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
pos += findStartPos
|
pos += findStartPos
|
||||||
@@ -69,29 +78,31 @@ func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FindData 返回参数[]TopicData 表示查找到的数据,nil表示无数据。bool表示是否不应该在内存中来查
|
// FindData 返回参数[]TopicData 表示查找到的数据,nil表示无数据。bool表示是否不应该在内存中来查
|
||||||
func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bool) {
|
func (mq *MemoryQueue) FindData(startIndex uint64, limit int32, dataQueue []TopicData) ([]TopicData, bool) {
|
||||||
mq.locker.RLock()
|
mq.locker.RLock()
|
||||||
defer mq.locker.RUnlock()
|
defer mq.locker.RUnlock()
|
||||||
|
|
||||||
//队列为空时,应该从数据库查找
|
//队列为空时,应该从数据库查找
|
||||||
if mq.head == mq.tail {
|
if mq.head == mq.tail {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
} else if mq.head < mq.tail {
|
||||||
|
// 队列没有折叠
|
||||||
/*
|
datas,ret := mq.findData(mq.head + 1, startIndex, limit)
|
||||||
//先判断startIndex是否比第一个元素要大
|
if ret {
|
||||||
headTopic := (mq.head + 1) % int32(len(mq.topicQueue))
|
dataQueue = append(dataQueue, datas...)
|
||||||
//此时需要从持久化数据中取
|
}
|
||||||
if startIndex+1 > mq.topicQueue[headTopic].Seq {
|
return dataQueue, ret
|
||||||
return nil, false
|
} else {
|
||||||
|
// 折叠先找后面的部分
|
||||||
|
datas,ret := mq.findData(mq.head+1, startIndex, limit)
|
||||||
|
if ret {
|
||||||
|
dataQueue = append(dataQueue, datas...)
|
||||||
|
return dataQueue, ret
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
retData, ret := mq.findData(mq.head+1, startIndex, limit)
|
// 后面没找到,从前面开始找
|
||||||
if mq.head <= mq.tail || ret == true {
|
datas,ret = mq.findData(0, startIndex, limit)
|
||||||
return retData, true
|
dataQueue = append(dataQueue, datas...)
|
||||||
|
return dataQueue, ret
|
||||||
}
|
}
|
||||||
|
|
||||||
//如果是正常head在后,尾在前,从数组0下标开始找到tail
|
|
||||||
return mq.findData(0, startIndex, limit)
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,8 +15,8 @@ type QueueDataPersist interface {
|
|||||||
OnExit()
|
OnExit()
|
||||||
OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时
|
OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时
|
||||||
OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调
|
OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调
|
||||||
PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) //持久化数据,失败则返回false,上层会重复尝试,直到成功,建议在函数中加入次数,超过次数则返回true
|
PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) //持久化数据,失败则返回false,上层会重复尝试,直到成功,建议在函数中加入次数,超过次数则返回true
|
||||||
FindTopicData(topic string, startIndex uint64, limit int64) []TopicData //查找数据,参数bool代表数据库查找是否成功
|
FindTopicData(topic string, startIndex uint64, limit int64, topicBuff []TopicData) []TopicData //查找数据,参数bool代表数据库查找是否成功
|
||||||
LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true)
|
LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true)
|
||||||
GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号
|
GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号
|
||||||
PersistIndex(topic string, customerId string, index uint64) //持久化进度索引号
|
PersistIndex(topic string, customerId string, index uint64) //持久化进度索引号
|
||||||
|
|||||||
@@ -20,8 +20,6 @@ type MongoPersist struct {
|
|||||||
url string //连接url
|
url string //连接url
|
||||||
dbName string //数据库名称
|
dbName string //数据库名称
|
||||||
retryCount int //落地数据库重试次数
|
retryCount int //落地数据库重试次数
|
||||||
|
|
||||||
topic []TopicData //用于临时缓存
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const CustomerCollectName = "SysCustomer"
|
const CustomerCollectName = "SysCustomer"
|
||||||
@@ -48,7 +46,7 @@ func (mp *MongoPersist) OnInit() error {
|
|||||||
keys = append(keys, "Customer", "Topic")
|
keys = append(keys, "Customer", "Topic")
|
||||||
IndexKey = append(IndexKey, keys)
|
IndexKey = append(IndexKey, keys)
|
||||||
s := mp.mongo.TakeSession()
|
s := mp.mongo.TakeSession()
|
||||||
if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true); err != nil {
|
if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true,true); err != nil {
|
||||||
log.SError("EnsureUniqueIndex is fail ", err.Error())
|
log.SError("EnsureUniqueIndex is fail ", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -85,14 +83,6 @@ func (mp *MongoPersist) ReadCfg() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MongoPersist) getTopicBuff(limit int) []TopicData {
|
|
||||||
if cap(mp.topic) < limit {
|
|
||||||
mp.topic = make([]TopicData, limit)
|
|
||||||
}
|
|
||||||
|
|
||||||
return mp.topic[:0]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mp *MongoPersist) OnExit() {
|
func (mp *MongoPersist) OnExit() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -123,7 +113,6 @@ func (mp *MongoPersist) OnReceiveTopicData(topic string, topicData []TopicData)
|
|||||||
|
|
||||||
// OnPushTopicDataToCustomer 当推送数据到Customer时回调
|
// OnPushTopicDataToCustomer 当推送数据到Customer时回调
|
||||||
func (mp *MongoPersist) OnPushTopicDataToCustomer(topic string, topicData []TopicData) {
|
func (mp *MongoPersist) OnPushTopicDataToCustomer(topic string, topicData []TopicData) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// PersistTopicData 持久化数据
|
// PersistTopicData 持久化数据
|
||||||
@@ -142,20 +131,25 @@ func (mp *MongoPersist) persistTopicData(collectionName string, topicData []Topi
|
|||||||
|
|
||||||
_, err := s.Collection(mp.dbName, collectionName).InsertMany(ctx, documents)
|
_, err := s.Collection(mp.dbName, collectionName).InsertMany(ctx, documents)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.SError("PersistTopicData InsertMany fail,collect name is ", collectionName)
|
log.SError("PersistTopicData InsertMany fail,collect name is ", collectionName," error:",err.Error())
|
||||||
|
|
||||||
//失败最大重试数量
|
//失败最大重试数量
|
||||||
return retryCount >= mp.retryCount
|
return retryCount >= mp.retryCount
|
||||||
}
|
}
|
||||||
|
|
||||||
//log.SRelease("+++++++++====", time.Now().UnixNano())
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mp *MongoPersist) IsSameDay(timestamp1 int64,timestamp2 int64) bool{
|
||||||
|
t1 := time.Unix(timestamp1, 0)
|
||||||
|
t2 := time.Unix(timestamp2, 0)
|
||||||
|
return t1.Year() == t2.Year() && t1.Month() == t2.Month()&&t1.Day() == t2.Day()
|
||||||
|
}
|
||||||
|
|
||||||
// PersistTopicData 持久化数据
|
// PersistTopicData 持久化数据
|
||||||
func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) {
|
func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, []TopicData, bool) {
|
||||||
if len(topicData) == 0 {
|
if len(topicData) == 0 {
|
||||||
return nil, true
|
return nil, nil,true
|
||||||
}
|
}
|
||||||
|
|
||||||
preDate := topicData[0].Seq >> 32
|
preDate := topicData[0].Seq >> 32
|
||||||
@@ -163,7 +157,7 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re
|
|||||||
for findPos = 1; findPos < len(topicData); findPos++ {
|
for findPos = 1; findPos < len(topicData); findPos++ {
|
||||||
newDate := topicData[findPos].Seq >> 32
|
newDate := topicData[findPos].Seq >> 32
|
||||||
//说明换天了
|
//说明换天了
|
||||||
if preDate != newDate {
|
if mp.IsSameDay(int64(preDate),int64(newDate)) == false {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -172,15 +166,15 @@ func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, re
|
|||||||
ret := mp.persistTopicData(collectName, topicData[:findPos], retryCount)
|
ret := mp.persistTopicData(collectName, topicData[:findPos], retryCount)
|
||||||
//如果失败,下次重试
|
//如果失败,下次重试
|
||||||
if ret == false {
|
if ret == false {
|
||||||
return nil, false
|
return nil, nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
//如果成功
|
//如果成功
|
||||||
return topicData[findPos:len(topicData)], true
|
return topicData[findPos:len(topicData)], topicData[0:findPos], true
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindTopicData 查找数据
|
// FindTopicData 查找数据
|
||||||
func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64) ([]TopicData, bool) {
|
func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) ([]TopicData, bool) {
|
||||||
s := mp.mongo.TakeSession()
|
s := mp.mongo.TakeSession()
|
||||||
|
|
||||||
|
|
||||||
@@ -222,7 +216,6 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
|
|||||||
}
|
}
|
||||||
|
|
||||||
//序列化返回
|
//序列化返回
|
||||||
topicBuff := mp.getTopicBuff(int(limit))
|
|
||||||
for i := 0; i < len(res); i++ {
|
for i := 0; i < len(res); i++ {
|
||||||
rawData, errM := bson.Marshal(res[i])
|
rawData, errM := bson.Marshal(res[i])
|
||||||
if errM != nil {
|
if errM != nil {
|
||||||
@@ -257,7 +250,7 @@ func (mp *MongoPersist) getCollectCount(topic string,today string) (int64 ,error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FindTopicData 查找数据
|
// FindTopicData 查找数据
|
||||||
func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64) []TopicData {
|
func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64,topicBuff []TopicData) []TopicData {
|
||||||
//某表找不到,一直往前找,找到当前置为止
|
//某表找不到,一直往前找,找到当前置为止
|
||||||
for days := 1; days <= MaxDays; days++ {
|
for days := 1; days <= MaxDays; days++ {
|
||||||
//是否可以跳天
|
//是否可以跳天
|
||||||
@@ -281,7 +274,7 @@ func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int
|
|||||||
}
|
}
|
||||||
|
|
||||||
//从startIndex开始一直往后查
|
//从startIndex开始一直往后查
|
||||||
topicData, isSucc := mp.findTopicData(topic, startIndex, limit)
|
topicData, isSucc := mp.findTopicData(topic, startIndex, limit,topicBuff)
|
||||||
//有数据或者数据库出错时返回,返回后,会进行下一轮的查询遍历
|
//有数据或者数据库出错时返回,返回后,会进行下一轮的查询遍历
|
||||||
if len(topicData) > 0 || isSucc == false {
|
if len(topicData) > 0 || isSucc == false {
|
||||||
return topicData
|
return topicData
|
||||||
@@ -394,8 +387,7 @@ func (mp *MongoPersist) PersistIndex(topic string, customerId string, index uint
|
|||||||
|
|
||||||
ctx, cancel := s.GetDefaultContext()
|
ctx, cancel := s.GetDefaultContext()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
ret, err := s.Collection(mp.dbName, CustomerCollectName).UpdateOne(ctx, condition, updata, UpdateOptionsOpts...)
|
_, err := s.Collection(mp.dbName, CustomerCollectName).UpdateOne(ctx, condition, updata, UpdateOptionsOpts...)
|
||||||
fmt.Println(ret)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.SError("PersistIndex fail :", err.Error())
|
log.SError("PersistIndex fail :", err.Error())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ func (ss *Subscriber) PushTopicDataToQueue(topic string, topics []TopicData) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, bool) {
|
func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, []TopicData, bool) {
|
||||||
return ss.dataPersist.PersistTopicData(topic, topics, retryCount)
|
return ss.dataPersist.PersistTopicData(topic, topics, retryCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -113,25 +113,28 @@ func (tr *TopicRoom) topicRoomRun() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//如果落地失败,最大重试maxTryPersistNum次数
|
//如果落地失败,最大重试maxTryPersistNum次数
|
||||||
var ret bool
|
for retryCount := 0; retryCount < maxTryPersistNum; {
|
||||||
for j := 0; j < maxTryPersistNum; {
|
|
||||||
//持久化处理
|
//持久化处理
|
||||||
stagingBuff, ret = tr.PersistTopicData(tr.topic, stagingBuff, j+1)
|
stagingBuff, savedBuff, ret := tr.PersistTopicData(tr.topic, stagingBuff, retryCount+1)
|
||||||
//如果存档成功,并且有后续批次,则继续存档
|
|
||||||
if ret == true && len(stagingBuff) > 0 {
|
if ret == true {
|
||||||
//二次存档不计次数
|
// 1. 把成功存储的数据放入内存中
|
||||||
continue
|
if len(savedBuff) > 0 {
|
||||||
}
|
tr.PushTopicDataToQueue(tr.topic, savedBuff)
|
||||||
|
}
|
||||||
//计数增加一次,并且等待100ms,继续重试
|
|
||||||
j += 1
|
// 2. 如果存档成功,并且有后续批次,则继续存档
|
||||||
if ret == false {
|
if ret == true && len(stagingBuff) > 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 成功了,跳出
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
//计数增加一次,并且等待100ms,继续重试
|
||||||
|
retryCount++
|
||||||
time.Sleep(time.Millisecond * 100)
|
time.Sleep(time.Millisecond * 100)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tr.PushTopicDataToQueue(tr.topic, stagingBuff)
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user