collect optimize

This commit is contained in:
mubai
2026-04-01 00:21:29 +08:00
parent 8a6bc33cad
commit 8243d2171f
47 changed files with 2774 additions and 1770 deletions

View File

@@ -2,10 +2,11 @@ package system
import (
"fmt"
"gorm.io/gorm"
"log"
"server/config"
"server/plugin/db"
"gorm.io/gorm"
)
// FailureRecord 失败采集记录信息机构体
@@ -15,10 +16,11 @@ type FailureRecord struct {
OriginName string `json:"originName"` // 采集站唯一ID
Uri string `json:"uri"` // 采集源链接
CollectType ResourceType `json:"collectType"` // 采集类型
PageNumber int `json:"pageNumber"` // 页码
Hour int `json:"hour"` // 采集参数 h 时长
Cause string `json:"cause"` // 失败原因
Status int `json:"status"` // 重试状态
Params string `json:"params"` //采集参数
//PageNumber int `json:"pageNumber"` // 页码
Hour int `json:"hour"` // 采集参数 h 时长
Cause string `json:"cause"` // 失败原因
Status int `json:"status"` // 重试状态
}
// TableName 采集失败记录表表名
@@ -101,7 +103,7 @@ func PendingRecord() []FailureRecord {
db.Mdb.Where("(hour > 4320 OR hour < 0) AND status = 1").Find(&list)
// 2. 获取 hour > 0 && hour < 4320 && status = 1 的影片信息(只获取最早的一条记录)
var fr FailureRecord
db.Mdb.Where("hour > 0 AND hour < 4320 AND status = 1").Order("hour DESC, created_at ASC").First(&fr)
db.Mdb.Where("hour > 2 AND hour < 360 AND status = 1").Order("hour DESC, created_at ASC").First(&fr)
// 3. 将 fr 添加到 list中
list = append(list, fr)
return list
@@ -109,10 +111,12 @@ func PendingRecord() []FailureRecord {
// ChangeRecord 修改已完成二次采集的记录状态
func ChangeRecord(fr *FailureRecord, status int) {
// 获取采集参数中的h时间参数
h := fr.Hour
switch {
case fr.Hour > 168 && fr.Hour < 360:
db.Mdb.Model(&FailureRecord{}).Where("hour > 168 AND hour < 360 AND created_at >= ?", fr.CreatedAt).Update("status", status)
case fr.Hour < 0, fr.Hour > 4320:
case h > 2 && h < 360:
db.Mdb.Model(&FailureRecord{}).Where("hour > 2 AND hour < 360 AND created_at >= ?", fr.CreatedAt).Update("status", status)
case h < 0, h > 4320:
db.Mdb.Model(&FailureRecord{}).Where("id = ?", fr.ID).Update("status", status)
default:
// 其余范围,暂不处理

View File

@@ -78,7 +78,7 @@ func SaveUserToken(token string, userId uint) error {
func GetUserTokenById(userId uint) string {
token, err := db.Rdb.Get(db.Cxt, fmt.Sprintf(config.UserTokenKey, userId)).Result()
if err != nil {
log.Println(err)
log.Println("User Token Not Found: ", err)
return ""
}
return token

View File

@@ -15,6 +15,7 @@ import (
"strings"
"time"
"github.com/redis/go-redis/v9"
"gorm.io/gorm"
)
@@ -186,9 +187,12 @@ func AddMovieDetailIndex() {
func AddSlaveMovieInfoIndex() {
var s SlaveMovieInfo
tableName := s.TableName()
// 添加索引
db.Mdb.Exec(fmt.Sprintf("CREATE UNIQUE INDEX idx_mid ON %s (mid)", tableName))
db.Mdb.Exec(fmt.Sprintf("CREATE INDEX idx_dbId ON %s (db_id)", tableName))
// 如果不存在索引则创建对应索引
if !db.Mdb.Migrator().HasIndex(&s, "idx_mid") {
// 添加索引
db.Mdb.Exec(fmt.Sprintf("CREATE INDEX idx_mid ON %s (mid)", tableName))
db.Mdb.Exec(fmt.Sprintf("CREATE INDEX idx_dbId ON %s (db_id)", tableName))
}
}
// =================================== column序列化 接口========================================================
@@ -312,27 +316,6 @@ func ExistMovieDetailByMid(mid int64) bool {
return count > 0
}
// SaveAllSitePlayList 保存附属站点影片信息 (仅在全量采集时使调用)
func SaveAllSitePlayList(id string, ml []MovieDetail) (err error) {
// 如果ml 为空则直接返回
if len(ml) <= 0 {
return nil
}
var sl []SlaveMovieInfo
for _, m := range ml {
// 只执行保存操作, 不考虑更新情况
s := SlaveMovieInfo{Sid: id, Mid: GenerateHashKey(m.Name), DbId: m.DbId, PlayList: m.PlayList}
sl = append(sl, s)
}
// 将处理后的结果存储到 SalveMovieInfo表中
if len(sl) > 0 {
if err = db.Mdb.Create(&sl).Error; err != nil {
log.Println("附属站点影片信息保存失败: ", err)
}
}
return
}
// SaveSitePlayList 保存附属站点影片信息
func SaveSitePlayList(sl []SlaveMovieInfo) (err error) {
if len(sl) <= 0 {
@@ -371,6 +354,56 @@ func UpdateSitePlayList(id string, ml []MovieDetail) (err error) {
return
}
// BatchUpdateSlaveInfo 批量更新SlaveMovieInfo
func BatchUpdateSlaveInfo(sl []SlaveMovieInfo) (err error) {
// 如果ml 为空则直接返回
if len(sl) <= 0 {
return nil
}
//
var rl []SlaveMovieInfo
for _, s := range sl {
if id := ExistSlaveMovieInfo(s); id > 0 {
if err = db.Mdb.Model(&s).Where("id", id).Updates(s).Error; err != nil {
log.Println("附属站点影片信息更新失败: ", err)
}
continue
}
rl = append(rl, s)
}
if len(sl) > 0 {
if err = db.Mdb.Create(&sl).Error; err != nil {
log.Println("附属站点影片信息保存失败: ", err)
}
}
return err
}
// DelSlaveInfoBySid 删除sid对应的采集站的所有数据
func DelSlaveInfoBySid(id string) {
// 查询表中是否存在对应采集站的数据信息
var count int64
db.Mdb.Model(&SlaveMovieInfo{}).Count(&count).Where("sid = ?", id)
// 如果存在对应数据,则进行后续操作
if count > 0 {
for {
res := db.Mdb.Where("sid = ?", id).Limit(5000).Delete(&SlaveMovieInfo{})
if res.Error != nil {
log.Println("Delete SlaveMovieInfo Failed: ", res.Error)
break
}
if res.RowsAffected == 0 {
log.Println("Delete SlaveMovieInfo Over !!!")
break
}
// 短暂休眠, 防止mysql紊乱
time.Sleep(100 * time.Millisecond)
}
}
}
// ExistSlaveMovieInfo 查询对应记录, 如果存在则返还id, 不存在则返还 -1
func ExistSlaveMovieInfo(s SlaveMovieInfo) int64 {
var id int64
@@ -394,7 +427,7 @@ func MovieDetailCache(ml []MovieDetail) error {
var data = make(map[string]string)
for _, m := range ml {
r, _ := json.Marshal(m)
data[fmt.Sprint(m.Mid)] = string(r)
data[strconv.FormatInt(m.Mid, 10)] = string(r)
}
return db.Rdb.HSet(db.Cxt, config.MovieDetailKey, data).Err()
}
@@ -414,7 +447,7 @@ func SlaveDetailCache(id string, ml []MovieDetail) error {
}
// SyncMovieDetail 同步redis中的影片数据到mysql中
func SyncMovieDetail(sid string, grade SourceGrade) {
func SyncMovieDetail(sid string, grade SourceGrade, mode int) {
// 初始化游标
var cursor uint64 = 0
// 根据采集站的类型 Master | Slave 进行不同的处理逻辑
@@ -436,9 +469,21 @@ func SyncMovieDetail(sid string, grade SourceGrade) {
ml = append(ml, m)
}
// 批量保存movieDetail
if err := SaveDetails(ml); err != nil {
log.Println("SyncMovieDetail Failed: ", err)
switch mode {
case 0:
// 执行全量保存
if err := SaveDetails(ml); err != nil {
log.Println("SyncMovieDetail AllSave Failed: ", err)
}
case 1:
// 执行更新
if err := BatchUpdateDetails(ml); err != nil {
log.Println("SyncMovieDetail SaveOrUpdate Failed: ", err)
}
default:
log.Println("Synchronization Mode Exception:", mode)
}
// 删除已提取的元素
if err := db.Rdb.HDel(db.Cxt, config.MovieDetailKey, ks...).Err(); err != nil {
log.Println("DeleteMovieDetailCache Failed: ", err)
@@ -468,9 +513,19 @@ func SyncMovieDetail(sid string, grade SourceGrade) {
sl = append(sl, s)
}
// 批量保存movieDetail
err := SaveSitePlayList(sl)
if err != nil {
log.Println("SyncSlaveDetail Failed: ", err)
switch mode {
case 0:
// 执行全量保存
if err := SaveSitePlayList(sl); err != nil {
log.Println("SyncSlaveDetail AllSave Failed: ", err)
}
case 1:
// 执行更新
if err := BatchUpdateSlaveInfo(sl); err != nil {
log.Println("SyncSlaveDetail SaveOrUpdate Failed: ", err)
}
default:
log.Println("Synchronization Mode Exception:", mode)
}
// 删除已提取的元素
if err := db.Rdb.HDel(db.Cxt, fmt.Sprintf(config.MultipleSiteDetailKey, sid), ks...).Err(); err != nil {
@@ -492,15 +547,37 @@ func SyncMovieDetail(sid string, grade SourceGrade) {
// GetDetailByMid 获取影片对应的详情信息
func GetDetailByMid(mid int64) MovieDetail {
var d MovieDetail
// 查询mid对应的影片详情信息, 只查询部分字段
if err := db.Mdb.Model(&MovieDetail{}).Where("mid = ?", mid).First(&d).Error; err != nil {
// 初始化返回值
var m MovieDetail
// 从redis获取对应的影片信息
v, err := db.Rdb.HGet(db.Cxt, config.MovieDetailKey, strconv.FormatInt(mid, 10)).Result()
if err != nil {
// 如果没有获取到对应值, 则去mysql中进行查找
if errors.Is(err, redis.Nil) {
if err := db.Mdb.Model(&MovieDetail{}).Select("id, mid, cid, pid, name, sub_title, c_name, state, picture, actor, director,"+
" content, remarks, area, year").Where("mid = ?", mid).Find(&m).Error; err != nil {
log.Println("Find BasicInfo Failed: ", err)
return m
}
//// 执行本地图片匹配
ReplaceDetailPic(&m)
return m
}
log.Println("Find MovieDetail Failed: ", err)
return d
return m
}
// 执行本地图片匹配
ReplaceDetailPic(&d)
return d
// 如果获取到对应值,则进行反序列化
_ = json.Unmarshal([]byte(v), &m)
return m
//var d MovieDetail
//// 查询mid对应的影片详情信息, 只查询部分字段
//if err := db.Mdb.Model(&MovieDetail{}).Where("mid = ?", mid).First(&d).Error; err != nil {
// log.Println("Find MovieDetail Failed: ", err)
// return d
//}
//// 执行本地图片匹配
//ReplaceDetailPic(&d)
//return d
}
// GetBasicInfoByMid 获取Id对应的影片基本信息
@@ -523,20 +600,62 @@ func GetBasicInfoByMid(mid int64) MovieBasicInfo {
// GetBasicInfoByIds 通过searchInfo 获取影片的基本信息
func GetBasicInfoByIds(ids []int64) []MovieBasicInfo {
var ml []MovieDetail
// 初始化返回值
var l []MovieBasicInfo
// 首先从redis中获取影片的最新信息, 如果没有则转为去mysql表中获取
var ks []string
for _, id := range ids {
ks = append(ks, strconv.FormatInt(id, 10))
}
// 一次性获取所有
vs, err := db.Rdb.HMGet(db.Cxt, config.MovieDetailKey, ks...).Result()
if err != nil {
log.Println("Find MovieDetail Failed: ", err)
return l
}
// 迭代转换 basicInfo, 并将未获取到值的id进行整合
var newIds []int64
var ml []MovieDetail
if len(vs) > 0 {
for i, v := range vs {
if v != nil {
var m MovieDetail
_ = json.Unmarshal([]byte(v.(string)), &m)
ReplaceDetailPic(&m)
l = append(l, ConvertBasicInfo(m))
} else {
newIds = append(newIds, ids[i])
}
}
}
// 如果存在nil值,则去mysql进行补全
if len(newIds) > 0 {
if err := db.Mdb.Model(&MovieDetail{}).Select("id, mid, cid, pid, name, sub_title, c_name, state, picture, actor, director,"+
" content, remarks, area, year").Where("mid IN (?)", ids).Find(&ml).Error; err != nil {
log.Println("BatchFind BasicInfo Failed: ", err)
return nil
}
for _, m := range ml {
// 执行本地图片匹配
ReplaceDetailPic(&m)
l = append(l, ConvertBasicInfo(m))
}
}
//var ml []MovieDetail
//var l []MovieBasicInfo
// 使用in查询, 一次性拿到满足条件的数据
if err := db.Mdb.Model(&MovieDetail{}).Select("id, mid, cid, pid, name, sub_title, c_name, state, picture, actor, director,"+
" content, remarks, area, year").Where("mid IN (?)", ids).Find(&ml).Error; err != nil {
log.Println("BatchFind BasicInfo Failed: ", err)
return nil
}
// 将查询到的结果批量转化为BasicInfo
for _, m := range ml {
// 执行本地图片匹配
ReplaceDetailPic(&m)
l = append(l, ConvertBasicInfo(m))
}
//if err := db.Mdb.Model(&MovieDetail{}).Select("id, mid, cid, pid, name, sub_title, c_name, state, picture, actor, director,"+
// " content, remarks, area, year").Where("mid IN (?)", ids).Find(&ml).Error; err != nil {
// log.Println("BatchFind BasicInfo Failed: ", err)
// return nil
//}
//// 将查询到的结果批量转化为BasicInfo
//for _, m := range ml {
// // 执行本地图片匹配
// ReplaceDetailPic(&m)
// l = append(l, ConvertBasicInfo(m))
//}
return l
}
@@ -619,14 +738,41 @@ func GetRelateMovieBasicInfo(search SearchInfo, page *Page) []MovieBasicInfo {
}
// GetMultiplePlay 通过影片名的ID值匹配播放源, 不区分站点
func GetMultiplePlay(mIds []string, dbIds int64) []SlaveMovieInfo {
func GetMultiplePlay(mIds []string, dbId int64) []SlaveMovieInfo {
// 初始化返回值
var l []SlaveMovieInfo
// 通过siteId, mIds, dbIds 检索满足条件的数据
if err := db.Mdb.Model(&SlaveMovieInfo{}).Select("sid, play_list").Where("mid IN (?) OR db_id = ?", mIds, dbIds).Find(&l).Error; err != nil {
log.Println("GetMultiplePlay Failed: ", err)
return nil
// 首先从redis进行匹配
for _, c := range GetCollectSourceListByGrade(SlaveCollect) {
if !c.State {
continue
}
var s SlaveMovieInfo
for _, mid := range mIds {
// 初始化临时变量 SlaveMovieInfo
v, err := db.Rdb.HGet(db.Cxt, fmt.Sprintf(config.MultipleSiteDetailKey, c.Id), mid).Result()
if err != nil {
// 如果没有获取到对应值, 则直接continue
continue
}
// 如果获取到数据则直接退出本次循环
_ = json.Unmarshal([]byte(v), &s)
l = append(l, s)
break
}
// 如果迭代完s依旧为空,则去mysql中进行匹配
if s.Mid == "" {
if err := db.Mdb.Model(&SlaveMovieInfo{}).Select("sid, play_list").Where("sid = ? AND (mid IN (?) OR db_id = ?)", c.Id, mIds, dbId).First(&s).Error; err != nil {
log.Println("GetMultiplePlay Failed: ", err)
continue
}
l = append(l, s)
}
}
// 通过siteId, mIds, dbIds 检索满足条件的数据
//if err := db.Mdb.Model(&SlaveMovieInfo{}).Select("sid, play_list").Where("mid IN (?) OR db_id = ?", mIds, dbId).Find(&l).Error; err != nil {
// log.Println("GetMultiplePlay Failed: ", err)
// return nil
//}
return l
}

View File

@@ -75,6 +75,8 @@ func AddSearchIndex() {
tableName := s.TableName()
// 添加索引
db.Mdb.Exec(fmt.Sprintf("CREATE UNIQUE INDEX idx_mid ON %s (mid)", tableName))
db.Mdb.Exec(fmt.Sprintf("CREATE INDEX idx_pid ON %s (pid)", tableName))
db.Mdb.Exec(fmt.Sprintf("CREATE INDEX idx_cid ON %s (cid)", tableName))
db.Mdb.Exec(fmt.Sprintf("CREATE INDEX idx_time ON %s (update_stamp DESC)", tableName))
db.Mdb.Exec(fmt.Sprintf("CREATE INDEX idx_hits ON %s (hits DESC)", tableName))
db.Mdb.Exec(fmt.Sprintf("CREATE INDEX idx_score ON %s (score DESC)", tableName))
@@ -274,31 +276,32 @@ func SaveSearchInfo(s SearchInfo) error {
}
// BatchSaveOrUpdate 判断数据库中是否存在对应mid的数据, 如果存在则更新, 否则插入
func BatchSaveOrUpdate(list []SearchInfo) {
tx := db.Mdb.Begin()
for _, info := range list {
var count int64
// 通过当前影片id 对应的记录数
tx.Model(&SearchInfo{}).Where("mid", info.Mid).Count(&count)
// 如果存在对应数据则进行更新, 否则保存相应数据
if count > 0 {
// 记录已经存在则执行更新部分内容
err := tx.Model(&SearchInfo{}).Where("mid", info.Mid).Updates(SearchInfo{UpdateStamp: info.UpdateStamp, Hits: info.Hits, State: info.State,
Remarks: info.Remarks, Score: info.Score, ReleaseStamp: info.ReleaseStamp}).Error
func BatchSaveOrUpdate(ml []MovieDetail) error {
//
var sl []SearchInfo
for _, m := range ml {
s := ConvertSearchInfo(m)
// 如果存在对应数据则直接进行更新操作
if ExistSearchInfo(s.Mid) {
// 如果已经存在当前记录则将当前记录进行更新
err := db.Mdb.Model(&SearchInfo{}).Where("mid", s.Mid).Updates(SearchInfo{UpdateStamp: s.UpdateStamp, Hits: s.Hits, State: s.State,
Remarks: s.Remarks, Score: s.Score, ReleaseStamp: s.ReleaseStamp}).Error
if err != nil {
tx.Rollback()
log.Println("Save Search Info error: ", err)
}
} else {
// 执行插入操作
if err := tx.Create(&info).Error; err != nil {
tx.Rollback()
}
// 插入成功后保存一份tag信息到redis中
BatchHandleSearchTag(info)
break
}
// 如果不存在对应信息则保存一份tag
BatchHandleSearchTag(s)
sl = append(sl, s)
}
// 将需要添加的信息切片进行整合,统一添加
if len(sl) > 0 {
if err := db.Mdb.Create(&sl).Error; err != nil {
return err
}
}
// 提交事务
tx.Commit()
return nil
}
// BatchSaveSearchInfo 批量保存Search信息(全量采集时使用,不考虑更新情况)