Compare commits

...

10 Commits

Author SHA1 Message Date
duanhf2012
2d1bee4dea 优化日志输出格式 2023-09-01 17:44:59 +08:00
duanhf2012
fa8cbfb40e 优化rpc处理异常问题 2023-08-29 18:23:20 +08:00
duanhf2012
388b946401 优化日志的ErrorAttr 2023-08-24 11:34:43 +08:00
duanhf2012
582a0faa6f 替换gogoprotobuf为google标准库 2023-08-21 15:44:55 +08:00
duanhf2012
fa6039e2cb 优化异步日志 2023-08-17 15:54:36 +08:00
duanhf2012
25a672ca53 新增优化异步日志 2023-08-17 15:35:22 +08:00
duanhf2012
75f881be28 新增异步日志功能 2023-08-17 14:00:36 +08:00
duanhf2012
ef8182eec7 替换日志库为slog 2023-08-15 15:46:38 +08:00
duanhf2012
4ad8204fde 优化module包的名称 2023-08-04 17:21:23 +08:00
duanhf2012
8f15546fb1 整理README格式 2023-08-01 11:12:54 +08:00
54 changed files with 3305 additions and 9458 deletions

View File

@@ -64,19 +64,19 @@ cluster.json如下
"Private": false,
"ListenAddr":"127.0.0.1:8001",
"MaxRpcParamLen": 409600,
"CompressBytesLen": 20480,
"CompressBytesLen": 20480,
"NodeName": "Node_Test1",
"remark":"//以_打头的表示只在本机进程不对整个子网公开",
"remark":"//以_打头的表示只在本机进程不对整个子网公开",
"ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","_TcpService","HttpService","WSService"]
},
{
{
"NodeId": 2,
"Private": false,
"ListenAddr":"127.0.0.1:8002",
"MaxRpcParamLen": 409600,
"CompressBytesLen": 20480,
"MaxRpcParamLen": 409600,
"CompressBytesLen": 20480,
"NodeName": "Node_Test1",
"remark":"//以_打头的表示只在本机进程不对整个子网公开",
"remark":"//以_打头的表示只在本机进程不对整个子网公开",
"ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","TcpService","HttpService","WSService"]
}
]

View File

@@ -114,7 +114,7 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
//正在连接中不主动断开,只断开没有连接中的
if rpc.client.IsConnected() {
nodeInfo.status = Discard
log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr)
log.Info("Discard node",log.Int("nodeId",nodeInfo.NodeId),log.String("ListenAddr", nodeInfo.ListenAddr))
return
}
@@ -131,7 +131,7 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
rpc.client.Close(false)
}
log.SRelease("remove node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr)
log.Info("remove node ",log.Int("NodeId", nodeInfo.NodeId),log.String("ListenAddr", nodeInfo.ListenAddr))
}
func (cls *Cluster) serviceDiscoveryDelNode(nodeId int, immediately bool) {
@@ -176,7 +176,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
for _, serviceName := range nodeInfo.PublicServiceList {
if _, ok := mapDuplicate[serviceName]; ok == true {
//存在重复
log.SError("Bad duplicate Service Cfg.")
log.Error("Bad duplicate Service Cfg.")
continue
}
mapDuplicate[serviceName] = nil
@@ -186,8 +186,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
cls.mapServiceNode[serviceName][nodeInfo.NodeId] = struct{}{}
}
cls.mapIdNode[nodeInfo.NodeId] = *nodeInfo
log.SRelease("Discovery nodeId: ", nodeInfo.NodeId, " services:", nodeInfo.PublicServiceList)
log.Info("Discovery nodeId",log.Int("NodeId", nodeInfo.NodeId),log.Any("services:", nodeInfo.PublicServiceList))
//已经存在连接,则不需要进行设置
if _, rpcInfoOK := cls.mapRpc[nodeInfo.NodeId]; rpcInfoOK == true {
@@ -368,7 +367,7 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientId uint32, nodeId int)
for serviceName, _ := range cls.mapServiceListenRpcEvent {
ser := service.GetService(serviceName)
if ser == nil {
log.SError("cannot find service name ", serviceName)
log.Error("cannot find service name "+serviceName)
continue
}
@@ -386,7 +385,7 @@ func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId int, serviceNa
for sName, _ := range cls.mapServiceListenDiscoveryEvent {
ser := service.GetService(sName)
if ser == nil {
log.SError("cannot find service name ", serviceName)
log.Error("cannot find service",log.Any("services",serviceName))
continue
}

View File

@@ -142,7 +142,7 @@ func (ds *DynamicDiscoveryMaster) RpcCastGo(serviceMethod string, args interface
func (ds *DynamicDiscoveryMaster) RPC_RegServiceDiscover(req *rpc.ServiceDiscoverReq, res *rpc.Empty) error {
if req.NodeInfo == nil {
err := errors.New("RPC_RegServiceDiscover req is error.")
log.SError(err.Error())
log.Error(err.Error())
return err
}
@@ -369,7 +369,7 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
//向Master服务同步本Node服务信息
err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) {
if err != nil {
log.SError("call ", RegServiceDiscover, " is fail :", err.Error())
log.Error("call "+RegServiceDiscover+" is fail :"+ err.Error())
dc.AfterFunc(time.Second*3, func(timer *timer.Timer) {
dc.regServiceDiscover(nodeId)
})
@@ -378,7 +378,7 @@ func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
}
})
if err != nil {
log.SError("call ", RegServiceDiscover, " is fail :", err.Error())
log.Error("call "+ RegServiceDiscover+" is fail :"+ err.Error())
}
}

View File

@@ -58,7 +58,7 @@ func (cls *Cluster) readServiceConfig(filepath string) (interface{}, map[string]
serviceCfg := v.(map[string]interface{})
nodeId, ok := serviceCfg["NodeId"]
if ok == false {
log.SFatal("NodeService list not find nodeId field")
log.Fatal("NodeService list not find nodeId field")
}
mapNodeService[int(nodeId.(float64))] = serviceCfg
}

View File

@@ -50,7 +50,7 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err e
}
if fn == nil && cb == nil {
log.SStack("fn and cb is nil")
log.Stack("fn and cb is nil")
return
}
@@ -66,7 +66,7 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err e
select {
case c.tasks <- task{queueId, fn, cb}:
default:
log.SError("tasks channel is full")
log.Error("tasks channel is full")
if cb != nil {
c.pushAsyncDoCallbackEvent(func(err error) {
cb(errors.New("tasks channel is full"))
@@ -81,11 +81,11 @@ func (c *Concurrent) Close() {
return
}
log.SRelease("wait close concurrent")
log.Info("wait close concurrent")
c.dispatch.close()
log.SRelease("concurrent has successfully exited")
log.Info("concurrent has successfully exited")
}
func (c *Concurrent) GetCallBackChannel() chan func(error) {

View File

@@ -187,8 +187,7 @@ func (d *dispatch) DoCallback(cb func(err error)) {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()

View File

@@ -40,7 +40,7 @@ func (w *worker) run(waitGroup *sync.WaitGroup, t task) {
case tw := <-w.workerQueue:
if tw.isExistTask() {
//exit goroutine
log.SRelease("worker goroutine exit")
log.Info("worker goroutine exit")
return
}
w.exec(&tw)
@@ -59,9 +59,8 @@ func (w *worker) exec(t *task) {
t.cb = func(err error) {
cb(errors.New(errString))
}
log.Dump(string(buf[:l]),log.String("error",errString))
w.endCallFun(true,t)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
}
}()

View File

@@ -11,8 +11,9 @@ type CommandFunctionCB func(args interface{}) error
var commandList []*command
var programName string
const(
boolType valueType = iota
stringType valueType = iota
boolType valueType = 0
stringType valueType = 1
intType valueType = 2
)
type command struct{
@@ -20,6 +21,7 @@ type command struct{
name string
bValue bool
strValue string
intValue int
usage string
fn CommandFunctionCB
}
@@ -29,6 +31,8 @@ func (cmd *command) execute() error{
return cmd.fn(cmd.bValue)
}else if cmd.valType == stringType {
return cmd.fn(cmd.strValue)
}else if cmd.valType == intType {
return cmd.fn(cmd.intValue)
}else{
return fmt.Errorf("Unknow command type.")
}
@@ -72,6 +76,16 @@ func RegisterCommandBool(cmdName string, defaultValue bool, usage string,fn Comm
commandList = append(commandList,&cmd)
}
func RegisterCommandInt(cmdName string, defaultValue int, usage string,fn CommandFunctionCB){
var cmd command
cmd.valType = intType
cmd.name = cmdName
cmd.fn = fn
cmd.usage = usage
flag.IntVar(&cmd.intValue, cmdName, defaultValue, usage)
commandList = append(commandList,&cmd)
}
func RegisterCommandString(cmdName string, defaultValue string, usage string,fn CommandFunctionCB){
var cmd command
cmd.valType = stringType

View File

@@ -215,7 +215,7 @@ func (processor *EventProcessor) EventHandler(ev IEvent) {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l]))
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
@@ -230,13 +230,13 @@ func (processor *EventProcessor) EventHandler(ev IEvent) {
func (processor *EventProcessor) castEvent(event IEvent){
if processor.mapListenerEvent == nil {
log.SError("mapListenerEvent not init!")
log.Error("mapListenerEvent not init!")
return
}
eventProcessor,ok := processor.mapListenerEvent[event.GetEventType()]
if ok == false || processor == nil{
log.SDebug("event type ",event.GetEventType()," not listen.")
log.Debug("event is not listen",log.Int("event type",int(event.GetEventType())))
return
}

5
go.mod
View File

@@ -1,14 +1,15 @@
module github.com/duanhf2012/origin
go 1.19
go 1.21
require (
github.com/go-sql-driver/mysql v1.6.0
github.com/gogo/protobuf v1.3.2
github.com/gomodule/redigo v1.8.8
github.com/gorilla/websocket v1.5.0
github.com/json-iterator/go v1.1.12
github.com/pierrec/lz4/v4 v4.1.18
go.mongodb.org/mongo-driver v1.9.1
google.golang.org/protobuf v1.31.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
)

9
go.sum
View File

@@ -7,12 +7,14 @@ github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v1.8.8 h1:f6cXq6RRfiyrOJEV7p3JhLDlmawGBVBBP1MggY8Mo4E=
github.com/gomodule/redigo v1.8.8/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
@@ -32,6 +34,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -92,6 +96,9 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -2,27 +2,20 @@ package log // import "go.uber.org/zap/buffer"
import (
"strconv"
"sync"
)
const _size = 9216
type Buffer struct {
bs []byte
mu sync.Mutex // ensures atomic writes; protects the following fields
//mu sync.Mutex // ensures atomic writes; protects the following fields
}
func (buff *Buffer) Init(){
buff.bs = make([]byte,_size)
}
func (buff *Buffer) Locker() {
buff.mu.Lock()
}
func (buff *Buffer) UnLocker() {
buff.mu.Unlock()
}
// AppendByte writes a single byte to the Buffer.
func (b *Buffer) AppendByte(v byte) {

147
log/handler.go Normal file
View File

@@ -0,0 +1,147 @@
package log
import (
"context"
"io"
"log/slog"
"path/filepath"
"runtime"
"runtime/debug"
"sync"
)
type IOriginHandler interface {
slog.Handler
Lock()
UnLock()
}
type BaseHandler struct {
addSource bool
w io.Writer
locker sync.Mutex
}
type OriginTextHandler struct {
BaseHandler
*slog.TextHandler
}
type OriginJsonHandler struct {
BaseHandler
*slog.JSONHandler
}
func getStrLevel(level slog.Level) string{
switch level {
case LevelTrace:
return "Trace"
case LevelDebug:
return "Debug"
case LevelInfo:
return "Info"
case LevelWarning:
return "Warning"
case LevelError:
return "Error"
case LevelStack:
return "Stack"
case LevelDump:
return "Dump"
case LevelFatal:
return "Fatal"
}
return ""
}
func defaultReplaceAttr(groups []string, a slog.Attr) slog.Attr {
if a.Key == slog.LevelKey {
level := a.Value.Any().(slog.Level)
a.Value = slog.StringValue(getStrLevel(level))
}else if a.Key == slog.TimeKey && len(groups) == 0 {
a.Value = slog.StringValue(a.Value.Time().Format("2006/01/02 15:04:05"))
}else if a.Key == slog.SourceKey {
source := a.Value.Any().(*slog.Source)
source.File = filepath.Base(source.File)
}
return a
}
func NewOriginTextHandler(level slog.Level,w io.Writer,addSource bool,replaceAttr func([]string,slog.Attr) slog.Attr) slog.Handler{
var textHandler OriginTextHandler
textHandler.addSource = addSource
textHandler.w = w
textHandler.TextHandler = slog.NewTextHandler(w,&slog.HandlerOptions{
AddSource: addSource,
Level: level,
ReplaceAttr: replaceAttr,
})
return &textHandler
}
func (oh *OriginTextHandler) Handle(context context.Context, record slog.Record) error{
oh.Fill(context,&record)
oh.locker.Lock()
defer oh.locker.Unlock()
if record.Level == LevelStack || record.Level == LevelFatal{
err := oh.TextHandler.Handle(context, record)
oh.logStack(&record)
return err
}else if record.Level == LevelDump {
strDump := record.Message
record.Message = "dump info"
err := oh.TextHandler.Handle(context, record)
oh.w.Write([]byte(strDump))
return err
}
return oh.TextHandler.Handle(context, record)
}
func (b *BaseHandler) logStack(record *slog.Record){
b.w.Write(debug.Stack())
}
func (b *BaseHandler) Lock(){
b.locker.Lock()
}
func (b *BaseHandler) UnLock(){
b.locker.Unlock()
}
func NewOriginJsonHandler(level slog.Level,w io.Writer,addSource bool,replaceAttr func([]string,slog.Attr) slog.Attr) slog.Handler{
var jsonHandler OriginJsonHandler
jsonHandler.addSource = addSource
jsonHandler.w = w
jsonHandler.JSONHandler = slog.NewJSONHandler(w,&slog.HandlerOptions{
AddSource: addSource,
Level: level,
ReplaceAttr: replaceAttr,
})
return &jsonHandler
}
func (oh *OriginJsonHandler) Handle(context context.Context, record slog.Record) error{
oh.Fill(context,&record)
if record.Level == LevelStack || record.Level == LevelFatal || record.Level == LevelDump{
record.Add("stack",debug.Stack())
}
oh.locker.Lock()
defer oh.locker.Unlock()
return oh.JSONHandler.Handle(context, record)
}
func (b *BaseHandler) Fill(context context.Context, record *slog.Record) {
if b.addSource {
var pcs [1]uintptr
runtime.Callers(7, pcs[:])
record.PC = pcs[0]
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -47,7 +47,7 @@ func (slf *HttpServer) startListen() error {
for _, caFile := range slf.caFileList {
cer, err := tls.LoadX509KeyPair(caFile.CertFile, caFile.Keyfile)
if err != nil {
log.SFatal("Load CA [",caFile.CertFile,"]-[",caFile.Keyfile,"] file is fail:",err.Error())
log.Fatal("Load CA file is fail",log.String("error",err.Error()),log.String("certFile",caFile.CertFile),log.String("keyFile",caFile.Keyfile))
return err
}
tlsCaList = append(tlsCaList, cer)
@@ -74,7 +74,7 @@ func (slf *HttpServer) startListen() error {
}
if err != nil {
log.SFatal("Listen for address ",slf.listenAddr," failure:",err.Error())
log.Fatal("Listen failure",log.String("error",err.Error()),log.String("addr:",slf.listenAddr))
return err
}

View File

@@ -3,9 +3,9 @@ package processor
import (
"encoding/json"
"fmt"
"github.com/duanhf2012/origin/network"
"reflect"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/util/bytespool"
"reflect"
)
type MessageJsonInfo struct {
@@ -24,7 +24,7 @@ type JsonProcessor struct {
unknownMessageHandler UnknownMessageJsonHandler
connectHandler ConnectJsonHandler
disconnectHandler ConnectJsonHandler
network.INetMempool
bytespool.IBytesMempool
}
type JsonPackInfo struct {
@@ -35,7 +35,7 @@ type JsonPackInfo struct {
func NewJsonProcessor() *JsonProcessor {
processor := &JsonProcessor{mapMsg:map[uint16]MessageJsonInfo{}}
processor.INetMempool = network.NewMemAreaPool()
processor.IBytesMempool = bytespool.NewMemAreaPool()
return processor
}
@@ -58,7 +58,7 @@ func (jsonProcessor *JsonProcessor ) MsgRoute(clientId uint64,msg interface{}) e
func (jsonProcessor *JsonProcessor) Unmarshal(clientId uint64,data []byte) (interface{}, error) {
typeStruct := struct {Type int `json:"typ"`}{}
defer jsonProcessor.ReleaseByteSlice(data)
defer jsonProcessor.ReleaseBytes(data)
err := json.Unmarshal(data, &typeStruct)
if err != nil {
return nil, err
@@ -106,7 +106,7 @@ func (jsonProcessor *JsonProcessor) MakeRawMsg(msgType uint16,msg []byte) *JsonP
func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){
if jsonProcessor.unknownMessageHandler==nil {
log.SDebug("Unknown message received from ",clientId)
log.Debug("Unknown message",log.Uint64("clientId",clientId))
return
}

View File

@@ -3,8 +3,8 @@ package processor
import (
"encoding/binary"
"fmt"
"github.com/duanhf2012/origin/network"
"github.com/gogo/protobuf/proto"
"github.com/duanhf2012/origin/util/bytespool"
"google.golang.org/protobuf/proto"
"reflect"
)
@@ -26,7 +26,7 @@ type PBProcessor struct {
unknownMessageHandler UnknownMessageHandler
connectHandler ConnectHandler
disconnectHandler ConnectHandler
network.INetMempool
bytespool.IBytesMempool
}
type PBPackInfo struct {
@@ -37,7 +37,7 @@ type PBPackInfo struct {
func NewPBProcessor() *PBProcessor {
processor := &PBProcessor{mapMsg: map[uint16]MessageInfo{}}
processor.INetMempool = network.NewMemAreaPool()
processor.IBytesMempool = bytespool.NewMemAreaPool()
return processor
}
@@ -67,7 +67,7 @@ func (pbProcessor *PBProcessor) MsgRoute(clientId uint64, msg interface{}) error
// must goroutine safe
func (pbProcessor *PBProcessor) Unmarshal(clientId uint64, data []byte) (interface{}, error) {
defer pbProcessor.ReleaseByteSlice(data)
defer pbProcessor.ReleaseBytes(data)
return pbProcessor.UnmarshalWithOutRelease(clientId, data)
}

View File

@@ -40,29 +40,29 @@ func (client *TCPClient) init() {
if client.ConnNum <= 0 {
client.ConnNum = 1
log.SRelease("invalid ConnNum, reset to ", client.ConnNum)
log.Info("invalid ConnNum",log.Int("reset", client.ConnNum))
}
if client.ConnectInterval <= 0 {
client.ConnectInterval = 3 * time.Second
log.SRelease("invalid ConnectInterval, reset to ", client.ConnectInterval)
log.Info("invalid ConnectInterval",log.Duration("reset", client.ConnectInterval))
}
if client.PendingWriteNum <= 0 {
client.PendingWriteNum = 1000
log.SRelease("invalid PendingWriteNum, reset to ", client.PendingWriteNum)
log.Info("invalid PendingWriteNum",log.Int("reset",client.PendingWriteNum))
}
if client.ReadDeadline == 0 {
client.ReadDeadline = 15*time.Second
log.SRelease("invalid ReadDeadline, reset to ", int64(client.ReadDeadline.Seconds()),"s")
log.Info("invalid ReadDeadline",log.Int64("reset", int64(client.ReadDeadline.Seconds())))
}
if client.WriteDeadline == 0 {
client.WriteDeadline = 15*time.Second
log.SRelease("invalid WriteDeadline, reset to ", int64(client.WriteDeadline.Seconds()),"s")
log.Info("invalid WriteDeadline",log.Int64("reset", int64(client.WriteDeadline.Seconds())))
}
if client.NewAgent == nil {
log.SFatal("NewAgent must not be nil")
log.Fatal("NewAgent must not be nil")
}
if client.cons != nil {
log.SFatal("client is running")
log.Fatal("client is running")
}
if client.MinMsgLen == 0 {
@@ -77,7 +77,7 @@ func (client *TCPClient) init() {
maxMsgLen := client.MsgParser.getMaxMsgLen(client.LenMsgLen)
if client.MaxMsgLen > maxMsgLen {
client.MaxMsgLen = maxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", maxMsgLen)
log.Info("invalid MaxMsgLen",log.Uint32("reset", maxMsgLen))
}
client.cons = make(ConnSet)
@@ -102,7 +102,7 @@ func (client *TCPClient) dial() net.Conn {
return conn
}
log.SWarning("connect to ",client.Addr," error:", err.Error())
log.Warning("connect error ",log.String("error",err.Error()), log.String("Addr",client.Addr))
time.Sleep(client.ConnectInterval)
continue
}

View File

@@ -41,7 +41,7 @@ func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser,writeDe
conn.SetWriteDeadline(time.Now().Add(writeDeadline))
_, err := conn.Write(b)
tcpConn.msgParser.ReleaseByteSlice(b)
tcpConn.msgParser.ReleaseBytes(b)
if err != nil {
break
@@ -92,7 +92,7 @@ func (tcpConn *TCPConn) GetRemoteIp() string {
func (tcpConn *TCPConn) doWrite(b []byte) error{
if len(tcpConn.writeChan) == cap(tcpConn.writeChan) {
tcpConn.ReleaseReadMsg(b)
log.SError("close conn: channel full")
log.Error("close conn: channel full")
tcpConn.doDestroy()
return errors.New("close conn: channel full")
}
@@ -130,7 +130,7 @@ func (tcpConn *TCPConn) ReadMsg() ([]byte, error) {
}
func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){
tcpConn.msgParser.ReleaseByteSlice(byteBuff)
tcpConn.msgParser.ReleaseBytes(byteBuff)
}
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {

View File

@@ -3,6 +3,7 @@ package network
import (
"encoding/binary"
"errors"
"github.com/duanhf2012/origin/util/bytespool"
"io"
"math"
)
@@ -16,7 +17,7 @@ type MsgParser struct {
MaxMsgLen uint32
LittleEndian bool
INetMempool
bytespool.IBytesMempool
}
@@ -34,7 +35,7 @@ func (p *MsgParser) getMaxMsgLen(lenMsgLen int) uint32 {
}
func (p *MsgParser) init(){
p.INetMempool = NewMemAreaPool()
p.IBytesMempool = bytespool.NewMemAreaPool()
}
// goroutine safe
@@ -74,9 +75,9 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) {
}
// data
msgData := p.MakeByteSlice(int(msgLen))
msgData := p.MakeBytes(int(msgLen))
if _, err := io.ReadFull(conn, msgData[:msgLen]); err != nil {
p.ReleaseByteSlice(msgData)
p.ReleaseBytes(msgData)
return nil, err
}
@@ -99,7 +100,7 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error {
}
//msg := make([]byte, uint32(p.lenMsgLen)+msgLen)
msg := p.MakeByteSlice(p.LenMsgLen+int(msgLen))
msg := p.MakeBytes(p.LenMsgLen+int(msgLen))
// write len
switch p.LenMsgLen {
case 1:

View File

@@ -2,6 +2,7 @@ package network
import (
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/util/bytespool"
"net"
"sync"
"time"
@@ -43,52 +44,52 @@ func (server *TCPServer) Start() {
func (server *TCPServer) init() {
ln, err := net.Listen("tcp", server.Addr)
if err != nil {
log.SFatal("Listen tcp error:", err.Error())
log.Fatal("Listen tcp fail",log.String("error", err.Error()))
}
if server.MaxConnNum <= 0 {
server.MaxConnNum = Default_MaxConnNum
log.SRelease("invalid MaxConnNum, reset to ", server.MaxConnNum)
log.Info("invalid MaxConnNum",log.Int("reset", server.MaxConnNum))
}
if server.PendingWriteNum <= 0 {
server.PendingWriteNum = Default_PendingWriteNum
log.SRelease("invalid PendingWriteNum, reset to ", server.PendingWriteNum)
log.Info("invalid PendingWriteNum",log.Int("reset", server.PendingWriteNum))
}
if server.LenMsgLen <= 0 {
server.LenMsgLen = Default_LenMsgLen
log.SRelease("invalid LenMsgLen, reset to ", server.LenMsgLen)
log.Info("invalid LenMsgLen", log.Int("reset", server.LenMsgLen))
}
if server.MaxMsgLen <= 0 {
server.MaxMsgLen = Default_MaxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", server.MaxMsgLen)
log.Info("invalid MaxMsgLen", log.Uint32("reset to", server.MaxMsgLen))
}
maxMsgLen := server.MsgParser.getMaxMsgLen(server.LenMsgLen)
if server.MaxMsgLen > maxMsgLen {
server.MaxMsgLen = maxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", maxMsgLen)
log.Info("invalid MaxMsgLen",log.Uint32("reset", maxMsgLen))
}
if server.MinMsgLen <= 0 {
server.MinMsgLen = Default_MinMsgLen
log.SRelease("invalid MinMsgLen, reset to ", server.MinMsgLen)
log.Info("invalid MinMsgLen",log.Uint32("reset", server.MinMsgLen))
}
if server.WriteDeadline == 0 {
server.WriteDeadline = Default_WriteDeadline
log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s")
log.Info("invalid WriteDeadline",log.Int64("reset",int64(server.WriteDeadline.Seconds())))
}
if server.ReadDeadline == 0 {
server.ReadDeadline = Default_ReadDeadline
log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s")
log.Info("invalid ReadDeadline",log.Int64("reset", int64(server.ReadDeadline.Seconds())))
}
if server.NewAgent == nil {
log.SFatal("NewAgent must not be nil")
log.Fatal("NewAgent must not be nil")
}
server.ln = ln
@@ -96,12 +97,12 @@ func (server *TCPServer) init() {
server.MsgParser.init()
}
func (server *TCPServer) SetNetMempool(mempool INetMempool){
server.INetMempool = mempool
func (server *TCPServer) SetNetMempool(mempool bytespool.IBytesMempool){
server.IBytesMempool = mempool
}
func (server *TCPServer) GetNetMempool() INetMempool{
return server.INetMempool
func (server *TCPServer) GetNetMempool() bytespool.IBytesMempool {
return server.IBytesMempool
}
func (server *TCPServer) run() {
@@ -121,7 +122,7 @@ func (server *TCPServer) run() {
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
log.SRelease("accept error:",err.Error(),"; retrying in ", tempDelay)
log.Info("accept fail",log.String("error",err.Error()),log.Duration("sleep time", tempDelay))
time.Sleep(tempDelay)
continue
}
@@ -135,7 +136,7 @@ func (server *TCPServer) run() {
if len(server.conns) >= server.MaxConnNum {
server.mutexConns.Unlock()
conn.Close()
log.SWarning("too many connections")
log.Warning("too many connections")
continue
}

View File

@@ -40,29 +40,29 @@ func (client *WSClient) init() {
if client.ConnNum <= 0 {
client.ConnNum = 1
log.SRelease("invalid ConnNum, reset to ", client.ConnNum)
log.Info("invalid ConnNum",log.Int("reset", client.ConnNum))
}
if client.ConnectInterval <= 0 {
client.ConnectInterval = 3 * time.Second
log.SRelease("invalid ConnectInterval, reset to ", client.ConnectInterval)
log.Info("invalid ConnectInterval",log.Duration("reset", client.ConnectInterval))
}
if client.PendingWriteNum <= 0 {
client.PendingWriteNum = 100
log.SRelease("invalid PendingWriteNum, reset to ", client.PendingWriteNum)
log.Info("invalid PendingWriteNum",log.Int("reset", client.PendingWriteNum))
}
if client.MaxMsgLen <= 0 {
client.MaxMsgLen = 4096
log.SRelease("invalid MaxMsgLen, reset to ", client.MaxMsgLen)
log.Info("invalid MaxMsgLen",log.Uint32("reset", client.MaxMsgLen))
}
if client.HandshakeTimeout <= 0 {
client.HandshakeTimeout = 10 * time.Second
log.SRelease("invalid HandshakeTimeout, reset to ", client.HandshakeTimeout)
log.Info("invalid HandshakeTimeout",log.Duration("reset", client.HandshakeTimeout))
}
if client.NewAgent == nil {
log.SFatal("NewAgent must not be nil")
log.Fatal("NewAgent must not be nil")
}
if client.cons != nil {
log.SFatal("client is running")
log.Fatal("client is running")
}
if client.MessageType == 0 {
@@ -83,7 +83,7 @@ func (client *WSClient) dial() *websocket.Conn {
return conn
}
log.SRelease("connect to ", client.Addr," error: ", err.Error())
log.Info("connect fail", log.String("error",err.Error()),log.String("addr",client.Addr))
time.Sleep(client.ConnectInterval)
continue
}

View File

@@ -75,7 +75,7 @@ func (wsConn *WSConn) Close() {
func (wsConn *WSConn) doWrite(b []byte) {
if len(wsConn.writeChan) == cap(wsConn.writeChan) {
log.SDebug("close conn: channel full")
log.Debug("close conn: channel full")
wsConn.doDestroy()
return
}

View File

@@ -47,7 +47,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
conn, err := handler.upgrader.Upgrade(w, r, nil)
if err != nil {
log.SError("upgrade error: ", err.Error())
log.Error("upgrade fail",log.String("error",err.Error()))
return
}
conn.SetReadLimit(int64(handler.maxMsgLen))
@@ -67,7 +67,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if len(handler.conns) >= handler.maxConnNum {
handler.mutexConns.Unlock()
conn.Close()
log.SWarning("too many connections")
log.Warning("too many connections")
return
}
handler.conns[conn] = struct{}{}
@@ -95,27 +95,27 @@ func (server *WSServer) SetMessageType(messageType int) {
func (server *WSServer) Start() {
ln, err := net.Listen("tcp", server.Addr)
if err != nil {
log.SFatal("WSServer Listen fail:", err.Error())
log.Fatal("WSServer Listen fail",log.String("error", err.Error()))
}
if server.MaxConnNum <= 0 {
server.MaxConnNum = 100
log.SRelease("invalid MaxConnNum, reset to ", server.MaxConnNum)
log.Info("invalid MaxConnNum", log.Int("reset", server.MaxConnNum))
}
if server.PendingWriteNum <= 0 {
server.PendingWriteNum = 100
log.SRelease("invalid PendingWriteNum, reset to ", server.PendingWriteNum)
log.Info("invalid PendingWriteNum", log.Int("reset", server.PendingWriteNum))
}
if server.MaxMsgLen <= 0 {
server.MaxMsgLen = 4096
log.SRelease("invalid MaxMsgLen, reset to ", server.MaxMsgLen)
log.Info("invalid MaxMsgLen", log.Uint32("reset", server.MaxMsgLen))
}
if server.HTTPTimeout <= 0 {
server.HTTPTimeout = 10 * time.Second
log.SRelease("invalid HTTPTimeout, reset to ", server.HTTPTimeout)
log.Info("invalid HTTPTimeout", log.Duration("reset", server.HTTPTimeout))
}
if server.NewAgent == nil {
log.SFatal("NewAgent must not be nil")
log.Fatal("NewAgent must not be nil")
}
if server.CertFile != "" || server.KeyFile != "" {
@@ -126,7 +126,7 @@ func (server *WSServer) Start() {
config.Certificates = make([]tls.Certificate, 1)
config.Certificates[0], err = tls.LoadX509KeyPair(server.CertFile, server.KeyFile)
if err != nil {
log.SFatal("LoadX509KeyPair fail:", err.Error())
log.Fatal("LoadX509KeyPair fail",log.String("error", err.Error()))
}
ln = tls.NewListener(ln, config)

View File

@@ -11,7 +11,6 @@ import (
"github.com/duanhf2012/origin/util/buildtime"
"github.com/duanhf2012/origin/util/timer"
"io"
slog "log"
"net/http"
_ "net/http/pprof"
"os"
@@ -28,8 +27,8 @@ var preSetupService []service.IService //预安装
var profilerInterval time.Duration
var bValid bool
var configDir = "./config/"
var logLevel string = "debug"
var logPath string
type BuildOSType = int8
const(
@@ -50,6 +49,8 @@ func init() {
console.RegisterCommandString("console", "", "<-console true|false> Turn on or off screen log output.", openConsole)
console.RegisterCommandString("loglevel", "debug", "<-loglevel debug|release|warning|error|fatal> Set loglevel.", setLevel)
console.RegisterCommandString("logpath", "", "<-logpath path> Set log file path.", setLogPath)
console.RegisterCommandInt("logsize", 0, "<-logsize size> Set log size(MB).", setLogSize)
console.RegisterCommandInt("logchannelcap", 0, "<-logchannelcap num> Set log channel cap.", setLogChannelCapNum)
console.RegisterCommandString("pprof", "", "<-pprof ip:port> Open performance analysis.", setPprof)
}
@@ -144,7 +145,7 @@ func initNode(id int) {
nodeId = id
err := cluster.GetCluster().Init(GetNodeId(), Setup)
if err != nil {
log.SFatal("read system config is error ", err.Error())
log.Fatal("read system config is error ",log.ErrorAttr("error",err))
}
err = initLog()
@@ -168,7 +169,7 @@ func initNode(id int) {
}
if bSetup == false {
log.SFatal("Service name "+serviceName+" configuration error")
log.Fatal("Service name "+serviceName+" configuration error")
}
}
@@ -177,13 +178,13 @@ func initNode(id int) {
}
func initLog() error {
if logPath == "" {
if log.LogPath == "" {
setLogPath("./log")
}
localnodeinfo := cluster.GetCluster().GetLocalNodeInfo()
filepre := fmt.Sprintf("%s_%d_", localnodeinfo.NodeName, localnodeinfo.NodeId)
logger, err := log.New(logLevel, logPath, filepre, slog.LstdFlags|slog.Lshortfile, 10)
logger, err := log.NewTextLogger(log.LogLevel,log.LogPath,filepre,true,log.LogChannelCap)
if err != nil {
fmt.Printf("cannot create log file!\n")
return err
@@ -248,7 +249,7 @@ func startNode(args interface{}) error {
}
timer.StartTimer(10*time.Millisecond, 1000000)
log.SRelease("Start running server.")
log.Info("Start running server.")
//2.初始化node
initNode(nodeId)
@@ -270,7 +271,7 @@ func startNode(args interface{}) error {
for bRun {
select {
case <-sig:
log.SRelease("receipt stop signal.")
log.Info("receipt stop signal.")
bRun = false
case <-pProfilerTicker.C:
profiler.Report()
@@ -280,7 +281,8 @@ func startNode(args interface{}) error {
//7.退出
service.StopAllService()
log.SRelease("Server is stop.")
log.Info("Server is stop.")
log.Close()
return nil
}
@@ -304,11 +306,6 @@ func GetConfigDir() string {
return configDir
}
func SetSysLog(strLevel string, pathname string, flag int) {
logs, _ := log.New(strLevel, pathname, "", flag, 10)
log.Export(logs)
}
func OpenProfilerReport(interval time.Duration) {
profilerInterval = interval
}
@@ -333,9 +330,24 @@ func setLevel(args interface{}) error {
return nil
}
logLevel = strings.TrimSpace(args.(string))
if logLevel != "debug" && logLevel != "release" && logLevel != "warning" && logLevel != "error" && logLevel != "fatal" {
return errors.New("unknown level: " + logLevel)
strlogLevel := strings.TrimSpace(args.(string))
switch strlogLevel {
case "trace":
log.LogLevel = log.LevelTrace
case "debug":
log.LogLevel = log.LevelDebug
case "info":
log.LogLevel = log.LevelInfo
case "warning":
log.LogLevel = log.LevelWarning
case "error":
log.LogLevel = log.LevelError
case "stack":
log.LogLevel = log.LevelStack
case "fatal":
log.LogLevel = log.LevelFatal
default:
return errors.New("unknown level: " + strlogLevel)
}
return nil
}
@@ -344,18 +356,48 @@ func setLogPath(args interface{}) error {
if args == "" {
return nil
}
logPath = strings.TrimSpace(args.(string))
dir, err := os.Stat(logPath) //这个文件夹不存在
log.LogPath = strings.TrimSpace(args.(string))
dir, err := os.Stat(log.LogPath) //这个文件夹不存在
if err == nil && dir.IsDir() == false {
return errors.New("Not found dir " + logPath)
return errors.New("Not found dir " + log.LogPath)
}
if err != nil {
err = os.Mkdir(logPath, os.ModePerm)
err = os.Mkdir(log.LogPath, os.ModePerm)
if err != nil {
return errors.New("Cannot create dir " + logPath)
return errors.New("Cannot create dir " + log.LogPath)
}
}
return nil
}
func setLogSize(args interface{}) error {
if args == "" {
return nil
}
logSize,ok := args.(int)
if ok == false{
return errors.New("param logsize is error")
}
log.LogSize = int64(logSize)*1024*1024
return nil
}
func setLogChannelCapNum(args interface{}) error {
if args == "" {
return nil
}
logChannelCap,ok := args.(int)
if ok == false{
return errors.New("param logsize is error")
}
log.LogChannelCap = logChannelCap
return nil
}

View File

@@ -167,7 +167,7 @@ func DefaultReportFunction(name string,callNum int,costTime time.Duration,record
elem = elem.Next()
}
log.SRelease(strReport)
log.SInfo("report",strReport)
}
func Report() {

View File

@@ -81,14 +81,14 @@ func (bc *Client) checkRpcCallTimeout() {
pCall := bc.pending[callSeq]
if pCall == nil {
bc.pendingLock.Unlock()
log.SError("callSeq ",callSeq," is not find")
log.Error("call seq is not find",log.Uint64("seq", callSeq))
continue
}
delete(bc.pending,callSeq)
strTimeout := strconv.FormatInt(int64(pCall.TimeOut.Seconds()), 10)
pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds,method is "+pCall.ServiceMethod)
log.SError(pCall.Err.Error())
log.Error("call timeout",log.String("error",pCall.Err.Error()))
bc.makeCallFail(pCall)
bc.pendingLock.Unlock()
continue
@@ -108,7 +108,7 @@ func (bc *Client) AddPending(call *Call) {
if call.Seq == 0 {
bc.pendingLock.Unlock()
log.SStack("call is error.")
log.Stack("call is error.")
return
}
@@ -160,7 +160,7 @@ func (bc *Client) cleanPending(){
}
pCall := bc.pending[callSeq]
if pCall == nil {
log.SError("callSeq ",callSeq," is not find")
log.Error("call Seq is not find",log.Uint64("seq",callSeq))
continue
}

View File

@@ -1,14 +1,14 @@
package rpc
import (
"runtime"
"errors"
"github.com/pierrec/lz4/v4"
"fmt"
"github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/util/bytespool"
"github.com/pierrec/lz4/v4"
"runtime"
)
var memPool network.INetMempool = network.NewMemAreaPool()
var memPool bytespool.IBytesMempool = bytespool.NewMemAreaPool()
type ICompressor interface {
CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存传入nil时内部申请
@@ -42,10 +42,10 @@ func (lc *Lz4Compressor) CompressBlock(src []byte) (dest []byte, err error) {
var c lz4.Compressor
var cnt int
dest = memPool.MakeByteSlice(lz4.CompressBlockBound(len(src))+1)
dest = memPool.MakeBytes(lz4.CompressBlockBound(len(src))+1)
cnt, err = c.CompressBlock(src, dest[1:])
if err != nil {
memPool.ReleaseByteSlice(dest)
memPool.ReleaseBytes(dest)
return nil,err
}
@@ -55,7 +55,7 @@ func (lc *Lz4Compressor) CompressBlock(src []byte) (dest []byte, err error) {
}
if ratio > 255 {
memPool.ReleaseByteSlice(dest)
memPool.ReleaseBytes(dest)
return nil,fmt.Errorf("Impermissible errors")
}
@@ -79,10 +79,10 @@ func (lc *Lz4Compressor) UncompressBlock(src []byte) (dest []byte, err error) {
return nil,fmt.Errorf("Impermissible errors")
}
dest = memPool.MakeByteSlice(len(src)*int(radio))
dest = memPool.MakeBytes(len(src)*int(radio))
cnt, err := lz4.UncompressBlock(src[1:], dest)
if err != nil {
memPool.ReleaseByteSlice(dest)
memPool.ReleaseBytes(dest)
return nil,err
}
@@ -94,9 +94,9 @@ func (lc *Lz4Compressor) compressBlockBound(n int) int{
}
func (lc *Lz4Compressor) CompressBufferCollection(buffer []byte){
memPool.ReleaseByteSlice(buffer)
memPool.ReleaseBytes(buffer)
}
func (lc *Lz4Compressor) UnCompressBufferCollection(buffer []byte) {
memPool.ReleaseByteSlice(buffer)
memPool.ReleaseBytes(buffer)
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
syntax = "proto3";
package rpc;
option go_package = "./rpc";
option go_package = ".;rpc";
message NodeInfo{
int32 NodeId = 1;

View File

@@ -1,106 +0,0 @@
package rpc
import (
"github.com/duanhf2012/origin/util/sync"
"github.com/gogo/protobuf/proto"
"fmt"
)
type GoGoPBProcessor struct {
}
var rpcGoGoPbResponseDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{
return &GoGoPBRpcResponseData{}
})
var rpcGoGoPbRequestDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{
return &GoGoPBRpcRequestData{}
})
func (slf *GoGoPBRpcRequestData) MakeRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) *GoGoPBRpcRequestData{
slf.Seq = seq
slf.RpcMethodId = rpcMethodId
slf.ServiceMethod = serviceMethod
slf.NoReply = noReply
slf.InParam = inParam
return slf
}
func (slf *GoGoPBRpcResponseData) MakeRespone(seq uint64,err RpcError,reply []byte) *GoGoPBRpcResponseData{
slf.Seq = seq
slf.Error = err.Error()
slf.Reply = reply
return slf
}
func (slf *GoGoPBProcessor) Marshal(v interface{}) ([]byte, error){
return proto.Marshal(v.(proto.Message))
}
func (slf *GoGoPBProcessor) Unmarshal(data []byte, msg interface{}) error{
protoMsg,ok := msg.(proto.Message)
if ok == false {
return fmt.Errorf("%+v is not of proto.Message type",msg)
}
return proto.Unmarshal(data, protoMsg)
}
func (slf *GoGoPBProcessor) MakeRpcRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{
pGogoPbRpcRequestData := rpcGoGoPbRequestDataPool.Get().(*GoGoPBRpcRequestData)
pGogoPbRpcRequestData.MakeRequest(seq,rpcMethodId,serviceMethod,noReply,inParam)
return pGogoPbRpcRequestData
}
func (slf *GoGoPBProcessor) MakeRpcResponse(seq uint64,err RpcError,reply []byte) IRpcResponseData {
pGoGoPBRpcResponseData := rpcGoGoPbResponseDataPool.Get().(*GoGoPBRpcResponseData)
pGoGoPBRpcResponseData.MakeRespone(seq,err,reply)
return pGoGoPBRpcResponseData
}
func (slf *GoGoPBProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){
rpcGoGoPbRequestDataPool.Put(rpcRequestData)
}
func (slf *GoGoPBProcessor) ReleaseRpcResponse(rpcResponseData IRpcResponseData){
rpcGoGoPbResponseDataPool.Put(rpcResponseData)
}
func (slf *GoGoPBProcessor) IsParse(param interface{}) bool {
_,ok := param.(proto.Message)
return ok
}
func (slf *GoGoPBProcessor) GetProcessorType() RpcProcessorType{
return RpcProcessorGoGoPB
}
func (slf *GoGoPBProcessor) Clone(src interface{}) (interface{},error){
srcMsg,ok := src.(proto.Message)
if ok == false {
return nil,fmt.Errorf("param is not of proto.message type")
}
return proto.Clone(srcMsg),nil
}
func (slf *GoGoPBRpcRequestData) IsNoReply() bool{
return slf.GetNoReply()
}
func (slf *GoGoPBRpcResponseData) GetErr() *RpcError {
if slf.GetError() == "" {
return nil
}
err := RpcError(slf.GetError())
return &err
}

View File

@@ -1,769 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: gogorpc.proto
package rpc
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type GoGoPBRpcRequestData struct {
Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"`
RpcMethodId uint32 `protobuf:"varint,2,opt,name=RpcMethodId,proto3" json:"RpcMethodId,omitempty"`
ServiceMethod string `protobuf:"bytes,3,opt,name=ServiceMethod,proto3" json:"ServiceMethod,omitempty"`
NoReply bool `protobuf:"varint,4,opt,name=NoReply,proto3" json:"NoReply,omitempty"`
InParam []byte `protobuf:"bytes,5,opt,name=InParam,proto3" json:"InParam,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *GoGoPBRpcRequestData) Reset() { *m = GoGoPBRpcRequestData{} }
func (m *GoGoPBRpcRequestData) String() string { return proto.CompactTextString(m) }
func (*GoGoPBRpcRequestData) ProtoMessage() {}
func (*GoGoPBRpcRequestData) Descriptor() ([]byte, []int) {
return fileDescriptor_d0e25d3af112ec8f, []int{0}
}
func (m *GoGoPBRpcRequestData) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *GoGoPBRpcRequestData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_GoGoPBRpcRequestData.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *GoGoPBRpcRequestData) XXX_Merge(src proto.Message) {
xxx_messageInfo_GoGoPBRpcRequestData.Merge(m, src)
}
func (m *GoGoPBRpcRequestData) XXX_Size() int {
return m.Size()
}
func (m *GoGoPBRpcRequestData) XXX_DiscardUnknown() {
xxx_messageInfo_GoGoPBRpcRequestData.DiscardUnknown(m)
}
var xxx_messageInfo_GoGoPBRpcRequestData proto.InternalMessageInfo
func (m *GoGoPBRpcRequestData) GetSeq() uint64 {
if m != nil {
return m.Seq
}
return 0
}
func (m *GoGoPBRpcRequestData) GetRpcMethodId() uint32 {
if m != nil {
return m.RpcMethodId
}
return 0
}
func (m *GoGoPBRpcRequestData) GetServiceMethod() string {
if m != nil {
return m.ServiceMethod
}
return ""
}
func (m *GoGoPBRpcRequestData) GetNoReply() bool {
if m != nil {
return m.NoReply
}
return false
}
func (m *GoGoPBRpcRequestData) GetInParam() []byte {
if m != nil {
return m.InParam
}
return nil
}
type GoGoPBRpcResponseData struct {
Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"`
Error string `protobuf:"bytes,2,opt,name=Error,proto3" json:"Error,omitempty"`
Reply []byte `protobuf:"bytes,3,opt,name=Reply,proto3" json:"Reply,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *GoGoPBRpcResponseData) Reset() { *m = GoGoPBRpcResponseData{} }
func (m *GoGoPBRpcResponseData) String() string { return proto.CompactTextString(m) }
func (*GoGoPBRpcResponseData) ProtoMessage() {}
func (*GoGoPBRpcResponseData) Descriptor() ([]byte, []int) {
return fileDescriptor_d0e25d3af112ec8f, []int{1}
}
func (m *GoGoPBRpcResponseData) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *GoGoPBRpcResponseData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_GoGoPBRpcResponseData.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *GoGoPBRpcResponseData) XXX_Merge(src proto.Message) {
xxx_messageInfo_GoGoPBRpcResponseData.Merge(m, src)
}
func (m *GoGoPBRpcResponseData) XXX_Size() int {
return m.Size()
}
func (m *GoGoPBRpcResponseData) XXX_DiscardUnknown() {
xxx_messageInfo_GoGoPBRpcResponseData.DiscardUnknown(m)
}
var xxx_messageInfo_GoGoPBRpcResponseData proto.InternalMessageInfo
func (m *GoGoPBRpcResponseData) GetSeq() uint64 {
if m != nil {
return m.Seq
}
return 0
}
func (m *GoGoPBRpcResponseData) GetError() string {
if m != nil {
return m.Error
}
return ""
}
func (m *GoGoPBRpcResponseData) GetReply() []byte {
if m != nil {
return m.Reply
}
return nil
}
func init() {
proto.RegisterType((*GoGoPBRpcRequestData)(nil), "rpc.GoGoPBRpcRequestData")
proto.RegisterType((*GoGoPBRpcResponseData)(nil), "rpc.GoGoPBRpcResponseData")
}
func init() { proto.RegisterFile("gogorpc.proto", fileDescriptor_d0e25d3af112ec8f) }
var fileDescriptor_d0e25d3af112ec8f = []byte{
// 233 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4d, 0xcf, 0x4f, 0xcf,
0x2f, 0x2a, 0x48, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x56, 0x5a,
0xc2, 0xc8, 0x25, 0xe2, 0x9e, 0xef, 0x9e, 0x1f, 0xe0, 0x14, 0x54, 0x90, 0x1c, 0x94, 0x5a, 0x58,
0x9a, 0x5a, 0x5c, 0xe2, 0x92, 0x58, 0x92, 0x28, 0x24, 0xc0, 0xc5, 0x1c, 0x9c, 0x5a, 0x28, 0xc1,
0xa8, 0xc0, 0xa8, 0xc1, 0x12, 0x04, 0x62, 0x0a, 0x29, 0x70, 0x71, 0x07, 0x15, 0x24, 0xfb, 0xa6,
0x96, 0x64, 0xe4, 0xa7, 0x78, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xf0, 0x06, 0x21, 0x0b, 0x09,
0xa9, 0x70, 0xf1, 0x06, 0xa7, 0x16, 0x95, 0x65, 0x26, 0xa7, 0x42, 0x84, 0x24, 0x98, 0x15, 0x18,
0x35, 0x38, 0x83, 0x50, 0x05, 0x85, 0x24, 0xb8, 0xd8, 0xfd, 0xf2, 0x83, 0x52, 0x0b, 0x72, 0x2a,
0x25, 0x58, 0x14, 0x18, 0x35, 0x38, 0x82, 0x60, 0x5c, 0x90, 0x8c, 0x67, 0x5e, 0x40, 0x62, 0x51,
0x62, 0xae, 0x04, 0xab, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0x14, 0xca, 0x25, 0x8a, 0xe4,
0xca, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x1c, 0xce, 0x14, 0xe1, 0x62, 0x75, 0x2d, 0x2a, 0xca,
0x2f, 0x02, 0x3b, 0x90, 0x33, 0x08, 0xc2, 0x01, 0x89, 0x42, 0xac, 0x64, 0x06, 0x1b, 0x0c, 0xe1,
0x38, 0x09, 0x9f, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x51,
0xac, 0x7a, 0xfa, 0x45, 0x05, 0xc9, 0x49, 0x6c, 0xe0, 0xe0, 0x31, 0x06, 0x04, 0x00, 0x00, 0xff,
0xff, 0x26, 0xcf, 0x31, 0x39, 0x2f, 0x01, 0x00, 0x00,
}
func (m *GoGoPBRpcRequestData) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *GoGoPBRpcRequestData) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *GoGoPBRpcRequestData) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.InParam) > 0 {
i -= len(m.InParam)
copy(dAtA[i:], m.InParam)
i = encodeVarintGogorpc(dAtA, i, uint64(len(m.InParam)))
i--
dAtA[i] = 0x2a
}
if m.NoReply {
i--
if m.NoReply {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x20
}
if len(m.ServiceMethod) > 0 {
i -= len(m.ServiceMethod)
copy(dAtA[i:], m.ServiceMethod)
i = encodeVarintGogorpc(dAtA, i, uint64(len(m.ServiceMethod)))
i--
dAtA[i] = 0x1a
}
if m.RpcMethodId != 0 {
i = encodeVarintGogorpc(dAtA, i, uint64(m.RpcMethodId))
i--
dAtA[i] = 0x10
}
if m.Seq != 0 {
i = encodeVarintGogorpc(dAtA, i, uint64(m.Seq))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *GoGoPBRpcResponseData) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *GoGoPBRpcResponseData) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *GoGoPBRpcResponseData) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Reply) > 0 {
i -= len(m.Reply)
copy(dAtA[i:], m.Reply)
i = encodeVarintGogorpc(dAtA, i, uint64(len(m.Reply)))
i--
dAtA[i] = 0x1a
}
if len(m.Error) > 0 {
i -= len(m.Error)
copy(dAtA[i:], m.Error)
i = encodeVarintGogorpc(dAtA, i, uint64(len(m.Error)))
i--
dAtA[i] = 0x12
}
if m.Seq != 0 {
i = encodeVarintGogorpc(dAtA, i, uint64(m.Seq))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func encodeVarintGogorpc(dAtA []byte, offset int, v uint64) int {
offset -= sovGogorpc(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *GoGoPBRpcRequestData) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Seq != 0 {
n += 1 + sovGogorpc(uint64(m.Seq))
}
if m.RpcMethodId != 0 {
n += 1 + sovGogorpc(uint64(m.RpcMethodId))
}
l = len(m.ServiceMethod)
if l > 0 {
n += 1 + l + sovGogorpc(uint64(l))
}
if m.NoReply {
n += 2
}
l = len(m.InParam)
if l > 0 {
n += 1 + l + sovGogorpc(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *GoGoPBRpcResponseData) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Seq != 0 {
n += 1 + sovGogorpc(uint64(m.Seq))
}
l = len(m.Error)
if l > 0 {
n += 1 + l + sovGogorpc(uint64(l))
}
l = len(m.Reply)
if l > 0 {
n += 1 + l + sovGogorpc(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovGogorpc(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozGogorpc(x uint64) (n int) {
return sovGogorpc(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *GoGoPBRpcRequestData) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: GoGoPBRpcRequestData: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: GoGoPBRpcRequestData: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Seq", wireType)
}
m.Seq = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Seq |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field RpcMethodId", wireType)
}
m.RpcMethodId = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.RpcMethodId |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ServiceMethod", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthGogorpc
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthGogorpc
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ServiceMethod = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field NoReply", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.NoReply = bool(v != 0)
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field InParam", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthGogorpc
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthGogorpc
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.InParam = append(m.InParam[:0], dAtA[iNdEx:postIndex]...)
if m.InParam == nil {
m.InParam = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipGogorpc(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthGogorpc
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthGogorpc
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *GoGoPBRpcResponseData) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: GoGoPBRpcResponseData: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: GoGoPBRpcResponseData: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Seq", wireType)
}
m.Seq = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Seq |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthGogorpc
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthGogorpc
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Error = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Reply", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthGogorpc
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthGogorpc
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Reply = append(m.Reply[:0], dAtA[iNdEx:postIndex]...)
if m.Reply == nil {
m.Reply = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipGogorpc(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthGogorpc
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthGogorpc
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipGogorpc(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowGogorpc
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowGogorpc
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowGogorpc
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthGogorpc
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupGogorpc
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthGogorpc
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthGogorpc = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowGogorpc = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupGogorpc = fmt.Errorf("proto: unexpected end of group")
)

View File

@@ -43,7 +43,7 @@ func (lc *LClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool,
findIndex := strings.Index(serviceMethod, ".")
if findIndex == -1 {
sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!")
log.SError(sErr.Error())
log.Error("call rpc fail",log.String("error",sErr.Error()))
call := MakeCall()
call.DoError(sErr)
@@ -100,7 +100,7 @@ func (lc *LClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi
if findIndex == -1 {
err := errors.New("Call serviceMethod " + serviceMethod + " is error!")
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError(err.Error())
log.Error("serviceMethod format is error",log.String("error",err.Error()))
return emptyCancelRpc,nil
}

File diff suppressed because it is too large Load Diff

106
rpc/pbprocessor.go Normal file
View File

@@ -0,0 +1,106 @@
package rpc
import (
"github.com/duanhf2012/origin/util/sync"
"google.golang.org/protobuf/proto"
"fmt"
)
type PBProcessor struct {
}
var rpcPbResponseDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{
return &PBRpcResponseData{}
})
var rpcPbRequestDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{
return &PBRpcRequestData{}
})
func (slf *PBRpcRequestData) MakeRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) *PBRpcRequestData{
slf.Seq = seq
slf.RpcMethodId = rpcMethodId
slf.ServiceMethod = serviceMethod
slf.NoReply = noReply
slf.InParam = inParam
return slf
}
func (slf *PBRpcResponseData) MakeRespone(seq uint64,err RpcError,reply []byte) *PBRpcResponseData{
slf.Seq = seq
slf.Error = err.Error()
slf.Reply = reply
return slf
}
func (slf *PBProcessor) Marshal(v interface{}) ([]byte, error){
return proto.Marshal(v.(proto.Message))
}
func (slf *PBProcessor) Unmarshal(data []byte, msg interface{}) error{
protoMsg,ok := msg.(proto.Message)
if ok == false {
return fmt.Errorf("%+v is not of proto.Message type",msg)
}
return proto.Unmarshal(data, protoMsg)
}
func (slf *PBProcessor) MakeRpcRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{
pGogoPbRpcRequestData := rpcPbRequestDataPool.Get().(*PBRpcRequestData)
pGogoPbRpcRequestData.MakeRequest(seq,rpcMethodId,serviceMethod,noReply,inParam)
return pGogoPbRpcRequestData
}
func (slf *PBProcessor) MakeRpcResponse(seq uint64,err RpcError,reply []byte) IRpcResponseData {
pPBRpcResponseData := rpcPbResponseDataPool.Get().(*PBRpcResponseData)
pPBRpcResponseData.MakeRespone(seq,err,reply)
return pPBRpcResponseData
}
func (slf *PBProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){
rpcPbRequestDataPool.Put(rpcRequestData)
}
func (slf *PBProcessor) ReleaseRpcResponse(rpcResponseData IRpcResponseData){
rpcPbResponseDataPool.Put(rpcResponseData)
}
func (slf *PBProcessor) IsParse(param interface{}) bool {
_,ok := param.(proto.Message)
return ok
}
func (slf *PBProcessor) GetProcessorType() RpcProcessorType{
return RpcProcessorGoGoPB
}
func (slf *PBProcessor) Clone(src interface{}) (interface{},error){
srcMsg,ok := src.(proto.Message)
if ok == false {
return nil,fmt.Errorf("param is not of proto.message type")
}
return proto.Clone(srcMsg),nil
}
func (slf *PBRpcRequestData) IsNoReply() bool{
return slf.GetNoReply()
}
func (slf *PBRpcResponseData) GetErr() *RpcError {
if slf.GetError() == "" {
return nil
}
err := RpcError(slf.GetError())
return &err
}

263
rpc/protorpc.pb.go Normal file
View File

@@ -0,0 +1,263 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.31.0
// protoc v3.11.4
// source: test/rpc/protorpc.proto
package rpc
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type PBRpcRequestData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"`
RpcMethodId uint32 `protobuf:"varint,2,opt,name=RpcMethodId,proto3" json:"RpcMethodId,omitempty"`
ServiceMethod string `protobuf:"bytes,3,opt,name=ServiceMethod,proto3" json:"ServiceMethod,omitempty"`
NoReply bool `protobuf:"varint,4,opt,name=NoReply,proto3" json:"NoReply,omitempty"`
InParam []byte `protobuf:"bytes,5,opt,name=InParam,proto3" json:"InParam,omitempty"`
}
func (x *PBRpcRequestData) Reset() {
*x = PBRpcRequestData{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_protorpc_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PBRpcRequestData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PBRpcRequestData) ProtoMessage() {}
func (x *PBRpcRequestData) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_protorpc_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PBRpcRequestData.ProtoReflect.Descriptor instead.
func (*PBRpcRequestData) Descriptor() ([]byte, []int) {
return file_test_rpc_protorpc_proto_rawDescGZIP(), []int{0}
}
func (x *PBRpcRequestData) GetSeq() uint64 {
if x != nil {
return x.Seq
}
return 0
}
func (x *PBRpcRequestData) GetRpcMethodId() uint32 {
if x != nil {
return x.RpcMethodId
}
return 0
}
func (x *PBRpcRequestData) GetServiceMethod() string {
if x != nil {
return x.ServiceMethod
}
return ""
}
func (x *PBRpcRequestData) GetNoReply() bool {
if x != nil {
return x.NoReply
}
return false
}
func (x *PBRpcRequestData) GetInParam() []byte {
if x != nil {
return x.InParam
}
return nil
}
type PBRpcResponseData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"`
Error string `protobuf:"bytes,2,opt,name=Error,proto3" json:"Error,omitempty"`
Reply []byte `protobuf:"bytes,3,opt,name=Reply,proto3" json:"Reply,omitempty"`
}
func (x *PBRpcResponseData) Reset() {
*x = PBRpcResponseData{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_protorpc_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PBRpcResponseData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PBRpcResponseData) ProtoMessage() {}
func (x *PBRpcResponseData) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_protorpc_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PBRpcResponseData.ProtoReflect.Descriptor instead.
func (*PBRpcResponseData) Descriptor() ([]byte, []int) {
return file_test_rpc_protorpc_proto_rawDescGZIP(), []int{1}
}
func (x *PBRpcResponseData) GetSeq() uint64 {
if x != nil {
return x.Seq
}
return 0
}
func (x *PBRpcResponseData) GetError() string {
if x != nil {
return x.Error
}
return ""
}
func (x *PBRpcResponseData) GetReply() []byte {
if x != nil {
return x.Reply
}
return nil
}
var File_test_rpc_protorpc_proto protoreflect.FileDescriptor
var file_test_rpc_protorpc_proto_rawDesc = []byte{
0x0a, 0x17, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x72, 0x70, 0x63, 0x22, 0xa0,
0x01, 0x0a, 0x10, 0x50, 0x42, 0x52, 0x70, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44,
0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x65, 0x71, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04,
0x52, 0x03, 0x53, 0x65, 0x71, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x70, 0x63, 0x4d, 0x65, 0x74, 0x68,
0x6f, 0x64, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x52, 0x70, 0x63, 0x4d,
0x65, 0x74, 0x68, 0x6f, 0x64, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69,
0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d,
0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x18, 0x0a,
0x07, 0x4e, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07,
0x4e, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x49, 0x6e, 0x50, 0x61, 0x72,
0x61, 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x49, 0x6e, 0x50, 0x61, 0x72, 0x61,
0x6d, 0x22, 0x51, 0x0a, 0x11, 0x50, 0x42, 0x52, 0x70, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x65, 0x71, 0x18, 0x01, 0x20,
0x01, 0x28, 0x04, 0x52, 0x03, 0x53, 0x65, 0x71, 0x12, 0x14, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f,
0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x14,
0x0a, 0x05, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x52,
0x65, 0x70, 0x6c, 0x79, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_test_rpc_protorpc_proto_rawDescOnce sync.Once
file_test_rpc_protorpc_proto_rawDescData = file_test_rpc_protorpc_proto_rawDesc
)
func file_test_rpc_protorpc_proto_rawDescGZIP() []byte {
file_test_rpc_protorpc_proto_rawDescOnce.Do(func() {
file_test_rpc_protorpc_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_rpc_protorpc_proto_rawDescData)
})
return file_test_rpc_protorpc_proto_rawDescData
}
var file_test_rpc_protorpc_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_test_rpc_protorpc_proto_goTypes = []interface{}{
(*PBRpcRequestData)(nil), // 0: rpc.PBRpcRequestData
(*PBRpcResponseData)(nil), // 1: rpc.PBRpcResponseData
}
var file_test_rpc_protorpc_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_test_rpc_protorpc_proto_init() }
func file_test_rpc_protorpc_proto_init() {
if File_test_rpc_protorpc_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_test_rpc_protorpc_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PBRpcRequestData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_test_rpc_protorpc_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PBRpcResponseData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_test_rpc_protorpc_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_test_rpc_protorpc_proto_goTypes,
DependencyIndexes: file_test_rpc_protorpc_proto_depIdxs,
MessageInfos: file_test_rpc_protorpc_proto_msgTypes,
}.Build()
File_test_rpc_protorpc_proto = out.File
file_test_rpc_protorpc_proto_rawDesc = nil
file_test_rpc_protorpc_proto_goTypes = nil
file_test_rpc_protorpc_proto_depIdxs = nil
}

View File

@@ -1,8 +1,8 @@
syntax = "proto3";
package rpc;
option go_package = "./rpc";
option go_package = ".;rpc";
message GoGoPBRpcRequestData{
message PBRpcRequestData{
uint64 Seq = 1;
uint32 RpcMethodId = 2;
string ServiceMethod = 3;
@@ -10,7 +10,7 @@ message GoGoPBRpcRequestData{
bytes InParam = 5;
}
message GoGoPBRpcResponseData{
message PBRpcResponseData{
uint64 Seq = 1;
string Error = 2;
bytes Reply = 3;

File diff suppressed because it is too large Load Diff

View File

@@ -46,7 +46,7 @@ func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool,
_, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args)
if err != nil {
log.SError(err.Error())
log.Error("Marshal is fail",log.ErrorAttr("error",err))
call := MakeCall()
call.DoError(err)
return call
@@ -68,7 +68,7 @@ func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor
if err != nil {
call.Seq = 0
log.SError(err.Error())
log.Error("marshal is fail",log.String("error",err.Error()))
call.DoError(err)
return call
}
@@ -77,7 +77,7 @@ func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor
if conn == nil || conn.IsConnected()==false {
call.Seq = 0
sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect")
log.SError(sErr.Error())
log.Error("conn is disconnect",log.String("error",sErr.Error()))
call.DoError(sErr)
return call
}
@@ -89,7 +89,7 @@ func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
call.Seq = 0
log.SError(cErr.Error())
log.Error("compress fail",log.String("error",cErr.Error()))
call.DoError(cErr)
return call
}
@@ -109,9 +109,7 @@ func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor
}
if err != nil {
rc.selfClient.RemovePending(call.Seq)
log.SError(err.Error())
log.Error("WiteMsg is fail",log.ErrorAttr("error",err))
call.Seq = 0
call.DoError(err)
}
@@ -191,14 +189,13 @@ func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, servi
return emptyCancelRpc,nil
}
func (rc *RClient) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
@@ -206,7 +203,7 @@ func (rc *RClient) Run() {
for {
bytes, err := rc.conn.ReadMsg()
if err != nil {
log.SError("rpcClient ", rc.Addr, " ReadMsg error:", err.Error())
log.Error("rclient read msg is failed",log.ErrorAttr("error",err))
return
}
@@ -214,7 +211,7 @@ func (rc *RClient) Run() {
processor := GetProcessor(bytes[0]&0x7f)
if processor == nil {
rc.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error())
log.Error("cannot find process",log.Uint8("process type",bytes[0]&0x7f))
return
}
@@ -231,7 +228,7 @@ func (rc *RClient) Run() {
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
rc.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", unCompressErr.Error())
log.Error("uncompressBlock failed",log.ErrorAttr("error",unCompressErr))
return
}
byteData = compressBuff
@@ -245,19 +242,19 @@ func (rc *RClient) Run() {
rc.conn.ReleaseReadMsg(bytes)
if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData)
log.SError("rpcClient Unmarshal head error:", err.Error())
log.Error("rpcClient Unmarshal head error",log.ErrorAttr("error",err))
continue
}
v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq())
if v == nil {
log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending")
log.Error("rpcClient cannot find seq",log.Uint64("seq",response.RpcResponseData.GetSeq()))
} else {
v.Err = nil
if len(response.RpcResponseData.GetReply()) > 0 {
err = processor.Unmarshal(response.RpcResponseData.GetReply(), v.Reply)
if err != nil {
log.SError("rpcClient Unmarshal body error:", err.Error())
log.Error("rpcClient Unmarshal body failed",log.ErrorAttr("error",err))
v.Err = err
}
}

View File

@@ -215,7 +215,7 @@ func (handler *RpcHandler) HandlerRpcResponseCB(call *Call) {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
@@ -237,7 +237,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("Handler Rpc ", request.RpcRequestData.GetServiceMethod(), " Core dump info[", errString, "]\n", string(buf[:l]))
log.Dump(string(buf[:l]),log.String("error",errString))
rpcErr := RpcError("call error : core dumps")
if request.requestHandle != nil {
request.requestHandle(nil, rpcErr)
@@ -250,12 +250,12 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
if rawRpcId > 0 {
v, ok := handler.mapRawFunctions[rawRpcId]
if ok == false {
log.SError("RpcHandler cannot find request rpc id", rawRpcId)
log.Error("RpcHandler cannot find request rpc id",log.Uint32("rawRpcId",rawRpcId))
return
}
rawData,ok := request.inParam.([]byte)
if ok == false {
log.SError("RpcHandler " + handler.rpcHandler.GetName()," cannot convert in param to []byte", rawRpcId)
log.Error("RpcHandler cannot convert",log.String("RpcHandlerName",handler.rpcHandler.GetName()),log.Uint32("rawRpcId",rawRpcId))
return
}
@@ -267,7 +267,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
v, ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()]
if ok == false {
err := "RpcHandler " + handler.rpcHandler.GetName() + "cannot find " + request.RpcRequestData.GetServiceMethod()
log.SError(err)
log.Error("HandlerRpcRequest cannot find serviceMethod",log.String("RpcHandlerName",handler.rpcHandler.GetName()),log.String("serviceMethod",request.RpcRequestData.GetServiceMethod()))
if request.requestHandle != nil {
request.requestHandle(nil, RpcError(err))
}
@@ -298,7 +298,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
paramList = append(paramList, oParam) //输出参数
} else if request.requestHandle != nil && v.hasResponder == false { //调用方有返回值,但被调用函数没有返回参数
rErr := "Call Rpc " + request.RpcRequestData.GetServiceMethod() + " without return parameter!"
log.SError(rErr)
log.Error("call serviceMethod without return parameter",log.String("serviceMethod",request.RpcRequestData.GetServiceMethod()))
request.requestHandle(nil, RpcError(rErr))
return
}
@@ -320,7 +320,7 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param
v, ok := handler.mapFunctions[ServiceMethod]
if ok == false {
err = errors.New("RpcHandler " + handler.rpcHandler.GetName() + " cannot find" + ServiceMethod)
log.SError(err.Error())
log.Error("CallMethod cannot find serviceMethod",log.String("rpcHandlerName",handler.rpcHandler.GetName()),log.String("serviceMethod",ServiceMethod))
return err
}
@@ -344,7 +344,7 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param
hander :=func(Returns interface{}, Err RpcError) {
rpcCall := client.RemovePending(callSeq)
if rpcCall == nil {
log.SError("cannot find call seq ",callSeq)
log.Error("cannot find call seq",log.Uint64("seq",callSeq))
return
}
@@ -431,15 +431,15 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
if count == 0 {
if err != nil {
log.SError("Call ", serviceMethod, " is error:", err.Error())
log.Error("call serviceMethod is failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
} else {
log.SError("Can not find ", serviceMethod)
log.Error("cannot find serviceMethod",log.String("serviceMethod",serviceMethod))
}
return err
}
if count > 1 && bCast == false {
log.SError("Cannot call %s more then 1 node!", serviceMethod)
log.Error("cannot call serviceMethod more then 1 node",log.String("serviceMethod",serviceMethod))
return errors.New("cannot call more then 1 node")
}
@@ -460,14 +460,14 @@ func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId int, serviceMeth
var pClientList [maxClusterNode]*Client
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
if err != nil {
log.SError("Call serviceMethod is error:", err.Error())
log.Error("Call serviceMethod is failed",log.ErrorAttr("error",err))
return err
} else if count <= 0 {
err = errors.New("Call serviceMethod is error:cannot find " + serviceMethod)
log.SError(err.Error())
log.Error("cannot find serviceMethod",log.String("serviceMethod",serviceMethod))
return err
} else if count > 1 {
log.SError("Cannot call more then 1 node!")
log.Error("Cannot call more then 1 node!",log.String("serviceMethod",serviceMethod))
return errors.New("cannot call more then 1 node")
}
@@ -484,19 +484,19 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId int, servic
fVal := reflect.ValueOf(callback)
if fVal.Kind() != reflect.Func {
err := errors.New("call " + serviceMethod + " input callback param is error!")
log.SError(err.Error())
log.Error("input callback param is error",log.String("serviceMethod",serviceMethod))
return emptyCancelRpc,err
}
if fVal.Type().NumIn() != 2 {
err := errors.New("call " + serviceMethod + " callback param function is error!")
log.SError(err.Error())
log.Error("callback param function is error",log.String("serviceMethod",serviceMethod))
return emptyCancelRpc,err
}
if fVal.Type().In(0).Kind() != reflect.Ptr || fVal.Type().In(1).String() != "error" {
err := errors.New("call " + serviceMethod + " callback param function is error!")
log.SError(err.Error())
log.Error("callback param function is error",log.String("serviceMethod",serviceMethod))
return emptyCancelRpc,err
}
@@ -512,14 +512,14 @@ func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId int, servic
}
}
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError("Call serviceMethod is error:", err.Error())
log.Error("cannot find serviceMethod from node",log.String("serviceMethod",serviceMethod),log.Int("nodeId",nodeId))
return emptyCancelRpc,nil
}
if count > 1 {
err := errors.New("cannot call more then 1 node")
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError(err.Error())
log.Error("cannot call more then 1 node",log.String("serviceMethod",serviceMethod))
return emptyCancelRpc,nil
}
@@ -587,12 +587,12 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i
processor := GetProcessor(uint8(rpcProcessorType))
err, count := handler.funcRpcClient(nodeId, serviceName, handler.pClientList)
if count == 0 || err != nil {
log.SError("Call serviceMethod is error:", err.Error())
log.Error("call serviceMethod is failed",log.ErrorAttr("error",err))
return err
}
if count > 1 {
err := errors.New("cannot call more then 1 node")
log.SError(err.Error())
log.Error("cannot call more then 1 node",log.String("serviceName",serviceName))
return err
}

View File

@@ -10,6 +10,7 @@ import (
"reflect"
"strings"
"time"
"runtime"
)
type RpcProcessorType uint8
@@ -19,7 +20,7 @@ const (
RpcProcessorGoGoPB RpcProcessorType = 1
)
var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &GoGoPBProcessor{}}
var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &PBProcessor{}}
var arrayProcessorLen uint8 = 2
var LittleEndian bool
@@ -69,7 +70,7 @@ const Default_ReadWriteDeadline = 15*time.Second
func (server *Server) Start(listenAddr string, maxRpcParamLen uint32,compressBytesLen int) {
splitAddr := strings.Split(listenAddr, ":")
if len(splitAddr) != 2 {
log.SFatal("listen addr is error :", listenAddr)
log.Fatal("listen addr is failed", log.String("listenAddr",listenAddr))
}
server.rpcServer.Addr = ":" + splitAddr[1]
@@ -111,7 +112,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if errM != nil {
log.SError("service method ", serviceMethod, " Marshal error:", errM.Error())
log.Error("mashal RpcResponseData failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",errM))
return
}
@@ -122,7 +123,7 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
log.SError("service method ", serviceMethod, " CompressBlock error:", cErr.Error())
log.Error("CompressBlock failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",cErr))
return
}
if len(compressBuff) < len(bytes) {
@@ -136,15 +137,24 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
compressor.CompressBufferCollection(compressBuff)
}
if errM != nil {
log.SError("Rpc ", serviceMethod, " return is error:", errM.Error())
log.Error("WriteMsg error,Rpc return is fail",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",errM))
}
}
func (agent *RpcAgent) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
for {
data, err := agent.conn.ReadMsg()
if err != nil {
log.SError("remoteAddress:", agent.conn.RemoteAddr().String(), ",read message: ", err.Error())
log.Error("read message is error",log.String("remoteAddress",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err))
//will close tcpconn
break
}
@@ -153,7 +163,7 @@ func (agent *RpcAgent) Run() {
processor := GetProcessor(data[0]&0x7f)
if processor == nil {
agent.conn.ReleaseReadMsg(data)
log.SError("remote rpc ", agent.conn.RemoteAddr().String(), " cannot find processor:", data[0])
log.Warning("cannot find processor",log.String("RemoteAddr",agent.conn.RemoteAddr().String()))
return
}
@@ -166,7 +176,7 @@ func (agent *RpcAgent) Run() {
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
agent.conn.ReleaseReadMsg(data)
log.SError("rpcClient ", agent.conn.RemoteAddr().String(), " ReadMsg head error:", unCompressErr.Error())
log.Error("UncompressBlock failed",log.String("RemoteAddr",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",unCompressErr))
return
}
byteData = compressBuff
@@ -179,7 +189,7 @@ func (agent *RpcAgent) Run() {
}
agent.conn.ReleaseReadMsg(data)
if err != nil {
log.SError("rpc Unmarshal request is error:", err.Error())
log.Error("Unmarshal failed",log.String("RemoteAddr",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err))
if req.RpcRequestData.GetSeq() > 0 {
rpcError := RpcError(err.Error())
if req.RpcRequestData.IsNoReply() == false {
@@ -201,7 +211,7 @@ func (agent *RpcAgent) Run() {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
ReleaseRpcRequest(req)
log.SError("rpc request req.ServiceMethod is error")
log.Error("rpc request req.ServiceMethod is error")
continue
}
@@ -211,8 +221,7 @@ func (agent *RpcAgent) Run() {
if req.RpcRequestData.IsNoReply() == false {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
log.SError("service method ", req.RpcRequestData.GetServiceMethod(), " not config!")
log.Error("serviceMethod not config",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()))
ReleaseRpcRequest(req)
continue
}
@@ -227,12 +236,13 @@ func (agent *RpcAgent) Run() {
req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam())
if err != nil {
rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error()
log.Error("call rpc param error",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()),log.ErrorAttr("error",err))
if req.requestHandle != nil {
req.requestHandle(nil, RpcError(rErr))
} else {
ReleaseRpcRequest(req)
}
log.SError(rErr)
continue
}
@@ -281,7 +291,7 @@ func (server *Server) myselfRpcHandlerGo(client *Client,handlerName string, serv
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error())
log.Error("service method not config",log.String("serviceMethod",serviceMethod))
return err
}
@@ -297,7 +307,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error())
log.Error("service method not config",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",err))
pCall.Seq = 0
pCall.DoError(err)
@@ -314,7 +324,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
iParam,err = processor.Clone(args)
if err != nil {
sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(sErr.Error())
log.Error("deep copy inParam is failed",log.String("handlerName",handlerName),log.String("serviceMethod",serviceMethod))
pCall.Seq = 0
pCall.DoError(sErr)
@@ -329,7 +339,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil {
log.SError(err.Error())
log.Error("unmarshalInParam is failed",log.String("serviceMethod",serviceMethod),log.Uint32("rpcMethodId",rpcMethodId),log.ErrorAttr("error",err))
pCall.Seq = 0
pCall.DoError(err)
ReleaseRpcRequest(req)
@@ -345,12 +355,12 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
byteReturns, err := req.rpcProcessor.Marshal(Returns)
if err != nil {
Err = ConvertError(err)
log.SError("returns data cannot be marshal,callSeq is ", callSeq," error is ",err.Error())
log.Error("returns data cannot be marshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err))
}else{
err = req.rpcProcessor.Unmarshal(byteReturns, reply)
if err != nil {
Err = ConvertError(err)
log.SError("returns data cannot be Unmarshal,callSeq is ", callSeq," error is ",err.Error())
log.Error("returns data cannot be Unmarshal",log.Uint64("seq",callSeq),log.ErrorAttr("error",err))
}
}
}
@@ -358,8 +368,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
ReleaseRpcRequest(req)
v := client.RemovePending(callSeq)
if v == nil {
log.SError("rpcClient cannot find seq ",callSeq, " in pending")
log.Error("rpcClient cannot find seq",log.Uint64("seq",callSeq))
return
}
@@ -367,7 +376,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
v.Err = nil
v.DoOK()
} else {
log.SError(Err.Error())
log.Error(Err.Error())
v.DoError(Err)
}
}
@@ -375,7 +384,7 @@ func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcP
err := rpcHandler.PushRpcRequest(req)
if err != nil {
log.SError(err.Error())
log.Error(err.Error())
pCall.DoError(err)
ReleaseRpcRequest(req)
}
@@ -387,7 +396,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Cl
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error())
log.Error(err.Error())
return emptyCancelRpc,err
}
@@ -395,7 +404,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Cl
iParam,err := processor.Clone(args)
if err != nil {
errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(errM.Error())
log.Error(errM.Error())
return emptyCancelRpc,errM
}

View File

@@ -117,7 +117,7 @@ func (m *Module) AddModule(module IModule) (uint32, error) {
m.child[module.GetModuleId()] = module
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
log.SDebug("Add module ", module.GetModuleName(), " completed")
log.Debug("Add module "+module.GetModuleName()+ " completed")
return module.GetModuleId(), nil
}
@@ -131,7 +131,7 @@ func (m *Module) ReleaseModule(moduleId uint32) {
pModule.self.OnRelease()
pModule.GetEventHandler().Destroy()
log.SDebug("Release module ", pModule.GetModuleName())
log.Debug("Release module "+ pModule.GetModuleName())
for pTimer := range pModule.mapActiveTimer {
pTimer.Cancel()
}
@@ -278,18 +278,18 @@ func (m *Module) SafeNewTicker(tickerId *uint64, d time.Duration, AdditionData i
func (m *Module) CancelTimerId(timerId *uint64) bool {
if timerId==nil || *timerId == 0 {
log.SWarning("timerId is invalid")
log.Warning("timerId is invalid")
return false
}
if m.mapActiveIdTimer == nil {
log.SError("mapActiveIdTimer is nil")
log.Error("mapActiveIdTimer is nil")
return false
}
t, ok := m.mapActiveIdTimer[*timerId]
if ok == false {
log.SStack("cannot find timer id ", timerId)
log.Stack("cannot find timer id ", log.Uint64("timerId",*timerId))
return false
}

View File

@@ -93,7 +93,7 @@ func (s *Service) OnSetup(iService IService){
func (s *Service) OpenProfiler() {
s.profiler = profiler.RegProfiler(s.GetName())
if s.profiler==nil {
log.SFatal("rofiler.RegProfiler ",s.GetName()," fail.")
log.Fatal("rofiler.RegProfiler "+s.GetName()+" fail.")
}
}
@@ -128,7 +128,7 @@ func (s *Service) Start() {
s.wg.Add(1)
waitRun.Add(1)
go func(){
log.SRelease(s.GetName()," service is running",)
log.Info(s.GetName()+" service is running",)
waitRun.Done()
s.Run()
}()
@@ -158,12 +158,12 @@ func (s *Service) Run() {
case event.ServiceRpcRequestEvent:
cEvent,ok := ev.(*event.Event)
if ok == false {
log.SError("Type event conversion error")
log.Error("Type event conversion error")
break
}
rpcRequest,ok := cEvent.Data.(*rpc.RpcRequest)
if ok == false {
log.SError("Type *rpc.RpcRequest conversion error")
log.Error("Type *rpc.RpcRequest conversion error")
break
}
if s.profiler!=nil {
@@ -179,12 +179,12 @@ func (s *Service) Run() {
case event.ServiceRpcResponseEvent:
cEvent,ok := ev.(*event.Event)
if ok == false {
log.SError("Type event conversion error")
log.Error("Type event conversion error")
break
}
rpcResponseCB,ok := cEvent.Data.(*rpc.Call)
if ok == false {
log.SError("Type *rpc.Call conversion error")
log.Error("Type *rpc.Call conversion error")
break
}
if s.profiler!=nil {
@@ -242,7 +242,7 @@ func (s *Service) Release(){
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l]))
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
@@ -257,10 +257,10 @@ func (s *Service) OnInit() error {
}
func (s *Service) Stop(){
log.SRelease("stop ",s.GetName()," service ")
log.Info("stop "+s.GetName()+" service ")
close(s.closeSig)
s.wg.Wait()
log.SRelease(s.GetName()," service has been stopped")
log.Info(s.GetName()+" service has been stopped")
}
func (s *Service) GetServiceCfg()interface{}{
@@ -353,7 +353,7 @@ func (s *Service) PushEvent(ev event.IEvent) error{
func (s *Service) pushEvent(ev event.IEvent) error{
if len(s.chanEvent) >= maxServiceEventChannelNum {
err := errors.New("The event channel in the service is full")
log.SError(err.Error())
log.Error(err.Error())
return err
}
@@ -380,7 +380,7 @@ func (s *Service) SetEventChannelNum(num int){
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程
if s.startStatus == true || s.profiler!=nil {
log.SError("open profiler mode is not allowed to set Multi-coroutine.")
log.Error("open profiler mode is not allowed to set Multi-coroutine.")
return false
}

View File

@@ -1,4 +1,4 @@
package mysqlmondule
package mysqlmodule
import (
"database/sql"

View File

@@ -1,4 +1,4 @@
package mysqlmondule
package mysqlmodule
import (
"testing"

View File

@@ -104,11 +104,11 @@ func (cs *CustomerSubscriber) UnSubscribe() {
func (cs *CustomerSubscriber) LoadLastIndex() {
for {
if atomic.LoadInt32(&cs.isStop) != 0 {
log.SRelease("topic ", cs.topic, " out of subscription")
log.Info("topic ", cs.topic, " out of subscription")
break
}
log.SRelease("customer ", cs.customerId, " start load last index ")
log.Info("customer ", cs.customerId, " start load last index ")
lastIndex, ret := cs.subscriber.dataPersist.LoadCustomerIndex(cs.topic, cs.customerId)
if ret == true {
if lastIndex > 0 {
@@ -116,18 +116,18 @@ func (cs *CustomerSubscriber) LoadLastIndex() {
} else {
//否则直接使用客户端发回来的
}
log.SRelease("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
log.Info("customer ", cs.customerId, " load finish,start index is ", cs.StartIndex)
break
}
log.SRelease("customer ", cs.customerId, " load last index is fail...")
log.Info("customer ", cs.customerId, " load last index is fail...")
time.Sleep(5 * time.Second)
}
}
func (cs *CustomerSubscriber) SubscribeRun() {
defer cs.subscriber.queueWait.Done()
log.SRelease("topic ", cs.topic, " start subscription")
log.Info("topic ", cs.topic, " start subscription")
//加载之前的位置
if cs.subscribeMethod == MethodLast {
@@ -136,7 +136,7 @@ func (cs *CustomerSubscriber) SubscribeRun() {
for {
if atomic.LoadInt32(&cs.isStop) != 0 {
log.SRelease("topic ", cs.topic, " out of subscription")
log.Info("topic ", cs.topic, " out of subscription")
break
}
@@ -146,14 +146,14 @@ func (cs *CustomerSubscriber) SubscribeRun() {
//todo 检测退出
if cs.subscribe() == false {
log.SRelease("topic ", cs.topic, " out of subscription")
log.Info("topic ", cs.topic, " out of subscription")
break
}
}
//删除订阅关系
cs.subscriber.removeCustomer(cs.customerId, cs)
log.SRelease("topic ", cs.topic, " unsubscription")
log.Info("topic ", cs.topic, " unsubscription")
}
func (cs *CustomerSubscriber) subscribe() bool {

View File

@@ -63,7 +63,7 @@ func (ms *MessageQueueService) ReadCfg() error {
maxProcessTopicBacklogNum, ok := mapDBServiceCfg["MaxProcessTopicBacklogNum"]
if ok == false {
ms.maxProcessTopicBacklogNum = DefaultMaxTopicBacklogNum
log.SRelease("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
log.Info("MaxProcessTopicBacklogNum config is set to the default value of ", maxProcessTopicBacklogNum)
} else {
ms.maxProcessTopicBacklogNum = int32(maxProcessTopicBacklogNum.(float64))
}
@@ -71,7 +71,7 @@ func (ms *MessageQueueService) ReadCfg() error {
memoryQueueLen, ok := mapDBServiceCfg["MemoryQueueLen"]
if ok == false {
ms.memoryQueueLen = DefaultMemoryQueueLen
log.SRelease("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
log.Info("MemoryQueueLen config is set to the default value of ", DefaultMemoryQueueLen)
} else {
ms.memoryQueueLen = int32(memoryQueueLen.(float64))
}

View File

@@ -56,9 +56,9 @@ func (ss *Subscriber) TopicSubscribe(rpcHandler rpc.IRpcHandler, subScribeType r
}
if ok == true {
log.SRelease("repeat subscription for customer ", customerId)
log.Info("repeat subscription for customer ", customerId)
} else {
log.SRelease("subscription for customer ", customerId)
log.Info("subscription for customer ", customerId)
}
}

View File

@@ -93,7 +93,7 @@ func (tr *TopicRoom) Stop() {
func (tr *TopicRoom) topicRoomRun() {
defer tr.queueWait.Done()
log.SRelease("topic room ", tr.topic, " is running..")
log.Info("topic room ", tr.topic, " is running..")
for {
if atomic.LoadInt32(&tr.isStop) != 0 {
break
@@ -145,5 +145,5 @@ func (tr *TopicRoom) topicRoomRun() {
}
tr.customerLocker.Unlock()
log.SRelease("topic room ", tr.topic, " is stop")
log.Info("topic room ", tr.topic, " is stop")
}

View File

@@ -142,13 +142,13 @@ func (mp *MongoPersist) OnSetupRank(manual bool,rankSkip *RankSkip) error{
return nil
}
log.SRelease("start load rank ",rankSkip.GetRankName()," from mongodb.")
log.Info("start load rank ",rankSkip.GetRankName()," from mongodb.")
err := mp.loadFromDB(rankSkip.GetRankID(),rankSkip.GetRankName())
if err != nil {
log.SError("load from db is fail :%s",err.Error())
return err
}
log.SRelease("finish load rank ",rankSkip.GetRankName()," from mongodb.")
log.Info("finish load rank ",rankSkip.GetRankName()," from mongodb.")
return nil
}
@@ -297,7 +297,7 @@ func (mp *MongoPersist) saveToDB(){
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError(" Core dump info[", errString, "]\n", string(buf[:l]))
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()

View File

@@ -8,10 +8,11 @@ import (
"github.com/duanhf2012/origin/network/processor"
"github.com/duanhf2012/origin/node"
"github.com/duanhf2012/origin/service"
"sync/atomic"
"sync"
"time"
"github.com/duanhf2012/origin/util/bytespool"
"runtime"
"sync"
"sync/atomic"
"time"
)
type TcpService struct {
@@ -170,7 +171,7 @@ func (slf *Client) Run() {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l]))
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
@@ -183,7 +184,7 @@ func (slf *Client) Run() {
slf.tcpConn.SetReadDeadline(slf.tcpService.tcpServer.ReadDeadline)
bytes,err := slf.tcpConn.ReadMsg()
if err != nil {
log.SDebug("read client id ",slf.id," is error:",err.Error())
log.Debug("read client failed",log.ErrorAttr("error",err),log.Uint64("clientId",slf.id))
break
}
data,err:=slf.tcpService.process.Unmarshal(slf.id,bytes)
@@ -277,14 +278,14 @@ func (tcpService *TcpService) GetConnNum() int {
return connNum
}
func (server *TcpService) SetNetMempool(mempool network.INetMempool){
func (server *TcpService) SetNetMempool(mempool bytespool.IBytesMempool){
server.tcpServer.SetNetMempool(mempool)
}
func (server *TcpService) GetNetMempool() network.INetMempool{
func (server *TcpService) GetNetMempool() bytespool.IBytesMempool {
return server.tcpServer.GetNetMempool()
}
func (server *TcpService) ReleaseNetMem(byteBuff []byte) {
server.tcpServer.GetNetMempool().ReleaseByteSlice(byteBuff)
server.tcpServer.GetNetMempool().ReleaseBytes(byteBuff)
}

View File

@@ -1,12 +1,12 @@
package network
package bytespool
import (
"sync"
)
type INetMempool interface {
MakeByteSlice(size int) []byte
ReleaseByteSlice(byteBuff []byte) bool
type IBytesMempool interface {
MakeBytes(size int) []byte
ReleaseBytes(byteBuff []byte) bool
}
type memAreaPool struct {
@@ -68,7 +68,7 @@ func (areaPool *memAreaPool) releaseByteSlice(byteBuff []byte) bool {
return true
}
func (areaPool *memAreaPool) MakeByteSlice(size int) []byte {
func (areaPool *memAreaPool) MakeBytes(size int) []byte {
for i := 0; i < len(memAreaPoolList); i++ {
if size <= memAreaPoolList[i].maxAreaValue {
return memAreaPoolList[i].makeByteSlice(size)
@@ -78,7 +78,7 @@ func (areaPool *memAreaPool) MakeByteSlice(size int) []byte {
return make([]byte, size)
}
func (areaPool *memAreaPool) ReleaseByteSlice(byteBuff []byte) bool {
func (areaPool *memAreaPool) ReleaseBytes(byteBuff []byte) bool {
for i := 0; i < len(memAreaPoolList); i++ {
if cap(byteBuff) <= memAreaPoolList[i].maxAreaValue {
return memAreaPoolList[i].releaseByteSlice(byteBuff)

View File

@@ -42,9 +42,9 @@ func Abs[NumType SignedNumberType](Num NumType) NumType {
func Add[NumType NumberType](number1 NumType, number2 NumType) NumType {
ret := number1 + number2
if number2> 0 && ret < number1 {
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2)
log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
}else if (number2<0 && ret > number1){
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2)
log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
}
return ret
@@ -53,9 +53,9 @@ func Add[NumType NumberType](number1 NumType, number2 NumType) NumType {
func Sub[NumType NumberType](number1 NumType, number2 NumType) NumType {
ret := number1 - number2
if number2> 0 && ret > number1 {
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2)
log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
}else if (number2<0 && ret < number1){
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2)
log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
}
return ret
@@ -71,8 +71,8 @@ func Mul[NumType NumberType](number1 NumType, number2 NumType) NumType {
if ret / number2 == number1 {
return ret
}
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2)
log.Stack("Calculation overflow" ,log.Any("number1",number1),log.Any("number2",number2))
return ret
}

View File

@@ -132,7 +132,7 @@ func (t *Timer) Do() {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
@@ -218,7 +218,7 @@ func (c *Cron) Do() {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
@@ -274,7 +274,7 @@ func (c *Ticker) Do() {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()