Compare commits

..

4 Commits

Author SHA1 Message Date
boyce
6c44ba180c 优化kafkamoudule日志 2025-05-21 21:38:36 +08:00
boyce
ecfd42bdec 优化消息队列日志 2025-05-21 21:34:32 +08:00
boyce
01d3b3e535 优化RankService日志 2025-05-21 21:22:24 +08:00
boyce
550d65a354 优化mysql模块日志 2025-05-21 21:19:03 +08:00
9 changed files with 35 additions and 35 deletions

View File

@@ -248,7 +248,7 @@ func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession
select {
case msg := <-claim.Messages():
if msg == nil {
log.SWarning("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
log.SWarn("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
return nil
}
ch.AppendMsg(session, msg)

View File

@@ -86,7 +86,7 @@ func (p *Producer) asyncRun() {
asyncReturn := sm.Metadata.(*AsyncReturn)
asyncReturn.chanReturn <- asyncReturn
case em := <-p.Errors():
log.Error("async kafkamodule error", log.ErrorAttr("err", em.Err))
log.Error("async kafkamodule error", log.ErrorField("err", em.Err))
if em.Msg.Metadata == nil {
break
}

View File

@@ -95,7 +95,7 @@ func (m *MySQLModule) Begin() (*Tx, error) {
var txDBModule Tx
txDb, err := m.db.Begin()
if err != nil {
log.Error("Begin error:%s", err.Error())
log.Error("Begin error", log.ErrorField("err",err))
return &txDBModule, err
}
txDBModule.slowDuration = m.slowDuration
@@ -155,7 +155,7 @@ func (m *MySQLModule) runPing() {
for {
select {
case <-m.pingCoroutine.pintExit:
log.Error("RunPing stopping %s...", fmt.Sprintf("%T", m))
log.Error("RunPing stopping",log.String("url", m.url),log.String("dbname", m.dbname))
return
case <-m.pingCoroutine.tickerPing.C:
if m.db != nil {
@@ -221,12 +221,12 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
datasetList.blur = true
if checkArgs(args) != nil {
log.Error("CheckArgs is error :%s", strQuery)
log.Error("CheckArgs is error",log.String("sql",strQuery))
return &datasetList, fmt.Errorf("checkArgs is error")
}
if db == nil {
log.Error("cannot connect database:%s", strQuery)
log.Error("cannot connect database",log.String("sql", strQuery))
return &datasetList, fmt.Errorf("cannot connect database")
}
@@ -235,10 +235,10 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
timeFuncPass := time.Since(TimeFuncStart)
if checkSlow(slowDuration, timeFuncPass) {
log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strQuery, args)
log.Error("Query slow",log.Int64("time_ms",timeFuncPass.Milliseconds()),log.String("sql", strQuery), log.Any("args",args))
}
if err != nil {
log.Error("Query:%s(%v)", strQuery, err)
log.Error("Query error", log.String("sql",strQuery),log.ErrorField("err",err))
if rows != nil {
rows.Close()
}
@@ -278,8 +278,8 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
hasRet := rows.NextResultSet()
if hasRet == false {
if rows.Err() != nil {
log.Error("Query:%s(%+v)", strQuery, rows)
if rowErr :=rows.Err();rowErr != nil {
log.Error("NextResultSet error", log.String("sql",strQuery), log.ErrorField("err",rowErr))
}
break
}
@@ -291,12 +291,12 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
func exec(slowDuration time.Duration, db dbControl, strSql string, args ...interface{}) (*DBResult, error) {
ret := &DBResult{}
if db == nil {
log.Error("cannot connect database:%s", strSql)
log.Error("cannot connect database", log.String("sql",strSql))
return ret, fmt.Errorf("cannot connect database")
}
if checkArgs(args) != nil {
log.Error("CheckArgs is error :%s", strSql)
log.Error("CheckArgs is error", log.String("sql",strSql))
return ret, fmt.Errorf("checkArgs is error")
}
@@ -304,10 +304,10 @@ func exec(slowDuration time.Duration, db dbControl, strSql string, args ...inter
res, err := db.Exec(strSql, args...)
timeFuncPass := time.Since(TimeFuncStart)
if checkSlow(slowDuration, timeFuncPass) {
log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strSql, args)
log.Error("Exec slow",log.Int64("time_ms",timeFuncPass.Milliseconds()),log.String("sql",strSql),log.Any("args",args) )
}
if err != nil {
log.Error("Exec:%s(%v)", strSql, err)
log.Error("Exec error",log.String("sql",strSql),log.ErrorField("err", err))
return nil, err
}

View File

@@ -104,11 +104,11 @@ func (cs *CustomerSubscriber) UnSubscribe() {
func (cs *CustomerSubscriber) LoadLastIndex() {
for {
if atomic.LoadInt32(&cs.isStop) != 0 {
log.Info("topic ", cs.topic, " out of subscription")
log.SInfo("topic ", cs.topic, " out of subscription")
break
}
log.Info("customer ", cs.customerId, " start load last index ")
log.SInfo("customer ", cs.customerId, " start load last index ")
lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId)
if ret == true {
if lastIndex > 0 {
@@ -116,18 +116,18 @@ func (cs *CustomerSubscriber) LoadLastIndex() {
} else {
//否则直接使用客户端发回来的
}
log.Info("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
log.SInfo("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
break
}
log.Info("customer ", cs.customerId, " load last index is fail...")
log.SInfo("customer ", cs.customerId, " load last index is fail...")
time.Sleep(5 * time.Second)
}
}
func (cs *CustomerSubscriber) SubscribeRun() {
defer cs.subscriber.queueWait.Done()
log.Info("topic ", cs.topic, " start subscription")
log.SInfo("topic ", cs.topic, " start subscription")
//加载之前的位置
if cs.subscribeMethod == MethodLast {
@@ -136,7 +136,7 @@ func (cs *CustomerSubscriber) SubscribeRun() {
for {
if atomic.LoadInt32(&cs.isStop) != 0 {
log.Info("topic ", cs.topic, " out of subscription")
log.SInfo("topic ", cs.topic, " out of subscription")
break
}
@@ -146,14 +146,14 @@ func (cs *CustomerSubscriber) SubscribeRun() {
//todo 检测退出
if cs.subscribe() == false {
log.Info("topic ", cs.topic, " out of subscription")
log.SInfo("topic ", cs.topic, " out of subscription")
break
}
}
//删除订阅关系
cs.subscriber.removeCustomer(cs.customerId, cs)
log.Info("topic ", cs.topic, " unsubscription")
log.SInfo("topic ", cs.topic, " unsubscription")
}
func (cs *CustomerSubscriber) subscribe() bool {

View File

@@ -63,7 +63,7 @@ func (ms *MessageQueueService) ReadCfg() error {
maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"]
if ok == false {
ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum
log.Info("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
log.SInfo("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
} else {
ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64))
}
@@ -71,7 +71,7 @@ func (ms *MessageQueueService) ReadCfg() error {
memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"]
if ok == false {
ms.memoryQueueLen = DefaultMemoryQueueLen
log.Info("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
log.SInfo("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
} else {
ms.memoryQueueLen = int32(memoryQueueLen.(float64))
}

View File

@@ -237,7 +237,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
defer cancelAll()
err = cursor.All(ctxAll, &res)
if err != nil {
log.Error("find collect name ", topic, " is error", log.ErrorAttr("err", err))
log.Error("find collect name error",log.String("topic",topic) ,log.ErrorField("err",err))
return nil, false
}
@@ -246,7 +246,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
rawData, errM := bson.Marshal(res[i])
if errM != nil {
if errM != nil {
log.Error("collect name ", topic, " Marshal is error", log.ErrorAttr("err", err))
log.Error("Marshal error",log.String("topic",topic) , log.ErrorField("err", err))
return nil, false
}
continue
@@ -391,7 +391,7 @@ func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64 {
if e.Key == "_id" {
errC, seq := convertToNumber[uint64](e.Value)
if errC != nil {
log.Error("value is error:%s,%+v, ", errC.Error(), e.Value)
log.Error("value is error", log.ErrorField("err",errC), log.Any("val",e.Value))
}
return seq

View File

@@ -56,9 +56,9 @@ func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType r
}
if ok == true {
log.Info("repeat subscription for customer ", customerId)
log.SInfo("repeat subscription for customer ", customerId)
} else {
log.Info("subscription for customer ", customerId)
log.SInfo("subscription for customer ", customerId)
}
}
@@ -72,7 +72,7 @@ func (ss *Subscriber) UnSubscribe(customerId string) {
customerSubscriber, ok := ss.mapCustomer[customerId]
if ok == false {
log.SWarning("failed to unsubscribe customer " + customerId)
log.SWarn("failed to unsubscribe customer ", customerId)
return
}

View File

@@ -93,7 +93,7 @@ func (tr *TopicRoom) Stop() {
func (tr *TopicRoom) topicRoomRun() {
defer tr.queueWait.Done()
log.Info("topic room ", tr.topic, " is running..")
log.SInfo("topic room ", tr.topic, " is running..")
for {
if atomic.LoadInt32(&tr.isStop) != 0 {
break
@@ -145,5 +145,5 @@ func (tr *TopicRoom) topicRoomRun() {
}
tr.customerLocker.Unlock()
log.Info("topic room ", tr.topic, " is stop")
log.SInfo("topic room ", tr.topic, " is stop")
}

View File

@@ -142,13 +142,13 @@ func (mp *MongoPersist) OnSetupRank(manual bool, rankSkip *RankSkip) error {
return nil
}
log.Info("start load rank ", rankSkip.GetRankName(), " from mongodb.")
log.SInfo("start load rank ", rankSkip.GetRankName(), " from mongodb.")
err := mp.loadFromDB(rankSkip.GetRankID(), rankSkip.GetRankName())
if err != nil {
log.SError("load from db is fail :%s", err.Error())
return err
}
log.Info("finish load rank ", rankSkip.GetRankName(), " from mongodb.")
log.SInfo("finish load rank ", rankSkip.GetRankName(), " from mongodb.")
return nil
}
@@ -296,7 +296,7 @@ func (mp *MongoPersist) saveToDB() {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]), log.String("error", errString))
log.StackError(string(buf[:l]), log.String("error", errString))
}
}()