failture record recover

This commit is contained in:
mubai
2025-03-22 23:05:02 +08:00
parent fab3fb73e7
commit c1e28380d0
18 changed files with 496 additions and 120 deletions

View File

@@ -70,7 +70,7 @@ const (
// DefaultUpdateSpec 每20分钟执行一次
DefaultUpdateSpec = "0 */20 * * * ?"
// EveryWeekSpec 每周日凌晨4点更新一次
EveryWeekSpec = "0 0 4 * * 7"
EveryWeekSpec = "0 0 4 * * 0"
// DefaultUpdateTime 每次采集最近 3 小时内更新的影片
DefaultUpdateTime = 3
)
@@ -93,7 +93,7 @@ const (
//mysql服务配置信息 root:root 设置mysql账户的用户名和密码
MysqlDsn = "root:root@(192.168.20.5:3306)/FilmSite?charset=utf8mb4&parseTime=True&loc=Local"
//MysqlDsn = "root:MuBai0916$@(1.94.30.26:3610)/FilmSite?charset=utf8mb4&parseTime=True&loc=Local"
//MysqlDsn = "root:MuBai0916$@(113.44.5.201:3610)/FilmSite?charset=utf8mb4&parseTime=True&loc=Local"
// MysqlDsn docker compose 环境下的链接信息 mysql:3306 为 docker compose 中 mysql服务对应的网络名称和端口
//MysqlDsn = "root:root@(mysql:3306)/FilmSite?charset=utf8mb4&parseTime=True&loc=Local"
@@ -104,8 +104,10 @@ const (
RedisPassword redis访问密码
RedisDBNo 使用第几号库
*/
//RedisAddr = `1.94.30.26:3620`
//RedisAddr = `113.44.5.201:3620`
//RedisPassword = `MuBai0916$`
//RedisDBNo = 0
RedisAddr = `192.168.20.5:6379`
RedisPassword = `root`
RedisDBNo = 0

View File

@@ -7,6 +7,7 @@ import (
"server/model/system"
"server/plugin/spider"
"strconv"
"time"
)
// ------------------------------------------------------ 影视采集 ------------------------------------------------------
@@ -199,10 +200,30 @@ func FailureRecordList(c *gin.Context) {
var err error
// 获取筛选条件
params.OriginId = c.DefaultQuery("originId", "")
params.CollectType, err = strconv.Atoi(c.DefaultQuery("collectType", "-1"))
//params.CollectType, err = strconv.Atoi(c.DefaultQuery("collectType", "-1"))
params.Hour, err = strconv.Atoi(c.DefaultQuery("hour", "0"))
params.Status, err = strconv.Atoi(c.DefaultQuery("status", "-1"))
// 处理时间参数
begin := c.DefaultQuery("beginTime", "")
if begin != "" {
beginTime, e := time.ParseInLocation(time.DateTime, begin, time.Local)
if e != nil {
system.Failed("影片分页数据获取失败, 请求参数异常", c)
return
}
params.BeginTime = beginTime
}
end := c.DefaultQuery("endTime", "")
if end != "" {
endTime, e := time.ParseInLocation(time.DateTime, end, time.Local)
if e != nil {
system.Failed("影片分页数据获取失败, 请求参数异常", c)
return
}
params.EndTime = endTime
}
// 分页参数
params.Paging.Current, err = strconv.Atoi(c.DefaultQuery("current", "1"))
params.Paging.PageSize, err = strconv.Atoi(c.DefaultQuery("pageSize", "10"))
@@ -215,12 +236,15 @@ func FailureRecordList(c *gin.Context) {
params.Paging.PageSize = 10
}
// 条件筛选select选项参数
options := logic.CollectL.GetRecordOptions()
// 获取满足条件的分页数据
list := logic.CollectL.GetRecordList(params)
system.Success(gin.H{"params": params, "list": list}, "影片分页信息获取成功", c)
system.Success(gin.H{"params": params, "list": list, "options": options}, "影片分页信息获取成功", c)
}
// CollectRecover 对失败的采集进行
// CollectRecover 对失败的采集进行处理
func CollectRecover(c *gin.Context) {
// 获取记录id
id, err := strconv.Atoi(c.DefaultQuery("id", "0"))
@@ -236,3 +260,24 @@ func CollectRecover(c *gin.Context) {
}
system.SuccessOnlyMsg("采集重试已开启, 请勿重复操作", c)
}
// CollectRecoverAll 恢复采集-全量
func CollectRecoverAll(c *gin.Context) {
// 重新采集表中所有失败记录
logic.CollectL.RecoverAll()
system.SuccessOnlyMsg("恢复任务已成功开启!!!", c)
}
// ClearDoneRecord 清理已处理的记录
func ClearDoneRecord(c *gin.Context) {
// 删除表中已处理完成的记录
logic.CollectL.ClearDoneRecord()
system.SuccessOnlyMsg("处理完成的记录信息已删除!!!", c)
}
// ClearAllRecord 删除所有记录
func ClearAllRecord(c *gin.Context) {
// 截断失败采集记录表
logic.CollectL.ClearAllRecord()
system.SuccessOnlyMsg("采集异常记录信息已清空!!!", c)
}

View File

@@ -140,17 +140,34 @@ func validTaskInfo(t system.FilmCollectTask) error {
// 任务添加参数校验
func validTaskAddVo(vo system.FilmCronVo) error {
if vo.Model != 0 && vo.Model != 1 && vo.Model != 2 {
switch vo.Model {
case 0:
if vo.Time == 0 {
return errors.New("参数校验失败, 采集时长不能为零值")
}
case 1:
if vo.Time == 0 {
return errors.New("参数校验失败, 采集时长不能为零值")
}
if vo.Ids == nil || len(vo.Ids) <= 0 {
return errors.New("参数校验失败, 自定义更新未绑定任何资源站点")
}
case 2:
break
default:
return errors.New("参数校验失败, 未定义的任务类型")
}
if vo.Time == 0 {
return errors.New("参数校验失败, 采集时长不能为零值")
}
//if vo.Model != 0 && vo.Model != 1 && vo.Model != 2 {
// return errors.New("参数校验失败, 未定义的任务类型")
//}
//if vo.Time == 0 {
// return errors.New("参数校验失败, 采集时长不能为零值")
//}
if err := spider.ValidSpec(vo.Spec); err != nil {
return errors.New(fmt.Sprint("参数校验失败 cron表达式校验失败: ", err.Error()))
}
if vo.Model == 1 && (vo.Ids == nil || len(vo.Ids) <= 0) {
return errors.New("参数校验失败, 自定义更新未绑定任何资源站点")
}
//if vo.Model == 1 && (vo.Ids == nil || len(vo.Ids) <= 0) {
// return errors.New("参数校验失败, 自定义更新未绑定任何资源站点")
//}
return nil
}

View File

@@ -56,6 +56,21 @@ func (cl *CollectLogic) GetRecordList(params system.RecordRequestVo) []system.Fa
return system.FailureRecordList(params)
}
// GetRecordOptions 获取采集记录筛选参数
func (cl *CollectLogic) GetRecordOptions() system.OptionGroup {
var options = make(system.OptionGroup)
// 获取筛选参数, 采集源(ID:name) | 采集类型 | 状态
options["collectType"] = []system.Option{{"全部", -1}, {"影片详情", 0}, {"文章", 1}, {"演员", 2}, {"角色", 3}, {"网站", 4}}
options["status"] = []system.Option{{"全部", -1}, {"待重试", 1}, {"已处理", 0}}
// 获取全部采集站
var originOptions = []system.Option{{"全部", ""}}
for _, v := range system.GetCollectSourceList() {
originOptions = append(originOptions, system.Option{Name: v.Name, Value: v.Id})
}
options["origin"] = originOptions
return options
}
// CollectRecover 恢复采集
func (cl *CollectLogic) CollectRecover(id int) error {
// 通过ID获取完整的失败记录信息
@@ -69,3 +84,23 @@ func (cl *CollectLogic) CollectRecover(id int) error {
return nil
}
// RecoverAll 恢复重新对所有失效记录进行重新采集处理
func (cl *CollectLogic) RecoverAll() {
// 是否进行身份验证, 暂定无需处理
// 对数据表中的所有待处理记录进行恢复采集
go spider.FullRecoverSpider()
}
// ClearDoneRecord 清除已处理完成的记录信息 (将记录表中已经完成处理的记录删除)
func (cl *CollectLogic) ClearDoneRecord() {
// <逻辑删除 or 真实删除> 为避免ID中断暂定逻辑删除
system.DelDoneRecord()
}
// ClearAllRecord 清除所有记录信息 (直接对记录表直接进行截断处理)
func (cl *CollectLogic) ClearAllRecord() {
// 重置记录表状态, 删除所有数据并将自增ID归零
system.TruncateRecordTable()
}

View File

@@ -44,7 +44,7 @@ func (cl *CronLogic) AddFilmCrontab(cv system.FilmCronVo) error {
cid, err := spider.AddFilmRecoverCron(task.Spec)
// 如果任务添加失败则直接返回错误信息
if err != nil {
return errors.New(fmt.Sprint("影视更新定时任务添加失败: ", err.Error()))
return errors.New(fmt.Sprint("失败采集处理定时任务添加失败: ", err.Error()))
}
// 将定时任务Id记录到Task中
task.Cid = cid

View File

@@ -1,6 +1,7 @@
package system
import (
"fmt"
"gorm.io/gorm"
"log"
"server/config"
@@ -50,15 +51,18 @@ func FailureRecordList(vo RecordRequestVo) []FailureRecord {
if vo.OriginId != "" {
qw.Where("origin_id = ?", vo.OriginId)
}
if !vo.BeginTime.IsZero() && !vo.EndTime.IsZero() {
qw.Where("created_at BETWEEN ? AND ? ", vo.BeginTime, vo.EndTime)
}
//if vo.CollectType >= 0 {
// qw.Where("collect_type = ?", vo.CollectType)
//}
//if vo.Hour != 0 {
// qw.Where("hour = ?", vo.Hour)
//}
//if vo.Status >= 0 {
// qw.Where("status = ?", vo.Status)
//}
if vo.Status >= 0 {
qw.Where("status = ?", vo.Status)
}
// 获取分页数据
GetPage(qw, vo.Paging)
@@ -114,3 +118,19 @@ func RetryRecord(id uint, status int64) error {
return db.Mdb.Model(&FailureRecord{}).Where("update_at > ?", fr.UpdatedAt).Update("status", 0).Error
}
// DelDoneRecord 删除已处理的记录信息 -- 逻辑删除
func DelDoneRecord() {
if err := db.Mdb.Where("status = ?", 0).Delete(&FailureRecord{}).Error; err != nil {
log.Println("Delete failure record failed:", err)
}
}
// TruncateRecordTable 截断 record table
func TruncateRecordTable() {
var s FailureRecord
err := db.Mdb.Exec(fmt.Sprintf("TRUNCATE Table %s", s.TableName())).Error
if err != nil {
log.Println("TRUNCATE TABLE Error: ", err)
}
}

View File

@@ -73,19 +73,19 @@ func FilmZero() {
db.Rdb.Del(db.Cxt, db.Rdb.Keys(db.Cxt, "OriginalResource*").Val()...)
db.Rdb.Del(db.Cxt, db.Rdb.Keys(db.Cxt, "Search*").Val()...)
// 删除mysql中留存的检索表
var s *SearchInfo
var s SearchInfo
//db.Mdb.Exec(fmt.Sprintf(`drop table if exists %s`, s.TableName()))
// 截断数据表 truncate table users
if ExistSearchTable() {
db.Mdb.Exec(fmt.Sprintf(`TRUNCATE table %s`, s.TableName()))
db.Mdb.Exec(fmt.Sprintf("TRUNCATE table %s", s.TableName()))
}
}
// ResetSearchTable 重置Search表
func ResetSearchTable() {
// 删除 Search 表
var s *SearchInfo
db.Mdb.Exec(fmt.Sprintf(`drop table if exists %s`, s.TableName()))
var s SearchInfo
db.Mdb.Exec(fmt.Sprintf("drop table if exists %s", s.TableName()))
// 重新创建 Search 表
CreateSearchTable()
}
@@ -230,6 +230,7 @@ func CreateSearchTable() {
}
}
// ExistSearchTable 是否存在Search Table
func ExistSearchTable() bool {
// 1. 判断表中是否存在当前表
return db.Mdb.Migrator().HasTable(&SearchInfo{})
@@ -237,7 +238,7 @@ func ExistSearchTable() bool {
// AddSearchIndex search表中数据保存完毕后 将常用字段添加索引提高查询效率
func AddSearchIndex() {
var s *SearchInfo
var s SearchInfo
tableName := s.TableName()
// 添加索引
db.Mdb.Exec(fmt.Sprintf("CREATE UNIQUE INDEX idx_mid ON %s (mid)", tableName))
@@ -331,8 +332,8 @@ func ExistSearchInfo(mid int64) bool {
// TunCateSearchTable 截断SearchInfo数据表
func TunCateSearchTable() {
var searchInfo *SearchInfo
err := db.Mdb.Exec(fmt.Sprint("TRUNCATE TABLE ", searchInfo.TableName())).Error
var searchInfo SearchInfo
err := db.Mdb.Exec(fmt.Sprintf("TRUNCATE TABLE %s", searchInfo.TableName())).Error
if err != nil {
log.Println("TRUNCATE TABLE Error: ", err)
}

View File

@@ -36,7 +36,7 @@ func CreateUserTable() {
// 如果不存在则创建表 并设置自增ID初始值为10000
if !ExistUserTable() {
err := db.Mdb.AutoMigrate(u)
db.Mdb.Exec(fmt.Sprintf("alter table %s auto_Increment=%d", u.TableName(), config.UserIdInitialVal))
db.Mdb.Exec(fmt.Sprintf("alter table %s auto_Increment = %s", u.TableName(), config.UserIdInitialVal))
if err != nil {
log.Println("Create Table SearchInfo Failed: ", err)
}

View File

@@ -1,5 +1,7 @@
package system
import "time"
// SearchTagsVO 搜索标签请求参数
type SearchTagsVO struct {
Pid int64 `json:"pid"`
@@ -34,6 +36,12 @@ type FilmTaskOptions struct {
Name string `json:"name"`
}
type Option struct {
Name string `json:"name"`
Value any `json:"value"`
}
type OptionGroup map[string][]Option
// CollectParams 数据采集所需要的参数
type CollectParams struct {
Id string `json:"id"` // 资源站id
@@ -116,9 +124,11 @@ type MovieDetailVo struct {
}
type RecordRequestVo struct {
OriginId string `json:"originId"` // 源站点ID
CollectType int `json:"collectType"` // 采集类型
Hour int `json:"hour"` // 采集时长
Status int `json:"status"` // 状态
Paging *Page `json:"paging"` // 分页参数
OriginId string `json:"originId"` // 源站点ID
CollectType int `json:"collectType"` // 采集类型
Hour int `json:"hour"` // 采集时长
Status int `json:"status"` // 状态
BeginTime time.Time `json:"beginTime"` // 起始时间
EndTime time.Time `json:"endTime"` // 结束时间
Paging *Page `json:"paging"` // 分页参数
}

View File

@@ -81,7 +81,7 @@ func CollectCrontabInit() {
/*
如果系统中不存在任何定时任务信息, 则添加默认的定时任务
1. 添加一条默认任务, 定时更新所有已启用站点的影片信息
2. 添加一条默认任务, 定时处理采集失败的记录
2. 添加一条默认任务, 定时处理采集失败的记录
3.生成任务信息
*/
task := system.FilmCollectTask{Id: util.GenerateSalt(), Time: config.DefaultUpdateTime, Spec: config.DefaultUpdateSpec,

View File

@@ -3,7 +3,6 @@ package db
import (
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"gorm.io/gorm/schema"
"server/config"
)
@@ -25,7 +24,7 @@ func InitMysql() (err error) {
SingularTable: true, //是否使用 结构体名称作为表名 (关闭自动变复数)
//NameReplacer: strings.NewReplacer("spider_", ""), // 替表名和字段中的 Me 为 空
},
Logger: logger.Default.LogMode(logger.Info), //设置日志级别为Info
//Logger: logger.Default.LogMode(logger.Info), //设置日志级别为Info
})
return
}

View File

@@ -326,7 +326,7 @@ func FullRecoverSpider() {
/*
获取待处理的记录数据
1. 采集时长 > 168h (一周,7天) 状态-1 待处理, | 只获取满足条件的最早的待处理记录
2. 采集时长 > 4320h (半年,180天) 状态-1 待处理, | 获取满足条件的所有数据
2. 采集时长 > 4320h (半年,180天) 状态-1 待处理, | 获取满足条件的所有数据
*/
list := system.PendingRecord()
@@ -336,7 +336,7 @@ func FullRecoverSpider() {
case fr.Hour > 0 && fr.Hour < 4320:
// 将此记录之后的所有同类采集记录变更为已重试
system.ChangeRecord(&fr, 0)
// 如果采集的内容是 7~15 天之内更新的内容,则采集此记录之后的所有更新内容
// 如果采集的内容是 0~180 天之内更新的内容,则采集此记录之后的所有更新内容
// 获取采集参数h, 采集时长变更为 原采集时长 + 采集记录距现在的时长
h := fr.Hour + int(math.Ceil(time.Since(fr.CreatedAt).Hours()))
// 对当前所有已启用的站点 更新最新 h 小时的内容
@@ -344,7 +344,7 @@ func FullRecoverSpider() {
case fr.Hour < 0, fr.Hour > 4320:
// 将此记录状态修改为已重试
system.ChangeRecord(&fr, 0)
// 如果采集的是 最近180天更新的内容 或全部内容, 则只对当前一条记录进行二次采集
// 如果采集的是 180天之前更新的内容 或全部内容, 则只对当前一条记录进行二次采集
s := system.FindCollectSourceById(fr.OriginId)
collectFilm(s, fr.Hour, fr.PageNumber)
default:

View File

@@ -18,7 +18,7 @@ func CreateCron() *cron.Cron {
return cron.New(cron.WithSeconds())
}
// AddFilmUpdateCron 添加影片更新定时任务
// AddFilmUpdateCron 添加 指定站点的影片更新定时任务
func AddFilmUpdateCron(id, spec string) (cron.EntryID, error) {
// 校验 spec 表达式的有效性
if err := ValidSpec(spec); err != nil {
@@ -40,7 +40,7 @@ func AddFilmUpdateCron(id, spec string) (cron.EntryID, error) {
})
}
// AddAutoUpdateCron 自动更新定时任务
// AddAutoUpdateCron 添加 所有已启用站点的影片更新定时任务
func AddAutoUpdateCron(id, spec string) (cron.EntryID, error) {
// 校验 spec 表达式的有效性
if err := ValidSpec(spec); err != nil {

View File

@@ -75,6 +75,10 @@ func SetupRouter() *gin.Engine {
collect.GET(`/record/list`, controller.FailureRecordList)
collect.GET(`/record/retry`, controller.CollectRecover)
collect.GET(`/record/retry/all`, controller.CollectRecoverAll)
collect.GET(`/record/clear/done`, controller.ClearDoneRecord)
collect.GET(`/record/clear/all`, controller.ClearAllRecord)
}
// 定时任务相关