created MsgData, reworked cancellation calculations
This commit is contained in:
176
msg.py
176
msg.py
@@ -1,20 +1,85 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, Iterable, Tuple
|
||||
from typing import Dict, Iterable
|
||||
|
||||
import discord
|
||||
import pandas as pd
|
||||
from discord.raw_models import RawReactionActionEvent
|
||||
from dotenv import load_dotenv
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MsgData:
|
||||
msgs: pd.DataFrame
|
||||
reactions: pd.DataFrame
|
||||
lock: asyncio.Lock
|
||||
|
||||
@classmethod
|
||||
async def create(cls, client: discord.Client, **kwargs):
|
||||
self = MsgData()
|
||||
self.msgs: pd.DataFrame = await message_df(client, **kwargs)
|
||||
self.msgs = self.msgs.sort_values('created')
|
||||
self.reactions: pd.DataFrame = await reaction_df(self.msgs['object'].tolist())
|
||||
self.lock = asyncio.Lock()
|
||||
return self
|
||||
|
||||
def __str__(self):
|
||||
return str(self.msgs) + '\n\n' + str(self.reactions)
|
||||
|
||||
async def add_msg(self, message: discord.Message):
|
||||
async with self.lock:
|
||||
mdict = message_dict(message)
|
||||
mdict.pop('id')
|
||||
self.msgs.loc[message.id] = pd.Series(mdict)
|
||||
LOGGER.info(f'Added message id {message.id} from {message.author}: {message.content}')
|
||||
|
||||
async def update_reaction(self, client: discord.Client, payload: RawReactionActionEvent):
|
||||
if isinstance(payload.emoji, discord.PartialEmoji):
|
||||
chan: discord.TextChannel = await client.fetch_channel(channel_id=payload.channel_id)
|
||||
msg: discord.Message = await chan.fetch_message(id=payload.message_id)
|
||||
idx = (msg.id, payload.emoji.name)
|
||||
|
||||
for reaction in msg.reactions:
|
||||
if isinstance(reaction.emoji, discord.Emoji) and reaction.emoji.name == payload.emoji.name:
|
||||
reactions = pd.Series(await reaction_dict(reaction))
|
||||
async with self.lock:
|
||||
self.reactions.loc[pd.IndexSlice[idx], :] = reactions
|
||||
LOGGER.info(f'Added {str(idx)}, {int(self.reactions.loc[pd.IndexSlice[idx], "count"])} total')
|
||||
break
|
||||
else:
|
||||
# only reaches here if the remove action was to take off the last reaction of that type
|
||||
if payload.event_type == 'REACTION_REMOVE':
|
||||
try:
|
||||
async with self.lock:
|
||||
self.reactions = self.reactions.drop(idx, axis=0)
|
||||
except KeyError as e:
|
||||
LOGGER.info(f'{idx} not in index')
|
||||
else:
|
||||
LOGGER.info(f'Dropped {idx}')
|
||||
|
||||
def cancellations(self, days: int = 14):
|
||||
return cancellations(msg_df=self.msgs, react_df=self.reactions, days=days)
|
||||
|
||||
def cancellation_totals(self, days: int = 14):
|
||||
return cancelled_totals(cdf=self.cancellations(days=days))
|
||||
|
||||
|
||||
async def message_df(client: discord.Client, **kwargs):
|
||||
return pd.DataFrame(
|
||||
[message_dict(m) async for m in message_gen(client, **kwargs)]
|
||||
).set_index('id').sort_values('created', ascending=False)
|
||||
|
||||
|
||||
async def message_gen(client: discord.Client, limit=20, days: int = 90, **kwargs):
|
||||
channels = client.get_all_channels()
|
||||
channels = filter(lambda c: isinstance(c, discord.TextChannel), channels)
|
||||
channels = filter(lambda c: c.category.name != 'Archive', channels)
|
||||
channels = sorted(channels, key=lambda c: (c.category.name, c.name))
|
||||
for channel in channels:
|
||||
print(f'{channel.category.name} #{channel.name}')
|
||||
LOGGER.info(f'{channel.category.name} #{channel.name}')
|
||||
if 'after' not in kwargs:
|
||||
kwargs['after'] = (datetime.today() - timedelta(days=days))
|
||||
elif isinstance((after := kwargs.get('after', None)), datetime):
|
||||
@@ -36,100 +101,44 @@ def message_dict(m: discord.Message) -> Dict:
|
||||
}
|
||||
|
||||
|
||||
async def message_df(client: discord.Client, **kwargs):
|
||||
return pd.DataFrame(
|
||||
[message_dict(m) async for m in message_gen(client, **kwargs)]
|
||||
).set_index('id').sort_values('created', ascending=False)
|
||||
async def reaction_df(msgs: Iterable[discord.Message]):
|
||||
return pd.concat([await reaction_series(msg) for msg in msgs]).set_index(['msg id', 'emoji'])
|
||||
|
||||
|
||||
async def reaction_series(msg: discord.Message):
|
||||
return pd.DataFrame(
|
||||
[{
|
||||
'msg id': msg.id,
|
||||
'emoji': r.emoji.name,
|
||||
'emoji id': r.emoji.id,
|
||||
'count': r.count,
|
||||
'users': str(list(map(lambda u: u.display_name, (u for u in await r.users().flatten())))),
|
||||
}
|
||||
[
|
||||
await reaction_dict(r)
|
||||
for r in msg.reactions
|
||||
if isinstance(r.emoji, discord.Emoji)
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
async def reaction_df(msgs: Iterable[discord.Message]):
|
||||
return pd.concat([await reaction_series(msg) for msg in msgs])
|
||||
|
||||
|
||||
def add_reactions(con: sqlite3.Connection, new_reacts: pd.DataFrame, table_name: str = 'reactions'):
|
||||
react_df = load_reactions(con, table_name)
|
||||
react_df = react_df.append(new_reacts, ignore_index=True)
|
||||
react_df = react_df.drop_duplicates(['msg id', 'emoji id']).reset_index(drop=True)
|
||||
try:
|
||||
react_df.to_sql('reactions', con, if_exists='replace', index=False)
|
||||
except sqlite3.InterfaceError as e:
|
||||
logging.exception(e)
|
||||
else:
|
||||
print(f'Saved {react_df.shape[0]} reactions')
|
||||
return react_df
|
||||
|
||||
|
||||
def add_msgs(con: sqlite3.Connection, new_msgs: pd.DataFrame, table_name: str = 'msgs'):
|
||||
msg_df = load_msgs(con, table_name)
|
||||
msg_df = msg_df.append(new_msgs)
|
||||
msg_df['created'] = pd.to_datetime(msg_df['created'], utc=True)
|
||||
msg_df = msg_df[~msg_df.index.duplicated()].sort_values('created', ascending=False)
|
||||
try:
|
||||
msg_df.to_sql('msgs', con, if_exists='replace', index=True, index_label=msg_df.index.name)
|
||||
except sqlite3.InterfaceError as e:
|
||||
logging.exception(e)
|
||||
else:
|
||||
print(f'Saved {msg_df.shape[0]} messages')
|
||||
return msg_df
|
||||
|
||||
|
||||
async def get_and_save(db_file, client: discord.Client, limit: int, days: int):
|
||||
df = await message_df(client, limit=limit, days=days)
|
||||
print(f'Getting users for each reaction of {df.shape[0]} messages...')
|
||||
reactions = await reaction_df(df['object'].tolist())
|
||||
print('Done')
|
||||
|
||||
df = df.drop('object', axis=1)
|
||||
|
||||
con = sqlite3.connect(db_file)
|
||||
try:
|
||||
msg_df = add_msgs(con, df)
|
||||
react_df = add_reactions(con, reactions)
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
finally:
|
||||
con.close()
|
||||
return msg_df, react_df
|
||||
|
||||
|
||||
def load_both(con: sqlite3.Connection) -> Tuple[pd.DataFrame, pd.DataFrame]:
|
||||
return (load_msgs(con), load_reactions(con))
|
||||
|
||||
|
||||
def load_msgs(con: sqlite3.Connection, table_name: str = 'msgs') -> pd.DataFrame:
|
||||
df = pd.read_sql(f'select * from {table_name}', con, index_col='id')
|
||||
df['created'] = df['created'].apply(pd.to_datetime, utc=True)
|
||||
return df
|
||||
|
||||
|
||||
def load_reactions(con: sqlite3.Connection, table_name: str = 'reactions') -> pd.DataFrame:
|
||||
return pd.read_sql(f'select * from {table_name}', con)
|
||||
async def reaction_dict(r: discord.Reaction) -> Dict:
|
||||
return {
|
||||
'msg id': r.message.id,
|
||||
'emoji': r.emoji.name,
|
||||
'emoji id': r.emoji.id,
|
||||
'count': int(r.count),
|
||||
# 'users': str(list(map(lambda u: u.display_name, (u for u in await r.users().flatten())))),
|
||||
}
|
||||
|
||||
|
||||
def cancellations(msg_df, react_df, days: int = 10) -> pd.DataFrame:
|
||||
cancelled = react_df[react_df['emoji'] == 'cancelled']
|
||||
cancelled_msgs = msg_df.loc[cancelled['msg id'].to_list()]
|
||||
# get reactions with a cancellation emoji
|
||||
cancel_reactions = react_df.loc[pd.IndexSlice[:, 'cancelled'], :]
|
||||
cancel_msgs = msg_df.loc[cancel_reactions.index.get_level_values(0).to_list()]
|
||||
|
||||
cancelled_msgs['created'] = cancelled_msgs['created'].apply(pd.to_datetime, utc=True)
|
||||
cancelled_msgs = cancelled_msgs[cancelled_msgs['created'] >= (datetime.today() - timedelta(days=days)).astimezone()]
|
||||
cancel_msgs['count'] = cancel_msgs.index.to_series().apply(
|
||||
lambda idx: cancel_reactions.loc[pd.IndexSlice[idx, 'cancelled'], 'count'])
|
||||
|
||||
cancelled_msgs['count'] = cancelled.set_index('msg id').loc[cancelled_msgs.index]['count']
|
||||
cancelled_msgs = cancelled_msgs.sort_values('count', ascending=False)
|
||||
# filter outdated messages
|
||||
cancel_msgs = cancel_msgs[cancel_msgs['created'] >= (datetime.today() - timedelta(days=days)).astimezone()]
|
||||
|
||||
cancel_msgs = cancel_msgs.sort_values('count', ascending=False)
|
||||
|
||||
return cancel_msgs
|
||||
|
||||
return cancelled_msgs
|
||||
|
||||
@@ -154,7 +163,7 @@ def cancelled_totals(cdf: pd.DataFrame) -> pd.DataFrame:
|
||||
def report_string(df):
|
||||
width = max(list(map(lambda s: len(str(s)), df.index.values)))
|
||||
return '\n'.join(
|
||||
f"`{name.ljust(width + 1)}with {row['total']:<2} total`"
|
||||
f"`{name.ljust(width + 1)}with {row['total']:<2.0f} total`"
|
||||
for name, row in df.iterrows()
|
||||
)
|
||||
|
||||
@@ -168,7 +177,6 @@ if __name__ == '__main__':
|
||||
@client.event
|
||||
async def on_ready():
|
||||
print(f'{client.user} has connected to Discord!')
|
||||
await get_and_save('messages.db', client, limit=5000, days=90)
|
||||
|
||||
|
||||
load_dotenv()
|
||||
|
||||
Reference in New Issue
Block a user