Files
origin/rpc/server.go
2024-04-23 10:44:21 +08:00

218 lines
5.9 KiB
Go

package rpc
import (
"fmt"
"github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/network"
"math"
"net"
"reflect"
"strings"
"time"
"runtime"
)
const Default_ReadWriteDeadline = 15*time.Second
type RpcProcessorType uint8
const (
RpcProcessorJson RpcProcessorType = 0
RpcProcessorPB RpcProcessorType = 1
)
var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &PBProcessor{}}
var arrayProcessorLen uint8 = 2
var LittleEndian bool
type IServer interface {
Start() error
Stop()
selfNodeRpcHandlerGo(timeout time.Duration,processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call
myselfRpcHandlerGo(client *Client,handlerName string, serviceMethod string, args interface{},callBack reflect.Value, reply interface{}) error
selfNodeRpcHandlerAsyncGo(timeout time.Duration,client *Client, callerRpcHandler IRpcHandler, noReply bool, handlerName string, serviceMethod string, args interface{}, reply interface{}, callback reflect.Value,cancelable bool) (CancelRpc,error)
}
type writeResponse func(processor IRpcProcessor,connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError)
type Server struct {
BaseServer
functions map[interface{}]interface{}
rpcServer *network.TCPServer
listenAddr string
maxRpcParamLen uint32
}
type RpcAgent struct {
conn network.Conn
rpcServer *Server
userData interface{}
}
func AppendProcessor(rpcProcessor IRpcProcessor) {
arrayProcessor = append(arrayProcessor, rpcProcessor)
arrayProcessorLen++
}
func GetProcessorType(param interface{}) (RpcProcessorType, IRpcProcessor) {
for i := uint8(1); i < arrayProcessorLen; i++ {
if arrayProcessor[i].IsParse(param) == true {
return RpcProcessorType(i), arrayProcessor[i]
}
}
return RpcProcessorJson, arrayProcessor[RpcProcessorJson]
}
func GetProcessor(processorType uint8) IRpcProcessor {
if processorType >= arrayProcessorLen {
return nil
}
return arrayProcessor[processorType]
}
func (server *Server) Init(listenAddr string, maxRpcParamLen uint32,compressBytesLen int,rpcHandleFinder RpcHandleFinder) {
server.initBaseServer(compressBytesLen,rpcHandleFinder)
server.listenAddr = listenAddr
server.maxRpcParamLen = maxRpcParamLen
server.rpcServer = &network.TCPServer{}
}
func (server *Server) Start() error{
splitAddr := strings.Split(server.listenAddr, ":")
if len(splitAddr) != 2 {
return fmt.Errorf("listen addr is failed,listenAddr:%s", server.listenAddr)
}
server.rpcServer.Addr = ":" + splitAddr[1]
server.rpcServer.MinMsgLen = 2
server.compressBytesLen = server.compressBytesLen
if server.maxRpcParamLen > 0 {
server.rpcServer.MaxMsgLen = server.maxRpcParamLen
} else {
server.rpcServer.MaxMsgLen = math.MaxUint32
}
server.rpcServer.MaxConnNum = 100000
server.rpcServer.PendingWriteNum = 2000000
server.rpcServer.NewAgent = server.NewAgent
server.rpcServer.LittleEndian = LittleEndian
server.rpcServer.WriteDeadline = Default_ReadWriteDeadline
server.rpcServer.ReadDeadline = Default_ReadWriteDeadline
server.rpcServer.LenMsgLen = DefaultRpcLenMsgLen
return server.rpcServer.Start()
}
func (server *Server) Stop(){
server.rpcServer.Close()
}
func (agent *RpcAgent) OnDestroy() {}
func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, connTag string, serviceMethod string, seq uint64, reply interface{}, rpcError RpcError) {
var mReply []byte
var errM error
if reply != nil {
mReply, errM = processor.Marshal(reply)
if errM != nil {
rpcError = ConvertError(errM)
}
}
var rpcResponse RpcResponse
rpcResponse.RpcResponseData = processor.MakeRpcResponse(seq, rpcError, mReply)
bytes, errM := processor.Marshal(rpcResponse.RpcResponseData)
defer processor.ReleaseRpcResponse(rpcResponse.RpcResponseData)
if errM != nil {
log.Error("mashal RpcResponseData failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",errM))
return
}
var compressBuff[]byte
bCompress := uint8(0)
if agent.rpcServer.compressBytesLen >0 && len(bytes) >= agent.rpcServer.compressBytesLen {
var cErr error
compressBuff,cErr = compressor.CompressBlock(bytes)
if cErr != nil {
log.Error("CompressBlock failed",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",cErr))
return
}
if len(compressBuff) < len(bytes) {
bytes = compressBuff
bCompress = 1<<7
}
}
errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())|bCompress}, bytes)
if cap(compressBuff) >0 {
compressor.CompressBufferCollection(compressBuff)
}
if errM != nil {
log.Error("WriteMsg error,Rpc return is fail",log.String("serviceMethod",serviceMethod),log.ErrorAttr("error",errM))
}
}
func (agent *RpcAgent) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
for {
data, err := agent.conn.ReadMsg()
if err != nil {
log.Error("read message is error",log.String("remoteAddress",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err))
//will close tcpconn
break
}
defer agent.conn.ReleaseReadMsg(data)
err = agent.rpcServer.processRpcRequest( data,"",agent.WriteResponse)
if err != nil {
log.Error("processRpcRequest is error",log.String("remoteAddress",agent.conn.RemoteAddr().String()),log.ErrorAttr("error",err))
//will close tcpconn
break
}
}
}
func (agent *RpcAgent) OnClose() {
}
func (agent *RpcAgent) WriteMsg(msg interface{}) {
}
func (agent *RpcAgent) LocalAddr() net.Addr {
return agent.conn.LocalAddr()
}
func (agent *RpcAgent) RemoteAddr() net.Addr {
return agent.conn.RemoteAddr()
}
func (agent *RpcAgent) Close() {
agent.conn.Close()
}
func (agent *RpcAgent) Destroy() {
agent.conn.Destroy()
}
func (server *Server) NewAgent(c *network.TCPConn) network.Agent {
agent := &RpcAgent{conn: c, rpcServer: server}
return agent
}