# Files
# kwaylon/robopage.py
#
# 169 lines
# 6.2 KiB
# Python

import logging
import os
import re
from typing import Union
import discord
import pandas as pd
from discord import RawReactionActionEvent, RawReactionClearEmojiEvent
from dotenv import load_dotenv
import data
import jokes
# Numeric ID constant; not referenced anywhere in the visible part of this
# file (presumably a Discord user ID -- confirm against the other modules).
LIL_STINKY_ID = 704_043_422_276_780_072

# Configure the root logger for INFO-and-above, then grab this module's logger.
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
class RoboPage(discord.Client):
    """Discord client that records messages, answers emoji queries and runs jokes.

    Message history is kept in ``self.data`` (a ``data.MsgData``), which only
    exists once :meth:`handle_ready` has completed; handlers check for it with
    ``hasattr`` before using it.  Two chat queries are recognised:

    * ``most <emoji> leaderboard`` / ``most <emoji> ... <N> days``
    * ``who is the most <emoji> [in the past <N> days]?``
    """

    # Path handed to ``MsgData.to_sql`` after the history has been loaded.
    db_path: str = 'messages.db'

    def __init__(self, *args, **kwargs):
        """Instantiate the joke responders and compile the query patterns.

        All positional and keyword arguments are forwarded to
        ``discord.Client``.
        """
        super().__init__(*args, **kwargs)

        # Every attribute of ``jokes`` named ``*Joke`` is instantiated, except
        # these two (presumably the abstract base classes -- confirm).
        skipped = ('Joke', 'GifJoke')
        self.jokes = [
            getattr(jokes, name)()
            for name in dir(jokes)
            if name.endswith('Joke') and name not in skipped
        ]

        # Flags must be OR-ed: ``IGNORECASE & UNICODE`` evaluates to 0, which
        # silently compiled both patterns as case-sensitive.
        flags = re.IGNORECASE | re.UNICODE
        self.most_regex = re.compile(
            r"^who is the most\s+(?P<emoji>\S+)\s*?(?:in the past (?P<days>\d+) days)?\??$",
            flags,
        )
        self.leaderboard_regex = re.compile(
            r'^most (?P<emoji>\S+) (leaderboard|((?:.+?(?P<days>\d+) days)))',
            flags,
        )

    async def handle_ready(self) -> None:
        """Load the message history into ``self.data`` and save it to ``db_path``."""

        async def alive():
            # Startup announcement; currently disabled (see the commented-out
            # call at the end of this method).
            channel: discord.TextChannel = discord.utils.get(self.get_all_channels(), name='robotics-facility')
            await channel.send(f"I'm aliiiiiive {discord.utils.get(self.emojis, name='kaylon')}")

        # The exact meaning of ``limit``/``days`` is defined by
        # ``MsgData.create``, which is outside this file.
        self.data: data.MsgData = await data.MsgData.create(
            client=self,
            limit=5000,
            days=30,
            # limit=100,
            # days=14,
        )
        self.data.to_sql(self.db_path)
        LOGGER.info(f'{self.data.msgs.shape[0]} messages total')
        # await alive()

    async def handle_message(self, message) -> None:
        """Record an incoming message, answer emoji queries, then run the jokes.

        Messages written by the bot itself are ignored entirely.
        """
        if message.author == self.user:
            return

        if hasattr(self, 'data'):
            await self.data.add_msg(message)
            if (m := self.leaderboard_regex.match(message.content)) is not None:
                try:
                    await message.reply(await self.leaderboard(match=m))
                except KeyError as e:
                    LOGGER.exception(e)
                    await message.reply(f"I couldn't find any {m.group('emoji')} reactions. Leave me alone!")
                    # NOTE(review): indentation was lost in the copy this was
                    # reconstructed from; this return is assumed to belong to
                    # the error handler -- confirm against the repository.
                    return
            elif (m := self.most_regex.match(message.content)) is not None:
                try:
                    await message.reply(await self.biggest_single(match=m))
                except Exception as e:
                    # Deliberate catch-all: any failure is answered in chat.
                    LOGGER.exception(e)
                    await message.reply('NObody')
        else:
            LOGGER.warning('No self.data attribute')

        for joke in self.jokes:
            if (scan_res := joke.scan(message)):
                LOGGER.info(f'{joke.__class__.__name__} detected: {message.content}, {scan_res.group()}')
                await joke.respond(message, self, scan_res)

    async def handle_raw_reaction(self, payload: Union[RawReactionActionEvent, RawReactionClearEmojiEvent]) -> None:
        """Log a reaction add/remove and refresh the stored reactions of its message."""
        LOGGER.info(payload)

        # Use ``self`` rather than the ``client`` global, which is only defined
        # when this file is executed as a script.
        guild = await self.fetch_guild(payload.guild_id)
        channel = await guild.fetch_channel(payload.channel_id)
        message = await channel.fetch_message(payload.message_id)

        if payload.event_type == 'REACTION_REMOVE':
            LOGGER.info(f'{payload.emoji} removed from\n{message.author}: {message.content}')
        elif payload.event_type == 'REACTION_ADD':
            LOGGER.info(
                f'{payload.member.display_name} added {payload.emoji} to\n'
                f'{message.author.display_name}: {message.content}')

        if hasattr(self, 'data'):
            await self.data.update_reaction(msg=message)

    async def leaderboard(self, match: re.Match) -> str:
        """Format the per-user totals for one emoji as a chat message.

        Args:
            match: A match of ``leaderboard_regex``; group ``emoji`` is
                required, group ``days`` is optional and defaults to 14.

        Returns:
            A header line followed by one back-ticked line per entry of the
            counts returned by ``MsgData.emoji_user_counts``.
        """
        emoji_name = get_emoji_name(match.group('emoji'))
        days = int(match.group('days') or 14)
        counts = await self.data.emoji_user_counts(client=self,
                                                   emoji_name=emoji_name,
                                                   days=days)
        # ``default`` keeps an empty result from raising ValueError, which the
        # caller does not catch.
        width = max((len(str(s)) for s in counts.index.values), default=0)
        res = f'{match.group("emoji")} totals, past {days} days\n'
        # ``Series.items`` replaces ``iteritems``, which pandas 2.0 removed.
        res += '\n'.join(f"`{str(name).ljust(width + 1)}with {cnt:<2.0f} total`"
                         for name, cnt in counts.items())
        return res

    async def biggest_single(self, match: re.Match) -> str:
        """Name the top user for one emoji and list their top three messages.

        Args:
            match: A match of ``most_regex``; group ``emoji`` is required,
                group ``days`` is optional and defaults to 14.

        Returns:
            The user mention with their total, followed by the output of
            :meth:`worst_offsenses`.

        Raises:
            IndexError: If ``MsgData.emoji_totals`` returns an empty series
                (``handle_message`` turns any exception into a chat reply).
        """
        days = int(match.group('days') or 14)
        # Named ``totals`` so the imported ``data`` module is not shadowed.
        totals: pd.Series = self.data.emoji_totals(
            emoji_name=get_emoji_name(match.group('emoji')),
            days=days
        )
        # Positional argument: ``user_id`` is positional-only in discord.py 2.x.
        user: discord.User = await self.fetch_user(totals.index[0])
        LOGGER.info(f'User: {user.mention}')
        msg = f'{user.mention} with {totals.iloc[0]:.0f}x {match.group("emoji")} over the past {days} days'
        msg += '\n' + await self.worst_offsenses(user=user, days=days, top=3, emoji_str=match.group('emoji'))
        return msg

    async def worst_offsenses(self, user: discord.User, days: int, top: int, emoji_str: str) -> str:
        """List the ``top`` messages by ``user`` with the most ``emoji_str`` reactions.

        Args:
            user: Author whose messages are listed (matched on ``user.id``).
            days: Look-back window passed to ``MsgData.emoji_messages``.
            top: Maximum number of messages to list.
            emoji_str: Emoji exactly as typed; echoed in the output and
                normalised with ``get_emoji_name`` for the lookup.

        Returns:
            A header plus count/link pairs, or a "No ... for ..." line when the
            user has no matching messages.
        """
        cdf = self.data.emoji_messages(get_emoji_name(emoji_str), days=days)
        cdf = cdf[cdf['user id'] == user.id].sort_values('count', ascending=False).iloc[:top]
        if cdf.shape[0] == 0:
            return f'No {emoji_str} for {user} in the past {days} days'

        res = f'Top {top} {emoji_str}\n'
        res += '\n'.join(
            f'{emoji_str}x{row["count"]:.0f}\n{row["link"]}' for _, row in cdf.iterrows())
        return res
def get_emoji_name(string: str) -> str:
    """Return the lower-cased, stripped emoji name contained in ``string``.

    Custom emoji markup of the form ``<:name:id>`` -- or ``<a:name:id>`` for
    animated emoji -- is reduced to its ``name`` part.  Any other text (for
    example a plain unicode emoji) is used as-is.

    Args:
        string: Emoji text as it appears in a message.

    Returns:
        The emoji name, lower-cased with surrounding whitespace removed.
    """
    # Raw string avoids invalid-escape warnings; ``a?`` also accepts the
    # animated variant, which previously fell through as raw markup.
    if (m := re.search(r'<a?:(?P<name>\w+):(?P<id>\d+)>', string)):
        string = m.group('name')
    return string.lower().strip()
if __name__ == '__main__':
    # Script entry point: build the bot, attach the gateway event handlers
    # and start it with the token from the environment / .env file.
    client = RoboPage()

    async def on_ready():
        """Delegate the ready event to the bot."""
        await client.handle_ready()

    async def on_message(message: discord.Message):
        """Delegate every incoming message to the bot."""
        await client.handle_message(message)

    async def on_raw_reaction_add(payload: RawReactionActionEvent):
        """Delegate an added reaction to the bot."""
        await client.handle_raw_reaction(payload)

    async def on_raw_reaction_remove(payload: RawReactionActionEvent):
        """Delegate a removed reaction to the bot."""
        await client.handle_raw_reaction(payload)

    # Same registration the ``@client.event`` decorator performs, written out
    # as explicit calls.
    for _handler in (on_ready, on_message, on_raw_reaction_add, on_raw_reaction_remove):
        client.event(_handler)

    load_dotenv()
    client.run(os.getenv('DISCORD_TOKEN'))