Files
origin/sysmodule/mongomodule/mongomodule.go
2021-03-12 12:08:28 +08:00

130 lines
2.5 KiB
Go

package mongomodule
import (
"github.com/duanhf2012/origin/log"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"sync"
"time"
_ "gopkg.in/mgo.v2"
)
// Session wraps an mgo session. It embeds *mgo.Session, so every method of
// *mgo.Session can be called directly on a Session, alongside the helper
// methods declared in this file.
type Session struct {
*mgo.Session
}
// DialContext holds a fixed-size set of sessions that Take hands out in
// round-robin order. The embedded mutex guards takeSessionIdx in Take and the
// iteration over sessions in Close.
type DialContext struct {
sync.Mutex
// sessions is the pool filled in by dialWithTimeout.
sessions []*Session
// sessionNum is the number of entries in sessions; Take uses it as the modulus.
sessionNum uint32
// takeSessionIdx is incremented on every Take and reduced modulo sessionNum
// to select the session to return.
takeSessionIdx uint32
}
// MongoModule gives access to a pool of MongoDB sessions. Init creates the
// pool and Take returns sessions from it.
type MongoModule struct {
// dailContext (sic, "dial context") is the session pool assigned by Init; it
// is nil until Init has succeeded.
dailContext *DialContext
}
// Init connects to the MongoDB deployment at url and stores the resulting
// session pool on the module.
//
// sessionNum is the number of sessions to create (dialWithTimeout replaces 0
// with a default), dialTimeout is passed to the initial dial, and timeout is
// applied as both the sync and the socket timeout of the dialed session.
//
// The result of dialWithTimeout is always stored: on failure the pool is set
// to nil and the dial error is returned.
func (slf *MongoModule) Init(url string, sessionNum uint32, dialTimeout time.Duration, timeout time.Duration) error {
	pool, err := dialWithTimeout(url, sessionNum, dialTimeout, timeout)
	slf.dailContext = pool
	return err
}
// Take returns the next session from the pool created by Init, by delegating
// to DialContext.Take. Init must have succeeded first; otherwise the pool is
// nil and the call panics.
func (slf *MongoModule) Take() *Session {
	pool := slf.dailContext
	return pool.Take()
}
// dialWithTimeout dials url and builds a DialContext holding sessionNum
// sessions.
//
// A sessionNum of 0 is replaced by 100 and the substitution is logged.
// dialTimeout is passed to mgo.DialWithTimeout; timeout is set as both the
// sync and the socket timeout on the dialed session. That session becomes
// the first pool entry and the remaining entries are created from it with
// New. A dial error is returned unchanged together with a nil context.
//
// goroutine safe
func dialWithTimeout(url string, sessionNum uint32, dialTimeout time.Duration, timeout time.Duration) (*DialContext, error) {
	// sessionNum is unsigned, so zero is the only invalid value.
	if sessionNum == 0 {
		sessionNum = 100
		log.Release("invalid sessionNum, reset to %v", sessionNum)
	}

	root, err := mgo.DialWithTimeout(url, dialTimeout)
	if err != nil {
		return nil, err
	}
	root.SetMode(mgo.Strong, true)
	root.SetSyncTimeout(timeout)
	root.SetSocketTimeout(timeout)

	// The dialed session takes slot 0; every other slot gets its own session
	// derived from it.
	pool := make([]*Session, 0, sessionNum)
	pool = append(pool, &Session{root})
	for uint32(len(pool)) < sessionNum {
		pool = append(pool, &Session{root.New()})
	}

	return &DialContext{sessions: pool, sessionNum: sessionNum}, nil
}
// Close closes every session in the pool.
//
// goroutine safe: the pool mutex is held for the whole iteration. The unlock
// is deferred so the mutex is released even if closing a session panics;
// otherwise later Take or Close calls would block forever.
//
// The sessions slice itself is left untouched, so Take keeps returning the
// (now closed) sessions afterwards.
func (c *DialContext) Close() {
	c.Lock()
	defer c.Unlock()

	for _, s := range c.sessions {
		s.Close()
	}
}
// Take returns a session from the pool in round-robin order.
//
// The counter is read, reduced modulo sessionNum and advanced while the mutex
// is held; the selected session is read from the slice after the mutex has
// been released. The counter is a uint32 and wraps around on overflow.
func (c *DialContext) Take() *Session {
	c.Lock()
	cursor := c.takeSessionIdx
	slot := cursor % c.sessionNum
	c.takeSessionIdx = cursor + 1
	c.Unlock()

	return c.sessions[slot]
}
// EnsureCounter inserts a counter document {"_id": id, "seq": 0} into
// db.collection. A duplicate-key error is treated as success and nil is
// returned; any other insert error is returned unchanged.
//
// goroutine safe
func (s *Session) EnsureCounter(db string, collection string, id string) error {
	counter := bson.M{
		"_id": id,
		"seq": 0,
	}

	err := s.DB(db).C(collection).Insert(counter)
	if mgo.IsDup(err) {
		// A document with this _id is already there.
		return nil
	}
	return err
}
// NextSeq increments the "seq" field of the document with _id id in
// db.collection by one and returns the updated value, using a find-and-modify
// with ReturnNew set.
//
// On error the returned sequence is whatever was decoded into the result
// struct (its zero value if nothing was decoded) and err is non-nil.
//
// goroutine safe
func (s *Session) NextSeq(db string, collection string, id string) (int, error) {
	var counter struct {
		Seq int
	}

	increment := mgo.Change{
		Update:    bson.M{"$inc": bson.M{"seq": 1}},
		ReturnNew: true,
	}

	_, err := s.DB(db).C(collection).FindId(id).Apply(increment, &counter)
	return counter.Seq, err
}
// EnsureIndex ensures that a sparse, non-unique index over key exists on
// db.collection and returns the error reported by mgo, if any.
//
// goroutine safe
func (s *Session) EnsureIndex(db string, collection string, key []string) error {
	index := mgo.Index{
		Key:    key,
		Unique: false,
		Sparse: true,
	}
	target := s.DB(db).C(collection)
	return target.EnsureIndex(index)
}
// EnsureUniqueIndex ensures that a sparse, unique index over key exists on
// db.collection and returns the error reported by mgo, if any.
//
// goroutine safe
func (s *Session) EnsureUniqueIndex(db string, collection string, key []string) error {
	index := mgo.Index{
		Key:    key,
		Unique: true,
		Sparse: true,
	}
	target := s.DB(db).C(collection)
	return target.EnsureIndex(index)
}