Compare commits

..

16 Commits

Author SHA1 Message Date
boyce
f89232c2dd 优化蓝图 2026-03-12 16:54:43 +08:00
boyce
f3acdc860c 优化蓝图 2026-03-10 17:32:47 +08:00
boyce
e508c69fa0 新增蓝图接口 2026-03-05 13:03:23 +08:00
boyce
6f87cda07f 优化滚动日志 2026-02-02 15:02:33 +08:00
boyce
2e4863d073 优化网络层 2026-01-08 14:37:12 +08:00
boyce
330644cebb 优化ws读写最大限制 2026-01-08 08:33:29 +08:00
boyce
ef7ee0ab8e 优化协议解析器和服务接口 2025-12-25 14:32:13 +08:00
boyce
976efe0c04 新增通过模板名获取服务和node信息 2025-12-18 13:15:38 +08:00
boyce
b61906fb24 优化日志依赖 2025-12-18 09:36:35 +08:00
boyce
29dcf2bfeb 优化日志文件生成 2025-12-18 09:33:36 +08:00
boyce
857e3c4311 优化日志 2025-12-12 10:52:08 +08:00
boyce
210a3fa982 补充readme 2025-12-11 17:33:44 +08:00
boyce
d365dde8c0 优化服务发现 2025-12-11 09:56:39 +08:00
boyce
e0d412810f 优化服务发现 2025-12-10 10:13:53 +08:00
boyce
593abd263e 优化websocket
优化蓝图
2025-12-08 15:30:44 +08:00
boyce
75660bdec0 新增结点 2025-11-27 09:15:18 +08:00
28 changed files with 534 additions and 187 deletions

View File

@@ -80,8 +80,22 @@ Etcd方式示例
"DialTimeoutMillisecond": 3000, "DialTimeoutMillisecond": 3000,
"EtcdList": [ "EtcdList": [
{ {
"NetworkName": ["network1"], "LocalNetworkName": "network_Area1",
"Endpoints": ["http://192.168.13.24:12379"] "Endpoints": ["http://127.0.0.1:12379"],
"UserName": "",
"Password": "",
"Cert": "",
"CertKey": "",
"Ca": ""
},
{
"NeighborNetworkName": ["network_Area2"],
"Endpoints": ["http://127.0.0.1:12379"],
"UserName": "",
"Password": "",
"Cert": "",
"CertKey": "",
"Ca": ""
} }
] ]
} }
@@ -92,12 +106,16 @@ TTLSecond表示健康检查TTL失效时间10秒
DialTimeoutMillisecond: 与etcd连接超时时间 DialTimeoutMillisecond: 与etcd连接超时时间
EtcdListEtcd列表可以多个Etcd服务器连接 EtcdListEtcd列表可以多个Etcd服务器连接注意列表中必需有一个LocalNetworkName项表示当前所有的Node归属当前网络名为network_Area1Node下所有的服务会往network_Area1中注册。监听该网络的结点可以发现该网络中的Service。本地网络会默认监听本地网络中所有的服务。
NetworkName所在的网络名,可以配置多个。node会往对应的网络名称中注册、监听发现Service。NetworkName也起到发现隔离的作用。 NeighborNetworkName表示监听的邻居网络名,可以发现该网络中所有Service
EndpointsEtcd服务器地址 EndpointsEtcd服务器地址
Origin方式示例 Origin方式示例
```json ```json
@@ -105,10 +123,15 @@ Origin方式示例
"Discovery": { "Discovery": {
"Origin":{ "Origin":{
"TTLSecond": 10, "TTLSecond": 10,
"LocalMasterNodeId": "bot",
"MasterNodeList": [ "MasterNodeList": [
{ {
"NodeId": "test_1", "NodeId": "bot",
"ListenAddr": "127.0.0.1:8801" "ListenAddr": "127.0.0.1:11001"
},
{
"NodeId": "mp1server",
"ListenAddr": "127.0.0.1:11000"
} }
] ]
} }
@@ -118,8 +141,12 @@ Origin方式示例
TTLSecond表示健康检查TTL失效时间10秒 TTLSecond表示健康检查TTL失效时间10秒
LocalMasterNodeId本地所有的Node归属当前Master NodeId。归属当前的所有的Node会往该Master Node中注册服务。MasterNode会自动同步给所有的监听结点。本地Node会默认监听本地Master Node中所有的服务。注意LocalMasterNodeId配置的NodeId要在MasterNodeList列表中。
MasterNodeList指定哪些Node为服务发现Master结点需要配置NodeId与ListenAddr注意它们要与实际的Node配置一致。 MasterNodeList指定哪些Node为服务发现Master结点需要配置NodeId与ListenAddr注意它们要与实际的Node配置一致。
### RpcMode部分 ### RpcMode部分
默认模式 默认模式
@@ -947,20 +974,45 @@ origin引擎默认使用读取所有结点配置的进行确认结点有哪些Se
"Private": false, "Private": false,
"remark": "//以_打头的表示只在本机进程不对整个子网开发", "remark": "//以_打头的表示只在本机进程不对整个子网开发",
"ServiceList": ["_TestService1", "TestService9", "TestService10"], "ServiceList": ["_TestService1", "TestService9", "TestService10"],
"DiscoveryService": [ "AllowDiscovery": [
{ {
"MasterNodeId": "nodeid_1", "MasterNodeId": "nodeid_1",
"NetworkName":"networkname1" "NetworkName":"networkname1",
"DiscoveryService": ["TestService8"] "NodeIdList":[".*server"],
"ServiceList": ["TestService8"]
} }
] ]
}] }]
} }
``` ```
DiscoveryService在当前nodeid为nodeid_test的结点中只发现 MasterNodeId为nodeid_1或NetworkName为networkname1网络中的TestService8服务。 以上如果是使用Etcd发现模式则表示可以发现网络名networkname1NodeId为server结尾服务名为TestService8服务。
AllowDiscovery可以配置发现的规则如果只配置MasterNodeId或NetworkName时(如果使用Etcd则只配置NetworkName,Origin则只配置MasterNodeId),则会筛选指定网络的所有服务,如下:
```
"AllowDiscovery": [
{
"NetworkName":"networkname1",
}
]
```
则以上只匹配networkname1网络名的所有服务支持正则表达式例如可以配置为"NetworkName":".*name1",则可以发现网络名为name1结尾的所有服务。
如果只发现NodeId为server结尾的所有服务可以使用以下配置方式
```
"AllowDiscovery": [
{
"NodeIdList":[".*server"]
}
]
```
筛选服务也是同上。也可以组合配置NetworkName和NodeIdList配置。
**注意**MasterNodeId与NetworkName只配置一个分别在模式为origin或者etcd服务发现类型时。
第八章HttpService使用 第八章HttpService使用
----------------------- -----------------------

View File

@@ -7,6 +7,8 @@ import (
"strings" "strings"
"sync" "sync"
"regexp"
"github.com/duanhf2012/origin/v2/event" "github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/rpc" "github.com/duanhf2012/origin/v2/rpc"
@@ -24,21 +26,23 @@ const (
Discard NodeStatus = 1 //丢弃 Discard NodeStatus = 1 //丢弃
) )
type DiscoveryService struct { // AllowDiscovery 允许发现的网络服务
MasterNodeId string //要筛选的主结点Id如果不配置或者配置成0表示针对所有的主结点 type AllowDiscovery struct {
NetworkName string //如果是etcd指定要筛选的网络名中的服务不配置表示所有的网络 MasterNodeId string // 支持正则表达式
ServiceList []string //只发现的服务列表 NetworkName string // 支持正则表达式
NodeIdList []string // 支持正则表达式
ServiceList []string
} }
type NodeInfo struct { type NodeInfo struct {
NodeId string NodeId string
Private bool Private bool
ListenAddr string ListenAddr string
MaxRpcParamLen uint32 //最大Rpc参数长度 MaxRpcParamLen uint32 //最大Rpc参数长度
CompressBytesLen int //超过字节进行压缩的长度 CompressBytesLen int //超过字节进行压缩的长度
ServiceList []string //所有的有序服务列表 ServiceList []string //所有的有序服务列表
PublicServiceList []string //对外公开的服务列表 PublicServiceList []string //对外公开的服务列表
DiscoveryService []DiscoveryService //筛选发现的服务,如果不配置,不进行筛选 AllowDiscovery []AllowDiscovery //允许发现的网络服务
status NodeStatus status NodeStatus
Retire bool Retire bool
} }
@@ -223,9 +227,9 @@ func (cls *Cluster) serviceDiscoverySetNodeInfo(nodeInfo *NodeInfo) {
} }
cls.mapRpc[nodeInfo.NodeId] = &rpcInfo cls.mapRpc[nodeInfo.NodeId] = &rpcInfo
if cls.IsNatsMode() == true || cls.discoveryInfo.discoveryType != OriginType { if cls.IsNatsMode() == true || cls.discoveryInfo.discoveryType != OriginType {
log.Info("Discovery nodeId and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire)) log.Info("Discovery node and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire))
} else { } else {
log.Info("Discovery nodeId and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire), log.String("nodeListenAddr", nodeInfo.ListenAddr)) log.Info("Discovery node and new rpc client", log.String("NodeId", nodeInfo.NodeId), log.Any("services:", nodeInfo.PublicServiceList), log.Bool("Retire", nodeInfo.Retire), log.String("nodeListenAddr", nodeInfo.ListenAddr))
} }
} }
@@ -407,13 +411,13 @@ func GetNodeByServiceName(serviceName string) map[string]struct{} {
return mapNodeId return mapNodeId
} }
// GetNodeByTemplateServiceName 通过模板服务名获取服务名,返回 map[serviceName真实服务名]NodeId // GetNodeByTemplateServiceName 通过模板服务名获取服务名,返回 map[serviceName真实服务名][]NodeId
func GetNodeByTemplateServiceName(templateServiceName string) map[string]string { func GetNodeByTemplateServiceName(templateServiceName string) map[string][]string {
cluster.locker.RLock() cluster.locker.RLock()
defer cluster.locker.RUnlock() defer cluster.locker.RUnlock()
mapServiceName := cluster.mapTemplateServiceNode[templateServiceName] mapServiceName := cluster.mapTemplateServiceNode[templateServiceName]
mapNodeId := make(map[string]string, 9) mapNodeId := make(map[string][]string, 9)
for serviceName := range mapServiceName { for serviceName := range mapServiceName {
mapNode, ok := cluster.mapServiceNode[serviceName] mapNode, ok := cluster.mapServiceNode[serviceName]
if ok == false { if ok == false {
@@ -421,7 +425,9 @@ func GetNodeByTemplateServiceName(templateServiceName string) map[string]string
} }
for nodeId := range mapNode { for nodeId := range mapNode {
mapNodeId[serviceName] = nodeId nodes := mapNodeId[serviceName]
nodes = append(nodes, nodeId)
mapNodeId[serviceName] = nodes
} }
} }
@@ -462,28 +468,76 @@ func (cls *Cluster) GetNodeInfo(nodeId string) (NodeInfo, bool) {
return nodeInfo.nodeInfo, true return nodeInfo.nodeInfo, true
} }
func (cls *Cluster) CanDiscoveryService(fromMasterNodeId string, serviceName string) bool { func (cls *Cluster) CanDiscoveryService(fromNetworkName string, fromMasterNodeId string, fromNodeId string, serviceName string) bool {
canDiscovery := true canDiscovery := true
// 筛选允许的服务
splitServiceName := strings.Split(serviceName, ":") splitServiceName := strings.Split(serviceName, ":")
if len(splitServiceName) == 2 { if len(splitServiceName) == 2 {
serviceName = splitServiceName[0] serviceName = splitServiceName[0]
} }
for i := 0; i < len(cls.GetLocalNodeInfo().DiscoveryService); i++ { // 先筛选允许的网络,有配置才会检测
masterNodeId := cls.GetLocalNodeInfo().DiscoveryService[i].MasterNodeId if len(cls.GetLocalNodeInfo().AllowDiscovery) > 0 {
//无效的配置,则跳过 allowNetwork := false
if masterNodeId == rpc.NodeIdNull && len(cls.GetLocalNodeInfo().DiscoveryService[i].ServiceList) == 0 { for i := 0; i < len(cls.GetLocalNodeInfo().AllowDiscovery); i++ {
continue masterNodeId := cls.GetLocalNodeInfo().AllowDiscovery[i].MasterNodeId
} networkName := cls.GetLocalNodeInfo().AllowDiscovery[i].NetworkName
nodeIdList := cls.GetLocalNodeInfo().AllowDiscovery[i].NodeIdList
serviceList := cls.GetLocalNodeInfo().AllowDiscovery[i].ServiceList
canDiscovery = false // 如果配置了网络及Master结点则匹配之
if masterNodeId == fromMasterNodeId || masterNodeId == rpc.NodeIdNull { if fromNetworkName != "" {
for _, discoveryService := range cls.GetLocalNodeInfo().DiscoveryService[i].ServiceList { matchNetWork, _ := regexp.MatchString(networkName, fromNetworkName)
if discoveryService == serviceName { if !matchNetWork {
return true continue
}
} else if fromMasterNodeId != "" {
matchMasterNode, _ := regexp.MatchString(masterNodeId, fromMasterNodeId)
if !matchMasterNode {
continue
} }
} }
// 如果配置了
if len(nodeIdList) > 0 {
hasNode := false
for _, nodeId := range nodeIdList {
matchNodeId, _ := regexp.MatchString(nodeId, fromNodeId)
if !matchNodeId {
continue
}
hasNode = true
break
}
if !hasNode {
continue
}
}
// 如果配置了服务,则匹配之
if len(serviceList) > 0 {
hasService := false
for _, service := range serviceList {
// service按正则表达式匹配serviceName
matched, _ := regexp.MatchString(service, serviceName)
if matched {
hasService = true
break
}
}
if !hasService {
continue
}
}
allowNetwork = true
break
}
if !allowNetwork {
return false
} }
} }

View File

@@ -6,6 +6,12 @@ import (
"crypto/x509" "crypto/x509"
"errors" "errors"
"fmt" "fmt"
"os"
"path"
"strings"
"sync/atomic"
"time"
"github.com/duanhf2012/origin/v2/event" "github.com/duanhf2012/origin/v2/event"
"github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/log"
"github.com/duanhf2012/origin/v2/rpc" "github.com/duanhf2012/origin/v2/rpc"
@@ -14,19 +20,15 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"os"
"path"
"strings"
"sync/atomic"
"time"
) )
const originDir = "/origin" const originDir = "/origin"
type etcdClientInfo struct { type etcdClientInfo struct {
watchKeys []string isLocalNetwork bool
leaseID clientv3.LeaseID watchKeys []string
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse leaseID clientv3.LeaseID
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
} }
type EtcdDiscoveryService struct { type EtcdDiscoveryService struct {
@@ -93,6 +95,7 @@ func (ed *EtcdDiscoveryService) OnInit() error {
return errors.New("etcd discovery config is nil.") return errors.New("etcd discovery config is nil.")
} }
var hasLocalNetwork bool
for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ { for i := 0; i < len(etcdDiscoveryCfg.EtcdList); i++ {
var client *clientv3.Client var client *clientv3.Client
var tlsConfig *tls.Config var tlsConfig *tls.Config
@@ -141,13 +144,25 @@ func (ed *EtcdDiscoveryService) OnInit() error {
} }
ec := &etcdClientInfo{} ec := &etcdClientInfo{}
for _, networkName := range etcdDiscoveryCfg.EtcdList[i].NetworkName {
ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, networkName)) if etcdDiscoveryCfg.EtcdList[i].LocalNetworkName != "" {
hasLocalNetwork = true
ec.isLocalNetwork = true
ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, etcdDiscoveryCfg.EtcdList[i].LocalNetworkName))
} else {
ec.isLocalNetwork = false
for _, networkName := range etcdDiscoveryCfg.EtcdList[i].NeighborNetworkName {
ec.watchKeys = append(ec.watchKeys, fmt.Sprintf("%s/%s", originDir, networkName))
}
} }
ed.mapClient[client] = ec ed.mapClient[client] = ec
} }
if !hasLocalNetwork {
return errors.New("etcd discovery init fail,cannot find local network")
}
return nil return nil
} }
@@ -167,14 +182,18 @@ func (ed *EtcdDiscoveryService) registerServiceByClient(client *clientv3.Client,
} }
etcdClient.leaseID = resp.ID etcdClient.leaseID = resp.ID
for _, watchKey := range etcdClient.watchKeys {
// 注册服务节点到 etcd // 注册服务节点到 etcd,LocalNetwork时才会注册且etcdClient.watchKeys必然>0
_, err = client.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID)) if len(etcdClient.watchKeys) != 1 {
if err != nil { log.Error("LocalNetwork watchkey is error")
log.Error("etcd Put fail", log.ErrorField("err", err)) return
ed.tryRegisterService(client, etcdClient) }
return
} _, err = client.Put(context.Background(), ed.getRegisterKey(etcdClient.watchKeys[0]), ed.byteLocalNodeInfo, clientv3.WithLease(resp.ID))
if err != nil {
log.Error("etcd Put fail", log.ErrorField("err", err))
ed.tryRegisterService(client, etcdClient)
return
} }
etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID) etcdClient.keepAliveChan, err = client.KeepAlive(context.Background(), etcdClient.leaseID)
@@ -204,6 +223,10 @@ func (ed *EtcdDiscoveryService) tryRegisterService(client *clientv3.Client, etcd
return return
} }
if !etcdClient.isLocalNetwork {
return
}
ed.AfterFunc(time.Second*3, func(t *timer.Timer) { ed.AfterFunc(time.Second*3, func(t *timer.Timer) {
ed.registerServiceByClient(client, etcdClient) ed.registerServiceByClient(client, etcdClient)
}) })
@@ -229,13 +252,18 @@ func (ed *EtcdDiscoveryService) tryLaterRetire() {
func (ed *EtcdDiscoveryService) retire() error { func (ed *EtcdDiscoveryService) retire() error {
//从etcd中更新 //从etcd中更新
for c, ec := range ed.mapClient { for c, ec := range ed.mapClient {
for _, watchKey := range ec.watchKeys { if !ec.isLocalNetwork {
// 注册服务节点到 etcd continue
_, err := c.Put(context.Background(), ed.getRegisterKey(watchKey), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID)) }
if err != nil {
log.Error("etcd Put fail", log.ErrorField("err", err)) if len(ec.watchKeys)!=1 {
return err log.Error("LocalNetwork watchkey is error")
} continue
}
_, err := c.Put(context.Background(), ed.getRegisterKey(ec.watchKeys[0]), ed.byteLocalNodeInfo, clientv3.WithLease(ec.leaseID))
if err != nil {
log.Error("etcd Put fail", log.ErrorField("err", err))
return err
} }
} }
@@ -292,7 +320,7 @@ func (ed *EtcdDiscoveryService) setNodeInfo(networkName string, nodeInfo *rpc.No
//筛选关注的服务 //筛选关注的服务
var discoverServiceSlice = make([]string, 0, 24) var discoverServiceSlice = make([]string, 0, 24)
for _, pubService := range nodeInfo.PublicServiceList { for _, pubService := range nodeInfo.PublicServiceList {
if cluster.CanDiscoveryService(networkName, pubService) == true { if cluster.CanDiscoveryService(networkName, "", nodeInfo.NodeId, pubService) == true {
discoverServiceSlice = append(discoverServiceSlice, pubService) discoverServiceSlice = append(discoverServiceSlice, pubService)
} }
} }
@@ -503,11 +531,16 @@ func (ed *EtcdDiscoveryService) RPC_ServiceRecord(etcdServiceRecord *service.Etc
//写入到etcd中 //写入到etcd中
for c, info := range ed.mapClient { for c, info := range ed.mapClient {
for _, watchKey := range info.watchKeys { if !info.isLocalNetwork {
if ed.getNetworkNameByWatchKey(watchKey) == etcdServiceRecord.NetworkName { continue
client = c }
break
} if len(info.watchKeys)!=1 {
log.Error("")
}
if ed.getNetworkNameByWatchKey(info.watchKeys[0]) == etcdServiceRecord.NetworkName {
client = c
break
} }
} }

View File

@@ -508,11 +508,17 @@ func (dc *OriginDiscoveryClient) regServiceDiscover(nodeId string) {
if nodeId == cluster.GetLocalNodeInfo().NodeId { if nodeId == cluster.GetLocalNodeInfo().NodeId {
return return
} }
nodeInfo := cluster.getOriginMasterDiscoveryNodeInfo(nodeId) nodeInfo := cluster.getOriginMasterDiscoveryNodeInfo(nodeId)
if nodeInfo == nil { if nodeInfo == nil {
return return
} }
// 只能往本地Master结点上注册
if !cluster.IsLocalMasterNodeId(nodeId) {
return
}
var req rpc.RegServiceDiscoverReq var req rpc.RegServiceDiscoverReq
req.NodeInfo = &rpc.NodeInfo{} req.NodeInfo = &rpc.NodeInfo{}
req.NodeInfo.NodeId = cluster.localNodeInfo.NodeId req.NodeInfo.NodeId = cluster.localNodeInfo.NodeId
@@ -548,7 +554,7 @@ func (dc *OriginDiscoveryClient) setNodeInfo(masterNodeId string, nodeInfo *rpc.
//筛选关注的服务 //筛选关注的服务
var discoverServiceSlice = make([]string, 0, 24) var discoverServiceSlice = make([]string, 0, 24)
for _, pubService := range nodeInfo.PublicServiceList { for _, pubService := range nodeInfo.PublicServiceList {
if cluster.CanDiscoveryService(masterNodeId, pubService) == true { if cluster.CanDiscoveryService("",masterNodeId, nodeInfo.NodeId,pubService) == true {
discoverServiceSlice = append(discoverServiceSlice, pubService) discoverServiceSlice = append(discoverServiceSlice, pubService)
} }
} }
@@ -617,13 +623,22 @@ func (cls *Cluster) IsOriginMasterDiscoveryNode(nodeId string) bool {
return cls.getOriginMasterDiscoveryNodeInfo(nodeId) != nil return cls.getOriginMasterDiscoveryNodeInfo(nodeId) != nil
} }
func (cls *Cluster) IsLocalMasterNodeId(nodeId string) bool {
if cls.discoveryInfo.Origin == nil {
return false
}
return cls.discoveryInfo.Origin.LocalMasterNodeId == nodeId
}
func (cls *Cluster) getOriginMasterDiscoveryNodeInfo(nodeId string) *NodeInfo { func (cls *Cluster) getOriginMasterDiscoveryNodeInfo(nodeId string) *NodeInfo {
if cls.discoveryInfo.Origin == nil { if cls.discoveryInfo.Origin == nil {
return nil return nil
} }
for i := 0; i < len(cls.discoveryInfo.Origin.MasterNodeList); i++ { for i := 0; i < len(cls.discoveryInfo.Origin.MasterNodeList); i++ {
if cls.discoveryInfo.Origin.MasterNodeList[i].NodeId == nodeId {
if cls.discoveryInfo.Origin.MasterNodeList[i].NodeId == nodeId {
return &cls.discoveryInfo.Origin.MasterNodeList[i] return &cls.discoveryInfo.Origin.MasterNodeList[i]
} }
} }

View File

@@ -16,27 +16,27 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary var json = jsoniter.ConfigCompatibleWithStandardLibrary
type EtcdList struct { type EtcdList struct {
NetworkName []string LocalNetworkName string // 如果配置,则为本地网络,必需配置一个本地网络
Endpoints []string NeighborNetworkName []string
UserName string Endpoints []string
Password string UserName string
Cert string Password string
CertKey string Cert string
Ca string CertKey string
Ca string
} }
type EtcdDiscovery struct { type EtcdDiscovery struct {
DialTimeoutMillisecond time.Duration DialTimeoutMillisecond time.Duration
TTLSecond int64 TTLSecond int64
EtcdList []EtcdList
EtcdList []EtcdList
} }
type OriginDiscovery struct { type OriginDiscovery struct {
TTLSecond int64 TTLSecond int64
MasterNodeList []NodeInfo LocalMasterNodeId string
MasterNodeList []NodeInfo
} }
type DiscoveryType int type DiscoveryType int
@@ -72,7 +72,7 @@ type NodeInfoList struct {
} }
func validConfigFile(f string) bool { func validConfigFile(f string) bool {
return strings.HasSuffix(f, ".json")|| strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml") return strings.HasSuffix(f, ".json") || strings.HasSuffix(f, ".yml") || strings.HasSuffix(f, ".yaml")
} }
func yamlToJson(data []byte, v interface{}) ([]byte, error) { func yamlToJson(data []byte, v interface{}) ([]byte, error) {
@@ -131,21 +131,20 @@ func (d *DiscoveryInfo) setEtcd(etcd *EtcdDiscovery) error {
return fmt.Errorf("repeat configuration of Discovery") return fmt.Errorf("repeat configuration of Discovery")
} }
//Endpoints不允许重复
mapAddr := make(map[string]struct{})
for _, n := range etcd.EtcdList { for _, n := range etcd.EtcdList {
for _, endPoint := range n.Endpoints {
if _, ok := mapAddr[endPoint]; ok == true {
return fmt.Errorf("etcd discovery config Etcd.EtcdList.Endpoints %+v is repeat", endPoint)
}
mapAddr[endPoint] = struct{}{}
}
//networkName不允许重复 //networkName不允许重复
mapNetworkName := make(map[string]struct{}) mapNetworkName := make(map[string]struct{})
for _, netName := range n.NetworkName { if n.LocalNetworkName != "" {
if _, ok := mapNetworkName[n.LocalNetworkName]; ok == true {
return fmt.Errorf("etcd discovery config Etcd.EtcdList.LocalNetworkName %+v is repeat", n.LocalNetworkName)
}
mapNetworkName[n.LocalNetworkName] = struct{}{}
continue
}
for _, netName := range n.NeighborNetworkName {
if _, ok := mapNetworkName[netName]; ok == true { if _, ok := mapNetworkName[netName]; ok == true {
return fmt.Errorf("etcd discovery config Etcd.EtcdList.NetworkName %+v is repeat", n.NetworkName) return fmt.Errorf("etcd discovery config Etcd.EtcdList.NetworkName %+v is repeat", netName)
} }
mapNetworkName[netName] = struct{}{} mapNetworkName[netName] = struct{}{}
@@ -275,7 +274,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId string) (DiscoveryInfo, []Node
var rpcMode RpcMode var rpcMode RpcMode
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error { err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error) error {
if info.IsDir() { if info.IsDir() {
return nil return nil
} }
@@ -340,7 +339,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
nodeService := map[string]interface{}{} nodeService := map[string]interface{}{}
//读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件 //读取任何文件,只读符合格式的配置,目录下的文件可以自定义分文件
err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error)error{ err := filepath.Walk(configDir, func(path string, info fs.FileInfo, err error) error {
if info.IsDir() { if info.IsDir() {
return nil return nil
} }
@@ -432,7 +431,7 @@ func (cls *Cluster) readLocalService(localNodeId string) error {
return nil return nil
} }
func (cls *Cluster) parseLocalCfg() error{ func (cls *Cluster) parseLocalCfg() error {
rpcInfo := NodeRpcInfo{} rpcInfo := NodeRpcInfo{}
rpcInfo.nodeInfo = cls.localNodeInfo rpcInfo.nodeInfo = cls.localNodeInfo
rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet) rpcInfo.client = rpc.NewLClient(rpcInfo.nodeInfo.NodeId, &cls.callSet)
@@ -454,7 +453,7 @@ func (cls *Cluster) parseLocalCfg() error{
cls.mapServiceNode[serviceName] = make(map[string]struct{}) cls.mapServiceNode[serviceName] = make(map[string]struct{})
} }
if _,ok:=cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId];ok { if _, ok := cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId]; ok {
return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId) return fmt.Errorf("duplicate service %s is configured in node %s", serviceName, cls.localNodeInfo.NodeId)
} }
cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{} cls.mapServiceNode[serviceName][cls.localNodeInfo.NodeId] = struct{}{}
@@ -508,6 +507,30 @@ func (cls *Cluster) IsConfigService(serviceName string) bool {
return ok return ok
} }
// GetServiceInfoByTemplateService 通过模板服务名获取map[服务名]map[NodeId]struct{}
func (cls *Cluster) GetServiceInfoByTemplateService(templateServiceName string) map[string]map[string]struct{} {
mapService := map[string]map[string]struct{}{}
cls.locker.RLock()
defer cls.locker.RUnlock()
mapServiceName := cls.mapTemplateServiceNode[templateServiceName]
for serviceName := range mapServiceName {
mapNodeId, ok := cls.mapServiceNode[serviceName]
if ok == true {
for nodeId := range mapNodeId{
mapNodeIds:=mapService[serviceName]
if mapNodeIds==nil {
mapNodeIds = map[string]struct{}{}
mapService[serviceName] = mapNodeIds
}
mapNodeIds[nodeId] = struct{}{}
}
}
}
return mapService
}
func (cls *Cluster) GetNodeIdByTemplateService(templateServiceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) { func (cls *Cluster) GetNodeIdByTemplateService(templateServiceName string, rpcClientList []*rpc.Client, filterRetire bool) (error, []*rpc.Client) {
cls.locker.RLock() cls.locker.RLock()
defer cls.locker.RUnlock() defer cls.locker.RUnlock()

2
go.mod
View File

@@ -6,7 +6,7 @@ toolchain go1.22.7
require ( require (
github.com/IBM/sarama v1.43.3 github.com/IBM/sarama v1.43.3
github.com/duanhf2012/rotatelogs v0.0.0-20250124024205-39765c212d8a github.com/duanhf2012/rotatelogs v0.0.0-20260202065658-f38ef69c6a39
github.com/gin-gonic/gin v1.10.0 github.com/gin-gonic/gin v1.10.0
github.com/go-sql-driver/mysql v1.6.0 github.com/go-sql-driver/mysql v1.6.0
github.com/goccy/go-json v0.10.2 github.com/goccy/go-json v0.10.2

4
go.sum
View File

@@ -20,8 +20,8 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/duanhf2012/rotatelogs v0.0.0-20250124024205-39765c212d8a h1:BVmZrOSKTg9ry1YjqY6IjVXmBDsFdX/W+pnvO5cPUDc= github.com/duanhf2012/rotatelogs v0.0.0-20260202065658-f38ef69c6a39 h1:T+lS1jdEUNgkx3gG6MrO4rgkf8jGJNHSfRLKsLz31MM=
github.com/duanhf2012/rotatelogs v0.0.0-20250124024205-39765c212d8a/go.mod h1:S/NNkpdnXps6VXaYVVDFtqQAm/NKayHxxOAhsrFnCgg= github.com/duanhf2012/rotatelogs v0.0.0-20260202065658-f38ef69c6a39/go.mod h1:S/NNkpdnXps6VXaYVVDFtqQAm/NKayHxxOAhsrFnCgg=
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=

View File

@@ -142,7 +142,7 @@ func (logger *Logger) NewRotatelogsWriter() zapcore.WriteSyncer {
options = append(options, rotatelogs.WithRotationTime(time.Hour*24)) options = append(options, rotatelogs.WithRotationTime(time.Hour*24))
fileName := strings.TrimRight(logger.FileName, filepath.Ext(logger.FileName)) fileName := strings.TrimRight(logger.FileName, filepath.Ext(logger.FileName))
rotateLogs, err := rotatelogs.NewRotateLogs(LogPath, "20060102/"+fileName+"_20060102_150405", options...) rotateLogs, err := rotatelogs.NewRotateLogs(LogPath, fileName,"20060102","_20060102_150405", options...)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@@ -69,15 +69,15 @@ func (client *KCPClient) init() {
if client.MinMsgLen == 0 { if client.MinMsgLen == 0 {
client.MinMsgLen = Default_MinMsgLen client.MinMsgLen = Default_MinMsgLen
} }
if client.MaxMsgLen == 0 { if client.MaxReadMsgLen == 0 {
client.MaxMsgLen = Default_MaxMsgLen client.MaxReadMsgLen = Default_MaxReadMsgLen
} }
if client.LenMsgLen == 0 { if client.LenMsgLen == 0 {
client.LenMsgLen = Default_LenMsgLen client.LenMsgLen = Default_LenMsgLen
} }
maxMsgLen := client.MsgParser.getMaxMsgLen() maxMsgLen := client.MsgParser.getMaxMsgLen()
if client.MaxMsgLen > maxMsgLen { if client.MaxReadMsgLen > maxMsgLen {
client.MaxMsgLen = maxMsgLen client.MaxReadMsgLen = maxMsgLen
log.Info("invalid MaxMsgLen", log.Uint32("reset", maxMsgLen)) log.Info("invalid MaxMsgLen", log.Uint32("reset", maxMsgLen))
} }

View File

@@ -81,7 +81,8 @@ type KcpCfg struct {
LittleEndian bool //是否小端序 LittleEndian bool //是否小端序
LenMsgLen int //消息头占用byte数量只能是1byte,2byte,4byte。如果是4byte意味着消息最大可以是math.MaxUint32(4GB) LenMsgLen int //消息头占用byte数量只能是1byte,2byte,4byte。如果是4byte意味着消息最大可以是math.MaxUint32(4GB)
MinMsgLen uint32 //最小消息长度 MinMsgLen uint32 //最小消息长度
MaxMsgLen uint32 //最大消息长度,超过判定不合法,断开连接 MaxReadMsgLen uint32 //最大消息长度,超过判定不合法,断开连接
MaxWriteMsgLen uint32 // 最大写消息长度
PendingWriteNum int //写channel最大消息数量 PendingWriteNum int //写channel最大消息数量
} }
@@ -89,7 +90,8 @@ func (kp *KCPServer) Init(kcpCfg *KcpCfg) {
kp.kcpCfg = kcpCfg kp.kcpCfg = kcpCfg
kp.msgParser.Init() kp.msgParser.Init()
kp.msgParser.LenMsgLen = kp.kcpCfg.LenMsgLen kp.msgParser.LenMsgLen = kp.kcpCfg.LenMsgLen
kp.msgParser.MaxMsgLen = kp.kcpCfg.MaxMsgLen kp.msgParser.MaxReadMsgLen = kp.kcpCfg.MaxReadMsgLen
kp.msgParser.MaxWriteMsgLen = kp.kcpCfg.MaxWriteMsgLen
kp.msgParser.MinMsgLen = kp.kcpCfg.MinMsgLen kp.msgParser.MinMsgLen = kp.kcpCfg.MinMsgLen
kp.msgParser.LittleEndian = kp.kcpCfg.LittleEndian kp.msgParser.LittleEndian = kp.kcpCfg.LittleEndian

View File

@@ -45,9 +45,8 @@ func (jsonProcessor *JsonProcessor) SetByteOrder(littleEndian bool) {
} }
// MsgRoute must goroutine safe // MsgRoute must goroutine safe
func (jsonProcessor *JsonProcessor) MsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) error { func (jsonProcessor *JsonProcessor) MsgRoute(clientId string, msg interface{}) error {
pPackInfo := msg.(*JsonPackInfo) pPackInfo := msg.(*JsonPackInfo)
defer recyclerReaderBytes(pPackInfo.rawMsg)
v, ok := jsonProcessor.mapMsg[pPackInfo.typ] v, ok := jsonProcessor.mapMsg[pPackInfo.typ]
if ok == false { if ok == false {
@@ -107,8 +106,7 @@ func (jsonProcessor *JsonProcessor) MakeRawMsg(msgType uint16, msg []byte) *Json
return &JsonPackInfo{typ: msgType, rawMsg: msg} return &JsonPackInfo{typ: msgType, rawMsg: msg}
} }
func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) { func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId string, msg interface{}) {
defer recyclerReaderBytes(msg.([]byte))
if jsonProcessor.unknownMessageHandler == nil { if jsonProcessor.unknownMessageHandler == nil {
log.Debug("Unknown message", log.String("clientId", clientId)) log.Debug("Unknown message", log.String("clientId", clientId))
return return

View File

@@ -54,10 +54,8 @@ func (slf *PBPackInfo) GetMsg() proto.Message {
} }
// MsgRoute must goroutine safe // MsgRoute must goroutine safe
func (pbProcessor *PBProcessor) MsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) error { func (pbProcessor *PBProcessor) MsgRoute(clientId string, msg interface{}) error {
pPackInfo := msg.(*PBPackInfo) pPackInfo := msg.(*PBPackInfo)
defer recyclerReaderBytes(pPackInfo.rawMsg)
v, ok := pbProcessor.mapMsg[pPackInfo.typ] v, ok := pbProcessor.mapMsg[pPackInfo.typ]
if ok == false { if ok == false {
return fmt.Errorf("cannot find msgtype %d is register", pPackInfo.typ) return fmt.Errorf("cannot find msgtype %d is register", pPackInfo.typ)
@@ -134,9 +132,8 @@ func (pbProcessor *PBProcessor) MakeRawMsg(msgType uint16, msg []byte) *PBPackIn
return &PBPackInfo{typ: msgType, rawMsg: msg} return &PBPackInfo{typ: msgType, rawMsg: msg}
} }
func (pbProcessor *PBProcessor) UnknownMsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) { func (pbProcessor *PBProcessor) UnknownMsgRoute(clientId string, msg interface{}) {
pbProcessor.unknownMessageHandler(clientId, msg.([]byte)) pbProcessor.unknownMessageHandler(clientId, msg.([]byte))
recyclerReaderBytes(msg.([]byte))
} }
func (pbProcessor *PBProcessor) ConnectedRoute(clientId string) { func (pbProcessor *PBProcessor) ConnectedRoute(clientId string) {

View File

@@ -10,7 +10,7 @@ type RawMessageInfo struct {
msgHandler RawMessageHandler msgHandler RawMessageHandler
} }
type RawMessageHandler func(clientId string, packType uint16, msg []byte) type RawMessageHandler func(clientId string, packType uint16,additionData any, msg []byte)
type RawConnectHandler func(clientId string) type RawConnectHandler func(clientId string)
type UnknownRawMessageHandler func(clientId string, msg []byte) type UnknownRawMessageHandler func(clientId string, msg []byte)
@@ -39,10 +39,9 @@ func (pbRawProcessor *PBRawProcessor) SetByteOrder(littleEndian bool) {
} }
// MsgRoute must goroutine safe // MsgRoute must goroutine safe
func (pbRawProcessor *PBRawProcessor) MsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) error { func (pbRawProcessor *PBRawProcessor) MsgRoute(clientId string, msg interface{}) error {
pPackInfo := msg.(*PBRawPackInfo) pPackInfo := msg.(*PBRawPackInfo)
pbRawProcessor.msgHandler(clientId, pPackInfo.typ, pPackInfo.rawMsg) pbRawProcessor.msgHandler(clientId, pPackInfo.typ, nil, pPackInfo.rawMsg)
recyclerReaderBytes(pPackInfo.rawMsg)
return nil return nil
} }
@@ -83,8 +82,7 @@ func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16, msg []byte, pbR
pbRawPackInfo.rawMsg = msg pbRawPackInfo.rawMsg = msg
} }
func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) { func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId string, msg interface{}) {
defer recyclerReaderBytes(msg.([]byte))
if pbRawProcessor.unknownMessageHandler == nil { if pbRawProcessor.unknownMessageHandler == nil {
return return
} }

View File

@@ -2,9 +2,9 @@ package processor
type IProcessor interface { type IProcessor interface {
// MsgRoute must goroutine safe // MsgRoute must goroutine safe
MsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) error MsgRoute(clientId string, msg interface{}) error
// UnknownMsgRoute must goroutine safe // UnknownMsgRoute must goroutine safe
UnknownMsgRoute(clientId string, msg interface{}, recyclerReaderBytes func(data []byte)) UnknownMsgRoute(clientId string, msg interface{})
// ConnectedRoute connect event // ConnectedRoute connect event
ConnectedRoute(clientId string) ConnectedRoute(clientId string)
DisConnectedRoute(clientId string) DisConnectedRoute(clientId string)
@@ -20,7 +20,6 @@ type IRawProcessor interface {
SetByteOrder(littleEndian bool) SetByteOrder(littleEndian bool)
SetRawMsgHandler(handle RawMessageHandler) SetRawMsgHandler(handle RawMessageHandler)
MakeRawMsg(msgType uint16, msg []byte, pbRawPackInfo *PBRawPackInfo)
SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler) SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler)
SetConnectedHandler(connectHandler RawConnectHandler) SetConnectedHandler(connectHandler RawConnectHandler)
SetDisConnectedHandler(disconnectHandler RawConnectHandler) SetDisConnectedHandler(disconnectHandler RawConnectHandler)

View File

@@ -68,18 +68,19 @@ func (client *TCPClient) init() {
if client.MinMsgLen == 0 { if client.MinMsgLen == 0 {
client.MinMsgLen = Default_MinMsgLen client.MinMsgLen = Default_MinMsgLen
} }
if client.MaxMsgLen == 0 { if client.MaxReadMsgLen == 0 {
client.MaxMsgLen = Default_MaxMsgLen client.MaxReadMsgLen = Default_MaxReadMsgLen
} }
if client.LenMsgLen == 0 { if client.LenMsgLen == 0 {
client.LenMsgLen = Default_LenMsgLen client.LenMsgLen = Default_LenMsgLen
} }
maxMsgLen := client.MsgParser.getMaxMsgLen() maxMsgLen := client.MsgParser.getMaxMsgLen()
if client.MaxMsgLen > maxMsgLen { if client.MaxReadMsgLen > maxMsgLen {
client.MaxMsgLen = maxMsgLen client.MaxReadMsgLen = maxMsgLen
log.Info("invalid MaxMsgLen", log.Uint32("reset", maxMsgLen)) log.Info("invalid MaxMsgLen", log.Uint32("reset", maxMsgLen))
} }
client.cons = make(ConnSet) client.cons = make(ConnSet)
client.closeFlag = false client.closeFlag = false
client.MsgParser.Init() client.MsgParser.Init()

View File

@@ -14,7 +14,8 @@ import (
type MsgParser struct { type MsgParser struct {
LenMsgLen int LenMsgLen int
MinMsgLen uint32 MinMsgLen uint32
MaxMsgLen uint32 MaxReadMsgLen uint32
MaxWriteMsgLen uint32
LittleEndian bool LittleEndian bool
bytespool.IBytesMemPool bytespool.IBytesMemPool
@@ -67,7 +68,7 @@ func (p *MsgParser) Read(r io.Reader) ([]byte, error) {
} }
// check len // check len
if msgLen > p.MaxMsgLen { if msgLen > p.MaxReadMsgLen {
return nil, errors.New("message too long") return nil, errors.New("message too long")
} else if msgLen < p.MinMsgLen { } else if msgLen < p.MinMsgLen {
return nil, errors.New("message too short") return nil, errors.New("message too short")
@@ -92,7 +93,7 @@ func (p *MsgParser) Write(conn io.Writer, args ...[]byte) error {
} }
// check len // check len
if msgLen > p.MaxMsgLen { if p.MaxWriteMsgLen > 0 && msgLen > p.MaxWriteMsgLen {
return errors.New("message too long") return errors.New("message too long")
} else if msgLen < p.MinMsgLen { } else if msgLen < p.MinMsgLen {
return errors.New("message too short") return errors.New("message too short")

View File

@@ -11,13 +11,13 @@ import (
) )
const ( const (
Default_ReadDeadline = time.Second * 30 //默认读超时30s Default_ReadDeadline = time.Second * 30 // 默认读超时30s
Default_WriteDeadline = time.Second * 30 //默认写超时30s Default_WriteDeadline = time.Second * 30 // 默认写超时30s
Default_MaxConnNum = 1000000 //默认最大连接数 Default_MaxConnNum = 1000000 // 默认最大连接数
Default_PendingWriteNum = 100000 //单连接写消息Channel容量 Default_PendingWriteNum = 100000 // 单连接写消息Channel容量
Default_MinMsgLen = 2 //最小消息长度2byte Default_MinMsgLen = 2 // 最小消息长度2byte
Default_LenMsgLen = 2 //包头字段长度占用2byte Default_LenMsgLen = 2 // 包头字段长度占用2byte
Default_MaxMsgLen = 65535 //最大消息长度 Default_MaxReadMsgLen = 65535 // 最大消息长度
) )
type TCPServer struct { type TCPServer struct {
@@ -70,14 +70,14 @@ func (server *TCPServer) init() error {
log.Info("invalid LenMsgLen", log.Int("reset", server.LenMsgLen)) log.Info("invalid LenMsgLen", log.Int("reset", server.LenMsgLen))
} }
if server.MaxMsgLen <= 0 { if server.MaxReadMsgLen <= 0 {
server.MaxMsgLen = Default_MaxMsgLen server.MaxReadMsgLen = Default_MaxReadMsgLen
log.Info("invalid MaxMsgLen", log.Uint32("reset to", server.MaxMsgLen)) log.Info("invalid MaxMsgLen", log.Uint32("reset to", server.MaxReadMsgLen))
} }
maxMsgLen := server.MsgParser.getMaxMsgLen() maxMsgLen := server.MsgParser.getMaxMsgLen()
if server.MaxMsgLen > maxMsgLen { if server.MaxReadMsgLen > maxMsgLen {
server.MaxMsgLen = maxMsgLen server.MaxReadMsgLen = maxMsgLen
log.Info("invalid MaxMsgLen", log.Uint32("reset", maxMsgLen)) log.Info("invalid MaxMsgLen", log.Uint32("reset", maxMsgLen))
} }

View File

@@ -15,16 +15,16 @@ type WSConn struct {
sync.Mutex sync.Mutex
conn *websocket.Conn conn *websocket.Conn
writeChan chan []byte writeChan chan []byte
maxMsgLen uint32 maxWriteMsgLen uint32
closeFlag bool closeFlag bool
header http.Header header http.Header
} }
func newWSConn(conn *websocket.Conn, header http.Header, pendingWriteNum int, maxMsgLen uint32, messageType int) *WSConn { func newWSConn(conn *websocket.Conn, header http.Header, pendingWriteNum int, maxWriteMsgLen uint32, messageType int) *WSConn {
wsConn := new(WSConn) wsConn := new(WSConn)
wsConn.conn = conn wsConn.conn = conn
wsConn.writeChan = make(chan []byte, pendingWriteNum) wsConn.writeChan = make(chan []byte, pendingWriteNum)
wsConn.maxMsgLen = maxMsgLen wsConn.maxWriteMsgLen = maxWriteMsgLen
wsConn.header = header wsConn.header = header
go func() { go func() {
@@ -118,7 +118,7 @@ func (wsConn *WSConn) WriteMsg(args ...[]byte) error {
} }
// check len // check len
if msgLen > wsConn.maxMsgLen { if wsConn.maxWriteMsgLen > 0 && msgLen > wsConn.maxWriteMsgLen {
return errors.New("message too long") return errors.New("message too long")
} else if msgLen < 1 { } else if msgLen < 1 {
return errors.New("message too short") return errors.New("message too short")

View File

@@ -16,7 +16,8 @@ type WSServer struct {
Addr string Addr string
MaxConnNum int MaxConnNum int
PendingWriteNum int PendingWriteNum int
MaxMsgLen uint32 MaxReadMsgLen uint32
MaxWriteMsgLen uint32
CertFile string CertFile string
KeyFile string KeyFile string
NewAgent func(*WSConn) Agent NewAgent func(*WSConn) Agent
@@ -32,7 +33,8 @@ type WSServer struct {
type WSHandler struct { type WSHandler struct {
maxConnNum int maxConnNum int
pendingWriteNum int pendingWriteNum int
maxMsgLen uint32 maxReadMsgLen uint32
maxWriteMsgLen uint32
newAgent func(*WSConn) Agent newAgent func(*WSConn) Agent
upgrader websocket.Upgrader upgrader websocket.Upgrader
conns WebsocketConnSet conns WebsocketConnSet
@@ -55,7 +57,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Error("upgrade fail", log.String("error", err.Error())) log.Error("upgrade fail", log.String("error", err.Error()))
return return
} }
conn.SetReadLimit(int64(handler.maxMsgLen)) conn.SetReadLimit(int64(handler.maxReadMsgLen))
if handler.messageType == 0 { if handler.messageType == 0 {
handler.messageType = websocket.TextMessage handler.messageType = websocket.TextMessage
} }
@@ -93,7 +95,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c.SetLinger(0) c.SetLinger(0)
c.SetNoDelay(true) c.SetNoDelay(true)
wsConn := newWSConn(conn, r.Header, handler.pendingWriteNum, handler.maxMsgLen, handler.messageType) wsConn := newWSConn(conn, r.Header, handler.pendingWriteNum, handler.maxWriteMsgLen, handler.messageType)
agent := handler.newAgent(wsConn) agent := handler.newAgent(wsConn)
agent.Run() agent.Run()
@@ -118,7 +120,6 @@ func (server *WSServer) Start() error {
log.Error("WSServer Listen fail", log.String("error", err.Error())) log.Error("WSServer Listen fail", log.String("error", err.Error()))
return err return err
} }
if server.MaxConnNum <= 0 { if server.MaxConnNum <= 0 {
server.MaxConnNum = 100 server.MaxConnNum = 100
log.Info("invalid MaxConnNum", log.Int("reset", server.MaxConnNum)) log.Info("invalid MaxConnNum", log.Int("reset", server.MaxConnNum))
@@ -127,9 +128,9 @@ func (server *WSServer) Start() error {
server.PendingWriteNum = 100 server.PendingWriteNum = 100
log.Info("invalid PendingWriteNum", log.Int("reset", server.PendingWriteNum)) log.Info("invalid PendingWriteNum", log.Int("reset", server.PendingWriteNum))
} }
if server.MaxMsgLen <= 0 { if server.MaxReadMsgLen <= 0 {
server.MaxMsgLen = 4096 server.MaxReadMsgLen = 4096
log.Info("invalid MaxMsgLen", log.Uint32("reset", server.MaxMsgLen)) log.Info("invalid MaxReadMsgLen", log.Uint32("reset", server.MaxReadMsgLen))
} }
if server.HandshakeTimeout <= 0 { if server.HandshakeTimeout <= 0 {
server.HandshakeTimeout = 15 * time.Second server.HandshakeTimeout = 15 * time.Second
@@ -167,7 +168,8 @@ func (server *WSServer) Start() error {
server.handler = &WSHandler{ server.handler = &WSHandler{
maxConnNum: server.MaxConnNum, maxConnNum: server.MaxConnNum,
pendingWriteNum: server.PendingWriteNum, pendingWriteNum: server.PendingWriteNum,
maxMsgLen: server.MaxMsgLen, maxReadMsgLen: server.MaxReadMsgLen,
maxWriteMsgLen: server.MaxWriteMsgLen,
newAgent: server.NewAgent, newAgent: server.NewAgent,
conns: make(WebsocketConnSet), conns: make(WebsocketConnSet),
messageType: server.messageType, messageType: server.messageType,

View File

@@ -127,9 +127,11 @@ func NewRClient(targetNodeId string, addr string, maxRpcParamLen uint32, compres
c.NewAgent = client.NewClientAgent c.NewAgent = client.NewClientAgent
if maxRpcParamLen > 0 { if maxRpcParamLen > 0 {
c.MaxMsgLen = maxRpcParamLen c.MaxReadMsgLen = maxRpcParamLen
c.MaxWriteMsgLen = maxRpcParamLen
} else { } else {
c.MaxMsgLen = math.MaxUint32 c.MaxReadMsgLen = math.MaxUint32
c.MaxWriteMsgLen = math.MaxUint32
} }
client.IRealClient = c client.IRealClient = c
client.CallSet = callSet client.CallSet = callSet

View File

@@ -91,9 +91,11 @@ func (server *Server) Start() error {
server.rpcServer.Addr = ":" + splitAddr[1] server.rpcServer.Addr = ":" + splitAddr[1]
server.rpcServer.MinMsgLen = 2 server.rpcServer.MinMsgLen = 2
if server.maxRpcParamLen > 0 { if server.maxRpcParamLen > 0 {
server.rpcServer.MaxMsgLen = server.maxRpcParamLen server.rpcServer.MaxReadMsgLen = server.maxRpcParamLen
server.rpcServer.MaxWriteMsgLen = server.maxRpcParamLen
} else { } else {
server.rpcServer.MaxMsgLen = math.MaxUint32 server.rpcServer.MaxReadMsgLen = math.MaxUint32
server.rpcServer.MaxWriteMsgLen = math.MaxUint32
} }
server.rpcServer.MaxConnNum = 100000 server.rpcServer.MaxConnNum = 100000

View File

@@ -19,6 +19,7 @@ type KcpModule struct {
mapClientLocker sync.RWMutex mapClientLocker sync.RWMutex
mapClient map[string]*Client mapClient map[string]*Client
process processor.IRawProcessor process processor.IRawProcessor
newClientIdHandler func() string
kcpServer network.KCPServer kcpServer network.KCPServer
kcpCfg *network.KcpCfg kcpCfg *network.KcpCfg
@@ -56,7 +57,11 @@ func (km *KcpModule) OnInit() error {
km.process.SetByteOrder(km.kcpCfg.LittleEndian) km.process.SetByteOrder(km.kcpCfg.LittleEndian)
km.kcpServer.Init(km.kcpCfg) km.kcpServer.Init(km.kcpCfg)
km.kcpServer.NewAgent = km.NewAgent km.kcpServer.NewAgent = km.NewAgent
if km.newClientIdHandler == nil {
km.newClientIdHandler = func()string{
return primitive.NewObjectID().Hex()
}
}
return nil return nil
} }
@@ -65,6 +70,10 @@ func (km *KcpModule) Init(kcpCfg *network.KcpCfg, process processor.IRawProcesso
km.process = process km.process = process
} }
func (km *KcpModule) SetNewClientIdHandler(newClientIdHandler func() string){
km.newClientIdHandler = newClientIdHandler
}
func (km *KcpModule) Start() error { func (km *KcpModule) Start() error {
return km.kcpServer.Start() return km.kcpServer.Start()
} }
@@ -77,9 +86,9 @@ func (km *KcpModule) kcpEventHandler(ev event.IEvent) {
case KPTDisConnected: case KPTDisConnected:
km.process.DisConnectedRoute(e.StringExt[0]) km.process.DisConnectedRoute(e.StringExt[0])
case KPTUnknownPack: case KPTUnknownPack:
km.process.UnknownMsgRoute(e.StringExt[0], e.Data, e.AnyExt[0].(func(data []byte))) km.process.UnknownMsgRoute(e.StringExt[0], e.Data)
case KPTPack: case KPTPack:
km.process.MsgRoute(e.StringExt[0], e.Data, e.AnyExt[0].(func(data []byte))) km.process.MsgRoute(e.StringExt[0], e.Data)
} }
event.DeleteEvent(ev) event.DeleteEvent(ev)
@@ -111,7 +120,7 @@ func (km *KcpModule) newClient(conn network.Conn) *Client {
km.mapClientLocker.Lock() km.mapClientLocker.Lock()
defer km.mapClientLocker.Unlock() defer km.mapClientLocker.Unlock()
pClient := &Client{kcpConn: conn.(*network.NetConn), id: primitive.NewObjectID().Hex()} pClient := &Client{kcpConn: conn.(*network.NetConn), id: km.newClientIdHandler()}
pClient.kcpModule = km pClient.kcpModule = km
km.mapClient[pClient.id] = pClient km.mapClient[pClient.id] = pClient

View File

@@ -20,6 +20,7 @@ type TcpModule struct {
mapClient map[string]*Client mapClient map[string]*Client
process processor.IRawProcessor process processor.IRawProcessor
tcpCfg *TcpCfg tcpCfg *TcpCfg
newClientIdHandler func() string
} }
type TcpPackType int8 type TcpPackType int8
@@ -35,6 +36,7 @@ type TcpPack struct {
Type TcpPackType //0表示连接 1表示断开 2表示数据 Type TcpPackType //0表示连接 1表示断开 2表示数据
ClientId string ClientId string
Data interface{} Data interface{}
rawData []byte
RecyclerReaderBytes func(data []byte) RecyclerReaderBytes func(data []byte)
} }
@@ -51,7 +53,8 @@ type TcpCfg struct {
LittleEndian bool //是否小端序 LittleEndian bool //是否小端序
LenMsgLen int //消息头占用byte数量只能是1byte,2byte,4byte。如果是4byte意味着消息最大可以是math.MaxUint32(4GB) LenMsgLen int //消息头占用byte数量只能是1byte,2byte,4byte。如果是4byte意味着消息最大可以是math.MaxUint32(4GB)
MinMsgLen uint32 //最小消息长度 MinMsgLen uint32 //最小消息长度
MaxMsgLen uint32 //最大消息长度,超过判定不合法,断开连接 MaxReadMsgLen uint32 //最大消息长度,超过判定不合法,断开连接
MaxWriteMsgLen uint32 // 最大写消息长度
ReadDeadlineSecond time.Duration //读超时 ReadDeadlineSecond time.Duration //读超时
WriteDeadlineSecond time.Duration //写超时 WriteDeadlineSecond time.Duration //写超时
} }
@@ -68,11 +71,17 @@ func (tm *TcpModule) OnInit() error {
tm.tcpServer.LittleEndian = tm.tcpCfg.LittleEndian tm.tcpServer.LittleEndian = tm.tcpCfg.LittleEndian
tm.tcpServer.LenMsgLen = tm.tcpCfg.LenMsgLen tm.tcpServer.LenMsgLen = tm.tcpCfg.LenMsgLen
tm.tcpServer.MinMsgLen = tm.tcpCfg.MinMsgLen tm.tcpServer.MinMsgLen = tm.tcpCfg.MinMsgLen
tm.tcpServer.MaxMsgLen = tm.tcpCfg.MaxMsgLen tm.tcpServer.MaxReadMsgLen = tm.tcpCfg.MaxReadMsgLen
tm.tcpServer.MaxWriteMsgLen = tm.tcpCfg.MaxWriteMsgLen
tm.tcpServer.ReadDeadline = tm.tcpCfg.ReadDeadlineSecond * time.Second tm.tcpServer.ReadDeadline = tm.tcpCfg.ReadDeadlineSecond * time.Second
tm.tcpServer.WriteDeadline = tm.tcpCfg.WriteDeadlineSecond * time.Second tm.tcpServer.WriteDeadline = tm.tcpCfg.WriteDeadlineSecond * time.Second
tm.mapClient = make(map[string]*Client, tm.tcpServer.MaxConnNum) tm.mapClient = make(map[string]*Client, tm.tcpServer.MaxConnNum)
tm.tcpServer.NewAgent = tm.NewClient tm.tcpServer.NewAgent = tm.NewClient
if tm.newClientIdHandler == nil {
tm.newClientIdHandler = func()string{
return primitive.NewObjectID().Hex()
}
}
//3.设置解析处理器 //3.设置解析处理器
tm.process.SetByteOrder(tm.tcpCfg.LittleEndian) tm.process.SetByteOrder(tm.tcpCfg.LittleEndian)
@@ -87,6 +96,10 @@ func (tm *TcpModule) Init(tcpCfg *TcpCfg, process processor.IRawProcessor) {
tm.process = process tm.process = process
} }
func (tm *TcpModule) SetNewClientIdHandler(newClientIdHandler func() string){
tm.newClientIdHandler = newClientIdHandler
}
func (tm *TcpModule) Start() error { func (tm *TcpModule) Start() error {
return tm.tcpServer.Start() return tm.tcpServer.Start()
} }
@@ -99,9 +112,11 @@ func (tm *TcpModule) tcpEventHandler(ev event.IEvent) {
case TPTDisConnected: case TPTDisConnected:
tm.process.DisConnectedRoute(pack.ClientId) tm.process.DisConnectedRoute(pack.ClientId)
case TPTUnknownPack: case TPTUnknownPack:
tm.process.UnknownMsgRoute(pack.ClientId, pack.Data, pack.RecyclerReaderBytes) tm.process.UnknownMsgRoute(pack.ClientId, pack.Data)
pack.RecyclerReaderBytes(pack.rawData)
case TPTPack: case TPTPack:
tm.process.MsgRoute(pack.ClientId, pack.Data, pack.RecyclerReaderBytes) tm.process.MsgRoute(pack.ClientId, pack.Data)
pack.RecyclerReaderBytes(pack.rawData)
} }
} }
@@ -109,7 +124,7 @@ func (tm *TcpModule) NewClient(conn network.Conn) network.Agent {
tm.mapClientLocker.Lock() tm.mapClientLocker.Lock()
defer tm.mapClientLocker.Unlock() defer tm.mapClientLocker.Unlock()
clientId := primitive.NewObjectID().Hex() clientId := tm.newClientIdHandler()
pClient := &Client{tcpConn: conn.(*network.NetConn), id: clientId} pClient := &Client{tcpConn: conn.(*network.NetConn), id: clientId}
pClient.tcpModule = tm pClient.tcpModule = tm
tm.mapClient[clientId] = pClient tm.mapClient[clientId] = pClient
@@ -138,10 +153,10 @@ func (slf *Client) Run() {
} }
data, err := slf.tcpModule.process.Unmarshal(slf.id, bytes) data, err := slf.tcpModule.process.Unmarshal(slf.id, bytes)
if err != nil { if err != nil {
slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTUnknownPack, Data: bytes, RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}}) slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTUnknownPack, Data: data,rawData: bytes,RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}})
continue continue
} }
slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTPack, Data: data, RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}}) slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTPack, Data: data,rawData: bytes, RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}})
} }
} }

View File

@@ -22,6 +22,7 @@ type WSModule struct {
mapClient map[string]*WSClient mapClient map[string]*WSClient
process processor.IRawProcessor process processor.IRawProcessor
wsCfg *WSCfg wsCfg *WSCfg
newClientIdHandler func() string
} }
type WSClient struct { type WSClient struct {
@@ -34,7 +35,8 @@ type WSCfg struct {
ListenAddr string ListenAddr string
MaxConnNum int MaxConnNum int
PendingWriteNum int PendingWriteNum int
MaxMsgLen uint32 MaxReadMsgLen uint32
MaxWriteMsgLen uint32
LittleEndian bool //是否小端序 LittleEndian bool //是否小端序
KeyFile string KeyFile string
CertFile string CertFile string
@@ -67,12 +69,17 @@ func (ws *WSModule) OnInit() error {
ws.WSServer.MaxConnNum = ws.wsCfg.MaxConnNum ws.WSServer.MaxConnNum = ws.wsCfg.MaxConnNum
ws.WSServer.PendingWriteNum = ws.wsCfg.PendingWriteNum ws.WSServer.PendingWriteNum = ws.wsCfg.PendingWriteNum
ws.WSServer.MaxMsgLen = ws.wsCfg.MaxMsgLen ws.WSServer.MaxReadMsgLen = ws.wsCfg.MaxReadMsgLen
ws.WSServer.MaxWriteMsgLen = ws.wsCfg.MaxWriteMsgLen
ws.WSServer.Addr = ws.wsCfg.ListenAddr ws.WSServer.Addr = ws.wsCfg.ListenAddr
ws.WSServer.HandshakeTimeout = ws.wsCfg.HandshakeTimeoutSecond*time.Second ws.WSServer.HandshakeTimeout = ws.wsCfg.HandshakeTimeoutSecond*time.Second
ws.WSServer.ReadTimeout = ws.wsCfg.ReadTimeoutSecond*time.Second ws.WSServer.ReadTimeout = ws.wsCfg.ReadTimeoutSecond*time.Second
ws.WSServer.WriteTimeout = ws.wsCfg.WriteTimeoutSecond*time.Second ws.WSServer.WriteTimeout = ws.wsCfg.WriteTimeoutSecond*time.Second
if ws.newClientIdHandler == nil {
ws.newClientIdHandler = func()string{
return primitive.NewObjectID().Hex()
}
}
if ws.wsCfg.KeyFile != "" && ws.wsCfg.CertFile != "" { if ws.wsCfg.KeyFile != "" && ws.wsCfg.CertFile != "" {
ws.WSServer.KeyFile = ws.wsCfg.KeyFile ws.WSServer.KeyFile = ws.wsCfg.KeyFile
ws.WSServer.CertFile = ws.wsCfg.CertFile ws.WSServer.CertFile = ws.wsCfg.CertFile
@@ -95,6 +102,10 @@ func (ws *WSModule) Init(wsCfg *WSCfg, process processor.IRawProcessor) {
ws.process = process ws.process = process
} }
func (ws *WSModule) SetNewClientIdHandler(newClientIdHandler func() string){
ws.newClientIdHandler = newClientIdHandler
}
func (ws *WSModule) Start() error { func (ws *WSModule) Start() error {
return ws.WSServer.Start() return ws.WSServer.Start()
} }
@@ -107,9 +118,9 @@ func (ws *WSModule) wsEventHandler(ev event.IEvent) {
case WPTDisConnected: case WPTDisConnected:
ws.process.DisConnectedRoute(pack.ClientId) ws.process.DisConnectedRoute(pack.ClientId)
case WPTUnknownPack: case WPTUnknownPack:
ws.process.UnknownMsgRoute(pack.ClientId, pack.Data, ws.recyclerReaderBytes) ws.process.UnknownMsgRoute(pack.ClientId, pack.Data)
case WPTPack: case WPTPack:
ws.process.MsgRoute(pack.ClientId, pack.Data, ws.recyclerReaderBytes) ws.process.MsgRoute(pack.ClientId, pack.Data)
} }
} }
@@ -119,8 +130,7 @@ func (ws *WSModule) recyclerReaderBytes([]byte) {
func (ws *WSModule) NewWSClient(conn *network.WSConn) network.Agent { func (ws *WSModule) NewWSClient(conn *network.WSConn) network.Agent {
ws.mapClientLocker.Lock() ws.mapClientLocker.Lock()
defer ws.mapClientLocker.Unlock() defer ws.mapClientLocker.Unlock()
pClient := &WSClient{wsConn: conn, id: ws.newClientIdHandler()}
pClient := &WSClient{wsConn: conn, id: primitive.NewObjectID().Hex()}
pClient.wsModule = ws pClient.wsModule = ws
ws.mapClient[pClient.id] = pClient ws.mapClient[pClient.id] = pClient
@@ -141,7 +151,7 @@ func (wc *WSClient) Run() {
} }
data, err := wc.wsModule.process.Unmarshal(wc.id, bytes) data, err := wc.wsModule.process.Unmarshal(wc.id, bytes)
if err != nil { if err != nil {
wc.wsModule.NotifyEvent(&event.Event{Type: event.Sys_Event_WebSocket, Data: &WSPack{ClientId: wc.id, Type: WPTUnknownPack, Data: bytes}}) wc.wsModule.NotifyEvent(&event.Event{Type: event.Sys_Event_WebSocket, Data: &WSPack{ClientId: wc.id, Type: WPTUnknownPack, Data: data}})
continue continue
} }
wc.wsModule.NotifyEvent(&event.Event{Type: event.Sys_Event_WebSocket, Data: &WSPack{ClientId: wc.id, Type: WPTPack, Data: data}}) wc.wsModule.NotifyEvent(&event.Event{Type: event.Sys_Event_WebSocket, Data: &WSPack{ClientId: wc.id, Type: WPTPack, Data: data}})
@@ -159,6 +169,22 @@ func (ws *WSModule) GetProcessor() processor.IRawProcessor {
return ws.process return ws.process
} }
func (ws *WSModule) GetClientHeader(clientId string,key string) string {
ws.mapClientLocker.Lock()
defer ws.mapClientLocker.Unlock()
pClient, ok := ws.mapClient[clientId]
if ok == false || pClient.wsConn == nil {
return ""
}
if pClient.wsConn.GetHeader() == nil {
log.Warn("clientId header is nil", log.String("clientId", clientId))
}
return pClient.wsConn.GetHeader().Get(key)
}
func (ws *WSModule) GetClientIp(clientId string) string { func (ws *WSModule) GetClientIp(clientId string) string {
ws.mapClientLocker.Lock() ws.mapClientLocker.Lock()
defer ws.mapClientLocker.Unlock() defer ws.mapClientLocker.Unlock()

View File

@@ -69,6 +69,9 @@ func (bm *Blueprint) regSysNodes() {
bm.RegisterExecNode(NewExecNode[EqualSwitch]()) bm.RegisterExecNode(NewExecNode[EqualSwitch]())
bm.RegisterExecNode(NewExecNode[Probability]()) bm.RegisterExecNode(NewExecNode[Probability]())
bm.RegisterExecNode(NewExecNode[CreateTimer]()) bm.RegisterExecNode(NewExecNode[CreateTimer]())
bm.RegisterExecNode(NewExecNode[AppendIntReturn]())
bm.RegisterExecNode(NewExecNode[AppendStringReturn]())
bm.RegisterExecNode(NewExecNode[IntInArray]())
} }
@@ -185,9 +188,12 @@ func (bm *Blueprint) Do(graphID int64, entranceID int64, args ...any) (Port_Arra
return nil, fmt.Errorf("can not find graph:%d", graphID) return nil, fmt.Errorf("can not find graph:%d", graphID)
} }
return graph.Do(entranceID, args...) clone := graph.Clone()
return clone.Do(entranceID, args...)
} }
func (bm *Blueprint) ReleaseGraph(graphID int64) { func (bm *Blueprint) ReleaseGraph(graphID int64) {
if graphID == 0 { if graphID == 0 {
return return

View File

@@ -579,3 +579,11 @@ func (en *BaseExecNode) GetBlueprintModule() IBlueprintModule {
return en.gr.IBlueprintModule return en.gr.IBlueprintModule
} }
func (en *BaseExecNode) GetAndCreateReturnPort() IPort{
if en.gr == nil {
return nil
}
return en.gr.GetAndCreateReturnPort()
}

View File

@@ -18,6 +18,7 @@ type IGraph interface {
Release() Release()
GetGraphFileName() string GetGraphFileName() string
HotReload(newBaseGraph *baseGraph) HotReload(newBaseGraph *baseGraph)
Clone() IGraph
} }
type IBlueprintModule interface { type IBlueprintModule interface {
@@ -142,6 +143,17 @@ func (gc *graphConfig) GetNodeByID(nodeID string) *nodeConfig {
return nil return nil
} }
func (gr *Graph) GetAndCreateReturnPort() IPort {
p, ok := gr.globalVariables[ReturnVarial]
if ok && p != nil {
return p
}
p = NewPortArray()
gr.globalVariables[ReturnVarial] = p
return p
}
func (gr *Graph) Do(entranceID int64, args ...any) (Port_Array, error) { func (gr *Graph) Do(entranceID int64, args ...any) (Port_Array, error) {
if IsDebug { if IsDebug {
log.Debug("Graph Do", log.String("graphName", gr.graphFileName), log.Int64("graphID", gr.graphID), log.Int64("entranceID", entranceID)) log.Debug("Graph Do", log.String("graphName", gr.graphFileName), log.Int64("graphID", gr.graphID), log.Int64("entranceID", entranceID))
@@ -157,6 +169,8 @@ func (gr *Graph) Do(entranceID int64, args ...any) (Port_Array, error) {
if gr.globalVariables == nil { if gr.globalVariables == nil {
gr.globalVariables = map[string]IPort{} gr.globalVariables = map[string]IPort{}
} else {
gr.globalVariables[ReturnVarial] = nil
} }
err := entranceNode.Do(gr, args...) err := entranceNode.Do(gr, args...)
@@ -213,6 +227,11 @@ func (gr *Graph) HotReload(newBaseGraph *baseGraph) {
gr.baseGraph = newBaseGraph gr.baseGraph = newBaseGraph
} }
func (gr *Graph) GetGraphFileName() string{ func (gr *Graph) GetGraphFileName() string {
return gr.graphFileName return gr.graphFileName
} }
func (gr *Graph) Clone() IGraph {
cloneGr := *gr
return &cloneGr
}

View File

@@ -681,7 +681,7 @@ func (em *CreateTimer) Exec() (int, error) {
array, ok := em.GetInPortArray(1) array, ok := em.GetInPortArray(1)
if !ok { if !ok {
return -1, fmt.Errorf("CreateTimer inParam 0 error") return -1, fmt.Errorf("CreateTimer inParam 1 error")
} }
var timerId uint64 var timerId uint64
@@ -718,7 +718,7 @@ func (em *CloseTimer) GetName() string {
func (em *CloseTimer) Exec() (int, error) { func (em *CloseTimer) Exec() (int, error) {
timerID, ok := em.GetInPortInt(1) timerID, ok := em.GetInPortInt(1)
if !ok { if !ok {
return -1, fmt.Errorf("CreateTimer inParam 0 error") return -1, fmt.Errorf("CreateTimer inParam 1 error")
} }
id := uint64(timerID) id := uint64(timerID)
@@ -729,3 +729,88 @@ func (em *CloseTimer) Exec() (int, error) {
return 0, nil return 0, nil
} }
// AppendIntReturn 追加返回结果(Int)
type AppendIntReturn struct {
BaseExecNode
}
func (em *AppendIntReturn) GetName() string {
return "AppendIntReturn"
}
func (em *AppendIntReturn) Exec() (int, error) {
val, ok := em.GetInPortInt(1)
if !ok {
return -1, fmt.Errorf("AppendIntReturn inParam 1 error")
}
returnPort := em.gr.GetAndCreateReturnPort()
if returnPort == nil {
return -1, fmt.Errorf("GetAndCreateReturnPort fail")
}
returnPort.AppendArrayValInt(val)
return -0, nil
}
// AppendStringReturn 追加返回结果(String)
type AppendStringReturn struct {
BaseExecNode
}
func (em *AppendStringReturn) GetName() string {
return "AppendStringReturn"
}
func (em *AppendStringReturn) Exec() (int, error) {
val, ok := em.GetInPortStr(1)
if !ok {
return -1, fmt.Errorf("AppendStringReturn inParam 1 error")
}
returnPort := em.gr.GetAndCreateReturnPort()
if returnPort == nil {
return -1, fmt.Errorf("GetAndCreateReturnPort fail")
}
returnPort.AppendArrayValStr(val)
return -0, nil
}
// IntInArray 判断整型值是否在整型数组中
type IntInArray struct {
BaseExecNode
}
func (em *IntInArray) GetName() string {
return "IntInArray"
}
func (em *IntInArray) Exec() (int, error) {
val, ok := em.GetInPortInt(1)
if !ok {
return -1, fmt.Errorf("IntInArray inParam 1 not found")
}
array, ok := em.GetInPortArray(2)
if !ok {
return -1, fmt.Errorf("IntInArray inParam 0 not found")
}
bFind := false
for i := range array {
if array[i].IntVal == val {
bFind = true
break
}
}
em.SetOutPortBool(1,bFind)
return -0, nil
}