mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-07 00:48:11 +08:00
Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ce873ef04 | ||
|
|
3763f7d848 | ||
|
|
769f680b17 | ||
|
|
77988906f8 | ||
|
|
ae0ba1d966 | ||
|
|
f61fd5d1be | ||
|
|
eb1867c5fd | ||
|
|
8823d5fba4 | ||
|
|
9945c29a5c | ||
|
|
173d84f7e6 | ||
|
|
d687780517 | ||
|
|
a8a030c0f5 | ||
|
|
f8469ea10e | ||
|
|
be3daf19f9 | ||
|
|
aa91c7bf1b | ||
|
|
7fe73e55fb | ||
|
|
e5ceaa9e76 | ||
|
|
97c55ada71 | ||
|
|
776b234022 | ||
|
|
a4f425bd69 | ||
|
|
ee54862be2 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -10,3 +10,4 @@
|
||||
|
||||
# Output of the go coverage tool, specifically when used with LiteIDE
|
||||
*.out
|
||||
.DS_Store
|
||||
|
||||
@@ -46,16 +46,18 @@ type Cluster struct {
|
||||
globalCfg interface{} //全局配置
|
||||
|
||||
localServiceCfg map[string]interface{} //map[serviceName]配置数据*
|
||||
mapRpc map[int]NodeRpcInfo //nodeId
|
||||
serviceDiscovery IServiceDiscovery //服务发现接口
|
||||
|
||||
|
||||
locker sync.RWMutex //结点与服务关系保护锁
|
||||
mapRpc map[int]NodeRpcInfo //nodeId
|
||||
mapIdNode map[int]NodeInfo //map[NodeId]NodeInfo
|
||||
mapServiceNode map[string]map[int]struct{} //map[serviceName]map[NodeId]
|
||||
|
||||
rpcServer rpc.Server
|
||||
rpcEventLocker sync.RWMutex //Rpc事件监听保护锁
|
||||
mapServiceListenRpcEvent map[string]struct{} //ServiceName
|
||||
mapServiceListenDiscoveryEvent map[string]struct{} //ServiceName
|
||||
}
|
||||
|
||||
func GetCluster() *Cluster {
|
||||
@@ -94,9 +96,10 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
|
||||
return
|
||||
}
|
||||
cls.locker.Lock()
|
||||
defer cls.locker.Unlock()
|
||||
|
||||
nodeInfo, ok := cls.mapIdNode[nodeId]
|
||||
if ok == false {
|
||||
cls.locker.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -112,7 +115,6 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
|
||||
if rpc.client.IsConnected() {
|
||||
nodeInfo.status = Discard
|
||||
rpc.client.Unlock()
|
||||
cls.locker.Unlock()
|
||||
log.SRelease("Discard node ", nodeInfo.NodeId, " ", nodeInfo.ListenAddr)
|
||||
return
|
||||
}
|
||||
@@ -126,7 +128,6 @@ func (cls *Cluster) DelNode(nodeId int, immediately bool) {
|
||||
|
||||
delete(cls.mapIdNode, nodeId)
|
||||
delete(cls.mapRpc, nodeId)
|
||||
cls.locker.Unlock()
|
||||
if ok == true {
|
||||
rpc.client.Close(false)
|
||||
}
|
||||
@@ -224,6 +225,9 @@ func (cls *Cluster) Init(localNodeId int, setupServiceFun SetupServiceFun) error
|
||||
//2.安装服务发现结点
|
||||
cls.SetupServiceDiscovery(localNodeId, setupServiceFun)
|
||||
service.RegRpcEventFun = cls.RegRpcEvent
|
||||
service.UnRegRpcEventFun = cls.UnRegRpcEvent
|
||||
service.RegDiscoveryServiceEventFun = cls.RegDiscoveryEvent
|
||||
service.UnRegDiscoveryServiceEventFun = cls.UnReDiscoveryEvent
|
||||
|
||||
err = cls.serviceDiscovery.InitDiscovery(localNodeId, cls.serviceDiscoveryDelNode, cls.serviceDiscoverySetNodeInfo)
|
||||
if err != nil {
|
||||
@@ -364,6 +368,7 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int)
|
||||
cls.locker.Unlock()
|
||||
|
||||
cls.rpcEventLocker.Lock()
|
||||
defer cls.rpcEventLocker.Unlock()
|
||||
for serviceName, _ := range cls.mapServiceListenRpcEvent {
|
||||
ser := service.GetService(serviceName)
|
||||
if ser == nil {
|
||||
@@ -376,7 +381,27 @@ func (cls *Cluster) triggerRpcEvent(bConnect bool, clientSeq uint32, nodeId int)
|
||||
eventData.NodeId = nodeId
|
||||
ser.(service.IModule).NotifyEvent(&eventData)
|
||||
}
|
||||
cls.rpcEventLocker.Unlock()
|
||||
}
|
||||
|
||||
|
||||
func (cls *Cluster) TriggerDiscoveryEvent(bDiscovery bool, nodeId int, serviceName []string) {
|
||||
cls.rpcEventLocker.Lock()
|
||||
defer cls.rpcEventLocker.Unlock()
|
||||
|
||||
for sName, _ := range cls.mapServiceListenDiscoveryEvent {
|
||||
ser := service.GetService(sName)
|
||||
if ser == nil {
|
||||
log.SError("cannot find service name ", serviceName)
|
||||
continue
|
||||
}
|
||||
|
||||
var eventData service.DiscoveryServiceEvent
|
||||
eventData.IsDiscovery = bDiscovery
|
||||
eventData.NodeId = nodeId
|
||||
eventData.ServiceName = serviceName
|
||||
ser.(service.IModule).NotifyEvent(&eventData)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (cls *Cluster) GetLocalNodeInfo() *NodeInfo {
|
||||
@@ -399,14 +424,25 @@ func (cls *Cluster) UnRegRpcEvent(serviceName string) {
|
||||
cls.rpcEventLocker.Unlock()
|
||||
}
|
||||
|
||||
func (cls *Cluster) FetchAllNodeId(fetchFun func(nodeId int)) {
|
||||
cls.locker.Lock()
|
||||
for nodeId, _ := range cls.mapIdNode {
|
||||
fetchFun(nodeId)
|
||||
|
||||
func (cls *Cluster) RegDiscoveryEvent(serviceName string) {
|
||||
cls.rpcEventLocker.Lock()
|
||||
if cls.mapServiceListenDiscoveryEvent == nil {
|
||||
cls.mapServiceListenDiscoveryEvent = map[string]struct{}{}
|
||||
}
|
||||
cls.locker.Unlock()
|
||||
|
||||
cls.mapServiceListenDiscoveryEvent[serviceName] = struct{}{}
|
||||
cls.rpcEventLocker.Unlock()
|
||||
}
|
||||
|
||||
func (cls *Cluster) UnReDiscoveryEvent(serviceName string) {
|
||||
cls.rpcEventLocker.Lock()
|
||||
delete(cls.mapServiceListenDiscoveryEvent, serviceName)
|
||||
cls.rpcEventLocker.Unlock()
|
||||
}
|
||||
|
||||
|
||||
|
||||
func HasService(nodeId int, serviceName string) bool {
|
||||
cluster.locker.RLock()
|
||||
defer cluster.locker.RUnlock()
|
||||
@@ -420,6 +456,32 @@ func HasService(nodeId int, serviceName string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func GetNodeByServiceName(serviceName string) map[int]struct{} {
|
||||
cluster.locker.RLock()
|
||||
defer cluster.locker.RUnlock()
|
||||
|
||||
mapNode, ok := cluster.mapServiceNode[serviceName]
|
||||
if ok == false {
|
||||
return nil
|
||||
}
|
||||
|
||||
mapNodeId := map[int]struct{}{}
|
||||
for nodeId,_ := range mapNode {
|
||||
mapNodeId[nodeId] = struct{}{}
|
||||
}
|
||||
|
||||
return mapNodeId
|
||||
}
|
||||
|
||||
func (cls *Cluster) GetGlobalCfg() interface{} {
|
||||
return cls.globalCfg
|
||||
}
|
||||
|
||||
|
||||
func (cls *Cluster) GetNodeInfo(nodeId int) (NodeInfo,bool) {
|
||||
cls.locker.RLock()
|
||||
defer cls.locker.RUnlock()
|
||||
|
||||
nodeInfo,ok:= cls.mapIdNode[nodeId]
|
||||
return nodeInfo,ok
|
||||
}
|
||||
|
||||
@@ -290,6 +290,8 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
|
||||
|
||||
//删除不必要的结点
|
||||
for _, nodeId := range willDelNodeId {
|
||||
nodeInfo,_ := cluster.GetNodeInfo(int(nodeId))
|
||||
cluster.TriggerDiscoveryEvent(false,int(nodeId),nodeInfo.PublicServiceList)
|
||||
dc.removeMasterNode(req.MasterNodeId, int32(nodeId))
|
||||
if dc.findNodeId(nodeId) == false {
|
||||
dc.funDelService(int(nodeId), false)
|
||||
@@ -300,6 +302,7 @@ func (dc *DynamicDiscoveryClient) RPC_SubServiceDiscover(req *rpc.SubscribeDisco
|
||||
for _, nodeInfo := range mapNodeInfo {
|
||||
dc.addMasterNode(req.MasterNodeId, nodeInfo.NodeId)
|
||||
dc.setNodeInfo(nodeInfo)
|
||||
cluster.TriggerDiscoveryEvent(true,int(nodeInfo.NodeId),nodeInfo.PublicServiceList)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -244,15 +244,6 @@ func (cls *Cluster) GetNodeIdByService(serviceName string, rpcClientList []*rpc.
|
||||
return nil, count
|
||||
}
|
||||
|
||||
func (cls *Cluster) getServiceCfg(serviceName string) interface{} {
|
||||
v, ok := cls.localServiceCfg[serviceName]
|
||||
if ok == false {
|
||||
return nil
|
||||
}
|
||||
|
||||
return v
|
||||
}
|
||||
|
||||
func (cls *Cluster) GetServiceCfg(serviceName string) interface{} {
|
||||
serviceCfg, ok := cls.localServiceCfg[serviceName]
|
||||
if ok == false {
|
||||
|
||||
@@ -9,8 +9,9 @@ const (
|
||||
|
||||
Sys_Event_Tcp EventType = -3
|
||||
Sys_Event_Http_Event EventType = -4
|
||||
Sys_Event_WebSocket EventType = -5
|
||||
Sys_Event_Rpc_Event EventType = -6
|
||||
Sys_Event_WebSocket EventType = -5
|
||||
Sys_Event_Node_Event EventType = -6
|
||||
Sys_Event_DiscoverService EventType = -7
|
||||
|
||||
Sys_Event_User_Define EventType = 1
|
||||
)
|
||||
|
||||
31
go.mod
31
go.mod
@@ -1,13 +1,30 @@
|
||||
module github.com/duanhf2012/origin
|
||||
|
||||
go 1.17
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/go-sql-driver/mysql v1.5.0
|
||||
github.com/golang/protobuf v1.4.3
|
||||
github.com/gomodule/redigo v1.8.3
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/json-iterator/go v1.1.10
|
||||
google.golang.org/protobuf v1.25.0
|
||||
github.com/go-sql-driver/mysql v1.6.0
|
||||
github.com/gogo/protobuf v1.3.2
|
||||
github.com/gomodule/redigo v1.8.8
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/json-iterator/go v1.1.12
|
||||
go.mongodb.org/mongo-driver v1.9.1
|
||||
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/go-stack/stack v1.8.0 // indirect
|
||||
github.com/golang/snappy v0.0.1 // indirect
|
||||
github.com/klauspost/compress v1.13.6 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
||||
github.com/xdg-go/scram v1.0.2 // indirect
|
||||
github.com/xdg-go/stringprep v1.0.2 // indirect
|
||||
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
|
||||
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f // indirect
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 // indirect
|
||||
golang.org/x/text v0.3.6 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
)
|
||||
|
||||
159
go.sum
159
go.sum
@@ -1,95 +1,102 @@
|
||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
|
||||
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
|
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
|
||||
github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
|
||||
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||
github.com/gomodule/redigo v1.8.3 h1:HR0kYDX2RJZvAup8CsiJwxB4dTCSC0AaUq6S4SiLwUc=
|
||||
github.com/gomodule/redigo v1.8.3/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
|
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
|
||||
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
|
||||
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
|
||||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/gomodule/redigo v1.8.8 h1:f6cXq6RRfiyrOJEV7p3JhLDlmawGBVBBP1MggY8Mo4E=
|
||||
github.com/gomodule/redigo v1.8.8/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
|
||||
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
|
||||
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
|
||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
|
||||
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
|
||||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
|
||||
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
|
||||
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
|
||||
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
|
||||
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
|
||||
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
|
||||
github.com/xdg-go/scram v1.0.2 h1:akYIkZ28e6A96dkWNJQu3nmCzH3YfwMPQExUYDaRv7w=
|
||||
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
|
||||
github.com/xdg-go/stringprep v1.0.2 h1:6iq84/ryjjeRmMJwxutI51F2GIPlP5BfTvXHeYjyhBc=
|
||||
github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
|
||||
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
|
||||
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
go.mongodb.org/mongo-driver v1.9.1 h1:m078y9v7sBItkt1aaoe2YlvWEXcD263e1a4E1fBrJ1c=
|
||||
go.mongodb.org/mongo-driver v1.9.1/go.mod h1:0sQWfOeY63QTntERDJJ/0SuKK0T1uVSgKCuAROlKEPY=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f h1:aZp0e2vLN4MToVqnjNEYEtrEA8RH8U8FN1CU7JgqsPU=
|
||||
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
|
||||
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
|
||||
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
|
||||
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
|
||||
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"github.com/duanhf2012/origin/network"
|
||||
"reflect"
|
||||
"github.com/duanhf2012/origin/log"
|
||||
)
|
||||
|
||||
type MessageJsonInfo struct {
|
||||
@@ -44,18 +45,18 @@ func (jsonProcessor *JsonProcessor) SetByteOrder(littleEndian bool) {
|
||||
}
|
||||
|
||||
// must goroutine safe
|
||||
func (jsonProcessor *JsonProcessor ) MsgRoute(msg interface{},userdata interface{}) error{
|
||||
func (jsonProcessor *JsonProcessor ) MsgRoute(clientId uint64,msg interface{}) error{
|
||||
pPackInfo := msg.(*JsonPackInfo)
|
||||
v,ok := jsonProcessor.mapMsg[pPackInfo.typ]
|
||||
if ok == false {
|
||||
return fmt.Errorf("cannot find msgtype %d is register!",pPackInfo.typ)
|
||||
}
|
||||
|
||||
v.msgHandler(userdata.(uint64),pPackInfo.msg)
|
||||
v.msgHandler(clientId,pPackInfo.msg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (jsonProcessor *JsonProcessor) Unmarshal(data []byte) (interface{}, error) {
|
||||
func (jsonProcessor *JsonProcessor) Unmarshal(clientId uint64,data []byte) (interface{}, error) {
|
||||
typeStruct := struct {Type int `json:"typ"`}{}
|
||||
defer jsonProcessor.ReleaseByteSlice(data)
|
||||
err := json.Unmarshal(data, &typeStruct)
|
||||
@@ -78,7 +79,7 @@ func (jsonProcessor *JsonProcessor) Unmarshal(data []byte) (interface{}, error)
|
||||
return &JsonPackInfo{typ:msgType,msg:msgData},nil
|
||||
}
|
||||
|
||||
func (jsonProcessor *JsonProcessor) Marshal(msg interface{}) ([]byte, error) {
|
||||
func (jsonProcessor *JsonProcessor) Marshal(clientId uint64,msg interface{}) ([]byte, error) {
|
||||
rawMsg,err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return nil,err
|
||||
@@ -103,16 +104,26 @@ func (jsonProcessor *JsonProcessor) MakeRawMsg(msgType uint16,msg []byte) *JsonP
|
||||
return &JsonPackInfo{typ:msgType,rawMsg:msg}
|
||||
}
|
||||
|
||||
func (jsonProcessor *JsonProcessor) UnknownMsgRoute(msg interface{}, userData interface{}){
|
||||
jsonProcessor.unknownMessageHandler(userData.(uint64),msg.([]byte))
|
||||
func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){
|
||||
if jsonProcessor.unknownMessageHandler==nil {
|
||||
log.SDebug("Unknown message received from ",clientId)
|
||||
return
|
||||
}
|
||||
|
||||
jsonProcessor.unknownMessageHandler(clientId,msg.([]byte))
|
||||
|
||||
}
|
||||
|
||||
func (jsonProcessor *JsonProcessor) ConnectedRoute(userData interface{}){
|
||||
jsonProcessor.connectHandler(userData.(uint64))
|
||||
func (jsonProcessor *JsonProcessor) ConnectedRoute(clientId uint64){
|
||||
if jsonProcessor.connectHandler != nil {
|
||||
jsonProcessor.connectHandler(clientId)
|
||||
}
|
||||
}
|
||||
|
||||
func (jsonProcessor *JsonProcessor) DisConnectedRoute(userData interface{}){
|
||||
jsonProcessor.disconnectHandler(userData.(uint64))
|
||||
func (jsonProcessor *JsonProcessor) DisConnectedRoute(clientId uint64){
|
||||
if jsonProcessor.disconnectHandler != nil {
|
||||
jsonProcessor.disconnectHandler(clientId)
|
||||
}
|
||||
}
|
||||
|
||||
func (jsonProcessor *JsonProcessor) RegisterUnknownMsg(unknownMessageHandler UnknownMessageJsonHandler){
|
||||
|
||||
@@ -21,6 +21,7 @@ type WSClient struct {
|
||||
cons WebsocketConnSet
|
||||
wg sync.WaitGroup
|
||||
closeFlag bool
|
||||
messageType int
|
||||
}
|
||||
|
||||
func (client *WSClient) Start() {
|
||||
@@ -62,7 +63,7 @@ func (client *WSClient) init() {
|
||||
if client.cons != nil {
|
||||
log.SFatal("client is running")
|
||||
}
|
||||
|
||||
client.messageType = websocket.TextMessage
|
||||
client.cons = make(WebsocketConnSet)
|
||||
client.closeFlag = false
|
||||
client.dialer = websocket.Dialer{
|
||||
@@ -83,6 +84,9 @@ func (client *WSClient) dial() *websocket.Conn {
|
||||
}
|
||||
}
|
||||
|
||||
func (client *WSClient) SetMessageType(messageType int){
|
||||
client.messageType = messageType
|
||||
}
|
||||
func (client *WSClient) connect() {
|
||||
defer client.wg.Done()
|
||||
|
||||
@@ -102,7 +106,7 @@ reconnect:
|
||||
client.cons[conn] = struct{}{}
|
||||
client.Unlock()
|
||||
|
||||
wsConn := newWSConn(conn, client.PendingWriteNum, client.MaxMsgLen)
|
||||
wsConn := newWSConn(conn, client.PendingWriteNum, client.MaxMsgLen,client.messageType)
|
||||
agent := client.NewAgent(wsConn)
|
||||
agent.Run()
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ type WSConn struct {
|
||||
closeFlag bool
|
||||
}
|
||||
|
||||
func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32) *WSConn {
|
||||
func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32,messageType int) *WSConn {
|
||||
wsConn := new(WSConn)
|
||||
wsConn.conn = conn
|
||||
wsConn.writeChan = make(chan []byte, pendingWriteNum)
|
||||
@@ -30,7 +30,7 @@ func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32) *WSC
|
||||
break
|
||||
}
|
||||
|
||||
err := conn.WriteMessage(websocket.BinaryMessage, b)
|
||||
err := conn.WriteMessage(messageType, b)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ type WSServer struct {
|
||||
NewAgent func(*WSConn) Agent
|
||||
ln net.Listener
|
||||
handler *WSHandler
|
||||
messageType int
|
||||
}
|
||||
|
||||
type WSHandler struct {
|
||||
@@ -32,6 +33,11 @@ type WSHandler struct {
|
||||
conns WebsocketConnSet
|
||||
mutexConns sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
messageType int
|
||||
}
|
||||
|
||||
func (handler *WSHandler) SetMessageType(messageType int) {
|
||||
handler.messageType = messageType
|
||||
}
|
||||
|
||||
func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -45,6 +51,9 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
conn.SetReadLimit(int64(handler.maxMsgLen))
|
||||
if handler.messageType == 0 {
|
||||
handler.messageType = websocket.TextMessage
|
||||
}
|
||||
|
||||
handler.wg.Add(1)
|
||||
defer handler.wg.Done()
|
||||
@@ -64,7 +73,7 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
handler.conns[conn] = struct{}{}
|
||||
handler.mutexConns.Unlock()
|
||||
|
||||
wsConn := newWSConn(conn, handler.pendingWriteNum, handler.maxMsgLen)
|
||||
wsConn := newWSConn(conn, handler.pendingWriteNum, handler.maxMsgLen, handler.messageType)
|
||||
agent := handler.newAgent(wsConn)
|
||||
agent.Run()
|
||||
|
||||
@@ -76,6 +85,13 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
agent.OnClose()
|
||||
}
|
||||
|
||||
func (server *WSServer) SetMessageType(messageType int) {
|
||||
server.messageType = messageType
|
||||
if server.handler != nil {
|
||||
server.handler.SetMessageType(messageType)
|
||||
}
|
||||
}
|
||||
|
||||
func (server *WSServer) Start() {
|
||||
ln, err := net.Listen("tcp", server.Addr)
|
||||
if err != nil {
|
||||
|
||||
@@ -68,11 +68,16 @@ type RpcHandler struct {
|
||||
}
|
||||
|
||||
type TriggerRpcEvent func(bConnect bool, clientSeq uint32, nodeId int)
|
||||
type IRpcListener interface {
|
||||
type INodeListener interface {
|
||||
OnNodeConnected(nodeId int)
|
||||
OnNodeDisconnect(nodeId int)
|
||||
}
|
||||
|
||||
type IDiscoveryServiceListener interface {
|
||||
OnDiscoveryService(nodeId int, serviceName []string)
|
||||
OnUnDiscoveryService(nodeId int, serviceName []string)
|
||||
}
|
||||
|
||||
type IRpcHandler interface {
|
||||
IRpcHandlerChannel
|
||||
GetName() string
|
||||
|
||||
@@ -338,7 +338,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client, callerRpcHandler
|
||||
pCall.rpcHandler = callerRpcHandler
|
||||
pCall.callback = &callback
|
||||
pCall.Reply = reply
|
||||
|
||||
pCall.ServiceMethod = serviceMethod
|
||||
client.AddPending(pCall)
|
||||
req.requestHandle = func(Returns interface{}, Err RpcError) {
|
||||
v := client.RemovePending(callSeq)
|
||||
|
||||
@@ -2,31 +2,33 @@ package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/duanhf2012/origin/event"
|
||||
"github.com/duanhf2012/origin/log"
|
||||
rpcHandle "github.com/duanhf2012/origin/rpc"
|
||||
"github.com/duanhf2012/origin/util/timer"
|
||||
"reflect"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
const InitModuleId = 1e9
|
||||
|
||||
type IModule interface {
|
||||
SetModuleId(moduleId uint32) bool
|
||||
GetModuleId() uint32
|
||||
AddModule(module IModule) (uint32,error)
|
||||
AddModule(module IModule) (uint32, error)
|
||||
GetModule(moduleId uint32) IModule
|
||||
GetAncestor()IModule
|
||||
GetAncestor() IModule
|
||||
ReleaseModule(moduleId uint32)
|
||||
NewModuleId() uint32
|
||||
GetParent()IModule
|
||||
GetParent() IModule
|
||||
OnInit() error
|
||||
OnRelease()
|
||||
getBaseModule() IModule
|
||||
GetService() IService
|
||||
GetModuleName() string
|
||||
GetEventProcessor()event.IEventProcessor
|
||||
GetEventProcessor() event.IEventProcessor
|
||||
NotifyEvent(ev event.IEvent)
|
||||
}
|
||||
|
||||
@@ -38,25 +40,25 @@ type IModuleTimer interface {
|
||||
|
||||
type Module struct {
|
||||
rpcHandle.IRpcHandler
|
||||
moduleId uint32 //模块Id
|
||||
moduleName string //模块名称
|
||||
parent IModule //父亲
|
||||
self IModule //自己
|
||||
child map[uint32]IModule //孩子们
|
||||
mapActiveTimer map[timer.ITimer]struct{}
|
||||
moduleId uint32 //模块Id
|
||||
moduleName string //模块名称
|
||||
parent IModule //父亲
|
||||
self IModule //自己
|
||||
child map[uint32]IModule //孩子们
|
||||
mapActiveTimer map[timer.ITimer]struct{}
|
||||
mapActiveIdTimer map[uint64]timer.ITimer
|
||||
dispatcher *timer.Dispatcher //timer
|
||||
dispatcher *timer.Dispatcher //timer
|
||||
|
||||
//根结点
|
||||
ancestor IModule //始祖
|
||||
seedModuleId uint32 //模块id种子
|
||||
descendants map[uint32]IModule //始祖的后裔们
|
||||
ancestor IModule //始祖
|
||||
seedModuleId uint32 //模块id种子
|
||||
descendants map[uint32]IModule //始祖的后裔们
|
||||
|
||||
//事件管道
|
||||
eventHandler event.IEventHandler
|
||||
}
|
||||
|
||||
func (m *Module) SetModuleId(moduleId uint32) bool{
|
||||
func (m *Module) SetModuleId(moduleId uint32) bool {
|
||||
if m.moduleId > 0 {
|
||||
return false
|
||||
}
|
||||
@@ -65,35 +67,35 @@ func (m *Module) SetModuleId(moduleId uint32) bool{
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *Module) GetModuleId() uint32{
|
||||
func (m *Module) GetModuleId() uint32 {
|
||||
return m.moduleId
|
||||
}
|
||||
|
||||
func (m *Module) GetModuleName() string{
|
||||
func (m *Module) GetModuleName() string {
|
||||
return m.moduleName
|
||||
}
|
||||
|
||||
func (m *Module) OnInit() error{
|
||||
return nil
|
||||
func (m *Module) OnInit() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Module) AddModule(module IModule) (uint32,error){
|
||||
func (m *Module) AddModule(module IModule) (uint32, error) {
|
||||
//没有事件处理器不允许加入其他模块
|
||||
if m.GetEventProcessor() == nil {
|
||||
return 0,fmt.Errorf("module %+v Event Processor is nil", m.self)
|
||||
return 0, fmt.Errorf("module %+v Event Processor is nil", m.self)
|
||||
}
|
||||
|
||||
pAddModule := module.getBaseModule().(*Module)
|
||||
if pAddModule.GetModuleId()==0 {
|
||||
if pAddModule.GetModuleId() == 0 {
|
||||
pAddModule.moduleId = m.NewModuleId()
|
||||
}
|
||||
|
||||
if m.child == nil {
|
||||
m.child = map[uint32]IModule{}
|
||||
}
|
||||
_,ok := m.child[module.GetModuleId()]
|
||||
_, ok := m.child[module.GetModuleId()]
|
||||
if ok == true {
|
||||
return 0,fmt.Errorf("exists module id %d",module.GetModuleId())
|
||||
return 0, fmt.Errorf("exists module id %d", module.GetModuleId())
|
||||
}
|
||||
pAddModule.IRpcHandler = m.IRpcHandler
|
||||
pAddModule.self = module
|
||||
@@ -105,17 +107,17 @@ func (m *Module) AddModule(module IModule) (uint32,error){
|
||||
pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor())
|
||||
err := module.OnInit()
|
||||
if err != nil {
|
||||
return 0,err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
m.child[module.GetModuleId()] = module
|
||||
m.ancestor.getBaseModule().(*Module).descendants[module.GetModuleId()] = module
|
||||
|
||||
log.SDebug("Add module ",module.GetModuleName()," completed")
|
||||
return module.GetModuleId(),nil
|
||||
log.SDebug("Add module ", module.GetModuleName(), " completed")
|
||||
return module.GetModuleId(), nil
|
||||
}
|
||||
|
||||
func (m *Module) ReleaseModule(moduleId uint32){
|
||||
func (m *Module) ReleaseModule(moduleId uint32) {
|
||||
pModule := m.GetModule(moduleId).getBaseModule().(*Module)
|
||||
|
||||
//释放子孙
|
||||
@@ -123,19 +125,19 @@ func (m *Module) ReleaseModule(moduleId uint32){
|
||||
m.ReleaseModule(id)
|
||||
}
|
||||
|
||||
pModule.GetEventHandler().Destroy()
|
||||
pModule.self.OnRelease()
|
||||
pModule.GetEventHandler().Destroy()
|
||||
log.SDebug("Release module ", pModule.GetModuleName())
|
||||
for pTimer := range pModule.mapActiveTimer {
|
||||
pTimer.Cancel()
|
||||
}
|
||||
|
||||
for _,t := range pModule.mapActiveIdTimer {
|
||||
for _, t := range pModule.mapActiveIdTimer {
|
||||
t.Cancel()
|
||||
}
|
||||
|
||||
delete(m.child,moduleId)
|
||||
delete (m.ancestor.getBaseModule().(*Module).descendants,moduleId)
|
||||
delete(m.child, moduleId)
|
||||
delete(m.ancestor.getBaseModule().(*Module).descendants, moduleId)
|
||||
|
||||
//清理被删除的Module
|
||||
pModule.self = nil
|
||||
@@ -149,16 +151,17 @@ func (m *Module) ReleaseModule(moduleId uint32){
|
||||
pModule.mapActiveIdTimer = nil
|
||||
}
|
||||
|
||||
func (m *Module) NewModuleId() uint32{
|
||||
m.ancestor.getBaseModule().(*Module).seedModuleId+=1
|
||||
func (m *Module) NewModuleId() uint32 {
|
||||
m.ancestor.getBaseModule().(*Module).seedModuleId += 1
|
||||
return m.ancestor.getBaseModule().(*Module).seedModuleId
|
||||
}
|
||||
|
||||
var timerSeedId uint32
|
||||
func (m *Module) GenTimerId() uint64{
|
||||
for{
|
||||
newTimerId := (uint64(m.GetModuleId())<<32)|uint64(atomic.AddUint32(&timerSeedId,1))
|
||||
if _,ok := m.mapActiveIdTimer[newTimerId];ok == true {
|
||||
|
||||
func (m *Module) GenTimerId() uint64 {
|
||||
for {
|
||||
newTimerId := (uint64(m.GetModuleId()) << 32) | uint64(atomic.AddUint32(&timerSeedId, 1))
|
||||
if _, ok := m.mapActiveIdTimer[newTimerId]; ok == true {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -166,33 +169,32 @@ func (m *Module) GenTimerId() uint64{
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func (m *Module) GetAncestor()IModule{
|
||||
func (m *Module) GetAncestor() IModule {
|
||||
return m.ancestor
|
||||
}
|
||||
|
||||
func (m *Module) GetModule(moduleId uint32) IModule{
|
||||
iModule,ok := m.GetAncestor().getBaseModule().(*Module).descendants[moduleId]
|
||||
func (m *Module) GetModule(moduleId uint32) IModule {
|
||||
iModule, ok := m.GetAncestor().getBaseModule().(*Module).descendants[moduleId]
|
||||
if ok == false {
|
||||
return nil
|
||||
}
|
||||
return iModule
|
||||
}
|
||||
|
||||
func (m *Module) getBaseModule() IModule{
|
||||
func (m *Module) getBaseModule() IModule {
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *Module) GetParent()IModule{
|
||||
func (m *Module) GetParent() IModule {
|
||||
return m.parent
|
||||
}
|
||||
|
||||
func (m *Module) OnCloseTimer(t timer.ITimer){
|
||||
delete(m.mapActiveIdTimer,t.GetId())
|
||||
delete(m.mapActiveTimer,t)
|
||||
func (m *Module) OnCloseTimer(t timer.ITimer) {
|
||||
delete(m.mapActiveIdTimer, t.GetId())
|
||||
delete(m.mapActiveTimer, t)
|
||||
}
|
||||
|
||||
func (m *Module) OnAddTimer(t timer.ITimer){
|
||||
func (m *Module) OnAddTimer(t timer.ITimer) {
|
||||
if t != nil {
|
||||
if m.mapActiveTimer == nil {
|
||||
m.mapActiveTimer = map[timer.ITimer]struct{}{}
|
||||
@@ -204,33 +206,33 @@ func (m *Module) OnAddTimer(t timer.ITimer){
|
||||
|
||||
func (m *Module) AfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer {
|
||||
if m.mapActiveTimer == nil {
|
||||
m.mapActiveTimer =map[timer.ITimer]struct{}{}
|
||||
m.mapActiveTimer = map[timer.ITimer]struct{}{}
|
||||
}
|
||||
|
||||
return m.dispatcher.AfterFunc(d,nil,cb,m.OnCloseTimer,m.OnAddTimer)
|
||||
return m.dispatcher.AfterFunc(d, nil, cb, m.OnCloseTimer, m.OnAddTimer)
|
||||
}
|
||||
|
||||
func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron {
|
||||
if m.mapActiveTimer == nil {
|
||||
m.mapActiveTimer =map[timer.ITimer]struct{}{}
|
||||
m.mapActiveTimer = map[timer.ITimer]struct{}{}
|
||||
}
|
||||
|
||||
return m.dispatcher.CronFunc(cronExpr,nil,cb,m.OnCloseTimer,m.OnAddTimer)
|
||||
return m.dispatcher.CronFunc(cronExpr, nil, cb, m.OnCloseTimer, m.OnAddTimer)
|
||||
}
|
||||
|
||||
func (m *Module) NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker {
|
||||
if m.mapActiveTimer == nil {
|
||||
m.mapActiveTimer =map[timer.ITimer]struct{}{}
|
||||
m.mapActiveTimer = map[timer.ITimer]struct{}{}
|
||||
}
|
||||
|
||||
return m.dispatcher.TickerFunc(d,nil,cb,m.OnCloseTimer,m.OnAddTimer)
|
||||
return m.dispatcher.TickerFunc(d, nil, cb, m.OnCloseTimer, m.OnAddTimer)
|
||||
}
|
||||
|
||||
func (m *Module) cb(*timer.Timer){
|
||||
func (m *Module) cb(*timer.Timer) {
|
||||
|
||||
}
|
||||
|
||||
func (m *Module) SafeAfterFunc(timerId *uint64,d time.Duration, AdditionData interface{},cb func(uint64,interface{})) {
|
||||
func (m *Module) SafeAfterFunc(timerId *uint64, d time.Duration, AdditionData interface{}, cb func(uint64, interface{})) {
|
||||
if m.mapActiveIdTimer == nil {
|
||||
m.mapActiveIdTimer = map[uint64]timer.ITimer{}
|
||||
}
|
||||
@@ -240,45 +242,45 @@ func (m *Module) SafeAfterFunc(timerId *uint64,d time.Duration, AdditionData int
|
||||
}
|
||||
|
||||
*timerId = m.GenTimerId()
|
||||
t := m.dispatcher.AfterFunc(d,cb,nil,m.OnCloseTimer,m.OnAddTimer)
|
||||
t := m.dispatcher.AfterFunc(d, cb, nil, m.OnCloseTimer, m.OnAddTimer)
|
||||
t.AdditionData = AdditionData
|
||||
t.Id = *timerId
|
||||
m.mapActiveIdTimer[*timerId] = t
|
||||
}
|
||||
|
||||
func (m *Module) SafeCronFunc(cronId *uint64,cronExpr *timer.CronExpr, AdditionData interface{}, cb func(uint64,interface{})) {
|
||||
func (m *Module) SafeCronFunc(cronId *uint64, cronExpr *timer.CronExpr, AdditionData interface{}, cb func(uint64, interface{})) {
|
||||
if m.mapActiveIdTimer == nil {
|
||||
m.mapActiveIdTimer = map[uint64]timer.ITimer{}
|
||||
}
|
||||
|
||||
*cronId = m.GenTimerId()
|
||||
c := m.dispatcher.CronFunc(cronExpr,cb,nil,m.OnCloseTimer,m.OnAddTimer)
|
||||
c := m.dispatcher.CronFunc(cronExpr, cb, nil, m.OnCloseTimer, m.OnAddTimer)
|
||||
c.AdditionData = AdditionData
|
||||
c.Id = *cronId
|
||||
m.mapActiveIdTimer[*cronId] = c
|
||||
}
|
||||
|
||||
func (m *Module) SafeNewTicker(tickerId *uint64,d time.Duration, AdditionData interface{}, cb func(uint64,interface{})) {
|
||||
func (m *Module) SafeNewTicker(tickerId *uint64, d time.Duration, AdditionData interface{}, cb func(uint64, interface{})) {
|
||||
if m.mapActiveIdTimer == nil {
|
||||
m.mapActiveIdTimer = map[uint64]timer.ITimer{}
|
||||
}
|
||||
|
||||
*tickerId = m.GenTimerId()
|
||||
t := m.dispatcher.TickerFunc(d,cb,nil,m.OnCloseTimer,m.OnAddTimer)
|
||||
t := m.dispatcher.TickerFunc(d, cb, nil, m.OnCloseTimer, m.OnAddTimer)
|
||||
t.AdditionData = AdditionData
|
||||
t.Id = *tickerId
|
||||
m.mapActiveIdTimer[*tickerId] = t
|
||||
}
|
||||
|
||||
func (m *Module) CancelTimerId(timerId *uint64) bool{
|
||||
func (m *Module) CancelTimerId(timerId *uint64) bool {
|
||||
if m.mapActiveIdTimer == nil {
|
||||
log.SError("mapActiveIdTimer is nil")
|
||||
return false
|
||||
}
|
||||
|
||||
t,ok := m.mapActiveIdTimer[*timerId]
|
||||
t, ok := m.mapActiveIdTimer[*timerId]
|
||||
if ok == false {
|
||||
log.SError("cannot find timer id ",timerId)
|
||||
log.SError("cannot find timer id ", timerId)
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -287,23 +289,21 @@ func (m *Module) CancelTimerId(timerId *uint64) bool{
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
|
||||
func (m *Module) OnRelease(){
|
||||
func (m *Module) OnRelease() {
|
||||
}
|
||||
|
||||
func (m *Module) GetService() IService {
|
||||
return m.GetAncestor().(IService)
|
||||
}
|
||||
|
||||
func (m *Module) GetEventProcessor() event.IEventProcessor{
|
||||
func (m *Module) GetEventProcessor() event.IEventProcessor {
|
||||
return m.eventHandler.GetEventProcessor()
|
||||
}
|
||||
|
||||
func (m *Module) NotifyEvent(ev event.IEvent){
|
||||
func (m *Module) NotifyEvent(ev event.IEvent) {
|
||||
m.eventHandler.NotifyEvent(ev)
|
||||
}
|
||||
|
||||
func (m *Module) GetEventHandler() event.IEventHandler{
|
||||
func (m *Module) GetEventHandler() event.IEventHandler {
|
||||
return m.eventHandler
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,18 +22,24 @@ var timerDispatcherLen = 100000
|
||||
|
||||
type IService interface {
|
||||
Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{})
|
||||
SetName(serviceName string)
|
||||
GetName() string
|
||||
Wait()
|
||||
Start()
|
||||
|
||||
OnSetup(iService IService)
|
||||
OnInit() error
|
||||
OnStart()
|
||||
OnRelease()
|
||||
Wait()
|
||||
Start()
|
||||
|
||||
SetName(serviceName string)
|
||||
GetName() string
|
||||
GetRpcHandler() rpc.IRpcHandler
|
||||
GetServiceCfg()interface{}
|
||||
OpenProfiler()
|
||||
GetProfiler() *profiler.Profiler
|
||||
GetServiceEventChannelNum() int
|
||||
GetServiceTimerChannelNum() int
|
||||
|
||||
SetEventChannelNum(num int)
|
||||
OpenProfiler()
|
||||
}
|
||||
|
||||
// eventPool的内存池,缓存Event
|
||||
@@ -52,7 +58,8 @@ type Service struct {
|
||||
startStatus bool
|
||||
eventProcessor event.IEventProcessor
|
||||
profiler *profiler.Profiler //性能分析器
|
||||
rpcEventLister rpc.IRpcListener
|
||||
nodeEventLister rpc.INodeListener
|
||||
discoveryServiceLister rpc.IDiscoveryServiceListener
|
||||
chanEvent chan event.IEvent
|
||||
}
|
||||
|
||||
@@ -62,6 +69,13 @@ type RpcConnEvent struct{
|
||||
NodeId int
|
||||
}
|
||||
|
||||
// DiscoveryServiceEvent 发现服务结点
|
||||
type DiscoveryServiceEvent struct{
|
||||
IsDiscovery bool
|
||||
ServiceName []string
|
||||
NodeId int
|
||||
}
|
||||
|
||||
func SetMaxServiceChannel(maxEventChannel int){
|
||||
maxServiceEventChannel = maxEventChannel
|
||||
eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData {
|
||||
@@ -69,8 +83,12 @@ func SetMaxServiceChannel(maxEventChannel int){
|
||||
})
|
||||
}
|
||||
|
||||
func (rpcEventData *DiscoveryServiceEvent) GetEventType() event.EventType{
|
||||
return event.Sys_Event_DiscoverService
|
||||
}
|
||||
|
||||
func (rpcEventData *RpcConnEvent) GetEventType() event.EventType{
|
||||
return event.Sys_Event_Rpc_Event
|
||||
return event.Sys_Event_Node_Event
|
||||
}
|
||||
|
||||
func (s *Service) OnSetup(iService IService){
|
||||
@@ -88,7 +106,10 @@ func (s *Service) OpenProfiler() {
|
||||
|
||||
func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) {
|
||||
s.dispatcher =timer.NewDispatcher(timerDispatcherLen)
|
||||
s.chanEvent = make(chan event.IEvent,maxServiceEventChannel)
|
||||
if s.chanEvent == nil {
|
||||
s.chanEvent = make(chan event.IEvent,maxServiceEventChannel)
|
||||
}
|
||||
|
||||
s.rpcHandler.InitRpcHandler(iService.(rpc.IRpcHandler),getClientFun,getServerFun,iService.(rpc.IRpcHandlerChannel))
|
||||
s.IRpcHandler = &s.rpcHandler
|
||||
s.self = iService.(IModule)
|
||||
@@ -259,24 +280,44 @@ func (s *Service) RegRawRpc(rpcMethodId uint32,rawRpcCB rpc.RawRpcCallBack){
|
||||
func (s *Service) OnStart(){
|
||||
}
|
||||
|
||||
func (s *Service) OnRpcEvent(ev event.IEvent){
|
||||
func (s *Service) OnNodeEvent(ev event.IEvent){
|
||||
event := ev.(*RpcConnEvent)
|
||||
if event.IsConnect {
|
||||
s.rpcEventLister.OnNodeConnected(event.NodeId)
|
||||
s.nodeEventLister.OnNodeConnected(event.NodeId)
|
||||
}else{
|
||||
s.rpcEventLister.OnNodeDisconnect(event.NodeId)
|
||||
s.nodeEventLister.OnNodeDisconnect(event.NodeId)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) RegRpcListener(rpcEventLister rpc.IRpcListener) {
|
||||
s.rpcEventLister = rpcEventLister
|
||||
s.RegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler(),s.OnRpcEvent)
|
||||
func (s *Service) OnDiscoverServiceEvent(ev event.IEvent){
|
||||
event := ev.(*DiscoveryServiceEvent)
|
||||
if event.IsDiscovery {
|
||||
s.discoveryServiceLister.OnDiscoveryService(event.NodeId,event.ServiceName)
|
||||
}else{
|
||||
s.discoveryServiceLister.OnUnDiscoveryService(event.NodeId,event.ServiceName)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) RegRpcListener(rpcEventLister rpc.INodeListener) {
|
||||
s.nodeEventLister = rpcEventLister
|
||||
s.RegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler(),s.OnNodeEvent)
|
||||
RegRpcEventFun(s.GetName())
|
||||
}
|
||||
|
||||
func (s *Service) UnRegRpcListener(rpcLister rpc.IRpcListener) {
|
||||
s.UnRegEventReceiverFunc(event.Sys_Event_Rpc_Event,s.GetEventHandler())
|
||||
RegRpcEventFun(s.GetName())
|
||||
func (s *Service) UnRegRpcListener(rpcLister rpc.INodeListener) {
|
||||
s.UnRegEventReceiverFunc(event.Sys_Event_Node_Event,s.GetEventHandler())
|
||||
UnRegRpcEventFun(s.GetName())
|
||||
}
|
||||
|
||||
func (s *Service) RegDiscoverListener(discoveryServiceListener rpc.IDiscoveryServiceListener) {
|
||||
s.discoveryServiceLister = discoveryServiceListener
|
||||
s.RegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler(),s.OnDiscoverServiceEvent)
|
||||
RegDiscoveryServiceEventFun(s.GetName())
|
||||
}
|
||||
|
||||
func (s *Service) UnRegDiscoverListener(rpcLister rpc.INodeListener) {
|
||||
s.UnRegEventReceiverFunc(event.Sys_Event_DiscoverService,s.GetEventHandler())
|
||||
UnRegDiscoveryServiceEventFun(s.GetName())
|
||||
}
|
||||
|
||||
|
||||
@@ -311,6 +352,21 @@ func (s *Service) pushEvent(ev event.IEvent) error{
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) GetServiceEventChannelNum() int{
|
||||
return len(s.chanEvent)
|
||||
}
|
||||
|
||||
func (s *Service) GetServiceTimerChannelNum() int{
|
||||
return len(s.dispatcher.ChanTimer)
|
||||
}
|
||||
|
||||
func (s *Service) SetEventChannelNum(num int){
|
||||
if s.chanEvent == nil {
|
||||
s.chanEvent = make(chan event.IEvent,num)
|
||||
}else {
|
||||
panic("this stage cannot be set")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) SetGoRoutineNum(goroutineNum int32) bool {
|
||||
//已经开始状态不允许修改协程数量,打开性能分析器不允许开多线程
|
||||
|
||||
@@ -5,7 +5,12 @@ var mapServiceName map[string]IService
|
||||
var setupServiceList []IService
|
||||
|
||||
type RegRpcEventFunType func(serviceName string)
|
||||
type RegDiscoveryServiceEventFunType func(serviceName string)
|
||||
var RegRpcEventFun RegRpcEventFunType
|
||||
var UnRegRpcEventFun RegRpcEventFunType
|
||||
|
||||
var RegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
|
||||
var UnRegDiscoveryServiceEventFun RegDiscoveryServiceEventFunType
|
||||
|
||||
func init(){
|
||||
mapServiceName = map[string]IService{}
|
||||
|
||||
@@ -175,10 +175,12 @@ func (slf *HttpSession) Write(msg []byte) {
|
||||
|
||||
func (slf *HttpSession) WriteJsonDone(statusCode int,msgJson interface{}) error {
|
||||
msg, err := json.Marshal(msgJson)
|
||||
if err == nil {
|
||||
slf.Write(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slf.statusCode = statusCode
|
||||
slf.Write(msg)
|
||||
slf.Done()
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/duanhf2012/origin/network/processor"
|
||||
"github.com/duanhf2012/origin/node"
|
||||
"github.com/duanhf2012/origin/service"
|
||||
"sync/atomic"
|
||||
"sync"
|
||||
"time"
|
||||
"runtime"
|
||||
@@ -42,12 +43,12 @@ const Default_ReadDeadline = 180 //30s
|
||||
const Default_WriteDeadline = 180 //30s
|
||||
|
||||
const (
|
||||
MaxNodeId = 1<<10 - 1 //Uint10
|
||||
MaxSeed = 1<<22 - 1 //MaxUint24
|
||||
MaxNodeId = 1<<14 - 1 //最大值 16383
|
||||
MaxSeed = 1<<19 - 1 //最大值 524287
|
||||
MaxTime = 1<<31 - 1 //最大值 2147483647
|
||||
)
|
||||
|
||||
var seed uint32
|
||||
var seedLocker sync.Mutex
|
||||
|
||||
type TcpPack struct {
|
||||
Type TcpPackType //0表示连接 1表示断开 2表示数据
|
||||
@@ -66,16 +67,14 @@ func (tcpService *TcpService) genId() uint64 {
|
||||
panic("nodeId exceeds the maximum!")
|
||||
}
|
||||
|
||||
seedLocker.Lock()
|
||||
seed = (seed+1)%MaxSeed
|
||||
seedLocker.Unlock()
|
||||
|
||||
nowTime := uint64(time.Now().Second())
|
||||
return (uint64(node.GetNodeId())<<54)|(nowTime<<22)|uint64(seed)
|
||||
newSeed := atomic.AddUint32(&seed,1) % MaxSeed
|
||||
nowTime := uint64(time.Now().Unix())%MaxTime
|
||||
return (uint64(node.GetNodeId())<<50)|(nowTime<<19)|uint64(newSeed)
|
||||
}
|
||||
|
||||
|
||||
func GetNodeId(agentId uint64) int {
|
||||
return int(agentId>>54)
|
||||
return int(agentId>>50)
|
||||
}
|
||||
|
||||
func (tcpService *TcpService) OnInit() error{
|
||||
|
||||
@@ -7,19 +7,27 @@ import (
|
||||
"github.com/duanhf2012/origin/network"
|
||||
"github.com/duanhf2012/origin/network/processor"
|
||||
"github.com/duanhf2012/origin/service"
|
||||
"github.com/duanhf2012/origin/node"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
||||
|
||||
type WSService struct {
|
||||
service.Service
|
||||
wsServer network.WSServer
|
||||
|
||||
mapClientLocker sync.RWMutex
|
||||
mapClient map[uint64] *WSClient
|
||||
initClientId uint64
|
||||
process processor.IProcessor
|
||||
|
||||
|
||||
}
|
||||
|
||||
var seed uint32
|
||||
|
||||
type WSPackType int8
|
||||
const(
|
||||
WPT_Connected WSPackType = 0
|
||||
@@ -32,6 +40,12 @@ const Default_WS_MaxConnNum = 3000
|
||||
const Default_WS_PendingWriteNum = 10000
|
||||
const Default_WS_MaxMsgLen = 65535
|
||||
|
||||
const (
|
||||
MaxNodeId = 1<<14 - 1 //最大值 16383
|
||||
MaxSeed = 1<<19 - 1 //最大值 524287
|
||||
MaxTime = 1<<31 - 1 //最大值 2147483647
|
||||
)
|
||||
|
||||
type WSClient struct {
|
||||
id uint64
|
||||
wsConn *network.WSConn
|
||||
@@ -46,6 +60,7 @@ type WSPack struct {
|
||||
}
|
||||
|
||||
func (ws *WSService) OnInit() error{
|
||||
|
||||
iConfig := ws.GetServiceCfg()
|
||||
if iConfig == nil {
|
||||
return fmt.Errorf("%s service config is error!", ws.GetName())
|
||||
@@ -80,6 +95,10 @@ func (ws *WSService) OnInit() error{
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ws *WSService) SetMessageType(messageType int){
|
||||
ws.wsServer.SetMessageType(messageType)
|
||||
}
|
||||
|
||||
func (ws *WSService) WSEventHandler(ev event.IEvent) {
|
||||
pack := ev.(*event.Event).Data.(*WSPack)
|
||||
switch pack.Type {
|
||||
@@ -88,9 +107,9 @@ func (ws *WSService) WSEventHandler(ev event.IEvent) {
|
||||
case WPT_DisConnected:
|
||||
pack.MsgProcessor.DisConnectedRoute(pack.ClientId)
|
||||
case WPT_UnknownPack:
|
||||
pack.MsgProcessor.UnknownMsgRoute(pack.Data,pack.ClientId)
|
||||
pack.MsgProcessor.UnknownMsgRoute(pack.ClientId,pack.Data)
|
||||
case WPT_Pack:
|
||||
pack.MsgProcessor.MsgRoute(pack.Data, pack.ClientId)
|
||||
pack.MsgProcessor.MsgRoute(pack.ClientId,pack.Data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,20 +118,30 @@ func (ws *WSService) SetProcessor(process processor.IProcessor,handler event.IEv
|
||||
ws.RegEventReceiverFunc(event.Sys_Event_WebSocket,handler, ws.WSEventHandler)
|
||||
}
|
||||
|
||||
func (ws *WSService) genId() uint64 {
|
||||
if node.GetNodeId()>MaxNodeId{
|
||||
panic("nodeId exceeds the maximum!")
|
||||
}
|
||||
|
||||
newSeed := atomic.AddUint32(&seed,1) % MaxSeed
|
||||
nowTime := uint64(time.Now().Unix())%MaxTime
|
||||
return (uint64(node.GetNodeId())<<50)|(nowTime<<19)|uint64(newSeed)
|
||||
}
|
||||
|
||||
func (ws *WSService) NewWSClient(conn *network.WSConn) network.Agent {
|
||||
ws.mapClientLocker.Lock()
|
||||
defer ws.mapClientLocker.Unlock()
|
||||
|
||||
for {
|
||||
ws.initClientId+=1
|
||||
_,ok := ws.mapClient[ws.initClientId]
|
||||
clientId := ws.genId()
|
||||
_,ok := ws.mapClient[clientId]
|
||||
if ok == true {
|
||||
continue
|
||||
}
|
||||
|
||||
pClient := &WSClient{wsConn:conn, id: ws.initClientId}
|
||||
pClient := &WSClient{wsConn:conn, id: clientId}
|
||||
pClient.wsService = ws
|
||||
ws.mapClient[ws.initClientId] = pClient
|
||||
ws.mapClient[clientId] = pClient
|
||||
return pClient
|
||||
}
|
||||
|
||||
@@ -131,7 +160,7 @@ func (slf *WSClient) Run() {
|
||||
log.Debug("read client id %d is error:%+v",slf.id,err)
|
||||
break
|
||||
}
|
||||
data,err:=slf.wsService.process.Unmarshal(bytes)
|
||||
data,err:=slf.wsService.process.Unmarshal(slf.id,bytes)
|
||||
if err != nil {
|
||||
slf.wsService.NotifyEvent(&event.Event{Type:event.Sys_Event_WebSocket,Data:&WSPack{ClientId:slf.id,Type:WPT_UnknownPack,Data:bytes,MsgProcessor:slf.wsService.process}})
|
||||
continue
|
||||
@@ -156,7 +185,7 @@ func (ws *WSService) SendMsg(clientid uint64,msg interface{}) error{
|
||||
}
|
||||
|
||||
ws.mapClientLocker.Unlock()
|
||||
bytes,err := ws.process.Marshal(msg)
|
||||
bytes,err := ws.process.Marshal(clientid,msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
40
util/algorithms/BiSearch.go
Normal file
40
util/algorithms/BiSearch.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package algorithms
|
||||
|
||||
|
||||
type NumberType interface {
|
||||
int | int8 | int16 | int32 | int64 | string | float32 | float64 | uint | uint8 | uint16 | uint32 | uint64
|
||||
}
|
||||
|
||||
type Element[ValueType NumberType] interface {
|
||||
GetValue() ValueType
|
||||
}
|
||||
|
||||
//BiSearch 二分查找,切片必需有序号。matchUp表示是否向上范围查找。比如:数列10 20 30 ,当value传入25时,返回结果是2,表示落到3的范围
|
||||
func BiSearch[ValueType NumberType, T Element[ValueType]](sElement []T, value ValueType, matchUp bool) int {
|
||||
low, high := 0, len(sElement)-1
|
||||
if high == -1 {
|
||||
return -1
|
||||
}
|
||||
|
||||
var mid int
|
||||
for low <= high {
|
||||
mid = low + (high-low)>>1
|
||||
if sElement[mid].GetValue() > value {
|
||||
high = mid - 1
|
||||
} else if sElement[mid].GetValue() < value {
|
||||
low = mid + 1
|
||||
} else {
|
||||
return mid
|
||||
}
|
||||
}
|
||||
|
||||
if matchUp == true {
|
||||
if (sElement[mid].GetValue()) < value &&
|
||||
(mid+1 < len(sElement)-1) {
|
||||
return mid + 1
|
||||
}
|
||||
return mid
|
||||
}
|
||||
|
||||
return -1
|
||||
}
|
||||
28
util/algorithms/BiSearch_test.go
Normal file
28
util/algorithms/BiSearch_test.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package algorithms
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type MyElement struct {
|
||||
Score int
|
||||
}
|
||||
|
||||
func (s MyElement) GetValue() int {
|
||||
return s.Score
|
||||
}
|
||||
|
||||
func Test_BiSearch(t *testing.T){
|
||||
var schedulePoolCfgList []MyElement = []MyElement{MyElement{10}, MyElement{12}, MyElement{14}, MyElement{16}} //
|
||||
index := BiSearch[int, MyElement](schedulePoolCfgList, 9, true)
|
||||
index = BiSearch[int, MyElement](schedulePoolCfgList, 10, true)
|
||||
index = BiSearch[int, MyElement](schedulePoolCfgList, 11, true)
|
||||
index = BiSearch[int, MyElement](schedulePoolCfgList, 12, true)
|
||||
index = BiSearch[int, MyElement](schedulePoolCfgList, 13, true)
|
||||
index = BiSearch[int, MyElement](schedulePoolCfgList, 14, true)
|
||||
index = BiSearch[int, MyElement](schedulePoolCfgList, 15, true)
|
||||
index = BiSearch[int, MyElement](schedulePoolCfgList, 16, true)
|
||||
index = BiSearch[int, MyElement](schedulePoolCfgList, 17, true)
|
||||
fmt.Println(index)
|
||||
}
|
||||
37
util/math/math.go
Normal file
37
util/math/math.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package math
|
||||
|
||||
type NumberType interface {
|
||||
int | int8 | int16 | int32 | int64 | float32 | float64 | uint | uint8 | uint16 | uint32 | uint64
|
||||
}
|
||||
|
||||
type SignedNumberType interface {
|
||||
int | int8 | int16 | int32 | int64 | float32 | float64
|
||||
}
|
||||
|
||||
type FloatType interface {
|
||||
float32 | float64
|
||||
}
|
||||
|
||||
func Max[NumType NumberType](number1 NumType, number2 NumType) NumType {
|
||||
if number1 > number2 {
|
||||
return number1
|
||||
}
|
||||
|
||||
return number2
|
||||
}
|
||||
|
||||
func Min[NumType NumberType](number1 NumType, number2 NumType) NumType {
|
||||
if number1 < number2 {
|
||||
return number1
|
||||
}
|
||||
|
||||
return number2
|
||||
}
|
||||
|
||||
func Abs[NumType SignedNumberType](Num NumType) NumType {
|
||||
if Num < 0 {
|
||||
return -1 * Num
|
||||
}
|
||||
|
||||
return Num
|
||||
}
|
||||
167
util/queue/squeue.go
Normal file
167
util/queue/squeue.go
Normal file
@@ -0,0 +1,167 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
/*
|
||||
这是一个循环队列
|
||||
*/
|
||||
type SQueue[ElementType any] struct {
|
||||
elements []ElementType
|
||||
head int
|
||||
tail int
|
||||
locker sync.RWMutex
|
||||
}
|
||||
|
||||
//游标,通过该游标获取数据
|
||||
type SCursor[ElementType any] struct {
|
||||
pos int
|
||||
squeue *SQueue[ElementType]
|
||||
}
|
||||
|
||||
func NewSQueue[ElementType any](maxElementNum int) *SQueue[ElementType]{
|
||||
queue := &SQueue[ElementType]{}
|
||||
queue.elements = make([]ElementType,maxElementNum+1)
|
||||
|
||||
return queue
|
||||
}
|
||||
|
||||
//游标移动到队首
|
||||
func (s *SCursor[ElementType]) First(){
|
||||
s.squeue.locker.RLock()
|
||||
defer s.squeue.locker.RUnlock()
|
||||
s.pos = s.squeue.head
|
||||
}
|
||||
|
||||
//从当前位置移动游标,注意如果在多协程读或者pop时,可能会导致游标失效
|
||||
func (s *SCursor[ElementType]) Next() (elem ElementType,ret bool){
|
||||
s.squeue.locker.RLock()
|
||||
defer s.squeue.locker.RUnlock()
|
||||
|
||||
if s.pos == s.squeue.tail {
|
||||
return
|
||||
}
|
||||
|
||||
s.pos++
|
||||
s.pos = (s.pos)%(len(s.squeue.elements))
|
||||
return s.squeue.elements[s.pos],true
|
||||
}
|
||||
|
||||
//获取队列元数个数
|
||||
func (s *SQueue[ElementType]) Len() int {
|
||||
s.locker.RLock()
|
||||
defer s.locker.RUnlock()
|
||||
|
||||
return s.len()
|
||||
}
|
||||
|
||||
func (s *SQueue[ElementType]) len() int {
|
||||
if s.head <= s.tail {
|
||||
return s.tail - s.head
|
||||
}
|
||||
|
||||
//(len(s.elements)-1-s.head)+(s.tail+1)
|
||||
return len(s.elements)-s.head+s.tail
|
||||
}
|
||||
|
||||
//获取游标,默认是队首
|
||||
func (s *SQueue[ElementType]) GetCursor() (cur SCursor[ElementType]){
|
||||
s.locker.RLock()
|
||||
defer s.locker.RUnlock()
|
||||
|
||||
cur.squeue = s
|
||||
cur.pos = s.head
|
||||
return
|
||||
}
|
||||
|
||||
//获取指定位置的游标
|
||||
func (s *SQueue[ElementType]) GetPosCursor(pos int) (cur SCursor[ElementType],ret bool){
|
||||
s.locker.RLock()
|
||||
defer s.locker.RUnlock()
|
||||
|
||||
if s.head < s.tail {
|
||||
if pos<=s.head || pos>s.tail{
|
||||
return
|
||||
}
|
||||
|
||||
ret = true
|
||||
cur.squeue = s
|
||||
cur.pos = pos
|
||||
return
|
||||
}
|
||||
|
||||
if pos >s.tail && pos <=s.head {
|
||||
return
|
||||
}
|
||||
|
||||
cur.squeue = s
|
||||
cur.pos = pos
|
||||
return
|
||||
}
|
||||
|
||||
//从队首移除掉指定数量元素
|
||||
func (s *SQueue[ElementType]) RemoveElement(elementNum int) (removeNum int) {
|
||||
s.locker.Lock()
|
||||
defer s.locker.Unlock()
|
||||
|
||||
lens := s.len()
|
||||
if elementNum > lens{
|
||||
removeNum = lens
|
||||
}else{
|
||||
removeNum = elementNum
|
||||
}
|
||||
|
||||
|
||||
s.head = (s.head + removeNum)%len(s.elements)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
//从队首Pop元素
|
||||
func (s *SQueue[ElementType]) Pop() (elem ElementType,ret bool){
|
||||
s.locker.Lock()
|
||||
defer s.locker.Unlock()
|
||||
|
||||
if s.head == s.tail {
|
||||
return
|
||||
}
|
||||
|
||||
s.head++
|
||||
s.head = s.head%len(s.elements)
|
||||
return s.elements[s.head],true
|
||||
}
|
||||
|
||||
//从队尾Push数据
|
||||
func (s *SQueue[ElementType]) Push(elem ElementType) bool {
|
||||
s.locker.Lock()
|
||||
defer s.locker.Unlock()
|
||||
|
||||
nextPos := (s.tail+1) % len(s.elements)
|
||||
if nextPos == s.head {
|
||||
//is full
|
||||
return false
|
||||
}
|
||||
|
||||
s.tail = nextPos
|
||||
s.elements[s.tail] = elem
|
||||
return true
|
||||
}
|
||||
|
||||
//判断队列是否为空
|
||||
func (s *SQueue[ElementType]) IsEmpty() bool{
|
||||
s.locker.RLock()
|
||||
defer s.locker.RUnlock()
|
||||
|
||||
return s.head == s.tail
|
||||
}
|
||||
|
||||
//判断队列是否已满
|
||||
func (s *SQueue[ElementType]) IsFull() bool{
|
||||
s.locker.RLock()
|
||||
defer s.locker.RUnlock()
|
||||
|
||||
nextPos := (s.tail+1) % len(s.elements)
|
||||
return nextPos == s.head
|
||||
}
|
||||
|
||||
66
util/queue/syncqueue_test.go
Normal file
66
util/queue/syncqueue_test.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test_Example(t *testing.T) {
|
||||
//1.创建阶列
|
||||
queue := NewSQueue[int](5)
|
||||
|
||||
//2.判断是否为空
|
||||
t.Log("is empty :", queue.IsEmpty())
|
||||
t.Log("is full :", queue.IsFull())
|
||||
|
||||
//3.游标使用,打印所有数据
|
||||
cursor := queue.GetCursor()
|
||||
cursor.First()
|
||||
for {
|
||||
elem, ret := cursor.Next()
|
||||
if ret == false {
|
||||
break
|
||||
}
|
||||
t.Log("elem:", elem)
|
||||
}
|
||||
|
||||
//4.push数据,塞满队列
|
||||
for i := 0; i < 6; i++ {
|
||||
t.Log("push:", queue.Push(i))
|
||||
}
|
||||
|
||||
t.Log("is empty :", queue.IsEmpty())
|
||||
t.Log("is full :", queue.IsFull())
|
||||
|
||||
//5.使用游标遍历所有数据
|
||||
cursor.First()
|
||||
for {
|
||||
elem, ret := cursor.Next()
|
||||
if ret == false {
|
||||
break
|
||||
}
|
||||
t.Log("elem:", elem)
|
||||
}
|
||||
|
||||
//6.删除2个元素
|
||||
removeNum := queue.RemoveElement(2)
|
||||
t.Log("Remove Num:", removeNum)
|
||||
|
||||
//7.游标遍历
|
||||
cursor.First()
|
||||
for {
|
||||
elem, ret := cursor.Next()
|
||||
if ret == false {
|
||||
break
|
||||
}
|
||||
t.Log("elem:", elem)
|
||||
}
|
||||
|
||||
//8.pop数据所有
|
||||
for i := 0; i < 6; i++ {
|
||||
elem, ret := queue.Pop()
|
||||
t.Log("pop:", elem, "-", ret, " len:", queue.Len())
|
||||
}
|
||||
|
||||
t.Log("is empty :", queue.IsEmpty())
|
||||
t.Log("is full :", queue.IsFull())
|
||||
}
|
||||
@@ -51,7 +51,6 @@ func NewPool(C chan interface{},New func()interface{}) *Pool{
|
||||
func NewPoolEx(C chan IPoolData,New func()IPoolData) *PoolEx{
|
||||
var pool PoolEx
|
||||
pool.C = C
|
||||
//pool.New = New
|
||||
pool.syncPool.New = func() interface{} {
|
||||
return New()
|
||||
}
|
||||
@@ -61,10 +60,18 @@ func NewPoolEx(C chan IPoolData,New func()IPoolData) *PoolEx{
|
||||
func (pool *PoolEx) Get() IPoolData{
|
||||
select {
|
||||
case d := <-pool.C:
|
||||
if d.IsRef() {
|
||||
panic("Pool data is in use.")
|
||||
}
|
||||
|
||||
d.Ref()
|
||||
return d
|
||||
default:
|
||||
data := pool.syncPool.Get().(IPoolData)
|
||||
if data.IsRef() {
|
||||
panic("Pool data is in use.")
|
||||
}
|
||||
|
||||
data.Ref()
|
||||
return data
|
||||
}
|
||||
@@ -76,7 +83,10 @@ func (pool *PoolEx) Put(data IPoolData){
|
||||
if data.IsRef() == false {
|
||||
panic("Repeatedly freeing memory")
|
||||
}
|
||||
//提前解引用,防止递归释放
|
||||
data.UnRef()
|
||||
data.Reset()
|
||||
//再次解引用,防止Rest时错误标记
|
||||
data.UnRef()
|
||||
select {
|
||||
case pool.C <- data:
|
||||
|
||||
Reference in New Issue
Block a user