diff --git a/artemis/bot.py b/artemis/bot.py index 63c24cf..2c62ff4 100644 --- a/artemis/bot.py +++ b/artemis/bot.py @@ -104,7 +104,7 @@ class Artemis(commands.Bot): await self.load_extensions() - self.api = API(self, self.keys.api) + self.api = API(self, config.internal_api_url, self.keys.api) self.catbox = Catbox(self.keys.catbox, session=self.session) self.litterbox = Litterbox(session=self.session) self.reddit = Reddit(self.session) diff --git a/artemis/cogs/anime.py b/artemis/cogs/anime.py index 9cf043a..92cbbf9 100644 --- a/artemis/cogs/anime.py +++ b/artemis/cogs/anime.py @@ -311,7 +311,7 @@ class Anime(commands.Cog): Search for art on Danbooru or show a random image. This uses the common tag search logic found on booru imageboards, fuzzy matching for tags is enabled. """ - params = None + params = {} await ctx.typing() diff --git a/artemis/cogs/funhouse.py b/artemis/cogs/funhouse.py index 6d9a51e..9df08f7 100644 --- a/artemis/cogs/funhouse.py +++ b/artemis/cogs/funhouse.py @@ -51,6 +51,7 @@ class Funhouse(commands.Cog): async def invoke_reddit(self, ctx: commands.Context, subreddit: str): reddit = self.bot.get_command("reddit") + assert reddit return await reddit(ctx, subreddit) @commands.command() @@ -144,6 +145,7 @@ class Funhouse(commands.Cog): banner_colour = user.accent_colour if banner_colour: colour_cmd = self.bot.get_command("color") + assert colour_cmd return await colour_cmd(ctx, colour=banner_colour) else: raise ArtemisError(f"{user.display_name} does not have a custom banner set.") @@ -223,9 +225,14 @@ class Funhouse(commands.Cog): if not title: title = f"{post.select_one('.post_author').text} {post.select_one('time').text} UTC" - post_url = post.find( + post_a = post.find( "a", attrs={"href": re.compile(r"https://desuarchive.org/.*?/thread/")} - )["href"] + ) + + if not post_a: + continue + + post_url = post_a["href"] board = post_url.split("/")[-4] if board in banned_boards: continue @@ -397,7 +404,7 @@ class Funhouse(commands.Cog): embed.set_author( name="#" + result["number"], icon_url="https://www.pokemon.com/favicon.ico" ) - embed.set_image(url=f"{config.cdn_base_url}/pokedex/{result['id']:>03}.png") + embed.set_image(url=f"{config.cdn_url}/pokedex/{result['id']:>03}.png") types = ", ".join([t.title() for t in result["type"]]) abilities = ", ".join(result["abilities"]) diff --git a/artemis/cogs/language.py b/artemis/cogs/language.py index b853707..06a8349 100644 --- a/artemis/cogs/language.py +++ b/artemis/cogs/language.py @@ -262,7 +262,6 @@ class Language(commands.Cog): file = discord.File(buff, f"{src}-{dest}.txt") return await ctx.reply( - "The translation could not fit on the screen, so here's a file:", file=file, ) @@ -339,7 +338,6 @@ class Language(commands.Cog): file = discord.File(buff, f"{display_src}-{display_dest}.txt") return await ctx.reply( - "The translation could not fit on the screen, so here's a file:", file=file, ) @@ -568,6 +566,9 @@ class Language(commands.Cog): embeds = [] for entry in entries: + if not entry: + continue + embed = discord.Embed( title=entry["word"], description=entry["definition"], diff --git a/artemis/cogs/media.py b/artemis/cogs/media.py index 1e30b7f..e10fcac 100644 --- a/artemis/cogs/media.py +++ b/artemis/cogs/media.py @@ -71,13 +71,13 @@ YOUTUBE_BANNED_MESSAGE = """ """ -async def run_ytdlp(query: str, opts: dict, download: bool = True) -> dict: +def run_ytdlp(query: str, opts: dict, download: bool = True): if YoutubeIE.suitable(query): raise ArtemisError(YOUTUBE_BANNED_MESSAGE) try: with yt_dlp.YoutubeDL(opts) as ytdl: - return await asyncio.to_thread(ytdl.extract_info, query, download=download) + return asyncio.to_thread(ytdl.extract_info, query, download=download) except yt_dlp.utils.YoutubeDLError as error: raise ArtemisError(format_ytdlp_error(error)) @@ -259,6 +259,7 @@ class Media(commands.Cog): async with ctx.typing(): info_dict = await run_ytdlp(url, ytdl_opts, download=False) + assert info_dict title = info_dict.get("title") url = info_dict["url"] @@ -296,6 +297,7 @@ class Media(commands.Cog): async with ctx.typing(): info_dict = await run_ytdlp(url, ytdl_opts, download=False) + assert info_dict title = info_dict["title"] url = info_dict["url"] @@ -347,9 +349,6 @@ class Media(commands.Cog): `{prefix}dl t:120-160 https://www.reddit.com/r/anime/comments/f86otf/` """ path: Path = None - msg: discord.Message = None - finished = False - state = "downloading" template = TEMP_DIR.joinpath("%(id)s.%(ext)s").as_posix() url = flags.url @@ -357,32 +356,6 @@ class Media(commands.Cog): trim = flags.trim ss, to = flags.ss, None - async def _monitor_download(): - nonlocal msg, state - while not finished: - content = "Processing..." - if state == "downloading": - match = None - files = list(TEMP_DIR.iterdir()) - if files: - match = max(files, key=lambda f: f.stat().st_size) - if match: - size = match.stat().st_size - size = humanize.naturalsize(size, binary=True) - content = f":arrow_down: `Downloading...` {size}" - else: - content = ":arrow_down: `Downloading...`" - elif state == "uploading": - content = ":arrow_up: `Uploading...`" - - if not msg: - msg = await ctx.reply(content) - else: - msg = await msg.edit(content=content) - await asyncio.sleep(1) - if msg: - await msg.delete() - try: url = url.strip("<>") utils.check_for_ssrf(url) @@ -442,10 +415,8 @@ class Media(commands.Cog): ytdl_opts["format"] = format info_dict = None - # asyncio.create_task(monitor_download()) async with ctx.typing(): info_dict = await run_ytdlp(url, ytdl_opts) - state = "uploading" title = utils.romajify(info_dict.get("title")) vid_id = info_dict.get("id") @@ -483,7 +454,6 @@ class Media(commands.Cog): except Exception as err: raise err finally: - finished = True if path and path.exists(): path.unlink() @@ -568,7 +538,7 @@ class Media(commands.Cog): if year: title += f" ({year})" author = cells[1].text - mirrors = [cell.a["href"] for cell in cells[9:11]] + mirrors = [cell.a["href"] for cell in cells[9:11] if cell.a] ext = cells[8].text entries.append((title, author, mirrors, ext)) @@ -589,10 +559,12 @@ class Media(commands.Cog): continue soup = BeautifulSoup(html, "lxml") - url = soup.find("a", text="GET")["href"] - if not url: + a = soup.find("a", text="GET") + if not a or not a.has_attr("href"): continue + url = a["href"] + try: async with self.bot.session.get(url, headers=headers) as r: filesize = r.headers.get("content-length") diff --git a/artemis/cogs/mod.py b/artemis/cogs/mod.py index f25ae74..df346b3 100644 --- a/artemis/cogs/mod.py +++ b/artemis/cogs/mod.py @@ -13,7 +13,7 @@ if TYPE_CHECKING: class ShortTime(commands.Converter): - async def convert(self, ctx: commands.Context, argument: str) -> pendulum.DateTime: + async def convert(self, _: commands.Context, argument: str): return parse_short_time(argument) diff --git a/artemis/cogs/music.py b/artemis/cogs/music.py index 810fce8..b95fe4b 100644 --- a/artemis/cogs/music.py +++ b/artemis/cogs/music.py @@ -156,6 +156,8 @@ class Music(commands.Cog): ytdl_opts = {**DEFAULT_OPTS, "default_search": "auto", "format": "251/ba*"} info_dict = await run_ytdlp(url_or_query, ytdl_opts, download=False) + assert info_dict + if info_dict.get("entries"): info_dict = info_dict["entries"][0] @@ -212,10 +214,10 @@ class Music(commands.Cog): self.queue.append(song) await self.real_play() - @commands.command() + @commands.command(name="queue") @commands.check(in_voice_channel) @commands.check(audio_playing) - async def queue(self, ctx: commands.Context): + async def queue_cmd(self, ctx: commands.Context): if not self.queue: return await ctx.reply("The queue is empty.") diff --git a/artemis/cogs/ocr.py b/artemis/cogs/ocr.py index b3d27eb..0c94a89 100644 --- a/artemis/cogs/ocr.py +++ b/artemis/cogs/ocr.py @@ -1,15 +1,14 @@ from __future__ import annotations -import json -import mimetypes -import re from io import StringIO from typing import TYPE_CHECKING, Literal, Optional import discord -import magic from discord.ext import commands +import discord.ext +import discord.ext.commands + from .. import utils from ..utils.common import ArtemisError, compress_image, get_reply from ..utils.constants import TESSERACT_LANGUAGES @@ -79,20 +78,14 @@ class OCR(commands.Cog): else: flags = Flags(text=text, source=None, dest=None) cmd = self.bot.get_command(translate) + assert cmd await cmd(ctx, flags=flags) else: if len(text) > 2000 - 8: return await ctx.reply(file=discord.File(StringIO(text), "ocr.txt")) await ctx.reply(self.bot.codeblock(text, "")) - async def lens_impl(self, ctx: commands.Context[Artemis], url: str | None) -> str: - headers = {"User-Agent": self.bot.user_agent} - cookies = self.bot.keys.google - final_data_re = r"\"([\w-]+)\",\[\[(\[\".*?\"\])\]" - - cur_time = utils.time("ms") - upload_url = f"https://lens.google.com/v3/upload?hl=en&re=df&st={cur_time}&ep=gsbubb" - + async def yandex_impl(self, ctx: commands.Context[Artemis], url: str | None): await ctx.typing() if url or ctx.message.attachments: @@ -113,31 +106,8 @@ class OCR(commands.Cog): except Exception as e: raise ArtemisError(f"Could not compress image: {e}") from e - content_type = magic.from_buffer(image, mime=True) - ext = mimetypes.guess_extension(content_type) - - files = {"encoded_image": (f"image{ext}", image, content_type)} - r = await ctx.bot.httpx_session.post( - upload_url, - files=files, - headers=headers, - cookies=cookies, - follow_redirects=True, - ) - if r.is_error: - print(r.text) - raise ArtemisError(f"Google Lens Upload returned {r.status_code} {r.reason_phrase}") - html = r.text - - match = re.search(final_data_re, html) - if not match: - if ctx.author.id == self.bot.owner.id: - await ctx.send(file=utils.file(html, "lens.html")) - raise ArtemisError("No text detected.") - _lang, lines = match.groups() - - text = "\n".join(json.loads(lines)) - return text + result = await self.bot.api.yandex_ocr(image, "image/jpeg") + return result @commands.command(usage="[lang:eng] [l:eng] ") @commands.cooldown(1, 2, commands.BucketType.default) @@ -175,23 +145,32 @@ class OCR(commands.Cog): @commands.cooldown(1, 10, commands.BucketType.default) async def lens(self, ctx: commands.Context, *, url: Optional[str]): """ - OCR using Google Lens. + OCR using Yandex. """ - text = await self.lens_impl(ctx, url) - if len(text) > 2000 - 8: - return await ctx.reply(file=discord.File(StringIO(text), "lens.txt")) - await ctx.reply(self.bot.codeblock(text, "")) + result = await self.yandex_impl(ctx, url) + + assert result.detected_lang + lang = get_language_name(result.detected_lang) or result.detected_lang + msg = f"Detected language: {lang}\n" + self.bot.codeblock(result.text, "") + + if len(msg) > 2000: + return await ctx.reply( + content=f"Detected language: {lang}", + file=discord.File(StringIO(result.text), "lens.txt"), + ) + await ctx.reply(msg) @commands.command() @commands.max_concurrency(1) @commands.cooldown(1, 10, commands.BucketType.default) async def lensgt(self, ctx: commands.Context, *, url: Optional[str]): """ - OCR using Google Lens and translation using Google Translate. + OCR using Yandex and translation using Google Translate. """ - text = await self.lens_impl(ctx, url) - flags = Flags(text=text, source=None, dest=None) + result = await self.yandex_impl(ctx, url) + flags = Flags(text=result.text, source=None, dest=None) cmd = self.bot.get_command("gt") + assert cmd await cmd(ctx, flags=flags) @commands.command(aliases=["lensdl", "lenstr"]) @@ -199,11 +178,12 @@ class OCR(commands.Cog): @commands.cooldown(1, 10, commands.BucketType.default) async def lensdeepl(self, ctx: commands.Context, *, url: Optional[str]): """ - OCR using Google Lens and translation using DeepL. + OCR using Yandex and translation using DeepL. """ - text = await self.lens_impl(ctx, url) - flags = Flags(text=text, source=None, dest=None) + result = await self.yandex_impl(ctx, url) + flags = Flags(text=result.text, source=None, dest=None) cmd = self.bot.get_command("deepl") + assert cmd await cmd(ctx, flags=flags) diff --git a/artemis/cogs/owner.py b/artemis/cogs/owner.py index d7d8314..43893be 100644 --- a/artemis/cogs/owner.py +++ b/artemis/cogs/owner.py @@ -178,6 +178,7 @@ except Exception: code = silencer + code jsk_py = self.bot.get_command("jsk py") + assert jsk_py await jsk_py(ctx, argument=codeblocks.codeblock_converter(code)) @dev.command() diff --git a/artemis/cogs/useful.py b/artemis/cogs/useful.py index 6e78c78..fae8a11 100644 --- a/artemis/cogs/useful.py +++ b/artemis/cogs/useful.py @@ -496,6 +496,7 @@ class Useful(commands.Cog): soup = BeautifulSoup(html, "lxml") preview_img = soup.select_one(".CbirPreview-Image") + assert preview_img preview_img_url = preview_img["src"] embed = discord.Embed(title="Uploaded image", color=0xFDDE55, url=r.url) @@ -526,8 +527,10 @@ class Useful(commands.Cog): for result in results[:3]: a = result.select_one(".CbirSites-ItemTitle a") - title = a.text + if not a: + continue + title = a.text url = a["href"] url = f"[{utils.trim(url.split('//', 1)[-1], 50)}]({url})" description = result.select_one(".CbirSites-ItemDescription").text @@ -663,6 +666,7 @@ class Useful(commands.Cog): satellite_data = result.select("td") satellite_pos = satellite_data[0].text.strip() + assert satellite_data[1].a satellite_url = satellite_data[1].a["href"] sat_pos = re.search(r"(\d{1,3}(?:\.\d)?).*?((?:E|W))", satellite_pos) diff --git a/artemis/utils/api.py b/artemis/utils/api.py index b68c12d..09f2d54 100644 --- a/artemis/utils/api.py +++ b/artemis/utils/api.py @@ -1,48 +1,43 @@ from __future__ import annotations -import asyncio +import base64 from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING import aiohttp +from artemis.utils.common import ArtemisError + if TYPE_CHECKING: from ..bot import Artemis @dataclass -class DeepLResult: - translation: str +class YandexResult: + text: str + detected_lang: str | None = None class API: - def __init__(self, bot: Artemis, token: str): - self.base_url = "http://127.0.0.1:3000" + def __init__(self, bot: Artemis, base_url: str, token: str): + self.base_url = base_url self.token = token self.session: aiohttp.ClientSession = bot.session self.headers = {"User-Agent": bot.real_user_agent} self.authed_headers = {**self.headers, "Authorization": f"Bearer {self.token}"} - async def _aioread(self, fp): - return await asyncio.to_thread(fp.read) + async def yandex_ocr(self, image: bytes, mime: str): + base64_image = base64.b64encode(image).decode("utf-8") + data = {"file": base64_image, "mime": mime} - async def _request( - self, - method: str, - path: str, - authed: bool = False, - res_type: Literal["json", "text", "bytes"] = "json", - **kwargs, - ) -> Any: - headers = self.authed_headers if authed else self.headers - async with self.session.request( - method, self.base_url + path, headers=headers, **kwargs + async with self.session.post( + self.base_url + "/ocr/yandex", json=data, headers=self.authed_headers ) as r: - match res_type: - case "json": - return await r.json() - case "text": - return await r.text() - case "bytes": - return await r.read() + data = await r.json() + if not r.ok: + raise ArtemisError(f"Yandex Error: {data.get('error', 'Unknown')}") + result = YandexResult(**data) + if not result.text: + raise ArtemisError("No text detected.") + return result diff --git a/artemis/utils/config.py b/artemis/utils/config.py index d6aad33..b75cce5 100644 --- a/artemis/utils/config.py +++ b/artemis/utils/config.py @@ -12,7 +12,6 @@ class Keys: cloudflare: str openai: str deepl: str - google: dict[str, str] @dataclass @@ -21,14 +20,14 @@ class Config: prefix: str user_agent: str real_user_agent: str - api_base_url: str - cdn_base_url: str + internal_api_url: str + cdn_url: str main_guild_id: int dev_guild_id: int keys: Keys def __post_init__(self): - self.keys = Keys(**self.keys) # type: ignore + self.keys = Keys(**self.keys) def load_config() -> Config: diff --git a/config.example.toml b/config.example.toml index f49c622..d2c1a6f 100644 --- a/config.example.toml +++ b/config.example.toml @@ -2,8 +2,8 @@ token = "token" prefix = "!" user_agent = "user_agent" real_user_agent = "real_user_agent" -api_base_url = "api_base_url" -cdn_base_url = "cdn_base_url" +internal_api_url = "internal_api_url" +cdn_url = "cdn_url" main_guild_id = 1 dev_guild_id = 1 @@ -14,6 +14,3 @@ github = "github" cloudflare = "cloudflare" openai = "openai" deepl = "deepl" - -# google cookies for lens API -[keys.google]