Compare commits

...

9 Commits

Author SHA1 Message Date
duanhf2012
dfb6959843 优化消息队列默认超时推送时间 2023-07-28 17:43:39 +08:00
duanhf2012
dd4aaf9c57 优化rpc超时 2023-07-28 17:38:52 +08:00
duanhf2012
6ef98a2104 优化新增Rpc压缩与自定义超时功能 2023-07-25 17:36:47 +08:00
duanhf2012
1890b300ee 优化新增rpc压缩功能 2023-07-25 15:13:58 +08:00
duanhf2012
6fea2226e1 优化协议解析器 2023-07-21 17:03:15 +08:00
duanhf2012
ec1c2b4517 node支持rpc压缩 2023-07-21 15:28:52 +08:00
duanhf2012
4b84d9a1d5 新增rpc自定义超时 2023-07-13 16:42:23 +08:00
duanhf2012
85a8ec58e5 rpc加入压缩功能 2023-07-10 14:22:23 +08:00
duanhf2012
962016d476 优化rpc 2023-07-07 13:50:57 +08:00
14 changed files with 504 additions and 142 deletions

View File

@@ -26,6 +26,7 @@ type NodeInfo struct {
Private bool Private bool
ListenAddr string ListenAddr string
MaxRpcParamLen uint32 //最大Rpc参数长度 MaxRpcParamLen uint32 //最大Rpc参数长度
CompressBytesLen int //超过字节进行压缩的长度
ServiceList []string //所有的有序服务列表 ServiceList []string //所有的有序服务列表
PublicServiceList []string //对外公开的服务列表 PublicServiceList []string //对外公开的服务列表
DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选 DiscoveryService []string //筛选发现的服务,如果不配置,不进行筛选
@@ -73,7 +74,7 @@ func SetServiceDiscovery(serviceDiscovery IServiceDiscovery) {
} }
func (cls *Cluster) Start() { func (cls *Cluster) Start() {
cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen) cls.rpcServer.Start(cls.localNodeInfo.ListenAddr, cls.localNodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen)
} }
func (cls *Cluster) Stop() { func (cls *Cluster) Stop() {
@@ -195,7 +196,7 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
rpcInfo := NodeRpcInfo{} rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = *nodeInfo rpcInfo.nodeInfo = *nodeInfo
rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.triggerRpcEvent) rpcInfo.client =rpc.NewRClient(nodeInfo.NodeId, nodeInfo.ListenAddr, nodeInfo.MaxRpcParamLen,cls.localNodeInfo.CompressBytesLen,cls.triggerRpcEvent)
cls.mapRpc[nodeInfo.NodeId] = rpcInfo cls.mapRpc[nodeInfo.NodeId] = rpcInfo
} }

View File

@@ -5,6 +5,8 @@ import (
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"time"
"github.com/duanhf2012/origin/util/timer"
) )
const DynamicDiscoveryMasterName = "DiscoveryMaster" const DynamicDiscoveryMasterName = "DiscoveryMaster"
@@ -341,6 +343,10 @@ func (dc *DynamicDiscoveryClient) isDiscoverNode(nodeId int) bool {
} }
func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) { func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
dc.regServiceDiscover(nodeId)
}
func (dc *DynamicDiscoveryClient) regServiceDiscover(nodeId int){
nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId) nodeInfo := cluster.GetMasterDiscoveryNodeInfo(nodeId)
if nodeInfo == nil { if nodeInfo == nil {
return return
@@ -364,6 +370,10 @@ func (dc *DynamicDiscoveryClient) OnNodeConnected(nodeId int) {
err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) { err := dc.AsyncCallNode(nodeId, RegServiceDiscover, &req, func(res *rpc.Empty, err error) {
if err != nil { if err != nil {
log.SError("call ", RegServiceDiscover, " is fail :", err.Error()) log.SError("call ", RegServiceDiscover, " is fail :", err.Error())
dc.AfterFunc(time.Second*3, func(timer *timer.Timer) {
dc.regServiceDiscover(nodeId)
})
return return
} }
}) })

View File

@@ -68,6 +68,11 @@ func (pbProcessor *PBProcessor) MsgRoute(clientId uint64, msg interface{}) error
// must goroutine safe // must goroutine safe
func (pbProcessor *PBProcessor) Unmarshal(clientId uint64, data []byte) (interface{}, error) { func (pbProcessor *PBProcessor) Unmarshal(clientId uint64, data []byte) (interface{}, error) {
defer pbProcessor.ReleaseByteSlice(data) defer pbProcessor.ReleaseByteSlice(data)
return pbProcessor.UnmarshalWithOutRelease(clientId, data)
}
// unmarshal but not release data
func (pbProcessor *PBProcessor) UnmarshalWithOutRelease(clientId uint64, data []byte) (interface{}, error) {
var msgType uint16 var msgType uint16
if pbProcessor.LittleEndian == true { if pbProcessor.LittleEndian == true {
msgType = binary.LittleEndian.Uint16(data[:2]) msgType = binary.LittleEndian.Uint16(data[:2])

View File

@@ -16,7 +16,7 @@ type memAreaPool struct {
pool []sync.Pool pool []sync.Pool
} }
var memAreaPoolList = [3]*memAreaPool{&memAreaPool{minAreaValue: 1, maxAreaValue: 4096, growthValue: 512}, &memAreaPool{minAreaValue: 4097, maxAreaValue: 40960, growthValue: 4096}, &memAreaPool{minAreaValue: 40961, maxAreaValue: 417792, growthValue: 16384}} var memAreaPoolList = [4]*memAreaPool{&memAreaPool{minAreaValue: 1, maxAreaValue: 4096, growthValue: 512}, &memAreaPool{minAreaValue: 4097, maxAreaValue: 40960, growthValue: 4096}, &memAreaPool{minAreaValue: 40961, maxAreaValue: 417792, growthValue: 16384}, &memAreaPool{minAreaValue: 417793, maxAreaValue: 1925120, growthValue: 65536}}
func init() { func init() {
for i := 0; i < len(memAreaPoolList); i++ { for i := 0; i < len(memAreaPoolList); i++ {

View File

@@ -1,7 +1,6 @@
package rpc package rpc
import ( import (
"container/list"
"errors" "errors"
"github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/network"
"reflect" "reflect"
@@ -9,6 +8,7 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/duanhf2012/origin/log"
) )
const( const(
@@ -20,7 +20,7 @@ const(
DefaultConnectInterval = 2*time.Second DefaultConnectInterval = 2*time.Second
DefaultCheckRpcCallTimeoutInterval = 5*time.Second DefaultCheckRpcCallTimeoutInterval = 1*time.Second
DefaultRpcTimeout = 15*time.Second DefaultRpcTimeout = 15*time.Second
) )
@@ -30,9 +30,9 @@ type IRealClient interface {
SetConn(conn *network.TCPConn) SetConn(conn *network.TCPConn)
Close(waitDone bool) Close(waitDone bool)
AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error)
Go(rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call Go(timeout time.Duration,rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
IsConnected() bool IsConnected() bool
Run() Run()
@@ -44,11 +44,11 @@ type Client struct {
nodeId int nodeId int
pendingLock sync.RWMutex pendingLock sync.RWMutex
startSeq uint64 startSeq uint64
pending map[uint64]*list.Element pending map[uint64]*Call
pendingTimer *list.List
callRpcTimeout time.Duration callRpcTimeout time.Duration
maxCheckCallRpcCount int maxCheckCallRpcCount int
callTimerHeap CallTimerHeap
IRealClient IRealClient
} }
@@ -59,7 +59,6 @@ func (client *Client) NewClientAgent(conn *network.TCPConn) network.Agent {
} }
func (bc *Client) makeCallFail(call *Call) { func (bc *Client) makeCallFail(call *Call) {
bc.removePending(call.Seq)
if call.callback != nil && call.callback.IsValid() { if call.callback != nil && call.callback.IsValid() {
call.rpcHandler.PushRpcResponse(call) call.rpcHandler.PushRpcResponse(call)
} else { } else {
@@ -70,54 +69,52 @@ func (bc *Client) makeCallFail(call *Call) {
func (bc *Client) checkRpcCallTimeout() { func (bc *Client) checkRpcCallTimeout() {
for{ for{
time.Sleep(DefaultCheckRpcCallTimeoutInterval) time.Sleep(DefaultCheckRpcCallTimeoutInterval)
now := time.Now()
for i := 0; i < bc.maxCheckCallRpcCount; i++ { for i := 0; i < bc.maxCheckCallRpcCount; i++ {
bc.pendingLock.Lock() bc.pendingLock.Lock()
if bc.pendingTimer == nil {
callSeq := bc.callTimerHeap.PopTimeout()
if callSeq == 0 {
bc.pendingLock.Unlock() bc.pendingLock.Unlock()
break break
} }
pElem := bc.pendingTimer.Front() pCall := bc.pending[callSeq]
if pElem == nil { if pCall == nil {
bc.pendingLock.Unlock()
break
}
pCall := pElem.Value.(*Call)
if now.Sub(pCall.callTime) > bc.callRpcTimeout {
strTimeout := strconv.FormatInt(int64(bc.callRpcTimeout/time.Second), 10)
pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds")
bc.makeCallFail(pCall)
bc.pendingLock.Unlock() bc.pendingLock.Unlock()
log.SError("callSeq ",callSeq," is not find")
continue continue
} }
delete(bc.pending,callSeq)
strTimeout := strconv.FormatInt(int64(pCall.TimeOut.Seconds()), 10)
pCall.Err = errors.New("RPC call takes more than " + strTimeout + " seconds,method is "+pCall.ServiceMethod)
log.SError(pCall.Err.Error())
bc.makeCallFail(pCall)
bc.pendingLock.Unlock() bc.pendingLock.Unlock()
break continue
} }
} }
} }
func (client *Client) InitPending() { func (client *Client) InitPending() {
client.pendingLock.Lock() client.pendingLock.Lock()
if client.pending != nil { client.callTimerHeap.Init()
for _, v := range client.pending { client.pending = make(map[uint64]*Call,4096)
v.Value.(*Call).Err = errors.New("node is disconnect")
v.Value.(*Call).done <- v.Value.(*Call)
}
}
client.pending = make(map[uint64]*list.Element, 4096)
client.pendingTimer = list.New()
client.pendingLock.Unlock() client.pendingLock.Unlock()
} }
func (bc *Client) AddPending(call *Call) { func (bc *Client) AddPending(call *Call) {
bc.pendingLock.Lock() bc.pendingLock.Lock()
call.callTime = time.Now()
elemTimer := bc.pendingTimer.PushBack(call) if call.Seq == 0 {
bc.pending[call.Seq] = elemTimer //如果下面发送失败,将会一一直存在这里 bc.pendingLock.Unlock()
log.SStack("call is error.")
return
}
bc.pending[call.Seq] = call
bc.callTimerHeap.AddTimer(call.Seq,call.TimeOut)
bc.pendingLock.Unlock() bc.pendingLock.Unlock()
} }
@@ -136,30 +133,45 @@ func (bc *Client) removePending(seq uint64) *Call {
if ok == false { if ok == false {
return nil return nil
} }
call := v.Value.(*Call)
bc.pendingTimer.Remove(v) bc.callTimerHeap.Cancel(seq)
delete(bc.pending, seq) delete(bc.pending, seq)
return call return v
} }
func (bc *Client) FindPending(seq uint64) *Call { func (bc *Client) FindPending(seq uint64) (pCall *Call) {
if seq == 0 { if seq == 0 {
return nil return nil
} }
bc.pendingLock.Lock() bc.pendingLock.Lock()
v, ok := bc.pending[seq] pCall = bc.pending[seq]
if ok == false {
bc.pendingLock.Unlock()
return nil
}
pCall := v.Value.(*Call)
bc.pendingLock.Unlock() bc.pendingLock.Unlock()
return pCall return pCall
} }
func (bc *Client) cleanPending(){
bc.pendingLock.Lock()
for {
callSeq := bc.callTimerHeap.PopFirst()
if callSeq == 0 {
break
}
pCall := bc.pending[callSeq]
if pCall == nil {
log.SError("callSeq ",callSeq," is not find")
continue
}
delete(bc.pending,callSeq)
pCall.Err = errors.New("nodeid is disconnect ")
bc.makeCallFail(pCall)
}
bc.pendingLock.Unlock()
}
func (bc *Client) generateSeq() uint64 { func (bc *Client) generateSeq() uint64 {
return atomic.AddUint64(&bc.startSeq, 1) return atomic.AddUint64(&bc.startSeq, 1)
} }

102
rpc/compressor.go Normal file
View File

@@ -0,0 +1,102 @@
package rpc
import (
"runtime"
"errors"
"github.com/pierrec/lz4/v4"
"fmt"
"github.com/duanhf2012/origin/network"
)
var memPool network.INetMempool = network.NewMemAreaPool()
type ICompressor interface {
CompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存传入nil时内部申请
UncompressBlock(src []byte) ([]byte, error) //dst如果有预申请使用dst内存传入nil时内部申请
CompressBufferCollection(buffer []byte) //压缩的Buffer内存回收
UnCompressBufferCollection(buffer []byte) //解压缩的Buffer内存回收
}
var compressor ICompressor
func init(){
SetCompressor(&Lz4Compressor{})
}
func SetCompressor(cp ICompressor){
compressor = cp
}
type Lz4Compressor struct {
}
func (lc *Lz4Compressor) CompressBlock(src []byte) (dest []byte, err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
err = errors.New("core dump info[" + errString + "]\n" + string(buf[:l]))
}
}()
var c lz4.Compressor
var cnt int
dest = memPool.MakeByteSlice(lz4.CompressBlockBound(len(src))+1)
cnt, err = c.CompressBlock(src, dest[1:])
if err != nil {
memPool.ReleaseByteSlice(dest)
return nil,err
}
ratio := len(src) / cnt
if len(src) % cnt > 0 {
ratio += 1
}
if ratio > 255 {
memPool.ReleaseByteSlice(dest)
return nil,fmt.Errorf("Impermissible errors")
}
dest[0] = uint8(ratio)
dest = dest[:cnt+1]
return
}
func (lc *Lz4Compressor) UncompressBlock(src []byte) (dest []byte, err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
err = errors.New("core dump info[" + errString + "]\n" + string(buf[:l]))
}
}()
radio := uint8(src[0])
if radio == 0 {
return nil,fmt.Errorf("Impermissible errors")
}
dest = memPool.MakeByteSlice(len(src)*int(radio))
cnt, err := lz4.UncompressBlock(src[1:], dest)
if err != nil {
memPool.ReleaseByteSlice(dest)
return nil,err
}
return dest[:cnt],nil
}
func (lc *Lz4Compressor) compressBlockBound(n int) int{
return lz4.CompressBlockBound(n)
}
func (lc *Lz4Compressor) CompressBufferCollection(buffer []byte){
memPool.ReleaseByteSlice(buffer)
}
func (lc *Lz4Compressor) UnCompressBufferCollection(buffer []byte) {
memPool.ReleaseByteSlice(buffer)
}

View File

@@ -7,6 +7,7 @@ import (
"reflect" "reflect"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time"
) )
//本结点的Client //本结点的Client
@@ -36,7 +37,7 @@ func (lc *LClient) SetConn(conn *network.TCPConn){
func (lc *LClient) Close(waitDone bool){ func (lc *LClient) Close(waitDone bool){
} }
func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { func (lc *LClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
pLocalRpcServer := rpcHandler.GetRpcServer()() pLocalRpcServer := rpcHandler.GetRpcServer()()
//判断是否是同一服务 //判断是否是同一服务
findIndex := strings.Index(serviceMethod, ".") findIndex := strings.Index(serviceMethod, ".")
@@ -65,19 +66,20 @@ func (lc *LClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
return pLocalRpcServer.selfNodeRpcHandlerGo(nil, lc.selfClient, noReply, serviceName, 0, serviceMethod, args, reply, nil) return pLocalRpcServer.selfNodeRpcHandlerGo(timeout,nil, lc.selfClient, noReply, serviceName, 0, serviceMethod, args, reply, nil)
} }
func (rc *LClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call { func (rc *LClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call {
pLocalRpcServer := rpcHandler.GetRpcServer()() pLocalRpcServer := rpcHandler.GetRpcServer()()
call := MakeCall()
call.ServiceMethod = serviceName
call.Reply = reply
//服务自我调用 //服务自我调用
if serviceName == rpcHandler.GetName() { if serviceName == rpcHandler.GetName() {
call := MakeCall()
call.ServiceMethod = serviceName
call.Reply = reply
call.TimeOut = timeout
err := pLocalRpcServer.myselfRpcHandlerGo(rc.selfClient,serviceName, serviceName, rawArgs, requestHandlerNull,nil) err := pLocalRpcServer.myselfRpcHandlerGo(rc.selfClient,serviceName, serviceName, rawArgs, requestHandlerNull,nil)
call.Err = err call.Err = err
call.done <- call call.done <- call
@@ -86,11 +88,11 @@ func (rc *LClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
return pLocalRpcServer.selfNodeRpcHandlerGo(processor,rc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs) return pLocalRpcServer.selfNodeRpcHandlerGo(timeout,processor,rc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
} }
func (lc *LClient) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{}) error { func (lc *LClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, reply interface{},cancelable bool) (CancelRpc,error) {
pLocalRpcServer := rpcHandler.GetRpcServer()() pLocalRpcServer := rpcHandler.GetRpcServer()()
//判断是否是同一服务 //判断是否是同一服务
@@ -99,22 +101,22 @@ func (lc *LClient) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callb
err := errors.New("Call serviceMethod " + serviceMethod + " is error!") err := errors.New("Call serviceMethod " + serviceMethod + " is error!")
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError(err.Error()) log.SError(err.Error())
return nil return emptyCancelRpc,nil
} }
serviceName := serviceMethod[:findIndex] serviceName := serviceMethod[:findIndex]
//调用自己rpcHandler处理器 //调用自己rpcHandler处理器
if serviceName == rpcHandler.GetName() { //自己服务调用 if serviceName == rpcHandler.GetName() { //自己服务调用
return pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args,callback ,reply) return emptyCancelRpc,pLocalRpcServer.myselfRpcHandlerGo(lc.selfClient,serviceName, serviceMethod, args,callback ,reply)
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback) calcelRpc,err := pLocalRpcServer.selfNodeRpcHandlerAsyncGo(timeout,lc.selfClient, rpcHandler, false, serviceName, serviceMethod, args, reply, callback,cancelable)
if err != nil { if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) callback.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
} }
return nil return calcelRpc,nil
} }
func NewLClient(nodeId int) *Client{ func NewLClient(nodeId int) *Client{

View File

@@ -9,10 +9,12 @@ import (
"reflect" "reflect"
"runtime" "runtime"
"sync/atomic" "sync/atomic"
"time"
) )
//跨结点连接的Client //跨结点连接的Client
type RClient struct { type RClient struct {
compressBytesLen int
selfClient *Client selfClient *Client
network.TCPClient network.TCPClient
conn *network.TCPConn conn *network.TCPConn
@@ -40,7 +42,7 @@ func (rc *RClient) SetConn(conn *network.TCPConn){
rc.Unlock() rc.Unlock()
} }
func (rc *RClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call { func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call {
_, processor := GetProcessorType(args) _, processor := GetProcessorType(args)
InParam, err := processor.Marshal(args) InParam, err := processor.Marshal(args)
if err != nil { if err != nil {
@@ -50,15 +52,15 @@ func (rc *RClient) Go(rpcHandler IRpcHandler,noReply bool, serviceMethod string,
return call return call
} }
return rc.RawGo(rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply) return rc.RawGo(timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply)
} }
func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call {
func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call {
call := MakeCall() call := MakeCall()
call.ServiceMethod = serviceMethod call.ServiceMethod = serviceMethod
call.Reply = reply call.Reply = reply
call.Seq = rc.selfClient.generateSeq() call.Seq = rc.selfClient.generateSeq()
call.TimeOut = timeout
request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs) request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs)
bytes, err := processor.Marshal(request.RpcRequestData) bytes, err := processor.Marshal(request.RpcRequestData)
@@ -80,11 +82,31 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
return call return call
} }
var compressBuff[]byte
bCompress := uint8(0)
if rc.compressBytesLen > 0 && len(bytes) >= rc.compressBytesLen {
var cErr error
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
call.Seq = 0
log.SError(cErr.Error())
call.DoError(cErr)
return call
}
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
}
if noReply == false { if noReply == false {
rc.selfClient.AddPending(call) rc.selfClient.AddPending(call)
} }
err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes) err = conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes)
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if err != nil { if err != nil {
rc.selfClient.RemovePending(call.Seq) rc.selfClient.RemovePending(call.Seq)
@@ -98,20 +120,20 @@ func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
} }
func (rc *RClient) AsyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error { func (rc *RClient) AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) {
err := rc.asyncCall(rpcHandler, serviceMethod, callback, args, replyParam) cancelRpc,err := rc.asyncCall(timeout,rpcHandler, serviceMethod, callback, args, replyParam,cancelable)
if err != nil { if err != nil {
callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)}) callback.Call([]reflect.Value{reflect.ValueOf(replyParam), reflect.ValueOf(err)})
} }
return nil return cancelRpc,nil
} }
func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{}) error { func (rc *RClient) asyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error) {
processorType, processor := GetProcessorType(args) processorType, processor := GetProcessorType(args)
InParam, herr := processor.Marshal(args) InParam, herr := processor.Marshal(args)
if herr != nil { if herr != nil {
return herr return emptyCancelRpc,herr
} }
seq := rc.selfClient.generateSeq() seq := rc.selfClient.generateSeq()
@@ -119,12 +141,27 @@ func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callb
bytes, err := processor.Marshal(request.RpcRequestData) bytes, err := processor.Marshal(request.RpcRequestData)
ReleaseRpcRequest(request) ReleaseRpcRequest(request)
if err != nil { if err != nil {
return err return emptyCancelRpc,err
} }
conn := rc.GetConn() conn := rc.GetConn()
if conn == nil || conn.IsConnected()==false { if conn == nil || conn.IsConnected()==false {
return errors.New("Rpc server is disconnect,call " + serviceMethod) return emptyCancelRpc,errors.New("Rpc server is disconnect,call " + serviceMethod)
}
var compressBuff[]byte
bCompress := uint8(0)
if rc.compressBytesLen>0 &&len(bytes) >= rc.compressBytesLen {
var cErr error
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
return emptyCancelRpc,cErr
}
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
} }
call := MakeCall() call := MakeCall()
@@ -133,18 +170,28 @@ func (rc *RClient) asyncCall(rpcHandler IRpcHandler, serviceMethod string, callb
call.rpcHandler = rpcHandler call.rpcHandler = rpcHandler
call.ServiceMethod = serviceMethod call.ServiceMethod = serviceMethod
call.Seq = seq call.Seq = seq
call.TimeOut = timeout
rc.selfClient.AddPending(call) rc.selfClient.AddPending(call)
err = conn.WriteMsg([]byte{uint8(processorType)}, bytes) err = conn.WriteMsg([]byte{uint8(processorType)|bCompress}, bytes)
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if err != nil { if err != nil {
rc.selfClient.RemovePending(call.Seq) rc.selfClient.RemovePending(call.Seq)
ReleaseCall(call) ReleaseCall(call)
return err return emptyCancelRpc,err
} }
return nil if cancelable {
rpcCancel := RpcCancel{CallSeq:seq,Cli: rc.selfClient}
return rpcCancel.CancelRpc,nil
}
return emptyCancelRpc,nil
} }
func (rc *RClient) Run() { func (rc *RClient) Run() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@@ -163,7 +210,8 @@ func (rc *RClient) Run() {
return return
} }
processor := GetProcessor(bytes[0]) bCompress := (bytes[0]>>7) > 0
processor := GetProcessor(bytes[0]&0x7f)
if processor == nil { if processor == nil {
rc.conn.ReleaseReadMsg(bytes) rc.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error()) log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", err.Error())
@@ -174,14 +222,33 @@ func (rc *RClient) Run() {
response := RpcResponse{} response := RpcResponse{}
response.RpcResponseData = processor.MakeRpcResponse(0, "", nil) response.RpcResponseData = processor.MakeRpcResponse(0, "", nil)
err = processor.Unmarshal(bytes[1:], response.RpcResponseData) //解压缩
byteData := bytes[1:]
var compressBuff []byte
if bCompress == true {
var unCompressErr error
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
rc.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ", rc.Addr, " ReadMsg head error:", unCompressErr.Error())
return
}
byteData = compressBuff
}
err = processor.Unmarshal(byteData, response.RpcResponseData)
if cap(compressBuff) > 0 {
compressor.UnCompressBufferCollection(compressBuff)
}
rc.conn.ReleaseReadMsg(bytes) rc.conn.ReleaseReadMsg(bytes)
if err != nil { if err != nil {
processor.ReleaseRpcResponse(response.RpcResponseData) processor.ReleaseRpcResponse(response.RpcResponseData)
log.SError("rpcClient Unmarshal head error:", err.Error()) log.SError("rpcClient Unmarshal head error:", err.Error())
continue continue
} }
v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq()) v := rc.selfClient.RemovePending(response.RpcResponseData.GetSeq())
if v == nil { if v == nil {
log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending") log.SError("rpcClient cannot find seq ", response.RpcResponseData.GetSeq(), " in pending")
@@ -214,14 +281,14 @@ func (rc *RClient) OnClose() {
rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId()) rc.TriggerRpcConnEvent(false, rc.selfClient.GetClientId(), rc.selfClient.GetNodeId())
} }
func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEvent TriggerRpcConnEvent) *Client{ func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,compressBytesLen int,triggerRpcConnEvent TriggerRpcConnEvent) *Client{
client := &Client{} client := &Client{}
client.clientId = atomic.AddUint32(&clientSeq, 1) client.clientId = atomic.AddUint32(&clientSeq, 1)
client.nodeId = nodeId client.nodeId = nodeId
client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount client.maxCheckCallRpcCount = DefaultMaxCheckCallRpcCount
client.callRpcTimeout = DefaultRpcTimeout client.callRpcTimeout = DefaultRpcTimeout
c:= &RClient{} c:= &RClient{}
c.compressBytesLen = compressBytesLen
c.selfClient = client c.selfClient = client
c.Addr = addr c.Addr = addr
c.ConnectInterval = DefaultConnectInterval c.ConnectInterval = DefaultConnectInterval
@@ -251,18 +318,6 @@ func NewRClient(nodeId int, addr string, maxRpcParamLen uint32,triggerRpcConnEve
func (rc *RClient) Close(waitDone bool) { func (rc *RClient) Close(waitDone bool) {
rc.TCPClient.Close(waitDone) rc.TCPClient.Close(waitDone)
rc.selfClient.cleanPending()
rc.selfClient.pendingLock.Lock()
for {
pElem := rc.selfClient.pendingTimer.Front()
if pElem == nil {
break
}
pCall := pElem.Value.(*Call)
pCall.Err = errors.New("nodeid is disconnect ")
rc.selfClient.makeCallFail(pCall)
}
rc.selfClient.pendingLock.Unlock()
} }

View File

@@ -68,7 +68,16 @@ type Call struct {
connId int connId int
callback *reflect.Value callback *reflect.Value
rpcHandler IRpcHandler rpcHandler IRpcHandler
callTime time.Time TimeOut time.Duration
}
type RpcCancel struct {
Cli *Client
CallSeq uint64
}
func (rc *RpcCancel) CancelRpc(){
rc.Cli.RemovePending(rc.CallSeq)
} }
func (slf *RpcRequest) Clear() *RpcRequest{ func (slf *RpcRequest) Clear() *RpcRequest{
@@ -124,6 +133,8 @@ func (call *Call) Clear() *Call{
call.connId = 0 call.connId = 0
call.callback = nil call.callback = nil
call.rpcHandler = nil call.rpcHandler = nil
call.TimeOut = 0
return call return call
} }

View File

@@ -9,6 +9,7 @@ import (
"strings" "strings"
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
"time"
) )
const maxClusterNode int = 128 const maxClusterNode int = 128
@@ -75,6 +76,9 @@ type IDiscoveryServiceListener interface {
OnUnDiscoveryService(nodeId int, serviceName []string) OnUnDiscoveryService(nodeId int, serviceName []string)
} }
type CancelRpc func()
func emptyCancelRpc(){}
type IRpcHandler interface { type IRpcHandler interface {
IRpcHandlerChannel IRpcHandlerChannel
GetName() string GetName() string
@@ -83,11 +87,18 @@ type IRpcHandler interface {
HandlerRpcRequest(request *RpcRequest) HandlerRpcRequest(request *RpcRequest)
HandlerRpcResponseCB(call *Call) HandlerRpcResponseCB(call *Call)
CallMethod(client *Client,ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error CallMethod(client *Client,ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error
AsyncCall(serviceMethod string, args interface{}, callback interface{}) error
Call(serviceMethod string, args interface{}, reply interface{}) error Call(serviceMethod string, args interface{}, reply interface{}) error
Go(serviceMethod string, args interface{}) error
AsyncCallNode(nodeId int, serviceMethod string, args interface{}, callback interface{}) error
CallNode(nodeId int, serviceMethod string, args interface{}, reply interface{}) error CallNode(nodeId int, serviceMethod string, args interface{}, reply interface{}) error
AsyncCall(serviceMethod string, args interface{}, callback interface{}) error
AsyncCallNode(nodeId int, serviceMethod string, args interface{}, callback interface{}) error
CallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, reply interface{}) error
CallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, reply interface{}) error
AsyncCallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error)
AsyncCallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error)
Go(serviceMethod string, args interface{}) error
GoNode(nodeId int, serviceMethod string, args interface{}) error GoNode(nodeId int, serviceMethod string, args interface{}) error
RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs []byte) error RawGoNode(rpcProcessorType RpcProcessorType, nodeId int, rpcMethodId uint32, serviceName string, rawArgs []byte) error
CastGo(serviceMethod string, args interface{}) error CastGo(serviceMethod string, args interface{}) error
@@ -433,7 +444,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int
//2.rpcClient调用 //2.rpcClient调用
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
pCall := pClientList[i].Go(handler.rpcHandler,true, serviceMethod, args, nil) pCall := pClientList[i].Go(DefaultRpcTimeout,handler.rpcHandler,true, serviceMethod, args, nil)
if pCall.Err != nil { if pCall.Err != nil {
err = pCall.Err err = pCall.Err
} }
@@ -444,7 +455,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int
return err return err
} }
func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interface{}, reply interface{}) error { func (handler *RpcHandler) callRpc(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, reply interface{}) error {
var pClientList [maxClusterNode]*Client var pClientList [maxClusterNode]*Client
err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:]) err, count := handler.funcRpcClient(nodeId, serviceMethod, pClientList[:])
if err != nil { if err != nil {
@@ -460,7 +471,7 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf
} }
pClient := pClientList[0] pClient := pClientList[0]
pCall := pClient.Go(handler.rpcHandler,false, serviceMethod, args, reply) pCall := pClient.Go(timeout,handler.rpcHandler,false, serviceMethod, args, reply)
err = pCall.Done().Err err = pCall.Done().Err
pClient.RemovePending(pCall.Seq) pClient.RemovePending(pCall.Seq)
@@ -468,24 +479,24 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf
return err return err
} }
func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args interface{}, callback interface{}) error { func (handler *RpcHandler) asyncCallRpc(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error) {
fVal := reflect.ValueOf(callback) fVal := reflect.ValueOf(callback)
if fVal.Kind() != reflect.Func { if fVal.Kind() != reflect.Func {
err := errors.New("call " + serviceMethod + " input callback param is error!") err := errors.New("call " + serviceMethod + " input callback param is error!")
log.SError(err.Error()) log.SError(err.Error())
return err return emptyCancelRpc,err
} }
if fVal.Type().NumIn() != 2 { if fVal.Type().NumIn() != 2 {
err := errors.New("call " + serviceMethod + " callback param function is error!") err := errors.New("call " + serviceMethod + " callback param function is error!")
log.SError(err.Error()) log.SError(err.Error())
return err return emptyCancelRpc,err
} }
if fVal.Type().In(0).Kind() != reflect.Ptr || fVal.Type().In(1).String() != "error" { if fVal.Type().In(0).Kind() != reflect.Ptr || fVal.Type().In(1).String() != "error" {
err := errors.New("call " + serviceMethod + " callback param function is error!") err := errors.New("call " + serviceMethod + " callback param function is error!")
log.SError(err.Error()) log.SError(err.Error())
return err return emptyCancelRpc,err
} }
reply := reflect.New(fVal.Type().In(0).Elem()).Interface() reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
@@ -501,23 +512,19 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i
} }
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError("Call serviceMethod is error:", err.Error()) log.SError("Call serviceMethod is error:", err.Error())
return nil return emptyCancelRpc,nil
} }
if count > 1 { if count > 1 {
err := errors.New("cannot call more then 1 node") err := errors.New("cannot call more then 1 node")
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
log.SError(err.Error()) log.SError(err.Error())
return nil return emptyCancelRpc,nil
} }
//2.rpcClient调用 //2.rpcClient调用
//如果调用本结点服务 //如果调用本结点服务
pClient := pClientList[0] return pClientList[0].AsyncCall(timeout,handler.rpcHandler, serviceMethod, fVal, args, reply,false)
pClient.AsyncCall(handler.rpcHandler, serviceMethod, fVal, args, reply)
return nil
} }
func (handler *RpcHandler) GetName() string { func (handler *RpcHandler) GetName() string {
@@ -528,12 +535,29 @@ func (handler *RpcHandler) IsSingleCoroutine() bool {
return handler.rpcHandler.IsSingleCoroutine() return handler.rpcHandler.IsSingleCoroutine()
} }
func (handler *RpcHandler) CallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, reply interface{}) error {
return handler.callRpc(timeout,0, serviceMethod, args, reply)
}
func (handler *RpcHandler) CallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, reply interface{}) error{
return handler.callRpc(timeout,nodeId, serviceMethod, args, reply)
}
func (handler *RpcHandler) AsyncCallWithTimeout(timeout time.Duration,serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error){
return handler.asyncCallRpc(timeout,0, serviceMethod, args, callback)
}
func (handler *RpcHandler) AsyncCallNodeWithTimeout(timeout time.Duration,nodeId int, serviceMethod string, args interface{}, callback interface{}) (CancelRpc,error){
return handler.asyncCallRpc(timeout,nodeId, serviceMethod, args, callback)
}
func (handler *RpcHandler) AsyncCall(serviceMethod string, args interface{}, callback interface{}) error { func (handler *RpcHandler) AsyncCall(serviceMethod string, args interface{}, callback interface{}) error {
return handler.asyncCallRpc(0, serviceMethod, args, callback) _,err := handler.asyncCallRpc(DefaultRpcTimeout,0, serviceMethod, args, callback)
return err
} }
func (handler *RpcHandler) Call(serviceMethod string, args interface{}, reply interface{}) error { func (handler *RpcHandler) Call(serviceMethod string, args interface{}, reply interface{}) error {
return handler.callRpc(0, serviceMethod, args, reply) return handler.callRpc(DefaultRpcTimeout,0, serviceMethod, args, reply)
} }
func (handler *RpcHandler) Go(serviceMethod string, args interface{}) error { func (handler *RpcHandler) Go(serviceMethod string, args interface{}) error {
@@ -541,11 +565,13 @@ func (handler *RpcHandler) Go(serviceMethod string, args interface{}) error {
} }
func (handler *RpcHandler) AsyncCallNode(nodeId int, serviceMethod string, args interface{}, callback interface{}) error { func (handler *RpcHandler) AsyncCallNode(nodeId int, serviceMethod string, args interface{}, callback interface{}) error {
return handler.asyncCallRpc(nodeId, serviceMethod, args, callback) _,err:= handler.asyncCallRpc(DefaultRpcTimeout,nodeId, serviceMethod, args, callback)
return err
} }
func (handler *RpcHandler) CallNode(nodeId int, serviceMethod string, args interface{}, reply interface{}) error { func (handler *RpcHandler) CallNode(nodeId int, serviceMethod string, args interface{}, reply interface{}) error {
return handler.callRpc(nodeId, serviceMethod, args, reply) return handler.callRpc(DefaultRpcTimeout,nodeId, serviceMethod, args, reply)
} }
func (handler *RpcHandler) GoNode(nodeId int, serviceMethod string, args interface{}) error { func (handler *RpcHandler) GoNode(nodeId int, serviceMethod string, args interface{}) error {
@@ -573,7 +599,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i
//如果调用本结点服务 //如果调用本结点服务
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
//跨node调用 //跨node调用
pCall := handler.pClientList[i].RawGo(handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil) pCall := handler.pClientList[i].RawGo(DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
if pCall.Err != nil { if pCall.Err != nil {
err = pCall.Err err = pCall.Err
} }

89
rpc/rpctimer.go Normal file
View File

@@ -0,0 +1,89 @@
package rpc
import (
"container/heap"
"time"
)
type CallTimer struct {
SeqId uint64
FireTime int64
}
type CallTimerHeap struct {
callTimer []CallTimer
mapSeqIndex map[uint64]int
}
func (h *CallTimerHeap) Init() {
h.mapSeqIndex = make(map[uint64]int, 4096)
h.callTimer = make([]CallTimer, 0, 4096)
}
func (h *CallTimerHeap) Len() int {
return len(h.callTimer)
}
func (h *CallTimerHeap) Less(i, j int) bool {
return h.callTimer[i].FireTime < h.callTimer[j].FireTime
}
func (h *CallTimerHeap) Swap(i, j int) {
h.callTimer[i], h.callTimer[j] = h.callTimer[j], h.callTimer[i]
h.mapSeqIndex[h.callTimer[i].SeqId] = i
h.mapSeqIndex[h.callTimer[j].SeqId] = j
}
func (h *CallTimerHeap) Push(t any) {
callTimer := t.(CallTimer)
h.mapSeqIndex[callTimer.SeqId] = len(h.callTimer)
h.callTimer = append(h.callTimer, callTimer)
}
func (h *CallTimerHeap) Pop() any {
l := len(h.callTimer)
seqId := h.callTimer[l-1].SeqId
h.callTimer = h.callTimer[:l-1]
delete(h.mapSeqIndex, seqId)
return seqId
}
func (h *CallTimerHeap) Cancel(seq uint64) bool {
index, ok := h.mapSeqIndex[seq]
if ok == false {
return false
}
heap.Remove(h, index)
return true
}
func (h *CallTimerHeap) AddTimer(seqId uint64,d time.Duration){
heap.Push(h, CallTimer{
SeqId: seqId,
FireTime: time.Now().Add(d).UnixNano(),
})
}
func (h *CallTimerHeap) PopTimeout() uint64 {
if h.Len() == 0 {
return 0
}
nextFireTime := h.callTimer[0].FireTime
if nextFireTime > time.Now().UnixNano() {
return 0
}
return heap.Pop(h).(uint64)
}
func (h *CallTimerHeap) PopFirst() uint64 {
if h.Len() == 0 {
return 0
}
return heap.Pop(h).(uint64)
}

View File

@@ -27,6 +27,8 @@ type Server struct {
functions map[interface{}]interface{} functions map[interface{}]interface{}
rpcHandleFinder RpcHandleFinder rpcHandleFinder RpcHandleFinder
rpcServer *network.TCPServer rpcServer *network.TCPServer
compressBytesLen int
} }
type RpcAgent struct { type RpcAgent struct {
@@ -64,7 +66,7 @@ func (server *Server) Init(rpcHandleFinder RpcHandleFinder) {
const Default_ReadWriteDeadline = 15*time.Second const Default_ReadWriteDeadline = 15*time.Second
func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) { func (server *Server) Start(listenAddr string, maxRpcParamLen uint32,compressBytesLen int) {
splitAddr := strings.Split(listenAddr, ":") splitAddr := strings.Split(listenAddr, ":")
if len(splitAddr) != 2 { if len(splitAddr) != 2 {
log.SFatal("listen addr is error :", listenAddr) log.SFatal("listen addr is error :", listenAddr)
@@ -72,6 +74,7 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) {
server.rpcServer.Addr = ":" + splitAddr[1] server.rpcServer.Addr = ":" + splitAddr[1]
server.rpcServer.MinMsgLen = 2 server.rpcServer.MinMsgLen = 2
server.compressBytesLen = compressBytesLen
if maxRpcParamLen > 0 { if maxRpcParamLen > 0 {
server.rpcServer.MaxMsgLen = maxRpcParamLen server.rpcServer.MaxMsgLen = maxRpcParamLen
} else { } else {
@@ -112,7 +115,26 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
return return
} }
errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())}, bytes) var compressBuff[]byte
bCompress := uint8(0)
if agent.rpcServer.compressBytesLen >0 && len(bytes) >= agent.rpcServer.compressBytesLen {
var cErr error
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
log.SError("service method ", serviceMethod, " CompressBlock error:", cErr.Error())
return
}
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
}
errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes)
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if errM != nil { if errM != nil {
log.SError("Rpc ", serviceMethod, " return is error:", errM.Error()) log.SError("Rpc ", serviceMethod, " return is error:", errM.Error())
} }
@@ -127,16 +149,34 @@ func (agent *RpcAgent) Run() {
break break
} }
processor := GetProcessor(data[0]) bCompress := (data[0]>>7) > 0
processor := GetProcessor(data[0]&0x7f)
if processor == nil { if processor == nil {
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
log.SError("remote rpc ", agent.conn.RemoteAddr(), " cannot find processor:", data[0]) log.SError("remote rpc ", agent.conn.RemoteAddr().String(), " cannot find processor:", data[0])
return return
} }
//解析head //解析head
var compressBuff []byte
byteData := data[1:]
if bCompress == true {
var unCompressErr error
compressBuff,unCompressErr = compressor.UncompressBlock(byteData)
if unCompressErr!= nil {
agent.conn.ReleaseReadMsg(data)
log.SError("rpcClient ", agent.conn.RemoteAddr().String(), " ReadMsg head error:", unCompressErr.Error())
return
}
byteData = compressBuff
}
req := MakeRpcRequest(processor, 0, 0, "", false, nil) req := MakeRpcRequest(processor, 0, 0, "", false, nil)
err = processor.Unmarshal(data[1:], req.RpcRequestData) err = processor.Unmarshal(byteData, req.RpcRequestData)
if cap(compressBuff) > 0 {
compressor.UnCompressBufferCollection(compressBuff)
}
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
if err != nil { if err != nil {
log.SError("rpc Unmarshal request is error:", err.Error()) log.SError("rpc Unmarshal request is error:", err.Error())
@@ -248,11 +288,10 @@ func (server *Server) myselfRpcHandlerGo(client *Client,handlerName string, serv
return rpcHandler.CallMethod(client,serviceMethod, args,callBack, reply) return rpcHandler.CallMethod(client,serviceMethod, args,callBack, reply)
} }
func (server *Server) selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call {
func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call {
pCall := MakeCall() pCall := MakeCall()
pCall.Seq = client.generateSeq() pCall.Seq = client.generateSeq()
pCall.TimeOut = timeout
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil { if rpcHandler == nil {
@@ -343,12 +382,12 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie
return pCall return pCall
} }
func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value) error { func (server *Server) selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value,cancelable bool) (CancelRpc,error) {
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler == nil { if rpcHandler == nil {
err := errors.New("service method " + serviceMethod + " not config!") err := errors.New("service method " + serviceMethod + " not config!")
log.SError(err.Error()) log.SError(err.Error())
return err return emptyCancelRpc,err
} }
_, processor := GetProcessorType(args) _, processor := GetProcessorType(args)
@@ -356,40 +395,46 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
if err != nil { if err != nil {
errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error()) errM := errors.New("RpcHandler " + handlerName + "."+serviceMethod+" deep copy inParam is error:" + err.Error())
log.SError(errM.Error()) log.SError(errM.Error())
return errM return emptyCancelRpc,errM
} }
req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil) req := MakeRpcRequest(processor, 0, 0, serviceMethod, noReply, nil)
req.inParam = iParam req.inParam = iParam
req.localReply = reply req.localReply = reply
cancelRpc := emptyCancelRpc
var callSeq uint64
if noReply == false { if noReply == false {
callSeq := client.generateSeq() callSeq = client.generateSeq()
pCall := MakeCall() pCall := MakeCall()
pCall.Seq = callSeq pCall.Seq = callSeq
pCall.rpcHandler = callerRpcHandler pCall.rpcHandler = callerRpcHandler
pCall.callback = &callback pCall.callback = &callback
pCall.Reply = reply pCall.Reply = reply
pCall.ServiceMethod = serviceMethod pCall.ServiceMethod = serviceMethod
pCall.TimeOut = timeout
client.AddPending(pCall) client.AddPending(pCall)
rpcCancel := RpcCancel{CallSeq: callSeq,Cli: client}
cancelRpc = rpcCancel.CancelRpc
req.requestHandle = func(Returns interface{}, Err RpcError) { req.requestHandle = func(Returns interface{}, Err RpcError) {
v := client.RemovePending(callSeq) v := client.RemovePending(callSeq)
if v == nil { if v == nil {
log.SError("rpcClient cannot find seq ", pCall.Seq, " in pending") log.SError("rpcClient cannot find seq ", callSeq, " in pending, service method is ",serviceMethod)
//ReleaseCall(pCall) //ReleaseCall(pCall)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
return return
} }
if len(Err) == 0 { if len(Err) == 0 {
pCall.Err = nil v.Err = nil
} else { } else {
pCall.Err = Err v.Err = Err
} }
if Returns != nil { if Returns != nil {
pCall.Reply = Returns v.Reply = Returns
} }
pCall.rpcHandler.PushRpcResponse(pCall) v.rpcHandler.PushRpcResponse(v)
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
} }
} }
@@ -397,8 +442,11 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
err = rpcHandler.PushRpcRequest(req) err = rpcHandler.PushRpcRequest(req)
if err != nil { if err != nil {
ReleaseRpcRequest(req) ReleaseRpcRequest(req)
return err if callSeq > 0 {
client.RemovePending(callSeq)
}
return emptyCancelRpc,err
} }
return nil return cancelRpc,nil
} }

View File

@@ -213,7 +213,7 @@ func (cs *CustomerSubscriber) publishToCustomer(topicData []TopicData) bool {
} }
//推送数据 //推送数据
err := cs.CallNode(cs.fromNodeId, cs.callBackRpcMethod, &dbQueuePublishReq, &dbQueuePushRes) err := cs.CallNodeWithTimeout(4*time.Minute,cs.fromNodeId, cs.callBackRpcMethod, &dbQueuePublishReq, &dbQueuePushRes)
if err != nil { if err != nil {
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
continue continue

View File

@@ -7,6 +7,7 @@ import (
"reflect" "reflect"
"runtime" "runtime"
"time" "time"
"sync/atomic"
) )
// ITimer // ITimer
@@ -29,7 +30,7 @@ type OnAddTimer func(timer ITimer)
// Timer // Timer
type Timer struct { type Timer struct {
Id uint64 Id uint64
cancelled bool //是否关闭 cancelled int32 //是否关闭
C chan ITimer //定时器管道 C chan ITimer //定时器管道
interval time.Duration // 时间间隔(用于循环定时器) interval time.Duration // 时间间隔(用于循环定时器)
fireTime time.Time // 触发时间 fireTime time.Time // 触发时间
@@ -171,12 +172,12 @@ func (t *Timer) GetInterval() time.Duration {
} }
func (t *Timer) Cancel() { func (t *Timer) Cancel() {
t.cancelled = true atomic.StoreInt32(&t.cancelled,1)
} }
// 判断定时器是否已经取消 // 判断定时器是否已经取消
func (t *Timer) IsActive() bool { func (t *Timer) IsActive() bool {
return !t.cancelled return atomic.LoadInt32(&t.cancelled) == 0
} }
func (t *Timer) GetName() string { func (t *Timer) GetName() string {