mirror of
https://github.com/FloatTech/ZeroBot-Plugin.git
synced 2026-02-12 18:20:27 +00:00
feat: 添加 rsshub (#1232)
This commit is contained in:
134
plugin/rsshub/domain/job.go
Normal file
134
plugin/rsshub/domain/job.go
Normal file
@@ -0,0 +1,134 @@
|
||||
// Package domain rsshub领域逻辑
|
||||
package domain
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/mmcdole/gofeed"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// syncRss 同步所有频道
|
||||
// 返回:更新的频道&订阅信息 map[int64]*RssClientView
|
||||
// 1. 获取所有频道
|
||||
// 2. 遍历所有频道,检查频道是否更新
|
||||
// 3. 如果更新,获取更新的内容,但是返回的数据
|
||||
func (repo *RssDomain) syncRss(ctx context.Context) (updated map[int64]*RssClientView, err error) {
|
||||
updated = make(map[int64]*RssClientView)
|
||||
// 获取所有频道
|
||||
sources, err := repo.storage.GetSources(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// 遍历所有源,获取每个channel对应的rss内容
|
||||
rssView := make([]*RssClientView, len(sources))
|
||||
for i, channel := range sources {
|
||||
var feed *gofeed.Feed
|
||||
// 从site获取rss内容
|
||||
feed, err = repo.rssHubClient.FetchFeed(channel.RssHubFeedPath)
|
||||
// 如果获取失败,则跳过
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub syncRss] fetch path(%+v) error: %v", channel.RssHubFeedPath, err)
|
||||
continue
|
||||
}
|
||||
rv := convertFeedToRssView(0, channel.RssHubFeedPath, feed)
|
||||
rssView[i] = rv
|
||||
}
|
||||
// 检查频道是否更新
|
||||
for _, cv := range rssView {
|
||||
if cv == nil {
|
||||
continue
|
||||
}
|
||||
var needUpdate bool
|
||||
needUpdate, err = repo.checkSourceNeedUpdate(ctx, cv.Source)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub syncRss] checkSourceNeedUpdate error: %v", err)
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
// 保存
|
||||
logrus.WithContext(ctx).Infof("[rsshub syncRss] cv %+v, need update(real): %v", cv.Source, needUpdate)
|
||||
// 如果需要更新,更新channel 和 content
|
||||
if needUpdate {
|
||||
err = repo.storage.UpsertSource(ctx, cv.Source)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub syncRss] upsert source error: %v", err)
|
||||
}
|
||||
}
|
||||
var updateChannelView = &RssClientView{Source: cv.Source, Contents: []*RssContent{}}
|
||||
err = repo.processContentsUpdate(ctx, cv, updateChannelView)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub syncRss] processContentsUpdate error: %v", err)
|
||||
continue
|
||||
}
|
||||
if len(updateChannelView.Contents) == 0 {
|
||||
logrus.WithContext(ctx).Infof("[rsshub syncRss] cv %s, no new content", cv.Source.RssHubFeedPath)
|
||||
continue
|
||||
}
|
||||
updateChannelView.Sort()
|
||||
updated[updateChannelView.Source.ID] = updateChannelView
|
||||
logrus.WithContext(ctx).Debugf("[rsshub syncRss] cv %s, new contents: %v", cv.Source.RssHubFeedPath, len(updateChannelView.Contents))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// checkSourceNeedUpdate 检查频道是否需要更新
|
||||
func (repo *RssDomain) checkSourceNeedUpdate(ctx context.Context, source *RssSource) (needUpdate bool, err error) {
|
||||
var sourceInDB *RssSource
|
||||
sourceInDB, err = repo.storage.GetSourceByRssHubFeedLink(ctx, source.RssHubFeedPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if sourceInDB == nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub syncRss] source not found: %v", source.RssHubFeedPath)
|
||||
return
|
||||
}
|
||||
source.ID = sourceInDB.ID
|
||||
// 检查是否需要更新到db
|
||||
if sourceInDB.IfNeedUpdate(source) {
|
||||
needUpdate = true
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// processContentsUpdate 处理内容(s)更新
|
||||
func (repo *RssDomain) processContentsUpdate(ctx context.Context, cv *RssClientView, updateChannelView *RssClientView) error {
|
||||
var err error
|
||||
for _, content := range cv.Contents {
|
||||
if content == nil {
|
||||
continue
|
||||
}
|
||||
content.RssSourceID = cv.Source.ID
|
||||
var existed bool
|
||||
existed, err = repo.processContentItemUpdate(ctx, content)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub syncRss] upsert content error: %v", err)
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
if !existed {
|
||||
updateChannelView.Contents = append(updateChannelView.Contents, content)
|
||||
logrus.WithContext(ctx).Infof("[rsshub syncRss] cv %s, add new content: %v", cv.Source.RssHubFeedPath, content.Title)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// processContentItemUpdate 处理单个内容更新
|
||||
func (repo *RssDomain) processContentItemUpdate(ctx context.Context, content *RssContent) (existed bool, err error) {
|
||||
existed, err = repo.storage.IsContentHashIDExist(ctx, content.HashID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// 不需要更新&不需要发送
|
||||
if existed {
|
||||
return
|
||||
}
|
||||
// 保存
|
||||
err = repo.storage.UpsertContent(ctx, content)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub syncRss] upsert content error: %v", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
118
plugin/rsshub/domain/model.go
Normal file
118
plugin/rsshub/domain/model.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"hash/fnv"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ======== RSS ========[START]
|
||||
|
||||
func genHashForFeedItem(link, guid string) string {
|
||||
h := fnv.New32()
|
||||
// 分三次写入数据:link、分隔符、guid
|
||||
_, _ = h.Write([]byte(link))
|
||||
_, _ = h.Write([]byte("||"))
|
||||
_, _ = h.Write([]byte(guid))
|
||||
|
||||
encoded := hex.EncodeToString(h.Sum(nil))
|
||||
return encoded
|
||||
}
|
||||
|
||||
// RssClientView 频道视图
|
||||
type RssClientView struct {
|
||||
Source *RssSource
|
||||
Contents []*RssContent
|
||||
}
|
||||
|
||||
// ======== RSS ========[END]
|
||||
|
||||
// ======== DB ========[START]
|
||||
|
||||
const (
|
||||
tableNameRssSource = "rss_source"
|
||||
tableNameRssContent = "rss_content"
|
||||
tableNameRssSubscribe = "rss_subscribe"
|
||||
)
|
||||
|
||||
// RssSource RSS频道
|
||||
type RssSource struct {
|
||||
// Id 自增id
|
||||
ID int64 `gorm:"column:id;primary_key;AUTO_INCREMENT"`
|
||||
// RssHubFeedPath 频道路由 用于区分rss_hub 不同的频道 例如: `/bangumi/tv/calendar/today`
|
||||
RssHubFeedPath string `gorm:"column:rss_hub_feed_path;not null;unique;" json:"rss_hub_feed_path"`
|
||||
// Title 频道标题
|
||||
Title string `gorm:"column:title" json:"title"`
|
||||
// ChannelDesc 频道描述
|
||||
ChannelDesc string `gorm:"column:channel_desc" json:"channel_desc"`
|
||||
// ImageURL 频道图片
|
||||
ImageURL string `gorm:"column:image_url" json:"image_url"`
|
||||
// Link 频道链接
|
||||
Link string `gorm:"column:link" json:"link"`
|
||||
// UpdatedParsed RSS页面更新时间
|
||||
UpdatedParsed time.Time `gorm:"column:updated_parsed" json:"updated_parsed"`
|
||||
// Mtime update time
|
||||
Mtime time.Time `gorm:"column:mtime;default:current_timestamp;" json:"mtime"`
|
||||
}
|
||||
|
||||
// TableName ...
|
||||
func (RssSource) TableName() string {
|
||||
return tableNameRssSource
|
||||
}
|
||||
|
||||
// IfNeedUpdate ...
|
||||
func (r RssSource) IfNeedUpdate(cmp *RssSource) bool {
|
||||
if r.Link != cmp.Link {
|
||||
return false
|
||||
}
|
||||
return r.UpdatedParsed.Unix() < cmp.UpdatedParsed.Unix()
|
||||
}
|
||||
|
||||
// RssContent 订阅的RSS频道的推送信息
|
||||
type RssContent struct {
|
||||
// Id 自增id
|
||||
ID int64 `gorm:"column:id;primary_key;AUTO_INCREMENT"`
|
||||
HashID string `gorm:"column:hash_id;unique" json:"hash_id"`
|
||||
RssSourceID int64 `gorm:"column:rss_source_id;not null" json:"rss_source_id"`
|
||||
Title string `gorm:"column:title" json:"title"`
|
||||
Description string `gorm:"column:description" json:"description"`
|
||||
Link string `gorm:"column:link" json:"link"`
|
||||
Date time.Time `gorm:"column:date" json:"date"`
|
||||
Author string `gorm:"column:author" json:"author"`
|
||||
Thumbnail string `gorm:"column:thumbnail" json:"thumbnail"`
|
||||
Content string `gorm:"column:content" json:"content"`
|
||||
// Mtime update time
|
||||
Mtime time.Time `gorm:"column:mtime;default:current_timestamp;" json:"mtime"`
|
||||
}
|
||||
|
||||
// TableName ...
|
||||
func (RssContent) TableName() string {
|
||||
return tableNameRssContent
|
||||
}
|
||||
|
||||
// Sort ... order by Date desc
|
||||
func (r *RssClientView) Sort() {
|
||||
sort.Slice(r.Contents, func(i, j int) bool {
|
||||
return r.Contents[i].Date.Unix() > r.Contents[j].Date.Unix()
|
||||
})
|
||||
}
|
||||
|
||||
// RssSubscribe 订阅关系表:群组-RSS频道
|
||||
type RssSubscribe struct {
|
||||
// Id 自增id
|
||||
ID int64 `gorm:"column:id;primary_key;AUTO_INCREMENT"`
|
||||
// 订阅群组
|
||||
GroupID int64 `gorm:"column:group_id;not null;uniqueIndex:uk_sid_gid"`
|
||||
// 订阅频道
|
||||
RssSourceID int64 `gorm:"column:rss_source_id;not null;uniqueIndex:uk_sid_gid"`
|
||||
// Mtime update time
|
||||
Mtime time.Time `gorm:"column:mtime;default:current_timestamp;" json:"mtime"`
|
||||
}
|
||||
|
||||
// TableName ...
|
||||
func (RssSubscribe) TableName() string {
|
||||
return tableNameRssSubscribe
|
||||
}
|
||||
|
||||
// ======== DB ========[END]
|
||||
101
plugin/rsshub/domain/rawFeed.go
Normal file
101
plugin/rsshub/domain/rawFeed.go
Normal file
@@ -0,0 +1,101 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/FloatTech/floatbox/web"
|
||||
"github.com/mmcdole/gofeed"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
// RSSHubMirrors RSSHub镜像站地址列表,第一个为默认地址
|
||||
rssHubMirrors = []string{
|
||||
"https://rsshub.rssforever.com",
|
||||
"https://rss.injahow.cn",
|
||||
}
|
||||
)
|
||||
|
||||
// RssHubClient rss hub client (http)
|
||||
type RssHubClient struct {
|
||||
*http.Client
|
||||
}
|
||||
|
||||
// FetchFeed 获取rss feed信息
|
||||
func (c *RssHubClient) FetchFeed(path string) (feed *gofeed.Feed, err error) {
|
||||
var data []byte
|
||||
// 遍历 rssHubMirrors,直到获取成功
|
||||
for _, mirror := range rssHubMirrors {
|
||||
data, err = web.RequestDataWith(c.Client, mirror+path, "GET", "", web.RandUA(), nil)
|
||||
if err == nil && len(data) > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
logrus.Errorf("[rsshub FetchFeed] fetch feed error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
if len(data) == 0 {
|
||||
logrus.Errorf("[rsshub FetchFeed] fetch feed error: data is empty")
|
||||
return nil, errors.New("feed data is empty")
|
||||
}
|
||||
feed, err = gofeed.NewParser().Parse(bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func convertFeedToRssView(channelID int64, cPath string, feed *gofeed.Feed) (view *RssClientView) {
|
||||
var imgURL string
|
||||
if feed.Image != nil {
|
||||
imgURL = feed.Image.URL
|
||||
}
|
||||
view = &RssClientView{
|
||||
Source: &RssSource{
|
||||
ID: channelID,
|
||||
RssHubFeedPath: cPath,
|
||||
Title: feed.Title,
|
||||
ChannelDesc: feed.Description,
|
||||
ImageURL: imgURL,
|
||||
Link: feed.Link,
|
||||
UpdatedParsed: *(feed.UpdatedParsed),
|
||||
Mtime: time.Now(),
|
||||
},
|
||||
// 不用定长,后面可能会过滤一些元素再append
|
||||
Contents: []*RssContent{},
|
||||
}
|
||||
// convert feed items to rss content
|
||||
for _, item := range feed.Items {
|
||||
if item.Link == "" || item.Title == "" {
|
||||
continue
|
||||
}
|
||||
var thumbnail string
|
||||
if item.Image != nil {
|
||||
thumbnail = item.Image.URL
|
||||
}
|
||||
var publishedParsed = item.PublishedParsed
|
||||
if publishedParsed == nil {
|
||||
publishedParsed = &time.Time{}
|
||||
}
|
||||
aus, _ := json.Marshal(item.Authors)
|
||||
view.Contents = append(view.Contents, &RssContent{
|
||||
ID: 0,
|
||||
HashID: genHashForFeedItem(item.Link, item.GUID),
|
||||
RssSourceID: channelID,
|
||||
Title: item.Title,
|
||||
Description: item.Description,
|
||||
Link: item.Link,
|
||||
Date: *publishedParsed,
|
||||
Author: string(aus),
|
||||
Thumbnail: thumbnail,
|
||||
Content: item.Content,
|
||||
Mtime: time.Now(),
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
178
plugin/rsshub/domain/rssHub.go
Normal file
178
plugin/rsshub/domain/rssHub.go
Normal file
@@ -0,0 +1,178 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/jinzhu/gorm"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// RssDomain RssRepo定义
|
||||
type RssDomain struct {
|
||||
storage *repoStorage
|
||||
rssHubClient *RssHubClient
|
||||
}
|
||||
|
||||
// NewRssDomain 新建RssDomain,调用方保证单例模式
|
||||
func NewRssDomain(dbPath string) (*RssDomain, error) {
|
||||
return newRssDomain(dbPath)
|
||||
}
|
||||
|
||||
func newRssDomain(dbPath string) (*RssDomain, error) {
|
||||
if _, err := os.Stat(dbPath); err != nil || os.IsNotExist(err) {
|
||||
// 生成文件
|
||||
f, err := os.Create(dbPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
}
|
||||
orm, err := gorm.Open("sqlite3", dbPath)
|
||||
if err != nil {
|
||||
logrus.Errorf("[rsshub NewRssDomain] open db error: %v", err)
|
||||
panic(err)
|
||||
}
|
||||
repo := &RssDomain{
|
||||
storage: &repoStorage{orm: orm},
|
||||
rssHubClient: &RssHubClient{Client: http.DefaultClient},
|
||||
}
|
||||
err = repo.storage.initDB()
|
||||
if err != nil {
|
||||
logrus.Errorf("[rsshub NewRssDomain] open db error: %v", err)
|
||||
panic(err)
|
||||
}
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
// Subscribe QQ群订阅Rss频道
|
||||
func (repo *RssDomain) Subscribe(ctx context.Context, gid int64, feedPath string) (
|
||||
rv *RssClientView, isChannelExisted, isSubExisted bool, err error) {
|
||||
// 验证
|
||||
feed, err := repo.rssHubClient.FetchFeed(feedPath)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub Subscribe] add source error: %v", err)
|
||||
return
|
||||
}
|
||||
logrus.WithContext(ctx).Infof("[rsshub Subscribe] try get source success: %v", len(feed.Title))
|
||||
// 新建source结构体
|
||||
rv = convertFeedToRssView(0, feedPath, feed)
|
||||
feedChannel, err := repo.storage.GetSourceByRssHubFeedLink(ctx, feedPath)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub Subscribe] query source by feedPath error: %v", err)
|
||||
return
|
||||
}
|
||||
// 如果已经存在
|
||||
if feedChannel != nil {
|
||||
logrus.WithContext(ctx).Warningf("[rsshub Subscribe] source existed: %v", feedChannel)
|
||||
isChannelExisted = true
|
||||
} else {
|
||||
// 不存在的情况,要把更新时间置空,保证下一次同步时能够更新
|
||||
rv.Source.UpdatedParsed = time.Time{}
|
||||
}
|
||||
// 保存
|
||||
err = repo.storage.UpsertSource(ctx, rv.Source)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub Subscribe] save source error: %v", err)
|
||||
return
|
||||
}
|
||||
logrus.Infof("[rsshub Subscribe] save/update source success %v", rv.Source.ID)
|
||||
// 添加群号到订阅
|
||||
subscribe, err := repo.storage.GetSubscribeByID(ctx, gid, rv.Source.ID)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub Subscribe] query subscribe error: %v", err)
|
||||
return
|
||||
}
|
||||
logrus.WithContext(ctx).Infof("[rsshub Subscribe] query subscribe success: %v", subscribe)
|
||||
// 如果已经存在,直接返回
|
||||
if subscribe != nil {
|
||||
isSubExisted = true
|
||||
logrus.WithContext(ctx).Infof("[rsshub Subscribe] subscribe existed: %v", subscribe)
|
||||
return
|
||||
}
|
||||
// 如果不存在,保存
|
||||
err = repo.storage.CreateSubscribe(ctx, gid, rv.Source.ID)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub Subscribe] save subscribe error: %v", err)
|
||||
return
|
||||
}
|
||||
logrus.WithContext(ctx).Infof("[rsshub Subscribe] success: %v", len(rv.Contents))
|
||||
return
|
||||
}
|
||||
|
||||
// Unsubscribe 群组取消订阅
|
||||
func (repo *RssDomain) Unsubscribe(ctx context.Context, gid int64, feedPath string) (err error) {
|
||||
existedSubscribes, ifExisted, err := repo.storage.GetIfExistedSubscribe(ctx, gid, feedPath)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub Subscribe] query sub by route error: %v", err)
|
||||
return errors.New("数据库错误")
|
||||
}
|
||||
logrus.WithContext(ctx).Infof("[rsshub Subscribe] query source by route success: %v", existedSubscribes)
|
||||
// 如果不存在订阅关系,直接返回
|
||||
if !ifExisted || existedSubscribes == nil {
|
||||
logrus.WithContext(ctx).Infof("[rsshub Subscribe] source existed: %v", ifExisted)
|
||||
return errors.New("频道不存在")
|
||||
}
|
||||
err = repo.storage.DeleteSubscribe(ctx, existedSubscribes.ID)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub Subscribe] delete source error: %v", err)
|
||||
return errors.New("删除失败")
|
||||
}
|
||||
// 查询是否还有群订阅这个频道
|
||||
subscribesNeedsToDel, err := repo.storage.GetSubscribesBySource(ctx, feedPath)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub Subscribe] query source by route error: %v", err)
|
||||
return
|
||||
}
|
||||
// 没有群订阅的时候,把频道删除
|
||||
if len(subscribesNeedsToDel) == 0 {
|
||||
err = repo.storage.DeleteSource(ctx, existedSubscribes.RssSourceID)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub Subscribe] delete source error: %v", err)
|
||||
return errors.New("清除频道信息失败")
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetSubscribedChannelsByGroupID 获取群对应的订阅的频道信息
|
||||
func (repo *RssDomain) GetSubscribedChannelsByGroupID(ctx context.Context, gid int64) ([]*RssClientView, error) {
|
||||
channels, err := repo.storage.GetSubscribedChannelsByGroupID(ctx, gid)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub GetSubscribedChannelsByGroupID] GetSubscribedChannelsByGroupID error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
rv := make([]*RssClientView, len(channels))
|
||||
logrus.WithContext(ctx).Infof("[rsshub GetSubscribedChannelsByGroupID] query subscribe success: %v", len(channels))
|
||||
for i, cn := range channels {
|
||||
rv[i] = &RssClientView{
|
||||
Source: cn,
|
||||
}
|
||||
}
|
||||
return rv, nil
|
||||
}
|
||||
|
||||
// Sync 同步任务,按照群组订阅情况做好map切片
|
||||
func (repo *RssDomain) Sync(ctx context.Context) (groupView map[int64][]*RssClientView, err error) {
|
||||
groupView = make(map[int64][]*RssClientView)
|
||||
// 获取所有Rss频道
|
||||
// 获取所有频道
|
||||
updatedViews, err := repo.syncRss(ctx)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub Sync] sync rss feed error: %v", err)
|
||||
return
|
||||
}
|
||||
logrus.WithContext(ctx).Infof("[rsshub Sync] updated channels: %v", len(updatedViews))
|
||||
subscribes, err := repo.storage.GetSubscribes(ctx)
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub Sync] get subscribes error: %v", err)
|
||||
return
|
||||
}
|
||||
for _, subscribe := range subscribes {
|
||||
groupView[subscribe.GroupID] = append(groupView[subscribe.GroupID], updatedViews[subscribe.RssSourceID])
|
||||
}
|
||||
return
|
||||
}
|
||||
105
plugin/rsshub/domain/rssHub_test.go
Normal file
105
plugin/rsshub/domain/rssHub_test.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestNewRssDomain(t *testing.T) {
|
||||
dm, err := newRssDomain("rsshub.db")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
if dm == nil {
|
||||
t.Fatal("domain is nil")
|
||||
}
|
||||
}
|
||||
|
||||
//var testRssHubChannelUrl = "https://rsshub.rssforever.com/bangumi/tv/calendar/today"
|
||||
|
||||
var dm, _ = newRssDomain("rsshub.db")
|
||||
|
||||
func TestSub(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
feedLink string
|
||||
gid int64
|
||||
}{
|
||||
{
|
||||
name: "test1",
|
||||
feedLink: "/bangumi/tv/calendar/today",
|
||||
gid: 99,
|
||||
},
|
||||
{
|
||||
name: "test2",
|
||||
feedLink: "/go-weekly",
|
||||
gid: 99,
|
||||
},
|
||||
{
|
||||
name: "test3",
|
||||
feedLink: "/go-weekly",
|
||||
gid: 123,
|
||||
},
|
||||
{
|
||||
name: "test3",
|
||||
feedLink: "/go-weekly",
|
||||
gid: 321,
|
||||
},
|
||||
{
|
||||
name: "test3",
|
||||
feedLink: "/go-weekly",
|
||||
gid: 4123,
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
channel, ifExisted, ifSub, err := dm.Subscribe(ctx, tc.gid, tc.feedLink)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
t.Logf("[TEST] add sub res: %+v,%+v,%+v\n", channel, ifExisted, ifSub)
|
||||
res, ext, err := dm.storage.GetIfExistedSubscribe(ctx, tc.gid, tc.feedLink)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
t.Logf("[TEST] if exist: %+v,%+v", res, ext)
|
||||
channels, err := dm.GetSubscribedChannelsByGroupID(ctx, 2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
t.Logf("[TEST] 2 channels: %+v", channels)
|
||||
// del
|
||||
//err = dm.Unsubscribe(ctx, tc.gid, tc.feedLink)
|
||||
//if err != nil {
|
||||
// t.Fatal(err)
|
||||
// return
|
||||
//}
|
||||
//res, ext, err = dm.storage.GetIfExistedSubscribe(ctx, tc.gid, tc.feedLink)
|
||||
//if err != nil {
|
||||
// t.Fatal(err)
|
||||
// return
|
||||
//}
|
||||
//t.Logf("[TEST] after del: %+v,%+v", res, ext)
|
||||
//if res != nil || ext {
|
||||
// t.Fatal("delete failed")
|
||||
//}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_SyncFeed(t *testing.T) {
|
||||
feed, err := dm.Sync(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
rs, _ := json.Marshal(feed)
|
||||
t.Logf("[Test] feed: %+v", string(rs))
|
||||
}
|
||||
271
plugin/rsshub/domain/storageRepo.go
Normal file
271
plugin/rsshub/domain/storageRepo.go
Normal file
@@ -0,0 +1,271 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jinzhu/gorm"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// repoStorage db struct for rss
|
||||
type repoStorage struct {
|
||||
orm *gorm.DB
|
||||
}
|
||||
|
||||
// initDB ...
|
||||
func (s *repoStorage) initDB() (err error) {
|
||||
err = s.orm.AutoMigrate(&RssSource{}, &RssContent{}, &RssSubscribe{}).Error
|
||||
if err != nil {
|
||||
logrus.Errorf("[rsshub initDB] error: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
// s.orm.LogMode(true)
|
||||
}
|
||||
|
||||
// GetSubscribesBySource Impl
|
||||
func (s *repoStorage) GetSubscribesBySource(ctx context.Context, feedPath string) ([]*RssSubscribe, error) {
|
||||
logrus.WithContext(ctx).Infof("[rsshub GetSubscribesBySource] feedPath: %s", feedPath)
|
||||
rs := make([]*RssSubscribe, 0)
|
||||
err := s.orm.Model(&RssSubscribe{}).Joins(fmt.Sprintf("%s left join %s on %s.rss_source_id=%s.id", tableNameRssSubscribe, tableNameRssSource, tableNameRssSubscribe, tableNameRssSource)).
|
||||
Where("rss_source.rss_hub_feed_path = ?", feedPath).Select("rss_subscribe.*").Find(&rs).Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
logrus.WithContext(ctx).Errorf("[rsshub GetSubscribesBySource] error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
return rs, nil
|
||||
}
|
||||
|
||||
// GetIfExistedSubscribe Impl
|
||||
func (s *repoStorage) GetIfExistedSubscribe(ctx context.Context, gid int64, feedPath string) (*RssSubscribe, bool, error) {
|
||||
rs := RssSubscribe{}
|
||||
|
||||
err := s.orm.Table(tableNameRssSubscribe).
|
||||
Select("rss_subscribe.id, rss_subscribe.group_id, rss_subscribe.rss_source_id, rss_subscribe.mtime").
|
||||
Joins(fmt.Sprintf("INNER JOIN %s ON %s.rss_source_id=%s.id",
|
||||
tableNameRssSource, tableNameRssSubscribe, tableNameRssSource)).
|
||||
Where("rss_source.rss_hub_feed_path = ? AND rss_subscribe.group_id = ?", feedPath, gid).Scan(&rs).Error
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, false, nil
|
||||
}
|
||||
logrus.WithContext(ctx).Errorf("[rsshub GetIfExistedSubscribe] error: %v", err)
|
||||
return nil, false, err
|
||||
}
|
||||
if rs.ID == 0 {
|
||||
return nil, false, nil
|
||||
}
|
||||
return &rs, true, nil
|
||||
}
|
||||
|
||||
// ==================== RepoSource ==================== [Start]
|
||||
|
||||
// UpsertSource Impl
|
||||
func (s *repoStorage) UpsertSource(ctx context.Context, source *RssSource) (err error) {
|
||||
// Update columns to default value on `id` conflict
|
||||
querySource := &RssSource{RssHubFeedPath: source.RssHubFeedPath}
|
||||
err = s.orm.First(querySource, "rss_hub_feed_path = ?", querySource.RssHubFeedPath).Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
err = s.orm.Create(source).Omit("id").Error
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] add source error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
source.ID = querySource.ID
|
||||
logrus.WithContext(ctx).Infof("[rsshub] update source: %+v", source.UpdatedParsed)
|
||||
err = s.orm.Model(&source).Where(&RssSource{ID: source.ID}).
|
||||
Updates(&RssSource{
|
||||
Title: source.Title,
|
||||
ChannelDesc: source.ChannelDesc,
|
||||
ImageURL: source.ImageURL,
|
||||
Link: source.Link,
|
||||
UpdatedParsed: source.UpdatedParsed,
|
||||
Mtime: time.Now(),
|
||||
}).Error
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] update source error: %v", err)
|
||||
return
|
||||
}
|
||||
logrus.Println("[rsshub] add source success: ", source.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSources Impl
|
||||
func (s *repoStorage) GetSources(ctx context.Context) (sources []RssSource, err error) {
|
||||
sources = []RssSource{}
|
||||
err = s.orm.Find(&sources, "id > 0").Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, errors.New("source not found")
|
||||
}
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] get sources error: %v", err)
|
||||
return
|
||||
}
|
||||
logrus.WithContext(ctx).Infof("[rsshub] get sources success: %d", len(sources))
|
||||
return
|
||||
}
|
||||
|
||||
// GetSourceByRssHubFeedLink Impl
|
||||
func (s *repoStorage) GetSourceByRssHubFeedLink(ctx context.Context, rssHubFeedLink string) (source *RssSource, err error) {
|
||||
source = &RssSource{RssHubFeedPath: rssHubFeedLink}
|
||||
err = s.orm.Take(source, source).Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] get source error: %v", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeleteSource Impl
|
||||
func (s *repoStorage) DeleteSource(ctx context.Context, fID int64) (err error) {
|
||||
err = s.orm.Delete(&RssSource{}, "id = ?", fID).Error
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] storage.DeleteSource: %v", err)
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return errors.New("source not found")
|
||||
}
|
||||
return
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ==================== RepoSource ==================== [End]
|
||||
|
||||
// ==================== RepoContent ==================== [Start]
|
||||
|
||||
// UpsertContent Impl
|
||||
func (s *repoStorage) UpsertContent(ctx context.Context, content *RssContent) (err error) {
|
||||
// check params
|
||||
if content == nil {
|
||||
err = errors.New("content is nil")
|
||||
return
|
||||
}
|
||||
// check params.RssHubFeedPath and params.HashID
|
||||
if content.RssSourceID < 0 || content.HashID == "" || content.Title == "" {
|
||||
err = errors.New("content.RssSourceID or content.HashID or content.Title is empty")
|
||||
return
|
||||
}
|
||||
err = s.orm.Create(content).Omit("id").Error
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] storage.UpsertContent: %v", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeleteSourceContents Impl
|
||||
func (s *repoStorage) DeleteSourceContents(ctx context.Context, channelID int64) (rows int64, err error) {
|
||||
err = s.orm.Delete(&RssSubscribe{}).Where(&RssSubscribe{RssSourceID: channelID}).Error
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] storage.DeleteSourceContents: %v", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// IsContentHashIDExist Impl
|
||||
func (s *repoStorage) IsContentHashIDExist(ctx context.Context, hashID string) (bool, error) {
|
||||
wanted := &RssContent{HashID: hashID}
|
||||
err := s.orm.Take(wanted, wanted).Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return false, nil
|
||||
}
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] storage.IsContentHashIDExist: %v", err)
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// ==================== RepoContent ==================== [End]
|
||||
|
||||
// ==================== RepoSubscribe ==================== [Start]
|
||||
|
||||
// CreateSubscribe Impl
|
||||
func (s *repoStorage) CreateSubscribe(ctx context.Context, gid, rssSourceID int64) (err error) {
|
||||
// check subscribe
|
||||
if rssSourceID < 0 || gid == 0 {
|
||||
err = errors.New("gid or rssSourceID is empty")
|
||||
return
|
||||
}
|
||||
err = s.orm.Create(&RssSubscribe{GroupID: gid, RssSourceID: rssSourceID}).Omit("id").Error
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] storage.CreateSubscribe: %v", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeleteSubscribe Impl
|
||||
func (s *repoStorage) DeleteSubscribe(ctx context.Context, subscribeID int64) (err error) {
|
||||
err = s.orm.Delete(&RssSubscribe{}, "id = ?", subscribeID).Error
|
||||
if err != nil {
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] storage.DeleteSubscribe error: %v", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetSubscribeByID Impl
|
||||
func (s *repoStorage) GetSubscribeByID(ctx context.Context, gid int64, subscribeID int64) (res *RssSubscribe, err error) {
|
||||
res = &RssSubscribe{}
|
||||
err = s.orm.First(res, &RssSubscribe{GroupID: gid, RssSourceID: subscribeID}).Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] storage.GetSubscribeByID: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetSubscribedChannelsByGroupID Impl
|
||||
func (s *repoStorage) GetSubscribedChannelsByGroupID(ctx context.Context, gid int64) (res []*RssSource, err error) {
|
||||
res = make([]*RssSource, 0)
|
||||
err = s.orm.Model(&RssSource{}).
|
||||
Joins(fmt.Sprintf("join %s on rss_source_id=%s.id", tableNameRssSubscribe, tableNameRssSource)).Where("rss_subscribe.group_id = ?", gid).
|
||||
Select("rss_source.*").
|
||||
Find(&res).
|
||||
Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
err = nil
|
||||
return
|
||||
}
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] storage.GetSubscribedChannelsByGroupID: %v", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetSubscribes Impl
|
||||
func (s *repoStorage) GetSubscribes(ctx context.Context) (res []*RssSubscribe, err error) {
|
||||
res = make([]*RssSubscribe, 0)
|
||||
err = s.orm.Find(&res).Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
err = nil
|
||||
return
|
||||
}
|
||||
logrus.WithContext(ctx).Errorf("[rsshub] storage.GetSubscribes: %v", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ==================== RepoSubscribe ==================== [End]
|
||||
Reference in New Issue
Block a user