"""Discord bot that tracks emoji reactions and answers leaderboard queries.

The bot keeps a message/reaction store (``data.MsgData``), replies to
"most <emoji> leaderboard" and "who is the most <emoji>" style questions,
and lets every joke class found in the ``jokes`` module react to messages.
"""
import logging
import os
import re
from typing import Union

import discord
import pandas as pd
from discord import RawReactionActionEvent, RawReactionClearEmojiEvent
from dotenv import load_dotenv

import data
import jokes

logging.basicConfig(level=logging.INFO)

LIL_STINKY_ID = 704043422276780072
LOGGER = logging.getLogger(__name__)


class RoboPage(discord.Client):
    """Discord client that records messages/reactions and answers emoji stats."""

    # Path handed to ``MsgData.to_sql`` once the initial history is loaded.
    db_path: str = 'messages.db'

    def __init__(self, *args, **kwargs):
        """Instantiate the joke handlers and compile the query patterns.

        All positional and keyword arguments are forwarded to
        ``discord.Client``.
        """
        super().__init__(*args, **kwargs)

        # Every attribute of ``jokes`` whose name ends in "Joke" is treated as
        # a concrete joke class, except the two excluded names.
        excluded = ('Joke', 'GifJoke')
        self.jokes = [
            getattr(jokes, name)()
            for name in dir(jokes)
            if name.endswith('Joke') and name not in excluded
        ]

        # "who is the most <emoji> [in the past <days> days]?"
        self.most_regex = re.compile(
            r"^who is the most\s+(?P<emoji>\S+)\s*?(?:in the past (?P<days>\d+) days)?\??$",
            re.IGNORECASE,
        )
        # "most <emoji> leaderboard" or "most <emoji> ... <days> days"
        self.leaderboard_regex = re.compile(
            r'^most\s*?(?P<emoji>\S+?)\s*?(leaderboard|((?:.+?(?P<days>\d+) days)))',
            re.IGNORECASE,
        )

    async def handle_ready(self):
        """Load the message history into ``self.data`` and persist it."""

        async def alive():
            # Startup announcement; currently not invoked (see the end of
            # this method).
            channel: discord.TextChannel = discord.utils.get(
                self.get_all_channels(), name='robotics-facility')
            await channel.send(
                f"I'm aliiiiiive {discord.utils.get(self.emojis, name='kaylon')}")

        self.data: data.MsgData = await data.MsgData.create(
            client=self,
            limit=5000,
            days=30,
            # limit=100,
            # days=14,
        )
        self.data.to_sql(self.db_path)
        LOGGER.info('%s messages total', self.data.msgs.shape[0])
        # await alive()

    async def handle_message(self, message):
        """Record *message*, answer stats queries, then run the joke handlers.

        Messages written by the bot itself are ignored. A leaderboard query
        is answered and ends processing; a "who is the most" query is
        answered and the jokes still get to scan the message.
        """
        # NOTE(review): nesting was reconstructed from a whitespace-mangled
        # source; the joke loop is assumed to sit inside the
        # "not my own message" guard -- confirm against the original file.
        if message.author == self.user:
            return

        if hasattr(self, 'data'):
            await self.data.add_msg(message)
            content = message.content
            if (m := self.leaderboard_regex.match(content)) is not None:
                try:
                    reply = await self.leaderboard(match=m)
                except KeyError as e:
                    LOGGER.exception(e)
                    reply = (f"I couldn't find any {m.group('emoji')} reactions. "
                             "Leave me alone!")
                await message.reply(reply)
                return
            if (m := self.most_regex.match(content)) is not None:
                try:
                    reply = await self.biggest_single(match=m)
                except Exception as e:
                    # Any lookup failure (e.g. no matching reactions) is
                    # logged and answered with the fallback reply.
                    LOGGER.exception(e)
                    reply = 'NObody'
                await message.reply(reply)
        else:
            # handle_ready has not finished populating the store yet.
            LOGGER.warning('No self.data attribute')

        for joke in self.jokes:
            if (m := joke.scan(message)) is not None:
                LOGGER.info('%s detected: %s, %s',
                            joke.__class__.__name__, message.content, m.group())
                await joke.respond(message, self, m)

    async def handle_raw_reaction(
            self,
            payload: Union[RawReactionActionEvent, RawReactionClearEmojiEvent]):
        """Log a reaction add/remove and refresh the stored reaction counts.

        The affected message is fetched through this client (not a
        module-level global) so the class also works when imported.
        """
        LOGGER.info(payload)
        guild = await self.fetch_guild(payload.guild_id)
        channel = await guild.fetch_channel(payload.channel_id)
        message = await channel.fetch_message(payload.message_id)

        if payload.event_type == 'REACTION_REMOVE':
            LOGGER.info('%s removed from\n%s: %s',
                        payload.emoji, message.author, message.content)
        elif payload.event_type == 'REACTION_ADD':
            LOGGER.info('%s added %s to\n%s: %s',
                        payload.member.display_name, payload.emoji,
                        message.author.display_name, message.content)

        if hasattr(self, 'data'):
            await self.data.update_reaction(msg=message)

    async def leaderboard(self, match: re.Match) -> str:
        """Build the per-user totals table for the emoji captured in *match*.

        *match* must expose the ``emoji`` and ``days`` groups; ``days``
        falls back to 14 when it did not participate in the match.
        """
        emoji_name = get_emoji_name(match.group('emoji'))
        days = int(match.group('days') or 14)
        counts = await self.data.emoji_user_counts(
            client=self, emoji_name=emoji_name, days=days)

        # ``default`` keeps an empty result from raising ValueError in max().
        width = max((len(str(label)) for label in counts.index.values), default=0)
        header = f'{match.group("emoji")} totals, past {days} days\n'
        # Series.items() replaces iteritems(), which pandas 2.0 removed.
        rows = '\n'.join(
            f"`{str(name).ljust(width + 1)}with {cnt:<2.0f} total`"
            for name, cnt in counts.items())
        return header + rows

    async def biggest_single(self, match: re.Match) -> str:
        """Name the user with the highest total for the emoji in *match*.

        The first entry of ``emoji_totals`` is reported -- presumably the
        series is sorted descending; verify in ``data.MsgData``. An empty
        series raises ``IndexError``, which the caller handles.
        """
        days = int(match.group('days') or 14)
        emoji_str = match.group('emoji')
        # Named ``totals`` so the imported ``data`` module is not shadowed.
        totals: pd.Series = self.data.emoji_totals(
            emoji_name=get_emoji_name(emoji_str), days=days)

        # Positional argument: ``user_id`` is positional-only in discord.py 2.x.
        user: discord.User = await self.fetch_user(totals.index[0])
        LOGGER.info('User: %s', user.mention)

        msg = f'{user.mention} with {totals.iloc[0]:.0f}x {emoji_str} over the past {days} days'
        msg += '\n' + await self.worst_offsenses(
            user=user, days=days, top=3, emoji_str=emoji_str)
        return msg

    async def worst_offsenses(self, user: discord.User, days: int, top: int,
                              emoji_str: str) -> str:
        """List up to *top* of *user*'s messages with the most *emoji_str* reactions."""
        cdf = self.data.emoji_messages(get_emoji_name(emoji_str), days=days)
        cdf = (cdf[cdf['user id'] == user.id]
               .sort_values('count', ascending=False)
               .iloc[:top])

        if cdf.shape[0] == 0:
            return f'No {emoji_str} for {user} in the past {days} days'

        entries = '\n'.join(
            f'{emoji_str}x{row["count"]:.0f}\n{row["link"]}'
            for _, row in cdf.iterrows())
        return f'Top {top} {emoji_str}\n' + entries


def get_emoji_name(string: str) -> str:
    """Return the lower-cased, stripped emoji name contained in *string*.

    A custom-emoji mention of the form ``<:name:123>`` is reduced to
    ``name``; any other input is only lower-cased and stripped.
    """
    if m := re.search(r'<:(?P<name>\w+):(?P<id>\d+)>', string):
        string = m.group('name')
    return string.lower().strip()


if __name__ == '__main__':
    client = RoboPage()

    @client.event
    async def on_ready():
        await client.handle_ready()

    @client.event
    async def on_message(message: discord.Message):
        await client.handle_message(message)

    @client.event
    async def on_raw_reaction_add(payload: RawReactionActionEvent):
        await client.handle_raw_reaction(payload)

    @client.event
    async def on_raw_reaction_remove(payload: RawReactionActionEvent):
        await client.handle_raw_reaction(payload)

    load_dotenv()
    token = os.getenv('DISCORD_TOKEN')
    if not token:
        # Fail with a clear message instead of passing None to client.run().
        raise SystemExit('DISCORD_TOKEN is not set (environment or .env file)')
    client.run(token)