"""Collect Discord messages and custom-emoji reactions into pandas frames.

The module keeps two tables (messages and reactions), can persist them to
SQLite, and derives a "cancellation" leaderboard from reactions whose emoji
name is ``cancelled``.
"""
import asyncio
import logging
import os
import sqlite3
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Iterable, Optional

import discord
import pandas as pd
from discord.raw_models import RawReactionActionEvent
from dotenv import load_dotenv

LOGGER = logging.getLogger(__name__)

# Column layouts of the row dicts produced by ``message_dict`` and
# ``reaction_dict``; used to build correctly-shaped frames from empty input.
_MSG_COLUMNS = [
    'object', 'id', 'created', 'display_name', 'user id',
    'message', 'channel', 'channel link', 'link',
]
_REACTION_COLUMNS = ['msg id', 'emoji', 'emoji id', 'count']


def _open_connection(db):
    """Return ``(connection, owned)`` for a SQLite path or open connection.

    ``owned`` is True when the connection was opened here and therefore must
    be closed by the caller; a connection passed in is left to its owner.

    Raises:
        TypeError: if ``db`` is neither a path nor a ``sqlite3.Connection``.
    """
    if isinstance(db, sqlite3.Connection):
        return db, False
    if isinstance(db, (str, Path)):
        return sqlite3.connect(db), True
    raise TypeError(
        f'db must be a path or sqlite3.Connection, not {type(db).__name__}'
    )


def _category_name(channel) -> str:
    """Return the channel's category name, or '' when it has no category."""
    category = channel.category
    return category.name if category is not None else ''


class MsgData:
    """Message and reaction tables guarded by an asyncio lock.

    ``msgs`` is indexed by message id; ``reactions`` is indexed by
    ``('msg id', 'emoji')``.
    """

    msgs: pd.DataFrame
    reactions: pd.DataFrame
    lock: asyncio.Lock

    @classmethod
    async def create(cls, client: discord.Client, **kwargs):
        """Build an instance by reading message history through ``client``.

        ``kwargs`` are forwarded to ``message_df`` / ``message_gen``.
        """
        self = cls()
        self.msgs = await message_df(client, **kwargs)
        self.msgs = self.msgs.sort_values('created')
        self.reactions = await reaction_df(self.msgs['object'].tolist())
        self.lock = asyncio.Lock()
        return self

    @classmethod
    def from_sql(cls, db):
        """Load an instance from the ``msgs`` and ``reactions`` SQL tables.

        Args:
            db: path to a SQLite file, or an open ``sqlite3.Connection``.

        Raises:
            TypeError: if ``db`` is of any other type.
        """
        con, owned = _open_connection(db)
        try:
            self = cls()
            self.msgs = pd.read_sql('select * from msgs', con=con, index_col='id')
            self.msgs['created'] = self.msgs['created'].apply(pd.to_datetime, utc=True)
            self.reactions = (
                pd.read_sql('select * from reactions', con)
                .set_index(['msg id', 'emoji'])
            )
        finally:
            # Only close connections opened here; the caller owns theirs.
            if owned:
                con.close()
        # add_msg / update_reaction need the lock on loaded instances too.
        self.lock = asyncio.Lock()
        return self

    def to_sql(self, db):
        """Write both tables to SQLite, replacing any existing tables.

        Args:
            db: path to a SQLite file, or an open ``sqlite3.Connection``.

        Raises:
            TypeError: if ``db`` is of any other type.
        """
        con, owned = _open_connection(db)
        try:
            # The 'object' column holds live discord objects and is absent on
            # instances loaded via from_sql, hence errors='ignore'.
            self.msgs.drop(columns='object', errors='ignore').to_sql(
                name='msgs',
                con=con,
                if_exists='replace',
                index=True,
                index_label=self.msgs.index.name,
            )
            # No index_label: pandas then uses the MultiIndex level names,
            # which is what the original ``index.name`` (None) resulted in.
            self.reactions.to_sql(
                name='reactions',
                con=con,
                if_exists='replace',
                index=True,
            )
        finally:
            if owned:
                con.close()

    def __str__(self):
        return str(self.msgs) + '\n\n' + str(self.reactions)

    async def add_msg(self, message: discord.Message):
        """Insert (or overwrite) the row for ``message`` in ``msgs``."""
        async with self.lock:
            mdict = message_dict(message)
            mdict.pop('id')  # the id is the row label, not a column
            self.msgs.loc[message.id] = pd.Series(mdict)
        LOGGER.info(f'Added message id {message.id} from {message.author}: {message.content}')

    async def update_reaction(self, client: discord.Client, payload: RawReactionActionEvent):
        """Refresh or drop the reaction row named by a raw reaction event.

        The message is re-fetched; if it still carries a custom emoji with
        the payload's emoji name, that row is (re)written. Otherwise, on a
        ``REACTION_REMOVE`` event, the row is dropped.
        """
        if not isinstance(payload.emoji, discord.PartialEmoji):
            return

        # Positional arguments: accepted by discord.py 1.x and required by
        # 2.x, where these parameters are positional-only.
        chan: discord.TextChannel = await client.fetch_channel(payload.channel_id)
        msg: discord.Message = await chan.fetch_message(payload.message_id)
        idx = (msg.id, payload.emoji.name)

        for reaction in msg.reactions:
            if isinstance(reaction.emoji, discord.Emoji) and reaction.emoji.name == payload.emoji.name:
                reactions = pd.Series(await reaction_dict(reaction))
                async with self.lock:
                    self.reactions.loc[pd.IndexSlice[idx], :] = reactions
                    LOGGER.info(f'Added {str(idx)}, {int(self.reactions.loc[pd.IndexSlice[idx], "count"])} total')
                break
        else:
            # only reaches here if the remove action was to take off the
            # last reaction of that type
            if payload.event_type == 'REACTION_REMOVE':
                try:
                    async with self.lock:
                        self.reactions = self.reactions.drop(idx, axis=0)
                except KeyError:
                    LOGGER.info(f'{idx} not in index')
                else:
                    LOGGER.info(f'Dropped {idx}')

    def cancellations(self, days: int = 14):
        """Return messages with a ``cancelled`` reaction from the last ``days`` days."""
        return cancellations(msg_df=self.msgs, react_df=self.reactions, days=days)

    def cancellation_totals(self, days):
        """Return per-user cancellation totals for the last ``days`` days."""
        return cancelled_totals(cdf=self.cancellations(days=days))

    def cancellation_leaderboard(self, days, top: Optional[int] = None):
        """Format the cancellation totals as a text leaderboard.

        Args:
            days: size of the look-back window in days.
            top: if given, keep only the first ``top`` rows.

        Returns:
            A header line followed by one back-ticked line per user; just the
            header when there are no rows.
        """
        df = self.cancellation_totals(days)
        if top is not None:
            df = df.iloc[:top]
        # default=0 keeps an empty leaderboard from raising ValueError.
        width = max((len(str(s)) for s in df.index.values), default=0)
        res = f'Cancellation totals, past {days} days\n'
        res += '\n'.join(
            f"`{str(name).ljust(width + 1)}with {row['total']:<2.0f} total`"
            for name, row in df.iterrows()
        )
        return res


async def message_df(client: discord.Client, **kwargs):
    """Collect messages into a frame indexed by id, newest first.

    ``kwargs`` are forwarded to ``message_gen``.
    """
    rows = [message_dict(m) async for m in message_gen(client, **kwargs)]
    # Explicit columns so that an empty result still has an 'id' column.
    return (
        pd.DataFrame(rows, columns=_MSG_COLUMNS)
        .set_index('id')
        .sort_values('created', ascending=False)
    )


async def message_gen(client: discord.Client, limit=20, days: int = 90, **kwargs):
    """Yield messages from every text channel outside the 'Archive' category.

    Args:
        client: connected discord client.
        limit: per-channel ``history`` limit.
        days: look-back window used when no ``after`` keyword is supplied.
        **kwargs: forwarded to ``channel.history``.
    """
    channels = sorted(
        (
            c for c in client.get_all_channels()
            if isinstance(c, discord.TextChannel) and _category_name(c) != 'Archive'
        ),
        key=lambda c: (_category_name(c), c.name),
    )

    # Normalise 'after' once; it does not depend on the channel.
    # NOTE(review): datetime.today() is naive local time and an aware 'after'
    # is stripped without conversion - confirm against the timezone
    # convention of the installed discord.py version.
    if 'after' not in kwargs:
        kwargs['after'] = (datetime.today() - timedelta(days=days))
    elif isinstance((after := kwargs.get('after', None)), datetime):
        kwargs['after'] = after.replace(tzinfo=None)

    for channel in channels:
        LOGGER.info(f'{_category_name(channel)} #{channel.name}')
        async for msg in channel.history(limit=limit, **kwargs):
            yield msg


def message_dict(m: discord.Message) -> Dict:
    """Flatten a message into the row dict used for the ``msgs`` table."""
    created = m.created_at
    if created.tzinfo is None:
        # A naive created_at is documented as UTC by discord.py 1.x; without
        # this, astimezone() would interpret it as local time.
        created = created.replace(tzinfo=timezone.utc)
    return {
        'object': m,
        'id': m.id,
        'created': created.astimezone(),
        'display_name': m.author.display_name,
        'user id': m.author.id,
        'message': m.content,
        'channel': m.channel.name,
        'channel link': f'<#{m.channel.id}>',
        'link': m.jump_url,
    }


async def reaction_df(msgs: Iterable[discord.Message]):
    """Build the reactions frame, indexed by ``('msg id', 'emoji')``."""
    frames = [await reaction_series(msg) for msg in msgs]
    # Skip empty frames so they cannot influence column dtypes, and so that
    # "no reactions at all" does not hit pd.concat([]).
    frames = [frame for frame in frames if not frame.empty]
    if frames:
        combined = pd.concat(frames)
    else:
        combined = pd.DataFrame(columns=_REACTION_COLUMNS)
    return combined.set_index(['msg id', 'emoji'])


async def reaction_series(msg: discord.Message):
    """Return one row per custom-emoji reaction on ``msg`` (as a DataFrame)."""
    rows = [
        await reaction_dict(r)
        for r in msg.reactions
        if isinstance(r.emoji, discord.Emoji)
    ]
    return pd.DataFrame(rows, columns=_REACTION_COLUMNS)


async def reaction_dict(r: discord.Reaction) -> Dict:
    """Flatten a custom-emoji reaction into a row dict."""
    return {
        'msg id': r.message.id,
        'emoji': r.emoji.name,
        'emoji id': r.emoji.id,
        'count': int(r.count),
    }


def cancellations(msg_df, react_df, days: int = 10) -> pd.DataFrame:
    """Return recent messages that carry a ``cancelled`` reaction.

    Args:
        msg_df: messages frame indexed by message id.
        react_df: reactions frame indexed by ``('msg id', 'emoji')``.
        days: keep only messages created within this many days.

    Returns:
        The matching rows of ``msg_df`` plus a ``count`` column, sorted by
        ``count`` descending. Empty when nothing matches.
    """
    # get reactions with a cancellation emoji
    try:
        cancel_reactions = react_df.loc[pd.IndexSlice[:, 'cancelled'], :]
    except KeyError:
        # Nobody has been cancelled: hand back an empty, well-shaped frame.
        return msg_df.iloc[:0].assign(count=pd.Series(dtype='int64'))

    msg_ids = cancel_reactions.index.get_level_values(0)
    # Reactions may refer to messages that were never added to msg_df.
    known = msg_ids.isin(msg_df.index)
    cancel_msgs = msg_df.loc[msg_ids[known]].copy()
    cancel_msgs['count'] = cancel_reactions['count'].to_numpy()[known]

    # filter outdated messages
    cutoff = (datetime.today() - timedelta(days=days)).astimezone()
    cancel_msgs = cancel_msgs[cancel_msgs['created'] >= cutoff]
    return cancel_msgs.sort_values('count', ascending=False)


def cancelled_totals(cdf: pd.DataFrame) -> pd.DataFrame:
    """Aggregate cancellations per ``display_name``.

    Returns:
        A frame indexed by display name with columns ``total`` (summed
        count), ``max channel`` (channel with the largest summed count) and
        ``worst`` (a message link), sorted by ``total`` descending.
    """
    by_user = cdf.groupby('display_name')
    # Select the column before aggregating: summing whole rows would also
    # try to add up message objects and timestamps.
    totals = by_user['count'].sum().sort_values(ascending=False)
    max_channels = (
        cdf
        .groupby(['display_name', 'channel'])['count']
        .sum()
        .sort_values(ascending=False)
        .groupby(level=0)
        .apply(lambda gdf: gdf.idxmax()[1])
    )
    return pd.DataFrame({
        'total': totals,
        'max channel': max_channels,
        # NOTE(review): this is the lexicographically largest link per user,
        # as in the original - not necessarily the most-cancelled message.
        'worst': by_user['link'].max(),
    }).sort_values('total', ascending=False)


if __name__ == '__main__':
    client = discord.Client()
    logging.basicConfig(level=logging.INFO)

    @client.event
    async def on_ready():
        print(f'{client.user} has connected to Discord!')

    load_dotenv()
    client.run(os.getenv('DISCORD_TOKEN'))