From c7e0fcbdbb8f17f0c477679847f52b85f4ff8e27 Mon Sep 17 00:00:00 2001 From: orgin Date: Wed, 23 Nov 2022 09:45:26 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8E=92=E8=A1=8C=E6=A6=9CRa?= =?UTF-8?q?nkService=EF=BC=8C=E6=96=B0=E5=A2=9Emongodb=E6=8C=81=E4=B9=85?= =?UTF-8?q?=E5=8C=96Module?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rank.pb.go | 156 +++++++-- sysservice/rankservice/MongodbPersist.go | 391 +++++++++++++++++++++++ sysservice/rankservice/RankData.go | 6 +- sysservice/rankservice/RankDataExpire.go | 4 +- sysservice/rankservice/RankInterface.go | 18 +- sysservice/rankservice/RankService.go | 60 +++- sysservice/rankservice/RankSkip.go | 67 ++-- 7 files changed, 623 insertions(+), 79 deletions(-) create mode 100644 sysservice/rankservice/MongodbPersist.go diff --git a/rpc/rank.pb.go b/rpc/rank.pb.go index 2374600..f92a709 100644 --- a/rpc/rank.pb.go +++ b/rpc/rank.pb.go @@ -515,6 +515,7 @@ type FindRankDataList struct { RankId uint64 `protobuf:"varint,1,opt,name=RankId,proto3" json:"RankId,omitempty"` StartRank uint64 `protobuf:"varint,2,opt,name=StartRank,proto3" json:"StartRank,omitempty"` Count uint64 `protobuf:"varint,3,opt,name=Count,proto3" json:"Count,omitempty"` + Key uint64 `protobuf:"varint,4,opt,name=Key,proto3" json:"Key,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -574,10 +575,18 @@ func (m *FindRankDataList) GetCount() uint64 { return 0 } +func (m *FindRankDataList) GetKey() uint64 { + if m != nil { + return m.Key + } + return 0 +} + // RankDataList type RankDataList struct { RankDataCount uint64 `protobuf:"varint,1,opt,name=RankDataCount,proto3" json:"RankDataCount,omitempty"` RankPosDataList []*RankPosData `protobuf:"bytes,2,rep,name=RankPosDataList,proto3" json:"RankPosDataList,omitempty"` + KeyRank *RankPosData `protobuf:"bytes,3,opt,name=KeyRank,proto3" json:"KeyRank,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -630,6 +639,13 @@ func (m *RankDataList) GetRankPosDataList() []*RankPosData { return nil } +func (m *RankDataList) GetKeyRank() *RankPosData { + if m != nil { + return m.KeyRank + } + return nil +} + // RankResult type RankResult struct { AddCount int32 `protobuf:"varint,1,opt,name=AddCount,proto3" json:"AddCount,omitempty"` @@ -711,37 +727,38 @@ func init() { func init() { proto.RegisterFile("proto/rpcproto/rank.proto", fileDescriptor_d5b64eda47521620) } var fileDescriptor_d5b64eda47521620 = []byte{ - // 470 bytes of a gzipped FileDescriptorProto + // 489 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x54, 0x51, 0x8b, 0xd3, 0x40, - 0x10, 0x66, 0x9b, 0xe4, 0x5a, 0x27, 0x57, 0xac, 0xab, 0x48, 0x14, 0x29, 0x61, 0x11, 0xec, 0x53, - 0x45, 0x05, 0x1f, 0x14, 0xd1, 0x9e, 0x55, 0x38, 0xae, 0x05, 0xd9, 0xe2, 0xcb, 0x3d, 0x08, 0x31, - 0xbb, 0x42, 0x68, 0x6d, 0x96, 0xcd, 0xde, 0x71, 0xfd, 0x1f, 0xfe, 0x28, 0x1f, 0xfd, 0x09, 0xd2, - 0x5f, 0x22, 0xb3, 0xc9, 0xa6, 0xc9, 0x61, 0xfa, 0x36, 0xdf, 0x37, 0x5f, 0xe6, 0x9b, 0x9d, 0x19, - 0x02, 0x8f, 0x94, 0xce, 0x4d, 0xfe, 0x5c, 0xab, 0xb4, 0x0a, 0x92, 0xed, 0x7a, 0x6a, 0x43, 0xea, - 0x69, 0x95, 0xb2, 0x05, 0x0c, 0x78, 0xb2, 0x5d, 0xcf, 0x13, 0x93, 0xd0, 0x11, 0x78, 0x17, 0x72, - 0x17, 0x91, 0x98, 0x4c, 0x7c, 0x8e, 0x21, 0x7d, 0x0c, 0x83, 0x55, 0xae, 0x0d, 0x66, 0xa3, 0x5e, - 0xec, 0x4d, 0x3c, 0x5e, 0x63, 0x4a, 0xc1, 0xb7, 0xbc, 0x17, 0x93, 0xc9, 0x29, 0xb7, 0x31, 0x4b, - 0x21, 0xc4, 0x6a, 0x5f, 0xf2, 0xa2, 0xa3, 0x20, 0x05, 0x1f, 0x05, 0x51, 0xcf, 0x52, 0x36, 0x6e, - 0x99, 0x78, 0x1d, 0x26, 0x7e, 0xc3, 0xe4, 0x17, 0x29, 0x7b, 0x5e, 0x64, 0x85, 0xa1, 0x0f, 0xe1, - 0x04, 0xe3, 0x73, 0x51, 0xb9, 0x54, 0x88, 0x3e, 0x85, 0xe1, 0x6a, 0x9d, 0x29, 0xd4, 0x2c, 0xe4, - 0xb5, 0xdc, 0x58, 0xc7, 0x80, 0xb7, 0x49, 0xfa, 0x00, 0x82, 0xf3, 0x62, 0x2e, 0x53, 0xfb, 0x88, - 0x01, 0x2f, 0x01, 0x8d, 0xa0, 0xbf, 0x4c, 0x6e, 0x6c, 0x9f, 0xbe, 0x2d, 0xea, 0x20, 0xb6, 0xfa, - 0xe9, 0x46, 0x65, 0x5a, 0x2e, 0x8b, 0x28, 0x88, 0x09, 0xb6, 0xea, 0x30, 0xbb, 0x84, 0xe1, 0x57, - 0x55, 0x48, 0x53, 0x8f, 0xb3, 0xab, 0xb5, 0x17, 0x70, 0xea, 0x34, 0xd8, 0x89, 0x1d, 0x6c, 0xf8, - 0x72, 0x38, 0xd5, 0x2a, 0x9d, 0xba, 0x04, 0x6f, 0x49, 0xd8, 0x7b, 0x08, 0xe7, 0x72, 0x23, 0x8d, - 0x3c, 0xdb, 0xe1, 0x14, 0xbb, 0x2a, 0x47, 0xd0, 0xbf, 0x90, 0xbb, 0xba, 0xa8, 0xcf, 0x1d, 0x64, - 0xaf, 0x21, 0x9c, 0x09, 0x51, 0x4f, 0xed, 0x19, 0xf4, 0x67, 0x42, 0x58, 0x21, 0xb9, 0xe5, 0x8e, - 0x24, 0x77, 0x59, 0xf6, 0x0e, 0xee, 0x7d, 0xce, 0xb6, 0xc2, 0x35, 0x73, 0xdc, 0xbe, 0x5a, 0x77, - 0xaf, 0x5e, 0x37, 0xfb, 0x00, 0xb4, 0xfd, 0xb9, 0x9d, 0x62, 0xd7, 0xf7, 0xff, 0x39, 0x0e, 0xf6, - 0x0d, 0x46, 0xcd, 0x0a, 0x47, 0x77, 0xfe, 0x04, 0xee, 0xac, 0x4c, 0xa2, 0x4d, 0xa3, 0xc8, 0x81, - 0xc0, 0x5d, 0x7f, 0xcc, 0xaf, 0xb6, 0xc6, 0xee, 0xda, 0xe7, 0x25, 0x60, 0xaa, 0xbd, 0x0c, 0xbc, - 0x1b, 0x87, 0x4b, 0x75, 0x69, 0xd1, 0x26, 0xe9, 0x1b, 0xb8, 0xdb, 0xb8, 0xf3, 0xc6, 0x16, 0x47, - 0xf5, 0x1c, 0xab, 0x1c, 0xbf, 0x2d, 0x64, 0x1b, 0x00, 0xa4, 0xb8, 0x2c, 0xae, 0x36, 0x06, 0x2f, - 0x6a, 0x26, 0xc4, 0xc1, 0x2a, 0xe0, 0x35, 0xa6, 0x31, 0x84, 0xcb, 0x5c, 0x64, 0x3f, 0x76, 0x65, - 0xba, 0xbc, 0xe0, 0x26, 0x85, 0x0a, 0x2e, 0x7f, 0xe6, 0xd7, 0xf2, 0xf0, 0xb2, 0x80, 0x37, 0xa9, - 0xb3, 0xfb, 0xbf, 0xf7, 0x63, 0xf2, 0x67, 0x3f, 0x26, 0x7f, 0xf7, 0x63, 0x72, 0x19, 0x4c, 0xdf, - 0x6a, 0x95, 0x7e, 0x3f, 0xb1, 0x3f, 0x80, 0x57, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0xc9, 0xb1, - 0x14, 0x9a, 0x1d, 0x04, 0x00, 0x00, + 0x10, 0x66, 0x9b, 0xe4, 0x5a, 0x27, 0x57, 0xac, 0xab, 0x48, 0x14, 0x29, 0x61, 0x11, 0x2c, 0x3e, + 0x54, 0x54, 0xf0, 0x41, 0x11, 0xed, 0x59, 0x85, 0xe3, 0x5a, 0x90, 0x2d, 0xbe, 0xdc, 0x5b, 0xcc, + 0xae, 0x10, 0x5a, 0x9b, 0x65, 0xb3, 0x77, 0x5c, 0xff, 0x87, 0x8f, 0xfe, 0x20, 0x1f, 0xfd, 0x09, + 0xd2, 0x5f, 0x22, 0xb3, 0xc9, 0xa6, 0xc9, 0x71, 0xb9, 0xa7, 0xce, 0x37, 0x33, 0x3b, 0xdf, 0xd7, + 0xf9, 0xa6, 0x85, 0x47, 0x4a, 0xe7, 0x26, 0x7f, 0xa1, 0x55, 0x5a, 0x05, 0xc9, 0x76, 0x3d, 0xb5, + 0x21, 0xf5, 0xb4, 0x4a, 0xd9, 0x02, 0x06, 0x3c, 0xd9, 0xae, 0xe7, 0x89, 0x49, 0xe8, 0x08, 0xbc, + 0x33, 0xb9, 0x8b, 0x48, 0x4c, 0x26, 0x3e, 0xc7, 0x90, 0x3e, 0x86, 0xc1, 0x2a, 0xd7, 0x06, 0xab, + 0x51, 0x2f, 0xf6, 0x26, 0x1e, 0xaf, 0x31, 0xa5, 0xe0, 0xdb, 0xbc, 0x17, 0x93, 0xc9, 0x31, 0xb7, + 0x31, 0x4b, 0x21, 0xc4, 0x69, 0x5f, 0xf3, 0xa2, 0x63, 0x20, 0x05, 0x1f, 0x1b, 0xa2, 0x9e, 0x4d, + 0xd9, 0xb8, 0x45, 0xe2, 0x75, 0x90, 0xf8, 0x0d, 0x92, 0x5f, 0xa4, 0xd4, 0xbc, 0xc8, 0x0a, 0x43, + 0x1f, 0xc2, 0x11, 0xc6, 0xa7, 0xa2, 0x62, 0xa9, 0x10, 0x7d, 0x0a, 0xc3, 0xd5, 0x3a, 0x53, 0xd8, + 0xb3, 0x90, 0x97, 0x72, 0x63, 0x19, 0x03, 0xde, 0x4e, 0xd2, 0x07, 0x10, 0x9c, 0x16, 0x73, 0x99, + 0xda, 0x2f, 0x31, 0xe0, 0x25, 0xa0, 0x11, 0xf4, 0x97, 0xc9, 0x95, 0xd5, 0xe9, 0xdb, 0xa1, 0x0e, + 0xa2, 0xd4, 0xcf, 0x57, 0x2a, 0xd3, 0x72, 0x59, 0x44, 0x41, 0x4c, 0x50, 0xaa, 0xc3, 0xec, 0x1c, + 0x86, 0xdf, 0x54, 0x21, 0x4d, 0xbd, 0xce, 0x2e, 0x69, 0x2f, 0xe1, 0xd8, 0xf5, 0xa0, 0x12, 0xbb, + 0xd8, 0xf0, 0xd5, 0x70, 0xaa, 0x55, 0x3a, 0x75, 0x05, 0xde, 0x6a, 0x61, 0x1f, 0x20, 0x9c, 0xcb, + 0x8d, 0x34, 0xf2, 0x64, 0x87, 0x5b, 0xec, 0x9a, 0x1c, 0x41, 0xff, 0x4c, 0xee, 0xea, 0xa1, 0x3e, + 0x77, 0x90, 0xbd, 0x81, 0x70, 0x26, 0x44, 0xbd, 0xb5, 0x67, 0xd0, 0x9f, 0x09, 0x61, 0x1b, 0xc9, + 0x35, 0x76, 0x4c, 0x72, 0x57, 0x65, 0xef, 0xe1, 0xde, 0x97, 0x6c, 0x2b, 0x9c, 0x98, 0xdb, 0xe9, + 0x2b, 0xbb, 0x7b, 0xb5, 0xdd, 0xec, 0x23, 0xd0, 0xf6, 0x73, 0xbb, 0xc5, 0xae, 0xf7, 0x37, 0x1c, + 0x07, 0x53, 0x30, 0x6a, 0x4e, 0xb8, 0xd5, 0xf3, 0x27, 0x70, 0x67, 0x65, 0x12, 0x6d, 0x1a, 0x43, + 0x0e, 0x09, 0xf4, 0xfa, 0x53, 0x7e, 0xb1, 0x35, 0xd6, 0x6b, 0x9f, 0x97, 0xc0, 0x69, 0xf6, 0x0f, + 0x9a, 0x7f, 0x93, 0xb6, 0x3f, 0x78, 0x4a, 0x0e, 0x97, 0x03, 0x4a, 0xd6, 0x76, 0x92, 0xbe, 0x85, + 0xbb, 0x8d, 0xd3, 0x6f, 0x18, 0x3b, 0xaa, 0x57, 0x5b, 0xd5, 0xf8, 0xf5, 0x46, 0xfa, 0xdc, 0xfa, + 0x66, 0x65, 0xa3, 0xb8, 0x9b, 0xde, 0xb8, 0x06, 0xb6, 0x01, 0xc0, 0x4f, 0x2e, 0x8b, 0x8b, 0x8d, + 0xc1, 0x83, 0x9c, 0x09, 0x71, 0x90, 0x15, 0xf0, 0x1a, 0xd3, 0x18, 0xc2, 0x65, 0x2e, 0xb2, 0x1f, + 0xbb, 0xb2, 0x5c, 0xfe, 0x00, 0x9a, 0x29, 0xec, 0xe0, 0xf2, 0x67, 0x7e, 0x29, 0x0f, 0x8b, 0x09, + 0x78, 0x33, 0x75, 0x72, 0xff, 0xcf, 0x7e, 0x4c, 0xfe, 0xee, 0xc7, 0xe4, 0xdf, 0x7e, 0x4c, 0xce, + 0x83, 0xe9, 0x3b, 0xad, 0xd2, 0xef, 0x47, 0xf6, 0xff, 0xe3, 0xf5, 0xff, 0x00, 0x00, 0x00, 0xff, + 0xff, 0x5b, 0x21, 0xc1, 0xf8, 0x5c, 0x04, 0x00, 0x00, } func (m *RankData) Marshal() (dAtA []byte, err error) { @@ -1157,6 +1174,11 @@ func (m *FindRankDataList) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if m.Key != 0 { + i = encodeVarintRank(dAtA, i, uint64(m.Key)) + i-- + dAtA[i] = 0x20 + } if m.Count != 0 { i = encodeVarintRank(dAtA, i, uint64(m.Count)) i-- @@ -1199,6 +1221,18 @@ func (m *RankDataList) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if m.KeyRank != nil { + { + size, err := m.KeyRank.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRank(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } if len(m.RankPosDataList) > 0 { for iNdEx := len(m.RankPosDataList) - 1; iNdEx >= 0; iNdEx-- { { @@ -1468,6 +1502,9 @@ func (m *FindRankDataList) Size() (n int) { if m.Count != 0 { n += 1 + sovRank(uint64(m.Count)) } + if m.Key != 0 { + n += 1 + sovRank(uint64(m.Key)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1489,6 +1526,10 @@ func (m *RankDataList) Size() (n int) { n += 1 + l + sovRank(uint64(l)) } } + if m.KeyRank != nil { + l = m.KeyRank.Size() + n += 1 + l + sovRank(uint64(l)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -2671,6 +2712,25 @@ func (m *FindRankDataList) Unmarshal(dAtA []byte) error { break } } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + m.Key = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRank + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Key |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRank(dAtA[iNdEx:]) @@ -2778,6 +2838,42 @@ func (m *RankDataList) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field KeyRank", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRank + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRank + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRank + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.KeyRank == nil { + m.KeyRank = &RankPosData{} + } + if err := m.KeyRank.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRank(dAtA[iNdEx:]) diff --git a/sysservice/rankservice/MongodbPersist.go b/sysservice/rankservice/MongodbPersist.go new file mode 100644 index 0000000..3f0b785 --- /dev/null +++ b/sysservice/rankservice/MongodbPersist.go @@ -0,0 +1,391 @@ +package rankservice + +import ( + "fmt" + "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/rpc" + "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/sysmodule/mongodbmodule" + "github.com/duanhf2012/origin/util/coroutine" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" + "sync" + "sync/atomic" + "time" +) + +const batchRemoveNum = 100 +const MaxDays = 180 +type RankDataDB struct { + Id uint64 `bson:"_id,omitempty"` + RefreshTime int64 `bson:"RefreshTime,omitempty"` + SortData []int64 `bson:"SortData,omitempty"` + Data []byte `bson:"Data,omitempty"` +} + +type MongoPersist struct { + service.Module + mongo mongodbmodule.MongoModule + + url string //连接url + dbName string //数据库名称 + SaveInterval time.Duration //落地数据库时间间隔 + + waitGroup sync.WaitGroup + sync.Mutex + mapRemoveRankData map[uint64]map[uint64]struct{} //将要删除的排行数据 map[RankId]map[Key]struct{} + mapUpsertRankData map[uint64]map[uint64]RankData //需要upsert的排行数据 map[RankId][key]RankData + //mapRankCfg map[uint64]string //map[RankId]RankCollectName + + mapRankSkip map[uint64]IRankSkip + maxRetrySaveCount int //存档重试次数 + retryTimeIntervalMs time.Duration //重试时间间隔 + stop int32 +} + +const CustomerCollectName = "SysCustomer" + +func (mp *MongoPersist) OnInit() error { + mp.mapRemoveRankData = map[uint64]map[uint64]struct{}{} + mp.mapUpsertRankData = map[uint64]map[uint64]RankData{} + //mp.mapRankCfg = map[uint64]string{} + mp.mapRankSkip = map[uint64]IRankSkip{} + + if errC := mp.ReadCfg(); errC != nil { + return errC + } + + err := mp.mongo.Init(mp.url, time.Second*15) + if err != nil { + return err + } + + err = mp.mongo.Start() + if err != nil { + log.SError("start dbService[", mp.dbName, "], url[", mp.url, "] init error:", err.Error()) + return err + } + + coroutine.GoRecover(mp.persistCoroutine,-1) + return nil +} + +func (mp *MongoPersist) ReadCfg() error { + mapDBServiceCfg, ok := mp.GetService().GetServiceCfg().(map[string]interface{}) + if ok == false { + return fmt.Errorf("RankService config is error") + } + + + //读取数据库配置 + saveMongoCfg,ok := mapDBServiceCfg["SaveMongo"] + if ok == false { + return fmt.Errorf("RankService.SaveMongo config is error") + } + + mongodbCfg,ok := saveMongoCfg.(map[string]interface{}) + if ok == false { + return fmt.Errorf("RankService.SaveMongo config is error") + } + + //parse MsgRouter + url, ok := mongodbCfg["Url"] + if ok == false { + return fmt.Errorf("RankService.SaveMongo.Url config is error") + } + mp.url = url.(string) + + dbName, ok := mongodbCfg["DBName"] + if ok == false { + return fmt.Errorf("RankService.SaveMongo.DBName config is error") + } + mp.dbName = dbName.(string) + + saveInterval, ok := mongodbCfg["SaveIntervalMs"] + if ok == false { + return fmt.Errorf("RankService.SaveMongo.SaveIntervalMs config is error") + } + + mp.SaveInterval = time.Duration(saveInterval.(float64))*time.Millisecond + + maxRetrySaveCount, ok := mongodbCfg["MaxRetrySaveCount"] + if ok == false { + return fmt.Errorf("RankService.SaveMongo.MaxRetrySaveCount config is error") + } + mp.maxRetrySaveCount = int(maxRetrySaveCount.(float64)) + + retryTimeIntervalMs, ok := mongodbCfg["RetryTimeIntervalMs"] + if ok == false { + return fmt.Errorf("RankService.SaveMongo.RetryTimeIntervalMs config is error") + } + mp.retryTimeIntervalMs = time.Duration(retryTimeIntervalMs.(float64))*time.Millisecond + + + return nil +} + +//启服从数据库加载 +func (mp *MongoPersist) OnStart() { +} + +func (mp *MongoPersist) OnFinishSetupRank(mapRankSkip map[uint64]*RankSkip) error{ + mp.mapRankSkip = map[uint64]IRankSkip{} + for rankId,rank := range mapRankSkip { + mp.mapRankSkip[rankId] = rank + } + + for rankId,rank := range mp.mapRankSkip{ + err := mp.loadFromDB(rankId,rank.GetRankName()) + if err != nil { + log.SError("load from db is fail :%s",err.Error()) + return err + } + } + + return nil +} + +func (mp *MongoPersist) loadFromDB(rankId uint64,rankCollectName string) error{ + s := mp.mongo.TakeSession() + ctx, cancel := s.GetDefaultContext() + defer cancel() + + condition := bson.D{} + cursor, err := s.Collection(mp.dbName, rankCollectName).Find(ctx, condition) + if err != nil { + log.SError("find collect name ", rankCollectName, " is error:", err.Error()) + return err + } + + if cursor.Err()!=nil { + log.SError("find collect name ", rankCollectName, " is error:", cursor.Err().Error()) + return err + } + + rankSkip := mp.mapRankSkip[rankId] + if rankSkip == nil { + err = fmt.Errorf("rank ", rankCollectName, " is not setup:") + log.SError(err.Error()) + return err + } + + defer cursor.Close(ctx) + for cursor.Next(ctx) { + var rankDataDB RankDataDB + err = cursor.Decode(&rankDataDB) + if err != nil { + log.SError(" collect name ", rankCollectName, " Decode is error:", err.Error()) + return err + } + + var rankData rpc.RankData + rankData.Data = rankDataDB.Data + rankData.Key = rankDataDB.Id + rankData.SortData = rankDataDB.SortData + + //更新到排行榜 + rankSkip.UpsetRank(&rankData,rankDataDB.RefreshTime,true) + } + + return nil +} + +func (mp *MongoPersist) OnEnterRank(rankSkip IRankSkip, enterData *RankData){ + mp.Lock() + defer mp.Unlock() + + delete(mp.mapRemoveRankData,enterData.Key) + + mp.lazyInitUpsertMap(rankSkip.GetRankID()) + mp.mapUpsertRankData[rankSkip.GetRankID()][enterData.Key] = *enterData + //mp.SaveToDB() +} + +func (mp *MongoPersist) lazyInitRemoveMap(rankId uint64){ + if mp.mapRemoveRankData[rankId] == nil { + mp.mapRemoveRankData[rankId] = make(map[uint64]struct{},256) + } +} + +func (mp *MongoPersist) lazyInitUpsertMap(rankId uint64){ + if mp.mapUpsertRankData[rankId] == nil { + mp.mapUpsertRankData[rankId] = make(map[uint64]RankData,256) + } +} + +func (mp *MongoPersist) OnLeaveRank(rankSkip IRankSkip, leaveData *RankData){ + mp.Lock() + defer mp.Unlock() + + //先删掉更新中的数据 + delete(mp.mapUpsertRankData,leaveData.Key) + mp.lazyInitRemoveMap(rankSkip.GetRankID()) + mp.mapRemoveRankData[rankSkip.GetRankID()][leaveData.Key] = struct{}{} +} + +func (mp *MongoPersist) OnChangeRankData(rankSkip IRankSkip, changeData *RankData){ + mp.Lock() + defer mp.Unlock() + //先删掉要删除的数据 + delete(mp.mapRemoveRankData,changeData.Key) + + //更新数据 + mp.lazyInitUpsertMap(rankSkip.GetRankID()) + mp.mapUpsertRankData[rankSkip.GetRankID()][changeData.Key] = *changeData +} + +//停存持久化到DB +func (mp *MongoPersist) OnStop(mapRankSkip map[uint64]*RankSkip){ + atomic.StoreInt32(&mp.stop,1) + mp.waitGroup.Wait() +} + +func (mp *MongoPersist) persistCoroutine(){ + mp.waitGroup.Add(1) + defer mp.waitGroup.Done() + for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){ + //间隔时间sleep + time.Sleep(mp.SaveInterval) + + //没有持久化数据continue + if mp.hasPersistData() == false { + continue + } + + //存档数据到数据库 + mp.saveToDB() + } +} + +func (mp *MongoPersist) hasPersistData() bool{ + mp.Lock() + defer mp.Unlock() + + return len(mp.mapUpsertRankData)>0 || len(mp.mapRemoveRankData) >0 +} + +func (mp *MongoPersist) saveToDB(){ + //1.copy数据 + mp.Lock() + mapRemoveRankData := mp.mapRemoveRankData + mapUpsertRankData := mp.mapUpsertRankData + mp.mapRemoveRankData = map[uint64]map[uint64]struct{}{} + mp.mapUpsertRankData = map[uint64]map[uint64]RankData{} + mp.Unlock() + + //2.存档 + for len(mapUpsertRankData) > 0 { + mp.upsertRankDataToDB(mapUpsertRankData) + } + + for len(mapRemoveRankData) >0 { + mp.removeRankDataToDB(mapRemoveRankData) + } +} + +func (mp *MongoPersist) removeToDB(collectName string,keys []uint64) error{ + s := mp.mongo.TakeSession() + ctx, cancel := s.GetDefaultContext() + defer cancel() + + condition := bson.D{{Key: "_id", Value: bson.M{"$in": keys}}} + _, err := s.Collection(mp.dbName, collectName).DeleteMany(ctx, condition) + if err != nil { + log.SError("MongoPersist DeleteMany fail,collect name is ", collectName) + return err + } + + return nil +} + +func (mp *MongoPersist) removeRankData(rankId uint64,keys []uint64) bool { + rank := mp.mapRankSkip[rankId] + if rank== nil { + log.SError("cannot find rankId ",rankId,"config") + return false + } + + for i:=0;i= batchRemoveNum { + break + } + } + + mp.removeRankData(rankId,keyList) + + //如果删完,删掉rankid下所有 + if len(mapRemoveKey) == 0 { + delete(mapRemoveRankData,rankId) + } + } + +} diff --git a/sysservice/rankservice/RankData.go b/sysservice/rankservice/RankData.go index e4bbe35..bdff64c 100644 --- a/sysservice/rankservice/RankData.go +++ b/sysservice/rankservice/RankData.go @@ -15,18 +15,20 @@ var RankDataPool = sync.NewPoolEx(make(chan sync.IPoolData, 10240), func() sync. type RankData struct { *rpc.RankData - bRelease bool + refreshTimestamp int64 //刷新时间 + //bRelease bool ref bool compareFunc func(other skip.Comparator) int } -func NewRankData(isDec bool, data *rpc.RankData) *RankData { +func NewRankData(isDec bool, data *rpc.RankData,refreshTimestamp int64) *RankData { ret := RankDataPool.Get().(*RankData) ret.compareFunc = ret.ascCompare if isDec { ret.compareFunc = ret.desCompare } ret.RankData = data + ret.refreshTimestamp = refreshTimestamp return ret } diff --git a/sysservice/rankservice/RankDataExpire.go b/sysservice/rankservice/RankDataExpire.go index be7c24e..2c30a86 100644 --- a/sysservice/rankservice/RankDataExpire.go +++ b/sysservice/rankservice/RankDataExpire.go @@ -93,7 +93,7 @@ func (rd *rankDataHeap) PopExpireKey() uint64{ return rankData.Key } -func (rd *rankDataHeap) PushOrRefreshExpireKey(key uint64){ +func (rd *rankDataHeap) PushOrRefreshExpireKey(key uint64,refreshTimestamp int64){ //1.先删掉之前的 expData ,ok := rd.mapExpireData[key] if ok == true { @@ -105,7 +105,7 @@ func (rd *rankDataHeap) PushOrRefreshExpireKey(key uint64){ //2.直接插入 expData = expireDataPool.Get().(*ExpireData) expData.Key = key - expData.RefreshTimestamp = time.Now().UnixNano() + expData.RefreshTimestamp = refreshTimestamp rd.mapExpireData[key] = expData heap.Push(rd,expData) diff --git a/sysservice/rankservice/RankInterface.go b/sysservice/rankservice/RankInterface.go index c9af3f0..2b52769 100644 --- a/sysservice/rankservice/RankInterface.go +++ b/sysservice/rankservice/RankInterface.go @@ -1,6 +1,9 @@ package rankservice -import "github.com/duanhf2012/origin/service" +import ( + "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/rpc" +) type RankDataChangeType int8 @@ -8,16 +11,21 @@ type RankDataChangeType int8 type IRankSkip interface { GetRankID() uint64 + GetRankName() string GetRankLen() uint64 + UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fromLoad bool) (*RankData, RankDataChangeType) } + + type IRankModule interface { service.IModule - OnStart(mapRankSkip map[uint64]*RankSkip) error //服务开启时回调 - OnEnterRank(rankSkip IRankSkip, enterData []*RankData) //进入排行 - OnLeaveRank(rankSkip IRankSkip, leaveData []*RankData) //离开排行 - OnChangeRankData(rankSkip IRankSkip, changeData []*RankData) //当排行数据变化时 + OnFinishSetupRank(map[uint64]*RankSkip) error //当完成安装排行榜对象时 + OnStart() //服务开启时回调 + OnEnterRank(rankSkip IRankSkip, enterData *RankData) //进入排行 + OnLeaveRank(rankSkip IRankSkip, leaveData *RankData) //离开排行 + OnChangeRankData(rankSkip IRankSkip, changeData *RankData) //当排行数据变化时 OnStop(mapRankSkip map[uint64]*RankSkip) //服务结束时回调 } diff --git a/sysservice/rankservice/RankService.go b/sysservice/rankservice/RankService.go index 77acd43..77a1563 100644 --- a/sysservice/rankservice/RankService.go +++ b/sysservice/rankservice/RankService.go @@ -18,14 +18,8 @@ type RankService struct { } func (rs *RankService) OnInit() error { - rs.mapRankSkip = make(map[uint64]*RankSkip, PreMapRankSkipLen) - err := rs.dealCfg() - if err != nil { - return err - } - if rs.rankModule != nil { - _, err = rs.AddModule(rs.rankModule) + _, err := rs.AddModule(rs.rankModule) if err != nil { return err } @@ -33,11 +27,19 @@ func (rs *RankService) OnInit() error { rs.AddModule(&DefaultRankModule{}) } + rs.mapRankSkip = make(map[uint64]*RankSkip, PreMapRankSkipLen) + err := rs.dealCfg() + if err != nil { + return err + } + + + return nil } func (rs *RankService) OnStart() { - rs.rankModule.OnStart(rs.mapRankSkip) + rs.rankModule.OnStart() } func (rs *RankService) OnRelease() { @@ -58,7 +60,8 @@ func (rs *RankService) RPC_ManualAddRankSkip(addInfo *rpc.AddRankList, addResult return fmt.Errorf("RPC_AddRankSkip must has rank id") } - newSkip := NewRankSkip(addRankListData.IsDec, transformLevel(addRankListData.SkipListLevel), addRankListData.MaxRank,time.Duration(addRankListData.ExpireMs)*time.Millisecond) + newSkip := NewRankSkip(addRankListData.RankId,"",addRankListData.IsDec, transformLevel(addRankListData.SkipListLevel), addRankListData.MaxRank,time.Duration(addRankListData.ExpireMs)*time.Millisecond) + newSkip.SetupRankModule(rs.rankModule) rs.mapRankSkip[addRankListData.RankId] = newSkip addList = append(addList, addRankListData.RankId) } @@ -75,7 +78,7 @@ func (rs *RankService) RPC_UpsetRank(upsetInfo *rpc.UpsetRankData, upsetResult * return fmt.Errorf("RPC_UpsetRank[", upsetInfo.RankId, "] no this rank id") } - addCount, updateCount := rankSkip.UpsetRank(upsetInfo.RankDataList) + addCount, updateCount := rankSkip.UpsetRankList(upsetInfo.RankDataList) upsetResult.AddCount = addCount upsetResult.ModifyCount = updateCount return nil @@ -135,11 +138,30 @@ func (rs *RankService) RPC_FindRankDataByRank(findInfo *rpc.FindRankDataByRank, func (rs *RankService) RPC_FindRankDataList(findInfo *rpc.FindRankDataList, findResult *rpc.RankDataList) error { rankObj, ok := rs.mapRankSkip[findInfo.RankId] if ok == false || rankObj == nil { - return fmt.Errorf("RPC_FindRankDataListStartTo[", findInfo.RankId, "] no this rank type") + err := fmt.Errorf("not config rank %d",findInfo.RankId) + log.SError(err.Error()) + return err } findResult.RankDataCount = rankObj.GetRankLen() - return rankObj.GetRankDataFromToLimit(findInfo.StartRank, findInfo.Count, findResult) + err := rankObj.GetRankDataFromToLimit(findInfo.StartRank-1, findInfo.Count, findResult) + if err != nil { + return err + } + + //查询附带的key + if findInfo.Key!= 0 { + findRankData, rank := rankObj.GetRankNodeData(findInfo.Key) + if findRankData != nil { + findResult.KeyRank = &rpc.RankPosData{} + findResult.KeyRank.Data = findRankData.Data + findResult.KeyRank.Key = findRankData.Key + findResult.KeyRank.SortData = findRankData.SortData + findResult.KeyRank.Rank = rank + } + } + + return nil } func (rs *RankService) deleteRankList(delIdList []uint64) { @@ -170,21 +192,27 @@ func (rs *RankService) dealCfg() error { } rankId, okId := mapCfg["RankID"].(float64) - if okId == false { + if okId == false || uint64(rankId)==0 { return fmt.Errorf("RankService SortCfg data must has RankID[number]") } + rankName, okId := mapCfg["RankName"].(string) + if okId == false || len(rankName)==0 { + return fmt.Errorf("RankService SortCfg data must has RankName[string]") + } + level, _ := mapCfg["SkipListLevel"].(float64) isDec, _ := mapCfg["IsDec"].(bool) maxRank, _ := mapCfg["MaxRank"].(float64) expireMs, _ := mapCfg["ExpireMs"].(float64) - newSkip := NewRankSkip(isDec, transformLevel(int32(level)), uint64(maxRank),time.Duration(expireMs)*time.Millisecond) + + newSkip := NewRankSkip(uint64(rankId),rankName,isDec, transformLevel(int32(level)), uint64(maxRank),time.Duration(expireMs)*time.Millisecond) + newSkip.SetupRankModule(rs.rankModule) rs.mapRankSkip[uint64(rankId)] = newSkip } - - return nil + return rs.rankModule.OnFinishSetupRank(rs.mapRankSkip) } diff --git a/sysservice/rankservice/RankSkip.go b/sysservice/rankservice/RankSkip.go index 07bd17c..5a7ed1f 100644 --- a/sysservice/rankservice/RankSkip.go +++ b/sysservice/rankservice/RankSkip.go @@ -9,6 +9,7 @@ import ( type RankSkip struct { rankId uint64 //排行榜ID + rankName string //排行榜名称 isDes bool //是否为降序 true:降序 false:升序 skipList *skip.SkipList //跳表 mapRankData map[uint64]*RankData //排行数据map @@ -27,9 +28,11 @@ const ( ) // NewRankSkip 创建排行榜 -func NewRankSkip(isDes bool, level interface{}, maxLen uint64,expireMs time.Duration) *RankSkip { +func NewRankSkip(rankId uint64,rankName string,isDes bool, level interface{}, maxLen uint64,expireMs time.Duration) *RankSkip { rs := &RankSkip{} + rs.rankId = rankId + rs.rankName = rankName rs.isDes = isDes rs.skipList = skip.New(level) rs.mapRankData = make(map[uint64]*RankData, 10240) @@ -64,16 +67,21 @@ func (rs *RankSkip) GetRankID() uint64 { return rs.rankId } +// GetRankName 获取排行榜名称 +func (rs *RankSkip) GetRankName() string { + return rs.rankName +} + // GetRankLen 获取排行榜长度 func (rs *RankSkip) GetRankLen() uint64 { return rs.skipList.Len() } -func (rs *RankSkip) UpsetRank(upsetRankData []*rpc.RankData) (addCount int32, modifyCount int32) { +func (rs *RankSkip) UpsetRankList(upsetRankData []*rpc.RankData) (addCount int32, modifyCount int32) { addList := make([]*RankData, 0, 1) updateList := make([]*RankData, 0, 1) for _, upsetData := range upsetRankData { - changeData, changeType := rs.upsetRank(upsetData) + changeData, changeType := rs.UpsetRank(upsetData,time.Now().UnixNano(),false) if changeData == nil { continue } @@ -86,6 +94,7 @@ func (rs *RankSkip) UpsetRank(upsetRankData []*rpc.RankData) (addCount int32, mo } } + /* if len(addList) > 0 { rs.rankModule.OnEnterRank(rs, addList) } @@ -93,7 +102,7 @@ func (rs *RankSkip) UpsetRank(upsetRankData []*rpc.RankData) (addCount int32, mo if len(updateList) > 0 { rs.rankModule.OnChangeRankData(rs, updateList) } - +*/ addCount = int32(len(addList)) modifyCount = int32(len(updateList)) @@ -102,12 +111,17 @@ func (rs *RankSkip) UpsetRank(upsetRankData []*rpc.RankData) (addCount int32, mo } // UpsetRank 更新玩家排行数据,返回变化后的数据及变化类型 -func (rs *RankSkip) upsetRank(upsetData *rpc.RankData) (*RankData, RankDataChangeType) { +func (rs *RankSkip) UpsetRank(upsetData *rpc.RankData,refreshTimestamp int64,fromLoad bool) (*RankData, RankDataChangeType) { rankNode, ok := rs.mapRankData[upsetData.Key] if ok == true { //找到的情况对比排名数据是否有变化,无变化进行data更新,有变化则进行删除更新 if compareIsEqual(rankNode.SortData, upsetData.SortData) { rankNode.Data = upsetData.GetData() + rankNode.refreshTimestamp = refreshTimestamp + + if fromLoad == false { + rs.rankModule.OnChangeRankData(rs,rankNode) + } return nil, RankDataNone } @@ -117,21 +131,29 @@ func (rs *RankSkip) upsetRank(upsetData *rpc.RankData) (*RankData, RankDataChang rs.skipList.Delete(rankNode) ReleaseRankData(rankNode) - newRankData := NewRankData(rs.isDes, upsetData) + newRankData := NewRankData(rs.isDes, upsetData,refreshTimestamp) rs.skipList.Insert(newRankData) rs.mapRankData[upsetData.Key] = newRankData //刷新有效期 - rs.rankDataExpire.PushOrRefreshExpireKey(upsetData.Key) + rs.rankDataExpire.PushOrRefreshExpireKey(upsetData.Key,refreshTimestamp) + if fromLoad == false { + rs.rankModule.OnChangeRankData(rs, rankNode) + } return newRankData, RankDataUpdate } if rs.checkInsertAndReplace(upsetData) { - newRankData := NewRankData(rs.isDes, upsetData) + newRankData := NewRankData(rs.isDes, upsetData,refreshTimestamp) rs.skipList.Insert(newRankData) rs.mapRankData[upsetData.Key] = newRankData - rs.rankDataExpire.PushOrRefreshExpireKey(upsetData.Key) + rs.rankDataExpire.PushOrRefreshExpireKey(upsetData.Key,refreshTimestamp) + + if fromLoad == false { + rs.rankModule.OnEnterRank(rs, newRankData) + } + return newRankData, RankDataAdd } @@ -140,7 +162,7 @@ func (rs *RankSkip) upsetRank(upsetData *rpc.RankData) (*RankData, RankDataChang // DeleteRankData 删除排行数据 func (rs *RankSkip) DeleteRankData(delKeys []uint64) int32 { - removeRankData := make([]*RankData, 0, 1) + var removeRankData int32 //预统计处理,进行回调 for _, key := range delKeys { rankData, ok := rs.mapRankData[key] @@ -148,20 +170,16 @@ func (rs *RankSkip) DeleteRankData(delKeys []uint64) int32 { continue } - removeRankData = append(removeRankData, rankData) - } - rs.rankModule.OnLeaveRank(rs, removeRankData) - - //从排行榜中删除 - for _, rankData := range removeRankData { + removeRankData+=1 rs.skipList.Delete(rankData) delete(rs.mapRankData, rankData.Key) rs.rankDataExpire.RemoveExpireKey(rankData.Key) + rs.rankModule.OnLeaveRank(rs, rankData) ReleaseRankData(rankData) } rs.pickExpireKey() - return int32(len(removeRankData)) + return removeRankData } // GetRankNodeData 获取,返回排名节点与名次 @@ -172,12 +190,12 @@ func (rs *RankSkip) GetRankNodeData(findKey uint64) (*RankData, uint64) { } _, index := rs.skipList.GetWithPosition(rankNode) - return rankNode, index + return rankNode, index+1 } // GetRankNodeDataByPos 获取,返回排名节点与名次 func (rs *RankSkip) GetRankNodeDataByRank(rank uint64) (*RankData, uint64) { - rankNode := rs.skipList.ByPosition(rank) + rankNode := rs.skipList.ByPosition(rank-1) if rankNode == nil { return nil, 0 } @@ -203,7 +221,7 @@ func (rs *RankSkip) GetRankKeyPrevToLimit(findKey, count uint64, result *rpc.Ran rankData := iter.Value().(*RankData) result.RankPosDataList = append(result.RankPosDataList, &rpc.RankPosData{ Key: rankData.Key, - Rank: rankPos - iterCount, + Rank: rankPos - iterCount+1, SortData: rankData.SortData, Data: rankData.Data, }) @@ -231,7 +249,7 @@ func (rs *RankSkip) GetRankKeyNextToLimit(findKey, count uint64, result *rpc.Ran rankData := iter.Value().(*RankData) result.RankPosDataList = append(result.RankPosDataList, &rpc.RankPosData{ Key: rankData.Key, - Rank: rankPos + iterCount, + Rank: rankPos + iterCount+1, SortData: rankData.SortData, Data: rankData.Data, }) @@ -244,7 +262,8 @@ func (rs *RankSkip) GetRankKeyNextToLimit(findKey, count uint64, result *rpc.Ran // GetRankList 获取排行榜数据,startPos开始的count个数据 func (rs *RankSkip) GetRankDataFromToLimit(startPos, count uint64, result *rpc.RankDataList) error { if rs.GetRankLen() <= 0 { - return fmt.Errorf("rank[", rs.rankId, "] no data") + //初始排行榜可能没有数据 + return nil } if result.RankDataCount < startPos { @@ -257,7 +276,7 @@ func (rs *RankSkip) GetRankDataFromToLimit(startPos, count uint64, result *rpc.R rankData := iter.Value().(*RankData) result.RankPosDataList = append(result.RankPosDataList, &rpc.RankPosData{ Key: rankData.Key, - Rank: iterCount + startPos, + Rank: iterCount + startPos+1, SortData: rankData.SortData, Data: rankData.Data, }) @@ -292,7 +311,7 @@ func (rs *RankSkip) checkInsertAndReplace(upsetData *rpc.RankData) bool { //移除最后一位 //回调模块,该RandData从排行中删除 rs.rankDataExpire.RemoveExpireKey(lastRankData.Key) - rs.rankModule.OnLeaveRank(rs, []*RankData{lastRankData}) + rs.rankModule.OnLeaveRank(rs, lastRankData) rs.skipList.Delete(lastPosData) delete(rs.mapRankData, lastRankData.Key) ReleaseRankData(lastRankData)