From 68dfbc46f01d1219918e05369e7247467588a0e4 Mon Sep 17 00:00:00 2001 From: orgin Date: Tue, 15 Nov 2022 17:09:17 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=87=AA=E5=AE=9A=E4=B9=89?= =?UTF-8?q?=E6=8C=81=E4=B9=85=E5=8C=96=E7=9A=84=E6=B6=88=E6=81=AF=E9=98=9F?= =?UTF-8?q?=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/client.go | 16 +- rpc/messagequeue.pb.go | 1777 +++++++++++++++++ rpc/messagequeue.proto | 51 + .../messagequeueservice/CustomerSubscriber.go | 229 +++ sysservice/messagequeueservice/MemoryQueue.go | 97 + .../messagequeueservice/MemoryQueue_test.go | 36 + .../MessageQueueService.go | 126 ++ .../messagequeueservice/MongoPersist.go | 358 ++++ .../messagequeueservice/MongoPersist_test.go | 122 ++ sysservice/messagequeueservice/Subscriber.go | 91 + sysservice/messagequeueservice/TopicRoom.go | 146 ++ 11 files changed, 3046 insertions(+), 3 deletions(-) create mode 100644 rpc/messagequeue.pb.go create mode 100644 rpc/messagequeue.proto create mode 100644 sysservice/messagequeueservice/CustomerSubscriber.go create mode 100644 sysservice/messagequeueservice/MemoryQueue.go create mode 100644 sysservice/messagequeueservice/MemoryQueue_test.go create mode 100644 sysservice/messagequeueservice/MessageQueueService.go create mode 100644 sysservice/messagequeueservice/MongoPersist.go create mode 100644 sysservice/messagequeueservice/MongoPersist_test.go create mode 100644 sysservice/messagequeueservice/Subscriber.go create mode 100644 sysservice/messagequeueservice/TopicRoom.go diff --git a/rpc/client.go b/rpc/client.go index a366776..9fc7282 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -82,9 +82,6 @@ func (client *Client) Connect(id int, addr string, maxRpcParamLen uint32) error func (client *Client) startCheckRpcCallTimer() { for { time.Sleep(5 * time.Second) - if client.GetCloseFlag() == true { - break - } client.checkRpcCallTimeout() } } @@ -348,6 +345,19 @@ func (client *Client) GetId() int { func (client *Client) Close(waitDone bool) { client.TCPClient.Close(waitDone) + + client.pendingLock.Lock() + for { + pElem := client.pendingTimer.Front() + if pElem == nil { + break + } + + pCall := pElem.Value.(*Call) + pCall.Err = errors.New("nodeid is disconnect ") + client.makeCallFail(pCall) + } + client.pendingLock.Unlock() } func (client *Client) GetClientSeq() uint32 { diff --git a/rpc/messagequeue.pb.go b/rpc/messagequeue.pb.go new file mode 100644 index 0000000..255683f --- /dev/null +++ b/rpc/messagequeue.pb.go @@ -0,0 +1,1777 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: proto/rpcproto/messagequeue.proto + +package rpc + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type SubscribeType int32 + +const ( + SubscribeType_Subscribe SubscribeType = 0 + SubscribeType_Unsubscribe SubscribeType = 1 +) + +var SubscribeType_name = map[int32]string{ + 0: "Subscribe", + 1: "Unsubscribe", +} + +var SubscribeType_value = map[string]int32{ + "Subscribe": 0, + "Unsubscribe": 1, +} + +func (x SubscribeType) String() string { + return proto.EnumName(SubscribeType_name, int32(x)) +} + +func (SubscribeType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_2bf98a7a010f6009, []int{0} +} + +type SubscribeMethod int32 + +const ( + SubscribeMethod_Method_Custom SubscribeMethod = 0 + SubscribeMethod_Method_Last SubscribeMethod = 1 +) + +var SubscribeMethod_name = map[int32]string{ + 0: "Method_Custom", + 1: "Method_Last", +} + +var SubscribeMethod_value = map[string]int32{ + "Method_Custom": 0, + "Method_Last": 1, +} + +func (x SubscribeMethod) String() string { + return proto.EnumName(SubscribeMethod_name, int32(x)) +} + +func (SubscribeMethod) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_2bf98a7a010f6009, []int{1} +} + +type DBQueuePopReq struct { + CustomerId string `protobuf:"bytes,1,opt,name=CustomerId,proto3" json:"CustomerId,omitempty"` + QueueName string `protobuf:"bytes,2,opt,name=QueueName,proto3" json:"QueueName,omitempty"` + PopStartPos int32 `protobuf:"varint,3,opt,name=PopStartPos,proto3" json:"PopStartPos,omitempty"` + PopNum int32 `protobuf:"varint,4,opt,name=PopNum,proto3" json:"PopNum,omitempty"` + PushData []byte `protobuf:"bytes,5,opt,name=pushData,proto3" json:"pushData,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DBQueuePopReq) Reset() { *m = DBQueuePopReq{} } +func (m *DBQueuePopReq) String() string { return proto.CompactTextString(m) } +func (*DBQueuePopReq) ProtoMessage() {} +func (*DBQueuePopReq) Descriptor() ([]byte, []int) { + return fileDescriptor_2bf98a7a010f6009, []int{0} +} +func (m *DBQueuePopReq) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DBQueuePopReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DBQueuePopReq.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DBQueuePopReq) XXX_Merge(src proto.Message) { + xxx_messageInfo_DBQueuePopReq.Merge(m, src) +} +func (m *DBQueuePopReq) XXX_Size() int { + return m.Size() +} +func (m *DBQueuePopReq) XXX_DiscardUnknown() { + xxx_messageInfo_DBQueuePopReq.DiscardUnknown(m) +} + +var xxx_messageInfo_DBQueuePopReq proto.InternalMessageInfo + +func (m *DBQueuePopReq) GetCustomerId() string { + if m != nil { + return m.CustomerId + } + return "" +} + +func (m *DBQueuePopReq) GetQueueName() string { + if m != nil { + return m.QueueName + } + return "" +} + +func (m *DBQueuePopReq) GetPopStartPos() int32 { + if m != nil { + return m.PopStartPos + } + return 0 +} + +func (m *DBQueuePopReq) GetPopNum() int32 { + if m != nil { + return m.PopNum + } + return 0 +} + +func (m *DBQueuePopReq) GetPushData() []byte { + if m != nil { + return m.PushData + } + return nil +} + +type DBQueuePopRes struct { + QueueName string `protobuf:"bytes,1,opt,name=QueueName,proto3" json:"QueueName,omitempty"` + PushData [][]byte `protobuf:"bytes,2,rep,name=pushData,proto3" json:"pushData,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DBQueuePopRes) Reset() { *m = DBQueuePopRes{} } +func (m *DBQueuePopRes) String() string { return proto.CompactTextString(m) } +func (*DBQueuePopRes) ProtoMessage() {} +func (*DBQueuePopRes) Descriptor() ([]byte, []int) { + return fileDescriptor_2bf98a7a010f6009, []int{1} +} +func (m *DBQueuePopRes) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DBQueuePopRes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DBQueuePopRes.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DBQueuePopRes) XXX_Merge(src proto.Message) { + xxx_messageInfo_DBQueuePopRes.Merge(m, src) +} +func (m *DBQueuePopRes) XXX_Size() int { + return m.Size() +} +func (m *DBQueuePopRes) XXX_DiscardUnknown() { + xxx_messageInfo_DBQueuePopRes.DiscardUnknown(m) +} + +var xxx_messageInfo_DBQueuePopRes proto.InternalMessageInfo + +func (m *DBQueuePopRes) GetQueueName() string { + if m != nil { + return m.QueueName + } + return "" +} + +func (m *DBQueuePopRes) GetPushData() [][]byte { + if m != nil { + return m.PushData + } + return nil +} + +//订阅 +type DBQueueSubscribeReq struct { + SubType SubscribeType `protobuf:"varint,1,opt,name=SubType,proto3,enum=SubscribeType" json:"SubType,omitempty"` + Method SubscribeMethod `protobuf:"varint,2,opt,name=Method,proto3,enum=SubscribeMethod" json:"Method,omitempty"` + CustomerId string `protobuf:"bytes,3,opt,name=CustomerId,proto3" json:"CustomerId,omitempty"` + FromNodeId int32 `protobuf:"varint,4,opt,name=FromNodeId,proto3" json:"FromNodeId,omitempty"` + RpcMethod string `protobuf:"bytes,5,opt,name=RpcMethod,proto3" json:"RpcMethod,omitempty"` + TopicName string `protobuf:"bytes,6,opt,name=TopicName,proto3" json:"TopicName,omitempty"` + StartIndex uint64 `protobuf:"varint,7,opt,name=StartIndex,proto3" json:"StartIndex,omitempty"` + OneBatchQuantity int32 `protobuf:"varint,8,opt,name=OneBatchQuantity,proto3" json:"OneBatchQuantity,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DBQueueSubscribeReq) Reset() { *m = DBQueueSubscribeReq{} } +func (m *DBQueueSubscribeReq) String() string { return proto.CompactTextString(m) } +func (*DBQueueSubscribeReq) ProtoMessage() {} +func (*DBQueueSubscribeReq) Descriptor() ([]byte, []int) { + return fileDescriptor_2bf98a7a010f6009, []int{2} +} +func (m *DBQueueSubscribeReq) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DBQueueSubscribeReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DBQueueSubscribeReq.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DBQueueSubscribeReq) XXX_Merge(src proto.Message) { + xxx_messageInfo_DBQueueSubscribeReq.Merge(m, src) +} +func (m *DBQueueSubscribeReq) XXX_Size() int { + return m.Size() +} +func (m *DBQueueSubscribeReq) XXX_DiscardUnknown() { + xxx_messageInfo_DBQueueSubscribeReq.DiscardUnknown(m) +} + +var xxx_messageInfo_DBQueueSubscribeReq proto.InternalMessageInfo + +func (m *DBQueueSubscribeReq) GetSubType() SubscribeType { + if m != nil { + return m.SubType + } + return SubscribeType_Subscribe +} + +func (m *DBQueueSubscribeReq) GetMethod() SubscribeMethod { + if m != nil { + return m.Method + } + return SubscribeMethod_Method_Custom +} + +func (m *DBQueueSubscribeReq) GetCustomerId() string { + if m != nil { + return m.CustomerId + } + return "" +} + +func (m *DBQueueSubscribeReq) GetFromNodeId() int32 { + if m != nil { + return m.FromNodeId + } + return 0 +} + +func (m *DBQueueSubscribeReq) GetRpcMethod() string { + if m != nil { + return m.RpcMethod + } + return "" +} + +func (m *DBQueueSubscribeReq) GetTopicName() string { + if m != nil { + return m.TopicName + } + return "" +} + +func (m *DBQueueSubscribeReq) GetStartIndex() uint64 { + if m != nil { + return m.StartIndex + } + return 0 +} + +func (m *DBQueueSubscribeReq) GetOneBatchQuantity() int32 { + if m != nil { + return m.OneBatchQuantity + } + return 0 +} + +type DBQueueSubscribeRes struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DBQueueSubscribeRes) Reset() { *m = DBQueueSubscribeRes{} } +func (m *DBQueueSubscribeRes) String() string { return proto.CompactTextString(m) } +func (*DBQueueSubscribeRes) ProtoMessage() {} +func (*DBQueueSubscribeRes) Descriptor() ([]byte, []int) { + return fileDescriptor_2bf98a7a010f6009, []int{3} +} +func (m *DBQueueSubscribeRes) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DBQueueSubscribeRes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DBQueueSubscribeRes.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DBQueueSubscribeRes) XXX_Merge(src proto.Message) { + xxx_messageInfo_DBQueueSubscribeRes.Merge(m, src) +} +func (m *DBQueueSubscribeRes) XXX_Size() int { + return m.Size() +} +func (m *DBQueueSubscribeRes) XXX_DiscardUnknown() { + xxx_messageInfo_DBQueueSubscribeRes.DiscardUnknown(m) +} + +var xxx_messageInfo_DBQueueSubscribeRes proto.InternalMessageInfo + +type DBQueuePublishReq struct { + TopicName string `protobuf:"bytes,1,opt,name=TopicName,proto3" json:"TopicName,omitempty"` + PushData [][]byte `protobuf:"bytes,2,rep,name=pushData,proto3" json:"pushData,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DBQueuePublishReq) Reset() { *m = DBQueuePublishReq{} } +func (m *DBQueuePublishReq) String() string { return proto.CompactTextString(m) } +func (*DBQueuePublishReq) ProtoMessage() {} +func (*DBQueuePublishReq) Descriptor() ([]byte, []int) { + return fileDescriptor_2bf98a7a010f6009, []int{4} +} +func (m *DBQueuePublishReq) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DBQueuePublishReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DBQueuePublishReq.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DBQueuePublishReq) XXX_Merge(src proto.Message) { + xxx_messageInfo_DBQueuePublishReq.Merge(m, src) +} +func (m *DBQueuePublishReq) XXX_Size() int { + return m.Size() +} +func (m *DBQueuePublishReq) XXX_DiscardUnknown() { + xxx_messageInfo_DBQueuePublishReq.DiscardUnknown(m) +} + +var xxx_messageInfo_DBQueuePublishReq proto.InternalMessageInfo + +func (m *DBQueuePublishReq) GetTopicName() string { + if m != nil { + return m.TopicName + } + return "" +} + +func (m *DBQueuePublishReq) GetPushData() [][]byte { + if m != nil { + return m.PushData + } + return nil +} + +type DBQueuePublishRes struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DBQueuePublishRes) Reset() { *m = DBQueuePublishRes{} } +func (m *DBQueuePublishRes) String() string { return proto.CompactTextString(m) } +func (*DBQueuePublishRes) ProtoMessage() {} +func (*DBQueuePublishRes) Descriptor() ([]byte, []int) { + return fileDescriptor_2bf98a7a010f6009, []int{5} +} +func (m *DBQueuePublishRes) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DBQueuePublishRes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DBQueuePublishRes.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DBQueuePublishRes) XXX_Merge(src proto.Message) { + xxx_messageInfo_DBQueuePublishRes.Merge(m, src) +} +func (m *DBQueuePublishRes) XXX_Size() int { + return m.Size() +} +func (m *DBQueuePublishRes) XXX_DiscardUnknown() { + xxx_messageInfo_DBQueuePublishRes.DiscardUnknown(m) +} + +var xxx_messageInfo_DBQueuePublishRes proto.InternalMessageInfo + +func init() { + proto.RegisterEnum("SubscribeType", SubscribeType_name, SubscribeType_value) + proto.RegisterEnum("SubscribeMethod", SubscribeMethod_name, SubscribeMethod_value) + proto.RegisterType((*DBQueuePopReq)(nil), "DBQueuePopReq") + proto.RegisterType((*DBQueuePopRes)(nil), "DBQueuePopRes") + proto.RegisterType((*DBQueueSubscribeReq)(nil), "DBQueueSubscribeReq") + proto.RegisterType((*DBQueueSubscribeRes)(nil), "DBQueueSubscribeRes") + proto.RegisterType((*DBQueuePublishReq)(nil), "DBQueuePublishReq") + proto.RegisterType((*DBQueuePublishRes)(nil), "DBQueuePublishRes") +} + +func init() { proto.RegisterFile("proto/rpcproto/messagequeue.proto", fileDescriptor_2bf98a7a010f6009) } + +var fileDescriptor_2bf98a7a010f6009 = []byte{ + // 432 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x93, 0xc1, 0x6e, 0xd3, 0x30, + 0x1c, 0xc6, 0x71, 0xbb, 0x76, 0xeb, 0x7f, 0xb4, 0xeb, 0x5c, 0x81, 0x22, 0x84, 0xa2, 0xd0, 0x53, + 0xd4, 0x43, 0x27, 0x0d, 0x71, 0xe2, 0x56, 0x26, 0xa4, 0x48, 0xac, 0x64, 0xee, 0xb8, 0x70, 0x41, + 0x4e, 0x62, 0x91, 0x48, 0x24, 0xf6, 0x62, 0x5b, 0x62, 0xcf, 0xc3, 0x13, 0xf0, 0x16, 0x1c, 0x79, + 0x04, 0xd4, 0x27, 0x41, 0xb1, 0xb3, 0x2c, 0x49, 0x11, 0x37, 0xfb, 0xf7, 0xb9, 0x5f, 0xbf, 0xff, + 0xf7, 0x57, 0xe0, 0x95, 0x28, 0xb9, 0xe2, 0x17, 0xa5, 0x88, 0xed, 0x21, 0x67, 0x52, 0xd2, 0xaf, + 0xec, 0x4e, 0x33, 0xcd, 0xd6, 0x06, 0x2d, 0x7f, 0x20, 0x98, 0x5e, 0x6d, 0x6e, 0x2a, 0x12, 0x72, + 0x41, 0xd8, 0x1d, 0x76, 0x01, 0xde, 0x69, 0xa9, 0x78, 0xce, 0xca, 0x20, 0x71, 0x90, 0x87, 0xfc, + 0x09, 0x69, 0x11, 0xfc, 0x12, 0x26, 0xe6, 0xf9, 0x96, 0xe6, 0xcc, 0x19, 0x18, 0xf9, 0x11, 0x60, + 0x0f, 0x4e, 0x43, 0x2e, 0x76, 0x8a, 0x96, 0x2a, 0xe4, 0xd2, 0x19, 0x7a, 0xc8, 0x1f, 0x91, 0x36, + 0xc2, 0xcf, 0x61, 0x1c, 0x72, 0xb1, 0xd5, 0xb9, 0x73, 0x64, 0xc4, 0xfa, 0x86, 0x5f, 0xc0, 0x89, + 0xd0, 0x32, 0xbd, 0xa2, 0x8a, 0x3a, 0x23, 0x0f, 0xf9, 0x4f, 0x49, 0x73, 0x5f, 0x06, 0xdd, 0x90, + 0xb2, 0x1b, 0x02, 0xf5, 0x43, 0xb4, 0xad, 0x06, 0xde, 0xb0, 0x63, 0xf5, 0x73, 0x00, 0x8b, 0xda, + 0x6b, 0xa7, 0x23, 0x19, 0x97, 0x59, 0xc4, 0xaa, 0xb1, 0x7d, 0x38, 0xde, 0xe9, 0xe8, 0xf6, 0x5e, + 0x58, 0xbf, 0xd9, 0xe5, 0x6c, 0xdd, 0xe8, 0x15, 0x25, 0x0f, 0x32, 0xf6, 0x61, 0x7c, 0xcd, 0x54, + 0xca, 0x13, 0x33, 0xfd, 0xec, 0x72, 0xfe, 0xf8, 0xd0, 0x72, 0x52, 0xeb, 0xbd, 0x2a, 0x87, 0x07, + 0x55, 0xba, 0x00, 0xef, 0x4b, 0x9e, 0x6f, 0x79, 0xc2, 0x82, 0xa4, 0xae, 0xa3, 0x45, 0xaa, 0x29, + 0x89, 0x88, 0xeb, 0x3f, 0x1b, 0xd9, 0x29, 0x1b, 0x50, 0xa9, 0xb7, 0x5c, 0x64, 0xb1, 0xe9, 0x60, + 0x6c, 0xd5, 0x06, 0x54, 0xde, 0xa6, 0xf2, 0xa0, 0x48, 0xd8, 0x77, 0xe7, 0xd8, 0x43, 0xfe, 0x11, + 0x69, 0x11, 0xbc, 0x82, 0xf9, 0xc7, 0x82, 0x6d, 0xa8, 0x8a, 0xd3, 0x1b, 0x4d, 0x0b, 0x95, 0xa9, + 0x7b, 0xe7, 0xc4, 0x24, 0x38, 0xe0, 0xcb, 0x67, 0xff, 0xaa, 0x4c, 0x2e, 0xaf, 0xe1, 0xfc, 0x61, + 0x2b, 0x3a, 0xfa, 0x96, 0xc9, 0xb4, 0xea, 0xb1, 0x93, 0x0a, 0xf5, 0x53, 0xfd, 0x6f, 0x33, 0x8b, + 0x43, 0x3b, 0xb9, 0xba, 0x80, 0x69, 0x67, 0x0d, 0x78, 0x0a, 0x93, 0x06, 0xcc, 0x9f, 0xe0, 0x33, + 0x38, 0xfd, 0x54, 0xc8, 0x06, 0xa0, 0xd5, 0x1b, 0x38, 0xeb, 0xad, 0x03, 0x9f, 0xc3, 0xd4, 0x9e, + 0xbe, 0xd8, 0xee, 0xed, 0xcf, 0x6a, 0xf4, 0x81, 0x4a, 0x35, 0x47, 0x9b, 0xc5, 0xaf, 0xbd, 0x8b, + 0x7e, 0xef, 0x5d, 0xf4, 0x67, 0xef, 0xa2, 0xcf, 0xa3, 0xf5, 0xdb, 0x52, 0xc4, 0xd1, 0xd8, 0x7c, + 0x23, 0xaf, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0x86, 0xbb, 0x7c, 0xa8, 0x48, 0x03, 0x00, 0x00, +} + +func (m *DBQueuePopReq) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DBQueuePopReq) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DBQueuePopReq) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.PushData) > 0 { + i -= len(m.PushData) + copy(dAtA[i:], m.PushData) + i = encodeVarintMessagequeue(dAtA, i, uint64(len(m.PushData))) + i-- + dAtA[i] = 0x2a + } + if m.PopNum != 0 { + i = encodeVarintMessagequeue(dAtA, i, uint64(m.PopNum)) + i-- + dAtA[i] = 0x20 + } + if m.PopStartPos != 0 { + i = encodeVarintMessagequeue(dAtA, i, uint64(m.PopStartPos)) + i-- + dAtA[i] = 0x18 + } + if len(m.QueueName) > 0 { + i -= len(m.QueueName) + copy(dAtA[i:], m.QueueName) + i = encodeVarintMessagequeue(dAtA, i, uint64(len(m.QueueName))) + i-- + dAtA[i] = 0x12 + } + if len(m.CustomerId) > 0 { + i -= len(m.CustomerId) + copy(dAtA[i:], m.CustomerId) + i = encodeVarintMessagequeue(dAtA, i, uint64(len(m.CustomerId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *DBQueuePopRes) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DBQueuePopRes) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DBQueuePopRes) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.PushData) > 0 { + for iNdEx := len(m.PushData) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.PushData[iNdEx]) + copy(dAtA[i:], m.PushData[iNdEx]) + i = encodeVarintMessagequeue(dAtA, i, uint64(len(m.PushData[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if len(m.QueueName) > 0 { + i -= len(m.QueueName) + copy(dAtA[i:], m.QueueName) + i = encodeVarintMessagequeue(dAtA, i, uint64(len(m.QueueName))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *DBQueueSubscribeReq) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DBQueueSubscribeReq) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DBQueueSubscribeReq) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.OneBatchQuantity != 0 { + i = encodeVarintMessagequeue(dAtA, i, uint64(m.OneBatchQuantity)) + i-- + dAtA[i] = 0x40 + } + if m.StartIndex != 0 { + i = encodeVarintMessagequeue(dAtA, i, uint64(m.StartIndex)) + i-- + dAtA[i] = 0x38 + } + if len(m.TopicName) > 0 { + i -= len(m.TopicName) + copy(dAtA[i:], m.TopicName) + i = encodeVarintMessagequeue(dAtA, i, uint64(len(m.TopicName))) + i-- + dAtA[i] = 0x32 + } + if len(m.RpcMethod) > 0 { + i -= len(m.RpcMethod) + copy(dAtA[i:], m.RpcMethod) + i = encodeVarintMessagequeue(dAtA, i, uint64(len(m.RpcMethod))) + i-- + dAtA[i] = 0x2a + } + if m.FromNodeId != 0 { + i = encodeVarintMessagequeue(dAtA, i, uint64(m.FromNodeId)) + i-- + dAtA[i] = 0x20 + } + if len(m.CustomerId) > 0 { + i -= len(m.CustomerId) + copy(dAtA[i:], m.CustomerId) + i = encodeVarintMessagequeue(dAtA, i, uint64(len(m.CustomerId))) + i-- + dAtA[i] = 0x1a + } + if m.Method != 0 { + i = encodeVarintMessagequeue(dAtA, i, uint64(m.Method)) + i-- + dAtA[i] = 0x10 + } + if m.SubType != 0 { + i = encodeVarintMessagequeue(dAtA, i, uint64(m.SubType)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *DBQueueSubscribeRes) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DBQueueSubscribeRes) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DBQueueSubscribeRes) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + return len(dAtA) - i, nil +} + +func (m *DBQueuePublishReq) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DBQueuePublishReq) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DBQueuePublishReq) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.PushData) > 0 { + for iNdEx := len(m.PushData) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.PushData[iNdEx]) + copy(dAtA[i:], m.PushData[iNdEx]) + i = encodeVarintMessagequeue(dAtA, i, uint64(len(m.PushData[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if len(m.TopicName) > 0 { + i -= len(m.TopicName) + copy(dAtA[i:], m.TopicName) + i = encodeVarintMessagequeue(dAtA, i, uint64(len(m.TopicName))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *DBQueuePublishRes) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DBQueuePublishRes) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DBQueuePublishRes) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + return len(dAtA) - i, nil +} + +func encodeVarintMessagequeue(dAtA []byte, offset int, v uint64) int { + offset -= sovMessagequeue(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *DBQueuePopReq) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.CustomerId) + if l > 0 { + n += 1 + l + sovMessagequeue(uint64(l)) + } + l = len(m.QueueName) + if l > 0 { + n += 1 + l + sovMessagequeue(uint64(l)) + } + if m.PopStartPos != 0 { + n += 1 + sovMessagequeue(uint64(m.PopStartPos)) + } + if m.PopNum != 0 { + n += 1 + sovMessagequeue(uint64(m.PopNum)) + } + l = len(m.PushData) + if l > 0 { + n += 1 + l + sovMessagequeue(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *DBQueuePopRes) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.QueueName) + if l > 0 { + n += 1 + l + sovMessagequeue(uint64(l)) + } + if len(m.PushData) > 0 { + for _, b := range m.PushData { + l = len(b) + n += 1 + l + sovMessagequeue(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *DBQueueSubscribeReq) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.SubType != 0 { + n += 1 + sovMessagequeue(uint64(m.SubType)) + } + if m.Method != 0 { + n += 1 + sovMessagequeue(uint64(m.Method)) + } + l = len(m.CustomerId) + if l > 0 { + n += 1 + l + sovMessagequeue(uint64(l)) + } + if m.FromNodeId != 0 { + n += 1 + sovMessagequeue(uint64(m.FromNodeId)) + } + l = len(m.RpcMethod) + if l > 0 { + n += 1 + l + sovMessagequeue(uint64(l)) + } + l = len(m.TopicName) + if l > 0 { + n += 1 + l + sovMessagequeue(uint64(l)) + } + if m.StartIndex != 0 { + n += 1 + sovMessagequeue(uint64(m.StartIndex)) + } + if m.OneBatchQuantity != 0 { + n += 1 + sovMessagequeue(uint64(m.OneBatchQuantity)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *DBQueueSubscribeRes) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *DBQueuePublishReq) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.TopicName) + if l > 0 { + n += 1 + l + sovMessagequeue(uint64(l)) + } + if len(m.PushData) > 0 { + for _, b := range m.PushData { + l = len(b) + n += 1 + l + sovMessagequeue(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *DBQueuePublishRes) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovMessagequeue(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozMessagequeue(x uint64) (n int) { + return sovMessagequeue(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *DBQueuePopReq) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DBQueuePopReq: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DBQueuePopReq: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CustomerId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessagequeue + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMessagequeue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.CustomerId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field QueueName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessagequeue + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMessagequeue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.QueueName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PopStartPos", wireType) + } + m.PopStartPos = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.PopStartPos |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PopNum", wireType) + } + m.PopNum = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.PopNum |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PushData", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthMessagequeue + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthMessagequeue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PushData = append(m.PushData[:0], dAtA[iNdEx:postIndex]...) + if m.PushData == nil { + m.PushData = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMessagequeue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DBQueuePopRes) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DBQueuePopRes: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DBQueuePopRes: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field QueueName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessagequeue + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMessagequeue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.QueueName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PushData", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthMessagequeue + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthMessagequeue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PushData = append(m.PushData, make([]byte, postIndex-iNdEx)) + copy(m.PushData[len(m.PushData)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMessagequeue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DBQueueSubscribeReq) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DBQueueSubscribeReq: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DBQueueSubscribeReq: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SubType", wireType) + } + m.SubType = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.SubType |= SubscribeType(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Method", wireType) + } + m.Method = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Method |= SubscribeMethod(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CustomerId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessagequeue + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMessagequeue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.CustomerId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field FromNodeId", wireType) + } + m.FromNodeId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.FromNodeId |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RpcMethod", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessagequeue + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMessagequeue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.RpcMethod = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TopicName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessagequeue + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMessagequeue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TopicName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartIndex", wireType) + } + m.StartIndex = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartIndex |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field OneBatchQuantity", wireType) + } + m.OneBatchQuantity = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.OneBatchQuantity |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipMessagequeue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DBQueueSubscribeRes) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DBQueueSubscribeRes: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DBQueueSubscribeRes: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMessagequeue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DBQueuePublishReq) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DBQueuePublishReq: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DBQueuePublishReq: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TopicName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessagequeue + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMessagequeue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TopicName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PushData", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthMessagequeue + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthMessagequeue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PushData = append(m.PushData, make([]byte, postIndex-iNdEx)) + copy(m.PushData[len(m.PushData)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMessagequeue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DBQueuePublishRes) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DBQueuePublishRes: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DBQueuePublishRes: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMessagequeue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthMessagequeue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipMessagequeue(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMessagequeue + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthMessagequeue + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupMessagequeue + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthMessagequeue + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthMessagequeue = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowMessagequeue = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupMessagequeue = fmt.Errorf("proto: unexpected end of group") +) diff --git a/rpc/messagequeue.proto b/rpc/messagequeue.proto new file mode 100644 index 0000000..0f0628b --- /dev/null +++ b/rpc/messagequeue.proto @@ -0,0 +1,51 @@ +syntax = "proto3"; + +option go_package = ".;rpc"; + + +message DBQueuePopReq { + string CustomerId = 1; + string QueueName = 2; + int32 PopStartPos = 3; + int32 PopNum = 4; + bytes pushData = 5; +} + +message DBQueuePopRes { + string QueueName = 1; + repeated bytes pushData = 2; +} + +enum SubscribeType { + Subscribe = 0; + Unsubscribe = 1; +} + +enum SubscribeMethod { + Method_Custom = 0;//自定义模式,以消费者设置的StartIndex开始获取或订阅 + Method_Last = 1;//Last模式,以该消费者上次记录的位置开始订阅 +} + +//订阅 +message DBQueueSubscribeReq { + SubscribeType SubType = 1; //订阅类型 + SubscribeMethod Method = 2; //订阅方法 + string CustomerId = 3; //消费者Id + int32 FromNodeId = 4; + string RpcMethod = 5; + string TopicName = 6; //主题名称 + uint64 StartIndex = 7; //开始位置 ,格式前4位是时间戳秒,后面是序号。如果填0时,服务自动修改成:(4bit 当前时间秒)| (0000 4bit) + int32 OneBatchQuantity = 8;//订阅一次发送的数量,不设置有默认值1000条 +} + +message DBQueueSubscribeRes { + +} + +message DBQueuePublishReq { + string TopicName = 1; //主是,名称,数据 + repeated bytes pushData = 2; +} + +message DBQueuePublishRes { +} diff --git a/sysservice/messagequeueservice/CustomerSubscriber.go b/sysservice/messagequeueservice/CustomerSubscriber.go new file mode 100644 index 0000000..31f18ba --- /dev/null +++ b/sysservice/messagequeueservice/CustomerSubscriber.go @@ -0,0 +1,229 @@ +package messagequeueservice + +import ( + "errors" + "fmt" + "github.com/duanhf2012/origin/cluster" + "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/rpc" + "github.com/duanhf2012/origin/util/coroutine" + "strings" + "sync/atomic" + "time" +) + +type CustomerSubscriber struct { + rpc.IRpcHandler + topic string + subscriber *Subscriber + fromNodeId int + callBackRpcMethod string + serviceName string + StartIndex uint64 + oneBatchQuantity int32 + subscribeMethod SubscribeMethod + customerId string + + isStop int32 //退出标记 +} + +const DefaultOneBatchQuantity = 1000 + +type SubscribeMethod = int32 + +const ( + MethodCustom SubscribeMethod = 0 //自定义模式,以消费者设置的StartIndex开始获取或订阅 + MethodLast SubscribeMethod = 1 //Last模式,以该消费者上次记录的位置开始订阅 +) + +func (cs *CustomerSubscriber) trySetSubscriberBaseInfo(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error { + cs.subscriber = ss + cs.fromNodeId = fromNodeId + cs.callBackRpcMethod = callBackRpcMethod + //cs.StartIndex = startIndex + cs.subscribeMethod = subscribeMethod + cs.customerId = customerId + cs.StartIndex = startIndex + cs.topic = topic + cs.IRpcHandler = rpcHandler + if oneBatchQuantity == 0 { + cs.oneBatchQuantity = DefaultOneBatchQuantity + } else { + cs.oneBatchQuantity = oneBatchQuantity + } + + strRpcMethod := strings.Split(callBackRpcMethod, ".") + if len(strRpcMethod) != 2 { + err := errors.New("RpcMethod " + callBackRpcMethod + " is error") + log.SError(err.Error()) + return err + } + cs.serviceName = strRpcMethod[0] + + if cluster.HasService(fromNodeId, cs.serviceName) == false { + err := fmt.Errorf("nodeId %d cannot found %s", fromNodeId, cs.serviceName) + log.SError(err.Error()) + return err + } + + if cluster.GetCluster().IsNodeConnected(fromNodeId) == false { + err := fmt.Errorf("nodeId %d is disconnect", fromNodeId) + log.SError(err.Error()) + return err + } + + if startIndex == 0 { + now := time.Now() + zeroTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) + //fmt.Println(zeroTime.Unix()) + cs.StartIndex = uint64(zeroTime.Unix() << 32) + } + + return nil +} + +// 开始订阅 +func (cs *CustomerSubscriber) Subscribe(rpcHandler rpc.IRpcHandler, ss *Subscriber, topic string, subscribeMethod SubscribeMethod, customerId string, fromNodeId int, callBackRpcMethod string, startIndex uint64, oneBatchQuantity int32) error { + err := cs.trySetSubscriberBaseInfo(rpcHandler, ss, topic, subscribeMethod, customerId, fromNodeId, callBackRpcMethod, startIndex, oneBatchQuantity) + if err != nil { + return err + } + + cs.subscriber.queueWait.Add(1) + coroutine.GoRecover(cs.SubscribeRun, -1) + return nil +} + +// 取消订阅 +func (cs *CustomerSubscriber) UnSubscribe() { + atomic.StoreInt32(&cs.isStop, 1) +} + +func (cs *CustomerSubscriber) LoadLastIndex() { + for { + if atomic.LoadInt32(&cs.isStop) != 0 { + log.SRelease("topic ", cs.topic, " out of subscription") + break + } + + log.SRelease("customer ", cs.customerId, " start load last index ") + lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId) + if ret == true { + if lastIndex > 0 { + cs.StartIndex = lastIndex + } else { + //否则直接使用客户端发回来的 + } + log.SRelease("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex) + break + } + + log.SRelease("customer ", cs.customerId, " load last index is fail...") + time.Sleep(5 * time.Second) + } +} + +func (cs *CustomerSubscriber) SubscribeRun() { + defer cs.subscriber.queueWait.Done() + log.SRelease("topic ", cs.topic, " start subscription") + + //加载之前的位置 + if cs.subscribeMethod == MethodLast { + cs.LoadLastIndex() + } + + for { + if atomic.LoadInt32(&cs.isStop) != 0 { + log.SRelease("topic ", cs.topic, " out of subscription") + break + } + + if cs.checkCustomerIsValid() == false { + break + } + + //todo 检测退出 + if cs.subscribe() == false { + log.SRelease("topic ", cs.topic, " out of subscription") + break + } + } + + //删除订阅关系 + cs.subscriber.removeCustomer(cs.customerId, cs) + log.SRelease("topic ", cs.topic, " unsubscription") +} + +func (cs *CustomerSubscriber) subscribe() bool { + //先从内存中查找 + topicData, ret := cs.subscriber.queue.FindData(cs.StartIndex, cs.oneBatchQuantity) + if ret == true { + cs.publishToCustomer(topicData) + return true + } + + //从持久化数据中来找 + topicData = cs.subscriber.dataPersist.FindTopicData(cs.topic, cs.StartIndex, int64(cs.oneBatchQuantity)) + return cs.publishToCustomer(topicData) +} + +func (cs *CustomerSubscriber) checkCustomerIsValid() bool { + //1.检查nodeid是否在线,不在线,直接取消订阅 + if cluster.GetCluster().IsNodeConnected(cs.fromNodeId) == false { + return false + } + + //2.验证是否有该服务,如果没有则退出 + if cluster.HasService(cs.fromNodeId, cs.serviceName) == false { + return false + } + + return true +} + +func (cs *CustomerSubscriber) publishToCustomer(topicData []TopicData) bool { + if cs.checkCustomerIsValid() == false { + return false + } + + if len(topicData) == 0 { + //没有任何数据待一秒吧 + time.Sleep(time.Millisecond * 100) + return true + } + + //3.发送失败重试发送 + var dbQueuePublishReq rpc.DBQueuePublishReq + var dbQueuePushRes rpc.DBQueuePublishRes + dbQueuePublishReq.TopicName = cs.topic + cs.subscriber.dataPersist.OnPushTopicDataToCustomer(cs.topic, topicData) + for i := 0; i < len(topicData); i++ { + dbQueuePublishReq.PushData = append(dbQueuePublishReq.PushData, topicData[i].RawData) + } + + for { + if atomic.LoadInt32(&cs.isStop) != 0 { + break + } + + if cs.checkCustomerIsValid() == false { + return false + } + + //推送数据 + err := cs.CallNode(cs.fromNodeId, cs.callBackRpcMethod, &dbQueuePublishReq, &dbQueuePushRes) + if err != nil { + time.Sleep(time.Second * 1) + continue + } + + //持久化进度 + endIndex := cs.subscriber.dataPersist.GetIndex(&topicData[len(topicData)-1]) + cs.StartIndex = endIndex + cs.subscriber.dataPersist.PersistIndex(cs.topic, cs.customerId, endIndex) + + return true + } + + return true +} diff --git a/sysservice/messagequeueservice/MemoryQueue.go b/sysservice/messagequeueservice/MemoryQueue.go new file mode 100644 index 0000000..676f8c9 --- /dev/null +++ b/sysservice/messagequeueservice/MemoryQueue.go @@ -0,0 +1,97 @@ +package messagequeueservice + +import ( + "github.com/duanhf2012/origin/util/algorithms" + "sync" +) + +type MemoryQueue struct { + subscriber *Subscriber + + topicQueue []TopicData + head int32 + tail int32 + + locker sync.RWMutex +} + +func (mq *MemoryQueue) Init(cap int32) { + mq.topicQueue = make([]TopicData, cap+1) +} + +// 从队尾Push数据 +func (mq *MemoryQueue) Push(topicData *TopicData) bool { + mq.locker.Lock() + defer mq.locker.Unlock() + + nextPos := (mq.tail + 1) % int32(len(mq.topicQueue)) + //如果队列满了 + if nextPos == mq.head { + //将对首的数据删除掉 + mq.head++ + mq.head = mq.head % int32(len(mq.topicQueue)) + } + + mq.tail = nextPos + mq.topicQueue[mq.tail] = *topicData + return true +} + +func (mq *MemoryQueue) findData(startPos int32, startIndex uint64, limit int32) ([]TopicData, bool) { + //空队列,无数据 + if mq.head == mq.tail { + return nil, true + } + + var findStartPos int32 + var findEndPos int32 + findStartPos = startPos //(mq.head + 1) % cap(mq.topicQueue) + if findStartPos <= mq.tail { + findEndPos = mq.tail + 1 + } else { + findEndPos = int32(cap(mq.topicQueue)) + } + + //二分查找位置 + pos := int32(algorithms.BiSearch(mq.topicQueue[findStartPos:findEndPos], startIndex, 1)) + if pos == -1 { + return nil, true + } + + pos += findStartPos + //取得结束位置 + endPos := limit + pos + if endPos > findEndPos { + endPos = findEndPos + } + + return mq.topicQueue[pos:endPos], true +} + +// FindData 返回参数[]TopicData 表示查找到的数据,nil表示无数据。bool表示是否不应该在内存中来查 +func (mq *MemoryQueue) FindData(startIndex uint64, limit int32) ([]TopicData, bool) { + mq.locker.RLock() + defer mq.locker.RUnlock() + + //队列为空时,应该从数据库查找 + if mq.head == mq.tail { + return nil, false + } + + /* + //先判断startIndex是否比第一个元素要大 + headTopic := (mq.head + 1) % int32(len(mq.topicQueue)) + //此时需要从持久化数据中取 + if startIndex+1 > mq.topicQueue[headTopic].Seq { + return nil, false + } + */ + + retData, ret := mq.findData(mq.head+1, startIndex, limit) + if mq.head <= mq.tail || ret == true { + return retData, true + } + + //如果是正常head在后,尾在前,从数组0下标开始找到tail + return mq.findData(0, startIndex, limit) +} diff --git a/sysservice/messagequeueservice/MemoryQueue_test.go b/sysservice/messagequeueservice/MemoryQueue_test.go new file mode 100644 index 0000000..16ad97b --- /dev/null +++ b/sysservice/messagequeueservice/MemoryQueue_test.go @@ -0,0 +1,36 @@ +package messagequeueservice + +import ( + "fmt" + "testing" +) + +type In int + +func (i In) GetValue() int { + return int(i) +} + +func Test_BiSearch(t *testing.T) { + var memQueue MemoryQueue + memQueue.Init(5) + + for i := 1; i <= 8; i++ { + memQueue.Push(&TopicData{Seq: uint64(i)}) + } + + startindex := uint64(0) + for { + retData, ret := memQueue.FindData(startindex+1, 10) + fmt.Println(retData, ret) + for _, d := range retData { + if d.Seq > startindex { + startindex = d.Seq + } + } + if ret == false { + break + } + } + +} diff --git a/sysservice/messagequeueservice/MessageQueueService.go b/sysservice/messagequeueservice/MessageQueueService.go new file mode 100644 index 0000000..7f93822 --- /dev/null +++ b/sysservice/messagequeueservice/MessageQueueService.go @@ -0,0 +1,126 @@ +package messagequeueservice + +import ( + "errors" + "fmt" + "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/rpc" + "sync" +) + +type QueueDataPersist interface { + service.IModule + + OnExit() + OnReceiveTopicData(topic string, topicData []TopicData) //当收到推送过来的数据时 + OnPushTopicDataToCustomer(topic string, topicData []TopicData) //当推送数据到Customer时回调 + PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) //持久化数据,失败则返回false,上层会重复尝试,直到成功,建议在函数中加入次数,超过次数则返回true + FindTopicData(topic string, startIndex uint64, limit int64) []TopicData //查找数据,参数bool代表数据库查找是否成功 + LoadCustomerIndex(topic string, customerId string) (uint64, bool) //false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true) + GetIndex(topicData *TopicData) uint64 //通过topic数据获取进度索引号 + PersistIndex(topic string, customerId string, index uint64) //持久化进度索引号 +} + +type MessageQueueService struct { + service.Service + + sync.Mutex + mapTopicRoom map[string]*TopicRoom + + queueWait sync.WaitGroup + dataPersist QueueDataPersist + + memoryQueueLen int32 + maxProcessTopicBacklogNum int32 //最大积压的数据量,因为是写入到channel中,然后由协程取出再持久化,不设置有默认值100000 +} + +func (ms *MessageQueueService) OnInit() error { + ms.mapTopicRoom = map[string]*TopicRoom{} + errC := ms.ReadCfg() + if errC != nil { + return errC + } + + if ms.dataPersist == nil { + return errors.New("not setup QueueDataPersist.") + } + + _, err := ms.AddModule(ms.dataPersist) + if err != nil { + return err + } + + return nil +} + +func (ms *MessageQueueService) ReadCfg() error { + mapDBServiceCfg, ok := ms.GetService().GetServiceCfg().(map[string]interface{}) + if ok == false { + return fmt.Errorf("MessageQueueService config is error") + } + + maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"] + if ok == false { + ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum + log.SRelease("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum) + } else { + ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64)) + } + + memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"] + if ok == false { + ms.memoryQueueLen = DefaultMemoryQueueLen + log.SRelease("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen) + } else { + ms.memoryQueueLen = int32(memoryQueueLen.(float64)) + } + + return nil +} + +func (ms *MessageQueueService) Setup(dataPersist QueueDataPersist) { + ms.dataPersist = dataPersist +} + +func (ms *MessageQueueService) OnRelease() { + + //停止所有的TopicRoom房间 + ms.Lock() + for _, room := range ms.mapTopicRoom { + room.Stop() + } + ms.Unlock() + + //释放时确保所有的协程退出 + ms.queueWait.Wait() + + //通知持久化对象 + ms.dataPersist.OnExit() +} + +func (ms *MessageQueueService) GetTopicRoom(topic string) *TopicRoom { + ms.Lock() + defer ms.Unlock() + topicRoom := ms.mapTopicRoom[topic] + if topicRoom != nil { + return topicRoom + } + + topicRoom = &TopicRoom{} + topicRoom.Init(ms.maxProcessTopicBacklogNum, ms.memoryQueueLen, topic, &ms.queueWait, ms.dataPersist) + ms.mapTopicRoom[topic] = topicRoom + + return topicRoom +} + +func (ms *MessageQueueService) RPC_Publish(inParam *rpc.DBQueuePublishReq, outParam *rpc.DBQueuePublishRes) error { + + topicRoom := ms.GetTopicRoom(inParam.TopicName) + return topicRoom.Publish(inParam.PushData) +} + +func (ms *MessageQueueService) RPC_Subscribe(req *rpc.DBQueueSubscribeReq, res *rpc.DBQueueSubscribeRes) error { + topicRoom := ms.GetTopicRoom(req.TopicName) + return topicRoom.TopicSubscribe(ms.GetRpcHandler(), req.SubType, int32(req.Method), int(req.FromNodeId), req.RpcMethod, req.TopicName, req.CustomerId, req.StartIndex, req.OneBatchQuantity) +} diff --git a/sysservice/messagequeueservice/MongoPersist.go b/sysservice/messagequeueservice/MongoPersist.go new file mode 100644 index 0000000..25914af --- /dev/null +++ b/sysservice/messagequeueservice/MongoPersist.go @@ -0,0 +1,358 @@ +package messagequeueservice + +import ( + "fmt" + "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/sysmodule/mongodbmodule" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" + "sunserver/common/util" + "time" +) + +const MaxDays = 180 + +type MongoPersist struct { + service.Module + mongo mongodbmodule.MongoModule + + url string //连接url + dbName string //数据库名称 + retryCount int //落地数据库重试次数 + + topic []TopicData //用于临时缓存 +} + +const CustomerCollectName = "SysCustomer" + +func (mp *MongoPersist) OnInit() error { + if errC := mp.ReadCfg(); errC != nil { + return errC + } + + err := mp.mongo.Init(mp.url, time.Second*15) + if err != nil { + return err + } + + err = mp.mongo.Start() + if err != nil { + log.SError("start dbService[", mp.dbName, "], url[", mp.url, "] init error:", err.Error()) + return err + } + + //添加索引 + var IndexKey [][]string + var keys []string + keys = append(keys, "Customer", "Topic") + IndexKey = append(IndexKey, keys) + s := mp.mongo.TakeSession() + if err := s.EnsureUniqueIndex(mp.dbName, CustomerCollectName, IndexKey, true, true); err != nil { + log.SError("EnsureUniqueIndex is fail ", err.Error()) + return err + } + + return nil +} + +func (mp *MongoPersist) ReadCfg() error { + mapDBServiceCfg, ok := mp.GetService().GetServiceCfg().(map[string]interface{}) + if ok == false { + return fmt.Errorf("MessageQueueService config is error") + } + + //parse MsgRouter + url, ok := mapDBServiceCfg["Url"] + if ok == false { + return fmt.Errorf("MessageQueueService config is error") + } + mp.url = url.(string) + + dbName, ok := mapDBServiceCfg["DBName"] + if ok == false { + return fmt.Errorf("MessageQueueService config is error") + } + mp.dbName = dbName.(string) + + // + goroutineNum, ok := mapDBServiceCfg["RetryCount"] + if ok == false { + return fmt.Errorf("MongoPersist config is error") + } + mp.retryCount = int(goroutineNum.(float64)) + + return nil +} + +func (mp *MongoPersist) getTopicBuff(limit int) []TopicData { + if cap(mp.topic) < limit { + mp.topic = make([]TopicData, limit) + } + + return mp.topic[:0] +} + +func (mp *MongoPersist) OnExit() { +} + +// OnReceiveTopicData 当收到推送过来的数据时 +func (mp *MongoPersist) OnReceiveTopicData(topic string, topicData []TopicData) { + //1.收到推送过来的数据,在里面插入_id字段 + for i := 0; i < len(topicData); i++ { + var document bson.D + err := bson.Unmarshal(topicData[i].RawData, &document) + if err != nil { + topicData[i].RawData = nil + log.SError(topic, " data Unmarshal is fail ", err.Error()) + continue + } + + document = append(document, bson.E{Key: "_id", Value: topicData[i].Seq}) + + byteRet, err := bson.Marshal(document) + if err != nil { + topicData[i].RawData = nil + log.SError(topic, " data Marshal is fail ", err.Error()) + continue + } + topicData[i].ExtendParam = document + topicData[i].RawData = byteRet + } +} + +// OnPushTopicDataToCustomer 当推送数据到Customer时回调 +func (mp *MongoPersist) OnPushTopicDataToCustomer(topic string, topicData []TopicData) { + +} + +// PersistTopicData 持久化数据 +func (mp *MongoPersist) persistTopicData(collectionName string, topicData []TopicData, retryCount int) bool { + s := mp.mongo.TakeSession() + ctx, cancel := s.GetDefaultContext() + defer cancel() + + var documents []interface{} + for _, tData := range topicData { + if tData.ExtendParam == nil { + continue + } + documents = append(documents, tData.ExtendParam) + } + + _, err := s.Collection(mp.dbName, collectionName).InsertMany(ctx, documents) + if err != nil { + log.SError("PersistTopicData InsertMany fail,collect name is ", collectionName) + + //失败最大重试数量 + return retryCount >= mp.retryCount + } + + //log.SRelease("+++++++++====", time.Now().UnixNano()) + return true +} + +// PersistTopicData 持久化数据 +func (mp *MongoPersist) PersistTopicData(topic string, topicData []TopicData, retryCount int) ([]TopicData, bool) { + if len(topicData) == 0 { + return nil, true + } + + preDate := topicData[0].Seq >> 32 + var findPos int + for findPos = 1; findPos < len(topicData); findPos++ { + newDate := topicData[findPos].Seq >> 32 + //说明换天了 + if preDate != newDate { + break + } + } + + collectName := fmt.Sprintf("%s_%s", topic, mp.GetDateByIndex(topicData[0].Seq)) + ret := mp.persistTopicData(collectName, topicData[:findPos], retryCount) + //如果失败,下次重试 + if ret == false { + return nil, false + } + + //如果成功 + return topicData[findPos:len(topicData)], true +} + +// FindTopicData 查找数据 +func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int64) ([]TopicData, bool) { + s := mp.mongo.TakeSession() + ctx, cancel := s.GetDefaultContext() + defer cancel() + + condition := bson.D{{Key: "_id", Value: bson.D{{Key: "$gt", Value: startIndex}}}} + + var findOption options.FindOptions + findOption.SetLimit(limit) + var findOptions []*options.FindOptions + findOptions = append(findOptions, &findOption) + + collectName := fmt.Sprintf("%s_%s", topic, mp.GetDateByIndex(startIndex)) + cursor, err := s.Collection(mp.dbName, collectName).Find(ctx, condition, findOptions...) + if err != nil || cursor.Err() != nil { + if err == nil { + err = cursor.Err() + } + + if err != nil { + log.SError("find collect name ", topic, " is error:", err.Error()) + return nil, false + } + + return nil, false + } + + var res []interface{} + ctxAll, cancelAll := s.GetDefaultContext() + defer cancelAll() + err = cursor.All(ctxAll, &res) + if err != nil { + if err != nil { + log.SError("find collect name ", topic, " is error:", err.Error()) + return nil, false + } + + return nil, false + } + + //序列化返回 + topicBuff := mp.getTopicBuff(int(limit)) + for i := 0; i < len(res); i++ { + rawData, errM := bson.Marshal(res[i]) + if errM != nil { + if errM != nil { + log.SError("collect name ", topic, " Marshal is error:", err.Error()) + return nil, false + } + continue + } + topicBuff = append(topicBuff, TopicData{RawData: rawData}) + } + + return topicBuff, true +} + +// FindTopicData 查找数据 +func (mp *MongoPersist) FindTopicData(topic string, startIndex uint64, limit int64) []TopicData { + //某表找不到,一直往前找,找到当前置为止 + for days := 1; days <= MaxDays; days++ { + //从startIndex开始一直往后查 + topicData, isSucc := mp.findTopicData(topic, startIndex, limit) + //有数据或者出错时,返回 + if len(topicData) > 0 || isSucc == false { + return topicData + } + + //找不到数据时,判断当前日期是否一致 + if mp.GetDateByIndex(startIndex) >= mp.GetNowTime() { + break + } + + startIndex = mp.GetNextIndex(startIndex, days) + } + + return nil +} + +func (mp *MongoPersist) GetNowTime() string { + now := time.Now() + zeroTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) + return zeroTime.Format("20060102") +} + +func (mp *MongoPersist) GetDateByIndex(startIndex uint64) string { + startTm := int64(startIndex >> 32) + return time.Unix(startTm, 0).Format("20060102") +} + +func (mp *MongoPersist) GetNextIndex(startIndex uint64, addDay int) uint64 { + startTime := time.Unix(int64(startIndex>>32), 0) + dateTime := time.Date(startTime.Year(), startTime.Month(), startTime.Day(), 0, 0, 0, 0, startTime.Location()) + newDateTime := dateTime.AddDate(0, 0, addDay) + nextIndex := uint64(newDateTime.Unix()) << 32 + return nextIndex +} + +// LoadCustomerIndex false时代表获取失败,一般是读取错误,会进行重试。如果不存在时,返回(0,true) +func (mp *MongoPersist) LoadCustomerIndex(topic string, customerId string) (uint64, bool) { + s := mp.mongo.TakeSession() + ctx, cancel := s.GetDefaultContext() + defer cancel() + + condition := bson.D{{Key: "Customer", Value: customerId}, {Key: "Topic", Value: topic}} + cursor, err := s.Collection(mp.dbName, CustomerCollectName).Find(ctx, condition) + if err != nil { + log.SError("Load topic ", topic, " customer ", customerId, " is fail:", err.Error()) + return 0, false + } + + type findRes struct { + Index uint64 `bson:"Index,omitempty"` + } + + var res []findRes + ctxAll, cancelAll := s.GetDefaultContext() + defer cancelAll() + err = cursor.All(ctxAll, &res) + if err != nil { + log.SError("Load topic ", topic, " customer ", customerId, " is fail:", err.Error()) + return 0, false + } + + if len(res) == 0 { + return 0, true + } + + return res[0].Index, true +} + +// GetIndex 通过topic数据获取进度索引号 +func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64 { + if topicData.Seq > 0 { + return topicData.Seq + } + + var document bson.D + err := bson.Unmarshal(topicData.RawData, &document) + if err != nil { + log.SError("GetIndex is fail ", err.Error()) + return 0 + } + + for _, e := range document { + if e.Key == "_id" { + errC, seq := util.ConvertToNumber[uint64](e.Value) + if errC != nil { + log.Error("value is error:%s,%+v, ", errC.Error(), e.Value) + } + + return seq + } + } + return topicData.Seq +} + +// PersistIndex 持久化进度索引号 +func (mp *MongoPersist) PersistIndex(topic string, customerId string, index uint64) { + s := mp.mongo.TakeSession() + + condition := bson.D{{Key: "Customer", Value: customerId}, {Key: "Topic", Value: topic}} + upsert := bson.M{"Customer": customerId, "Topic": topic, "Index": index} + updata := bson.M{"$set": upsert} + + var UpdateOptionsOpts []*options.UpdateOptions + UpdateOptionsOpts = append(UpdateOptionsOpts, options.Update().SetUpsert(true)) + + ctx, cancel := s.GetDefaultContext() + defer cancel() + ret, err := s.Collection(mp.dbName, CustomerCollectName).UpdateOne(ctx, condition, updata, UpdateOptionsOpts...) + fmt.Println(ret) + if err != nil { + log.SError("PersistIndex fail :", err.Error()) + } +} diff --git a/sysservice/messagequeueservice/MongoPersist_test.go b/sysservice/messagequeueservice/MongoPersist_test.go new file mode 100644 index 0000000..851b057 --- /dev/null +++ b/sysservice/messagequeueservice/MongoPersist_test.go @@ -0,0 +1,122 @@ +package messagequeueservice + +import ( + "fmt" + "go.mongodb.org/mongo-driver/bson" + "testing" + "time" +) + +var seq uint64 +var lastTime int64 + +func NextSeq(addDays int) uint64 { + now := time.Now().AddDate(0, 0, addDays) + + nowSec := now.Unix() + if nowSec != lastTime { + seq = 0 + lastTime = nowSec + } + //必需从1开始,查询时seq>0 + seq += 1 + + return uint64(nowSec)<<32 | uint64(seq) +} + +func Test_MongoPersist(t *testing.T) { + //1.初始化 + var mongoPersist MongoPersist + mongoPersist.url = "mongodb://admin:123456@192.168.2.15:27017/?minPoolSize=5&maxPoolSize=35&maxIdleTimeMS=30000" + mongoPersist.dbName = "MongoPersistTest" + mongoPersist.retryCount = 10 + mongoPersist.OnInit() + + //2. + //加载索引 + index, ret := mongoPersist.LoadCustomerIndex("TestTopic", "TestCustomer") + fmt.Println(index, ret) + + now := time.Now() + zeroTime := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location()) + //fmt.Println(zeroTime.Unix()) + startIndex := uint64(zeroTime.Unix()<<32) | 1 + + //存储索引 + mongoPersist.PersistIndex("TestTopic", "TestCustomer", startIndex) + + //加载索引 + index, ret = mongoPersist.LoadCustomerIndex("TestTopic", "TestCustomer") + + type RowTest struct { + Name string `bson:"Name,omitempty"` + MapTest map[int]int `bson:"MapTest,omitempty"` + Message string `bson:"Message,omitempty"` + } + + type RowTest2 struct { + Id uint64 `bson:"_id,omitempty"` + Name string `bson:"Name,omitempty"` + MapTest map[int]int `bson:"MapTest,omitempty"` + Message string `bson:"Message,omitempty"` + } + + //存档 + var findStartIndex uint64 + var topicData []TopicData + for i := 1; i <= 1000; i++ { + + var rowTest RowTest + rowTest.Name = fmt.Sprintf("Name_%d", i) + rowTest.MapTest = make(map[int]int, 1) + rowTest.MapTest[i] = i*1000 + i + rowTest.Message = fmt.Sprintf("xxxxxxxxxxxxxxxxxx%d", i) + byteRet, _ := bson.Marshal(rowTest) + + var dataSeq uint64 + if i <= 500 { + dataSeq = NextSeq(-1) + } else { + dataSeq = NextSeq(0) + } + + topicData = append(topicData, TopicData{RawData: byteRet, Seq: dataSeq}) + + if i == 1 { + findStartIndex = topicData[0].Seq + } + } + + mongoPersist.OnReceiveTopicData("TestTopic", topicData) + + for { + if len(topicData) == 0 { + break + } + + topicData, ret = mongoPersist.PersistTopicData("TestTopic", topicData, 1) + fmt.Println(ret) + } + + // + for { + retTopicData := mongoPersist.FindTopicData("TestTopic", findStartIndex, 300) + for i, data := range retTopicData { + var rowTest RowTest2 + bson.Unmarshal(data.RawData, &rowTest) + t.Log(rowTest.Name) + + if i == len(retTopicData)-1 { + findStartIndex = mongoPersist.GetIndex(&data) + } + } + + t.Log("..................") + if len(retTopicData) == 0 { + break + } + } + + //t.Log(mongoPersist.GetIndex(&retTopicData[0])) + //t.Log(mongoPersist.GetIndex(&retTopicData[len(retTopicData)-1])) +} diff --git a/sysservice/messagequeueservice/Subscriber.go b/sysservice/messagequeueservice/Subscriber.go new file mode 100644 index 0000000..d394c23 --- /dev/null +++ b/sysservice/messagequeueservice/Subscriber.go @@ -0,0 +1,91 @@ +package messagequeueservice + +import ( + "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/rpc" + + "sync" +) + +// 订阅器 +type Subscriber struct { + customerLocker sync.RWMutex + mapCustomer map[string]*CustomerSubscriber + queue MemoryQueue + dataPersist QueueDataPersist //对列数据处理器 + queueWait *sync.WaitGroup +} + +func (ss *Subscriber) Init(memoryQueueCap int32) { + ss.queue.Init(memoryQueueCap) + ss.mapCustomer = make(map[string]*CustomerSubscriber, 5) +} + +func (ss *Subscriber) PushTopicDataToQueue(topic string, topics []TopicData) { + for i := 0; i < len(topics); i++ { + ss.queue.Push(&topics[i]) + } +} + +func (ss *Subscriber) PersistTopicData(topic string, topics []TopicData, retryCount int) ([]TopicData, bool) { + return ss.dataPersist.PersistTopicData(topic, topics, retryCount) +} + +func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType rpc.SubscribeType, subscribeMethod SubscribeMethod, fromNodeId int, callBackRpcMethod string, topic string, customerId string, StartIndex uint64, oneBatchQuantity int32) error { + //取消订阅时 + if subScribeType == rpc.SubscribeType_Unsubscribe { + ss.UnSubscribe(customerId) + return nil + } else { + ss.customerLocker.Lock() + customerSubscriber, ok := ss.mapCustomer[customerId] + if ok == true { + //已经订阅过,则取消订阅 + customerSubscriber.UnSubscribe() + delete(ss.mapCustomer, customerId) + } + + //不存在,则订阅 + customerSubscriber = &CustomerSubscriber{} + ss.mapCustomer[customerId] = customerSubscriber + ss.customerLocker.Unlock() + + err := customerSubscriber.Subscribe(rpcHandler, ss, topic, subscribeMethod, customerId, fromNodeId, callBackRpcMethod, StartIndex, oneBatchQuantity) + if err != nil { + return err + } + + if ok == true { + log.SRelease("repeat subscription for customer ", customerId) + } else { + log.SRelease("subscription for customer ", customerId) + } + + } + + return nil +} + +func (ss *Subscriber) UnSubscribe(customerId string) { + ss.customerLocker.RLocker() + defer ss.customerLocker.RUnlock() + + customerSubscriber, ok := ss.mapCustomer[customerId] + if ok == false { + log.SWarning("failed to unsubscribe customer " + customerId) + return + } + + customerSubscriber.UnSubscribe() +} + +func (ss *Subscriber) removeCustomer(customerId string, cs *CustomerSubscriber) { + + ss.customerLocker.Lock() + //确保删掉是当前的关系。有可能在替换订阅时,将该customer替换的情况 + customer, _ := ss.mapCustomer[customerId] + if customer == cs { + delete(ss.mapCustomer, customerId) + } + ss.customerLocker.Unlock() +} diff --git a/sysservice/messagequeueservice/TopicRoom.go b/sysservice/messagequeueservice/TopicRoom.go new file mode 100644 index 0000000..918380d --- /dev/null +++ b/sysservice/messagequeueservice/TopicRoom.go @@ -0,0 +1,146 @@ +package messagequeueservice + +import ( + "errors" + "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/util/coroutine" + "sync" + "sync/atomic" + "time" +) + +type TopicData struct { + Seq uint64 //序号 + RawData []byte //原始数据 + + ExtendParam interface{} //扩展参数 +} + +func (t TopicData) GetValue() uint64 { + return t.Seq +} + +var topicFullError = errors.New("topic room is full") + +const DefaultOnceProcessTopicDataNum = 1024 //一次处理的topic数量,考虑批量落地的数量 +const DefaultMaxTopicBacklogNum = 100000 //处理的channel最大数量 +const DefaultMemoryQueueLen = 50000 //内存的最大长度 +const maxTryPersistNum = 3000 //最大重试次数,约>5分钟 + +type TopicRoom struct { + topic string //主题名称 + channelTopic chan TopicData //主题push过来待处理的数据 + + Subscriber //订阅器 + + //序号生成 + seq uint32 + lastTime int64 + + //onceProcessTopicDataNum int //一次处理的订阅数据最大量,方便订阅器Subscriber和QueueDataProcessor批量处理 + StagingBuff []TopicData + + isStop int32 +} + +// maxProcessTopicBacklogNum:主题最大积压数量 +func (tr *TopicRoom) Init(maxTopicBacklogNum int32, memoryQueueLen int32, topic string, queueWait *sync.WaitGroup, dataPersist QueueDataPersist) { + if maxTopicBacklogNum == 0 { + maxTopicBacklogNum = DefaultMaxTopicBacklogNum + } + + tr.channelTopic = make(chan TopicData, maxTopicBacklogNum) + tr.topic = topic + tr.dataPersist = dataPersist + tr.queueWait = queueWait + tr.StagingBuff = make([]TopicData, DefaultOnceProcessTopicDataNum) + tr.queueWait.Add(1) + tr.Subscriber.Init(memoryQueueLen) + coroutine.GoRecover(tr.topicRoomRun, -1) +} + +func (tr *TopicRoom) Publish(data [][]byte) error { + if len(tr.channelTopic)+len(data) > cap(tr.channelTopic) { + return topicFullError + } + + //生成有序序号 + for _, rawData := range data { + tr.channelTopic <- TopicData{RawData: rawData, Seq: tr.NextSeq()} + } + + return nil +} + +func (tr *TopicRoom) NextSeq() uint64 { + now := time.Now() + + nowSec := now.Unix() + if nowSec != tr.lastTime { + tr.seq = 0 + tr.lastTime = nowSec + } + //必需从1开始,查询时seq>0 + tr.seq += 1 + + return uint64(nowSec)<<32 | uint64(tr.seq) +} + +func (tr *TopicRoom) Stop() { + atomic.StoreInt32(&tr.isStop, 1) +} + +func (tr *TopicRoom) topicRoomRun() { + defer tr.queueWait.Done() + + log.SRelease("topic room ", tr.topic, " is running..") + for { + if atomic.LoadInt32(&tr.isStop) != 0 { + break + } + stagingBuff := tr.StagingBuff[:0] + + for i := 0; i < len(tr.channelTopic) && i < DefaultOnceProcessTopicDataNum; i++ { + topicData := <-tr.channelTopic + + stagingBuff = append(stagingBuff, topicData) + } + tr.Subscriber.dataPersist.OnReceiveTopicData(tr.topic, stagingBuff) + //持久化与放内存 + if len(stagingBuff) == 0 { + time.Sleep(time.Millisecond * 50) + continue + } + + //如果落地失败,最大重试maxTryPersistNum次数 + var ret bool + for j := 0; j < maxTryPersistNum; { + //持久化处理 + stagingBuff, ret = tr.PersistTopicData(tr.topic, stagingBuff, j+1) + //如果存档成功,并且有后续批次,则继续存档 + if ret == true && len(stagingBuff) > 0 { + //二次存档不计次数 + continue + } + + //计数增加一次,并且等待100ms,继续重试 + j += 1 + if ret == false { + time.Sleep(time.Millisecond * 100) + continue + } + + tr.PushTopicDataToQueue(tr.topic, stagingBuff) + break + } + } + + //将所有的订阅取消 + tr.customerLocker.Lock() + for _, customer := range tr.mapCustomer { + customer.UnSubscribe() + } + tr.customerLocker.Unlock() + + log.SRelease("topic room ", tr.topic, " is stop") +}