Compare commits

...

20 Commits

Author SHA1 Message Date
duanhf2012
6de25d1c6d 优化rankservice错误返回 2023-05-09 14:34:33 +08:00
duanhf2012
b392617d6e 优化性能监控与rankservice持久化 2023-05-09 14:06:17 +08:00
duanhf2012
92fdb7860c 优化本地node中的服务rpc 2023-05-04 17:53:42 +08:00
duanhf2012
f78d0d58be 优化rpc与rankservice持久化 2023-05-04 17:35:40 +08:00
duanhf2012
5675681ab1 优化concurrent与rpc模块 2023-05-04 14:21:29 +08:00
duanhf2012
ddeaaf7d77 优化concurrent模块 2023-04-11 10:29:06 +08:00
duanhf2012
1174b47475 IService接口新增扩展IConcurrent 2023-04-04 16:36:05 +08:00
duanhf2012
18fff3b567 优化concurrent模块,新增返回值控制是否回调 2023-03-31 15:12:27 +08:00
duanhf2012
7ab6c88f9c 整理优化rpc 2023-03-23 10:06:41 +08:00
duanhf2012
6b64de06a2 优化增加TcpService的包长度字段配置 2023-03-22 14:59:22 +08:00
duanhf2012
95b153f8cf 优化network包长度字段自动计算 2023-03-20 15:20:04 +08:00
duanhf2012
f3ff09b90f 优化rpc调用错误日志
限制配置的服务必需安装
优化结点断开连接时删除结点
2023-03-17 12:09:00 +08:00
duanhf2012
f9738fb9d0 Merge branch 'master' of https://github.com/duanhf2012/origin 2023-03-06 15:57:33 +08:00
duanhf2012
91e773aa8c 补充说明RPC函数名的规范支持RPCFunctionName 2023-03-06 15:57:23 +08:00
origin
c9b96404f4 Merge pull request #873 from duanhf2012/dependabot/go_modules/golang.org/x/crypto-0.1.0
Bump golang.org/x/crypto from 0.0.0-20201216223049-8b5274cf687f to 0.1.0
2023-03-06 15:45:40 +08:00
duanhf2012
aaae63a674 新增支持RPC函数命名RPCXXX格式 2023-03-06 15:41:51 +08:00
duanhf2012
47dc21aee1 优化rpc返回参数与请求参数不一致时报错 2023-03-06 11:47:23 +08:00
dependabot[bot]
4d09532801 Bump golang.org/x/crypto from 0.0.0-20201216223049-8b5274cf687f to 0.1.0
Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.0.0-20201216223049-8b5274cf687f to 0.1.0.
- [Release notes](https://github.com/golang/crypto/releases)
- [Commits](https://github.com/golang/crypto/commits/v0.1.0)

---
updated-dependencies:
- dependency-name: golang.org/x/crypto
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-03-06 02:26:33 +00:00
origin
d3ad7fc898 Merge pull request #871 from duanhf2012/dependabot/go_modules/golang.org/x/text-0.3.8
Bump golang.org/x/text from 0.3.6 to 0.3.8
2023-03-06 10:08:56 +08:00
dependabot[bot]
ba2b0568b2 Bump golang.org/x/text from 0.3.6 to 0.3.8
Bumps [golang.org/x/text](https://github.com/golang/text) from 0.3.6 to 0.3.8.
- [Release notes](https://github.com/golang/text/releases)
- [Commits](https://github.com/golang/text/compare/v0.3.6...v0.3.8)

---
updated-dependencies:
- dependency-name: golang.org/x/text
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-02-23 07:30:17 +00:00
24 changed files with 256 additions and 159 deletions

View File

@@ -667,6 +667,7 @@ type InputData struct {
B int B int
} }
// 注意RPC函数名的格式必需为RPC_FunctionName或者是RPCFunctionName如下的RPC_Sum也可以写成RPCSum
func (slf *TestService6) RPC_Sum(input *InputData,output *int) error{ func (slf *TestService6) RPC_Sum(input *InputData,output *int) error{
*output = input.A+input.B *output = input.A+input.B
return nil return nil
@@ -766,34 +767,57 @@ func (slf *TestService7) GoTest(){
//slf.OpenConcurrent(5, 10, 1000000) //slf.OpenConcurrent(5, 10, 1000000)
``` ```
普通调用可以使用以下方法: 使用示例如下:
``` ```
func (slf *TestService1) testAsyncDo() {
func (slf *TestService13) testAsyncDo() {
var context struct { var context struct {
data int64 data int64
} }
slf.AsyncDo(func() {
//1.示例普通使用
//参数一的函数在其他协程池中执行完成,将执行完成事件放入服务工作协程,
//参数二的函数在服务协程中执行,是协程安全的。
slf.AsyncDo(func() bool {
//该函数回调在协程池中执行 //该函数回调在协程池中执行
context.data = 100 context.data = 100
return true
}, func(err error) { }, func(err error) {
//函数将在服务协程中执行 //函数将在服务协程中执行
fmt.Print(context.data) //显示100 fmt.Print(context.data) //显示100
}) })
}
```
以下方法将函数扔到任务管道中,由协程池去抢执行。但某些任务是由先后顺序的,可以使用以下方法:
```
func (slf *TestService1) testAsyncDoByQueue() {
queueId := int64(1)
//2.示例按队列顺序
//参数一传入队列Id,同一个队列Id将在协程池中被排队执行
//以下进行两次调用因为两次都传入参数queueId都为1所以它们会都进入queueId为1的排队执行 //以下进行两次调用因为两次都传入参数queueId都为1所以它们会都进入queueId为1的排队执行
queueId := int64(1)
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
slf.AsyncDoByQueue(queueId, func() { slf.AsyncDoByQueue(queueId, func() bool {
//该函数会被2次调用但是会排队执行 //该函数会被2次调用但是会排队执行
return true
}, func(err error) { }, func(err error) {
//函数将在服务协程中执行 //函数将在服务协程中执行
}) })
} }
//3.函数参数可以某中一个为空
//参数二函数将被延迟执行
slf.AsyncDo(nil, func(err error) {
//将在下
})
//参数一函数在协程池中执行,但没有在服务协程中回调
slf.AsyncDo(func() bool {
return true
}, nil)
//4.函数返回值控制不进行回调
slf.AsyncDo(func() bool {
//返回false时参数二函数将不会被执行; 为true时则会被执行
return false
}, func(err error) {
//该函数将不会被执行
})
} }
``` ```

View File

@@ -60,6 +60,21 @@ func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo) {
ds.nodeInfo = append(ds.nodeInfo, nodeInfo) ds.nodeInfo = append(ds.nodeInfo, nodeInfo)
} }
func (ds *DynamicDiscoveryMaster) removeNodeInfo(nodeId int32) {
if _,ok:= ds.mapNodeInfo[nodeId];ok == false {
return
}
for i:=0;i<len(ds.nodeInfo);i++ {
if ds.nodeInfo[i].NodeId == nodeId {
ds.nodeInfo = append(ds.nodeInfo[:i],ds.nodeInfo[i+1:]...)
break
}
}
delete(ds.mapNodeInfo,nodeId)
}
func (ds *DynamicDiscoveryMaster) OnInit() error { func (ds *DynamicDiscoveryMaster) OnInit() error {
ds.mapNodeInfo = make(map[int32]struct{}, 20) ds.mapNodeInfo = make(map[int32]struct{}, 20)
ds.RegRpcListener(ds) ds.RegRpcListener(ds)
@@ -103,6 +118,8 @@ func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int) {
return return
} }
ds.removeNodeInfo(int32(nodeId))
var notifyDiscover rpc.SubscribeDiscoverNotify var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId) notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId)
notifyDiscover.DelNodeId = int32(nodeId) notifyDiscover.DelNodeId = int32(nodeId)

View File

@@ -12,8 +12,8 @@ const defaultMaxTaskChannelNum = 1000000
type IConcurrent interface { type IConcurrent interface {
OpenConcurrentByNumCPU(cpuMul float32) OpenConcurrentByNumCPU(cpuMul float32)
OpenConcurrent(minGoroutineNum int32, maxGoroutineNum int32, maxTaskChannelNum int) OpenConcurrent(minGoroutineNum int32, maxGoroutineNum int32, maxTaskChannelNum int)
AsyncDoByQueue(queueId int64, fn func(), cb func(err error)) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err error))
AsyncDo(f func(), cb func(err error)) AsyncDo(f func() bool, cb func(err error))
} }
type Concurrent struct { type Concurrent struct {
@@ -40,11 +40,11 @@ func (c *Concurrent) OpenConcurrent(minGoroutineNum int32, maxGoroutineNum int32
c.dispatch.open(minGoroutineNum, maxGoroutineNum, c.tasks, c.cbChannel) c.dispatch.open(minGoroutineNum, maxGoroutineNum, c.tasks, c.cbChannel)
} }
func (c *Concurrent) AsyncDo(f func(), cb func(err error)) { func (c *Concurrent) AsyncDo(f func() bool, cb func(err error)) {
c.AsyncDoByQueue(0, f, cb) c.AsyncDoByQueue(0, f, cb)
} }
func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func(), cb func(err error)) { func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err error)) {
if cap(c.tasks) == 0 { if cap(c.tasks) == 0 {
panic("not open concurrent") panic("not open concurrent")
} }
@@ -54,16 +54,6 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func(), cb func(err error)
return return
} }
if len(c.tasks) > cap(c.tasks) {
log.SError("tasks channel is full")
if cb != nil {
c.pushAsyncDoCallbackEvent(func(err error) {
cb(errors.New("tasks channel is full"))
})
}
return
}
if fn == nil { if fn == nil {
c.pushAsyncDoCallbackEvent(cb) c.pushAsyncDoCallbackEvent(cb)
return return
@@ -75,6 +65,14 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func(), cb func(err error)
select { select {
case c.tasks <- task{queueId, fn, cb}: case c.tasks <- task{queueId, fn, cb}:
default:
log.SError("tasks channel is full")
if cb != nil {
c.pushAsyncDoCallbackEvent(func(err error) {
cb(errors.New("tasks channel is full"))
})
}
return
} }
} }

View File

@@ -12,7 +12,7 @@ import (
"github.com/duanhf2012/origin/util/queue" "github.com/duanhf2012/origin/util/queue"
) )
var idleTimeout = 2 * time.Second var idleTimeout = int64(2 * time.Second)
const maxTaskQueueSessionId = 10000 const maxTaskQueueSessionId = 10000
type dispatch struct { type dispatch struct {
@@ -47,7 +47,7 @@ func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan
func (d *dispatch) run() { func (d *dispatch) run() {
defer d.waitDispatch.Done() defer d.waitDispatch.Done()
timeout := time.NewTimer(idleTimeout) timeout := time.NewTimer(time.Duration(atomic.LoadInt64(&idleTimeout)))
for { for {
select { select {
@@ -65,9 +65,9 @@ func (d *dispatch) run() {
case <-timeout.C: case <-timeout.C:
d.processTimer() d.processTimer()
if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 { if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 {
idleTimeout = time.Millisecond * 10 atomic.StoreInt64(&idleTimeout,int64(time.Millisecond * 10))
} }
timeout.Reset(idleTimeout) timeout.Reset(time.Duration(atomic.LoadInt64(&idleTimeout)))
} }
} }
@@ -80,7 +80,7 @@ func (d *dispatch) run() {
} }
func (d *dispatch) processTimer() { func (d *dispatch) processTimer() {
if d.idle == true && d.workerNum > d.minConcurrentNum { if d.idle == true && d.workerNum > atomic.LoadInt32(&d.minConcurrentNum) {
d.processIdle() d.processIdle()
} }

View File

@@ -12,7 +12,7 @@ import (
type task struct { type task struct {
queueId int64 queueId int64
fn func() fn func() bool
cb func(err error) cb func(err error)
} }
@@ -60,17 +60,18 @@ func (w *worker) exec(t *task) {
cb(errors.New(errString)) cb(errors.New(errString))
} }
w.endCallFun(t) w.endCallFun(true,t)
log.SError("core dump info[", errString, "]\n", string(buf[:l])) log.SError("core dump info[", errString, "]\n", string(buf[:l]))
} }
}() }()
t.fn() w.endCallFun(t.fn(),t)
w.endCallFun(t)
} }
func (w *worker) endCallFun(t *task) { func (w *worker) endCallFun(isDocallBack bool,t *task) {
w.pushAsyncDoCallbackEvent(t.cb) if isDocallBack {
w.pushAsyncDoCallbackEvent(t.cb)
}
if t.queueId != 0 { if t.queueId != 0 {
w.pushQueueTaskFinishEvent(t.queueId) w.pushQueueTaskFinishEvent(t.queueId)

4
go.mod
View File

@@ -23,8 +23,8 @@ require (
github.com/xdg-go/scram v1.0.2 // indirect github.com/xdg-go/scram v1.0.2 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f // indirect golang.org/x/crypto v0.1.0 // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 // indirect golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 // indirect
golang.org/x/text v0.3.6 // indirect golang.org/x/text v0.4.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
) )

7
go.sum
View File

@@ -58,8 +58,9 @@ go.mongodb.org/mongo-driver v1.9.1/go.mod h1:0sQWfOeY63QTntERDJJ/0SuKK0T1uVSgKCu
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f h1:aZp0e2vLN4MToVqnjNEYEtrEA8RH8U8FN1CU7JgqsPU=
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -79,8 +80,8 @@ golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXR
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=

View File

@@ -64,20 +64,24 @@ func (client *TCPClient) init() {
if client.cons != nil { if client.cons != nil {
log.SFatal("client is running") log.SFatal("client is running")
} }
if client.LenMsgLen == 0 {
client.LenMsgLen = Default_LenMsgLen
}
if client.MinMsgLen == 0 { if client.MinMsgLen == 0 {
client.MinMsgLen = Default_MinMsgLen client.MinMsgLen = Default_MinMsgLen
} }
if client.MaxMsgLen == 0 { if client.MaxMsgLen == 0 {
client.MaxMsgLen = Default_MaxMsgLen client.MaxMsgLen = Default_MaxMsgLen
} }
if client.LenMsgLen ==0 {
client.LenMsgLen = Default_LenMsgLen
}
maxMsgLen := client.MsgParser.getMaxMsgLen(client.LenMsgLen)
if client.MaxMsgLen > maxMsgLen {
client.MaxMsgLen = maxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", maxMsgLen)
}
client.cons = make(ConnSet) client.cons = make(ConnSet)
client.closeFlag = false client.closeFlag = false
// msg parser
client.MsgParser.init() client.MsgParser.init()
} }

View File

@@ -1,11 +1,12 @@
package network package network
import ( import (
"errors"
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"net" "net"
"sync" "sync"
"sync/atomic"
"time" "time"
"errors"
) )
type ConnSet map[net.Conn]struct{} type ConnSet map[net.Conn]struct{}
@@ -14,7 +15,7 @@ type TCPConn struct {
sync.Mutex sync.Mutex
conn net.Conn conn net.Conn
writeChan chan []byte writeChan chan []byte
closeFlag bool closeFlag int32
msgParser *MsgParser msgParser *MsgParser
} }
@@ -49,7 +50,7 @@ func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser,writeDe
conn.Close() conn.Close()
tcpConn.Lock() tcpConn.Lock()
freeChannel(tcpConn) freeChannel(tcpConn)
tcpConn.closeFlag = true atomic.StoreInt32(&tcpConn.closeFlag,1)
tcpConn.Unlock() tcpConn.Unlock()
}() }()
@@ -60,9 +61,9 @@ func (tcpConn *TCPConn) doDestroy() {
tcpConn.conn.(*net.TCPConn).SetLinger(0) tcpConn.conn.(*net.TCPConn).SetLinger(0)
tcpConn.conn.Close() tcpConn.conn.Close()
if !tcpConn.closeFlag { if atomic.LoadInt32(&tcpConn.closeFlag)==0 {
close(tcpConn.writeChan) close(tcpConn.writeChan)
tcpConn.closeFlag = true atomic.StoreInt32(&tcpConn.closeFlag,1)
} }
} }
@@ -76,12 +77,12 @@ func (tcpConn *TCPConn) Destroy() {
func (tcpConn *TCPConn) Close() { func (tcpConn *TCPConn) Close() {
tcpConn.Lock() tcpConn.Lock()
defer tcpConn.Unlock() defer tcpConn.Unlock()
if tcpConn.closeFlag { if atomic.LoadInt32(&tcpConn.closeFlag)==1 {
return return
} }
tcpConn.doWrite(nil) tcpConn.doWrite(nil)
tcpConn.closeFlag = true atomic.StoreInt32(&tcpConn.closeFlag,1)
} }
func (tcpConn *TCPConn) GetRemoteIp() string { func (tcpConn *TCPConn) GetRemoteIp() string {
@@ -104,7 +105,7 @@ func (tcpConn *TCPConn) doWrite(b []byte) error{
func (tcpConn *TCPConn) Write(b []byte) error{ func (tcpConn *TCPConn) Write(b []byte) error{
tcpConn.Lock() tcpConn.Lock()
defer tcpConn.Unlock() defer tcpConn.Unlock()
if tcpConn.closeFlag || b == nil { if atomic.LoadInt32(&tcpConn.closeFlag)==1 || b == nil {
tcpConn.ReleaseReadMsg(b) tcpConn.ReleaseReadMsg(b)
return errors.New("conn is close") return errors.New("conn is close")
} }
@@ -133,14 +134,14 @@ func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){
} }
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error { func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
if tcpConn.closeFlag == true { if atomic.LoadInt32(&tcpConn.closeFlag) == 1 {
return errors.New("conn is close") return errors.New("conn is close")
} }
return tcpConn.msgParser.Write(tcpConn, args...) return tcpConn.msgParser.Write(tcpConn, args...)
} }
func (tcpConn *TCPConn) WriteRawMsg(args []byte) error { func (tcpConn *TCPConn) WriteRawMsg(args []byte) error {
if tcpConn.closeFlag == true { if atomic.LoadInt32(&tcpConn.closeFlag) == 1 {
return errors.New("conn is close") return errors.New("conn is close")
} }
@@ -149,7 +150,7 @@ func (tcpConn *TCPConn) WriteRawMsg(args []byte) error {
func (tcpConn *TCPConn) IsConnected() bool { func (tcpConn *TCPConn) IsConnected() bool {
return tcpConn.closeFlag == false return atomic.LoadInt32(&tcpConn.closeFlag) == 0
} }
func (tcpConn *TCPConn) SetReadDeadline(d time.Duration) { func (tcpConn *TCPConn) SetReadDeadline(d time.Duration) {

View File

@@ -20,30 +20,22 @@ type MsgParser struct {
} }
func (p *MsgParser) init(){ func (p *MsgParser) getMaxMsgLen(lenMsgLen int) uint32 {
var max uint32
switch p.LenMsgLen { switch p.LenMsgLen {
case 1: case 1:
max = math.MaxUint8 return math.MaxUint8
case 2: case 2:
max = math.MaxUint16 return math.MaxUint16
case 4: case 4:
max = math.MaxUint32 return math.MaxUint32
default: default:
panic("LenMsgLen value must be 1 or 2 or 4") panic("LenMsgLen value must be 1 or 2 or 4")
} }
if p.MinMsgLen > max {
p.MinMsgLen = max
}
if p.MaxMsgLen > max {
p.MaxMsgLen = max
}
p.INetMempool = NewMemAreaPool()
} }
func (p *MsgParser) init(){
p.INetMempool = NewMemAreaPool()
}
// goroutine safe // goroutine safe
func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) { func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) {

View File

@@ -7,14 +7,16 @@ import (
"time" "time"
) )
const Default_ReadDeadline = time.Second*30 //30s const(
const Default_WriteDeadline = time.Second*30 //30s Default_ReadDeadline = time.Second*30 //默认读超时30s
const Default_MaxConnNum = 9000 Default_WriteDeadline = time.Second*30 //默认写超时30s
const Default_PendingWriteNum = 10000 Default_MaxConnNum = 1000000 //默认最大连接数
const Default_LittleEndian = false Default_PendingWriteNum = 100000 //单连接写消息Channel容量
const Default_MinMsgLen = 2 Default_LittleEndian = false //默认大小端
const Default_MaxMsgLen = 65535 Default_MinMsgLen = 2 //最小消息长度2byte
const Default_LenMsgLen = 2 Default_LenMsgLen = 2 //包头字段长度占用2byte
Default_MaxMsgLen = 65535 //最大消息长度
)
type TCPServer struct { type TCPServer struct {
Addr string Addr string
@@ -29,8 +31,7 @@ type TCPServer struct {
mutexConns sync.Mutex mutexConns sync.Mutex
wgLn sync.WaitGroup wgLn sync.WaitGroup
wgConns sync.WaitGroup wgConns sync.WaitGroup
// msg parser
MsgParser MsgParser
} }
@@ -49,14 +50,15 @@ func (server *TCPServer) init() {
server.MaxConnNum = Default_MaxConnNum server.MaxConnNum = Default_MaxConnNum
log.SRelease("invalid MaxConnNum, reset to ", server.MaxConnNum) log.SRelease("invalid MaxConnNum, reset to ", server.MaxConnNum)
} }
if server.PendingWriteNum <= 0 { if server.PendingWriteNum <= 0 {
server.PendingWriteNum = Default_PendingWriteNum server.PendingWriteNum = Default_PendingWriteNum
log.SRelease("invalid PendingWriteNum, reset to ", server.PendingWriteNum) log.SRelease("invalid PendingWriteNum, reset to ", server.PendingWriteNum)
} }
if server.MinMsgLen <= 0 { if server.LenMsgLen <= 0 {
server.MinMsgLen = Default_MinMsgLen server.LenMsgLen = Default_LenMsgLen
log.SRelease("invalid MinMsgLen, reset to ", server.MinMsgLen) log.SRelease("invalid LenMsgLen, reset to ", server.LenMsgLen)
} }
if server.MaxMsgLen <= 0 { if server.MaxMsgLen <= 0 {
@@ -64,6 +66,17 @@ func (server *TCPServer) init() {
log.SRelease("invalid MaxMsgLen, reset to ", server.MaxMsgLen) log.SRelease("invalid MaxMsgLen, reset to ", server.MaxMsgLen)
} }
maxMsgLen := server.MsgParser.getMaxMsgLen(server.LenMsgLen)
if server.MaxMsgLen > maxMsgLen {
server.MaxMsgLen = maxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", maxMsgLen)
}
if server.MinMsgLen <= 0 {
server.MinMsgLen = Default_MinMsgLen
log.SRelease("invalid MinMsgLen, reset to ", server.MinMsgLen)
}
if server.WriteDeadline == 0 { if server.WriteDeadline == 0 {
server.WriteDeadline = Default_WriteDeadline server.WriteDeadline = Default_WriteDeadline
log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s") log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s")
@@ -74,18 +87,12 @@ func (server *TCPServer) init() {
log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s") log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s")
} }
if server.LenMsgLen == 0 {
server.LenMsgLen = Default_LenMsgLen
}
if server.NewAgent == nil { if server.NewAgent == nil {
log.SFatal("NewAgent must not be nil") log.SFatal("NewAgent must not be nil")
} }
server.ln = ln server.ln = ln
server.conns = make(ConnSet) server.conns = make(ConnSet)
server.INetMempool = NewMemAreaPool()
server.MsgParser.init() server.MsgParser.init()
} }

View File

@@ -155,16 +155,21 @@ func initNode(id int) {
//2.顺序安装服务 //2.顺序安装服务
serviceOrder := cluster.GetCluster().GetLocalNodeInfo().ServiceList serviceOrder := cluster.GetCluster().GetLocalNodeInfo().ServiceList
for _,serviceName:= range serviceOrder{ for _,serviceName:= range serviceOrder{
bSetup := false
for _, s := range preSetupService { for _, s := range preSetupService {
if s.GetName() != serviceName { if s.GetName() != serviceName {
continue continue
} }
bSetup = true
pServiceCfg := cluster.GetCluster().GetServiceCfg(s.GetName()) pServiceCfg := cluster.GetCluster().GetServiceCfg(s.GetName())
s.Init(s, cluster.GetRpcClient, cluster.GetRpcServer, pServiceCfg) s.Init(s, cluster.GetRpcClient, cluster.GetRpcServer, pServiceCfg)
service.Setup(s) service.Setup(s)
} }
if bSetup == false {
log.SFatal("Service name "+serviceName+" configuration error")
}
} }
//3.service初始化 //3.service初始化
@@ -290,9 +295,9 @@ func GetService(serviceName string) service.IService {
return service.GetService(serviceName) return service.GetService(serviceName)
} }
func SetConfigDir(configDir string) { func SetConfigDir(cfgDir string) {
configDir = configDir configDir = cfgDir
cluster.SetConfigDir(configDir) cluster.SetConfigDir(cfgDir)
} }
func GetConfigDir() string { func GetConfigDir() string {

View File

@@ -193,9 +193,11 @@ func Report() {
record = prof.record record = prof.record
prof.record = list.New() prof.record = list.New()
callNum := prof.callNum
totalCostTime := prof.totalCostTime
prof.stackLocker.RUnlock() prof.stackLocker.RUnlock()
DefaultReportFunction(name,prof.callNum,prof.totalCostTime,record) DefaultReportFunction(name,callNum,totalCostTime,record)
} }
} }

View File

@@ -11,14 +11,19 @@ import (
"time" "time"
) )
const MaxCheckCallRpcCount = 1000 const(
const MaxPendingWriteNum = 200000 DefaultRpcConnNum = 1
const ConnectInterval = 2*time.Second DefaultRpcLenMsgLen = 4
const RpcConnNum = 1 DefaultRpcMinMsgLen = 2
const RpcLenMsgLen = 4 DefaultMaxCheckCallRpcCount = 1000
const RpcMinMsgLen = 2 DefaultMaxPendingWriteNum = 200000
const CheckRpcCallTimeoutInterval = 5*time.Second
const DefaultRpcTimeout = 15*time.Second
DefaultConnectInterval = 2*time.Second
DefaultCheckRpcCallTimeoutInterval = 5*time.Second
DefaultRpcTimeout = 15*time.Second
)
var clientSeq uint32 var clientSeq uint32
type IRealClient interface { type IRealClient interface {
@@ -64,7 +69,7 @@ func (bc *Client) makeCallFail(call *Call) {
func (bc *Client) checkRpcCallTimeout() { func (bc *Client) checkRpcCallTimeout() {
for{ for{
time.Sleep(CheckRpcCallTimeoutInterval) time.Sleep(DefaultCheckRpcCallTimeoutInterval)
now := time.Now() now := time.Now()
for i := 0; i < bc.maxCheckCallRpcCount; i++ { for i := 0; i < bc.maxCheckCallRpcCount; i++ {

View File

@@ -41,7 +41,10 @@ func (slf *GoGoPBProcessor) Marshal(v interface{}) ([]byte, error){
} }
func (slf *GoGoPBProcessor) Unmarshal(data []byte, msg interface{}) error{ func (slf *GoGoPBProcessor) Unmarshal(data []byte, msg interface{}) error{
protoMsg := msg.(proto.Message) protoMsg,ok := msg.(proto.Message)
if ok == false {
return fmt.Errorf("%+v is not of proto.Message type",msg)
}
return proto.Unmarshal(data, protoMsg) return proto.Unmarshal(data, protoMsg)
} }

View File

@@ -44,7 +44,8 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!") sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!")
log.SError(sErr.Error()) log.SError(sErr.Error())
call := MakeCall() call := MakeCall()
call.Err = sErr call.DoError(sErr)
return call return call
} }
@@ -53,12 +54,13 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
//调用自己rpcHandler处理器 //调用自己rpcHandler处理器
err := pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args, requestHandlerNull,reply) err := pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args, requestHandlerNull,reply)
call := MakeCall() call := MakeCall()
if err != nil { if err != nil {
call.Err = err call.DoError(err)
return call return call
} }
call.done<-call call.DoOK()
return call return call
} }
@@ -119,7 +121,7 @@ func NewLClient(nodeId int) *Client{
client := &Client{} client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1) client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId client.nodeId = nodeId
client.maxCheckCallRpcCount = MaxCheckCallRpcCount client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout client.callRpcTimeout = DefaultRpcTimeout
lClient := &LClient{} lClient := &LClient{}

View File

@@ -44,8 +44,9 @@ func (rc *RClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
_, processor := GetProcessorType(args) _, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args) InParam, err := processor.Marshal(args)
if err != nil { if err != nil {
log.SError(err.Error())
call := MakeCall() call := MakeCall()
call.Err = err call.DoError(err)
return call return call
} }
@@ -65,14 +66,17 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
if err != nil { if err != nil {
call.Seq = 0 call.Seq = 0
call.Err = err log.SError(err.Error())
call.DoError(err)
return call return call
} }
conn := rc.GetConn() conn := rc.GetConn()
if conn == nil || conn.IsConnected()==false { if conn == nil || conn.IsConnected()==false {
call.Seq = 0 call.Seq = 0
call.Err = errors.New(serviceMethod + " was called failed,rpc client is disconnect") sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect")
log.SError(sErr.Error())
call.DoError(sErr)
return call return call
} }
@@ -83,8 +87,11 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes) err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes)
if err != nil { if err != nil {
rc.selfClient.RemovePending(call.Seq) rc.selfClient.RemovePending(call.Seq)
log.SError(err.Error())
call.Seq = 0 call.Seq = 0
call.Err = err call.DoError(err)
} }
return call return call
@@ -211,19 +218,19 @@ func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEve
client := &Client{} client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1) client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId client.nodeId = nodeId
client.maxCheckCallRpcCount = MaxCheckCallRpcCount client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout client.callRpcTimeout = DefaultRpcTimeout
c:= &RClient{} c:= &RClient{}
c.selfClient = client c.selfClient = client
c.Addr = addr c.Addr = addr
c.ConnectInterval = ConnectInterval c.ConnectInterval = DefaultConnectInterval
c.PendingWriteNum = MaxPendingWriteNum c.PendingWriteNum = DefaultMaxPendingWriteNum
c.AutoReconnect = true c.AutoReconnect = true
c.TriggerRpcConnEvent = triggerRpcConnEvent c.TriggerRpcConnEvent = triggerRpcConnEvent
c.ConnNum = RpcConnNum c.ConnNum = DefaultRpcConnNum
c.LenMsgLen = RpcLenMsgLen c.LenMsgLen = DefaultRpcLenMsgLen
c.MinMsgLen = RpcMinMsgLen c.MinMsgLen = DefaultRpcMinMsgLen
c.ReadDeadline = Default_ReadWriteDeadline c.ReadDeadline = Default_ReadWriteDeadline
c.WriteDeadline = Default_ReadWriteDeadline c.WriteDeadline = Default_ReadWriteDeadline
c.LittleEndian = LittleEndian c.LittleEndian = LittleEndian

View File

@@ -102,6 +102,15 @@ func (rpcResponse *RpcResponse) Clear() *RpcResponse{
return rpcResponse return rpcResponse
} }
func (call *Call) DoError(err error){
call.Err = err
call.done <- call
}
func (call *Call) DoOK(){
call.done <- call
}
func (call *Call) Clear() *Call{ func (call *Call) Clear() *Call{
call.Seq = 0 call.Seq = 0
call.ServiceMethod = "" call.ServiceMethod = ""

View File

@@ -138,7 +138,7 @@ func (handler *RpcHandler) isExportedOrBuiltinType(t reflect.Type) bool {
func (handler *RpcHandler) suitableMethods(method reflect.Method) error { func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
//只有RPC_开头的才能被调用 //只有RPC_开头的才能被调用
if strings.Index(method.Name, "RPC_") != 0 { if strings.Index(method.Name, "RPC_") != 0 && strings.Index(method.Name, "RPC") != 0 {
return nil return nil
} }
@@ -291,14 +291,16 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
request.requestHandle(nil, RpcError(rErr)) request.requestHandle(nil, RpcError(rErr))
return return
} }
requestHanle := request.requestHandle
returnValues := v.method.Func.Call(paramList) returnValues := v.method.Func.Call(paramList)
errInter := returnValues[0].Interface() errInter := returnValues[0].Interface()
if errInter != nil { if errInter != nil {
err = errInter.(error) err = errInter.(error)
} }
if request.requestHandle != nil && v.hasResponder == false { if v.hasResponder == false && requestHanle != nil {
request.requestHandle(oParam.Interface(), ConvertError(err)) requestHanle(oParam.Interface(), ConvertError(err))
} }
} }
@@ -459,11 +461,6 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf
pClient := pClientList[0] pClient := pClientList[0]
pCall := pClient.Go(handler.rpcHandler,false, serviceMethod, args, reply) pCall := pClient.Go(handler.rpcHandler,false, serviceMethod, args, reply)
if pCall.Err != nil {
err = pCall.Err
ReleaseCall(pCall)
return err
}
err = pCall.Done().Err err = pCall.Done().Err
pClient.RemovePending(pCall.Seq) pClient.RemovePending(pCall.Seq)
@@ -496,7 +493,11 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:]) err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
if count == 0 || err != nil { if count == 0 || err != nil {
if err == nil { if err == nil {
err = fmt.Errorf("cannot find %s from nodeId %d",serviceMethod,nodeId) if nodeId > 0 {
err = fmt.Errorf("cannot find %s from nodeId %d",serviceMethod,nodeId)
}else {
err = fmt.Errorf("No %s service found in the origin network",serviceMethod)
}
} }
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError("Call serviceMethod is error:", err.Error()) log.SError("Call serviceMethod is error:", err.Error())

View File

@@ -71,7 +71,6 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
} }
server.rpcServer.Addr = ":" + splitAddr[1] server.rpcServer.Addr = ":" + splitAddr[1]
server.rpcServer.LenMsgLen = 4 //uint16
server.rpcServer.MinMsgLen = 2 server.rpcServer.MinMsgLen = 2
if maxRpcParamLen > 0 { if maxRpcParamLen > 0 {
server.rpcServer.MaxMsgLen = maxRpcParamLen server.rpcServer.MaxMsgLen = maxRpcParamLen
@@ -85,6 +84,8 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
server.rpcServer.LittleEndian = LittleEndian server.rpcServer.LittleEndian = LittleEndian
server.rpcServer.WriteDeadline = Default_ReadWriteDeadline server.rpcServer.WriteDeadline = Default_ReadWriteDeadline
server.rpcServer.ReadDeadline = Default_ReadWriteDeadline server.rpcServer.ReadDeadline = Default_ReadWriteDeadline
server.rpcServer.LenMsgLen = DefaultRpcLenMsgLen
server.rpcServer.Start() server.rpcServer.Start()
} }
@@ -147,7 +148,6 @@ func (agent *RpcAgent) Run() {
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
continue continue
} else { } else {
//will close tcpconn
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
break break
} }
@@ -256,10 +256,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil { if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error())
pCall.Seq = 0 pCall.Seq = 0
pCall.Err = errors.New("service method " + serviceMethod + " not config!") pCall.DoError(err)
pCall.done <- pCall
log.SError(pCall.Err.Error())
return pCall return pCall
} }
@@ -273,10 +273,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
var err error var err error
iParam,err = processor.Clone(args) iParam,err = processor.Clone(args)
if err != nil { if err != nil {
sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(sErr.Error())
pCall.Seq = 0 pCall.Seq = 0
pCall.Err = errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) pCall.DoError(sErr)
pCall.done <- pCall
log.SError(pCall.Err.Error())
return pCall return pCall
} }
@@ -289,9 +289,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
var err error var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs) req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil { if err != nil {
log.SError(err.Error())
pCall.Seq = 0
pCall.DoError(err)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
pCall.Err = err
pCall.done <- pCall
return pCall return pCall
} }
} }
@@ -303,38 +304,40 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
if reply != nil && Returns != reply && Returns != nil { if reply != nil && Returns != reply && Returns != nil {
byteReturns, err := req.rpcProcessor.Marshal(Returns) byteReturns, err := req.rpcProcessor.Marshal(Returns)
if err != nil { if err != nil {
log.SError("returns data cannot be marshal ", callSeq) Err = ConvertError(err)
ReleaseRpcRequest(req) log.SError("returns data cannot be marshal,callSeq is ", callSeq," error is ",err.Error())
} }else{
err = req.rpcProcessor.Unmarshal(byteReturns, reply)
err = req.rpcProcessor.Unmarshal(byteReturns, reply) if err != nil {
if err != nil { Err = ConvertError(err)
log.SError("returns data cannot be Unmarshal ", callSeq) log.SError("returns data cannot be Unmarshal,callSeq is ", callSeq," error is ",err.Error())
ReleaseRpcRequest(req) }
} }
} }
ReleaseRpcRequest(req)
v := client.RemovePending(callSeq) v := client.RemovePending(callSeq)
if v == nil { if v == nil {
log.SError("rpcClient cannot find seq ",callSeq, " in pending") log.SError("rpcClient cannot find seq ",callSeq, " in pending")
ReleaseRpcRequest(req)
return return
} }
if len(Err) == 0 { if len(Err) == 0 {
v.Err = nil v.Err = nil
v.DoOK()
} else { } else {
v.Err = Err log.SError(Err.Error())
v.DoError(Err)
} }
v.done <- v
ReleaseRpcRequest(req)
} }
} }
err := rpcHandler.PushRpcRequest(req) err := rpcHandler.PushRpcRequest(req)
if err != nil { if err != nil {
log.SError(err.Error())
pCall.DoError(err)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
pCall.Err = err
pCall.done <- pCall
} }
return pCall return pCall

View File

@@ -20,6 +20,7 @@ var timerDispatcherLen = 100000
var maxServiceEventChannelNum = 2000000 var maxServiceEventChannelNum = 2000000
type IService interface { type IService interface {
concurrent.IConcurrent
Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{})
Stop() Stop()
Start() Start()

View File

@@ -6,9 +6,9 @@ import (
"github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysmodule/mongodbmodule" "github.com/duanhf2012/origin/sysmodule/mongodbmodule"
"github.com/duanhf2012/origin/util/coroutine"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
"runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -71,7 +71,9 @@ func (mp *MongoPersist) OnInit() error {
} }
//开启协程 //开启协程
coroutine.GoRecover(mp.persistCoroutine,-1) mp.waitGroup.Add(1)
go mp.persistCoroutine()
return nil return nil
} }
@@ -260,7 +262,6 @@ func (mp *MongoPersist) JugeTimeoutSave() bool{
} }
func (mp *MongoPersist) persistCoroutine(){ func (mp *MongoPersist) persistCoroutine(){
mp.waitGroup.Add(1)
defer mp.waitGroup.Done() defer mp.waitGroup.Done()
for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){ for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){
//间隔时间sleep //间隔时间sleep
@@ -291,6 +292,15 @@ func (mp *MongoPersist) hasPersistData() bool{
} }
func (mp *MongoPersist) saveToDB(){ func (mp *MongoPersist) saveToDB(){
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError(" Core dump info[", errString, "]\n", string(buf[:l]))
}
}()
//1.copy数据 //1.copy数据
mp.Lock() mp.Lock()
mapRemoveRankData := mp.mapRemoveRankData mapRemoveRankData := mp.mapRemoveRankData

View File

@@ -356,12 +356,12 @@ func (rs *RankSkip) GetRankNodeDataByRank(rank uint64) (*RankData, uint64) {
// GetRankKeyPrevToLimit 获取key前count名的数据 // GetRankKeyPrevToLimit 获取key前count名的数据
func (rs *RankSkip) GetRankKeyPrevToLimit(findKey, count uint64, result *rpc.RankDataList) error { func (rs *RankSkip) GetRankKeyPrevToLimit(findKey, count uint64, result *rpc.RankDataList) error {
if rs.GetRankLen() <= 0 { if rs.GetRankLen() <= 0 {
return fmt.Errorf("rank[", rs.rankId, "] no data") return fmt.Errorf("rank[%d] no data", rs.rankId)
} }
findData, ok := rs.mapRankData[findKey] findData, ok := rs.mapRankData[findKey]
if ok == false { if ok == false {
return fmt.Errorf("rank[", rs.rankId, "] no data") return fmt.Errorf("rank[%d] no data", rs.rankId)
} }
_, rankPos := rs.skipList.GetWithPosition(findData) _, rankPos := rs.skipList.GetWithPosition(findData)
@@ -385,12 +385,12 @@ func (rs *RankSkip) GetRankKeyPrevToLimit(findKey, count uint64, result *rpc.Ran
// GetRankKeyPrevToLimit 获取key前count名的数据 // GetRankKeyPrevToLimit 获取key前count名的数据
func (rs *RankSkip) GetRankKeyNextToLimit(findKey, count uint64, result *rpc.RankDataList) error { func (rs *RankSkip) GetRankKeyNextToLimit(findKey, count uint64, result *rpc.RankDataList) error {
if rs.GetRankLen() <= 0 { if rs.GetRankLen() <= 0 {
return fmt.Errorf("rank[", rs.rankId, "] no data") return fmt.Errorf("rank[%d] no data", rs.rankId)
} }
findData, ok := rs.mapRankData[findKey] findData, ok := rs.mapRankData[findKey]
if ok == false { if ok == false {
return fmt.Errorf("rank[", rs.rankId, "] no data") return fmt.Errorf("rank[%d] no data", rs.rankId)
} }
_, rankPos := rs.skipList.GetWithPosition(findData) _, rankPos := rs.skipList.GetWithPosition(findData)

View File

@@ -90,6 +90,10 @@ func (tcpService *TcpService) OnInit() error{
if ok == true { if ok == true {
tcpService.tcpServer.LittleEndian = LittleEndian.(bool) tcpService.tcpServer.LittleEndian = LittleEndian.(bool)
} }
LenMsgLen,ok := tcpCfg["LenMsgLen"]
if ok == true {
tcpService.tcpServer.LenMsgLen = int(LenMsgLen.(float64))
}
MinMsgLen,ok := tcpCfg["MinMsgLen"] MinMsgLen,ok := tcpCfg["MinMsgLen"]
if ok == true { if ok == true {
tcpService.tcpServer.MinMsgLen = uint32(MinMsgLen.(float64)) tcpService.tcpServer.MinMsgLen = uint32(MinMsgLen.(float64))