Skip to content
Snippets Groups Projects
Commit 06a5cf11 authored by Timuçin Boldt's avatar Timuçin Boldt :bike:
Browse files

Merge branch 'master' into 'main'

Upstream update

See merge request tudo-fsinfo/oeffentlichkeit/rss!2
parents b3e9147b e4960733
No related branches found
No related tags found
No related merge requests found
# Feed update interval in minutes # Feed update interval in minutes
update_interval: 30 update_interval: 60
# Maximum backoff in minutes when failing to fetch feeds (defaults to 5 days)
max_backoff: 7200
# The time to sleep between send requests when broadcasting a new feed entry. # The time to sleep between send requests when broadcasting a new feed entry.
# Set to 0 to disable sleep or -1 to run all requests asynchronously at once. # Set to 0 to disable sleep or -1 to run all requests asynchronously at once.
spam_sleep: 0 spam_sleep: 0
......
# rss - A maubot plugin to subscribe to RSS/Atom feeds. # rss - A maubot plugin to subscribe to RSS/Atom feeds.
# Copyright (C) 2020 Tulir Asokan # Copyright (C) 2021 Tulir Asokan
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
...@@ -31,10 +31,13 @@ from maubot.handlers import command, event ...@@ -31,10 +31,13 @@ from maubot.handlers import command, event
from .db import Database, Feed, Entry, Subscription from .db import Database, Feed, Entry, Subscription
rss_change_level = EventType.find("xyz.maubot.rss", t_class=EventType.Class.STATE)
class Config(BaseProxyConfig): class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None: def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("update_interval") helper.copy("update_interval")
helper.copy("max_backoff")
helper.copy("spam_sleep") helper.copy("spam_sleep")
helper.copy("command_prefix") helper.copy("command_prefix")
helper.copy("admins") helper.copy("admins")
...@@ -101,16 +104,23 @@ class RSSBot(Plugin): ...@@ -101,16 +104,23 @@ class RSSBot(Plugin):
except Exception: except Exception:
self.log.exception("Fatal error while polling feeds") self.log.exception("Fatal error while polling feeds")
def _send(self, feed: Feed, entry: Entry, sub: Subscription) -> Awaitable[EventID]: async def _send(self, feed: Feed, entry: Entry, sub: Subscription) -> EventID:
return self.client.send_markdown(sub.room_id, sub.notification_template.safe_substitute({ message = sub.notification_template.safe_substitute({
"feed_url": feed.url, "feed_url": feed.url,
"feed_title": feed.title, "feed_title": feed.title,
"feed_subtitle": feed.subtitle, "feed_subtitle": feed.subtitle,
"feed_link": feed.link, "feed_link": feed.link,
**entry._asdict(), **entry._asdict(),
}), msgtype=MessageType.NOTICE if sub.send_notice else MessageType.TEXT, allow_html=True) })
msgtype = MessageType.NOTICE if sub.send_notice else MessageType.TEXT
try:
return await self.client.send_markdown(sub.room_id, message, msgtype=msgtype,
allow_html=True)
except Exception as e:
self.log.warning(f"Failed to send {entry.id} of {feed.id} to {sub.room_id}: {e}")
async def _broadcast(self, feed: Feed, entry: Entry, subscriptions: List[Subscription]) -> None: async def _broadcast(self, feed: Feed, entry: Entry, subscriptions: List[Subscription]) -> None:
self.log.debug(f"Broadcasting {entry.id} of {feed.id}")
spam_sleep = self.config["spam_sleep"] spam_sleep = self.config["spam_sleep"]
tasks = [self._send(feed, entry, sub) for sub in subscriptions] tasks = [self._send(feed, entry, sub) for sub in subscriptions]
if spam_sleep >= 0: if spam_sleep >= 0:
...@@ -124,17 +134,33 @@ class RSSBot(Plugin): ...@@ -124,17 +134,33 @@ class RSSBot(Plugin):
subs = self.db.get_feeds() subs = self.db.get_feeds()
if not subs: if not subs:
return return
for res in asyncio.as_completed([self.try_parse_feed(feed=feed) for feed in subs]): now = int(time())
tasks = [self.try_parse_feed(feed=feed) for feed in subs if feed.next_retry < now]
feed: Feed
entries: Iterable[Entry]
for res in asyncio.as_completed(tasks):
feed, entries = await res feed, entries = await res
self.log.trace(f"Fetching {feed.id} (backoff: {feed.error_count} / {feed.next_retry}) "
f"success: {bool(entries)}")
if not entries: if not entries:
error_count = feed.error_count + 1
next_retry_delay = self.config["update_interval"] * 60 * error_count
next_retry_delay = min(next_retry_delay, self.config["max_backoff"] * 60)
next_retry = int(time() + next_retry_delay)
self.log.debug(f"Setting backoff of {feed.id} to {error_count} / {next_retry}")
self.db.set_backoff(feed, error_count, next_retry)
continue continue
elif feed.error_count > 0:
self.log.debug(f"Resetting backoff of {feed.id}")
self.db.set_backoff(feed, error_count=0, next_retry=0)
try: try:
new_entries = {entry.id: entry for entry in entries} new_entries = {entry.id: entry for entry in entries}
except Exception: except Exception:
self.log.exception(f"Error items of {feed.url}") self.log.exception(f"Weird error in items of {feed.url}")
continue continue
for old_entry in self.db.get_entries(feed.id): for old_entry in self.db.get_entries(feed.id):
new_entries.pop(old_entry.id, None) new_entries.pop(old_entry.id, None)
self.log.trace(f"Feed {feed.id} had {len(new_entries)} new entries")
self.db.add_entries(new_entries.values()) self.db.add_entries(new_entries.values())
for entry in new_entries.values(): for entry in new_entries.values():
await self._broadcast(feed, entry, feed.subscriptions) await self._broadcast(feed, entry, feed.subscriptions)
...@@ -153,9 +179,11 @@ class RSSBot(Plugin): ...@@ -153,9 +179,11 @@ class RSSBot(Plugin):
async def try_parse_feed(self, feed: Optional[Feed] = None) -> Tuple[Feed, Iterable[Entry]]: async def try_parse_feed(self, feed: Optional[Feed] = None) -> Tuple[Feed, Iterable[Entry]]:
try: try:
self.log.trace(f"Trying to fetch {feed.id} / {feed.url} "
f"(backoff: {feed.error_count} / {feed.next_retry})")
return await self.parse_feed(feed=feed) return await self.parse_feed(feed=feed)
except Exception: except Exception as e:
self.log.warning(f"Failed to parse feed {feed.id} / {feed.url}") self.log.warning(f"Failed to parse feed {feed.id} / {feed.url}: {e}")
return feed, [] return feed, []
async def parse_feed(self, *, feed: Optional[Feed] = None, url: Optional[str] = None async def parse_feed(self, *, feed: Optional[Feed] = None, url: Optional[str] = None
...@@ -163,7 +191,7 @@ class RSSBot(Plugin): ...@@ -163,7 +191,7 @@ class RSSBot(Plugin):
if feed is None: if feed is None:
if url is None: if url is None:
raise ValueError("Either feed or url must be set") raise ValueError("Either feed or url must be set")
feed = Feed(-1, url, "", "", "", []) feed = Feed(-1, url, "", "", "", 0, 0, [])
elif url is not None: elif url is not None:
raise ValueError("Only one of feed or url must be set") raise ValueError("Only one of feed or url must be set")
resp = await self.http.get(feed.url) resp = await self.http.get(feed.url)
...@@ -184,6 +212,7 @@ class RSSBot(Plugin): ...@@ -184,6 +212,7 @@ class RSSBot(Plugin):
raise ValueError("Feed is not a valid JSON feed (items is not a list)") raise ValueError("Feed is not a valid JSON feed (items is not a list)")
feed = Feed(id=feed.id, title=content["title"], subtitle=content.get("subtitle", ""), feed = Feed(id=feed.id, title=content["title"], subtitle=content.get("subtitle", ""),
url=feed.url, link=content.get("home_page_url", ""), url=feed.url, link=content.get("home_page_url", ""),
next_retry=feed.next_retry, error_count=feed.error_count,
subscriptions=feed.subscriptions) subscriptions=feed.subscriptions)
return feed, (cls._parse_json_entry(feed.id, entry) for entry in content["items"]) return feed, (cls._parse_json_entry(feed.id, entry) for entry in content["items"])
...@@ -220,6 +249,7 @@ class RSSBot(Plugin): ...@@ -220,6 +249,7 @@ class RSSBot(Plugin):
feed_data = parsed_data.get("feed", {}) feed_data = parsed_data.get("feed", {})
feed = Feed(id=feed.id, url=feed.url, title=feed_data.get("title", feed.url), feed = Feed(id=feed.id, url=feed.url, title=feed_data.get("title", feed.url),
subtitle=feed_data.get("description", ""), link=feed_data.get("link", ""), subtitle=feed_data.get("description", ""), link=feed_data.get("link", ""),
error_count=feed.error_count, next_retry=feed.next_retry,
subscriptions=feed.subscriptions) subscriptions=feed.subscriptions)
return feed, (cls._parse_rss_entry(feed.id, entry) for entry in parsed_data.entries) return feed, (cls._parse_rss_entry(feed.id, entry) for entry in parsed_data.entries)
...@@ -270,8 +300,8 @@ class RSSBot(Plugin): ...@@ -270,8 +300,8 @@ class RSSBot(Plugin):
return True return True
levels = await self.get_power_levels(evt.room_id) levels = await self.get_power_levels(evt.room_id)
user_level = levels.get_user_level(evt.sender) user_level = levels.get_user_level(evt.sender)
state_level = levels.events.get("xyz.maubot.rss", levels.state_default) state_level = levels.get_event_level(rss_change_level)
if type(state_level) != int: if not isinstance(state_level, int):
state_level = 50 state_level = 50
if user_level < state_level: if user_level < state_level:
await evt.reply("You don't have the permission to " await evt.reply("You don't have the permission to "
...@@ -299,8 +329,17 @@ class RSSBot(Plugin): ...@@ -299,8 +329,17 @@ class RSSBot(Plugin):
return return
feed = self.db.create_feed(info) feed = self.db.create_feed(info)
self.db.add_entries(entries, override_feed_id=feed.id) self.db.add_entries(entries, override_feed_id=feed.id)
elif feed.error_count > 0:
self.db.set_backoff(feed, error_count=feed.error_count, next_retry=0)
feed_info = f"feed ID {feed.id}: [{feed.title}]({feed.url})"
sub, _ = self.db.get_subscription(feed.id, evt.room_id)
if sub is not None:
subscriber = ("You" if sub.user_id == evt.sender
else f"[{sub.user_id}](https://matrix.to/#/{sub.user_id})")
await evt.reply(f"{subscriber} had already subscribed this room to {feed_info}")
else:
self.db.subscribe(feed.id, evt.room_id, evt.sender) self.db.subscribe(feed.id, evt.room_id, evt.sender)
await evt.reply(f"Subscribed to feed ID {feed.id}: [{feed.title}]({feed.url})") await evt.reply(f"Subscribed to {feed_info}")
@rss.subcommand("unsubscribe", aliases=("u", "unsub"), @rss.subcommand("unsubscribe", aliases=("u", "unsub"),
help="Unsubscribe this room from a feed.") help="Unsubscribe this room from a feed.")
...@@ -350,13 +389,20 @@ class RSSBot(Plugin): ...@@ -350,13 +389,20 @@ class RSSBot(Plugin):
send_type = "m.notice" if setting else "m.text" send_type = "m.notice" if setting else "m.text"
await evt.reply(f"Updates for feed ID {feed.id} will now be sent as `{send_type}`") await evt.reply(f"Updates for feed ID {feed.id} will now be sent as `{send_type}`")
@staticmethod
def _format_subscription(feed: Feed, subscriber: str) -> str:
msg = (f"* {feed.id} - [{feed.title}]({feed.url}) "
f"(subscribed by [{subscriber}](https://matrix.to/#/{subscriber}))")
if feed.error_count > 1:
msg += f" \n ⚠️ The last {feed.error_count} attempts to fetch the feed have failed!"
return msg
@rss.subcommand("subscriptions", aliases=("ls", "list", "subs"), @rss.subcommand("subscriptions", aliases=("ls", "list", "subs"),
help="List the subscriptions in the current room.") help="List the subscriptions in the current room.")
async def command_subscriptions(self, evt: MessageEvent) -> None: async def command_subscriptions(self, evt: MessageEvent) -> None:
subscriptions = self.db.get_feeds_by_room(evt.room_id) subscriptions = self.db.get_feeds_by_room(evt.room_id)
await evt.reply("**Subscriptions in this room:**\n\n" await evt.reply("**Subscriptions in this room:**\n\n"
+ "\n".join(f"* {feed.id} - [{feed.title}]({feed.url}) (subscribed by " + "\n".join(self._format_subscription(feed, subscriber)
f"[{subscriber}](https://matrix.to/#/{subscriber}))"
for feed, subscriber in subscriptions)) for feed, subscriber in subscriptions))
@event.on(EventType.ROOM_TOMBSTONE) @event.on(EventType.ROOM_TOMBSTONE)
......
...@@ -28,8 +28,8 @@ from mautrix.util.config import BaseProxyConfig ...@@ -28,8 +28,8 @@ from mautrix.util.config import BaseProxyConfig
Subscription = NamedTuple("Subscription", feed_id=int, room_id=RoomID, user_id=UserID, Subscription = NamedTuple("Subscription", feed_id=int, room_id=RoomID, user_id=UserID,
notification_template=Template, send_notice=bool) notification_template=Template, send_notice=bool)
Feed = NamedTuple("Feed", id=int, url=str, title=str, subtitle=str, link=str, Feed = NamedTuple("Feed", id=int, url=str, title=str, subtitle=str, link=str, next_retry=int,
subscriptions=List[Subscription]) error_count=int, subscriptions=List[Subscription])
Entry = NamedTuple("Entry", feed_id=int, id=str, date=datetime, title=str, summary=str, content=str, link=str) Entry = NamedTuple("Entry", feed_id=int, id=str, date=datetime, title=str, summary=str, content=str, link=str)
...@@ -49,7 +49,9 @@ class Database: ...@@ -49,7 +49,9 @@ class Database:
Column("url", Text, nullable=False, unique=True), Column("url", Text, nullable=False, unique=True),
Column("title", Text, nullable=False), Column("title", Text, nullable=False),
Column("subtitle", Text, nullable=False), Column("subtitle", Text, nullable=False),
Column("link", Text, nullable=False)) Column("link", Text, nullable=False),
Column("next_retry", Integer, nullable=False),
Column("error_count", Integer, nullable=False))
self.subscription = Table("subscription", metadata, self.subscription = Table("subscription", metadata,
Column("feed_id", Integer, ForeignKey("feed.id"), Column("feed_id", Integer, ForeignKey("feed.id"),
primary_key=True), primary_key=True),
...@@ -109,6 +111,10 @@ class Database: ...@@ -109,6 +111,10 @@ class Database:
if version == 1: if version == 1:
self.db.execute("ALTER TABLE subscription ADD COLUMN send_notice BOOLEAN DEFAULT true") self.db.execute("ALTER TABLE subscription ADD COLUMN send_notice BOOLEAN DEFAULT true")
version = 2 version = 2
if version == 2:
self.db.execute("ALTER TABLE feed ADD COLUMN next_retry BIGINT DEFAULT 0")
self.db.execute("ALTER TABLE feed ADD COLUMN error_count BIGINT DEFAULT 0")
version = 3
self.db.execute(self.version.delete()) self.db.execute(self.version.delete())
self.db.execute(self.version.insert().values(version=version)) self.db.execute(self.version.insert().values(version=version))
...@@ -121,9 +127,10 @@ class Database: ...@@ -121,9 +127,10 @@ class Database:
.where(self.subscription.c.feed_id == self.feed.c.id)) .where(self.subscription.c.feed_id == self.feed.c.id))
map: Dict[int, Feed] = {} map: Dict[int, Feed] = {}
for row in rows: for row in rows:
(feed_id, url, title, subtitle, link, (feed_id, url, title, subtitle, link, next_retry, error_count,
room_id, user_id, notification_template, send_notice) = row room_id, user_id, notification_template, send_notice) = row
map.setdefault(feed_id, Feed(feed_id, url, title, subtitle, link, subscriptions=[])) map.setdefault(feed_id, Feed(feed_id, url, title, subtitle, link, next_retry,
error_count, subscriptions=[]))
map[feed_id].subscriptions.append( map[feed_id].subscriptions.append(
Subscription(feed_id=feed_id, room_id=room_id, user_id=user_id, Subscription(feed_id=feed_id, room_id=room_id, user_id=user_id,
notification_template=Template(notification_template), notification_template=Template(notification_template),
...@@ -131,8 +138,10 @@ class Database: ...@@ -131,8 +138,10 @@ class Database:
return map.values() return map.values()
def get_feeds_by_room(self, room_id: RoomID) -> Iterable[Tuple[Feed, UserID]]: def get_feeds_by_room(self, room_id: RoomID) -> Iterable[Tuple[Feed, UserID]]:
return ((Feed(feed_id, url, title, subtitle, link, subscriptions=[]), user_id) return ((Feed(feed_id, url, title, subtitle, link, next_retry, error_count,
for (feed_id, url, title, subtitle, link, user_id) in subscriptions=[]),
user_id)
for (feed_id, url, title, subtitle, link, next_retry, error_count, user_id) in
self.db.execute(select([self.feed, self.subscription.c.user_id]) self.db.execute(select([self.feed, self.subscription.c.user_id])
.where(and_(self.subscription.c.room_id == room_id, .where(and_(self.subscription.c.room_id == room_id,
self.subscription.c.feed_id == self.feed.c.id)))) self.subscription.c.feed_id == self.feed.c.id))))
...@@ -179,12 +188,12 @@ class Database: ...@@ -179,12 +188,12 @@ class Database:
.where(and_(tbl.c.feed_id == feed_id, tbl.c.room_id == room_id, .where(and_(tbl.c.feed_id == feed_id, tbl.c.room_id == room_id,
self.feed.c.id == feed_id))) self.feed.c.id == feed_id)))
try: try:
(feed_id, url, title, subtitle, link, (feed_id, url, title, subtitle, link, next_retry, error_count,
room_id, user_id, template, send_notice) = next(rows) room_id, user_id, template, send_notice) = next(rows)
notification_template = Template(template) notification_template = Template(template)
return (Subscription(feed_id, room_id, user_id, notification_template, send_notice) return (Subscription(feed_id, room_id, user_id, notification_template, send_notice)
if room_id else None, if room_id else None,
Feed(feed_id, url, title, subtitle, link, [])) Feed(feed_id, url, title, subtitle, link, next_retry, error_count, []))
except (ValueError, StopIteration): except (ValueError, StopIteration):
return None, None return None, None
...@@ -195,9 +204,16 @@ class Database: ...@@ -195,9 +204,16 @@ class Database:
def create_feed(self, info: Feed) -> Feed: def create_feed(self, info: Feed) -> Feed:
res = self.db.execute(self.feed.insert().values(url=info.url, title=info.title, res = self.db.execute(self.feed.insert().values(url=info.url, title=info.title,
subtitle=info.subtitle, link=info.link)) subtitle=info.subtitle, link=info.link,
next_retry=info.next_retry))
return Feed(id=res.inserted_primary_key[0], url=info.url, title=info.title, return Feed(id=res.inserted_primary_key[0], url=info.url, title=info.title,
subtitle=info.subtitle, link=info.link, subscriptions=[]) subtitle=info.subtitle, link=info.link, next_retry=info.next_retry,
error_count=info.error_count, subscriptions=[])
def set_backoff(self, info: Feed, error_count: int, next_retry: int) -> None:
self.db.execute(self.feed.update()
.where(self.feed.c.id == info.id)
.values(error_count=error_count, next_retry=next_retry))
def subscribe(self, feed_id: int, room_id: RoomID, user_id: UserID) -> None: def subscribe(self, feed_id: int, room_id: RoomID, user_id: UserID) -> None:
notification_template = self.config["notification_template"] notification_template = self.config["notification_template"]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment