Compare commits

...

21 Commits

Author SHA1 Message Date
duanhf2012
a61979e985 优化消息队列服务持久化 2023-05-17 11:26:29 +08:00
duanhf2012
6de25d1c6d 优化rankservice错误返回 2023-05-09 14:34:33 +08:00
duanhf2012
b392617d6e 优化性能监控与rankservice持久化 2023-05-09 14:06:17 +08:00
duanhf2012
92fdb7860c 优化本地node中的服务rpc 2023-05-04 17:53:42 +08:00
duanhf2012
f78d0d58be 优化rpc与rankservice持久化 2023-05-04 17:35:40 +08:00
duanhf2012
5675681ab1 优化concurrent与rpc模块 2023-05-04 14:21:29 +08:00
duanhf2012
ddeaaf7d77 优化concurrent模块 2023-04-11 10:29:06 +08:00
duanhf2012
1174b47475 IService接口新增扩展IConcurrent 2023-04-04 16:36:05 +08:00
duanhf2012
18fff3b567 优化concurrent模块,新增返回值控制是否回调 2023-03-31 15:12:27 +08:00
duanhf2012
7ab6c88f9c 整理优化rpc 2023-03-23 10:06:41 +08:00
duanhf2012
6b64de06a2 优化增加TcpService的包长度字段配置 2023-03-22 14:59:22 +08:00
duanhf2012
95b153f8cf 优化network包长度字段自动计算 2023-03-20 15:20:04 +08:00
duanhf2012
f3ff09b90f 优化rpc调用错误日志
限制配置的服务必需安装
优化结点断开连接时删除结点
2023-03-17 12:09:00 +08:00
duanhf2012
f9738fb9d0 Merge branch 'master' of https://github.com/duanhf2012/origin 2023-03-06 15:57:33 +08:00
duanhf2012
91e773aa8c 补充说明RPC函数名的规范支持RPCFunctionName 2023-03-06 15:57:23 +08:00
origin
c9b96404f4 Merge pull request #873 from duanhf2012/dependabot/go_modules/golang.org/x/crypto-0.1.0
Bump golang.org/x/crypto from 0.0.0-20201216223049-8b5274cf687f to 0.1.0
2023-03-06 15:45:40 +08:00
duanhf2012
aaae63a674 新增支持RPC函数命名RPCXXX格式 2023-03-06 15:41:51 +08:00
duanhf2012
47dc21aee1 优化rpc返回参数与请求参数不一致时报错 2023-03-06 11:47:23 +08:00
dependabot[bot]
4d09532801 Bump golang.org/x/crypto from 0.0.0-20201216223049-8b5274cf687f to 0.1.0
Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.0.0-20201216223049-8b5274cf687f to 0.1.0.
- [Release notes](https://github.com/golang/crypto/releases)
- [Commits](https://github.com/golang/crypto/commits/v0.1.0)

---
updated-dependencies:
- dependency-name: golang.org/x/crypto
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-03-06 02:26:33 +00:00
origin
d3ad7fc898 Merge pull request #871 from duanhf2012/dependabot/go_modules/golang.org/x/text-0.3.8
Bump golang.org/x/text from 0.3.6 to 0.3.8
2023-03-06 10:08:56 +08:00
dependabot[bot]
ba2b0568b2 Bump golang.org/x/text from 0.3.6 to 0.3.8
Bumps [golang.org/x/text](https://github.com/golang/text) from 0.3.6 to 0.3.8.
- [Release notes](https://github.com/golang/text/releases)
- [Commits](https://github.com/golang/text/compare/v0.3.6...v0.3.8)

---
updated-dependencies:
- dependency-name: golang.org/x/text
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-02-23 07:30:17 +00:00
25 changed files with 289 additions and 161 deletions

View File

@@ -667,6 +667,7 @@ type InputData struct {
B int
}
// 注意RPC函数名的格式必需为RPC_FunctionName或者是RPCFunctionName如下的RPC_Sum也可以写成RPCSum
func (slf *TestService6) RPC_Sum(input *InputData,output *int) error{
*output = input.A+input.B
return nil
@@ -766,34 +767,57 @@ func (slf *TestService7) GoTest(){
//slf.OpenConcurrent(5, 10, 1000000)
```
普通调用可以使用以下方法:
使用示例如下:
```
func (slf *TestService1) testAsyncDo() {
func (slf *TestService13) testAsyncDo() {
var context struct {
data int64
}
slf.AsyncDo(func() {
//1.示例普通使用
//参数一的函数在其他协程池中执行完成,将执行完成事件放入服务工作协程,
//参数二的函数在服务协程中执行,是协程安全的。
slf.AsyncDo(func() bool {
//该函数回调在协程池中执行
context.data = 100
return true
}, func(err error) {
//函数将在服务协程中执行
fmt.Print(context.data) //显示100
})
}
```
以下方法将函数扔到任务管道中,由协程池去抢执行。但某些任务是由先后顺序的,可以使用以下方法:
```
func (slf *TestService1) testAsyncDoByQueue() {
queueId := int64(1)
//2.示例按队列顺序
//参数一传入队列Id,同一个队列Id将在协程池中被排队执行
//以下进行两次调用因为两次都传入参数queueId都为1所以它们会都进入queueId为1的排队执行
queueId := int64(1)
for i := 0; i < 2; i++ {
slf.AsyncDoByQueue(queueId, func() {
slf.AsyncDoByQueue(queueId, func() bool {
//该函数会被2次调用但是会排队执行
return true
}, func(err error) {
//函数将在服务协程中执行
})
}
//3.函数参数可以某中一个为空
//参数二函数将被延迟执行
slf.AsyncDo(nil, func(err error) {
//将在下
})
//参数一函数在协程池中执行,但没有在服务协程中回调
slf.AsyncDo(func() bool {
return true
}, nil)
//4.函数返回值控制不进行回调
slf.AsyncDo(func() bool {
//返回false时参数二函数将不会被执行; 为true时则会被执行
return false
}, func(err error) {
//该函数将不会被执行
})
}
```

View File

@@ -60,6 +60,21 @@ func (ds *DynamicDiscoveryMaster) addNodeInfo(nodeInfo *rpc.NodeInfo) {
ds.nodeInfo = append(ds.nodeInfo, nodeInfo)
}
func (ds *DynamicDiscoveryMaster) removeNodeInfo(nodeId int32) {
if _,ok:= ds.mapNodeInfo[nodeId];ok == false {
return
}
for i:=0;i<len(ds.nodeInfo);i++ {
if ds.nodeInfo[i].NodeId == nodeId {
ds.nodeInfo = append(ds.nodeInfo[:i],ds.nodeInfo[i+1:]...)
break
}
}
delete(ds.mapNodeInfo,nodeId)
}
func (ds *DynamicDiscoveryMaster) OnInit() error {
ds.mapNodeInfo = make(map[int32]struct{}, 20)
ds.RegRpcListener(ds)
@@ -103,6 +118,8 @@ func (ds *DynamicDiscoveryMaster) OnNodeDisconnect(nodeId int) {
return
}
ds.removeNodeInfo(int32(nodeId))
var notifyDiscover rpc.SubscribeDiscoverNotify
notifyDiscover.MasterNodeId = int32(cluster.GetLocalNodeInfo().NodeId)
notifyDiscover.DelNodeId = int32(nodeId)

View File

@@ -12,8 +12,8 @@ const defaultMaxTaskChannelNum = 1000000
type IConcurrent interface {
OpenConcurrentByNumCPU(cpuMul float32)
OpenConcurrent(minGoroutineNum int32, maxGoroutineNum int32, maxTaskChannelNum int)
AsyncDoByQueue(queueId int64, fn func(), cb func(err error))
AsyncDo(f func(), cb func(err error))
AsyncDoByQueue(queueId int64, fn func() bool, cb func(err error))
AsyncDo(f func() bool, cb func(err error))
}
type Concurrent struct {
@@ -40,11 +40,11 @@ func (c *Concurrent) OpenConcurrent(minGoroutineNum int32, maxGoroutineNum int32
c.dispatch.open(minGoroutineNum, maxGoroutineNum, c.tasks, c.cbChannel)
}
func (c *Concurrent) AsyncDo(f func(), cb func(err error)) {
func (c *Concurrent) AsyncDo(f func() bool, cb func(err error)) {
c.AsyncDoByQueue(0, f, cb)
}
func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func(), cb func(err error)) {
func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err error)) {
if cap(c.tasks) == 0 {
panic("not open concurrent")
}
@@ -54,16 +54,6 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func(), cb func(err error)
return
}
if len(c.tasks) > cap(c.tasks) {
log.SError("tasks channel is full")
if cb != nil {
c.pushAsyncDoCallbackEvent(func(err error) {
cb(errors.New("tasks channel is full"))
})
}
return
}
if fn == nil {
c.pushAsyncDoCallbackEvent(cb)
return
@@ -75,6 +65,14 @@ func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func(), cb func(err error)
select {
case c.tasks <- task{queueId, fn, cb}:
default:
log.SError("tasks channel is full")
if cb != nil {
c.pushAsyncDoCallbackEvent(func(err error) {
cb(errors.New("tasks channel is full"))
})
}
return
}
}

View File

@@ -12,7 +12,7 @@ import (
"github.com/duanhf2012/origin/util/queue"
)
var idleTimeout = 2 * time.Second
var idleTimeout = int64(2 * time.Second)
const maxTaskQueueSessionId = 10000
type dispatch struct {
@@ -47,7 +47,7 @@ func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan
func (d *dispatch) run() {
defer d.waitDispatch.Done()
timeout := time.NewTimer(idleTimeout)
timeout := time.NewTimer(time.Duration(atomic.LoadInt64(&idleTimeout)))
for {
select {
@@ -65,9 +65,9 @@ func (d *dispatch) run() {
case <-timeout.C:
d.processTimer()
if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 {
idleTimeout = time.Millisecond * 10
atomic.StoreInt64(&idleTimeout,int64(time.Millisecond * 10))
}
timeout.Reset(idleTimeout)
timeout.Reset(time.Duration(atomic.LoadInt64(&idleTimeout)))
}
}
@@ -80,7 +80,7 @@ func (d *dispatch) run() {
}
func (d *dispatch) processTimer() {
if d.idle == true && d.workerNum > d.minConcurrentNum {
if d.idle == true && d.workerNum > atomic.LoadInt32(&d.minConcurrentNum) {
d.processIdle()
}

View File

@@ -12,7 +12,7 @@ import (
type task struct {
queueId int64
fn func()
fn func() bool
cb func(err error)
}
@@ -60,17 +60,18 @@ func (w *worker) exec(t *task) {
cb(errors.New(errString))
}
w.endCallFun(t)
w.endCallFun(true,t)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
}
}()
t.fn()
w.endCallFun(t)
w.endCallFun(t.fn(),t)
}
func (w *worker) endCallFun(t *task) {
w.pushAsyncDoCallbackEvent(t.cb)
func (w *worker) endCallFun(isDocallBack bool,t *task) {
if isDocallBack {
w.pushAsyncDoCallbackEvent(t.cb)
}
if t.queueId != 0 {
w.pushQueueTaskFinishEvent(t.queueId)

4
go.mod
View File

@@ -23,8 +23,8 @@ require (
github.com/xdg-go/scram v1.0.2 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/text v0.4.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

7
go.sum
View File

@@ -58,8 +58,9 @@ go.mongodb.org/mongo-driver v1.9.1/go.mod h1:0sQWfOeY63QTntERDJJ/0SuKK0T1uVSgKCu
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f h1:aZp0e2vLN4MToVqnjNEYEtrEA8RH8U8FN1CU7JgqsPU=
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -79,8 +80,8 @@ golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXR
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=

View File

@@ -64,20 +64,24 @@ func (client *TCPClient) init() {
if client.cons != nil {
log.SFatal("client is running")
}
if client.LenMsgLen == 0 {
client.LenMsgLen = Default_LenMsgLen
}
if client.MinMsgLen == 0 {
client.MinMsgLen = Default_MinMsgLen
}
if client.MaxMsgLen == 0 {
client.MaxMsgLen = Default_MaxMsgLen
}
if client.LenMsgLen ==0 {
client.LenMsgLen = Default_LenMsgLen
}
maxMsgLen := client.MsgParser.getMaxMsgLen(client.LenMsgLen)
if client.MaxMsgLen > maxMsgLen {
client.MaxMsgLen = maxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", maxMsgLen)
}
client.cons = make(ConnSet)
client.closeFlag = false
// msg parser
client.MsgParser.init()
}

View File

@@ -1,11 +1,12 @@
package network
import (
"errors"
"github.com/duanhf2012/origin/log"
"net"
"sync"
"sync/atomic"
"time"
"errors"
)
type ConnSet map[net.Conn]struct{}
@@ -14,7 +15,7 @@ type TCPConn struct {
sync.Mutex
conn net.Conn
writeChan chan []byte
closeFlag bool
closeFlag int32
msgParser *MsgParser
}
@@ -49,7 +50,7 @@ func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser,writeDe
conn.Close()
tcpConn.Lock()
freeChannel(tcpConn)
tcpConn.closeFlag = true
atomic.StoreInt32(&tcpConn.closeFlag,1)
tcpConn.Unlock()
}()
@@ -60,9 +61,9 @@ func (tcpConn *TCPConn) doDestroy() {
tcpConn.conn.(*net.TCPConn).SetLinger(0)
tcpConn.conn.Close()
if !tcpConn.closeFlag {
if atomic.LoadInt32(&tcpConn.closeFlag)==0 {
close(tcpConn.writeChan)
tcpConn.closeFlag = true
atomic.StoreInt32(&tcpConn.closeFlag,1)
}
}
@@ -76,12 +77,12 @@ func (tcpConn *TCPConn) Destroy() {
func (tcpConn *TCPConn) Close() {
tcpConn.Lock()
defer tcpConn.Unlock()
if tcpConn.closeFlag {
if atomic.LoadInt32(&tcpConn.closeFlag)==1 {
return
}
tcpConn.doWrite(nil)
tcpConn.closeFlag = true
atomic.StoreInt32(&tcpConn.closeFlag,1)
}
func (tcpConn *TCPConn) GetRemoteIp() string {
@@ -104,7 +105,7 @@ func (tcpConn *TCPConn) doWrite(b []byte) error{
func (tcpConn *TCPConn) Write(b []byte) error{
tcpConn.Lock()
defer tcpConn.Unlock()
if tcpConn.closeFlag || b == nil {
if atomic.LoadInt32(&tcpConn.closeFlag)==1 || b == nil {
tcpConn.ReleaseReadMsg(b)
return errors.New("conn is close")
}
@@ -133,14 +134,14 @@ func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){
}
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
if tcpConn.closeFlag == true {
if atomic.LoadInt32(&tcpConn.closeFlag) == 1 {
return errors.New("conn is close")
}
return tcpConn.msgParser.Write(tcpConn, args...)
}
func (tcpConn *TCPConn) WriteRawMsg(args []byte) error {
if tcpConn.closeFlag == true {
if atomic.LoadInt32(&tcpConn.closeFlag) == 1 {
return errors.New("conn is close")
}
@@ -149,7 +150,7 @@ func (tcpConn *TCPConn) WriteRawMsg(args []byte) error {
func (tcpConn *TCPConn) IsConnected() bool {
return tcpConn.closeFlag == false
return atomic.LoadInt32(&tcpConn.closeFlag) == 0
}
func (tcpConn *TCPConn) SetReadDeadline(d time.Duration) {

View File

@@ -20,30 +20,22 @@ type MsgParser struct {
}
func (p *MsgParser) init(){
var max uint32
func (p *MsgParser) getMaxMsgLen(lenMsgLen int) uint32 {
switch p.LenMsgLen {
case 1:
max = math.MaxUint8
return math.MaxUint8
case 2:
max = math.MaxUint16
return math.MaxUint16
case 4:
max = math.MaxUint32
return math.MaxUint32
default:
panic("LenMsgLen value must be 1 or 2 or 4")
}
if p.MinMsgLen > max {
p.MinMsgLen = max
}
if p.MaxMsgLen > max {
p.MaxMsgLen = max
}
p.INetMempool = NewMemAreaPool()
}
func (p *MsgParser) init(){
p.INetMempool = NewMemAreaPool()
}
// goroutine safe
func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) {

View File

@@ -7,14 +7,16 @@ import (
"time"
)
const Default_ReadDeadline = time.Second*30 //30s
const Default_WriteDeadline = time.Second*30 //30s
const Default_MaxConnNum = 9000
const Default_PendingWriteNum = 10000
const Default_LittleEndian = false
const Default_MinMsgLen = 2
const Default_MaxMsgLen = 65535
const Default_LenMsgLen = 2
const(
Default_ReadDeadline = time.Second*30 //默认读超时30s
Default_WriteDeadline = time.Second*30 //默认写超时30s
Default_MaxConnNum = 1000000 //默认最大连接数
Default_PendingWriteNum = 100000 //单连接写消息Channel容量
Default_LittleEndian = false //默认大小端
Default_MinMsgLen = 2 //最小消息长度2byte
Default_LenMsgLen = 2 //包头字段长度占用2byte
Default_MaxMsgLen = 65535 //最大消息长度
)
type TCPServer struct {
Addr string
@@ -29,8 +31,7 @@ type TCPServer struct {
mutexConns sync.Mutex
wgLn sync.WaitGroup
wgConns sync.WaitGroup
// msg parser
MsgParser
}
@@ -49,14 +50,15 @@ func (server *TCPServer) init() {
server.MaxConnNum = Default_MaxConnNum
log.SRelease("invalid MaxConnNum, reset to ", server.MaxConnNum)
}
if server.PendingWriteNum <= 0 {
server.PendingWriteNum = Default_PendingWriteNum
log.SRelease("invalid PendingWriteNum, reset to ", server.PendingWriteNum)
}
if server.MinMsgLen <= 0 {
server.MinMsgLen = Default_MinMsgLen
log.SRelease("invalid MinMsgLen, reset to ", server.MinMsgLen)
if server.LenMsgLen <= 0 {
server.LenMsgLen = Default_LenMsgLen
log.SRelease("invalid LenMsgLen, reset to ", server.LenMsgLen)
}
if server.MaxMsgLen <= 0 {
@@ -64,6 +66,17 @@ func (server *TCPServer) init() {
log.SRelease("invalid MaxMsgLen, reset to ", server.MaxMsgLen)
}
maxMsgLen := server.MsgParser.getMaxMsgLen(server.LenMsgLen)
if server.MaxMsgLen > maxMsgLen {
server.MaxMsgLen = maxMsgLen
log.SRelease("invalid MaxMsgLen, reset to ", maxMsgLen)
}
if server.MinMsgLen <= 0 {
server.MinMsgLen = Default_MinMsgLen
log.SRelease("invalid MinMsgLen, reset to ", server.MinMsgLen)
}
if server.WriteDeadline == 0 {
server.WriteDeadline = Default_WriteDeadline
log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s")
@@ -74,18 +87,12 @@ func (server *TCPServer) init() {
log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s")
}
if server.LenMsgLen == 0 {
server.LenMsgLen = Default_LenMsgLen
}
if server.NewAgent == nil {
log.SFatal("NewAgent must not be nil")
}
server.ln = ln
server.conns = make(ConnSet)
server.INetMempool = NewMemAreaPool()
server.MsgParser.init()
}

View File

@@ -155,16 +155,21 @@ func initNode(id int) {
//2.顺序安装服务
serviceOrder := cluster.GetCluster().GetLocalNodeInfo().ServiceList
for _,serviceName:= range serviceOrder{
bSetup := false
for _, s := range preSetupService {
if s.GetName() != serviceName {
continue
}
bSetup = true
pServiceCfg := cluster.GetCluster().GetServiceCfg(s.GetName())
s.Init(s, cluster.GetRpcClient, cluster.GetRpcServer, pServiceCfg)
service.Setup(s)
}
if bSetup == false {
log.SFatal("Service name "+serviceName+" configuration error")
}
}
//3.service初始化
@@ -290,9 +295,9 @@ func GetService(serviceName string) service.IService {
return service.GetService(serviceName)
}
func SetConfigDir(configDir string) {
configDir = configDir
cluster.SetConfigDir(configDir)
func SetConfigDir(cfgDir string) {
configDir = cfgDir
cluster.SetConfigDir(cfgDir)
}
func GetConfigDir() string {

View File

@@ -193,9 +193,11 @@ func Report() {
record = prof.record
prof.record = list.New()
callNum := prof.callNum
totalCostTime := prof.totalCostTime
prof.stackLocker.RUnlock()
DefaultReportFunction(name,prof.callNum,prof.totalCostTime,record)
DefaultReportFunction(name,callNum,totalCostTime,record)
}
}

View File

@@ -11,14 +11,19 @@ import (
"time"
)
const MaxCheckCallRpcCount = 1000
const MaxPendingWriteNum = 200000
const ConnectInterval = 2*time.Second
const RpcConnNum = 1
const RpcLenMsgLen = 4
const RpcMinMsgLen = 2
const CheckRpcCallTimeoutInterval = 5*time.Second
const DefaultRpcTimeout = 15*time.Second
const(
DefaultRpcConnNum = 1
DefaultRpcLenMsgLen = 4
DefaultRpcMinMsgLen = 2
DefaultMaxCheckCallRpcCount = 1000
DefaultMaxPendingWriteNum = 200000
DefaultConnectInterval = 2*time.Second
DefaultCheckRpcCallTimeoutInterval = 5*time.Second
DefaultRpcTimeout = 15*time.Second
)
var clientSeq uint32
type IRealClient interface {
@@ -64,7 +69,7 @@ func (bc *Client) makeCallFail(call *Call) {
func (bc *Client) checkRpcCallTimeout() {
for{
time.Sleep(CheckRpcCallTimeoutInterval)
time.Sleep(DefaultCheckRpcCallTimeoutInterval)
now := time.Now()
for i := 0; i < bc.maxCheckCallRpcCount; i++ {

View File

@@ -41,7 +41,10 @@ func (slf *GoGoPBProcessor) Marshal(v interface{}) ([]byte, error){
}
func (slf *GoGoPBProcessor) Unmarshal(data []byte, msg interface{}) error{
protoMsg := msg.(proto.Message)
protoMsg,ok := msg.(proto.Message)
if ok == false {
return fmt.Errorf("%+v is not of proto.Message type",msg)
}
return proto.Unmarshal(data, protoMsg)
}

View File

@@ -44,7 +44,8 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
sErr := errors.New("Call serviceMethod " + serviceMethod + " is error!")
log.SError(sErr.Error())
call := MakeCall()
call.Err = sErr
call.DoError(sErr)
return call
}
@@ -53,12 +54,13 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
//调用自己rpcHandler处理器
err := pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args, requestHandlerNull,reply)
call := MakeCall()
if err != nil {
call.Err = err
call.DoError(err)
return call
}
call.done<-call
call.DoOK()
return call
}
@@ -119,7 +121,7 @@ func NewLClient(nodeId int) *Client{
client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId
client.maxCheckCallRpcCount = MaxCheckCallRpcCount
client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout
lClient := &LClient{}

View File

@@ -44,8 +44,9 @@ func (rc *RClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
_, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args)
if err != nil {
log.SError(err.Error())
call := MakeCall()
call.Err = err
call.DoError(err)
return call
}
@@ -65,14 +66,17 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
if err != nil {
call.Seq = 0
call.Err = err
log.SError(err.Error())
call.DoError(err)
return call
}
conn := rc.GetConn()
if conn == nil || conn.IsConnected()==false {
call.Seq = 0
call.Err = errors.New(serviceMethod + " was called failed,rpc client is disconnect")
sErr := errors.New(serviceMethod + " was called failed,rpc client is disconnect")
log.SError(sErr.Error())
call.DoError(sErr)
return call
}
@@ -83,8 +87,11 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes)
if err != nil {
rc.selfClient.RemovePending(call.Seq)
log.SError(err.Error())
call.Seq = 0
call.Err = err
call.DoError(err)
}
return call
@@ -211,19 +218,19 @@ func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEve
client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId
client.maxCheckCallRpcCount = MaxCheckCallRpcCount
client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout
c:= &RClient{}
c.selfClient = client
c.Addr = addr
c.ConnectInterval = ConnectInterval
c.PendingWriteNum = MaxPendingWriteNum
c.ConnectInterval = DefaultConnectInterval
c.PendingWriteNum = DefaultMaxPendingWriteNum
c.AutoReconnect = true
c.TriggerRpcConnEvent = triggerRpcConnEvent
c.ConnNum = RpcConnNum
c.LenMsgLen = RpcLenMsgLen
c.MinMsgLen = RpcMinMsgLen
c.ConnNum = DefaultRpcConnNum
c.LenMsgLen = DefaultRpcLenMsgLen
c.MinMsgLen = DefaultRpcMinMsgLen
c.ReadDeadline = Default_ReadWriteDeadline
c.WriteDeadline = Default_ReadWriteDeadline
c.LittleEndian = LittleEndian

View File

@@ -102,6 +102,15 @@ func (rpcResponse *RpcResponse) Clear() *RpcResponse{
return rpcResponse
}
func (call *Call) DoError(err error){
call.Err = err
call.done <- call
}
func (call *Call) DoOK(){
call.done <- call
}
func (call *Call) Clear() *Call{
call.Seq = 0
call.ServiceMethod = ""

View File

@@ -138,7 +138,7 @@ func (handler *RpcHandler) isExportedOrBuiltinType(t reflect.Type) bool {
func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
//只有RPC_开头的才能被调用
if strings.Index(method.Name, "RPC_") != 0 {
if strings.Index(method.Name, "RPC_") != 0 && strings.Index(method.Name, "RPC") != 0 {
return nil
}
@@ -291,14 +291,16 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
request.requestHandle(nil, RpcError(rErr))
return
}
requestHanle := request.requestHandle
returnValues := v.method.Func.Call(paramList)
errInter := returnValues[0].Interface()
if errInter != nil {
err = errInter.(error)
}
if request.requestHandle != nil && v.hasResponder == false {
request.requestHandle(oParam.Interface(), ConvertError(err))
if v.hasResponder == false && requestHanle != nil {
requestHanle(oParam.Interface(), ConvertError(err))
}
}
@@ -459,11 +461,6 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf
pClient := pClientList[0]
pCall := pClient.Go(handler.rpcHandler,false, serviceMethod, args, reply)
if pCall.Err != nil {
err = pCall.Err
ReleaseCall(pCall)
return err
}
err = pCall.Done().Err
pClient.RemovePending(pCall.Seq)
@@ -496,7 +493,11 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
if count == 0 || err != nil {
if err == nil {
err = fmt.Errorf("cannot find %s from nodeId %d",serviceMethod,nodeId)
if nodeId > 0 {
err = fmt.Errorf("cannot find %s from nodeId %d",serviceMethod,nodeId)
}else {
err = fmt.Errorf("No %s service found in the origin network",serviceMethod)
}
}
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError("Call serviceMethod is error:", err.Error())

View File

@@ -71,7 +71,6 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
}
server.rpcServer.Addr = ":" + splitAddr[1]
server.rpcServer.LenMsgLen = 4 //uint16
server.rpcServer.MinMsgLen = 2
if maxRpcParamLen > 0 {
server.rpcServer.MaxMsgLen = maxRpcParamLen
@@ -85,6 +84,8 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
server.rpcServer.LittleEndian = LittleEndian
server.rpcServer.WriteDeadline = Default_ReadWriteDeadline
server.rpcServer.ReadDeadline = Default_ReadWriteDeadline
server.rpcServer.LenMsgLen = DefaultRpcLenMsgLen
server.rpcServer.Start()
}
@@ -147,7 +148,6 @@ func (agent *RpcAgent) Run() {
ReleaseRpcRequest(req)
continue
} else {
//will close tcpconn
ReleaseRpcRequest(req)
break
}
@@ -256,10 +256,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error())
pCall.Seq = 0
pCall.Err = errors.New("service method " + serviceMethod + " not config!")
pCall.done <- pCall
log.SError(pCall.Err.Error())
pCall.DoError(err)
return pCall
}
@@ -273,10 +273,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
var err error
iParam,err = processor.Clone(args)
if err != nil {
sErr := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(sErr.Error())
pCall.Seq = 0
pCall.Err = errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
pCall.done <- pCall
log.SError(pCall.Err.Error())
pCall.DoError(sErr)
return pCall
}
@@ -289,9 +289,10 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
var err error
req.inParam, err = rpcHandler.UnmarshalInParam(processor, serviceMethod, rpcMethodId, rawArgs)
if err != nil {
log.SError(err.Error())
pCall.Seq = 0
pCall.DoError(err)
ReleaseRpcRequest(req)
pCall.Err = err
pCall.done <- pCall
return pCall
}
}
@@ -303,38 +304,40 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
if reply != nil && Returns != reply && Returns != nil {
byteReturns, err := req.rpcProcessor.Marshal(Returns)
if err != nil {
log.SError("returns data cannot be marshal ", callSeq)
ReleaseRpcRequest(req)
}
err = req.rpcProcessor.Unmarshal(byteReturns, reply)
if err != nil {
log.SError("returns data cannot be Unmarshal ", callSeq)
ReleaseRpcRequest(req)
Err = ConvertError(err)
log.SError("returns data cannot be marshal,callSeq is ", callSeq," error is ",err.Error())
}else{
err = req.rpcProcessor.Unmarshal(byteReturns, reply)
if err != nil {
Err = ConvertError(err)
log.SError("returns data cannot be Unmarshal,callSeq is ", callSeq," error is ",err.Error())
}
}
}
ReleaseRpcRequest(req)
v := client.RemovePending(callSeq)
if v == nil {
log.SError("rpcClient cannot find seq ",callSeq, " in pending")
ReleaseRpcRequest(req)
return
}
if len(Err) == 0 {
v.Err = nil
v.DoOK()
} else {
v.Err = Err
log.SError(Err.Error())
v.DoError(Err)
}
v.done <- v
ReleaseRpcRequest(req)
}
}
err := rpcHandler.PushRpcRequest(req)
if err != nil {
log.SError(err.Error())
pCall.DoError(err)
ReleaseRpcRequest(req)
pCall.Err = err
pCall.done <- pCall
}
return pCall

View File

@@ -20,6 +20,7 @@ var timerDispatcherLen = 100000
var maxServiceEventChannelNum = 2000000
type IService interface {
concurrent.IConcurrent
Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{})
Stop()
Start()

View File

@@ -1,18 +1,49 @@
package messagequeueservice
import (
"errors"
"fmt"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysmodule/mongodbmodule"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"sunserver/common/util"
"time"
)
const MaxDays = 180
type DataType interface {
int | uint | int64 | uint64 | float32 | float64 | int32 | uint32 | int16 | uint16
}
func convertToNumber[DType DataType](val interface{}) (error, DType) {
switch val.(type) {
case int64:
return nil, DType(val.(int64))
case int:
return nil, DType(val.(int))
case uint:
return nil, DType(val.(uint))
case uint64:
return nil, DType(val.(uint64))
case float32:
return nil, DType(val.(float32))
case float64:
return nil, DType(val.(float64))
case int32:
return nil, DType(val.(int32))
case uint32:
return nil, DType(val.(uint32))
case int16:
return nil, DType(val.(int16))
case uint16:
return nil, DType(val.(uint16))
}
return errors.New("unsupported type"), 0
}
type MongoPersist struct {
service.Module
mongo mongodbmodule.MongoModule
@@ -363,7 +394,7 @@ func (mp *MongoPersist) GetIndex(topicData *TopicData) uint64 {
for _, e := range document {
if e.Key == "_id" {
errC, seq := util.ConvertToNumber[uint64](e.Value)
errC, seq := convertToNumber[uint64](e.Value)
if errC != nil {
log.Error("value is error:%s,%+v, ", errC.Error(), e.Value)
}

View File

@@ -6,9 +6,9 @@ import (
"github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysmodule/mongodbmodule"
"github.com/duanhf2012/origin/util/coroutine"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"runtime"
"sync"
"sync/atomic"
"time"
@@ -71,7 +71,9 @@ func (mp *MongoPersist) OnInit() error {
}
//开启协程
coroutine.GoRecover(mp.persistCoroutine,-1)
mp.waitGroup.Add(1)
go mp.persistCoroutine()
return nil
}
@@ -260,7 +262,6 @@ func (mp *MongoPersist) JugeTimeoutSave() bool{
}
func (mp *MongoPersist) persistCoroutine(){
mp.waitGroup.Add(1)
defer mp.waitGroup.Done()
for atomic.LoadInt32(&mp.stop)==0 || mp.hasPersistData(){
//间隔时间sleep
@@ -291,6 +292,15 @@ func (mp *MongoPersist) hasPersistData() bool{
}
func (mp *MongoPersist) saveToDB(){
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError(" Core dump info[", errString, "]\n", string(buf[:l]))
}
}()
//1.copy数据
mp.Lock()
mapRemoveRankData := mp.mapRemoveRankData

View File

@@ -356,12 +356,12 @@ func (rs *RankSkip) GetRankNodeDataByRank(rank uint64) (*RankData, uint64) {
// GetRankKeyPrevToLimit 获取key前count名的数据
func (rs *RankSkip) GetRankKeyPrevToLimit(findKey, count uint64, result *rpc.RankDataList) error {
if rs.GetRankLen() <= 0 {
return fmt.Errorf("rank[", rs.rankId, "] no data")
return fmt.Errorf("rank[%d] no data", rs.rankId)
}
findData, ok := rs.mapRankData[findKey]
if ok == false {
return fmt.Errorf("rank[", rs.rankId, "] no data")
return fmt.Errorf("rank[%d] no data", rs.rankId)
}
_, rankPos := rs.skipList.GetWithPosition(findData)
@@ -385,12 +385,12 @@ func (rs *RankSkip) GetRankKeyPrevToLimit(findKey, count uint64, result *rpc.Ran
// GetRankKeyPrevToLimit 获取key前count名的数据
func (rs *RankSkip) GetRankKeyNextToLimit(findKey, count uint64, result *rpc.RankDataList) error {
if rs.GetRankLen() <= 0 {
return fmt.Errorf("rank[", rs.rankId, "] no data")
return fmt.Errorf("rank[%d] no data", rs.rankId)
}
findData, ok := rs.mapRankData[findKey]
if ok == false {
return fmt.Errorf("rank[", rs.rankId, "] no data")
return fmt.Errorf("rank[%d] no data", rs.rankId)
}
_, rankPos := rs.skipList.GetWithPosition(findData)

View File

@@ -90,6 +90,10 @@ func (tcpService *TcpService) OnInit() error{
if ok == true {
tcpService.tcpServer.LittleEndian = LittleEndian.(bool)
}
LenMsgLen,ok := tcpCfg["LenMsgLen"]
if ok == true {
tcpService.tcpServer.LenMsgLen = int(LenMsgLen.(float64))
}
MinMsgLen,ok := tcpCfg["MinMsgLen"]
if ok == true {
tcpService.tcpServer.MinMsgLen = uint32(MinMsgLen.(float64))