This commit is contained in:
mubai
2023-12-23 22:32:52 +08:00
parent d85dbe915c
commit b48e53a637
151 changed files with 12451 additions and 1382 deletions

View File

@@ -1,138 +1,181 @@
package spider
import (
"encoding/json"
"errors"
"fmt"
"log"
"net/url"
"server/config"
"server/model"
"server/plugin/common/dp"
"server/model/system"
"server/plugin/common/conver"
"server/plugin/common/util"
"time"
)
/*
舍弃第一版的数据处理思路, v2版本
直接分页获取采集站点的影片详情信息
采集逻辑 v3
*/
/*
1. 选择一个采集主站点, mysql检索表中只存储主站点检索的信息
2. 采集多个站点数据
2.1 主站点的采集数据完整地保存相关信息, basicInfo movieDetail search 等信息
2.2 其余站点数据只存储 name(影片名称), playUrl(播放url), 存储形式 Key<hash(name)>:value([]MovieUrlInfo)
3. api数据格式不变, 获取影片详情时通过subTitle 去redis匹配其他站点的对应播放源并整合到主站详情信息的playUrl中
4. 影片搜索时不再使用name进行匹配, 改为使用 subTitle 进行匹配
*/
var spiderCore = &JsonCollect{}
const (
MainSite = "https://cj.lzcaiji.com/api.php/provide/vod/"
)
// =========================通用采集方法==============================
type Site struct {
Name string
Uri string
}
// SiteList 播放源采集站
var SiteList = []Site{
// 备用采集站
//{"lz_bk", "https://cj.lzcaiji.com/api.php/provide/vod/"},
//{"fs", "https://www.feisuzyapi.com/api.php/provide/vod/"},
//{"su", "https://subocaiji.com/api.php/provide/vod/at/json"},
//{"bf", "https://bfzyapi.com/api.php/provide/vod/"},
//{"ff", "https://svip.ffzyapi8.com/api.php/provide/vod/"},
//{"lz", "https://cj.lziapi.com/api.php/provide/vod/"},
{"kk", "https://kuaikan-api.com/api.php/provide/vod/from/kuaikan"},
{"bf", "http://by.bfzyapi.com/api.php/provide/vod/"},
{"ff", "https://cj.ffzyapi.com/api.php/provide/vod/"},
}
// StartSpider 执行多源spider
func StartSpider() {
// 保存分类树
CategoryList()
log.Println("CategoryList 影片分类信息保存完毕")
// 爬取主站点数据
MainSiteSpider()
log.Println("MainSiteSpider 主站点影片信息保存完毕")
// 查找并创建search数据库, 保存search信息, 添加索引
time.Sleep(time.Second * 10)
model.CreateSearchTable()
SearchInfoToMdb()
model.AddSearchIndex()
log.Println("SearchInfoToMdb 影片检索信息保存完毕")
//获取其他站点数据
go MtSiteSpider()
log.Println("Spider End , 数据保存执行完成")
time.Sleep(time.Second * 10)
}
// CategoryList 获取分类数据
func CategoryList() {
// 设置请求参数信息
r := util.RequestInfo{Uri: MainSite, Params: url.Values{}}
r.Params.Set(`ac`, "list")
r.Params.Set(`pg`, "1")
r.Params.Set(`t`, "1")
// 执行请求, 获取一次list数据
util.ApiGet(&r)
// 解析resp数据
movieListInfo := model.MovieListInfo{}
if len(r.Resp) <= 0 {
log.Println("MovieListInfo数据获取异常 : Resp Is Empty")
// HandleCollect 影视采集 id-采集站ID h-时长/h
func HandleCollect(id string, h int) error {
// 1. 首先通过ID获取对应采集站信息
s := system.FindCollectSourceById(id)
if s == nil {
log.Println("Cannot Find Collect Source Site")
return errors.New(" Cannot Find Collect Source Site ")
} else if !s.State {
log.Println(" The acquisition site was disabled ")
return errors.New(" The acquisition site was disabled ")
}
// 如果是主站点且状态为启用则先获取分类tree信息
if s.Grade == system.MasterCollect && s.State {
// 是否存在分类树信息, 不存在则获取
if !system.ExistsCategoryTree() {
CollectCategory(s)
}
}
// 生成 RequestInfo
r := util.RequestInfo{Uri: s.Uri, Params: url.Values{}}
// 如果 h == 0 则直接返回错误信息
if h == 0 {
log.Println(" Collect time cannot be zero ")
return errors.New(" Collect time cannot be zer ")
}
// 如果 h = -1 则进行全量采集
if h > 0 {
r.Params.Set("h", fmt.Sprint(h))
}
// 2. 首先获取分页采集的页数
pageCount, err := spiderCore.GetPageCount(r)
// 分页页数失败 则再进行一次尝试
if err != nil {
// 如果第二次获取分页页数依旧获取失败则关闭当前采集任务
pageCount, err = spiderCore.GetPageCount(r)
if err != nil {
return err
}
}
// 通过采集类型分别执行不同的采集方法
switch s.CollectType {
case system.CollectVideo:
// 采集视频资源
if pageCount <= config.MAXGoroutine*2 {
// 少量数据不开启协程
for i := 1; i <= pageCount; i++ {
collectFilm(s, h, i)
}
} else {
// 如果分页数量较大则开启协程
ConcurrentPageSpider(pageCount, s, h, collectFilm)
}
// 视频数据采集完成后同步相关信息到mysql
if s.Grade == system.MasterCollect {
// 每次成功执行完都清理redis中的相关API接口数据缓存
clearCache()
// 执行影片信息更新操作
if h > 0 {
// 执行数据更新操作
system.SyncSearchInfo(1)
} else {
// 清空searchInfo中的数据并重新添加, 否则执行
system.SyncSearchInfo(0)
}
// 开启图片同步
if s.SyncPictures {
system.SyncFilmPicture()
}
}
case system.CollectArticle, system.CollectActor, system.CollectRole, system.CollectWebSite:
log.Println("暂未开放此采集功能!!!")
return errors.New("暂未开放此采集功能")
}
log.Println("Spider Task Exercise Success")
return nil
}
// CollectCategory 影视分类采集
func CollectCategory(s *system.FilmSource) {
// 获取分类树形数据
categoryTree, err := spiderCore.GetCategoryTree(util.RequestInfo{Uri: s.Uri, Params: url.Values{}})
if err != nil {
log.Println("GetCategoryTree Error: ", err)
return
}
_ = json.Unmarshal(r.Resp, &movieListInfo)
// 获取分类列表信息
classList := movieListInfo.Class
// 组装分类数据信息树形结构
categoryTree := dp.CategoryTree(classList)
// 序列化tree
data, _ := json.Marshal(categoryTree)
// 保存 tree 到redis
err := model.SaveCategoryTree(string(data))
err = system.SaveCategoryTree(categoryTree)
if err != nil {
log.Println("SaveCategoryTree Error: ", err)
}
}
// MainSiteSpider 主站点数据处理
func MainSiteSpider() {
// 获取分页页
pageCount, err := GetPageCount(util.RequestInfo{Uri: MainSite, Params: url.Values{}})
// 主站点分页出错直接终止程序
if err != nil {
panic(err)
// 影视详情采集
func collectFilm(s *system.FilmSource, h, pg int) {
// 生成请求参
r := util.RequestInfo{Uri: s.Uri, Params: url.Values{}}
// 设置分页页数
r.Params.Set("pg", fmt.Sprint(pg))
// 如果 h = -1 则进行全量采集
if h > 0 {
r.Params.Set("h", fmt.Sprint(h))
}
// 开启协程加快分页请求速度
ch := make(chan int, pageCount)
// 执行采集方法 获取影片详情list
list, err := spiderCore.GetFilmDetail(r)
if err != nil || len(list) <= 0 {
log.Println("GetMovieDetail Error: ", err)
return
}
// 通过采集站 Grade 类型, 执行不同的存储逻辑
switch s.Grade {
case system.MasterCollect:
// 主站点 保存完整影片详情信息到 redis
if err = system.SaveDetails(list); err != nil {
log.Println("SaveDetails Error: ", err)
}
// 如果主站点开启了图片同步, 则将图片url以及对应的mid存入ZSet集合中
if s.SyncPictures {
if err = system.SaveVirtualPic(conver.ConvertVirtualPicture(list)); err != nil {
log.Println("SaveVirtualPic Error: ", err)
}
}
case system.SlaveCollect:
// 附属站点 仅保存影片播放信息到redis
if err = system.SaveSitePlayList(s.Name, list); err != nil {
log.Println("SaveDetails Error: ", err)
}
}
}
// ConcurrentPageSpider 并发分页采集, 不限类型
func ConcurrentPageSpider(capacity int, s *system.FilmSource, h int, collectFunc func(s *system.FilmSource, hour, pageNumber int)) {
// 开启协程并发执行
ch := make(chan int, capacity)
waitCh := make(chan int)
for i := 1; i <= pageCount; i++ {
for i := 1; i <= capacity; i++ {
ch <- i
}
close(ch)
for i := 0; i < config.MAXGoroutine; i++ {
// 开启 MAXGoroutine 数量的协程, 如果分页页数小于协程数则将协程数限制为分页页数
var GoroutineNum = config.MAXGoroutine
if capacity < GoroutineNum {
GoroutineNum = capacity
}
for i := 0; i < GoroutineNum; i++ {
go func() {
defer func() { waitCh <- 0 }()
for {
// 从channel中获取 pageNumber
pg, ok := <-ch
if !ok {
break
}
list, e := GetMovieDetail(pg, util.RequestInfo{Uri: MainSite, Params: url.Values{}})
if e != nil {
log.Println("GetMovieDetail Error: ", err)
continue
}
// 保存影片详情信息到redis
if err = model.SaveDetails(list); err != nil {
log.Println("SaveDetails Error: ", err)
}
// 执行对应的采集方法
collectFunc(s, h, pg)
}
}()
}
@@ -141,198 +184,36 @@ func MainSiteSpider() {
}
}
// MtSiteSpider 附属数据源处理
func MtSiteSpider() {
for _, s := range SiteList {
// 执行每个站点的播放url缓存
PlayDetailSpider(s)
log.Println(s.Name, "playUrl 爬取完毕!!!")
}
}
// PlayDetailSpider SpiderSimpleInfo 获取单个站点的播放源
func PlayDetailSpider(s Site) {
// 获取分页页数
pageCount, err := GetPageCount(util.RequestInfo{Uri: s.Uri, Params: url.Values{}})
// 出错直接终止当前站点数据获取
if err != nil {
log.Println(err)
return
}
// 开启协程加快分页请求速度
ch := make(chan int, pageCount)
waitCh := make(chan int)
for i := 1; i <= pageCount; i++ {
ch <- i
}
close(ch)
for i := 0; i < config.MAXGoroutine; i++ {
go func() {
defer func() { waitCh <- 0 }()
for {
pg, ok := <-ch
if !ok {
break
}
list, e := GetMovieDetail(pg, util.RequestInfo{Uri: s.Uri, Params: url.Values{}})
if e != nil || len(list) <= 0 {
log.Println("GetMovieDetail Error: ", err)
continue
}
// 保存影片播放信息到redis
if err = model.SaveSitePlayList(s.Name, list); err != nil {
log.Println("SaveDetails Error: ", err)
}
}
}()
}
for i := 0; i < config.MAXGoroutine; i++ {
<-waitCh
}
}
// SearchInfoToMdb 扫描redis中的检索信息, 并批量存入mysql
func SearchInfoToMdb() {
// 1. 从redis的Zset集合中scan扫描数据, 每次100条
var cursor uint64 = 0
var count int64 = 100
for {
infoList, nextStar := model.ScanSearchInfo(cursor, count)
// 2. 将扫描到的数据插入mysql中
model.BatchSave(infoList)
// 3.设置下次开始的游标
cursor = nextStar
// 4. 判断迭代是否已经结束 cursor为0则表示已经迭代完毕
if cursor == 0 {
return
}
}
}
// UpdateMovieDetail 定时更新主站点和其余播放源信息
func UpdateMovieDetail() {
// 更新主站系列信息
UpdateMainDetail()
// 更新播放源数据信息
UpdatePlayDetail()
}
// UpdateMainDetail 更新主站点的最新影片
func UpdateMainDetail() {
// 获取分页页数
r := util.RequestInfo{Uri: MainSite, Params: url.Values{}}
r.Params.Set("h", config.UpdateInterval)
pageCount, err := GetPageCount(r)
if err != nil {
log.Printf("Update MianStieDetail failed")
}
// 保存本次更新的所有详情信息
var ds []model.MovieDetail
// 获取分页数据
for i := 1; i <= pageCount; i++ {
list, err := GetMovieDetail(i, r)
if err != nil {
continue
}
// 保存更新的影片信息, 同类型直接覆盖
if err = model.SaveDetails(list); err != nil {
log.Printf("Update MianStieDetail failed, SaveDetails Error ")
}
ds = append(ds, list...)
}
// 整合详情信息切片
var sl []model.SearchInfo
for _, d := range ds {
// 通过id 获取对应的详情信息
sl = append(sl, model.ConvertSearchInfo(d))
}
// 调用批量保存或更新方法, 如果对应mid数据存在则更新, 否则执行插入
model.BatchSaveOrUpdate(sl)
}
// UpdatePlayDetail 更新最x小时的影片播放源数据
func UpdatePlayDetail() {
for _, s := range SiteList {
// 获取单个站点的分页数
r := util.RequestInfo{Uri: s.Uri, Params: url.Values{}}
r.Params.Set("h", config.UpdateInterval)
pageCount, err := GetPageCount(r)
if err != nil {
log.Printf("Update %s playDetail failed", s.Name)
}
for i := 1; i <= pageCount; i++ {
// 获取详情信息, 保存到对应hashKey中
list, e := GetMovieDetail(i, r)
if e != nil || len(list) <= 0 {
log.Println("GetMovieDetail Error: ", err)
continue
}
// 保存影片播放信息到redis
if err = model.SaveSitePlayList(s.Name, list); err != nil {
log.Println("SaveDetails Error: ", err)
// BatchCollect 批量采集, 采集指定的所有站点最近x小时内更新的数据
func BatchCollect(h int, ids ...string) {
for _, id := range ids {
// 如果查询到对应Id的资源站信息, 且资源站处于启用状态
if fs := system.FindCollectSourceById(id); fs != nil && fs.State {
// 执行当前站点的采集任务
if err := HandleCollect(fs.Id, h); err != nil {
log.Println(err)
}
}
}
}
// StartSpiderRe 清空存储数据,从零开始获取
func StartSpiderRe() {
// 删除已有的存储数据, redis 和 mysql中的存储数据全部清空
model.RemoveAll()
// 执行完整数据获取
StartSpider()
}
// =========================公共方法==============================
// GetPageCount 获取总页数
func GetPageCount(r util.RequestInfo) (count int, err error) {
// 发送请求获取pageCount
r.Params.Set("ac", "detail")
r.Params.Set("pg", "2")
util.ApiGet(&r)
// 判断请求结果是否为空, 如果为空直接输出错误并终止
if len(r.Resp) <= 0 {
err = errors.New("response is empty")
return
}
// 获取pageCount
res := model.DetailListInfo{}
err = json.Unmarshal(r.Resp, &res)
if err != nil {
return
}
count = int(res.PageCount)
return
}
// GetMovieDetail 处理详情接口请求返回的数据
func GetMovieDetail(pageNumber int, r util.RequestInfo) (list []model.MovieDetail, err error) {
// 防止json解析异常引发panic
defer func() {
if e := recover(); e != nil {
log.Println("GetMovieDetail Failed : ", e)
// AutoCollect 自动进行对所有已启用站点的采集任务
func AutoCollect(h int) {
// 获取采集站中所有站点, 进行遍历
for _, s := range system.GetCollectSourceList() {
// 如果当前站点为启用状态 则执行 HandleCollect 进行数据采集
if s.State {
if err := HandleCollect(s.Id, h); err != nil {
log.Println(err)
}
}
}()
// 设置分页请求参数
r.Params.Set(`ac`, `detail`)
r.Params.Set(`pg`, fmt.Sprint(pageNumber))
util.ApiGet(&r)
// 影视详情信息
details := model.DetailListInfo{}
// 如果返回数据为空则直接结束本次循环
if len(r.Resp) <= 0 {
err = errors.New("response is empty")
return
}
// 序列化详情数据
if err = json.Unmarshal(r.Resp, &details); err != nil {
return
}
// 处理details信息
list = dp.ProcessMovieDetailList(details.List)
return
}
// StarZero 情况站点内所有影片信息
func StarZero(h int) {
// 首先清除影视信息
system.FilmZero()
// 开启自动采集
AutoCollect(h)
}

View File

@@ -1 +1,158 @@
package spider
import (
"encoding/json"
"errors"
"fmt"
"log"
"server/model/collect"
"server/model/system"
"server/plugin/common/conver"
"server/plugin/common/util"
)
/*
Spider 数据 爬取 & 处理 & 转换
*/
type FilmCollect interface {
// GetCategoryTree 获取影视分类数据
GetCategoryTree(r util.RequestInfo) (*system.CategoryTree, error)
// GetPageCount 获取API接口的分页页数
GetPageCount(r util.RequestInfo) (count int, err error)
// GetDetail 获取指定pageNumber的具体数据
GetDetail(pageNumber int, r util.RequestInfo) (list []system.MovieDetail, err error)
}
// ------------------------------------------------- JSON Collect -------------------------------------------------
// JsonCollect 处理返回值为JSON格式的采集数据
type JsonCollect struct {
}
// GetCategoryTree 获取分类树形数据
func (jc *JsonCollect) GetCategoryTree(r util.RequestInfo) (*system.CategoryTree, error) {
// 设置请求参数信息
r.Params.Set(`ac`, "list")
r.Params.Set(`pg`, "1")
// 执行请求, 获取一次list数据
util.ApiGet(&r)
// 解析resp数据
filmListPage := collect.FilmListPage{}
if len(r.Resp) <= 0 {
log.Println("filmListPage 数据获取异常 : Resp Is Empty")
return nil, errors.New("filmListPage 数据获取异常 : Resp Is Empty")
}
err := json.Unmarshal(r.Resp, &filmListPage)
// 获取分类列表信息
cl := filmListPage.Class
// 组装分类数据信息树形结构
tree := conver.GenCategoryTree(cl)
// 将分类列表信息存储到redis
_ = collect.SaveFilmClass(cl)
return tree, err
}
// GetPageCount 获取总页数
func (jc *JsonCollect) GetPageCount(r util.RequestInfo) (count int, err error) {
// 发送请求获取pageCount, 默认为获取 ac = detail
if len(r.Params.Get("ac")) <= 0 {
r.Params.Set("ac", "detail")
}
r.Params.Set("pg", "1")
util.ApiGet(&r)
// 判断请求结果是否为空, 如果为空直接输出错误并终止
if len(r.Resp) <= 0 {
err = errors.New("response is empty")
return
}
// 获取pageCount
res := collect.CommonPage{}
err = json.Unmarshal(r.Resp, &res)
if err != nil {
return
}
count = int(res.PageCount)
return
}
// GetDetail 处理详情接口请求返回的数据
func (jc *JsonCollect) GetDetail(pageNumber int, r util.RequestInfo) (list []system.MovieDetail, err error) {
// 防止json解析异常引发panic
defer func() {
if e := recover(); e != nil {
log.Println("GetMovieDetail Failed : ", e)
}
}()
// 设置分页请求参数
r.Params.Set(`ac`, `detail`)
r.Params.Set(`pg`, fmt.Sprint(pageNumber))
util.ApiGet(&r)
// 影视详情信息
detailPage := collect.FilmDetailLPage{}
//details := system.DetailListInfo{}
// 如果返回数据为空则直接结束本次循环
if len(r.Resp) <= 0 {
err = errors.New("response is empty")
return
}
// 序列化详情数据
if err = json.Unmarshal(r.Resp, &detailPage); err != nil {
return
}
// 将影视原始详情信息保存到redis中
// 获取主站点uri
mc := system.GetCollectSourceListByGrade(system.MasterCollect)[0]
if mc.Uri == r.Uri {
collect.BatchSaveOriginalDetail(detailPage.List)
}
// 处理details信息
list = conver.ConvertFilmDetails(detailPage.List)
return
}
// GetFilmDetail 通过 RequestInfo 获取并解析出对应的 MovieDetail list
func (jc *JsonCollect) GetFilmDetail(r util.RequestInfo) (list []system.MovieDetail, err error) {
// 防止json解析异常引发panic
defer func() {
if e := recover(); e != nil {
log.Println("GetMovieDetail Failed : ", e)
}
}()
// 设置分页请求参数
r.Params.Set(`ac`, `detail`)
util.ApiGet(&r)
// 影视详情信息
detailPage := collect.FilmDetailLPage{}
//details := system.DetailListInfo{}
// 如果返回数据为空则直接结束本次循环
if len(r.Resp) <= 0 {
err = errors.New("response is empty")
return
}
// 序列化详情数据
if err = json.Unmarshal(r.Resp, &detailPage); err != nil {
return
}
// 将影视原始详情信息保存到redis中
// 获取主站点uri
//mc := system.GetCollectSourceListByGrade(system.MasterCollect)[0]
//if mc.Uri == r.Uri {
// collect.BatchSaveOriginalDetail(detailPage.List)
//}
// 处理details信息
list = conver.ConvertFilmDetails(detailPage.List)
return
}
// ------------------------------------------------- XML Collect -------------------------------------------------
// XmlCollect 处理返回值为XML格式的采集数据
type XmlCollect struct {
}

View File

@@ -1,17 +1,25 @@
package spider
import (
"errors"
"fmt"
"github.com/robfig/cron/v3"
"log"
"server/config"
"server/model"
"server/model/system"
"time"
)
var (
CronCollect *cron.Cron = CreateCron()
)
// RegularUpdateMovie 定时更新, 每半小时获取一次站点的最近x小时数据
func RegularUpdateMovie() {
//创建一个定时任务对象
c := cron.New(cron.WithSeconds())
// 开启定时任务每x 分钟更新一次最近x小时的影片数据
_, err := c.AddFunc(config.CornMovieUpdate, func() {
// 添加定时任务每x 分钟更新一次最近x小时的影片数据
taskId, err := c.AddFunc(config.CornMovieUpdate, func() {
// 执行更新最近x小时影片的Spider
log.Println("执行一次影片更新任务...")
UpdateMovieDetail()
@@ -20,7 +28,7 @@ func RegularUpdateMovie() {
})
// 开启定时任务每月最后一天凌晨两点, 执行一次清库重取数据
_, err = c.AddFunc(config.CornUpdateAll, func() {
taskId2, err := c.AddFunc(config.CornUpdateAll, func() {
StartSpiderRe()
})
@@ -28,10 +36,91 @@ func RegularUpdateMovie() {
log.Println("Corn Start Error: ", err)
}
c.Start()
log.Println(taskId, "------", taskId2)
log.Printf("%v", c.Entries())
//c.Start()
}
// StartCrontab 启动定时任务
func StartCrontab() {
// 从redis中读取待启动的定时任务列表
// 影片更新定时任务列表
CronCollect.Start()
}
func CreateCron() *cron.Cron {
return cron.New(cron.WithSeconds())
}
// AddFilmUpdateCron 添加影片更新定时任务
func AddFilmUpdateCron(id, spec string) (cron.EntryID, error) {
// 校验 spec 表达式的有效性
if err := ValidSpec(spec); err != nil {
return -99, errors.New(fmt.Sprint("定时任务添加失败,Cron表达式校验失败: ", err.Error()))
}
return CronCollect.AddFunc(spec, func() {
// 通过创建任务时生成的 Id 获取任务相关数据
ft, err := system.GetFilmTaskById(id)
if err != nil {
log.Println("FilmCollectCron Exec Failed: ", err)
}
// 如果当前定时任务状态为开启则执行对应的采集任务
if ft.State && ft.Model == 1 {
// 对指定ids的资源站数据进行更新操作
BatchCollect(ft.Time, ft.Ids...)
}
// 任务执行完毕
log.Printf("执行一次定时任务: Task[%s]\n", ft.Id)
})
}
// AddAutoUpdateCron 自动更新定时任务
func AddAutoUpdateCron(id, spec string) (cron.EntryID, error) {
// 校验 spec 表达式的有效性
if err := ValidSpec(spec); err != nil {
return -99, errors.New(fmt.Sprint("定时任务添加失败,Cron表达式校验失败: ", err.Error()))
}
return CronCollect.AddFunc(spec, func() {
// 通过 Id 获取任务相关数据
ft, err := system.GetFilmTaskById(id)
if err != nil {
log.Println("FilmCollectCron Exec Failed: ", err)
}
// 开启对系统中已启用站点的自动更新
if ft.State && ft.Model == 0 {
AutoCollect(ft.Time)
log.Println("执行一次自动更新任务")
}
})
}
// RemoveCron 删除定时任务
func RemoveCron(id cron.EntryID) {
// 通过定时任务EntryID移出对应的定时任务
CronCollect.Remove(id)
}
// GetEntryById 返回定时任务的相关时间信息
func GetEntryById(id cron.EntryID) cron.Entry {
log.Printf("%+v\n", CronCollect.Entries())
log.Println("", CronCollect.Entry(id).Next.Format(time.DateTime))
return CronCollect.Entry(id)
}
// ValidSpec 校验cron表达式是否有效 不能精确到秒
func ValidSpec(spec string) error {
// 自定义解释器
parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
//if _, err := parser.Parse(spec); err != nil {
// return err
//}
_, err := parser.Parse(spec)
return err
}
// 清理API接口数据缓存
func clearCache() {
model.RemoveCache(config.IndexCacheKey)
system.RemoveCache(config.IndexCacheKey)
}

View File

@@ -0,0 +1,275 @@
package spider
import (
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"log"
"net/url"
"server/config"
"server/model/collect"
"server/model/system"
"server/plugin/common/util"
"time"
)
/*
舍弃第一版的数据处理思路, v2版本
直接分页获取采集站点的影片详情信息
*/
/*
1. 选择一个采集主站点, mysql检索表中只存储主站点检索的信息
2. 采集多个站点数据
2.1 主站点的采集数据完整地保存相关信息, basicInfo movieDetail search 等信息
2.2 其余站点数据只存储 name(影片名称), playUrl(播放url), 存储形式 Key<hash(name)>:value([]MovieUrlInfo)
3. api数据格式不变, 获取影片详情时通过subTitle 去redis匹配其他站点的对应播放源并整合到主站详情信息的playUrl中
4. 影片搜索时不再使用name进行匹配, 改为使用 subTitle 进行匹配
*/
// StartSpider 执行多源spider
func StartSpider() {
// 保存分类树
CategoryList()
log.Println("CategoryList 影片分类信息保存完毕")
// 爬取主站点数据
MainSiteSpider()
log.Println("MainSiteSpider 主站点影片信息保存完毕")
// 查找并创建search数据库, 保存search信息, 添加索引
time.Sleep(time.Second * 10)
system.SyncSearchInfo(0)
system.AddSearchIndex()
log.Println("SearchInfoToMdb 影片检索信息保存完毕")
//获取其他站点数据
scl := system.GetCollectSourceListByGrade(system.SlaveCollect)
go MtSiteSpider(scl...)
log.Println("Spider End , 数据保存执行完成")
time.Sleep(time.Second * 10)
}
// CategoryList 获取分类数据
func CategoryList() {
// 获取主站点uri
mc := system.GetCollectSourceListByGrade(system.MasterCollect)[0]
// 获取分类树形数据
categoryTree, err := spiderCore.GetCategoryTree(util.RequestInfo{Uri: mc.Uri, Params: url.Values{}})
if err != nil {
log.Println("GetCategoryTree Error: ", err)
return
}
// 保存 tree 到redis
err = system.SaveCategoryTree(categoryTree)
if err != nil {
log.Println("SaveCategoryTree Error: ", err)
}
}
// MainSiteSpider 主站点数据处理
func MainSiteSpider() {
// 获取主站点uri
mc := system.GetCollectSourceListByGrade(system.MasterCollect)[0]
// 获取分页页数
pageCount, err := spiderCore.GetPageCount(util.RequestInfo{Uri: mc.Uri, Params: url.Values{}})
// 主站点分页出错直接终止程序
if err != nil {
panic(err)
}
// 开启协程加快分页请求速度
ch := make(chan int, pageCount)
waitCh := make(chan int)
for i := 1; i <= pageCount; i++ {
ch <- i
}
close(ch)
for i := 0; i < config.MAXGoroutine; i++ {
go func() {
defer func() { waitCh <- 0 }()
for {
pg, ok := <-ch
if !ok {
break
}
list, e := spiderCore.GetDetail(pg, util.RequestInfo{Uri: mc.Uri, Params: url.Values{}})
if e != nil {
log.Println("GetMovieDetail Error: ", err)
continue
}
// 保存影片详情信息到redis
if err = system.SaveDetails(list); err != nil {
log.Println("SaveDetails Error: ", err)
}
}
}()
}
for i := 0; i < config.MAXGoroutine; i++ {
<-waitCh
}
}
// MtSiteSpider 附属站点数据源处理
func MtSiteSpider(scl ...system.FilmSource) {
for _, s := range scl {
// 执行每个站点的播放url缓存
PlayDetailSpider(s)
log.Println(s.Name, "playUrl 爬取完毕!!!")
}
}
// PlayDetailSpider SpiderSimpleInfo 获取单个站点的播放源
func PlayDetailSpider(s system.FilmSource) {
// 获取分页页数
pageCount, err := spiderCore.GetPageCount(util.RequestInfo{Uri: s.Uri, Params: url.Values{}})
// 出错直接终止当前站点数据获取
if err != nil {
log.Println(err)
return
}
// 开启协程加快分页请求速度
ch := make(chan int, pageCount)
waitCh := make(chan int)
for i := 1; i <= pageCount; i++ {
ch <- i
}
close(ch)
for i := 0; i < config.MAXGoroutine; i++ {
go func() {
defer func() { waitCh <- 0 }()
for {
pg, ok := <-ch
if !ok {
break
}
list, e := spiderCore.GetDetail(pg, util.RequestInfo{Uri: s.Uri, Params: url.Values{}})
if e != nil || len(list) <= 0 {
log.Println("GetMovieDetail Error: ", err)
continue
}
// 保存影片播放信息到redis
if err = system.SaveSitePlayList(s.Name, list); err != nil {
log.Println("SaveDetails Error: ", err)
}
}
}()
}
for i := 0; i < config.MAXGoroutine; i++ {
<-waitCh
}
}
// UpdateMovieDetail 定时更新主站点和其余播放源信息
func UpdateMovieDetail() {
// 更新主站系列信息
UpdateMainDetail()
// 更新附属播放源数据信息
scl := system.GetCollectSourceListByGrade(system.SlaveCollect)
UpdatePlayDetail(scl...)
}
// UpdateMainDetail 更新主站点的最新影片
func UpdateMainDetail() {
// 获取主站点uri
l := system.GetCollectSourceListByGrade(system.MasterCollect)
mc := system.FilmSource{}
for _, v := range l {
if len(v.Uri) > 0 {
mc = v
break
}
}
// 获取分页页数
r := util.RequestInfo{Uri: mc.Uri, Params: url.Values{}}
r.Params.Set("h", config.UpdateInterval)
pageCount, err := spiderCore.GetPageCount(r)
if err != nil {
log.Printf("Update MianStieDetail failed\n")
}
// 保存本次更新的所有详情信息
var ds []system.MovieDetail
// 获取分页数据
for i := 1; i <= pageCount; i++ {
list, err := spiderCore.GetDetail(i, r)
if err != nil {
continue
}
// 保存更新的影片信息, 同类型直接覆盖
if err = system.SaveDetails(list); err != nil {
log.Println("Update MainSiteDetail failed, SaveDetails Error ")
}
ds = append(ds, list...)
}
// 整合详情信息切片
var sl []system.SearchInfo
for _, d := range ds {
// 通过id 获取对应的详情信息
sl = append(sl, system.ConvertSearchInfo(d))
}
// 调用批量保存或更新方法, 如果对应mid数据存在则更新, 否则执行插入
system.BatchSaveOrUpdate(sl)
}
// UpdatePlayDetail 更新最x小时的影片播放源数据
func UpdatePlayDetail(scl ...system.FilmSource) {
for _, s := range scl {
// 获取单个站点的分页数
r := util.RequestInfo{Uri: s.Uri, Params: url.Values{}}
r.Params.Set("h", config.UpdateInterval)
pageCount, err := spiderCore.GetPageCount(r)
if err != nil {
log.Printf("Update %s playDetail failed\n", s.Name)
}
for i := 1; i <= pageCount; i++ {
// 获取详情信息, 保存到对应hashKey中
list, e := spiderCore.GetDetail(i, r)
if e != nil || len(list) <= 0 {
log.Println("GetMovieDetail Error: ", err)
continue
}
// 保存影片播放信息到redis
if err = system.SaveSitePlayList(s.Name, list); err != nil {
log.Println("SaveDetails Error: ", err)
}
}
}
}
// StartSpiderRe 清空存储数据,从零开始获取
func StartSpiderRe() {
// 删除已有的存储数据, redis 和 mysql中的存储数据全部清空
system.FilmZero()
// 执行完整数据获取
StartSpider()
}
// =========================公共方法==============================
// CollectApiTest 测试采集接口是否可用
func CollectApiTest(s system.FilmSource) error {
// 使用当前采集站接口采集一页数据
r := util.RequestInfo{Uri: s.Uri, Params: url.Values{}}
r.Params.Set("ac", s.CollectType.GetActionType())
r.Params.Set("pg", "3")
err := util.ApiTest(&r)
// 首先核对接口返回值类型
if err == nil {
// 如果返回值类型为Json则执行Json序列化
if s.ResultModel == system.JsonResult {
var dp = collect.FilmDetailLPage{}
if err = json.Unmarshal(r.Resp, &dp); err != nil {
return errors.New(fmt.Sprint("测试失败, 返回数据异常, JSON序列化失败: ", err))
}
return nil
} else if s.ResultModel == system.XmlResult {
// 如果返回值类型为XML则执行XML序列化
var rd = collect.RssD{}
if err = xml.Unmarshal(r.Resp, &rd); err != nil {
return errors.New(fmt.Sprint("测试失败, 返回数据异常, XML序列化失败", err))
}
return nil
}
return errors.New("测试失败, 接口返回值类型不符合规范")
}
return errors.New(fmt.Sprint("测试失败, 请求响应异常 : ", err.Error()))
}