This commit is contained in:
artie 2025-01-02 12:35:52 +01:00
parent 85d44ef56e
commit 2c60e09317
6 changed files with 3 additions and 415 deletions

View File

@ -1,7 +1,5 @@
from __future__ import annotations from __future__ import annotations
import json
import re
import time import time
from enum import Enum from enum import Enum
from io import BytesIO from io import BytesIO
@ -12,7 +10,6 @@ import discord
import feedparser import feedparser
import pendulum import pendulum
from anilist.async_client import Client as Anilist from anilist.async_client import Client as Anilist
from bs4 import BeautifulSoup
from discord.ext import commands from discord.ext import commands
from discord.utils import format_dt from discord.utils import format_dt

View File

@ -21,9 +21,6 @@ log = logging.getLogger("artemis")
TIKTOK_RE = re.compile( TIKTOK_RE = re.compile(
r"https://vm\.tiktok\.com/(\w+)|https://(?:www\.)?tiktok\.com/(@.+?/video/\d+)" r"https://vm\.tiktok\.com/(\w+)|https://(?:www\.)?tiktok\.com/(@.+?/video/\d+)"
) )
REDDIT_RE = re.compile(
r"https?:\/\/(?:www\.)?(?:old\.)?reddit\.com\/r\/\w+\/comments\/(?P<id>[a-zA-Z0-9]+)(?:\/)?(?:[^\s]*)?"
)
class Events(commands.Cog): class Events(commands.Cog):
@ -59,18 +56,6 @@ class Events(commands.Cog):
self.suppress_embeds(message, 0.1) self.suppress_embeds(message, 0.1)
return await message.reply(f"https://vm.dstn.to/{vid}") return await message.reply(f"https://vm.dstn.to/{vid}")
reddit_url = REDDIT_RE.search(content)
if reddit_url:
pid = reddit_url.group("id")
reddit_post = await self.bot.reddit.post(pid=pid)
if reddit_post:
self.suppress_embeds(message, 0.1)
embeds = await reddit_post.to_embed(message)
return await message.reply(embeds=embeds)
log.warn(f"Invalid Reddit post URL/ID: {reddit_url.group(0)}")
@commands.Cog.listener() @commands.Cog.listener()
async def on_message(self, message: discord.Message): async def on_message(self, message: discord.Message):
if message.author.bot: if message.author.bot:

View File

@ -1,10 +1,8 @@
from __future__ import annotations from __future__ import annotations
import http import http
import mimetypes
import random import random
import re import re
from io import BytesIO
from typing import TYPE_CHECKING, Optional, TypedDict from typing import TYPE_CHECKING, Optional, TypedDict
from urllib.parse import quote from urllib.parse import quote
@ -16,7 +14,7 @@ from discord.ext import commands
from .. import utils from .. import utils
from ..utils import config from ..utils import config
from ..utils.common import ArtemisError, read_json, trim from ..utils.common import ArtemisError, read_json, trim
from ..utils.views import DropdownView, ViewPages from ..utils.views import ViewPages
if TYPE_CHECKING: if TYPE_CHECKING:
from ..bot import Artemis from ..bot import Artemis
@ -55,71 +53,6 @@ class Funhouse(commands.Cog):
reddit = self.bot.get_command("reddit") reddit = self.bot.get_command("reddit")
return await reddit(ctx, subreddit) return await reddit(ctx, subreddit)
@commands.command()
async def cat(self, ctx: commands.Context):
"""Random cat picture."""
await ctx.typing()
async with self.bot.session.get("https://cataas.com/cat") as r:
ext = mimetypes.guess_extension(r.content_type)
image = discord.File(BytesIO(await r.read()), f"{utils.time()}.{ext}")
await ctx.send(file=image)
@commands.command()
async def dog(self, ctx: commands.Context):
"""Random dog picture."""
async with self.bot.session.get("https://random.dog/woof.json") as r:
json = await r.json(content_type=None)
await ctx.send(json["url"])
@commands.command()
async def fox(self, ctx: commands.Context):
"""Random fox picture."""
async with self.bot.session.get("https://randomfox.ca/floof/") as r:
json = await r.json(content_type=None)
await ctx.send(json["image"])
@commands.command()
async def waifu(self, ctx: commands.Context):
"""Random waifu (anime girl)."""
await self.invoke_reddit(ctx, "awwnime")
@commands.command()
async def husbando(self, ctx: commands.Context):
"""Random husbando (anime boy)."""
sub = random.choice(("cuteanimeboys", "bishounen"))
await self.invoke_reddit(ctx, sub)
@commands.command()
async def yuri(self, ctx: commands.Context):
"""Random yuri (anime lesbian couple) art."""
await self.invoke_reddit(ctx, "wholesomeyuri")
@commands.command()
async def neko(self, ctx: commands.Context):
"""Random neko (anime cat girl/boy)."""
db = self.bot.get_command("db")
await db(ctx, tags="cat_ears")
@commands.command()
@commands.is_nsfw()
async def ecchi(self, ctx: commands.Context):
"""
Random ecchi image.
NSFW channels only.
"""
db = self.bot.get_command("db")
await db(ctx, tags="rating:q score:>10")
@commands.command()
@commands.is_nsfw()
async def hentai(self, ctx: commands.Context):
"""
Random hentai image.
NSFW channels only.
"""
db = self.bot.get_command("db")
await db(ctx, tags="rating:e score:>10")
@commands.command() @commands.command()
async def hug(self, ctx: commands.Context, member: discord.Member): async def hug(self, ctx: commands.Context, member: discord.Member):
"""Hug someone.""" """Hug someone."""
@ -362,165 +295,6 @@ class Funhouse(commands.Cog):
img = random.choice(data) img = random.choice(data)
await ctx.reply(img["download_url"]) await ctx.reply(img["download_url"])
@commands.group(aliases=["ffxiv"])
async def xiv(self, ctx: commands.Context):
"""Final Fantasy XIV commands."""
if ctx.invoked_subcommand is None:
await ctx.send("Invalid subcommand passed.")
@xiv.command(aliases=["chara"])
async def character(self, ctx: commands.Context, *, query: str):
"""Search for player characters in all worlds."""
LODESTONE_URL = "https://eu.finalfantasyxiv.com/lodestone/character/"
await ctx.typing()
params = {"name": query, "columns": "ID,Name,Server"}
async with self.bot.session.get("https://xivapi.com/character/search", params=params) as r:
if not r.ok:
return await ctx.reply(f"XIV API Error: {r.status} {r.reason}")
data = await r.json()
characters = data["Results"]
if not characters:
return await ctx.reply("No results found.")
elif len(characters) == 1:
character = characters[0]
else:
view = DropdownView(ctx, characters, lambda x: x["Name"], lambda x: x["Server"])
character = await view.prompt("Which character?")
if not character:
return
await ctx.typing()
chid = character["ID"]
params = {
"columns": "Character.Name,Character.Avatar,Character.Portrait,Character.ActiveClassJob"
}
async with self.bot.session.get(f"https://xivapi.com/character/{chid}", params=params) as r:
if not r.ok:
return await ctx.reply(f"XIV API Error: {r.status} {r.reason}")
data = await r.json()
character = data["Character"]
name = character["Name"]
url = LODESTONE_URL + str(chid)
portrait_url = character["Portrait"]
avatar_url = character["Avatar"]
embed = discord.Embed(title=name, url=url, color=0x293C66)
embed.set_image(url=portrait_url)
embed.set_thumbnail(url=avatar_url)
embed.set_author(
name="The Lodestone",
icon_url="https://img.finalfantasyxiv.com/lds/h/0/U2uGfVX4GdZgU1jASO0m9h_xLg.png",
)
active_job = character["ActiveClassJob"]
job_name = active_job["Name"].title()
job_level = active_job["Level"]
embed.description = f"Level **{job_level}**\n{job_name}"
await ctx.reply(embed=embed)
@xiv.command()
async def item(self, ctx: commands.Context, *, query: str):
"""Search for items."""
await ctx.typing()
params = {"string": query, "columns": "Name,Url", "indexes": "item"}
async with self.bot.session.get("https://xivapi.com/search", params=params) as r:
if not r.ok:
return await ctx.reply(f"XIV API Error: {r.status} {r.reason}")
data = await r.json()
results = data["Results"]
if not results:
return await ctx.reply("No results found.")
elif len(results) == 1:
result = results[0]
else:
view = DropdownView(ctx, results, lambda x: x["Name"])
result = await view.prompt("Which item?")
if not result:
return
await ctx.typing()
url = "https://xivapi.com" + result["Url"]
params = {
"columns": "ClassJobCategory.Name,DamageMag,DamagePhys,DefenseMag,DefensePhys,DelayMs,Description,IconHD,ItemUICategory.Name,LevelEquip,LevelItem,Name,Rarity,Stats"
}
async with self.bot.session.get(url, params=params) as r:
if not r.ok:
return await ctx.reply(f"XIV API Error: {r.status} {r.reason}")
data = await r.json()
name = data["Name"]
category = data["ItemUICategory"]["Name"]
icon_url = "https://xivapi.com" + data["IconHD"]
# rarity = item_rarity[data["Rarity"]]
item_level = data["LevelItem"]
job_category = data["ClassJobCategory"]["Name"]
equip_level = data["LevelEquip"]
description = data["Description"].replace("\n\n\n\n", "\n\n")
mag_dmg = ("Magic Damage", int(data["DamageMag"]))
phys_dmg = ("Damage", int(data["DamagePhys"]))
dmg = max(mag_dmg, phys_dmg, key=lambda x: x[1])
mag_def = ("Magic Defense", int(data["DefenseMag"]))
phys_def = ("Defense", int(data["DefensePhys"]))
main_stats = [dmg, mag_def, phys_def]
main_stats = [stat for stat in main_stats if stat[1] > 0]
main_stats.sort(key=lambda x: x[0])
if int(data["DelayMs"]):
main_stats.append(("Delay", round(int(data["DelayMs"]) / 1000, 2)))
if data["Stats"]:
bonuses = [(re.sub("([A-Z]+)", r" \1", k), v["NQ"]) for k, v in data["Stats"].items()]
else:
bonuses = []
embed = discord.Embed(title=name, color=0x293C66)
embed.set_thumbnail(url=icon_url)
embed.set_author(
name="Eorzea Database",
icon_url="https://img.finalfantasyxiv.com/lds/h/0/U2uGfVX4GdZgU1jASO0m9h_xLg.png",
)
desc = f"{category}\nItem Level **{item_level}**\n\n{job_category or 'All Classes'}\nLv. **{equip_level}**\n\n"
if description:
desc += f"{description}\n\n"
for bonus in bonuses:
desc += f"{bonus[0]}: **+{bonus[1]}**\n"
embed.description = desc
for stat in main_stats:
embed.add_field(name=stat[0], value=stat[1])
await ctx.reply(embed=embed)
@xiv.command(aliases=["fr", "fashion"])
async def fashionreport(self, ctx: commands.Context):
"""Displays the latest Fashion Report requirements."""
headers = {"User-Agent": self.bot.real_user_agent}
await ctx.typing()
async with self.bot.session.get(
f"{config.api_base_url}/xiv/kaiyoko", headers=headers, allow_redirects=False
) as r:
title = r.headers.get("x-title")
embed = discord.Embed(title=title, color=0xE7DFCE)
embed.set_image(url=f"{config.api_base_url}/xiv/kaiyoko?includeMeta=false&t={utils.time()}")
await ctx.reply(embed=embed)
@commands.command(aliases=["fs"]) @commands.command(aliases=["fs"])
async def foalsay(self, ctx: commands.Context, *, query: str): async def foalsay(self, ctx: commands.Context, *, query: str):
""" """

View File

@ -3,8 +3,8 @@ from __future__ import annotations
import asyncio import asyncio
import re import re
from io import BytesIO from io import BytesIO
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING
from urllib.parse import quote, quote_plus, unquote from urllib.parse import quote, quote_plus
import discord import discord
import gtts import gtts

View File

@ -33,32 +33,6 @@ class Useful(commands.Cog):
def __init__(self, bot: Artemis): def __init__(self, bot: Artemis):
self.bot: Artemis = bot self.bot: Artemis = bot
@commands.command()
@commands.cooldown(1, 2, commands.BucketType.default)
async def bing(self, ctx: commands.Context, *, query: str):
"""
Bing Search.
Uses the RSS feed, useless for complex searches.
"""
await ctx.typing()
results = await utils.search_bing(ctx, query)
if not results:
return await ctx.reply("No results found.")
embed = discord.Embed(title=f"Search Results for '{query}'", color=0x1E5DD4)
embed.set_author(
name="Bing", icon_url="https://www.google.com/s2/favicons?domain=bing.com&sz=128"
)
for result in results[:5]:
embed.add_field(
name=result.title,
value=f"[{utils.trim(result.url, 50)}]({result.url})\n{utils.trim(result.description, 120)}",
inline=False,
)
await ctx.reply(embed=embed)
@commands.command(aliases=["char"]) @commands.command(aliases=["char"])
async def charinfo(self, ctx: commands.Context, *, characters: str): async def charinfo(self, ctx: commands.Context, *, characters: str):
"""Shows you information about a number of characters using unicode data lookup.""" """Shows you information about a number of characters using unicode data lookup."""

View File

@ -1,142 +0,0 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
import logging
from typing import TYPE_CHECKING, TypeVar
from collections import deque
from bs4 import BeautifulSoup
if TYPE_CHECKING:
from ..bot import Artemis
T = TypeVar("T")
class FeedNotifier:
NAME: str = "Base"
CHECK_INTERVAL: int | float = 60 * 5
FEED_INTERVAL: int | float = 0.1
CACHE_SIZE: int = 100
bot: Artemis
feeds: list[str]
_cache = dict[str, list[str]]
_task = asyncio.Task
def __init__(self, bot: Artemis, feeds: list[str]):
self.bot = bot
self.feeds = feeds
self._log = logging.getLogger(f"{self.NAME}Notifier")
self._cache = {}
for feed in self.feeds:
self._cache[feed] = deque([], maxlen=self.CACHE_SIZE)
def log(self, msg):
self._log.info(msg)
async def _run(self):
try:
await self._init_cache()
await asyncio.sleep(self.CHECK_INTERVAL)
self.log("Starting check loop...")
while True:
self.log("Processing feeds...")
for feed in self.feeds:
entries = await self.fetch_entries(feed)
for entry in entries:
key = self.get_cache_key(entry)
if key in self._cache[feed]:
continue
self.log(f"{feed}: New entry found, handing over to on_new_entry()")
self._cache[feed].append(key)
await self.on_new_entry(entry)
await asyncio.sleep(self.FEED_INTERVAL)
await asyncio.sleep(self.CHECK_INTERVAL)
except Exception as error:
await self.on_error(error)
async def _init_cache(self):
self.log("Bootstrapping cache...")
for feed in self.feeds:
self._cache[feed].extend(
[self.get_cache_key(entry) for entry in await self.fetch_entries(feed)]
)
self.log(f"{feed}: Bootstrapped cache with {len(self._cache[feed])} entries.")
def start(self):
self._task = asyncio.create_task(self._run())
self.log("Worker started.")
return self
def stop(self):
try:
self._task.cancel()
except asyncio.CancelledError:
pass
finally:
self.log("Worker stopped.")
def get_cache_key(self, entry: T) -> str:
raise NotImplementedError()
async def fetch_entries(self, feed: str) -> list[T]:
raise NotImplementedError()
async def on_new_entry(self, entry: T):
raise NotImplementedError()
async def on_error(self, error: Exception):
await self.send_to_user(
self.bot.owner_id, f"[{self.NAME}Notifier] {error.__class__.__name__}: {str(error)}"
)
async def fetch_html(self, url):
self.log(f"Fetching {url}")
headers = {"User-Agent": self.bot.user_agent}
async with self.bot.session.get(url, headers=headers) as r:
html = await r.text()
return BeautifulSoup(html, "lxml")
async def fetch_json(self, url) -> dict:
headers = {"User-Agent": self.bot.user_agent}
async with self.bot.session.get(url, headers=headers) as r:
return await r.json()
async def send_to_channel(self, channel_id: int, *args, **kwargs):
self.log(f"Sending new entry to channel {channel_id}.")
await self.bot.get_channel(channel_id).send(*args, **kwargs)
async def send_to_user(self, user_id: int, *args, **kwargs):
self.log(f"Sending new entry to user {user_id}.")
await self.bot.get_user(user_id).send(*args, **kwargs)
@dataclass
class HNEntry:
title: str
url: str
class HackerNewsNotifier(FeedNotifier):
NAME = "HackerNews"
CHECK_INTERVAL = 60
def get_cache_key(self, entry: HNEntry) -> str:
return entry.url
async def fetch_entries(self, feed: str) -> list[HNEntry]:
url = "https://news.ycombinator.com/" + feed
soup = await self.fetch_html(url)
articles = []
for article in soup.select("tr.athing"):
titleline = article.select_one("span.titleline > a")
url = titleline["href"]
title = titleline.text
articles.append(HNEntry(title, url))
return list(reversed(articles))
async def on_new_entry(self, entry: HNEntry):
await self.send_to_user(self.bot.owner_id, f"{entry.title}\n{entry.url}")