Files
kwaylon/robopage.py
2021-07-16 19:48:55 -05:00

209 lines
6.9 KiB
Python

import asyncio
import logging
import os
import re

import discord
import nltk
import pandas as pd
import stockquotes
from dotenv import load_dotenv
# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# ID of the channel that RoboPage.reaction_messages skips while scanning history.
LIL_STINKY_ID = 704043422276780072
def cancelled_totals(df):
    """Format a per-user summary of cancellation counts.

    Args:
        df: DataFrame with at least the columns ``display_name``,
            ``cancelled``, ``channel`` and ``channel link``; one row per
            counted reaction.

    Returns:
        A string starting with ``'Cancellation totals:\\n'`` followed by one
        line per user, ordered by descending total. Each line shows the
        user's total, their name padded to the longest name, and the link of
        the channel in which they collected the most cancellations. For an
        empty frame only the header is returned.
    """
    header = 'Cancellation totals:\n'
    if df.empty:
        return header

    totals = df.groupby('display_name')['cancelled'].sum().sort_values(ascending=False)
    name_width = int(df['display_name'].astype(str).str.len().max())

    # Sum only the numeric column: summing the whole frame would also
    # aggregate the string columns (message, link, ...).
    per_channel = df.groupby(['display_name', 'channel'], as_index=False)['cancelled'].sum()
    # Keep, for every user, the row of the channel with the highest count.
    # A stable sort keeps the choice deterministic when counts are equal.
    top_channel = (
        per_channel.sort_values('cancelled', ascending=False, kind='mergesort')
        .drop_duplicates('display_name')
        .set_index('display_name')['channel']
    )
    # First link seen for each channel name.
    channel_links = df.drop_duplicates('channel').set_index('channel')['channel link']

    lines = [
        f'`{total} {str(name).ljust(name_width)}` most in {channel_links.at[top_channel.at[name]]}'
        for name, total in totals.items()
    ]
    return header + '\n'.join(lines)
class RoboPage(discord.Client):
    """Discord client that tallies "cancelled" reactions and answers with jokes.

    Incoming messages are offered to every joke in ``self.jokes``; a message
    containing ``'most cancelled'`` additionally triggers a scan of channel
    history and a reply with the per-user totals.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Jokes are tried in this order for every incoming message.
        self.jokes = [
            CumJoke(),
            BlackJoke(),
            AssJoke(),
            DominosJoke(),
        ]

    async def reaction_messages(self, target: str, **kwargs):
        """Yield ``(message, count)`` for every custom-emoji reaction named *target*.

        Only text channels of the guild named ``'Family Dinner'`` are
        scanned, and the channel whose id is ``LIL_STINKY_ID`` is skipped.

        Args:
            target: Name of the custom emoji to look for.
            **kwargs: Forwarded unchanged to ``channel.history()``.
        """
        for channel in self.get_all_channels():
            if not isinstance(channel, discord.TextChannel):
                continue
            if channel.guild.name != 'Family Dinner' or channel.id == LIL_STINKY_ID:
                continue
            print(f'Scanning in {channel.name} for cancellations')
            # Iterate the history lazily instead of materialising it with
            # ``.flatten()``; ``async for`` is supported by discord.py 1.x
            # and 2.x, whereas ``.flatten()`` exists only in 1.x.
            async for message in channel.history(**kwargs):
                for reaction in message.reactions:
                    emoji = reaction.emoji
                    if isinstance(emoji, discord.Emoji) and emoji.name == target:
                        print(reaction.count, message.author.display_name)
                        yield message, reaction.count

    async def get_cancelled_totals(self, limit=1000):
        """Collect "cancelled" reactions and return the formatted totals.

        The collected rows are also written to ``msgs.csv`` in the current
        working directory.

        Args:
            limit: Passed as ``limit`` to each channel's history query.

        Returns:
            The string produced by ``cancelled_totals``.
        """
        rows = [
            {
                'display_name': m.author.display_name,
                'cancelled': count,
                'message': m.content,
                'channel': m.channel.name,
                'channel link': f'<#{m.channel.id}>',
                'link': m.jump_url,
            }
            async for m, count in self.reaction_messages('cancelled', limit=limit)
        ]
        df = pd.DataFrame(rows)
        df.to_csv('msgs.csv', index=False)
        return cancelled_totals(df)

    def run(self):
        """Run the client with the token from the ``DISCORD_TOKEN`` environment variable."""
        return super().run(os.getenv('DISCORD_TOKEN'))

    async def handle_ready(self):
        """Post a startup greeting in the channel named ``robotics-facility``."""
        channel = discord.utils.get(self.get_all_channels(), name='robotics-facility')
        if channel is None:
            # Without this guard a missing channel raised AttributeError.
            logging.warning("Channel 'robotics-facility' not found; skipping startup greeting")
            return
        greeting = "I'm aliiiiiive"
        emoji = discord.utils.get(self.emojis, name='kaylon')
        if emoji is not None:
            # Only append the emoji when it exists, so the greeting never
            # contains the literal text "None".
            greeting = f'{greeting} {emoji}'
        await channel.send(greeting)

    async def handle_message(self, message):
        """React to a message sent by anyone other than this client.

        Args:
            message: The incoming ``discord.Message``.
        """
        if message.author == self.user:
            # Never respond to our own messages.
            return
        if 'most cancelled' in message.content:
            msg: discord.Message = await message.reply('Hold please...')
            await msg.reply(await self.get_cancelled_totals(limit=1000))
        for joke in self.jokes:
            if (scan_res := joke.scan(message)):
                print(f'{joke.__class__.__name__} detected:\n{message.content}\n{scan_res}')
                await joke.respond(message, self, scan_res)
class Joke:
    """Base class for jokes triggered by the text of a message.

    Subclasses supply ``regex`` (the trigger pattern) and ``respond`` (the
    reaction to a hit); ``scan`` ties the pattern to a message.
    """

    @property
    def regex(self) -> re.Pattern:
        """Compiled pattern that triggers the joke; subclasses must override."""
        raise NotImplementedError

    def scan(self, message: discord.Message):
        """Search ``message.content`` with ``regex``.

        Returns:
            The match object on a hit, otherwise ``None``.
        """
        return self.regex.search(message.content)

    async def respond(self, message: discord.Message, client: discord.Client, scan_res):
        """Act on a hit; *scan_res* is whatever ``scan`` returned. Subclasses must override."""
        raise NotImplementedError
class CumJoke(Joke):
    """React with the ``kaylon`` emoji when a message contains a trigger word."""

    _WORDS = ('come', 'coming', 'came', 'cum', 'cumming', 'cummed')
    # Raw string so ``\w`` reaches the regex engine instead of being an
    # invalid string escape; compiled once rather than on every access.
    # The look-arounds only allow whole-word matches (case-insensitive).
    _PATTERN = re.compile(rf"(?<!\w)({'|'.join(_WORDS)})(?!\w)", re.IGNORECASE)

    @property
    def regex(self) -> re.Pattern:
        """Pattern matching any trigger word as a whole word."""
        return self._PATTERN

    async def respond(self, message: discord.Message, client: discord.Client, match: re.Match):
        """Add the ``kaylon`` reaction to *message* if the client has that emoji.

        The previous ``startswith('be')`` check on the matched text was
        dropped: the pattern can only match one of the trigger words, none
        of which starts with "be", so the check never filtered anything.
        """
        if (emoji := discord.utils.get(client.emojis, name='kaylon')):
            await message.add_reaction(emoji)
class BlackJoke(Joke):
    """Reply with the ``unblack`` rewrite when a message mentions "black <word>"."""

    # Raw string so ``\w`` is not treated as an (invalid) string escape;
    # compiled once instead of on every property access.
    _PATTERN = re.compile(r'black (\w+)', re.IGNORECASE)

    @property
    def regex(self) -> re.Pattern:
        """Pattern matching "black" followed by a space and a word."""
        return self._PATTERN

    async def respond(self, message: discord.Message, client: discord.Client, match: re.Match):
        """Reply with ``unblack(message.content)`` and react to that reply.

        Nothing is sent when ``unblack`` returns ``None``. The ``kaylon``
        reaction is added to the reply only when the client has that emoji.
        """
        res = unblack(message.content)
        if res is None:
            return
        msg = await message.reply(res)
        if (emoji := discord.utils.get(client.emojis, name='kaylon')):
            await msg.add_reaction(emoji)
class AssJoke(Joke):
    """Reply with the ``assify`` rewrite when a message contains a delimited "ass"."""

    # Raw string so ``\-`` and ``\w`` are not treated as (invalid) string
    # escapes; compiled once instead of on every property access.
    _PATTERN = re.compile(r'[ \-]ass[ \-](?P<target>\w+)', re.IGNORECASE)

    @property
    def regex(self) -> re.Pattern:
        """Pattern matching "ass" between spaces/hyphens, followed by a word (group ``target``)."""
        return self._PATTERN

    async def respond(self, message: discord.Message, client: discord.Client, match: re.Match):
        """Reply with ``assify(message.content)``, followed by the ``kaylon`` emoji if available.

        Nothing is sent when ``assify`` returns ``None``.
        """
        res = assify(message.content)
        if res is None:
            return
        emoji = discord.utils.get(client.emojis, name="kaylon")
        # Skip the emoji suffix when it is missing so the reply never
        # contains the literal text "None".
        await message.reply(f'{res} {emoji}' if emoji else res)
class DominosJoke(Joke):
    """Reply with a remark about Domino's stock when a message mentions Domino's."""

    # Compiled once instead of on every property access.
    _PATTERN = re.compile(r"domino'?s", re.IGNORECASE)

    @property
    def regex(self) -> re.Pattern:
        """Pattern matching "dominos" or "domino's", case-insensitively."""
        return self._PATTERN

    async def respond(self, message: discord.Message, client: discord.Client, match: re.Match):
        """Look up the current DPZ price, react with ``pizza`` if available, and reply.

        Exceptions raised by the quote lookup propagate to the caller.
        """
        # The quote lookup is a synchronous call; run it in the default
        # executor so the event loop is not blocked while it completes.
        loop = asyncio.get_running_loop()
        price = await loop.run_in_executor(
            None, lambda: stockquotes.Stock('DPZ').current_price
        )
        # NOTE(review): 16 is presumably the per-share purchase price - confirm.
        msg = f'You know, my friend Ben has made about ${price - 16:.0f} on Domino\'s stock. He basically owns it now'
        if (e := discord.utils.get(client.emojis, name="pizza")):
            await message.add_reaction(e)
        await message.reply(msg)
# Chunk grammar for nltk.RegexpParser: an "NP" chunk is an optional
# determiner (DT), any number of adjectives (JJ), then a noun (NN).
pattern = 'NP: {<DT>?<JJ>*<NN>}'
# Module-level chunk parser built from the grammar above; used by token_list().
cp = nltk.RegexpParser(pattern)
def token_list(s):
    """Tokenize, POS-tag and chunk *s*, returning the result as CoNLL-style tags.

    Args:
        s: Text to analyse.

    Returns:
        The value of ``nltk.chunk.tree2conlltags`` for the chunk tree that
        the module-level parser ``cp`` builds from the tagged tokens.
    """
    words = nltk.word_tokenize(s)
    tagged_words = nltk.pos_tag(words)
    chunk_tree = cp.parse(tagged_words)
    return nltk.chunk.tree2conlltags(chunk_tree)
def assify(s):
    """Return ``'ass-<noun>'`` for the first "ass" token directly followed by a noun.

    A token counts as an "ass" token when it is exactly ``ass`` or ends in
    ``-ass`` (case-insensitive), mirroring the space/hyphen delimiters of the
    AssJoke trigger pattern. Words that merely end in those letters, such as
    "class" or "pass", are not treated as hits.

    Args:
        s: Text to analyse.

    Returns:
        ``'ass-'`` plus the following token when that token is tagged ``NN``
        or ``NNS``; ``None`` when no such pair exists.
    """
    tag_list = token_list(s)
    # Walk consecutive token pairs; the last token has no successor and is
    # therefore never considered, which replaces the old IndexError handling.
    for (text, _tag, _iob), (next_text, next_tag, _next_iob) in zip(tag_list, tag_list[1:]):
        lowered = text.lower()
        if lowered != 'ass' and not lowered.endswith('-ass'):
            continue
        if next_tag in ('NN', 'NNS'):
            return f'ass-{next_text}'
    return None
def unblack(s):
    """Build the "Or as I would say, <noun>" reply for a "black <noun>" phrase.

    Args:
        s: Text to analyse.

    Returns:
        ``'Or as I would say, '`` plus the lower-cased first token tagged as
        a noun (tag starting with ``NN``) that follows a token ``black``
        whose own tag starts with ``JJ`` or ``NN``; ``None`` when there is
        no such pair.
    """
    tagged = token_list(s)
    for position, (word, word_tag, _) in enumerate(tagged):
        if word.lower() != 'black':
            continue
        if not word_tag.startswith(('JJ', 'NN')):
            continue
        # Look for the first noun anywhere after this occurrence of "black".
        for later_word, later_tag, _ in tagged[position + 1:]:
            if later_tag.startswith('NN'):
                return f'Or as I would say, {later_word.lower()}'
    return None
if __name__ == '__main__':
    # Script entry point: load settings from a .env file, wire the event
    # handlers to the client and start it.
    load_dotenv()
    TOKEN = os.getenv('DISCORD_TOKEN')
    if not TOKEN:
        # Fail early with a clear message instead of an obscure error from
        # inside the client when the token is missing.
        raise SystemExit('DISCORD_TOKEN is not set; add it to the environment or a .env file')

    client = RoboPage()

    @client.event
    async def on_ready():
        print(f'{client.user} has connected to Discord!')
        await client.handle_ready()

    @client.event
    async def on_message(message: discord.Message):
        await client.handle_message(message)

    # RoboPage.run() reads DISCORD_TOKEN from the environment itself.
    client.run()