Compare commits

..

17 Commits

Author SHA1 Message Date
orgin
7f93aa5ff9 新增对连续buff内存中指定索引位打标记函数 2022-09-27 16:49:11 +08:00
orgin
7a8d312aeb 修复WSService服务在特定服务安装顺序下设置MessageType不成功的问题 2022-09-23 10:49:44 +08:00
orgin
f931f61f7b 新增编译标记以及编译操作系统类型 2022-09-22 10:54:27 +08:00
orgin
151ed123f4 优化WSClient的MessageType 2022-09-22 10:50:24 +08:00
orgin
5a6a4c8a0d 新增编译标记以及编译操作系统类型 2022-09-22 10:37:27 +08:00
orgin
280c04a5d7 优化二分查找算法 2022-09-08 09:18:18 +08:00
orgin
1520dae223 替换ioutil包为os/io包,它在1.16开始被弃用 2022-08-17 14:28:01 +08:00
orgin
84f3429564 新增加、减、乘的运算函数 2022-08-08 15:28:21 +08:00
orgin
89fd5d273b 优化自恢复开协程函数GoRecover 2022-07-15 20:25:51 +08:00
orgin
3ce873ef04 优化获取NodeId接口 2022-07-11 15:34:28 +08:00
orgin
3763f7d848 整理优化cluster 2022-07-11 10:55:57 +08:00
orgin
769f680b17 新增服务发现事件监听 2022-07-07 13:38:38 +08:00
orgin
77988906f8 优化记录rpc的方法名称 2022-06-30 17:56:16 +08:00
orgin
ae0ba1d966 优化循环队列test 2022-06-30 10:38:05 +08:00
orgin
f61fd5d1be 新增循环队列 2022-06-30 09:50:32 +08:00
orgin
eb1867c5fd 扩展IService新增接口 2022-06-29 11:15:22 +08:00
orgin
8823d5fba4 扩展服务,新增自定义服务事件管道大小接口 2022-06-28 17:33:03 +08:00
24 changed files with 762 additions and 215 deletions

View File

@@ -46,16 +46,18 @@ type Cluster struct {
globalCfg interface{} //全局配置 globalCfg interface{} //全局配置
localServiceCfg map[string]interface{} //map[serviceName]配置数据* localServiceCfg map[string]interface{} //map[serviceName]配置数据*
mapRpc map[int]NodeRpcInfo //nodeId
serviceDiscovery IServiceDiscovery //服务发现接口 serviceDiscovery IServiceDiscovery //服务发现接口
locker sync.RWMutex //结点与服务关系保护锁 locker sync.RWMutex //结点与服务关系保护锁
mapRpc map[int]NodeRpcInfo //nodeId
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId] mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId]
rpcServer rpc.Server rpcServer rpc.Server
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁 rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
mapServiceListenRpcEvent map[string]struct{} //ServiceName mapServiceListenRpcEvent map[string]struct{} //ServiceName
mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName
} }
func GetCluster() *Cluster { func GetCluster() *Cluster {
@@ -94,9 +96,10 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
return return
} }
cls.locker.Lock() cls.locker.Lock()
defer cls.locker.Unlock()
nodeInfo, ok := cls.mapIdNode[nodeId] nodeInfo, ok := cls.mapIdNode[nodeId]
if ok == false { if ok == false {
cls.locker.Unlock()
return return
} }
@@ -112,7 +115,6 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
if rpc.client.IsConnected() { if rpc.client.IsConnected() {
nodeInfo.status = Discard nodeInfo.status = Discard
rpc.client.Unlock() rpc.client.Unlock()
cls.locker.Unlock()
log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr) log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr)
return return
} }
@@ -126,7 +128,6 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
delete(cls.mapIdNode, nodeId) delete(cls.mapIdNode, nodeId)
delete(cls.mapRpc, nodeId) delete(cls.mapRpc, nodeId)
cls.locker.Unlock()
if ok == true { if ok == true {
rpc.client.Close(false) rpc.client.Close(false)
} }
@@ -224,6 +225,9 @@ func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error
//2.安装服务发现结点 //2.安装服务发现结点
cls.SetupServiceDiscovery(localNodeId, setupServiceFun) cls.SetupServiceDiscovery(localNodeId, setupServiceFun)
service.RegRpcEventFun = cls.RegRpcEvent service.RegRpcEventFun = cls.RegRpcEvent
service.UnRegRpcEventFun = cls.UnRegRpcEvent
service.RegDiscoveryServiceEventFun = cls.RegDiscoveryEvent
service.UnRegDiscoveryServiceEventFun = cls.UnReDiscoveryEvent
err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo) err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo)
if err != nil { if err != nil {
@@ -364,6 +368,7 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int)
cls.locker.Unlock() cls.locker.Unlock()
cls.rpcEventLocker.Lock() cls.rpcEventLocker.Lock()
defer cls.rpcEventLocker.Unlock()
for serviceName, _ := range cls.mapServiceListenRpcEvent { for serviceName, _ := range cls.mapServiceListenRpcEvent {
ser := service.GetService(serviceName) ser := service.GetService(serviceName)
if ser == nil { if ser == nil {
@@ -376,7 +381,27 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int)
eventData.NodeId = nodeId eventData.NodeId = nodeId
ser.(service.IModule).NotifyEvent(&eventData) ser.(service.IModule).NotifyEvent(&eventData)
} }
cls.rpcEventLocker.Unlock() }
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId int, serviceName []string) {
cls.rpcEventLocker.Lock()
defer cls.rpcEventLocker.Unlock()
for sName, _ := range cls.mapServiceListenDiscoveryEvent {
ser := service.GetService(sName)
if ser == nil {
log.SError("cannot find service name ", serviceName)
continue
}
var eventData service.DiscoveryServiceEvent
eventData.IsDiscovery = bDiscovery
eventData.NodeId = nodeId
eventData.ServiceName = serviceName
ser.(service.IModule).NotifyEvent(&eventData)
}
} }
func (cls *Cluster) GetLocalNodeInfo() *NodeInfo { func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
@@ -399,14 +424,25 @@ func (cls *Cluster) UnRegRpcEvent(serviceName string) {
cls.rpcEventLocker.Unlock() cls.rpcEventLocker.Unlock()
} }
func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)) {
cls.locker.Lock() func (cls *Cluster) RegDiscoveryEvent(serviceName string) {
for nodeId, _ := range cls.mapIdNode { cls.rpcEventLocker.Lock()
fetchFun(nodeId) if cls.mapServiceListenDiscoveryEvent == nil {
cls.mapServiceListenDiscoveryEvent = map[string]struct{}{}
} }
cls.locker.Unlock()
cls.mapServiceListenDiscoveryEvent[serviceName] = struct{}{}
cls.rpcEventLocker.Unlock()
} }
func (cls *Cluster) UnReDiscoveryEvent(serviceName string) {
cls.rpcEventLocker.Lock()
delete(cls.mapServiceListenDiscoveryEvent, serviceName)
cls.rpcEventLocker.Unlock()
}
func HasService(nodeId int, serviceName string) bool { func HasService(nodeId int, serviceName string) bool {
cluster.locker.RLock() cluster.locker.RLock()
defer cluster.locker.RUnlock() defer cluster.locker.RUnlock()
@@ -420,6 +456,32 @@ func HasService(nodeId int, serviceName string) bool {
return false return false
} }
func GetNodeByServiceName(serviceName string) map[int]struct{} {
cluster.locker.RLock()
defer cluster.locker.RUnlock()
mapNode, ok := cluster.mapServiceNode[serviceName]
if ok == false {
return nil
}
mapNodeId := map[int]struct{}{}
for nodeId,_ := range mapNode {
mapNodeId[nodeId] = struct{}{}
}
return mapNodeId
}
func (cls *Cluster) GetGlobalCfg() interface{} { func (cls *Cluster) GetGlobalCfg() interface{} {
return cls.globalCfg return cls.globalCfg
} }
func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) {
cls.locker.RLock()
defer cls.locker.RUnlock()
nodeInfo,ok:= cls.mapIdNode[nodeId]
return nodeInfo,ok
}

View File

@@ -290,6 +290,8 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
//删除不必要的结点 //删除不必要的结点
for _, nodeId := range willDelNodeId { for _, nodeId := range willDelNodeId {
nodeInfo,_ := cluster.GetNodeInfo(int(nodeId))
cluster.TriggerDiscoveryEvent(false,int(nodeId),nodeInfo.PublicServiceList)
dc.removeMasterNode(req.MasterNodeId, int32(nodeId)) dc.removeMasterNode(req.MasterNodeId, int32(nodeId))
if dc.findNodeId(nodeId) == false { if dc.findNodeId(nodeId) == false {
dc.funDelService(int(nodeId), false) dc.funDelService(int(nodeId), false)
@@ -300,6 +302,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
for _, nodeInfo := range mapNodeInfo { for _, nodeInfo := range mapNodeInfo {
dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId) dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId)
dc.setNodeInfo(nodeInfo) dc.setNodeInfo(nodeInfo)
cluster.TriggerDiscoveryEvent(true,int(nodeInfo.NodeId),nodeInfo.PublicServiceList)
} }
return nil return nil

View File

@@ -5,7 +5,7 @@ import (
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/rpc"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"io/ioutil" "os"
"strings" "strings"
) )
@@ -18,7 +18,7 @@ type NodeInfoList struct {
func (cls *Cluster) ReadClusterConfig(filepath string) (*NodeInfoList, error) { func (cls *Cluster) ReadClusterConfig(filepath string) (*NodeInfoList, error) {
c := &NodeInfoList{} c := &NodeInfoList{}
d, err := ioutil.ReadFile(filepath) d, err := os.ReadFile(filepath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -33,7 +33,7 @@ func (cls *Cluster) ReadClusterConfig(filepath string) (*NodeInfoList, error) {
func (cls *Cluster) readServiceConfig(filepath string) (interface{}, map[string]interface{}, map[int]map[string]interface{}, error) { func (cls *Cluster) readServiceConfig(filepath string) (interface{}, map[string]interface{}, map[int]map[string]interface{}, error) {
c := map[string]interface{}{} c := map[string]interface{}{}
//读取配置 //读取配置
d, err := ioutil.ReadFile(filepath) d, err := os.ReadFile(filepath)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
@@ -69,7 +69,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo, []NodeInfo,
var nodeInfoList []NodeInfo var nodeInfoList []NodeInfo
var masterDiscoverNodeList []NodeInfo var masterDiscoverNodeList []NodeInfo
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster" clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
fileInfoList, err := ioutil.ReadDir(clusterCfgPath) fileInfoList, err := os.ReadDir(clusterCfgPath)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("Read dir %s is fail :%+v", clusterCfgPath, err) return nil, nil, fmt.Errorf("Read dir %s is fail :%+v", clusterCfgPath, err)
} }
@@ -111,7 +111,7 @@ func (cls *Cluster) readLocalClusterConfig(nodeId int) ([]NodeInfo, []NodeInfo,
func (cls *Cluster) readLocalService(localNodeId int) error { func (cls *Cluster) readLocalService(localNodeId int) error {
clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster" clusterCfgPath := strings.TrimRight(configDir, "/") + "/cluster"
fileInfoList, err := ioutil.ReadDir(clusterCfgPath) fileInfoList, err := os.ReadDir(clusterCfgPath)
if err != nil { if err != nil {
return fmt.Errorf("Read dir %s is fail :%+v", clusterCfgPath, err) return fmt.Errorf("Read dir %s is fail :%+v", clusterCfgPath, err)
} }

View File

@@ -10,7 +10,8 @@ const (
Sys_Event_Tcp EventType = -3 Sys_Event_Tcp EventType = -3
Sys_Event_Http_Event EventType = -4 Sys_Event_Http_Event EventType = -4
Sys_Event_WebSocket EventType = -5 Sys_Event_WebSocket EventType = -5
Sys_Event_Rpc_Event EventType = -6 Sys_Event_Node_Event EventType = -6
Sys_Event_DiscoverService EventType = -7
Sys_Event_User_Define EventType = 1 Sys_Event_User_Define EventType = 1
) )

View File

@@ -14,6 +14,7 @@ type WSClient struct {
ConnectInterval time.Duration ConnectInterval time.Duration
PendingWriteNum int PendingWriteNum int
MaxMsgLen uint32 MaxMsgLen uint32
MessageType int
HandshakeTimeout time.Duration HandshakeTimeout time.Duration
AutoReconnect bool AutoReconnect bool
NewAgent func(*WSConn) Agent NewAgent func(*WSConn) Agent
@@ -21,7 +22,7 @@ type WSClient struct {
cons WebsocketConnSet cons WebsocketConnSet
wg sync.WaitGroup wg sync.WaitGroup
closeFlag bool closeFlag bool
messageType int
} }
func (client *WSClient) Start() { func (client *WSClient) Start() {
@@ -63,7 +64,11 @@ func (client *WSClient) init() {
if client.cons != nil { if client.cons != nil {
log.SFatal("client is running") log.SFatal("client is running")
} }
client.messageType = websocket.TextMessage
if client.MessageType == 0 {
client.MessageType = websocket.TextMessage
}
client.cons = make(WebsocketConnSet) client.cons = make(WebsocketConnSet)
client.closeFlag = false client.closeFlag = false
client.dialer = websocket.Dialer{ client.dialer = websocket.Dialer{
@@ -84,9 +89,6 @@ func (client *WSClient) dial() *websocket.Conn {
} }
} }
func (client *WSClient) SetMessageType(messageType int){
client.messageType = messageType
}
func (client *WSClient) connect() { func (client *WSClient) connect() {
defer client.wg.Done() defer client.wg.Done()
@@ -106,7 +108,7 @@ reconnect:
client.cons[conn] = struct{}{} client.cons[conn] = struct{}{}
client.Unlock() client.Unlock()
wsConn := newWSConn(conn, client.PendingWriteNum, client.MaxMsgLen,client.messageType) wsConn := newWSConn(conn, client.PendingWriteNum, client.MaxMsgLen,client.MessageType)
agent := client.NewAgent(wsConn) agent := client.NewAgent(wsConn)
agent.Run() agent.Run()

View File

@@ -139,6 +139,7 @@ func (server *WSServer) Start() {
maxMsgLen: server.MaxMsgLen, maxMsgLen: server.MaxMsgLen,
newAgent: server.NewAgent, newAgent: server.NewAgent,
conns: make(WebsocketConnSet), conns: make(WebsocketConnSet),
messageType:server.messageType,
upgrader: websocket.Upgrader{ upgrader: websocket.Upgrader{
HandshakeTimeout: server.HTTPTimeout, HandshakeTimeout: server.HTTPTimeout,
CheckOrigin: func(_ *http.Request) bool { return true }, CheckOrigin: func(_ *http.Request) bool { return true },

View File

@@ -8,9 +8,9 @@ import (
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/profiler" "github.com/duanhf2012/origin/profiler"
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/util/timer"
"github.com/duanhf2012/origin/util/buildtime" "github.com/duanhf2012/origin/util/buildtime"
"io/ioutil" "github.com/duanhf2012/origin/util/timer"
"io"
slog "log" slog "log"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
@@ -31,6 +31,13 @@ var bValid bool
var configDir = "./config/" var configDir = "./config/"
var logLevel string = "debug" var logLevel string = "debug"
var logPath string var logPath string
type BuildOSType = int8
const(
Windows BuildOSType = 0
Linux BuildOSType = 1
Mac BuildOSType = 2
)
func init() { func init() {
@@ -107,7 +114,7 @@ func getRunProcessPid(nodeId int) (int,error) {
return 0, err return 0, err
} }
pidByte,errs := ioutil.ReadAll(f) pidByte, errs := io.ReadAll(f)
if errs != nil { if errs != nil {
return 0, errs return 0, errs
} }
@@ -274,7 +281,6 @@ func startNode(args interface{}) error{
return nil return nil
} }
func Setup(s ...service.IService) { func Setup(s ...service.IService) {
for _, sv := range s { for _, sv := range s {
sv.OnSetup(sv) sv.OnSetup(sv)

View File

@@ -15,3 +15,7 @@ func KillProcess(processId int){
fmt.Printf("kill processid %d is successful.\n",processId) fmt.Printf("kill processid %d is successful.\n",processId)
} }
} }
func GetBuildOSType() BuildOSType{
return Linux
}

View File

@@ -15,3 +15,7 @@ func KillProcess(processId int){
fmt.Printf("kill processid %d is successful.\n",processId) fmt.Printf("kill processid %d is successful.\n",processId)
} }
} }
func GetBuildOSType() BuildOSType{
return Mac
}

View File

@@ -5,3 +5,7 @@ package node
func KillProcess(processId int){ func KillProcess(processId int){
} }
func GetBuildOSType() BuildOSType{
return Windows
}

View File

@@ -68,11 +68,16 @@ type RpcHandler struct {
} }
type TriggerRpcEvent func(bConnect bool, clientSeq uint32, nodeId int) type TriggerRpcEvent func(bConnect bool, clientSeq uint32, nodeId int)
type IRpcListener interface { type INodeListener interface {
OnNodeConnected(nodeId int) OnNodeConnected(nodeId int)
OnNodeDisconnect(nodeId int) OnNodeDisconnect(nodeId int)
} }
type IDiscoveryServiceListener interface {
OnDiscoveryService(nodeId int, serviceName []string)
OnUnDiscoveryService(nodeId int, serviceName []string)
}
type IRpcHandler interface { type IRpcHandler interface {
IRpcHandlerChannel IRpcHandlerChannel
GetName() string GetName() string

View File

@@ -338,7 +338,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
pCall.rpcHandler = callerRpcHandler pCall.rpcHandler = callerRpcHandler
pCall.callback = &callback pCall.callback = &callback
pCall.Reply = reply pCall.Reply = reply
pCall.ServiceMethod = serviceMethod
client.AddPending(pCall) client.AddPending(pCall)
req.requestHandle = func(Returns interface{}, Err RpcError) { req.requestHandle = func(Returns interface{}, Err RpcError) {
v := client.RemovePending(callSeq) v := client.RemovePending(callSeq)

View File

@@ -22,18 +22,24 @@ var timerDispatcherLen = 100000
type IService interface { type IService interface {
Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{})
SetName(serviceName string) Wait()
GetName() string Start()
OnSetup(iService IService) OnSetup(iService IService)
OnInit() error OnInit() error
OnStart() OnStart()
OnRelease() OnRelease()
Wait()
Start() SetName(serviceName string)
GetName() string
GetRpcHandler() rpc.IRpcHandler GetRpcHandler() rpc.IRpcHandler
GetServiceCfg()interface{} GetServiceCfg()interface{}
OpenProfiler()
GetProfiler() *profiler.Profiler GetProfiler() *profiler.Profiler
GetServiceEventChannelNum() int
GetServiceTimerChannelNum() int
SetEventChannelNum(num int)
OpenProfiler()
} }
// eventPool的内存池,缓存Event // eventPool的内存池,缓存Event
@@ -52,7 +58,8 @@ type Service struct {
startStatus bool startStatus bool
eventProcessor event.IEventProcessor eventProcessor event.IEventProcessor
profiler *profiler.Profiler //性能分析器 profiler *profiler.Profiler //性能分析器
rpcEventLister rpc.IRpcListener nodeEventLister rpc.INodeListener
discoveryServiceLister rpc.IDiscoveryServiceListener
chanEvent chan event.IEvent chanEvent chan event.IEvent
} }
@@ -62,6 +69,13 @@ type RpcConnEvent struct{
NodeId int NodeId int
} }
// DiscoveryServiceEvent 发现服务结点
type DiscoveryServiceEvent struct{
IsDiscovery bool
ServiceName []string
NodeId int
}
func SetMaxServiceChannel(maxEventChannel int){ func SetMaxServiceChannel(maxEventChannel int){
maxServiceEventChannel = maxEventChannel maxServiceEventChannel = maxEventChannel
eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData { eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData {
@@ -69,8 +83,12 @@ func SetMaxServiceChannel(maxEventChannel int){
}) })
} }
func (rpcEventData *DiscoveryServiceEvent) GetEventType() event.EventType{
return event.Sys_Event_DiscoverService
}
func (rpcEventData *RpcConnEvent) GetEventType() event.EventType{ func (rpcEventData *RpcConnEvent) GetEventType() event.EventType{
return event.Sys_Event_Rpc_Event return event.Sys_Event_Node_Event
} }
func (s *Service) OnSetup(iService IService){ func (s *Service) OnSetup(iService IService){
@@ -88,7 +106,10 @@ func (s *Service) OpenProfiler() {
func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) { func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) {
s.dispatcher =timer.NewDispatcher(timerDispatcherLen) s.dispatcher =timer.NewDispatcher(timerDispatcherLen)
if s.chanEvent == nil {
s.chanEvent = make(chan event.IEvent,maxServiceEventChannel) s.chanEvent = make(chan event.IEvent,maxServiceEventChannel)
}
s.rpcHandler.InitRpcHandler(iService.(rpc.IRpcHandler),getClientFun,getServerFun,iService.(rpc.IRpcHandlerChannel)) s.rpcHandler.InitRpcHandler(iService.(rpc.IRpcHandler),getClientFun,getServerFun,iService.(rpc.IRpcHandlerChannel))
s.IRpcHandler = &s.rpcHandler s.IRpcHandler = &s.rpcHandler
s.self = iService.(IModule) s.self = iService.(IModule)
@@ -259,24 +280,44 @@ func (s *Service) RegRawRpc(rpcMethodId uint32,rawRpcCB rpc.RawRpcCallBack){
func (s *Service) OnStart(){ func (s *Service) OnStart(){
} }
func (s *Service) OnRpcEvent(ev event.IEvent){ func (s *Service) OnNodeEvent(ev event.IEvent){
event := ev.(*RpcConnEvent) event := ev.(*RpcConnEvent)
if event.IsConnect { if event.IsConnect {
s.rpcEventLister.OnNodeConnected(event.NodeId) s.nodeEventLister.OnNodeConnected(event.NodeId)
}else{ }else{
s.rpcEventLister.OnNodeDisconnect(event.NodeId) s.nodeEventLister.OnNodeDisconnect(event.NodeId)
} }
} }
func (s *Service) RegRpcListener(rpcEventLister rpc.IRpcListener) { func (s *Service) OnDiscoverServiceEvent(ev event.IEvent){
s.rpcEventLister = rpcEventLister event := ev.(*DiscoveryServiceEvent)
s.RegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler(),s.OnRpcEvent) if event.IsDiscovery {
s.discoveryServiceLister.OnDiscoveryService(event.NodeId,event.ServiceName)
}else{
s.discoveryServiceLister.OnUnDiscoveryService(event.NodeId,event.ServiceName)
}
}
func (s *Service) RegRpcListener(rpcEventLister rpc.INodeListener) {
s.nodeEventLister = rpcEventLister
s.RegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler(),s.OnNodeEvent)
RegRpcEventFun(s.GetName()) RegRpcEventFun(s.GetName())
} }
func (s *Service) UnRegRpcListener(rpcLister rpc.IRpcListener) { func (s *Service) UnRegRpcListener(rpcLister rpc.INodeListener) {
s.UnRegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler()) s.UnRegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler())
RegRpcEventFun(s.GetName()) UnRegRpcEventFun(s.GetName())
}
func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoveryServiceListener) {
s.discoveryServiceLister = discoveryServiceListener
s.RegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler(),s.OnDiscoverServiceEvent)
RegDiscoveryServiceEventFun(s.GetName())
}
func (s *Service) UnRegDiscoverListener(rpcLister rpc.INodeListener) {
s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler())
UnRegDiscoveryServiceEventFun(s.GetName())
} }
@@ -311,6 +352,21 @@ func (s *Service) pushEvent(ev event.IEvent) error{
return nil return nil
} }
func (s *Service) GetServiceEventChannelNum() int{
return len(s.chanEvent)
}
func (s *Service) GetServiceTimerChannelNum() int{
return len(s.dispatcher.ChanTimer)
}
func (s *Service) SetEventChannelNum(num int){
if s.chanEvent == nil {
s.chanEvent = make(chan event.IEvent,num)
}else {
panic("this stage cannot be set")
}
}
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool { func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程 //已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程

View File

@@ -5,7 +5,12 @@ var mapServiceName map[string]IService
var setupServiceList []IService var setupServiceList []IService
type RegRpcEventFunType func(serviceName string) type RegRpcEventFunType func(serviceName string)
type RegDiscoveryServiceEventFunType func(serviceName string)
var RegRpcEventFun RegRpcEventFunType var RegRpcEventFun RegRpcEventFunType
var UnRegRpcEventFun RegRpcEventFunType
var RegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
var UnRegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
func init(){ func init(){
mapServiceName = map[string]IService{} mapServiceName = map[string]IService{}

View File

@@ -4,7 +4,7 @@ import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"io/ioutil" "io"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
@@ -103,7 +103,7 @@ func (m *HttpClientModule) Request(method string, url string, body []byte, heade
} }
defer rsp.Body.Close() defer rsp.Body.Close()
ret.Body, err = ioutil.ReadAll(rsp.Body) ret.Body, err = io.ReadAll(rsp.Body)
if err != nil { if err != nil {
ret.Err = err ret.Err = err
return ret return ret

View File

@@ -8,7 +8,6 @@ import (
"github.com/duanhf2012/origin/util/uuid" "github.com/duanhf2012/origin/util/uuid"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"os" "os"
"strings" "strings"
@@ -85,7 +84,6 @@ type HttpSession struct {
sessionDone chan *HttpSession sessionDone chan *HttpSession
} }
type HttpService struct { type HttpService struct {
service.Service service.Service
@@ -496,7 +494,7 @@ func (httpService *HttpService) ServeHTTP(w http.ResponseWriter, r *http.Request
session.w = w session.w = w
defer r.Body.Close() defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body) body, err := io.ReadAll(r.Body)
if err != nil { if err != nil {
session.WriteStatusCode(http.StatusGatewayTimeout) session.WriteStatusCode(http.StatusGatewayTimeout)
session.flush() session.flush()

View File

@@ -10,7 +10,7 @@ type Element[ValueType NumberType] interface {
} }
//BiSearch 二分查找,切片必需有序号。matchUp表示是否向上范围查找。比如数列10 20 30 当value传入25时返回结果是2,表示落到3的范围 //BiSearch 二分查找,切片必需有序号。matchUp表示是否向上范围查找。比如数列10 20 30 当value传入25时返回结果是2,表示落到3的范围
func BiSearch[ValueType NumberType, T Element[ValueType]](sElement []T, value ValueType, matchUp bool) int { func BiSearch[ValueType NumberType, T Element[ValueType]](sElement []T, value ValueType, matchUp int) int {
low, high := 0, len(sElement)-1 low, high := 0, len(sElement)-1
if high == -1 { if high == -1 {
return -1 return -1
@@ -28,12 +28,29 @@ func BiSearch[ValueType NumberType, T Element[ValueType]](sElement []T, value Va
} }
} }
if matchUp == true { switch matchUp {
case 1:
if (sElement[mid].GetValue()) < value && if (sElement[mid].GetValue()) < value &&
(mid+1 < len(sElement)-1) { (mid+1 < len(sElement)-1) {
return mid + 1 return mid + 1
} }
return mid return mid
case -1:
if (sElement[mid].GetValue()) > value {
if mid - 1 < 0 {
return -1
} else {
return mid - 1
}
} else if (sElement[mid].GetValue()) < value {
if (mid+1 < len(sElement)-1) {
return mid + 1
} else {
return mid
}
} else {
return mid
}
} }
return -1 return -1

View File

@@ -0,0 +1,61 @@
package algorithms
import (
"errors"
"unsafe"
)
type BitNumber interface {
int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 | uintptr
}
type UnsignedNumber interface {
uint | uint8 | uint16 | uint32 | uint64 | uintptr
}
func getBitTagIndex[Number BitNumber, UNumber UnsignedNumber](bitBuff []Number, bitPositionIndex UNumber) (uintptr, uintptr, bool) {
sliceIndex := uintptr(bitPositionIndex) / (8 * unsafe.Sizeof(bitBuff[0]))
sliceBitIndex := uintptr(bitPositionIndex) % (8 * unsafe.Sizeof(bitBuff[0]))
//位index不能越界
if uintptr(bitPositionIndex) >= uintptr(len(bitBuff))*unsafe.Sizeof(bitBuff[0])*8 {
return 0, 0, false
}
return sliceIndex, sliceBitIndex, true
}
func setBitTagByIndex[Number BitNumber, UNumber UnsignedNumber](bitBuff []Number, bitPositionIndex UNumber, setTag bool) bool {
sliceIndex, sliceBitIndex, ret := getBitTagIndex(bitBuff, bitPositionIndex)
if ret == false {
return ret
}
if setTag {
bitBuff[sliceIndex] = bitBuff[sliceIndex] | 1<<sliceBitIndex
} else {
bitBuff[sliceIndex] = bitBuff[sliceIndex] &^ (1 << sliceBitIndex)
}
return true
}
func GetBitwiseTag[Number BitNumber, UNumber UnsignedNumber](bitBuff []Number, bitPositionIndex UNumber) (bool, error) {
sliceIndex, sliceBitIndex, ret := getBitTagIndex(bitBuff, bitPositionIndex)
if ret == false {
return false, errors.New("Invalid parameter")
}
return (bitBuff[sliceIndex] & (1 << sliceBitIndex)) > 0, nil
}
func SetBitwiseTag[Number BitNumber, UNumber UnsignedNumber](bitBuff []Number, bitPositionIndex UNumber) bool {
return setBitTagByIndex(bitBuff, bitPositionIndex, true)
}
func ClearBitwiseTag[Number BitNumber, UNumber UnsignedNumber](bitBuff []Number, bitPositionIndex UNumber) bool {
return setBitTagByIndex(bitBuff, bitPositionIndex, false)
}
func GetBitwiseNum[Number BitNumber](bitBuff []Number) int {
return len(bitBuff) * int(unsafe.Sizeof(bitBuff[0])*8)
}

View File

@@ -0,0 +1,37 @@
package algorithms
import "testing"
func Test_Bitwise(t *testing.T) {
//1.预分配10个byte切片用于存储位标识
byteBuff := make([]byte, 10)
//2.获取buff总共位数
bitNum := GetBitwiseNum(byteBuff)
t.Log(bitNum)
//3..对索引79位打标记注意是从0开始79即为最后一个位
idx := uint(79)
//4.对byteBuff索引idx位置打上标记
SetBitwiseTag(byteBuff, idx)
//5.获取索引idx位置标记
isTag, ret := GetBitwiseTag(byteBuff, idx)
t.Log("set index ", idx, " :", isTag, ret)
if isTag != true {
t.Fatal("error")
}
//6.清除掉索引idx位标记
ClearBitwiseTag(byteBuff, idx)
//7.获取索引idx位置标记
isTag, ret = GetBitwiseTag(byteBuff, idx)
t.Log("get index ", idx, " :", isTag, ret)
if isTag != false {
t.Fatal("error")
}
}

View File

@@ -6,10 +6,15 @@ go tool nm ./originserver.exe |grep buildtime
//编译传入编译时间信息 //编译传入编译时间信息
go build -ldflags "-X 'github.com/duanhf2012/origin/util/buildtime.BuildTime=20200101'" go build -ldflags "-X 'github.com/duanhf2012/origin/util/buildtime.BuildTime=20200101'"
go build -ldflags "-X github.com/duanhf2012/origin/util/buildtime.BuildTime=20200101 -X github.com/duanhf2012/origin/util/buildtime.BuildTag=debug"
*/ */
var BuildTime string var BuildTime string
var BuildTag string
func GetBuildDateTime() string { func GetBuildDateTime() string {
return BuildTime return BuildTime
} }
func GetBuildTag() string {
return BuildTag
}

View File

@@ -2,6 +2,7 @@ package coroutine
import ( import (
"fmt" "fmt"
"github.com/duanhf2012/origin/log"
"reflect" "reflect"
"runtime/debug" "runtime/debug"
) )
@@ -12,10 +13,11 @@ func F(callback interface{},recoverNum int, args ...interface{}) {
var coreInfo string var coreInfo string
coreInfo = string(debug.Stack()) coreInfo = string(debug.Stack())
coreInfo += "\n" + fmt.Sprintf("Core information is %v\n", r) coreInfo += "\n" + fmt.Sprintf("Core information is %v\n", r)
fmt.Print(coreInfo) log.SError(coreInfo)
if recoverNum > 0{
if recoverNum==-1 ||recoverNum-1 >= 0 {
recoverNum -= 1 recoverNum -= 1
}
if recoverNum == -1 || recoverNum > 0 {
go F(callback,recoverNum, args...) go F(callback,recoverNum, args...)
} }
} }

View File

@@ -1,5 +1,7 @@
package math package math
import "github.com/duanhf2012/origin/log"
type NumberType interface { type NumberType interface {
int | int8 | int16 | int32 | int64 | float32 | float64 | uint | uint8 | uint16 | uint32 | uint64 int | int8 | int16 | int32 | int64 | float32 | float64 | uint | uint8 | uint16 | uint32 | uint64
} }
@@ -35,3 +37,42 @@ func Abs[NumType SignedNumberType](Num NumType) NumType {
return Num return Num
} }
func Add[NumType NumberType](number1 NumType, number2 NumType) NumType {
ret := number1 + number2
if number2> 0 && ret < number1 {
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2)
}else if (number2<0 && ret > number1){
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2)
}
return ret
}
func Sub[NumType NumberType](number1 NumType, number2 NumType) NumType {
ret := number1 - number2
if number2> 0 && ret > number1 {
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2)
}else if (number2<0 && ret < number1){
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2)
}
return ret
}
func Mul[NumType NumberType](number1 NumType, number2 NumType) NumType {
ret := number1 * number2
if number1 == 0 || number2 == 0 {
return ret
}
if ret / number2 == number1 {
return ret
}
log.SStack("Calculation overflow , number1 is ",number1," number2 is ",number2)
return ret
}

167
util/queue/squeue.go Normal file
View File

@@ -0,0 +1,167 @@
package queue
import (
"sync"
)
/*
这是一个循环队列
*/
type SQueue[ElementType any] struct {
elements []ElementType
head int
tail int
locker sync.RWMutex
}
//游标,通过该游标获取数据
type SCursor[ElementType any] struct {
pos int
squeue *SQueue[ElementType]
}
func NewSQueue[ElementType any](maxElementNum int) *SQueue[ElementType]{
queue := &SQueue[ElementType]{}
queue.elements = make([]ElementType,maxElementNum+1)
return queue
}
//游标移动到队首
func (s *SCursor[ElementType]) First(){
s.squeue.locker.RLock()
defer s.squeue.locker.RUnlock()
s.pos = s.squeue.head
}
//从当前位置移动游标,注意如果在多协程读或者pop时可能会导致游标失效
func (s *SCursor[ElementType]) Next() (elem ElementType,ret bool){
s.squeue.locker.RLock()
defer s.squeue.locker.RUnlock()
if s.pos == s.squeue.tail {
return
}
s.pos++
s.pos = (s.pos)%(len(s.squeue.elements))
return s.squeue.elements[s.pos],true
}
//获取队列元数个数
func (s *SQueue[ElementType]) Len() int {
s.locker.RLock()
defer s.locker.RUnlock()
return s.len()
}
func (s *SQueue[ElementType]) len() int {
if s.head <= s.tail {
return s.tail - s.head
}
//(len(s.elements)-1-s.head)+(s.tail+1)
return len(s.elements)-s.head+s.tail
}
//获取游标,默认是队首
func (s *SQueue[ElementType]) GetCursor() (cur SCursor[ElementType]){
s.locker.RLock()
defer s.locker.RUnlock()
cur.squeue = s
cur.pos = s.head
return
}
//获取指定位置的游标
func (s *SQueue[ElementType]) GetPosCursor(pos int) (cur SCursor[ElementType],ret bool){
s.locker.RLock()
defer s.locker.RUnlock()
if s.head < s.tail {
if pos<=s.head || pos>s.tail{
return
}
ret = true
cur.squeue = s
cur.pos = pos
return
}
if pos >s.tail && pos <=s.head {
return
}
cur.squeue = s
cur.pos = pos
return
}
//从队首移除掉指定数量元素
func (s *SQueue[ElementType]) RemoveElement(elementNum int) (removeNum int) {
s.locker.Lock()
defer s.locker.Unlock()
lens := s.len()
if elementNum > lens{
removeNum = lens
}else{
removeNum = elementNum
}
s.head = (s.head + removeNum)%len(s.elements)
return
}
//从队首Pop元素
func (s *SQueue[ElementType]) Pop() (elem ElementType,ret bool){
s.locker.Lock()
defer s.locker.Unlock()
if s.head == s.tail {
return
}
s.head++
s.head = s.head%len(s.elements)
return s.elements[s.head],true
}
//从队尾Push数据
func (s *SQueue[ElementType]) Push(elem ElementType) bool {
s.locker.Lock()
defer s.locker.Unlock()
nextPos := (s.tail+1) % len(s.elements)
if nextPos == s.head {
//is full
return false
}
s.tail = nextPos
s.elements[s.tail] = elem
return true
}
//判断队列是否为空
func (s *SQueue[ElementType]) IsEmpty() bool{
s.locker.RLock()
defer s.locker.RUnlock()
return s.head == s.tail
}
//判断队列是否已满
func (s *SQueue[ElementType]) IsFull() bool{
s.locker.RLock()
defer s.locker.RUnlock()
nextPos := (s.tail+1) % len(s.elements)
return nextPos == s.head
}

View File

@@ -0,0 +1,66 @@
package queue
import (
"testing"
)
func Test_Example(t *testing.T) {
//1.创建阶列
queue := NewSQueue[int](5)
//2.判断是否为空
t.Log("is empty :", queue.IsEmpty())
t.Log("is full :", queue.IsFull())
//3.游标使用,打印所有数据
cursor := queue.GetCursor()
cursor.First()
for {
elem, ret := cursor.Next()
if ret == false {
break
}
t.Log("elem:", elem)
}
//4.push数据塞满队列
for i := 0; i < 6; i++ {
t.Log("push:", queue.Push(i))
}
t.Log("is empty :", queue.IsEmpty())
t.Log("is full :", queue.IsFull())
//5.使用游标遍历所有数据
cursor.First()
for {
elem, ret := cursor.Next()
if ret == false {
break
}
t.Log("elem:", elem)
}
//6.删除2个元素
removeNum := queue.RemoveElement(2)
t.Log("Remove Num:", removeNum)
//7.游标遍历
cursor.First()
for {
elem, ret := cursor.Next()
if ret == false {
break
}
t.Log("elem:", elem)
}
//8.pop数据所有
for i := 0; i < 6; i++ {
elem, ret := queue.Pop()
t.Log("pop:", elem, "-", ret, " len:", queue.Len())
}
t.Log("is empty :", queue.IsEmpty())
t.Log("is full :", queue.IsFull())
}