Compare commits

...

6 Commits

Author SHA1 Message Date
duanhf2012
fa6039e2cb 优化异步日志 2023-08-17 15:54:36 +08:00
duanhf2012
25a672ca53 新增优化异步日志 2023-08-17 15:35:22 +08:00
duanhf2012
75f881be28 新增异步日志功能 2023-08-17 14:00:36 +08:00
duanhf2012
ef8182eec7 替换日志库为slog 2023-08-15 15:46:38 +08:00
duanhf2012
4ad8204fde 优化module包的名称 2023-08-04 17:21:23 +08:00
duanhf2012
8f15546fb1 整理README格式 2023-08-01 11:12:54 +08:00
44 changed files with 855 additions and 846 deletions

View File

@@ -64,19 +64,19 @@ cluster.json如下
"Private": false, "Private": false,
"ListenAddr":"127.0.0.1:8001", "ListenAddr":"127.0.0.1:8001",
"MaxRpcParamLen": 409600, "MaxRpcParamLen": 409600,
"CompressBytesLen": 20480, "CompressBytesLen": 20480,
"NodeName": "Node_Test1", "NodeName": "Node_Test1",
"remark":"//以_打头的表示只在本机进程不对整个子网公开", "remark":"//以_打头的表示只在本机进程不对整个子网公开",
"ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","_TcpService","HttpService","WSService"] "ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","_TcpService","HttpService","WSService"]
}, },
{ {
"NodeId": 2, "NodeId": 2,
"Private": false, "Private": false,
"ListenAddr":"127.0.0.1:8002", "ListenAddr":"127.0.0.1:8002",
"MaxRpcParamLen": 409600, "MaxRpcParamLen": 409600,
"CompressBytesLen": 20480, "CompressBytesLen": 20480,
"NodeName": "Node_Test1", "NodeName": "Node_Test1",
"remark":"//以_打头的表示只在本机进程不对整个子网公开", "remark":"//以_打头的表示只在本机进程不对整个子网公开",
"ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","TcpService","HttpService","WSService"] "ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","TcpService","HttpService","WSService"]
} }
] ]

View File

@@ -114,7 +114,7 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
//正在连接中不主动断开,只断开没有连接中的 //正在连接中不主动断开,只断开没有连接中的
if rpc.client.IsConnected() { if rpc.client.IsConnected() {
nodeInfo.status = Discard nodeInfo.status = Discard
log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr) log.Info("Discard node",log.Int("nodeId",nodeInfo.NodeId),log.String("ListenAddr", nodeInfo.ListenAddr))
return return
} }
@@ -131,7 +131,7 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
rpc.client.Close(false) rpc.client.Close(false)
} }
log.SRelease("remove node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr) log.Info("remove node ",log.Int("NodeId", nodeInfo.NodeId),log.String("ListenAddr", nodeInfo.ListenAddr))
} }
func (cls *Cluster) serviceDiscoveryDelNode(nodeId int, immediately bool) { func (cls *Cluster) serviceDiscoveryDelNode(nodeId int, immediately bool) {
@@ -176,7 +176,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
for _, serviceName := range nodeInfo.PublicServiceList { for _, serviceName := range nodeInfo.PublicServiceList {
if _, ok := mapDuplicate[serviceName]; ok == true { if _, ok := mapDuplicate[serviceName]; ok == true {
//存在重复 //存在重复
log.SError("Bad duplicate Service Cfg.") log.Error("Bad duplicate Service Cfg.")
continue continue
} }
mapDuplicate[serviceName] = nil mapDuplicate[serviceName] = nil
@@ -186,8 +186,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{} cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{}
} }
cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo
log.Info("Discovery nodeId",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList))
log.SRelease("Discovery nodeId: ", nodeInfo.NodeId, " services:", nodeInfo.PublicServiceList)
//已经存在连接,则不需要进行设置 //已经存在连接,则不需要进行设置
if _, rpcInfoOK := cls.mapRpc[nodeInfo.NodeId]; rpcInfoOK == true { if _, rpcInfoOK := cls.mapRpc[nodeInfo.NodeId]; rpcInfoOK == true {
@@ -368,7 +367,7 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId int)
for serviceName, _ := range cls.mapServiceListenRpcEvent { for serviceName, _ := range cls.mapServiceListenRpcEvent {
ser := service.GetService(serviceName) ser := service.GetService(serviceName)
if ser == nil { if ser == nil {
log.SError("cannot find service name ", serviceName) log.Error("cannot find service name "+serviceName)
continue continue
} }
@@ -386,7 +385,7 @@ func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId int, serviceNa
for sName, _ := range cls.mapServiceListenDiscoveryEvent { for sName, _ := range cls.mapServiceListenDiscoveryEvent {
ser := service.GetService(sName) ser := service.GetService(sName)
if ser == nil { if ser == nil {
log.SError("cannot find service name ", serviceName) log.Error("cannot find service",log.Any("services",serviceName))
continue continue
} }

View File

@@ -142,7 +142,7 @@ func (ds *DynamicDiscoveryMaster) RpcCastGo(serviceMethod string, args interface
func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.Empty) error { func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.Empty) error {
if req.NodeInfo == nil { if req.NodeInfo == nil {
err := errors.New("RPC_RegServiceDiscover req is error.") err := errors.New("RPC_RegServiceDiscover req is error.")
log.SError(err.Error()) log.Error(err.Error())
return err return err
} }
@@ -369,7 +369,7 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
//向Master服务同步本Node服务信息 //向Master服务同步本Node服务信息
err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) { err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) {
if err != nil { if err != nil {
log.SError("call ", RegServiceDiscover, " is fail :", err.Error()) log.Error("call "+RegServiceDiscover+" is fail :"+ err.Error())
dc.AfterFunc(time.Second*3, func(timer *timer.Timer) { dc.AfterFunc(time.Second*3, func(timer *timer.Timer) {
dc.regServiceDiscover(nodeId) dc.regServiceDiscover(nodeId)
}) })
@@ -378,7 +378,7 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
} }
}) })
if err != nil { if err != nil {
log.SError("call ", RegServiceDiscover, " is fail :", err.Error()) log.Error("call "+ RegServiceDiscover+" is fail :"+ err.Error())
} }
} }

View File

@@ -58,7 +58,7 @@ func (cls *Cluster) readServiceConfig(filepath string) (interface{}, map[string]
serviceCfg := v.(map[string]interface{}) serviceCfg := v.(map[string]interface{})
nodeId, ok := serviceCfg["NodeId"] nodeId, ok := serviceCfg["NodeId"]
if ok == false { if ok == false {
log.SFatal("NodeService list not find nodeId field") log.Fatal("NodeService list not find nodeId field")
} }
mapNodeService[int(nodeId.(float64))] = serviceCfg mapNodeService[int(nodeId.(float64))] = serviceCfg
} }

View File

@@ -50,7 +50,7 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err e
} }
if fn == nil && cb == nil { if fn == nil && cb == nil {
log.SStack("fn and cb is nil") log.Stack("fn and cb is nil")
return return
} }
@@ -66,7 +66,7 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err e
select { select {
case c.tasks <- task{queueId, fn, cb}: case c.tasks <- task{queueId, fn, cb}:
default: default:
log.SError("tasks channel is full") log.Error("tasks channel is full")
if cb != nil { if cb != nil {
c.pushAsyncDoCallbackEvent(func(err error) { c.pushAsyncDoCallbackEvent(func(err error) {
cb(errors.New("tasks channel is full")) cb(errors.New("tasks channel is full"))
@@ -81,11 +81,11 @@ func (c *Concurrent) Close() {
return return
} }
log.SRelease("wait close concurrent") log.Info("wait close concurrent")
c.dispatch.close() c.dispatch.close()
log.SRelease("concurrent has successfully exited") log.Info("concurrent has successfully exited")
} }
func (c *Concurrent) GetCallBackChannel() chan func(error) { func (c *Concurrent) GetCallBackChannel() chan func(error) {

View File

@@ -187,8 +187,7 @@ func (d *dispatch) DoCallback(cb func(err error)) {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.Dump(string(buf[:l]),log.String("error",errString))
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
} }
}() }()

View File

@@ -40,7 +40,7 @@ func (w *worker) run(waitGroup *sync.WaitGroup, t task) {
case tw := <-w.workerQueue: case tw := <-w.workerQueue:
if tw.isExistTask() { if tw.isExistTask() {
//exit goroutine //exit goroutine
log.SRelease("worker goroutine exit") log.Info("worker goroutine exit")
return return
} }
w.exec(&tw) w.exec(&tw)
@@ -59,9 +59,8 @@ func (w *worker) exec(t *task) {
t.cb = func(err error) { t.cb = func(err error) {
cb(errors.New(errString)) cb(errors.New(errString))
} }
log.Dump(string(buf[:l]),log.String("error",errString))
w.endCallFun(true,t) w.endCallFun(true,t)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
} }
}() }()

View File

@@ -11,8 +11,9 @@ type CommandFunctionCB func(args interface{}) error
var commandList []*command var commandList []*command
var programName string var programName string
const( const(
boolType valueType = iota boolType valueType = 0
stringType valueType = iota stringType valueType = 1
intType valueType = 2
) )
type command struct{ type command struct{
@@ -20,6 +21,7 @@ type command struct{
name string name string
bValue bool bValue bool
strValue string strValue string
intValue int
usage string usage string
fn CommandFunctionCB fn CommandFunctionCB
} }
@@ -29,6 +31,8 @@ func (cmd *command) execute() error{
return cmd.fn(cmd.bValue) return cmd.fn(cmd.bValue)
}else if cmd.valType == stringType { }else if cmd.valType == stringType {
return cmd.fn(cmd.strValue) return cmd.fn(cmd.strValue)
}else if cmd.valType == intType {
return cmd.fn(cmd.intValue)
}else{ }else{
return fmt.Errorf("Unknow command type.") return fmt.Errorf("Unknow command type.")
} }
@@ -72,6 +76,16 @@ func RegisterCommandBool(cmdName string, defaultValue bool, usage string,fn Comm
commandList = append(commandList,&cmd) commandList = append(commandList,&cmd)
} }
func RegisterCommandInt(cmdName string, defaultValue int, usage string,fn CommandFunctionCB){
var cmd command
cmd.valType = intType
cmd.name = cmdName
cmd.fn = fn
cmd.usage = usage
flag.IntVar(&cmd.intValue, cmdName, defaultValue, usage)
commandList = append(commandList,&cmd)
}
func RegisterCommandString(cmdName string, defaultValue string, usage string,fn CommandFunctionCB){ func RegisterCommandString(cmdName string, defaultValue string, usage string,fn CommandFunctionCB){
var cmd command var cmd command
cmd.valType = stringType cmd.valType = stringType

View File

@@ -215,7 +215,7 @@ func (processor *EventProcessor) EventHandler(ev IEvent) {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l])) log.Dump(string(buf[:l]),log.String("error",errString))
} }
}() }()
@@ -230,13 +230,13 @@ func (processor *EventProcessor) EventHandler(ev IEvent) {
func (processor *EventProcessor) castEvent(event IEvent){ func (processor *EventProcessor) castEvent(event IEvent){
if processor.mapListenerEvent == nil { if processor.mapListenerEvent == nil {
log.SError("mapListenerEvent not init!") log.Error("mapListenerEvent not init!")
return return
} }
eventProcessor,ok := processor.mapListenerEvent[event.GetEventType()] eventProcessor,ok := processor.mapListenerEvent[event.GetEventType()]
if ok == false || processor == nil{ if ok == false || processor == nil{
log.SDebug("event type ",event.GetEventType()," not listen.") log.Debug("event is not listen",log.Int("event type",int(event.GetEventType())))
return return
} }

2
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/duanhf2012/origin module github.com/duanhf2012/origin
go 1.19 go 1.21
require ( require (
github.com/go-sql-driver/mysql v1.6.0 github.com/go-sql-driver/mysql v1.6.0

View File

@@ -2,27 +2,20 @@ package log // import "go.uber.org/zap/buffer"
import ( import (
"strconv" "strconv"
"sync"
) )
const _size = 9216 const _size = 9216
type Buffer struct { type Buffer struct {
bs []byte bs []byte
mu sync.Mutex // ensures atomic writes; protects the following fields //mu sync.Mutex // ensures atomic writes; protects the following fields
} }
func (buff *Buffer) Init(){ func (buff *Buffer) Init(){
buff.bs = make([]byte,_size) buff.bs = make([]byte,_size)
} }
func (buff *Buffer) Locker() {
buff.mu.Lock()
}
func (buff *Buffer) UnLocker() {
buff.mu.Unlock()
}
// AppendByte writes a single byte to the Buffer. // AppendByte writes a single byte to the Buffer.
func (b *Buffer) AppendByte(v byte) { func (b *Buffer) AppendByte(v byte) {

146
log/handler.go Normal file
View File

@@ -0,0 +1,146 @@
package log
import (
"log/slog"
"io"
"path/filepath"
"context"
"runtime"
"runtime/debug"
"sync"
)
type IOriginHandler interface {
slog.Handler
Lock()
UnLock()
}
type BaseHandler struct {
addSource bool
w io.Writer
locker sync.Mutex
}
type OriginTextHandler struct {
BaseHandler
*slog.TextHandler
}
type OriginJsonHandler struct {
BaseHandler
*slog.JSONHandler
}
func getStrLevel(level slog.Level) string{
switch level {
case LevelTrace:
return "TRACE"
case LevelDebug:
return "DEBUG"
case LevelInfo:
return "INFO"
case LevelWarning:
return "WARNING"
case LevelError:
return "ERROR"
case LevelStack:
return "STACK"
case LevelDump:
return "DUMP"
case LevelFatal:
return "FATAL"
}
return ""
}
func defaultReplaceAttr(groups []string, a slog.Attr) slog.Attr {
if a.Key == slog.LevelKey {
level := a.Value.Any().(slog.Level)
a.Value = slog.StringValue(getStrLevel(level))
}else if a.Key == slog.TimeKey && len(groups) == 0 {
a.Value = slog.StringValue(a.Value.Time().Format("2006/01/02 15:04:05"))
}else if a.Key == slog.SourceKey {
source := a.Value.Any().(*slog.Source)
source.File = filepath.Base(source.File)
}
return a
}
func NewOriginTextHandler(level slog.Level,w io.Writer,addSource bool,replaceAttr func([]string,slog.Attr) slog.Attr) slog.Handler{
var textHandler OriginTextHandler
textHandler.addSource = addSource
textHandler.w = w
textHandler.TextHandler = slog.NewTextHandler(w,&slog.HandlerOptions{
AddSource: addSource,
Level: level,
ReplaceAttr: replaceAttr,
})
return &textHandler
}
func (oh *OriginTextHandler) Handle(context context.Context, record slog.Record) error{
oh.Fill(context,&record)
oh.locker.Lock()
defer oh.locker.Unlock()
if record.Level == LevelStack || record.Level == LevelFatal{
err := oh.TextHandler.Handle(context, record)
oh.logStack(&record)
return err
}else if record.Level == LevelDump {
strDump := record.Message
record.Message = "dump info"
err := oh.TextHandler.Handle(context, record)
oh.w.Write([]byte(strDump))
return err
}
return oh.TextHandler.Handle(context, record)
}
func (b *BaseHandler) logStack(record *slog.Record){
b.w.Write(debug.Stack())
}
func (b *BaseHandler) Lock(){
b.locker.Lock()
}
func (b *BaseHandler) UnLock(){
b.locker.Unlock()
}
func NewOriginJsonHandler(level slog.Level,w io.Writer,addSource bool,replaceAttr func([]string,slog.Attr) slog.Attr) slog.Handler{
var jsonHandler OriginJsonHandler
jsonHandler.addSource = addSource
jsonHandler.w = w
jsonHandler.JSONHandler = slog.NewJSONHandler(w,&slog.HandlerOptions{
AddSource: addSource,
Level: level,
ReplaceAttr: replaceAttr,
})
return &jsonHandler
}
func (oh *OriginJsonHandler) Handle(context context.Context, record slog.Record) error{
oh.Fill(context,&record)
if record.Level == LevelStack || record.Level == LevelFatal || record.Level == LevelDump{
record.Add("stack",debug.Stack())
}
oh.locker.Lock()
defer oh.locker.Unlock()
return oh.JSONHandler.Handle(context, record)
}
func (b *BaseHandler) Fill(context context.Context, record *slog.Record) {
if b.addSource {
var pcs [1]uintptr
runtime.Callers(7, pcs[:])
record.PC = pcs[0]
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -47,7 +47,7 @@ func (slf *HttpServer) startListen() error {
for _, caFile := range slf.caFileList { for _, caFile := range slf.caFileList {
cer, err := tls.LoadX509KeyPair(caFile.CertFile, caFile.Keyfile) cer, err := tls.LoadX509KeyPair(caFile.CertFile, caFile.Keyfile)
if err != nil { if err != nil {
log.SFatal("Load CA [",caFile.CertFile,"]-[",caFile.Keyfile,"] file is fail:",err.Error()) log.Fatal("Load CA file is fail",log.String("error",err.Error()),log.String("certFile",caFile.CertFile),log.String("keyFile",caFile.Keyfile))
return err return err
} }
tlsCaList = append(tlsCaList, cer) tlsCaList = append(tlsCaList, cer)
@@ -74,7 +74,7 @@ func (slf *HttpServer) startListen() error {
} }
if err != nil { if err != nil {
log.SFatal("Listen for address ",slf.listenAddr," failure:",err.Error()) log.Fatal("Listen failure",log.String("error",err.Error()),log.String("addr:",slf.listenAddr))
return err return err
} }

View File

@@ -3,9 +3,9 @@ package processor
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/duanhf2012/origin/network"
"reflect"
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/util/bytespool"
"reflect"
) )
type MessageJsonInfo struct { type MessageJsonInfo struct {
@@ -24,7 +24,7 @@ type JsonProcessor struct {
unknownMessageHandler UnknownMessageJsonHandler unknownMessageHandler UnknownMessageJsonHandler
connectHandler ConnectJsonHandler connectHandler ConnectJsonHandler
disconnectHandler ConnectJsonHandler disconnectHandler ConnectJsonHandler
network.INetMempool bytespool.IBytesMempool
} }
type JsonPackInfo struct { type JsonPackInfo struct {
@@ -35,7 +35,7 @@ type JsonPackInfo struct {
func NewJsonProcessor() *JsonProcessor { func NewJsonProcessor() *JsonProcessor {
processor := &JsonProcessor{mapMsg:map[uint16]MessageJsonInfo{}} processor := &JsonProcessor{mapMsg:map[uint16]MessageJsonInfo{}}
processor.INetMempool = network.NewMemAreaPool() processor.IBytesMempool = bytespool.NewMemAreaPool()
return processor return processor
} }
@@ -58,7 +58,7 @@ func (jsonProcessor *JsonProcessor ) MsgRoute(clientId uint64,msg interface{}) e
func (jsonProcessor *JsonProcessor) Unmarshal(clientId uint64,data []byte) (interface{}, error) { func (jsonProcessor *JsonProcessor) Unmarshal(clientId uint64,data []byte) (interface{}, error) {
typeStruct := struct {Type int `json:"typ"`}{} typeStruct := struct {Type int `json:"typ"`}{}
defer jsonProcessor.ReleaseByteSlice(data) defer jsonProcessor.ReleaseBytes(data)
err := json.Unmarshal(data, &typeStruct) err := json.Unmarshal(data, &typeStruct)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -106,7 +106,7 @@ func (jsonProcessor *JsonProcessor) MakeRawMsg(msgType uint16,msg []byte) *JsonP
func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){ func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){
if jsonProcessor.unknownMessageHandler==nil { if jsonProcessor.unknownMessageHandler==nil {
log.SDebug("Unknown message received from ",clientId) log.Debug("Unknown message",log.Uint64("clientId",clientId))
return return
} }

View File

@@ -3,7 +3,7 @@ package processor
import ( import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/util/bytespool"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"reflect" "reflect"
) )
@@ -26,7 +26,7 @@ type PBProcessor struct {
unknownMessageHandler UnknownMessageHandler unknownMessageHandler UnknownMessageHandler
connectHandler ConnectHandler connectHandler ConnectHandler
disconnectHandler ConnectHandler disconnectHandler ConnectHandler
network.INetMempool bytespool.IBytesMempool
} }
type PBPackInfo struct { type PBPackInfo struct {
@@ -37,7 +37,7 @@ type PBPackInfo struct {
func NewPBProcessor() *PBProcessor { func NewPBProcessor() *PBProcessor {
processor := &PBProcessor{mapMsg: map[uint16]MessageInfo{}} processor := &PBProcessor{mapMsg: map[uint16]MessageInfo{}}
processor.INetMempool = network.NewMemAreaPool() processor.IBytesMempool = bytespool.NewMemAreaPool()
return processor return processor
} }
@@ -67,7 +67,7 @@ func (pbProcessor *PBProcessor) MsgRoute(clientId uint64, msg interface{}) error
// must goroutine safe // must goroutine safe
func (pbProcessor *PBProcessor) Unmarshal(clientId uint64, data []byte) (interface{}, error) { func (pbProcessor *PBProcessor) Unmarshal(clientId uint64, data []byte) (interface{}, error) {
defer pbProcessor.ReleaseByteSlice(data) defer pbProcessor.ReleaseBytes(data)
return pbProcessor.UnmarshalWithOutRelease(clientId, data) return pbProcessor.UnmarshalWithOutRelease(clientId, data)
} }

View File

@@ -40,29 +40,29 @@ func (client *TCPClient) init() {
if client.ConnNum <= 0 { if client.ConnNum <= 0 {
client.ConnNum = 1 client.ConnNum = 1
log.SRelease("invalid ConnNum, reset to ", client.ConnNum) log.Info("invalid ConnNum",log.Int("reset", client.ConnNum))
} }
if client.ConnectInterval <= 0 { if client.ConnectInterval <= 0 {
client.ConnectInterval = 3 * time.Second client.ConnectInterval = 3 * time.Second
log.SRelease("invalid ConnectInterval, reset to ", client.ConnectInterval) log.Info("invalid ConnectInterval",log.Duration("reset", client.ConnectInterval))
} }
if client.PendingWriteNum <= 0 { if client.PendingWriteNum <= 0 {
client.PendingWriteNum = 1000 client.PendingWriteNum = 1000
log.SRelease("invalid PendingWriteNum, reset to ", client.PendingWriteNum) log.Info("invalid PendingWriteNum",log.Int("reset",client.PendingWriteNum))
} }
if client.ReadDeadline == 0 { if client.ReadDeadline == 0 {
client.ReadDeadline = 15*time.Second client.ReadDeadline = 15*time.Second
log.SRelease("invalid ReadDeadline, reset to ", int64(client.ReadDeadline.Seconds()),"s") log.Info("invalid ReadDeadline",log.Int64("reset", int64(client.ReadDeadline.Seconds())))
} }
if client.WriteDeadline == 0 { if client.WriteDeadline == 0 {
client.WriteDeadline = 15*time.Second client.WriteDeadline = 15*time.Second
log.SRelease("invalid WriteDeadline, reset to ", int64(client.WriteDeadline.Seconds()),"s") log.Info("invalid WriteDeadline",log.Int64("reset", int64(client.WriteDeadline.Seconds())))
} }
if client.NewAgent == nil { if client.NewAgent == nil {
log.SFatal("NewAgent must not be nil") log.Fatal("NewAgent must not be nil")
} }
if client.cons != nil { if client.cons != nil {
log.SFatal("client is running") log.Fatal("client is running")
} }
if client.MinMsgLen == 0 { if client.MinMsgLen == 0 {
@@ -77,7 +77,7 @@ func (client *TCPClient) init() {
maxMsgLen := client.MsgParser.getMaxMsgLen(client.LenMsgLen) maxMsgLen := client.MsgParser.getMaxMsgLen(client.LenMsgLen)
if client.MaxMsgLen > maxMsgLen { if client.MaxMsgLen > maxMsgLen {
client.MaxMsgLen = maxMsgLen client.MaxMsgLen = maxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", maxMsgLen) log.Info("invalid MaxMsgLen",log.Uint32("reset", maxMsgLen))
} }
client.cons = make(ConnSet) client.cons = make(ConnSet)
@@ -102,7 +102,7 @@ func (client *TCPClient) dial() net.Conn {
return conn return conn
} }
log.SWarning("connect to ",client.Addr," error:", err.Error()) log.Warning("connect error ",log.String("error",err.Error()), log.String("Addr",client.Addr))
time.Sleep(client.ConnectInterval) time.Sleep(client.ConnectInterval)
continue continue
} }

View File

@@ -41,7 +41,7 @@ func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser,writeDe
conn.SetWriteDeadline(time.Now().Add(writeDeadline)) conn.SetWriteDeadline(time.Now().Add(writeDeadline))
_, err := conn.Write(b) _, err := conn.Write(b)
tcpConn.msgParser.ReleaseByteSlice(b) tcpConn.msgParser.ReleaseBytes(b)
if err != nil { if err != nil {
break break
@@ -92,7 +92,7 @@ func (tcpConn *TCPConn) GetRemoteIp() string {
func (tcpConn *TCPConn) doWrite(b []byte) error{ func (tcpConn *TCPConn) doWrite(b []byte) error{
if len(tcpConn.writeChan) == cap(tcpConn.writeChan) { if len(tcpConn.writeChan) == cap(tcpConn.writeChan) {
tcpConn.ReleaseReadMsg(b) tcpConn.ReleaseReadMsg(b)
log.SError("close conn: channel full") log.Error("close conn: channel full")
tcpConn.doDestroy() tcpConn.doDestroy()
return errors.New("close conn: channel full") return errors.New("close conn: channel full")
} }
@@ -130,7 +130,7 @@ func (tcpConn *TCPConn) ReadMsg() ([]byte, error) {
} }
func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){ func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){
tcpConn.msgParser.ReleaseByteSlice(byteBuff) tcpConn.msgParser.ReleaseBytes(byteBuff)
} }
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error { func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {

View File

@@ -3,6 +3,7 @@ package network
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/duanhf2012/origin/util/bytespool"
"io" "io"
"math" "math"
) )
@@ -16,7 +17,7 @@ type MsgParser struct {
MaxMsgLen uint32 MaxMsgLen uint32
LittleEndian bool LittleEndian bool
INetMempool bytespool.IBytesMempool
} }
@@ -34,7 +35,7 @@ func (p *MsgParser) getMaxMsgLen(lenMsgLen int) uint32 {
} }
func (p *MsgParser) init(){ func (p *MsgParser) init(){
p.INetMempool = NewMemAreaPool() p.IBytesMempool = bytespool.NewMemAreaPool()
} }
// goroutine safe // goroutine safe
@@ -74,9 +75,9 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) {
} }
// data // data
msgData := p.MakeByteSlice(int(msgLen)) msgData := p.MakeBytes(int(msgLen))
if _, err := io.ReadFull(conn, msgData[:msgLen]); err != nil { if _, err := io.ReadFull(conn, msgData[:msgLen]); err != nil {
p.ReleaseByteSlice(msgData) p.ReleaseBytes(msgData)
return nil, err return nil, err
} }
@@ -99,7 +100,7 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error {
} }
//msg := make([]byte, uint32(p.lenMsgLen)+msgLen) //msg := make([]byte, uint32(p.lenMsgLen)+msgLen)
msg := p.MakeByteSlice(p.LenMsgLen+int(msgLen)) msg := p.MakeBytes(p.LenMsgLen+int(msgLen))
// write len // write len
switch p.LenMsgLen { switch p.LenMsgLen {
case 1: case 1:

View File

@@ -2,6 +2,7 @@ package network
import ( import (
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/util/bytespool"
"net" "net"
"sync" "sync"
"time" "time"
@@ -43,52 +44,52 @@ func (server *TCPServer) Start() {
func (server *TCPServer) init() { func (server *TCPServer) init() {
ln, err := net.Listen("tcp", server.Addr) ln, err := net.Listen("tcp", server.Addr)
if err != nil { if err != nil {
log.SFatal("Listen tcp error:", err.Error()) log.Fatal("Listen tcp fail",log.String("error", err.Error()))
} }
if server.MaxConnNum <= 0 { if server.MaxConnNum <= 0 {
server.MaxConnNum = Default_MaxConnNum server.MaxConnNum = Default_MaxConnNum
log.SRelease("invalid MaxConnNum, reset to ", server.MaxConnNum) log.Info("invalid MaxConnNum",log.Int("reset", server.MaxConnNum))
} }
if server.PendingWriteNum <= 0 { if server.PendingWriteNum <= 0 {
server.PendingWriteNum = Default_PendingWriteNum server.PendingWriteNum = Default_PendingWriteNum
log.SRelease("invalid PendingWriteNum, reset to ", server.PendingWriteNum) log.Info("invalid PendingWriteNum",log.Int("reset", server.PendingWriteNum))
} }
if server.LenMsgLen <= 0 { if server.LenMsgLen <= 0 {
server.LenMsgLen = Default_LenMsgLen server.LenMsgLen = Default_LenMsgLen
log.SRelease("invalid LenMsgLen, reset to ", server.LenMsgLen) log.Info("invalid LenMsgLen", log.Int("reset", server.LenMsgLen))
} }
if server.MaxMsgLen <= 0 { if server.MaxMsgLen <= 0 {
server.MaxMsgLen = Default_MaxMsgLen server.MaxMsgLen = Default_MaxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", server.MaxMsgLen) log.Info("invalid MaxMsgLen", log.Uint32("reset to", server.MaxMsgLen))
} }
maxMsgLen := server.MsgParser.getMaxMsgLen(server.LenMsgLen) maxMsgLen := server.MsgParser.getMaxMsgLen(server.LenMsgLen)
if server.MaxMsgLen > maxMsgLen { if server.MaxMsgLen > maxMsgLen {
server.MaxMsgLen = maxMsgLen server.MaxMsgLen = maxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", maxMsgLen) log.Info("invalid MaxMsgLen",log.Uint32("reset", maxMsgLen))
} }
if server.MinMsgLen <= 0 { if server.MinMsgLen <= 0 {
server.MinMsgLen = Default_MinMsgLen server.MinMsgLen = Default_MinMsgLen
log.SRelease("invalid MinMsgLen, reset to ", server.MinMsgLen) log.Info("invalid MinMsgLen",log.Uint32("reset", server.MinMsgLen))
} }
if server.WriteDeadline == 0 { if server.WriteDeadline == 0 {
server.WriteDeadline = Default_WriteDeadline server.WriteDeadline = Default_WriteDeadline
log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s") log.Info("invalid WriteDeadline",log.Int64("reset",int64(server.WriteDeadline.Seconds())))
} }
if server.ReadDeadline == 0 { if server.ReadDeadline == 0 {
server.ReadDeadline = Default_ReadDeadline server.ReadDeadline = Default_ReadDeadline
log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s") log.Info("invalid ReadDeadline",log.Int64("reset", int64(server.ReadDeadline.Seconds())))
} }
if server.NewAgent == nil { if server.NewAgent == nil {
log.SFatal("NewAgent must not be nil") log.Fatal("NewAgent must not be nil")
} }
server.ln = ln server.ln = ln
@@ -96,12 +97,12 @@ func (server *TCPServer) init() {
server.MsgParser.init() server.MsgParser.init()
} }
func (server *TCPServer) SetNetMempool(mempool INetMempool){ func (server *TCPServer) SetNetMempool(mempool bytespool.IBytesMempool){
server.INetMempool = mempool server.IBytesMempool = mempool
} }
func (server *TCPServer) GetNetMempool() INetMempool{ func (server *TCPServer) GetNetMempool() bytespool.IBytesMempool {
return server.INetMempool return server.IBytesMempool
} }
func (server *TCPServer) run() { func (server *TCPServer) run() {
@@ -121,7 +122,7 @@ func (server *TCPServer) run() {
if max := 1 * time.Second; tempDelay > max { if max := 1 * time.Second; tempDelay > max {
tempDelay = max tempDelay = max
} }
log.SRelease("accept error:",err.Error(),"; retrying in ", tempDelay) log.Info("accept fail",log.String("error",err.Error()),log.Duration("sleep time", tempDelay))
time.Sleep(tempDelay) time.Sleep(tempDelay)
continue continue
} }
@@ -135,7 +136,7 @@ func (server *TCPServer) run() {
if len(server.conns) >= server.MaxConnNum { if len(server.conns) >= server.MaxConnNum {
server.mutexConns.Unlock() server.mutexConns.Unlock()
conn.Close() conn.Close()
log.SWarning("too many connections") log.Warning("too many connections")
continue continue
} }

View File

@@ -40,29 +40,29 @@ func (client *WSClient) init() {
if client.ConnNum <= 0 { if client.ConnNum <= 0 {
client.ConnNum = 1 client.ConnNum = 1
log.SRelease("invalid ConnNum, reset to ", client.ConnNum) log.Info("invalid ConnNum",log.Int("reset", client.ConnNum))
} }
if client.ConnectInterval <= 0 { if client.ConnectInterval <= 0 {
client.ConnectInterval = 3 * time.Second client.ConnectInterval = 3 * time.Second
log.SRelease("invalid ConnectInterval, reset to ", client.ConnectInterval) log.Info("invalid ConnectInterval",log.Duration("reset", client.ConnectInterval))
} }
if client.PendingWriteNum <= 0 { if client.PendingWriteNum <= 0 {
client.PendingWriteNum = 100 client.PendingWriteNum = 100
log.SRelease("invalid PendingWriteNum, reset to ", client.PendingWriteNum) log.Info("invalid PendingWriteNum",log.Int("reset", client.PendingWriteNum))
} }
if client.MaxMsgLen <= 0 { if client.MaxMsgLen <= 0 {
client.MaxMsgLen = 4096 client.MaxMsgLen = 4096
log.SRelease("invalid MaxMsgLen, reset to ", client.MaxMsgLen) log.Info("invalid MaxMsgLen",log.Uint32("reset", client.MaxMsgLen))
} }
if client.HandshakeTimeout <= 0 { if client.HandshakeTimeout <= 0 {
client.HandshakeTimeout = 10 * time.Second client.HandshakeTimeout = 10 * time.Second
log.SRelease("invalid HandshakeTimeout, reset to ", client.HandshakeTimeout) log.Info("invalid HandshakeTimeout",log.Duration("reset", client.HandshakeTimeout))
} }
if client.NewAgent == nil { if client.NewAgent == nil {
log.SFatal("NewAgent must not be nil") log.Fatal("NewAgent must not be nil")
} }
if client.cons != nil { if client.cons != nil {
log.SFatal("client is running") log.Fatal("client is running")
} }
if client.MessageType == 0 { if client.MessageType == 0 {
@@ -83,7 +83,7 @@ func (client *WSClient) dial() *websocket.Conn {
return conn return conn
} }
log.SRelease("connect to ", client.Addr," error: ", err.Error()) log.Info("connect fail", log.String("error",err.Error()),log.String("addr",client.Addr))
time.Sleep(client.ConnectInterval) time.Sleep(client.ConnectInterval)
continue continue
} }

View File

@@ -75,7 +75,7 @@ func (wsConn *WSConn) Close() {
func (wsConn *WSConn) doWrite(b []byte) { func (wsConn *WSConn) doWrite(b []byte) {
if len(wsConn.writeChan) == cap(wsConn.writeChan) { if len(wsConn.writeChan) == cap(wsConn.writeChan) {
log.SDebug("close conn: channel full") log.Debug("close conn: channel full")
wsConn.doDestroy() wsConn.doDestroy()
return return
} }

View File

@@ -47,7 +47,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
conn, err := handler.upgrader.Upgrade(w, r, nil) conn, err := handler.upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
log.SError("upgrade error: ", err.Error()) log.Error("upgrade fail",log.String("error",err.Error()))
return return
} }
conn.SetReadLimit(int64(handler.maxMsgLen)) conn.SetReadLimit(int64(handler.maxMsgLen))
@@ -67,7 +67,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if len(handler.conns) >= handler.maxConnNum { if len(handler.conns) >= handler.maxConnNum {
handler.mutexConns.Unlock() handler.mutexConns.Unlock()
conn.Close() conn.Close()
log.SWarning("too many connections") log.Warning("too many connections")
return return
} }
handler.conns[conn] = struct{}{} handler.conns[conn] = struct{}{}
@@ -95,27 +95,27 @@ func (server *WSServer) SetMessageType(messageType int) {
func (server *WSServer) Start() { func (server *WSServer) Start() {
ln, err := net.Listen("tcp", server.Addr) ln, err := net.Listen("tcp", server.Addr)
if err != nil { if err != nil {
log.SFatal("WSServer Listen fail:", err.Error()) log.Fatal("WSServer Listen fail",log.String("error", err.Error()))
} }
if server.MaxConnNum <= 0 { if server.MaxConnNum <= 0 {
server.MaxConnNum = 100 server.MaxConnNum = 100
log.SRelease("invalid MaxConnNum, reset to ", server.MaxConnNum) log.Info("invalid MaxConnNum", log.Int("reset", server.MaxConnNum))
} }
if server.PendingWriteNum <= 0 { if server.PendingWriteNum <= 0 {
server.PendingWriteNum = 100 server.PendingWriteNum = 100
log.SRelease("invalid PendingWriteNum, reset to ", server.PendingWriteNum) log.Info("invalid PendingWriteNum", log.Int("reset", server.PendingWriteNum))
} }
if server.MaxMsgLen <= 0 { if server.MaxMsgLen <= 0 {
server.MaxMsgLen = 4096 server.MaxMsgLen = 4096
log.SRelease("invalid MaxMsgLen, reset to ", server.MaxMsgLen) log.Info("invalid MaxMsgLen", log.Uint32("reset", server.MaxMsgLen))
} }
if server.HTTPTimeout <= 0 { if server.HTTPTimeout <= 0 {
server.HTTPTimeout = 10 * time.Second server.HTTPTimeout = 10 * time.Second
log.SRelease("invalid HTTPTimeout, reset to ", server.HTTPTimeout) log.Info("invalid HTTPTimeout", log.Duration("reset", server.HTTPTimeout))
} }
if server.NewAgent == nil { if server.NewAgent == nil {
log.SFatal("NewAgent must not be nil") log.Fatal("NewAgent must not be nil")
} }
if server.CertFile != "" || server.KeyFile != "" { if server.CertFile != "" || server.KeyFile != "" {
@@ -126,7 +126,7 @@ func (server *WSServer) Start() {
config.Certificates = make([]tls.Certificate, 1) config.Certificates = make([]tls.Certificate, 1)
config.Certificates[0], err = tls.LoadX509KeyPair(server.CertFile, server.KeyFile) config.Certificates[0], err = tls.LoadX509KeyPair(server.CertFile, server.KeyFile)
if err != nil { if err != nil {
log.SFatal("LoadX509KeyPair fail:", err.Error()) log.Fatal("LoadX509KeyPair fail",log.String("error", err.Error()))
} }
ln = tls.NewListener(ln, config) ln = tls.NewListener(ln, config)

View File

@@ -11,7 +11,6 @@ import (
"github.com/duanhf2012/origin/util/buildtime" "github.com/duanhf2012/origin/util/buildtime"
"github.com/duanhf2012/origin/util/timer" "github.com/duanhf2012/origin/util/timer"
"io" "io"
slog "log"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
@@ -28,8 +27,8 @@ var preSetupService []service.IService //预安装
var profilerInterval time.Duration var profilerInterval time.Duration
var bValid bool var bValid bool
var configDir = "./config/" var configDir = "./config/"
var logLevel string = "debug"
var logPath string
type BuildOSType = int8 type BuildOSType = int8
const( const(
@@ -50,6 +49,8 @@ func init() {
console.RegisterCommandString("console", "", "<-console true|false> Turn on or off screen log output.", openConsole) console.RegisterCommandString("console", "", "<-console true|false> Turn on or off screen log output.", openConsole)
console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|release|warning|error|fatal> Set loglevel.", setLevel) console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|release|warning|error|fatal> Set loglevel.", setLevel)
console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath) console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath)
console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize)
console.RegisterCommandInt("logchannelcap", 0, "<-logchannelcap num> Set log channel cap.", setLogChannelCapNum)
console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof) console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof)
} }
@@ -144,7 +145,7 @@ func initNode(id int) {
nodeId = id nodeId = id
err := cluster.GetCluster().Init(GetNodeId(), Setup) err := cluster.GetCluster().Init(GetNodeId(), Setup)
if err != nil { if err != nil {
log.SFatal("read system config is error ", err.Error()) log.Fatal("read system config is error ",log.ErrorAttr("error",err))
} }
err = initLog() err = initLog()
@@ -168,7 +169,7 @@ func initNode(id int) {
} }
if bSetup == false { if bSetup == false {
log.SFatal("Service name "+serviceName+" configuration error") log.Fatal("Service name "+serviceName+" configuration error")
} }
} }
@@ -177,13 +178,13 @@ func initNode(id int) {
} }
func initLog() error { func initLog() error {
if logPath == "" { if log.LogPath == "" {
setLogPath("./log") setLogPath("./log")
} }
localnodeinfo := cluster.GetCluster().GetLocalNodeInfo() localnodeinfo := cluster.GetCluster().GetLocalNodeInfo()
filepre := fmt.Sprintf("%s_%d_", localnodeinfo.NodeName, localnodeinfo.NodeId) filepre := fmt.Sprintf("%s_%d_", localnodeinfo.NodeName, localnodeinfo.NodeId)
logger, err := log.New(logLevel, logPath, filepre, slog.LstdFlags|slog.Lshortfile, 10) logger, err := log.NewTextLogger(log.LogLevel,log.LogPath,filepre,true,log.LogChannelCap)
if err != nil { if err != nil {
fmt.Printf("cannot create log file!\n") fmt.Printf("cannot create log file!\n")
return err return err
@@ -248,7 +249,7 @@ func startNode(args interface{}) error {
} }
timer.StartTimer(10*time.Millisecond, 1000000) timer.StartTimer(10*time.Millisecond, 1000000)
log.SRelease("Start running server.") log.Info("Start running server.")
//2.初始化node //2.初始化node
initNode(nodeId) initNode(nodeId)
@@ -270,7 +271,7 @@ func startNode(args interface{}) error {
for bRun { for bRun {
select { select {
case <-sig: case <-sig:
log.SRelease("receipt stop signal.") log.Info("receipt stop signal.")
bRun = false bRun = false
case <-pProfilerTicker.C: case <-pProfilerTicker.C:
profiler.Report() profiler.Report()
@@ -280,7 +281,8 @@ func startNode(args interface{}) error {
//7.退出 //7.退出
service.StopAllService() service.StopAllService()
log.SRelease("Server is stop.") log.Info("Server is stop.")
log.Close()
return nil return nil
} }
@@ -304,11 +306,6 @@ func GetConfigDir() string {
return configDir return configDir
} }
func SetSysLog(strLevel string, pathname string, flag int) {
logs, _ := log.New(strLevel, pathname, "", flag, 10)
log.Export(logs)
}
func OpenProfilerReport(interval time.Duration) { func OpenProfilerReport(interval time.Duration) {
profilerInterval = interval profilerInterval = interval
} }
@@ -333,9 +330,24 @@ func setLevel(args interface{}) error {
return nil return nil
} }
logLevel = strings.TrimSpace(args.(string)) strlogLevel := strings.TrimSpace(args.(string))
if logLevel != "debug" && logLevel != "release" && logLevel != "warning" && logLevel != "error" && logLevel != "fatal" { switch strlogLevel {
return errors.New("unknown level: " + logLevel) case "trace":
log.LogLevel = log.LevelTrace
case "debug":
log.LogLevel = log.LevelDebug
case "info":
log.LogLevel = log.LevelInfo
case "warning":
log.LogLevel = log.LevelWarning
case "error":
log.LogLevel = log.LevelError
case "stack":
log.LogLevel = log.LevelStack
case "fatal":
log.LogLevel = log.LevelFatal
default:
return errors.New("unknown level: " + strlogLevel)
} }
return nil return nil
} }
@@ -344,18 +356,48 @@ func setLogPath(args interface{}) error {
if args == "" { if args == "" {
return nil return nil
} }
logPath = strings.TrimSpace(args.(string))
dir, err := os.Stat(logPath) //这个文件夹不存在 log.LogPath = strings.TrimSpace(args.(string))
dir, err := os.Stat(log.LogPath) //这个文件夹不存在
if err == nil && dir.IsDir() == false { if err == nil && dir.IsDir() == false {
return errors.New("Not found dir " + logPath) return errors.New("Not found dir " + log.LogPath)
} }
if err != nil { if err != nil {
err = os.Mkdir(logPath, os.ModePerm) err = os.Mkdir(log.LogPath, os.ModePerm)
if err != nil { if err != nil {
return errors.New("Cannot create dir " + logPath) return errors.New("Cannot create dir " + log.LogPath)
} }
} }
return nil return nil
} }
func setLogSize(args interface{}) error {
if args == "" {
return nil
}
logSize,ok := args.(int)
if ok == false{
return errors.New("param logsize is error")
}
log.LogSize = int64(logSize)*1024*1024
return nil
}
func setLogChannelCapNum(args interface{}) error {
if args == "" {
return nil
}
logChannelCap,ok := args.(int)
if ok == false{
return errors.New("param logsize is error")
}
log.LogChannelCap = logChannelCap
return nil
}

View File

@@ -167,7 +167,7 @@ func DefaultReportFunction(name string,callNum int,costTime time.Duration,record
elem = elem.Next() elem = elem.Next()
} }
log.SRelease(strReport) log.SInfo("report",strReport)
} }
func Report() { func Report() {

View File

@@ -81,14 +81,14 @@ func (bc *Client) checkRpcCallTimeout() {
pCall := bc.pending[callSeq] pCall := bc.pending[callSeq]
if pCall == nil { if pCall == nil {
bc.pendingLock.Unlock() bc.pendingLock.Unlock()
log.SError("callSeq ",callSeq," is not find") log.Error("call seq is not find",log.Uint64("seq", callSeq))
continue continue
} }
delete(bc.pending,callSeq) delete(bc.pending,callSeq)
strTimeout := strconv.FormatInt(int64(pCall.TimeOut.Seconds()), 10) strTimeout := strconv.FormatInt(int64(pCall.TimeOut.Seconds()), 10)
pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds,method is "+pCall.ServiceMethod) pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds,method is "+pCall.ServiceMethod)
log.SError(pCall.Err.Error()) log.Error("call timeout",log.String("error",pCall.Err.Error()))
bc.makeCallFail(pCall) bc.makeCallFail(pCall)
bc.pendingLock.Unlock() bc.pendingLock.Unlock()
continue continue
@@ -108,7 +108,7 @@ func (bc *Client) AddPending(call *Call) {
if call.Seq == 0 { if call.Seq == 0 {
bc.pendingLock.Unlock() bc.pendingLock.Unlock()
log.SStack("call is error.") log.Stack("call is error.")
return return
} }
@@ -160,7 +160,7 @@ func (bc *Client) cleanPending(){
} }
pCall := bc.pending[callSeq] pCall := bc.pending[callSeq]
if pCall == nil { if pCall == nil {
log.SError("callSeq ",callSeq," is not find") log.Error("call Seq is not find",log.Uint64("seq",callSeq))
continue continue
} }

View File

@@ -1,14 +1,14 @@
package rpc package rpc
import ( import (
"runtime"
"errors" "errors"
"github.com/pierrec/lz4/v4"
"fmt" "fmt"
"github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/util/bytespool"
"github.com/pierrec/lz4/v4"
"runtime"
) )
var memPool network.INetMempool = network.NewMemAreaPool() var memPool bytespool.IBytesMempool = bytespool.NewMemAreaPool()
type ICompressor interface { type ICompressor interface {
CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存传入nil时内部申请 CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存传入nil时内部申请
@@ -42,10 +42,10 @@ func (lc *Lz4Compressor) CompressBlock(src []byte) (dest []byte, err error) {
var c lz4.Compressor var c lz4.Compressor
var cnt int var cnt int
dest = memPool.MakeByteSlice(lz4.CompressBlockBound(len(src))+1) dest = memPool.MakeBytes(lz4.CompressBlockBound(len(src))+1)
cnt, err = c.CompressBlock(src, dest[1:]) cnt, err = c.CompressBlock(src, dest[1:])
if err != nil { if err != nil {
memPool.ReleaseByteSlice(dest) memPool.ReleaseBytes(dest)
return nil,err return nil,err
} }
@@ -55,7 +55,7 @@ func (lc *Lz4Compressor) CompressBlock(src []byte) (dest []byte, err error) {
} }
if ratio > 255 { if ratio > 255 {
memPool.ReleaseByteSlice(dest) memPool.ReleaseBytes(dest)
return nil,fmt.Errorf("Impermissible errors") return nil,fmt.Errorf("Impermissible errors")
} }
@@ -79,10 +79,10 @@ func (lc *Lz4Compressor) UncompressBlock(src []byte) (dest []byte, err error) {
return nil,fmt.Errorf("Impermissible errors") return nil,fmt.Errorf("Impermissible errors")
} }
dest = memPool.MakeByteSlice(len(src)*int(radio)) dest = memPool.MakeBytes(len(src)*int(radio))
cnt, err := lz4.UncompressBlock(src[1:], dest) cnt, err := lz4.UncompressBlock(src[1:], dest)
if err != nil { if err != nil {
memPool.ReleaseByteSlice(dest) memPool.ReleaseBytes(dest)
return nil,err return nil,err
} }
@@ -94,9 +94,9 @@ func (lc *Lz4Compressor) compressBlockBound(n int) int{
} }
func (lc *Lz4Compressor) CompressBufferCollection(buffer []byte){ func (lc *Lz4Compressor) CompressBufferCollection(buffer []byte){
memPool.ReleaseByteSlice(buffer) memPool.ReleaseBytes(buffer)
} }
func (lc *Lz4Compressor) UnCompressBufferCollection(buffer []byte) { func (lc *Lz4Compressor) UnCompressBufferCollection(buffer []byte) {
memPool.ReleaseByteSlice(buffer) memPool.ReleaseBytes(buffer)
} }

View File

@@ -43,7 +43,7 @@ func (lc *LClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool,
findIndex := strings.Index(serviceMethod, ".") findIndex := strings.Index(serviceMethod, ".")
if findIndex == -1 { if findIndex == -1 {
sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!") sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!")
log.SError(sErr.Error()) log.Error("call rpc fail",log.String("error",sErr.Error()))
call := MakeCall() call := MakeCall()
call.DoError(sErr) call.DoError(sErr)
@@ -100,7 +100,7 @@ func (lc *LClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi
if findIndex == -1 { if findIndex == -1 {
err := errors.New("Call serviceMethod " + serviceMethod + " is error!") err := errors.New("Call serviceMethod " + serviceMethod + " is error!")
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError(err.Error()) log.Error("serviceMethod format is error",log.String("error",err.Error()))
return emptyCancelRpc,nil return emptyCancelRpc,nil
} }

View File

@@ -46,7 +46,7 @@ func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool,
_, processor := GetProcessorType(args) _, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args) InParam, err := processor.Marshal(args)
if err != nil { if err != nil {
log.SError(err.Error()) log.Error("Marshal is fail",log.ErrorAttr("error",err))
call := MakeCall() call := MakeCall()
call.DoError(err) call.DoError(err)
return call return call
@@ -68,7 +68,7 @@ func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor
if err != nil { if err != nil {
call.Seq = 0 call.Seq = 0
log.SError(err.Error()) log.Error("marshal is fail",log.String("error",err.Error()))
call.DoError(err) call.DoError(err)
return call return call
} }
@@ -77,7 +77,7 @@ func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor
if conn == nil || conn.IsConnected()==false { if conn == nil || conn.IsConnected()==false {
call.Seq = 0 call.Seq = 0
sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect") sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect")
log.SError(sErr.Error()) log.Error("conn is disconnect",log.String("error",sErr.Error()))
call.DoError(sErr) call.DoError(sErr)
return call return call
} }
@@ -89,7 +89,7 @@ func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor
compressBuff,cErr = compressor.CompressBlock(bytes) compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil { if cErr != nil {
call.Seq = 0 call.Seq = 0
log.SError(cErr.Error()) log.Error("compress fail",log.String("error",cErr.Error()))
call.DoError(cErr) call.DoError(cErr)
return call return call
} }
@@ -109,9 +109,7 @@ func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor
} }
if err != nil { if err != nil {
rc.selfClient.RemovePending(call.Seq) rc.selfClient.RemovePending(call.Seq)
log.Error("WiteMsg is fail",log.ErrorAttr("error",err))
log.SError(err.Error())
call.Seq = 0 call.Seq = 0
call.DoError(err) call.DoError(err)
} }
@@ -191,14 +189,13 @@ func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi
return emptyCancelRpc,nil return emptyCancelRpc,nil
} }
func (rc *RClient) Run() { func (rc *RClient) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l])) log.Dump(string(buf[:l]),log.String("error",errString))
} }
}() }()
@@ -206,7 +203,7 @@ func (rc *RClient) Run() {
for { for {
bytes, err := rc.conn.ReadMsg() bytes, err := rc.conn.ReadMsg()
if err != nil { if err != nil {
log.SError("rpcClient ", rc.Addr, " ReadMsg error:", err.Error()) log.Error("rclient read msg is failed",log.ErrorAttr("error",err))
return return
} }
@@ -214,7 +211,7 @@ func (rc *RClient) Run() {
processor := GetProcessor(bytes[0]&0x7f) processor := GetProcessor(bytes[0]&0x7f)
if processor == nil { if processor == nil {
rc.conn.ReleaseReadMsg(bytes) rc.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error()) log.Error("cannot find process",log.Uint8("process type",bytes[0]&0x7f))
return return
} }
@@ -231,7 +228,7 @@ func (rc *RClient) Run() {
compressBuff,unCompressErr = compressor.UncompressBlock(byteData) compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil { if unCompressErr!= nil {
rc.conn.ReleaseReadMsg(bytes) rc.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", unCompressErr.Error()) log.Error("uncompressBlock failed",log.ErrorAttr("error",unCompressErr))
return return
} }
byteData = compressBuff byteData = compressBuff
@@ -245,19 +242,19 @@ func (rc *RClient) Run() {
rc.conn.ReleaseReadMsg(bytes) rc.conn.ReleaseReadMsg(bytes)
if err != nil { if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData) processor.ReleaseRpcResponse(response.RpcResponseData)
log.SError("rpcClient Unmarshal head error:", err.Error()) log.Error("rpcClient Unmarshal head error",log.ErrorAttr("error",err))
continue continue
} }
v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq()) v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq())
if v == nil { if v == nil {
log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending") log.Error("rpcClient cannot find seq",log.Uint64("seq",response.RpcResponseData.GetSeq()))
} else { } else {
v.Err = nil v.Err = nil
if len(response.RpcResponseData.GetReply()) > 0 { if len(response.RpcResponseData.GetReply()) > 0 {
err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply) err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply)
if err != nil { if err != nil {
log.SError("rpcClient Unmarshal body error:", err.Error()) log.Error("rpcClient Unmarshal body failed",log.ErrorAttr("error",err))
v.Err = err v.Err = err
} }
} }

View File

@@ -215,7 +215,7 @@ func (handler *RpcHandler) HandlerRpcResponseCB(call *Call) {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l])) log.Dump(string(buf[:l]),log.String("error",errString))
} }
}() }()
@@ -237,7 +237,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError("Handler Rpc ", request.RpcRequestData.GetServiceMethod(), " Core dump info[", errString, "]\n", string(buf[:l])) log.Dump(string(buf[:l]),log.String("error",errString))
rpcErr := RpcError("call error : core dumps") rpcErr := RpcError("call error : core dumps")
if request.requestHandle != nil { if request.requestHandle != nil {
request.requestHandle(nil, rpcErr) request.requestHandle(nil, rpcErr)
@@ -250,12 +250,12 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
if rawRpcId > 0 { if rawRpcId > 0 {
v, ok := handler.mapRawFunctions[rawRpcId] v, ok := handler.mapRawFunctions[rawRpcId]
if ok == false { if ok == false {
log.SError("RpcHandler cannot find request rpc id", rawRpcId) log.Error("RpcHandler cannot find request rpc id",log.Uint32("rawRpcId",rawRpcId))
return return
} }
rawData,ok := request.inParam.([]byte) rawData,ok := request.inParam.([]byte)
if ok == false { if ok == false {
log.SError("RpcHandler " + handler.rpcHandler.GetName()," cannot convert in param to []byte", rawRpcId) log.Error("RpcHandler cannot convert",log.String("RpcHandlerName",handler.rpcHandler.GetName()),log.Uint32("rawRpcId",rawRpcId))
return return
} }
@@ -267,7 +267,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
v, ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()] v, ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()]
if ok == false { if ok == false {
err := "RpcHandler " + handler.rpcHandler.GetName() + "cannot find " + request.RpcRequestData.GetServiceMethod() err := "RpcHandler " + handler.rpcHandler.GetName() + "cannot find " + request.RpcRequestData.GetServiceMethod()
log.SError(err) log.Error("HandlerRpcRequest cannot find serviceMethod",log.String("RpcHandlerName",handler.rpcHandler.GetName()),log.String("serviceMethod",request.RpcRequestData.GetServiceMethod()))
if request.requestHandle != nil { if request.requestHandle != nil {
request.requestHandle(nil, RpcError(err)) request.requestHandle(nil, RpcError(err))
} }
@@ -298,7 +298,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
paramList = append(paramList, oParam) //输出参数 paramList = append(paramList, oParam) //输出参数
} else if request.requestHandle != nil && v.hasResponder == false { //调用方有返回值,但被调用函数没有返回参数 } else if request.requestHandle != nil && v.hasResponder == false { //调用方有返回值,但被调用函数没有返回参数
rErr := "Call Rpc " + request.RpcRequestData.GetServiceMethod() + " without return parameter!" rErr := "Call Rpc " + request.RpcRequestData.GetServiceMethod() + " without return parameter!"
log.SError(rErr) log.Error("call serviceMethod without return parameter",log.String("serviceMethod",request.RpcRequestData.GetServiceMethod()))
request.requestHandle(nil, RpcError(rErr)) request.requestHandle(nil, RpcError(rErr))
return return
} }
@@ -320,7 +320,7 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param
v, ok := handler.mapFunctions[ServiceMethod] v, ok := handler.mapFunctions[ServiceMethod]
if ok == false { if ok == false {
err = errors.New("RpcHandler " + handler.rpcHandler.GetName() + " cannot find" + ServiceMethod) err = errors.New("RpcHandler " + handler.rpcHandler.GetName() + " cannot find" + ServiceMethod)
log.SError(err.Error()) log.Error("CallMethod cannot find serviceMethod",log.String("rpcHandlerName",handler.rpcHandler.GetName()),log.String("serviceMethod",ServiceMethod))
return err return err
} }
@@ -344,7 +344,7 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param
hander :=func(Returns interface{}, Err RpcError) { hander :=func(Returns interface{}, Err RpcError) {
rpcCall := client.RemovePending(callSeq) rpcCall := client.RemovePending(callSeq)
if rpcCall == nil { if rpcCall == nil {
log.SError("cannot find call seq ",callSeq) log.Error("cannot find call seq",log.Uint64("seq",callSeq))
return return
} }
@@ -431,15 +431,15 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:]) err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
if count == 0 { if count == 0 {
if err != nil { if err != nil {
log.SError("Call ", serviceMethod, " is error:", err.Error()) log.Error("call serviceMethod is failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
} else { } else {
log.SError("Can not find ", serviceMethod) log.Error("cannot find serviceMethod",log.String("serviceMethod",serviceMethod))
} }
return err return err
} }
if count > 1 && bCast == false { if count > 1 && bCast == false {
log.SError("Cannot call %s more then 1 node!", serviceMethod) log.Error("cannot call serviceMethod more then 1 node",log.String("serviceMethod",serviceMethod))
return errors.New("cannot call more then 1 node") return errors.New("cannot call more then 1 node")
} }
@@ -460,14 +460,14 @@ func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId int, serviceMeth
var pClientList [maxClusterNode]*Client var pClientList [maxClusterNode]*Client
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:]) err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
if err != nil { if err != nil {
log.SError("Call serviceMethod is error:", err.Error()) log.Error("Call serviceMethod is failed",log.ErrorAttr("error",err))
return err return err
} else if count <= 0 { } else if count <= 0 {
err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod) err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod)
log.SError(err.Error()) log.Error("cannot find serviceMethod",log.String("serviceMethod",serviceMethod))
return err return err
} else if count > 1 { } else if count > 1 {
log.SError("Cannot call more then 1 node!") log.Error("Cannot call more then 1 node!",log.String("serviceMethod",serviceMethod))
return errors.New("cannot call more then 1 node") return errors.New("cannot call more then 1 node")
} }
@@ -484,19 +484,19 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId int, servic
fVal := reflect.ValueOf(callback) fVal := reflect.ValueOf(callback)
if fVal.Kind() != reflect.Func { if fVal.Kind() != reflect.Func {
err := errors.New("call " + serviceMethod + " input callback param is error!") err := errors.New("call " + serviceMethod + " input callback param is error!")
log.SError(err.Error()) log.Error("input callback param is error",log.String("serviceMethod",serviceMethod))
return emptyCancelRpc,err return emptyCancelRpc,err
} }
if fVal.Type().NumIn() != 2 { if fVal.Type().NumIn() != 2 {
err := errors.New("call " + serviceMethod + " callback param function is error!") err := errors.New("call " + serviceMethod + " callback param function is error!")
log.SError(err.Error()) log.Error("callback param function is error",log.String("serviceMethod",serviceMethod))
return emptyCancelRpc,err return emptyCancelRpc,err
} }
if fVal.Type().In(0).Kind() != reflect.Ptr || fVal.Type().In(1).String() != "error" { if fVal.Type().In(0).Kind() != reflect.Ptr || fVal.Type().In(1).String() != "error" {
err := errors.New("call " + serviceMethod + " callback param function is error!") err := errors.New("call " + serviceMethod + " callback param function is error!")
log.SError(err.Error()) log.Error("callback param function is error",log.String("serviceMethod",serviceMethod))
return emptyCancelRpc,err return emptyCancelRpc,err
} }
@@ -512,14 +512,14 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId int, servic
} }
} }
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError("Call serviceMethod is error:", err.Error()) log.Error("cannot find serviceMethod from node",log.String("serviceMethod",serviceMethod),log.Int("nodeId",nodeId))
return emptyCancelRpc,nil return emptyCancelRpc,nil
} }
if count > 1 { if count > 1 {
err := errors.New("cannot call more then 1 node") err := errors.New("cannot call more then 1 node")
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError(err.Error()) log.Error("cannot call more then 1 node",log.String("serviceMethod",serviceMethod))
return emptyCancelRpc,nil return emptyCancelRpc,nil
} }
@@ -587,12 +587,12 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i
processor := GetProcessor(uint8(rpcProcessorType)) processor := GetProcessor(uint8(rpcProcessorType))
err, count := handler.funcRpcClient(nodeId, serviceName, handler.pClientList) err, count := handler.funcRpcClient(nodeId, serviceName, handler.pClientList)
if count == 0 || err != nil { if count == 0 || err != nil {
log.SError("Call serviceMethod is error:", err.Error()) log.Error("call serviceMethod is failed",log.ErrorAttr("error",err))
return err return err
} }
if count > 1 { if count > 1 {
err := errors.New("cannot call more then 1 node") err := errors.New("cannot call more then 1 node")
log.SError(err.Error()) log.Error("cannot call more then 1 node",log.String("serviceName",serviceName))
return err return err
} }

View File

@@ -69,7 +69,7 @@ const Default_ReadWriteDeadline = 15*time.Second
func (server *Server) Start(listenAddr string, maxRpcParamLen uint32,compressBytesLen int) { func (server *Server) Start(listenAddr string, maxRpcParamLen uint32,compressBytesLen int) {
splitAddr := strings.Split(listenAddr, ":") splitAddr := strings.Split(listenAddr, ":")
if len(splitAddr) != 2 { if len(splitAddr) != 2 {
log.SFatal("listen addr is error :", listenAddr) log.Fatal("listen addr is failed", log.String("listenAddr",listenAddr))
} }
server.rpcServer.Addr = ":" + splitAddr[1] server.rpcServer.Addr = ":" + splitAddr[1]
@@ -111,7 +111,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData) defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if errM != nil { if errM != nil {
log.SError("service method ", serviceMethod, " Marshal error:", errM.Error()) log.Error("mashal RpcResponseData failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",errM))
return return
} }
@@ -122,7 +122,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
compressBuff,cErr = compressor.CompressBlock(bytes) compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil { if cErr != nil {
log.SError("service method ", serviceMethod, " CompressBlock error:", cErr.Error()) log.Error("CompressBlock failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",cErr))
return return
} }
if len(compressBuff) < len(bytes) { if len(compressBuff) < len(bytes) {
@@ -136,7 +136,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
compressor.CompressBufferCollection(compressBuff) compressor.CompressBufferCollection(compressBuff)
} }
if errM != nil { if errM != nil {
log.SError("Rpc ", serviceMethod, " return is error:", errM.Error()) log.Error("WriteMsg error,Rpc return is fail",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",errM))
} }
} }
@@ -144,7 +144,7 @@ func (agent *RpcAgent) Run() {
for { for {
data, err := agent.conn.ReadMsg() data, err := agent.conn.ReadMsg()
if err != nil { if err != nil {
log.SError("remoteAddress:", agent.conn.RemoteAddr().String(), ",read message: ", err.Error()) log.Error("read message is error",log.String("remoteAddress",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err))
//will close tcpconn //will close tcpconn
break break
} }
@@ -153,7 +153,7 @@ func (agent *RpcAgent) Run() {
processor := GetProcessor(data[0]&0x7f) processor := GetProcessor(data[0]&0x7f)
if processor == nil { if processor == nil {
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
log.SError("remote rpc ", agent.conn.RemoteAddr().String(), " cannot find processor:", data[0]) log.Warning("cannot find processor",log.String("RemoteAddr",agent.conn.RemoteAddr().String()))
return return
} }
@@ -166,7 +166,7 @@ func (agent *RpcAgent) Run() {
compressBuff,unCompressErr = compressor.UncompressBlock(byteData) compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil { if unCompressErr!= nil {
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
log.SError("rpcClient ", agent.conn.RemoteAddr().String(), " ReadMsg head error:", unCompressErr.Error()) log.Error("UncompressBlock failed",log.String("RemoteAddr",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",unCompressErr))
return return
} }
byteData = compressBuff byteData = compressBuff
@@ -179,7 +179,7 @@ func (agent *RpcAgent) Run() {
} }
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
if err != nil { if err != nil {
log.SError("rpc Unmarshal request is error:", err.Error()) log.Error("Unmarshal failed",log.String("RemoteAddr",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err))
if req.RpcRequestData.GetSeq() > 0 { if req.RpcRequestData.GetSeq() > 0 {
rpcError := RpcError(err.Error()) rpcError := RpcError(err.Error())
if req.RpcRequestData.IsNoReply() == false { if req.RpcRequestData.IsNoReply() == false {
@@ -201,7 +201,7 @@ func (agent *RpcAgent) Run() {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
} }
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
log.SError("rpc request req.ServiceMethod is error") log.Error("rpc request req.ServiceMethod is error")
continue continue
} }
@@ -211,8 +211,7 @@ func (agent *RpcAgent) Run() {
if req.RpcRequestData.IsNoReply() == false { if req.RpcRequestData.IsNoReply() == false {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError) agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
} }
log.Error("serviceMethod not config",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()))
log.SError("service method ", req.RpcRequestData.GetServiceMethod(), " not config!")
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
continue continue
} }
@@ -232,7 +231,7 @@ func (agent *RpcAgent) Run() {
} else { } else {
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
} }
log.SError(rErr) log.Error("call rpc param error",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()),log.ErrorAttr("error",err))
continue continue
} }
@@ -281,7 +280,7 @@ func (server *Server) myselfRpcHandlerGo(client *Client,handlerName string, serv
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil { if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!") err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error()) log.Error("service method not config",log.String("serviceMethod",serviceMethod))
return err return err
} }
@@ -297,7 +296,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil { if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!") err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error()) log.Error("service method not config",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
pCall.Seq = 0 pCall.Seq = 0
pCall.DoError(err) pCall.DoError(err)
@@ -314,7 +313,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
iParam,err = processor.Clone(args) iParam,err = processor.Clone(args)
if err != nil { if err != nil {
sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(sErr.Error()) log.Error("deep copy inParam is failed",log.String("handlerName",handlerName),log.String("serviceMethod",serviceMethod))
pCall.Seq = 0 pCall.Seq = 0
pCall.DoError(sErr) pCall.DoError(sErr)
@@ -329,7 +328,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
var err error var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs) req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil { if err != nil {
log.SError(err.Error()) log.Error("unmarshalInParam is failed",log.String("serviceMethod",serviceMethod),log.Uint32("rpcMethodId",rpcMethodId),log.ErrorAttr("error",err))
pCall.Seq = 0 pCall.Seq = 0
pCall.DoError(err) pCall.DoError(err)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
@@ -345,12 +344,12 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
byteReturns, err := req.rpcProcessor.Marshal(Returns) byteReturns, err := req.rpcProcessor.Marshal(Returns)
if err != nil { if err != nil {
Err = ConvertError(err) Err = ConvertError(err)
log.SError("returns data cannot be marshal,callSeq is ", callSeq," error is ",err.Error()) log.Error("returns data cannot be marshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err))
}else{ }else{
err = req.rpcProcessor.Unmarshal(byteReturns, reply) err = req.rpcProcessor.Unmarshal(byteReturns, reply)
if err != nil { if err != nil {
Err = ConvertError(err) Err = ConvertError(err)
log.SError("returns data cannot be Unmarshal,callSeq is ", callSeq," error is ",err.Error()) log.Error("returns data cannot be Unmarshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err))
} }
} }
} }
@@ -358,8 +357,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
v := client.RemovePending(callSeq) v := client.RemovePending(callSeq)
if v == nil { if v == nil {
log.SError("rpcClient cannot find seq ",callSeq, " in pending") log.Error("rpcClient cannot find seq",log.Uint64("seq",callSeq))
return return
} }
@@ -367,7 +365,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
v.Err = nil v.Err = nil
v.DoOK() v.DoOK()
} else { } else {
log.SError(Err.Error()) log.Error(Err.Error())
v.DoError(Err) v.DoError(Err)
} }
} }
@@ -375,7 +373,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
err := rpcHandler.PushRpcRequest(req) err := rpcHandler.PushRpcRequest(req)
if err != nil { if err != nil {
log.SError(err.Error()) log.Error(err.Error())
pCall.DoError(err) pCall.DoError(err)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
} }
@@ -387,7 +385,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Cl
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil { if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!") err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error()) log.Error(err.Error())
return emptyCancelRpc,err return emptyCancelRpc,err
} }
@@ -395,7 +393,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Cl
iParam,err := processor.Clone(args) iParam,err := processor.Clone(args)
if err != nil { if err != nil {
errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(errM.Error()) log.Error(errM.Error())
return emptyCancelRpc,errM return emptyCancelRpc,errM
} }

View File

@@ -117,7 +117,7 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
m.child[module.GetModuleId()] = module m.child[module.GetModuleId()] = module
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
log.SDebug("Add module ", module.GetModuleName(), " completed") log.Debug("Add module "+module.GetModuleName()+ " completed")
return module.GetModuleId(), nil return module.GetModuleId(), nil
} }
@@ -131,7 +131,7 @@ func (m *Module) ReleaseModule(moduleId uint32) {
pModule.self.OnRelease() pModule.self.OnRelease()
pModule.GetEventHandler().Destroy() pModule.GetEventHandler().Destroy()
log.SDebug("Release module ", pModule.GetModuleName()) log.Debug("Release module "+ pModule.GetModuleName())
for pTimer := range pModule.mapActiveTimer { for pTimer := range pModule.mapActiveTimer {
pTimer.Cancel() pTimer.Cancel()
} }
@@ -278,18 +278,18 @@ func (m *Module) SafeNewTicker(tickerId *uint64, d time.Duration, AdditionData i
func (m *Module) CancelTimerId(timerId *uint64) bool { func (m *Module) CancelTimerId(timerId *uint64) bool {
if timerId==nil || *timerId == 0 { if timerId==nil || *timerId == 0 {
log.SWarning("timerId is invalid") log.Warning("timerId is invalid")
return false return false
} }
if m.mapActiveIdTimer == nil { if m.mapActiveIdTimer == nil {
log.SError("mapActiveIdTimer is nil") log.Error("mapActiveIdTimer is nil")
return false return false
} }
t, ok := m.mapActiveIdTimer[*timerId] t, ok := m.mapActiveIdTimer[*timerId]
if ok == false { if ok == false {
log.SStack("cannot find timer id ", timerId) log.Stack("cannot find timer id ", log.Uint64("timerId",*timerId))
return false return false
} }

View File

@@ -93,7 +93,7 @@ func (s *Service) OnSetup(iService IService){
func (s *Service) OpenProfiler() { func (s *Service) OpenProfiler() {
s.profiler = profiler.RegProfiler(s.GetName()) s.profiler = profiler.RegProfiler(s.GetName())
if s.profiler==nil { if s.profiler==nil {
log.SFatal("rofiler.RegProfiler ",s.GetName()," fail.") log.Fatal("rofiler.RegProfiler "+s.GetName()+" fail.")
} }
} }
@@ -128,7 +128,7 @@ func (s *Service) Start() {
s.wg.Add(1) s.wg.Add(1)
waitRun.Add(1) waitRun.Add(1)
go func(){ go func(){
log.SRelease(s.GetName()," service is running",) log.Info(s.GetName()+" service is running",)
waitRun.Done() waitRun.Done()
s.Run() s.Run()
}() }()
@@ -158,12 +158,12 @@ func (s *Service) Run() {
case event.ServiceRpcRequestEvent: case event.ServiceRpcRequestEvent:
cEvent,ok := ev.(*event.Event) cEvent,ok := ev.(*event.Event)
if ok == false { if ok == false {
log.SError("Type event conversion error") log.Error("Type event conversion error")
break break
} }
rpcRequest,ok := cEvent.Data.(*rpc.RpcRequest) rpcRequest,ok := cEvent.Data.(*rpc.RpcRequest)
if ok == false { if ok == false {
log.SError("Type *rpc.RpcRequest conversion error") log.Error("Type *rpc.RpcRequest conversion error")
break break
} }
if s.profiler!=nil { if s.profiler!=nil {
@@ -179,12 +179,12 @@ func (s *Service) Run() {
case event.ServiceRpcResponseEvent: case event.ServiceRpcResponseEvent:
cEvent,ok := ev.(*event.Event) cEvent,ok := ev.(*event.Event)
if ok == false { if ok == false {
log.SError("Type event conversion error") log.Error("Type event conversion error")
break break
} }
rpcResponseCB,ok := cEvent.Data.(*rpc.Call) rpcResponseCB,ok := cEvent.Data.(*rpc.Call)
if ok == false { if ok == false {
log.SError("Type *rpc.Call conversion error") log.Error("Type *rpc.Call conversion error")
break break
} }
if s.profiler!=nil { if s.profiler!=nil {
@@ -242,7 +242,7 @@ func (s *Service) Release(){
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l])) log.Dump(string(buf[:l]),log.String("error",errString))
} }
}() }()
@@ -257,10 +257,10 @@ func (s *Service) OnInit() error {
} }
func (s *Service) Stop(){ func (s *Service) Stop(){
log.SRelease("stop ",s.GetName()," service ") log.Info("stop "+s.GetName()+" service ")
close(s.closeSig) close(s.closeSig)
s.wg.Wait() s.wg.Wait()
log.SRelease(s.GetName()," service has been stopped") log.Info(s.GetName()+" service has been stopped")
} }
func (s *Service) GetServiceCfg()interface{}{ func (s *Service) GetServiceCfg()interface{}{
@@ -353,7 +353,7 @@ func (s *Service) PushEvent(ev event.IEvent) error{
func (s *Service) pushEvent(ev event.IEvent) error{ func (s *Service) pushEvent(ev event.IEvent) error{
if len(s.chanEvent) >= maxServiceEventChannelNum { if len(s.chanEvent) >= maxServiceEventChannelNum {
err := errors.New("The event channel in the service is full") err := errors.New("The event channel in the service is full")
log.SError(err.Error()) log.Error(err.Error())
return err return err
} }
@@ -380,7 +380,7 @@ func (s *Service) SetEventChannelNum(num int){
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool { func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程 //已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程
if s.startStatus == true || s.profiler!=nil { if s.startStatus == true || s.profiler!=nil {
log.SError("open profiler mode is not allowed to set Multi-coroutine.") log.Error("open profiler mode is not allowed to set Multi-coroutine.")
return false return false
} }

View File

@@ -1,4 +1,4 @@
package mysqlmondule package mysqlmodule
import ( import (
"database/sql" "database/sql"

View File

@@ -1,4 +1,4 @@
package mysqlmondule package mysqlmodule
import ( import (
"testing" "testing"

View File

@@ -104,11 +104,11 @@ func (cs *CustomerSubscriber) UnSubscribe() {
func (cs *CustomerSubscriber) LoadLastIndex() { func (cs *CustomerSubscriber) LoadLastIndex() {
for { for {
if atomic.LoadInt32(&cs.isStop) != 0 { if atomic.LoadInt32(&cs.isStop) != 0 {
log.SRelease("topic ", cs.topic, " out of subscription") log.Info("topic ", cs.topic, " out of subscription")
break break
} }
log.SRelease("customer ", cs.customerId, " start load last index ") log.Info("customer ", cs.customerId, " start load last index ")
lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId) lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId)
if ret == true { if ret == true {
if lastIndex > 0 { if lastIndex > 0 {
@@ -116,18 +116,18 @@ func (cs *CustomerSubscriber) LoadLastIndex() {
} else { } else {
//否则直接使用客户端发回来的 //否则直接使用客户端发回来的
} }
log.SRelease("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex) log.Info("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
break break
} }
log.SRelease("customer ", cs.customerId, " load last index is fail...") log.Info("customer ", cs.customerId, " load last index is fail...")
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }
} }
func (cs *CustomerSubscriber) SubscribeRun() { func (cs *CustomerSubscriber) SubscribeRun() {
defer cs.subscriber.queueWait.Done() defer cs.subscriber.queueWait.Done()
log.SRelease("topic ", cs.topic, " start subscription") log.Info("topic ", cs.topic, " start subscription")
//加载之前的位置 //加载之前的位置
if cs.subscribeMethod == MethodLast { if cs.subscribeMethod == MethodLast {
@@ -136,7 +136,7 @@ func (cs *CustomerSubscriber) SubscribeRun() {
for { for {
if atomic.LoadInt32(&cs.isStop) != 0 { if atomic.LoadInt32(&cs.isStop) != 0 {
log.SRelease("topic ", cs.topic, " out of subscription") log.Info("topic ", cs.topic, " out of subscription")
break break
} }
@@ -146,14 +146,14 @@ func (cs *CustomerSubscriber) SubscribeRun() {
//todo 检测退出 //todo 检测退出
if cs.subscribe() == false { if cs.subscribe() == false {
log.SRelease("topic ", cs.topic, " out of subscription") log.Info("topic ", cs.topic, " out of subscription")
break break
} }
} }
//删除订阅关系 //删除订阅关系
cs.subscriber.removeCustomer(cs.customerId, cs) cs.subscriber.removeCustomer(cs.customerId, cs)
log.SRelease("topic ", cs.topic, " unsubscription") log.Info("topic ", cs.topic, " unsubscription")
} }
func (cs *CustomerSubscriber) subscribe() bool { func (cs *CustomerSubscriber) subscribe() bool {

View File

@@ -63,7 +63,7 @@ func (ms *MessageQueueService) ReadCfg() error {
maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"] maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"]
if ok == false { if ok == false {
ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum
log.SRelease("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum) log.Info("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
} else { } else {
ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64)) ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64))
} }
@@ -71,7 +71,7 @@ func (ms *MessageQueueService) ReadCfg() error {
memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"] memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"]
if ok == false { if ok == false {
ms.memoryQueueLen = DefaultMemoryQueueLen ms.memoryQueueLen = DefaultMemoryQueueLen
log.SRelease("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen) log.Info("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
} else { } else {
ms.memoryQueueLen = int32(memoryQueueLen.(float64)) ms.memoryQueueLen = int32(memoryQueueLen.(float64))
} }

View File

@@ -56,9 +56,9 @@ func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType r
} }
if ok == true { if ok == true {
log.SRelease("repeat subscription for customer ", customerId) log.Info("repeat subscription for customer ", customerId)
} else { } else {
log.SRelease("subscription for customer ", customerId) log.Info("subscription for customer ", customerId)
} }
} }

View File

@@ -93,7 +93,7 @@ func (tr *TopicRoom) Stop() {
func (tr *TopicRoom) topicRoomRun() { func (tr *TopicRoom) topicRoomRun() {
defer tr.queueWait.Done() defer tr.queueWait.Done()
log.SRelease("topic room ", tr.topic, " is running..") log.Info("topic room ", tr.topic, " is running..")
for { for {
if atomic.LoadInt32(&tr.isStop) != 0 { if atomic.LoadInt32(&tr.isStop) != 0 {
break break
@@ -145,5 +145,5 @@ func (tr *TopicRoom) topicRoomRun() {
} }
tr.customerLocker.Unlock() tr.customerLocker.Unlock()
log.SRelease("topic room ", tr.topic, " is stop") log.Info("topic room ", tr.topic, " is stop")
} }

View File

@@ -142,13 +142,13 @@ func (mp *MongoPersist) OnSetupRank(manual bool,rankSkip *RankSkip) error{
return nil return nil
} }
log.SRelease("start load rank ",rankSkip.GetRankName()," from mongodb.") log.Info("start load rank ",rankSkip.GetRankName()," from mongodb.")
err := mp.loadFromDB(rankSkip.GetRankID(),rankSkip.GetRankName()) err := mp.loadFromDB(rankSkip.GetRankID(),rankSkip.GetRankName())
if err != nil { if err != nil {
log.SError("load from db is fail :%s",err.Error()) log.SError("load from db is fail :%s",err.Error())
return err return err
} }
log.SRelease("finish load rank ",rankSkip.GetRankName()," from mongodb.") log.Info("finish load rank ",rankSkip.GetRankName()," from mongodb.")
return nil return nil
} }
@@ -297,7 +297,7 @@ func (mp *MongoPersist) saveToDB(){
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError(" Core dump info[", errString, "]\n", string(buf[:l])) log.Dump(string(buf[:l]),log.String("error",errString))
} }
}() }()

View File

@@ -8,10 +8,11 @@ import (
"github.com/duanhf2012/origin/network/processor" "github.com/duanhf2012/origin/network/processor"
"github.com/duanhf2012/origin/node" "github.com/duanhf2012/origin/node"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"sync/atomic" "github.com/duanhf2012/origin/util/bytespool"
"sync"
"time"
"runtime" "runtime"
"sync"
"sync/atomic"
"time"
) )
type TcpService struct { type TcpService struct {
@@ -170,7 +171,7 @@ func (slf *Client) Run() {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l])) log.Dump(string(buf[:l]),log.String("error",errString))
} }
}() }()
@@ -183,7 +184,7 @@ func (slf *Client) Run() {
slf.tcpConn.SetReadDeadline(slf.tcpService.tcpServer.ReadDeadline) slf.tcpConn.SetReadDeadline(slf.tcpService.tcpServer.ReadDeadline)
bytes,err := slf.tcpConn.ReadMsg() bytes,err := slf.tcpConn.ReadMsg()
if err != nil { if err != nil {
log.SDebug("read client id ",slf.id," is error:",err.Error()) log.Debug("read client failed",log.ErrorAttr("error",err),log.Uint64("clientId",slf.id))
break break
} }
data,err:=slf.tcpService.process.Unmarshal(slf.id,bytes) data,err:=slf.tcpService.process.Unmarshal(slf.id,bytes)
@@ -277,14 +278,14 @@ func (tcpService *TcpService) GetConnNum() int {
return connNum return connNum
} }
func (server *TcpService) SetNetMempool(mempool network.INetMempool){ func (server *TcpService) SetNetMempool(mempool bytespool.IBytesMempool){
server.tcpServer.SetNetMempool(mempool) server.tcpServer.SetNetMempool(mempool)
} }
func (server *TcpService) GetNetMempool() network.INetMempool{ func (server *TcpService) GetNetMempool() bytespool.IBytesMempool {
return server.tcpServer.GetNetMempool() return server.tcpServer.GetNetMempool()
} }
func (server *TcpService) ReleaseNetMem(byteBuff []byte) { func (server *TcpService) ReleaseNetMem(byteBuff []byte) {
server.tcpServer.GetNetMempool().ReleaseByteSlice(byteBuff) server.tcpServer.GetNetMempool().ReleaseBytes(byteBuff)
} }

View File

@@ -1,12 +1,12 @@
package network package bytespool
import ( import (
"sync" "sync"
) )
type INetMempool interface { type IBytesMempool interface {
MakeByteSlice(size int) []byte MakeBytes(size int) []byte
ReleaseByteSlice(byteBuff []byte) bool ReleaseBytes(byteBuff []byte) bool
} }
type memAreaPool struct { type memAreaPool struct {
@@ -68,7 +68,7 @@ func (areaPool *memAreaPool) releaseByteSlice(byteBuff []byte) bool {
return true return true
} }
func (areaPool *memAreaPool) MakeByteSlice(size int) []byte { func (areaPool *memAreaPool) MakeBytes(size int) []byte {
for i := 0; i < len(memAreaPoolList); i++ { for i := 0; i < len(memAreaPoolList); i++ {
if size <= memAreaPoolList[i].maxAreaValue { if size <= memAreaPoolList[i].maxAreaValue {
return memAreaPoolList[i].makeByteSlice(size) return memAreaPoolList[i].makeByteSlice(size)
@@ -78,7 +78,7 @@ func (areaPool *memAreaPool) MakeByteSlice(size int) []byte {
return make([]byte, size) return make([]byte, size)
} }
func (areaPool *memAreaPool) ReleaseByteSlice(byteBuff []byte) bool { func (areaPool *memAreaPool) ReleaseBytes(byteBuff []byte) bool {
for i := 0; i < len(memAreaPoolList); i++ { for i := 0; i < len(memAreaPoolList); i++ {
if cap(byteBuff) <= memAreaPoolList[i].maxAreaValue { if cap(byteBuff) <= memAreaPoolList[i].maxAreaValue {
return memAreaPoolList[i].releaseByteSlice(byteBuff) return memAreaPoolList[i].releaseByteSlice(byteBuff)

View File

@@ -42,9 +42,9 @@ func Abs[NumType SignedNumberType](Num NumType) NumType {
func Add[NumType NumberType](number1 NumType, number2 NumType) NumType { func Add[NumType NumberType](number1 NumType, number2 NumType) NumType {
ret := number1 + number2 ret := number1 + number2
if number2> 0 && ret < number1 { if number2> 0 && ret < number1 {
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2) log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
}else if (number2<0 && ret > number1){ }else if (number2<0 && ret > number1){
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2) log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
} }
return ret return ret
@@ -53,9 +53,9 @@ func Add[NumType NumberType](number1 NumType, number2 NumType) NumType {
func Sub[NumType NumberType](number1 NumType, number2 NumType) NumType { func Sub[NumType NumberType](number1 NumType, number2 NumType) NumType {
ret := number1 - number2 ret := number1 - number2
if number2> 0 && ret > number1 { if number2> 0 && ret > number1 {
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2) log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
}else if (number2<0 && ret < number1){ }else if (number2<0 && ret < number1){
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2) log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
} }
return ret return ret
@@ -72,7 +72,7 @@ func Mul[NumType NumberType](number1 NumType, number2 NumType) NumType {
return ret return ret
} }
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2) log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
return ret return ret
} }

View File

@@ -132,7 +132,7 @@ func (t *Timer) Do() {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l])) log.Dump(string(buf[:l]),log.String("error",errString))
} }
}() }()
@@ -218,7 +218,7 @@ func (c *Cron) Do() {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l])) log.Dump(string(buf[:l]),log.String("error",errString))
} }
}() }()
@@ -274,7 +274,7 @@ func (c *Ticker) Do() {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l])) log.Dump(string(buf[:l]),log.String("error",errString))
} }
}() }()