Files
kwaylon/robopage.py
2021-08-02 15:10:59 -05:00

211 lines
7.1 KiB
Python

import logging
import os
import re
from threading import Lock
import discord
import nltk
import stockquotes
from dotenv import load_dotenv
from msg import get_and_save, cancellations, cancelled_totals, report_string
logging.basicConfig(level=logging.INFO)
LIL_STINKY_ID = 704043422276780072
class RoboPage(discord.Client):
db_path: str = 'messages.db'
def __init__(self, *args, **kwargs):
super(RoboPage, self).__init__(*args, **kwargs)
self.jokes = [
CumJoke(),
BlackJoke(),
AssJoke(),
DominosJoke()
]
self.lock = Lock()
def run(self):
return super().run(os.getenv('DISCORD_TOKEN'))
async def reaction_messages(self, target: str, **kwargs):
for c in self.get_all_channels():
if c.guild.name == 'Family Dinner' and \
isinstance(c, discord.TextChannel):
if c.id != LIL_STINKY_ID:
print(f'Scanning in {c.name} for cancellations')
for m in await c.history(**kwargs).flatten():
if len(m.reactions) > 0:
for r in m.reactions:
if isinstance(r.emoji, discord.Emoji) and r.emoji.name == target:
print(r.count, m.author.display_name)
yield m, r.count
async def handle_ready(self):
channel: discord.TextChannel = discord.utils.get(self.get_all_channels(), name='robotics-facility')
await channel.send(f"I'm aliiiiiive {discord.utils.get(self.emojis, name='kaylon')}")
async def handle_message(self, message):
if message.author != self.user:
if 'most cancelled' in message.content:
msg: discord.Message = await message.reply('Hold please...')
await message.reply(await self.get_cancelled_totals(limit=1000, days=14))
elif (m := re.search('top cancelled (?P<name>\w+)', message.content)) is not None:
if self.lock.acquire(blocking=False):
msg: discord.Message = await message.reply('Hold please...')
await message.reply(await self.top_cancellations(user=m.group('name'), limit=1000, days=14))
self.lock.release()
else:
await message.reply("I'm busy!")
for joke in self.jokes:
if (scan_res := joke.scan(message)):
print(f'{joke.__class__.__name__} detected:\n{message.content}\n{scan_res}')
await joke.respond(message, self, scan_res)
async def get_cancelled_totals(self, limit=1000, days: int = 90):
msg_df, react_df = await get_and_save(self.db_path, client=self, limit=limit, days=days)
res = cancelled_totals(cancellations(msg_df, react_df, days=days))
res = f'Cancellation totals, past {days} days\n' + report_string(res.iloc[:5])
return res
async def top_cancellations(self, user: str, limit: int, days: int):
msg_df, react_df = await get_and_save(self.db_path, client=self, limit=limit, days=days)
cdf = cancellations(msg_df, react_df, days=days)
cdf = cdf[cdf['display_name'].str.contains(user, case=False)]
if cdf.shape[0] > 0:
res = f'{user}\'s top 5 cancellations in the last {days} days:\n'
res += f'\n'.join(f'`{row["count"]} cancellations`\n{row["link"]}' for idx, row in cdf.iloc[:5].iterrows())
else:
res = f'No cancellations in the past {days} days'
return res
class Joke:
@property
def regex(self) -> re.Pattern:
raise NotImplementedError
def scan(self, message: discord.Message):
if (match := self.regex.search(message.content)):
return match
async def respond(self, message: discord.Message, client: discord.Client, scan_res):
raise NotImplementedError
class CumJoke(Joke):
@property
def regex(self) -> re.Pattern:
words = [
'come',
'coming',
'came',
'cum',
'cumming',
'cummed'
]
return re.compile(f"(?<!\w)({'|'.join(words)})(?!\w)", re.IGNORECASE)
async def respond(self, message: discord.Message, client: discord.Client, match: re.Match):
if not match.group(0).startswith('be'):
await message.add_reaction(discord.utils.get(client.emojis, name='kaylon'))
class BlackJoke(Joke):
@property
def regex(self) -> re.Pattern:
return re.compile('black (\w+)', re.IGNORECASE)
async def respond(self, message: discord.Message, client: discord.Client, match: re.Match):
res = unblack(message.content)
if res is not None:
msg = await message.reply(res)
await msg.add_reaction(discord.utils.get(client.emojis, name='kaylon'))
class AssJoke(Joke):
@property
def regex(self) -> re.Pattern:
return re.compile('[ \-]ass[ \-](?P<target>\w+)', re.IGNORECASE)
async def respond(self, message: discord.Message, client: discord.Client, match: re.Match):
res = assify(message.content)
if res is not None:
await message.reply(f'{res} {discord.utils.get(client.emojis, name="kaylon")}')
class DominosJoke(Joke):
@property
def regex(self) -> re.Pattern:
return re.compile('domino\'?s', re.IGNORECASE)
async def respond(self, message: discord.Message, client: discord.Client, match: re.Match):
cp = stockquotes.Stock('DPZ').current_price
msg = f'You know, my friend Ben has made about ${cp - 16:.0f} on Domino\'s stock. He basically owns it now'
if (e := discord.utils.get(client.emojis, name="pizza")):
await message.add_reaction(e)
await message.reply(msg)
pattern = 'NP: {<DT>?<JJ>*<NN>}'
cp = nltk.RegexpParser(pattern)
def token_list(s):
return nltk.chunk.tree2conlltags(
cp.parse(
nltk.pos_tag(
nltk.word_tokenize(s)
)))
def assify(s):
tag_list = token_list(s)
for i, (text, tag, iob) in enumerate(tag_list):
if text[-3:].lower() == 'ass':
try:
next_tag = tag_list[i + 1][1]
if next_tag == 'NN' or next_tag == 'NNS':
return f'ass-{tag_list[i + 1][0]}'
except IndexError as e:
return
def unblack(s):
tag_list = token_list(s)
for i, (text, tag, iob) in enumerate(tag_list):
if text.lower() == 'black':
if tag.startswith('JJ') or tag.startswith('NN'):
for text, tag, iob in tag_list[i + 1:]:
if tag.startswith('NN'):
return f'Or as I would say, {text.lower()}'
if __name__ == '__main__':
load_dotenv()
client = RoboPage()
@client.event
async def on_ready():
print(f'{client.user} has connected to Discord!')
# await client.handle_ready()
# msg_df, react_df = await get_and_save('messages.db', client=client, limit=5000, days=90)
@client.event
async def on_message(message: discord.Message):
await client.handle_message(message)
client.run()