diff --git a/artemis/bot.py b/artemis/bot.py index 2c62ff4..c4b41e4 100644 --- a/artemis/bot.py +++ b/artemis/bot.py @@ -19,7 +19,6 @@ from discord.ext.commands.cooldowns import BucketType from .cogs import EXTENSIONS from . import utils -from .utils.reddit import Reddit from .utils.api import API from .utils.catbox import Catbox, Litterbox from .utils.deepl import DeepL @@ -76,7 +75,7 @@ class Artemis(commands.Bot): self.user_agent: str = config.user_agent self.real_user_agent: str = config.real_user_agent - self.keys = config.keys + self.secrets = config.secrets self.pink = discord.Colour(0xFFCFF1) self.invisible = discord.Colour(0x2F3136) @@ -104,11 +103,10 @@ class Artemis(commands.Bot): await self.load_extensions() - self.api = API(self, config.internal_api_url, self.keys.api) - self.catbox = Catbox(self.keys.catbox, session=self.session) + self.api = API(self, config.internal_api_url, self.secrets.api) + self.catbox = Catbox(self.secrets.catbox, session=self.session) self.litterbox = Litterbox(session=self.session) - self.reddit = Reddit(self.session) - self.deepl = DeepL(self, self.keys.deepl) + self.deepl = DeepL(self, self.secrets.deepl) await self.maybe_send_restarted() diff --git a/artemis/cogs/anime.py b/artemis/cogs/anime.py index 92cbbf9..f52c94a 100644 --- a/artemis/cogs/anime.py +++ b/artemis/cogs/anime.py @@ -3,7 +3,7 @@ from __future__ import annotations import time from enum import Enum from io import BytesIO -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, cast from urllib.parse import quote import discord @@ -129,7 +129,7 @@ class Anime(commands.Cog): """Search for anime.""" await ctx.typing() - results, _ = await self.anilist.search_anime(query, 10) + results, _ = await self.anilist.search_anime(query, 10) # type: ignore wrong return type if not results: return await ctx.reply("No results found.") @@ -161,7 +161,7 @@ class Anime(commands.Cog): """Search for manga.""" await ctx.typing() - results, _ = await self.anilist.search_manga(query, 10) + results, _ = await self.anilist.search_manga(query, 10) # type: ignore wrong return type if not results: return await ctx.reply("No results found.") @@ -192,7 +192,7 @@ class Anime(commands.Cog): async def character(self, ctx: commands.Context, *, query: str): """Search for anime and manga characters.""" await ctx.typing() - results, _ = await self.anilist.search_character(query, 10) + results, _ = await self.anilist.search_character(query, 10) # type: ignore wrong return type if not results: return await ctx.reply("No results found.") @@ -234,7 +234,7 @@ class Anime(commands.Cog): await ctx.typing() - if "discord" in url: + if "discord" in cast(str, url): async with self.bot.session.get(url) as r: if r.status != 200: return await ctx.reply(f"Discord CDN Error: {r.status} {r.reason}") diff --git a/artemis/cogs/funhouse.py b/artemis/cogs/funhouse.py index 9df08f7..17cabf4 100644 --- a/artemis/cogs/funhouse.py +++ b/artemis/cogs/funhouse.py @@ -49,11 +49,6 @@ class Funhouse(commands.Cog): description=description, timestamp=pendulum.now("UTC"), colour=discord.Colour.random() ) - async def invoke_reddit(self, ctx: commands.Context, subreddit: str): - reddit = self.bot.get_command("reddit") - assert reddit - return await reddit(ctx, subreddit) - @commands.command() async def hug(self, ctx: commands.Context, member: discord.Member): """Hug someone.""" @@ -110,7 +105,7 @@ class Funhouse(commands.Cog): """ if not user: user = ctx.message.author - if user.display_avatar.is_animated(): + elif user.display_avatar.is_animated(): url = gif = user.display_avatar.replace(size=4096, format="gif").url static = user.display_avatar.replace(size=4096, format="png").url description = f"[gif]({gif}) | [static]({static})" @@ -145,7 +140,6 @@ class Funhouse(commands.Cog): banner_colour = user.accent_colour if banner_colour: colour_cmd = self.bot.get_command("color") - assert colour_cmd return await colour_cmd(ctx, colour=banner_colour) else: raise ArtemisError(f"{user.display_name} does not have a custom banner set.") @@ -165,28 +159,6 @@ class Funhouse(commands.Cog): embed.set_author(name=user.display_name, icon_url=user.display_avatar.url) await ctx.reply(embed=embed) - @commands.group(name="reddit", invoke_without_command=True) - async def reddit_(self, ctx: commands.Context, subreddit: str = "all"): - """Shows a random post from reddit or a given subreddit.""" - - async with ctx.typing(): - post = await self.bot.reddit.random(subreddit) - embeds = await post.to_embed(ctx.message) - - await ctx.reply(embeds=embeds) - - @reddit_.command() - async def show(self, ctx: commands.Context, pid: str): - """Displays a rich reddit post embed for a given post ID.""" - await ctx.typing() - - post = await self.bot.reddit.post(pid=pid) - if not post: - raise ArtemisError("Invalid post ID.") - - embeds = await post.to_embed(ctx.message) - await ctx.reply(embeds=embeds) - @commands.command(aliases=["4chan", "da"]) @commands.cooldown(1, 2, commands.BucketType.default) async def desuarchive(self, ctx: commands.Context, board: str, *, query: str): @@ -239,6 +211,10 @@ class Funhouse(commands.Cog): board_url = f"https://desuarchive.org/{board}/" description = post.select_one(".text") + + if not description: + continue + for br in description.select("br"): br.replace_with("\n") description = trim(re.sub(r"(>)(\w.*)", r"\g<1> \g<2>", description.text), 4096) diff --git a/artemis/cogs/media.py b/artemis/cogs/media.py index e10fcac..0e82063 100644 --- a/artemis/cogs/media.py +++ b/artemis/cogs/media.py @@ -7,11 +7,10 @@ import struct import zipfile from io import BytesIO from pathlib import Path -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, cast from urllib.parse import quote_plus import discord -import humanize import pendulum import yt_dlp from bs4 import BeautifulSoup @@ -180,8 +179,8 @@ class Media(commands.Cog): async def process(data: list[dict], lang: str = None) -> discord.File: if lang: - found = discord.utils.find(lambda x: x["lang"] == lang) - if not data: + found = discord.utils.find(lambda x: x["lang"] == lang, data) + if not found: raise ArtemisError("No subtitles available for that language.") return await process_one(found) elif len(data) == 1: @@ -259,7 +258,6 @@ class Media(commands.Cog): async with ctx.typing(): info_dict = await run_ytdlp(url, ytdl_opts, download=False) - assert info_dict title = info_dict.get("title") url = info_dict["url"] @@ -297,7 +295,6 @@ class Media(commands.Cog): async with ctx.typing(): info_dict = await run_ytdlp(url, ytdl_opts, download=False) - assert info_dict title = info_dict["title"] url = info_dict["url"] @@ -364,7 +361,7 @@ class Media(commands.Cog): def match_filter(info_dict, incomplete): nonlocal url - if "#_sudo" in url and ctx.author.id == self.bot.owner_id: + if "#_sudo" in cast(str, url) and ctx.author.id == self.bot.owner_id: return None duration = info_dict.get("duration") filesize = info_dict.get("filesize") or info_dict.get("filesize_approx") @@ -404,6 +401,8 @@ class Media(commands.Cog): ytdl_opts["external_downloader"] = {"default": "ffmpeg"} ytdl_opts["external_downloader_args"] = args + assert ss and to + diff = to - ss if diff > 3600: raise ArtemisError("The trim selection is too long (> 1 hour).") @@ -457,44 +456,6 @@ class Media(commands.Cog): if path and path.exists(): path.unlink() - @commands.command() - @commands.cooldown(1, 1, commands.BucketType.default) - async def dislikes(self, ctx: commands.Context, url: str): - """Shows some statistics for a YouTube video including dislikes using Return YouTube Dislikes API.""" - YT_RE = r"(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/shorts/)([\w-]+)" - - if len(url) == 11: - vid = url - else: - m = re.search(YT_RE, url) - if not m: - raise ArtemisError("Invalid YouTube URL or ID.") - vid = m.group(1) - - params = {"videoId": vid} - - async with ctx.typing(): - async with self.bot.session.get( - "https://returnyoutubedislikeapi.com/votes", params=params - ) as r: - if not r.ok: - if r.status == 404: - raise ArtemisError("Video not found.") - elif r.status == 400: - raise ArtemisError("Invalid video ID.") - else: - raise ArtemisError( - f"Return YouTube Dislikes API returned {r.status} {r.reason}" - ) - data = await r.json() - - views = humanize.intcomma(data["viewCount"]) - likes = humanize.intcomma(data["likes"]) - dislikes = humanize.intcomma(data["dislikes"]) - - msg = f"**{views}** views\n**{likes}** likes\n**{dislikes}** dislikes" - await ctx.reply(msg) - @commands.command(aliases=["lg"]) @commands.cooldown(1, 2, commands.BucketType.default) async def libgen(self, ctx: commands.Context, *, query: str): diff --git a/artemis/cogs/meta.py b/artemis/cogs/meta.py index 2e8634b..1c2b32c 100644 --- a/artemis/cogs/meta.py +++ b/artemis/cogs/meta.py @@ -16,7 +16,6 @@ from discord.utils import format_dt, snowflake_time from humanize import naturalsize from ..utils.common import ArtemisError, check_for_ssrf, is_valid_url -from ..utils.views import BaseView if TYPE_CHECKING: from ..bot import Artemis @@ -174,17 +173,6 @@ class Meta(commands.Cog): msg = f"`{self.to_discord_timestamp(datetime)}`" await ctx.reply(msg) - @commands.command(aliases=["ffmpeg"]) - async def getffmpeg(self, ctx: commands.Context): - """ffmpeg-dl script information.""" - view = BaseView(ctx) - view.add_item( - discord.ui.Button( - label="Download", url="https://github.com/artiemis/get-ffmpeg/releases/latest" - ) - ) - await ctx.reply("https://github.com/artiemis/get-ffmpeg", view=view) - async def setup(bot: Artemis): await bot.add_cog(Meta(bot)) diff --git a/artemis/cogs/mod.py b/artemis/cogs/mod.py index df346b3..3aec53e 100644 --- a/artemis/cogs/mod.py +++ b/artemis/cogs/mod.py @@ -1,10 +1,11 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, cast import discord import pendulum from discord.ext import commands +from pendulum import DateTime from ..utils.common import ArtemisError, parse_short_time @@ -67,7 +68,7 @@ class Mod(commands.Cog): `{prefix}mute Artemis 2d12h for being nosy` """ max_timeout = pendulum.now("UTC").add(days=28) - if time > max_timeout: + if cast(DateTime, time) > max_timeout: raise ArtemisError("Mute time cannot exceed 28 days.") if not reason: @@ -137,7 +138,11 @@ class Mod(commands.Cog): await webhook.send( content=message.content, username=message.author.display_name, - avatar_url=message.author.avatar.url, + avatar_url=( + message.author.avatar.url + if message.author.avatar + else message.author.default_avatar.url + ), files=files, embeds=embeds, ) diff --git a/artemis/cogs/music.py b/artemis/cogs/music.py index b95fe4b..69ecd30 100644 --- a/artemis/cogs/music.py +++ b/artemis/cogs/music.py @@ -108,7 +108,10 @@ class Music(commands.Cog): ) as r: html = await r.text() - data = re.search(r"var\s?ytInitialData\s?=\s?(\{.*?\});", html).group(1) + data_res = re.search(r"var\s?ytInitialData\s?=\s?(\{.*?\});", html) + if not data_res: + return [] + data = data_res.group(1) data = json.loads(data) videos = data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"][ "sectionListRenderer" @@ -156,7 +159,6 @@ class Music(commands.Cog): ytdl_opts = {**DEFAULT_OPTS, "default_search": "auto", "format": "251/ba*"} info_dict = await run_ytdlp(url_or_query, ytdl_opts, download=False) - assert info_dict if info_dict.get("entries"): info_dict = info_dict["entries"][0] diff --git a/artemis/cogs/ocr.py b/artemis/cogs/ocr.py index 0c94a89..94bb402 100644 --- a/artemis/cogs/ocr.py +++ b/artemis/cogs/ocr.py @@ -78,7 +78,6 @@ class OCR(commands.Cog): else: flags = Flags(text=text, source=None, dest=None) cmd = self.bot.get_command(translate) - assert cmd await cmd(ctx, flags=flags) else: if len(text) > 2000 - 8: @@ -129,11 +128,9 @@ class OCR(commands.Cog): """ await self.ocr_impl(ctx, flags, translate="gt") - @commands.command( - aliases=["ocrdl"], usage="[source:auto] [lang:eng] [l:eng] [s:auto] [dest:en] [d:en] " - ) + @commands.command(usage="[source:auto] [lang:eng] [l:eng] [s:auto] [dest:en] [d:en] ") @commands.cooldown(1, 2, commands.BucketType.default) - async def ocrdeepl(self, ctx: commands.Context, *, flags: Optional[OCRTranslateFlags]): + async def ocrdl(self, ctx: commands.Context, *, flags: Optional[OCRTranslateFlags]): """ OCR using tesseract and translation using DeepL. Takes $deepl and $ocr flags combined. @@ -149,7 +146,6 @@ class OCR(commands.Cog): """ result = await self.yandex_impl(ctx, url) - assert result.detected_lang lang = get_language_name(result.detected_lang) or result.detected_lang msg = f"Detected language: {lang}\n" + self.bot.codeblock(result.text, "") @@ -170,20 +166,18 @@ class OCR(commands.Cog): result = await self.yandex_impl(ctx, url) flags = Flags(text=result.text, source=None, dest=None) cmd = self.bot.get_command("gt") - assert cmd await cmd(ctx, flags=flags) - @commands.command(aliases=["lensdl", "lenstr"]) + @commands.command(aliases=["lenstr"]) @commands.max_concurrency(1) @commands.cooldown(1, 10, commands.BucketType.default) - async def lensdeepl(self, ctx: commands.Context, *, url: Optional[str]): + async def lensdl(self, ctx: commands.Context, *, url: Optional[str]): """ OCR using Yandex and translation using DeepL. """ result = await self.yandex_impl(ctx, url) flags = Flags(text=result.text, source=None, dest=None) cmd = self.bot.get_command("deepl") - assert cmd await cmd(ctx, flags=flags) diff --git a/artemis/cogs/owner.py b/artemis/cogs/owner.py index 43893be..d7d8314 100644 --- a/artemis/cogs/owner.py +++ b/artemis/cogs/owner.py @@ -178,7 +178,6 @@ except Exception: code = silencer + code jsk_py = self.bot.get_command("jsk py") - assert jsk_py await jsk_py(ctx, argument=codeblocks.codeblock_converter(code)) @dev.command() diff --git a/artemis/cogs/useful.py b/artemis/cogs/useful.py index fae8a11..1257ad0 100644 --- a/artemis/cogs/useful.py +++ b/artemis/cogs/useful.py @@ -4,7 +4,6 @@ import json import re import unicodedata from io import BytesIO, StringIO -from math import ceil, log2 from typing import TYPE_CHECKING, Optional from urllib.parse import quote, urlencode @@ -356,7 +355,8 @@ class Useful(commands.Cog): if "Sorry" in data: return await ctx.reply(data.split("\n\n")[0]) - loc = LOC_RE.search(data).group(1) + loc_res = LOC_RE.search(data) + loc = loc_res.group(1) if loc_res else "" text = "\n".join(data.split("\n")[1:7]) wrapped = self.bot.codeblock(text, "py") @@ -396,153 +396,6 @@ class Useful(commands.Cog): await ctx.reply(file=discord.File(StringIO(text), "decoded_QR_code.txt")) await ctx.reply(text) - @commands.command(name="map", aliases=["maps"]) - @commands.cooldown(1, 2, commands.BucketType.default) - async def _map(self, ctx: commands.Context, *, query: str): - """ - Return a static map for a given location. - - Examples: - `{prefix}map statue of liberty` - `{prefix}map cieszyn, stawowa` - """ - GEOCODER_API = "https://nominatim.openstreetmap.org/search" - HEADERS = {"User-Agent": self.bot.real_user_agent, "Accept-Language": "en-US"} - STATIC_MAP_URL = ( - "https://tyler-demo.herokuapp.com/?lat={lat}&lon={lon}&width=800&height=600&zoom={zoom}" - ) - - results = await self.bot.cache.get(f"geocoder:{query}") - if not results: - await ctx.typing() - params = {"q": query, "format": "jsonv2"} - async with self.bot.session.get(GEOCODER_API, params=params, headers=HEADERS) as r: - results = await r.json() - await self.bot.cache.set(f"geocoder:{query}", results, ttl=60) - - if not results: - raise ArtemisError("No results found.") - elif len(results) == 1: - result = results[0] - else: - view = DropdownView( - ctx, - results, - lambda x: x.get("display_name") or "Unknown display name.", - lambda x: f"{x['osm_type']} {x['osm_id']}", - "Choose place...", - ) - result = await view.prompt() - if not result: - return - await ctx.typing() - - lat = result["lat"] - lon = result["lon"] - address = result["display_name"] - osm_id = result["osm_id"] - osm_type = result["osm_type"] - bbox: list[float] = result["boundingbox"] - - lon_diff = abs(float(bbox[2]) - float(bbox[3])) - lat_diff = abs(float(bbox[0]) - float(bbox[1])) - zoom_lon = ceil(log2(360 * 2 / lon_diff)) - zoom_lat = ceil(log2(180 * 2 / lat_diff)) - zoom = max(zoom_lon, zoom_lat) - 1 - zoom = max(0, min(zoom, 19)) - - url = f"https://www.openstreetmap.org/{osm_type}/{osm_id}" - - async with self.bot.session.get(STATIC_MAP_URL.format(lat=lat, lon=lon, zoom=zoom)) as r: - data = await r.read() - - data = BytesIO(data) - file = discord.File(data, f"{osm_id}.png") - - embed = discord.Embed(title=utils.trim(address, 256), url=url, color=0xFEFEFE) - embed.set_image(url=f"attachment://{osm_id}.png") - embed.set_footer( - text="Data © OpenStreetMap contributors, ODbL 1.0. https://osm.org/copyright" - ) - - await ctx.reply(embed=embed, file=file) - - @commands.command() - @commands.cooldown(1, 2, commands.BucketType.default) - async def reverse(self, ctx: commands.Context, *, url: Optional[utils.URL]): - """ - Yandex Reverse Image Search. - """ - headers = {"User-Agent": self.bot.user_agent} - bad_link_msg = "Couldn't upload image. Try uploading a different one." - - await ctx.typing() - - if not ctx.message.attachments and not url: - return await ctx.reply("Please send me a valid image first!") - elif ctx.message.attachments: - url = ctx.message.attachments[0].url - - async with self.bot.session.get( - f"https://yandex.com/images/search?url={url}&rpt=imageview", headers=headers - ) as r: - if not r.ok: - return await ctx.reply(f"Yandex API Error: {r.status} {r.reason}") - html = await r.text() - - if bad_link_msg in html: - return await ctx.reply(bad_link_msg) - - soup = BeautifulSoup(html, "lxml") - - preview_img = soup.select_one(".CbirPreview-Image") - assert preview_img - preview_img_url = preview_img["src"] - - embed = discord.Embed(title="Uploaded image", color=0xFDDE55, url=r.url) - embed.set_thumbnail(url=preview_img_url) - embed.set_author( - name="Yandex", - icon_url="https://yastatic.net/s3/web4static/_/v2/oxjfXL1EO-B5Arm80ZrL00p0al4.png", - ) - - tags = soup.select(".CbirTags a") - if tags: - tags_fmt = [] - for tag in tags: - href = "https://yandex.com" + tag["href"] - tags_fmt.append(f"[{tag.span.text}]({href})") - embed.add_field( - name="Image appears to contain", value=", ".join(tags_fmt), inline=False - ) - - sizes = soup.select(".CbirOtherSizes a") - if sizes: - sizes_fmt = [] - for size in sizes[:4]: - sizes_fmt.append(f"[{size.span.text}]({size['href']})") - embed.add_field(name="Other image sizes", value=", ".join(sizes_fmt), inline=False) - - results = soup.select(".CbirSites-ItemInfo") - - for result in results[:3]: - a = result.select_one(".CbirSites-ItemTitle a") - if not a: - continue - - title = a.text - url = a["href"] - url = f"[{utils.trim(url.split('//', 1)[-1], 50)}]({url})" - description = result.select_one(".CbirSites-ItemDescription").text - description = description if "http" not in description else None - - value = f"{url}\n{description}" if description else url - embed.add_field( - name=utils.trim(title, 256), value=utils.trim(value, 1024), inline=False - ) - - await ctx.reply(embed=embed) - @cached(ttl=6 * 60 * 60) async def get_lyngsat_cse_url(self): headers = {"User-Agent": self.bot.user_agent} @@ -666,7 +519,6 @@ class Useful(commands.Cog): satellite_data = result.select("td") satellite_pos = satellite_data[0].text.strip() - assert satellite_data[1].a satellite_url = satellite_data[1].a["href"] sat_pos = re.search(r"(\d{1,3}(?:\.\d)?).*?((?:E|W))", satellite_pos) diff --git a/artemis/utils/anilist.py b/artemis/utils/anilist.py index 29af246..9a94717 100644 --- a/artemis/utils/anilist.py +++ b/artemis/utils/anilist.py @@ -87,7 +87,7 @@ def build_anilist_embed(result: Anime | Manga) -> discord.Embed: if start_date and get(start_date, "year"): embed.add_field(name="Release Year", value=start_date.year, inline=True) - if result is Anime: + if isinstance(result, Anime): nextairing = get(result, "next_airing", None) episodes = get(result, "episodes") or get(nextairing, "episode") duration = get(result, "duration") @@ -102,7 +102,7 @@ def build_anilist_embed(result: Anime | Manga) -> discord.Embed: str(duration) + " mins per ep." if media_format == "TV" else str(duration) + " mins" ) embed.add_field(name="Duration", value=duration) - elif result is Manga: + elif isinstance(result, Manga): volumes = get(result, "volumes") chapters = get(result, "chapters") diff --git a/artemis/utils/common.py b/artemis/utils/common.py index fccc91a..28523d2 100644 --- a/artemis/utils/common.py +++ b/artemis/utils/common.py @@ -547,9 +547,11 @@ async def get_file_from_attachment_or_url( return await r.read() except BaseException as err: raise ArtemisError("An error occured when trying to connect to the given URL.") from err + else: + raise ArtemisError("Unreachable code.") -T = TypeVar("T") +T = TypeVar("T", str, dict) def fuzzy_search( diff --git a/artemis/utils/config.py b/artemis/utils/config.py index b75cce5..bae7283 100644 --- a/artemis/utils/config.py +++ b/artemis/utils/config.py @@ -5,7 +5,7 @@ from .common import read_toml @dataclass -class Keys: +class Secrets: api: str catbox: str github: str @@ -24,10 +24,10 @@ class Config: cdn_url: str main_guild_id: int dev_guild_id: int - keys: Keys + secrets: Secrets def __post_init__(self): - self.keys = Keys(**self.keys) + self.secrets = Secrets(**self.secrets) def load_config() -> Config: diff --git a/artemis/utils/reddit.py b/artemis/utils/reddit.py deleted file mode 100644 index d1406e8..0000000 --- a/artemis/utils/reddit.py +++ /dev/null @@ -1,270 +0,0 @@ -from __future__ import annotations -from functools import cached_property - -import html -import random -import re -from typing import Any, Literal, Optional - -import discord -import pendulum -from aiohttp import ClientSession -from humanize import intcomma -from yt_dlp.utils import random_user_agent - -from . import utils -from .common import ArtemisError - - -class Route: - method: str - path: str - url: str - - BASE = "https://old.reddit.com" - - def __init__(self, method, path): - self.method = method - self.path = path - self.url = self.BASE + self.path - - -class Reddit: - def __init__(self, session: ClientSession): - self.session: ClientSession = session - - @staticmethod - def _gen_session_id() -> str: - id_length = 16 - rand_max = 1 << (id_length * 4) - return "%0.*x" % (id_length, random.randrange(rand_max)) - - async def _request(self, route: Route, **kwargs) -> dict[str, Any]: - headers = {"User-Agent": random_user_agent()} - cookies = { - "reddit_session": self._gen_session_id(), - "_options": "%7B%22pref_quarantine_optin%22%3A%20true%7D", - } - async with self.session.request( - route.method, route.url, headers=headers, cookies=cookies, **kwargs - ) as r: - data = await r.json() - return data - - async def subreddit( - self, - name: str = "all", - sort: Literal["hot", "new"] = "hot", - include_stickied_and_pinned: bool = False, - ) -> list[Post]: - route = Route("GET", f"/r/{name}/{sort}.json") - data = await self._request(route) - - if data.get("reason"): - raise ArtemisError(f"This subreddit is inaccessible.\nReason: `{data['reason']}`") - if not data.get("data") or not data["data"]["children"]: - raise ArtemisError(f"Subreddit `{name}` not found.") - - posts = [Post(post["data"]) for post in data["data"]["children"]] - - if not include_stickied_and_pinned: - posts = [post for post in posts if not post.stickied and not post.pinned] - - return posts - - async def post(self, pid: str): - route = Route("GET", f"/{pid}.json?limit=1") - try: - data = await self._request(route) - post_data = data[0]["data"]["children"][0]["data"] - return Post(post_data) - except Exception: - return None - - async def random(self, subreddit: str = "all", *args, **kwargs) -> Post: - posts = await self.subreddit(subreddit, *args, **kwargs) - return random.choice(posts) - - async def random_image(self, subreddit: str = "all", *args, **kwargs) -> str: - posts = await self.subreddit(subreddit, *args, **kwargs) - images = [post for post in posts if post.image or post.gallery] - post = random.choice(images) - - if post.gallery: - return post.gallery[0] - return post.image - - -class Post: - ICON = "https://www.redditstatic.com/desktop2x/img/favicon/android-icon-192x192.png" - GOLD_ICON = "https://www.redditstatic.com/gold/awards/icon/gold_64.png" - - def __init__(self, data: dict): - self.data = data - - self.title = html.unescape(self.data["title"]) - self.body = self.data.get("selftext") - self.thumbnail = self.data.get("thumbnail", "") - - self.over_18 = self.data.get("over_18") - self.stickied = self.data.get("stickied") - self.pinned = self.data.get("pinned") - self.spoiler = self.data.get("spoiler") - self.score = self.data.get("score", "N/A") - self.num_comments = self.data.get("num_comments", "N/A") - self.gilded = self.data.get("gilded") - self.awards = self.data.get("all_awardings") - - self.permalink = "https://reddit.com" + self.data.get("permalink", "") - self.subreddit = self.data.get("subreddit") - self.subreddit_prefixed = f"r/{self.subreddit}" - self.created_at = pendulum.from_timestamp(self.data["created_utc"], "UTC") - - @cached_property - def image(self) -> Optional[str]: - image = self.data.get("url_overridden_by_dest", "") - - if not self.body: - if self.data.get("secure_media") or self.data.get("media_embed"): - return None - if re.search(r"(i\.redd\.it\/[^\/]+\.gifv)|gifv|webm", image): - return None - elif re.search(r"jpg|png|webp|gif|jfif|jpeg|imgur", image): - return image - return None - - @cached_property - def video(self) -> Optional[str]: - media = self.data.get("media") or self.data.get("secure_media") - if not media: - return None - reddit_video = media.get("reddit_video") - if not reddit_video: - return None - playlist = reddit_video.get("dash_url") or reddit_video.get("hls_url") - if not playlist: - return None - return playlist - - @cached_property - def preview(self) -> Optional[str]: - try: - preview = self.data["preview"]["images"][0]["source"]["url"] - return html.unescape(preview) - except Exception: - return None - - @cached_property - def gallery(self) -> list[str]: - if not self.data.get("gallery_data") or not self.data.get("media_metadata"): - return [] - - images: list[str] = [] - metadata = self.data["media_metadata"] - - for image in self.data["gallery_data"]["items"]: - media_id = image["media_id"] - try: - url = html.unescape(metadata[media_id]["s"]["u"]) - except Exception: - url = html.unescape(metadata[media_id]["s"]["gif"]) - images.append(url) - - return images - - def get_warnings(self, nsfw: bool) -> str | None: - warnings = [] - if self.spoiler: - warnings.append("SPOILER") - if nsfw: - warnings.append("NSFW") - if self.data.get("secure_media") or self.data.get("media_embed"): - warnings.append("UNSUPPORTED MEDIA") - - if warnings: - return f"`❗ {', '.join(warnings)}`" # type: ignore - return None - - def is_nsfw(self, message: discord.Message): - return self.over_18 and message.guild and not message.channel.is_nsfw() - - async def to_embed(self, message: discord.Message) -> list[discord.Embed]: - COLOUR = discord.Colour(0xFF4500) - SPOILER_IMG_URL = "https://derpicdn.net/img/2016/5/22/1160541/medium.png" - NSFW_IMG_URL = "https://upload.wikimedia.org/wikipedia/commons/thumb/7/7a/Znaczek_TV_-_dozwolone_od_lat_18.svg/150px-Znaczek_TV_-_dozwolone_od_lat_18.svg.png" - - files = [] - icon_url = None - embed = discord.Embed(title=utils.trim(self.title, 256), url=self.permalink, colour=COLOUR) - embeds = [embed] - - nsfw = self.is_nsfw(message) - - if self.image: - embed.set_image(url=self.image) - - if self.body: - body = html.unescape(utils.trim(self.body.strip(), 4096)) - images = re.findall(r"https.*(?:png|jpg|jpeg|webp|gif)\S*", body) - for idx, url in enumerate(images[:10]): - if idx == 0: - embed.set_image(url=url) - else: - embeds.append(discord.Embed(color=COLOUR).set_image(url=url)) - - if self.spoiler or nsfw: - body = f"||{body}||" - - embed.description = body - - if self.gallery: - for idx, url in enumerate(self.gallery[:10]): - if idx == 0: - embed.set_image(url=url) - else: - embeds.append(discord.Embed(colour=COLOUR).set_image(url=url)) - - if not embed.image: - if self.preview: - embed.set_image(url=self.preview) - elif self.thumbnail and "http" in self.thumbnail: - embed.set_thumbnail(url=self.thumbnail) - - if nsfw: - if embed.thumbnail: - embed.set_thumbnail(url=NSFW_IMG_URL) - elif embed.image: - for idx, embed in enumerate(embeds): - embed.set_image(url=NSFW_IMG_URL) - - if self.spoiler: - if embed.image and not files: - for idx, embed in enumerate(embeds): - embed.set_image(url=SPOILER_IMG_URL) - - warnings = self.get_warnings(nsfw) - if warnings: - if embed.description: - embed.description = f"{warnings}\n\n{embed.description}" - else: - embed.description = warnings - - if self.gilded: - icon_url = self.GOLD_ICON - elif self.awards: - sorted_awards = sorted(self.awards, key=lambda x: int(x.get("count")), reverse=True) - icon_url = sorted_awards[0]["icon_url"] - - upvotes = f"{intcomma(self.score)} upvote{'s' if self.score != 1 else ''}" - comments = f"{intcomma(self.num_comments)} comment{'s' if self.num_comments != 1 else ''}" - - embed.set_author( - name=self.subreddit_prefixed, - icon_url=self.ICON, - url=f"https://reddit.com/r/{self.subreddit}", - ) - - embeds[-1].set_footer(text=f"{upvotes} and {comments}", icon_url=icon_url) - embeds[-1].timestamp = self.created_at - - return embeds diff --git a/artemis/utils/views.py b/artemis/utils/views.py index 61728e3..86e1aa0 100644 --- a/artemis/utils/views.py +++ b/artemis/utils/views.py @@ -127,6 +127,8 @@ class ViewPages(BaseView): class BaseDropdown(discord.ui.Select): + view: BaseView + def __init__(self, items: list, label_key, description_key, placeholder: str, max_values: int): self.items = items diff --git a/config.example.toml b/config.example.toml index d2c1a6f..cccc272 100644 --- a/config.example.toml +++ b/config.example.toml @@ -7,7 +7,7 @@ cdn_url = "cdn_url" main_guild_id = 1 dev_guild_id = 1 -[keys] +[secrets] api = "api" catbox = "catbox" github = "github"