diff --git a/sysservice/synccacheservice/synccacheservice.go b/sysservice/synccacheservice/synccacheservice.go new file mode 100644 index 0000000..8e8bcdc --- /dev/null +++ b/sysservice/synccacheservice/synccacheservice.go @@ -0,0 +1,167 @@ +package synccacheservice + +import ( + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/duanhf2012/origin/cluster" + "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/util" +) + +const ( + MAX_SYNC_DATA_CHAN_NUM = 10000 +) + +//CReportService ... +type CSyncCacheService struct { + service.BaseService + mapCache util.Map + syncQueue *util.SyncQueue + + nodeIdList []int + syncDataChanList []chan *SyncCacheData +} + +type SyncCacheData struct { + OperType int8 //0 表示添加或者更新 1表示删除 + Key string + Val string + Wxpire int32 //ms + NodeIdList []int + reTryCount uint32 + reTryTime int64 +} + +//OnInit ... +func (slf *CSyncCacheService) OnInit() error { + slf.syncQueue = util.NewSyncQueue() + var callServiceName string + slf.nodeIdList = cluster.InstanceClusterMgr().GetNodeList("CSyncCacheService.RPC_SyncString", &callServiceName) + for _, nodeId := range slf.nodeIdList { + syncCacheData := make(chan *SyncCacheData, MAX_SYNC_DATA_CHAN_NUM) + slf.syncDataChanList = append(slf.syncDataChanList, syncCacheData) + go slf.syncRouter(nodeId, syncCacheData) + } + + return nil +} + +func (slf *CSyncCacheService) syncRouter(nodeId int, syncDataChan chan *SyncCacheData) error { + + tryCount := 0 + for { + select { + case <-slf.ExitChan: + break + case data := <-syncDataChan: + var ret int + cluster.CallNode(nodeId, "CSyncCacheService.RPC_SyncString", data, &ret) + if ret == 0 { + if tryCount < 3 { + time.Sleep(800 * time.Millisecond) + } else { + time.Sleep(1500 * time.Millisecond) + } + + slf.tryPushSyncData(syncDataChan, data) + tryCount++ + } else { + tryCount = 0 + } + } + } + + return nil +} + +func (slf *CSyncCacheService) tryPushSyncData(syncDataChan chan *SyncCacheData, syncData *SyncCacheData) bool { + if len(syncDataChan) >= MAX_SYNC_DATA_CHAN_NUM { + return false + } + syncDataChan <- syncData + + return true +} + +func (slf *CSyncCacheService) RPC_SyncString(request *SyncCacheData, ret *int) error { + + if request.OperType == 0 { + slf.mapCache.Set(request.Key, request.Val) + } else { + slf.mapCache.Del(request.Key) + } + + *ret = 1 + return nil +} + +func SetStringJson(key string, val interface{}) error { + + byteBuf, err := json.Marshal(val) + if err != nil { + return err + } + + SetString(key, string(byteBuf)) + return nil +} + +func GetStringJson(key string, val interface{}) error { + ret, err := GetString(key) + if err != nil { + return err + } + + err = json.Unmarshal([]byte(ret), val) + return err +} + +func DelString(key string) error { + pubcacheservice := service.InstanceServiceMgr().FindService("CSyncCacheService") + if pubcacheservice == nil { + return errors.New("Cannot find CSyncCacheService") + } + + pPubCacheService := pubcacheservice.(*CSyncCacheService) + syncCacheData := SyncCacheData{1, key, "", 0, pPubCacheService.nodeIdList[:], 0, 0} + + for _, syncChan := range pPubCacheService.syncDataChanList { + pPubCacheService.tryPushSyncData(syncChan, &syncCacheData) + } + + return nil +} + +func SetString(key string, val string) error { + pubcacheservice := service.InstanceServiceMgr().FindService("CSyncCacheService") + if pubcacheservice == nil { + return errors.New("Cannot find CSyncCacheService") + } + + //同步所有远程结点 + pPubCacheService := pubcacheservice.(*CSyncCacheService) + syncCacheData := SyncCacheData{0, key, val, 0, pPubCacheService.nodeIdList[:], 0, 0} + for _, syncChan := range pPubCacheService.syncDataChanList { + pPubCacheService.tryPushSyncData(syncChan, &syncCacheData) + } + + return nil +} + +func GetString(key string) (string, error) { + pubcacheservice := service.InstanceServiceMgr().FindService("CSyncCacheService") + if pubcacheservice == nil { + return "", errors.New("Cannot find CSyncCacheService") + } + + pPubCacheService := pubcacheservice.(*CSyncCacheService) + ret := pPubCacheService.mapCache.Get(key) + if ret == nil { + return "", errors.New(fmt.Sprintf("Cannot find key :%s", key)) + } + + return ret.(string), nil +}