mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-03 22:45:13 +08:00
260 lines
6.7 KiB
Go
260 lines
6.7 KiB
Go
package network
|
||
|
||
import (
|
||
"github.com/duanhf2012/origin/v2/log"
|
||
"github.com/duanhf2012/origin/v2/network/processor"
|
||
"github.com/xtaci/kcp-go/v5"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
type KCPServer struct {
|
||
NewAgent func(Conn) Agent
|
||
|
||
kcpCfg *KcpCfg
|
||
blockCrypt kcp.BlockCrypt
|
||
|
||
process processor.IRawProcessor
|
||
msgParser MsgParser
|
||
conns ConnSet
|
||
mutexConns sync.Mutex
|
||
wgLn sync.WaitGroup
|
||
wgConns sync.WaitGroup
|
||
|
||
listener *kcp.Listener
|
||
}
|
||
|
||
/*
|
||
NoDelayCfg
|
||
|
||
普通模式: ikcp_nodelay(kcp, 0, 40, 0, 0);
|
||
极速模式: ikcp_nodelay(kcp, 1, 10, 2, 1);
|
||
*/
|
||
type NoDelayCfg struct {
|
||
NoDelay int // 是否启用 nodelay模式,0不启用;1启用
|
||
IntervalMill int // 协议内部工作的 interval,单位毫秒,比如 10ms或者 20ms
|
||
Resend int // 快速重传模式,默认0关闭,可以设置2(2次ACK跨越将会直接重传)
|
||
CongestionControl int // 是否关闭流控,默认是0代表不关闭,1代表关闭
|
||
}
|
||
|
||
const (
|
||
DefaultNoDelay = 1
|
||
DefaultIntervalMill = 10
|
||
DefaultResend = 2
|
||
DefaultCongestionControl = 1
|
||
|
||
DefaultMtu = 1400
|
||
DefaultSndWndSize = 4096
|
||
DefaultRcvWndSize = 4096
|
||
DefaultStreamMode = true
|
||
DefaultDSCP = 46
|
||
DefaultDataShards = 10
|
||
DefaultParityShards = 0
|
||
|
||
DefaultReadDeadlineMill = 15 * time.Second
|
||
DefaultWriteDeadlineMill = 15 * time.Second
|
||
|
||
DefaultMaxConnNum = 20000
|
||
)
|
||
|
||
type KcpCfg struct {
|
||
ListenAddr string // 监听地址
|
||
MaxConnNum int //最大连接数
|
||
NoDelay *NoDelayCfg
|
||
|
||
Mtu *int // mtu大小
|
||
SndWndSize *int // 发送窗口大小,默认1024
|
||
RcvWndSize *int // 接收窗口大小,默认1024
|
||
ReadDeadlineMill *time.Duration // 读超时毫秒
|
||
WriteDeadlineMill *time.Duration // 写超时毫秒
|
||
StreamMode *bool // 是否打开流模式,默认true
|
||
DSCP *int // 差分服务代码点,默认46
|
||
ReadBuffSize *int // 读Buff大小,默认
|
||
WriteBuffSize *int // 写Buff大小
|
||
|
||
// 用于 FEC(前向纠错)的数据分片和校验分片数量,,默认10,0
|
||
DataShards *int
|
||
ParityShards *int
|
||
|
||
// 包体内容
|
||
|
||
LittleEndian bool //是否小端序
|
||
LenMsgLen int //消息头占用byte数量,只能是1byte,2byte,4byte。如果是4byte,意味着消息最大可以是math.MaxUint32(4GB)
|
||
MinMsgLen uint32 //最小消息长度
|
||
MaxReadMsgLen uint32 //最大读消息长度,超过判定不合法,断开连接
|
||
MaxWriteMsgLen uint32 // 最大写消息长度
|
||
PendingWriteNum int //写channel最大消息数量
|
||
}
|
||
|
||
func (kp *KCPServer) Init(kcpCfg *KcpCfg) {
|
||
kp.kcpCfg = kcpCfg
|
||
kp.msgParser.Init()
|
||
kp.msgParser.LenMsgLen = kp.kcpCfg.LenMsgLen
|
||
kp.msgParser.MaxReadMsgLen = kp.kcpCfg.MaxReadMsgLen
|
||
kp.msgParser.MaxWriteMsgLen = kp.kcpCfg.MaxWriteMsgLen
|
||
kp.msgParser.MinMsgLen = kp.kcpCfg.MinMsgLen
|
||
kp.msgParser.LittleEndian = kp.kcpCfg.LittleEndian
|
||
|
||
// setting default noDelay
|
||
if kp.kcpCfg.NoDelay == nil {
|
||
var noDelay NoDelayCfg
|
||
noDelay.NoDelay = DefaultNoDelay
|
||
noDelay.IntervalMill = DefaultIntervalMill
|
||
noDelay.Resend = DefaultResend
|
||
noDelay.CongestionControl = DefaultCongestionControl
|
||
kp.kcpCfg.NoDelay = &noDelay
|
||
}
|
||
|
||
if kp.kcpCfg.Mtu == nil {
|
||
mtu := DefaultMtu
|
||
kp.kcpCfg.Mtu = &mtu
|
||
}
|
||
|
||
if kp.kcpCfg.SndWndSize == nil {
|
||
sndWndSize := DefaultSndWndSize
|
||
kp.kcpCfg.SndWndSize = &sndWndSize
|
||
}
|
||
if kp.kcpCfg.RcvWndSize == nil {
|
||
rcvWndSize := DefaultRcvWndSize
|
||
kp.kcpCfg.RcvWndSize = &rcvWndSize
|
||
}
|
||
if kp.kcpCfg.ReadDeadlineMill == nil {
|
||
readDeadlineMill := DefaultReadDeadlineMill
|
||
kp.kcpCfg.ReadDeadlineMill = &readDeadlineMill
|
||
} else {
|
||
*kp.kcpCfg.ReadDeadlineMill *= time.Millisecond
|
||
}
|
||
if kp.kcpCfg.WriteDeadlineMill == nil {
|
||
writeDeadlineMill := DefaultWriteDeadlineMill
|
||
kp.kcpCfg.WriteDeadlineMill = &writeDeadlineMill
|
||
} else {
|
||
*kp.kcpCfg.WriteDeadlineMill *= time.Millisecond
|
||
}
|
||
if kp.kcpCfg.StreamMode == nil {
|
||
streamMode := DefaultStreamMode
|
||
kp.kcpCfg.StreamMode = &streamMode
|
||
}
|
||
if kp.kcpCfg.DataShards == nil {
|
||
dataShards := DefaultDataShards
|
||
kp.kcpCfg.DataShards = &dataShards
|
||
}
|
||
if kp.kcpCfg.ParityShards == nil {
|
||
parityShards := DefaultParityShards
|
||
kp.kcpCfg.ParityShards = &parityShards
|
||
}
|
||
if kp.kcpCfg.DSCP == nil {
|
||
dss := DefaultDSCP
|
||
kp.kcpCfg.DSCP = &dss
|
||
}
|
||
|
||
if kp.kcpCfg.MaxConnNum == 0 {
|
||
kp.kcpCfg.MaxConnNum = DefaultMaxConnNum
|
||
}
|
||
|
||
kp.conns = make(ConnSet, 2048)
|
||
kp.msgParser.Init()
|
||
return
|
||
}
|
||
|
||
func (kp *KCPServer) Start() error {
|
||
listener, err := kcp.ListenWithOptions(kp.kcpCfg.ListenAddr, kp.blockCrypt, *kp.kcpCfg.DataShards, *kp.kcpCfg.ParityShards)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if kp.kcpCfg.ReadBuffSize != nil {
|
||
err = listener.SetReadBuffer(*kp.kcpCfg.ReadBuffSize)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
if kp.kcpCfg.WriteBuffSize != nil {
|
||
err = listener.SetWriteBuffer(*kp.kcpCfg.WriteBuffSize)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
err = listener.SetDSCP(*kp.kcpCfg.DSCP)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
kp.listener = listener
|
||
|
||
kp.wgLn.Add(1)
|
||
go func() {
|
||
defer kp.wgLn.Done()
|
||
for kp.run(listener) {
|
||
}
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
|
||
func (kp *KCPServer) initSession(session *kcp.UDPSession) {
|
||
session.SetStreamMode(*kp.kcpCfg.StreamMode)
|
||
session.SetWindowSize(*kp.kcpCfg.SndWndSize, *kp.kcpCfg.RcvWndSize)
|
||
session.SetNoDelay(kp.kcpCfg.NoDelay.NoDelay, kp.kcpCfg.NoDelay.IntervalMill, kp.kcpCfg.NoDelay.Resend, kp.kcpCfg.NoDelay.CongestionControl)
|
||
session.SetDSCP(*kp.kcpCfg.DSCP)
|
||
session.SetMtu(*kp.kcpCfg.Mtu)
|
||
session.SetACKNoDelay(false)
|
||
|
||
//session.SetWriteDeadline(time.Now().Add(*kp.kcpCfg.WriteDeadlineMill))
|
||
}
|
||
|
||
func (kp *KCPServer) run(listener *kcp.Listener) bool {
|
||
conn, err := listener.Accept()
|
||
if err != nil {
|
||
log.Error("accept error", log.String("ListenAddr", kp.kcpCfg.ListenAddr), log.ErrorField("err", err))
|
||
return false
|
||
}
|
||
|
||
kp.mutexConns.Lock()
|
||
if len(kp.conns) >= kp.kcpCfg.MaxConnNum {
|
||
kp.mutexConns.Unlock()
|
||
conn.Close()
|
||
log.Warn("too many connections")
|
||
return true
|
||
}
|
||
kp.conns[conn] = struct{}{}
|
||
kp.mutexConns.Unlock()
|
||
|
||
if kp.kcpCfg.ReadBuffSize != nil {
|
||
conn.(*kcp.UDPSession).SetReadBuffer(*kp.kcpCfg.ReadBuffSize)
|
||
}
|
||
if kp.kcpCfg.WriteBuffSize != nil {
|
||
conn.(*kcp.UDPSession).SetWriteBuffer(*kp.kcpCfg.WriteBuffSize)
|
||
}
|
||
kp.initSession(conn.(*kcp.UDPSession))
|
||
|
||
netConn := newNetConn(conn, kp.kcpCfg.PendingWriteNum, &kp.msgParser, *kp.kcpCfg.WriteDeadlineMill)
|
||
agent := kp.NewAgent(netConn)
|
||
kp.wgConns.Add(1)
|
||
go func() {
|
||
agent.Run()
|
||
// cleanup
|
||
conn.Close()
|
||
kp.mutexConns.Lock()
|
||
delete(kp.conns, conn)
|
||
kp.mutexConns.Unlock()
|
||
agent.OnClose()
|
||
|
||
kp.wgConns.Done()
|
||
}()
|
||
|
||
return true
|
||
}
|
||
|
||
func (kp *KCPServer) Close() {
|
||
kp.listener.Close()
|
||
kp.wgLn.Wait()
|
||
|
||
kp.mutexConns.Lock()
|
||
for conn := range kp.conns {
|
||
conn.Close()
|
||
}
|
||
kp.conns = nil
|
||
kp.mutexConns.Unlock()
|
||
kp.wgConns.Wait()
|
||
}
|