Files
kwaylon/msg.py

110 lines
3.3 KiB
Python

import logging
import os
from datetime import datetime, timedelta
from typing import Dict, Iterable
import discord
import pandas as pd
from dotenv import load_dotenv
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
def convert_emoji(emoji):
    """Rewrite ``emoji.name`` as pure ASCII and hand the same object back.

    A name made up only of ASCII characters is left alone. Any other name
    is replaced by its ``unicode-escape`` encoding, decoded back to ``str``.
    The argument is modified in place and then returned.
    """
    current = emoji.name
    if not current.isascii():
        emoji.name = current.encode('unicode-escape').decode('ascii')
    return emoji
async def message_df(client: discord.Client, **kwargs):
    """Collect messages into a DataFrame indexed by message id.

    Every message yielded by ``message_gen(client, **kwargs)`` is converted
    to a row with ``message_dict``. The rows are indexed by ``'id'`` and
    sorted by ``'created'``, newest first.

    Args:
        client: Discord client passed through to ``message_gen``.
        **kwargs: Forwarded unchanged to ``message_gen``.

    Returns:
        A ``pandas.DataFrame`` with one row per message. When no messages
        are yielded, an empty frame that still has the ``'id'`` index and a
        ``'created'`` column is returned.
    """
    rows = [message_dict(m) async for m in message_gen(client, **kwargs)]
    if not rows:
        # A frame built from an empty list has no 'id' column, so
        # set_index('id') would raise KeyError; build the empty result
        # explicitly instead.
        return pd.DataFrame(columns=['id', 'created']).set_index('id')
    return (
        pd.DataFrame(rows)
        .set_index('id')
        .sort_values('created', ascending=False)
    )
async def message_gen(client: discord.Client, limit=20, days: int = 90, **kwargs):
    """Yield messages from every non-archived text channel of the client.

    Only ``discord.TextChannel`` instances returned by
    ``client.get_all_channels()`` are considered. Channels whose category is
    named ``'Archive'`` are skipped. The remaining channels are visited in
    (category name, channel name) order. A channel without a category is
    treated as having an empty category name.

    Args:
        client: Discord client providing ``get_all_channels()``.
        limit: ``limit`` passed to each channel's ``history`` call.
        days: Look-back window in days, used only when no ``after`` keyword
            is supplied.
        **kwargs: Extra keyword arguments forwarded to ``channel.history``.
            A ``datetime`` passed as ``after`` has its ``tzinfo`` stripped
            before being forwarded.

    Yields:
        The messages of each channel's ``history``, one channel at a time.
    """
    def category_name(channel) -> str:
        # `category` is None for channels that sit outside any category;
        # dereferencing `.name` on it would raise AttributeError.
        category = channel.category
        return category.name if category is not None else ''

    # The `after` bound is the same for every channel, so settle it once
    # instead of re-checking on each loop iteration.
    # NOTE(review): replace(tzinfo=None) drops the offset without converting
    # first -- confirm which naive-datetime convention the installed
    # discord.py expects.
    if 'after' not in kwargs:
        kwargs['after'] = datetime.today() - timedelta(days=days)
    elif isinstance(kwargs['after'], datetime):
        kwargs['after'] = kwargs['after'].replace(tzinfo=None)

    channels = [
        c for c in client.get_all_channels()
        if isinstance(c, discord.TextChannel) and category_name(c) != 'Archive'
    ]
    channels.sort(key=lambda c: (category_name(c), c.name))

    for channel in channels:
        LOGGER.info(f'{category_name(channel)} #{channel.name}')
        async for msg in channel.history(limit=limit, **kwargs):
            yield msg
def message_dict(m: discord.Message) -> Dict:
    """Flatten a message into the plain mapping used as one table row.

    Keys, in order: ``'object'`` (the message itself), ``'id'``,
    ``'created'`` (``m.created_at.astimezone()``), ``'display_name'`` and
    ``'user id'`` (taken from ``m.author``), ``'message'`` (``m.content``),
    ``'channel'`` and ``'channel link'`` (the channel's ``name`` and
    ``mention``), and ``'link'`` (``m.jump_url``).
    """
    sender = m.author
    posted_in = m.channel
    created_at = m.created_at.astimezone()
    row = {
        'object': m,
        'id': m.id,
        'created': created_at,
        'display_name': sender.display_name,
        'user id': sender.id,
        'message': m.content,
        'channel': posted_in.name,
        'channel link': posted_in.mention,
        'link': m.jump_url,
    }
    return row
async def reaction_df(msgs: Iterable[discord.Message]):
    """Build one DataFrame of reaction counts across the given messages.

    Messages with an empty ``reactions`` list are skipped. For each of the
    others, the frame produced by ``reaction_series`` is collected, and all
    frames are concatenated and indexed by (``'msg id'``, ``'emoji'``).

    Args:
        msgs: Messages to inspect. Each must expose ``reactions``.

    Returns:
        A ``pandas.DataFrame`` indexed by (``'msg id'``, ``'emoji'``). When
        none of the messages has reactions, the frame is empty but carries
        the same index names.
    """
    index_keys = ['msg id', 'emoji']
    frames = [await reaction_series(msg) for msg in msgs if msg.reactions]
    if not frames:
        # pd.concat raises ValueError ("No objects to concatenate") on an
        # empty list, so return an empty, correctly indexed frame instead.
        return pd.DataFrame(columns=index_keys).set_index(index_keys)
    return pd.concat(frames).set_index(index_keys)
async def reaction_series(msg: discord.Message):
    """Tabulate the reactions on a single message.

    Despite the name, the result is a ``pandas.DataFrame`` with one row per
    reaction, each row produced by ``reaction_dict``. ``None`` is returned
    when the message has no reactions.
    """
    if not msg.reactions:
        return None

    rows = []
    for reaction in msg.reactions:
        rows.append(await reaction_dict(reaction))
    return pd.DataFrame(rows)
async def reaction_dict(r: discord.Reaction) -> Dict:
    """Describe one reaction as a flat mapping.

    For a ``discord.Emoji`` or ``discord.PartialEmoji`` the emoji's own
    ``name`` and ``id`` are reported. Any other emoji value is
    ``unicode-escape`` encoded into an ASCII string and gets ``None`` as
    its id.

    Returns:
        A dict with keys ``'msg id'``, ``'emoji'``, ``'emoji id'`` and
        ``'count'`` (the reaction count as an ``int``).
    """
    emoji = r.emoji
    if isinstance(emoji, (discord.Emoji, discord.PartialEmoji)):
        label = emoji.name
        emoji_id = emoji.id
    else:
        label = emoji.encode('unicode-escape').decode('ascii')
        emoji_id = None

    return {
        'msg id': r.message.id,
        'emoji': label,
        'emoji id': emoji_id,
        'count': int(r.count),
    }
def emoji_totals(edf: pd.DataFrame) -> pd.DataFrame:
    """Summarise reaction counts per user.

    Args:
        edf: Frame with at least the columns ``'display_name'``,
            ``'channel'`` and ``'count'``. Other columns are ignored.

    Returns:
        A frame indexed by ``'display_name'`` with two columns:
        ``'total'`` (the summed ``count``, as ``int``) and ``'max channel'``
        (the channel in which that user's summed ``count`` is largest).
        Rows are sorted by ``'total'``, largest first.
    """
    # Select 'count' before aggregating: summing the whole frame would also
    # try to add up unrelated columns (timestamps, objects, strings), which
    # is wasted work and raises on types that do not support addition.
    totals = (
        edf.groupby('display_name')['count']
        .sum()
        .sort_values(ascending=False)
        .apply(int)
    )

    per_channel = edf.groupby(['display_name', 'channel'])['count'].sum()
    # idxmax() on the (display_name, channel) MultiIndex yields a tuple;
    # element 1 is the channel. No pre-sorting is needed for idxmax. On a
    # tie the first channel in groupby's sorted key order wins.
    max_channels = per_channel.groupby(level=0).apply(lambda s: s.idxmax()[1])

    return pd.DataFrame({
        'total': totals,
        'max channel': max_channels,
    }).sort_values('total', ascending=False)
if __name__ == '__main__':
    # Script entry point: start a bare client and report once it is ready.
    client = discord.Client()
    logging.basicConfig(level=logging.INFO)

    @client.event
    async def on_ready():
        """Announce on stdout that the client has connected."""
        print(f'{client.user} has connected to Discord!')

    # Load variables from a .env file (if present) before reading the token.
    load_dotenv()
    token = os.getenv('DISCORD_TOKEN')
    if not token:
        # os.getenv returns None when the variable is unset. Stop here with
        # a clear message instead of failing inside client.run().
        raise SystemExit(
            'DISCORD_TOKEN is not set; define it in the environment or in a .env file.'
        )
    client.run(token)