ABS v1 test

This commit is contained in:
mubai
2023-12-24 23:22:20 +08:00
parent b48e53a637
commit 86e0501d2f
59 changed files with 538 additions and 1699 deletions

View File

@@ -1,14 +1,18 @@
package spider
import (
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"log"
"net/url"
"server/config"
"server/model/collect"
"server/model/system"
"server/plugin/common/conver"
"server/plugin/common/util"
"time"
)
/*
@@ -18,7 +22,7 @@ import (
var spiderCore = &JsonCollect{}
// =========================通用采集方法==============================
// ======================================================= 通用采集方法 =======================================================
// HandleCollect 影视采集 id-采集站ID h-时长/h
func HandleCollect(id string, h int) error {
@@ -64,7 +68,15 @@ func HandleCollect(id string, h int) error {
switch s.CollectType {
case system.CollectVideo:
// 采集视频资源
if pageCount <= config.MAXGoroutine*2 {
// 如果采集源参数中采集间隔参数大于500ms,则使用单线程采集
if s.Interval > 500 {
// 少量数据不开启协程
for i := 1; i <= pageCount; i++ {
collectFilm(s, h, i)
// 执行一次采集后休眠指定时长
time.Sleep(time.Duration(s.Interval) * time.Millisecond)
}
} else if pageCount <= config.MAXGoroutine*2 {
// 少量数据不开启协程
for i := 1; i <= pageCount; i++ {
collectFilm(s, h, i)
@@ -75,8 +87,6 @@ func HandleCollect(id string, h int) error {
}
// 视频数据采集完成后同步相关信息到mysql
if s.Grade == system.MasterCollect {
// 每次成功执行完都清理redis中的相关API接口数据缓存
clearCache()
// 执行影片信息更新操作
if h > 0 {
// 执行数据更新操作
@@ -89,6 +99,8 @@ func HandleCollect(id string, h int) error {
if s.SyncPictures {
system.SyncFilmPicture()
}
// 每次成功执行完都清理redis中的相关API接口数据缓存
clearCache()
}
case system.CollectArticle, system.CollectActor, system.CollectRole, system.CollectWebSite:
@@ -145,7 +157,7 @@ func collectFilm(s *system.FilmSource, h, pg int) {
}
case system.SlaveCollect:
// 附属站点 仅保存影片播放信息到redis
if err = system.SaveSitePlayList(s.Name, list); err != nil {
if err = system.SaveSitePlayList(s.Id, list); err != nil {
log.Println("SaveDetails Error: ", err)
}
}
@@ -217,3 +229,34 @@ func StarZero(h int) {
// 开启自动采集
AutoCollect(h)
}
// ======================================================= 公共方法 =======================================================
// CollectApiTest 测试采集接口是否可用
func CollectApiTest(s system.FilmSource) error {
// 使用当前采集站接口采集一页数据
r := util.RequestInfo{Uri: s.Uri, Params: url.Values{}}
r.Params.Set("ac", s.CollectType.GetActionType())
r.Params.Set("pg", "3")
err := util.ApiTest(&r)
// 首先核对接口返回值类型
if err == nil {
// 如果返回值类型为Json则执行Json序列化
if s.ResultModel == system.JsonResult {
var dp = collect.FilmDetailLPage{}
if err = json.Unmarshal(r.Resp, &dp); err != nil {
return errors.New(fmt.Sprint("测试失败, 返回数据异常, JSON序列化失败: ", err))
}
return nil
} else if s.ResultModel == system.XmlResult {
// 如果返回值类型为XML则执行XML序列化
var rd = collect.RssD{}
if err = xml.Unmarshal(r.Resp, &rd); err != nil {
return errors.New(fmt.Sprint("测试失败, 返回数据异常, XML序列化失败", err))
}
return nil
}
return errors.New("测试失败, 接口返回值类型不符合规范")
}
return errors.New(fmt.Sprint("测试失败, 请求响应异常 : ", err.Error()))
}

View File

@@ -3,7 +3,6 @@ package spider
import (
"encoding/json"
"errors"
"fmt"
"log"
"server/model/collect"
"server/model/system"
@@ -20,8 +19,8 @@ type FilmCollect interface {
GetCategoryTree(r util.RequestInfo) (*system.CategoryTree, error)
// GetPageCount 获取API接口的分页页数
GetPageCount(r util.RequestInfo) (count int, err error)
// GetDetail 获取指定pageNumber的具体数据
GetDetail(pageNumber int, r util.RequestInfo) (list []system.MovieDetail, err error)
// GetFilmDetail 获取影片详情信息,返回影片详情列表
GetFilmDetail(r util.RequestInfo) (list []system.MovieDetail, err error)
}
// ------------------------------------------------- JSON Collect -------------------------------------------------
@@ -55,7 +54,7 @@ func (jc *JsonCollect) GetCategoryTree(r util.RequestInfo) (*system.CategoryTree
return tree, err
}
// GetPageCount 获取总页数
// GetPageCount 获取分页总页数
func (jc *JsonCollect) GetPageCount(r util.RequestInfo) (count int, err error) {
// 发送请求获取pageCount, 默认为获取 ac = detail
if len(r.Params.Get("ac")) <= 0 {
@@ -78,43 +77,6 @@ func (jc *JsonCollect) GetPageCount(r util.RequestInfo) (count int, err error) {
return
}
// GetDetail 处理详情接口请求返回的数据
func (jc *JsonCollect) GetDetail(pageNumber int, r util.RequestInfo) (list []system.MovieDetail, err error) {
// 防止json解析异常引发panic
defer func() {
if e := recover(); e != nil {
log.Println("GetMovieDetail Failed : ", e)
}
}()
// 设置分页请求参数
r.Params.Set(`ac`, `detail`)
r.Params.Set(`pg`, fmt.Sprint(pageNumber))
util.ApiGet(&r)
// 影视详情信息
detailPage := collect.FilmDetailLPage{}
//details := system.DetailListInfo{}
// 如果返回数据为空则直接结束本次循环
if len(r.Resp) <= 0 {
err = errors.New("response is empty")
return
}
// 序列化详情数据
if err = json.Unmarshal(r.Resp, &detailPage); err != nil {
return
}
// 将影视原始详情信息保存到redis中
// 获取主站点uri
mc := system.GetCollectSourceListByGrade(system.MasterCollect)[0]
if mc.Uri == r.Uri {
collect.BatchSaveOriginalDetail(detailPage.List)
}
// 处理details信息
list = conver.ConvertFilmDetails(detailPage.List)
return
}
// GetFilmDetail 通过 RequestInfo 获取并解析出对应的 MovieDetail list
func (jc *JsonCollect) GetFilmDetail(r util.RequestInfo) (list []system.MovieDetail, err error) {
// 防止json解析异常引发panic

View File

@@ -14,42 +14,7 @@ var (
CronCollect *cron.Cron = CreateCron()
)
// RegularUpdateMovie 定时更新, 每半小时获取一次站点的最近x小时数据
func RegularUpdateMovie() {
//创建一个定时任务对象
c := cron.New(cron.WithSeconds())
// 添加定时任务每x 分钟更新一次最近x小时的影片数据
taskId, err := c.AddFunc(config.CornMovieUpdate, func() {
// 执行更新最近x小时影片的Spider
log.Println("执行一次影片更新任务...")
UpdateMovieDetail()
// 执行更新任务后清理redis中的相关API接口数据缓存
clearCache()
})
// 开启定时任务每月最后一天凌晨两点, 执行一次清库重取数据
taskId2, err := c.AddFunc(config.CornUpdateAll, func() {
StartSpiderRe()
})
if err != nil {
log.Println("Corn Start Error: ", err)
}
log.Println(taskId, "------", taskId2)
log.Printf("%v", c.Entries())
//c.Start()
}
// StartCrontab 启动定时任务
func StartCrontab() {
// 从redis中读取待启动的定时任务列表
// 影片更新定时任务列表
CronCollect.Start()
}
// CreateCron 创建定时任务
func CreateCron() *cron.Cron {
return cron.New(cron.WithSeconds())
}

View File

@@ -1,275 +0,0 @@
package spider
import (
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"log"
"net/url"
"server/config"
"server/model/collect"
"server/model/system"
"server/plugin/common/util"
"time"
)
/*
舍弃第一版的数据处理思路, v2版本
直接分页获取采集站点的影片详情信息
*/
/*
1. 选择一个采集主站点, mysql检索表中只存储主站点检索的信息
2. 采集多个站点数据
2.1 主站点的采集数据完整地保存相关信息, basicInfo movieDetail search 等信息
2.2 其余站点数据只存储 name(影片名称), playUrl(播放url), 存储形式 Key<hash(name)>:value([]MovieUrlInfo)
3. api数据格式不变, 获取影片详情时通过subTitle 去redis匹配其他站点的对应播放源并整合到主站详情信息的playUrl中
4. 影片搜索时不再使用name进行匹配, 改为使用 subTitle 进行匹配
*/
// StartSpider 执行多源spider
func StartSpider() {
// 保存分类树
CategoryList()
log.Println("CategoryList 影片分类信息保存完毕")
// 爬取主站点数据
MainSiteSpider()
log.Println("MainSiteSpider 主站点影片信息保存完毕")
// 查找并创建search数据库, 保存search信息, 添加索引
time.Sleep(time.Second * 10)
system.SyncSearchInfo(0)
system.AddSearchIndex()
log.Println("SearchInfoToMdb 影片检索信息保存完毕")
//获取其他站点数据
scl := system.GetCollectSourceListByGrade(system.SlaveCollect)
go MtSiteSpider(scl...)
log.Println("Spider End , 数据保存执行完成")
time.Sleep(time.Second * 10)
}
// CategoryList 获取分类数据
func CategoryList() {
// 获取主站点uri
mc := system.GetCollectSourceListByGrade(system.MasterCollect)[0]
// 获取分类树形数据
categoryTree, err := spiderCore.GetCategoryTree(util.RequestInfo{Uri: mc.Uri, Params: url.Values{}})
if err != nil {
log.Println("GetCategoryTree Error: ", err)
return
}
// 保存 tree 到redis
err = system.SaveCategoryTree(categoryTree)
if err != nil {
log.Println("SaveCategoryTree Error: ", err)
}
}
// MainSiteSpider 主站点数据处理
func MainSiteSpider() {
// 获取主站点uri
mc := system.GetCollectSourceListByGrade(system.MasterCollect)[0]
// 获取分页页数
pageCount, err := spiderCore.GetPageCount(util.RequestInfo{Uri: mc.Uri, Params: url.Values{}})
// 主站点分页出错直接终止程序
if err != nil {
panic(err)
}
// 开启协程加快分页请求速度
ch := make(chan int, pageCount)
waitCh := make(chan int)
for i := 1; i <= pageCount; i++ {
ch <- i
}
close(ch)
for i := 0; i < config.MAXGoroutine; i++ {
go func() {
defer func() { waitCh <- 0 }()
for {
pg, ok := <-ch
if !ok {
break
}
list, e := spiderCore.GetDetail(pg, util.RequestInfo{Uri: mc.Uri, Params: url.Values{}})
if e != nil {
log.Println("GetMovieDetail Error: ", err)
continue
}
// 保存影片详情信息到redis
if err = system.SaveDetails(list); err != nil {
log.Println("SaveDetails Error: ", err)
}
}
}()
}
for i := 0; i < config.MAXGoroutine; i++ {
<-waitCh
}
}
// MtSiteSpider 附属站点数据源处理
func MtSiteSpider(scl ...system.FilmSource) {
for _, s := range scl {
// 执行每个站点的播放url缓存
PlayDetailSpider(s)
log.Println(s.Name, "playUrl 爬取完毕!!!")
}
}
// PlayDetailSpider SpiderSimpleInfo 获取单个站点的播放源
func PlayDetailSpider(s system.FilmSource) {
// 获取分页页数
pageCount, err := spiderCore.GetPageCount(util.RequestInfo{Uri: s.Uri, Params: url.Values{}})
// 出错直接终止当前站点数据获取
if err != nil {
log.Println(err)
return
}
// 开启协程加快分页请求速度
ch := make(chan int, pageCount)
waitCh := make(chan int)
for i := 1; i <= pageCount; i++ {
ch <- i
}
close(ch)
for i := 0; i < config.MAXGoroutine; i++ {
go func() {
defer func() { waitCh <- 0 }()
for {
pg, ok := <-ch
if !ok {
break
}
list, e := spiderCore.GetDetail(pg, util.RequestInfo{Uri: s.Uri, Params: url.Values{}})
if e != nil || len(list) <= 0 {
log.Println("GetMovieDetail Error: ", err)
continue
}
// 保存影片播放信息到redis
if err = system.SaveSitePlayList(s.Name, list); err != nil {
log.Println("SaveDetails Error: ", err)
}
}
}()
}
for i := 0; i < config.MAXGoroutine; i++ {
<-waitCh
}
}
// UpdateMovieDetail 定时更新主站点和其余播放源信息
func UpdateMovieDetail() {
// 更新主站系列信息
UpdateMainDetail()
// 更新附属播放源数据信息
scl := system.GetCollectSourceListByGrade(system.SlaveCollect)
UpdatePlayDetail(scl...)
}
// UpdateMainDetail 更新主站点的最新影片
func UpdateMainDetail() {
// 获取主站点uri
l := system.GetCollectSourceListByGrade(system.MasterCollect)
mc := system.FilmSource{}
for _, v := range l {
if len(v.Uri) > 0 {
mc = v
break
}
}
// 获取分页页数
r := util.RequestInfo{Uri: mc.Uri, Params: url.Values{}}
r.Params.Set("h", config.UpdateInterval)
pageCount, err := spiderCore.GetPageCount(r)
if err != nil {
log.Printf("Update MianStieDetail failed\n")
}
// 保存本次更新的所有详情信息
var ds []system.MovieDetail
// 获取分页数据
for i := 1; i <= pageCount; i++ {
list, err := spiderCore.GetDetail(i, r)
if err != nil {
continue
}
// 保存更新的影片信息, 同类型直接覆盖
if err = system.SaveDetails(list); err != nil {
log.Println("Update MainSiteDetail failed, SaveDetails Error ")
}
ds = append(ds, list...)
}
// 整合详情信息切片
var sl []system.SearchInfo
for _, d := range ds {
// 通过id 获取对应的详情信息
sl = append(sl, system.ConvertSearchInfo(d))
}
// 调用批量保存或更新方法, 如果对应mid数据存在则更新, 否则执行插入
system.BatchSaveOrUpdate(sl)
}
// UpdatePlayDetail 更新最x小时的影片播放源数据
func UpdatePlayDetail(scl ...system.FilmSource) {
for _, s := range scl {
// 获取单个站点的分页数
r := util.RequestInfo{Uri: s.Uri, Params: url.Values{}}
r.Params.Set("h", config.UpdateInterval)
pageCount, err := spiderCore.GetPageCount(r)
if err != nil {
log.Printf("Update %s playDetail failed\n", s.Name)
}
for i := 1; i <= pageCount; i++ {
// 获取详情信息, 保存到对应hashKey中
list, e := spiderCore.GetDetail(i, r)
if e != nil || len(list) <= 0 {
log.Println("GetMovieDetail Error: ", err)
continue
}
// 保存影片播放信息到redis
if err = system.SaveSitePlayList(s.Name, list); err != nil {
log.Println("SaveDetails Error: ", err)
}
}
}
}
// StartSpiderRe 清空存储数据,从零开始获取
func StartSpiderRe() {
// 删除已有的存储数据, redis 和 mysql中的存储数据全部清空
system.FilmZero()
// 执行完整数据获取
StartSpider()
}
// =========================公共方法==============================
// CollectApiTest 测试采集接口是否可用
func CollectApiTest(s system.FilmSource) error {
// 使用当前采集站接口采集一页数据
r := util.RequestInfo{Uri: s.Uri, Params: url.Values{}}
r.Params.Set("ac", s.CollectType.GetActionType())
r.Params.Set("pg", "3")
err := util.ApiTest(&r)
// 首先核对接口返回值类型
if err == nil {
// 如果返回值类型为Json则执行Json序列化
if s.ResultModel == system.JsonResult {
var dp = collect.FilmDetailLPage{}
if err = json.Unmarshal(r.Resp, &dp); err != nil {
return errors.New(fmt.Sprint("测试失败, 返回数据异常, JSON序列化失败: ", err))
}
return nil
} else if s.ResultModel == system.XmlResult {
// 如果返回值类型为XML则执行XML序列化
var rd = collect.RssD{}
if err = xml.Unmarshal(r.Resp, &rd); err != nil {
return errors.New(fmt.Sprint("测试失败, 返回数据异常, XML序列化失败", err))
}
return nil
}
return errors.New("测试失败, 接口返回值类型不符合规范")
}
return errors.New(fmt.Sprint("测试失败, 请求响应异常 : ", err.Error()))
}