Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 108 additions & 20 deletions dozer/cogs/moderation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Union

import discord
from discord import app_commands
from discord.ext import tasks, commands
from discord.ext.commands import BadArgument, has_permissions, RoleConverter, guild_only
from discord.utils import escape_markdown
Expand Down Expand Up @@ -377,8 +378,8 @@ async def _unmute(self, member: discord.Member):
return False # member not muted

async def _deafen(self, member: discord.Member, reason: str = "No reason provided", seconds: int = 0,
self_inflicted: bool = False, actor=None,
orig_channel=None):
self_inflicted: bool = False, actor = None,
orig_channel = None):
"""Deafens a user.
member: the member to be deafened
reason: a reason string without a time specifier
Expand All @@ -387,49 +388,71 @@ async def _deafen(self, member: discord.Member, reason: str = "No reason provide
actor: the acting user who requested the mute
orig_channel: the channel of the request origin
"""
results = await Deafen.get_by(guild_id=member.guild.id, member_id=member.id)
results = await Deafen.get_by(guild_id = member.guild.id, member_id = member.id)
deafen = await DeafenRole.get_by(guild_id = member.guild.id)
if not deafen[0].deafen_role:
newrole = await member.guild.create_role(name = "Deafened", reason = "Deafened role for Dozer",
permissions = discord.Permissions.none())
await DeafenRole(guild_id = member.guild.id, deafen_role = newrole.id).update_or_add()
deafen = await DeafenRole.get_by(guild_id = member.guild.id)
else:
deafen = deafen[0]
if results:
await PunishmentTimerRecords.delete(target_id=member.id, guild_id=member.guild.id, type_of_punishment=Deafen.type)
await PunishmentTimerRecords.delete(target_id = member.id, guild_id = member.guild.id,
type_of_punishment = Deafen.type)

await self.restart_all_timers()
self.bot.loop.create_task(
self.punishment_timer(seconds, member,
Deafen,
reason,
actor or member.guild.me,
orig_channel=orig_channel,
global_modlog=not self_inflicted))
orig_channel = orig_channel,
global_modlog = not self_inflicted))
return False
else:
user = Deafen(member_id=member.id, guild_id=member.guild.id, self_inflicted=self_inflicted)
user = Deafen(member_id = member.id, guild_id = member.guild.id, self_inflicted = self_inflicted)
await user.update_or_add()
await self.perm_override(member, read_messages=False)

current_role_ids = [role.id for role in member.roles[1:]]
await member.edit(roles = [member.guild.get_role(deafen.deafen_role)])
for role in current_role_ids:
print(role)
if role != deafen.deafen_role:
print(f"Adding role {role}")
await PausedRole(guild_id = member.guild.id, member_id = member.id, paused_role_id = role).add()
if self_inflicted and seconds == 0:
seconds = 30 # prevent lockout in case of bad argument
self.bot.loop.create_task(
self.punishment_timer(seconds, member,
punishment=Deafen,
reason=reason,
actor=actor or member.guild.me,
orig_channel=orig_channel,
global_modlog=not self_inflicted))
punishment = Deafen,
reason = reason,
actor = actor or member.guild.me,
orig_channel = orig_channel,
global_modlog = not self_inflicted))
return True

async def _undeafen(self, member: discord.Member):
"""Undeafens a user."""
results = await Deafen.get_by(guild_id=member.guild.id, member_id=member.id)
results = await Deafen.get_by(guild_id = member.guild.id, member_id = member.id)
if results:
await self.perm_override(member=member, read_messages=None)
await PunishmentTimerRecords.delete(target_id=member.id, guild_id=member.guild.id,
type_of_punishment=Deafen.type)
# await self.perm_override(member=member, read_messages=None)
deafen = await DeafenRole.get_by(guild_id = member.guild.id)
restore = await PausedRole.get_by(guild_id = member.guild.id, member_id = member.id)
roles = []
print(f"Restore: {restore}")
for role in restore:
if role.paused_role_id != deafen[0].deafen_role:
roles.append(member.guild.get_role(role.paused_role_id))
await member.edit(roles = roles)
await PunishmentTimerRecords.delete(target_id = member.id, guild_id = member.guild.id,
type_of_punishment = Deafen.type)
await self.restart_all_timers()
await Deafen.delete(member_id=member.id, guild_id=member.guild.id)
await PausedRole.delete(member_id = member.id, guild_id = member.guild.id)
await Deafen.delete(member_id = member.id, guild_id = member.guild.id)
truths = [True, results[0].self_inflicted]
return truths
else:
return [False]

"""=== Event handlers ==="""

@Cog.listener('on_ready')
Expand Down Expand Up @@ -773,6 +796,8 @@ async def deafen(self, ctx: DozerContext, member_mentions: discord.Member, *, re

@command()
@bot_has_permissions(manage_permissions=True) # Once instance globally, don't wait instead throw exception
@app_commands.describe(
reason = "time (number followed by s,m,h,d,m,y) and (optional): The reason for deafening yourself")
@discord.ext.commands.max_concurrency(1, wait=False, per=discord.ext.commands.BucketType.default)
@discord.ext.commands.cooldown(rate=10, per=2,
type=discord.ext.commands.BucketType.guild) # 10 seconds per 2 members in the guild
Expand Down Expand Up @@ -1246,6 +1271,69 @@ async def get_by(cls, **kwargs):
obj = MemberRole(member_role=result.get("member_role"), guild_id=result.get("guild_id"))
result_list.append(obj)
return result_list


class DeafenRole(db.DatabaseTable):
"""Holds info on member roles used for deafens"""
__tablename__ = 'deafen_roles'
__uniques__ = 'guild_id'

@classmethod
async def initial_create(cls):
"""Create the table in the database"""
async with db.Pool.acquire() as conn:
await conn.execute(f"""
CREATE TABLE {cls.__tablename__} (
guild_id bigint PRIMARY KEY NOT NULL,
deafen_role bigint null
)""")

def __init__(self, guild_id: int, deafen_role: int = None):
super().__init__()
self.guild_id = guild_id
self.deafen_role = deafen_role

@classmethod
async def get_by(cls, **kwargs):
results = await super().get_by(**kwargs)
result_list = []
for result in results:
obj = DeafenRole(deafen_role = result.get("deafen_role"), guild_id = result.get("guild_id"))
result_list.append(obj)
return result_list


class PausedRole(db.DatabaseTable):
"""Holds all roles that were removed for a deafen/mute"""
__tablename__ = 'paused_roles'
__uniques__ = 'member_id, paused_role_id'

@classmethod
async def initial_create(cls):
"""Create the table in the database"""
async with db.Pool.acquire() as conn:
await conn.execute(f"""
CREATE TABLE {cls.__tablename__} (
guild_id bigint not null,
member_id bigint not null,
paused_role_id bigint not null,
PRIMARY KEY (paused_role_id, member_id)
)""")

def __init__(self, guild_id: int, member_id: int, paused_role_id: int = None):
super().__init__()
self.guild_id = guild_id
self.member_id = member_id
self.paused_role_id = paused_role_id

@classmethod
async def get_by(cls, **kwargs):
results = await super().get_by(**kwargs)
result_list = []
for result in results:
obj = PausedRole(paused_role_id = result.get("paused_role_id"), member_id = result.get("member_id"), guild_id = result.get("guild_id"))
result_list.append(obj)
return result_list


class NewMemPurgeConfig(db.DatabaseTable):
Expand Down
Loading