mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-03 22:45:13 +08:00
优化消息队列服务持久化
This commit is contained in:
@@ -1,18 +1,49 @@
|
||||
package messagequeueservice
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/duanhf2012/origin/log"
|
||||
"github.com/duanhf2012/origin/service"
|
||||
"github.com/duanhf2012/origin/sysmodule/mongodbmodule"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"sunserver/common/util"
|
||||
"time"
|
||||
)
|
||||
|
||||
const MaxDays = 180
|
||||
|
||||
type DataType interface {
|
||||
int | uint | int64 | uint64 | float32 | float64 | int32 | uint32 | int16 | uint16
|
||||
}
|
||||
|
||||
func convertToNumber[DType DataType](val interface{}) (error, DType) {
|
||||
switch val.(type) {
|
||||
case int64:
|
||||
return nil, DType(val.(int64))
|
||||
case int:
|
||||
return nil, DType(val.(int))
|
||||
case uint:
|
||||
return nil, DType(val.(uint))
|
||||
case uint64:
|
||||
return nil, DType(val.(uint64))
|
||||
case float32:
|
||||
return nil, DType(val.(float32))
|
||||
case float64:
|
||||
return nil, DType(val.(float64))
|
||||
case int32:
|
||||
return nil, DType(val.(int32))
|
||||
case uint32:
|
||||
return nil, DType(val.(uint32))
|
||||
case int16:
|
||||
return nil, DType(val.(int16))
|
||||
case uint16:
|
||||
return nil, DType(val.(uint16))
|
||||
}
|
||||
|
||||
return errors.New("unsupported type"), 0
|
||||
}
|
||||
|
||||
type MongoPersist struct {
|
||||
service.Module
|
||||
mongo mongodbmodule.MongoModule
|
||||
@@ -363,7 +394,7 @@ func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64 {
|
||||
|
||||
for _, e := range document {
|
||||
if e.Key == "_id" {
|
||||
errC, seq := util.ConvertToNumber[uint64](e.Value)
|
||||
errC, seq := convertToNumber[uint64](e.Value)
|
||||
if errC != nil {
|
||||
log.Error("value is error:%s,%+v, ", errC.Error(), e.Value)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user