327 lines
13 KiB
Python
Executable File
327 lines
13 KiB
Python
Executable File
import asyncio
import logging
import random
import threading

import discord
from discord.ext import commands

from data import constants
from functions.timer import Timer
# Presumably the source of QuizDB, WordGameDB, QuizPlayersDB, PlayerDBLinker
# and QuestionNotFound (none of them is imported by name in this file).
from data.DatabaseConnection import *
from functions import checks
|
|
|
|
class QuizQuestions(commands.Cog):
    """Commands for managing the stored quiz questions and word-game words.

    Every command is guarded by ``checks.isModPlus``.
    """

    # Discord rejects an embed with more than 25 fields, so listings are split
    # over several embeds instead of failing once the table grows.
    _MAX_EMBED_FIELDS = 25

    def __init__(self, client):
        self.client = client

    @staticmethod
    def _listing_embeds(title, fields):
        """Build orange embeds titled ``title`` holding ``fields`` in order.

        Args:
            title: Text used as the author line of every embed.
            fields: Sequence of ``(name, value)`` pairs, one per embed field.

        Returns:
            A list of embeds, each with at most ``_MAX_EMBED_FIELDS`` fields.
        """
        limit = QuizQuestions._MAX_EMBED_FIELDS
        embeds = []
        for start in range(0, len(fields), limit):
            embed = discord.Embed(colour=discord.Colour.orange())
            embed.set_author(name=title)
            for name, value in fields[start:start + limit]:
                embed.add_field(name=name, value=value, inline=False)
            embeds.append(embed)
        return embeds

    # Usage example (command prefix omitted):
    #   questions add "What is ...." "Australia"
    @commands.group(name="questions", case_insensitive=True, invoke_without_command=True)
    @commands.check(checks.isModPlus)
    async def questions(self, ctx, *args):
        """Manage quiz questions (subcommands: add, rm, show)."""
        pass

    @questions.command(name="add")
    @commands.check(checks.isModPlus)
    async def add_question(self, ctx, q, a, reward=50):
        """Add a question with its answer and an optional reward (default 50)."""
        # The handle starts as None so that ``finally`` never touches an
        # unbound name when the QuizDB() constructor itself fails.
        quizdb = None
        try:
            quizdb = QuizDB()
            q_id = quizdb.add_question(q, a, reward)
            embed = discord.Embed(colour=discord.Colour.green())
            embed.add_field(name="Success", value=f"Added question with {q_id}", inline=False)
            await ctx.send(embed=embed)
        except Exception:
            logging.getLogger(__name__).exception("questions add failed")
            await ctx.send("Something went wrong")
        finally:
            if quizdb is not None:
                quizdb.close()

    @questions.command(name="rm")
    @commands.check(checks.isModPlus)
    async def rm_question(self, ctx, id):
        """Remove the question with the given id."""
        # ``id`` shadows the builtin but is kept: it is the command's visible
        # parameter name. Only QuestionNotFound is handled here; any other
        # error propagates to the caller.
        quizdb = None
        try:
            quizdb = QuizDB()
            quizdb.rm_question(id)
            embed = discord.Embed(colour=discord.Colour.green())
            embed.add_field(name="Success", value=f"Question removed with id {id}", inline=False)
            await ctx.send(embed=embed)
        except QuestionNotFound:
            await ctx.send("No question found with id " + str(id))
        finally:
            if quizdb is not None:
                quizdb.close()

    @questions.command(name="show")
    @commands.check(checks.isModPlus)
    async def show_questions(self, ctx):
        """List every stored question with its answer, reward and use count."""
        quizdb = None
        try:
            quizdb = QuizDB()
            rows = quizdb.get_questions()
            if not rows:
                embed = discord.Embed(colour=discord.Colour.red())
                embed.add_field(name="Questions", value="No questions found", inline=False)
                await ctx.send(embed=embed)
                return

            # Rows are indexed positionally: id, question, answer, reward, asked.
            fields = [
                (
                    f"id: {q[0]}",
                    f"Question: '{q[1]}'\nAnswer: '{q[2]}'\nReward: {q[3]}\nUsed: {q[4]}",
                )
                for q in rows
            ]
            for embed in self._listing_embeds("Questions", fields):
                await ctx.send(embed=embed)
        except Exception:
            logging.getLogger(__name__).exception("questions show failed")
            await ctx.send("Something went wrong")
        finally:
            if quizdb is not None:
                quizdb.close()

    # Usage example (command prefix omitted):
    #   wordgame add "minecraft" 75
    @commands.group(name="wordgame", case_insensitive=True, invoke_without_command=True)
    @commands.check(checks.isModPlus)
    async def wordgame(self, ctx, *args):
        """Manage word-game words (subcommands: add, rm, show)."""
        pass

    @wordgame.command(name="add")
    @commands.check(checks.isModPlus)
    async def add_word(self, ctx, w, reward=50):
        """Add a word with an optional reward (default 50)."""
        wordgamedb = None
        try:
            wordgamedb = WordGameDB()
            w_id = wordgamedb.add_word(w, reward)
            embed = discord.Embed(colour=discord.Colour.green())
            embed.add_field(name="Success", value=f"Added word with {w_id}", inline=False)
            await ctx.send(embed=embed)
        except Exception:
            logging.getLogger(__name__).exception("wordgame add failed")
            await ctx.send("Something went wrong")
        finally:
            if wordgamedb is not None:
                wordgamedb.close()

    @wordgame.command(name="rm")
    @commands.check(checks.isModPlus)
    async def rm_word(self, ctx, id):
        """Remove the word with the given id."""
        wordgamedb = None
        try:
            wordgamedb = WordGameDB()
            wordgamedb.rm_word(id)
            embed = discord.Embed(colour=discord.Colour.green())
            # The replies previously said "Question", copied from rm_question.
            embed.add_field(name="Success", value=f"Word removed with id {id}", inline=False)
            await ctx.send(embed=embed)
        except QuestionNotFound:
            # NOTE(review): kept as in the original; confirm that
            # WordGameDB.rm_word really raises QuestionNotFound for unknown ids.
            await ctx.send("No word found with id " + str(id))
        except Exception:
            logging.getLogger(__name__).exception("wordgame rm failed")
            await ctx.send("Something went wrong")
        finally:
            if wordgamedb is not None:
                wordgamedb.close()

    @wordgame.command(name="show")
    @commands.check(checks.isModPlus)
    async def show_words(self, ctx):
        """List every stored word with its reward and use count."""
        wordgamedb = None
        try:
            wordgamedb = WordGameDB()
            words = wordgamedb.get_words()
            if not words:
                embed = discord.Embed(colour=discord.Colour.red())
                embed.add_field(name="Words", value="No words found", inline=False)
                await ctx.send(embed=embed)
                return

            # Unlike questions, word rows are read by key.
            fields = [
                (
                    f"id: {w['id']}",
                    f"Word: '{w['word']}'\nReward: {w['reward']}\nUsed: {w['asked']}",
                )
                for w in words
            ]
            for embed in self._listing_embeds("Words", fields):
                await ctx.send(embed=embed)
        except Exception:
            logging.getLogger(__name__).exception("wordgame show failed")
            await ctx.send("Something went wrong")
        finally:
            if wordgamedb is not None:
                wordgamedb.close()
|
|
|
|
class Quiz(commands.Cog):
    """Runs the quiz mini-games in the quiz channel and tracks win streaks.

    A background loop picks one of three games at random: guess the number,
    unscramble a word, or answer a stored question. Each game posts to the
    channel identified by ``constants.QuizChannelID``.
    """

    def __init__(self, client):
        self.client = client
        # (min, max) seconds slept between two automatic rounds.
        # NOTE(review): (1, 4) looks like a debugging value; the alternative
        # left commented out in the original is kept below - confirm which
        # one production should use.
        self.interval = (1, 4)
        # self.interval = (5*60, 20*60)
        # While True the background loop keeps scheduling rounds.
        self.auto = False
        # Seconds a round waits for a correct answer before giving up.
        self.timeout = 10 * 60
        # The running loop task. Keeping the reference prevents a second loop
        # from being started and keeps the task from being garbage-collected.
        self._auto_task = None

    def _ensure_auto_quiz_running(self):
        """Enable the automatic quiz, starting the loop only if none is alive."""
        self.auto = True
        if self._auto_task is None or self._auto_task.done():
            self._auto_task = asyncio.create_task(self.start_auto_quiz())

    @commands.group(name="quiz", case_insensitive=True, invoke_without_command=True)
    @commands.check(checks.isModPlus)
    async def quiz(self, ctx, f, *args):
        """Control the automatic quiz (subcommands: auto, stop)."""
        pass

    @quiz.command(name="auto")
    @commands.check(checks.isModPlus)
    async def auto_quiz(self, ctx):
        """Start the automatic quiz."""
        self._ensure_auto_quiz_running()

    # This method was previously also named ``auto_quiz``. The second
    # definition replaced the first in the class namespace, so the cog only
    # collected the "stop" command and "auto" was never bound to the cog.
    @quiz.command(name="stop")
    @commands.check(checks.isModPlus)
    async def stop_quiz(self, ctx):
        """Stop the automatic quiz."""
        self.auto = False

    async def start_auto_quiz(self):
        """Run rounds at random intervals for as long as ``self.auto`` is True.

        Game odds: 50% guess the number, 30% word game, 20% stored question.
        A failing round is logged and the loop carries on with the next one.
        """
        while self.auto:
            await asyncio.sleep(random.randint(*self.interval))
            # "stop" may have been issued while sleeping; don't start a round.
            if not self.auto:
                break
            roll = random.random()
            try:
                if roll < 0.5:
                    await self.guess_the_number()
                elif roll < 0.8:
                    await self.start_word_game()
                else:
                    await self.ask_question()
            except asyncio.CancelledError:
                # Cancellation must always end the loop. It is a plain
                # Exception subclass on Python 3.7.
                raise
            except Exception:
                logging.getLogger(__name__).exception("Quiz round failed")

    @commands.Cog.listener()
    async def on_ready(self):
        """Enable the automatic quiz once the client is ready."""
        # on_ready can fire more than once (e.g. after a reconnect), so the
        # loop is started through the guard rather than unconditionally.
        self._ensure_auto_quiz_running()

    def create_question_embed(self, question, reward):
        """Return the orange embed announcing a new round.

        Args:
            question: Text shown in the "Question:" field.
            reward: Base reward shown in the "Reward:" field.
        """
        embed = discord.Embed(colour=discord.Colour.orange())
        embed.add_field(name="Question:", value=f"{question}", inline=False)
        embed.add_field(name="Reward:", value=f"${reward} x streak in game", inline=False)
        # Derived from self.timeout so the text cannot drift from the real
        # deadline; with the default of 600 seconds this still reads "10".
        minutes = self.timeout // 60
        embed.add_field(name="End:", value=f"The quiz will end in {minutes} minutes!", inline=False)
        return embed

    def create_answer_embed(self, question, answer, reward, streak=-1, author=None, linked=False, correct=False):
        """Return the embed that closes a round.

        Args:
            question: Text shown in the "Question:" field.
            answer: Text shown in the "Answer:" field.
            reward: Amount shown in the "Reward:" field.
            streak: Winner's current streak; shown only when greater than 1.
            author: The winner; required when ``correct`` is True because the
                embed mentions them.
            linked: Truthy when the winner can redeem directly. Otherwise the
                claim text also tells them to link their account first.
            correct: True for a won round (green embed with winner details),
                False for an unanswered round (red embed, no winner fields).
        """
        colour = discord.Colour.green() if correct else discord.Colour.red()
        embed = discord.Embed(colour=colour)
        embed.add_field(name="Question:", value=f"{question}", inline=False)
        embed.add_field(name="Reward:", value=f"${reward} in game", inline=False)
        embed.add_field(name="Answer:", value=f"{answer}", inline=False)
        if not correct:
            return embed

        if linked:
            claim = "Claim your reward in Minecraft by using /redeem"
        else:
            claim = "1. Link your account by using /link <MinecraftName> \n2. Claim your reward in Minecraft by using /redeem"
        embed.add_field(name="Winner:", value=f"{author.mention} 🎉🎉", inline=False)
        embed.add_field(name="Claim:", value=claim, inline=False)
        if streak > 1:
            embed.add_field(name="Streak:", value=f"{author.mention} has a streak going of {streak}", inline=False)
        return embed

    async def _await_winner(self, channel, prompt, answer, reward, is_correct):
        """Wait for the first correct message and announce how the round ended.

        Args:
            channel: Channel that receives the "nobody answered" messages.
            prompt: Question text repeated in the closing embed.
            answer: Correct answer shown in the closing embed.
            reward: Base reward; multiplied by the winner's streak when won.
            is_correct: Predicate over a message, used as the wait_for check.
        """
        # NOTE(review): the predicates passed in only look at the message
        # content, so a matching message in any channel wins - confirm whether
        # answers should be limited to the quiz channel.
        try:
            message = await self.client.wait_for(
                "message", check=is_correct, timeout=self.timeout
            )
        except asyncio.TimeoutError:
            # Only the wait is guarded, so a timeout raised later by the
            # reward or send calls is not mistaken for "nobody answered".
            self.reset_current_streak()
            await channel.send("Nobody found the correct answer!")
            await channel.send(embed=self.create_answer_embed(prompt, answer, reward))
            return

        winner = message.author
        streak = self.give_reward(winner.id, reward)
        linker = PlayerDBLinker()
        try:
            linked = linker.discordidused(winner.id)
        finally:
            linker.close()
        await message.channel.send(
            embed=self.create_answer_embed(
                prompt, answer, reward * streak, streak, winner, linked, True
            )
        )

    async def guess_the_number(self):
        """Play one "guess the number" round in the quiz channel.

        The upper bound is drawn from a fixed table that also sets the reward.
        Slowmode is lowered to 1 second for the round and set to 10 afterwards.
        """
        rewards_by_upper = {10: 50, 25: 75, 50: 100, 75: 125, 100: 150}
        upper = random.choice(list(rewards_by_upper))
        reward = rewards_by_upper[upper]
        secret = random.randint(1, upper)
        prompt = f"Guess the number from 1 to {upper}"

        def is_correct(message):
            # isdecimal(), not isdigit(): isdigit() accepts characters such as
            # "²" that int() rejects, which would raise out of wait_for.
            text = message.content
            return text.isdecimal() and int(text) == secret

        channel = self.client.get_channel(constants.QuizChannelID)
        await channel.send(embed=self.create_question_embed(prompt, reward))
        await channel.edit(slowmode_delay=1)
        try:
            await self._await_winner(channel, prompt, str(secret), reward, is_correct)
        finally:
            # Restore slowmode however the round ended.
            await channel.edit(slowmode_delay=10)

    async def ask_question(self):
        """Play one round using a random stored question.

        The answer is matched case-insensitively against the whole message.
        """
        question = self.get_random_question()
        # Rows are indexed positionally: id, question, answer, reward.
        q_id, text, answer, reward = question[0], question[1], question[2], question[3]
        self.increment_asked_count(q_id)
        channel = self.client.get_channel(constants.QuizChannelID)
        await channel.send(embed=self.create_question_embed(text, reward))
        await self._await_winner(
            channel,
            text,
            answer,
            reward,
            lambda message: message.content.lower() == answer.lower(),
        )

    async def start_word_game(self):
        """Play one round where players unscramble a random stored word."""
        word = self.get_random_word()
        self.increment_asked_word(int(word["id"]))
        channel = self.client.get_channel(constants.QuizChannelID)
        shaken_word = self.shake_word(word["word"].lower())
        prompt = f"Find the word from: {shaken_word}"
        await channel.send(embed=self.create_question_embed(prompt, word["reward"]))
        await self._await_winner(
            channel,
            prompt,
            word["word"],
            word["reward"],
            lambda message: message.content.lower() == word["word"].lower(),
        )

    def shake_word(self, word):
        """Return the characters of ``word`` in random order."""
        letters = list(word)
        random.shuffle(letters)
        return "".join(letters)

    def increment_asked_count(self, q_id):
        """Increment the asked counter of question ``q_id``."""
        quizdb = QuizDB()
        try:
            quizdb.question_asked_increment(q_id)
        finally:
            quizdb.close()

    def get_random_question(self):
        """Return whatever ``QuizDB.get_random_question()`` yields."""
        quizdb = QuizDB()
        try:
            return quizdb.get_random_question()
        finally:
            quizdb.close()

    def get_random_word(self):
        """Return whatever ``WordGameDB.get_random_word()`` yields."""
        wordgamedb = WordGameDB()
        try:
            return wordgamedb.get_random_word()
        finally:
            wordgamedb.close()

    def increment_asked_word(self, w_id):
        """Increment the asked counter of word ``w_id``."""
        wordgamedb = WordGameDB()
        try:
            wordgamedb.word_asked_increment(w_id)
        finally:
            wordgamedb.close()

    def reset_current_streak(self):
        """Reset the streak of the current streak holder, if there is one."""
        quiz_players_db = QuizPlayersDB()
        try:
            current = quiz_players_db.get_current_streak()
            if current is not None:
                quiz_players_db.reset_streak(current["discordid"])
        finally:
            # This handle was previously never closed.
            quiz_players_db.close()

    def give_reward(self, discordid, reward):
        """Record a win for ``discordid`` and return the resulting streak.

        If another player holds the current streak, that streak is reset and
        the winner is credited ``reward``. If the winner already holds it,
        they are credited ``reward`` times their new streak length.

        Returns:
            The ``cur_streak`` value of the streak holder after the win.
        """
        quiz_players_db = QuizPlayersDB()
        try:
            previous = quiz_players_db.get_current_streak()
            # Ids are compared as strings because the stored and the Discord
            # representation may differ in type.
            extends_streak = (
                previous is not None
                and str(previous["discordid"]) == str(discordid)
            )
            if previous is not None and not extends_streak:
                quiz_players_db.reset_streak(previous["discordid"])

            if extends_streak:
                quiz_players_db.player_won(discordid, reward * (previous["cur_streak"] + 1))
            else:
                quiz_players_db.player_won(discordid, reward)

            current_streak = quiz_players_db.get_current_streak()
        finally:
            quiz_players_db.close()
        return current_streak["cur_streak"]
|
|
|
|
|
|
def setup(client):
    """Register this module's cogs on ``client``.

    ``QuizQuestions`` is added first, then ``Quiz``; each cog is constructed
    with ``client`` right before it is added.
    """
    for cog_class in (QuizQuestions, Quiz):
        client.add_cog(cog_class(client))
|