integrate with deepl free api, fallback to our scraper

This commit is contained in:
artie 2024-10-02 19:30:26 +02:00
parent 348ef8af3d
commit 4476e4df73
10 changed files with 222 additions and 231 deletions

12
.pylintrc Normal file
View File

@ -0,0 +1,12 @@
[MESSAGES CONTROL]
disable=
missing-class-docstring,
missing-function-docstring,
missing-module-docstring,
line-too-long,
attribute-defined-outside-init,
redefined-builtin,
disallowed-name,
unspecified-encoding,
broad-exception-caught,
raise-missing-from,

View File

@ -19,12 +19,12 @@ from discord.ext.commands.cooldowns import BucketType
from .cogs import EXTENSIONS
from . import utils
from .utils import reddit
from .utils.reddit import Reddit
from .utils.api import API
from .utils.catbox import Catbox, Litterbox
from .utils.deepl import DeepL
from .utils.common import read_json, ArtemisError
from .utils.constants import TEMP_DIR
from .utils.unogs import uNoGS
from .utils import config
@ -107,8 +107,8 @@ class Artemis(commands.Bot):
self.api = API(self, self.keys.api)
self.catbox = Catbox(self.keys.catbox, session=self.session)
self.litterbox = Litterbox(session=self.session)
self.unogs = uNoGS(session=self.session)
self.reddit = reddit.Reddit(self.session)
self.reddit = Reddit(self.session)
self.deepl = DeepL(self, self.keys.deepl)
await self.maybe_send_restarted()

View File

@ -284,7 +284,7 @@ class Anime(commands.Cog):
embed.set_footer(text="Powered by trace.moe")
await ctx.reply(embed=embed)
@whatanime.command()
@whatanime.command(aliases=["usage"])
async def quota(self, ctx: commands.Context):
"""
Returns the search quota left for the month.

View File

@ -15,7 +15,9 @@ from aiogoogletrans import Translator
from bs4 import BeautifulSoup, Tag
from discord import app_commands
from discord.ext import commands
from discord.utils import format_dt
from wiktionaryparser import WiktionaryParser
from langdetect import detect
from .. import utils
from ..utils import iso_639
@ -46,73 +48,40 @@ nimi_lookup = {entry["word"]: entry for entry in nimi}
nimi_reverse_lookup = {entry["definition"]: entry for entry in nimi}
@cached()
async def get_deepl_languages():
languages = [
"bg",
"cs",
"da",
"de",
"el",
"en",
"es",
"et",
"fi",
"fr",
"hu",
"id",
"it",
"ja",
"ko",
"lt",
"lv",
"nb",
"nl",
"pl",
"pt",
"ro",
"ru",
"sk",
"sl",
"sv",
"tr",
"uk",
"zh",
]
languages = {code: iso_639.get_language_name(code) for code in languages}
if languages.get("el"):
languages["el"] = "Greek"
return languages
# Translation slash commands
@app_commands.context_menu(name="Translate (DeepL)")
@app_commands.allowed_installs(guilds=False, users=True)
@app_commands.allowed_contexts(guilds=True, dms=True, private_channels=True)
async def deepl_slash(interaction: discord.Interaction, message: discord.Message):
async def deepl_slash(interaction: discord.Interaction[Artemis], message: discord.Message):
await interaction.response.defer(ephemeral=True)
content = message.content
if not content:
return await interaction.followup.send("No text detected.", ephemeral=True)
languages = await get_deepl_languages()
languages = interaction.client.deepl.languages
result = None
result_src = None
result_dest = 'en'
billed_characters = None
try:
result = await interaction.client.api.deepl(content, "auto", "en")
except Exception as err:
return await interaction.followup.send(f"Error: {err}", ephemeral=True)
src = result.src.lower()
dest = result.dst.lower()
try:
src = languages[src]
dest = languages[dest]
result = await interaction.client.deepl.translate(content, 'auto', 'EN')
result_src = result.src.lower()
billed_characters = result.billed_characters
except Exception:
pass
src = detect(content)
if src == 'unknown' or src not in languages:
raise ArtemisError("Could not detect language, sorry!")
try:
result = await interaction.client.api.deepl(content, src, 'en')
result_src = src
except Exception as err:
raise ArtemisError(f"Could not translate with any method, epxloding with last error:\n`{err}`")
display_src = languages.get(result_src) or result_src
display_dest = languages.get(result_dest) or result_dest
translation = result.translation
embed = discord.Embed(colour=0x0F2B46)
@ -120,7 +89,9 @@ async def deepl_slash(interaction: discord.Interaction, message: discord.Message
name="DeepL",
icon_url="https://www.google.com/s2/favicons?domain=deepl.com&sz=64",
)
embed.add_field(name=f"From {src} to {dest}", value=translation)
embed.add_field(name=f"From {display_src} to {display_dest}", value=translation)
if billed_characters:
embed.set_footer(text=f"Billed characters: {billed_characters}")
await interaction.followup.send(embed=embed, ephemeral=True)
@ -345,7 +316,9 @@ class Language(commands.Cog):
embed.add_field(name=f"From {src} to {dest}", value=translation)
await ctx.reply(embed=embed)
@commands.command(usage="[source:auto] [s:auto] [dest:en] [d:en] <text>")
@commands.group(
invoke_without_command=True, usage="[source:auto] [s:auto] [dest:en] [d:en] <text>"
)
@commands.max_concurrency(1)
@commands.cooldown(1, 2, commands.BucketType.default)
async def deepl(self, ctx: commands.Context, *, flags: TranslateFlags):
@ -369,7 +342,7 @@ class Language(commands.Cog):
await ctx.typing()
languages = await get_deepl_languages()
languages = self.bot.deepl.languages
if src != "auto" and src not in languages or dest not in languages:
msg = "Unsupported language code, list of supported languages:\n\n"
@ -377,25 +350,36 @@ class Language(commands.Cog):
embed = discord.Embed(description=msg, color=discord.Color.red())
return await ctx.reply(embed=embed)
try:
result = await self.bot.api.deepl(text, src, dest)
except Exception as err:
return await ctx.reply(err)
result = None
result_src = None
result_dest = dest.lower()
billed_characters = None
src = result.src.lower()
dest = result.dst.lower()
# try deepl api first
try:
src = languages[src]
dest = languages[dest]
result = await self.bot.deepl.translate(text, src.upper(), dest.upper())
result_src = result.src.lower()
billed_characters = result.billed_characters
except Exception:
pass
# if that fails, try our scraper
if src == 'auto':
src = detect(text)
if src == 'unknown' or src not in languages:
raise ArtemisError("Could not detect language, try specifying one?")
try:
result = await self.bot.api.deepl(text, src, dest)
result_src = src
except Exception as err:
raise ArtemisError(f"Could not translate with any method, epxloding with last error:\n`{err}`")
display_src = languages.get(result_src) or result_src
display_dest = languages.get(result_dest) or result_dest
translation = result.translation
if len(translation) > 1024:
buff = f"--- From {src} to {dest} ---\n{translation}".encode("utf-8")
buff = f"--- From {display_src} to {display_dest} ---\n{translation}".encode("utf-8")
buff = BytesIO(buff)
file = discord.File(buff, f"{src}-{dest}.txt")
file = discord.File(buff, f"{display_src}-{display_dest}.txt")
return await ctx.reply(
"The translation could not fit on the screen, so here's a file:",
@ -407,9 +391,25 @@ class Language(commands.Cog):
name="DeepL",
icon_url="https://www.google.com/s2/favicons?domain=deepl.com&sz=64",
)
embed.add_field(name=f"From {src} to {dest}", value=translation)
embed.add_field(name=f"From {display_src} to {display_dest}", value=translation)
if billed_characters:
embed.set_footer(text=f"Billed characters: {billed_characters}")
await ctx.reply(embed=embed)
@deepl.command(aliases=["quota"])
async def usage(self, ctx: commands.Context):
    """
    Returns the character quota left for the month.
    """
    await ctx.typing()

    usage = await self.bot.deepl.usage()
    characters_left = usage.character_limit - usage.character_count

    # The reset moment is hard-coded to the 2nd of the month, 16:30 UTC
    # (presumably this account's billing anchor - confirm if the key changes).
    # Use this month's occurrence when it is still ahead of us; only roll over
    # to next month once it has passed, otherwise the countdown is a month off
    # during the first days of each month.
    now = pendulum.now("UTC")
    reset = now.replace(day=2, hour=16, minute=30, second=0, microsecond=0)
    if reset <= now:
        reset = reset.add(months=1)

    # Pre-format the timestamp: reusing double quotes inside a double-quoted
    # f-string only parses on Python 3.12+.
    relative_reset = format_dt(reset, "R")

    await ctx.reply(
        f"Characters used: **{usage.character_count}**\n"
        f"Characters left: **{characters_left}**\n"
        f"Quota resets {relative_reset}."
    )
@commands.command(usage="[lang:en] [l:en] <text>")
@commands.max_concurrency(1)
async def tts(self, ctx: commands.Context, *, flags: TTSFlags):

View File

@ -68,59 +68,6 @@ class Media(commands.Cog):
def __init__(self, bot: Artemis):
self.bot: Artemis = bot
@commands.command(aliases=["nf"])
@commands.cooldown(1, 2, commands.BucketType.user)
async def netflix(self, ctx: commands.Context, *, query: str):
    """Check if and where a show is available on Netflix."""
    await ctx.typing()

    response = await self.bot.unogs.search(query)
    if "total" not in response:
        return await ctx.reply("The API returned no data, weird!")

    total = response["total"]
    if total == 0:
        return await ctx.reply("No results found.")

    if total == 1:
        entry = response["results"][0]
    else:
        # Several matches: let the invoker pick one from a dropdown.
        picker = DropdownView(
            ctx,
            response["results"],
            lambda x: html.unescape(x["title"]),
            placeholder="Choose title...",
        )
        entry = await picker.prompt()
        if not entry:
            return

    title = html.unescape(entry["title"])
    synopsis = html.unescape(entry["synopsis"])
    nfid = entry["nfid"]
    nfurl = f"https://www.netflix.com/title/{nfid}"
    img = entry.get("poster") or entry.get("img")

    countries = await self.bot.unogs.fetch_details(nfid, "countries")
    flags = " ".join(f":flag_{country['cc'].strip().lower()}:" for country in countries)

    # Collect the distinct audio/subtitle languages over all countries,
    # dropping the empty strings left behind by splitting on commas.
    audio_langs = set()
    subtitle_langs = set()
    for country in countries:
        audio_langs.update(country["audio"].split(","))
        subtitle_langs.update(country["subtitle"].split(","))
    audio = sorted(lang for lang in audio_langs if lang)
    subtitles = sorted(lang for lang in subtitle_langs if lang)

    embed = discord.Embed(title=title, description=synopsis, url=nfurl, color=0xE50914)
    if img and "http" in img:
        embed.set_image(url=img)
    embed.set_author(
        name="Netflix",
        icon_url="https://assets.nflxext.com/us/ffe/siteui/common/icons/nficon2016.png",
    )
    embed.add_field(name="Availability", value=flags)
    embed.add_field(name="Audio", value=", ".join(audio), inline=False)
    embed.add_field(name="Subtitles", value=", ".join(subtitles), inline=False)
    await ctx.reply(embed=embed)
@commands.command(aliases=["thumb"])
async def thumbnail(self, ctx: commands.Context, url: str):
"""Gives you a video thumbnail URL for a video from any site supported by YTDL."""

View File

@ -4,18 +4,18 @@ import asyncio
import io
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Literal
from .common import ArtemisError
import aiohttp
from .common import ArtemisError
if TYPE_CHECKING:
from ..bot import Artemis
@dataclass
class DeepLResult:
    """Result of a translation built from the ``/webdriver/deepl`` JSON response."""

    # Source and destination languages as reported by the endpoint
    # (presumably language codes - callers lower-case them before display).
    src: str
    dst: str
    # The translated text.
    translation: str
@ -24,8 +24,8 @@ class API:
self.base_url = "http://127.0.0.1:3000"
self.token = token
self.session: aiohttp.ClientSession = bot.session
self.HEADERS = {"User-Agent": bot.real_user_agent}
self.AUTHED_HEADERS = {**self.HEADERS, "Authorization": f"Bearer {self.token}"}
self.headers = {"User-Agent": bot.real_user_agent}
self.authed_headers = {**self.headers, "Authorization": f"Bearer {self.token}"}
async def _aioread(self, fp):
return await asyncio.to_thread(fp.read)
@ -38,7 +38,7 @@ class API:
res_type: Literal["json", "text", "bytes"] = "json",
**kwargs,
) -> Any:
headers = self.AUTHED_HEADERS if authed else self.HEADERS
headers = self.authed_headers if authed else self.headers
async with self.session.request(
method, self.base_url + path, headers=headers, **kwargs
) as r:
@ -54,17 +54,17 @@ class API:
self,
url: str,
selector: str | None = None,
waitForSelector: str | None = None,
waitForFunction: str | None = None,
wait_for_selector: str | None = None,
wait_for_function: str | None = None,
) -> io.BytesIO:
"""Returns a PNG screenshot of the website at url with optional selector."""
params = {"url": url}
if selector:
params["selector"] = selector
if waitForSelector:
params["waitForSelector"] = waitForSelector
if waitForFunction:
params["waitForFunction"] = waitForFunction
if wait_for_selector:
params["waitForSelector"] = wait_for_selector
if wait_for_function:
params["waitForFunction"] = wait_for_function
res: bytes = await self._request(
"GET", "/webdriver/screenshot", authed=True, res_type="bytes", params=params
@ -76,9 +76,9 @@ class API:
data = {"src": src.lower(), "dst": dst.lower(), "text": text}
async with self.session.post(
self.base_url + "/webdriver/deepl", json=data, headers=self.AUTHED_HEADERS
self.base_url + "/webdriver/deepl", json=data, headers=self.authed_headers
) as r:
data = await r.json()
if not r.ok:
raise ArtemisError(f"DeepL Error: `{data.get('error', 'Unknown')}`")
raise ArtemisError(f"DeepL Error: {data.get('error', 'Unknown')}")
return DeepLResult(**data)

View File

@ -10,6 +10,7 @@ class Keys:
github: str
cloudflare: str
openai: str
deepl: str
@dataclass

121
artemis/utils/deepl.py Normal file
View File

@ -0,0 +1,121 @@
from __future__ import annotations
from dataclasses import dataclass
from functools import cached_property
from typing import TYPE_CHECKING
import httpx
from artemis.utils.common import ArtemisError
if TYPE_CHECKING:
from ..bot import Artemis
class DeepLError(ArtemisError):
    """Raised when a DeepL API call fails or returns nothing usable."""
@dataclass
class Translation:
    """A single translated text as returned by the DeepL translate endpoint."""

    # Source language detected by DeepL, or the one the caller requested
    # when the response carries no detection.
    src: str
    # The translated text.
    translation: str
    # Number of characters DeepL billed for this request.
    billed_characters: int
@dataclass
class Usage:
    """Character usage figures as returned by the DeepL usage endpoint."""

    # Characters consumed so far.
    character_count: int
    # Maximum number of characters allowed.
    character_limit: int
class DeepL:
    """Minimal async client for the DeepL Free REST API (v2).

    Requests go through the bot's shared ``httpx`` session. Every failure is
    reported as a ``DeepLError`` so callers can fall back to another
    translation method.
    """

    API_URL = "https://api-free.deepl.com/v2"

    session: httpx.AsyncClient
    api_key: str
    headers: dict[str, str]
    # Latched when the API answers HTTP 456 (quota exceeded) so later
    # translate() calls fail fast without a network round-trip. Refreshed by
    # usage(), which is the only place the real counters are known.
    over_quota: bool = False

    def __init__(self, bot: Artemis, api_key: str):
        """Bind the client to the bot's httpx session and build auth headers."""
        self.session = bot.httpx_session
        self.api_key = api_key
        self.headers = {
            "User-Agent": bot.real_user_agent,
            "Authorization": f"DeepL-Auth-Key {api_key}",
        }

    async def translate(
        self, text: str, source_lang: str | None = None, target_lang: str = "en"
    ) -> Translation:
        """Translate ``text`` into ``target_lang``.

        Args:
            text: The text to translate.
            source_lang: Source language code. ``None`` or ``"auto"`` (any
                case) omits the field so DeepL detects the language itself.
            target_lang: Target language code.

        Returns:
            The first translation in the response, with the detected (or
            requested) source language and the billed character count.

        Raises:
            DeepLError: If the quota is exhausted, the API answers with a
                non-success status, or the response holds no translation.
        """
        if self.over_quota:
            raise DeepLError("DeepL API quota exceeded.")

        payload = {
            "text": [text],
            "target_lang": target_lang,
            "formality": "prefer_less",
            "show_billed_characters": True,
        }
        if source_lang and source_lang.lower() != "auto":
            payload["source_lang"] = source_lang

        r = await self.session.post(
            f"{self.API_URL}/translate", json=payload, headers=self.headers
        )
        if not r.is_success:
            if r.status_code == 456:
                self.over_quota = True
                raise DeepLError("DeepL API quota exceeded.")
            raise DeepLError(f"DeepL API returned an error: {r.status_code} {r.reason_phrase}")

        # Use .get() so a response without the expected key is reported as a
        # DeepLError instead of leaking a bare KeyError.
        translations = r.json().get("translations")
        if not translations:
            raise DeepLError("DeepL API returned no translations.")

        result = translations[0]
        detected_lang = result.get("detected_source_language") or source_lang
        # A missing billing figure must not discard a successful translation;
        # callers only display the count when it is truthy.
        billed_characters = result.get("billed_characters", 0)
        return Translation(detected_lang, result["text"], billed_characters)

    async def usage(self) -> Usage:
        """Fetch the current character usage and limit.

        Also refreshes ``over_quota`` from the returned counters, so a latch
        set by an earlier HTTP 456 is cleared once the counters show room again.

        Raises:
            DeepLError: If the API answers with a non-success status.
        """
        r = await self.session.get(f"{self.API_URL}/usage", headers=self.headers)
        if not r.is_success:
            raise DeepLError(f"DeepL API returned an error: {r.status_code} {r.reason_phrase}")

        data = r.json()
        # Pick the two known fields explicitly: Usage(**data) would raise
        # TypeError as soon as the response carries any additional key.
        usage = Usage(
            character_count=data["character_count"],
            character_limit=data["character_limit"],
        )
        self.over_quota = usage.character_count >= usage.character_limit
        return usage

    @cached_property
    def languages(self):
        """Supported two-letter language codes (lower-case) mapped to English names."""
        return {
            "bg": "Bulgarian",
            "cs": "Czech",
            "da": "Danish",
            "de": "German",
            "el": "Greek",
            "en": "English",
            "es": "Spanish",
            "et": "Estonian",
            "fi": "Finnish",
            "fr": "French",
            "hu": "Hungarian",
            "id": "Indonesian",
            "it": "Italian",
            "ja": "Japanese",
            "ko": "Korean",
            "lt": "Lithuanian",
            "lv": "Latvian",
            "nb": "Norwegian",
            "nl": "Dutch",
            "pl": "Polish",
            "pt": "Portuguese",
            "ro": "Romanian",
            "ru": "Russian",
            "sk": "Slovak",
            "sl": "Slovenian",
            "sv": "Swedish",
            "tr": "Turkish",
            "uk": "Ukrainian",
            "zh": "Chinese",
        }

View File

@ -1,91 +0,0 @@
import json
import time
from base64 import b64decode
from typing import Optional
from urllib.parse import quote
from aiohttp import ClientSession
from yt_dlp.utils import random_user_agent
from . import utils
class uNoGSError(Exception):
    """Raised when the uNoGS client is asked for something it cannot do."""
class uNoGS:
    """Small async client for the unogs.com API.

    An access token is requested lazily before the first call and again
    whenever the stored expiry time has passed.
    """

    token: Optional[str]
    token_expiry: Optional[int]

    _API_BASE = "https://unogs.com/api"
    # Filter parameters that search() always sends as empty strings.
    _EMPTY_PARAMS = [
        "country_andorunique",
        "start_year",
        "end_year",
        "start_rating",
        "end_rating",
        "genrelist",
        "type",
        "audio",
        "subtitle",
        "audiosubtitle_andor",
        "person",
        "filterby",
        "orderby",
    ]
    _COUNTRY_LIST = "21,23,26,29,33,36,307,45,39,327,331,334,265,337,336,269,267,357,378,65,67,390,392,268,400,402,408,412,447,348,270,73,34,425,432,436,46,78"
    # NOTE: the user agent is picked once, when the class body is executed.
    _DEFAULT_HEADERS = {
        "User-Agent": random_user_agent(),
        "Referer": "https://unogs.com",
        "Referrer": "http://unogs.com",
    }
    # Detail kinds accepted by fetch_details().
    _DETAILS = ["detail", "bgimages", "genres", "people", "countries", "episodes"]

    def __init__(self, session: ClientSession):
        """Store the shared aiohttp session; no token is fetched yet."""
        self.session: ClientSession = session
        self.token = None
        self.token_expiry = None

    @staticmethod
    def _parse_token_expiry(token: str) -> int:
        """Return the ``exp`` value from a ``header.payload.signature`` token.

        The payload segment is decoded as unpadded base64url (the JWT
        encoding): ``-``/``_`` are mapped back to ``+``/``/`` and the missing
        padding is restored, so no character is silently dropped.
        """
        payload = token.split(".")[1]
        payload += "=" * (-len(payload) % 4)
        decoded = b64decode(payload, altchars=b"-_").decode()
        return json.loads(decoded)["exp"]

    async def _validate_token(self):
        """Fetch a new token when none is stored or the stored one has expired."""
        if not self.token or self.token_expiry is None or self.token_expiry < utils.time():
            await self._fetch_token()

    async def _fetch_token(self):
        """Request a fresh access token and remember it with its expiry."""
        data = {"user_name": round(time.time(), 3)}
        async with self.session.post(
            self._API_BASE + "/user", headers=self._DEFAULT_HEADERS, data=data
        ) as r:
            data = await r.json()

        token = data["token"]["access_token"]
        # Parse the expiry before storing anything, so a malformed token can
        # never leave `token` set while `token_expiry` is still None.
        expiry = self._parse_token_expiry(token)
        self.token = token
        self.token_expiry = expiry

    async def _request(self, path: str, **kwargs):
        """GET ``path`` below the API base with a valid bearer token; return the JSON body."""
        await self._validate_token()
        headers = {**self._DEFAULT_HEADERS, "Authorization": f"Bearer {self.token}"}
        # NOTE(review): the cookie value is the literal string "token", not
        # self.token - left as-is; confirm whether that is intentional.
        cookies = {"authtoken": "token"}
        async with self.session.get(
            self._API_BASE + path, headers=headers, cookies=cookies, **kwargs
        ) as r:
            return await r.json()

    async def search(self, query: str):
        """Search titles matching ``query`` (first 20 results) and return the JSON response."""
        params = {
            "limit": "20",
            "offset": "0",
            # NOTE(review): the query is percent-quoted here and is also passed
            # through the session's own parameter encoding - kept unchanged.
            "query": quote(query),
            "countrylist": self._COUNTRY_LIST,
        }
        for param in self._EMPTY_PARAMS:
            params[param] = ""
        return await self._request("/search", params=params)

    async def fetch_details(self, nfid, kind="detail"):
        """Fetch one kind of detail for the title with Netflix id ``nfid``.

        Raises:
            uNoGSError: If ``kind`` is not one of ``_DETAILS``.
        """
        if kind not in self._DETAILS:
            raise uNoGSError("Incorrect detail kind.")
        return await self._request(f"/title/{kind}", params={"netflixid": nfid})

View File

@ -24,3 +24,4 @@ h2
aiogoogletrans
setuptools
git+https://github.com/Suyash458/WiktionaryParser
langdetect