194 lines
6.3 KiB
Python
194 lines
6.3 KiB
Python
import logging
|
|
import os
|
|
import re
|
|
|
|
import discord
|
|
import nltk
|
|
import pandas as pd
|
|
from dotenv import load_dotenv
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
LIL_STINKY_ID = 704043422276780072
|
|
|
|
|
|
def cancelled_totals(df):
    """Format a per-user leaderboard of 'cancelled' reaction counts.

    Args:
        df: DataFrame with one row per reacted message. The columns read
            here are ``display_name`` (author), ``cancelled`` (reaction
            count) and ``channel link`` (a ``<#id>`` channel mention).

    Returns:
        A multi-line string: a header followed by one line per user, sorted
        by total count descending, naming the channel in which that user
        collected the most cancellations. For an empty frame only the
        header is returned.
    """
    header = 'Cancellation totals:\n'
    if df.empty:
        # A frame built from zero rows has no columns at all; indexing it
        # would raise KeyError.
        return header

    totals = df.groupby('display_name')['cancelled'].sum().sort_values(ascending=False)
    # Pad names to a common width so the counts line up inside the code span.
    name_width = int(df['display_name'].astype(str).str.len().max())

    # Sum only the numeric column, per (user, channel). Summing the whole
    # frame would also try to aggregate the string columns.
    per_channel = (
        df.groupby(['display_name', 'channel link'])['cancelled']
        .sum()
        .reset_index()
    )
    # idxmax picks the row holding each user's largest per-channel count, so
    # the channel and the count come from the same row. A plain
    # groupby().max() would take each column's maximum independently.
    top_rows = per_channel.loc[per_channel.groupby('display_name')['cancelled'].idxmax()]
    top_link = top_rows.set_index('display_name')['channel link']

    # Series.items() replaces Series.iteritems(), which pandas 2.0 removed.
    lines = [
        f'`{total} {str(name).ljust(name_width)}` most in {top_link[name]}'
        for name, total in totals.items()
    ]
    return header + '\n'.join(lines)
|
|
|
|
|
|
class RoboPage(discord.Client):
    """Discord client that tallies 'cancelled' reactions and reacts to joke triggers."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Joke detectors are tried in this order for every incoming message.
        self.jokes = [
            CumJoke(),
            BlackJoke(),
            AssJoke(),
        ]

    async def reaction_messages(self, target: str, **kwargs):
        """Yield ``(message, count)`` for messages carrying the ``target`` emoji.

        Scans every text channel of the 'Family Dinner' guild except the one
        whose id is ``LIL_STINKY_ID``.

        Args:
            target: Name of the custom emoji to look for.
            **kwargs: Forwarded unchanged to ``channel.history()``
                (e.g. ``limit``).

        Yields:
            Tuples of the message and the reaction count for ``target``.
        """
        for c in self.get_all_channels():
            if not isinstance(c, discord.TextChannel):
                continue
            if c.guild.name != 'Family Dinner' or c.id == LIL_STINKY_ID:
                continue

            print(f'Scanning in {c.name} for cancellations')
            # Iterate the history directly: `.flatten()` no longer exists in
            # discord.py 2.x, while `async for` works on 1.x and 2.x alike.
            async for m in c.history(**kwargs):
                for r in m.reactions:
                    # Only custom emoji are discord.Emoji instances with a
                    # `.name` to compare against.
                    if isinstance(r.emoji, discord.Emoji) and r.emoji.name == target:
                        print(r.count, m.author.display_name)
                        yield m, r.count

    async def get_cancelled_totals(self, limit=1000):
        """Collect 'cancelled' reactions and return the formatted leaderboard.

        Args:
            limit: Passed to ``reaction_messages`` and from there to each
                channel's ``history()`` call.

        Returns:
            The string produced by ``cancelled_totals``. As a side effect
            the collected rows are written to ``msgs.csv``.
        """
        rows = [
            {
                'display_name': m.author.display_name,
                'cancelled': count,
                'message': m.content,
                'channel': m.channel.name,
                'channel link': f'<#{m.channel.id}>',
                'link': m.jump_url,
            }
            async for m, count in self.reaction_messages('cancelled', limit=limit)
        ]
        df = pd.DataFrame(rows)

        df.to_csv('msgs.csv', index=False)
        return cancelled_totals(df)

    def run(self):
        """Start the client using the token from the ``DISCORD_TOKEN`` environment variable."""
        return super().run(os.getenv('DISCORD_TOKEN'))

    async def handle_ready(self):
        """Announce startup in the 'robotics-facility' channel, if it exists."""
        channel: discord.TextChannel = discord.utils.get(self.get_all_channels(), name='robotics-facility')
        if channel is None:
            # discord.utils.get returns None when nothing matches; skip the
            # announcement rather than fail on `None.send`.
            logging.warning("Channel 'robotics-facility' not found; skipping startup message")
            return
        await channel.send(f"I'm aliiiiiive {discord.utils.get(self.emojis, name='kaylon')}")

    async def handle_message(self, message):
        """Answer a 'most cancelled' request and run every joke detector on ``message``.

        Messages authored by this client are ignored.
        """
        if message.author == self.user:
            return

        if 'most cancelled' in message.content:
            msg: discord.Message = await message.reply('Hold please...')
            await msg.reply(await self.get_cancelled_totals(limit=1000))

        for joke in self.jokes:
            if (scan_res := joke.scan(message)):
                print(f'{joke.__class__.__name__} detected:\n{message.content}\n{scan_res}')
                await joke.respond(message, self, scan_res)
|
|
|
|
|
|
class Joke:
    """Base class for a message trigger: a pattern to detect and a reaction to perform."""

    @property
    def regex(self) -> re.Pattern:
        """Compiled pattern that triggers this joke; subclasses must provide it."""
        raise NotImplementedError

    def scan(self, message: discord.Message):
        """Search the message content with ``regex``.

        Returns:
            The first ``re.Match`` found, or ``None`` when nothing matches.
        """
        # `search` already yields either a match object or None.
        return self.regex.search(message.content)

    async def respond(self, message: discord.Message, client: discord.Client, scan_res):
        """React to a message for which ``scan`` returned ``scan_res``; subclasses must implement."""
        raise NotImplementedError
|
|
|
|
|
|
class CumJoke(Joke):
    """Adds the 'kaylon' emoji reaction to messages containing one of the trigger words."""

    _WORDS = (
        'come',
        'coming',
        'came',
        'cum',
        'cumming',
        'cummed',
    )
    # Built once at class creation instead of on every property access.
    # The raw string keeps `\w` a regex escape, not an invalid string escape.
    # The lookarounds require the word to stand alone, so e.g. "become" and
    # "welcome" do not match.
    _REGEX = re.compile(rf"(?<!\w)({'|'.join(_WORDS)})(?!\w)", re.IGNORECASE)

    @property
    def regex(self) -> re.Pattern:
        """Case-insensitive pattern matching any trigger word as a whole word."""
        return self._REGEX

    async def respond(self, message: discord.Message, client: discord.Client, match: re.Match):
        """React to ``message`` with the 'kaylon' emoji.

        The former ``startswith('be')`` guard on the matched text was dropped:
        the match is always exactly one of the trigger words, none of which
        begins with 'be', so the guard could never be false.
        """
        await message.add_reaction(discord.utils.get(client.emojis, name='kaylon'))
|
|
|
|
|
|
class BlackJoke(Joke):
    """Replies with the ``unblack`` rewrite of messages containing 'black <word>'."""

    # Raw string so `\w` is a regex escape, not an invalid string escape.
    # Compiled once rather than on every property access.
    _REGEX = re.compile(r'black (\w+)', re.IGNORECASE)

    @property
    def regex(self) -> re.Pattern:
        """Case-insensitive pattern matching 'black' followed by a space and a word."""
        return self._REGEX

    async def respond(self, message: discord.Message, client: discord.Client, match: re.Match):
        """Reply with ``unblack(message.content)`` and add the 'kaylon' reaction to that reply.

        Nothing is sent when ``unblack`` returns ``None``.
        """
        res = unblack(message.content)
        if res is not None:
            msg = await message.reply(res)
            await msg.add_reaction(discord.utils.get(client.emojis, name='kaylon'))
|
|
|
|
|
|
class AssJoke(Joke):
    """Replies with the ``assify`` rewrite of messages containing '<adj>-ass <word>' style phrases."""

    # Raw string so `\-` and `\w` are regex escapes, not invalid string
    # escapes. Compiled once rather than on every property access.
    _REGEX = re.compile(r'[ \-]ass[ \-](?P<target>\w+)', re.IGNORECASE)

    @property
    def regex(self) -> re.Pattern:
        """Case-insensitive pattern: 'ass' bounded by spaces or hyphens, then a word (group ``target``)."""
        return self._REGEX

    async def respond(self, message: discord.Message, client: discord.Client, match: re.Match):
        """Reply with ``assify(message.content)`` followed by the 'kaylon' emoji.

        Nothing is sent when ``assify`` returns ``None``.
        """
        res = assify(message.content)
        if res is not None:
            await message.reply(f'{res} {discord.utils.get(client.emojis, name="kaylon")}')
|
|
|
|
|
|
# Chunk grammar: a noun phrase (NP) is an optional determiner, any number of
# adjectives, then a singular noun.
pattern = 'NP: {<DT>?<JJ>*<NN>}'
cp = nltk.RegexpParser(pattern)


def token_list(s):
    """Tokenize, POS-tag and NP-chunk ``s``.

    Returns:
        The result of ``nltk.chunk.tree2conlltags``: a list of
        ``(word, pos_tag, iob_tag)`` triples in sentence order.
    """
    words = nltk.word_tokenize(s)
    tagged_words = nltk.pos_tag(words)
    chunk_tree = cp.parse(tagged_words)
    return nltk.chunk.tree2conlltags(chunk_tree)
|
|
|
|
|
|
def assify(s):
    """Move the hyphen in an '<adj>-ass <noun>' phrase: return ``'ass-<noun>'``.

    Args:
        s: The message text.

    Returns:
        ``'ass-<noun>'`` for the first 'ass' token (either the bare word or a
        hyphenated compound ending in '-ass') that is directly followed by a
        token tagged ``NN`` or ``NNS``; ``None`` when there is no such pair.
    """
    tag_list = token_list(s)
    # Walk adjacent token pairs. The final token has no successor and is
    # simply never considered, so no IndexError handling is needed.
    for (text, _tag, _iob), (next_text, next_tag, _next_iob) in zip(tag_list, tag_list[1:]):
        lowered = text.lower()
        # Require the whole word 'ass' or a '-ass' suffix. A bare
        # three-character suffix test would also fire on unrelated words
        # such as "class", "pass" or "grass".
        # NOTE(review): assumes the tokenizer keeps "sweet-ass" as a single
        # token - confirm.
        if lowered != 'ass' and not lowered.endswith('-ass'):
            continue
        if next_tag in ('NN', 'NNS'):
            return f'ass-{next_text}'
    return None
|
|
|
|
|
|
def unblack(s):
    """Build the 'Or as I would say, <noun>' reply for a 'black <noun>' phrase.

    Looks for a token equal to 'black' (case-insensitive) tagged as an
    adjective or noun, then for the first later token whose tag starts with
    'NN'.

    Returns:
        The reply string with the noun lower-cased, or ``None`` when no
        qualifying 'black' token has a noun after it.
    """
    tagged = token_list(s)
    for position, (word, word_tag, _chunk) in enumerate(tagged):
        if word.lower() != 'black':
            continue
        if not word_tag.startswith(('JJ', 'NN')):
            continue

        # First noun anywhere after this 'black'; if there is none, keep
        # looking at later occurrences of 'black'.
        noun = next(
            (later for later, later_tag, _ in tagged[position + 1:] if later_tag.startswith('NN')),
            None,
        )
        if noun is not None:
            return f'Or as I would say, {noun.lower()}'
    return None
|
|
|
|
|
|
if __name__ == '__main__':
    # Pull DISCORD_TOKEN (and anything else) from a local .env file into the
    # process environment before the client reads it.
    load_dotenv()
    TOKEN = os.getenv('DISCORD_TOKEN')
    if not TOKEN:
        # Fail fast with a clear message; otherwise the missing token only
        # surfaces as an error from inside the discord library.
        raise SystemExit('DISCORD_TOKEN is not set; add it to the environment or a .env file.')

    client = RoboPage()

    @client.event
    async def on_ready():
        """Log the connection and post the startup announcement."""
        print(f'{client.user} has connected to Discord!')
        await client.handle_ready()

    @client.event
    async def on_message(message: discord.Message):
        """Forward every incoming message to the client's handler."""
        await client.handle_message(message)

    # RoboPage.run() reads DISCORD_TOKEN from the environment itself.
    client.run()
|