Mirror of https://github.com/artiemis/artemis.git (synced 2026-02-14 08:31:55 +00:00)

replace lens with yandex + initial pyright cleanup

parent 2c60e09317
commit 8cb2c3e861
@@ -104,7 +104,7 @@ class Artemis(commands.Bot):
 
         await self.load_extensions()
 
-        self.api = API(self, self.keys.api)
+        self.api = API(self, config.internal_api_url, self.keys.api)
         self.catbox = Catbox(self.keys.catbox, session=self.session)
         self.litterbox = Litterbox(session=self.session)
         self.reddit = Reddit(self.session)
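The internal API client now takes its base URL from configuration instead of a hardcoded localhost address (see the API and Config hunks further down). A minimal sketch of the new call shape, assuming a loaded Config whose keys table carries the api token; the variable names here are illustrative:

# Sketch only: mirrors the new constructor call above; internal_api_url replaces
# the old "http://127.0.0.1:3000" default that lived inside the API class.
api = API(bot, config.internal_api_url, config.keys.api)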
@@ -311,7 +311,7 @@ class Anime(commands.Cog):
         Search for art on Danbooru or show a random image.
         This uses the common tag search logic found on booru imageboards, fuzzy matching for tags is enabled.
         """
-        params = None
+        params = {}
 
         await ctx.typing()
 
@@ -51,6 +51,7 @@ class Funhouse(commands.Cog):
 
     async def invoke_reddit(self, ctx: commands.Context, subreddit: str):
         reddit = self.bot.get_command("reddit")
+        assert reddit
         return await reddit(ctx, subreddit)
 
     @commands.command()
@@ -144,6 +145,7 @@ class Funhouse(commands.Cog):
         banner_colour = user.accent_colour
         if banner_colour:
             colour_cmd = self.bot.get_command("color")
+            assert colour_cmd
             return await colour_cmd(ctx, colour=banner_colour)
         else:
             raise ArtemisError(f"{user.display_name} does not have a custom banner set.")
@@ -223,9 +225,14 @@ class Funhouse(commands.Cog):
             if not title:
                 title = f"{post.select_one('.post_author').text} {post.select_one('time').text} UTC"
 
-            post_url = post.find(
+            post_a = post.find(
                 "a", attrs={"href": re.compile(r"https://desuarchive.org/.*?/thread/")}
-            )["href"]
+            )
+
+            if not post_a:
+                continue
+
+            post_url = post_a["href"]
             board = post_url.split("/")[-4]
             if board in banned_boards:
                 continue
@@ -397,7 +404,7 @@ class Funhouse(commands.Cog):
         embed.set_author(
             name="#" + result["number"], icon_url="https://www.pokemon.com/favicon.ico"
         )
-        embed.set_image(url=f"{config.cdn_base_url}/pokedex/{result['id']:>03}.png")
+        embed.set_image(url=f"{config.cdn_url}/pokedex/{result['id']:>03}.png")
 
         types = ", ".join([t.title() for t in result["type"]])
         abilities = ", ".join(result["abilities"])
@@ -262,7 +262,6 @@ class Language(commands.Cog):
             file = discord.File(buff, f"{src}-{dest}.txt")
-
             return await ctx.reply(
                 "The translation could not fit on the screen, so here's a file:",
                 file=file,
             )
 
@@ -339,7 +338,6 @@ class Language(commands.Cog):
             file = discord.File(buff, f"{display_src}-{display_dest}.txt")
-
             return await ctx.reply(
                 "The translation could not fit on the screen, so here's a file:",
                 file=file,
             )
 
@@ -568,6 +566,9 @@ class Language(commands.Cog):
 
         embeds = []
         for entry in entries:
+            if not entry:
+                continue
+
             embed = discord.Embed(
                 title=entry["word"],
                 description=entry["definition"],
@@ -71,13 +71,13 @@ YOUTUBE_BANNED_MESSAGE = """
 """
 
 
-async def run_ytdlp(query: str, opts: dict, download: bool = True) -> dict:
+def run_ytdlp(query: str, opts: dict, download: bool = True):
     if YoutubeIE.suitable(query):
         raise ArtemisError(YOUTUBE_BANNED_MESSAGE)
 
     try:
         with yt_dlp.YoutubeDL(opts) as ytdl:
-            return await asyncio.to_thread(ytdl.extract_info, query, download=download)
+            return asyncio.to_thread(ytdl.extract_info, query, download=download)
     except yt_dlp.utils.YoutubeDLError as error:
         raise ArtemisError(format_ytdlp_error(error))
 
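Because run_ytdlp is no longer declared async but now returns the asyncio.to_thread(...) awaitable, call sites keep awaiting it unchanged, as the Media and Music hunks below show. A minimal usage sketch, assuming a caller inside an async command; the URL and options dict here are illustrative, not from the diff:

# Hypothetical caller: awaiting the returned awaitable runs extract_info in a worker thread.
info_dict = await run_ytdlp("https://example.com/video", {"quiet": True}, download=False)
assert info_dict
title = info_dict.get("title")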
@@ -259,6 +259,7 @@ class Media(commands.Cog):
 
         async with ctx.typing():
             info_dict = await run_ytdlp(url, ytdl_opts, download=False)
+            assert info_dict
 
         title = info_dict.get("title")
         url = info_dict["url"]
@@ -296,6 +297,7 @@ class Media(commands.Cog):
 
         async with ctx.typing():
             info_dict = await run_ytdlp(url, ytdl_opts, download=False)
+            assert info_dict
 
         title = info_dict["title"]
         url = info_dict["url"]
@@ -347,9 +349,6 @@ class Media(commands.Cog):
         `{prefix}dl t:120-160 https://www.reddit.com/r/anime/comments/f86otf/`
         """
         path: Path = None
-        msg: discord.Message = None
-        finished = False
-        state = "downloading"
         template = TEMP_DIR.joinpath("%(id)s.%(ext)s").as_posix()
 
         url = flags.url
@@ -357,32 +356,6 @@ class Media(commands.Cog):
         trim = flags.trim
         ss, to = flags.ss, None
 
-        async def _monitor_download():
-            nonlocal msg, state
-            while not finished:
-                content = "Processing..."
-                if state == "downloading":
-                    match = None
-                    files = list(TEMP_DIR.iterdir())
-                    if files:
-                        match = max(files, key=lambda f: f.stat().st_size)
-                    if match:
-                        size = match.stat().st_size
-                        size = humanize.naturalsize(size, binary=True)
-                        content = f":arrow_down: `Downloading...` {size}"
-                    else:
-                        content = ":arrow_down: `Downloading...`"
-                elif state == "uploading":
-                    content = ":arrow_up: `Uploading...`"
-
-                if not msg:
-                    msg = await ctx.reply(content)
-                else:
-                    msg = await msg.edit(content=content)
-                await asyncio.sleep(1)
-            if msg:
-                await msg.delete()
-
         try:
             url = url.strip("<>")
             utils.check_for_ssrf(url)
@@ -442,10 +415,8 @@ class Media(commands.Cog):
             ytdl_opts["format"] = format
 
         info_dict = None
-        # asyncio.create_task(monitor_download())
         async with ctx.typing():
             info_dict = await run_ytdlp(url, ytdl_opts)
-            state = "uploading"
 
         title = utils.romajify(info_dict.get("title"))
         vid_id = info_dict.get("id")
@@ -483,7 +454,6 @@ class Media(commands.Cog):
         except Exception as err:
             raise err
         finally:
-            finished = True
             if path and path.exists():
                 path.unlink()
 
@@ -568,7 +538,7 @@ class Media(commands.Cog):
             if year:
                 title += f" ({year})"
             author = cells[1].text
-            mirrors = [cell.a["href"] for cell in cells[9:11]]
+            mirrors = [cell.a["href"] for cell in cells[9:11] if cell.a]
             ext = cells[8].text
             entries.append((title, author, mirrors, ext))
 
@@ -589,10 +559,12 @@ class Media(commands.Cog):
                 continue
 
             soup = BeautifulSoup(html, "lxml")
-            url = soup.find("a", text="GET")["href"]
-            if not url:
+            a = soup.find("a", text="GET")
+            if not a or not a.has_attr("href"):
                 continue
+
+            url = a["href"]
 
             try:
                 async with self.bot.session.get(url, headers=headers) as r:
                     filesize = r.headers.get("content-length")
@@ -13,7 +13,7 @@ if TYPE_CHECKING:
 
 
 class ShortTime(commands.Converter):
-    async def convert(self, ctx: commands.Context, argument: str) -> pendulum.DateTime:
+    async def convert(self, _: commands.Context, argument: str):
         return parse_short_time(argument)
 
 
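For context, a discord.py converter like this is consumed through a command parameter annotation; the return annotation was dropped, but parse_short_time presumably still yields the pendulum.DateTime the old signature promised. A minimal sketch; the command name and body are hypothetical, only ShortTime itself comes from the diff:

# Hypothetical command: discord.py calls ShortTime.convert() for the `when` argument.
@commands.command()
async def remind(self, ctx: commands.Context, when: ShortTime):
    await ctx.reply(f"Will remind you at {when}.")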
@@ -156,6 +156,8 @@ class Music(commands.Cog):
 
         ytdl_opts = {**DEFAULT_OPTS, "default_search": "auto", "format": "251/ba*"}
         info_dict = await run_ytdlp(url_or_query, ytdl_opts, download=False)
+        assert info_dict
+
         if info_dict.get("entries"):
             info_dict = info_dict["entries"][0]
 
@@ -212,10 +214,10 @@ class Music(commands.Cog):
         self.queue.append(song)
         await self.real_play()
 
-    @commands.command()
+    @commands.command(name="queue")
     @commands.check(in_voice_channel)
     @commands.check(audio_playing)
-    async def queue(self, ctx: commands.Context):
+    async def queue_cmd(self, ctx: commands.Context):
         if not self.queue:
             return await ctx.reply("The queue is empty.")
 
@@ -1,15 +1,14 @@
 from __future__ import annotations
 
-import json
-import mimetypes
-import re
 from io import StringIO
 from typing import TYPE_CHECKING, Literal, Optional
 
 import discord
-import magic
 from discord.ext import commands
 
+import discord.ext
+import discord.ext.commands
+
 from .. import utils
 from ..utils.common import ArtemisError, compress_image, get_reply
 from ..utils.constants import TESSERACT_LANGUAGES
@@ -79,20 +78,14 @@ class OCR(commands.Cog):
             else:
                 flags = Flags(text=text, source=None, dest=None)
                 cmd = self.bot.get_command(translate)
+                assert cmd
                 await cmd(ctx, flags=flags)
         else:
             if len(text) > 2000 - 8:
                 return await ctx.reply(file=discord.File(StringIO(text), "ocr.txt"))
             await ctx.reply(self.bot.codeblock(text, ""))
 
-    async def lens_impl(self, ctx: commands.Context[Artemis], url: str | None) -> str:
-        headers = {"User-Agent": self.bot.user_agent}
-        cookies = self.bot.keys.google
-        final_data_re = r"\"([\w-]+)\",\[\[(\[\".*?\"\])\]"
-
-        cur_time = utils.time("ms")
-        upload_url = f"https://lens.google.com/v3/upload?hl=en&re=df&st={cur_time}&ep=gsbubb"
-
+    async def yandex_impl(self, ctx: commands.Context[Artemis], url: str | None):
         await ctx.typing()
 
         if url or ctx.message.attachments:
@@ -113,31 +106,8 @@ class OCR(commands.Cog):
             except Exception as e:
                 raise ArtemisError(f"Could not compress image: {e}") from e
 
-        content_type = magic.from_buffer(image, mime=True)
-        ext = mimetypes.guess_extension(content_type)
-
-        files = {"encoded_image": (f"image{ext}", image, content_type)}
-        r = await ctx.bot.httpx_session.post(
-            upload_url,
-            files=files,
-            headers=headers,
-            cookies=cookies,
-            follow_redirects=True,
-        )
-        if r.is_error:
-            print(r.text)
-            raise ArtemisError(f"Google Lens Upload returned {r.status_code} {r.reason_phrase}")
-        html = r.text
-
-        match = re.search(final_data_re, html)
-        if not match:
-            if ctx.author.id == self.bot.owner.id:
-                await ctx.send(file=utils.file(html, "lens.html"))
-            raise ArtemisError("No text detected.")
-        _lang, lines = match.groups()
-
-        text = "\n".join(json.loads(lines))
-        return text
+        result = await self.bot.api.yandex_ocr(image, "image/jpeg")
+        return result
 
     @commands.command(usage="[lang:eng] [l:eng] <url>")
     @commands.cooldown(1, 2, commands.BucketType.default)
@@ -175,23 +145,32 @@ class OCR(commands.Cog):
     @commands.cooldown(1, 10, commands.BucketType.default)
     async def lens(self, ctx: commands.Context, *, url: Optional[str]):
         """
-        OCR using Google Lens.
+        OCR using Yandex.
         """
-        text = await self.lens_impl(ctx, url)
-        if len(text) > 2000 - 8:
-            return await ctx.reply(file=discord.File(StringIO(text), "lens.txt"))
-        await ctx.reply(self.bot.codeblock(text, ""))
+        result = await self.yandex_impl(ctx, url)
+
+        assert result.detected_lang
+        lang = get_language_name(result.detected_lang) or result.detected_lang
+        msg = f"Detected language: {lang}\n" + self.bot.codeblock(result.text, "")
+
+        if len(msg) > 2000:
+            return await ctx.reply(
+                content=f"Detected language: {lang}",
+                file=discord.File(StringIO(result.text), "lens.txt"),
+            )
+        await ctx.reply(msg)
 
     @commands.command()
     @commands.max_concurrency(1)
     @commands.cooldown(1, 10, commands.BucketType.default)
     async def lensgt(self, ctx: commands.Context, *, url: Optional[str]):
         """
-        OCR using Google Lens and translation using Google Translate.
+        OCR using Yandex and translation using Google Translate.
         """
-        text = await self.lens_impl(ctx, url)
-        flags = Flags(text=text, source=None, dest=None)
+        result = await self.yandex_impl(ctx, url)
+        flags = Flags(text=result.text, source=None, dest=None)
         cmd = self.bot.get_command("gt")
+        assert cmd
         await cmd(ctx, flags=flags)
 
     @commands.command(aliases=["lensdl", "lenstr"])
@@ -199,11 +178,12 @@ class OCR(commands.Cog):
     @commands.cooldown(1, 10, commands.BucketType.default)
     async def lensdeepl(self, ctx: commands.Context, *, url: Optional[str]):
         """
-        OCR using Google Lens and translation using DeepL.
+        OCR using Yandex and translation using DeepL.
        """
-        text = await self.lens_impl(ctx, url)
-        flags = Flags(text=text, source=None, dest=None)
+        result = await self.yandex_impl(ctx, url)
+        flags = Flags(text=result.text, source=None, dest=None)
         cmd = self.bot.get_command("deepl")
+        assert cmd
         await cmd(ctx, flags=flags)
 
 
@@ -178,6 +178,7 @@ except Exception:
         code = silencer + code
 
         jsk_py = self.bot.get_command("jsk py")
+        assert jsk_py
         await jsk_py(ctx, argument=codeblocks.codeblock_converter(code))
 
     @dev.command()
@@ -496,6 +496,7 @@ class Useful(commands.Cog):
         soup = BeautifulSoup(html, "lxml")
 
         preview_img = soup.select_one(".CbirPreview-Image")
+        assert preview_img
         preview_img_url = preview_img["src"]
 
         embed = discord.Embed(title="Uploaded image", color=0xFDDE55, url=r.url)
@@ -526,8 +527,10 @@ class Useful(commands.Cog):
 
         for result in results[:3]:
             a = result.select_one(".CbirSites-ItemTitle a")
-            title = a.text
+            if not a:
+                continue
+
+            title = a.text
             url = a["href"]
             url = f"[{utils.trim(url.split('//', 1)[-1], 50)}]({url})"
             description = result.select_one(".CbirSites-ItemDescription").text
@@ -663,6 +666,7 @@ class Useful(commands.Cog):
 
             satellite_data = result.select("td")
             satellite_pos = satellite_data[0].text.strip()
+            assert satellite_data[1].a
             satellite_url = satellite_data[1].a["href"]
 
             sat_pos = re.search(r"(\d{1,3}(?:\.\d)?).*?((?:E|W))", satellite_pos)
@@ -1,48 +1,43 @@
 from __future__ import annotations
 
-import asyncio
+import base64
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, Any, Literal
+from typing import TYPE_CHECKING
 
 import aiohttp
 
+from artemis.utils.common import ArtemisError
+
 
 if TYPE_CHECKING:
     from ..bot import Artemis
 
 
 @dataclass
-class DeepLResult:
-    translation: str
+class YandexResult:
+    text: str
+    detected_lang: str | None = None
 
 
 class API:
-    def __init__(self, bot: Artemis, token: str):
-        self.base_url = "http://127.0.0.1:3000"
+    def __init__(self, bot: Artemis, base_url: str, token: str):
+        self.base_url = base_url
         self.token = token
         self.session: aiohttp.ClientSession = bot.session
         self.headers = {"User-Agent": bot.real_user_agent}
         self.authed_headers = {**self.headers, "Authorization": f"Bearer {self.token}"}
 
-    async def _aioread(self, fp):
-        return await asyncio.to_thread(fp.read)
+    async def yandex_ocr(self, image: bytes, mime: str):
+        base64_image = base64.b64encode(image).decode("utf-8")
+        data = {"file": base64_image, "mime": mime}
 
-    async def _request(
-        self,
-        method: str,
-        path: str,
-        authed: bool = False,
-        res_type: Literal["json", "text", "bytes"] = "json",
-        **kwargs,
-    ) -> Any:
-        headers = self.authed_headers if authed else self.headers
-        async with self.session.request(
-            method, self.base_url + path, headers=headers, **kwargs
+        async with self.session.post(
+            self.base_url + "/ocr/yandex", json=data, headers=self.authed_headers
         ) as r:
-            match res_type:
-                case "json":
-                    return await r.json()
-                case "text":
-                    return await r.text()
-                case "bytes":
-                    return await r.read()
+            data = await r.json()
+            if not r.ok:
+                raise ArtemisError(f"Yandex Error: {data.get('error', 'Unknown')}")
+            result = YandexResult(**data)
+            if not result.text:
+                raise ArtemisError("No text detected.")
+            return result
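A minimal usage sketch of the new client method; `bot.api` is the instance created in the bot hunk near the top, and `image_bytes` stands in for whatever raw image payload the OCR cog has already fetched:

# Sketch only: returns a YandexResult as defined in the dataclass above,
# or raises ArtemisError when the service reports an error or finds no text.
result = await bot.api.yandex_ocr(image_bytes, "image/jpeg")
print(result.detected_lang, result.text)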
@@ -12,7 +12,6 @@ class Keys:
     cloudflare: str
     openai: str
     deepl: str
-    google: dict[str, str]
 
 
 @dataclass
@@ -21,14 +20,14 @@ class Config:
     prefix: str
     user_agent: str
     real_user_agent: str
-    api_base_url: str
-    cdn_base_url: str
+    internal_api_url: str
+    cdn_url: str
     main_guild_id: int
     dev_guild_id: int
     keys: Keys
 
     def __post_init__(self):
-        self.keys = Keys(**self.keys)  # type: ignore
+        self.keys = Keys(**self.keys)
 
 
 def load_config() -> Config:
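A brief sketch of how the renamed fields surface at load time, assuming a config file shaped like the example below; the print targets are illustrative:

# Sketch only: load_config() and the field names come from the diff above.
config = load_config()
print(config.internal_api_url, config.cdn_url)
print(type(config.keys))  # Keys, promoted from the raw table by __post_init__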
@@ -2,8 +2,8 @@ token = "token"
 prefix = "!"
 user_agent = "user_agent"
 real_user_agent = "real_user_agent"
-api_base_url = "api_base_url"
-cdn_base_url = "cdn_base_url"
+internal_api_url = "internal_api_url"
+cdn_url = "cdn_url"
 main_guild_id = 1
 dev_guild_id = 1
 
@@ -14,6 +14,3 @@ github = "github"
 cloudflare = "cloudflare"
 openai = "openai"
 deepl = "deepl"
-
-# google cookies for lens API
-[keys.google]