collect optimization

This commit is contained in:
mubai
2026-03-30 21:47:26 +08:00
parent 0d258e90bd
commit 8a6bc33cad
23 changed files with 827 additions and 354 deletions

View File

@@ -13,6 +13,7 @@ import (
"server/model/system"
"server/plugin/common/conver"
"server/plugin/common/util"
"strconv"
"time"
)
@@ -46,14 +47,22 @@ func HandleCollect(id string, h int) error {
// 生成 RequestInfo
r := util.RequestInfo{Uri: s.Uri, Params: url.Values{}}
// 如果 h == 0 则直接返回错误信息
if h == 0 {
log.Println(" Collect time cannot be zero ")
return errors.New(" Collect time cannot be zer ")
}
// 如果 h = -1 则进行全量采集
if h > 0 {
// 通过 采集时长 h 的不同来执行不同出处理方式
switch {
case h < 0:
// 采集时长为负数则先执行对应数据表的重置
if s.Grade == system.MasterCollect {
system.FilmZero()
} else {
// 如果所处站点是次级站点, 则删除对应站点在表中的数据
system.ResetSlaveMovieInfoTable()
}
case h > 0:
// 如果采集时长是正常数值, 则设置参数 h
r.Params.Set("h", fmt.Sprint(h))
default:
log.Println("Params Collect time Exception !!!")
return errors.New(" Params Collect time Exception !!! ")
}
// 2. 首先获取分页采集的页数
pageCount, err := spiderCore.GetPageCount(r)
@@ -86,22 +95,169 @@ func HandleCollect(id string, h int) error {
// 如果分页数量较大则开启协程
ConcurrentPageSpider(pageCount, s, h, collectFilm)
}
// 视频数据采集完成后同步相关信息到mysql
// 视频数据采集完成后 对暂存数据进行处理和优化
if s.Grade == system.MasterCollect {
// 执行影片信息更新操作
if h > 0 {
// 执行数据更新操作
system.SyncSearchInfo(1)
} else {
// 清空searchInfo中的数据并重新添加, 否则执行
system.SyncSearchInfo(0)
// 如果采集时长为负, (全量采集), 则在数据采集完成后为search表添加索引
if h < 0 {
// 全量采集时进行数据同步(仅保存)
system.SyncMovieDetail(s.Id, s.Grade)
system.AddSearchIndex()
}
// 采集时长在一定阈值内时执行redis数据同步 (存在则更新, 不存在则新增)
// 开启图片同步
if s.SyncPictures {
system.SyncFilmPicture()
}
// 每次成功执行完都清理redis中的相关API接口数据缓存
ClearCache()
} else if s.Grade == system.SlaveCollect {
// 如果采集时长为负, (全量采集), 则在数据采集完成后为search表添加索引
if h < 0 {
// 全量采集时进行数据同步
system.SyncMovieDetail(s.Id, s.Grade)
}
}
case system.CollectArticle, system.CollectActor, system.CollectRole, system.CollectWebSite:
log.Println("暂未开放此采集功能!!!")
return errors.New("暂未开放此采集功能")
}
log.Println("Spider Task Exercise Success")
return nil
}
func HandleCollectRefine(id string, h int) error {
// 1. 首先通过ID获取对应采集站信息
s := system.FindCollectSourceById(id)
if s == nil {
log.Println("Cannot Find Collect Source Site")
return errors.New(" Cannot Find Collect Source Site ")
} else if !s.State {
log.Println(" The acquisition site was disabled ")
return errors.New(" The acquisition site was disabled ")
}
// 如果是主站点且状态为启用则先获取分类tree信息
if s.Grade == system.MasterCollect && s.State {
// 是否存在分类树信息, 不存在则获取
if !system.ExistsCategoryTree() {
CollectCategory(s)
}
}
// 生成 RequestInfo
r := util.RequestInfo{Uri: s.Uri, Params: url.Values{}}
// 通过 采集时长 h 的不同来执行不同前置出处理方式
switch {
case h < 0:
// 采集时长为负数则先执行对应数据表的重置
if s.Grade == system.MasterCollect {
system.FilmZero()
} else {
// 如果所处站点是次级站点, 则删除对应站点在表中的数据
system.ResetSlaveMovieInfoTable()
}
case h > 0:
// 如果采集时长是正常数值, 则设置参数 h
r.Params.Set("h", fmt.Sprint(h))
default:
log.Println("Params Collect time Exception !!!")
return errors.New(" Params Collect time Exception !!! ")
}
// 通过采集类型分别执行不同的采集方法
switch s.CollectType {
case system.CollectVideo:
// 采集视频资源 根据采集站类型进行不同逻辑
switch s.Grade {
case system.MasterCollect:
// 获取展示的分类切片信息
cl := system.GetRevealCategoryList()
for _, c := range cl {
// 获取分类采集页数
r.Params.Set("t", fmt.Sprint(c.Id))
pageCount, err := spiderCore.GetPageCount(r)
if err != nil {
// 如果第二次获取分页页数依旧获取失败则关闭当前采集任务
pageCount, err = spiderCore.GetPageCount(r)
if err != nil {
return err
}
}
// 如果采集源参数中采集间隔参数大于500ms,则使用单线程采集
if s.Interval > 500 {
// 少量数据不开启协程
for i := 1; i <= pageCount; i++ {
// 设置采集参数pg
r.Params.Set("pg", fmt.Sprint(i))
collectFilmRefine(s, r)
// 执行一次采集后休眠指定时长
time.Sleep(time.Duration(s.Interval) * time.Millisecond)
}
} else if pageCount <= config.MAXGoroutine*5 {
// 少量数据不开启协程
for i := 1; i <= pageCount; i++ {
r.Params.Set("pg", fmt.Sprint(i))
collectFilmRefine(s, r)
}
} else {
// 如果分页数量较大则开启协程
collectFilmMT(pageCount, s, r, collectFilmRefine)
}
}
case system.SlaveCollect:
pageCount, err := spiderCore.GetPageCount(r)
if err != nil {
// 如果第二次获取分页页数依旧获取失败则关闭当前采集任务
pageCount, err = spiderCore.GetPageCount(r)
if err != nil {
return err
}
}
// 如果采集源参数中采集间隔参数大于500ms,则使用单线程采集
if s.Interval > 500 {
// 少量数据不开启协程
for i := 1; i <= pageCount; i++ {
// 设置采集参数pg
r.Params.Set("pg", fmt.Sprint(i))
collectFilmRefine(s, r)
// 执行一次采集后休眠指定时长
time.Sleep(time.Duration(s.Interval) * time.Millisecond)
}
} else if pageCount <= config.MAXGoroutine*5 {
// 少量数据不开启协程
for i := 1; i <= pageCount; i++ {
r.Params.Set("pg", fmt.Sprint(i))
collectFilmRefine(s, r)
}
} else {
// 如果分页数量较大则开启协程
collectFilmMT(pageCount, s, r, collectFilmRefine)
}
}
// 视频数据采集完成后 对暂存数据进行处理和优化
if s.Grade == system.MasterCollect {
// 如果采集时长为负, (全量采集), 则在数据采集完成后为search表添加索引
if h < 0 {
// 全量采集时进行数据同步以及添加索引(仅保存)
system.SyncMovieDetail(s.Id, s.Grade)
system.AddSearchIndex()
system.AddMovieDetailIndex()
}
// 采集时长在一定阈值内时执行redis数据同步 (存在则更新, 不存在则新增)
// 开启图片同步
if s.SyncPictures {
system.SyncFilmPicture()
}
// 每次成功执行完都清理redis中的相关API接口数据缓存
ClearCache()
} else if s.Grade == system.SlaveCollect {
// 如果采集时长为负, (全量采集), 则在数据采集完成后为search表添加索引
if h < 0 {
// 全量采集时进行数据同步
system.SyncMovieDetail(s.Id, s.Grade)
system.AddSlaveMovieInfoIndex()
}
}
case system.CollectArticle, system.CollectActor, system.CollectRole, system.CollectWebSite:
@@ -149,10 +305,23 @@ func collectFilm(s *system.FilmSource, h, pg int) {
// 通过采集站 Grade 类型, 执行不同的存储逻辑
switch s.Grade {
case system.MasterCollect:
// 主站点 保存完整影片详情信息到 redis
if err = system.SaveDetails(list); err != nil {
// 将数据缓存到redis
if err = system.MovieDetailCache(list); err != nil {
log.Println("SaveDetails Error: ", err)
}
//break
// 如果 采集时长 h 小于阈值, 则将主体数据缓存到redis
//if h > 0 && h < config.FilmSaveCacheThreshold {
// // 主站点 执行保存或更新
// if err = system.BatchUpdateDetails(list); err != nil {
// log.Println("SaveDetails Error: ", err)
// }
//} else {
// // 主站点 从零开始只执行保存逻辑
// if err = system.MovieDetailCache(list); err != nil {
// log.Println("SaveDetails Error: ", err)
// }
//}
// 如果主站点开启了图片同步, 则将图片url以及对应的mid存入ZSet集合中
if s.SyncPictures {
if err = system.SaveVirtualPic(conver.ConvertVirtualPicture(list)); err != nil {
@@ -160,10 +329,22 @@ func collectFilm(s *system.FilmSource, h, pg int) {
}
}
case system.SlaveCollect:
// 附属站点 仅保存影片播放信息到redis
if err = system.SaveSitePlayList(s.Id, list); err != nil {
// 将采集数据缓存到redis
if err = system.SlaveDetailCache(s.Id, list); err != nil {
log.Println("SaveDetails Error: ", err)
}
//if h > 0 && h < config.FilmSaveCacheThreshold {
// // 附属站点 仅保存影片播放信息到mysql
// if err = system.UpdateSitePlayList(s.Id, list); err != nil {
// log.Println("SaveDetails Error: ", err)
// }
//} else {
// // 附属站点 仅保存影片播放信息到mysql
// if err = system.SlaveDetailCache(s.Id, list); err != nil {
// log.Println("SaveDetails Error: ", err)
// }
//}
}
}
@@ -196,7 +377,42 @@ func collectFilmById(ids string, s *system.FilmSource) {
}
case system.SlaveCollect:
// 附属站点 仅保存影片播放信息到redis
if err = system.SaveSitePlayList(s.Id, list); err != nil {
if err = system.UpdateSitePlayList(s.Id, list); err != nil {
log.Println("SaveDetails Error: ", err)
}
}
}
// 影片信息采集, 改进版
func collectFilmRefine(s *system.FilmSource, r util.RequestInfo) {
// 执行采集方法 获取影片详情list
//log.Printf("%s?%s", r.Uri, r.Params.Encode())
list, err := spiderCore.GetFilmDetail(r)
if err != nil || len(list) <= 0 {
// 添加采集失败记录
pg, _ := strconv.Atoi(r.Params.Get("pg"))
h, _ := strconv.Atoi(r.Params.Get("h"))
fr := system.FailureRecord{OriginId: s.Id, OriginName: s.Name, Uri: s.Uri, CollectType: system.CollectVideo, PageNumber: pg, Hour: h, Cause: fmt.Sprintln(err), Status: 1}
system.SaveFailureRecord(fr)
log.Println("GetMovieDetail Error: ", err)
return
}
// 通过采集站 Grade 类型, 执行不同的存储逻辑
switch s.Grade {
case system.MasterCollect:
// 将数据缓存到redis中
if err = system.MovieDetailCache(list); err != nil {
log.Println("SaveDetails Error: ", err)
}
// 如果主站点开启了图片同步, 则将图片url以及对应的mid存入ZSet集合中
if s.SyncPictures {
if err = system.SaveVirtualPic(conver.ConvertVirtualPicture(list)); err != nil {
log.Println("SaveVirtualPic Error: ", err)
}
}
case system.SlaveCollect:
// 将采集数据缓存到redis中
if err = system.SlaveDetailCache(s.Id, list); err != nil {
log.Println("SaveDetails Error: ", err)
}
}
@@ -235,6 +451,46 @@ func ConcurrentPageSpider(capacity int, s *system.FilmSource, h int, collectFunc
}
}
// collectFilmMT 并发采集影片信息
func collectFilmMT(capacity int, s *system.FilmSource, r util.RequestInfo, collectFunc func(s *system.FilmSource, r util.RequestInfo)) {
// 初始化 channel, 容量为 capacity
ch := make(chan int, capacity)
// 收集结束标识
waitCh := make(chan int)
// 循环将所有需采集的页码写入 ch
for i := 1; i <= capacity; i++ {
ch <- i
}
close(ch)
// 开启 MAXGoroutine 数量的协程, 如果分页页数小于设定的最大线程数, 则将线程数设置为1
var GoroutineNum = config.MAXGoroutine
if capacity < GoroutineNum*5 {
GoroutineNum = 1
}
// 如果满足开启并发的条件, 则开启GoroutineNum数量的协程进行并发采集
for i := 0; i < GoroutineNum; i++ {
go func() {
defer func() { waitCh <- 0 }()
for {
// 从channel中获取 pageNumber
pg, ok := <-ch
if !ok {
break
}
// 执行对应的采集方法, 并发时不同使用同一个requestInfo
requestInfo := util.CopyRequestInfo(r)
requestInfo.Params.Set("pg", fmt.Sprint(pg))
collectFunc(s, requestInfo)
}
}()
}
// 等待所有协程执行完毕
for i := 0; i < GoroutineNum; i++ {
<-waitCh
}
}
// BatchCollect 批量采集, 采集指定的所有站点最近x小时内更新的数据
func BatchCollect(h int, ids ...string) {
for _, id := range ids {
@@ -242,7 +498,7 @@ func BatchCollect(h int, ids ...string) {
if fs := system.FindCollectSourceById(id); fs != nil && fs.State {
// 采用协程并发执行, 每个站点单独开启一个协程执行
go func() {
err := HandleCollect(fs.Id, h)
err := HandleCollectRefine(fs.Id, h)
if err != nil {
log.Println(err)
}
@@ -261,7 +517,7 @@ func AutoCollect(h int) {
for _, s := range system.GetCollectSourceList() {
// 如果当前站点为启用状态 则执行 HandleCollect 进行数据采集
if s.State {
if err := HandleCollect(s.Id, h); err != nil {
if err := HandleCollectRefine(s.Id, h); err != nil {
log.Println(err)
}
}