diff --git a/sysservice/messagequeueservice/CustomerSubscriber.go b/sysservice/messagequeueservice/CustomerSubscriber.go index 7867c1a..741ef00 100644 --- a/sysservice/messagequeueservice/CustomerSubscriber.go +++ b/sysservice/messagequeueservice/CustomerSubscriber.go @@ -104,11 +104,11 @@ func (cs *CustomerSubscriber) UnSubscribe() { func (cs *CustomerSubscriber) LoadLastIndex() { for { if atomic.LoadInt32(&cs.isStop) != 0 { - log.Info("topic ", cs.topic, " out of subscription") + log.SInfo("topic ", cs.topic, " out of subscription") break } - log.Info("customer ", cs.customerId, " start load last index ") + log.SInfo("customer ", cs.customerId, " start load last index ") lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId) if ret == true { if lastIndex > 0 { @@ -116,18 +116,18 @@ func (cs *CustomerSubscriber) LoadLastIndex() { } else { //否则直接使用客户端发回来的 } - log.Info("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex) + log.SInfo("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex) break } - log.Info("customer ", cs.customerId, " load last index is fail...") + log.SInfo("customer ", cs.customerId, " load last index is fail...") time.Sleep(5 * time.Second) } } func (cs *CustomerSubscriber) SubscribeRun() { defer cs.subscriber.queueWait.Done() - log.Info("topic ", cs.topic, " start subscription") + log.SInfo("topic ", cs.topic, " start subscription") //加载之前的位置 if cs.subscribeMethod == MethodLast { @@ -136,7 +136,7 @@ func (cs *CustomerSubscriber) SubscribeRun() { for { if atomic.LoadInt32(&cs.isStop) != 0 { - log.Info("topic ", cs.topic, " out of subscription") + log.SInfo("topic ", cs.topic, " out of subscription") break } @@ -146,14 +146,14 @@ func (cs *CustomerSubscriber) SubscribeRun() { //todo 检测退出 if cs.subscribe() == false { - log.Info("topic ", cs.topic, " out of subscription") + log.SInfo("topic ", cs.topic, " out of subscription") break } } //删除订阅关系 cs.subscriber.removeCustomer(cs.customerId, cs) - log.Info("topic ", cs.topic, " unsubscription") + log.SInfo("topic ", cs.topic, " unsubscription") } func (cs *CustomerSubscriber) subscribe() bool { diff --git a/sysservice/messagequeueservice/MessageQueueService.go b/sysservice/messagequeueservice/MessageQueueService.go index 80ff8cc..7b0fd96 100644 --- a/sysservice/messagequeueservice/MessageQueueService.go +++ b/sysservice/messagequeueservice/MessageQueueService.go @@ -63,7 +63,7 @@ func (ms *MessageQueueService) ReadCfg() error { maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"] if ok == false { ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum - log.Info("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum) + log.SInfo("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum) } else { ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64)) } @@ -71,7 +71,7 @@ func (ms *MessageQueueService) ReadCfg() error { memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"] if ok == false { ms.memoryQueueLen = DefaultMemoryQueueLen - log.Info("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen) + log.SInfo("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen) } else { ms.memoryQueueLen = int32(memoryQueueLen.(float64)) } diff --git a/sysservice/messagequeueservice/MongoPersist.go b/sysservice/messagequeueservice/MongoPersist.go index 52519cd..05c8246 100644 --- a/sysservice/messagequeueservice/MongoPersist.go +++ b/sysservice/messagequeueservice/MongoPersist.go @@ -237,7 +237,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int defer cancelAll() err = cursor.All(ctxAll, &res) if err != nil { - log.Error("find collect name ", topic, " is error", log.ErrorAttr("err", err)) + log.Error("find collect name error",log.String("topic",topic) ,log.ErrorField("err",err)) return nil, false } @@ -246,7 +246,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int rawData, errM := bson.Marshal(res[i]) if errM != nil { if errM != nil { - log.Error("collect name ", topic, " Marshal is error", log.ErrorAttr("err", err)) + log.Error("Marshal error",log.String("topic",topic) , log.ErrorField("err", err)) return nil, false } continue @@ -391,7 +391,7 @@ func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64 { if e.Key == "_id" { errC, seq := convertToNumber[uint64](e.Value) if errC != nil { - log.Error("value is error:%s,%+v, ", errC.Error(), e.Value) + log.Error("value is error", log.ErrorField("err",errC), log.Any("val",e.Value)) } return seq diff --git a/sysservice/messagequeueservice/Subscriber.go b/sysservice/messagequeueservice/Subscriber.go index 5fcd749..4c0b3cf 100644 --- a/sysservice/messagequeueservice/Subscriber.go +++ b/sysservice/messagequeueservice/Subscriber.go @@ -56,9 +56,9 @@ func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType r } if ok == true { - log.Info("repeat subscription for customer ", customerId) + log.SInfo("repeat subscription for customer ", customerId) } else { - log.Info("subscription for customer ", customerId) + log.SInfo("subscription for customer ", customerId) } } @@ -72,7 +72,7 @@ func (ss *Subscriber) UnSubscribe(customerId string) { customerSubscriber, ok := ss.mapCustomer[customerId] if ok == false { - log.SWarning("failed to unsubscribe customer " + customerId) + log.SWarn("failed to unsubscribe customer ", customerId) return } diff --git a/sysservice/messagequeueservice/TopicRoom.go b/sysservice/messagequeueservice/TopicRoom.go index c0f7f34..d620a2a 100644 --- a/sysservice/messagequeueservice/TopicRoom.go +++ b/sysservice/messagequeueservice/TopicRoom.go @@ -93,7 +93,7 @@ func (tr *TopicRoom) Stop() { func (tr *TopicRoom) topicRoomRun() { defer tr.queueWait.Done() - log.Info("topic room ", tr.topic, " is running..") + log.SInfo("topic room ", tr.topic, " is running..") for { if atomic.LoadInt32(&tr.isStop) != 0 { break @@ -145,5 +145,5 @@ func (tr *TopicRoom) topicRoomRun() { } tr.customerLocker.Unlock() - log.Info("topic room ", tr.topic, " is stop") + log.SInfo("topic room ", tr.topic, " is stop") }