mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-13 15:14:46 +08:00
Compare commits
1 Commits
v2.1.8
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2c0b168e66 |
14
go.mod
14
go.mod
@@ -1,11 +1,11 @@
|
||||
module github.com/duanhf2012/origin/v2
|
||||
|
||||
go 1.22
|
||||
|
||||
toolchain go1.22.7
|
||||
toolchain go1.24.1
|
||||
|
||||
require (
|
||||
github.com/IBM/sarama v1.43.3
|
||||
github.com/duanhf2012/rotatelogs v0.0.0-20250124024205-39765c212d8a
|
||||
github.com/gin-gonic/gin v1.10.0
|
||||
github.com/go-sql-driver/mysql v1.6.0
|
||||
github.com/gomodule/redigo v1.8.8
|
||||
@@ -83,11 +83,11 @@ require (
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect
|
||||
go.uber.org/multierr v1.10.0 // indirect
|
||||
golang.org/x/arch v0.8.0 // indirect
|
||||
golang.org/x/crypto v0.26.0 // indirect
|
||||
golang.org/x/net v0.28.0 // indirect
|
||||
golang.org/x/sync v0.8.0 // indirect
|
||||
golang.org/x/sys v0.23.0 // indirect
|
||||
golang.org/x/text v0.17.0 // indirect
|
||||
golang.org/x/crypto v0.36.0 // indirect
|
||||
golang.org/x/net v0.38.0 // indirect
|
||||
golang.org/x/sync v0.12.0 // indirect
|
||||
golang.org/x/sys v0.31.0 // indirect
|
||||
golang.org/x/text v0.23.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
|
||||
|
||||
22
go.sum
22
go.sum
@@ -20,6 +20,8 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/duanhf2012/rotatelogs v0.0.0-20250124024205-39765c212d8a h1:BVmZrOSKTg9ry1YjqY6IjVXmBDsFdX/W+pnvO5cPUDc=
|
||||
github.com/duanhf2012/rotatelogs v0.0.0-20250124024205-39765c212d8a/go.mod h1:S/NNkpdnXps6VXaYVVDFtqQAm/NKayHxxOAhsrFnCgg=
|
||||
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
|
||||
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
|
||||
@@ -229,8 +231,8 @@ golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPh
|
||||
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
|
||||
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
|
||||
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
|
||||
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
|
||||
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
@@ -252,8 +254,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
|
||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
|
||||
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
|
||||
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
|
||||
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
@@ -261,8 +263,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
|
||||
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
|
||||
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
@@ -275,8 +277,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
|
||||
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
|
||||
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
@@ -286,8 +288,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
|
||||
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
|
||||
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
|
||||
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
|
||||
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
|
||||
@@ -73,22 +73,9 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
handler.conns[conn] = struct{}{}
|
||||
handler.mutexConns.Unlock()
|
||||
c,ok:=conn.NetConn().(*net.TCPConn)
|
||||
if !ok {
|
||||
tlsConn,ok := conn.NetConn().(*tls.Conn)
|
||||
if !ok {
|
||||
log.Error("conn error")
|
||||
return
|
||||
}
|
||||
c,ok = tlsConn.NetConn().(*net.TCPConn)
|
||||
if !ok {
|
||||
log.Error("conn error")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
c.SetLinger(0)
|
||||
c.SetNoDelay(true)
|
||||
conn.UnderlyingConn().(*net.TCPConn).SetLinger(0)
|
||||
conn.UnderlyingConn().(*net.TCPConn).SetNoDelay(true)
|
||||
wsConn := newWSConn(conn, r.Header, handler.pendingWriteNum, handler.maxMsgLen, handler.messageType)
|
||||
agent := handler.newAgent(wsConn)
|
||||
agent.Run()
|
||||
|
||||
@@ -248,7 +248,7 @@ func (ch *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession
|
||||
select {
|
||||
case msg := <-claim.Messages():
|
||||
if msg == nil {
|
||||
log.SWarn("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
|
||||
log.SWarning("claim will exit", log.Any("topic", claim.Topic()), log.Any("Partition", claim.Partition()))
|
||||
return nil
|
||||
}
|
||||
ch.AppendMsg(session, msg)
|
||||
|
||||
@@ -86,7 +86,7 @@ func (p *Producer) asyncRun() {
|
||||
asyncReturn := sm.Metadata.(*AsyncReturn)
|
||||
asyncReturn.chanReturn <- asyncReturn
|
||||
case em := <-p.Errors():
|
||||
log.Error("async kafkamodule error", log.ErrorField("err", em.Err))
|
||||
log.Error("async kafkamodule error", log.ErrorAttr("err", em.Err))
|
||||
if em.Msg.Metadata == nil {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ func (m *MySQLModule) Begin() (*Tx, error) {
|
||||
var txDBModule Tx
|
||||
txDb, err := m.db.Begin()
|
||||
if err != nil {
|
||||
log.Error("Begin error", log.ErrorField("err",err))
|
||||
log.Error("Begin error:%s", err.Error())
|
||||
return &txDBModule, err
|
||||
}
|
||||
txDBModule.slowDuration = m.slowDuration
|
||||
@@ -155,7 +155,7 @@ func (m *MySQLModule) runPing() {
|
||||
for {
|
||||
select {
|
||||
case <-m.pingCoroutine.pintExit:
|
||||
log.Error("RunPing stopping",log.String("url", m.url),log.String("dbname", m.dbname))
|
||||
log.Error("RunPing stopping %s...", fmt.Sprintf("%T", m))
|
||||
return
|
||||
case <-m.pingCoroutine.tickerPing.C:
|
||||
if m.db != nil {
|
||||
@@ -221,12 +221,12 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
|
||||
datasetList.blur = true
|
||||
|
||||
if checkArgs(args) != nil {
|
||||
log.Error("CheckArgs is error",log.String("sql",strQuery))
|
||||
log.Error("CheckArgs is error :%s", strQuery)
|
||||
return &datasetList, fmt.Errorf("checkArgs is error")
|
||||
}
|
||||
|
||||
if db == nil {
|
||||
log.Error("cannot connect database",log.String("sql", strQuery))
|
||||
log.Error("cannot connect database:%s", strQuery)
|
||||
return &datasetList, fmt.Errorf("cannot connect database")
|
||||
}
|
||||
|
||||
@@ -235,10 +235,10 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
|
||||
timeFuncPass := time.Since(TimeFuncStart)
|
||||
|
||||
if checkSlow(slowDuration, timeFuncPass) {
|
||||
log.Error("Query slow",log.Int64("time_ms",timeFuncPass.Milliseconds()),log.String("sql", strQuery), log.Any("args",args))
|
||||
log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strQuery, args)
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("Query error", log.String("sql",strQuery),log.ErrorField("err",err))
|
||||
log.Error("Query:%s(%v)", strQuery, err)
|
||||
if rows != nil {
|
||||
rows.Close()
|
||||
}
|
||||
@@ -278,8 +278,8 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
|
||||
hasRet := rows.NextResultSet()
|
||||
|
||||
if hasRet == false {
|
||||
if rowErr :=rows.Err();rowErr != nil {
|
||||
log.Error("NextResultSet error", log.String("sql",strQuery), log.ErrorField("err",rowErr))
|
||||
if rows.Err() != nil {
|
||||
log.Error("Query:%s(%+v)", strQuery, rows)
|
||||
}
|
||||
break
|
||||
}
|
||||
@@ -291,12 +291,12 @@ func query(slowDuration time.Duration, db dbControl, strQuery string, args ...in
|
||||
func exec(slowDuration time.Duration, db dbControl, strSql string, args ...interface{}) (*DBResult, error) {
|
||||
ret := &DBResult{}
|
||||
if db == nil {
|
||||
log.Error("cannot connect database", log.String("sql",strSql))
|
||||
log.Error("cannot connect database:%s", strSql)
|
||||
return ret, fmt.Errorf("cannot connect database")
|
||||
}
|
||||
|
||||
if checkArgs(args) != nil {
|
||||
log.Error("CheckArgs is error", log.String("sql",strSql))
|
||||
log.Error("CheckArgs is error :%s", strSql)
|
||||
return ret, fmt.Errorf("checkArgs is error")
|
||||
}
|
||||
|
||||
@@ -304,10 +304,10 @@ func exec(slowDuration time.Duration, db dbControl, strSql string, args ...inter
|
||||
res, err := db.Exec(strSql, args...)
|
||||
timeFuncPass := time.Since(TimeFuncStart)
|
||||
if checkSlow(slowDuration, timeFuncPass) {
|
||||
log.Error("Exec slow",log.Int64("time_ms",timeFuncPass.Milliseconds()),log.String("sql",strSql),log.Any("args",args) )
|
||||
log.Error("DBModule QueryEx Time %s , Query :%s , args :%+v", timeFuncPass, strSql, args)
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("Exec error",log.String("sql",strSql),log.ErrorField("err", err))
|
||||
log.Error("Exec:%s(%v)", strSql, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -34,8 +34,6 @@ type WSCfg struct {
|
||||
PendingWriteNum int
|
||||
MaxMsgLen uint32
|
||||
LittleEndian bool //是否小端序
|
||||
KeyFile string
|
||||
CertFile string
|
||||
}
|
||||
|
||||
type WSPackType int8
|
||||
@@ -64,18 +62,13 @@ func (ws *WSModule) OnInit() error {
|
||||
ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen
|
||||
ws.WSServer.Addr = ws.wsCfg.ListenAddr
|
||||
|
||||
if ws.wsCfg.KeyFile != "" && ws.wsCfg.CertFile != "" {
|
||||
ws.WSServer.KeyFile = ws.wsCfg.KeyFile
|
||||
ws.WSServer.CertFile = ws.wsCfg.CertFile
|
||||
}
|
||||
|
||||
// 设置解析处理器
|
||||
//3.设置解析处理器
|
||||
ws.process.SetByteOrder(ws.wsCfg.LittleEndian)
|
||||
|
||||
ws.mapClient = make(map[string]*WSClient, ws.WSServer.MaxConnNum)
|
||||
ws.WSServer.NewAgent = ws.NewWSClient
|
||||
|
||||
// 设置网络事件处理
|
||||
//4.设置网络事件处理
|
||||
ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler)
|
||||
|
||||
return nil
|
||||
|
||||
@@ -104,11 +104,11 @@ func (cs *CustomerSubscriber) UnSubscribe() {
|
||||
func (cs *CustomerSubscriber) LoadLastIndex() {
|
||||
for {
|
||||
if atomic.LoadInt32(&cs.isStop) != 0 {
|
||||
log.SInfo("topic ", cs.topic, " out of subscription")
|
||||
log.Info("topic ", cs.topic, " out of subscription")
|
||||
break
|
||||
}
|
||||
|
||||
log.SInfo("customer ", cs.customerId, " start load last index ")
|
||||
log.Info("customer ", cs.customerId, " start load last index ")
|
||||
lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId)
|
||||
if ret == true {
|
||||
if lastIndex > 0 {
|
||||
@@ -116,18 +116,18 @@ func (cs *CustomerSubscriber) LoadLastIndex() {
|
||||
} else {
|
||||
//否则直接使用客户端发回来的
|
||||
}
|
||||
log.SInfo("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
|
||||
log.Info("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
|
||||
break
|
||||
}
|
||||
|
||||
log.SInfo("customer ", cs.customerId, " load last index is fail...")
|
||||
log.Info("customer ", cs.customerId, " load last index is fail...")
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *CustomerSubscriber) SubscribeRun() {
|
||||
defer cs.subscriber.queueWait.Done()
|
||||
log.SInfo("topic ", cs.topic, " start subscription")
|
||||
log.Info("topic ", cs.topic, " start subscription")
|
||||
|
||||
//加载之前的位置
|
||||
if cs.subscribeMethod == MethodLast {
|
||||
@@ -136,7 +136,7 @@ func (cs *CustomerSubscriber) SubscribeRun() {
|
||||
|
||||
for {
|
||||
if atomic.LoadInt32(&cs.isStop) != 0 {
|
||||
log.SInfo("topic ", cs.topic, " out of subscription")
|
||||
log.Info("topic ", cs.topic, " out of subscription")
|
||||
break
|
||||
}
|
||||
|
||||
@@ -146,14 +146,14 @@ func (cs *CustomerSubscriber) SubscribeRun() {
|
||||
|
||||
//todo 检测退出
|
||||
if cs.subscribe() == false {
|
||||
log.SInfo("topic ", cs.topic, " out of subscription")
|
||||
log.Info("topic ", cs.topic, " out of subscription")
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
//删除订阅关系
|
||||
cs.subscriber.removeCustomer(cs.customerId, cs)
|
||||
log.SInfo("topic ", cs.topic, " unsubscription")
|
||||
log.Info("topic ", cs.topic, " unsubscription")
|
||||
}
|
||||
|
||||
func (cs *CustomerSubscriber) subscribe() bool {
|
||||
|
||||
@@ -63,7 +63,7 @@ func (ms *MessageQueueService) ReadCfg() error {
|
||||
maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"]
|
||||
if ok == false {
|
||||
ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum
|
||||
log.SInfo("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
|
||||
log.Info("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
|
||||
} else {
|
||||
ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64))
|
||||
}
|
||||
@@ -71,7 +71,7 @@ func (ms *MessageQueueService) ReadCfg() error {
|
||||
memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"]
|
||||
if ok == false {
|
||||
ms.memoryQueueLen = DefaultMemoryQueueLen
|
||||
log.SInfo("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
|
||||
log.Info("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
|
||||
} else {
|
||||
ms.memoryQueueLen = int32(memoryQueueLen.(float64))
|
||||
}
|
||||
|
||||
@@ -237,7 +237,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
|
||||
defer cancelAll()
|
||||
err = cursor.All(ctxAll, &res)
|
||||
if err != nil {
|
||||
log.Error("find collect name error",log.String("topic",topic) ,log.ErrorField("err",err))
|
||||
log.Error("find collect name ", topic, " is error", log.ErrorAttr("err", err))
|
||||
return nil, false
|
||||
}
|
||||
|
||||
@@ -246,7 +246,7 @@ func (mp *MongoPersist) findTopicData(topic string, startIndex uint64, limit int
|
||||
rawData, errM := bson.Marshal(res[i])
|
||||
if errM != nil {
|
||||
if errM != nil {
|
||||
log.Error("Marshal error",log.String("topic",topic) , log.ErrorField("err", err))
|
||||
log.Error("collect name ", topic, " Marshal is error", log.ErrorAttr("err", err))
|
||||
return nil, false
|
||||
}
|
||||
continue
|
||||
@@ -391,7 +391,7 @@ func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64 {
|
||||
if e.Key == "_id" {
|
||||
errC, seq := convertToNumber[uint64](e.Value)
|
||||
if errC != nil {
|
||||
log.Error("value is error", log.ErrorField("err",errC), log.Any("val",e.Value))
|
||||
log.Error("value is error:%s,%+v, ", errC.Error(), e.Value)
|
||||
}
|
||||
|
||||
return seq
|
||||
|
||||
@@ -56,9 +56,9 @@ func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType r
|
||||
}
|
||||
|
||||
if ok == true {
|
||||
log.SInfo("repeat subscription for customer ", customerId)
|
||||
log.Info("repeat subscription for customer ", customerId)
|
||||
} else {
|
||||
log.SInfo("subscription for customer ", customerId)
|
||||
log.Info("subscription for customer ", customerId)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -72,7 +72,7 @@ func (ss *Subscriber) UnSubscribe(customerId string) {
|
||||
|
||||
customerSubscriber, ok := ss.mapCustomer[customerId]
|
||||
if ok == false {
|
||||
log.SWarn("failed to unsubscribe customer ", customerId)
|
||||
log.SWarning("failed to unsubscribe customer " + customerId)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -93,7 +93,7 @@ func (tr *TopicRoom) Stop() {
|
||||
func (tr *TopicRoom) topicRoomRun() {
|
||||
defer tr.queueWait.Done()
|
||||
|
||||
log.SInfo("topic room ", tr.topic, " is running..")
|
||||
log.Info("topic room ", tr.topic, " is running..")
|
||||
for {
|
||||
if atomic.LoadInt32(&tr.isStop) != 0 {
|
||||
break
|
||||
@@ -145,5 +145,5 @@ func (tr *TopicRoom) topicRoomRun() {
|
||||
}
|
||||
tr.customerLocker.Unlock()
|
||||
|
||||
log.SInfo("topic room ", tr.topic, " is stop")
|
||||
log.Info("topic room ", tr.topic, " is stop")
|
||||
}
|
||||
|
||||
@@ -142,13 +142,13 @@ func (mp *MongoPersist) OnSetupRank(manual bool, rankSkip *RankSkip) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.SInfo("start load rank ", rankSkip.GetRankName(), " from mongodb.")
|
||||
log.Info("start load rank ", rankSkip.GetRankName(), " from mongodb.")
|
||||
err := mp.loadFromDB(rankSkip.GetRankID(), rankSkip.GetRankName())
|
||||
if err != nil {
|
||||
log.SError("load from db is fail :%s", err.Error())
|
||||
return err
|
||||
}
|
||||
log.SInfo("finish load rank ", rankSkip.GetRankName(), " from mongodb.")
|
||||
log.Info("finish load rank ", rankSkip.GetRankName(), " from mongodb.")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -296,7 +296,7 @@ func (mp *MongoPersist) saveToDB() {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
log.StackError(string(buf[:l]), log.String("error", errString))
|
||||
log.Dump(string(buf[:l]), log.String("error", errString))
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user