// Mirror of https://github.com/duanhf2012/origin.git
// Synced 2026-02-16 00:44:42 +08:00
// 168 lines, 3.8 KiB, Go
package synccacheservice
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/duanhf2012/origin/cluster"
|
|
"github.com/duanhf2012/origin/service"
|
|
"github.com/duanhf2012/origin/util"
|
|
)
|
|
|
|
// MAX_SYNC_DATA_CHAN_NUM is the buffer capacity given to every per-node sync
// channel, and the queue length at which tryPushSyncData refuses new items.
const MAX_SYNC_DATA_CHAN_NUM = 10000
|
|
|
|
//CReportService ...
|
|
type CSyncCacheService struct {
|
|
service.BaseService
|
|
mapCache util.Map
|
|
syncQueue *util.SyncQueue
|
|
|
|
nodeIdList []int
|
|
syncDataChanList []chan *SyncCacheData
|
|
}
|
|
|
|
// SyncCacheData describes one cache operation that is queued for, and sent
// to, the nodes via "CSyncCacheService.RPC_SyncString".
type SyncCacheData struct {
	// OperType selects the operation: 0 means add or update, 1 means delete.
	// RPC_SyncString treats every non-zero value as a delete.
	OperType int8
	// Key is the cache key; Val is the value stored for add/update and is
	// left empty for deletes.
	Key, Val string
	// NOTE(review): presumably an expiry in milliseconds ("Wxpire" looks like
	// a typo for "Expire"); nothing in this file reads it - confirm before
	// renaming, since callers fill this struct positionally.
	Wxpire int32 //ms
	// NodeIdList is set to the service's node list by SetString and DelString.
	NodeIdList []int
	// reTryCount and reTryTime are not read or written by the code in this
	// file beyond being zero-initialised.
	reTryCount uint32
	reTryTime  int64
}
|
|
|
|
//OnInit ...
|
|
func (slf *CSyncCacheService) OnInit() error {
|
|
slf.syncQueue = util.NewSyncQueue()
|
|
var callServiceName string
|
|
slf.nodeIdList = cluster.InstanceClusterMgr().GetNodeList("CSyncCacheService.RPC_SyncString", &callServiceName, nil)
|
|
for _, nodeId := range slf.nodeIdList {
|
|
syncCacheData := make(chan *SyncCacheData, MAX_SYNC_DATA_CHAN_NUM)
|
|
slf.syncDataChanList = append(slf.syncDataChanList, syncCacheData)
|
|
go slf.syncRouter(nodeId, syncCacheData)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (slf *CSyncCacheService) syncRouter(nodeId int, syncDataChan chan *SyncCacheData) error {
|
|
|
|
tryCount := 0
|
|
for {
|
|
select {
|
|
case <-slf.ExitChan:
|
|
break
|
|
case data := <-syncDataChan:
|
|
var ret int
|
|
cluster.CallNode(nodeId, "CSyncCacheService.RPC_SyncString", data, &ret)
|
|
if ret == 0 {
|
|
if tryCount < 3 {
|
|
time.Sleep(800 * time.Millisecond)
|
|
} else {
|
|
time.Sleep(1500 * time.Millisecond)
|
|
}
|
|
|
|
slf.tryPushSyncData(syncDataChan, data)
|
|
tryCount++
|
|
} else {
|
|
tryCount = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (slf *CSyncCacheService) tryPushSyncData(syncDataChan chan *SyncCacheData, syncData *SyncCacheData) bool {
|
|
if len(syncDataChan) >= MAX_SYNC_DATA_CHAN_NUM {
|
|
return false
|
|
}
|
|
syncDataChan <- syncData
|
|
|
|
return true
|
|
}
|
|
|
|
func (slf *CSyncCacheService) RPC_SyncString(request *SyncCacheData, ret *int) error {
|
|
|
|
if request.OperType == 0 {
|
|
slf.mapCache.Set(request.Key, request.Val)
|
|
} else {
|
|
slf.mapCache.Del(request.Key)
|
|
}
|
|
|
|
*ret = 1
|
|
return nil
|
|
}
|
|
|
|
func SetStringJson(key string, val interface{}) error {
|
|
|
|
byteBuf, err := json.Marshal(val)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
SetString(key, string(byteBuf))
|
|
return nil
|
|
}
|
|
|
|
func GetStringJson(key string, val interface{}) error {
|
|
ret, err := GetString(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = json.Unmarshal([]byte(ret), val)
|
|
return err
|
|
}
|
|
|
|
func DelString(key string) error {
|
|
pubcacheservice := service.InstanceServiceMgr().FindService("CSyncCacheService")
|
|
if pubcacheservice == nil {
|
|
return errors.New("Cannot find CSyncCacheService")
|
|
}
|
|
|
|
pPubCacheService := pubcacheservice.(*CSyncCacheService)
|
|
syncCacheData := SyncCacheData{1, key, "", 0, pPubCacheService.nodeIdList[:], 0, 0}
|
|
|
|
for _, syncChan := range pPubCacheService.syncDataChanList {
|
|
pPubCacheService.tryPushSyncData(syncChan, &syncCacheData)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func SetString(key string, val string) error {
|
|
pubcacheservice := service.InstanceServiceMgr().FindService("CSyncCacheService")
|
|
if pubcacheservice == nil {
|
|
return errors.New("Cannot find CSyncCacheService")
|
|
}
|
|
|
|
//同步所有远程结点
|
|
pPubCacheService := pubcacheservice.(*CSyncCacheService)
|
|
syncCacheData := SyncCacheData{0, key, val, 0, pPubCacheService.nodeIdList[:], 0, 0}
|
|
for _, syncChan := range pPubCacheService.syncDataChanList {
|
|
pPubCacheService.tryPushSyncData(syncChan, &syncCacheData)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func GetString(key string) (string, error) {
|
|
pubcacheservice := service.InstanceServiceMgr().FindService("CSyncCacheService")
|
|
if pubcacheservice == nil {
|
|
return "", errors.New("Cannot find CSyncCacheService")
|
|
}
|
|
|
|
pPubCacheService := pubcacheservice.(*CSyncCacheService)
|
|
ret := pPubCacheService.mapCache.Get(key)
|
|
if ret == nil {
|
|
return "", errors.New(fmt.Sprintf("Cannot find key :%s", key))
|
|
}
|
|
|
|
return ret.(string), nil
|
|
}
|