mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-05 07:24:57 +08:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4ad8204fde | ||
|
|
8f15546fb1 | ||
|
|
0f3a965d73 | ||
|
|
dfb6959843 | ||
|
|
dd4aaf9c57 | ||
|
|
6ef98a2104 | ||
|
|
1890b300ee | ||
|
|
6fea2226e1 | ||
|
|
ec1c2b4517 | ||
|
|
4b84d9a1d5 | ||
|
|
85a8ec58e5 | ||
|
|
962016d476 |
40
README.md
40
README.md
@@ -64,17 +64,19 @@ cluster.json如下:
|
||||
"Private": false,
|
||||
"ListenAddr":"127.0.0.1:8001",
|
||||
"MaxRpcParamLen": 409600,
|
||||
"CompressBytesLen": 20480,
|
||||
"NodeName": "Node_Test1",
|
||||
"remark":"//以_打头的,表示只在本机进程,不对整个子网公开",
|
||||
"remark":"//以_打头的,表示只在本机进程,不对整个子网公开",
|
||||
"ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","_TcpService","HttpService","WSService"]
|
||||
},
|
||||
{
|
||||
{
|
||||
"NodeId": 2,
|
||||
"Private": false,
|
||||
"ListenAddr":"127.0.0.1:8002",
|
||||
"MaxRpcParamLen": 409600,
|
||||
"CompressBytesLen": 20480,
|
||||
"NodeName": "Node_Test1",
|
||||
"remark":"//以_打头的,表示只在本机进程,不对整个子网公开",
|
||||
"remark":"//以_打头的,表示只在本机进程,不对整个子网公开",
|
||||
"ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","TcpService","HttpService","WSService"]
|
||||
}
|
||||
]
|
||||
@@ -88,6 +90,7 @@ cluster.json如下:
|
||||
* Private: 是否私有结点,如果为true,表示其他结点不会发现它,但可以自我运行。
|
||||
* ListenAddr:Rpc通信服务的监听地址
|
||||
* MaxRpcParamLen:Rpc参数数据包最大长度,该参数可以缺省,默认一次Rpc调用支持最大4294967295byte长度数据。
|
||||
* CompressBytesLen:Rpc网络数据压缩,当数据>=20480byte时将被压缩。该参数可以缺省或者填0时不进行压缩。
|
||||
* NodeName:结点名称
|
||||
* remark:备注,可选项
|
||||
* ServiceList:该Node拥有的服务列表,注意:origin按配置的顺序进行安装初始化。但停止服务的顺序是相反。
|
||||
@@ -715,6 +718,15 @@ func (slf *TestService7) CallTest(){
|
||||
}else{
|
||||
fmt.Printf("Call output %d\n",output)
|
||||
}
|
||||
|
||||
|
||||
//自定义超时,默认rpc超时时间为15s
|
||||
err = slf.CallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, &output)
|
||||
if err != nil {
|
||||
fmt.Printf("Call error :%+v\n", err)
|
||||
} else {
|
||||
fmt.Printf("Call output %d\n", output)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -726,13 +738,27 @@ func (slf *TestService7) AsyncCallTest(){
|
||||
})*/
|
||||
//异步调用,在数据返回时,会回调传入函数
|
||||
//注意函数的第一个参数一定是RPC_Sum函数的第二个参数,err error为RPC_Sum返回值
|
||||
slf.AsyncCall("TestService6.RPC_Sum",&input,func(output *int,err error){
|
||||
err := slf.AsyncCall("TestService6.RPC_Sum", &input, func(output *int, err error) {
|
||||
if err != nil {
|
||||
fmt.Printf("AsyncCall error :%+v\n",err)
|
||||
}else{
|
||||
fmt.Printf("AsyncCall output %d\n",*output)
|
||||
fmt.Printf("AsyncCall error :%+v\n", err)
|
||||
} else {
|
||||
fmt.Printf("AsyncCall output %d\n", *output)
|
||||
}
|
||||
})
|
||||
fmt.Println(err)
|
||||
|
||||
//自定义超时,返回一个cancel函数,可以在业务需要时取消rpc调用
|
||||
rpcCancel, err := slf.AsyncCallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, func(output *int, err error) {
|
||||
//如果下面注释的rpcCancel()函数被调用,这里可能将不再返回
|
||||
if err != nil {
|
||||
fmt.Printf("AsyncCall error :%+v\n", err)
|
||||
} else {
|
||||
fmt.Printf("AsyncCall output %d\n", *output)
|
||||
}
|
||||
})
|
||||
//rpcCancel()
|
||||
fmt.Println(err, rpcCancel)
|
||||
|
||||
}
|
||||
|
||||
func (slf *TestService7) GoTest(){
|
||||
|
||||
@@ -26,6 +26,7 @@ type NodeInfo struct {
|
||||
Private bool
|
||||
ListenAddr string
|
||||
MaxRpcParamLen uint32 //最大Rpc参数长度
|
||||
CompressBytesLen int //超过字节进行压缩的长度
|
||||
ServiceList []string //所有的有序服务列表
|
||||
PublicServiceList []string //对外公开的服务列表
|
||||
DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选
|
||||
@@ -73,7 +74,7 @@ func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) {
|
||||
}
|
||||
|
||||
func (cls *Cluster) Start() {
|
||||
cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen)
|
||||
cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen)
|
||||
}
|
||||
|
||||
func (cls *Cluster) Stop() {
|
||||
@@ -195,7 +196,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
|
||||
|
||||
rpcInfo := NodeRpcInfo{}
|
||||
rpcInfo.nodeInfo = *nodeInfo
|
||||
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.triggerRpcEvent)
|
||||
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent)
|
||||
cls.mapRpc[nodeInfo.NodeId] = rpcInfo
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"github.com/duanhf2012/origin/log"
|
||||
"github.com/duanhf2012/origin/rpc"
|
||||
"github.com/duanhf2012/origin/service"
|
||||
"time"
|
||||
"github.com/duanhf2012/origin/util/timer"
|
||||
)
|
||||
|
||||
const DynamicDiscoveryMasterName = "DiscoveryMaster"
|
||||
@@ -341,6 +343,10 @@ func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool {
|
||||
}
|
||||
|
||||
func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
|
||||
dc.regServiceDiscover(nodeId)
|
||||
}
|
||||
|
||||
func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
|
||||
nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId)
|
||||
if nodeInfo == nil {
|
||||
return
|
||||
@@ -364,6 +370,10 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
|
||||
err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) {
|
||||
if err != nil {
|
||||
log.SError("call ", RegServiceDiscover, " is fail :", err.Error())
|
||||
dc.AfterFunc(time.Second*3, func(timer *timer.Timer) {
|
||||
dc.regServiceDiscover(nodeId)
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
@@ -68,6 +68,11 @@ func (pbProcessor *PBProcessor) MsgRoute(clientId uint64, msg interface{}) error
|
||||
// must goroutine safe
|
||||
func (pbProcessor *PBProcessor) Unmarshal(clientId uint64, data []byte) (interface{}, error) {
|
||||
defer pbProcessor.ReleaseByteSlice(data)
|
||||
return pbProcessor.UnmarshalWithOutRelease(clientId, data)
|
||||
}
|
||||
|
||||
// unmarshal but not release data
|
||||
func (pbProcessor *PBProcessor) UnmarshalWithOutRelease(clientId uint64, data []byte) (interface{}, error) {
|
||||
var msgType uint16
|
||||
if pbProcessor.LittleEndian == true {
|
||||
msgType = binary.LittleEndian.Uint16(data[:2])
|
||||
|
||||
@@ -16,7 +16,7 @@ type memAreaPool struct {
|
||||
pool []sync.Pool
|
||||
}
|
||||
|
||||
var memAreaPoolList = [3]*memAreaPool{&memAreaPool{minAreaValue: 1, maxAreaValue: 4096, growthValue: 512}, &memAreaPool{minAreaValue: 4097, maxAreaValue: 40960, growthValue: 4096}, &memAreaPool{minAreaValue: 40961, maxAreaValue: 417792, growthValue: 16384}}
|
||||
var memAreaPoolList = [4]*memAreaPool{&memAreaPool{minAreaValue: 1, maxAreaValue: 4096, growthValue: 512}, &memAreaPool{minAreaValue: 4097, maxAreaValue: 40960, growthValue: 4096}, &memAreaPool{minAreaValue: 40961, maxAreaValue: 417792, growthValue: 16384}, &memAreaPool{minAreaValue: 417793, maxAreaValue: 1925120, growthValue: 65536}}
|
||||
|
||||
func init() {
|
||||
for i := 0; i < len(memAreaPoolList); i++ {
|
||||
|
||||
104
rpc/client.go
104
rpc/client.go
@@ -1,7 +1,6 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"errors"
|
||||
"github.com/duanhf2012/origin/network"
|
||||
"reflect"
|
||||
@@ -9,6 +8,7 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"github.com/duanhf2012/origin/log"
|
||||
)
|
||||
|
||||
const(
|
||||
@@ -20,7 +20,7 @@ const(
|
||||
|
||||
|
||||
DefaultConnectInterval = 2*time.Second
|
||||
DefaultCheckRpcCallTimeoutInterval = 5*time.Second
|
||||
DefaultCheckRpcCallTimeoutInterval = 1*time.Second
|
||||
DefaultRpcTimeout = 15*time.Second
|
||||
)
|
||||
|
||||
@@ -30,9 +30,9 @@ type IRealClient interface {
|
||||
SetConn(conn *network.TCPConn)
|
||||
Close(waitDone bool)
|
||||
|
||||
AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error
|
||||
Go(rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
|
||||
RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
|
||||
AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error)
|
||||
Go(timeout time.Duration,rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
|
||||
RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
|
||||
IsConnected() bool
|
||||
|
||||
Run()
|
||||
@@ -44,11 +44,11 @@ type Client struct {
|
||||
nodeId int
|
||||
pendingLock sync.RWMutex
|
||||
startSeq uint64
|
||||
pending map[uint64]*list.Element
|
||||
pendingTimer *list.List
|
||||
pending map[uint64]*Call
|
||||
callRpcTimeout time.Duration
|
||||
maxCheckCallRpcCount int
|
||||
|
||||
callTimerHeap CallTimerHeap
|
||||
IRealClient
|
||||
}
|
||||
|
||||
@@ -59,7 +59,6 @@ func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
|
||||
}
|
||||
|
||||
func (bc *Client) makeCallFail(call *Call) {
|
||||
bc.removePending(call.Seq)
|
||||
if call.callback != nil && call.callback.IsValid() {
|
||||
call.rpcHandler.PushRpcResponse(call)
|
||||
} else {
|
||||
@@ -70,54 +69,52 @@ func (bc *Client) makeCallFail(call *Call) {
|
||||
func (bc *Client) checkRpcCallTimeout() {
|
||||
for{
|
||||
time.Sleep(DefaultCheckRpcCallTimeoutInterval)
|
||||
now := time.Now()
|
||||
|
||||
for i := 0; i < bc.maxCheckCallRpcCount; i++ {
|
||||
bc.pendingLock.Lock()
|
||||
if bc.pendingTimer == nil {
|
||||
|
||||
callSeq := bc.callTimerHeap.PopTimeout()
|
||||
if callSeq == 0 {
|
||||
bc.pendingLock.Unlock()
|
||||
break
|
||||
}
|
||||
|
||||
pElem := bc.pendingTimer.Front()
|
||||
if pElem == nil {
|
||||
bc.pendingLock.Unlock()
|
||||
break
|
||||
}
|
||||
pCall := pElem.Value.(*Call)
|
||||
if now.Sub(pCall.callTime) > bc.callRpcTimeout {
|
||||
strTimeout := strconv.FormatInt(int64(bc.callRpcTimeout/time.Second), 10)
|
||||
pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds")
|
||||
bc.makeCallFail(pCall)
|
||||
pCall := bc.pending[callSeq]
|
||||
if pCall == nil {
|
||||
bc.pendingLock.Unlock()
|
||||
log.SError("callSeq ",callSeq," is not find")
|
||||
continue
|
||||
}
|
||||
|
||||
delete(bc.pending,callSeq)
|
||||
strTimeout := strconv.FormatInt(int64(pCall.TimeOut.Seconds()), 10)
|
||||
pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds,method is "+pCall.ServiceMethod)
|
||||
log.SError(pCall.Err.Error())
|
||||
bc.makeCallFail(pCall)
|
||||
bc.pendingLock.Unlock()
|
||||
break
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (client *Client) InitPending() {
|
||||
client.pendingLock.Lock()
|
||||
if client.pending != nil {
|
||||
for _, v := range client.pending {
|
||||
v.Value.(*Call).Err = errors.New("node is disconnect")
|
||||
v.Value.(*Call).done <- v.Value.(*Call)
|
||||
}
|
||||
}
|
||||
|
||||
client.pending = make(map[uint64]*list.Element, 4096)
|
||||
client.pendingTimer = list.New()
|
||||
client.callTimerHeap.Init()
|
||||
client.pending = make(map[uint64]*Call,4096)
|
||||
client.pendingLock.Unlock()
|
||||
}
|
||||
|
||||
|
||||
func (bc *Client) AddPending(call *Call) {
|
||||
bc.pendingLock.Lock()
|
||||
call.callTime = time.Now()
|
||||
elemTimer := bc.pendingTimer.PushBack(call)
|
||||
bc.pending[call.Seq] = elemTimer //如果下面发送失败,将会一一直存在这里
|
||||
|
||||
if call.Seq == 0 {
|
||||
bc.pendingLock.Unlock()
|
||||
log.SStack("call is error.")
|
||||
return
|
||||
}
|
||||
|
||||
bc.pending[call.Seq] = call
|
||||
bc.callTimerHeap.AddTimer(call.Seq,call.TimeOut)
|
||||
|
||||
bc.pendingLock.Unlock()
|
||||
}
|
||||
|
||||
@@ -136,30 +133,45 @@ func (bc *Client) removePending(seq uint64) *Call {
|
||||
if ok == false {
|
||||
return nil
|
||||
}
|
||||
call := v.Value.(*Call)
|
||||
bc.pendingTimer.Remove(v)
|
||||
|
||||
bc.callTimerHeap.Cancel(seq)
|
||||
delete(bc.pending, seq)
|
||||
return call
|
||||
return v
|
||||
}
|
||||
|
||||
func (bc *Client) FindPending(seq uint64) *Call {
|
||||
func (bc *Client) FindPending(seq uint64) (pCall *Call) {
|
||||
if seq == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
bc.pendingLock.Lock()
|
||||
v, ok := bc.pending[seq]
|
||||
if ok == false {
|
||||
bc.pendingLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
pCall := v.Value.(*Call)
|
||||
pCall = bc.pending[seq]
|
||||
bc.pendingLock.Unlock()
|
||||
|
||||
return pCall
|
||||
}
|
||||
|
||||
func (bc *Client) cleanPending(){
|
||||
bc.pendingLock.Lock()
|
||||
for {
|
||||
callSeq := bc.callTimerHeap.PopFirst()
|
||||
if callSeq == 0 {
|
||||
break
|
||||
}
|
||||
pCall := bc.pending[callSeq]
|
||||
if pCall == nil {
|
||||
log.SError("callSeq ",callSeq," is not find")
|
||||
continue
|
||||
}
|
||||
|
||||
delete(bc.pending,callSeq)
|
||||
pCall.Err = errors.New("nodeid is disconnect ")
|
||||
bc.makeCallFail(pCall)
|
||||
}
|
||||
|
||||
bc.pendingLock.Unlock()
|
||||
}
|
||||
|
||||
func (bc *Client) generateSeq() uint64 {
|
||||
return atomic.AddUint64(&bc.startSeq, 1)
|
||||
}
|
||||
|
||||
102
rpc/compressor.go
Normal file
102
rpc/compressor.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"errors"
|
||||
"github.com/pierrec/lz4/v4"
|
||||
"fmt"
|
||||
"github.com/duanhf2012/origin/network"
|
||||
)
|
||||
|
||||
var memPool network.INetMempool = network.NewMemAreaPool()
|
||||
|
||||
type ICompressor interface {
|
||||
CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存,传入nil时内部申请
|
||||
UncompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存,传入nil时内部申请
|
||||
|
||||
CompressBufferCollection(buffer []byte) //压缩的Buffer内存回收
|
||||
UnCompressBufferCollection(buffer []byte) //解压缩的Buffer内存回收
|
||||
}
|
||||
|
||||
var compressor ICompressor
|
||||
func init(){
|
||||
SetCompressor(&Lz4Compressor{})
|
||||
}
|
||||
|
||||
func SetCompressor(cp ICompressor){
|
||||
compressor = cp
|
||||
}
|
||||
|
||||
type Lz4Compressor struct {
|
||||
}
|
||||
|
||||
func (lc *Lz4Compressor) CompressBlock(src []byte) (dest []byte, err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
err = errors.New("core dump info[" + errString + "]\n" + string(buf[:l]))
|
||||
}
|
||||
}()
|
||||
|
||||
var c lz4.Compressor
|
||||
var cnt int
|
||||
dest = memPool.MakeByteSlice(lz4.CompressBlockBound(len(src))+1)
|
||||
cnt, err = c.CompressBlock(src, dest[1:])
|
||||
if err != nil {
|
||||
memPool.ReleaseByteSlice(dest)
|
||||
return nil,err
|
||||
}
|
||||
|
||||
ratio := len(src) / cnt
|
||||
if len(src) % cnt > 0 {
|
||||
ratio += 1
|
||||
}
|
||||
|
||||
if ratio > 255 {
|
||||
memPool.ReleaseByteSlice(dest)
|
||||
return nil,fmt.Errorf("Impermissible errors")
|
||||
}
|
||||
|
||||
dest[0] = uint8(ratio)
|
||||
dest = dest[:cnt+1]
|
||||
return
|
||||
}
|
||||
|
||||
func (lc *Lz4Compressor) UncompressBlock(src []byte) (dest []byte, err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 4096)
|
||||
l := runtime.Stack(buf, false)
|
||||
errString := fmt.Sprint(r)
|
||||
err = errors.New("core dump info[" + errString + "]\n" + string(buf[:l]))
|
||||
}
|
||||
}()
|
||||
|
||||
radio := uint8(src[0])
|
||||
if radio == 0 {
|
||||
return nil,fmt.Errorf("Impermissible errors")
|
||||
}
|
||||
|
||||
dest = memPool.MakeByteSlice(len(src)*int(radio))
|
||||
cnt, err := lz4.UncompressBlock(src[1:], dest)
|
||||
if err != nil {
|
||||
memPool.ReleaseByteSlice(dest)
|
||||
return nil,err
|
||||
}
|
||||
|
||||
return dest[:cnt],nil
|
||||
}
|
||||
|
||||
func (lc *Lz4Compressor) compressBlockBound(n int) int{
|
||||
return lz4.CompressBlockBound(n)
|
||||
}
|
||||
|
||||
func (lc *Lz4Compressor) CompressBufferCollection(buffer []byte){
|
||||
memPool.ReleaseByteSlice(buffer)
|
||||
}
|
||||
|
||||
func (lc *Lz4Compressor) UnCompressBufferCollection(buffer []byte) {
|
||||
memPool.ReleaseByteSlice(buffer)
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
//本结点的Client
|
||||
@@ -36,7 +37,7 @@ func (lc *LClient) SetConn(conn *network.TCPConn){
|
||||
func (lc *LClient) Close(waitDone bool){
|
||||
}
|
||||
|
||||
func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
|
||||
func (lc *LClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
|
||||
pLocalRpcServer := rpcHandler.GetRpcServer()()
|
||||
//判断是否是同一服务
|
||||
findIndex := strings.Index(serviceMethod, ".")
|
||||
@@ -65,19 +66,20 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
|
||||
}
|
||||
|
||||
//其他的rpcHandler的处理器
|
||||
return pLocalRpcServer.selfNodeRpcHandlerGo(nil, lc.selfClient, noReply, serviceName, 0, serviceMethod, args, reply, nil)
|
||||
return pLocalRpcServer.selfNodeRpcHandlerGo(timeout,nil, lc.selfClient, noReply, serviceName, 0, serviceMethod, args, reply, nil)
|
||||
}
|
||||
|
||||
|
||||
func (rc *LClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call {
|
||||
func (rc *LClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call {
|
||||
pLocalRpcServer := rpcHandler.GetRpcServer()()
|
||||
|
||||
call := MakeCall()
|
||||
call.ServiceMethod = serviceName
|
||||
call.Reply = reply
|
||||
|
||||
|
||||
//服务自我调用
|
||||
if serviceName == rpcHandler.GetName() {
|
||||
call := MakeCall()
|
||||
call.ServiceMethod = serviceName
|
||||
call.Reply = reply
|
||||
call.TimeOut = timeout
|
||||
|
||||
err := pLocalRpcServer.myselfRpcHandlerGo(rc.selfClient,serviceName, serviceName, rawArgs, requestHandlerNull,nil)
|
||||
call.Err = err
|
||||
call.done <- call
|
||||
@@ -86,11 +88,11 @@ func (rc *LClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
|
||||
}
|
||||
|
||||
//其他的rpcHandler的处理器
|
||||
return pLocalRpcServer.selfNodeRpcHandlerGo(processor,rc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
|
||||
return pLocalRpcServer.selfNodeRpcHandlerGo(timeout,processor,rc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
|
||||
}
|
||||
|
||||
|
||||
func (lc *LClient) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}) error {
|
||||
func (lc *LClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{},cancelable bool) (CancelRpc,error) {
|
||||
pLocalRpcServer := rpcHandler.GetRpcServer()()
|
||||
|
||||
//判断是否是同一服务
|
||||
@@ -99,22 +101,22 @@ func (lc *LClient) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callb
|
||||
err := errors.New("Call serviceMethod " + serviceMethod + " is error!")
|
||||
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
|
||||
log.SError(err.Error())
|
||||
return nil
|
||||
return emptyCancelRpc,nil
|
||||
}
|
||||
|
||||
serviceName := serviceMethod[:findIndex]
|
||||
//调用自己rpcHandler处理器
|
||||
if serviceName == rpcHandler.GetName() { //自己服务调用
|
||||
return pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args,callback ,reply)
|
||||
return emptyCancelRpc,pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args,callback ,reply)
|
||||
}
|
||||
|
||||
//其他的rpcHandler的处理器
|
||||
err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback)
|
||||
calcelRpc,err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout,lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback,cancelable)
|
||||
if err != nil {
|
||||
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
|
||||
}
|
||||
|
||||
return nil
|
||||
return calcelRpc,nil
|
||||
}
|
||||
|
||||
func NewLClient(nodeId int) *Client{
|
||||
|
||||
121
rpc/rclient.go
121
rpc/rclient.go
@@ -9,10 +9,12 @@ import (
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
//跨结点连接的Client
|
||||
type RClient struct {
|
||||
compressBytesLen int
|
||||
selfClient *Client
|
||||
network.TCPClient
|
||||
conn *network.TCPConn
|
||||
@@ -40,7 +42,7 @@ func (rc *RClient) SetConn(conn *network.TCPConn){
|
||||
rc.Unlock()
|
||||
}
|
||||
|
||||
func (rc *RClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
|
||||
func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
|
||||
_, processor := GetProcessorType(args)
|
||||
InParam, err := processor.Marshal(args)
|
||||
if err != nil {
|
||||
@@ -50,15 +52,15 @@ func (rc *RClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
|
||||
return call
|
||||
}
|
||||
|
||||
return rc.RawGo(rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply)
|
||||
return rc.RawGo(timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply)
|
||||
}
|
||||
|
||||
|
||||
func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call {
|
||||
func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call {
|
||||
call := MakeCall()
|
||||
call.ServiceMethod = serviceMethod
|
||||
call.Reply = reply
|
||||
call.Seq = rc.selfClient.generateSeq()
|
||||
call.TimeOut = timeout
|
||||
|
||||
request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs)
|
||||
bytes, err := processor.Marshal(request.RpcRequestData)
|
||||
@@ -80,11 +82,31 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
|
||||
return call
|
||||
}
|
||||
|
||||
var compressBuff[]byte
|
||||
bCompress := uint8(0)
|
||||
if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen {
|
||||
var cErr error
|
||||
compressBuff,cErr = compressor.CompressBlock(bytes)
|
||||
if cErr != nil {
|
||||
call.Seq = 0
|
||||
log.SError(cErr.Error())
|
||||
call.DoError(cErr)
|
||||
return call
|
||||
}
|
||||
if len(compressBuff) < len(bytes) {
|
||||
bytes = compressBuff
|
||||
bCompress = 1<<7
|
||||
}
|
||||
}
|
||||
|
||||
if noReply == false {
|
||||
rc.selfClient.AddPending(call)
|
||||
}
|
||||
|
||||
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes)
|
||||
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes)
|
||||
if cap(compressBuff) >0 {
|
||||
compressor.CompressBufferCollection(compressBuff)
|
||||
}
|
||||
if err != nil {
|
||||
rc.selfClient.RemovePending(call.Seq)
|
||||
|
||||
@@ -98,20 +120,20 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
|
||||
}
|
||||
|
||||
|
||||
func (rc *RClient) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error {
|
||||
err := rc.asyncCall(rpcHandler, serviceMethod, callback, args, replyParam)
|
||||
func (rc *RClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) {
|
||||
cancelRpc,err := rc.asyncCall(timeout,rpcHandler, serviceMethod, callback, args, replyParam,cancelable)
|
||||
if err != nil {
|
||||
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
|
||||
}
|
||||
|
||||
return nil
|
||||
return cancelRpc,nil
|
||||
}
|
||||
|
||||
func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error {
|
||||
func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) {
|
||||
processorType, processor := GetProcessorType(args)
|
||||
InParam, herr := processor.Marshal(args)
|
||||
if herr != nil {
|
||||
return herr
|
||||
return emptyCancelRpc,herr
|
||||
}
|
||||
|
||||
seq := rc.selfClient.generateSeq()
|
||||
@@ -119,12 +141,27 @@ func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callb
|
||||
bytes, err := processor.Marshal(request.RpcRequestData)
|
||||
ReleaseRpcRequest(request)
|
||||
if err != nil {
|
||||
return err
|
||||
return emptyCancelRpc,err
|
||||
}
|
||||
|
||||
conn := rc.GetConn()
|
||||
if conn == nil || conn.IsConnected()==false {
|
||||
return errors.New("Rpc server is disconnect,call " + serviceMethod)
|
||||
return emptyCancelRpc,errors.New("Rpc server is disconnect,call " + serviceMethod)
|
||||
}
|
||||
|
||||
var compressBuff[]byte
|
||||
bCompress := uint8(0)
|
||||
if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen {
|
||||
var cErr error
|
||||
compressBuff,cErr = compressor.CompressBlock(bytes)
|
||||
if cErr != nil {
|
||||
return emptyCancelRpc,cErr
|
||||
}
|
||||
|
||||
if len(compressBuff) < len(bytes) {
|
||||
bytes = compressBuff
|
||||
bCompress = 1<<7
|
||||
}
|
||||
}
|
||||
|
||||
call := MakeCall()
|
||||
@@ -133,18 +170,28 @@ func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callb
|
||||
call.rpcHandler = rpcHandler
|
||||
call.ServiceMethod = serviceMethod
|
||||
call.Seq = seq
|
||||
call.TimeOut = timeout
|
||||
rc.selfClient.AddPending(call)
|
||||
|
||||
err = conn.WriteMsg([]byte{uint8(processorType)}, bytes)
|
||||
err = conn.WriteMsg([]byte{uint8(processorType)|bCompress}, bytes)
|
||||
if cap(compressBuff) >0 {
|
||||
compressor.CompressBufferCollection(compressBuff)
|
||||
}
|
||||
if err != nil {
|
||||
rc.selfClient.RemovePending(call.Seq)
|
||||
ReleaseCall(call)
|
||||
return err
|
||||
return emptyCancelRpc,err
|
||||
}
|
||||
|
||||
return nil
|
||||
if cancelable {
|
||||
rpcCancel := RpcCancel{CallSeq:seq,Cli: rc.selfClient}
|
||||
return rpcCancel.CancelRpc,nil
|
||||
}
|
||||
|
||||
return emptyCancelRpc,nil
|
||||
}
|
||||
|
||||
|
||||
func (rc *RClient) Run() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
@@ -163,7 +210,8 @@ func (rc *RClient) Run() {
|
||||
return
|
||||
}
|
||||
|
||||
processor := GetProcessor(bytes[0])
|
||||
bCompress := (bytes[0]>>7) > 0
|
||||
processor := GetProcessor(bytes[0]&0x7f)
|
||||
if processor == nil {
|
||||
rc.conn.ReleaseReadMsg(bytes)
|
||||
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error())
|
||||
@@ -174,14 +222,33 @@ func (rc *RClient) Run() {
|
||||
response := RpcResponse{}
|
||||
response.RpcResponseData = processor.MakeRpcResponse(0, "", nil)
|
||||
|
||||
err = processor.Unmarshal(bytes[1:], response.RpcResponseData)
|
||||
//解压缩
|
||||
byteData := bytes[1:]
|
||||
var compressBuff []byte
|
||||
|
||||
if bCompress == true {
|
||||
var unCompressErr error
|
||||
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
|
||||
if unCompressErr!= nil {
|
||||
rc.conn.ReleaseReadMsg(bytes)
|
||||
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", unCompressErr.Error())
|
||||
return
|
||||
}
|
||||
byteData = compressBuff
|
||||
}
|
||||
|
||||
err = processor.Unmarshal(byteData, response.RpcResponseData)
|
||||
if cap(compressBuff) > 0 {
|
||||
compressor.UnCompressBufferCollection(compressBuff)
|
||||
}
|
||||
|
||||
rc.conn.ReleaseReadMsg(bytes)
|
||||
if err != nil {
|
||||
processor.ReleaseRpcResponse(response.RpcResponseData)
|
||||
log.SError("rpcClient Unmarshal head error:", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq())
|
||||
if v == nil {
|
||||
log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending")
|
||||
@@ -214,14 +281,14 @@ func (rc *RClient) OnClose() {
|
||||
rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId())
|
||||
}
|
||||
|
||||
func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEvent TriggerRpcConnEvent) *Client{
|
||||
func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,compressBytesLen int,triggerRpcConnEvent TriggerRpcConnEvent) *Client{
|
||||
client := &Client{}
|
||||
client.clientId = atomic.AddUint32(&clientSeq, 1)
|
||||
client.nodeId = nodeId
|
||||
client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
|
||||
client.callRpcTimeout = DefaultRpcTimeout
|
||||
|
||||
c:= &RClient{}
|
||||
c.compressBytesLen = compressBytesLen
|
||||
c.selfClient = client
|
||||
c.Addr = addr
|
||||
c.ConnectInterval = DefaultConnectInterval
|
||||
@@ -251,18 +318,6 @@ func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEve
|
||||
|
||||
func (rc *RClient) Close(waitDone bool) {
|
||||
rc.TCPClient.Close(waitDone)
|
||||
|
||||
rc.selfClient.pendingLock.Lock()
|
||||
for {
|
||||
pElem := rc.selfClient.pendingTimer.Front()
|
||||
if pElem == nil {
|
||||
break
|
||||
}
|
||||
|
||||
pCall := pElem.Value.(*Call)
|
||||
pCall.Err = errors.New("nodeid is disconnect ")
|
||||
rc.selfClient.makeCallFail(pCall)
|
||||
}
|
||||
rc.selfClient.pendingLock.Unlock()
|
||||
rc.selfClient.cleanPending()
|
||||
}
|
||||
|
||||
|
||||
13
rpc/rpc.go
13
rpc/rpc.go
@@ -68,7 +68,16 @@ type Call struct {
|
||||
connId int
|
||||
callback *reflect.Value
|
||||
rpcHandler IRpcHandler
|
||||
callTime time.Time
|
||||
TimeOut time.Duration
|
||||
}
|
||||
|
||||
type RpcCancel struct {
|
||||
Cli *Client
|
||||
CallSeq uint64
|
||||
}
|
||||
|
||||
func (rc *RpcCancel) CancelRpc(){
|
||||
rc.Cli.RemovePending(rc.CallSeq)
|
||||
}
|
||||
|
||||
func (slf *RpcRequest) Clear() *RpcRequest{
|
||||
@@ -124,6 +133,8 @@ func (call *Call) Clear() *Call{
|
||||
call.connId = 0
|
||||
call.callback = nil
|
||||
call.rpcHandler = nil
|
||||
call.TimeOut = 0
|
||||
|
||||
return call
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"strings"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
"time"
|
||||
)
|
||||
|
||||
const maxClusterNode int = 128
|
||||
@@ -75,6 +76,9 @@ type IDiscoveryServiceListener interface {
|
||||
OnUnDiscoveryService(nodeId int, serviceName []string)
|
||||
}
|
||||
|
||||
type CancelRpc func()
|
||||
func emptyCancelRpc(){}
|
||||
|
||||
type IRpcHandler interface {
|
||||
IRpcHandlerChannel
|
||||
GetName() string
|
||||
@@ -83,11 +87,18 @@ type IRpcHandler interface {
|
||||
HandlerRpcRequest(request *RpcRequest)
|
||||
HandlerRpcResponseCB(call *Call)
|
||||
CallMethod(client *Client,ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error
|
||||
AsyncCall(serviceMethod string, args interface{}, callback interface{}) error
|
||||
|
||||
Call(serviceMethod string, args interface{}, reply interface{}) error
|
||||
Go(serviceMethod string, args interface{}) error
|
||||
AsyncCallNode(nodeId int, serviceMethod string, args interface{}, callback interface{}) error
|
||||
CallNode(nodeId int, serviceMethod string, args interface{}, reply interface{}) error
|
||||
AsyncCall(serviceMethod string, args interface{}, callback interface{}) error
|
||||
AsyncCallNode(nodeId int, serviceMethod string, args interface{}, callback interface{}) error
|
||||
|
||||
CallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, reply interface{}) error
|
||||
CallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, reply interface{}) error
|
||||
AsyncCallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error)
|
||||
AsyncCallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error)
|
||||
|
||||
Go(serviceMethod string, args interface{}) error
|
||||
GoNode(nodeId int, serviceMethod string, args interface{}) error
|
||||
RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs []byte) error
|
||||
CastGo(serviceMethod string, args interface{}) error
|
||||
@@ -323,7 +334,8 @@ func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param
|
||||
pCall.callback = &callBack
|
||||
pCall.Seq = client.generateSeq()
|
||||
callSeq = pCall.Seq
|
||||
|
||||
pCall.TimeOut = DefaultRpcTimeout
|
||||
pCall.ServiceMethod = ServiceMethod
|
||||
client.AddPending(pCall)
|
||||
|
||||
//有返回值时
|
||||
@@ -433,7 +445,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int
|
||||
|
||||
//2.rpcClient调用
|
||||
for i := 0; i < count; i++ {
|
||||
pCall := pClientList[i].Go(handler.rpcHandler,true, serviceMethod, args, nil)
|
||||
pCall := pClientList[i].Go(DefaultRpcTimeout,handler.rpcHandler,true, serviceMethod, args, nil)
|
||||
if pCall.Err != nil {
|
||||
err = pCall.Err
|
||||
}
|
||||
@@ -444,7 +456,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int
|
||||
return err
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interface{}, reply interface{}) error {
|
||||
func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, reply interface{}) error {
|
||||
var pClientList [maxClusterNode]*Client
|
||||
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
|
||||
if err != nil {
|
||||
@@ -460,7 +472,7 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf
|
||||
}
|
||||
|
||||
pClient := pClientList[0]
|
||||
pCall := pClient.Go(handler.rpcHandler,false, serviceMethod, args, reply)
|
||||
pCall := pClient.Go(timeout,handler.rpcHandler,false, serviceMethod, args, reply)
|
||||
|
||||
err = pCall.Done().Err
|
||||
pClient.RemovePending(pCall.Seq)
|
||||
@@ -468,24 +480,24 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf
|
||||
return err
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args interface{}, callback interface{}) error {
|
||||
func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error) {
|
||||
fVal := reflect.ValueOf(callback)
|
||||
if fVal.Kind() != reflect.Func {
|
||||
err := errors.New("call " + serviceMethod + " input callback param is error!")
|
||||
log.SError(err.Error())
|
||||
return err
|
||||
return emptyCancelRpc,err
|
||||
}
|
||||
|
||||
if fVal.Type().NumIn() != 2 {
|
||||
err := errors.New("call " + serviceMethod + " callback param function is error!")
|
||||
log.SError(err.Error())
|
||||
return err
|
||||
return emptyCancelRpc,err
|
||||
}
|
||||
|
||||
if fVal.Type().In(0).Kind() != reflect.Ptr || fVal.Type().In(1).String() != "error" {
|
||||
err := errors.New("call " + serviceMethod + " callback param function is error!")
|
||||
log.SError(err.Error())
|
||||
return err
|
||||
return emptyCancelRpc,err
|
||||
}
|
||||
|
||||
reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
|
||||
@@ -501,23 +513,19 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i
|
||||
}
|
||||
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
|
||||
log.SError("Call serviceMethod is error:", err.Error())
|
||||
return nil
|
||||
return emptyCancelRpc,nil
|
||||
}
|
||||
|
||||
if count > 1 {
|
||||
err := errors.New("cannot call more then 1 node")
|
||||
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
|
||||
log.SError(err.Error())
|
||||
return nil
|
||||
return emptyCancelRpc,nil
|
||||
}
|
||||
|
||||
//2.rpcClient调用
|
||||
//如果调用本结点服务
|
||||
pClient := pClientList[0]
|
||||
pClient.AsyncCall(handler.rpcHandler, serviceMethod, fVal, args, reply)
|
||||
|
||||
|
||||
return nil
|
||||
return pClientList[0].AsyncCall(timeout,handler.rpcHandler, serviceMethod, fVal, args, reply,false)
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) GetName() string {
|
||||
@@ -528,12 +536,29 @@ func (handler *RpcHandler) IsSingleCoroutine() bool {
|
||||
return handler.rpcHandler.IsSingleCoroutine()
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) CallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, reply interface{}) error {
|
||||
return handler.callRpc(timeout,0, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) CallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, reply interface{}) error{
|
||||
return handler.callRpc(timeout,nodeId, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) AsyncCallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error){
|
||||
return handler.asyncCallRpc(timeout,0, serviceMethod, args, callback)
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) AsyncCallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error){
|
||||
return handler.asyncCallRpc(timeout,nodeId, serviceMethod, args, callback)
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) AsyncCall(serviceMethod string, args interface{}, callback interface{}) error {
|
||||
return handler.asyncCallRpc(0, serviceMethod, args, callback)
|
||||
_,err := handler.asyncCallRpc(DefaultRpcTimeout,0, serviceMethod, args, callback)
|
||||
return err
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) Call(serviceMethod string, args interface{}, reply interface{}) error {
|
||||
return handler.callRpc(0, serviceMethod, args, reply)
|
||||
return handler.callRpc(DefaultRpcTimeout,0, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) Go(serviceMethod string, args interface{}) error {
|
||||
@@ -541,11 +566,13 @@ func (handler *RpcHandler) Go(serviceMethod string, args interface{}) error {
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) AsyncCallNode(nodeId int, serviceMethod string, args interface{}, callback interface{}) error {
|
||||
return handler.asyncCallRpc(nodeId, serviceMethod, args, callback)
|
||||
_,err:= handler.asyncCallRpc(DefaultRpcTimeout,nodeId, serviceMethod, args, callback)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) CallNode(nodeId int, serviceMethod string, args interface{}, reply interface{}) error {
|
||||
return handler.callRpc(nodeId, serviceMethod, args, reply)
|
||||
return handler.callRpc(DefaultRpcTimeout,nodeId, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) GoNode(nodeId int, serviceMethod string, args interface{}) error {
|
||||
@@ -573,7 +600,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i
|
||||
//如果调用本结点服务
|
||||
for i := 0; i < count; i++ {
|
||||
//跨node调用
|
||||
pCall := handler.pClientList[i].RawGo(handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
|
||||
pCall := handler.pClientList[i].RawGo(DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
|
||||
if pCall.Err != nil {
|
||||
err = pCall.Err
|
||||
}
|
||||
|
||||
89
rpc/rpctimer.go
Normal file
89
rpc/rpctimer.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"time"
|
||||
)
|
||||
|
||||
type CallTimer struct {
|
||||
SeqId uint64
|
||||
FireTime int64
|
||||
}
|
||||
|
||||
type CallTimerHeap struct {
|
||||
callTimer []CallTimer
|
||||
mapSeqIndex map[uint64]int
|
||||
}
|
||||
|
||||
func (h *CallTimerHeap) Init() {
|
||||
h.mapSeqIndex = make(map[uint64]int, 4096)
|
||||
h.callTimer = make([]CallTimer, 0, 4096)
|
||||
}
|
||||
|
||||
func (h *CallTimerHeap) Len() int {
|
||||
return len(h.callTimer)
|
||||
}
|
||||
|
||||
func (h *CallTimerHeap) Less(i, j int) bool {
|
||||
return h.callTimer[i].FireTime < h.callTimer[j].FireTime
|
||||
}
|
||||
|
||||
func (h *CallTimerHeap) Swap(i, j int) {
|
||||
h.callTimer[i], h.callTimer[j] = h.callTimer[j], h.callTimer[i]
|
||||
h.mapSeqIndex[h.callTimer[i].SeqId] = i
|
||||
h.mapSeqIndex[h.callTimer[j].SeqId] = j
|
||||
}
|
||||
|
||||
func (h *CallTimerHeap) Push(t any) {
|
||||
callTimer := t.(CallTimer)
|
||||
h.mapSeqIndex[callTimer.SeqId] = len(h.callTimer)
|
||||
h.callTimer = append(h.callTimer, callTimer)
|
||||
}
|
||||
|
||||
func (h *CallTimerHeap) Pop() any {
|
||||
l := len(h.callTimer)
|
||||
seqId := h.callTimer[l-1].SeqId
|
||||
|
||||
h.callTimer = h.callTimer[:l-1]
|
||||
delete(h.mapSeqIndex, seqId)
|
||||
return seqId
|
||||
}
|
||||
|
||||
func (h *CallTimerHeap) Cancel(seq uint64) bool {
|
||||
index, ok := h.mapSeqIndex[seq]
|
||||
if ok == false {
|
||||
return false
|
||||
}
|
||||
|
||||
heap.Remove(h, index)
|
||||
return true
|
||||
}
|
||||
|
||||
func (h *CallTimerHeap) AddTimer(seqId uint64,d time.Duration){
|
||||
heap.Push(h, CallTimer{
|
||||
SeqId: seqId,
|
||||
FireTime: time.Now().Add(d).UnixNano(),
|
||||
})
|
||||
}
|
||||
|
||||
func (h *CallTimerHeap) PopTimeout() uint64 {
|
||||
if h.Len() == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
nextFireTime := h.callTimer[0].FireTime
|
||||
if nextFireTime > time.Now().UnixNano() {
|
||||
return 0
|
||||
}
|
||||
|
||||
return heap.Pop(h).(uint64)
|
||||
}
|
||||
|
||||
func (h *CallTimerHeap) PopFirst() uint64 {
|
||||
if h.Len() == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
return heap.Pop(h).(uint64)
|
||||
}
|
||||
|
||||
@@ -27,6 +27,8 @@ type Server struct {
|
||||
functions map[interface{}]interface{}
|
||||
rpcHandleFinder RpcHandleFinder
|
||||
rpcServer *network.TCPServer
|
||||
|
||||
compressBytesLen int
|
||||
}
|
||||
|
||||
type RpcAgent struct {
|
||||
@@ -64,7 +66,7 @@ func (server *Server) Init(rpcHandleFinder RpcHandleFinder) {
|
||||
|
||||
const Default_ReadWriteDeadline = 15*time.Second
|
||||
|
||||
func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
|
||||
func (server *Server) Start(listenAddr string, maxRpcParamLen uint32,compressBytesLen int) {
|
||||
splitAddr := strings.Split(listenAddr, ":")
|
||||
if len(splitAddr) != 2 {
|
||||
log.SFatal("listen addr is error :", listenAddr)
|
||||
@@ -72,6 +74,7 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
|
||||
|
||||
server.rpcServer.Addr = ":" + splitAddr[1]
|
||||
server.rpcServer.MinMsgLen = 2
|
||||
server.compressBytesLen = compressBytesLen
|
||||
if maxRpcParamLen > 0 {
|
||||
server.rpcServer.MaxMsgLen = maxRpcParamLen
|
||||
} else {
|
||||
@@ -112,7 +115,26 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
|
||||
return
|
||||
}
|
||||
|
||||
errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes)
|
||||
var compressBuff[]byte
|
||||
bCompress := uint8(0)
|
||||
if agent.rpcServer.compressBytesLen >0 && len(bytes) >= agent.rpcServer.compressBytesLen {
|
||||
var cErr error
|
||||
|
||||
compressBuff,cErr = compressor.CompressBlock(bytes)
|
||||
if cErr != nil {
|
||||
log.SError("service method ", serviceMethod, " CompressBlock error:", cErr.Error())
|
||||
return
|
||||
}
|
||||
if len(compressBuff) < len(bytes) {
|
||||
bytes = compressBuff
|
||||
bCompress = 1<<7
|
||||
}
|
||||
}
|
||||
|
||||
errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes)
|
||||
if cap(compressBuff) >0 {
|
||||
compressor.CompressBufferCollection(compressBuff)
|
||||
}
|
||||
if errM != nil {
|
||||
log.SError("Rpc ", serviceMethod, " return is error:", errM.Error())
|
||||
}
|
||||
@@ -127,16 +149,34 @@ func (agent *RpcAgent) Run() {
|
||||
break
|
||||
}
|
||||
|
||||
processor := GetProcessor(data[0])
|
||||
bCompress := (data[0]>>7) > 0
|
||||
processor := GetProcessor(data[0]&0x7f)
|
||||
if processor == nil {
|
||||
agent.conn.ReleaseReadMsg(data)
|
||||
log.SError("remote rpc ", agent.conn.RemoteAddr(), " cannot find processor:", data[0])
|
||||
log.SError("remote rpc ", agent.conn.RemoteAddr().String(), " cannot find processor:", data[0])
|
||||
return
|
||||
}
|
||||
|
||||
//解析head
|
||||
var compressBuff []byte
|
||||
byteData := data[1:]
|
||||
if bCompress == true {
|
||||
var unCompressErr error
|
||||
|
||||
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
|
||||
if unCompressErr!= nil {
|
||||
agent.conn.ReleaseReadMsg(data)
|
||||
log.SError("rpcClient ", agent.conn.RemoteAddr().String(), " ReadMsg head error:", unCompressErr.Error())
|
||||
return
|
||||
}
|
||||
byteData = compressBuff
|
||||
}
|
||||
|
||||
req := MakeRpcRequest(processor, 0, 0, "", false, nil)
|
||||
err = processor.Unmarshal(data[1:], req.RpcRequestData)
|
||||
err = processor.Unmarshal(byteData, req.RpcRequestData)
|
||||
if cap(compressBuff) > 0 {
|
||||
compressor.UnCompressBufferCollection(compressBuff)
|
||||
}
|
||||
agent.conn.ReleaseReadMsg(data)
|
||||
if err != nil {
|
||||
log.SError("rpc Unmarshal request is error:", err.Error())
|
||||
@@ -248,11 +288,11 @@ func (server *Server) myselfRpcHandlerGo(client *Client,handlerName string, serv
|
||||
return rpcHandler.CallMethod(client,serviceMethod, args,callBack, reply)
|
||||
}
|
||||
|
||||
|
||||
|
||||
func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call {
|
||||
func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call {
|
||||
pCall := MakeCall()
|
||||
pCall.Seq = client.generateSeq()
|
||||
pCall.TimeOut = timeout
|
||||
pCall.ServiceMethod = serviceMethod
|
||||
|
||||
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
|
||||
if rpcHandler == nil {
|
||||
@@ -343,12 +383,12 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
|
||||
return pCall
|
||||
}
|
||||
|
||||
func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) error {
|
||||
func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value,cancelable bool) (CancelRpc,error) {
|
||||
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
|
||||
if rpcHandler == nil {
|
||||
err := errors.New("service method " + serviceMethod + " not config!")
|
||||
log.SError(err.Error())
|
||||
return err
|
||||
return emptyCancelRpc,err
|
||||
}
|
||||
|
||||
_, processor := GetProcessorType(args)
|
||||
@@ -356,40 +396,44 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
|
||||
if err != nil {
|
||||
errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
|
||||
log.SError(errM.Error())
|
||||
return errM
|
||||
return emptyCancelRpc,errM
|
||||
}
|
||||
|
||||
req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil)
|
||||
req.inParam = iParam
|
||||
req.localReply = reply
|
||||
|
||||
cancelRpc := emptyCancelRpc
|
||||
var callSeq uint64
|
||||
if noReply == false {
|
||||
callSeq := client.generateSeq()
|
||||
callSeq = client.generateSeq()
|
||||
pCall := MakeCall()
|
||||
pCall.Seq = callSeq
|
||||
pCall.rpcHandler = callerRpcHandler
|
||||
pCall.callback = &callback
|
||||
pCall.Reply = reply
|
||||
pCall.ServiceMethod = serviceMethod
|
||||
pCall.TimeOut = timeout
|
||||
client.AddPending(pCall)
|
||||
rpcCancel := RpcCancel{CallSeq: callSeq,Cli: client}
|
||||
cancelRpc = rpcCancel.CancelRpc
|
||||
|
||||
req.requestHandle = func(Returns interface{}, Err RpcError) {
|
||||
v := client.RemovePending(callSeq)
|
||||
if v == nil {
|
||||
log.SError("rpcClient cannot find seq ", pCall.Seq, " in pending")
|
||||
//ReleaseCall(pCall)
|
||||
ReleaseRpcRequest(req)
|
||||
return
|
||||
}
|
||||
if len(Err) == 0 {
|
||||
pCall.Err = nil
|
||||
v.Err = nil
|
||||
} else {
|
||||
pCall.Err = Err
|
||||
v.Err = Err
|
||||
}
|
||||
|
||||
if Returns != nil {
|
||||
pCall.Reply = Returns
|
||||
v.Reply = Returns
|
||||
}
|
||||
pCall.rpcHandler.PushRpcResponse(pCall)
|
||||
v.rpcHandler.PushRpcResponse(v)
|
||||
ReleaseRpcRequest(req)
|
||||
}
|
||||
}
|
||||
@@ -397,8 +441,11 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
|
||||
err = rpcHandler.PushRpcRequest(req)
|
||||
if err != nil {
|
||||
ReleaseRpcRequest(req)
|
||||
return err
|
||||
if callSeq > 0 {
|
||||
client.RemovePending(callSeq)
|
||||
}
|
||||
return emptyCancelRpc,err
|
||||
}
|
||||
|
||||
return nil
|
||||
return cancelRpc,nil
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package mysqlmondule
|
||||
package mysqlmodule
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package mysqlmondule
|
||||
package mysqlmodule
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
@@ -213,7 +213,7 @@ func (cs *CustomerSubscriber) publishToCustomer(topicData []TopicData) bool {
|
||||
}
|
||||
|
||||
//推送数据
|
||||
err := cs.CallNode(cs.fromNodeId, cs.callBackRpcMethod, &dbQueuePublishReq, &dbQueuePushRes)
|
||||
err := cs.CallNodeWithTimeout(4*time.Minute,cs.fromNodeId, cs.callBackRpcMethod, &dbQueuePublishReq, &dbQueuePushRes)
|
||||
if err != nil {
|
||||
time.Sleep(time.Second * 1)
|
||||
continue
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"reflect"
|
||||
"runtime"
|
||||
"time"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// ITimer
|
||||
@@ -29,7 +30,7 @@ type OnAddTimer func(timer ITimer)
|
||||
// Timer
|
||||
type Timer struct {
|
||||
Id uint64
|
||||
cancelled bool //是否关闭
|
||||
cancelled int32 //是否关闭
|
||||
C chan ITimer //定时器管道
|
||||
interval time.Duration // 时间间隔(用于循环定时器)
|
||||
fireTime time.Time // 触发时间
|
||||
@@ -171,12 +172,12 @@ func (t *Timer) GetInterval() time.Duration {
|
||||
}
|
||||
|
||||
func (t *Timer) Cancel() {
|
||||
t.cancelled = true
|
||||
atomic.StoreInt32(&t.cancelled,1)
|
||||
}
|
||||
|
||||
// 判断定时器是否已经取消
|
||||
func (t *Timer) IsActive() bool {
|
||||
return !t.cancelled
|
||||
return atomic.LoadInt32(&t.cancelled) == 0
|
||||
}
|
||||
|
||||
func (t *Timer) GetName() string {
|
||||
|
||||
Reference in New Issue
Block a user