Compare commits

...

11 Commits

Author SHA1 Message Date
duanhf2012 6de25d1c6d Optimize rankservice error returns 2023-05-09 14:34:33 +08:00
duanhf2012 b392617d6e Optimize performance profiling and rankservice persistence 2023-05-09 14:06:17 +08:00
duanhf2012 92fdb7860c Optimize service rpc in the local node 2023-05-04 17:53:42 +08:00
duanhf2012 f78d0d58be Optimize rpc and rankservice persistence 2023-05-04 17:35:40 +08:00
duanhf2012 5675681ab1 Optimize the concurrent and rpc modules 2023-05-04 14:21:29 +08:00
duanhf2012 ddeaaf7d77 Optimize the concurrent module 2023-04-11 10:29:06 +08:00
duanhf2012 1174b47475 Extend the IService interface with IConcurrent 2023-04-04 16:36:05 +08:00
duanhf2012 18fff3b567 Optimize the concurrent module; add a return value that controls whether the callback runs 2023-03-31 15:12:27 +08:00
duanhf2012 7ab6c88f9c Clean up and optimize rpc 2023-03-23 10:06:41 +08:00
duanhf2012 6b64de06a2 Add a packet length-field configuration option to TcpService 2023-03-22 14:59:22 +08:00
duanhf2012 95b153f8cf Optimize automatic length-field calculation in the network package 2023-03-20 15:20:04 +08:00
20 changed files with 206 additions and 139 deletions

View File

@@ -767,34 +767,57 @@ func (slf *TestService7) GoTest(){
//slf.OpenConcurrent(5, 10, 1000000)
```
For ordinary calls, the following method can be used:
A usage example follows:
```
func (slf *TestService1) testAsyncDo() {
func (slf *TestService13) testAsyncDo() {
var context struct {
data int64
}
slf.AsyncDo(func() {
//1. Basic usage
//The function in parameter one runs in the goroutine pool; when it completes, a completion event is pushed into the service worker goroutine,
//and the function in parameter two runs in the service goroutine, so it is goroutine-safe.
slf.AsyncDo(func() bool {
//This callback runs in the goroutine pool
context.data = 100
return true
}, func(err error) {
//This function runs in the service goroutine
fmt.Print(context.data) //prints 100
})
}
```
These methods throw the function into the task channel, where the goroutine-pool workers compete to execute it. Some tasks, however, must run in a fixed order; for those, the following method can be used:
```
func (slf *TestService1) testAsyncDoByQueue() {
queueId := int64(1)
//2. Example: queued, in-order execution
//Parameter one is a queue Id; tasks with the same queue Id are queued and executed in order in the goroutine pool
//The two calls below both pass queueId 1, so both enter the queue with queueId 1 and are executed one after the other
queueId := int64(1)
for i := 0; i < 2; i++ {
slf.AsyncDoByQueue(queueId, func() {
slf.AsyncDoByQueue(queueId, func() bool {
//This function is invoked twice, but the two executions are queued
return true
}, func(err error) {
//This function runs in the service goroutine
})
}
//3. Either one of the two function parameters may be nil
//The function in parameter two is executed later (deferred)
slf.AsyncDo(nil, func(err error) {
//will run later in the service goroutine
})
//The function in parameter one runs in the goroutine pool, but no callback is made in the service goroutine
slf.AsyncDo(func() bool {
return true
}, nil)
//4. Using the return value to skip the callback
slf.AsyncDo(func() bool {
//When false is returned, the parameter-two function will not be executed; when true, it will be
return false
}, func(err error) {
//This function will not be executed
})
}
```
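Note that the goroutine pool must be opened on the service before AsyncDo/AsyncDoByQueue is called; otherwise AsyncDoByQueue panics with "not open concurrent". A minimal sketch of enabling it in OnInit, reusing the pool sizes shown above (the numbers are illustrative):
```
func (slf *TestService13) OnInit() error {
	//Open the goroutine pool before any AsyncDo/AsyncDoByQueue call.
	//Arguments: minimum goroutines, maximum goroutines, task channel capacity.
	slf.OpenConcurrent(5, 10, 1000000)
	//Alternatively, size the pool from the CPU count:
	//slf.OpenConcurrentByNumCPU(1.0)
	return nil
}
```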

View File

@@ -12,8 +12,8 @@ const defaultMaxTaskChannelNum = 1000000
type IConcurrent interface {
OpenConcurrentByNumCPU(cpuMul float32)
OpenConcurrent(minGoroutineNum int32, maxGoroutineNum int32, maxTaskChannelNum int)
AsyncDoByQueue(queueId int64, fn func(), cb func(err error))
AsyncDo(f func(), cb func(err error))
AsyncDoByQueue(queueId int64, fn func() bool, cb func(err error))
AsyncDo(f func() bool, cb func(err error))
}
type Concurrent struct {
@@ -40,11 +40,11 @@ func (c *Concurrent) OpenConcurrent(minGoroutineNum int32, maxGoroutineNum int32
c.dispatch.open(minGoroutineNum, maxGoroutineNum, c.tasks, c.cbChannel)
}
func (c *Concurrent) AsyncDo(f func(), cb func(err error)) {
func (c *Concurrent) AsyncDo(f func() bool, cb func(err error)) {
c.AsyncDoByQueue(0, f, cb)
}
func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func(), cb func(err error)) {
func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err error)) {
if cap(c.tasks) == 0 {
panic("not open concurrent")
}
@@ -54,16 +54,6 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func(), cb func(err error)
return
}
if len(c.tasks) > cap(c.tasks) {
log.SError("tasks channel is full")
if cb != nil {
c.pushAsyncDoCallbackEvent(func(err error) {
cb(errors.New("tasks channel is full"))
})
}
return
}
if fn == nil {
c.pushAsyncDoCallbackEvent(cb)
return
@@ -75,6 +65,14 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func(), cb func(err error)
select {
case c.tasks <- task{queueId, fn, cb}:
default:
log.SError("tasks channel is full")
if cb != nil {
c.pushAsyncDoCallbackEvent(func(err error) {
cb(errors.New("tasks channel is full"))
})
}
return
}
}
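The full-channel check now uses a non-blocking send (select with a default branch) at the point of enqueueing, replacing the removed len(c.tasks) > cap(c.tasks) test, which can never be true for a buffered channel. A small standalone sketch of that pattern, independent of the framework:
```
package main

import "fmt"

func main() {
	tasks := make(chan int, 2)
	for i := 0; i < 3; i++ {
		select {
		case tasks <- i:
			//there was room in the buffer, so the task is queued
			fmt.Println("queued", i)
		default:
			//the buffer is full: report the error instead of blocking the caller
			fmt.Println("tasks channel is full, rejecting", i)
		}
	}
}
```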

View File

@@ -12,7 +12,7 @@ import (
"github.com/duanhf2012/origin/util/queue"
)
var idleTimeout = 2 * time.Second
var idleTimeout = int64(2 * time.Second)
const maxTaskQueueSessionId = 10000
type dispatch struct {
@@ -47,7 +47,7 @@ func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan
func (d *dispatch) run() {
defer d.waitDispatch.Done()
timeout := time.NewTimer(idleTimeout)
timeout := time.NewTimer(time.Duration(atomic.LoadInt64(&idleTimeout)))
for {
select {
@@ -65,9 +65,9 @@ func (d *dispatch) run() {
case <-timeout.C:
d.processTimer()
if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 {
idleTimeout = time.Millisecond * 10
atomic.StoreInt64(&idleTimeout,int64(time.Millisecond * 10))
}
timeout.Reset(idleTimeout)
timeout.Reset(time.Duration(atomic.LoadInt64(&idleTimeout)))
}
}
@@ -80,7 +80,7 @@ func (d *dispatch) run() {
}
func (d *dispatch) processTimer() {
if d.idle == true && d.workerNum > d.minConcurrentNum {
if d.idle == true && d.workerNum > atomic.LoadInt32(&d.minConcurrentNum) {
d.processIdle()
}
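idleTimeout is now kept as an int64 (nanoseconds) so the dispatcher can read it, and other goroutines can shrink it, through sync/atomic rather than racing on a plain time.Duration. A standalone sketch of the load/store pattern, outside the framework:
```
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

//the duration is stored as int64 nanoseconds so it can be read and
//written atomically from different goroutines
var idleTimeout = int64(2 * time.Second)

func main() {
	//writer side: shrink the timeout
	atomic.StoreInt64(&idleTimeout, int64(10*time.Millisecond))

	//reader side: convert back to time.Duration before use
	d := time.Duration(atomic.LoadInt64(&idleTimeout))
	timer := time.NewTimer(d)
	defer timer.Stop()
	fmt.Println("timer armed for", d)
}
```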

View File

@@ -12,7 +12,7 @@ import (
type task struct {
queueId int64
fn func()
fn func() bool
cb func(err error)
}
@@ -60,17 +60,18 @@ func (w *worker) exec(t *task) {
cb(errors.New(errString))
}
w.endCallFun(t)
w.endCallFun(true,t)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
}
}()
t.fn()
w.endCallFun(t)
w.endCallFun(t.fn(),t)
}
func (w *worker) endCallFun(t *task) {
w.pushAsyncDoCallbackEvent(t.cb)
func (w *worker) endCallFun(isDocallBack bool,t *task) {
if isDocallBack {
w.pushAsyncDoCallbackEvent(t.cb)
}
if t.queueId != 0 {
w.pushQueueTaskFinishEvent(t.queueId)

View File

@@ -64,20 +64,24 @@ func (client *TCPClient) init() {
if client.cons != nil {
log.SFatal("client is running")
}
if client.LenMsgLen == 0 {
client.LenMsgLen = Default_LenMsgLen
}
if client.MinMsgLen == 0 {
client.MinMsgLen = Default_MinMsgLen
}
if client.MaxMsgLen == 0 {
client.MaxMsgLen = Default_MaxMsgLen
}
if client.LenMsgLen ==0 {
client.LenMsgLen = Default_LenMsgLen
}
maxMsgLen := client.MsgParser.getMaxMsgLen(client.LenMsgLen)
if client.MaxMsgLen > maxMsgLen {
client.MaxMsgLen = maxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", maxMsgLen)
}
client.cons = make(ConnSet)
client.closeFlag = false
// msg parser
client.MsgParser.init()
}

View File

@@ -1,11 +1,12 @@
package network
import (
"errors"
"github.com/duanhf2012/origin/log"
"net"
"sync"
"sync/atomic"
"time"
"errors"
)
type ConnSet map[net.Conn]struct{}
@@ -14,7 +15,7 @@ type TCPConn struct {
sync.Mutex
conn net.Conn
writeChan chan []byte
closeFlag bool
closeFlag int32
msgParser *MsgParser
}
@@ -49,7 +50,7 @@ func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser,writeDe
conn.Close()
tcpConn.Lock()
freeChannel(tcpConn)
tcpConn.closeFlag = true
atomic.StoreInt32(&tcpConn.closeFlag,1)
tcpConn.Unlock()
}()
@@ -60,9 +61,9 @@ func (tcpConn *TCPConn) doDestroy() {
tcpConn.conn.(*net.TCPConn).SetLinger(0)
tcpConn.conn.Close()
if !tcpConn.closeFlag {
if atomic.LoadInt32(&tcpConn.closeFlag)==0 {
close(tcpConn.writeChan)
tcpConn.closeFlag = true
atomic.StoreInt32(&tcpConn.closeFlag,1)
}
}
@@ -76,12 +77,12 @@ func (tcpConn *TCPConn) Destroy() {
func (tcpConn *TCPConn) Close() {
tcpConn.Lock()
defer tcpConn.Unlock()
if tcpConn.closeFlag {
if atomic.LoadInt32(&tcpConn.closeFlag)==1 {
return
}
tcpConn.doWrite(nil)
tcpConn.closeFlag = true
atomic.StoreInt32(&tcpConn.closeFlag,1)
}
func (tcpConn *TCPConn) GetRemoteIp() string {
@@ -104,7 +105,7 @@ func (tcpConn *TCPConn) doWrite(b []byte) error{
func (tcpConn *TCPConn) Write(b []byte) error{
tcpConn.Lock()
defer tcpConn.Unlock()
if tcpConn.closeFlag || b == nil {
if atomic.LoadInt32(&tcpConn.closeFlag)==1 || b == nil {
tcpConn.ReleaseReadMsg(b)
return errors.New("conn is close")
}
@@ -133,14 +134,14 @@ func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){
}
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
if tcpConn.closeFlag == true {
if atomic.LoadInt32(&tcpConn.closeFlag) == 1 {
return errors.New("conn is close")
}
return tcpConn.msgParser.Write(tcpConn, args...)
}
func (tcpConn *TCPConn) WriteRawMsg(args []byte) error {
if tcpConn.closeFlag == true {
if atomic.LoadInt32(&tcpConn.closeFlag) == 1 {
return errors.New("conn is close")
}
@@ -149,7 +150,7 @@ func (tcpConn *TCPConn) WriteRawMsg(args []byte) error {
func (tcpConn *TCPConn) IsConnected() bool {
return tcpConn.closeFlag == false
return atomic.LoadInt32(&tcpConn.closeFlag) == 0
}
func (tcpConn *TCPConn) SetReadDeadline(d time.Duration) {
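closeFlag changes from a bool into an int32 manipulated with sync/atomic, so checks such as IsConnected can read it safely without holding the connection mutex. A minimal sketch of the same flag pattern, detached from TCPConn:
```
package main

import (
	"fmt"
	"sync/atomic"
)

type conn struct {
	closeFlag int32 //0 = open, 1 = closed; accessed only through sync/atomic
}

func (c *conn) Close() {
	atomic.StoreInt32(&c.closeFlag, 1)
}

func (c *conn) IsConnected() bool {
	return atomic.LoadInt32(&c.closeFlag) == 0
}

func main() {
	c := &conn{}
	fmt.Println(c.IsConnected()) //true
	c.Close()
	fmt.Println(c.IsConnected()) //false
}
```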

View File

@@ -20,30 +20,22 @@ type MsgParser struct {
}
func (p *MsgParser) init(){
var max uint32
func (p *MsgParser) getMaxMsgLen(lenMsgLen int) uint32 {
switch p.LenMsgLen {
case 1:
max = math.MaxUint8
return math.MaxUint8
case 2:
max = math.MaxUint16
return math.MaxUint16
case 4:
max = math.MaxUint32
return math.MaxUint32
default:
panic("LenMsgLen value must be 1 or 2 or 4")
}
if p.MinMsgLen > max {
p.MinMsgLen = max
}
if p.MaxMsgLen > max {
p.MaxMsgLen = max
}
p.INetMempool = NewMemAreaPool()
}
func (p *MsgParser) init(){
p.INetMempool = NewMemAreaPool()
}
// goroutine safe
func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) {

View File

@@ -7,14 +7,16 @@ import (
"time"
)
const Default_ReadDeadline = time.Second*30 //30s
const Default_WriteDeadline = time.Second*30 //30s
const Default_MaxConnNum = 9000
const Default_PendingWriteNum = 10000
const Default_LittleEndian = false
const Default_MinMsgLen = 2
const Default_MaxMsgLen = 65535
const Default_LenMsgLen = 2
const(
Default_ReadDeadline = time.Second*30 //default read deadline: 30s
Default_WriteDeadline = time.Second*30 //default write deadline: 30s
Default_MaxConnNum = 1000000 //default maximum number of connections
Default_PendingWriteNum = 100000 //write-message channel capacity per connection
Default_LittleEndian = false //default endianness
Default_MinMsgLen = 2 //minimum message length: 2 bytes
Default_LenMsgLen = 2 //length field in the packet header occupies 2 bytes
Default_MaxMsgLen = 65535 //maximum message length
)
type TCPServer struct {
Addr string
@@ -29,8 +31,7 @@ type TCPServer struct {
mutexConns sync.Mutex
wgLn sync.WaitGroup
wgConns sync.WaitGroup
// msg parser
MsgParser
}
@@ -49,14 +50,15 @@ func (server *TCPServer) init() {
server.MaxConnNum = Default_MaxConnNum
log.SRelease("invalid MaxConnNum, reset to ", server.MaxConnNum)
}
if server.PendingWriteNum <= 0 {
server.PendingWriteNum = Default_PendingWriteNum
log.SRelease("invalid PendingWriteNum, reset to ", server.PendingWriteNum)
}
if server.MinMsgLen <= 0 {
server.MinMsgLen = Default_MinMsgLen
log.SRelease("invalid MinMsgLen, reset to ", server.MinMsgLen)
if server.LenMsgLen <= 0 {
server.LenMsgLen = Default_LenMsgLen
log.SRelease("invalid LenMsgLen, reset to ", server.LenMsgLen)
}
if server.MaxMsgLen <= 0 {
@@ -64,6 +66,17 @@ func (server *TCPServer) init() {
log.SRelease("invalid MaxMsgLen, reset to ", server.MaxMsgLen)
}
maxMsgLen := server.MsgParser.getMaxMsgLen(server.LenMsgLen)
if server.MaxMsgLen > maxMsgLen {
server.MaxMsgLen = maxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", maxMsgLen)
}
if server.MinMsgLen <= 0 {
server.MinMsgLen = Default_MinMsgLen
log.SRelease("invalid MinMsgLen, reset to ", server.MinMsgLen)
}
if server.WriteDeadline == 0 {
server.WriteDeadline = Default_WriteDeadline
log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s")
@@ -74,18 +87,12 @@ func (server *TCPServer) init() {
log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s")
}
if server.LenMsgLen == 0 {
server.LenMsgLen = Default_LenMsgLen
}
if server.NewAgent == nil {
log.SFatal("NewAgent must not be nil")
}
server.ln = ln
server.conns = make(ConnSet)
server.INetMempool = NewMemAreaPool()
server.MsgParser.init()
}

View File

@@ -295,9 +295,9 @@ func GetService(serviceName string) service.IService {
return service.GetService(serviceName)
}
func SetConfigDir(configDir string) {
configDir = configDir
cluster.SetConfigDir(configDir)
func SetConfigDir(cfgDir string) {
configDir = cfgDir
cluster.SetConfigDir(cfgDir)
}
func GetConfigDir() string {

View File

@@ -193,9 +193,11 @@ func Report() {
record = prof.record
prof.record = list.New()
callNum := prof.callNum
totalCostTime := prof.totalCostTime
prof.stackLocker.RUnlock()
DefaultReportFunction(name,prof.callNum,prof.totalCostTime,record)
DefaultReportFunction(name,callNum,totalCostTime,record)
}
}

View File

@@ -11,14 +11,19 @@ import (
"time"
)
const MaxCheckCallRpcCount = 1000
const MaxPendingWriteNum = 200000
const ConnectInterval = 2*time.Second
const RpcConnNum = 1
const RpcLenMsgLen = 4
const RpcMinMsgLen = 2
const CheckRpcCallTimeoutInterval = 5*time.Second
const DefaultRpcTimeout = 15*time.Second
const(
DefaultRpcConnNum = 1
DefaultRpcLenMsgLen = 4
DefaultRpcMinMsgLen = 2
DefaultMaxCheckCallRpcCount = 1000
DefaultMaxPendingWriteNum = 200000
DefaultConnectInterval = 2*time.Second
DefaultCheckRpcCallTimeoutInterval = 5*time.Second
DefaultRpcTimeout = 15*time.Second
)
var clientSeq uint32
type IRealClient interface {
@@ -64,7 +69,7 @@ func (bc *Client) makeCallFail(call *Call) {
func (bc *Client) checkRpcCallTimeout() {
for{
time.Sleep(CheckRpcCallTimeoutInterval)
time.Sleep(DefaultCheckRpcCallTimeoutInterval)
now := time.Now()
for i := 0; i < bc.maxCheckCallRpcCount; i++ {

View File

@@ -44,7 +44,8 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!")
log.SError(sErr.Error())
call := MakeCall()
call.Err = sErr
call.DoError(sErr)
return call
}
@@ -53,12 +54,13 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
//invoke this node's own rpcHandler processor
err := pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args, requestHandlerNull,reply)
call := MakeCall()
if err != nil {
call.Err = err
call.DoError(err)
return call
}
call.done<-call
call.DoOK()
return call
}
@@ -119,7 +121,7 @@ func NewLClient(nodeId int) *Client{
client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId
client.maxCheckCallRpcCount = MaxCheckCallRpcCount
client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout
lClient := &LClient{}

View File

@@ -44,8 +44,9 @@ func (rc *RClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
_, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args)
if err != nil {
log.SError(err.Error())
call := MakeCall()
call.Err = err
call.DoError(err)
return call
}
@@ -65,14 +66,17 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
if err != nil {
call.Seq = 0
call.Err = err
log.SError(err.Error())
call.DoError(err)
return call
}
conn := rc.GetConn()
if conn == nil || conn.IsConnected()==false {
call.Seq = 0
call.Err = errors.New(serviceMethod + " was called failed,rpc client is disconnect")
sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect")
log.SError(sErr.Error())
call.DoError(sErr)
return call
}
@@ -83,8 +87,11 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes)
if err != nil {
rc.selfClient.RemovePending(call.Seq)
log.SError(err.Error())
call.Seq = 0
call.Err = err
call.DoError(err)
}
return call
@@ -211,19 +218,19 @@ func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEve
client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId
client.maxCheckCallRpcCount = MaxCheckCallRpcCount
client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout
c:= &RClient{}
c.selfClient = client
c.Addr = addr
c.ConnectInterval = ConnectInterval
c.PendingWriteNum = MaxPendingWriteNum
c.ConnectInterval = DefaultConnectInterval
c.PendingWriteNum = DefaultMaxPendingWriteNum
c.AutoReconnect = true
c.TriggerRpcConnEvent = triggerRpcConnEvent
c.ConnNum = RpcConnNum
c.LenMsgLen = RpcLenMsgLen
c.MinMsgLen = RpcMinMsgLen
c.ConnNum = DefaultRpcConnNum
c.LenMsgLen = DefaultRpcLenMsgLen
c.MinMsgLen = DefaultRpcMinMsgLen
c.ReadDeadline = Default_ReadWriteDeadline
c.WriteDeadline = Default_ReadWriteDeadline
c.LittleEndian = LittleEndian

View File

@@ -102,6 +102,15 @@ func (rpcResponse *RpcResponse) Clear() *RpcResponse{
return rpcResponse
}
func (call *Call) DoError(err error){
call.Err = err
call.done <- call
}
func (call *Call) DoOK(){
call.done <- call
}
func (call *Call) Clear() *Call{
call.Seq = 0
call.ServiceMethod = ""
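DoError and DoOK centralize the completion path: they set the error (if any) and push the call onto its done channel, which callers drain through Done(). A sketch mirroring the shape of that pattern (the buffered channel size here is an assumption, not taken from the diff):
```
package main

import (
	"errors"
	"fmt"
)

//Call mirrors the relevant fields of the rpc Call object: an error slot
//plus a completion channel that DoError/DoOK push the call onto.
type Call struct {
	Err  error
	done chan *Call
}

func (c *Call) DoError(err error) {
	c.Err = err
	c.done <- c
}

func (c *Call) DoOK() {
	c.done <- c
}

//Done blocks until the call has completed and returns it.
func (c *Call) Done() *Call {
	return <-c.done
}

func main() {
	call := &Call{done: make(chan *Call, 1)}
	go call.DoError(errors.New("rpc client is disconnected"))
	fmt.Println(call.Done().Err)
}
```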

View File

@@ -291,14 +291,16 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
request.requestHandle(nil, RpcError(rErr))
return
}
requestHanle := request.requestHandle
returnValues := v.method.Func.Call(paramList)
errInter := returnValues[0].Interface()
if errInter != nil {
err = errInter.(error)
}
if request.requestHandle != nil && v.hasResponder == false {
request.requestHandle(oParam.Interface(), ConvertError(err))
if v.hasResponder == false && requestHanle != nil {
requestHanle(oParam.Interface(), ConvertError(err))
}
}
@@ -459,11 +461,6 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf
pClient := pClientList[0]
pCall := pClient.Go(handler.rpcHandler,false, serviceMethod, args, reply)
if pCall.Err != nil {
err = pCall.Err
ReleaseCall(pCall)
return err
}
err = pCall.Done().Err
pClient.RemovePending(pCall.Seq)

View File

@@ -71,7 +71,6 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
}
server.rpcServer.Addr = ":" + splitAddr[1]
server.rpcServer.LenMsgLen = 4 //uint16
server.rpcServer.MinMsgLen = 2
if maxRpcParamLen > 0 {
server.rpcServer.MaxMsgLen = maxRpcParamLen
@@ -85,6 +84,8 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
server.rpcServer.LittleEndian = LittleEndian
server.rpcServer.WriteDeadline = Default_ReadWriteDeadline
server.rpcServer.ReadDeadline = Default_ReadWriteDeadline
server.rpcServer.LenMsgLen = DefaultRpcLenMsgLen
server.rpcServer.Start()
}
@@ -255,10 +256,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error())
pCall.Seq = 0
pCall.Err = errors.New("service method " + serviceMethod + " not config!")
pCall.done <- pCall
log.SError(pCall.Err.Error())
pCall.DoError(err)
return pCall
}
@@ -272,10 +273,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
var err error
iParam,err = processor.Clone(args)
if err != nil {
sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(sErr.Error())
pCall.Seq = 0
pCall.Err = errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
pCall.done <- pCall
log.SError(pCall.Err.Error())
pCall.DoError(sErr)
return pCall
}
@@ -288,9 +289,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil {
log.SError(err.Error())
pCall.Seq = 0
pCall.DoError(err)
ReleaseRpcRequest(req)
pCall.Err = err
pCall.done <- pCall
return pCall
}
}
@@ -320,20 +322,22 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
return
}
if len(Err) == 0 {
v.Err = nil
v.DoOK()
} else {
v.Err = Err
log.SError(Err.Error())
v.DoError(Err)
}
v.done <- v
}
}
err := rpcHandler.PushRpcRequest(req)
if err != nil {
log.SError(err.Error())
pCall.DoError(err)
ReleaseRpcRequest(req)
pCall.Err = err
pCall.done <- pCall
}
return pCall

View File

@@ -20,6 +20,7 @@ var timerDispatcherLen = 100000
var maxServiceEventChannelNum = 2000000
type IService interface {
concurrent.IConcurrent
Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{})
Stop()
Start()

View File

@@ -6,9 +6,9 @@ import (
"github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysmodule/mongodbmodule"
"github.com/duanhf2012/origin/util/coroutine"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"runtime"
"sync"
"sync/atomic"
"time"
@@ -71,7 +71,9 @@ func (mp *MongoPersist) OnInit() error {
}
//start the goroutine
coroutine.GoRecover(mp.persistCoroutine,-1)
mp.waitGroup.Add(1)
go mp.persistCoroutine()
return nil
}
@@ -260,7 +262,6 @@ func (mp *MongoPersist) JugeTimeoutSave() bool{
}
func (mp *MongoPersist) persistCoroutine(){
mp.waitGroup.Add(1)
defer mp.waitGroup.Done()
for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){
//sleep for the interval
@@ -291,6 +292,15 @@ func (mp *MongoPersist) hasPersistData() bool{
}
func (mp *MongoPersist) saveToDB(){
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError(" Core dump info[", errString, "]\n", string(buf[:l]))
}
}()
//1. copy the data
mp.Lock()
mapRemoveRankData := mp.mapRemoveRankData

View File

@@ -356,12 +356,12 @@ func (rs *RankSkip) GetRankNodeDataByRank(rank uint64) (*RankData, uint64) {
// GetRankKeyPrevToLimit gets the count entries ranked before key
func (rs *RankSkip) GetRankKeyPrevToLimit(findKey, count uint64, result *rpc.RankDataList) error {
if rs.GetRankLen() <= 0 {
return fmt.Errorf("rank[", rs.rankId, "] no data")
return fmt.Errorf("rank[%d] no data", rs.rankId)
}
findData, ok := rs.mapRankData[findKey]
if ok == false {
return fmt.Errorf("rank[", rs.rankId, "] no data")
return fmt.Errorf("rank[%d] no data", rs.rankId)
}
_, rankPos := rs.skipList.GetWithPosition(findData)
@@ -385,12 +385,12 @@ func (rs *RankSkip) GetRankKeyPrevToLimit(findKey, count uint64, result *rpc.Ran
// GetRankKeyNextToLimit gets the count entries ranked after key
func (rs *RankSkip) GetRankKeyNextToLimit(findKey, count uint64, result *rpc.RankDataList) error {
if rs.GetRankLen() <= 0 {
return fmt.Errorf("rank[", rs.rankId, "] no data")
return fmt.Errorf("rank[%d] no data", rs.rankId)
}
findData, ok := rs.mapRankData[findKey]
if ok == false {
return fmt.Errorf("rank[", rs.rankId, "] no data")
return fmt.Errorf("rank[%d] no data", rs.rankId)
}
_, rankPos := rs.skipList.GetWithPosition(findData)

View File

@@ -90,6 +90,10 @@ func (tcpService *TcpService) OnInit() error{
if ok == true {
tcpService.tcpServer.LittleEndian = LittleEndian.(bool)
}
LenMsgLen,ok := tcpCfg["LenMsgLen"]
if ok == true {
tcpService.tcpServer.LenMsgLen = int(LenMsgLen.(float64))
}
MinMsgLen,ok := tcpCfg["MinMsgLen"]
if ok == true {
tcpService.tcpServer.MinMsgLen = uint32(MinMsgLen.(float64))